Skip to main content

cqlite_cli/commands/
mod.rs

1#![allow(dead_code)]
2// Allow deprecated BulletproofReader usage (Issue #190 - experimental reader)
3// This will be removed once BulletproofReader is fully replaced with SSTableReader
4#![allow(deprecated)]
5
6use crate::cli::OutputFormat;
7#[cfg(feature = "state_machine")]
8use crate::cli::{ExportFormat, ImportFormat};
9#[cfg(feature = "state_machine")]
10use indicatif::{ProgressBar, ProgressStyle};
11// use crate::formatter::CqlshTableFormatter;
12// use crate::data_parser::{RealDataParser, ParsedRow};
13
14// Temporary stub types for disabled modules
/// Stand-in row type used while the real data parser module is disabled.
///
/// A row is a flat mapping from column name to the column's value rendered
/// as text.
#[derive(Clone, Debug)]
pub struct ParsedRow {
    /// Column name -> textual value for this row.
    pub data: std::collections::HashMap<String, String>,
}
19
20impl ParsedRow {
21    pub fn get(&self, key: &str) -> Option<&String> {
22        self.data.get(key)
23    }
24
25    pub fn to_json(&self) -> serde_json::Value {
26        serde_json::Value::Object(
27            self.data
28                .iter()
29                .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
30                .collect(),
31        )
32    }
33}
34
35#[derive(Debug, Clone)]
36pub struct RealDataParser {
37    pub schema: cqlite_core::schema::TableSchema,
38}
39
40impl RealDataParser {
41    pub fn new(schema: cqlite_core::schema::TableSchema) -> Self {
42        Self { schema }
43    }
44
45    pub fn parse_entry(
46        &self,
47        _key: &cqlite_core::RowKey,
48        _value: &cqlite_core::Value,
49    ) -> Result<ParsedRow> {
50        Ok(ParsedRow {
51            data: std::collections::HashMap::new(),
52        })
53    }
54
55    pub fn get_column_names(&self) -> Vec<String> {
56        self.schema.columns.iter().map(|c| c.name.clone()).collect()
57    }
58}
59
/// Render an elapsed duration as a short human-readable string for the
/// export statistics output.
///
/// * under one millisecond: `"<1ms"`
/// * under one second: whole milliseconds, e.g. `"250ms"`
/// * under one minute: whole seconds, e.g. `"42s"`
/// * under one hour: `"{m}m {s}s"`
/// * otherwise: `"{h}h {m}m {s}s"`
fn format_export_duration(duration: std::time::Duration) -> String {
    let total_secs = duration.as_secs();
    match total_secs {
        // Sub-second durations fall back to millisecond resolution.
        0 => match duration.as_millis() {
            0 => String::from("<1ms"),
            millis => format!("{}ms", millis),
        },
        1..=59 => format!("{}s", total_secs),
        60..=3599 => {
            let (minutes, seconds) = (total_secs / 60, total_secs % 60);
            format!("{}m {}s", minutes, seconds)
        }
        _ => {
            let hours = total_secs / 3600;
            let minutes = (total_secs % 3600) / 60;
            let seconds = total_secs % 60;
            format!("{}h {}m {}s", hours, minutes, seconds)
        }
    }
}
78
79// Stub for QueryExecutor
80#[derive(Debug)]
81pub struct QueryExecutor;
82
83impl QueryExecutor {
84    pub fn new(_config: QueryExecutorConfig) -> Self {
85        Self
86    }
87
88    pub async fn execute_select(&self, _query: &str) -> Result<QueryResult> {
89        Ok(QueryResult {
90            rows: Vec::new(),
91            execution_time_ms: 0.0,
92        })
93    }
94}
95
/// Placeholder configuration for the stub [`QueryExecutor`]; it has no fields.
#[derive(Default, Debug)]
pub struct QueryExecutorConfig;
98
99// Wrapper struct for query results
100#[derive(Debug, Clone)]
101pub struct QueryResult {
102    pub rows: Vec<ParsedRow>,
103    pub execution_time_ms: f64,
104}
105
106impl QueryResult {
107    pub fn display_table(&self) {
108        if self.rows.is_empty() {
109            println!("No rows returned");
110            return;
111        }
112
113        // Create a simple table display
114        let mut table = prettytable::Table::new();
115
116        // Add headers if we can determine them from first row
117        if let Some(first_row) = self.rows.first() {
118            let headers: Vec<_> = first_row.data.keys().cloned().collect();
119            table.set_titles(prettytable::Row::new(
120                headers.iter().map(|h| prettytable::Cell::new(h)).collect(),
121            ));
122
123            // Add data rows
124            for row in &self.rows {
125                let cells: Vec<_> = headers
126                    .iter()
127                    .map(|h| prettytable::Cell::new(row.data.get(h).unwrap_or(&String::new())))
128                    .collect();
129                table.add_row(prettytable::Row::new(cells));
130            }
131        }
132
133        table.printstd();
134    }
135
136    pub fn display_json(&self) -> Result<()> {
137        let json_rows: Vec<_> = self.rows.iter().map(|r| r.to_json()).collect();
138        println!("{}", serde_json::to_string_pretty(&json_rows)?);
139        Ok(())
140    }
141
142    pub fn display_csv(&self) -> Result<()> {
143        if self.rows.is_empty() {
144            return Ok(());
145        }
146
147        let headers: Vec<_> = self.rows[0].data.keys().cloned().collect();
148
149        // Print headers
150        println!("{}", headers.join(","));
151
152        // Print data
153        for row in &self.rows {
154            let values: Vec<_> = headers
155                .iter()
156                .map(|h| row.data.get(h).unwrap_or(&String::new()).clone())
157                .collect();
158            println!("{}", values.join(","));
159        }
160
161        Ok(())
162    }
163}
164// use crate::pagination::{PaginationConfig, PaginatedReader, StreamingProcessor, PaginationProgress};
165// use crate::query_executor::{QueryExecutor, QueryExecutorConfig};
166// use crate::table_scanner::{TableScanner, ScanStrategy, ScanConfig};
167use anyhow::{Context, Result};
168#[cfg(feature = "state_machine")]
169use cqlite_core::Database;
170use cqlite_core::{
171    schema::{parse_cql_schema, ClusteringColumn, ClusteringOrder, Column, KeyColumn, TableSchema},
172    storage::sstable::{bulletproof_reader::BulletproofReader, reader::SSTableReader},
173};
174use std::collections::HashMap;
175#[cfg(feature = "state_machine")]
176use std::fs::File;
177#[cfg(feature = "state_machine")]
178use std::io::{BufWriter, Write};
179use std::path::{Path, PathBuf};
180use std::sync::Arc;
181
182pub mod admin;
183pub mod bench;
184pub mod schema;
185pub mod write;
186
187pub mod docker;
188pub mod info;
189pub mod read_sstable;
190
/// Execute (or explain) a query and emit the result in the requested format.
///
/// * With `explain` set, the query plan is printed to stdout and the query
///   itself is not executed; `config.target` is not consulted on this path.
/// * Otherwise the query is executed, the result is rendered by the writer
///   matching `format`, and the bytes are written to `config.target`.
///
/// When `timing` is set, wall-clock time (and, for executed queries, the
/// performance counters reported by the result) is printed. Diagnostics for
/// executed queries go to stderr so they never mix with the data output.
///
/// # Errors
/// Returns an error if explaining or executing the query fails, if the
/// selected writer cannot format the result, or if writing to the target
/// fails.
#[cfg(feature = "state_machine")]
pub async fn execute_query(
    database: &Database,
    query: &str,
    explain: bool,
    timing: bool,
    format: OutputFormat,
    config: &crate::config::OutputConfig,
) -> Result<()> {
    use crate::output::{write_to_target, OutputTarget};
    use std::time::Instant;

    let start_time = Instant::now();

    // Handle explain queries (always to stdout, not affected by --output)
    if explain {
        let explain_result = database
            .explain(query)
            .await
            .context("Failed to explain query")?;

        println!("Query Explanation");
        println!("================");
        println!("Query Type: {}", explain_result.query_type);
        println!("Plan Type: {}", explain_result.plan_type);
        println!("Estimated Cost: {:.2}", explain_result.estimated_cost);
        println!("Estimated Rows: {}", explain_result.estimated_rows);

        if !explain_result.selected_indexes.is_empty() {
            println!("\nSelected Indexes:");
            for index in &explain_result.selected_indexes {
                println!("  - {index}");
            }
        }

        if !explain_result.execution_steps.is_empty() {
            println!("\nExecution Steps:");
            for (i, step) in explain_result.execution_steps.iter().enumerate() {
                println!("  {}. {}", i + 1, step);
            }
        }

        if !explain_result.parallelization_info.is_empty() {
            println!("\nParallelization:");
            for info in &explain_result.parallelization_info {
                println!("  - {info}");
            }
        }

        if timing {
            // Use fractional milliseconds: `{:.2}` is ignored for the integer
            // returned by `as_millis()`, which hid sub-millisecond timings.
            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            println!("\nTiming: {:.2}ms", elapsed_ms);
        }

        return Ok(());
    }

    // Execute the query
    let result = database
        .execute(query)
        .await
        .context("Failed to execute query")?;

    // Generate output bytes based on format (Issue #279)
    let output_bytes: Vec<u8> = match format {
        OutputFormat::Table => {
            use crate::output::table::TableWriter;
            TableWriter::write(&result, config)
                .map_err(|e| anyhow::anyhow!("Failed to format table output: {}", e))?
                .into_bytes()
        }
        OutputFormat::Json => {
            use crate::output::json::JSONWriter;
            JSONWriter::write(&result, config)
                .map_err(|e| anyhow::anyhow!("Failed to format JSON output: {}", e))?
                .into_bytes()
        }
        OutputFormat::Csv => {
            use crate::output::CSVWriter;
            CSVWriter::write(&result, config)
                .map_err(|e| anyhow::anyhow!("Failed to format CSV output: {}", e))?
                .into_bytes()
        }
        OutputFormat::Parquet => {
            use crate::output::ParquetWriter;
            ParquetWriter::write(&result, config)
                .map_err(|e| anyhow::anyhow!("Failed to format Parquet output: {}", e))?
        }
    };

    // Write to target (stdout or file)
    write_to_target(&output_bytes, &config.target, config.overwrite)
        .map_err(|e| anyhow::anyhow!("{}", e))?;

    // Text writers don't include a trailing newline, so add one on stdout.
    // CSV already ends with a newline and Parquet is binary, so both are skipped.
    let needs_trailing_newline = matches!(config.target, OutputTarget::Stdout)
        && !matches!(format, OutputFormat::Parquet | OutputFormat::Csv);
    if needs_trailing_newline {
        println!();
    }

    // Show success message for file output (to stderr so it doesn't mix with output)
    if let OutputTarget::File(path) = &config.target {
        eprintln!("Output written to: {}", path.display());
    }

    // Show timing information if requested (always on stderr)
    if timing {
        let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
        eprintln!("\nQuery executed in {:.2}ms", elapsed_ms);

        let performance = result.performance();
        if performance.total_time_us > 0 {
            eprintln!(
                "Parse time: {:.2}ms",
                performance.parse_time_us as f64 / 1000.0
            );
            eprintln!(
                "Planning time: {:.2}ms",
                performance.planning_time_us as f64 / 1000.0
            );
            eprintln!(
                "Execution time: {:.2}ms",
                performance.execution_time_us as f64 / 1000.0
            );
            eprintln!("Memory usage: {} bytes", performance.memory_usage_bytes);
            eprintln!("I/O operations: {}", performance.io_operations);
            if performance.cache_hits + performance.cache_misses > 0 {
                eprintln!(
                    "Cache hit ratio: {:.1}%",
                    performance.cache_hit_ratio() * 100.0
                );
            }
        }
    }

    // Show warnings if any (to stderr)
    let warnings = result.warnings();
    if !warnings.is_empty() {
        eprintln!("\nWarnings:");
        for warning in warnings {
            eprintln!("  ⚠️  {warning}");
        }
    }

    Ok(())
}
340
341#[cfg(not(feature = "state_machine"))]
342pub async fn execute_query(
343    _database: &cqlite_core::Database,
344    _query: &str,
345    _explain: bool,
346    _timing: bool,
347    _format: OutputFormat,
348    _config: &crate::config::OutputConfig,
349) -> Result<()> {
350    Err(anyhow::anyhow!(
351        "Query execution is not available in M1.\n\
352         Build with --features state_machine to enable this feature.\n\
353         See CLAUDE.md for M1 API examples."
354    ))
355}
356
/// Render a query result as CSV and print it to stdout.
///
/// Row limiting is left to `CSVWriter`, which receives `config`. No extra
/// newline is printed after the writer's output.
///
/// # Errors
/// Returns an error if the CSV writer fails to format the result.
#[cfg(feature = "state_machine")]
fn print_csv_format(
    result: &cqlite_core::query::result::QueryResult,
    config: &crate::config::OutputConfig,
) -> Result<()> {
    let rendered = match crate::output::CSVWriter::write(result, config) {
        Ok(text) => text,
        Err(e) => return Err(anyhow::anyhow!("Failed to format CSV output: {}", e)),
    };

    print!("{}", rendered);
    Ok(())
}
372
/// Import rows from a CSV or JSON file into a table.
///
/// The target table is `table` when given, otherwise the file's stem
/// (file name without extension). Table existence and the table's column
/// list are looked up on a best-effort basis: failures there only produce
/// warnings and the import proceeds.
///
/// # Errors
/// Returns an error if the file does not exist, if no table name can be
/// determined, if `format` is Parquet (not implemented), or if the
/// format-specific importer fails.
#[cfg(feature = "state_machine")]
pub async fn import_data(
    database: &Database,
    file: &Path,
    format: ImportFormat,
    table: Option<&str>,
) -> Result<()> {
    println!("Importing data from: {}", file.display());
    println!("Format: {format}, Target table: {table:?}");

    // Validate input file exists
    if !file.exists() {
        return Err(anyhow::anyhow!("Import file not found: {}", file.display()));
    }

    // Determine target table: explicit option first, then the file stem.
    let target_table = match table {
        Some(name) => name.to_string(),
        None => file
            .file_stem()
            .and_then(|stem| stem.to_str())
            .map(str::to_string)
            .ok_or_else(|| {
                anyhow::anyhow!(
                    "Could not determine target table name. Please specify --table option."
                )
            })?,
    };

    // Try to validate target table exists, but don't fail if we can't verify.
    // Single quotes are doubled so the name cannot break out of the literal.
    let table_check_query = format!(
        "SELECT table_name FROM system.tables WHERE table_name = '{}'",
        target_table.replace('\'', "''")
    );
    match database.execute(&table_check_query).await {
        Ok(result) if result.rows.is_empty() => {
            println!(
                "⚠️  Warning: Table '{}' not found in system catalog. Assuming it exists or will be created during import.",
                target_table
            );
        }
        Ok(_) => {
            println!("✓ Target table '{target_table}' found");
        }
        Err(_) => {
            println!(
                "⚠️  Warning: Could not verify table existence (system tables may not be implemented). Proceeding with import..."
            );
        }
    }

    // Get table schema for validation; an empty list means "unknown".
    let table_columns = get_table_columns(database, &target_table)
        .await
        .unwrap_or_else(|_| {
            println!("⚠️  Warning: Could not retrieve table schema. Import may fail if column types don't match.");
            Vec::new()
        });

    let imported_rows = match format {
        ImportFormat::Csv => {
            import_csv_data(database, file, &target_table, &table_columns).await?
        }
        ImportFormat::Json => {
            import_json_data(database, file, &target_table, &table_columns).await?
        }
        ImportFormat::Parquet => {
            return Err(anyhow::anyhow!(
                "Parquet import not yet implemented. Please convert to CSV or JSON format first."
            ));
        }
    };

    println!("\n📊 Import Summary:");
    println!("  Rows imported: {imported_rows}");
    println!("  ✅ Import completed successfully!");

    Ok(())
}
458
459#[cfg(not(feature = "state_machine"))]
460pub async fn import_data(
461    _database: &cqlite_core::Database,
462    _file: &Path,
463    _format: crate::cli::ImportFormat,
464    _table: Option<&str>,
465) -> Result<()> {
466    Err(anyhow::anyhow!(
467        "Data import is not available in M1.\n\
468         Build with --features state_machine to enable this feature.\n\
469         See CLAUDE.md for M1 API examples."
470    ))
471}
472
/// Import CSV data into the specified table.
///
/// The file is read twice: once to count records for the progress bar and
/// once to build one `INSERT` statement per record. The CSV header row
/// supplies the column list. Empty fields become `NULL`; every other field
/// is inserted as a single-quoted string with embedded quotes doubled.
/// `table_columns` is only printed for comparison with the CSV columns.
///
/// Returns the number of statements that executed successfully.
///
/// # Errors
/// Returns an error if the file cannot be opened, the header row cannot be
/// read, or a record fails to parse.
#[cfg(feature = "state_machine")]
async fn import_csv_data(
    database: &Database,
    file: &Path,
    table: &str,
    table_columns: &[String],
) -> Result<u64> {
    use csv::ReaderBuilder;
    use indicatif::{ProgressBar, ProgressStyle};

    // Statements are buffered and executed in groups of this size.
    const BATCH_SIZE: usize = 100;

    // Both passes open the file the same way and report the same error context.
    let open_reader = || -> Result<csv::Reader<File>> {
        let handle = File::open(file)
            .with_context(|| format!("Failed to open CSV file: {}", file.display()))?;
        Ok(ReaderBuilder::new().has_headers(true).from_reader(handle))
    };

    // First pass: read the header row and count records for the progress bar.
    let mut counting_reader = open_reader()?;
    let csv_columns: Vec<String> = counting_reader
        .headers()
        .context("Failed to read CSV headers")?
        .iter()
        .map(String::from)
        .collect();

    println!("📋 CSV columns: {}", csv_columns.join(", "));
    if !table_columns.is_empty() {
        println!("📋 Table columns: {}", table_columns.join(", "));
    }

    let total_rows = counting_reader.records().count() as u64;

    // Second pass: reopen the file for the actual import.
    let mut csv_reader = open_reader()?;

    let pb = ProgressBar::new(total_rows);
    pb.set_style(
        ProgressStyle::default_bar()
            // `{msg}` is required for the completion message to be displayed.
            .template(
                "Importing CSV [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} rows ({eta}) {msg}",
            )
            .expect("static progress bar template is valid")
            .progress_chars("=>-"),
    );

    let column_list = csv_columns.join(", ");
    let mut imported_count: u64 = 0;
    let mut batch_statements = Vec::with_capacity(BATCH_SIZE);

    for (row_num, record_result) in csv_reader.records().enumerate() {
        pb.set_position(row_num as u64 + 1);

        // +2: one for the header row, one because file lines are 1-based.
        let record = record_result
            .with_context(|| format!("Failed to parse CSV record at line {}", row_num + 2))?;

        let values: Vec<String> = record
            .iter()
            .map(|field| {
                if field.is_empty() {
                    "NULL".to_string()
                } else {
                    format!("'{}'", field.replace('\'', "''")) // Escape single quotes
                }
            })
            .collect();

        batch_statements.push(format!(
            "INSERT INTO {} ({}) VALUES ({})",
            table,
            column_list,
            values.join(", ")
        ));

        // Execute batch when it reaches the batch size
        if batch_statements.len() >= BATCH_SIZE {
            execute_batch_statements(database, &mut batch_statements, &mut imported_count).await?;
        }
    }

    // Execute remaining statements
    if !batch_statements.is_empty() {
        execute_batch_statements(database, &mut batch_statements, &mut imported_count).await?;
    }

    pb.finish_with_message(format!("Imported {imported_count} rows from CSV"));
    Ok(imported_count)
}
566
/// Import JSON data into the specified table.
///
/// The file must contain either one JSON object or an array; each object
/// becomes one `INSERT` statement whose column list is the object's keys.
/// Non-object array elements are skipped with a warning. Value mapping:
/// `null` -> `NULL`, strings -> single-quoted with embedded quotes doubled,
/// numbers and booleans -> their literal text, arrays and nested objects ->
/// their JSON text as a single-quoted string.
///
/// Returns the number of statements that executed successfully.
///
/// # Errors
/// Returns an error if the file cannot be read, is not valid JSON, or its
/// top-level value is neither an object nor an array.
#[cfg(feature = "state_machine")]
async fn import_json_data(
    database: &Database,
    file: &Path,
    table: &str,
    _table_columns: &[String],
) -> Result<u64> {
    use indicatif::{ProgressBar, ProgressStyle};
    use std::fs;

    // Statements are buffered and executed in groups of this size.
    const BATCH_SIZE: usize = 50;

    let file_content = fs::read_to_string(file)
        .with_context(|| format!("Failed to read JSON file: {}", file.display()))?;

    // Try to parse as array of objects or single object
    let json_data: serde_json::Value =
        serde_json::from_str(&file_content).context("Failed to parse JSON file")?;

    let objects = match json_data {
        serde_json::Value::Array(items) => items,
        single @ serde_json::Value::Object(_) => vec![single],
        _ => {
            return Err(anyhow::anyhow!(
                "JSON file must contain an object or array of objects"
            ));
        }
    };

    println!("📋 Found {} JSON objects to import", objects.len());

    let pb = ProgressBar::new(objects.len() as u64);
    pb.set_style(
        ProgressStyle::default_bar()
            // `{msg}` is required for the completion message to be displayed.
            .template("Importing JSON [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} objects ({eta}) {msg}")
            .expect("static progress bar template is valid")
            .progress_chars("=>-"),
    );

    let mut imported_count: u64 = 0;
    let mut batch_statements = Vec::with_capacity(BATCH_SIZE);

    for (index, obj) in objects.iter().enumerate() {
        pb.set_position(index as u64 + 1);

        let serde_json::Value::Object(map) = obj else {
            println!("⚠️  Skipping non-object JSON element at index {index}");
            continue;
        };

        let columns: Vec<&str> = map.keys().map(String::as_str).collect();
        let values: Vec<String> = map
            .values()
            .map(|value| match value {
                serde_json::Value::Null => "NULL".to_string(),
                serde_json::Value::String(s) => format!("'{}'", s.replace('\'', "''")),
                serde_json::Value::Number(n) => n.to_string(),
                serde_json::Value::Bool(b) => b.to_string(),
                // Arrays and nested objects are stored as their JSON text.
                nested => format!("'{}'", nested.to_string().replace('\'', "''")),
            })
            .collect();

        batch_statements.push(format!(
            "INSERT INTO {} ({}) VALUES ({})",
            table,
            columns.join(", "),
            values.join(", ")
        ));

        if batch_statements.len() >= BATCH_SIZE {
            execute_batch_statements(database, &mut batch_statements, &mut imported_count).await?;
        }
    }

    // Execute remaining statements
    if !batch_statements.is_empty() {
        execute_batch_statements(database, &mut batch_statements, &mut imported_count).await?;
    }

    pb.finish_with_message(format!("Imported {imported_count} objects from JSON"));
    Ok(imported_count)
}
651
/// Execute a batch of INSERT statements, one at a time.
///
/// `statements` is drained (left empty) and `imported_count` is incremented
/// once per statement that succeeds. A failing statement is reported on
/// stdout together with a preview of its text and does not stop the batch,
/// so this function currently always returns `Ok(())`.
#[cfg(feature = "state_machine")]
async fn execute_batch_statements(
    database: &Database,
    statements: &mut Vec<String>,
    imported_count: &mut u64,
) -> Result<()> {
    // Maximum number of characters of a failing statement to echo back.
    const PREVIEW_CHARS: usize = 100;

    for statement in statements.drain(..) {
        match database.execute(&statement).await {
            Ok(_) => {
                *imported_count += 1;
            }
            Err(e) => {
                // Cut on a character boundary and add an ellipsis only when
                // something was actually cut off.
                let preview = match statement.char_indices().nth(PREVIEW_CHARS) {
                    Some((cut, _)) => format!("{}...", &statement[..cut]),
                    None => statement.clone(),
                };
                println!("⚠️  Error executing statement: {e}");
                println!("   Statement: {}", preview);
                // Continue with next statement rather than failing completely
            }
        }
    }
    Ok(())
}
676
/// Get table columns for schema validation.
///
/// Queries `system.columns` for rows whose `table_name` equals `table` and
/// returns the `column_name` value of each row that has one. Single quotes
/// in `table` are doubled so the name cannot terminate the string literal.
///
/// # Errors
/// Returns an error if the catalog query fails.
#[cfg(feature = "state_machine")]
async fn get_table_columns(database: &Database, table: &str) -> Result<Vec<String>> {
    let query = format!(
        "SELECT column_name FROM system.columns WHERE table_name = '{}'",
        table.replace('\'', "''")
    );

    let result = database
        .execute(&query)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to get table columns: {}", e))?;

    let columns = result
        .rows
        .iter()
        .filter_map(|row| row.get("column_name"))
        .map(|col| col.to_string())
        .collect();
    Ok(columns)
}
694
695/// Export data using true streaming execution (Issue #280)
696///
697/// This function uses `execute_streaming()` to process rows incrementally,
698/// avoiding the need to materialize all query results in memory at once.
699#[cfg(feature = "state_machine")]
700pub async fn export_data(
701    database: &Database,
702    source: &str,
703    file: &Path,
704    format: ExportFormat,
705    query_filter: Option<&str>,
706    limit: Option<usize>,
707    quiet: bool,
708) -> Result<()> {
709    use cqlite_core::query::result::StreamingConfig;
710    use std::io::IsTerminal;
711    use std::time::Instant;
712
713    use crate::output::{
714        create_streaming_parquet_writer, StreamingCSVWriter, StreamingJSONWriter, StreamingWriter,
715    };
716    use crate::status_metrics::format_bytes;
717
718    // Determine if progress should be shown (not quiet, and output is a TTY)
719    let show_progress = !quiet && std::io::stdout().is_terminal();
720
721    if show_progress {
722        println!("Exporting data from: {source}");
723        println!("Output file: {}, Format: {}", file.display(), format);
724    }
725
726    // Create output directory if it doesn't exist
727    if let Some(parent) = file.parent() {
728        std::fs::create_dir_all(parent)
729            .with_context(|| format!("Failed to create output directory: {}", parent.display()))?;
730    }
731
732    // Determine if source is a table name or a query
733    let query = if source.to_uppercase().trim().starts_with("SELECT") {
734        // If source is already a SELECT query, append LIMIT if specified
735        // but only if the query doesn't already have a LIMIT clause
736        match limit {
737            Some(n) => {
738                let upper = source.to_uppercase();
739                if upper.contains(" LIMIT ") {
740                    // Query already has LIMIT - use as-is to avoid invalid SQL
741                    source.to_string()
742                } else {
743                    format!("{} LIMIT {}", source.trim_end_matches(';'), n)
744                }
745            }
746            None => source.to_string(),
747        }
748    } else {
749        // Source is a table name - build SELECT with optional WHERE and LIMIT
750        let mut q = format!("SELECT * FROM {}", source);
751        if let Some(filter) = query_filter {
752            q.push_str(&format!(" WHERE {}", filter));
753        }
754        if let Some(n) = limit {
755            q.push_str(&format!(" LIMIT {}", n));
756        }
757        q
758    };
759
760    if show_progress {
761        println!(
762            "Executing query: {}",
763            query.chars().take(100).collect::<String>() + "..."
764        );
765    }
766
767    // Configure streaming based on format
768    let config = match format {
769        ExportFormat::Parquet => StreamingConfig::for_parquet(),
770        _ => StreamingConfig::for_text_formats(),
771    };
772
773    // Execute the query with streaming (Issue #280 - true end-to-end streaming)
774    let mut result_iter = database
775        .execute_streaming(&query, config.clone())
776        .await
777        .with_context(|| format!("Failed to execute streaming export query: {query}"))?;
778
779    // Get column names from metadata
780    let column_names: Vec<String> = result_iter
781        .metadata
782        .columns
783        .iter()
784        .map(|c| c.name.clone())
785        .collect();
786
787    if column_names.is_empty() {
788        return Err(anyhow::anyhow!(
789            "Could not determine column names for export"
790        ));
791    }
792
793    if show_progress {
794        println!("Columns: {}", column_names.join(", "));
795        println!("Streaming export in progress...");
796    }
797
798    // Track timing for statistics
799    let start_time = Instant::now();
800
801    // Create spinner progress bar (unknown total for streaming)
802    let pb = if show_progress {
803        let pb = ProgressBar::new_spinner();
804        pb.set_style(
805            ProgressStyle::default_spinner()
806                .template("{spinner:.green} {msg} ({pos} rows)")
807                .unwrap(),
808        );
809        pb.set_message("Exporting");
810        pb
811    } else {
812        ProgressBar::hidden()
813    };
814
815    // Chunk size for collecting rows before writing
816    let chunk_size = config.chunk_size;
817    let mut rows_exported: u64 = 0;
818    // Track remaining rows for limit enforcement (streaming doesn't automatically enforce LIMIT)
819    let mut rows_remaining: Option<usize> = limit;
820
821    // Export based on format with true streaming
822    match format {
823        ExportFormat::Csv => {
824            let output_file = File::create(file)
825                .with_context(|| format!("Failed to create CSV file: {}", file.display()))?;
826            let buf_writer = BufWriter::new(output_file);
827            let mut writer = StreamingCSVWriter::new(buf_writer);
828
829            writer
830                .write_header(&result_iter.metadata)
831                .map_err(|e| anyhow::anyhow!("Failed to write CSV header: {}", e))?;
832
833            // Stream rows in chunks
834            loop {
835                // Check if we've hit the limit
836                if rows_remaining == Some(0) {
837                    break;
838                }
839
840                let chunk = result_iter
841                    .collect_chunk(chunk_size)
842                    .await
843                    .map_err(|e| anyhow::anyhow!("Failed to collect chunk: {}", e))?;
844
845                if chunk.is_empty() {
846                    break;
847                }
848
849                // Truncate chunk if it exceeds remaining limit
850                let chunk_to_write = if let Some(remaining) = rows_remaining {
851                    if chunk.len() > remaining {
852                        chunk.into_iter().take(remaining).collect::<Vec<_>>()
853                    } else {
854                        chunk
855                    }
856                } else {
857                    chunk
858                };
859
860                let written = chunk_to_write.len();
861                writer
862                    .write_chunk(&chunk_to_write)
863                    .map_err(|e| anyhow::anyhow!("Failed to write CSV chunk: {}", e))?;
864
865                rows_exported += written as u64;
866                pb.set_position(rows_exported);
867
868                // Update remaining count
869                if let Some(ref mut remaining) = rows_remaining {
870                    *remaining = remaining.saturating_sub(written);
871                }
872            }
873
874            writer
875                .finalize()
876                .map_err(|e| anyhow::anyhow!("Failed to finalize CSV: {}", e))?;
877        }
878        ExportFormat::Json => {
879            let output_file = File::create(file)
880                .with_context(|| format!("Failed to create JSON file: {}", file.display()))?;
881            let buf_writer = BufWriter::new(output_file);
882            let mut writer = StreamingJSONWriter::new(buf_writer);
883
884            writer
885                .write_header(&result_iter.metadata)
886                .map_err(|e| anyhow::anyhow!("Failed to write JSON header: {}", e))?;
887
888            // Stream rows in chunks
889            loop {
890                // Check if we've hit the limit
891                if rows_remaining == Some(0) {
892                    break;
893                }
894
895                let chunk = result_iter
896                    .collect_chunk(chunk_size)
897                    .await
898                    .map_err(|e| anyhow::anyhow!("Failed to collect chunk: {}", e))?;
899
900                if chunk.is_empty() {
901                    break;
902                }
903
904                // Truncate chunk if it exceeds remaining limit
905                let chunk_to_write = if let Some(remaining) = rows_remaining {
906                    if chunk.len() > remaining {
907                        chunk.into_iter().take(remaining).collect::<Vec<_>>()
908                    } else {
909                        chunk
910                    }
911                } else {
912                    chunk
913                };
914
915                let written = chunk_to_write.len();
916                writer
917                    .write_chunk(&chunk_to_write)
918                    .map_err(|e| anyhow::anyhow!("Failed to write JSON chunk: {}", e))?;
919
920                rows_exported += written as u64;
921                pb.set_position(rows_exported);
922
923                // Update remaining count
924                if let Some(ref mut remaining) = rows_remaining {
925                    *remaining = remaining.saturating_sub(written);
926                }
927            }
928
929            writer
930                .finalize()
931                .map_err(|e| anyhow::anyhow!("Failed to finalize JSON: {}", e))?;
932        }
933        ExportFormat::Cql => {
934            // CQL format needs special handling - collect all for table name extraction
935            // For now, fall back to non-streaming for CQL
936            let output_file = File::create(file)
937                .with_context(|| format!("Failed to create CQL file: {}", file.display()))?;
938            let mut buf_writer = BufWriter::new(output_file);
939
940            // Extract table name from source
941            let table_name = if source.to_uppercase().contains("FROM") {
942                source
943                    .split_whitespace()
944                    .skip_while(|&word| word.to_uppercase() != "FROM")
945                    .nth(1)
946                    .unwrap_or("exported_table")
947            } else {
948                source
949            };
950
951            // Write header comment
952            writeln!(buf_writer, "-- CQL Export from CQLite (streaming)")?;
953            writeln!(buf_writer, "-- Source: {source}")?;
954            writeln!(
955                buf_writer,
956                "-- Generated: {}",
957                chrono::Utc::now().to_rfc3339()
958            )?;
959            writeln!(buf_writer)?;
960
961            // Stream rows
962            loop {
963                // Check if we've hit the limit
964                if rows_remaining == Some(0) {
965                    break;
966                }
967
968                let chunk = result_iter
969                    .collect_chunk(chunk_size)
970                    .await
971                    .map_err(|e| anyhow::anyhow!("Failed to collect chunk: {}", e))?;
972
973                if chunk.is_empty() {
974                    break;
975                }
976
977                // Truncate chunk if it exceeds remaining limit
978                let chunk_to_write: Vec<_> = if let Some(remaining) = rows_remaining {
979                    if chunk.len() > remaining {
980                        chunk.into_iter().take(remaining).collect()
981                    } else {
982                        chunk
983                    }
984                } else {
985                    chunk
986                };
987
988                for row in &chunk_to_write {
989                    let values: Vec<String> = column_names
990                        .iter()
991                        .map(|col| {
992                            row.values
993                                .get(col)
994                                .map(|v| match v {
995                                    cqlite_core::Value::Text(s) => {
996                                        format!("'{}'", s.replace("'", "''"))
997                                    }
998                                    cqlite_core::Value::Null => "NULL".to_string(),
999                                    _ => v.to_string(),
1000                                })
1001                                .unwrap_or_else(|| "NULL".to_string())
1002                        })
1003                        .collect();
1004
1005                    writeln!(
1006                        buf_writer,
1007                        "INSERT INTO {} ({}) VALUES ({});",
1008                        table_name,
1009                        column_names.join(", "),
1010                        values.join(", ")
1011                    )?;
1012                }
1013
1014                let written = chunk_to_write.len();
1015                rows_exported += written as u64;
1016                pb.set_position(rows_exported);
1017
1018                // Update remaining count
1019                if let Some(ref mut remaining) = rows_remaining {
1020                    *remaining = remaining.saturating_sub(written);
1021                }
1022            }
1023
1024            buf_writer.flush()?;
1025        }
1026        ExportFormat::Parquet => {
1027            let output_file = File::create(file)
1028                .with_context(|| format!("Failed to create Parquet file: {}", file.display()))?;
1029
1030            let mut writer =
1031                create_streaming_parquet_writer(output_file, &result_iter.metadata, chunk_size)
1032                    .map_err(|e| anyhow::anyhow!("Failed to initialize Parquet writer: {}", e))?;
1033
1034            writer
1035                .write_header(&result_iter.metadata)
1036                .map_err(|e| anyhow::anyhow!("Failed to write Parquet header: {}", e))?;
1037
1038            // Stream rows in chunks
1039            loop {
1040                // Check if we've hit the limit
1041                if rows_remaining == Some(0) {
1042                    break;
1043                }
1044
1045                let chunk = result_iter
1046                    .collect_chunk(chunk_size)
1047                    .await
1048                    .map_err(|e| anyhow::anyhow!("Failed to collect chunk: {}", e))?;
1049
1050                if chunk.is_empty() {
1051                    break;
1052                }
1053
1054                // Truncate chunk if it exceeds remaining limit
1055                let chunk_to_write = if let Some(remaining) = rows_remaining {
1056                    if chunk.len() > remaining {
1057                        chunk.into_iter().take(remaining).collect::<Vec<_>>()
1058                    } else {
1059                        chunk
1060                    }
1061                } else {
1062                    chunk
1063                };
1064
1065                let written = chunk_to_write.len();
1066                writer
1067                    .write_chunk(&chunk_to_write)
1068                    .map_err(|e| anyhow::anyhow!("Failed to write Parquet chunk: {}", e))?;
1069
1070                rows_exported += written as u64;
1071                pb.set_position(rows_exported);
1072
1073                // Update remaining count
1074                if let Some(ref mut remaining) = rows_remaining {
1075                    *remaining = remaining.saturating_sub(written);
1076                }
1077            }
1078
1079            writer
1080                .finalize()
1081                .map_err(|e| anyhow::anyhow!("Failed to finalize Parquet: {}", e))?;
1082        }
1083    }
1084
1085    pb.finish_and_clear();
1086
1087    // Display statistics (unless quiet)
1088    if !quiet {
1089        let duration = start_time.elapsed();
1090        let file_size = std::fs::metadata(file)?.len();
1091
1092        println!("\nExport complete:");
1093        println!("  Rows: {}", rows_exported);
1094        println!("  Size: {}", format_bytes(file_size));
1095        println!("  Time: {}", format_export_duration(duration));
1096        let secs_f64 = duration.as_secs_f64();
1097        if secs_f64 > 0.0 {
1098            let rate = rows_exported as f64 / secs_f64;
1099            if rate.is_finite() {
1100                println!("  Rate: {:.0} rows/sec", rate);
1101            }
1102        }
1103    }
1104
1105    Ok(())
1106}
1107
1108#[cfg(not(feature = "state_machine"))]
1109pub async fn export_data(
1110    _database: &cqlite_core::Database,
1111    _source: &str,
1112    _file: &Path,
1113    _format: crate::cli::ExportFormat,
1114    _query_filter: Option<&str>,
1115    _limit: Option<usize>,
1116    _quiet: bool,
1117) -> Result<()> {
1118    Err(anyhow::anyhow!(
1119        "Data export is not available in M1.\n\
1120         Build with --features state_machine to enable this feature.\n\
1121         See CLAUDE.md for M1 API examples."
1122    ))
1123}
1124
/// Writes an in-memory query result to `file` as CSV (Issue #280).
///
/// The rows held by `result` are passed to `StreamingCSVWriter` in slices of at
/// most 5,000 rows, and `pb` is advanced by the slice length after each write.
/// `_column_names` is not read; the header is produced from `result.metadata`.
///
/// # Errors
///
/// Fails if the file cannot be created or if the writer reports an error while
/// writing the header, writing a slice, or finalizing.
#[cfg(feature = "state_machine")]
async fn export_to_csv(
    result: &cqlite_core::query::result::QueryResult,
    file: &Path,
    _column_names: &[String],
    pb: &ProgressBar,
) -> Result<()> {
    use crate::output::{StreamingCSVWriter, StreamingWriter};

    // Maximum number of rows per `write_chunk` call.
    const ROWS_PER_BATCH: usize = 5_000;

    // Wrap the file in a BufWriter so the writer does not hit the OS per row.
    let csv_file = File::create(file)
        .with_context(|| format!("Failed to create CSV file: {}", file.display()))?;
    let mut csv = StreamingCSVWriter::new(BufWriter::new(csv_file));

    if let Err(e) = csv.write_header(&result.metadata) {
        return Err(anyhow::anyhow!("Failed to write CSV header: {}", e));
    }

    for batch in result.rows.chunks(ROWS_PER_BATCH) {
        if let Err(e) = csv.write_chunk(batch) {
            return Err(anyhow::anyhow!("Failed to write CSV chunk: {}", e));
        }
        pb.inc(batch.len() as u64);
    }

    if let Err(e) = csv.finalize() {
        return Err(anyhow::anyhow!("Failed to finalize CSV file: {}", e));
    }

    Ok(())
}
1168
/// Writes an in-memory query result to `file` as JSON (Issue #280).
///
/// Rows are fed to `StreamingJSONWriter` in slices of at most 5,000 rows so the
/// serialized output is produced incrementally; `pb` is advanced by the slice
/// length after each write. `_column_names` is not read; the writer receives
/// `result.metadata` through `write_header`.
///
/// # Errors
///
/// Fails if the file cannot be created or if the writer reports an error while
/// writing the header, writing a slice, or finalizing.
#[cfg(feature = "state_machine")]
async fn export_to_json(
    result: &cqlite_core::query::result::QueryResult,
    file: &Path,
    _column_names: &[String],
    pb: &ProgressBar,
) -> Result<()> {
    use crate::output::{StreamingJSONWriter, StreamingWriter};

    // Maximum number of rows per `write_chunk` call.
    const BATCH_ROWS: usize = 5_000;

    let sink = BufWriter::new(
        File::create(file)
            .with_context(|| format!("Failed to create JSON file: {}", file.display()))?,
    );
    let mut json = StreamingJSONWriter::new(sink);

    json.write_header(&result.metadata)
        .map_err(|e| anyhow::anyhow!("Failed to write JSON header: {}", e))?;

    // Stops at the first slice that fails to serialize.
    result
        .rows
        .chunks(BATCH_ROWS)
        .try_for_each(|batch| -> Result<()> {
            json.write_chunk(batch)
                .map_err(|e| anyhow::anyhow!("Failed to write JSON chunk: {}", e))?;
            pb.inc(batch.len() as u64);
            Ok(())
        })?;

    json.finalize()
        .map_err(|e| anyhow::anyhow!("Failed to finalize JSON file: {}", e))?;

    Ok(())
}
1212
/// Picks the table name used in generated `INSERT` statements.
///
/// `source` is treated as a query only when it contains `FROM` as a standalone,
/// whitespace-delimited keyword (case-insensitive). In that case the following
/// token, with any trailing `;` removed, is the table name; if there is no usable
/// token the placeholder `exported_table` is returned. Any other `source` is
/// returned unchanged, so a plain table name that merely contains the letters
/// "from" (for example `data_from_api`) is kept as-is.
#[cfg(feature = "state_machine")]
fn cql_target_table(source: &str) -> &str {
    let mut tokens = source.split_whitespace();

    // `any` stops right after the first match, leaving `tokens` positioned on
    // the word that follows FROM.
    if !tokens.any(|token| token.eq_ignore_ascii_case("FROM")) {
        return source;
    }

    tokens
        .next()
        // `SELECT * FROM users;` must not yield the table name `users;`.
        .map(|token| token.trim_end_matches(';'))
        .filter(|token| !token.is_empty())
        .unwrap_or("exported_table")
}

