1#[cfg(feature = "write-support")]
12use anyhow::Context;
13use anyhow::Result;
14#[cfg(feature = "write-support")]
15use std::io::{BufRead, BufReader};
16#[allow(unused_imports)]
17use std::path::Path;
18#[cfg(feature = "write-support")]
19use std::time::{Duration, Instant};
20
21#[cfg(feature = "write-support")]
22use cqlite_core::storage::write_engine::{ExportOptions, MaintenanceReport, Mutation, WriteEngine};
23
/// Outcome of a single write operation.
#[derive(Debug, Clone, PartialEq)]
pub struct WriteResult {
    /// Number of rows the operation reported as affected.
    pub rows_affected: u64,
    /// Wall-clock duration of the operation, in milliseconds.
    pub execution_time_ms: f64,
}

impl WriteResult {
    /// Prints the one-line summary (see the [`std::fmt::Display`] impl) to stdout.
    pub fn display(&self) {
        println!("{}", self);
    }
}

/// User-facing summary; kept separate from `display()` so the text can be
/// tested or written somewhere other than stdout.
impl std::fmt::Display for WriteResult {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "OK: {} row(s) affected ({:.1}ms)",
            self.rows_affected, self.execution_time_ms
        )
    }
}
42
/// Outcome of applying a batch of mutations.
#[derive(Debug, Clone, PartialEq)]
pub struct BatchWriteResult {
    /// Row count reported as "affected" in the summary line.
    pub total_rows: u64,
    /// Number of mutations that were written successfully.
    pub successful_writes: u64,
    /// Number of mutations that failed.
    pub failed_writes: u64,
    /// Wall-clock duration of the whole batch, in milliseconds.
    pub execution_time_ms: f64,
}

impl BatchWriteResult {
    /// Prints the one-line batch summary (see the [`std::fmt::Display`] impl) to stdout.
    pub fn display(&self) {
        println!("{}", self);
    }
}

/// User-facing summary; kept separate from `display()` so the text can be
/// tested or written somewhere other than stdout.
impl std::fmt::Display for BatchWriteResult {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Batch complete: {} row(s) affected ({} succeeded, {} failed) in {:.1}ms",
            self.total_rows, self.successful_writes, self.failed_writes, self.execution_time_ms
        )
    }
}
65
/// Snapshot of write-engine counters.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WriteStats {
    /// Memtable size, printed in bytes.
    pub memtable_size: usize,
    /// Number of rows in the memtable.
    pub memtable_rows: usize,
    /// WAL size, printed in bytes.
    pub wal_size: u64,
    /// Generation number reported by the engine.
    pub generation: u64,
}

impl WriteStats {
    /// Prints the multi-line statistics report (see the [`std::fmt::Display`] impl) to stdout.
    pub fn display(&self) {
        println!("{}", self);
    }
}

/// User-facing report. The last line carries no trailing newline; `display()`
/// adds it via `println!`.
impl std::fmt::Display for WriteStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "Write Engine Statistics:")?;
        writeln!(f, " Memtable size: {} bytes", self.memtable_size)?;
        writeln!(f, " Memtable rows: {}", self.memtable_rows)?;
        writeln!(f, " WAL size: {} bytes", self.wal_size)?;
        write!(f, " Generation: {}", self.generation)
    }
}
89
/// Outcome of an SSTable export.
#[derive(Debug, Clone, PartialEq)]
pub struct ExportResult {
    /// Location the export was written to.
    pub output_path: std::path::PathBuf,
    /// Number of rows exported.
    pub row_count: u64,
    /// Size of the exported data file, printed in bytes.
    pub data_file_size: u64,
    /// Wall-clock duration of the export, in milliseconds.
    pub execution_time_ms: f64,
}

impl ExportResult {
    /// Prints the multi-line export report (see the [`std::fmt::Display`] impl) to stdout.
    pub fn display(&self) {
        println!("{}", self);
    }
}

