use std::sync::{Arc, LazyLock, OnceLock};
use crate::action_reconciliation::log_replay::{
ActionReconciliationBatch, ActionReconciliationProcessor,
};
use crate::action_reconciliation::{
ActionReconciliationIterator, ActionReconciliationIteratorState, RetentionCalculator,
};
use crate::actions::{
Add, DomainMetadata, Metadata, Protocol, Remove, SetTransaction, Sidecar, ADD_NAME,
CHECKPOINT_METADATA_NAME, DOMAIN_METADATA_NAME, METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME,
SET_TRANSACTION_NAME, SIDECAR_NAME,
};
use crate::engine_data::FilteredEngineData;
use crate::expressions::{Expression, Scalar, StructData, Transform};
use crate::last_checkpoint_hint::LastCheckpointHint;
use crate::log_replay::LogReplayProcessor;
use crate::path::ParsedLogPath;
use crate::schema::{DataType, SchemaRef, StructField, StructType, ToSchema as _};
use crate::snapshot::SnapshotRef;
use crate::table_features::TableFeature;
use crate::table_properties::TableProperties;
use crate::{DeltaResult, Engine, EngineData, Error, EvaluationHandlerExtension, FileMeta};
use url::Url;
mod checkpoint_transform;
use checkpoint_transform::{
build_checkpoint_output_schema, build_checkpoint_read_schema, build_checkpoint_transform,
StatsTransformConfig,
};
#[cfg(test)]
mod tests;
static LAST_CHECKPOINT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
StructType::new_unchecked([
StructField::not_null("version", DataType::LONG),
StructField::not_null("size", DataType::LONG),
StructField::nullable("parts", DataType::LONG),
StructField::nullable("sizeInBytes", DataType::LONG),
StructField::nullable("numOfAddFiles", DataType::LONG),
])
.into()
});
/// Returns the action fields shared by both V1 and V2 checkpoint schemas.
///
/// Every action column is nullable because each checkpoint row carries
/// exactly one action kind; the other columns are null for that row.
fn base_checkpoint_action_fields() -> Vec<StructField> {
    [
        (ADD_NAME, Add::to_schema()),
        (REMOVE_NAME, Remove::to_schema()),
        (METADATA_NAME, Metadata::to_schema()),
        (PROTOCOL_NAME, Protocol::to_schema()),
        (SET_TRANSACTION_NAME, SetTransaction::to_schema()),
        (DOMAIN_METADATA_NAME, DomainMetadata::to_schema()),
        (SIDECAR_NAME, Sidecar::to_schema()),
    ]
    .into_iter()
    .map(|(name, schema)| StructField::nullable(name, schema))
    .collect()
}
/// Checkpoint action schema for tables without the `v2Checkpoint` feature:
/// just the base action fields, no `checkpointMetadata` column.
static CHECKPOINT_ACTIONS_SCHEMA_V1: LazyLock<SchemaRef> = LazyLock::new(|| {
    StructType::new_unchecked(base_checkpoint_action_fields()).into()
});
/// Builds the nullable `checkpointMetadata` struct field used by V2
/// checkpoints; its only inner field is the required checkpoint `version`.
fn checkpoint_metadata_field() -> StructField {
    let version_field = StructField::not_null("version", DataType::LONG);
    StructField::nullable(
        CHECKPOINT_METADATA_NAME,
        DataType::struct_type_unchecked([version_field]),
    )
}
/// Checkpoint action schema for tables with the `v2Checkpoint` feature:
/// the base action fields plus the `checkpointMetadata` column.
static CHECKPOINT_ACTIONS_SCHEMA_V2: LazyLock<SchemaRef> = LazyLock::new(|| {
    let fields = base_checkpoint_action_fields()
        .into_iter()
        .chain(std::iter::once(checkpoint_metadata_field()));
    Arc::new(StructType::new_unchecked(fields))
});
/// Writes a single classic (single-file) parquet checkpoint for the table
/// state captured by `snapshot`, and finalizes it by writing the
/// `_last_checkpoint` hint.
#[derive(Debug, Clone)]
pub struct CheckpointWriter {
    // Snapshot whose log segment is replayed to produce the checkpoint.
    pub(crate) snapshot: SnapshotRef,
    // Snapshot version as i64 (converted once in `try_new`; checkpoint
    // actions and `_last_checkpoint` store the version as a long).
    version: i64,
    // Lazily-computed checkpoint output schema, cached so `checkpoint_data`
    // and later callers agree on one schema instance.
    checkpoint_output_schema: OnceLock<SchemaRef>,
}
// Retention windows (tombstone / transaction expiration) are derived from the
// snapshot's table properties; delegate so `RetentionCalculator`'s provided
// methods (used in `checkpoint_data`) can compute them.
impl RetentionCalculator for CheckpointWriter {
    fn table_properties(&self) -> &TableProperties {
        self.snapshot.table_properties()
    }
}
impl CheckpointWriter {
    /// Creates a checkpoint writer for the given snapshot.
    ///
    /// # Errors
    /// - `Error::CheckpointWrite` if the snapshot's u64 version does not fit
    ///   in an `i64` (the checkpoint format stores versions as longs).
    /// - Whatever `validate_published` returns if the log segment is not in a
    ///   publishable state for checkpointing.
    pub(crate) fn try_new(snapshot: SnapshotRef) -> DeltaResult<Self> {
        let version = i64::try_from(snapshot.version()).map_err(|e| {
            Error::CheckpointWrite(format!(
                "Failed to convert checkpoint version from u64 {} to i64: {}",
                snapshot.version(),
                e
            ))
        })?;
        snapshot.log_segment().validate_published()?;
        Ok(Self {
            snapshot,
            version,
            checkpoint_output_schema: OnceLock::new(),
        })
    }

