use std::io::Write;
use std::sync::Arc;
use arrow::array::RecordBatchReader;
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::file::properties::WriterProperties;
use crate::ColumnarError;
/// Incremental Parquet writer that accepts Arrow [`RecordBatch`]es one at a time
/// and encodes them into the sink `W`.
///
/// Output is finalised only by [`ColumnarStreamWriter::finish`], which calls
/// `ArrowWriter::close`. This type has no `Drop` impl, so callers should always
/// call `finish` to complete the file.
pub struct ColumnarStreamWriter<W: Write + Send> {
    inner: ArrowWriter<W>,
}

impl<W: Write + Send> ColumnarStreamWriter<W> {
    /// Creates a writer for batches matching `schema` that encodes into `sink`.
    ///
    /// If `props` is `None`, writer properties are derived from the schema via
    /// `crate::writer::build_writer_props`.
    ///
    /// # Errors
    /// Returns an error (converted via `From`) if the underlying
    /// [`ArrowWriter`] cannot be constructed.
    pub fn new(
        schema: Arc<Schema>,
        sink: W,
        props: Option<WriterProperties>,
    ) -> Result<Self, ColumnarError> {
        let props = match props {
            Some(explicit) => explicit,
            None => crate::writer::build_writer_props(&schema),
        };
        // `schema` is not needed after this point, so hand the Arc over directly.
        let inner = ArrowWriter::try_new(sink, schema, Some(props))?;
        Ok(Self { inner })
    }

    /// Encodes one batch into the output.
    ///
    /// # Errors
    /// Returns `ColumnarError::Parquet` if the underlying writer rejects or fails
    /// to encode the batch.
    pub fn write_batch(&mut self, batch: &RecordBatch) -> Result<(), ColumnarError> {
        match self.inner.write(batch) {
            Ok(()) => Ok(()),
            Err(err) => Err(ColumnarError::Parquet(err)),
        }
    }

    /// Flushes pending data and closes the underlying writer, consuming `self`.
    ///
    /// The file metadata returned by `ArrowWriter::close` is discarded.
    ///
    /// # Errors
    /// Returns `ColumnarError::Parquet` if closing the writer fails.
    pub fn finish(self) -> Result<(), ColumnarError> {
        match self.inner.close() {
            Ok(_file_metadata) => Ok(()),
            Err(err) => Err(ColumnarError::Parquet(err)),
        }
    }
}
pub struct ColumnarStreamReader {
inner: parquet::arrow::arrow_reader::ParquetRecordBatchReader,
}
impl ColumnarStreamReader {
pub fn from_bytes(bytes: Vec<u8>) -> Result<Self, ColumnarError> {
let cursor = Bytes::from(bytes);
let builder = ParquetRecordBatchReaderBuilder::try_new(cursor)?;
let inner = builder.build()?;
Ok(ColumnarStreamReader { inner })
}
pub fn schema(&self) -> Arc<Schema> {
self.inner.schema()
}
}
impl Iterator for ColumnarStreamReader {
type Item = Result<RecordBatch, ColumnarError>;
fn next(&mut self) -> Option<Self::Item> {
self.inner.next().map(|r| r.map_err(ColumnarError::Arrow))
}
}