use std::sync::Arc;
use std::time::SystemTime;
use arrow_schema::{ArrowError, Field, Schema};
use chrono::{DateTime, Utc};
use datafusion::catalog::Session;
use datafusion::common::ScalarValue;
use datafusion::config::TableParquetOptions;
use datafusion::datasource::memory::DataSourceExec;
use datafusion::datasource::physical_plan::{FileGroup, FileScanConfigBuilder, ParquetSource};
use datafusion::datasource::table_schema::TableSchema;
use datafusion::physical_expr::{PhysicalExpr, expressions};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::union::UnionExec;
use tracing::log;
use crate::DeltaTableError;
use crate::delta_datafusion::{DataFusionMixins, DeltaSessionExt};
use crate::errors::DeltaResult;
use crate::kernel::transaction::PROTOCOL;
use crate::kernel::{
Action, Add, AddCDCFile, CommitInfo, EagerSnapshot, Version, resolve_snapshot,
};
use crate::logstore::{LogStoreRef, get_actions};
use crate::{delta_datafusion::cdf::*, kernel::Remove};
/// Builder used to configure and create a Change Data Feed (CDF) scan over a
/// Delta table's commit log. Construct via [`CdfLoadBuilder::new`], narrow the
/// version/timestamp window with the `with_*` setters, then call `build` to
/// obtain an [`ExecutionPlan`] producing the change rows.
#[derive(Clone)]
pub struct CdfLoadBuilder {
    /// Optional pre-resolved snapshot of the table; resolved lazily in `build`
    /// when `None`.
    pub(crate) snapshot: Option<EagerSnapshot>,
    /// Delta log store providing access to commit entries and data files.
    log_store: LogStoreRef,
    /// Inclusive lower bound of the scan, as a commit version.
    starting_version: Option<Version>,
    /// Inclusive upper bound of the scan, as a commit version; defaults to the
    /// latest available version.
    ending_version: Option<Version>,
    /// Inclusive lower bound as a commit timestamp (alternative to
    /// `starting_version`); defaults to the Unix epoch.
    starting_timestamp: Option<DateTime<Utc>>,
    /// Inclusive upper bound as a commit timestamp; defaults to "now".
    ending_timestamp: Option<DateTime<Utc>>,
    /// When `true`, an out-of-range version/timestamp window yields an empty
    /// result instead of an error.
    allow_out_of_range: bool,
    /// Optional session used for planning; not included in `Debug` output.
    session: Option<Arc<dyn Session>>,
}
impl std::fmt::Debug for CdfLoadBuilder {
    /// Manual `Debug` impl: `session` holds an `Arc<dyn Session>`, which is
    /// not `Debug`, so it is intentionally omitted from the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CdfLoadBuilder")
            .field("snapshot", &self.snapshot)
            .field("log_store", &self.log_store)
            .field("starting_version", &self.starting_version)
            .field("ending_version", &self.ending_version)
            .field("starting_timestamp", &self.starting_timestamp)
            .field("ending_timestamp", &self.ending_timestamp)
            .field("allow_out_of_range", &self.allow_out_of_range)
            // A field is skipped, so signal that with a trailing `..` instead
            // of pretending the listing is exhaustive (clippy:
            // `missing_fields_in_debug`).
            .finish_non_exhaustive()
    }
}
impl CdfLoadBuilder {
pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
Self {
snapshot,
log_store,
starting_version: None,
ending_version: None,
starting_timestamp: None,
ending_timestamp: None,
allow_out_of_range: false,
session: None,
}
}
pub fn with_starting_version(mut self, starting_version: Version) -> Self {
self.starting_version = Some(starting_version);
self
}
pub fn with_ending_version(mut self, ending_version: Version) -> Self {
self.ending_version = Some(ending_version);
self
}
pub fn with_ending_timestamp(mut self, timestamp: DateTime<Utc>) -> Self {
self.ending_timestamp = Some(timestamp);
self
}
pub fn with_starting_timestamp(mut self, timestamp: DateTime<Utc>) -> Self {
self.starting_timestamp = Some(timestamp);
self
}
pub fn with_allow_out_of_range(mut self) -> Self {
self.allow_out_of_range = true;
self
}
pub fn with_session_state(mut self, session: Arc<dyn Session>) -> Self {
self.session = Some(session);
self
}
async fn calculate_earliest_version(&self, snapshot: &EagerSnapshot) -> DeltaResult<Version> {
let ts = self.starting_timestamp.unwrap_or(DateTime::UNIX_EPOCH);
for v in 0..snapshot.version() {
if let Ok(Some(bytes)) = self.log_store.read_commit_entry(v).await
&& let Ok(actions) = get_actions(v, &bytes)
&& actions.iter().any(|action| {
matches!(action, Action::CommitInfo(CommitInfo {
timestamp: Some(t), ..
}) if ts.timestamp_millis() < *t)
})
{
return Ok(v);
}
}
Ok(0)
}
    /// Replay the commit log from the effective starting version to the
    /// effective ending version and bucket the file actions needed to produce
    /// the change feed:
    ///
    /// * `AddCDCFile` — explicit change-data files written by CDC-aware writers,
    /// * `Add` — plain data files, later surfaced as `insert` rows,
    /// * `Remove` — removed data files, later surfaced as `delete` rows.
    ///
    /// Within a single commit, explicit CDC files take precedence: when any
    /// are present, that commit's Add/Remove actions are ignored.
    ///
    /// # Errors
    /// * `NoStartingVersionOrTimestamp` when no lower bound was configured.
    /// * `ChangeDataInvalidVersionRange` / `InvalidVersion` /
    ///   `ChangeDataTimestampGreaterThanCommit` for out-of-range requests,
    ///   unless `allow_out_of_range` is set (then empty vectors are returned).
    /// * `ChangeDataNotEnabled` / `ChangeDataNotRecorded` when the table's
    ///   `delta.enableChangeDataFeed` property is missing or disabled within
    ///   the requested range.
    async fn determine_files_to_read(
        &self,
        snapshot: &EagerSnapshot,
    ) -> DeltaResult<(
        Vec<CdcDataSpec<AddCDCFile>>,
        Vec<CdcDataSpec<Add>>,
        Vec<CdcDataSpec<Remove>>,
    )> {
        // A lower bound is mandatory: either an explicit version or a timestamp.
        if self.starting_version.is_none() && self.starting_timestamp.is_none() {
            return Err(DeltaTableError::NoStartingVersionOrTimestamp);
        }
        // An explicit starting version wins; otherwise resolve the starting
        // timestamp to the earliest qualifying commit version.
        let start = if let Some(s) = self.starting_version {
            s
        } else {
            self.calculate_earliest_version(snapshot).await?
        };
        let mut change_files: Vec<CdcDataSpec<AddCDCFile>> = vec![];
        let mut add_files: Vec<CdcDataSpec<Add>> = vec![];
        let mut remove_files: Vec<CdcDataSpec<Remove>> = vec![];
        let latest_version = match self.log_store.get_latest_version(start).await {
            Ok(latest_version) => latest_version,
            // With `allow_out_of_range`, an unresolvable version yields an
            // empty result set instead of an error.
            Err(DeltaTableError::InvalidVersion(_)) if self.allow_out_of_range => {
                return Ok((change_files, add_files, remove_files));
            }
            Err(e) => return Err(e),
        };
        // Clamp the requested end to what actually exists in the log.
        let mut end = self.ending_version.unwrap_or(latest_version);
        if end > latest_version {
            end = latest_version;
        }
        if end < start {
            return if self.allow_out_of_range {
                Ok((change_files, add_files, remove_files))
            } else {
                Err(DeltaTableError::ChangeDataInvalidVersionRange { start, end })
            };
        }
        if start > latest_version {
            return if self.allow_out_of_range {
                Ok((change_files, add_files, remove_files))
            } else {
                Err(DeltaTableError::InvalidVersion(start))
            };
        }
        // Timestamp window defaults: Unix epoch .. now.
        let starting_timestamp = self.starting_timestamp.unwrap_or(DateTime::UNIX_EPOCH);
        let ending_timestamp = self
            .ending_timestamp
            .unwrap_or(DateTime::from(SystemTime::now()));
        // If the starting timestamp lies beyond the newest commit's own
        // timestamp, the window cannot match anything.
        let latest_snapshot_bytes = self
            .log_store
            .read_commit_entry(latest_version)
            .await?
            .ok_or(DeltaTableError::InvalidVersion(latest_version))?;
        let latest_version_actions: Vec<Action> =
            get_actions(latest_version, &latest_snapshot_bytes)?;
        let latest_version_commit = latest_version_actions
            .iter()
            .find(|a| matches!(a, Action::CommitInfo(_)));
        if let Some(Action::CommitInfo(CommitInfo {
            timestamp: Some(latest_timestamp),
            ..
        })) = latest_version_commit
            && starting_timestamp.timestamp_millis() > *latest_timestamp
        {
            return if self.allow_out_of_range {
                Ok((change_files, add_files, remove_files))
            } else {
                Err(DeltaTableError::ChangeDataTimestampGreaterThanCommit { ending_timestamp })
            };
        }
        log::debug!(
            "starting timestamp = {starting_timestamp:?}, ending timestamp = {ending_timestamp:?}"
        );
        log::debug!("starting version = {start}, ending version = {end:?}");
        for version in start..=end {
            let snapshot_bytes = self
                .log_store
                .read_commit_entry(version)
                .await?
                .ok_or(DeltaTableError::InvalidVersion(version));
            let version_actions: Vec<Action> = get_actions(version, &snapshot_bytes?)?;
            let mut ts = 0;
            let mut cdc_actions = vec![];
            // When a timestamp bound was supplied, skip commits whose
            // CommitInfo timestamp falls outside [starting, ending].
            if self.starting_timestamp.is_some() || self.ending_timestamp.is_some() {
                let version_commit = version_actions
                    .iter()
                    .find(|a| matches!(a, Action::CommitInfo(_)));
                if let Some(Action::CommitInfo(CommitInfo {
                    timestamp: Some(t), ..
                })) = version_commit
                    && (starting_timestamp.timestamp_millis() > *t
                        || *t > ending_timestamp.timestamp_millis())
                {
                    log::debug!("Version: {version} skipped, due to commit timestamp");
                    continue;
                }
            }
            for action in &version_actions {
                match action {
                    Action::Cdc(f) => cdc_actions.push(f.clone()),
                    Action::Metadata(md) => {
                        log::info!("Metadata: {md:?}");
                        // CDF must be enabled for the whole requested range:
                        // missing/false at `start` means it was never enabled;
                        // an explicit "false" later means recording stopped
                        // mid-range.
                        if let Some(key) = &md.configuration().get("delta.enableChangeDataFeed") {
                            let key = key.to_lowercase();
                            if (version == start && key != "true") || key == "false" {
                                return Err(DeltaTableError::ChangeDataNotRecorded {
                                    version,
                                    start,
                                    end,
                                });
                            }
                        } else if version == start {
                            return Err(DeltaTableError::ChangeDataNotEnabled { version });
                        };
                    }
                    Action::CommitInfo(ci) => {
                        // Remember the commit timestamp; it is carried on the
                        // CdcDataSpec for this version.
                        ts = ci.timestamp.unwrap_or(0);
                    }
                    _ => {}
                }
            }
            // Explicit CDC files take precedence over Add/Remove replay.
            if !cdc_actions.is_empty() {
                log::debug!(
                    "Located {} cdf actions for version: {version}",
                    cdc_actions.len(),
                );
                change_files.push(CdcDataSpec::new(version, ts, cdc_actions))
            } else {
                // No CDC files in this commit: fall back to the data-changing
                // Add/Remove actions (data_change == true).
                let add_actions = version_actions
                    .iter()
                    .filter_map(|a| match a {
                        Action::Add(a) if a.data_change => Some(a.clone()),
                        _ => None,
                    })
                    .collect::<Vec<Add>>();
                let remove_actions = version_actions
                    .iter()
                    .filter_map(|r| match r {
                        Action::Remove(r) if r.data_change => Some(r.clone()),
                        _ => None,
                    })
                    .collect::<Vec<Remove>>();
                if !add_actions.is_empty() {
                    log::debug!(
                        "Located {} cdf actions for version: {version}",
                        add_actions.len(),
                    );
                    add_files.push(CdcDataSpec::new(version, ts, add_actions));
                }
                if !remove_actions.is_empty() {
                    log::debug!(
                        "Located {} cdf actions for version: {version}",
                        remove_actions.len(),
                    );
                    remove_files.push(CdcDataSpec::new(version, ts, remove_actions));
                }
            }
        }
        Ok((change_files, add_files, remove_files))
    }
