use std::sync::Arc;
use std::time::Instant;
use delta_kernel_derive::internal_api;
use tracing::{info_span, Span};
use crate::log_replay::{ActionsBatch, ParallelLogReplayProcessor};
use crate::log_segment::LogSegment;
use crate::metrics::reporter::emit_scan_metadata_completed;
use crate::metrics::{MetricId, ScanType};
use crate::parallel::parallel_phase::ParallelPhase;
use crate::parallel::sequential_phase::{AfterSequential, SequentialPhase};
use crate::scan::log_replay::{ScanLogReplayProcessor, SerializableScanState};
use crate::scan::ScanMetadata;
use crate::schema::SchemaRef;
use crate::{DeltaResult, Engine, EngineData, Error, FileMeta};
/// Outcome of [`SequentialScanMetadata::finish`].
pub enum AfterSequentialScanMetadata {
    /// The sequential phase finished and no parallel phase is required.
    Done,
    /// Scan metadata for `files` still has to be produced by a parallel phase.
    Parallel {
        /// Processor state used to drive the parallel phase (see [`ParallelScanMetadata`]).
        state: Box<ParallelState>,
        /// Files the parallel phase must read. NOTE(review): `finish` uses these for checkpoint
        /// read planning, so they are presumably checkpoint (leaf) files — confirm.
        files: Vec<FileMeta>,
    },
}
/// Iterator over the scan metadata produced by the sequential phase of log replay.
///
/// Every item is produced inside the `sequential_scan_metadata` tracing span.
/// [`SequentialScanMetadata::finish`] consumes this value, reports sequential-phase metrics,
/// and tells the caller whether a parallel phase must follow.
pub struct SequentialScanMetadata {
    /// The wrapped sequential log-replay phase.
    pub(crate) sequential: SequentialPhase<ScanLogReplayProcessor>,
    /// Metrics operation id; reused by the `ParallelState` handed off in `finish`.
    operation_id: MetricId,
    /// Creation time of this phase; used to report the sequential-phase duration.
    start: Instant,
    /// Inputs for computing checkpoint read info once the parallel-phase files are known,
    /// present only when that planning was deferred.
    deferred_parallel_checkpoint_planning: Option<DeferredParallelCheckpointPlanning>,
    /// Tracing span entered while iterating and while finishing.
    span: Span,
}
impl SequentialScanMetadata {
pub(crate) fn new(sequential: SequentialPhase<ScanLogReplayProcessor>) -> Self {
let operation_id = MetricId::new();
Self {
sequential,
operation_id,
start: Instant::now(),
deferred_parallel_checkpoint_planning: None,
span: info_span!("sequential_scan_metadata"),
}
}
pub(crate) fn new_with_deferred_parallel_checkpoint_planning(
sequential: SequentialPhase<ScanLogReplayProcessor>,
engine: Arc<dyn Engine>,
action_schema: SchemaRef,
stats_schema: Option<SchemaRef>,
partition_schema: Option<SchemaRef>,
) -> Self {
let operation_id = MetricId::new();
Self {
sequential,
operation_id,
start: Instant::now(),
deferred_parallel_checkpoint_planning: Some(DeferredParallelCheckpointPlanning {
engine,
action_schema,
stats_schema,
partition_schema,
}),
span: info_span!("sequential_scan_metadata"),
}
}
pub fn finish(self) -> DeltaResult<AfterSequentialScanMetadata> {
let Self {
sequential,
operation_id,
start,
deferred_parallel_checkpoint_planning,
span,
} = self;
let _guard = span.enter();
match sequential.finish()? {
AfterSequential::Done(processor) => {
let event = processor.get_metrics().to_event(
operation_id,
ScanType::SequentialPhase,
start.elapsed(),
);
processor
.get_metrics()
.log("Sequential scan metadata completed");
emit_scan_metadata_completed(&event);
Ok(AfterSequentialScanMetadata::Done)
}
AfterSequential::Parallel {
mut processor,
files,
} => {
let event = processor.get_metrics().to_event(
operation_id,
ScanType::SequentialPhase,
start.elapsed(),
);
if let Some(deferred_planning) = deferred_parallel_checkpoint_planning.as_ref() {
if let Some(checkpoint_info) = deferred_planning.finalize(&files)? {
processor.set_checkpoint_info(
deferred_planning.engine.as_ref(),
checkpoint_info,
)?;
}
}
processor
.get_metrics()
.log("Sequential scan metadata completed");
emit_scan_metadata_completed(&event);
processor.get_metrics().reset_counters();
Ok(AfterSequentialScanMetadata::Parallel {
state: Box::new(ParallelState {
inner: processor,
operation_id,
parallel_start: Instant::now(),
}),
files,
})
}
}
}
}
/// Inputs captured at construction time for computing checkpoint read info later, once the
/// files for the parallel phase are known (see `finalize`).
struct DeferredParallelCheckpointPlanning {
    /// Engine passed to checkpoint read planning and to `set_checkpoint_info` in `finish`.
    engine: Arc<dyn Engine>,
    /// Action schema passed to `LogSegment::projected_checkpoint_read_info_from_files`.
    action_schema: SchemaRef,
    /// Optional stats schema passed to the same planning call.
    stats_schema: Option<SchemaRef>,
    /// Optional partition schema passed to the same planning call.
    partition_schema: Option<SchemaRef>,
}
impl DeferredParallelCheckpointPlanning {
fn finalize(
&self,
files: &[FileMeta],
) -> DeltaResult<Option<crate::log_segment::CheckpointReadInfo>> {
if files.is_empty() {
return Ok(None);
}
Ok(Some(LogSegment::projected_checkpoint_read_info_from_files(
self.engine.as_ref(),
self.action_schema.clone(),
self.stats_schema.as_deref(),
self.partition_schema.as_deref(),
files,
)?))
}
}
impl Iterator for SequentialScanMetadata {
    type Item = DeltaResult<ScanMetadata>;

