#![allow(dead_code)]
#![allow(deprecated)]
use crate::cli::OutputFormat;
#[cfg(feature = "state_machine")]
use crate::cli::{ExportFormat, ImportFormat};
#[cfg(feature = "state_machine")]
use indicatif::{ProgressBar, ProgressStyle};
/// A single decoded row, held as column-name → textual-value pairs.
#[derive(Debug, Clone)]
pub struct ParsedRow {
    /// Column values keyed by column name.
    pub data: std::collections::HashMap<String, String>,
}
impl ParsedRow {
    /// Returns the value stored for column `key`, if present.
    pub fn get(&self, key: &str) -> Option<&String> {
        let value = self.data.get(key);
        value
    }

    /// Converts the row into a JSON object in which every value is a JSON string.
    pub fn to_json(&self) -> serde_json::Value {
        let mut object = serde_json::Map::new();
        for (column, text) in &self.data {
            object.insert(column.clone(), serde_json::Value::String(text.clone()));
        }
        serde_json::Value::Object(object)
    }
}
/// Converts raw SSTable entries into [`ParsedRow`]s using a table schema.
#[derive(Debug, Clone)]
pub struct RealDataParser {
    /// Schema of the table whose entries are being parsed.
    pub schema: cqlite_core::schema::TableSchema,
}
impl RealDataParser {
    /// Creates a parser bound to `schema`.
    pub fn new(schema: cqlite_core::schema::TableSchema) -> Self {
        RealDataParser { schema }
    }

    /// Parses a single key/value entry into a row.
    ///
    /// NOTE(review): currently a stub — both arguments are ignored and an
    /// empty row is always returned.
    pub fn parse_entry(
        &self,
        _key: &cqlite_core::RowKey,
        _value: &cqlite_core::Value,
    ) -> Result<ParsedRow> {
        let empty_row = ParsedRow {
            data: std::collections::HashMap::new(),
        };
        Ok(empty_row)
    }

    /// Returns the schema's column names in the order its `columns` iterate.
    pub fn get_column_names(&self) -> Vec<String> {
        let mut names = Vec::new();
        for column in self.schema.columns.iter() {
            names.push(column.name.clone());
        }
        names
    }
}
/// Formats an elapsed duration for the export summary.
///
/// Durations under one second are shown in whole milliseconds (`"<1ms"` when
/// below one millisecond). Longer durations are shown as `Ns`, `Nm Ns`, or
/// `Nh Nm Ns`, with the fractional second truncated.
fn format_export_duration(duration: std::time::Duration) -> String {
    let total_secs = duration.as_secs();
    let seconds = total_secs % 60;
    match total_secs {
        0 => match duration.as_millis() {
            0 => "<1ms".to_string(),
            millis => format!("{}ms", millis),
        },
        1..=59 => format!("{}s", seconds),
        60..=3599 => format!("{}m {}s", total_secs / 60, seconds),
        _ => format!(
            "{}h {}m {}s",
            total_secs / 3600,
            (total_secs % 3600) / 60,
            seconds
        ),
    }
}
/// Placeholder SELECT executor.
///
/// NOTE(review): `execute_select` does not run anything yet; it always yields
/// an empty result.
#[derive(Debug)]
pub struct QueryExecutor;
impl QueryExecutor {
    /// Builds an executor; the configuration is currently ignored.
    pub fn new(_config: QueryExecutorConfig) -> Self {
        QueryExecutor
    }

    /// Returns an empty [`QueryResult`] with zero execution time; `_query` is ignored.
    pub async fn execute_select(&self, _query: &str) -> Result<QueryResult> {
        let empty = QueryResult {
            rows: vec![],
            execution_time_ms: 0.0,
        };
        Ok(empty)
    }
}
/// Configuration for [`QueryExecutor`]; currently carries no settings.
#[derive(Debug, Default)]
pub struct QueryExecutorConfig;
/// Rows produced by [`QueryExecutor::execute_select`], with simple stdout renderers.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Returned rows.
    pub rows: Vec<ParsedRow>,
    /// Execution time in milliseconds, as reported by the executor.
    pub execution_time_ms: f64,
}
impl QueryResult {
    /// Column headers used for rendering, taken from the first row's keys.
    ///
    /// NOTE(review): `ParsedRow::data` is a `HashMap`, so header order is
    /// unspecified. It is computed once per render, so all rows use the same order.
    fn column_headers(&self) -> Vec<String> {
        self.rows
            .first()
            .map(|row| row.data.keys().cloned().collect())
            .unwrap_or_default()
    }

    /// Quotes a CSV field (RFC 4180) when it contains a comma, quote, or line break.
    ///
    /// Embedded double quotes are doubled; other fields are returned unchanged.
    fn csv_escape(field: &str) -> String {
        if field.contains(|c: char| matches!(c, ',' | '"' | '\n' | '\r')) {
            format!("\"{}\"", field.replace('"', "\"\""))
        } else {
            field.to_string()
        }
    }

    /// Prints the rows as a table on stdout, or `No rows returned` when empty.
    ///
    /// Missing values render as empty cells.
    pub fn display_table(&self) {
        if self.rows.is_empty() {
            println!("No rows returned");
            return;
        }
        let headers = self.column_headers();
        let mut table = prettytable::Table::new();
        table.set_titles(prettytable::Row::new(
            headers.iter().map(|h| prettytable::Cell::new(h)).collect(),
        ));
        for row in &self.rows {
            let cells: Vec<_> = headers
                .iter()
                .map(|h| prettytable::Cell::new(row.data.get(h).map(String::as_str).unwrap_or("")))
                .collect();
            table.add_row(prettytable::Row::new(cells));
        }
        table.printstd();
    }

    /// Prints the rows as a pretty-printed JSON array of objects on stdout.
    ///
    /// # Errors
    /// Propagates JSON serialization failures.
    pub fn display_json(&self) -> Result<()> {
        let json_rows: Vec<_> = self.rows.iter().map(|r| r.to_json()).collect();
        println!("{}", serde_json::to_string_pretty(&json_rows)?);
        Ok(())
    }

