use crate::types::EtlError;
use crate::types::Result;
use arrow::ipc::writer::FileWriter;
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use std::fs::File;
use std::path::Path;
/// Writes `batches` to a Parquet file at `path`, creating or truncating it.
///
/// The schema of the first batch is used for the whole file, so every batch
/// is expected to share that schema. Default [`WriterProperties`] are used.
///
/// # Errors
///
/// Returns [`EtlError::Config`] if `batches` is empty. The input is validated
/// before the file is opened, so in that case nothing is created and any
/// existing file at `path` is left untouched. Otherwise this propagates (via
/// `?`) I/O errors from creating the file and errors reported by the Parquet
/// writer while writing batches or closing the file.
pub fn to_parquet<P: AsRef<Path>>(batches: &[RecordBatch], path: P) -> Result<()> {
    // Check for input before touching the filesystem: creating the file first
    // would truncate/leave behind a zero-byte, invalid Parquet file on error.
    let schema = batches
        .first()
        .ok_or_else(|| EtlError::Config("No batches to write".to_string()))?
        .schema();
    let file = File::create(path.as_ref())?;
    let props = WriterProperties::builder().build();
    let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
    for batch in batches {
        writer.write(batch)?;
    }
    // `close` completes the file; its returned metadata is not needed here.
    writer.close()?;
    Ok(())
}
/// Writes `batches` to an Arrow IPC *file* (random-access format) at `path`,
/// creating or truncating it.
///
/// The schema of the first batch is used for the whole file, so every batch
/// is expected to share that schema.
///
/// # Errors
///
/// Returns [`EtlError::Config`] if `batches` is empty. The input is validated
/// before the file is opened, so in that case nothing is created and any
/// existing file at `path` is left untouched. Otherwise this propagates (via
/// `?`) I/O errors from creating the file and errors reported by the IPC
/// writer while writing batches or finishing the file.
pub fn to_arrow_ipc<P: AsRef<Path>>(batches: &[RecordBatch], path: P) -> Result<()> {
    // Check for input before touching the filesystem: creating the file first
    // would truncate/leave behind an empty, invalid IPC file on error.
    let schema = batches
        .first()
        .ok_or_else(|| EtlError::Config("No batches to write".to_string()))?
        .schema();
    let file = File::create(path.as_ref())?;
    let mut writer = FileWriter::try_new(file, schema.as_ref())?;
    for batch in batches {
        writer.write(batch)?;
    }
    // `finish` completes the IPC file after the last batch.
    writer.finish()?;
    Ok(())
}
/// Incremental writer that appends [`RecordBatch`]es to a file one at a time.
///
/// Despite its name, the output format is Parquet: this wraps a Parquet
/// [`ArrowWriter`] built with default [`WriterProperties`]. Call
/// [`StreamingArrowWriter::finish`] once all batches are written. The file is
/// completed by the underlying `close` call, so a writer dropped without
/// `finish` presumably leaves an incomplete file (NOTE(review): confirm
/// against the parquet `ArrowWriter` drop behavior).
pub struct StreamingArrowWriter {
    // Parquet writer that owns the destination file handle.
    writer: ArrowWriter<File>,
}

impl StreamingArrowWriter {
    /// Creates (or truncates) the file at `path` and prepares a Parquet writer
    /// for batches conforming to `schema`. The schema is cloned into an `Arc`.
    ///
    /// # Errors
    ///
    /// Propagates I/O errors from creating the file and errors from
    /// constructing the Parquet writer.
    pub fn new<P: AsRef<Path>>(path: P, schema: &arrow::datatypes::Schema) -> Result<Self> {
        let sink = File::create(path.as_ref())?;
        let properties = WriterProperties::builder().build();
        let shared_schema = std::sync::Arc::new(schema.clone());
        Ok(Self {
            writer: ArrowWriter::try_new(sink, shared_schema, Some(properties))?,
        })
    }

    /// Appends one batch to the file.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the Parquet writer.
    pub fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
        Ok(self.writer.write(batch)?)
    }

    /// Consumes the writer and closes the underlying Parquet file.
    ///
    /// The file metadata returned by the close call is discarded.
    ///
    /// # Errors
    ///
    /// Propagates any error reported while closing the Parquet writer.
    pub fn finish(self) -> Result<()> {
        let Self { writer } = self;
        writer.close()?;
        Ok(())
    }
}