Skip to main content

cqlite_cli/output/
mod.rs

//! Output formatting for `QueryResult`
//!
//! This module provides writers that adapt `QueryResult` to various output formats
//! (table, JSON, CSV) with stable, cqlsh-compatible formatting.
//!
//! ## Contract
//!
//! All writers follow the QUERY_RESULT_CONTRACT.md specification:
//! - Column order determined by `metadata.columns`
//! - Null values handled consistently
//! - Format-specific conventions (e.g., row count footer for tables)
12
13use std::fs::{self, File};
14use std::io::Write;
15use std::path::{Path, PathBuf};
16
17#[cfg(feature = "state_machine")]
18pub mod csv;
19#[cfg(feature = "state_machine")]
20pub mod json;
21#[cfg(feature = "state_machine")]
22pub mod parquet;
23#[cfg(feature = "state_machine")]
24pub mod table;
25pub mod value_fmt;
26
27#[cfg(feature = "state_machine")]
28pub use csv::{CSVWriter, StreamingCSVWriter};
29#[cfg(feature = "state_machine")]
30pub use json::{JSONWriter, StreamingJSONWriter};
31#[cfg(feature = "state_machine")]
32#[allow(unused_imports)]
33pub use parquet::{
34    create_streaming_parquet_writer, create_streaming_parquet_writer_from_writer, ParquetWriter,
35};
36#[cfg(feature = "state_machine")]
37#[allow(unused_imports)]
38pub use table::TableWriter;
39#[allow(unused_imports)]
40pub use value_fmt::ValueFormatter;
41
// ============================================================================
// Output Target Types (Issue #279)
// ============================================================================
45
/// Target destination for query output.
///
/// Defaults to [`OutputTarget::Stdout`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum OutputTarget {
    /// Write to stdout (default, backward compatible)
    #[default]
    Stdout,
    /// Write to a file at the specified path
    File(PathBuf),
}

impl OutputTarget {
    /// Returns `true` if this is a file target.
    pub fn is_file(&self) -> bool {
        matches!(self, OutputTarget::File(_))
    }

    /// Returns the file path if this is a file target, `None` for stdout.
    // Kept for callers and tests even where the current build does not use it.
    #[allow(dead_code)]
    pub fn path(&self) -> Option<&PathBuf> {
        match self {
            OutputTarget::File(p) => Some(p),
            OutputTarget::Stdout => None,
        }
    }
}
71
/// Error type for output operations.
///
/// `Display` produces the user-facing message; `source()` exposes the
/// underlying I/O error for the [`OutputError::Io`] variant only.
#[derive(Debug)]
pub enum OutputError {
    /// File I/O error
    Io(std::io::Error),
    /// Directory does not exist
    DirectoryNotFound(PathBuf),
    /// File already exists and overwrite not allowed
    FileExists(PathBuf),
    /// Parquet requires file output
    ParquetRequiresFile,
}

impl std::fmt::Display for OutputError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            OutputError::Io(e) => write!(f, "I/O error: {}", e),
            OutputError::DirectoryNotFound(p) => {
                write!(f, "Directory not found: {}", p.display())
            }
            OutputError::FileExists(p) => {
                write!(
                    f,
                    "File already exists: {}. Use --overwrite to replace.",
                    p.display()
                )
            }
            OutputError::ParquetRequiresFile => {
                // The trailing `\` continuations strip the following
                // indentation, so the emitted message has no leading spaces.
                write!(
                    f,
                    "Parquet format requires file output.\n\n\
                     Use --output/-o to specify an output file:\n\
                     cqlite --out parquet --output results.parquet -e 'SELECT * FROM table'"
                )
            }
        }
    }
}

impl std::error::Error for OutputError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // Exhaustive on purpose: adding a variant that wraps another error
        // should force a decision here instead of silently returning `None`.
        match self {
            OutputError::Io(e) => Some(e),
            OutputError::DirectoryNotFound(_)
            | OutputError::FileExists(_)
            | OutputError::ParquetRequiresFile => None,
        }
    }
}
119
// ============================================================================
// Streaming Writer Interface (Issue #280)
// ============================================================================
123
124#[cfg(feature = "state_machine")]
125use cqlite_core::query::{QueryMetadata, QueryRow};
126
/// Trait for writers that support streaming/chunked output
///
/// Unlike batch writers that receive a complete `QueryResult`, streaming writers
/// process data incrementally via `write_header()`, `write_chunk()`, and `finalize()`.
/// This enables processing of arbitrarily large result sets within the 128MB memory budget.
///
/// # Memory Budget
///
/// To stay within the 128MB target:
/// - Chunk sizes should typically be 5,000-10,000 rows
/// - For large blob/text columns, use smaller chunks (1,000-5,000)
/// - Parquet writers buffer rows for row groups; default is 10,000 rows
///
/// # Contract
///
/// 1. `write_header()` MUST be called exactly once before any `write_chunk()` calls
/// 2. `write_chunk()` MAY be called zero or more times
/// 3. `finalize()` MUST be called exactly once to complete the output
/// 4. After `finalize()`, no further calls are allowed
/// 5. Implementors SHOULD return errors rather than panic on contract violations
///
/// # Troubleshooting
///
/// If you encounter OOM errors:
/// 1. Reduce chunk sizes when calling `write_chunk()` (max: 10,000 rows)
/// 2. For Parquet, reduce `row_group_size` (default: 10,000 rows)
/// 3. Check for large blob/text columns that inflate row size
///
/// # Example
///
/// ```ignore
/// let mut writer = StreamingCSVWriter::new(file);
/// writer.write_header(&metadata)?;
///
/// for chunk in result_stream.chunks(10_000) {
///     writer.write_chunk(&chunk)?;
/// }
///
/// writer.finalize()?;
/// ```
#[cfg(feature = "state_machine")]
pub trait StreamingWriter: Send {
    /// Initialize writer with column metadata (write header if applicable)
    ///
    /// Called once before any data is written. For CSV, this writes the header row.
    /// For Parquet, this initializes the Arrow schema.
    ///
    /// # Errors
    ///
    /// Returns an [`OutputError`] if the header cannot be written.
    fn write_header(&mut self, metadata: &QueryMetadata) -> Result<(), OutputError>;

