use crate::config::OutputConfig;
use crate::output::{OutputError, StreamingWriter};
use cqlite_core::export::parquet::{
ParquetExportError, ParquetExportOptions, ParquetWriter as CoreParquetWriter,
StreamingParquetWriter as CoreStreamingParquetWriter,
};
use cqlite_core::query::{QueryMetadata, QueryResult, QueryRow};
use std::error::Error as StdError;
use std::fs::File;
use std::io::Write;
fn map_parquet_err(e: ParquetExportError) -> OutputError {
OutputError::Io(std::io::Error::other(e.to_string()))
}
fn options_from_config(config: &OutputConfig) -> ParquetExportOptions {
ParquetExportOptions {
row_limit: config.limit,
..Default::default()
}
}
pub struct ParquetWriter;
impl ParquetWriter {
pub fn write(
result: &QueryResult,
config: &OutputConfig,
) -> Result<Vec<u8>, Box<dyn StdError>> {
CoreParquetWriter::write(result, &options_from_config(config))
.map_err(|e| Box::new(e) as Box<dyn StdError>)
}
}
pub struct StreamingParquetWriter<W: Write + Send> {
inner: CoreStreamingParquetWriter<W>,
}
impl<W: Write + Send> StreamingWriter for StreamingParquetWriter<W> {
fn write_header(&mut self, _metadata: &QueryMetadata) -> Result<(), OutputError> {
Ok(())
}
fn write_chunk(&mut self, rows: &[QueryRow]) -> Result<usize, OutputError> {
self.inner.write_chunk(rows).map_err(map_parquet_err)
}
fn finalize(&mut self) -> Result<(), OutputError> {
self.inner.finalize().map_err(map_parquet_err)
}
fn rows_written(&self) -> u64 {
self.inner.rows_written()
}
}
pub fn create_streaming_parquet_writer(
file: File,
metadata: &QueryMetadata,
row_group_size: usize,
) -> Result<StreamingParquetWriter<File>, OutputError> {
create_streaming_parquet_writer_from_writer(file, metadata, row_group_size)
}
pub fn create_streaming_parquet_writer_from_writer<W: Write + Send>(
output: W,
metadata: &QueryMetadata,
row_group_size: usize,
) -> Result<StreamingParquetWriter<W>, OutputError> {
let options = ParquetExportOptions {
row_group_size,
..Default::default()
};
let inner =
CoreStreamingParquetWriter::new(output, metadata, &options).map_err(map_parquet_err)?;
Ok(StreamingParquetWriter { inner })
}