Skip to main content

cqlite_cli/commands/
write.rs

1//! Write command handlers for CLI write operations (Issue #392)
2//!
3//! This module provides command handlers for write operations including:
4//! - Mutation writes (JSON format)
5//! - Maintenance (compaction) operations
6//! - Write engine statistics
7//! - SSTable export
8//!
9//! All handlers require the `write-support` feature flag.
10
11#[cfg(feature = "write-support")]
12use anyhow::Context;
13use anyhow::Result;
14#[cfg(feature = "write-support")]
15use std::io::{BufRead, BufReader};
16#[allow(unused_imports)]
17use std::path::Path;
18#[cfg(feature = "write-support")]
19use std::time::{Duration, Instant};
20
21#[cfg(feature = "write-support")]
22use cqlite_core::storage::write_engine::{ExportOptions, MaintenanceReport, Mutation, WriteEngine};
23
24/// Result of a single write operation
25#[derive(Debug)]
26pub struct WriteResult {
27    /// Number of rows affected
28    pub rows_affected: u64,
29    /// Execution time in milliseconds
30    pub execution_time_ms: f64,
31}
32
33impl WriteResult {
34    /// Display the result to stdout
35    pub fn display(&self) {
36        println!(
37            "OK: {} row(s) affected ({:.1}ms)",
38            self.rows_affected, self.execution_time_ms
39        );
40    }
41}
42
43/// Result of a batch write operation
44#[derive(Debug)]
45pub struct BatchWriteResult {
46    /// Total number of rows affected
47    pub total_rows: u64,
48    /// Number of successful writes
49    pub successful_writes: u64,
50    /// Number of failed writes
51    pub failed_writes: u64,
52    /// Total execution time in milliseconds
53    pub execution_time_ms: f64,
54}
55
56impl BatchWriteResult {
57    /// Display the result to stdout
58    pub fn display(&self) {
59        println!(
60            "Batch complete: {} row(s) affected ({} succeeded, {} failed) in {:.1}ms",
61            self.total_rows, self.successful_writes, self.failed_writes, self.execution_time_ms
62        );
63    }
64}
65
66/// Write engine statistics
67#[derive(Debug)]
68pub struct WriteStats {
69    /// Current memtable size in bytes
70    pub memtable_size: usize,
71    /// Current memtable row count
72    pub memtable_rows: usize,
73    /// Current WAL size in bytes
74    pub wal_size: u64,
75    /// Current SSTable generation number
76    pub generation: u64,
77}
78
79impl WriteStats {
80    /// Display the statistics to stdout
81    pub fn display(&self) {
82        println!("Write Engine Statistics:");
83        println!("  Memtable size: {} bytes", self.memtable_size);
84        println!("  Memtable rows: {}", self.memtable_rows);
85        println!("  WAL size: {} bytes", self.wal_size);
86        println!("  Generation: {}", self.generation);
87    }
88}
89
90/// Export operation report
91#[derive(Debug)]
92pub struct ExportResult {
93    /// Output path of the Data.db file
94    pub output_path: std::path::PathBuf,
95    /// Number of rows exported
96    pub row_count: u64,
97    /// Size of the Data.db file in bytes
98    pub data_file_size: u64,
99    /// Total execution time in milliseconds
100    pub execution_time_ms: f64,
101}
102
103impl ExportResult {
104    /// Display the result to stdout
105    pub fn display(&self) {
106        println!("Export complete:");
107        println!("  Output: {}", self.output_path.display());
108        println!("  Rows: {}", self.row_count);
109        println!("  Size: {} bytes", self.data_file_size);
110        println!("  Time: {:.1}ms", self.execution_time_ms);
111    }
112}
113
114/// Handle a single mutation write from JSON
115///
116/// # Arguments
117///
118/// * `write_engine` - The write engine to use
119/// * `mutation_json` - JSON string representing the mutation
120///
121/// # Returns
122///
123/// A WriteResult on success, or an error if the mutation fails
124#[cfg(feature = "write-support")]
125pub async fn handle_mutation_write(
126    write_engine: &mut WriteEngine,
127    mutation_json: &str,
128) -> Result<WriteResult> {
129    let start = Instant::now();
130
131    // Parse the mutation from JSON
132    let mutation: Mutation =
133        serde_json::from_str(mutation_json).with_context(|| "Failed to parse mutation JSON")?;
134
135    // Write the mutation
136    write_engine
137        .write_async(mutation)
138        .await
139        .with_context(|| "Failed to write mutation")?;
140
141    Ok(WriteResult {
142        rows_affected: 1,
143        execution_time_ms: start.elapsed().as_secs_f64() * 1000.0,
144    })
145}
146
147/// Handle a file containing mutations in JSONL format
148///
149/// # Arguments
150///
151/// * `write_engine` - The write engine to use
152/// * `file_path` - Path to the JSONL file
153///
154/// # Returns
155///
156/// A BatchWriteResult on success, or an error if file reading fails
157#[cfg(feature = "write-support")]
158pub async fn handle_mutations_file(
159    write_engine: &mut WriteEngine,
160    file_path: &Path,
161) -> Result<BatchWriteResult> {
162    let start = Instant::now();
163
164    let file = std::fs::File::open(file_path)
165        .with_context(|| format!("Failed to open mutations file: {}", file_path.display()))?;
166
167    let reader = BufReader::new(file);
168    let mut successful_writes = 0u64;
169    let mut failed_writes = 0u64;
170    let mut line_number = 0u64;
171
172    for line in reader.lines() {
173        line_number += 1;
174        let line = line
175            .with_context(|| format!("Failed to read line {} from mutations file", line_number))?;
176
177        let trimmed = line.trim();
178        if trimmed.is_empty() || trimmed.starts_with('#') {
179            // Skip empty lines and comments
180            continue;
181        }
182
183        match serde_json::from_str::<Mutation>(trimmed) {
184            Ok(mutation) => match write_engine.write_async(mutation).await {
185                Ok(()) => {
186                    successful_writes += 1;
187                }
188                Err(e) => {
189                    eprintln!("Line {}: Write failed: {}", line_number, e);
190                    failed_writes += 1;
191                }
192            },
193            Err(e) => {
194                eprintln!("Line {}: Invalid JSON: {}", line_number, e);
195                failed_writes += 1;
196            }
197        }
198    }
199
200    Ok(BatchWriteResult {
201        total_rows: successful_writes,
202        successful_writes,
203        failed_writes,
204        execution_time_ms: start.elapsed().as_secs_f64() * 1000.0,
205    })
206}
207
208/// Handle the maintenance subcommand
209///
210/// # Arguments
211///
212/// * `write_engine` - The write engine to use
213/// * `budget_ms` - Time budget in milliseconds
214///
215/// # Returns
216///
217/// A MaintenanceReport on success
218#[cfg(feature = "write-support")]
219pub fn handle_maintenance(
220    write_engine: &mut WriteEngine,
221    budget_ms: u64,
222) -> Result<MaintenanceReport> {
223    let budget = Duration::from_millis(budget_ms);
224    write_engine
225        .maintenance_step(budget)
226        .with_context(|| "Maintenance step failed")
227}
228
229/// Display a maintenance report
230#[cfg(feature = "write-support")]
231pub fn display_maintenance_report(report: &MaintenanceReport) {
232    println!("Maintenance complete:");
233    println!("  Time spent: {:?}", report.time_spent);
234    println!("  Rows merged: {}", report.rows_merged);
235    println!("  Bytes written: {} bytes", report.bytes_written);
236    println!("  Pending compaction: {}", report.pending_compaction);
237    if !report.completed_merges.is_empty() {
238        println!("  Completed merges:");
239        for path in &report.completed_merges {
240            println!("    - {}", path.display());
241        }
242    }
243}
244
245/// Handle the write-stats subcommand
246///
247/// # Arguments
248///
249/// * `write_engine` - The write engine to query
250///
251/// # Returns
252///
253/// WriteStats containing current engine statistics
254#[cfg(feature = "write-support")]
255pub fn handle_write_stats(write_engine: &WriteEngine) -> Result<WriteStats> {
256    Ok(WriteStats {
257        memtable_size: write_engine.memtable_size(),
258        memtable_rows: write_engine.memtable_row_count(),
259        wal_size: write_engine.wal_size(),
260        generation: write_engine.generation(),
261    })
262}
263
264/// Handle the export-sstable subcommand
265///
266/// # Arguments
267///
268/// * `write_engine` - The write engine to export from
269/// * `output_dir` - Output directory for the SSTable files
270/// * `keyspace` - Keyspace name for the exported SSTable
271/// * `table` - Table name for the exported SSTable
272/// * `compact` - Run compaction before export to merge multiple SSTables
273/// * `skip_validate` - Skip validation after export
274///
275/// # Returns
276///
277/// An ExportResult on success
278#[cfg(feature = "write-support")]
279pub async fn handle_export(
280    write_engine: &mut WriteEngine,
281    output_dir: &Path,
282    keyspace: &str,
283    table: &str,
284    compact: bool,
285    skip_validate: bool,
286) -> Result<ExportResult> {
287    let start = Instant::now();
288
289    // If --compact was requested, run maintenance_step() before export
290    if compact {
291        let budget = std::time::Duration::from_secs(300); // 5-minute budget
292        write_engine
293            .maintenance_step(budget)
294            .with_context(|| "Compaction before export failed")?;
295    }
296
297    // Use the current generation from the write engine
298    let generation = write_engine.generation();
299
300    let mut options = ExportOptions::new(keyspace, table, generation);
301    if skip_validate {
302        options = options.skip_validation();
303    }
304
305    let report = write_engine
306        .export_sstable(output_dir, options)
307        .await
308        .with_context(|| "SSTable export failed")?;
309
310    Ok(ExportResult {
311        output_path: report.output_path,
312        row_count: report.row_count,
313        data_file_size: report.data_file_size,
314        execution_time_ms: start.elapsed().as_secs_f64() * 1000.0,
315    })
316}
317
318/// Handle the flush operation
319///
320/// # Arguments
321///
322/// * `write_engine` - The write engine to flush
323///
324/// # Returns
325///
326/// Ok with SSTableInfo if data was flushed, or None if memtable was empty
327#[cfg(feature = "write-support")]
328pub async fn handle_flush(
329    write_engine: &mut WriteEngine,
330) -> Result<Option<cqlite_core::storage::sstable::writer::SSTableInfo>> {
331    write_engine
332        .flush()
333        .await
334        .with_context(|| "Flush operation failed")
335}
336
337/// Display the result of a flush operation
338#[cfg(feature = "write-support")]
339pub fn display_flush_result(info: Option<&cqlite_core::storage::sstable::writer::SSTableInfo>) {
340    match info {
341        Some(info) => {
342            println!(
343                "Flushed: {} partitions, {} bytes",
344                info.partition_count, info.data_size
345            );
346            println!("  Output: {}", info.data_path.display());
347        }
348        None => {
349            println!("Nothing to flush (memtable empty)");
350        }
351    }
352}
353
354// Stubs for when write-support feature is not enabled
355#[cfg(not(feature = "write-support"))]
356pub async fn handle_mutation_write(
357    _write_engine: &mut (),
358    _mutation_json: &str,
359) -> Result<WriteResult> {
360    Err(anyhow::anyhow!(
361        "Write support is not enabled. Build with --features write-support to enable write operations."
362    ))
363}
364
365#[cfg(not(feature = "write-support"))]
366pub async fn handle_mutations_file(
367    _write_engine: &mut (),
368    _file_path: &Path,
369) -> Result<BatchWriteResult> {
370    Err(anyhow::anyhow!(
371        "Write support is not enabled. Build with --features write-support to enable write operations."
372    ))
373}
374
375#[cfg(not(feature = "write-support"))]
376pub fn handle_maintenance(_write_engine: &mut (), _budget_ms: u64) -> Result<()> {
377    Err(anyhow::anyhow!(
378        "Write support is not enabled. Build with --features write-support to enable write operations."
379    ))
380}
381
382#[cfg(not(feature = "write-support"))]
383pub fn handle_write_stats(_write_engine: &()) -> Result<WriteStats> {
384    Err(anyhow::anyhow!(
385        "Write support is not enabled. Build with --features write-support to enable write operations."
386    ))
387}
388
389#[cfg(not(feature = "write-support"))]
390pub async fn handle_export(
391    _write_engine: &mut (),
392    _output_dir: &Path,
393    _keyspace: &str,
394    _table: &str,
395    _compact: bool,
396    _skip_validate: bool,
397) -> Result<ExportResult> {
398    Err(anyhow::anyhow!(
399        "Write support is not enabled. Build with --features write-support to enable write operations."
400    ))
401}
402
403#[cfg(not(feature = "write-support"))]
404pub async fn handle_flush(_write_engine: &mut ()) -> Result<Option<()>> {
405    Err(anyhow::anyhow!(
406        "Write support is not enabled. Build with --features write-support to enable write operations."
407    ))
408}