    /// Write a chunk of rows (called multiple times during streaming)
    ///
    /// Returns the number of rows actually written (may differ from input if
    /// the writer buffers internally, e.g., Parquet row groups).
    ///
    /// # Errors
    ///
    /// Returns an [`OutputError`] if the chunk cannot be written.
    fn write_chunk(&mut self, rows: &[QueryRow]) -> Result<usize, OutputError>;

    /// Finalize output (flush buffers, write footer, close resources)
    ///
    /// Called once after all data has been written. For Parquet, this writes
    /// the final row group and file footer.
    ///
    /// # Errors
    ///
    /// Returns an [`OutputError`] if finalization fails.
    fn finalize(&mut self) -> Result<(), OutputError>;

    /// Get count of rows written so far
    ///
    /// Intended for progress reporting; not yet integrated into export functions.
    // dead_code: no export function calls this yet (see doc above).
    #[allow(dead_code)]
    fn rows_written(&self) -> u64;

    /// Get bytes written so far (if trackable)
    ///
    /// Intended for progress reporting; not yet integrated into export functions.
    /// The default implementation returns `None`.
    // dead_code: no export function calls this yet (see doc above).
    #[allow(dead_code)]
    fn bytes_written(&self) -> Option<u64> {
        None
    }
}
201
/// Progress information for streaming operations
// dead_code: progress reporting is not wired into the export paths yet.
#[derive(Debug, Clone, Default)]
#[allow(dead_code)]
pub struct StreamingProgress {
    /// Total rows written so far
    pub rows_written: u64,
    /// Total bytes written so far (if known)
    pub bytes_written: Option<u64>,
    /// Elapsed time in milliseconds
    pub elapsed_ms: u64,
    /// Estimated total rows (if known)
    pub total_rows_hint: Option<u64>,
}

#[allow(dead_code)]
impl StreamingProgress {
    /// Calculate progress percentage (if total is known)
    ///
    /// Returns `None` when `total_rows_hint` is `None`. A hint of `0` is
    /// reported as `100.0` to avoid dividing by zero. The result is not
    /// clamped, so it exceeds `100.0` when `rows_written` is larger than the
    /// hint.
    pub fn percent(&self) -> Option<f64> {
        self.total_rows_hint.map(|total| {
            if total == 0 {
                100.0
            } else {
                (self.rows_written as f64 / total as f64) * 100.0
            }
        })
    }

