use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Instant;
use delta_kernel_derive::internal_api;
use tracing::{debug, info, instrument, warn};
use url::Url;
use crate::action_reconciliation::calculate_transaction_expiration_timestamp;
use crate::actions::set_transaction::{is_set_txn_expired, SetTransactionScanner};
use crate::actions::{DomainMetadata, INTERNAL_DOMAIN_PREFIX};
use crate::checkpoint::CheckpointWriter;
use crate::clustering::{parse_clustering_columns, CLUSTERING_DOMAIN_NAME};
use crate::committer::{Committer, PublishMetadata};
#[cfg(any(test, feature = "test-utils"))]
use crate::crc::Crc;
use crate::crc::{try_write_crc_file, CrcDelta, LazyCrc};
use crate::expressions::ColumnName;
use crate::log_segment::{DomainMetadataMap, LogSegment};
use crate::log_segment_files::LogSegmentFiles;
use crate::metrics::{MetricEvent, MetricId};
use crate::path::ParsedLogPath;
use crate::scan::ScanBuilder;
use crate::schema::SchemaRef;
use crate::table_configuration::{InCommitTimestampEnablement, TableConfiguration};
use crate::table_features::{physical_to_logical_column_name, ColumnMappingMode, TableFeature};
use crate::table_properties::TableProperties;
use crate::transaction::Transaction;
use crate::utils::require;
use crate::LogCompactionWriter;
use crate::{DeltaResult, Engine, Error, Version};
mod builder;
pub use builder::SnapshotBuilder;
/// Shared, reference-counted handle to an immutable [`Snapshot`].
pub type SnapshotRef = Arc<Snapshot>;
// Re-export of the CRC file-statistics type for internal consumers.
#[allow(unused)]
#[internal_api]
pub(crate) type FileStats = crate::crc::FileStats;
/// Outcome of [`Snapshot::write_checksum`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChecksumWriteResult {
/// A CRC file for this version already existed (on disk or written concurrently).
AlreadyExists,
/// A new CRC file was written by this call.
Written,
}
/// Outcome of [`Snapshot::checkpoint`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckpointWriteResult {
/// A checkpoint for this version already existed (or was written concurrently).
AlreadyExists,
/// A new checkpoint was written by this call.
Written,
}
/// An immutable view of a Delta table at a single version: the log segment that
/// defines the version plus the table configuration (protocol/metadata) parsed
/// from it.
pub struct Snapshot {
// Detached tracing span; parent for all per-snapshot instrumentation.
span: tracing::Span,
log_segment: LogSegment,
table_configuration: TableConfiguration,
// Lazily loaded CRC (version checksum) data; may be shared with snapshots
// derived from this one.
lazy_crc: Arc<LazyCrc>,
}
impl PartialEq for Snapshot {
// Equality deliberately ignores `span` (instrumentation only) and `lazy_crc`
// (a cache): two snapshots are equal when they describe the same log segment
// and table configuration.
fn eq(&self, other: &Self) -> bool {
self.log_segment == other.log_segment
&& self.table_configuration == other.table_configuration
}
}
// `eq` above compares only `Eq` fields, so full equivalence holds.
impl Eq for Snapshot {}
impl Drop for Snapshot {
// Only emits a trace event; no resources need explicit cleanup here.
fn drop(&mut self) {
debug!("Dropping snapshot");
}
}
impl std::fmt::Debug for Snapshot {
// Hand-written so the output shows the log root, version, and metadata
// rather than the raw fields (`span` and `lazy_crc` are omitted).
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Snapshot")
.field("path", &self.log_segment.log_root.as_str())
.field("version", &self.version())
.field("metadata", &self.table_configuration().metadata())
.field("log_segment", &self.log_segment)
.finish()
}
}
impl Snapshot {
/// Start building a snapshot of the table rooted at `table_root`.
pub fn builder_for(table_root: impl AsRef<str>) -> SnapshotBuilder {
SnapshotBuilder::new_for(table_root)
}
/// Start building a snapshot that incrementally refreshes `existing_snapshot`.
pub fn builder_from(existing_snapshot: SnapshotRef) -> SnapshotBuilder {
SnapshotBuilder::new_from(existing_snapshot)
}
/// Construct a snapshot with no pre-resolved CRC file (CRC stays unavailable
/// until replaced; `LazyCrc::new(None)` carries no file to load).
#[internal_api]
pub(crate) fn new(log_segment: LogSegment, table_configuration: TableConfiguration) -> Self {
Self::new_with_crc(
log_segment,
table_configuration,
Arc::new(LazyCrc::new(None)),
)
}
/// Construct a snapshot around an already-resolved (possibly lazily-loaded)
/// CRC handle.
pub(crate) fn new_with_crc(
log_segment: LogSegment,
table_configuration: TableConfiguration,
lazy_crc: Arc<LazyCrc>,
) -> Self {
// Detached span (no parent) tagged with table root and version; all
// per-snapshot instrumentation attaches to it.
let span = tracing::info_span!(
parent: tracing::Span::none(),
"snap",
path = %table_configuration.table_root(),
version = table_configuration.version(),
);
info!(parent: &span, "Created snapshot");
Self {
span,
log_segment,
table_configuration,
lazy_crc,
}
}
/// Incrementally refresh `existing_snapshot` to `target_version` (or latest).
///
/// Lists log files newer than the old snapshot's checkpoint and then either:
/// reuses the existing snapshot (nothing new / same version requested),
/// rebuilds from scratch when a newer checkpoint is found, or merges the old
/// and newly-listed log segments into a combined one.
fn try_new_from_impl(
existing_snapshot: Arc<Snapshot>,
log_tail: Vec<ParsedLogPath>,
engine: &dyn Engine,
target_version: impl Into<Option<Version>>,
operation_id: MetricId,
) -> DeltaResult<Arc<Self>> {
let old_log_segment = &existing_snapshot.log_segment;
let old_version = existing_snapshot.version();
let requested_version = target_version.into();
// Requesting the hint's own version is a no-op; requesting an older
// version cannot be served from this hint and is an error.
if let Some(requested_version) = requested_version {
if requested_version == old_version {
return Ok(existing_snapshot.clone());
}
if requested_version < old_version {
return Err(Error::Generic(format!(
"Requested snapshot version {requested_version} is older than snapshot hint version {old_version}"
)));
}
}
let log_root = old_log_segment.log_root.clone();
let storage = engine.storage_handler();
// Only list files strictly after the old checkpoint (version 1 when there
// is none); earlier files are already covered by the existing snapshot.
let listing_start = old_log_segment.checkpoint_version.unwrap_or(0) + 1;
let new_listed_files = LogSegmentFiles::list(
storage.as_ref(),
&log_root,
log_tail,
Some(listing_start),
requested_version,
)?;
// Nothing new in the log: either the requested version doesn't exist yet
// (error) or the existing snapshot is still the latest (reuse it).
if new_listed_files.ascending_commit_files().is_empty()
&& new_listed_files.checkpoint_parts().is_empty()
{
match requested_version {
Some(requested_version) if requested_version != old_version => {
return Err(Error::Generic(format!(
"Requested snapshot version {requested_version} is newer than the latest version {old_version}"
)));
}
_ => {
return Ok(existing_snapshot.clone());
}
}
}
// Remember the newest commit seen by this listing before the listing is
// consumed by LogSegment::try_new below.
let new_latest_commit_file = new_listed_files.latest_commit_file().clone();
let mut new_log_segment =
LogSegment::try_new(new_listed_files, log_root.clone(), requested_version, None)?;
let new_end_version = new_log_segment.end_version;
// The log can never move backwards relative to the hint.
if new_end_version < old_version {
return Err(Error::Generic(format!(
"Unexpected state: The newest version in the log {new_end_version} is older than the old version {old_version}")));
}
// A checkpoint newer than the old one was found: cheaper to rebuild the
// snapshot from that checkpoint than to merge segments.
if new_log_segment.checkpoint_version.is_some() {
let snapshot = Self::try_new_from_log_segment_impl(
existing_snapshot.table_root().clone(),
new_log_segment,
engine,
operation_id,
);
return Ok(Arc::new(snapshot?));
}
// Same end version and no new checkpoint: the existing snapshot stands.
if new_end_version == old_version {
return Ok(existing_snapshot.clone());
}
// Drop any re-listed files at or below the old version; only strictly
// newer commits/compactions are appended to the old segment.
new_log_segment
.listed
.ascending_commit_files
.retain(|log_path| old_version < log_path.version);
new_log_segment
.listed
.ascending_compaction_files
.retain(|log_path| old_version < log_path.version);
// Pick the freshest CRC file and reuse the old lazy CRC when it still applies.
let (crc_file, lazy_crc) = Self::resolve_crc(
&new_log_segment,
old_log_segment,
&existing_snapshot.lazy_crc,
);
// Read protocol/metadata only if the new commits changed them (hence `_opt`).
let (new_metadata, new_protocol) =
new_log_segment.read_protocol_metadata_opt(engine, &lazy_crc)?;
let table_configuration = TableConfiguration::try_new_from(
existing_snapshot.table_configuration(),
new_metadata,
new_protocol,
new_log_segment.end_version,
)?;
// Stitch old + new listings into one combined log segment.
let mut ascending_commit_files = old_log_segment.listed.ascending_commit_files.clone();
ascending_commit_files.extend(new_log_segment.listed.ascending_commit_files);
let mut ascending_compaction_files =
old_log_segment.listed.ascending_compaction_files.clone();
ascending_compaction_files.extend(new_log_segment.listed.ascending_compaction_files);
let latest_commit_file =
new_latest_commit_file.or_else(|| old_log_segment.listed.latest_commit_file.clone());
let combined_log_segment = LogSegment::try_new(
LogSegmentFiles {
ascending_commit_files,
ascending_compaction_files,
checkpoint_parts: old_log_segment.listed.checkpoint_parts.clone(),
latest_crc_file: crc_file,
latest_commit_file,
max_published_version: new_log_segment
.listed
.max_published_version
.max(old_log_segment.listed.max_published_version),
},
log_root,
requested_version,
old_log_segment.last_checkpoint_hint_summary(),
)?;
Ok(Arc::new(Snapshot::new_with_crc(
combined_log_segment,
table_configuration,
lazy_crc,
)))
}
/// Pick the CRC file for the combined segment (the newer listing wins) and
/// decide whether the previously loaded lazy CRC handle can be reused.
fn resolve_crc(
    new_log_segment: &LogSegment,
    old_log_segment: &LogSegment,
    existing_lazy_crc: &Arc<LazyCrc>,
) -> (Option<ParsedLogPath>, Arc<LazyCrc>) {
    // Prefer a CRC file found by the fresh listing; fall back to the old one.
    let crc_file = new_log_segment
        .listed
        .latest_crc_file
        .clone()
        .or_else(|| old_log_segment.listed.latest_crc_file.clone());
    // Reuse the existing lazy CRC (and anything it already loaded) when it
    // targets the same version; otherwise start a fresh lazy loader.
    let lazy_crc = if existing_lazy_crc.crc_version() == crc_file.as_ref().map(|f| f.version) {
        Arc::clone(existing_lazy_crc)
    } else {
        Arc::new(LazyCrc::new(crc_file.clone()))
    };
    (crc_file, lazy_crc)
}
/// Build a snapshot directly from a freshly-constructed log segment, reading
/// protocol/metadata from the log and reporting how long that load took.
fn try_new_from_log_segment_impl(
location: Url,
log_segment: LogSegment,
engine: &dyn Engine,
operation_id: MetricId,
) -> DeltaResult<Self> {
let reporter = engine.get_metrics_reporter();
let lazy_crc = Arc::new(LazyCrc::new(log_segment.listed.latest_crc_file.clone()));
let start = Instant::now();
let (metadata, protocol) = log_segment.read_protocol_metadata(engine, &lazy_crc)?;
let read_metadata_duration = start.elapsed();
// Report load timing only when a metrics reporter is configured.
reporter.as_ref().inspect(|r| {
r.report(MetricEvent::ProtocolMetadataLoaded {
operation_id,
duration: read_metadata_duration,
});
});
let table_configuration =
TableConfiguration::try_new(metadata, protocol, location, log_segment.end_version)?;
Ok(Self::new_with_crc(
log_segment,
table_configuration,
lazy_crc,
))
}
/// Derive the snapshot that results from appending one just-committed log
/// file to this snapshot, carrying the CRC forward via `crc_delta`.
///
/// Errors if `commit` is not a commit file or is not exactly this snapshot's
/// version + 1.
pub(crate) fn new_post_commit(
&self,
commit: ParsedLogPath,
crc_delta: CrcDelta,
) -> DeltaResult<Self> {
require!(
commit.is_commit(),
Error::internal_error(format!(
"Cannot create post-commit Snapshot. Log file is not a commit file. \
Path: {}, Type: {:?}.",
commit.location.location, commit.file_type
))
);
let read_version = self.version();
let new_version = commit.version;
// wrapping_add: lets the PRE_COMMIT_VERSION sentinel (compared against in
// compute_post_commit_crc) advance to version 0 — presumably the sentinel
// is Version::MAX; TODO confirm.
require!(
new_version == read_version.wrapping_add(1),
Error::internal_error(format!(
"Cannot create post-commit Snapshot. Log file version ({new_version}) does not \
equal Snapshot version ({read_version}) + 1."
))
);
let new_table_configuration =
TableConfiguration::new_post_commit(self.table_configuration(), new_version);
let new_log_segment = self.log_segment.new_with_commit_appended(commit)?;
let new_lazy_crc = self.compute_post_commit_crc(new_version, crc_delta);
Ok(Snapshot::new_with_crc(
new_log_segment,
new_table_configuration,
new_lazy_crc,
))
}
/// Roll the in-memory CRC forward across a commit: either materialize a CRC
/// for version zero (pre-commit case) or apply `crc_delta` to the loaded base
/// CRC. Falls back to the existing lazy CRC when no base is available.
fn compute_post_commit_crc(&self, new_version: Version, crc_delta: CrcDelta) -> Arc<LazyCrc> {
let crc = if self.version() == crate::PRE_COMMIT_VERSION {
// First commit to a new table: the delta alone defines version 0's CRC.
crc_delta.into_crc_for_version_zero()
} else {
// Only usable if a CRC for the current version is already in memory;
// otherwise `None` and we keep the old (stale) lazy handle below.
self.lazy_crc
.get_if_loaded_at_version(self.version())
.map(|base| {
let mut crc = base.as_ref().clone();
crc.apply(crc_delta);
crc
})
};
match crc {
Some(c) => Arc::new(LazyCrc::new_precomputed(c, new_version)),
None => self.lazy_crc.clone(),
}
}
/// Create a writer that can checkpoint this snapshot's state.
pub fn create_checkpoint_writer(self: Arc<Self>) -> DeltaResult<CheckpointWriter> {
CheckpointWriter::try_new(self)
}
/// Write a checkpoint for this snapshot's version (if one doesn't already
/// exist) and return a new snapshot whose log segment includes it.
#[instrument(parent = &self.span, name = "snap.checkpoint", skip_all, err)]
pub fn checkpoint(
self: &SnapshotRef,
engine: &dyn Engine,
) -> DeltaResult<(CheckpointWriteResult, SnapshotRef)> {
// Fast path: the segment already ends on a checkpoint at this version.
if self.log_segment.checkpoint_version == Some(self.log_segment.end_version) {
info!(
"Checkpoint already exists for snapshot version {}",
self.version()
);
return Ok((CheckpointWriteResult::AlreadyExists, Arc::clone(self)));
}
let writer = Arc::clone(self).create_checkpoint_writer()?;
let checkpoint_path = writer.checkpoint_path()?;
let data_iter = writer.checkpoint_data(engine)?;
// Capture writer state before the iterator is consumed; needed by finalize.
let state = data_iter.state();
// Apply each batch's selection vector lazily as the parquet writer pulls data.
let lazy_data = data_iter.map(|r| r.and_then(|f| f.apply_selection_vector()));
match engine
.parquet_handler()
.write_parquet_file(checkpoint_path.clone(), Box::new(lazy_data))
{
Ok(()) => (),
// Another writer produced the same checkpoint concurrently; treat as success.
Err(Error::FileAlreadyExists(_)) => {
warn!(
"ParquetHandler::write_parquet_file unexpectedly failed on \
FileAlreadyExists for version {}",
self.version()
);
return Ok((CheckpointWriteResult::AlreadyExists, Arc::clone(self)));
}
Err(e) => return Err(e),
}
// Re-stat the written file to get its actual size/mtime for the log path.
let file_meta = engine.storage_handler().head(&checkpoint_path)?;
writer.finalize(engine, &file_meta, &state)?;
let checkpoint_log_path = ParsedLogPath::try_from(file_meta)?.ok_or_else(|| {
Error::internal_error("Checkpoint path could not be parsed as a log path")
})?;
let new_log_segment = self
.log_segment
.try_new_with_checkpoint(checkpoint_log_path)?;
Ok((
CheckpointWriteResult::Written,
Arc::new(Snapshot::new_with_crc(
new_log_segment,
self.table_configuration().clone(),
self.lazy_crc.clone(),
)),
))
}
/// Create a writer that compacts log versions `start_version..=end_version`.
pub fn log_compaction_writer(
self: Arc<Self>,
start_version: Version,
end_version: Version,
) -> DeltaResult<LogCompactionWriter> {
LogCompactionWriter::try_new(self, start_version, end_version)
}
// The log segment backing this snapshot.
#[internal_api]
pub(crate) fn log_segment(&self) -> &LogSegment {
&self.log_segment
}
/// Root URL of the table this snapshot describes.
pub fn table_root(&self) -> &Url {
self.table_configuration.table_root()
}
/// Table version this snapshot represents.
pub fn version(&self) -> Version {
self.table_configuration().version()
}
/// Logical schema of the table at this version.
pub fn schema(&self) -> SchemaRef {
self.table_configuration.logical_schema()
}
/// Parsed table properties at this version.
pub fn table_properties(&self) -> &TableProperties {
self.table_configuration().table_properties()
}
// Surface the protocol's versions and supported table features as `delta.*`
// property strings (`delta.feature.<name>` -> "supported").
#[allow(unused)]
#[internal_api]
pub(crate) fn get_protocol_derived_properties(&self) -> HashMap<String, String> {
    let protocol = self.table_configuration().protocol();
    let mut properties: HashMap<String, String> = HashMap::new();
    properties.insert(
        "delta.minReaderVersion".into(),
        protocol.min_reader_version().to_string(),
    );
    properties.insert(
        "delta.minWriterVersion".into(),
        protocol.min_writer_version().to_string(),
    );
    // Reader and writer features are folded into one stream; an already-present
    // entry is never overwritten.
    let reader_features = protocol.reader_features().into_iter().flatten();
    let writer_features = protocol.writer_features().into_iter().flatten();
    for feature in reader_features.chain(writer_features) {
        properties
            .entry(format!("delta.feature.{}", feature.as_ref()))
            .or_insert_with(|| "supported".to_string());
    }
    properties
}
// Raw key/value configuration map from the table's metadata action.
#[allow(unused)]
#[internal_api]
pub(crate) fn metadata_configuration(&self) -> &HashMap<String, String> {
self.table_configuration().metadata().configuration()
}
// Combined protocol + metadata view for this snapshot's version.
#[internal_api]
pub(crate) fn table_configuration(&self) -> &TableConfiguration {
&self.table_configuration
}
/// Begin building a scan over this snapshot.
pub fn scan_builder(self: Arc<Self>) -> ScanBuilder {
ScanBuilder::new(self)
}
/// Begin a write transaction against this snapshot of the table.
pub fn transaction(
self: Arc<Self>,
committer: Box<dyn Committer>,
engine: &dyn Engine,
) -> DeltaResult<Transaction> {
Transaction::try_new_existing_table(self, committer, engine)
}
/// Latest transaction version recorded for `application_id`, honoring the
/// configured set-transaction expiration. Prefers the CRC's cached
/// `set_transactions` map; falls back to scanning the log.
#[instrument(parent = &self.span, name = "snap.get_app_id_version", skip_all, err)]
pub fn get_app_id_version(
&self,
application_id: &str,
engine: &dyn Engine,
) -> DeltaResult<Option<i64>> {
let expiration_timestamp =
calculate_transaction_expiration_timestamp(self.table_properties())?;
// Fast path: a CRC at this version may carry the full set-transaction map.
if let Some(crc) = self
.lazy_crc
.get_or_load_if_at_version(engine, self.version())
{
if let Some(txn_map) = &crc.set_transactions {
return Ok(txn_map
.get(application_id)
// Expired entries are treated as absent.
.filter(|txn| !is_set_txn_expired(expiration_timestamp, txn.last_updated))
.map(|txn| txn.version));
}
}
// Slow path: scan the log segment for this one application id.
let txn = SetTransactionScanner::get_one(
self.log_segment(),
application_id,
engine,
expiration_timestamp,
)?;
Ok(txn.map(|t| t.version))
}
pub fn get_domain_metadata(
&self,
domain: &str,
engine: &dyn Engine,
) -> DeltaResult<Option<String>> {
if domain.starts_with(INTERNAL_DOMAIN_PREFIX) {
return Err(Error::generic(
"User DomainMetadata are not allowed to use system-controlled 'delta.*' domain",
));
}
self.get_domain_metadata_internal(domain, engine)
}
// Clustering columns translated from physical to logical names, honoring the
// table's column-mapping mode. `None` when the table has no clustering columns.
#[allow(unused)]
#[internal_api]
pub(crate) fn get_logical_clustering_columns(
    &self,
    engine: &dyn Engine,
) -> DeltaResult<Option<Vec<ColumnName>>> {
    let Some(physical_columns) = self.get_physical_clustering_columns(engine)? else {
        return Ok(None);
    };
    // Without column mapping, physical names are already the logical names.
    let column_mapping_mode = self.table_configuration.column_mapping_mode();
    if column_mapping_mode == ColumnMappingMode::None {
        return Ok(Some(physical_columns));
    }
    // Otherwise translate each column via the logical schema, failing fast on
    // the first unmappable column.
    let logical_schema = self.table_configuration.logical_schema();
    let mut logical_columns = Vec::with_capacity(physical_columns.len());
    for physical_col in &physical_columns {
        logical_columns.push(physical_to_logical_column_name(
            &logical_schema,
            physical_col,
            column_mapping_mode,
        )?);
    }
    Ok(Some(logical_columns))
}
// Physical clustering column names parsed from the clustering domain metadata,
// or `None` when the table doesn't support / doesn't use clustering.
#[internal_api]
pub(crate) fn get_physical_clustering_columns(
    &self,
    engine: &dyn Engine,
) -> DeltaResult<Option<Vec<ColumnName>>> {
    // Only tables with the clustered-table feature can carry this domain.
    let is_clustered = self
        .table_configuration
        .protocol()
        .has_table_feature(&TableFeature::ClusteredTable);
    if !is_clustered {
        return Ok(None);
    }
    // Parse the column list from the domain config when present.
    self.get_domain_metadata_internal(CLUSTERING_DOMAIN_NAME, engine)?
        .map(|config| parse_clustering_columns(&config))
        .transpose()
}
/// Fetch domain metadata entries, optionally filtered to `domains`. Prefers
/// the CRC's cached domain-metadata map; falls back to scanning the log.
#[internal_api]
pub(crate) fn get_domain_metadatas_internal(
&self,
engine: &dyn Engine,
domains: Option<&HashSet<&str>>,
) -> DeltaResult<DomainMetadataMap> {
// Fast path: a CRC at this version may carry the full domain-metadata map.
if let Some(crc) = self
.lazy_crc
.get_or_load_if_at_version(engine, self.version())
{
if let Some(dm_map) = &crc.domain_metadata {
return Ok(match domains {
None => dm_map.clone(),
// Keep only the requested domains.
Some(filter) => dm_map
.iter()
.filter(|(k, _)| filter.contains(k.as_str()))
.map(|(k, v)| (k.clone(), v.clone()))
.collect(),
});
}
}
// Slow path: replay the log segment for domain-metadata actions.
self.log_segment().scan_domain_metadatas(domains, engine)
}
// File statistics from the CRC, loading it if needed; `None` when no CRC at
// this version is available or it carries no stats.
#[allow(unused)]
#[internal_api]
pub(crate) fn get_or_load_file_stats(&self, engine: &dyn Engine) -> Option<FileStats> {
    self.lazy_crc
        .get_or_load_if_at_version(engine, self.version())
        .and_then(|crc| crc.file_stats())
}
// Non-loading variant of `get_or_load_file_stats`: consults only a CRC that
// is already in memory at this snapshot's version.
#[internal_api]
pub(crate) fn get_file_stats_if_loaded(&self) -> Option<FileStats> {
    self.lazy_crc
        .get_if_loaded_at_version(self.version())
        .and_then(|crc| crc.file_stats())
}
/// Test-only peek at the cached CRC; `None` unless a CRC at exactly this
/// snapshot's version has already been loaded into memory.
#[cfg(any(test, feature = "test-utils"))]
pub fn get_current_crc_if_loaded_for_testing(&self) -> Option<&Crc> {
if self.lazy_crc.crc_version() != Some(self.version()) {
return None;
}
self.lazy_crc.cached.get()?.get().map(|arc| arc.as_ref())
}
/// Test-only accessor for the version the lazy CRC targets (if any).
#[cfg(any(test, feature = "test-utils"))]
pub fn crc_version_for_testing(&self) -> Option<Version> {
self.lazy_crc.crc_version()
}
/// Write the in-memory CRC for this snapshot's version to a `.crc` file,
/// unless one already exists. Returns a snapshot whose segment references
/// the CRC file when a write happened.
#[instrument(parent = &self.span, name = "snap.write_checksum", skip_all, err)]
pub fn write_checksum(
self: &SnapshotRef,
engine: &dyn Engine,
) -> DeltaResult<(ChecksumWriteResult, SnapshotRef)> {
// Fast path: the listing already saw a CRC file at this version.
let has_crc_on_disk = self
.log_segment
.listed
.latest_crc_file
.as_ref()
.is_some_and(|f| f.version == self.version());
if has_crc_on_disk {
info!(
"CRC file already exists on disk at version {}",
self.version()
);
return Ok((ChecksumWriteResult::AlreadyExists, Arc::clone(self)));
}
// We only write from an already-loaded in-memory CRC; this call never
// loads one from storage.
let crc = self
.lazy_crc
.get_if_loaded_at_version(self.version())
.ok_or_else(|| {
Error::ChecksumWriteUnsupported(
"No in-memory CRC available at this snapshot version.".to_string(),
)
})?;
let crc_path = ParsedLogPath::new_crc(self.table_root(), self.version())?;
match try_write_crc_file(engine, &crc_path.location, crc) {
Ok(()) => {
info!("Wrote CRC file at {}", crc_path.location);
let new_log_segment = self.log_segment.try_new_with_crc_file(crc_path)?;
let new_snapshot = Arc::new(Snapshot::new_with_crc(
new_log_segment,
self.table_configuration().clone(),
self.lazy_crc.clone(),
));
Ok((ChecksumWriteResult::Written, new_snapshot))
}
// Losing the race to another writer is success, not failure.
Err(Error::FileAlreadyExists(_)) => {
info!(
"Another writer beat us to writing CRC file at {}",
crc_path.location
);
Ok((ChecksumWriteResult::AlreadyExists, Arc::clone(self)))
}
Err(e) => Err(e),
}
}
/// Publish any unpublished catalog commits via `committer`, returning a
/// snapshot whose log segment treats them as published. No-op (returns
/// `self`) when nothing is unpublished.
#[instrument(parent = &self.span, name = "snap.publish", skip_all, err)]
pub fn publish(
self: &SnapshotRef,
engine: &dyn Engine,
committer: &dyn Committer,
) -> DeltaResult<SnapshotRef> {
let unpublished_catalog_commits = self.log_segment().get_unpublished_catalog_commits()?;
if unpublished_catalog_commits.is_empty() {
return Ok(Arc::clone(self));
}
// Sanity check: the unpublished commits must be a contiguous, ascending run.
require!(
unpublished_catalog_commits
.windows(2)
.all(|commits| commits[0].version() + 1 == commits[1].version()),
Error::generic(format!(
"Expected ordered and contiguous unpublished catalog commits. \
Got: {unpublished_catalog_commits:?}"
))
);
// Publishing only makes sense for catalog-managed tables with a catalog committer.
require!(
self.table_configuration().is_catalog_managed(),
Error::generic(
"There are catalog commits that need publishing, but the table is not catalog-managed.",
)
);
require!(
committer.is_catalog_committer(),
Error::generic(
"There are catalog commits that need publishing, but the committer is not a catalog committer.",
)
);
let publish_metadata =
PublishMetadata::try_new(self.version(), unpublished_catalog_commits)?;
committer.publish(engine, publish_metadata)?;
Ok(Arc::new(Snapshot::new_with_crc(
self.log_segment().new_as_published()?,
self.table_configuration().clone(),
self.lazy_crc.clone(),
)))
}
// Fetch the configuration string for a single domain (internal variant: no
// `delta.*` prefix check).
#[allow(unused)]
#[internal_api]
pub(crate) fn get_domain_metadata_internal(
    &self,
    domain: &str,
    engine: &dyn Engine,
) -> DeltaResult<Option<String>> {
    let wanted = HashSet::from([domain]);
    let mut metadatas = self.get_domain_metadatas_internal(engine, Some(&wanted))?;
    let config = metadatas
        .remove(domain)
        .map(|dm| dm.configuration().to_owned());
    Ok(config)
}
// All user-visible (non-internal) domain metadata entries for this snapshot.
#[allow(unused)]
#[internal_api]
pub(crate) fn get_all_domain_metadata(
    &self,
    engine: &dyn Engine,
) -> DeltaResult<Vec<DomainMetadata>> {
    let all_metadata = self.get_domain_metadatas_internal(engine, None)?;
    let mut result = Vec::new();
    for metadata in all_metadata.into_values() {
        // Internal (system) domains are filtered out of the public listing.
        if !metadata.is_internal() {
            result.push(metadata);
        }
    }
    Ok(result)
}
/// In-commit timestamp (ICT) of this snapshot's version: `None` when ICT is
/// not enabled, otherwise read from the CRC if available, else from the
/// latest commit file.
#[instrument(parent = &self.span, name = "snap.get_ict", skip_all, err)]
#[internal_api]
pub(crate) fn get_in_commit_timestamp(&self, engine: &dyn Engine) -> DeltaResult<Option<i64>> {
let enablement = self
.table_configuration()
.in_commit_timestamp_enablement()?;
if matches!(enablement, InCommitTimestampEnablement::NotEnabled) {
return Ok(None);
}
// An enablement version in the future relative to this snapshot is invalid.
if let InCommitTimestampEnablement::Enabled {
enablement: Some((enablement_version, _)),
} = enablement
{
if self.version() < enablement_version {
return Err(Error::generic(format!(
"Invalid state: snapshot at version {} has ICT enablement version {} in the future",
self.version(),
enablement_version
)));
}
}
// Fast path: a CRC at this version must carry the ICT once ICT is enabled.
if let Some(crc) = self
.lazy_crc
.get_or_load_if_at_version(engine, self.version())
{
match crc.in_commit_timestamp_opt {
Some(ict) => return Ok(Some(ict)),
None => {
return Err(Error::generic(format!(
"In-Commit Timestamp not found in CRC file at version {}",
self.version()
)));
}
}
}
// Slow path: read the ICT out of the latest commit file itself.
match &self.log_segment.listed.latest_commit_file {
Some(commit_file_meta) => {
let ict = commit_file_meta.read_in_commit_timestamp(engine)?;
Ok(Some(ict))
}
None => Err(Error::generic("Last commit file not found in log segment")),
}
}
/// Timestamp of this snapshot's version: the in-commit timestamp when ICT is
/// enabled, otherwise the latest commit file's filesystem modification time.
#[allow(unused)]
#[instrument(parent = &self.span, name = "snap.get_ts", skip_all, err)]
pub fn get_timestamp(&self, engine: &dyn Engine) -> DeltaResult<i64> {
match self
.table_configuration()
.in_commit_timestamp_enablement()?
{
InCommitTimestampEnablement::NotEnabled => {
// No ICT: fall back to the commit file's last-modified time.
match &self.log_segment.listed.latest_commit_file {
Some(commit_file_meta) => {
let ts = commit_file_meta.location.last_modified;
Ok(ts)
}
None => Err(Error::generic(format!(
"Last commit file not found in log segment for version {} \
(ICT disabled): cannot read filesystem modification timestamp",
self.version()
))),
}
}
// ICT enabled: a missing timestamp here is an internal invariant violation,
// since get_in_commit_timestamp only returns None when ICT is disabled.
InCommitTimestampEnablement::Enabled { .. } => self
.get_in_commit_timestamp(engine)
.map_err(|e| {
Error::generic(format!(
"Unable to read in-commit timestamp for version {}: {e}",
self.version()
))
})?
.ok_or_else(|| {
Error::internal_error(format!(
"Invalid state: version {}, ICT is enabled \
but get_in_commit_timestamp returned None",
self.version()
))
}),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
use std::sync::Arc;
use rstest::rstest;
use serde_json::json;
use test_utils::table_builder::{FeatureSet, LogState, TestTableBuilder, VersionTarget};
use test_utils::{add_commit, delta_path_for_version};
use crate::actions::{DomainMetadata, Protocol};
use crate::arrow::array::StringArray;
use crate::arrow::record_batch::RecordBatch;
use crate::committer::FileSystemCommitter;
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::default::executor::tokio::{
TokioBackgroundExecutor, TokioMultiThreadExecutor,
};
use crate::engine::default::filesystem::ObjectStoreStorageHandler;
use crate::engine::default::{DefaultEngine, DefaultEngineBuilder};
use crate::engine::sync::SyncEngine;
use crate::last_checkpoint_hint::LastCheckpointHint;
use crate::log_segment::LogSegment;
use crate::log_segment_files::LogSegmentFiles;
use crate::object_store::local::LocalFileSystem;
use crate::object_store::memory::InMemory;
use crate::object_store::path::Path;
use crate::object_store::ObjectStoreExt as _;
use crate::parquet::arrow::ArrowWriter;
use crate::path::{LogPathFileType, ParsedLogPath};
use crate::schema::{DataType, StructField, StructType};
use crate::table_features::{
TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION,
};
use crate::table_properties::ENABLE_IN_COMMIT_TIMESTAMPS;
use crate::transaction::create_table::create_table;
use crate::utils::test_utils::{assert_result_error_with_message, string_array_to_engine_data};
// Build a `commitInfo` action value, optionally carrying an in-commit timestamp.
fn create_commit_info(timestamp: i64, ict: Option<i64>) -> serde_json::Value {
    let mut info = serde_json::Map::new();
    info.insert("timestamp".to_string(), json!(timestamp));
    info.insert("operation".to_string(), json!("WRITE"));
    if let Some(ict_value) = ict {
        info.insert("inCommitTimestamp".to_string(), json!(ict_value));
    }
    json!({ "commitInfo": info })
}
// Build a `protocol` action. With ICT enabled it advertises the
// `inCommitTimestamp` writer feature (and an empty `readerFeatures` list when
// the reader version requires table features); otherwise a plain legacy protocol.
fn create_protocol(ict_enabled: bool, min_reader_version: Option<u32>) -> serde_json::Value {
let reader_version = min_reader_version.unwrap_or(1);
if ict_enabled {
let mut protocol = json!({
"protocol": {
"minReaderVersion": reader_version,
"minWriterVersion": TABLE_FEATURES_MIN_WRITER_VERSION,
"writerFeatures": ["inCommitTimestamp"]
}
});
// Table-features reader versions must also declare readerFeatures.
if reader_version >= TABLE_FEATURES_MIN_READER_VERSION as u32 {
protocol["protocol"]["readerFeatures"] = json!([]);
}
protocol
} else {
json!({
"protocol": {
"minReaderVersion": reader_version,
"minWriterVersion": 2
}
})
}
}
// Build a `metaData` action. The configuration can be empty, a complete ICT
// config (enablement version + timestamp), or — for negative tests — ICT
// enabled with the enablement keys deliberately missing.
fn create_metadata(
id: Option<&str>,
schema_string: Option<&str>,
created_time: Option<u64>,
ict_config: Option<(String, String)>,
ict_enabled_but_missing_version: bool,
) -> serde_json::Value {
let config = if ict_enabled_but_missing_version {
json!({
"delta.enableInCommitTimestamps": "true"
})
} else if let Some((enablement_version, enablement_timestamp)) = ict_config {
json!({
"delta.enableInCommitTimestamps": "true",
"delta.inCommitTimestampEnablementVersion": enablement_version,
"delta.inCommitTimestampEnablementTimestamp": enablement_timestamp
})
} else {
json!({})
};
json!({
"metaData": {
"id": id.unwrap_or("testId"),
"format": {"provider": "parquet", "options": {}},
"schemaString": schema_string.unwrap_or("{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}"),
"partitionColumns": [],
"configuration": config,
"createdTime": created_time.unwrap_or(1587968586154u64)
}
})
}
// A minimal commit payload: a protocol line followed by a metadata line.
fn create_basic_commit(ict_enabled: bool, ict_config: Option<(String, String)>) -> String {
    let protocol_action = create_protocol(ict_enabled, None);
    let metadata_action = create_metadata(None, None, None, ict_config, false);
    format!("{protocol_action}\n{metadata_action}")
}
// Build a snapshot whose log segment contains only a checkpoint part and no
// commit files — used to exercise code paths that need the latest commit file.
fn create_snapshot_with_commit_file_absent_from_log_segment(
url: &Url,
table_cfg: TableConfiguration,
) -> DeltaResult<Snapshot> {
let checkpoint_parts = vec![ParsedLogPath::try_from(crate::FileMeta {
location: url.join("_delta_log/00000000000000000000.checkpoint.parquet")?,
last_modified: 0,
size: 100,
})?
.unwrap()];
let listed_files = LogSegmentFiles {
checkpoint_parts,
..Default::default()
};
let log_segment =
LogSegment::try_new(listed_files, url.join("_delta_log/")?, Some(0), None)?;
Ok(Snapshot::new(log_segment, table_cfg))
}
// Building a snapshot at an explicit version yields the expected protocol
// (deletion vectors) and schema from the on-disk test table.
#[test]
fn test_snapshot_read_metadata() {
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
let url = url::Url::from_directory_path(path).unwrap();
let engine = SyncEngine::new();
let snapshot = Snapshot::builder_for(url)
.at_version(1)
.build(&engine)
.unwrap();
let expected = Protocol::try_new_modern(["deletionVectors"], ["deletionVectors"]).unwrap();
assert_eq!(snapshot.table_configuration().protocol(), &expected);
let schema_string = r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#;
let expected: SchemaRef = serde_json::from_str(schema_string).unwrap();
assert_eq!(snapshot.schema(), expected);
}
// Same as test_snapshot_read_metadata but without pinning a version: building
// at latest resolves the same protocol and schema.
#[test]
fn test_new_snapshot() {
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
let url = url::Url::from_directory_path(path).unwrap();
let engine = SyncEngine::new();
let snapshot = Snapshot::builder_for(url).build(&engine).unwrap();
let expected = Protocol::try_new_modern(["deletionVectors"], ["deletionVectors"]).unwrap();
assert_eq!(snapshot.table_configuration().protocol(), &expected);
let schema_string = r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#;
let expected: SchemaRef = serde_json::from_str(schema_string).unwrap();
assert_eq!(snapshot.schema(), expected);
}
// Serialize the given actions (one JSON object per line) and write them to the
// in-memory store as commit file `version`.
async fn commit(
    table_root: impl AsRef<str>,
    store: &InMemory,
    version: Version,
    commit: Vec<serde_json::Value>,
) {
    let mut lines = Vec::with_capacity(commit.len());
    for action in &commit {
        lines.push(action.to_string());
    }
    add_commit(table_root, store, version, lines.join("\n"))
        .await
        .unwrap();
}
// Exercises Snapshot::builder_from (incremental refresh) across many store
// states: same/older/newer requested versions, a new checkpoint appearing,
// and commits that change protocol only, metadata only, both, or neither.
#[tokio::test]
async fn test_snapshot_new_from() -> DeltaResult<()> {
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
let url = url::Url::from_directory_path(path).unwrap();
let engine = SyncEngine::new();
let old_snapshot = Snapshot::builder_for(url.clone())
.at_version(1)
.build(&engine)
.unwrap();
// Requesting a version older than the hint must fail with a clear message.
let snapshot_res = Snapshot::builder_from(old_snapshot.clone())
.at_version(0)
.build(&engine);
assert!(matches!(
snapshot_res,
Err(Error::Generic(msg)) if msg == "Requested snapshot version 0 is older than snapshot hint version 1"
));
// Requesting the hint's own version returns an equal snapshot.
let snapshot = Snapshot::builder_from(old_snapshot.clone())
.at_version(1)
.build(&engine)
.unwrap();
let expected = old_snapshot.clone();
assert_eq!(snapshot, expected);
// Helper: refreshing a version-0 snapshot to version 1 must equal a fresh
// build at version 1 against the same store.
fn test_new_from(store: Arc<InMemory>) -> DeltaResult<()> {
let table_root = "memory:///";
let engine = DefaultEngineBuilder::new(store).build();
let base_snapshot = Snapshot::builder_for(table_root)
.at_version(0)
.build(&engine)?;
let snapshot = Snapshot::builder_from(base_snapshot.clone())
.at_version(1)
.build(&engine)?;
let expected = Snapshot::builder_for(table_root)
.at_version(1)
.build(&engine)?;
assert_eq!(snapshot, expected);
Ok(())
}
// Baseline table: a single commit 0 with commitInfo + protocol + metadata.
let store = Arc::new(InMemory::new());
let commit0 = vec![
json!({
"commitInfo": {
"timestamp": 1587968586154i64,
"operation": "WRITE",
"operationParameters": {"mode":"ErrorIfExists","partitionBy":"[]"},
"isBlindAppend":true
}
}),
json!({
"protocol": {
"minReaderVersion": 1,
"minWriterVersion": 2
}
}),
json!({
"metaData": {
"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9",
"format": {
"provider": "parquet",
"options": {}
},
"schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}",
"partitionColumns": [],
"configuration": {},
"createdTime": 1587968585495i64
}
}),
];
let table_root = "memory:///";
commit(table_root, store.as_ref(), 0, commit0.clone()).await;
// With no new versions, an unpinned refresh returns the same snapshot…
let engine = DefaultEngineBuilder::new(Arc::new(store.fork())).build();
let base_snapshot = Snapshot::builder_for(table_root)
.at_version(0)
.build(&engine)?;
let snapshot = Snapshot::builder_from(base_snapshot.clone()).build(&engine)?;
let expected = Snapshot::builder_for(table_root)
.at_version(0)
.build(&engine)?;
assert_eq!(snapshot, expected);
// …and pinning a not-yet-existing version fails.
assert!(matches!(
Snapshot::builder_from(base_snapshot.clone()).at_version(1).build(&engine),
Err(Error::Generic(msg)) if msg == "Requested snapshot version 1 is newer than the latest version 0"
));
// Case 3a: commit 1 plus a checkpoint at version 1 with changed protocol
// and metadata — refresh must pick up the checkpoint state.
let store_3a = store.fork();
let mut checkpoint1 = commit0.clone();
commit(table_root, &store_3a, 1, commit0.clone()).await;
checkpoint1[1] = json!({
"protocol": {
"minReaderVersion": 2,
"minWriterVersion": 5
}
});
checkpoint1[2]["partitionColumns"] = serde_json::to_value(["some_partition_column"])?;
// Write the checkpoint as a parquet file built from the JSON actions.
let handler = engine.json_handler();
let json_strings: StringArray = checkpoint1
.into_iter()
.map(|json| json.to_string())
.collect::<Vec<_>>()
.into();
let parsed = handler
.parse_json(
string_array_to_engine_data(json_strings),
crate::actions::get_commit_schema().clone(),
)
.unwrap();
let checkpoint = ArrowEngineData::try_from_engine_data(parsed).unwrap();
let checkpoint: RecordBatch = checkpoint.into();
let mut buffer = vec![];
let mut writer = ArrowWriter::try_new(&mut buffer, checkpoint.schema(), None)?;
writer.write(&checkpoint)?;
writer.close()?;
store_3a
.put(
&delta_path_for_version(1, "checkpoint.parquet"),
buffer.into(),
)
.await
.unwrap();
test_new_from(store_3a.into())?;
// Case 3c-i: commit 1 changes both protocol and metadata (no checkpoint).
let store_3c_i = Arc::new(store.fork());
let mut commit1 = commit0.clone();
commit1[1] = json!({
"protocol": {
"minReaderVersion": 2,
"minWriterVersion": 5
}
});
commit1[2]["partitionColumns"] = serde_json::to_value(["some_partition_column"])?;
commit(table_root, store_3c_i.as_ref(), 1, commit1).await;
test_new_from(store_3c_i.clone())?;
// Refreshing past the actual end version must surface the mismatch.
let engine = DefaultEngineBuilder::new(store_3c_i).build();
let base_snapshot = Snapshot::builder_for(table_root)
.at_version(0)
.build(&engine)?;
assert!(matches!(
Snapshot::builder_from(base_snapshot.clone()).at_version(2).build(&engine),
Err(Error::Generic(msg)) if msg == "LogSegment end version 1 not the same as the specified end version 2"
));
// Case 3c-ii: commit 1 changes only the protocol (metadata removed).
let store_3c_ii = store.fork();
let mut commit1 = commit0.clone();
commit1[1] = json!({
"protocol": {
"minReaderVersion": 2,
"minWriterVersion": 5
}
});
commit1.remove(2); commit(table_root, &store_3c_ii, 1, commit1).await;
test_new_from(store_3c_ii.into())?;
// Case 3c-iii: commit 1 changes only the metadata (protocol removed).
let store_3c_iii = store.fork();
let mut commit1 = commit0.clone();
commit1[2]["partitionColumns"] = serde_json::to_value(["some_partition_column"])?;
commit1.remove(1); commit(table_root, &store_3c_iii, 1, commit1).await;
test_new_from(store_3c_iii.into())?;
// Case 3c-iv: commit 1 contains only commitInfo — neither changes.
let store_3c_iv = store.fork();
let commit1 = vec![commit0[0].clone()];
commit(table_root, &store_3c_iv, 1, commit1).await;
test_new_from(store_3c_iv.into())?;
Ok(())
}
#[tokio::test]
async fn test_snapshot_new_from_crc() -> Result<(), Box<dyn std::error::Error>> {
    // Verifies that an incrementally built snapshot (`builder_from`) tracks
    // the newest available `.crc` file:
    //  - with only the v0 CRC present, the v1 snapshot still references it;
    //  - once a v1 CRC is written, the incremental snapshot references the
    //    v1 CRC and equals a snapshot built from scratch.
    let store = Arc::new(InMemory::new());
    let table_root = "memory:///";
    let engine = DefaultEngineBuilder::new(store.clone()).build();
    // Helper producing a protocol action with the given reader/writer versions.
    let protocol = |reader_version, writer_version| {
        json!({
            "protocol": {
                "minReaderVersion": reader_version,
                "minWriterVersion": writer_version
            }
        })
    };
    let metadata = json!({
        "metaData": {
            "id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9",
            "format": {
                "provider": "parquet",
                "options": {}
            },
            "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}",
            "partitionColumns": [],
            "configuration": {},
            "createdTime": 1587968585495i64
        }
    });
    // Commit 0: commitInfo + protocol(1, 1) + metadata.
    let commit0 = vec![
        json!({
            "commitInfo": {
                "timestamp": 1587968586154i64,
                "operation": "WRITE",
                "operationParameters": {"mode":"ErrorIfExists","partitionBy":"[]"},
                "isBlindAppend":true
            }
        }),
        protocol(1, 1),
        metadata.clone(),
    ];
    // Commit 1 only bumps the writer version (no metadata action).
    let commit1 = vec![
        json!({
            "commitInfo": {
                "timestamp": 1587968586154i64,
                "operation": "WRITE",
                "operationParameters": {"mode":"ErrorIfExists","partitionBy":"[]"},
                "isBlindAppend":true
            }
        }),
        protocol(1, 2),
    ];
    commit(table_root, &store, 0, commit0.clone()).await;
    commit(table_root, &store, 1, commit1).await;
    // Write a CRC file for version 0 only.
    let crc = json!({
        "table_size_bytes": 100,
        "num_files": 1,
        "num_metadata": 1,
        "num_protocol": 1,
        "metadata": metadata,
        "protocol": protocol(1, 1),
    });
    let path = delta_path_for_version(0, "crc");
    store.put(&path, crc.to_string().into()).await?;
    let base_snapshot = Snapshot::builder_for(table_root)
        .at_version(0)
        .build(&engine)?;
    let snapshot = Snapshot::builder_from(base_snapshot.clone())
        .at_version(1)
        .build(&engine)?;
    // Only the v0 CRC exists, so the v1 snapshot still references version 0.
    assert_eq!(
        snapshot
            .log_segment
            .listed
            .latest_crc_file
            .as_ref()
            .unwrap()
            .version,
        0
    );
    // Now also write a CRC for version 1.
    let path = delta_path_for_version(1, "crc");
    let crc = json!({
        "table_size_bytes": 100,
        "num_files": 1,
        "num_metadata": 1,
        "num_protocol": 1,
        "metadata": metadata,
        "protocol": protocol(1, 2),
    });
    store.put(&path, crc.to_string().into()).await?;
    let snapshot = Snapshot::builder_from(base_snapshot.clone())
        .at_version(1)
        .build(&engine)?;
    let expected = Snapshot::builder_for(table_root)
        .at_version(1)
        .build(&engine)?;
    // Incremental result matches a from-scratch build and tracks the v1 CRC.
    assert_eq!(snapshot, expected);
    assert_eq!(
        snapshot
            .log_segment
            .listed
            .latest_crc_file
            .as_ref()
            .unwrap()
            .version,
        1
    );
    Ok(())
}
#[test]
fn test_read_table_with_missing_last_checkpoint() {
    // A log directory without a `_last_checkpoint` file: reading the hint
    // must succeed and yield `None` rather than erroring.
    let log_dir = std::fs::canonicalize(PathBuf::from(
        "./tests/data/table-with-dv-small/_delta_log/",
    ))
    .unwrap();
    let log_url = url::Url::from_directory_path(log_dir).unwrap();
    let object_store = Arc::new(LocalFileSystem::new());
    let task_executor = Arc::new(TokioBackgroundExecutor::new());
    let storage = ObjectStoreStorageHandler::new(object_store, task_executor, None);
    let hint = LastCheckpointHint::try_read(&storage, &log_url).unwrap();
    assert!(hint.is_none());
}
/// Builds a well-formed `LastCheckpointHint` (without tags) along with its
/// JSON serialization.
///
/// Returns the serialized bytes and the hint value, so tests can write the
/// bytes to a store and compare the parsed result against the original.
fn valid_last_checkpoint() -> (Vec<u8>, LastCheckpointHint) {
    let hint = LastCheckpointHint {
        version: 1,
        size: 8,
        parts: None,
        size_in_bytes: Some(21857),
        num_of_add_files: None,
        checkpoint_schema: None,
        checksum: None,
        tags: None,
    };
    let bytes = hint.to_json_bytes();
    (bytes, hint)
}
/// Like `valid_last_checkpoint`, but with a populated `tags` map.
///
/// Returns the serialized bytes and the expected hint value.
fn valid_last_checkpoint_with_tags() -> (Vec<u8>, LastCheckpointHint) {
    use std::collections::HashMap;
    let (_, base_checkpoint) = valid_last_checkpoint();
    // Build the tag map in one expression instead of serial inserts.
    let tags = HashMap::from([
        (
            "author".to_string(),
            "test_read_table_with_last_checkpoint".to_string(),
        ),
        ("environment".to_string(), "snapshot_tests".to_string()),
        ("created_by".to_string(), "delta-kernel-rs".to_string()),
    ]);
    let checkpoint = LastCheckpointHint {
        tags: Some(tags),
        ..base_checkpoint
    };
    let data = checkpoint.to_json_bytes();
    (data, checkpoint)
}
#[tokio::test]
async fn test_read_table_with_empty_last_checkpoint() {
    // An empty JSON object in `_last_checkpoint` is treated as "no hint"
    // rather than as a hard error.
    let store = Arc::new(InMemory::new());
    let hint_path = Path::from("invalid/_last_checkpoint");
    store
        .put(&hint_path, "{}".as_bytes().to_vec().into())
        .await
        .expect("put _last_checkpoint");
    let executor = Arc::new(TokioBackgroundExecutor::new());
    let storage = ObjectStoreStorageHandler::new(store, executor, None);
    let table_url = Url::parse("memory:///invalid/").expect("valid url");
    let hint = LastCheckpointHint::try_read(&storage, &table_url).expect("read last checkpoint");
    assert!(hint.is_none())
}
#[tokio::test]
async fn test_read_table_with_last_checkpoint() {
    // Writes three `_last_checkpoint` files (valid, garbage, valid-with-tags)
    // under distinct prefixes, then verifies each parses to the expected hint
    // (or `None` for the unparseable one).
    let store = Arc::new(InMemory::new());
    let (valid_bytes, valid_hint) = valid_last_checkpoint();
    let (tagged_bytes, tagged_hint) = valid_last_checkpoint_with_tags();
    let cases = vec![
        ("valid", valid_bytes, Some(valid_hint)),
        ("invalid", "invalid".as_bytes().to_vec(), None),
        ("valid_with_tags", tagged_bytes, Some(tagged_hint)),
    ];
    for (prefix, bytes, _) in &cases {
        let path = Path::from(format!("{prefix}/_last_checkpoint"));
        store
            .put(&path, bytes.clone().into())
            .await
            .expect("put _last_checkpoint");
    }
    let executor = Arc::new(TokioBackgroundExecutor::new());
    let storage = ObjectStoreStorageHandler::new(store, executor, None);
    for (prefix, _, expected) in cases {
        let url = Url::parse(&format!("memory:///{prefix}/")).expect("valid url");
        let actual = LastCheckpointHint::try_read(&storage, &url).expect("read last checkpoint");
        assert_eq!(actual, expected);
    }
}
#[test_log::test]
fn test_read_table_with_checkpoint() {
    // Table with a checkpoint at version 2 and one commit after it (version
    // 3), but no `_last_checkpoint` hint file: listing must still find both.
    let table_dir = std::fs::canonicalize(PathBuf::from(
        "./tests/data/with_checkpoint_no_last_checkpoint/",
    ))
    .unwrap();
    let location = url::Url::from_directory_path(table_dir).unwrap();
    let engine = SyncEngine::new();
    let snapshot = Snapshot::builder_for(location).build(&engine).unwrap();

    // Exactly one checkpoint part, at version 2.
    assert_eq!(snapshot.log_segment.listed.checkpoint_parts.len(), 1);
    let checkpoint_path = ParsedLogPath::try_from(
        snapshot.log_segment.listed.checkpoint_parts[0]
            .location
            .clone(),
    )
    .unwrap()
    .unwrap();
    assert_eq!(checkpoint_path.version, 2);

    // Exactly one commit after the checkpoint, at version 3.
    assert_eq!(snapshot.log_segment.listed.ascending_commit_files.len(), 1);
    let commit_path = ParsedLogPath::try_from(
        snapshot.log_segment.listed.ascending_commit_files[0]
            .location
            .clone(),
    )
    .unwrap()
    .unwrap();
    assert_eq!(commit_path.version, 3);
}
#[tokio::test]
async fn test_domain_metadata() -> DeltaResult<()> {
    // Exercises domain-metadata reconciliation across two commits:
    //  - a later `removed: true` tombstone hides a domain (domain1);
    //  - a later update wins over an earlier one (domain2);
    //  - an untouched domain keeps its original value (domain3);
    //  - user-facing reads of system `delta.*` domains are rejected, while
    //    the internal accessor can read them;
    //  - `get_all_domain_metadata` lists only live, non-system domains.
    let table_root = "memory:///test_table/";
    let store = Arc::new(InMemory::new());
    let engine = DefaultEngineBuilder::new(store.clone()).build();
    // Commit 0: protocol, metadata, and three live user domains.
    let commit = [
        json!({
            "protocol": {
                "minReaderVersion": 1,
                "minWriterVersion": 1
            }
        }),
        json!({
            "metaData": {
                "id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9",
                "format": { "provider": "parquet", "options": {} },
                "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}",
                "partitionColumns": [],
                "configuration": {},
                "createdTime": 1587968585495i64
            }
        }),
        json!({
            "domainMetadata": {
                "domain": "domain1",
                "configuration": "domain1_commit0",
                "removed": false
            }
        }),
        json!({
            "domainMetadata": {
                "domain": "domain2",
                "configuration": "domain2_commit0",
                "removed": false
            }
        }),
        json!({
            "domainMetadata": {
                "domain": "domain3",
                "configuration": "domain3_commit0",
                "removed": false
            }
        }),
    ]
    .map(|json| json.to_string())
    .join("\n");
    add_commit(table_root, store.clone().as_ref(), 0, commit)
        .await
        .unwrap();
    // Commit 1: removes domain1, updates domain2, and adds a system-
    // controlled `delta.domain3` (distinct from the user domain "domain3").
    let commit = [
        json!({
            "domainMetadata": {
                "domain": "domain1",
                "configuration": "domain1_commit1",
                "removed": true
            }
        }),
        json!({
            "domainMetadata": {
                "domain": "domain2",
                "configuration": "domain2_commit1",
                "removed": false
            }
        }),
        json!({
            "domainMetadata": {
                "domain": "delta.domain3",
                "configuration": "domain3_commit1",
                "removed": false
            }
        }),
    ]
    .map(|json| json.to_string())
    .join("\n");
    add_commit(table_root, store.as_ref(), 1, commit)
        .await
        .unwrap();
    let snapshot = Snapshot::builder_for(table_root).build(&engine)?;
    // domain1 was removed in commit 1.
    assert_eq!(snapshot.get_domain_metadata("domain1", &engine)?, None);
    // domain2 reflects the newer commit-1 value.
    assert_eq!(
        snapshot.get_domain_metadata("domain2", &engine)?,
        Some("domain2_commit1".to_string())
    );
    // domain3 is unchanged since commit 0.
    assert_eq!(
        snapshot.get_domain_metadata("domain3", &engine)?,
        Some("domain3_commit0".to_string())
    );
    // The public accessor must refuse system-controlled `delta.*` domains.
    let err = snapshot
        .get_domain_metadata("delta.domain3", &engine)
        .unwrap_err();
    assert!(matches!(err, Error::Generic(msg) if
        msg == "User DomainMetadata are not allowed to use system-controlled 'delta.*' domain"));
    // The internal accessor can read system domains.
    assert_eq!(
        snapshot.get_domain_metadata_internal("delta.domain3", &engine)?,
        Some("domain3_commit1".to_string())
    );
    // All live user domains: domain2 (updated) and domain3 (original);
    // domain1 (removed) and delta.domain3 (system) are excluded.
    let mut metadata = snapshot.get_all_domain_metadata(&engine)?;
    metadata.sort_by(|a, b| a.domain().cmp(b.domain()));
    let mut expected = vec![
        DomainMetadata::new("domain2".to_string(), "domain2_commit1".to_string()),
        DomainMetadata::new("domain3".to_string(), "domain3_commit0".to_string()),
    ];
    expected.sort_by(|a, b| a.domain().cmp(b.domain()));
    assert_eq!(metadata, expected);
    Ok(())
}
#[test]
fn test_log_compaction_writer() {
    // Checks the compaction writer's target path for a valid range and that
    // reversed / empty version ranges are rejected.
    let table_dir =
        std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
    let table_url = url::Url::from_directory_path(table_dir).unwrap();
    let engine = SyncEngine::new();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();

    // Valid range [0, 1]: the writer targets the expected compacted file.
    let writer = snapshot.clone().log_compaction_writer(0, 1).unwrap();
    let compaction_path = writer.compaction_path();
    let expected_filename = "00000000000000000000.00000000000000000001.compacted.json";
    assert!(compaction_path.to_string().ends_with(expected_filename));

    // Reversed range (2 > 1) is invalid.
    assert_result_error_with_message(
        snapshot.clone().log_compaction_writer(2, 1),
        "Invalid version range",
    );
    // Degenerate range (1 == 1) is invalid as well.
    assert_result_error_with_message(
        snapshot.log_compaction_writer(1, 1),
        "Invalid version range",
    );
}
#[tokio::test]
async fn test_timestamp_with_ict_disabled() -> Result<(), Box<dyn std::error::Error>> {
    // With in-commit timestamps disabled, `get_in_commit_timestamp` yields
    // `None` rather than an error.
    let store = Arc::new(InMemory::new());
    let table_root = "memory://test/";
    let engine = DefaultEngineBuilder::new(store.clone()).build();
    add_commit(table_root, store.as_ref(), 0, create_basic_commit(false, None)).await?;
    let snapshot = Snapshot::builder_for(table_root).build(&engine)?;
    assert_eq!(snapshot.get_in_commit_timestamp(&engine)?, None);
    Ok(())
}
#[tokio::test]
async fn test_timestamp_with_ict_enablement_timeline() -> Result<(), Box<dyn std::error::Error>>
{
    // Timeline: v0 has ICT disabled, v1 enables ICT (enablement version 1),
    // v2 carries an explicit inCommitTimestamp. A snapshot at v0 reports no
    // ICT; a snapshot at v2 surfaces the commit's ICT.
    let store = Arc::new(InMemory::new());
    let table_root = "memory://test/";
    let engine = DefaultEngineBuilder::new(store.clone()).build();
    // Version 0: ICT disabled.
    let commit0 = create_basic_commit(false, None);
    add_commit(table_root, store.as_ref(), 0, commit0).await?;
    // Version 1: ICT enabled with enablement version/timestamp properties.
    let commit1 =
        create_basic_commit(true, Some(("1".to_string(), "1587968586154".to_string())));
    add_commit(table_root, store.as_ref(), 1, commit1).await?;
    // Version 2: commitInfo with an explicit inCommitTimestamp.
    let expected_timestamp = 1587968586200i64;
    let commit2 = format!(
        r#"{{"commitInfo":{{"timestamp":1587968586154,"inCommitTimestamp":{expected_timestamp},"operation":"WRITE"}}}}"#,
    );
    // `commit2` is already a `String`, so pass it directly — the previous
    // `commit2.to_string()` cloned the whole string for no reason.
    add_commit(table_root, store.as_ref(), 2, commit2).await?;
    // Before enablement (v0), no in-commit timestamp is reported.
    let snapshot_v0 = Snapshot::builder_for(table_root)
        .at_version(0)
        .build(&engine)?;
    assert_eq!(snapshot_v0.get_in_commit_timestamp(&engine)?, None);
    // After enablement (v2), the commit's ICT is surfaced.
    let snapshot_v2 = Snapshot::builder_for(table_root)
        .at_version(2)
        .build(&engine)?;
    assert_eq!(
        snapshot_v2.get_in_commit_timestamp(&engine)?,
        Some(expected_timestamp)
    );
    Ok(())
}
#[tokio::test]
async fn test_get_timestamp_enablement_version_in_future() -> DeltaResult<()> {
    // The table metadata claims ICT was enabled at version 5, but only
    // commits 0 and 1 exist: asking the v1 snapshot for its ICT must fail
    // because the recorded enablement version lies in the snapshot's future.
    let table_root = "memory:///test_table/";
    let store = Arc::new(InMemory::new());
    let engine = DefaultEngineBuilder::new(store.clone()).build();
    let commit_data = [
        json!({
            "protocol": {
                "minReaderVersion": TABLE_FEATURES_MIN_READER_VERSION,
                "minWriterVersion": TABLE_FEATURES_MIN_WRITER_VERSION,
                "readerFeatures": [],
                "writerFeatures": ["inCommitTimestamp"]
            }
        }),
        json!({
            "metaData": {
                "id": "test_id2",
                "format": {"provider": "parquet", "options": {}},
                "schemaString": "{\"type\":\"struct\",\"fields\":[]}",
                "partitionColumns": [],
                "configuration": {
                    "delta.enableInCommitTimestamps": "true",
                    // Enablement version 5 is beyond any commit we will write.
                    "delta.inCommitTimestampEnablementVersion": "5",
                    "delta.inCommitTimestampEnablementTimestamp": "1612345678"
                },
                "createdTime": 1677811175819u64
            }
        }),
    ];
    commit(table_root, store.as_ref(), 0, commit_data.to_vec()).await;
    // Commit 1 predates the claimed enablement version.
    let commit_predates = [create_commit_info(1234567890, None)];
    commit(table_root, store.as_ref(), 1, commit_predates.to_vec()).await;
    let snapshot_predates = Snapshot::builder_for(table_root)
        .at_version(1)
        .build(&engine)?;
    let result_predates = snapshot_predates.get_in_commit_timestamp(&engine);
    assert_result_error_with_message(
        result_predates,
        "Invalid state: snapshot at version 1 has ICT enablement version 5 in the future",
    );
    Ok(())
}
#[tokio::test]
async fn test_get_timestamp_missing_ict_when_enabled() -> DeltaResult<()> {
    // ICT is enabled from version 0, but commit 1's commitInfo carries no
    // inCommitTimestamp: reading it must fail with a clear error.
    let table_root = "memory:///test_table/";
    let store = Arc::new(InMemory::new());
    let engine = DefaultEngineBuilder::new(store.clone()).build();
    let commit0 = vec![
        create_protocol(true, Some(TABLE_FEATURES_MIN_READER_VERSION as u32)),
        create_metadata(
            Some("test_id"),
            Some("{\"type\":\"struct\",\"fields\":[]}"),
            Some(1677811175819),
            Some(("0".to_string(), "1612345678".to_string())),
            false,
        ),
    ];
    commit(table_root, store.as_ref(), 0, commit0).await;
    // Commit 1 deliberately omits the in-commit timestamp.
    commit(
        table_root,
        store.as_ref(),
        1,
        vec![create_commit_info(1234567890, None)],
    )
    .await;
    let snapshot_missing = Snapshot::builder_for(table_root)
        .at_version(1)
        .build(&engine)?;
    assert_result_error_with_message(
        snapshot_missing.get_in_commit_timestamp(&engine),
        "In-Commit Timestamp not found",
    );
    Ok(())
}
#[tokio::test]
async fn test_get_timestamp_fails_when_commit_missing() -> DeltaResult<()> {
    // If the snapshot's log segment lacks its last commit file,
    // `get_in_commit_timestamp` must surface an error.
    let url = Url::parse("memory:///")?;
    let store = Arc::new(InMemory::new());
    let engine = DefaultEngineBuilder::new(store.clone()).build();
    let actions = vec![
        create_protocol(true, Some(TABLE_FEATURES_MIN_READER_VERSION as u32)),
        create_metadata(
            Some("test_id"),
            Some("{\"type\":\"struct\",\"fields\":[]}"),
            Some(1677811175819),
            Some(("0".to_string(), "1612345678".to_string())),
            false,
        ),
    ];
    commit(url.as_str(), store.as_ref(), 0, actions).await;
    let snapshot = Snapshot::builder_for(url.as_str())
        .at_version(0)
        .build(&engine)?;
    // Clone the configuration into a snapshot whose log segment is missing
    // the commit file for its own version.
    let snapshot_no_commit = create_snapshot_with_commit_file_absent_from_log_segment(
        &url,
        snapshot.table_configuration().clone(),
    )?;
    assert_result_error_with_message(
        snapshot_no_commit.get_in_commit_timestamp(&engine),
        "Last commit file not found in log segment",
    );
    Ok(())
}
#[tokio::test]
async fn test_get_timestamp_with_checkpoint_and_commit_same_version() -> DeltaResult<()> {
    // When a checkpoint and a commit both exist at version 1, the ICT must
    // be read from the commit's commitInfo, not lost behind the checkpoint.
    let table_root = "memory:///test_table/";
    let store = Arc::new(InMemory::new());
    let engine = DefaultEngineBuilder::new(store.clone()).build();
    // Commit 0: protocol + metadata with ICT enabled from version 0.
    let commit0_data = [
        create_commit_info(1587968586154, None),
        create_protocol(true, Some(TABLE_FEATURES_MIN_READER_VERSION as u32)),
        create_metadata(
            Some("5fba94ed-9794-4965-ba6e-6ee3c0d22af9"),
            Some("{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"),
            Some(1587968585495),
            Some(("0".to_string(), "1587968586154".to_string())),
            false,
        ),
    ];
    commit(table_root, store.as_ref(), 0, commit0_data.to_vec()).await;
    // Build a parquet checkpoint at version 1 from the same actions.
    let checkpoint_data = [
        create_commit_info(1587968586154, None),
        create_protocol(true, Some(TABLE_FEATURES_MIN_READER_VERSION as u32)),
        create_metadata(
            Some("5fba94ed-9794-4965-ba6e-6ee3c0d22af9"),
            Some("{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"),
            Some(1587968585495),
            Some(("0".to_string(), "1587968586154".to_string())),
            false,
        ),
    ];
    // Parse the JSON actions into engine data, convert to Arrow, and write
    // them out as a parquet checkpoint file.
    let handler = engine.json_handler();
    let json_strings: StringArray = checkpoint_data
        .into_iter()
        .map(|json| json.to_string())
        .collect::<Vec<_>>()
        .into();
    let parsed = handler.parse_json(
        string_array_to_engine_data(json_strings),
        crate::actions::get_commit_schema().clone(),
    )?;
    let checkpoint = ArrowEngineData::try_from_engine_data(parsed)?;
    let checkpoint: RecordBatch = checkpoint.into();
    let mut buffer = vec![];
    let mut writer = ArrowWriter::try_new(&mut buffer, checkpoint.schema(), None)?;
    writer.write(&checkpoint)?;
    writer.close()?;
    let checkpoint_path = delta_path_for_version(1, "checkpoint.parquet");
    store.put(&checkpoint_path, buffer.into()).await?;
    // Commit 1 at the same version as the checkpoint, carrying the ICT.
    let expected_ict = 1587968586200i64;
    let commit1_data = [create_commit_info(1587968586200, Some(expected_ict))];
    commit(table_root, store.as_ref(), 1, commit1_data.to_vec()).await;
    let snapshot = Snapshot::builder_for(table_root)
        .at_version(1)
        .build(&engine)?;
    // The ICT from commit 1 must be returned despite the co-located checkpoint.
    let timestamp = snapshot.get_in_commit_timestamp(&engine)?;
    assert_eq!(timestamp, Some(expected_ict));
    Ok(())
}
#[rstest]
#[case::ict_disabled(false)]
#[case::ict_enabled(true)]
fn test_get_timestamp_returns_valid_timestamp(#[case] ict_enabled: bool) -> DeltaResult<()> {
    // End-to-end: create a real table on local disk (optionally with ICT
    // enabled), then check that `get_timestamp` returns a recent timestamp
    // and, when ICT is on, that it equals the in-commit timestamp.
    let temp_dir = tempfile::tempdir().unwrap();
    let table_path = Url::from_directory_path(temp_dir.path())
        .unwrap()
        .to_string();
    let store = Arc::new(LocalFileSystem::new());
    let engine = DefaultEngineBuilder::new(store).build();
    let schema = Arc::new(StructType::try_new(vec![StructField::new(
        "id",
        DataType::INTEGER,
        true,
    )])?);
    let mut create_table_builder = create_table(&table_path, schema, "Test/1.0");
    if ict_enabled {
        create_table_builder = create_table_builder
            .with_table_properties(vec![(ENABLE_IN_COMMIT_TIMESTAMPS, "true")]);
    }
    let _ = create_table_builder
        .build(&engine, Box::new(FileSystemCommitter::new()))?
        .commit(&engine)?;
    let snapshot = Snapshot::builder_for(&table_path).build(&engine)?;
    let ts = snapshot.get_timestamp(&engine)?;
    // The commit just happened, so its timestamp should fall within a
    // generous 2-day window of "now" (wide enough to tolerate clock skew).
    let now_ms = chrono::Utc::now().timestamp_millis();
    let two_days_ms = 2 * 24 * 60 * 60 * 1000_i64;
    assert!(
        (now_ms - two_days_ms..=now_ms).contains(&ts),
        "timestamp {ts} not within 2 days of now ({now_ms})"
    );
    if ict_enabled {
        // With ICT on, the snapshot timestamp must be the in-commit timestamp.
        let ict_ts = snapshot.get_in_commit_timestamp(&engine)?.unwrap();
        assert_eq!(ts, ict_ts);
    }
    Ok(())
}
#[rstest]
#[case::ict_enabled(true)]
#[case::ict_disabled(false)]
#[tokio::test]
async fn test_get_timestamp_errors_when_commit_file_missing(
    #[case] ict_enabled: bool,
) -> DeltaResult<()> {
    // Regardless of whether ICT is enabled, `get_timestamp` must error when
    // the snapshot's log segment is missing its last commit file.
    let url = Url::parse("memory:///")?;
    let store = Arc::new(InMemory::new());
    let engine = DefaultEngineBuilder::new(store.clone()).build();
    // ICT-related table properties and protocol bits only when enabled.
    let ict_config = ict_enabled.then(|| ("0".to_string(), "1612345678".to_string()));
    let reader_version = ict_enabled.then_some(TABLE_FEATURES_MIN_READER_VERSION as u32);
    let mut commit_data = vec![];
    if ict_enabled {
        // With ICT enabled the commit needs a commitInfo carrying the ICT.
        commit_data.push(create_commit_info(1677811175819, Some(1677811175999)));
    }
    commit_data.extend([
        create_protocol(ict_enabled, reader_version),
        create_metadata(
            Some("test_id"),
            Some("{\"type\":\"struct\",\"fields\":[]}"),
            Some(1677811175819),
            ict_config,
            false,
        ),
    ]);
    commit(url.as_str(), store.as_ref(), 0, commit_data).await;
    let snapshot = Snapshot::builder_for(url.as_str())
        .at_version(0)
        .build(&engine)?;
    // Build a snapshot whose log segment lacks the commit file for its version.
    let snapshot_no_commit = create_snapshot_with_commit_file_absent_from_log_segment(
        &url,
        snapshot.table_configuration().clone(),
    )?;
    let result = snapshot_no_commit.get_timestamp(&engine);
    assert_result_error_with_message(result, "Last commit file not found in log segment");
    Ok(())
}
#[tokio::test]
async fn test_get_timestamp_errors_when_ict_missing_from_commit_info() -> DeltaResult<()> {
    // ICT is enabled, but commit 0's commitInfo lacks an inCommitTimestamp
    // field: `get_timestamp` must error rather than silently fall back.
    let store = Arc::new(InMemory::new());
    let table_root = "memory:///test_table/";
    let engine = DefaultEngineBuilder::new(store.clone()).build();
    let actions = vec![
        // commitInfo without an in-commit timestamp.
        create_commit_info(1677811175819, None),
        create_protocol(true, Some(TABLE_FEATURES_MIN_READER_VERSION as u32)),
        create_metadata(
            Some("test_id"),
            Some("{\"type\":\"struct\",\"fields\":[]}"),
            Some(1677811175819),
            Some(("0".to_string(), "1612345678".to_string())),
            false,
        ),
    ];
    commit(table_root, store.as_ref(), 0, actions).await;
    let snapshot = Snapshot::builder_for(table_root).build(&engine)?;
    assert_result_error_with_message(
        snapshot.get_timestamp(&engine),
        "In-Commit Timestamp not found in commit file",
    );
    Ok(())
}
#[test]
fn test_context_macro_works_in_unit_test() {
    // Smoke test that the `test_context!` macro is usable from a unit test:
    // a log with 3 commits yields a latest snapshot at version 2 (0-indexed).
    let (_engine, snap, _table) = test_utils::test_context!(
        LogState::with_commits(3),
        FeatureSet::empty(),
        VersionTarget::Latest
    );
    assert_eq!(snap.version(), 2);
}
#[test]
fn test_try_new_from_empty_log_tail() -> DeltaResult<()> {
    // With an empty log tail and no target version, `try_new_from_impl`
    // produces a snapshot equal to the base snapshot.
    let table = TestTableBuilder::new().build().unwrap();
    let engine = DefaultEngineBuilder::new(table.store().clone()).build();
    let base = Snapshot::builder_for(table.table_root())
        .at_version(0)
        .build(&engine)?;
    let rebuilt =
        Snapshot::try_new_from_impl(base.clone(), vec![], &engine, None, MetricId::default())?;
    assert_eq!(rebuilt, base);
    Ok(())
}
#[tokio::test]
async fn test_try_new_from_latest_commit_preservation() -> DeltaResult<()> {
    // When `try_new_from_impl` is given a log tail containing a newer commit,
    // the resulting snapshot's `latest_commit_file` must advance to that
    // commit rather than staying at the base snapshot's version.
    let store = Arc::new(InMemory::new());
    let url = Url::parse("memory:///")?;
    let engine = DefaultEngineBuilder::new(store.clone()).build();
    // Commit 0: protocol + metadata; commits 1 and 2: commitInfo only.
    let base_commit = vec![
        json!({"protocol": {"minReaderVersion": 1, "minWriterVersion": 2}}),
        json!({
            "metaData": {
                "id": "test-id",
                "format": {"provider": "parquet", "options": {}},
                "schemaString": "{\"type\":\"struct\",\"fields\":[]}",
                "partitionColumns": [],
                "configuration": {},
                "createdTime": 1587968585495i64
            }
        }),
    ];
    commit(url.as_str(), store.as_ref(), 0, base_commit.clone()).await;
    commit(
        url.as_str(),
        store.as_ref(),
        1,
        vec![json!({"commitInfo": {"timestamp": 1234}})],
    )
    .await;
    commit(
        url.as_str(),
        store.as_ref(),
        2,
        vec![json!({"commitInfo": {"timestamp": 5678}})],
    )
    .await;
    // Base snapshot pinned at version 1; its latest commit file is v1.
    let base_snapshot = Snapshot::builder_for(url.as_str())
        .at_version(1)
        .build(&engine)?;
    assert_eq!(
        base_snapshot
            .log_segment
            .listed
            .latest_commit_file
            .as_ref()
            .map(|f| f.version),
        Some(1)
    );
    // Hand-construct a log-tail entry pointing at the version-2 commit file.
    let commit_2_url = url.join("_delta_log/")?.join("00000000000000000002.json")?;
    let file_meta = crate::FileMeta {
        location: commit_2_url,
        last_modified: 1234567890,
        size: 100,
    };
    let parsed_path = ParsedLogPath::try_from(file_meta)?
        .ok_or_else(|| Error::Generic("Failed to parse log path".to_string()))?;
    let log_tail = vec![parsed_path];
    let new_snapshot = Snapshot::try_new_from_impl(
        base_snapshot.clone(),
        log_tail,
        &engine,
        Some(2),
        MetricId::default(),
    )?;
    // The incremental snapshot's latest commit file advanced to v2.
    assert_eq!(
        new_snapshot
            .log_segment
            .listed
            .latest_commit_file
            .as_ref()
            .map(|f| f.version),
        Some(2)
    );
    Ok(())
}
#[tokio::test]
async fn test_try_new_from_version_boundary_cases() -> DeltaResult<()> {
    // Boundary behavior of `try_new_from_impl` relative to the base version:
    //  - requesting the base's own version returns the same Arc (no rebuild);
    //  - requesting an older version is an error.
    let store = Arc::new(InMemory::new());
    let table_root = "memory:///test_table/";
    let engine = DefaultEngineBuilder::new(store.clone()).build();
    let base_commit = vec![
        json!({"protocol": {"minReaderVersion": 1, "minWriterVersion": 2}}),
        json!({
            "metaData": {
                "id": "test-id",
                "format": {"provider": "parquet", "options": {}},
                "schemaString": "{\"type\":\"struct\",\"fields\":[]}",
                "partitionColumns": [],
                "configuration": {},
                "createdTime": 1587968585495i64
            }
        }),
    ];
    commit(table_root, store.as_ref(), 0, base_commit).await;
    commit(
        table_root,
        store.as_ref(),
        1,
        vec![json!({"commitInfo": {"timestamp": 1234}})],
    )
    .await;
    let base_snapshot = Snapshot::builder_for(table_root)
        .at_version(1)
        .build(&engine)?;
    // Same version: the base snapshot Arc is returned untouched.
    let same_version = Snapshot::try_new_from_impl(
        base_snapshot.clone(),
        vec![],
        &engine,
        Some(1),
        MetricId::default(),
    )?;
    assert!(Arc::ptr_eq(&same_version, &base_snapshot));
    // Older version: rejected — time travel backwards from a hint is invalid.
    let older_version = Snapshot::try_new_from_impl(
        base_snapshot.clone(),
        vec![],
        &engine,
        Some(0),
        MetricId::default(),
    );
    assert!(matches!(
        older_version,
        Err(Error::Generic(msg)) if msg.contains("older than snapshot hint version")
    ));
    Ok(())
}
#[test]
fn test_new_post_commit_simple() {
    // `new_post_commit` advances both the snapshot version and the log
    // segment's end version to the freshly committed version.
    let table_dir =
        std::fs::canonicalize(PathBuf::from("./tests/data/basic_partitioned/")).unwrap();
    let table_url = url::Url::from_directory_path(table_dir).unwrap();
    let engine = SyncEngine::new();
    let base = Snapshot::builder_for(table_url.clone()).build(&engine).unwrap();
    let next_version = base.version() + 1;
    let new_commit = ParsedLogPath::create_parsed_published_commit(&table_url, next_version);
    let post_commit = base
        .new_post_commit(new_commit, CrcDelta::default())
        .unwrap();
    assert_eq!(post_commit.version(), next_version);
    assert_eq!(post_commit.log_segment().end_version, next_version);
}
/// Creates a test table at `table_root` with `num_commits` commits.
///
/// Commit 0 contains protocol + metadata + one add file; each subsequent
/// commit `i` (for `1..num_commits`) adds one more file. Note `num_commits`
/// counts commits, so the highest resulting version is `num_commits - 1`.
async fn setup_test_table_with_commits(
    table_root: impl AsRef<str>,
    store: &InMemory,
    num_commits: u64,
) -> DeltaResult<()> {
    let commit0 = vec![
        json!({"protocol": {"minReaderVersion": 1, "minWriterVersion": 2}}),
        json!({
            "metaData": {
                "id": "test-id",
                "format": {"provider": "parquet", "options": {}},
                "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}",
                "partitionColumns": [],
                "configuration": {},
                "createdTime": 1587968585495i64
            }
        }),
        json!({"add": {"path": "file1.parquet", "partitionValues": {}, "size": 100, "modificationTime": 1000, "dataChange": true}}),
    ];
    commit(table_root.as_ref(), store, 0, commit0).await;
    // Each later commit i adds file{i+1}.parquet with size/mtime scaled by i.
    for i in 1..num_commits {
        let commit_i = vec![json!({
            "add": {
                "path": format!("file{}.parquet", i + 1),
                "partitionValues": {},
                "size": (i + 1) * 100,
                "modificationTime": (i + 1) * 1000,
                "dataChange": true
            }
        })];
        commit(table_root.as_ref(), store, i, commit_i).await;
    }
    Ok(())
}
/// Writes a minimal compacted-log file covering versions `start..=end` into
/// `store` at the conventional compacted-commit path.
async fn write_compaction_file(store: &InMemory, start: u64, end: u64) -> DeltaResult<()> {
    let path = test_utils::compacted_log_path_for_versions(start, end, "json");
    let body = r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#;
    store.put(&path, body.into()).await?;
    Ok(())
}
/// Shared fixtures for the incremental-snapshot tests: an in-memory object
/// store, the table URL it is mounted at, and an engine wired to both.
struct IncrementalSnapshotTestContext {
    // Backing in-memory object store holding the table's log files.
    store: Arc<InMemory>,
    // Root URL of the test table (a `memory:///` location).
    url: Url,
    // Engine configured with a multi-threaded Tokio task executor.
    engine: Arc<DefaultEngine<TokioMultiThreadExecutor>>,
}
/// Creates the store/url/engine trio used by the incremental-snapshot tests.
///
/// Must run inside a Tokio runtime: the engine's executor wraps
/// `tokio::runtime::Handle::current()`.
fn setup_incremental_snapshot_test() -> DeltaResult<IncrementalSnapshotTestContext> {
    let store = Arc::new(InMemory::new());
    let url = Url::parse("memory:///")?;
    let handle = tokio::runtime::Handle::current();
    let engine = Arc::new(
        DefaultEngineBuilder::new(store.clone())
            .with_task_executor(Arc::new(TokioMultiThreadExecutor::new(handle)))
            .build(),
    );
    Ok(IncrementalSnapshotTestContext { store, url, engine })
}
/// Asserts that two snapshots agree on their table configuration and on
/// every compared property of their log segments.
fn compare_snapshots(left: &Snapshot, right: &Snapshot) {
    assert_eq!(left.table_configuration, right.table_configuration);
    let (lhs, rhs) = (&left.log_segment, &right.log_segment);
    assert_eq!(lhs.end_version, rhs.end_version);
    assert_eq!(lhs.checkpoint_version, rhs.checkpoint_version);
    assert_eq!(lhs.log_root, rhs.log_root);
    assert_eq!(lhs.listed, rhs.listed);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_incremental_snapshot_picks_up_checkpoint_written_at_current_version(
) -> DeltaResult<()> {
    // A checkpoint written at the snapshot's own version must be picked up
    // when the snapshot is refreshed via `builder_from`, and the result must
    // match a freshly listed snapshot.
    let ctx = setup_incremental_snapshot_test()?;
    setup_test_table_with_commits(ctx.url.as_str(), &ctx.store, 2).await?;
    let snapshot_v1 = Snapshot::builder_for(ctx.url.as_str())
        .at_version(1)
        .build(ctx.engine.as_ref())?;
    assert_eq!(snapshot_v1.log_segment.checkpoint_version, None);
    // Write a checkpoint at version 1 — the snapshot's current version.
    snapshot_v1.clone().checkpoint(ctx.engine.as_ref())?;
    let fresh = Snapshot::builder_for(ctx.url.as_str()).build(ctx.engine.as_ref())?;
    assert_eq!(fresh.version(), 1);
    assert_eq!(fresh.log_segment.checkpoint_version, Some(1));
    // Incremental refresh of the pre-checkpoint snapshot sees the checkpoint.
    let updated = Snapshot::builder_from(snapshot_v1).build(ctx.engine.as_ref())?;
    assert_eq!(updated.version(), 1);
    assert_eq!(updated.log_segment.checkpoint_version, Some(1));
    compare_snapshots(&updated, &fresh);
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_incremental_snapshot_picks_up_newer_checkpoint_below_current_version(
) -> DeltaResult<()> {
    // A newer checkpoint written at a version *below* the snapshot's current
    // version (v2 < v3) must still be adopted on incremental refresh.
    let ctx = setup_incremental_snapshot_test()?;
    setup_test_table_with_commits(ctx.url.as_str(), &ctx.store, 4).await?;
    // Checkpoint at v1, then build a v3 snapshot that references it.
    Snapshot::builder_for(ctx.url.as_str())
        .at_version(1)
        .build(ctx.engine.as_ref())?
        .checkpoint(ctx.engine.as_ref())?;
    let snapshot_v3 = Snapshot::builder_for(ctx.url.as_str())
        .at_version(3)
        .build(ctx.engine.as_ref())?;
    assert_eq!(snapshot_v3.log_segment.checkpoint_version, Some(1));
    // Now write a newer checkpoint at v2 (still below the snapshot's v3).
    Snapshot::builder_for(ctx.url.as_str())
        .at_version(2)
        .build(ctx.engine.as_ref())?
        .checkpoint(ctx.engine.as_ref())?;
    let fresh = Snapshot::builder_for(ctx.url.as_str()).build(ctx.engine.as_ref())?;
    assert_eq!(fresh.version(), 3);
    assert_eq!(fresh.log_segment.checkpoint_version, Some(2));
    // Incremental refresh adopts the v2 checkpoint and matches a fresh build.
    let updated = Snapshot::builder_from(snapshot_v3).build(ctx.engine.as_ref())?;
    assert_eq!(updated.version(), 3);
    assert_eq!(updated.log_segment.checkpoint_version, Some(2));
    compare_snapshots(&updated, &fresh);
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_explicit_same_version_request_keeps_existing_snapshot_after_checkpoint_write(
) -> DeltaResult<()> {
    // Asking `builder_from` for the snapshot's *own* version is a no-op: the
    // same Arc is returned, even though a newer checkpoint has appeared in
    // the log since the snapshot was built.
    let ctx = setup_incremental_snapshot_test()?;
    setup_test_table_with_commits(ctx.url.as_str(), &ctx.store, 2).await?;
    let snapshot_v1 = Snapshot::builder_for(ctx.url.as_str())
        .at_version(1)
        .build(ctx.engine.as_ref())?;
    assert_eq!(snapshot_v1.log_segment.checkpoint_version, None);
    // Write a checkpoint; a fresh listing would now see it.
    snapshot_v1.clone().checkpoint(ctx.engine.as_ref())?;
    let refreshed = Snapshot::builder_for(ctx.url.as_str()).build(ctx.engine.as_ref())?;
    assert_eq!(refreshed.log_segment.checkpoint_version, Some(1));
    // Pinning to the same version returns the original Arc, whose (stale)
    // view of the log — no checkpoint — is intentionally preserved.
    let pinned = Snapshot::builder_from(snapshot_v1.clone())
        .at_version(1)
        .build(ctx.engine.as_ref())?;
    assert!(Arc::ptr_eq(&pinned, &snapshot_v1));
    assert_eq!(pinned.log_segment.checkpoint_version, None);
    Ok(())
}
#[tokio::test]
async fn test_incremental_snapshot_with_compaction_files() -> DeltaResult<()> {
    // Existing compaction files must be carried over unchanged when a
    // snapshot is incrementally advanced to a newer version.
    let store = Arc::new(InMemory::new());
    let table_root = "memory:///";
    let engine = DefaultEngineBuilder::new(store.clone()).build();
    setup_test_table_with_commits(table_root, &store, 3).await?;
    // Two compacted-log files covering [1,1] and [1,2].
    write_compaction_file(&store, 1, 1).await?;
    write_compaction_file(&store, 1, 2).await?;
    let snapshot_v2 = Snapshot::builder_for(table_root)
        .at_version(2)
        .build(&engine)?;
    assert_eq!(
        snapshot_v2
            .log_segment
            .listed
            .ascending_compaction_files
            .len(),
        2
    );
    // Add commit 3, then incrementally advance the v2 snapshot to v3.
    commit(
        table_root,
        &store,
        3,
        vec![json!({"add": {"path": "file4.parquet", "partitionValues": {}, "size": 400, "modificationTime": 4000, "dataChange": true}})],
    )
    .await;
    let snapshot_v3 = Snapshot::builder_from(snapshot_v2)
        .at_version(3)
        .build(&engine)?;
    assert_eq!(snapshot_v3.version(), 3);
    // Both compaction files are still present after the incremental update.
    assert_eq!(
        snapshot_v3
            .log_segment
            .listed
            .ascending_compaction_files
            .len(),
        2
    );
    Ok(())
}
#[tokio::test]
async fn test_incremental_snapshot_with_new_compaction_files() -> DeltaResult<()> {
    // A compaction file written *after* the base snapshot but covering only
    // versions already inside its log segment ([1,3] here) is not adopted by
    // an incremental update — the original compaction files are kept.
    let store = Arc::new(InMemory::new());
    let table_root = "memory:///";
    let engine = DefaultEngineBuilder::new(store.clone()).build();
    setup_test_table_with_commits(table_root, &store, 4).await?;
    // Two compacted-log files covering [1,2] and [2,2].
    write_compaction_file(&store, 1, 2).await?;
    write_compaction_file(&store, 2, 2).await?;
    let snapshot_v2 = Snapshot::builder_for(table_root)
        .at_version(2)
        .build(&engine)?;
    assert_eq!(
        snapshot_v2
            .log_segment
            .listed
            .ascending_compaction_files
            .len(),
        2
    );
    // New compaction [1,3] appears after the base snapshot was built.
    write_compaction_file(&store, 1, 3).await?;
    let snapshot_v3 = Snapshot::builder_from(snapshot_v2)
        .at_version(3)
        .build(&engine)?;
    assert_eq!(snapshot_v3.version(), 3);
    assert_eq!(
        snapshot_v3
            .log_segment
            .listed
            .ascending_compaction_files
            .len(),
        2
    );
    // Verify exactly which (lo, hi) ranges survived: [1,2] and [2,2] — the
    // later [1,3] file was not picked up by the incremental listing.
    let versions_and_his: Vec<_> = snapshot_v3
        .log_segment
        .listed
        .ascending_compaction_files
        .iter()
        .map(|p| match p.file_type {
            LogPathFileType::CompactedCommit { hi } => (p.version, hi),
            _ => panic!("Expected CompactedCommit"),
        })
        .collect();
    assert_eq!(versions_and_his, vec![(1, 2), (2, 2)]);
    Ok(())
}
#[test]
fn test_get_protocol_derived_properties() {
    // The derived-properties map exposes the protocol's reader/writer
    // versions plus each supported feature as a `delta.feature.*` entry.
    let table_dir =
        std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
    let table_url = url::Url::from_directory_path(table_dir).unwrap();
    let engine = SyncEngine::new();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();
    let props = snapshot.get_protocol_derived_properties();
    let min_reader = props.get("delta.minReaderVersion").unwrap();
    assert_eq!(min_reader, &TABLE_FEATURES_MIN_READER_VERSION.to_string());
    let min_writer = props.get("delta.minWriterVersion").unwrap();
    assert_eq!(min_writer, &TABLE_FEATURES_MIN_WRITER_VERSION.to_string());
    let dv_feature = props.get("delta.feature.deletionVectors").unwrap();
    assert_eq!(dv_feature, "supported");
}
#[tokio::test]
async fn test_metadata_configuration() {
    // Arrange: create commit 0 whose metaData action carries custom configuration entries.
    let storage = Arc::new(InMemory::new());
    let table_root = "memory:///";
    let engine = DefaultEngineBuilder::new(storage.clone()).build();

    let commit_info = json!({"commitInfo": {"timestamp": 123, "operation": "CREATE TABLE"}});
    let protocol = json!({"protocol": {
        "minReaderVersion": 3,
        "minWriterVersion": 7,
        "readerFeatures": [],
        "writerFeatures": []
    }});
    let metadata = json!({"metaData": {
        "id": "test-id",
        "format": {"provider": "parquet", "options": {}},
        "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}}]}",
        "partitionColumns": [],
        "configuration": {
            "io.unitycatalog.tableId": "abc-123",
            "myapp.setting": "value"
        },
        "createdTime": 1234567890
    }});
    commit(table_root, &storage, 0, vec![commit_info, protocol, metadata]).await;

    // Assert: the snapshot surfaces the user-provided configuration verbatim.
    let snapshot = Snapshot::builder_for(table_root).build(&engine).unwrap();
    let config = snapshot.metadata_configuration();
    assert_eq!(
        config.get("io.unitycatalog.tableId"),
        Some(&"abc-123".to_string())
    );
    assert_eq!(config.get("myapp.setting"), Some(&"value".to_string()));
}
#[rstest::rstest]
#[case::no_clustering(None, None, None)]
#[case::clustered_no_column_mapping(
    Some(vec!["region"]),
    None,
    Some(vec![ColumnName::new(["region"])])
)]
#[case::clustered_with_column_mapping(
    Some(vec!["region"]),
    Some("name"),
    Some(vec![ColumnName::new(["region"])])
)]
fn test_get_logical_clustering_columns(
    #[case] clustering_cols: Option<Vec<&str>>,
    #[case] column_mapping_mode: Option<&str>,
    #[case] expected: Option<Vec<ColumnName>>,
) {
    use crate::transaction::create_table::create_table;
    use crate::transaction::data_layout::DataLayout;

    // Arrange: two-column schema — a required id plus a nullable clustering candidate.
    let storage = Arc::new(InMemory::new());
    let engine = DefaultEngineBuilder::new(storage).build();
    let fields = vec![
        crate::schema::StructField::new("id", crate::schema::DataType::INTEGER, false),
        crate::schema::StructField::new("region", crate::schema::DataType::STRING, true),
    ];
    let schema = Arc::new(crate::schema::StructType::try_new(fields).unwrap());

    // Create the table, optionally clustered and/or with column mapping enabled per case.
    let mut builder = create_table("memory:///", schema, "test");
    if let Some(cols) = &clustering_cols {
        builder = builder.with_data_layout(DataLayout::clustered(cols.clone()));
    }
    if let Some(mode) = column_mapping_mode {
        builder = builder.with_table_properties([("delta.columnMapping.mode", mode)]);
    }
    let committer = Box::new(crate::committer::FileSystemCommitter::new());
    let _ = builder
        .build(&engine, committer)
        .unwrap()
        .commit(&engine)
        .unwrap();

    // Assert: the snapshot reports the logical clustering columns expected for this case.
    let snapshot = Snapshot::builder_for("memory:///").build(&engine).unwrap();
    let result = snapshot.get_logical_clustering_columns(&engine).unwrap();
    assert_eq!(result, expected);
}
}