use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::num::NonZero;
use error::{LogHistoryError, NearestTimestamp};
use itertools::Itertools;
use search::{binary_search_by_key_with_bounds, Bound, SearchError};
use tracing::{info, trace, warn};
use url::Url;
use crate::log_segment::LogSegment;
use crate::log_segment_files::{list_from_storage, should_process_log_file};
use crate::path::{LogPathFileType, ParsedLogPath};
use crate::snapshot::Snapshot;
use crate::table_configuration::InCommitTimestampEnablement;
use crate::utils::require;
use crate::{DeltaResult, Engine, Error as DeltaError, Version};
pub(crate) mod search;
pub mod error;
/// Timestamp value used throughout this module, stored as a signed 64-bit integer.
///
/// NOTE(review): the tests in this file build file modification times with
/// `Duration::from_millis`, so this is presumably milliseconds since the Unix epoch -- confirm.
pub type Timestamp = i64;
/// A commit identified by its version, paired with the timestamp resolved for it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CommitAt {
/// Version of the commit.
pub version: Version,
/// Timestamp resolved for the commit. The searches in this module fill it with either the
/// commit's in-commit timestamp or its (monotonized) file modification time.
pub timestamp: Timestamp,
}
impl CommitAt {
pub fn new(version: Version, timestamp: Timestamp) -> Self {
Self { version, timestamp }
}
}
/// Describes which search strategy `timestamp_to_version` should use for a timestamp, and over
/// which part of the ascending commit list.
#[derive(Debug, PartialEq, Eq)]
enum TimestampSearchBounds {
/// The requested timestamp equals the in-commit-timestamp (ICT) enablement timestamp; the
/// payload is the ICT enablement version.
ExactMatch(Version),
/// Binary search over in-commit timestamps, starting at this index of the commit list.
ICTSearchStartingFrom(usize),
/// ICT is not enabled: search every commit by file modification time.
FileModificationSearch,
/// The requested timestamp is before ICT enablement: search only the commits before `index`
/// by file modification time.
FileModificationSearchUntil {
/// Index of the ICT enablement commit in the commit list (exclusive end of the search).
index: usize,
/// Version at which ICT was enabled.
ict_enablement_version: Version,
/// Timestamp recorded for ICT enablement.
ict_enablement_timestamp: Timestamp,
},
}
/// Decides how `timestamp` should be searched for within `log_segment`.
///
/// The decision is driven by the snapshot's in-commit-timestamp (ICT) enablement state:
/// - ICT not enabled: search all commits by file modification time.
/// - ICT enabled without a recorded enablement version/timestamp, or the first listed commit is
///   already at or after the enablement version: ICT search over the whole commit list.
/// - Otherwise the timestamp is compared with the enablement timestamp to pick an exact match,
///   a file-modification search limited to the pre-enablement commits, or an ICT search
///   starting at the enablement commit.
///
/// # Errors
/// Returns an internal [`LogHistoryError`] if the table configuration cannot be read, if the log
/// segment has no commit files, or if the enablement version lies beyond the listed commits.
#[tracing::instrument(skip(snapshot, log_segment), ret)]
fn get_timestamp_search_bounds(
    snapshot: &Snapshot,
    log_segment: &LogSegment,
    timestamp: Timestamp,
) -> Result<TimestampSearchBounds, LogHistoryError> {
    debug_assert!(log_segment.end_version == snapshot.version());
    let table_config = snapshot.table_configuration();
    let ict_enablement = table_config
        .in_commit_timestamp_enablement()
        .map_err(|e| LogHistoryError::internal("failed to read table configuration", e))?;
    let (ict_enablement_version, ict_timestamp) = match ict_enablement {
        InCommitTimestampEnablement::NotEnabled => {
            return Ok(TimestampSearchBounds::FileModificationSearch)
        }
        InCommitTimestampEnablement::Enabled {
            enablement: Some((v, t)),
        } => (v, t),
        InCommitTimestampEnablement::Enabled { enablement: None } => {
            return Ok(TimestampSearchBounds::ICTSearchStartingFrom(0));
        }
    };

    let commits = &log_segment.listed.ascending_commit_files;
    // Report an empty commit list as an error instead of panicking on `commits[0]`.
    let Some(first_commit) = commits.first() else {
        return Err(LogHistoryError::internal_message(
            "log segment contains no commit files",
        ));
    };

    // Fast path: every listed commit is at or after ICT enablement.
    if first_commit.version >= ict_enablement_version {
        return Ok(TimestampSearchBounds::ICTSearchStartingFrom(0));
    }

    // Position of the enablement commit, or the insertion point if that exact version is not
    // listed.
    let ict_enablement_idx = commits
        .binary_search_by(|x| x.version.cmp(&ict_enablement_version))
        .unwrap_or_else(|idx| idx);
    require!(
        ict_enablement_idx < commits.len(),
        LogHistoryError::internal_message(
            "ICT enablement version is beyond the table's latest version"
        )
    );

    let result = match timestamp.cmp(&ict_timestamp) {
        Ordering::Equal => TimestampSearchBounds::ExactMatch(ict_enablement_version),
        Ordering::Less => TimestampSearchBounds::FileModificationSearchUntil {
            index: ict_enablement_idx,
            ict_enablement_version,
            ict_enablement_timestamp: ict_timestamp,
        },
        Ordering::Greater => TimestampSearchBounds::ICTSearchStartingFrom(ict_enablement_idx),
    };
    Ok(result)
}
/// Scans `commits` in order, comparing `timestamp` against each commit's file modification
/// time.
///
/// Modification times are made strictly increasing on the fly: a commit whose raw time is not
/// greater than its predecessor's adjusted time is treated as "predecessor + 1".
///
/// - [`Bound::GreatestLower`]: returns the last commit whose adjusted time is `<= timestamp`.
/// - [`Bound::LeastUpper`]: returns the first commit whose adjusted time is `>= timestamp`.
///
/// # Errors
/// Returns an out-of-range [`LogHistoryError`] when `commits` is empty or no commit satisfies
/// the bound; for a non-empty list the error carries the nearest boundary timestamp.
fn linear_search_file_mod_timestamps(
    commits: &[ParsedLogPath],
    timestamp: Timestamp,
    bound: Bound,
) -> Result<CommitAt, LogHistoryError> {
    let (Some(first), Some(last)) = (commits.first(), commits.last()) else {
        return Err(LogHistoryError::out_of_range(
            timestamp,
            bound,
            NearestTimestamp::Unknown,
        ));
    };
    let lo_version = first.version;
    let hi_version = last.version;
    info!(lo_version, hi_version, "File modification linear search");

    let mut best_lower: Option<CommitAt> = None;
    let mut last_seen_ts = i64::MIN;
    for commit in commits {
        let raw_ts = commit.location.last_modified;
        let monotonic_ts = std::cmp::max(raw_ts, last_seen_ts.saturating_add(1));
        if monotonic_ts != raw_ts {
            trace!(
                version = commit.version,
                raw_ts,
                monotonic_ts,
                "Adjusted non-monotonic timestamp"
            );
        }
        match bound {
            Bound::GreatestLower => {
                // Times only grow from here, so no later commit can qualify.
                if monotonic_ts > timestamp {
                    break;
                }
                best_lower = Some(CommitAt::new(commit.version, monotonic_ts));
            }
            Bound::LeastUpper => {
                if monotonic_ts >= timestamp {
                    return Ok(CommitAt::new(commit.version, monotonic_ts));
                }
            }
        }
        last_seen_ts = monotonic_ts;
    }

    match best_lower {
        Some(commit) => Ok(commit),
        None => {
            // Nothing matched: report the boundary the request fell outside of.
            let nearest_ts = match bound {
                Bound::GreatestLower => commits[0].location.last_modified,
                Bound::LeastUpper => last_seen_ts,
            };
            Err(LogHistoryError::out_of_range(
                timestamp,
                bound,
                NearestTimestamp::from_boundary(bound, nearest_ts),
            ))
        }
    }
}
/// Binary-searches `commits` for `timestamp` using each commit's in-commit timestamp (ICT),
/// which is read through `engine`.
///
/// On success the returned [`CommitAt`] carries the matched commit's version and its ICT.
///
/// # Errors
/// - Out-of-range error when `commits` is empty or no commit satisfies `bound`. For a
///   non-empty list the boundary commit's ICT is attached as a hint when it can be read;
///   a failure to read it is only logged.
/// - Internal error when an ICT needed by the search cannot be read.
fn binary_search_ict_timestamps(
    commits: &[ParsedLogPath],
    timestamp: Timestamp,
    bound: Bound,
    engine: &dyn Engine,
) -> Result<CommitAt, LogHistoryError> {
    let (Some(first), Some(last)) = (commits.first(), commits.last()) else {
        return Err(LogHistoryError::out_of_range(
            timestamp,
            bound,
            NearestTimestamp::Unknown,
        ));
    };
    let lo_version = first.version;
    let hi_version = last.version;
    info!(
        lo_version,
        hi_version, "ICT binary search over version range"
    );

    let commit_to_ict = |commit: &ParsedLogPath| -> Result<Timestamp, LogHistoryError> {
        commit
            .read_in_commit_timestamp(engine)
            .map_err(|e| LogHistoryError::internal("failed to read in-commit timestamp", e))
    };

    let outcome = binary_search_by_key_with_bounds(commits, timestamp, commit_to_ict, bound);
    match outcome {
        Ok(idx) => {
            let found = &commits[idx];
            let found_ts = commit_to_ict(found)?;
            Ok(CommitAt::new(found.version, found_ts))
        }
        Err(SearchError::KeyFunctionError(error)) => Err(error),
        Err(SearchError::OutOfRange) => {
            // Best effort: attach the boundary commit's ICT as a hint for the caller.
            let nearest = match bound.boundary_of(commits) {
                None => NearestTimestamp::Unknown,
                Some(boundary) => match commit_to_ict(boundary) {
                    Ok(ts) => NearestTimestamp::from_boundary(bound, ts),
                    Err(e) => {
                        warn!(
                            error = ?e,
                            version = boundary.version,
                            "failed to read boundary ICT for nearest_timestamp hint",
                        );
                        NearestTimestamp::Unknown
                    }
                },
            };
            Err(LogHistoryError::out_of_range(timestamp, bound, nearest))
        }
    }
}
/// Resolves `timestamp` to a commit of `snapshot`'s table according to `bound`.
///
/// Steps:
/// 1. Compare against the snapshot's own timestamp. An exact match, or a later timestamp with
///    [`Bound::GreatestLower`], resolves to the snapshot version directly; a later timestamp
///    with [`Bound::LeastUpper`] is out of range.
/// 2. Build a log segment for timestamp conversion. For [`HistoryCommitType::Recreatable`] the
///    listing is limited to the versions from the earliest recreatable commit up to the
///    snapshot version.
/// 3. Pick a search strategy with `get_timestamp_search_bounds` and run it.
///
/// # Errors
/// Returns an out-of-range [`LogHistoryError`] when no commit satisfies the bound, and an
/// internal one when reading timestamps, listing the log, or building the segment fails.
pub(crate) fn timestamp_to_version(
    snapshot: &Snapshot,
    engine: &dyn Engine,
    timestamp: Timestamp,
    bound: Bound,
    resolved_commit_type: HistoryCommitType,
) -> Result<CommitAt, LogHistoryError> {
    let snap_ts = snapshot
        .get_timestamp(engine)
        .map_err(|e| LogHistoryError::internal("failed to get snapshot timestamp", e))?;
    match (timestamp.cmp(&snap_ts), bound) {
        (Ordering::Equal, _) | (Ordering::Greater, Bound::GreatestLower) => {
            return Ok(CommitAt::new(snapshot.version(), snap_ts))
        }
        (Ordering::Greater, Bound::LeastUpper) => {
            return Err(LogHistoryError::out_of_range(
                timestamp,
                bound,
                NearestTimestamp::Latest(snap_ts),
            ));
        }
        // Strictly before the snapshot's timestamp: a real search is required.
        (Ordering::Less, _) => {}
    }

    let staged_log_tail: Vec<ParsedLogPath> = snapshot
        .log_segment()
        .listed
        .staged_commits()
        .cloned()
        .collect();

    let listing_limit = match resolved_commit_type {
        HistoryCommitType::Published => None,
        HistoryCommitType::Recreatable => {
            let earliest = get_earliest_commit(
                engine,
                &snapshot.log_segment().log_root,
                None,
                HistoryCommitType::Recreatable,
            )
            .map_err(|e| match e {
                DeltaError::LogHistory(inner) => *inner,
                _ => LogHistoryError::internal("failed to get earliest commit", e),
            })?;
            // Number of versions in `earliest..=snapshot.version()`, computed with checked
            // arithmetic so an underflow or overflow becomes an error rather than a panic/wrap.
            let limit = snapshot
                .version()
                .checked_sub(earliest)
                .and_then(|diff| diff.checked_add(1))
                .and_then(|count| usize::try_from(count).ok())
                .and_then(NonZero::new)
                .ok_or_else(|| {
                    LogHistoryError::internal(
                        "earliest recreatable version exceeds snapshot version",
                        DeltaError::generic(format!(
                            "earliest = {earliest}, snapshot version = {}",
                            snapshot.version()
                        )),
                    )
                })?;
            Some(limit)
        }
    };

    let log_segment = LogSegment::for_timestamp_conversion(
        engine.storage_handler().as_ref(),
        snapshot.log_segment().log_root.clone(),
        snapshot.version(),
        listing_limit,
        staged_log_tail,
    )
    .map_err(|e| {
        LogHistoryError::internal("failed to build log segment for timestamp conversion", e)
    })?;
    debug_assert!(
        !log_segment.listed.ascending_commit_files.is_empty(),
        "LogSegment should ensure that a segment is non-empty"
    );
    debug_assert!(log_segment.end_version == snapshot.log_segment().end_version);

    let commits = &log_segment.listed.ascending_commit_files;
    let search_bounds = get_timestamp_search_bounds(snapshot, &log_segment, timestamp)?;
    match search_bounds {
        TimestampSearchBounds::ExactMatch(version) => Ok(CommitAt::new(version, timestamp)),
        TimestampSearchBounds::ICTSearchStartingFrom(lo) => {
            binary_search_ict_timestamps(&commits[lo..], timestamp, bound, engine)
        }
        TimestampSearchBounds::FileModificationSearch => {
            linear_search_file_mod_timestamps(commits, timestamp, bound)
        }
        TimestampSearchBounds::FileModificationSearchUntil {
            index,
            ict_enablement_version,
            ict_enablement_timestamp,
        } => {
            linear_search_file_mod_timestamps(&commits[..index], timestamp, bound).or_else(|e| {
                match (&e, bound) {
                    // No pre-enablement commit is at or after the timestamp, so the ICT
                    // enablement commit is the least upper bound.
                    (LogHistoryError::TimestampOutOfRange { .. }, Bound::LeastUpper) => Ok(
                        CommitAt::new(ict_enablement_version, ict_enablement_timestamp),
                    ),
                    _ => Err(e),
                }
            })
        }
    }
}
/// Finds the latest commit whose timestamp is at or before `timestamp`.
///
/// This is `timestamp_to_version` with [`Bound::GreatestLower`]; its error is converted into
/// the crate-wide error type.
#[tracing::instrument(skip(snapshot, engine), ret, fields(latest_version = snapshot.version(), table_root = %snapshot.table_root()))]
pub fn latest_version_as_of(
    snapshot: &Snapshot,
    engine: &dyn Engine,
    timestamp: Timestamp,
    resolved_commit_type: HistoryCommitType,
) -> DeltaResult<CommitAt> {
    let bound = Bound::GreatestLower;
    let commit = timestamp_to_version(snapshot, engine, timestamp, bound, resolved_commit_type)?;
    Ok(commit)
}
/// Finds the earliest commit whose timestamp is at or after `timestamp`.
///
/// This is `timestamp_to_version` with [`Bound::LeastUpper`]; its error is converted into the
/// crate-wide error type.
#[tracing::instrument(skip(snapshot, engine), ret, fields(latest_version = snapshot.version(), table_root = %snapshot.table_root()))]
pub fn first_version_after(
    snapshot: &Snapshot,
    engine: &dyn Engine,
    timestamp: Timestamp,
    resolved_commit_type: HistoryCommitType,
) -> DeltaResult<CommitAt> {
    let bound = Bound::LeastUpper;
    let commit = timestamp_to_version(snapshot, engine, timestamp, bound, resolved_commit_type)?;
    Ok(commit)
}
/// Converts a timestamp range into a version range over published commits.
///
/// The start version is the first commit at or after `start_timestamp`. When `end_timestamp`
/// is given, the end version is the latest commit at or before it; otherwise it is `None`.
///
/// # Errors
/// - `InvalidTimestampRange` when `start_timestamp > end_timestamp`.
/// - `EmptyTimestampRange` when the resolved start version is greater than the end version.
/// - Any error from resolving either endpoint.
#[tracing::instrument(skip(snapshot, engine), ret, fields(latest_version = snapshot.version(), table_root = %snapshot.table_root()))]
pub fn timestamp_range_to_versions(
    snapshot: &Snapshot,
    engine: &dyn Engine,
    start_timestamp: Timestamp,
    end_timestamp: Option<Timestamp>,
) -> DeltaResult<(Version, Option<Version>)> {
    if let Some(end_timestamp) = end_timestamp {
        require!(
            start_timestamp <= end_timestamp,
            LogHistoryError::InvalidTimestampRange {
                start_timestamp,
                end_timestamp
            }
            .into()
        );
    }

    let start_commit = first_version_after(
        snapshot,
        engine,
        start_timestamp,
        HistoryCommitType::Published,
    )?;
    let start_version = start_commit.version;

    let end_version = match end_timestamp {
        None => None,
        Some(end) => {
            let end_commit =
                latest_version_as_of(snapshot, engine, end, HistoryCommitType::Published)?;
            let end_version = end_commit.version;
            // Both endpoints resolved, but no commit lies between them.
            require!(
                start_version <= end_version,
                DeltaError::from(LogHistoryError::EmptyTimestampRange {
                    start_timestamp,
                    end_timestamp: end,
                    between_version: end_version,
                })
            );
            Some(end_version)
        }
    };
    Ok((start_version, end_version))
}
/// Returns the version of the first commit file found when listing `log_root` from version 0.
///
/// # Errors
/// - Propagates listing errors.
/// - When no commit file is listed: a generic error if `earliest_ratified_commit_version` is
///   `Some(0)`, otherwise `LogHistoryError::NoCommitsFound`.
#[tracing::instrument(skip(engine), ret, err)]
fn get_earliest_published_commit_version(
    engine: &dyn Engine,
    log_root: &Url,
    earliest_ratified_commit_version: Option<Version>,
) -> DeltaResult<Version> {
    let mut published_commits =
        list_from_storage(engine.storage_handler().as_ref(), log_root, 0, Version::MAX)?
            .filter_ok(|f| f.file_type == LogPathFileType::Commit);

    match published_commits.next().transpose()? {
        Some(first_commit) => Ok(first_commit.version),
        None if earliest_ratified_commit_version == Some(0) => {
            Err(DeltaError::generic(format!(
                "expected a published v0 commit for catalog-managed table {log_root}, \
                 but the log listing returned no commits"
            )))
        }
        None => Err(DeltaError::from(LogHistoryError::NoCommitsFound {
            log_root: log_root.clone(),
        })),
    }
}
/// Walks the log listing from version 0 and returns the earliest version considered
/// recreatable.
///
/// A commit at version 0 immediately yields `0`. Otherwise, on each commit file, the most
/// recently completed checkpoint is returned if its version is at or after the first commit
/// seen. A multi-part checkpoint counts as complete once as many distinct parts as `num_parts`
/// have been listed for the same `(version, num_parts)` pair.
///
/// # Errors
/// - Propagates listing errors.
/// - `NoRecreatableCommit` when commits were listed but none qualified.
/// - When no commit was listed: a generic error if `earliest_ratified_commit_version` is
///   `Some(0)`, otherwise `NoCommitsFound`.
#[tracing::instrument(skip(engine), err, ret)]
fn get_earliest_recreatable_commit(
    engine: &dyn Engine,
    log_root: &Url,
    earliest_ratified_commit_version: Option<Version>,
) -> DeltaResult<Version> {
    let mut newest_complete_checkpoint: Option<Version> = None;
    let mut seen_checkpoint_parts: HashMap<(Version, u32), HashSet<u32>> = HashMap::new();
    let mut first_commit_seen: Option<Version> = None;

    for entry in list_from_storage(engine.storage_handler().as_ref(), log_root, 0, Version::MAX)? {
        let path = entry?;
        if !should_process_log_file(&path) {
            continue;
        }
        let version = path.version;
        match path.file_type {
            LogPathFileType::Commit if version == 0 => return Ok(0),
            LogPathFileType::Commit => {
                let earliest = *first_commit_seen.get_or_insert(version);
                match newest_complete_checkpoint {
                    Some(checkpoint) if checkpoint >= earliest => return Ok(checkpoint),
                    Some(_) | None => {}
                }
            }
            LogPathFileType::SinglePartCheckpoint | LogPathFileType::UuidCheckpoint => {
                newest_complete_checkpoint = Some(version);
            }
            LogPathFileType::MultiPartCheckpoint {
                part_num,
                num_parts,
            } => {
                let parts = seen_checkpoint_parts
                    .entry((version, num_parts))
                    .or_default();
                parts.insert(part_num);
                if parts.len() == num_parts as usize {
                    newest_complete_checkpoint = Some(version);
                }
            }
            LogPathFileType::StagedCommit
            | LogPathFileType::CompactedCommit { .. }
            | LogPathFileType::Crc
            | LogPathFileType::Unknown => {}
        }
    }

    // The listing is exhausted without finding a recreatable version.
    let failure = if first_commit_seen.is_some() {
        DeltaError::from(LogHistoryError::NoRecreatableCommit {
            log_root: log_root.clone(),
        })
    } else if earliest_ratified_commit_version == Some(0) {
        DeltaError::generic(format!(
            "expected a published v0 commit for catalog-managed table {log_root}, \
             but the log listing returned no commits"
        ))
    } else {
        DeltaError::from(LogHistoryError::NoCommitsFound {
            log_root: log_root.clone(),
        })
    };
    Err(failure)
}
/// Selects which notion of "earliest commit" the history functions in this module use.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HistoryCommitType {
/// The earliest commit is the first commit file present in the log listing.
Published,
/// The earliest commit is the earliest recreatable version: version 0 if its commit file is
/// listed, otherwise a complete checkpoint at or after the first listed commit.
Recreatable,
}
#[tracing::instrument(skip(engine), err, ret)]
pub fn get_earliest_commit(
engine: &dyn Engine,
log_root: &Url,
earliest_ratified_commit_version: Option<Version>,
commit_type: HistoryCommitType,
) -> DeltaResult<Version> {
match commit_type {
HistoryCommitType::Published => get_earliest_published_commit_version(
engine,
log_root,
earliest_ratified_commit_version,
),
HistoryCommitType::Recreatable => {
get_earliest_recreatable_commit(engine, log_root, earliest_ratified_commit_version)
}
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::fs::{remove_file, OpenOptions};
use std::ops::RangeInclusive;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use bytes::Bytes;
use test_utils::delta_path_for_version;
use url::Url;
use uuid::Uuid;
use super::*;
use crate::actions::{CommitInfo, Metadata, Protocol};
use crate::engine::sync::SyncEngine;
use crate::object_store::memory::InMemory;
use crate::schema::{DataType, SchemaRef, StructField, StructType};
use crate::snapshot::Snapshot;
use crate::table_features::TableFeature;
use crate::utils::test_utils::{Action, LocalMockTable};
use crate::Version;
/// Schema used by the mock tables: a single nullable integer column named `value`.
fn get_test_schema() -> SchemaRef {
    let value_field = StructField::nullable("value", DataType::INTEGER);
    let schema = StructType::new_unchecked([value_field]);
    Arc::new(schema)
}
/// Overwrites the modification time of the commit file for `version` in the mock table's
/// `_delta_log/` directory, interpreting `timestamp` as milliseconds since the Unix epoch.
///
/// Panics if the file cannot be opened or `timestamp` is negative.
fn set_mod_time(mock_table: &LocalMockTable, version: Version, timestamp: Timestamp) {
    let commit_path = delta_path_for_version(version, "json");
    let file_name = commit_path.filename().unwrap().to_string();
    let log_dir = mock_table.table_root().join("_delta_log/");
    let commit_file = OpenOptions::new()
        .write(true)
        .open(log_dir.join(file_name))
        .unwrap();
    let millis = timestamp.try_into().unwrap();
    let modified_at = SystemTime::UNIX_EPOCH + Duration::from_millis(millis);
    commit_file.set_modified(modified_at).unwrap();
}
/// Builds a mock table with one commit per entry of `timestamps`.
///
/// Each entry is `(file modification time, optional in-commit timestamp)` for the commit whose
/// version equals the entry's index. `ict_enablement_version` selects the commit that turns
/// in-commit timestamps (ICT) on:
/// - `Some(0)`: ICT is enabled in the first commit's metadata.
/// - `Some(v)`: commit `v` carries metadata enabling ICT plus the enablement version and
///   timestamp properties (its ICT must be `Some`).
/// - `None`: ICT is never enabled.
async fn mock_table_with_timestamps(
    timestamps: &[(Timestamp, Option<Timestamp>)],
    ict_enablement_version: Option<Version>,
) -> LocalMockTable {
    let mut mock_table = LocalMockTable::new();
    for (index, &(file_mod_ts, ict_ts)) in timestamps.iter().enumerate() {
        let version = index as Version;
        let is_first = version == 0;
        let is_ict_enablement = ict_enablement_version == Some(version);
        let ict_from_creation = is_first && ict_enablement_version == Some(0);

        let mut actions: Vec<Action> = Vec::new();

        if let Some(ict) = ict_ts {
            actions.push(Action::CommitInfo(CommitInfo {
                in_commit_timestamp: Some(ict),
                ..Default::default()
            }));
        }

        if is_first || is_ict_enablement {
            let mut config = HashMap::new();
            if ict_from_creation {
                config.insert(
                    "delta.enableInCommitTimestamps".to_string(),
                    "true".to_string(),
                );
            } else if is_ict_enablement {
                config.insert(
                    "delta.enableInCommitTimestamps".to_string(),
                    "true".to_string(),
                );
                config.insert(
                    "delta.inCommitTimestampEnablementVersion".to_string(),
                    version.to_string(),
                );
                config.insert(
                    "delta.inCommitTimestampEnablementTimestamp".to_string(),
                    ict_ts.unwrap().to_string(),
                );
            }
            let metadata =
                Metadata::try_new(None, None, get_test_schema(), vec![], 0, config).unwrap();
            actions.push(Action::Metadata(metadata));
        }

        if is_first {
            let writer_features: Vec<TableFeature> = if ict_enablement_version.is_some() {
                vec![TableFeature::InCommitTimestamp]
            } else {
                Vec::new()
            };
            let protocol =
                Protocol::try_new(3, 7, Some(Vec::<String>::new()), Some(writer_features))
                    .unwrap();
            actions.push(Action::Protocol(protocol));
        }

        // Make sure the commit has at least one action.
        if actions.is_empty() {
            actions.push(Action::CommitInfo(CommitInfo::default()));
        }

        mock_table.commit(actions).await;
        set_mod_time(&mock_table, version, file_mod_ts);
    }
    mock_table
}
/// Creates a mock table from `timestamps` / `ict_enablement`, builds a snapshot of it (pinned
/// to `at_version` when given), and builds a timestamp-conversion log segment ending at the
/// snapshot version with no listing limit and no staged commits.
///
/// The table is returned as well so the caller can keep it alive and inspect its files.
async fn build_test_snapshot(
    timestamps: &[(Timestamp, Option<Timestamp>)],
    ict_enablement: Option<Version>,
    at_version: Option<Version>,
) -> (LocalMockTable, SyncEngine, Arc<Snapshot>, LogSegment) {
    let table = mock_table_with_timestamps(timestamps, ict_enablement).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(table.table_root()).unwrap();

    let builder = Snapshot::builder_for(table_url);
    let builder = match at_version {
        Some(v) => builder.at_version(v),
        None => builder,
    };
    let snapshot = builder.build(&engine).unwrap();

    let log_root = snapshot.log_segment().log_root.clone();
    let log_segment = LogSegment::for_timestamp_conversion(
        engine.storage_handler().as_ref(),
        log_root,
        snapshot.version(),
        None,
        vec![],
    )
    .unwrap();

    (table, engine, snapshot, log_segment)
}
/// With the snapshot pinned to version 2 (before ICT is enabled at version 3), every timestamp
/// must resolve to a plain file-modification search.
#[rstest::rstest]
#[case::before_all(0)]
#[case::within_range(100)]
#[case::after_all(1000)]
#[tokio::test]
async fn test_search_bounds_no_ict(#[case] timestamp: Timestamp) {
    let timestamps = [
        (50, None),
        (150, None),
        (250, None),
        (350, Some(300)),
        (450, Some(400)),
    ];
    let (_table, _engine, snapshot, log_segment) =
        build_test_snapshot(&timestamps, Some(3), Some(2)).await;
    let res = get_timestamp_search_bounds(&snapshot, &log_segment, timestamp).unwrap();
    assert!(
        matches!(res, TimestampSearchBounds::FileModificationSearch),
        "{res:?}"
    );
}
/// With ICT enabled at version 3 (enablement timestamp 300), checks which search bounds are
/// chosen for timestamps before, exactly at, and after the enablement timestamp.
#[rstest::rstest]
#[case::file_mod_region(50, TimestampSearchBounds::FileModificationSearchUntil { index: 3, ict_enablement_version: 3, ict_enablement_timestamp: 300 })]
#[case::file_mod_between(60, TimestampSearchBounds::FileModificationSearchUntil { index: 3, ict_enablement_version: 3, ict_enablement_timestamp: 300 })]
#[case::file_mod_last(299, TimestampSearchBounds::FileModificationSearchUntil { index: 3, ict_enablement_version: 3, ict_enablement_timestamp: 300 })]
#[case::exact_enablement(300, TimestampSearchBounds::ExactMatch(3))]
#[case::ict_after_enablement(301, TimestampSearchBounds::ICTSearchStartingFrom(3))]
#[case::ict_far_future(1000, TimestampSearchBounds::ICTSearchStartingFrom(3))]
#[tokio::test]
async fn test_search_bounds_with_ict(
    #[case] timestamp: Timestamp,
    #[case] expected: TimestampSearchBounds,
) {
    let timestamps = [
        (50, None),
        (150, None),
        (250, None),
        (350, Some(300)),
        (450, Some(400)),
    ];
    let (_table, _engine, snapshot, log_segment) =
        build_test_snapshot(&timestamps, Some(3), None).await;
    let res = get_timestamp_search_bounds(&snapshot, &log_segment, timestamp).unwrap();
    assert_eq!(res, expected);
}
/// When ICT is enabled from the first commit, every timestamp must resolve to an ICT search
/// over the whole commit list.
#[rstest::rstest]
#[case::before_all(50)]
#[case::at_v0(100)]
#[case::between_v0_v1(150)]
#[case::after_all(1000)]
#[tokio::test]
async fn test_search_bounds_ict_from_creation(#[case] timestamp: Timestamp) {
    let timestamps = [(0, Some(100)), (0, Some(200)), (0, Some(300))];
    let (_table, _engine, snapshot, log_segment) =
        build_test_snapshot(&timestamps, Some(0), None).await;
    let res = get_timestamp_search_bounds(&snapshot, &log_segment, timestamp).unwrap();
    assert!(
        matches!(res, TimestampSearchBounds::ICTSearchStartingFrom(0)),
        "{res:?}"
    );
}
/// After the pre-ICT commit files (versions 0..3) are deleted, the first listed commit is the
/// ICT enablement commit, so the fast path must select an ICT search from index 0 for every
/// timestamp.
#[rstest::rstest]
#[case::before_all(50)]
#[case::at_ict_enablement(300)]
#[case::after_ict_enablement(350)]
#[case::after_all(1000)]
#[tokio::test]
async fn test_search_bounds_fast_path_pre_ict_commits_cleaned_up(#[case] timestamp: Timestamp) {
    let timestamps = [
        (50, None),
        (150, None),
        (250, None),
        (350, Some(300)),
        (450, Some(400)),
    ];
    let (table, engine, snapshot, _log_segment) =
        build_test_snapshot(&timestamps, Some(3), None).await;

    // Simulate log cleanup of the commits written before ICT enablement.
    let log_dir = table.table_root().join("_delta_log");
    for version in 0..3 {
        let file_name = format!("{:020}.json", version);
        std::fs::remove_file(log_dir.join(&file_name)).unwrap();
    }

    // Re-list the log so the segment reflects the deleted files.
    let cleaned_log_segment = LogSegment::for_timestamp_conversion(
        engine.storage_handler().as_ref(),
        snapshot.log_segment().log_root.clone(),
        snapshot.version(),
        None,
        vec![],
    )
    .unwrap();
    assert_eq!(
        cleaned_log_segment.listed.ascending_commit_files[0].version,
        3
    );

    let res = get_timestamp_search_bounds(&snapshot, &cleaned_log_segment, timestamp).unwrap();
    assert!(
        matches!(res, TimestampSearchBounds::ICTSearchStartingFrom(0)),
        "Expected fast path ICTSearchStartingFrom(0), got {res:?}"
    );
}
/// Checks the raw timestamp sources: file modification times on the listed commits, ICT read
/// errors for a commit without ICT and for a non-existent file, and successful ICT reads for
/// the commits written after enablement.
#[tokio::test]
async fn test_reading_commit_timestamps() {
    let timestamps = [
        (50, None),
        (150, None),
        (250, None),
        (350, Some(300)),
        (450, Some(400)),
    ];
    let (_table, engine, _snapshot, log_segment) =
        build_test_snapshot(&timestamps, Some(3), None).await;
    let commits = &log_segment.listed.ascending_commit_files;

    assert_eq!(commits[0].location.last_modified, 50);
    assert_eq!(commits[1].location.last_modified, 150);
    assert_eq!(commits[2].location.last_modified, 250);

    // A commit written before ICT enablement has no in-commit timestamp to read.
    let res = commits[0].read_in_commit_timestamp(&engine);
    assert!(res.is_err(), "Expected error for file without ICT: {res:?}");

    // Point a commit at a path that does not exist.
    let mut fake_log_path = commits[0].clone();
    let failing_path = if cfg!(windows) {
        "C:\\phony\\path"
    } else {
        "/phony/path"
    };
    fake_log_path.location.location = Url::from_file_path(failing_path).unwrap();
    let res = fake_log_path.read_in_commit_timestamp(&engine);
    assert!(
        res.is_err(),
        "Expected error for non-existent file: {res:?}"
    );

    assert_eq!(commits[3].read_in_commit_timestamp(&engine).unwrap(), 300);
    assert_eq!(commits[4].read_in_commit_timestamp(&engine).unwrap(), 400);
}
#[rstest::rstest]
#[case::before_all_lub(0, Bound::LeastUpper, Some(CommitAt::new(0, 50)))]
#[case::before_all_glb(0, Bound::GreatestLower, None)]
#[case::negative_lub(-1, Bound::LeastUpper, Some(CommitAt::new(0, 50)))]
#[case::after_all_lub(1000, Bound::LeastUpper, None)]
#[case::after_all_glb(1000, Bound::GreatestLower, Some(CommitAt::new(4, 400)))]
#[case::exact_v0_lub(50, Bound::LeastUpper, Some(CommitAt::new(0, 50)))]
#[case::exact_v0_glb(50, Bound::GreatestLower, Some(CommitAt::new(0, 50)))]
#[case::exact_v1_lub(150, Bound::LeastUpper, Some(CommitAt::new(1, 150)))]
#[case::exact_v1_glb(150, Bound::GreatestLower, Some(CommitAt::new(1, 150)))]
#[case::exact_v3_ict_lub(300, Bound::LeastUpper, Some(CommitAt::new(3, 300)))]
#[case::exact_v3_ict_glb(300, Bound::GreatestLower, Some(CommitAt::new(3, 300)))]
#[case::between_v0_v1_lub(100, Bound::LeastUpper, Some(CommitAt::new(1, 150)))]
#[case::between_v0_v1_glb(100, Bound::GreatestLower, Some(CommitAt::new(0, 50)))]
#[case::just_after_last_file_mod_lub(251, Bound::LeastUpper, Some(CommitAt::new(3, 300)))]
#[case::just_after_last_file_mod_glb(251, Bound::GreatestLower, Some(CommitAt::new(2, 250)))]
#[case::just_before_ict_lub(299, Bound::LeastUpper, Some(CommitAt::new(3, 300)))]
#[case::just_before_ict_glb(299, Bound::GreatestLower, Some(CommitAt::new(2, 250)))]
#[case::just_after_ict_glb(301, Bound::GreatestLower, Some(CommitAt::new(3, 300)))]
#[case::just_after_ict_lub(301, Bound::LeastUpper, Some(CommitAt::new(4, 400)))]
#[case::between_ict_commits(350, Bound::GreatestLower, Some(CommitAt::new(3, 300)))]
#[case::between_ict_commits_lub(350, Bound::LeastUpper, Some(CommitAt::new(4, 400)))]
#[tokio::test]
async fn test_timestamp_to_version_standard_table(
#[case] timestamp: Timestamp,
#[case] bound: Bound,
#[case] expected: Option<CommitAt>,
) {
let timestamps = [
(50, None),
(150, None),
(250, None),
(350, Some(300)),
(450, Some(400)),
];
let (_table, engine, snapshot, _log_segment) =
build_test_snapshot(×tamps, Some(3), None).await;
let res = timestamp_to_version(
&snapshot,
&engine,
timestamp,
bound,
HistoryCommitType::Published,
);
match expected {
Some(commit) => assert_eq!(res.unwrap(), commit),
None => assert!(
matches!(res, Err(LogHistoryError::TimestampOutOfRange { .. })),
"{res:?}"
),
}
}
/// Exercises `timestamp_to_version` on a table with ICT enabled from the first commit (ICTs
/// 100, 200, 300). `None` means an out-of-range error is expected.
#[rstest::rstest]
#[case::exact_v0_glb(100, Bound::GreatestLower, Some(CommitAt::new(0, 100)))]
#[case::exact_v0_lub(100, Bound::LeastUpper, Some(CommitAt::new(0, 100)))]
#[case::between_v0_v1_glb(150, Bound::GreatestLower, Some(CommitAt::new(0, 100)))]
#[case::between_v0_v1_lub(150, Bound::LeastUpper, Some(CommitAt::new(1, 200)))]
#[case::between_v1_v2_glb(250, Bound::GreatestLower, Some(CommitAt::new(1, 200)))]
#[case::between_v1_v2_lub(250, Bound::LeastUpper, Some(CommitAt::new(2, 300)))]
#[case::before_all_glb(50, Bound::GreatestLower, None)]
#[case::before_all_lub(50, Bound::LeastUpper, Some(CommitAt::new(0, 100)))]
#[tokio::test]
async fn test_ict_from_creation(
    #[case] timestamp: Timestamp,
    #[case] bound: Bound,
    #[case] expected: Option<CommitAt>,
) {
    let commit_times = [(0, Some(100)), (0, Some(200)), (0, Some(300))];
    let mock_table = mock_table_with_timestamps(&commit_times, Some(0)).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(mock_table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();

    let res = timestamp_to_version(
        &snapshot,
        &engine,
        timestamp,
        bound,
        HistoryCommitType::Published,
    );

    if let Some(commit) = expected {
        assert_eq!(res.unwrap(), commit);
    } else {
        assert!(
            matches!(res, Err(LogHistoryError::TimestampOutOfRange { .. })),
            "{res:?}"
        );
    }
}
/// Exercises `timestamp_to_version` on a table with a single commit (file modification time
/// 100, no ICT). `None` means an out-of-range error is expected.
#[rstest::rstest]
#[case::exact_glb(100, Bound::GreatestLower, Some(CommitAt::new(0, 100)))]
#[case::exact_lub(100, Bound::LeastUpper, Some(CommitAt::new(0, 100)))]
#[case::before_glb(50, Bound::GreatestLower, None)]
#[case::before_lub(50, Bound::LeastUpper, Some(CommitAt::new(0, 100)))]
#[case::after_glb(150, Bound::GreatestLower, Some(CommitAt::new(0, 100)))]
#[case::after_lub(150, Bound::LeastUpper, None)]
#[tokio::test]
async fn test_single_commit(
    #[case] timestamp: Timestamp,
    #[case] bound: Bound,
    #[case] expected: Option<CommitAt>,
) {
    let commit_times = [(100, None)];
    let mock_table = mock_table_with_timestamps(&commit_times, None).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(mock_table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();

    let res = timestamp_to_version(
        &snapshot,
        &engine,
        timestamp,
        bound,
        HistoryCommitType::Published,
    );

    if let Some(commit) = expected {
        assert_eq!(res.unwrap(), commit);
    } else {
        assert!(
            matches!(res, Err(LogHistoryError::TimestampOutOfRange { .. })),
            "{res:?}"
        );
    }
}
/// Exercises `timestamp_to_version` on a table whose file modification times go backwards
/// (100, 200, 150, 180, 300). The expected values show versions 2 and 3 being treated as 201
/// and 202.
#[rstest::rstest]
#[case::exact_v0_glb(100, Bound::GreatestLower, Some(CommitAt::new(0, 100)))]
#[case::exact_v1_glb(200, Bound::GreatestLower, Some(CommitAt::new(1, 200)))]
#[case::monotonized_v2_glb(201, Bound::GreatestLower, Some(CommitAt::new(2, 201)))]
#[case::monotonized_v3_glb(202, Bound::GreatestLower, Some(CommitAt::new(3, 202)))]
#[case::between_v3_v4_glb(250, Bound::GreatestLower, Some(CommitAt::new(3, 202)))]
#[case::exact_v4_glb(300, Bound::GreatestLower, Some(CommitAt::new(4, 300)))]
#[case::raw_v2_maps_to_v1_lub(150, Bound::LeastUpper, Some(CommitAt::new(1, 200)))]
#[case::monotonized_v2_lub(201, Bound::LeastUpper, Some(CommitAt::new(2, 201)))]
#[case::after_v3_monotonized_lub(203, Bound::LeastUpper, Some(CommitAt::new(4, 300)))]
#[tokio::test]
async fn test_non_monotonic_timestamps(
    #[case] timestamp: Timestamp,
    #[case] bound: Bound,
    #[case] expected: Option<CommitAt>,
) {
    let commit_times = [
        (100, None),
        (200, None),
        (150, None),
        (180, None),
        (300, None),
    ];
    let mock_table = mock_table_with_timestamps(&commit_times, None).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(mock_table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();

    let res = timestamp_to_version(
        &snapshot,
        &engine,
        timestamp,
        bound,
        HistoryCommitType::Published,
    );

    if let Some(commit) = expected {
        assert_eq!(res.unwrap(), commit);
    } else {
        assert!(
            matches!(res, Err(LogHistoryError::TimestampOutOfRange { .. })),
            "{res:?}"
        );
    }
}
/// Checks the `nearest_timestamp` hint carried by out-of-range errors for the linear search,
/// the snapshot-timestamp short circuit, and the ICT binary search.
#[rstest::rstest]
#[case::linear_glb_below_earliest(
    &[(50, None), (150, None), (250, None), (350, Some(300)), (450, Some(400))],
    Some(3),
    0,
    Bound::GreatestLower,
    NearestTimestamp::Earliest(50),
)]
#[case::short_circuit_lub_after_snapshot(
    &[(50, None), (150, None), (250, None), (350, Some(300)), (450, Some(400))],
    Some(3),
    1000,
    Bound::LeastUpper,
    NearestTimestamp::Latest(400),
)]
#[case::ict_glb_below_earliest(
    &[(0, Some(100)), (0, Some(200)), (0, Some(300))],
    Some(0),
    50,
    Bound::GreatestLower,
    NearestTimestamp::Earliest(100),
)]
#[tokio::test]
async fn test_timestamp_out_of_range_nearest_timestamp(
    #[case] timestamps: &[(Timestamp, Option<Timestamp>)],
    #[case] ict_enablement: Option<Version>,
    #[case] timestamp: Timestamp,
    #[case] bound: Bound,
    #[case] expected_nearest: NearestTimestamp,
) {
    let (_table, engine, snapshot, _log_segment) =
        build_test_snapshot(timestamps, ict_enablement, None).await;
    let outcome = timestamp_to_version(
        &snapshot,
        &engine,
        timestamp,
        bound,
        HistoryCommitType::Published,
    );

    match outcome.unwrap_err() {
        LogHistoryError::TimestampOutOfRange {
            nearest_timestamp, ..
        } => assert_eq!(nearest_timestamp, expected_nearest),
        other => panic!("expected TimestampOutOfRange, got {other:?}"),
    }
}
/// A commit written after ICT enablement without an in-commit timestamp must surface as an
/// internal error when the search needs to read it.
#[tokio::test]
async fn test_missing_ict_after_enablement() {
    let timestamps = [
        (50, None),
        (150, None),
        (250, None),
        (350, Some(300)),
        // Version 4 comes after ICT enablement but carries no ICT.
        (450, None),
    ];
    let table = mock_table_with_timestamps(&timestamps, Some(3)).await;
    let engine = SyncEngine::new();
    let path = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(path).build(&engine).unwrap();
    let res = timestamp_to_version(
        &snapshot,
        &engine,
        350,
        Bound::LeastUpper,
        HistoryCommitType::Published,
    );
    assert!(
        matches!(res, Err(LogHistoryError::Internal { .. })),
        "Expected internal error for missing ICT: {res:?}"
    );
}
/// Exercises `latest_version_as_of` across the file-modification and ICT regions of a table
/// with ICT enabled at version 3. `None` means an error is expected.
#[rstest::rstest]
#[case::before_all(0, None)]
#[case::exact_v0(50, Some(CommitAt::new(0, 50)))]
#[case::between_v0_v1(100, Some(CommitAt::new(0, 50)))]
#[case::exact_v1(150, Some(CommitAt::new(1, 150)))]
#[case::just_after_last_file_mod(251, Some(CommitAt::new(2, 250)))]
#[case::just_before_ict(299, Some(CommitAt::new(2, 250)))]
#[case::exact_ict_enablement(300, Some(CommitAt::new(3, 300)))]
#[case::just_after_ict(301, Some(CommitAt::new(3, 300)))]
#[case::between_ict_commits(350, Some(CommitAt::new(3, 300)))]
#[case::after_all(1000, Some(CommitAt::new(4, 400)))]
#[tokio::test]
async fn test_latest_version_as_of(
    #[case] timestamp: Timestamp,
    #[case] expected: Option<CommitAt>,
) {
    let timestamps = [
        (50, None),
        (150, None),
        (250, None),
        (350, Some(300)),
        (450, Some(400)),
    ];
    let table = mock_table_with_timestamps(&timestamps, Some(3)).await;
    let engine = SyncEngine::new();
    let path = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(path).build(&engine).unwrap();
    let res = latest_version_as_of(&snapshot, &engine, timestamp, HistoryCommitType::Published);
    match expected {
        Some(commit) => assert_eq!(res.unwrap(), commit),
        None => assert!(res.is_err(), "{res:?}"),
    }
}
/// Exercises `first_version_after` across the file-modification and ICT regions of a table
/// with ICT enabled at version 3. `None` means an error is expected.
#[rstest::rstest]
#[case::before_all(0, Some(CommitAt::new(0, 50)))]
#[case::exact_v0(50, Some(CommitAt::new(0, 50)))]
#[case::between_v0_v1(100, Some(CommitAt::new(1, 150)))]
#[case::exact_v1(150, Some(CommitAt::new(1, 150)))]
#[case::just_after_last_file_mod(251, Some(CommitAt::new(3, 300)))]
#[case::just_before_ict(299, Some(CommitAt::new(3, 300)))]
#[case::exact_ict_enablement(300, Some(CommitAt::new(3, 300)))]
#[case::just_after_ict(301, Some(CommitAt::new(4, 400)))]
#[case::between_ict_commits(350, Some(CommitAt::new(4, 400)))]
#[case::after_all(1000, None)]
#[tokio::test]
async fn test_first_version_after(
    #[case] timestamp: Timestamp,
    #[case] expected: Option<CommitAt>,
) {
    let timestamps = [
        (50, None),
        (150, None),
        (250, None),
        (350, Some(300)),
        (450, Some(400)),
    ];
    let table = mock_table_with_timestamps(&timestamps, Some(3)).await;
    let engine = SyncEngine::new();
    let path = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(path).build(&engine).unwrap();
    let res = first_version_after(&snapshot, &engine, timestamp, HistoryCommitType::Published);
    match expected {
        Some(commit) => assert_eq!(res.unwrap(), commit),
        None => assert!(res.is_err(), "{res:?}"),
    }
}
/// Checks `timestamp_range_to_versions` on the five-commit mixed fixture for a variety of
/// `(start, end)` timestamp pairs.
///
/// `expected` is `Ok((start_version, end_version))` when the conversion must succeed and
/// `Err(())` when any error is acceptable.
///
/// NOTE(review): each fixture tuple is presumably `(file modification time, optional in-commit
/// timestamp)` and `Some(3)` the in-commit-timestamp enablement version -- confirm against
/// `mock_table_with_timestamps`.
#[rstest::rstest]
#[case::basic_range(50, Some(300), Ok((0, Some(3))))]
#[case::no_end(100, None, Ok((1, None)))]
#[case::start_equals_end(150, Some(150), Ok((1, Some(1))))]
#[case::spanning_ict_boundary(100, Some(350), Ok((1, Some(3))))]
#[case::entire_table(0, Some(1000), Ok((0, Some(4))))]
#[case::exact_v0_to_v1(50, Some(150), Ok((0, Some(1))))]
#[case::exact_v1_to_v2(150, Some(250), Ok((1, Some(2))))]
#[case::exact_v3_ict_to_v4(300, Some(400), Ok((3, Some(4))))]
#[case::between_v0_v1_to_exact_v2(100, Some(250), Ok((1, Some(2))))]
#[case::just_after_last_file_mod_no_end(251, None, Ok((3, None)))]
#[case::just_before_ict_to_just_after(299, Some(301), Ok((3, Some(3))))]
#[case::exact_ict_to_after_all(300, Some(1000), Ok((3, Some(4))))]
#[case::between_ict_commits(350, Some(400), Ok((4, Some(4))))]
#[case::just_after_ict_to_end(301, Some(1000), Ok((4, Some(4))))]
#[case::end_before_all_fails(0, Some(40), Err(()))]
#[case::start_after_all_fails(500, Some(1000), Err(()))]
#[tokio::test]
async fn test_timestamp_range_to_versions(
    #[case] start: Timestamp,
    #[case] end: Option<Timestamp>,
    #[case] expected: Result<(Version, Option<Version>), ()>,
) {
    let timestamps = [
        (50, None),
        (150, None),
        (250, None),
        (350, Some(300)),
        (450, Some(400)),
    ];
    // Pass the fixture by reference (restores the mis-encoded `&timestamps` argument).
    let table = mock_table_with_timestamps(&timestamps, Some(3)).await;
    let engine = SyncEngine::new();
    let path = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(path).build(&engine).unwrap();
    let res = timestamp_range_to_versions(&snapshot, &engine, start, end);
    match expected {
        Ok(v) => assert_eq!(res.unwrap(), v),
        Err(()) => assert!(res.is_err(), "{res:?}"),
    }
}
/// A range whose start timestamp (300) is later than its end timestamp (100) must be rejected
/// with `LogHistoryError::InvalidTimestampRange`.
#[tokio::test]
async fn test_timestamp_range_invalid_range() {
    let timestamps = [
        (50, None),
        (150, None),
        (250, None),
        (350, Some(300)),
        (450, Some(400)),
    ];
    // Pass the fixture by reference (restores the mis-encoded `&timestamps` argument).
    let table = mock_table_with_timestamps(&timestamps, Some(3)).await;
    let engine = SyncEngine::new();
    let path = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(path).build(&engine).unwrap();
    let res = timestamp_range_to_versions(&snapshot, &engine, 300, Some(100));
    assert!(
        matches!(
            res,
            Err(crate::Error::LogHistory(ref e))
                if matches!(**e, LogHistoryError::InvalidTimestampRange { .. })
        ),
        "{res:?}"
    );
}
/// A well-ordered range (51..=149) that falls strictly between the timestamps of versions 0
/// (50) and 1 (150) must fail with `LogHistoryError::EmptyTimestampRange` reporting
/// `between_version: 0`.
#[tokio::test]
async fn test_timestamp_range_empty_range() {
    let timestamps = [
        (50, None),
        (150, None),
        (250, None),
        (350, Some(300)),
        (450, Some(400)),
    ];
    // Pass the fixture by reference (restores the mis-encoded `&timestamps` argument).
    let table = mock_table_with_timestamps(&timestamps, Some(3)).await;
    let engine = SyncEngine::new();
    let path = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(path).build(&engine).unwrap();
    let res = timestamp_range_to_versions(&snapshot, &engine, 51, Some(149));
    assert!(
        matches!(
            res,
            Err(crate::Error::LogHistory(ref e))
                if matches!(**e, LogHistoryError::EmptyTimestampRange { between_version: 0, .. })
        ),
        "{res:?}"
    );
}
/// Runs `latest_version_as_of` against a three-commit table in which every fixture tuple has
/// `None` in its second slot and no enablement version is supplied.
///
/// When `expected` is `None` the call has to fail; otherwise it has to return that commit.
#[rstest::rstest]
#[case::exact_match(200, Some(CommitAt::new(1, 200)))]
#[case::between_commits(150, Some(CommitAt::new(0, 100)))]
#[case::after_all(500, Some(CommitAt::new(2, 300)))]
#[case::before_all(50, None)]
#[tokio::test]
async fn test_latest_version_as_of_file_mod_only(
    #[case] timestamp: Timestamp,
    #[case] expected: Option<CommitAt>,
) {
    let commits = [(100, None), (200, None), (300, None)];
    let table = mock_table_with_timestamps(&commits, None).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();
    let res = latest_version_as_of(&snapshot, &engine, timestamp, HistoryCommitType::Published);
    if let Some(commit) = expected {
        assert_eq!(res.unwrap(), commit);
    } else {
        assert!(res.is_err(), "{res:?}");
    }
}
/// Runs `first_version_after` against a three-commit table in which every fixture tuple has
/// `None` in its second slot and no enablement version is supplied.
///
/// When `expected` is `None` the call has to fail; otherwise it has to return that commit.
#[rstest::rstest]
#[case::exact_match(200, Some(CommitAt::new(1, 200)))]
#[case::between_commits(150, Some(CommitAt::new(1, 200)))]
#[case::before_all(50, Some(CommitAt::new(0, 100)))]
#[case::after_all(500, None)]
#[tokio::test]
async fn test_first_version_after_file_mod_only(
    #[case] timestamp: Timestamp,
    #[case] expected: Option<CommitAt>,
) {
    let commits = [(100, None), (200, None), (300, None)];
    let table = mock_table_with_timestamps(&commits, None).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();
    let res = first_version_after(&snapshot, &engine, timestamp, HistoryCommitType::Published);
    if let Some(commit) = expected {
        assert_eq!(res.unwrap(), commit);
    } else {
        assert!(res.is_err(), "{res:?}");
    }
}
/// Runs `latest_version_as_of` against a three-commit table where every fixture tuple is
/// `(0, Some(ts))` and the enablement argument is `Some(0)`.
///
/// The expected commits carry the `Some(ts)` values (100/200/300), not the leading zeros.
/// When `expected` is `None` the call has to fail.
#[rstest::rstest]
#[case::exact_match(200, Some(CommitAt::new(1, 200)))]
#[case::between_commits(150, Some(CommitAt::new(0, 100)))]
#[case::after_all(500, Some(CommitAt::new(2, 300)))]
#[case::before_all(50, None)]
#[tokio::test]
async fn test_latest_version_as_of_ict_from_creation(
    #[case] timestamp: Timestamp,
    #[case] expected: Option<CommitAt>,
) {
    let commits = [(0, Some(100)), (0, Some(200)), (0, Some(300))];
    let table = mock_table_with_timestamps(&commits, Some(0)).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();
    let res = latest_version_as_of(&snapshot, &engine, timestamp, HistoryCommitType::Published);
    if let Some(commit) = expected {
        assert_eq!(res.unwrap(), commit);
    } else {
        assert!(res.is_err(), "{res:?}");
    }
}
/// Runs `first_version_after` against a three-commit table where every fixture tuple is
/// `(0, Some(ts))` and the enablement argument is `Some(0)`.
///
/// The expected commits carry the `Some(ts)` values (100/200/300), not the leading zeros.
/// When `expected` is `None` the call has to fail.
#[rstest::rstest]
#[case::exact_match(200, Some(CommitAt::new(1, 200)))]
#[case::between_commits(150, Some(CommitAt::new(1, 200)))]
#[case::before_all(50, Some(CommitAt::new(0, 100)))]
#[case::after_all(500, None)]
#[tokio::test]
async fn test_first_version_after_ict_from_creation(
    #[case] timestamp: Timestamp,
    #[case] expected: Option<CommitAt>,
) {
    let commits = [(0, Some(100)), (0, Some(200)), (0, Some(300))];
    let table = mock_table_with_timestamps(&commits, Some(0)).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();
    let res = first_version_after(&snapshot, &engine, timestamp, HistoryCommitType::Published);
    if let Some(commit) = expected {
        assert_eq!(res.unwrap(), commit);
    } else {
        assert!(res.is_err(), "{res:?}");
    }
}
/// Runs `latest_version_as_of` against a table whose raw fixture timestamps go backwards:
/// versions 2 and 3 (150, 180) are older than version 1 (200).
///
/// The cases expect versions 2 and 3 to be reported at 201 and 202, and a lookup at the raw
/// value 150 to resolve to version 0. When `expected` is `None` the call has to fail.
#[rstest::rstest]
#[case::exact_v1(200, Some(CommitAt::new(1, 200)))]
#[case::monotonized_v2(201, Some(CommitAt::new(2, 201)))]
#[case::monotonized_v3(202, Some(CommitAt::new(3, 202)))]
#[case::between_v3_v4(250, Some(CommitAt::new(3, 202)))]
#[case::raw_v2_between_v0_v1(150, Some(CommitAt::new(0, 100)))]
#[tokio::test]
async fn test_latest_version_as_of_non_monotonic(
    #[case] timestamp: Timestamp,
    #[case] expected: Option<CommitAt>,
) {
    // Index in this array == commit version.
    let commits = [
        (100, None),
        (200, None),
        (150, None),
        (180, None),
        (300, None),
    ];
    let table = mock_table_with_timestamps(&commits, None).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();
    let res = latest_version_as_of(&snapshot, &engine, timestamp, HistoryCommitType::Published);
    if let Some(commit) = expected {
        assert_eq!(res.unwrap(), commit);
    } else {
        assert!(res.is_err(), "{res:?}");
    }
}
/// Runs `first_version_after` against a table whose raw fixture timestamps go backwards:
/// versions 2 and 3 (150, 180) are older than version 1 (200).
///
/// The cases expect versions 2 and 3 to be reported at 201 and 202, and a lookup at the raw
/// value 150 to resolve to version 1. When `expected` is `None` the call has to fail.
#[rstest::rstest]
#[case::exact_v1(200, Some(CommitAt::new(1, 200)))]
#[case::monotonized_v2(201, Some(CommitAt::new(2, 201)))]
#[case::monotonized_v3(202, Some(CommitAt::new(3, 202)))]
#[case::after_v3_monotonized(203, Some(CommitAt::new(4, 300)))]
#[case::raw_v2_maps_to_v1(150, Some(CommitAt::new(1, 200)))]
#[tokio::test]
async fn test_first_version_after_non_monotonic(
    #[case] timestamp: Timestamp,
    #[case] expected: Option<CommitAt>,
) {
    // Index in this array == commit version.
    let commits = [
        (100, None),
        (200, None),
        (150, None),
        (180, None),
        (300, None),
    ];
    let table = mock_table_with_timestamps(&commits, None).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();
    let res = first_version_after(&snapshot, &engine, timestamp, HistoryCommitType::Published);
    if let Some(commit) = expected {
        assert_eq!(res.unwrap(), commit);
    } else {
        assert!(res.is_err(), "{res:?}");
    }
}
/// Builds a mock table with one commit per entry of `timestamps`, snapshots it, and then
/// rewrites the log directory to look like it has been cleaned up.
///
/// Steps, in order:
/// 1. every timestamp is paired with `None` and handed to `mock_table_with_timestamps`;
/// 2. a snapshot is built while all commit files still exist;
/// 3. a one-byte placeholder named `{checkpoint_version:020}.checkpoint.parquet` is written
///    into `_delta_log`;
/// 4. commit files `0..=log_cleanup_until_version` are deleted.
///
/// Returns the table handle (callers keep it bound, presumably so the backing directory
/// outlives the test -- confirm against `LocalMockTable`), the engine, and the snapshot.
///
/// # Panics
/// Panics if any filesystem operation or the snapshot build fails.
async fn build_mock_table_with_log_cleanup(
    timestamps: &[Timestamp],
    checkpoint_version: Version,
    log_cleanup_until_version: Version,
) -> (LocalMockTable, SyncEngine, Arc<Snapshot>) {
    let timestamps: Vec<_> = timestamps.iter().map(|&ts| (ts, None)).collect();
    // Pass the fixture by reference (restores the mis-encoded `&timestamps` argument).
    let table = mock_table_with_timestamps(&timestamps, None).await;
    let engine = SyncEngine::new();
    let path = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(path).build(&engine).unwrap();
    let log_dir = table.table_root().join("_delta_log");
    // The checkpoint body is never parsed here; only a non-empty file with the right name is
    // created.
    std::fs::write(
        log_dir.join(format!("{checkpoint_version:020}.checkpoint.parquet")),
        b"x",
    )
    .unwrap();
    for version in 0..=log_cleanup_until_version {
        remove_file(log_dir.join(format!("{version:020}.json"))).unwrap();
    }
    (table, engine, snapshot)
}
/// Runs `timestamp_to_version` with both `HistoryCommitType` variants and both search bounds
/// on a log where commit 0 has been deleted and a checkpoint file sits at version 3.
///
/// Commits 1 and 2 are still on disk below the checkpoint (the case names call them
/// orphans). When `expected` is `None` the call must fail specifically with
/// `LogHistoryError::TimestampOutOfRange`.
#[rstest::rstest]
#[case::published_glb_orphan_v1(
    HistoryCommitType::Published,
    15,
    Bound::GreatestLower,
    Some(CommitAt::new(1, 10))
)]
#[case::recreatable_glb_orphan_v1(
    HistoryCommitType::Recreatable,
    15,
    Bound::GreatestLower,
    None
)]
#[case::published_glb_orphan_v2(
    HistoryCommitType::Published,
    25,
    Bound::GreatestLower,
    Some(CommitAt::new(2, 20))
)]
#[case::recreatable_glb_orphan_v2(
    HistoryCommitType::Recreatable,
    25,
    Bound::GreatestLower,
    None
)]
#[case::published_lub_orphan(
    HistoryCommitType::Published,
    15,
    Bound::LeastUpper,
    Some(CommitAt::new(2, 20))
)]
#[case::recreatable_lub_anchor(
    HistoryCommitType::Recreatable,
    15,
    Bound::LeastUpper,
    Some(CommitAt::new(3, 30))
)]
#[tokio::test]
async fn test_timestamp_to_version_with_commit_type(
    #[case] commit_type: HistoryCommitType,
    #[case] timestamp: Timestamp,
    #[case] bound: Bound,
    #[case] expected: Option<CommitAt>,
) {
    // One commit per entry; checkpoint at version 3; commit 0 removed.
    let commit_times = [5, 10, 20, 30, 40, 50];
    let (_table, engine, snapshot) =
        build_mock_table_with_log_cleanup(&commit_times, 3, 0).await;
    let res = timestamp_to_version(&snapshot, &engine, timestamp, bound, commit_type);
    if let Some(commit) = expected {
        assert_eq!(res.unwrap(), commit);
    } else {
        let out_of_range = matches!(res, Err(LogHistoryError::TimestampOutOfRange { .. }));
        assert!(out_of_range, "{res:?}");
    }
}
/// With no log cleanup applied to the fixture, `timestamp_to_version` must give the same
/// answer for `HistoryCommitType::Published` and `HistoryCommitType::Recreatable`.
///
/// Every combination of the listed timestamps and both bounds is tried. Two `Ok` results must
/// be equal; two errors are accepted without comparing them; one `Ok` and one `Err` is a
/// failure.
#[rstest::rstest]
#[tokio::test]
async fn test_timestamp_to_version_recreatable_matches_published_when_v0_present(
    #[values(0, 50, 100, 250, 300, 350, 1000)] timestamp: Timestamp,
    #[values(Bound::GreatestLower, Bound::LeastUpper)] bound: Bound,
) {
    let timestamps = [
        (50, None),
        (150, None),
        (250, None),
        (350, Some(300)),
        (450, Some(400)),
    ];
    // Pass the fixture by reference (restores the mis-encoded `&timestamps` argument).
    let (_table, engine, snapshot, _log_segment) =
        build_test_snapshot(&timestamps, Some(3), None).await;
    let published = timestamp_to_version(
        &snapshot,
        &engine,
        timestamp,
        bound,
        HistoryCommitType::Published,
    );
    let recreatable = timestamp_to_version(
        &snapshot,
        &engine,
        timestamp,
        bound,
        HistoryCommitType::Recreatable,
    );
    match (published, recreatable) {
        (Ok(p), Ok(r)) => assert_eq!(p, r, "diverged at ts={timestamp} bound={bound:?}"),
        (Err(_), Err(_)) => {}
        (p, r) => {
            panic!("Published and Recreatable diverged at ts={timestamp} bound={bound:?}: {p:?} vs {r:?}")
        }
    }
}
/// Runs `latest_version_as_of` with each `HistoryCommitType` on a log where commit 0 has been
/// deleted and a checkpoint file sits at version 3.
///
/// At timestamp 25 the `Published` lookup is expected to return version 2, while the
/// `Recreatable` lookup is expected to fail.
#[rstest::rstest]
#[case::published_sees_orphan(HistoryCommitType::Published, 25, Some(CommitAt::new(2, 20)))]
#[case::recreatable_excludes_orphan(HistoryCommitType::Recreatable, 25, None)]
#[tokio::test]
async fn test_latest_version_as_of_with_commit_type(
    #[case] commit_type: HistoryCommitType,
    #[case] timestamp: Timestamp,
    #[case] expected: Option<CommitAt>,
) {
    // One commit per entry; checkpoint at version 3; commit 0 removed.
    let commit_times = [5, 10, 20, 30, 40, 50];
    let (_table, engine, snapshot) =
        build_mock_table_with_log_cleanup(&commit_times, 3, 0).await;
    let res = latest_version_as_of(&snapshot, &engine, timestamp, commit_type);
    if let Some(commit) = expected {
        assert_eq!(res.unwrap(), commit);
    } else {
        assert!(res.is_err(), "{res:?}");
    }
}
/// Runs `first_version_after` with each `HistoryCommitType` on a log where commit 0 has been
/// deleted and a checkpoint file sits at version 3.
///
/// At timestamp 15 the `Published` lookup is expected to return version 2 and the
/// `Recreatable` lookup version 3; both calls must succeed.
#[rstest::rstest]
#[case::published_sees_orphan(HistoryCommitType::Published, 15, CommitAt::new(2, 20))]
#[case::recreatable_skips_to_anchor(HistoryCommitType::Recreatable, 15, CommitAt::new(3, 30))]
#[tokio::test]
async fn test_first_version_after_with_commit_type(
    #[case] commit_type: HistoryCommitType,
    #[case] timestamp: Timestamp,
    #[case] expected: CommitAt,
) {
    // One commit per entry; checkpoint at version 3; commit 0 removed.
    let commit_times = [5, 10, 20, 30, 40, 50];
    let (_table, engine, snapshot) =
        build_mock_table_with_log_cleanup(&commit_times, 3, 0).await;
    let found = first_version_after(&snapshot, &engine, timestamp, commit_type);
    assert_eq!(found.unwrap(), expected);
}
/// Runs `timestamp_range_to_versions` against a three-commit table in which every fixture
/// tuple has `None` in its second slot and no enablement version is supplied.
///
/// `expected` is `Ok((start_version, end_version))` when the conversion must succeed and
/// `Err(())` when any error is acceptable.
#[rstest::rstest]
#[case::basic_range(100, Some(200), Ok((0, Some(1))))]
#[case::entire_table(50, Some(500), Ok((0, Some(2))))]
#[case::no_end(150, None, Ok((1, None)))]
#[case::exact_v0_to_v2(100, Some(300), Ok((0, Some(2))))]
#[case::between_commits(150, Some(250), Ok((1, Some(1))))]
#[case::start_equals_end_exact(200, Some(200), Ok((1, Some(1))))]
#[case::end_before_all_fails(50, Some(80), Err(()))]
#[case::start_after_all_fails(400, Some(500), Err(()))]
#[tokio::test]
async fn test_timestamp_range_file_mod_only(
    #[case] start: Timestamp,
    #[case] end: Option<Timestamp>,
    #[case] expected: Result<(Version, Option<Version>), ()>,
) {
    let commits = [(100, None), (200, None), (300, None)];
    let table = mock_table_with_timestamps(&commits, None).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();
    let res = timestamp_range_to_versions(&snapshot, &engine, start, end);
    if let Ok(versions) = expected {
        assert_eq!(res.unwrap(), versions);
    } else {
        assert!(res.is_err(), "{res:?}");
    }
}
/// Runs `timestamp_range_to_versions` against a three-commit table where every fixture tuple
/// is `(0, Some(ts))` and the enablement argument is `Some(0)`.
///
/// The case table is the same as for the file-modification-only variant of this test.
/// `expected` is `Ok((start_version, end_version))` when the conversion must succeed and
/// `Err(())` when any error is acceptable.
#[rstest::rstest]
#[case::basic_range(100, Some(200), Ok((0, Some(1))))]
#[case::entire_table(50, Some(500), Ok((0, Some(2))))]
#[case::no_end(150, None, Ok((1, None)))]
#[case::exact_v0_to_v2(100, Some(300), Ok((0, Some(2))))]
#[case::between_commits(150, Some(250), Ok((1, Some(1))))]
#[case::start_equals_end_exact(200, Some(200), Ok((1, Some(1))))]
#[case::end_before_all_fails(50, Some(80), Err(()))]
#[case::start_after_all_fails(400, Some(500), Err(()))]
#[tokio::test]
async fn test_timestamp_range_ict_from_creation(
    #[case] start: Timestamp,
    #[case] end: Option<Timestamp>,
    #[case] expected: Result<(Version, Option<Version>), ()>,
) {
    let commits = [(0, Some(100)), (0, Some(200)), (0, Some(300))];
    let table = mock_table_with_timestamps(&commits, Some(0)).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();
    let res = timestamp_range_to_versions(&snapshot, &engine, start, end);
    if let Ok(versions) = expected {
        assert_eq!(res.unwrap(), versions);
    } else {
        assert!(res.is_err(), "{res:?}");
    }
}
/// Creates a `SyncEngine` backed by a fresh `InMemory` store and writes one object at
/// `memory:///{path}` for every entry of `paths`.
///
/// Entries that also appear in `empty_paths` are written with a zero-length body; all others
/// get the single byte `x`. Returns the engine together with the URL `memory:///_delta_log/`.
///
/// # Panics
/// Panics if a URL fails to parse or a write to the store fails.
fn engine_with_log_files(paths: &[String], empty_paths: &HashSet<String>) -> (SyncEngine, Url) {
    let engine = SyncEngine::new_with_store(Arc::new(InMemory::new()));
    let storage = engine.storage_handler();
    for path in paths {
        let is_empty = empty_paths.contains(path);
        let contents = if is_empty {
            Bytes::new()
        } else {
            Bytes::from_static(b"x")
        };
        let location = Url::parse(&format!("memory:///{path}")).unwrap();
        storage
            .put(&location, contents, false)
            .expect("put log file");
    }
    let log_root = Url::parse("memory:///_delta_log/").unwrap();
    (engine, log_root)
}
/// Returns, for each version in `version_range` (ascending), the string form of
/// `delta_path_for_version(version, "json")`.
fn commit_path(version_range: RangeInclusive<Version>) -> Vec<String> {
    let mut commit_files = Vec::new();
    for version in version_range {
        commit_files.push(delta_path_for_version(version, "json").to_string());
    }
    commit_files
}
/// Returns `_delta_log/<v>.checkpoint.parquet`, with `v` zero-padded to 20 digits.
fn single_part_checkpoint_path(v: Version) -> String {
    let file_name = format!("{v:020}.checkpoint.parquet");
    format!("_delta_log/{file_name}")
}
/// Returns `_delta_log/<v>.checkpoint.<uuid>.parquet`, with `v` zero-padded to 20 digits and
/// `<uuid>` a freshly generated `Uuid::new_v4()`, so every call yields a different path.
fn uuid_checkpoint_path(v: Version) -> String {
    let uuid = Uuid::new_v4();
    format!("_delta_log/{v:020}.checkpoint.{uuid}.parquet")
}
/// Returns the `num_parts` file paths of a multi-part checkpoint at version `v`, in part
/// order starting from part 1.
///
/// Each path is `_delta_log/<v>.checkpoint.<part>.<num_parts>.parquet`, with `v` zero-padded
/// to 20 digits and both part fields to 10. `num_parts == 0` yields an empty vector.
fn multi_part_checkpoint_paths(v: Version, num_parts: u32) -> Vec<String> {
    let mut part_files = Vec::new();
    for part in 1..=num_parts {
        part_files.push(format!(
            "_delta_log/{v:020}.checkpoint.{part:010}.{num_parts:010}.parquet"
        ));
    }
    part_files
}
/// Returns `checkpoint_paths` followed by the commit file paths for `commit_range`, in that
/// order.
fn truncated_log(
    checkpoint_paths: Vec<String>,
    commit_range: RangeInclusive<Version>,
) -> Vec<String> {
    checkpoint_paths
        .into_iter()
        .chain(commit_path(commit_range))
        .collect()
}
#[rstest::rstest]
#[case::v0_exists(commit_path(0..=5), None, Expected::Version(0))]
#[case::truncated_single_part(truncated_log(vec![single_part_checkpoint_path(5)], 5..=9), None, Expected::Version(5))]
#[case::truncated_uuid(truncated_log(vec![uuid_checkpoint_path(5)], 5..=9), None, Expected::Version(5))]
#[case::truncated_multi_part(truncated_log(multi_part_checkpoint_paths(5, 3), 5..=9), None, Expected::Version(5))]
#[case::ratified_zero_with_v0_present(commit_path(0..=3), Some(0), Expected::Version(0))]
#[case::non_zero_ratified_falls_through_to_scan(commit_path(0..=3), Some(4), Expected::Version(0))]
#[case::commit_cleanup_before_checkpoint(
{
let mut p = commit_path(2..=4);
p.push(uuid_checkpoint_path(4));
p
},
None,
Expected::Version(4)
)]
#[case::picks_first_anchored_checkpoint(
{
let mut p = vec![single_part_checkpoint_path(3)];
p.extend(multi_part_checkpoint_paths(7, 2));
p.extend(commit_path(7..=8));
p
},
None,
Expected::Version(7),
)]
#[case::listing_with_multiple_full_checkpoints(
{
let mut p = vec![single_part_checkpoint_path(3)];
p.extend(commit_path(3..=7));
p.push(single_part_checkpoint_path(7));
p
},
None,
Expected::Version(3),
)]
#[case::listing_with_cleanup_checkpoints(
{
let mut p = multi_part_checkpoint_paths(3, 3);
p.remove(0);
p.extend(commit_path(3..=7));
p.push(single_part_checkpoint_path(7));
p
},
None,
Expected::Version(7),
)]
#[case::listing_with_v0_and_full_checkpoint(
{
let mut p = commit_path(0..=5);
p.push(single_part_checkpoint_path(5));
p
},
None,
Expected::Version(0),
)]
#[case::checkpoint_before_smallest_commit_anchors(
{
let mut p = vec![single_part_checkpoint_path(5)];
p.extend(commit_path(6..=8));
p
},
None,
Expected::NoRecreatableCommit,
)]
#[case::empty_log(vec![], None, Expected::NoCommitsFound)]
#[case::crc_only(vec![format!("_delta_log/{:020}.crc", 5)], None, Expected::NoCommitsFound)]
#[case::compacted_only(vec![format!("_delta_log/{:020}.{:020}.compacted.json", 0, 5)], None, Expected::NoCommitsFound)]
#[case::checkpoint_only(vec![single_part_checkpoint_path(5)], None, Expected::NoCommitsFound)]
#[case::non_zero_ratified_on_empty_log(vec![], Some(5), Expected::NoCommitsFound)]
#[case::ratified_zero_on_empty_log(vec![], Some(0), Expected::CCv2MissingV0FilesystemCommit)]
#[case::commits_have_no_anchor(commit_path(2..=3), None, Expected::NoRecreatableCommit)]
#[case::ratified_zero_commits_no_anchor(commit_path(2..=3), Some(0), Expected::NoRecreatableCommit)]
#[case::staged_commits_and_last_checkpoint_ignored(
{
let mut p = truncated_log(vec![single_part_checkpoint_path(5)], 5..=9);
p.push("_delta_log/_last_checkpoint".to_string());
p.push(format!("_delta_log/_staged_commits/{:020}.{}.json", 9, Uuid::new_v4()));
p
},
None,
Expected::Version(5)
)]
fn earliest_recreatable_returns_expected(
#[case] paths: Vec<String>,
#[case] ratified: Option<Version>,
#[case] expected: Expected,
) {
let (engine, log_root) = engine_with_log_files(&paths, &HashSet::new());
let res = get_earliest_recreatable_commit(&engine, &log_root, ratified);
match expected {
Expected::Version(v) => assert_eq!(res.unwrap(), v),
Expected::NoCommitsFound => assert!(
matches!(&res, Err(DeltaError::LogHistory(e)) if matches!(**e, LogHistoryError::NoCommitsFound { .. })),
"expected NoCommitsFound error, got {res:?}"
),
Expected::NoRecreatableCommit => assert!(
matches!(&res, Err(DeltaError::LogHistory(e)) if matches!(**e, LogHistoryError::NoRecreatableCommit { .. })),
"expected NoRecreatableCommit error, got {res:?}"
),
Expected::CCv2MissingV0FilesystemCommit => {
assert!(matches!(res, Err(DeltaError::Generic(_))), "{res:?}")
}
}
}
/// Outcome a table-driven test case expects from the earliest-commit lookups in this module.
enum Expected {
    /// The lookup must succeed and return exactly this version.
    Version(Version),
    /// The lookup must fail with `LogHistoryError::NoCommitsFound`.
    NoCommitsFound,
    /// The lookup must fail with `LogHistoryError::NoRecreatableCommit`.
    NoRecreatableCommit,
    /// The lookup must fail with `DeltaError::Generic`; the cases using this pass an earliest
    /// ratified version of 0 together with an empty log.
    CCv2MissingV0FilesystemCommit,
}
/// A zero-length checkpoint file at version 5 must not be chosen: with commits 5..=9 and a
/// second, non-empty checkpoint at version 8, the earliest recreatable commit is 8.
#[test]
fn test_empty_checkpoint_is_skipped() {
    let zero_length_checkpoint = single_part_checkpoint_path(5);
    let mut paths = truncated_log(vec![zero_length_checkpoint.clone()], 5..=9);
    paths.push(single_part_checkpoint_path(8));
    let empty_paths = HashSet::from([zero_length_checkpoint]);
    let (engine, log_root) = engine_with_log_files(&paths, &empty_paths);
    let earliest = get_earliest_recreatable_commit(&engine, &log_root, None);
    assert_eq!(earliest.unwrap(), 8);
}
/// Table-driven check of `get_earliest_published_commit_version` on a local mock table.
///
/// Case arguments, in order: number of commits to create, optional earliest ratified commit
/// version, optional version up to which commit files are deleted before the lookup, and the
/// `Expected` outcome.
///
/// NOTE(review): every case currently passes `None` for `last_cleanup_version`, so the
/// file-deletion branch below is never exercised.
#[rstest::rstest]
#[case::no_ratified_commit(3, None, None, Expected::Version(0))]
#[case::ratified_commit_version_0(3, Some(0), None, Expected::Version(0))]
#[case::ratified_commit_version_greater_than_0(3, Some(2), None, Expected::Version(0))]
#[case::log_listing_empty_no_ratified_commit(0, None, None, Expected::NoCommitsFound)]
#[case::log_listing_empty_ratified_commit(0, Some(2), None, Expected::NoCommitsFound)]
#[case::log_listing_empty_ratified_commit_v0(
    0,
    Some(0),
    None,
    Expected::CCv2MissingV0FilesystemCommit
)]
#[tokio::test]
async fn test_get_earliest_published_commit_version(
    #[case] num_commits: usize,
    #[case] earliest_ratified_commit_version: Option<Version>,
    #[case] last_cleanup_version: Option<Version>,
    #[case] expected: Expected,
) {
    // Commit `i` gets `i` as its first fixture value and `None` as its second.
    let timestamps = (0..num_commits)
        .map(|i| (i as i64, None))
        .collect::<Vec<_>>();
    // Pass the fixture by reference (restores the mis-encoded `&timestamps` argument).
    let table = mock_table_with_timestamps(&timestamps, None).await;
    let engine = SyncEngine::new();
    let log_root = Url::from_directory_path(table.table_root().join("_delta_log")).unwrap();
    if let Some(last_cleanup_version) = last_cleanup_version {
        for version in 0..=last_cleanup_version {
            let filename = format!("{version:020}.json");
            remove_file(log_root.to_file_path().unwrap().join(filename)).unwrap();
        }
    }
    let res = get_earliest_published_commit_version(
        &engine,
        &log_root,
        earliest_ratified_commit_version,
    );
    match expected {
        Expected::Version(v) => assert_eq!(res.unwrap(), v),
        Expected::NoCommitsFound => assert!(
            matches!(res, Err(DeltaError::LogHistory(ref e)) if matches!(**e, LogHistoryError::NoCommitsFound { .. })),
            "{res:?}"
        ),
        Expected::CCv2MissingV0FilesystemCommit => {
            assert!(matches!(res, Err(DeltaError::Generic(_))), "{res:?}")
        }
        Expected::NoRecreatableCommit => {
            unreachable!(
                "get_earliest_published_commit_version never returns NoRecreatableCommit"
            )
        }
    }
}
/// A file under `_delta_log/_staged_commits/` must not affect the result: with a single
/// commit in the table and a staged file named for version 1, the earliest published commit
/// version is still 0.
#[tokio::test]
async fn test_get_earliest_published_commit_version_ignores_staged_commits() {
    let timestamps = vec![(0, None)];
    // Pass the fixture by reference (restores the mis-encoded `&timestamps` argument).
    let table = mock_table_with_timestamps(&timestamps, None).await;
    let engine = SyncEngine::new();
    let log_dir = table.table_root().join("_delta_log");
    let log_root = Url::from_directory_path(&log_dir).unwrap();
    let staged_dir = log_dir.join("_staged_commits");
    std::fs::create_dir_all(&staged_dir).unwrap();
    // Staged commit file name: zero-padded version, a UUID, then `.json`.
    let staged_file =
        staged_dir.join("00000000000000000001.01234567-89ab-cdef-0123-456789abcdef.json");
    std::fs::write(&staged_file, "{}").unwrap();
    let res = get_earliest_published_commit_version(&engine, &log_root, None);
    assert_eq!(res.unwrap(), 0);
}
}