use std::sync::{Arc, LazyLock};
use itertools::Itertools;
use url::Url;
use crate::actions::visitors::SidecarVisitor;
use crate::actions::{Add, Remove, Sidecar, ADD_NAME, REMOVE_NAME, SIDECAR_NAME};
use crate::log_replay::ActionsBatch;
use crate::path::ParsedLogPath;
use crate::schema::{SchemaRef, StructField, StructType, ToSchema};
use crate::utils::require;
use crate::{DeltaResult, Engine, Error, FileMeta, RowVisitor};
/// Reads a single checkpoint manifest file and yields its contents as [`ActionsBatch`]es.
///
/// While batches are pulled through the [`Iterator`] impl, each one is also shown to a
/// [`SidecarVisitor`]. Once the underlying stream is exhausted, `extract_sidecars` turns the
/// recorded sidecars into [`FileMeta`] values.
// NOTE(review): `allow(unused)` presumably exists because nothing outside the tests constructs
// this type yet — confirm and drop the attribute once it has a caller.
#[allow(unused)]
pub(crate) struct CheckpointManifestReader {
/// Stream of action batches read from the manifest file by the engine's JSON or Parquet handler.
actions: Box<dyn Iterator<Item = DeltaResult<ActionsBatch>> + Send>,
/// Visitor applied to every successfully read batch; its `sidecars` are consumed by
/// `extract_sidecars`.
sidecar_visitor: SidecarVisitor,
/// Log root URL handed to `Sidecar::to_filemeta` when sidecars are extracted.
log_root: Url,
/// Set to `true` the first time `actions` returns `None`; `extract_sidecars` fails until then.
is_complete: bool,
/// Copy of the manifest's `location`; its URL is included in the "in-progress" error message.
manifest_file: FileMeta,
}
impl CheckpointManifestReader {
    /// Creates a reader over the checkpoint manifest described by `manifest`.
    ///
    /// The file at `manifest.location` is read with a schema made of three nullable fields named
    /// by `ADD_NAME`, `REMOVE_NAME` and `SIDECAR_NAME`. The reader is chosen from
    /// `manifest.extension`: `"json"` goes through the engine's JSON handler and `"parquet"`
    /// through its Parquet handler. Every batch that is read successfully is wrapped with
    /// `ActionsBatch::new(batch, false)`.
    ///
    /// `log_root` is stored and later passed to `Sidecar::to_filemeta` by
    /// [`Self::extract_sidecars`].
    ///
    /// # Errors
    ///
    /// Returns a generic error for any other extension, and propagates whatever error the
    /// selected handler returns when the read is set up.
    // NOTE(review): `allow(unused)` presumably covers the lack of non-test callers — confirm.
    #[allow(unused)]
    pub(crate) fn try_new(
        engine: Arc<dyn Engine>,
        manifest: &ParsedLogPath,
        log_root: Url,
    ) -> DeltaResult<Self> {
        // Built on first use and shared by every reader afterwards.
        static MANIFEST_READ_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
            let fields = [
                StructField::nullable(ADD_NAME, Add::to_schema()),
                StructField::nullable(REMOVE_NAME, Remove::to_schema()),
                StructField::nullable(SIDECAR_NAME, Sidecar::to_schema()),
            ];
            Arc::new(StructType::new_unchecked(fields))
        });

        // Both handlers take a slice of files; the manifest is always exactly one file.
        let files = std::slice::from_ref(&manifest.location);
        let raw_batches = match manifest.extension.as_str() {
            "json" => {
                engine
                    .json_handler()
                    .read_json_files(files, MANIFEST_READ_SCHEMA.clone(), None)?
            }
            "parquet" => {
                engine
                    .parquet_handler()
                    .read_parquet_files(files, MANIFEST_READ_SCHEMA.clone(), None)?
            }
            extension => {
                return Err(Error::generic(format!(
                    "Unsupported checkpoint extension: {extension}",
                )));
            }
        };

        Ok(Self {
            // Read errors pass through untouched; only `Ok` batches are wrapped.
            actions: Box::new(raw_batches.map_ok(|data| ActionsBatch::new(data, false))),
            sidecar_visitor: SidecarVisitor::default(),
            log_root,
            is_complete: false,
            manifest_file: manifest.location.clone(),
        })
    }

    /// Consumes the reader and returns one [`FileMeta`] per sidecar recorded during iteration.
    ///
    /// Each recorded sidecar is converted with `Sidecar::to_filemeta`, using the `log_root` given
    /// to [`Self::try_new`].
    ///
    /// # Errors
    ///
    /// Returns a generic error if the reader has not yet been iterated to the end (that is, the
    /// iterator has not returned `None`), and propagates the first conversion failure.
    // NOTE(review): `allow(unused)` presumably covers the lack of non-test callers — confirm.
    #[allow(unused)]
    pub(crate) fn extract_sidecars(self) -> DeltaResult<Vec<FileMeta>> {
        require!(
            self.is_complete,
            Error::generic(format!(
                "Cannot extract sidecars from in-progress ManifestReader for file: {}",
                self.manifest_file.location
            ))
        );

        let mut resolved = Vec::new();
        for sidecar in self.sidecar_visitor.sidecars {
            // Stop at the first sidecar that cannot be converted.
            resolved.push(sidecar.to_filemeta(&self.log_root)?);
        }
        Ok(resolved)
    }
}
/// Yields the manifest's action batches, feeding each one to the sidecar visitor on the way out.
impl Iterator for CheckpointManifestReader {
    type Item = DeltaResult<ActionsBatch>;

