1use std::collections::HashMap;
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11
12use dashmap::DashMap;
13use parking_lot::RwLock;
14use tokio::sync::{RwLock as AsyncRwLock, Semaphore};
15
16use crate::{
17 parser::header::CassandraVersion,
18 platform::Platform,
19 schema::{SchemaManager, TableSchema},
20 storage::sstable::reader::SSTableReader,
21 Config, Error, Result, RowKey, Value,
22};
23
/// Tunables for [`SSTableDataManager`].
///
/// Only some of these settings are consulted by the code in this file; the
/// remaining ones are carried along for callers/other modules.
#[derive(Clone, Debug)]
pub struct SSTableDataManagerConfig {
    /// Upper bound for the row cache, in mebibytes (multiplied by 1024 * 1024
    /// before being compared against the estimated cache size).
    pub max_cache_size_mb: usize,
    /// Age in seconds after which a cached table entry is treated as expired.
    pub cache_ttl_seconds: u64,
    /// Number of permits of the semaphore that gates opening SSTable readers.
    pub max_concurrent_ops: usize,
    /// Preloading switch. NOTE(review): not read anywhere in this file.
    pub enable_preloading: bool,
    /// Preload batch size. NOTE(review): not read anywhere in this file.
    pub preload_batch_size: usize,
    /// Discovery interval in seconds. NOTE(review): not read anywhere in this file.
    pub discovery_interval_seconds: u64,
    /// When `true`, every discovered SSTable file gets a health check.
    pub enable_integrity_checks: bool,
}