    /// Prints the rows as CSV on stdout: a header line, then one line per row.
    ///
    /// Prints nothing when there are no rows. Header names and values are
    /// quoted per RFC 4180 when needed, so commas, quotes, and newlines inside
    /// values no longer corrupt the output. Missing values are empty fields.
    pub fn display_csv(&self) -> Result<()> {
        if self.rows.is_empty() {
            return Ok(());
        }
        let headers = self.column_headers();
        let header_fields: Vec<String> = headers.iter().map(|h| Self::csv_escape(h)).collect();
        println!("{}", header_fields.join(","));
        for row in &self.rows {
            let values: Vec<String> = headers
                .iter()
                .map(|h| Self::csv_escape(row.data.get(h).map(String::as_str).unwrap_or("")))
                .collect();
            println!("{}", values.join(","));
        }
        Ok(())
    }
}
use anyhow::{Context, Result};
#[cfg(feature = "state_machine")]
use cqlite_core::Database;
use cqlite_core::{
schema::{parse_cql_schema, ClusteringColumn, ClusteringOrder, Column, KeyColumn, TableSchema},
storage::sstable::{bulletproof_reader::BulletproofReader, reader::SSTableReader},
};
use std::collections::HashMap;
#[cfg(feature = "state_machine")]
use std::fs::File;
#[cfg(feature = "state_machine")]
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
pub mod admin;
pub mod bench;
pub mod schema;
pub mod write;
pub mod docker;
pub mod info;
pub mod read_sstable;
/// Executes (or explains) a CQL query and writes the formatted result.
///
/// When `explain` is set, the plan returned by `Database::explain` is printed
/// to stdout and the query is not executed. Otherwise the query is executed,
/// rendered as `format` using `config`, and written to `config.target`. When
/// the target is a file, its path is reported on stderr.
///
/// When `timing` is set, elapsed time since the call began is shown with
/// two-decimal millisecond precision. After execution, the engine's
/// performance breakdown is also shown when it reports a non-zero total time.
/// Execution timing and result warnings are printed to stderr.
///
/// # Errors
/// Fails if explaining, executing, formatting, or writing the output fails.
#[cfg(feature = "state_machine")]
pub async fn execute_query(
    database: &Database,
    query: &str,
    explain: bool,
    timing: bool,
    format: OutputFormat,
    config: &crate::config::OutputConfig,
) -> Result<()> {
    use crate::output::{write_to_target, OutputTarget};
    use std::time::Instant;

    let start_time = Instant::now();
    if explain {
        let explain_result = database
            .explain(query)
            .await
            .with_context(|| "Failed to explain query")?;
        println!("Query Explanation");
        println!("================");
        println!("Query Type: {}", explain_result.query_type);
        println!("Plan Type: {}", explain_result.plan_type);
        println!("Estimated Cost: {:.2}", explain_result.estimated_cost);
        println!("Estimated Rows: {}", explain_result.estimated_rows);
        if !explain_result.selected_indexes.is_empty() {
            println!("\nSelected Indexes:");
            for index in &explain_result.selected_indexes {
                println!(" - {index}");
            }
        }
        if !explain_result.execution_steps.is_empty() {
            println!("\nExecution Steps:");
            for (i, step) in explain_result.execution_steps.iter().enumerate() {
                println!(" {}. {}", i + 1, step);
            }
        }
        if !explain_result.parallelization_info.is_empty() {
            println!("\nParallelization:");
            for info in &explain_result.parallelization_info {
                println!(" - {info}");
            }
        }
        if timing {
            // `as_millis()` yields an integer, for which `{:.2}` is ignored.
            // Use a float so sub-millisecond timings are not reported as `0ms`.
            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            println!("\nTiming: {:.2}ms", elapsed_ms);
        }
        return Ok(());
    }

    let result = database
        .execute(query)
        .await
        .with_context(|| "Failed to execute query")?;
    let output_bytes: Vec<u8> = match format {
        OutputFormat::Table => {
            use crate::output::table::TableWriter;
            let table_output = TableWriter::write(&result, config)
                .map_err(|e| anyhow::anyhow!("Failed to format table output: {}", e))?;
            table_output.into_bytes()
        }
        OutputFormat::Json => {
            use crate::output::json::JSONWriter;
            let json_output = JSONWriter::write(&result, config)
                .map_err(|e| anyhow::anyhow!("Failed to format JSON output: {}", e))?;
            json_output.into_bytes()
        }
        OutputFormat::Csv => {
            use crate::output::CSVWriter;
            let csv_output = CSVWriter::write(&result, config)
                .map_err(|e| anyhow::anyhow!("Failed to format CSV output: {}", e))?;
            csv_output.into_bytes()
        }
        OutputFormat::Parquet => {
            use crate::output::ParquetWriter;
            ParquetWriter::write(&result, config)
                .map_err(|e| anyhow::anyhow!("Failed to format Parquet output: {}", e))?
        }
    };
    write_to_target(&output_bytes, &config.target, config.overwrite)
        .map_err(|e| anyhow::anyhow!("{}", e))?;

    // On stdout, table and JSON output get a trailing newline.
    // CSV and Parquet bytes are left exactly as written.
    if matches!(config.target, OutputTarget::Stdout)
        && !matches!(format, OutputFormat::Csv | OutputFormat::Parquet)
    {
        println!();
    }
    if let OutputTarget::File(path) = &config.target {
        eprintln!("Output written to: {}", path.display());
    }

    if timing {
        // Float milliseconds so `{:.2}` actually yields two decimals.
        let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
        eprintln!("\nQuery executed in {:.2}ms", elapsed_ms);
        let performance = result.performance();
        if performance.total_time_us > 0 {
            eprintln!(
                "Parse time: {:.2}ms",
                performance.parse_time_us as f64 / 1000.0
            );
            eprintln!(
                "Planning time: {:.2}ms",
                performance.planning_time_us as f64 / 1000.0
            );
            eprintln!(
                "Execution time: {:.2}ms",
                performance.execution_time_us as f64 / 1000.0
            );
            eprintln!("Memory usage: {} bytes", performance.memory_usage_bytes);
            eprintln!("I/O operations: {}", performance.io_operations);
            if performance.cache_hits + performance.cache_misses > 0 {
                eprintln!(
                    "Cache hit ratio: {:.1}%",
                    performance.cache_hit_ratio() * 100.0
                );
            }
        }
    }

    let warnings = result.warnings();
    if !warnings.is_empty() {
        eprintln!("\nWarnings:");
        for warning in warnings {
            eprintln!(" ⚠️ {warning}");
        }
    }
    Ok(())
}
/// `execute_query` for builds without the `state_machine` feature.
///
/// Ignores every argument and always returns an error explaining how to
/// enable query execution.
#[cfg(not(feature = "state_machine"))]
pub async fn execute_query(
    _database: &cqlite_core::Database,
    _query: &str,
    _explain: bool,
    _timing: bool,
    _format: OutputFormat,
    _config: &crate::config::OutputConfig,
) -> Result<()> {
    anyhow::bail!(
        "Query execution is not available in M1.\n\
         Build with --features state_machine to enable this feature.\n\
         See CLAUDE.md for M1 API examples."
    );
}
/// Renders `result` as CSV via `CSVWriter` and prints it verbatim to stdout.
///
/// Uses `print!`, so no extra newline is appended.
///
/// # Errors
/// Fails if CSV formatting fails.
#[cfg(feature = "state_machine")]
fn print_csv_format(
    result: &cqlite_core::query::result::QueryResult,
    config: &crate::config::OutputConfig,
) -> Result<()> {
    let rendered = crate::output::CSVWriter::write(result, config)
        .map_err(|err| anyhow::anyhow!("Failed to format CSV output: {}", err))?;
    print!("{rendered}");
    Ok(())
}
/// Imports rows from `file` into a table.
///
/// The target table is `table`, or the file stem when `table` is `None`. The
/// function first tries to confirm the table exists via `system.tables` and
/// to fetch its columns via `system.columns`. Both checks are best-effort and
/// only print warnings on failure. It then dispatches to the CSV or JSON
/// importer; Parquet is rejected.
///
/// # Errors
/// Fails if `file` does not exist, no table name can be derived, the format
/// is Parquet, or the chosen importer fails.
#[cfg(feature = "state_machine")]
pub async fn import_data(
    database: &Database,
    file: &Path,
    format: ImportFormat,
    table: Option<&str>,
) -> Result<()> {
    println!("Importing data from: {}", file.display());
    println!("Format: {format}, Target table: {table:?}");
    if !file.exists() {
        return Err(anyhow::anyhow!("Import file not found: {}", file.display()));
    }
    let target_table = match table {
        Some(t) => t.to_string(),
        None => file
            .file_stem()
            .and_then(|stem| stem.to_str())
            .map(|s| s.to_string())
            .ok_or_else(|| {
                anyhow::anyhow!(
                    "Could not determine target table name. Please specify --table option."
                )
            })?,
    };

    // Double embedded single quotes so a user-supplied table name cannot
    // terminate the CQL string literal early.
    let table_check_query = format!(
        "SELECT table_name FROM system.tables WHERE table_name = '{}'",
        target_table.replace('\'', "''")
    );
    match database.execute(&table_check_query).await {
        Ok(result) if result.rows.is_empty() => {
            println!(
                "⚠️ Warning: Table '{}' not found in system catalog. Assuming it exists or will be created during import.",
                target_table
            );
        }
        Ok(_) => {
            println!("✓ Target table '{target_table}' found");
        }
        Err(_) => {
            println!(
                "⚠️ Warning: Could not verify table existence (system tables may not be implemented). Proceeding with import..."
            );
        }
    }

    let table_columns = get_table_columns(database, &target_table)
        .await
        .unwrap_or_else(|_| {
            println!("⚠️ Warning: Could not retrieve table schema. Import may fail if column types don't match.");
            Vec::new()
        });

    let imported_rows = match format {
        ImportFormat::Csv => {
            import_csv_data(database, file, &target_table, &table_columns).await?
        }
        ImportFormat::Json => {
            import_json_data(database, file, &target_table, &table_columns).await?
        }
        ImportFormat::Parquet => {
            return Err(anyhow::anyhow!(
                "Parquet import not yet implemented. Please convert to CSV or JSON format first."
            ));
        }
    };

    println!("\n📊 Import Summary:");
    println!(" Rows imported: {imported_rows}");
    println!(" ✅ Import completed successfully!");
    Ok(())
}
/// `import_data` for builds without the `state_machine` feature.
///
/// Ignores every argument and always returns an error explaining how to
/// enable data import.
#[cfg(not(feature = "state_machine"))]
pub async fn import_data(
    _database: &cqlite_core::Database,
    _file: &Path,
    _format: crate::cli::ImportFormat,
    _table: Option<&str>,
) -> Result<()> {
    anyhow::bail!(
        "Data import is not available in M1.\n\
         Build with --features state_machine to enable this feature.\n\
         See CLAUDE.md for M1 API examples."
    );
}
/// Imports rows from a headered CSV file into `table`.
///
/// The file is read twice: once to read the header and count records for the
/// progress bar, and once to build `INSERT` statements. Statements are
/// executed in batches of 100.
///
/// Empty fields become `NULL`. Every other field is emitted as a single-quoted
/// CQL string literal with embedded quotes doubled. `table_columns` is only
/// printed for reference.
///
/// Returns the number of statements that executed successfully; failed
/// statements are reported and skipped by `execute_batch_statements`.
///
/// # Errors
/// Fails if the file cannot be opened, the header or any record cannot be
/// parsed, or batch execution returns an error.
#[cfg(feature = "state_machine")]
async fn import_csv_data(
    database: &Database,
    file: &Path,
    table: &str,
    table_columns: &[String],
) -> Result<u64> {
    use csv::ReaderBuilder;
    use indicatif::{ProgressBar, ProgressStyle};

    const BATCH_SIZE: usize = 100;

    // Pass 1: header + record count (for the progress bar length).
    let counting_file =
        File::open(file).with_context(|| format!("Failed to open CSV file: {}", file.display()))?;
    let mut counting_reader = ReaderBuilder::new()
        .has_headers(true)
        .from_reader(counting_file);
    let csv_columns: Vec<String> = counting_reader
        .headers()
        .with_context(|| "Failed to read CSV headers")?
        .iter()
        .map(String::from)
        .collect();
    println!("📋 CSV columns: {}", csv_columns.join(", "));
    if !table_columns.is_empty() {
        println!("📋 Table columns: {}", table_columns.join(", "));
    }
    let total_rows = counting_reader.records().count() as u64;

    // Pass 2: build and execute INSERT statements.
    let import_file = File::open(file)?;
    let mut import_reader = ReaderBuilder::new()
        .has_headers(true)
        .from_reader(import_file);
    let progress = ProgressBar::new(total_rows);
    progress.set_style(
        ProgressStyle::default_bar()
            .template(
                "Importing CSV [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} rows ({eta})",
            )
            .unwrap()
            .progress_chars("=>-"),
    );

    let column_list = csv_columns.join(", ");
    let mut imported_count: u64 = 0;
    let mut pending: Vec<String> = Vec::new();
    for (index, record) in import_reader.records().enumerate() {
        progress.set_position(index as u64 + 1);
        // +2: one for the header line, one for 1-based line numbering.
        let record = record
            .with_context(|| format!("Failed to parse CSV record at line {}", index + 2))?;
        let literals: Vec<String> = record
            .iter()
            .map(|field| match field {
                "" => "NULL".to_string(),
                text => format!("'{}'", text.replace('\'', "''")),
            })
            .collect();
        pending.push(format!(
            "INSERT INTO {} ({}) VALUES ({})",
            table,
            column_list,
            literals.join(", ")
        ));
        if pending.len() >= BATCH_SIZE {
            execute_batch_statements(database, &mut pending, &mut imported_count).await?;
        }
    }
    if !pending.is_empty() {
        execute_batch_statements(database, &mut pending, &mut imported_count).await?;
    }
    progress.finish_with_message(format!("Imported {imported_count} rows from CSV"));
    Ok(imported_count)
}
/// Imports a JSON object, or an array of objects, from `file` into `table`.
///
/// Each object becomes one `INSERT` whose columns are the object's keys.
/// Value mapping:
/// - `null` becomes `NULL`.
/// - Numbers and booleans are emitted bare.
/// - Strings are single-quoted with embedded quotes doubled.
/// - Nested arrays/objects are serialized to JSON text and quoted the same way.
///
/// Non-object array elements are skipped with a message. Statements run in
/// batches of 50. `_table_columns` is unused.
///
/// Returns the number of statements that executed successfully.
///
/// # Errors
/// Fails if the file cannot be read or parsed, the top-level value is neither
/// an object nor an array, or batch execution returns an error.
#[cfg(feature = "state_machine")]
async fn import_json_data(
    database: &Database,
    file: &Path,
    table: &str,
    _table_columns: &[String],
) -> Result<u64> {
    use indicatif::{ProgressBar, ProgressStyle};
    use std::fs;

    const BATCH_SIZE: usize = 50;

    let raw_text = fs::read_to_string(file)
        .with_context(|| format!("Failed to read JSON file: {}", file.display()))?;
    let parsed: serde_json::Value =
        serde_json::from_str(&raw_text).with_context(|| "Failed to parse JSON file")?;
    let objects: Vec<serde_json::Value> = match parsed {
        serde_json::Value::Array(items) => items,
        single @ serde_json::Value::Object(_) => vec![single],
        _ => {
            return Err(anyhow::anyhow!(
                "JSON file must contain an object or array of objects"
            ));
        }
    };
    println!("📋 Found {} JSON objects to import", objects.len());

    let progress = ProgressBar::new(objects.len() as u64);
    progress.set_style(
        ProgressStyle::default_bar()
            .template("Importing JSON [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} objects ({eta})")
            .unwrap()
            .progress_chars("=>-"),
    );

    let mut imported_count: u64 = 0;
    let mut pending = Vec::new();
    for (index, element) in objects.iter().enumerate() {
        progress.set_position(index as u64 + 1);
        let serde_json::Value::Object(fields) = element else {
            println!("⚠️ Skipping non-object JSON element at index {index}");
            continue;
        };
        let mut column_names: Vec<String> = Vec::with_capacity(fields.len());
        let mut literals: Vec<String> = Vec::with_capacity(fields.len());
        for (name, value) in fields {
            column_names.push(name.clone());
            literals.push(match value {
                serde_json::Value::Null => "NULL".to_string(),
                serde_json::Value::String(s) => format!("'{}'", s.replace('\'', "''")),
                serde_json::Value::Number(n) => n.to_string(),
                serde_json::Value::Bool(b) => b.to_string(),
                other => format!("'{}'", other.to_string().replace('\'', "''")),
            });
        }
        pending.push(format!(
            "INSERT INTO {} ({}) VALUES ({})",
            table,
            column_names.join(", "),
            literals.join(", ")
        ));
        if pending.len() >= BATCH_SIZE {
            execute_batch_statements(database, &mut pending, &mut imported_count).await?;
        }
    }
    if !pending.is_empty() {
        execute_batch_statements(database, &mut pending, &mut imported_count).await?;
    }
    progress.finish_with_message(format!("Imported {imported_count} objects from JSON"));
    Ok(imported_count)
}
/// Executes and drains every statement in `statements`, one at a time.
///
/// Each successful statement increments `imported_count`. A failed statement
/// is reported on stdout with a preview of at most 100 characters, then
/// skipped. This function itself therefore never returns `Err`. The preview
/// ends in `...` only when the statement was actually truncated.
#[cfg(feature = "state_machine")]
async fn execute_batch_statements(
    database: &Database,
    statements: &mut Vec<String>,
    imported_count: &mut u64,
) -> Result<()> {
    const PREVIEW_CHARS: usize = 100;
    for statement in statements.drain(..) {
        match database.execute(&statement).await {
            Ok(_) => *imported_count += 1,
            Err(e) => {
                println!("⚠️ Error executing statement: {e}");
                let mut preview: String = statement.chars().take(PREVIEW_CHARS).collect();
                if statement.chars().nth(PREVIEW_CHARS).is_some() {
                    preview.push_str("...");
                }
                println!(" Statement: {preview}");
            }
        }
    }
    Ok(())
}
/// Looks up the column names of `table` via `system.columns`.
///
/// Rows without a `column_name` value are skipped.
///
/// # Errors
/// Returns `Failed to get table columns: …` if the catalog query fails.
#[cfg(feature = "state_machine")]
async fn get_table_columns(database: &Database, table: &str) -> Result<Vec<String>> {
    // Double embedded single quotes so the table name stays inside the CQL string literal.
    let query = format!(
        "SELECT column_name FROM system.columns WHERE table_name = '{}'",
        table.replace('\'', "''")
    );
    let result = database
        .execute(&query)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to get table columns: {}", e))?;
    Ok(result
        .rows
        .iter()
        .filter_map(|row| row.get("column_name"))
        .map(|col| col.to_string())
        .collect())
}
/// Streams the result of `source` into `file` in the requested export format.
///
/// `source` is either a full `SELECT` statement or a bare table name:
/// - For a table name, `SELECT * FROM <source>` is built. `query_filter` is
///   appended as a `WHERE` clause and `limit` as a `LIMIT`.
/// - For a `SELECT`, `query_filter` is ignored. `LIMIT <limit>` is appended
///   (after removing trailing whitespace and `;`) only if the statement has no
///   `LIMIT` keyword already.
///
/// In both cases `limit` is also enforced client-side while writing rows.
///
/// The banner and spinner appear only when `quiet` is false and stdout is a
/// terminal. The final summary (rows, size, time, rate) is printed whenever
/// `quiet` is false.
///
/// # Errors
/// Fails if the output directory or file cannot be created, the query fails,
/// the result has no column metadata, or any write/finalize step fails.
#[cfg(feature = "state_machine")]
pub async fn export_data(
    database: &Database,
    source: &str,
    file: &Path,
    format: ExportFormat,
    query_filter: Option<&str>,
    limit: Option<usize>,
    quiet: bool,
) -> Result<()> {
    use cqlite_core::query::result::StreamingConfig;
    use std::io::IsTerminal;
    use std::time::Instant;

    use crate::output::{
        create_streaming_parquet_writer, StreamingCSVWriter, StreamingJSONWriter, StreamingWriter,
    };
    use crate::status_metrics::format_bytes;

    // Drops rows beyond the remaining client-side limit, if one is set.
    fn cap_chunk<T>(mut chunk: Vec<T>, remaining: Option<usize>) -> Vec<T> {
        if let Some(remaining) = remaining {
            chunk.truncate(remaining);
        }
        chunk
    }

    // First `max_chars` characters of `text`, with "..." only when something was cut off.
    fn preview(text: &str, max_chars: usize) -> String {
        let mut shown: String = text.chars().take(max_chars).collect();
        if text.chars().nth(max_chars).is_some() {
            shown.push_str("...");
        }
        shown
    }

    let show_progress = !quiet && std::io::stdout().is_terminal();
    if show_progress {
        println!("Exporting data from: {source}");
        println!("Output file: {}, Format: {}", file.display(), format);
    }
    if let Some(parent) = file.parent() {
        std::fs::create_dir_all(parent)
            .with_context(|| format!("Failed to create output directory: {}", parent.display()))?;
    }

    let is_select = source.trim_start().to_uppercase().starts_with("SELECT");
    let query = if is_select {
        // Token-based check so `LIMIT` on a new line or after a tab is detected too.
        let has_limit = source
            .to_uppercase()
            .split_whitespace()
            .any(|word| word == "LIMIT");
        match limit {
            Some(n) if !has_limit => {
                // Trim whitespace before the terminator so `LIMIT` lands inside the statement.
                format!("{} LIMIT {}", source.trim_end().trim_end_matches(';'), n)
            }
            _ => source.to_string(),
        }
    } else {
        let mut q = format!("SELECT * FROM {}", source);
        if let Some(filter) = query_filter {
            q.push_str(&format!(" WHERE {}", filter));
        }
        if let Some(n) = limit {
            q.push_str(&format!(" LIMIT {}", n));
        }
        q
    };
    if show_progress {
        println!("Executing query: {}", preview(&query, 100));
    }

    let config = match format {
        ExportFormat::Parquet => StreamingConfig::for_parquet(),
        _ => StreamingConfig::for_text_formats(),
    };
    let mut result_iter = database
        .execute_streaming(&query, config.clone())
        .await
        .with_context(|| format!("Failed to execute streaming export query: {query}"))?;
    let column_names: Vec<String> = result_iter
        .metadata
        .columns
        .iter()
        .map(|c| c.name.clone())
        .collect();
    if column_names.is_empty() {
        return Err(anyhow::anyhow!(
            "Could not determine column names for export"
        ));
    }
    if show_progress {
        println!("Columns: {}", column_names.join(", "));
        println!("Streaming export in progress...");
    }

    let start_time = Instant::now();
    let pb = if show_progress {
        let pb = ProgressBar::new_spinner();
        pb.set_style(
            ProgressStyle::default_spinner()
                .template("{spinner:.green} {msg} ({pos} rows)")
                .unwrap(),
        );
        pb.set_message("Exporting");
        pb
    } else {
        ProgressBar::hidden()
    };

    let chunk_size = config.chunk_size;
    let mut rows_exported: u64 = 0;
    let mut rows_remaining: Option<usize> = limit;
    match format {
        ExportFormat::Csv => {
            let output_file = File::create(file)
                .with_context(|| format!("Failed to create CSV file: {}", file.display()))?;
            let mut writer = StreamingCSVWriter::new(BufWriter::new(output_file));
            writer
                .write_header(&result_iter.metadata)
                .map_err(|e| anyhow::anyhow!("Failed to write CSV header: {}", e))?;
            while rows_remaining != Some(0) {
                let chunk = result_iter
                    .collect_chunk(chunk_size)
                    .await
                    .map_err(|e| anyhow::anyhow!("Failed to collect chunk: {}", e))?;
                if chunk.is_empty() {
                    break;
                }
                let chunk_to_write = cap_chunk(chunk, rows_remaining);
                let written = chunk_to_write.len();
                writer
                    .write_chunk(&chunk_to_write)
                    .map_err(|e| anyhow::anyhow!("Failed to write CSV chunk: {}", e))?;
                rows_exported += written as u64;
                pb.set_position(rows_exported);
                rows_remaining = rows_remaining.map(|r| r.saturating_sub(written));
            }
            writer
                .finalize()
                .map_err(|e| anyhow::anyhow!("Failed to finalize CSV: {}", e))?;
        }
        ExportFormat::Json => {
            let output_file = File::create(file)
                .with_context(|| format!("Failed to create JSON file: {}", file.display()))?;
            let mut writer = StreamingJSONWriter::new(BufWriter::new(output_file));
            writer
                .write_header(&result_iter.metadata)
                .map_err(|e| anyhow::anyhow!("Failed to write JSON header: {}", e))?;
            while rows_remaining != Some(0) {
                let chunk = result_iter
                    .collect_chunk(chunk_size)
                    .await
                    .map_err(|e| anyhow::anyhow!("Failed to collect chunk: {}", e))?;
                if chunk.is_empty() {
                    break;
                }
                let chunk_to_write = cap_chunk(chunk, rows_remaining);
                let written = chunk_to_write.len();
                writer
                    .write_chunk(&chunk_to_write)
                    .map_err(|e| anyhow::anyhow!("Failed to write JSON chunk: {}", e))?;
                rows_exported += written as u64;
                pb.set_position(rows_exported);
                rows_remaining = rows_remaining.map(|r| r.saturating_sub(written));
            }
            writer
                .finalize()
                .map_err(|e| anyhow::anyhow!("Failed to finalize JSON: {}", e))?;
        }
        ExportFormat::Cql => {
            let output_file = File::create(file)
                .with_context(|| format!("Failed to create CQL file: {}", file.display()))?;
            let mut buf_writer = BufWriter::new(output_file);

            // Name the INSERT target after the identifier that follows a
            // standalone FROM keyword, with any `;` terminator removed. A
            // bare table-name source is used as-is, even if it contains "from".
            let words: Vec<&str> = source.split_whitespace().collect();
            let from_target = words
                .iter()
                .position(|word| word.eq_ignore_ascii_case("FROM"))
                .and_then(|i| words.get(i + 1).copied())
                .map(|word| word.trim_end_matches(';'))
                .filter(|word| !word.is_empty());
            let table_name = match from_target {
                Some(name) => name,
                None if is_select => "exported_table",
                None => source,
            };

            writeln!(buf_writer, "-- CQL Export from CQLite (streaming)")?;
            writeln!(buf_writer, "-- Source: {source}")?;
            writeln!(
                buf_writer,
                "-- Generated: {}",
                chrono::Utc::now().to_rfc3339()
            )?;
            writeln!(buf_writer)?;

            let column_list = column_names.join(", ");
            while rows_remaining != Some(0) {
                let chunk = result_iter
                    .collect_chunk(chunk_size)
                    .await
                    .map_err(|e| anyhow::anyhow!("Failed to collect chunk: {}", e))?;
                if chunk.is_empty() {
                    break;
                }
                let chunk_to_write = cap_chunk(chunk, rows_remaining);
                for row in &chunk_to_write {
                    let values: Vec<String> = column_names
                        .iter()
                        .map(|col| {
                            row.values
                                .get(col)
                                .map(|v| match v {
                                    cqlite_core::Value::Text(s) => {
                                        format!("'{}'", s.replace("'", "''"))
                                    }
                                    cqlite_core::Value::Null => "NULL".to_string(),
                                    _ => v.to_string(),
                                })
                                .unwrap_or_else(|| "NULL".to_string())
                        })
                        .collect();
                    writeln!(
                        buf_writer,
                        "INSERT INTO {} ({}) VALUES ({});",
                        table_name,
                        column_list,
                        values.join(", ")
                    )?;
                }
                let written = chunk_to_write.len();
                rows_exported += written as u64;
                pb.set_position(rows_exported);
                rows_remaining = rows_remaining.map(|r| r.saturating_sub(written));
            }
            buf_writer.flush()?;
        }
        ExportFormat::Parquet => {
            let output_file = File::create(file)
                .with_context(|| format!("Failed to create Parquet file: {}", file.display()))?;
            let mut writer =
                create_streaming_parquet_writer(output_file, &result_iter.metadata, chunk_size)
                    .map_err(|e| anyhow::anyhow!("Failed to initialize Parquet writer: {}", e))?;
            writer
                .write_header(&result_iter.metadata)
                .map_err(|e| anyhow::anyhow!("Failed to write Parquet header: {}", e))?;
            while rows_remaining != Some(0) {
                let chunk = result_iter
                    .collect_chunk(chunk_size)
                    .await
                    .map_err(|e| anyhow::anyhow!("Failed to collect chunk: {}", e))?;
                if chunk.is_empty() {
                    break;
                }
                let chunk_to_write = cap_chunk(chunk, rows_remaining);
                let written = chunk_to_write.len();
                writer
                    .write_chunk(&chunk_to_write)
                    .map_err(|e| anyhow::anyhow!("Failed to write Parquet chunk: {}", e))?;
                rows_exported += written as u64;
                pb.set_position(rows_exported);
                rows_remaining = rows_remaining.map(|r| r.saturating_sub(written));
            }
            writer
                .finalize()
                .map_err(|e| anyhow::anyhow!("Failed to finalize Parquet: {}", e))?;
        }
    }
    pb.finish_and_clear();

    if !quiet {
        let duration = start_time.elapsed();
        let file_size = std::fs::metadata(file)?.len();
        println!("\nExport complete:");
        println!(" Rows: {}", rows_exported);
        println!(" Size: {}", format_bytes(file_size));
        println!(" Time: {}", format_export_duration(duration));
        let secs_f64 = duration.as_secs_f64();
        if secs_f64 > 0.0 {
            let rate = rows_exported as f64 / secs_f64;
            if rate.is_finite() {
                println!(" Rate: {:.0} rows/sec", rate);
            }
        }
    }
    Ok(())
}
/// `export_data` for builds without the `state_machine` feature.
///
/// Ignores every argument and always returns an error explaining how to
/// enable data export.
#[cfg(not(feature = "state_machine"))]
pub async fn export_data(
    _database: &cqlite_core::Database,
    _source: &str,
    _file: &Path,
    _format: crate::cli::ExportFormat,
    _query_filter: Option<&str>,
    _limit: Option<usize>,
    _quiet: bool,
) -> Result<()> {
    anyhow::bail!(
        "Data export is not available in M1.\n\
         Build with --features state_machine to enable this feature.\n\
         See CLAUDE.md for M1 API examples."
    );
}
/// Writes an already-materialized query result to `file` as CSV.
///
/// The header comes from `result.metadata`. Rows are written in slices of
/// 5,000 through `StreamingCSVWriter`, and `pb` is advanced by each slice's
/// length. `_column_names` is unused.
///
/// # Errors
/// Fails if the file cannot be created or any header/chunk/finalize write fails.
#[cfg(feature = "state_machine")]
async fn export_to_csv(
    result: &cqlite_core::query::result::QueryResult,
    file: &Path,
    _column_names: &[String],
    pb: &ProgressBar,
) -> Result<()> {
    use crate::output::{StreamingCSVWriter, StreamingWriter};

    const ROWS_PER_SLICE: usize = 5_000;

    let sink = BufWriter::new(
        File::create(file)
            .with_context(|| format!("Failed to create CSV file: {}", file.display()))?,
    );
    let mut csv_writer = StreamingCSVWriter::new(sink);
    csv_writer
        .write_header(&result.metadata)
        .map_err(|err| anyhow::anyhow!("Failed to write CSV header: {}", err))?;
    result
        .rows
        .chunks(ROWS_PER_SLICE)
        .try_for_each(|slice| -> Result<()> {
            csv_writer
                .write_chunk(slice)
                .map_err(|err| anyhow::anyhow!("Failed to write CSV chunk: {}", err))?;
            pb.inc(slice.len() as u64);
            Ok(())
        })?;
    csv_writer
        .finalize()
        .map_err(|err| anyhow::anyhow!("Failed to finalize CSV file: {}", err))?;
    Ok(())
}
/// Writes an already-materialized query result to `file` as JSON.
///
/// The header comes from `result.metadata`. Rows are written in slices of
/// 5,000 through `StreamingJSONWriter`, and `pb` is advanced by each slice's
/// length. `_column_names` is unused.
///
/// # Errors
/// Fails if the file cannot be created or any header/chunk/finalize write fails.
#[cfg(feature = "state_machine")]
async fn export_to_json(
    result: &cqlite_core::query::result::QueryResult,
    file: &Path,
    _column_names: &[String],
    pb: &ProgressBar,
) -> Result<()> {
    use crate::output::{StreamingJSONWriter, StreamingWriter};

    const ROWS_PER_SLICE: usize = 5_000;

    let sink = BufWriter::new(
        File::create(file)
            .with_context(|| format!("Failed to create JSON file: {}", file.display()))?,
    );
    let mut json_writer = StreamingJSONWriter::new(sink);
    json_writer
        .write_header(&result.metadata)
        .map_err(|err| anyhow::anyhow!("Failed to write JSON header: {}", err))?;
    result
        .rows
        .chunks(ROWS_PER_SLICE)
        .try_for_each(|slice| -> Result<()> {
            json_writer
                .write_chunk(slice)
                .map_err(|err| anyhow::anyhow!("Failed to write JSON chunk: {}", err))?;
            pb.inc(slice.len() as u64);
            Ok(())
        })?;
    json_writer
        .finalize()
        .map_err(|err| anyhow::anyhow!("Failed to finalize JSON file: {}", err))?;
    Ok(())
}
/// Writes an already-materialized query result to `file` as CQL `INSERT` statements.
///
/// Output layout:
/// - A comment header: source, generation timestamp, row count.
/// - One `INSERT` per row, listing `column_names` in order.
///
/// Value rendering: `Text` values are single-quoted with embedded quotes
/// doubled; `Null` and missing columns become `NULL`; other values use their
/// `Display` form. `pb` is set to the 1-based index of each row written.
///
/// The target table is the identifier after a standalone `FROM` keyword in
/// `source`, with any trailing `;` removed. A `SELECT` without one falls back
/// to `exported_table`. Any other `source` is treated as the table name itself.
/// Previously, a table name that merely contained "from" was misdetected.
///
/// # Errors
/// Fails if the file cannot be created or any write/flush fails.
#[cfg(feature = "state_machine")]
async fn export_to_cql(
    result: &cqlite_core::query::result::QueryResult,
    file: &Path,
    source: &str,
    column_names: &[String],
    pb: &ProgressBar,
) -> Result<()> {
    let output_file = File::create(file)
        .with_context(|| format!("Failed to create CQL file: {}", file.display()))?;
    let mut writer = BufWriter::new(output_file);

    let words: Vec<&str> = source.split_whitespace().collect();
    let from_target = words
        .iter()
        .position(|word| word.eq_ignore_ascii_case("FROM"))
        .and_then(|i| words.get(i + 1).copied())
        .map(|word| word.trim_end_matches(';'))
        .filter(|word| !word.is_empty());
    let table_name = match from_target {
        Some(name) => name,
        None if source.trim_start().to_uppercase().starts_with("SELECT") => "exported_table",
        None => source,
    };

    writeln!(writer, "-- CQL Export from CQLite")?;
    writeln!(writer, "-- Source: {source}")?;
    writeln!(writer, "-- Generated: {}", chrono::Utc::now().to_rfc3339())?;
    writeln!(writer, "-- Rows: {}", result.rows.len())?;
    writeln!(writer)?;

    let column_list = column_names.join(", ");
    for (index, row) in result.rows.iter().enumerate() {
        pb.set_position(index as u64 + 1);
        let values: Vec<String> = column_names
            .iter()
            .map(|col| {
                row.get(col)
                    .map(|v| match v {
                        cqlite_core::Value::Text(s) => format!("'{}'", s.replace("'", "''")),
                        cqlite_core::Value::Null => "NULL".to_string(),
                        _ => v.to_string(),
                    })
                    .unwrap_or_else(|| "NULL".to_string())
            })
            .collect();
        writeln!(
            writer,
            "INSERT INTO {} ({}) VALUES ({});",
            table_name,
            column_list,
            values.join(", ")
        )?;
    }
    writer
        .flush()
        .with_context(|| "Failed to flush CQL writer")?;
    Ok(())
}
/// Writes an already-materialized query result to `file` as Parquet.
///
/// A writer is created from `result.metadata` with a chunk size of 10,000.
/// Rows are then written in slices of that size, with `pb` advanced by each
/// slice's length. The spinner message is updated at each phase.
/// `_column_names` is unused.
///
/// # Errors
/// Fails if the file cannot be created or the writer fails to initialize,
/// write, or finalize.
#[cfg(feature = "state_machine")]
async fn export_to_parquet(
    result: &cqlite_core::query::result::QueryResult,
    file: &Path,
    _column_names: &[String],
    pb: &ProgressBar,
) -> Result<()> {
    use crate::output::{create_streaming_parquet_writer, StreamingWriter};

    const ROWS_PER_SLICE: usize = 10_000;

    pb.set_message("Initializing Parquet writer...");
    let target = File::create(file)
        .with_context(|| format!("Failed to create Parquet file: {}", file.display()))?;
    let mut parquet_writer =
        create_streaming_parquet_writer(target, &result.metadata, ROWS_PER_SLICE)
            .map_err(|err| anyhow::anyhow!("Failed to initialize Parquet writer: {}", err))?;
    parquet_writer
        .write_header(&result.metadata)
        .map_err(|err| anyhow::anyhow!("Failed to write Parquet header: {}", err))?;

    pb.set_message("Streaming rows to Parquet...");
    for slice in result.rows.chunks(ROWS_PER_SLICE) {
        parquet_writer
            .write_chunk(slice)
            .map_err(|err| anyhow::anyhow!("Failed to write Parquet chunk: {}", err))?;
        pb.inc(slice.len() as u64);
    }

    pb.set_message("Finalizing Parquet file...");
    parquet_writer
        .finalize()
        .map_err(|err| anyhow::anyhow!("Failed to finalize Parquet file: {}", err))?;
    Ok(())
}
/// Reads an SSTable from disk and prints its entries in `format`.
///
/// Setup: the schema at `schema_path` is loaded via `load_schema_file`, and
/// `sstable_path` is resolved to the actual data file via
/// `resolve_sstable_path`. Reading then tries up to three strategies, in order:
///
/// 1. `BulletproofReader::parse_sstable_data`. On success, entries are
///    skipped/limited, converted with `RealDataParser`, and printed; the
///    function then returns.
/// 2. If parsing fails, `read_raw_data(0, 1024)` is tried. On success, the
///    bytes are printed as a hex/ASCII dump and the function returns.
/// 3. If the raw read also fails, the legacy `SSTableReader` is opened and its
///    entries are skipped/limited/printed similarly to (1).
///
/// The first `skip` entries are passed over. Then up to `limit` rows are
/// displayed; `limit` defaults to all entries. `_generation` is currently unused.
///
/// # Errors
/// Returns an error if:
/// - the schema cannot be loaded or the path cannot be resolved;
/// - the bulletproof reader cannot be opened;
/// - on the legacy path, the platform/reader cannot be created or entries
///   cannot be listed;
/// - JSON/CSV display fails;
/// - `format` is `Parquet`. This is checked only after the entries have
///   been read.
pub async fn read_sstable(
    sstable_path: &Path,
    schema_path: &Path,
    limit: Option<usize>,
    skip: Option<usize>,
    _generation: Option<u32>,
    format: OutputFormat,
    auto_detect: bool,
    cassandra_version: Option<String>,
) -> Result<()> {
    let schema = load_schema_file(schema_path, auto_detect, cassandra_version.as_deref())?;
    println!("🔍 Reading SSTable with REAL data parsing (no mocking!)");
    println!("📂 SSTable: {}", sstable_path.display());
    println!("📋 Schema: {}", schema_path.display());
    let actual_sstable_path = resolve_sstable_path(sstable_path)?;
    println!("📄 Data file: {}", actual_sstable_path.display());
    println!("🚀 Using Bulletproof SSTable Reader (supports all Cassandra versions)");
    let mut bulletproof_reader =
        BulletproofReader::open(&actual_sstable_path).with_context(|| {
            format!(
                "Failed to open SSTable with bulletproof reader: {}",
                actual_sstable_path.display()
            )
        })?;
    let info = bulletproof_reader.info();
    // A generation that cannot be read as a number is shown as 0.
    println!(
        "📋 Detected format: {:?} (generation {}, size {})",
        info.format,
        info.generation_numeric().unwrap_or(0),
        info.size
    );
    if let Some(compression_info) = bulletproof_reader.compression_info() {
        println!(
            "📦 Compression: {} ({} byte chunks)",
            compression_info.algorithm, compression_info.chunk_length
        );
    }
    // Strategy 1: full parse with the bulletproof reader.
    match bulletproof_reader.parse_sstable_data() {
        Ok(bulletproof_entries) => {
            println!(
                "✅ Successfully parsed {} entries with bulletproof reader",
                bulletproof_entries.len()
            );
            // `processed` counts every visited entry, including skipped ones.
            // `displayed` counts only rows that parsed successfully.
            let mut processed = 0;
            let mut displayed = 0;
            let skip_count = skip.unwrap_or(0);
            let limit_count = limit.unwrap_or(bulletproof_entries.len());
            let mut parsed_rows = Vec::new();
            let parser = RealDataParser::new(schema.clone());
            for entry in bulletproof_entries {
                if processed < skip_count {
                    processed += 1;
                    continue;
                }
                if displayed >= limit_count {
                    break;
                }
                let key = entry.key.clone();
                // Wrap the debug-formatted key and format info into a Text value
                // so the entry can go through the same `parse_entry` API as the
                // legacy path.
                // NOTE(review): `RealDataParser::parse_entry` currently ignores its
                // inputs and returns an empty row.
                let value =
                    cqlite_core::Value::Text(format!("{:?}|{}", entry.key, entry.format_info));
                match parser.parse_entry(&key, &value) {
                    Ok(parsed_row) => {
                        parsed_rows.push(parsed_row);
                        displayed += 1;
                    }
                    Err(e) => {
                        eprintln!("⚠️ Failed to parse row {}: {}", processed + 1, e);
                        println!(
                            "📄 Raw bulletproof data: key='{:?}', info='{}'",
                            entry.key, entry.format_info
                        );
                    }
                }
                processed += 1;
            }
            match format {
                OutputFormat::Table => {
                    display_table_format(&parser.get_column_names(), &parsed_rows)
                }
                OutputFormat::Json => display_json_format(&parsed_rows)?,
                OutputFormat::Csv => display_csv_format(&parser.get_column_names(), &parsed_rows)?,
                OutputFormat::Parquet => {
                    return Err(anyhow::anyhow!("Parquet format is not supported for this command. Use --out json or --out csv instead."));
                }
            }
            println!(
                "\n✅ Bulletproof reader processed {processed} entries, displayed {displayed} rows"
            );
            return Ok(());
        }
        Err(e) => {
            // Strategy 2: dump the first 1024 bytes as hex + printable ASCII.
            println!("⚠️ Bulletproof parser still in development: {e}");
            println!("🔄 Falling back to raw data display...");
            match bulletproof_reader.read_raw_data(0, 1024) {
                Ok(data) => {
                    println!("\n📊 Raw SSTable data (first 1024 bytes):");
                    // 16 bytes per line: offset, hex bytes, then ASCII
                    // (non-printable bytes shown as '.').
                    for (i, chunk) in data.chunks(16).enumerate() {
                        print!(" {:04x}: ", i * 16);
                        for byte in chunk {
                            print!("{byte:02x} ");
                        }
                        print!(" ");
                        for byte in chunk {
                            let c = if byte.is_ascii_graphic() || *byte == b' ' {
                                *byte as char
                            } else {
                                '.'
                            };
                            print!("{c}");
                        }
                        println!();
                    }
                    println!(
                        "\n🎯 This shows the bulletproof reader successfully decompressed the data!"
                    );
                    println!(
                        "💡 The parsing layer is still being implemented for your specific format."
                    );
                    return Ok(());
                }
                Err(e) => {
                    // Not fatal: fall through to the legacy reader below.
                    println!("❌ Bulletproof reader failed to read raw data: {e}");
                }
            }
        }
    }
    // Strategy 3: legacy SSTable reader with default configuration.
    println!("🔄 Falling back to legacy SSTable reader...");
    let config = cqlite_core::Config::default();
    let platform = Arc::new(cqlite_core::platform::Platform::new(&config).await?);
    let reader = SSTableReader::open(&actual_sstable_path, &config, platform)
        .await
        .with_context(|| format!("Failed to open SSTable: {}", actual_sstable_path.display()))?;
    let parser = RealDataParser::new(schema.clone());
    let entries = reader.get_all_entries().await?;
    // Same skip/limit bookkeeping as the bulletproof path above.
    let mut processed = 0;
    let mut displayed = 0;
    let skip_count = skip.unwrap_or(0);
    let limit_count = limit.unwrap_or(entries.len());
    println!("📊 Found {} entries in SSTable", entries.len());
    let mut parsed_rows = Vec::new();
    for (_table_id, key, value) in entries {
        if processed < skip_count {
            processed += 1;
            continue;
        }
        if displayed >= limit_count {
            break;
        }
        match parser.parse_entry(&key, &value) {
            Ok(parsed_row) => {
                parsed_rows.push(parsed_row);
                displayed += 1;
            }
            Err(e) => {
                eprintln!("⚠️ Failed to parse row {}: {}", processed + 1, e);
            }
        }
        processed += 1;
    }
    match format {
        OutputFormat::Table => display_table_format(&parser.get_column_names(), &parsed_rows),
        OutputFormat::Json => display_json_format(&parsed_rows)?,
        OutputFormat::Csv => display_csv_format(&parser.get_column_names(), &parsed_rows)?,
        OutputFormat::Parquet => {
            return Err(anyhow::anyhow!("Parquet format is not supported for this command. Use --out json or --out csv instead."));
        }
    }
    println!("\n✅ Processed {processed} entries, displayed {displayed} rows");
    println!("🎯 Data source: LIVE SSTable file (no mocking!)");
    Ok(())
}
/// Execute a CQL `SELECT` against an SSTable and render the result in `format`.
///
/// The schema file is loaded first, so a missing or malformed schema is reported
/// before any query work starts. The parsed schema is not handed to the query executor.
///
/// # Errors
/// Fails if:
/// * the schema cannot be loaded;
/// * the SSTable path cannot be resolved;
/// * the query or rendering fails;
/// * `format` is `Parquet`, which this command does not support.
pub async fn execute_select_query(
    sstable_path: &Path,
    schema_path: &Path,
    query: &str,
    format: OutputFormat,
    auto_detect: bool,
    cassandra_version: Option<String>,
) -> Result<()> {
    let version_hint = cassandra_version.as_deref();
    let _schema = load_schema_file(schema_path, auto_detect, version_hint)?;

    println!("🚀 Executing CQL query against LIVE SSTable data!");
    println!("📂 SSTable: {}", sstable_path.display());
    println!("📋 Schema: {}", schema_path.display());
    println!("🔍 Query: {query}");

    let data_file = resolve_sstable_path(sstable_path)?;
    println!("📄 Data file: {}", data_file.display());

    let outcome = QueryExecutor::new(QueryExecutorConfig)
        .execute_select(query)
        .await?;

    match format {
        OutputFormat::Parquet => {
            return Err(anyhow::anyhow!("Parquet format is not supported for this command. Use --out json or --out csv instead."));
        }
        OutputFormat::Table => outcome.display_table(),
        OutputFormat::Json => outcome.display_json()?,
        OutputFormat::Csv => outcome.display_csv()?,
    }
    Ok(())
}
fn resolve_sstable_path(sstable_path: &Path) -> Result<PathBuf> {
if sstable_path.is_file() {
return Ok(sstable_path.to_path_buf());
}
if sstable_path.is_dir() {
println!("📁 Directory detected, looking for SSTable files...");
let patterns = ["*-Data.db", "*-big-Data.db", "nb-*-big-Data.db"];
for pattern in &patterns {
if let Ok(entries) = std::fs::read_dir(sstable_path) {
for entry in entries.flatten() {
let file_name = entry.file_name();
let file_name_str = file_name.to_string_lossy();
if pattern.contains("*") {
let pattern_parts: Vec<&str> = pattern.split('*').collect();
if pattern_parts.len() == 2 {
let starts_with = pattern_parts[0];
let ends_with = pattern_parts[1];
if file_name_str.starts_with(starts_with)
&& file_name_str.ends_with(ends_with)
{
let data_file = entry.path();
println!("✓ Found SSTable data file: {}", data_file.display());
return Ok(data_file);
}
} else if pattern_parts.len() == 3 {
let starts_with = pattern_parts[0];
let middle = pattern_parts[1];
let ends_with = pattern_parts[2];
if file_name_str.starts_with(starts_with)
&& file_name_str.contains(middle)
&& file_name_str.ends_with(ends_with)
{
let data_file = entry.path();
println!("✓ Found SSTable data file: {}", data_file.display());
return Ok(data_file);
}
}
}
}
}
}
return Err(anyhow::anyhow!(
"No SSTable data files found in directory: {}\nLooked for: {}",
sstable_path.display(),
patterns.join(", ")
));
}
Err(anyhow::anyhow!(
"Path is neither a file nor a directory: {}",
sstable_path.display()
))
}
/// Read and parse a table schema file, choosing the parser by file extension.
///
/// * `.json` files go through [`parse_json_schema`].
/// * `.cql`, `.sql` and extension-less files go through `parse_cql_schema`.
///
/// The extension match is case-insensitive. `_auto_detect` and
/// `_cassandra_version` are accepted for interface compatibility but are unused.
///
/// # Errors
/// Fails if the file cannot be read, the extension is unsupported, or parsing fails.
fn load_schema_file(
    schema_path: &Path,
    _auto_detect: bool,
    _cassandra_version: Option<&str>,
) -> Result<TableSchema> {
    let schema_content = std::fs::read_to_string(schema_path)
        .with_context(|| format!("Failed to read schema file: {}", schema_path.display()))?;
    println!("📋 Loading schema from: {}", schema_path.display());

    // A missing (or non-UTF-8) extension falls back to "", i.e. the CQL parser.
    let extension = schema_path
        .extension()
        .and_then(|s| s.to_str())
        .unwrap_or("");

    match extension.to_lowercase().as_str() {
        "json" => {
            println!("📝 Parsing JSON schema format");
            let json_schema: serde_json::Value =
                serde_json::from_str(&schema_content).context("Failed to parse JSON schema")?;
            parse_json_schema(&json_schema)
        }
        "cql" | "sql" | "" => {
            println!("📝 Parsing CQL schema format");
            parse_cql_schema(&schema_content).context("Failed to parse CQL schema")
        }
        // List every form accepted above, so the hint matches actual behaviour.
        _ => Err(anyhow::anyhow!(
            "Unsupported schema file extension: .{}\nSupported formats: .json, .cql, .sql (or no extension)",
            extension
        )),
    }
}
/// Build a [`TableSchema`] from a JSON document.
///
/// The expected shape is
/// `{"keyspace": str, "table": str, "columns": {name: {"type": str, "kind": str}}}`,
/// where `kind` is one of `PartitionKey`, `ClusteringColumn` or `Regular`.
///
/// Every column, including key columns, is added to `columns`. Key columns are
/// also recorded in `partition_keys` / `clustering_keys`, in the order the JSON
/// map yields them. Clustering order is always ascending.
///
/// NOTE(review): serde_json's default `Map` iterates keys in sorted order unless
/// the `preserve_order` feature is enabled. Key positions may therefore be
/// alphabetical rather than declaration order; confirm the crate's feature set.
///
/// # Errors
/// Returns an error if any of these is missing or mistyped: `keyspace`, `table`,
/// `columns`, a column definition, its `type`, or its `kind`. An unrecognised
/// `kind` is also an error.
fn parse_json_schema(json: &serde_json::Value) -> Result<TableSchema> {
    let keyspace = json["keyspace"]
        .as_str()
        .ok_or_else(|| anyhow::anyhow!("Missing keyspace in schema"))?;
    let table = json["table"]
        .as_str()
        .ok_or_else(|| anyhow::anyhow!("Missing table in schema"))?;
    let columns = json["columns"]
        .as_object()
        .ok_or_else(|| anyhow::anyhow!("Missing columns in schema"))?;

    let mut schema_columns = Vec::with_capacity(columns.len());
    let mut partition_keys = Vec::new();
    let mut clustering_columns = Vec::new();

    for (col_name, col_info) in columns {
        let col_obj = col_info
            .as_object()
            .ok_or_else(|| anyhow::anyhow!("Invalid column definition for {}", col_name))?;
        // `serde_json::Map`'s `Index` impl panics on a missing key. Use `get` so
        // that absence becomes the descriptive error below instead of a panic.
        let col_type = col_obj
            .get("type")
            .and_then(|v| v.as_str())
            .ok_or_else(|| anyhow::anyhow!("Missing type for column {}", col_name))?;
        let col_kind = col_obj
            .get("kind")
            .and_then(|v| v.as_str())
            .ok_or_else(|| anyhow::anyhow!("Missing kind for column {}", col_name))?;

        match col_kind {
            "PartitionKey" => partition_keys.push(KeyColumn {
                name: col_name.clone(),
                position: partition_keys.len(),
                data_type: col_type.to_string(),
            }),
            "ClusteringColumn" => clustering_columns.push(ClusteringColumn {
                name: col_name.clone(),
                position: clustering_columns.len(),
                data_type: col_type.to_string(),
                order: ClusteringOrder::Asc,
            }),
            "Regular" => {}
            _ => return Err(anyhow::anyhow!("Unknown column kind: {}", col_kind)),
        }

        schema_columns.push(Column {
            name: col_name.clone(),
            data_type: col_type.to_string(),
            nullable: true,
            default: None,
            is_static: false,
        });
    }

    Ok(TableSchema {
        keyspace: keyspace.to_string(),
        table: table.to_string(),
        columns: schema_columns,
        partition_keys,
        clustering_keys: clustering_columns,
        comments: HashMap::new(),
    })
}
/// Pretty-print `rows` to stdout as a table with one column per entry in `column_names`.
///
/// A column missing from a row is shown as `NULL`. With no rows, only a
/// "No results found" notice is printed.
fn display_table_format(column_names: &[String], rows: &[ParsedRow]) {
    use prettytable::{Cell, Row, Table};

    if rows.is_empty() {
        println!("📭 No results found");
        return;
    }

    let header_cells: Vec<Cell> = column_names.iter().map(|name| Cell::new(name)).collect();
    let mut grid = Table::new();
    grid.add_row(Row::new(header_cells));

    for parsed_row in rows {
        let body_cells: Vec<Cell> = column_names
            .iter()
            .map(|name| {
                let text = parsed_row
                    .get(name)
                    .map_or_else(|| "NULL".to_string(), |v| v.to_string());
                Cell::new(&text)
            })
            .collect();
        grid.add_row(Row::new(body_cells));
    }

    println!("\n📊 Live SSTable Data Results:");
    println!("{}", "=".repeat(50));
    grid.printstd();
}
fn display_json_format(rows: &[ParsedRow]) -> Result<()> {
let json_rows: Vec<serde_json::Value> = rows.iter().map(|row| row.to_json()).collect();
println!("{}", serde_json::to_string_pretty(&json_rows)?);
Ok(())
}
/// Write `rows` to stdout as CSV.
///
/// A header row of `column_names` is always written. A column missing from a
/// row is written as `NULL`.
///
/// # Errors
/// Propagates CSV write and flush failures.
fn display_csv_format(column_names: &[String], rows: &[ParsedRow]) -> Result<()> {
    let mut writer = csv::Writer::from_writer(std::io::stdout());
    writer.write_record(column_names)?;

    for parsed_row in rows {
        let record: Vec<String> = column_names
            .iter()
            .map(|column| match parsed_row.get(column) {
                Some(v) => v.to_string(),
                None => "NULL".to_string(),
            })
            .collect();
        writer.write_record(&record)?;
    }

    writer.flush()?;
    Ok(())
}
/// Export every entry of an SSTable to `output_path` in the requested `format`.
///
/// The output file is created before dispatching, so an unwritable destination
/// fails before any rows are processed. For Parquet, that handle is dropped and
/// the exporter re-creates the file itself.
///
/// # Errors
/// Fails if:
/// * the schema cannot be loaded;
/// * the platform or SSTable cannot be opened;
/// * the output file cannot be created;
/// * the format-specific exporter fails.
#[cfg(feature = "state_machine")]
pub async fn export_sstable(
    sstable_path: &Path,
    schema_path: &Path,
    output_path: &Path,
    format: ExportFormat,
) -> Result<()> {
    let schema = load_schema_file(schema_path, false, None)?;
    let config = cqlite_core::Config::default();
    let platform = Arc::new(cqlite_core::platform::Platform::new(&config).await?);
    let reader = SSTableReader::open(sstable_path, &config, platform)
        .await
        .with_context(|| format!("Failed to open SSTable: {}", sstable_path.display()))?;
    let mut output_file = File::create(output_path)
        .with_context(|| format!("Failed to create output file: {}", output_path.display()))?;

    println!("Exporting SSTable: {}", sstable_path.display());
    println!("Output: {} ({})", output_path.display(), format);

    let pb = ProgressBar::new_spinner();
    pb.set_style(
        ProgressStyle::default_spinner()
            // `{msg}` must be in the template, or the summaries each exporter
            // passes to `finish_with_message` are never rendered.
            .template("{spinner:.green} [{elapsed_precise}] {pos} rows exported {msg}")
            .expect("static progress-bar template is valid"),
    );

    match format {
        ExportFormat::Json => export_as_json(&reader, &schema, &mut output_file, &pb).await,
        ExportFormat::Csv => export_as_csv(&reader, &schema, &mut output_file, &pb).await,
        ExportFormat::Parquet => {
            drop(output_file);
            export_as_parquet(&reader, &schema, output_path, &pb).await
        }
        ExportFormat::Cql => export_as_cql(&reader, &schema, &mut output_file, &pb).await,
    }
}
/// Export all parseable SSTable entries to `output_file` as a pretty-printed JSON array.
///
/// Entries that fail to parse are reported on stderr and skipped. The progress
/// bar position tracks rows actually exported.
///
/// # Errors
/// Propagates reader, serialization and I/O failures.
#[cfg(feature = "state_machine")]
async fn export_as_json(
    reader: &SSTableReader,
    schema: &TableSchema,
    output_file: &mut File,
    pb: &ProgressBar,
) -> Result<()> {
    use std::io::{BufWriter, Write};

    let parser = RealDataParser::new(schema.clone());
    let entries = reader.get_all_entries().await?;
    let mut json_objects = Vec::with_capacity(entries.len());

    for (index, (_table_id, key, value)) in entries.iter().enumerate() {
        match parser.parse_entry(key, value) {
            Ok(parsed_row) => {
                json_objects.push(parsed_row.to_json());
                // The template reads "{pos} rows exported", so count successes only.
                pb.set_position(json_objects.len() as u64);
            }
            Err(e) => eprintln!("⚠️ Failed to parse row {}: {}", index + 1, e),
        }
    }

    // Stream straight into a buffered file rather than building one large String.
    let mut out = BufWriter::new(output_file);
    serde_json::to_writer_pretty(&mut out, &json_objects)?;
    // Flush explicitly: BufWriter's Drop would silently discard a write error.
    out.flush()?;

    pb.finish_with_message(format!("Exported {} rows to JSON", json_objects.len()));
    Ok(())
}
/// Export all parseable SSTable entries to `output_file` as CSV.
///
/// The header row lists the schema's columns. A column missing from a row is
/// written as `NULL`. Entries that fail to parse are reported on stderr and
/// skipped. The progress bar position tracks rows actually exported.
///
/// # Errors
/// Propagates reader and CSV write/flush failures.
#[cfg(feature = "state_machine")]
async fn export_as_csv(
    reader: &SSTableReader,
    schema: &TableSchema,
    output_file: &mut File,
    pb: &ProgressBar,
) -> Result<()> {
    let parser = RealDataParser::new(schema.clone());
    let entries = reader.get_all_entries().await?;
    // csv::Writer buffers internally, so wrapping the file directly is fine.
    let mut wtr = csv::Writer::from_writer(output_file);
    let column_names = parser.get_column_names();
    wtr.write_record(&column_names)?;

    let mut exported_count: u64 = 0;
    for (index, (_table_id, key, value)) in entries.iter().enumerate() {
        match parser.parse_entry(key, value) {
            Ok(parsed_row) => {
                let record: Vec<String> = column_names
                    .iter()
                    .map(|column| {
                        parsed_row
                            .get(column)
                            .map(|v| v.to_string())
                            .unwrap_or_else(|| "NULL".to_string())
                    })
                    .collect();
                wtr.write_record(&record)?;
                exported_count += 1;
                // The template reads "{pos} rows exported", so count successes only.
                pb.set_position(exported_count);
            }
            Err(e) => eprintln!("⚠️ Failed to parse row {}: {}", index + 1, e),
        }
    }

    wtr.flush()?;
    pb.finish_with_message(format!("Exported {exported_count} rows to CSV"));
    Ok(())
}
/// Export all parseable SSTable entries to `output_file` as CQL `INSERT` statements.
///
/// The output starts with a comment header naming the table and generation
/// time, followed by one `INSERT` per parsed row. Each cell is rendered with
/// [`format_cql_export_literal`] using the column's declared type. A column
/// missing from a row is written as `NULL`. Entries that fail to parse are
/// reported on stderr and skipped.
///
/// # Errors
/// Propagates reader and I/O failures.
#[cfg(feature = "state_machine")]
async fn export_as_cql(
    reader: &SSTableReader,
    schema: &TableSchema,
    output_file: &mut File,
    pb: &ProgressBar,
) -> Result<()> {
    use std::io::{BufWriter, Write};

    let parser = RealDataParser::new(schema.clone());
    let entries = reader.get_all_entries().await?;
    // `get_column_names` lists `schema.columns` in order, so the values built
    // from `schema.columns` below line up with this column list.
    let column_list = parser.get_column_names().join(", ");

    // One write per INSERT line would be a syscall per row without buffering.
    let mut out = BufWriter::new(output_file);
    writeln!(out, "-- CQL Export from CQLite")?;
    writeln!(out, "-- Table: {}.{}", schema.keyspace, schema.table)?;
    writeln!(out, "-- Generated: {}", chrono::Utc::now().to_rfc3339())?;
    writeln!(out)?;

    let mut exported_count: u64 = 0;
    for (index, (_table_id, key, value)) in entries.iter().enumerate() {
        match parser.parse_entry(key, value) {
            Ok(parsed_row) => {
                let values: Vec<String> = schema
                    .columns
                    .iter()
                    .map(|col| {
                        parsed_row
                            .get(&col.name)
                            .map(|v| format_cql_export_literal(v, &col.data_type))
                            .unwrap_or_else(|| "NULL".to_string())
                    })
                    .collect();
                writeln!(
                    out,
                    "INSERT INTO {}.{} ({}) VALUES ({});",
                    schema.keyspace,
                    schema.table,
                    column_list,
                    values.join(", ")
                )?;
                exported_count += 1;
                pb.set_position(exported_count);
            }
            Err(e) => eprintln!("⚠️ Failed to parse row {}: {}", index + 1, e),
        }
    }

    // Flush explicitly: BufWriter's Drop would silently discard a write error.
    out.flush()?;
    pb.finish_with_message(format!("Exported {exported_count} rows to CQL"));
    Ok(())
}

