use std::sync::Arc;
use itertools::Itertools;
use tracing::debug;
use url::Url;
use crate::actions::deletion_vector::split_vector;
use crate::scan::state::GlobalScanState;
use crate::scan::{ColumnType, PhysicalPredicate, ScanResult};
use crate::schema::{SchemaRef, StructType};
use crate::{DeltaResult, Engine, ExpressionRef, FileMeta};
use super::log_replay::{table_changes_action_iter, TableChangesScanData};
use super::physical_to_logical::{physical_to_logical_expr, scan_file_physical_schema};
use super::resolve_dvs::{resolve_scan_file_dv, ResolvedCdfScanFile};
use super::scan_file::scan_data_to_scan_file;
use super::{TableChanges, CDF_FIELDS};
/// A scan over the Change Data Feed (CDF) of a table, bounded by the versions captured
/// in `table_changes`. Construct one via [`TableChangesScanBuilder`]; run it with
/// [`TableChangesScan::execute`].
#[derive(Debug)]
pub struct TableChangesScan {
// The CDF metadata (log segment, end snapshot, table root) this scan reads from.
table_changes: Arc<TableChanges>,
// The logical, caller-visible schema of the rows this scan produces.
logical_schema: SchemaRef,
// The schema of columns that are physically read from parquet data files
// (excludes partition columns and generated CDF columns).
physical_schema: SchemaRef,
// The caller's predicate translated to physical column references; used for skipping.
physical_predicate: PhysicalPredicate,
// Every output column in logical order, tagged as partition-derived or selected.
all_fields: Arc<Vec<ColumnType>>,
}
/// Builder for a [`TableChangesScan`]. Optionally takes a projected schema and a
/// predicate before [`TableChangesScanBuilder::build`] finalizes the scan.
#[derive(Debug)]
pub struct TableChangesScanBuilder {
// The table changes (version range + snapshots) the scan will read.
table_changes: Arc<TableChanges>,
// Optional projection; `None` means the full CDF schema.
schema: Option<SchemaRef>,
// Optional filter predicate expressed over the logical schema.
predicate: Option<ExpressionRef>,
}
impl TableChangesScanBuilder {
    /// Creates a builder for scanning the given [`TableChanges`].
    pub fn new(table_changes: impl Into<Arc<TableChanges>>) -> Self {
        Self {
            table_changes: table_changes.into(),
            schema: None,
            predicate: None,
        }
    }

    /// Sets the projected logical schema for the scan; `None` keeps the full CDF schema.
    pub fn with_schema(mut self, schema: impl Into<Option<SchemaRef>>) -> Self {
        self.schema = schema.into();
        self
    }

    /// Sets an optional predicate used to filter/skip data during the scan.
    pub fn with_predicate(mut self, predicate: impl Into<Option<ExpressionRef>>) -> Self {
        self.predicate = predicate.into();
        self
    }

