use std::sync::{Arc, LazyLock, Mutex, OnceLock};
use tracing::info;
use url::Url;
use crate::action_reconciliation::log_replay::{
ActionReconciliationBatch, ActionReconciliationProcessor,
};
use crate::action_reconciliation::{
ActionReconciliationIterator, ActionReconciliationIteratorState, RetentionCalculator,
};
use crate::actions::{
Add, DomainMetadata, Metadata, Protocol, Remove, SetTransaction, Sidecar, ADD_NAME,
CHECKPOINT_METADATA_NAME, DOMAIN_METADATA_NAME, METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME,
SET_TRANSACTION_NAME, SIDECAR_NAME,
};
use crate::engine_data::FilteredEngineData;
use crate::expressions::{Expression, Scalar, StructData, Transform};
use crate::last_checkpoint_hint::LastCheckpointHint;
use crate::log_replay::LogReplayProcessor;
use crate::path::{self, ParsedLogPath};
use crate::schema::{DataType, SchemaRef, StructField, StructType, ToSchema as _};
use crate::snapshot::SnapshotRef;
use crate::table_features::TableFeature;
use crate::table_properties::TableProperties;
use crate::{
DeltaResult, Engine, EngineData, Error, EvaluationHandlerExtension, FileMeta, Version,
};
mod checkpoint_transform;
mod sidecar;
use checkpoint_transform::{
build_checkpoint_output_schema, build_checkpoint_read_schema, build_checkpoint_transform,
StatsTransformConfig,
};
use sidecar::{create_sidecar_action_batch, SidecarSplitter, SingleSidecarDataIterator};
#[cfg(test)]
mod tests;
/// Statistics recorded in the `_last_checkpoint` hint file for a written checkpoint.
///
/// The fields are private; in this module the struct is constructed via
/// [`LastCheckpointHintStats::from_reconciliation_state`] and consumed by
/// `CheckpointWriter::finalize`.
#[derive(Debug)]
pub struct LastCheckpointHintStats {
    /// Number of actions in the checkpoint (reconciled actions plus sidecar actions); written
    /// to the `size` column of `_last_checkpoint`.
    num_actions: i64,
    /// Checkpoint size in bytes as supplied by the caller (the writers in this module pass the
    /// main file size plus all sidecar sizes); written to `sizeInBytes`.
    size_in_bytes: i64,
    /// Number of `add` actions counted during reconciliation; written to `numOfAddFiles`.
    num_of_add_files: i64,
}
impl LastCheckpointHintStats {
pub fn from_reconciliation_state(
state: ActionReconciliationIteratorState,
size_in_bytes: u64,
num_sidecars: u64,
) -> DeltaResult<Self> {
if !state.is_exhausted() {
return Err(Error::checkpoint_write(
"Cannot build LastCheckpointHintStats: the reconciliation iterator must be fully \
consumed and all data written to storage before finalizing",
));
}
let size_in_bytes = i64::try_from(size_in_bytes).map_err(|e| {
Error::checkpoint_write(format!("size_in_bytes {size_in_bytes} exceeds i64: {e}"))
})?;
let num_sidecars_i64 = i64::try_from(num_sidecars).map_err(|e| {
Error::checkpoint_write(format!("num_sidecars {num_sidecars} exceeds i64: {e}"))
})?;
let num_actions = state
.actions_count()
.checked_add(num_sidecars_i64)
.ok_or_else(|| {
Error::checkpoint_write(format!(
"checkpoint action count overflowed i64: {} + {num_sidecars}",
state.actions_count()
))
})?;
Ok(Self {
num_actions,
size_in_bytes,
num_of_add_files: state.add_actions_count(),
})
}
}
/// Outcome of writing a checkpoint.
#[derive(Debug)]
pub(crate) struct WrittenCheckpointInfo {
    /// Metadata of the main checkpoint file, as returned by the storage handler's `head`.
    pub(crate) file_meta: FileMeta,
    /// Statistics to record in `_last_checkpoint` (see `CheckpointWriter::finalize`).
    pub(crate) last_checkpoint_stats: LastCheckpointHintStats,
}
/// Schema inputs shared by the checkpoint read, transform, and output steps.
///
/// Produced by `CheckpointWriter::checkpoint_schema_context` from the snapshot's table
/// configuration.
struct CheckpointSchemaContext {
    // Stats-transform settings derived from the table properties.
    stats_config: StatsTransformConfig,
    // V1 or V2 checkpoint action schema, chosen according to `is_v2`.
    checkpoint_base_schema: SchemaRef,
    // Physical stats schema expected for this table.
    stats_schema: SchemaRef,
    // Schema for parsed partition values; presumably `None` for unpartitioned tables — confirm
    // against `build_partition_values_parsed_schema`.
    partition_schema: Option<SchemaRef>,
    // Whether the table supports the `V2Checkpoint` feature.
    is_v2: bool,
}
/// Suggested number of file actions per sidecar file for V2 checkpoints written with sidecars.
///
/// NOTE(review): presumably used when `V2CheckpointConfig::WithSidecar` has no explicit hint;
/// the consuming code is outside this section — confirm.
pub const DEFAULT_FILE_ACTIONS_PER_SIDECAR_HINT: usize = 50_000;
/// Selects the checkpoint format to produce.
#[derive(Debug)]
pub enum CheckpointSpec {
    /// A V1 (classic) checkpoint.
    V1,
    /// A V2 checkpoint, with options given by [`V2CheckpointConfig`].
    V2(V2CheckpointConfig),
}
/// Layout options for a V2 checkpoint.
#[derive(Debug)]
pub enum V2CheckpointConfig {
    /// Presumably writes every action into the single main checkpoint file — confirm at the
    /// dispatch site.
    NoSidecar,
    /// Presumably moves file actions into sidecar files referenced from the main checkpoint.
    WithSidecar {
        /// Target number of file actions per sidecar. `None` presumably falls back to
        /// [`DEFAULT_FILE_ACTIONS_PER_SIDECAR_HINT`] — confirm at the call site.
        file_actions_per_sidecar_hint: Option<usize>,
    },
}
static LAST_CHECKPOINT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
StructType::new_unchecked([
StructField::not_null("version", DataType::LONG),
StructField::not_null("size", DataType::LONG),
StructField::nullable("parts", DataType::LONG),
StructField::nullable("sizeInBytes", DataType::LONG),
StructField::nullable("numOfAddFiles", DataType::LONG),
])
.into()
});
fn base_checkpoint_action_fields() -> Vec<StructField> {
vec![
StructField::nullable(ADD_NAME, Add::to_schema()),
StructField::nullable(REMOVE_NAME, Remove::to_schema()),
StructField::nullable(METADATA_NAME, Metadata::to_schema()),
StructField::nullable(PROTOCOL_NAME, Protocol::to_schema()),
StructField::nullable(SET_TRANSACTION_NAME, SetTransaction::to_schema()),
StructField::nullable(DOMAIN_METADATA_NAME, DomainMetadata::to_schema()),
StructField::nullable(SIDECAR_NAME, Sidecar::to_schema()),
]
}
/// Action schema for V1 checkpoints: only the shared base action columns.
static CHECKPOINT_ACTIONS_SCHEMA_V1: LazyLock<SchemaRef> = LazyLock::new(|| {
    let v1_fields = base_checkpoint_action_fields();
    StructType::new_unchecked(v1_fields).into()
});
/// Nullable `checkpointMetadata` column, a struct with a single required LONG `version`.
fn checkpoint_metadata_field() -> StructField {
    let version_field = StructField::not_null("version", DataType::LONG);
    let metadata_type = DataType::struct_type_unchecked([version_field]);
    StructField::nullable(CHECKPOINT_METADATA_NAME, metadata_type)
}
static CHECKPOINT_ACTIONS_SCHEMA_V2: LazyLock<SchemaRef> = LazyLock::new(|| {
let mut fields = base_checkpoint_action_fields();
fields.push(checkpoint_metadata_field());
Arc::new(StructType::new_unchecked(fields))
});
/// Writes a checkpoint, and the `_last_checkpoint` hint, for the version of a snapshot.
#[derive(Debug, Clone)]
pub struct CheckpointWriter {
    /// Snapshot whose state is checkpointed.
    pub(crate) snapshot: SnapshotRef,
    // `snapshot.version()` converted to `i64` in `try_new`; used for the LONG `version` values
    // written to `checkpointMetadata` and `_last_checkpoint`.
    version: i64,
    // Lazily computed checkpoint output schema, cached by `get_or_init_output_schema`.
    checkpoint_output_schema: OnceLock<SchemaRef>,
}
impl RetentionCalculator for CheckpointWriter {
    /// Supplies the checkpointed snapshot's table properties to the retention calculations.
    ///
    /// Presumably these feed `deleted_file_retention_timestamp` and
    /// `get_transaction_expiration_timestamp` — confirm in `RetentionCalculator`.
    fn table_properties(&self) -> &TableProperties {
        let snapshot = &self.snapshot;
        snapshot.table_properties()
    }
}
impl CheckpointWriter {
pub(crate) fn try_new(snapshot: SnapshotRef) -> DeltaResult<Self> {
let version = i64::try_from(snapshot.version()).map_err(|e| {
Error::CheckpointWrite(format!(
"Failed to convert checkpoint version from u64 {} to i64: {}",
snapshot.version(),
e
))
})?;
snapshot.log_segment().validate_published()?;
Ok(Self {
snapshot,
version,
checkpoint_output_schema: OnceLock::new(),
})
}
fn get_or_init_output_schema(
&self,
f: impl FnOnce() -> DeltaResult<SchemaRef>,
) -> DeltaResult<SchemaRef> {
if let Some(schema) = self.checkpoint_output_schema.get() {
return Ok(schema.clone());
}
let schema = f()?;
let _ = self.checkpoint_output_schema.set(schema);
self.checkpoint_output_schema
.get()
.cloned()
.ok_or_else(|| Error::internal_error("OnceLock should be initialized"))
}
pub fn checkpoint_path(&self) -> DeltaResult<Url> {
ParsedLogPath::new_classic_parquet_checkpoint(
self.snapshot.table_root(),
self.snapshot.version(),
)
.map(|parsed| parsed.location)
}
pub fn checkpoint_data(
&self,
engine: &dyn Engine,
) -> DeltaResult<ActionReconciliationIterator> {
let schema_context = self.checkpoint_schema_context(engine)?;
let read_schema = build_checkpoint_read_schema(
&schema_context.checkpoint_base_schema,
&schema_context.stats_schema,
schema_context.partition_schema.as_deref(),
)?;
let actions = self
.snapshot
.log_segment()
.read_actions(engine, read_schema.clone())?;
let checkpoint_data = ActionReconciliationProcessor::new(
self.deleted_file_retention_timestamp()?,
self.get_transaction_expiration_timestamp()?,
)
.process_actions_iter(actions);
let output_schema = self.get_or_init_output_schema(|| {
build_checkpoint_output_schema(
&schema_context.stats_config,
&schema_context.checkpoint_base_schema,
&schema_context.stats_schema,
schema_context.partition_schema.as_deref(),
)
})?;
let transform_expr = build_checkpoint_transform(
&schema_context.stats_config,
&schema_context.stats_schema,
schema_context.partition_schema.as_ref(),
);
let evaluator = engine.evaluation_handler().new_expression_evaluator(
read_schema,
transform_expr,
output_schema.clone().into(),
)?;
let transformed = checkpoint_data.map(move |batch_result| {
let batch = batch_result?;
let (data, sv) = batch.filtered_data.into_parts();
let transformed = evaluator.evaluate(data.as_ref())?;
Ok(ActionReconciliationBatch {
filtered_data: FilteredEngineData::try_new(transformed, sv)?,
actions_count: batch.actions_count,
add_actions_count: batch.add_actions_count,
})
});
let checkpoint_metadata = schema_context
.is_v2
.then(|| self.create_checkpoint_metadata_batch(engine, &output_schema));
Ok(ActionReconciliationIterator::new(Box::new(
transformed.chain(checkpoint_metadata),
)))
}
pub fn finalize(
self,
engine: &dyn Engine,
last_checkpoint_stats: &LastCheckpointHintStats,
) -> DeltaResult<()> {
let checkpoint_version = self.snapshot.version();
if let Some(hint_version) = self.snapshot.log_segment().last_checkpoint_version() {
if hint_version > checkpoint_version {
info!(
hint_version,
checkpoint_version,
"Skipping _last_checkpoint write: existing hint is newer than checkpoint"
);
return Ok(());
}
}
let data = create_last_checkpoint_data(
engine,
self.version,
last_checkpoint_stats.num_actions,
last_checkpoint_stats.num_of_add_files,
last_checkpoint_stats.size_in_bytes,
);
let last_checkpoint_path = LastCheckpointHint::path(&self.snapshot.log_segment().log_root)?;
let filtered_data = FilteredEngineData::with_all_rows_selected(data?);
engine.json_handler().write_json_file(
&last_checkpoint_path,
Box::new(std::iter::once(Ok(filtered_data))),
true,
)?;
Ok(())
}
pub(crate) fn write_v2_checkpoint_with_sidecars(
&self,
engine: &dyn Engine,
file_actions_per_sidecar_hint: usize,
) -> DeltaResult<WrittenCheckpointInfo> {
let output_schema = self.get_or_init_output_schema(|| {
let ctx = self.checkpoint_schema_context(engine)?;
build_checkpoint_output_schema(
&ctx.stats_config,
&ctx.checkpoint_base_schema,
&ctx.stats_schema,
ctx.partition_schema.as_deref(),
)
})?;
let data_iter = self.checkpoint_data(engine)?;
let iter_state = data_iter.state();
let splitter = SidecarSplitter::new_mut_shared(
data_iter,
engine.evaluation_handler().as_ref(),
output_schema.clone(),
)?;
let mut sidecar_metas: Vec<(String, FileMeta)> = Vec::new();
loop {
if let Some(entry) = write_single_sidecar(
engine,
&splitter,
file_actions_per_sidecar_hint,
self.snapshot.table_root(),
self.snapshot.version(),
)? {
sidecar_metas.push(entry);
}
let is_exhausted = splitter
.lock()
.map_err(|e| Error::internal_error(format!("sidecar splitter lock poisoned: {e}")))?
.is_exhausted();
if is_exhausted {
break;
}
}
let non_file_batches = Arc::into_inner(splitter)
.ok_or_else(|| {
Error::internal_error("sidecar splitter Arc should have no other references")
})?
.into_inner()
.map_err(|e| Error::internal_error(format!("sidecar splitter lock poisoned: {e}")))?
.into_non_file_batches();
let sidecar_batch = create_sidecar_action_batch(engine, &output_schema, &sidecar_metas)?;
let checkpoint_path = self.checkpoint_path()?;
let main_data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send> =
Box::new(non_file_batches.into_iter().chain(sidecar_batch).map(Ok));
engine
.parquet_handler()
.write_parquet_file(checkpoint_path.clone(), main_data)?;
let sidecar_sizes_sum = sidecar_metas
.iter()
.try_fold(0u64, |acc, (_, m)| acc.checked_add(m.size))
.ok_or_else(|| Error::internal_error("sidecar sizes sum overflowed u64"))?;
let sidecar_count = sidecar_metas.len() as u64;
build_written_checkpoint_info(
engine,
&checkpoint_path,
iter_state,
sidecar_sizes_sum,
sidecar_count,
)
}
pub(crate) fn write_checkpoint_without_sidecars(
&self,
engine: &dyn Engine,
) -> DeltaResult<WrittenCheckpointInfo> {
let checkpoint_path = self.checkpoint_path()?;
let data_iter = self.checkpoint_data(engine)?;
let state = data_iter.state();
let lazy_data = data_iter.map(|r| r.and_then(|f| f.apply_selection_vector()));
engine
.parquet_handler()
.write_parquet_file(checkpoint_path.clone(), Box::new(lazy_data))?;
build_written_checkpoint_info(
engine,
&checkpoint_path,
state,
0,
0,
)
}
fn create_checkpoint_metadata_batch(
&self,
engine: &dyn Engine,
schema: &SchemaRef,
) -> DeltaResult<ActionReconciliationBatch> {
let null_row = engine.evaluation_handler().null_row(schema.clone())?;
let checkpoint_metadata_value = Scalar::Struct(StructData::try_new(
vec![StructField::not_null("version", DataType::LONG)],
vec![Scalar::from(self.version)],
)?);
let transform = Transform::new_top_level().with_replaced_field(
CHECKPOINT_METADATA_NAME,
Arc::new(Expression::literal(checkpoint_metadata_value)),
);
let evaluator = engine.evaluation_handler().new_expression_evaluator(
schema.clone(),
Arc::new(Expression::transform(transform)),
schema.clone().into(),
)?;
let checkpoint_metadata_batch = evaluator.evaluate(null_row.as_ref())?;
let filtered_data = FilteredEngineData::with_all_rows_selected(checkpoint_metadata_batch);
Ok(ActionReconciliationBatch {
filtered_data,
actions_count: 1,
add_actions_count: 0,
})
}
fn checkpoint_schema_context(
&self,
engine: &dyn Engine,
) -> DeltaResult<CheckpointSchemaContext> {
let tc = self.snapshot.table_configuration();
let config = StatsTransformConfig::from_table_properties(self.snapshot.table_properties());
let is_v2 = tc.is_feature_supported(&TableFeature::V2Checkpoint);
let base_schema = if is_v2 {
CHECKPOINT_ACTIONS_SCHEMA_V2.clone()
} else {
CHECKPOINT_ACTIONS_SCHEMA_V1.clone()
};
let physical_clustering_columns = self.snapshot.get_physical_clustering_columns(engine)?;
let stats_schema = tc
.build_expected_stats_schemas(physical_clustering_columns.as_deref(), None)?
.physical;
let partition_schema = tc.build_partition_values_parsed_schema();
Ok(CheckpointSchemaContext {
stats_config: config,
checkpoint_base_schema: base_schema,
stats_schema,
partition_schema,
is_v2,
})
}
}
pub(crate) fn create_last_checkpoint_data(
engine: &dyn Engine,
version: i64,
actions_counter: i64,
add_actions_counter: i64,
size_in_bytes: i64,
) -> DeltaResult<Box<dyn EngineData>> {
engine.evaluation_handler().create_one(
LAST_CHECKPOINT_SCHEMA.clone(),
&[
version.into(),
actions_counter.into(),
None::<i64>.into(), size_in_bytes.into(),
add_actions_counter.into(),
],
)
}
/// Writes the next sidecar file from `splitter`, if it has any data left.
///
/// Returns `Ok(None)` without creating a file when the per-sidecar iterator yields nothing.
/// Otherwise it writes a new sidecar at the path from `path::new_sidecar` and returns the
/// sidecar's file name with its storage metadata.
fn write_single_sidecar(
    engine: &dyn Engine,
    splitter: &Arc<Mutex<SidecarSplitter>>,
    file_actions_per_sidecar_hint: usize,
    table_root: &Url,
    version: Version,
) -> DeltaResult<Option<(String, FileMeta)>> {
    let mut sidecar_batches =
        SingleSidecarDataIterator::new(Arc::clone(splitter), file_actions_per_sidecar_hint)?
            .peekable();
    // Peek so no empty sidecar file is ever created.
    let has_batches = sidecar_batches.peek().is_some();
    if !has_batches {
        return Ok(None);
    }
    let (sidecar_name, sidecar_location) = path::new_sidecar(table_root, version)?;
    let parquet = engine.parquet_handler();
    parquet.write_parquet_file(sidecar_location.clone(), Box::new(sidecar_batches))?;
    let sidecar_meta = engine.storage_handler().head(&sidecar_location)?;
    Ok(Some((sidecar_name, sidecar_meta)))
}
fn build_written_checkpoint_info(
engine: &dyn Engine,
checkpoint_path: &Url,
state: Arc<ActionReconciliationIteratorState>,
sidecar_sizes_sum: u64,
sidecar_count: u64,
) -> DeltaResult<WrittenCheckpointInfo> {
let file_meta = engine.storage_handler().head(checkpoint_path)?;
let total_size_in_bytes = file_meta
.size
.checked_add(sidecar_sizes_sum)
.ok_or_else(|| Error::internal_error("checkpoint total size_in_bytes overflowed u64"))?;
let state = Arc::into_inner(state).ok_or_else(|| {
Error::internal_error("ActionReconciliationIteratorState Arc has other references")
})?;
let last_checkpoint_stats = LastCheckpointHintStats::from_reconciliation_state(
state,
total_size_in_bytes,
sidecar_count,
)?;
Ok(WrittenCheckpointInfo {
file_meta,
last_checkpoint_stats,
})
}