Skip to main content

delta_kernel/snapshot/
mod.rs

//! In-memory representation of table snapshots. A snapshot is the state of a table at a given
//! point in time: it has a specific version, a defined schema, and a frozen log segment.
3
4use std::collections::{HashMap, HashSet};
5use std::sync::Arc;
6
7use delta_kernel_derive::internal_api;
8use tracing::{debug, info, instrument, warn};
9use url::Url;
10
11use crate::action_reconciliation::calculate_transaction_expiration_timestamp;
12use crate::actions::set_transaction::{is_set_txn_expired, SetTransactionScanner};
13use crate::actions::{DomainMetadata, INTERNAL_DOMAIN_PREFIX};
14use crate::checkpoint::{
15    CheckpointSpec, CheckpointWriter, V2CheckpointConfig, DEFAULT_FILE_ACTIONS_PER_SIDECAR_HINT,
16};
17use crate::clustering::{parse_clustering_columns, CLUSTERING_DOMAIN_NAME};
18use crate::committer::{Committer, PublishMetadata};
19use crate::crc::{
20    read_crc_file_or_none, try_write_crc_file, Crc, CrcDelta, DomainMetadataState, FileStats,
21    SetTransactionState,
22};
23use crate::expressions::ColumnName;
24use crate::incremental_scan::IncrementalScanBuilder;
25use crate::log_segment::{DomainMetadataMap, LogSegment};
26use crate::metrics::events::{DOMAIN_METADATA_LOADED_SPAN, SET_TRANSACTION_LOADED_SPAN};
27use crate::metrics::MetricId;
28use crate::path::ParsedLogPath;
29use crate::scan::ScanBuilder;
30use crate::schema::SchemaRef;
31use crate::table_configuration::{InCommitTimestampEnablement, TableConfiguration};
32use crate::table_features::{physical_to_logical_column_name, ColumnMappingMode, TableFeature};
33use crate::table_properties::TableProperties;
34use crate::transaction::builder::alter_table::AlterTableTransactionBuilder;
35use crate::transaction::Transaction;
36use crate::utils::require;
37use crate::{DeltaResult, Engine, Error, LogCompactionWriter, Version};
38
39mod builder;
40mod incremental;
41pub use builder::SnapshotBuilder;
42
/// A shared, thread-safe reference to a [`Snapshot`].
///
/// Cloning a `SnapshotRef` only bumps the `Arc` reference count; the underlying [`Snapshot`]
/// is not copied.
pub type SnapshotRef = Arc<Snapshot>;
45
/// Outcome of an attempt to write a version checksum (CRC) file.
///
/// Both variants are non-error outcomes; failures are presumably reported through the
/// surrounding `DeltaResult` of the writing API (NOTE(review): confirm at the call site).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChecksumWriteResult {
    /// A CRC file already exists at this version, so nothing was written. Per the Delta
    /// protocol, writers MUST NOT overwrite existing version checksum files.
    AlreadyExists,
    /// The CRC file was successfully written to storage.
    Written,
}
55
/// Outcome of an attempt to write a checkpoint file.
///
/// Both variants are non-error outcomes; failures are presumably reported through the
/// surrounding `DeltaResult` of the writing API (NOTE(review): confirm at the call site).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckpointWriteResult {
    /// A checkpoint already exists at this version, so nothing was written.
    AlreadyExists,
    /// The checkpoint was successfully written to storage.
    Written,
}
64
// TODO expose methods for accessing the files of a table (with file pruning).
/// In-memory representation of a specific snapshot of a Delta table. A table exists throughout
/// time, while a `Snapshot` is a view of that table at one point in time. It has a defined
/// schema (which may change over time for any given table), a specific version, and a frozen
/// log segment.
pub struct Snapshot {
    /// Root tracing span for this snapshot (`snap`, with `path` and `version` fields). The
    /// snapshot's instrumented operations (e.g. `get_app_id_version`) use it as their parent.
    span: tracing::Span,
    /// The log files (commits, checkpoints, CRC, ...) that make up this snapshot.
    log_segment: LogSegment,
    /// Protocol, metadata, schema, and table properties at this snapshot's version.
    table_configuration: TableConfiguration,
    /// CRC at this snapshot's version, eagerly resolved at construction time. `Some(crc)`
    /// means `crc.version == self.version()` and the CRC can be queried at zero I/O. `None`
    /// means no CRC was loadable (no CRC on disk at this version, or the read failed).
    crc: Option<Arc<Crc>>,
}
79
80impl PartialEq for Snapshot {
81    fn eq(&self, other: &Self) -> bool {
82        self.log_segment == other.log_segment
83            && self.table_configuration == other.table_configuration
84    }
85}
86
87impl Eq for Snapshot {}
88
89impl Drop for Snapshot {
90    fn drop(&mut self) {
91        debug!("Dropping snapshot");
92    }
93}
94
95impl std::fmt::Debug for Snapshot {
96    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97        f.debug_struct("Snapshot")
98            .field("path", &self.log_segment.log_root.as_str())
99            .field("version", &self.version())
100            .field("metadata", &self.table_configuration().metadata())
101            .field("log_segment", &self.log_segment)
102            .finish()
103    }
104}
105
106impl Snapshot {
107    // ============================================================================
108    // Construction entry points
109    // ============================================================================
110
    /// Create a new [`SnapshotBuilder`] that builds a [`Snapshot`] from scratch for the table at
    /// `table_root`. If you already have a [`Snapshot`] and want to do minimal work to bring it
    /// up to date, use [`Snapshot::builder_from`] instead.
    pub fn builder_for(table_root: impl AsRef<str>) -> SnapshotBuilder {
        SnapshotBuilder::new_for(table_root)
    }
117
    /// Create a new [`SnapshotBuilder`] that incrementally updates `existing_snapshot` to a
    /// more recent version.
    ///
    /// See `Snapshot::try_new_from` for the case-by-case behavior.
    // NOTE(review): `try_new_from` is not defined in this part of the file; presumably it lives
    // in the `builder`/`incremental` submodule. Confirm the reference is still current.
    pub fn builder_from(existing_snapshot: SnapshotRef) -> SnapshotBuilder {
        SnapshotBuilder::new_from(existing_snapshot)
    }
125
126    // ============================================================================
127    // Internal constructors
128    // ============================================================================
129
    /// Create a new [`Snapshot`] from a [`LogSegment`] and [`TableConfiguration`] with no
    /// pre-resolved CRC.
    ///
    /// Delegates to [`Snapshot::new_with_crc`] with `crc = None`. With no CRC supplied, that
    /// constructor's version check is skipped, so as currently written this returns `Ok`.
    #[internal_api]
    #[allow(unused)]
    pub(crate) fn new(
        log_segment: LogSegment,
        table_configuration: TableConfiguration,
    ) -> DeltaResult<Self> {
        Self::new_with_crc(log_segment, table_configuration, None)
    }
139
140    /// Internal constructor that accepts an explicit pre-resolved CRC.
141    ///
142    /// A `Some(crc)` must be at the table configuration's version; otherwise this returns an
143    /// internal error.
144    pub(crate) fn new_with_crc(
145        log_segment: LogSegment,
146        table_configuration: TableConfiguration,
147        crc: Option<Arc<Crc>>,
148    ) -> DeltaResult<Self> {
149        if let Some(crc) = crc.as_ref() {
150            require!(
151                crc.version == table_configuration.version(),
152                Error::internal_error(format!(
153                    "CRC version {} does not match snapshot version {}",
154                    crc.version,
155                    table_configuration.version()
156                ))
157            );
158        }
159        let span = tracing::info_span!(
160            parent: tracing::Span::none(),
161            "snap",
162            path = %table_configuration.table_root(),
163            version = table_configuration.version(),
164        );
165        info!(parent: &span, "Created snapshot");
166        Ok(Self {
167            span,
168            log_segment,
169            table_configuration,
170            crc,
171        })
172    }
173
174    /// Create a new [`Snapshot`] from a freshly-listed [`LogSegment`], eagerly resolving the
175    /// CRC at the segment's end version when one is present on disk.
176    #[instrument(err, fields(version, operation_id = %operation_id), skip(engine))]
177    fn try_new_from_log_segment(
178        location: Url,
179        log_segment: LogSegment,
180        engine: &dyn Engine,
181        operation_id: MetricId,
182    ) -> DeltaResult<Self> {
183        let read_crc = log_segment
184            .listed
185            .latest_crc_file
186            .as_ref()
187            .and_then(|crc_file| read_crc_file_or_none(engine, crc_file));
188
189        let (metadata, protocol) =
190            log_segment.read_protocol_metadata(engine, read_crc.as_ref(), operation_id)?;
191
192        let table_configuration =
193            TableConfiguration::try_new(metadata, protocol, location, log_segment.end_version)?;
194
195        tracing::Span::current().record("version", table_configuration.version());
196
197        let crc = read_crc.filter(|c| c.version == log_segment.end_version);
198        Self::new_with_crc(log_segment, table_configuration, crc)
199    }
200
201    /// Creates a new [`Snapshot`] representing the table state immediately after a commit.
202    ///
203    /// Appends the newly committed file to this snapshot's log segment and bumps the version,
204    /// producing a post-commit snapshot without a full log replay from storage.
205    ///
206    /// The `crc_delta` captures the CRC-relevant changes from the committed transaction
207    /// (file stats, domain metadata, ICT, etc.). If this snapshot had a CRC at its version,
208    /// the delta is applied to produce a precomputed in-memory CRC for the new version,
209    /// avoiding re-reading metadata from storage. If no CRC was available, the new snapshot
210    /// has no CRC either. CREATE TABLE handles CRC construction separately in
211    /// `Transaction::into_committed`.
212    pub(crate) fn new_post_commit(
213        &self,
214        commit: ParsedLogPath,
215        crc_delta: CrcDelta,
216    ) -> DeltaResult<Self> {
217        require!(
218            commit.is_commit(),
219            Error::internal_error(format!(
220                "Cannot create post-commit Snapshot. Log file is not a commit file. \
221                Path: {}, Type: {:?}.",
222                commit.location.location, commit.file_type
223            ))
224        );
225        let read_version = self.version();
226        let new_version = commit.version;
227        require!(
228            new_version == read_version.wrapping_add(1),
229            Error::internal_error(format!(
230                "Cannot create post-commit Snapshot. Log file version ({new_version}) does not \
231                equal Snapshot version ({read_version}) + 1."
232            ))
233        );
234
235        let new_table_configuration = TableConfiguration::new_post_commit(
236            self.table_configuration(),
237            new_version,
238            crc_delta.metadata.clone(),
239            crc_delta.protocol.clone(),
240        )?;
241
242        let new_log_segment = self.log_segment.new_with_commit_appended(commit)?;
243
244        let new_crc = self
245            .crc
246            .as_deref()
247            .map(|base| Arc::new(base.clone().apply(crc_delta, new_version)));
248
249        Snapshot::new_with_crc(new_log_segment, new_table_configuration, new_crc)
250    }
251
252    // ============================================================================
253    // Field accessors and state queries
254    // ============================================================================
255
    /// Borrow the [`LogSegment`] (listed commit, checkpoint, and CRC files) this snapshot is
    /// built from.
    #[internal_api]
    pub(crate) fn log_segment(&self) -> &LogSegment {
        &self.log_segment
    }
261
    /// Returns the CRC for this snapshot, if one was resolved at construction.
    ///
    /// When `Some(crc)`, `crc.version == self.version()` (enforced by the constructor), and
    /// queries backed by the CRC are served at zero I/O.
    #[internal_api]
    pub(crate) fn crc(&self) -> Option<&Arc<Crc>> {
        self.crc.as_ref()
    }
270
    /// Root URL of the table this snapshot belongs to, as recorded in its table configuration.
    pub fn table_root(&self) -> &Url {
        self.table_configuration.table_root()
    }
274
    /// Table version this `Snapshot` represents.
    pub fn version(&self) -> Version {
        self.table_configuration().version()
    }
279
    /// Logical table [`Schema`] at this `Snapshot`'s version.
    ///
    /// [`Schema`]: crate::schema::Schema
    pub fn schema(&self) -> SchemaRef {
        self.table_configuration.logical_schema()
    }
286
287    /// Estimated owned heap size in bytes for this snapshot. Best-effort estimate
288    /// for capacity tracking, not authoritative.
289    ///
290    /// Counts only the dominant per-snapshot heap contributors, normally > 70% of the snapshot's
291    /// owned heap size:
292    /// - For every listed log path (commit, compaction, checkpoint, latest CRC, latest commit): the
293    ///   filename / extension / Url string heap.
294    /// - Vec buffer capacity (`capacity * size_of::<ParsedLogPath>()`) for the three Vec fields on
295    ///   `LogSegmentFiles`.
296    /// - The log root Url string.
297    /// - The raw `schemaString` JSON on table metadata.
298    ///
299    /// The Arc-shared variables (e.g. logical/physical schemas, `crc`) are not counted,
300    /// as they can be shared between multiple snapshots and are not owned by a single snapshot.
301    ///
302    /// Other variables' contributions to heap size are relatively small, so they are not counted
303    /// here.
304    ///
305    /// Runs in O(n) over listed log files.
306    pub fn estimated_owned_heap_size_bytes(&self) -> usize {
307        self.log_segment.listed.estimated_heap_size_bytes()
308            + self.log_segment.log_root.as_str().len()
309            + self
310                .table_configuration()
311                .metadata()
312                .schema_string()
313                .capacity()
314    }
315
    /// Get the parsed [`TableProperties`] for this [`Snapshot`], taken from its table
    /// configuration.
    pub fn table_properties(&self) -> &TableProperties {
        self.table_configuration().table_properties()
    }
320
321    /// Returns the protocol-derived table properties as a map of key-value pairs.
322    ///
323    /// This includes:
324    /// - `delta.minReaderVersion` and `delta.minWriterVersion`
325    /// - `delta.feature.<name> = "supported"` for each reader and writer feature (when using table
326    ///   features protocol, i.e. reader version 3 / writer version 7)
327    #[allow(unused)]
328    #[internal_api]
329    pub(crate) fn get_protocol_derived_properties(&self) -> HashMap<String, String> {
330        let protocol = self.table_configuration().protocol();
331
332        let mut properties = HashMap::from([
333            (
334                "delta.minReaderVersion".into(),
335                protocol.min_reader_version().to_string(),
336            ),
337            (
338                "delta.minWriterVersion".into(),
339                protocol.min_writer_version().to_string(),
340            ),
341        ]);
342
343        let features = protocol
344            .reader_features()
345            .into_iter()
346            .flatten()
347            .chain(protocol.writer_features().into_iter().flatten());
348
349        for feature in features {
350            properties
351                .entry(format!("delta.feature.{}", feature.as_ref()))
352                .or_insert_with(|| "supported".to_string());
353        }
354
355        properties
356    }
357
    /// Get the raw metadata configuration for this table.
    ///
    /// This is the `Metadata.configuration` map exactly as stored in the Delta log. It contains
    /// user-defined properties, delta table properties (e.g. `delta.enableInCommitTimestamps`),
    /// and application-specific properties (e.g. `io.unitycatalog.tableId`). It is not parsed;
    /// see [`Snapshot::table_properties`] for the parsed view.
    #[allow(unused)]
    #[internal_api]
    pub(crate) fn metadata_configuration(&self) -> &HashMap<String, String> {
        self.table_configuration().metadata().configuration()
    }
368
    /// Borrow the [`TableConfiguration`] (protocol, metadata, schema, properties, version) for
    /// this [`Snapshot`].
    #[internal_api]
    pub(crate) fn table_configuration(&self) -> &TableConfiguration {
        &self.table_configuration
    }
374
375    /// Fetch the latest version of the provided `application_id` for this snapshot. Filters the
376    /// txn based on the delta.setTransactionRetentionDuration property and lastUpdated.
377    ///
378    /// Uses the CRC fast path when available, otherwise falls back to log replay.
379    ///
380    /// Reports metrics: `SetTransactionLoaded`.
381    // TODO: add a get_app_id_versions to fetch all at once using SetTransactionScanner::get_all
382    #[instrument(
383        parent = &self.span,
384        name = SET_TRANSACTION_LOADED_SPAN,
385        skip_all,
386        err,
387        fields(report, from_cache, found)
388    )]
389    pub fn get_app_id_version(
390        &self,
391        application_id: &str,
392        engine: &dyn Engine,
393    ) -> DeltaResult<Option<i64>> {
394        fn record_metric(from_cache: bool, found: bool) {
395            let span = tracing::Span::current();
396            span.record("from_cache", from_cache);
397            span.record("found", found);
398        }
399
400        let expiration_timestamp =
401            calculate_transaction_expiration_timestamp(self.table_properties())?;
402
403        // Fast path: serve from CRC if available at this version.
404        if let Some(crc) = self.crc.as_deref() {
405            match &crc.set_transaction_state {
406                SetTransactionState::Complete(map) => {
407                    // Complete is authoritative: a miss means the app_id has no transaction.
408                    let version = map
409                        .get(application_id)
410                        .filter(|txn| !is_set_txn_expired(expiration_timestamp, txn.last_updated))
411                        .map(|txn| txn.version);
412                    record_metric(true, version.is_some());
413                    return Ok(version);
414                }
415                SetTransactionState::Partial(map) => {
416                    // Hit is authoritative; miss falls through to log replay below.
417                    if let Some(txn) = map.get(application_id) {
418                        let version = (!is_set_txn_expired(expiration_timestamp, txn.last_updated))
419                            .then_some(txn.version);
420                        record_metric(true, version.is_some());
421                        return Ok(version);
422                    }
423                }
424            }
425        }
426
427        // Fallback: full log replay.
428        let txn = SetTransactionScanner::get_one(
429            self.log_segment(),
430            application_id,
431            engine,
432            expiration_timestamp,
433        )?;
434        record_metric(false, txn.is_some());
435        Ok(txn.map(|t| t.version))
436    }
437
438    /// Fetch the domainMetadata for a specific domain in this snapshot. This returns the latest
439    /// configuration for the domain, or None if the domain does not exist.
440    ///
441    /// Note that this method performs log replay (fetches and processes metadata from storage).
442    pub fn get_domain_metadata(
443        &self,
444        domain: &str,
445        engine: &dyn Engine,
446    ) -> DeltaResult<Option<String>> {
447        if domain.starts_with(INTERNAL_DOMAIN_PREFIX) {
448            return Err(Error::generic(
449                "User DomainMetadata are not allowed to use system-controlled 'delta.*' domain",
450            ));
451        }
452
453        self.get_domain_metadata_internal(domain, engine)
454    }
455
456    /// Get the logical clustering columns for this snapshot, if clustering is enabled.
457    ///
458    /// Returns `Ok(Some(columns))` if the ClusteredTable feature is enabled and clustering
459    /// columns are defined, `Ok(None)` if clustering is not enabled, or an error if the
460    /// clustering metadata is malformed.
461    ///
462    /// The columns are returned as logical [`ColumnName`]s. When column mapping is enabled,
463    /// this converts the physical names stored in domain metadata back to logical names using
464    /// the table schema.
465    ///
466    /// Note that this method performs log replay (fetches and processes metadata from storage).
467    ///
468    /// # Errors
469    ///
470    /// Returns an error if the clustering domain metadata is malformed, or if a physical
471    /// column name cannot be resolved to a logical name in the schema.
472    ///
473    /// [`ColumnName`]: crate::expressions::ColumnName
474    #[allow(unused)]
475    #[internal_api]
476    pub(crate) fn get_logical_clustering_columns(
477        &self,
478        engine: &dyn Engine,
479    ) -> DeltaResult<Option<Vec<ColumnName>>> {
480        let physical_columns = match self.get_physical_clustering_columns(engine)? {
481            Some(cols) => cols,
482            None => return Ok(None),
483        };
484        let column_mapping_mode = self.table_configuration.column_mapping_mode();
485        if column_mapping_mode == ColumnMappingMode::None {
486            // No column mapping: physical = logical
487            return Ok(Some(physical_columns));
488        }
489        // Convert physical column names to logical names by walking the schema
490        let logical_schema = self.table_configuration.logical_schema();
491        let logical_columns = physical_columns
492            .iter()
493            .map(|physical_col| {
494                physical_to_logical_column_name(&logical_schema, physical_col, column_mapping_mode)
495            })
496            .collect::<DeltaResult<Vec<_>>>()?;
497        Ok(Some(logical_columns))
498    }
499
500    /// Get the clustering columns for this snapshot, if the table has clustering enabled.
501    ///
502    /// Returns `Ok(Some(columns))` if the ClusteredTable feature is enabled and clustering
503    /// columns are defined, `Ok(None)` if clustering is not enabled, or an error if the
504    /// clustering metadata is malformed.
505    ///
506    /// The columns are returned as physical column names, respecting the column mapping mode.
507    /// Note that this method performs log replay (fetches and processes metadata from storage).
508    #[internal_api]
509    pub(crate) fn get_physical_clustering_columns(
510        &self,
511        engine: &dyn Engine,
512    ) -> DeltaResult<Option<Vec<ColumnName>>> {
513        if !self
514            .table_configuration
515            .protocol()
516            .has_table_feature(&TableFeature::ClusteredTable)
517        {
518            return Ok(None);
519        }
520        match self.get_domain_metadata_internal(CLUSTERING_DOMAIN_NAME, engine)? {
521            Some(config) => Ok(Some(parse_clustering_columns(&config)?)),
522            None => Ok(None),
523        }
524    }
525
526    /// Load domain metadata: if Complete in the CRC, answer from the cache; else if every
527    /// requested domain is in a Partial cache, also answer from the cache; else full log
528    /// replay. `domains == None` means load all.
529    ///
530    /// Reports metrics: `DomainMetadataLoaded`.
531    #[instrument(
532        parent = &self.span,
533        name = DOMAIN_METADATA_LOADED_SPAN,
534        skip_all,
535        err,
536        fields(report, from_cache, num_domains_returned)
537    )]
538    #[internal_api]
539    pub(crate) fn get_domain_metadatas_internal(
540        &self,
541        engine: &dyn Engine,
542        domains: Option<&HashSet<&str>>,
543    ) -> DeltaResult<DomainMetadataMap> {
544        fn record_metric(from_cache: bool, num_domains_returned: usize) {
545            let span = tracing::Span::current();
546            span.record("from_cache", from_cache);
547            span.record("num_domains_returned", num_domains_returned as u64);
548        }
549
550        // Fast path: serve from CRC if it tracks domain metadata at this version.
551        if let Some(crc) = self.crc.as_deref() {
552            match &crc.domain_metadata_state {
553                DomainMetadataState::Complete(map) => {
554                    let hits: DomainMetadataMap = match domains {
555                        None => map.clone(),
556                        // Look up each filter key. A miss means the domain does not exist
557                        // at this version (Complete is authoritative for misses), so skip
558                        // it and return whatever hits we found.
559                        Some(filter) => filter
560                            .iter()
561                            .filter_map(|&k| map.get(k).map(|v| (k.to_string(), v.clone())))
562                            .collect(),
563                    };
564                    record_metric(true, hits.len());
565                    return Ok(hits);
566                }
567                DomainMetadataState::Partial(map) => {
568                    if let Some(filter) = domains {
569                        // Look up each filter key. A miss means we do not know whether the
570                        // domain exists, so abandon the cache and fall through to log
571                        // replay. `collect::<Option<_>>` short-circuits on the first None.
572                        // TODO(#2572): track tombstoned domains in `Partial` so removals
573                        //              observed during apply can return authoritative
574                        //              `None` instead of falling through.
575                        let hits: Option<DomainMetadataMap> = filter
576                            .iter()
577                            .map(|&k| map.get(k).map(|v| (k.to_string(), v.clone())))
578                            .collect();
579                        if let Some(hits) = hits {
580                            record_metric(true, hits.len());
581                            return Ok(hits);
582                        }
583                    }
584                }
585            }
586        }
587        // Fallback: scan the log_segment from scratch.
588        // TODO: a Partial cache already covers the commits read during snapshot load. A
589        //       miss search could skip that range and only scan the older commits, then
590        //       union those results with the Partial cache's entries to produce the final
591        //       answer.
592        let replayed = self.log_segment().scan_domain_metadatas(domains, engine)?;
593        record_metric(false, replayed.len());
594        Ok(replayed)
595    }
596
597    /// Fetch both user-controlled and system-controlled domain metadata for a specific domain
598    /// in this snapshot.
599    ///
600    /// Returns the latest configuration for the domain, or `None` if the domain does not exist
601    /// (or was removed). Unlike [`Snapshot::get_domain_metadata`], this does not reject `delta.*`
602    /// domains.
603    #[allow(unused)]
604    #[internal_api]
605    pub(crate) fn get_domain_metadata_internal(
606        &self,
607        domain: &str,
608        engine: &dyn Engine,
609    ) -> DeltaResult<Option<String>> {
610        let mut map = self.get_domain_metadatas_internal(engine, Some(&HashSet::from([domain])))?;
611        Ok(map.remove(domain).map(|dm| dm.configuration().to_owned()))
612    }
613
614    /// Fetch all non-internal domain metadata for this snapshot as a `Vec`.
615    ///
616    /// Internal (`delta.*`) domains are filtered out.
617    #[allow(unused)]
618    #[internal_api]
619    pub(crate) fn get_all_domain_metadata(
620        &self,
621        engine: &dyn Engine,
622    ) -> DeltaResult<Vec<DomainMetadata>> {
623        let all_metadata = self.get_domain_metadatas_internal(engine, None)?;
624        Ok(all_metadata
625            .into_values()
626            .filter(|domain| !domain.is_internal())
627            .collect())
628    }
629
630    /// Returns file-level statistics, or `None` if this snapshot has no CRC, or its CRC does
631    /// not have `Complete` file stats. Performs no I/O (the CRC is resolved at construction).
632    pub fn get_file_stats_if_present(&self) -> Option<FileStats> {
633        self.crc.as_ref().and_then(|crc| crc.file_stats().cloned())
634    }
635
636    /// Get the In-Commit Timestamp (ICT) for this snapshot.
637    ///
638    /// Returns the `inCommitTimestamp` from the CommitInfo action of the commit that created this
639    /// snapshot.
640    ///
641    /// # Returns
642    /// - `Ok(Some(timestamp))` - ICT is enabled and available for this version
643    /// - `Ok(None)` - ICT is not enabled
644    /// - `Err(...)` - ICT is enabled but cannot be read, or enablement version is invalid
645    #[instrument(parent = &self.span, name = "snap.get_ict", skip_all, err)]
646    #[internal_api]
647    pub(crate) fn get_in_commit_timestamp(&self, engine: &dyn Engine) -> DeltaResult<Option<i64>> {
648        // Get ICT enablement info and check if we should read ICT for this version
649        let enablement = self
650            .table_configuration()
651            .in_commit_timestamp_enablement()?;
652
653        // Return None if ICT is not enabled at all
654        if matches!(enablement, InCommitTimestampEnablement::NotEnabled) {
655            return Ok(None);
656        }
657
658        // If ICT is enabled with an enablement version, verify the enablement version is not in the
659        // future
660        if let InCommitTimestampEnablement::Enabled {
661            enablement: Some((enablement_version, _)),
662        } = enablement
663        {
664            if self.version() < enablement_version {
665                return Err(Error::generic(format!(
666                    "Invalid state: snapshot at version {} has ICT enablement version {} in the future",
667                    self.version(),
668                    enablement_version
669                )));
670            }
671        }
672
673        // Fast path: serve ICT from CRC if available at this version.
674        if let Some(crc) = self.crc.as_deref() {
675            match crc.in_commit_timestamp_opt {
676                Some(ict) => return Ok(Some(ict)),
677                None => {
678                    return Err(Error::generic(format!(
679                        "In-Commit Timestamp not found in CRC file at version {}",
680                        self.version()
681                    )));
682                }
683            }
684        }
685
686        // Fallback: read the ICT from latest_commit_file
687        match &self.log_segment.listed.latest_commit_file {
688            Some(commit_file_meta) => {
689                let ict = commit_file_meta.read_in_commit_timestamp(engine)?;
690                Ok(Some(ict))
691            }
692            None => Err(Error::generic("Last commit file not found in log segment")),
693        }
694    }
695
696    /// Get the timestamp for this snapshot's version, in milliseconds since the Unix epoch.
697    ///
698    /// When In-Commit Timestamp (ICT) are enabled, returns the In-Commit Timestamp value.
699    /// Otherwise, falls back to the filesystem last-modified time of the latest commit file.
700    ///
701    /// Returns an error if the commit file is missing, the ICT configuration is invalid, or the
702    /// ICT value cannot be read.
703    ///
704    /// See also [`get_in_commit_timestamp`] for ICT-only semantics.
705    ///
706    /// [`get_in_commit_timestamp`]: Self::get_in_commit_timestamp
707    #[allow(unused)]
708    #[instrument(parent = &self.span, name = "snap.get_ts", skip_all, err)]
709    pub fn get_timestamp(&self, engine: &dyn Engine) -> DeltaResult<i64> {
710        match self
711            .table_configuration()
712            .in_commit_timestamp_enablement()?
713        {
714            InCommitTimestampEnablement::NotEnabled => {
715                match &self.log_segment.listed.latest_commit_file {
716                    Some(commit_file_meta) => {
717                        let ts = commit_file_meta.location.last_modified;
718                        Ok(ts)
719                    }
720                    None => Err(Error::generic(format!(
721                        "Last commit file not found in log segment for version {} \
722                         (ICT disabled): cannot read filesystem modification timestamp",
723                        self.version()
724                    ))),
725                }
726            }
727            InCommitTimestampEnablement::Enabled { .. } => self
728                .get_in_commit_timestamp(engine)
729                .map_err(|e| {
730                    Error::generic(format!(
731                        "Unable to read in-commit timestamp for version {}: {e}",
732                        self.version()
733                    ))
734                })?
735                .ok_or_else(|| {
736                    Error::internal_error(format!(
737                        "Invalid state: version {}, ICT is enabled \
738                        but get_in_commit_timestamp returned None",
739                        self.version()
740                    ))
741                }),
742        }
743    }
744
745    // ============================================================================
746    // Downstream operation builders
747    // ============================================================================
748
    /// Create a [`ScanBuilder`] for a `SnapshotRef`.
    ///
    /// Consumes this `Arc` handle and hands it to the new builder; clone the `SnapshotRef`
    /// beforehand if the caller still needs it.
    pub fn scan_builder(self: Arc<Self>) -> ScanBuilder {
        ScanBuilder::new(self)
    }
