use std::sync::Arc;
use itertools::Itertools;
use url::Url;
use crate::actions::deletion_vector::split_vector;
use crate::scan::field_classifiers::CdfTransformFieldClassifier;
use crate::scan::state_info::StateInfo;
use crate::scan::PhysicalPredicate;
use crate::scan::StatsOutputMode;
use crate::schema::SchemaRef;
use crate::{DeltaResult, Engine, EngineData, Error, FileMeta, PredicateRef};
use super::log_replay::{table_changes_action_iter, TableChangesScanMetadata};
use super::physical_to_logical::{get_cdf_transform_expr, scan_file_physical_schema};
use super::resolve_dvs::{resolve_scan_file_dv, ResolvedCdfScanFile};
use super::scan_file::scan_metadata_to_scan_file;
use super::TableChanges;
/// A scan over the change data described by a [`TableChanges`].
///
/// Instances are produced by [`TableChangesScanBuilder::build`].
#[derive(Debug)]
pub struct TableChangesScan {
/// The table changes this scan reads from; supplies the commit files, table root, start table
/// configuration and end snapshot used while scanning.
table_changes: Arc<TableChanges>,
/// Scan state built by `StateInfo::try_new`; this module reads its logical schema, physical
/// schema, physical predicate and (in tests) transform spec.
state_info: Arc<StateInfo>,
}
/// Builder for a [`TableChangesScan`].
///
/// Collects an optional schema and an optional predicate, then creates the scan via `build`.
#[derive(Debug)]
pub struct TableChangesScanBuilder {
/// The table changes the resulting scan will read from.
table_changes: Arc<TableChanges>,
/// Requested logical schema. When `None`, `build` falls back to the schema stored on
/// `table_changes`.
schema: Option<SchemaRef>,
/// Optional predicate handed to `StateInfo::try_new` when the scan is built.
predicate: Option<PredicateRef>,
}
impl TableChangesScanBuilder {
    /// Starts a builder for a scan over `table_changes`.
    ///
    /// The builder begins with no schema and no predicate configured.
    pub fn new(table_changes: impl Into<Arc<TableChanges>>) -> Self {
        let table_changes = table_changes.into();
        Self {
            table_changes,
            schema: None,
            predicate: None,
        }
    }

    /// Replaces the schema the scan should produce.
    ///
    /// Passing `None` clears any previously configured schema, in which case [`Self::build`]
    /// uses the schema stored on the underlying `TableChanges`.
    pub fn with_schema(self, schema: impl Into<Option<SchemaRef>>) -> Self {
        Self {
            schema: schema.into(),
            ..self
        }
    }

    /// Replaces the predicate associated with the scan.
    ///
    /// Passing `None` clears any previously configured predicate.
    pub fn with_predicate(self, predicate: impl Into<Option<PredicateRef>>) -> Self {
        Self {
            predicate: predicate.into(),
            ..self
        }
    }

