1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
//! Protocol and Metadata replay logic for [`LogSegment`].
//!
//! This module contains the methods that perform a lightweight log replay to extract the latest
//! Protocol and Metadata actions from a [`LogSegment`].
use std::sync::Arc;
use tracing::{info, instrument};
use super::LogSegment;
use crate::actions::{get_commit_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME};
use crate::crc::Crc;
use crate::log_replay::ActionsBatch;
use crate::metrics::events::PROTOCOL_METADATA_LOADED_SPAN;
use crate::metrics::MetricId;
use crate::{DeltaResult, Engine, Error};
impl LogSegment {
/// Loads the most recent Protocol and Metadata for this log segment, requiring both to exist.
///
/// This is the checked counterpart of [`Self::read_protocol_metadata_opt`]: it delegates to
/// that method (which uses `crc` when it can) and converts a missing action into an error.
/// It is meant for fresh snapshot creation, where both Protocol and Metadata must be present.
///
/// # Errors
///
/// - [`Error::MissingMetadata`] when a Protocol was found but no Metadata.
/// - [`Error::MissingProtocol`] when a Metadata was found but no Protocol.
/// - [`Error::MissingMetadataAndProtocol`] when neither was found.
/// - Any error propagated from [`Self::read_protocol_metadata_opt`].
#[instrument(name = PROTOCOL_METADATA_LOADED_SPAN, fields(report, operation_id = %operation_id), skip(engine))]
pub(crate) fn read_protocol_metadata(
    &self,
    engine: &dyn Engine,
    crc: Option<&Arc<Crc>>,
    operation_id: MetricId,
) -> DeltaResult<(Metadata, Protocol)> {
    // `operation_id` is not read in the body; it is only recorded by the span attribute above.
    let (metadata, protocol) = self.read_protocol_metadata_opt(engine, crc)?;
    match (metadata, protocol) {
        (Some(metadata), Some(protocol)) => Ok((metadata, protocol)),
        (None, None) => Err(Error::MissingMetadataAndProtocol),
        (None, Some(_)) => Err(Error::MissingMetadata),
        (Some(_), None) => Err(Error::MissingProtocol),
    }
}
/// Loads the most recent Protocol and Metadata for this log segment, yielding `None` for
/// whichever of the two cannot be found.
///
/// This is the unchecked counterpart of [`Self::read_protocol_metadata`], used for incremental
/// snapshot updates where the caller can fall back to an existing snapshot's Protocol and
/// Metadata.
///
/// `crc` is the CRC already resolved by the caller. Depending on its version relative to this
/// segment's end version, it either answers the query outright or narrows the replay:
///
/// 1. CRC at the end version: its P&M are returned directly, without replaying the log.
/// 2. CRC older than the end version: only the commits after the CRC version are replayed.
///    Anything found there wins; whatever is still missing is filled in from the CRC.
/// 3. Otherwise (no CRC, or a CRC newer than the end version): a full P&M replay is done.
///
/// # Errors
///
/// Propagates any error raised by the underlying log replay.
#[instrument(name = "log_seg.load_p_m", skip_all, err)]
pub(crate) fn read_protocol_metadata_opt(
    &self,
    engine: &dyn Engine,
    crc: Option<&Arc<Crc>>,
) -> DeltaResult<(Option<Metadata>, Option<Protocol>)> {
    match crc {
        // Case 1: the CRC describes exactly the version we want; no replay required.
        Some(crc) if crc.version == self.end_version => {
            info!("P&M from CRC at target version {}", self.end_version);
            Ok((Some(crc.metadata.clone()), Some(crc.protocol.clone())))
        }
        // Case 2: the CRC is older than the target, so only commits *after* it can carry
        // newer P&M. Replay just those.
        Some(crc) if crc.version < self.end_version => {
            info!(
                "Pruning log segment to commits after CRC version {}",
                crc.version
            );
            let replayed = self
                .segment_after_crc(crc.version)
                .replay_for_pm(engine, None, None)?;
            match replayed {
                // Case 2(a): both actions were rewritten after the CRC version.
                (Some(metadata), Some(protocol)) => {
                    info!("Found P&M from pruned log replay");
                    Ok((Some(metadata), Some(protocol)))
                }
                // Case 2(b): at least one action is missing from the pruned replay. A value
                // that *was* found is newer than the CRC and takes priority; the CRC only
                // fills the gaps. Note that this message is also logged when exactly one of
                // P/M changed after the CRC version.
                (newer_metadata, newer_protocol) => {
                    info!("P&M fallback to CRC (no P&M changes after CRC version)");
                    Ok((
                        Some(newer_metadata.unwrap_or_else(|| crc.metadata.clone())),
                        Some(newer_protocol.unwrap_or_else(|| crc.protocol.clone())),
                    ))
                }
            }
        }
        // Case 3: no usable CRC => full P&M log replay.
        _ => self.replay_for_pm(engine, None, None),
    }
}
/// Scans this segment's Protocol/Metadata-projected action batches for the two actions.
///
/// `metadata_opt` and `protocol_opt` seed the search: a value that is already `Some` is kept
/// untouched, and only the one(s) still missing are looked up in each batch. Iteration ends
/// after the first batch at which both values are present, or when the batches run out.
///
/// Returns whatever was found; either element may still be `None`.
///
/// # Errors
///
/// Propagates failures from creating the batch iterator, from reading an individual batch,
/// and from extracting Metadata or Protocol out of a batch.
fn replay_for_pm(
    &self,
    engine: &dyn Engine,
    mut metadata_opt: Option<Metadata>,
    mut protocol_opt: Option<Protocol>,
) -> DeltaResult<(Option<Metadata>, Option<Protocol>)> {
    for batch in self.read_pm_batches(engine)? {
        let batch_actions = batch?.actions;
        // Metadata is probed before Protocol; an already-found value is never overwritten.
        metadata_opt = match metadata_opt {
            None => Metadata::try_new_from_data(batch_actions.as_ref())?,
            found => found,
        };
        protocol_opt = match protocol_opt {
            None => Protocol::try_new_from_data(batch_actions.as_ref())?,
            found => found,
        };
        // Both actions located: later batches cannot change the result.
        if let (Some(_), Some(_)) = (&metadata_opt, &protocol_opt) {
            break;
        }
    }
    Ok((metadata_opt, protocol_opt))
}
/// Replays the commit log with rows projected down to only the Protocol and Metadata action
/// columns.
///
/// The read schema is the commit schema restricted to [`PROTOCOL_NAME`] and
/// [`METADATA_NAME`]; the resulting batches come from `read_actions`.
///
/// # Errors
///
/// Fails if the schema projection fails, or with whatever `read_actions` returns.
fn read_pm_batches(
    &self,
    engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send> {
    let pm_columns = [PROTOCOL_NAME, METADATA_NAME];
    let projected_schema = get_commit_schema().project(&pm_columns)?;
    self.read_actions(engine, projected_schema)
}
}
#[cfg(test)]
mod tests {
use std::path::PathBuf;
use itertools::Itertools;
use test_log::test;
use crate::engine::sync::SyncEngine;
use crate::Snapshot;
// NOTE: Besides testing the meta-predicate for metadata replay, this test also verifies that
// the parquet reader properly infers nullcount = rowcount for missing columns. The two
// checkpoint part files that contain transaction app ids have truncated schemas that would
// otherwise fail skipping due to their missing nullcount stat:
//
// Row group 0: count: 1 total(compressed): 111 B total(uncompressed):107 B
// --------------------------------------------------------------------------------
// type nulls min / max
// txn.appId BINARY 0 "3ae45b72-24e1-865a-a211-3..." / "3ae45b72-24e1-865a-a211-3..."
// txn.version INT64 0 "4390" / "4390"
#[test]
fn test_replay_for_metadata() {
    let table_dir = PathBuf::from("./tests/data/parquet_row_group_skipping/");
    let table_root = std::fs::canonicalize(table_dir).unwrap();
    let table_url = url::Url::from_directory_path(table_root).unwrap();
    let engine = SyncEngine::new();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();
    let log_segment = snapshot.log_segment();
    let batches: Vec<_> = log_segment
        .read_pm_batches(&engine)
        .unwrap()
        .try_collect()
        .unwrap();
    // The checkpoint has five parts, each containing one action:
    // 1. txn (physically missing P&M columns)
    // 2. metaData
    // 3. protocol
    // 4. add
    // 5. txn (physically missing P&M columns)
    //
    // Ideally the parquet reader would skip every part that holds no P&M: the two `txn` parts
    // (1 and 5) and the `add` part (4), leaving only two batches. Note that the actual P&M
    // replay (`replay_for_pm`) always skips parts 4 and 5 anyway, because it terminates the
    // iteration after finding both P&M.
    //
    // NOTE: Each checkpoint part is a single-row file -- guaranteed to produce one row group.
    //
    // WARNING: https://github.com/delta-io/delta-kernel-rs/issues/434 -- We currently
    // read parts 1 and 5 (4 in all instead of 2) because row group skipping is disabled for
    // missing columns, but can still skip the `add` part (4) because it has valid nullcount
    // stats for P&M.
    let batch_count = batches.len();
    assert_eq!(batch_count, 4);
}
}