use std::slice;
use std::sync::{Arc, LazyLock};
use url::Url;
use crate::actions::visitors::InCommitTimestampVisitor;
use crate::actions::{
CommitInfo, Metadata, Protocol, COMMIT_INFO_NAME, METADATA_NAME, PROTOCOL_NAME,
};
use crate::commit_range::with_version_context;
use crate::engine_data::RowVisitor as _;
use crate::path::ParsedLogPath;
use crate::schema::{SchemaRef, StructField, StructType, ToSchema as _};
use crate::table_configuration::{InCommitTimestampEnablement, TableConfiguration};
use crate::table_features::{ensure_table_can_be_read, Operation};
use crate::{DeltaResult, Engine, Error, FileDataReadResultIterator, Version};
/// Identifies a kind of Delta log action.
///
/// The type is `Copy`, so values are passed around by value. It is also `Eq + Hash`, so it can
/// be used directly as a set or map key.
#[derive(Debug, Clone, Copy)]
#[derive(PartialEq, Eq, Hash)]
pub enum DeltaAction {
    Add,
    Remove,
    Metadata,
    Protocol,
    CommitInfo,
    Cdc,
    DomainMetadata,
    SetTxn,
    CheckpointMetadata,
    Sidecar,
}
/// Narrow schema for scanning a commit file's "header".
///
/// It contains only the protocol, metadata and commit-info action columns, each nullable.
/// `CommitAction::read_commit_header` uses it so that finding protocol/metadata changes and the
/// in-commit timestamp does not require decoding the full action schema.
static HEADER_READ_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    let header_fields = [
        StructField::nullable(PROTOCOL_NAME, Protocol::to_schema()),
        StructField::nullable(METADATA_NAME, Metadata::to_schema()),
        StructField::nullable(COMMIT_INFO_NAME, CommitInfo::to_schema()),
    ];
    let header_schema = StructType::new_unchecked(header_fields);
    Arc::new(header_schema)
});
/// One commit file of the Delta log.
///
/// Alongside the file itself, it carries the protocol, metadata and timestamp that were
/// resolved for that commit.
pub struct CommitAction {
    /// Parsed path of the commit file; provides both the commit version and the file location.
    log_path: ParsedLogPath,
    /// Root URL of the table, needed when building a `TableConfiguration`.
    table_root: Url,
    /// Schema used by `get_actions` when reading this commit's actions.
    read_schema: SchemaRef,
    /// Protocol for this commit: the one found in the commit if present, else the seed value.
    protocol: Option<Protocol>,
    /// Metadata for this commit: the one found in the commit if present, else the seed value.
    metadata: Option<Metadata>,
    /// Resolved commit timestamp: the in-commit timestamp or the file's `last_modified` value.
    timestamp: i64,
}
impl CommitAction {
pub(crate) fn try_new(
engine: &dyn Engine,
table_root: Url,
log_path: ParsedLogPath,
read_schema: SchemaRef,
seed_protocol: Option<Protocol>,
seed_metadata: Option<Metadata>,
) -> DeltaResult<Self> {
let timestamp = log_path.location.last_modified;
let mut this = Self {
table_root,
log_path,
read_schema,
protocol: seed_protocol,
metadata: seed_metadata,
timestamp,
};
let extracted_ict = this.read_commit_header(engine)?;
let table_config = match (&this.protocol, &this.metadata) {
(Some(protocol), Some(metadata)) => Some(TableConfiguration::try_new(
metadata.clone(),
protocol.clone(),
this.table_root.clone(),
this.version(),
)?),
_ => None,
};
this.protocol_validation(&table_config)?;
this.resolve_timestamp(&table_config, extracted_ict)?;
Ok(this)
}
pub fn version(&self) -> Version {
self.log_path.version
}
pub fn timestamp(&self) -> i64 {
self.timestamp
}
pub(crate) fn protocol(&self) -> Option<&Protocol> {
self.protocol.as_ref()
}
pub(crate) fn metadata(&self) -> Option<&Metadata> {
self.metadata.as_ref()
}
fn read_commit_header(&mut self, engine: &dyn Engine) -> DeltaResult<Option<i64>> {
let json_iter = engine.json_handler().read_json_files(
slice::from_ref(&self.log_path.location),
HEADER_READ_SCHEMA.clone(),
None,
)?;
let mut extracted_protocol: Option<Protocol> = None;
let mut extracted_metadata: Option<Metadata> = None;
let mut ict_visitor = InCommitTimestampVisitor::default();
for (batch_index, batch_res) in json_iter.enumerate() {
let batch = batch_res?;
if batch_index == 0 {
ict_visitor.visit_rows_of(batch.as_ref())?;
}
if extracted_protocol.is_none() {
extracted_protocol = Protocol::try_new_from_data(batch.as_ref())?;
}
if extracted_metadata.is_none() {
extracted_metadata = Metadata::try_new_from_data(batch.as_ref())?;
}
if extracted_protocol.is_some() && extracted_metadata.is_some() {
break;
}
}
if extracted_protocol.is_some() {
self.protocol = extracted_protocol;
}
if extracted_metadata.is_some() {
self.metadata = extracted_metadata;
}
Ok(ict_visitor.in_commit_timestamp)
}
fn resolve_timestamp(
&mut self,
table_config: &Option<TableConfiguration>,
extracted_ict: Option<i64>,
) -> DeltaResult<()> {
let version = self.version();
self.timestamp = match table_config {
Some(table_config) => {
let ict_applies = match table_config.in_commit_timestamp_enablement()? {
InCommitTimestampEnablement::NotEnabled => false,
InCommitTimestampEnablement::Enabled { enablement: None } => true,
InCommitTimestampEnablement::Enabled {
enablement: Some((enablement_version, _)),
} => version >= enablement_version,
};
if ict_applies {
extracted_ict.ok_or_else(|| {
with_version_context(version, Error::generic(
"in-commit timestamp is enabled but missing ICT timestamp field in commit"
))
})?
} else {
self.log_path.location.last_modified
}
}
None => extracted_ict.unwrap_or(self.log_path.location.last_modified),
};
Ok(())
}
fn protocol_validation(&self, table_config: &Option<TableConfiguration>) -> DeltaResult<()> {
match (table_config, &self.protocol) {
(Some(table_config), _) => table_config.ensure_operation_supported(Operation::Scan),
(None, Some(protocol)) => ensure_table_can_be_read(protocol),
(None, None) => Ok(()),
}
}
pub fn get_actions(&self, engine: &dyn Engine) -> DeltaResult<FileDataReadResultIterator> {
engine.json_handler().read_json_files(
slice::from_ref(&self.log_path.location),
self.read_schema.clone(),
None,
)
}
}