use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use rstest::rstest;
use url::Url;
use super::*;
use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
use crate::engine::default::filesystem::ObjectStoreStorageHandler;
use crate::object_store::memory::InMemory;
use crate::object_store::path::Path as ObjectPath;
use crate::object_store::ObjectStoreExt as _;
use crate::FileMeta;
// Sentinel file sizes used to tag where a commit file came from. The tests
// assert provenance by inspecting `FileMeta::size` (see `assert_source`).
const FILESYSTEM_SIZE_MARKER: u64 = 10;
const CATALOG_SIZE_MARKER: u64 = 7;
// Origin of a commit file in these tests: discovered by listing object
// storage, or supplied by a catalog through the `log_tail` argument.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CommitSource {
    Filesystem,
    Catalog,
}
/// Render the `_delta_log/` path for a log file of the given type/version,
/// using the zero-padded naming convention of the Delta log.
fn log_path_for_file_type(version: Version, file_type: &LogPathFileType) -> String {
    // Build the name relative to the log directory, then anchor it under
    // `_delta_log/` in exactly one place.
    let relative = match file_type {
        LogPathFileType::Commit => format!("{version:020}.json"),
        LogPathFileType::StagedCommit => {
            // Staged commits live in a subdirectory and carry a random UUID.
            format!(
                "_staged_commits/{version:020}.{}.json",
                uuid::Uuid::new_v4()
            )
        }
        LogPathFileType::SinglePartCheckpoint => {
            format!("{version:020}.checkpoint.parquet")
        }
        LogPathFileType::MultiPartCheckpoint {
            part_num,
            num_parts,
        } => {
            format!("{version:020}.checkpoint.{part_num:010}.{num_parts:010}.parquet")
        }
        LogPathFileType::Crc => format!("{version:020}.crc"),
        LogPathFileType::CompactedCommit { hi } => {
            format!("{version:020}.{hi:020}.compacted.json")
        }
        LogPathFileType::UuidCheckpoint | LogPathFileType::Unknown => {
            panic!("Unsupported file type in test: {file_type:?}")
        }
    };
    format!("_delta_log/{relative}")
}
/// Build an in-memory object store populated with the given log files and
/// wrap it in a `StorageHandler`. File contents encode the `CommitSource`
/// ("filesystem" vs "catalog") so provenance is observable.
async fn create_storage(
    log_files: Vec<(Version, LogPathFileType, CommitSource)>,
) -> (Box<dyn StorageHandler>, Url) {
    let object_store = Arc::new(InMemory::new());
    let log_root = Url::parse("memory:///_delta_log/").unwrap();
    for (version, file_type, source) in log_files {
        let contents = match source {
            CommitSource::Filesystem => "filesystem",
            CommitSource::Catalog => "catalog",
        };
        let path = log_path_for_file_type(version, &file_type);
        object_store
            .put(
                &ObjectPath::from(path.as_str()),
                bytes::Bytes::from(contents).into(),
            )
            .await
            .expect("Failed to put test file");
    }
    let handler = ObjectStoreStorageHandler::new(
        object_store,
        Arc::new(TokioBackgroundExecutor::new()),
        None,
    );
    (Box::new(handler), log_root)
}
/// Construct a `ParsedLogPath` for tests, tagging its `FileMeta::size` with
/// the marker for the given `CommitSource` so `assert_source` can verify it.
/// Note: the URL always uses the `.json` commit naming; the logical type is
/// carried separately in `file_type`.
fn make_parsed_log_path_with_source(
    version: Version,
    file_type: LogPathFileType,
    source: CommitSource,
) -> ParsedLogPath {
    // Encode provenance in the size field (see the *_SIZE_MARKER consts).
    let size = if source == CommitSource::Filesystem {
        FILESYSTEM_SIZE_MARKER
    } else {
        CATALOG_SIZE_MARKER
    };
    let url = Url::parse(&format!("memory:///_delta_log/{version:020}.json")).unwrap();
    let filename = url
        .path_segments()
        .unwrap()
        .next_back()
        .unwrap()
        .to_string();
    let extension = filename.rsplit('.').next().unwrap().to_string();
    ParsedLogPath {
        location: FileMeta {
            location: url,
            last_modified: 0,
            size,
        },
        filename,
        extension,
        version,
        file_type,
    }
}
/// Assert that a commit's provenance matches `expected_source`, using the
/// size markers written by the test fixtures.
fn assert_source(commit: &ParsedLogPath, expected_source: CommitSource) {
    let expected_size = if expected_source == CommitSource::Filesystem {
        FILESYSTEM_SIZE_MARKER
    } else {
        CATALOG_SIZE_MARKER
    };
    let actual_size = commit.location.size;
    assert_eq!(
        actual_size, expected_size,
        "Commit version {} should be from {:?}, but size was {}",
        commit.version, expected_source, actual_size
    );
}
/// Wraps a `StorageHandler` and counts calls to `list_from`, so tests can
/// assert how many listing windows a backward scan performed.
struct CountingStorageHandler {
    inner: Box<dyn StorageHandler>,
    // Incremented on every `list_from` call; Relaxed is sufficient since the
    // tests only read it after all listing has completed.
    list_from_count: AtomicU32,
}
impl CountingStorageHandler {
    fn new(inner: Box<dyn StorageHandler>) -> Self {
        Self {
            inner,
            list_from_count: AtomicU32::new(0),
        }
    }
    /// Number of `list_from` calls observed so far.
    fn call_count(&self) -> u32 {
        self.list_from_count.load(Ordering::Relaxed)
    }
}
// Delegates `list_from` (counting each call); every other operation panics
// because listing is the only storage access these tests should perform.
impl StorageHandler for CountingStorageHandler {
    fn list_from(
        &self,
        path: &Url,
    ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<FileMeta>>>> {
        self.list_from_count.fetch_add(1, Ordering::Relaxed);
        self.inner.list_from(path)
    }
    fn read_files(
        &self,
        _files: Vec<crate::FileSlice>,
    ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<bytes::Bytes>>>> {
        panic!("read_files should not be called during listing");
    }
    fn put(&self, _path: &Url, _data: bytes::Bytes, _overwrite: bool) -> DeltaResult<()> {
        panic!("put should not be called during listing");
    }
    fn copy_atomic(&self, _src: &Url, _dest: &Url) -> DeltaResult<()> {
        panic!("copy_atomic should not be called during listing");
    }
    fn head(&self, _path: &Url) -> DeltaResult<crate::FileMeta> {
        panic!("head should not be called during listing");
    }
}
/// Run `LogSegmentFiles::list` and unpack the result into a tuple of
/// (commits, compactions, checkpoint parts, latest CRC, latest commit,
/// max published version), so tests can `_`-ignore the pieces they
/// don't care about.
#[allow(clippy::type_complexity)]
fn list_and_destructure(
    storage: &dyn StorageHandler,
    log_root: &Url,
    log_tail: Vec<ParsedLogPath>,
    start_version: Option<Version>,
    end_version: Option<Version>,
) -> (
    Vec<ParsedLogPath>,
    Vec<ParsedLogPath>,
    Vec<ParsedLogPath>,
    Option<ParsedLogPath>,
    Option<ParsedLogPath>,
    Option<Version>,
) {
    let r = LogSegmentFiles::list(storage, log_root, log_tail, start_version, end_version).unwrap();
    (
        r.ascending_commit_files,
        r.ascending_compaction_files,
        r.checkpoint_parts,
        r.latest_crc_file,
        r.latest_commit_file,
        r.max_published_version,
    )
}
/// With no catalog log tail, listing a sub-range returns exactly the
/// filesystem commits in that range.
#[tokio::test]
async fn test_empty_log_tail() {
    let (storage, log_root) = create_storage(vec![
        (0, LogPathFileType::Commit, CommitSource::Filesystem),
        (1, LogPathFileType::Commit, CommitSource::Filesystem),
        (2, LogPathFileType::Commit, CommitSource::Filesystem),
    ])
    .await;
    let (commits, _, _, _, latest_commit, max_pub) =
        list_and_destructure(storage.as_ref(), &log_root, vec![], Some(1), Some(2));
    let versions: Vec<_> = commits.iter().map(|c| c.version).collect();
    assert_eq!(versions, vec![1, 2]);
    for commit in &commits {
        assert_source(commit, CommitSource::Filesystem);
    }
    assert_eq!(latest_commit.unwrap().version, 2);
    assert_eq!(max_pub, Some(2));
}
/// Catalog-provided log tail commits (versions 3..=5) are appended after the
/// filesystem listing (versions 0..=2).
#[tokio::test]
async fn test_log_tail_has_latest_commit_files() {
    let log_files = vec![
        (0, LogPathFileType::Commit, CommitSource::Filesystem),
        (1, LogPathFileType::Commit, CommitSource::Filesystem),
        (2, LogPathFileType::Commit, CommitSource::Filesystem),
    ];
    let (storage, log_root) = create_storage(log_files).await;
    let log_tail = vec![
        make_parsed_log_path_with_source(3, LogPathFileType::Commit, CommitSource::Catalog),
        make_parsed_log_path_with_source(4, LogPathFileType::Commit, CommitSource::Catalog),
        make_parsed_log_path_with_source(5, LogPathFileType::Commit, CommitSource::Catalog),
    ];
    let (commits, _, _, _, latest_commit, max_pub) =
        list_and_destructure(storage.as_ref(), &log_root, log_tail, Some(0), Some(5));
    assert_eq!(commits.len(), 6);
    // Versions 0..=2 come from the filesystem listing...
    for (i, commit) in commits.iter().enumerate().take(3) {
        assert_eq!(commit.version, i as u64);
        assert_source(commit, CommitSource::Filesystem);
    }
    // ...and versions 3..=5 come from the catalog log tail.
    for (i, commit) in commits.iter().enumerate().skip(3) {
        assert_eq!(commit.version, i as u64);
        assert_source(commit, CommitSource::Catalog);
    }
    assert_eq!(latest_commit.unwrap().version, 5);
    assert_eq!(max_pub, Some(5));
}
/// Requesting a sub-range that straddles the filesystem/catalog boundary
/// returns filesystem commits up to the boundary and catalog commits after,
/// trimmed to the requested versions.
#[tokio::test]
async fn test_request_subset_with_log_tail() {
    let (storage, log_root) = create_storage(vec![
        (0, LogPathFileType::Commit, CommitSource::Filesystem),
        (1, LogPathFileType::Commit, CommitSource::Filesystem),
    ])
    .await;
    let log_tail: Vec<_> = (2u64..=4)
        .map(|v| {
            make_parsed_log_path_with_source(v, LogPathFileType::Commit, CommitSource::Catalog)
        })
        .collect();
    let (commits, _, _, _, latest_commit, max_pub) =
        list_and_destructure(storage.as_ref(), &log_root, log_tail, Some(1), Some(3));
    let expected = [
        (1, CommitSource::Filesystem),
        (2, CommitSource::Catalog),
        (3, CommitSource::Catalog),
    ];
    assert_eq!(commits.len(), expected.len());
    for (commit, (version, source)) in commits.iter().zip(expected) {
        assert_eq!(commit.version, version);
        assert_source(commit, source);
    }
    assert_eq!(latest_commit.unwrap().version, 3);
    assert_eq!(max_pub, Some(3));
}
/// When the catalog log tail ends at version 1, listing stops there even
/// though the filesystem has a commit at version 2 — but that filesystem
/// commit still counts toward the max published version.
#[tokio::test]
async fn test_log_tail_defines_latest_version() {
    let log_files = vec![
        (0, LogPathFileType::Commit, CommitSource::Filesystem),
        (1, LogPathFileType::Commit, CommitSource::Filesystem),
        (2, LogPathFileType::Commit, CommitSource::Filesystem),
    ];
    let (storage, log_root) = create_storage(log_files).await;
    // Tail with a single (catalog) commit at version 1.
    let log_tail = vec![make_parsed_log_path_with_source(
        1,
        LogPathFileType::Commit,
        CommitSource::Catalog,
    )];
    let (commits, _, _, _, latest_commit, max_pub) =
        list_and_destructure(storage.as_ref(), &log_root, log_tail, Some(0), None);
    assert_eq!(commits.len(), 2);
    assert_eq!(commits[0].version, 0);
    assert_eq!(commits[1].version, 1);
    assert_source(&commits[0], CommitSource::Filesystem);
    // The catalog's version of commit 1 wins over the filesystem's.
    assert_source(&commits[1], CommitSource::Catalog);
    assert_eq!(latest_commit.unwrap().version, 1);
    // Version 2 exists (published) on the filesystem, so it is still the
    // max published version even though it is not part of the segment.
    assert_eq!(max_pub, Some(2));
}
/// A log tail that covers the entire requested range must satisfy the listing
/// even when the filesystem returns nothing at all.
#[test]
fn test_log_tail_covers_entire_range_empty_filesystem() {
    // Storage whose `list_from` always yields an empty iterator; all other
    // operations panic because listing is the only expected call.
    struct EmptyStorageHandler;
    impl StorageHandler for EmptyStorageHandler {
        fn list_from(
            &self,
            _path: &Url,
        ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<FileMeta>>>> {
            Ok(Box::new(std::iter::empty()))
        }
        fn read_files(
            &self,
            _files: Vec<crate::FileSlice>,
        ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<bytes::Bytes>>>> {
            panic!("read_files should not be called during listing");
        }
        fn put(&self, _path: &Url, _data: bytes::Bytes, _overwrite: bool) -> DeltaResult<()> {
            panic!("put should not be called during listing");
        }
        fn copy_atomic(&self, _src: &Url, _dest: &Url) -> DeltaResult<()> {
            panic!("copy_atomic should not be called during listing");
        }
        fn head(&self, _path: &Url) -> DeltaResult<crate::FileMeta> {
            panic!("head should not be called during listing");
        }
    }
    let log_tail = vec![
        make_parsed_log_path_with_source(0, LogPathFileType::Commit, CommitSource::Catalog),
        make_parsed_log_path_with_source(1, LogPathFileType::Commit, CommitSource::Catalog),
        make_parsed_log_path_with_source(2, LogPathFileType::StagedCommit, CommitSource::Catalog),
    ];
    let storage = EmptyStorageHandler;
    let url = Url::parse("memory:///anything/_delta_log/").unwrap();
    let (commits, _, _, _, latest_commit, max_pub) =
        list_and_destructure(&storage, &url, log_tail, Some(0), Some(2));
    assert_eq!(commits.len(), 3);
    assert_eq!(commits[0].version, 0);
    assert_eq!(commits[1].version, 1);
    assert_eq!(commits[2].version, 2);
    assert_source(&commits[0], CommitSource::Catalog);
    assert_source(&commits[1], CommitSource::Catalog);
    assert_source(&commits[2], CommitSource::Catalog);
    assert_eq!(latest_commit.unwrap().version, 2);
    // Version 2 is only a staged commit, so the max *published* version is 1.
    assert_eq!(max_pub, Some(1));
}
/// When the log tail covers the whole range, catalog commits win over the
/// filesystem ones — but a CRC file found on the filesystem is still surfaced.
#[tokio::test]
async fn test_log_tail_covers_entire_range_with_crc() {
    let log_files = vec![
        (0, LogPathFileType::Commit, CommitSource::Filesystem),
        (1, LogPathFileType::Commit, CommitSource::Filesystem),
        (2, LogPathFileType::Crc, CommitSource::Filesystem),
    ];
    let (storage, log_root) = create_storage(log_files).await;
    let log_tail = vec![
        make_parsed_log_path_with_source(0, LogPathFileType::Commit, CommitSource::Catalog),
        make_parsed_log_path_with_source(1, LogPathFileType::Commit, CommitSource::Catalog),
        make_parsed_log_path_with_source(2, LogPathFileType::StagedCommit, CommitSource::Catalog),
    ];
    let (commits, _, _, latest_crc, latest_commit, max_pub) =
        list_and_destructure(storage.as_ref(), &log_root, log_tail, Some(0), Some(2));
    assert_eq!(commits.len(), 3);
    assert_source(&commits[0], CommitSource::Catalog);
    assert_source(&commits[1], CommitSource::Catalog);
    assert_source(&commits[2], CommitSource::Catalog);
    // The CRC at version 2 comes from the filesystem listing.
    let crc = latest_crc.unwrap();
    assert_eq!(crc.version, 2);
    assert!(matches!(crc.file_type, LogPathFileType::Crc));
    assert_eq!(latest_commit.unwrap().version, 2);
    // Version 2 is only staged, so the max published version stays at 1.
    assert_eq!(max_pub, Some(1));
}
/// Staged commits found by the filesystem listing (not vouched for by a
/// catalog log tail) must be omitted from the commit sequence.
#[tokio::test]
async fn test_listing_omits_staged_commits() {
    let log_files = vec![
        (0, LogPathFileType::Commit, CommitSource::Filesystem),
        (1, LogPathFileType::Commit, CommitSource::Filesystem),
        // Staged commit at version 1 (alongside the published one) and a
        // staged-only version 2 — both should be ignored by the listing.
        (1, LogPathFileType::StagedCommit, CommitSource::Filesystem),
        (2, LogPathFileType::StagedCommit, CommitSource::Filesystem),
    ];
    let (storage, log_root) = create_storage(log_files).await;
    let (commits, _, _, _, latest_commit, max_pub) =
        list_and_destructure(storage.as_ref(), &log_root, vec![], None, None);
    assert_eq!(commits.len(), 2);
    assert_eq!(commits[0].version, 0);
    assert_eq!(commits[1].version, 1);
    assert_source(&commits[0], CommitSource::Filesystem);
    assert_source(&commits[1], CommitSource::Filesystem);
    assert_eq!(latest_commit.unwrap().version, 1);
    assert_eq!(max_pub, Some(1));
}
/// An end version beyond what exists must not error: listing simply returns
/// whatever published commits are available (the staged version 2 is omitted).
#[tokio::test]
async fn test_listing_with_large_end_version() {
    let log_files = vec![
        (0, LogPathFileType::Commit, CommitSource::Filesystem),
        (1, LogPathFileType::Commit, CommitSource::Filesystem),
        (2, LogPathFileType::StagedCommit, CommitSource::Filesystem),
    ];
    let (storage, log_root) = create_storage(log_files).await;
    let (commits, _, _, _, latest_commit, max_pub) =
        list_and_destructure(storage.as_ref(), &log_root, vec![], None, Some(3));
    assert_eq!(commits.len(), 2);
    assert_eq!(commits[0].version, 0);
    assert_eq!(commits[1].version, 1);
    assert_eq!(latest_commit.unwrap().version, 1);
    assert_eq!(max_pub, Some(1));
}
/// The log tail only replaces *commit* files at its versions; non-commit
/// files (checkpoints, CRCs) found at those versions on the filesystem are
/// still preserved in the result.
#[tokio::test]
async fn test_non_commit_files_at_log_tail_versions_are_preserved() {
    let log_files = vec![
        (0, LogPathFileType::Commit, CommitSource::Filesystem),
        (1, LogPathFileType::Commit, CommitSource::Filesystem),
        (2, LogPathFileType::Commit, CommitSource::Filesystem),
        (3, LogPathFileType::Commit, CommitSource::Filesystem),
        (4, LogPathFileType::Commit, CommitSource::Filesystem),
        (5, LogPathFileType::Commit, CommitSource::Filesystem),
        // Checkpoint at version 7 and CRC at version 8 overlap the log tail.
        (
            7,
            LogPathFileType::SinglePartCheckpoint,
            CommitSource::Filesystem,
        ),
        (8, LogPathFileType::Crc, CommitSource::Filesystem),
    ];
    let (storage, log_root) = create_storage(log_files).await;
    let log_tail = vec![
        make_parsed_log_path_with_source(6, LogPathFileType::Commit, CommitSource::Catalog),
        make_parsed_log_path_with_source(7, LogPathFileType::Commit, CommitSource::Catalog),
        make_parsed_log_path_with_source(8, LogPathFileType::Commit, CommitSource::Catalog),
        make_parsed_log_path_with_source(9, LogPathFileType::Commit, CommitSource::Catalog),
        make_parsed_log_path_with_source(10, LogPathFileType::Commit, CommitSource::Catalog),
    ];
    let (commits, _, checkpoint_parts, latest_crc, latest_commit, max_pub) =
        list_and_destructure(storage.as_ref(), &log_root, log_tail, Some(0), Some(10));
    // The filesystem checkpoint at version 7 survives the tail overlap.
    assert_eq!(checkpoint_parts.len(), 1);
    assert_eq!(checkpoint_parts[0].version, 7);
    assert!(checkpoint_parts[0].is_checkpoint());
    // So does the CRC at version 8.
    let crc = latest_crc.unwrap();
    assert_eq!(crc.version, 8);
    assert!(matches!(crc.file_type, LogPathFileType::Crc));
    // Commits after the checkpoint (6..=10) all come from the catalog.
    assert_eq!(commits.len(), 5);
    for (i, commit) in commits.iter().enumerate() {
        assert_eq!(commit.version, (i + 6) as u64);
        assert_source(commit, CommitSource::Catalog);
    }
    assert_eq!(latest_commit.unwrap().version, 10);
    assert_eq!(max_pub, Some(10));
}
/// Backward checkpoint scan with a single-part checkpoint: depending on where
/// the checkpoint sits relative to the listing window, the scan should need
/// one or two `list_from` calls and return only the commits after it.
#[rstest]
#[case::no_checkpoint(None, 0..=1005, None, 2)]
#[case::checkpoint_beyond_end(Some(1006), 0..=1005, None, 2)]
// Checkpoint exactly at the end version: no trailing commits (`0..0` is empty).
#[case::checkpoint_at_end(Some(1005), 0..0, Some(1005), 1)]
// Checkpoint at version 5 falls outside the first window, forcing a second listing.
#[case::checkpoint_in_second_window(Some(5), 6..=1005, Some(5), 2)]
// Checkpoint at version 6 is still inside the first window: one listing suffices.
#[case::checkpoint_in_first_window(Some(6), 7..=1005, Some(6), 1)]
#[tokio::test]
async fn backward_scan_single_checkpoint_cases(
    #[case] checkpoint_version: Option<u64>,
    #[case] expected_commits: impl Iterator<Item = u64>,
    #[case] expected_checkpoint: Option<u64>,
    #[case] expected_listings: u32,
) {
    // 1006 commits (0..=1005), plus an optional checkpoint.
    let mut log_files: Vec<(Version, LogPathFileType, CommitSource)> = (0u64..=1005)
        .map(|v| (v, LogPathFileType::Commit, CommitSource::Filesystem))
        .collect();
    if let Some(cp) = checkpoint_version {
        log_files.push((
            cp,
            LogPathFileType::SinglePartCheckpoint,
            CommitSource::Filesystem,
        ));
    }
    let (storage, log_root) = create_storage(log_files).await;
    let counter = CountingStorageHandler::new(storage);
    let result =
        LogSegmentFiles::list_with_backward_checkpoint_scan(&counter, &log_root, vec![], 1005)
            .unwrap();
    assert_eq!(counter.call_count(), expected_listings);
    assert_eq!(
        result.checkpoint_parts.len(),
        if expected_checkpoint.is_some() { 1 } else { 0 }
    );
    if let Some(cp_version) = expected_checkpoint {
        assert_eq!(result.checkpoint_parts[0].version, cp_version);
    }
    assert!(result
        .ascending_commit_files
        .iter()
        .map(|f| f.version)
        .eq(expected_commits));
}
/// Fixture: commits 0..=3000, a complete single-part checkpoint at 500, and
/// an *incomplete* multi-part checkpoint (1 of 2 parts) at 1500. A backward
/// scan should skip the incomplete checkpoint and settle on version 500.
fn files_incomplete_in_second_window_complete_in_third_window(
) -> Vec<(Version, LogPathFileType, CommitSource)> {
    let commits =
        (0u64..=3000).map(|v| (v, LogPathFileType::Commit, CommitSource::Filesystem));
    let checkpoints = [
        (
            500,
            LogPathFileType::SinglePartCheckpoint,
            CommitSource::Filesystem,
        ),
        (
            1500,
            // Only part 1 of 2 exists: deliberately incomplete.
            LogPathFileType::MultiPartCheckpoint {
                part_num: 1,
                num_parts: 2,
            },
            CommitSource::Filesystem,
        ),
    ];
    commits.chain(checkpoints).collect()
}
/// Fixture: commits 0..=52 plus a complete 3-part multi-part checkpoint at
/// version 50.
fn multipart_checkpoint_files() -> Vec<(Version, LogPathFileType, CommitSource)> {
    let checkpoint_parts = [1, 2, 3].into_iter().map(|part_num| {
        (
            50,
            LogPathFileType::MultiPartCheckpoint {
                part_num,
                num_parts: 3,
            },
            CommitSource::Filesystem,
        )
    });
    (0u64..=52)
        .map(|v| (v, LogPathFileType::Commit, CommitSource::Filesystem))
        .chain(checkpoint_parts)
        .collect()
}
/// Expected outcome of a backward checkpoint scan, bundled into one struct so
/// `rstest` cases stay readable.
struct BackwardScanExpected {
    // Number of `list_from` calls the scan should need.
    listings: u32,
    // Number of parts in the selected checkpoint.
    checkpoint_parts: usize,
    // Version of the selected checkpoint (all parts share it).
    checkpoint_version: Version,
    // Expected count, first, and last of the returned commit files.
    commit_count: usize,
    first_commit: Version,
    last_commit: Version,
}
/// Backward scan cases involving multi-part checkpoints: a complete 3-part
/// checkpoint is used directly, while an incomplete one is skipped in favor
/// of an older complete checkpoint (requiring more listing windows).
#[rstest]
#[case::multipart_checkpoint(
    multipart_checkpoint_files(),
    52,
    BackwardScanExpected { listings: 1, checkpoint_parts: 3, checkpoint_version: 50, commit_count: 2, first_commit: 51, last_commit: 52 }
)]
#[case::incomplete_in_second_window_complete_in_third(
    files_incomplete_in_second_window_complete_in_third_window(),
    3000,
    BackwardScanExpected { listings: 3, checkpoint_parts: 1, checkpoint_version: 500, commit_count: 2500, first_commit: 501, last_commit: 3000 }
)]
#[tokio::test]
async fn backward_scan_multipart_checkpoint_cases(
    #[case] log_files: Vec<(Version, LogPathFileType, CommitSource)>,
    #[case] end_version: Version,
    #[case] expected: BackwardScanExpected,
) {
    let BackwardScanExpected {
        listings: expected_listings,
        checkpoint_parts: expected_checkpoint_parts,
        checkpoint_version: expected_checkpoint_version,
        commit_count: expected_commit_count,
        first_commit: expected_first_commit,
        last_commit: expected_last_commit,
    } = expected;
    let (storage, log_root) = create_storage(log_files).await;
    let counter = CountingStorageHandler::new(storage);
    let result = LogSegmentFiles::list_with_backward_checkpoint_scan(
        &counter,
        &log_root,
        vec![],
        end_version,
    )
    .unwrap();
    assert_eq!(counter.call_count(), expected_listings);
    assert_eq!(result.checkpoint_parts.len(), expected_checkpoint_parts);
    // Every selected part belongs to the same checkpoint version.
    assert!(result
        .checkpoint_parts
        .iter()
        .all(|p| p.version == expected_checkpoint_version));
    assert_eq!(result.ascending_commit_files.len(), expected_commit_count);
    assert_eq!(
        result.ascending_commit_files.first().unwrap().version,
        expected_first_commit
    );
    assert_eq!(
        result.ascending_commit_files.last().unwrap().version,
        expected_last_commit
    );
    assert_eq!(
        result.latest_commit_file.unwrap().version,
        expected_last_commit
    );
}
/// With a catalog log tail covering 8..=10, the backward scan still finds the
/// filesystem checkpoint at version 5 and returns filesystem commits 6..=7
/// followed by the catalog commits.
#[tokio::test]
async fn backward_scan_with_log_tail_derives_lower_bound_from_checkpoint() {
    let mut log_files: Vec<(Version, LogPathFileType, CommitSource)> = (0u64..=7)
        .map(|v| (v, LogPathFileType::Commit, CommitSource::Filesystem))
        .collect();
    log_files.push((
        5,
        LogPathFileType::SinglePartCheckpoint,
        CommitSource::Filesystem,
    ));
    let (storage, log_root) = create_storage(log_files).await;
    let log_tail: Vec<_> = (8u64..=10)
        .map(|v| {
            make_parsed_log_path_with_source(v, LogPathFileType::Commit, CommitSource::Catalog)
        })
        .collect();
    let result = LogSegmentFiles::list_with_backward_checkpoint_scan(
        storage.as_ref(),
        &log_root,
        log_tail,
        10,
    )
    .unwrap();
    assert_eq!(result.checkpoint_parts.len(), 1);
    assert_eq!(result.checkpoint_parts[0].version, 5);
    // Filesystem commits after the checkpoint, then the catalog tail.
    let expected = [
        (6, CommitSource::Filesystem),
        (7, CommitSource::Filesystem),
        (8, CommitSource::Catalog),
        (9, CommitSource::Catalog),
        (10, CommitSource::Catalog),
    ];
    assert_eq!(result.ascending_commit_files.len(), expected.len());
    for (file, (version, source)) in result.ascending_commit_files.iter().zip(expected) {
        assert_eq!(file.version, version);
        assert_source(file, source);
    }
    assert_eq!(result.latest_commit_file.unwrap().version, 10);
}
/// A catalog log tail (3..=8) that starts *before* the checkpoint (version 5):
/// tail commits below the checkpoint are dropped, while the tail commit at the
/// checkpoint's own version and later are retained; the filesystem CRC at
/// version 6 is still surfaced.
#[tokio::test]
async fn backward_scan_with_log_tail_starting_before_checkpoint() {
    let mut log_files: Vec<(Version, LogPathFileType, CommitSource)> = (0u64..=5)
        .map(|v| (v, LogPathFileType::Commit, CommitSource::Filesystem))
        .collect();
    log_files.push((
        5,
        LogPathFileType::SinglePartCheckpoint,
        CommitSource::Filesystem,
    ));
    log_files.push((6, LogPathFileType::Crc, CommitSource::Filesystem));
    let (storage, log_root) = create_storage(log_files).await;
    let log_tail: Vec<_> = (3u64..=8)
        .map(|v| {
            make_parsed_log_path_with_source(v, LogPathFileType::Commit, CommitSource::Catalog)
        })
        .collect();
    let result = LogSegmentFiles::list_with_backward_checkpoint_scan(
        storage.as_ref(),
        &log_root,
        log_tail,
        8,
    )
    .unwrap();
    assert_eq!(result.checkpoint_parts.len(), 1);
    assert_eq!(result.checkpoint_parts[0].version, 5);
    let crc = result.latest_crc_file.unwrap();
    assert_eq!(crc.version, 6);
    assert!(matches!(crc.file_type, LogPathFileType::Crc));
    // Catalog commits 5..=8 survive; tail versions 3..=4 are dropped.
    assert_eq!(result.ascending_commit_files.len(), 4);
    for (i, commit) in result.ascending_commit_files.iter().enumerate() {
        assert_eq!(commit.version, (i + 5) as u64);
        assert_source(commit, CommitSource::Catalog);
    }
    assert_eq!(result.latest_commit_file.unwrap().version, 8);
}
/// In the backward scan, a log tail ending at version 4 caps the segment
/// there even though the filesystem has commit 5 — which still counts toward
/// the max published version.
#[tokio::test]
async fn backward_scan_log_tail_defines_latest_version() {
    let log_files: Vec<(Version, LogPathFileType, CommitSource)> = (0u64..=5)
        .map(|v| (v, LogPathFileType::Commit, CommitSource::Filesystem))
        .collect();
    let (storage, log_root) = create_storage(log_files).await;
    let log_tail = vec![make_parsed_log_path_with_source(
        4,
        LogPathFileType::Commit,
        CommitSource::Catalog,
    )];
    let result = LogSegmentFiles::list_with_backward_checkpoint_scan(
        storage.as_ref(),
        &log_root,
        log_tail,
        5,
    )
    .unwrap();
    let expected = [
        (0, CommitSource::Filesystem),
        (1, CommitSource::Filesystem),
        (2, CommitSource::Filesystem),
        (3, CommitSource::Filesystem),
        // The catalog's commit 4 replaces the filesystem's.
        (4, CommitSource::Catalog),
    ];
    assert_eq!(result.ascending_commit_files.len(), expected.len());
    for (file, (version, source)) in result.ascending_commit_files.iter().zip(expected) {
        assert_eq!(file.version, version);
        assert_source(file, source);
    }
    assert_eq!(result.latest_commit_file.unwrap().version, 4);
    // Commit 5 exists published on the filesystem.
    assert_eq!(result.max_published_version, Some(5));
}
/// Like `create_storage`, but the boolean flag marks files that should be
/// written as zero-byte objects (simulating interrupted/failed writes).
async fn create_storage_with_empty_files(
    log_files: Vec<(Version, LogPathFileType, bool)>,
) -> (Box<dyn StorageHandler>, Url) {
    let object_store = Arc::new(InMemory::new());
    let log_root = Url::parse("memory:///_delta_log/").unwrap();
    for (version, file_type, is_empty) in log_files {
        let data = if is_empty {
            bytes::Bytes::new()
        } else {
            bytes::Bytes::from("placeholder")
        };
        let path = log_path_for_file_type(version, &file_type);
        object_store
            .put(&ObjectPath::from(path.as_str()), data.into())
            .await
            .expect("Failed to put test file");
    }
    let handler = ObjectStoreStorageHandler::new(
        object_store,
        Arc::new(TokioBackgroundExecutor::new()),
        None,
    );
    (Box::new(handler), log_root)
}
/// A zero-byte commit file is NOT filtered out of the listing; it stays in
/// the segment with size 0 (downstream reads decide how to handle it).
#[tokio::test]
async fn test_zero_byte_commit_kept_in_listing() {
    let log_files = vec![
        (0, LogPathFileType::Commit, false),
        (1, LogPathFileType::Commit, false),
        // Zero-byte commit at version 2.
        (2, LogPathFileType::Commit, true),
    ];
    let (storage, log_root) = create_storage_with_empty_files(log_files).await;
    let result =
        LogSegmentFiles::list(storage.as_ref(), &log_root, vec![], Some(0), Some(2)).unwrap();
    assert_eq!(result.ascending_commit_files.len(), 3);
    assert_eq!(result.ascending_commit_files[0].version, 0);
    assert_eq!(result.ascending_commit_files[1].version, 1);
    assert_eq!(result.ascending_commit_files[2].version, 2);
    assert_eq!(result.ascending_commit_files[2].location.size, 0);
}
/// A zero-byte compacted-commit file is skipped (in both listing modes) and
/// the individual commit files are used instead.
#[rstest]
#[case::forward_list(false)]
#[case::backward_scan(true)]
#[tokio::test]
async fn test_zero_byte_compaction_skipped_commits_used(#[case] use_backward_scan: bool) {
    let log_files = vec![
        (0, LogPathFileType::Commit, false),
        (1, LogPathFileType::Commit, false),
        (2, LogPathFileType::Commit, false),
        (3, LogPathFileType::Commit, false),
        (4, LogPathFileType::Commit, false),
        // Zero-byte compaction covering versions 0..=4.
        (
            0,
            LogPathFileType::CompactedCommit { hi: 4 },
            true,
        ),
    ];
    let (storage, log_root) = create_storage_with_empty_files(log_files).await;
    let result = if use_backward_scan {
        LogSegmentFiles::list_with_backward_checkpoint_scan(storage.as_ref(), &log_root, vec![], 4)
            .unwrap()
    } else {
        LogSegmentFiles::list(storage.as_ref(), &log_root, vec![], Some(0), Some(4)).unwrap()
    };
    assert!(
        result.ascending_compaction_files.is_empty(),
        "0-byte compaction should have been skipped"
    );
    assert_eq!(result.ascending_commit_files.len(), 5);
    for (i, commit) in result.ascending_commit_files.iter().enumerate() {
        assert_eq!(commit.version, i as u64);
    }
}
/// A zero-byte checkpoint is skipped (in both listing modes) and the older
/// valid checkpoint is used, with the commits after it.
#[rstest]
#[case::forward_list(false)]
#[case::backward_scan(true)]
#[tokio::test]
async fn test_zero_byte_checkpoint_skipped_older_used(#[case] use_backward_scan: bool) {
    let log_files = vec![
        (0, LogPathFileType::Commit, false),
        (1, LogPathFileType::Commit, false),
        (2, LogPathFileType::Commit, false),
        (3, LogPathFileType::Commit, false),
        (4, LogPathFileType::Commit, false),
        (5, LogPathFileType::Commit, false),
        (6, LogPathFileType::Commit, false),
        (7, LogPathFileType::Commit, false),
        (8, LogPathFileType::Commit, false),
        (9, LogPathFileType::Commit, false),
        (10, LogPathFileType::Commit, false),
        // Valid checkpoint at 5; zero-byte (invalid) checkpoint at 10.
        (5, LogPathFileType::SinglePartCheckpoint, false),
        (10, LogPathFileType::SinglePartCheckpoint, true),
    ];
    let (storage, log_root) = create_storage_with_empty_files(log_files).await;
    let result = if use_backward_scan {
        LogSegmentFiles::list_with_backward_checkpoint_scan(storage.as_ref(), &log_root, vec![], 10)
            .unwrap()
    } else {
        LogSegmentFiles::list(storage.as_ref(), &log_root, vec![], Some(0), Some(10)).unwrap()
    };
    assert_eq!(result.checkpoint_parts.len(), 1);
    assert_eq!(result.checkpoint_parts[0].version, 5);
    // Commits after the usable checkpoint: 6..=10.
    assert_eq!(result.ascending_commit_files.len(), 5);
    assert_eq!(result.ascending_commit_files[0].version, 6);
    assert_eq!(result.ascending_commit_files[4].version, 10);
}
/// A zero-byte CRC file is NOT skipped: the latest CRC by version wins even
/// if it is empty.
#[tokio::test]
async fn test_zero_byte_crc_kept() {
    let log_files = vec![
        (0, LogPathFileType::Commit, false),
        (1, LogPathFileType::Commit, false),
        (2, LogPathFileType::Commit, false),
        // Non-empty CRC at 1, zero-byte CRC at 2.
        (1, LogPathFileType::Crc, false),
        (2, LogPathFileType::Crc, true),
    ];
    let (storage, log_root) = create_storage_with_empty_files(log_files).await;
    let result =
        LogSegmentFiles::list(storage.as_ref(), &log_root, vec![], Some(0), Some(2)).unwrap();
    let crc = result.latest_crc_file.unwrap();
    assert_eq!(crc.version, 2);
}
/// Backward scan: a zero-byte checkpoint in the first listing window must not
/// stop the scan — it continues into the next window and finds the valid
/// checkpoint at version 5 (hence exactly 2 `list_from` calls).
#[tokio::test]
async fn test_zero_byte_checkpoint_backward_scan_crosses_windows() {
    let mut log_files: Vec<(Version, LogPathFileType, bool)> = (0u64..=1005)
        .map(|v| (v, LogPathFileType::Commit, false))
        .collect();
    log_files.push((5, LogPathFileType::SinglePartCheckpoint, false));
    // Zero-byte checkpoint at the very end of the log.
    log_files.push((1005, LogPathFileType::SinglePartCheckpoint, true));
    let (storage, log_root) = create_storage_with_empty_files(log_files).await;
    let counter = CountingStorageHandler::new(storage);
    let result =
        LogSegmentFiles::list_with_backward_checkpoint_scan(&counter, &log_root, vec![], 1005)
            .unwrap();
    assert_eq!(counter.call_count(), 2);
    assert_eq!(result.checkpoint_parts.len(), 1);
    assert_eq!(result.checkpoint_parts[0].version, 5);
    assert_eq!(result.ascending_commit_files.len(), 1000);
    assert_eq!(result.ascending_commit_files[0].version, 6);
    assert_eq!(result.ascending_commit_files[999].version, 1005);
}
/// The commits-only listing entry point also keeps zero-byte commit files.
#[tokio::test]
async fn test_list_commits_zero_byte_commit_kept() {
    let log_files = vec![
        (0, LogPathFileType::Commit, false),
        (1, LogPathFileType::Commit, false),
        // Zero-byte commit at version 2.
        (2, LogPathFileType::Commit, true),
    ];
    let (storage, log_root) = create_storage_with_empty_files(log_files).await;
    let result =
        LogSegmentFiles::list_commits(storage.as_ref(), &log_root, Some(0), Some(2)).unwrap();
    assert_eq!(result.ascending_commit_files.len(), 3);
    assert_eq!(result.ascending_commit_files[2].version, 2);
    assert_eq!(result.ascending_commit_files[2].location.size, 0);
}
/// Fixture: commits 0..=10, an *incomplete* multi-part checkpoint (1 of 3
/// parts) at version 5, and a complete single-part checkpoint at version 10.
fn incomplete_then_complete_files() -> Vec<ParsedLogPath> {
    let commits = (0..=10).map(|v| {
        make_parsed_log_path_with_source(v, LogPathFileType::Commit, CommitSource::Filesystem)
    });
    let checkpoints = [
        // Only part 1 of 3: deliberately incomplete.
        make_parsed_log_path_with_source(
            5,
            LogPathFileType::MultiPartCheckpoint {
                part_num: 1,
                num_parts: 3,
            },
            CommitSource::Filesystem,
        ),
        make_parsed_log_path_with_source(
            10,
            LogPathFileType::SinglePartCheckpoint,
            CommitSource::Filesystem,
        ),
    ];
    commits.chain(checkpoints).collect()
}
/// Fixture: commits 0..=10 with complete single-part checkpoints at both
/// version 5 and version 10.
fn two_complete_checkpoints_files() -> Vec<ParsedLogPath> {
    let commits = (0..=10).map(|v| {
        make_parsed_log_path_with_source(v, LogPathFileType::Commit, CommitSource::Filesystem)
    });
    let checkpoints = [5, 10].into_iter().map(|v| {
        make_parsed_log_path_with_source(
            v,
            LogPathFileType::SinglePartCheckpoint,
            CommitSource::Filesystem,
        )
    });
    commits.chain(checkpoints).collect()
}
/// `find_complete_checkpoint_version` returns the newest version with a
/// *complete* checkpoint, ignoring incomplete multi-part checkpoints, and
/// `None` when no checkpoint exists.
#[rstest]
#[case::no_checkpoint(
    (0u64..=5).map(|v| make_parsed_log_path_with_source(v, LogPathFileType::Commit, CommitSource::Filesystem)).collect(),
    None
)]
#[case::incomplete_then_complete(incomplete_then_complete_files(), Some(10))]
#[case::two_complete(two_complete_checkpoints_files(), Some(10))]
fn find_complete_checkpoint_version_cases(
    #[case] files: Vec<ParsedLogPath>,
    #[case] expected: Option<u64>,
) {
    assert_eq!(find_complete_checkpoint_version(&files), expected);
}