buoyant_kernel/log_segment/mod.rs

//! Represents a segment of a delta log. [`LogSegment`] wraps a set of checkpoint and commit
//! files.
use std::num::NonZero;
use std::sync::{Arc, LazyLock};

use delta_kernel_derive::internal_api;
use itertools::Itertools;
use tracing::{debug, info, instrument, warn};
use url::Url;

use crate::actions::visitors::SidecarVisitor;
use crate::actions::{
    get_log_add_schema, schema_contains_file_actions, Sidecar, DOMAIN_METADATA_NAME, METADATA_NAME,
    PROTOCOL_NAME, SET_TRANSACTION_NAME, SIDECAR_NAME,
};
use crate::committer::CatalogCommit;
use crate::expressions::ColumnName;
use crate::last_checkpoint_hint::{LastCheckpointHint, LastCheckpointHintSummary};
use crate::log_reader::commit::CommitReader;
use crate::log_replay::ActionsBatch;
#[internal_api]
use crate::log_segment_files::LogSegmentFiles;
use crate::metrics::MetricId;
use crate::path::LogPathFileType::*;
use crate::path::{LogPathFileType, ParsedLogPath};
use crate::schema::compare::SchemaComparison;
use crate::schema::{DataType, SchemaRef, StructField, StructType, ToSchema as _};
use crate::utils::require;
use crate::{
    DeltaResult, Engine, Error, Expression, FileMeta, Predicate, PredicateRef, RowVisitor,
    StorageHandler, Version,
};

mod domain_metadata_replay;
mod protocol_metadata_replay;

pub(crate) use domain_metadata_replay::DomainMetadataMap;

#[cfg(test)]
mod crc_tests;
#[cfg(test)]
mod tests;

/// Information about checkpoint reading for data skipping optimization.
///
/// Returned alongside the actions iterator from checkpoint reading functions.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[internal_api]
pub(crate) struct CheckpointReadInfo {
    /// Whether the checkpoint has compatible pre-parsed stats for data skipping.
    /// When `true`, checkpoint batches can use stats_parsed directly instead of parsing JSON.
    #[allow(unused)]
    pub has_stats_parsed: bool,
    /// Whether the checkpoint has compatible pre-parsed partition values.
    /// When `true`, checkpoint batches can read typed partition values directly from
    /// `partitionValues_parsed` instead of parsing strings from `partitionValues`.
    #[serde(default)]
    #[allow(unused)]
    pub has_partition_values_parsed: bool,
    /// The schema used to read checkpoint files, potentially including stats_parsed.
    #[allow(unused)]
    pub checkpoint_read_schema: SchemaRef,
}

impl CheckpointReadInfo {
    /// Create a CheckpointReadInfo configured to read checkpoints without using stats_parsed.
    /// This is the standard configuration when stats_parsed optimization is not available.
    #[allow(unused)]
    pub(crate) fn without_stats_parsed() -> Self {
        Self {
            has_stats_parsed: false,
            has_partition_values_parsed: false,
            checkpoint_read_schema: get_log_add_schema().clone(),
        }
    }
}
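
// For illustration only (a hypothetical caller sketch, not code used by this module):
// callers that cannot use the stats_parsed optimization fall back to the default
// configuration, which reads checkpoints with the plain add-action schema:
//
//     let info = CheckpointReadInfo::without_stats_parsed();
//     assert!(!info.has_stats_parsed);
//     assert!(!info.has_partition_values_parsed);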

/// Result of reading actions from a log segment, containing both the actions iterator
/// and checkpoint metadata.
///
/// This struct provides named access to the return values instead of tuple indexing.
#[internal_api]
pub(crate) struct ActionsWithCheckpointInfo<A: Iterator<Item = DeltaResult<ActionsBatch>>> {
    /// Iterator over action batches read from the log segment.
    pub actions: A,
    /// Metadata about checkpoint reading, including the schema used.
    #[allow(unused)]
    pub checkpoint_info: CheckpointReadInfo,
}

/// A [`LogSegment`] represents a contiguous section of the log, made of checkpoint files and
/// commit files. It guarantees the following:
///     1. Commit file versions will not have any gaps between them.
///     2. If a checkpoint is present in the range, only commits with versions greater than the
///        most recent checkpoint version are retained. There will not be a gap between the
///        checkpoint version and the first commit version.
///     3. All checkpoint_parts must belong to the same checkpoint version and must form a
///        complete checkpoint. Multi-part checkpoints must have all their parts.
///
/// [`LogSegment`] is used in [`Snapshot`] when built with [`LogSegment::for_snapshot`], and
/// in `TableChanges` when built with [`LogSegment::for_table_changes`].
///
/// [`Snapshot`]: crate::snapshot::Snapshot
#[derive(Debug, Clone, PartialEq, Eq)]
#[internal_api]
pub(crate) struct LogSegment {
    pub end_version: Version,
    pub checkpoint_version: Option<Version>,
    pub log_root: Url,
    /// The set of log files found during listing.
    pub listed: LogSegmentFiles,

    /// Metadata from the `_last_checkpoint` hint file.
    ///
    /// Note: This is only populated if the hint file was read during creation of this
    /// log segment. The hint may describe a different checkpoint version than the one in this
    /// segment. Callers should use explicit getters (such as [`Self::checkpoint_schema`]) rather
    /// than reading this field directly.
    last_checkpoint_metadata: Option<LastCheckpointHintSummary>,
}

/// Returns the identifying leaf column path for a known action type, used to build IS NOT NULL
/// predicates that enable row group skipping in checkpoint parquet files.
///
/// For `txn`, this is effective because all app ids end up in a single checkpoint part when
/// partitioned by `add.path` as the Delta spec requires. Filtering by a specific app id is not
/// worthwhile since all app ids share one part with a large min/max range (typically UUIDs).
fn action_identifying_column(action_name: &str) -> Option<ColumnName> {
    match action_name {
        METADATA_NAME => Some(ColumnName::new([METADATA_NAME, "id"])),
        PROTOCOL_NAME => Some(ColumnName::new([PROTOCOL_NAME, "minReaderVersion"])),
        SET_TRANSACTION_NAME => Some(ColumnName::new([SET_TRANSACTION_NAME, "appId"])),
        DOMAIN_METADATA_NAME => Some(ColumnName::new([DOMAIN_METADATA_NAME, "domain"])),
        _ => None,
    }
}

/// Builds an IS NOT NULL predicate for row group skipping based on the action types in `schema`.
/// Returns `None` if any top-level field in the schema is not a recognized action type, since
/// an unknown type could have non-null rows in the same row group, making skipping unsafe.
fn schema_to_is_not_null_predicate(schema: &StructType) -> Option<PredicateRef> {
    // Collect identifying columns for every field; short-circuit to None on any unknown field.
    let columns: Vec<ColumnName> = schema
        .fields()
        .map(|f| action_identifying_column(f.name()))
        .collect::<Option<_>>()?;
    let mut predicates = columns
        .into_iter()
        .map(|col| Expression::column(col).is_not_null());
    let first = predicates.next()?;
    Some(Arc::new(predicates.fold(first, Predicate::or)))
}
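
// For illustration only (a hedged sketch of the resulting predicate shape, not code used by
// this module): for a schema whose top-level fields are `protocol` and `metaData`, the
// function above produces a predicate equivalent to:
//
//     protocol.minReaderVersion IS NOT NULL OR metaData.id IS NOT NULL
//
// A row group where both columns are entirely null contains neither action and can be skipped.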

impl LogSegment {
    /// Creates a LogSegment for a newly created table at version 0 from a single commit file.
    ///
    /// Normal log segments are built by listing files from storage and replaying them. For CREATE
    /// TABLE, the table has no prior log. We construct the segment directly from the just created
    /// commit file.
    ///
    /// # Errors
    ///
    /// Returns an `internal_error` if `commit_file` is not version 0 or not a commit file type.
    pub(crate) fn new_for_version_zero(
        log_root: Url,
        commit_file: ParsedLogPath,
    ) -> DeltaResult<Self> {
        require!(
            commit_file.version == 0,
            crate::Error::internal_error(format!(
                "new_for_version_zero called with version {}",
                commit_file.version
            ))
        );
        require!(
            commit_file.is_commit(),
            crate::Error::internal_error(format!(
                "new_for_version_zero called with non-commit file type: {:?}",
                commit_file.file_type
            ))
        );
        Ok(Self {
            end_version: commit_file.version,
            checkpoint_version: None,
            log_root,
            last_checkpoint_metadata: None,
            listed: LogSegmentFiles {
                max_published_version: Some(commit_file.version),
                latest_commit_file: Some(commit_file.clone()),
                ascending_commit_files: vec![commit_file],
                ..Default::default()
            },
        })
    }

    #[internal_api]
    pub(crate) fn try_new(
        mut listed_files: LogSegmentFiles,
        log_root: Url,
        end_version: Option<Version>,
        last_checkpoint_metadata: Option<LastCheckpointHintSummary>,
    ) -> DeltaResult<Self> {
        validate_compaction_files(&listed_files.ascending_compaction_files)?;
        validate_checkpoint_parts(&listed_files.checkpoint_parts)?;
        validate_commit_file_types(&listed_files.ascending_commit_files)?;
        validate_commit_files_contiguous(&listed_files.ascending_commit_files)?;

        // Filter out commits at or before the checkpoint version
        let checkpoint_version =
            if let Some(checkpoint_file) = listed_files.checkpoint_parts.first() {
                let version = checkpoint_file.version;
                listed_files
                    .ascending_commit_files
                    .retain(|log_path| version < log_path.version);
                Some(version)
            } else {
                None
            };

        validate_checkpoint_commit_gap(checkpoint_version, &listed_files.ascending_commit_files)?;
        let effective_version = validate_end_version(
            &listed_files.ascending_commit_files,
            &listed_files.checkpoint_parts,
            end_version,
        )?;
        // NOTE: This validation was added at `cc32593ce3040b4c6c9c16f1bf16ff57e42a5fde` which
        // breaks `test_validate_listed_log_file_in_order_compaction_files` in buoyant_kernel
        // presumably because upstream has disabled all the log compaction tests while
        // buoyant_kernel has kept them in place.
        //validate_latest_commit_file(&listed_files, effective_version)?;

        let log_segment = LogSegment {
            end_version: effective_version,
            checkpoint_version,
            log_root,
            last_checkpoint_metadata,
            listed: listed_files,
        };

        info!(segment = %log_segment.summary());

        Ok(log_segment)
    }

    /// Returns the checkpoint version from the `_last_checkpoint` hint.
    pub(crate) fn last_checkpoint_version(&self) -> Option<Version> {
        self.last_checkpoint_metadata.as_ref().map(|m| m.version)
    }

    /// Returns the checkpoint schema from the `_last_checkpoint` hint when it is safe to use for
    /// this segment's checkpoint parquet.
    ///
    /// Returns `None` if there is no hint, if the hint omitted `checkpointSchema`, if this segment
    /// has no checkpoint on disk, or if the hint's checkpoint version does not match this segment's
    /// checkpoint version.
    pub(crate) fn checkpoint_schema(&self) -> Option<SchemaRef> {
        let m = self.last_checkpoint_metadata.as_ref()?;
        if Some(m.version) != self.checkpoint_version {
            return None;
        }
        m.schema.clone()
    }

    /// Returns a copy of the stored `_last_checkpoint` hint summary.
    ///
    /// Prefer [`Self::checkpoint_schema`] or [`Self::last_checkpoint_version`] when only
    /// individual values from the hint are needed.
    pub(crate) fn last_checkpoint_hint_summary(&self) -> Option<LastCheckpointHintSummary> {
        self.last_checkpoint_metadata.clone()
    }

    /// Succinct summary string for logging purposes.
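    /// e.g. `{v=12, commits=3, checkpoint_v=9, checkpoint_parts=1, compactions=0, crc_v=none, max_pub_v=12}`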
    fn summary(&self) -> String {
        format!(
            "{{v={}, commits={}, checkpoint_v={}, checkpoint_parts={}, compactions={}, crc_v={}, max_pub_v={}}}",
            self.end_version,
            self.listed.ascending_commit_files.len(),
            self.checkpoint_version
                .map(|v| v.to_string())
                .unwrap_or_else(|| "none".into()),
            self.listed.checkpoint_parts.len(),
            self.listed.ascending_compaction_files.len(),
            self.listed.latest_crc_file
                .as_ref()
                .map(|f| f.version.to_string())
                .unwrap_or_else(|| "none".into()),
            self.listed.max_published_version
                .map(|v| v.to_string())
                .unwrap_or_else(|| "none".into()),
        )
    }

    /// Constructs a [`LogSegment`] to be used for [`Snapshot`]. For a `Snapshot` at version `n`,
    /// its LogSegment is made of zero or one checkpoint and all commits after the checkpoint, up
    /// to and including the end version `n`. Note that a checkpoint may be made of multiple
    /// parts. All these parts will have the same checkpoint version.
    ///
    /// The options for constructing a LogSegment for Snapshot are as follows:
    /// - `checkpoint_hint`: a `LastCheckpointHint` to start the log segment from (e.g. from
    ///   reading the `_last_checkpoint` file).
    /// - `time_travel_version`: The version of the log that the Snapshot will be at.
    ///
    /// [`Snapshot`]: crate::snapshot::Snapshot

    /// Reports metrics: `LogSegmentLoaded`.
    // Span name must match `SEGMENT_FOR_SNAPSHOT_SPAN` in `metrics::reporter`.
    #[instrument(
        name = "segment.for_snapshot",
        err,
        skip(storage, time_travel_version),
        fields(report, operation_id = %operation_id, num_commit_files, num_checkpoint_files, num_compaction_files)
    )]
    #[internal_api]
    pub(crate) fn for_snapshot(
        storage: &dyn StorageHandler,
        log_root: Url,
        log_tail: Vec<ParsedLogPath>,
        time_travel_version: impl Into<Option<Version>>,
        operation_id: MetricId,
    ) -> DeltaResult<Self> {
        let time_travel_version = time_travel_version.into();
        let checkpoint_hint = LastCheckpointHint::try_read(storage, &log_root)?;
        let result = Self::for_snapshot_impl(
            storage,
            log_root,
            log_tail,
            checkpoint_hint,
            time_travel_version,
        );

        let log_segment = result?;
        tracing::Span::current().record(
            "num_commit_files",
            log_segment.listed.ascending_commit_files.len() as u64,
        );
        tracing::Span::current().record(
            "num_checkpoint_files",
            log_segment.listed.checkpoint_parts.len() as u64,
        );
        tracing::Span::current().record(
            "num_compaction_files",
            log_segment.listed.ascending_compaction_files.len() as u64,
        );
        Ok(log_segment)
    }

    // factored out for testing
    pub(crate) fn for_snapshot_impl(
        storage: &dyn StorageHandler,
        log_root: Url,
        log_tail: Vec<ParsedLogPath>,
        checkpoint_hint: Option<LastCheckpointHint>,
        time_travel_version: Option<Version>,
    ) -> DeltaResult<Self> {
        let last_checkpoint_summary =
            checkpoint_hint
                .as_ref()
                .map(|hint| LastCheckpointHintSummary {
                    version: hint.version,
                    schema: hint.checkpoint_schema.clone(),
                });

        // The end_version is the time_travel_version, if present
        // TODO: When max catalog version is implemented, we would use that as end_version if
        // time_travel_version is not present
        let end_version = time_travel_version;

        // Keep the hint only if it points at or before end_version, or if there is no end_version
        // bound
        let usable_hint = checkpoint_hint.filter(|cp| end_version.is_none_or(|v| cp.version <= v));

        // Cases:
        //
        // 1. usable_hint present, end_version is Some  --> list_with_checkpoint_hint from
        //    hint.version TO end_version
        // 2. usable_hint present, end_version is None  --> list_with_checkpoint_hint from
        //    hint.version unbounded
        // 3. no usable_hint,      end_version is Some  --> backward-scan for checkpoint before
        //    end_version, list from that checkpoint TO end_version (falls back to v0 if no
        //    checkpoint found)
        // 4. no usable_hint,      end_version is None  --> list from v0 unbounded

        let listed_files = match (usable_hint, end_version) {
            // Cases 1 and 2
            (Some(cp), end_version) => LogSegmentFiles::list_with_checkpoint_hint(
                &cp,
                storage,
                &log_root,
                log_tail,
                end_version,
            )?,
            // Case 3
            (None, Some(end)) => LogSegmentFiles::list_with_backward_checkpoint_scan(
                storage, &log_root, log_tail, end,
            )?,
            // Case 4
            (None, None) => LogSegmentFiles::list(storage, &log_root, log_tail, None, None)?,
        };

        LogSegment::try_new(
            listed_files,
            log_root,
            time_travel_version,
            last_checkpoint_summary,
        )
    }

    /// Constructs a [`LogSegment`] to be used for `TableChanges`. For a TableChanges between
    /// versions `start_version` and `end_version`: Its LogSegment is made of zero checkpoints
    /// and all commits between versions `start_version` (inclusive) and `end_version`
    /// (inclusive). If no `end_version` is specified it will be the most recent version by
    /// default.
    #[internal_api]
    pub(crate) fn for_table_changes(
        storage: &dyn StorageHandler,
        log_root: Url,
        start_version: Version,
        end_version: impl Into<Option<Version>>,
    ) -> DeltaResult<Self> {
        let end_version = end_version.into();
        if let Some(end_version) = end_version {
            if start_version > end_version {
                return Err(Error::generic(
                    "Failed to build LogSegment: start_version cannot be greater than end_version",
                ));
            }
        }

        // TODO: compactions?
        let listed_files =
            LogSegmentFiles::list_commits(storage, &log_root, Some(start_version), end_version)?;
        // - Here check that the start version is correct.
        // - [`LogSegment::try_new`] will verify that the `end_version` is correct if present.
        // - [`LogSegmentFiles::list_commits`] also checks that there are no gaps between commits.
        // If all three are satisfied, this implies that all the desired commits are present.
        require!(
            listed_files
                .ascending_commit_files()
                .first()
                .is_some_and(|first_commit| first_commit.version == start_version),
            Error::generic(format!(
                "Expected the first commit to have version {start_version}, got {:?}",
                listed_files
                    .ascending_commit_files()
                    .first()
                    .map(|c| c.version)
            ))
        );
        LogSegment::try_new(listed_files, log_root, end_version, None)
    }

    /// Constructs a [`LogSegment`] to be used for timestamp conversion. This [`LogSegment`] will
    /// consist only of contiguous commit files up to `end_version` (inclusive). If present,
    /// `limit` specifies the maximum length of the returned log segment. The log segment may be
    /// shorter than `limit` if there are missing commits.
    // This lists all files starting from `end-limit` if `limit` is defined. For large tables,
    // listing with a `limit` can be a significant speedup over listing _all_ the files in the log.
    #[allow(unused)]
    pub(crate) fn for_timestamp_conversion(
        storage: &dyn StorageHandler,
        log_root: Url,
        end_version: Version,
        limit: Option<NonZero<usize>>,
    ) -> DeltaResult<Self> {
        // Compute the version to start listing from.
        let start_from = limit
            .map(|limit| match NonZero::<Version>::try_from(limit) {
                Ok(limit) => Ok(Version::saturating_sub(end_version, limit.get() - 1)),
                _ => Err(Error::generic(format!(
                    "Invalid limit {limit} when building log segment in timestamp conversion",
                ))),
            })
            .transpose()?;
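        // e.g. with end_version = 10 and limit = 3, start_from = Some(8), so we list at
        // most versions 8, 9, and 10.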

        // this is a list of commits with possible gaps, we want to take the latest contiguous
        // chunk of commits
        let mut listed_commits =
            LogSegmentFiles::list_commits(storage, &log_root, start_from, Some(end_version))?;

        // remove gaps - return latest contiguous chunk of commits
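        // e.g. if listing returned versions [3, 4, 6, 7, 8], the gap after 4 means only the
        // latest contiguous chunk [6, 7, 8] is kept.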
        let commits = listed_commits.ascending_commit_files_mut();
        if !commits.is_empty() {
            let mut start_idx = commits.len() - 1;
            while start_idx > 0 && commits[start_idx].version == 1 + commits[start_idx - 1].version
            {
                start_idx -= 1;
            }
            commits.drain(..start_idx);
        }

        LogSegment::try_new(listed_commits, log_root, Some(end_version), None)
    }

    /// Creates a new LogSegment with the given commit file added to the end.
    /// TODO: Take in multiple commits when Kernel-RS supports txn retries and conflict rebasing.
    #[allow(unused)]
    pub(crate) fn new_with_commit_appended(
        &self,
        tail_commit_file: ParsedLogPath,
    ) -> DeltaResult<Self> {
        require!(
            tail_commit_file.is_commit(),
            Error::internal_error(format!(
                "Cannot extend and create new LogSegment. Tail log file is not a commit file. \
                Path: {}, Type: {:?}.",
                tail_commit_file.location.location, tail_commit_file.file_type
            ))
        );
        require!(
            tail_commit_file.version == self.end_version.wrapping_add(1),
            Error::internal_error(format!(
                "Cannot extend and create new LogSegment. Tail commit file version ({}) does not \
                equal LogSegment end_version ({}) + 1.",
                tail_commit_file.version, self.end_version
            ))
        );

        let mut new_log_segment = self.clone();

        new_log_segment.end_version = tail_commit_file.version;
        new_log_segment
            .listed
            .ascending_commit_files
            .push(tail_commit_file.clone());
        new_log_segment.listed.latest_commit_file = Some(tail_commit_file.clone());
        new_log_segment.listed.max_published_version = match tail_commit_file.file_type {
            LogPathFileType::Commit => Some(tail_commit_file.version),
            _ => self.listed.max_published_version,
        };

        Ok(new_log_segment)
    }

    /// Creates a new LogSegment reflecting a checkpoint written at this segment's version.
    /// The checkpoint must be at `end_version`. Kernel does not write multi-part checkpoints,
    /// so the checkpoint must be a single file (classic parquet or V2 UUID).
    ///
    /// If the existing `latest_crc_file` is older than the new checkpoint version, it is
    /// cleared to preserve the `LogSegmentFiles` invariant that `latest_crc_file.version >=
    /// checkpoint version`.
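    /// e.g. for a segment at `end_version` 10 with a CRC at version 8, adding a checkpoint at
    /// 10 clears the CRC; a CRC already at version 10 would be kept.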
    pub(crate) fn try_new_with_checkpoint(&self, checkpoint: ParsedLogPath) -> DeltaResult<Self> {
        require!(
            matches!(
                checkpoint.file_type,
                LogPathFileType::SinglePartCheckpoint | LogPathFileType::UuidCheckpoint
            ),
            Error::internal_error(format!(
                "Cannot update LogSegment with checkpoint. Path is not a single-file \
                checkpoint. Path: {}, Type: {:?}.",
                checkpoint.location.location, checkpoint.file_type
            ))
        );
        require!(
            checkpoint.version == self.end_version,
            Error::internal_error(format!(
                "Cannot update LogSegment with checkpoint. Checkpoint version ({}) does not \
                equal LogSegment end_version ({}).",
                checkpoint.version, self.end_version
            ))
        );

        let mut new_log_segment = self.clone();
        new_log_segment.checkpoint_version = Some(checkpoint.version);
        let checkpoint_version = checkpoint.version;
        new_log_segment.listed.checkpoint_parts = vec![checkpoint];
        // A snapshot at version N only contains commits and compactions at versions <= N,
        // so a checkpoint at N covers everything and we can clear them entirely.
        new_log_segment.listed.ascending_commit_files.clear();
        new_log_segment.listed.ascending_compaction_files.clear();
        // Preserve the LogSegmentFiles invariant that `latest_crc_file.version >= checkpoint
        // version`. A stale CRC is worse than missing: downstream P&M fallbacks (see
        // `read_protocol_metadata_opt`) would load an older P&M and overwrite the current one.
        new_log_segment
            .listed
            .latest_crc_file
            .take_if(|crc| crc.version < checkpoint_version);
        // TODO(#839): Once CheckpointWriter exposes the output schema, build a
        // LastCheckpointHintSummary and thread it through here instead of None. Today the
        // schema is computed inside checkpoint_data() but not returned. With None, the next
        // scan will read the checkpoint parquet footer to determine the schema (e.g.
        // whether stats_parsed or sidecar columns exist).
        new_log_segment.last_checkpoint_metadata = None;
        Ok(new_log_segment)
    }

    /// Creates a new LogSegment with the given CRC file recorded as the latest.
    /// The CRC file must be at `end_version`.
    pub(crate) fn try_new_with_crc_file(&self, crc_file: ParsedLogPath<Url>) -> DeltaResult<Self> {
        require!(
            crc_file.file_type == LogPathFileType::Crc,
            Error::internal_error(format!(
                "Cannot update LogSegment with CRC. Path is not a CRC file. \
                Path: {}, Type: {:?}.",
                crc_file.location, crc_file.file_type
            ))
        );
        require!(
            crc_file.version == self.end_version,
            Error::internal_error(format!(
                "Cannot update LogSegment with CRC. CRC version ({}) does not \
                equal LogSegment end_version ({}).",
                crc_file.version, self.end_version
            ))
        );
        // Convert to FileMeta with placeholder metadata (size=0, last_modified=0).
        // Only the URL matters for CRC files: downstream code uses it for version
        // tracking and reading CRC content via `try_read_crc_file`. Neither `size`
        // nor `last_modified` is ever accessed.
        let crc_file = ParsedLogPath {
            location: FileMeta {
                location: crc_file.location,
                last_modified: 0,
                size: 0,
            },
            filename: crc_file.filename,
            extension: crc_file.extension,
            version: crc_file.version,
            file_type: crc_file.file_type,
        };
        let mut new_log_segment = self.clone();
        new_log_segment.listed.latest_crc_file = Some(crc_file);
        Ok(new_log_segment)
    }

    pub(crate) fn new_as_published(&self) -> DeltaResult<Self> {
        // In the future, we can additionally convert the staged commit files to published commit
        // files. That would require faking their FileMeta locations.
        let mut new_log_segment = self.clone();
        new_log_segment.listed.max_published_version = Some(self.end_version);
        Ok(new_log_segment)
    }

    pub(crate) fn get_unpublished_catalog_commits(&self) -> DeltaResult<Vec<CatalogCommit>> {
        self.listed
            .ascending_commit_files
            .iter()
            .filter(|file| file.file_type == LogPathFileType::StagedCommit)
            .filter(|file| {
                self.listed
                    .max_published_version
                    .is_none_or(|v| file.version > v)
            })
            .map(|file| CatalogCommit::try_new(&self.log_root, file))
            .collect()
    }

    /// Read a stream of actions from this log segment. This returns an iterator of
    /// [`ActionsBatch`]s which includes EngineData of actions + a boolean flag indicating whether
    /// the data was read from a commit file (true) or a checkpoint file (false).
    ///
    /// The log files will be read from most recent to oldest.
    ///
    /// `commit_read_schema` is the (physical) schema to read the commit files with, and
    /// `checkpoint_read_schema` is the (physical) schema to read checkpoint files with. This can be
    /// used to project the log files to a subset of the columns. Having two different
    /// schemas can be useful as a cheap way of doing additional filtering on the checkpoint files
    /// (e.g. filtering out remove actions).
    ///
    /// The engine data returned might have extra non-log actions (e.g. sidecar
    /// actions) that are not part of the schema but this is an implementation
    /// detail that should not be relied on and will likely change.
    ///
    /// Also returns `CheckpointReadInfo` with stats_parsed compatibility and the checkpoint schema.
    ///
    /// `meta_predicate` is an optional expression for row group skipping in checkpoint parquet
    /// files. It is _NOT_ the query's data predicate, but a hint for skipping irrelevant data.
    /// IS NOT NULL predicates are automatically derived from `checkpoint_read_schema` and combined
    /// (AND) with `meta_predicate`, so callers only need to supply query-based skipping predicates.
    #[internal_api]
    pub(crate) fn read_actions_with_projected_checkpoint_actions(
        &self,
        engine: &dyn Engine,
        commit_read_schema: SchemaRef,
        checkpoint_read_schema: SchemaRef,
        meta_predicate: Option<PredicateRef>,
        stats_schema: Option<&StructType>,
        partition_schema: Option<&StructType>,
    ) -> DeltaResult<
        ActionsWithCheckpointInfo<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send>,
    > {
        // Combine schema-derived IS NOT NULL predicate with any caller-supplied predicate so
        // checkpoint parquet row groups without any relevant action type can be skipped.
        // TODO: The semantics of `meta_predicate` will change in a follow-up PR.
        let is_not_null_pred = schema_to_is_not_null_predicate(&checkpoint_read_schema);
        let effective_predicate = match (is_not_null_pred, meta_predicate) {
            (None, x) | (x, None) => x,
            (Some(a), Some(b)) => Some(Arc::new(Predicate::and((*a).clone(), (*b).clone()))),
        };

        // `replay` expects commit files to be sorted in descending order, so the return value here
        // is correct
        let commit_stream = CommitReader::try_new(engine, self, commit_read_schema)?;

        let checkpoint_result = self.create_checkpoint_stream(
            engine,
            checkpoint_read_schema,
            effective_predicate,
            stats_schema,
            partition_schema,
        )?;

        Ok(ActionsWithCheckpointInfo {
            actions: commit_stream.chain(checkpoint_result.actions),
            checkpoint_info: checkpoint_result.checkpoint_info,
        })
    }
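
    // For illustration only (a hedged, hypothetical caller sketch; the variable names are
    // assumptions): replaying file actions with a narrow checkpoint projection while commits
    // use a wider schema:
    //
    //     let result = segment.read_actions_with_projected_checkpoint_actions(
    //         engine,
    //         commit_schema,                 // commits: full action schema
    //         get_log_add_schema().clone(),  // checkpoints: add actions only
    //         None, // no extra skipping predicate; IS NOT NULL is derived automatically
    //         None, // no stats_parsed projection
    //         None, // no partitionValues_parsed projection
    //     )?;
    //     for batch in result.actions { /* feed into log replay */ }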

    /// Same as [`Self::read_actions_with_projected_checkpoint_actions`], but uses the same schema
    /// for reading checkpoints and commits. IS NOT NULL predicates are automatically derived from
    /// the schema, so callers do not need to supply them.
    #[internal_api]
    pub(crate) fn read_actions(
        &self,
        engine: &dyn Engine,
        action_schema: SchemaRef,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send> {
        let result = self.read_actions_with_projected_checkpoint_actions(
            engine,
            action_schema.clone(),
            action_schema,
            None,
            None,
            None,
        )?;
        Ok(result.actions)
    }

    pub(crate) fn projected_checkpoint_read_info(
        &self,
        engine: &dyn Engine,
        action_schema: SchemaRef,
        stats_schema: Option<&StructType>,
        partition_schema: Option<&StructType>,
    ) -> DeltaResult<CheckpointReadInfo> {
        self.plan_projected_checkpoint_read(engine, action_schema, stats_schema, partition_schema)
            .map(|(checkpoint_info, _)| checkpoint_info)
    }

    pub(crate) fn projected_single_part_checkpoint_read_info(
        &self,
        engine: &dyn Engine,
        action_schema: SchemaRef,
        stats_schema: Option<&StructType>,
        partition_schema: Option<&StructType>,
    ) -> DeltaResult<CheckpointReadInfo> {
        let file_actions_schema = match self.listed.checkpoint_parts.as_slice() {
            [] => None,
            [checkpoint] => match &checkpoint.file_type {
                MultiPartCheckpoint { .. } => {
                    return Err(Error::internal_error(
                        "projected_single_part_checkpoint_read_info called with a multi-part checkpoint",
                    ));
                }
                UuidCheckpoint if checkpoint.extension.as_str() == "json" => None,
                SinglePartCheckpoint | UuidCheckpoint
                    if checkpoint.extension.as_str() == "parquet" =>
                {
                    Some(Self::read_checkpoint_schema(
                        engine,
                        checkpoint,
                        self.checkpoint_schema().as_ref(),
                    )?)
                }
                _ => None,
            },
            _ => {
                return Err(Error::internal_error(
                    "projected_single_part_checkpoint_read_info called with multiple checkpoint parts",
                ));
            }
        };

        Self::build_projected_checkpoint_read_info(
            action_schema,
            file_actions_schema.as_deref(),
            stats_schema,
            partition_schema,
            false,
        )
    }

    pub(crate) fn projected_checkpoint_read_info_from_files(
        engine: &dyn Engine,
        action_schema: SchemaRef,
        stats_schema: Option<&StructType>,
        partition_schema: Option<&StructType>,
        file_actions_files: &[FileMeta],
    ) -> DeltaResult<CheckpointReadInfo> {
        let file_actions_schema = match file_actions_files.first() {
            Some(first) => Some(engine.parquet_handler().read_parquet_footer(first)?.schema),
            None => None,
        };

        Self::build_projected_checkpoint_read_info(
            action_schema,
            file_actions_schema.as_deref(),
            stats_schema,
            partition_schema,
            false,
        )
    }

    /// Find a minimal set of files to cover the range of commits we want. This is greedy, so not
    /// always optimal, but we assume there are rarely overlapping compactions so this is okay.
    /// NB: This returns files in DESCENDING ORDER, as that's what `replay` expects. This function
    /// assumes that all files in `self.ascending_commit_files` and
    /// `self.ascending_compaction_files` are in range for this log segment. This invariant is
    /// maintained by our listing code.
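    ///
    /// For example, with commits `00.json` through `11.json` and a compaction
    /// `00.09.compacted.json`, the selected cover is
    /// `[11.json, 10.json, 00.09.compacted.json]` (descending).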
    pub(crate) fn find_commit_cover(&self) -> Vec<FileMeta> {
        // Create an iterator sorted in ascending order by (initial version, end version), e.g.
        // [00.json, 00.09.compacted.json, 00.99.compacted.json, 01.json, 02.json, ..., 10.json,
        //  10.19.compacted.json, 11.json, ...]
        let all_files = itertools::Itertools::merge_by(
            self.listed.ascending_commit_files.iter(),
            self.listed.ascending_compaction_files.iter(),
            |path_a, path_b| path_a.version <= path_b.version,
        );

        let mut last_pushed: Option<&ParsedLogPath> = None;

        let mut selected_files = vec![];
        for next in all_files {
            match last_pushed {
                // Resolve version number ties in favor of the later file (it covers a wider range)
                Some(prev) if prev.version == next.version => {
                    let removed = selected_files.pop();
                    debug!("Selecting {next:?} rather than {removed:?}, it covers a wider range");
                }
                // Skip later files whose start overlaps with the previous end
                Some(&ParsedLogPath {
                    file_type: LogPathFileType::CompactedCommit { hi },
                    ..
                }) if next.version <= hi => {
                    debug!("Skipping log file {next:?}, it's already covered.");
                    continue;
                }
                _ => {} // just fall through
            }
            debug!("Provisionally selecting {next:?}");
            last_pushed = Some(next);
            selected_files.push(next.location.clone());
        }
        selected_files.reverse();
        selected_files
    }

    /// Determines the file actions schema and extracts sidecar file references for checkpoints.
    ///
    /// This function analyzes the checkpoint to determine:
    /// 1. The file actions schema (for stats_parsed / partitionValues_parsed detection)
    /// 2. Sidecar file references if this is a V2 checkpoint
    ///
    /// The logic is:
    /// - No checkpoint parts: return (None, [])
    /// - Multi-part (always V1, no sidecars): return checkpoint schema directly
    /// - UUID-named JSON (always V2): extract sidecars, read first sidecar's schema
    /// - Classic-named or UUID-named parquet (V1 or V2): read checkpoint schema from hint or
    ///   footer, then check for sidecar column to distinguish
    ///   - Has sidecar column (V2): extract sidecars, read first sidecar's schema
    ///   - No sidecar column (V1): use checkpoint schema directly
    fn get_file_actions_schema_and_sidecars(
        &self,
        engine: &dyn Engine,
    ) -> DeltaResult<(Option<SchemaRef>, Vec<FileMeta>)> {
        // Hint schema from `_last_checkpoint` avoids footer reads when available.
        let hint_schema = self.checkpoint_schema();

        // All parts of a multi-part checkpoint belong to the same table version and follow
        // the same V1 spec, so reading any one part's schema is sufficient.
        let Some(checkpoint) = self.listed.checkpoint_parts.first() else {
            return Ok((None, vec![]));
        };

        match &checkpoint.file_type {
            MultiPartCheckpoint { .. } => {
                // Multi-part checkpoints are always V1 and never have sidecars.
                let schema =
                    Self::read_checkpoint_schema(engine, checkpoint, hint_schema.as_ref())?;
                Ok((Some(schema), vec![]))
            }
            UuidCheckpoint if checkpoint.extension.as_str() == "json" => {
                // JSON checkpoint is always V2. No checkpoint schema is available since JSON
                // checkpoints don't have a parquet footer to read.
                self.read_sidecar_schema_and_files(engine, checkpoint, None)
            }
            SinglePartCheckpoint | UuidCheckpoint if checkpoint.extension.as_str() == "parquet" => {
                // Parquet checkpoint (classic-named or UUID-named): either can be V1 or V2.
                // Check for sidecar column to distinguish.
                let checkpoint_schema =
                    Self::read_checkpoint_schema(engine, checkpoint, hint_schema.as_ref())?;
                if checkpoint_schema.field(SIDECAR_NAME).is_some() {
                    self.read_sidecar_schema_and_files(engine, checkpoint, Some(&checkpoint_schema))
                } else {
                    Ok((Some(checkpoint_schema), vec![]))
                }
            }
            _ => Ok((None, vec![])),
        }
    }

    /// Returns the checkpoint's parquet schema, using the hint from `_last_checkpoint` if
    /// available or reading the parquet footer otherwise.
    fn read_checkpoint_schema(
        engine: &dyn Engine,
        checkpoint: &ParsedLogPath<FileMeta>,
        hint_schema: Option<&SchemaRef>,
    ) -> DeltaResult<SchemaRef> {
        match hint_schema {
            Some(schema) => Ok(schema.clone()),
            None => Ok(engine
                .parquet_handler()
                .read_parquet_footer(&checkpoint.location)?
                .schema),
        }
    }

    /// Extracts sidecar file references and reads the file actions schema from the first
    /// sidecar's parquet footer. If no sidecars exist, falls back to `checkpoint_schema`
    /// since V2 checkpoints may store add actions directly in the main file.
    fn read_sidecar_schema_and_files(
        &self,
        engine: &dyn Engine,
        checkpoint: &ParsedLogPath<FileMeta>,
        checkpoint_schema: Option<&SchemaRef>,
    ) -> DeltaResult<(Option<SchemaRef>, Vec<FileMeta>)> {
        let sidecar_files = self.extract_sidecar_refs(engine, checkpoint)?;
        let file_actions_schema = match sidecar_files.first() {
            Some(first) => Some(engine.parquet_handler().read_parquet_footer(first)?.schema),
            None => checkpoint_schema.cloned(),
        };
        Ok((file_actions_schema, sidecar_files))
    }

    /// Returns an iterator over checkpoint data, processing sidecar files when necessary.
    ///
    /// For checkpoints that need file actions, this function:
    /// 1. Determines the file actions schema (for stats_parsed / partitionValues_parsed detection)
    /// 2. Extracts sidecar file references if present (V2 checkpoints)
    /// 3. Reads checkpoint and sidecar data using cached sidecar refs
    ///
    /// Returns an [`ActionsWithCheckpointInfo`] containing the actions iterator and
    /// [`CheckpointReadInfo`].
    fn create_checkpoint_stream(
        &self,
        engine: &dyn Engine,
        action_schema: SchemaRef,
        meta_predicate: Option<PredicateRef>,
        stats_schema: Option<&StructType>,
        partition_schema: Option<&StructType>,
    ) -> DeltaResult<
        ActionsWithCheckpointInfo<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send>,
    > {
        let (checkpoint_info, sidecar_files) = self.plan_projected_checkpoint_read(
            engine,
            action_schema,
            stats_schema,
            partition_schema,
        )?;
        let augmented_checkpoint_read_schema = checkpoint_info.checkpoint_read_schema.clone();

        let checkpoint_file_meta: Vec<_> = self
            .listed
            .checkpoint_parts
            .iter()
            .map(|f| f.location.clone())
            .collect();

        let parquet_handler = engine.parquet_handler();

        // Historically, we had a shared file reader trait for JSON and Parquet handlers,
        // but it was removed to avoid unnecessary coupling. This is a concrete case
        // where it *could* have been useful, but for now, we're keeping them separate.
        // If similar patterns start appearing elsewhere, we should reconsider that decision.
        let actions = match self.listed.checkpoint_parts.first() {
            Some(parsed_log_path) if parsed_log_path.extension == "json" => {
                engine.json_handler().read_json_files(
                    &checkpoint_file_meta,
                    augmented_checkpoint_read_schema.clone(),
                    meta_predicate.clone(),
                )?
            }
            Some(parsed_log_path) if parsed_log_path.extension == "parquet" => parquet_handler
                .read_parquet_files(
                    &checkpoint_file_meta,
                    augmented_checkpoint_read_schema.clone(),
                    meta_predicate.clone(),
                )?,
            Some(parsed_log_path) => {
                return Err(Error::generic(format!(
                    "Unsupported checkpoint file type: {}",
                    parsed_log_path.extension,
                )));
            }
            // This is the case when there are no checkpoints in the log segment
            // so we return an empty iterator
            None => Box::new(std::iter::empty()),
        };

        // Read sidecars with the same schema as checkpoint (including stats_parsed if available).
        // The sidecar column will be null in sidecar batches, which is harmless.
        // Both checkpoint and sidecar parquet files share the same `add.stats_parsed.*` column
        // layout, so we reuse the same predicate for row group skipping.
        let sidecar_batches = if !sidecar_files.is_empty() {
            parquet_handler.read_parquet_files(
                &sidecar_files,
                augmented_checkpoint_read_schema.clone(),
                meta_predicate,
            )?
        } else {
            Box::new(std::iter::empty())
        };

        // Chain checkpoint batches with sidecar batches.
        // The boolean flag indicates whether the batch originated from a commit file
        // (true) or a checkpoint file (false).
        let actions_iter = actions
            .map_ok(|batch| ActionsBatch::new(batch, false))
            .chain(sidecar_batches.map_ok(|batch| ActionsBatch::new(batch, false)));

        let checkpoint_info = CheckpointReadInfo {
            has_stats_parsed: checkpoint_info.has_stats_parsed,
            has_partition_values_parsed: checkpoint_info.has_partition_values_parsed,
            checkpoint_read_schema: augmented_checkpoint_read_schema,
        };
        Ok(ActionsWithCheckpointInfo {
            actions: actions_iter,
            checkpoint_info,
        })
    }

    fn plan_projected_checkpoint_read(
        &self,
        engine: &dyn Engine,
        action_schema: SchemaRef,
        stats_schema: Option<&StructType>,
        partition_schema: Option<&StructType>,
    ) -> DeltaResult<(CheckpointReadInfo, Vec<FileMeta>)> {
        let need_file_actions = schema_contains_file_actions(&action_schema);

        let (file_actions_schema, sidecar_files) = if need_file_actions {
            self.get_file_actions_schema_and_sidecars(engine)?
        } else {
            (None, vec![])
        };

        let needs_sidecar = need_file_actions && !sidecar_files.is_empty();
        let checkpoint_info = Self::build_projected_checkpoint_read_info(
            action_schema,
            file_actions_schema.as_deref(),
            stats_schema,
            partition_schema,
            needs_sidecar,
        )?;

        Ok((checkpoint_info, sidecar_files))
    }

    fn build_projected_checkpoint_read_info(
        action_schema: SchemaRef,
        file_actions_schema: Option<&StructType>,
        stats_schema: Option<&StructType>,
        partition_schema: Option<&StructType>,
        include_sidecar_field: bool,
    ) -> DeltaResult<CheckpointReadInfo> {
        let has_stats_parsed =
            stats_schema
                .zip(file_actions_schema)
                .is_some_and(|(stats, file_schema)| {
                    Self::schema_has_compatible_stats_parsed(file_schema, stats)
                });

        let has_partition_values_parsed = partition_schema
            .zip(file_actions_schema)
            .is_some_and(|(ps, fs)| Self::schema_has_compatible_partition_values_parsed(fs, ps));

        let needs_add_augmentation = has_stats_parsed || has_partition_values_parsed;
        let checkpoint_read_schema = if needs_add_augmentation || include_sidecar_field {
            let mut new_fields: Vec<StructField> = if let (true, Some(add_field)) =
                (needs_add_augmentation, action_schema.field("add"))
            {
                let DataType::Struct(add_struct) = add_field.data_type() else {
                    return Err(Error::internal_error(
                        "add field in action schema must be a struct",
                    ));
                };
                let mut add_fields: Vec<StructField> = add_struct.fields().cloned().collect();

                if let (true, Some(ss)) = (has_stats_parsed, stats_schema) {
                    add_fields.push(StructField::nullable(
                        "stats_parsed",
                        DataType::Struct(Box::new(ss.clone())),
                    ));
                }

                if let (true, Some(ps)) = (has_partition_values_parsed, partition_schema) {
                    add_fields.push(StructField::nullable(
                        "partitionValues_parsed",
                        DataType::Struct(Box::new(ps.clone())),
                    ));
                }

                action_schema
                    .fields()
                    .map(|field| {
                        if field.name() == "add" {
                            StructField::new(
                                add_field.name(),
                                StructType::new_unchecked(add_fields.clone()),
                                add_field.is_nullable(),
                            )
                            .with_metadata(add_field.metadata.clone())
                        } else {
                            field.clone()
                        }
                    })
                    .collect()
            } else {
                action_schema.fields().cloned().collect()
            };

            if include_sidecar_field {
                new_fields.push(StructField::nullable(SIDECAR_NAME, Sidecar::to_schema()));
            }

            Arc::new(StructType::new_unchecked(new_fields))
        } else {
            action_schema
        };

        Ok(CheckpointReadInfo {
            has_stats_parsed,
            has_partition_values_parsed,
            checkpoint_read_schema,
        })
    }
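
    // For illustration: when both `has_stats_parsed` and `has_partition_values_parsed` are
    // true, the function above extends the `add` struct of the read schema roughly as
    // (a sketch of the shape, not literal syntax):
    //
    //     add: struct {
    //         ...original add fields...,
    //         stats_parsed: struct { ...stats_schema fields... },
    //         partitionValues_parsed: struct { ...partition_schema fields... },
    //     }
    //
    // and, for V2 checkpoints, appends a nullable top-level `sidecar` field.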

    /// Extracts sidecar file references from a checkpoint file.
    fn extract_sidecar_refs(
        &self,
        engine: &dyn Engine,
        checkpoint: &ParsedLogPath,
    ) -> DeltaResult<Vec<FileMeta>> {
        // Read checkpoint with just the sidecar column
        let batches = match checkpoint.extension.as_str() {
            "json" => engine.json_handler().read_json_files(
                std::slice::from_ref(&checkpoint.location),
                Self::sidecar_read_schema(),
                None,
            )?,
            "parquet" => engine.parquet_handler().read_parquet_files(
                std::slice::from_ref(&checkpoint.location),
                Self::sidecar_read_schema(),
                None,
            )?,
            _ => return Ok(vec![]),
        };

        // Extract sidecar file references
        let mut visitor = SidecarVisitor::default();
        for batch_result in batches {
            let batch = batch_result?;
            visitor.visit_rows_of(batch.as_ref())?;
        }

        // Convert to FileMeta
        visitor
            .sidecars
            .iter()
            .map(|sidecar| sidecar.to_filemeta(&self.log_root))
            .try_collect()
    }

    /// Creates a pruned LogSegment for replay *after* a CRC at `start_v_exclusive`.
    ///
    /// The CRC covers protocol, metadata, and checkpoint state, so this segment drops
    /// checkpoint files, CRC files, and last checkpoint metadata. Only commits and compactions
    /// in `(start_v_exclusive, end_version]` are retained.
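    ///
    /// e.g. for a segment with commits `0..=10` and a CRC at version 7, `segment_after_crc(7)`
    /// retains only commits `8..=10` and drops all checkpoint and CRC state.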
    pub(crate) fn segment_after_crc(&self, start_v_exclusive: Version) -> Self {
        let (commits, compactions) =
            self.filtered_commits_and_compactions(Some(start_v_exclusive), self.end_version);
        LogSegment {
            end_version: self.end_version,
            checkpoint_version: None,
            log_root: self.log_root.clone(),
            last_checkpoint_metadata: None,
            listed: LogSegmentFiles {
                ascending_commit_files: commits,
                ascending_compaction_files: compactions,
                checkpoint_parts: vec![],
                latest_crc_file: None,
                latest_commit_file: None,
                max_published_version: None,
            },
        }
    }

    /// Creates a pruned LogSegment for replay *before* a CRC at `end_v_inclusive`.
    ///
    /// Used as a fallback when the CRC at `end_v_inclusive` fails to load. Falls back to
    /// checkpoint-based replay, so checkpoint files and metadata are preserved. Only commits
    /// and compactions in `(checkpoint_version, end_v_inclusive]` are retained. Fields not
    /// needed for this replay path (CRC file, latest commit file) are dropped.
    pub(crate) fn segment_through_crc(&self, end_v_inclusive: Version) -> Self {
        let (commits, compactions) =
            self.filtered_commits_and_compactions(self.checkpoint_version, end_v_inclusive);
        LogSegment {
            end_version: self.end_version,
            checkpoint_version: self.checkpoint_version,
            log_root: self.log_root.clone(),
            last_checkpoint_metadata: self.last_checkpoint_metadata.clone(),
            listed: LogSegmentFiles {
                ascending_commit_files: commits,
                ascending_compaction_files: compactions,
                checkpoint_parts: self.listed.checkpoint_parts.clone(),
                latest_crc_file: None,
                latest_commit_file: None,
                max_published_version: None,
            },
        }
    }
1227
1228    /// Filters commits and compactions to those within `(lo_exclusive, hi_inclusive]`.
1229    /// If `lo_exclusive` is `None`, there is no lower bound.
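    ///
    /// For example (hypothetical versions), with commits at versions 3..=10,
    /// `filtered_commits_and_compactions(Some(5), 8)` keeps commits 6, 7, and 8, plus any
    /// compaction whose start version is greater than 5 and whose end version is at most 8.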
1230    fn filtered_commits_and_compactions(
1231        &self,
1232        lo_exclusive: Option<Version>,
1233        hi_inclusive: Version,
1234    ) -> (Vec<ParsedLogPath>, Vec<ParsedLogPath>) {
1235        let above_lo = |v: Version| lo_exclusive.is_none_or(|lo| lo < v);
1236        let commits = self
1237            .listed
1238            .ascending_commit_files
1239            .iter()
1240            .filter(|c| above_lo(c.version) && c.version <= hi_inclusive)
1241            .cloned()
1242            .collect();
1243        let compactions = self
1244            .listed
1245            .ascending_compaction_files
1246            .iter()
1247            .filter(|c| {
1248                matches!(
1249                    c.file_type,
1250                    LogPathFileType::CompactedCommit { hi }
1251                        if above_lo(c.version) && hi <= hi_inclusive
1252                )
1253            })
1254            .cloned()
1255            .collect();
1256        (commits, compactions)
1257    }
1258
1259    /// How many commits since a checkpoint, according to this log segment.
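    ///
    /// For example, a segment with a checkpoint at version 10 and `end_version` 13 has
    /// seen `13 - 10 = 3` commits since the checkpoint; with no checkpoint, this is
    /// simply `end_version`.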
1260    pub(crate) fn commits_since_checkpoint(&self) -> u64 {
1261        // If there is no checkpoint, treat its version as 0: `end_version - 0` then counts
1262        // every commit in the segment, which is the correct result when no checkpoint exists.
1263        let checkpoint_version = self.checkpoint_version.unwrap_or(0);
1264        debug_assert!(checkpoint_version <= self.end_version);
1265        self.end_version - checkpoint_version
1266    }
1267
1268    /// How many commits since a log-compaction or checkpoint, according to this log segment.
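    ///
    /// For example, with a checkpoint at version 10, a compaction covering versions 11
    /// through 14, and `end_version` 16, this returns `16 - 14 = 2`.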
1269    pub(crate) fn commits_since_log_compaction_or_checkpoint(&self) -> u64 {
1270        // We have to scan every compaction file to determine this: the vec is sorted only by
1271        // start version, so the maximum end version could be anywhere in it. The fold starts
1272        // at 0 because `end_version - 0` is the correct number of commits since compaction
1273        // when there are no compactions.
1274        let max_compaction_end = self
1275            .listed
1276            .ascending_compaction_files
1277            .iter()
1278            .fold(0, |cur, f| {
1279                if let &ParsedLogPath {
1280                    file_type: LogPathFileType::CompactedCommit { hi },
1281                    ..
1282                } = f
1283                {
1284                    Version::max(cur, hi)
1285                } else {
1286                    warn!("Found invalid ParsedLogPath in ascending_compaction_files: {f:?}");
1287                    cur
1288                }
1289            });
1290        // Subtract the larger of the checkpoint version and the maximum compaction end version.
1291        let to_sub = Version::max(self.checkpoint_version.unwrap_or(0), max_compaction_end);
1292        debug_assert!(to_sub <= self.end_version);
1293        self.end_version - to_sub
1294    }
1295
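    /// Validates that this segment is fully published: the maximum published version seen
    /// while listing must equal the segment's `end_version`.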
1296    pub(crate) fn validate_published(&self) -> DeltaResult<()> {
1297        require!(
1298            self.listed
1299                .max_published_version
1300                .is_some_and(|v| v == self.end_version),
1301            Error::generic("Log segment is not published")
1302        );
1303        Ok(())
1304    }
1305
1306    /// Schema to read just the sidecar column from a checkpoint file.
1307    fn sidecar_read_schema() -> SchemaRef {
1308        static SIDECAR_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
1309            Arc::new(StructType::new_unchecked([StructField::nullable(
1310                SIDECAR_NAME,
1311                Sidecar::to_schema(),
1312            )]))
1313        });
1314        SIDECAR_SCHEMA.clone()
1315    }
1316
1317    /// Checks if a checkpoint schema contains a usable `add.stats_parsed` field.
1318    ///
1319    /// This validates that:
1320    /// 1. The `add.stats_parsed` field exists in the checkpoint schema
1321    /// 2. The types in `stats_parsed` are compatible with the stats schema for data skipping
1322    ///
1323    /// The `stats_schema` parameter contains only the columns referenced in the data skipping
1324    /// predicate. This is built from the predicate and passed in by the caller.
1325    ///
1326    /// Both the checkpoint's `stats_parsed` schema and the `stats_schema` for data skipping
1327    /// use physical column names (not logical names), so direct name comparison is correct.
1328    ///
1329    /// Returns `false` if stats_parsed doesn't exist or has incompatible types.
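    ///
    /// A sketch of a compatible pair (hand-built schemas; assumes the usual
    /// `DataType::INTEGER`/`DataType::LONG` constants; not a doctest):
    ///
    /// ```ignore
    /// // The checkpoint stores minValues.x as INTEGER while the data-skipping schema
    /// // wants x as LONG; type widening makes the pair compatible.
    /// let checkpoint_schema = StructType::new_unchecked([StructField::nullable(
    ///     "add",
    ///     StructType::new_unchecked([StructField::nullable(
    ///         "stats_parsed",
    ///         StructType::new_unchecked([StructField::nullable(
    ///             "minValues",
    ///             StructType::new_unchecked([StructField::nullable("x", DataType::INTEGER)]),
    ///         )]),
    ///     )]),
    /// )]);
    /// let stats_schema = StructType::new_unchecked([StructField::nullable(
    ///     "minValues",
    ///     StructType::new_unchecked([StructField::nullable("x", DataType::LONG)]),
    /// )]);
    /// assert!(LogSegment::schema_has_compatible_stats_parsed(
    ///     &checkpoint_schema,
    ///     &stats_schema,
    /// ));
    /// ```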
1330    pub(crate) fn schema_has_compatible_stats_parsed(
1331        checkpoint_schema: &StructType,
1332        stats_schema: &StructType,
1333    ) -> bool {
1334        // Get add.stats_parsed from the checkpoint schema
1335        let Some(stats_parsed) = checkpoint_schema
1336            .field("add")
1337            .and_then(|f| match f.data_type() {
1338                DataType::Struct(s) => s.field("stats_parsed"),
1339                _ => None,
1340            })
1341        else {
1342            debug!("stats_parsed not compatible: checkpoint schema does not contain add.stats_parsed field");
1343            return false;
1344        };
1345
1346        let DataType::Struct(stats_struct) = stats_parsed.data_type() else {
1347            debug!(
1348                "stats_parsed not compatible: add.stats_parsed field is not a Struct, got {:?}",
1349                stats_parsed.data_type()
1350            );
1351            return false;
1352        };
1353
1354        // Check type compatibility for both minValues and maxValues structs.
1355        // While these typically have the same schema, the protocol doesn't guarantee it,
1356        // so we check both to be safe.
1357        for field_name in ["minValues", "maxValues"] {
1358            let Some(checkpoint_values_field) = stats_struct.field(field_name) else {
1359                // stats_parsed exists but no minValues/maxValues - unusual but valid
1360                continue;
1361            };
1362
1363            // minValues/maxValues must be a Struct containing per-column statistics.
1364            // If it exists but isn't a Struct, the schema is malformed and unusable.
1365            let DataType::Struct(checkpoint_values) = checkpoint_values_field.data_type() else {
1366                debug!(
1367                    "stats_parsed not compatible: stats_parsed.{} is not a Struct, got {:?}",
1368                    field_name,
1369                    checkpoint_values_field.data_type()
1370                );
1371                return false;
1372            };
1373
1374            // Get the corresponding field from stats_schema (e.g., stats_schema.minValues)
1375            let Some(stats_values_field) = stats_schema.field(field_name) else {
1376                // stats_schema doesn't have minValues/maxValues, skip this check
1377                continue;
1378            };
1379            let DataType::Struct(stats_values) = stats_values_field.data_type() else {
1380                // stats_schema.minValues/maxValues isn't a struct - shouldn't happen but skip
1381                continue;
1382            };
1383
1384            // Check type compatibility recursively for nested structs.
1385            // Only fields that exist in both schemas need compatible types.
1386            // Extra fields in checkpoint are ignored; missing fields return null.
1387            if !Self::structs_have_compatible_types(checkpoint_values, stats_values, field_name) {
1388                return false;
1389            }
1390        }
1391
1392        debug!("Checkpoint schema has compatible stats_parsed for data skipping");
1393        true
1394    }
1395
1396    /// Recursively checks if two struct types have compatible field types.
1397    ///
1398    /// Used by both `stats_parsed` and `partitionValues_parsed` compatibility checks.
1399    /// For each field in `needed`, if it exists in `available` (checkpoint):
1400    /// - Primitive types: must be compatible via [`PrimitiveType::is_stats_type_compatible_with`]
1401    ///   (allows type widening and Parquet physical type reinterpretation)
1402    /// - Nested structs: recursively check inner fields
1403    /// - Missing fields in checkpoint: OK (will return null when accessed)
1404    /// - Extra fields in checkpoint: OK (ignored)
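    ///
    /// For example (hypothetical fields), an `available` struct `{a: long, b: string}`
    /// is compatible with a `needed` struct `{a: long, c: double}`: `a` matches, `c` is
    /// missing from `available` (it will read as null), and `b` is extra (ignored).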
1405    fn structs_have_compatible_types(
1406        available: &StructType,
1407        needed: &StructType,
1408        context: &str,
1409    ) -> bool {
1410        for needed_field in needed.fields() {
1411            let Some(available_field) = available.field(needed_field.name()) else {
1412                // Field missing in checkpoint - that's OK, it will be null
1413                continue;
1414            };
1415
1416            match (available_field.data_type(), needed_field.data_type()) {
1417                // Both are structs: recurse
1418                (DataType::Struct(avail_struct), DataType::Struct(need_struct)) => {
1419                    let nested_context = format!("{}.{}", context, needed_field.name());
1420                    if !Self::structs_have_compatible_types(
1421                        avail_struct,
1422                        need_struct,
1423                        &nested_context,
1424                    ) {
1425                        return false;
1426                    }
1427                }
1428                // Non-struct types: use stats-specific rules for primitives and standard
1429                // schema rules otherwise.
1430                (avail_type, need_type) => {
1431                    let compatible = match (avail_type, need_type) {
1432                        (DataType::Primitive(a), DataType::Primitive(b)) => {
1433                            a.is_stats_type_compatible_with(b)
1434                        }
1435                        (a, b) => a.can_read_as(b).is_ok(),
1436                    };
1437                    if !compatible {
1438                        debug!(
1439                            "stats_parsed not compatible: incompatible type for '{}' in {}: \
1440                             checkpoint has {:?}, stats schema needs {:?}",
1441                            needed_field.name(),
1442                            context,
1443                            avail_type,
1444                            need_type
1445                        );
1446                        return false;
1447                    }
1448                }
1449            }
1450        }
1451        true
1452    }
1453
1454    /// Checks if a checkpoint schema contains a usable `add.partitionValues_parsed` field.
1455    ///
1456    /// Validates that:
1457    /// 1. The `add.partitionValues_parsed` field exists in the checkpoint schema
1458    /// 2. The types for partition columns present in both schemas are compatible
1459    ///
1460    /// Missing partition columns in the checkpoint are OK (they simply won't contribute
1461    /// to partition pruning). Returns `false` if `partitionValues_parsed` doesn't exist
1462    /// or has incompatible types for any shared column.
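    ///
    /// For example (hypothetical columns), a checkpoint whose
    /// `add.partitionValues_parsed` is `{date: date, region: string}` satisfies a
    /// partition schema of `{date: date}`; a checkpoint storing `region` as `integer`
    /// when the partition schema needs `string` would not.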
1463    pub(crate) fn schema_has_compatible_partition_values_parsed(
1464        checkpoint_schema: &StructType,
1465        partition_schema: &StructType,
1466    ) -> bool {
1467        let Some(partition_parsed) =
1468            checkpoint_schema
1469                .field("add")
1470                .and_then(|f| match f.data_type() {
1471                    DataType::Struct(s) => s.field("partitionValues_parsed"),
1472                    _ => None,
1473                })
1474        else {
1475            debug!("partitionValues_parsed not compatible: checkpoint schema does not contain add.partitionValues_parsed field");
1476            return false;
1477        };
1478
1479        let DataType::Struct(partition_struct) = partition_parsed.data_type() else {
1480            warn!(
1481                "partitionValues_parsed not compatible: add.partitionValues_parsed is not a Struct, got {:?}",
1482                partition_parsed.data_type()
1483            );
1484            return false;
1485        };
1486
1487        // Flat struct: reuse the recursive type checker (trivial case with no nesting)
1488        if !Self::structs_have_compatible_types(
1489            partition_struct,
1490            partition_schema,
1491            "partitionValues_parsed",
1492        ) {
1493            return false;
1494        }
1495
1496        debug!("Checkpoint schema has compatible partitionValues_parsed for partition pruning");
1497        true
1498    }
1499}
1500
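/// Validates `ascending_compaction_files`: every entry must be a compacted-commit file
/// whose start version does not exceed its end version, and entries must be sorted by
/// start version, with ties broken by ascending end version.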
1501fn validate_compaction_files(compactions: &[ParsedLogPath]) -> DeltaResult<()> {
1502    for (i, f) in compactions.iter().enumerate() {
1503        let LogPathFileType::CompactedCommit { hi } = f.file_type else {
1504            return Err(Error::generic(
1505                "ascending_compaction_files contains non-compaction file",
1506            ));
1507        };
1508        if f.version > hi {
1509            return Err(Error::generic(format!(
1510                "compaction file has start version {} > end version {}",
1511                f.version, hi
1512            )));
1513        }
1514        if let Some(next) = compactions.get(i + 1) {
1515            // next's type is validated on its own iteration; skip sort check if it isn't a
1516            // CompactedCommit since the type error will be caught then.
1517            if let LogPathFileType::CompactedCommit { hi: next_hi } = next.file_type {
1518                if !(f.version < next.version || (f.version == next.version && hi <= next_hi)) {
1519                    return Err(Error::generic(format!(
1520                        "ascending_compaction_files is not sorted: {f:?} -> {next:?}"
1521                    )));
1522                }
1523            }
1524        }
1525    }
1526    Ok(())
1527}
1528
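/// Validates `checkpoint_parts`: every entry must be a checkpoint file, all parts must
/// share one version, and for a multi-part checkpoint the number of parts in the slice
/// must equal the `num_parts` encoded in each file name.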
1529fn validate_checkpoint_parts(parts: &[ParsedLogPath]) -> DeltaResult<()> {
1530    if parts.is_empty() {
1531        return Ok(());
1532    }
1533    let n = parts.len();
1534    let first_version = parts[0].version;
1535    for p in parts {
1536        if !p.is_checkpoint() {
1537            return Err(Error::generic(
1538                "checkpoint_parts contains non-checkpoint file",
1539            ));
1540        }
1541        if p.version != first_version {
1542            return Err(Error::generic(
1543                "multi-part checkpoint parts have different versions",
1544            ));
1545        }
1546        match p.file_type {
1547            LogPathFileType::MultiPartCheckpoint { num_parts, .. } if num_parts as usize == n => {}
1548            LogPathFileType::MultiPartCheckpoint { num_parts, .. } => {
1549                return Err(Error::generic(format!(
1550                    "multi-part checkpoint part count mismatch: slice has {n} parts but num_parts field says {num_parts}"
1551                )));
1552            }
1553            _ if n > 1 => {
1554                return Err(Error::generic(format!(
1555                    "multi-part checkpoint part count mismatch: expected {n} multi-part checkpoint files but got a non-multi-part checkpoint"
1556                )));
1557            }
1558            _ => {}
1559        }
1560    }
1561    Ok(())
1562}
1563
1564fn validate_commit_file_types(commits: &[ParsedLogPath]) -> DeltaResult<()> {
1565    for f in commits {
1566        if !f.is_commit() {
1567            return Err(Error::generic(
1568                "ascending_commit_files contains non-commit file",
1569            ));
1570        }
1571    }
1572    Ok(())
1573}
1574
1575fn validate_commit_files_contiguous(commits: &[ParsedLogPath]) -> DeltaResult<()> {
1576    for pair in commits.windows(2) {
1577        if pair[0].version + 1 != pair[1].version {
1578            return Err(Error::generic(format!(
1579                "Expected contiguous commit files, but found gap: {:?} -> {:?}",
1580                pair[0], pair[1]
1581            )));
1582        }
1583    }
1584    Ok(())
1585}
1586
1587/// Validates that there is no gap between the checkpoint and the first commit file.
1588///
1589/// When a checkpoint exists and commits are also present (after filtering out commits at or before
1590/// the checkpoint), the first commit must immediately follow the checkpoint (i.e., be at
1591/// `checkpoint_version + 1`). A gap indicates missing log files.
1592fn validate_checkpoint_commit_gap(
1593    checkpoint_version: Option<Version>,
1594    commits: &[ParsedLogPath],
1595) -> DeltaResult<()> {
1596    if let (Some(checkpoint_version), Some(first_commit)) = (checkpoint_version, commits.first()) {
1597        require!(
1598            checkpoint_version + 1 == first_commit.version,
1599            Error::InvalidCheckpoint(format!(
1600                "Gap between checkpoint version {checkpoint_version} and next commit {}",
1601                first_commit.version
1602            ))
1603        );
1604    }
1605    Ok(())
1606}
1607
1608/// Validates that the log segment covers exactly `end_version` (when specified) and returns the
1609/// effective version -- the version of the last commit, or the checkpoint version if no commits
1610/// are present.
1611///
1612/// Returns an error if the segment is empty (no commits and no checkpoint parts), or if the
1613/// effective version does not match the requested `end_version`.
1614fn validate_end_version(
1615    commits: &[ParsedLogPath],
1616    checkpoint_parts: &[ParsedLogPath],
1617    end_version: Option<Version>,
1618) -> DeltaResult<Version> {
1619    let effective_version = commits
1620        .last()
1621        .or(checkpoint_parts.first())
1622        .ok_or(Error::generic("No files in log segment"))?
1623        .version;
1624    if let Some(end_version) = end_version {
1625        require!(
1626            effective_version == end_version,
1627            Error::generic(format!(
1628                "LogSegment end version {effective_version} not the same as the specified end version {end_version}"
1629            ))
1630        );
1631    }
1632    Ok(effective_version)
1633}
1634
1635/// Validates the `latest_commit_file` field of a [`LogSegmentFiles`]. Enforces:
1636///
1637/// 1. If `ascending_commit_files` is non-empty, `latest_commit_file` must be `Some`.
1638/// 2. If `latest_commit_file` is `Some`, its version must equal `effective_version`.
1639fn validate_latest_commit_file(
1640    listed: &LogSegmentFiles,
1641    effective_version: Version,
1642) -> DeltaResult<()> {
1643    require!(
1644        listed.ascending_commit_files.is_empty() || listed.latest_commit_file.is_some(),
1645        Error::internal_error(
1646            "latest_commit_file must be Some when ascending_commit_files is non-empty"
1647        )
1648    );
1649    if let Some(commit) = &listed.latest_commit_file {
1650        require!(
1651            commit.version == effective_version,
1652            Error::internal_error(format!(
1653                "latest_commit_file version {} does not match end_version {effective_version}",
1654                commit.version,
1655            ))
1656        );
1657    }
1658    Ok(())
1659}