use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use dashmap::DashMap;
use parking_lot::RwLock;
use tokio::sync::{RwLock as AsyncRwLock, Semaphore};
use crate::{
parser::header::CassandraVersion,
platform::Platform,
schema::{SchemaManager, TableSchema},
storage::sstable::reader::SSTableReader,
Config, Error, Result, RowKey, Value,
};
/// Tuning knobs for `SSTableDataManager`.
#[derive(Debug, Clone)]
pub struct SSTableDataManagerConfig {
    /// Upper bound for the estimated size of the row cache, in mebibytes.
    pub max_cache_size_mb: usize,
    /// Age, in seconds, after which a cached table is treated as expired.
    pub cache_ttl_seconds: u64,
    /// Number of permits handed to the semaphore that gates opening SSTable readers.
    pub max_concurrent_ops: usize,
    /// Preloading switch. NOTE(review): not read by the code visible in this file.
    pub enable_preloading: bool,
    /// Preload batch size. NOTE(review): not read by the code visible in this file.
    pub preload_batch_size: usize,
    /// Discovery interval in seconds. NOTE(review): not read by the code visible in this file.
    pub discovery_interval_seconds: u64,
    /// When `true`, every discovered SSTable file gets a health check during analysis.
    pub enable_integrity_checks: bool,
}

