use super::data_evolution_reader::DataEvolutionReader;
use super::data_file_reader::DataFileReader;
use super::read_builder::split_scan_predicates;
use super::{ArrowRecordBatchStream, Table};
use crate::arrow::filtering::reader_pruning_predicates;
use crate::spec::{CoreOptions, DataField, Predicate};
use crate::{DataSplit, Error};
/// A configured read over a single [`Table`]: the fields to produce and the
/// data predicates to hand to the underlying file reader.
#[derive(Debug, Clone)]
pub struct TableRead<'a> {
    /// Borrowed table whose schema, options, schema manager, and file I/O drive the read.
    table: &'a Table,
    /// Fields to produce in the output record batches.
    read_type: Vec<DataField>,
    /// Data-level predicates passed to `DataFileReader`; they are not forwarded
    /// to `DataEvolutionReader` when data evolution is enabled.
    data_predicates: Vec<Predicate>,
}
impl<'a> TableRead<'a> {
    /// Creates a read over `table` that produces `read_type` and carries
    /// `data_predicates` for the file reader.
    pub fn new(
        table: &'a Table,
        read_type: Vec<DataField>,
        data_predicates: Vec<Predicate>,
    ) -> Self {
        Self {
            table,
            read_type,
            data_predicates,
        }
    }

    /// Returns the fields this read will produce.
    pub fn read_type(&self) -> &[DataField] {
        &self.read_type
    }

    /// Returns the data predicates currently attached to this read.
    pub fn data_predicates(&self) -> &[Predicate] {
        &self.data_predicates
    }

    /// Returns the table being read.
    ///
    /// The reference carries the table's own lifetime `'a` rather than the
    /// borrow of `self`, so callers may keep it after this `TableRead` is gone.
    pub fn table(&self) -> &'a Table {
        self.table
    }

    /// Replaces the data predicates with those derived from `filter`.
    ///
    /// `filter` is split with `split_scan_predicates`. Only the second
    /// (data-side) half is kept, reduced by `reader_pruning_predicates`, and
    /// stored. Any previously attached data predicates are overwritten, not
    /// combined.
    #[must_use = "`with_filter` returns the updated `TableRead`; the filter is lost if the result is dropped"]
    pub fn with_filter(mut self, filter: Predicate) -> Self {
        // NOTE(review): the first half of the split is discarded here —
        // presumably scan/partition-level predicates handled elsewhere; confirm.
        let (_, data_predicates) = split_scan_predicates(self.table, filter);
        self.data_predicates = reader_pruning_predicates(data_predicates);
        self
    }

    /// Reads `data_splits` into a stream of Arrow record batches.
    ///
    /// When the table options enable data evolution, `DataEvolutionReader` is
    /// used. Otherwise `DataFileReader` is used, together with this read's
    /// data predicates.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Unsupported`] if the table has primary keys but
    /// deletion vectors are not enabled. Errors from constructing the
    /// data-evolution reader or from the reader's `read` are propagated.
    pub fn to_arrow(&self, data_splits: &[DataSplit]) -> crate::Result<ArrowRecordBatchStream> {
        let primary_keys = self.table.schema.primary_keys();
        let core_options = CoreOptions::new(self.table.schema.options());

        if !primary_keys.is_empty() && !core_options.deletion_vectors_enabled() {
            return Err(Error::Unsupported {
                message: format!(
                    "Reading primary-key tables without deletion vectors is not yet supported. Primary keys: {:?}",
                    primary_keys
                ),
            });
        }

        // Constructor arguments shared by both reader implementations,
        // computed once in the same order the original call sites used.
        let file_io = self.table.file_io.clone();
        let schema_manager = self.table.schema_manager().clone();
        let schema_id = self.table.schema().id();
        let table_fields = self.table.schema.fields().to_vec();
        let read_type = self.read_type.clone();

        if core_options.data_evolution_enabled() {
            // NOTE(review): `data_predicates` are not passed on this path, so
            // no predicate-based pruning happens for data-evolution tables.
            let reader = DataEvolutionReader::new(
                file_io,
                schema_manager,
                schema_id,
                table_fields,
                read_type,
            )?;
            reader.read(data_splits)
        } else {
            let reader = DataFileReader::new(
                file_io,
                schema_manager,
                schema_id,
                table_fields,
                read_type,
                self.data_predicates.clone(),
            );
            reader.read(data_splits)
        }
    }
}