/// Export query result to CQL `INSERT` statements.
///
/// Writes a short comment header followed by one `INSERT INTO ...` line per row
/// of `result`, then flushes the buffered file.
///
/// * `result` - rows to export.
/// * `file` - destination path; created or truncated.
/// * `source` - table name or `SELECT` query the rows came from; used for the
///   header comment and to derive the target table name.
/// * `column_names` - columns to emit, in output order. A column missing from a
///   row is written as `NULL`.
/// * `pb` - progress bar; its position is set to the 1-based row index.
///
/// # Errors
///
/// Fails if the file cannot be created, written, or flushed.
#[cfg(feature = "state_machine")]
async fn export_to_cql(
    result: &cqlite_core::query::result::QueryResult,
    file: &Path,
    source: &str,
    column_names: &[String],
    pb: &ProgressBar,
) -> Result<()> {
    let output_file = File::create(file)
        .with_context(|| format!("Failed to create CQL file: {}", file.display()))?;
    let mut writer = BufWriter::new(output_file);

    let table_name = cql_target_table(source);

    // The column list is the same for every row, so build it once.
    let column_list = column_names.join(", ");

    // Flatten line breaks so a multi-line query cannot escape the `--` comment
    // and end up as a stray statement in the generated file.
    let source_comment = source.replace(|c: char| c == '\r' || c == '\n', " ");

    // Write header comment
    writeln!(writer, "-- CQL Export from CQLite")?;
    writeln!(writer, "-- Source: {source_comment}")?;
    writeln!(writer, "-- Generated: {}", chrono::Utc::now().to_rfc3339())?;
    writeln!(writer, "-- Rows: {}", result.rows.len())?;
    writeln!(writer)?;

    // Write INSERT statements
    for (index, row) in result.rows.iter().enumerate() {
        pb.set_position(index as u64 + 1);

        let values: Vec<String> = column_names
            .iter()
            .map(|col| {
                row.get(col)
                    .map(|v| match v {
                        // Text is single-quoted with embedded quotes doubled.
                        cqlite_core::Value::Text(s) => format!("'{}'", s.replace('\'', "''")),
                        cqlite_core::Value::Null => "NULL".to_string(),
                        // NOTE(review): every other variant is emitted through its
                        // `Display` impl - confirm that this yields valid CQL literals.
                        _ => v.to_string(),
                    })
                    .unwrap_or_else(|| "NULL".to_string())
            })
            .collect();

        writeln!(
            writer,
            "INSERT INTO {} ({}) VALUES ({});",
            table_name,
            column_list,
            values.join(", ")
        )?;
    }

    // Flush explicitly: dropping a BufWriter discards flush errors.
    writer
        .flush()
        .with_context(|| "Failed to flush CQL writer")?;

    Ok(())
}
1277
/// Writes an in-memory query result to `file` as Parquet (Issue #280).
///
/// Rows are handed to the writer returned by `create_streaming_parquet_writer`
/// in slices of at most 10,000 rows; the same number is passed to that
/// constructor (presumably as the row-group size - confirm against the writer).
/// `pb` receives a status message for each phase and is advanced by the slice
/// length after each write. `_column_names` is not read.
///
/// # Errors
///
/// Fails if the file cannot be created or if the writer reports an error while
/// being initialized, writing the header, writing a slice, or finalizing.
#[cfg(feature = "state_machine")]
async fn export_to_parquet(
    result: &cqlite_core::query::result::QueryResult,
    file: &Path,
    _column_names: &[String],
    pb: &ProgressBar,
) -> Result<()> {
    use crate::output::{create_streaming_parquet_writer, StreamingWriter};

    // Rows per `write_chunk` call; also given to the writer constructor.
    const ROWS_PER_GROUP: usize = 10_000;

    pb.set_message("Initializing Parquet writer...");

    let parquet_file = File::create(file)
        .with_context(|| format!("Failed to create Parquet file: {}", file.display()))?;

    let mut parquet =
        match create_streaming_parquet_writer(parquet_file, &result.metadata, ROWS_PER_GROUP) {
            Ok(writer) => writer,
            Err(e) => {
                return Err(anyhow::anyhow!(
                    "Failed to initialize Parquet writer: {}",
                    e
                ))
            }
        };

    if let Err(e) = parquet.write_header(&result.metadata) {
        return Err(anyhow::anyhow!("Failed to write Parquet header: {}", e));
    }

    pb.set_message("Streaming rows to Parquet...");

    for batch in result.rows.chunks(ROWS_PER_GROUP) {
        if let Err(e) = parquet.write_chunk(batch) {
            return Err(anyhow::anyhow!("Failed to write Parquet chunk: {}", e));
        }
        pb.inc(batch.len() as u64);
    }

    pb.set_message("Finalizing Parquet file...");
    if let Err(e) = parquet.finalize() {
        return Err(anyhow::anyhow!("Failed to finalize Parquet file: {}", e));
    }

    Ok(())
}
1327
1328/// Read and display SSTable directory or file data with schema
1329pub async fn read_sstable(
1330    sstable_path: &Path,
1331    schema_path: &Path,
1332    limit: Option<usize>,
1333    skip: Option<usize>,
1334    _generation: Option<u32>,
1335    format: OutputFormat,
1336    auto_detect: bool,
1337    cassandra_version: Option<String>,
1338) -> Result<()> {
1339    // Load schema from file (supports both .cql and .json)
1340    let schema = load_schema_file(schema_path, auto_detect, cassandra_version.as_deref())?;
1341
1342    println!("šŸ” Reading SSTable with REAL data parsing (no mocking!)");
1343    println!("šŸ“‚ SSTable: {}", sstable_path.display());
1344    println!("šŸ“‹ Schema: {}", schema_path.display());
1345
1346    // Smart path resolution: if directory, find the Data.db file
1347    let actual_sstable_path = resolve_sstable_path(sstable_path)?;
1348    println!("šŸ“„ Data file: {}", actual_sstable_path.display());
1349
1350    // Use Bulletproof SSTable Reader for universal format support
1351    println!("šŸš€ Using Bulletproof SSTable Reader (supports all Cassandra versions)");
1352
1353    // Try bulletproof reader first
1354    let mut bulletproof_reader =
1355        BulletproofReader::open(&actual_sstable_path).with_context(|| {
1356            format!(
1357                "Failed to open SSTable with bulletproof reader: {}",
1358                actual_sstable_path.display()
1359            )
1360        })?;
1361
1362    // Show format detection results
1363    let info = bulletproof_reader.info();
1364    println!(
1365        "šŸ“‹ Detected format: {:?} (generation {}, size {})",
1366        info.format,
1367        info.generation_numeric().unwrap_or(0),
1368        info.size
1369    );
1370
1371    if let Some(compression_info) = bulletproof_reader.compression_info() {
1372        println!(
1373            "šŸ“¦ Compression: {} ({} byte chunks)",
1374            compression_info.algorithm, compression_info.chunk_length
1375        );
1376    }
1377
1378    // Try to parse the SSTable data
1379    match bulletproof_reader.parse_sstable_data() {
1380        Ok(bulletproof_entries) => {
1381            println!(
1382                "āœ… Successfully parsed {} entries with bulletproof reader",
1383                bulletproof_entries.len()
1384            );
1385
1386            // Convert bulletproof entries to the format expected by the rest of the code
1387            let mut processed = 0;
1388            let mut displayed = 0;
1389            let skip_count = skip.unwrap_or(0);
1390            let limit_count = limit.unwrap_or(bulletproof_entries.len());
1391
1392            let mut parsed_rows = Vec::new();
1393            let parser = RealDataParser::new(schema.clone());
1394
1395            for entry in bulletproof_entries {
1396                if processed < skip_count {
1397                    processed += 1;
1398                    continue;
1399                }
1400
1401                if displayed >= limit_count {
1402                    break;
1403                }
1404
1405                // Create mock key and value from bulletproof entry for compatibility
1406                let key = entry.key.clone();
1407                let value =
1408                    cqlite_core::Value::Text(format!("{:?}|{}", entry.key, entry.format_info));
1409
1410                match parser.parse_entry(&key, &value) {
1411                    Ok(parsed_row) => {
1412                        parsed_rows.push(parsed_row);
1413                        displayed += 1;
1414                    }
1415                    Err(e) => {
1416                        eprintln!("āš ļø  Failed to parse row {}: {}", processed + 1, e);
1417                        // Show bulletproof data anyway
1418                        println!(
1419                            "šŸ“„ Raw bulletproof data: key='{:?}', info='{}'",
1420                            entry.key, entry.format_info
1421                        );
1422                    }
1423                }
1424                processed += 1;
1425            }
1426
1427            // Display results
1428            match format {
1429                OutputFormat::Table => {
1430                    display_table_format(&parser.get_column_names(), &parsed_rows)
1431                }
1432                OutputFormat::Json => display_json_format(&parsed_rows)?,
1433                OutputFormat::Csv => display_csv_format(&parser.get_column_names(), &parsed_rows)?,
1434                OutputFormat::Parquet => {
1435                    return Err(anyhow::anyhow!("Parquet format is not supported for this command. Use --out json or --out csv instead."));
1436                }
1437            }
1438
1439            println!(
1440                "\nāœ… Bulletproof reader processed {processed} entries, displayed {displayed} rows"
1441            );
1442            return Ok(());
1443        }
1444        Err(e) => {
1445            println!("āš ļø  Bulletproof parser still in development: {e}");
1446            println!("šŸ”„ Falling back to raw data display...");
1447
1448            // Show raw decompressed data as fallback
1449            match bulletproof_reader.read_raw_data(0, 1024) {
1450                Ok(data) => {
1451                    println!("\nšŸ“Š Raw SSTable data (first 1024 bytes):");
1452                    for (i, chunk) in data.chunks(16).enumerate() {
1453                        print!("  {:04x}: ", i * 16);
1454                        for byte in chunk {
1455                            print!("{byte:02x} ");
1456                        }
1457                        print!("  ");
1458                        for byte in chunk {
1459                            let c = if byte.is_ascii_graphic() || *byte == b' ' {
1460                                *byte as char
1461                            } else {
1462                                '.'
1463                            };
1464                            print!("{c}");
1465                        }
1466                        println!();
1467                    }
1468
1469                    println!(
1470                        "\nšŸŽÆ This shows the bulletproof reader successfully decompressed the data!"
1471                    );
1472                    println!(
1473                        "šŸ’” The parsing layer is still being implemented for your specific format."
1474                    );
1475                    return Ok(());
1476                }
1477                Err(e) => {
1478                    println!("āŒ Bulletproof reader failed to read raw data: {e}");
1479                }
1480            }
1481        }
1482    }
1483
1484    // If bulletproof reader fails completely, fall back to old reader
1485    println!("šŸ”„ Falling back to legacy SSTable reader...");
1486    let config = cqlite_core::Config::default();
1487    let platform = Arc::new(cqlite_core::platform::Platform::new(&config).await?);
1488    let reader = SSTableReader::open(&actual_sstable_path, &config, platform)
1489        .await
1490        .with_context(|| format!("Failed to open SSTable: {}", actual_sstable_path.display()))?;
1491
1492    // Create real data parser
1493    let parser = RealDataParser::new(schema.clone());
1494
1495    // Get entries from SSTable
1496    let entries = reader.get_all_entries().await?;
1497    let mut processed = 0;
1498    let mut displayed = 0;
1499    let skip_count = skip.unwrap_or(0);
1500    let limit_count = limit.unwrap_or(entries.len());
1501
1502    println!("šŸ“Š Found {} entries in SSTable", entries.len());
1503
1504    let mut parsed_rows = Vec::new();
1505
1506    for (_table_id, key, value) in entries {
1507        if processed < skip_count {
1508            processed += 1;
1509            continue;
1510        }
1511
1512        if displayed >= limit_count {
1513            break;
1514        }
1515
1516        // Parse the entry using real data parser
1517        match parser.parse_entry(&key, &value) {
1518            Ok(parsed_row) => {
1519                parsed_rows.push(parsed_row);
1520                displayed += 1;
1521            }
1522            Err(e) => {
1523                eprintln!("āš ļø  Failed to parse row {}: {}", processed + 1, e);
1524            }
1525        }
1526        processed += 1;
1527    }
1528
1529    // Display results based on format
1530    match format {
1531        OutputFormat::Table => display_table_format(&parser.get_column_names(), &parsed_rows),
1532        OutputFormat::Json => display_json_format(&parsed_rows)?,
1533        OutputFormat::Csv => display_csv_format(&parser.get_column_names(), &parsed_rows)?,
1534        OutputFormat::Parquet => {
1535            return Err(anyhow::anyhow!("Parquet format is not supported for this command. Use --out json or --out csv instead."));
1536        }
1537    }
1538
1539    println!("\nāœ… Processed {processed} entries, displayed {displayed} rows");
1540    println!("šŸŽÆ Data source: LIVE SSTable file (no mocking!)");
1541
1542    Ok(())
1543}
1544
1545/// Execute a CQL SELECT query against SSTable data (live data, no mocking!)
1546pub async fn execute_select_query(
1547    sstable_path: &Path,
1548    schema_path: &Path,
1549    query: &str,
1550    format: OutputFormat,
1551    auto_detect: bool,
1552    cassandra_version: Option<String>,
1553) -> Result<()> {
1554    // Load schema from file (supports both .cql and .json)
1555    let _schema = load_schema_file(schema_path, auto_detect, cassandra_version.as_deref())?;
1556
1557    println!("šŸš€ Executing CQL query against LIVE SSTable data!");
1558    println!("šŸ“‚ SSTable: {}", sstable_path.display());
1559    println!("šŸ“‹ Schema: {}", schema_path.display());
1560    println!("šŸ” Query: {query}");
1561
1562    // Smart path resolution: if directory, find the Data.db file
1563    let actual_sstable_path = resolve_sstable_path(sstable_path)?;
1564    println!("šŸ“„ Data file: {}", actual_sstable_path.display());
1565
1566    // Create query executor
1567    let executor = QueryExecutor::new(QueryExecutorConfig);
1568
1569    // Execute the query
1570    let result = executor.execute_select(query).await?;
1571
1572    // Display results
1573    match format {
1574        OutputFormat::Table => result.display_table(),
1575        OutputFormat::Json => result.display_json()?,
1576        OutputFormat::Csv => result.display_csv()?,
1577        OutputFormat::Parquet => {
1578            return Err(anyhow::anyhow!("Parquet format is not supported for this command. Use --out json or --out csv instead."));
1579        }
1580    }
1581
1582    Ok(())
1583}
1584
1585/// Resolve SSTable path: if directory, find the Data.db file
1586fn resolve_sstable_path(sstable_path: &Path) -> Result<PathBuf> {
1587    if sstable_path.is_file() {
1588        // If it's already a file, use it directly
1589        return Ok(sstable_path.to_path_buf());
1590    }
1591
1592    if sstable_path.is_dir() {
1593        // If it's a directory, look for SSTable data files
1594        println!("šŸ“ Directory detected, looking for SSTable files...");
1595
1596        // Look for common SSTable data file patterns
1597        let patterns = ["*-Data.db", "*-big-Data.db", "nb-*-big-Data.db"];
1598
1599        for pattern in &patterns {
1600            if let Ok(entries) = std::fs::read_dir(sstable_path) {
1601                for entry in entries.flatten() {
1602                    let file_name = entry.file_name();
1603                    let file_name_str = file_name.to_string_lossy();
1604
1605                    // Match the pattern
1606                    if pattern.contains("*") {
1607                        let pattern_parts: Vec<&str> = pattern.split('*').collect();
1608                        if pattern_parts.len() == 2 {
1609                            let starts_with = pattern_parts[0];
1610                            let ends_with = pattern_parts[1];
1611
1612                            if file_name_str.starts_with(starts_with)
1613                                && file_name_str.ends_with(ends_with)
1614                            {
1615                                let data_file = entry.path();
1616                                println!("āœ“ Found SSTable data file: {}", data_file.display());
1617                                return Ok(data_file);
1618                            }
1619                        } else if pattern_parts.len() == 3 {
1620                            let starts_with = pattern_parts[0];
1621                            let middle = pattern_parts[1];
1622                            let ends_with = pattern_parts[2];
1623
1624                            if file_name_str.starts_with(starts_with)
1625                                && file_name_str.contains(middle)
1626                                && file_name_str.ends_with(ends_with)
1627                            {
1628                                let data_file = entry.path();
1629                                println!("āœ“ Found SSTable data file: {}", data_file.display());
1630                                return Ok(data_file);
1631                            }
1632                        }
1633                    }
1634                }
1635            }
1636        }
1637
1638        return Err(anyhow::anyhow!(
1639            "No SSTable data files found in directory: {}\nLooked for: {}",
1640            sstable_path.display(),
1641            patterns.join(", ")
1642        ));
1643    }
1644
1645    Err(anyhow::anyhow!(
1646        "Path is neither a file nor a directory: {}",
1647        sstable_path.display()
1648    ))
1649}
1650
1651/// Load schema from JSON or CQL file
1652fn load_schema_file(
1653    schema_path: &Path,
1654    _auto_detect: bool,
1655    _cassandra_version: Option<&str>,
1656) -> Result<TableSchema> {
1657    let schema_content = std::fs::read_to_string(schema_path)
1658        .with_context(|| format!("Failed to read schema file: {}", schema_path.display()))?;
1659
1660    println!("šŸ“‹ Loading schema from: {}", schema_path.display());
1661
1662    // Determine file type by extension
1663    let extension = schema_path
1664        .extension()
1665        .and_then(|s| s.to_str())
1666        .unwrap_or("");
1667
1668    match extension.to_lowercase().as_str() {
1669        "json" => {
1670            println!("šŸ“ Parsing JSON schema format");
1671            // Parse JSON schema
1672            let json_schema: serde_json::Value = serde_json::from_str(&schema_content)
1673                .with_context(|| "Failed to parse JSON schema")?;
1674
1675            // Convert JSON to TableSchema
1676            parse_json_schema(&json_schema)
1677        }
1678        "cql" | "sql" | "" => {
1679            println!("šŸ“ Parsing CQL schema format");
1680            // Parse CQL schema
1681            parse_cql_schema(&schema_content).with_context(|| "Failed to parse CQL schema")
1682        }
1683        _ => Err(anyhow::anyhow!(
1684            "Unsupported schema file extension: .{}\nSupported formats: .json, .cql",
1685            extension
1686        )),
1687    }
1688}
1689
1690/// Parse JSON schema format
1691fn parse_json_schema(json: &serde_json::Value) -> Result<TableSchema> {
1692    let keyspace = json["keyspace"]
1693        .as_str()
1694        .ok_or_else(|| anyhow::anyhow!("Missing keyspace in schema"))?;
1695    let table = json["table"]
1696        .as_str()
1697        .ok_or_else(|| anyhow::anyhow!("Missing table in schema"))?;
1698
1699    let columns = json["columns"]
1700        .as_object()
1701        .ok_or_else(|| anyhow::anyhow!("Missing columns in schema"))?;
1702
1703    let mut schema_columns = Vec::new();
1704    let mut partition_keys = Vec::new();
1705    let mut clustering_columns = Vec::new();
1706
1707    for (col_name, col_info) in columns {
1708        let col_obj = col_info
1709            .as_object()
1710            .ok_or_else(|| anyhow::anyhow!("Invalid column definition for {}", col_name))?;
1711
1712        let col_type = col_obj["type"]
1713            .as_str()
1714            .ok_or_else(|| anyhow::anyhow!("Missing type for column {}", col_name))?;
1715        let col_kind = col_obj["kind"]
1716            .as_str()
1717            .ok_or_else(|| anyhow::anyhow!("Missing kind for column {}", col_name))?;
1718
1719        let column = Column {
1720            name: col_name.clone(),
1721            data_type: col_type.to_string(),
1722            nullable: true,   // Default to nullable
1723            default: None,    // No default value
1724            is_static: false, // SSTable metadata doesn't track static columns
1725        };
1726
1727        match col_kind {
1728            "PartitionKey" => {
1729                partition_keys.push(KeyColumn {
1730                    name: col_name.clone(),
1731                    position: partition_keys.len(),
1732                    data_type: col_type.to_string(),
1733                });
1734            }
1735            "ClusteringColumn" => {
1736                clustering_columns.push(ClusteringColumn {
1737                    name: col_name.clone(),
1738                    position: clustering_columns.len(),
1739                    data_type: col_type.to_string(),
1740                    order: ClusteringOrder::Asc,
1741                });
1742            }
1743            "Regular" => {
1744                // Regular column - just add to columns list
1745            }
1746            _ => return Err(anyhow::anyhow!("Unknown column kind: {}", col_kind)),
1747        }
1748
1749        schema_columns.push(column);
1750    }
1751
1752    Ok(TableSchema {
1753        keyspace: keyspace.to_string(),
1754        table: table.to_string(),
1755        columns: schema_columns,
1756        partition_keys,
1757        clustering_keys: clustering_columns,
1758        comments: HashMap::new(),
1759    })
1760}
1761
1762/// Display results in table format
1763fn display_table_format(column_names: &[String], rows: &[ParsedRow]) {
1764    use prettytable::{Cell, Row, Table};
1765
1766    if rows.is_empty() {
1767        println!("šŸ“­ No results found");
1768        return;
1769    }
1770
1771    let mut table = Table::new();
1772
1773    // Add header
1774    let mut header = Row::empty();
1775    for column in column_names {
1776        header.add_cell(Cell::new(column));
1777    }
1778    table.add_row(header);
1779
1780    // Add data rows
1781    for parsed_row in rows {
1782        let mut row = Row::empty();
1783        for column in column_names {
1784            let cell_value = parsed_row
1785                .get(column)
1786                .map(|v| v.to_string())
1787                .unwrap_or_else(|| "NULL".to_string());
1788            row.add_cell(Cell::new(&cell_value));
1789        }
1790        table.add_row(row);
1791    }
1792
1793    println!("\nšŸ“Š Live SSTable Data Results:");
1794    println!("{}", "=".repeat(50));
1795    table.printstd();
1796}
1797
1798/// Display results in JSON format
1799fn display_json_format(rows: &[ParsedRow]) -> Result<()> {
1800    let json_rows: Vec<serde_json::Value> = rows.iter().map(|row| row.to_json()).collect();
1801
1802    println!("{}", serde_json::to_string_pretty(&json_rows)?);
1803    Ok(())
1804}
1805
1806/// Display results in CSV format
1807fn display_csv_format(column_names: &[String], rows: &[ParsedRow]) -> Result<()> {
1808    let mut wtr = csv::Writer::from_writer(std::io::stdout());
1809
1810    // Write header
1811    wtr.write_record(column_names)?;
1812
1813    // Write data rows
1814    for parsed_row in rows {
1815        let mut record = Vec::new();
1816        for column in column_names {
1817            let cell_value = parsed_row
1818                .get(column)
1819                .map(|v| v.to_string())
1820                .unwrap_or_else(|| "NULL".to_string());
1821            record.push(cell_value);
1822        }
1823        wtr.write_record(&record)?;
1824    }
1825
1826    wtr.flush()?;
1827    Ok(())
1828}
1829
/// Export the contents of an SSTable to `output_path` in the requested format.
///
/// Steps, in order: load the schema via `load_schema_file` (auto-detection
/// off, no Cassandra version hint), open the SSTable with a default
/// `cqlite_core::Config`, create the output file, print a short banner, and
/// dispatch to the format-specific exporter with a spinner progress bar.
///
/// # Errors
///
/// Propagates failures from schema loading, platform creation, opening the
/// SSTable, creating the output file, and the selected exporter.
#[cfg(feature = "state_machine")]
pub async fn export_sstable(
    sstable_path: &Path,
    schema_path: &Path,
    output_path: &Path,
    format: ExportFormat,
) -> Result<()> {
    let schema = load_schema_file(schema_path, false, None)?;

    let config = cqlite_core::Config::default();
    let platform = cqlite_core::platform::Platform::new(&config).await?;
    let reader = SSTableReader::open(sstable_path, &config, Arc::new(platform))
        .await
        .with_context(|| format!("Failed to open SSTable: {}", sstable_path.display()))?;

    let mut sink = File::create(output_path)
        .with_context(|| format!("Failed to create output file: {}", output_path.display()))?;

    println!("Exporting SSTable: {}", sstable_path.display());
    println!("Output: {} ({})", output_path.display(), format);

    let pb = ProgressBar::new_spinner();
    let spinner_style = ProgressStyle::default_spinner()
        .template("{spinner:.green} [{elapsed_precise}] {pos} rows exported")
        .unwrap();
    pb.set_style(spinner_style);

    match format {
        ExportFormat::Json => export_as_json(&reader, &schema, &mut sink, &pb).await,
        ExportFormat::Csv => export_as_csv(&reader, &schema, &mut sink, &pb).await,
        ExportFormat::Cql => export_as_cql(&reader, &schema, &mut sink, &pb).await,
        ExportFormat::Parquet => {
            // The Parquet exporter opens the output path itself, so release
            // the handle created above before handing over.
            drop(sink);
            export_as_parquet(&reader, &schema, output_path, &pb).await
        }
    }
}
1871
/// Write every SSTable entry to `output_file` as one pretty-printed JSON array.
///
/// Entries are parsed with a `RealDataParser` built from `schema`. An entry
/// that fails to parse is reported on stderr (with its 1-based position) and
/// left out of the output. The progress bar position is set to the 0-based
/// index of the entry being processed.
///
/// # Errors
///
/// Returns an error if reading the entries, serializing the array, or writing
/// to the file fails.
#[cfg(feature = "state_machine")]
async fn export_as_json(
    reader: &SSTableReader,
    schema: &TableSchema,
    output_file: &mut File,
    pb: &ProgressBar,
) -> Result<()> {
    use std::io::Write;

    let parser = RealDataParser::new(schema.clone());
    let entries = reader.get_all_entries().await?;

    let json_objects: Vec<serde_json::Value> = entries
        .iter()
        .enumerate()
        .filter_map(|(index, (_table_id, key, value))| {
            pb.set_position(index as u64);

            match parser.parse_entry(key, value) {
                Ok(parsed_row) => Some(parsed_row.to_json()),
                Err(e) => {
                    eprintln!("āš ļø  Failed to parse row {}: {}", index + 1, e);
                    None
                }
            }
        })
        .collect();

    let rendered = serde_json::to_string_pretty(&json_objects)?;
    output_file.write_all(rendered.as_bytes())?;

    pb.finish_with_message(format!("Exported {} rows to JSON", json_objects.len()));
    Ok(())
}
1906
/// Write every SSTable entry to `output_file` as CSV.
///
/// The header comes from `RealDataParser::get_column_names`; each data record
/// lists the row's values in that order, using `NULL` for columns the parsed
/// row does not contain. Entries that fail to parse are reported on stderr
/// (1-based position) and skipped.
///
/// # Errors
///
/// Returns an error if reading the entries or writing/flushing the CSV fails.
#[cfg(feature = "state_machine")]
async fn export_as_csv(
    reader: &SSTableReader,
    schema: &TableSchema,
    output_file: &mut File,
    pb: &ProgressBar,
) -> Result<()> {
    let parser = RealDataParser::new(schema.clone());
    let entries = reader.get_all_entries().await?;

    let mut wtr = csv::Writer::from_writer(output_file);
    let column_names = parser.get_column_names();

    // Header record.
    wtr.write_record(&column_names)?;

    let mut exported_count = 0;

    for (index, (_table_id, key, value)) in entries.iter().enumerate() {
        pb.set_position(index as u64);

        let parsed_row = match parser.parse_entry(key, value) {
            Ok(row) => row,
            Err(e) => {
                eprintln!("āš ļø  Failed to parse row {}: {}", index + 1, e);
                continue;
            }
        };

        let fields = column_names
            .iter()
            .map(|name| parsed_row.get(name).map_or("NULL", String::as_str));
        wtr.write_record(fields)?;
        exported_count += 1;
    }

    wtr.flush()?;
    pb.finish_with_message(format!("Exported {exported_count} rows to CSV"));
    Ok(())
}
1952
/// Export SSTable data as CQL INSERT statements
///
/// The output starts with a three-line comment header (tool name, table,
/// generation time) followed by one `INSERT INTO keyspace.table (...) VALUES
/// (...);` statement per successfully parsed entry. Columns are listed in the
/// order of `schema.columns`.
///
/// Values present in the parsed row are emitted as CQL literals: text-like,
/// temporal and `inet` columns are single-quoted with embedded quotes
/// doubled; every other type is written as the parsed text. Columns missing
/// from the parsed row are written as `NULL`.
/// NOTE(review): this assumes the strings produced by `RealDataParser` are
/// the plain (unquoted) textual form of each value — confirm once the real
/// parser replaces the stub.
///
/// Entries that fail to parse are reported on stderr (1-based position) and
/// skipped.
///
/// # Errors
///
/// Returns an error if reading the entries or writing/flushing the output
/// fails.
#[cfg(feature = "state_machine")]
async fn export_as_cql(
    reader: &SSTableReader,
    schema: &TableSchema,
    output_file: &mut File,
    pb: &ProgressBar,
) -> Result<()> {
    use std::io::{BufWriter, Write};

    /// Render one parsed cell as a CQL literal for a column of `data_type`.
    fn cql_literal(raw: &str, data_type: &str) -> String {
        let needs_quotes = matches!(
            data_type.trim().to_lowercase().as_str(),
            "text" | "varchar" | "ascii" | "inet" | "timestamp" | "date" | "time"
        );
        if needs_quotes {
            // CQL escapes a single quote inside a string by doubling it.
            format!("'{}'", raw.replace('\'', "''"))
        } else {
            raw.to_string()
        }
    }

    let parser = RealDataParser::new(schema.clone());
    let entries = reader.get_all_entries().await?;
    let column_list = parser.get_column_names().join(", ");

    // Buffer the output: one unbuffered write per row is a syscall per row.
    let mut out = BufWriter::new(&mut *output_file);

    // Write header
    writeln!(out, "-- CQL Export from CQLite")?;
    writeln!(out, "-- Table: {}.{}", schema.keyspace, schema.table)?;
    writeln!(out, "-- Generated: {}", chrono::Utc::now().to_rfc3339())?;
    writeln!(out)?;

    let mut exported_count = 0;

    for (index, (_table_id, key, value)) in entries.iter().enumerate() {
        pb.set_position(index as u64);

        match parser.parse_entry(key, value) {
            Ok(parsed_row) => {
                // Same order as `column_list`: both are derived from `schema.columns`.
                let values: Vec<String> = schema
                    .columns
                    .iter()
                    .map(|col| match parsed_row.get(&col.name) {
                        Some(raw) => cql_literal(raw, &col.data_type),
                        None => "NULL".to_string(),
                    })
                    .collect();

                writeln!(
                    out,
                    "INSERT INTO {}.{} ({}) VALUES ({});",
                    schema.keyspace,
                    schema.table,
                    column_list,
                    values.join(", ")
                )?;
                exported_count += 1;
            }
            Err(e) => {
                eprintln!("āš ļø  Failed to parse row {}: {}", index + 1, e);
            }
        }
    }

    // Flush explicitly so write errors surface here instead of being lost on drop.
    out.flush()?;

    pb.finish_with_message(format!("Exported {exported_count} rows to CQL"));
    Ok(())
}
2017
/// Export SSTable data as Parquet through the streaming Parquet writer.
///
/// All entries are fetched from `reader`, converted to query rows with
/// `convert_entry_to_query_row`, and handed to the writer in chunks of up to
/// 1000 rows. When the SSTable has no entries, a Parquet file is still
/// created and finalized (without any chunks) and the progress bar reports
/// "No data to export".
///
/// # Errors
///
/// Returns an error if reading the entries, creating the output file,
/// creating the writer, writing a chunk, or finalizing the file fails.
#[cfg(feature = "state_machine")]
async fn export_as_parquet(
    reader: &SSTableReader,
    schema: &TableSchema,
    output_path: &Path,
    pb: &ProgressBar,
) -> Result<()> {
    use crate::output::parquet::create_streaming_parquet_writer;
    use crate::output::StreamingWriter;

    let entries = reader.get_all_entries().await?;

    let nothing_to_export = entries.is_empty();
    if nothing_to_export {
        pb.finish_with_message("No data to export");
    }

    // Writer set-up is shared by the empty and non-empty cases.
    let output_file = File::create(output_path)
        .with_context(|| format!("Failed to create output file: {}", output_path.display()))?;
    let metadata = build_query_metadata_from_schema(schema);
    let mut writer = create_streaming_parquet_writer(output_file, &metadata, 10_000)
        .map_err(|e| anyhow::anyhow!("Failed to create Parquet writer: {}", e))?;

    let mut pending = Vec::with_capacity(1000);
    let mut exported_count = 0;

    for (index, (_table_id, row_key, value)) in entries.iter().enumerate() {
        pb.set_position(index as u64);
        pending.push(convert_entry_to_query_row(row_key, value, schema));

        if pending.len() < 1000 {
            continue;
        }

        // A full chunk is ready: hand it to the writer and reuse the buffer.
        writer
            .write_chunk(&pending)
            .map_err(|e| anyhow::anyhow!("Failed to write Parquet chunk: {}", e))?;
        exported_count += pending.len();
        pending.clear();
    }

    // Rows left over after the last full chunk.
    if !pending.is_empty() {
        writer
            .write_chunk(&pending)
            .map_err(|e| anyhow::anyhow!("Failed to write Parquet chunk: {}", e))?;
        exported_count += pending.len();
    }

    writer
        .finalize()
        .map_err(|e| anyhow::anyhow!("Failed to finalize Parquet: {}", e))?;

    if !nothing_to_export {
        pb.finish_with_message(format!("Exported {} rows to Parquet", exported_count));
    }
    Ok(())
}
2091
/// Build QueryMetadata from TableSchema for Parquet export
///
/// Columns are emitted in this order: partition keys, clustering keys, then
/// the entries of `schema.columns`. A column name is emitted only once:
/// schemas built by `parse_json_schema` list key columns in `schema.columns`
/// as well, and without de-duplication those keys would appear twice in the
/// metadata. `position` counts the emitted columns from zero.
///
/// Every column is marked nullable because direct SSTable export may not
/// extract all key values from the raw binary RowKey format. `table_name` is
/// `keyspace.table` and `cql_type` is left unset.
#[cfg(feature = "state_machine")]
fn build_query_metadata_from_schema(schema: &TableSchema) -> cqlite_core::query::QueryMetadata {
    use cqlite_core::query::{ColumnInfo, QueryMetadata};
    use std::collections::HashSet;

    let qualified_table = format!("{}.{}", schema.keyspace, schema.table);

    // (name, CQL type string) pairs in output order.
    let partition = schema
        .partition_keys
        .iter()
        .map(|pk| (&pk.name, &pk.data_type));
    let clustering = schema
        .clustering_keys
        .iter()
        .map(|ck| (&ck.name, &ck.data_type));
    let remaining = schema.columns.iter().map(|col| (&col.name, &col.data_type));

    let mut seen: HashSet<&str> = HashSet::new();
    let mut columns = Vec::new();
    let mut position = 0;

    for (name, data_type) in partition.chain(clustering).chain(remaining) {
        // `insert` returns false when the name was already emitted.
        if !seen.insert(name.as_str()) {
            continue;
        }

        columns.push(ColumnInfo {
            name: name.clone(),
            data_type: parse_cql_type_string(data_type),
            nullable: true,
            position,
            table_name: Some(qualified_table.clone()),
            cql_type: None,
        });
        position += 1;
    }

    QueryMetadata {
        columns,
        ..Default::default()
    }
}
2147
/// Parse CQL type string to DataType
///
/// Matching is case-insensitive and ignores surrounding whitespace. Only the
/// base name — the text before any `<...>` type parameters — is considered,
/// so `list<int>` maps to `List` while a user-defined type whose name merely
/// begins with a collection keyword (for example `settings`) is not mistaken
/// for a collection.
///
/// Unrecognized names fall back to `DataType::Text`.
#[cfg(feature = "state_machine")]
fn parse_cql_type_string(type_str: &str) -> cqlite_core::types::DataType {
    use cqlite_core::types::DataType;

    let normalized = type_str.trim().to_lowercase();
    // `split` always yields at least one item, so the fallback is never used.
    let base = normalized.split('<').next().unwrap_or("").trim_end();

    match base {
        "text" | "varchar" | "ascii" => DataType::Text,
        "int" | "integer" => DataType::Integer,
        "bigint" => DataType::BigInt,
        "smallint" => DataType::SmallInt,
        "tinyint" => DataType::TinyInt,
        "float" => DataType::Float32,
        "double" => DataType::Float,
        "boolean" => DataType::Boolean,
        "timestamp" => DataType::Timestamp,
        "date" => DataType::Timestamp, // Map date to Timestamp
        "time" => DataType::BigInt,    // Map time to BigInt (nanoseconds)
        "uuid" | "timeuuid" => DataType::Uuid,
        "blob" => DataType::Blob,
        "counter" => DataType::BigInt, // Map counter to BigInt
        "varint" => DataType::Blob,    // Map varint to Blob
        "decimal" => DataType::Text,   // Map decimal to Text (for now)
        "list" => DataType::List,
        "set" => DataType::Set,
        "map" => DataType::Map,
        "frozen" => DataType::Frozen,
        "tuple" => DataType::Tuple,
        _ => DataType::Text, // Default fallback
    }
}
2178
/// Turn one SSTable entry into a `QueryRow` for Parquet export.
///
/// * A `Value::Map` is read as column-name → value pairs; only pairs whose
///   key is `Value::Text` are kept.
/// * Any other value is stored, unchanged, under the name of the first entry
///   of `schema.columns` (or dropped if the schema has no columns).
///
/// Afterwards every partition key, clustering key and column named in the
/// schema that still has no value is filled with `Value::Null`. The row key
/// is cloned into the result and the row metadata is the default.
#[cfg(feature = "state_machine")]
fn convert_entry_to_query_row(
    row_key: &cqlite_core::RowKey,
    value: &cqlite_core::Value,
    schema: &TableSchema,
) -> cqlite_core::query::QueryRow {
    use cqlite_core::query::{QueryRow, RowMetadata};
    use cqlite_core::Value;
    use std::collections::HashMap;

    let mut values: HashMap<String, Value> = HashMap::new();

    if let Value::Map(pairs) = value {
        // Each pair is (column name, column value); non-text keys are ignored.
        for (k, v) in pairs {
            if let Value::Text(col_name) = k {
                values.insert(col_name.clone(), v.clone());
            }
        }
    } else if let Some(first_col) = schema.columns.first() {
        // Blob, text and every other non-map value are attached to the first column.
        values.insert(first_col.name.clone(), value.clone());
    }

    // Make sure every schema-declared name is present, defaulting to Null.
    let declared_names = schema
        .partition_keys
        .iter()
        .map(|pk| &pk.name)
        .chain(schema.clustering_keys.iter().map(|ck| &ck.name))
        .chain(schema.columns.iter().map(|col| &col.name));
    for name in declared_names {
        values.entry(name.clone()).or_insert(Value::Null);
    }

    QueryRow {
        values,
        key: row_key.clone(),
        metadata: RowMetadata::default(),
    }
}
2238
2239/// Enhanced SSTable reader with interactive features, progress tracking, and export
2240pub async fn read_sstable_enhanced(
2241    sstable_path: &Path,
2242    schema_path: &Path,
2243    limit: Option<usize>,
2244    skip: Option<usize>,
2245    generation: Option<u32>,
2246    format: OutputFormat,
2247    auto_detect: bool,
2248    cassandra_version: Option<String>,
2249    interactive: bool,
2250    progress: bool,
2251    export: Option<PathBuf>,
2252) -> Result<()> {
2253    println!("šŸš€ Enhanced SSTable Reader");
2254    println!("šŸ“‚ SSTable: {}", sstable_path.display());
2255    println!("šŸ“‹ Schema: {}", schema_path.display());
2256
2257    if interactive {
2258        println!("šŸ” Interactive mode enabled - use Ctrl+C to exit");
2259    }
2260
2261    if progress {
2262        println!("šŸ“Š Progress tracking enabled");
2263    }
2264
2265    if let Some(ref export_path) = export {
2266        println!("šŸ“¤ Export enabled to: {}", export_path.display());
2267    }
2268
2269    // Use the existing read_sstable function as base
2270    let result = read_sstable(
2271        sstable_path,
2272        schema_path,
2273        limit,
2274        skip,
2275        generation,
2276        format,
2277        auto_detect,
2278        cassandra_version,
2279    )
2280    .await;
2281
2282    // TODO: Add interactive features when needed
2283    // TODO: Add enhanced progress tracking
2284    // TODO: Add export functionality
2285
2286    if interactive {
2287        println!("\nšŸ” Interactive mode features coming soon!");
2288        println!("   - Filter data interactively");
2289        println!("   - Navigate through pages");
2290        println!("   - Query-like interface");
2291    }
2292
2293    if let Some(export_path) = export {
2294        println!("\nšŸ“¤ Export functionality coming soon!");
2295        println!("   Target: {}", export_path.display());
2296        println!("   Formats: JSON, CSV, Parquet");
2297    }
2298
2299    result
2300}
2301
2302/// Validate SSTable format, integrity, and data consistency
2303pub async fn validate_sstable(
2304    sstable_path: &Path,
2305    schema_path: Option<&Path>,
2306    deep: bool,
2307    fix: bool,
2308    report_path: Option<&Path>,
2309) -> Result<()> {
2310    println!("šŸ” SSTable Validation");
2311    println!("šŸ“‚ SSTable: {}", sstable_path.display());
2312
2313    if let Some(schema) = schema_path {
2314        println!("šŸ“‹ Schema: {}", schema.display());
2315    }
2316
2317    if deep {
2318        println!("šŸ”¬ Deep validation enabled (thorough but slower)");
2319    }
2320
2321    if fix {
2322        println!("šŸ”§ Auto-fix enabled for recoverable issues");
2323    }
2324
2325    if let Some(report) = report_path {
2326        println!("šŸ“‹ Report will be saved to: {}", report.display());
2327    }
2328
2329    // Smart path resolution
2330    let actual_sstable_path = resolve_sstable_path(sstable_path)?;
2331    println!("šŸ“„ Data file: {}", actual_sstable_path.display());
2332
2333    let mut issues_found = 0;
2334    let issues_fixed = 0;
2335    let mut validation_errors = Vec::new();
2336
2337    // Basic file existence and readability
2338    println!("\nšŸ” Basic file validation:");
2339    if !actual_sstable_path.exists() {
2340        let error = "āŒ SSTable file does not exist";
2341        println!("{error}");
2342        validation_errors.push(error.to_string());
2343        issues_found += 1;
2344    } else {
2345        println!("āœ… SSTable file exists");
2346
2347        // Check file permissions
2348        match std::fs::metadata(&actual_sstable_path) {
2349            Ok(metadata) => {
2350                println!("āœ… File readable (size: {} bytes)", metadata.len());
2351
2352                if metadata.len() == 0 {
2353                    let error = "āš ļø  Warning: SSTable file is empty";
2354                    println!("{error}");
2355                    validation_errors.push(error.to_string());
2356                    issues_found += 1;
2357                }
2358            }
2359            Err(e) => {
2360                let error = format!("āŒ Cannot read file metadata: {e}");
2361                println!("{error}");
2362                validation_errors.push(error);
2363                issues_found += 1;
2364            }
2365        }
2366    }
2367
2368    // Try loading with bulletproof reader
2369    println!("\nšŸ” Format validation:");
2370    match BulletproofReader::open(&actual_sstable_path) {
2371        Ok(mut reader) => {
2372            println!("āœ… SSTable format is readable");
2373
2374            let info = reader.info();
2375            println!("   Format: {:?}", info.format);
2376            println!("   Generation: {}", info.generation_numeric().unwrap_or(0));
2377            println!("   Size: {} bytes", info.size);
2378
2379            if let Some(compression) = reader.compression_info() {
2380                println!(
2381                    "   Compression: {} (chunk size: {})",
2382                    compression.algorithm, compression.chunk_length
2383                );
2384            }
2385
2386            // Deep validation
2387            if deep {
2388                println!("\nšŸ”¬ Deep validation:");
2389                match reader.parse_sstable_data() {
2390                    Ok(entries) => {
2391                        println!("āœ… Successfully parsed {} entries", entries.len());
2392
2393                        // Validate data consistency if schema provided
2394                        if let Some(schema_path) = schema_path {
2395                            match load_schema_file(schema_path, true, None) {
2396                                Ok(schema) => {
2397                                    println!("āœ… Schema loaded successfully");
2398                                    let parser = RealDataParser::new(schema);
2399
2400                                    let mut parsing_errors = 0;
2401                                    for entry in entries.iter() {
2402                                        let key = entry.key.clone();
2403                                        let value =
2404                                            cqlite_core::Value::Text(format!("{:?}", entry.key));
2405
2406                                        if parser.parse_entry(&key, &value).is_err() {
2407                                            parsing_errors += 1;
2408                                        }
2409                                    }
2410
2411                                    if parsing_errors > 0 {
2412                                        let error = format!(
2413                                            "āš ļø  {parsing_errors} entries failed schema validation"
2414                                        );
2415                                        println!("{error}");
2416                                        validation_errors.push(error);
2417                                        issues_found += parsing_errors;
2418                                    } else {
2419                                        println!("āœ… All entries match schema");
2420                                    }
2421                                }
2422                                Err(e) => {
2423                                    let error =
2424                                        format!("āš ļø  Could not load schema for validation: {e}");
2425                                    println!("{error}");
2426                                    validation_errors.push(error);
2427                                }
2428                            }
2429                        }
2430                    }
2431                    Err(e) => {
2432                        let error = format!("āŒ Failed to parse SSTable data: {e}");
2433                        println!("{error}");
2434                        validation_errors.push(error);
2435                        issues_found += 1;
2436                    }
2437                }
2438            }
2439        }
2440        Err(e) => {
2441            let error = format!("āŒ Cannot open SSTable with bulletproof reader: {e}");
2442            println!("{error}");
2443            validation_errors.push(error);
2444            issues_found += 1;
2445        }
2446    }
2447
2448    // Generate report
2449    if let Some(report_path) = report_path {
2450        let mut report_content = format!(
2451            "# SSTable Validation Report\n\n\
2452            **File:** {}\n\
2453            **Validation Time:** {}\n\
2454            **Deep Validation:** {}\n\
2455            **Auto-fix Enabled:** {}\n\n\
2456            ## Summary\n\
2457            - Issues Found: {}\n\
2458            - Issues Fixed: {}\n\n\
2459            ## Details\n",
2460            sstable_path.display(),
2461            chrono::Utc::now().to_rfc3339(),
2462            deep,
2463            fix,
2464            issues_found,
2465            issues_fixed
2466        );
2467
2468        for error in &validation_errors {
2469            report_content.push_str(&format!("- {error}\n"));
2470        }
2471
2472        std::fs::write(report_path, report_content)
2473            .with_context(|| format!("Failed to write report to {}", report_path.display()))?;
2474
2475        println!("\nšŸ“‹ Validation report saved to: {}", report_path.display());
2476    }
2477
2478    // Summary
2479    println!("\nšŸ“Š Validation Summary:");
2480    println!("   Issues found: {issues_found}");
2481    println!("   Issues fixed: {issues_fixed}");
2482
2483    if issues_found == 0 {
2484        println!("āœ… SSTable validation passed!");
2485    } else if fix && issues_fixed == issues_found {
2486        println!("šŸ”§ All issues fixed!");
2487    } else {
2488        println!("āš ļø  {} issues remain", issues_found - issues_fixed);
2489    }
2490
2491    Ok(())
2492}
2493
2494/// Analyze SSTable structure, statistics, and performance characteristics
2495pub async fn analyze_sstable(
2496    sstable_path: &Path,
2497    schema_path: Option<&Path>,
2498    detailed: bool,
2499    infer_schema: bool,
2500    report_path: Option<&Path>,
2501) -> Result<()> {
2502    println!("šŸ“Š SSTable Analysis");
2503    println!("šŸ“‚ SSTable: {}", sstable_path.display());
2504
2505    if let Some(schema) = schema_path {
2506        println!("šŸ“‹ Schema: {}", schema.display());
2507    }
2508
2509    if detailed {
2510        println!("šŸ” Detailed analysis enabled");
2511    }
2512
2513    if infer_schema {
2514        println!("🧠 Schema inference enabled");
2515    }
2516
2517    if let Some(report) = report_path {
2518        println!("šŸ“‹ Report will be saved to: {}", report.display());
2519    }
2520
2521    // Smart path resolution
2522    let actual_sstable_path = resolve_sstable_path(sstable_path)?;
2523    println!("šŸ“„ Data file: {}", actual_sstable_path.display());
2524
2525    let mut analysis_results = Vec::new();
2526
2527    // File-level analysis
2528    println!("\nšŸ“ File Analysis:");
2529    match std::fs::metadata(&actual_sstable_path) {
2530        Ok(metadata) => {
2531            let file_size = metadata.len();
2532            println!(
2533                "   File size: {} bytes ({:.2} MB)",
2534                file_size,
2535                file_size as f64 / 1_048_576.0
2536            );
2537            analysis_results.push(format!("File size: {file_size} bytes"));
2538
2539            if let Ok(created) = metadata.created() {
2540                println!("   Created: {created:?}");
2541            }
2542            if let Ok(modified) = metadata.modified() {
2543                println!("   Modified: {modified:?}");
2544            }
2545        }
2546        Err(e) => {
2547            println!("āŒ Cannot read file metadata: {e}");
2548            return Err(anyhow::anyhow!("File metadata not accessible"));
2549        }
2550    }
2551
2552    // Format analysis
2553    println!("\nšŸ” Format Analysis:");
2554    match BulletproofReader::open(&actual_sstable_path) {
2555        Ok(mut reader) => {
2556            let info = reader.info();
2557            println!("   Format: {:?}", info.format);
2558            println!("   Generation: {}", info.generation_numeric().unwrap_or(0));
2559            println!("   Size: {} bytes", info.size);
2560
2561            analysis_results.push(format!("Format: {:?}", info.format));
2562            analysis_results.push(format!(
2563                "Generation: {}",
2564                info.generation_numeric().unwrap_or(0)
2565            ));
2566
2567            if let Some(compression) = reader.compression_info() {
2568                println!("   Compression: {}", compression.algorithm);
2569                println!("   Chunk length: {} bytes", compression.chunk_length);
2570                analysis_results.push(format!("Compression: {}", compression.algorithm));
2571            } else {
2572                println!("   Compression: None");
2573                analysis_results.push("Compression: None".to_string());
2574            }
2575
2576            // Data analysis
2577            println!("\nšŸ“Š Data Analysis:");
2578            match reader.parse_sstable_data() {
2579                Ok(entries) => {
2580                    let entry_count = entries.len();
2581                    println!("   Total entries: {entry_count}");
2582                    analysis_results.push(format!("Total entries: {entry_count}"));
2583
2584                    if entry_count > 0 {
2585                        // Calculate average key size
2586                        let total_key_size: usize =
2587                            entries.iter().map(|e| format!("{:?}", e.key).len()).sum();
2588                        let avg_key_size = total_key_size / entry_count;
2589                        println!("   Average key size: {avg_key_size} bytes");
2590                        analysis_results.push(format!("Average key size: {avg_key_size} bytes"));
2591
2592                        // Show sample entries
2593                        println!("\nšŸ“‹ Sample Entries (first 5):");
2594                        for (i, entry) in entries.iter().take(5).enumerate() {
2595                            println!(
2596                                "   {}. Key: {:?}, Info: {}",
2597                                i + 1,
2598                                entry.key,
2599                                entry.format_info
2600                            );
2601                        }
2602                    }
2603
2604                    // Detailed analysis
2605                    if detailed {
2606                        println!("\nšŸ” Detailed Statistics:");
2607
2608                        // Key distribution analysis
2609                        let mut key_lengths = entries
2610                            .iter()
2611                            .map(|e| format!("{:?}", e.key).len())
2612                            .collect::<Vec<_>>();
2613                        key_lengths.sort_unstable();
2614
2615                        if !key_lengths.is_empty() {
2616                            let min_key_len = key_lengths[0];
2617                            let max_key_len = key_lengths[key_lengths.len() - 1];
2618                            let median_key_len = key_lengths[key_lengths.len() / 2];
2619
2620                            println!(
2621                                "   Key length min/max/median: {min_key_len}/{max_key_len}/{median_key_len}"
2622                            );
2623                            analysis_results.push(format!(
2624                                "Key lengths - min: {min_key_len}, max: {max_key_len}, median: {median_key_len}"
2625                            ));
2626                        }
2627
2628                        // TODO: Add more detailed statistics
2629                        println!("   šŸ“Š Advanced statistics coming soon!");
2630                    }
2631
2632                    // Schema inference
2633                    if infer_schema {
2634                        println!("\n🧠 Schema Inference:");
2635                        // TODO: Implement schema inference logic
2636                        println!("   🚧 Schema inference coming soon!");
2637                        analysis_results
2638                            .push("Schema inference: Feature in development".to_string());
2639                    }
2640                }
2641                Err(e) => {
2642                    println!("āŒ Failed to parse SSTable data: {e}");
2643                    analysis_results.push(format!("Parse error: {e}"));
2644                }
2645            }
2646        }
2647        Err(e) => {
2648            println!("āŒ Cannot open SSTable: {e}");
2649            return Err(anyhow::anyhow!("Cannot analyze SSTable: {}", e));
2650        }
2651    }
2652
2653    // Generate report
2654    if let Some(report_path) = report_path {
2655        let mut report_content = format!(
2656            "# SSTable Analysis Report\n\n\
2657            **File:** {}\n\
2658            **Analysis Time:** {}\n\
2659            **Detailed Analysis:** {}\n\
2660            **Schema Inference:** {}\n\n\
2661            ## Results\n",
2662            sstable_path.display(),
2663            chrono::Utc::now().to_rfc3339(),
2664            detailed,
2665            infer_schema
2666        );
2667
2668        for result in &analysis_results {
2669            report_content.push_str(&format!("- {result}\n"));
2670        }
2671
2672        std::fs::write(report_path, report_content)
2673            .with_context(|| format!("Failed to write report to {}", report_path.display()))?;
2674
2675        println!("\nšŸ“‹ Analysis report saved to: {}", report_path.display());
2676    }
2677
2678    println!("\nāœ… Analysis completed!");
2679
2680    Ok(())
2681}
2682
2683/// Benchmark SSTable read performance with various operations
2684pub async fn benchmark_sstable(
2685    sstable_path: &Path,
2686    schema_path: Option<&Path>,
2687    iterations: u32,
2688    operations: &str,
2689    report_path: Option<&Path>,
2690    memory_profile: bool,
2691) -> Result<()> {
2692    println!("šŸ SSTable Performance Benchmark");
2693    println!("šŸ“‚ SSTable: {}", sstable_path.display());
2694
2695    if let Some(schema) = schema_path {
2696        println!("šŸ“‹ Schema: {}", schema.display());
2697    }
2698
2699    println!("šŸ”„ Iterations: {iterations}");
2700    println!("šŸŽÆ Operations: {operations}");
2701
2702    if memory_profile {
2703        println!("šŸ“Š Memory profiling enabled");
2704    }
2705
2706    if let Some(report) = report_path {
2707        println!("šŸ“‹ Report will be saved to: {}", report.display());
2708    }
2709
2710    // Smart path resolution
2711    let actual_sstable_path = resolve_sstable_path(sstable_path)?;
2712    println!("šŸ“„ Data file: {}", actual_sstable_path.display());
2713
2714    let mut benchmark_results = Vec::new();
2715
2716    // Parse operations list
2717    let ops: Vec<&str> = if operations == "all" {
2718        vec!["read", "scan", "query"]
2719    } else {
2720        operations.split(',').map(|s| s.trim()).collect()
2721    };
2722
2723    println!("\nšŸš€ Starting benchmarks...");
2724
2725    for op in &ops {
2726        println!("\nšŸ“Š Benchmarking operation: {op}");
2727
2728        let mut times = Vec::new();
2729        let mut memory_usage = Vec::new();
2730
2731        for i in 1..=iterations {
2732            print!("   Iteration {i}/{iterations}: ");
2733
2734            let start_time = std::time::Instant::now();
2735            let initial_memory = if memory_profile {
2736                // TODO: Implement memory measurement
2737                0u64
2738            } else {
2739                0u64
2740            };
2741
2742            // Perform the operation
2743            let result = match *op {
2744                "read" => benchmark_read_operation(&actual_sstable_path).await,
2745                "scan" => benchmark_scan_operation(&actual_sstable_path).await,
2746                "query" => benchmark_query_operation(&actual_sstable_path, schema_path).await,
2747                _ => {
2748                    println!("āŒ Unknown operation: {op}");
2749                    continue;
2750                }
2751            };
2752
2753            let elapsed = start_time.elapsed();
2754            let final_memory = if memory_profile {
2755                // TODO: Implement memory measurement
2756                0u64
2757            } else {
2758                0u64
2759            };
2760
2761            match result {
2762                Ok(entries_processed) => {
2763                    println!(
2764                        "āœ… {}ms ({} entries)",
2765                        elapsed.as_millis(),
2766                        entries_processed
2767                    );
2768                    times.push(elapsed.as_millis() as f64);
2769                    if memory_profile {
2770                        memory_usage.push(final_memory.saturating_sub(initial_memory));
2771                    }
2772                }
2773                Err(e) => {
2774                    println!("āŒ Failed: {e}");
2775                }
2776            }
2777        }
2778
2779        // Calculate statistics
2780        if !times.is_empty() {
2781            times.sort_by(|a, b| a.partial_cmp(b).unwrap());
2782            let min_time = times[0];
2783            let max_time = times[times.len() - 1];
2784            let avg_time = times.iter().sum::<f64>() / times.len() as f64;
2785            let median_time = times[times.len() / 2];
2786
2787            println!("\n   šŸ“Š {op} Statistics:");
2788            println!("      Min time: {min_time:.2}ms");
2789            println!("      Max time: {max_time:.2}ms");
2790            println!("      Avg time: {avg_time:.2}ms");
2791            println!("      Median time: {median_time:.2}ms");
2792
2793            benchmark_results.push(format!(
2794                "{op}: min={min_time:.2}ms, max={max_time:.2}ms, avg={avg_time:.2}ms, median={median_time:.2}ms"
2795            ));
2796
2797            if memory_profile && !memory_usage.is_empty() {
2798                let avg_memory = memory_usage.iter().sum::<u64>() / memory_usage.len() as u64;
2799                println!("      Avg memory: {avg_memory} bytes");
2800                benchmark_results.push(format!("{op}: avg_memory={avg_memory}bytes"));
2801            }
2802        }
2803    }
2804
2805    // Generate report
2806    if let Some(report_path) = report_path {
2807        let mut report_content = format!(
2808            "# SSTable Benchmark Report\n\n\
2809            **File:** {}\n\
2810            **Benchmark Time:** {}\n\
2811            **Iterations:** {}\n\
2812            **Operations:** {}\n\
2813            **Memory Profiling:** {}\n\n\
2814            ## Results\n",
2815            sstable_path.display(),
2816            chrono::Utc::now().to_rfc3339(),
2817            iterations,
2818            operations,
2819            memory_profile
2820        );
2821
2822        for result in &benchmark_results {
2823            report_content.push_str(&format!("- {result}\n"));
2824        }
2825
2826        std::fs::write(report_path, report_content)
2827            .with_context(|| format!("Failed to write report to {}", report_path.display()))?;
2828
2829        println!("\nšŸ“‹ Benchmark report saved to: {}", report_path.display());
2830    }
2831
2832    println!("\nšŸ† Benchmark completed!");
2833
2834    Ok(())
2835}
2836
2837/// Benchmark read operation (open and basic info)
2838async fn benchmark_read_operation(sstable_path: &Path) -> Result<usize> {
2839    let reader = BulletproofReader::open(sstable_path).with_context(|| "Failed to open SSTable")?;
2840
2841    let _info = reader.info();
2842    Ok(1) // Return 1 as we processed the file info
2843}
2844
2845/// Benchmark scan operation (iterate through all entries)
2846async fn benchmark_scan_operation(sstable_path: &Path) -> Result<usize> {
2847    let mut reader =
2848        BulletproofReader::open(sstable_path).with_context(|| "Failed to open SSTable")?;
2849
2850    match reader.parse_sstable_data() {
2851        Ok(entries) => Ok(entries.len()),
2852        Err(_) => {
2853            // Fallback to basic read
2854            let _info = reader.info();
2855            Ok(0)
2856        }
2857    }
2858}
2859
2860/// Benchmark query operation (with schema parsing if available)
2861async fn benchmark_query_operation(
2862    sstable_path: &Path,
2863    schema_path: Option<&Path>,
2864) -> Result<usize> {
2865    let mut reader =
2866        BulletproofReader::open(sstable_path).with_context(|| "Failed to open SSTable")?;
2867
2868    match reader.parse_sstable_data() {
2869        Ok(entries) => {
2870            if let Some(schema_path) = schema_path {
2871                match load_schema_file(schema_path, true, None) {
2872                    Ok(schema) => {
2873                        let parser = RealDataParser::new(schema);
2874                        let mut parsed_count = 0;
2875
2876                        for entry in &entries {
2877                            let key = entry.key.clone();
2878                            let value = cqlite_core::Value::Text(format!("{:?}", entry.key));
2879
2880                            if parser.parse_entry(&key, &value).is_ok() {
2881                                parsed_count += 1;
2882                            }
2883                        }
2884
2885                        Ok(parsed_count)
2886                    }
2887                    Err(_) => Ok(entries.len()), // Fallback to just entry count
2888                }
2889            } else {
2890                Ok(entries.len())
2891            }
2892        }
2893        Err(_) => Ok(0),
2894    }
2895}