1#![allow(dead_code)]
2#![allow(deprecated)]
5
6use crate::cli::OutputFormat;
7#[cfg(feature = "state_machine")]
8use crate::cli::{ExportFormat, ImportFormat};
9#[cfg(feature = "state_machine")]
10use indicatif::{ProgressBar, ProgressStyle};
/// One result row: maps each column name to the textual form of its value.
#[derive(Clone, Debug)]
pub struct ParsedRow {
    /// Column name -> cell value rendered as a string.
    pub data: HashMap<String, String>,
}
19
20impl ParsedRow {
21 pub fn get(&self, key: &str) -> Option<&String> {
22 self.data.get(key)
23 }
24
25 pub fn to_json(&self) -> serde_json::Value {
26 serde_json::Value::Object(
27 self.data
28 .iter()
29 .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
30 .collect(),
31 )
32 }
33}
34
35#[derive(Debug, Clone)]
36pub struct RealDataParser {
37 pub schema: cqlite_core::schema::TableSchema,
38}
39
40impl RealDataParser {
41 pub fn new(schema: cqlite_core::schema::TableSchema) -> Self {
42 Self { schema }
43 }
44
45 pub fn parse_entry(
46 &self,
47 _key: &cqlite_core::RowKey,
48 _value: &cqlite_core::Value,
49 ) -> Result<ParsedRow> {
50 Ok(ParsedRow {
51 data: std::collections::HashMap::new(),
52 })
53 }
54
55 pub fn get_column_names(&self) -> Vec<String> {
56 self.schema.columns.iter().map(|c| c.name.clone()).collect()
57 }
58}
59
/// Formats an export duration for the summary line.
///
/// Durations under one second are shown in milliseconds (`<1ms` below one
/// millisecond). Longer durations are shown as `Ns`, `Mm Ss` or `Hh Mm Ss`.
/// Fractional seconds are truncated.
fn format_export_duration(duration: std::time::Duration) -> String {
    let total = duration.as_secs();
    let hours = total / 3600;
    let minutes = (total % 3600) / 60;
    let seconds = total % 60;

    match total {
        0 => match duration.as_millis() {
            0 => "<1ms".to_string(),
            ms => format!("{}ms", ms),
        },
        1..=59 => format!("{}s", seconds),
        60..=3599 => format!("{}m {}s", minutes, seconds),
        _ => format!("{}h {}m {}s", hours, minutes, seconds),
    }
}
78
79#[derive(Debug)]
81pub struct QueryExecutor;
82
83impl QueryExecutor {
84 pub fn new(_config: QueryExecutorConfig) -> Self {
85 Self
86 }
87
88 pub async fn execute_select(&self, _query: &str) -> Result<QueryResult> {
89 Ok(QueryResult {
90 rows: Vec::new(),
91 execution_time_ms: 0.0,
92 })
93 }
94}
95
/// Options for `QueryExecutor`; currently a unit struct with no settings.
#[derive(Default, Debug)]
pub struct QueryExecutorConfig;
98
99#[derive(Debug, Clone)]
101pub struct QueryResult {
102 pub rows: Vec<ParsedRow>,
103 pub execution_time_ms: f64,
104}
105
106impl QueryResult {
107 pub fn display_table(&self) {
108 if self.rows.is_empty() {
109 println!("No rows returned");
110 return;
111 }
112
113 let mut table = prettytable::Table::new();
115
116 if let Some(first_row) = self.rows.first() {
118 let headers: Vec<_> = first_row.data.keys().cloned().collect();
119 table.set_titles(prettytable::Row::new(
120 headers.iter().map(|h| prettytable::Cell::new(h)).collect(),
121 ));
122
123 for row in &self.rows {
125 let cells: Vec<_> = headers
126 .iter()
127 .map(|h| prettytable::Cell::new(row.data.get(h).unwrap_or(&String::new())))
128 .collect();
129 table.add_row(prettytable::Row::new(cells));
130 }
131 }
132
133 table.printstd();
134 }
135
136 pub fn display_json(&self) -> Result<()> {
137 let json_rows: Vec<_> = self.rows.iter().map(|r| r.to_json()).collect();
138 println!("{}", serde_json::to_string_pretty(&json_rows)?);
139 Ok(())
140 }
141
142 pub fn display_csv(&self) -> Result<()> {
143 if self.rows.is_empty() {
144 return Ok(());
145 }
146
147 let headers: Vec<_> = self.rows[0].data.keys().cloned().collect();
148
149 println!("{}", headers.join(","));
151
152 for row in &self.rows {
154 let values: Vec<_> = headers
155 .iter()
156 .map(|h| row.data.get(h).unwrap_or(&String::new()).clone())
157 .collect();
158 println!("{}", values.join(","));
159 }
160
161 Ok(())
162 }
163}
164use anyhow::{Context, Result};
168#[cfg(feature = "state_machine")]
169use cqlite_core::Database;
170use cqlite_core::{
171 schema::{parse_cql_schema, ClusteringColumn, ClusteringOrder, Column, KeyColumn, TableSchema},
172 storage::sstable::{bulletproof_reader::BulletproofReader, reader::SSTableReader},
173};
174use std::collections::HashMap;
175#[cfg(feature = "state_machine")]
176use std::fs::File;
177#[cfg(feature = "state_machine")]
178use std::io::{BufWriter, Write};
179use std::path::{Path, PathBuf};
180use std::sync::Arc;
181
182pub mod admin;
183pub mod bench;
184pub mod schema;
185pub mod write;
186
187pub mod docker;
188pub mod info;
189pub mod read_sstable;
190
#[cfg(feature = "state_machine")]
/// Runs (or explains) a single query and renders the result.
///
/// With `explain`, prints the query plan and returns without executing.
/// Otherwise the result is formatted as `format` and written to
/// `config.target`. Optional timing/performance details go to stderr, and any
/// warnings reported by the result are printed there too.
///
/// # Errors
/// Returns an error if explaining, executing, formatting or writing fails.
pub async fn execute_query(
    database: &Database,
    query: &str,
    explain: bool,
    timing: bool,
    format: OutputFormat,
    config: &crate::config::OutputConfig,
) -> Result<()> {
    use crate::output::{write_to_target, OutputTarget};
    use std::time::Instant;

    let start_time = Instant::now();

    if explain {
        let explain_result = database
            .explain(query)
            .await
            .with_context(|| "Failed to explain query")?;

        println!("Query Explanation");
        println!("================");
        println!("Query Type: {}", explain_result.query_type);
        println!("Plan Type: {}", explain_result.plan_type);
        println!("Estimated Cost: {:.2}", explain_result.estimated_cost);
        println!("Estimated Rows: {}", explain_result.estimated_rows);

        if !explain_result.selected_indexes.is_empty() {
            println!("\nSelected Indexes:");
            for index in &explain_result.selected_indexes {
                println!(" - {index}");
            }
        }

        if !explain_result.execution_steps.is_empty() {
            println!("\nExecution Steps:");
            for (i, step) in explain_result.execution_steps.iter().enumerate() {
                println!(" {}. {}", i + 1, step);
            }
        }

        if !explain_result.parallelization_info.is_empty() {
            println!("\nParallelization:");
            for info in &explain_result.parallelization_info {
                println!(" - {info}");
            }
        }

        if timing {
            // `as_millis()` is an integer, and `{:.2}` is ignored for integers;
            // use fractional milliseconds so two decimals are actually shown.
            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            println!("\nTiming: {:.2}ms", elapsed_ms);
        }

        return Ok(());
    }

    let result = database
        .execute(query)
        .await
        .with_context(|| "Failed to execute query")?;

    let output_bytes: Vec<u8> = match format {
        OutputFormat::Table => {
            use crate::output::table::TableWriter;
            let table_output = TableWriter::write(&result, config)
                .map_err(|e| anyhow::anyhow!("Failed to format table output: {}", e))?;
            table_output.into_bytes()
        }
        OutputFormat::Json => {
            use crate::output::json::JSONWriter;
            let json_output = JSONWriter::write(&result, config)
                .map_err(|e| anyhow::anyhow!("Failed to format JSON output: {}", e))?;
            json_output.into_bytes()
        }
        OutputFormat::Csv => {
            use crate::output::CSVWriter;
            let csv_output = CSVWriter::write(&result, config)
                .map_err(|e| anyhow::anyhow!("Failed to format CSV output: {}", e))?;
            csv_output.into_bytes()
        }
        OutputFormat::Parquet => {
            use crate::output::ParquetWriter;
            ParquetWriter::write(&result, config)
                .map_err(|e| anyhow::anyhow!("Failed to format Parquet output: {}", e))?
        }
    };

    write_to_target(&output_bytes, &config.target, config.overwrite)
        .map_err(|e| anyhow::anyhow!("{}", e))?;

    // Emit a trailing newline after table/JSON output on stdout; CSV and
    // Parquet output are left untouched.
    if matches!(config.target, OutputTarget::Stdout)
        && matches!(format, OutputFormat::Table | OutputFormat::Json)
    {
        println!();
    }

    if let OutputTarget::File(path) = &config.target {
        eprintln!("Output written to: {}", path.display());
    }

    if timing {
        // Fractional milliseconds: `{:.2}` has no effect on integer `as_millis()`.
        let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
        eprintln!("\nQuery executed in {:.2}ms", elapsed_ms);

        let performance = result.performance();
        if performance.total_time_us > 0 {
            eprintln!(
                "Parse time: {:.2}ms",
                performance.parse_time_us as f64 / 1000.0
            );
            eprintln!(
                "Planning time: {:.2}ms",
                performance.planning_time_us as f64 / 1000.0
            );
            eprintln!(
                "Execution time: {:.2}ms",
                performance.execution_time_us as f64 / 1000.0
            );
            eprintln!("Memory usage: {} bytes", performance.memory_usage_bytes);
            eprintln!("I/O operations: {}", performance.io_operations);
            if performance.cache_hits + performance.cache_misses > 0 {
                eprintln!(
                    "Cache hit ratio: {:.1}%",
                    performance.cache_hit_ratio() * 100.0
                );
            }
        }
    }

    let warnings = result.warnings();
    if !warnings.is_empty() {
        eprintln!("\nWarnings:");
        for warning in warnings {
            eprintln!(" ā ļø {warning}");
        }
    }

    Ok(())
}
340
341#[cfg(not(feature = "state_machine"))]
342pub async fn execute_query(
343 _database: &cqlite_core::Database,
344 _query: &str,
345 _explain: bool,
346 _timing: bool,
347 _format: OutputFormat,
348 _config: &crate::config::OutputConfig,
349) -> Result<()> {
350 Err(anyhow::anyhow!(
351 "Query execution is not available in M1.\n\
352 Build with --features state_machine to enable this feature.\n\
353 See CLAUDE.md for M1 API examples."
354 ))
355}
356
#[cfg(feature = "state_machine")]
/// Renders `result` with the CSV writer and prints it to stdout without a
/// trailing newline.
///
/// # Errors
/// Returns an error if the CSV writer fails.
fn print_csv_format(
    result: &cqlite_core::query::result::QueryResult,
    config: &crate::config::OutputConfig,
) -> Result<()> {
    use crate::output::CSVWriter;

    match CSVWriter::write(result, config) {
        Ok(rendered) => {
            print!("{}", rendered);
            Ok(())
        }
        Err(err) => Err(anyhow::anyhow!("Failed to format CSV output: {}", err)),
    }
}
372
#[cfg(feature = "state_machine")]
/// Imports rows from `file` into a table.
///
/// The target table is `table` when given; otherwise it is the file's stem
/// (name without extension). Table existence and schema lookups are
/// best-effort: failures only print warnings. CSV and JSON are supported;
/// Parquet returns an error.
///
/// # Errors
/// Returns an error if the file is missing, the table name cannot be
/// derived, the format is Parquet, or the CSV/JSON importer fails.
pub async fn import_data(
    database: &Database,
    file: &Path,
    format: ImportFormat,
    table: Option<&str>,
) -> Result<()> {
    println!("Importing data from: {}", file.display());
    println!("Format: {format}, Target table: {table:?}");

    if !file.exists() {
        return Err(anyhow::anyhow!("Import file not found: {}", file.display()));
    }

    let target_table = match table {
        Some(t) => t.to_string(),
        // Fall back to the file name without its extension.
        None => file
            .file_stem()
            .and_then(|stem| stem.to_str())
            .map(|s| s.to_string())
            .ok_or_else(|| {
                anyhow::anyhow!(
                    "Could not determine target table name. Please specify --table option."
                )
            })?,
    };

    // The table name comes from user input or a file name. Double any single
    // quotes so a name like `o'brien` cannot end the string literal early.
    let table_check_query = format!(
        "SELECT table_name FROM system.tables WHERE table_name = '{}'",
        target_table.replace('\'', "''")
    );
    match database.execute(&table_check_query).await {
        Ok(result) if result.rows.is_empty() => {
            println!(
                "ā ļø Warning: Table '{}' not found in system catalog. Assuming it exists or will be created during import.",
                target_table
            );
        }
        Ok(_) => {
            println!("ā Target table '{target_table}' found");
        }
        Err(_) => {
            println!(
                "ā ļø Warning: Could not verify table existence (system tables may not be implemented). Proceeding with import..."
            );
        }
    }

    let table_columns = get_table_columns(database, &target_table)
        .await
        .unwrap_or_else(|_| {
            println!("ā ļø Warning: Could not retrieve table schema. Import may fail if column types don't match.");
            Vec::new()
        });

    let imported_rows = match format {
        ImportFormat::Csv => {
            import_csv_data(database, file, &target_table, &table_columns).await?
        }
        ImportFormat::Json => {
            import_json_data(database, file, &target_table, &table_columns).await?
        }
        ImportFormat::Parquet => {
            return Err(anyhow::anyhow!(
                "Parquet import not yet implemented. Please convert to CSV or JSON format first."
            ));
        }
    };

    println!("\nš Import Summary:");
    println!(" Rows imported: {imported_rows}");
    println!(" ā
Import completed successfully!");

    Ok(())
}
458
459#[cfg(not(feature = "state_machine"))]
460pub async fn import_data(
461 _database: &cqlite_core::Database,
462 _file: &Path,
463 _format: crate::cli::ImportFormat,
464 _table: Option<&str>,
465) -> Result<()> {
466 Err(anyhow::anyhow!(
467 "Data import is not available in M1.\n\
468 Build with --features state_machine to enable this feature.\n\
469 See CLAUDE.md for M1 API examples."
470 ))
471}
472
#[cfg(feature = "state_machine")]
/// Imports a headered CSV file into `table` and returns the number of
/// successfully executed INSERTs.
///
/// The file is read twice: the first pass only counts records to size the
/// progress bar. Each record becomes an `INSERT` whose column list is the CSV
/// header. Empty fields become `NULL`; other fields become single-quoted
/// literals with embedded quotes doubled. Statements are flushed in batches
/// of 100.
///
/// # Errors
/// Returns an error if the file cannot be opened, the header or a record
/// cannot be parsed, or a batch flush fails.
async fn import_csv_data(
    database: &Database,
    file: &Path,
    table: &str,
    table_columns: &[String],
) -> Result<u64> {
    use csv::ReaderBuilder;
    use indicatif::{ProgressBar, ProgressStyle};

    // Number of statements accumulated before a flush.
    const BATCH_SIZE: usize = 100;

    let open_reader = |handle: File| ReaderBuilder::new().has_headers(true).from_reader(handle);

    let counting_handle =
        File::open(file).with_context(|| format!("Failed to open CSV file: {}", file.display()))?;
    let mut counting_reader = open_reader(counting_handle);

    let csv_columns: Vec<String> = counting_reader
        .headers()
        .with_context(|| "Failed to read CSV headers")?
        .iter()
        .map(str::to_string)
        .collect();

    println!("š CSV columns: {}", csv_columns.join(", "));
    if !table_columns.is_empty() {
        println!("š Table columns: {}", table_columns.join(", "));
    }

    // First pass: count records only to size the progress bar.
    let total_rows = counting_reader.records().count() as u64;

    let mut csv_reader = open_reader(File::open(file)?);

    let pb = ProgressBar::new(total_rows);
    pb.set_style(
        ProgressStyle::default_bar()
            .template(
                "Importing CSV [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} rows ({eta})",
            )
            .unwrap()
            .progress_chars("=>-"),
    );

    let column_list = csv_columns.join(", ");
    let mut imported_count = 0;
    let mut pending: Vec<String> = Vec::new();

    for (index, parsed) in csv_reader.records().enumerate() {
        pb.set_position(index as u64 + 1);

        // +2: line 1 is the header, and enumerate() is zero-based.
        let record = parsed
            .with_context(|| format!("Failed to parse CSV record at line {}", index + 2))?;

        let literals = record
            .iter()
            .map(|field| match field {
                "" => "NULL".to_string(),
                text => format!("'{}'", text.replace('\'', "''")),
            })
            .collect::<Vec<String>>()
            .join(", ");

        pending.push(format!(
            "INSERT INTO {} ({}) VALUES ({})",
            table, column_list, literals
        ));

        if pending.len() >= BATCH_SIZE {
            execute_batch_statements(database, &mut pending, &mut imported_count).await?;
        }
    }

    if !pending.is_empty() {
        execute_batch_statements(database, &mut pending, &mut imported_count).await?;
    }

    pb.finish_with_message(format!("Imported {imported_count} rows from CSV"));
    Ok(imported_count)
}
566
#[cfg(feature = "state_machine")]
/// Imports a JSON object, or an array of objects, into `table` and returns
/// the number of successfully executed INSERTs.
///
/// Each object's keys become the column list and its values become CQL
/// literals: `null` -> `NULL`, strings single-quoted with quotes doubled,
/// numbers and booleans verbatim, and arrays/objects as quoted JSON text.
/// Non-object array elements are skipped with a message. Statements are
/// flushed in batches of 50.
///
/// # Errors
/// Returns an error if the file cannot be read or parsed, the top-level value
/// is neither an object nor an array, or a batch flush fails.
async fn import_json_data(
    database: &Database,
    file: &Path,
    table: &str,
    _table_columns: &[String],
) -> Result<u64> {
    use indicatif::{ProgressBar, ProgressStyle};
    use std::fs;

    // Number of statements accumulated before a flush.
    const BATCH_SIZE: usize = 50;

    let raw = fs::read_to_string(file)
        .with_context(|| format!("Failed to read JSON file: {}", file.display()))?;

    let document: serde_json::Value =
        serde_json::from_str(&raw).with_context(|| "Failed to parse JSON file")?;

    let objects = match document {
        serde_json::Value::Array(items) => items,
        single @ serde_json::Value::Object(_) => vec![single],
        _ => {
            return Err(anyhow::anyhow!(
                "JSON file must contain an object or array of objects"
            ))
        }
    };

    println!("š Found {} JSON objects to import", objects.len());

    let pb = ProgressBar::new(objects.len() as u64);
    pb.set_style(
        ProgressStyle::default_bar()
            .template("Importing JSON [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} objects ({eta})")
            .unwrap()
            .progress_chars("=>-"),
    );

    // Renders one JSON value as a CQL literal.
    let to_literal = |value: &serde_json::Value| -> String {
        match value {
            serde_json::Value::Null => "NULL".to_string(),
            serde_json::Value::String(s) => format!("'{}'", s.replace("'", "''")),
            serde_json::Value::Number(n) => n.to_string(),
            serde_json::Value::Bool(b) => b.to_string(),
            other => format!("'{}'", other.to_string().replace("'", "''")),
        }
    };

    let mut imported_count = 0;
    let mut pending: Vec<String> = Vec::new();

    for (index, element) in objects.iter().enumerate() {
        pb.set_position(index as u64 + 1);

        let serde_json::Value::Object(map) = element else {
            println!("ā ļø Skipping non-object JSON element at index {index}");
            continue;
        };

        let column_list = map.keys().cloned().collect::<Vec<String>>().join(", ");
        let literal_list = map
            .values()
            .map(|v| to_literal(v))
            .collect::<Vec<String>>()
            .join(", ");

        pending.push(format!(
            "INSERT INTO {} ({}) VALUES ({})",
            table, column_list, literal_list
        ));

        if pending.len() >= BATCH_SIZE {
            execute_batch_statements(database, &mut pending, &mut imported_count).await?;
        }
    }

    if !pending.is_empty() {
        execute_batch_statements(database, &mut pending, &mut imported_count).await?;
    }

    pb.finish_with_message(format!("Imported {imported_count} objects from JSON"));
    Ok(imported_count)
}
651
#[cfg(feature = "state_machine")]
/// Executes and drains every statement in `statements`, one at a time.
///
/// Each success increments `imported_count`. Failures are deliberately
/// best-effort: the error and a preview of the statement are printed, and
/// processing continues. `statements` is always left empty.
///
/// # Errors
/// Currently never returns `Err`; the `Result` keeps call sites uniform.
async fn execute_batch_statements(
    database: &Database,
    statements: &mut Vec<String>,
    imported_count: &mut u64,
) -> Result<()> {
    // Maximum number of characters of a failing statement echoed back.
    const PREVIEW_CHARS: usize = 100;

    for statement in statements.drain(..) {
        match database.execute(&statement).await {
            Ok(_) => {
                *imported_count += 1;
            }
            Err(e) => {
                // Append "..." only when the statement was actually truncated.
                let mut preview: String = statement.chars().take(PREVIEW_CHARS).collect();
                if statement.chars().nth(PREVIEW_CHARS).is_some() {
                    preview.push_str("...");
                }
                println!("ā ļø Error executing statement: {e}");
                println!(" Statement: {}", preview);
            }
        }
    }
    Ok(())
}
676
#[cfg(feature = "state_machine")]
/// Looks up the column names of `table` in `system.columns`.
///
/// Rows without a `column_name` value are skipped.
///
/// # Errors
/// Returns an error if the catalog query fails.
async fn get_table_columns(database: &Database, table: &str) -> Result<Vec<String>> {
    // `table` can come from user input or a file name. Double single quotes so
    // it cannot terminate the string literal early.
    let query = format!(
        "SELECT column_name FROM system.columns WHERE table_name = '{}'",
        table.replace('\'', "''")
    );

    let result = database
        .execute(&query)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to get table columns: {}", e))?;

    Ok(result
        .rows
        .iter()
        .filter_map(|row| row.get("column_name"))
        .map(|col| col.to_string())
        .collect())
}
694
#[cfg(feature = "state_machine")]
/// Streams the result of a query or table to `file` in `format`.
///
/// `source` is either a full `SELECT` statement or a table name. For a table
/// name, `query_filter` becomes a `WHERE` clause. For a `SELECT`, the filter
/// cannot be applied and a warning is printed (unless `quiet`). `limit` is
/// appended as `LIMIT n` when the statement has none, and is also enforced
/// client-side while streaming. Progress output appears only when stdout is
/// a terminal and `quiet` is false. A summary is printed unless `quiet`.
///
/// # Errors
/// Returns an error if the output directory or file cannot be created, the
/// query fails, no columns are reported, or any write/finalize step fails.
pub async fn export_data(
    database: &Database,
    source: &str,
    file: &Path,
    format: ExportFormat,
    query_filter: Option<&str>,
    limit: Option<usize>,
    quiet: bool,
) -> Result<()> {
    use cqlite_core::query::result::StreamingConfig;
    use std::io::IsTerminal;
    use std::time::Instant;

    use crate::output::{
        create_streaming_parquet_writer, StreamingCSVWriter, StreamingJSONWriter, StreamingWriter,
    };
    use crate::status_metrics::format_bytes;

    // Number of query characters echoed in the "Executing query" line.
    const QUERY_PREVIEW_CHARS: usize = 100;

    // Truncates `chunk` to the remaining row budget (when a limit is set) and
    // charges the kept rows against that budget.
    fn take_within_limit<T>(mut chunk: Vec<T>, remaining: &mut Option<usize>) -> Vec<T> {
        if let Some(budget) = remaining.as_mut() {
            chunk.truncate(*budget);
            // Cannot underflow: after truncate, len <= budget.
            *budget -= chunk.len();
        }
        chunk
    }

    let show_progress = !quiet && std::io::stdout().is_terminal();

    if show_progress {
        println!("Exporting data from: {source}");
        println!("Output file: {}, Format: {}", file.display(), format);
    }

    if let Some(parent) = file.parent() {
        std::fs::create_dir_all(parent)
            .with_context(|| format!("Failed to create output directory: {}", parent.display()))?;
    }

    let is_select = source.to_uppercase().trim().starts_with("SELECT");
    let query = if is_select {
        if query_filter.is_some() && !quiet {
            eprintln!("Warning: query filter is ignored when the source is a SELECT statement");
        }
        // Whole-token match also catches `LIMIT` after a newline or tab, which a
        // plain " LIMIT " substring check misses.
        let has_limit = source
            .split_whitespace()
            .any(|word| word.eq_ignore_ascii_case("LIMIT"));
        match limit {
            Some(n) if !has_limit => {
                // Trim around the `;` so "SELECT ... ; " does not become
                // "SELECT ... ; LIMIT n".
                let statement = source.trim_end().trim_end_matches(';').trim_end();
                format!("{} LIMIT {}", statement, n)
            }
            _ => source.to_string(),
        }
    } else {
        let mut q = format!("SELECT * FROM {}", source);
        if let Some(filter) = query_filter {
            q.push_str(&format!(" WHERE {}", filter));
        }
        if let Some(n) = limit {
            q.push_str(&format!(" LIMIT {}", n));
        }
        q
    };

    if show_progress {
        let mut preview: String = query.chars().take(QUERY_PREVIEW_CHARS).collect();
        if query.chars().nth(QUERY_PREVIEW_CHARS).is_some() {
            preview.push_str("...");
        }
        println!("Executing query: {}", preview);
    }

    let config = match format {
        ExportFormat::Parquet => StreamingConfig::for_parquet(),
        _ => StreamingConfig::for_text_formats(),
    };

    let mut result_iter = database
        .execute_streaming(&query, config.clone())
        .await
        .with_context(|| format!("Failed to execute streaming export query: {query}"))?;

    let column_names: Vec<String> = result_iter
        .metadata
        .columns
        .iter()
        .map(|c| c.name.clone())
        .collect();

    if column_names.is_empty() {
        return Err(anyhow::anyhow!(
            "Could not determine column names for export"
        ));
    }

    if show_progress {
        println!("Columns: {}", column_names.join(", "));
        println!("Streaming export in progress...");
    }

    let start_time = Instant::now();

    let pb = if show_progress {
        let pb = ProgressBar::new_spinner();
        pb.set_style(
            ProgressStyle::default_spinner()
                .template("{spinner:.green} {msg} ({pos} rows)")
                .unwrap(),
        );
        pb.set_message("Exporting");
        pb
    } else {
        ProgressBar::hidden()
    };

    let chunk_size = config.chunk_size;
    let mut rows_exported: u64 = 0;
    // `None` means unlimited; otherwise the number of rows still allowed.
    let mut rows_remaining: Option<usize> = limit;

    match format {
        ExportFormat::Csv => {
            let output_file = File::create(file)
                .with_context(|| format!("Failed to create CSV file: {}", file.display()))?;
            let mut writer = StreamingCSVWriter::new(BufWriter::new(output_file));

            writer
                .write_header(&result_iter.metadata)
                .map_err(|e| anyhow::anyhow!("Failed to write CSV header: {}", e))?;

            while rows_remaining != Some(0) {
                let chunk = result_iter
                    .collect_chunk(chunk_size)
                    .await
                    .map_err(|e| anyhow::anyhow!("Failed to collect chunk: {}", e))?;
                if chunk.is_empty() {
                    break;
                }

                let chunk_to_write = take_within_limit(chunk, &mut rows_remaining);
                writer
                    .write_chunk(&chunk_to_write)
                    .map_err(|e| anyhow::anyhow!("Failed to write CSV chunk: {}", e))?;

                rows_exported += chunk_to_write.len() as u64;
                pb.set_position(rows_exported);
            }

            writer
                .finalize()
                .map_err(|e| anyhow::anyhow!("Failed to finalize CSV: {}", e))?;
        }
        ExportFormat::Json => {
            let output_file = File::create(file)
                .with_context(|| format!("Failed to create JSON file: {}", file.display()))?;
            let mut writer = StreamingJSONWriter::new(BufWriter::new(output_file));

            writer
                .write_header(&result_iter.metadata)
                .map_err(|e| anyhow::anyhow!("Failed to write JSON header: {}", e))?;

            while rows_remaining != Some(0) {
                let chunk = result_iter
                    .collect_chunk(chunk_size)
                    .await
                    .map_err(|e| anyhow::anyhow!("Failed to collect chunk: {}", e))?;
                if chunk.is_empty() {
                    break;
                }

                let chunk_to_write = take_within_limit(chunk, &mut rows_remaining);
                writer
                    .write_chunk(&chunk_to_write)
                    .map_err(|e| anyhow::anyhow!("Failed to write JSON chunk: {}", e))?;

                rows_exported += chunk_to_write.len() as u64;
                pb.set_position(rows_exported);
            }

            writer
                .finalize()
                .map_err(|e| anyhow::anyhow!("Failed to finalize JSON: {}", e))?;
        }
        ExportFormat::Cql => {
            let output_file = File::create(file)
                .with_context(|| format!("Failed to create CQL file: {}", file.display()))?;
            let mut buf_writer = BufWriter::new(output_file);

            // INSERT target: the token after a standalone `FROM` keyword (minus
            // any trailing `;`) when `source` is a query, otherwise `source`
            // itself. Matching whole tokens avoids treating table names that
            // merely contain "from" (e.g. `data_from_x`) as queries.
            let mut tokens = source.split_whitespace();
            let table_name = if tokens.any(|word| word.eq_ignore_ascii_case("FROM")) {
                tokens
                    .next()
                    .map(|name| name.trim_end_matches(';'))
                    .filter(|name| !name.is_empty())
                    .unwrap_or("exported_table")
            } else {
                source
            };

            writeln!(buf_writer, "-- CQL Export from CQLite (streaming)")?;
            writeln!(buf_writer, "-- Source: {source}")?;
            writeln!(
                buf_writer,
                "-- Generated: {}",
                chrono::Utc::now().to_rfc3339()
            )?;
            writeln!(buf_writer)?;

            let column_list = column_names.join(", ");

            while rows_remaining != Some(0) {
                let chunk = result_iter
                    .collect_chunk(chunk_size)
                    .await
                    .map_err(|e| anyhow::anyhow!("Failed to collect chunk: {}", e))?;
                if chunk.is_empty() {
                    break;
                }

                let chunk_to_write = take_within_limit(chunk, &mut rows_remaining);

                for row in &chunk_to_write {
                    let values: Vec<String> = column_names
                        .iter()
                        .map(|col| {
                            row.values
                                .get(col)
                                .map(|v| match v {
                                    cqlite_core::Value::Text(s) => {
                                        format!("'{}'", s.replace("'", "''"))
                                    }
                                    cqlite_core::Value::Null => "NULL".to_string(),
                                    _ => v.to_string(),
                                })
                                .unwrap_or_else(|| "NULL".to_string())
                        })
                        .collect();

                    writeln!(
                        buf_writer,
                        "INSERT INTO {} ({}) VALUES ({});",
                        table_name,
                        column_list,
                        values.join(", ")
                    )?;
                }

                rows_exported += chunk_to_write.len() as u64;
                pb.set_position(rows_exported);
            }

            buf_writer.flush()?;
        }
        ExportFormat::Parquet => {
            let output_file = File::create(file)
                .with_context(|| format!("Failed to create Parquet file: {}", file.display()))?;

            let mut writer =
                create_streaming_parquet_writer(output_file, &result_iter.metadata, chunk_size)
                    .map_err(|e| anyhow::anyhow!("Failed to initialize Parquet writer: {}", e))?;

            writer
                .write_header(&result_iter.metadata)
                .map_err(|e| anyhow::anyhow!("Failed to write Parquet header: {}", e))?;

            while rows_remaining != Some(0) {
                let chunk = result_iter
                    .collect_chunk(chunk_size)
                    .await
                    .map_err(|e| anyhow::anyhow!("Failed to collect chunk: {}", e))?;
                if chunk.is_empty() {
                    break;
                }

                let chunk_to_write = take_within_limit(chunk, &mut rows_remaining);
                writer
                    .write_chunk(&chunk_to_write)
                    .map_err(|e| anyhow::anyhow!("Failed to write Parquet chunk: {}", e))?;

                rows_exported += chunk_to_write.len() as u64;
                pb.set_position(rows_exported);
            }

            writer
                .finalize()
                .map_err(|e| anyhow::anyhow!("Failed to finalize Parquet: {}", e))?;
        }
    }

    pb.finish_and_clear();

    if !quiet {
        let duration = start_time.elapsed();
        let file_size = std::fs::metadata(file)?.len();

        println!("\nExport complete:");
        println!(" Rows: {}", rows_exported);
        println!(" Size: {}", format_bytes(file_size));
        println!(" Time: {}", format_export_duration(duration));
        let secs_f64 = duration.as_secs_f64();
        if secs_f64 > 0.0 {
            let rate = rows_exported as f64 / secs_f64;
            if rate.is_finite() {
                println!(" Rate: {:.0} rows/sec", rate);
            }
        }
    }

    Ok(())
}
1107
1108#[cfg(not(feature = "state_machine"))]
1109pub async fn export_data(
1110 _database: &cqlite_core::Database,
1111 _source: &str,
1112 _file: &Path,
1113 _format: crate::cli::ExportFormat,
1114 _query_filter: Option<&str>,
1115 _limit: Option<usize>,
1116 _quiet: bool,
1117) -> Result<()> {
1118 Err(anyhow::anyhow!(
1119 "Data export is not available in M1.\n\
1120 Build with --features state_machine to enable this feature.\n\
1121 See CLAUDE.md for M1 API examples."
1122 ))
1123}
1124
#[cfg(feature = "state_machine")]
/// Writes an already-materialized query result to `file` as CSV.
///
/// Rows are passed to the streaming CSV writer 5,000 at a time, and the
/// progress bar advances by each batch's size.
///
/// # Errors
/// Returns an error if the file cannot be created or the writer fails.
async fn export_to_csv(
    result: &cqlite_core::query::result::QueryResult,
    file: &Path,
    _column_names: &[String],
    pb: &ProgressBar,
) -> Result<()> {
    use crate::output::{StreamingCSVWriter, StreamingWriter};

    // Rows handed to the writer per call.
    const ROWS_PER_WRITE: usize = 5_000;

    let handle = File::create(file)
        .with_context(|| format!("Failed to create CSV file: {}", file.display()))?;
    let mut csv_writer = StreamingCSVWriter::new(BufWriter::new(handle));

    csv_writer
        .write_header(&result.metadata)
        .map_err(|err| anyhow::anyhow!("Failed to write CSV header: {}", err))?;

    for batch in result.rows.chunks(ROWS_PER_WRITE) {
        csv_writer
            .write_chunk(batch)
            .map_err(|err| anyhow::anyhow!("Failed to write CSV chunk: {}", err))?;
        pb.inc(batch.len() as u64);
    }

    csv_writer
        .finalize()
        .map_err(|err| anyhow::anyhow!("Failed to finalize CSV file: {}", err))?;

    Ok(())
}
1168
#[cfg(feature = "state_machine")]
/// Writes an already-materialized query result to `file` as JSON.
///
/// Rows are passed to the streaming JSON writer 5,000 at a time, and the
/// progress bar advances by each batch's size.
///
/// # Errors
/// Returns an error if the file cannot be created or the writer fails.
async fn export_to_json(
    result: &cqlite_core::query::result::QueryResult,
    file: &Path,
    _column_names: &[String],
    pb: &ProgressBar,
) -> Result<()> {
    use crate::output::{StreamingJSONWriter, StreamingWriter};

    // Rows handed to the writer per call.
    const ROWS_PER_WRITE: usize = 5_000;

    let handle = File::create(file)
        .with_context(|| format!("Failed to create JSON file: {}", file.display()))?;
    let mut json_writer = StreamingJSONWriter::new(BufWriter::new(handle));

    json_writer
        .write_header(&result.metadata)
        .map_err(|err| anyhow::anyhow!("Failed to write JSON header: {}", err))?;

    result
        .rows
        .chunks(ROWS_PER_WRITE)
        .try_for_each(|batch| -> Result<()> {
            json_writer
                .write_chunk(batch)
                .map_err(|err| anyhow::anyhow!("Failed to write JSON chunk: {}", err))?;
            pb.inc(batch.len() as u64);
            Ok(())
        })?;

    json_writer
        .finalize()
        .map_err(|err| anyhow::anyhow!("Failed to finalize JSON file: {}", err))?;

    Ok(())
}
1212
#[cfg(feature = "state_machine")]
/// Writes an already-materialized query result to `file` as CQL `INSERT`
/// statements, preceded by a commented header.
///
/// Text values are single-quoted with embedded quotes doubled. Null or
/// missing values become `NULL`, and other values use their `to_string()`
/// form.
///
/// # Errors
/// Returns an error if the file cannot be created or written.
async fn export_to_cql(
    result: &cqlite_core::query::result::QueryResult,
    file: &Path,
    source: &str,
    column_names: &[String],
    pb: &ProgressBar,
) -> Result<()> {
    let output_file = File::create(file)
        .with_context(|| format!("Failed to create CQL file: {}", file.display()))?;
    let mut writer = BufWriter::new(output_file);

    // INSERT target: the token after a standalone `FROM` keyword (minus any
    // trailing `;`) when `source` is a query, otherwise `source` itself.
    // Matching whole tokens avoids treating table names that merely contain
    // "from" (e.g. `data_from_x`) as queries.
    let mut tokens = source.split_whitespace();
    let table_name = if tokens.any(|word| word.eq_ignore_ascii_case("FROM")) {
        tokens
            .next()
            .map(|name| name.trim_end_matches(';'))
            .filter(|name| !name.is_empty())
            .unwrap_or("exported_table")
    } else {
        source
    };

    writeln!(writer, "-- CQL Export from CQLite")?;
    writeln!(writer, "-- Source: {source}")?;
    writeln!(writer, "-- Generated: {}", chrono::Utc::now().to_rfc3339())?;
    writeln!(writer, "-- Rows: {}", result.rows.len())?;
    writeln!(writer)?;

    let column_list = column_names.join(", ");

    for (index, row) in result.rows.iter().enumerate() {
        pb.set_position(index as u64 + 1);

        let values: Vec<String> = column_names
            .iter()
            .map(|col| {
                row.get(col)
                    .map(|v| match v {
                        cqlite_core::Value::Text(s) => format!("'{}'", s.replace("'", "''")),
                        cqlite_core::Value::Null => "NULL".to_string(),
                        _ => v.to_string(),
                    })
                    .unwrap_or_else(|| "NULL".to_string())
            })
            .collect();

        writeln!(
            writer,
            "INSERT INTO {} ({}) VALUES ({});",
            table_name,
            column_list,
            values.join(", ")
        )?;
    }

    writer
        .flush()
        .with_context(|| "Failed to flush CQL writer")?;

    Ok(())
}
1277
#[cfg(feature = "state_machine")]
/// Writes an already-materialized query result to `file` as Parquet.
///
/// Rows are passed to the streaming Parquet writer 10,000 at a time (the
/// same value is given to the writer factory). The progress bar message is
/// updated at each phase, and its position advances by each batch's size.
///
/// # Errors
/// Returns an error if the file cannot be created or any writer step fails.
async fn export_to_parquet(
    result: &cqlite_core::query::result::QueryResult,
    file: &Path,
    _column_names: &[String],
    pb: &ProgressBar,
) -> Result<()> {
    use crate::output::{create_streaming_parquet_writer, StreamingWriter};

    // Rows per write call; also passed to the writer factory.
    const ROWS_PER_CHUNK: usize = 10_000;

    pb.set_message("Initializing Parquet writer...");

    let sink = File::create(file)
        .with_context(|| format!("Failed to create Parquet file: {}", file.display()))?;

    let mut parquet_writer = create_streaming_parquet_writer(sink, &result.metadata, ROWS_PER_CHUNK)
        .map_err(|err| anyhow::anyhow!("Failed to initialize Parquet writer: {}", err))?;

    parquet_writer
        .write_header(&result.metadata)
        .map_err(|err| anyhow::anyhow!("Failed to write Parquet header: {}", err))?;

    pb.set_message("Streaming rows to Parquet...");

    result
        .rows
        .chunks(ROWS_PER_CHUNK)
        .try_for_each(|batch| -> Result<()> {
            parquet_writer
                .write_chunk(batch)
                .map_err(|err| anyhow::anyhow!("Failed to write Parquet chunk: {}", err))?;
            pb.inc(batch.len() as u64);
            Ok(())
        })?;

    pb.set_message("Finalizing Parquet file...");
    parquet_writer
        .finalize()
        .map_err(|err| anyhow::anyhow!("Failed to finalize Parquet file: {}", err))?;

    Ok(())
}
1327
1328pub async fn read_sstable(
1330 sstable_path: &Path,
1331 schema_path: &Path,
1332 limit: Option<usize>,
1333 skip: Option<usize>,
1334 _generation: Option<u32>,
1335 format: OutputFormat,
1336 auto_detect: bool,
1337 cassandra_version: Option<String>,
1338) -> Result<()> {
1339 let schema = load_schema_file(schema_path, auto_detect, cassandra_version.as_deref())?;
1341
1342 println!("š Reading SSTable with REAL data parsing (no mocking!)");
1343 println!("š SSTable: {}", sstable_path.display());
1344 println!("š Schema: {}", schema_path.display());
1345
1346 let actual_sstable_path = resolve_sstable_path(sstable_path)?;
1348 println!("š Data file: {}", actual_sstable_path.display());
1349
1350 println!("š Using Bulletproof SSTable Reader (supports all Cassandra versions)");
1352
1353 let mut bulletproof_reader =
1355 BulletproofReader::open(&actual_sstable_path).with_context(|| {
1356 format!(
1357 "Failed to open SSTable with bulletproof reader: {}",
1358 actual_sstable_path.display()
1359 )
1360 })?;
1361
1362 let info = bulletproof_reader.info();
1364 println!(
1365 "š Detected format: {:?} (generation {}, size {})",
1366 info.format,
1367 info.generation_numeric().unwrap_or(0),
1368 info.size
1369 );
1370
1371 if let Some(compression_info) = bulletproof_reader.compression_info() {
1372 println!(
1373 "š¦ Compression: {} ({} byte chunks)",
1374 compression_info.algorithm, compression_info.chunk_length
1375 );
1376 }
1377
1378 match bulletproof_reader.parse_sstable_data() {
1380 Ok(bulletproof_entries) => {
1381 println!(
1382 "ā
Successfully parsed {} entries with bulletproof reader",
1383 bulletproof_entries.len()
1384 );
1385
1386 let mut processed = 0;
1388 let mut displayed = 0;
1389 let skip_count = skip.unwrap_or(0);
1390 let limit_count = limit.unwrap_or(bulletproof_entries.len());
1391
1392 let mut parsed_rows = Vec::new();
1393 let parser = RealDataParser::new(schema.clone());
1394
1395 for entry in bulletproof_entries {
1396 if processed < skip_count {
1397 processed += 1;
1398 continue;
1399 }
1400
1401 if displayed >= limit_count {
1402 break;
1403 }
1404
1405 let key = entry.key.clone();
1407 let value =
1408 cqlite_core::Value::Text(format!("{:?}|{}", entry.key, entry.format_info));
1409
1410 match parser.parse_entry(&key, &value) {
1411 Ok(parsed_row) => {
1412 parsed_rows.push(parsed_row);
1413 displayed += 1;
1414 }
1415 Err(e) => {
1416 eprintln!("ā ļø Failed to parse row {}: {}", processed + 1, e);
1417 println!(
1419 "š Raw bulletproof data: key='{:?}', info='{}'",
1420 entry.key, entry.format_info
1421 );
1422 }
1423 }
1424 processed += 1;
1425 }
1426
1427 match format {
1429 OutputFormat::Table => {
1430 display_table_format(&parser.get_column_names(), &parsed_rows)
1431 }
1432 OutputFormat::Json => display_json_format(&parsed_rows)?,
1433 OutputFormat::Csv => display_csv_format(&parser.get_column_names(), &parsed_rows)?,
1434 OutputFormat::Parquet => {
1435 return Err(anyhow::anyhow!("Parquet format is not supported for this command. Use --out json or --out csv instead."));
1436 }
1437 }
1438
1439 println!(
1440 "\nā
Bulletproof reader processed {processed} entries, displayed {displayed} rows"
1441 );
1442 return Ok(());
1443 }
1444 Err(e) => {
1445 println!("ā ļø Bulletproof parser still in development: {e}");
1446 println!("š Falling back to raw data display...");
1447
1448 match bulletproof_reader.read_raw_data(0, 1024) {
1450 Ok(data) => {
1451 println!("\nš Raw SSTable data (first 1024 bytes):");
1452 for (i, chunk) in data.chunks(16).enumerate() {
1453 print!(" {:04x}: ", i * 16);
1454 for byte in chunk {
1455 print!("{byte:02x} ");
1456 }
1457 print!(" ");
1458 for byte in chunk {
1459 let c = if byte.is_ascii_graphic() || *byte == b' ' {
1460 *byte as char
1461 } else {
1462 '.'
1463 };
1464 print!("{c}");
1465 }
1466 println!();
1467 }
1468
1469 println!(
1470 "\nšÆ This shows the bulletproof reader successfully decompressed the data!"
1471 );
1472 println!(
1473 "š” The parsing layer is still being implemented for your specific format."
1474 );
1475 return Ok(());
1476 }
1477 Err(e) => {
1478 println!("ā Bulletproof reader failed to read raw data: {e}");
1479 }
1480 }
1481 }
1482 }
1483
1484 println!("š Falling back to legacy SSTable reader...");
1486 let config = cqlite_core::Config::default();
1487 let platform = Arc::new(cqlite_core::platform::Platform::new(&config).await?);
1488 let reader = SSTableReader::open(&actual_sstable_path, &config, platform)
1489 .await
1490 .with_context(|| format!("Failed to open SSTable: {}", actual_sstable_path.display()))?;
1491
1492 let parser = RealDataParser::new(schema.clone());
1494
1495 let entries = reader.get_all_entries().await?;
1497 let mut processed = 0;
1498 let mut displayed = 0;
1499 let skip_count = skip.unwrap_or(0);
1500 let limit_count = limit.unwrap_or(entries.len());
1501
1502 println!("š Found {} entries in SSTable", entries.len());
1503
1504 let mut parsed_rows = Vec::new();
1505
1506 for (_table_id, key, value) in entries {
1507 if processed < skip_count {
1508 processed += 1;
1509 continue;
1510 }
1511
1512 if displayed >= limit_count {
1513 break;
1514 }
1515
1516 match parser.parse_entry(&key, &value) {
1518 Ok(parsed_row) => {
1519 parsed_rows.push(parsed_row);
1520 displayed += 1;
1521 }
1522 Err(e) => {
1523 eprintln!("ā ļø Failed to parse row {}: {}", processed + 1, e);
1524 }
1525 }
1526 processed += 1;
1527 }
1528
1529 match format {
1531 OutputFormat::Table => display_table_format(&parser.get_column_names(), &parsed_rows),
1532 OutputFormat::Json => display_json_format(&parsed_rows)?,
1533 OutputFormat::Csv => display_csv_format(&parser.get_column_names(), &parsed_rows)?,
1534 OutputFormat::Parquet => {
1535 return Err(anyhow::anyhow!("Parquet format is not supported for this command. Use --out json or --out csv instead."));
1536 }
1537 }
1538
1539 println!("\nā
Processed {processed} entries, displayed {displayed} rows");
1540 println!("šÆ Data source: LIVE SSTable file (no mocking!)");
1541
1542 Ok(())
1543}
1544
1545pub async fn execute_select_query(
1547 sstable_path: &Path,
1548 schema_path: &Path,
1549 query: &str,
1550 format: OutputFormat,
1551 auto_detect: bool,
1552 cassandra_version: Option<String>,
1553) -> Result<()> {
1554 let _schema = load_schema_file(schema_path, auto_detect, cassandra_version.as_deref())?;
1556
1557 println!("š Executing CQL query against LIVE SSTable data!");
1558 println!("š SSTable: {}", sstable_path.display());
1559 println!("š Schema: {}", schema_path.display());
1560 println!("š Query: {query}");
1561
1562 let actual_sstable_path = resolve_sstable_path(sstable_path)?;
1564 println!("š Data file: {}", actual_sstable_path.display());
1565
1566 let executor = QueryExecutor::new(QueryExecutorConfig);
1568
1569 let result = executor.execute_select(query).await?;
1571
1572 match format {
1574 OutputFormat::Table => result.display_table(),
1575 OutputFormat::Json => result.display_json()?,
1576 OutputFormat::Csv => result.display_csv()?,
1577 OutputFormat::Parquet => {
1578 return Err(anyhow::anyhow!("Parquet format is not supported for this command. Use --out json or --out csv instead."));
1579 }
1580 }
1581
1582 Ok(())
1583}
1584
1585fn resolve_sstable_path(sstable_path: &Path) -> Result<PathBuf> {
1587 if sstable_path.is_file() {
1588 return Ok(sstable_path.to_path_buf());
1590 }
1591
1592 if sstable_path.is_dir() {
1593 println!("š Directory detected, looking for SSTable files...");
1595
1596 let patterns = ["*-Data.db", "*-big-Data.db", "nb-*-big-Data.db"];
1598
1599 for pattern in &patterns {
1600 if let Ok(entries) = std::fs::read_dir(sstable_path) {
1601 for entry in entries.flatten() {
1602 let file_name = entry.file_name();
1603 let file_name_str = file_name.to_string_lossy();
1604
1605 if pattern.contains("*") {
1607 let pattern_parts: Vec<&str> = pattern.split('*').collect();
1608 if pattern_parts.len() == 2 {
1609 let starts_with = pattern_parts[0];
1610 let ends_with = pattern_parts[1];
1611
1612 if file_name_str.starts_with(starts_with)
1613 && file_name_str.ends_with(ends_with)
1614 {
1615 let data_file = entry.path();
1616 println!("ā Found SSTable data file: {}", data_file.display());
1617 return Ok(data_file);
1618 }
1619 } else if pattern_parts.len() == 3 {
1620 let starts_with = pattern_parts[0];
1621 let middle = pattern_parts[1];
1622 let ends_with = pattern_parts[2];
1623
1624 if file_name_str.starts_with(starts_with)
1625 && file_name_str.contains(middle)
1626 && file_name_str.ends_with(ends_with)
1627 {
1628 let data_file = entry.path();
1629 println!("ā Found SSTable data file: {}", data_file.display());
1630 return Ok(data_file);
1631 }
1632 }
1633 }
1634 }
1635 }
1636 }
1637
1638 return Err(anyhow::anyhow!(
1639 "No SSTable data files found in directory: {}\nLooked for: {}",
1640 sstable_path.display(),
1641 patterns.join(", ")
1642 ));
1643 }
1644
1645 Err(anyhow::anyhow!(
1646 "Path is neither a file nor a directory: {}",
1647 sstable_path.display()
1648 ))
1649}
1650
1651fn load_schema_file(
1653 schema_path: &Path,
1654 _auto_detect: bool,
1655 _cassandra_version: Option<&str>,
1656) -> Result<TableSchema> {
1657 let schema_content = std::fs::read_to_string(schema_path)
1658 .with_context(|| format!("Failed to read schema file: {}", schema_path.display()))?;
1659
1660 println!("š Loading schema from: {}", schema_path.display());
1661
1662 let extension = schema_path
1664 .extension()
1665 .and_then(|s| s.to_str())
1666 .unwrap_or("");
1667
1668 match extension.to_lowercase().as_str() {
1669 "json" => {
1670 println!("š Parsing JSON schema format");
1671 let json_schema: serde_json::Value = serde_json::from_str(&schema_content)
1673 .with_context(|| "Failed to parse JSON schema")?;
1674
1675 parse_json_schema(&json_schema)
1677 }
1678 "cql" | "sql" | "" => {
1679 println!("š Parsing CQL schema format");
1680 parse_cql_schema(&schema_content).with_context(|| "Failed to parse CQL schema")
1682 }
1683 _ => Err(anyhow::anyhow!(
1684 "Unsupported schema file extension: .{}\nSupported formats: .json, .cql",
1685 extension
1686 )),
1687 }
1688}
1689
1690fn parse_json_schema(json: &serde_json::Value) -> Result<TableSchema> {
1692 let keyspace = json["keyspace"]
1693 .as_str()
1694 .ok_or_else(|| anyhow::anyhow!("Missing keyspace in schema"))?;
1695 let table = json["table"]
1696 .as_str()
1697 .ok_or_else(|| anyhow::anyhow!("Missing table in schema"))?;
1698
1699 let columns = json["columns"]
1700 .as_object()
1701 .ok_or_else(|| anyhow::anyhow!("Missing columns in schema"))?;
1702
1703 let mut schema_columns = Vec::new();
1704 let mut partition_keys = Vec::new();
1705 let mut clustering_columns = Vec::new();
1706
1707 for (col_name, col_info) in columns {
1708 let col_obj = col_info
1709 .as_object()
1710 .ok_or_else(|| anyhow::anyhow!("Invalid column definition for {}", col_name))?;
1711
1712 let col_type = col_obj["type"]
1713 .as_str()
1714 .ok_or_else(|| anyhow::anyhow!("Missing type for column {}", col_name))?;
1715 let col_kind = col_obj["kind"]
1716 .as_str()
1717 .ok_or_else(|| anyhow::anyhow!("Missing kind for column {}", col_name))?;
1718
1719 let column = Column {
1720 name: col_name.clone(),
1721 data_type: col_type.to_string(),
1722 nullable: true, default: None, is_static: false, };
1726
1727 match col_kind {
1728 "PartitionKey" => {
1729 partition_keys.push(KeyColumn {
1730 name: col_name.clone(),
1731 position: partition_keys.len(),
1732 data_type: col_type.to_string(),
1733 });
1734 }
1735 "ClusteringColumn" => {
1736 clustering_columns.push(ClusteringColumn {
1737 name: col_name.clone(),
1738 position: clustering_columns.len(),
1739 data_type: col_type.to_string(),
1740 order: ClusteringOrder::Asc,
1741 });
1742 }
1743 "Regular" => {
1744 }
1746 _ => return Err(anyhow::anyhow!("Unknown column kind: {}", col_kind)),
1747 }
1748
1749 schema_columns.push(column);
1750 }
1751
1752 Ok(TableSchema {
1753 keyspace: keyspace.to_string(),
1754 table: table.to_string(),
1755 columns: schema_columns,
1756 partition_keys,
1757 clustering_keys: clustering_columns,
1758 comments: HashMap::new(),
1759 })
1760}
1761
1762fn display_table_format(column_names: &[String], rows: &[ParsedRow]) {
1764 use prettytable::{Cell, Row, Table};
1765
1766 if rows.is_empty() {
1767 println!("š No results found");
1768 return;
1769 }
1770
1771 let mut table = Table::new();
1772
1773 let mut header = Row::empty();
1775 for column in column_names {
1776 header.add_cell(Cell::new(column));
1777 }
1778 table.add_row(header);
1779
1780 for parsed_row in rows {
1782 let mut row = Row::empty();
1783 for column in column_names {
1784 let cell_value = parsed_row
1785 .get(column)
1786 .map(|v| v.to_string())
1787 .unwrap_or_else(|| "NULL".to_string());
1788 row.add_cell(Cell::new(&cell_value));
1789 }
1790 table.add_row(row);
1791 }
1792
1793 println!("\nš Live SSTable Data Results:");
1794 println!("{}", "=".repeat(50));
1795 table.printstd();
1796}
1797
1798fn display_json_format(rows: &[ParsedRow]) -> Result<()> {
1800 let json_rows: Vec<serde_json::Value> = rows.iter().map(|row| row.to_json()).collect();
1801
1802 println!("{}", serde_json::to_string_pretty(&json_rows)?);
1803 Ok(())
1804}
1805
1806fn display_csv_format(column_names: &[String], rows: &[ParsedRow]) -> Result<()> {
1808 let mut wtr = csv::Writer::from_writer(std::io::stdout());
1809
1810 wtr.write_record(column_names)?;
1812
1813 for parsed_row in rows {
1815 let mut record = Vec::new();
1816 for column in column_names {
1817 let cell_value = parsed_row
1818 .get(column)
1819 .map(|v| v.to_string())
1820 .unwrap_or_else(|| "NULL".to_string());
1821 record.push(cell_value);
1822 }
1823 wtr.write_record(&record)?;
1824 }
1825
1826 wtr.flush()?;
1827 Ok(())
1828}
1829
/// Export every entry of an SSTable to `output_path` in `format`.
///
/// `sstable_path` may be a data file or a directory containing one. It is resolved with
/// `resolve_sstable_path`, matching the read, query and validate commands. Entries are parsed
/// with the schema loaded from `schema_path`. A spinner counts the rows as they are exported.
///
/// # Errors
/// Returns an error if the schema cannot be loaded, the SSTable cannot be resolved or opened,
/// the output file cannot be created, or the format-specific exporter fails.
#[cfg(feature = "state_machine")]
pub async fn export_sstable(
    sstable_path: &Path,
    schema_path: &Path,
    output_path: &Path,
    format: ExportFormat,
) -> Result<()> {
    let schema = load_schema_file(schema_path, false, None)?;
    let data_file = resolve_sstable_path(sstable_path)?;

    let config = cqlite_core::Config::default();
    let platform = Arc::new(cqlite_core::platform::Platform::new(&config).await?);
    let reader = SSTableReader::open(&data_file, &config, platform)
        .await
        .with_context(|| format!("Failed to open SSTable: {}", data_file.display()))?;

    let mut output_file = File::create(output_path)
        .with_context(|| format!("Failed to create output file: {}", output_path.display()))?;

    println!("Exporting SSTable: {}", sstable_path.display());
    println!("Output: {} ({})", output_path.display(), format);

    let pb = ProgressBar::new_spinner();
    pb.set_style(
        ProgressStyle::default_spinner()
            .template("{spinner:.green} [{elapsed_precise}] {pos} rows exported")
            .expect("progress bar template is a valid literal"),
    );

    match format {
        ExportFormat::Json => export_as_json(&reader, &schema, &mut output_file, &pb).await,
        ExportFormat::Csv => export_as_csv(&reader, &schema, &mut output_file, &pb).await,
        ExportFormat::Parquet => {
            // The Parquet exporter opens `output_path` itself, so release this handle first.
            drop(output_file);
            export_as_parquet(&reader, &schema, output_path, &pb).await
        }
        ExportFormat::Cql => export_as_cql(&reader, &schema, &mut output_file, &pb).await,
    }
}
1871
/// Export every entry read from `reader` to `output_file` as a pretty-printed JSON array.
///
/// Entries that fail to parse are reported on stderr and skipped. The progress bar position
/// equals the number of entries examined so far. The JSON is streamed through a buffered
/// writer rather than first being built as one large `String`.
///
/// # Errors
/// Returns an error if entries cannot be read, or if serializing, writing or flushing fails.
#[cfg(feature = "state_machine")]
async fn export_as_json(
    reader: &SSTableReader,
    schema: &TableSchema,
    output_file: &mut File,
    pb: &ProgressBar,
) -> Result<()> {
    use std::io::Write;

    let parser = RealDataParser::new(schema.clone());
    let entries = reader.get_all_entries().await?;

    let mut json_objects = Vec::with_capacity(entries.len());

    for (index, (_table_id, key, value)) in entries.iter().enumerate() {
        // 1-based so the final position equals the number of entries processed.
        pb.set_position(index as u64 + 1);

        match parser.parse_entry(key, value) {
            Ok(parsed_row) => json_objects.push(parsed_row.to_json()),
            Err(e) => eprintln!("⚠️ Failed to parse row {}: {}", index + 1, e),
        }
    }

    let mut out = std::io::BufWriter::new(output_file);
    serde_json::to_writer_pretty(&mut out, &json_objects)?;
    // Flush explicitly: dropping a BufWriter would silently discard a write error.
    out.flush()?;

    pb.finish_with_message(format!("Exported {} rows to JSON", json_objects.len()));
    Ok(())
}
1906
/// Export every entry read from `reader` to `output_file` as CSV.
///
/// The first record is a header of the schema's column names. Columns missing from a parsed
/// row are written as `NULL`. Entries that fail to parse are reported on stderr and skipped.
/// The progress bar position equals the number of entries examined so far.
///
/// # Errors
/// Returns an error if entries cannot be read, or if writing or flushing the CSV fails.
#[cfg(feature = "state_machine")]
async fn export_as_csv(
    reader: &SSTableReader,
    schema: &TableSchema,
    output_file: &mut File,
    pb: &ProgressBar,
) -> Result<()> {
    let parser = RealDataParser::new(schema.clone());
    let entries = reader.get_all_entries().await?;

    let mut wtr = csv::Writer::from_writer(output_file);
    let column_names = parser.get_column_names();

    wtr.write_record(&column_names)?;

    let mut exported_count = 0;

    for (index, (_table_id, key, value)) in entries.iter().enumerate() {
        // 1-based so the final position equals the number of entries processed.
        pb.set_position(index as u64 + 1);

        match parser.parse_entry(key, value) {
            Ok(parsed_row) => {
                let record: Vec<&str> = column_names
                    .iter()
                    .map(|column| parsed_row.get(column).map(String::as_str).unwrap_or("NULL"))
                    .collect();
                wtr.write_record(&record)?;
                exported_count += 1;
            }
            Err(e) => eprintln!("⚠️ Failed to parse row {}: {}", index + 1, e),
        }
    }

    wtr.flush()?;
    pb.finish_with_message(format!("Exported {exported_count} rows to CSV"));
    Ok(())
}
1952
/// Export every entry read from `reader` to `output_file` as CQL `INSERT` statements.
///
/// Each parsed value is rendered as a CQL literal according to its column's declared type
/// (see `format_cql_literal`). Columns missing from a row are written as `NULL`. Entries that
/// fail to parse are reported on stderr and skipped. Output is buffered and flushed before
/// returning.
///
/// # Errors
/// Returns an error if entries cannot be read, or if writing or flushing the script fails.
#[cfg(feature = "state_machine")]
async fn export_as_cql(
    reader: &SSTableReader,
    schema: &TableSchema,
    output_file: &mut File,
    pb: &ProgressBar,
) -> Result<()> {
    use std::io::Write;

    let parser = RealDataParser::new(schema.clone());
    let entries = reader.get_all_entries().await?;
    let column_names = parser.get_column_names();
    let column_types: std::collections::HashMap<&str, &str> = schema
        .columns
        .iter()
        .map(|col| (col.name.as_str(), col.data_type.as_str()))
        .collect();

    // One write syscall per statement would be very slow on large tables.
    let mut out = std::io::BufWriter::new(output_file);

    writeln!(out, "-- CQL Export from CQLite")?;
    writeln!(out, "-- Table: {}.{}", schema.keyspace, schema.table)?;
    writeln!(out, "-- Generated: {}", chrono::Utc::now().to_rfc3339())?;
    writeln!(out)?;

    let column_list = column_names.join(", ");
    let mut exported_count = 0;

    for (index, (_table_id, key, value)) in entries.iter().enumerate() {
        // 1-based so the final position equals the number of entries processed.
        pb.set_position(index as u64 + 1);

        match parser.parse_entry(key, value) {
            Ok(parsed_row) => {
                // Previously every value was hard-coded to NULL, discarding all row data.
                let values: Vec<String> = column_names
                    .iter()
                    .map(|col| match parsed_row.get(col) {
                        Some(raw) => format_cql_literal(
                            raw,
                            column_types.get(col.as_str()).copied().unwrap_or(""),
                        ),
                        None => "NULL".to_string(),
                    })
                    .collect();

                writeln!(
                    out,
                    "INSERT INTO {}.{} ({}) VALUES ({});",
                    schema.keyspace,
                    schema.table,
                    column_list,
                    values.join(", ")
                )?;
                exported_count += 1;
            }
            Err(e) => eprintln!("⚠️ Failed to parse row {}: {}", index + 1, e),
        }
    }

    out.flush().with_context(|| "Failed to flush CQL writer")?;
    pb.finish_with_message(format!("Exported {exported_count} rows to CQL"));
    Ok(())
}

/// Render the textual value `raw` as a CQL literal for a column declared as `cql_type`.
///
/// `cql_type` is compared case-insensitively after trimming. Three column types are emitted
/// unquoted, and only when `raw` is lexically safe for them:
/// - Numeric types, when `raw` consists only of digits, signs, `.`, `e` and `E`.
/// - `boolean`, when `raw` is `true`/`false`; the value is lowercased.
/// - `uuid`/`timeuuid`, when `raw` is a canonical 8-4-4-4-12 hex UUID.
///
/// Anything else becomes a single-quoted string with embedded quotes doubled. The raw
/// SSTable data therefore can never inject CQL syntax into the script.
fn format_cql_literal(raw: &str, cql_type: &str) -> String {
    let cql_type = cql_type.trim().to_ascii_lowercase();

    let is_numeric_literal = !raw.is_empty()
        && raw.chars().any(|c| c.is_ascii_digit())
        && raw
            .chars()
            .all(|c| c.is_ascii_digit() || matches!(c, '+' | '-' | '.' | 'e' | 'E'));
    let is_boolean_literal = raw.eq_ignore_ascii_case("true") || raw.eq_ignore_ascii_case("false");
    let is_uuid_literal = raw.len() == 36
        && raw.char_indices().all(|(i, c)| {
            if matches!(i, 8 | 13 | 18 | 23) {
                c == '-'
            } else {
                c.is_ascii_hexdigit()
            }
        });

    match cql_type.as_str() {
        "int" | "bigint" | "smallint" | "tinyint" | "varint" | "counter" | "float"
        | "double" | "decimal"
            if is_numeric_literal =>
        {
            raw.to_string()
        }
        "boolean" if is_boolean_literal => raw.to_ascii_lowercase(),
        "uuid" | "timeuuid" if is_uuid_literal => raw.to_string(),
        _ => format!("'{}'", raw.replace('\'', "''")),
    }
}
2017
/// Export every entry read from `reader` to a Parquet file at `output_path`.
///
/// The Parquet schema comes from `build_query_metadata_from_schema`. Entries are converted
/// with `convert_entry_to_query_row` and written in batches. The file is always created and
/// finalized, even when there are no entries.
///
/// The writer is driven in the same order as `export_to_parquet`: create, write the header,
/// write the chunks, finalize. Previously the header step was skipped here.
/// NOTE(review): confirm `write_header` is required by the Parquet `StreamingWriter`.
///
/// # Errors
/// Returns an error if entries cannot be read, or if creating the file, creating the writer,
/// writing the header or chunks, or finalizing fails.
#[cfg(feature = "state_machine")]
async fn export_as_parquet(
    reader: &SSTableReader,
    schema: &TableSchema,
    output_path: &Path,
    pb: &ProgressBar,
) -> Result<()> {
    use crate::output::parquet::create_streaming_parquet_writer;
    use crate::output::StreamingWriter;

    // Batch size passed to the writer factory.
    const WRITER_BATCH_SIZE: usize = 10_000;
    // Rows buffered here before each `write_chunk` call.
    const ROWS_PER_WRITE: usize = 1_000;

    let entries = reader.get_all_entries().await?;
    let metadata = build_query_metadata_from_schema(schema);

    let output_file = File::create(output_path)
        .with_context(|| format!("Failed to create output file: {}", output_path.display()))?;
    let mut writer = create_streaming_parquet_writer(output_file, &metadata, WRITER_BATCH_SIZE)
        .map_err(|e| anyhow::anyhow!("Failed to create Parquet writer: {}", e))?;
    writer
        .write_header(&metadata)
        .map_err(|e| anyhow::anyhow!("Failed to write Parquet header: {}", e))?;

    let mut chunk = Vec::with_capacity(ROWS_PER_WRITE);
    let mut exported_count = 0;

    for (index, (_table_id, row_key, value)) in entries.iter().enumerate() {
        chunk.push(convert_entry_to_query_row(row_key, value, schema));

        if chunk.len() >= ROWS_PER_WRITE {
            writer
                .write_chunk(&chunk)
                .map_err(|e| anyhow::anyhow!("Failed to write Parquet chunk: {}", e))?;
            exported_count += chunk.len();
            chunk.clear();
        }

        // 1-based so the final position equals the number of entries processed.
        pb.set_position(index as u64 + 1);
    }

    if !chunk.is_empty() {
        writer
            .write_chunk(&chunk)
            .map_err(|e| anyhow::anyhow!("Failed to write Parquet chunk: {}", e))?;
        exported_count += chunk.len();
    }

    writer
        .finalize()
        .map_err(|e| anyhow::anyhow!("Failed to finalize Parquet: {}", e))?;

    if exported_count == 0 {
        pb.finish_with_message("No data to export");
    } else {
        pb.finish_with_message(format!("Exported {} rows to Parquet", exported_count));
    }
    Ok(())
}
2091
/// Build export `QueryMetadata` describing every column of `schema`.
///
/// Columns are ordered as partition keys, then clustering keys, then the remaining columns.
/// Each is nullable and tagged with `keyspace.table`. A column name is emitted only once:
/// `schema.columns` also lists the key columns (`parse_json_schema` pushes every column
/// there), so without de-duplication key columns would appear twice in the Parquet schema.
#[cfg(feature = "state_machine")]
fn build_query_metadata_from_schema(schema: &TableSchema) -> cqlite_core::query::QueryMetadata {
    use cqlite_core::query::{ColumnInfo, QueryMetadata};
    use std::collections::HashSet;

    let table_name = format!("{}.{}", schema.keyspace, schema.table);

    let ordered = schema
        .partition_keys
        .iter()
        .map(|pk| (&pk.name, &pk.data_type))
        .chain(
            schema
                .clustering_keys
                .iter()
                .map(|ck| (&ck.name, &ck.data_type)),
        )
        .chain(schema.columns.iter().map(|col| (&col.name, &col.data_type)));

    let mut columns = Vec::new();
    let mut seen: HashSet<&str> = HashSet::new();
    let mut position = 0;

    for (name, data_type) in ordered {
        // The first occurrence wins, so key columns keep their key-ordered position.
        if !seen.insert(name.as_str()) {
            continue;
        }
        columns.push(ColumnInfo {
            name: name.clone(),
            data_type: parse_cql_type_string(data_type),
            nullable: true,
            position,
            table_name: Some(table_name.clone()),
            cql_type: None,
        });
        position += 1;
    }

    QueryMetadata {
        columns,
        ..Default::default()
    }
}
2147
/// Map a CQL type name from a schema onto the closest `DataType` used for export.
///
/// Matching ignores case and surrounding whitespace. Some CQL types have no exact
/// counterpart and map to a stand-in:
/// - `date` → `Timestamp`
/// - `time` and `counter` → `BigInt`
/// - `varint` → `Blob`
/// - `decimal` → `Text`
///
/// Collection and composite types are recognised by their constructor keyword (`list<…>`,
/// `set<…>`, `map<…>`, `frozen<…>`, `tuple<…>`, or the bare keyword). Anything else,
/// including user-defined types, falls back to `Text`.
#[cfg(feature = "state_machine")]
fn parse_cql_type_string(type_str: &str) -> cqlite_core::types::DataType {
    use cqlite_core::types::DataType;

    let normalized = type_str.trim().to_lowercase();

    match normalized.as_str() {
        "text" | "varchar" | "ascii" => DataType::Text,
        "int" | "integer" => DataType::Integer,
        "bigint" => DataType::BigInt,
        "smallint" => DataType::SmallInt,
        "tinyint" => DataType::TinyInt,
        "float" => DataType::Float32,
        "double" => DataType::Float,
        "boolean" => DataType::Boolean,
        "timestamp" => DataType::Timestamp,
        "date" => DataType::Timestamp,
        "time" => DataType::BigInt,
        "uuid" | "timeuuid" => DataType::Uuid,
        "blob" => DataType::Blob,
        "counter" => DataType::BigInt,
        "varint" => DataType::Blob,
        "decimal" => DataType::Text,
        // Plain `starts_with` checks would misclassify UDTs such as `settings` or `mapping`.
        s if cql_type_has_constructor(s, "list") => DataType::List,
        s if cql_type_has_constructor(s, "set") => DataType::Set,
        s if cql_type_has_constructor(s, "map") => DataType::Map,
        s if cql_type_has_constructor(s, "frozen") => DataType::Frozen,
        s if cql_type_has_constructor(s, "tuple") => DataType::Tuple,
        _ => DataType::Text,
    }
}

/// Return `true` when `type_str` is exactly the type constructor `name` or `name<…>`.
/// Whitespace is allowed before the `<`.
fn cql_type_has_constructor(type_str: &str, name: &str) -> bool {
    match type_str.strip_prefix(name) {
        Some(rest) => rest.is_empty() || rest.trim_start().starts_with('<'),
        None => false,
    }
}
2178
/// Turn one `(row_key, value)` SSTable entry into a `QueryRow` shaped by `schema`.
///
/// A `Value::Map` is spread into columns, using each text key as the column name; pairs with
/// non-text keys are dropped. Any other value is stored whole under the schema's first
/// column, if there is one. Every partition key, clustering key and column that still has no
/// value is then filled with `Value::Null`.
#[cfg(feature = "state_machine")]
fn convert_entry_to_query_row(
    row_key: &cqlite_core::RowKey,
    value: &cqlite_core::Value,
    schema: &TableSchema,
) -> cqlite_core::query::QueryRow {
    use cqlite_core::query::{QueryRow, RowMetadata};
    use cqlite_core::Value;
    use std::collections::HashMap;

    let mut cells: HashMap<String, Value> = HashMap::new();

    if let Value::Map(pairs) = value {
        cells.extend(pairs.into_iter().filter_map(|(k, v)| match k {
            Value::Text(column) => Some((column.clone(), v.clone())),
            _ => None,
        }));
    } else if let Some(first_column) = schema.columns.first() {
        cells.insert(first_column.name.clone(), value.clone());
    }

    let declared_names = schema
        .partition_keys
        .iter()
        .map(|pk| &pk.name)
        .chain(schema.clustering_keys.iter().map(|ck| &ck.name))
        .chain(schema.columns.iter().map(|col| &col.name));

    for name in declared_names {
        cells.entry(name.clone()).or_insert(Value::Null);
    }

    QueryRow {
        values: cells,
        key: row_key.clone(),
        metadata: RowMetadata::default(),
    }
}
2238
2239pub async fn read_sstable_enhanced(
2241 sstable_path: &Path,
2242 schema_path: &Path,
2243 limit: Option<usize>,
2244 skip: Option<usize>,
2245 generation: Option<u32>,
2246 format: OutputFormat,
2247 auto_detect: bool,
2248 cassandra_version: Option<String>,
2249 interactive: bool,
2250 progress: bool,
2251 export: Option<PathBuf>,
2252) -> Result<()> {
2253 println!("š Enhanced SSTable Reader");
2254 println!("š SSTable: {}", sstable_path.display());
2255 println!("š Schema: {}", schema_path.display());
2256
2257 if interactive {
2258 println!("š Interactive mode enabled - use Ctrl+C to exit");
2259 }
2260
2261 if progress {
2262 println!("š Progress tracking enabled");
2263 }
2264
2265 if let Some(ref export_path) = export {
2266 println!("š¤ Export enabled to: {}", export_path.display());
2267 }
2268
2269 let result = read_sstable(
2271 sstable_path,
2272 schema_path,
2273 limit,
2274 skip,
2275 generation,
2276 format,
2277 auto_detect,
2278 cassandra_version,
2279 )
2280 .await;
2281
2282 if interactive {
2287 println!("\nš Interactive mode features coming soon!");
2288 println!(" - Filter data interactively");
2289 println!(" - Navigate through pages");
2290 println!(" - Query-like interface");
2291 }
2292
2293 if let Some(export_path) = export {
2294 println!("\nš¤ Export functionality coming soon!");
2295 println!(" Target: {}", export_path.display());
2296 println!(" Formats: JSON, CSV, Parquet");
2297 }
2298
2299 result
2300}
2301
2302pub async fn validate_sstable(
2304 sstable_path: &Path,
2305 schema_path: Option<&Path>,
2306 deep: bool,
2307 fix: bool,
2308 report_path: Option<&Path>,
2309) -> Result<()> {
2310 println!("š SSTable Validation");
2311 println!("š SSTable: {}", sstable_path.display());
2312
2313 if let Some(schema) = schema_path {
2314 println!("š Schema: {}", schema.display());
2315 }
2316
2317 if deep {
2318 println!("š¬ Deep validation enabled (thorough but slower)");
2319 }
2320
2321 if fix {
2322 println!("š§ Auto-fix enabled for recoverable issues");
2323 }
2324
2325 if let Some(report) = report_path {
2326 println!("š Report will be saved to: {}", report.display());
2327 }
2328
2329 let actual_sstable_path = resolve_sstable_path(sstable_path)?;
2331 println!("š Data file: {}", actual_sstable_path.display());
2332
2333 let mut issues_found = 0;
2334 let issues_fixed = 0;
2335 let mut validation_errors = Vec::new();
2336
2337 println!("\nš Basic file validation:");
2339 if !actual_sstable_path.exists() {
2340 let error = "ā SSTable file does not exist";
2341 println!("{error}");
2342 validation_errors.push(error.to_string());
2343 issues_found += 1;
2344 } else {
2345 println!("ā
SSTable file exists");
2346
2347 match std::fs::metadata(&actual_sstable_path) {
2349 Ok(metadata) => {
2350 println!("ā
File readable (size: {} bytes)", metadata.len());
2351
2352 if metadata.len() == 0 {
2353 let error = "ā ļø Warning: SSTable file is empty";
2354 println!("{error}");
2355 validation_errors.push(error.to_string());
2356 issues_found += 1;
2357 }
2358 }
2359 Err(e) => {
2360 let error = format!("ā Cannot read file metadata: {e}");
2361 println!("{error}");
2362 validation_errors.push(error);
2363 issues_found += 1;
2364 }
2365 }
2366 }
2367
2368 println!("\nš Format validation:");
2370 match BulletproofReader::open(&actual_sstable_path) {
2371 Ok(mut reader) => {
2372 println!("ā
SSTable format is readable");
2373
2374 let info = reader.info();
2375 println!(" Format: {:?}", info.format);
2376 println!(" Generation: {}", info.generation_numeric().unwrap_or(0));
2377 println!(" Size: {} bytes", info.size);
2378
2379 if let Some(compression) = reader.compression_info() {
2380 println!(
2381 " Compression: {} (chunk size: {})",
2382 compression.algorithm, compression.chunk_length
2383 );
2384 }
2385
2386 if deep {
2388 println!("\nš¬ Deep validation:");
2389 match reader.parse_sstable_data() {
2390 Ok(entries) => {
2391 println!("ā
Successfully parsed {} entries", entries.len());
2392
2393 if let Some(schema_path) = schema_path {
2395 match load_schema_file(schema_path, true, None) {
2396 Ok(schema) => {
2397 println!("ā
Schema loaded successfully");
2398 let parser = RealDataParser::new(schema);
2399
2400 let mut parsing_errors = 0;
2401 for entry in entries.iter() {
2402 let key = entry.key.clone();
2403 let value =
2404 cqlite_core::Value::Text(format!("{:?}", entry.key));
2405
2406 if parser.parse_entry(&key, &value).is_err() {
2407 parsing_errors += 1;
2408 }
2409 }
2410
2411 if parsing_errors > 0 {
2412 let error = format!(
2413 "ā ļø {parsing_errors} entries failed schema validation"
2414 );
2415 println!("{error}");
2416 validation_errors.push(error);
2417 issues_found += parsing_errors;
2418 } else {
2419 println!("ā
All entries match schema");
2420 }
2421 }
2422 Err(e) => {
2423 let error =
2424 format!("ā ļø Could not load schema for validation: {e}");
2425 println!("{error}");
2426 validation_errors.push(error);
2427 }
2428 }
2429 }
2430 }
2431 Err(e) => {
2432 let error = format!("ā Failed to parse SSTable data: {e}");
2433 println!("{error}");
2434 validation_errors.push(error);
2435 issues_found += 1;
2436 }
2437 }
2438 }
2439 }
2440 Err(e) => {
2441 let error = format!("ā Cannot open SSTable with bulletproof reader: {e}");
2442 println!("{error}");
2443 validation_errors.push(error);
2444 issues_found += 1;
2445 }
2446 }
2447
2448 if let Some(report_path) = report_path {
2450 let mut report_content = format!(
2451 "# SSTable Validation Report\n\n\
2452 **File:** {}\n\
2453 **Validation Time:** {}\n\
2454 **Deep Validation:** {}\n\
2455 **Auto-fix Enabled:** {}\n\n\
2456 ## Summary\n\
2457 - Issues Found: {}\n\
2458 - Issues Fixed: {}\n\n\
2459 ## Details\n",
2460 sstable_path.display(),
2461 chrono::Utc::now().to_rfc3339(),
2462 deep,
2463 fix,
2464 issues_found,
2465 issues_fixed
2466 );
2467
2468 for error in &validation_errors {
2469 report_content.push_str(&format!("- {error}\n"));
2470 }
2471
2472 std::fs::write(report_path, report_content)
2473 .with_context(|| format!("Failed to write report to {}", report_path.display()))?;
2474
2475 println!("\nš Validation report saved to: {}", report_path.display());
2476 }
2477
2478 println!("\nš Validation Summary:");
2480 println!(" Issues found: {issues_found}");
2481 println!(" Issues fixed: {issues_fixed}");
2482
2483 if issues_found == 0 {
2484 println!("ā
SSTable validation passed!");
2485 } else if fix && issues_fixed == issues_found {
2486 println!("š§ All issues fixed!");
2487 } else {
2488 println!("ā ļø {} issues remain", issues_found - issues_fixed);
2489 }
2490
2491 Ok(())
2492}
2493
2494pub async fn analyze_sstable(
2496 sstable_path: &Path,
2497 schema_path: Option<&Path>,
2498 detailed: bool,
2499 infer_schema: bool,
2500 report_path: Option<&Path>,
2501) -> Result<()> {
2502 println!("š SSTable Analysis");
2503 println!("š SSTable: {}", sstable_path.display());
2504
2505 if let Some(schema) = schema_path {
2506 println!("š Schema: {}", schema.display());
2507 }
2508
2509 if detailed {
2510 println!("š Detailed analysis enabled");
2511 }
2512
2513 if infer_schema {
2514 println!("š§ Schema inference enabled");
2515 }
2516
2517 if let Some(report) = report_path {
2518 println!("š Report will be saved to: {}", report.display());
2519 }
2520
2521 let actual_sstable_path = resolve_sstable_path(sstable_path)?;
2523 println!("š Data file: {}", actual_sstable_path.display());
2524
2525 let mut analysis_results = Vec::new();
2526
2527 println!("\nš File Analysis:");
2529 match std::fs::metadata(&actual_sstable_path) {
2530 Ok(metadata) => {
2531 let file_size = metadata.len();
2532 println!(
2533 " File size: {} bytes ({:.2} MB)",
2534 file_size,
2535 file_size as f64 / 1_048_576.0
2536 );
2537 analysis_results.push(format!("File size: {file_size} bytes"));
2538
2539 if let Ok(created) = metadata.created() {
2540 println!(" Created: {created:?}");
2541 }
2542 if let Ok(modified) = metadata.modified() {
2543 println!(" Modified: {modified:?}");
2544 }
2545 }
2546 Err(e) => {
2547 println!("ā Cannot read file metadata: {e}");
2548 return Err(anyhow::anyhow!("File metadata not accessible"));
2549 }
2550 }
2551
2552 println!("\nš Format Analysis:");
2554 match BulletproofReader::open(&actual_sstable_path) {
2555 Ok(mut reader) => {
2556 let info = reader.info();
2557 println!(" Format: {:?}", info.format);
2558 println!(" Generation: {}", info.generation_numeric().unwrap_or(0));
2559 println!(" Size: {} bytes", info.size);
2560
2561 analysis_results.push(format!("Format: {:?}", info.format));
2562 analysis_results.push(format!(
2563 "Generation: {}",
2564 info.generation_numeric().unwrap_or(0)
2565 ));
2566
2567 if let Some(compression) = reader.compression_info() {
2568 println!(" Compression: {}", compression.algorithm);
2569 println!(" Chunk length: {} bytes", compression.chunk_length);
2570 analysis_results.push(format!("Compression: {}", compression.algorithm));
2571 } else {
2572 println!(" Compression: None");
2573 analysis_results.push("Compression: None".to_string());
2574 }
2575
2576 println!("\nš Data Analysis:");
2578 match reader.parse_sstable_data() {
2579 Ok(entries) => {
2580 let entry_count = entries.len();
2581 println!(" Total entries: {entry_count}");
2582 analysis_results.push(format!("Total entries: {entry_count}"));
2583
2584 if entry_count > 0 {
2585 let total_key_size: usize =
2587 entries.iter().map(|e| format!("{:?}", e.key).len()).sum();
2588 let avg_key_size = total_key_size / entry_count;
2589 println!(" Average key size: {avg_key_size} bytes");
2590 analysis_results.push(format!("Average key size: {avg_key_size} bytes"));
2591
2592 println!("\nš Sample Entries (first 5):");
2594 for (i, entry) in entries.iter().take(5).enumerate() {
2595 println!(
2596 " {}. Key: {:?}, Info: {}",
2597 i + 1,
2598 entry.key,
2599 entry.format_info
2600 );
2601 }
2602 }
2603
2604 if detailed {
2606 println!("\nš Detailed Statistics:");
2607
2608 let mut key_lengths = entries
2610 .iter()
2611 .map(|e| format!("{:?}", e.key).len())
2612 .collect::<Vec<_>>();
2613 key_lengths.sort_unstable();
2614
2615 if !key_lengths.is_empty() {
2616 let min_key_len = key_lengths[0];
2617 let max_key_len = key_lengths[key_lengths.len() - 1];
2618 let median_key_len = key_lengths[key_lengths.len() / 2];
2619
2620 println!(
2621 " Key length min/max/median: {min_key_len}/{max_key_len}/{median_key_len}"
2622 );
2623 analysis_results.push(format!(
2624 "Key lengths - min: {min_key_len}, max: {max_key_len}, median: {median_key_len}"
2625 ));
2626 }
2627
2628 println!(" š Advanced statistics coming soon!");
2630 }
2631
2632 if infer_schema {
2634 println!("\nš§ Schema Inference:");
2635 println!(" š§ Schema inference coming soon!");
2637 analysis_results
2638 .push("Schema inference: Feature in development".to_string());
2639 }
2640 }
2641 Err(e) => {
2642 println!("ā Failed to parse SSTable data: {e}");
2643 analysis_results.push(format!("Parse error: {e}"));
2644 }
2645 }
2646 }
2647 Err(e) => {
2648 println!("ā Cannot open SSTable: {e}");
2649 return Err(anyhow::anyhow!("Cannot analyze SSTable: {}", e));
2650 }
2651 }
2652
2653 if let Some(report_path) = report_path {
2655 let mut report_content = format!(
2656 "# SSTable Analysis Report\n\n\
2657 **File:** {}\n\
2658 **Analysis Time:** {}\n\
2659 **Detailed Analysis:** {}\n\
2660 **Schema Inference:** {}\n\n\
2661 ## Results\n",
2662 sstable_path.display(),
2663 chrono::Utc::now().to_rfc3339(),
2664 detailed,
2665 infer_schema
2666 );
2667
2668 for result in &analysis_results {
2669 report_content.push_str(&format!("- {result}\n"));
2670 }
2671
2672 std::fs::write(report_path, report_content)
2673 .with_context(|| format!("Failed to write report to {}", report_path.display()))?;
2674
2675 println!("\nš Analysis report saved to: {}", report_path.display());
2676 }
2677
2678 println!("\nā
Analysis completed!");
2679
2680 Ok(())
2681}
2682
2683pub async fn benchmark_sstable(
2685 sstable_path: &Path,
2686 schema_path: Option<&Path>,
2687 iterations: u32,
2688 operations: &str,
2689 report_path: Option<&Path>,
2690 memory_profile: bool,
2691) -> Result<()> {
2692 println!("š SSTable Performance Benchmark");
2693 println!("š SSTable: {}", sstable_path.display());
2694
2695 if let Some(schema) = schema_path {
2696 println!("š Schema: {}", schema.display());
2697 }
2698
2699 println!("š Iterations: {iterations}");
2700 println!("šÆ Operations: {operations}");
2701
2702 if memory_profile {
2703 println!("š Memory profiling enabled");
2704 }
2705
2706 if let Some(report) = report_path {
2707 println!("š Report will be saved to: {}", report.display());
2708 }
2709
2710 let actual_sstable_path = resolve_sstable_path(sstable_path)?;
2712 println!("š Data file: {}", actual_sstable_path.display());
2713
2714 let mut benchmark_results = Vec::new();
2715
2716 let ops: Vec<&str> = if operations == "all" {
2718 vec!["read", "scan", "query"]
2719 } else {
2720 operations.split(',').map(|s| s.trim()).collect()
2721 };
2722
2723 println!("\nš Starting benchmarks...");
2724
2725 for op in &ops {
2726 println!("\nš Benchmarking operation: {op}");
2727
2728 let mut times = Vec::new();
2729 let mut memory_usage = Vec::new();
2730
2731 for i in 1..=iterations {
2732 print!(" Iteration {i}/{iterations}: ");
2733
2734 let start_time = std::time::Instant::now();
2735 let initial_memory = if memory_profile {
2736 0u64
2738 } else {
2739 0u64
2740 };
2741
2742 let result = match *op {
2744 "read" => benchmark_read_operation(&actual_sstable_path).await,
2745 "scan" => benchmark_scan_operation(&actual_sstable_path).await,
2746 "query" => benchmark_query_operation(&actual_sstable_path, schema_path).await,
2747 _ => {
2748 println!("ā Unknown operation: {op}");
2749 continue;
2750 }
2751 };
2752
2753 let elapsed = start_time.elapsed();
2754 let final_memory = if memory_profile {
2755 0u64
2757 } else {
2758 0u64
2759 };
2760
2761 match result {
2762 Ok(entries_processed) => {
2763 println!(
2764 "ā
{}ms ({} entries)",
2765 elapsed.as_millis(),
2766 entries_processed
2767 );
2768 times.push(elapsed.as_millis() as f64);
2769 if memory_profile {
2770 memory_usage.push(final_memory.saturating_sub(initial_memory));
2771 }
2772 }
2773 Err(e) => {
2774 println!("ā Failed: {e}");
2775 }
2776 }
2777 }
2778
2779 if !times.is_empty() {
2781 times.sort_by(|a, b| a.partial_cmp(b).unwrap());
2782 let min_time = times[0];
2783 let max_time = times[times.len() - 1];
2784 let avg_time = times.iter().sum::<f64>() / times.len() as f64;
2785 let median_time = times[times.len() / 2];
2786
2787 println!("\n š {op} Statistics:");
2788 println!(" Min time: {min_time:.2}ms");
2789 println!(" Max time: {max_time:.2}ms");
2790 println!(" Avg time: {avg_time:.2}ms");
2791 println!(" Median time: {median_time:.2}ms");
2792
2793 benchmark_results.push(format!(
2794 "{op}: min={min_time:.2}ms, max={max_time:.2}ms, avg={avg_time:.2}ms, median={median_time:.2}ms"
2795 ));
2796
2797 if memory_profile && !memory_usage.is_empty() {
2798 let avg_memory = memory_usage.iter().sum::<u64>() / memory_usage.len() as u64;
2799 println!(" Avg memory: {avg_memory} bytes");
2800 benchmark_results.push(format!("{op}: avg_memory={avg_memory}bytes"));
2801 }
2802 }
2803 }
2804
2805 if let Some(report_path) = report_path {
2807 let mut report_content = format!(
2808 "# SSTable Benchmark Report\n\n\
2809 **File:** {}\n\
2810 **Benchmark Time:** {}\n\
2811 **Iterations:** {}\n\
2812 **Operations:** {}\n\
2813 **Memory Profiling:** {}\n\n\
2814 ## Results\n",
2815 sstable_path.display(),
2816 chrono::Utc::now().to_rfc3339(),
2817 iterations,
2818 operations,
2819 memory_profile
2820 );
2821
2822 for result in &benchmark_results {
2823 report_content.push_str(&format!("- {result}\n"));
2824 }
2825
2826 std::fs::write(report_path, report_content)
2827 .with_context(|| format!("Failed to write report to {}", report_path.display()))?;
2828
2829 println!("\nš Benchmark report saved to: {}", report_path.display());
2830 }
2831
2832 println!("\nš Benchmark completed!");
2833
2834 Ok(())
2835}
2836
2837async fn benchmark_read_operation(sstable_path: &Path) -> Result<usize> {
2839 let reader = BulletproofReader::open(sstable_path).with_context(|| "Failed to open SSTable")?;
2840
2841 let _info = reader.info();
2842 Ok(1) }
2844
2845async fn benchmark_scan_operation(sstable_path: &Path) -> Result<usize> {
2847 let mut reader =
2848 BulletproofReader::open(sstable_path).with_context(|| "Failed to open SSTable")?;
2849
2850 match reader.parse_sstable_data() {
2851 Ok(entries) => Ok(entries.len()),
2852 Err(_) => {
2853 let _info = reader.info();
2855 Ok(0)
2856 }
2857 }
2858}
2859
2860async fn benchmark_query_operation(
2862 sstable_path: &Path,
2863 schema_path: Option<&Path>,
2864) -> Result<usize> {
2865 let mut reader =
2866 BulletproofReader::open(sstable_path).with_context(|| "Failed to open SSTable")?;
2867
2868 match reader.parse_sstable_data() {
2869 Ok(entries) => {
2870 if let Some(schema_path) = schema_path {
2871 match load_schema_file(schema_path, true, None) {
2872 Ok(schema) => {
2873 let parser = RealDataParser::new(schema);
2874 let mut parsed_count = 0;
2875
2876 for entry in &entries {
2877 let key = entry.key.clone();
2878 let value = cqlite_core::Value::Text(format!("{:?}", entry.key));
2879
2880 if parser.parse_entry(&key, &value).is_ok() {
2881 parsed_count += 1;
2882 }
2883 }
2884
2885 Ok(parsed_count)
2886 }
2887 Err(_) => Ok(entries.len()), }
2889 } else {
2890 Ok(entries.len())
2891 }
2892 }
2893 Err(_) => Ok(0),
2894 }
2895}