Skip to main content

cqlite_cli/output/
parquet.rs

1//! CLI adapter for the core Parquet export writer (Epic #682)
2//!
3//! The Parquet writer implementation lives in `cqlite_core::export::parquet`
4//! (behind cqlite-core's `parquet` feature, which this crate enables).  This
5//! module is a thin boundary layer that:
6//!
7//! - exposes the historical CLI-facing API (`ParquetWriter::write` taking an
8//!   [`OutputConfig`], `create_streaming_parquet_writer*` taking a row group
9//!   size) so existing call sites and tests are unchanged,
10//! - implements the CLI [`StreamingWriter`] trait on top of the core
11//!   streaming writer, and
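//! - on the streaming path, maps [`ParquetExportError`] to the CLI
//!   [`OutputError`] (as an I/O error, matching the error rendering from
//!   before the writer was lifted into core); the batch
//!   `ParquetWriter::write` instead returns the core error boxed as
//!   `Box<dyn Error>`.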
14//!
15//! Output bytes are produced exclusively by the core writer; the adapter adds
16//! no formatting of its own, so CLI `--out parquet` output is byte-identical
17//! to the pre-lift implementation (see `tests/parquet_golden_tests.rs`).
18
19use crate::config::OutputConfig;
20use crate::output::{OutputError, StreamingWriter};
21use cqlite_core::export::parquet::{
22    ParquetExportError, ParquetExportOptions, ParquetWriter as CoreParquetWriter,
23    StreamingParquetWriter as CoreStreamingParquetWriter,
24};
25use cqlite_core::query::{QueryMetadata, QueryResult, QueryRow};
26use std::error::Error as StdError;
27use std::fs::File;
28use std::io::Write;
29
30/// Map a core Parquet export error to the CLI output error type.
31///
32/// The pre-lift implementation wrapped all Parquet failures as
33/// `OutputError::Io`, so the mapping preserves the historical rendering
34/// ("I/O error: ...").
35fn map_parquet_err(e: ParquetExportError) -> OutputError {
36    OutputError::Io(std::io::Error::other(e.to_string()))
37}
38
39/// Build core export options from the CLI output configuration.
40fn options_from_config(config: &OutputConfig) -> ParquetExportOptions {
41    ParquetExportOptions {
42        row_limit: config.limit,
43        ..Default::default()
44    }
45}
46
47/// Parquet writer for QueryResult (CLI facade)
48///
49/// Delegates to [`cqlite_core::export::parquet::ParquetWriter`].
50/// Unlike JSON/CSV writers, this returns `Vec<u8>` (binary data).
51pub struct ParquetWriter;
52
53impl ParquetWriter {
54    /// Write QueryResult to Parquet binary format
55    ///
56    /// # Arguments
57    ///
58    /// * `result` - The query result to convert to Parquet
59    /// * `config` - Output configuration for row limits
60    ///
61    /// # Returns
62    ///
63    /// Binary Parquet data or error
64    pub fn write(
65        result: &QueryResult,
66        config: &OutputConfig,
67    ) -> Result<Vec<u8>, Box<dyn StdError>> {
68        CoreParquetWriter::write(result, &options_from_config(config))
69            .map_err(|e| Box::new(e) as Box<dyn StdError>)
70    }
71}
72
73/// Streaming Parquet writer for memory-efficient export of large datasets
74///
75/// CLI wrapper around [`cqlite_core::export::parquet::StreamingParquetWriter`]
76/// implementing the CLI [`StreamingWriter`] trait.
77pub struct StreamingParquetWriter<W: Write + Send> {
78    inner: CoreStreamingParquetWriter<W>,
79}
80
81impl<W: Write + Send> StreamingWriter for StreamingParquetWriter<W> {
82    fn write_header(&mut self, _metadata: &QueryMetadata) -> Result<(), OutputError> {
83        // The core writer builds its Arrow schema and writes the Parquet file
84        // header at construction time (the constructors below take the query
85        // metadata), so there is nothing left to do here.
86        Ok(())
87    }
88
89    fn write_chunk(&mut self, rows: &[QueryRow]) -> Result<usize, OutputError> {
90        self.inner.write_chunk(rows).map_err(map_parquet_err)
91    }
92
93    fn finalize(&mut self) -> Result<(), OutputError> {
94        self.inner.finalize().map_err(map_parquet_err)
95    }
96
97    fn rows_written(&self) -> u64 {
98        self.inner.rows_written()
99    }
100}
101
102/// Create a StreamingParquetWriter that writes to a file
103///
104/// This is a convenience function that handles the file creation and
105/// ArrowWriter initialization.
106pub fn create_streaming_parquet_writer(
107    file: File,
108    metadata: &QueryMetadata,
109    row_group_size: usize,
110) -> Result<StreamingParquetWriter<File>, OutputError> {
111    create_streaming_parquet_writer_from_writer(file, metadata, row_group_size)
112}
113
114/// Create a StreamingParquetWriter that writes to any `W: Write + Send`.
115///
116/// Both constructors delegate to the same core constructor, so the schema
117/// produced by the streaming path is always identical to the schema produced
118/// by the batch `ParquetWriter`.
119pub fn create_streaming_parquet_writer_from_writer<W: Write + Send>(
120    output: W,
121    metadata: &QueryMetadata,
122    row_group_size: usize,
123) -> Result<StreamingParquetWriter<W>, OutputError> {
124    let options = ParquetExportOptions {
125        row_group_size,
126        ..Default::default()
127    };
128    let inner =
129        CoreStreamingParquetWriter::new(output, metadata, &options).map_err(map_parquet_err)?;
130    Ok(StreamingParquetWriter { inner })
131}