//! Async writer for encoding [`RecordBatch`]es to Parquet.
//!
//! [`AsyncArrowWriter`] wraps a synchronous [`ArrowWriter`], buffering encoded
//! bytes in memory and handing them to an [`AsyncFileWriter`] as row groups
//! complete.

#[cfg(feature = "object_store")]
mod store;
#[cfg(feature = "object_store")]
pub use store::*;
use crate::{
arrow::ArrowWriter,
arrow::arrow_writer::ArrowWriterOptions,
errors::{ParquetError, Result},
file::{
metadata::{KeyValue, ParquetMetaData, RowGroupMetaData},
properties::WriterProperties,
},
};
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use bytes::Bytes;
use futures::FutureExt;
use futures::future::BoxFuture;
use std::mem;
use tokio::io::{AsyncWrite, AsyncWriteExt};
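/// The asynchronous interface used by [`AsyncArrowWriter`] to write parquet files.
///
/// Implementations may buffer or immediately persist the bytes passed to
/// [`AsyncFileWriter::write`]; after [`AsyncFileWriter::complete`] returns
/// `Ok(())`, callers should not call `write` again.
///
/// A minimal sketch of a custom implementation against an in-memory sink;
/// the `InMemoryWriter` type below is hypothetical, not part of this crate:
///
/// ```no_run
/// use bytes::Bytes;
/// use futures::future::BoxFuture;
/// use futures::FutureExt;
/// use parquet::arrow::async_writer::AsyncFileWriter;
/// use parquet::errors::Result;
///
/// struct InMemoryWriter(Vec<u8>);
///
/// impl AsyncFileWriter for InMemoryWriter {
///     fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
///         // Buffer the chunk; a real implementation might start an async upload here
///         self.0.extend_from_slice(&bs);
///         async { Ok(()) }.boxed()
///     }
///
///     fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
///         // Nothing to finalize for an in-memory sink
///         async { Ok(()) }.boxed()
///     }
/// }
/// ```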
pub trait AsyncFileWriter: Send {
    /// Write the provided bytes to the underlying writer.
    ///
    /// The implementation may buffer the data or write it immediately; this
    /// design leaves buffering and I/O scheduling to the implementer.
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>>;

    /// Flush any buffered data and finish the writing process.
    ///
    /// After `complete` returns `Ok(())`, callers should not call `write` again.
    fn complete(&mut self) -> BoxFuture<'_, Result<()>>;
}
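/// Forwarding implementation, allowing `Box<dyn AsyncFileWriter>` to be used
/// wherever a concrete [`AsyncFileWriter`] is expected.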
impl AsyncFileWriter for Box<dyn AsyncFileWriter + '_> {
fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
self.as_mut().write(bs)
}
fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
self.as_mut().complete()
}
}
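/// Blanket implementation: any `AsyncWrite` sink (e.g. `tokio::fs::File`) can
/// be used as an [`AsyncFileWriter`], with `complete` flushing and shutting
/// down the underlying writer.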
impl<T: AsyncWrite + Unpin + Send> AsyncFileWriter for T {
fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
async move {
self.write_all(&bs).await?;
Ok(())
}
.boxed()
}
fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
async move {
self.flush().await?;
self.shutdown().await?;
Ok(())
}
.boxed()
}
}
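/// Encodes [`RecordBatch`]es to parquet, writing the encoded bytes to an
/// [`AsyncFileWriter`].
///
/// Rows are buffered in memory until a row group completes, at which point the
/// encoded bytes are handed to the underlying async writer. Memory usage can
/// be bounded by calling [`Self::flush`] early, at the cost of larger files
/// and reduced read performance.
///
/// A minimal usage sketch, writing to an in-memory buffer (mirrors the
/// `test_async_writer` test below):
///
/// ```no_run
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
/// # use parquet::arrow::AsyncArrowWriter;
/// # async fn example() {
/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
///
/// let mut buffer = Vec::new();
/// let mut writer = AsyncArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
/// writer.write(&batch).await.unwrap();
/// writer.close().await.unwrap();
/// # }
/// ```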
pub struct AsyncArrowWriter<W> {
    /// Underlying sync writer, encoding into an in-memory buffer
    sync_writer: ArrowWriter<Vec<u8>>,
    /// Async writer provided by the caller
    async_writer: W,
}
impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
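    /// Attempts to create a new [`AsyncArrowWriter`] writing to `writer`.
    ///
    /// If `props` is `None`, the default [`WriterProperties`] are used.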
pub fn try_new(
writer: W,
arrow_schema: SchemaRef,
props: Option<WriterProperties>,
) -> Result<Self> {
let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
Self::try_new_with_options(writer, arrow_schema, options)
}
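    /// Attempts to create a new [`AsyncArrowWriter`] with the provided
    /// [`ArrowWriterOptions`].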
pub fn try_new_with_options(
writer: W,
arrow_schema: SchemaRef,
options: ArrowWriterOptions,
) -> Result<Self> {
let sync_writer = ArrowWriter::try_new_with_options(Vec::new(), arrow_schema, options)?;
Ok(Self {
sync_writer,
async_writer: writer,
})
}
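    /// Returns metadata of the row groups flushed so far.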
pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
self.sync_writer.flushed_row_groups()
}
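    /// Returns the estimated total memory usage of this writer, including
    /// buffered, encoded data that has not yet been flushed.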
pub fn memory_size(&self) -> usize {
self.sync_writer.memory_size()
}
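    /// Returns the anticipated encoded size of the in-progress row group.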
pub fn in_progress_size(&self) -> usize {
self.sync_writer.in_progress_size()
}
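    /// Returns the number of rows buffered in the in-progress row group.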
pub fn in_progress_rows(&self) -> usize {
self.sync_writer.in_progress_rows()
}
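    /// Returns the number of bytes written by this writer so far.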
pub fn bytes_written(&self) -> usize {
self.sync_writer.bytes_written()
}
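    /// Encodes the provided [`RecordBatch`].
    ///
    /// Rows are buffered in memory until a row group completes, at which point
    /// the encoded bytes are written to the underlying [`AsyncFileWriter`].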
pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
let before = self.sync_writer.flushed_row_groups().len();
self.sync_writer.write(batch)?;
if before != self.sync_writer.flushed_row_groups().len() {
self.do_write().await?;
}
Ok(())
}
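    /// Flushes all buffered rows into a new row group and writes it to the
    /// underlying [`AsyncFileWriter`].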
pub async fn flush(&mut self) -> Result<()> {
self.sync_writer.flush()?;
self.do_write().await?;
Ok(())
}
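    /// Appends [`KeyValue`] metadata to the file footer, in addition to any
    /// metadata set via [`WriterProperties`].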
pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
self.sync_writer.append_key_value_metadata(kv_metadata);
}
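    /// Writes any remaining data and the parquet footer, completes the
    /// underlying [`AsyncFileWriter`], and returns the file metadata.
    ///
    /// Unlike [`Self::close`], this does not consume `self`, so the underlying
    /// writer can still be recovered via [`Self::into_inner`].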
pub async fn finish(&mut self) -> Result<ParquetMetaData> {
let metadata = self.sync_writer.finish()?;
self.do_write().await?;
self.async_writer.complete().await?;
Ok(metadata)
}
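    /// Flushes any remaining data and closes the writer, returning the file
    /// metadata. Equivalent to [`Self::finish`], but consumes `self`.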
pub async fn close(mut self) -> Result<ParquetMetaData> {
self.finish().await
}
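    /// Consumes the writer, returning the underlying [`AsyncFileWriter`].
    ///
    /// Note this does not flush or finalize the file; call [`Self::finish`]
    /// first to ensure all data has been written.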
pub fn into_inner(self) -> W {
self.async_writer
}
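    /// Takes ownership of the bytes buffered by the sync writer (leaving an
    /// empty buffer in its place) and writes them to the async writer.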
async fn do_write(&mut self) -> Result<()> {
let buffer = mem::take(self.sync_writer.inner_mut());
self.async_writer
.write(Bytes::from(buffer))
.await
.map_err(|e| ParquetError::External(Box::new(e)))?;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow_array::{ArrayRef, BinaryArray, Int32Array, Int64Array, RecordBatchReader};
    use bytes::Bytes;
    use std::sync::Arc;
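    /// Returns a reader over the `alltypes_tiny_pages_plain.parquet` test file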
fn get_test_reader() -> ParquetRecordBatchReader {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let original_data = Bytes::from(std::fs::read(path).unwrap());
ParquetRecordBatchReaderBuilder::try_new(original_data)
.unwrap()
.build()
.unwrap()
}
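    // Round-trips a single batch through an in-memory async writer and reads it back.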
#[tokio::test]
async fn test_async_writer() {
let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
let mut buffer = Vec::new();
let mut writer = AsyncArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
writer.write(&to_write).await.unwrap();
writer.close().await.unwrap();
let buffer = Bytes::from(buffer);
let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
.unwrap()
.build()
.unwrap();
let read = reader.next().unwrap().unwrap();
assert_eq!(to_write, read);
}
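    // The async writer should produce output byte-for-byte identical to the
    // sync ArrowWriter for the same input and properties.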
#[tokio::test]
async fn test_async_writer_with_sync_writer() {
let reader = get_test_reader();
        // Use a small row group size so both writers flush multiple row groups
        let write_props = WriterProperties::builder()
            .set_max_row_group_size(64)
            .build();
let mut async_buffer = Vec::new();
let mut async_writer = AsyncArrowWriter::try_new(
&mut async_buffer,
reader.schema(),
Some(write_props.clone()),
)
.unwrap();
let mut sync_buffer = Vec::new();
let mut sync_writer =
ArrowWriter::try_new(&mut sync_buffer, reader.schema(), Some(write_props)).unwrap();
for record_batch in reader {
let record_batch = record_batch.unwrap();
async_writer.write(&record_batch).await.unwrap();
sync_writer.write(&record_batch).unwrap();
}
sync_writer.close().unwrap();
async_writer.close().await.unwrap();
assert_eq!(sync_buffer, async_buffer);
}
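    // `bytes_written` should match the on-disk file length once `finish` returns.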
#[tokio::test]
async fn test_async_writer_bytes_written() {
let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
let temp = tempfile::tempfile().unwrap();
let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
let mut writer =
AsyncArrowWriter::try_new(file.try_clone().await.unwrap(), to_write.schema(), None)
.unwrap();
writer.write(&to_write).await.unwrap();
let _metadata = writer.finish().await.unwrap();
let reported = writer.bytes_written();
let actual = file.metadata().await.unwrap().len() as usize;
assert_eq!(reported, actual);
}
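    // Round-trips a larger batch through a temporary file, using tokio::fs::File
    // as the async writer and the sync reader to read the file back.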
#[tokio::test]
async fn test_async_writer_file() {
let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
let col2 = Arc::new(BinaryArray::from_iter_values(vec![
vec![0; 500000],
vec![0; 500000],
vec![0; 500000],
])) as ArrayRef;
let to_write = RecordBatch::try_from_iter([("col", col), ("col2", col2)]).unwrap();
let temp = tempfile::tempfile().unwrap();
let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
let mut writer = AsyncArrowWriter::try_new(file, to_write.schema(), None).unwrap();
writer.write(&to_write).await.unwrap();
writer.close().await.unwrap();
let mut reader = ParquetRecordBatchReaderBuilder::try_new(temp)
.unwrap()
.build()
.unwrap();
let read = reader.next().unwrap().unwrap();
assert_eq!(to_write, read);
}
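    // Exercises the size-accounting APIs (`in_progress_size`, `in_progress_rows`,
    // `memory_size`, `bytes_written`) across writes and flushes.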
#[tokio::test]
async fn in_progress_accounting() {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let a = Int32Array::from_value(0_i32, 512);
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
let temp = tempfile::tempfile().unwrap();
let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
let mut writer = AsyncArrowWriter::try_new(file, batch.schema(), None).unwrap();
        // Before any writes, nothing is in progress and only the 4-byte
        // Parquet header has been written
        assert_eq!(writer.in_progress_size(), 0);
        assert_eq!(writer.in_progress_rows(), 0);
        assert_eq!(writer.bytes_written(), 4); // Initial Parquet header

        writer.write(&batch).await.unwrap();
let initial_size = writer.in_progress_size();
assert!(initial_size > 0);
assert_eq!(writer.in_progress_rows(), batch.num_rows());
let initial_memory = writer.memory_size();
assert!(
initial_size <= initial_memory,
"{initial_size} <= {initial_memory}"
);
writer.write(&batch).await.unwrap();
assert!(writer.in_progress_size() > initial_size);
assert_eq!(writer.in_progress_rows(), batch.num_rows() * 2);
assert!(writer.memory_size() > initial_memory);
assert!(
writer.in_progress_size() <= writer.memory_size(),
"in_progress_size {} <= memory_size {}",
writer.in_progress_size(),
writer.memory_size()
);
let pre_flush_bytes_written = writer.bytes_written();
writer.flush().await.unwrap();
assert_eq!(writer.in_progress_size(), 0);
assert_eq!(writer.memory_size(), 0);
assert_eq!(writer.in_progress_rows(), 0);
assert!(writer.bytes_written() > pre_flush_bytes_written);
writer.close().await.unwrap();
}
}