753
    /// Create an [`IncrementalScanBuilder`] for the range `(base_version, self.version()]`.
    ///
    /// Use this to advance a cached file listing from `base_version` to this snapshot's
    /// version without doing a full scan. See [`IncrementalScanBuilder`] for details.
    ///
    /// # Parameters
    /// - `base_version`: exclusive lower bound of the version range (presumably the version the
    ///   caller's cached listing was taken at). This method does not itself compare it against
    ///   `self.version()`; it is passed straight to the builder.
    pub fn incremental_scan_builder(
        self: Arc<Self>,
        base_version: Version,
    ) -> IncrementalScanBuilder {
        IncrementalScanBuilder::new(self, base_version)
    }
764
    /// Create a [`Transaction`] for this `SnapshotRef`, using the specified [`Committer`].
    ///
    /// Note: For tables with clustering enabled, this performs log replay to read clustering
    /// columns from domain metadata, which may have a performance cost.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while constructing the transaction for this existing
    /// table.
    pub fn transaction(
        self: Arc<Self>,
        committer: Box<dyn Committer>,
        engine: &dyn Engine,
    ) -> DeltaResult<Transaction> {
        Transaction::try_new_existing_table(self, committer, engine)
    }
776
    /// Creates a builder for altering this table's metadata. Currently supports schema change
    /// operations.
    ///
    /// The returned builder allows chaining operations before building an
    /// [`AlterTableTransaction`] that can be committed. This consumes the `Arc` handle; clone
    /// the `SnapshotRef` first if it is still needed.
    ///
    /// [`AlterTableTransaction`]: crate::transaction::AlterTableTransaction
    pub fn alter_table(self: Arc<Self>) -> AlterTableTransactionBuilder {
        AlterTableTransactionBuilder::new(self)
    }
