use std::sync::Arc;
use delta_kernel_derive::internal_api;
use tracing::{info_span, Span};
use crate::log_replay::{ActionsBatch, ParallelLogReplayProcessor};
use crate::parallel::parallel_phase::ParallelPhase;
use crate::parallel::sequential_phase::{AfterSequential, SequentialPhase};
use crate::scan::log_replay::{ScanLogReplayProcessor, SerializableScanState};
use crate::scan::ScanMetadata;
use crate::schema::SchemaRef;
use crate::{DeltaResult, Engine, EngineData, Error, FileMeta};
/// Outcome of finishing the sequential phase of scan-metadata log replay.
pub enum AfterSequentialScanMetadata {
    /// Log replay completed entirely within the sequential phase; there is no
    /// remaining work to distribute.
    Done,
    /// Sequential replay stopped with files left over that can be processed in
    /// parallel.
    Parallel {
        /// Processor state to hand to the parallel phase.
        state: Box<ParallelState>,
        /// Files remaining to be processed in parallel.
        files: Vec<FileMeta>,
    },
}
/// Iterator over [`ScanMetadata`] produced by the sequential phase of log
/// replay. Once iteration is done, [`SequentialScanMetadata::finish`] consumes
/// it and reports whether any parallel work remains.
pub struct SequentialScanMetadata {
    pub(crate) sequential: SequentialPhase<ScanLogReplayProcessor>,
    // Tracing span entered for every `next()` call and for `finish()`.
    span: Span,
}
impl SequentialScanMetadata {
    /// Wraps a sequential log-replay phase together with a fresh tracing span.
    pub(crate) fn new(sequential: SequentialPhase<ScanLogReplayProcessor>) -> Self {
        let span = info_span!("sequential_scan_metadata");
        Self { sequential, span }
    }

    /// Consumes the sequential phase and reports whether log replay is done or
    /// whether a parallel phase should follow.
    pub fn finish(self) -> DeltaResult<AfterSequentialScanMetadata> {
        let _guard = self.span.enter();
        match self.sequential.finish()? {
            AfterSequential::Done(p) => {
                p.get_metrics().log("Sequential scan metadata completed");
                Ok(AfterSequentialScanMetadata::Done)
            }
            AfterSequential::Parallel {
                processor: p,
                files,
            } => {
                // Log the sequential totals first, then zero the counters so the
                // parallel phase starts its own accounting from scratch.
                p.get_metrics().log("Sequential scan metadata completed");
                p.get_metrics().reset_counters();
                let state = Box::new(ParallelState { inner: p });
                Ok(AfterSequentialScanMetadata::Parallel { state, files })
            }
        }
    }
}
impl Iterator for SequentialScanMetadata {
    type Item = DeltaResult<ScanMetadata>;

    /// Delegates to the inner sequential phase, with the work recorded under
    /// this iterator's tracing span.
    fn next(&mut self) -> Option<Self::Item> {
        let guard = self.span.enter();
        let item = self.sequential.next();
        drop(guard);
        item
    }
}
/// Processor state carried over from the sequential phase, shared (via `Arc`)
/// by the workers of the parallel phase.
pub struct ParallelState {
    // The underlying scan log-replay processor; batches are forwarded to it by
    // the `ParallelLogReplayProcessor` impl below.
    inner: ScanLogReplayProcessor,
}
impl ParallelLogReplayProcessor for Arc<ParallelState> {
    type Output = ScanMetadata;

    /// Forwards the batch to the wrapped [`ScanLogReplayProcessor`].
    fn process_actions_batch(&self, actions_batch: ActionsBatch) -> DeltaResult<Self::Output> {
        self.as_ref().inner.process_actions_batch(actions_batch)
    }
}
impl ParallelState {
pub fn log_metrics(&self) {
self.inner
.get_metrics()
.log("Parallel scan metadata completed");
}
pub fn file_read_schema(&self) -> SchemaRef {
self.inner.checkpoint_info().checkpoint_read_schema.clone()
}
#[internal_api]
#[allow(unused)]
pub(crate) fn into_serializable_state(self) -> DeltaResult<SerializableScanState> {
self.inner.into_serializable_state()
}
#[internal_api]
#[allow(unused)]
pub(crate) fn from_serializable_state(
engine: &dyn Engine,
state: SerializableScanState,
) -> DeltaResult<Self> {
let inner = ScanLogReplayProcessor::from_serializable_state(engine, state)?;
Ok(Self { inner })
}
#[allow(unused)]
pub fn into_bytes(self) -> DeltaResult<Vec<u8>> {
let state = self.into_serializable_state()?;
serde_json::to_vec(&state)
.map_err(|e| Error::generic(format!("Failed to serialize ParallelState to bytes: {e}")))
}
#[allow(unused)]
pub fn from_bytes(engine: &dyn Engine, bytes: &[u8]) -> DeltaResult<Self> {
let state: SerializableScanState =
serde_json::from_slice(bytes).map_err(Error::MalformedJson)?;
Self::from_serializable_state(engine, state)
}
}
/// Iterator over [`ScanMetadata`] produced by the parallel phase of log replay.
pub struct ParallelScanMetadata {
    pub(crate) processor: ParallelPhase<Arc<ParallelState>>,
    // Tracing span entered for every `next()` call.
    span: Span,
}
impl ParallelScanMetadata {
    /// Creates a parallel scan-metadata iterator that reads `leaf_files` itself
    /// using `engine`, with the read schema taken from `state`.
    ///
    /// # Errors
    /// Propagates any error from constructing the underlying [`ParallelPhase`].
    pub fn try_new(
        engine: Arc<dyn Engine>,
        state: Arc<ParallelState>,
        leaf_files: Vec<FileMeta>,
    ) -> DeltaResult<Self> {
        // The schema must be captured before `state` is moved into the phase.
        let read_schema = state.file_read_schema();
        Ok(Self {
            processor: ParallelPhase::try_new(engine, state, leaf_files, read_schema)?,
            span: info_span!("parallel_scan_metadata"),
        })
    }

    /// Creates a parallel scan-metadata iterator over caller-provided action
    /// batches instead of reading files directly.
    pub fn new_from_iter(
        state: Arc<ParallelState>,
        iter: impl IntoIterator<Item = DeltaResult<Box<dyn EngineData>>> + 'static,
    ) -> Self {
        Self {
            // `state` is not used again, so move it instead of cloning the Arc
            // (the original `state.clone()` was a redundant refcount bump).
            processor: ParallelPhase::new_from_iter(state, iter),
            span: info_span!("parallel_scan_metadata"),
        }
    }
}
impl Iterator for ParallelScanMetadata {
    type Item = DeltaResult<ScanMetadata>;

    /// Delegates to the inner parallel phase, with the work recorded under
    /// this iterator's tracing span.
    fn next(&mut self) -> Option<Self::Item> {
        let guard = self.span.enter();
        let item = self.processor.next();
        drop(guard);
        item
    }
}