    /// Pulls the next scan-metadata item from the sequential phase inside this phase's span.
    fn next(&mut self) -> Option<Self::Item> {
        // Borrow the two fields separately so the span can wrap the call to the phase.
        let Self {
            sequential, span, ..
        } = self;
        span.in_scope(|| sequential.next())
    }
}
/// State used to process actions batches during the parallel phase of log replay.
///
/// It can be converted to bytes with [`ParallelState::into_bytes`] and rebuilt with
/// [`ParallelState::from_bytes`]. NOTE(review): presumably this lets the state be shipped to
/// other workers — confirm.
pub struct ParallelState {
    /// Scan log-replay processor that performs the batch processing.
    inner: ScanLogReplayProcessor,
    /// Metrics operation id: inherited from the sequential phase, or freshly allocated when
    /// rebuilt from serialized state.
    operation_id: MetricId,
    /// Start time of the parallel phase; used to report its duration.
    parallel_start: Instant,
}
impl ParallelLogReplayProcessor for Arc<ParallelState> {
    type Output = ScanMetadata;

    /// Delegates each batch to the shared inner scan processor.
    fn process_actions_batch(&self, actions_batch: ActionsBatch) -> DeltaResult<Self::Output> {
        let scan_processor = &self.inner;
        scan_processor.process_actions_batch(actions_batch)
    }
}
impl ParallelState {
pub fn log_metrics(&self) {
let event = self.inner.get_metrics().to_event(
self.operation_id,
ScanType::ParallelPhase,
self.parallel_start.elapsed(),
);
self.inner
.get_metrics()
.log("Parallel scan metadata completed");
emit_scan_metadata_completed(&event);
}
pub fn file_read_schema(&self) -> SchemaRef {
self.inner.checkpoint_info().checkpoint_read_schema.clone()
}
#[internal_api]
#[allow(unused)]
pub(crate) fn into_serializable_state(self) -> DeltaResult<SerializableScanState> {
self.inner.into_serializable_state()
}
#[internal_api]
#[allow(unused)]
pub(crate) fn from_serializable_state(
engine: &dyn Engine,
state: SerializableScanState,
) -> DeltaResult<Self> {
let inner = ScanLogReplayProcessor::from_serializable_state(engine, state)?;
Ok(Self {
inner,
operation_id: MetricId::new(),
parallel_start: Instant::now(),
})
}
#[allow(unused)]
pub fn into_bytes(self) -> DeltaResult<Vec<u8>> {
let state = self.into_serializable_state()?;
serde_json::to_vec(&state)
.map_err(|e| Error::generic(format!("Failed to serialize ParallelState to bytes: {e}")))
}
#[allow(unused)]
pub fn from_bytes(engine: &dyn Engine, bytes: &[u8]) -> DeltaResult<Self> {
let state: SerializableScanState =
serde_json::from_slice(bytes).map_err(Error::MalformedJson)?;
Self::from_serializable_state(engine, state)
}
}
/// Iterator over the scan metadata produced by the parallel phase of log replay.
///
/// Every item is produced inside the `parallel_scan_metadata` tracing span.
pub struct ParallelScanMetadata {
    /// The wrapped parallel phase, driven by a shared [`ParallelState`].
    pub(crate) processor: ParallelPhase<Arc<ParallelState>>,
    /// Tracing span entered on every call to `next`.
    span: Span,
}
impl ParallelScanMetadata {
    /// Creates a parallel scan-metadata iterator that reads `leaf_files` through `engine`.
    ///
    /// The files are read with the schema returned by [`ParallelState::file_read_schema`].
    ///
    /// # Errors
    /// Propagates any error from constructing the underlying `ParallelPhase`.
    pub fn try_new(
        engine: Arc<dyn Engine>,
        state: Arc<ParallelState>,
        leaf_files: Vec<FileMeta>,
    ) -> DeltaResult<Self> {
        // Fetch the read schema before `state` is moved into the phase.
        let read_schema = state.file_read_schema();
        Ok(Self {
            processor: ParallelPhase::try_new(engine, state, leaf_files, read_schema)?,
            span: info_span!("parallel_scan_metadata"),
        })
    }

    /// Creates a parallel scan-metadata iterator over already-read engine data batches.
    pub fn new_from_iter(
        state: Arc<ParallelState>,
        iter: impl IntoIterator<Item = DeltaResult<Box<dyn EngineData>>> + 'static,
    ) -> Self {
        Self {
            // `state` is not used again, so move it rather than bumping the Arc refcount.
            processor: ParallelPhase::new_from_iter(state, iter),
            span: info_span!("parallel_scan_metadata"),
        }
    }
}
impl Iterator for ParallelScanMetadata {
    type Item = DeltaResult<ScanMetadata>;

    /// Pulls the next scan-metadata item from the parallel phase inside this phase's span.
    fn next(&mut self) -> Option<Self::Item> {
        // Split the borrow so the span can wrap the call to the phase.
        let Self { processor, span } = self;
        span.in_scope(|| processor.next())
    }
}