787
    /// Creates a [`CheckpointWriter`] for generating a checkpoint from this snapshot.
    ///
    /// See the [`crate::checkpoint`] module documentation for more details on checkpoint types
    /// and the overall checkpoint process.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while constructing the [`CheckpointWriter`].
    pub fn create_checkpoint_writer(self: Arc<Self>) -> DeltaResult<CheckpointWriter> {
        CheckpointWriter::try_new(self)
    }
795
    /// Creates a [`LogCompactionWriter`] for generating a log compaction file.
    ///
    /// Log compaction aggregates commit files in a version range into a single compacted file,
    /// improving performance by reducing the number of files to process during log replay.
    ///
    /// # Parameters
    /// - `start_version`: The first version to include in the compaction (inclusive)
    /// - `end_version`: The last version to include in the compaction (inclusive)
    ///
    /// # Returns
    /// A [`LogCompactionWriter`] that can be used to generate the compaction file.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while constructing the [`LogCompactionWriter`]; any
    /// validation of the version range happens there, not in this method.
    ///
    /// NOTE: This method is currently a no-op because log compaction is disabled (#2337)
    pub fn log_compaction_writer(
        self: Arc<Self>,
        start_version: Version,
        end_version: Version,
    ) -> DeltaResult<LogCompactionWriter> {
        LogCompactionWriter::try_new(self, start_version, end_version)
    }
