use std::{path::PathBuf, sync::Arc};
use itertools::Itertools;
use object_store::{memory::InMemory, path::Path, ObjectStore};
use url::Url;
use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
use crate::engine::default::filesystem::ObjectStoreFileSystemClient;
use crate::engine::sync::SyncEngine;
use crate::log_segment::LogSegment;
use crate::snapshot::CheckpointMetadata;
use crate::{FileSystemClient, Table};
use test_utils::delta_path_for_version;
/// Replays the log of the checked-in `parquet_row_group_skipping` test table for metadata and
/// checks that the replay yields exactly four items.
#[test]
fn test_replay_for_metadata() {
    let table_dir = PathBuf::from("./tests/data/parquet_row_group_skipping/");
    let absolute_dir = std::fs::canonicalize(table_dir);
    let table_url = Url::from_directory_path(absolute_dir.unwrap()).unwrap();

    let engine = SyncEngine::new();
    let table = Table::new(table_url);
    let snapshot = table.snapshot(&engine, None).unwrap();

    // Collecting into `Result<Vec<_>, _>` stops at the first replay error, which `unwrap`
    // then surfaces.
    let replayed = snapshot.log_segment.replay_for_metadata(&engine).unwrap();
    let items = replayed.collect::<Result<Vec<_>, _>>().unwrap();
    assert_eq!(items.len(), 4);
}
/// Builds the object-store path of one part of a multi-part checkpoint.
///
/// The version is zero-padded to 20 digits, and the part number and part count to 10 digits
/// each. For example, part 2 of 3 at version 5 becomes
/// `_delta_log/00000000000000000005.checkpoint.0000000002.0000000003.parquet`.
fn delta_path_for_multipart_checkpoint(version: u64, part_num: u32, num_parts: u32) -> Path {
    let file_name = format!(
        "{:020}.checkpoint.{:010}.{:010}.parquet",
        version, part_num, num_parts
    );
    let full_path = format!("_delta_log/{file_name}");
    Path::from(full_path.as_str())
}
/// Creates an in-memory object store containing one placeholder file per entry in `paths`.
///
/// When `checkpoint_metadata` is provided, the store also gets a `_delta_log/_last_checkpoint`
/// file holding its JSON serialization.
///
/// Returns a [`FileSystemClient`] backed by that store, together with the URL of the log
/// directory (`memory:///_delta_log/`).
///
/// # Panics
/// Panics if the tokio runtime cannot be created, if any write to the store fails, or if the
/// checkpoint metadata cannot be serialized.
fn build_log_with_paths_and_checkpoint(
    paths: &[Path],
    checkpoint_metadata: Option<&CheckpointMetadata>,
) -> (Box<dyn FileSystemClient>, Url) {
    let store = Arc::new(InMemory::new());
    // Every file gets the same placeholder bytes; the tests in this module only look at
    // which files exist (their versions), not at their contents.
    let placeholder = bytes::Bytes::from("kernel-data");

    {
        // The runtime lives only for this block, so it is dropped before the client is built.
        let runtime = tokio::runtime::Runtime::new().expect("create tokio runtime");
        runtime.block_on(async {
            for log_path in paths.iter() {
                store
                    .put(log_path, placeholder.clone().into())
                    .await
                    .expect("put log file in store");
            }
            if let Some(hint) = checkpoint_metadata {
                let serialized = serde_json::to_string(hint).expect("Serialize checkpoint");
                let hint_path = Path::from("_delta_log/_last_checkpoint");
                store
                    .put(&hint_path, serialized.into())
                    .await
                    .expect("Write _last_checkpoint");
            }
        });
    }

    let executor = Arc::new(TokioBackgroundExecutor::new());
    // NOTE(review): the meaning of the `false` flag is defined by
    // `ObjectStoreFileSystemClient::new` (not visible here) — confirm against its signature.
    let client = ObjectStoreFileSystemClient::new(store, false, Path::from("/"), executor);

    let log_root = Url::parse("memory:///")
        .expect("valid url")
        .join("_delta_log/")
        .unwrap();
    (Box::new(client), log_root)
}
/// A stale `_last_checkpoint` hint (version 3) must not prevent the newer checkpoint at
/// version 5 from being used. The segment should consist of that checkpoint followed by
/// commits 6 and 7.
#[test]
fn build_snapshot_with_out_of_date_last_checkpoint() {
    let stale_hint = CheckpointMetadata {
        version: 3,
        size: 10,
        parts: None,
        size_in_bytes: None,
        num_of_add_files: None,
        checkpoint_schema: None,
        checksum: None,
    };
    let log_files = [
        delta_path_for_version(0, "json"),
        delta_path_for_version(1, "checkpoint.parquet"),
        delta_path_for_version(2, "json"),
        delta_path_for_version(3, "checkpoint.parquet"),
        delta_path_for_version(4, "json"),
        delta_path_for_version(5, "checkpoint.parquet"),
        delta_path_for_version(6, "json"),
        delta_path_for_version(7, "json"),
    ];
    let (client, log_root) = build_log_with_paths_and_checkpoint(&log_files, Some(&stale_hint));

    let LogSegment {
        ascending_commit_files,
        checkpoint_parts,
        ..
    } = LogSegment::for_snapshot(client.as_ref(), log_root, stale_hint, None).unwrap();

    let checkpoint_versions: Vec<_> = checkpoint_parts.iter().map(|part| part.version).collect();
    assert_eq!(checkpoint_versions, [5]);

    let commit_versions: Vec<_> = ascending_commit_files
        .iter()
        .map(|commit| commit.version)
        .collect();
    assert_eq!(commit_versions, [6, 7]);
}
/// A correct `_last_checkpoint` hint for a three-part checkpoint at version 5 should yield all
/// three checkpoint parts, each at version 5, followed by commits 6 and 7.
#[test]
fn build_snapshot_with_correct_last_multipart_checkpoint() {
    let checkpoint_metadata = CheckpointMetadata {
        version: 5,
        size: 10,
        parts: Some(3),
        size_in_bytes: None,
        num_of_add_files: None,
        checkpoint_schema: None,
        checksum: None,
    };
    let (client, log_root) = build_log_with_paths_and_checkpoint(
        &[
            delta_path_for_version(0, "json"),
            delta_path_for_version(1, "checkpoint.parquet"),
            delta_path_for_version(1, "json"),
            delta_path_for_version(2, "json"),
            delta_path_for_version(3, "checkpoint.parquet"),
            delta_path_for_version(3, "json"),
            delta_path_for_version(4, "json"),
            delta_path_for_multipart_checkpoint(5, 1, 3),
            delta_path_for_multipart_checkpoint(5, 2, 3),
            delta_path_for_multipart_checkpoint(5, 3, 3),
            delta_path_for_version(5, "json"),
            delta_path_for_version(6, "json"),
            delta_path_for_version(7, "json"),
        ],
        Some(&checkpoint_metadata),
    );
    let log_segment =
        LogSegment::for_snapshot(client.as_ref(), log_root, checkpoint_metadata, None).unwrap();
    let commit_files = log_segment.ascending_commit_files;
    let checkpoint_parts = log_segment.checkpoint_parts;

    // Check the version of every part, not just the first. Otherwise a segment that mixed
    // parts from different versions would still pass.
    let checkpoint_versions = checkpoint_parts
        .iter()
        .map(|part| part.version)
        .collect_vec();
    assert_eq!(checkpoint_versions, vec![5, 5, 5]);

    let commit_versions = commit_files
        .iter()
        .map(|commit| commit.version)
        .collect_vec();
    assert_eq!(commit_versions, vec![6, 7]);
}
/// The hint announces a three-part checkpoint at version 5, but part 2 of 3 is absent from the
/// log. Building the snapshot's log segment must return an error.
#[test]
fn build_snapshot_with_missing_checkpoint_part_from_hint_fails() {
    let three_part_hint = CheckpointMetadata {
        version: 5,
        size: 10,
        parts: Some(3),
        size_in_bytes: None,
        num_of_add_files: None,
        checkpoint_schema: None,
        checksum: None,
    };
    let log_files = [
        delta_path_for_version(0, "json"),
        delta_path_for_version(1, "checkpoint.parquet"),
        delta_path_for_version(1, "json"),
        delta_path_for_version(2, "json"),
        delta_path_for_version(3, "checkpoint.parquet"),
        delta_path_for_version(3, "json"),
        delta_path_for_version(4, "json"),
        // Part 2 of 3 is deliberately left out.
        delta_path_for_multipart_checkpoint(5, 1, 3),
        delta_path_for_multipart_checkpoint(5, 3, 3),
        delta_path_for_version(5, "json"),
        delta_path_for_version(6, "json"),
        delta_path_for_version(7, "json"),
    ];
    let (client, log_root) =
        build_log_with_paths_and_checkpoint(&log_files, Some(&three_part_hint));

    let outcome = LogSegment::for_snapshot(client.as_ref(), log_root, three_part_hint, None);
    assert!(outcome.is_err());
}
/// The hint claims a single-part checkpoint at version 5, but the log only contains a two-part
/// checkpoint at that version. The mismatch must make building the log segment fail.
#[test]
fn build_snapshot_with_bad_checkpoint_hint_fails() {
    let mismatched_hint = CheckpointMetadata {
        version: 5,
        size: 10,
        parts: Some(1),
        size_in_bytes: None,
        num_of_add_files: None,
        checkpoint_schema: None,
        checksum: None,
    };
    let log_files = [
        delta_path_for_version(0, "json"),
        delta_path_for_version(1, "checkpoint.parquet"),
        delta_path_for_version(1, "json"),
        delta_path_for_version(2, "json"),
        delta_path_for_version(3, "checkpoint.parquet"),
        delta_path_for_version(3, "json"),
        delta_path_for_version(4, "json"),
        delta_path_for_multipart_checkpoint(5, 1, 2),
        delta_path_for_multipart_checkpoint(5, 2, 2),
        delta_path_for_version(5, "json"),
        delta_path_for_version(6, "json"),
        delta_path_for_version(7, "json"),
    ];
    let (client, log_root) =
        build_log_with_paths_and_checkpoint(&log_files, Some(&mismatched_hint));

    let outcome = LogSegment::for_snapshot(client.as_ref(), log_root, mismatched_hint, None);
    assert!(outcome.is_err());
}
/// With no `_last_checkpoint` hint, an incomplete three-part checkpoint at version 5 (part 2
/// missing) should be passed over. The complete single-part checkpoint at version 3 should be
/// used instead, leaving commits 4 through 7 to replay.
///
/// NOTE(review): this test is marked `#[ignore]`, so the expectation is not enforced by a
/// default test run. Confirm whether the fallback is implemented yet before un-ignoring it.
#[ignore]
#[test]
fn build_snapshot_with_missing_checkpoint_part_no_hint() {
    let log_files = [
        delta_path_for_version(0, "json"),
        delta_path_for_version(1, "checkpoint.parquet"),
        delta_path_for_version(1, "json"),
        delta_path_for_version(2, "json"),
        delta_path_for_version(3, "checkpoint.parquet"),
        delta_path_for_version(3, "json"),
        delta_path_for_version(4, "json"),
        // Part 2 of 3 is deliberately left out.
        delta_path_for_multipart_checkpoint(5, 1, 3),
        delta_path_for_multipart_checkpoint(5, 3, 3),
        delta_path_for_version(5, "json"),
        delta_path_for_version(6, "json"),
        delta_path_for_version(7, "json"),
    ];
    let (client, log_root) = build_log_with_paths_and_checkpoint(&log_files, None);

    let segment = LogSegment::for_snapshot(client.as_ref(), log_root, None, None).unwrap();

    assert_eq!(segment.checkpoint_parts.len(), 1);
    assert_eq!(segment.checkpoint_parts[0].version, 3);

    let commit_versions = segment
        .ascending_commit_files
        .iter()
        .map(|commit| commit.version)
        .collect_vec();
    assert_eq!(commit_versions, vec![4, 5, 6, 7]);
}
/// Despite its name, the log here does contain checkpoints, at versions 1, 3 and 5. What is
/// absent is a `_last_checkpoint` hint, so the checkpoints must be found without one.
///
/// - Without time travel, the newest checkpoint (5) is used, followed by commits 6 and 7.
/// - Time-travelling to version 2 selects the checkpoint at 1, followed by commit 2.
#[test]
fn build_snapshot_without_checkpoints() {
    let log_files = [
        delta_path_for_version(0, "json"),
        delta_path_for_version(1, "json"),
        delta_path_for_version(1, "checkpoint.parquet"),
        delta_path_for_version(2, "json"),
        delta_path_for_version(3, "json"),
        delta_path_for_version(3, "checkpoint.parquet"),
        delta_path_for_version(4, "json"),
        delta_path_for_version(5, "json"),
        delta_path_for_version(5, "checkpoint.parquet"),
        delta_path_for_version(6, "json"),
        delta_path_for_version(7, "json"),
    ];
    let (client, log_root) = build_log_with_paths_and_checkpoint(&log_files, None);

    // (time-travel version, expected checkpoint version, expected commit versions)
    let cases = [(None, 5, vec![6, 7]), (Some(2), 1, vec![2])];
    for (time_travel_version, expected_checkpoint, expected_commits) in cases {
        let segment = LogSegment::for_snapshot(
            client.as_ref(),
            log_root.clone(),
            None,
            time_travel_version,
        )
        .unwrap();

        assert_eq!(segment.checkpoint_parts.len(), 1);
        assert_eq!(segment.checkpoint_parts[0].version, expected_checkpoint);

        let commit_versions = segment
            .ascending_commit_files
            .iter()
            .map(|commit| commit.version)
            .collect_vec();
        assert_eq!(commit_versions, expected_commits);
    }
}
/// When time-travelling to version 4, a checkpoint hint pointing at a later version (5) must
/// not be used. The segment should be built from the checkpoint at version 3 plus commit 4.
///
/// No `_last_checkpoint` file is written to the store here; the hint is passed directly to
/// `for_snapshot`.
#[test]
fn build_snapshot_with_checkpoint_greater_than_time_travel_version() {
    let future_hint = CheckpointMetadata {
        version: 5,
        size: 10,
        parts: None,
        size_in_bytes: None,
        num_of_add_files: None,
        checkpoint_schema: None,
        checksum: None,
    };
    let log_files = [
        delta_path_for_version(0, "json"),
        delta_path_for_version(1, "json"),
        delta_path_for_version(1, "checkpoint.parquet"),
        delta_path_for_version(2, "json"),
        delta_path_for_version(3, "json"),
        delta_path_for_version(3, "checkpoint.parquet"),
        delta_path_for_version(4, "json"),
        delta_path_for_version(5, "json"),
        delta_path_for_version(5, "checkpoint.parquet"),
        delta_path_for_version(6, "json"),
        delta_path_for_version(7, "json"),
    ];
    let (client, log_root) = build_log_with_paths_and_checkpoint(&log_files, None);

    let segment =
        LogSegment::for_snapshot(client.as_ref(), log_root, future_hint, Some(4)).unwrap();

    let checkpoint_versions: Vec<_> = segment
        .checkpoint_parts
        .iter()
        .map(|part| part.version)
        .collect();
    assert_eq!(checkpoint_versions, [3]);

    let commit_versions: Vec<_> = segment
        .ascending_commit_files
        .iter()
        .map(|commit| commit.version)
        .collect();
    assert_eq!(commit_versions, [4]);
}
/// A `_last_checkpoint` hint at version 3, combined with time travel to version 4, should
/// yield exactly the single-part checkpoint at version 3 followed by commit 4. The newer
/// checkpoint at version 5 and commits 6 and 7 must be left out.
#[test]
fn build_snapshot_with_start_checkpoint_and_time_travel_version() {
    let checkpoint_metadata = CheckpointMetadata {
        version: 3,
        size: 10,
        parts: None,
        size_in_bytes: None,
        num_of_add_files: None,
        checkpoint_schema: None,
        checksum: None,
    };
    let (client, log_root) = build_log_with_paths_and_checkpoint(
        &[
            delta_path_for_version(0, "json"),
            delta_path_for_version(1, "checkpoint.parquet"),
            delta_path_for_version(2, "json"),
            delta_path_for_version(3, "checkpoint.parquet"),
            delta_path_for_version(4, "json"),
            delta_path_for_version(5, "checkpoint.parquet"),
            delta_path_for_version(6, "json"),
            delta_path_for_version(7, "json"),
        ],
        Some(&checkpoint_metadata),
    );
    let log_segment =
        LogSegment::for_snapshot(client.as_ref(), log_root, checkpoint_metadata, Some(4)).unwrap();

    // The checkpoint at version 3 is single-part, so pin the part count (as the sibling tests
    // do) before indexing. Without it, stray extra parts would go unnoticed.
    assert_eq!(log_segment.checkpoint_parts.len(), 1);
    assert_eq!(log_segment.checkpoint_parts[0].version, 3);
    assert_eq!(log_segment.ascending_commit_files.len(), 1);
    assert_eq!(log_segment.ascending_commit_files[0].version, 4);
}
/// `for_table_changes` should return only commit files, never checkpoint parts, covering the
/// inclusive range from `start_version` to `end_version`. An end version of `None` extends the
/// range to the last commit in the log (7 here).
#[test]
fn build_table_changes_with_commit_versions() {
    let log_files = [
        delta_path_for_version(0, "json"),
        delta_path_for_version(1, "json"),
        delta_path_for_version(1, "checkpoint.parquet"),
        delta_path_for_version(2, "json"),
        delta_path_for_version(3, "json"),
        delta_path_for_version(3, "checkpoint.parquet"),
        delta_path_for_version(4, "json"),
        delta_path_for_version(5, "json"),
        delta_path_for_version(5, "checkpoint.parquet"),
        delta_path_for_version(6, "json"),
        delta_path_for_version(7, "json"),
    ];
    let (client, log_root) = build_log_with_paths_and_checkpoint(&log_files, None);

    // (start version, end version, expected commit versions)
    let cases = [(2, Some(5), 2..=5), (0, Some(0), 0..=0), (0, None, 0..=7)];
    for (start_version, end_version, expected) in cases {
        let segment = LogSegment::for_table_changes(
            client.as_ref(),
            log_root.clone(),
            start_version,
            end_version,
        )
        .unwrap();

        assert!(segment.checkpoint_parts.is_empty());

        let versions = segment
            .ascending_commit_files
            .iter()
            .map(|commit| commit.version)
            .collect_vec();
        assert_eq!(versions, expected.collect_vec());
    }
}
/// With commit 1 missing from the log (only commits 0 and 2 exist), every requested range
/// given below must fail.
#[test]
fn test_non_contiguous_log() {
    let log_files = [
        delta_path_for_version(0, "json"),
        delta_path_for_version(2, "json"),
    ];
    let (client, log_root) = build_log_with_paths_and_checkpoint(&log_files, None);

    // (start version, end version)
    let ranges = [(0, None), (1, None), (0, Some(1))];
    for (start_version, end_version) in ranges {
        let outcome = LogSegment::for_table_changes(
            client.as_ref(),
            log_root.clone(),
            start_version,
            end_version,
        );
        assert!(outcome.is_err());
    }
}
/// Asking for table changes with a start version (1) greater than the end version (0) must be
/// rejected with an error.
#[test]
fn table_changes_fails_with_larger_start_version_than_end() {
    let log_files = [
        delta_path_for_version(0, "json"),
        delta_path_for_version(1, "json"),
    ];
    let (client, log_root) = build_log_with_paths_and_checkpoint(&log_files, None);

    let (start_version, end_version) = (1, Some(0));
    let outcome =
        LogSegment::for_table_changes(client.as_ref(), log_root, start_version, end_version);
    assert!(outcome.is_err());
}