/// Render a textual cell value as a CQL literal for a column of type `data_type`.
///
/// Columns of these types are emitted bare, but only when the value is made of
/// ASCII alphanumerics and `-`, `+`, `.`:
/// * numeric types: `int`, `integer`, `bigint`, `smallint`, `tinyint`, `varint`,
///   `counter`, `float`, `double`, `decimal`;
/// * `boolean`, `uuid`, `timeuuid`;
/// * `blob`, when the value is in `0x…` hex form.
///
/// Any other value becomes a single-quoted string with embedded quotes doubled,
/// which is CQL's escaping rule, so arbitrary text cannot break out of the literal.
#[cfg(feature = "state_machine")]
fn format_cql_export_literal(value: &str, data_type: &str) -> String {
    let ty = data_type.trim().to_ascii_lowercase();
    let bare_allowed = match ty.as_str() {
        "int" | "integer" | "bigint" | "smallint" | "tinyint" | "varint" | "counter"
        | "float" | "double" | "decimal" | "boolean" | "uuid" | "timeuuid" => true,
        "blob" => value.starts_with("0x"),
        _ => false,
    };
    let literal_safe = !value.is_empty()
        && value
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || matches!(c, '-' | '+' | '.'));
    if bare_allowed && literal_safe {
        value.to_string()
    } else {
        format!("'{}'", value.replace('\'', "''"))
    }
}
/// Export every SSTable entry to a Parquet file at `output_path`.
///
/// Entries are converted with `convert_entry_to_query_row` and passed to the
/// streaming writer in batches of `CHUNK_ROWS`. An SSTable with no entries
/// still produces a valid, empty Parquet file carrying the schema's columns.
///
/// # Errors
/// Propagates reader failures and output-file creation failures. Parquet
/// writer creation, chunk write and finalize failures are also returned.
#[cfg(feature = "state_machine")]
async fn export_as_parquet(
    reader: &SSTableReader,
    schema: &TableSchema,
    output_path: &Path,
    pb: &ProgressBar,
) -> Result<()> {
    use crate::output::parquet::create_streaming_parquet_writer;
    use crate::output::StreamingWriter;

    // Rows converted and handed to the writer per `write_chunk` call.
    const CHUNK_ROWS: usize = 1000;

    let entries = reader.get_all_entries().await?;
    let metadata = build_query_metadata_from_schema(schema);
    let output_file = File::create(output_path)
        .with_context(|| format!("Failed to create output file: {}", output_path.display()))?;
    // 10_000 is the writer's own batching parameter, kept from the original code.
    let mut writer = create_streaming_parquet_writer(output_file, &metadata, 10_000)
        .map_err(|e| anyhow::anyhow!("Failed to create Parquet writer: {}", e))?;

    let mut exported_count: usize = 0;
    for batch in entries.chunks(CHUNK_ROWS) {
        let rows: Vec<_> = batch
            .iter()
            .map(|(_table_id, row_key, value)| convert_entry_to_query_row(row_key, value, schema))
            .collect();
        writer
            .write_chunk(&rows)
            .map_err(|e| anyhow::anyhow!("Failed to write Parquet chunk: {}", e))?;
        exported_count += rows.len();
        pb.set_position(exported_count as u64);
    }

    writer
        .finalize()
        .map_err(|e| anyhow::anyhow!("Failed to finalize Parquet: {}", e))?;

    if exported_count == 0 {
        pb.finish_with_message("No data to export");
    } else {
        pb.finish_with_message(format!("Exported {} rows to Parquet", exported_count));
    }
    Ok(())
}
/// Build query/Parquet column metadata for `schema`.
///
/// Columns are listed in this order: partition keys, clustering keys, then the
/// remaining columns. Positions are assigned sequentially.
///
/// Key columns also appear in `schema.columns` (`parse_json_schema` pushes
/// every column there). A name that was already emitted is therefore skipped,
/// so each column appears exactly once. All columns are marked nullable and
/// tagged with the qualified `keyspace.table` name.
#[cfg(feature = "state_machine")]
fn build_query_metadata_from_schema(schema: &TableSchema) -> cqlite_core::query::QueryMetadata {
    use cqlite_core::query::{ColumnInfo, QueryMetadata};
    use std::collections::HashSet;

    let table_name = format!("{}.{}", schema.keyspace, schema.table);
    let ordered = schema
        .partition_keys
        .iter()
        .map(|pk| (&pk.name, &pk.data_type))
        .chain(schema.clustering_keys.iter().map(|ck| (&ck.name, &ck.data_type)))
        .chain(schema.columns.iter().map(|col| (&col.name, &col.data_type)));

    let mut seen: HashSet<&str> = HashSet::new();
    let mut columns = Vec::new();
    let mut position = 0;
    for (name, data_type) in ordered {
        if !seen.insert(name.as_str()) {
            // Already emitted as a partition/clustering key.
            continue;
        }
        columns.push(ColumnInfo {
            name: name.clone(),
            data_type: parse_cql_type_string(data_type),
            nullable: true,
            position,
            table_name: Some(table_name.clone()),
            cql_type: None,
        });
        position += 1;
    }

    QueryMetadata {
        columns,
        ..Default::default()
    }
}
/// Map a CQL type name to the closest `cqlite_core` [`DataType`].
///
/// Matching is case-insensitive. Exact scalar names are checked first. After
/// that, parameterised types are recognised by prefix (`list`, `set`, `map`,
/// `frozen`, `tuple`), so `list<int>` maps to `List`. Anything unrecognised
/// maps to `Text`.
///
/// Some lossy mappings are deliberate:
/// * `date` → `Timestamp`;
/// * `time` and `counter` → `BigInt`;
/// * `varint` → `Blob`;
/// * `decimal` → `Text`.
#[cfg(feature = "state_machine")]
fn parse_cql_type_string(type_str: &str) -> cqlite_core::types::DataType {
    use cqlite_core::types::DataType;

    let lowered = type_str.to_lowercase();
    match lowered.as_str() {
        // Character data, plus `decimal`, which has no exact native counterpart.
        "text" | "varchar" | "ascii" | "decimal" => DataType::Text,
        "int" | "integer" => DataType::Integer,
        // 64-bit integers, plus `time` and `counter`, which share that encoding width.
        "bigint" | "time" | "counter" => DataType::BigInt,
        "smallint" => DataType::SmallInt,
        "tinyint" => DataType::TinyInt,
        "float" => DataType::Float32,
        "double" => DataType::Float,
        "boolean" => DataType::Boolean,
        "timestamp" | "date" => DataType::Timestamp,
        "uuid" | "timeuuid" => DataType::Uuid,
        // Opaque bytes, plus arbitrary-precision `varint`.
        "blob" | "varint" => DataType::Blob,
        other if other.starts_with("list") => DataType::List,
        other if other.starts_with("set") => DataType::Set,
        other if other.starts_with("map") => DataType::Map,
        other if other.starts_with("frozen") => DataType::Frozen,
        other if other.starts_with("tuple") => DataType::Tuple,
        _ => DataType::Text,
    }
}
/// Convert one raw SSTable entry into a [`QueryRow`] covering every schema column.
///
/// * A `Value::Map` payload is unpacked: each `Text` key becomes a column name.
///   Entries with non-text keys are ignored.
/// * Any other payload is assigned wholesale to the first schema column, if the
///   schema has one.
///
/// Every partition key, clustering key and regular column that received no
/// value is then filled with `Value::Null`.
#[cfg(feature = "state_machine")]
fn convert_entry_to_query_row(
    row_key: &cqlite_core::RowKey,
    value: &cqlite_core::Value,
    schema: &TableSchema,
) -> cqlite_core::query::QueryRow {
    use cqlite_core::query::{QueryRow, RowMetadata};
    use cqlite_core::Value;
    use std::collections::HashMap;

    let mut cells: HashMap<String, Value> = HashMap::new();

    if let Value::Map(pairs) = value {
        for (column_key, cell) in pairs {
            if let Value::Text(column_name) = column_key {
                cells.insert(column_name.clone(), cell.clone());
            }
        }
    } else if let Some(first_column) = schema.columns.first() {
        // Blob, text and every other scalar payload are all treated the same way.
        cells.insert(first_column.name.clone(), value.clone());
    }

    let schema_names = schema
        .partition_keys
        .iter()
        .map(|pk| &pk.name)
        .chain(schema.clustering_keys.iter().map(|ck| &ck.name))
        .chain(schema.columns.iter().map(|col| &col.name));
    for column_name in schema_names {
        cells.entry(column_name.clone()).or_insert(Value::Null);
    }

    QueryRow {
        values: cells,
        key: row_key.clone(),
        metadata: RowMetadata::default(),
    }
}
/// Front-end for [`read_sstable`] with extra option banners.
///
/// 1. Prints the enabled options.
/// 2. Delegates the actual read to `read_sstable`.
/// 3. Prints placeholder notices for the unimplemented interactive and export features.
///
/// `progress` only adds an informational banner line. The notices are printed
/// whether or not the read succeeded. The read's result is returned unchanged.
pub async fn read_sstable_enhanced(
    sstable_path: &Path,
    schema_path: &Path,
    limit: Option<usize>,
    skip: Option<usize>,
    generation: Option<u32>,
    format: OutputFormat,
    auto_detect: bool,
    cassandra_version: Option<String>,
    interactive: bool,
    progress: bool,
    export: Option<PathBuf>,
) -> Result<()> {
    let export_target = export.as_deref();

    println!("🚀 Enhanced SSTable Reader");
    println!("📂 SSTable: {}", sstable_path.display());
    println!("📋 Schema: {}", schema_path.display());
    if interactive {
        println!("🔍 Interactive mode enabled - use Ctrl+C to exit");
    }
    if progress {
        println!("📊 Progress tracking enabled");
    }
    if let Some(target) = export_target {
        println!("📤 Export enabled to: {}", target.display());
    }

    let outcome = read_sstable(
        sstable_path,
        schema_path,
        limit,
        skip,
        generation,
        format,
        auto_detect,
        cassandra_version,
    )
    .await;

    if interactive {
        let notices = [
            "\n🔍 Interactive mode features coming soon!",
            " - Filter data interactively",
            " - Navigate through pages",
            " - Query-like interface",
        ];
        for notice in notices.iter() {
            println!("{notice}");
        }
    }
    if let Some(target) = export_target {
        println!("\n📤 Export functionality coming soon!");
        println!(" Target: {}", target.display());
        println!(" Formats: JSON, CSV, Parquet");
    }

    outcome
}
/// Run basic, and optionally deep, sanity checks on an SSTable and print a summary.
///
/// Checks performed:
/// * the resolved data file exists and its metadata is readable (an empty file
///   counts as an issue);
/// * `BulletproofReader::open` succeeds; format, generation, size and any
///   compression settings are printed;
/// * with `deep`, `parse_sstable_data` succeeds. If `schema_path` is also given,
///   each entry is run through `RealDataParser::parse_entry`.
///
/// `fix` only changes the printed and report text. `issues_fixed` is a constant
/// 0, so no repair is attempted. A schema that fails to load is recorded in the
/// report but is not counted as an issue. When `report_path` is given, a
/// Markdown report is written there.
///
/// # Errors
/// Only `resolve_sstable_path` and the report write can fail the call.
/// Validation problems are printed and reported, but the function still returns `Ok(())`.
pub async fn validate_sstable(
sstable_path: &Path,
schema_path: Option<&Path>,
deep: bool,
fix: bool,
report_path: Option<&Path>,
) -> Result<()> {
// Echo the requested options.
println!("🔍 SSTable Validation");
println!("📂 SSTable: {}", sstable_path.display());
if let Some(schema) = schema_path {
println!("📋 Schema: {}", schema.display());
}
if deep {
println!("🔬 Deep validation enabled (thorough but slower)");
}
if fix {
println!("🔧 Auto-fix enabled for recoverable issues");
}
if let Some(report) = report_path {
println!("📋 Report will be saved to: {}", report.display());
}
let actual_sstable_path = resolve_sstable_path(sstable_path)?;
println!("📄 Data file: {}", actual_sstable_path.display());
let mut issues_found = 0;
// Never incremented: no auto-fix logic exists yet.
let issues_fixed = 0;
// Human-readable problem descriptions, copied into the report's "Details" section.
let mut validation_errors = Vec::new();
println!("\n🔍 Basic file validation:");
// `resolve_sstable_path` errors out unless the path is a file or a directory
// containing a data file. This branch is only reached if the file disappears
// after resolution.
if !actual_sstable_path.exists() {
let error = "❌ SSTable file does not exist";
println!("{error}");
validation_errors.push(error.to_string());
issues_found += 1;
} else {
println!("✅ SSTable file exists");
match std::fs::metadata(&actual_sstable_path) {
Ok(metadata) => {
println!("✅ File readable (size: {} bytes)", metadata.len());
if metadata.len() == 0 {
let error = "⚠️ Warning: SSTable file is empty";
println!("{error}");
validation_errors.push(error.to_string());
issues_found += 1;
}
}
Err(e) => {
let error = format!("❌ Cannot read file metadata: {e}");
println!("{error}");
validation_errors.push(error);
issues_found += 1;
}
}
}
println!("\n🔍 Format validation:");
match BulletproofReader::open(&actual_sstable_path) {
Ok(mut reader) => {
println!("✅ SSTable format is readable");
let info = reader.info();
println!(" Format: {:?}", info.format);
// A generation that has no numeric form is shown as 0.
println!(" Generation: {}", info.generation_numeric().unwrap_or(0));
println!(" Size: {} bytes", info.size);
if let Some(compression) = reader.compression_info() {
println!(
" Compression: {} (chunk size: {})",
compression.algorithm, compression.chunk_length
);
}
if deep {
println!("\n🔬 Deep validation:");
match reader.parse_sstable_data() {
Ok(entries) => {
println!("✅ Successfully parsed {} entries", entries.len());
if let Some(schema_path) = schema_path {
match load_schema_file(schema_path, true, None) {
Ok(schema) => {
println!("✅ Schema loaded successfully");
let parser = RealDataParser::new(schema);
let mut parsing_errors = 0;
for entry in entries.iter() {
let key = entry.key.clone();
// NOTE(review): the value is a placeholder (the key's Debug text),
// not the row's cell data, so this checks little beyond the key.
// Confirm intent.
let value =
cqlite_core::Value::Text(format!("{:?}", entry.key));
if parser.parse_entry(&key, &value).is_err() {
parsing_errors += 1;
}
}
if parsing_errors > 0 {
let error = format!(
"⚠️ {parsing_errors} entries failed schema validation"
);
println!("{error}");
validation_errors.push(error);
issues_found += parsing_errors;
} else {
println!("✅ All entries match schema");
}
}
Err(e) => {
// Recorded for the report but deliberately not counted in issues_found.
let error =
format!("⚠️ Could not load schema for validation: {e}");
println!("{error}");
validation_errors.push(error);
}
}
}
}
Err(e) => {
let error = format!("❌ Failed to parse SSTable data: {e}");
println!("{error}");
validation_errors.push(error);
issues_found += 1;
}
}
}
}
Err(e) => {
let error = format!("❌ Cannot open SSTable with bulletproof reader: {e}");
println!("{error}");
validation_errors.push(error);
issues_found += 1;
}
}
// Optional Markdown report: a header with the run settings, then one bullet per recorded problem.
if let Some(report_path) = report_path {
let mut report_content = format!(
"# SSTable Validation Report\n\n\
**File:** {}\n\
**Validation Time:** {}\n\
**Deep Validation:** {}\n\
**Auto-fix Enabled:** {}\n\n\
## Summary\n\
- Issues Found: {}\n\
- Issues Fixed: {}\n\n\
## Details\n",
sstable_path.display(),
chrono::Utc::now().to_rfc3339(),
deep,
fix,
issues_found,
issues_fixed
);
for error in &validation_errors {
report_content.push_str(&format!("- {error}\n"));
}
std::fs::write(report_path, report_content)
.with_context(|| format!("Failed to write report to {}", report_path.display()))?;
println!("\n📋 Validation report saved to: {}", report_path.display());
}
println!("\n📊 Validation Summary:");
println!(" Issues found: {issues_found}");
println!(" Issues fixed: {issues_fixed}");
// issues_fixed is always 0, so the subtraction in the last branch cannot underflow.
if issues_found == 0 {
println!("✅ SSTable validation passed!");
} else if fix && issues_fixed == issues_found {
println!("🔧 All issues fixed!");
} else {
println!("⚠️ {} issues remain", issues_found - issues_fixed);
}
Ok(())
}
pub async fn analyze_sstable(
sstable_path: &Path,
schema_path: Option<&Path>,
detailed: bool,
infer_schema: bool,
report_path: Option<&Path>,
) -> Result<()> {
println!("📊 SSTable Analysis");
println!("📂 SSTable: {}", sstable_path.display());
if let Some(schema) = schema_path {
println!("📋 Schema: {}", schema.display());
}
if detailed {
println!("🔍 Detailed analysis enabled");
}
if infer_schema {
println!("🧠 Schema inference enabled");
}
if let Some(report) = report_path {
println!("📋 Report will be saved to: {}", report.display());
}
let actual_sstable_path = resolve_sstable_path(sstable_path)?;
println!("📄 Data file: {}", actual_sstable_path.display());
let mut analysis_results = Vec::new();
println!("\n📁 File Analysis:");
match std::fs::metadata(&actual_sstable_path) {
Ok(metadata) => {
let file_size = metadata.len();
println!(
" File size: {} bytes ({:.2} MB)",
file_size,
file_size as f64 / 1_048_576.0
);
analysis_results.push(format!("File size: {file_size} bytes"));
if let Ok(created) = metadata.created() {
println!(" Created: {created:?}");
}
if let Ok(modified) = metadata.modified() {
println!(" Modified: {modified:?}");
}
}
Err(e) => {
println!("❌ Cannot read file metadata: {e}");
return Err(anyhow::anyhow!("File metadata not accessible"));
}
}
println!("\n🔍 Format Analysis:");
match BulletproofReader::open(&actual_sstable_path) {
Ok(mut reader) => {
let info = reader.info();
println!(" Format: {:?}", info.format);
println!(" Generation: {}", info.generation_numeric().unwrap_or(0));
println!(" Size: {} bytes", info.size);
analysis_results.push(format!("Format: {:?}", info.format));
analysis_results.push(format!(
"Generation: {}",
info.generation_numeric().unwrap_or(0)
));
if let Some(compression) = reader.compression_info() {
println!(" Compression: {}", compression.algorithm);
println!(" Chunk length: {} bytes", compression.chunk_length);
analysis_results.push(format!("Compression: {}", compression.algorithm));
} else {
println!(" Compression: None");
analysis_results.push("Compression: None".to_string());
}
println!("\n📊 Data Analysis:");
match reader.parse_sstable_data() {
Ok(entries) => {
let entry_count = entries.len();
println!(" Total entries: {entry_count}");
analysis_results.push(format!("Total entries: {entry_count}"));
if entry_count > 0 {
let total_key_size: usize =
entries.iter().map(|e| format!("{:?}", e.key).len()).sum();
let avg_key_size = total_key_size / entry_count;
println!(" Average key size: {avg_key_size} bytes");
analysis_results.push(format!("Average key size: {avg_key_size} bytes"));
println!("\n📋 Sample Entries (first 5):");
for (i, entry) in entries.iter().take(5).enumerate() {
println!(
" {}. Key: {:?}, Info: {}",
i + 1,
entry.key,
entry.format_info
);
}
}
if detailed {
println!("\n🔍 Detailed Statistics:");
let mut key_lengths = entries
.iter()
.map(|e| format!("{:?}", e.key).len())
.collect::<Vec<_>>();
key_lengths.sort_unstable();
if !key_lengths.is_empty() {
let min_key_len = key_lengths[0];
let max_key_len = key_lengths[key_lengths.len() - 1];
let median_key_len = key_lengths[key_lengths.len() / 2];
println!(
" Key length min/max/median: {min_key_len}/{max_key_len}/{median_key_len}"
);
analysis_results.push(format!(
"Key lengths - min: {min_key_len}, max: {max_key_len}, median: {median_key_len}"
));
}
println!(" 📊 Advanced statistics coming soon!");
}
if infer_schema {
println!("\n🧠 Schema Inference:");
println!(" 🚧 Schema inference coming soon!");
analysis_results
.push("Schema inference: Feature in development".to_string());
}
}
Err(e) => {
println!("❌ Failed to parse SSTable data: {e}");
analysis_results.push(format!("Parse error: {e}"));
}
}
}
Err(e) => {
println!("❌ Cannot open SSTable: {e}");
return Err(anyhow::anyhow!("Cannot analyze SSTable: {}", e));
}
}
if let Some(report_path) = report_path {
let mut report_content = format!(
"# SSTable Analysis Report\n\n\
**File:** {}\n\
**Analysis Time:** {}\n\
**Detailed Analysis:** {}\n\
**Schema Inference:** {}\n\n\
## Results\n",
sstable_path.display(),
chrono::Utc::now().to_rfc3339(),
detailed,
infer_schema
);
for result in &analysis_results {
report_content.push_str(&format!("- {result}\n"));
}
std::fs::write(report_path, report_content)
.with_context(|| format!("Failed to write report to {}", report_path.display()))?;
println!("\n📋 Analysis report saved to: {}", report_path.display());
}
println!("\n✅ Analysis completed!");
Ok(())
}
/// Time selected SSTable operations over several iterations and print statistics.
///
/// `operations` is either `"all"` (read, scan, query) or a comma-separated list
/// of those names. Unknown names are reported once and skipped.
///
/// For each operation, min/max/avg/median wall-clock times are printed in
/// milliseconds with sub-millisecond precision. Failed iterations are printed
/// and excluded. When `report_path` is set, the results are also written as a
/// Markdown report.
///
/// NOTE: memory profiling is a placeholder. No sampling is implemented, so the
/// reported average memory is always 0 bytes.
///
/// # Errors
/// Fails only if the SSTable path cannot be resolved or the report cannot be written.
pub async fn benchmark_sstable(
    sstable_path: &Path,
    schema_path: Option<&Path>,
    iterations: u32,
    operations: &str,
    report_path: Option<&Path>,
    memory_profile: bool,
) -> Result<()> {
    use std::io::Write;

    println!("🏁 SSTable Performance Benchmark");
    println!("📂 SSTable: {}", sstable_path.display());
    if let Some(schema) = schema_path {
        println!("📋 Schema: {}", schema.display());
    }
    println!("🔄 Iterations: {iterations}");
    println!("🎯 Operations: {operations}");
    if memory_profile {
        println!("📊 Memory profiling enabled");
    }
    if let Some(report) = report_path {
        println!("📋 Report will be saved to: {}", report.display());
    }

    let actual_sstable_path = resolve_sstable_path(sstable_path)?;
    println!("📄 Data file: {}", actual_sstable_path.display());
    let mut benchmark_results = Vec::new();
    let ops: Vec<&str> = if operations == "all" {
        vec!["read", "scan", "query"]
    } else {
        operations.split(',').map(|s| s.trim()).collect()
    };

    println!("\n🚀 Starting benchmarks...");
    for op in &ops {
        println!("\n📊 Benchmarking operation: {op}");
        // Validate once, up front. Otherwise the error is repeated for every iteration.
        if !matches!(*op, "read" | "scan" | "query") {
            println!("❌ Unknown operation: {op}");
            continue;
        }

        let mut times: Vec<f64> = Vec::new();
        let mut memory_usage: Vec<u64> = Vec::new();
        for i in 1..=iterations {
            print!(" Iteration {i}/{iterations}: ");
            // Make the prefix visible while the operation runs. Flush failure is cosmetic.
            let _ = std::io::stdout().flush();

            let start_time = std::time::Instant::now();
            // Placeholder reading: no memory sampling is implemented yet.
            let initial_memory = 0u64;
            let result = match *op {
                "read" => benchmark_read_operation(&actual_sstable_path).await,
                "scan" => benchmark_scan_operation(&actual_sstable_path).await,
                "query" => benchmark_query_operation(&actual_sstable_path, schema_path).await,
                _ => unreachable!("operation names are validated before iterating"),
            };
            let elapsed = start_time.elapsed();
            let final_memory = 0u64;

            match result {
                Ok(entries_processed) => {
                    println!(
                        "✅ {}ms ({} entries)",
                        elapsed.as_millis(),
                        entries_processed
                    );
                    // Keep sub-millisecond precision. The stats below print two decimals.
                    times.push(elapsed.as_secs_f64() * 1000.0);
                    if memory_profile {
                        memory_usage.push(final_memory.saturating_sub(initial_memory));
                    }
                }
                Err(e) => println!("❌ Failed: {e}"),
            }
        }

        if !times.is_empty() {
            // Durations are never NaN; the fallback only keeps the comparator total.
            times.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
            let min_time = times[0];
            let max_time = times[times.len() - 1];
            let avg_time = times.iter().sum::<f64>() / times.len() as f64;
            // Upper median when the count is even.
            let median_time = times[times.len() / 2];
            println!("\n 📊 {op} Statistics:");
            println!(" Min time: {min_time:.2}ms");
            println!(" Max time: {max_time:.2}ms");
            println!(" Avg time: {avg_time:.2}ms");
            println!(" Median time: {median_time:.2}ms");
            benchmark_results.push(format!(
                "{op}: min={min_time:.2}ms, max={max_time:.2}ms, avg={avg_time:.2}ms, median={median_time:.2}ms"
            ));
            if memory_profile && !memory_usage.is_empty() {
                let avg_memory = memory_usage.iter().sum::<u64>() / memory_usage.len() as u64;
                println!(" Avg memory: {avg_memory} bytes");
                benchmark_results.push(format!("{op}: avg_memory={avg_memory}bytes"));
            }
        }
    }

    if let Some(report_path) = report_path {
        let mut report_content = format!(
            "# SSTable Benchmark Report\n\n\
             **File:** {}\n\
             **Benchmark Time:** {}\n\
             **Iterations:** {}\n\
             **Operations:** {}\n\
             **Memory Profiling:** {}\n\n\
             ## Results\n",
            sstable_path.display(),
            chrono::Utc::now().to_rfc3339(),
            iterations,
            operations,
            memory_profile
        );
        for result in &benchmark_results {
            report_content.push_str(&format!("- {result}\n"));
        }
        std::fs::write(report_path, report_content)
            .with_context(|| format!("Failed to write report to {}", report_path.display()))?;
        println!("\n📋 Benchmark report saved to: {}", report_path.display());
    }

    println!("\n🏆 Benchmark completed!");
    Ok(())
}
/// Benchmark step for "read": open the SSTable and fetch its header info.
///
/// Always reports one unit of work on success.
///
/// # Errors
/// Fails if the SSTable cannot be opened.
async fn benchmark_read_operation(sstable_path: &Path) -> Result<usize> {
    let opened = BulletproofReader::open(sstable_path).with_context(|| "Failed to open SSTable")?;
    let _header = opened.info();
    Ok(1)
}
/// Benchmark step for "scan": parse every entry of the SSTable.
///
/// Returns the number of parsed entries. If parsing fails, it falls back to
/// reading the header info and reports 0 entries rather than an error.
///
/// # Errors
/// Fails only if the SSTable cannot be opened.
async fn benchmark_scan_operation(sstable_path: &Path) -> Result<usize> {
    let mut scanner =
        BulletproofReader::open(sstable_path).with_context(|| "Failed to open SSTable")?;
    let scanned = scanner.parse_sstable_data().ok().map(|entries| entries.len());
    match scanned {
        Some(count) => Ok(count),
        None => {
            let _info = scanner.info();
            Ok(0)
        }
    }
}
/// Benchmark step for "query": parse all entries and run each through the schema parser.
///
/// Returns the count of entries accepted by `RealDataParser::parse_entry`. The
/// schema is reloaded on every call, so its load time is part of each measurement.
///
/// Fallbacks:
/// * data parsing fails → `Ok(0)`;
/// * no schema given, or the schema fails to load → the raw entry count.
///
/// # Errors
/// Fails only if the SSTable cannot be opened.
async fn benchmark_query_operation(
    sstable_path: &Path,
    schema_path: Option<&Path>,
) -> Result<usize> {
    let mut reader =
        BulletproofReader::open(sstable_path).with_context(|| "Failed to open SSTable")?;
    let entries = match reader.parse_sstable_data() {
        Ok(entries) => entries,
        Err(_) => return Ok(0),
    };

    let schema = match schema_path.map(|path| load_schema_file(path, true, None)) {
        Some(Ok(schema)) => schema,
        // No schema, or an unloadable one: count the raw entries instead.
        _ => return Ok(entries.len()),
    };

    let parser = RealDataParser::new(schema);
    let accepted = entries
        .iter()
        .filter(|entry| {
            let key = entry.key.clone();
            // Placeholder value built from the key's Debug text, as in the validate command.
            let value = cqlite_core::Value::Text(format!("{:?}", entry.key));
            parser.parse_entry(&key, &value).is_ok()
        })
        .count();
    Ok(accepted)
}