    /// Returns the cached output schema, computing it with `f` on first use.
    ///
    /// Concurrency note: if two callers race, the loser's `set` fails and is
    /// deliberately ignored (`let _`); both then read the winner's value via
    /// the final `get`, so all callers observe the same schema instance.
    fn get_or_init_output_schema(
        &self,
        f: impl FnOnce() -> DeltaResult<SchemaRef>,
    ) -> DeltaResult<SchemaRef> {
        if let Some(schema) = self.checkpoint_output_schema.get() {
            return Ok(schema.clone());
        }
        let schema = f()?;
        let _ = self.checkpoint_output_schema.set(schema);
        self.checkpoint_output_schema
            .get()
            .cloned()
            .ok_or_else(|| Error::internal_error("OnceLock should be initialized"))
    }

    /// Returns the URL where the classic parquet checkpoint file for this
    /// snapshot's version should be written.
    pub fn checkpoint_path(&self) -> DeltaResult<Url> {
        ParsedLogPath::new_classic_parquet_checkpoint(
            self.snapshot.table_root(),
            self.snapshot.version(),
        )
        .map(|parsed| parsed.location)
    }

    /// Produces the iterator of action batches to be written into the
    /// checkpoint file.
    ///
    /// Pipeline:
    /// 1. Build the read schema: base action columns (V2 schema adds the
    ///    `checkpointMetadata` column when the `v2Checkpoint` feature is
    ///    supported) widened with the expected stats schema and, if present,
    ///    the parsed partition-values schema.
    /// 2. Replay the log segment's actions through
    ///    `ActionReconciliationProcessor`, which applies tombstone and
    ///    transaction retention (timestamps computed via the
    ///    `RetentionCalculator` impl above).
    /// 3. Transform each surviving batch from the read schema to the cached
    ///    output schema with an engine expression evaluator, preserving the
    ///    batch's selection vector and action counts.
    /// 4. For V2 checkpoints, append one extra single-row batch carrying the
    ///    `checkpointMetadata` action.
    pub fn checkpoint_data(
        &self,
        engine: &dyn Engine,
    ) -> DeltaResult<ActionReconciliationIterator> {
        let config = StatsTransformConfig::from_table_properties(self.snapshot.table_properties());
        let tc = self.snapshot.table_configuration();
        let physical_clustering_columns = self.snapshot.get_physical_clustering_columns(engine)?;
        let stats_schema = tc
            .build_expected_stats_schemas(physical_clustering_columns.as_deref(), None)?
            .physical;
        let is_v2_checkpoints_supported = self
            .snapshot
            .table_configuration()
            .is_feature_supported(&TableFeature::V2Checkpoint);
        let base_schema = if is_v2_checkpoints_supported {
            &CHECKPOINT_ACTIONS_SCHEMA_V2
        } else {
            &CHECKPOINT_ACTIONS_SCHEMA_V1
        };
        let partition_schema = self
            .snapshot
            .table_configuration()
            .build_partition_values_parsed_schema();
        let read_schema =
            build_checkpoint_read_schema(base_schema, &stats_schema, partition_schema.as_deref())?;
        let actions = self
            .snapshot
            .log_segment()
            .read_actions(engine, read_schema.clone())?;
        let checkpoint_data = ActionReconciliationProcessor::new(
            self.deleted_file_retention_timestamp()?,
            self.get_transaction_expiration_timestamp()?,
        )
        .process_actions_iter(actions);
        let output_schema = self.get_or_init_output_schema(|| {
            build_checkpoint_output_schema(
                &config,
                base_schema,
                &stats_schema,
                partition_schema.as_deref(),
            )
        })?;
        // NOTE(review): this passes `partition_schema.as_ref()` while the two
        // schema builders above take `as_deref()` — presumably the transform
        // builder's parameter type differs; confirm against its signature.
        let transform_expr =
            build_checkpoint_transform(&config, &stats_schema, partition_schema.as_ref());
        let evaluator = engine.evaluation_handler().new_expression_evaluator(
            read_schema,
            transform_expr,
            output_schema.clone().into(),
        )?;
        // Re-wrap each reconciled batch: evaluate the transform over the raw
        // data, then reattach the original selection vector and counts.
        let transformed = checkpoint_data.map(move |batch_result| {
            let batch = batch_result?;
            let (data, sv) = batch.filtered_data.into_parts();
            let transformed = evaluator.evaluate(data.as_ref())?;
            Ok(ActionReconciliationBatch {
                filtered_data: FilteredEngineData::try_new(transformed, sv)?,
                actions_count: batch.actions_count,
                add_actions_count: batch.add_actions_count,
            })
        });
        // `Option<Result<_>>` chains as zero-or-one trailing batches: the V2
        // checkpointMetadata batch is emitted last, only when supported.
        let checkpoint_metadata = is_v2_checkpoints_supported
            .then(|| self.create_checkpoint_metadata_batch(engine, &output_schema));
        Ok(ActionReconciliationIterator::new(Box::new(
            transformed.chain(checkpoint_metadata),
        )))
    }