#[inline]
fn get_add_action_type() -> Option<ScalarValue> {
Some(ScalarValue::Utf8(Some(String::from("insert"))))
}
#[inline]
fn get_remove_action_type() -> Option<ScalarValue> {
Some(ScalarValue::Utf8(Some(String::from("delete"))))
}
    /// Build the physical [`ExecutionPlan`] that produces the change feed.
    ///
    /// The plan is a union of three Parquet scans — explicit CDC files, Add
    /// files (as `insert`), and Remove files (as `delete`) — projected to a
    /// common column order (table columns followed by the CDF metadata
    /// columns).
    ///
    /// `filters`, when provided, is pushed down to each Parquet source as a
    /// pruning predicate.
    ///
    /// # Errors
    /// Propagates snapshot resolution, protocol-check, file-grouping, and
    /// plan-construction failures.
    pub async fn build(
        &self,
        session: &dyn Session,
        filters: Option<&Arc<dyn PhysicalExpr>>,
    ) -> DeltaResult<Arc<dyn ExecutionPlan>> {
        let snapshot = resolve_snapshot(&self.log_store, self.snapshot.clone(), true, None).await?;
        // Reject tables whose reader protocol we do not support.
        PROTOCOL.can_read_from(&snapshot)?;
        let (cdc, add, remove) = self.determine_files_to_read(&snapshot).await?;
        session.ensure_log_store_registered(self.log_store.as_ref())?;
        let partition_values = snapshot.metadata().partition_columns().clone();
        let schema = snapshot.input_schema();
        // Split the table schema into file columns and partition columns:
        // partition values are not stored in the Parquet files themselves.
        let schema_fields: Vec<Arc<Field>> = schema
            .fields()
            .into_iter()
            .filter(|f| !partition_values.contains(f.name()))
            .cloned()
            .collect();
        let this_partition_values = partition_values
            .iter()
            .map(|name| schema.field_with_name(name).map(|f| f.to_owned()))
            .collect::<Result<Vec<_>, ArrowError>>()?;
        // CDC files physically carry the `_change_type` column; Add/Remove
        // files do not (it is injected as a partition value instead).
        let cdc_file_schema = create_cdc_schema(schema_fields.clone(), true);
        let add_remove_file_schema = create_cdc_schema(schema_fields, false);
        let mut cdc_partition_cols = CDC_PARTITION_SCHEMA.clone();
        let mut add_remove_partition_cols = ADD_PARTITION_SCHEMA.clone();
        cdc_partition_cols.extend_from_slice(&this_partition_values);
        add_remove_partition_cols.extend_from_slice(&this_partition_values);
        // Group the discovered files by partition value; Add/Remove groups
        // additionally get their synthetic `_change_type` value.
        let cdc_file_groups =
            create_partition_values(schema.clone(), cdc, &partition_values, None)?;
        let add_file_groups = create_partition_values(
            schema.clone(),
            add,
            &partition_values,
            Self::get_add_action_type(),
        )?;
        let remove_file_groups = create_partition_values(
            schema.clone(),
            remove,
            &partition_values,
            Self::get_remove_action_type(),
        )?;
        let cdc_partition_fields: Vec<Arc<Field>> =
            cdc_partition_cols.into_iter().map(Arc::new).collect();
        let add_remove_partition_fields: Vec<Arc<Field>> = add_remove_partition_cols
            .into_iter()
            .map(Arc::new)
            .collect();
        let cdc_table_schema = TableSchema::new(Arc::clone(&cdc_file_schema), cdc_partition_fields);
        let add_table_schema = TableSchema::new(
            Arc::clone(&add_remove_file_schema),
            add_remove_partition_fields.clone(),
        );
        let remove_table_schema = TableSchema::new(
            Arc::clone(&add_remove_file_schema),
            add_remove_partition_fields,
        );
        // Inherit the session's Parquet reader options.
        let parquet_options = TableParquetOptions {
            global: session.config().options().execution.parquet.clone(),
            ..Default::default()
        };
        let mut cdc_source = ParquetSource::new(cdc_table_schema)
            .with_table_parquet_options(parquet_options.clone());
        let mut add_source = ParquetSource::new(add_table_schema)
            .with_table_parquet_options(parquet_options.clone());
        let mut remove_source =
            ParquetSource::new(remove_table_schema).with_table_parquet_options(parquet_options);
        // Push the caller's predicate down to every source for pruning.
        if let Some(filters) = filters {
            cdc_source = cdc_source.with_predicate(Arc::clone(filters));
            add_source = add_source.with_predicate(Arc::clone(filters));
            remove_source = remove_source.with_predicate(Arc::clone(filters));
        }
        let cdc_scan: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(
            FileScanConfigBuilder::new(self.log_store.object_store_url(), Arc::new(cdc_source))
                .with_file_groups(cdc_file_groups.into_values().map(FileGroup::from).collect())
                .build(),
        );
        let add_scan: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(
            FileScanConfigBuilder::new(self.log_store.object_store_url(), Arc::new(add_source))
                .with_file_groups(add_file_groups.into_values().map(FileGroup::from).collect())
                .build(),
        );
        let remove_scan: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(
            FileScanConfigBuilder::new(self.log_store.object_store_url(), Arc::new(remove_source))
                .with_file_groups(
                    remove_file_groups
                        .into_values()
                        .map(FileGroup::from)
                        .collect(),
                )
                .build(),
        );
        // Stitch the three scans together...
        let union_scan = UnionExec::try_new(vec![cdc_scan, add_scan, remove_scan])?;
        // ...then project to a stable output order: table columns first,
        // followed by the CDF metadata columns.
        let mut fields = schema.fields().to_vec();
        for f in ADD_PARTITION_SCHEMA.clone() {
            fields.push(f.into());
        }
        let project_schema = Schema::new(fields);
        let union_schema = union_scan.schema();
        let expressions: Vec<(Arc<dyn PhysicalExpr>, String)> = project_schema
            .fields()
            .into_iter()
            .map(|f| -> (Arc<dyn PhysicalExpr>, String) {
                let field_name = f.name();
                let expr = Arc::new(expressions::Column::new(
                    field_name,
                    union_schema.index_of(field_name).unwrap(),
                ));
                (expr, field_name.to_owned())
            })
            .collect();
        let scan = Arc::new(ProjectionExec::try_new(expressions, union_scan)?);
        Ok(scan)
    }
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use std::str::FromStr;
use arrow_array::{Int32Array, RecordBatch, StringArray};
use arrow_schema::Schema;
use chrono::NaiveDateTime;
use datafusion::common::assert_batches_sorted_eq;
use datafusion::physical_plan::collect;
use datafusion::prelude::SessionContext;
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
use itertools::Itertools;
use crate::test_utils::TestSchemas;
use crate::writer::test_utils::TestResult;
use crate::{DeltaTable, TableProperty};
use std::path::Path;
use url::Url;
    /// Full CDF scan of the partitioned fixture table from version 0: inserts,
    /// update pre/post images, and a delete must all appear.
    #[tokio::test]
    async fn test_load_local() -> TestResult {
        let ctx: SessionContext = SessionContext::new();
        let table_path = Path::new("../test/tests/data/cdf-table");
        let table_uri =
            Url::from_directory_path(std::fs::canonicalize(table_path).unwrap()).unwrap();
        let table = DeltaTable::try_from_url(table_uri)
            .await?
            .scan_cdf()
            .with_starting_version(0)
            .build(&ctx.state(), None)
            .await?;
        let batches = collect(table, ctx.task_ctx()).await?;
        assert_batches_sorted_eq! {
            [
                "+----+--------+------------+------------------+-----------------+-------------------------+",
                "| id | name   | birthday   | _change_type     | _commit_version | _commit_timestamp       |",
                "+----+--------+------------+------------------+-----------------+-------------------------+",
                "| 1  | Steve  | 2023-12-22 | insert           | 0               | 2023-12-22T17:10:18.828 |",
                "| 10 | Borb   | 2023-12-25 | insert           | 0               | 2023-12-22T17:10:18.828 |",
                "| 2  | Bob    | 2023-12-22 | update_postimage | 1               | 2023-12-22T17:10:21.675 |",
                "| 2  | Bob    | 2023-12-23 | insert           | 0               | 2023-12-22T17:10:18.828 |",
                "| 2  | Bob    | 2023-12-23 | update_preimage  | 1               | 2023-12-22T17:10:21.675 |",
                "| 3  | Dave   | 2023-12-22 | update_postimage | 1               | 2023-12-22T17:10:21.675 |",
                "| 3  | Dave   | 2023-12-23 | insert           | 0               | 2023-12-22T17:10:18.828 |",
                "| 3  | Dave   | 2023-12-23 | update_preimage  | 1               | 2023-12-22T17:10:21.675 |",
                "| 4  | Kate   | 2023-12-22 | update_postimage | 1               | 2023-12-22T17:10:21.675 |",
                "| 4  | Kate   | 2023-12-23 | insert           | 0               | 2023-12-22T17:10:18.828 |",
                "| 4  | Kate   | 2023-12-23 | update_preimage  | 1               | 2023-12-22T17:10:21.675 |",
                "| 5  | Emily  | 2023-12-24 | insert           | 0               | 2023-12-22T17:10:18.828 |",
                "| 5  | Emily  | 2023-12-24 | update_preimage  | 2               | 2023-12-29T21:41:33.785 |",
                "| 5  | Emily  | 2023-12-29 | update_postimage | 2               | 2023-12-29T21:41:33.785 |",
                "| 6  | Carl   | 2023-12-24 | insert           | 0               | 2023-12-22T17:10:18.828 |",
                "| 6  | Carl   | 2023-12-24 | update_preimage  | 2               | 2023-12-29T21:41:33.785 |",
                "| 6  | Carl   | 2023-12-29 | update_postimage | 2               | 2023-12-29T21:41:33.785 |",
                "| 7  | Dennis | 2023-12-24 | insert           | 0               | 2023-12-22T17:10:18.828 |",
                "| 7  | Dennis | 2023-12-24 | update_preimage  | 2               | 2023-12-29T21:41:33.785 |",
                "| 7  | Dennis | 2023-12-29 | delete           | 3               | 2024-01-06T16:44:59.570 |",
                "| 7  | Dennis | 2023-12-29 | update_postimage | 2               | 2023-12-29T21:41:33.785 |",
                "| 8  | Claire | 2023-12-25 | insert           | 0               | 2023-12-22T17:10:18.828 |",
                "| 9  | Ada    | 2023-12-25 | insert           | 0               | 2023-12-22T17:10:18.828 |",
                "+----+--------+------------+------------------+-----------------+-------------------------+",
            ], &batches }
        Ok(())
    }
    /// Ending timestamp bounds the scan: only commits at or before the cutoff
    /// (versions 0 and 1) contribute rows.
    #[tokio::test]
    async fn test_load_local_datetime() -> TestResult {
        let ctx = SessionContext::new();
        let starting_timestamp = NaiveDateTime::from_str("2023-12-22T17:10:21.675").unwrap();
        let table_path = Path::new("../test/tests/data/cdf-table");
        let table_uri =
            Url::from_directory_path(std::fs::canonicalize(table_path).unwrap()).unwrap();
        let table = DeltaTable::try_from_url(table_uri)
            .await?
            .scan_cdf()
            .with_starting_version(0)
            .with_ending_timestamp(starting_timestamp.and_utc())
            .build(&ctx.state(), None)
            .await
            .unwrap();
        let batches = collect(table, ctx.task_ctx()).await?;
        assert_batches_sorted_eq! {
            [
                "+----+--------+------------+------------------+-----------------+-------------------------+",
                "| id | name   | birthday   | _change_type     | _commit_version | _commit_timestamp       |",
                "+----+--------+------------+------------------+-----------------+-------------------------+",
                "| 1  | Steve  | 2023-12-22 | insert           | 0               | 2023-12-22T17:10:18.828 |",
                "| 10 | Borb   | 2023-12-25 | insert           | 0               | 2023-12-22T17:10:18.828 |",
                "| 2  | Bob    | 2023-12-22 | update_postimage | 1               | 2023-12-22T17:10:21.675 |",
                "| 2  | Bob    | 2023-12-23 | insert           | 0               | 2023-12-22T17:10:18.828 |",
                "| 2  | Bob    | 2023-12-23 | update_preimage  | 1               | 2023-12-22T17:10:21.675 |",
                "| 3  | Dave   | 2023-12-22 | update_postimage | 1               | 2023-12-22T17:10:21.675 |",
                "| 3  | Dave   | 2023-12-23 | insert           | 0               | 2023-12-22T17:10:18.828 |",
                "| 3  | Dave   | 2023-12-23 | update_preimage  | 1               | 2023-12-22T17:10:21.675 |",
                "| 4  | Kate   | 2023-12-22 | update_postimage | 1               | 2023-12-22T17:10:21.675 |",
                "| 4  | Kate   | 2023-12-23 | insert           | 0               | 2023-12-22T17:10:18.828 |",
                "| 4  | Kate   | 2023-12-23 | update_preimage  | 1               | 2023-12-22T17:10:21.675 |",
                "| 5  | Emily  | 2023-12-24 | insert           | 0               | 2023-12-22T17:10:18.828 |",
                "| 6  | Carl   | 2023-12-24 | insert           | 0               | 2023-12-22T17:10:18.828 |",
                "| 7  | Dennis | 2023-12-24 | insert           | 0               | 2023-12-22T17:10:18.828 |",
                "| 8  | Claire | 2023-12-25 | insert           | 0               | 2023-12-22T17:10:18.828 |",
                "| 9  | Ada    | 2023-12-25 | insert           | 0               | 2023-12-22T17:10:18.828 |",
                "+----+--------+------------+------------------+-----------------+-------------------------+",
            ],
            &batches
        }
        Ok(())
    }
    /// Full CDF scan of the non-partitioned fixture table, covering extra
    /// column types (long, boolean, double, smallint).
    #[tokio::test]
    async fn test_load_local_non_partitioned() -> TestResult {
        let ctx = SessionContext::new();
        let table_path = Path::new("../test/tests/data/cdf-table-non-partitioned");
        let table_uri =
            Url::from_directory_path(std::fs::canonicalize(table_path).unwrap()).unwrap();
        let table = DeltaTable::try_from_url(table_uri)
            .await?
            .scan_cdf()
            .with_starting_version(0)
            .build(&ctx.state(), None)
            .await?;
        let batches = collect(table, ctx.task_ctx()).await?;
        assert_batches_sorted_eq! {
            ["+----+--------+------------+-------------------+---------------+--------------+----------------+------------------+-----------------+-------------------------+",
             "| id | name   | birthday   | long_field        | boolean_field | double_field | smallint_field | _change_type     | _commit_version | _commit_timestamp       |",
             "+----+--------+------------+-------------------+---------------+--------------+----------------+------------------+-----------------+-------------------------+",
             "| 7  | Dennis | 2024-04-14 | 6                 | true          | 3.14         | 1              | delete           | 3               | 2024-04-14T15:58:32.495 |",
             "| 3  | Dave   | 2024-04-15 | 2                 | true          | 3.14         | 1              | update_preimage  | 1               | 2024-04-14T15:58:29.393 |",
             "| 3  | Dave   | 2024-04-14 | 2                 | true          | 3.14         | 1              | update_postimage | 1               | 2024-04-14T15:58:29.393 |",
             "| 4  | Kate   | 2024-04-15 | 3                 | true          | 3.14         | 1              | update_preimage  | 1               | 2024-04-14T15:58:29.393 |",
             "| 4  | Kate   | 2024-04-14 | 3                 | true          | 3.14         | 1              | update_postimage | 1               | 2024-04-14T15:58:29.393 |",
             "| 2  | Bob    | 2024-04-15 | 1                 | true          | 3.14         | 1              | update_preimage  | 1               | 2024-04-14T15:58:29.393 |",
             "| 2  | Bob    | 2024-04-14 | 1                 | true          | 3.14         | 1              | update_postimage | 1               | 2024-04-14T15:58:29.393 |",
             "| 7  | Dennis | 2024-04-16 | 6                 | true          | 3.14         | 1              | update_preimage  | 2               | 2024-04-14T15:58:31.257 |",
             "| 7  | Dennis | 2024-04-14 | 6                 | true          | 3.14         | 1              | update_postimage | 2               | 2024-04-14T15:58:31.257 |",
             "| 5  | Emily  | 2024-04-16 | 4                 | true          | 3.14         | 1              | update_preimage  | 2               | 2024-04-14T15:58:31.257 |",
             "| 5  | Emily  | 2024-04-14 | 4                 | true          | 3.14         | 1              | update_postimage | 2               | 2024-04-14T15:58:31.257 |",
             "| 6  | Carl   | 2024-04-16 | 5                 | true          | 3.14         | 1              | update_preimage  | 2               | 2024-04-14T15:58:31.257 |",
             "| 6  | Carl   | 2024-04-14 | 5                 | true          | 3.14         | 1              | update_postimage | 2               | 2024-04-14T15:58:31.257 |",
             "| 1  | Alex   | 2024-04-14 | 1                 | true          | 3.14         | 1              | insert           | 4               | 2024-04-14T15:58:33.444 |",
             "| 2  | Alan   | 2024-04-15 | 1                 | true          | 3.14         | 1              | insert           | 4               | 2024-04-14T15:58:33.444 |",
             "| 1  | Steve  | 2024-04-14 | 1                 | true          | 3.14         | 1              | insert           | 0               | 2024-04-14T15:58:26.249 |",
             "| 2  | Bob    | 2024-04-15 | 1                 | true          | 3.14         | 1              | insert           | 0               | 2024-04-14T15:58:26.249 |",
             "| 3  | Dave   | 2024-04-15 | 2                 | true          | 3.14         | 1              | insert           | 0               | 2024-04-14T15:58:26.249 |",
             "| 4  | Kate   | 2024-04-15 | 3                 | true          | 3.14         | 1              | insert           | 0               | 2024-04-14T15:58:26.249 |",
             "| 5  | Emily  | 2024-04-16 | 4                 | true          | 3.14         | 1              | insert           | 0               | 2024-04-14T15:58:26.249 |",
             "| 6  | Carl   | 2024-04-16 | 5                 | true          | 3.14         | 1              | insert           | 0               | 2024-04-14T15:58:26.249 |",
             "| 7  | Dennis | 2024-04-16 | 6                 | true          | 3.14         | 1              | insert           | 0               | 2024-04-14T15:58:26.249 |",
             "| 8  | Claire | 2024-04-17 | 7                 | true          | 3.14         | 1              | insert           | 0               | 2024-04-14T15:58:26.249 |",
             "| 9  | Ada    | 2024-04-17 | 8                 | true          | 3.14         | 1              | insert           | 0               | 2024-04-14T15:58:26.249 |",
             "| 10 | Borb   | 2024-04-17 | 99999999999999999 | true          | 3.14         | 1              | insert           | 0               | 2024-04-14T15:58:26.249 |",
             "+----+--------+------------+-------------------+---------------+--------------+----------------+------------------+-----------------+-------------------------+"],
            &batches
        }
        Ok(())
    }
    /// start > end must fail with `ChangeDataInvalidVersionRange` when
    /// out-of-range is not allowed.
    #[tokio::test]
    async fn test_load_bad_version_range() -> TestResult {
        let ctx = SessionContext::new();
        let table_path = Path::new("../test/tests/data/cdf-table-non-partitioned");
        let table_uri =
            Url::from_directory_path(std::fs::canonicalize(table_path).unwrap()).unwrap();
        let table = DeltaTable::try_from_url(table_uri)
            .await?
            .scan_cdf()
            .with_starting_version(4)
            .with_ending_version(1)
            .build(&ctx.state(), None)
            .await;
        assert!(table.is_err());
        assert!(matches!(
            table.unwrap_err(),
            DeltaTableError::ChangeDataInvalidVersionRange { .. }
        ));
        Ok(())
    }
    /// A starting version beyond the table's latest version must fail with
    /// `InvalidVersion` by default.
    #[tokio::test]
    async fn test_load_version_out_of_range() -> TestResult {
        let ctx = SessionContext::new();
        let table_path = Path::new("../test/tests/data/cdf-table-non-partitioned");
        let table_uri =
            Url::from_directory_path(std::fs::canonicalize(table_path).unwrap()).unwrap();
        let table = DeltaTable::try_from_url(table_uri)
            .await?
            .scan_cdf()
            .with_starting_version(5)
            .build(&ctx.state(), None)
            .await;
        assert!(table.is_err());
        assert!(matches!(
            table.unwrap_err(),
            DeltaTableError::InvalidVersion(5)
        ));
        Ok(())
    }
    /// Same out-of-range starting version, but with `allow_out_of_range`:
    /// succeeds and yields no batches.
    #[tokio::test]
    async fn test_load_version_out_of_range_with_flag() -> TestResult {
        let ctx = SessionContext::new();
        let table_path = Path::new("../test/tests/data/cdf-table-non-partitioned");
        let table_uri =
            Url::from_directory_path(std::fs::canonicalize(table_path).unwrap()).unwrap();
        let table = DeltaTable::try_from_url(table_uri)
            .await?
            .scan_cdf()
            .with_starting_version(5)
            .with_allow_out_of_range()
            .build(&ctx.state(), None)
            .await?;
        let batches = collect(table, ctx.task_ctx()).await?;
        assert!(batches.is_empty());
        Ok(())
    }
    /// A starting timestamp later than any commit must fail with
    /// `ChangeDataTimestampGreaterThanCommit` by default.
    #[tokio::test]
    async fn test_load_timestamp_out_of_range() -> TestResult {
        let ending_timestamp = NaiveDateTime::from_str("2033-12-22T17:10:21.675").unwrap();
        let ctx = SessionContext::new();
        let table_path = Path::new("../test/tests/data/cdf-table-non-partitioned");
        let table_uri =
            Url::from_directory_path(std::fs::canonicalize(table_path).unwrap()).unwrap();
        let table = DeltaTable::try_from_url(table_uri)
            .await?
            .scan_cdf()
            .with_starting_timestamp(ending_timestamp.and_utc())
            .build(&ctx.state(), None)
            .await;
        assert!(table.is_err());
        assert!(matches!(
            table.unwrap_err(),
            DeltaTableError::ChangeDataTimestampGreaterThanCommit { .. }
        ));
        Ok(())
    }
    /// Same out-of-range starting timestamp, but with `allow_out_of_range`:
    /// succeeds and yields no batches.
    #[tokio::test]
    async fn test_load_timestamp_out_of_range_with_flag() -> TestResult {
        let ctx = SessionContext::new();
        let ending_timestamp = NaiveDateTime::from_str("2033-12-22T17:10:21.675").unwrap();
        let table_path = Path::new("../test/tests/data/cdf-table-non-partitioned");
        let table_uri =
            Url::from_directory_path(std::fs::canonicalize(table_path).unwrap()).unwrap();
        let table = DeltaTable::try_from_url(table_uri)
            .await?
            .scan_cdf()
            .with_starting_timestamp(ending_timestamp.and_utc())
            .with_allow_out_of_range()
            .build(&ctx.state(), None)
            .await?;
        let batches = collect(table, ctx.task_ctx()).await?;
        assert!(batches.is_empty());
        Ok(())
    }
    /// Scanning a table that never enabled `delta.enableChangeDataFeed` must
    /// fail with `ChangeDataNotEnabled`.
    #[tokio::test]
    async fn test_load_non_cdf() -> TestResult {
        let ctx = SessionContext::new();
        let table_path = Path::new("../test/tests/data/simple_table");
        let table_uri =
            Url::from_directory_path(std::fs::canonicalize(table_path).unwrap()).unwrap();
        let table = DeltaTable::try_from_url(table_uri)
            .await?
            .scan_cdf()
            .with_starting_version(0)
            .build(&ctx.state(), None)
            .await;
        assert!(table.is_err());
        assert!(matches!(
            table.unwrap_err(),
            DeltaTableError::ChangeDataNotEnabled { .. }
        ));
        Ok(())
    }
    /// Timestamp-based scan over a checkpointed (vacuumed) table: the starting
    /// timestamp must resolve to the right commit even with checkpoints in the
    /// log.
    #[tokio::test]
    async fn test_load_vacuumed_table() -> TestResult {
        let ending_timestamp = NaiveDateTime::from_str("2024-01-06T15:44:59.570")?;
        let ctx = SessionContext::new();
        let table_path = Path::new("../test/tests/data/checkpoint-cdf-table");
        let table_uri =
            Url::from_directory_path(std::fs::canonicalize(table_path).unwrap()).unwrap();
        let table = DeltaTable::try_from_url(table_uri)
            .await?
            .scan_cdf()
            .with_starting_timestamp(ending_timestamp.and_utc())
            .build(&ctx.state(), None)
            .await?;
        let batches = collect(table, ctx.task_ctx()).await?;
        assert_batches_sorted_eq! {
            [
                "+----+--------+------------+------------------+-----------------+-------------------------+",
                "| id | name   | birthday   | _change_type     | _commit_version | _commit_timestamp       |",
                "+----+--------+------------+------------------+-----------------+-------------------------+",
                "| 11 | Ossama | 2024-12-30 | insert           | 4               | 2025-01-06T16:33:18.167 |",
                "| 11 | Ossama | 2024-12-30 | update_preimage  | 5               | 2025-01-06T16:38:19.623 |",
                "| 12 | Nick   | 2023-12-29 | insert           | 4               | 2025-01-06T16:33:18.167 |",
                "| 12 | Nick   | 2023-12-29 | update_preimage  | 5               | 2025-01-06T16:38:19.623 |",
                "| 12 | Ossama | 2024-12-30 | update_postimage | 5               | 2025-01-06T16:38:19.623 |",
                "| 13 | Nick   | 2023-12-29 | update_postimage | 5               | 2025-01-06T16:38:19.623 |",
                "| 13 | Ryan   | 2023-12-22 | insert           | 4               | 2025-01-06T16:33:18.167 |",
                "| 13 | Ryan   | 2023-12-22 | update_preimage  | 5               | 2025-01-06T16:38:19.623 |",
                "| 14 | Ryan   | 2023-12-22 | update_postimage | 5               | 2025-01-06T16:38:19.623 |",
                "| 14 | Zach   | 2023-12-25 | insert           | 4               | 2025-01-06T16:33:18.167 |",
                "| 14 | Zach   | 2023-12-25 | update_preimage  | 5               | 2025-01-06T16:38:19.623 |",
                "| 15 | Zach   | 2023-12-25 | update_postimage | 5               | 2025-01-06T16:38:19.623 |",
                "| 7  | Dennis | 2023-12-29 | delete           | 3               | 2024-01-06T16:44:59.570 |",
                "+----+--------+------------+------------------+-----------------+-------------------------+",
            ],
            &batches
        }
        Ok(())
    }
    /// An overwrite on a CDF-enabled table writes no explicit CDC files; the
    /// scan must synthesize `delete` rows from the Remove actions instead.
    #[tokio::test]
    async fn test_use_remove_actions_for_deletions() -> TestResult {
        let delta_schema = TestSchemas::simple();
        let table: DeltaTable = DeltaTable::new_in_memory()
            .create()
            .with_columns(delta_schema.fields().cloned())
            .with_partition_columns(["id"])
            .with_configuration_property(TableProperty::EnableChangeDataFeed, Some("true"))
            .await
            .unwrap();
        assert_eq!(table.version(), Some(0));
        let schema: Arc<Schema> = Arc::new(delta_schema.try_into_arrow()?);
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(StringArray::from(vec![Some("1"), Some("2"), Some("3")])),
                Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
                Arc::new(StringArray::from(vec![
                    Some("yes"),
                    Some("yes"),
                    Some("no"),
                ])),
            ],
        )
        .unwrap();
        let second_batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(StringArray::from(vec![Some("3")])),
                Arc::new(Int32Array::from(vec![Some(10)])),
                Arc::new(StringArray::from(vec![Some("yes")])),
            ],
        )
        .unwrap();
        let table = table
            .write(vec![batch])
            .await
            .expect("Failed to write first batch");
        assert_eq!(table.version(), Some(1));
        // Overwrite removes all previously written files (Remove actions).
        let table = table
            .write([second_batch])
            .with_save_mode(crate::protocol::SaveMode::Overwrite)
            .await
            .unwrap();
        assert_eq!(table.version(), Some(2));
        let ctx = SessionContext::new();
        let cdf_scan = table
            .clone()
            .scan_cdf()
            .with_starting_version(0)
            .build(&ctx.state(), None)
            .await
            .expect("Failed to load CDF");
        let mut batches = collect(cdf_scan, ctx.task_ctx())
            .await
            .expect("Failed to collect batches");
        // Drop the non-deterministic _commit_timestamp column before comparing.
        let _: Vec<_> = batches.iter_mut().map(|b| b.remove_column(5)).collect();
        assert_batches_sorted_eq! {[
            "+-------+----------+----+--------------+-----------------+",
            "| value | modified | id | _change_type | _commit_version |",
            "+-------+----------+----+--------------+-----------------+",
            "| 1     | yes      | 1  | delete       | 2               |",
            "| 1     | yes      | 1  | insert       | 1               |",
            "| 10    | yes      | 3  | insert       | 2               |",
            "| 2     | yes      | 2  | delete       | 2               |",
            "| 2     | yes      | 2  | insert       | 1               |",
            "| 3     | no       | 3  | delete       | 2               |",
            "| 3     | no       | 3  | insert       | 1               |",
            "+-------+----------+----+--------------+-----------------+",
        ], &batches }
        // Confirm the commit really contains no Cdc actions — the delete rows
        // above must have come from Remove actions.
        let snapshot_bytes = table
            .log_store
            .read_commit_entry(2)
            .await?
            .expect("failed to get snapshot bytes");
        let version_actions = get_actions(2, &snapshot_bytes)?;
        let cdc_actions = version_actions
            .iter()
            .filter(|action| matches!(action, &&Action::Cdc(_)))
            .collect_vec();
        assert!(cdc_actions.is_empty());
        Ok(())
    }
}