Skip to main content

cqlite_core/storage/
sstable_data_manager.rs

1//! SSTable Data Loading and Caching System
2//!
3//! This module provides efficient data loading, caching, and access for the REPL system.
4//! It integrates with existing SSTable parsers and provides high-performance data access
5//! for interactive queries and exploration.
6
7use std::collections::HashMap;
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11
12use dashmap::DashMap;
13use parking_lot::RwLock;
14use tokio::sync::{RwLock as AsyncRwLock, Semaphore};
15
16use crate::{
17    parser::header::CassandraVersion,
18    platform::Platform,
19    schema::{SchemaManager, TableSchema},
20    storage::sstable::reader::SSTableReader,
21    Config, Error, Result, RowKey, Value,
22};
23
/// Tunable settings for the SSTable data manager.
#[derive(Debug, Clone)]
pub struct SSTableDataManagerConfig {
    /// Cache budget in megabytes; eviction starts once the estimated cache size exceeds it.
    pub max_cache_size_mb: usize,
    /// Seconds a cached entry remains valid, measured from the moment it was stored.
    pub cache_ttl_seconds: u64,
    /// Number of permits in the semaphore that gates opening SSTable readers.
    pub max_concurrent_ops: usize,
    /// Requests background preloading (not read by the code in this file).
    pub enable_preloading: bool,
    /// Batch size for preloading (not read by the code in this file).
    pub preload_batch_size: usize,
    /// Seconds between discovery scans (not read by the code in this file).
    pub discovery_interval_seconds: u64,
    /// When set, each SSTable file found during discovery gets a health check.
    pub enable_integrity_checks: bool,
}
42
43impl Default for SSTableDataManagerConfig {
44    fn default() -> Self {
45        Self {
46            max_cache_size_mb: 512,
47            cache_ttl_seconds: 300, // 5 minutes
48            max_concurrent_ops: 10,
49            enable_preloading: true,
50            preload_batch_size: 1000,
51            discovery_interval_seconds: 30,
52            enable_integrity_checks: true,
53        }
54    }
55}
56
57/// Cached data entry with metadata
58#[derive(Debug, Clone)]
59pub struct CachedDataEntry {
60    /// The actual data rows
61    pub rows: Vec<DataRow>,
62    /// When this entry was cached
63    pub cached_at: Instant,
64    /// Size in bytes (approximate)
65    pub size_bytes: usize,
66    /// Access count for LRU eviction
67    pub access_count: u64,
68    /// Last access time
69    pub last_accessed: Instant,
70}
71
72/// Unified data row representation
73#[derive(Debug, Clone)]
74pub struct DataRow {
75    /// Row key
76    pub key: RowKey,
77    /// Column data
78    pub columns: HashMap<String, Value>,
79    /// Row metadata
80    pub metadata: RowMetadata,
81}
82
/// Provenance details attached to every [`DataRow`].
#[derive(Debug, Clone)]
pub struct RowMetadata {
    /// Path of the SSTable file the row was read from.
    pub source_file: PathBuf,
    /// Write timestamp, when one is known.
    pub write_time: Option<i64>,
    /// Time-to-live of the row, when one is known.
    pub ttl: Option<Duration>,
    /// Generation number of the source SSTable.
    pub generation: u64,
}
95
96/// Table discovery results
97#[derive(Debug, Clone)]
98pub struct TableDiscovery {
99    /// Discovered keyspaces
100    pub keyspaces: Vec<KeyspaceInfo>,
101    /// Total SSTables found
102    pub total_sstables: usize,
103    /// Discovery completion time
104    pub discovery_time: Duration,
105}
106
107/// Keyspace information
108#[derive(Debug, Clone)]
109pub struct KeyspaceInfo {
110    /// Keyspace name
111    pub name: String,
112    /// Tables in this keyspace
113    pub tables: Vec<TableInfo>,
114    /// SSTable directory path
115    pub path: PathBuf,
116}
117
118/// Table information with metadata
119#[derive(Debug, Clone)]
120pub struct TableInfo {
121    /// Table name
122    pub name: String,
123    /// Schema information
124    pub schema: Option<TableSchema>,
125    /// Associated SSTable files
126    pub sstable_files: Vec<SSTableFileInfo>,
127    /// Total estimated rows
128    pub estimated_rows: usize,
129    /// Total size in bytes
130    pub total_size_bytes: u64,
131    /// Last modified time
132    pub last_modified: Option<std::time::SystemTime>,
133}
134
135/// SSTable file information
136#[derive(Debug, Clone)]
137pub struct SSTableFileInfo {
138    /// File path
139    pub path: PathBuf,
140    /// File size
141    pub size_bytes: u64,
142    /// Cassandra version detected
143    pub version: Option<CassandraVersion>,
144    /// Compression info
145    pub compression: Option<String>,
146    /// Estimated row count
147    pub estimated_rows: usize,
148    /// Health status
149    pub health_status: FileHealthStatus,
150}
151
/// Health classification assigned to an SSTable file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FileHealthStatus {
    /// The file can be read without known problems.
    Healthy,
    /// The file shows minor problems but remains usable.
    Degraded,
    /// The file is corrupted or cannot be read.
    Corrupted,
    /// The file cannot be accessed, e.g. because of permissions.
    AccessDenied,
}
164
165/// Memory-efficient SSTable data manager
166#[allow(dead_code)]
167pub struct SSTableDataManager {
168    /// Configuration
169    config: SSTableDataManagerConfig,
170    /// Platform abstraction
171    platform: Arc<Platform>,
172    /// Core configuration
173    core_config: Config,
174    /// Schema manager for metadata
175    schema_manager: Arc<SchemaManager>,
176    /// Data cache with LRU eviction
177    data_cache: Arc<DashMap<String, CachedDataEntry>>,
178    /// Discovered tables cache
179    discovered_tables: Arc<AsyncRwLock<HashMap<String, TableInfo>>>,
180    /// SSTable readers pool
181    readers_pool: Arc<DashMap<PathBuf, Arc<SSTableReader>>>,
182    /// Concurrency control
183    operation_semaphore: Arc<Semaphore>,
184    /// Background discovery state
185    discovery_state: Arc<RwLock<DiscoveryState>>,
186    /// Cache statistics
187    cache_stats: Arc<RwLock<CacheStatistics>>,
188}
189
190/// Discovery state tracking
191#[derive(Debug, Clone)]
192struct DiscoveryState {
193    /// Last discovery run
194    last_discovery: Option<Instant>,
195    /// Discovery in progress flag
196    discovery_in_progress: bool,
197    /// Discovery results
198    last_results: Option<TableDiscovery>,
199}
200
201/// Cache performance statistics
202#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
203pub struct CacheStatistics {
204    /// Total cache hits
205    pub cache_hits: u64,
206    /// Total cache misses
207    pub cache_misses: u64,
208    /// Current cache size in bytes
209    pub current_cache_size_bytes: usize,
210    /// Number of cache entries
211    pub cache_entries: usize,
212    /// Number of evictions
213    pub evictions: u64,
214    /// Average access time in microseconds
215    pub avg_access_time_micros: u64,
216    /// Background operations count
217    pub background_operations: u64,
218}
219
220impl SSTableDataManager {
221    /// Create a new SSTable data manager
222    pub async fn new(
223        config: SSTableDataManagerConfig,
224        platform: Arc<Platform>,
225        core_config: Config,
226        schema_manager: Arc<SchemaManager>,
227    ) -> Result<Self> {
228        let operation_semaphore = Arc::new(Semaphore::new(config.max_concurrent_ops));
229
230        let manager = Self {
231            config,
232            platform,
233            core_config,
234            schema_manager,
235            data_cache: Arc::new(DashMap::new()),
236            discovered_tables: Arc::new(AsyncRwLock::new(HashMap::new())),
237            readers_pool: Arc::new(DashMap::new()),
238            operation_semaphore,
239            discovery_state: Arc::new(RwLock::new(DiscoveryState {
240                last_discovery: None,
241                discovery_in_progress: false,
242                last_results: None,
243            })),
244            cache_stats: Arc::new(RwLock::new(CacheStatistics {
245                cache_hits: 0,
246                cache_misses: 0,
247                current_cache_size_bytes: 0,
248                cache_entries: 0,
249                evictions: 0,
250                avg_access_time_micros: 0,
251                background_operations: 0,
252            })),
253        };
254
255        Ok(manager)
256    }
257
258    /// Discover all available keyspaces and tables
259    pub async fn discover_tables(&self, data_dir: &Path) -> Result<TableDiscovery> {
260        let _start_time = Instant::now();
261
262        // Check if discovery is already in progress
263        {
264            let mut state = self.discovery_state.write();
265            if state.discovery_in_progress {
266                // Return cached results if available
267                if let Some(ref results) = state.last_results {
268                    return Ok(results.clone());
269                }
270            }
271            state.discovery_in_progress = true;
272        }
273
274        let discovery_result = self.perform_discovery(data_dir).await;
275
276        // Update discovery state
277        {
278            let mut state = self.discovery_state.write();
279            state.discovery_in_progress = false;
280            state.last_discovery = Some(Instant::now());
281            if let Ok(ref results) = discovery_result {
282                state.last_results = Some(results.clone());
283            }
284        }
285
286        discovery_result
287    }
288
289    /// Perform the actual table discovery
290    async fn perform_discovery(&self, data_dir: &Path) -> Result<TableDiscovery> {
291        let start_time = Instant::now();
292        let mut keyspaces = Vec::new();
293        let mut total_sstables = 0;
294
295        // Scan for keyspace directories
296        let mut keyspace_entries = self.platform.fs().read_dir(data_dir).await.map_err(|e| {
297            Error::Io(std::io::Error::other(format!(
298                "Failed to read data directory: {}",
299                e
300            )))
301        })?;
302
303        while let Some(entry) = keyspace_entries.next_entry().await.map_err(|e| {
304            Error::Io(std::io::Error::other(format!(
305                "Error reading directory entry: {}",
306                e
307            )))
308        })? {
309            let path = entry.path();
310            if path.is_dir() {
311                if let Some(keyspace_name) = path.file_name().and_then(|n| n.to_str()) {
312                    // Skip system directories
313                    if keyspace_name.starts_with('.') || keyspace_name == "system" {
314                        continue;
315                    }
316
317                    if let Ok(keyspace_info) =
318                        self.discover_keyspace_tables(&path, keyspace_name).await
319                    {
320                        total_sstables += keyspace_info
321                            .tables
322                            .iter()
323                            .map(|t| t.sstable_files.len())
324                            .sum::<usize>();
325                        keyspaces.push(keyspace_info);
326                    }
327                }
328            }
329        }
330
331        // Update discovered tables cache
332        {
333            let mut discovered = self.discovered_tables.write().await;
334            discovered.clear();
335
336            for keyspace in &keyspaces {
337                for table in &keyspace.tables {
338                    let full_name = format!("{}.{}", keyspace.name, table.name);
339                    discovered.insert(full_name, table.clone());
340                }
341            }
342        }
343
344        Ok(TableDiscovery {
345            keyspaces,
346            total_sstables,
347            discovery_time: start_time.elapsed(),
348        })
349    }
350
351    /// Discover tables within a keyspace
352    async fn discover_keyspace_tables(
353        &self,
354        keyspace_path: &Path,
355        keyspace_name: &str,
356    ) -> Result<KeyspaceInfo> {
357        let mut tables = Vec::new();
358
359        let mut table_entries = self
360            .platform
361            .fs()
362            .read_dir(keyspace_path)
363            .await
364            .map_err(|e| {
365                Error::Io(std::io::Error::other(format!(
366                    "Failed to read keyspace directory: {}",
367                    e
368                )))
369            })?;
370
371        while let Some(entry) = table_entries.next_entry().await.map_err(|e| {
372            Error::Io(std::io::Error::other(format!(
373                "Error reading table entry: {}",
374                e
375            )))
376        })? {
377            let path = entry.path();
378            if path.is_dir() {
379                if let Some(table_name) = path.file_name().and_then(|n| n.to_str()) {
380                    // Skip UUID-based table directories unless they contain valid SSTables
381                    if let Ok(table_info) = self.discover_table_sstables(&path, table_name).await {
382                        if !table_info.sstable_files.is_empty() {
383                            tables.push(table_info);
384                        }
385                    }
386                }
387            }
388        }
389
390        Ok(KeyspaceInfo {
391            name: keyspace_name.to_string(),
392            tables,
393            path: keyspace_path.to_path_buf(),
394        })
395    }
396
397    /// Discover SSTable files for a specific table
398    async fn discover_table_sstables(
399        &self,
400        table_path: &Path,
401        table_name: &str,
402    ) -> Result<TableInfo> {
403        let mut sstable_files = Vec::new();
404        let mut total_size_bytes = 0u64;
405        let mut last_modified = None;
406
407        let mut file_entries = self.platform.fs().read_dir(table_path).await.map_err(|e| {
408            Error::Io(std::io::Error::other(format!(
409                "Failed to read table directory: {}",
410                e
411            )))
412        })?;
413
414        while let Some(entry) = file_entries.next_entry().await.map_err(|e| {
415            Error::Io(std::io::Error::other(format!(
416                "Error reading file entry: {}",
417                e
418            )))
419        })? {
420            let path = entry.path();
421            if let Some(extension) = path.extension() {
422                if extension == "db" {
423                    // Cassandra SSTable data files
424                    let metadata = entry.metadata().await.map_err(|e| {
425                        Error::Io(std::io::Error::other(format!(
426                            "Failed to get file metadata: {}",
427                            e
428                        )))
429                    })?;
430
431                    let size_bytes = metadata.len();
432                    total_size_bytes += size_bytes;
433
434                    if last_modified.is_none() || metadata.modified().ok() > last_modified {
435                        last_modified = metadata.modified().ok();
436                    }
437
438                    let file_info = self.analyze_sstable_file(&path, size_bytes).await;
439                    sstable_files.push(file_info);
440                }
441            }
442        }
443
444        // Try to load schema information
445        let schema = self.load_table_schema(table_name).await.ok();
446
447        // Estimate total rows
448        let estimated_rows = sstable_files.iter().map(|f| f.estimated_rows).sum();
449
450        Ok(TableInfo {
451            name: table_name.to_string(),
452            schema,
453            sstable_files,
454            estimated_rows,
455            total_size_bytes,
456            last_modified,
457        })
458    }
459
460    /// Analyze an individual SSTable file
461    async fn analyze_sstable_file(&self, file_path: &Path, size_bytes: u64) -> SSTableFileInfo {
462        let mut file_info = SSTableFileInfo {
463            path: file_path.to_path_buf(),
464            size_bytes,
465            version: None,
466            compression: None,
467            estimated_rows: 0,
468            health_status: FileHealthStatus::Healthy,
469        };
470
471        // Try to read header information
472        if let Ok(reader) = self.get_or_create_reader(file_path).await {
473            // Extract version and compression info
474            let header = reader.header();
475            file_info.version = Some(header.cassandra_version);
476            file_info.compression = Some(header.compression.algorithm.clone());
477
478            // Estimate row count based on file size and typical row size
479            file_info.estimated_rows = self.estimate_row_count(size_bytes, &reader).await;
480
481            // Perform basic health check if enabled
482            if self.config.enable_integrity_checks {
483                file_info.health_status = self.check_file_health(&reader).await;
484            }
485        } else {
486            file_info.health_status = FileHealthStatus::Corrupted;
487        }
488
489        file_info
490    }
491
492    /// Get or create a reader for the specified file
493    async fn get_or_create_reader(&self, file_path: &Path) -> Result<Arc<SSTableReader>> {
494        if let Some(reader) = self.readers_pool.get(file_path) {
495            return Ok(reader.clone());
496        }
497
498        let _permit = self
499            .operation_semaphore
500            .acquire()
501            .await
502            .map_err(|_| Error::Io(std::io::Error::other("Semaphore acquisition failed")))?;
503
504        // Double-check after acquiring permit
505        if let Some(reader) = self.readers_pool.get(file_path) {
506            return Ok(reader.clone());
507        }
508
509        let reader = Arc::new(
510            SSTableReader::open(file_path, &self.core_config, self.platform.clone()).await?,
511        );
512
513        self.readers_pool
514            .insert(file_path.to_path_buf(), reader.clone());
515        Ok(reader)
516    }
517
518    /// Load data for a specific table with caching
519    pub async fn load_table_data(
520        &self,
521        keyspace: &str,
522        table: &str,
523        limit: Option<usize>,
524    ) -> Result<Vec<DataRow>> {
525        let start_time = Instant::now();
526        let cache_key = format!("{}:{}", keyspace, table);
527
528        // Check cache first
529        if let Some(cached) = self.data_cache.get(&cache_key) {
530            if !self.is_cache_expired(&cached) {
531                self.update_cache_stats(true, start_time.elapsed());
532                return Ok(cached.rows.clone());
533            }
534        }
535
536        // Load from disk
537        let rows = self
538            .load_table_data_from_disk(keyspace, table, limit)
539            .await?;
540
541        // Cache the results
542        let cache_entry = CachedDataEntry {
543            size_bytes: self.estimate_rows_size(&rows),
544            rows: rows.clone(),
545            cached_at: Instant::now(),
546            access_count: 1,
547            last_accessed: Instant::now(),
548        };
549
550        self.data_cache.insert(cache_key, cache_entry);
551        self.update_cache_stats(false, start_time.elapsed());
552        self.maybe_evict_cache().await;
553
554        Ok(rows)
555    }
556
557    /// Load table data directly from disk
558    async fn load_table_data_from_disk(
559        &self,
560        keyspace: &str,
561        table: &str,
562        limit: Option<usize>,
563    ) -> Result<Vec<DataRow>> {
564        let full_table_name = format!("{}.{}", keyspace, table);
565
566        // Get table info
567        let table_info = {
568            let discovered = self.discovered_tables.read().await;
569            discovered.get(&full_table_name).cloned()
570        };
571
572        let table_info = table_info
573            .ok_or_else(|| Error::Table(format!("Table {}.{} not found", keyspace, table)))?;
574
575        let mut all_rows = Vec::new();
576        let mut loaded_count = 0;
577
578        // Load data from all SSTable files
579        for file_info in &table_info.sstable_files {
580            if file_info.health_status != FileHealthStatus::Healthy {
581                continue; // Skip corrupted files
582            }
583
584            let reader = self.get_or_create_reader(&file_info.path).await?;
585            let file_rows = self
586                .load_rows_from_reader(&reader, &table_info, limit)
587                .await?;
588
589            for row in file_rows {
590                all_rows.push(row);
591                loaded_count += 1;
592
593                if let Some(limit) = limit {
594                    if loaded_count >= limit {
595                        break;
596                    }
597                }
598            }
599
600            if let Some(limit) = limit {
601                if loaded_count >= limit {
602                    break;
603                }
604            }
605        }
606
607        Ok(all_rows)
608    }
609
610    /// Load rows from a specific SSTable reader
611    ///
612    /// Converts SSTableReader entries (TableId, RowKey, Value) to DataRow format.
613    ///
614    /// # Value Type Handling
615    /// - `Value::Map`: Normal case - extracts all column name/value pairs (Issue #191 fix)
616    /// - `Value::Null`: Tombstoned rows - skipped, not included in results
617    /// - Other types: Unexpected - logs warning and uses fallback single-column format
618    ///
619    /// # Note
620    /// The `limit` parameter applies to the number of rows returned AFTER filtering
621    /// tombstones, so the actual number of entries scanned may be higher.
622    async fn load_rows_from_reader(
623        &self,
624        reader: &SSTableReader,
625        _table_info: &TableInfo,
626        limit: Option<usize>,
627    ) -> Result<Vec<DataRow>> {
628        let mut rows = Vec::new();
629
630        // TODO(Issue #190): SSTableReader::get_all_entries() replaces streaming API
631        // Future enhancement: Add true streaming support to SSTableReader if needed
632        let all_entries = reader.get_all_entries().await?;
633        let entries_to_process = if let Some(lim) = limit {
634            all_entries.into_iter().take(lim).collect::<Vec<_>>()
635        } else {
636            all_entries
637        };
638
639        for (_table_id, row_key, value) in entries_to_process {
640            // Convert SSTableReader entry format to DataRow
641            // FIXED (Issue #191): SSTableReader returns Value::Map with all columns
642            // Extract each (column_name, column_value) pair from the map
643            // Performance optimization: consume value instead of cloning (no ref)
644            let columns = match value {
645                Value::Map(map_entries) => map_entries
646                    .into_iter()
647                    .filter_map(|(key, val)| match key {
648                        Value::Text(column_name) => Some((column_name, val)),
649                        _ => {
650                            log::warn!(
651                                "Unexpected map key type for row {:?}: {:?}, skipping column",
652                                row_key,
653                                key
654                            );
655                            None
656                        }
657                    })
658                    .collect(),
659                Value::Null => {
660                    // Row was deleted or has no regular columns (tombstone)
661                    log::debug!("Skipping null row for key: {:?}", row_key);
662                    continue; // Skip this row entirely
663                }
664                _ => {
665                    // Unexpected value type - log warning but continue with fallback
666                    log::warn!(
667                        "Expected Value::Map from SSTableReader, got {:?} for key: {:?}",
668                        value,
669                        row_key
670                    );
671                    // Fallback: treat as single-column value (move instead of clone)
672                    HashMap::from([("value".to_string(), value)])
673                }
674            };
675
676            let metadata = RowMetadata {
677                source_file: reader.file_path.clone(),
678                write_time: None,
679                ttl: None,
680                generation: reader.generation,
681            };
682
683            rows.push(DataRow {
684                key: row_key,
685                columns,
686                metadata,
687            });
688        }
689
690        Ok(rows)
691    }
692
693    /// Convert SSTable entry to DataRow
694    /// TODO(Issue #190): Legacy method from BulletproofReader API - may be removed
695    #[allow(dead_code)]
696    async fn convert_entry_to_row(
697        &self,
698        entry: crate::storage::sstable::bulletproof_reader::SSTableEntry,
699        table_info: &TableInfo,
700        source_file: &Path,
701    ) -> Result<DataRow> {
702        let mut columns = HashMap::new();
703
704        // Parse entry data based on schema
705        if let Some(ref schema) = table_info.schema {
706            for (i, column) in schema.columns.iter().enumerate() {
707                if i < entry.values.len() {
708                    let parsed_value =
709                        self.parse_column_value(&entry.values[i], &column.data_type)?;
710                    columns.insert(column.name.clone(), parsed_value);
711                }
712            }
713        } else {
714            // Fallback: create generic columns
715            for (i, value) in entry.values.iter().enumerate() {
716                columns.insert(format!("column_{}", i), value.clone());
717            }
718        }
719
720        let metadata = RowMetadata {
721            source_file: source_file.to_path_buf(),
722            write_time: entry.timestamp,
723            ttl: None, // Would be extracted from entry metadata
724            generation: entry.generation.unwrap_or(0),
725        };
726
727        Ok(DataRow {
728            key: entry.key,
729            columns,
730            metadata,
731        })
732    }
733
734    /// Parse column value based on data type
735    /// TODO(Issue #190): Legacy method from BulletproofReader API - may be removed
736    #[allow(dead_code)]
737    fn parse_column_value(&self, value: &Value, _data_type: &str) -> Result<Value> {
738        // For now, return the value as-is
739        // In a real implementation, this would handle type conversions
740        Ok(value.clone())
741    }
742
743    /// Query data with CQL-like filtering
744    pub async fn query_data(
745        &self,
746        keyspace: &str,
747        table: &str,
748        where_clause: Option<&str>,
749        limit: Option<usize>,
750    ) -> Result<Vec<DataRow>> {
751        let rows = self.load_table_data(keyspace, table, None).await?;
752
753        // Apply filtering if where clause is provided
754        let filtered_rows = if let Some(_where_clause) = where_clause {
755            // TODO: Implement proper CQL WHERE clause parsing and filtering
756            rows
757        } else {
758            rows
759        };
760
761        // Apply limit
762        let final_rows = if let Some(limit) = limit {
763            filtered_rows.into_iter().take(limit).collect()
764        } else {
765            filtered_rows
766        };
767
768        Ok(final_rows)
769    }
770
771    /// Get table schema information
772    pub async fn get_table_schema(
773        &self,
774        keyspace: &str,
775        table: &str,
776    ) -> Result<Option<TableSchema>> {
777        let full_table_name = format!("{}.{}", keyspace, table);
778        let discovered = self.discovered_tables.read().await;
779
780        if let Some(table_info) = discovered.get(&full_table_name) {
781            Ok(table_info.schema.clone())
782        } else {
783            Ok(None)
784        }
785    }
786
787    /// List all discovered keyspaces
788    pub async fn list_keyspaces(&self) -> Result<Vec<String>> {
789        let discovered = self.discovered_tables.read().await;
790        let mut keyspaces: Vec<String> = discovered
791            .keys()
792            .map(|full_name| full_name.split('.').next().unwrap_or("").to_string())
793            .collect();
794
795        keyspaces.sort();
796        keyspaces.dedup();
797        Ok(keyspaces)
798    }
799
800    /// List tables in a keyspace
801    pub async fn list_tables(&self, keyspace: &str) -> Result<Vec<String>> {
802        let discovered = self.discovered_tables.read().await;
803        let tables: Vec<String> = discovered
804            .keys()
805            .filter_map(|full_name| {
806                let parts: Vec<&str> = full_name.split('.').collect();
807                if parts.len() == 2 && parts[0] == keyspace {
808                    Some(parts[1].to_string())
809                } else {
810                    None
811                }
812            })
813            .collect();
814
815        Ok(tables)
816    }
817
818    /// Get cache statistics
819    pub fn get_cache_stats(&self) -> CacheStatistics {
820        self.cache_stats.read().clone()
821    }
822
823    /// Get discovery status
824    pub fn get_discovery_status(&self) -> (bool, Option<Duration>) {
825        let state = self.discovery_state.read();
826        let time_since_last = state.last_discovery.map(|last| last.elapsed());
827        (state.discovery_in_progress, time_since_last)
828    }
829
830    // Helper methods
831
832    async fn load_table_schema(&self, table_name: &str) -> Result<TableSchema> {
833        // Try to load schema from schema manager
834        self.schema_manager.get_table_schema(table_name).await
835    }
836
837    async fn estimate_row_count(&self, file_size_bytes: u64, _reader: &SSTableReader) -> usize {
838        // Estimate based on file size and average row size
839        // This is a rough estimate - in practice you'd sample some rows
840        let estimated_avg_row_size = 256; // bytes
841        (file_size_bytes / estimated_avg_row_size) as usize
842    }
843
844    async fn check_file_health(&self, reader: &SSTableReader) -> FileHealthStatus {
845        // TODO(Issue #190): SSTableReader integrity checking API differs
846        // For now, perform basic file accessibility check
847        // Future: use reader.check_integrity() when available
848        if reader.file_path.exists() {
849            FileHealthStatus::Healthy
850        } else {
851            FileHealthStatus::AccessDenied
852        }
853    }
854
855    fn is_cache_expired(&self, entry: &CachedDataEntry) -> bool {
856        let ttl = Duration::from_secs(self.config.cache_ttl_seconds);
857        entry.cached_at.elapsed() > ttl
858    }
859
860    fn estimate_rows_size(&self, rows: &[DataRow]) -> usize {
861        // Rough estimation of memory usage
862        rows.len() * 256 // Average row size estimate
863    }
864
865    fn update_cache_stats(&self, hit: bool, access_time: Duration) {
866        let mut stats = self.cache_stats.write();
867        if hit {
868            stats.cache_hits += 1;
869        } else {
870            stats.cache_misses += 1;
871        }
872
873        // Update average access time (simple moving average)
874        let new_time_micros = access_time.as_micros() as u64;
875        stats.avg_access_time_micros = (stats.avg_access_time_micros + new_time_micros) / 2;
876
877        stats.cache_entries = self.data_cache.len();
878        stats.current_cache_size_bytes = self.data_cache.iter().map(|entry| entry.size_bytes).sum();
879    }
880
881    async fn maybe_evict_cache(&self) {
882        let max_size_bytes = self.config.max_cache_size_mb * 1024 * 1024;
883        let current_size: usize = self.data_cache.iter().map(|entry| entry.size_bytes).sum();
884
885        if current_size > max_size_bytes {
886            self.evict_lru_entries(current_size - max_size_bytes).await;
887        }
888    }
889
890    async fn evict_lru_entries(&self, bytes_to_evict: usize) {
891        let mut entries_to_remove = Vec::new();
892        let mut bytes_evicted = 0;
893
894        // Collect entries sorted by last access time
895        let mut sorted_entries: Vec<_> = self
896            .data_cache
897            .iter()
898            .map(|entry| (entry.key().clone(), entry.last_accessed, entry.size_bytes))
899            .collect();
900
901        sorted_entries.sort_by_key(|(_, last_accessed, _)| *last_accessed);
902
903        for (key, _, size) in sorted_entries {
904            entries_to_remove.push(key);
905            bytes_evicted += size;
906
907            if bytes_evicted >= bytes_to_evict {
908                break;
909            }
910        }
911
912        // Remove the entries
913        for key in entries_to_remove {
914            self.data_cache.remove(&key);
915            let mut stats = self.cache_stats.write();
916            stats.evictions += 1;
917        }
918    }
919}
920
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build a manager with default settings.
    ///
    /// The `TempDir` is returned alongside the manager so the directory handed to
    /// the schema manager stays alive for the whole test.
    async fn default_manager() -> (TempDir, SSTableDataManager) {
        let temp_dir = TempDir::new().unwrap();
        let config = SSTableDataManagerConfig::default();
        let core_config = Config::default();
        let platform = Arc::new(Platform::new(&core_config).await.unwrap());
        let schema_manager = Arc::new(SchemaManager::new(temp_dir.path()).await.unwrap());

        let manager = SSTableDataManager::new(config, platform, core_config, schema_manager)
            .await
            .unwrap();

        (temp_dir, manager)
    }

    #[tokio::test]
    async fn test_data_manager_creation() {
        let (_temp_dir, manager) = default_manager().await;

        // A fresh manager starts with an empty cache.
        let stats = manager.get_cache_stats();
        assert_eq!(stats.cache_entries, 0);
        assert_eq!(stats.cache_hits, 0);
    }

    #[tokio::test]
    async fn test_cache_statistics() {
        let (_temp_dir, manager) = default_manager().await;

        // Counters start at zero.
        let stats = manager.get_cache_stats();
        assert_eq!(stats.cache_hits, 0);
        assert_eq!(stats.cache_misses, 0);

        // No discovery has run yet.
        let (in_progress, last_discovery) = manager.get_discovery_status();
        assert!(!in_progress);
        assert!(last_discovery.is_none());
    }
}