use std::sync::{Arc, Mutex};
use crate::action_reconciliation::ActionReconciliationIterator;
use crate::actions::{ADD_NAME, REMOVE_NAME};
use crate::engine_data::filter_by_predicate;
use crate::expressions::{Expression, Predicate, Scalar, Transform};
use crate::schema::{SchemaRef, StructType};
use crate::{
DeltaResult, EngineData, Error, EvaluationHandler, ExpressionEvaluator, PredicateEvaluator,
};
/// Pairs an expression transform with a predicate filter: batches are first
/// transformed, then rows failing the predicate (i.e. all-null rows produced by
/// the transform) are dropped. See [`NonNullRowsPicker::pick`].
struct NonNullRowsPicker {
    // Expression applied to each incoming batch (projection or null-out transform).
    transform: Arc<dyn ExpressionEvaluator>,
    // Predicate selecting which transformed rows to keep.
    null_row_filter: Arc<dyn PredicateEvaluator>,
}
impl NonNullRowsPicker {
    /// Applies `transform` to `batch`, then filters the transformed rows
    /// through `null_row_filter`, returning only the rows that pass.
    fn pick(&self, batch: &dyn EngineData) -> DeltaResult<Box<dyn EngineData>> {
        self.transform
            .evaluate(batch)
            .and_then(|projected| filter_by_predicate(self.null_row_filter.as_ref(), projected))
    }
}
/// Splits checkpoint action batches into file actions (add/remove) and
/// everything else. File-action batches are yielded on demand via
/// `next_file_actions_batch`; non-file-action batches are accumulated
/// internally and retrieved at the end via `into_non_file_batches`.
pub(super) struct SidecarSplitter {
    // Source of reconciled checkpoint action batches.
    checkpoint_data_iter: ActionReconciliationIterator,
    // Projects each batch down to add/remove columns and drops all-null rows.
    file_actions_picker: NonNullRowsPicker,
    // Nulls out add/remove columns and keeps rows with any other non-null column.
    non_file_actions_picker: NonNullRowsPicker,
    // Non-file-action batches collected while iterating file actions.
    non_file_action_batches: Vec<Box<dyn EngineData>>,
    // Set once `checkpoint_data_iter` returns `None`.
    exhausted: bool,
}
impl SidecarSplitter {
    /// Creates a splitter over `checkpoint_data_iterator`.
    ///
    /// Builds four evaluators up front:
    /// - a projector keeping only the add/remove columns (the file actions),
    /// - a predicate keeping projected rows where add OR remove is non-null,
    /// - a transform nulling out add/remove (leaving the non-file actions),
    /// - a predicate keeping rows where any remaining column is non-null.
    ///
    /// # Errors
    ///
    /// Returns a checkpoint-write error if `checkpoint_data_schema` is missing
    /// the add/remove fields, or if either field is non-nullable (both must be
    /// nullable so the "other" half of each split can be blanked out).
    pub(super) fn new(
        checkpoint_data_iterator: ActionReconciliationIterator,
        eval_handler: &dyn EvaluationHandler,
        checkpoint_data_schema: SchemaRef,
    ) -> DeltaResult<Self> {
        let add_field = checkpoint_data_schema.field(ADD_NAME).ok_or_else(|| {
            Error::checkpoint_write(format!("Checkpoint data schema missing '{ADD_NAME}' field"))
        })?;
        let remove_field = checkpoint_data_schema.field(REMOVE_NAME).ok_or_else(|| {
            Error::checkpoint_write(format!(
                "Checkpoint data schema missing '{REMOVE_NAME}' field"
            ))
        })?;
        if !add_field.is_nullable() {
            return Err(Error::checkpoint_write(format!(
                "Checkpoint data schema '{ADD_NAME}' field must be nullable"
            )));
        }
        if !remove_field.is_nullable() {
            return Err(Error::checkpoint_write(format!(
                "Checkpoint data schema '{REMOVE_NAME}' field must be nullable"
            )));
        }
        // Output schema for sidecar data: just the add/remove columns.
        let sidecar_output_schema: SchemaRef =
            StructType::try_new([add_field.clone(), remove_field.clone()])?.into();
        let file_action_projector = eval_handler.new_expression_evaluator(
            checkpoint_data_schema.clone(),
            Arc::new(Expression::struct_from([
                Expression::column([ADD_NAME]),
                Expression::column([REMOVE_NAME]),
            ])),
            sidecar_output_schema.clone().into(),
        )?;
        // A projected row is a real file action iff at least one of its two
        // columns is non-null.
        let file_actions_null_row_filter = eval_handler.new_predicate_evaluator(
            sidecar_output_schema,
            Arc::new(Predicate::or(
                Predicate::is_not_null(Expression::column([ADD_NAME])),
                Predicate::is_not_null(Expression::column([REMOVE_NAME])),
            )),
        )?;
        // Keep the full schema but blank out add/remove, so only non-file
        // action columns remain populated.
        let non_file_action_nullifier = eval_handler.new_expression_evaluator(
            checkpoint_data_schema.clone(),
            Arc::new(Expression::transform(
                Transform::new_top_level()
                    .with_replaced_field(
                        ADD_NAME,
                        Arc::new(Expression::literal(Scalar::Null(
                            add_field.data_type.clone(),
                        ))),
                    )
                    .with_replaced_field(
                        REMOVE_NAME,
                        Arc::new(Expression::literal(Scalar::Null(
                            remove_field.data_type.clone(),
                        ))),
                    ),
            )),
            checkpoint_data_schema.clone().into(),
        )?;
        // A row counts as a non-file action iff any column other than
        // add/remove is non-null.
        let non_file_actions_null_row_filter = {
            let predicate = Predicate::or_from(
                checkpoint_data_schema
                    .fields()
                    .filter(|f| f.name != ADD_NAME && f.name != REMOVE_NAME)
                    .map(|f| Predicate::is_not_null(Expression::column([f.name.as_str()]))),
            );
            eval_handler.new_predicate_evaluator(checkpoint_data_schema, Arc::new(predicate))?
        };
        Ok(Self {
            checkpoint_data_iter: checkpoint_data_iterator,
            file_actions_picker: NonNullRowsPicker {
                transform: file_action_projector,
                null_row_filter: file_actions_null_row_filter,
            },
            non_file_actions_picker: NonNullRowsPicker {
                transform: non_file_action_nullifier,
                null_row_filter: non_file_actions_null_row_filter,
            },
            non_file_action_batches: Vec::new(),
            exhausted: false,
        })
    }
    /// Like [`Self::new`], but wraps the splitter in `Arc<Mutex<_>>` so it can
    /// be shared mutably across multiple sidecar iterators.
    pub(super) fn new_mut_shared(
        checkpoint_data_iterator: ActionReconciliationIterator,
        eval_handler: &dyn EvaluationHandler,
        checkpoint_data_schema: SchemaRef,
    ) -> DeltaResult<Arc<Mutex<Self>>> {
        Self::new(
            checkpoint_data_iterator,
            eval_handler,
            checkpoint_data_schema,
        )
        .map(|s| Arc::new(Mutex::new(s)))
    }
    /// True once the underlying checkpoint data iterator has been fully drained.
    pub(super) fn is_exhausted(&self) -> bool {
        self.exhausted
    }
    /// Consumes the splitter, returning the non-file-action batches collected
    /// while file actions were being iterated.
    pub(super) fn into_non_file_batches(self) -> Vec<Box<dyn EngineData>> {
        self.non_file_action_batches
    }
    /// Pulls batches from the checkpoint iterator until one yields a non-empty
    /// file-actions batch, or the source is exhausted (`None`).
    ///
    /// Side effect: every non-empty non-file-action batch seen along the way is
    /// stashed in `non_file_action_batches`.
    fn next_file_actions_batch(&mut self) -> Option<DeltaResult<Box<dyn EngineData>>> {
        loop {
            // `let else` (rather than a side-effecting `Option::or_else`
            // closure) makes the end-of-input bookkeeping explicit.
            let Some(result) = self.checkpoint_data_iter.next() else {
                self.exhausted = true;
                return None;
            };
            let batch = match result.and_then(|f| f.apply_selection_vector()) {
                Ok(b) => b,
                Err(e) => return Some(Err(e)),
            };
            let non_file_actions_batch = match self.non_file_actions_picker.pick(batch.as_ref()) {
                Ok(b) => b,
                Err(e) => return Some(Err(e)),
            };
            if !non_file_actions_batch.is_empty() {
                self.non_file_action_batches.push(non_file_actions_batch);
            }
            match self.file_actions_picker.pick(batch.as_ref()) {
                // Nothing but nulls after projection: try the next source batch.
                Ok(file_actions_batch) if file_actions_batch.is_empty() => continue,
                other => return Some(other),
            }
        }
    }
}
/// Iterator over file-action batches for a single sidecar file, drawing from a
/// shared [`SidecarSplitter`] and stopping after roughly `max_file_actions_hint`
/// rows have been yielded (the last batch may overshoot — it is a hint).
pub(super) struct SingleSidecarDataIterator {
    // Splitter shared with sibling sidecar iterators; locked per `next()` call.
    splitter: Arc<Mutex<SidecarSplitter>>,
    // Soft upper bound on total rows this iterator should yield; must be > 0.
    max_file_actions_hint: usize,
    // Running count of rows yielded so far.
    yielded_row_count: usize,
}
impl SingleSidecarDataIterator {
pub(super) fn new(
splitter: Arc<Mutex<SidecarSplitter>>,
max_file_actions_hint: usize,
) -> DeltaResult<Self> {
if max_file_actions_hint == 0 {
return Err(Error::checkpoint_write(
"max_file_actions_hint must be greater than 0",
));
}
Ok(Self {
splitter,
max_file_actions_hint,
yielded_row_count: 0,
})
}
}
impl Iterator for SingleSidecarDataIterator {
    type Item = DeltaResult<Box<dyn EngineData>>;

    /// Yields the next file-actions batch, or `None` once the row hint has
    /// been reached or the shared splitter is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        // Stop once this sidecar's soft row budget has been consumed.
        if self.yielded_row_count >= self.max_file_actions_hint {
            return None;
        }
        // Surface a poisoned lock as an internal error instead of panicking.
        let mut splitter = match self.splitter.lock() {
            Ok(guard) => guard,
            Err(e) => {
                return Some(Err(Error::internal_error(format!(
                    "sidecar splitter lock poisoned: {e}"
                ))))
            }
        };
        // `?` propagates splitter exhaustion (`None`) straight to the caller.
        let batch = match splitter.next_file_actions_batch()? {
            Ok(b) => b,
            Err(e) => return Some(Err(e)),
        };
        self.yielded_row_count += batch.len();
        Some(Ok(batch))
    }
}
#[cfg(test)]
mod tests;