Skip to main content

delta_kernel/parallel/
parallel_scan_metadata.rs

1use std::sync::Arc;
2use std::time::Instant;
3
4use delta_kernel_derive::internal_api;
5use tracing::{info_span, Span};
6
7use crate::log_replay::{ActionsBatch, ParallelLogReplayProcessor};
8use crate::metrics::reporter::emit_scan_metadata_completed;
9use crate::metrics::{MetricId, ScanType};
10use crate::parallel::parallel_phase::ParallelPhase;
11use crate::parallel::sequential_phase::{AfterSequential, SequentialPhase};
12use crate::scan::log_replay::{ScanLogReplayProcessor, SerializableScanState};
13use crate::scan::ScanMetadata;
14use crate::schema::SchemaRef;
15use crate::{DeltaResult, Engine, EngineData, Error, FileMeta};
16
17/// Result of sequential scan metadata processing.
18///
19/// This enum indicates whether distributed processing is needed:
20/// - `Done`: All processing completed sequentially - no distributed phase needed.
21/// - `Parallel`: Contains state and files for parallel processing.
22pub enum AfterSequentialScanMetadata {
23    Done,
24    Parallel {
25        state: Box<ParallelState>,
26        files: Vec<FileMeta>,
27    },
28}
29
30/// Sequential scan metadata processing.
31///
32/// This phase processes commits and single-part checkpoint manifests sequentially.
33/// After exhaustion, call `finish()` to get the result which indicates whether
34/// a distributed phase is needed.
35pub struct SequentialScanMetadata {
36    pub(crate) sequential: SequentialPhase<ScanLogReplayProcessor>,
37    operation_id: MetricId,
38    start: Instant,
39    span: Span,
40}
41
42impl SequentialScanMetadata {
43    pub(crate) fn new(sequential: SequentialPhase<ScanLogReplayProcessor>) -> Self {
44        let operation_id = MetricId::new();
45        Self {
46            sequential,
47            operation_id,
48            start: Instant::now(),
49            // TODO: Associate a unique scan ID with this span to correlate sequential and parallel
50            // phases
51            span: info_span!("sequential_scan_metadata"),
52        }
53    }
54
55    pub fn finish(self) -> DeltaResult<AfterSequentialScanMetadata> {
56        let _guard = self.span.enter();
57        match self.sequential.finish()? {
58            AfterSequential::Done(processor) => {
59                let event = processor.get_metrics().to_event(
60                    self.operation_id,
61                    ScanType::SequentialPhase,
62                    self.start.elapsed(),
63                );
64                processor
65                    .get_metrics()
66                    .log("Sequential scan metadata completed");
67                emit_scan_metadata_completed(&event);
68                Ok(AfterSequentialScanMetadata::Done)
69            }
70            AfterSequential::Parallel { processor, files } => {
71                let event = processor.get_metrics().to_event(
72                    self.operation_id,
73                    ScanType::SequentialPhase,
74                    self.start.elapsed(),
75                );
76                processor
77                    .get_metrics()
78                    .log("Sequential scan metadata completed");
79                emit_scan_metadata_completed(&event);
80                processor.get_metrics().reset_counters();
81
82                Ok(AfterSequentialScanMetadata::Parallel {
83                    state: Box::new(ParallelState {
84                        inner: processor,
85                        operation_id: self.operation_id,
86                        parallel_start: Instant::now(),
87                    }),
88                    files,
89                })
90            }
91        }
92    }
93}
94
95impl Iterator for SequentialScanMetadata {
96    type Item = DeltaResult<ScanMetadata>;
97
98    fn next(&mut self) -> Option<Self::Item> {
99        let _guard = self.span.enter();
100        self.sequential.next()
101    }
102}
103
104/// State for parallel scan metadata processing.
105///
106/// This state can be serialized and distributed to remote workers, or wrapped
107/// in Arc and shared across threads for local parallel processing.
108pub struct ParallelState {
109    inner: ScanLogReplayProcessor,
110    /// Operation ID inherited from the sequential phase for event correlation.
111    operation_id: MetricId,
112    /// Start time for the parallel phase, set when this state is created.
113    parallel_start: Instant,
114}
115
116impl ParallelLogReplayProcessor for Arc<ParallelState> {
117    type Output = ScanMetadata;
118
119    fn process_actions_batch(&self, actions_batch: ActionsBatch) -> DeltaResult<Self::Output> {
120        self.inner.process_actions_batch(actions_batch)
121    }
122}
123
124impl ParallelState {
125    /// Log the accumulated metrics from parallel processing.
126    ///
127    /// Call this after all parallel workers complete. The metrics will be logged
128    /// in the current tracing span context.
129    ///
130    /// # Example
131    ///
132    /// ```no_run
133    /// # use std::sync::Arc;
134    /// # use delta_kernel::scan::ParallelState;
135    /// # use tracing::instrument;
136    /// #[instrument(skip_all, name = "parallel_scan")]
137    /// async fn process(state: Arc<ParallelState>) {
138    ///     // ... spawn workers that share Arc<ParallelState> ...
139    ///     // ... wait for workers to complete ...
140    ///
141    ///     // Log accumulated metrics
142    ///     state.log_metrics();
143    /// }
144    /// ```
145    pub fn log_metrics(&self) {
146        let event = self.inner.get_metrics().to_event(
147            self.operation_id,
148            ScanType::ParallelPhase,
149            self.parallel_start.elapsed(),
150        );
151        self.inner
152            .get_metrics()
153            .log("Parallel scan metadata completed");
154        emit_scan_metadata_completed(&event);
155    }
156
157    /// Get the schema to use for reading checkpoint files.
158    ///
159    /// Returns the checkpoint read schema which may have stats excluded
160    /// if skip_stats was enabled when the scan was created.
161    pub fn file_read_schema(&self) -> SchemaRef {
162        self.inner.checkpoint_info().checkpoint_read_schema.clone()
163    }
164
165    /// Serialize the processor state for distributed processing.
166    ///
167    /// Returns a `SerializableScanState` containing all information needed to
168    /// reconstruct this state on remote compute nodes.
169    ///
170    /// # Errors
171    /// Returns an error if the state cannot be serialized (e.g., contains opaque predicates).
172    #[internal_api]
173    #[allow(unused)]
174    pub(crate) fn into_serializable_state(self) -> DeltaResult<SerializableScanState> {
175        self.inner.into_serializable_state()
176    }
177
178    /// Reconstruct a ParallelState from serialized state.
179    ///
180    /// # Parameters
181    /// - `engine`: Engine for creating evaluators and filters
182    /// - `state`: The serialized state from a previous `into_serializable_state()` call
183    #[internal_api]
184    #[allow(unused)]
185    pub(crate) fn from_serializable_state(
186        engine: &dyn Engine,
187        state: SerializableScanState,
188    ) -> DeltaResult<Self> {
189        let inner = ScanLogReplayProcessor::from_serializable_state(engine, state)?;
190        Ok(Self {
191            inner,
192            operation_id: MetricId::new(),
193            parallel_start: Instant::now(),
194        })
195    }
196
197    /// Serialize the processor state directly to bytes.
198    ///
199    /// This is a convenience method that combines `into_serializable_state()` with
200    /// JSON serialization. For more control over serialization format, use
201    /// `into_serializable_state()` directly.
202    ///
203    /// # Errors
204    /// Returns an error if the state cannot be serialized.
205    #[allow(unused)]
206    pub fn into_bytes(self) -> DeltaResult<Vec<u8>> {
207        let state = self.into_serializable_state()?;
208        serde_json::to_vec(&state)
209            .map_err(|e| Error::generic(format!("Failed to serialize ParallelState to bytes: {e}")))
210    }
211
212    /// Reconstruct a ParallelState from bytes.
213    ///
214    /// This is a convenience method that combines JSON deserialization with
215    /// `from_serializable_state()`. The bytes must have been produced by `into_bytes()`.
216    ///
217    /// # Parameters
218    /// - `engine`: Engine for creating evaluators and filters
219    /// - `bytes`: The serialized bytes from a previous `into_bytes()` call
220    #[allow(unused)]
221    pub fn from_bytes(engine: &dyn Engine, bytes: &[u8]) -> DeltaResult<Self> {
222        let state: SerializableScanState =
223            serde_json::from_slice(bytes).map_err(Error::MalformedJson)?;
224        Self::from_serializable_state(engine, state)
225    }
226}
227
228pub struct ParallelScanMetadata {
229    pub(crate) processor: ParallelPhase<Arc<ParallelState>>,
230    span: Span,
231}
232
233impl ParallelScanMetadata {
234    pub fn try_new(
235        engine: Arc<dyn Engine>,
236        state: Arc<ParallelState>,
237        leaf_files: Vec<FileMeta>,
238    ) -> DeltaResult<Self> {
239        let read_schema = state.file_read_schema();
240        Ok(Self {
241            processor: ParallelPhase::try_new(engine, state, leaf_files, read_schema)?,
242            // TODO: Associate the same scan ID from sequential phase to correlate phases
243            span: info_span!("parallel_scan_metadata"),
244        })
245    }
246
247    pub fn new_from_iter(
248        state: Arc<ParallelState>,
249        iter: impl IntoIterator<Item = DeltaResult<Box<dyn EngineData>>> + 'static,
250    ) -> Self {
251        Self {
252            processor: ParallelPhase::new_from_iter(state.clone(), iter),
253            // TODO: Associate the same scan ID from sequential phase to correlate phases
254            span: info_span!("parallel_scan_metadata"),
255        }
256    }
257}
258
259impl Iterator for ParallelScanMetadata {
260    type Item = DeltaResult<ScanMetadata>;
261
262    fn next(&mut self) -> Option<Self::Item> {
263        let _guard = self.span.enter();
264        self.processor.next()
265    }
266}