#![allow(unused)]
use std::sync::Arc;
use itertools::Itertools;
use crate::log_reader::checkpoint_manifest::CheckpointManifestReader;
use crate::log_reader::commit::CommitReader;
use crate::log_replay::LogReplayProcessor;
use crate::log_segment::LogSegment;
use crate::scan::COMMIT_READ_SCHEMA;
use crate::utils::require;
use crate::{DeltaResult, Engine, Error, FileMeta};
use delta_kernel_derive::internal_api;
/// Sequential (single-threaded) portion of log replay: drives commit batches, then the
/// v2 checkpoint-manifest batch (when present), through `processor` one `next()` at a time.
#[internal_api]
pub(crate) struct SequentialPhase<P: LogReplayProcessor> {
    // Consumes each action batch and produces `P::Output`.
    processor: P,
    // Reader over the segment's commit files; cleared once the commits are exhausted.
    commit_phase: Option<CommitReader>,
    // Reader over the single-part v2 checkpoint manifest; `Some` only when the segment
    // listed exactly one checkpoint part.
    checkpoint_manifest_phase: Option<CheckpointManifestReader>,
    // Set when the iterator has returned `None`; `finish()` refuses to run before then.
    is_finished: bool,
    // Locations of all listed checkpoint parts; handed to the parallel phase by
    // `finish()` when there is no manifest reader (multi-part / legacy checkpoints).
    checkpoint_parts: Vec<FileMeta>,
}
/// Outcome of the sequential phase, returned by `SequentialPhase::finish`.
#[internal_api]
pub(crate) enum AfterSequential<P: LogReplayProcessor> {
    /// Nothing left to read; replay is complete and the processor is handed back.
    Done(P),
    /// `files` (checkpoint sidecars, or the checkpoint parts themselves) remain and may
    /// be processed in parallel using `processor`.
    Parallel { processor: P, files: Vec<FileMeta> },
}
impl<P: LogReplayProcessor> SequentialPhase<P> {
#[internal_api]
pub(crate) fn try_new(
processor: P,
log_segment: &LogSegment,
engine: Arc<dyn Engine>,
) -> DeltaResult<Self> {
let commit_phase = Some(CommitReader::try_new(
engine.as_ref(),
log_segment,
COMMIT_READ_SCHEMA.clone(),
)?);
let checkpoint_manifest_phase = match log_segment.listed.checkpoint_parts.as_slice() {
[single_part] => Some(CheckpointManifestReader::try_new(
engine,
single_part,
log_segment.log_root.clone(),
)?),
_ => None,
};
let checkpoint_parts = log_segment
.listed
.checkpoint_parts
.iter()
.map(|path| path.location.clone())
.collect_vec();
Ok(Self {
processor,
commit_phase,
checkpoint_manifest_phase,
is_finished: false,
checkpoint_parts,
})
}
#[internal_api]
pub(crate) fn finish(self) -> DeltaResult<AfterSequential<P>> {
if !self.is_finished {
return Err(Error::generic(
"Must exhaust iterator before calling finish()",
));
}
let parallel_files = match self.checkpoint_manifest_phase {
Some(manifest_reader) => manifest_reader.extract_sidecars()?,
None => {
let parts = self.checkpoint_parts;
require!(
parts.len() != 1,
Error::generic(
"Invariant violation: If there is exactly one checkpoint part,
there must be a manifest reader"
)
);
parts
}
};
if parallel_files.is_empty() {
Ok(AfterSequential::Done(self.processor))
} else {
Ok(AfterSequential::Parallel {
processor: self.processor,
files: parallel_files,
})
}
}
}
impl<P: LogReplayProcessor> Iterator for SequentialPhase<P> {
    type Item = DeltaResult<P::Output>;