    /// Consumes the builder and creates the [`TableChangesScan`].
    ///
    /// # Errors
    ///
    /// Returns whatever error `StateInfo::try_new` reports for the chosen schema, predicate and
    /// the end snapshot's table configuration.
    pub fn build(self) -> DeltaResult<TableChangesScan> {
        let Self {
            table_changes,
            schema,
            predicate,
        } = self;

        // Without an explicit schema, scan every column of the table-changes schema.
        let logical_schema: SchemaRef = match schema {
            Some(requested) => requested,
            None => table_changes.schema.clone().into(),
        };

        let state_info = StateInfo::try_new(
            logical_schema,
            table_changes.end_snapshot.table_configuration(),
            predicate,
            StatsOutputMode::default(),
            CdfTransformFieldClassifier,
        )?;

        Ok(TableChangesScan {
            table_changes,
            state_info: Arc::new(state_info),
        })
    }
}
impl TableChangesScan {
fn scan_metadata(
&self,
engine: Arc<dyn Engine>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<TableChangesScanMetadata>>> {
let commits = self
.table_changes
.log_segment
.listed
.ascending_commit_files
.clone();
let physical_predicate = match self.state_info.physical_predicate.clone() {
PhysicalPredicate::StaticSkipAll => return Ok(None.into_iter().flatten()),
PhysicalPredicate::Some(predicate, schema) => Some((predicate, schema)),
PhysicalPredicate::None => None,
};
let schema = self.table_changes.end_snapshot.schema();
let it = table_changes_action_iter(
engine,
&self.table_changes.start_table_config,
commits,
schema,
physical_predicate,
)?;
Ok(Some(it).into_iter().flatten())
}
pub fn logical_schema(&self) -> &SchemaRef {
&self.state_info.logical_schema
}
pub fn physical_schema(&self) -> &SchemaRef {
&self.state_info.physical_schema
}
pub fn table_root(&self) -> &Url {
self.table_changes.table_root()
}
fn physical_predicate(&self) -> Option<PredicateRef> {
if let PhysicalPredicate::Some(ref predicate, _) = self.state_info.physical_predicate {
Some(predicate.clone())
} else {
None
}
}
pub fn execute(
&self,
engine: Arc<dyn Engine>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<Box<dyn EngineData>>>> {
let scan_metadata = self.scan_metadata(engine.clone())?;
let scan_files = scan_metadata_to_scan_file(scan_metadata);
let table_root = self.table_changes.table_root().clone();
let state_info = self.state_info.clone();
let dv_engine_ref = engine.clone();
let table_root_copy = self.table_changes.table_root().clone();
let physical_predicate = self.physical_predicate().clone();
let result = scan_files
.map(move |scan_file| {
resolve_scan_file_dv(dv_engine_ref.as_ref(), &table_root, scan_file?)
}) .flatten_ok() .map(move |resolved_scan_file| -> DeltaResult<_> {
read_scan_file(
engine.as_ref(),
resolved_scan_file?,
&table_root_copy,
state_info.as_ref(),
physical_predicate.clone(),
)
}) .flatten_ok() .map(|x| x?);
Ok(result)
}
}
/// Reads the parquet file behind one resolved CDF scan file and yields its batches.
///
/// Each batch is run through the physical-to-logical transform (when
/// `get_cdf_transform_expr` yields one) and then filtered with the matching slice of the
/// scan file's selection vector.
///
/// `_physical_predicate` is currently unused: the parquet read is issued with no predicate.
///
/// # Errors
///
/// Fails if the transform expression or its evaluator cannot be created, if the file path
/// cannot be joined onto `table_root`, if the recorded file size does not fit the `FileMeta`
/// size type, or if the parquet read cannot be started. Per-batch failures are reported as
/// `Err` items of the returned iterator.
fn read_scan_file(
    engine: &dyn Engine,
    resolved_scan_file: ResolvedCdfScanFile,
    table_root: &Url,
    state_info: &StateInfo,
    _physical_predicate: Option<PredicateRef>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<Box<dyn EngineData>>>> {
    let ResolvedCdfScanFile {
        scan_file,
        selection_vector: mut remaining_selection,
    } = resolved_scan_file;

    let physical_schema =
        scan_file_physical_schema(&scan_file, state_info.physical_schema.as_ref());

    // An evaluator is only built when a transform expression exists for this file.
    let phys_to_logical_eval =
        match get_cdf_transform_expr(&scan_file, state_info, physical_schema.as_ref())? {
            Some(expr) => Some(engine.evaluation_handler().new_expression_evaluator(
                physical_schema.clone(),
                expr,
                state_info.logical_schema.clone().into(),
            )?),
            None => None,
        };

    // Fill value handed to `split_vector`: `true` unless the scan file carries a `remove_dv`.
    let extend = Some(scan_file.remove_dv.is_none());

    let file = FileMeta {
        last_modified: 0,
        size: match scan_file.size {
            Some(s) => s
                .try_into()
                .map_err(|_| Error::generic(format!("invalid file size: {s}")))?,
            None => 0,
        },
        location: table_root.join(&scan_file.path)?,
    };

    let read_result_iter =
        engine
            .parquet_handler()
            .read_parquet_files(&[file], physical_schema, None)?;

    let batches = read_result_iter.map(move |batch| -> DeltaResult<_> {
        let physical = batch?;
        let logical = match &phys_to_logical_eval {
            Some(evaluator) => evaluator.evaluate(physical.as_ref()),
            // No transform needed: the physical batch is passed through untouched.
            None => Ok(physical),
        };
        let len = match &logical {
            Ok(data) => data.len(),
            Err(_) => 0,
        };

        // Take the selection vector out of the captured state, split it at this batch's
        // length, and store the part returned by `split_vector` back for the next batch.
        let mut current = remaining_selection.take();
        let leftover = split_vector(current.as_mut(), len, extend);
        remaining_selection = leftover;

        match current {
            Some(mask) => logical.and_then(|data| data.apply_selection_vector(mask)),
            None => logical,
        }
    });
    Ok(batches)
}
// Unit tests for `TableChangesScanBuilder`, run against the `./tests/data/table-with-cdf` fixture.
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::engine::sync::SyncEngine;
use crate::expressions::{column_expr, Scalar};
use crate::scan::transform_spec::FieldTransformSpec;
use crate::scan::PhysicalPredicate;
use crate::schema::{DataType, StructField, StructType};
use crate::table_changes::TableChanges;
use crate::table_changes::COMMIT_VERSION_COL_NAME;
use crate::Predicate;
// Builds a scan with neither a schema nor a predicate and checks the resulting state.
#[test]
fn simple_table_changes_scan_builder() {
let path = "./tests/data/table-with-cdf";
let engine = Box::new(SyncEngine::new());
let url = delta_kernel::try_parse_uri(path).unwrap();
let table_changes = TableChanges::try_new(url, engine.as_ref(), 0, Some(1)).unwrap();
let scan = table_changes.into_scan_builder().build().unwrap();
// Three transform entries are expected in total.
assert!(scan.state_info.transform_spec.is_some());
let transform_spec = scan.state_info.transform_spec.as_ref().unwrap();
assert_eq!(transform_spec.len(), 3);
// One dynamic column named `_change_type`, inserted after `id`, at field index 2.
assert!(transform_spec.iter().any(|t| matches!(t,
FieldTransformSpec::DynamicColumn {
physical_name,
insert_after: Some(insert_after),
field_index
}
if physical_name == "_change_type"
&& insert_after == "id"
&& *field_index == 2
)));
// One metadata-derived column inserted after `id` at field index 3.
assert!(transform_spec.iter().any(|t| matches!(t,
FieldTransformSpec::MetadataDerivedColumn {
insert_after: Some(insert_after),
field_index
}
if insert_after == "id" && *field_index == 3
)));
// One metadata-derived column inserted after `id` at field index 4.
assert!(transform_spec.iter().any(|t| matches!(t,
FieldTransformSpec::MetadataDerivedColumn {
insert_after: Some(insert_after),
field_index
}
if insert_after == "id" && *field_index == 4
)));
// No predicate was supplied, so no physical predicate is expected.
assert!(matches!(
scan.state_info.physical_predicate,
PhysicalPredicate::None
));
}
// Builds a scan with a projected schema (`id` plus the commit-version column) and an
// `id > 10` predicate, then checks the schemas, transform spec and physical predicate.
#[test]
fn projected_and_filtered_table_changes_scan_builder() {
let path = "./tests/data/table-with-cdf";
let engine = Box::new(SyncEngine::new());
let url = delta_kernel::try_parse_uri(path).unwrap();
let table_changes = TableChanges::try_new(url, engine.as_ref(), 0, Some(1)).unwrap();
let schema = table_changes
.schema()
.project(&["id", COMMIT_VERSION_COL_NAME])
.unwrap();
let predicate = Arc::new(Predicate::gt(column_expr!("id"), Scalar::from(10)));
let scan = table_changes
.into_scan_builder()
.with_schema(schema.clone())
.with_predicate(predicate.clone())
.build()
.unwrap();
// The logical schema must be exactly the two projected columns.
assert_eq!(
*scan.logical_schema(),
StructType::new_unchecked([
StructField::nullable("id", DataType::INTEGER),
StructField::not_null("_commit_version", DataType::LONG),
])
.into()
);
// The physical schema is expected to contain only `id`.
assert_eq!(scan.state_info.physical_schema.fields().len(), 1);
assert_eq!(
scan.state_info
.physical_schema
.field_at_index(0)
.unwrap()
.name(),
"id"
);
// A single metadata-derived transform is expected, inserted after `id` at field index 1.
assert!(scan.state_info.transform_spec.is_some());
let transform_spec = scan.state_info.transform_spec.as_ref().unwrap();
assert_eq!(transform_spec.len(), 1); assert!(matches!(&transform_spec[0],
FieldTransformSpec::MetadataDerivedColumn {
field_index,
insert_after: Some(insert_after)
}
if *field_index == 1 && insert_after == "id"
));
// The physical predicate must equal the supplied predicate and reference a one-field schema.
assert!(matches!(&scan.state_info.physical_predicate,
PhysicalPredicate::Some(pred, pred_schema)
if pred == &predicate && pred_schema.fields().len() == 1
));
}
}