816
817    // ============================================================================
818    // Mutations
819    // ============================================================================
820
821    /// Writes a version checksum (CRC) file for this snapshot. Writers should call this after
822    /// every commit because checksums enable faster snapshot loading and table state validation.
823    ///
824    /// Currently only supports writing from a post-commit snapshot that has pre-computed CRC
825    /// information in memory (i.e. the snapshot returned by
826    /// [`CommittedTransaction::post_commit_snapshot`]).
827    ///
828    /// Returns a tuple of [`ChecksumWriteResult`] and a [`SnapshotRef`]. On
829    /// [`ChecksumWriteResult::Written`], the returned snapshot has the CRC file recorded in
830    /// its log segment. On [`ChecksumWriteResult::AlreadyExists`], the original snapshot is
831    /// returned unchanged.
832    ///
833    /// # Errors
834    ///
835    /// - [`Error::ChecksumWriteUnsupported`] if no in-memory CRC is available at this snapshot's
836    ///   version (e.g. a snapshot loaded from disk that has no CRC file), if the CRC's
837    ///   `file_stats_state` is `Indeterminate` (a non-incremental operation like ANALYZE STATS was
838    ///   encountered, or a file action had a missing size; recoverable with a full state
839    ///   reconstruction in the future), or if `delta.enableInCommitTimestamps` is `true` but
840    ///   `inCommitTimestampOpt` is absent.
841    /// - I/O errors from the engine's storage handler if the write fails.
842    ///
843    /// [`CommittedTransaction::post_commit_snapshot`]: crate::transaction::CommittedTransaction::post_commit_snapshot
844    #[instrument(parent = &self.span, name = "snap.write_checksum", skip_all, err)]
845    pub fn write_checksum(
846        self: &SnapshotRef,
847        engine: &dyn Engine,
848    ) -> DeltaResult<(ChecksumWriteResult, SnapshotRef)> {
849        let has_crc_on_disk = self
850            .log_segment
851            .listed
852            .latest_crc_file
853            .as_ref()
854            .is_some_and(|f| f.version == self.version());
855
856        if has_crc_on_disk {
857            info!(
858                "CRC file already exists on disk at version {}",
859                self.version()
860            );
861            return Ok((ChecksumWriteResult::AlreadyExists, Arc::clone(self)));
862        }
863
864        let crc = self.crc.as_deref().ok_or_else(|| {
865            Error::ChecksumWriteUnsupported(
866                "No in-memory CRC available at this snapshot version.".to_string(),
867            )
868        })?;
869
870        let crc_path = ParsedLogPath::new_crc(self.table_root(), self.version())?;
871
872        match try_write_crc_file(engine, &crc_path.location, crc) {
873            Ok(()) => {
874                info!("Wrote CRC file at {}", crc_path.location);
875                let new_log_segment = self.log_segment.try_new_with_crc_file(crc_path)?;
876                let new_snapshot = Arc::new(Snapshot::new_with_crc(
877                    new_log_segment,
878                    self.table_configuration().clone(),
879                    self.crc.clone(),
880                )?);
881                Ok((ChecksumWriteResult::Written, new_snapshot))
882            }
883            Err(Error::FileAlreadyExists(_)) => {
884                info!(
885                    "Another writer beat us to writing CRC file at {}",
886                    crc_path.location
887                );
888                Ok((ChecksumWriteResult::AlreadyExists, Arc::clone(self)))
889            }
890            Err(e) => Err(e),
891        }
892    }
893
894    /// Performs a complete checkpoint of this snapshot using the provided engine.
895    ///
896    /// If a checkpoint already exists at this version, returns
897    /// [`CheckpointWriteResult::AlreadyExists`] with the original snapshot unchanged.
898    /// Otherwise, writes a checkpoint parquet file and the `_last_checkpoint` file and returns
899    /// [`CheckpointWriteResult::Written`] with an updated [`SnapshotRef`] whose log segment
900    /// reflects the new checkpoint. Commits and compaction files subsumed by the checkpoint are
901    /// dropped from the returned snapshot.
902    ///
903    /// # Parameters
904    /// - `engine`: Engine for data processing and I/O
905    /// - `spec`: Checkpoint format specification. `None` uses the default checkpoint settings
906    ///   (auto-detecting V1/V2 from table features). For V2 checkpoints, the default is to not
907    ///   write sidecar files.
908    ///
909    /// # Errors
910    /// - If `CheckpointSpec::V2` is used but the table does not support the `v2Checkpoint` feature.
911    /// - If `CheckpointSpec::V1` is used but the table supports `v2Checkpoint` feature. Note: the
912    ///   Delta protocol permits writing V1 checkpoints to such tables; this is a kernel limitation.
913    /// - If `file_actions_per_sidecar_hint` is `Some(0)`.
914    /// - If the checkpoint write fails (e.g. I/O, parquet write). A `FileAlreadyExists` error is
915    ///   not propagated; it returns [`CheckpointWriteResult::AlreadyExists`] instead. Note: this
916    ///   also fires on the (unlikely) case of a sidecar UUID filename collision, where
917    ///   it should ideally surface as an error. Tracked in
918    ///   <https://github.com/delta-io/delta-kernel-rs/issues/2503>.
919    ///
920    /// Note:
921    ///     - It is still possible that an existing checkpoint gets overwritten if that checkpoint
922    ///       was written by a concurrent writer.
923    ///     - This function uses [`crate::ParquetHandler::write_parquet_file`] and
924    ///       [`crate::StorageHandler::head`], which may not be implemented by all engines. If you
925    ///       are using the default engine, make sure to build it with the multi-threaded executor
926    ///       if you want to use this method.
927    ///
928    /// [`CheckpointSpec`]: crate::checkpoint::CheckpointSpec
929    ///
930    /// Note: There is currently no public api for callers to determine whether a table supports V2
931    /// checkpoints directly. Tracked in <https://github.com/delta-io/delta-kernel-rs/issues/2450>.
932    #[instrument(parent = &self.span, name = "snap.checkpoint", skip_all, err)]
933    pub fn checkpoint(
934        self: &SnapshotRef,
935        engine: &dyn Engine,
936        spec: Option<&CheckpointSpec>,
937    ) -> DeltaResult<(CheckpointWriteResult, SnapshotRef)> {
938        if self.log_segment.checkpoint_version == Some(self.log_segment.end_version) {
939            info!(
940                "Checkpoint already exists for snapshot version {}",
941                self.version()
942            );
943            return Ok((CheckpointWriteResult::AlreadyExists, Arc::clone(self)));
944        }
945        info!(
946            "Writing checkpoint for snapshot version {} with spec {:?}",
947            self.version(),
948            spec
949        );
950
951        let v2_supported = self
952            .table_configuration()
953            .is_feature_supported(&TableFeature::V2Checkpoint);
954        match spec {
955            Some(CheckpointSpec::V2(cfg)) => {
956                if !v2_supported {
957                    return Err(Error::checkpoint_write(
958                        "CheckpointSpec::V2 requires the v2Checkpoint table feature to be supported",
959                    ));
960                }
961                if let V2CheckpointConfig::WithSidecar {
962                    file_actions_per_sidecar_hint: Some(0),
963                } = cfg
964                {
965                    return Err(Error::checkpoint_write(
966                        "file_actions_per_sidecar_hint must be greater than 0",
967                    ));
968                }
969            }
970            Some(CheckpointSpec::V1) if v2_supported => {
971                // TODO: remove this once we support writing V1 checkpoints even if table supports
972                // v2Checkpoint See <https://github.com/delta-io/delta-kernel-rs/issues/2454>.
973                return Err(Error::unsupported(
974                    "Kernel does not support writing V1 checkpoints when the table supports v2Checkpoint",
975                ));
976            }
977            _ => {}
978        }
979
980        let writer = Arc::clone(self).create_checkpoint_writer()?;
981
982        let write_result = match spec {
983            Some(CheckpointSpec::V2(V2CheckpointConfig::WithSidecar {
984                file_actions_per_sidecar_hint,
985            })) => {
986                let hint =
987                    file_actions_per_sidecar_hint.unwrap_or(DEFAULT_FILE_ACTIONS_PER_SIDECAR_HINT);
988                writer.write_v2_checkpoint_with_sidecars(engine, hint)
989            }
990            _ => writer.write_checkpoint_without_sidecars(engine),
991        };
992
993        let info = match write_result {
994            Ok(info) => info,
995            Err(Error::FileAlreadyExists(_)) => {
996                // NOTE: Per write_parquet_file's documentation, it should silently overwrite
997                // existing files, so we log a warning but still return the correct result.
998                warn!(
999                    "ParquetHandler::write_parquet_file unexpectedly failed on \
1000                    FileAlreadyExists for version {}",
1001                    self.version()
1002                );
1003                return Ok((CheckpointWriteResult::AlreadyExists, Arc::clone(self)));
1004            }
1005            Err(e) => return Err(e),
1006        };
1007
1008        writer.finalize(engine, &info.last_checkpoint_stats)?;
1009
1010        let checkpoint_log_path = ParsedLogPath::try_from(info.file_meta)?.ok_or_else(|| {
1011            Error::internal_error("Checkpoint path could not be parsed as a log path")
1012        })?;
1013        let new_log_segment = self
1014            .log_segment
1015            .try_new_with_checkpoint(checkpoint_log_path)?;
1016        Ok((
1017            CheckpointWriteResult::Written,
1018            Arc::new(Snapshot::new_with_crc(
1019                new_log_segment,
1020                self.table_configuration().clone(),
1021                self.crc.clone(),
1022            )?),
1023        ))
1024    }
1025
1026    /// Publishes all catalog commits at this table version. Applicable only to catalog-managed
1027    /// tables. This method is a no-op for filesystem-managed tables or if there are no catalog
1028    /// commits to publish.
1029    ///
1030    /// Publishing copies ratified catalog commits to the Delta log as published Delta files,
1031    /// reducing catalog storage requirements and enabling some table maintenance operations,
1032    /// like checkpointing.
1033    ///
1034    /// # Parameters
1035    ///
1036    /// - `engine`: The engine to use for publishing commits
1037    ///
1038    /// # Errors
1039    ///
1040    /// Returns an error if the publish operation fails, or if there are catalog commits that need
1041    /// publishing but the table or committer don't support publishing.
1042    ///
1043    /// # See Also
1044    ///
1045    /// - [`Committer::publish`]
1046    #[instrument(parent = &self.span, name = "snap.publish", skip_all, err)]
1047    pub fn publish(
1048        self: &SnapshotRef,
1049        engine: &dyn Engine,
1050        committer: &dyn Committer,
1051    ) -> DeltaResult<SnapshotRef> {
1052        let unpublished_catalog_commits = self.log_segment().get_unpublished_catalog_commits()?;
1053
1054        if unpublished_catalog_commits.is_empty() {
1055            return Ok(Arc::clone(self));
1056        }
1057
1058        require!(
1059            unpublished_catalog_commits
1060                .windows(2)
1061                .all(|commits| commits[0].version() + 1 == commits[1].version()),
1062            Error::generic(format!(
1063                "Expected ordered and contiguous unpublished catalog commits. \
1064                 Got: {unpublished_catalog_commits:?}"
1065            ))
1066        );
1067
1068        require!(
1069            self.table_configuration().is_catalog_managed(),
1070            Error::generic(
1071                "There are catalog commits that need publishing, but the table is not catalog-managed.",
1072            )
1073        );
1074
1075        require!(
1076            committer.is_catalog_committer(),
1077            Error::generic(
1078                "There are catalog commits that need publishing, but the committer is not a catalog committer.",
1079            )
1080        );
1081
1082        let publish_metadata =
1083            PublishMetadata::try_new(self.version(), unpublished_catalog_commits)?;
1084
1085        committer.publish(engine, publish_metadata)?;
1086
1087        Ok(Arc::new(Snapshot::new_with_crc(
1088            self.log_segment().new_as_published()?,
1089            self.table_configuration().clone(),
1090            self.crc.clone(),
1091        )?))
1092    }
1093}
1094
// TODO: unify this and lots of stuff in LogSegment tests and test_utils.
/// Test helper: serializes each action in `commit` to a JSON string, joins them with newlines,
/// and passes the result to `test_utils::add_commit` for `version` under `table_root`.
///
/// Panics if `add_commit` returns an error.
#[cfg(test)]
async fn commit(
    table_root: impl AsRef<str>,
    store: &crate::object_store::memory::InMemory,
    version: Version,
    commit: Vec<serde_json::Value>,
) {
    let serialized_actions: Vec<String> = commit.iter().map(|action| action.to_string()).collect();
    let commit_data = serialized_actions.join("\n");
    test_utils::add_commit(table_root, store, version, commit_data)
        .await
        .unwrap();
}
1112
1113#[cfg(test)]
1114mod tests {
1115    use std::path::PathBuf;
1116    use std::sync::Arc;
1117
1118    use rstest::rstest;
1119    use serde_json::json;
1120    use test_utils::table_builder::{
1121        checkpoint_json_stats, unpartitioned, FeatureSet, LogState, TestTableBuilder, VersionTarget,
1122    };
1123    use test_utils::{add_commit, delta_path_for_version};
1124
1125    use super::{commit, *};
1126    use crate::actions::{DomainMetadata, Protocol};
1127    use crate::arrow::array::StringArray;
1128    use crate::arrow::record_batch::RecordBatch;
1129    use crate::committer::FileSystemCommitter;
1130    use crate::engine::arrow_data::ArrowEngineData;
1131    use crate::engine::sync::SyncEngine;
1132    use crate::last_checkpoint_hint::LastCheckpointHint;
1133    use crate::log_segment::LogSegment;
1134    use crate::log_segment_files::LogSegmentFiles;
1135    use crate::object_store::memory::InMemory;
1136    use crate::object_store::path::Path;
1137    use crate::object_store::ObjectStoreExt as _;
1138    use crate::parquet::arrow::ArrowWriter;
1139    use crate::path::ParsedLogPath;
1140    use crate::schema::{DataType, StructField, StructType};
1141    use crate::table_features::{
1142        TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION,
1143    };
1144    use crate::table_properties::ENABLE_IN_COMMIT_TIMESTAMPS;
1145    use crate::transaction::create_table::create_table;
1146    use crate::utils::test_utils::{assert_result_error_with_message, string_array_to_engine_data};
1147
1148    /// Helper function to create a commitInfo action with optional ICT
1149    fn create_commit_info(timestamp: i64, ict: Option<i64>) -> serde_json::Value {
1150        let mut commit_info = json!({
1151            "timestamp": timestamp,
1152            "operation": "WRITE",
1153        });
1154
1155        if let Some(ict_value) = ict {
1156            commit_info["inCommitTimestamp"] = json!(ict_value);
1157        }
1158
1159        json!({
1160            "commitInfo": commit_info
1161        })
1162    }
1163
1164    fn create_protocol(ict_enabled: bool, min_reader_version: Option<u32>) -> serde_json::Value {
1165        let reader_version = min_reader_version.unwrap_or(1);
1166
1167        if ict_enabled {
1168            let mut protocol = json!({
1169                "protocol": {
1170                    "minReaderVersion": reader_version,
1171                    "minWriterVersion": TABLE_FEATURES_MIN_WRITER_VERSION,
1172                    "writerFeatures": ["inCommitTimestamp"]
1173                }
1174            });
1175
1176            // Only include readerFeatures if minReaderVersion >= table-features minimum.
1177            if reader_version >= TABLE_FEATURES_MIN_READER_VERSION as u32 {
1178                protocol["protocol"]["readerFeatures"] = json!([]);
1179            }
1180
1181            protocol
1182        } else {
1183            json!({
1184                "protocol": {
1185                    "minReaderVersion": reader_version,
1186                    "minWriterVersion": 2
1187                }
1188            })
1189        }
1190    }
1191
1192    fn create_metadata(
1193        id: Option<&str>,
1194        schema_string: Option<&str>,
1195        created_time: Option<u64>,
1196        ict_config: Option<(String, String)>,
1197        ict_enabled_but_missing_version: bool,
1198    ) -> serde_json::Value {
1199        let config = if ict_enabled_but_missing_version {
1200            // Special case for testing ICT enabled but missing enablement info
1201            json!({
1202                "delta.enableInCommitTimestamps": "true"
1203            })
1204        } else if let Some((enablement_version, enablement_timestamp)) = ict_config {
1205            json!({
1206                "delta.enableInCommitTimestamps": "true",
1207                "delta.inCommitTimestampEnablementVersion": enablement_version,
1208                "delta.inCommitTimestampEnablementTimestamp": enablement_timestamp
1209            })
1210        } else {
1211            json!({})
1212        };
1213
1214        json!({
1215            "metaData": {
1216                "id": id.unwrap_or("testId"),
1217                "format": {"provider": "parquet", "options": {}},
1218                "schemaString": schema_string.unwrap_or("{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}"),
1219                "partitionColumns": [],
1220                "configuration": config,
1221                "createdTime": created_time.unwrap_or(1587968586154u64)
1222            }
1223        })
1224    }
1225
1226    fn create_basic_commit(ict_enabled: bool, ict_config: Option<(String, String)>) -> String {
1227        let protocol = create_protocol(ict_enabled, None);
1228        let metadata = create_metadata(None, None, None, ict_config, false);
1229        format!("{protocol}\n{metadata}")
1230    }
1231
1232    fn create_snapshot_with_commit_file_absent_from_log_segment(
1233        url: &Url,
1234        table_cfg: TableConfiguration,
1235    ) -> DeltaResult<Snapshot> {
1236        // Create a log segment with only checkpoint and no commit file (simulating scenario
1237        // where a checkpoint exists but the commit file has been cleaned up)
1238        let checkpoint_parts = vec![ParsedLogPath::try_from(crate::FileMeta {
1239            location: url.join("_delta_log/00000000000000000000.checkpoint.parquet")?,
1240            last_modified: 0,
1241            size: 100,
1242        })?
1243        .unwrap()];
1244
1245        let listed_files = LogSegmentFiles {
1246            checkpoint_parts,
1247            ..Default::default()
1248        };
1249
1250        let log_segment =
1251            LogSegment::try_new(listed_files, url.join("_delta_log/")?, Some(0), None)?;
1252
1253        Snapshot::new_with_crc(log_segment, table_cfg, None)
1254    }
1255
1256    #[test]
1257    fn test_snapshot_read_metadata() {
1258        let path =
1259            std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
1260        let url = url::Url::from_directory_path(path).unwrap();
1261
1262        let engine = SyncEngine::new();
1263        let snapshot = Snapshot::builder_for(url)
1264            .at_version(1)
1265            .build(&engine)
1266            .unwrap();
1267
1268        let expected = Protocol::try_new_modern(["deletionVectors"], ["deletionVectors"]).unwrap();
1269        assert_eq!(snapshot.table_configuration().protocol(), &expected);
1270
1271        let schema_string = r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#;
1272        let expected: SchemaRef = serde_json::from_str(schema_string).unwrap();
1273        assert_eq!(snapshot.schema(), expected);
1274    }
1275
1276    #[test]
1277    fn test_new_snapshot() {
1278        let path =
1279            std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
1280        let url = url::Url::from_directory_path(path).unwrap();
1281
1282        let engine = SyncEngine::new();
1283        let snapshot = Snapshot::builder_for(url).build(&engine).unwrap();
1284
1285        let expected = Protocol::try_new_modern(["deletionVectors"], ["deletionVectors"]).unwrap();
1286        assert_eq!(snapshot.table_configuration().protocol(), &expected);
1287
1288        let schema_string = r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#;
1289        let expected: SchemaRef = serde_json::from_str(schema_string).unwrap();
1290        assert_eq!(snapshot.schema(), expected);
1291    }
1292
1293    #[test]
1294    fn test_read_table_with_missing_last_checkpoint() {
1295        // this table doesn't have a _last_checkpoint file
1296        let path = std::fs::canonicalize(PathBuf::from(
1297            "./tests/data/table-with-dv-small/_delta_log/",
1298        ))
1299        .unwrap();
1300        let url = url::Url::from_directory_path(path).unwrap();
1301
1302        let engine = SyncEngine::new();
1303        let storage = engine.storage_handler();
1304        let cp = LastCheckpointHint::try_read(storage.as_ref(), &url).unwrap();
1305        assert!(cp.is_none());
1306    }
1307
1308    fn valid_last_checkpoint() -> (Vec<u8>, LastCheckpointHint) {
1309        let checkpoint = LastCheckpointHint {
1310            version: 1,
1311            size: 8,
1312            parts: None,
1313            size_in_bytes: Some(21857),
1314            num_of_add_files: None,
1315            checkpoint_schema: None,
1316            checksum: None,
1317            tags: None,
1318        };
1319        let data = checkpoint.to_json_bytes();
1320        (data, checkpoint)
1321    }
1322
1323    fn valid_last_checkpoint_with_tags() -> (Vec<u8>, LastCheckpointHint) {
1324        use std::collections::HashMap;
1325
1326        let (_, base_checkpoint) = valid_last_checkpoint();
1327
1328        let mut tags = HashMap::new();
1329        tags.insert(
1330            "author".to_string(),
1331            "test_read_table_with_last_checkpoint".to_string(),
1332        );
1333        tags.insert("environment".to_string(), "snapshot_tests".to_string());
1334        tags.insert("created_by".to_string(), "delta-kernel-rs".to_string());
1335
1336        let checkpoint = LastCheckpointHint {
1337            tags: Some(tags),
1338            ..base_checkpoint
1339        };
1340
1341        let data = checkpoint.to_json_bytes();
1342        (data, checkpoint)
1343    }
1344
1345    #[tokio::test]
1346    async fn test_read_table_with_empty_last_checkpoint() {
1347        // in memory file system
1348        let store = Arc::new(InMemory::new());
1349
1350        // do a _last_checkpoint file with "{}" as content
1351        let empty = "{}".as_bytes().to_vec();
1352        let invalid_path = Path::from("invalid/_last_checkpoint");
1353
1354        store
1355            .put(&invalid_path, empty.into())
1356            .await
1357            .expect("put _last_checkpoint");
1358
1359        let engine = SyncEngine::new_with_store(store);
1360        let storage = engine.storage_handler();
1361        let url = Url::parse("memory:///invalid/").expect("valid url");
1362        let invalid =
1363            LastCheckpointHint::try_read(storage.as_ref(), &url).expect("read last checkpoint");
1364        assert!(invalid.is_none())
1365    }
1366
1367    #[tokio::test]
1368    async fn test_read_table_with_last_checkpoint() {
1369        // in memory file system
1370        let store = Arc::new(InMemory::new());
1371
1372        // Define test cases: (path, data, expected_result)
1373        let (data, expected) = valid_last_checkpoint();
1374        let (data_with_tags, expected_with_tags) = valid_last_checkpoint_with_tags();
1375        let test_cases = vec![
1376            ("valid", data, Some(expected)),
1377            ("invalid", "invalid".as_bytes().to_vec(), None),
1378            ("valid_with_tags", data_with_tags, Some(expected_with_tags)),
1379        ];
1380
1381        // Write all test files to the in memory file system
1382        for (path_prefix, data, _) in &test_cases {
1383            let path = Path::from(format!("{path_prefix}/_last_checkpoint"));
1384            store
1385                .put(&path, data.clone().into())
1386                .await
1387                .expect("put _last_checkpoint");
1388        }
1389
1390        let engine = SyncEngine::new_with_store(store);
1391        let storage = engine.storage_handler();
1392
1393        // Test reading all checkpoints from the in memory file system for cases where the data is
1394        // valid, invalid and valid with tags.
1395        for (path_prefix, _, expected_result) in test_cases {
1396            let url = Url::parse(&format!("memory:///{path_prefix}/")).expect("valid url");
1397            let result =
1398                LastCheckpointHint::try_read(storage.as_ref(), &url).expect("read last checkpoint");
1399            assert_eq!(result, expected_result);
1400        }
1401    }
1402
1403    #[test_log::test]
1404    fn test_read_table_with_checkpoint() {
1405        let path = std::fs::canonicalize(PathBuf::from(
1406            "./tests/data/with_checkpoint_no_last_checkpoint/",
1407        ))
1408        .unwrap();
1409        let location = url::Url::from_directory_path(path).unwrap();
1410        let engine = SyncEngine::new();
1411        let snapshot = Snapshot::builder_for(location).build(&engine).unwrap();
1412
1413        assert_eq!(snapshot.log_segment.listed.checkpoint_parts.len(), 1);
1414        assert_eq!(
1415            ParsedLogPath::try_from(
1416                snapshot.log_segment.listed.checkpoint_parts[0]
1417                    .location
1418                    .clone()
1419            )
1420            .unwrap()
1421            .unwrap()
1422            .version,
1423            2,
1424        );
1425        assert_eq!(snapshot.log_segment.listed.ascending_commit_files.len(), 1);
1426        assert_eq!(
1427            ParsedLogPath::try_from(
1428                snapshot.log_segment.listed.ascending_commit_files[0]
1429                    .location
1430                    .clone()
1431            )
1432            .unwrap()
1433            .unwrap()
1434            .version,
1435            3,
1436        );
1437    }
1438
1439    #[tokio::test]
1440    async fn test_domain_metadata() -> DeltaResult<()> {
1441        let table_root = "memory:///test_table/";
1442        let store = Arc::new(InMemory::new());
1443        let engine = SyncEngine::new_with_store(store.clone());
1444
1445        // commit0
1446        // - domain1: not removed
1447        // - domain2: not removed
1448        let commit = [
1449            json!({
1450                "protocol": {
1451                    "minReaderVersion": 1,
1452                    "minWriterVersion": 1
1453                }
1454            }),
1455            json!({
1456                "metaData": {
1457                    "id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9",
1458                    "format": { "provider": "parquet", "options": {} },
1459                    "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}",
1460                    "partitionColumns": [],
1461                    "configuration": {},
1462                    "createdTime": 1587968585495i64
1463                }
1464            }),
1465            json!({
1466                "domainMetadata": {
1467                    "domain": "domain1",
1468                    "configuration": "domain1_commit0",
1469                    "removed": false
1470                }
1471            }),
1472            json!({
1473                "domainMetadata": {
1474                    "domain": "domain2",
1475                    "configuration": "domain2_commit0",
1476                    "removed": false
1477                }
1478            }),
1479            json!({
1480                "domainMetadata": {
1481                    "domain": "domain3",
1482                    "configuration": "domain3_commit0",
1483                    "removed": false
1484                }
1485            }),
1486        ]
1487        .map(|json| json.to_string())
1488        .join("\n");
1489        add_commit(table_root, store.clone().as_ref(), 0, commit)
1490            .await
1491            .unwrap();
1492
1493        // commit1
1494        // - domain1: removed
1495        // - domain2: not-removed
1496        // - internal domain
1497        let commit = [
1498            json!({
1499                "domainMetadata": {
1500                    "domain": "domain1",
1501                    "configuration": "domain1_commit1",
1502                    "removed": true
1503                }
1504            }),
1505            json!({
1506                "domainMetadata": {
1507                    "domain": "domain2",
1508                    "configuration": "domain2_commit1",
1509                    "removed": false
1510                }
1511            }),
1512            json!({
1513                "domainMetadata": {
1514                    "domain": "delta.domain3",
1515                    "configuration": "domain3_commit1",
1516                    "removed": false
1517                }
1518            }),
1519        ]
1520        .map(|json| json.to_string())
1521        .join("\n");
1522        add_commit(table_root, store.as_ref(), 1, commit)
1523            .await
1524            .unwrap();
1525
1526        let snapshot = Snapshot::builder_for(table_root).build(&engine)?;
1527
1528        // Test get_domain_metadata
1529
1530        assert_eq!(snapshot.get_domain_metadata("domain1", &engine)?, None);
1531        assert_eq!(
1532            snapshot.get_domain_metadata("domain2", &engine)?,
1533            Some("domain2_commit1".to_string())
1534        );
1535        assert_eq!(
1536            snapshot.get_domain_metadata("domain3", &engine)?,
1537            Some("domain3_commit0".to_string())
1538        );
1539        let err = snapshot
1540            .get_domain_metadata("delta.domain3", &engine)
1541            .unwrap_err();
1542        assert!(matches!(err, Error::Generic(msg) if
1543                msg == "User DomainMetadata are not allowed to use system-controlled 'delta.*' domain"));
1544
1545        // Test get_domain_metadata_internal
1546        assert_eq!(
1547            snapshot.get_domain_metadata_internal("delta.domain3", &engine)?,
1548            Some("domain3_commit1".to_string())
1549        );
1550
1551        // Test get_all_domain_metadata
1552        let mut metadata = snapshot.get_all_domain_metadata(&engine)?;
1553        metadata.sort_by(|a, b| a.domain().cmp(b.domain()));
1554
1555        let mut expected = vec![
1556            DomainMetadata::new("domain2".to_string(), "domain2_commit1".to_string()),
1557            DomainMetadata::new("domain3".to_string(), "domain3_commit0".to_string()),
1558        ];
1559        expected.sort_by(|a, b| a.domain().cmp(b.domain()));
1560
1561        assert_eq!(metadata, expected);
1562
1563        Ok(())
1564    }
1565
1566    #[test]
1567    #[ignore = "log compaction disabled (#2337)"]
1568    fn test_log_compaction_writer() {
1569        let path =
1570            std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
1571        let url = url::Url::from_directory_path(path).unwrap();
1572
1573        let engine = SyncEngine::new();
1574        let snapshot = Snapshot::builder_for(url).build(&engine).unwrap();
1575
1576        // Test creating a log compaction writer
1577        let writer = snapshot.clone().log_compaction_writer(0, 1).unwrap();
1578        let path = writer.compaction_path();
1579
1580        // Verify the path format is correct
1581        let expected_filename = "00000000000000000000.00000000000000000001.compacted.json";
1582        assert!(path.to_string().ends_with(expected_filename));
1583
1584        // Test invalid version range (start >= end)
1585        let result = snapshot.clone().log_compaction_writer(2, 1);
1586        assert_result_error_with_message(result, "Invalid version range");
1587
1588        // Test equal version range (also invalid)
1589        let result = snapshot.log_compaction_writer(1, 1);
1590        assert_result_error_with_message(result, "Invalid version range");
1591    }
1592
1593    // TODO(#2337): remove this test when log compaction is re-enabled.
1594    #[test]
1595    fn test_log_compaction_writer_unsupported() {
1596        let path =
1597            std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
1598        let url = url::Url::from_directory_path(path).unwrap();
1599
1600        let engine = SyncEngine::new();
1601        let snapshot = Snapshot::builder_for(url).build(&engine).unwrap();
1602
1603        let result = snapshot.log_compaction_writer(0, 1);
1604        assert_result_error_with_message(result, "not currently supported");
1605    }
1606
1607    #[tokio::test]
1608    async fn test_timestamp_with_ict_disabled() -> Result<(), Box<dyn std::error::Error>> {
1609        let store = Arc::new(InMemory::new());
1610        let table_root = "memory://test/";
1611        let engine = SyncEngine::new_with_store(store.clone());
1612
1613        // Create a basic commit without ICT enabled
1614        let commit0 = create_basic_commit(false, None);
1615        add_commit(table_root, store.as_ref(), 0, commit0).await?;
1616
1617        let snapshot = Snapshot::builder_for(table_root).build(&engine)?;
1618
1619        // When ICT is disabled, get_timestamp should return None
1620        let result = snapshot.get_in_commit_timestamp(&engine)?;
1621        assert_eq!(result, None);
1622
1623        Ok(())
1624    }
1625
1626    #[tokio::test]
1627    async fn test_timestamp_with_ict_enablement_timeline() -> Result<(), Box<dyn std::error::Error>>
1628    {
1629        let store = Arc::new(InMemory::new());
1630        let table_root = "memory://test/";
1631        let engine = SyncEngine::new_with_store(store.clone());
1632
1633        // Create initial commit without ICT
1634        let commit0 = create_basic_commit(false, None);
1635        add_commit(table_root, store.as_ref(), 0, commit0).await?;
1636
1637        // Create commit that enables ICT (version 1 = enablement version)
1638        let commit1 =
1639            create_basic_commit(true, Some(("1".to_string(), "1587968586154".to_string())));
1640        add_commit(table_root, store.as_ref(), 1, commit1).await?;
1641
1642        // Create commit with ICT enabled
1643        let expected_timestamp = 1587968586200i64;
1644        let commit2 = format!(
1645            r#"{{"commitInfo":{{"timestamp":1587968586154,"inCommitTimestamp":{expected_timestamp},"operation":"WRITE"}}}}"#,
1646        );
1647        add_commit(table_root, store.as_ref(), 2, commit2.to_string()).await?;
1648
1649        // Read snapshot at version 0 (before ICT enablement)
1650        let snapshot_v0 = Snapshot::builder_for(table_root)
1651            .at_version(0)
1652            .build(&engine)?;
1653        // This snapshot version predates ICT enablement, so ICT is not available
1654        let result_v0 = snapshot_v0.get_in_commit_timestamp(&engine)?;
1655        assert_eq!(result_v0, None);
1656
1657        // Read snapshot at version 2 (after ICT enabled)
1658        let snapshot_v2 = Snapshot::builder_for(table_root)
1659            .at_version(2)
1660            .build(&engine)?;
1661        // When ICT is enabled and available, timestamp() should return inCommitTimestamp
1662        let result_v2 = snapshot_v2.get_in_commit_timestamp(&engine)?;
1663        assert_eq!(result_v2, Some(expected_timestamp));
1664
1665        Ok(())
1666    }
1667
    /// Reading the ICT of a snapshot whose table metadata records an ICT enablement version
    /// later than the snapshot's own version is an invalid state and must error.
    #[tokio::test]
    async fn test_get_timestamp_enablement_version_in_future() -> DeltaResult<()> {
        // Test invalid state where snapshot has enablement version in the future - should error.
        // Metadata at version 0 claims ICT was enabled at version 5, but the snapshot is read at
        // version 1.
        let table_root = "memory:///test_table/";
        let store = Arc::new(InMemory::new());
        let engine = SyncEngine::new_with_store(store.clone());

        // Version 0: protocol listing the inCommitTimestamp writer feature, plus metadata that
        // enables ICT with an enablement version of 5.
        let commit_data = [
            json!({
                "protocol": {
                    "minReaderVersion": TABLE_FEATURES_MIN_READER_VERSION,
                    "minWriterVersion": TABLE_FEATURES_MIN_WRITER_VERSION,
                    "readerFeatures": [],
                    "writerFeatures": ["inCommitTimestamp"]
                }
            }),
            json!({
                "metaData": {
                    "id": "test_id2",
                    "format": {"provider": "parquet", "options": {}},
                    "schemaString": "{\"type\":\"struct\",\"fields\":[]}",
                    "partitionColumns": [],
                    "configuration": {
                        "delta.enableInCommitTimestamps": "true",
                        "delta.inCommitTimestampEnablementVersion": "5", // Later than snapshot version 1
                        "delta.inCommitTimestampEnablementTimestamp": "1612345678"
                    },
                    "createdTime": 1677811175819u64
                }
            }),
        ];
        commit(table_root, store.as_ref(), 0, commit_data.to_vec()).await;

        // Create commit that predates ICT enablement (no inCommitTimestamp)
        let commit_predates = [create_commit_info(1234567890, None)];
        commit(table_root, store.as_ref(), 1, commit_predates.to_vec()).await;

        let snapshot_predates = Snapshot::builder_for(table_root)
            .at_version(1)
            .build(&engine)?;
        let result_predates = snapshot_predates.get_in_commit_timestamp(&engine);

        // Version 1 with enablement at version 5 is invalid - should error
        assert_result_error_with_message(
            result_predates,
            "Invalid state: snapshot at version 1 has ICT enablement version 5 in the future",
        );

        Ok(())
    }
1718
1719    #[tokio::test]
1720    async fn test_get_timestamp_missing_ict_when_enabled() -> DeltaResult<()> {
1721        // Test missing ICT when it should be present - should error
1722        let table_root = "memory:///test_table/";
1723        let store = Arc::new(InMemory::new());
1724        let engine = SyncEngine::new_with_store(store.clone());
1725
1726        let commit_data = [
1727            create_protocol(true, Some(TABLE_FEATURES_MIN_READER_VERSION as u32)),
1728            create_metadata(
1729                Some("test_id"),
1730                Some("{\"type\":\"struct\",\"fields\":[]}"),
1731                Some(1677811175819),
1732                Some(("0".to_string(), "1612345678".to_string())),
1733                false,
1734            ),
1735        ];
1736        commit(table_root, store.as_ref(), 0, commit_data.to_vec()).await; // ICT enabled from version 0
1737
1738        // Create commit without ICT despite being enabled (corrupt case)
1739        let commit_missing_ict = [create_commit_info(1234567890, None)];
1740        commit(table_root, store.as_ref(), 1, commit_missing_ict.to_vec()).await;
1741
1742        let snapshot_missing = Snapshot::builder_for(table_root)
1743            .at_version(1)
1744            .build(&engine)?;
1745        let result = snapshot_missing.get_in_commit_timestamp(&engine);
1746        assert_result_error_with_message(result, "In-Commit Timestamp not found");
1747
1748        Ok(())
1749    }
1750
    /// With ICT enabled, `get_in_commit_timestamp` errors when the snapshot's log segment does
    /// not contain the last commit file.
    #[tokio::test]
    async fn test_get_timestamp_fails_when_commit_missing() -> DeltaResult<()> {
        // When ICT is enabled but commit file is not found in log segment,
        // get_in_commit_timestamp should return an error

        let url = Url::parse("memory:///")?;
        let store = Arc::new(InMemory::new());
        let engine = SyncEngine::new_with_store(store.clone());

        // Create initial commit with ICT enabled
        let commit_data = [
            create_protocol(true, Some(TABLE_FEATURES_MIN_READER_VERSION as u32)),
            create_metadata(
                Some("test_id"),
                Some("{\"type\":\"struct\",\"fields\":[]}"),
                Some(1677811175819),
                Some(("0".to_string(), "1612345678".to_string())), // ICT enabled from version 0
                false,
            ),
        ];
        commit(url.as_str(), store.as_ref(), 0, commit_data.to_vec()).await;

        // Build snapshot to get table configuration
        let snapshot = Snapshot::builder_for(url.as_str())
            .at_version(0)
            .build(&engine)?;

        // Reuse the ICT-enabled table configuration in a snapshot whose log segment presumably
        // omits the commit file (per the helper's name; helper defined elsewhere in this module).
        let snapshot_no_commit = create_snapshot_with_commit_file_absent_from_log_segment(
            &url,
            snapshot.table_configuration().clone(),
        )?;

        // Should return an error when commit file is missing
        let result = snapshot_no_commit.get_in_commit_timestamp(&engine);
        assert_result_error_with_message(result, "Last commit file not found in log segment");

        Ok(())
    }
1789
1790    #[tokio::test]
1791    async fn test_get_timestamp_with_checkpoint_and_commit_same_version() -> DeltaResult<()> {
1792        // Test the scenario where both checkpoint and commit exist at the same version with ICT
1793        // enabled.
1794        let table_root = "memory:///test_table/";
1795        let store = Arc::new(InMemory::new());
1796        let engine = SyncEngine::new_with_store(store.clone());
1797
1798        // Create 00000000000000000000.json with ICT enabled
1799        let commit0_data = [
1800            create_commit_info(1587968586154, None),
1801            create_protocol(true, Some(TABLE_FEATURES_MIN_READER_VERSION as u32)),
1802            create_metadata(
1803                Some("5fba94ed-9794-4965-ba6e-6ee3c0d22af9"),
1804                Some("{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"),
1805                Some(1587968585495),
1806                Some(("0".to_string(), "1587968586154".to_string())),
1807                false,
1808            ),
1809        ];
1810        commit(table_root, store.as_ref(), 0, commit0_data.to_vec()).await;
1811
1812        // Create 00000000000000000001.checkpoint.parquet
1813        let checkpoint_data = [
1814            create_commit_info(1587968586154, None),
1815            create_protocol(true, Some(TABLE_FEATURES_MIN_READER_VERSION as u32)),
1816            create_metadata(
1817                Some("5fba94ed-9794-4965-ba6e-6ee3c0d22af9"),
1818                Some("{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"),
1819                Some(1587968585495),
1820                Some(("0".to_string(), "1587968586154".to_string())),
1821                false,
1822            ),
1823        ];
1824
1825        let handler = engine.json_handler();
1826        let json_strings: StringArray = checkpoint_data
1827            .into_iter()
1828            .map(|json| json.to_string())
1829            .collect::<Vec<_>>()
1830            .into();
1831        let parsed = handler.parse_json(
1832            string_array_to_engine_data(json_strings),
1833            crate::actions::get_commit_schema().clone(),
1834        )?;
1835        let checkpoint = ArrowEngineData::try_from_engine_data(parsed)?;
1836        let checkpoint: RecordBatch = checkpoint.into();
1837
1838        let mut buffer = vec![];
1839        let mut writer = ArrowWriter::try_new(&mut buffer, checkpoint.schema(), None)?;
1840        writer.write(&checkpoint)?;
1841        writer.close()?;
1842
1843        let checkpoint_path = delta_path_for_version(1, "checkpoint.parquet");
1844        store.put(&checkpoint_path, buffer.into()).await?;
1845
1846        // Create 00000000000000000001.json with ICT
1847        let expected_ict = 1587968586200i64;
1848        let commit1_data = [create_commit_info(1587968586200, Some(expected_ict))];
1849        commit(table_root, store.as_ref(), 1, commit1_data.to_vec()).await;
1850
1851        // Build snapshot - LogSegment will filter out the commit file because checkpoint exists at
1852        // same version
1853        let snapshot = Snapshot::builder_for(table_root)
1854            .at_version(1)
1855            .build(&engine)?;
1856
1857        // We should successfully read ICT by falling back to storage
1858        let timestamp = snapshot.get_in_commit_timestamp(&engine)?;
1859        assert_eq!(timestamp, Some(expected_ict));
1860
1861        Ok(())
1862    }
1863
1864    #[rstest]
1865    #[case::ict_disabled(false)]
1866    #[case::ict_enabled(true)]
1867    fn test_get_timestamp_returns_valid_timestamp(#[case] ict_enabled: bool) -> DeltaResult<()> {
1868        let temp_dir = tempfile::tempdir().unwrap();
1869        let table_path = Url::from_directory_path(temp_dir.path())
1870            .unwrap()
1871            .to_string();
1872        let engine = SyncEngine::new();
1873
1874        let schema = Arc::new(StructType::try_new(vec![StructField::new(
1875            "id",
1876            DataType::INTEGER,
1877            true,
1878        )])?);
1879
1880        let mut create_table_builder = create_table(&table_path, schema, "Test/1.0");
1881        if ict_enabled {
1882            create_table_builder = create_table_builder
1883                .with_table_properties(vec![(ENABLE_IN_COMMIT_TIMESTAMPS, "true")]);
1884        }
1885
1886        let _ = create_table_builder
1887            .build(&engine, Box::new(FileSystemCommitter::new()))?
1888            .commit(&engine)?;
1889
1890        let snapshot = Snapshot::builder_for(&table_path).build(&engine)?;
1891        let ts = snapshot.get_timestamp(&engine)?;
1892        let now_ms = chrono::Utc::now().timestamp_millis();
1893        let two_days_ms = 2 * 24 * 60 * 60 * 1000_i64;
1894        assert!(
1895            (now_ms - two_days_ms..=now_ms).contains(&ts),
1896            "timestamp {ts} not within 2 days of now ({now_ms})"
1897        );
1898
1899        if ict_enabled {
1900            let ict_ts = snapshot.get_in_commit_timestamp(&engine)?.unwrap();
1901            assert_eq!(ts, ict_ts);
1902        }
1903        Ok(())
1904    }
1905
    /// `get_timestamp` errors when the last commit file is absent from the log segment, whether
    /// or not In-Commit Timestamps are enabled.
    #[rstest]
    #[case::ict_enabled(true)]
    #[case::ict_disabled(false)]
    #[tokio::test]
    async fn test_get_timestamp_errors_when_commit_file_missing(
        #[case] ict_enabled: bool,
    ) -> DeltaResult<()> {
        let url = Url::parse("memory:///")?;
        let store = Arc::new(InMemory::new());
        let engine = SyncEngine::new_with_store(store.clone());

        // TODO: refactor `ict_config` from a raw tuple to a dedicated ICTConfig struct so the
        // enablement version and enablement timestamp fields are named and self-documenting.
        // The ict_config tuple is (inCommitTimestampEnablementVersion,
        // inCommitTimestampEnablementTimestamp): if ICT is enabled, the enablement version
        // is 0 with an arbitrary enablement timestamp.
        let ict_config = ict_enabled.then(|| ("0".to_string(), "1612345678".to_string()));
        // A reader version is only passed to create_protocol in the ICT-enabled case.
        let reader_version = ict_enabled.then_some(TABLE_FEATURES_MIN_READER_VERSION as u32);

        let mut commit_data = vec![];
        // When ICT is enabled, commitInfo must be the first action (protocol requirement)
        if ict_enabled {
            commit_data.push(create_commit_info(1677811175819, Some(1677811175999)));
        }
        commit_data.extend([
            create_protocol(ict_enabled, reader_version),
            create_metadata(
                Some("test_id"),
                Some("{\"type\":\"struct\",\"fields\":[]}"),
                Some(1677811175819),
                ict_config,
                false,
            ),
        ]);
        commit(url.as_str(), store.as_ref(), 0, commit_data).await;

        let snapshot = Snapshot::builder_for(url.as_str())
            .at_version(0)
            .build(&engine)?;

        // Reuse this table configuration in a snapshot whose log segment presumably omits the
        // commit file (per the helper's name; helper defined elsewhere in this module).
        let snapshot_no_commit = create_snapshot_with_commit_file_absent_from_log_segment(
            &url,
            snapshot.table_configuration().clone(),
        )?;

        let result = snapshot_no_commit.get_timestamp(&engine);
        assert_result_error_with_message(result, "Last commit file not found in log segment");

        Ok(())
    }
1956
    /// With ICT enabled and the commit file present, `get_timestamp` errors when the commitInfo
    /// lacks an `inCommitTimestamp`.
    #[tokio::test]
    async fn test_get_timestamp_errors_when_ict_missing_from_commit_info() -> DeltaResult<()> {
        // ICT is enabled and commit file IS present in the log segment, but the commitInfo
        // action does not carry an inCommitTimestamp value (corrupt/incomplete commit).
        let store = Arc::new(InMemory::new());
        let table_root = "memory:///test_table/";
        let engine = SyncEngine::new_with_store(store.clone());

        let commit0_data = vec![
            create_commit_info(1677811175819, None), // commitInfo without inCommitTimestamp
            create_protocol(true, Some(TABLE_FEATURES_MIN_READER_VERSION as u32)),
            create_metadata(
                Some("test_id"),
                Some("{\"type\":\"struct\",\"fields\":[]}"),
                Some(1677811175819),
                // ICT enabled at version 0, with an arbitrary enablement timestamp
                Some(("0".to_string(), "1612345678".to_string())),
                false,
            ),
        ];
        commit(table_root, store.as_ref(), 0, commit0_data).await;

        let snapshot = Snapshot::builder_for(table_root).build(&engine)?;
        let result = snapshot.get_timestamp(&engine);
        assert_result_error_with_message(result, "In-Commit Timestamp not found in commit file");

        Ok(())
    }
1984
1985    // Verifies the test_context! macro works from kernel/src/ unit tests
1986    // (crosses the crate type boundary via macro expansion). The kernel can't construct a
1987    // DefaultEngine here, so we pass a SyncEngine factory.
1988    #[test]
1989    fn test_context_macro_works_in_unit_test() {
1990        let (_engine, snap, _table) = test_utils::test_context!(
1991            LogState::with_latest_version(2),
1992            FeatureSet::empty(),
1993            unpartitioned(),
1994            checkpoint_json_stats(),
1995            VersionTarget::Latest,
1996            SyncEngine::new_with_store
1997        );
1998        assert_eq!(snap.version(), 2);
1999    }
2000
2001    #[test]
2002    fn test_new_post_commit_simple() {
2003        // GIVEN
2004        let path = std::fs::canonicalize(PathBuf::from("./tests/data/basic_partitioned/")).unwrap();
2005        let url = url::Url::from_directory_path(path).unwrap();
2006        let engine = SyncEngine::new();
2007        let base_snapshot = Snapshot::builder_for(url.clone()).build(&engine).unwrap();
2008        let next_version = base_snapshot.version() + 1;
2009
2010        // WHEN
2011        let fake_new_commit = ParsedLogPath::create_parsed_published_commit(&url, next_version);
2012        let post_commit_snapshot = base_snapshot
2013            .new_post_commit(fake_new_commit, CrcDelta::default())
2014            .unwrap();
2015
2016        // THEN
2017        assert_eq!(post_commit_snapshot.version(), next_version);
2018        assert_eq!(post_commit_snapshot.log_segment().end_version, next_version);
2019    }
2020
2021    #[test]
2022    fn test_get_protocol_derived_properties() {
2023        let path =
2024            std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
2025        let url = url::Url::from_directory_path(path).unwrap();
2026
2027        let engine = SyncEngine::new();
2028        let snapshot = Snapshot::builder_for(url).build(&engine).unwrap();
2029
2030        let props = snapshot.get_protocol_derived_properties();
2031        assert_eq!(
2032            props.get("delta.minReaderVersion").unwrap(),
2033            &TABLE_FEATURES_MIN_READER_VERSION.to_string()
2034        );
2035        assert_eq!(
2036            props.get("delta.minWriterVersion").unwrap(),
2037            &TABLE_FEATURES_MIN_WRITER_VERSION.to_string()
2038        );
2039        assert_eq!(
2040            props.get("delta.feature.deletionVectors").unwrap(),
2041            "supported"
2042        );
2043    }
2044
2045    #[tokio::test]
2046    async fn test_metadata_configuration() {
2047        let storage = Arc::new(InMemory::new());
2048        let table_root = "memory:///";
2049        let engine = SyncEngine::new_with_store(storage.clone());
2050
2051        // Create a commit with custom configuration
2052        let actions = vec![
2053            json!({"commitInfo": {"timestamp": 123, "operation": "CREATE TABLE"}}),
2054            json!({"protocol": {
2055                "minReaderVersion": 3,
2056                "minWriterVersion": 7,
2057                "readerFeatures": [],
2058                "writerFeatures": []
2059            }}),
2060            json!({"metaData": {
2061                "id": "test-id",
2062                "format": {"provider": "parquet", "options": {}},
2063                "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}}]}",
2064                "partitionColumns": [],
2065                "configuration": {
2066                    "io.unitycatalog.tableId": "abc-123",
2067                    "myapp.setting": "value"
2068                },
2069                "createdTime": 1234567890
2070            }}),
2071        ];
2072        commit(table_root, &storage, 0, actions).await;
2073
2074        let snapshot = Snapshot::builder_for(table_root).build(&engine).unwrap();
2075        let config = snapshot.metadata_configuration();
2076        assert_eq!(
2077            config.get("io.unitycatalog.tableId"),
2078            Some(&"abc-123".to_string())
2079        );
2080        assert_eq!(config.get("myapp.setting"), Some(&"value".to_string()));
2081    }
2082
2083    #[rstest::rstest]
2084    #[case::no_clustering(None, None, None)]
2085    #[case::clustered_no_column_mapping(
2086        Some(vec!["region"]),
2087        None,
2088        Some(vec![ColumnName::new(["region"])])
2089    )]
2090    #[case::clustered_with_column_mapping(
2091        Some(vec!["region"]),
2092        Some("name"),
2093        Some(vec![ColumnName::new(["region"])])
2094    )]
2095    fn test_get_logical_clustering_columns(
2096        #[case] clustering_cols: Option<Vec<&str>>,
2097        #[case] column_mapping_mode: Option<&str>,
2098        #[case] expected: Option<Vec<ColumnName>>,
2099    ) {
2100        use crate::transaction::create_table::create_table;
2101        use crate::transaction::data_layout::DataLayout;
2102
2103        let storage = Arc::new(InMemory::new());
2104        let engine = SyncEngine::new_with_store(storage);
2105        let schema = Arc::new(
2106            crate::schema::StructType::try_new(vec![
2107                crate::schema::StructField::new("id", crate::schema::DataType::INTEGER, true),
2108                crate::schema::StructField::new("region", crate::schema::DataType::STRING, true),
2109            ])
2110            .unwrap(),
2111        );
2112        let mut builder = create_table("memory:///", schema, "test");
2113        if let Some(cols) = &clustering_cols {
2114            builder = builder.with_data_layout(DataLayout::clustered(cols.clone()));
2115        }
2116        if let Some(mode) = column_mapping_mode {
2117            builder = builder.with_table_properties([("delta.columnMapping.mode", mode)]);
2118        }
2119        let _ = builder
2120            .build(
2121                &engine,
2122                Box::new(crate::committer::FileSystemCommitter::new()),
2123            )
2124            .unwrap()
2125            .commit(&engine)
2126            .unwrap();
2127        let snapshot = Snapshot::builder_for("memory:///").build(&engine).unwrap();
2128        let result = snapshot.get_logical_clustering_columns(&engine).unwrap();
2129        assert_eq!(result, expected);
2130    }
2131
2132    // === estimated_owned_heap_size ===
2133    /// Test that the estimated_owned_heap_size is correctly considering normal commit jsons,
2134    /// checkpoint parts, and log compaction files.
2135    #[test]
2136    fn estimated_owned_heap_size_on_table_with_many_log_files() {
2137        // Baseline: 101 commits (v0..=v100), no checkpoint, no compactions.
2138        let (_engine, baseline_snap, _table) = test_utils::test_context!(
2139            LogState::with_latest_version(100),
2140            FeatureSet::empty(),
2141            unpartitioned(),
2142            checkpoint_json_stats(),
2143            VersionTarget::Latest,
2144            SyncEngine::new_with_store
2145        );
2146
2147        let baseline_heap = baseline_snap.estimated_owned_heap_size_bytes();
2148        let struct_size = size_of::<Snapshot>();
2149        // Heap size should be at least 5 times the stack size, to account for
2150        // the 100 commits file metadata.
2151        assert!(
2152            baseline_heap > 5 * struct_size,
2153            "baseline heap {baseline_heap} should exceed 5 * sizeof(Snapshot)={}",
2154            5 * struct_size
2155        );
2156
2157        // 100 extra checkpoint parts: each contributes to heap.
2158        // Kernel doesn't yet support writing multi-part checkpoints, so we manually add them here.
2159        let snap_extra_checkpoints =
2160            snapshot_with_extra_files(&baseline_snap, |listed, log_root| {
2161                for i in 1..=100u32 {
2162                    let filename =
2163                        format!("00000000000000000099.checkpoint.{i:010}.0000000100.parquet");
2164                    let location = log_root.join(&filename).unwrap();
2165                    let part = ParsedLogPath::try_from(crate::FileMeta {
2166                        location,
2167                        last_modified: 0,
2168                        size: 100,
2169                    })
2170                    .unwrap()
2171                    .unwrap();
2172                    listed.checkpoint_parts.push(part);
2173                }
2174            });
2175        let delta_checkpoints =
2176            snap_extra_checkpoints.estimated_owned_heap_size_bytes() - baseline_heap;
2177        assert!(
2178            delta_checkpoints >= 15_000,
2179            "delta_checkpoints {delta_checkpoints} should be >= 15_000 for 100 checkpoint parts"
2180        );
2181
2182        // 100 extra log compaction files: each contributes to heap.
2183        // Kernel disables writing log compaction files currently, so we manually add them here.
2184        let snap_extra_compactions =
2185            snapshot_with_extra_files(&baseline_snap, |listed, log_root| {
2186                for i in 0..100u64 {
2187                    let start = i * 10;
2188                    let end = start + 5;
2189                    let filename = format!("{start:020}.{end:020}.compacted.json");
2190                    let location = log_root.join(&filename).unwrap();
2191                    let comp = ParsedLogPath::try_from(crate::FileMeta {
2192                        location,
2193                        last_modified: 0,
2194                        size: 100,
2195                    })
2196                    .unwrap()
2197                    .unwrap();
2198                    listed.ascending_compaction_files.push(comp);
2199                }
2200            });
2201        let delta_compactions =
2202            snap_extra_compactions.estimated_owned_heap_size_bytes() - baseline_heap;
2203        assert!(
2204            delta_compactions >= 15_000,
2205            "delta_compactions {delta_compactions} should be >= 15_000 for 100 compaction files"
2206        );
2207    }
2208
2209    /// Two tables that differ only in schema width: the wider schema should bump
2210    /// estimated_owned_heap_size by approximately the schemaString delta.
2211    #[test]
2212    fn estimated_owned_heap_size_reflects_schema_string() {
2213        fn snap_with_schema(schema: SchemaRef) -> SnapshotRef {
2214            let store = Arc::new(InMemory::new());
2215            let engine = SyncEngine::new_with_store(store);
2216            create_table("memory:///", schema, "test")
2217                .build(&engine, Box::new(FileSystemCommitter::new()))
2218                .unwrap()
2219                .commit(&engine)
2220                .unwrap()
2221                .unwrap_committed();
2222            Snapshot::builder_for("memory:///").build(&engine).unwrap()
2223        }
2224
2225        let small_schema = Arc::new(
2226            StructType::try_new(vec![StructField::nullable("a", DataType::INTEGER)]).unwrap(),
2227        );
2228        let wide_schema = Arc::new(
2229            StructType::try_new(
2230                (0..50)
2231                    .map(|i| StructField::nullable(format!("field_{i:03}"), DataType::STRING))
2232                    .collect::<Vec<_>>(),
2233            )
2234            .unwrap(),
2235        );
2236
2237        let snap_small = snap_with_schema(small_schema);
2238        let snap_wide = snap_with_schema(wide_schema);
2239
2240        let schema_str_delta = snap_wide
2241            .table_configuration()
2242            .metadata()
2243            .schema_string()
2244            .capacity()
2245            - snap_small
2246                .table_configuration()
2247                .metadata()
2248                .schema_string()
2249                .capacity();
2250        let heap_delta = snap_wide.estimated_owned_heap_size_bytes()
2251            - snap_small.estimated_owned_heap_size_bytes();
2252        // Tables differ only in schemaString, so heap_delta should be approximately the
2253        // schema_str_delta.
2254        let ratio = heap_delta as f64 / schema_str_delta as f64;
2255        assert!(
2256            (0.8..=1.2).contains(&ratio),
2257            "heap_delta {heap_delta} should be within 20% of schema_str_delta {schema_str_delta} (ratio = {ratio:.3})"
2258        );
2259    }
2260
2261    #[test]
2262    fn estimated_owned_heap_size_for_version_zero() {
2263        let table = TestTableBuilder::new().build().unwrap();
2264        let engine = SyncEngine::new_with_store(table.store().clone());
2265        let snapshot = Snapshot::builder_for(table.table_root())
2266            .build(&engine)
2267            .unwrap();
2268
2269        let heap = snapshot.estimated_owned_heap_size_bytes();
2270        assert!(heap > 0, "heap size should be nonzero");
2271        assert!(
2272            heap < 10000,
2273            "heap size {heap} unexpectedly large for v0 snapshot"
2274        );
2275    }
2276
2277    fn snapshot_with_extra_files<F>(baseline: &SnapshotRef, mutate: F) -> Snapshot
2278    where
2279        F: FnOnce(&mut LogSegmentFiles, &Url),
2280    {
2281        let mut new_log_segment = baseline.log_segment().clone();
2282        mutate(&mut new_log_segment.listed, &new_log_segment.log_root);
2283        Snapshot::new(new_log_segment, baseline.table_configuration().clone()).unwrap()
2284    }
2285}