impl Default for SSTableDataManagerConfig {
    /// Returns the stock configuration: 512 MiB cache, 300 s TTL, 10 concurrent
    /// operations, preloading and integrity checks enabled.
    fn default() -> Self {
        // Cache sizing and expiry.
        let max_cache_size_mb = 512;
        let cache_ttl_seconds = 300;

        // Concurrency and preloading.
        let max_concurrent_ops = 10;
        let enable_preloading = true;
        let preload_batch_size = 1000;

        // Discovery behaviour.
        let discovery_interval_seconds = 30;
        let enable_integrity_checks = true;

        Self {
            max_cache_size_mb,
            cache_ttl_seconds,
            max_concurrent_ops,
            enable_preloading,
            preload_batch_size,
            discovery_interval_seconds,
            enable_integrity_checks,
        }
    }
}
/// One table's rows as held in the in-memory data cache, together with the
/// bookkeeping used for expiry and eviction.
#[derive(Debug, Clone)]
pub struct CachedDataEntry {
/// The cached rows.
pub rows: Vec<DataRow>,
/// When the entry was created; compared against the configured TTL to decide expiry.
pub cached_at: Instant,
/// Estimated size of `rows` in bytes (a flat per-row estimate, not a measurement).
pub size_bytes: usize,
/// Access counter; set to 1 when the entry is inserted.
pub access_count: u64,
/// Timestamp used to order entries for eviction; the oldest value is evicted first.
pub last_accessed: Instant,
}
/// A single row produced from an SSTable entry.
#[derive(Debug, Clone)]
pub struct DataRow {
/// Key of the row as reported by the reader.
pub key: RowKey,
/// Column values keyed by column name.
pub columns: HashMap<String, Value>,
/// Provenance information for the row.
pub metadata: RowMetadata,
}
/// Provenance details attached to every [`DataRow`].
#[derive(Debug, Clone)]
pub struct RowMetadata {
/// Path of the SSTable file the row was read from.
pub source_file: PathBuf,
/// Write timestamp, when known. NOTE(review): unit is not established in this file.
pub write_time: Option<i64>,
/// Time-to-live of the row, when known.
pub ttl: Option<Duration>,
/// Generation number of the SSTable the row came from.
pub generation: u64,
}
/// Result of one scan of a data directory.
#[derive(Debug, Clone)]
pub struct TableDiscovery {
/// Keyspaces found during the scan.
pub keyspaces: Vec<KeyspaceInfo>,
/// Number of SSTable files summed over all discovered tables.
pub total_sstables: usize,
/// Wall-clock time the scan took.
pub discovery_time: Duration,
}
/// A keyspace directory and the tables found inside it.
#[derive(Debug, Clone)]
pub struct KeyspaceInfo {
/// Keyspace name, taken from the directory name.
pub name: String,
/// Tables in the keyspace that have at least one SSTable file.
pub tables: Vec<TableInfo>,
/// Path of the keyspace directory.
pub path: PathBuf,
}
/// A table directory and a summary of the SSTable files it contains.
#[derive(Debug, Clone)]
pub struct TableInfo {
/// Table name, taken from the table directory name.
pub name: String,
/// Schema for the table; `None` when the schema lookup failed.
pub schema: Option<TableSchema>,
/// One entry per `.db` file found in the table directory.
pub sstable_files: Vec<SSTableFileInfo>,
/// Sum of the per-file row estimates.
pub estimated_rows: usize,
/// Sum of the sizes of the listed files, in bytes.
pub total_size_bytes: u64,
/// Most recent modification time among the listed files, when available.
pub last_modified: Option<std::time::SystemTime>,
}
/// Per-file information gathered while analysing a discovered `.db` file.
#[derive(Debug, Clone)]
pub struct SSTableFileInfo {
/// Path of the file.
pub path: PathBuf,
/// File size in bytes, from the directory entry metadata.
pub size_bytes: u64,
/// Format version read from the file header; `None` when no reader could be opened.
pub version: Option<CassandraVersion>,
/// Compression algorithm named in the file header; `None` when no reader could be opened.
pub compression: Option<String>,
/// Row count estimate derived from the file size.
pub estimated_rows: usize,
/// Outcome of the health assessment made during analysis.
pub health_status: FileHealthStatus,
}
/// Health classification assigned to an SSTable file during discovery.
/// Only `Healthy` files are read when loading table data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FileHealthStatus {
/// The file could be opened and, when checks are enabled, its path exists.
Healthy,
/// NOTE(review): never assigned by the code visible in this file.
Degraded,
/// A reader could not be opened for the file.
Corrupted,
/// Assigned by the health check when the reader's file path does not exist.
AccessDenied,
}
/// Discovers keyspaces and tables under a data directory, pools SSTable readers,
/// and serves table rows through an in-memory cache.
#[allow(dead_code)]
pub struct SSTableDataManager {
/// Manager-level settings (cache size, TTL, concurrency, integrity checks).
config: SSTableDataManagerConfig,
/// Platform abstraction; used for directory listing and passed to readers.
platform: Arc<Platform>,
/// Core configuration handed to every `SSTableReader` that is opened.
core_config: Config,
/// Source of table schemas during discovery.
schema_manager: Arc<SchemaManager>,
/// Cached rows keyed by `"keyspace:table"`.
data_cache: Arc<DashMap<String, CachedDataEntry>>,
/// Tables found by the last discovery, keyed by `"keyspace.table"`.
discovered_tables: Arc<AsyncRwLock<HashMap<String, TableInfo>>>,
/// Open readers keyed by file path, shared between callers.
readers_pool: Arc<DashMap<PathBuf, Arc<SSTableReader>>>,
/// Limits how many readers may be opened concurrently.
operation_semaphore: Arc<Semaphore>,
/// Progress flag and results of the most recent discovery.
discovery_state: Arc<RwLock<DiscoveryState>>,
/// Running cache counters exposed through `get_cache_stats`.
cache_stats: Arc<RwLock<CacheStatistics>>,
}
/// Internal record of discovery progress and the latest successful results.
#[derive(Debug, Clone)]
struct DiscoveryState {
/// When the most recent discovery finished (successfully or not).
last_discovery: Option<Instant>,
/// `true` while a discovery is running.
discovery_in_progress: bool,
/// Results of the most recent successful discovery.
last_results: Option<TableDiscovery>,
}
/// Snapshot of the data cache counters.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CacheStatistics {
/// Number of loads served from the cache.
pub cache_hits: u64,
/// Number of loads that had to read from disk.
pub cache_misses: u64,
/// Sum of the estimated sizes of all cached entries, in bytes.
pub current_cache_size_bytes: usize,
/// Number of entries in the cache when the statistics were last refreshed.
pub cache_entries: usize,
/// Number of cache entries evicted so far.
pub evictions: u64,
/// Access-time figure in microseconds, updated on every recorded load.
pub avg_access_time_micros: u64,
/// NOTE(review): initialised to 0 and never updated by the code visible in this file.
pub background_operations: u64,
}
impl SSTableDataManager {
/// Creates a manager with an empty data cache, an empty reader pool and no
/// discovery history.
///
/// `config.max_concurrent_ops` sizes the semaphore that throttles reader
/// creation. A value of `0` is treated as `1`, because a semaphore without
/// permits would make every attempt to open a reader wait forever.
///
/// # Errors
///
/// The current implementation never fails; the `Result` is kept for interface
/// stability.
pub async fn new(
    config: SSTableDataManagerConfig,
    platform: Arc<Platform>,
    core_config: Config,
    schema_manager: Arc<SchemaManager>,
) -> Result<Self> {
    // Guard against a zero-permit semaphore (see the doc comment above).
    let permits = config.max_concurrent_ops.max(1);
    let operation_semaphore = Arc::new(Semaphore::new(permits));

    let discovery_state = DiscoveryState {
        last_discovery: None,
        discovery_in_progress: false,
        last_results: None,
    };
    let cache_stats = CacheStatistics {
        cache_hits: 0,
        cache_misses: 0,
        current_cache_size_bytes: 0,
        cache_entries: 0,
        evictions: 0,
        avg_access_time_micros: 0,
        background_operations: 0,
    };

    Ok(Self {
        config,
        platform,
        core_config,
        schema_manager,
        data_cache: Arc::new(DashMap::new()),
        discovered_tables: Arc::new(AsyncRwLock::new(HashMap::new())),
        readers_pool: Arc::new(DashMap::new()),
        operation_semaphore,
        discovery_state: Arc::new(RwLock::new(discovery_state)),
        cache_stats: Arc::new(RwLock::new(cache_stats)),
    })
}
pub async fn discover_tables(&self, data_dir: &Path) -> Result<TableDiscovery> {
let _start_time = Instant::now();
{
let mut state = self.discovery_state.write();
if state.discovery_in_progress {
if let Some(ref results) = state.last_results {
return Ok(results.clone());
}
}
state.discovery_in_progress = true;
}
let discovery_result = self.perform_discovery(data_dir).await;
{
let mut state = self.discovery_state.write();
state.discovery_in_progress = false;
state.last_discovery = Some(Instant::now());
if let Ok(ref results) = discovery_result {
state.last_results = Some(results.clone());
}
}
discovery_result
}
async fn perform_discovery(&self, data_dir: &Path) -> Result<TableDiscovery> {
let start_time = Instant::now();
let mut keyspaces = Vec::new();
let mut total_sstables = 0;
let mut keyspace_entries = self.platform.fs().read_dir(data_dir).await.map_err(|e| {
Error::Io(std::io::Error::other(format!(
"Failed to read data directory: {}",
e
)))
})?;
while let Some(entry) = keyspace_entries.next_entry().await.map_err(|e| {
Error::Io(std::io::Error::other(format!(
"Error reading directory entry: {}",
e
)))
})? {
let path = entry.path();
if path.is_dir() {
if let Some(keyspace_name) = path.file_name().and_then(|n| n.to_str()) {
if keyspace_name.starts_with('.') || keyspace_name == "system" {
continue;
}
if let Ok(keyspace_info) =
self.discover_keyspace_tables(&path, keyspace_name).await
{
total_sstables += keyspace_info
.tables
.iter()
.map(|t| t.sstable_files.len())
.sum::<usize>();
keyspaces.push(keyspace_info);
}
}
}
}
{
let mut discovered = self.discovered_tables.write().await;
discovered.clear();
for keyspace in &keyspaces {
for table in &keyspace.tables {
let full_name = format!("{}.{}", keyspace.name, table.name);
discovered.insert(full_name, table.clone());
}
}
}
Ok(TableDiscovery {
keyspaces,
total_sstables,
discovery_time: start_time.elapsed(),
})
}
async fn discover_keyspace_tables(
&self,
keyspace_path: &Path,
keyspace_name: &str,
) -> Result<KeyspaceInfo> {
let mut tables = Vec::new();
let mut table_entries = self
.platform
.fs()
.read_dir(keyspace_path)
.await
.map_err(|e| {
Error::Io(std::io::Error::other(format!(
"Failed to read keyspace directory: {}",
e
)))
})?;
while let Some(entry) = table_entries.next_entry().await.map_err(|e| {
Error::Io(std::io::Error::other(format!(
"Error reading table entry: {}",
e
)))
})? {
let path = entry.path();
if path.is_dir() {
if let Some(table_name) = path.file_name().and_then(|n| n.to_str()) {
if let Ok(table_info) = self.discover_table_sstables(&path, table_name).await {
if !table_info.sstable_files.is_empty() {
tables.push(table_info);
}
}
}
}
}
Ok(KeyspaceInfo {
name: keyspace_name.to_string(),
tables,
path: keyspace_path.to_path_buf(),
})
}
/// Collects every file with a `db` extension in `table_path`, analyses each one
/// and summarises the result as a `TableInfo`.
///
/// The schema is looked up by `table_name`; a failed lookup yields `schema: None`.
///
/// # Errors
///
/// Returns `Error::Io` when the directory cannot be listed, an entry cannot be
/// read, or file metadata cannot be obtained.
async fn discover_table_sstables(
    &self,
    table_path: &Path,
    table_name: &str,
) -> Result<TableInfo> {
    let mut file_entries = self.platform.fs().read_dir(table_path).await.map_err(|e| {
        Error::Io(std::io::Error::other(format!(
            "Failed to read table directory: {}",
            e
        )))
    })?;

    let mut sstable_files = Vec::new();
    let mut total_size_bytes = 0u64;
    let mut last_modified: Option<std::time::SystemTime> = None;

    loop {
        let next = file_entries.next_entry().await.map_err(|e| {
            Error::Io(std::io::Error::other(format!(
                "Error reading file entry: {}",
                e
            )))
        })?;
        let Some(entry) = next else {
            break;
        };

        let path = entry.path();
        let has_db_extension = path.extension().map_or(false, |ext| ext == "db");
        if !has_db_extension {
            continue;
        }

        let metadata = entry.metadata().await.map_err(|e| {
            Error::Io(std::io::Error::other(format!(
                "Failed to get file metadata: {}",
                e
            )))
        })?;
        let size_bytes = metadata.len();
        total_size_bytes += size_bytes;

        // Keep the newest modification time seen so far; `None` sorts lowest.
        last_modified = std::cmp::max(last_modified, metadata.modified().ok());

        sstable_files.push(self.analyze_sstable_file(&path, size_bytes).await);
    }

    let schema = self.load_table_schema(table_name).await.ok();
    let estimated_rows = sstable_files.iter().map(|f| f.estimated_rows).sum();

    Ok(TableInfo {
        name: table_name.to_string(),
        schema,
        sstable_files,
        estimated_rows,
        total_size_bytes,
        last_modified,
    })
}
/// Builds the `SSTableFileInfo` for one file.
///
/// When a reader can be opened, version and compression come from its header,
/// the row count is estimated from the file size and, if integrity checks are
/// enabled, the health check decides the status. When no reader can be opened
/// the file is reported as `Corrupted` with no header details.
async fn analyze_sstable_file(&self, file_path: &Path, size_bytes: u64) -> SSTableFileInfo {
    let mut file_info = SSTableFileInfo {
        path: file_path.to_path_buf(),
        size_bytes,
        version: None,
        compression: None,
        estimated_rows: 0,
        health_status: FileHealthStatus::Healthy,
    };

    let reader = match self.get_or_create_reader(file_path).await {
        Ok(reader) => reader,
        Err(_) => {
            file_info.health_status = FileHealthStatus::Corrupted;
            return file_info;
        }
    };

    let header = reader.header();
    file_info.version = Some(header.cassandra_version);
    file_info.compression = Some(header.compression.algorithm.clone());
    file_info.estimated_rows = self.estimate_row_count(size_bytes, &reader).await;
    if self.config.enable_integrity_checks {
        file_info.health_status = self.check_file_health(&reader).await;
    }
    file_info
}
/// Returns the pooled reader for `file_path`, opening and pooling a new one if
/// none exists yet.
///
/// Opening is throttled by the operation semaphore. The pool is consulted again
/// after the permit is obtained, because another task may have opened the same
/// file while this one was waiting.
///
/// # Errors
///
/// Returns `Error::Io` if the semaphore has been closed, or whatever error
/// `SSTableReader::open` reports.
async fn get_or_create_reader(&self, file_path: &Path) -> Result<Arc<SSTableReader>> {
    let pooled = || {
        self.readers_pool
            .get(file_path)
            .map(|entry| Arc::clone(entry.value()))
    };

    if let Some(existing) = pooled() {
        return Ok(existing);
    }

    let _permit = self
        .operation_semaphore
        .acquire()
        .await
        .map_err(|_| Error::Io(std::io::Error::other("Semaphore acquisition failed")))?;

    if let Some(existing) = pooled() {
        return Ok(existing);
    }

    let opened =
        SSTableReader::open(file_path, &self.core_config, self.platform.clone()).await?;
    let reader = Arc::new(opened);
    self.readers_pool
        .insert(file_path.to_path_buf(), Arc::clone(&reader));
    Ok(reader)
}
/// Returns the rows of `keyspace.table`, served from the cache when a fresh
/// entry exists and read from disk otherwise.
///
/// `limit` caps the number of rows returned. The cache is keyed by table only,
/// so it must hold complete tables:
///
/// * on a hit, the cached rows are truncated to `limit` for this caller and the
///   entry's `last_accessed` / `access_count` are updated so that eviction
///   really is least-recently-used;
/// * on a miss, the result is cached only when it is known to be complete
///   (no limit, or fewer rows than the limit came back). A truncated result is
///   returned but never stored, so it cannot be served later as the full table.
///
/// # Errors
///
/// Propagates errors from reading the table from disk (unknown table, reader
/// failures).
pub async fn load_table_data(
    &self,
    keyspace: &str,
    table: &str,
    limit: Option<usize>,
) -> Result<Vec<DataRow>> {
    let start_time = Instant::now();
    let cache_key = format!("{}:{}", keyspace, table);

    // The map guard lives only inside the closure, so it is released before the
    // statistics (which iterate the same map) are refreshed.
    let cached_rows: Option<Vec<DataRow>> =
        self.data_cache.get_mut(&cache_key).and_then(|mut entry| {
            if self.is_cache_expired(&entry) {
                return None;
            }
            entry.access_count += 1;
            entry.last_accessed = Instant::now();
            let rows = match limit {
                Some(max_rows) => entry.rows.iter().take(max_rows).cloned().collect(),
                None => entry.rows.clone(),
            };
            Some(rows)
        });

    if let Some(rows) = cached_rows {
        self.update_cache_stats(true, start_time.elapsed());
        return Ok(rows);
    }

    let rows = self
        .load_table_data_from_disk(keyspace, table, limit)
        .await?;

    // Fewer rows than requested means the table was exhausted, i.e. complete.
    let is_complete = match limit {
        Some(max_rows) => rows.len() < max_rows,
        None => true,
    };

    if is_complete {
        let now = Instant::now();
        let cache_entry = CachedDataEntry {
            size_bytes: self.estimate_rows_size(&rows),
            rows: rows.clone(),
            cached_at: now,
            access_count: 1,
            last_accessed: now,
        };
        self.data_cache.insert(cache_key, cache_entry);
    }

    self.update_cache_stats(false, start_time.elapsed());
    if is_complete {
        self.maybe_evict_cache().await;
    }
    Ok(rows)
}
/// Reads rows for `keyspace.table` from its healthy SSTable files, in the order
/// the files were discovered, stopping once `limit` rows have been collected.
///
/// Files whose health status is not `Healthy` are skipped.
///
/// # Errors
///
/// Returns `Error::Table` when the table is not in the discovery index, and
/// propagates reader errors.
async fn load_table_data_from_disk(
    &self,
    keyspace: &str,
    table: &str,
    limit: Option<usize>,
) -> Result<Vec<DataRow>> {
    let full_table_name = format!("{}.{}", keyspace, table);

    // Clone the entry so the index lock is not held while files are read.
    let table_info = self
        .discovered_tables
        .read()
        .await
        .get(&full_table_name)
        .cloned()
        .ok_or_else(|| Error::Table(format!("Table {}.{} not found", keyspace, table)))?;

    let healthy_files = table_info
        .sstable_files
        .iter()
        .filter(|file| file.health_status == FileHealthStatus::Healthy);

    let mut collected = Vec::new();
    for file_info in healthy_files {
        let reader = self.get_or_create_reader(&file_info.path).await?;
        let file_rows = self
            .load_rows_from_reader(&reader, &table_info, limit)
            .await?;

        match limit {
            Some(max_rows) => {
                let remaining = max_rows.saturating_sub(collected.len());
                collected.extend(file_rows.into_iter().take(remaining));
                if collected.len() >= max_rows {
                    break;
                }
            }
            None => collected.extend(file_rows),
        }
    }
    Ok(collected)
}
async fn load_rows_from_reader(
&self,
reader: &SSTableReader,
_table_info: &TableInfo,
limit: Option<usize>,
) -> Result<Vec<DataRow>> {
let mut rows = Vec::new();
let all_entries = reader.get_all_entries().await?;
let entries_to_process = if let Some(lim) = limit {
all_entries.into_iter().take(lim).collect::<Vec<_>>()
} else {
all_entries
};
for (_table_id, row_key, value) in entries_to_process {
let columns = match value {
Value::Map(map_entries) => map_entries
.into_iter()
.filter_map(|(key, val)| match key {
Value::Text(column_name) => Some((column_name, val)),
_ => {
log::warn!(
"Unexpected map key type for row {:?}: {:?}, skipping column",
row_key,
key
);
None
}
})
.collect(),
Value::Null => {
log::debug!("Skipping null row for key: {:?}", row_key);
continue; }
_ => {
log::warn!(
"Expected Value::Map from SSTableReader, got {:?} for key: {:?}",
value,
row_key
);
HashMap::from([("value".to_string(), value)])
}
};
let metadata = RowMetadata {
source_file: reader.file_path.clone(),
write_time: None,
ttl: None,
generation: reader.generation,
};
rows.push(DataRow {
key: row_key,
columns,
metadata,
});
}
Ok(rows)
}
/// Turns a raw reader entry into a `DataRow`.
///
/// With a schema, values are paired positionally with the schema columns and
/// surplus columns or values are ignored. Without one, columns are named
/// `column_0`, `column_1`, ... A missing generation is recorded as `0`.
///
/// # Errors
///
/// Propagates errors from `parse_column_value`.
#[allow(dead_code)] // not called by the code visible in this file
async fn convert_entry_to_row(
    &self,
    entry: crate::storage::sstable::bulletproof_reader::SSTableEntry,
    table_info: &TableInfo,
    source_file: &Path,
) -> Result<DataRow> {
    let columns = match table_info.schema.as_ref() {
        Some(schema) => {
            let mut named = HashMap::new();
            for (column, raw) in schema.columns.iter().zip(entry.values.iter()) {
                let parsed_value = self.parse_column_value(raw, &column.data_type)?;
                named.insert(column.name.clone(), parsed_value);
            }
            named
        }
        None => entry
            .values
            .iter()
            .enumerate()
            .map(|(i, value)| (format!("column_{}", i), value.clone()))
            .collect(),
    };

    let metadata = RowMetadata {
        source_file: source_file.to_path_buf(),
        write_time: entry.timestamp,
        ttl: None,
        generation: entry.generation.unwrap_or(0),
    };

    Ok(DataRow {
        key: entry.key,
        columns,
        metadata,
    })
}
/// Returns a copy of `value` unchanged; `_data_type` is currently ignored.
#[allow(dead_code)] // only used by `convert_entry_to_row`, which is itself unused here
fn parse_column_value(&self, value: &Value, _data_type: &str) -> Result<Value> {
    let passthrough = value.clone();
    Ok(passthrough)
}
pub async fn query_data(
&self,
keyspace: &str,
table: &str,
where_clause: Option<&str>,
limit: Option<usize>,
) -> Result<Vec<DataRow>> {
let rows = self.load_table_data(keyspace, table, None).await?;
let filtered_rows = if let Some(_where_clause) = where_clause {
rows
} else {
rows
};
let final_rows = if let Some(limit) = limit {
filtered_rows.into_iter().take(limit).collect()
} else {
filtered_rows
};
Ok(final_rows)
}
/// Looks up the schema recorded for `keyspace.table` by the last discovery.
///
/// Returns `Ok(None)` both when the table is unknown and when it was discovered
/// without a schema.
pub async fn get_table_schema(
    &self,
    keyspace: &str,
    table: &str,
) -> Result<Option<TableSchema>> {
    let full_table_name = format!("{}.{}", keyspace, table);
    let discovered = self.discovered_tables.read().await;
    let schema = discovered
        .get(&full_table_name)
        .and_then(|table_info| table_info.schema.clone());
    Ok(schema)
}
/// Returns the distinct keyspace names from the discovery index, sorted
/// ascending.
pub async fn list_keyspaces(&self) -> Result<Vec<String>> {
    let discovered = self.discovered_tables.read().await;

    let mut keyspaces = Vec::with_capacity(discovered.len());
    for full_name in discovered.keys() {
        // Index keys are "keyspace.table"; everything before the first dot is the keyspace.
        let keyspace = match full_name.split_once('.') {
            Some((keyspace, _table)) => keyspace,
            None => full_name.as_str(),
        };
        keyspaces.push(keyspace.to_string());
    }

    keyspaces.sort();
    keyspaces.dedup();
    Ok(keyspaces)
}
/// Returns the names of the tables discovered in `keyspace`, sorted ascending.
///
/// The discovery index is a hash map, so the result is sorted here to give
/// callers a deterministic order, matching `list_keyspaces`. Index keys that do
/// not have exactly the form `keyspace.table` are ignored.
pub async fn list_tables(&self, keyspace: &str) -> Result<Vec<String>> {
    let discovered = self.discovered_tables.read().await;
    let mut tables: Vec<String> = discovered
        .keys()
        .filter_map(|full_name| {
            let (key_keyspace, table) = full_name.split_once('.')?;
            // A further dot would mean more than two parts; such keys are skipped.
            (key_keyspace == keyspace && !table.contains('.')).then(|| table.to_string())
        })
        .collect();
    tables.sort_unstable();
    Ok(tables)
}
pub fn get_cache_stats(&self) -> CacheStatistics {
self.cache_stats.read().clone()
}
pub fn get_discovery_status(&self) -> (bool, Option<Duration>) {
let state = self.discovery_state.read();
let time_since_last = state.last_discovery.map(|last| last.elapsed());
(state.discovery_in_progress, time_since_last)
}
/// Fetches the schema for `table_name` from the schema manager.
///
/// # Errors
///
/// Propagates whatever error the schema manager reports.
async fn load_table_schema(&self, table_name: &str) -> Result<TableSchema> {
    let lookup = self.schema_manager.get_table_schema(table_name);
    lookup.await
}
/// Estimates the number of rows in a file by assuming a fixed average row size
/// of 256 bytes. The reader is currently not consulted.
async fn estimate_row_count(&self, file_size_bytes: u64, _reader: &SSTableReader) -> usize {
    const ESTIMATED_AVG_ROW_SIZE: u64 = 256;
    (file_size_bytes / ESTIMATED_AVG_ROW_SIZE) as usize
}
/// Classifies a file as `Healthy` when the reader's path exists on disk and as
/// `AccessDenied` otherwise.
async fn check_file_health(&self, reader: &SSTableReader) -> FileHealthStatus {
    if !reader.file_path.exists() {
        return FileHealthStatus::AccessDenied;
    }
    FileHealthStatus::Healthy
}
/// Returns `true` when `entry` is older than the configured cache TTL.
fn is_cache_expired(&self, entry: &CachedDataEntry) -> bool {
    let age = entry.cached_at.elapsed();
    age > Duration::from_secs(self.config.cache_ttl_seconds)
}
/// Estimates the memory footprint of `rows` using a flat 256 bytes per row.
fn estimate_rows_size(&self, rows: &[DataRow]) -> usize {
    const ESTIMATED_ROW_BYTES: usize = 256;
    rows.len() * ESTIMATED_ROW_BYTES
}
/// Records one cache access and refreshes the size and entry counters.
///
/// `avg_access_time_micros` is kept as the arithmetic mean over all recorded
/// accesses (hits plus misses). The previous `(avg + new) / 2` update halved the
/// first sample and weighted recent samples far more than older ones.
///
/// The cache is inspected before the statistics lock is taken, so that lock is
/// never held while the cache map is being iterated.
fn update_cache_stats(&self, hit: bool, access_time: Duration) {
    let cache_entries = self.data_cache.len();
    let current_cache_size_bytes: usize =
        self.data_cache.iter().map(|entry| entry.size_bytes).sum();
    // Saturate instead of silently truncating an out-of-range duration.
    let sample_micros = u64::try_from(access_time.as_micros()).unwrap_or(u64::MAX);

    let mut stats = self.cache_stats.write();
    if hit {
        stats.cache_hits += 1;
    } else {
        stats.cache_misses += 1;
    }

    // At least 1 here, because one of the counters was just incremented.
    let samples = u128::from(stats.cache_hits) + u128::from(stats.cache_misses);
    let previous_total = u128::from(stats.avg_access_time_micros) * (samples - 1);
    let mean = (previous_total + u128::from(sample_micros)) / samples;
    stats.avg_access_time_micros = u64::try_from(mean).unwrap_or(u64::MAX);

    stats.cache_entries = cache_entries;
    stats.current_cache_size_bytes = current_cache_size_bytes;
}
/// Evicts entries when the estimated cache size exceeds the configured budget.
///
/// The budget is computed with a saturating multiplication, so a very large
/// `max_cache_size_mb` cannot overflow (a panic in debug builds, a wrapped and
/// therefore tiny budget in release builds).
async fn maybe_evict_cache(&self) {
    const BYTES_PER_MIB: usize = 1024 * 1024;
    let max_size_bytes = self.config.max_cache_size_mb.saturating_mul(BYTES_PER_MIB);
    let current_size: usize = self.data_cache.iter().map(|entry| entry.size_bytes).sum();
    if current_size > max_size_bytes {
        self.evict_lru_entries(current_size - max_size_bytes).await;
    }
}
/// Removes cache entries, oldest `last_accessed` first, until at least
/// `bytes_to_evict` estimated bytes have been freed or the cache is empty.
///
/// Only entries that were actually removed count as evictions, since another
/// task may have removed a candidate in the meantime. The statistics are
/// updated once at the end, including the entry count and size, which would
/// otherwise still describe the cache as it was before eviction.
async fn evict_lru_entries(&self, bytes_to_evict: usize) {
    // Snapshot the candidates first so no map guard is held while removing.
    let mut candidates: Vec<_> = self
        .data_cache
        .iter()
        .map(|entry| (entry.key().clone(), entry.last_accessed))
        .collect();
    candidates.sort_unstable_by_key(|(_, last_accessed)| *last_accessed);

    let mut bytes_evicted = 0usize;
    let mut evicted_entries = 0u64;
    for (key, _) in candidates {
        if bytes_evicted >= bytes_to_evict {
            break;
        }
        if let Some((_, removed)) = self.data_cache.remove(&key) {
            bytes_evicted = bytes_evicted.saturating_add(removed.size_bytes);
            evicted_entries += 1;
        }
    }

    if evicted_entries == 0 {
        return;
    }

    // Measure the cache before taking the statistics lock.
    let cache_entries = self.data_cache.len();
    let current_cache_size_bytes: usize =
        self.data_cache.iter().map(|entry| entry.size_bytes).sum();

    let mut stats = self.cache_stats.write();
    stats.evictions += evicted_entries;
    stats.cache_entries = cache_entries;
    stats.current_cache_size_bytes = current_cache_size_bytes;
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a manager with default settings. The `TempDir` is returned as
    /// well so the schema directory stays alive for the duration of the test.
    async fn default_manager() -> (TempDir, SSTableDataManager) {
        let temp_dir = TempDir::new().unwrap();
        let core_config = Config::default();
        let platform = Arc::new(Platform::new(&core_config).await.unwrap());
        let schema_manager = Arc::new(SchemaManager::new(temp_dir.path()).await.unwrap());
        let manager = SSTableDataManager::new(
            SSTableDataManagerConfig::default(),
            platform,
            core_config,
            schema_manager,
        )
        .await
        .unwrap();
        (temp_dir, manager)
    }

    /// A freshly built manager starts with an empty cache and no hits.
    #[tokio::test]
    async fn test_data_manager_creation() {
        let (_temp_dir, manager) = default_manager().await;

        let stats = manager.get_cache_stats();
        assert_eq!(stats.cache_entries, 0);
        assert_eq!(stats.cache_hits, 0);
    }

    /// A freshly built manager reports zeroed counters and no discovery activity.
    #[tokio::test]
    async fn test_cache_statistics() {
        let (_temp_dir, manager) = default_manager().await;

        let stats = manager.get_cache_stats();
        assert_eq!(stats.cache_hits, 0);
        assert_eq!(stats.cache_misses, 0);

        let (in_progress, last_discovery) = manager.get_discovery_status();
        assert!(!in_progress);
        assert!(last_discovery.is_none());
    }
}