    /// Pulls the next raw batch — commits first, then the checkpoint manifest — and
    /// runs it through the processor. Marks the phase finished once both are drained.
    fn next(&mut self) -> Option<Self::Item> {
        // Drain the commit reader first (if it is still present).
        let mut raw = match self.commit_phase.as_mut() {
            Some(commits) => commits.next(),
            None => None,
        };
        // Commits exhausted: drop the reader and fall through to the manifest.
        if raw.is_none() {
            self.commit_phase = None;
            raw = match self.checkpoint_manifest_phase.as_mut() {
                Some(manifest) => manifest.next(),
                None => None,
            };
        }
        match raw {
            Some(result) => {
                Some(result.and_then(|batch| self.processor.process_actions_batch(batch)))
            }
            None => {
                // Both sources drained; unblock `finish()`.
                self.is_finished = true;
                None
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::scan::AfterSequentialScanMetadata;
    use crate::utils::test_utils::{assert_result_error_with_message, load_test_table};

    /// Drives the sequential phase for `table_name` to exhaustion, then checks both the
    /// Add-file paths it produced and the sidecar files (if any) handed to the parallel
    /// phase by `finish()`.
    fn verify_sequential_processing(
        table_name: &str,
        expected_adds: &[&str],
        expected_sidecars: &[&str],
    ) -> DeltaResult<()> {
        let (engine, snapshot, _tempdir) = load_test_table(table_name)?;
        let scan = snapshot.scan_builder().build()?;
        let mut sequential = scan.parallel_scan_metadata(engine)?;

        // Exhaust the iterator, accumulating every Add file path it reports.
        let mut file_paths = Vec::new();
        for result in &mut sequential {
            let metadata = result?;
            file_paths =
                metadata.visit_scan_files(file_paths, |ps: &mut Vec<String>, file_stat| {
                    ps.push(file_stat.path);
                })?;
        }
        file_paths.sort();
        assert_eq!(
            file_paths, expected_adds,
            "Sequential phase should collect expected Add file paths"
        );

        match sequential.finish()? {
            AfterSequentialScanMetadata::Done => {
                assert!(
                    expected_sidecars.is_empty(),
                    "Expected Done but got sidecars {expected_sidecars:?}"
                );
            }
            AfterSequentialScanMetadata::Parallel { files, .. } => {
                assert_eq!(
                    files.len(),
                    expected_sidecars.len(),
                    "Should collect exactly {} sidecar files",
                    expected_sidecars.len()
                );
                // Compare only the final path segment (file name) of each sidecar,
                // order-insensitively.
                let mut collected_paths: Vec<String> = files
                    .iter()
                    .map(|fm| {
                        fm.location
                            .path_segments()
                            .and_then(|mut segments| segments.next_back())
                            .unwrap_or("")
                            .to_string()
                    })
                    .collect();
                collected_paths.sort();
                assert_eq!(collected_paths, expected_sidecars);
            }
        }
        Ok(())
    }

    #[test]
    fn test_sequential_v2_with_commits_only() -> DeltaResult<()> {
        verify_sequential_processing(
            "table-without-dv-small",
            &["part-00000-517f5d32-9c95-48e8-82b4-0229cc194867-c000.snappy.parquet"],
            &[],
        )
    }

    #[test]
    fn test_sequential_v2_with_sidecars() -> DeltaResult<()> {
        verify_sequential_processing(
            "v2-checkpoints-json-with-sidecars",
            &[],
            &[
                "00000000000000000006.checkpoint.0000000001.0000000002.19af1366-a425-47f4-8fa6-8d6865625573.parquet",
                "00000000000000000006.checkpoint.0000000002.0000000002.5008b69f-aa8a-4a66-9299-0733a56a7e63.parquet",
            ],
        )
    }

    #[test]
    fn test_sequential_finish_before_exhaustion_error() -> DeltaResult<()> {
        let (engine, snapshot, _tempdir) = load_test_table("table-without-dv-small")?;
        let scan = snapshot.scan_builder().build()?;
        let sequential = scan.parallel_scan_metadata(engine)?;
        // Calling finish() without draining the iterator must fail.
        let result = sequential.finish();
        assert_result_error_with_message(result, "Must exhaust iterator before calling finish()");
        Ok(())
    }

    #[test]
    fn test_sequential_checkpoint_without_sidecars() -> DeltaResult<()> {
        verify_sequential_processing(
            "v2-checkpoints-json-without-sidecars",
            &[
                "test%25file%25prefix-part-00000-0e32f92c-e232-4daa-b734-369d1a800502-c000.snappy.parquet",
                "test%25file%25prefix-part-00000-91daf7c5-9ba0-4f76-aefd-0c3b21d33c6c-c000.snappy.parquet",
                "test%25file%25prefix-part-00001-a5c41be1-ded0-4b18-a638-a927d233876e-c000.snappy.parquet",
            ],
            &[],
        )
    }

    #[test]
    fn test_sequential_parquet_checkpoint_with_sidecars() -> DeltaResult<()> {
        verify_sequential_processing(
            "v2-checkpoints-parquet-with-sidecars",
            &[],
            &[
                "00000000000000000006.checkpoint.0000000001.0000000002.76931b15-ead3-480d-b86c-afe55a577fc3.parquet",
                "00000000000000000006.checkpoint.0000000002.0000000002.4367b29c-0e87-447f-8e81-9814cc01ad1f.parquet",
            ],
        )
    }

    #[test]
    fn test_sequential_checkpoint_no_commits() -> DeltaResult<()> {
        verify_sequential_processing(
            "with_checkpoint_no_last_checkpoint",
            &["part-00000-70b1dcdf-0236-4f63-a072-124cdbafd8a0-c000.snappy.parquet"],
            &[],
        )
    }
}