use bytes::Bytes;
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
use super::error::StorageError;
use super::object_store_handle::JammiObjectStore;
/// Reports whether the handle's data file exists and carries readable Parquet metadata.
///
/// Returns `Ok(false)` when the data file does not exist. Otherwise the whole object is
/// fetched and handed to [`ParquetRecordBatchReaderBuilder::try_new`]. The result is
/// `Ok(true)` only if that call succeeds, and `Ok(false)` if it rejects the bytes.
///
/// NOTE(review): the entire object is downloaded even though only the footer is needed to
/// validate it; consider a ranged/async metadata read if these files can be large.
///
/// # Errors
///
/// Propagates errors from resolving the data path, from the existence check, and from
/// fetching the bytes. A Parquet parse failure is *not* an error; it yields `Ok(false)`.
pub async fn is_valid_parquet(handle: &JammiObjectStore) -> Result<bool, StorageError> {
    let data_path = handle.data_path()?;
    let present = handle.exists(&data_path).await?;
    if present {
        let contents = handle.get_bytes(&data_path).await?;
        let parsed = ParquetRecordBatchReaderBuilder::try_new(contents);
        Ok(parsed.is_ok())
    } else {
        Ok(false)
    }
}
/// Returns the total row count recorded in the Parquet metadata of the handle's data file.
///
/// The entire object is fetched into memory, but only the file metadata is decoded. The
/// reader builder is never turned into a reader, so no row data is read.
///
/// # Errors
///
/// - Propagates errors from resolving the data path or fetching the bytes.
/// - Returns a layout error when the Parquet metadata cannot be parsed.
/// - Returns a layout error when the recorded row count is negative or does not fit in
///   `usize`.
pub async fn count_parquet_rows(handle: &JammiObjectStore) -> Result<usize, StorageError> {
    let path = handle.data_path()?;
    let bytes = handle.get_bytes(&path).await?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(bytes).map_err(|e| {
        StorageError::layout(path.to_string(), format!("Parquet metadata read: {e}"))
    })?;
    let num_rows = builder.metadata().file_metadata().num_rows();
    // `num_rows` is an `i64` copied verbatim from the file's metadata. A corrupt file could
    // store a negative value, which `as usize` would silently wrap into a huge count (and on
    // 32-bit targets large values would truncate), so convert with a checked cast instead.
    usize::try_from(num_rows).map_err(|_| {
        StorageError::layout(
            path.to_string(),
            format!("Parquet metadata read: invalid row count {num_rows}"),
        )
    })
}
/// Fetches the full contents of the handle's data file as a single [`Bytes`] buffer.
///
/// # Errors
///
/// Propagates any error from resolving the data path or from the fetch itself.
pub async fn read_all(handle: &JammiObjectStore) -> Result<Bytes, StorageError> {
    let data_path = handle.data_path()?;
    let contents = handle.get_bytes(&data_path).await?;
    Ok(contents)
}
/// Decodes every record batch of the handle's Parquet data file and collects them in memory.
///
/// The file is read through a [`ParquetObjectReader`] over `handle.driver()`. The reader is
/// wrapped in an async [`ParquetRecordBatchStreamBuilder`], and the resulting stream is
/// drained into a `Vec`.
///
/// # Errors
///
/// - Propagates errors from resolving the data path.
/// - Returns a layout error, tagged with the data path, when constructing the builder fails.
/// - Returns a layout error when building the stream fails.
/// - Returns a layout error when decoding any batch fails.
pub async fn read_all_record_batches(
    handle: &JammiObjectStore,
) -> Result<Vec<arrow::array::RecordBatch>, StorageError> {
    let path = handle.data_path()?;
    // Render the location once up front; every error branch below reports the same path.
    let location = path.to_string();
    let object_reader = ParquetObjectReader::new(handle.driver(), path);

    let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
        .await
        .map_err(|err| {
            StorageError::layout(location.clone(), format!("Parquet reader builder: {err}"))
        })?;

    let batch_stream = builder.build().map_err(|err| {
        StorageError::layout(location.clone(), format!("Parquet stream build: {err}"))
    })?;

    batch_stream
        .try_collect::<Vec<_>>()
        .await
        .map_err(|err| StorageError::layout(location, format!("Parquet batch read: {err}")))
}