    /// Pulls the next batch from the underlying stream.
    ///
    /// * When the stream is exhausted, the reader is marked complete and `None` is returned.
    /// * A read error is passed through unchanged and the visitor is not run.
    /// * A successful batch is visited first; if the visit fails, that error is yielded in place
    ///   of the batch.
    fn next(&mut self) -> Option<Self::Item> {
        match self.actions.next() {
            None => {
                // Exhaustion is what unlocks `extract_sidecars`.
                self.is_complete = true;
                None
            }
            Some(Err(read_error)) => Some(Err(read_error)),
            Some(Ok(batch)) => {
                let visited = self.sidecar_visitor.visit_rows_of(batch.actions());
                Some(visited.map(|_| batch))
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use itertools::Itertools;

    use super::*;
    use crate::arrow::array::{Array, StringArray, StructArray};
    use crate::engine::arrow_data::EngineDataArrowExt as _;
    use crate::utils::test_utils::{assert_result_error_with_message, load_test_table};
    use crate::SnapshotRef;

    /// Drains a `CheckpointManifestReader` built from the snapshot's single checkpoint part and
    /// checks what it produced.
    ///
    /// * `expected_add_paths`: the sorted `add.path` values expected across all batches.
    /// * `expected_sidecars`: the sorted final path segments expected from `extract_sidecars`.
    fn verify_manifest_phase(
        engine: Arc<dyn Engine>,
        snapshot: SnapshotRef,
        expected_add_paths: &[&str],
        expected_sidecars: &[&str],
    ) -> DeltaResult<()> {
        let log_segment = snapshot.log_segment();
        let checkpoint_parts = &log_segment.listed.checkpoint_parts;
        assert_eq!(checkpoint_parts.len(), 1);

        let mut reader = CheckpointManifestReader::try_new(
            engine.clone(),
            &checkpoint_parts[0],
            log_segment.log_root.clone(),
        )?;

        // `by_ref` keeps ownership of the reader so sidecars can be extracted afterwards.
        let mut add_paths: Vec<String> = Vec::new();
        for batch in reader.by_ref() {
            let ActionsBatch {
                actions,
                is_log_batch,
            } = batch?;
            assert!(!is_log_batch, "Manifest should not be a log batch");

            let record_batch = actions.try_into_record_batch()?;
            let add_column = record_batch.column_by_name("add").unwrap();
            let add_struct = add_column.as_any().downcast_ref::<StructArray>().unwrap();
            let path_column = add_struct.column_by_name("path").unwrap();
            let path_values = path_column
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            // Null entries are skipped by `flatten`.
            add_paths.extend(path_values.iter().flatten().map(ToString::to_string));
        }
        add_paths.sort();
        assert_eq!(
            add_paths, expected_add_paths,
            "CheckpointManifestReader should extract expected Add file paths from checkpoint"
        );

        let sidecar_files = reader.extract_sidecars()?;
        assert_eq!(
            sidecar_files.len(),
            expected_sidecars.len(),
            "Should collect exactly {} actual_sidecars",
            expected_sidecars.len()
        );

        // Compare by file name only: the last URL path segment of each sidecar.
        let sidecar_names = sidecar_files
            .iter()
            .map(|file| {
                let last_segment = file
                    .location
                    .path_segments()
                    .and_then(|mut segments| segments.next_back());
                last_segment.unwrap_or("").to_string()
            })
            .sorted()
            .collect_vec();
        assert_eq!(sidecar_names, expected_sidecars.to_vec());
        Ok(())
    }

    /// A checkpoint without sidecars yields its add path and no sidecar files.
    #[test]
    fn test_manifest_phase_extracts_file_paths() -> DeltaResult<()> {
        let (engine, snapshot, _tempdir) = load_test_table("with_checkpoint_no_last_checkpoint")?;
        let expected_adds =
            ["part-00000-a190be9e-e3df-439e-b366-06a863f51e99-c000.snappy.parquet"];
        verify_manifest_phase(engine, snapshot, &expected_adds, &[])
    }

    /// Extracting sidecars before the reader has been drained is an error.
    #[test]
    fn test_manifest_phase_early_finalize_error() -> DeltaResult<()> {
        let (engine, snapshot, _tempdir) = load_test_table("with_checkpoint_no_last_checkpoint")?;
        let log_segment = snapshot.log_segment();
        let reader = CheckpointManifestReader::try_new(
            engine.clone(),
            &log_segment.listed.checkpoint_parts[0],
            log_segment.log_root.clone(),
        )?;

        assert_result_error_with_message(
            reader.extract_sidecars(),
            "Cannot extract sidecars from in-progress ManifestReader for file",
        );
        Ok(())
    }

    /// A JSON manifest with sidecars yields no add paths and both sidecar files.
    #[test]
    fn test_manifest_phase_collects_sidecars() -> DeltaResult<()> {
        let (engine, snapshot, _tempdir) = load_test_table("v2-checkpoints-json-with-sidecars")?;
        let expected_sidecars = [
            "00000000000000000006.checkpoint.0000000001.0000000002.19af1366-a425-47f4-8fa6-8d6865625573.parquet",
            "00000000000000000006.checkpoint.0000000002.0000000002.5008b69f-aa8a-4a66-9299-0733a56a7e63.parquet",
        ];
        verify_manifest_phase(engine, snapshot, &[], &expected_sidecars)
    }

    /// A Parquet manifest with sidecars yields no add paths and both sidecar files.
    #[test]
    fn test_manifest_phase_collects_sidecars_parquet() -> DeltaResult<()> {
        let (engine, snapshot, _tempdir) = load_test_table("v2-checkpoints-parquet-with-sidecars")?;
        let expected_sidecars = [
            "00000000000000000006.checkpoint.0000000001.0000000002.76931b15-ead3-480d-b86c-afe55a577fc3.parquet",
            "00000000000000000006.checkpoint.0000000002.0000000002.4367b29c-0e87-447f-8e81-9814cc01ad1f.parquet",
        ];
        verify_manifest_phase(engine, snapshot, &[], &expected_sidecars)
    }
}