Skip to main content

cqlite_core/storage/
repl_data_api.rs

1//! REPL Data Access API
2//!
3//! This module provides a high-level API for accessing SSTable data from the REPL.
4//! It integrates with the data manager to provide efficient, cached access to real
5//! Cassandra data with support for interactive queries and exploration.
6//!
7//! This module requires the `state_machine` feature for query functionality.
8
9use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13
14use serde::{Deserialize, Serialize};
15use tokio::sync::RwLock;
16
17use crate::{
18    platform::Platform,
19    query::result::{QueryResult, QueryRow},
20    schema::{SchemaManager, TableSchema},
21    storage::sstable_data_manager::{
22        CacheStatistics, DataRow, SSTableDataManager, SSTableDataManagerConfig, TableDiscovery,
23        TableInfo,
24    },
25    Config, Error, Result, Value,
26};
27
28/// REPL data access configuration
29#[derive(Debug, Clone)]
30pub struct ReplDataConfig {
31    /// Data manager configuration
32    pub data_manager_config: SSTableDataManagerConfig,
33    /// Default query timeout in seconds
34    pub default_timeout_seconds: u64,
35    /// Enable automatic schema detection
36    pub auto_detect_schema: bool,
37    /// Maximum rows per query (safety limit)
38    pub max_rows_per_query: usize,
39    /// Enable query result caching
40    pub enable_query_cache: bool,
41    /// Query cache TTL in seconds
42    pub query_cache_ttl_seconds: u64,
43}
44
45impl Default for ReplDataConfig {
46    fn default() -> Self {
47        Self {
48            data_manager_config: SSTableDataManagerConfig::default(),
49            default_timeout_seconds: 30,
50            auto_detect_schema: true,
51            max_rows_per_query: 10000,
52            enable_query_cache: true,
53            query_cache_ttl_seconds: 300,
54        }
55    }
56}
57
/// Per-session state that shapes how REPL queries are executed.
#[derive(Debug, Clone)]
pub struct QueryContext {
    /// Keyspace selected for the session, if any.
    pub keyspace: Option<String>,
    /// Timeout applied to queries.
    pub timeout: Duration,
    /// Default cap on the number of rows returned.
    pub limit: Option<usize>,
    /// Whether timing information is enabled.
    pub timing_enabled: bool,
    /// Number of rows per page when paginating.
    pub page_size: Option<usize>,
    /// Offset of the current page.
    pub page_offset: usize,
}

