use std::collections::HashMap;
use delta_kernel_derive::internal_api;
use itertools::Itertools;
use tracing::{debug, info, instrument, warn};
use url::Url;
use crate::last_checkpoint_hint::LastCheckpointHint;
use crate::path::LogPathFileType::*;
use crate::path::{LogPathFileType, ParsedLogPath};
use crate::{DeltaResult, Error, StorageHandler, Version};
#[cfg(test)]
mod tests;
/// The set of log files that make up one log segment, as discovered by listing the `_delta_log`
/// directory (optionally merged with an in-memory log tail).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[internal_api]
pub(crate) struct LogSegmentFiles {
// Commit files in ascending version order (only those newer than the chosen checkpoint).
pub ascending_commit_files: Vec<ParsedLogPath>,
// Compacted-commit files in ascending order, restricted to the requested version range.
pub ascending_compaction_files: Vec<ParsedLogPath>,
// All parts of the most recent complete checkpoint found (empty if none).
pub checkpoint_parts: Vec<ParsedLogPath>,
// Most recently listed CRC file, if any survives checkpoint-based pruning.
pub latest_crc_file: Option<ParsedLogPath>,
// The newest commit file in the segment, if any.
pub latest_commit_file: Option<ParsedLogPath>,
// Highest version among published (non-staged) `Commit` files seen during listing.
pub max_published_version: Option<Version>,
}
/// List parsed log paths from storage, beginning at `start_version` (inclusive) and stopping
/// once a successfully parsed path exceeds `end_version`. Listing/parsing errors are passed
/// through to the caller rather than terminating the iterator.
fn list_from_storage(
    storage: &dyn StorageHandler,
    log_root: &Url,
    start_version: Version,
    end_version: Version,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ParsedLogPath>>> {
    // Versions are zero-padded to 20 digits so lexical listing order matches numeric order.
    let listing_start = log_root.join(&format!("{start_version:020}"))?;
    let listing = storage
        .list_from(&listing_start)?
        .map(|meta| ParsedLogPath::try_from(meta?))
        // Drop entries that failed to parse into a known log path or should not be listed.
        .filter_map_ok(|maybe_path| maybe_path.filter(|parsed| parsed.should_list()))
        // Keep errors flowing; stop only when a parsed path passes `end_version`.
        .take_while(move |parsed| match parsed {
            Ok(path) => path.version <= end_version,
            Err(_) => true,
        });
    Ok(listing)
}
/// Group checkpoint part files by their total part count.
///
/// The returned map is keyed by `num_parts`; a group is complete when its vec holds exactly
/// that many files. Parts must arrive in order: part 1 (re)starts a group, and part `k > 1`
/// is only accepted if exactly `k - 1` parts of that group were already collected.
/// Non-checkpoint files are ignored.
fn group_checkpoint_parts(parts: Vec<ParsedLogPath>) -> HashMap<u32, Vec<ParsedLogPath>> {
    let mut grouped: HashMap<u32, Vec<ParsedLogPath>> = HashMap::new();
    for file in parts {
        // Normalize every checkpoint flavor to a (part_num, num_parts) pair; single-file
        // checkpoints (classic and UUID-named) count as a one-part group.
        let (part_num, num_parts) = match &file.file_type {
            SinglePartCheckpoint | UuidCheckpoint => (1, 1),
            MultiPartCheckpoint {
                part_num,
                num_parts,
            } => (*part_num, *num_parts),
            Commit | StagedCommit | CompactedCommit { .. } | Crc | Unknown => continue,
        };
        if part_num == 1 {
            // The first part always (re)starts the group for this part count.
            grouped.insert(num_parts, vec![file]);
        } else if let Some(group) = grouped.get_mut(&num_parts) {
            // Later parts only attach when they arrive exactly in sequence.
            if part_num as usize == group.len() + 1 {
                group.push(file);
            }
        }
    }
    grouped
}
/// Scan `ascending_files` (assumed sorted by ascending version) and return the highest version
/// that contains a complete checkpoint — either a single-file checkpoint or every part of a
/// multi-part one. Empty checkpoint files are excluded via `should_process_log_file`.
fn find_complete_checkpoint_version(ascending_files: &[ParsedLogPath]) -> Option<Version> {
    let mut newest_complete = None;
    let candidates = ascending_files
        .iter()
        .filter(|f| f.is_checkpoint() && should_process_log_file(f));
    // Group consecutive files that share a version, then check each group for completeness.
    let by_version = candidates.chunk_by(|f| f.version);
    for (version, files) in &by_version {
        let grouped = group_checkpoint_parts(files.cloned().collect());
        let is_complete = grouped
            .iter()
            .any(|(num_parts, parts)| parts.len() == *num_parts as usize);
        if is_complete {
            // Input is ascending, so the last complete group seen has the highest version.
            newest_complete = Some(version);
        }
    }
    newest_complete
}
/// Decide whether a listed log file should participate in log-segment construction.
///
/// Any non-empty file is processed. For zero-byte files the decision is per type:
/// commits, CRC files, and unknown files are still processed (with a warning for the
/// former two), while empty checkpoints and compacted commits are skipped.
fn should_process_log_file(file: &ParsedLogPath) -> bool {
    // Files with actual content are always processed.
    if file.location.size > 0 {
        return true;
    }
    // The file is empty (0 bytes); choose keep vs. skip based on its type.
    match file.file_type {
        Commit | StagedCommit => {
            warn!(
                "{:?} file is empty (0 bytes): {}",
                file.file_type, file.location.location,
            );
            true
        }
        CompactedCommit { .. } => {
            warn!(
                "Skipping empty (0 byte) compacted log file {}, \
                 falling back to individual commits",
                file.location.location,
            );
            false
        }
        SinglePartCheckpoint | UuidCheckpoint | MultiPartCheckpoint { .. } => {
            warn!(
                "Skipping empty (0 byte) checkpoint file: {}",
                file.location.location,
            );
            false
        }
        Crc => {
            warn!("CRC file is empty (0 bytes): {}", file.location.location);
            true
        }
        Unknown => true,
    }
}
/// Mutable state threaded through a single listing pass while building a `LogSegmentFiles`.
#[derive(Default)]
struct ListingAccumulator {
// The log-segment files collected so far.
output: LogSegmentFiles,
// Checkpoint parts seen for the version group currently being scanned; validated and
// flushed into `output` when the version advances.
pending_checkpoint_parts: Vec<ParsedLogPath>,
// Inclusive upper version bound requested by the caller, if any.
end_version: Option<Version>,
// Version of the file group currently being accumulated.
group_version: Option<Version>,
}
impl ListingAccumulator {
    /// Route one listed file into the accumulator. Files rejected by
    /// `should_process_log_file` (certain empty files) are dropped up front.
    fn process_file(&mut self, file: ParsedLogPath) {
        if !should_process_log_file(&file) {
            return;
        }
        match file.file_type {
            Commit | StagedCommit => self.output.ascending_commit_files.push(file),
            CompactedCommit { hi } => {
                // Only keep compactions whose upper bound fits within the requested range.
                if self.end_version.is_none_or(|end| hi <= end) {
                    self.output.ascending_compaction_files.push(file);
                }
            }
            SinglePartCheckpoint | UuidCheckpoint | MultiPartCheckpoint { .. } => {
                self.pending_checkpoint_parts.push(file);
            }
            Crc => {
                // Later CRC files win: remember only the most recently listed one.
                self.output.latest_crc_file = Some(file);
            }
            Unknown => {
                debug!(
                    "Found file {} with unknown file type {:?} at version {}",
                    file.filename, file.file_type, file.version
                );
            }
        }
    }

    /// If `file_version` begins a new version group, flush the previous group's pending
    /// checkpoint parts and start tracking the new version.
    fn maybe_flush_and_advance(&mut self, file_version: Version) {
        if let Some(current) = self.group_version {
            if current != file_version {
                self.flush_checkpoint_group(current);
                self.group_version = Some(file_version);
            }
            // Same version as the current group: nothing to do.
        } else {
            // First file seen: open the initial group.
            self.group_version = Some(file_version);
        }
    }

    /// Validate the pending checkpoint parts collected for `version`. If they form a
    /// complete checkpoint, it supersedes everything accumulated so far: earlier commits,
    /// compactions, and any CRC file older than the checkpoint are discarded.
    fn flush_checkpoint_group(&mut self, version: Version) {
        let pending = std::mem::take(&mut self.pending_checkpoint_parts);
        let complete = group_checkpoint_parts(pending)
            .into_iter()
            .find(|(num_parts, parts)| parts.len() == *num_parts as usize);
        let Some((_, checkpoint_parts)) = complete else {
            return;
        };
        self.output.checkpoint_parts = checkpoint_parts;
        // Capture the commit at the checkpoint's own version (if listed) before clearing
        // the now-superseded history.
        self.output.latest_commit_file = self
            .output
            .ascending_commit_files
            .last()
            .filter(|commit| commit.version == version)
            .cloned();
        self.output.ascending_commit_files.clear();
        self.output.ascending_compaction_files.clear();
        // A CRC file older than the checkpoint is no longer useful.
        let crc_is_stale = self
            .output
            .latest_crc_file
            .as_ref()
            .is_some_and(|crc| crc.version < version);
        if crc_is_stale {
            self.output.latest_crc_file = None;
        }
    }
}
/// Number of versions examined per window when scanning backwards for a complete checkpoint.
const BACKWARD_SCAN_WINDOW_SIZE: u64 = 1000;
impl LogSegmentFiles {
/// Assemble a [`LogSegmentFiles`] from an ascending filesystem listing plus an in-memory
/// `log_tail` of commits that supersedes any overlapping listed commits.
///
/// Files are processed in version order; whenever the version advances, pending checkpoint
/// parts for the previous version are validated and, if complete, reset the accumulated
/// commit/compaction history (see `ListingAccumulator::flush_checkpoint_group`).
pub(crate) fn build_log_segment_files(
fs_files: impl Iterator<Item = DeltaResult<ParsedLogPath>>,
log_tail: Vec<ParsedLogPath>,
start_version: Version,
end_version: Option<Version>,
) -> DeltaResult<Self> {
debug_assert!(
log_tail.iter().all(|entry| entry.is_commit()),
"log_tail should only contain commits"
);
let log_tail_start_version = log_tail.first().map(|f| f.version);
let end = end_version.unwrap_or(Version::MAX);
let mut acc = ListingAccumulator {
end_version,
..Default::default()
};
for file_result in fs_files {
let file = file_result?;
// Track the highest published (non-staged) commit version seen in the listing.
if matches!(file.file_type, LogPathFileType::Commit) {
acc.output.max_published_version =
acc.output.max_published_version.max(Some(file.version));
}
// Listed commits at or beyond the log tail's first version are superseded by
// the tail entries appended below.
if file.is_commit()
&& log_tail_start_version.is_some_and(|tail_start| file.version >= tail_start)
{
continue;
}
acc.maybe_flush_and_advance(file.version);
acc.process_file(file);
}
// Append the log tail, restricted to the requested version range.
let filtered_log_tail = log_tail
.into_iter()
.filter(|entry| entry.version >= start_version && entry.version <= end);
for file in filtered_log_tail {
if matches!(file.file_type, LogPathFileType::Commit) {
acc.output.max_published_version =
acc.output.max_published_version.max(Some(file.version));
}
acc.maybe_flush_and_advance(file.version);
acc.process_file(file);
}
// Flush the final (still open) version group.
if let Some(gv) = acc.group_version {
acc.flush_checkpoint_group(gv);
}
// Any commit newer than the checkpoint wins as the latest commit file.
if let Some(commit_file) = acc.output.ascending_commit_files.last() {
acc.output.latest_commit_file = Some(commit_file.clone());
}
Ok(acc.output)
}
/// Commits in ascending version order.
pub(crate) fn ascending_commit_files(&self) -> &Vec<ParsedLogPath> {
&self.ascending_commit_files
}
/// Mutable access to the ascending commit list.
pub(crate) fn ascending_commit_files_mut(&mut self) -> &mut Vec<ParsedLogPath> {
&mut self.ascending_commit_files
}
/// Parts of the most recent complete checkpoint (empty if none was found).
pub(crate) fn checkpoint_parts(&self) -> &Vec<ParsedLogPath> {
&self.checkpoint_parts
}
/// The newest commit file in the segment, if any.
pub(crate) fn latest_commit_file(&self) -> &Option<ParsedLogPath> {
&self.latest_commit_file
}
/// List only published `Commit` files in `[start_version, end_version]` (bounds default to
/// 0 and `Version::MAX`). Checkpoints, compactions, CRC, and staged commits are ignored.
pub(crate) fn list_commits(
storage: &dyn StorageHandler,
log_root: &Url,
start_version: Option<Version>,
end_version: Option<Version>,
) -> DeltaResult<Self> {
let start = start_version.unwrap_or(0);
let end = end_version.unwrap_or(Version::MAX);
let fs_iter = list_from_storage(storage, log_root, start, end)?;
let mut listed_commits = Vec::new();
let mut max_published_version: Option<Version> = None;
for file_result in fs_iter {
let file = file_result?;
if matches!(file.file_type, LogPathFileType::Commit) {
// The return value is discarded: for `Commit` files `should_process_log_file`
// always returns true, so this call only emits the empty-file warning.
// NOTE(review): consider making that side-effect-only intent explicit.
should_process_log_file(&file); max_published_version = max_published_version.max(Some(file.version));
listed_commits.push(file);
}
}
let latest_commit_file = listed_commits.last().cloned();
Ok(LogSegmentFiles {
ascending_commit_files: listed_commits,
latest_commit_file,
max_published_version,
..Default::default()
})
}
/// List all log files in `[start_version, end_version]` and build a log segment from them,
/// merging in `log_tail`. Bounds default to 0 and unbounded.
#[instrument(name = "log.list", skip_all, fields(start = ?start_version, end = ?end_version), err)]
pub(crate) fn list(
storage: &dyn StorageHandler,
log_root: &Url,
log_tail: Vec<ParsedLogPath>,
start_version: Option<Version>,
end_version: Option<Version>,
) -> DeltaResult<Self> {
let start = start_version.unwrap_or(0);
let end = end_version.unwrap_or(Version::MAX);
let fs_iter = list_from_storage(storage, log_root, start, end)?;
Self::build_log_segment_files(fs_iter, log_tail, start, end_version)
}
/// Like [`Self::list`], but start listing at the version recorded in the `_last_checkpoint`
/// hint. Errors if no checkpoint is found at all; tolerates a stale hint (a newer actual
/// checkpoint), but rejects a part-count mismatch for the hinted checkpoint itself.
pub(crate) fn list_with_checkpoint_hint(
checkpoint_metadata: &LastCheckpointHint,
storage: &dyn StorageHandler,
log_root: &Url,
log_tail: Vec<ParsedLogPath>,
end_version: Option<Version>,
) -> DeltaResult<Self> {
let listed_files = Self::list(
storage,
log_root,
log_tail,
Some(checkpoint_metadata.version),
end_version,
)?;
let Some(latest_checkpoint) = listed_files.checkpoint_parts.last() else {
return Err(Error::invalid_checkpoint(
"Had a _last_checkpoint hint but didn't find any checkpoints",
));
};
if latest_checkpoint.version != checkpoint_metadata.version {
info!(
"_last_checkpoint hint is out of date. _last_checkpoint version: {}. Using actual most recent: {}",
checkpoint_metadata.version,
latest_checkpoint.version
);
} else if listed_files.checkpoint_parts.len() != checkpoint_metadata.parts.unwrap_or(1) {
// A missing `parts` field is treated as a single-part checkpoint.
return Err(Error::InvalidCheckpoint(format!(
"_last_checkpoint indicated that checkpoint should have {} parts, but it has {}",
checkpoint_metadata.parts.unwrap_or(1),
listed_files.checkpoint_parts.len()
)));
}
Ok(listed_files)
}
/// Scan backwards from `end_version` in fixed-size windows until a window contains a
/// complete checkpoint, then build the log segment from the windows collected so far
/// (re-ordered to ascending). Falls back to version 0 if no checkpoint is ever found.
#[instrument(name = "log.list_with_backward_checkpoint_scan", skip_all, fields(end = end_version), err)]
pub(crate) fn list_with_backward_checkpoint_scan(
storage: &dyn StorageHandler,
log_root: &Url,
log_tail: Vec<ParsedLogPath>,
end_version: Version,
) -> DeltaResult<Self> {
let mut windows: Vec<Vec<ParsedLogPath>> = Vec::new();
let mut found_checkpoint_version: Option<Version> = None;
// NOTE(review): `end_version + 1` overflows if end_version == Version::MAX — confirm
// callers never pass MAX here.
let mut upper = end_version + 1;
while upper > 0 {
// Each window covers [lower, upper - 1]; `upper > 0` guarantees the subtraction.
let lower = upper.saturating_sub(BACKWARD_SCAN_WINDOW_SIZE);
let window_files: Vec<_> =
list_from_storage(storage, log_root, lower, upper - 1)?.try_collect()?;
found_checkpoint_version = find_complete_checkpoint_version(&window_files);
windows.push(window_files);
if found_checkpoint_version.is_some() {
break;
}
upper = lower;
}
// Windows were collected newest-first; reverse to restore ascending version order.
let fs_iter = windows.into_iter().rev().flatten().map(Ok);
let start = found_checkpoint_version.unwrap_or(0);
Self::build_log_segment_files(fs_iter, log_tail, start, Some(end_version))
}
}