use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use itertools::Itertools;
use test_utils::LoggingTest;
use super::{table_changes_action_iter, TableChangesScanMetadata};
use crate::actions::deletion_vector::{DeletionVectorDescriptor, DeletionVectorStorageType};
use crate::actions::{Add, Cdc, CommitInfo, Metadata, Protocol, Remove};
use crate::engine::sync::SyncEngine;
use crate::expressions::{column_expr, BinaryPredicateOp, Scalar};
use crate::log_segment::LogSegment;
use crate::path::ParsedLogPath;
use crate::scan::state::DvInfo;
use crate::scan::PhysicalPredicate;
use crate::schema::{DataType, SchemaRef, StructField, StructType};
use crate::table_changes::log_replay::LogReplayScanner;
use crate::table_configuration::TableConfiguration;
use crate::table_features::{ColumnMappingMode, TableFeature};
use crate::utils::test_utils::{assert_result_error_with_message, Action, LocalMockTable};
use crate::{DeltaResult, Engine, Error, Predicate, Version};
/// Returns the two-column nullable schema (`id: INTEGER`, `value: STRING`)
/// shared by most tests in this module.
fn get_schema() -> SchemaRef {
    let fields = [
        StructField::nullable("id", DataType::INTEGER),
        StructField::nullable("value", DataType::STRING),
    ];
    Arc::new(StructType::new_unchecked(fields))
}
/// Builds a version-0 `TableConfiguration` for `table_root` using the default
/// test schema, with change data feed enabled and column mapping disabled.
fn get_default_table_config(table_root: &url::Url) -> TableConfiguration {
    let configuration = HashMap::from([
        ("delta.enableChangeDataFeed".to_string(), "true".to_string()),
        ("delta.columnMapping.mode".to_string(), "none".to_string()),
    ]);
    let metadata =
        Metadata::try_new(None, None, get_schema(), vec![], 0, configuration).unwrap();
    let protocol = Protocol::try_new_legacy(1, 4).unwrap();
    TableConfiguration::try_new(metadata, protocol, table_root.clone(), 0).unwrap()
}
/// Wraps a `Metadata` action built from `schema` and `configuration`.
fn metadata_action(schema: SchemaRef, configuration: HashMap<String, String>) -> Action {
    // `schema` is owned by this function, so move it into `try_new` directly;
    // the previous `schema.clone()` bumped the Arc refcount only to drop the
    // original immediately afterwards (clippy::redundant_clone).
    Action::Metadata(Metadata::try_new(None, None, schema, vec![], 0, configuration).unwrap())
}
/// Metadata action for `schema` whose only table property is
/// `delta.enableChangeDataFeed = true`.
fn metadata_with_cdf(schema: SchemaRef) -> Action {
    let config =
        HashMap::from([("delta.enableChangeDataFeed".to_string(), "true".to_string())]);
    metadata_action(schema, config)
}
/// Wraps a `Protocol` action with the given reader/writer versions and
/// optional reader/writer table-feature lists.
fn protocol_action(
    min_reader: i32,
    min_writer: i32,
    reader_features: Option<Vec<TableFeature>>,
    writer_features: Option<Vec<TableFeature>>,
) -> Action {
    let protocol =
        Protocol::try_new(min_reader, min_writer, reader_features, writer_features).unwrap();
    Action::Protocol(protocol)
}
/// Runs a CDF scan over `mock_table` from `start_version` through
/// `end_version` (inclusive; `None` means latest) and collects the resulting
/// per-commit scan metadata.
fn execute_table_changes(
    engine: Arc<dyn Engine>,
    mock_table: &LocalMockTable,
    start_version: Version,
    end_version: Option<Version>,
) -> DeltaResult<Vec<TableChangesScanMetadata>> {
    let root = mock_table.table_root();
    let commits = get_segment(engine.as_ref(), root, start_version, end_version)?;
    let table_root_url = url::Url::from_directory_path(root).unwrap();
    let table_config = get_default_table_config(&table_root_url);
    table_changes_action_iter(engine, &table_config, commits.into_iter(), get_schema(), None)?
        .try_collect()
}
fn assert_midstream_failure(engine: Arc<dyn Engine>, mock_table: &LocalMockTable) {
let res_v0 = execute_table_changes(engine.clone(), mock_table, 0, Some(0));
assert!(res_v0.is_ok(), "Reading version 0 alone should succeed");
let res_v0_v1 = execute_table_changes(engine.clone(), mock_table, 0, Some(1));
assert!(
matches!(res_v0_v1, Err(Error::ChangeDataFeedUnsupported(_))),
"Reading versions 0-1 should fail"
);
let res_v1 = execute_table_changes(engine, mock_table, 1, Some(1));
assert!(
matches!(res_v1, Err(Error::ChangeDataFeedUnsupported(_))),
"Reading version 1 alone should fail"
);
}
/// Lists the commit files for the table at `path` between `start_version` and
/// `end_version` (inclusive; `None` means latest), in ascending version order.
fn get_segment(
    engine: &dyn Engine,
    path: &Path,
    start_version: Version,
    end_version: impl Into<Option<Version>>,
) -> DeltaResult<Vec<ParsedLogPath>> {
    let log_root = url::Url::from_directory_path(path)
        .unwrap()
        .join("_delta_log/")?;
    let segment = LogSegment::for_table_changes(
        engine.storage_handler().as_ref(),
        log_root,
        start_version,
        end_version,
    )?;
    Ok(segment.listed.ascending_commit_files)
}
fn result_to_sv(iter: impl Iterator<Item = DeltaResult<TableChangesScanMetadata>>) -> Vec<bool> {
iter.map_ok(|scan_metadata| scan_metadata.selection_vector.into_iter())
.flatten_ok()
.try_collect()
.unwrap()
}
/// A commit containing only `Metadata` and `Protocol` actions (CDF and
/// deletion vectors enabled, protocol supporting both) produces one scan row
/// per action, with neither row selected for CDF output.
#[tokio::test]
async fn metadata_protocol() {
    let engine = Arc::new(SyncEngine::new());
    let mut mock_table = LocalMockTable::new();
    // Version 0: metadata enabling CDF/DVs plus a protocol that supports them.
    mock_table
        .commit([
            Action::Metadata(
                Metadata::try_new(
                    None,
                    None,
                    get_schema(),
                    vec![],
                    0,
                    HashMap::from([
                        ("delta.enableChangeDataFeed".to_string(), "true".to_string()),
                        (
                            "delta.enableDeletionVectors".to_string(),
                            "true".to_string(),
                        ),
                        ("delta.columnMapping.mode".to_string(), "none".to_string()),
                    ]),
                )
                .unwrap(),
            ),
            Action::Protocol(
                Protocol::try_new_modern(
                    [TableFeature::DeletionVectors],
                    [TableFeature::DeletionVectors, TableFeature::ChangeDataFeed],
                )
                .unwrap(),
            ),
        ])
        .await;
    let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
        .unwrap()
        .into_iter();
    let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap();
    let table_config = get_default_table_config(&table_root_url);
    let scan_batches =
        table_changes_action_iter(engine, &table_config, commits, get_schema(), None).unwrap();
    // Metadata and protocol actions are scanned but never selected as change data.
    let sv = result_to_sv(scan_batches);
    assert_eq!(sv, &[false, false]);
}
/// A table whose metadata does not set `delta.enableChangeDataFeed` must fail
/// the CDF scan with `ChangeDataFeedUnsupported`.
#[tokio::test]
async fn cdf_not_enabled() {
    let engine = Arc::new(SyncEngine::new());
    let mut mock_table = LocalMockTable::new();
    // Version 0: metadata with an empty configuration, i.e. CDF disabled.
    mock_table
        .commit([Action::Metadata(
            Metadata::try_new(None, None, get_schema(), vec![], 0, HashMap::new()).unwrap(),
        )])
        .await;
    let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
        .unwrap()
        .into_iter();
    let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap();
    let table_config = get_default_table_config(&table_root_url);
    let res: DeltaResult<Vec<_>> =
        table_changes_action_iter(engine, &table_config, commits, get_schema(), None)
            .unwrap()
            .try_collect();
    assert!(matches!(res, Err(Error::ChangeDataFeedUnsupported(_))));
}
/// A protocol carrying an unknown reader feature makes the CDF scan fail with
/// `ChangeDataFeedUnsupported`, even though CDF itself is listed.
#[tokio::test]
async fn unsupported_reader_feature() {
    let engine = Arc::new(SyncEngine::new());
    let mut mock_table = LocalMockTable::new();
    // Version 0: protocol includes an unrecognized reader/writer feature.
    mock_table
        .commit([Action::Protocol(
            Protocol::try_new_modern(
                [
                    TableFeature::DeletionVectors,
                    TableFeature::unknown("unsupportedReaderFeature"),
                ],
                [
                    TableFeature::DeletionVectors,
                    TableFeature::ChangeDataFeed,
                    TableFeature::unknown("unsupportedReaderFeature"),
                ],
            )
            .unwrap(),
        )])
        .await;
    let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
        .unwrap()
        .into_iter();
    let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap();
    let table_config = get_default_table_config(&table_root_url);
    let res: DeltaResult<Vec<_>> =
        table_changes_action_iter(engine, &table_config, commits, get_schema(), None)
            .unwrap()
            .try_collect();
    assert!(matches!(res, Err(Error::ChangeDataFeedUnsupported(_))));
}
/// A CDF scan over a table using column mapping (`id` mode) should succeed:
/// column mapping is a supported feature for change-data-feed reads.
#[tokio::test]
async fn column_mapping_should_succeed() {
    use crate::schema::{ColumnMetadataKey, MetadataValue};
    // Builds a nullable field annotated with the column-mapping id and
    // physical-name metadata required by `delta.columnMapping.mode = id`.
    fn cm_field(name: &str, data_type: DataType, id: i64) -> StructField {
        StructField::nullable(name, data_type).with_metadata(HashMap::from([
            (
                ColumnMetadataKey::ColumnMappingId.as_ref().to_string(),
                MetadataValue::Number(id),
            ),
            (
                ColumnMetadataKey::ColumnMappingPhysicalName
                    .as_ref()
                    .to_string(),
                MetadataValue::String(name.to_string()),
            ),
        ]))
    }
    let cm_schema = Arc::new(StructType::new_unchecked([
        cm_field("id", DataType::INTEGER, 1),
        cm_field("value", DataType::STRING, 2),
    ]));
    let engine = Arc::new(SyncEngine::new());
    let mut mock_table = LocalMockTable::new();
    // Version 0: protocol + metadata enabling CDF, DVs, and id column mapping.
    mock_table
        .commit([
            Action::Protocol(
                Protocol::try_new_modern(
                    [TableFeature::DeletionVectors, TableFeature::ColumnMapping],
                    [
                        TableFeature::DeletionVectors,
                        TableFeature::ColumnMapping,
                        TableFeature::ChangeDataFeed,
                    ],
                )
                .unwrap(),
            ),
            Action::Metadata(
                Metadata::try_new(
                    None,
                    None,
                    cm_schema.clone(),
                    vec![],
                    0,
                    HashMap::from([
                        (
                            "delta.enableDeletionVectors".to_string(),
                            "true".to_string(),
                        ),
                        ("delta.enableChangeDataFeed".to_string(), "true".to_string()),
                        ("delta.columnMapping.mode".to_string(), "id".to_string()),
                    ]),
                )
                .unwrap(),
            ),
        ])
        .await;
    let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
        .unwrap()
        .into_iter();
    let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap();
    let table_config = get_default_table_config(&table_root_url);
    let res: DeltaResult<Vec<_>> =
        table_changes_action_iter(engine, &table_config, commits, cm_schema, None)
            .unwrap()
            .try_collect();
    assert!(res.is_ok(), "CDF should now support column mapping");
}
/// CDF is enabled at version 0 and explicitly disabled at version 1: any scan
/// range that includes version 1 must fail with `ChangeDataFeedUnsupported`.
#[tokio::test]
async fn cdf_disabled_midstream() {
    let engine = Arc::new(SyncEngine::new());
    let mut mock_table = LocalMockTable::new();
    // Version 0: CDF on.
    mock_table.commit([metadata_with_cdf(get_schema())]).await;
    // Version 1: CDF explicitly turned off.
    let disable_cdf = metadata_action(
        get_schema(),
        HashMap::from([(
            "delta.enableChangeDataFeed".to_string(),
            "false".to_string(),
        )]),
    );
    mock_table.commit([disable_cdf]).await;
    assert_midstream_failure(engine, &mock_table);
}
/// A supported protocol at version 0 followed by a protocol carrying an
/// unknown feature at version 1: any range including version 1 must fail.
#[tokio::test]
async fn unsupported_protocol_feature_midstream() {
    let engine = Arc::new(SyncEngine::new());
    let mut mock_table = LocalMockTable::new();
    // Version 0: legacy protocol (2, 6) plus CDF-enabled metadata.
    mock_table
        .commit([
            protocol_action(2, 6, None, None),
            metadata_with_cdf(get_schema()),
        ])
        .await;
    // Version 1: protocol upgrade introducing an unrecognized feature.
    mock_table
        .commit([protocol_action(
            3,
            7,
            Some(vec![TableFeature::unknown("unsupportedFeature")]),
            Some(vec![
                TableFeature::unknown("unsupportedFeature"),
                TableFeature::ChangeDataFeed,
            ]),
        )])
        .await;
    assert_midstream_failure(engine, &mock_table);
}
/// Exercises schema-compatibility checking between the schema recorded in a
/// commit and the schema requested for the CDF read: every mismatch below must
/// surface as `ChangeDataFeedIncompatibleSchema`.
#[tokio::test]
async fn incompatible_schemas_fail() {
    // Commits `commit_schema` (with CDF enabled) at version 0, then scans with
    // `cdf_schema` and asserts the incompatible-schema error.
    async fn assert_incompatible_schema(commit_schema: SchemaRef, cdf_schema: SchemaRef) {
        let engine = Arc::new(SyncEngine::new());
        let mut mock_table = LocalMockTable::new();
        mock_table
            .commit([Action::Metadata(
                Metadata::try_new(
                    None,
                    None,
                    commit_schema,
                    vec![],
                    0,
                    HashMap::from([("delta.enableChangeDataFeed".to_string(), "true".to_string())]),
                )
                .unwrap(),
            )])
            .await;
        let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
            .unwrap()
            .into_iter();
        let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap();
        let table_config = get_default_table_config(&table_root_url);
        let res: DeltaResult<Vec<_>> =
            table_changes_action_iter(engine, &table_config, commits, cdf_schema, None)
                .unwrap()
                .try_collect();
        assert!(matches!(
            res,
            Err(Error::ChangeDataFeedIncompatibleSchema(_, _))
        ));
    }
    // Commit schema has an extra column (`year`) relative to the CDF schema.
    let schema = Arc::new(StructType::new_unchecked([
        StructField::nullable("id", DataType::LONG),
        StructField::nullable("value", DataType::STRING),
        StructField::nullable("year", DataType::INTEGER),
    ]));
    assert_incompatible_schema(schema, get_schema()).await;
    // Commit `id` is LONG while the CDF schema expects INTEGER (type mismatch).
    let schema = Arc::new(StructType::new_unchecked([
        StructField::nullable("id", DataType::LONG),
        StructField::nullable("value", DataType::STRING),
    ]));
    assert_incompatible_schema(schema, get_schema()).await;
    let cdf_schema = Arc::new(StructType::new_unchecked([
        StructField::nullable("id", DataType::LONG),
        StructField::nullable("value", DataType::STRING),
    ]));
    let commit_schema = Arc::new(StructType::new_unchecked([
        StructField::nullable("id", DataType::INTEGER),
        StructField::nullable("value", DataType::STRING),
    ]));
    // NOTE(review): the locals here are passed in swapped positions relative to
    // the parameter names of `assert_incompatible_schema(commit_schema, cdf_schema)`
    // — as written this repeats the LONG-commit/INTEGER-cdf case above, whereas
    // the reversed order would cover the distinct INTEGER-commit/LONG-cdf
    // (wider CDF schema) direction. Confirm which direction was intended.
    assert_incompatible_schema(cdf_schema, commit_schema).await;
    // Commit `id` is non-nullable while the CDF schema allows nulls.
    let schema = Arc::new(StructType::new_unchecked([
        StructField::not_null("id", DataType::LONG),
        StructField::nullable("value", DataType::STRING),
    ]));
    assert_incompatible_schema(schema, get_schema()).await;
    // Commit `id` is STRING, incompatible with the CDF schema's INTEGER.
    let schema = Arc::new(StructType::new_unchecked([
        StructField::nullable("id", DataType::STRING),
        StructField::nullable("value", DataType::STRING),
    ]));
    assert_incompatible_schema(schema, get_schema()).await;
    // Commit schema is missing a column (`value`) the CDF schema requires.
    let schema = Arc::new(get_schema().project_as_struct(&["id"]).unwrap());
    assert_incompatible_schema(schema, get_schema()).await;
}
/// Commits an initial CDF-enabled schema, a data file, an evolved schema, and
/// a second data file, then runs a CDF scan with `evolved_schema` over the
/// whole range. Returns the scan result so callers can assert success/failure.
async fn test_schema_evolution(
    initial_schema: SchemaRef,
    evolved_schema: SchemaRef,
) -> DeltaResult<Vec<TableChangesScanMetadata>> {
    let engine = Arc::new(SyncEngine::new());
    let mut mock_table = LocalMockTable::new();
    // Version 0: initial schema with CDF enabled.
    mock_table
        .commit([
            metadata_with_cdf(initial_schema.clone()),
            protocol_action(1, 1, None, None),
        ])
        .await;
    // Version 1: data written under the initial schema.
    mock_table
        .commit([Action::Add(Add {
            path: "file1.parquet".into(),
            data_change: true,
            ..Default::default()
        })])
        .await;
    // Version 2: metadata update switching to the evolved schema.
    mock_table
        .commit([metadata_with_cdf(evolved_schema.clone())])
        .await;
    // Version 3: data written under the evolved schema.
    mock_table
        .commit([Action::Add(Add {
            path: "file2.parquet".into(),
            data_change: true,
            ..Default::default()
        })])
        .await;
    let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)?.into_iter();
    let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap();
    let table_config = get_default_table_config(&table_root_url);
    table_changes_action_iter(engine, &table_config, commits, evolved_schema, None)?.try_collect()
}
/// Demonstrates that mid-stream schema evolution is currently rejected: adding
/// a nullable column, widening a type, and relaxing nullability all fail with
/// `ChangeDataFeedIncompatibleSchema` when scanned across the schema change.
#[tokio::test]
async fn demonstration_schema_evolution_failures() {
    // Case 1: a nullable column is added by the evolved schema.
    let initial = Arc::new(StructType::new_unchecked([
        StructField::nullable("id", DataType::INTEGER),
        StructField::nullable("value", DataType::STRING),
    ]));
    let evolved = Arc::new(StructType::new_unchecked([
        StructField::nullable("id", DataType::INTEGER),
        StructField::nullable("value", DataType::STRING),
        StructField::nullable("new_col", DataType::INTEGER),
    ]));
    let res = test_schema_evolution(initial, evolved).await;
    assert!(
        matches!(res, Err(Error::ChangeDataFeedIncompatibleSchema(_, _))),
        "Expected ChangeDataFeedIncompatibleSchema error for adding nullable column"
    );
    // Case 2: `id` is widened from INTEGER to LONG.
    let initial = Arc::new(StructType::new_unchecked([
        StructField::nullable("id", DataType::INTEGER),
        StructField::nullable("value", DataType::STRING),
    ]));
    let evolved = Arc::new(StructType::new_unchecked([
        StructField::nullable("id", DataType::LONG),
        StructField::nullable("value", DataType::STRING),
    ]));
    let res = test_schema_evolution(initial, evolved).await;
    assert!(
        matches!(res, Err(Error::ChangeDataFeedIncompatibleSchema(_, _))),
        "Expected ChangeDataFeedIncompatibleSchema error for type widening"
    );
    // Case 3: `id` changes from non-nullable to nullable.
    let initial = Arc::new(StructType::new_unchecked([
        StructField::not_null("id", DataType::INTEGER),
        StructField::nullable("value", DataType::STRING),
    ]));
    let evolved = Arc::new(StructType::new_unchecked([
        StructField::nullable("id", DataType::INTEGER),
        StructField::nullable("value", DataType::STRING),
    ]));
    let res = test_schema_evolution(initial, evolved).await;
    assert!(
        matches!(res, Err(Error::ChangeDataFeedIncompatibleSchema(_, _))),
        "Expected ChangeDataFeedIncompatibleSchema error for nullability change"
    );
}
/// A commit with one `Add` and one `Remove` (both `data_change = true`) selects
/// both actions for CDF output and records no remove deletion vectors.
#[tokio::test]
async fn add_remove() {
    let engine = Arc::new(SyncEngine::new());
    let mut mock_table = LocalMockTable::new();
    mock_table
        .commit([
            Action::Add(Add {
                path: "fake_path_1".into(),
                data_change: true,
                ..Default::default()
            }),
            Action::Remove(Remove {
                path: "fake_path_2".into(),
                data_change: true,
                ..Default::default()
            }),
        ])
        .await;
    let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
        .unwrap()
        .into_iter();
    let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap();
    let table_config = get_default_table_config(&table_root_url);
    let sv = table_changes_action_iter(engine, &table_config, commits, get_schema(), None)
        .unwrap()
        .flat_map(|scan_metadata| {
            let scan_metadata = scan_metadata.unwrap();
            // No removes carried deletion vectors, so the map must be empty.
            assert_eq!(scan_metadata.remove_dvs, HashMap::new().into());
            scan_metadata.selection_vector
        })
        .collect_vec();
    assert_eq!(sv, &[true, true]);
}
/// Actions with `data_change = false` (e.g. compaction rewrites) must be
/// filtered out of the CDF selection vector entirely.
#[tokio::test]
async fn filter_data_change() {
    let engine = Arc::new(SyncEngine::new());
    let mut mock_table = LocalMockTable::new();
    // All five actions have data_change = false, so none should be selected.
    mock_table
        .commit([
            Action::Remove(Remove {
                path: "fake_path_1".into(),
                data_change: false,
                ..Default::default()
            }),
            Action::Remove(Remove {
                path: "fake_path_2".into(),
                data_change: false,
                ..Default::default()
            }),
            Action::Remove(Remove {
                path: "fake_path_3".into(),
                data_change: false,
                ..Default::default()
            }),
            Action::Remove(Remove {
                path: "fake_path_4".into(),
                data_change: false,
                ..Default::default()
            }),
            Action::Add(Add {
                path: "fake_path_5".into(),
                data_change: false,
                ..Default::default()
            }),
        ])
        .await;
    let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
        .unwrap()
        .into_iter();
    let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap();
    let table_config = get_default_table_config(&table_root_url);
    let sv = table_changes_action_iter(engine, &table_config, commits, get_schema(), None)
        .unwrap()
        .flat_map(|scan_metadata| {
            let scan_metadata = scan_metadata.unwrap();
            assert_eq!(scan_metadata.remove_dvs, HashMap::new().into());
            scan_metadata.selection_vector
        })
        .collect_vec();
    assert_eq!(sv, &[false; 5]);
}
/// When a commit contains `Cdc` actions, those take precedence: the `Remove`
/// in that commit is deselected while the `Cdc` files are selected. The plain
/// `Add` from the earlier commit remains selected.
#[tokio::test]
async fn cdc_selection() {
    let engine = Arc::new(SyncEngine::new());
    let mut mock_table = LocalMockTable::new();
    // Version 0: a single Add.
    mock_table
        .commit([Action::Add(Add {
            path: "fake_path_1".into(),
            data_change: true,
            ..Default::default()
        })])
        .await;
    // Version 1: a Remove plus two Cdc actions in the same commit.
    mock_table
        .commit([
            Action::Remove(Remove {
                path: "fake_path_1".into(),
                data_change: true,
                ..Default::default()
            }),
            Action::Cdc(Cdc {
                path: "fake_path_3".into(),
                ..Default::default()
            }),
            Action::Cdc(Cdc {
                path: "fake_path_4".into(),
                ..Default::default()
            }),
        ])
        .await;
    let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
        .unwrap()
        .into_iter();
    let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap();
    let table_config = get_default_table_config(&table_root_url);
    let sv = table_changes_action_iter(engine, &table_config, commits, get_schema(), None)
        .unwrap()
        .flat_map(|scan_metadata| {
            let scan_metadata = scan_metadata.unwrap();
            assert_eq!(scan_metadata.remove_dvs, HashMap::new().into());
            scan_metadata.selection_vector
        })
        .collect_vec();
    // [Add v0, Remove v1 (masked by Cdc), Cdc, Cdc]
    assert_eq!(sv, &[true, false, true, true]);
}
/// A Remove+Add pair for the same path with a deletion vector on the Remove:
/// the Remove's DV is recorded in `remove_dvs` (keyed by path) and its row is
/// deselected, while the matching Add and the unpaired Remove stay selected.
#[tokio::test]
async fn dv() {
    let engine = Arc::new(SyncEngine::new());
    let mut mock_table = LocalMockTable::new();
    let deletion_vector1 = DeletionVectorDescriptor {
        storage_type: DeletionVectorStorageType::PersistedRelative,
        path_or_inline_dv: "vBn[lx{q8@P<9BNH/isA".to_string(),
        offset: Some(1),
        size_in_bytes: 36,
        cardinality: 2,
    };
    let deletion_vector2 = DeletionVectorDescriptor {
        storage_type: DeletionVectorStorageType::PersistedRelative,
        path_or_inline_dv: "U5OWRz5k%CFT.Td}yCPW".to_string(),
        offset: Some(1),
        size_in_bytes: 38,
        cardinality: 3,
    };
    // Remove+Add share "fake_path_1"; "fake_path_2" is removed without a
    // matching Add in this commit.
    mock_table
        .commit([
            Action::Remove(Remove {
                path: "fake_path_1".into(),
                data_change: true,
                deletion_vector: Some(deletion_vector1.clone()),
                ..Default::default()
            }),
            Action::Add(Add {
                path: "fake_path_1".into(),
                data_change: true,
                ..Default::default()
            }),
            Action::Remove(Remove {
                path: "fake_path_2".into(),
                data_change: true,
                deletion_vector: Some(deletion_vector2.clone()),
                ..Default::default()
            }),
        ])
        .await;
    let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
        .unwrap()
        .into_iter();
    // Only the paired path's DV appears in remove_dvs.
    let expected_remove_dvs = HashMap::from([(
        "fake_path_1".to_string(),
        DvInfo {
            deletion_vector: Some(deletion_vector1.clone()),
        },
    )])
    .into();
    let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap();
    let table_config = get_default_table_config(&table_root_url);
    let sv = table_changes_action_iter(engine, &table_config, commits, get_schema(), None)
        .unwrap()
        .flat_map(|scan_metadata| {
            let scan_metadata = scan_metadata.unwrap();
            assert_eq!(scan_metadata.remove_dvs, expected_remove_dvs);
            scan_metadata.selection_vector
        })
        .collect_vec();
    // [Remove(path_1) deselected, Add(path_1) selected, Remove(path_2) selected]
    assert_eq!(sv, &[false, true, true]);
}
/// Data skipping with predicate `id > 4`: Adds whose stats prove no matching
/// rows (max id <= 4 or no stats usable) are deselected, as are the Removes;
/// only the Adds whose stats admit `id > 4` survive.
#[tokio::test]
async fn data_skipping_filter() {
    let engine = Arc::new(SyncEngine::new());
    let mut mock_table = LocalMockTable::new();
    let deletion_vector = Some(DeletionVectorDescriptor {
        storage_type: DeletionVectorStorageType::PersistedRelative,
        path_or_inline_dv: "vBn[lx{q8@P<9BNH/isA".to_string(),
        offset: Some(1),
        size_in_bytes: 36,
        cardinality: 2,
    });
    mock_table
        .commit([
            Action::Remove(Remove {
                path: "fake_path_1".into(),
                data_change: true,
                ..Default::default()
            }),
            // max id = 6 > 4: can match the predicate.
            Action::Add(Add {
                path: "fake_path_1".into(),
                stats: Some("{\"numRecords\":4,\"minValues\":{\"id\":4},\"maxValues\":{\"id\":6},\"nullCount\":{\"id\":3}}".into()),
                data_change: true,
                deletion_vector: deletion_vector.clone(),
                ..Default::default()
            }),
            Action::Remove(Remove {
                path: "fake_path_2".into(),
                data_change: true,
                ..Default::default()
            }),
            // max id = 4: cannot satisfy id > 4, so this file is skipped.
            Action::Add(Add {
                path: "fake_path_2".into(),
                stats: Some("{\"numRecords\":4,\"minValues\":{\"id\":4},\"maxValues\":{\"id\":4},\"nullCount\":{\"id\":3}}".into()),
                data_change: true,
                deletion_vector,
                ..Default::default()
            }),
            // max id = 5 > 4: can match the predicate.
            Action::Add(Add {
                path: "fake_path_3".into(),
                stats: Some("{\"numRecords\":4,\"minValues\":{\"id\":4},\"maxValues\":{\"id\":5},\"nullCount\":{\"id\":3}}".into()),
                data_change: true,
                ..Default::default()
            }),
        ])
        .await;
    let predicate = Predicate::binary(
        BinaryPredicateOp::GreaterThan,
        column_expr!("id"),
        Scalar::from(4),
    );
    let logical_schema = get_schema();
    // Convert the logical predicate to its physical form (no column mapping).
    let predicate =
        match PhysicalPredicate::try_new(&predicate, &logical_schema, ColumnMappingMode::None) {
            Ok(PhysicalPredicate::Some(p, s)) => Some((p, s)),
            other => panic!("Unexpected result: {other:?}"),
        };
    let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
        .unwrap()
        .into_iter();
    let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap();
    let table_config = get_default_table_config(&table_root_url);
    let sv = table_changes_action_iter(engine, &table_config, commits, logical_schema, predicate)
        .unwrap()
        .flat_map(|scan_metadata| {
            let scan_metadata = scan_metadata.unwrap();
            scan_metadata.selection_vector
        })
        .collect_vec();
    assert_eq!(sv, &[false, true, false, false, true]);
}
/// A protocol with only an unknown feature — appearing after data actions in
/// the same commit — must still fail the whole version with the CDF
/// unsupported error message.
#[tokio::test]
async fn failing_protocol() {
    let engine = Arc::new(SyncEngine::new());
    let mut mock_table = LocalMockTable::new();
    let protocol = Protocol::try_new_modern(["fake_feature"], ["fake_feature"]).unwrap();
    // The protocol is deliberately last, after the Add/Remove actions.
    mock_table
        .commit([
            Action::Add(Add {
                path: "fake_path_1".into(),
                data_change: true,
                ..Default::default()
            }),
            Action::Remove(Remove {
                path: "fake_path_2".into(),
                data_change: true,
                ..Default::default()
            }),
            Action::Protocol(protocol),
        ])
        .await;
    let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
        .unwrap()
        .into_iter();
    let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap();
    let table_config = get_default_table_config(&table_root_url);
    let res: DeltaResult<Vec<_>> =
        table_changes_action_iter(engine, &table_config, commits, get_schema(), None)
            .unwrap()
            .try_collect();
    assert_result_error_with_message(
        res,
        "Change data feed is unsupported for the table at version 0",
    );
}
/// Without in-commit timestamps, the `LogReplayScanner` timestamp must come
/// from the commit file's last-modified time.
#[tokio::test]
async fn file_meta_timestamp() {
    let engine = Arc::new(SyncEngine::new());
    let mut mock_table = LocalMockTable::new();
    mock_table
        .commit([Action::Add(Add {
            path: "fake_path_1".into(),
            data_change: true,
            ..Default::default()
        })])
        .await;
    let mut commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
        .unwrap()
        .into_iter();
    let commit = commits.next().unwrap();
    // Capture the file-system modification time before the commit is consumed.
    let file_meta_ts = commit.location.last_modified;
    let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap();
    let mut table_config = get_default_table_config(&table_root_url);
    let scanner =
        LogReplayScanner::try_new(engine.as_ref(), &mut table_config, commit, &get_schema())
            .unwrap();
    assert_eq!(scanner.timestamp, file_meta_ts);
}
/// Verifies the tracing output emitted when the table configuration is updated
/// during a CDF query: version, protocol versions/features, schema string, and
/// configuration entries must all appear in the captured logs.
#[tokio::test]
async fn print_table_configuration() {
    // Captures tracing output for the duration of the test.
    let tracing_guard = LoggingTest::new();
    let engine = Arc::new(SyncEngine::new());
    let mut mock_table = LocalMockTable::new();
    mock_table
        .commit([
            Action::Metadata(
                Metadata::try_new(
                    None,
                    None,
                    get_schema(),
                    vec![],
                    0,
                    HashMap::from([
                        ("delta.enableChangeDataFeed".to_string(), "true".to_string()),
                        (
                            "delta.enableDeletionVectors".to_string(),
                            "true".to_string(),
                        ),
                        ("delta.columnMapping.mode".to_string(), "none".to_string()),
                    ]),
                )
                .unwrap(),
            ),
            Action::Protocol(
                Protocol::try_new_modern(
                    [TableFeature::DeletionVectors],
                    [TableFeature::DeletionVectors, TableFeature::ChangeDataFeed],
                )
                .unwrap(),
            ),
        ])
        .await;
    let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
        .unwrap()
        .into_iter();
    let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap();
    let table_config = get_default_table_config(&table_root_url);
    // Drive the iterator to completion purely for its logging side effects.
    let _scan_batches: DeltaResult<Vec<_>> =
        table_changes_action_iter(engine, &table_config, commits, get_schema(), None)
            .unwrap()
            .try_collect();
    let log_output = tracing_guard.logs();
    assert!(log_output.contains("Table configuration updated during CDF query"));
    assert!(log_output.contains("version=0"));
    assert!(log_output.contains("id="));
    assert!(log_output.contains("writerFeatures=[deletionVectors, changeDataFeed]"));
    assert!(log_output.contains("minReaderVersion=3"));
    assert!(log_output.contains("minWriterVersion=7"));
    assert!(log_output.contains("schemaString={\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"));
    assert!(log_output.contains("configuration="));
    assert!(log_output.contains("\"delta.enableChangeDataFeed\": \"true\""));
    assert!(log_output.contains("\"delta.columnMapping.mode\": \"none\""));
    assert!(log_output.contains("\"delta.enableDeletionVectors\": \"true\""));
}
/// Verifies the "Phase 1 ... completed" tracing output for a commit with no
/// CDC actions and no remove deletion vectors.
#[tokio::test]
async fn print_table_info_post_phase1() {
    // Captures tracing output for the duration of the test.
    let tracing_guard = LoggingTest::new();
    let engine = Arc::new(SyncEngine::new());
    let mut mock_table = LocalMockTable::new();
    mock_table
        .commit([
            Action::Metadata(
                Metadata::try_new(
                    None,
                    None,
                    get_schema(),
                    vec![],
                    0,
                    HashMap::from([
                        ("delta.enableChangeDataFeed".to_string(), "true".to_string()),
                        (
                            "delta.enableDeletionVectors".to_string(),
                            "true".to_string(),
                        ),
                        ("delta.columnMapping.mode".to_string(), "none".to_string()),
                    ]),
                )
                .unwrap(),
            ),
            Action::Protocol(
                Protocol::try_new_modern(
                    [TableFeature::DeletionVectors],
                    [TableFeature::DeletionVectors, TableFeature::ChangeDataFeed],
                )
                .unwrap(),
            ),
        ])
        .await;
    let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
        .unwrap()
        .into_iter();
    let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap();
    let table_config = get_default_table_config(&table_root_url);
    // Drive the iterator purely for its logging side effects.
    let _scan_batches: DeltaResult<Vec<_>> =
        table_changes_action_iter(engine, &table_config, commits, get_schema(), None)
            .unwrap()
            .try_collect();
    let log_output = tracing_guard.logs();
    assert!(log_output.contains("Phase 1 of CDF query processing completed"));
    assert!(log_output.contains("id="));
    assert!(log_output.contains("remove_dvs_size=0"));
    assert!(log_output.contains("has_cdc_action=false"));
    assert!(log_output.contains("file_path="));
    assert!(log_output.contains("version=0"));
    assert!(log_output.contains("timestamp="));
}
/// Same phase-1 log check as `print_table_info_post_phase1`, but with a `Cdc`
/// action in the commit so the output reports `has_cdc_action=true`.
#[tokio::test]
async fn print_table_info_post_phase1_has_cdc() {
    // Captures tracing output for the duration of the test.
    let tracing_guard = LoggingTest::new();
    let engine = Arc::new(SyncEngine::new());
    let mut mock_table = LocalMockTable::new();
    mock_table
        .commit([
            Action::Add(Add {
                path: "fake_path_1".into(),
                data_change: true,
                ..Default::default()
            }),
            Action::Cdc(Cdc {
                path: "fake_path_2".into(),
                ..Default::default()
            }),
        ])
        .await;
    let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
        .unwrap()
        .into_iter();
    let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap();
    let table_config = get_default_table_config(&table_root_url);
    // Drive the iterator purely for its logging side effects.
    let _scan_batches: DeltaResult<Vec<_>> =
        table_changes_action_iter(engine, &table_config, commits, get_schema(), None)
            .unwrap()
            .try_collect();
    let log_output = tracing_guard.logs();
    assert!(log_output.contains("Phase 1 of CDF query processing completed"));
    assert!(log_output.contains("id="));
    assert!(log_output.contains("remove_dvs_size=0"));
    assert!(log_output.contains("has_cdc_action=true"));
    assert!(log_output.contains("file_path="));
    assert!(log_output.contains("version=0"));
    assert!(log_output.contains("timestamp="));
}
/// Same phase-1 log check, but with a Remove+Add pair carrying a deletion
/// vector so the output reports a non-zero `remove_dvs_size`.
#[tokio::test]
async fn print_table_info_post_phase1_has_dv() {
    // Captures tracing output for the duration of the test.
    let tracing_guard = LoggingTest::new();
    let engine = Arc::new(SyncEngine::new());
    let mut mock_table = LocalMockTable::new();
    let deletion_vector1 = DeletionVectorDescriptor {
        storage_type: DeletionVectorStorageType::PersistedRelative,
        path_or_inline_dv: "vBn[lx{q8@P<9BNH/isA".to_string(),
        offset: Some(1),
        size_in_bytes: 36,
        cardinality: 2,
    };
    let deletion_vector2 = DeletionVectorDescriptor {
        storage_type: DeletionVectorStorageType::PersistedRelative,
        path_or_inline_dv: "U5OWRz5k%CFT.Td}yCPW".to_string(),
        offset: Some(1),
        size_in_bytes: 38,
        cardinality: 3,
    };
    // Remove+Add share "fake_path_1" (its DV lands in remove_dvs);
    // "fake_path_2" is removed without a matching Add.
    mock_table
        .commit([
            Action::Remove(Remove {
                path: "fake_path_1".into(),
                data_change: true,
                deletion_vector: Some(deletion_vector1.clone()),
                ..Default::default()
            }),
            Action::Add(Add {
                path: "fake_path_1".into(),
                data_change: true,
                ..Default::default()
            }),
            Action::Remove(Remove {
                path: "fake_path_2".into(),
                data_change: true,
                deletion_vector: Some(deletion_vector2.clone()),
                ..Default::default()
            }),
        ])
        .await;
    let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
        .unwrap()
        .into_iter();
    let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap();
    let table_config = get_default_table_config(&table_root_url);
    // Drive the iterator purely for its logging side effects.
    let _scan_batches: DeltaResult<Vec<_>> =
        table_changes_action_iter(engine, &table_config, commits, get_schema(), None)
            .unwrap()
            .try_collect();
    let log_output = tracing_guard.logs();
    // Reconstruct the expected remove_dvs map just to derive its size (1).
    let expected_remove_dvs: Arc<HashMap<String, DvInfo>> = HashMap::from([(
        "fake_path_1".to_string(),
        DvInfo {
            deletion_vector: Some(deletion_vector1.clone()),
        },
    )])
    .into();
    assert!(log_output.contains("Phase 1 of CDF query processing completed"));
    assert!(log_output.contains("id="));
    assert!(log_output.contains(&format!("remove_dvs_size={}", expected_remove_dvs.len())));
    assert!(log_output.contains("has_cdc_action=false"));
    assert!(log_output.contains("file_path="));
    assert!(log_output.contains("version=0"));
    assert!(log_output.contains("timestamp="));
}
/// With `delta.enableInCommitTimestamps = true`, the scanner timestamp must be
/// the in-commit timestamp (2000) from the leading `CommitInfo`, not the
/// commit file's modification time.
#[tokio::test]
async fn test_timestamp_with_ict_enabled() {
    let engine = Arc::new(SyncEngine::new());
    let mut mock_table = LocalMockTable::new();
    // CommitInfo first in the commit: timestamp 1000, in-commit timestamp 2000.
    mock_table
        .commit([
            Action::CommitInfo(CommitInfo::new(1000, Some(2000), None, None, false)),
            Action::Metadata(
                Metadata::try_new(
                    None,
                    None,
                    get_schema(),
                    vec![],
                    0,
                    HashMap::from([
                        ("delta.enableChangeDataFeed".to_string(), "true".to_string()),
                        (
                            "delta.enableInCommitTimestamps".to_string(),
                            "true".to_string(),
                        ),
                    ]),
                )
                .unwrap(),
            ),
            Action::Protocol(
                Protocol::try_new_modern(
                    [TableFeature::DeletionVectors],
                    [
                        TableFeature::InCommitTimestamp,
                        TableFeature::ChangeDataFeed,
                        TableFeature::DeletionVectors,
                    ],
                )
                .unwrap(),
            ),
        ])
        .await;
    let mut commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
        .unwrap()
        .into_iter();
    let commit = commits.next().unwrap();
    let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap();
    let mut table_config = get_default_table_config(&table_root_url);
    let scanner =
        LogReplayScanner::try_new(engine.as_ref(), &mut table_config, commit, &get_schema())
            .unwrap();
    assert_eq!(scanner.timestamp, 2000);
}
/// The protocol lists the InCommitTimestamp feature but the metadata does not
/// enable it: the scanner must ignore the CommitInfo's in-commit timestamp and
/// fall back to the commit file's modification time.
#[tokio::test]
async fn test_timestamp_with_ict_disabled() {
    let engine = Arc::new(SyncEngine::new());
    let mut mock_table = LocalMockTable::new();
    mock_table
        .commit([
            Action::CommitInfo(CommitInfo::new(1000, Some(2000), None, None, false)),
            Action::Metadata(
                Metadata::try_new(
                    None,
                    None,
                    get_schema(),
                    vec![],
                    0,
                    // Note: delta.enableInCommitTimestamps is NOT set here.
                    HashMap::from([("delta.enableChangeDataFeed".to_string(), "true".to_string())]),
                )
                .unwrap(),
            ),
            Action::Protocol(
                Protocol::try_new_modern(
                    [TableFeature::DeletionVectors],
                    [
                        TableFeature::InCommitTimestamp,
                        TableFeature::ChangeDataFeed,
                        TableFeature::DeletionVectors,
                    ],
                )
                .unwrap(),
            ),
        ])
        .await;
    let mut commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
        .unwrap()
        .into_iter();
    let commit = commits.next().unwrap();
    let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap();
    let mut table_config = get_default_table_config(&table_root_url);
    let scanner = LogReplayScanner::try_new(
        engine.as_ref(),
        &mut table_config,
        commit.clone(),
        &get_schema(),
    )
    .unwrap();
    // The in-commit timestamp (2000) is ignored; file mtime is used instead.
    assert_ne!(scanner.timestamp, 2000);
    assert_eq!(scanner.timestamp, commit.location.last_modified);
}
/// With in-commit timestamps enabled, the `CommitInfo` action must be the
/// first action in the commit; here it is placed last, so scanner creation
/// must fail with the "not found" error.
#[tokio::test]
async fn test_timestamp_with_commit_info_not_first() {
    let engine = Arc::new(SyncEngine::new());
    let mut mock_table = LocalMockTable::new();
    mock_table
        .commit([
            Action::Metadata(
                Metadata::try_new(
                    None,
                    None,
                    get_schema(),
                    vec![],
                    0,
                    HashMap::from([
                        ("delta.enableChangeDataFeed".to_string(), "true".to_string()),
                        (
                            "delta.enableInCommitTimestamps".to_string(),
                            "true".to_string(),
                        ),
                    ]),
                )
                .unwrap(),
            ),
            Action::Protocol(
                Protocol::try_new_modern(
                    [TableFeature::DeletionVectors],
                    [
                        TableFeature::InCommitTimestamp,
                        TableFeature::ChangeDataFeed,
                        TableFeature::DeletionVectors,
                    ],
                )
                .unwrap(),
            ),
            // Deliberately last: ICT requires CommitInfo as the first action.
            Action::CommitInfo(CommitInfo::new(1000, Some(2000), None, None, false)),
        ])
        .await;
    let mut commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
        .unwrap()
        .into_iter();
    let commit = commits.next().unwrap();
    let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap();
    let mut table_config = get_default_table_config(&table_root_url);
    let result =
        LogReplayScanner::try_new(engine.as_ref(), &mut table_config, commit, &get_schema());
    assert_result_error_with_message(
        result,
        "In-commit timestamp is enabled but not found in commit at version 0",
    );
}