Skip to main content

cqlite_cli/repl/
engine.rs

1// Core REPL Engine Implementation
2//
3// The main REPL engine that coordinates command parsing, execution, and output formatting.
4// This is the central component that integrates with the existing CLI infrastructure.
5
6use super::{
7    commands, CommandParser, CommandType, CompletionEngine, ExecutionResult, HistoryManager,
8    OutputFormat, ParsedCommand, ReplError, ReplMode, ReplResult, ReplSession,
9};
10use crate::config::Config;
11use crate::status_metrics::{HealthIndicator, StatusMetrics, METRICS_REFRESH_INTERVAL};
12use colored::Colorize;
13use cqlite_core::{Database, QueryResult};
14use std::io::{self, IsTerminal, Write};
15use std::path::Path;
16
17/// Core REPL engine configuration
18#[derive(Debug, Clone)]
19pub struct ReplConfig {
20    /// REPL mode (basic, tui, interactive)
21    pub mode: ReplMode,
22    /// Enable command history
23    pub enable_history: bool,
24    /// Enable command completion
25    pub enable_completion: bool,
26    /// Enable colored output
27    pub enable_colors: bool,
28    /// Default output format
29    pub output_format: OutputFormat,
30    /// Maximum history size
31    pub max_history_size: usize,
32    /// Page size for results
33    pub page_size: usize,
34    /// Enable timing display
35    pub show_timing: bool,
36    /// Enable paging for large results
37    pub enable_paging: bool,
38    /// Prompt customization
39    pub prompt: String,
40    /// Secondary prompt for multi-line commands
41    pub prompt_continuation: String,
42    /// Show status line before prompt (Issue #242)
43    pub show_status_line: bool,
44}
45
46impl Default for ReplConfig {
47    fn default() -> Self {
48        Self {
49            mode: ReplMode::Interactive,
50            enable_history: true,
51            enable_completion: true,
52            enable_colors: true,
53            output_format: OutputFormat::Table,
54            max_history_size: 1000,
55            page_size: 50,
56            show_timing: false,
57            enable_paging: true,
58            prompt: "cqlite> ".to_string(),
59            prompt_continuation: "    -> ".to_string(),
60            show_status_line: true,
61        }
62    }
63}
64
65/// Main REPL engine
66pub struct ReplEngine {
67    /// REPL configuration
68    config: ReplConfig,
69    /// Command parser
70    parser: CommandParser,
71    /// Session state
72    session: ReplSession,
73    /// Command history manager
74    history: Option<HistoryManager>,
75    /// Completion engine
76    completion: Option<CompletionEngine>,
77    /// Current multi-line command buffer
78    command_buffer: String,
79    /// Whether we're in multi-line mode
80    in_multiline: bool,
81    /// Currently loaded schema paths (for refresh)
82    schema_paths: Vec<std::path::PathBuf>,
83    /// Cassandra version hint
84    version_hint: Option<String>,
85    /// Cached status metrics for status line (Issue #242)
86    cached_metrics: Option<StatusMetrics>,
87}
88
89impl ReplEngine {
90    /// Create a new REPL engine
91    pub fn new(
92        config: ReplConfig,
93        db_path: &Path,
94        app_config: Config,
95        database: Database,
96    ) -> ReplResult<Self> {
97        Self::with_schema_registry(config, db_path, app_config, database, None)
98    }
99
100    /// Create a new REPL engine with an optional pre-loaded SchemaRegistry
101    ///
102    /// This is used when the CLI performs ingestion at startup, allowing
103    /// the REPL to have immediate access to schema information for commands
104    /// like `:describe` without needing to run `:schema refresh` first.
105    pub fn with_schema_registry(
106        config: ReplConfig,
107        db_path: &Path,
108        app_config: Config,
109        database: Database,
110        schema_registry: Option<
111            std::sync::Arc<tokio::sync::RwLock<cqlite_core::schema::registry::SchemaRegistry>>,
112        >,
113    ) -> ReplResult<Self> {
114        let mut session = ReplSession::new(db_path, app_config, database)?;
115
116        // If schema registry was provided from startup ingestion, set it
117        if let Some(registry) = schema_registry {
118            session.set_schema_registry(Some(registry));
119        }
120
121        let parser = CommandParser::new();
122
123        let history = if config.enable_history {
124            Some(HistoryManager::new(config.max_history_size)?)
125        } else {
126            None
127        };
128
129        let completion = if config.enable_completion {
130            Some(CompletionEngine::new())
131        } else {
132            None
133        };
134
135        Ok(Self {
136            config,
137            parser,
138            session,
139            history,
140            completion,
141            command_buffer: String::new(),
142            in_multiline: false,
143            schema_paths: Vec::new(),
144            version_hint: None,
145            cached_metrics: None,
146        })
147    }
148
149    /// Start the REPL loop
150    pub async fn run(&mut self) -> ReplResult<()> {
151        // Initialize session (loads data dir, default keyspace, etc.)
152        self.session.initialize().await?;
153
154        self.display_startup_banner().await?;
155
156        match self.config.mode {
157            ReplMode::Basic => self.run_basic_repl().await,
158            ReplMode::Interactive => self.run_interactive_repl().await,
159            ReplMode::Tui => self.run_tui_repl().await,
160        }
161    }
162
163    /// Run basic REPL mode
164    async fn run_basic_repl(&mut self) -> ReplResult<()> {
165        let stdin = io::stdin();
166        let mut input = String::new();
167
168        loop {
169            // Display prompt (with status line)
170            self.display_prompt().await?;
171
172            // Read input
173            input.clear();
174            match stdin.read_line(&mut input) {
175                Ok(0) => break, // EOF
176                Ok(_) => {
177                    let trimmed = input.trim();
178                    if trimmed.is_empty() {
179                        continue;
180                    }
181
182                    match self.process_input(trimmed).await {
183                        Ok(ExecutionResult::Continue) => continue,
184                        Ok(ExecutionResult::Exit) => break,
185                        Ok(ExecutionResult::ExitWithCode(code)) => {
186                            // Convert exit code to appropriate ReplError
187                            return Err(match code {
188                                3 => ReplError::SchemaError("Schema error occurred".to_string()),
189                                4 => ReplError::DataDirectoryError(
190                                    "Data directory error occurred".to_string(),
191                                ),
192                                5 => {
193                                    ReplError::UnsupportedFeature("Unsupported feature".to_string())
194                                }
195                                _ => ReplError::Session(format!("Exit with code {}", code)),
196                            });
197                        }
198                        Err(e) => {
199                            // Print error but continue REPL (non-fatal errors)
200                            eprintln!("{} {}", "Error:".red().bold(), e);
201                            // For certain errors, we should exit instead of continuing
202                            if matches!(
203                                e,
204                                ReplError::SchemaError(_)
205                                    | ReplError::DataDirectoryError(_)
206                                    | ReplError::UnsupportedFeature(_)
207                            ) {
208                                return Err(e);
209                            }
210                            continue;
211                        }
212                    }
213                }
214                Err(e) => {
215                    eprintln!("{} Input error: {}", "Error:".red().bold(), e);
216                    break;
217                }
218            }
219        }
220
221        self.display_goodbye().await?;
222        Ok(())
223    }
224
225    /// Run interactive REPL mode (with enhanced features)
226    async fn run_interactive_repl(&mut self) -> ReplResult<()> {
227        use rustyline::error::ReadlineError;
228        use rustyline::DefaultEditor;
229
230        // Create rustyline editor
231        let mut rl: DefaultEditor = DefaultEditor::new()
232            .map_err(|e| ReplError::Session(format!("Failed to initialize line editor: {}", e)))?;
233
234        // Load history from file if persistent history is enabled
235        if self.history.is_some() {
236            // Try to get history file path from config
237            let history_file = self.session.config().repl.history_file.clone().or_else(|| {
238                // Fallback to default location in user's home directory
239                dirs::home_dir().map(|home| home.join(".cqlite_history"))
240            });
241
242            if let Some(ref path) = history_file {
243                // Load history from file (ignore errors if file doesn't exist)
244                let _ = rl.load_history(path);
245            }
246        }
247
248        // Display startup banner
249        self.display_startup_banner().await?;
250
251        loop {
252            // Display status line before prompt (not during multiline)
253            if !self.in_multiline {
254                self.display_status_line().await?;
255            }
256
257            // Get the appropriate prompt
258            let prompt = if self.in_multiline {
259                self.config.prompt_continuation.clone()
260            } else {
261                self.format_prompt()
262            };
263
264            // Read line with rustyline (supports arrow keys, history, etc.)
265            let readline = rl.readline(&prompt);
266
267            match readline {
268                Ok(line) => {
269                    let trimmed = line.trim();
270                    if trimmed.is_empty() {
271                        continue;
272                    }
273
274                    // Add to rustyline history
275                    let _ = rl.add_history_entry(&line);
276
277                    match self.process_input(trimmed).await {
278                        Ok(ExecutionResult::Continue) => continue,
279                        Ok(ExecutionResult::Exit) => break,
280                        Ok(ExecutionResult::ExitWithCode(code)) => {
281                            // Save history before exiting
282                            if let Some(history_file) =
283                                self.session.config().repl.history_file.clone().or_else(|| {
284                                    dirs::home_dir().map(|home| home.join(".cqlite_history"))
285                                })
286                            {
287                                let _ = rl.save_history(&history_file);
288                            }
289
290                            // Convert exit code to appropriate ReplError
291                            return Err(match code {
292                                3 => ReplError::SchemaError("Schema error occurred".to_string()),
293                                4 => ReplError::DataDirectoryError(
294                                    "Data directory error occurred".to_string(),
295                                ),
296                                5 => {
297                                    ReplError::UnsupportedFeature("Unsupported feature".to_string())
298                                }
299                                _ => ReplError::Session(format!("Exit with code {}", code)),
300                            });
301                        }
302                        Err(e) => {
303                            // Print error but continue REPL (non-fatal errors)
304                            eprintln!("{} {}", "Error:".red().bold(), e);
305                            // For certain errors, we should exit instead of continuing
306                            if matches!(
307                                e,
308                                ReplError::SchemaError(_)
309                                    | ReplError::DataDirectoryError(_)
310                                    | ReplError::UnsupportedFeature(_)
311                            ) {
312                                // Save history before exiting on fatal error
313                                if let Some(history_file) =
314                                    self.session.config().repl.history_file.clone().or_else(|| {
315                                        dirs::home_dir().map(|home| home.join(".cqlite_history"))
316                                    })
317                                {
318                                    let _ = rl.save_history(&history_file);
319                                }
320                                return Err(e);
321                            }
322                            continue;
323                        }
324                    }
325                }
326                Err(ReadlineError::Interrupted) => {
327                    // Ctrl-C pressed
328                    if self.in_multiline {
329                        // Cancel multi-line input
330                        self.reset_command_buffer();
331                        println!("^C");
332                        continue;
333                    } else {
334                        // Exit on Ctrl-C at regular prompt
335                        println!("^C");
336                        break;
337                    }
338                }
339                Err(ReadlineError::Eof) => {
340                    // Ctrl-D pressed (EOF)
341                    break;
342                }
343                Err(err) => {
344                    eprintln!("{} Readline error: {}", "Error:".red().bold(), err);
345                    break;
346                }
347            }
348        }
349
350        // Save history to file when exiting
351        if let Some(history_file) = self
352            .session
353            .config()
354            .repl
355            .history_file
356            .clone()
357            .or_else(|| dirs::home_dir().map(|home| home.join(".cqlite_history")))
358        {
359            // Ensure parent directory exists
360            if let Some(parent) = history_file.parent() {
361                let _ = std::fs::create_dir_all(parent);
362            }
363
364            // Save history
365            if let Err(e) = rl.save_history(&history_file) {
366                eprintln!("{} Failed to save history: {}", "Warning:".yellow(), e);
367            }
368        }
369
370        self.display_goodbye().await?;
371        Ok(())
372    }
373
374    /// Run TUI REPL mode
375    async fn run_tui_repl(&mut self) -> ReplResult<()> {
376        // Placeholder for TUI integration
377        // This would integrate with the existing tui.rs module
378        println!(
379            "{} TUI mode not yet implemented in core engine",
380            "Info:".cyan().bold()
381        );
382        self.run_interactive_repl().await
383    }
384
385    /// Process a line of input
386    pub fn process_input<'a>(
387        &'a mut self,
388        input: &'a str,
389    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ReplResult<ExecutionResult>> + 'a>>
390    {
391        Box::pin(async move { self.process_input_impl(input).await })
392    }
393
394    /// Internal implementation of process_input
395    async fn process_input_impl(&mut self, input: &str) -> ReplResult<ExecutionResult> {
396        // Handle multi-line commands
397        if self.should_continue_multiline(input) {
398            self.add_to_command_buffer(input);
399            return Ok(ExecutionResult::Continue);
400        }
401
402        // Complete command (either single line or end of multi-line)
403        let command = if self.in_multiline {
404            self.add_to_command_buffer(input);
405            let full_command = self.command_buffer.clone();
406            self.reset_command_buffer();
407            full_command
408        } else {
409            input.to_string()
410        };
411
412        // Add to history
413        if let Some(ref mut history) = self.history {
414            history.add_command(&command)?;
415        }
416
417        // Parse and execute command
418        match self.parser.parse(&command) {
419            Ok(parsed_command) => self.execute_command(parsed_command).await,
420            Err(e) => {
421                eprintln!("{} Command parsing error: {}", "Error:".red().bold(), e);
422                Ok(ExecutionResult::Continue)
423            }
424        }
425    }
426
427    /// Execute a parsed command
428    async fn execute_command(&mut self, command: ParsedCommand) -> ReplResult<ExecutionResult> {
429        match command.command_type {
430            CommandType::Exit => Ok(ExecutionResult::Exit),
431            CommandType::Help { topic } => {
432                self.execute_help_command(topic.as_deref()).await?;
433                Ok(ExecutionResult::Continue)
434            }
435            CommandType::Config { operation } => {
436                self.execute_config_command(operation).await?;
437                Ok(ExecutionResult::Continue)
438            }
439            CommandType::Tables => {
440                self.execute_tables_command().await?;
441                Ok(ExecutionResult::Continue)
442            }
443            CommandType::Describe { object_name } => {
444                self.execute_describe_command(&object_name).await?;
445                Ok(ExecutionResult::Continue)
446            }
447            CommandType::Use { keyspace } => {
448                self.execute_use_command(&keyspace).await?;
449                Ok(ExecutionResult::Continue)
450            }
451            CommandType::CqlQuery { query } => {
452                self.execute_cql_query(&query).await?;
453                Ok(ExecutionResult::Continue)
454            }
455            CommandType::Clear => {
456                self.execute_clear_command().await?;
457                Ok(ExecutionResult::Continue)
458            }
459            CommandType::History => {
460                self.execute_history_command().await?;
461                Ok(ExecutionResult::Continue)
462            }
463            CommandType::Source { file_path } => {
464                self.execute_source_command(&file_path).await?;
465                Ok(ExecutionResult::Continue)
466            }
467            CommandType::Status => {
468                // Get schema registry from session
469                let schema_registry = self.session.schema_registry();
470                commands::execute_status(self.session.data_dir(), schema_registry)
471                    .await
472                    .map_err(|e| {
473                        let err_msg = e.to_string().to_lowercase();
474                        if err_msg.contains("requires state_machine feature") {
475                            ReplError::UnsupportedFeature(e.to_string())
476                        } else if err_msg.contains("data directory") {
477                            ReplError::DataDirectoryError(e.to_string())
478                        } else {
479                            ReplError::Database(e)
480                        }
481                    })?;
482                Ok(ExecutionResult::Continue)
483            }
484            CommandType::Health => {
485                // Get configuration parameters for health checks
486                // Note: config_file path is not tracked in session, so we pass None
487                commands::execute_health(
488                    self.session.data_dir(),
489                    None, // Config file path not tracked in session
490                    self.config.page_size,
491                    self.config.show_timing,
492                    self.config.enable_colors,
493                )
494                .await
495                .map_err(|e| ReplError::Database(e))?;
496                Ok(ExecutionResult::Continue)
497            }
498            CommandType::Keyspaces => {
499                commands::execute_keyspaces(self.session.data_dir())
500                    .await
501                    .map_err(|e| {
502                        let err_msg = e.to_string().to_lowercase();
503                        if err_msg.contains("requires state_machine feature") {
504                            ReplError::UnsupportedFeature(e.to_string())
505                        } else if err_msg.contains("data directory") {
506                            ReplError::DataDirectoryError(e.to_string())
507                        } else {
508                            ReplError::Database(e)
509                        }
510                    })?;
511                Ok(ExecutionResult::Continue)
512            }
513            CommandType::Schema { operation } => {
514                self.execute_schema_command(operation).await?;
515                Ok(ExecutionResult::Continue)
516            }
517            CommandType::Unknown { input } => {
518                eprintln!("{} Unknown command: {}", "Error:".red().bold(), input);
519                println!("Type {} for help", ":help".green());
520                Ok(ExecutionResult::Continue)
521            }
522            // Issue #392: Write commands (placeholder implementations)
523            CommandType::Flush => {
524                eprintln!(
525                    "{} Flush command requires write mode. Start REPL with --writable --write-dir <path>",
526                    "Error:".red().bold()
527                );
528                Ok(ExecutionResult::Continue)
529            }
530            CommandType::WriteStats => {
531                eprintln!(
532                    "{} Write stats command requires write mode. Start REPL with --writable --write-dir <path>",
533                    "Error:".red().bold()
534                );
535                Ok(ExecutionResult::Continue)
536            }
537            CommandType::Maintenance { budget_ms } => {
538                let _ = budget_ms;
539                eprintln!(
540                    "{} Maintenance command requires write mode. Start REPL with --writable --write-dir <path>",
541                    "Error:".red().bold()
542                );
543                Ok(ExecutionResult::Continue)
544            }
545        }
546    }
547
548    /// Check if input should continue multi-line command
549    fn should_continue_multiline(&self, input: &str) -> bool {
550        // Continue if we're already in multi-line mode and line doesn't end with semicolon
551        if self.in_multiline {
552            return !input.trim_end().ends_with(';');
553        }
554
555        // Start multi-line mode for certain SQL keywords without semicolon
556        let trimmed = input.trim();
557        let sql_keywords = [
558            "SELECT", "INSERT", "UPDATE", "DELETE", "CREATE", "ALTER", "DROP",
559        ];
560
561        sql_keywords
562            .iter()
563            .any(|keyword| trimmed.to_uppercase().starts_with(keyword) && !trimmed.ends_with(';'))
564    }
565
566    /// Add input to command buffer
567    fn add_to_command_buffer(&mut self, input: &str) {
568        if !self.in_multiline {
569            self.in_multiline = true;
570            self.command_buffer.clear();
571        }
572
573        if !self.command_buffer.is_empty() {
574            self.command_buffer.push(' ');
575        }
576        self.command_buffer.push_str(input);
577    }
578
579    /// Reset command buffer
580    fn reset_command_buffer(&mut self) {
581        self.command_buffer.clear();
582        self.in_multiline = false;
583    }
584
585    /// Display REPL prompt with optional status line (Issue #242)
586    async fn display_prompt(&mut self) -> ReplResult<()> {
587        // Display status line before prompt (not during multiline)
588        if !self.in_multiline {
589            self.display_status_line().await?;
590        }
591
592        let prompt = if self.in_multiline {
593            self.config.prompt_continuation.clone()
594        } else {
595            self.format_prompt()
596        };
597
598        print!("{}", prompt);
599        io::stdout().flush().map_err(ReplError::Io)?;
600        Ok(())
601    }
602
603    /// Display status line with health, memory, and data metrics (Issue #242)
604    async fn display_status_line(&mut self) -> ReplResult<()> {
605        // Skip if disabled or colors are off or not a TTY
606        if !self.config.show_status_line || !self.config.enable_colors {
607            return Ok(());
608        }
609
610        // Check if stdout is a TTY (skip for piped output)
611        if !io::stdout().is_terminal() {
612            return Ok(());
613        }
614
615        // Refresh metrics if stale (METRICS_REFRESH_INTERVAL cache)
616        let needs_refresh = self
617            .cached_metrics
618            .as_ref()
619            .map(|m: &StatusMetrics| m.is_stale(METRICS_REFRESH_INTERVAL))
620            .unwrap_or(true);
621
622        if needs_refresh {
623            self.cached_metrics = Some(
624                StatusMetrics::collect(self.session.data_dir(), self.session.database()).await,
625            );
626        }
627
628        // Format and display status line
629        if let Some(ref metrics) = self.cached_metrics {
630            let health_str = match metrics.health {
631                HealthIndicator::Ok => "OK".green().to_string(),
632                HealthIndicator::Warning => "WARN".yellow().to_string(),
633                HealthIndicator::Error => "ERR".red().to_string(),
634            };
635
636            println!(
637                "[{}] Mem: {} | Data: {}",
638                health_str,
639                metrics.format_memory().cyan(),
640                metrics.format_data().cyan()
641            );
642        }
643
644        Ok(())
645    }
646
647    /// Format the main prompt with context
648    fn format_prompt(&self) -> String {
649        let mut prompt = String::new();
650
651        // Add keyspace if set
652        if let Some(ref keyspace) = self.session.current_keyspace() {
653            prompt.push_str(&format!("{}@", keyspace.cyan()));
654        }
655
656        // Add base prompt
657        prompt.push_str("cqlite");
658
659        // Add mode indicator for non-basic modes
660        match self.config.mode {
661            ReplMode::Tui => prompt.push_str("[tui]"),
662            ReplMode::Interactive => prompt.push_str("[i]"),
663            ReplMode::Basic => {}
664        }
665
666        prompt.push_str(&"> ".blue().bold().to_string());
667        prompt
668    }
669
670    /// Display startup banner
671    async fn display_startup_banner(&self) -> ReplResult<()> {
672        if !self.config.enable_colors {
673            println!("CQLite Interactive Shell");
674            println!("Type :help for help, :quit to exit");
675            return Ok(());
676        }
677
678        println!(
679            "{}",
680            "╔═══════════════════════════════════════════════╗".cyan()
681        );
682        println!(
683            "{}",
684            "║           CQLite REPL Engine v2.0            ║"
685                .cyan()
686                .bold()
687        );
688        println!(
689            "{}",
690            "║      High-Performance Cassandra Reader       ║".cyan()
691        );
692        println!(
693            "{}",
694            "╚═══════════════════════════════════════════════╝".cyan()
695        );
696        println!();
697
698        println!(
699            "🗄️  Database: {}",
700            self.session.db_path().display().to_string().yellow()
701        );
702        println!("🔧 Mode: {}", format!("{:?}", self.config.mode).green());
703        println!("📊 Engine: {}", "CQLite Core v0.1.0".green());
704
705        if let Some(ref keyspace) = self.session.current_keyspace() {
706            println!("📦 Keyspace: {}", keyspace.yellow());
707        }
708
709        println!();
710        println!("{}", "Quick Commands:".cyan().bold());
711        println!("  • {} - Show help", ":help".green());
712        println!("  • {} - List tables", ":tables".green());
713        println!("  • {} - Execute CQL", "SELECT * FROM table;".yellow());
714        println!("  • {} - Exit", ":quit".red());
715        println!();
716
717        Ok(())
718    }
719
720    /// Display goodbye message
721    async fn display_goodbye(&self) -> ReplResult<()> {
722        if self.config.enable_colors {
723            println!();
724            println!("{}", "Goodbye! Thank you for using CQLite.".cyan().bold());
725        } else {
726            println!("Goodbye!");
727        }
728        Ok(())
729    }
730
731    /// Execute help command
732    async fn execute_help_command(&self, topic: Option<&str>) -> ReplResult<()> {
733        match topic {
734            Some("commands") => self.show_commands_help(),
735            Some("config") => self.show_config_help(),
736            Some("cql") => self.show_cql_help(),
737            Some("examples") => self.show_examples_help(),
738            None => self.show_general_help(),
739            Some(unknown) => {
740                println!("{} Unknown help topic: {}", "Error:".red().bold(), unknown);
741                println!("Available topics: commands, config, cql, examples");
742            }
743        }
744        Ok(())
745    }
746
747    /// Execute config command
748    async fn execute_config_command(&mut self, operation: String) -> ReplResult<()> {
749        // Issue #143: Display merged effective configuration (read-only in M2)
750        if operation.is_empty() || operation == "show" {
751            self.show_current_config();
752        } else {
753            println!(
754                "{} Configuration is read-only in M2.",
755                "Note:".yellow().bold()
756            );
757            println!("Use CLI flags, environment variables, or config files to modify settings.");
758            println!();
759            self.show_current_config();
760        }
761        Ok(())
762    }
763
764    /// Execute tables command
765    ///
766    /// Uses DiscoveryService to scan the data directory for tables,
767    /// consistent with the :status command behavior.
768    async fn execute_tables_command(&mut self) -> ReplResult<()> {
769        println!("{}", "Listing tables...".cyan().bold());
770
771        #[cfg(not(feature = "state_machine"))]
772        {
773            return Err(ReplError::UnsupportedFeature(
774                "Tables command requires state_machine feature".to_string(),
775            ));
776        }
777
778        #[cfg(feature = "state_machine")]
779        {
780            use cqlite_core::discovery::DiscoveryService;
781
782            let Some(data_dir) = self.session.data_dir() else {
783                println!("No tables found");
784                println!("Configure data directory with :config data-dir <PATH>");
785                return Ok(());
786            };
787
788            let discovery_service = DiscoveryService::new(data_dir.to_path_buf(), None);
789
790            let summary = discovery_service.scan().await.map_err(|e| {
791                ReplError::DataDirectoryError(format!("Failed to scan for tables: {}", e))
792            })?;
793
794            // Filter by current keyspace if set
795            let tables: Vec<&String> = if let Some(keyspace) = self.session.current_keyspace() {
796                let prefix = format!("{}.", keyspace);
797                summary
798                    .tables
799                    .iter()
800                    .filter(|t| t.starts_with(&prefix))
801                    .collect()
802            } else {
803                summary.tables.iter().collect()
804            };
805
806            // Display any warnings about directory structure
807            for warning in &summary.warnings {
808                println!("{}", warning.yellow());
809                println!();
810            }
811
812            if tables.is_empty() {
813                println!("No tables found");
814                if self.session.current_keyspace().is_some() {
815                    println!("Try :use to switch keyspaces or run :tables without USE");
816                }
817            } else {
818                for table in tables {
819                    println!("  {}", table.green());
820                }
821            }
822
823            Ok(())
824        }
825    }
826
827    /// Execute describe command
828    ///
829    /// Uses SchemaRegistry to look up table schema information,
830    /// consistent with how other REPL commands access schema data.
831    async fn execute_describe_command(&mut self, object_name: &str) -> ReplResult<()> {
832        println!(
833            "{} {}",
834            "🔍 Describing:".cyan().bold(),
835            object_name.yellow()
836        );
837
838        #[cfg(not(feature = "state_machine"))]
839        {
840            return Err(ReplError::UnsupportedFeature(
841                "Describe command requires state_machine feature".to_string(),
842            ));
843        }
844
845        #[cfg(feature = "state_machine")]
846        {
847            // Parse object_name into keyspace.table
848            let (keyspace, table) = if object_name.contains('.') {
849                let parts: Vec<&str> = object_name.split('.').collect();
850                if parts.len() == 2 {
851                    (Some(parts[0].to_string()), parts[1])
852                } else if parts.len() > 2 {
853                    eprintln!(
854                        "{} Invalid object name format: '{}'. Use keyspace.table or table",
855                        "Error:".red().bold(),
856                        object_name
857                    );
858                    return Ok(());
859                } else {
860                    (self.session.current_keyspace().cloned(), object_name)
861                }
862            } else {
863                (self.session.current_keyspace().cloned(), object_name)
864            };
865
866            let Some(ks) = keyspace else {
867                eprintln!(
868                    "{} No keyspace specified and no current keyspace set",
869                    "Error:".red().bold()
870                );
871                println!("💡 Try :use <keyspace> first, or use keyspace.table format");
872                return Ok(());
873            };
874
875            // Try to get schema from registry
876            if let Some(registry) = self.session.schema_registry() {
877                let registry_guard = registry.read().await;
878                match registry_guard.get_schema(&ks, table).await {
879                    Ok(schema) => {
880                        let description = self.format_table_description(&schema);
881                        println!("{}", description);
882                    }
883                    Err(e) => {
884                        eprintln!(
885                            "{} Table '{}.{}' not found: {}",
886                            "Error:".red().bold(),
887                            ks,
888                            table,
889                            e
890                        );
891                        println!("💡 Try :tables to list available tables");
892                    }
893                }
894            } else {
895                eprintln!("{} Schema registry not available", "Error:".red().bold());
896                println!("💡 Ensure schema was loaded with --schema flag");
897            }
898
899            Ok(())
900        }
901    }
902
903    /// Format a TableSchema as human-readable description
904    #[cfg(feature = "state_machine")]
905    fn format_table_description(&self, schema: &cqlite_core::schema::TableSchema) -> String {
906        use std::fmt::Write;
907        let mut output = String::new();
908
909        // Note: writeln! to String cannot fail, but we use let _ = for lint compliance
910        let _ = writeln!(output, "Table: {}.{}\n", schema.keyspace, schema.table);
911        let _ = writeln!(output, "Columns:");
912
913        // Collect all columns with their roles
914        let mut all_columns: Vec<(String, String, String)> = Vec::new();
915
916        // Partition keys
917        for pk in schema.ordered_partition_keys() {
918            all_columns.push((
919                pk.name.clone(),
920                pk.data_type.clone(),
921                "PARTITION KEY".to_string(),
922            ));
923        }
924
925        // Clustering keys
926        for ck in schema.ordered_clustering_keys() {
927            let order_str = match ck.order {
928                cqlite_core::schema::ClusteringOrder::Asc => "CLUSTERING KEY".to_string(),
929                cqlite_core::schema::ClusteringOrder::Desc => "CLUSTERING KEY DESC".to_string(),
930            };
931            all_columns.push((ck.name.clone(), ck.data_type.clone(), order_str));
932        }
933
934        // Regular columns
935        for col in &schema.columns {
936            // Skip if already listed as key
937            let is_key = schema.partition_keys.iter().any(|k| k.name == col.name)
938                || schema.clustering_keys.iter().any(|k| k.name == col.name);
939            if !is_key {
940                all_columns.push((col.name.clone(), col.data_type.clone(), String::new()));
941            }
942        }
943
944        // Calculate column widths for alignment
945        let name_width = all_columns
946            .iter()
947            .map(|(n, _, _)| n.len())
948            .max()
949            .unwrap_or(10)
950            .max(10);
951        let type_width = all_columns
952            .iter()
953            .map(|(_, t, _)| t.len())
954            .max()
955            .unwrap_or(10)
956            .max(10);
957
958        // Print aligned columns
959        for (name, dtype, role) in all_columns {
960            if role.is_empty() {
961                let _ = writeln!(output, "  {:<name_width$}  {:<type_width$}", name, dtype);
962            } else {
963                let _ = writeln!(
964                    output,
965                    "  {:<name_width$}  {:<type_width$}  ({})",
966                    name, dtype, role
967                );
968            }
969        }
970
971        output
972    }
973
974    /// Execute use command
975    async fn execute_use_command(&mut self, keyspace: &str) -> ReplResult<()> {
976        match self.session.use_keyspace(keyspace).await {
977            Ok(()) => {
978                println!(
979                    "{} Now using keyspace: {}",
980                    "✅".green(),
981                    keyspace.yellow().bold()
982                );
983            }
984            Err(e) => {
985                eprintln!(
986                    "{} Failed to use keyspace {}: {}",
987                    "Error:".red().bold(),
988                    keyspace,
989                    e
990                );
991            }
992        }
993        Ok(())
994    }
995
996    /// Execute CQL query
997    async fn execute_cql_query(&mut self, query: &str) -> ReplResult<()> {
998        let start_time = std::time::Instant::now();
999
1000        println!("{} {}", "🔍 Executing:".blue().bold(), query.yellow());
1001
1002        match self.session.execute_query(query).await {
1003            Ok(result) => {
1004                let elapsed = start_time.elapsed();
1005                self.display_query_result(&result)?;
1006
1007                if self.config.show_timing {
1008                    println!();
1009                    println!(
1010                        "{} {:.2}ms",
1011                        "⏱️  Execution time:".green(),
1012                        elapsed.as_millis()
1013                    );
1014                }
1015            }
1016            Err(e) => {
1017                let elapsed = start_time.elapsed();
1018                eprintln!(
1019                    "{} Query failed after {:.2}ms",
1020                    "❌ Error:".red().bold(),
1021                    elapsed.as_millis()
1022                );
1023                eprintln!("  {}", e.to_string().red());
1024                self.provide_query_hints(query, &e);
1025            }
1026        }
1027
1028        Ok(())
1029    }
1030
1031    /// Execute clear command
1032    async fn execute_clear_command(&self) -> ReplResult<()> {
1033        print!("\\x1B[2J\\x1B[1;1H");
1034        io::stdout().flush().map_err(ReplError::Io)?;
1035        Ok(())
1036    }
1037
1038    /// Execute history command
1039    async fn execute_history_command(&self) -> ReplResult<()> {
1040        if let Some(ref history) = self.history {
1041            println!("{}", "📜 Command History".cyan().bold());
1042            println!("{}", "═".repeat(20).cyan());
1043
1044            let commands = history.recent_commands(20);
1045            if commands.is_empty() {
1046                println!("📭 No commands in history");
1047            } else {
1048                for (i, cmd) in commands.iter().enumerate() {
1049                    println!("  {:3}. {}", i + 1, cmd);
1050                }
1051            }
1052        } else {
1053            println!("{} History is disabled", "Info:".cyan().bold());
1054        }
1055        Ok(())
1056    }
1057
1058    /// Execute source command
1059    async fn execute_source_command(&mut self, file_path: &str) -> ReplResult<()> {
1060        println!(
1061            "{} Executing commands from: {}",
1062            "📂".cyan(),
1063            file_path.yellow()
1064        );
1065
1066        let path = std::path::Path::new(file_path);
1067        if !path.exists() {
1068            eprintln!("{} File not found: {}", "Error:".red().bold(), file_path);
1069            return Ok(());
1070        }
1071
1072        let content = std::fs::read_to_string(path).map_err(|e| ReplError::Io(e))?;
1073
1074        let mut executed = 0;
1075        let errors = 0;
1076
1077        for (line_num, line) in content.lines().enumerate() {
1078            let trimmed = line.trim();
1079
1080            // Skip empty lines and comments
1081            if trimmed.is_empty() || trimmed.starts_with("--") || trimmed.starts_with("#") {
1082                continue;
1083            }
1084
1085            println!(
1086                "{}:{} {}",
1087                file_path,
1088                line_num + 1,
1089                trimmed.to_string().dimmed()
1090            );
1091
1092            match self.process_input(trimmed).await? {
1093                ExecutionResult::Continue => executed += 1,
1094                ExecutionResult::Exit => {
1095                    println!("🛑 Execution stopped due to exit command");
1096                    break;
1097                }
1098                ExecutionResult::ExitWithCode(_) => {
1099                    println!("🛑 Execution stopped");
1100                    break;
1101                }
1102            }
1103        }
1104
1105        println!();
1106        println!(
1107            "📊 File execution completed: {} commands executed, {} errors",
1108            executed, errors
1109        );
1110        Ok(())
1111    }
1112
1113    /// Display query result
1114    fn display_query_result(&self, result: &QueryResult) -> ReplResult<()> {
1115        match self.config.output_format {
1116            OutputFormat::Table => self.display_table_result(result),
1117            OutputFormat::Csv => self.display_csv_result(result),
1118            OutputFormat::Json => self.display_json_result(result),
1119            OutputFormat::Raw => self.display_raw_result(result),
1120        }
1121    }
1122
1123    /// Display result in table format
1124    fn display_table_result(&self, result: &QueryResult) -> ReplResult<()> {
1125        use crate::config::OutputConfig;
1126        use crate::output::TableWriter;
1127
1128        if result.rows.is_empty() {
1129            if result.rows_affected > 0 {
1130                println!(
1131                    "{} {} rows affected",
1132                    "✅".green().bold(),
1133                    result.rows_affected
1134                );
1135            } else {
1136                println!("{} No rows returned", "📭".yellow());
1137            }
1138            return Ok(());
1139        }
1140
1141        // Build output config from REPL config
1142        let output_config = OutputConfig {
1143            color_enabled: self.config.enable_colors,
1144            limit: None,
1145            page_size: None,
1146            target: crate::output::OutputTarget::Stdout,
1147            overwrite: false,
1148        };
1149
1150        // Format using TableWriter
1151        let formatted = TableWriter::write(result, &output_config)
1152            .map_err(|e| ReplError::Session(format!("Failed to format table output: {}", e)))?;
1153
1154        println!("{}", formatted);
1155
1156        Ok(())
1157    }
1158
1159    /// Display result in CSV format
1160    fn display_csv_result(&self, result: &QueryResult) -> ReplResult<()> {
1161        use crate::config::OutputConfig;
1162        use crate::output::CSVWriter;
1163
1164        // Build output config from REPL config
1165        let output_config = OutputConfig {
1166            color_enabled: self.config.enable_colors,
1167            limit: None, // REPL doesn't use CLI limit
1168            page_size: None,
1169            target: crate::output::OutputTarget::Stdout,
1170            overwrite: false,
1171        };
1172
1173        let formatted = CSVWriter::write(result, &output_config)
1174            .map_err(|e| ReplError::Session(format!("Failed to format CSV output: {}", e)))?;
1175
1176        println!("{}", formatted);
1177
1178        Ok(())
1179    }
1180
1181    /// Display result in JSON format
1182    fn display_json_result(&self, result: &QueryResult) -> ReplResult<()> {
1183        use crate::config::OutputConfig;
1184        use crate::output::JSONWriter;
1185
1186        // Build output config from REPL config
1187        let output_config = OutputConfig {
1188            color_enabled: self.config.enable_colors,
1189            limit: None, // REPL doesn't use CLI limit
1190            page_size: None,
1191            target: crate::output::OutputTarget::Stdout,
1192            overwrite: false,
1193        };
1194
1195        let formatted = JSONWriter::write(result, &output_config)
1196            .map_err(|e| ReplError::Session(format!("Failed to format JSON output: {}", e)))?;
1197
1198        println!("{}", formatted);
1199
1200        Ok(())
1201    }
1202
1203    /// Display result in raw format
1204    fn display_raw_result(&self, _result: &QueryResult) -> ReplResult<()> {
1205        println!("Raw output not yet implemented");
1206        Ok(())
1207    }
1208
1209    /// Provide helpful hints for query errors
1210    fn provide_query_hints(&self, _query: &str, error: &ReplError) {
1211        let error_msg = error.to_string();
1212
1213        println!();
1214        if error_msg.contains("table") && error_msg.contains("not found") {
1215            println!("{} Table not found. Try:", "💡 Hint:".cyan().bold());
1216            println!("  • {} to list tables", ":tables".green());
1217            println!("  • Check table name spelling");
1218        } else if error_msg.contains("syntax") {
1219            println!("{} Syntax error. Try:", "💡 Hint:".cyan().bold());
1220            println!("  • {} for CQL help", ":help cql".green());
1221            println!("  • Check query syntax");
1222        } else {
1223            println!(
1224                "{} For general help: {}",
1225                "💡 Hint:".cyan().bold(),
1226                ":help".green()
1227            );
1228        }
1229    }
1230
1231    /// Show general help
1232    fn show_general_help(&self) {
1233        println!("{}", "CQLite REPL Help".cyan().bold());
1234        println!("{}", "═".repeat(20).cyan());
1235        println!();
1236        println!("Commands:");
1237        println!("  :help [topic]    Show help (topics: commands, config, cql, examples)");
1238        println!("  :quit, :exit     Exit the REPL");
1239        println!("  :tables          List all tables");
1240        println!("  :describe <obj>  Describe object");
1241        println!("  :use <keyspace>  Switch keyspace");
1242        println!("  :config [op]     Show/set configuration");
1243        println!("  :status          Show discovery and schema coverage status");
1244        println!("  :health          Show health diagnostics");
1245        println!("  :clear           Clear screen");
1246        println!("  :history         Show command history");
1247        println!("  :source <file>   Execute commands from file");
1248        println!();
1249        println!("CQL queries can be executed directly (end with semicolon for multi-line)");
1250    }
1251
1252    /// Show commands help
1253    fn show_commands_help(&self) {
1254        println!("{}", "Available Commands".cyan().bold());
1255        println!("{}", "═".repeat(20).cyan());
1256        println!();
1257        println!("Meta Commands:");
1258        println!("  :help            Show this help");
1259        println!("  :quit, :exit     Exit REPL");
1260        println!("  :clear           Clear screen");
1261        println!("  :history         Show recent commands");
1262        println!();
1263        println!("Database Commands:");
1264        println!("  :tables          List all tables");
1265        println!("  :describe <obj>  Show object schema");
1266        println!("  :use <keyspace>  Switch to keyspace");
1267        println!("  :status          Show discovery and schema coverage status");
1268        println!("  :health          Show health diagnostics");
1269        println!();
1270        println!("File Commands:");
1271        println!("  :source <file>   Execute CQL file");
1272        println!();
1273        println!("Configuration:");
1274        println!("  :config          Show merged effective configuration (read-only)");
1275    }
1276
1277    /// Show config help
1278    fn show_config_help(&self) {
1279        println!("{}", "Configuration Help".cyan().bold());
1280        println!("{}", "═".repeat(20).cyan());
1281        println!();
1282        println!("View merged effective configuration (read-only):");
1283        println!("  :config                    Display all configuration settings");
1284        println!();
1285        println!("The :config command shows:");
1286        println!("  • Data & Schema settings (data_directory, schema_paths, default_keyspace)");
1287        println!("  • Output settings (output_mode, query_limit, colors)");
1288        println!("  • REPL settings (page_size, show_timing, history, completion)");
1289        println!(
1290            "  • Precedence chain (CLI > ENV > --config > .cqlite.toml > user config > defaults)"
1291        );
1292        println!();
1293        println!("Note: Configuration is read-only in M2. Use CLI flags, environment");
1294        println!("      variables, or config files to modify settings.");
1295    }
1296
1297    /// Show CQL help
1298    fn show_cql_help(&self) {
1299        println!("{}", "CQL Query Help".cyan().bold());
1300        println!("{}", "═".repeat(20).cyan());
1301        println!();
1302        println!("Supported CQL:");
1303        println!("  SELECT * FROM table;");
1304        println!("  SELECT col1, col2 FROM table WHERE condition;");
1305        println!("  DESCRIBE TABLE table_name;");
1306        println!();
1307        println!("Multi-line queries:");
1308        println!("  Start typing a query and press Enter");
1309        println!("  Continue on next lines");
1310        println!("  End with semicolon (;) to execute");
1311    }
1312
1313    /// Show examples help
1314    fn show_examples_help(&self) {
1315        println!("{}", "Usage Examples".cyan().bold());
1316        println!("{}", "═".repeat(20).cyan());
1317        println!();
1318        println!("Basic workflow:");
1319        println!("  :tables");
1320        println!("  :describe users");
1321        println!("  SELECT * FROM users LIMIT 5;");
1322        println!();
1323        println!("Configuration:");
1324        println!("  :config output_format=json");
1325        println!("  :config show_timing=true");
1326        println!();
1327        println!("File execution:");
1328        println!("  :source /path/to/queries.sql");
1329    }
1330
1331    /// Show current configuration (Issue #143: Display merged effective config)
1332    fn show_current_config(&self) {
1333        let cli_config = self.session.config();
1334
1335        println!("{}", "Effective Configuration".cyan().bold());
1336        println!("{}", "═".repeat(60).cyan());
1337        println!();
1338
1339        // Data and Schema Configuration
1340        println!("{}", "Data & Schema:".yellow().bold());
1341        if let Some(ref data_dir) = cli_config.data_directory {
1342            println!("  data_directory       = {}", data_dir.display());
1343        } else {
1344            println!("  data_directory       = {}", "<not set>".dimmed());
1345        }
1346
1347        if !cli_config.schema_paths.is_empty() {
1348            let paths: Vec<String> = cli_config
1349                .schema_paths
1350                .iter()
1351                .map(|p| p.display().to_string())
1352                .collect();
1353            println!("  schema_paths         = [{}]", paths.join(", "));
1354        } else {
1355            println!("  schema_paths         = {}", "[]".dimmed());
1356        }
1357
1358        if let Some(ref keyspace) = cli_config.default_keyspace {
1359            println!("  default_keyspace     = {}", keyspace);
1360        } else {
1361            println!("  default_keyspace     = {}", "<not set>".dimmed());
1362        }
1363        println!();
1364
1365        // Output Configuration
1366        println!("{}", "Output Settings:".yellow().bold());
1367        if let Some(ref mode) = cli_config.output_mode {
1368            println!("  output_mode          = {}", mode);
1369        } else {
1370            println!("  output_mode          = {}", "table".dimmed());
1371        }
1372
1373        if let Some(limit) = cli_config.query_limit {
1374            println!("  query_limit          = {}", limit);
1375        } else {
1376            println!("  query_limit          = {}", "<unlimited>".dimmed());
1377        }
1378
1379        println!("  no_color             = {}", cli_config.no_color);
1380        println!("  colors               = {}", cli_config.output.colors);
1381
1382        if let Some(max_rows) = cli_config.output.max_rows {
1383            println!("  max_rows             = {}", max_rows);
1384        } else {
1385            println!("  max_rows             = {}", "<unlimited>".dimmed());
1386        }
1387        println!();
1388
1389        // REPL Settings
1390        println!("{}", "REPL Settings:".yellow().bold());
1391        println!("  mode                 = {:?}", self.config.mode);
1392        println!("  output_format        = {:?}", self.config.output_format);
1393        println!("  page_size            = {}", self.config.page_size);
1394        println!("  show_timing          = {}", self.config.show_timing);
1395        println!("  enable_paging        = {}", self.config.enable_paging);
1396        println!("  enable_colors        = {}", self.config.enable_colors);
1397        println!(
1398            "  history              = {}",
1399            if self.history.is_some() {
1400                "enabled"
1401            } else {
1402                "disabled"
1403            }
1404        );
1405        println!(
1406            "  completion           = {}",
1407            if self.completion.is_some() {
1408                "enabled"
1409            } else {
1410                "disabled"
1411            }
1412        );
1413        println!();
1414
1415        // Precedence Information
1416        println!("{}", "Precedence Chain:".yellow().bold());
1417        println!("  {}", "CLI flags > Environment variables > Explicit config (--config) > Project config (./.cqlite.toml) > User config > Defaults".dimmed());
1418    }
1419
1420    /// Set configuration value
1421    async fn set_config_value(&mut self, key: &str, value: &str) -> ReplResult<()> {
1422        match key {
1423            "data-dir" | "data_dir" => {
1424                let data_dir = std::path::PathBuf::from(value);
1425
1426                // Validate directory exists
1427                if !data_dir.exists() {
1428                    println!(
1429                        "{} Directory does not exist: {}",
1430                        "Error:".red().bold(),
1431                        value
1432                    );
1433                    return Ok(());
1434                }
1435
1436                if !data_dir.is_dir() {
1437                    println!(
1438                        "{} Path is not a directory: {}",
1439                        "Error:".red().bold(),
1440                        value
1441                    );
1442                    return Ok(());
1443                }
1444
1445                println!(
1446                    "{} Changing data directory to: {}",
1447                    "Info:".cyan(),
1448                    data_dir.display().to_string().yellow()
1449                );
1450
1451                // Rebuild database with new data directory
1452                self.rebuild_database_from_discovery(
1453                    data_dir,
1454                    self.schema_paths.clone(),
1455                    self.version_hint.clone(),
1456                )
1457                .await?;
1458            }
1459            "output_format" => {
1460                self.config.output_format = match value.to_lowercase().as_str() {
1461                    "table" => OutputFormat::Table,
1462                    "csv" => OutputFormat::Csv,
1463                    "json" => OutputFormat::Json,
1464                    "raw" => OutputFormat::Raw,
1465                    _ => {
1466                        println!(
1467                            "{} Invalid output format. Use: table, csv, json, raw",
1468                            "Error:".red().bold()
1469                        );
1470                        return Ok(());
1471                    }
1472                };
1473                println!(
1474                    "{} Output format set to: {:?}",
1475                    "✅".green(),
1476                    self.config.output_format
1477                );
1478            }
1479            "page_size" => match value.parse::<usize>() {
1480                Ok(size) if size > 0 => {
1481                    self.config.page_size = size;
1482                    println!("{} Page size set to: {}", "✅".green(), size);
1483                }
1484                _ => {
1485                    println!(
1486                        "{} Invalid page size. Must be positive number",
1487                        "Error:".red().bold()
1488                    );
1489                }
1490            },
1491            "show_timing" => match value.to_lowercase().as_str() {
1492                "true" | "on" | "1" | "yes" => {
1493                    self.config.show_timing = true;
1494                    println!("{} Timing enabled", "✅".green());
1495                }
1496                "false" | "off" | "0" | "no" => {
1497                    self.config.show_timing = false;
1498                    println!("{} Timing disabled", "✅".green());
1499                }
1500                _ => {
1501                    println!(
1502                        "{} Invalid boolean value. Use: true/false",
1503                        "Error:".red().bold()
1504                    );
1505                }
1506            },
1507            "enable_paging" => match value.to_lowercase().as_str() {
1508                "true" | "on" | "1" | "yes" => {
1509                    self.config.enable_paging = true;
1510                    println!("{} Paging enabled", "✅".green());
1511                }
1512                "false" | "off" | "0" | "no" => {
1513                    self.config.enable_paging = false;
1514                    println!("{} Paging disabled", "✅".green());
1515                }
1516                _ => {
1517                    println!(
1518                        "{} Invalid boolean value. Use: true/false",
1519                        "Error:".red().bold()
1520                    );
1521                }
1522            },
1523            _ => {
1524                println!(
1525                    "{} Unknown configuration key: {}",
1526                    "Error:".red().bold(),
1527                    key
1528                );
1529                println!("Available keys: data-dir, output_format, page_size, show_timing, enable_paging");
1530            }
1531        }
1532
1533        Ok(())
1534    }
1535
1536    /// Get reference to session
1537    pub fn session(&self) -> &ReplSession {
1538        &self.session
1539    }
1540
1541    /// Get mutable reference to session
1542    pub fn session_mut(&mut self) -> &mut ReplSession {
1543        &mut self.session
1544    }
1545
1546    /// Get current configuration
1547    pub fn config(&self) -> &ReplConfig {
1548        &self.config
1549    }
1550
1551    /// Execute schema command
1552    async fn execute_schema_command(
1553        &mut self,
1554        operation: super::SchemaOperation,
1555    ) -> ReplResult<()> {
1556        use super::SchemaOperation;
1557
1558        match operation {
1559            SchemaOperation::Load { paths } => {
1560                println!(
1561                    "{} Loading schemas from {} file(s)...",
1562                    "Info:".cyan(),
1563                    paths.len()
1564                );
1565
1566                // Convert paths to PathBuf
1567                let schema_paths: Vec<std::path::PathBuf> =
1568                    paths.iter().map(|p| std::path::PathBuf::from(p)).collect();
1569
1570                // Validate all paths exist
1571                for path in &schema_paths {
1572                    if !path.exists() {
1573                        return Err(ReplError::SchemaError(format!(
1574                            "Schema file not found: {}",
1575                            path.display()
1576                        )));
1577                    }
1578                }
1579
1580                // Get current data directory
1581                let data_dir = match self.session.data_dir() {
1582                    Some(dir) => dir.to_path_buf(),
1583                    None => {
1584                        return Err(ReplError::DataDirectoryError(
1585                            "No data directory configured. Use :config data-dir=<path> first"
1586                                .to_string(),
1587                        ));
1588                    }
1589                };
1590
1591                // Store schema paths for future refresh
1592                self.schema_paths = schema_paths.clone();
1593
1594                // Rebuild database with new schemas
1595                self.rebuild_database_from_discovery(
1596                    data_dir,
1597                    schema_paths,
1598                    self.version_hint.clone(),
1599                )
1600                .await?;
1601            }
1602            SchemaOperation::Refresh => {
1603                println!("{} Refreshing schemas...", "Info:".cyan());
1604
1605                if self.schema_paths.is_empty() {
1606                    println!(
1607                        "{} No schemas loaded. Use :schema load <path> first",
1608                        "Warning:".yellow()
1609                    );
1610                    return Ok(());
1611                }
1612
1613                let data_dir = match self.session.data_dir() {
1614                    Some(dir) => dir.to_path_buf(),
1615                    None => {
1616                        return Err(ReplError::DataDirectoryError(
1617                            "No data directory configured".to_string(),
1618                        ));
1619                    }
1620                };
1621
1622                // Rebuild with existing schema paths
1623                self.rebuild_database_from_discovery(
1624                    data_dir,
1625                    self.schema_paths.clone(),
1626                    self.version_hint.clone(),
1627                )
1628                .await?;
1629            }
1630            SchemaOperation::Unload => {
1631                println!("{} Unloading schemas...", "Info:".cyan());
1632
1633                let data_dir = match self.session.data_dir() {
1634                    Some(dir) => dir.to_path_buf(),
1635                    None => {
1636                        return Err(ReplError::DataDirectoryError(
1637                            "No data directory configured".to_string(),
1638                        ));
1639                    }
1640                };
1641
1642                // Clear schema paths
1643                self.schema_paths.clear();
1644
1645                // Rebuild with no schemas
1646                self.rebuild_database_from_discovery(
1647                    data_dir,
1648                    Vec::new(),
1649                    self.version_hint.clone(),
1650                )
1651                .await?;
1652            }
1653            SchemaOperation::Show => {
1654                println!("{}", "Schema Status".cyan().bold());
1655                println!("{}", "═".repeat(25).cyan());
1656                println!();
1657
1658                if self.schema_paths.is_empty() {
1659                    println!("No schemas loaded");
1660                } else {
1661                    println!("Loaded schemas ({}):", self.schema_paths.len());
1662                    for (i, path) in self.schema_paths.iter().enumerate() {
1663                        println!("  {}. {}", i + 1, path.display().to_string().green());
1664                    }
1665                }
1666
1667                if let Some(ref data_dir) = self.session.data_dir() {
1668                    println!();
1669                    println!(
1670                        "Data directory: {}",
1671                        data_dir.display().to_string().yellow()
1672                    );
1673                }
1674
1675                if let Some(ref version) = self.version_hint {
1676                    println!("Version hint: {}", version.yellow());
1677                }
1678            }
1679            SchemaOperation::List => {
1680                commands::execute_schema_list(&self.schema_paths).await?;
1681            }
1682        }
1683
1684        Ok(())
1685    }
1686
1687    /// Rebuild Database from discovery when ingestion changes
1688    ///
1689    /// This method orchestrates schema loading and SSTable discovery to create
1690    /// a new Database instance, replacing the existing one in the REPL session.
1691    ///
1692    /// Use cases:
1693    /// - After `:config data-dir <path>` changes the data directory
1694    /// - After `:schema load <path>` loads new schema files
1695    /// - After `:schema refresh` reloads existing schemas
1696    ///
1697    /// # Arguments
1698    ///
1699    /// * `data_dir` - Root data directory containing SSTables
1700    /// * `schema_paths` - Schema file paths (.cql or .json) to load
1701    /// * `version_hint` - Optional Cassandra version hint (e.g., "5.0")
1702    ///
1703    /// # Errors
1704    ///
1705    /// Returns ReplError::Database for ingestion failures, schema loading errors,
1706    /// or database initialization errors.
1707    pub async fn rebuild_database_from_discovery(
1708        &mut self,
1709        data_dir: std::path::PathBuf,
1710        schema_paths: Vec<std::path::PathBuf>,
1711        version_hint: Option<String>,
1712    ) -> ReplResult<()> {
1713        use cqlite_core::ingestion::{ingest, IngestionConfig};
1714
1715        println!("{}", "Rebuilding database from discovery...".cyan().bold());
1716
1717        // Step 1: Create ingestion config
1718        // Note: Using default core config as CLI config is different from core config
1719        let ingestion_config = IngestionConfig {
1720            schema_paths: schema_paths.clone(),
1721            data_dir: data_dir.clone(),
1722            version_hint: version_hint.clone(),
1723            core_config: cqlite_core::Config::default(),
1724            table_directory_filter: None, // REPL doesn't use filtering
1725        };
1726
1727        // Step 2: Run ingestion flow
1728        let start_time = std::time::Instant::now();
1729        let ingestion_result = ingest(ingestion_config)
1730            .await
1731            .map_err(|e| ReplError::Database(e.into()))?;
1732        let elapsed = start_time.elapsed();
1733
1734        // Step 3: Report ingestion results
1735        println!(
1736            "{} {} schemas loaded, {} UDTs loaded",
1737            "Schema:".green(),
1738            ingestion_result.schema_load_result.schemas_loaded,
1739            ingestion_result.schema_load_result.udts_loaded
1740        );
1741
1742        if !ingestion_result.schema_load_result.warnings.is_empty() {
1743            println!(
1744                "{} {} warning(s)",
1745                "Warnings:".yellow(),
1746                ingestion_result.schema_load_result.warnings.len()
1747            );
1748            for warning in &ingestion_result.schema_load_result.warnings {
1749                println!("  - {}", warning.message.yellow());
1750            }
1751        }
1752
1753        println!(
1754            "{} {} SSTables discovered across {} keyspaces",
1755            "Discovery:".green(),
1756            ingestion_result.discovery_summary.sstables_found,
1757            ingestion_result.discovery_summary.keyspaces.len()
1758        );
1759
1760        if let Some(ref version) = ingestion_result.discovery_summary.resolved_version {
1761            println!("{} Cassandra {}", "Version:".green(), version.yellow());
1762        }
1763
1764        // Step 4: Replace Database in session
1765        // The old Database will be dropped and properly closed when Arc refcount hits zero
1766        self.session.replace_database(ingestion_result.database)?;
1767
1768        // Step 5: Update session data directory
1769        self.session.set_data_dir(Some(data_dir.clone()));
1770
1771        // Step 6: Store schema registry for coverage reporting
1772        self.session
1773            .set_schema_registry(Some(ingestion_result.schema_registry));
1774
1775        println!(
1776            "{} Database rebuilt in {:.2}ms",
1777            "Success:".green().bold(),
1778            elapsed.as_millis()
1779        );
1780
1781        Ok(())
1782    }
1783}