1use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13
14use serde::{Deserialize, Serialize};
15use tokio::sync::RwLock;
16
17use crate::{
18 platform::Platform,
19 query::result::{QueryResult, QueryRow},
20 schema::{SchemaManager, TableSchema},
21 storage::sstable_data_manager::{
22 CacheStatistics, DataRow, SSTableDataManager, SSTableDataManagerConfig, TableDiscovery,
23 TableInfo,
24 },
25 Config, Error, Result, Value,
26};
27
28#[derive(Debug, Clone)]
30pub struct ReplDataConfig {
31 pub data_manager_config: SSTableDataManagerConfig,
33 pub default_timeout_seconds: u64,
35 pub auto_detect_schema: bool,
37 pub max_rows_per_query: usize,
39 pub enable_query_cache: bool,
41 pub query_cache_ttl_seconds: u64,
43}
44
45impl Default for ReplDataConfig {
46 fn default() -> Self {
47 Self {
48 data_manager_config: SSTableDataManagerConfig::default(),
49 default_timeout_seconds: 30,
50 auto_detect_schema: true,
51 max_rows_per_query: 10000,
52 enable_query_cache: true,
53 query_cache_ttl_seconds: 300,
54 }
55 }
56}
57
/// Per-session query settings kept by the REPL.
#[derive(Clone, Debug)]
pub struct QueryContext {
    /// Keyspace chosen with `USE`; `None` until one has been selected.
    pub keyspace: Option<String>,
    /// Query timeout setting.
    pub timeout: Duration,
    /// Row limit used when a query does not specify its own.
    pub limit: Option<usize>,
    /// Whether query timing is enabled.
    pub timing_enabled: bool,
    /// Page size for paged output, if any.
    pub page_size: Option<usize>,
    /// Paging offset (starts at 0).
    pub page_offset: usize,
}