    /// Consumes the writer and records the finished checkpoint by writing the
    /// `_last_checkpoint` hint file.
    ///
    /// Must be called only after the iterator from `checkpoint_data` has been
    /// fully drained and its batches written to `metadata`'s location; the
    /// action counts are taken from `checkpoint_iter_state`.
    ///
    /// # Errors
    /// - `Error::CheckpointWrite` if the iterator is not exhausted, or if
    ///   `metadata.size` (u64) does not fit in an `i64`.
    pub fn finalize(
        self,
        engine: &dyn Engine,
        metadata: &FileMeta,
        checkpoint_iter_state: &ActionReconciliationIteratorState,
    ) -> DeltaResult<()> {
        if !checkpoint_iter_state.is_exhausted() {
            return Err(Error::checkpoint_write(
                "The checkpoint data iterator must be fully consumed and written to storage before calling finalize"
            ));
        }
        let size_in_bytes = i64::try_from(metadata.size).map_err(|e| {
            Error::CheckpointWrite(format!(
                "Failed to convert checkpoint size in bytes from u64 {} to i64: {}, when writing _last_checkpoint",
                metadata.size, e
            ))
        })?;
        let data = create_last_checkpoint_data(
            engine,
            self.version,
            checkpoint_iter_state.actions_count(),
            checkpoint_iter_state.add_actions_count(),
            size_in_bytes,
        );
        let last_checkpoint_path = LastCheckpointHint::path(&self.snapshot.log_segment().log_root)?;
        let filtered_data = FilteredEngineData::with_all_rows_selected(data?);
        // `true` presumably requests overwrite of any existing
        // `_last_checkpoint` — confirm against `JsonHandler::write_json_file`.
        engine.json_handler().write_json_file(
            &last_checkpoint_path,
            Box::new(std::iter::once(Ok(filtered_data))),
            true,
        )?;
        Ok(())
    }

    /// Builds the single-row V2 `checkpointMetadata` batch.
    ///
    /// Starts from an all-null row in the checkpoint output schema, then
    /// replaces the `checkpointMetadata` column with a literal struct holding
    /// this checkpoint's version; every other action column stays null. The
    /// batch counts as one action and zero add actions.
    fn create_checkpoint_metadata_batch(
        &self,
        engine: &dyn Engine,
        schema: &SchemaRef,
    ) -> DeltaResult<ActionReconciliationBatch> {
        let null_row = engine.evaluation_handler().null_row(schema.clone())?;
        let checkpoint_metadata_value = Scalar::Struct(StructData::try_new(
            vec![StructField::not_null("version", DataType::LONG)],
            vec![Scalar::from(self.version)],
        )?);
        let transform = Transform::new_top_level().with_replaced_field(
            CHECKPOINT_METADATA_NAME,
            Arc::new(Expression::literal(checkpoint_metadata_value)),
        );
        let evaluator = engine.evaluation_handler().new_expression_evaluator(
            schema.clone(),
            Arc::new(Expression::transform(transform)),
            schema.clone().into(),
        )?;
        let checkpoint_metadata_batch = evaluator.evaluate(null_row.as_ref())?;
        let filtered_data = FilteredEngineData::with_all_rows_selected(checkpoint_metadata_batch);
        Ok(ActionReconciliationBatch {
            filtered_data,
            actions_count: 1,
            add_actions_count: 0,
        })
    }
}
/// Builds the single-row `_last_checkpoint` hint record.
///
/// The scalar values are supplied positionally against
/// `LAST_CHECKPOINT_SCHEMA`, in order: `version`, `size` (total action
/// count), `parts` (always null — classic single-file checkpoint),
/// `sizeInBytes`, and `numOfAddFiles`.
pub(crate) fn create_last_checkpoint_data(
    engine: &dyn Engine,
    version: i64,
    actions_counter: i64,
    add_actions_counter: i64,
    size_in_bytes: i64,
) -> DeltaResult<Box<dyn EngineData>> {
    let handler = engine.evaluation_handler();
    handler.create_one(
        LAST_CHECKPOINT_SCHEMA.clone(),
        &[
            version.into(),
            actions_counter.into(),
            // `parts` is null: a classic checkpoint is never multi-part.
            None::<i64>.into(),
            size_in_bytes.into(),
            add_actions_counter.into(),
        ],
    )
}