impl Default for SSTableDataManagerConfig {
    /// Returns the stock configuration: 512 MiB cache, 300 s TTL, 10 concurrent
    /// operations, preloading on (batches of 1000), 30 s discovery interval and
    /// integrity checks enabled.
    fn default() -> Self {
        SSTableDataManagerConfig {
            max_cache_size_mb: 512,
            cache_ttl_seconds: 300,
            max_concurrent_ops: 10,
            enable_preloading: true,
            preload_batch_size: 1000,
            discovery_interval_seconds: 30,
            enable_integrity_checks: true,
        }
    }
}
56
57#[derive(Debug, Clone)]
59pub struct CachedDataEntry {
60 pub rows: Vec<DataRow>,
62 pub cached_at: Instant,
64 pub size_bytes: usize,
66 pub access_count: u64,
68 pub last_accessed: Instant,
70}
71
72#[derive(Debug, Clone)]
74pub struct DataRow {
75 pub key: RowKey,
77 pub columns: HashMap<String, Value>,
79 pub metadata: RowMetadata,
81}
82
/// Provenance details attached to every [`DataRow`].
#[derive(Clone, Debug)]
pub struct RowMetadata {
    /// Path of the SSTable file the row was read from.
    pub source_file: PathBuf,
    /// Write timestamp, when known. NOTE(review): unit/epoch is not visible here.
    pub write_time: Option<i64>,
    /// Time-to-live of the row, when known.
    pub ttl: Option<Duration>,
    /// Generation number of the source SSTable.
    pub generation: u64,
}
95
96#[derive(Debug, Clone)]
98pub struct TableDiscovery {
99 pub keyspaces: Vec<KeyspaceInfo>,
101 pub total_sstables: usize,
103 pub discovery_time: Duration,
105}
106
107#[derive(Debug, Clone)]
109pub struct KeyspaceInfo {
110 pub name: String,
112 pub tables: Vec<TableInfo>,
114 pub path: PathBuf,
116}
117
118#[derive(Debug, Clone)]
120pub struct TableInfo {
121 pub name: String,
123 pub schema: Option<TableSchema>,
125 pub sstable_files: Vec<SSTableFileInfo>,
127 pub estimated_rows: usize,
129 pub total_size_bytes: u64,
131 pub last_modified: Option<std::time::SystemTime>,
133}
134
135#[derive(Debug, Clone)]
137pub struct SSTableFileInfo {
138 pub path: PathBuf,
140 pub size_bytes: u64,
142 pub version: Option<CassandraVersion>,
144 pub compression: Option<String>,
146 pub estimated_rows: usize,
148 pub health_status: FileHealthStatus,
150}
151
/// Health classification of an SSTable file.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum FileHealthStatus {
    /// The file could be opened and passed the (optional) health check.
    Healthy,
    /// Partially usable file. NOTE(review): never assigned in this file.
    Degraded,
    /// Assigned when no reader could be created for the file.
    Corrupted,
    /// Assigned by the health check when the file path does not exist.
    AccessDenied,
}
164
165#[allow(dead_code)]
167pub struct SSTableDataManager {
168 config: SSTableDataManagerConfig,
170 platform: Arc<Platform>,
172 core_config: Config,
174 schema_manager: Arc<SchemaManager>,
176 data_cache: Arc<DashMap<String, CachedDataEntry>>,
178 discovered_tables: Arc<AsyncRwLock<HashMap<String, TableInfo>>>,
180 readers_pool: Arc<DashMap<PathBuf, Arc<SSTableReader>>>,
182 operation_semaphore: Arc<Semaphore>,
184 discovery_state: Arc<RwLock<DiscoveryState>>,
186 cache_stats: Arc<RwLock<CacheStatistics>>,
188}
189
190#[derive(Debug, Clone)]
192struct DiscoveryState {
193 last_discovery: Option<Instant>,
195 discovery_in_progress: bool,
197 last_results: Option<TableDiscovery>,
199}
200
201#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
203pub struct CacheStatistics {
204 pub cache_hits: u64,
206 pub cache_misses: u64,
208 pub current_cache_size_bytes: usize,
210 pub cache_entries: usize,
212 pub evictions: u64,
214 pub avg_access_time_micros: u64,
216 pub background_operations: u64,
218}
219
220impl SSTableDataManager {
221 pub async fn new(
223 config: SSTableDataManagerConfig,
224 platform: Arc<Platform>,
225 core_config: Config,
226 schema_manager: Arc<SchemaManager>,
227 ) -> Result<Self> {
228 let operation_semaphore = Arc::new(Semaphore::new(config.max_concurrent_ops));
229
230 let manager = Self {
231 config,
232 platform,
233 core_config,
234 schema_manager,
235 data_cache: Arc::new(DashMap::new()),
236 discovered_tables: Arc::new(AsyncRwLock::new(HashMap::new())),
237 readers_pool: Arc::new(DashMap::new()),
238 operation_semaphore,
239 discovery_state: Arc::new(RwLock::new(DiscoveryState {
240 last_discovery: None,
241 discovery_in_progress: false,
242 last_results: None,
243 })),
244 cache_stats: Arc::new(RwLock::new(CacheStatistics {
245 cache_hits: 0,
246 cache_misses: 0,
247 current_cache_size_bytes: 0,
248 cache_entries: 0,
249 evictions: 0,
250 avg_access_time_micros: 0,
251 background_operations: 0,
252 })),
253 };
254
255 Ok(manager)
256 }
257
258 pub async fn discover_tables(&self, data_dir: &Path) -> Result<TableDiscovery> {
260 let _start_time = Instant::now();
261
262 {
264 let mut state = self.discovery_state.write();
265 if state.discovery_in_progress {
266 if let Some(ref results) = state.last_results {
268 return Ok(results.clone());
269 }
270 }
271 state.discovery_in_progress = true;
272 }
273
274 let discovery_result = self.perform_discovery(data_dir).await;
275
276 {
278 let mut state = self.discovery_state.write();
279 state.discovery_in_progress = false;
280 state.last_discovery = Some(Instant::now());
281 if let Ok(ref results) = discovery_result {
282 state.last_results = Some(results.clone());
283 }
284 }
285
286 discovery_result
287 }
288
289 async fn perform_discovery(&self, data_dir: &Path) -> Result<TableDiscovery> {
291 let start_time = Instant::now();
292 let mut keyspaces = Vec::new();
293 let mut total_sstables = 0;
294
295 let mut keyspace_entries = self.platform.fs().read_dir(data_dir).await.map_err(|e| {
297 Error::Io(std::io::Error::other(format!(
298 "Failed to read data directory: {}",
299 e
300 )))
301 })?;
302
303 while let Some(entry) = keyspace_entries.next_entry().await.map_err(|e| {
304 Error::Io(std::io::Error::other(format!(
305 "Error reading directory entry: {}",
306 e
307 )))
308 })? {
309 let path = entry.path();
310 if path.is_dir() {
311 if let Some(keyspace_name) = path.file_name().and_then(|n| n.to_str()) {
312 if keyspace_name.starts_with('.') || keyspace_name == "system" {
314 continue;
315 }
316
317 if let Ok(keyspace_info) =
318 self.discover_keyspace_tables(&path, keyspace_name).await
319 {
320 total_sstables += keyspace_info
321 .tables
322 .iter()
323 .map(|t| t.sstable_files.len())
324 .sum::<usize>();
325 keyspaces.push(keyspace_info);
326 }
327 }
328 }
329 }
330
331 {
333 let mut discovered = self.discovered_tables.write().await;
334 discovered.clear();
335
336 for keyspace in &keyspaces {
337 for table in &keyspace.tables {
338 let full_name = format!("{}.{}", keyspace.name, table.name);
339 discovered.insert(full_name, table.clone());
340 }
341 }
342 }
343
344 Ok(TableDiscovery {
345 keyspaces,
346 total_sstables,
347 discovery_time: start_time.elapsed(),
348 })
349 }
350
351 async fn discover_keyspace_tables(
353 &self,
354 keyspace_path: &Path,
355 keyspace_name: &str,
356 ) -> Result<KeyspaceInfo> {
357 let mut tables = Vec::new();
358
359 let mut table_entries = self
360 .platform
361 .fs()
362 .read_dir(keyspace_path)
363 .await
364 .map_err(|e| {
365 Error::Io(std::io::Error::other(format!(
366 "Failed to read keyspace directory: {}",
367 e
368 )))
369 })?;
370
371 while let Some(entry) = table_entries.next_entry().await.map_err(|e| {
372 Error::Io(std::io::Error::other(format!(
373 "Error reading table entry: {}",
374 e
375 )))
376 })? {
377 let path = entry.path();
378 if path.is_dir() {
379 if let Some(table_name) = path.file_name().and_then(|n| n.to_str()) {
380 if let Ok(table_info) = self.discover_table_sstables(&path, table_name).await {
382 if !table_info.sstable_files.is_empty() {
383 tables.push(table_info);
384 }
385 }
386 }
387 }
388 }
389
390 Ok(KeyspaceInfo {
391 name: keyspace_name.to_string(),
392 tables,
393 path: keyspace_path.to_path_buf(),
394 })
395 }
396
397 async fn discover_table_sstables(
399 &self,
400 table_path: &Path,
401 table_name: &str,
402 ) -> Result<TableInfo> {
403 let mut sstable_files = Vec::new();
404 let mut total_size_bytes = 0u64;
405 let mut last_modified = None;
406
407 let mut file_entries = self.platform.fs().read_dir(table_path).await.map_err(|e| {
408 Error::Io(std::io::Error::other(format!(
409 "Failed to read table directory: {}",
410 e
411 )))
412 })?;
413
414 while let Some(entry) = file_entries.next_entry().await.map_err(|e| {
415 Error::Io(std::io::Error::other(format!(
416 "Error reading file entry: {}",
417 e
418 )))
419 })? {
420 let path = entry.path();
421 if let Some(extension) = path.extension() {
422 if extension == "db" {
423 let metadata = entry.metadata().await.map_err(|e| {
425 Error::Io(std::io::Error::other(format!(
426 "Failed to get file metadata: {}",
427 e
428 )))
429 })?;
430
431 let size_bytes = metadata.len();
432 total_size_bytes += size_bytes;
433
434 if last_modified.is_none() || metadata.modified().ok() > last_modified {
435 last_modified = metadata.modified().ok();
436 }
437
438 let file_info = self.analyze_sstable_file(&path, size_bytes).await;
439 sstable_files.push(file_info);
440 }
441 }
442 }
443
444 let schema = self.load_table_schema(table_name).await.ok();
446
447 let estimated_rows = sstable_files.iter().map(|f| f.estimated_rows).sum();
449
450 Ok(TableInfo {
451 name: table_name.to_string(),
452 schema,
453 sstable_files,
454 estimated_rows,
455 total_size_bytes,
456 last_modified,
457 })
458 }
459
460 async fn analyze_sstable_file(&self, file_path: &Path, size_bytes: u64) -> SSTableFileInfo {
462 let mut file_info = SSTableFileInfo {
463 path: file_path.to_path_buf(),
464 size_bytes,
465 version: None,
466 compression: None,
467 estimated_rows: 0,
468 health_status: FileHealthStatus::Healthy,
469 };
470
471 if let Ok(reader) = self.get_or_create_reader(file_path).await {
473 let header = reader.header();
475 file_info.version = Some(header.cassandra_version);
476 file_info.compression = Some(header.compression.algorithm.clone());
477
478 file_info.estimated_rows = self.estimate_row_count(size_bytes, &reader).await;
480
481 if self.config.enable_integrity_checks {
483 file_info.health_status = self.check_file_health(&reader).await;
484 }
485 } else {
486 file_info.health_status = FileHealthStatus::Corrupted;
487 }
488
489 file_info
490 }
491
492 async fn get_or_create_reader(&self, file_path: &Path) -> Result<Arc<SSTableReader>> {
494 if let Some(reader) = self.readers_pool.get(file_path) {
495 return Ok(reader.clone());
496 }
497
498 let _permit = self
499 .operation_semaphore
500 .acquire()
501 .await
502 .map_err(|_| Error::Io(std::io::Error::other("Semaphore acquisition failed")))?;
503
504 if let Some(reader) = self.readers_pool.get(file_path) {
506 return Ok(reader.clone());
507 }
508
509 let reader = Arc::new(
510 SSTableReader::open(file_path, &self.core_config, self.platform.clone()).await?,
511 );
512
513 self.readers_pool
514 .insert(file_path.to_path_buf(), reader.clone());
515 Ok(reader)
516 }
517
518 pub async fn load_table_data(
520 &self,
521 keyspace: &str,
522 table: &str,
523 limit: Option<usize>,
524 ) -> Result<Vec<DataRow>> {
525 let start_time = Instant::now();
526 let cache_key = format!("{}:{}", keyspace, table);
527
528 if let Some(cached) = self.data_cache.get(&cache_key) {
530 if !self.is_cache_expired(&cached) {
531 self.update_cache_stats(true, start_time.elapsed());
532 return Ok(cached.rows.clone());
533 }
534 }
535
536 let rows = self
538 .load_table_data_from_disk(keyspace, table, limit)
539 .await?;
540
541 let cache_entry = CachedDataEntry {
543 size_bytes: self.estimate_rows_size(&rows),
544 rows: rows.clone(),
545 cached_at: Instant::now(),
546 access_count: 1,
547 last_accessed: Instant::now(),
548 };
549
550 self.data_cache.insert(cache_key, cache_entry);
551 self.update_cache_stats(false, start_time.elapsed());
552 self.maybe_evict_cache().await;
553
554 Ok(rows)
555 }
556
557 async fn load_table_data_from_disk(
559 &self,
560 keyspace: &str,
561 table: &str,
562 limit: Option<usize>,
563 ) -> Result<Vec<DataRow>> {
564 let full_table_name = format!("{}.{}", keyspace, table);
565
566 let table_info = {
568 let discovered = self.discovered_tables.read().await;
569 discovered.get(&full_table_name).cloned()
570 };
571
572 let table_info = table_info
573 .ok_or_else(|| Error::Table(format!("Table {}.{} not found", keyspace, table)))?;
574
575 let mut all_rows = Vec::new();
576 let mut loaded_count = 0;
577
578 for file_info in &table_info.sstable_files {
580 if file_info.health_status != FileHealthStatus::Healthy {
581 continue; }
583
584 let reader = self.get_or_create_reader(&file_info.path).await?;
585 let file_rows = self
586 .load_rows_from_reader(&reader, &table_info, limit)
587 .await?;
588
589 for row in file_rows {
590 all_rows.push(row);
591 loaded_count += 1;
592
593 if let Some(limit) = limit {
594 if loaded_count >= limit {
595 break;
596 }
597 }
598 }
599
600 if let Some(limit) = limit {
601 if loaded_count >= limit {
602 break;
603 }
604 }
605 }
606
607 Ok(all_rows)
608 }
609
610 async fn load_rows_from_reader(
623 &self,
624 reader: &SSTableReader,
625 _table_info: &TableInfo,
626 limit: Option<usize>,
627 ) -> Result<Vec<DataRow>> {
628 let mut rows = Vec::new();
629
630 let all_entries = reader.get_all_entries().await?;
633 let entries_to_process = if let Some(lim) = limit {
634 all_entries.into_iter().take(lim).collect::<Vec<_>>()
635 } else {
636 all_entries
637 };
638
639 for (_table_id, row_key, value) in entries_to_process {
640 let columns = match value {
645 Value::Map(map_entries) => map_entries
646 .into_iter()
647 .filter_map(|(key, val)| match key {
648 Value::Text(column_name) => Some((column_name, val)),
649 _ => {
650 log::warn!(
651 "Unexpected map key type for row {:?}: {:?}, skipping column",
652 row_key,
653 key
654 );
655 None
656 }
657 })
658 .collect(),
659 Value::Null => {
660 log::debug!("Skipping null row for key: {:?}", row_key);
662 continue; }
664 _ => {
665 log::warn!(
667 "Expected Value::Map from SSTableReader, got {:?} for key: {:?}",
668 value,
669 row_key
670 );
671 HashMap::from([("value".to_string(), value)])
673 }
674 };
675
676 let metadata = RowMetadata {
677 source_file: reader.file_path.clone(),
678 write_time: None,
679 ttl: None,
680 generation: reader.generation,
681 };
682
683 rows.push(DataRow {
684 key: row_key,
685 columns,
686 metadata,
687 });
688 }
689
690 Ok(rows)
691 }
692
693 #[allow(dead_code)]
696 async fn convert_entry_to_row(
697 &self,
698 entry: crate::storage::sstable::bulletproof_reader::SSTableEntry,
699 table_info: &TableInfo,
700 source_file: &Path,
701 ) -> Result<DataRow> {
702 let mut columns = HashMap::new();
703
704 if let Some(ref schema) = table_info.schema {
706 for (i, column) in schema.columns.iter().enumerate() {
707 if i < entry.values.len() {
708 let parsed_value =
709 self.parse_column_value(&entry.values[i], &column.data_type)?;
710 columns.insert(column.name.clone(), parsed_value);
711 }
712 }
713 } else {
714 for (i, value) in entry.values.iter().enumerate() {
716 columns.insert(format!("column_{}", i), value.clone());
717 }
718 }
719
720 let metadata = RowMetadata {
721 source_file: source_file.to_path_buf(),
722 write_time: entry.timestamp,
723 ttl: None, generation: entry.generation.unwrap_or(0),
725 };
726
727 Ok(DataRow {
728 key: entry.key,
729 columns,
730 metadata,
731 })
732 }
733
734 #[allow(dead_code)]
737 fn parse_column_value(&self, value: &Value, _data_type: &str) -> Result<Value> {
738 Ok(value.clone())
741 }
742
743 pub async fn query_data(
745 &self,
746 keyspace: &str,
747 table: &str,
748 where_clause: Option<&str>,
749 limit: Option<usize>,
750 ) -> Result<Vec<DataRow>> {
751 let rows = self.load_table_data(keyspace, table, None).await?;
752
753 let filtered_rows = if let Some(_where_clause) = where_clause {
755 rows
757 } else {
758 rows
759 };
760
761 let final_rows = if let Some(limit) = limit {
763 filtered_rows.into_iter().take(limit).collect()
764 } else {
765 filtered_rows
766 };
767
768 Ok(final_rows)
769 }
770
771 pub async fn get_table_schema(
773 &self,
774 keyspace: &str,
775 table: &str,
776 ) -> Result<Option<TableSchema>> {
777 let full_table_name = format!("{}.{}", keyspace, table);
778 let discovered = self.discovered_tables.read().await;
779
780 if let Some(table_info) = discovered.get(&full_table_name) {
781 Ok(table_info.schema.clone())
782 } else {
783 Ok(None)
784 }
785 }
786
787 pub async fn list_keyspaces(&self) -> Result<Vec<String>> {
789 let discovered = self.discovered_tables.read().await;
790 let mut keyspaces: Vec<String> = discovered
791 .keys()
792 .map(|full_name| full_name.split('.').next().unwrap_or("").to_string())
793 .collect();
794
795 keyspaces.sort();
796 keyspaces.dedup();
797 Ok(keyspaces)
798 }
799
800 pub async fn list_tables(&self, keyspace: &str) -> Result<Vec<String>> {
802 let discovered = self.discovered_tables.read().await;
803 let tables: Vec<String> = discovered
804 .keys()
805 .filter_map(|full_name| {
806 let parts: Vec<&str> = full_name.split('.').collect();
807 if parts.len() == 2 && parts[0] == keyspace {
808 Some(parts[1].to_string())
809 } else {
810 None
811 }
812 })
813 .collect();
814
815 Ok(tables)
816 }
817
818 pub fn get_cache_stats(&self) -> CacheStatistics {
820 self.cache_stats.read().clone()
821 }
822
823 pub fn get_discovery_status(&self) -> (bool, Option<Duration>) {
825 let state = self.discovery_state.read();
826 let time_since_last = state.last_discovery.map(|last| last.elapsed());
827 (state.discovery_in_progress, time_since_last)
828 }
829
830 async fn load_table_schema(&self, table_name: &str) -> Result<TableSchema> {
833 self.schema_manager.get_table_schema(table_name).await
835 }
836
837 async fn estimate_row_count(&self, file_size_bytes: u64, _reader: &SSTableReader) -> usize {
838 let estimated_avg_row_size = 256; (file_size_bytes / estimated_avg_row_size) as usize
842 }
843
844 async fn check_file_health(&self, reader: &SSTableReader) -> FileHealthStatus {
845 if reader.file_path.exists() {
849 FileHealthStatus::Healthy
850 } else {
851 FileHealthStatus::AccessDenied
852 }
853 }
854
855 fn is_cache_expired(&self, entry: &CachedDataEntry) -> bool {
856 let ttl = Duration::from_secs(self.config.cache_ttl_seconds);
857 entry.cached_at.elapsed() > ttl
858 }
859
860 fn estimate_rows_size(&self, rows: &[DataRow]) -> usize {
861 rows.len() * 256 }
864
865 fn update_cache_stats(&self, hit: bool, access_time: Duration) {
866 let mut stats = self.cache_stats.write();
867 if hit {
868 stats.cache_hits += 1;
869 } else {
870 stats.cache_misses += 1;
871 }
872
873 let new_time_micros = access_time.as_micros() as u64;
875 stats.avg_access_time_micros = (stats.avg_access_time_micros + new_time_micros) / 2;
876
877 stats.cache_entries = self.data_cache.len();
878 stats.current_cache_size_bytes = self.data_cache.iter().map(|entry| entry.size_bytes).sum();
879 }
880
881 async fn maybe_evict_cache(&self) {
882 let max_size_bytes = self.config.max_cache_size_mb * 1024 * 1024;
883 let current_size: usize = self.data_cache.iter().map(|entry| entry.size_bytes).sum();
884
885 if current_size > max_size_bytes {
886 self.evict_lru_entries(current_size - max_size_bytes).await;
887 }
888 }
889
890 async fn evict_lru_entries(&self, bytes_to_evict: usize) {
891 let mut entries_to_remove = Vec::new();
892 let mut bytes_evicted = 0;
893
894 let mut sorted_entries: Vec<_> = self
896 .data_cache
897 .iter()
898 .map(|entry| (entry.key().clone(), entry.last_accessed, entry.size_bytes))
899 .collect();
900
901 sorted_entries.sort_by_key(|(_, last_accessed, _)| *last_accessed);
902
903 for (key, _, size) in sorted_entries {
904 entries_to_remove.push(key);
905 bytes_evicted += size;
906
907 if bytes_evicted >= bytes_to_evict {
908 break;
909 }
910 }
911
912 for key in entries_to_remove {
914 self.data_cache.remove(&key);
915 let mut stats = self.cache_stats.write();
916 stats.evictions += 1;
917 }
918 }
919}
920
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a manager with default configuration on top of a fresh
    /// temporary directory. The directory handle is returned as well so it
    /// stays alive for the duration of the test.
    async fn build_manager() -> (TempDir, SSTableDataManager) {
        let temp_dir = TempDir::new().unwrap();
        let config = SSTableDataManagerConfig::default();
        let core_config = Config::default();
        let platform = Arc::new(Platform::new(&core_config).await.unwrap());
        let schema_manager = Arc::new(SchemaManager::new(temp_dir.path()).await.unwrap());

        let manager = SSTableDataManager::new(config, platform, core_config, schema_manager)
            .await
            .unwrap();

        (temp_dir, manager)
    }

    /// A freshly created manager starts with an empty cache and no hits.
    #[tokio::test]
    async fn test_data_manager_creation() {
        let (_temp_dir, manager) = build_manager().await;

        let stats = manager.get_cache_stats();
        assert_eq!(stats.cache_entries, 0);
        assert_eq!(stats.cache_hits, 0);
    }

    /// Hit/miss counters start at zero and no discovery has run yet.
    #[tokio::test]
    async fn test_cache_statistics() {
        let (_temp_dir, manager) = build_manager().await;

        let stats = manager.get_cache_stats();
        assert_eq!(stats.cache_hits, 0);
        assert_eq!(stats.cache_misses, 0);

        let (in_progress, last_discovery) = manager.get_discovery_status();
        assert!(!in_progress);
        assert!(last_discovery.is_none());
    }
}