use bytes::Bytes;
use futures::future::BoxFuture;
use std::sync::Arc;
use crate::arrow::async_writer::AsyncFileWriter;
use crate::errors::{ParquetError, Result};
use object_store::ObjectStore;
use object_store::buffered::BufWriter;
use object_store::path::Path;
use tokio::io::AsyncWriteExt;
/// A parquet output sink that wraps an object-store [`BufWriter`].
///
/// The type implements [`AsyncFileWriter`], forwarding written bytes to the wrapped
/// [`BufWriter`]. Build one with [`ParquetObjectWriter::new`] or from an existing
/// [`BufWriter`] with [`ParquetObjectWriter::from_buf_writer`].
#[derive(Debug)]
pub struct ParquetObjectWriter {
/// The wrapped writer; all `write`/`complete` calls are forwarded to it.
w: BufWriter,
}
impl ParquetObjectWriter {
pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
Self::from_buf_writer(BufWriter::new(store, path))
}
pub fn from_buf_writer(w: BufWriter) -> Self {
Self { w }
}
pub fn into_inner(self) -> BufWriter {
self.w
}
}
/// Forwards parquet output to the wrapped [`BufWriter`], surfacing its failures as
/// [`ParquetError::External`].
impl AsyncFileWriter for ParquetObjectWriter {
    /// Passes `bs` to the wrapped [`BufWriter`] through its `put` method.
    ///
    /// # Errors
    ///
    /// An error returned by `put` is boxed and returned as [`ParquetError::External`].
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
        Box::pin(async move {
            let outcome = self.w.put(bs).await;
            outcome.map_err(|source| ParquetError::External(Box::new(source)))
        })
    }

    /// Finishes the output by calling `shutdown` on the wrapped [`BufWriter`].
    ///
    /// # Errors
    ///
    /// An error returned by `shutdown` is boxed and returned as [`ParquetError::External`].
    fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
        Box::pin(async move {
            let outcome = self.w.shutdown().await;
            outcome.map_err(|source| ParquetError::External(Box::new(source)))
        })
    }
}
impl From<BufWriter> for ParquetObjectWriter {
fn from(w: BufWriter) -> Self {
Self::from_buf_writer(w)
}
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int64Array, RecordBatch};
    use object_store::ObjectStoreExt;
    use object_store::memory::InMemory;

    use super::*;
    use crate::arrow::AsyncArrowWriter;
    use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

    /// Writes one record batch through `ParquetObjectWriter` into an in-memory store, reads
    /// the stored object back as parquet, and checks the first batch equals what was written.
    #[tokio::test]
    async fn test_async_writer() {
        let location = Path::from("test");
        let store = Arc::new(InMemory::new());

        // Single Int64 column named "col" holding 1, 2, 3.
        let values: ArrayRef = Arc::new(Int64Array::from_iter_values([1, 2, 3]));
        let expected = RecordBatch::try_from_iter([("col", values)]).unwrap();

        // Write the batch and close the writer.
        let sink = ParquetObjectWriter::new(store.clone(), location.clone());
        let mut arrow_writer =
            AsyncArrowWriter::try_new(sink, expected.schema(), None).unwrap();
        arrow_writer.write(&expected).await.unwrap();
        arrow_writer.close().await.unwrap();

        // Pull the stored object back out of the store as raw bytes.
        let fetched = store.get(&location).await.unwrap();
        let payload = fetched.bytes().await.unwrap();

        // Decode the bytes as parquet and compare the first batch.
        let builder = ParquetRecordBatchReaderBuilder::try_new(payload).unwrap();
        let mut batches = builder.build().unwrap();
        let actual = batches.next().unwrap().unwrap();
        assert_eq!(expected, actual);
    }
}