delta_kernel/parallel/
parallel_scan_metadata.rs1use std::sync::Arc;
2use std::time::Instant;
3
4use delta_kernel_derive::internal_api;
5use tracing::{info_span, Span};
6
7use crate::log_replay::{ActionsBatch, ParallelLogReplayProcessor};
8use crate::metrics::reporter::emit_scan_metadata_completed;
9use crate::metrics::{MetricId, ScanType};
10use crate::parallel::parallel_phase::ParallelPhase;
11use crate::parallel::sequential_phase::{AfterSequential, SequentialPhase};
12use crate::scan::log_replay::{ScanLogReplayProcessor, SerializableScanState};
13use crate::scan::ScanMetadata;
14use crate::schema::SchemaRef;
15use crate::{DeltaResult, Engine, EngineData, Error, FileMeta};
16
17pub enum AfterSequentialScanMetadata {
23 Done,
24 Parallel {
25 state: Box<ParallelState>,
26 files: Vec<FileMeta>,
27 },
28}
29
30pub struct SequentialScanMetadata {
36 pub(crate) sequential: SequentialPhase<ScanLogReplayProcessor>,
37 operation_id: MetricId,
38 start: Instant,
39 span: Span,
40}
41
42impl SequentialScanMetadata {
43 pub(crate) fn new(sequential: SequentialPhase<ScanLogReplayProcessor>) -> Self {
44 let operation_id = MetricId::new();
45 Self {
46 sequential,
47 operation_id,
48 start: Instant::now(),
49 span: info_span!("sequential_scan_metadata"),
52 }
53 }
54
55 pub fn finish(self) -> DeltaResult<AfterSequentialScanMetadata> {
56 let _guard = self.span.enter();
57 match self.sequential.finish()? {
58 AfterSequential::Done(processor) => {
59 let event = processor.get_metrics().to_event(
60 self.operation_id,
61 ScanType::SequentialPhase,
62 self.start.elapsed(),
63 );
64 processor
65 .get_metrics()
66 .log("Sequential scan metadata completed");
67 emit_scan_metadata_completed(&event);
68 Ok(AfterSequentialScanMetadata::Done)
69 }
70 AfterSequential::Parallel { processor, files } => {
71 let event = processor.get_metrics().to_event(
72 self.operation_id,
73 ScanType::SequentialPhase,
74 self.start.elapsed(),
75 );
76 processor
77 .get_metrics()
78 .log("Sequential scan metadata completed");
79 emit_scan_metadata_completed(&event);
80 processor.get_metrics().reset_counters();
81
82 Ok(AfterSequentialScanMetadata::Parallel {
83 state: Box::new(ParallelState {
84 inner: processor,
85 operation_id: self.operation_id,
86 parallel_start: Instant::now(),
87 }),
88 files,
89 })
90 }
91 }
92 }
93}
94
95impl Iterator for SequentialScanMetadata {
96 type Item = DeltaResult<ScanMetadata>;
97
98 fn next(&mut self) -> Option<Self::Item> {
99 let _guard = self.span.enter();
100 self.sequential.next()
101 }
102}
103
104pub struct ParallelState {
109 inner: ScanLogReplayProcessor,
110 operation_id: MetricId,
112 parallel_start: Instant,
114}
115
116impl ParallelLogReplayProcessor for Arc<ParallelState> {
117 type Output = ScanMetadata;
118
119 fn process_actions_batch(&self, actions_batch: ActionsBatch) -> DeltaResult<Self::Output> {
120 self.inner.process_actions_batch(actions_batch)
121 }
122}
123
124impl ParallelState {
125 pub fn log_metrics(&self) {
146 let event = self.inner.get_metrics().to_event(
147 self.operation_id,
148 ScanType::ParallelPhase,
149 self.parallel_start.elapsed(),
150 );
151 self.inner
152 .get_metrics()
153 .log("Parallel scan metadata completed");
154 emit_scan_metadata_completed(&event);
155 }
156
157 pub fn file_read_schema(&self) -> SchemaRef {
162 self.inner.checkpoint_info().checkpoint_read_schema.clone()
163 }
164
165 #[internal_api]
173 #[allow(unused)]
174 pub(crate) fn into_serializable_state(self) -> DeltaResult<SerializableScanState> {
175 self.inner.into_serializable_state()
176 }
177
178 #[internal_api]
184 #[allow(unused)]
185 pub(crate) fn from_serializable_state(
186 engine: &dyn Engine,
187 state: SerializableScanState,
188 ) -> DeltaResult<Self> {
189 let inner = ScanLogReplayProcessor::from_serializable_state(engine, state)?;
190 Ok(Self {
191 inner,
192 operation_id: MetricId::new(),
193 parallel_start: Instant::now(),
194 })
195 }
196
197 #[allow(unused)]
206 pub fn into_bytes(self) -> DeltaResult<Vec<u8>> {
207 let state = self.into_serializable_state()?;
208 serde_json::to_vec(&state)
209 .map_err(|e| Error::generic(format!("Failed to serialize ParallelState to bytes: {e}")))
210 }
211
212 #[allow(unused)]
221 pub fn from_bytes(engine: &dyn Engine, bytes: &[u8]) -> DeltaResult<Self> {
222 let state: SerializableScanState =
223 serde_json::from_slice(bytes).map_err(Error::MalformedJson)?;
224 Self::from_serializable_state(engine, state)
225 }
226}
227
228pub struct ParallelScanMetadata {
229 pub(crate) processor: ParallelPhase<Arc<ParallelState>>,
230 span: Span,
231}
232
233impl ParallelScanMetadata {
234 pub fn try_new(
235 engine: Arc<dyn Engine>,
236 state: Arc<ParallelState>,
237 leaf_files: Vec<FileMeta>,
238 ) -> DeltaResult<Self> {
239 let read_schema = state.file_read_schema();
240 Ok(Self {
241 processor: ParallelPhase::try_new(engine, state, leaf_files, read_schema)?,
242 span: info_span!("parallel_scan_metadata"),
244 })
245 }
246
247 pub fn new_from_iter(
248 state: Arc<ParallelState>,
249 iter: impl IntoIterator<Item = DeltaResult<Box<dyn EngineData>>> + 'static,
250 ) -> Self {
251 Self {
252 processor: ParallelPhase::new_from_iter(state.clone(), iter),
253 span: info_span!("parallel_scan_metadata"),
255 }
256 }
257}
258
259impl Iterator for ParallelScanMetadata {
260 type Item = DeltaResult<ScanMetadata>;
261
262 fn next(&mut self) -> Option<Self::Item> {
263 let _guard = self.span.enter();
264 self.processor.next()
265 }
266}