use std::sync::{Arc, LazyLock};

use itertools::Itertools;
use rstest::rstest;
use test_utils::{
    compacted_log_path_for_versions, delta_path_for_version, staged_commit_path_for_version,
};
use url::Url;

use crate::actions::visitors::{AddVisitor, SidecarVisitor};
use crate::actions::{
    get_all_actions_schema, get_commit_schema, Add, Sidecar, ADD_NAME, DOMAIN_METADATA_NAME,
    METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, SET_TRANSACTION_NAME, SIDECAR_NAME,
};
use crate::arrow::array::StringArray;
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
use crate::engine::default::filesystem::ObjectStoreStorageHandler;
use crate::engine::default::DefaultEngineBuilder;
use crate::engine::sync::json::SyncJsonHandler;
use crate::engine::sync::SyncEngine;
use crate::expressions::ColumnName;
use crate::last_checkpoint_hint::LastCheckpointHint;
use crate::log_replay::ActionsBatch;
use crate::log_segment::LogSegment;
use crate::log_segment_files::LogSegmentFiles;
use crate::object_store::{memory::InMemory, path::Path, ObjectStoreExt as _};
use crate::parquet::arrow::ArrowWriter;
use crate::path::{LogPathFileType, ParsedLogPath};
use crate::scan::test_utils::{
    add_batch_simple, add_batch_with_remove, sidecar_batch_with_given_paths,
    sidecar_batch_with_given_paths_and_sizes,
};
use crate::schema::{DataType, StructField, StructType};
use crate::utils::test_utils::{
    assert_batch_matches, assert_result_error_with_message, string_array_to_engine_data, Action,
};
use crate::{
    DeltaResult, Engine as _, EngineData, Expression, FileMeta, JsonHandler, ParquetHandler,
    Predicate, PredicateRef, RowVisitor, StorageHandler,
};
use super::*;
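/// Helper that scans `batch` for sidecar actions and, if any are present, returns an iterator
/// that reads the referenced sidecar files with `checkpoint_read_schema` and `meta_predicate`;
/// returns `Ok(None)` when the batch contains no sidecars.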
fn process_sidecars(
parquet_handler: Arc<dyn ParquetHandler>,
log_root: Url,
batch: &dyn EngineData,
checkpoint_read_schema: SchemaRef,
meta_predicate: Option<PredicateRef>,
) -> DeltaResult<Option<impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>> {
let mut visitor = SidecarVisitor::default();
visitor.visit_rows_of(batch)?;
if visitor.sidecars.is_empty() {
return Ok(None);
}
let sidecar_files: Vec<_> = visitor
.sidecars
.iter()
.map(|sidecar| sidecar.to_filemeta(&log_root))
.try_collect()?;
Ok(Some(parquet_handler.read_parquet_files(
&sidecar_files,
checkpoint_read_schema,
meta_predicate,
)?))
}
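/// Builds the `_delta_log` path for part `part_num` of `num_parts` of a multi-part checkpoint
/// at `version`.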
fn delta_path_for_multipart_checkpoint(version: u64, part_num: u32, num_parts: u32) -> Path {
let path =
format!("_delta_log/{version:020}.checkpoint.{part_num:010}.{num_parts:010}.parquet");
Path::from(path.as_str())
}
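/// Creates an in-memory store containing placeholder files at each of `paths`, optionally
/// writes a `_last_checkpoint` hint, and returns a storage handler plus the `_delta_log` root.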
async fn build_log_with_paths_and_checkpoint(
paths: &[Path],
checkpoint_metadata: Option<&LastCheckpointHint>,
) -> (Box<dyn StorageHandler>, Url) {
let store = Arc::new(InMemory::new());
let data = bytes::Bytes::from("kernel-data");
for path in paths {
store
.put(path, data.clone().into())
.await
.expect("put log file in store");
}
if let Some(checkpoint_metadata) = checkpoint_metadata {
let checkpoint_str =
serde_json::to_string(checkpoint_metadata).expect("Serialize checkpoint");
store
.put(
&Path::from("_delta_log/_last_checkpoint"),
checkpoint_str.into(),
)
.await
.expect("Write _last_checkpoint");
}
let storage =
ObjectStoreStorageHandler::new(store, Arc::new(TokioBackgroundExecutor::new()), None);
let table_root = Url::parse("memory:///").expect("valid url");
let log_root = table_root.join("_delta_log/").unwrap();
(Box::new(storage), log_root)
}
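/// Returns an empty in-memory store and the `_delta_log` root URL for a `memory:///` table.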
fn new_in_memory_store() -> (Arc<InMemory>, Url) {
(
Arc::new(InMemory::new()),
Url::parse("memory:///")
.unwrap()
.join("_delta_log/")
.unwrap(),
)
}
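/// Writes `data` to the store at `path` as a parquet file.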
async fn write_parquet_to_store(
store: &Arc<InMemory>,
path: String,
data: Box<dyn EngineData>,
) -> DeltaResult<()> {
let batch = ArrowEngineData::try_from_engine_data(data)?;
let record_batch = batch.record_batch();
let mut buffer = vec![];
let mut writer = ArrowWriter::try_new(&mut buffer, record_batch.schema(), None)?;
writer.write(record_batch)?;
writer.close()?;
store.put(&Path::from(path), buffer.into()).await?;
Ok(())
}
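/// Writes `data` as a parquet checkpoint file at `_delta_log/{filename}`.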
pub(crate) async fn add_checkpoint_to_store(
store: &Arc<InMemory>,
data: Box<dyn EngineData>,
filename: &str,
) -> DeltaResult<()> {
let path = format!("_delta_log/{filename}");
write_parquet_to_store(store, path, data).await
}
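/// Writes `data` as a parquet sidecar file at `_delta_log/_sidecars/{filename}` and returns the
/// [`FileMeta`] describing it.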
async fn add_sidecar_to_store(
store: &Arc<InMemory>,
data: Box<dyn EngineData>,
filename: &str,
) -> DeltaResult<FileMeta> {
let path = format!("_delta_log/_sidecars/{filename}");
write_parquet_to_store(store, path.clone(), data).await?;
let size = get_file_size(store, &path).await;
let location = Url::parse(&format!("memory:///{path}")).expect("valid url");
Ok(FileMeta {
location,
last_modified: 0,
size,
})
}
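/// Serializes `actions` as newline-delimited JSON and writes the result to
/// `_delta_log/{filename}`.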
async fn write_json_to_store(
store: &Arc<InMemory>,
actions: Vec<Action>,
filename: &str,
) -> DeltaResult<()> {
let json_lines: Vec<String> = actions
.into_iter()
.map(|action| serde_json::to_string(&action).expect("action to string"))
.collect();
let content = json_lines.join("\n");
let checkpoint_path = format!("_delta_log/{filename}");
store
.put(&Path::from(checkpoint_path), content.into())
.await?;
Ok(())
}
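/// Parses `path` into a [`ParsedLogPath`], using a file size of zero.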
fn create_log_path(path: &str) -> ParsedLogPath<FileMeta> {
create_log_path_with_size(path, 0)
}
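/// Parses `path` into a [`ParsedLogPath`] with the given file `size`.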
fn create_log_path_with_size(path: &str, size: u64) -> ParsedLogPath<FileMeta> {
ParsedLogPath::try_from(FileMeta {
location: Url::parse(path).expect("Invalid file URL"),
last_modified: 0,
size,
})
.unwrap()
.unwrap()
}
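/// Returns the size in bytes of the object stored at `path`.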
async fn get_file_size(store: &Arc<InMemory>, path: &str) -> u64 {
let object_meta = store.head(&Path::from(path)).await.unwrap();
object_meta.size
}
#[tokio::test]
async fn build_snapshot_with_uuid_checkpoint_parquet() {
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "checkpoint.parquet"),
delta_path_for_version(4, "json"),
delta_path_for_version(5, "json"),
delta_path_for_version(5, "checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.parquet"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
None,
)
.await;
let log_segment = LogSegment::for_snapshot_impl(
storage.as_ref(),
log_root,
        vec![], // no log tail
        None,   // no checkpoint hint
        None,   // no version to load: use latest
)
.unwrap();
let commit_files = log_segment.listed.ascending_commit_files;
let checkpoint_parts = log_segment.listed.checkpoint_parts;
assert_eq!(checkpoint_parts.len(), 1);
assert_eq!(checkpoint_parts[0].version, 5);
let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
let expected_versions = vec![6, 7];
assert_eq!(versions, expected_versions);
}
#[tokio::test]
async fn build_snapshot_with_uuid_checkpoint_json() {
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "checkpoint.parquet"),
delta_path_for_version(4, "json"),
delta_path_for_version(5, "json"),
delta_path_for_version(5, "checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
None,
)
.await;
let log_segment = LogSegment::for_snapshot_impl(
storage.as_ref(),
log_root,
        vec![], // no log tail
        None,   // no checkpoint hint
        None,   // no version to load: use latest
)
.unwrap();
let commit_files = log_segment.listed.ascending_commit_files;
let checkpoint_parts = log_segment.listed.checkpoint_parts;
assert_eq!(checkpoint_parts.len(), 1);
assert_eq!(checkpoint_parts[0].version, 5);
let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
let expected_versions = vec![6, 7];
assert_eq!(versions, expected_versions);
}
#[tokio::test]
async fn build_snapshot_with_correct_last_uuid_checkpoint() {
let checkpoint_metadata = LastCheckpointHint {
version: 5,
size: 10,
parts: Some(1),
size_in_bytes: None,
num_of_add_files: None,
checkpoint_schema: None,
checksum: None,
tags: None,
};
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
delta_path_for_version(1, "json"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "checkpoint.parquet"),
delta_path_for_version(3, "json"),
delta_path_for_version(4, "json"),
delta_path_for_version(5, "checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.parquet"),
delta_path_for_version(5, "json"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
Some(&checkpoint_metadata),
)
.await;
let log_segment = LogSegment::for_snapshot_impl(
storage.as_ref(),
log_root,
        vec![], // no log tail
        Some(checkpoint_metadata),
        None, // no version to load: use latest
)
.unwrap();
let commit_files = log_segment.listed.ascending_commit_files;
let checkpoint_parts = log_segment.listed.checkpoint_parts;
assert_eq!(checkpoint_parts.len(), 1);
assert_eq!(commit_files.len(), 2);
assert_eq!(checkpoint_parts[0].version, 5);
assert_eq!(commit_files[0].version, 6);
assert_eq!(commit_files[1].version, 7);
}
#[tokio::test]
async fn build_snapshot_with_multiple_incomplete_multipart_checkpoints() {
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_multipart_checkpoint(1, 1, 3),
delta_path_for_multipart_checkpoint(1, 3, 3),
delta_path_for_multipart_checkpoint(2, 1, 2),
delta_path_for_version(2, "json"),
delta_path_for_multipart_checkpoint(3, 1, 3),
delta_path_for_multipart_checkpoint(3, 3, 3),
delta_path_for_multipart_checkpoint(3, 1, 4),
delta_path_for_multipart_checkpoint(3, 2, 4),
delta_path_for_multipart_checkpoint(3, 3, 4),
delta_path_for_multipart_checkpoint(3, 4, 4),
delta_path_for_version(4, "json"),
delta_path_for_version(5, "json"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
None,
)
.await;
let log_segment = LogSegment::for_snapshot_impl(
storage.as_ref(),
log_root,
        vec![], // no log tail
        None,   // no checkpoint hint
        None,   // no version to load: use latest
)
.unwrap();
let commit_files = log_segment.listed.ascending_commit_files;
let checkpoint_parts = log_segment.listed.checkpoint_parts;
assert_eq!(checkpoint_parts.len(), 4);
assert_eq!(checkpoint_parts[0].version, 3);
let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
let expected_versions = vec![4, 5, 6, 7];
assert_eq!(versions, expected_versions);
}
#[tokio::test]
async fn build_snapshot_with_out_of_date_last_checkpoint() {
let checkpoint_metadata = LastCheckpointHint {
version: 3,
size: 10,
parts: None,
size_in_bytes: None,
num_of_add_files: None,
checkpoint_schema: None,
checksum: None,
tags: None,
};
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "checkpoint.parquet"),
delta_path_for_version(4, "json"),
delta_path_for_version(5, "checkpoint.parquet"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
Some(&checkpoint_metadata),
)
.await;
let log_segment = LogSegment::for_snapshot_impl(
storage.as_ref(),
log_root,
        vec![], // no log tail
        Some(checkpoint_metadata),
        None, // no version to load: use latest
)
.unwrap();
let commit_files = log_segment.listed.ascending_commit_files;
let checkpoint_parts = log_segment.listed.checkpoint_parts;
assert_eq!(checkpoint_parts.len(), 1);
assert_eq!(commit_files.len(), 2);
assert_eq!(checkpoint_parts[0].version, 5);
assert_eq!(commit_files[0].version, 6);
assert_eq!(commit_files[1].version, 7);
}
#[tokio::test]
async fn build_snapshot_with_correct_last_multipart_checkpoint() {
let checkpoint_metadata = LastCheckpointHint {
version: 5,
size: 10,
parts: Some(3),
size_in_bytes: None,
num_of_add_files: None,
checkpoint_schema: None,
checksum: None,
tags: None,
};
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
delta_path_for_version(1, "json"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "checkpoint.parquet"),
delta_path_for_version(3, "json"),
delta_path_for_version(4, "json"),
delta_path_for_multipart_checkpoint(5, 1, 3),
delta_path_for_multipart_checkpoint(5, 2, 3),
delta_path_for_multipart_checkpoint(5, 3, 3),
delta_path_for_version(5, "json"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
Some(&checkpoint_metadata),
)
.await;
let log_segment = LogSegment::for_snapshot_impl(
storage.as_ref(),
log_root,
        vec![], // no log tail
        Some(checkpoint_metadata),
        None, // no version to load: use latest
)
.unwrap();
let commit_files = log_segment.listed.ascending_commit_files;
let checkpoint_parts = log_segment.listed.checkpoint_parts;
assert_eq!(checkpoint_parts.len(), 3);
assert_eq!(commit_files.len(), 2);
assert_eq!(checkpoint_parts[0].version, 5);
assert_eq!(commit_files[0].version, 6);
assert_eq!(commit_files[1].version, 7);
}
#[tokio::test]
async fn build_snapshot_with_missing_checkpoint_part_from_hint_fails() {
let checkpoint_metadata = LastCheckpointHint {
version: 5,
size: 10,
parts: Some(3),
size_in_bytes: None,
num_of_add_files: None,
checkpoint_schema: None,
checksum: None,
tags: None,
};
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
delta_path_for_version(1, "json"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "checkpoint.parquet"),
delta_path_for_version(3, "json"),
delta_path_for_version(4, "json"),
delta_path_for_multipart_checkpoint(5, 1, 3),
delta_path_for_multipart_checkpoint(5, 3, 3),
delta_path_for_version(5, "json"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
Some(&checkpoint_metadata),
)
.await;
let log_segment = LogSegment::for_snapshot_impl(
storage.as_ref(),
log_root,
        vec![], // no log tail
        Some(checkpoint_metadata),
        None, // no version to load: use latest
);
assert_result_error_with_message(
log_segment,
"Invalid Checkpoint: Had a _last_checkpoint hint but didn't find any checkpoints",
)
}
#[tokio::test]
async fn build_snapshot_with_bad_checkpoint_hint_fails() {
let checkpoint_metadata = LastCheckpointHint {
version: 5,
size: 10,
parts: Some(1),
size_in_bytes: None,
num_of_add_files: None,
checkpoint_schema: None,
checksum: None,
tags: None,
};
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
delta_path_for_version(1, "json"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "checkpoint.parquet"),
delta_path_for_version(3, "json"),
delta_path_for_version(4, "json"),
delta_path_for_multipart_checkpoint(5, 1, 2),
delta_path_for_multipart_checkpoint(5, 2, 2),
delta_path_for_version(5, "json"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
Some(&checkpoint_metadata),
)
.await;
let log_segment = LogSegment::for_snapshot_impl(
storage.as_ref(),
log_root,
        vec![], // no log tail
        Some(checkpoint_metadata),
        None, // no version to load: use latest
);
assert_result_error_with_message(
log_segment,
"Invalid Checkpoint: _last_checkpoint indicated that checkpoint should have 1 parts, but \
it has 2",
)
}
#[tokio::test]
async fn build_snapshot_with_missing_checkpoint_part_no_hint() {
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
delta_path_for_version(1, "json"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "checkpoint.parquet"),
delta_path_for_version(3, "json"),
delta_path_for_version(4, "json"),
delta_path_for_multipart_checkpoint(5, 1, 3),
delta_path_for_multipart_checkpoint(5, 3, 3),
delta_path_for_version(5, "json"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
None,
)
.await;
let log_segment = LogSegment::for_snapshot_impl(
storage.as_ref(),
log_root,
        vec![], // no log tail
        None,   // no checkpoint hint
        None,   // no version to load: use latest
)
.unwrap();
let commit_files = log_segment.listed.ascending_commit_files;
let checkpoint_parts = log_segment.listed.checkpoint_parts;
assert_eq!(checkpoint_parts.len(), 1);
assert_eq!(checkpoint_parts[0].version, 3);
let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
let expected_versions = vec![4, 5, 6, 7];
assert_eq!(versions, expected_versions);
}
#[tokio::test]
async fn build_snapshot_with_out_of_date_last_checkpoint_and_incomplete_recent_checkpoint() {
let checkpoint_metadata = LastCheckpointHint {
version: 3,
size: 10,
parts: None,
size_in_bytes: None,
num_of_add_files: None,
checkpoint_schema: None,
checksum: None,
tags: None,
};
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "checkpoint.parquet"),
delta_path_for_version(4, "json"),
delta_path_for_multipart_checkpoint(5, 1, 3),
delta_path_for_multipart_checkpoint(5, 3, 3),
delta_path_for_version(5, "json"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
Some(&checkpoint_metadata),
)
.await;
let log_segment = LogSegment::for_snapshot_impl(
storage.as_ref(),
log_root,
        vec![], // no log tail
        Some(checkpoint_metadata),
        None, // no version to load: use latest
)
.unwrap();
let commit_files = log_segment.listed.ascending_commit_files;
let checkpoint_parts = log_segment.listed.checkpoint_parts;
assert_eq!(checkpoint_parts.len(), 1);
assert_eq!(checkpoint_parts[0].version, 3);
let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
let expected_versions = vec![4, 5, 6, 7];
assert_eq!(versions, expected_versions);
}
#[tokio::test]
async fn build_snapshot_without_checkpoints() {
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "json"),
delta_path_for_version(3, "checkpoint.parquet"),
delta_path_for_version(4, "json"),
delta_path_for_version(5, "json"),
delta_path_for_version(5, "checkpoint.parquet"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
None,
)
.await;
let log_segment = LogSegment::for_snapshot_impl(
storage.as_ref(),
log_root.clone(),
        vec![], // no log tail
        None,   // no checkpoint hint
        None,   // no version to load: use latest
)
.unwrap();
let commit_files = log_segment.listed.ascending_commit_files;
let checkpoint_parts = log_segment.listed.checkpoint_parts;
assert_eq!(checkpoint_parts.len(), 1);
assert_eq!(checkpoint_parts[0].version, 5);
let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
let expected_versions = vec![6, 7];
assert_eq!(versions, expected_versions);
let log_segment = LogSegment::for_snapshot_impl(
storage.as_ref(),
log_root,
        vec![], // no log tail
        None,    // no checkpoint hint
        Some(2), // time travel to version 2
)
.unwrap();
let commit_files = log_segment.listed.ascending_commit_files;
let checkpoint_parts = log_segment.listed.checkpoint_parts;
assert_eq!(checkpoint_parts.len(), 1);
assert_eq!(checkpoint_parts[0].version, 1);
let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
let expected_versions = vec![2];
assert_eq!(versions, expected_versions);
}
#[tokio::test]
async fn build_snapshot_with_checkpoint_greater_than_time_travel_version() {
let checkpoint_metadata = LastCheckpointHint {
version: 5,
size: 10,
parts: None,
size_in_bytes: None,
num_of_add_files: None,
checkpoint_schema: None,
checksum: None,
tags: None,
};
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "json"),
delta_path_for_version(3, "checkpoint.parquet"),
delta_path_for_version(4, "json"),
delta_path_for_version(5, "json"),
delta_path_for_version(5, "checkpoint.parquet"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
None,
)
.await;
let log_segment = LogSegment::for_snapshot_impl(
storage.as_ref(),
log_root,
        vec![], // no log tail
        Some(checkpoint_metadata),
        Some(4), // time travel to version 4
)
.unwrap();
let commit_files = log_segment.listed.ascending_commit_files;
let checkpoint_parts = log_segment.listed.checkpoint_parts;
assert_eq!(checkpoint_parts.len(), 1);
assert_eq!(checkpoint_parts[0].version, 3);
assert_eq!(commit_files.len(), 1);
assert_eq!(commit_files[0].version, 4);
}
#[tokio::test]
async fn build_snapshot_with_start_checkpoint_and_time_travel_version() {
let checkpoint_metadata = LastCheckpointHint {
version: 3,
size: 10,
parts: None,
size_in_bytes: None,
num_of_add_files: None,
checkpoint_schema: None,
checksum: None,
tags: None,
};
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "checkpoint.parquet"),
delta_path_for_version(4, "json"),
delta_path_for_version(5, "checkpoint.parquet"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
Some(&checkpoint_metadata),
)
.await;
let log_segment = LogSegment::for_snapshot_impl(
storage.as_ref(),
log_root,
        vec![], // no log tail
        Some(checkpoint_metadata),
        Some(4), // time travel to version 4
)
.unwrap();
assert_eq!(log_segment.listed.checkpoint_parts[0].version, 3);
assert_eq!(log_segment.listed.ascending_commit_files.len(), 1);
assert_eq!(log_segment.listed.ascending_commit_files[0].version, 4);
}
#[rstest]
#[case::no_hint(None)]
#[case::stale_hint(Some(LastCheckpointHint {
    version: 10, // hint version 10 exceeds the requested end version 5, so it is ignored
size: 10,
parts: None,
size_in_bytes: None,
num_of_add_files: None,
checkpoint_schema: None,
checksum: None,
tags: None,
}))]
#[tokio::test]
async fn build_snapshot_time_travel_no_checkpoint_falls_back_to_v0(
#[case] hint: Option<LastCheckpointHint>,
) {
let paths: Vec<Path> = (0..=5).map(|v| delta_path_for_version(v, "json")).collect();
let (storage, log_root) = build_log_with_paths_and_checkpoint(&paths, None).await;
let log_segment =
LogSegment::for_snapshot_impl(storage.as_ref(), log_root, vec![], hint, Some(5)).unwrap();
let commit_files = log_segment.listed.ascending_commit_files;
let checkpoint_parts = log_segment.listed.checkpoint_parts;
assert_eq!(checkpoint_parts.len(), 0);
let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
assert_eq!(versions, vec![0, 1, 2, 3, 4, 5]);
}
#[tokio::test]
async fn build_snapshot_time_travel_no_hint_checkpoint_at_end_version_included() {
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "json"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "json"),
delta_path_for_version(4, "json"),
delta_path_for_version(5, "json"),
delta_path_for_version(5, "checkpoint.parquet"),
],
None,
)
.await;
let log_segment =
LogSegment::for_snapshot_impl(storage.as_ref(), log_root, vec![], None, Some(5)).unwrap();
let commit_files = log_segment.listed.ascending_commit_files;
let checkpoint_parts = log_segment.listed.checkpoint_parts;
assert_eq!(checkpoint_parts.len(), 1);
assert_eq!(checkpoint_parts[0].version, 5);
assert_eq!(commit_files.len(), 0);
}
#[tokio::test]
async fn build_table_changes_with_commit_versions() {
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "json"),
delta_path_for_version(3, "checkpoint.parquet"),
delta_path_for_version(4, "json"),
delta_path_for_version(5, "json"),
delta_path_for_version(5, "checkpoint.parquet"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
None,
)
.await;
let log_segment =
LogSegment::for_table_changes(storage.as_ref(), log_root.clone(), 2, 5).unwrap();
let commit_files = log_segment.listed.ascending_commit_files;
let checkpoint_parts = log_segment.listed.checkpoint_parts;
assert_eq!(checkpoint_parts.len(), 0);
let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
let expected_versions = (2..=5).collect_vec();
assert_eq!(versions, expected_versions);
let log_segment =
LogSegment::for_table_changes(storage.as_ref(), log_root.clone(), 0, Some(0)).unwrap();
let commit_files = log_segment.listed.ascending_commit_files;
let checkpoint_parts = log_segment.listed.checkpoint_parts;
assert_eq!(checkpoint_parts.len(), 0);
assert_eq!(commit_files.len(), 1);
assert_eq!(commit_files[0].version, 0);
let log_segment = LogSegment::for_table_changes(storage.as_ref(), log_root, 0, None).unwrap();
let commit_files = log_segment.listed.ascending_commit_files;
let checkpoint_parts = log_segment.listed.checkpoint_parts;
assert_eq!(checkpoint_parts.len(), 0);
let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
let expected_versions = (0..=7).collect_vec();
assert_eq!(versions, expected_versions);
}
#[tokio::test]
async fn test_non_contiguous_log() {
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(2, "json"),
],
None,
)
.await;
let log_segment_res =
LogSegment::for_table_changes(storage.as_ref(), log_root.clone(), 0, None);
let expected_error_pattern = "Generic delta kernel error: Expected contiguous commit files, \
but found gap: ParsedLogPath { location: FileMeta { location: Url { scheme: \"memory\", \
cannot_be_a_base: false, username: \"\", password: None, host: None, port: None, path: \
\"/_delta_log/00000000000000000000.json\", query: None, fragment: None }, last_modified:";
assert_result_error_with_message(log_segment_res, expected_error_pattern);
let log_segment_res =
LogSegment::for_table_changes(storage.as_ref(), log_root.clone(), 1, None);
assert_result_error_with_message(
log_segment_res,
"Generic delta kernel error: Expected the first commit to have version 1",
);
let log_segment_res = LogSegment::for_table_changes(storage.as_ref(), log_root, 0, Some(1));
assert_result_error_with_message(
log_segment_res,
"Generic delta kernel error: LogSegment end version 0 not the same as the specified end \
version 1",
);
}
#[tokio::test]
async fn table_changes_fails_with_larger_start_version_than_end() {
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "json"),
],
None,
)
.await;
let log_segment_res = LogSegment::for_table_changes(storage.as_ref(), log_root, 1, Some(0));
    assert_result_error_with_message(
        log_segment_res,
        "Generic delta kernel error: Failed to build LogSegment: start_version cannot be \
         greater than end_version",
    );
}
#[test_log::test(rstest::rstest)]
#[case::simple_path("example.parquet", "file:///var/_delta_log/_sidecars/example.parquet")]
#[case::full_path(
"file:///var/_delta_log/_sidecars/example.parquet",
"file:///var/_delta_log/_sidecars/example.parquet"
)]
#[case::nested_path(
"test/test/example.parquet",
"file:///var/_delta_log/_sidecars/test/test/example.parquet"
)]
fn test_sidecar_to_filemeta_valid_paths(
#[case] input_path: &str,
#[case] expected_url: &str,
) -> DeltaResult<()> {
let log_root = Url::parse("file:///var/_delta_log/")?;
let sidecar = Sidecar {
        path: input_path.to_string(),
modification_time: 0,
size_in_bytes: 1000,
tags: None,
};
let filemeta = sidecar.to_filemeta(&log_root)?;
assert_eq!(
filemeta.location.as_str(),
expected_url,
"Mismatch for input path: {input_path}"
);
Ok(())
}
#[test]
fn test_checkpoint_batch_with_no_sidecars_returns_none() -> DeltaResult<()> {
let (_, log_root) = new_in_memory_store();
let engine = Arc::new(SyncEngine::new());
let checkpoint_batch = add_batch_simple(get_all_actions_schema().clone());
let mut iter = process_sidecars(
engine.parquet_handler(),
log_root,
checkpoint_batch.as_ref(),
get_all_actions_schema().project(&[ADD_NAME, REMOVE_NAME, SIDECAR_NAME])?,
None,
)?
.into_iter()
.flatten();
assert!(iter.next().is_none());
Ok(())
}
#[tokio::test]
async fn test_checkpoint_batch_with_sidecars_returns_sidecar_batches() -> DeltaResult<()> {
let (store, log_root) = new_in_memory_store();
let engine = DefaultEngineBuilder::new(store.clone()).build();
let read_schema = get_all_actions_schema().project(&[ADD_NAME, REMOVE_NAME, SIDECAR_NAME])?;
let sidecar1_size = add_sidecar_to_store(
&store,
add_batch_simple(read_schema.clone()),
"sidecarfile1.parquet",
)
.await?
.size;
let sidecar2_size = add_sidecar_to_store(
&store,
add_batch_with_remove(read_schema.clone()),
"sidecarfile2.parquet",
)
.await?
.size;
let checkpoint_batch = sidecar_batch_with_given_paths_and_sizes(
vec![
("sidecarfile1.parquet", sidecar1_size),
("sidecarfile2.parquet", sidecar2_size),
],
read_schema.clone(),
);
let mut iter = process_sidecars(
engine.parquet_handler(),
log_root,
checkpoint_batch.as_ref(),
read_schema.clone(),
None,
)?
.into_iter()
.flatten();
assert_batch_matches(iter.next().unwrap()?, add_batch_simple(read_schema.clone()));
assert_batch_matches(iter.next().unwrap()?, add_batch_with_remove(read_schema));
assert!(iter.next().is_none());
Ok(())
}
#[test]
fn test_checkpoint_batch_with_sidecar_files_that_do_not_exist() -> DeltaResult<()> {
let (store, log_root) = new_in_memory_store();
let engine = DefaultEngineBuilder::new(store.clone()).build();
let checkpoint_batch = sidecar_batch_with_given_paths(
vec!["sidecarfile1.parquet", "sidecarfile2.parquet"],
get_all_actions_schema().clone(),
);
let mut iter = process_sidecars(
engine.parquet_handler(),
log_root,
checkpoint_batch.as_ref(),
get_all_actions_schema().project(&[ADD_NAME, REMOVE_NAME, SIDECAR_NAME])?,
None,
)?
.into_iter()
.flatten();
let err = iter.next().unwrap();
    assert_result_error_with_message(
        err,
        "Arrow error: External: Object at location _delta_log/_sidecars/sidecarfile1.parquet \
         not found: No data in memory found. Location: \
         _delta_log/_sidecars/sidecarfile1.parquet",
    );
Ok(())
}
#[tokio::test]
async fn test_reading_sidecar_files_with_predicate() -> DeltaResult<()> {
let (store, log_root) = new_in_memory_store();
let engine = DefaultEngineBuilder::new(store.clone()).build();
let read_schema = get_all_actions_schema().project(&[ADD_NAME, REMOVE_NAME, SIDECAR_NAME])?;
let sidecar_size = add_sidecar_to_store(
&store,
add_batch_simple(read_schema.clone()),
"sidecarfile1.parquet",
)
.await?
.size;
let checkpoint_batch = sidecar_batch_with_given_paths_and_sizes(
vec![("sidecarfile1.parquet", sidecar_size)],
read_schema.clone(),
);
    let remove_predicate: Option<PredicateRef> = Some(Arc::new(
        Expression::column([REMOVE_NAME, "path"]).is_not_null(),
    ));
let mut iter = process_sidecars(
engine.parquet_handler(),
log_root,
checkpoint_batch.as_ref(),
read_schema.clone(),
remove_predicate.clone(),
)?
.into_iter()
.flatten();
assert!(iter.next().is_none());
Ok(())
}
#[tokio::test]
async fn test_create_checkpoint_stream_returns_checkpoint_batches_as_is_if_schema_has_no_file_actions(
) -> DeltaResult<()> {
let (store, log_root) = new_in_memory_store();
let engine = DefaultEngineBuilder::new(store.clone()).build();
add_checkpoint_to_store(
&store,
sidecar_batch_with_given_paths(vec!["sidecar1.parquet"], get_commit_schema().clone()),
"00000000000000000001.checkpoint.parquet",
)
.await?;
let checkpoint_one_file = log_root
.join("00000000000000000001.checkpoint.parquet")?
.to_string();
let v2_checkpoint_read_schema = get_commit_schema().project(&[METADATA_NAME])?;
let log_segment = LogSegment::try_new(
LogSegmentFiles {
checkpoint_parts: vec![create_log_path(&checkpoint_one_file)],
latest_commit_file: Some(create_log_path("file:///00000000000000000001.json")),
..Default::default()
},
log_root,
None,
None,
)?;
let checkpoint_result = log_segment.create_checkpoint_stream(
&engine,
v2_checkpoint_read_schema.clone(),
        None,
        None,
        None,
    )?;
let mut iter = checkpoint_result.actions;
let ActionsBatch {
actions: first_batch,
is_log_batch,
} = iter.next().unwrap()?;
assert!(!is_log_batch);
assert_batch_matches(
first_batch,
sidecar_batch_with_given_paths(vec!["sidecar1.parquet"], v2_checkpoint_read_schema),
);
assert!(iter.next().is_none());
Ok(())
}
#[tokio::test]
async fn test_create_checkpoint_stream_returns_checkpoint_batches_if_checkpoint_is_multi_part(
) -> DeltaResult<()> {
let (store, log_root) = new_in_memory_store();
let engine = DefaultEngineBuilder::new(store.clone()).build();
let checkpoint_part_1 = "00000000000000000001.checkpoint.0000000001.0000000002.parquet";
let checkpoint_part_2 = "00000000000000000001.checkpoint.0000000002.0000000002.parquet";
add_checkpoint_to_store(
&store,
sidecar_batch_with_given_paths(vec!["sidecar1.parquet"], get_all_actions_schema().clone()),
checkpoint_part_1,
)
.await?;
add_checkpoint_to_store(
&store,
sidecar_batch_with_given_paths(vec!["sidecar2.parquet"], get_all_actions_schema().clone()),
checkpoint_part_2,
)
.await?;
let cp1_size = get_file_size(&store, &format!("_delta_log/{checkpoint_part_1}")).await;
let cp2_size = get_file_size(&store, &format!("_delta_log/{checkpoint_part_2}")).await;
let checkpoint_one_file = log_root.join(checkpoint_part_1)?.to_string();
let checkpoint_two_file = log_root.join(checkpoint_part_2)?.to_string();
let v2_checkpoint_read_schema = get_commit_schema().project(&[ADD_NAME])?;
let log_segment = LogSegment::try_new(
LogSegmentFiles {
checkpoint_parts: vec![
create_log_path_with_size(&checkpoint_one_file, cp1_size),
create_log_path_with_size(&checkpoint_two_file, cp2_size),
],
latest_commit_file: Some(create_log_path("file:///00000000000000000001.json")),
..Default::default()
},
log_root,
None,
None,
)?;
let checkpoint_result = log_segment.create_checkpoint_stream(
&engine,
v2_checkpoint_read_schema.clone(),
        None,
        None,
        None,
    )?;
let mut iter = checkpoint_result.actions;
for expected_sidecar in ["sidecar1.parquet", "sidecar2.parquet"].iter() {
let ActionsBatch {
actions: batch,
is_log_batch,
} = iter.next().unwrap()?;
assert!(!is_log_batch);
assert_batch_matches(
batch,
sidecar_batch_with_given_paths(
vec![expected_sidecar],
v2_checkpoint_read_schema.clone(),
),
);
}
assert!(iter.next().is_none());
Ok(())
}
#[tokio::test]
async fn test_create_checkpoint_stream_reads_parquet_checkpoint_batch_without_sidecars(
) -> DeltaResult<()> {
let (store, log_root) = new_in_memory_store();
let engine = DefaultEngineBuilder::new(store.clone()).build();
add_checkpoint_to_store(
&store,
add_batch_simple(get_commit_schema().clone()),
"00000000000000000001.checkpoint.parquet",
)
.await?;
let checkpoint_one_file = log_root
.join("00000000000000000001.checkpoint.parquet")?
.to_string();
let checkpoint_size =
get_file_size(&store, "_delta_log/00000000000000000001.checkpoint.parquet").await;
let v2_checkpoint_read_schema = get_all_actions_schema().project(&[ADD_NAME, SIDECAR_NAME])?;
let log_segment = LogSegment::try_new(
LogSegmentFiles {
checkpoint_parts: vec![create_log_path_with_size(
&checkpoint_one_file,
checkpoint_size,
)],
latest_commit_file: Some(create_log_path("file:///00000000000000000001.json")),
..Default::default()
},
log_root,
None,
None,
)?;
let checkpoint_result = log_segment.create_checkpoint_stream(
&engine,
v2_checkpoint_read_schema.clone(),
        None,
        None,
        None,
    )?;
let mut iter = checkpoint_result.actions;
let ActionsBatch {
actions: first_batch,
is_log_batch,
} = iter.next().unwrap()?;
assert!(!is_log_batch);
assert_batch_matches(first_batch, add_batch_simple(v2_checkpoint_read_schema));
assert!(iter.next().is_none());
Ok(())
}
#[tokio::test]
async fn test_create_checkpoint_stream_reads_json_checkpoint_batch_without_sidecars(
) -> DeltaResult<()> {
let (store, log_root) = new_in_memory_store();
let engine = DefaultEngineBuilder::new(store.clone()).build();
let filename = "00000000000000000010.checkpoint.80a083e8-7026-4e79-81be-64bd76c43a11.json";
write_json_to_store(
&store,
vec![Action::Add(Add {
path: "fake_path_1".into(),
data_change: true,
..Default::default()
})],
filename,
)
.await?;
let checkpoint_one_file = log_root.join(filename)?.to_string();
let v2_checkpoint_read_schema = get_all_actions_schema().project(&[ADD_NAME, SIDECAR_NAME])?;
let log_segment = LogSegment::try_new(
LogSegmentFiles {
checkpoint_parts: vec![create_log_path(&checkpoint_one_file)],
latest_commit_file: Some(create_log_path("file:///00000000000000000001.json")),
..Default::default()
},
log_root,
None,
None,
)?;
let checkpoint_result = log_segment.create_checkpoint_stream(
&engine,
v2_checkpoint_read_schema,
        None,
        None,
        None,
    )?;
let mut iter = checkpoint_result.actions;
let ActionsBatch {
actions: first_batch,
is_log_batch,
} = iter.next().unwrap()?;
assert!(!is_log_batch);
let mut visitor = AddVisitor::default();
visitor.visit_rows_of(&*first_batch)?;
    assert_eq!(visitor.adds.len(), 1);
    assert_eq!(visitor.adds[0].path, "fake_path_1");
assert!(iter.next().is_none());
Ok(())
}
#[tokio::test]
async fn test_create_checkpoint_stream_reads_checkpoint_file_and_returns_sidecar_batches(
) -> DeltaResult<()> {
let (store, log_root) = new_in_memory_store();
let engine = DefaultEngineBuilder::new(store.clone()).build();
let sidecar1_size = add_sidecar_to_store(
&store,
add_batch_simple(get_commit_schema().project(&[ADD_NAME, REMOVE_NAME])?),
"sidecarfile1.parquet",
)
.await?
.size;
let sidecar2_size = add_sidecar_to_store(
&store,
add_batch_with_remove(get_commit_schema().project(&[ADD_NAME, REMOVE_NAME])?),
"sidecarfile2.parquet",
)
.await?
.size;
add_checkpoint_to_store(
&store,
sidecar_batch_with_given_paths_and_sizes(
vec![
("sidecarfile1.parquet", sidecar1_size),
("sidecarfile2.parquet", sidecar2_size),
],
get_all_actions_schema().clone(),
),
"00000000000000000001.checkpoint.parquet",
)
.await?;
let checkpoint_file_path = log_root
.join("00000000000000000001.checkpoint.parquet")?
.to_string();
let checkpoint_size =
get_file_size(&store, "_delta_log/00000000000000000001.checkpoint.parquet").await;
let v2_checkpoint_read_schema = get_all_actions_schema().project(&[ADD_NAME, SIDECAR_NAME])?;
let log_segment = LogSegment::try_new(
LogSegmentFiles {
checkpoint_parts: vec![create_log_path_with_size(
&checkpoint_file_path,
checkpoint_size,
)],
latest_commit_file: Some(create_log_path("file:///00000000000000000001.json")),
..Default::default()
},
log_root,
None,
None,
)?;
let checkpoint_result = log_segment.create_checkpoint_stream(
&engine,
v2_checkpoint_read_schema.clone(),
        None,
        None,
        None,
    )?;
let mut iter = checkpoint_result.actions;
let ActionsBatch {
actions: first_batch,
is_log_batch,
} = iter.next().unwrap()?;
assert!(!is_log_batch);
assert_batch_matches(
first_batch,
sidecar_batch_with_given_paths_and_sizes(
vec![
("sidecarfile1.parquet", sidecar1_size),
("sidecarfile2.parquet", sidecar2_size),
],
get_all_actions_schema().project(&[ADD_NAME, SIDECAR_NAME])?,
),
);
let ActionsBatch {
actions: second_batch,
is_log_batch,
} = iter.next().unwrap()?;
assert!(!is_log_batch);
assert_batch_matches(
second_batch,
add_batch_simple(v2_checkpoint_read_schema.clone()),
);
let ActionsBatch {
actions: third_batch,
is_log_batch,
} = iter.next().unwrap()?;
assert!(!is_log_batch);
assert_batch_matches(
third_batch,
add_batch_with_remove(v2_checkpoint_read_schema),
);
assert!(iter.next().is_none());
Ok(())
}
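/// Describes a log layout for [`create_segment_for`]: the published and staged commit versions,
/// compacted ranges, an optional checkpoint version, and an optional version to load.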
#[derive(Default)]
struct LogSegmentConfig<'a> {
published_commit_versions: &'a [u64],
staged_commit_versions: &'a [u64],
compaction_versions: &'a [(u64, u64)],
checkpoint_version: Option<u64>,
version_to_load: Option<u64>,
}
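/// Builds the described log in an in-memory store and constructs a [`LogSegment`] for it,
/// passing the staged commits as the log tail.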
async fn create_segment_for(segment: LogSegmentConfig<'_>) -> LogSegment {
let mut paths: Vec<Path> = segment
.published_commit_versions
.iter()
.map(|version| delta_path_for_version(*version, "json"))
.chain(
segment
.compaction_versions
.iter()
.map(|(start, end)| compacted_log_path_for_versions(*start, *end, "json")),
)
.collect();
if let Some(version) = segment.checkpoint_version {
paths.push(delta_path_for_version(
version,
"checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json",
));
}
let (storage, log_root) = build_log_with_paths_and_checkpoint(&paths, None).await;
let table_root = Url::parse("memory:///").expect("valid url");
let staged_commits_log_tail: Vec<ParsedLogPath> = segment
.staged_commit_versions
.iter()
.map(|version| staged_commit_path_for_version(*version))
.map(|path| {
ParsedLogPath::try_from(FileMeta {
location: table_root.join(path.as_ref()).unwrap(),
last_modified: 0,
size: 0,
})
.unwrap()
.unwrap()
})
.collect();
LogSegment::for_snapshot_impl(
storage.as_ref(),
log_root.clone(),
staged_commits_log_tail,
None,
segment.version_to_load,
)
.unwrap()
}
#[tokio::test]
async fn test_list_log_files_with_version() -> DeltaResult<()> {
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(0, "crc"),
delta_path_for_version(1, "json"),
delta_path_for_version(1, "crc"),
delta_path_for_version(2, "json"),
],
None,
)
.await;
let result = LogSegmentFiles::list(
storage.as_ref(),
&log_root,
        vec![],
        Some(0),
        None,
)?;
let latest_crc = result.latest_crc_file.unwrap();
    assert_eq!(
        latest_crc.location.location.path(),
        "/_delta_log/00000000000000000001.crc"
    );
    assert_eq!(latest_crc.version, 1);
    assert_eq!(latest_crc.filename, "00000000000000000001.crc");
    assert_eq!(latest_crc.extension, "crc");
    assert_eq!(latest_crc.file_type, LogPathFileType::Crc);
Ok(())
}
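/// Checks that a snapshot log segment lists exactly the commits and compactions that fall
/// after the checkpoint (if any) and at or before `version_to_load`.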
async fn test_compaction_listing(
commit_versions: &[u64],
compaction_versions: &[(u64, u64)],
checkpoint_version: Option<u64>,
version_to_load: Option<u64>,
) {
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: commit_versions,
compaction_versions,
checkpoint_version,
version_to_load,
..Default::default()
})
.await;
let version_to_load = version_to_load.unwrap_or(u64::MAX);
    let checkpoint_cutoff = checkpoint_version.map(|v| v as i64).unwrap_or(-1);
    let expected_commit_versions: Vec<&u64> = commit_versions
        .iter()
        .filter(|v| **v as i64 > checkpoint_cutoff && **v <= version_to_load)
        .collect();
    let expected_compaction_versions: Vec<&(u64, u64)> = compaction_versions
        .iter()
        .filter(|(start, end)| *start as i64 > checkpoint_cutoff && *end <= version_to_load)
.collect();
assert_eq!(
log_segment.listed.ascending_commit_files.len(),
expected_commit_versions.len()
);
assert_eq!(
log_segment.listed.ascending_compaction_files.len(),
expected_compaction_versions.len()
);
for (commit_file, expected_version) in log_segment
.listed
.ascending_commit_files
.iter()
.zip(expected_commit_versions.iter())
{
assert!(commit_file.is_commit());
assert_eq!(commit_file.version, **expected_version);
}
for (compaction_file, (expected_start, expected_end)) in log_segment
.listed
.ascending_compaction_files
.iter()
.zip(expected_compaction_versions.iter())
{
assert!(matches!(
compaction_file.file_type,
LogPathFileType::CompactedCommit { .. }
));
assert_eq!(compaction_file.version, *expected_start);
if let LogPathFileType::CompactedCommit { hi } = compaction_file.file_type {
assert_eq!(hi, *expected_end);
} else {
panic!("File was compaction but type was not CompactedCommit");
}
}
}
#[tokio::test]
async fn test_compaction_simple() {
test_compaction_listing(
&[0, 1, 2],
&[(1, 2)],
        None, // no checkpoint
        None, // load latest version
    )
.await;
}
#[tokio::test]
async fn test_compaction_in_version_range() {
test_compaction_listing(
&[0, 1, 2, 3],
&[(1, 2)],
        None,    // no checkpoint
        Some(2), // load up to version 2
    )
.await;
}
#[tokio::test]
async fn test_compaction_out_of_version_range() {
test_compaction_listing(
&[0, 1, 2, 3, 4],
&[(1, 3)],
        None,    // no checkpoint
        Some(2), // load up to version 2
    )
.await;
}
#[tokio::test]
async fn test_multi_compaction() {
test_compaction_listing(
&[0, 1, 2, 3, 4, 5],
&[(1, 2), (3, 5)],
        None, // no checkpoint
        None, // load latest version
    )
.await;
}
#[tokio::test]
async fn test_multi_compaction_one_out_of_range() {
test_compaction_listing(
&[0, 1, 2, 3, 4, 5],
&[(1, 2), (3, 5)],
        None,    // no checkpoint
        Some(4), // load up to version 4
    )
.await;
}
#[tokio::test]
async fn test_compaction_with_checkpoint() {
test_compaction_listing(
&[0, 1, 2, 4, 5],
&[(1, 2), (4, 5)],
        Some(3), // checkpoint at version 3
        None,    // load latest version
    )
.await;
}
#[tokio::test]
async fn test_compaction_too_early_with_checkpoint() {
test_compaction_listing(
&[0, 1, 2, 4, 5],
&[(1, 2)],
        Some(3), // checkpoint at version 3
        None,    // load latest version
    )
.await;
}
#[tokio::test]
async fn test_compaction_starts_at_checkpoint() {
test_compaction_listing(
&[0, 1, 2, 4, 5],
&[(3, 5)],
        Some(3), // checkpoint at version 3
        None,    // load latest version
    )
.await;
}
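/// A file expected in a commit cover: a single commit at a version, or a compacted commit
/// covering a version range.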
enum ExpectedFile {
Commit(Version),
Compaction(Version, Version),
}
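/// Builds the described log segment and checks that `find_commit_cover` returns exactly
/// `expected_files`, in order.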
async fn test_commit_cover(
commit_versions: &[u64],
compaction_versions: &[(u64, u64)],
checkpoint_version: Option<u64>,
version_to_load: Option<u64>,
expected_files: &[ExpectedFile],
) {
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: commit_versions,
compaction_versions,
checkpoint_version,
version_to_load,
..Default::default()
})
.await;
let cover = log_segment.find_commit_cover();
let expected_locations = expected_files.iter().map(|ef| match ef {
ExpectedFile::Commit(version) => log_segment
.log_root
.join(&format!("{version:020}.json"))
.expect("Couldn't join"),
ExpectedFile::Compaction(lo, hi) => log_segment
.log_root
.join(&format!("{lo:020}.{hi:020}.compacted.json"))
.expect("Couldn't join"),
});
assert_eq!(cover.len(), expected_locations.len());
for (location, expected_location) in cover.iter().zip(expected_locations) {
assert_eq!(location.location, expected_location);
}
}
#[tokio::test]
async fn test_commit_cover_one_compaction() {
test_commit_cover(
&[0, 1, 2],
&[(1, 2)],
        None, // no checkpoint
        None, // load latest version
        &[ExpectedFile::Compaction(1, 2), ExpectedFile::Commit(0)],
)
.await;
}
#[tokio::test]
async fn test_commit_cover_in_version_range() {
test_commit_cover(
&[0, 1, 2, 3],
&[(1, 2)],
        None,    // no checkpoint
        Some(2), // load up to version 2
        &[ExpectedFile::Compaction(1, 2), ExpectedFile::Commit(0)],
)
.await;
}
#[tokio::test]
async fn test_commit_cover_out_of_version_range() {
test_commit_cover(
&[0, 1, 2, 3, 4],
&[(1, 3)],
        None,    // no checkpoint
        Some(2), // load up to version 2
        &[
ExpectedFile::Commit(2),
ExpectedFile::Commit(1),
ExpectedFile::Commit(0),
],
)
.await;
}
#[tokio::test]
async fn test_commit_cover_multi_compaction() {
test_commit_cover(
&[0, 1, 2, 3, 4, 5],
&[(1, 2), (3, 5)],
        None, // no checkpoint
        None, // load latest version
        &[
ExpectedFile::Compaction(3, 5),
ExpectedFile::Compaction(1, 2),
ExpectedFile::Commit(0),
],
)
.await;
}
#[tokio::test]
async fn test_commit_cover_multi_compaction_one_out_of_range() {
test_commit_cover(
&[0, 1, 2, 3, 4, 5],
&[(1, 2), (3, 5)],
        None,    // no checkpoint
        Some(4), // load up to version 4
        &[
ExpectedFile::Commit(4),
ExpectedFile::Commit(3),
ExpectedFile::Compaction(1, 2),
ExpectedFile::Commit(0),
],
)
.await;
}
#[tokio::test]
async fn test_commit_cover_compaction_with_checkpoint() {
test_commit_cover(
&[0, 1, 2, 4, 5],
&[(1, 2), (4, 5)],
        Some(3), // checkpoint at version 3
        None,    // load latest version
        &[ExpectedFile::Compaction(4, 5)],
)
.await;
}
#[tokio::test]
async fn test_commit_cover_too_early_with_checkpoint() {
test_commit_cover(
&[0, 1, 2, 4, 5],
&[(1, 2)],
        Some(3), // checkpoint at version 3
        None,    // load latest version
        &[ExpectedFile::Commit(5), ExpectedFile::Commit(4)],
)
.await;
}
#[tokio::test]
async fn test_commit_cover_starts_at_checkpoint() {
test_commit_cover(
&[0, 1, 2, 4, 5],
&[(3, 5)],
        Some(3), // checkpoint at version 3
        None,    // load latest version
        &[ExpectedFile::Commit(5), ExpectedFile::Commit(4)],
)
.await;
}
#[tokio::test]
async fn test_commit_cover_wider_range() {
test_commit_cover(
&Vec::from_iter(0..20),
&[(0, 5), (0, 10), (5, 10), (13, 19)],
        None, // no checkpoint
        None, // load latest version
        &[
ExpectedFile::Compaction(13, 19),
ExpectedFile::Commit(12),
ExpectedFile::Commit(11),
ExpectedFile::Compaction(0, 10),
],
)
.await;
}
#[tokio::test]
async fn test_commit_cover_no_compactions() {
test_commit_cover(
&Vec::from_iter(0..4),
&[],
        None, // no checkpoint
        None, // load latest version
        &[
ExpectedFile::Commit(3),
ExpectedFile::Commit(2),
ExpectedFile::Commit(1),
ExpectedFile::Commit(0),
],
)
.await;
}
#[tokio::test]
async fn test_commit_cover_minimal_overlap() {
test_commit_cover(
&Vec::from_iter(0..6),
&[(0, 2), (2, 5)],
        None, // no checkpoint
        None, // load latest version
        &[
ExpectedFile::Commit(5),
ExpectedFile::Commit(4),
ExpectedFile::Commit(3),
ExpectedFile::Compaction(0, 2),
],
)
.await;
}
#[test]
fn test_validate_listed_log_file_in_order_compaction_files() {
let log_root = Url::parse("file:///_delta_log/").unwrap();
assert!(LogSegment::try_new(
LogSegmentFiles {
ascending_commit_files: vec![create_log_path(
"file:///_delta_log/00000000000000000001.json",
)],
ascending_compaction_files: vec![
create_log_path(
"file:///_delta_log/00000000000000000000.00000000000000000004.compacted.json",
),
create_log_path(
"file:///_delta_log/00000000000000000001.00000000000000000002.compacted.json",
),
],
..Default::default()
},
log_root,
None,
None,
)
.is_ok());
}
#[test]
fn test_validate_listed_log_file_out_of_order_compaction_files() {
let log_root = Url::parse("file:///_delta_log/").unwrap();
assert!(LogSegment::try_new(
LogSegmentFiles {
ascending_commit_files: vec![create_log_path(
"file:///_delta_log/00000000000000000001.json",
)],
ascending_compaction_files: vec![
create_log_path(
"file:///_delta_log/00000000000000000000.00000000000000000004.compacted.json",
),
create_log_path(
"file:///_delta_log/00000000000000000000.00000000000000000003.compacted.json",
),
],
..Default::default()
},
log_root,
None,
None,
)
.is_err());
}
#[test]
fn test_validate_listed_log_file_different_multipart_checkpoint_versions() {
let log_root = Url::parse("file:///_delta_log/").unwrap();
assert!(LogSegment::try_new(
LogSegmentFiles {
checkpoint_parts: vec![
create_log_path(
"file:///_delta_log/00000000000000000010.checkpoint.0000000001.0000000002.parquet",
),
create_log_path(
"file:///_delta_log/00000000000000000011.checkpoint.0000000002.0000000002.parquet",
),
],
..Default::default()
},
log_root,
None,
None,
)
.is_err());
}
#[test]
fn test_validate_listed_log_file_out_of_order_commit_files() {
let log_root = Url::parse("file:///_delta_log/").unwrap();
assert!(LogSegment::try_new(
LogSegmentFiles {
ascending_commit_files: vec![
create_log_path("file:///_delta_log/00000000000000000003.json"),
create_log_path("file:///_delta_log/00000000000000000001.json"),
],
..Default::default()
},
log_root,
None,
None,
)
.is_err());
}
#[test]
fn test_validate_listed_log_file_checkpoint_parts_contains_non_checkpoint() {
let log_root = Url::parse("file:///_delta_log/").unwrap();
assert!(LogSegment::try_new(
LogSegmentFiles {
checkpoint_parts: vec![create_log_path(
"file:///_delta_log/00000000000000000010.json",
)],
..Default::default()
},
log_root,
None,
None,
)
.is_err());
}
#[test]
fn test_validate_listed_log_file_multipart_checkpoint_part_count_mismatch() {
let log_root = Url::parse("file:///_delta_log/").unwrap();
assert!(LogSegment::try_new(
LogSegmentFiles {
checkpoint_parts: vec![
create_log_path(
"file:///_delta_log/00000000000000000010.checkpoint.0000000001.0000000003.parquet",
),
create_log_path(
"file:///_delta_log/00000000000000000010.checkpoint.0000000002.0000000003.parquet",
),
],
..Default::default()
},
log_root,
None,
None,
)
.is_err());
}
#[test]
fn test_validate_listed_log_file_single_multipart_checkpoint_num_parts_mismatch() {
let log_root = Url::parse("file:///_delta_log/").unwrap();
assert!(LogSegment::try_new(
LogSegmentFiles {
checkpoint_parts: vec![create_log_path(
"file:///_delta_log/00000000000000000010.checkpoint.0000000001.0000000002.parquet",
)],
..Default::default()
},
log_root,
None,
None,
)
.is_err());
}
#[test]
fn test_validate_listed_log_file_multiple_single_part_checkpoints() {
let log_root = Url::parse("file:///_delta_log/").unwrap();
assert!(LogSegment::try_new(
LogSegmentFiles {
checkpoint_parts: vec![
create_log_path("file:///_delta_log/00000000000000000010.checkpoint.parquet"),
create_log_path("file:///_delta_log/00000000000000000010.checkpoint.parquet"),
],
..Default::default()
},
log_root,
None,
None,
)
.is_err());
}
#[test]
fn test_validate_listed_log_file_commit_files_contains_non_commit() {
let log_root = Url::parse("file:///_delta_log/").unwrap();
assert!(LogSegment::try_new(
LogSegmentFiles {
ascending_commit_files: vec![create_log_path(
"file:///_delta_log/00000000000000000010.checkpoint.parquet",
)],
..Default::default()
},
log_root,
None,
None,
)
.is_err());
}
#[test]
fn test_validate_listed_log_file_compaction_files_contains_non_compaction() {
let log_root = Url::parse("file:///_delta_log/").unwrap();
assert!(LogSegment::try_new(
LogSegmentFiles {
ascending_commit_files: vec![create_log_path(
"file:///_delta_log/00000000000000000002.json",
)],
ascending_compaction_files: vec![create_log_path(
"file:///_delta_log/00000000000000000001.json",
)],
..Default::default()
},
log_root,
None,
None,
)
.is_err());
}
#[test]
fn test_validate_listed_log_file_compaction_start_exceeds_end() {
let log_root = Url::parse("file:///_delta_log/").unwrap();
assert!(LogSegment::try_new(
LogSegmentFiles {
ascending_commit_files: vec![create_log_path(
"file:///_delta_log/00000000000000000005.json",
)],
ascending_compaction_files: vec![create_log_path(
"file:///_delta_log/00000000000000000005.00000000000000000002.compacted.json",
)],
..Default::default()
},
log_root,
None,
None,
)
.is_err());
}
#[tokio::test]
async fn commits_since() {
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: &Vec::from_iter(0..=4),
..Default::default()
})
.await;
assert_eq!(log_segment.commits_since_checkpoint(), 4);
assert_eq!(log_segment.commits_since_log_compaction_or_checkpoint(), 4);
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: &Vec::from_iter(0..=4),
compaction_versions: &[(0, 2)],
..Default::default()
})
.await;
assert_eq!(log_segment.commits_since_checkpoint(), 4);
assert_eq!(log_segment.commits_since_log_compaction_or_checkpoint(), 2);
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: &Vec::from_iter(0..=6),
checkpoint_version: Some(3),
..Default::default()
})
.await;
assert_eq!(log_segment.commits_since_checkpoint(), 3);
assert_eq!(log_segment.commits_since_log_compaction_or_checkpoint(), 3);
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: &Vec::from_iter(0..=6),
compaction_versions: &[(0, 2)],
checkpoint_version: Some(3),
..Default::default()
})
.await;
assert_eq!(log_segment.commits_since_checkpoint(), 3);
assert_eq!(log_segment.commits_since_log_compaction_or_checkpoint(), 3);
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: &Vec::from_iter(0..=6),
compaction_versions: &[(3, 4)],
checkpoint_version: Some(2),
..Default::default()
})
.await;
assert_eq!(log_segment.commits_since_checkpoint(), 4);
assert_eq!(log_segment.commits_since_log_compaction_or_checkpoint(), 2);
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: &Vec::from_iter(0..=6),
compaction_versions: &[(1, 2), (3, 4)],
..Default::default()
})
.await;
assert_eq!(log_segment.commits_since_checkpoint(), 6);
assert_eq!(log_segment.commits_since_log_compaction_or_checkpoint(), 2);
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: &Vec::from_iter(0..=10),
compaction_versions: &[(1, 2), (3, 9), (4, 6)],
..Default::default()
})
.await;
assert_eq!(log_segment.commits_since_checkpoint(), 10);
assert_eq!(log_segment.commits_since_log_compaction_or_checkpoint(), 1);
}
#[tokio::test]
async fn for_timestamp_conversion_gets_commit_range() {
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "json"),
delta_path_for_version(3, "checkpoint.parquet"),
delta_path_for_version(4, "json"),
delta_path_for_version(5, "json"),
delta_path_for_version(5, "checkpoint.parquet"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
None,
)
.await;
let log_segment =
LogSegment::for_timestamp_conversion(storage.as_ref(), log_root.clone(), 7, None).unwrap();
let commit_files = log_segment.listed.ascending_commit_files;
let checkpoint_parts = log_segment.listed.checkpoint_parts;
assert!(checkpoint_parts.is_empty());
let versions = commit_files.iter().map(|x| x.version).collect_vec();
assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], versions);
}
#[tokio::test]
async fn for_timestamp_conversion_with_old_end_version() {
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "json"),
delta_path_for_version(3, "checkpoint.parquet"),
delta_path_for_version(4, "json"),
delta_path_for_version(5, "json"),
delta_path_for_version(5, "checkpoint.parquet"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
None,
)
.await;
let log_segment =
LogSegment::for_timestamp_conversion(storage.as_ref(), log_root.clone(), 5, None).unwrap();
let commit_files = log_segment.listed.ascending_commit_files;
let checkpoint_parts = log_segment.listed.checkpoint_parts;
assert!(checkpoint_parts.is_empty());
let versions = commit_files.iter().map(|x| x.version).collect_vec();
assert_eq!(vec![0, 1, 2, 3, 4, 5], versions);
}
#[tokio::test]
async fn for_timestamp_conversion_only_contiguous_ranges() {
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "json"),
delta_path_for_version(3, "checkpoint.parquet"),
delta_path_for_version(5, "json"),
delta_path_for_version(5, "checkpoint.parquet"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
None,
)
.await;
let log_segment =
LogSegment::for_timestamp_conversion(storage.as_ref(), log_root.clone(), 7, None).unwrap();
let commit_files = log_segment.listed.ascending_commit_files;
let checkpoint_parts = log_segment.listed.checkpoint_parts;
assert!(checkpoint_parts.is_empty());
let versions = commit_files.iter().map(|x| x.version).collect_vec();
assert_eq!(vec![5, 6, 7], versions);
}
#[tokio::test]
async fn for_timestamp_conversion_with_limit() {
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "json"),
delta_path_for_version(3, "checkpoint.parquet"),
delta_path_for_version(4, "json"),
delta_path_for_version(5, "json"),
delta_path_for_version(5, "checkpoint.parquet"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
None,
)
.await;
let log_segment = LogSegment::for_timestamp_conversion(
storage.as_ref(),
log_root.clone(),
7,
Some(NonZero::new(3).unwrap()),
)
.unwrap();
let commit_files = log_segment.listed.ascending_commit_files;
let checkpoint_parts = log_segment.listed.checkpoint_parts;
assert!(checkpoint_parts.is_empty());
let versions = commit_files.iter().map(|x| x.version).collect_vec();
assert_eq!(vec![5, 6, 7], versions);
}
#[tokio::test]
async fn for_timestamp_conversion_with_large_limit() {
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "json"),
delta_path_for_version(3, "checkpoint.parquet"),
delta_path_for_version(4, "json"),
delta_path_for_version(5, "json"),
delta_path_for_version(5, "checkpoint.parquet"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
None,
)
.await;
let log_segment = LogSegment::for_timestamp_conversion(
storage.as_ref(),
log_root.clone(),
7,
Some(NonZero::new(20).unwrap()),
)
.unwrap();
let commit_files = log_segment.listed.ascending_commit_files;
let checkpoint_parts = log_segment.listed.checkpoint_parts;
assert!(checkpoint_parts.is_empty());
let versions = commit_files.iter().map(|x| x.version).collect_vec();
assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], versions);
}
#[tokio::test]
async fn for_timestamp_conversion_no_commit_files() {
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[delta_path_for_version(5, "checkpoint.parquet")],
None,
)
.await;
let res = LogSegment::for_timestamp_conversion(storage.as_ref(), log_root.clone(), 0, None);
assert_result_error_with_message(res, "Generic delta kernel error: No files in log segment");
}
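// The `latest_commit_file` tests below verify that listing records the newest commit
// file at or above the checkpoint version, and leaves it `None` when no such commit
// exists (e.g. when the log ends in a checkpoint).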
#[tokio::test]
async fn test_latest_commit_file_field_is_captured() {
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "json"),
delta_path_for_version(2, "json"),
delta_path_for_version(2, "checkpoint.parquet"),
delta_path_for_version(3, "json"),
delta_path_for_version(4, "json"),
delta_path_for_version(5, "json"),
],
None,
)
.await;
let log_segment =
LogSegment::for_snapshot(storage.as_ref(), log_root.clone(), vec![], None, None, None)
.unwrap();
assert_eq!(log_segment.listed.latest_commit_file.unwrap().version, 5);
assert_eq!(log_segment.listed.ascending_commit_files.len(), 3);
assert_eq!(log_segment.listed.ascending_commit_files[0].version, 3);
assert_eq!(log_segment.listed.ascending_commit_files[2].version, 5);
}
#[tokio::test]
async fn test_latest_commit_file_with_checkpoint_filtering() {
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "json"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "checkpoint.parquet"),
delta_path_for_version(4, "json"),
],
None,
)
.await;
let log_segment =
LogSegment::for_snapshot(storage.as_ref(), log_root.clone(), vec![], None, None, None)
.unwrap();
assert_eq!(log_segment.listed.latest_commit_file.unwrap().version, 4);
assert_eq!(log_segment.listed.ascending_commit_files.len(), 1);
assert_eq!(log_segment.listed.ascending_commit_files[0].version, 4);
}
#[tokio::test]
async fn test_latest_commit_file_with_no_commits() {
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[delta_path_for_version(2, "checkpoint.parquet")],
None,
)
.await;
let log_segment =
LogSegment::for_snapshot(storage.as_ref(), log_root.clone(), vec![], None, None, None)
.unwrap();
assert!(log_segment.listed.latest_commit_file.is_none());
assert_eq!(log_segment.checkpoint_version, Some(2));
}
#[tokio::test]
async fn test_latest_commit_file_with_checkpoint_at_same_version() {
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
],
None,
)
.await;
let log_segment =
LogSegment::for_snapshot(storage.as_ref(), log_root.clone(), vec![], None, None, None)
.unwrap();
assert_eq!(log_segment.listed.latest_commit_file.unwrap().version, 1);
assert_eq!(log_segment.listed.ascending_commit_files.len(), 0);
assert_eq!(log_segment.checkpoint_version, Some(1));
}
#[tokio::test]
async fn test_latest_commit_file_edge_case_commit_before_checkpoint() {
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
],
None,
)
.await;
let log_segment =
LogSegment::for_snapshot(storage.as_ref(), log_root.clone(), vec![], None, None, None)
.unwrap();
assert!(log_segment.listed.latest_commit_file.is_none());
assert_eq!(log_segment.checkpoint_version, Some(1));
assert_eq!(log_segment.listed.ascending_commit_files.len(), 0);
}
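// `try_new` must reject commit files with a gap in their version sequence, naming the
// two files on either side of the gap in the error.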
#[test]
fn test_log_segment_contiguous_commit_files() {
let log_root = Url::parse("file:///_delta_log/").unwrap();
assert!(LogSegment::try_new(
LogSegmentFiles {
ascending_commit_files: vec![
create_log_path("file:///_delta_log/00000000000000000001.json"),
create_log_path("file:///_delta_log/00000000000000000002.json"),
create_log_path("file:///_delta_log/00000000000000000003.json"),
],
..Default::default()
},
log_root.clone(),
None,
None,
)
.is_ok());
let log_segment = LogSegment::try_new(
LogSegmentFiles {
ascending_commit_files: vec![
create_log_path("file:///_delta_log/00000000000000000001.json"),
create_log_path("file:///_delta_log/00000000000000000003.json"),
],
..Default::default()
},
log_root,
None,
None,
);
assert_result_error_with_message(
log_segment,
"Generic delta kernel error: Expected contiguous commit files, but found gap: \
ParsedLogPath { location: FileMeta { location: Url { scheme: \
\"file\", cannot_be_a_base: false, username: \"\", password: None, host: None, port: \
None, path: \"/_delta_log/00000000000000000001.json\", query: None, fragment: None }, last_modified: \
0, size: 0 }, filename: \"00000000000000000001.json\", extension: \"json\", version: 1, \
file_type: Commit } -> ParsedLogPath { location: FileMeta { location: Url { scheme: \
\"file\", cannot_be_a_base: false, username: \"\", password: None, host: None, port: \
None, path: \"/_delta_log/00000000000000000003.json\", query: None, fragment: None }, last_modified: \
0, size: 0 }, filename: \"00000000000000000003.json\", extension: \"json\", version: 3, \
file_type: Commit }",
);
}
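// A `checkpointSchema` recorded in `_last_checkpoint` should be carried through to
// `LogSegment::checkpoint_schema` unchanged.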
#[tokio::test]
async fn test_checkpoint_schema_propagation_from_hint() {
let sample_schema: SchemaRef = Arc::new(StructType::new_unchecked([
StructField::nullable("add", StructType::new_unchecked([])),
StructField::nullable("remove", StructType::new_unchecked([])),
]));
let checkpoint_metadata = LastCheckpointHint {
version: 5,
size: 10,
parts: Some(1),
size_in_bytes: None,
num_of_add_files: None,
checkpoint_schema: Some(sample_schema.clone()),
checksum: None,
tags: None,
};
let (storage, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(5, "checkpoint.parquet"),
delta_path_for_version(5, "json"),
delta_path_for_version(6, "json"),
],
Some(&checkpoint_metadata),
)
.await;
let log_segment = LogSegment::for_snapshot_impl(
storage.as_ref(),
log_root,
vec![],
Some(checkpoint_metadata),
None,
)
.unwrap();
assert!(log_segment.checkpoint_schema.is_some());
assert_eq!(log_segment.checkpoint_schema.unwrap(), sample_schema);
}
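// For a single-part V1 parquet checkpoint with a schema hint, the hint is used
// directly as the file-actions schema and no sidecars are reported.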
#[tokio::test]
async fn test_get_file_actions_schema_v1_parquet_with_hint() -> DeltaResult<()> {
let (store, log_root) = new_in_memory_store();
let engine = DefaultEngineBuilder::new(store.clone()).build();
let v1_schema = get_commit_schema().project(&[ADD_NAME, REMOVE_NAME])?;
add_checkpoint_to_store(
&store,
add_batch_simple(v1_schema.clone()),
"00000000000000000001.checkpoint.parquet",
)
.await?;
let checkpoint_file = log_root
.join("00000000000000000001.checkpoint.parquet")?
.to_string();
let hint_schema: SchemaRef = Arc::new(StructType::new_unchecked([
StructField::nullable("add", StructType::new_unchecked([])),
StructField::nullable("remove", StructType::new_unchecked([])),
]));
let log_segment = LogSegment::try_new(
LogSegmentFiles {
checkpoint_parts: vec![create_log_path(&checkpoint_file)],
latest_commit_file: Some(create_log_path("file:///00000000000000000002.json")),
..Default::default()
},
log_root,
None,
Some(hint_schema.clone()),
)?;
let (schema, sidecars) = log_segment.get_file_actions_schema_and_sidecars(&engine)?;
assert!(schema.is_some(), "Should return hint schema for V1");
assert_eq!(
schema.unwrap(),
hint_schema,
"Should use hint schema directly"
);
assert!(sidecars.is_empty(), "V1 checkpoint should have no sidecars");
Ok(())
}
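// Multi-part V1 checkpoints never have sidecars. With or without a schema hint, the
// returned file-actions schema must retain `add.stats_parsed` so data skipping can use
// the parsed statistics.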
#[rstest]
#[case::with_hint(true)]
#[case::without_hint(false)]
#[tokio::test]
async fn test_get_file_actions_schema_multi_part_v1(#[case] use_hint: bool) -> DeltaResult<()> {
let (store, log_root) = new_in_memory_store();
let engine = DefaultEngineBuilder::new(store.clone()).build();
let checkpoint_part_1 = "00000000000000000001.checkpoint.0000000001.0000000002.parquet";
let checkpoint_part_2 = "00000000000000000001.checkpoint.0000000002.0000000002.parquet";
let stats_parsed = StructType::new_unchecked([
StructField::nullable("numRecords", DataType::LONG),
StructField::nullable(
"minValues",
StructType::new_unchecked([StructField::nullable("id", DataType::LONG)]),
),
StructField::nullable(
"maxValues",
StructType::new_unchecked([StructField::nullable("id", DataType::LONG)]),
),
]);
let add_schema = StructType::new_unchecked([
StructField::nullable("path", DataType::STRING),
StructField::nullable("stats_parsed", stats_parsed),
]);
let remove_schema =
StructType::new_unchecked([StructField::nullable("path", DataType::STRING)]);
let v1_schema = Arc::new(StructType::new_unchecked([
StructField::nullable(ADD_NAME, add_schema),
StructField::nullable(REMOVE_NAME, remove_schema),
]));
add_checkpoint_to_store(
&store,
add_batch_simple(v1_schema.clone()),
checkpoint_part_1,
)
.await?;
add_checkpoint_to_store(
&store,
add_batch_simple(v1_schema.clone()),
checkpoint_part_2,
)
.await?;
let cp1_size = get_file_size(&store, &format!("_delta_log/{checkpoint_part_1}")).await;
let cp2_size = get_file_size(&store, &format!("_delta_log/{checkpoint_part_2}")).await;
let cp1_file = log_root.join(checkpoint_part_1)?.to_string();
let cp2_file = log_root.join(checkpoint_part_2)?.to_string();
let log_segment = LogSegment::try_new(
LogSegmentFiles {
checkpoint_parts: vec![
create_log_path_with_size(&cp1_file, cp1_size),
create_log_path_with_size(&cp2_file, cp2_size),
],
latest_commit_file: Some(create_log_path("file:///00000000000000000002.json")),
..Default::default()
},
log_root,
None,
use_hint.then(|| v1_schema.clone() as SchemaRef),
)?;
let (schema, sidecars) = log_segment.get_file_actions_schema_and_sidecars(&engine)?;
let schema = schema.expect("Multi-part V1 should return file actions schema");
let add_field = schema.field(ADD_NAME).expect("should have add field");
let DataType::Struct(add_struct) = add_field.data_type() else {
panic!("add field should be a struct type");
};
assert!(
add_struct.field("stats_parsed").is_some(),
"Returned schema should include stats_parsed for data skipping"
);
assert!(sidecars.is_empty(), "Multi-part V1 should have no sidecars");
Ok(())
}
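// `max_published_version` is the highest version among *published* commits observed
// during listing; staged (catalog-owned) commits and checkpoints never advance it.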
#[tokio::test]
async fn test_max_published_version_only_published_commits() {
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: &[0, 1, 2, 3, 4],
..Default::default()
})
.await;
assert_eq!(log_segment.listed.max_published_version.unwrap(), 4);
}
#[tokio::test]
async fn test_max_published_version_checkpoint_followed_by_published_commits() {
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: &[5, 6, 7, 8],
checkpoint_version: Some(5),
..Default::default()
})
.await;
assert_eq!(log_segment.listed.max_published_version.unwrap(), 8);
}
#[tokio::test]
async fn test_max_published_version_only_staged_commits() {
let log_segment = create_segment_for(LogSegmentConfig {
staged_commit_versions: &[0, 1, 2, 3, 4],
..Default::default()
})
.await;
assert_eq!(log_segment.listed.max_published_version, None);
}
#[tokio::test]
async fn test_max_published_version_checkpoint_followed_by_staged_commits() {
let log_segment = create_segment_for(LogSegmentConfig {
staged_commit_versions: &[5, 6, 7, 8],
checkpoint_version: Some(5),
..Default::default()
})
.await;
assert_eq!(log_segment.listed.max_published_version, None);
}
#[tokio::test]
async fn test_max_published_version_published_and_staged_commits_no_overlap() {
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: &[0, 1, 2],
staged_commit_versions: &[3, 4],
..Default::default()
})
.await;
assert_eq!(log_segment.listed.max_published_version.unwrap(), 2);
}
#[tokio::test]
async fn test_max_published_version_checkpoint_followed_by_published_and_staged_commits_no_overlap()
{
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: &[5, 6, 7],
staged_commit_versions: &[8, 9, 10],
checkpoint_version: Some(5),
..Default::default()
})
.await;
assert_eq!(log_segment.listed.max_published_version.unwrap(), 7);
}
#[tokio::test]
async fn test_max_published_version_published_and_staged_commits_with_overlap() {
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: &[0, 1, 2],
staged_commit_versions: &[2, 3, 4],
..Default::default()
})
.await;
assert_eq!(log_segment.listed.max_published_version.unwrap(), 2);
}
#[tokio::test]
async fn test_max_published_version_checkpoint_followed_by_published_and_staged_commits_with_overlap(
) {
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: &[5, 6, 7, 8, 9],
staged_commit_versions: &[7, 8, 9, 10],
checkpoint_version: Some(5),
..Default::default()
})
.await;
assert_eq!(log_segment.listed.max_published_version.unwrap(), 9);
}
#[tokio::test]
async fn test_max_published_version_checkpoint_only() {
let log_segment = create_segment_for(LogSegmentConfig {
checkpoint_version: Some(5),
..Default::default()
})
.await;
assert_eq!(log_segment.listed.max_published_version, None);
}
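// Helpers for the `schema_has_compatible_stats_parsed` tests below: build a checkpoint
// schema whose `add.stats_parsed` carries min/max values for the given fields, and a
// corresponding expected stats schema.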
fn create_checkpoint_schema_with_stats_parsed(min_values_fields: Vec<StructField>) -> StructType {
let stats_parsed = StructType::new_unchecked([
StructField::nullable("numRecords", DataType::LONG),
StructField::nullable(
"minValues",
StructType::new_unchecked(min_values_fields.clone()),
),
StructField::nullable("maxValues", StructType::new_unchecked(min_values_fields)),
]);
let add_schema = StructType::new_unchecked([
StructField::nullable("path", DataType::STRING),
StructField::nullable("stats_parsed", stats_parsed),
]);
StructType::new_unchecked([StructField::nullable("add", add_schema)])
}
fn create_stats_schema(column_fields: Vec<StructField>) -> StructType {
StructType::new_unchecked([
StructField::nullable("numRecords", DataType::LONG),
StructField::nullable(
"minValues",
StructType::new_unchecked(column_fields.clone()),
),
StructField::nullable("maxValues", StructType::new_unchecked(column_fields)),
])
}
fn create_checkpoint_schema_without_stats_parsed() -> StructType {
let add_schema = StructType::new_unchecked([
StructField::nullable("path", DataType::STRING),
StructField::nullable("stats", DataType::STRING),
]);
StructType::new_unchecked([StructField::nullable("add", add_schema)])
}
#[test]
fn test_schema_has_compatible_stats_parsed_basic() {
let checkpoint_schema =
create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable(
"id",
DataType::INTEGER,
)]);
let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]);
assert!(LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema
));
let stats_schema_widened =
create_stats_schema(vec![StructField::nullable("id", DataType::LONG)]);
assert!(LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema_widened
));
let checkpoint_schema_string =
create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable(
"id",
DataType::STRING,
)]);
assert!(!LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema_string,
&stats_schema
));
}
#[test]
fn test_schema_has_compatible_stats_parsed_missing_column_ok() {
let checkpoint_schema =
create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable(
"id",
DataType::INTEGER,
)]);
let stats_schema = create_stats_schema(vec![StructField::nullable("other", DataType::INTEGER)]);
assert!(LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema
));
}
#[test]
fn test_schema_has_compatible_stats_parsed_extra_column_ok() {
let checkpoint_schema = create_checkpoint_schema_with_stats_parsed(vec![
StructField::nullable("id", DataType::INTEGER),
StructField::nullable("extra", DataType::STRING),
]);
let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]);
assert!(LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema
));
}
#[test]
fn test_schema_has_compatible_stats_parsed_no_stats_parsed() {
let checkpoint_schema = create_checkpoint_schema_without_stats_parsed();
let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]);
assert!(!LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema
));
}
#[test]
fn test_schema_has_compatible_stats_parsed_empty_stats_schema() {
let checkpoint_schema =
create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable(
"id",
DataType::INTEGER,
)]);
let stats_schema = create_stats_schema(vec![]);
assert!(LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema
));
}
#[test]
fn test_schema_has_compatible_stats_parsed_multiple_columns() {
let checkpoint_schema = create_checkpoint_schema_with_stats_parsed(vec![
StructField::nullable("good_col", DataType::LONG),
StructField::nullable("bad_col", DataType::STRING),
]);
let stats_schema = create_stats_schema(vec![
StructField::nullable("good_col", DataType::LONG),
StructField::nullable("bad_col", DataType::INTEGER),
]);
assert!(!LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema
));
}
#[test]
fn test_schema_has_compatible_stats_parsed_missing_min_max_values() {
let stats_parsed = StructType::new_unchecked([
StructField::nullable("numRecords", DataType::LONG),
]);
let add_schema = StructType::new_unchecked([
StructField::nullable("path", DataType::STRING),
StructField::nullable("stats_parsed", stats_parsed),
]);
let checkpoint_schema = StructType::new_unchecked([StructField::nullable("add", add_schema)]);
let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]);
assert!(LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema
));
}
#[test]
fn test_schema_has_compatible_stats_parsed_min_values_not_struct() {
let stats_parsed = StructType::new_unchecked([
StructField::nullable("numRecords", DataType::LONG),
StructField::nullable("minValues", DataType::STRING),
StructField::nullable("maxValues", DataType::STRING),
]);
let add_schema = StructType::new_unchecked([
StructField::nullable("path", DataType::STRING),
StructField::nullable("stats_parsed", stats_parsed),
]);
let checkpoint_schema = StructType::new_unchecked([StructField::nullable("add", add_schema)]);
let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]);
assert!(!LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema
));
}
#[test]
fn test_schema_has_compatible_stats_parsed_nested_struct() {
let user_struct = StructType::new_unchecked([
StructField::nullable("name", DataType::STRING),
StructField::nullable("age", DataType::INTEGER),
]);
let checkpoint_schema =
create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable(
"user",
user_struct.clone(),
)]);
let stats_schema = create_stats_schema(vec![StructField::nullable("user", user_struct)]);
assert!(LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema
));
}
#[test]
fn test_schema_has_compatible_stats_parsed_nested_struct_with_extra_fields() {
let checkpoint_user = StructType::new_unchecked([
StructField::nullable("name", DataType::STRING),
StructField::nullable("age", DataType::INTEGER),
StructField::nullable("extra", DataType::STRING), ]);
let checkpoint_schema =
create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable(
"user",
checkpoint_user,
)]);
let stats_user = StructType::new_unchecked([StructField::nullable("name", DataType::STRING)]);
let stats_schema = create_stats_schema(vec![StructField::nullable("user", stats_user)]);
assert!(LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema
));
}
#[test]
fn test_schema_has_compatible_stats_parsed_nested_struct_missing_field_ok() {
let checkpoint_user =
StructType::new_unchecked([StructField::nullable("name", DataType::STRING)]);
let checkpoint_schema =
create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable(
"user",
checkpoint_user,
)]);
let stats_user = StructType::new_unchecked([
StructField::nullable("name", DataType::STRING),
StructField::nullable("age", DataType::INTEGER), ]);
let stats_schema = create_stats_schema(vec![StructField::nullable("user", stats_user)]);
assert!(LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema
));
}
#[test]
fn test_schema_has_compatible_stats_parsed_nested_struct_type_mismatch() {
let checkpoint_user = StructType::new_unchecked([
StructField::nullable("name", DataType::INTEGER), ]);
let checkpoint_schema =
create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable(
"user",
checkpoint_user,
)]);
let stats_user = StructType::new_unchecked([StructField::nullable("name", DataType::STRING)]);
let stats_schema = create_stats_schema(vec![StructField::nullable("user", stats_user)]);
assert!(!LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema
));
}
#[test]
fn test_schema_has_compatible_stats_parsed_deeply_nested() {
let team = StructType::new_unchecked([StructField::nullable("name", DataType::STRING)]);
let department = StructType::new_unchecked([StructField::nullable("team", team.clone())]);
let company = StructType::new_unchecked([StructField::nullable("department", department)]);
let checkpoint_schema =
create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable(
"company",
company.clone(),
)]);
let stats_schema = create_stats_schema(vec![StructField::nullable("company", company)]);
assert!(LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema
));
}
#[test]
fn test_schema_has_compatible_stats_parsed_deeply_nested_type_mismatch() {
let checkpoint_team =
StructType::new_unchecked([StructField::nullable("name", DataType::INTEGER)]); let checkpoint_dept =
StructType::new_unchecked([StructField::nullable("team", checkpoint_team)]);
let checkpoint_company =
StructType::new_unchecked([StructField::nullable("department", checkpoint_dept)]);
let checkpoint_schema =
create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable(
"company",
checkpoint_company,
)]);
let stats_team = StructType::new_unchecked([StructField::nullable("name", DataType::STRING)]);
let stats_dept = StructType::new_unchecked([StructField::nullable("team", stats_team)]);
let stats_company =
StructType::new_unchecked([StructField::nullable("department", stats_dept)]);
let stats_schema = create_stats_schema(vec![StructField::nullable("company", stats_company)]);
assert!(!LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema
));
}
#[test]
fn test_schema_has_compatible_stats_parsed_long_to_timestamp() {
let checkpoint_schema = create_checkpoint_schema_with_stats_parsed(vec![
StructField::nullable("ts_col", DataType::LONG),
StructField::nullable("ts_ntz_col", DataType::LONG),
]);
let stats_schema = create_stats_schema(vec![
StructField::nullable("ts_col", DataType::TIMESTAMP),
StructField::nullable("ts_ntz_col", DataType::TIMESTAMP_NTZ),
]);
assert!(LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema
));
}
#[test]
fn test_schema_has_compatible_stats_parsed_timestamp_to_long_rejected() {
let checkpoint_schema =
create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable(
"ts_col",
DataType::TIMESTAMP,
)]);
let stats_schema = create_stats_schema(vec![StructField::nullable("ts_col", DataType::LONG)]);
assert!(!LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema
));
}
#[test]
fn test_schema_has_compatible_stats_parsed_integer_to_date() {
let checkpoint_schema =
create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable(
"date_col",
DataType::INTEGER,
)]);
let stats_schema = create_stats_schema(vec![StructField::nullable("date_col", DataType::DATE)]);
assert!(LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema
));
}
#[test]
fn test_schema_has_compatible_stats_parsed_date_to_integer_rejected() {
let checkpoint_schema =
create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable(
"date_col",
DataType::DATE,
)]);
let stats_schema =
create_stats_schema(vec![StructField::nullable("date_col", DataType::INTEGER)]);
assert!(!LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema
));
}
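// Physical-type widening (e.g. INTEGER -> LONG) and logical reinterpretation
// (INTEGER -> DATE, LONG -> TIMESTAMP) are each acceptable on their own, but
// combining both in one step (e.g. INTEGER -> TIMESTAMP) is not.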
#[rstest]
#[case::widening_integer_to_long(DataType::INTEGER, DataType::LONG, true)]
#[case::reinterpret_integer_to_date(DataType::INTEGER, DataType::DATE, true)]
#[case::reinterpret_long_to_timestamp(DataType::LONG, DataType::TIMESTAMP, true)]
#[case::reinterpret_plus_widen_integer_to_timestamp(DataType::INTEGER, DataType::TIMESTAMP, false)]
#[case::reinterpret_plus_widen_integer_to_timestamp_ntz(
DataType::INTEGER,
DataType::TIMESTAMP_NTZ,
false
)]
#[case::date_widened_to_timestamp(DataType::DATE, DataType::TIMESTAMP, false)]
fn test_stats_parsed_widening_and_reinterpretation_interaction(
#[case] checkpoint_type: DataType,
#[case] stats_type: DataType,
#[case] expected: bool,
) {
let checkpoint_schema =
create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable(
"col",
checkpoint_type,
)]);
let stats_schema = create_stats_schema(vec![StructField::nullable("col", stats_type)]);
assert_eq!(
LogSegment::schema_has_compatible_stats_parsed(&checkpoint_schema, &stats_schema),
expected
);
}
#[test]
fn test_stats_parsed_mixed_widening_and_reinterpretation() {
let checkpoint_schema = create_checkpoint_schema_with_stats_parsed(vec![
StructField::nullable("id", DataType::INTEGER),
StructField::nullable("ts_col", DataType::LONG),
StructField::nullable("date_col", DataType::INTEGER),
]);
let stats_schema = create_stats_schema(vec![
StructField::nullable("id", DataType::LONG),
StructField::nullable("ts_col", DataType::TIMESTAMP),
StructField::nullable("date_col", DataType::DATE),
]);
assert!(LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema
));
}
#[test]
fn test_stats_parsed_mixed_with_one_incompatible_rejects_all() {
let checkpoint_schema = create_checkpoint_schema_with_stats_parsed(vec![
StructField::nullable("id", DataType::INTEGER),
StructField::nullable("ts_col", DataType::LONG),
StructField::nullable("bad_col", DataType::INTEGER),
]);
let stats_schema = create_stats_schema(vec![
StructField::nullable("id", DataType::LONG),
StructField::nullable("ts_col", DataType::TIMESTAMP),
StructField::nullable("bad_col", DataType::TIMESTAMP),
]);
assert!(!LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema
));
}
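// Builds a batch whose add action carries a populated `partitionValues_parsed` struct,
// mimicking a checkpoint written with parsed partition values.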
fn add_batch_with_partition_values_parsed(output_schema: SchemaRef) -> Box<ArrowEngineData> {
let handler = SyncJsonHandler {};
let json_strings: StringArray = vec![
r#"{"add":{"path":"part-00000.parquet","partitionValues":{"id":"1"},"partitionValues_parsed":{"id":1},"size":635,"modificationTime":1677811178336,"dataChange":true}}"#,
r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["id"],"configuration":{},"createdTime":1677811175819}}"#,
]
.into();
let parsed = handler
.parse_json(string_array_to_engine_data(json_strings), output_schema)
.unwrap();
ArrowEngineData::try_from_engine_data(parsed).unwrap()
}
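// When the checkpoint file contains an `add.partitionValues_parsed` struct compatible
// with the table's partition schema, `create_checkpoint_stream` should widen the read
// schema to include it and set `has_partition_values_parsed`.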
#[tokio::test]
async fn test_checkpoint_stream_sets_has_partition_values_parsed() -> DeltaResult<()> {
let (store, log_root) = new_in_memory_store();
let engine = DefaultEngineBuilder::new(store.clone()).build();
let partition_parsed_struct =
StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]);
let add_struct = StructType::new_unchecked([
StructField::nullable("path", DataType::STRING),
StructField::nullable(
"partitionValues",
crate::schema::MapType::new(DataType::STRING, DataType::STRING, true),
),
StructField::nullable("partitionValues_parsed", partition_parsed_struct),
StructField::nullable("size", DataType::LONG),
StructField::nullable("modificationTime", DataType::LONG),
StructField::nullable("dataChange", DataType::BOOLEAN),
]);
let metadata_struct = StructType::new_unchecked([
StructField::nullable("id", DataType::STRING),
StructField::nullable(
"format",
StructType::new_unchecked([StructField::nullable("provider", DataType::STRING)]),
),
StructField::nullable("schemaString", DataType::STRING),
StructField::nullable(
"partitionColumns",
crate::schema::ArrayType::new(DataType::STRING, false),
),
StructField::nullable(
"configuration",
crate::schema::MapType::new(DataType::STRING, DataType::STRING, true),
),
StructField::nullable("createdTime", DataType::LONG),
]);
let checkpoint_schema: SchemaRef = Arc::new(StructType::new_unchecked([
StructField::nullable("add", add_struct),
StructField::nullable("metaData", metadata_struct),
]));
add_checkpoint_to_store(
&store,
add_batch_with_partition_values_parsed(checkpoint_schema),
"00000000000000000001.checkpoint.parquet",
)
.await?;
let checkpoint_file = log_root
.join("00000000000000000001.checkpoint.parquet")?
.to_string();
let checkpoint_size =
get_file_size(&store, "_delta_log/00000000000000000001.checkpoint.parquet").await;
let read_schema: SchemaRef = Arc::new(StructType::new_unchecked([StructField::nullable(
"add",
StructType::new_unchecked([
StructField::nullable("path", DataType::STRING),
StructField::nullable(
"partitionValues",
crate::schema::MapType::new(DataType::STRING, DataType::STRING, true),
),
StructField::nullable("size", DataType::LONG),
StructField::nullable("modificationTime", DataType::LONG),
StructField::nullable("dataChange", DataType::BOOLEAN),
]),
)]));
let log_segment = LogSegment::try_new(
LogSegmentFiles {
checkpoint_parts: vec![create_log_path_with_size(&checkpoint_file, checkpoint_size)],
latest_commit_file: Some(create_log_path("file:///00000000000000000001.json")),
..Default::default()
},
log_root,
None,
None,
)?;
let partition_schema =
StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]);
let checkpoint_result = log_segment.create_checkpoint_stream(
&engine,
read_schema,
None,
None,
Some(&partition_schema),
)?;
assert!(
checkpoint_result
.checkpoint_info
.has_partition_values_parsed,
"Expected has_partition_values_parsed to be true"
);
let schema = &checkpoint_result.checkpoint_info.checkpoint_read_schema;
let add_field = schema.field("add").expect("schema should have 'add' field");
let DataType::Struct(add_struct) = add_field.data_type() else {
panic!("add field should be a struct");
};
assert!(
add_struct.field("partitionValues_parsed").is_some(),
"checkpoint read schema should include add.partitionValues_parsed"
);
Ok(())
}
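// Conversely, when the checkpoint schema has no compatible `partitionValues_parsed`,
// the flag stays false and the column is not injected into the read schema.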
#[tokio::test]
async fn test_checkpoint_stream_no_partition_values_parsed_when_incompatible() -> DeltaResult<()> {
let (store, log_root) = new_in_memory_store();
let engine = DefaultEngineBuilder::new(store.clone()).build();
add_checkpoint_to_store(
&store,
add_batch_simple(get_all_actions_schema().project(&[ADD_NAME])?),
"00000000000000000001.checkpoint.parquet",
)
.await?;
let checkpoint_file = log_root
.join("00000000000000000001.checkpoint.parquet")?
.to_string();
let checkpoint_size =
get_file_size(&store, "_delta_log/00000000000000000001.checkpoint.parquet").await;
let read_schema = get_all_actions_schema().project(&[ADD_NAME])?;
let log_segment = LogSegment::try_new(
LogSegmentFiles {
checkpoint_parts: vec![create_log_path_with_size(&checkpoint_file, checkpoint_size)],
latest_commit_file: Some(create_log_path("file:///00000000000000000001.json")),
..Default::default()
},
log_root,
None,
None,
)?;
let partition_schema =
StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]);
let checkpoint_result = log_segment.create_checkpoint_stream(
&engine,
read_schema.clone(),
None,
None,
Some(&partition_schema),
)?;
assert!(
!checkpoint_result
.checkpoint_info
.has_partition_values_parsed,
"Expected has_partition_values_parsed to be false"
);
let schema = &checkpoint_result.checkpoint_info.checkpoint_read_schema;
if let Some(add_field) = schema.field("add") {
let DataType::Struct(add_struct) = add_field.data_type() else {
panic!("add field should be a struct");
};
assert!(
add_struct.field("partitionValues_parsed").is_none(),
"checkpoint read schema should NOT include add.partitionValues_parsed"
);
}
Ok(())
}
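// Helpers for the `schema_has_compatible_partition_values_parsed` tests below.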
fn create_checkpoint_schema_with_partition_parsed(
partition_fields: Vec<StructField>,
) -> StructType {
let partition_parsed = StructType::new_unchecked(partition_fields);
let add_struct = StructType::new_unchecked([
StructField::nullable("path", DataType::STRING),
StructField::nullable("partitionValues_parsed", partition_parsed),
]);
StructType::new_unchecked([StructField::nullable("add", add_struct)])
}
fn create_checkpoint_schema_without_partition_parsed() -> StructType {
let add_struct = StructType::new_unchecked([StructField::nullable("path", DataType::STRING)]);
StructType::new_unchecked([StructField::nullable("add", add_struct)])
}
#[test]
fn test_partition_values_parsed_compatible_basic() {
let checkpoint_schema = create_checkpoint_schema_with_partition_parsed(vec![
StructField::nullable("date", DataType::DATE),
StructField::nullable("region", DataType::STRING),
]);
let partition_schema = StructType::new_unchecked([
StructField::nullable("date", DataType::DATE),
StructField::nullable("region", DataType::STRING),
]);
assert!(LogSegment::schema_has_compatible_partition_values_parsed(
&checkpoint_schema,
&partition_schema,
));
}
#[test]
fn test_partition_values_parsed_missing_field() {
let checkpoint_schema =
create_checkpoint_schema_with_partition_parsed(vec![StructField::nullable(
"date",
DataType::DATE,
)]);
let partition_schema = StructType::new_unchecked([
StructField::nullable("date", DataType::DATE),
StructField::nullable("region", DataType::STRING),
]);
assert!(LogSegment::schema_has_compatible_partition_values_parsed(
&checkpoint_schema,
&partition_schema,
));
}
#[test]
fn test_partition_values_parsed_extra_field() {
let checkpoint_schema = create_checkpoint_schema_with_partition_parsed(vec![
StructField::nullable("date", DataType::DATE),
StructField::nullable("region", DataType::STRING),
StructField::nullable("extra", DataType::INTEGER),
]);
let partition_schema =
StructType::new_unchecked([StructField::nullable("date", DataType::DATE)]);
assert!(LogSegment::schema_has_compatible_partition_values_parsed(
&checkpoint_schema,
&partition_schema,
));
}
#[test]
fn test_partition_values_parsed_type_mismatch() {
let checkpoint_schema =
create_checkpoint_schema_with_partition_parsed(vec![StructField::nullable(
"date",
DataType::STRING,
)]);
let partition_schema =
StructType::new_unchecked([StructField::nullable("date", DataType::DATE)]);
assert!(!LogSegment::schema_has_compatible_partition_values_parsed(
&checkpoint_schema,
&partition_schema,
));
}
#[test]
fn test_partition_values_parsed_not_present() {
let checkpoint_schema = create_checkpoint_schema_without_partition_parsed();
let partition_schema =
StructType::new_unchecked([StructField::nullable("date", DataType::DATE)]);
assert!(!LogSegment::schema_has_compatible_partition_values_parsed(
&checkpoint_schema,
&partition_schema,
));
}
#[test]
fn test_partition_values_parsed_not_a_struct() {
let add_struct = StructType::new_unchecked([
StructField::nullable("path", DataType::STRING),
StructField::nullable("partitionValues_parsed", DataType::STRING),
]);
let checkpoint_schema = StructType::new_unchecked([StructField::nullable("add", add_struct)]);
let partition_schema =
StructType::new_unchecked([StructField::nullable("date", DataType::DATE)]);
assert!(!LogSegment::schema_has_compatible_partition_values_parsed(
&checkpoint_schema,
&partition_schema,
));
}
#[test]
fn test_partition_values_parsed_empty_partition_schema() {
let checkpoint_schema =
create_checkpoint_schema_with_partition_parsed(vec![StructField::nullable(
"date",
DataType::DATE,
)]);
let partition_schema = StructType::new_unchecked(Vec::<StructField>::new());
assert!(LogSegment::schema_has_compatible_partition_values_parsed(
&checkpoint_schema,
&partition_schema,
));
}
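// Asserts that `new` extends `orig` by exactly one commit: `end_version`, the commit
// count, and the latest commit version each advance by one, and every other field is
// unchanged (compared after normalizing the fields expected to differ).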
fn assert_log_segment_extended(orig: LogSegment, new: LogSegment) {
assert_eq!(orig.end_version + 1, new.end_version);
assert_eq!(
orig.listed.ascending_commit_files.len() + 1,
new.listed.ascending_commit_files.len()
);
assert_eq!(
orig.listed.latest_commit_file.as_ref().unwrap().version + 1,
new.listed.latest_commit_file.as_ref().unwrap().version
);
fn normalize(log_segment: LogSegment) -> LogSegment {
use crate::log_segment_files::LogSegmentFiles;
LogSegment {
end_version: 0,
listed: LogSegmentFiles {
max_published_version: None,
ascending_commit_files: vec![],
latest_commit_file: None,
..log_segment.listed
},
..log_segment
}
}
assert_eq!(normalize(orig), normalize(new));
}
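// `new_with_commit_appended` extends a segment by one commit: a published commit
// advances `max_published_version`, a staged commit leaves it unchanged, and anything
// other than the next contiguous commit file is rejected.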
#[tokio::test]
async fn test_new_with_commit_published_commit() {
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: &[0, 1, 2, 3, 4],
..Default::default()
})
.await;
let table_root = Url::parse("memory:///").unwrap();
let new_commit = ParsedLogPath::create_parsed_published_commit(&table_root, 5);
let new_log_segment = log_segment
.clone()
.new_with_commit_appended(new_commit)
.unwrap();
assert_eq!(new_log_segment.listed.max_published_version, Some(5));
assert_log_segment_extended(log_segment, new_log_segment);
}
#[tokio::test]
async fn test_new_with_commit_staged_commit() {
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: &[0, 1, 2, 3, 4],
..Default::default()
})
.await;
let table_root = Url::parse("memory:///").unwrap();
let new_commit = ParsedLogPath::create_parsed_staged_commit(&table_root, 5);
let new_log_segment = log_segment
.clone()
.new_with_commit_appended(new_commit)
.unwrap();
assert_eq!(new_log_segment.listed.max_published_version, Some(4));
assert_log_segment_extended(log_segment, new_log_segment);
}
#[tokio::test]
async fn test_new_with_commit_not_commit_type() {
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: &[0, 1, 2, 3, 4],
..Default::default()
})
.await;
let checkpoint = create_log_path("file:///_delta_log/00000000000000000005.checkpoint.parquet");
let result = log_segment.new_with_commit_appended(checkpoint);
assert_result_error_with_message(
result,
"Cannot extend and create new LogSegment. Tail log file is not a commit file.",
);
}
#[tokio::test]
async fn test_new_with_commit_not_end_version_plus_one() {
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: &[0, 1, 2, 3, 4],
..Default::default()
})
.await;
let table_root = Url::parse("memory:///").unwrap();
let wrong_version_commit = ParsedLogPath::create_parsed_published_commit(&table_root, 10);
let result = log_segment.new_with_commit_appended(wrong_version_commit);
assert_result_error_with_message(
result,
"Cannot extend and create new LogSegment. Tail commit file version (10) does not equal LogSegment end_version (4) + 1."
);
}
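// `try_new_with_checkpoint` installs a single-file checkpoint at the segment's end
// version, clearing the commit and compaction files it supersedes.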
#[rstest]
#[case::non_checkpoint_file(
"file:///_delta_log/00000000000000000002.json",
"Path is not a single-file checkpoint"
)]
#[case::multi_part_checkpoint(
"file:///_delta_log/00000000000000000002.checkpoint.0000000001.0000000002.parquet",
"Path is not a single-file checkpoint"
)]
#[case::wrong_version(
"file:///_delta_log/00000000000000000005.checkpoint.parquet",
"Checkpoint version (5) does not equal LogSegment end_version (2)"
)]
#[tokio::test]
async fn test_try_new_with_checkpoint_rejects_invalid_path(
#[case] path: &str,
#[case] expected_error: &str,
) {
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: &[0, 1, 2],
..Default::default()
})
.await;
let result = log_segment.try_new_with_checkpoint(create_log_path(path));
assert_result_error_with_message(result, expected_error);
}
#[rstest]
#[case::classic_parquet("file:///_delta_log/00000000000000000002.checkpoint.parquet")]
#[case::v2_uuid(
"file:///_delta_log/00000000000000000002.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.parquet"
)]
#[tokio::test]
async fn test_try_new_with_checkpoint_sets_checkpoint_and_clears_commits(#[case] path: &str) {
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: &[0, 1, 2],
compaction_versions: &[(0, 2)],
..Default::default()
})
.await;
assert!(!log_segment.listed.ascending_commit_files.is_empty());
assert!(!log_segment.listed.ascending_compaction_files.is_empty());
let ckpt_path = create_log_path(path);
let result = log_segment.try_new_with_checkpoint(ckpt_path).unwrap();
assert_eq!(result.checkpoint_version, Some(2));
assert_eq!(result.listed.checkpoint_parts.len(), 1);
assert_eq!(result.listed.checkpoint_parts[0].version, 2);
assert!(result.listed.ascending_commit_files.is_empty());
assert!(result.listed.ascending_compaction_files.is_empty());
assert!(result.checkpoint_schema.is_none());
assert_eq!(
result.listed.latest_commit_file.as_ref().map(|f| f.version),
log_segment
.listed
.latest_commit_file
.as_ref()
.map(|f| f.version)
);
assert_eq!(result.end_version, log_segment.end_version);
assert_eq!(result.log_root, log_segment.log_root);
}
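// `try_new_with_crc_file` records a CRC file at the segment's end version and leaves
// every other field untouched.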
#[rstest]
#[case::non_crc_file(
"file:///_delta_log/00000000000000000002.json",
"Path is not a CRC file"
)]
#[case::wrong_version(
"file:///_delta_log/00000000000000000005.crc",
"CRC version (5) does not equal LogSegment end_version (2)"
)]
#[tokio::test]
async fn test_try_new_with_crc_file_rejects_invalid_path(
#[case] path: &str,
#[case] expected_error: &str,
) {
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: &[0, 1, 2],
..Default::default()
})
.await;
let url = Url::parse(path).unwrap();
let crc_path = ParsedLogPath::try_from(url).unwrap().unwrap();
let result = log_segment.try_new_with_crc_file(crc_path);
assert_result_error_with_message(result, expected_error);
}
#[tokio::test]
async fn test_try_new_with_crc_file_sets_crc_and_preserves_other_fields() {
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: &[0, 1, 2],
checkpoint_version: Some(1),
..Default::default()
})
.await;
let url = Url::parse("file:///_delta_log/00000000000000000002.crc").unwrap();
let crc_path = ParsedLogPath::try_from(url).unwrap().unwrap();
let result = log_segment.try_new_with_crc_file(crc_path).unwrap();
let crc_file = result.listed.latest_crc_file.as_ref().unwrap();
assert_eq!(crc_file.version, 2);
assert_eq!(result.end_version, log_segment.end_version);
assert_eq!(result.checkpoint_version, log_segment.checkpoint_version);
assert_eq!(
result.listed.ascending_commit_files.len(),
log_segment.listed.ascending_commit_files.len()
);
assert_eq!(
result.listed.checkpoint_parts.len(),
log_segment.listed.checkpoint_parts.len()
);
assert_eq!(result.log_root, log_segment.log_root);
}
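// Staged commits with versions above `max_published_version` are the unpublished
// catalog commits.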
#[tokio::test]
async fn test_get_unpublished_catalog_commits() {
let log_segment = create_segment_for(LogSegmentConfig {
published_commit_versions: &[0, 1, 2],
staged_commit_versions: &[2, 3, 4],
..Default::default()
})
.await;
assert_eq!(log_segment.listed.max_published_version, Some(2));
let unpublished = log_segment.get_unpublished_catalog_commits().unwrap();
let versions: Vec<_> = unpublished.iter().map(|c| c.version()).collect();
assert_eq!(versions, vec![3, 4]);
}
fn extract_commit_versions(seg: &LogSegment) -> Vec<u64> {
seg.listed
.ascending_commit_files
.iter()
.map(|c| c.version)
.collect()
}
fn extract_compaction_ranges(seg: &LogSegment) -> Vec<(u64, u64)> {
seg.listed
.ascending_compaction_files
.iter()
.map(|c| match c.file_type {
LogPathFileType::CompactedCommit { hi } => (c.version, hi),
_ => panic!("expected compaction"),
})
.collect()
}
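// Each case describes the log contents, the CRC version used as the split point, and
// the expected files on each side: `segment_after_crc` keeps files strictly newer than
// the CRC (and never a checkpoint), `segment_through_crc` keeps files at or below it,
// and compactions that straddle the CRC version are dropped from both sides.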
struct CrcPruningCase {
commits: &'static [u64],
compactions: &'static [(u64, u64)],
checkpoint: Option<u64>,
crc_version: u64,
after_commits: &'static [u64],
after_compactions: &'static [(u64, u64)],
through_commits: &'static [u64],
through_compactions: &'static [(u64, u64)],
}
#[rstest::rstest]
#[case::only_deltas_no_checkpoint(CrcPruningCase {
commits: &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
compactions: &[],
checkpoint: None,
crc_version: 4,
after_commits: &[5, 6, 7, 8, 9],
after_compactions: &[],
through_commits: &[0, 1, 2, 3, 4],
through_compactions: &[],
})]
#[case::only_deltas_with_checkpoint(CrcPruningCase {
commits: &[3, 4, 5, 6, 7, 8, 9],
compactions: &[],
checkpoint: Some(2),
crc_version: 4,
after_commits: &[5, 6, 7, 8, 9],
after_compactions: &[],
through_commits: &[3, 4],
through_compactions: &[],
})]
#[case::compaction_after_crc(CrcPruningCase {
commits: &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
compactions: &[(5, 7)],
checkpoint: None,
crc_version: 4,
after_commits: &[5, 6, 7, 8, 9],
after_compactions: &[(5, 7)],
through_commits: &[0, 1, 2, 3, 4],
through_compactions: &[],
})]
#[case::compaction_overlaps_crc(CrcPruningCase {
commits: &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
compactions: &[(2, 6)],
checkpoint: None,
crc_version: 4,
after_commits: &[5, 6, 7, 8, 9],
after_compactions: &[],
through_commits: &[0, 1, 2, 3, 4],
through_compactions: &[],
})]
#[case::compaction_before_crc(CrcPruningCase {
commits: &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
compactions: &[(0, 2)],
checkpoint: None,
crc_version: 4,
after_commits: &[5, 6, 7, 8, 9],
after_compactions: &[],
through_commits: &[0, 1, 2, 3, 4],
through_compactions: &[(0, 2)],
})]
#[tokio::test]
async fn test_segment_crc_filtering(#[case] case: CrcPruningCase) {
let seg = create_segment_for(LogSegmentConfig {
published_commit_versions: case.commits,
compaction_versions: case.compactions,
checkpoint_version: case.checkpoint,
..Default::default()
})
.await;
let after = seg.segment_after_crc(case.crc_version);
assert_eq!(extract_commit_versions(&after), case.after_commits);
assert_eq!(extract_compaction_ranges(&after), case.after_compactions);
assert!(after.checkpoint_version.is_none());
assert!(after.listed.checkpoint_parts.is_empty());
let through = seg.segment_through_crc(case.crc_version);
assert_eq!(extract_commit_versions(&through), case.through_commits);
assert_eq!(
extract_compaction_ranges(&through),
case.through_compactions
);
assert_eq!(through.checkpoint_version, case.checkpoint);
}
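// `schema_to_is_not_null_predicate` turns a read schema into an IS NOT NULL predicate
// over a required field of each recognized action, OR-ing multiple actions together;
// an empty schema or any unrecognized field (e.g. `add`) yields `None`.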
#[rstest::rstest]
#[case::empty_schema(StructType::new_unchecked([]), None)]
#[case::metadata_field(
StructType::new_unchecked([StructField::nullable(
METADATA_NAME,
StructType::new_unchecked([]),
)]),
Some(Arc::new(
Expression::column(ColumnName::new([METADATA_NAME, "id"])).is_not_null(),
)),
)]
#[case::protocol_field(
StructType::new_unchecked([StructField::nullable(
PROTOCOL_NAME,
StructType::new_unchecked([]),
)]),
Some(Arc::new(
Expression::column(ColumnName::new([PROTOCOL_NAME, "minReaderVersion"])).is_not_null(),
)),
)]
#[case::txn_field(
StructType::new_unchecked([StructField::nullable(
SET_TRANSACTION_NAME,
StructType::new_unchecked([]),
)]),
Some(Arc::new(
Expression::column(ColumnName::new([SET_TRANSACTION_NAME, "appId"])).is_not_null(),
)),
)]
#[case::domain_metadata_field(
StructType::new_unchecked([StructField::nullable(
DOMAIN_METADATA_NAME,
StructType::new_unchecked([]),
)]),
Some(Arc::new(
Expression::column(ColumnName::new([DOMAIN_METADATA_NAME, "domain"])).is_not_null(),
)),
)]
#[case::unknown_field_returns_none(
StructType::new_unchecked([StructField::nullable(ADD_NAME, StructType::new_unchecked([]))]),
None,
)]
#[case::multiple_known_fields(
StructType::new_unchecked([
StructField::nullable(METADATA_NAME, StructType::new_unchecked([])),
StructField::nullable(PROTOCOL_NAME, StructType::new_unchecked([])),
]),
Some(Arc::new(Predicate::or(
Expression::column(ColumnName::new([METADATA_NAME, "id"])).is_not_null(),
Expression::column(ColumnName::new([PROTOCOL_NAME, "minReaderVersion"])).is_not_null(),
))),
)]
#[case::known_and_unknown_field_returns_none(
StructType::new_unchecked([
StructField::nullable(METADATA_NAME, StructType::new_unchecked([])),
StructField::nullable(ADD_NAME, StructType::new_unchecked([])),
]),
None,
)]
fn test_schema_to_is_not_null_predicate(
#[case] schema: StructType,
#[case] expected: Option<PredicateRef>,
) {
assert_eq!(schema_to_is_not_null_predicate(&schema), expected);
}
#[rstest]
#[case::remove_partition_values(
"remove",
"partitionValues",
r#"{"remove":{"path":"file.parquet","deletionTimestamp":1000,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{"year":"2024","month":null},"size":100}}"#
)]
#[case::remove_tags(
"remove",
"tags",
r#"{"remove":{"path":"file.parquet","deletionTimestamp":1000,"dataChange":true,"tags":{"key1":"val1","key2":null}}}"#
)]
#[case::add_partition_values(
"add",
"partitionValues",
r#"{"add":{"path":"file.parquet","partitionValues":{"year":"2024","month":null},"size":100,"modificationTime":1000,"dataChange":true}}"#
)]
#[case::add_tags(
"add",
"tags",
r#"{"add":{"path":"file.parquet","partitionValues":{},"size":100,"modificationTime":1000,"dataChange":true,"tags":{"key1":"val1","key2":null}}}"#
)]
#[case::cdc_partition_values(
"cdc",
"partitionValues",
r#"{"cdc":{"path":"file.parquet","partitionValues":{"year":"2024","month":null},"size":100,"dataChange":false}}"#
)]
#[case::cdc_tags(
"cdc",
"tags",
r#"{"cdc":{"path":"file.parquet","partitionValues":{},"size":100,"dataChange":false,"tags":{"key1":"val1","key2":null}}}"#
)]
#[case::sidecar_tags(
"sidecar",
"tags",
r#"{"sidecar":{"path":"sidecar.parquet","sizeInBytes":100,"modificationTime":1000,"tags":{"key1":"val1","key2":null}}}"#
)]
#[case::checkpoint_metadata_tags(
"checkpointMetadata",
"tags",
r#"{"checkpointMetadata":{"version":0,"tags":{"key1":"val1","key2":null}}}"#
)]
#[should_panic(expected = "StructArray re-validation failed")]
#[case::commit_info_operation_parameters_known_issue(
"commitInfo",
"operationParameters",
r#"{"commitInfo":{"timestamp":1000,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","description":null}}}"#
)]
#[should_panic(expected = "StructArray re-validation failed")]
#[case::metadata_configuration_known_issue(
"metaData",
"configuration",
r#"{"metaData":{"id":"test","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[]}","partitionColumns":[],"configuration":{"key1":"val1","key2":null},"createdTime":1000}}"#
)]
#[tokio::test]
async fn read_actions_with_null_map_values(
#[case] action_name: &str,
#[case] map_field: &str,
#[case] json_action: &str,
) {
use crate::arrow::array::{Array, AsArray, MapArray, StructArray};
let store = Arc::new(InMemory::new());
let log_root = Url::parse("memory:///_delta_log/").unwrap();
store
.put(
&delta_path_for_version(0, "json"),
json_action.to_string().into(),
)
.await
.unwrap();
let engine = DefaultEngineBuilder::new(store).build();
let log_segment =
LogSegment::for_table_changes(engine.storage_handler().as_ref(), log_root, 0, Some(0))
.unwrap();
let action_schema = get_all_actions_schema().clone();
let action_batches = log_segment
.read_actions(&engine, action_schema)
.expect("read_actions should succeed");
let mut found = false;
for batch_result in action_batches {
let actions_batch = batch_result.expect("Iterating action batches should succeed");
let data_any = actions_batch.actions.into_any();
let arrow_data = data_any
.downcast_ref::<ArrowEngineData>()
.expect("ArrowEngineData");
let rb = arrow_data.record_batch();
let Some(action_col) = rb.column_by_name(action_name) else {
continue;
};
let action_struct = action_col
.as_struct_opt()
.unwrap_or_else(|| panic!("{action_name} column should be a struct"));
let map_col = action_struct
.column_by_name(map_field)
.unwrap_or_else(|| panic!("{action_name}.{map_field} not found"));
let map_array = map_col
.as_any()
.downcast_ref::<MapArray>()
.unwrap_or_else(|| panic!("{action_name}.{map_field} should be a MapArray"));
let entries = map_array.entries();
StructArray::try_new(
entries.fields().clone(),
entries.columns().to_vec(),
entries.nulls().cloned(),
)
.unwrap_or_else(|e| {
panic!(
"{action_name}.{map_field} entries StructArray re-validation failed: {e}. \
This means the schema has a non-nullable value field but the data has nulls."
)
});
found = true;
}
assert!(found, "Should have found a {action_name} action batch");
}