impl Default for QueryContext {
    /// No keyspace, 30 s timeout, 100-row limit, timing off, 50-row pages starting at offset 0.
    fn default() -> Self {
        let timeout = Duration::from_secs(30);
        QueryContext {
            page_offset: 0,
            page_size: Some(50),
            timing_enabled: false,
            limit: Some(100),
            timeout,
            keyspace: None,
        }
    }
}
87
/// Result of a REPL query together with its execution metadata.
#[derive(Debug, Clone)]
pub struct ReplQueryResult {
    /// Rows converted into the generic query-result representation.
    pub result: QueryResult,
    /// Timing, caching and read statistics for the query.
    pub metadata: QueryMetadata,
    /// Schema of the queried table, when the data manager could provide one.
    pub schema: Option<TableSchema>,
}
98
/// Execution statistics attached to a [`ReplQueryResult`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryMetadata {
    /// Time spent serving the query (for a cache hit, the time taken to serve from the cache).
    pub execution_time: Duration,
    /// Number of rows returned.
    pub rows_returned: usize,
    /// Total number of matching rows, when known.
    pub total_rows_available: Option<usize>,
    /// Whether the result was served from the query-result cache.
    pub from_cache: bool,
    /// Files the returned rows were read from.
    pub source_files: Vec<PathBuf>,
    /// Approximate number of bytes read: an estimate derived from the row count, not a
    /// measurement.
    pub bytes_read: u64,
    /// Data-manager cache hit ratio, `hits / (hits + misses)`, or `0.0` when no lookups have
    /// been recorded.
    pub cache_hit_ratio: f64,
}
117
/// Tables of one keyspace, as returned by `ReplDataApi::list_tables`.
#[derive(Debug, Clone)]
pub struct TableListing {
    /// Keyspace the listing belongs to.
    pub keyspace: String,
    /// One summary per listed table.
    pub tables: Vec<TableSummary>,
    /// When this listing was assembled (not when table discovery last ran).
    pub discovered_at: Instant,
}
128
/// REPL-facing summary of a single table, built from its discovery record.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableSummary {
    /// Table name.
    pub name: String,
    /// Row count estimate from table discovery.
    pub estimated_rows: usize,
    /// Total on-disk size of the table's SSTables, in bytes.
    pub size_bytes: u64,
    /// Number of SSTable files discovered for the table.
    pub sstable_count: usize,
    /// Whether the discovery record carries a schema for the table.
    pub has_schema: bool,
    /// Last modification time from discovery, if known.
    pub last_modified: Option<std::time::SystemTime>,
    /// One of `"Healthy"`, `"Degraded"` or `"Corrupted"`, derived from per-file health.
    pub health_status: String,
}
147
/// High-level data-access API used by the REPL.
///
/// Wraps an [`SSTableDataManager`] and layers per-session state (current keyspace, limits)
/// plus in-memory caches for query results and table discovery on top of it.
pub struct ReplDataApi {
    /// API-level configuration.
    config: ReplDataConfig,
    /// Underlying SSTable data manager that performs discovery and reads.
    data_manager: Arc<SSTableDataManager>,
    /// Mutable session state shared by all operations on this API.
    query_context: Arc<RwLock<QueryContext>>,
    /// Cached query results keyed by a query fingerprint, with their insertion time.
    query_cache: Arc<RwLock<HashMap<String, (ReplQueryResult, Instant)>>>,
    /// Most recent table discovery and the time it was cached.
    discovery_cache: Arc<RwLock<Option<(TableDiscovery, Instant)>>>,
}
161
162impl ReplDataApi {
163 pub async fn new(
165 config: ReplDataConfig,
166 platform: Arc<Platform>,
167 core_config: Config,
168 schema_manager: Arc<SchemaManager>,
169 ) -> Result<Self> {
170 let data_manager = Arc::new(
171 SSTableDataManager::new(
172 config.data_manager_config.clone(),
173 platform,
174 core_config,
175 schema_manager,
176 )
177 .await?,
178 );
179
180 Ok(Self {
181 config,
182 data_manager,
183 query_context: Arc::new(RwLock::new(QueryContext::default())),
184 query_cache: Arc::new(RwLock::new(HashMap::new())),
185 discovery_cache: Arc::new(RwLock::new(None)),
186 })
187 }
188
189 pub async fn initialize(&self, data_dir: &Path) -> Result<TableDiscovery> {
191 let discovery = self.data_manager.discover_tables(data_dir).await?;
192
193 {
195 let mut cache = self.discovery_cache.write().await;
196 *cache = Some((discovery.clone(), Instant::now()));
197 }
198
199 Ok(discovery)
200 }
201
202 pub async fn use_keyspace(&self, keyspace: &str) -> Result<()> {
204 let keyspaces = self.list_keyspaces().await?;
206 if !keyspaces.contains(&keyspace.to_string()) {
207 return Err(Error::CqlParse(format!(
208 "Keyspace '{}' does not exist",
209 keyspace
210 )));
211 }
212
213 let mut context = self.query_context.write().await;
214 context.keyspace = Some(keyspace.to_string());
215 Ok(())
216 }
217
218 pub async fn current_keyspace(&self) -> Option<String> {
220 let context = self.query_context.read().await;
221 context.keyspace.clone()
222 }
223
224 pub async fn select(
226 &self,
227 table: &str,
228 columns: Option<Vec<String>>,
229 where_clause: Option<String>,
230 limit: Option<usize>,
231 ) -> Result<ReplQueryResult> {
232 let start_time = Instant::now();
233 let context = self.query_context.read().await;
234
235 let keyspace = context.keyspace.as_ref().ok_or_else(|| {
237 Error::InvalidState("No keyspace selected. Use 'USE keyspace;' first.".to_string())
238 })?;
239
240 let effective_limit = limit
242 .or(context.limit)
243 .map(|l| l.min(self.config.max_rows_per_query))
244 .unwrap_or(self.config.max_rows_per_query);
245
246 if self.config.enable_query_cache {
248 let cache_key = format!(
249 "{}:{}:{}:{:?}:{:?}",
250 keyspace,
251 table,
252 columns.as_ref().map(|c| c.join(",")).unwrap_or_default(),
253 where_clause,
254 effective_limit
255 );
256
257 if let Some((cached_result, cached_at)) = self.get_cached_query(&cache_key).await {
258 let cache_ttl = Duration::from_secs(self.config.query_cache_ttl_seconds);
259 if cached_at.elapsed() < cache_ttl {
260 let mut result = cached_result;
261 result.metadata.from_cache = true;
262 result.metadata.execution_time = start_time.elapsed();
263 return Ok(result);
264 }
265 }
266 }
267
268 let rows = self
270 .data_manager
271 .query_data(
272 keyspace,
273 table,
274 where_clause.as_deref(),
275 Some(effective_limit),
276 )
277 .await?;
278
279 let schema = self.data_manager.get_table_schema(keyspace, table).await?;
281
282 let query_result = self.convert_to_query_result(rows.clone(), &columns, &schema)?;
284
285 let metadata = QueryMetadata {
287 execution_time: start_time.elapsed(),
288 rows_returned: rows.len(),
289 total_rows_available: None, from_cache: false,
291 source_files: rows
292 .iter()
293 .map(|r| r.metadata.source_file.clone())
294 .collect(),
295 bytes_read: self.estimate_bytes_read(&rows),
296 cache_hit_ratio: self.calculate_cache_hit_ratio().await,
297 };
298
299 let result = ReplQueryResult {
300 result: query_result,
301 metadata,
302 schema,
303 };
304
305 if self.config.enable_query_cache {
307 let cache_key = format!(
308 "{}:{}:{}:{:?}:{:?}",
309 keyspace,
310 table,
311 columns.as_ref().map(|c| c.join(",")).unwrap_or_default(),
312 where_clause,
313 effective_limit
314 );
315 self.cache_query_result(cache_key, result.clone()).await;
316 }
317
318 Ok(result)
319 }
320
321 pub async fn list_keyspaces(&self) -> Result<Vec<String>> {
323 self.data_manager.list_keyspaces().await
324 }
325
326 pub async fn list_tables(&self, keyspace: Option<&str>) -> Result<TableListing> {
328 let target_keyspace = if let Some(ks) = keyspace {
329 ks.to_string()
330 } else {
331 let context = self.query_context.read().await;
332 context
333 .keyspace
334 .as_ref()
335 .ok_or_else(|| Error::InvalidState("No keyspace selected".to_string()))?
336 .clone()
337 };
338
339 let table_names = self.data_manager.list_tables(&target_keyspace).await?;
340 let mut tables = Vec::new();
341
342 for table_name in table_names {
344 if let Ok(Some(_schema)) = self
345 .data_manager
346 .get_table_schema(&target_keyspace, &table_name)
347 .await
348 {
349 let discovery = self.get_discovery_cache().await;
351 if let Some((ref discovery_data, _)) = discovery {
352 for keyspace_info in &discovery_data.keyspaces {
353 if keyspace_info.name == target_keyspace {
354 for table_info in &keyspace_info.tables {
355 if table_info.name == table_name {
356 let summary = TableSummary {
357 name: table_name.clone(),
358 estimated_rows: table_info.estimated_rows,
359 size_bytes: table_info.total_size_bytes,
360 sstable_count: table_info.sstable_files.len(),
361 has_schema: table_info.schema.is_some(),
362 last_modified: table_info.last_modified,
363 health_status: self.assess_table_health(table_info),
364 };
365 tables.push(summary);
366 break;
367 }
368 }
369 break;
370 }
371 }
372 }
373 }
374 }
375
376 Ok(TableListing {
377 keyspace: target_keyspace,
378 tables,
379 discovered_at: Instant::now(),
380 })
381 }
382
383 pub async fn describe_table(&self, table: &str, keyspace: Option<&str>) -> Result<TableSchema> {
385 let target_keyspace = if let Some(ks) = keyspace {
386 ks.to_string()
387 } else {
388 let context = self.query_context.read().await;
389 context
390 .keyspace
391 .as_ref()
392 .ok_or_else(|| Error::InvalidState("No keyspace selected".to_string()))?
393 .clone()
394 };
395
396 self.data_manager
397 .get_table_schema(&target_keyspace, table)
398 .await?
399 .ok_or_else(|| {
400 Error::Table(format!(
401 "Table {}.{} not found or no schema available",
402 target_keyspace, table
403 ))
404 })
405 }
406
407 pub async fn get_system_info(&self) -> Result<SystemInfo> {
409 let cache_stats = self.data_manager.get_cache_stats();
410 let (discovery_in_progress, last_discovery) = self.data_manager.get_discovery_status();
411
412 let discovery_info = self.get_discovery_cache().await;
413 let (total_keyspaces, total_tables, total_sstables) =
414 if let Some((ref discovery, _)) = discovery_info {
415 (
416 discovery.keyspaces.len(),
417 discovery.keyspaces.iter().map(|ks| ks.tables.len()).sum(),
418 discovery.total_sstables,
419 )
420 } else {
421 (0, 0, 0)
422 };
423
424 let memory_usage_mb = cache_stats.current_cache_size_bytes / (1024 * 1024);
425
426 Ok(SystemInfo {
427 total_keyspaces,
428 total_tables,
429 total_sstables,
430 cache_stats,
431 discovery_in_progress,
432 last_discovery_time: last_discovery,
433 memory_usage_mb,
434 active_connections: 1, })
436 }
437
438 pub async fn update_context(&self, updates: QueryContextUpdate) -> Result<()> {
440 let mut context = self.query_context.write().await;
441
442 if let Some(timeout) = updates.timeout_seconds {
443 context.timeout = Duration::from_secs(timeout);
444 }
445
446 if let Some(limit) = updates.limit {
447 context.limit = Some(limit.min(self.config.max_rows_per_query));
448 }
449
450 if let Some(timing) = updates.timing_enabled {
451 context.timing_enabled = timing;
452 }
453
454 if let Some(page_size) = updates.page_size {
455 context.page_size = Some(page_size);
456 }
457
458 Ok(())
459 }
460
461 pub async fn get_context(&self) -> QueryContext {
463 let context = self.query_context.read().await;
464 context.clone()
465 }
466
467 pub async fn clear_caches(&self) -> Result<()> {
469 {
470 let mut query_cache = self.query_cache.write().await;
471 query_cache.clear();
472 }
473
474 {
475 let mut discovery_cache = self.discovery_cache.write().await;
476 *discovery_cache = None;
477 }
478
479 Ok(())
480 }
481
482 async fn get_cached_query(&self, cache_key: &str) -> Option<(ReplQueryResult, Instant)> {
485 let cache = self.query_cache.read().await;
486 cache.get(cache_key).cloned()
487 }
488
489 async fn cache_query_result(&self, cache_key: String, result: ReplQueryResult) {
490 let mut cache = self.query_cache.write().await;
491 cache.insert(cache_key, (result, Instant::now()));
492
493 if cache.len() > 100 {
495 let oldest_key = cache
496 .iter()
497 .min_by_key(|(_, (_, time))| time)
498 .map(|(key, _)| key.clone());
499
500 if let Some(key) = oldest_key {
501 cache.remove(&key);
502 }
503 }
504 }
505
506 async fn get_discovery_cache(&self) -> Option<(TableDiscovery, Instant)> {
507 let cache = self.discovery_cache.read().await;
508 cache.clone()
509 }
510
511 fn convert_to_query_result(
512 &self,
513 rows: Vec<DataRow>,
514 requested_columns: &Option<Vec<String>>,
515 schema: &Option<TableSchema>,
516 ) -> Result<QueryResult> {
517 let mut query_rows = Vec::new();
518
519 for data_row in rows {
520 let mut row_values = Vec::new();
521
522 let columns = if let Some(cols) = requested_columns {
524 cols.clone()
525 } else if let Some(schema) = schema {
526 schema.columns.iter().map(|c| c.name.clone()).collect()
527 } else {
528 data_row.columns.keys().cloned().collect()
529 };
530
531 for column_name in &columns {
533 let value = data_row
534 .columns
535 .get(column_name)
536 .cloned()
537 .unwrap_or(Value::Null);
538 row_values.push(value);
539 }
540
541 let query_row = QueryRow {
542 values: row_values
543 .into_iter()
544 .enumerate()
545 .map(|(i, value)| (format!("col_{i}"), value))
546 .collect(),
547 key: data_row.key.clone(),
548 metadata: crate::query::result::RowMetadata {
549 version: Some(data_row.metadata.generation),
550 ttl: data_row.metadata.ttl.map(|duration| duration.as_secs()),
551 tags: std::collections::HashMap::new(),
552 },
553 };
554 query_rows.push(query_row);
555 }
556
557 Ok(QueryResult {
558 rows: query_rows,
559 rows_affected: 0,
560 execution_time_ms: 0,
561 metadata: crate::query::result::QueryMetadata::default(),
562 })
563 }
564
565 fn estimate_bytes_read(&self, rows: &[DataRow]) -> u64 {
566 (rows.len() * 256) as u64
568 }
569
570 async fn calculate_cache_hit_ratio(&self) -> f64 {
571 let stats = self.data_manager.get_cache_stats();
572 let total = stats.cache_hits + stats.cache_misses;
573 if total > 0 {
574 stats.cache_hits as f64 / total as f64
575 } else {
576 0.0
577 }
578 }
579
580 fn assess_table_health(&self, table_info: &TableInfo) -> String {
581 let healthy_files = table_info
582 .sstable_files
583 .iter()
584 .filter(|f| {
585 f.health_status == crate::storage::sstable_data_manager::FileHealthStatus::Healthy
586 })
587 .count();
588
589 let total_files = table_info.sstable_files.len();
590
591 if healthy_files == total_files {
592 "Healthy".to_string()
593 } else if healthy_files > total_files / 2 {
594 "Degraded".to_string()
595 } else {
596 "Corrupted".to_string()
597 }
598 }
599}
600
/// Snapshot of REPL data-layer state, as returned by `ReplDataApi::get_system_info`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemInfo {
    /// Number of keyspaces in the cached discovery (0 when no discovery is cached).
    pub total_keyspaces: usize,
    /// Number of tables across all keyspaces in the cached discovery.
    pub total_tables: usize,
    /// Total SSTable count reported by the cached discovery.
    pub total_sstables: usize,
    /// Data-manager cache statistics.
    pub cache_stats: CacheStatistics,
    /// Whether the data manager reports a discovery as currently in progress.
    pub discovery_in_progress: bool,
    /// Last-discovery timing as reported by the data manager's discovery status
    /// (NOTE(review): confirm whether this is the discovery's duration or its age).
    pub last_discovery_time: Option<Duration>,
    /// Current data-manager cache size in whole MiB (integer division).
    pub memory_usage_mb: usize,
    /// Number of active connections; `get_system_info` currently always reports 1.
    pub active_connections: usize,
}
621
/// Partial update for the session [`QueryContext`]; `None` fields are left unchanged.
#[derive(Debug, Clone, Default)]
pub struct QueryContextUpdate {
    /// New timeout, in seconds.
    pub timeout_seconds: Option<u64>,
    /// New default row limit (capped at the configured maximum rows per query when applied).
    pub limit: Option<usize>,
    /// Enable or disable query timing.
    pub timing_enabled: Option<bool>,
    /// New page size.
    pub page_size: Option<usize>,
}
634
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds an API with default configuration over a fresh temporary schema directory.
    ///
    /// The `TempDir` guard is returned alongside the API so the directory outlives the API
    /// (the directory is removed when the guard is dropped).
    async fn api_with_temp_dir() -> (TempDir, ReplDataApi) {
        let temp_dir = TempDir::new().unwrap();
        let config = ReplDataConfig::default();
        let core_config = Config::default();
        let platform = Arc::new(Platform::new(&core_config).await.unwrap());
        let schema_manager = Arc::new(SchemaManager::new(temp_dir.path()).await.unwrap());

        let api = ReplDataApi::new(config, platform, core_config, schema_manager)
            .await
            .unwrap();
        (temp_dir, api)
    }

    #[tokio::test]
    async fn test_repl_api_creation() {
        let (_temp_dir, api) = api_with_temp_dir().await;

        let ctx = api.get_context().await;
        assert!(ctx.keyspace.is_none());
        assert_eq!(ctx.limit, Some(100));
    }

    #[tokio::test]
    async fn test_query_context_updates() {
        let (_temp_dir, api) = api_with_temp_dir().await;

        api.update_context(QueryContextUpdate {
            timeout_seconds: Some(60),
            limit: Some(200),
            timing_enabled: Some(true),
            page_size: Some(25),
        })
        .await
        .unwrap();

        let ctx = api.get_context().await;
        assert_eq!(ctx.timeout, Duration::from_secs(60));
        assert_eq!(ctx.limit, Some(200));
        assert!(ctx.timing_enabled);
        assert_eq!(ctx.page_size, Some(25));
    }
}