    /// Finalizes the builder into a [`TableChangesScan`], classifying each logical
    /// column and translating the predicate to physical column references.
    pub fn build(self) -> DeltaResult<TableChangesScan> {
        // Default to the full CDF schema when the caller did not supply a projection.
        let logical_schema = self
            .schema
            .unwrap_or_else(|| self.table_changes.schema.clone().into());

        // Walk the logical fields once, recording which columns must actually be
        // read from parquet (`read_fields`) versus derived from metadata.
        let mut read_fields = Vec::with_capacity(logical_schema.fields.len());
        let mut all_fields = Vec::with_capacity(logical_schema.fields.len());
        for (index, logical_field) in logical_schema.fields().enumerate() {
            let field_name = logical_field.name();
            let column_type = if self.table_changes.partition_columns().contains(field_name) {
                // Partition values come from file metadata, not from the data file.
                ColumnType::Partition(index)
            } else if CDF_FIELDS.iter().any(|field| field.name() == field_name) {
                // Generated CDF columns (_change_type, _commit_version, ...) are
                // synthesized later; they are not physical parquet columns.
                ColumnType::Selected(field_name.to_string())
            } else {
                // Ordinary data column: map it to its physical (possibly
                // column-mapped) name and mark it for reading.
                let physical_field = logical_field.make_physical();
                debug!("\n\n{logical_field:#?}\nAfter mapping: {physical_field:#?}\n\n");
                let physical_name = physical_field.name.clone();
                read_fields.push(physical_field);
                ColumnType::Selected(physical_name)
            };
            all_fields.push(column_type);
        }

        // Translate the caller's predicate (if any) into physical terms.
        let physical_predicate = match self.predicate {
            Some(predicate) => PhysicalPredicate::try_new(&predicate, &logical_schema)?,
            None => PhysicalPredicate::None,
        };

        Ok(TableChangesScan {
            table_changes: self.table_changes,
            logical_schema,
            physical_predicate,
            all_fields: Arc::new(all_fields),
            physical_schema: StructType::new(read_fields).into(),
        })
    }
}
impl TableChangesScan {
    /// Replays the commit files in the scan's version range and yields the scan
    /// metadata ([`TableChangesScanData`]) that drives file resolution and reads.
    fn scan_data(
        &self,
        engine: Arc<dyn Engine>,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<TableChangesScanData>>> {
        let commits = self
            .table_changes
            .log_segment
            .ascending_commit_files
            .clone();
        // A statically-false predicate means no row can match: short-circuit with
        // an empty iterator of the same concrete type (the Option/flatten trick
        // keeps both return paths type-compatible).
        let physical_predicate = match self.physical_predicate.clone() {
            PhysicalPredicate::StaticSkipAll => return Ok(None.into_iter().flatten()),
            PhysicalPredicate::Some(predicate, schema) => Some((predicate, schema)),
            PhysicalPredicate::None => None,
        };
        let schema = self.table_changes.end_snapshot.schema().clone().into();
        let actions = table_changes_action_iter(engine, commits, schema, physical_predicate)?;
        Ok(Some(actions).into_iter().flatten())
    }

    /// Bundles the state every reader needs to turn physical batches into
    /// logical CDF rows.
    fn global_scan_state(&self) -> GlobalScanState {
        let end_snapshot = &self.table_changes.end_snapshot;
        GlobalScanState {
            table_root: self.table_changes.table_root.to_string(),
            logical_schema: self.logical_schema.clone(),
            physical_schema: self.physical_schema.clone(),
            partition_columns: end_snapshot.metadata().partition_columns.clone(),
            column_mapping_mode: end_snapshot.column_mapping_mode,
        }
    }

    /// The logical schema of the rows this scan produces.
    pub fn schema(&self) -> &SchemaRef {
        &self.logical_schema
    }

    /// The predicate over physical columns, if one survived translation.
    /// Both `StaticSkipAll` and `None` yield `None` here.
    fn physical_predicate(&self) -> Option<ExpressionRef> {
        match &self.physical_predicate {
            PhysicalPredicate::Some(predicate, _) => Some(predicate.clone()),
            _ => None,
        }
    }

    /// Executes the scan: resolves each file's deletion vectors, reads its
    /// parquet data, and maps every physical batch into the logical CDF schema.
    pub fn execute(
        &self,
        engine: Arc<dyn Engine>,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>>> {
        let scan_data = self.scan_data(engine.clone())?;
        let scan_files = scan_data_to_scan_file(scan_data);
        let global_scan_state = self.global_scan_state();
        let table_root = self.table_changes.table_root().clone();
        let all_fields = self.all_fields.clone();
        let physical_predicate = self.physical_predicate();
        let dv_engine_ref = engine.clone();

        // Stage 1: resolve each scan file's deletion vector(s).
        let resolved_files = scan_files
            .map(move |scan_file| {
                resolve_scan_file_dv(dv_engine_ref.as_ref(), &table_root, scan_file?)
            })
            .flatten_ok();
        // Stage 2: read each resolved file and lift its batches to logical rows.
        let results = resolved_files
            .map(move |resolved_scan_file| -> DeltaResult<_> {
                read_scan_file(
                    engine.as_ref(),
                    resolved_scan_file?,
                    &global_scan_state,
                    &all_fields,
                    physical_predicate.clone(),
                )
            })
            .flatten_ok()
            // flatten_ok leaves a nested Result<Result<_>>; collapse it.
            .map(|batch| batch?);
        Ok(results)
    }
}
/// Reads the data file described by `resolved_scan_file` and transforms each physical
/// parquet batch into the logical schema, yielding one [`ScanResult`] per batch. The
/// file-level selection vector is split across batches so each result masks only its
/// own rows.
fn read_scan_file(
engine: &dyn Engine,
resolved_scan_file: ResolvedCdfScanFile,
global_state: &GlobalScanState,
all_fields: &[ColumnType],
physical_predicate: Option<ExpressionRef>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>>> {
let ResolvedCdfScanFile {
scan_file,
mut selection_vector,
} = resolved_scan_file;
// Expression mapping a physical batch (plus partition/generated CDF columns)
// to the logical schema the caller requested.
let physical_to_logical_expr =
physical_to_logical_expr(&scan_file, global_state.logical_schema.as_ref(), all_fields)?;
let physical_schema =
scan_file_physical_schema(&scan_file, global_state.physical_schema.as_ref());
let phys_to_logical_eval = engine.get_expression_handler().get_evaluator(
physical_schema.clone(),
physical_to_logical_expr,
global_state.logical_schema.clone().into(),
);
// A present `remove_dv` marks this file as part of an add/remove pair whose
// deletion vectors were already resolved upstream; that flips the `extend`
// flag passed to `split_vector` below.
let is_dv_resolved_pair = scan_file.remove_dv.is_some();
let table_root = Url::parse(&global_state.table_root)?;
let location = table_root.join(&scan_file.path)?;
// NOTE(review): `last_modified` and `size` are zeroed placeholders — presumably
// the parquet handler does not rely on them here; confirm before depending on it.
let file = FileMeta {
last_modified: 0,
size: 0,
location,
};
let read_result_iter = engine.get_parquet_handler().read_parquet_files(
&[file],
physical_schema,
physical_predicate,
)?;
// Lazily carve up the single file-level selection vector: each batch takes the
// first `len` entries and leaves the remainder for the next batch. The closure
// keeps `selection_vector` as state across iterations.
let result = read_result_iter.map(move |batch| -> DeltaResult<_> {
let batch = batch?;
// Evaluate the physical->logical mapping for this batch.
let logical = phys_to_logical_eval.evaluate(batch.as_ref());
// Number of rows in this batch (0 if evaluation failed; error surfaces below).
let len = logical.as_ref().map_or(0, |res| res.len());
// Take ownership of the remaining selection vector, split off this batch's
// prefix, and stash the tail back for the next iteration.
let mut sv = selection_vector.take();
let extend = Some(!is_dv_resolved_pair);
let rest = split_vector(sv.as_mut(), len, extend);
let result = ScanResult {
raw_data: logical,
raw_mask: sv,
};
selection_vector = rest;
Ok(result)
});
Ok(result)
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::engine::sync::SyncEngine;
    use crate::expressions::{column_expr, Scalar};
    use crate::scan::{ColumnType, PhysicalPredicate};
    use crate::schema::{DataType, StructField, StructType};
    use crate::table_changes::COMMIT_VERSION_COL_NAME;
    use crate::{Expression, Table};

    #[test]
    fn simple_table_changes_scan_builder() {
        // Build a CDF scan over versions 0..=1 with no projection and no predicate.
        let path = "./tests/data/table-with-cdf";
        let engine = Box::new(SyncEngine::new());
        let table = Table::try_from_uri(path).unwrap();
        let table_changes = table.table_changes(engine.as_ref(), 0, 1).unwrap();
        let scan = table_changes.into_scan_builder().build().unwrap();
        // With no projection, every column — data and generated CDF columns alike —
        // shows up as a selected field in logical order.
        let expected_fields = vec![
            ColumnType::Selected("part".to_string()),
            ColumnType::Selected("id".to_string()),
            ColumnType::Selected("_change_type".to_string()),
            ColumnType::Selected("_commit_version".to_string()),
            ColumnType::Selected("_commit_timestamp".to_string()),
        ];
        assert_eq!(scan.all_fields, expected_fields.into());
        assert_eq!(scan.physical_predicate, PhysicalPredicate::None);
    }

    #[test]
    fn projected_and_filtered_table_changes_scan_builder() {
        let path = "./tests/data/table-with-cdf";
        let engine = Box::new(SyncEngine::new());
        let table = Table::try_from_uri(path).unwrap();
        let table_changes = table.table_changes(engine.as_ref(), 0, 1).unwrap();
        // Project down to one data column plus one generated CDF column.
        let schema = table_changes
            .schema()
            .project(&["id", COMMIT_VERSION_COL_NAME])
            .unwrap();
        let predicate = Arc::new(Expression::gt(column_expr!("id"), Scalar::from(10)));
        let scan = table_changes
            .into_scan_builder()
            .with_schema(schema)
            .with_predicate(predicate.clone())
            .build()
            .unwrap();
        let expected_fields = vec![
            ColumnType::Selected("id".to_string()),
            ColumnType::Selected("_commit_version".to_string()),
        ];
        assert_eq!(scan.all_fields, expected_fields.into());
        let expected_schema = StructType::new([
            StructField::new("id", DataType::INTEGER, true),
            StructField::new("_commit_version", DataType::LONG, false),
        ]);
        assert_eq!(scan.logical_schema, expected_schema.into());
        // Only the physical `id` column is referenced by the predicate, so its
        // referenced-columns schema contains just that field.
        let referenced = StructType::new([StructField::new("id", DataType::INTEGER, true)]);
        assert_eq!(
            scan.physical_predicate,
            PhysicalPredicate::Some(predicate, referenced.into())
        );
    }
}