/// User-facing report. The last line carries no trailing newline; `display()`
/// adds it via `println!`.
impl std::fmt::Display for ExportResult {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "Export complete:")?;
        writeln!(f, " Output: {}", self.output_path.display())?;
        writeln!(f, " Rows: {}", self.row_count)?;
        writeln!(f, " Size: {} bytes", self.data_file_size)?;
        write!(f, " Time: {:.1}ms", self.execution_time_ms)
    }
}
113
/// Parses one JSON-encoded mutation and writes it through the engine.
///
/// On success the result always reports a single affected row, together with
/// the elapsed wall-clock time in milliseconds (JSON parsing included).
///
/// # Errors
///
/// Fails when `mutation_json` does not deserialize into a `Mutation`, or when
/// the engine's `write_async` call returns an error.
#[cfg(feature = "write-support")]
pub async fn handle_mutation_write(
    write_engine: &mut WriteEngine,
    mutation_json: &str,
) -> Result<WriteResult> {
    let timer = Instant::now();

    let parsed = serde_json::from_str::<Mutation>(mutation_json)
        .context("Failed to parse mutation JSON")?;

    write_engine
        .write_async(parsed)
        .await
        .context("Failed to write mutation")?;

    let execution_time_ms = timer.elapsed().as_secs_f64() * 1000.0;
    Ok(WriteResult {
        rows_affected: 1,
        execution_time_ms,
    })
}
146
/// Applies every mutation listed in a file, one JSON document per line.
///
/// Blank lines and lines whose first non-whitespace character is `#` are
/// skipped. A line that fails to parse or fails to write is reported on
/// stderr with its 1-based line number and counted as failed; processing then
/// continues with the next line.
///
/// In the returned summary `total_rows` equals the number of successful writes.
///
/// # Errors
///
/// Fails only when the file cannot be opened or a line cannot be read.
// NOTE(review): this reads the file with blocking `std::fs`/`std::io` calls
// inside an `async fn` — confirm that is acceptable for the calling runtime.
#[cfg(feature = "write-support")]
pub async fn handle_mutations_file(
    write_engine: &mut WriteEngine,
    file_path: &Path,
) -> Result<BatchWriteResult> {
    let timer = Instant::now();

    let source = std::fs::File::open(file_path)
        .with_context(|| format!("Failed to open mutations file: {}", file_path.display()))?;

    let mut succeeded = 0u64;
    let mut failed = 0u64;

    // Pair each line with its 1-based number; skipped lines still consume a number.
    for (line_number, raw) in (1u64..).zip(BufReader::new(source).lines()) {
        let raw = raw
            .with_context(|| format!("Failed to read line {} from mutations file", line_number))?;

        let entry = raw.trim();
        if entry.is_empty() || entry.starts_with('#') {
            continue;
        }

        let mutation = match serde_json::from_str::<Mutation>(entry) {
            Ok(parsed) => parsed,
            Err(e) => {
                eprintln!("Line {}: Invalid JSON: {}", line_number, e);
                failed += 1;
                continue;
            }
        };

        match write_engine.write_async(mutation).await {
            Ok(()) => succeeded += 1,
            Err(e) => {
                eprintln!("Line {}: Write failed: {}", line_number, e);
                failed += 1;
            }
        }
    }

    Ok(BatchWriteResult {
        total_rows: succeeded,
        successful_writes: succeeded,
        failed_writes: failed,
        execution_time_ms: timer.elapsed().as_secs_f64() * 1000.0,
    })
}
207
/// Runs one maintenance step on the engine with a time budget of `budget_ms`
/// milliseconds and returns the engine's report.
///
/// # Errors
///
/// Propagates any failure of `maintenance_step`, wrapped with context.
#[cfg(feature = "write-support")]
pub fn handle_maintenance(
    write_engine: &mut WriteEngine,
    budget_ms: u64,
) -> Result<MaintenanceReport> {
    write_engine
        .maintenance_step(Duration::from_millis(budget_ms))
        .context("Maintenance step failed")
}
228
/// Prints a maintenance report to stdout.
///
/// The list of completed merges is printed only when it is non-empty.
#[cfg(feature = "write-support")]
pub fn display_maintenance_report(report: &MaintenanceReport) {
    println!("Maintenance complete:");
    println!(" Time spent: {:?}", report.time_spent);
    println!(" Rows merged: {}", report.rows_merged);
    println!(" Bytes written: {} bytes", report.bytes_written);
    println!(" Pending compaction: {}", report.pending_compaction);

    // Nothing more to show when no merge finished during this step.
    if report.completed_merges.is_empty() {
        return;
    }

    println!(" Completed merges:");
    for merged in &report.completed_merges {
        println!(" - {}", merged.display());
    }
}
244
/// Collects the engine's current counters into a [`WriteStats`] snapshot.
///
/// This implementation always returns `Ok`; it only reads accessor values
/// from the engine.
#[cfg(feature = "write-support")]
pub fn handle_write_stats(write_engine: &WriteEngine) -> Result<WriteStats> {
    let memtable_size = write_engine.memtable_size();
    let memtable_rows = write_engine.memtable_row_count();
    let wal_size = write_engine.wal_size();
    let generation = write_engine.generation();

    Ok(WriteStats {
        memtable_size,
        memtable_rows,
        wal_size,
        generation,
    })
}
263
/// Exports the engine's data as an SSTable under `output_dir`.
///
/// * `keyspace`, `table` — passed to `ExportOptions::new` along with the
///   engine's current generation.
/// * `compact` — when `true`, one maintenance step with a 300-second budget
///   is run before exporting.
/// * `skip_validate` — when `true`, `skip_validation()` is applied to the
///   export options.
///
/// The reported time covers the optional maintenance step as well as the
/// export itself.
///
/// # Errors
///
/// Fails when the pre-export maintenance step or the export itself fails.
#[cfg(feature = "write-support")]
pub async fn handle_export(
    write_engine: &mut WriteEngine,
    output_dir: &Path,
    keyspace: &str,
    table: &str,
    compact: bool,
    skip_validate: bool,
) -> Result<ExportResult> {
    // Time budget handed to the maintenance step that precedes the export.
    const COMPACTION_BUDGET: Duration = Duration::from_secs(300);

    let timer = Instant::now();

    if compact {
        write_engine
            .maintenance_step(COMPACTION_BUDGET)
            .context("Compaction before export failed")?;
    }

    // The generation is read only after the optional maintenance step has run.
    let base_options = ExportOptions::new(keyspace, table, write_engine.generation());
    let options = if skip_validate {
        base_options.skip_validation()
    } else {
        base_options
    };

    let exported = write_engine
        .export_sstable(output_dir, options)
        .await
        .context("SSTable export failed")?;

    Ok(ExportResult {
        output_path: exported.output_path,
        row_count: exported.row_count,
        data_file_size: exported.data_file_size,
        execution_time_ms: timer.elapsed().as_secs_f64() * 1000.0,
    })
}
317
/// Asks the engine to flush and returns whatever `WriteEngine::flush` yields.
///
/// # Errors
///
/// Propagates any flush failure, wrapped with context.
#[cfg(feature = "write-support")]
pub async fn handle_flush(
    write_engine: &mut WriteEngine,
) -> Result<Option<cqlite_core::storage::sstable::writer::SSTableInfo>> {
    let flushed = write_engine.flush().await;
    flushed.context("Flush operation failed")
}
336
/// Prints the outcome of a flush to stdout.
///
/// With `Some(info)` the partition count, data size and data path are shown;
/// with `None` a "nothing to flush" notice is printed instead.
#[cfg(feature = "write-support")]
pub fn display_flush_result(info: Option<&cqlite_core::storage::sstable::writer::SSTableInfo>) {
    if let Some(sstable) = info {
        println!(
            "Flushed: {} partitions, {} bytes",
            sstable.partition_count, sstable.data_size
        );
        println!(" Output: {}", sstable.data_path.display());
    } else {
        println!("Nothing to flush (memtable empty)");
    }
}
353
354#[cfg(not(feature = "write-support"))]
356pub async fn handle_mutation_write(
357 _write_engine: &mut (),
358 _mutation_json: &str,
359) -> Result<WriteResult> {
360 Err(anyhow::anyhow!(
361 "Write support is not enabled. Build with --features write-support to enable write operations."
362 ))
363}
364
365#[cfg(not(feature = "write-support"))]
366pub async fn handle_mutations_file(
367 _write_engine: &mut (),
368 _file_path: &Path,
369) -> Result<BatchWriteResult> {
370 Err(anyhow::anyhow!(
371 "Write support is not enabled. Build with --features write-support to enable write operations."
372 ))
373}
374
375#[cfg(not(feature = "write-support"))]
376pub fn handle_maintenance(_write_engine: &mut (), _budget_ms: u64) -> Result<()> {
377 Err(anyhow::anyhow!(
378 "Write support is not enabled. Build with --features write-support to enable write operations."
379 ))
380}
381
382#[cfg(not(feature = "write-support"))]
383pub fn handle_write_stats(_write_engine: &()) -> Result<WriteStats> {
384 Err(anyhow::anyhow!(
385 "Write support is not enabled. Build with --features write-support to enable write operations."
386 ))
387}
388
389#[cfg(not(feature = "write-support"))]
390pub async fn handle_export(
391 _write_engine: &mut (),
392 _output_dir: &Path,
393 _keyspace: &str,
394 _table: &str,
395 _compact: bool,
396 _skip_validate: bool,
397) -> Result<ExportResult> {
398 Err(anyhow::anyhow!(
399 "Write support is not enabled. Build with --features write-support to enable write operations."
400 ))
401}
402
403#[cfg(not(feature = "write-support"))]
404pub async fn handle_flush(_write_engine: &mut ()) -> Result<Option<()>> {
405 Err(anyhow::anyhow!(
406 "Write support is not enabled. Build with --features write-support to enable write operations."
407 ))
408}