use url::Url;
use crate::commit_range::CommitRange;
use crate::log_segment::LogSegment;
use crate::path::ParsedLogPath;
use crate::snapshot::SnapshotRef;
use crate::{DeltaResult, Engine, Error, Version};
/// Builder that gathers the inputs needed to construct a [`CommitRange`].
///
/// A builder is created by the crate-internal constructors `new_for` (from a table root
/// string) or `new_from` (from an existing snapshot), optionally refined with
/// `with_end_version` / `with_ordering`, and finally turned into a [`CommitRange`] by `build`.
pub struct CommitRangeBuilder {
/// Table root exactly as supplied by the caller; it is parsed into a URL in `build`.
table_root: String,
/// First version of the requested range (inclusive).
start_version: Version,
/// Last version of the requested range (inclusive). `None` means `build` falls back to the
/// end version of the log segment it resolves.
end_version: Option<Version>,
/// Snapshot whose log segment supplies the commit files. `None` means `build` obtains a log
/// segment through the engine's storage handler instead.
snapshot: Option<SnapshotRef>,
/// Order in which the commit files are stored in the built range.
commit_ordering: CommitOrdering,
}
impl CommitRangeBuilder {
    /// Creates a builder for the table at `table_root`, beginning at `start_version`.
    ///
    /// No snapshot is attached, so [`Self::build`] resolves the commit files through
    /// `LogSegment::for_table_changes` using the engine's storage handler. The end version
    /// starts out unset and the ordering defaults to [`CommitOrdering::AscendingOrder`].
    pub(crate) fn new_for(table_root: impl AsRef<str>, start_version: Version) -> Self {
        Self {
            table_root: table_root.as_ref().to_string(),
            start_version,
            end_version: None,
            snapshot: None,
            commit_ordering: CommitOrdering::AscendingOrder,
        }
    }

    /// Creates a builder that takes its commit files from `snapshot`'s log segment,
    /// beginning at `start_version`.
    ///
    /// The table root is copied from the snapshot. The end version starts out unset and the
    /// ordering defaults to [`CommitOrdering::AscendingOrder`].
    pub(crate) fn new_from(snapshot: SnapshotRef, start_version: Version) -> Self {
        Self {
            // Struct fields are initialised in source order, so the borrow used to read the
            // table root has ended by the time `snapshot` is moved below; the handle does
            // not need to be cloned.
            table_root: snapshot.table_root().to_string(),
            start_version,
            end_version: None,
            snapshot: Some(snapshot),
            commit_ordering: CommitOrdering::AscendingOrder,
        }
    }

    /// Sets the last version (inclusive) of the range.
    pub fn with_end_version(mut self, end_version: Version) -> Self {
        self.end_version = Some(end_version);
        self
    }

    /// Sets the order in which the built range stores its commit files.
    pub fn with_ordering(mut self, commit_ordering: CommitOrdering) -> Self {
        self.commit_ordering = commit_ordering;
        self
    }

    /// Resolves the commit files for `start_version..=end_version` and builds the
    /// [`CommitRange`].
    ///
    /// When a snapshot is attached its log segment is used; otherwise a log segment is
    /// obtained from `LogSegment::for_table_changes`. If no end version was set, the end
    /// version of that log segment is used.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - the table root cannot be parsed or the `_delta_log/` path cannot be joined onto it;
    /// - `LogSegment::for_table_changes` fails (path-based builders only);
    /// - the end version is greater than the end version of the snapshot's log segment
    ///   (snapshot-based builders only);
    /// - `start_version` is greater than the end version;
    /// - the first commit file in range is not `start_version` (snapshot-based builders only);
    /// - the number of commit files in range differs from the number of versions in range.
    pub fn build(&self, engine: &dyn Engine) -> DeltaResult<CommitRange> {
        let table_root = Self::parse_table_root(&self.table_root)?;
        let log_root = table_root.join("_delta_log/")?;
        let start_version = self.start_version;
        let from_snapshot = self.snapshot.is_some();

        let log_segment = match &self.snapshot {
            Some(snapshot) => snapshot.log_segment().clone(),
            None => LogSegment::for_table_changes(
                engine.storage_handler().as_ref(),
                log_root,
                start_version,
                self.end_version,
            )?,
        };

        let segment_end_version = log_segment.end_version;
        let end_version = self.end_version.unwrap_or(segment_end_version);
        if from_snapshot && end_version > segment_end_version {
            return Err(Error::generic(format!(
                "end_version ({end_version}) cannot exceed snapshot version ({})",
                segment_end_version
            )));
        }
        // Cheap sanity check first: reject an inverted range before allocating the
        // filtered commit list.
        validate_version_range(start_version, end_version)?;

        let mut commit_files: Vec<ParsedLogPath> = log_segment
            .listed
            .ascending_commit_files
            .into_iter()
            .filter(|f| f.version >= start_version && f.version <= end_version)
            .collect();

        if from_snapshot {
            validate_start_version_available(start_version, commit_files.first())?;
        }
        validate_number_of_commit_files(start_version, end_version, commit_files.len())?;

        // The files were collected from `ascending_commit_files`; flip them only when the
        // caller asked for descending order.
        if self.commit_ordering == CommitOrdering::DescendingOrder {
            commit_files.reverse();
        }

        Ok(CommitRange {
            table_root,
            commit_files,
            start_version,
            end_version,
            commit_ordering: self.commit_ordering,
        })
    }

