mod avro;
mod orc;
mod parquet;
use crate::io::{FileRead, OutputFile};
use crate::spec::{DataField, Predicate};
use crate::table::{ArrowRecordBatchStream, RowRange};
use crate::Error;
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
/// Filter predicates for reading a single data file, bundled with a list of field
/// definitions.
///
/// Passed to [`FormatFileReader::read_batch_stream`] as its optional `predicates` argument.
/// NOTE(review): `file_fields` is presumably the file's own schema that the predicates are
/// resolved against — confirm with the format readers that consume this struct.
pub(crate) struct FilePredicates {
/// Predicates to evaluate while reading the file.
/// NOTE(review): how multiple predicates are combined (e.g. AND) is decided by the
/// format readers — confirm there.
pub predicates: Vec<Predicate>,
/// Field definitions accompanying `predicates` (see the struct-level note).
pub file_fields: Vec<DataField>,
}
/// A format-specific file reader that produces a stream of Arrow record batches.
///
/// Implementations are selected by `create_format_reader` (Parquet, ORC and Avro in this
/// module). The trait requires `Send + Sync`, so a boxed reader can be shared across
/// threads/tasks.
#[async_trait]
pub(crate) trait FormatFileReader: Send + Sync {
/// Opens the file behind `reader` and returns a stream of record batches.
///
/// - `reader`: byte source for the file contents.
/// - `file_size`: total size of the file; presumably in bytes — confirm in implementations.
/// - `read_fields`: fields to read; presumably the output projection — confirm.
/// - `predicates`: optional filters; `None` presumably disables filtering — confirm.
/// - `batch_size`: optional batch-size hint; `None` presumably uses the format default.
/// - `row_selection`: optional row ranges to read; `None` presumably reads all rows.
///
/// # Errors
///
/// Returns an error if the stream cannot be set up; the exact conditions are
/// implementation-specific.
async fn read_batch_stream(
&self,
reader: Box<dyn FileRead>,
file_size: u64,
read_fields: &[DataField],
predicates: Option<&FilePredicates>,
batch_size: Option<usize>,
row_selection: Option<Vec<RowRange>>,
) -> crate::Result<ArrowRecordBatchStream>;
}
/// A format-specific writer that accepts Arrow record batches.
///
/// Created by `create_format_writer` (currently Parquet only). The trait requires `Send`
/// but not `Sync`, since writers are mutated through `&mut self`.
#[async_trait]
pub(crate) trait FormatFileWriter: Send {
/// Appends `batch` to the output.
async fn write(&mut self, batch: &RecordBatch) -> crate::Result<()>;
/// Size counter for the output.
/// NOTE(review): presumably bytes already written — confirm in implementations.
fn num_bytes(&self) -> usize;
/// Size of data not yet written out.
/// NOTE(review): presumably bytes buffered for the current unit (e.g. row group) — confirm.
fn in_progress_size(&self) -> usize;
/// Flushes buffered data.
async fn flush(&mut self) -> crate::Result<()>;
/// Finishes the file and consumes the writer.
///
/// Takes `self: Box<Self>` so it can be called on a `Box<dyn FormatFileWriter>`.
/// NOTE(review): the returned `u64` is presumably the final file size in bytes — confirm.
async fn close(self: Box<Self>) -> crate::Result<u64>;
}
/// Selects a [`FormatFileReader`] implementation based on the suffix of `path`.
///
/// Recognized suffixes are `.parquet`, `.orc` and `.avro`. They are matched ASCII
/// case-insensitively, so `data.PARQUET` selects the Parquet reader just like
/// `data.parquet`.
///
/// # Errors
///
/// Returns [`Error::Unsupported`] when `path` ends with none of the recognized suffixes.
pub(crate) fn create_format_reader(path: &str) -> crate::Result<Box<dyn FormatFileReader>> {
    // ASCII case-insensitive suffix test on the raw bytes. For a lowercase ASCII `ext` this is
    // equivalent to `path.to_ascii_lowercase().ends_with(ext)`, but it does not allocate a
    // lowercased copy of `path` for each candidate format.
    fn has_extension(path: &str, ext: &str) -> bool {
        let (path, ext) = (path.as_bytes(), ext.as_bytes());
        path.len() >= ext.len() && path[path.len() - ext.len()..].eq_ignore_ascii_case(ext)
    }

    if has_extension(path, ".parquet") {
        Ok(Box::new(parquet::ParquetFormatReader))
    } else if has_extension(path, ".orc") {
        Ok(Box::new(orc::OrcFormatReader))
    } else if has_extension(path, ".avro") {
        Ok(Box::new(avro::AvroFormatReader))
    } else {
        Err(Error::Unsupported {
            message: format!(
                "unsupported file format: expected .parquet, .orc, or .avro, got: {path}"
            ),
        })
    }
}
pub(crate) async fn create_format_writer(
output: &OutputFile,
schema: SchemaRef,
compression: &str,
zstd_level: i32,
) -> crate::Result<Box<dyn FormatFileWriter>> {
let path = output.location();
if path.to_ascii_lowercase().ends_with(".parquet") {
Ok(Box::new(
parquet::ParquetFormatWriter::new(output, schema, compression, zstd_level).await?,
))
} else {
Err(Error::Unsupported {
message: format!("unsupported write format: expected .parquet, got: {path}"),
})
}
}