impl Default for QueryContext {
    /// Defaults: no keyspace, 30 s timeout, limit of 100 rows, timing off,
    /// pages of 50 rows starting at offset 0.
    fn default() -> Self {
        const DEFAULT_TIMEOUT_SECS: u64 = 30;
        const DEFAULT_LIMIT: usize = 100;
        const DEFAULT_PAGE_SIZE: usize = 50;

        Self {
            keyspace: None,
            timeout: Duration::from_secs(DEFAULT_TIMEOUT_SECS),
            limit: Some(DEFAULT_LIMIT),
            timing_enabled: false,
            page_size: Some(DEFAULT_PAGE_SIZE),
            page_offset: 0,
        }
    }
}
87
88/// Query execution result with metadata
89#[derive(Debug, Clone)]
90pub struct ReplQueryResult {
91    /// Query result data
92    pub result: QueryResult,
93    /// Execution metadata
94    pub metadata: QueryMetadata,
95    /// Schema information (if available)
96    pub schema: Option<TableSchema>,
97}
98
99/// Query execution metadata
100#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct QueryMetadata {
102    /// Total execution time
103    pub execution_time: Duration,
104    /// Number of rows returned
105    pub rows_returned: usize,
106    /// Total rows available (before limit)
107    pub total_rows_available: Option<usize>,
108    /// Whether result was cached
109    pub from_cache: bool,
110    /// Source SSTable files accessed
111    pub source_files: Vec<PathBuf>,
112    /// Data size read in bytes
113    pub bytes_read: u64,
114    /// Cache hit ratio for this query
115    pub cache_hit_ratio: f64,
116}
117
118/// Table listing result
119#[derive(Debug, Clone)]
120pub struct TableListing {
121    /// Keyspace name
122    pub keyspace: String,
123    /// Table information
124    pub tables: Vec<TableSummary>,
125    /// Discovery timestamp
126    pub discovered_at: Instant,
127}
128
129/// Summary information for a table
130#[derive(Debug, Clone, Serialize, Deserialize)]
131pub struct TableSummary {
132    /// Table name
133    pub name: String,
134    /// Estimated row count
135    pub estimated_rows: usize,
136    /// Total size in bytes
137    pub size_bytes: u64,
138    /// Number of SSTable files
139    pub sstable_count: usize,
140    /// Schema availability
141    pub has_schema: bool,
142    /// Last modified time
143    pub last_modified: Option<std::time::SystemTime>,
144    /// Health status
145    pub health_status: String,
146}
147
148/// REPL data access API
149pub struct ReplDataApi {
150    /// Configuration
151    config: ReplDataConfig,
152    /// Data manager for SSTable access
153    data_manager: Arc<SSTableDataManager>,
154    /// Current query context
155    query_context: Arc<RwLock<QueryContext>>,
156    /// Query cache (if enabled)
157    query_cache: Arc<RwLock<HashMap<String, (ReplQueryResult, Instant)>>>,
158    /// Discovery cache
159    discovery_cache: Arc<RwLock<Option<(TableDiscovery, Instant)>>>,
160}
161
162impl ReplDataApi {
163    /// Create a new REPL data API
164    pub async fn new(
165        config: ReplDataConfig,
166        platform: Arc<Platform>,
167        core_config: Config,
168        schema_manager: Arc<SchemaManager>,
169    ) -> Result<Self> {
170        let data_manager = Arc::new(
171            SSTableDataManager::new(
172                config.data_manager_config.clone(),
173                platform,
174                core_config,
175                schema_manager,
176            )
177            .await?,
178        );
179
180        Ok(Self {
181            config,
182            data_manager,
183            query_context: Arc::new(RwLock::new(QueryContext::default())),
184            query_cache: Arc::new(RwLock::new(HashMap::new())),
185            discovery_cache: Arc::new(RwLock::new(None)),
186        })
187    }
188
189    /// Initialize the API with a data directory
190    pub async fn initialize(&self, data_dir: &Path) -> Result<TableDiscovery> {
191        let discovery = self.data_manager.discover_tables(data_dir).await?;
192
193        // Cache the discovery results
194        {
195            let mut cache = self.discovery_cache.write().await;
196            *cache = Some((discovery.clone(), Instant::now()));
197        }
198
199        Ok(discovery)
200    }
201
202    /// Set the current keyspace
203    pub async fn use_keyspace(&self, keyspace: &str) -> Result<()> {
204        // Validate keyspace exists
205        let keyspaces = self.list_keyspaces().await?;
206        if !keyspaces.contains(&keyspace.to_string()) {
207            return Err(Error::CqlParse(format!(
208                "Keyspace '{}' does not exist",
209                keyspace
210            )));
211        }
212
213        let mut context = self.query_context.write().await;
214        context.keyspace = Some(keyspace.to_string());
215        Ok(())
216    }
217
218    /// Get the current keyspace
219    pub async fn current_keyspace(&self) -> Option<String> {
220        let context = self.query_context.read().await;
221        context.keyspace.clone()
222    }
223
224    /// Execute a SELECT query
225    pub async fn select(
226        &self,
227        table: &str,
228        columns: Option<Vec<String>>,
229        where_clause: Option<String>,
230        limit: Option<usize>,
231    ) -> Result<ReplQueryResult> {
232        let start_time = Instant::now();
233        let context = self.query_context.read().await;
234
235        // Ensure we have a keyspace
236        let keyspace = context.keyspace.as_ref().ok_or_else(|| {
237            Error::InvalidState("No keyspace selected. Use 'USE keyspace;' first.".to_string())
238        })?;
239
240        // Apply context limits
241        let effective_limit = limit
242            .or(context.limit)
243            .map(|l| l.min(self.config.max_rows_per_query))
244            .unwrap_or(self.config.max_rows_per_query);
245
246        // Check query cache if enabled
247        if self.config.enable_query_cache {
248            let cache_key = format!(
249                "{}:{}:{}:{:?}:{:?}",
250                keyspace,
251                table,
252                columns.as_ref().map(|c| c.join(",")).unwrap_or_default(),
253                where_clause,
254                effective_limit
255            );
256
257            if let Some((cached_result, cached_at)) = self.get_cached_query(&cache_key).await {
258                let cache_ttl = Duration::from_secs(self.config.query_cache_ttl_seconds);
259                if cached_at.elapsed() < cache_ttl {
260                    let mut result = cached_result;
261                    result.metadata.from_cache = true;
262                    result.metadata.execution_time = start_time.elapsed();
263                    return Ok(result);
264                }
265            }
266        }
267
268        // Execute the query
269        let rows = self
270            .data_manager
271            .query_data(
272                keyspace,
273                table,
274                where_clause.as_deref(),
275                Some(effective_limit),
276            )
277            .await?;
278
279        // Get schema information
280        let schema = self.data_manager.get_table_schema(keyspace, table).await?;
281
282        // Convert to QueryResult
283        let query_result = self.convert_to_query_result(rows.clone(), &columns, &schema)?;
284
285        // Create metadata
286        let metadata = QueryMetadata {
287            execution_time: start_time.elapsed(),
288            rows_returned: rows.len(),
289            total_rows_available: None, // Would require separate count query
290            from_cache: false,
291            source_files: rows
292                .iter()
293                .map(|r| r.metadata.source_file.clone())
294                .collect(),
295            bytes_read: self.estimate_bytes_read(&rows),
296            cache_hit_ratio: self.calculate_cache_hit_ratio().await,
297        };
298
299        let result = ReplQueryResult {
300            result: query_result,
301            metadata,
302            schema,
303        };
304
305        // Cache the result if enabled
306        if self.config.enable_query_cache {
307            let cache_key = format!(
308                "{}:{}:{}:{:?}:{:?}",
309                keyspace,
310                table,
311                columns.as_ref().map(|c| c.join(",")).unwrap_or_default(),
312                where_clause,
313                effective_limit
314            );
315            self.cache_query_result(cache_key, result.clone()).await;
316        }
317
318        Ok(result)
319    }
320
321    /// List all available keyspaces
322    pub async fn list_keyspaces(&self) -> Result<Vec<String>> {
323        self.data_manager.list_keyspaces().await
324    }
325
326    /// List tables in the current or specified keyspace
327    pub async fn list_tables(&self, keyspace: Option<&str>) -> Result<TableListing> {
328        let target_keyspace = if let Some(ks) = keyspace {
329            ks.to_string()
330        } else {
331            let context = self.query_context.read().await;
332            context
333                .keyspace
334                .as_ref()
335                .ok_or_else(|| Error::InvalidState("No keyspace selected".to_string()))?
336                .clone()
337        };
338
339        let table_names = self.data_manager.list_tables(&target_keyspace).await?;
340        let mut tables = Vec::new();
341
342        // Get detailed information for each table
343        for table_name in table_names {
344            if let Ok(Some(_schema)) = self
345                .data_manager
346                .get_table_schema(&target_keyspace, &table_name)
347                .await
348            {
349                // Get table info from discovery cache
350                let discovery = self.get_discovery_cache().await;
351                if let Some((ref discovery_data, _)) = discovery {
352                    for keyspace_info in &discovery_data.keyspaces {
353                        if keyspace_info.name == target_keyspace {
354                            for table_info in &keyspace_info.tables {
355                                if table_info.name == table_name {
356                                    let summary = TableSummary {
357                                        name: table_name.clone(),
358                                        estimated_rows: table_info.estimated_rows,
359                                        size_bytes: table_info.total_size_bytes,
360                                        sstable_count: table_info.sstable_files.len(),
361                                        has_schema: table_info.schema.is_some(),
362                                        last_modified: table_info.last_modified,
363                                        health_status: self.assess_table_health(table_info),
364                                    };
365                                    tables.push(summary);
366                                    break;
367                                }
368                            }
369                            break;
370                        }
371                    }
372                }
373            }
374        }
375
376        Ok(TableListing {
377            keyspace: target_keyspace,
378            tables,
379            discovered_at: Instant::now(),
380        })
381    }
382
383    /// Describe a table schema
384    pub async fn describe_table(&self, table: &str, keyspace: Option<&str>) -> Result<TableSchema> {
385        let target_keyspace = if let Some(ks) = keyspace {
386            ks.to_string()
387        } else {
388            let context = self.query_context.read().await;
389            context
390                .keyspace
391                .as_ref()
392                .ok_or_else(|| Error::InvalidState("No keyspace selected".to_string()))?
393                .clone()
394        };
395
396        self.data_manager
397            .get_table_schema(&target_keyspace, table)
398            .await?
399            .ok_or_else(|| {
400                Error::Table(format!(
401                    "Table {}.{} not found or no schema available",
402                    target_keyspace, table
403                ))
404            })
405    }
406
407    /// Get system information and statistics
408    pub async fn get_system_info(&self) -> Result<SystemInfo> {
409        let cache_stats = self.data_manager.get_cache_stats();
410        let (discovery_in_progress, last_discovery) = self.data_manager.get_discovery_status();
411
412        let discovery_info = self.get_discovery_cache().await;
413        let (total_keyspaces, total_tables, total_sstables) =
414            if let Some((ref discovery, _)) = discovery_info {
415                (
416                    discovery.keyspaces.len(),
417                    discovery.keyspaces.iter().map(|ks| ks.tables.len()).sum(),
418                    discovery.total_sstables,
419                )
420            } else {
421                (0, 0, 0)
422            };
423
424        let memory_usage_mb = cache_stats.current_cache_size_bytes / (1024 * 1024);
425
426        Ok(SystemInfo {
427            total_keyspaces,
428            total_tables,
429            total_sstables,
430            cache_stats,
431            discovery_in_progress,
432            last_discovery_time: last_discovery,
433            memory_usage_mb,
434            active_connections: 1, // REPL is single-connection
435        })
436    }
437
438    /// Update query context settings
439    pub async fn update_context(&self, updates: QueryContextUpdate) -> Result<()> {
440        let mut context = self.query_context.write().await;
441
442        if let Some(timeout) = updates.timeout_seconds {
443            context.timeout = Duration::from_secs(timeout);
444        }
445
446        if let Some(limit) = updates.limit {
447            context.limit = Some(limit.min(self.config.max_rows_per_query));
448        }
449
450        if let Some(timing) = updates.timing_enabled {
451            context.timing_enabled = timing;
452        }
453
454        if let Some(page_size) = updates.page_size {
455            context.page_size = Some(page_size);
456        }
457
458        Ok(())
459    }
460
461    /// Get current query context
462    pub async fn get_context(&self) -> QueryContext {
463        let context = self.query_context.read().await;
464        context.clone()
465    }
466
467    /// Clear all caches
468    pub async fn clear_caches(&self) -> Result<()> {
469        {
470            let mut query_cache = self.query_cache.write().await;
471            query_cache.clear();
472        }
473
474        {
475            let mut discovery_cache = self.discovery_cache.write().await;
476            *discovery_cache = None;
477        }
478
479        Ok(())
480    }
481
482    // Helper methods
483
484    async fn get_cached_query(&self, cache_key: &str) -> Option<(ReplQueryResult, Instant)> {
485        let cache = self.query_cache.read().await;
486        cache.get(cache_key).cloned()
487    }
488
489    async fn cache_query_result(&self, cache_key: String, result: ReplQueryResult) {
490        let mut cache = self.query_cache.write().await;
491        cache.insert(cache_key, (result, Instant::now()));
492
493        // Simple cache eviction (keep last 100 queries)
494        if cache.len() > 100 {
495            let oldest_key = cache
496                .iter()
497                .min_by_key(|(_, (_, time))| time)
498                .map(|(key, _)| key.clone());
499
500            if let Some(key) = oldest_key {
501                cache.remove(&key);
502            }
503        }
504    }
505
506    async fn get_discovery_cache(&self) -> Option<(TableDiscovery, Instant)> {
507        let cache = self.discovery_cache.read().await;
508        cache.clone()
509    }
510
511    fn convert_to_query_result(
512        &self,
513        rows: Vec<DataRow>,
514        requested_columns: &Option<Vec<String>>,
515        schema: &Option<TableSchema>,
516    ) -> Result<QueryResult> {
517        let mut query_rows = Vec::new();
518
519        for data_row in rows {
520            let mut row_values = Vec::new();
521
522            // Determine column order
523            let columns = if let Some(cols) = requested_columns {
524                cols.clone()
525            } else if let Some(schema) = schema {
526                schema.columns.iter().map(|c| c.name.clone()).collect()
527            } else {
528                data_row.columns.keys().cloned().collect()
529            };
530
531            // Extract values in order
532            for column_name in &columns {
533                let value = data_row
534                    .columns
535                    .get(column_name)
536                    .cloned()
537                    .unwrap_or(Value::Null);
538                row_values.push(value);
539            }
540
541            let query_row = QueryRow {
542                values: row_values
543                    .into_iter()
544                    .enumerate()
545                    .map(|(i, value)| (format!("col_{i}"), value))
546                    .collect(),
547                key: data_row.key.clone(),
548                metadata: crate::query::result::RowMetadata {
549                    version: Some(data_row.metadata.generation),
550                    ttl: data_row.metadata.ttl.map(|duration| duration.as_secs()),
551                    tags: std::collections::HashMap::new(),
552                },
553            };
554            query_rows.push(query_row);
555        }
556
557        Ok(QueryResult {
558            rows: query_rows,
559            rows_affected: 0,
560            execution_time_ms: 0,
561            metadata: crate::query::result::QueryMetadata::default(),
562        })
563    }
564
565    fn estimate_bytes_read(&self, rows: &[DataRow]) -> u64 {
566        // Rough estimation
567        (rows.len() * 256) as u64
568    }
569
570    async fn calculate_cache_hit_ratio(&self) -> f64 {
571        let stats = self.data_manager.get_cache_stats();
572        let total = stats.cache_hits + stats.cache_misses;
573        if total > 0 {
574            stats.cache_hits as f64 / total as f64
575        } else {
576            0.0
577        }
578    }
579
580    fn assess_table_health(&self, table_info: &TableInfo) -> String {
581        let healthy_files = table_info
582            .sstable_files
583            .iter()
584            .filter(|f| {
585                f.health_status == crate::storage::sstable_data_manager::FileHealthStatus::Healthy
586            })
587            .count();
588
589        let total_files = table_info.sstable_files.len();
590
591        if healthy_files == total_files {
592            "Healthy".to_string()
593        } else if healthy_files > total_files / 2 {
594            "Degraded".to_string()
595        } else {
596            "Corrupted".to_string()
597        }
598    }
599}
600
601/// System information structure
602#[derive(Debug, Clone, Serialize, Deserialize)]
603pub struct SystemInfo {
604    /// Total number of keyspaces
605    pub total_keyspaces: usize,
606    /// Total number of tables
607    pub total_tables: usize,
608    /// Total number of SSTable files
609    pub total_sstables: usize,
610    /// Cache statistics
611    pub cache_stats: CacheStatistics,
612    /// Whether discovery is in progress
613    pub discovery_in_progress: bool,
614    /// Time since last discovery
615    pub last_discovery_time: Option<Duration>,
616    /// Memory usage in MB
617    pub memory_usage_mb: usize,
618    /// Active connections (always 1 for REPL)
619    pub active_connections: usize,
620}
621
/// A partial update to the query context; every field left as `None` is
/// treated as "keep the current value".
#[derive(Debug, Clone, Default)]
pub struct QueryContextUpdate {
    /// Replacement timeout, in seconds.
    pub timeout_seconds: Option<u64>,
    /// Replacement default row limit.
    pub limit: Option<usize>,
    /// Turn timing information on or off.
    pub timing_enabled: Option<bool>,
    /// Replacement page size.
    pub page_size: Option<usize>,
}
634
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build an API instance with default configuration, using a fresh
    /// temporary directory for the schema manager. The `TempDir` is returned
    /// so the directory stays alive for the duration of the test.
    async fn build_api() -> (ReplDataApi, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let config = ReplDataConfig::default();
        let core_config = Config::default();
        let platform = Arc::new(Platform::new(&core_config).await.unwrap());
        let schema_manager = Arc::new(SchemaManager::new(temp_dir.path()).await.unwrap());

        let api = ReplDataApi::new(config, platform, core_config, schema_manager)
            .await
            .unwrap();

        (api, temp_dir)
    }

    /// A new API starts without a keyspace and with the default row limit.
    #[tokio::test]
    async fn test_repl_api_creation() {
        let (api, _temp_dir) = build_api().await;

        let context = api.get_context().await;
        assert!(context.keyspace.is_none());
        assert_eq!(context.limit, Some(100));
    }

    /// Every field set in a `QueryContextUpdate` is reflected in the context.
    #[tokio::test]
    async fn test_query_context_updates() {
        let (api, _temp_dir) = build_api().await;

        api.update_context(QueryContextUpdate {
            timeout_seconds: Some(60),
            limit: Some(200),
            timing_enabled: Some(true),
            page_size: Some(25),
        })
        .await
        .unwrap();

        let context = api.get_context().await;
        assert_eq!(context.timeout, Duration::from_secs(60));
        assert_eq!(context.limit, Some(200));
        assert!(context.timing_enabled);
        assert_eq!(context.page_size, Some(25));
    }
}