use std::num::NonZero;
use std::sync::{Arc, LazyLock};

use delta_kernel_derive::internal_api;
use itertools::Itertools;
use tracing::{debug, info, instrument, warn};
use url::Url;

use crate::actions::visitors::SidecarVisitor;
use crate::actions::{
    get_log_add_schema, schema_contains_file_actions, Sidecar, DOMAIN_METADATA_NAME, METADATA_NAME,
    PROTOCOL_NAME, SET_TRANSACTION_NAME, SIDECAR_NAME,
};
use crate::committer::CatalogCommit;
use crate::expressions::ColumnName;
use crate::last_checkpoint_hint::{LastCheckpointHint, LastCheckpointHintSummary};
use crate::log_reader::commit::CommitReader;
use crate::log_replay::ActionsBatch;
use crate::log_segment_files::LogSegmentFiles;
use crate::metrics::MetricId;
use crate::path::LogPathFileType::*;
use crate::path::{LogPathFileType, ParsedLogPath};
use crate::schema::compare::SchemaComparison;
use crate::schema::{DataType, SchemaRef, StructField, StructType, ToSchema as _};
use crate::utils::require;
use crate::{
    DeltaResult, Engine, Error, Expression, FileMeta, Predicate, PredicateRef, RowVisitor,
    StorageHandler, Version,
};

mod domain_metadata_replay;
mod protocol_metadata_replay;

pub(crate) use domain_metadata_replay::DomainMetadataMap;

#[cfg(test)]
mod crc_tests;
#[cfg(test)]
mod tests;

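/// Describes how a checkpoint will be read: whether the checkpoint carries parsed
/// stats / partition values that are compatible with what the reader wants, and the
/// schema to use when reading the checkpoint files.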
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[internal_api]
pub(crate) struct CheckpointReadInfo {
    /// True when the checkpoint provides a compatible `add.stats_parsed` column.
    #[allow(unused)]
    pub has_stats_parsed: bool,
    /// True when the checkpoint provides a compatible `add.partitionValues_parsed` column.
    #[serde(default)]
    #[allow(unused)]
    pub has_partition_values_parsed: bool,
    /// The schema with which checkpoint files should be read.
    #[allow(unused)]
    pub checkpoint_read_schema: SchemaRef,
}

impl CheckpointReadInfo {
    /// A default read plan: the standard log add schema, with no parsed stats or
    /// partition values.
    #[allow(unused)]
    pub(crate) fn without_stats_parsed() -> Self {
        Self {
            has_stats_parsed: false,
            has_partition_values_parsed: false,
            checkpoint_read_schema: get_log_add_schema().clone(),
        }
    }
}

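/// An iterator of action batches paired with the [`CheckpointReadInfo`] describing
/// how the checkpoint portion of those batches was read.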
#[internal_api]
pub(crate) struct ActionsWithCheckpointInfo<A: Iterator<Item = DeltaResult<ActionsBatch>>> {
    pub actions: A,
    /// Describes how checkpoint batches in `actions` were read.
    #[allow(unused)]
    pub checkpoint_info: CheckpointReadInfo,
}

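/// A `LogSegment` is a contiguous section of the Delta log: at most one checkpoint
/// plus the commit (and log compaction) files needed to reconstruct the table state
/// at `end_version`.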
#[derive(Debug, Clone, PartialEq, Eq)]
#[internal_api]
pub(crate) struct LogSegment {
    pub end_version: Version,
    pub checkpoint_version: Option<Version>,
    pub log_root: Url,
    pub listed: LogSegmentFiles,

    /// Summary of the `_last_checkpoint` hint file, if one was read. Note that the
    /// hint may describe a different checkpoint than the one this segment uses.
    last_checkpoint_metadata: Option<LastCheckpointHintSummary>,
}

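/// Maps a top-level action name to a column that is non-null if and only if a row
/// contains that action. Returns `None` for action types without such a column.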
fn action_identifying_column(action_name: &str) -> Option<ColumnName> {
    match action_name {
        METADATA_NAME => Some(ColumnName::new([METADATA_NAME, "id"])),
        PROTOCOL_NAME => Some(ColumnName::new([PROTOCOL_NAME, "minReaderVersion"])),
        SET_TRANSACTION_NAME => Some(ColumnName::new([SET_TRANSACTION_NAME, "appId"])),
        DOMAIN_METADATA_NAME => Some(ColumnName::new([DOMAIN_METADATA_NAME, "domain"])),
        _ => None,
    }
}

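/// Builds a predicate that holds when a row contains at least one of the actions
/// named in `schema`, by OR-ing an IS NOT NULL check on each action's identifying
/// column. Returns `None` if the schema is empty or any field lacks an identifying
/// column. For example, a schema with `protocol` and `metaData` fields yields a
/// predicate equivalent to
/// `protocol.minReaderVersion IS NOT NULL OR metaData.id IS NOT NULL`.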
fn schema_to_is_not_null_predicate(schema: &StructType) -> Option<PredicateRef> {
    let columns: Vec<ColumnName> = schema
        .fields()
        .map(|f| action_identifying_column(f.name()))
        .collect::<Option<_>>()?;
    let mut predicates = columns
        .into_iter()
        .map(|col| Expression::column(col).is_not_null());
    let first = predicates.next()?;
    Some(Arc::new(predicates.fold(first, Predicate::or)))
}

impl LogSegment {
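    /// Creates a `LogSegment` for a brand-new table directly from its version-0
    /// commit file, without listing storage.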
    pub(crate) fn new_for_version_zero(
        log_root: Url,
        commit_file: ParsedLogPath,
    ) -> DeltaResult<Self> {
        require!(
            commit_file.version == 0,
            crate::Error::internal_error(format!(
                "new_for_version_zero called with version {}",
                commit_file.version
            ))
        );
        require!(
            commit_file.is_commit(),
            crate::Error::internal_error(format!(
                "new_for_version_zero called with non-commit file type: {:?}",
                commit_file.file_type
            ))
        );
        Ok(Self {
            end_version: commit_file.version,
            checkpoint_version: None,
            log_root,
            last_checkpoint_metadata: None,
            listed: LogSegmentFiles {
                max_published_version: Some(commit_file.version),
                latest_commit_file: Some(commit_file.clone()),
                ascending_commit_files: vec![commit_file],
                ..Default::default()
            },
        })
    }

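    /// Creates a `LogSegment` from listed log files, validating that they form a
    /// consistent segment: well-formed compactions and checkpoint parts, contiguous
    /// commit files, no gap after the checkpoint, and an effective end version that
    /// matches `end_version` when one is requested.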
    #[internal_api]
    pub(crate) fn try_new(
        mut listed_files: LogSegmentFiles,
        log_root: Url,
        end_version: Option<Version>,
        last_checkpoint_metadata: Option<LastCheckpointHintSummary>,
    ) -> DeltaResult<Self> {
        validate_compaction_files(&listed_files.ascending_compaction_files)?;
        validate_checkpoint_parts(&listed_files.checkpoint_parts)?;
        validate_commit_file_types(&listed_files.ascending_commit_files)?;
        validate_commit_files_contiguous(&listed_files.ascending_commit_files)?;

        // If there is a checkpoint, replay starts from it, so commits at or before
        // its version are redundant; keep only the commits after it.
        let checkpoint_version =
            if let Some(checkpoint_file) = listed_files.checkpoint_parts.first() {
                let version = checkpoint_file.version;
                listed_files
                    .ascending_commit_files
                    .retain(|log_path| version < log_path.version);
                Some(version)
            } else {
                None
            };

        validate_checkpoint_commit_gap(checkpoint_version, &listed_files.ascending_commit_files)?;
        let effective_version = validate_end_version(
            &listed_files.ascending_commit_files,
            &listed_files.checkpoint_parts,
            end_version,
        )?;
        let log_segment = LogSegment {
            end_version: effective_version,
            checkpoint_version,
            log_root,
            last_checkpoint_metadata,
            listed: listed_files,
        };

        info!(segment = %log_segment.summary());

        Ok(log_segment)
    }

    /// The version recorded in the `_last_checkpoint` hint, if one was read.
    pub(crate) fn last_checkpoint_version(&self) -> Option<Version> {
        self.last_checkpoint_metadata.as_ref().map(|m| m.version)
    }

    /// The checkpoint schema from the `_last_checkpoint` hint, but only when the
    /// hint describes the same checkpoint this segment actually uses.
    pub(crate) fn checkpoint_schema(&self) -> Option<SchemaRef> {
        let m = self.last_checkpoint_metadata.as_ref()?;
        if Some(m.version) != self.checkpoint_version {
            return None;
        }
        m.schema.clone()
    }

    /// The summary of the `_last_checkpoint` hint, if one was read.
    pub(crate) fn last_checkpoint_hint_summary(&self) -> Option<LastCheckpointHintSummary> {
        self.last_checkpoint_metadata.clone()
    }

    /// A compact, single-line description of this segment, for logging.
    fn summary(&self) -> String {
        format!(
            "{{v={}, commits={}, checkpoint_v={}, checkpoint_parts={}, compactions={}, crc_v={}, max_pub_v={}}}",
            self.end_version,
            self.listed.ascending_commit_files.len(),
            self.checkpoint_version
                .map(|v| v.to_string())
                .unwrap_or_else(|| "none".into()),
            self.listed.checkpoint_parts.len(),
            self.listed.ascending_compaction_files.len(),
            self.listed
                .latest_crc_file
                .as_ref()
                .map(|f| f.version.to_string())
                .unwrap_or_else(|| "none".into()),
            self.listed
                .max_published_version
                .map(|v| v.to_string())
                .unwrap_or_else(|| "none".into()),
        )
    }

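    /// Builds the `LogSegment` for reading a table snapshot: reads the
    /// `_last_checkpoint` hint, lists the log, and assembles a segment ending at
    /// `time_travel_version` (or at the latest version when `None`), recording file
    /// counts on the tracing span.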
    #[instrument(
        name = "segment.for_snapshot",
        err,
        skip(storage, time_travel_version),
        fields(report, operation_id = %operation_id, num_commit_files, num_checkpoint_files, num_compaction_files)
    )]
    #[internal_api]
    pub(crate) fn for_snapshot(
        storage: &dyn StorageHandler,
        log_root: Url,
        log_tail: Vec<ParsedLogPath>,
        time_travel_version: impl Into<Option<Version>>,
        operation_id: MetricId,
    ) -> DeltaResult<Self> {
        let time_travel_version = time_travel_version.into();
        let checkpoint_hint = LastCheckpointHint::try_read(storage, &log_root)?;
        let result = Self::for_snapshot_impl(
            storage,
            log_root,
            log_tail,
            checkpoint_hint,
            time_travel_version,
        );

        match result {
            Ok(log_segment) => {
                tracing::Span::current().record(
                    "num_commit_files",
                    log_segment.listed.ascending_commit_files.len() as u64,
                );
                tracing::Span::current().record(
                    "num_checkpoint_files",
                    log_segment.listed.checkpoint_parts.len() as u64,
                );
                tracing::Span::current().record(
                    "num_compaction_files",
                    log_segment.listed.ascending_compaction_files.len() as u64,
                );
                Ok(log_segment)
            }
            Err(e) => Err(e),
        }
    }

    /// Builds the snapshot `LogSegment` from an already-read `_last_checkpoint`
    /// hint; the listing strategy depends on the hint and the requested end version.
    pub(crate) fn for_snapshot_impl(
        storage: &dyn StorageHandler,
        log_root: Url,
        log_tail: Vec<ParsedLogPath>,
        checkpoint_hint: Option<LastCheckpointHint>,
        time_travel_version: Option<Version>,
    ) -> DeltaResult<Self> {
        let last_checkpoint_summary =
            checkpoint_hint
                .as_ref()
                .map(|hint| LastCheckpointHintSummary {
                    version: hint.version,
                    schema: hint.checkpoint_schema.clone(),
                });

        let end_version = time_travel_version;

        // The hint is only usable if its checkpoint is at or before the requested version.
        let usable_hint = checkpoint_hint.filter(|cp| end_version.is_none_or(|v| cp.version <= v));

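        // Pick a listing strategy: start listing from the hinted checkpoint when we
        // have a usable hint; otherwise scan backwards from the requested end version
        // looking for a checkpoint; with neither, list the whole log.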
        let listed_files = match (usable_hint, end_version) {
            (Some(cp), end_version) => LogSegmentFiles::list_with_checkpoint_hint(
                &cp,
                storage,
                &log_root,
                log_tail,
                end_version,
            )?,
            (None, Some(end)) => LogSegmentFiles::list_with_backward_checkpoint_scan(
                storage, &log_root, log_tail, end,
            )?,
            (None, None) => LogSegmentFiles::list(storage, &log_root, log_tail, None, None)?,
        };

        LogSegment::try_new(
            listed_files,
            log_root,
            time_travel_version,
            last_checkpoint_summary,
        )
    }

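    /// Builds a `LogSegment` of only the commit files in
    /// `[start_version, end_version]`, for replaying table changes.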
    #[internal_api]
    pub(crate) fn for_table_changes(
        storage: &dyn StorageHandler,
        log_root: Url,
        start_version: Version,
        end_version: impl Into<Option<Version>>,
    ) -> DeltaResult<Self> {
        let end_version = end_version.into();
        if let Some(end_version) = end_version {
            if start_version > end_version {
                return Err(Error::generic(
                    "Failed to build LogSegment: start_version cannot be greater than end_version",
                ));
            }
        }

        let listed_files =
            LogSegmentFiles::list_commits(storage, &log_root, Some(start_version), end_version)?;
        // The listing must begin exactly at start_version; otherwise we cannot
        // faithfully replay the requested range of changes.
        require!(
            listed_files
                .ascending_commit_files()
                .first()
                .is_some_and(|first_commit| first_commit.version == start_version),
            Error::generic(format!(
                "Expected the first commit to have version {start_version}, got {:?}",
                listed_files
                    .ascending_commit_files()
                    .first()
                    .map(|c| c.version)
            ))
        );
        LogSegment::try_new(listed_files, log_root, end_version, None)
    }

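    /// Builds a `LogSegment` of the contiguous commit files ending at `end_version`,
    /// for timestamp-to-version conversion. `limit`, when given, bounds how many
    /// commits are listed.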
    #[allow(unused)]
    pub(crate) fn for_timestamp_conversion(
        storage: &dyn StorageHandler,
        log_root: Url,
        end_version: Version,
        limit: Option<NonZero<usize>>,
    ) -> DeltaResult<Self> {
        // A limit of N means we want at most the N commits ending at end_version.
        let start_from = limit
            .map(|limit| match NonZero::<Version>::try_from(limit) {
                Ok(limit) => Ok(end_version.saturating_sub(limit.get() - 1)),
                _ => Err(Error::generic(format!(
                    "Invalid limit {limit} when building log segment in timestamp conversion",
                ))),
            })
            .transpose()?;

        let mut listed_commits =
            LogSegmentFiles::list_commits(storage, &log_root, start_from, Some(end_version))?;

        // Keep only the contiguous run of commits that ends at end_version; anything
        // before a version gap cannot be used.
        let commits = listed_commits.ascending_commit_files_mut();
        if !commits.is_empty() {
            let mut start_idx = commits.len() - 1;
            while start_idx > 0 && commits[start_idx].version == 1 + commits[start_idx - 1].version
            {
                start_idx -= 1;
            }
            commits.drain(..start_idx);
        }

        LogSegment::try_new(listed_commits, log_root, Some(end_version), None)
    }

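    /// Returns a copy of this `LogSegment` extended by `tail_commit_file`, which
    /// must be a commit file at exactly `end_version + 1`.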
    #[allow(unused)]
    pub(crate) fn new_with_commit_appended(
        &self,
        tail_commit_file: ParsedLogPath,
    ) -> DeltaResult<Self> {
        require!(
            tail_commit_file.is_commit(),
            Error::internal_error(format!(
                "Cannot extend and create new LogSegment. Tail log file is not a commit file. \
                 Path: {}, Type: {:?}.",
                tail_commit_file.location.location, tail_commit_file.file_type
            ))
        );
        require!(
            tail_commit_file.version == self.end_version + 1,
            Error::internal_error(format!(
                "Cannot extend and create new LogSegment. Tail commit file version ({}) does not \
                 equal LogSegment end_version ({}) + 1.",
                tail_commit_file.version, self.end_version
            ))
        );

        let mut new_log_segment = self.clone();

        new_log_segment.end_version = tail_commit_file.version;
        new_log_segment
            .listed
            .ascending_commit_files
            .push(tail_commit_file.clone());
        new_log_segment.listed.latest_commit_file = Some(tail_commit_file.clone());
        // Only published (non-staged) commits advance the published version.
        new_log_segment.listed.max_published_version = match tail_commit_file.file_type {
            LogPathFileType::Commit => Some(tail_commit_file.version),
            _ => self.listed.max_published_version,
        };

        Ok(new_log_segment)
    }

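    /// Returns a copy of this `LogSegment` that replays from a newly written
    /// single-file checkpoint at `end_version` instead of from its commit files.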
    pub(crate) fn try_new_with_checkpoint(&self, checkpoint: ParsedLogPath) -> DeltaResult<Self> {
        require!(
            matches!(
                checkpoint.file_type,
                LogPathFileType::SinglePartCheckpoint | LogPathFileType::UuidCheckpoint
            ),
            Error::internal_error(format!(
                "Cannot update LogSegment with checkpoint. Path is not a single-file \
                 checkpoint. Path: {}, Type: {:?}.",
                checkpoint.location.location, checkpoint.file_type
            ))
        );
        require!(
            checkpoint.version == self.end_version,
            Error::internal_error(format!(
                "Cannot update LogSegment with checkpoint. Checkpoint version ({}) does not \
                 equal LogSegment end_version ({}).",
                checkpoint.version, self.end_version
            ))
        );

        let mut new_log_segment = self.clone();
        new_log_segment.checkpoint_version = Some(checkpoint.version);
        let checkpoint_version = checkpoint.version;
        new_log_segment.listed.checkpoint_parts = vec![checkpoint];
        // The checkpoint is at end_version, so every commit and compaction in this
        // segment is now covered by it.
        new_log_segment.listed.ascending_commit_files.clear();
        new_log_segment.listed.ascending_compaction_files.clear();
        // A CRC file older than the checkpoint is no longer useful.
        new_log_segment
            .listed
            .latest_crc_file
            .take_if(|crc| crc.version < checkpoint_version);
        // Any previously read _last_checkpoint hint no longer describes this checkpoint.
        new_log_segment.last_checkpoint_metadata = None;
        Ok(new_log_segment)
    }

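    /// Returns a copy of this `LogSegment` with `crc_file` (which must be a CRC
    /// file at exactly `end_version`) recorded as the latest CRC file.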
    pub(crate) fn try_new_with_crc_file(&self, crc_file: ParsedLogPath<Url>) -> DeltaResult<Self> {
        require!(
            crc_file.file_type == LogPathFileType::Crc,
            Error::internal_error(format!(
                "Cannot update LogSegment with CRC. Path is not a CRC file. \
                 Path: {}, Type: {:?}.",
                crc_file.location, crc_file.file_type
            ))
        );
        require!(
            crc_file.version == self.end_version,
            Error::internal_error(format!(
                "Cannot update LogSegment with CRC. CRC version ({}) does not \
                 equal LogSegment end_version ({}).",
                crc_file.version, self.end_version
            ))
        );
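        // Only the CRC file's path and version are known here; fill in placeholder
        // values for last_modified and size.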
        let crc_file = ParsedLogPath {
            location: FileMeta {
                location: crc_file.location,
                last_modified: 0,
                size: 0,
            },
            filename: crc_file.filename,
            extension: crc_file.extension,
            version: crc_file.version,
            file_type: crc_file.file_type,
        };
        let mut new_log_segment = self.clone();
        new_log_segment.listed.latest_crc_file = Some(crc_file);
        Ok(new_log_segment)
    }

    /// Returns a copy of this `LogSegment` whose `max_published_version` is advanced
    /// to `end_version`, i.e. one that treats every commit in the segment as published.
    pub(crate) fn new_as_published(&self) -> DeltaResult<Self> {
        let mut new_log_segment = self.clone();
        new_log_segment.listed.max_published_version = Some(self.end_version);
        Ok(new_log_segment)
    }

    /// Returns the staged commits in this segment with versions above
    /// `max_published_version`, converted to [`CatalogCommit`]s.
    pub(crate) fn get_unpublished_catalog_commits(&self) -> DeltaResult<Vec<CatalogCommit>> {
        self.listed
            .ascending_commit_files
            .iter()
            .filter(|file| file.file_type == LogPathFileType::StagedCommit)
            .filter(|file| {
                self.listed
                    .max_published_version
                    .is_none_or(|v| file.version > v)
            })
            .map(|file| CatalogCommit::try_new(&self.log_root, file))
            .collect()
    }

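    /// Reads the actions in this segment: the commit files (read with
    /// `commit_read_schema`) chained with the checkpoint files (read with
    /// `checkpoint_read_schema`, augmented where the checkpoint provides parsed
    /// stats or partition values). The returned [`CheckpointReadInfo`] describes
    /// exactly how the checkpoint was read.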
    #[internal_api]
    pub(crate) fn read_actions_with_projected_checkpoint_actions(
        &self,
        engine: &dyn Engine,
        commit_read_schema: SchemaRef,
        checkpoint_read_schema: SchemaRef,
        meta_predicate: Option<PredicateRef>,
        stats_schema: Option<&StructType>,
        partition_schema: Option<&StructType>,
    ) -> DeltaResult<
        ActionsWithCheckpointInfo<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send>,
    > {
        // Skip checkpoint rows that contain none of the requested actions, ANDed
        // with any caller-supplied predicate.
        let is_not_null_pred = schema_to_is_not_null_predicate(&checkpoint_read_schema);
        let effective_predicate = match (is_not_null_pred, meta_predicate) {
            (None, x) | (x, None) => x,
            (Some(a), Some(b)) => Some(Arc::new(Predicate::and((*a).clone(), (*b).clone()))),
        };

        let commit_stream = CommitReader::try_new(engine, self, commit_read_schema)?;

        let checkpoint_result = self.create_checkpoint_stream(
            engine,
            checkpoint_read_schema,
            effective_predicate,
            stats_schema,
            partition_schema,
        )?;

        Ok(ActionsWithCheckpointInfo {
            actions: commit_stream.chain(checkpoint_result.actions),
            checkpoint_info: checkpoint_result.checkpoint_info,
        })
    }

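    /// Reads all actions in the segment using a single schema for both commits and
    /// checkpoints, with no caller-supplied predicate and no stats/partition
    /// augmentation.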
    #[internal_api]
    pub(crate) fn read_actions(
        &self,
        engine: &dyn Engine,
        action_schema: SchemaRef,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send> {
        let result = self.read_actions_with_projected_checkpoint_actions(
            engine,
            action_schema.clone(),
            action_schema,
            None,
            None,
            None,
        )?;
        Ok(result.actions)
    }

    /// Plans a checkpoint read (see [`CheckpointReadInfo`]) without actually reading
    /// any action batches.
    pub(crate) fn projected_checkpoint_read_info(
        &self,
        engine: &dyn Engine,
        action_schema: SchemaRef,
        stats_schema: Option<&StructType>,
        partition_schema: Option<&StructType>,
    ) -> DeltaResult<CheckpointReadInfo> {
        self.plan_projected_checkpoint_read(engine, action_schema, stats_schema, partition_schema)
            .map(|(checkpoint_info, _)| checkpoint_info)
    }

    /// Like [`Self::projected_checkpoint_read_info`], but only for segments whose
    /// checkpoint is a single file; errors on multi-part checkpoints.
    pub(crate) fn projected_single_part_checkpoint_read_info(
        &self,
        engine: &dyn Engine,
        action_schema: SchemaRef,
        stats_schema: Option<&StructType>,
        partition_schema: Option<&StructType>,
    ) -> DeltaResult<CheckpointReadInfo> {
        let file_actions_schema = match self.listed.checkpoint_parts.as_slice() {
            [] => None,
            [checkpoint] => match &checkpoint.file_type {
                MultiPartCheckpoint { .. } => {
                    return Err(Error::internal_error(
                        "projected_single_part_checkpoint_read_info called with a multi-part checkpoint",
                    ));
                }
                UuidCheckpoint if checkpoint.extension.as_str() == "json" => None,
                SinglePartCheckpoint | UuidCheckpoint
                    if checkpoint.extension.as_str() == "parquet" =>
                {
                    Some(Self::read_checkpoint_schema(
                        engine,
                        checkpoint,
                        self.checkpoint_schema().as_ref(),
                    )?)
                }
                _ => None,
            },
            _ => {
                return Err(Error::internal_error(
                    "projected_single_part_checkpoint_read_info called with multiple checkpoint parts",
                ));
            }
        };

        Self::build_projected_checkpoint_read_info(
            action_schema,
            file_actions_schema.as_deref(),
            stats_schema,
            partition_schema,
            false,
        )
    }

    /// Plans a checkpoint read directly from the parquet files that hold the file
    /// actions, using the first file's footer as the physical schema.
    pub(crate) fn projected_checkpoint_read_info_from_files(
        engine: &dyn Engine,
        action_schema: SchemaRef,
        stats_schema: Option<&StructType>,
        partition_schema: Option<&StructType>,
        file_actions_files: &[FileMeta],
    ) -> DeltaResult<CheckpointReadInfo> {
        let file_actions_schema = match file_actions_files.first() {
            Some(first) => Some(engine.parquet_handler().read_parquet_footer(first)?.schema),
            None => None,
        };

        Self::build_projected_checkpoint_read_info(
            action_schema,
            file_actions_schema.as_deref(),
            stats_schema,
            partition_schema,
            false,
        )
    }

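    /// Computes a minimal "cover" of this segment's commit range: commit and
    /// compaction files are merged by version, and wherever a compacted file spans
    /// several commits it is selected in place of the commits it covers. The result
    /// is returned in descending version order.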
    pub(crate) fn find_commit_cover(&self) -> Vec<FileMeta> {
        // Merge the two ascending lists; on a version tie the commit file comes
        // first, immediately followed by any compactions starting at that version.
        let all_files = itertools::Itertools::merge_by(
            self.listed.ascending_commit_files.iter(),
            self.listed.ascending_compaction_files.iter(),
            |path_a, path_b| path_a.version <= path_b.version,
        );

        let mut last_pushed: Option<&ParsedLogPath> = None;
        let mut selected_files = vec![];
        for next in all_files {
            match last_pushed {
                // A file starting at the same version as the last selection covers
                // at least as much, so it replaces that selection.
                Some(prev) if prev.version == next.version => {
                    let removed = selected_files.pop();
                    debug!("Selecting {next:?} rather than {removed:?}, it covers a wider range");
                }
                // Skip anything already covered by the last selected compaction.
                Some(&ParsedLogPath {
                    file_type: LogPathFileType::CompactedCommit { hi },
                    ..
                }) if next.version <= hi => {
                    debug!("Skipping log file {next:?}, it's already covered.");
                    continue;
                }
                _ => {}
            }
            debug!("Provisionally selecting {next:?}");
            last_pushed = Some(next);
            selected_files.push(next.location.clone());
        }
        // Return newest-first.
        selected_files.reverse();
        selected_files
    }

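    /// Determines the physical schema of the file actions in this segment's
    /// checkpoint, along with any sidecar files that actually hold them: for a
    /// parquet checkpoint the schema comes from the footer (or from the
    /// `_last_checkpoint` hint); for a checkpoint with sidecars it comes from the
    /// first sidecar's footer.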
    fn get_file_actions_schema_and_sidecars(
        &self,
        engine: &dyn Engine,
    ) -> DeltaResult<(Option<SchemaRef>, Vec<FileMeta>)> {
        let hint_schema = self.checkpoint_schema();

        let Some(checkpoint) = self.listed.checkpoint_parts.first() else {
            // No checkpoint, so no file actions schema and no sidecars.
            return Ok((None, vec![]));
        };

        match &checkpoint.file_type {
            MultiPartCheckpoint { .. } => {
                // Multi-part (classic) checkpoints never carry sidecars.
                let schema =
                    Self::read_checkpoint_schema(engine, checkpoint, hint_schema.as_ref())?;
                Ok((Some(schema), vec![]))
            }
            UuidCheckpoint if checkpoint.extension.as_str() == "json" => {
                // A JSON checkpoint has no parquet footer of its own; the file
                // actions schema must come from its sidecars.
                self.read_sidecar_schema_and_files(engine, checkpoint, None)
            }
            SinglePartCheckpoint | UuidCheckpoint if checkpoint.extension.as_str() == "parquet" => {
                let checkpoint_schema =
                    Self::read_checkpoint_schema(engine, checkpoint, hint_schema.as_ref())?;
                if checkpoint_schema.field(SIDECAR_NAME).is_some() {
                    self.read_sidecar_schema_and_files(engine, checkpoint, Some(&checkpoint_schema))
                } else {
                    Ok((Some(checkpoint_schema), vec![]))
                }
            }
            _ => Ok((None, vec![])),
        }
    }

    /// Returns the checkpoint's physical schema, preferring the schema from the
    /// `_last_checkpoint` hint over reading the parquet footer.
    fn read_checkpoint_schema(
        engine: &dyn Engine,
        checkpoint: &ParsedLogPath<FileMeta>,
        hint_schema: Option<&SchemaRef>,
    ) -> DeltaResult<SchemaRef> {
        match hint_schema {
            Some(schema) => Ok(schema.clone()),
            None => Ok(engine
                .parquet_handler()
                .read_parquet_footer(&checkpoint.location)?
                .schema),
        }
    }

    /// Extracts the checkpoint's sidecar files and derives the file actions schema
    /// from the first sidecar's footer, falling back to the checkpoint's own schema
    /// when there are no sidecars.
    fn read_sidecar_schema_and_files(
        &self,
        engine: &dyn Engine,
        checkpoint: &ParsedLogPath<FileMeta>,
        checkpoint_schema: Option<&SchemaRef>,
    ) -> DeltaResult<(Option<SchemaRef>, Vec<FileMeta>)> {
        let sidecar_files = self.extract_sidecar_refs(engine, checkpoint)?;
        let file_actions_schema = match sidecar_files.first() {
            Some(first) => Some(engine.parquet_handler().read_parquet_footer(first)?.schema),
            None => checkpoint_schema.cloned(),
        };
        Ok((file_actions_schema, sidecar_files))
    }

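    /// Builds the checkpoint half of an action stream: plans the projected read,
    /// then reads the checkpoint parts (JSON or parquet) followed by any sidecar
    /// files, all with the same augmented schema and predicate.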
    fn create_checkpoint_stream(
        &self,
        engine: &dyn Engine,
        action_schema: SchemaRef,
        meta_predicate: Option<PredicateRef>,
        stats_schema: Option<&StructType>,
        partition_schema: Option<&StructType>,
    ) -> DeltaResult<
        ActionsWithCheckpointInfo<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send>,
    > {
        let (checkpoint_info, sidecar_files) = self.plan_projected_checkpoint_read(
            engine,
            action_schema,
            stats_schema,
            partition_schema,
        )?;
        let augmented_checkpoint_read_schema = checkpoint_info.checkpoint_read_schema.clone();

        let checkpoint_file_meta: Vec<_> = self
            .listed
            .checkpoint_parts
            .iter()
            .map(|f| f.location.clone())
            .collect();

        let parquet_handler = engine.parquet_handler();

        // All parts of a checkpoint share one format, so dispatch on the first part.
        let actions = match self.listed.checkpoint_parts.first() {
            Some(parsed_log_path) if parsed_log_path.extension == "json" => {
                engine.json_handler().read_json_files(
                    &checkpoint_file_meta,
                    augmented_checkpoint_read_schema.clone(),
                    meta_predicate.clone(),
                )?
            }
            Some(parsed_log_path) if parsed_log_path.extension == "parquet" => parquet_handler
                .read_parquet_files(
                    &checkpoint_file_meta,
                    augmented_checkpoint_read_schema.clone(),
                    meta_predicate.clone(),
                )?,
            Some(parsed_log_path) => {
                return Err(Error::generic(format!(
                    "Unsupported checkpoint file type: {}",
                    parsed_log_path.extension,
                )));
            }
            // No checkpoint: the checkpoint half of the stream is empty.
            None => Box::new(std::iter::empty()),
        };

        // Sidecar files are always parquet.
        let sidecar_batches = if !sidecar_files.is_empty() {
            parquet_handler.read_parquet_files(
                &sidecar_files,
                augmented_checkpoint_read_schema.clone(),
                meta_predicate,
            )?
        } else {
            Box::new(std::iter::empty())
        };

        // `false`: none of these batches come from commit files.
        let actions_iter = actions
            .map_ok(|batch| ActionsBatch::new(batch, false))
            .chain(sidecar_batches.map_ok(|batch| ActionsBatch::new(batch, false)));

        let checkpoint_info = CheckpointReadInfo {
            has_stats_parsed: checkpoint_info.has_stats_parsed,
            has_partition_values_parsed: checkpoint_info.has_partition_values_parsed,
            checkpoint_read_schema: augmented_checkpoint_read_schema,
        };
        Ok(ActionsWithCheckpointInfo {
            actions: actions_iter,
            checkpoint_info,
        })
    }

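    /// Decides how the checkpoint should be read for `action_schema`: probes the
    /// checkpoint (and sidecars) for the physical file actions schema only when
    /// file actions are requested, and returns the resulting plan together with
    /// the sidecar files to read.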
    fn plan_projected_checkpoint_read(
        &self,
        engine: &dyn Engine,
        action_schema: SchemaRef,
        stats_schema: Option<&StructType>,
        partition_schema: Option<&StructType>,
    ) -> DeltaResult<(CheckpointReadInfo, Vec<FileMeta>)> {
        let need_file_actions = schema_contains_file_actions(&action_schema);

        let (file_actions_schema, sidecar_files) = if need_file_actions {
            self.get_file_actions_schema_and_sidecars(engine)?
        } else {
            (None, vec![])
        };

        let needs_sidecar = need_file_actions && !sidecar_files.is_empty();
        let checkpoint_info = Self::build_projected_checkpoint_read_info(
            action_schema,
            file_actions_schema.as_deref(),
            stats_schema,
            partition_schema,
            needs_sidecar,
        )?;

        Ok((checkpoint_info, sidecar_files))
    }

    /// Builds the [`CheckpointReadInfo`] for a read: decides whether the checkpoint's
    /// parsed stats and partition values are usable and, if so (or if the sidecar
    /// column is needed), widens `action_schema` accordingly.
    fn build_projected_checkpoint_read_info(
        action_schema: SchemaRef,
        file_actions_schema: Option<&StructType>,
        stats_schema: Option<&StructType>,
        partition_schema: Option<&StructType>,
        include_sidecar_field: bool,
    ) -> DeltaResult<CheckpointReadInfo> {
        let has_stats_parsed =
            stats_schema
                .zip(file_actions_schema)
                .is_some_and(|(stats, file_schema)| {
                    Self::schema_has_compatible_stats_parsed(file_schema, stats)
                });

        let has_partition_values_parsed = partition_schema
            .zip(file_actions_schema)
            .is_some_and(|(ps, fs)| Self::schema_has_compatible_partition_values_parsed(fs, ps));

        let needs_add_augmentation = has_stats_parsed || has_partition_values_parsed;
        let checkpoint_read_schema = if needs_add_augmentation || include_sidecar_field {
            // Rebuild the `add` struct with stats_parsed / partitionValues_parsed
            // columns appended, leaving all other action fields untouched.
            let mut new_fields: Vec<StructField> = if let (true, Some(add_field)) =
                (needs_add_augmentation, action_schema.field("add"))
            {
                let DataType::Struct(add_struct) = add_field.data_type() else {
                    return Err(Error::internal_error(
                        "add field in action schema must be a struct",
                    ));
                };
                let mut add_fields: Vec<StructField> = add_struct.fields().cloned().collect();

                if let (true, Some(ss)) = (has_stats_parsed, stats_schema) {
                    add_fields.push(StructField::nullable(
                        "stats_parsed",
                        DataType::Struct(Box::new(ss.clone())),
                    ));
                }

                if let (true, Some(ps)) = (has_partition_values_parsed, partition_schema) {
                    add_fields.push(StructField::nullable(
                        "partitionValues_parsed",
                        DataType::Struct(Box::new(ps.clone())),
                    ));
                }

                action_schema
                    .fields()
                    .map(|field| {
                        if field.name() == "add" {
                            StructField::new(
                                add_field.name(),
                                StructType::new_unchecked(add_fields.clone()),
                                add_field.is_nullable(),
                            )
                            .with_metadata(add_field.metadata.clone())
                        } else {
                            field.clone()
                        }
                    })
                    .collect()
            } else {
                action_schema.fields().cloned().collect()
            };

            if include_sidecar_field {
                new_fields.push(StructField::nullable(SIDECAR_NAME, Sidecar::to_schema()));
            }

            Arc::new(StructType::new_unchecked(new_fields))
        } else {
            action_schema
        };

        Ok(CheckpointReadInfo {
            has_stats_parsed,
            has_partition_values_parsed,
            checkpoint_read_schema,
        })
    }

    /// Visits the checkpoint file's sidecar actions and resolves each one to a
    /// `FileMeta` relative to the log root.
    fn extract_sidecar_refs(
        &self,
        engine: &dyn Engine,
        checkpoint: &ParsedLogPath,
    ) -> DeltaResult<Vec<FileMeta>> {
        // Read only the sidecar column from the checkpoint file itself.
        let batches = match checkpoint.extension.as_str() {
            "json" => engine.json_handler().read_json_files(
                std::slice::from_ref(&checkpoint.location),
                Self::sidecar_read_schema(),
                None,
            )?,
            "parquet" => engine.parquet_handler().read_parquet_files(
                std::slice::from_ref(&checkpoint.location),
                Self::sidecar_read_schema(),
                None,
            )?,
            _ => return Ok(vec![]),
        };

        let mut visitor = SidecarVisitor::default();
        for batch_result in batches {
            let batch = batch_result?;
            visitor.visit_rows_of(batch.as_ref())?;
        }

        visitor
            .sidecars
            .iter()
            .map(|sidecar| sidecar.to_filemeta(&self.log_root))
            .try_collect()
    }

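    /// Returns a sub-segment containing only the commits and compactions strictly
    /// after `start_v_exclusive`, for replaying the remainder of the log on top of
    /// state recovered from a CRC file at that version.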
    pub(crate) fn segment_after_crc(&self, start_v_exclusive: Version) -> Self {
        let (commits, compactions) =
            self.filtered_commits_and_compactions(Some(start_v_exclusive), self.end_version);
        LogSegment {
            end_version: self.end_version,
            checkpoint_version: None,
            log_root: self.log_root.clone(),
            last_checkpoint_metadata: None,
            listed: LogSegmentFiles {
                ascending_commit_files: commits,
                ascending_compaction_files: compactions,
                checkpoint_parts: vec![],
                latest_crc_file: None,
                latest_commit_file: None,
                max_published_version: None,
            },
        }
    }

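    /// Returns a sub-segment that keeps this segment's checkpoint but truncates
    /// commits and compactions to those at or below `end_v_inclusive`.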
    pub(crate) fn segment_through_crc(&self, end_v_inclusive: Version) -> Self {
        let (commits, compactions) =
            self.filtered_commits_and_compactions(self.checkpoint_version, end_v_inclusive);
        LogSegment {
            end_version: self.end_version,
            checkpoint_version: self.checkpoint_version,
            log_root: self.log_root.clone(),
            last_checkpoint_metadata: self.last_checkpoint_metadata.clone(),
            listed: LogSegmentFiles {
                ascending_commit_files: commits,
                ascending_compaction_files: compactions,
                checkpoint_parts: self.listed.checkpoint_parts.clone(),
                latest_crc_file: None,
                latest_commit_file: None,
                max_published_version: None,
            },
        }
    }

    /// Returns the commits in `(lo_exclusive, hi_inclusive]` and the compactions
    /// that fall entirely within that range.
    fn filtered_commits_and_compactions(
        &self,
        lo_exclusive: Option<Version>,
        hi_inclusive: Version,
    ) -> (Vec<ParsedLogPath>, Vec<ParsedLogPath>) {
        let above_lo = |v: Version| lo_exclusive.is_none_or(|lo| lo < v);
        let commits = self
            .listed
            .ascending_commit_files
            .iter()
            .filter(|c| above_lo(c.version) && c.version <= hi_inclusive)
            .cloned()
            .collect();
        let compactions = self
            .listed
            .ascending_compaction_files
            .iter()
            .filter(|c| {
                matches!(
                    c.file_type,
                    LogPathFileType::CompactedCommit { hi }
                        if above_lo(c.version) && hi <= hi_inclusive
                )
            })
            .cloned()
            .collect();
        (commits, compactions)
    }

    /// The number of commits in this segment after its checkpoint (or after
    /// version 0 when there is no checkpoint).
    pub(crate) fn commits_since_checkpoint(&self) -> u64 {
        let checkpoint_version = self.checkpoint_version.unwrap_or(0);
        debug_assert!(checkpoint_version <= self.end_version);
        self.end_version - checkpoint_version
    }

    /// The number of commits after the latest checkpoint or log compaction,
    /// whichever reaches further.
    pub(crate) fn commits_since_log_compaction_or_checkpoint(&self) -> u64 {
        let max_compaction_end = self
            .listed
            .ascending_compaction_files
            .iter()
            .fold(0, |cur, f| {
                if let &ParsedLogPath {
                    file_type: LogPathFileType::CompactedCommit { hi },
                    ..
                } = f
                {
                    Version::max(cur, hi)
                } else {
                    warn!("Found invalid ParsedLogPath in ascending_compaction_files: {f:?}");
                    cur
                }
            });
        let to_sub = Version::max(self.checkpoint_version.unwrap_or(0), max_compaction_end);
        debug_assert!(to_sub <= self.end_version);
        self.end_version - to_sub
    }

    /// Errors unless every commit in this segment has been published.
    pub(crate) fn validate_published(&self) -> DeltaResult<()> {
        require!(
            self.listed
                .max_published_version
                .is_some_and(|v| v == self.end_version),
            Error::generic("Log segment is not published")
        );
        Ok(())
    }

    /// The schema used to extract sidecar actions from a checkpoint file.
    fn sidecar_read_schema() -> SchemaRef {
        static SIDECAR_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
            Arc::new(StructType::new_unchecked([StructField::nullable(
                SIDECAR_NAME,
                Sidecar::to_schema(),
            )]))
        });
        SIDECAR_SCHEMA.clone()
    }

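    /// Checks whether `checkpoint_schema` carries an `add.stats_parsed` struct
    /// whose `minValues`/`maxValues` types are compatible with `stats_schema`, so
    /// that data skipping can use the parsed stats directly.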
    pub(crate) fn schema_has_compatible_stats_parsed(
        checkpoint_schema: &StructType,
        stats_schema: &StructType,
    ) -> bool {
        let Some(stats_parsed) = checkpoint_schema
            .field("add")
            .and_then(|f| match f.data_type() {
                DataType::Struct(s) => s.field("stats_parsed"),
                _ => None,
            })
        else {
            debug!("stats_parsed not compatible: checkpoint schema does not contain add.stats_parsed field");
            return false;
        };

        let DataType::Struct(stats_struct) = stats_parsed.data_type() else {
            debug!(
                "stats_parsed not compatible: add.stats_parsed field is not a Struct, got {:?}",
                stats_parsed.data_type()
            );
            return false;
        };

        // Only min/max values need type-compatibility checks; a missing field just
        // means the corresponding column cannot be skipped on.
        for field_name in ["minValues", "maxValues"] {
            let Some(checkpoint_values_field) = stats_struct.field(field_name) else {
                continue;
            };

            let DataType::Struct(checkpoint_values) = checkpoint_values_field.data_type() else {
                debug!(
                    "stats_parsed not compatible: stats_parsed.{} is not a Struct, got {:?}",
                    field_name,
                    checkpoint_values_field.data_type()
                );
                return false;
            };

            let Some(stats_values_field) = stats_schema.field(field_name) else {
                continue;
            };
            let DataType::Struct(stats_values) = stats_values_field.data_type() else {
                continue;
            };

            if !Self::structs_have_compatible_types(checkpoint_values, stats_values, field_name) {
                return false;
            }
        }

        debug!("Checkpoint schema has compatible stats_parsed for data skipping");
        true
    }

    /// Recursively checks that every field of `needed` that also exists in
    /// `available` has a type the checkpoint data can be read with; fields missing
    /// from `available` are ignored. `context` is only used for log messages.
    fn structs_have_compatible_types(
        available: &StructType,
        needed: &StructType,
        context: &str,
    ) -> bool {
        for needed_field in needed.fields() {
            let Some(available_field) = available.field(needed_field.name()) else {
                continue;
            };

            match (available_field.data_type(), needed_field.data_type()) {
                (DataType::Struct(avail_struct), DataType::Struct(need_struct)) => {
                    let nested_context = format!("{}.{}", context, needed_field.name());
                    if !Self::structs_have_compatible_types(
                        avail_struct,
                        need_struct,
                        &nested_context,
                    ) {
                        return false;
                    }
                }
                (avail_type, need_type) => {
                    let compatible = match (avail_type, need_type) {
                        (DataType::Primitive(a), DataType::Primitive(b)) => {
                            a.is_stats_type_compatible_with(b)
                        }
                        (a, b) => a.can_read_as(b).is_ok(),
                    };
                    if !compatible {
                        debug!(
                            "stats_parsed not compatible: incompatible type for '{}' in {}: \
                             checkpoint has {:?}, stats schema needs {:?}",
                            needed_field.name(),
                            context,
                            avail_type,
                            need_type
                        );
                        return false;
                    }
                }
            }
        }
        true
    }

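    /// Checks whether `checkpoint_schema` carries an `add.partitionValues_parsed`
    /// struct type-compatible with `partition_schema`, so that partition pruning
    /// can use the parsed partition values directly.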
    pub(crate) fn schema_has_compatible_partition_values_parsed(
        checkpoint_schema: &StructType,
        partition_schema: &StructType,
    ) -> bool {
        let Some(partition_parsed) =
            checkpoint_schema
                .field("add")
                .and_then(|f| match f.data_type() {
                    DataType::Struct(s) => s.field("partitionValues_parsed"),
                    _ => None,
                })
        else {
            debug!("partitionValues_parsed not compatible: checkpoint schema does not contain add.partitionValues_parsed field");
            return false;
        };

        let DataType::Struct(partition_struct) = partition_parsed.data_type() else {
            warn!(
                "partitionValues_parsed not compatible: add.partitionValues_parsed is not a Struct, got {:?}",
                partition_parsed.data_type()
            );
            return false;
        };

        if !Self::structs_have_compatible_types(
            partition_struct,
            partition_schema,
            "partitionValues_parsed",
        ) {
            return false;
        }

        debug!("Checkpoint schema has compatible partitionValues_parsed for partition pruning");
        true
    }
}

/// Validates that compaction files are well-formed (each covers a non-empty range)
/// and sorted by start version, then by end version.
fn validate_compaction_files(compactions: &[ParsedLogPath]) -> DeltaResult<()> {
    for (i, f) in compactions.iter().enumerate() {
        let LogPathFileType::CompactedCommit { hi } = f.file_type else {
            return Err(Error::generic(
                "ascending_compaction_files contains non-compaction file",
            ));
        };
        if f.version > hi {
            return Err(Error::generic(format!(
                "compaction file has start version {} > end version {}",
                f.version, hi
            )));
        }
        if let Some(next) = compactions.get(i + 1) {
            if let LogPathFileType::CompactedCommit { hi: next_hi } = next.file_type {
                if !(f.version < next.version || (f.version == next.version && hi <= next_hi)) {
                    return Err(Error::generic(format!(
                        "ascending_compaction_files is not sorted: {f:?} -> {next:?}"
                    )));
                }
            }
        }
    }
    Ok(())
}

/// Validates that checkpoint parts are all checkpoints of the same version, and
/// that a multi-part checkpoint is complete.
fn validate_checkpoint_parts(parts: &[ParsedLogPath]) -> DeltaResult<()> {
    if parts.is_empty() {
        return Ok(());
    }
    let n = parts.len();
    let first_version = parts[0].version;
    for p in parts {
        if !p.is_checkpoint() {
            return Err(Error::generic(
                "checkpoint_parts contains non-checkpoint file",
            ));
        }
        if p.version != first_version {
            return Err(Error::generic(
                "multi-part checkpoint parts have different versions",
            ));
        }
        match p.file_type {
            LogPathFileType::MultiPartCheckpoint { num_parts, .. } if num_parts as usize == n => {}
            LogPathFileType::MultiPartCheckpoint { num_parts, .. } => {
                return Err(Error::generic(format!(
                    "multi-part checkpoint part count mismatch: slice has {n} parts but num_parts field says {num_parts}"
                )));
            }
            _ if n > 1 => {
                return Err(Error::generic(format!(
                    "multi-part checkpoint part count mismatch: expected {n} multi-part checkpoint files but got a non-multi-part checkpoint"
                )));
            }
            _ => {}
        }
    }
    Ok(())
}

/// Validates that every entry in `ascending_commit_files` really is a commit file.
fn validate_commit_file_types(commits: &[ParsedLogPath]) -> DeltaResult<()> {
    for f in commits {
        if !f.is_commit() {
            return Err(Error::generic(
                "ascending_commit_files contains non-commit file",
            ));
        }
    }
    Ok(())
}

/// Validates that commit versions increase by exactly one, with no gaps.
fn validate_commit_files_contiguous(commits: &[ParsedLogPath]) -> DeltaResult<()> {
    for pair in commits.windows(2) {
        if pair[0].version + 1 != pair[1].version {
            return Err(Error::generic(format!(
                "Expected contiguous commit files, but found gap: {:?} -> {:?}",
                pair[0], pair[1]
            )));
        }
    }
    Ok(())
}

/// Validates that the first commit after a checkpoint is exactly one version past
/// the checkpoint.
fn validate_checkpoint_commit_gap(
    checkpoint_version: Option<Version>,
    commits: &[ParsedLogPath],
) -> DeltaResult<()> {
    if let (Some(checkpoint_version), Some(first_commit)) = (checkpoint_version, commits.first()) {
        require!(
            checkpoint_version + 1 == first_commit.version,
            Error::InvalidCheckpoint(format!(
                "Gap between checkpoint version {checkpoint_version} and next commit {}",
                first_commit.version
            ))
        );
    }
    Ok(())
}

/// Computes the segment's effective end version (the last commit, or the checkpoint
/// when there are no commits) and checks it against the requested `end_version`.
fn validate_end_version(
    commits: &[ParsedLogPath],
    checkpoint_parts: &[ParsedLogPath],
    end_version: Option<Version>,
) -> DeltaResult<Version> {
    let effective_version = commits
        .last()
        .or(checkpoint_parts.first())
        .ok_or(Error::generic("No files in log segment"))?
        .version;
    if let Some(end_version) = end_version {
        require!(
            effective_version == end_version,
            Error::generic(format!(
                "LogSegment end version {effective_version} not the same as the specified end version {end_version}"
            ))
        );
    }
    Ok(effective_version)
}

/// Validates that `latest_commit_file` is present whenever there are commit files,
/// and that it matches the segment's effective end version.
fn validate_latest_commit_file(
    listed: &LogSegmentFiles,
    effective_version: Version,
) -> DeltaResult<()> {
    require!(
        listed.ascending_commit_files.is_empty() || listed.latest_commit_file.is_some(),
        Error::internal_error(
            "latest_commit_file must be Some when ascending_commit_files is non-empty"
        )
    );
    if let Some(commit) = &listed.latest_commit_file {
        require!(
            commit.version == effective_version,
            Error::internal_error(format!(
                "latest_commit_file version {} does not match end_version {effective_version}",
                commit.version,
            ))
        );
    }
    Ok(())
}