use std::cmp::Ordering;
use error::LogHistoryError;
use search::{binary_search_by_key_with_bounds, Bound, SearchError};
use tracing::{info, trace};
use crate::log_segment::LogSegment;
use crate::path::ParsedLogPath;
use crate::snapshot::Snapshot;
use crate::table_configuration::InCommitTimestampEnablement;
use crate::utils::require;
use crate::{DeltaResult, Engine, Error as DeltaError, Version};
pub(crate) mod search;
pub mod error;
/// Timestamp value used throughout this module, represented as an `i64`.
///
/// NOTE(review): the tests in this file build file modification times with
/// `Duration::from_millis`, so these are presumably milliseconds since the Unix epoch — confirm.
pub type Timestamp = i64;
/// Search strategy selected by `get_timestamp_search_bounds` for resolving a timestamp to a
/// table version. "ICT" stands for in-commit timestamp.
#[derive(Debug, PartialEq, Eq)]
enum TimestampSearchBounds {
/// The requested timestamp equals the recorded ICT enablement timestamp; the carried
/// enablement version is returned as the answer without further searching.
ExactMatch(Version),
/// Binary-search in-commit timestamps over the commits starting at this index of the
/// segment's ascending commit list.
ICTSearchStartingFrom(usize),
/// Linear-search file modification timestamps over every listed commit.
FileModificationSearch,
/// Linear-search file modification timestamps over the commits that precede ICT enablement.
FileModificationSearchUntil {
/// Exclusive end index into the ascending commit list (index of the enablement commit).
index: usize,
/// Version at which in-commit timestamps were enabled; `timestamp_to_version` falls back
/// to it when a least-upper-bound search finds nothing in the pre-ICT range.
ict_enablement_version: Version,
},
}
/// Chooses how `timestamp` should be searched for within `log_segment`.
///
/// The decision is driven by the in-commit timestamp (ICT) enablement recorded in the
/// snapshot's table configuration:
///
/// - ICT not enabled: [`TimestampSearchBounds::FileModificationSearch`].
/// - ICT enabled without a recorded enablement version/timestamp, or the first listed commit
///   is already at or past the enablement version:
///   [`TimestampSearchBounds::ICTSearchStartingFrom`] with index `0`.
/// - Otherwise `timestamp` is compared with the enablement timestamp: equal yields
///   [`TimestampSearchBounds::ExactMatch`], smaller yields
///   [`TimestampSearchBounds::FileModificationSearchUntil`], and larger yields
///   [`TimestampSearchBounds::ICTSearchStartingFrom`] at the enablement commit's index.
///
/// # Errors
///
/// Returns an internal [`LogHistoryError`] when the table configuration cannot be read, when
/// the segment lists no commit files, or when the enablement version lies beyond the last
/// listed commit.
#[tracing::instrument(skip(snapshot, log_segment), ret)]
fn get_timestamp_search_bounds(
    snapshot: &Snapshot,
    log_segment: &LogSegment,
    timestamp: Timestamp,
) -> Result<TimestampSearchBounds, LogHistoryError> {
    debug_assert!(log_segment.end_version == snapshot.version());
    let commits = &log_segment.listed.ascending_commit_files;

    let ict_enablement = snapshot
        .table_configuration()
        .in_commit_timestamp_enablement()
        .map_err(|e| LogHistoryError::internal("failed to read table configuration", e))?;
    let (ict_enablement_version, ict_timestamp) = match ict_enablement {
        InCommitTimestampEnablement::NotEnabled => {
            return Ok(TimestampSearchBounds::FileModificationSearch);
        }
        InCommitTimestampEnablement::Enabled { enablement: None } => {
            return Ok(TimestampSearchBounds::ICTSearchStartingFrom(0));
        }
        InCommitTimestampEnablement::Enabled {
            enablement: Some((version, ts)),
        } => (version, ts),
    };

    // Surface an empty segment as an error instead of panicking on `[0]`; the caller only
    // guards this invariant with a `debug_assert!`, which is compiled out of release builds.
    let first_version = commits
        .first()
        .ok_or_else(|| {
            LogHistoryError::internal_message(
                "log segment for timestamp conversion contains no commit files",
            )
        })?
        .version;
    // Every listed commit already carries an in-commit timestamp.
    if first_version >= ict_enablement_version {
        return Ok(TimestampSearchBounds::ICTSearchStartingFrom(0));
    }

    // Index of the enablement commit, or its insertion point when that version is not listed.
    let ict_enablement_idx = commits
        .binary_search_by_key(&ict_enablement_version, |commit| commit.version)
        .unwrap_or_else(|insertion_idx| insertion_idx);
    require!(
        ict_enablement_idx < commits.len(),
        LogHistoryError::internal_message(
            "ICT enablement version is beyond the table's latest version"
        )
    );

    Ok(match timestamp.cmp(&ict_timestamp) {
        Ordering::Less => TimestampSearchBounds::FileModificationSearchUntil {
            index: ict_enablement_idx,
            ict_enablement_version,
        },
        Ordering::Equal => TimestampSearchBounds::ExactMatch(ict_enablement_version),
        Ordering::Greater => TimestampSearchBounds::ICTSearchStartingFrom(ict_enablement_idx),
    })
}
/// Scans `commits` in order and resolves `timestamp` against their file modification times.
///
/// Modification times are made strictly increasing while scanning: each commit's effective
/// timestamp is the larger of its raw `last_modified` and the previous effective timestamp
/// plus one.
///
/// - `Bound::GreatestLower`: returns the last version whose effective timestamp is
///   `<= timestamp`.
/// - `Bound::LeastUpper`: returns the first version whose effective timestamp is
///   `>= timestamp`.
///
/// # Errors
///
/// Returns [`LogHistoryError::TimestampOutOfRange`] when `commits` is empty or no commit
/// satisfies the requested bound.
fn linear_search_file_mod_timestamps(
    commits: &[ParsedLogPath],
    timestamp: Timestamp,
    bound: Bound,
) -> Result<Version, LogHistoryError> {
    let out_of_range = || LogHistoryError::TimestampOutOfRange {
        timestamp,
        reason: bound.out_of_range_reason(),
    };
    let (Some(first), Some(last)) = (commits.first(), commits.last()) else {
        return Err(out_of_range());
    };
    let lo_version = first.version;
    let hi_version = last.version;
    info!(lo_version, hi_version, "File modification linear search");

    let mut greatest_lower: Option<Version> = None;
    let mut last_effective_ts = i64::MIN;
    for commit in commits {
        let raw_ts = commit.location.last_modified;
        // Force a strictly increasing sequence of timestamps across versions.
        let monotonic_ts = std::cmp::max(raw_ts, last_effective_ts.saturating_add(1));
        if raw_ts != monotonic_ts {
            trace!(
                version = commit.version,
                raw_ts,
                monotonic_ts,
                "Adjusted non-monotonic timestamp"
            );
        }
        match bound {
            Bound::GreatestLower => {
                // Timestamps only grow from here on, so no later commit can qualify.
                if monotonic_ts > timestamp {
                    break;
                }
                greatest_lower = Some(commit.version);
            }
            Bound::LeastUpper => {
                if monotonic_ts >= timestamp {
                    return Ok(commit.version);
                }
            }
        }
        last_effective_ts = monotonic_ts;
    }
    greatest_lower.ok_or_else(out_of_range)
}
/// Resolves `timestamp` to a version by binary-searching the in-commit timestamps of
/// `commits`, reading each probed commit's timestamp through `engine`.
///
/// # Errors
///
/// Returns [`LogHistoryError::TimestampOutOfRange`] when `commits` is empty or the search
/// reports that nothing satisfies `bound`, and an internal error when an in-commit timestamp
/// cannot be read.
fn binary_search_ict_timestamps(
    commits: &[ParsedLogPath],
    timestamp: Timestamp,
    bound: Bound,
    engine: &dyn Engine,
) -> Result<Version, LogHistoryError> {
    let out_of_range = || LogHistoryError::TimestampOutOfRange {
        timestamp,
        reason: bound.out_of_range_reason(),
    };
    let (Some(first), Some(last)) = (commits.first(), commits.last()) else {
        return Err(out_of_range());
    };
    let lo_version = first.version;
    let hi_version = last.version;
    info!(
        lo_version,
        hi_version, "ICT binary search over version range"
    );

    // Key function for the search: the commit's in-commit timestamp.
    let read_ict = |commit: &ParsedLogPath| -> Result<Timestamp, LogHistoryError> {
        commit
            .read_in_commit_timestamp(engine)
            .map_err(|e| LogHistoryError::internal("failed to read in-commit timestamp", e))
    };

    let found_idx = binary_search_by_key_with_bounds(commits, timestamp, read_ict, bound)
        .map_err(|search_err| match search_err {
            SearchError::KeyFunctionError(inner) => inner,
            SearchError::OutOfRange => out_of_range(),
        })?;
    Ok(commits[found_idx].version)
}
/// Converts `timestamp` into a table version relative to `snapshot`, honoring `bound`.
///
/// The snapshot's own timestamp is checked first: an equal timestamp, or a later timestamp
/// with `Bound::GreatestLower`, resolves to the snapshot version, while a later timestamp with
/// `Bound::LeastUpper` is out of range. Otherwise a log segment is built for timestamp
/// conversion and searched using the strategy picked by `get_timestamp_search_bounds`.
///
/// # Errors
///
/// Returns [`LogHistoryError::TimestampOutOfRange`] when no version satisfies the bound, and
/// an internal error when the snapshot timestamp, log segment, table configuration or an
/// in-commit timestamp cannot be obtained, or when the snapshot's last listed commit is a
/// staged commit.
pub(crate) fn timestamp_to_version(
    snapshot: &Snapshot,
    engine: &dyn Engine,
    timestamp: Timestamp,
    bound: Bound,
) -> Result<Version, LogHistoryError> {
    let snap_ts = snapshot
        .get_timestamp(engine)
        .map_err(|e| LogHistoryError::internal("failed to get snapshot timestamp", e))?;

    // Fast paths that need no log listing.
    match timestamp.cmp(&snap_ts) {
        Ordering::Less => {}
        Ordering::Equal => return Ok(snapshot.version()),
        Ordering::Greater => {
            return match bound {
                Bound::GreatestLower => Ok(snapshot.version()),
                Bound::LeastUpper => Err(LogHistoryError::TimestampOutOfRange {
                    timestamp,
                    reason: bound.out_of_range_reason(),
                }),
            };
        }
    }

    let ends_with_staged_commit = matches!(
        snapshot.log_segment().listed.ascending_commit_files.last(),
        Some(commit) if commit.file_type == crate::path::LogPathFileType::StagedCommit
    );
    if ends_with_staged_commit {
        return Err(LogHistoryError::internal_message(
            "timestamp conversion not yet supported for snapshots with staged commits",
        ));
    }

    let log_segment = LogSegment::for_timestamp_conversion(
        engine.storage_handler().as_ref(),
        snapshot.log_segment().log_root.clone(),
        snapshot.version(),
        None,
    )
    .map_err(|e| {
        LogHistoryError::internal("failed to build log segment for timestamp conversion", e)
    })?;
    debug_assert!(
        !log_segment.listed.ascending_commit_files.is_empty(),
        "LogSegment should ensure that a segment is non-empty"
    );
    debug_assert!(log_segment.end_version == snapshot.log_segment().end_version);

    let commits = &log_segment.listed.ascending_commit_files;
    match get_timestamp_search_bounds(snapshot, &log_segment, timestamp)? {
        TimestampSearchBounds::ExactMatch(version) => Ok(version),
        TimestampSearchBounds::ICTSearchStartingFrom(lo) => {
            binary_search_ict_timestamps(&commits[lo..], timestamp, bound, engine)
        }
        TimestampSearchBounds::FileModificationSearch => {
            linear_search_file_mod_timestamps(commits, timestamp, bound)
        }
        TimestampSearchBounds::FileModificationSearchUntil {
            index,
            ict_enablement_version,
        } => match linear_search_file_mod_timestamps(&commits[..index], timestamp, bound) {
            // Nothing before ICT enablement is at or after the timestamp, so the enablement
            // commit itself is the least upper bound.
            Err(LogHistoryError::TimestampOutOfRange { .. })
                if matches!(bound, Bound::LeastUpper) =>
            {
                Ok(ict_enablement_version)
            }
            other => other,
        },
    }
}
#[tracing::instrument(skip(snapshot, engine), ret, fields(latest_version = snapshot.version(), table_root = %snapshot.table_root()))]
pub fn latest_version_as_of(
snapshot: &Snapshot,
engine: &dyn Engine,
timestamp: Timestamp,
) -> DeltaResult<Version> {
timestamp_to_version(snapshot, engine, timestamp, Bound::GreatestLower).map_err(Into::into)
}
#[tracing::instrument(skip(snapshot, engine), ret, fields(latest_version = snapshot.version(), table_root = %snapshot.table_root()))]
pub fn first_version_after(
snapshot: &Snapshot,
engine: &dyn Engine,
timestamp: Timestamp,
) -> DeltaResult<Version> {
timestamp_to_version(snapshot, engine, timestamp, Bound::LeastUpper).map_err(Into::into)
}
/// Converts a timestamp range into a version range.
///
/// The start version comes from `first_version_after(start_timestamp)`. When `end_timestamp`
/// is given, the end version comes from `latest_version_as_of(end_timestamp)`; otherwise the
/// end is `None`.
///
/// # Errors
///
/// - [`LogHistoryError::InvalidTimestampRange`] when `start_timestamp > end_timestamp`.
/// - [`LogHistoryError::EmptyTimestampRange`] when the resolved start version is greater than
///   the resolved end version.
/// - Any error produced while resolving either endpoint.
#[tracing::instrument(skip(snapshot, engine), ret, fields(latest_version = snapshot.version(), table_root = %snapshot.table_root()))]
pub fn timestamp_range_to_versions(
    snapshot: &Snapshot,
    engine: &dyn Engine,
    start_timestamp: Timestamp,
    end_timestamp: Option<Timestamp>,
) -> DeltaResult<(Version, Option<Version>)> {
    if let Some(end_timestamp) = end_timestamp {
        require!(
            start_timestamp <= end_timestamp,
            DeltaError::from(LogHistoryError::InvalidTimestampRange {
                start_timestamp,
                end_timestamp
            })
        );
    }

    let start_version = first_version_after(snapshot, engine, start_timestamp)?;
    let end_version = match end_timestamp {
        None => None,
        Some(end) => {
            let resolved_end = latest_version_as_of(snapshot, engine, end)?;
            // Both endpoints resolved, but no commit lies between them.
            require!(
                start_version <= resolved_end,
                DeltaError::from(LogHistoryError::EmptyTimestampRange {
                    start_timestamp,
                    end_timestamp: end,
                    between_version: resolved_end,
                })
            );
            Some(resolved_end)
        }
    };
    Ok((start_version, end_version))
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::fs::OpenOptions;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use test_utils::delta_path_for_version;
use url::Url;
use super::*;
use crate::actions::{CommitInfo, Metadata, Protocol};
use crate::engine::sync::SyncEngine;
use crate::schema::{DataType, SchemaRef, StructField, StructType};
use crate::snapshot::Snapshot;
use crate::table_features::TableFeature;
use crate::utils::test_utils::{Action, LocalMockTable};
use crate::Version;
/// Builds the single-column schema (`value`: nullable integer) used by the mock tables.
fn get_test_schema() -> SchemaRef {
    let value_field = StructField::nullable("value", DataType::INTEGER);
    Arc::new(StructType::new_unchecked([value_field]))
}
/// Overwrites the modification time of the JSON commit file for `version` in the mock
/// table's `_delta_log` directory, interpreting `timestamp` as milliseconds since the Unix
/// epoch. Panics if the file cannot be opened or `timestamp` is negative.
fn set_mod_time(mock_table: &LocalMockTable, version: Version, timestamp: Timestamp) {
    let commit_file_name = delta_path_for_version(version, "json")
        .filename()
        .unwrap()
        .to_string();
    let commit_path = mock_table
        .table_root()
        .join("_delta_log/")
        .join(commit_file_name);
    let commit_file = OpenOptions::new().write(true).open(commit_path).unwrap();
    let millis_since_epoch = u64::try_from(timestamp).unwrap();
    let modified_at = SystemTime::UNIX_EPOCH + Duration::from_millis(millis_since_epoch);
    commit_file.set_modified(modified_at).unwrap();
}
/// Creates a mock table with one commit per entry of `timestamps`.
///
/// Each entry is `(file modification time, optional in-commit timestamp)`. A commit with an
/// in-commit timestamp gets a `CommitInfo` action carrying it. Version 0 always gets
/// `Metadata` and `Protocol` actions; the commit at `ict_enablement_version` gets a `Metadata`
/// action that enables in-commit timestamps (and, when enablement happens after version 0,
/// also records the enablement version and timestamp). Commits that would otherwise be empty
/// get a default `CommitInfo`.
async fn mock_table_with_timestamps(
    timestamps: &[(Timestamp, Option<Timestamp>)],
    ict_enablement_version: Option<Version>,
) -> LocalMockTable {
    let mut mock_table = LocalMockTable::new();
    for (idx, &(file_mod_ts, ict_ts)) in timestamps.iter().enumerate() {
        let version = idx as Version;
        let is_first = version == 0;
        let enables_ict = ict_enablement_version == Some(version);

        let mut actions: Vec<Action> = Vec::new();
        if ict_ts.is_some() {
            actions.push(Action::CommitInfo(CommitInfo {
                in_commit_timestamp: ict_ts,
                ..Default::default()
            }));
        }

        if is_first || enables_ict {
            let mut config: HashMap<String, String> = HashMap::new();
            if enables_ict {
                config.insert(
                    "delta.enableInCommitTimestamps".to_string(),
                    "true".to_string(),
                );
                // Enablement provenance is only written when ICT is turned on after creation.
                if !is_first {
                    config.insert(
                        "delta.inCommitTimestampEnablementVersion".to_string(),
                        version.to_string(),
                    );
                    config.insert(
                        "delta.inCommitTimestampEnablementTimestamp".to_string(),
                        ict_ts.unwrap().to_string(),
                    );
                }
            }
            let metadata =
                Metadata::try_new(None, None, get_test_schema(), vec![], 0, config).unwrap();
            actions.push(Action::Metadata(metadata));
        }

        if is_first {
            let writer_features: Vec<TableFeature> = if ict_enablement_version.is_some() {
                vec![TableFeature::InCommitTimestamp]
            } else {
                vec![]
            };
            let protocol =
                Protocol::try_new(3, 7, Some(Vec::<String>::new()), Some(writer_features))
                    .unwrap();
            actions.push(Action::Protocol(protocol));
        }

        if actions.is_empty() {
            actions.push(Action::CommitInfo(CommitInfo::default()));
        }

        mock_table.commit(actions).await;
        set_mod_time(&mock_table, version, file_mod_ts);
    }
    mock_table
}
/// Builds a mock table from `timestamps`, then a snapshot of it (optionally pinned to
/// `at_version`) and the matching timestamp-conversion log segment.
///
/// The table is returned as well so that its backing files stay alive for the test.
async fn build_test_snapshot(
    timestamps: &[(Timestamp, Option<Timestamp>)],
    ict_enablement: Option<Version>,
    at_version: Option<Version>,
) -> (LocalMockTable, SyncEngine, Arc<Snapshot>, LogSegment) {
    let table = mock_table_with_timestamps(timestamps, ict_enablement).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(table.table_root()).unwrap();

    let builder = Snapshot::builder_for(table_url);
    let builder = match at_version {
        Some(version) => builder.at_version(version),
        None => builder,
    };
    let snapshot = builder.build(&engine).unwrap();

    let log_segment = LogSegment::for_timestamp_conversion(
        engine.storage_handler().as_ref(),
        snapshot.log_segment().log_root.clone(),
        snapshot.version(),
        None,
    )
    .unwrap();
    (table, engine, snapshot, log_segment)
}
// A snapshot pinned to version 2 predates ICT enablement (version 3), so every timestamp must
// select a plain file-modification search.
#[rstest::rstest]
#[case::before_all(0)]
#[case::within_range(100)]
#[case::after_all(1000)]
#[tokio::test]
async fn test_search_bounds_no_ict(#[case] timestamp: Timestamp) {
    let timestamps = [
        (50, None),
        (150, None),
        (250, None),
        (350, Some(300)),
        (450, Some(400)),
    ];
    let (_table, _engine, snapshot, log_segment) =
        build_test_snapshot(&timestamps, Some(3), Some(2)).await;
    let res = get_timestamp_search_bounds(&snapshot, &log_segment, timestamp).unwrap();
    assert!(
        matches!(res, TimestampSearchBounds::FileModificationSearch),
        "{res:?}"
    );
}
// With ICT enabled at version 3 (enablement timestamp 300), the chosen strategy depends on how
// the requested timestamp compares with 300.
#[rstest::rstest]
#[case::file_mod_region(50, TimestampSearchBounds::FileModificationSearchUntil { index: 3, ict_enablement_version: 3 })]
#[case::file_mod_between(60, TimestampSearchBounds::FileModificationSearchUntil { index: 3, ict_enablement_version: 3 })]
#[case::file_mod_last(299, TimestampSearchBounds::FileModificationSearchUntil { index: 3, ict_enablement_version: 3 })]
#[case::exact_enablement(300, TimestampSearchBounds::ExactMatch(3))]
#[case::ict_after_enablement(301, TimestampSearchBounds::ICTSearchStartingFrom(3))]
#[case::ict_far_future(1000, TimestampSearchBounds::ICTSearchStartingFrom(3))]
#[tokio::test]
async fn test_search_bounds_with_ict(
    #[case] timestamp: Timestamp,
    #[case] expected: TimestampSearchBounds,
) {
    let timestamps = [
        (50, None),
        (150, None),
        (250, None),
        (350, Some(300)),
        (450, Some(400)),
    ];
    let (_table, _engine, snapshot, log_segment) =
        build_test_snapshot(&timestamps, Some(3), None).await;
    let res = get_timestamp_search_bounds(&snapshot, &log_segment, timestamp).unwrap();
    assert_eq!(res, expected);
}
// When ICT is enabled from version 0, every timestamp must select an ICT search over the
// whole commit list.
#[rstest::rstest]
#[case::before_all(50)]
#[case::at_v0(100)]
#[case::between_v0_v1(150)]
#[case::after_all(1000)]
#[tokio::test]
async fn test_search_bounds_ict_from_creation(#[case] timestamp: Timestamp) {
    let timestamps = [(0, Some(100)), (0, Some(200)), (0, Some(300))];
    let (_table, _engine, snapshot, log_segment) =
        build_test_snapshot(&timestamps, Some(0), None).await;
    let res = get_timestamp_search_bounds(&snapshot, &log_segment, timestamp).unwrap();
    assert!(
        matches!(res, TimestampSearchBounds::ICTSearchStartingFrom(0)),
        "{res:?}"
    );
}
// After the pre-ICT commit files (versions 0..3) are deleted, the first listed commit is the
// enablement commit, so the fast path must select an ICT search from index 0.
#[rstest::rstest]
#[case::before_all(50)]
#[case::at_ict_enablement(300)]
#[case::after_ict_enablement(350)]
#[case::after_all(1000)]
#[tokio::test]
async fn test_search_bounds_fast_path_pre_ict_commits_cleaned_up(#[case] timestamp: Timestamp) {
    let timestamps = [
        (50, None),
        (150, None),
        (250, None),
        (350, Some(300)),
        (450, Some(400)),
    ];
    let (table, engine, snapshot, _log_segment) =
        build_test_snapshot(&timestamps, Some(3), None).await;
    // Simulate log cleanup by removing the commits that precede ICT enablement.
    let log_dir = table.table_root().join("_delta_log");
    for version in 0..3 {
        let file_name = format!("{:020}.json", version);
        std::fs::remove_file(log_dir.join(&file_name)).unwrap();
    }
    // Re-list the log so the segment reflects the deleted files.
    let cleaned_log_segment = LogSegment::for_timestamp_conversion(
        engine.storage_handler().as_ref(),
        snapshot.log_segment().log_root.clone(),
        snapshot.version(),
        None,
    )
    .unwrap();
    assert_eq!(
        cleaned_log_segment.listed.ascending_commit_files[0].version,
        3
    );
    let res = get_timestamp_search_bounds(&snapshot, &cleaned_log_segment, timestamp).unwrap();
    assert!(
        matches!(res, TimestampSearchBounds::ICTSearchStartingFrom(0)),
        "Expected fast path ICTSearchStartingFrom(0), got {res:?}"
    );
}
// Checks the raw timestamp sources: file modification times on listed commits, and
// `read_in_commit_timestamp` for commits with an ICT, without an ICT, and for a missing file.
#[tokio::test]
async fn test_reading_commit_timestamps() {
    let timestamps = [
        (50, None),
        (150, None),
        (250, None),
        (350, Some(300)),
        (450, Some(400)),
    ];
    let (_table, engine, _snapshot, log_segment) =
        build_test_snapshot(&timestamps, Some(3), None).await;
    let commits = &log_segment.listed.ascending_commit_files;
    assert_eq!(commits[0].location.last_modified, 50);
    assert_eq!(commits[1].location.last_modified, 150);
    assert_eq!(commits[2].location.last_modified, 250);

    // A commit written without an in-commit timestamp cannot yield one.
    let res = commits[0].read_in_commit_timestamp(&engine);
    assert!(res.is_err(), "Expected error for file without ICT: {res:?}");

    // Point a commit at a path that does not exist.
    let mut fake_log_path = commits[0].clone();
    let failing_path = if cfg!(windows) {
        "C:\\phony\\path"
    } else {
        "/phony/path"
    };
    fake_log_path.location.location = Url::from_file_path(failing_path).unwrap();
    let res = fake_log_path.read_in_commit_timestamp(&engine);
    assert!(
        res.is_err(),
        "Expected error for non-existent file: {res:?}"
    );

    assert_eq!(commits[3].read_in_commit_timestamp(&engine).unwrap(), 300);
    assert_eq!(commits[4].read_in_commit_timestamp(&engine).unwrap(), 400);
}
// End-to-end conversion on a table with three file-modification commits (50, 150, 250)
// followed by two ICT commits (300, 400). `None` means the timestamp is out of range.
#[rstest::rstest]
#[case::before_all_lub(0, Bound::LeastUpper, Some(0))]
#[case::before_all_glb(0, Bound::GreatestLower, None)]
#[case::negative_lub(-1, Bound::LeastUpper, Some(0))]
#[case::after_all_lub(1000, Bound::LeastUpper, None)]
#[case::after_all_glb(1000, Bound::GreatestLower, Some(4))]
#[case::exact_v0_lub(50, Bound::LeastUpper, Some(0))]
#[case::exact_v0_glb(50, Bound::GreatestLower, Some(0))]
#[case::exact_v1_lub(150, Bound::LeastUpper, Some(1))]
#[case::exact_v1_glb(150, Bound::GreatestLower, Some(1))]
#[case::exact_v3_ict_lub(300, Bound::LeastUpper, Some(3))]
#[case::exact_v3_ict_glb(300, Bound::GreatestLower, Some(3))]
#[case::between_v0_v1_lub(100, Bound::LeastUpper, Some(1))]
#[case::between_v0_v1_glb(100, Bound::GreatestLower, Some(0))]
#[case::just_after_last_file_mod_lub(251, Bound::LeastUpper, Some(3))]
#[case::just_after_last_file_mod_glb(251, Bound::GreatestLower, Some(2))]
#[case::just_before_ict_lub(299, Bound::LeastUpper, Some(3))]
#[case::just_before_ict_glb(299, Bound::GreatestLower, Some(2))]
#[case::just_after_ict_glb(301, Bound::GreatestLower, Some(3))]
#[case::just_after_ict_lub(301, Bound::LeastUpper, Some(4))]
#[case::between_ict_commits(350, Bound::GreatestLower, Some(3))]
#[case::between_ict_commits_lub(350, Bound::LeastUpper, Some(4))]
#[tokio::test]
async fn test_timestamp_to_version_standard_table(
    #[case] timestamp: Timestamp,
    #[case] bound: Bound,
    #[case] expected: Option<Version>,
) {
    let timestamps = [
        (50, None),
        (150, None),
        (250, None),
        (350, Some(300)),
        (450, Some(400)),
    ];
    let (_table, engine, snapshot, _log_segment) =
        build_test_snapshot(&timestamps, Some(3), None).await;
    let res = timestamp_to_version(&snapshot, &engine, timestamp, bound);
    match expected {
        Some(v) => assert_eq!(res.unwrap(), v),
        None => assert!(
            matches!(res, Err(LogHistoryError::TimestampOutOfRange { .. })),
            "{res:?}"
        ),
    }
}
// Conversion on a three-commit table whose in-commit timestamps (100, 200, 300) are enabled
// from version 0. `None` means the timestamp is expected to be out of range.
#[rstest::rstest]
#[case::exact_v0_glb(100, Bound::GreatestLower, Some(0))]
#[case::exact_v0_lub(100, Bound::LeastUpper, Some(0))]
#[case::between_v0_v1_glb(150, Bound::GreatestLower, Some(0))]
#[case::between_v0_v1_lub(150, Bound::LeastUpper, Some(1))]
#[case::between_v1_v2_glb(250, Bound::GreatestLower, Some(1))]
#[case::between_v1_v2_lub(250, Bound::LeastUpper, Some(2))]
#[case::before_all_glb(50, Bound::GreatestLower, None)]
#[case::before_all_lub(50, Bound::LeastUpper, Some(0))]
#[tokio::test]
async fn test_ict_from_creation(
    #[case] timestamp: Timestamp,
    #[case] bound: Bound,
    #[case] expected: Option<Version>,
) {
    let commit_timestamps = [(0, Some(100)), (0, Some(200)), (0, Some(300))];
    let table = mock_table_with_timestamps(&commit_timestamps, Some(0)).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();

    let outcome = timestamp_to_version(&snapshot, &engine, timestamp, bound);
    if let Some(version) = expected {
        assert_eq!(outcome.unwrap(), version);
    } else {
        assert!(
            matches!(outcome, Err(LogHistoryError::TimestampOutOfRange { .. })),
            "{outcome:?}"
        );
    }
}
// Conversion on a table with a single commit whose file modification time is 100.
#[rstest::rstest]
#[case::exact_glb(100, Bound::GreatestLower, Some(0))]
#[case::exact_lub(100, Bound::LeastUpper, Some(0))]
#[case::before_glb(50, Bound::GreatestLower, None)]
#[case::before_lub(50, Bound::LeastUpper, Some(0))]
#[case::after_glb(150, Bound::GreatestLower, Some(0))]
#[case::after_lub(150, Bound::LeastUpper, None)]
#[tokio::test]
async fn test_single_commit(
    #[case] timestamp: Timestamp,
    #[case] bound: Bound,
    #[case] expected: Option<Version>,
) {
    let commit_timestamps = [(100, None)];
    let table = mock_table_with_timestamps(&commit_timestamps, None).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();

    let outcome = timestamp_to_version(&snapshot, &engine, timestamp, bound);
    if let Some(version) = expected {
        assert_eq!(outcome.unwrap(), version);
    } else {
        assert!(
            matches!(outcome, Err(LogHistoryError::TimestampOutOfRange { .. })),
            "{outcome:?}"
        );
    }
}
// File modification times 100, 200, 150, 180, 300 are not monotonic; the search is expected to
// treat versions 2 and 3 as if they had timestamps 201 and 202.
#[rstest::rstest]
#[case::exact_v0_glb(100, Bound::GreatestLower, Some(0))]
#[case::exact_v1_glb(200, Bound::GreatestLower, Some(1))]
#[case::monotonized_v2_glb(201, Bound::GreatestLower, Some(2))]
#[case::monotonized_v3_glb(202, Bound::GreatestLower, Some(3))]
#[case::between_v3_v4_glb(250, Bound::GreatestLower, Some(3))]
#[case::exact_v4_glb(300, Bound::GreatestLower, Some(4))]
#[case::raw_v2_maps_to_v1_lub(150, Bound::LeastUpper, Some(1))]
#[case::monotonized_v2_lub(201, Bound::LeastUpper, Some(2))]
#[case::after_v3_monotonized_lub(203, Bound::LeastUpper, Some(4))]
#[tokio::test]
async fn test_non_monotonic_timestamps(
    #[case] timestamp: Timestamp,
    #[case] bound: Bound,
    #[case] expected: Option<Version>,
) {
    let commit_timestamps = [
        (100, None),
        (200, None),
        (150, None),
        (180, None),
        (300, None),
    ];
    let table = mock_table_with_timestamps(&commit_timestamps, None).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();

    let outcome = timestamp_to_version(&snapshot, &engine, timestamp, bound);
    if let Some(version) = expected {
        assert_eq!(outcome.unwrap(), version);
    } else {
        assert!(
            matches!(outcome, Err(LogHistoryError::TimestampOutOfRange { .. })),
            "{outcome:?}"
        );
    }
}
// Version 4 is committed after ICT enablement (version 3) but carries no in-commit timestamp;
// the conversion is expected to report an internal error.
#[tokio::test]
async fn test_missing_ict_after_enablement() {
    let timestamps = [
        (50, None),
        (150, None),
        (250, None),
        (350, Some(300)),
        (450, None),
    ];
    let table = mock_table_with_timestamps(&timestamps, Some(3)).await;
    let engine = SyncEngine::new();
    let path = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(path).build(&engine).unwrap();
    let res = timestamp_to_version(&snapshot, &engine, 350, Bound::LeastUpper);
    assert!(
        matches!(res, Err(LogHistoryError::Internal { .. })),
        "Expected internal error for missing ICT: {res:?}"
    );
}
// `latest_version_as_of` on the standard table (file-mod commits 50/150/250, ICT commits
// 300/400). `None` means an error is expected.
#[rstest::rstest]
#[case::before_all(0, None)]
#[case::exact_v0(50, Some(0))]
#[case::between_v0_v1(100, Some(0))]
#[case::exact_v1(150, Some(1))]
#[case::just_after_last_file_mod(251, Some(2))]
#[case::just_before_ict(299, Some(2))]
#[case::exact_ict_enablement(300, Some(3))]
#[case::just_after_ict(301, Some(3))]
#[case::between_ict_commits(350, Some(3))]
#[case::after_all(1000, Some(4))]
#[tokio::test]
async fn test_latest_version_as_of(
    #[case] timestamp: Timestamp,
    #[case] expected: Option<Version>,
) {
    let timestamps = [
        (50, None),
        (150, None),
        (250, None),
        (350, Some(300)),
        (450, Some(400)),
    ];
    let table = mock_table_with_timestamps(&timestamps, Some(3)).await;
    let engine = SyncEngine::new();
    let path = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(path).build(&engine).unwrap();
    let res = latest_version_as_of(&snapshot, &engine, timestamp);
    match expected {
        Some(v) => assert_eq!(res.unwrap(), v),
        None => assert!(res.is_err(), "{res:?}"),
    }
}
// `first_version_after` on the standard table (file-mod commits 50/150/250, ICT commits
// 300/400). `None` means an error is expected.
#[rstest::rstest]
#[case::before_all(0, Some(0))]
#[case::exact_v0(50, Some(0))]
#[case::between_v0_v1(100, Some(1))]
#[case::exact_v1(150, Some(1))]
#[case::just_after_last_file_mod(251, Some(3))]
#[case::just_before_ict(299, Some(3))]
#[case::exact_ict_enablement(300, Some(3))]
#[case::just_after_ict(301, Some(4))]
#[case::between_ict_commits(350, Some(4))]
#[case::after_all(1000, None)]
#[tokio::test]
async fn test_first_version_after(
    #[case] timestamp: Timestamp,
    #[case] expected: Option<Version>,
) {
    let timestamps = [
        (50, None),
        (150, None),
        (250, None),
        (350, Some(300)),
        (450, Some(400)),
    ];
    let table = mock_table_with_timestamps(&timestamps, Some(3)).await;
    let engine = SyncEngine::new();
    let path = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(path).build(&engine).unwrap();
    let res = first_version_after(&snapshot, &engine, timestamp);
    match expected {
        Some(v) => assert_eq!(res.unwrap(), v),
        None => assert!(res.is_err(), "{res:?}"),
    }
}
// `timestamp_range_to_versions` on the standard table (file-mod commits 50/150/250, ICT
// commits 300/400). `Err(())` means any error is accepted.
#[rstest::rstest]
#[case::basic_range(50, Some(300), Ok((0, Some(3))))]
#[case::no_end(100, None, Ok((1, None)))]
#[case::start_equals_end(150, Some(150), Ok((1, Some(1))))]
#[case::spanning_ict_boundary(100, Some(350), Ok((1, Some(3))))]
#[case::entire_table(0, Some(1000), Ok((0, Some(4))))]
#[case::exact_v0_to_v1(50, Some(150), Ok((0, Some(1))))]
#[case::exact_v1_to_v2(150, Some(250), Ok((1, Some(2))))]
#[case::exact_v3_ict_to_v4(300, Some(400), Ok((3, Some(4))))]
#[case::between_v0_v1_to_exact_v2(100, Some(250), Ok((1, Some(2))))]
#[case::just_after_last_file_mod_no_end(251, None, Ok((3, None)))]
#[case::just_before_ict_to_just_after(299, Some(301), Ok((3, Some(3))))]
#[case::exact_ict_to_after_all(300, Some(1000), Ok((3, Some(4))))]
#[case::between_ict_commits(350, Some(400), Ok((4, Some(4))))]
#[case::just_after_ict_to_end(301, Some(1000), Ok((4, Some(4))))]
#[case::end_before_all_fails(0, Some(40), Err(()))]
#[case::start_after_all_fails(500, Some(1000), Err(()))]
#[tokio::test]
async fn test_timestamp_range_to_versions(
    #[case] start: Timestamp,
    #[case] end: Option<Timestamp>,
    #[case] expected: Result<(Version, Option<Version>), ()>,
) {
    let timestamps = [
        (50, None),
        (150, None),
        (250, None),
        (350, Some(300)),
        (450, Some(400)),
    ];
    let table = mock_table_with_timestamps(&timestamps, Some(3)).await;
    let engine = SyncEngine::new();
    let path = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(path).build(&engine).unwrap();
    let res = timestamp_range_to_versions(&snapshot, &engine, start, end);
    match expected {
        Ok(v) => assert_eq!(res.unwrap(), v),
        Err(()) => assert!(res.is_err(), "{res:?}"),
    }
}
// A start timestamp after the end timestamp must be rejected with `InvalidTimestampRange`.
#[tokio::test]
async fn test_timestamp_range_invalid_range() {
    let timestamps = [
        (50, None),
        (150, None),
        (250, None),
        (350, Some(300)),
        (450, Some(400)),
    ];
    let table = mock_table_with_timestamps(&timestamps, Some(3)).await;
    let engine = SyncEngine::new();
    let path = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(path).build(&engine).unwrap();
    let res = timestamp_range_to_versions(&snapshot, &engine, 300, Some(100));
    assert!(
        matches!(
            res,
            Err(crate::Error::LogHistory(ref e))
                if matches!(**e, LogHistoryError::InvalidTimestampRange { .. })
        ),
        "{res:?}"
    );
}
// The range 51..=149 falls strictly between versions 0 and 1, so no commit lies inside it and
// `EmptyTimestampRange` (with `between_version` 0) is expected.
#[tokio::test]
async fn test_timestamp_range_empty_range() {
    let timestamps = [
        (50, None),
        (150, None),
        (250, None),
        (350, Some(300)),
        (450, Some(400)),
    ];
    let table = mock_table_with_timestamps(&timestamps, Some(3)).await;
    let engine = SyncEngine::new();
    let path = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(path).build(&engine).unwrap();
    let res = timestamp_range_to_versions(&snapshot, &engine, 51, Some(149));
    assert!(
        matches!(
            res,
            Err(crate::Error::LogHistory(ref e))
                if matches!(**e, LogHistoryError::EmptyTimestampRange { between_version: 0, .. })
        ),
        "{res:?}"
    );
}
// `latest_version_as_of` on a table without ICT (file modification times 100, 200, 300).
#[rstest::rstest]
#[case::exact_match(200, Some(1))]
#[case::between_commits(150, Some(0))]
#[case::after_all(500, Some(2))]
#[case::before_all(50, None)]
#[tokio::test]
async fn test_latest_version_as_of_file_mod_only(
    #[case] timestamp: Timestamp,
    #[case] expected: Option<Version>,
) {
    let commit_timestamps = [(100, None), (200, None), (300, None)];
    let table = mock_table_with_timestamps(&commit_timestamps, None).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();

    let outcome = latest_version_as_of(&snapshot, &engine, timestamp);
    if let Some(version) = expected {
        assert_eq!(outcome.unwrap(), version);
    } else {
        assert!(outcome.is_err(), "{outcome:?}");
    }
}
// `first_version_after` on a table without ICT (file modification times 100, 200, 300).
#[rstest::rstest]
#[case::exact_match(200, Some(1))]
#[case::between_commits(150, Some(1))]
#[case::before_all(50, Some(0))]
#[case::after_all(500, None)]
#[tokio::test]
async fn test_first_version_after_file_mod_only(
    #[case] timestamp: Timestamp,
    #[case] expected: Option<Version>,
) {
    let commit_timestamps = [(100, None), (200, None), (300, None)];
    let table = mock_table_with_timestamps(&commit_timestamps, None).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();

    let outcome = first_version_after(&snapshot, &engine, timestamp);
    if let Some(version) = expected {
        assert_eq!(outcome.unwrap(), version);
    } else {
        assert!(outcome.is_err(), "{outcome:?}");
    }
}
// `latest_version_as_of` on a table with ICT enabled from version 0 (ICTs 100, 200, 300).
#[rstest::rstest]
#[case::exact_match(200, Some(1))]
#[case::between_commits(150, Some(0))]
#[case::after_all(500, Some(2))]
#[case::before_all(50, None)]
#[tokio::test]
async fn test_latest_version_as_of_ict_from_creation(
    #[case] timestamp: Timestamp,
    #[case] expected: Option<Version>,
) {
    let commit_timestamps = [(0, Some(100)), (0, Some(200)), (0, Some(300))];
    let table = mock_table_with_timestamps(&commit_timestamps, Some(0)).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();

    let outcome = latest_version_as_of(&snapshot, &engine, timestamp);
    if let Some(version) = expected {
        assert_eq!(outcome.unwrap(), version);
    } else {
        assert!(outcome.is_err(), "{outcome:?}");
    }
}
// `first_version_after` on a table with ICT enabled from version 0 (ICTs 100, 200, 300).
#[rstest::rstest]
#[case::exact_match(200, Some(1))]
#[case::between_commits(150, Some(1))]
#[case::before_all(50, Some(0))]
#[case::after_all(500, None)]
#[tokio::test]
async fn test_first_version_after_ict_from_creation(
    #[case] timestamp: Timestamp,
    #[case] expected: Option<Version>,
) {
    let commit_timestamps = [(0, Some(100)), (0, Some(200)), (0, Some(300))];
    let table = mock_table_with_timestamps(&commit_timestamps, Some(0)).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();

    let outcome = first_version_after(&snapshot, &engine, timestamp);
    if let Some(version) = expected {
        assert_eq!(outcome.unwrap(), version);
    } else {
        assert!(outcome.is_err(), "{outcome:?}");
    }
}
// `latest_version_as_of` with non-monotonic file modification times (100, 200, 150, 180, 300);
// versions 2 and 3 are expected to behave as if stamped 201 and 202.
#[rstest::rstest]
#[case::exact_v1(200, Some(1))]
#[case::monotonized_v2(201, Some(2))]
#[case::monotonized_v3(202, Some(3))]
#[case::between_v3_v4(250, Some(3))]
#[case::raw_v2_between_v0_v1(150, Some(0))]
#[tokio::test]
async fn test_latest_version_as_of_non_monotonic(
    #[case] timestamp: Timestamp,
    #[case] expected: Option<Version>,
) {
    let commit_timestamps = [
        (100, None),
        (200, None),
        (150, None),
        (180, None),
        (300, None),
    ];
    let table = mock_table_with_timestamps(&commit_timestamps, None).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();

    let outcome = latest_version_as_of(&snapshot, &engine, timestamp);
    if let Some(version) = expected {
        assert_eq!(outcome.unwrap(), version);
    } else {
        assert!(outcome.is_err(), "{outcome:?}");
    }
}
// `first_version_after` with non-monotonic file modification times (100, 200, 150, 180, 300);
// versions 2 and 3 are expected to behave as if stamped 201 and 202.
#[rstest::rstest]
#[case::exact_v1(200, Some(1))]
#[case::monotonized_v2(201, Some(2))]
#[case::monotonized_v3(202, Some(3))]
#[case::after_v3_monotonized(203, Some(4))]
#[case::raw_v2_maps_to_v1(150, Some(1))]
#[tokio::test]
async fn test_first_version_after_non_monotonic(
    #[case] timestamp: Timestamp,
    #[case] expected: Option<Version>,
) {
    let commit_timestamps = [
        (100, None),
        (200, None),
        (150, None),
        (180, None),
        (300, None),
    ];
    let table = mock_table_with_timestamps(&commit_timestamps, None).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();

    let outcome = first_version_after(&snapshot, &engine, timestamp);
    if let Some(version) = expected {
        assert_eq!(outcome.unwrap(), version);
    } else {
        assert!(outcome.is_err(), "{outcome:?}");
    }
}
// `timestamp_range_to_versions` on a table without ICT (file modification times 100, 200,
// 300). `Err(())` means any error is accepted.
#[rstest::rstest]
#[case::basic_range(100, Some(200), Ok((0, Some(1))))]
#[case::entire_table(50, Some(500), Ok((0, Some(2))))]
#[case::no_end(150, None, Ok((1, None)))]
#[case::exact_v0_to_v2(100, Some(300), Ok((0, Some(2))))]
#[case::between_commits(150, Some(250), Ok((1, Some(1))))]
#[case::start_equals_end_exact(200, Some(200), Ok((1, Some(1))))]
#[case::end_before_all_fails(50, Some(80), Err(()))]
#[case::start_after_all_fails(400, Some(500), Err(()))]
#[tokio::test]
async fn test_timestamp_range_file_mod_only(
    #[case] start: Timestamp,
    #[case] end: Option<Timestamp>,
    #[case] expected: Result<(Version, Option<Version>), ()>,
) {
    let commit_timestamps = [(100, None), (200, None), (300, None)];
    let table = mock_table_with_timestamps(&commit_timestamps, None).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();

    let outcome = timestamp_range_to_versions(&snapshot, &engine, start, end);
    if let Ok(versions) = expected {
        assert_eq!(outcome.unwrap(), versions);
    } else {
        assert!(outcome.is_err(), "{outcome:?}");
    }
}
// `timestamp_range_to_versions` on a table with ICT enabled from version 0 (ICTs 100, 200,
// 300). `Err(())` means any error is accepted.
#[rstest::rstest]
#[case::basic_range(100, Some(200), Ok((0, Some(1))))]
#[case::entire_table(50, Some(500), Ok((0, Some(2))))]
#[case::no_end(150, None, Ok((1, None)))]
#[case::exact_v0_to_v2(100, Some(300), Ok((0, Some(2))))]
#[case::between_commits(150, Some(250), Ok((1, Some(1))))]
#[case::start_equals_end_exact(200, Some(200), Ok((1, Some(1))))]
#[case::end_before_all_fails(50, Some(80), Err(()))]
#[case::start_after_all_fails(400, Some(500), Err(()))]
#[tokio::test]
async fn test_timestamp_range_ict_from_creation(
    #[case] start: Timestamp,
    #[case] end: Option<Timestamp>,
    #[case] expected: Result<(Version, Option<Version>), ()>,
) {
    let commit_timestamps = [(0, Some(100)), (0, Some(200)), (0, Some(300))];
    let table = mock_table_with_timestamps(&commit_timestamps, Some(0)).await;
    let engine = SyncEngine::new();
    let table_url = Url::from_directory_path(table.table_root()).unwrap();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();

    let outcome = timestamp_range_to_versions(&snapshot, &engine, start, end);
    if let Ok(versions) = expected {
        assert_eq!(outcome.unwrap(), versions);
    } else {
        assert!(outcome.is_err(), "{outcome:?}");
    }
}
}