    /// Parses the stored table root string into a [`Url`] via `crate::utils::try_parse_uri`.
    fn parse_table_root(table_root: &str) -> DeltaResult<Url> {
        crate::utils::try_parse_uri(table_root)
    }
}
/// Order in which a built commit range stores its commit files.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommitOrdering {
/// Keep the commit files in the order they come out of the log segment's
/// `ascending_commit_files` list. This is what both builder constructors start with.
AscendingOrder,
/// Reverse the collected commit files before they are stored in the range.
DescendingOrder,
}
fn validate_version_range(start: Version, end: Version) -> DeltaResult<()> {
if start > end {
return Err(Error::generic(format!(
"start_version ({start}) must be <= end_version ({end})",
)));
}
Ok(())
}
fn validate_start_version_available(
start_version: Version,
first_commit: Option<&ParsedLogPath>,
) -> DeltaResult<()> {
if first_commit.map(|f| f.version) == Some(start_version) {
return Ok(());
}
let earliest_available_commit = first_commit
.map(|f| f.version)
.ok_or_else(|| Error::generic("snapshot's log segment must have at least one commit"))?;
Err(Error::generic(format!(
"start_version {start_version} is not available in the snapshot's log segment \
(earliest available commit: {earliest_available_commit})",
)))
}
/// Checks that the number of commit files equals the number of versions in `start..=end`.
///
/// # Errors
///
/// - `start > end`: the range is inverted, so no expected count exists.
/// - `commit_file_count` differs from `end - start + 1`.
fn validate_number_of_commit_files(
    start: Version,
    end: Version,
    commit_file_count: usize,
) -> DeltaResult<()> {
    // `end - start` would underflow for an inverted range (panic in debug builds, silent
    // wrap-around in release); report it as an error instead.
    let span = match end.checked_sub(start) {
        Some(span) => span,
        None => {
            return Err(Error::generic(format!(
                "start_version ({start}) must be <= end_version ({end})",
            )))
        }
    };
    // Widen before adding one so a range covering every `u64` version cannot overflow.
    let expected = u128::from(span) + 1;
    // `usize` always fits in `u128`, so this cast is lossless.
    let actual = commit_file_count as u128;
    if expected != actual {
        return Err(Error::generic(format!(
            "The number of commit files: {actual} does not match the expected range (start_version: {start}, end_version: {end}): expected {expected} commit files",
        )));
    }
    Ok(())
}
// Unit tests for `CommitRangeBuilder::build`, run against the on-disk fixture table
// `./tests/data/table-with-dv-small/` using the synchronous engine.
#[cfg(test)]
mod tests {
use std::path::PathBuf;
use std::sync::Arc;
use super::*;
use crate::commit_range::DeltaAction;
use crate::engine::sync::SyncEngine;
use crate::{Engine, Snapshot};
// Returns the fixture table's root as a directory URL. Panics (via `unwrap`) if the path
// cannot be canonicalized or converted.
fn dv_small_table_root() -> Url {
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
Url::from_directory_path(path).unwrap()
}
// Path-based builder with an explicit end version: the built range reports the requested
// bounds and the same table root.
#[test]
fn test_build_path_based_succeeds() {
let table_root = dv_small_table_root();
let engine = SyncEngine::new();
let range = CommitRange::builder_for(table_root.as_str(), 0)
.with_end_version(1)
.build(&engine)
.unwrap();
assert_eq!(range.start_version(), 0);
assert_eq!(range.end_version(), 1);
assert_eq!(range.table_root().as_str(), table_root.as_str());
}
// Snapshot-based builder without an end version: the range ends at the snapshot's version.
#[test]
fn test_build_snapshot_based_succeeds() {
let table_root = dv_small_table_root();
let engine = SyncEngine::new();
let snapshot = Snapshot::builder_for(table_root.as_str())
.build(&engine)
.unwrap();
let snapshot_version = snapshot.version();
let range = CommitRange::builder_from(snapshot, 0)
.build(&engine)
.unwrap();
assert_eq!(range.start_version(), 0);
assert_eq!(range.end_version(), snapshot_version);
}
// Snapshot-based builds must fail when the requested versions lie past the snapshot.
// Each case supplies (start, optional end, substrings the error message must contain):
// - start past the snapshot with no explicit end: the substrings match the
//   "start_version (..) must be <= end_version (..)" error;
// - end past the snapshot: the substrings match the
//   "end_version (..) cannot exceed snapshot version (..)" error.
#[rstest::rstest]
#[case::start_past_snapshot_version(
5,
None,
&["start_version (5)", "end_version"],
)]
#[case::end_past_snapshot_version(
0,
Some(99),
&["99", "snapshot"],
)]
fn test_build_errors_on_version_past_snapshot_version(
#[case] start: Version,
#[case] end: Option<Version>,
#[case] expected_substrings: &[&str],
) {
let table_root = dv_small_table_root();
let engine = SyncEngine::new();
let snapshot = Snapshot::builder_for(table_root.as_str())
.build(&engine)
.unwrap();
let mut builder = CommitRange::builder_from(snapshot, start);
// Only set an end version when the case provides one.
if let Some(end) = end {
builder = builder.with_end_version(end);
}
let err = builder.build(&engine).expect_err("must error");
let msg = format!("{err}");
for needle in expected_substrings {
assert!(
msg.contains(needle),
"expected message to contain {needle:?}, got: {msg}",
);
}
}
// Snapshot-based builder with an explicit end version: the explicit value is kept rather
// than being replaced by the snapshot's version.
#[test]
fn test_build_snapshot_based_with_explicit_end_version() {
let table_root = dv_small_table_root();
let engine = SyncEngine::new();
let snapshot = Snapshot::builder_for(table_root.as_str())
.build(&engine)
.unwrap();
let range = CommitRange::builder_from(snapshot, 0)
.with_end_version(0)
.build(&engine)
.unwrap();
assert_eq!(
range.end_version(),
0,
"explicit end_version must be honored",
);
}
// Path-based builder without an end version: the range extends to the latest commit of the
// fixture table, which the assertion below expects to be version 1.
#[test]
fn test_build_path_based_without_end_version_extends_to_latest() {
let table_root = dv_small_table_root();
let engine = SyncEngine::new();
let range = CommitRange::builder_for(table_root.as_str(), 0)
.build(&engine)
.unwrap();
assert_eq!(range.start_version(), 0);
assert_eq!(
range.end_version(),
1,
"table-with-dv-small latest commit is v=1"
);
}
// Builds the same 0..=1 range in ascending and in descending order and checks that
// iterating the commits yields versions [0, 1] and [1, 0] respectively.
#[test]
fn test_build_descending_ordering_yields_commits_in_reverse_order() {
let table_root = dv_small_table_root();
let engine: Arc<dyn Engine> = Arc::new(SyncEngine::new());
let actions = [DeltaAction::Add, DeltaAction::Remove];
let asc_range = CommitRange::builder_for(table_root.as_str(), 0)
.with_end_version(1)
.build(engine.as_ref())
.unwrap();
let desc_range = CommitRange::builder_for(table_root.as_str(), 0)
.with_end_version(1)
.with_ordering(CommitOrdering::DescendingOrder)
.build(engine.as_ref())
.unwrap();
// The ascending iteration is given the snapshot at version 0 and the descending one the
// snapshot at version 1, i.e. the snapshot at the version each iteration visits first.
let snapshot_at_start = Snapshot::builder_for(table_root.as_str())
.at_version(0)
.build(engine.as_ref())
.unwrap();
let snapshot_at_end = Snapshot::builder_for(table_root.as_str())
.at_version(1)
.build(engine.as_ref())
.unwrap();
let asc_versions = asc_range
.commits(engine.clone(), Some(snapshot_at_start), &actions)
.unwrap()
.map(|c| c.unwrap().version())
.collect::<Vec<_>>();
let desc_versions = desc_range
.commits(engine, Some(snapshot_at_end), &actions)
.unwrap()
.map(|c| c.unwrap().version())
.collect::<Vec<_>>();
assert_eq!(asc_versions, vec![0, 1]);
assert_eq!(desc_versions, vec![1, 0]);
}
}