Skip to main content

cqlite_cli/repl/
session.rs

1// REPL Session Management
2//
3// Manages the state and context of a REPL session, including database connections,
4// current keyspace, configuration, and session persistence.
5
6use super::{ReplError, ReplResult};
7use crate::config::Config;
8use anyhow::Result;
9use cqlite_core::{Database, QueryResult};
10use std::collections::HashMap;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13
/// Lifecycle state of a REPL session.
///
/// `Eq` is derived alongside `PartialEq` because every variant payload
/// (`String`) is itself `Eq`, so equality here is a total equivalence.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionState {
    /// The session has been constructed but not yet initialized.
    Initializing,
    /// The session is idle and can accept commands.
    Ready,
    /// The session is currently running a command.
    Executing,
    /// The session is in an error state; the payload carries a message.
    Error(String),
    /// The session is shutting down.
    Shutdown,
}
28
29/// REPL session context and state
30pub struct ReplSession {
31    /// Database connection
32    database: Arc<Database>,
33    /// Session configuration
34    config: Config,
35    /// Database file path
36    db_path: PathBuf,
37    /// Current session state
38    state: SessionState,
39    /// Current keyspace (if any)
40    current_keyspace: Option<String>,
41    /// Data directory for SSTable files
42    data_dir: Option<PathBuf>,
43    /// Session variables
44    variables: HashMap<String, String>,
45    /// Connection metadata
46    connection_info: ConnectionInfo,
47    /// Performance metrics
48    metrics: SessionMetrics,
49    /// Schema registry for coverage reporting
50    schema_registry:
51        Option<Arc<tokio::sync::RwLock<cqlite_core::schema::registry::SchemaRegistry>>>,
52}
53
/// Metadata describing a session's connection.
#[derive(Debug, Clone)]
pub struct ConnectionInfo {
    /// Version string recorded for the connection.
    pub version: String,
    /// Moment the connection was established.
    pub connected_at: std::time::SystemTime,
    /// Moment of the most recent recorded activity.
    pub last_activity: std::time::SystemTime,
    /// Number of queries executed so far.
    pub queries_executed: u64,
    /// Number of errors encountered so far.
    pub errors_count: u64,
}
68
/// Performance counters accumulated over a session.
#[derive(Debug, Clone, Default)]
pub struct SessionMetrics {
    /// Sum of all measured execution times, in microseconds.
    pub total_execution_time_us: u64,
    /// Number of queries seen, keyed by query type.
    pub query_counts: HashMap<String, u64>,
    /// Mean query time, in microseconds.
    pub avg_query_time_us: f64,
    /// Memory usage, in bytes.
    pub memory_usage_bytes: u64,
    /// Cache statistics: number of hits.
    pub cache_hits: u64,
    /// Cache statistics: number of misses.
    pub cache_misses: u64,
}
84
85impl ReplSession {
86    /// Create a new REPL session
87    pub fn new(db_path: &Path, config: Config, database: Database) -> ReplResult<Self> {
88        let connection_info = ConnectionInfo {
89            version: env!("CARGO_PKG_VERSION").to_string(),
90            connected_at: std::time::SystemTime::now(),
91            last_activity: std::time::SystemTime::now(),
92            queries_executed: 0,
93            errors_count: 0,
94        };
95
96        Ok(Self {
97            database: Arc::new(database),
98            config,
99            db_path: db_path.to_path_buf(),
100            state: SessionState::Initializing,
101            current_keyspace: None,
102            data_dir: None,
103            variables: HashMap::new(),
104            connection_info,
105            metrics: SessionMetrics::default(),
106            schema_registry: None,
107        })
108    }
109
110    /// Initialize the session
111    pub async fn initialize(&mut self) -> ReplResult<()> {
112        self.state = SessionState::Ready;
113        self.connection_info.last_activity = std::time::SystemTime::now();
114
115        // Try to load default keyspace from config
116        if let Some(default_keyspace) = self.config.default_keyspace.clone() {
117            if !default_keyspace.is_empty() {
118                let _ = self.use_keyspace(&default_keyspace).await;
119            }
120        }
121
122        // Set data directory if configured
123        if let Some(ref data_dir) = self.config.data_directory {
124            if !data_dir.as_os_str().is_empty() {
125                self.data_dir = Some(data_dir.clone());
126            }
127        }
128
129        Ok(())
130    }
131
132    /// Get current session state
133    pub fn state(&self) -> &SessionState {
134        &self.state
135    }
136
137    /// Get current keyspace
138    pub fn current_keyspace(&self) -> Option<&String> {
139        self.current_keyspace.as_ref()
140    }
141
142    /// Get database path
143    pub fn db_path(&self) -> &Path {
144        &self.db_path
145    }
146
147    /// Get reference to the database (for status metrics collection)
148    pub fn database(&self) -> Option<&Database> {
149        Some(&self.database)
150    }
151
152    /// Get data directory
153    pub fn data_dir(&self) -> Option<&Path> {
154        self.data_dir.as_deref()
155    }
156
157    /// Set data directory
158    pub fn set_data_dir(&mut self, path: Option<PathBuf>) {
159        self.data_dir = path;
160    }
161
162    /// Switch to a keyspace
163    pub async fn use_keyspace(&mut self, keyspace: &str) -> ReplResult<()> {
164        self.state = SessionState::Executing;
165        self.connection_info.last_activity = std::time::SystemTime::now();
166
167        // Validate keyspace exists by querying system tables
168        let query = format!(
169            "SELECT keyspace_name FROM system.keyspaces WHERE keyspace_name = '{}'",
170            keyspace
171        );
172
173        match self.database.execute(&query).await {
174            Ok(result) => {
175                if result.rows.is_empty() {
176                    self.state = SessionState::Ready;
177                    return Err(ReplError::Session(format!(
178                        "Keyspace '{}' not found",
179                        keyspace
180                    )));
181                }
182
183                self.current_keyspace = Some(keyspace.to_string());
184                self.state = SessionState::Ready;
185                Ok(())
186            }
187            Err(e) => {
188                // If system query fails, allow setting anyway (might be valid in data directory)
189                self.current_keyspace = Some(keyspace.to_string());
190                self.state = SessionState::Ready;
191                self.connection_info.errors_count += 1;
192
193                // Log warning but don't fail
194                log::warn!("Could not verify keyspace '{}': {}", keyspace, e);
195                Ok(())
196            }
197        }
198    }
199
200    /// Execute a query
201    pub async fn execute_query(&mut self, query: &str) -> ReplResult<QueryResult> {
202        self.state = SessionState::Executing;
203        self.connection_info.last_activity = std::time::SystemTime::now();
204
205        let start_time = std::time::Instant::now();
206
207        match self.database.execute(query).await {
208            Ok(result) => {
209                let elapsed = start_time.elapsed();
210                self.update_metrics(query, elapsed, true);
211                self.connection_info.queries_executed += 1;
212                self.state = SessionState::Ready;
213                Ok(result)
214            }
215            Err(e) => {
216                let elapsed = start_time.elapsed();
217                self.update_metrics(query, elapsed, false);
218                self.connection_info.errors_count += 1;
219                self.state = SessionState::Ready;
220                Err(ReplError::Database(e.into()))
221            }
222        }
223    }
224
225    /// List available tables
226    pub async fn list_tables(&mut self) -> ReplResult<Vec<String>> {
227        self.state = SessionState::Executing;
228
229        let query = if let Some(ref keyspace) = self.current_keyspace {
230            format!(
231                "SELECT table_name FROM system.tables WHERE keyspace_name = '{}'",
232                keyspace
233            )
234        } else {
235            "SELECT keyspace_name, table_name FROM system.tables WHERE keyspace_name != 'system'"
236                .to_string()
237        };
238
239        match self.database.execute(&query).await {
240            Ok(result) => {
241                self.state = SessionState::Ready;
242                let mut tables = Vec::new();
243
244                for row in &result.rows {
245                    if let Some(ref _keyspace) = self.current_keyspace {
246                        // Just table names for current keyspace
247                        if let Some(table_name) = row.get("table_name") {
248                            tables.push(table_name.to_string());
249                        }
250                    } else {
251                        // Qualified names for all keyspaces
252                        if let (Some(keyspace_name), Some(table_name)) =
253                            (row.get("keyspace_name"), row.get("table_name"))
254                        {
255                            tables.push(format!("{}.{}", keyspace_name, table_name));
256                        }
257                    }
258                }
259
260                Ok(tables)
261            }
262            Err(e) => {
263                self.state = SessionState::Ready;
264
265                // Fallback to data directory scanning if system query fails
266                if let Some(ref data_dir) = self.data_dir {
267                    match self.scan_data_directory_tables(data_dir).await {
268                        Ok(tables) => Ok(tables),
269                        Err(_) => Err(ReplError::Database(e.into())),
270                    }
271                } else {
272                    Err(ReplError::Database(e.into()))
273                }
274            }
275        }
276    }
277
278    /// List available keyspaces
279    pub async fn list_keyspaces(&mut self) -> ReplResult<Vec<String>> {
280        self.state = SessionState::Executing;
281
282        let query = "SELECT keyspace_name FROM system.keyspaces";
283
284        match self.database.execute(query).await {
285            Ok(result) => {
286                self.state = SessionState::Ready;
287                let mut keyspaces = Vec::new();
288
289                for row in &result.rows {
290                    if let Some(keyspace_name) = row.get("keyspace_name") {
291                        keyspaces.push(keyspace_name.to_string());
292                    }
293                }
294
295                Ok(keyspaces)
296            }
297            Err(e) => {
298                self.state = SessionState::Ready;
299
300                // Fallback to data directory scanning
301                if let Some(ref data_dir) = self.data_dir {
302                    match self.scan_data_directory_keyspaces(data_dir).await {
303                        Ok(keyspaces) => Ok(keyspaces),
304                        Err(_) => Err(ReplError::Database(e.into())),
305                    }
306                } else {
307                    Err(ReplError::Database(e.into()))
308                }
309            }
310        }
311    }
312
313    /// Describe an object (table, keyspace, etc.)
314    pub async fn describe_object(&mut self, object_name: &str) -> ReplResult<String> {
315        self.state = SessionState::Executing;
316
317        // Parse object name (could be keyspace.table or just table)
318        let (keyspace, table) = if object_name.contains('.') {
319            let parts: Vec<&str> = object_name.split('.').collect();
320            if parts.len() == 2 {
321                (Some(parts[0]), parts[1])
322            } else {
323                (self.current_keyspace.as_deref(), object_name)
324            }
325        } else {
326            (self.current_keyspace.as_deref(), object_name)
327        };
328
329        if let Some(ks) = keyspace {
330            match self.describe_table(ks, table).await {
331                Ok(description) => {
332                    self.state = SessionState::Ready;
333                    Ok(description)
334                }
335                Err(e) => {
336                    self.state = SessionState::Ready;
337                    Err(e)
338                }
339            }
340        } else {
341            self.state = SessionState::Ready;
342            Err(ReplError::Session(
343                "No keyspace specified and no current keyspace set".to_string(),
344            ))
345        }
346    }
347
348    /// Describe a specific table
349    async fn describe_table(&self, keyspace: &str, table: &str) -> ReplResult<String> {
350        let query = format!(
351            "SELECT column_name, type, kind FROM system.columns WHERE keyspace_name = '{}' AND table_name = '{}' ORDER BY position",
352            keyspace, table
353        );
354
355        match self.database.execute(&query).await {
356            Ok(result) => {
357                if result.rows.is_empty() {
358                    return Err(ReplError::Session(format!(
359                        "Table '{}.{}' not found",
360                        keyspace, table
361                    )));
362                }
363
364                let mut description = String::new();
365                description.push_str(&format!("Table: {}.{}\n", keyspace, table));
366                description.push_str("Columns:\n");
367
368                for row in &result.rows {
369                    if let (Some(col_name), Some(col_type), Some(col_kind)) =
370                        (row.get("column_name"), row.get("type"), row.get("kind"))
371                    {
372                        let kind_desc = match col_kind.to_string().as_str() {
373                            "partition_key" => " (PARTITION KEY)",
374                            "clustering" => " (CLUSTERING KEY)",
375                            "regular" => "",
376                            _ => "",
377                        };
378                        description
379                            .push_str(&format!("  {} {}{}\n", col_name, col_type, kind_desc));
380                    }
381                }
382
383                Ok(description)
384            }
385            Err(e) => Err(ReplError::Database(e.into())),
386        }
387    }
388
389    /// Get session variable
390    pub fn get_variable(&self, name: &str) -> Option<&String> {
391        self.variables.get(name)
392    }
393
394    /// Set session variable
395    pub fn set_variable(&mut self, name: String, value: String) {
396        self.variables.insert(name, value);
397    }
398
399    /// Get connection information
400    pub fn connection_info(&self) -> &ConnectionInfo {
401        &self.connection_info
402    }
403
404    /// Get session metrics
405    pub fn metrics(&self) -> &SessionMetrics {
406        &self.metrics
407    }
408
409    /// Update session metrics
410    fn update_metrics(&mut self, query: &str, elapsed: std::time::Duration, success: bool) {
411        let elapsed_us = elapsed.as_micros() as u64;
412        self.metrics.total_execution_time_us += elapsed_us;
413
414        // Update average
415        let total_queries = self.connection_info.queries_executed + if success { 1 } else { 0 };
416        if total_queries > 0 {
417            self.metrics.avg_query_time_us =
418                self.metrics.total_execution_time_us as f64 / total_queries as f64;
419        }
420
421        // Categorize query type
422        let query_type = self.categorize_query(query);
423        *self.metrics.query_counts.entry(query_type).or_insert(0) += 1;
424    }
425
426    /// Categorize query for metrics
427    fn categorize_query(&self, query: &str) -> String {
428        let upper = query.to_uppercase();
429        let trimmed = upper.trim();
430
431        if trimmed.starts_with("SELECT") {
432            "SELECT".to_string()
433        } else if trimmed.starts_with("INSERT") {
434            "INSERT".to_string()
435        } else if trimmed.starts_with("UPDATE") {
436            "UPDATE".to_string()
437        } else if trimmed.starts_with("DELETE") {
438            "DELETE".to_string()
439        } else if trimmed.starts_with("CREATE") {
440            "CREATE".to_string()
441        } else if trimmed.starts_with("ALTER") {
442            "ALTER".to_string()
443        } else if trimmed.starts_with("DROP") {
444            "DROP".to_string()
445        } else if trimmed.starts_with("DESCRIBE") {
446            "DESCRIBE".to_string()
447        } else {
448            "OTHER".to_string()
449        }
450    }
451
452    /// Scan data directory for tables (fallback when system queries fail)
453    async fn scan_data_directory_tables(&self, data_dir: &Path) -> Result<Vec<String>> {
454        use std::fs;
455
456        let mut tables = Vec::new();
457
458        if let Some(ref keyspace) = self.current_keyspace {
459            // Scan specific keyspace directory
460            let keyspace_dir = data_dir.join(keyspace);
461            if keyspace_dir.exists() {
462                for entry in fs::read_dir(&keyspace_dir)? {
463                    let entry = entry?;
464                    if entry.path().is_dir() {
465                        if let Some(dir_name) = entry.file_name().to_str() {
466                            if let Some(table_name) = self.extract_table_name(dir_name) {
467                                tables.push(table_name);
468                            }
469                        }
470                    }
471                }
472            }
473        } else {
474            // Scan all keyspace directories
475            for entry in fs::read_dir(data_dir)? {
476                let entry = entry?;
477                if entry.path().is_dir() {
478                    if let Some(keyspace_name) = entry.file_name().to_str() {
479                        if keyspace_name.starts_with('.') || keyspace_name == "system" {
480                            continue;
481                        }
482
483                        let keyspace_dir = entry.path();
484                        for table_entry in fs::read_dir(&keyspace_dir)? {
485                            let table_entry = table_entry?;
486                            if table_entry.path().is_dir() {
487                                if let Some(dir_name) = table_entry.file_name().to_str() {
488                                    if let Some(table_name) = self.extract_table_name(dir_name) {
489                                        tables.push(format!("{}.{}", keyspace_name, table_name));
490                                    }
491                                }
492                            }
493                        }
494                    }
495                }
496            }
497        }
498
499        Ok(tables)
500    }
501
502    /// Scan data directory for keyspaces
503    async fn scan_data_directory_keyspaces(&self, data_dir: &Path) -> Result<Vec<String>> {
504        use std::fs;
505
506        let mut keyspaces = Vec::new();
507
508        for entry in fs::read_dir(data_dir)? {
509            let entry = entry?;
510            if entry.path().is_dir() {
511                if let Some(name) = entry.file_name().to_str() {
512                    if !name.starts_with('.') && name != "system" {
513                        keyspaces.push(name.to_string());
514                    }
515                }
516            }
517        }
518
519        keyspaces.sort();
520        Ok(keyspaces)
521    }
522
523    /// Extract table name from SSTable directory name
524    fn extract_table_name(&self, dir_name: &str) -> Option<String> {
525        // Expected format: tablename-uuid
526        if let Some(dash_pos) = dir_name.find('-') {
527            let table_part = &dir_name[..dash_pos];
528            if !table_part.is_empty() && table_part.chars().all(|c| c.is_alphanumeric() || c == '_')
529            {
530                return Some(table_part.to_string());
531            }
532        }
533        None
534    }
535
536    /// Shutdown the session
537    pub async fn shutdown(&mut self) -> ReplResult<()> {
538        self.state = SessionState::Shutdown;
539
540        // Perform cleanup tasks
541        self.save_session_state().await?;
542
543        Ok(())
544    }
545
546    /// Save session state for persistence
547    async fn save_session_state(&self) -> ReplResult<()> {
548        // This would save session state to a file or database
549        // For now, just log some statistics
550        log::info!(
551            "Session ending. Queries executed: {}, Errors: {}",
552            self.connection_info.queries_executed,
553            self.connection_info.errors_count
554        );
555        Ok(())
556    }
557
558    /// Export session metrics as a report
559    pub fn export_metrics(&self) -> String {
560        let mut report = String::new();
561
562        report.push_str("=== CQLite Session Report ===\n");
563        report.push_str(&format!("Database: {}\n", self.db_path.display()));
564        report.push_str(&format!(
565            "Session Duration: {:?}\n",
566            self.connection_info
567                .last_activity
568                .duration_since(self.connection_info.connected_at)
569                .unwrap_or_default()
570        ));
571        report.push_str(&format!(
572            "Queries Executed: {}\n",
573            self.connection_info.queries_executed
574        ));
575        report.push_str(&format!("Errors: {}\n", self.connection_info.errors_count));
576        report.push_str(&format!(
577            "Average Query Time: {:.2}ms\n",
578            self.metrics.avg_query_time_us / 1000.0
579        ));
580
581        if !self.metrics.query_counts.is_empty() {
582            report.push_str("\nQuery Types:\n");
583            for (query_type, count) in &self.metrics.query_counts {
584                report.push_str(&format!("  {}: {}\n", query_type, count));
585            }
586        }
587
588        if let Some(ref keyspace) = self.current_keyspace {
589            report.push_str(&format!("Current Keyspace: {}\n", keyspace));
590        }
591
592        report
593    }
594
595    /// Replace the Database instance with a new one
596    ///
597    /// Used when rebuilding the database after ingestion changes.
598    /// The old Database will be dropped when the Arc refcount reaches zero.
599    pub fn replace_database(&mut self, new_database: Database) -> ReplResult<()> {
600        self.database = Arc::new(new_database);
601        Ok(())
602    }
603
604    /// Get reference to the session configuration
605    pub fn config(&self) -> &Config {
606        &self.config
607    }
608
609    /// Get reference to SchemaRegistry for coverage reporting
610    pub fn schema_registry(
611        &self,
612    ) -> Option<Arc<tokio::sync::RwLock<cqlite_core::schema::registry::SchemaRegistry>>> {
613        self.schema_registry.clone()
614    }
615
616    /// Set the schema registry
617    pub fn set_schema_registry(
618        &mut self,
619        registry: Option<Arc<tokio::sync::RwLock<cqlite_core::schema::registry::SchemaRegistry>>>,
620    ) {
621        self.schema_registry = registry;
622    }
623}