1use std::collections::{HashMap, HashSet};
5use std::sync::Arc;
6
7use delta_kernel_derive::internal_api;
8use tracing::{debug, info, instrument, warn};
9use url::Url;
10
11use crate::action_reconciliation::calculate_transaction_expiration_timestamp;
12use crate::actions::set_transaction::{is_set_txn_expired, SetTransactionScanner};
13use crate::actions::{DomainMetadata, INTERNAL_DOMAIN_PREFIX};
14use crate::checkpoint::{
15 CheckpointSpec, CheckpointWriter, V2CheckpointConfig, DEFAULT_FILE_ACTIONS_PER_SIDECAR_HINT,
16};
17use crate::clustering::{parse_clustering_columns, CLUSTERING_DOMAIN_NAME};
18use crate::committer::{Committer, PublishMetadata};
19use crate::crc::{
20 read_crc_file_or_none, try_write_crc_file, Crc, CrcDelta, DomainMetadataState, FileStats,
21 SetTransactionState,
22};
23use crate::expressions::ColumnName;
24use crate::incremental_scan::IncrementalScanBuilder;
25use crate::log_segment::{DomainMetadataMap, LogSegment};
26use crate::metrics::events::{DOMAIN_METADATA_LOADED_SPAN, SET_TRANSACTION_LOADED_SPAN};
27use crate::metrics::MetricId;
28use crate::path::ParsedLogPath;
29use crate::scan::ScanBuilder;
30use crate::schema::SchemaRef;
31use crate::table_configuration::{InCommitTimestampEnablement, TableConfiguration};
32use crate::table_features::{physical_to_logical_column_name, ColumnMappingMode, TableFeature};
33use crate::table_properties::TableProperties;
34use crate::transaction::builder::alter_table::AlterTableTransactionBuilder;
35use crate::transaction::Transaction;
36use crate::utils::require;
37use crate::{DeltaResult, Engine, Error, LogCompactionWriter, Version};
38
39mod builder;
40mod incremental;
41pub use builder::SnapshotBuilder;
42
43pub type SnapshotRef = Arc<Snapshot>;
45
/// Outcome reported by [`Snapshot::write_checksum`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChecksumWriteResult {
    /// A CRC file for the snapshot version was already listed in the log segment, or the write
    /// failed with `FileAlreadyExists` (another writer created it first).
    AlreadyExists,
    /// The CRC file was written by this call.
    Written,
}
55
/// Outcome reported by [`Snapshot::checkpoint`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckpointWriteResult {
    /// The log segment already had a checkpoint at the snapshot version, or the write failed
    /// with `FileAlreadyExists`.
    AlreadyExists,
    /// The checkpoint was written by this call.
    Written,
}
64
65pub struct Snapshot {
71 span: tracing::Span,
72 log_segment: LogSegment,
73 table_configuration: TableConfiguration,
74 crc: Option<Arc<Crc>>,
78}
79
80impl PartialEq for Snapshot {
81 fn eq(&self, other: &Self) -> bool {
82 self.log_segment == other.log_segment
83 && self.table_configuration == other.table_configuration
84 }
85}
86
87impl Eq for Snapshot {}
88
89impl Drop for Snapshot {
90 fn drop(&mut self) {
91 debug!("Dropping snapshot");
92 }
93}
94
95impl std::fmt::Debug for Snapshot {
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 f.debug_struct("Snapshot")
98 .field("path", &self.log_segment.log_root.as_str())
99 .field("version", &self.version())
100 .field("metadata", &self.table_configuration().metadata())
101 .field("log_segment", &self.log_segment)
102 .finish()
103 }
104}
105
106impl Snapshot {
107 pub fn builder_for(table_root: impl AsRef<str>) -> SnapshotBuilder {
115 SnapshotBuilder::new_for(table_root)
116 }
117
118 pub fn builder_from(existing_snapshot: SnapshotRef) -> SnapshotBuilder {
123 SnapshotBuilder::new_from(existing_snapshot)
124 }
125
126 #[internal_api]
132 #[allow(unused)]
133 pub(crate) fn new(
134 log_segment: LogSegment,
135 table_configuration: TableConfiguration,
136 ) -> DeltaResult<Self> {
137 Self::new_with_crc(log_segment, table_configuration, None)
138 }
139
140 pub(crate) fn new_with_crc(
145 log_segment: LogSegment,
146 table_configuration: TableConfiguration,
147 crc: Option<Arc<Crc>>,
148 ) -> DeltaResult<Self> {
149 if let Some(crc) = crc.as_ref() {
150 require!(
151 crc.version == table_configuration.version(),
152 Error::internal_error(format!(
153 "CRC version {} does not match snapshot version {}",
154 crc.version,
155 table_configuration.version()
156 ))
157 );
158 }
159 let span = tracing::info_span!(
160 parent: tracing::Span::none(),
161 "snap",
162 path = %table_configuration.table_root(),
163 version = table_configuration.version(),
164 );
165 info!(parent: &span, "Created snapshot");
166 Ok(Self {
167 span,
168 log_segment,
169 table_configuration,
170 crc,
171 })
172 }
173
174 #[instrument(err, fields(version, operation_id = %operation_id), skip(engine))]
177 fn try_new_from_log_segment(
178 location: Url,
179 log_segment: LogSegment,
180 engine: &dyn Engine,
181 operation_id: MetricId,
182 ) -> DeltaResult<Self> {
183 let read_crc = log_segment
184 .listed
185 .latest_crc_file
186 .as_ref()
187 .and_then(|crc_file| read_crc_file_or_none(engine, crc_file));
188
189 let (metadata, protocol) =
190 log_segment.read_protocol_metadata(engine, read_crc.as_ref(), operation_id)?;
191
192 let table_configuration =
193 TableConfiguration::try_new(metadata, protocol, location, log_segment.end_version)?;
194
195 tracing::Span::current().record("version", table_configuration.version());
196
197 let crc = read_crc.filter(|c| c.version == log_segment.end_version);
198 Self::new_with_crc(log_segment, table_configuration, crc)
199 }
200
201 pub(crate) fn new_post_commit(
213 &self,
214 commit: ParsedLogPath,
215 crc_delta: CrcDelta,
216 ) -> DeltaResult<Self> {
217 require!(
218 commit.is_commit(),
219 Error::internal_error(format!(
220 "Cannot create post-commit Snapshot. Log file is not a commit file. \
221 Path: {}, Type: {:?}.",
222 commit.location.location, commit.file_type
223 ))
224 );
225 let read_version = self.version();
226 let new_version = commit.version;
227 require!(
228 new_version == read_version.wrapping_add(1),
229 Error::internal_error(format!(
230 "Cannot create post-commit Snapshot. Log file version ({new_version}) does not \
231 equal Snapshot version ({read_version}) + 1."
232 ))
233 );
234
235 let new_table_configuration = TableConfiguration::new_post_commit(
236 self.table_configuration(),
237 new_version,
238 crc_delta.metadata.clone(),
239 crc_delta.protocol.clone(),
240 )?;
241
242 let new_log_segment = self.log_segment.new_with_commit_appended(commit)?;
243
244 let new_crc = self
245 .crc
246 .as_deref()
247 .map(|base| Arc::new(base.clone().apply(crc_delta, new_version)));
248
249 Snapshot::new_with_crc(new_log_segment, new_table_configuration, new_crc)
250 }
251
252 #[internal_api]
258 pub(crate) fn log_segment(&self) -> &LogSegment {
259 &self.log_segment
260 }
261
262 #[internal_api]
267 pub(crate) fn crc(&self) -> Option<&Arc<Crc>> {
268 self.crc.as_ref()
269 }
270
271 pub fn table_root(&self) -> &Url {
272 self.table_configuration.table_root()
273 }
274
275 pub fn version(&self) -> Version {
277 self.table_configuration().version()
278 }
279
280 pub fn schema(&self) -> SchemaRef {
284 self.table_configuration.logical_schema()
285 }
286
287 pub fn estimated_owned_heap_size_bytes(&self) -> usize {
307 self.log_segment.listed.estimated_heap_size_bytes()
308 + self.log_segment.log_root.as_str().len()
309 + self
310 .table_configuration()
311 .metadata()
312 .schema_string()
313 .capacity()
314 }
315
316 pub fn table_properties(&self) -> &TableProperties {
318 self.table_configuration().table_properties()
319 }
320
321 #[allow(unused)]
328 #[internal_api]
329 pub(crate) fn get_protocol_derived_properties(&self) -> HashMap<String, String> {
330 let protocol = self.table_configuration().protocol();
331
332 let mut properties = HashMap::from([
333 (
334 "delta.minReaderVersion".into(),
335 protocol.min_reader_version().to_string(),
336 ),
337 (
338 "delta.minWriterVersion".into(),
339 protocol.min_writer_version().to_string(),
340 ),
341 ]);
342
343 let features = protocol
344 .reader_features()
345 .into_iter()
346 .flatten()
347 .chain(protocol.writer_features().into_iter().flatten());
348
349 for feature in features {
350 properties
351 .entry(format!("delta.feature.{}", feature.as_ref()))
352 .or_insert_with(|| "supported".to_string());
353 }
354
355 properties
356 }
357
358 #[allow(unused)]
364 #[internal_api]
365 pub(crate) fn metadata_configuration(&self) -> &HashMap<String, String> {
366 self.table_configuration().metadata().configuration()
367 }
368
369 #[internal_api]
371 pub(crate) fn table_configuration(&self) -> &TableConfiguration {
372 &self.table_configuration
373 }
374
375 #[instrument(
383 parent = &self.span,
384 name = SET_TRANSACTION_LOADED_SPAN,
385 skip_all,
386 err,
387 fields(report, from_cache, found)
388 )]
389 pub fn get_app_id_version(
390 &self,
391 application_id: &str,
392 engine: &dyn Engine,
393 ) -> DeltaResult<Option<i64>> {
394 fn record_metric(from_cache: bool, found: bool) {
395 let span = tracing::Span::current();
396 span.record("from_cache", from_cache);
397 span.record("found", found);
398 }
399
400 let expiration_timestamp =
401 calculate_transaction_expiration_timestamp(self.table_properties())?;
402
403 if let Some(crc) = self.crc.as_deref() {
405 match &crc.set_transaction_state {
406 SetTransactionState::Complete(map) => {
407 let version = map
409 .get(application_id)
410 .filter(|txn| !is_set_txn_expired(expiration_timestamp, txn.last_updated))
411 .map(|txn| txn.version);
412 record_metric(true, version.is_some());
413 return Ok(version);
414 }
415 SetTransactionState::Partial(map) => {
416 if let Some(txn) = map.get(application_id) {
418 let version = (!is_set_txn_expired(expiration_timestamp, txn.last_updated))
419 .then_some(txn.version);
420 record_metric(true, version.is_some());
421 return Ok(version);
422 }
423 }
424 }
425 }
426
427 let txn = SetTransactionScanner::get_one(
429 self.log_segment(),
430 application_id,
431 engine,
432 expiration_timestamp,
433 )?;
434 record_metric(false, txn.is_some());
435 Ok(txn.map(|t| t.version))
436 }
437
438 pub fn get_domain_metadata(
443 &self,
444 domain: &str,
445 engine: &dyn Engine,
446 ) -> DeltaResult<Option<String>> {
447 if domain.starts_with(INTERNAL_DOMAIN_PREFIX) {
448 return Err(Error::generic(
449 "User DomainMetadata are not allowed to use system-controlled 'delta.*' domain",
450 ));
451 }
452
453 self.get_domain_metadata_internal(domain, engine)
454 }
455
456 #[allow(unused)]
475 #[internal_api]
476 pub(crate) fn get_logical_clustering_columns(
477 &self,
478 engine: &dyn Engine,
479 ) -> DeltaResult<Option<Vec<ColumnName>>> {
480 let physical_columns = match self.get_physical_clustering_columns(engine)? {
481 Some(cols) => cols,
482 None => return Ok(None),
483 };
484 let column_mapping_mode = self.table_configuration.column_mapping_mode();
485 if column_mapping_mode == ColumnMappingMode::None {
486 return Ok(Some(physical_columns));
488 }
489 let logical_schema = self.table_configuration.logical_schema();
491 let logical_columns = physical_columns
492 .iter()
493 .map(|physical_col| {
494 physical_to_logical_column_name(&logical_schema, physical_col, column_mapping_mode)
495 })
496 .collect::<DeltaResult<Vec<_>>>()?;
497 Ok(Some(logical_columns))
498 }
499
500 #[internal_api]
509 pub(crate) fn get_physical_clustering_columns(
510 &self,
511 engine: &dyn Engine,
512 ) -> DeltaResult<Option<Vec<ColumnName>>> {
513 if !self
514 .table_configuration
515 .protocol()
516 .has_table_feature(&TableFeature::ClusteredTable)
517 {
518 return Ok(None);
519 }
520 match self.get_domain_metadata_internal(CLUSTERING_DOMAIN_NAME, engine)? {
521 Some(config) => Ok(Some(parse_clustering_columns(&config)?)),
522 None => Ok(None),
523 }
524 }
525
526 #[instrument(
532 parent = &self.span,
533 name = DOMAIN_METADATA_LOADED_SPAN,
534 skip_all,
535 err,
536 fields(report, from_cache, num_domains_returned)
537 )]
538 #[internal_api]
539 pub(crate) fn get_domain_metadatas_internal(
540 &self,
541 engine: &dyn Engine,
542 domains: Option<&HashSet<&str>>,
543 ) -> DeltaResult<DomainMetadataMap> {
544 fn record_metric(from_cache: bool, num_domains_returned: usize) {
545 let span = tracing::Span::current();
546 span.record("from_cache", from_cache);
547 span.record("num_domains_returned", num_domains_returned as u64);
548 }
549
550 if let Some(crc) = self.crc.as_deref() {
552 match &crc.domain_metadata_state {
553 DomainMetadataState::Complete(map) => {
554 let hits: DomainMetadataMap = match domains {
555 None => map.clone(),
556 Some(filter) => filter
560 .iter()
561 .filter_map(|&k| map.get(k).map(|v| (k.to_string(), v.clone())))
562 .collect(),
563 };
564 record_metric(true, hits.len());
565 return Ok(hits);
566 }
567 DomainMetadataState::Partial(map) => {
568 if let Some(filter) = domains {
569 let hits: Option<DomainMetadataMap> = filter
576 .iter()
577 .map(|&k| map.get(k).map(|v| (k.to_string(), v.clone())))
578 .collect();
579 if let Some(hits) = hits {
580 record_metric(true, hits.len());
581 return Ok(hits);
582 }
583 }
584 }
585 }
586 }
587 let replayed = self.log_segment().scan_domain_metadatas(domains, engine)?;
593 record_metric(false, replayed.len());
594 Ok(replayed)
595 }
596
597 #[allow(unused)]
604 #[internal_api]
605 pub(crate) fn get_domain_metadata_internal(
606 &self,
607 domain: &str,
608 engine: &dyn Engine,
609 ) -> DeltaResult<Option<String>> {
610 let mut map = self.get_domain_metadatas_internal(engine, Some(&HashSet::from([domain])))?;
611 Ok(map.remove(domain).map(|dm| dm.configuration().to_owned()))
612 }
613
614 #[allow(unused)]
618 #[internal_api]
619 pub(crate) fn get_all_domain_metadata(
620 &self,
621 engine: &dyn Engine,
622 ) -> DeltaResult<Vec<DomainMetadata>> {
623 let all_metadata = self.get_domain_metadatas_internal(engine, None)?;
624 Ok(all_metadata
625 .into_values()
626 .filter(|domain| !domain.is_internal())
627 .collect())
628 }
629
630 pub fn get_file_stats_if_present(&self) -> Option<FileStats> {
633 self.crc.as_ref().and_then(|crc| crc.file_stats().cloned())
634 }
635
636 #[instrument(parent = &self.span, name = "snap.get_ict", skip_all, err)]
646 #[internal_api]
647 pub(crate) fn get_in_commit_timestamp(&self, engine: &dyn Engine) -> DeltaResult<Option<i64>> {
648 let enablement = self
650 .table_configuration()
651 .in_commit_timestamp_enablement()?;
652
653 if matches!(enablement, InCommitTimestampEnablement::NotEnabled) {
655 return Ok(None);
656 }
657
658 if let InCommitTimestampEnablement::Enabled {
661 enablement: Some((enablement_version, _)),
662 } = enablement
663 {
664 if self.version() < enablement_version {
665 return Err(Error::generic(format!(
666 "Invalid state: snapshot at version {} has ICT enablement version {} in the future",
667 self.version(),
668 enablement_version
669 )));
670 }
671 }
672
673 if let Some(crc) = self.crc.as_deref() {
675 match crc.in_commit_timestamp_opt {
676 Some(ict) => return Ok(Some(ict)),
677 None => {
678 return Err(Error::generic(format!(
679 "In-Commit Timestamp not found in CRC file at version {}",
680 self.version()
681 )));
682 }
683 }
684 }
685
686 match &self.log_segment.listed.latest_commit_file {
688 Some(commit_file_meta) => {
689 let ict = commit_file_meta.read_in_commit_timestamp(engine)?;
690 Ok(Some(ict))
691 }
692 None => Err(Error::generic("Last commit file not found in log segment")),
693 }
694 }
695
696 #[allow(unused)]
708 #[instrument(parent = &self.span, name = "snap.get_ts", skip_all, err)]
709 pub fn get_timestamp(&self, engine: &dyn Engine) -> DeltaResult<i64> {
710 match self
711 .table_configuration()
712 .in_commit_timestamp_enablement()?
713 {
714 InCommitTimestampEnablement::NotEnabled => {
715 match &self.log_segment.listed.latest_commit_file {
716 Some(commit_file_meta) => {
717 let ts = commit_file_meta.location.last_modified;
718 Ok(ts)
719 }
720 None => Err(Error::generic(format!(
721 "Last commit file not found in log segment for version {} \
722 (ICT disabled): cannot read filesystem modification timestamp",
723 self.version()
724 ))),
725 }
726 }
727 InCommitTimestampEnablement::Enabled { .. } => self
728 .get_in_commit_timestamp(engine)
729 .map_err(|e| {
730 Error::generic(format!(
731 "Unable to read in-commit timestamp for version {}: {e}",
732 self.version()
733 ))
734 })?
735 .ok_or_else(|| {
736 Error::internal_error(format!(
737 "Invalid state: version {}, ICT is enabled \
738 but get_in_commit_timestamp returned None",
739 self.version()
740 ))
741 }),
742 }
743 }
744
745 pub fn scan_builder(self: Arc<Self>) -> ScanBuilder {
751 ScanBuilder::new(self)
752 }
753
754 pub fn incremental_scan_builder(
759 self: Arc<Self>,
760 base_version: Version,
761 ) -> IncrementalScanBuilder {
762 IncrementalScanBuilder::new(self, base_version)
763 }
764
765 pub fn transaction(
770 self: Arc<Self>,
771 committer: Box<dyn Committer>,
772 engine: &dyn Engine,
773 ) -> DeltaResult<Transaction> {
774 Transaction::try_new_existing_table(self, committer, engine)
775 }
776
777 pub fn alter_table(self: Arc<Self>) -> AlterTableTransactionBuilder {
785 AlterTableTransactionBuilder::new(self)
786 }
787
788 pub fn create_checkpoint_writer(self: Arc<Self>) -> DeltaResult<CheckpointWriter> {
793 CheckpointWriter::try_new(self)
794 }
795
796 pub fn log_compaction_writer(
810 self: Arc<Self>,
811 start_version: Version,
812 end_version: Version,
813 ) -> DeltaResult<LogCompactionWriter> {
814 LogCompactionWriter::try_new(self, start_version, end_version)
815 }
816
817 #[instrument(parent = &self.span, name = "snap.write_checksum", skip_all, err)]
845 pub fn write_checksum(
846 self: &SnapshotRef,
847 engine: &dyn Engine,
848 ) -> DeltaResult<(ChecksumWriteResult, SnapshotRef)> {
849 let has_crc_on_disk = self
850 .log_segment
851 .listed
852 .latest_crc_file
853 .as_ref()
854 .is_some_and(|f| f.version == self.version());
855
856 if has_crc_on_disk {
857 info!(
858 "CRC file already exists on disk at version {}",
859 self.version()
860 );
861 return Ok((ChecksumWriteResult::AlreadyExists, Arc::clone(self)));
862 }
863
864 let crc = self.crc.as_deref().ok_or_else(|| {
865 Error::ChecksumWriteUnsupported(
866 "No in-memory CRC available at this snapshot version.".to_string(),
867 )
868 })?;
869
870 let crc_path = ParsedLogPath::new_crc(self.table_root(), self.version())?;
871
872 match try_write_crc_file(engine, &crc_path.location, crc) {
873 Ok(()) => {
874 info!("Wrote CRC file at {}", crc_path.location);
875 let new_log_segment = self.log_segment.try_new_with_crc_file(crc_path)?;
876 let new_snapshot = Arc::new(Snapshot::new_with_crc(
877 new_log_segment,
878 self.table_configuration().clone(),
879 self.crc.clone(),
880 )?);
881 Ok((ChecksumWriteResult::Written, new_snapshot))
882 }
883 Err(Error::FileAlreadyExists(_)) => {
884 info!(
885 "Another writer beat us to writing CRC file at {}",
886 crc_path.location
887 );
888 Ok((ChecksumWriteResult::AlreadyExists, Arc::clone(self)))
889 }
890 Err(e) => Err(e),
891 }
892 }
893
894 #[instrument(parent = &self.span, name = "snap.checkpoint", skip_all, err)]
933 pub fn checkpoint(
934 self: &SnapshotRef,
935 engine: &dyn Engine,
936 spec: Option<&CheckpointSpec>,
937 ) -> DeltaResult<(CheckpointWriteResult, SnapshotRef)> {
938 if self.log_segment.checkpoint_version == Some(self.log_segment.end_version) {
939 info!(
940 "Checkpoint already exists for snapshot version {}",
941 self.version()
942 );
943 return Ok((CheckpointWriteResult::AlreadyExists, Arc::clone(self)));
944 }
945 info!(
946 "Writing checkpoint for snapshot version {} with spec {:?}",
947 self.version(),
948 spec
949 );
950
951 let v2_supported = self
952 .table_configuration()
953 .is_feature_supported(&TableFeature::V2Checkpoint);
954 match spec {
955 Some(CheckpointSpec::V2(cfg)) => {
956 if !v2_supported {
957 return Err(Error::checkpoint_write(
958 "CheckpointSpec::V2 requires the v2Checkpoint table feature to be supported",
959 ));
960 }
961 if let V2CheckpointConfig::WithSidecar {
962 file_actions_per_sidecar_hint: Some(0),
963 } = cfg
964 {
965 return Err(Error::checkpoint_write(
966 "file_actions_per_sidecar_hint must be greater than 0",
967 ));
968 }
969 }
970 Some(CheckpointSpec::V1) if v2_supported => {
971 return Err(Error::unsupported(
974 "Kernel does not support writing V1 checkpoints when the table supports v2Checkpoint",
975 ));
976 }
977 _ => {}
978 }
979
980 let writer = Arc::clone(self).create_checkpoint_writer()?;
981
982 let write_result = match spec {
983 Some(CheckpointSpec::V2(V2CheckpointConfig::WithSidecar {
984 file_actions_per_sidecar_hint,
985 })) => {
986 let hint =
987 file_actions_per_sidecar_hint.unwrap_or(DEFAULT_FILE_ACTIONS_PER_SIDECAR_HINT);
988 writer.write_v2_checkpoint_with_sidecars(engine, hint)
989 }
990 _ => writer.write_checkpoint_without_sidecars(engine),
991 };
992
993 let info = match write_result {
994 Ok(info) => info,
995 Err(Error::FileAlreadyExists(_)) => {
996 warn!(
999 "ParquetHandler::write_parquet_file unexpectedly failed on \
1000 FileAlreadyExists for version {}",
1001 self.version()
1002 );
1003 return Ok((CheckpointWriteResult::AlreadyExists, Arc::clone(self)));
1004 }
1005 Err(e) => return Err(e),
1006 };
1007
1008 writer.finalize(engine, &info.last_checkpoint_stats)?;
1009
1010 let checkpoint_log_path = ParsedLogPath::try_from(info.file_meta)?.ok_or_else(|| {
1011 Error::internal_error("Checkpoint path could not be parsed as a log path")
1012 })?;
1013 let new_log_segment = self
1014 .log_segment
1015 .try_new_with_checkpoint(checkpoint_log_path)?;
1016 Ok((
1017 CheckpointWriteResult::Written,
1018 Arc::new(Snapshot::new_with_crc(
1019 new_log_segment,
1020 self.table_configuration().clone(),
1021 self.crc.clone(),
1022 )?),
1023 ))
1024 }
1025
1026 #[instrument(parent = &self.span, name = "snap.publish", skip_all, err)]
1047 pub fn publish(
1048 self: &SnapshotRef,
1049 engine: &dyn Engine,
1050 committer: &dyn Committer,
1051 ) -> DeltaResult<SnapshotRef> {
1052 let unpublished_catalog_commits = self.log_segment().get_unpublished_catalog_commits()?;
1053
1054 if unpublished_catalog_commits.is_empty() {
1055 return Ok(Arc::clone(self));
1056 }
1057
1058 require!(
1059 unpublished_catalog_commits
1060 .windows(2)
1061 .all(|commits| commits[0].version() + 1 == commits[1].version()),
1062 Error::generic(format!(
1063 "Expected ordered and contiguous unpublished catalog commits. \
1064 Got: {unpublished_catalog_commits:?}"
1065 ))
1066 );
1067
1068 require!(
1069 self.table_configuration().is_catalog_managed(),
1070 Error::generic(
1071 "There are catalog commits that need publishing, but the table is not catalog-managed.",
1072 )
1073 );
1074
1075 require!(
1076 committer.is_catalog_committer(),
1077 Error::generic(
1078 "There are catalog commits that need publishing, but the committer is not a catalog committer.",
1079 )
1080 );
1081
1082 let publish_metadata =
1083 PublishMetadata::try_new(self.version(), unpublished_catalog_commits)?;
1084
1085 committer.publish(engine, publish_metadata)?;
1086
1087 Ok(Arc::new(Snapshot::new_with_crc(
1088 self.log_segment().new_as_published()?,
1089 self.table_configuration().clone(),
1090 self.crc.clone(),
1091 )?))
1092 }
1093}
1094
/// Test helper: serializes `commit` as newline-delimited JSON and adds it to `store` as the
/// commit for `version` of the table at `table_root`. Panics if the commit cannot be added.
#[cfg(test)]
async fn commit(
    table_root: impl AsRef<str>,
    store: &crate::object_store::memory::InMemory,
    version: Version,
    commit: Vec<serde_json::Value>,
) {
    let action_lines: Vec<String> = commit.iter().map(|action| action.to_string()).collect();
    let commit_data = action_lines.join("\n");
    let added = test_utils::add_commit(table_root, store, version, commit_data).await;
    added.unwrap();
}
1112
1113#[cfg(test)]
1114mod tests {
1115 use std::path::PathBuf;
1116 use std::sync::Arc;
1117
1118 use rstest::rstest;
1119 use serde_json::json;
1120 use test_utils::table_builder::{
1121 checkpoint_json_stats, unpartitioned, FeatureSet, LogState, TestTableBuilder, VersionTarget,
1122 };
1123 use test_utils::{add_commit, delta_path_for_version};
1124
1125 use super::{commit, *};
1126 use crate::actions::{DomainMetadata, Protocol};
1127 use crate::arrow::array::StringArray;
1128 use crate::arrow::record_batch::RecordBatch;
1129 use crate::committer::FileSystemCommitter;
1130 use crate::engine::arrow_data::ArrowEngineData;
1131 use crate::engine::sync::SyncEngine;
1132 use crate::last_checkpoint_hint::LastCheckpointHint;
1133 use crate::log_segment::LogSegment;
1134 use crate::log_segment_files::LogSegmentFiles;
1135 use crate::object_store::memory::InMemory;
1136 use crate::object_store::path::Path;
1137 use crate::object_store::ObjectStoreExt as _;
1138 use crate::parquet::arrow::ArrowWriter;
1139 use crate::path::ParsedLogPath;
1140 use crate::schema::{DataType, StructField, StructType};
1141 use crate::table_features::{
1142 TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION,
1143 };
1144 use crate::table_properties::ENABLE_IN_COMMIT_TIMESTAMPS;
1145 use crate::transaction::create_table::create_table;
1146 use crate::utils::test_utils::{assert_result_error_with_message, string_array_to_engine_data};
1147
1148 fn create_commit_info(timestamp: i64, ict: Option<i64>) -> serde_json::Value {
1150 let mut commit_info = json!({
1151 "timestamp": timestamp,
1152 "operation": "WRITE",
1153 });
1154
1155 if let Some(ict_value) = ict {
1156 commit_info["inCommitTimestamp"] = json!(ict_value);
1157 }
1158
1159 json!({
1160 "commitInfo": commit_info
1161 })
1162 }
1163
1164 fn create_protocol(ict_enabled: bool, min_reader_version: Option<u32>) -> serde_json::Value {
1165 let reader_version = min_reader_version.unwrap_or(1);
1166
1167 if ict_enabled {
1168 let mut protocol = json!({
1169 "protocol": {
1170 "minReaderVersion": reader_version,
1171 "minWriterVersion": TABLE_FEATURES_MIN_WRITER_VERSION,
1172 "writerFeatures": ["inCommitTimestamp"]
1173 }
1174 });
1175
1176 if reader_version >= TABLE_FEATURES_MIN_READER_VERSION as u32 {
1178 protocol["protocol"]["readerFeatures"] = json!([]);
1179 }
1180
1181 protocol
1182 } else {
1183 json!({
1184 "protocol": {
1185 "minReaderVersion": reader_version,
1186 "minWriterVersion": 2
1187 }
1188 })
1189 }
1190 }
1191
1192 fn create_metadata(
1193 id: Option<&str>,
1194 schema_string: Option<&str>,
1195 created_time: Option<u64>,
1196 ict_config: Option<(String, String)>,
1197 ict_enabled_but_missing_version: bool,
1198 ) -> serde_json::Value {
1199 let config = if ict_enabled_but_missing_version {
1200 json!({
1202 "delta.enableInCommitTimestamps": "true"
1203 })
1204 } else if let Some((enablement_version, enablement_timestamp)) = ict_config {
1205 json!({
1206 "delta.enableInCommitTimestamps": "true",
1207 "delta.inCommitTimestampEnablementVersion": enablement_version,
1208 "delta.inCommitTimestampEnablementTimestamp": enablement_timestamp
1209 })
1210 } else {
1211 json!({})
1212 };
1213
1214 json!({
1215 "metaData": {
1216 "id": id.unwrap_or("testId"),
1217 "format": {"provider": "parquet", "options": {}},
1218 "schemaString": schema_string.unwrap_or("{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}"),
1219 "partitionColumns": [],
1220 "configuration": config,
1221 "createdTime": created_time.unwrap_or(1587968586154u64)
1222 }
1223 })
1224 }
1225
1226 fn create_basic_commit(ict_enabled: bool, ict_config: Option<(String, String)>) -> String {
1227 let protocol = create_protocol(ict_enabled, None);
1228 let metadata = create_metadata(None, None, None, ict_config, false);
1229 format!("{protocol}\n{metadata}")
1230 }
1231
1232 fn create_snapshot_with_commit_file_absent_from_log_segment(
1233 url: &Url,
1234 table_cfg: TableConfiguration,
1235 ) -> DeltaResult<Snapshot> {
1236 let checkpoint_parts = vec![ParsedLogPath::try_from(crate::FileMeta {
1239 location: url.join("_delta_log/00000000000000000000.checkpoint.parquet")?,
1240 last_modified: 0,
1241 size: 100,
1242 })?
1243 .unwrap()];
1244
1245 let listed_files = LogSegmentFiles {
1246 checkpoint_parts,
1247 ..Default::default()
1248 };
1249
1250 let log_segment =
1251 LogSegment::try_new(listed_files, url.join("_delta_log/")?, Some(0), None)?;
1252
1253 Snapshot::new_with_crc(log_segment, table_cfg, None)
1254 }
1255
1256 #[test]
1257 fn test_snapshot_read_metadata() {
1258 let path =
1259 std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
1260 let url = url::Url::from_directory_path(path).unwrap();
1261
1262 let engine = SyncEngine::new();
1263 let snapshot = Snapshot::builder_for(url)
1264 .at_version(1)
1265 .build(&engine)
1266 .unwrap();
1267
1268 let expected = Protocol::try_new_modern(["deletionVectors"], ["deletionVectors"]).unwrap();
1269 assert_eq!(snapshot.table_configuration().protocol(), &expected);
1270
1271 let schema_string = r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#;
1272 let expected: SchemaRef = serde_json::from_str(schema_string).unwrap();
1273 assert_eq!(snapshot.schema(), expected);
1274 }
1275
1276 #[test]
1277 fn test_new_snapshot() {
1278 let path =
1279 std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
1280 let url = url::Url::from_directory_path(path).unwrap();
1281
1282 let engine = SyncEngine::new();
1283 let snapshot = Snapshot::builder_for(url).build(&engine).unwrap();
1284
1285 let expected = Protocol::try_new_modern(["deletionVectors"], ["deletionVectors"]).unwrap();
1286 assert_eq!(snapshot.table_configuration().protocol(), &expected);
1287
1288 let schema_string = r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#;
1289 let expected: SchemaRef = serde_json::from_str(schema_string).unwrap();
1290 assert_eq!(snapshot.schema(), expected);
1291 }
1292
1293 #[test]
1294 fn test_read_table_with_missing_last_checkpoint() {
1295 let path = std::fs::canonicalize(PathBuf::from(
1297 "./tests/data/table-with-dv-small/_delta_log/",
1298 ))
1299 .unwrap();
1300 let url = url::Url::from_directory_path(path).unwrap();
1301
1302 let engine = SyncEngine::new();
1303 let storage = engine.storage_handler();
1304 let cp = LastCheckpointHint::try_read(storage.as_ref(), &url).unwrap();
1305 assert!(cp.is_none());
1306 }
1307
1308 fn valid_last_checkpoint() -> (Vec<u8>, LastCheckpointHint) {
1309 let checkpoint = LastCheckpointHint {
1310 version: 1,
1311 size: 8,
1312 parts: None,
1313 size_in_bytes: Some(21857),
1314 num_of_add_files: None,
1315 checkpoint_schema: None,
1316 checksum: None,
1317 tags: None,
1318 };
1319 let data = checkpoint.to_json_bytes();
1320 (data, checkpoint)
1321 }
1322
1323 fn valid_last_checkpoint_with_tags() -> (Vec<u8>, LastCheckpointHint) {
1324 use std::collections::HashMap;
1325
1326 let (_, base_checkpoint) = valid_last_checkpoint();
1327
1328 let mut tags = HashMap::new();
1329 tags.insert(
1330 "author".to_string(),
1331 "test_read_table_with_last_checkpoint".to_string(),
1332 );
1333 tags.insert("environment".to_string(), "snapshot_tests".to_string());
1334 tags.insert("created_by".to_string(), "delta-kernel-rs".to_string());
1335
1336 let checkpoint = LastCheckpointHint {
1337 tags: Some(tags),
1338 ..base_checkpoint
1339 };
1340
1341 let data = checkpoint.to_json_bytes();
1342 (data, checkpoint)
1343 }
1344
1345 #[tokio::test]
1346 async fn test_read_table_with_empty_last_checkpoint() {
1347 let store = Arc::new(InMemory::new());
1349
1350 let empty = "{}".as_bytes().to_vec();
1352 let invalid_path = Path::from("invalid/_last_checkpoint");
1353
1354 store
1355 .put(&invalid_path, empty.into())
1356 .await
1357 .expect("put _last_checkpoint");
1358
1359 let engine = SyncEngine::new_with_store(store);
1360 let storage = engine.storage_handler();
1361 let url = Url::parse("memory:///invalid/").expect("valid url");
1362 let invalid =
1363 LastCheckpointHint::try_read(storage.as_ref(), &url).expect("read last checkpoint");
1364 assert!(invalid.is_none())
1365 }
1366
1367 #[tokio::test]
1368 async fn test_read_table_with_last_checkpoint() {
1369 let store = Arc::new(InMemory::new());
1371
1372 let (data, expected) = valid_last_checkpoint();
1374 let (data_with_tags, expected_with_tags) = valid_last_checkpoint_with_tags();
1375 let test_cases = vec![
1376 ("valid", data, Some(expected)),
1377 ("invalid", "invalid".as_bytes().to_vec(), None),
1378 ("valid_with_tags", data_with_tags, Some(expected_with_tags)),
1379 ];
1380
1381 for (path_prefix, data, _) in &test_cases {
1383 let path = Path::from(format!("{path_prefix}/_last_checkpoint"));
1384 store
1385 .put(&path, data.clone().into())
1386 .await
1387 .expect("put _last_checkpoint");
1388 }
1389
1390 let engine = SyncEngine::new_with_store(store);
1391 let storage = engine.storage_handler();
1392
1393 for (path_prefix, _, expected_result) in test_cases {
1396 let url = Url::parse(&format!("memory:///{path_prefix}/")).expect("valid url");
1397 let result =
1398 LastCheckpointHint::try_read(storage.as_ref(), &url).expect("read last checkpoint");
1399 assert_eq!(result, expected_result);
1400 }
1401 }
1402
1403 #[test_log::test]
1404 fn test_read_table_with_checkpoint() {
1405 let path = std::fs::canonicalize(PathBuf::from(
1406 "./tests/data/with_checkpoint_no_last_checkpoint/",
1407 ))
1408 .unwrap();
1409 let location = url::Url::from_directory_path(path).unwrap();
1410 let engine = SyncEngine::new();
1411 let snapshot = Snapshot::builder_for(location).build(&engine).unwrap();
1412
1413 assert_eq!(snapshot.log_segment.listed.checkpoint_parts.len(), 1);
1414 assert_eq!(
1415 ParsedLogPath::try_from(
1416 snapshot.log_segment.listed.checkpoint_parts[0]
1417 .location
1418 .clone()
1419 )
1420 .unwrap()
1421 .unwrap()
1422 .version,
1423 2,
1424 );
1425 assert_eq!(snapshot.log_segment.listed.ascending_commit_files.len(), 1);
1426 assert_eq!(
1427 ParsedLogPath::try_from(
1428 snapshot.log_segment.listed.ascending_commit_files[0]
1429 .location
1430 .clone()
1431 )
1432 .unwrap()
1433 .unwrap()
1434 .version,
1435 3,
1436 );
1437 }
1438
1439 #[tokio::test]
1440 async fn test_domain_metadata() -> DeltaResult<()> {
1441 let table_root = "memory:///test_table/";
1442 let store = Arc::new(InMemory::new());
1443 let engine = SyncEngine::new_with_store(store.clone());
1444
1445 let commit = [
1449 json!({
1450 "protocol": {
1451 "minReaderVersion": 1,
1452 "minWriterVersion": 1
1453 }
1454 }),
1455 json!({
1456 "metaData": {
1457 "id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9",
1458 "format": { "provider": "parquet", "options": {} },
1459 "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}",
1460 "partitionColumns": [],
1461 "configuration": {},
1462 "createdTime": 1587968585495i64
1463 }
1464 }),
1465 json!({
1466 "domainMetadata": {
1467 "domain": "domain1",
1468 "configuration": "domain1_commit0",
1469 "removed": false
1470 }
1471 }),
1472 json!({
1473 "domainMetadata": {
1474 "domain": "domain2",
1475 "configuration": "domain2_commit0",
1476 "removed": false
1477 }
1478 }),
1479 json!({
1480 "domainMetadata": {
1481 "domain": "domain3",
1482 "configuration": "domain3_commit0",
1483 "removed": false
1484 }
1485 }),
1486 ]
1487 .map(|json| json.to_string())
1488 .join("\n");
1489 add_commit(table_root, store.clone().as_ref(), 0, commit)
1490 .await
1491 .unwrap();
1492
1493 let commit = [
1498 json!({
1499 "domainMetadata": {
1500 "domain": "domain1",
1501 "configuration": "domain1_commit1",
1502 "removed": true
1503 }
1504 }),
1505 json!({
1506 "domainMetadata": {
1507 "domain": "domain2",
1508 "configuration": "domain2_commit1",
1509 "removed": false
1510 }
1511 }),
1512 json!({
1513 "domainMetadata": {
1514 "domain": "delta.domain3",
1515 "configuration": "domain3_commit1",
1516 "removed": false
1517 }
1518 }),
1519 ]
1520 .map(|json| json.to_string())
1521 .join("\n");
1522 add_commit(table_root, store.as_ref(), 1, commit)
1523 .await
1524 .unwrap();
1525
1526 let snapshot = Snapshot::builder_for(table_root).build(&engine)?;
1527
1528 assert_eq!(snapshot.get_domain_metadata("domain1", &engine)?, None);
1531 assert_eq!(
1532 snapshot.get_domain_metadata("domain2", &engine)?,
1533 Some("domain2_commit1".to_string())
1534 );
1535 assert_eq!(
1536 snapshot.get_domain_metadata("domain3", &engine)?,
1537 Some("domain3_commit0".to_string())
1538 );
1539 let err = snapshot
1540 .get_domain_metadata("delta.domain3", &engine)
1541 .unwrap_err();
1542 assert!(matches!(err, Error::Generic(msg) if
1543 msg == "User DomainMetadata are not allowed to use system-controlled 'delta.*' domain"));
1544
1545 assert_eq!(
1547 snapshot.get_domain_metadata_internal("delta.domain3", &engine)?,
1548 Some("domain3_commit1".to_string())
1549 );
1550
1551 let mut metadata = snapshot.get_all_domain_metadata(&engine)?;
1553 metadata.sort_by(|a, b| a.domain().cmp(b.domain()));
1554
1555 let mut expected = vec![
1556 DomainMetadata::new("domain2".to_string(), "domain2_commit1".to_string()),
1557 DomainMetadata::new("domain3".to_string(), "domain3_commit0".to_string()),
1558 ];
1559 expected.sort_by(|a, b| a.domain().cmp(b.domain()));
1560
1561 assert_eq!(metadata, expected);
1562
1563 Ok(())
1564 }
1565
/// Checks the compaction file name produced for the range 0..=1 and that the ranges
/// (2, 1) and (1, 1) are rejected as invalid. Currently ignored (see the `ignore` reason).
#[test]
#[ignore = "log compaction disabled (#2337)"]
fn test_log_compaction_writer() {
    let table_dir =
        std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
    let table_url = url::Url::from_directory_path(table_dir).unwrap();

    let engine = SyncEngine::new();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();

    // A valid range yields a writer whose target path ends in the expected file name.
    let writer = snapshot.clone().log_compaction_writer(0, 1).unwrap();
    let compaction_path = writer.compaction_path();
    let expected_filename = "00000000000000000000.00000000000000000001.compacted.json";
    assert!(compaction_path.to_string().ends_with(expected_filename));

    // Both of these ranges must be rejected.
    for (start_version, end_version) in [(2, 1), (1, 1)] {
        let result = snapshot
            .clone()
            .log_compaction_writer(start_version, end_version);
        assert_result_error_with_message(result, "Invalid version range");
    }
}
1592
/// Requesting a log compaction writer fails with a "not currently supported" error.
#[test]
fn test_log_compaction_writer_unsupported() {
    let table_dir =
        std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
    let table_url = url::Url::from_directory_path(table_dir).unwrap();

    let engine = SyncEngine::new();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();

    assert_result_error_with_message(
        snapshot.log_compaction_writer(0, 1),
        "not currently supported",
    );
}
1606
1607 #[tokio::test]
1608 async fn test_timestamp_with_ict_disabled() -> Result<(), Box<dyn std::error::Error>> {
1609 let store = Arc::new(InMemory::new());
1610 let table_root = "memory://test/";
1611 let engine = SyncEngine::new_with_store(store.clone());
1612
1613 let commit0 = create_basic_commit(false, None);
1615 add_commit(table_root, store.as_ref(), 0, commit0).await?;
1616
1617 let snapshot = Snapshot::builder_for(table_root).build(&engine)?;
1618
1619 let result = snapshot.get_in_commit_timestamp(&engine)?;
1621 assert_eq!(result, None);
1622
1623 Ok(())
1624 }
1625
1626 #[tokio::test]
1627 async fn test_timestamp_with_ict_enablement_timeline() -> Result<(), Box<dyn std::error::Error>>
1628 {
1629 let store = Arc::new(InMemory::new());
1630 let table_root = "memory://test/";
1631 let engine = SyncEngine::new_with_store(store.clone());
1632
1633 let commit0 = create_basic_commit(false, None);
1635 add_commit(table_root, store.as_ref(), 0, commit0).await?;
1636
1637 let commit1 =
1639 create_basic_commit(true, Some(("1".to_string(), "1587968586154".to_string())));
1640 add_commit(table_root, store.as_ref(), 1, commit1).await?;
1641
1642 let expected_timestamp = 1587968586200i64;
1644 let commit2 = format!(
1645 r#"{{"commitInfo":{{"timestamp":1587968586154,"inCommitTimestamp":{expected_timestamp},"operation":"WRITE"}}}}"#,
1646 );
1647 add_commit(table_root, store.as_ref(), 2, commit2.to_string()).await?;
1648
1649 let snapshot_v0 = Snapshot::builder_for(table_root)
1651 .at_version(0)
1652 .build(&engine)?;
1653 let result_v0 = snapshot_v0.get_in_commit_timestamp(&engine)?;
1655 assert_eq!(result_v0, None);
1656
1657 let snapshot_v2 = Snapshot::builder_for(table_root)
1659 .at_version(2)
1660 .build(&engine)?;
1661 let result_v2 = snapshot_v2.get_in_commit_timestamp(&engine)?;
1663 assert_eq!(result_v2, Some(expected_timestamp));
1664
1665 Ok(())
1666 }
1667
1668 #[tokio::test]
1669 async fn test_get_timestamp_enablement_version_in_future() -> DeltaResult<()> {
1670 let table_root = "memory:///test_table/";
1672 let store = Arc::new(InMemory::new());
1673 let engine = SyncEngine::new_with_store(store.clone());
1674
1675 let commit_data = [
1676 json!({
1677 "protocol": {
1678 "minReaderVersion": TABLE_FEATURES_MIN_READER_VERSION,
1679 "minWriterVersion": TABLE_FEATURES_MIN_WRITER_VERSION,
1680 "readerFeatures": [],
1681 "writerFeatures": ["inCommitTimestamp"]
1682 }
1683 }),
1684 json!({
1685 "metaData": {
1686 "id": "test_id2",
1687 "format": {"provider": "parquet", "options": {}},
1688 "schemaString": "{\"type\":\"struct\",\"fields\":[]}",
1689 "partitionColumns": [],
1690 "configuration": {
1691 "delta.enableInCommitTimestamps": "true",
1692 "delta.inCommitTimestampEnablementVersion": "5", "delta.inCommitTimestampEnablementTimestamp": "1612345678"
1694 },
1695 "createdTime": 1677811175819u64
1696 }
1697 }),
1698 ];
1699 commit(table_root, store.as_ref(), 0, commit_data.to_vec()).await;
1700
1701 let commit_predates = [create_commit_info(1234567890, None)];
1703 commit(table_root, store.as_ref(), 1, commit_predates.to_vec()).await;
1704
1705 let snapshot_predates = Snapshot::builder_for(table_root)
1706 .at_version(1)
1707 .build(&engine)?;
1708 let result_predates = snapshot_predates.get_in_commit_timestamp(&engine);
1709
1710 assert_result_error_with_message(
1712 result_predates,
1713 "Invalid state: snapshot at version 1 has ICT enablement version 5 in the future",
1714 );
1715
1716 Ok(())
1717 }
1718
1719 #[tokio::test]
1720 async fn test_get_timestamp_missing_ict_when_enabled() -> DeltaResult<()> {
1721 let table_root = "memory:///test_table/";
1723 let store = Arc::new(InMemory::new());
1724 let engine = SyncEngine::new_with_store(store.clone());
1725
1726 let commit_data = [
1727 create_protocol(true, Some(TABLE_FEATURES_MIN_READER_VERSION as u32)),
1728 create_metadata(
1729 Some("test_id"),
1730 Some("{\"type\":\"struct\",\"fields\":[]}"),
1731 Some(1677811175819),
1732 Some(("0".to_string(), "1612345678".to_string())),
1733 false,
1734 ),
1735 ];
1736 commit(table_root, store.as_ref(), 0, commit_data.to_vec()).await; let commit_missing_ict = [create_commit_info(1234567890, None)];
1740 commit(table_root, store.as_ref(), 1, commit_missing_ict.to_vec()).await;
1741
1742 let snapshot_missing = Snapshot::builder_for(table_root)
1743 .at_version(1)
1744 .build(&engine)?;
1745 let result = snapshot_missing.get_in_commit_timestamp(&engine);
1746 assert_result_error_with_message(result, "In-Commit Timestamp not found");
1747
1748 Ok(())
1749 }
1750
1751 #[tokio::test]
1752 async fn test_get_timestamp_fails_when_commit_missing() -> DeltaResult<()> {
1753 let url = Url::parse("memory:///")?;
1757 let store = Arc::new(InMemory::new());
1758 let engine = SyncEngine::new_with_store(store.clone());
1759
1760 let commit_data = [
1762 create_protocol(true, Some(TABLE_FEATURES_MIN_READER_VERSION as u32)),
1763 create_metadata(
1764 Some("test_id"),
1765 Some("{\"type\":\"struct\",\"fields\":[]}"),
1766 Some(1677811175819),
1767 Some(("0".to_string(), "1612345678".to_string())), false,
1769 ),
1770 ];
1771 commit(url.as_str(), store.as_ref(), 0, commit_data.to_vec()).await;
1772
1773 let snapshot = Snapshot::builder_for(url.as_str())
1775 .at_version(0)
1776 .build(&engine)?;
1777
1778 let snapshot_no_commit = create_snapshot_with_commit_file_absent_from_log_segment(
1779 &url,
1780 snapshot.table_configuration().clone(),
1781 )?;
1782
1783 let result = snapshot_no_commit.get_in_commit_timestamp(&engine);
1785 assert_result_error_with_message(result, "Last commit file not found in log segment");
1786
1787 Ok(())
1788 }
1789
1790 #[tokio::test]
1791 async fn test_get_timestamp_with_checkpoint_and_commit_same_version() -> DeltaResult<()> {
1792 let table_root = "memory:///test_table/";
1795 let store = Arc::new(InMemory::new());
1796 let engine = SyncEngine::new_with_store(store.clone());
1797
1798 let commit0_data = [
1800 create_commit_info(1587968586154, None),
1801 create_protocol(true, Some(TABLE_FEATURES_MIN_READER_VERSION as u32)),
1802 create_metadata(
1803 Some("5fba94ed-9794-4965-ba6e-6ee3c0d22af9"),
1804 Some("{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"),
1805 Some(1587968585495),
1806 Some(("0".to_string(), "1587968586154".to_string())),
1807 false,
1808 ),
1809 ];
1810 commit(table_root, store.as_ref(), 0, commit0_data.to_vec()).await;
1811
1812 let checkpoint_data = [
1814 create_commit_info(1587968586154, None),
1815 create_protocol(true, Some(TABLE_FEATURES_MIN_READER_VERSION as u32)),
1816 create_metadata(
1817 Some("5fba94ed-9794-4965-ba6e-6ee3c0d22af9"),
1818 Some("{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"),
1819 Some(1587968585495),
1820 Some(("0".to_string(), "1587968586154".to_string())),
1821 false,
1822 ),
1823 ];
1824
1825 let handler = engine.json_handler();
1826 let json_strings: StringArray = checkpoint_data
1827 .into_iter()
1828 .map(|json| json.to_string())
1829 .collect::<Vec<_>>()
1830 .into();
1831 let parsed = handler.parse_json(
1832 string_array_to_engine_data(json_strings),
1833 crate::actions::get_commit_schema().clone(),
1834 )?;
1835 let checkpoint = ArrowEngineData::try_from_engine_data(parsed)?;
1836 let checkpoint: RecordBatch = checkpoint.into();
1837
1838 let mut buffer = vec![];
1839 let mut writer = ArrowWriter::try_new(&mut buffer, checkpoint.schema(), None)?;
1840 writer.write(&checkpoint)?;
1841 writer.close()?;
1842
1843 let checkpoint_path = delta_path_for_version(1, "checkpoint.parquet");
1844 store.put(&checkpoint_path, buffer.into()).await?;
1845
1846 let expected_ict = 1587968586200i64;
1848 let commit1_data = [create_commit_info(1587968586200, Some(expected_ict))];
1849 commit(table_root, store.as_ref(), 1, commit1_data.to_vec()).await;
1850
1851 let snapshot = Snapshot::builder_for(table_root)
1854 .at_version(1)
1855 .build(&engine)?;
1856
1857 let timestamp = snapshot.get_in_commit_timestamp(&engine)?;
1859 assert_eq!(timestamp, Some(expected_ict));
1860
1861 Ok(())
1862 }
1863
1864 #[rstest]
1865 #[case::ict_disabled(false)]
1866 #[case::ict_enabled(true)]
1867 fn test_get_timestamp_returns_valid_timestamp(#[case] ict_enabled: bool) -> DeltaResult<()> {
1868 let temp_dir = tempfile::tempdir().unwrap();
1869 let table_path = Url::from_directory_path(temp_dir.path())
1870 .unwrap()
1871 .to_string();
1872 let engine = SyncEngine::new();
1873
1874 let schema = Arc::new(StructType::try_new(vec![StructField::new(
1875 "id",
1876 DataType::INTEGER,
1877 true,
1878 )])?);
1879
1880 let mut create_table_builder = create_table(&table_path, schema, "Test/1.0");
1881 if ict_enabled {
1882 create_table_builder = create_table_builder
1883 .with_table_properties(vec![(ENABLE_IN_COMMIT_TIMESTAMPS, "true")]);
1884 }
1885
1886 let _ = create_table_builder
1887 .build(&engine, Box::new(FileSystemCommitter::new()))?
1888 .commit(&engine)?;
1889
1890 let snapshot = Snapshot::builder_for(&table_path).build(&engine)?;
1891 let ts = snapshot.get_timestamp(&engine)?;
1892 let now_ms = chrono::Utc::now().timestamp_millis();
1893 let two_days_ms = 2 * 24 * 60 * 60 * 1000_i64;
1894 assert!(
1895 (now_ms - two_days_ms..=now_ms).contains(&ts),
1896 "timestamp {ts} not within 2 days of now ({now_ms})"
1897 );
1898
1899 if ict_enabled {
1900 let ict_ts = snapshot.get_in_commit_timestamp(&engine)?.unwrap();
1901 assert_eq!(ts, ict_ts);
1902 }
1903 Ok(())
1904 }
1905
1906 #[rstest]
1907 #[case::ict_enabled(true)]
1908 #[case::ict_disabled(false)]
1909 #[tokio::test]
1910 async fn test_get_timestamp_errors_when_commit_file_missing(
1911 #[case] ict_enabled: bool,
1912 ) -> DeltaResult<()> {
1913 let url = Url::parse("memory:///")?;
1914 let store = Arc::new(InMemory::new());
1915 let engine = SyncEngine::new_with_store(store.clone());
1916
1917 let ict_config = ict_enabled.then(|| ("0".to_string(), "1612345678".to_string()));
1923 let reader_version = ict_enabled.then_some(TABLE_FEATURES_MIN_READER_VERSION as u32);
1924
1925 let mut commit_data = vec![];
1926 if ict_enabled {
1928 commit_data.push(create_commit_info(1677811175819, Some(1677811175999)));
1929 }
1930 commit_data.extend([
1931 create_protocol(ict_enabled, reader_version),
1932 create_metadata(
1933 Some("test_id"),
1934 Some("{\"type\":\"struct\",\"fields\":[]}"),
1935 Some(1677811175819),
1936 ict_config,
1937 false,
1938 ),
1939 ]);
1940 commit(url.as_str(), store.as_ref(), 0, commit_data).await;
1941
1942 let snapshot = Snapshot::builder_for(url.as_str())
1943 .at_version(0)
1944 .build(&engine)?;
1945
1946 let snapshot_no_commit = create_snapshot_with_commit_file_absent_from_log_segment(
1947 &url,
1948 snapshot.table_configuration().clone(),
1949 )?;
1950
1951 let result = snapshot_no_commit.get_timestamp(&engine);
1952 assert_result_error_with_message(result, "Last commit file not found in log segment");
1953
1954 Ok(())
1955 }
1956
1957 #[tokio::test]
1958 async fn test_get_timestamp_errors_when_ict_missing_from_commit_info() -> DeltaResult<()> {
1959 let store = Arc::new(InMemory::new());
1962 let table_root = "memory:///test_table/";
1963 let engine = SyncEngine::new_with_store(store.clone());
1964
1965 let commit0_data = vec![
1966 create_commit_info(1677811175819, None), create_protocol(true, Some(TABLE_FEATURES_MIN_READER_VERSION as u32)),
1968 create_metadata(
1969 Some("test_id"),
1970 Some("{\"type\":\"struct\",\"fields\":[]}"),
1971 Some(1677811175819),
1972 Some(("0".to_string(), "1612345678".to_string())), false,
1974 ),
1975 ];
1976 commit(table_root, store.as_ref(), 0, commit0_data).await;
1977
1978 let snapshot = Snapshot::builder_for(table_root).build(&engine)?;
1979 let result = snapshot.get_timestamp(&engine);
1980 assert_result_error_with_message(result, "In-Commit Timestamp not found in commit file");
1981
1982 Ok(())
1983 }
1984
/// Smoke test: `test_utils::test_context!` can be invoked from a unit test in this module
/// and hands back a snapshot at the requested latest version.
#[test]
fn test_context_macro_works_in_unit_test() {
    let (_engine, snapshot, _table) = test_utils::test_context!(
        LogState::with_latest_version(2),
        FeatureSet::empty(),
        unpartitioned(),
        checkpoint_json_stats(),
        VersionTarget::Latest,
        SyncEngine::new_with_store
    );

    let version = snapshot.version();
    assert_eq!(version, 2);
}
2000
/// `new_post_commit` with a commit path for the next version yields a snapshot whose
/// version and log-segment end version both equal that next version.
#[test]
fn test_new_post_commit_simple() {
    let table_dir =
        std::fs::canonicalize(PathBuf::from("./tests/data/basic_partitioned/")).unwrap();
    let table_url = url::Url::from_directory_path(table_dir).unwrap();
    let engine = SyncEngine::new();

    let base_snapshot = Snapshot::builder_for(table_url.clone())
        .build(&engine)
        .unwrap();
    let next_version = base_snapshot.version() + 1;

    // Only the parsed path is constructed here; this test does not write a commit file.
    let new_commit_path = ParsedLogPath::create_parsed_published_commit(&table_url, next_version);
    let post_commit_snapshot = base_snapshot
        .new_post_commit(new_commit_path, CrcDelta::default())
        .unwrap();

    assert_eq!(post_commit_snapshot.version(), next_version);
    assert_eq!(post_commit_snapshot.log_segment().end_version, next_version);
}
2020
/// Checks the reader/writer version and deletion-vector feature entries returned by
/// `get_protocol_derived_properties` for the `table-with-dv-small` test table.
#[test]
fn test_get_protocol_derived_properties() {
    let table_dir =
        std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
    let table_url = url::Url::from_directory_path(table_dir).unwrap();

    let engine = SyncEngine::new();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();
    let props = snapshot.get_protocol_derived_properties();

    // (property key, expected value)
    let expected_props = [
        (
            "delta.minReaderVersion",
            TABLE_FEATURES_MIN_READER_VERSION.to_string(),
        ),
        (
            "delta.minWriterVersion",
            TABLE_FEATURES_MIN_WRITER_VERSION.to_string(),
        ),
        ("delta.feature.deletionVectors", "supported".to_string()),
    ];
    for (key, expected_value) in expected_props {
        assert_eq!(props.get(key).unwrap(), &expected_value);
    }
}
2044
2045 #[tokio::test]
2046 async fn test_metadata_configuration() {
2047 let storage = Arc::new(InMemory::new());
2048 let table_root = "memory:///";
2049 let engine = SyncEngine::new_with_store(storage.clone());
2050
2051 let actions = vec![
2053 json!({"commitInfo": {"timestamp": 123, "operation": "CREATE TABLE"}}),
2054 json!({"protocol": {
2055 "minReaderVersion": 3,
2056 "minWriterVersion": 7,
2057 "readerFeatures": [],
2058 "writerFeatures": []
2059 }}),
2060 json!({"metaData": {
2061 "id": "test-id",
2062 "format": {"provider": "parquet", "options": {}},
2063 "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}}]}",
2064 "partitionColumns": [],
2065 "configuration": {
2066 "io.unitycatalog.tableId": "abc-123",
2067 "myapp.setting": "value"
2068 },
2069 "createdTime": 1234567890
2070 }}),
2071 ];
2072 commit(table_root, &storage, 0, actions).await;
2073
2074 let snapshot = Snapshot::builder_for(table_root).build(&engine).unwrap();
2075 let config = snapshot.metadata_configuration();
2076 assert_eq!(
2077 config.get("io.unitycatalog.tableId"),
2078 Some(&"abc-123".to_string())
2079 );
2080 assert_eq!(config.get("myapp.setting"), Some(&"value".to_string()));
2081 }
2082
2083 #[rstest::rstest]
2084 #[case::no_clustering(None, None, None)]
2085 #[case::clustered_no_column_mapping(
2086 Some(vec!["region"]),
2087 None,
2088 Some(vec![ColumnName::new(["region"])])
2089 )]
2090 #[case::clustered_with_column_mapping(
2091 Some(vec!["region"]),
2092 Some("name"),
2093 Some(vec![ColumnName::new(["region"])])
2094 )]
2095 fn test_get_logical_clustering_columns(
2096 #[case] clustering_cols: Option<Vec<&str>>,
2097 #[case] column_mapping_mode: Option<&str>,
2098 #[case] expected: Option<Vec<ColumnName>>,
2099 ) {
2100 use crate::transaction::create_table::create_table;
2101 use crate::transaction::data_layout::DataLayout;
2102
2103 let storage = Arc::new(InMemory::new());
2104 let engine = SyncEngine::new_with_store(storage);
2105 let schema = Arc::new(
2106 crate::schema::StructType::try_new(vec![
2107 crate::schema::StructField::new("id", crate::schema::DataType::INTEGER, true),
2108 crate::schema::StructField::new("region", crate::schema::DataType::STRING, true),
2109 ])
2110 .unwrap(),
2111 );
2112 let mut builder = create_table("memory:///", schema, "test");
2113 if let Some(cols) = &clustering_cols {
2114 builder = builder.with_data_layout(DataLayout::clustered(cols.clone()));
2115 }
2116 if let Some(mode) = column_mapping_mode {
2117 builder = builder.with_table_properties([("delta.columnMapping.mode", mode)]);
2118 }
2119 let _ = builder
2120 .build(
2121 &engine,
2122 Box::new(crate::committer::FileSystemCommitter::new()),
2123 )
2124 .unwrap()
2125 .commit(&engine)
2126 .unwrap();
2127 let snapshot = Snapshot::builder_for("memory:///").build(&engine).unwrap();
2128 let result = snapshot.get_logical_clustering_columns(&engine).unwrap();
2129 assert_eq!(result, expected);
2130 }
2131
/// Checks that `estimated_owned_heap_size_bytes` is well above `size_of::<Snapshot>()` for
/// a table at version 100, and that adding 100 checkpoint parts or 100 compaction files to
/// the log segment raises the estimate by at least 15_000 bytes each.
#[test]
fn estimated_owned_heap_size_on_table_with_many_log_files() {
    let (_engine, baseline_snap, _table) = test_utils::test_context!(
        LogState::with_latest_version(100),
        FeatureSet::empty(),
        unpartitioned(),
        checkpoint_json_stats(),
        VersionTarget::Latest,
        SyncEngine::new_with_store
    );

    let baseline_heap = baseline_snap.estimated_owned_heap_size_bytes();
    let struct_size = size_of::<Snapshot>();
    assert!(
        baseline_heap > 5 * struct_size,
        "baseline heap {baseline_heap} should exceed 5 * sizeof(Snapshot)={}",
        5 * struct_size
    );

    // Same snapshot with 100 extra multi-part checkpoint entries in the listing.
    let snap_extra_checkpoints = snapshot_with_extra_files(&baseline_snap, |listed, log_root| {
        listed.checkpoint_parts.extend((1..=100u32).map(|i| {
            let filename = format!("00000000000000000099.checkpoint.{i:010}.0000000100.parquet");
            let file_meta = crate::FileMeta {
                location: log_root.join(&filename).unwrap(),
                last_modified: 0,
                size: 100,
            };
            ParsedLogPath::try_from(file_meta).unwrap().unwrap()
        }));
    });
    let delta_checkpoints =
        snap_extra_checkpoints.estimated_owned_heap_size_bytes() - baseline_heap;
    assert!(
        delta_checkpoints >= 15_000,
        "delta_checkpoints {delta_checkpoints} should be >= 15_000 for 100 checkpoint parts"
    );

    // Same snapshot with 100 extra compaction files covering versions `start..=start + 5`.
    let snap_extra_compactions = snapshot_with_extra_files(&baseline_snap, |listed, log_root| {
        listed
            .ascending_compaction_files
            .extend((0..100u64).map(|i| {
                let start = i * 10;
                let end = start + 5;
                let filename = format!("{start:020}.{end:020}.compacted.json");
                let file_meta = crate::FileMeta {
                    location: log_root.join(&filename).unwrap(),
                    last_modified: 0,
                    size: 100,
                };
                ParsedLogPath::try_from(file_meta).unwrap().unwrap()
            }));
    });
    let delta_compactions =
        snap_extra_compactions.estimated_owned_heap_size_bytes() - baseline_heap;
    assert!(
        delta_compactions >= 15_000,
        "delta_compactions {delta_compactions} should be >= 15_000 for 100 compaction files"
    );
}
2208
/// Compares two freshly created tables, one with a 1-field schema and one with a 50-field
/// schema: the growth of the heap estimate must be within 20% of the growth of the schema
/// string's capacity.
#[test]
fn estimated_owned_heap_size_reflects_schema_string() {
    /// Creates an in-memory table with `schema` and returns a snapshot of it.
    fn snap_with_schema(schema: SchemaRef) -> SnapshotRef {
        let engine = SyncEngine::new_with_store(Arc::new(InMemory::new()));
        let transaction = create_table("memory:///", schema, "test")
            .build(&engine, Box::new(FileSystemCommitter::new()))
            .unwrap();
        transaction.commit(&engine).unwrap().unwrap_committed();
        Snapshot::builder_for("memory:///").build(&engine).unwrap()
    }

    /// Capacity of the schema string held in the snapshot's metadata.
    fn schema_string_capacity(snapshot: &SnapshotRef) -> usize {
        snapshot
            .table_configuration()
            .metadata()
            .schema_string()
            .capacity()
    }

    let small_fields = vec![StructField::nullable("a", DataType::INTEGER)];
    let small_schema = Arc::new(StructType::try_new(small_fields).unwrap());

    let wide_fields = (0..50)
        .map(|i| StructField::nullable(format!("field_{i:03}"), DataType::STRING))
        .collect::<Vec<_>>();
    let wide_schema = Arc::new(StructType::try_new(wide_fields).unwrap());

    let snap_small = snap_with_schema(small_schema);
    let snap_wide = snap_with_schema(wide_schema);

    let schema_str_delta = schema_string_capacity(&snap_wide) - schema_string_capacity(&snap_small);
    let heap_delta = snap_wide.estimated_owned_heap_size_bytes()
        - snap_small.estimated_owned_heap_size_bytes();

    let ratio = heap_delta as f64 / schema_str_delta as f64;
    assert!(
        (0.8..=1.2).contains(&ratio),
        "heap_delta {heap_delta} should be within 20% of schema_str_delta {schema_str_delta} (ratio = {ratio:.3})"
    );
}
2260
/// For a table built by a default `TestTableBuilder`, the heap estimate must be non-zero
/// and below 10000 bytes.
#[test]
fn estimated_owned_heap_size_for_version_zero() {
    let table = TestTableBuilder::new().build().unwrap();
    let store = table.store().clone();
    let engine = SyncEngine::new_with_store(store);

    let snapshot = Snapshot::builder_for(table.table_root())
        .build(&engine)
        .unwrap();
    let heap = snapshot.estimated_owned_heap_size_bytes();

    assert!(heap > 0, "heap size should be nonzero");
    assert!(
        heap < 10000,
        "heap size {heap} unexpectedly large for v0 snapshot"
    );
}
2276
2277 fn snapshot_with_extra_files<F>(baseline: &SnapshotRef, mutate: F) -> Snapshot
2278 where
2279 F: FnOnce(&mut LogSegmentFiles, &Url),
2280 {
2281 let mut new_log_segment = baseline.log_segment().clone();
2282 mutate(&mut new_log_segment.listed, &new_log_segment.log_root);
2283 Snapshot::new(new_log_segment, baseline.table_configuration().clone()).unwrap()
2284 }
2285}