use std::sync::Arc;

use futures::{StreamExt, TryStreamExt};
use parquet::arrow::ParquetRecordBatchStreamBuilder;

use crate::arrow::ArrowReader;
use crate::arrow::reader::ParquetReadOptions;
use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::io::FileIO;
use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
use crate::spec::{Schema, SchemaRef};
use crate::{Error, ErrorKind, Result};
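
/// Reads the content of a delete file into a stream of Arrow record batches,
/// projected onto the given Iceberg schema.
///
/// A minimal usage sketch; the `file_io`, `delete_task`, and `schema` bindings
/// below are illustrative placeholders, not items defined in this module:
///
/// ```ignore
/// let loader = BasicDeleteFileLoader::new(file_io);
/// let batch_stream = loader.read_delete_file(&delete_task, schema).await?;
/// ```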
#[allow(unused)]
#[async_trait::async_trait]
pub trait DeleteFileLoader {
    async fn read_delete_file(
        &self,
        task: &FileScanTaskDeleteFile,
        schema: SchemaRef,
    ) -> Result<ArrowRecordBatchStream>;
}
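
/// A [`DeleteFileLoader`] that reads Parquet delete files directly through a [`FileIO`].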
#[derive(Clone, Debug)]
pub(crate) struct BasicDeleteFileLoader {
    file_io: FileIO,
}

#[allow(unused_variables)]
impl BasicDeleteFileLoader {
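    /// Creates a delete file loader that reads files through the given [`FileIO`].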
    pub fn new(file_io: FileIO) -> Self {
        BasicDeleteFileLoader { file_io }
    }
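
    /// Loads a Parquet file and returns its contents as a stream of Arrow record
    /// batches, with no schema evolution applied.
    ///
    /// A minimal call sketch; `loader`, `path`, and `len` are illustrative
    /// placeholders:
    ///
    /// ```ignore
    /// let raw_stream = loader.parquet_to_batch_stream(path, len).await?;
    /// ```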
    pub(crate) async fn parquet_to_batch_stream(
        &self,
        data_file_path: &str,
        file_size_in_bytes: u64,
    ) -> Result<ArrowRecordBatchStream> {
        let parquet_read_options = ParquetReadOptions::builder().build();

        let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file(
            data_file_path,
            &self.file_io,
            file_size_in_bytes,
            parquet_read_options,
        )
        .await?;

        // Surface any Parquet decoding error as an iceberg `Error`.
        let record_batch_stream =
            ParquetRecordBatchStreamBuilder::new_with_metadata(parquet_file_reader, arrow_metadata)
                .build()?
                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")));

        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
    }
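
    /// Rewrites the schema of the batches in `record_batch_stream` to match
    /// `target_schema`, projecting onto the fields identified by `equality_ids`.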
    pub(crate) async fn evolve_schema(
        record_batch_stream: ArrowRecordBatchStream,
        target_schema: Arc<Schema>,
        equality_ids: &[i32],
    ) -> Result<ArrowRecordBatchStream> {
        let mut record_batch_transformer =
            RecordBatchTransformerBuilder::new(target_schema.clone(), equality_ids).build();

        // Transform each batch as it arrives, propagating any upstream error.
        let record_batch_stream = record_batch_stream.map(move |record_batch| {
            record_batch
                .and_then(|batch| record_batch_transformer.process_record_batch(batch))
        });

        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
    }
}

#[async_trait::async_trait]
impl DeleteFileLoader for BasicDeleteFileLoader {
    async fn read_delete_file(
        &self,
        task: &FileScanTaskDeleteFile,
        schema: SchemaRef,
    ) -> Result<ArrowRecordBatchStream> {
        let raw_batch_stream = self
            .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes)
            .await?;

        // Equality delete files carry the IDs of the columns they compare on;
        // positional delete files do not, so fall back to every field in the schema.
        let field_ids = match &task.equality_ids {
            Some(ids) => ids.clone(),
            None => schema.field_id_to_name_map().keys().cloned().collect(),
        };

        Self::evolve_schema(raw_batch_stream, schema, &field_ids).await
    }
}

#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;
    use crate::arrow::delete_filter::tests::setup;

    #[tokio::test]
    async fn test_basic_delete_file_loader_read_delete_file() {
        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path();
        let file_io = FileIO::new_with_fs();
        let delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());

        let file_scan_tasks = setup(table_location);

        let result = delete_file_loader
            .read_delete_file(
                &file_scan_tasks[0].deletes[0],
                file_scan_tasks[0].schema_ref(),
            )
            .await
            .unwrap();
        let result = result.try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(result.len(), 1);
    }
}