1use super::{
7 commands, CommandParser, CommandType, CompletionEngine, ExecutionResult, HistoryManager,
8 OutputFormat, ParsedCommand, ReplError, ReplMode, ReplResult, ReplSession,
9};
10use crate::config::Config;
11use crate::status_metrics::{HealthIndicator, StatusMetrics, METRICS_REFRESH_INTERVAL};
12use colored::Colorize;
13use cqlite_core::{Database, QueryResult};
14use std::io::{self, IsTerminal, Write};
15use std::path::Path;
16
17#[derive(Debug, Clone)]
19pub struct ReplConfig {
20 pub mode: ReplMode,
22 pub enable_history: bool,
24 pub enable_completion: bool,
26 pub enable_colors: bool,
28 pub output_format: OutputFormat,
30 pub max_history_size: usize,
32 pub page_size: usize,
34 pub show_timing: bool,
36 pub enable_paging: bool,
38 pub prompt: String,
40 pub prompt_continuation: String,
42 pub show_status_line: bool,
44}
45
46impl Default for ReplConfig {
47 fn default() -> Self {
48 Self {
49 mode: ReplMode::Interactive,
50 enable_history: true,
51 enable_completion: true,
52 enable_colors: true,
53 output_format: OutputFormat::Table,
54 max_history_size: 1000,
55 page_size: 50,
56 show_timing: false,
57 enable_paging: true,
58 prompt: "cqlite> ".to_string(),
59 prompt_continuation: " -> ".to_string(),
60 show_status_line: true,
61 }
62 }
63}
64
65pub struct ReplEngine {
67 config: ReplConfig,
69 parser: CommandParser,
71 session: ReplSession,
73 history: Option<HistoryManager>,
75 completion: Option<CompletionEngine>,
77 command_buffer: String,
79 in_multiline: bool,
81 schema_paths: Vec<std::path::PathBuf>,
83 version_hint: Option<String>,
85 cached_metrics: Option<StatusMetrics>,
87}
88
89impl ReplEngine {
90 pub fn new(
92 config: ReplConfig,
93 db_path: &Path,
94 app_config: Config,
95 database: Database,
96 ) -> ReplResult<Self> {
97 Self::with_schema_registry(config, db_path, app_config, database, None)
98 }
99
100 pub fn with_schema_registry(
106 config: ReplConfig,
107 db_path: &Path,
108 app_config: Config,
109 database: Database,
110 schema_registry: Option<
111 std::sync::Arc<tokio::sync::RwLock<cqlite_core::schema::registry::SchemaRegistry>>,
112 >,
113 ) -> ReplResult<Self> {
114 let mut session = ReplSession::new(db_path, app_config, database)?;
115
116 if let Some(registry) = schema_registry {
118 session.set_schema_registry(Some(registry));
119 }
120
121 let parser = CommandParser::new();
122
123 let history = if config.enable_history {
124 Some(HistoryManager::new(config.max_history_size)?)
125 } else {
126 None
127 };
128
129 let completion = if config.enable_completion {
130 Some(CompletionEngine::new())
131 } else {
132 None
133 };
134
135 Ok(Self {
136 config,
137 parser,
138 session,
139 history,
140 completion,
141 command_buffer: String::new(),
142 in_multiline: false,
143 schema_paths: Vec::new(),
144 version_hint: None,
145 cached_metrics: None,
146 })
147 }
148
149 pub async fn run(&mut self) -> ReplResult<()> {
151 self.session.initialize().await?;
153
154 self.display_startup_banner().await?;
155
156 match self.config.mode {
157 ReplMode::Basic => self.run_basic_repl().await,
158 ReplMode::Interactive => self.run_interactive_repl().await,
159 ReplMode::Tui => self.run_tui_repl().await,
160 }
161 }
162
163 async fn run_basic_repl(&mut self) -> ReplResult<()> {
165 let stdin = io::stdin();
166 let mut input = String::new();
167
168 loop {
169 self.display_prompt().await?;
171
172 input.clear();
174 match stdin.read_line(&mut input) {
175 Ok(0) => break, Ok(_) => {
177 let trimmed = input.trim();
178 if trimmed.is_empty() {
179 continue;
180 }
181
182 match self.process_input(trimmed).await {
183 Ok(ExecutionResult::Continue) => continue,
184 Ok(ExecutionResult::Exit) => break,
185 Ok(ExecutionResult::ExitWithCode(code)) => {
186 return Err(match code {
188 3 => ReplError::SchemaError("Schema error occurred".to_string()),
189 4 => ReplError::DataDirectoryError(
190 "Data directory error occurred".to_string(),
191 ),
192 5 => {
193 ReplError::UnsupportedFeature("Unsupported feature".to_string())
194 }
195 _ => ReplError::Session(format!("Exit with code {}", code)),
196 });
197 }
198 Err(e) => {
199 eprintln!("{} {}", "Error:".red().bold(), e);
201 if matches!(
203 e,
204 ReplError::SchemaError(_)
205 | ReplError::DataDirectoryError(_)
206 | ReplError::UnsupportedFeature(_)
207 ) {
208 return Err(e);
209 }
210 continue;
211 }
212 }
213 }
214 Err(e) => {
215 eprintln!("{} Input error: {}", "Error:".red().bold(), e);
216 break;
217 }
218 }
219 }
220
221 self.display_goodbye().await?;
222 Ok(())
223 }
224
225 async fn run_interactive_repl(&mut self) -> ReplResult<()> {
227 use rustyline::error::ReadlineError;
228 use rustyline::DefaultEditor;
229
230 let mut rl: DefaultEditor = DefaultEditor::new()
232 .map_err(|e| ReplError::Session(format!("Failed to initialize line editor: {}", e)))?;
233
234 if self.history.is_some() {
236 let history_file = self.session.config().repl.history_file.clone().or_else(|| {
238 dirs::home_dir().map(|home| home.join(".cqlite_history"))
240 });
241
242 if let Some(ref path) = history_file {
243 let _ = rl.load_history(path);
245 }
246 }
247
248 self.display_startup_banner().await?;
250
251 loop {
252 if !self.in_multiline {
254 self.display_status_line().await?;
255 }
256
257 let prompt = if self.in_multiline {
259 self.config.prompt_continuation.clone()
260 } else {
261 self.format_prompt()
262 };
263
264 let readline = rl.readline(&prompt);
266
267 match readline {
268 Ok(line) => {
269 let trimmed = line.trim();
270 if trimmed.is_empty() {
271 continue;
272 }
273
274 let _ = rl.add_history_entry(&line);
276
277 match self.process_input(trimmed).await {
278 Ok(ExecutionResult::Continue) => continue,
279 Ok(ExecutionResult::Exit) => break,
280 Ok(ExecutionResult::ExitWithCode(code)) => {
281 if let Some(history_file) =
283 self.session.config().repl.history_file.clone().or_else(|| {
284 dirs::home_dir().map(|home| home.join(".cqlite_history"))
285 })
286 {
287 let _ = rl.save_history(&history_file);
288 }
289
290 return Err(match code {
292 3 => ReplError::SchemaError("Schema error occurred".to_string()),
293 4 => ReplError::DataDirectoryError(
294 "Data directory error occurred".to_string(),
295 ),
296 5 => {
297 ReplError::UnsupportedFeature("Unsupported feature".to_string())
298 }
299 _ => ReplError::Session(format!("Exit with code {}", code)),
300 });
301 }
302 Err(e) => {
303 eprintln!("{} {}", "Error:".red().bold(), e);
305 if matches!(
307 e,
308 ReplError::SchemaError(_)
309 | ReplError::DataDirectoryError(_)
310 | ReplError::UnsupportedFeature(_)
311 ) {
312 if let Some(history_file) =
314 self.session.config().repl.history_file.clone().or_else(|| {
315 dirs::home_dir().map(|home| home.join(".cqlite_history"))
316 })
317 {
318 let _ = rl.save_history(&history_file);
319 }
320 return Err(e);
321 }
322 continue;
323 }
324 }
325 }
326 Err(ReadlineError::Interrupted) => {
327 if self.in_multiline {
329 self.reset_command_buffer();
331 println!("^C");
332 continue;
333 } else {
334 println!("^C");
336 break;
337 }
338 }
339 Err(ReadlineError::Eof) => {
340 break;
342 }
343 Err(err) => {
344 eprintln!("{} Readline error: {}", "Error:".red().bold(), err);
345 break;
346 }
347 }
348 }
349
350 if let Some(history_file) = self
352 .session
353 .config()
354 .repl
355 .history_file
356 .clone()
357 .or_else(|| dirs::home_dir().map(|home| home.join(".cqlite_history")))
358 {
359 if let Some(parent) = history_file.parent() {
361 let _ = std::fs::create_dir_all(parent);
362 }
363
364 if let Err(e) = rl.save_history(&history_file) {
366 eprintln!("{} Failed to save history: {}", "Warning:".yellow(), e);
367 }
368 }
369
370 self.display_goodbye().await?;
371 Ok(())
372 }
373
374 async fn run_tui_repl(&mut self) -> ReplResult<()> {
376 println!(
379 "{} TUI mode not yet implemented in core engine",
380 "Info:".cyan().bold()
381 );
382 self.run_interactive_repl().await
383 }
384
385 pub fn process_input<'a>(
387 &'a mut self,
388 input: &'a str,
389 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ReplResult<ExecutionResult>> + 'a>>
390 {
391 Box::pin(async move { self.process_input_impl(input).await })
392 }
393
394 async fn process_input_impl(&mut self, input: &str) -> ReplResult<ExecutionResult> {
396 if self.should_continue_multiline(input) {
398 self.add_to_command_buffer(input);
399 return Ok(ExecutionResult::Continue);
400 }
401
402 let command = if self.in_multiline {
404 self.add_to_command_buffer(input);
405 let full_command = self.command_buffer.clone();
406 self.reset_command_buffer();
407 full_command
408 } else {
409 input.to_string()
410 };
411
412 if let Some(ref mut history) = self.history {
414 history.add_command(&command)?;
415 }
416
417 match self.parser.parse(&command) {
419 Ok(parsed_command) => self.execute_command(parsed_command).await,
420 Err(e) => {
421 eprintln!("{} Command parsing error: {}", "Error:".red().bold(), e);
422 Ok(ExecutionResult::Continue)
423 }
424 }
425 }
426
427 async fn execute_command(&mut self, command: ParsedCommand) -> ReplResult<ExecutionResult> {
429 match command.command_type {
430 CommandType::Exit => Ok(ExecutionResult::Exit),
431 CommandType::Help { topic } => {
432 self.execute_help_command(topic.as_deref()).await?;
433 Ok(ExecutionResult::Continue)
434 }
435 CommandType::Config { operation } => {
436 self.execute_config_command(operation).await?;
437 Ok(ExecutionResult::Continue)
438 }
439 CommandType::Tables => {
440 self.execute_tables_command().await?;
441 Ok(ExecutionResult::Continue)
442 }
443 CommandType::Describe { object_name } => {
444 self.execute_describe_command(&object_name).await?;
445 Ok(ExecutionResult::Continue)
446 }
447 CommandType::Use { keyspace } => {
448 self.execute_use_command(&keyspace).await?;
449 Ok(ExecutionResult::Continue)
450 }
451 CommandType::CqlQuery { query } => {
452 self.execute_cql_query(&query).await?;
453 Ok(ExecutionResult::Continue)
454 }
455 CommandType::Clear => {
456 self.execute_clear_command().await?;
457 Ok(ExecutionResult::Continue)
458 }
459 CommandType::History => {
460 self.execute_history_command().await?;
461 Ok(ExecutionResult::Continue)
462 }
463 CommandType::Source { file_path } => {
464 self.execute_source_command(&file_path).await?;
465 Ok(ExecutionResult::Continue)
466 }
467 CommandType::Status => {
468 let schema_registry = self.session.schema_registry();
470 commands::execute_status(self.session.data_dir(), schema_registry)
471 .await
472 .map_err(|e| {
473 let err_msg = e.to_string().to_lowercase();
474 if err_msg.contains("requires state_machine feature") {
475 ReplError::UnsupportedFeature(e.to_string())
476 } else if err_msg.contains("data directory") {
477 ReplError::DataDirectoryError(e.to_string())
478 } else {
479 ReplError::Database(e)
480 }
481 })?;
482 Ok(ExecutionResult::Continue)
483 }
484 CommandType::Health => {
485 commands::execute_health(
488 self.session.data_dir(),
489 None, self.config.page_size,
491 self.config.show_timing,
492 self.config.enable_colors,
493 )
494 .await
495 .map_err(|e| ReplError::Database(e))?;
496 Ok(ExecutionResult::Continue)
497 }
498 CommandType::Keyspaces => {
499 commands::execute_keyspaces(self.session.data_dir())
500 .await
501 .map_err(|e| {
502 let err_msg = e.to_string().to_lowercase();
503 if err_msg.contains("requires state_machine feature") {
504 ReplError::UnsupportedFeature(e.to_string())
505 } else if err_msg.contains("data directory") {
506 ReplError::DataDirectoryError(e.to_string())
507 } else {
508 ReplError::Database(e)
509 }
510 })?;
511 Ok(ExecutionResult::Continue)
512 }
513 CommandType::Schema { operation } => {
514 self.execute_schema_command(operation).await?;
515 Ok(ExecutionResult::Continue)
516 }
517 CommandType::Unknown { input } => {
518 eprintln!("{} Unknown command: {}", "Error:".red().bold(), input);
519 println!("Type {} for help", ":help".green());
520 Ok(ExecutionResult::Continue)
521 }
522 CommandType::Flush => {
524 eprintln!(
525 "{} Flush command requires write mode. Start REPL with --writable --write-dir <path>",
526 "Error:".red().bold()
527 );
528 Ok(ExecutionResult::Continue)
529 }
530 CommandType::WriteStats => {
531 eprintln!(
532 "{} Write stats command requires write mode. Start REPL with --writable --write-dir <path>",
533 "Error:".red().bold()
534 );
535 Ok(ExecutionResult::Continue)
536 }
537 CommandType::Maintenance { budget_ms } => {
538 let _ = budget_ms;
539 eprintln!(
540 "{} Maintenance command requires write mode. Start REPL with --writable --write-dir <path>",
541 "Error:".red().bold()
542 );
543 Ok(ExecutionResult::Continue)
544 }
545 }
546 }
547
548 fn should_continue_multiline(&self, input: &str) -> bool {
550 if self.in_multiline {
552 return !input.trim_end().ends_with(';');
553 }
554
555 let trimmed = input.trim();
557 let sql_keywords = [
558 "SELECT", "INSERT", "UPDATE", "DELETE", "CREATE", "ALTER", "DROP",
559 ];
560
561 sql_keywords
562 .iter()
563 .any(|keyword| trimmed.to_uppercase().starts_with(keyword) && !trimmed.ends_with(';'))
564 }
565
566 fn add_to_command_buffer(&mut self, input: &str) {
568 if !self.in_multiline {
569 self.in_multiline = true;
570 self.command_buffer.clear();
571 }
572
573 if !self.command_buffer.is_empty() {
574 self.command_buffer.push(' ');
575 }
576 self.command_buffer.push_str(input);
577 }
578
579 fn reset_command_buffer(&mut self) {
581 self.command_buffer.clear();
582 self.in_multiline = false;
583 }
584
585 async fn display_prompt(&mut self) -> ReplResult<()> {
587 if !self.in_multiline {
589 self.display_status_line().await?;
590 }
591
592 let prompt = if self.in_multiline {
593 self.config.prompt_continuation.clone()
594 } else {
595 self.format_prompt()
596 };
597
598 print!("{}", prompt);
599 io::stdout().flush().map_err(ReplError::Io)?;
600 Ok(())
601 }
602
603 async fn display_status_line(&mut self) -> ReplResult<()> {
605 if !self.config.show_status_line || !self.config.enable_colors {
607 return Ok(());
608 }
609
610 if !io::stdout().is_terminal() {
612 return Ok(());
613 }
614
615 let needs_refresh = self
617 .cached_metrics
618 .as_ref()
619 .map(|m: &StatusMetrics| m.is_stale(METRICS_REFRESH_INTERVAL))
620 .unwrap_or(true);
621
622 if needs_refresh {
623 self.cached_metrics = Some(
624 StatusMetrics::collect(self.session.data_dir(), self.session.database()).await,
625 );
626 }
627
628 if let Some(ref metrics) = self.cached_metrics {
630 let health_str = match metrics.health {
631 HealthIndicator::Ok => "OK".green().to_string(),
632 HealthIndicator::Warning => "WARN".yellow().to_string(),
633 HealthIndicator::Error => "ERR".red().to_string(),
634 };
635
636 println!(
637 "[{}] Mem: {} | Data: {}",
638 health_str,
639 metrics.format_memory().cyan(),
640 metrics.format_data().cyan()
641 );
642 }
643
644 Ok(())
645 }
646
647 fn format_prompt(&self) -> String {
649 let mut prompt = String::new();
650
651 if let Some(ref keyspace) = self.session.current_keyspace() {
653 prompt.push_str(&format!("{}@", keyspace.cyan()));
654 }
655
656 prompt.push_str("cqlite");
658
659 match self.config.mode {
661 ReplMode::Tui => prompt.push_str("[tui]"),
662 ReplMode::Interactive => prompt.push_str("[i]"),
663 ReplMode::Basic => {}
664 }
665
666 prompt.push_str(&"> ".blue().bold().to_string());
667 prompt
668 }
669
670 async fn display_startup_banner(&self) -> ReplResult<()> {
672 if !self.config.enable_colors {
673 println!("CQLite Interactive Shell");
674 println!("Type :help for help, :quit to exit");
675 return Ok(());
676 }
677
678 println!(
679 "{}",
680 "╔═══════════════════════════════════════════════╗".cyan()
681 );
682 println!(
683 "{}",
684 "║ CQLite REPL Engine v2.0 ║"
685 .cyan()
686 .bold()
687 );
688 println!(
689 "{}",
690 "║ High-Performance Cassandra Reader ║".cyan()
691 );
692 println!(
693 "{}",
694 "╚═══════════════════════════════════════════════╝".cyan()
695 );
696 println!();
697
698 println!(
699 "🗄️ Database: {}",
700 self.session.db_path().display().to_string().yellow()
701 );
702 println!("🔧 Mode: {}", format!("{:?}", self.config.mode).green());
703 println!("📊 Engine: {}", "CQLite Core v0.1.0".green());
704
705 if let Some(ref keyspace) = self.session.current_keyspace() {
706 println!("📦 Keyspace: {}", keyspace.yellow());
707 }
708
709 println!();
710 println!("{}", "Quick Commands:".cyan().bold());
711 println!(" • {} - Show help", ":help".green());
712 println!(" • {} - List tables", ":tables".green());
713 println!(" • {} - Execute CQL", "SELECT * FROM table;".yellow());
714 println!(" • {} - Exit", ":quit".red());
715 println!();
716
717 Ok(())
718 }
719
720 async fn display_goodbye(&self) -> ReplResult<()> {
722 if self.config.enable_colors {
723 println!();
724 println!("{}", "Goodbye! Thank you for using CQLite.".cyan().bold());
725 } else {
726 println!("Goodbye!");
727 }
728 Ok(())
729 }
730
731 async fn execute_help_command(&self, topic: Option<&str>) -> ReplResult<()> {
733 match topic {
734 Some("commands") => self.show_commands_help(),
735 Some("config") => self.show_config_help(),
736 Some("cql") => self.show_cql_help(),
737 Some("examples") => self.show_examples_help(),
738 None => self.show_general_help(),
739 Some(unknown) => {
740 println!("{} Unknown help topic: {}", "Error:".red().bold(), unknown);
741 println!("Available topics: commands, config, cql, examples");
742 }
743 }
744 Ok(())
745 }
746
747 async fn execute_config_command(&mut self, operation: String) -> ReplResult<()> {
749 if operation.is_empty() || operation == "show" {
751 self.show_current_config();
752 } else {
753 println!(
754 "{} Configuration is read-only in M2.",
755 "Note:".yellow().bold()
756 );
757 println!("Use CLI flags, environment variables, or config files to modify settings.");
758 println!();
759 self.show_current_config();
760 }
761 Ok(())
762 }
763
764 async fn execute_tables_command(&mut self) -> ReplResult<()> {
769 println!("{}", "Listing tables...".cyan().bold());
770
771 #[cfg(not(feature = "state_machine"))]
772 {
773 return Err(ReplError::UnsupportedFeature(
774 "Tables command requires state_machine feature".to_string(),
775 ));
776 }
777
778 #[cfg(feature = "state_machine")]
779 {
780 use cqlite_core::discovery::DiscoveryService;
781
782 let Some(data_dir) = self.session.data_dir() else {
783 println!("No tables found");
784 println!("Configure data directory with :config data-dir <PATH>");
785 return Ok(());
786 };
787
788 let discovery_service = DiscoveryService::new(data_dir.to_path_buf(), None);
789
790 let summary = discovery_service.scan().await.map_err(|e| {
791 ReplError::DataDirectoryError(format!("Failed to scan for tables: {}", e))
792 })?;
793
794 let tables: Vec<&String> = if let Some(keyspace) = self.session.current_keyspace() {
796 let prefix = format!("{}.", keyspace);
797 summary
798 .tables
799 .iter()
800 .filter(|t| t.starts_with(&prefix))
801 .collect()
802 } else {
803 summary.tables.iter().collect()
804 };
805
806 for warning in &summary.warnings {
808 println!("{}", warning.yellow());
809 println!();
810 }
811
812 if tables.is_empty() {
813 println!("No tables found");
814 if self.session.current_keyspace().is_some() {
815 println!("Try :use to switch keyspaces or run :tables without USE");
816 }
817 } else {
818 for table in tables {
819 println!(" {}", table.green());
820 }
821 }
822
823 Ok(())
824 }
825 }
826
827 async fn execute_describe_command(&mut self, object_name: &str) -> ReplResult<()> {
832 println!(
833 "{} {}",
834 "🔍 Describing:".cyan().bold(),
835 object_name.yellow()
836 );
837
838 #[cfg(not(feature = "state_machine"))]
839 {
840 return Err(ReplError::UnsupportedFeature(
841 "Describe command requires state_machine feature".to_string(),
842 ));
843 }
844
845 #[cfg(feature = "state_machine")]
846 {
847 let (keyspace, table) = if object_name.contains('.') {
849 let parts: Vec<&str> = object_name.split('.').collect();
850 if parts.len() == 2 {
851 (Some(parts[0].to_string()), parts[1])
852 } else if parts.len() > 2 {
853 eprintln!(
854 "{} Invalid object name format: '{}'. Use keyspace.table or table",
855 "Error:".red().bold(),
856 object_name
857 );
858 return Ok(());
859 } else {
860 (self.session.current_keyspace().cloned(), object_name)
861 }
862 } else {
863 (self.session.current_keyspace().cloned(), object_name)
864 };
865
866 let Some(ks) = keyspace else {
867 eprintln!(
868 "{} No keyspace specified and no current keyspace set",
869 "Error:".red().bold()
870 );
871 println!("💡 Try :use <keyspace> first, or use keyspace.table format");
872 return Ok(());
873 };
874
875 if let Some(registry) = self.session.schema_registry() {
877 let registry_guard = registry.read().await;
878 match registry_guard.get_schema(&ks, table).await {
879 Ok(schema) => {
880 let description = self.format_table_description(&schema);
881 println!("{}", description);
882 }
883 Err(e) => {
884 eprintln!(
885 "{} Table '{}.{}' not found: {}",
886 "Error:".red().bold(),
887 ks,
888 table,
889 e
890 );
891 println!("💡 Try :tables to list available tables");
892 }
893 }
894 } else {
895 eprintln!("{} Schema registry not available", "Error:".red().bold());
896 println!("💡 Ensure schema was loaded with --schema flag");
897 }
898
899 Ok(())
900 }
901 }
902
903 #[cfg(feature = "state_machine")]
905 fn format_table_description(&self, schema: &cqlite_core::schema::TableSchema) -> String {
906 use std::fmt::Write;
907 let mut output = String::new();
908
909 let _ = writeln!(output, "Table: {}.{}\n", schema.keyspace, schema.table);
911 let _ = writeln!(output, "Columns:");
912
913 let mut all_columns: Vec<(String, String, String)> = Vec::new();
915
916 for pk in schema.ordered_partition_keys() {
918 all_columns.push((
919 pk.name.clone(),
920 pk.data_type.clone(),
921 "PARTITION KEY".to_string(),
922 ));
923 }
924
925 for ck in schema.ordered_clustering_keys() {
927 let order_str = match ck.order {
928 cqlite_core::schema::ClusteringOrder::Asc => "CLUSTERING KEY".to_string(),
929 cqlite_core::schema::ClusteringOrder::Desc => "CLUSTERING KEY DESC".to_string(),
930 };
931 all_columns.push((ck.name.clone(), ck.data_type.clone(), order_str));
932 }
933
934 for col in &schema.columns {
936 let is_key = schema.partition_keys.iter().any(|k| k.name == col.name)
938 || schema.clustering_keys.iter().any(|k| k.name == col.name);
939 if !is_key {
940 all_columns.push((col.name.clone(), col.data_type.clone(), String::new()));
941 }
942 }
943
944 let name_width = all_columns
946 .iter()
947 .map(|(n, _, _)| n.len())
948 .max()
949 .unwrap_or(10)
950 .max(10);
951 let type_width = all_columns
952 .iter()
953 .map(|(_, t, _)| t.len())
954 .max()
955 .unwrap_or(10)
956 .max(10);
957
958 for (name, dtype, role) in all_columns {
960 if role.is_empty() {
961 let _ = writeln!(output, " {:<name_width$} {:<type_width$}", name, dtype);
962 } else {
963 let _ = writeln!(
964 output,
965 " {:<name_width$} {:<type_width$} ({})",
966 name, dtype, role
967 );
968 }
969 }
970
971 output
972 }
973
974 async fn execute_use_command(&mut self, keyspace: &str) -> ReplResult<()> {
976 match self.session.use_keyspace(keyspace).await {
977 Ok(()) => {
978 println!(
979 "{} Now using keyspace: {}",
980 "✅".green(),
981 keyspace.yellow().bold()
982 );
983 }
984 Err(e) => {
985 eprintln!(
986 "{} Failed to use keyspace {}: {}",
987 "Error:".red().bold(),
988 keyspace,
989 e
990 );
991 }
992 }
993 Ok(())
994 }
995
996 async fn execute_cql_query(&mut self, query: &str) -> ReplResult<()> {
998 let start_time = std::time::Instant::now();
999
1000 println!("{} {}", "🔍 Executing:".blue().bold(), query.yellow());
1001
1002 match self.session.execute_query(query).await {
1003 Ok(result) => {
1004 let elapsed = start_time.elapsed();
1005 self.display_query_result(&result)?;
1006
1007 if self.config.show_timing {
1008 println!();
1009 println!(
1010 "{} {:.2}ms",
1011 "⏱️ Execution time:".green(),
1012 elapsed.as_millis()
1013 );
1014 }
1015 }
1016 Err(e) => {
1017 let elapsed = start_time.elapsed();
1018 eprintln!(
1019 "{} Query failed after {:.2}ms",
1020 "❌ Error:".red().bold(),
1021 elapsed.as_millis()
1022 );
1023 eprintln!(" {}", e.to_string().red());
1024 self.provide_query_hints(query, &e);
1025 }
1026 }
1027
1028 Ok(())
1029 }
1030
1031 async fn execute_clear_command(&self) -> ReplResult<()> {
1033 print!("\\x1B[2J\\x1B[1;1H");
1034 io::stdout().flush().map_err(ReplError::Io)?;
1035 Ok(())
1036 }
1037
1038 async fn execute_history_command(&self) -> ReplResult<()> {
1040 if let Some(ref history) = self.history {
1041 println!("{}", "📜 Command History".cyan().bold());
1042 println!("{}", "═".repeat(20).cyan());
1043
1044 let commands = history.recent_commands(20);
1045 if commands.is_empty() {
1046 println!("📭 No commands in history");
1047 } else {
1048 for (i, cmd) in commands.iter().enumerate() {
1049 println!(" {:3}. {}", i + 1, cmd);
1050 }
1051 }
1052 } else {
1053 println!("{} History is disabled", "Info:".cyan().bold());
1054 }
1055 Ok(())
1056 }
1057
1058 async fn execute_source_command(&mut self, file_path: &str) -> ReplResult<()> {
1060 println!(
1061 "{} Executing commands from: {}",
1062 "📂".cyan(),
1063 file_path.yellow()
1064 );
1065
1066 let path = std::path::Path::new(file_path);
1067 if !path.exists() {
1068 eprintln!("{} File not found: {}", "Error:".red().bold(), file_path);
1069 return Ok(());
1070 }
1071
1072 let content = std::fs::read_to_string(path).map_err(|e| ReplError::Io(e))?;
1073
1074 let mut executed = 0;
1075 let errors = 0;
1076
1077 for (line_num, line) in content.lines().enumerate() {
1078 let trimmed = line.trim();
1079
1080 if trimmed.is_empty() || trimmed.starts_with("--") || trimmed.starts_with("#") {
1082 continue;
1083 }
1084
1085 println!(
1086 "{}:{} {}",
1087 file_path,
1088 line_num + 1,
1089 trimmed.to_string().dimmed()
1090 );
1091
1092 match self.process_input(trimmed).await? {
1093 ExecutionResult::Continue => executed += 1,
1094 ExecutionResult::Exit => {
1095 println!("🛑 Execution stopped due to exit command");
1096 break;
1097 }
1098 ExecutionResult::ExitWithCode(_) => {
1099 println!("🛑 Execution stopped");
1100 break;
1101 }
1102 }
1103 }
1104
1105 println!();
1106 println!(
1107 "📊 File execution completed: {} commands executed, {} errors",
1108 executed, errors
1109 );
1110 Ok(())
1111 }
1112
1113 fn display_query_result(&self, result: &QueryResult) -> ReplResult<()> {
1115 match self.config.output_format {
1116 OutputFormat::Table => self.display_table_result(result),
1117 OutputFormat::Csv => self.display_csv_result(result),
1118 OutputFormat::Json => self.display_json_result(result),
1119 OutputFormat::Raw => self.display_raw_result(result),
1120 }
1121 }
1122
1123 fn display_table_result(&self, result: &QueryResult) -> ReplResult<()> {
1125 use crate::config::OutputConfig;
1126 use crate::output::TableWriter;
1127
1128 if result.rows.is_empty() {
1129 if result.rows_affected > 0 {
1130 println!(
1131 "{} {} rows affected",
1132 "✅".green().bold(),
1133 result.rows_affected
1134 );
1135 } else {
1136 println!("{} No rows returned", "📭".yellow());
1137 }
1138 return Ok(());
1139 }
1140
1141 let output_config = OutputConfig {
1143 color_enabled: self.config.enable_colors,
1144 limit: None,
1145 page_size: None,
1146 target: crate::output::OutputTarget::Stdout,
1147 overwrite: false,
1148 };
1149
1150 let formatted = TableWriter::write(result, &output_config)
1152 .map_err(|e| ReplError::Session(format!("Failed to format table output: {}", e)))?;
1153
1154 println!("{}", formatted);
1155
1156 Ok(())
1157 }
1158
1159 fn display_csv_result(&self, result: &QueryResult) -> ReplResult<()> {
1161 use crate::config::OutputConfig;
1162 use crate::output::CSVWriter;
1163
1164 let output_config = OutputConfig {
1166 color_enabled: self.config.enable_colors,
1167 limit: None, page_size: None,
1169 target: crate::output::OutputTarget::Stdout,
1170 overwrite: false,
1171 };
1172
1173 let formatted = CSVWriter::write(result, &output_config)
1174 .map_err(|e| ReplError::Session(format!("Failed to format CSV output: {}", e)))?;
1175
1176 println!("{}", formatted);
1177
1178 Ok(())
1179 }
1180
1181 fn display_json_result(&self, result: &QueryResult) -> ReplResult<()> {
1183 use crate::config::OutputConfig;
1184 use crate::output::JSONWriter;
1185
1186 let output_config = OutputConfig {
1188 color_enabled: self.config.enable_colors,
1189 limit: None, page_size: None,
1191 target: crate::output::OutputTarget::Stdout,
1192 overwrite: false,
1193 };
1194
1195 let formatted = JSONWriter::write(result, &output_config)
1196 .map_err(|e| ReplError::Session(format!("Failed to format JSON output: {}", e)))?;
1197
1198 println!("{}", formatted);
1199
1200 Ok(())
1201 }
1202
1203 fn display_raw_result(&self, _result: &QueryResult) -> ReplResult<()> {
1205 println!("Raw output not yet implemented");
1206 Ok(())
1207 }
1208
1209 fn provide_query_hints(&self, _query: &str, error: &ReplError) {
1211 let error_msg = error.to_string();
1212
1213 println!();
1214 if error_msg.contains("table") && error_msg.contains("not found") {
1215 println!("{} Table not found. Try:", "💡 Hint:".cyan().bold());
1216 println!(" • {} to list tables", ":tables".green());
1217 println!(" • Check table name spelling");
1218 } else if error_msg.contains("syntax") {
1219 println!("{} Syntax error. Try:", "💡 Hint:".cyan().bold());
1220 println!(" • {} for CQL help", ":help cql".green());
1221 println!(" • Check query syntax");
1222 } else {
1223 println!(
1224 "{} For general help: {}",
1225 "💡 Hint:".cyan().bold(),
1226 ":help".green()
1227 );
1228 }
1229 }
1230
1231 fn show_general_help(&self) {
1233 println!("{}", "CQLite REPL Help".cyan().bold());
1234 println!("{}", "═".repeat(20).cyan());
1235 println!();
1236 println!("Commands:");
1237 println!(" :help [topic] Show help (topics: commands, config, cql, examples)");
1238 println!(" :quit, :exit Exit the REPL");
1239 println!(" :tables List all tables");
1240 println!(" :describe <obj> Describe object");
1241 println!(" :use <keyspace> Switch keyspace");
1242 println!(" :config [op] Show/set configuration");
1243 println!(" :status Show discovery and schema coverage status");
1244 println!(" :health Show health diagnostics");
1245 println!(" :clear Clear screen");
1246 println!(" :history Show command history");
1247 println!(" :source <file> Execute commands from file");
1248 println!();
1249 println!("CQL queries can be executed directly (end with semicolon for multi-line)");
1250 }
1251
1252 fn show_commands_help(&self) {
1254 println!("{}", "Available Commands".cyan().bold());
1255 println!("{}", "═".repeat(20).cyan());
1256 println!();
1257 println!("Meta Commands:");
1258 println!(" :help Show this help");
1259 println!(" :quit, :exit Exit REPL");
1260 println!(" :clear Clear screen");
1261 println!(" :history Show recent commands");
1262 println!();
1263 println!("Database Commands:");
1264 println!(" :tables List all tables");
1265 println!(" :describe <obj> Show object schema");
1266 println!(" :use <keyspace> Switch to keyspace");
1267 println!(" :status Show discovery and schema coverage status");
1268 println!(" :health Show health diagnostics");
1269 println!();
1270 println!("File Commands:");
1271 println!(" :source <file> Execute CQL file");
1272 println!();
1273 println!("Configuration:");
1274 println!(" :config Show merged effective configuration (read-only)");
1275 }
1276
1277 fn show_config_help(&self) {
1279 println!("{}", "Configuration Help".cyan().bold());
1280 println!("{}", "═".repeat(20).cyan());
1281 println!();
1282 println!("View merged effective configuration (read-only):");
1283 println!(" :config Display all configuration settings");
1284 println!();
1285 println!("The :config command shows:");
1286 println!(" • Data & Schema settings (data_directory, schema_paths, default_keyspace)");
1287 println!(" • Output settings (output_mode, query_limit, colors)");
1288 println!(" • REPL settings (page_size, show_timing, history, completion)");
1289 println!(
1290 " • Precedence chain (CLI > ENV > --config > .cqlite.toml > user config > defaults)"
1291 );
1292 println!();
1293 println!("Note: Configuration is read-only in M2. Use CLI flags, environment");
1294 println!(" variables, or config files to modify settings.");
1295 }
1296
1297 fn show_cql_help(&self) {
1299 println!("{}", "CQL Query Help".cyan().bold());
1300 println!("{}", "═".repeat(20).cyan());
1301 println!();
1302 println!("Supported CQL:");
1303 println!(" SELECT * FROM table;");
1304 println!(" SELECT col1, col2 FROM table WHERE condition;");
1305 println!(" DESCRIBE TABLE table_name;");
1306 println!();
1307 println!("Multi-line queries:");
1308 println!(" Start typing a query and press Enter");
1309 println!(" Continue on next lines");
1310 println!(" End with semicolon (;) to execute");
1311 }
1312
1313 fn show_examples_help(&self) {
1315 println!("{}", "Usage Examples".cyan().bold());
1316 println!("{}", "═".repeat(20).cyan());
1317 println!();
1318 println!("Basic workflow:");
1319 println!(" :tables");
1320 println!(" :describe users");
1321 println!(" SELECT * FROM users LIMIT 5;");
1322 println!();
1323 println!("Configuration:");
1324 println!(" :config output_format=json");
1325 println!(" :config show_timing=true");
1326 println!();
1327 println!("File execution:");
1328 println!(" :source /path/to/queries.sql");
1329 }
1330
1331 fn show_current_config(&self) {
1333 let cli_config = self.session.config();
1334
1335 println!("{}", "Effective Configuration".cyan().bold());
1336 println!("{}", "═".repeat(60).cyan());
1337 println!();
1338
1339 println!("{}", "Data & Schema:".yellow().bold());
1341 if let Some(ref data_dir) = cli_config.data_directory {
1342 println!(" data_directory = {}", data_dir.display());
1343 } else {
1344 println!(" data_directory = {}", "<not set>".dimmed());
1345 }
1346
1347 if !cli_config.schema_paths.is_empty() {
1348 let paths: Vec<String> = cli_config
1349 .schema_paths
1350 .iter()
1351 .map(|p| p.display().to_string())
1352 .collect();
1353 println!(" schema_paths = [{}]", paths.join(", "));
1354 } else {
1355 println!(" schema_paths = {}", "[]".dimmed());
1356 }
1357
1358 if let Some(ref keyspace) = cli_config.default_keyspace {
1359 println!(" default_keyspace = {}", keyspace);
1360 } else {
1361 println!(" default_keyspace = {}", "<not set>".dimmed());
1362 }
1363 println!();
1364
1365 println!("{}", "Output Settings:".yellow().bold());
1367 if let Some(ref mode) = cli_config.output_mode {
1368 println!(" output_mode = {}", mode);
1369 } else {
1370 println!(" output_mode = {}", "table".dimmed());
1371 }
1372
1373 if let Some(limit) = cli_config.query_limit {
1374 println!(" query_limit = {}", limit);
1375 } else {
1376 println!(" query_limit = {}", "<unlimited>".dimmed());
1377 }
1378
1379 println!(" no_color = {}", cli_config.no_color);
1380 println!(" colors = {}", cli_config.output.colors);
1381
1382 if let Some(max_rows) = cli_config.output.max_rows {
1383 println!(" max_rows = {}", max_rows);
1384 } else {
1385 println!(" max_rows = {}", "<unlimited>".dimmed());
1386 }
1387 println!();
1388
1389 println!("{}", "REPL Settings:".yellow().bold());
1391 println!(" mode = {:?}", self.config.mode);
1392 println!(" output_format = {:?}", self.config.output_format);
1393 println!(" page_size = {}", self.config.page_size);
1394 println!(" show_timing = {}", self.config.show_timing);
1395 println!(" enable_paging = {}", self.config.enable_paging);
1396 println!(" enable_colors = {}", self.config.enable_colors);
1397 println!(
1398 " history = {}",
1399 if self.history.is_some() {
1400 "enabled"
1401 } else {
1402 "disabled"
1403 }
1404 );
1405 println!(
1406 " completion = {}",
1407 if self.completion.is_some() {
1408 "enabled"
1409 } else {
1410 "disabled"
1411 }
1412 );
1413 println!();
1414
1415 println!("{}", "Precedence Chain:".yellow().bold());
1417 println!(" {}", "CLI flags > Environment variables > Explicit config (--config) > Project config (./.cqlite.toml) > User config > Defaults".dimmed());
1418 }
1419
1420 async fn set_config_value(&mut self, key: &str, value: &str) -> ReplResult<()> {
1422 match key {
1423 "data-dir" | "data_dir" => {
1424 let data_dir = std::path::PathBuf::from(value);
1425
1426 if !data_dir.exists() {
1428 println!(
1429 "{} Directory does not exist: {}",
1430 "Error:".red().bold(),
1431 value
1432 );
1433 return Ok(());
1434 }
1435
1436 if !data_dir.is_dir() {
1437 println!(
1438 "{} Path is not a directory: {}",
1439 "Error:".red().bold(),
1440 value
1441 );
1442 return Ok(());
1443 }
1444
1445 println!(
1446 "{} Changing data directory to: {}",
1447 "Info:".cyan(),
1448 data_dir.display().to_string().yellow()
1449 );
1450
1451 self.rebuild_database_from_discovery(
1453 data_dir,
1454 self.schema_paths.clone(),
1455 self.version_hint.clone(),
1456 )
1457 .await?;
1458 }
1459 "output_format" => {
1460 self.config.output_format = match value.to_lowercase().as_str() {
1461 "table" => OutputFormat::Table,
1462 "csv" => OutputFormat::Csv,
1463 "json" => OutputFormat::Json,
1464 "raw" => OutputFormat::Raw,
1465 _ => {
1466 println!(
1467 "{} Invalid output format. Use: table, csv, json, raw",
1468 "Error:".red().bold()
1469 );
1470 return Ok(());
1471 }
1472 };
1473 println!(
1474 "{} Output format set to: {:?}",
1475 "✅".green(),
1476 self.config.output_format
1477 );
1478 }
1479 "page_size" => match value.parse::<usize>() {
1480 Ok(size) if size > 0 => {
1481 self.config.page_size = size;
1482 println!("{} Page size set to: {}", "✅".green(), size);
1483 }
1484 _ => {
1485 println!(
1486 "{} Invalid page size. Must be positive number",
1487 "Error:".red().bold()
1488 );
1489 }
1490 },
1491 "show_timing" => match value.to_lowercase().as_str() {
1492 "true" | "on" | "1" | "yes" => {
1493 self.config.show_timing = true;
1494 println!("{} Timing enabled", "✅".green());
1495 }
1496 "false" | "off" | "0" | "no" => {
1497 self.config.show_timing = false;
1498 println!("{} Timing disabled", "✅".green());
1499 }
1500 _ => {
1501 println!(
1502 "{} Invalid boolean value. Use: true/false",
1503 "Error:".red().bold()
1504 );
1505 }
1506 },
1507 "enable_paging" => match value.to_lowercase().as_str() {
1508 "true" | "on" | "1" | "yes" => {
1509 self.config.enable_paging = true;
1510 println!("{} Paging enabled", "✅".green());
1511 }
1512 "false" | "off" | "0" | "no" => {
1513 self.config.enable_paging = false;
1514 println!("{} Paging disabled", "✅".green());
1515 }
1516 _ => {
1517 println!(
1518 "{} Invalid boolean value. Use: true/false",
1519 "Error:".red().bold()
1520 );
1521 }
1522 },
1523 _ => {
1524 println!(
1525 "{} Unknown configuration key: {}",
1526 "Error:".red().bold(),
1527 key
1528 );
1529 println!("Available keys: data-dir, output_format, page_size, show_timing, enable_paging");
1530 }
1531 }
1532
1533 Ok(())
1534 }
1535
1536 pub fn session(&self) -> &ReplSession {
1538 &self.session
1539 }
1540
1541 pub fn session_mut(&mut self) -> &mut ReplSession {
1543 &mut self.session
1544 }
1545
1546 pub fn config(&self) -> &ReplConfig {
1548 &self.config
1549 }
1550
1551 async fn execute_schema_command(
1553 &mut self,
1554 operation: super::SchemaOperation,
1555 ) -> ReplResult<()> {
1556 use super::SchemaOperation;
1557
1558 match operation {
1559 SchemaOperation::Load { paths } => {
1560 println!(
1561 "{} Loading schemas from {} file(s)...",
1562 "Info:".cyan(),
1563 paths.len()
1564 );
1565
1566 let schema_paths: Vec<std::path::PathBuf> =
1568 paths.iter().map(|p| std::path::PathBuf::from(p)).collect();
1569
1570 for path in &schema_paths {
1572 if !path.exists() {
1573 return Err(ReplError::SchemaError(format!(
1574 "Schema file not found: {}",
1575 path.display()
1576 )));
1577 }
1578 }
1579
1580 let data_dir = match self.session.data_dir() {
1582 Some(dir) => dir.to_path_buf(),
1583 None => {
1584 return Err(ReplError::DataDirectoryError(
1585 "No data directory configured. Use :config data-dir=<path> first"
1586 .to_string(),
1587 ));
1588 }
1589 };
1590
1591 self.schema_paths = schema_paths.clone();
1593
1594 self.rebuild_database_from_discovery(
1596 data_dir,
1597 schema_paths,
1598 self.version_hint.clone(),
1599 )
1600 .await?;
1601 }
1602 SchemaOperation::Refresh => {
1603 println!("{} Refreshing schemas...", "Info:".cyan());
1604
1605 if self.schema_paths.is_empty() {
1606 println!(
1607 "{} No schemas loaded. Use :schema load <path> first",
1608 "Warning:".yellow()
1609 );
1610 return Ok(());
1611 }
1612
1613 let data_dir = match self.session.data_dir() {
1614 Some(dir) => dir.to_path_buf(),
1615 None => {
1616 return Err(ReplError::DataDirectoryError(
1617 "No data directory configured".to_string(),
1618 ));
1619 }
1620 };
1621
1622 self.rebuild_database_from_discovery(
1624 data_dir,
1625 self.schema_paths.clone(),
1626 self.version_hint.clone(),
1627 )
1628 .await?;
1629 }
1630 SchemaOperation::Unload => {
1631 println!("{} Unloading schemas...", "Info:".cyan());
1632
1633 let data_dir = match self.session.data_dir() {
1634 Some(dir) => dir.to_path_buf(),
1635 None => {
1636 return Err(ReplError::DataDirectoryError(
1637 "No data directory configured".to_string(),
1638 ));
1639 }
1640 };
1641
1642 self.schema_paths.clear();
1644
1645 self.rebuild_database_from_discovery(
1647 data_dir,
1648 Vec::new(),
1649 self.version_hint.clone(),
1650 )
1651 .await?;
1652 }
1653 SchemaOperation::Show => {
1654 println!("{}", "Schema Status".cyan().bold());
1655 println!("{}", "═".repeat(25).cyan());
1656 println!();
1657
1658 if self.schema_paths.is_empty() {
1659 println!("No schemas loaded");
1660 } else {
1661 println!("Loaded schemas ({}):", self.schema_paths.len());
1662 for (i, path) in self.schema_paths.iter().enumerate() {
1663 println!(" {}. {}", i + 1, path.display().to_string().green());
1664 }
1665 }
1666
1667 if let Some(ref data_dir) = self.session.data_dir() {
1668 println!();
1669 println!(
1670 "Data directory: {}",
1671 data_dir.display().to_string().yellow()
1672 );
1673 }
1674
1675 if let Some(ref version) = self.version_hint {
1676 println!("Version hint: {}", version.yellow());
1677 }
1678 }
1679 SchemaOperation::List => {
1680 commands::execute_schema_list(&self.schema_paths).await?;
1681 }
1682 }
1683
1684 Ok(())
1685 }
1686
1687 pub async fn rebuild_database_from_discovery(
1708 &mut self,
1709 data_dir: std::path::PathBuf,
1710 schema_paths: Vec<std::path::PathBuf>,
1711 version_hint: Option<String>,
1712 ) -> ReplResult<()> {
1713 use cqlite_core::ingestion::{ingest, IngestionConfig};
1714
1715 println!("{}", "Rebuilding database from discovery...".cyan().bold());
1716
1717 let ingestion_config = IngestionConfig {
1720 schema_paths: schema_paths.clone(),
1721 data_dir: data_dir.clone(),
1722 version_hint: version_hint.clone(),
1723 core_config: cqlite_core::Config::default(),
1724 table_directory_filter: None, };
1726
1727 let start_time = std::time::Instant::now();
1729 let ingestion_result = ingest(ingestion_config)
1730 .await
1731 .map_err(|e| ReplError::Database(e.into()))?;
1732 let elapsed = start_time.elapsed();
1733
1734 println!(
1736 "{} {} schemas loaded, {} UDTs loaded",
1737 "Schema:".green(),
1738 ingestion_result.schema_load_result.schemas_loaded,
1739 ingestion_result.schema_load_result.udts_loaded
1740 );
1741
1742 if !ingestion_result.schema_load_result.warnings.is_empty() {
1743 println!(
1744 "{} {} warning(s)",
1745 "Warnings:".yellow(),
1746 ingestion_result.schema_load_result.warnings.len()
1747 );
1748 for warning in &ingestion_result.schema_load_result.warnings {
1749 println!(" - {}", warning.message.yellow());
1750 }
1751 }
1752
1753 println!(
1754 "{} {} SSTables discovered across {} keyspaces",
1755 "Discovery:".green(),
1756 ingestion_result.discovery_summary.sstables_found,
1757 ingestion_result.discovery_summary.keyspaces.len()
1758 );
1759
1760 if let Some(ref version) = ingestion_result.discovery_summary.resolved_version {
1761 println!("{} Cassandra {}", "Version:".green(), version.yellow());
1762 }
1763
1764 self.session.replace_database(ingestion_result.database)?;
1767
1768 self.session.set_data_dir(Some(data_dir.clone()));
1770
1771 self.session
1773 .set_schema_registry(Some(ingestion_result.schema_registry));
1774
1775 println!(
1776 "{} Database rebuilt in {:.2}ms",
1777 "Success:".green().bold(),
1778 elapsed.as_millis()
1779 );
1780
1781 Ok(())
1782 }
1783}