use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME};
use crate::path::ParsedLogPath;
use crate::schema::SchemaRef;
use crate::snapshot::CheckpointMetadata;
use crate::utils::require;
use crate::{
DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileSystemClient, Version,
};
use itertools::Itertools;
use std::cmp::Ordering;
use std::convert::identity;
use std::sync::{Arc, LazyLock};
use tracing::warn;
use url::Url;
#[cfg(test)]
mod tests;
/// A contiguous slice of the Delta log: at most one (possibly multi-part)
/// checkpoint plus the run of commit files that follow it, ending at
/// `end_version`.
///
/// Invariants (enforced by `try_new`):
/// - `ascending_commit_files` are sorted by version with no gaps;
/// - if both a checkpoint and commits are present, the first commit version is
///   exactly `checkpoint version + 1`.
#[derive(Debug)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) struct LogSegment {
    /// The effective version of this segment: the last commit if any,
    /// otherwise the checkpoint version.
    pub end_version: Version,
    /// Root URL of the log directory the paths below were listed from.
    pub log_root: Url,
    /// Commit (JSON) files in ascending version order.
    pub ascending_commit_files: Vec<ParsedLogPath>,
    /// All parts of the single newest checkpoint found (empty if none).
    pub checkpoint_parts: Vec<ParsedLogPath>,
}
impl LogSegment {
fn try_new(
ascending_commit_files: Vec<ParsedLogPath>,
checkpoint_parts: Vec<ParsedLogPath>,
log_root: Url,
end_version: Option<Version>,
) -> DeltaResult<Self> {
require!(
ascending_commit_files
.windows(2)
.all(|cfs| cfs[0].version + 1 == cfs[1].version),
Error::generic(format!(
"Expected ordered contiguous commit files {:?}",
ascending_commit_files
))
);
if let (Some(checkpoint_file), Some(commit_file)) =
(checkpoint_parts.first(), ascending_commit_files.first())
{
require!(
checkpoint_file.version + 1 == commit_file.version,
Error::InvalidCheckpoint(format!(
"Gap between checkpoint version {} and next commit {}",
checkpoint_file.version, commit_file.version,
))
)
}
let version_eff = ascending_commit_files
.last()
.or(checkpoint_parts.first())
.ok_or(Error::generic("No files in log segment"))?
.version;
if let Some(end_version) = end_version {
require!(
version_eff == end_version,
Error::generic(format!(
"LogSegment end version {} not the same as the specified end version {}",
version_eff, end_version
))
);
}
Ok(LogSegment {
end_version: version_eff,
log_root,
ascending_commit_files,
checkpoint_parts,
})
}
/// Builds a `LogSegment` for reading a table snapshot, optionally starting
/// from a `_last_checkpoint` hint and/or pinned to a time-travel version.
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) fn for_snapshot(
    fs_client: &dyn FileSystemClient,
    log_root: Url,
    checkpoint_hint: impl Into<Option<CheckpointMetadata>>,
    time_travel_version: impl Into<Option<Version>>,
) -> DeltaResult<Self> {
    let time_travel_version = time_travel_version.into();
    // Three listing strategies:
    // 1. hint, no time travel: list from the hinted checkpoint forward;
    // 2. hint at-or-before the requested version: same, bounded by it;
    // 3. otherwise (no hint, or hint is NEWER than the requested version):
    //    fall back to a full listing bounded by the time-travel version.
    let (mut ascending_commit_files, checkpoint_parts) =
        match (checkpoint_hint.into(), time_travel_version) {
            (Some(cp), None) => {
                list_log_files_with_checkpoint(&cp, fs_client, &log_root, None)?
            }
            (Some(cp), Some(end_version)) if cp.version <= end_version => {
                list_log_files_with_checkpoint(&cp, fs_client, &log_root, Some(end_version))?
            }
            _ => list_log_files_with_version(fs_client, &log_root, None, time_travel_version)?,
        };
    // Drop commits already covered by the checkpoint so that `try_new`'s
    // "checkpoint version + 1 == first commit version" invariant can hold.
    if let Some(checkpoint_file) = checkpoint_parts.first() {
        ascending_commit_files.retain(|log_path| checkpoint_file.version < log_path.version);
    }
    LogSegment::try_new(
        ascending_commit_files,
        checkpoint_parts,
        log_root,
        time_travel_version,
    )
}
/// Builds a commit-only `LogSegment` covering `[start_version, end_version]`
/// for change-data-feed style reads. No checkpoint is used: every commit in
/// the range must be present, starting exactly at `start_version`.
///
/// # Errors
/// Returns a generic error when the range is inverted or when the first
/// listed commit is not `start_version`.
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) fn for_table_changes(
    fs_client: &dyn FileSystemClient,
    log_root: Url,
    start_version: Version,
    end_version: impl Into<Option<Version>>,
) -> DeltaResult<Self> {
    let end_version = end_version.into();
    // Guard clause: an inverted range can never be satisfied.
    if end_version.is_some_and(|end| start_version > end) {
        return Err(Error::generic(
            "Failed to build LogSegment: start_version cannot be greater than end_version",
        ));
    }
    // Keep only commit files from the bounded listing.
    let ascending_commit_files: Vec<_> =
        list_log_files(fs_client, &log_root, start_version, end_version)?
            .filter_ok(|parsed| parsed.is_commit())
            .try_collect()?;
    // The requested start must actually exist as the first commit; this also
    // rejects an empty listing.
    require!(
        ascending_commit_files
            .first()
            .is_some_and(|first_commit| first_commit.version == start_version),
        Error::generic(format!(
            "Expected the first commit to have version {}",
            start_version
        ))
    );
    LogSegment::try_new(ascending_commit_files, vec![], log_root, end_version)
}
/// Streams this segment's actions as engine data batches, newest first:
/// all commit (JSON) batches in reverse version order, then the checkpoint
/// (parquet) batches. Each item is tagged with `true` for commit data and
/// `false` for checkpoint data.
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) fn replay(
    &self,
    engine: &dyn Engine,
    commit_read_schema: SchemaRef,
    checkpoint_read_schema: SchemaRef,
    meta_predicate: Option<ExpressionRef>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
    // Commits are replayed newest-first, so reverse the ascending list.
    let commit_locations: Vec<_> = self
        .ascending_commit_files
        .iter()
        .rev()
        .map(|commit| commit.location.clone())
        .collect();
    let commit_batches = engine
        .get_json_handler()
        .read_json_files(&commit_locations, commit_read_schema, meta_predicate.clone())?
        .map_ok(|data| (data, true));

    // Checkpoint parts follow the commits in the combined stream.
    let checkpoint_locations: Vec<_> = self
        .checkpoint_parts
        .iter()
        .map(|part| part.location.clone())
        .collect();
    let checkpoint_batches = engine
        .get_parquet_handler()
        .read_parquet_files(&checkpoint_locations, checkpoint_read_schema, meta_predicate)?
        .map_ok(|data| (data, false));

    Ok(commit_batches.chain(checkpoint_batches))
}
/// Scans the log replay stream for the table's `Metadata` and `Protocol`
/// actions, stopping as soon as both have been found.
///
/// # Errors
/// Returns the matching `Missing*` error when either (or both) actions are
/// absent from the segment.
pub(crate) fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult<(Metadata, Protocol)> {
    let mut metadata: Option<Metadata> = None;
    let mut protocol: Option<Protocol> = None;
    for batch_result in self.replay_for_metadata(engine)? {
        let (batch, _is_commit) = batch_result?;
        // Only probe a batch for an action we haven't found yet.
        if metadata.is_none() {
            metadata = Metadata::try_new_from_data(batch.as_ref())?;
        }
        if protocol.is_none() {
            protocol = Protocol::try_new_from_data(batch.as_ref())?;
        }
        // Both found — no need to read further batches.
        if let (Some(_), Some(_)) = (&metadata, &protocol) {
            break;
        }
    }
    match (metadata, protocol) {
        (Some(m), Some(p)) => Ok((m, p)),
        (None, Some(_)) => Err(Error::MissingMetadata),
        (Some(_), None) => Err(Error::MissingProtocol),
        (None, None) => Err(Error::MissingMetadataAndProtocol),
    }
}
/// Replays the log projected down to just the protocol and metadata columns,
/// with a pushdown predicate so engines can skip batches containing neither.
fn replay_for_metadata(
    &self,
    engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
    // Built once: a row is interesting if it has a metadata id OR a protocol
    // minReaderVersion (i.e. it is a metaData or protocol action).
    static META_PREDICATE: LazyLock<Option<ExpressionRef>> = LazyLock::new(|| {
        let has_metadata = Expression::column([METADATA_NAME, "id"]).is_not_null();
        let has_protocol = Expression::column([PROTOCOL_NAME, "minReaderVersion"]).is_not_null();
        Some(Arc::new(Expression::or(has_metadata, has_protocol)))
    });
    let schema = get_log_schema().project(&[PROTOCOL_NAME, METADATA_NAME])?;
    self.replay(engine, schema.clone(), schema, META_PREDICATE.clone())
}
}
/// Lists parsed log files from `log_root`, starting at `start_version`
/// (default 0) and stopping after `end_version` (inclusive) when given.
/// Unrecognized files are filtered out; listing errors flow through.
fn list_log_files(
    fs_client: &dyn FileSystemClient,
    log_root: &Url,
    start_version: impl Into<Option<Version>>,
    end_version: impl Into<Option<Version>>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ParsedLogPath>>> {
    let start_version = start_version.into().unwrap_or(0);
    let end_version = end_version.into();
    // Delta log file names are zero-padded to 20 digits, so listing from this
    // prefix yields files at or after `start_version`.
    let start_from = log_root.join(&format!("{:020}", start_version))?;
    Ok(fs_client
        .list_from(&start_from)?
        .map(|meta| ParsedLogPath::try_from(meta?))
        // `try_from` yields Ok(None) for non-log files; drop those.
        .filter_map_ok(identity)
        // Stop once a path exceeds the bound; keep errors so callers see them.
        .take_while(move |entry| match entry {
            Ok(path) => end_version.map_or(true, |end| path.version <= end),
            Err(_) => true,
        }))
}
/// Lists log files and splits them into commit files and the parts of the
/// single NEWEST checkpoint found in `[start_version, end_version]`.
///
/// `max_checkpoint_version` tracks the highest checkpoint version seen so
/// far; whenever a newer checkpoint appears, previously collected parts are
/// discarded so only one checkpoint's parts survive. Note: commit files are
/// NOT filtered against the checkpoint here — callers (e.g. `for_snapshot`)
/// are responsible for dropping commits the checkpoint already covers.
fn list_log_files_with_version(
    fs_client: &dyn FileSystemClient,
    log_root: &Url,
    start_version: Option<Version>,
    end_version: Option<Version>,
) -> DeltaResult<(Vec<ParsedLogPath>, Vec<ParsedLogPath>)> {
    // Small upfront guess to avoid early re-allocations on typical tables.
    let mut commit_files = Vec::with_capacity(10);
    let mut checkpoint_parts = vec![];
    // Seeding with `start_version` means checkpoints BELOW the start are
    // ignored (Ordering::Less arm) when a start version is given.
    let mut max_checkpoint_version = start_version;
    for parsed_path in list_log_files(fs_client, log_root, start_version, end_version)? {
        let parsed_path = parsed_path?;
        if parsed_path.is_commit() {
            commit_files.push(parsed_path);
        } else if parsed_path.is_checkpoint() {
            let path_version = parsed_path.version;
            match max_checkpoint_version {
                None => {
                    // First checkpoint seen — start collecting its parts.
                    checkpoint_parts.push(parsed_path);
                    max_checkpoint_version = Some(path_version);
                }
                Some(checkpoint_version) => match path_version.cmp(&checkpoint_version) {
                    Ordering::Greater => {
                        // Newer checkpoint supersedes everything collected.
                        max_checkpoint_version = Some(path_version);
                        checkpoint_parts.clear();
                        checkpoint_parts.push(parsed_path);
                    }
                    // Another part of the same (multi-part) checkpoint.
                    Ordering::Equal => checkpoint_parts.push(parsed_path),
                    // Older checkpoint — irrelevant.
                    Ordering::Less => {}
                },
            }
        }
    }
    Ok((commit_files, checkpoint_parts))
}
/// Lists log files starting from the version named in a `_last_checkpoint`
/// hint and validates what was actually found on storage.
///
/// The hint is advisory: if a NEWER checkpoint exists, we log a warning and
/// use it. The part-count check is deliberately applied only when the found
/// checkpoint matches the hinted version — the hint's `parts` value says
/// nothing about some other, newer checkpoint.
fn list_log_files_with_checkpoint(
    checkpoint_metadata: &CheckpointMetadata,
    fs_client: &dyn FileSystemClient,
    log_root: &Url,
    end_version: Option<Version>,
) -> DeltaResult<(Vec<ParsedLogPath>, Vec<ParsedLogPath>)> {
    let (commit_files, checkpoint_parts) = list_log_files_with_version(
        fs_client,
        log_root,
        Some(checkpoint_metadata.version),
        end_version,
    )?;
    // The hint promised a checkpoint at (or after) this version; finding none
    // at all means the log is inconsistent with `_last_checkpoint`.
    let Some(latest_checkpoint) = checkpoint_parts.last() else {
        return Err(Error::invalid_checkpoint(
            "Had a _last_checkpoint hint but didn't find any checkpoints",
        ));
    };
    if latest_checkpoint.version != checkpoint_metadata.version {
        // Stale hint: a newer checkpoint exists. Warn and proceed with it.
        warn!(
            "_last_checkpoint hint is out of date. _last_checkpoint version: {}. Using actual most recent: {}",
            checkpoint_metadata.version,
            latest_checkpoint.version
        );
    } else if checkpoint_parts.len() != checkpoint_metadata.parts.unwrap_or(1) {
        // Same version but wrong number of parts: the multi-part checkpoint
        // is incomplete or over-listed — unsafe to read.
        // (`parts` is absent for single-part checkpoints, hence unwrap_or(1).)
        return Err(Error::InvalidCheckpoint(format!(
            "_last_checkpoint indicated that checkpoint should have {} parts, but it has {}",
            checkpoint_metadata.parts.unwrap_or(1),
            checkpoint_parts.len()
        )));
    }
    Ok((commit_files, checkpoint_parts))
}