    /// Calculate rows per second throughput
    ///
    /// Returns `0.0` when `elapsed_ms` is `0` rather than dividing by zero.
    pub fn rows_per_second(&self) -> f64 {
        if self.elapsed_ms == 0 {
            0.0
        } else {
            (self.rows_written as f64) / (self.elapsed_ms as f64 / 1000.0)
        }
    }
}
238
239/// Write output to target (stdout or file) with atomic file writes
240///
241/// For file targets, this uses atomic write semantics:
242/// 1. Write to a temporary file in the same directory
243/// 2. Sync to disk for durability
244/// 3. Rename to final path (atomic on POSIX systems)
245///
246/// # Arguments
247/// * `content` - The bytes to write
248/// * `target` - Where to write (stdout or file path)
249/// * `overwrite` - If true, overwrite existing files; if false, error on existing
250///
251/// # Errors
252/// Returns `OutputError` on I/O failures, missing directories, or file exists conflicts
253pub fn write_to_target(
254    content: &[u8],
255    target: &OutputTarget,
256    overwrite: bool,
257) -> Result<(), OutputError> {
258    match target {
259        OutputTarget::Stdout => {
260            std::io::stdout()
261                .write_all(content)
262                .map_err(OutputError::Io)?;
263            std::io::stdout().flush().map_err(OutputError::Io)
264        }
265        OutputTarget::File(path) => write_atomic(path, content, overwrite),
266    }
267}
268
269/// Write to file atomically (write to temp, rename on success)
270///
271/// This ensures that interrupted writes don't leave partial files.
272fn write_atomic(path: &Path, content: &[u8], overwrite: bool) -> Result<(), OutputError> {
273    // Validate parent directory exists
274    if let Some(parent) = path.parent() {
275        if !parent.as_os_str().is_empty() && !parent.exists() {
276            return Err(OutputError::DirectoryNotFound(parent.to_path_buf()));
277        }
278    }
279
280    // Check if file exists when overwrite is false
281    if !overwrite && path.exists() {
282        return Err(OutputError::FileExists(path.to_path_buf()));
283    }
284
285    // Generate temp file path in same directory (required for atomic rename)
286    let extension = path
287        .extension()
288        .unwrap_or_default()
289        .to_string_lossy()
290        .to_string();
291    let temp_path = path.with_extension(format!("{}.tmp.{}", extension, std::process::id()));
292
293    // Write to temp file
294    let write_result = (|| {
295        let mut file = File::create(&temp_path).map_err(OutputError::Io)?;
296        file.write_all(content).map_err(OutputError::Io)?;
297        file.sync_all().map_err(OutputError::Io)?;
298        Ok(())
299    })();
300
301    // If write failed, clean up temp file
302    if let Err(e) = write_result {
303        let _ = fs::remove_file(&temp_path);
304        return Err(e);
305    }
306
307    // Atomic rename (on POSIX, this is atomic if same filesystem)
308    fs::rename(&temp_path, path).map_err(|e| {
309        // Clean up temp file on rename failure
310        let _ = fs::remove_file(&temp_path);
311        OutputError::Io(e)
312    })
313}
314
/// Unit tests for output targets, error formatting and atomic file writes.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn test_output_target_default_is_stdout() {
        let target = OutputTarget::default();
        assert!(matches!(target, OutputTarget::Stdout));
        assert!(!target.is_file());
    }

    #[test]
    fn test_output_target_file() {
        // The path is only used as a value here; nothing is written to it.
        let target = OutputTarget::File(PathBuf::from("/tmp/test.json"));
        assert!(target.is_file());
        assert_eq!(target.path(), Some(&PathBuf::from("/tmp/test.json")));
    }

    #[test]
    fn test_write_to_file_creates_file() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("output.json");

        let content = b"test content";
        let target = OutputTarget::File(file_path.clone());

        write_to_target(content, &target, false).unwrap();

        assert!(file_path.exists());
        let written = std::fs::read_to_string(&file_path).unwrap();
        assert_eq!(written, "test content");
    }

    #[test]
    fn test_write_to_file_respects_overwrite_false() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("existing.json");

        // Create existing file
        std::fs::write(&file_path, "existing content").unwrap();

        let content = b"new content";
        let target = OutputTarget::File(file_path.clone());

        let result = write_to_target(content, &target, false);
        assert!(matches!(result, Err(OutputError::FileExists(_))));

        // Original content should be preserved
        let written = std::fs::read_to_string(&file_path).unwrap();
        assert_eq!(written, "existing content");
    }

    #[test]
    fn test_write_to_file_respects_overwrite_true() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("existing.json");

        // Create existing file
        std::fs::write(&file_path, "existing content").unwrap();

        let content = b"new content";
        let target = OutputTarget::File(file_path.clone());

        write_to_target(content, &target, true).unwrap();

        // Content should be replaced
        let written = std::fs::read_to_string(&file_path).unwrap();
        assert_eq!(written, "new content");
    }

    #[test]
    fn test_write_to_file_missing_directory_error() {
        // Build the missing path under a fresh temp dir so the test does not
        // depend on what exists at the filesystem root.
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("missing").join("output.json");
        let content = b"test content";
        let target = OutputTarget::File(file_path);

        let result = write_to_target(content, &target, false);
        assert!(matches!(result, Err(OutputError::DirectoryNotFound(_))));
    }

    #[test]
    fn test_write_atomic_no_temp_file_on_success() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("output.json");

        let content = b"test content";
        write_atomic(&file_path, content, false).unwrap();

        // Check no temp files left behind
        let entries: Vec<_> = std::fs::read_dir(temp_dir.path())
            .unwrap()
            .filter_map(|e| e.ok())
            .collect();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].file_name(), "output.json");
    }

    #[test]
    fn test_output_error_display() {
        let io_err = OutputError::Io(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "test error",
        ));
        assert!(io_err.to_string().contains("I/O error"));

        let dir_err = OutputError::DirectoryNotFound(PathBuf::from("/missing"));
        assert!(dir_err.to_string().contains("Directory not found"));

        let exists_err = OutputError::FileExists(PathBuf::from("/existing"));
        assert!(exists_err.to_string().contains("File already exists"));
        assert!(exists_err.to_string().contains("--overwrite"));

        let parquet_err = OutputError::ParquetRequiresFile;
        assert!(parquet_err.to_string().contains("Parquet format requires"));
        assert!(parquet_err.to_string().contains("--output"));
    }
}