use super::{
commands, CommandParser, CommandType, CompletionEngine, ExecutionResult, HistoryManager,
OutputFormat, ParsedCommand, ReplError, ReplMode, ReplResult, ReplSession,
};
use crate::config::Config;
use crate::status_metrics::{HealthIndicator, StatusMetrics, METRICS_REFRESH_INTERVAL};
use colored::Colorize;
use cqlite_core::{Database, QueryResult};
use std::io::{self, IsTerminal, Write};
use std::path::Path;
/// User-tunable settings for a `ReplEngine`.
///
/// The values are consumed by the engine's constructor and by its prompt,
/// status-line and result-rendering methods.
#[derive(Debug, Clone)]
pub struct ReplConfig {
/// Selects which input loop `ReplEngine::run` dispatches to.
pub mode: ReplMode,
/// When true the engine constructs a `HistoryManager` at start-up.
pub enable_history: bool,
/// When true the engine constructs a `CompletionEngine` at start-up.
pub enable_completion: bool,
/// Enables colored banner/goodbye output and is forwarded to the output writers.
/// The status line is also suppressed when this is false.
pub enable_colors: bool,
/// Format used to render query results (table, CSV, JSON or raw).
pub output_format: OutputFormat,
/// Capacity handed to `HistoryManager::new`.
pub max_history_size: usize,
/// Page size; in the visible code it is passed to the `:health` command and shown by `:config`.
pub page_size: usize,
/// When true, successful queries print their execution time.
pub show_timing: bool,
/// Paging toggle; in the visible code it is only stored and displayed.
pub enable_paging: bool,
/// Primary prompt text.
// NOTE(review): the visible prompt code builds its own string in `format_prompt`
// and does not read this field — confirm whether it is used elsewhere.
pub prompt: String,
/// Prompt shown while a multi-line statement is still being entered.
pub prompt_continuation: String,
/// When true (and colors are enabled) a status line is printed before each fresh prompt.
pub show_status_line: bool,
}
/// Default configuration: interactive mode, table output, with history,
/// completion, colors, paging and the status line switched on and timing off.
impl Default for ReplConfig {
    fn default() -> Self {
        let prompt = String::from("cqlite> ");
        let prompt_continuation = String::from(" -> ");
        Self {
            // Front end and rendering.
            mode: ReplMode::Interactive,
            output_format: OutputFormat::Table,
            prompt,
            prompt_continuation,
            // Sizes.
            max_history_size: 1000,
            page_size: 50,
            // Feature switches.
            enable_history: true,
            enable_completion: true,
            enable_colors: true,
            enable_paging: true,
            show_status_line: true,
            show_timing: false,
        }
    }
}
/// The REPL driver: owns the session, parses input lines, dispatches commands
/// and renders their results.
pub struct ReplEngine {
/// Settings supplied at construction time.
config: ReplConfig,
/// Turns a complete command string into a `ParsedCommand`.
parser: CommandParser,
/// Session state (database handle, data directory, current keyspace, schema registry).
session: ReplSession,
/// Command history; `None` when `ReplConfig::enable_history` is false.
history: Option<HistoryManager>,
/// Completion engine; `None` when `ReplConfig::enable_completion` is false.
completion: Option<CompletionEngine>,
/// Accumulates the pieces of a multi-line statement, joined by single spaces.
command_buffer: String,
/// True while a multi-line statement is being collected in `command_buffer`.
in_multiline: bool,
/// Schema file paths remembered so database rebuilds can reuse them.
schema_paths: Vec<std::path::PathBuf>,
/// Optional version hint forwarded to database rebuilds; starts as `None`.
version_hint: Option<String>,
/// Most recently collected status-line metrics, refreshed once they become stale.
cached_metrics: Option<StatusMetrics>,
}
impl ReplEngine {
/// Creates an engine that has no schema registry attached.
///
/// Equivalent to calling [`Self::with_schema_registry`] with `None`.
///
/// # Errors
/// Returns whatever `with_schema_registry` returns on failure.
pub fn new(
    config: ReplConfig,
    db_path: &Path,
    app_config: Config,
    database: Database,
) -> ReplResult<Self> {
    let no_registry = None;
    Self::with_schema_registry(config, db_path, app_config, database, no_registry)
}
/// Creates an engine, optionally attaching a shared schema registry to the session.
///
/// The history manager and completion engine are only built when the
/// corresponding `ReplConfig` switches are on. The engine starts outside
/// multi-line mode with an empty buffer, no remembered schema paths, no
/// version hint and no cached metrics.
///
/// # Errors
/// Propagates failures from `ReplSession::new` and `HistoryManager::new`.
pub fn with_schema_registry(
    config: ReplConfig,
    db_path: &Path,
    app_config: Config,
    database: Database,
    schema_registry: Option<
        std::sync::Arc<tokio::sync::RwLock<cqlite_core::schema::registry::SchemaRegistry>>,
    >,
) -> ReplResult<Self> {
    let mut session = ReplSession::new(db_path, app_config, database)?;
    // Only touch the session's registry when the caller actually supplied one.
    if let Some(shared_registry) = schema_registry {
        session.set_schema_registry(Some(shared_registry));
    }

    let parser = CommandParser::new();

    let history = match config.enable_history {
        true => Some(HistoryManager::new(config.max_history_size)?),
        false => None,
    };
    let completion = match config.enable_completion {
        true => Some(CompletionEngine::new()),
        false => None,
    };

    let engine = Self {
        config,
        parser,
        session,
        history,
        completion,
        command_buffer: String::new(),
        in_multiline: false,
        schema_paths: Vec::new(),
        version_hint: None,
        cached_metrics: None,
    };
    Ok(engine)
}
/// Entry point: initializes the session, prints the startup banner and then
/// hands control to the loop selected by `ReplConfig::mode`.
///
/// # Errors
/// Propagates session-initialization, banner and loop errors.
pub async fn run(&mut self) -> ReplResult<()> {
    self.session.initialize().await?;
    self.display_startup_banner().await?;

    let outcome = match self.config.mode {
        ReplMode::Interactive => self.run_interactive_repl().await,
        ReplMode::Tui => self.run_tui_repl().await,
        ReplMode::Basic => self.run_basic_repl().await,
    };
    outcome
}
/// Plain stdin-driven loop with no line editing.
///
/// Reads one line per iteration, skips blank lines and feeds everything else
/// to `process_input`. The loop ends on end-of-input, on an `Exit` result or
/// on a stdin read error, after which the goodbye message is printed.
///
/// # Errors
/// Returns early (without the goodbye message) when a command yields
/// `ExitWithCode` — codes 3, 4 and 5 map to schema, data-directory and
/// unsupported-feature errors, anything else to a session error — or when a
/// command fails with one of those three error kinds. Other command errors
/// are printed and the loop continues.
async fn run_basic_repl(&mut self) -> ReplResult<()> {
    let stdin = io::stdin();
    let mut line_buf = String::new();

    loop {
        self.display_prompt().await?;
        line_buf.clear();

        let bytes_read = match stdin.read_line(&mut line_buf) {
            Ok(count) => count,
            Err(e) => {
                eprintln!("{} Input error: {}", "Error:".red().bold(), e);
                break;
            }
        };
        // Zero bytes means stdin reached end-of-input.
        if bytes_read == 0 {
            break;
        }

        let command = line_buf.trim();
        if command.is_empty() {
            continue;
        }

        let outcome = self.process_input(command).await;
        match outcome {
            Ok(ExecutionResult::Continue) => {}
            Ok(ExecutionResult::Exit) => break,
            Ok(ExecutionResult::ExitWithCode(code)) => {
                let failure = match code {
                    3 => ReplError::SchemaError("Schema error occurred".to_string()),
                    4 => ReplError::DataDirectoryError(
                        "Data directory error occurred".to_string(),
                    ),
                    5 => ReplError::UnsupportedFeature("Unsupported feature".to_string()),
                    _ => ReplError::Session(format!("Exit with code {}", code)),
                };
                return Err(failure);
            }
            Err(e) => {
                eprintln!("{} {}", "Error:".red().bold(), e);
                let is_fatal = matches!(
                    e,
                    ReplError::SchemaError(_)
                        | ReplError::DataDirectoryError(_)
                        | ReplError::UnsupportedFeature(_)
                );
                if is_fatal {
                    return Err(e);
                }
            }
        }
    }

    self.display_goodbye().await?;
    Ok(())
}
async fn run_interactive_repl(&mut self) -> ReplResult<()> {
use rustyline::error::ReadlineError;
use rustyline::DefaultEditor;
let mut rl: DefaultEditor = DefaultEditor::new()
.map_err(|e| ReplError::Session(format!("Failed to initialize line editor: {}", e)))?;
if self.history.is_some() {
let history_file = self.session.config().repl.history_file.clone().or_else(|| {
dirs::home_dir().map(|home| home.join(".cqlite_history"))
});
if let Some(ref path) = history_file {
let _ = rl.load_history(path);
}
}
self.display_startup_banner().await?;
loop {
if !self.in_multiline {
self.display_status_line().await?;
}
let prompt = if self.in_multiline {
self.config.prompt_continuation.clone()
} else {
self.format_prompt()
};
let readline = rl.readline(&prompt);
match readline {
Ok(line) => {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let _ = rl.add_history_entry(&line);
match self.process_input(trimmed).await {
Ok(ExecutionResult::Continue) => continue,
Ok(ExecutionResult::Exit) => break,
Ok(ExecutionResult::ExitWithCode(code)) => {
if let Some(history_file) =
self.session.config().repl.history_file.clone().or_else(|| {
dirs::home_dir().map(|home| home.join(".cqlite_history"))
})
{
let _ = rl.save_history(&history_file);
}
return Err(match code {
3 => ReplError::SchemaError("Schema error occurred".to_string()),
4 => ReplError::DataDirectoryError(
"Data directory error occurred".to_string(),
),
5 => {
ReplError::UnsupportedFeature("Unsupported feature".to_string())
}
_ => ReplError::Session(format!("Exit with code {}", code)),
});
}
Err(e) => {
eprintln!("{} {}", "Error:".red().bold(), e);
if matches!(
e,
ReplError::SchemaError(_)
| ReplError::DataDirectoryError(_)
| ReplError::UnsupportedFeature(_)
) {
if let Some(history_file) =
self.session.config().repl.history_file.clone().or_else(|| {
dirs::home_dir().map(|home| home.join(".cqlite_history"))
})
{
let _ = rl.save_history(&history_file);
}
return Err(e);
}
continue;
}
}
}
Err(ReadlineError::Interrupted) => {
if self.in_multiline {
self.reset_command_buffer();
println!("^C");
continue;
} else {
println!("^C");
break;
}
}
Err(ReadlineError::Eof) => {
break;
}
Err(err) => {
eprintln!("{} Readline error: {}", "Error:".red().bold(), err);
break;
}
}
}
if let Some(history_file) = self
.session
.config()
.repl
.history_file
.clone()
.or_else(|| dirs::home_dir().map(|home| home.join(".cqlite_history")))
{
if let Some(parent) = history_file.parent() {
let _ = std::fs::create_dir_all(parent);
}
if let Err(e) = rl.save_history(&history_file) {
eprintln!("{} Failed to save history: {}", "Warning:".yellow(), e);
}
}
self.display_goodbye().await?;
Ok(())
}
/// TUI mode placeholder: prints a notice and falls back to the interactive loop.
async fn run_tui_repl(&mut self) -> ReplResult<()> {
    let info_tag = "Info:".cyan().bold();
    println!("{} TUI mode not yet implemented in core engine", info_tag);
    self.run_interactive_repl().await
}
/// Processes one line of input, returning a boxed future.
///
/// The future is boxed (rather than this being an `async fn`) so the call can
/// appear inside the command pipeline it drives, e.g. from the `:source`
/// command, which feeds file lines back through this method.
pub fn process_input<'a>(
    &'a mut self,
    input: &'a str,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = ReplResult<ExecutionResult>> + 'a>>
{
    let pending = async move { self.process_input_impl(input).await };
    Box::pin(pending)
}
async fn process_input_impl(&mut self, input: &str) -> ReplResult<ExecutionResult> {
if self.should_continue_multiline(input) {
self.add_to_command_buffer(input);
return Ok(ExecutionResult::Continue);
}
let command = if self.in_multiline {
self.add_to_command_buffer(input);
let full_command = self.command_buffer.clone();
self.reset_command_buffer();
full_command
} else {
input.to_string()
};
if let Some(ref mut history) = self.history {
history.add_command(&command)?;
}
match self.parser.parse(&command) {
Ok(parsed_command) => self.execute_command(parsed_command).await,
Err(e) => {
eprintln!("{} Command parsing error: {}", "Error:".red().bold(), e);
Ok(ExecutionResult::Continue)
}
}
}
async fn execute_command(&mut self, command: ParsedCommand) -> ReplResult<ExecutionResult> {
match command.command_type {
CommandType::Exit => Ok(ExecutionResult::Exit),
CommandType::Help { topic } => {
self.execute_help_command(topic.as_deref()).await?;
Ok(ExecutionResult::Continue)
}
CommandType::Config { operation } => {
self.execute_config_command(operation).await?;
Ok(ExecutionResult::Continue)
}
CommandType::Tables => {
self.execute_tables_command().await?;
Ok(ExecutionResult::Continue)
}
CommandType::Describe { object_name } => {
self.execute_describe_command(&object_name).await?;
Ok(ExecutionResult::Continue)
}
CommandType::Use { keyspace } => {
self.execute_use_command(&keyspace).await?;
Ok(ExecutionResult::Continue)
}
CommandType::CqlQuery { query } => {
self.execute_cql_query(&query).await?;
Ok(ExecutionResult::Continue)
}
CommandType::Clear => {
self.execute_clear_command().await?;
Ok(ExecutionResult::Continue)
}
CommandType::History => {
self.execute_history_command().await?;
Ok(ExecutionResult::Continue)
}
CommandType::Source { file_path } => {
self.execute_source_command(&file_path).await?;
Ok(ExecutionResult::Continue)
}
CommandType::Status => {
let schema_registry = self.session.schema_registry();
commands::execute_status(self.session.data_dir(), schema_registry)
.await
.map_err(|e| {
let err_msg = e.to_string().to_lowercase();
if err_msg.contains("requires state_machine feature") {
ReplError::UnsupportedFeature(e.to_string())
} else if err_msg.contains("data directory") {
ReplError::DataDirectoryError(e.to_string())
} else {
ReplError::Database(e)
}
})?;
Ok(ExecutionResult::Continue)
}
CommandType::Health => {
commands::execute_health(
self.session.data_dir(),
None, self.config.page_size,
self.config.show_timing,
self.config.enable_colors,
)
.await
.map_err(|e| ReplError::Database(e))?;
Ok(ExecutionResult::Continue)
}
CommandType::Keyspaces => {
commands::execute_keyspaces(self.session.data_dir())
.await
.map_err(|e| {
let err_msg = e.to_string().to_lowercase();
if err_msg.contains("requires state_machine feature") {
ReplError::UnsupportedFeature(e.to_string())
} else if err_msg.contains("data directory") {
ReplError::DataDirectoryError(e.to_string())
} else {
ReplError::Database(e)
}
})?;
Ok(ExecutionResult::Continue)
}
CommandType::Schema { operation } => {
self.execute_schema_command(operation).await?;
Ok(ExecutionResult::Continue)
}
CommandType::Unknown { input } => {
eprintln!("{} Unknown command: {}", "Error:".red().bold(), input);
println!("Type {} for help", ":help".green());
Ok(ExecutionResult::Continue)
}
CommandType::Flush => {
eprintln!(
"{} Flush command requires write mode. Start REPL with --writable --write-dir <path>",
"Error:".red().bold()
);
Ok(ExecutionResult::Continue)
}
CommandType::WriteStats => {
eprintln!(
"{} Write stats command requires write mode. Start REPL with --writable --write-dir <path>",
"Error:".red().bold()
);
Ok(ExecutionResult::Continue)
}
CommandType::Maintenance { budget_ms } => {
let _ = budget_ms;
eprintln!(
"{} Maintenance command requires write mode. Start REPL with --writable --write-dir <path>",
"Error:".red().bold()
);
Ok(ExecutionResult::Continue)
}
}
}
/// Decides whether `input` leaves a statement unfinished, i.e. whether more
/// lines must be read before executing.
///
/// * While already collecting a multi-line statement, any line that does not
///   end with `;` (ignoring trailing whitespace) continues it.
/// * Otherwise a line starts a multi-line statement when it begins with one
///   of the statement keywords (compared in upper case) and does not end
///   with `;`.
fn should_continue_multiline(&self, input: &str) -> bool {
    const STATEMENT_KEYWORDS: [&str; 7] = [
        "SELECT", "INSERT", "UPDATE", "DELETE", "CREATE", "ALTER", "DROP",
    ];

    if self.in_multiline {
        return !input.trim_end().ends_with(';');
    }

    let trimmed = input.trim();
    // A terminated line is complete regardless of its first word, so skip the
    // upper-casing allocation entirely.
    if trimmed.ends_with(';') {
        return false;
    }

    // Upper-case once instead of once per keyword.
    let upper = trimmed.to_uppercase();
    STATEMENT_KEYWORDS
        .iter()
        .any(|keyword| upper.starts_with(keyword))
}
/// Appends `input` to the multi-line buffer, entering multi-line mode (with a
/// cleared buffer) on the first piece and separating later pieces by a space.
fn add_to_command_buffer(&mut self, input: &str) {
    let starting_statement = !self.in_multiline;
    if starting_statement {
        self.command_buffer.clear();
        self.in_multiline = true;
    } else if !self.command_buffer.is_empty() {
        self.command_buffer.push(' ');
    }
    self.command_buffer.push_str(input);
}
/// Leaves multi-line mode and discards any partially entered statement.
fn reset_command_buffer(&mut self) {
    self.in_multiline = false;
    self.command_buffer.clear();
}
/// Writes the prompt to stdout and flushes it.
///
/// Outside multi-line mode the status line is shown first and the full prompt
/// is used; inside multi-line mode only the continuation prompt is written.
///
/// # Errors
/// Propagates status-line errors and wraps a failed flush in `ReplError::Io`.
async fn display_prompt(&mut self) -> ReplResult<()> {
    let prompt = if self.in_multiline {
        self.config.prompt_continuation.clone()
    } else {
        self.display_status_line().await?;
        self.format_prompt()
    };
    print!("{}", prompt);
    // The prompt has no trailing newline, so flush to make it visible.
    io::stdout().flush().map_err(ReplError::Io)
}
/// Prints the one-line health/memory/data summary above the prompt.
///
/// Nothing is printed unless both the status line and colors are enabled and
/// stdout is a terminal. Metrics are collected on first use and again whenever
/// the cached copy is stale per `METRICS_REFRESH_INTERVAL`.
async fn display_status_line(&mut self) -> ReplResult<()> {
    let enabled = self.config.show_status_line && self.config.enable_colors;
    if !enabled {
        return Ok(());
    }
    if !io::stdout().is_terminal() {
        return Ok(());
    }

    let stale = match self.cached_metrics.as_ref() {
        Some(metrics) => metrics.is_stale(METRICS_REFRESH_INTERVAL),
        None => true,
    };
    if stale {
        let fresh =
            StatusMetrics::collect(self.session.data_dir(), self.session.database()).await;
        self.cached_metrics = Some(fresh);
    }

    let Some(metrics) = self.cached_metrics.as_ref() else {
        return Ok(());
    };
    let health_str = match metrics.health {
        HealthIndicator::Error => "ERR".red().to_string(),
        HealthIndicator::Warning => "WARN".yellow().to_string(),
        HealthIndicator::Ok => "OK".green().to_string(),
    };
    println!(
        "[{}] Mem: {} | Data: {}",
        health_str,
        metrics.format_memory().cyan(),
        metrics.format_data().cyan()
    );
    Ok(())
}
/// Builds the primary prompt: an optional `<keyspace>@` prefix, the literal
/// `cqlite`, a mode tag (`[tui]`, `[i]` or nothing for basic mode) and a
/// colored `> ` suffix.
fn format_prompt(&self) -> String {
    let keyspace_prefix = match self.session.current_keyspace() {
        Some(ref keyspace) => format!("{}@", keyspace.cyan()),
        None => String::new(),
    };
    let mode_tag = match self.config.mode {
        ReplMode::Basic => "",
        ReplMode::Interactive => "[i]",
        ReplMode::Tui => "[tui]",
    };
    let arrow = "> ".blue().bold().to_string();
    format!("{}cqlite{}{}", keyspace_prefix, mode_tag, arrow)
}
/// Prints the startup banner.
///
/// With colors disabled this is a two-line plain-text greeting. Otherwise it
/// is a boxed title followed by the database path, mode, engine version, the
/// current keyspace (when one is set) and a short list of quick commands.
async fn display_startup_banner(&self) -> ReplResult<()> {
    if !self.config.enable_colors {
        println!("CQLite Interactive Shell");
        println!("Type :help for help, :quit to exit");
        return Ok(());
    }

    // Boxed title.
    let top_border = "╔═══════════════════════════════════════════════╗".cyan();
    let title = "║ CQLite REPL Engine v2.0 ║".cyan().bold();
    let subtitle = "║ High-Performance Cassandra Reader ║".cyan();
    let bottom_border = "╚═══════════════════════════════════════════════╝".cyan();
    println!("{}", top_border);
    println!("{}", title);
    println!("{}", subtitle);
    println!("{}", bottom_border);
    println!();

    // Session facts.
    let db_path = self.session.db_path().display().to_string();
    let mode_name = format!("{:?}", self.config.mode);
    println!("🗄️ Database: {}", db_path.yellow());
    println!("🔧 Mode: {}", mode_name.green());
    println!("📊 Engine: {}", "CQLite Core v0.1.0".green());
    if let Some(ref keyspace) = self.session.current_keyspace() {
        println!("📦 Keyspace: {}", keyspace.yellow());
    }
    println!();

    // Quick reference.
    println!("{}", "Quick Commands:".cyan().bold());
    let quick_commands = [
        (":help".green(), "Show help"),
        (":tables".green(), "List tables"),
        ("SELECT * FROM table;".yellow(), "Execute CQL"),
        (":quit".red(), "Exit"),
    ];
    for (command, summary) in &quick_commands {
        println!(" • {} - {}", command, summary);
    }
    println!();
    Ok(())
}
/// Prints the farewell message: a short plain one when colors are disabled,
/// otherwise a blank line followed by a colored one.
async fn display_goodbye(&self) -> ReplResult<()> {
    if !self.config.enable_colors {
        println!("Goodbye!");
        return Ok(());
    }
    println!();
    println!("{}", "Goodbye! Thank you for using CQLite.".cyan().bold());
    Ok(())
}
/// Shows general help when no topic is given, the page for a known topic
/// (`commands`, `config`, `cql`, `examples`), or a list of valid topics when
/// the topic is unknown.
async fn execute_help_command(&self, topic: Option<&str>) -> ReplResult<()> {
    let Some(requested) = topic else {
        self.show_general_help();
        return Ok(());
    };
    match requested {
        "commands" => self.show_commands_help(),
        "config" => self.show_config_help(),
        "cql" => self.show_cql_help(),
        "examples" => self.show_examples_help(),
        unknown => {
            println!("{} Unknown help topic: {}", "Error:".red().bold(), unknown);
            println!("Available topics: commands, config, cql, examples");
        }
    }
    Ok(())
}
/// Handles `:config`.
///
/// The effective configuration is always displayed. Any operation other than
/// an empty string or `show` first prints a note that configuration is
/// read-only.
async fn execute_config_command(&mut self, operation: String) -> ReplResult<()> {
    let is_plain_show = operation.is_empty() || operation == "show";
    if !is_plain_show {
        println!(
            "{} Configuration is read-only in M2.",
            "Note:".yellow().bold()
        );
        println!("Use CLI flags, environment variables, or config files to modify settings.");
        println!();
    }
    self.show_current_config();
    Ok(())
}
/// Handles `:tables`: scans the data directory and lists the tables found,
/// restricted to the current keyspace when one is selected.
///
/// Scan warnings are printed before the listing. Without a configured data
/// directory only a hint is printed.
///
/// # Errors
/// Returns `UnsupportedFeature` when built without the `state_machine`
/// feature, and `DataDirectoryError` when the scan fails.
async fn execute_tables_command(&mut self) -> ReplResult<()> {
    println!("{}", "Listing tables...".cyan().bold());
    #[cfg(not(feature = "state_machine"))]
    {
        return Err(ReplError::UnsupportedFeature(
            "Tables command requires state_machine feature".to_string(),
        ));
    }
    #[cfg(feature = "state_machine")]
    {
        use cqlite_core::discovery::DiscoveryService;

        let data_dir = match self.session.data_dir() {
            Some(dir) => dir,
            None => {
                println!("No tables found");
                println!("Configure data directory with :config data-dir <PATH>");
                return Ok(());
            }
        };

        let scanner = DiscoveryService::new(data_dir.to_path_buf(), None);
        let summary = match scanner.scan().await {
            Ok(found) => found,
            Err(e) => {
                return Err(ReplError::DataDirectoryError(format!(
                    "Failed to scan for tables: {}",
                    e
                )));
            }
        };

        // `<keyspace>.` prefix used to narrow the listing, if a keyspace is active.
        let keyspace_prefix = self
            .session
            .current_keyspace()
            .map(|keyspace| format!("{}.", keyspace));
        let tables: Vec<&String> = summary
            .tables
            .iter()
            .filter(|name| match keyspace_prefix.as_ref() {
                Some(prefix) => name.starts_with(prefix.as_str()),
                None => true,
            })
            .collect();

        for warning in &summary.warnings {
            println!("{}", warning.yellow());
            println!();
        }

        if tables.is_empty() {
            println!("No tables found");
            if keyspace_prefix.is_some() {
                println!("Try :use to switch keyspaces or run :tables without USE");
            }
        } else {
            for table in tables {
                println!(" {}", table.green());
            }
        }
        Ok(())
    }
}
/// Handles `:describe <object>`: looks the table up in the schema registry
/// and prints its description.
///
/// `object_name` is either `table` (resolved against the current keyspace) or
/// `keyspace.table`. Names with more than one dot are rejected with a
/// message. A missing keyspace, missing registry or unknown table is reported
/// to the user and is not treated as an error.
///
/// # Errors
/// Returns `UnsupportedFeature` when built without the `state_machine` feature.
async fn execute_describe_command(&mut self, object_name: &str) -> ReplResult<()> {
    println!(
        "{} {}",
        "🔍 Describing:".cyan().bold(),
        object_name.yellow()
    );
    #[cfg(not(feature = "state_machine"))]
    {
        return Err(ReplError::UnsupportedFeature(
            "Describe command requires state_machine feature".to_string(),
        ));
    }
    #[cfg(feature = "state_machine")]
    {
        // Split at the first dot; a second dot in the remainder means the
        // name has too many components.
        let (keyspace, table) = match object_name.split_once('.') {
            None => (self.session.current_keyspace().cloned(), object_name),
            Some((_, rest)) if rest.contains('.') => {
                eprintln!(
                    "{} Invalid object name format: '{}'. Use keyspace.table or table",
                    "Error:".red().bold(),
                    object_name
                );
                return Ok(());
            }
            Some((explicit_keyspace, table_name)) => {
                (Some(explicit_keyspace.to_string()), table_name)
            }
        };

        let Some(ks) = keyspace else {
            eprintln!(
                "{} No keyspace specified and no current keyspace set",
                "Error:".red().bold()
            );
            println!("💡 Try :use <keyspace> first, or use keyspace.table format");
            return Ok(());
        };

        let Some(registry) = self.session.schema_registry() else {
            eprintln!("{} Schema registry not available", "Error:".red().bold());
            println!("💡 Ensure schema was loaded with --schema flag");
            return Ok(());
        };

        let registry_guard = registry.read().await;
        match registry_guard.get_schema(&ks, table).await {
            Ok(schema) => {
                let description = self.format_table_description(&schema);
                println!("{}", description);
            }
            Err(e) => {
                eprintln!(
                    "{} Table '{}.{}' not found: {}",
                    "Error:".red().bold(),
                    ks,
                    table,
                    e
                );
                println!("💡 Try :tables to list available tables");
            }
        }
        Ok(())
    }
}
/// Renders a table schema as text: a `Table: <keyspace>.<table>` header, then
/// one aligned row per column with its type and, for key columns, its role.
///
/// Rows are ordered partition keys first, clustering keys second and the
/// remaining columns last. The name and type columns are padded to the
/// longest entry, with a minimum width of 10.
#[cfg(feature = "state_machine")]
fn format_table_description(&self, schema: &cqlite_core::schema::TableSchema) -> String {
    use std::fmt::Write;

    // (name, type, role) triples in display order.
    let mut rows: Vec<(String, String, String)> = Vec::new();

    for partition_key in schema.ordered_partition_keys() {
        let role = String::from("PARTITION KEY");
        rows.push((
            partition_key.name.clone(),
            partition_key.data_type.clone(),
            role,
        ));
    }

    for clustering_key in schema.ordered_clustering_keys() {
        let role = match clustering_key.order {
            cqlite_core::schema::ClusteringOrder::Desc => "CLUSTERING KEY DESC",
            cqlite_core::schema::ClusteringOrder::Asc => "CLUSTERING KEY",
        };
        rows.push((
            clustering_key.name.clone(),
            clustering_key.data_type.clone(),
            role.to_string(),
        ));
    }

    // Regular columns: everything not already listed as a key.
    for column in &schema.columns {
        let in_partition_key = schema.partition_keys.iter().any(|k| k.name == column.name);
        let in_clustering_key = schema.clustering_keys.iter().any(|k| k.name == column.name);
        if in_partition_key || in_clustering_key {
            continue;
        }
        rows.push((column.name.clone(), column.data_type.clone(), String::new()));
    }

    let name_width = rows.iter().map(|(name, _, _)| name.len()).fold(10, usize::max);
    let type_width = rows.iter().map(|(_, dtype, _)| dtype.len()).fold(10, usize::max);

    let mut output = String::new();
    let _ = writeln!(output, "Table: {}.{}\n", schema.keyspace, schema.table);
    let _ = writeln!(output, "Columns:");
    for (name, dtype, role) in &rows {
        let _ = write!(output, " {:<name_width$} {:<type_width$}", name, dtype);
        if !role.is_empty() {
            let _ = write!(output, " ({})", role);
        }
        output.push('\n');
    }
    output
}
/// Handles `:use <keyspace>`: switches the session's keyspace and reports the
/// outcome. A failed switch is printed to stderr and not returned as an error.
async fn execute_use_command(&mut self, keyspace: &str) -> ReplResult<()> {
    if let Err(e) = self.session.use_keyspace(keyspace).await {
        eprintln!(
            "{} Failed to use keyspace {}: {}",
            "Error:".red().bold(),
            keyspace,
            e
        );
        return Ok(());
    }
    println!(
        "{} Now using keyspace: {}",
        "✅".green(),
        keyspace.yellow().bold()
    );
    Ok(())
}
/// Executes a CQL query through the session and renders the result.
///
/// On success the result is displayed in the configured output format,
/// followed by the execution time when `show_timing` is on. On failure the
/// error, the elapsed time and a context-dependent hint are printed; the
/// failure itself is not returned as an error.
///
/// Times are reported in milliseconds with two decimal places.
///
/// # Errors
/// Propagates errors from rendering the result.
async fn execute_cql_query(&mut self, query: &str) -> ReplResult<()> {
    let start_time = std::time::Instant::now();
    println!("{} {}", "🔍 Executing:".blue().bold(), query.yellow());

    let outcome = self.session.execute_query(query).await;
    // Fractional milliseconds: formatting precision has no effect on the
    // integer returned by `as_millis()`, which would show fast queries as 0ms.
    let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;

    match outcome {
        Ok(result) => {
            self.display_query_result(&result)?;
            if self.config.show_timing {
                println!();
                println!("{} {:.2}ms", "⏱️ Execution time:".green(), elapsed_ms);
            }
        }
        Err(e) => {
            eprintln!(
                "{} Query failed after {:.2}ms",
                "❌ Error:".red().bold(),
                elapsed_ms
            );
            eprintln!(" {}", e.to_string().red());
            self.provide_query_hints(query, &e);
        }
    }
    Ok(())
}
/// Clears the terminal by writing the ANSI sequences `ESC[2J` (erase display)
/// and `ESC[1;1H` (move the cursor to row 1, column 1), then flushing stdout.
///
/// # Errors
/// Wraps a failed flush in `ReplError::Io`.
async fn execute_clear_command(&self) -> ReplResult<()> {
    // `\x1B` must be a real escape byte; a doubled backslash would print the
    // sequence as literal text instead of clearing the screen.
    print!("\x1B[2J\x1B[1;1H");
    io::stdout().flush().map_err(ReplError::Io)?;
    Ok(())
}
async fn execute_history_command(&self) -> ReplResult<()> {
if let Some(ref history) = self.history {
println!("{}", "📜 Command History".cyan().bold());
println!("{}", "═".repeat(20).cyan());
let commands = history.recent_commands(20);
if commands.is_empty() {
println!("📭 No commands in history");
} else {
for (i, cmd) in commands.iter().enumerate() {
println!(" {:3}. {}", i + 1, cmd);
}
}
} else {
println!("{} History is disabled", "Info:".cyan().bold());
}
Ok(())
}
async fn execute_source_command(&mut self, file_path: &str) -> ReplResult<()> {
println!(
"{} Executing commands from: {}",
"📂".cyan(),
file_path.yellow()
);
let path = std::path::Path::new(file_path);
if !path.exists() {
eprintln!("{} File not found: {}", "Error:".red().bold(), file_path);
return Ok(());
}
let content = std::fs::read_to_string(path).map_err(|e| ReplError::Io(e))?;
let mut executed = 0;
let errors = 0;
for (line_num, line) in content.lines().enumerate() {
let trimmed = line.trim();
if trimmed.is_empty() || trimmed.starts_with("--") || trimmed.starts_with("#") {
continue;
}
println!(
"{}:{} {}",
file_path,
line_num + 1,
trimmed.to_string().dimmed()
);
match self.process_input(trimmed).await? {
ExecutionResult::Continue => executed += 1,
ExecutionResult::Exit => {
println!("🛑 Execution stopped due to exit command");
break;
}
ExecutionResult::ExitWithCode(_) => {
println!("🛑 Execution stopped");
break;
}
}
}
println!();
println!(
"📊 File execution completed: {} commands executed, {} errors",
executed, errors
);
Ok(())
}
fn display_query_result(&self, result: &QueryResult) -> ReplResult<()> {
match self.config.output_format {
OutputFormat::Table => self.display_table_result(result),
OutputFormat::Csv => self.display_csv_result(result),
OutputFormat::Json => self.display_json_result(result),
OutputFormat::Raw => self.display_raw_result(result),
}
}
/// Prints a query result as a table.
///
/// A result without rows prints either the affected-row count (when
/// positive) or a "no rows" notice instead of a table.
///
/// # Errors
/// Returns `ReplError::Session` when the table writer fails.
fn display_table_result(&self, result: &QueryResult) -> ReplResult<()> {
    use crate::config::OutputConfig;
    use crate::output::TableWriter;

    if result.rows.is_empty() {
        if result.rows_affected > 0 {
            let check_mark = "✅".green().bold();
            println!("{} {} rows affected", check_mark, result.rows_affected);
        } else {
            println!("{} No rows returned", "📭".yellow());
        }
        return Ok(());
    }

    let output_config = OutputConfig {
        target: crate::output::OutputTarget::Stdout,
        color_enabled: self.config.enable_colors,
        overwrite: false,
        limit: None,
        page_size: None,
    };
    match TableWriter::write(result, &output_config) {
        Ok(formatted) => {
            println!("{}", formatted);
            Ok(())
        }
        Err(e) => Err(ReplError::Session(format!(
            "Failed to format table output: {}",
            e
        ))),
    }
}
/// Prints a query result as CSV.
///
/// # Errors
/// Returns `ReplError::Session` when the CSV writer fails.
fn display_csv_result(&self, result: &QueryResult) -> ReplResult<()> {
    use crate::config::OutputConfig;
    use crate::output::CSVWriter;

    let output_config = OutputConfig {
        target: crate::output::OutputTarget::Stdout,
        color_enabled: self.config.enable_colors,
        overwrite: false,
        limit: None,
        page_size: None,
    };
    match CSVWriter::write(result, &output_config) {
        Ok(formatted) => {
            println!("{}", formatted);
            Ok(())
        }
        Err(e) => Err(ReplError::Session(format!(
            "Failed to format CSV output: {}",
            e
        ))),
    }
}
/// Prints a query result as JSON.
///
/// # Errors
/// Returns `ReplError::Session` when the JSON writer fails.
fn display_json_result(&self, result: &QueryResult) -> ReplResult<()> {
    use crate::config::OutputConfig;
    use crate::output::JSONWriter;

    let output_config = OutputConfig {
        target: crate::output::OutputTarget::Stdout,
        color_enabled: self.config.enable_colors,
        overwrite: false,
        limit: None,
        page_size: None,
    };
    match JSONWriter::write(result, &output_config) {
        Ok(formatted) => {
            println!("{}", formatted);
            Ok(())
        }
        Err(e) => Err(ReplError::Session(format!(
            "Failed to format JSON output: {}",
            e
        ))),
    }
}
/// Placeholder for the raw output format: the result is ignored and a
/// "not yet implemented" notice is printed instead. Always returns `Ok(())`.
fn display_raw_result(&self, _result: &QueryResult) -> ReplResult<()> {
println!("Raw output not yet implemented");
Ok(())
}
/// Prints a hint chosen from the text of a failed query's error message.
///
/// Messages containing both `table` and `not found` get table-lookup advice,
/// messages containing `syntax` get CQL-help advice, and anything else gets a
/// pointer to `:help`. The query text itself is not inspected.
fn provide_query_hints(&self, _query: &str, error: &ReplError) {
    let error_msg = error.to_string();
    let table_missing = error_msg.contains("table") && error_msg.contains("not found");
    let hint_tag = "💡 Hint:".cyan().bold();

    println!();
    if table_missing {
        println!("{} Table not found. Try:", hint_tag);
        println!(" • {} to list tables", ":tables".green());
        println!(" • Check table name spelling");
        return;
    }
    if error_msg.contains("syntax") {
        println!("{} Syntax error. Try:", hint_tag);
        println!(" • {} for CQL help", ":help cql".green());
        println!(" • Check query syntax");
        return;
    }
    println!("{} For general help: {}", hint_tag, ":help".green());
}
/// Prints the general help page: a colored title and rule followed by the
/// command overview.
fn show_general_help(&self) {
    // Body of the page, one entry per output line ("" is a blank line).
    const BODY: &[&str] = &[
        "",
        "Commands:",
        " :help [topic] Show help (topics: commands, config, cql, examples)",
        " :quit, :exit Exit the REPL",
        " :tables List all tables",
        " :describe <obj> Describe object",
        " :use <keyspace> Switch keyspace",
        " :config [op] Show/set configuration",
        " :status Show discovery and schema coverage status",
        " :health Show health diagnostics",
        " :clear Clear screen",
        " :history Show command history",
        " :source <file> Execute commands from file",
        "",
        "CQL queries can be executed directly (end with semicolon for multi-line)",
    ];

    println!("{}", "CQLite REPL Help".cyan().bold());
    println!("{}", "═".repeat(20).cyan());
    for line in BODY {
        println!("{}", line);
    }
}
/// Prints the `commands` help topic: a colored title and rule followed by the
/// commands grouped into meta, database, file and configuration sections.
fn show_commands_help(&self) {
    // Body of the page, one entry per output line ("" is a blank line).
    const BODY: &[&str] = &[
        "",
        "Meta Commands:",
        " :help Show this help",
        " :quit, :exit Exit REPL",
        " :clear Clear screen",
        " :history Show recent commands",
        "",
        "Database Commands:",
        " :tables List all tables",
        " :describe <obj> Show object schema",
        " :use <keyspace> Switch to keyspace",
        " :status Show discovery and schema coverage status",
        " :health Show health diagnostics",
        "",
        "File Commands:",
        " :source <file> Execute CQL file",
        "",
        "Configuration:",
        " :config Show merged effective configuration (read-only)",
    ];

    println!("{}", "Available Commands".cyan().bold());
    println!("{}", "═".repeat(20).cyan());
    for line in BODY {
        println!("{}", line);
    }
}
/// Prints the `config` help topic: a colored title and rule followed by a
/// description of what `:config` displays and a read-only note.
fn show_config_help(&self) {
    // Body of the page, one entry per output line ("" is a blank line).
    const BODY: &[&str] = &[
        "",
        "View merged effective configuration (read-only):",
        " :config Display all configuration settings",
        "",
        "The :config command shows:",
        " • Data & Schema settings (data_directory, schema_paths, default_keyspace)",
        " • Output settings (output_mode, query_limit, colors)",
        " • REPL settings (page_size, show_timing, history, completion)",
        " • Precedence chain (CLI > ENV > --config > .cqlite.toml > user config > defaults)",
        "",
        "Note: Configuration is read-only in M2. Use CLI flags, environment",
        " variables, or config files to modify settings.",
    ];

    println!("{}", "Configuration Help".cyan().bold());
    println!("{}", "═".repeat(20).cyan());
    for line in BODY {
        println!("{}", line);
    }
}
/// Prints the `cql` help topic: a colored title and rule followed by the
/// supported statements and how multi-line queries are entered.
fn show_cql_help(&self) {
    // Body of the page, one entry per output line ("" is a blank line).
    const BODY: &[&str] = &[
        "",
        "Supported CQL:",
        " SELECT * FROM table;",
        " SELECT col1, col2 FROM table WHERE condition;",
        " DESCRIBE TABLE table_name;",
        "",
        "Multi-line queries:",
        " Start typing a query and press Enter",
        " Continue on next lines",
        " End with semicolon (;) to execute",
    ];

    println!("{}", "CQL Query Help".cyan().bold());
    println!("{}", "═".repeat(20).cyan());
    for line in BODY {
        println!("{}", line);
    }
}
/// Prints the `examples` help topic: a colored title and rule followed by
/// sample workflow, configuration and file-execution commands.
fn show_examples_help(&self) {
    // Body of the page, one entry per output line ("" is a blank line).
    const BODY: &[&str] = &[
        "",
        "Basic workflow:",
        " :tables",
        " :describe users",
        " SELECT * FROM users LIMIT 5;",
        "",
        "Configuration:",
        " :config output_format=json",
        " :config show_timing=true",
        "",
        "File execution:",
        " :source /path/to/queries.sql",
    ];

    println!("{}", "Usage Examples".cyan().bold());
    println!("{}", "═".repeat(20).cyan());
    for line in BODY {
        println!("{}", line);
    }
}
/// Prints the effective configuration in four sections: data and schema
/// settings, output settings, REPL settings and the precedence chain.
///
/// The first two sections come from the session's application config; the
/// REPL section comes from this engine's `ReplConfig` and from whether the
/// history and completion components exist. Unset values are shown dimmed.
fn show_current_config(&self) {
    let cli_config = self.session.config();

    println!("{}", "Effective Configuration".cyan().bold());
    println!("{}", "═".repeat(60).cyan());
    println!();

    // --- Data & Schema ---
    println!("{}", "Data & Schema:".yellow().bold());
    match cli_config.data_directory {
        Some(ref data_dir) => println!(" data_directory = {}", data_dir.display()),
        None => println!(" data_directory = {}", "<not set>".dimmed()),
    }
    if cli_config.schema_paths.is_empty() {
        println!(" schema_paths = {}", "[]".dimmed());
    } else {
        let rendered: Vec<String> = cli_config
            .schema_paths
            .iter()
            .map(|schema_path| schema_path.display().to_string())
            .collect();
        println!(" schema_paths = [{}]", rendered.join(", "));
    }
    match cli_config.default_keyspace {
        Some(ref keyspace) => println!(" default_keyspace = {}", keyspace),
        None => println!(" default_keyspace = {}", "<not set>".dimmed()),
    }
    println!();

    // --- Output ---
    println!("{}", "Output Settings:".yellow().bold());
    match cli_config.output_mode {
        Some(ref mode) => println!(" output_mode = {}", mode),
        None => println!(" output_mode = {}", "table".dimmed()),
    }
    match cli_config.query_limit {
        Some(limit) => println!(" query_limit = {}", limit),
        None => println!(" query_limit = {}", "<unlimited>".dimmed()),
    }
    println!(" no_color = {}", cli_config.no_color);
    println!(" colors = {}", cli_config.output.colors);
    match cli_config.output.max_rows {
        Some(max_rows) => println!(" max_rows = {}", max_rows),
        None => println!(" max_rows = {}", "<unlimited>".dimmed()),
    }
    println!();

    // --- REPL ---
    let history_state = if self.history.is_some() {
        "enabled"
    } else {
        "disabled"
    };
    let completion_state = if self.completion.is_some() {
        "enabled"
    } else {
        "disabled"
    };
    println!("{}", "REPL Settings:".yellow().bold());
    println!(" mode = {:?}", self.config.mode);
    println!(" output_format = {:?}", self.config.output_format);
    println!(" page_size = {}", self.config.page_size);
    println!(" show_timing = {}", self.config.show_timing);
    println!(" enable_paging = {}", self.config.enable_paging);
    println!(" enable_colors = {}", self.config.enable_colors);
    println!(" history = {}", history_state);
    println!(" completion = {}", completion_state);
    println!();

    // --- Precedence ---
    println!("{}", "Precedence Chain:".yellow().bold());
    println!(" {}", "CLI flags > Environment variables > Explicit config (--config) > Project config (./.cqlite.toml) > User config > Defaults".dimmed());
}
async fn set_config_value(&mut self, key: &str, value: &str) -> ReplResult<()> {
match key {
"data-dir" | "data_dir" => {
let data_dir = std::path::PathBuf::from(value);
if !data_dir.exists() {
println!(
"{} Directory does not exist: {}",
"Error:".red().bold(),
value
);
return Ok(());
}
if !data_dir.is_dir() {
println!(
"{} Path is not a directory: {}",
"Error:".red().bold(),
value
);
return Ok(());
}
println!(
"{} Changing data directory to: {}",
"Info:".cyan(),
data_dir.display().to_string().yellow()
);
self.rebuild_database_from_discovery(
data_dir,
self.schema_paths.clone(),
self.version_hint.clone(),
)
.await?;
}
"output_format" => {
self.config.output_format = match value.to_lowercase().as_str() {
"table" => OutputFormat::Table,
"csv" => OutputFormat::Csv,
"json" => OutputFormat::Json,
"raw" => OutputFormat::Raw,
_ => {
println!(
"{} Invalid output format. Use: table, csv, json, raw",
"Error:".red().bold()
);
return Ok(());
}
};
println!(
"{} Output format set to: {:?}",
"✅".green(),
self.config.output_format
);
}
"page_size" => match value.parse::<usize>() {
Ok(size) if size > 0 => {
self.config.page_size = size;
println!("{} Page size set to: {}", "✅".green(), size);
}
_ => {
println!(
"{} Invalid page size. Must be positive number",
"Error:".red().bold()
);
}
},
"show_timing" => match value.to_lowercase().as_str() {
"true" | "on" | "1" | "yes" => {
self.config.show_timing = true;
println!("{} Timing enabled", "✅".green());
}
"false" | "off" | "0" | "no" => {
self.config.show_timing = false;
println!("{} Timing disabled", "✅".green());
}
_ => {
println!(
"{} Invalid boolean value. Use: true/false",
"Error:".red().bold()
);
}
},
"enable_paging" => match value.to_lowercase().as_str() {
"true" | "on" | "1" | "yes" => {
self.config.enable_paging = true;
println!("{} Paging enabled", "✅".green());
}
"false" | "off" | "0" | "no" => {
self.config.enable_paging = false;
println!("{} Paging disabled", "✅".green());
}
_ => {
println!(
"{} Invalid boolean value. Use: true/false",
"Error:".red().bold()
);
}
},
_ => {
println!(
"{} Unknown configuration key: {}",
"Error:".red().bold(),
key
);
println!("Available keys: data-dir, output_format, page_size, show_timing, enable_paging");
}
}
Ok(())
}
/// Returns a shared reference to the session owned by this engine.
pub fn session(&self) -> &ReplSession {
&self.session
}
/// Returns a mutable reference to the session owned by this engine.
pub fn session_mut(&mut self) -> &mut ReplSession {
&mut self.session
}
/// Returns the REPL configuration this engine is currently using.
pub fn config(&self) -> &ReplConfig {
&self.config
}
async fn execute_schema_command(
&mut self,
operation: super::SchemaOperation,
) -> ReplResult<()> {
use super::SchemaOperation;
match operation {
SchemaOperation::Load { paths } => {
println!(
"{} Loading schemas from {} file(s)...",
"Info:".cyan(),
paths.len()
);
let schema_paths: Vec<std::path::PathBuf> =
paths.iter().map(|p| std::path::PathBuf::from(p)).collect();
for path in &schema_paths {
if !path.exists() {
return Err(ReplError::SchemaError(format!(
"Schema file not found: {}",
path.display()
)));
}
}
let data_dir = match self.session.data_dir() {
Some(dir) => dir.to_path_buf(),
None => {
return Err(ReplError::DataDirectoryError(
"No data directory configured. Use :config data-dir=<path> first"
.to_string(),
));
}
};
self.schema_paths = schema_paths.clone();
self.rebuild_database_from_discovery(
data_dir,
schema_paths,
self.version_hint.clone(),
)
.await?;
}
SchemaOperation::Refresh => {
println!("{} Refreshing schemas...", "Info:".cyan());
if self.schema_paths.is_empty() {
println!(
"{} No schemas loaded. Use :schema load <path> first",
"Warning:".yellow()
);
return Ok(());
}
let data_dir = match self.session.data_dir() {
Some(dir) => dir.to_path_buf(),
None => {
return Err(ReplError::DataDirectoryError(
"No data directory configured".to_string(),
));
}
};
self.rebuild_database_from_discovery(
data_dir,
self.schema_paths.clone(),
self.version_hint.clone(),
)
.await?;
}
SchemaOperation::Unload => {
println!("{} Unloading schemas...", "Info:".cyan());
let data_dir = match self.session.data_dir() {
Some(dir) => dir.to_path_buf(),
None => {
return Err(ReplError::DataDirectoryError(
"No data directory configured".to_string(),
));
}
};
self.schema_paths.clear();
self.rebuild_database_from_discovery(
data_dir,
Vec::new(),
self.version_hint.clone(),
)
.await?;
}
SchemaOperation::Show => {
println!("{}", "Schema Status".cyan().bold());
println!("{}", "═".repeat(25).cyan());
println!();
if self.schema_paths.is_empty() {
println!("No schemas loaded");
} else {
println!("Loaded schemas ({}):", self.schema_paths.len());
for (i, path) in self.schema_paths.iter().enumerate() {
println!(" {}. {}", i + 1, path.display().to_string().green());
}
}
if let Some(ref data_dir) = self.session.data_dir() {
println!();
println!(
"Data directory: {}",
data_dir.display().to_string().yellow()
);
}
if let Some(ref version) = self.version_hint {
println!("Version hint: {}", version.yellow());
}
}
SchemaOperation::List => {
commands::execute_schema_list(&self.schema_paths).await?;
}
}
Ok(())
}
pub async fn rebuild_database_from_discovery(
&mut self,
data_dir: std::path::PathBuf,
schema_paths: Vec<std::path::PathBuf>,
version_hint: Option<String>,
) -> ReplResult<()> {
use cqlite_core::ingestion::{ingest, IngestionConfig};
println!("{}", "Rebuilding database from discovery...".cyan().bold());
let ingestion_config = IngestionConfig {
schema_paths: schema_paths.clone(),
data_dir: data_dir.clone(),
version_hint: version_hint.clone(),
core_config: cqlite_core::Config::default(),
table_directory_filter: None, };
let start_time = std::time::Instant::now();
let ingestion_result = ingest(ingestion_config)
.await
.map_err(|e| ReplError::Database(e.into()))?;
let elapsed = start_time.elapsed();
println!(
"{} {} schemas loaded, {} UDTs loaded",
"Schema:".green(),
ingestion_result.schema_load_result.schemas_loaded,
ingestion_result.schema_load_result.udts_loaded
);
if !ingestion_result.schema_load_result.warnings.is_empty() {
println!(
"{} {} warning(s)",
"Warnings:".yellow(),
ingestion_result.schema_load_result.warnings.len()
);
for warning in &ingestion_result.schema_load_result.warnings {
println!(" - {}", warning.message.yellow());
}
}
println!(
"{} {} SSTables discovered across {} keyspaces",
"Discovery:".green(),
ingestion_result.discovery_summary.sstables_found,
ingestion_result.discovery_summary.keyspaces.len()
);
if let Some(ref version) = ingestion_result.discovery_summary.resolved_version {
println!("{} Cassandra {}", "Version:".green(), version.yellow());
}
self.session.replace_database(ingestion_result.database)?;
self.session.set_data_dir(Some(data_dir.clone()));
self.session
.set_schema_registry(Some(ingestion_result.schema_registry));
println!(
"{} Database rebuilt in {:.2}ms",
"Success:".green().bold(),
elapsed.as_millis()
);
Ok(())
}
}