use std::sync::Arc;
use std::time::Instant;
use delta_kernel_derive::internal_api;
use tracing::{info_span, Span};
use crate::log_replay::{ActionsBatch, ParallelLogReplayProcessor};
use crate::metrics::events::emit_scan_metadata_completed;
use crate::metrics::{MetricId, ScanType};
use crate::parallel::parallel_phase::ParallelPhase;
use crate::parallel::sequential_phase::{AfterSequential, SequentialPhase};
use crate::scan::log_replay::{ScanLogReplayProcessor, SerializableScanState};
use crate::scan::ScanMetadata;
use crate::schema::SchemaRef;
use crate::{DeltaResult, Engine, EngineData, Error, FileMeta};
/// Outcome of [`SequentialScanMetadata::finish`].
pub enum AfterSequentialScanMetadata {
    /// The underlying sequential phase finished with `AfterSequential::Done`; no parallel phase
    /// is required.
    Done,
    /// The underlying sequential phase handed off to a parallel phase.
    Parallel {
        /// Processor state for the parallel phase. It carries the same operation id and
        /// correlation id as the sequential phase, with its metric counters reset.
        state: Box<ParallelState>,
        /// Files still to be processed by the parallel phase (these correspond to the
        /// `leaf_files` argument of [`ParallelScanMetadata::try_new`]).
        files: Vec<FileMeta>,
    },
}
/// Iterator over [`ScanMetadata`] produced by the sequential phase of scan log replay.
///
/// [`SequentialScanMetadata::finish`] consumes it and reports whether a parallel phase is
/// needed. NOTE(review): presumably the iterator should be drained before `finish` is called;
/// confirm against `SequentialPhase::finish`.
pub struct SequentialScanMetadata {
    /// The wrapped sequential log-replay phase.
    pub(crate) sequential: SequentialPhase<ScanLogReplayProcessor>,
    /// Identifies this scan operation in emitted metric events; it is carried into any
    /// `ParallelState` created by `finish`.
    operation_id: MetricId,
    /// Optional caller-supplied id attached to emitted metric events.
    correlation_id: Option<Arc<str>>,
    /// Construction time; the sequential-phase duration is measured from here.
    start: Instant,
    /// `sequential_scan_metadata` tracing span, entered in `next` and `finish`.
    span: Span,
}
impl SequentialScanMetadata {
pub(crate) fn new(
sequential: SequentialPhase<ScanLogReplayProcessor>,
correlation_id: Option<Arc<str>>,
) -> Self {
let operation_id = MetricId::new();
Self {
sequential,
operation_id,
correlation_id,
start: Instant::now(),
span: info_span!("sequential_scan_metadata"),
}
}
pub fn finish(self) -> DeltaResult<AfterSequentialScanMetadata> {
let _guard = self.span.enter();
match self.sequential.finish()? {
AfterSequential::Done(processor) => {
let event = processor.get_metrics().to_event(
self.operation_id,
processor.is_catalog_managed(),
self.correlation_id,
ScanType::SequentialPhase,
self.start.elapsed(),
);
processor
.get_metrics()
.log("Sequential scan metadata completed");
emit_scan_metadata_completed(&event);
Ok(AfterSequentialScanMetadata::Done)
}
AfterSequential::Parallel { processor, files } => {
let event = processor.get_metrics().to_event(
self.operation_id,
processor.is_catalog_managed(),
self.correlation_id.clone(),
ScanType::SequentialPhase,
self.start.elapsed(),
);
processor
.get_metrics()
.log("Sequential scan metadata completed");
emit_scan_metadata_completed(&event);
processor.get_metrics().reset_counters();
Ok(AfterSequentialScanMetadata::Parallel {
state: Box::new(ParallelState {
inner: processor,
operation_id: self.operation_id,
correlation_id: self.correlation_id,
parallel_start: Instant::now(),
}),
files,
})
}
}
}
}
impl Iterator for SequentialScanMetadata {
    type Item = DeltaResult<ScanMetadata>;

    /// Pulls the next scan-metadata item from the sequential phase while the
    /// `sequential_scan_metadata` span is entered.
    fn next(&mut self) -> Option<Self::Item> {
        let Self {
            sequential, span, ..
        } = self;
        let _entered = span.enter();
        sequential.next()
    }
}
/// Shared state for the parallel phase of scan log replay.
///
/// Wraps the [`ScanLogReplayProcessor`] handed off by the sequential phase, together with the
/// identifiers used when reporting parallel-phase metrics. It can be round-tripped through
/// bytes ([`ParallelState::into_bytes`] / [`ParallelState::from_bytes`]). After such a round
/// trip the correlation id is `None`, and a new operation id and start time are assigned.
pub struct ParallelState {
    /// The processor that handles each action batch.
    inner: ScanLogReplayProcessor,
    /// Metric operation id; shared with the sequential phase when created by
    /// `SequentialScanMetadata::finish`.
    operation_id: MetricId,
    /// Optional correlation id attached to emitted metric events.
    correlation_id: Option<Arc<str>>,
    /// When this state was created; the parallel-phase duration is measured from here.
    parallel_start: Instant,
}
impl ParallelLogReplayProcessor for Arc<ParallelState> {
    type Output = ScanMetadata;

    /// Delegates the batch to the wrapped [`ScanLogReplayProcessor`].
    fn process_actions_batch(&self, actions_batch: ActionsBatch) -> DeltaResult<Self::Output> {
        let state: &ParallelState = self;
        state.inner.process_actions_batch(actions_batch)
    }
}
impl ParallelState {
pub fn log_metrics(&self) {
let event = self.inner.get_metrics().to_event(
self.operation_id,
self.inner.is_catalog_managed(),
self.correlation_id.clone(),
ScanType::ParallelPhase,
self.parallel_start.elapsed(),
);
self.inner
.get_metrics()
.log("Parallel scan metadata completed");
emit_scan_metadata_completed(&event);
}
pub fn file_read_schema(&self) -> SchemaRef {
self.inner.checkpoint_info().checkpoint_read_schema.clone()
}
#[internal_api]
#[allow(unused)]
pub(crate) fn into_serializable_state(self) -> DeltaResult<SerializableScanState> {
self.inner.into_serializable_state()
}
#[internal_api]
#[allow(unused)]
pub(crate) fn from_serializable_state(
engine: &dyn Engine,
state: SerializableScanState,
) -> DeltaResult<Self> {
let inner = ScanLogReplayProcessor::from_serializable_state(engine, state)?;
Ok(Self {
inner,
operation_id: MetricId::new(),
correlation_id: None,
parallel_start: Instant::now(),
})
}
#[allow(unused)]
pub fn into_bytes(self) -> DeltaResult<Vec<u8>> {
let state = self.into_serializable_state()?;
serde_json::to_vec(&state)
.map_err(|e| Error::generic(format!("Failed to serialize ParallelState to bytes: {e}")))
}
#[allow(unused)]
pub fn from_bytes(engine: &dyn Engine, bytes: &[u8]) -> DeltaResult<Self> {
let state: SerializableScanState =
serde_json::from_slice(bytes).map_err(Error::MalformedJson)?;
Self::from_serializable_state(engine, state)
}
}
/// Iterator over [`ScanMetadata`] produced by the parallel phase of scan log replay.
pub struct ParallelScanMetadata {
    /// The wrapped parallel phase, which drives a shared [`ParallelState`].
    pub(crate) processor: ParallelPhase<Arc<ParallelState>>,
    /// `parallel_scan_metadata` tracing span, entered on each call to `next`.
    span: Span,
}
impl ParallelScanMetadata {
    /// Creates a parallel phase that reads `leaf_files` through `engine`.
    ///
    /// The files are read with [`ParallelState::file_read_schema`].
    ///
    /// # Errors
    ///
    /// Propagates any error from constructing the underlying [`ParallelPhase`].
    pub fn try_new(
        engine: Arc<dyn Engine>,
        state: Arc<ParallelState>,
        leaf_files: Vec<FileMeta>,
    ) -> DeltaResult<Self> {
        let read_schema = state.file_read_schema();
        let processor = ParallelPhase::try_new(engine, state, leaf_files, read_schema)?;
        Ok(Self {
            processor,
            span: info_span!("parallel_scan_metadata"),
        })
    }

    /// Creates a parallel phase over the engine-data batches yielded by `iter`, without
    /// reading any files.
    pub fn new_from_iter(
        state: Arc<ParallelState>,
        iter: impl IntoIterator<Item = DeltaResult<Box<dyn EngineData>>> + 'static,
    ) -> Self {
        Self {
            // `state` is already owned; move it instead of cloning the `Arc` and dropping the
            // original.
            processor: ParallelPhase::new_from_iter(state, iter),
            span: info_span!("parallel_scan_metadata"),
        }
    }
}
impl Iterator for ParallelScanMetadata {
    type Item = DeltaResult<ScanMetadata>;

    /// Pulls the next scan-metadata item from the parallel phase while the
    /// `parallel_scan_metadata` span is entered.
    fn next(&mut self) -> Option<Self::Item> {
        let Self { processor, span } = self;
        let _entered = span.enter();
        processor.next()
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use super::ParallelState;
    use crate::engine::sync::SyncEngine;
    use crate::log_segment::CheckpointReadInfo;
    use crate::metrics::{MetricEvent, ScanType, TableType};
    use crate::scan::log_replay::{ScanLogReplayProcessor, ScanStatsOptions};
    use crate::scan::state_info::StateInfo;
    use crate::scan::PhysicalPredicate;
    use crate::schema::{DataType, SchemaRef, StructField, StructType};
    use crate::table_features::ColumnMappingMode;
    use crate::utils::test_utils::{install_thread_local_metrics_reporter, CapturingReporter};

    /// A `ParallelState` rebuilt via `from_serializable_state` must still report a
    /// catalog-managed table, and `log_metrics` must tag its event as the parallel phase.
    #[test]
    fn test_parallel_state_log_metrics_carries_round_tripped_table_type() {
        let engine = SyncEngine::new();

        // Minimal single-column schema; its contents do not matter for the metrics checked.
        let id_field = StructField::new("id", DataType::INTEGER, true);
        let schema: SchemaRef = Arc::new(StructType::new_unchecked([id_field]));

        let state_info = Arc::new(StateInfo {
            logical_schema: schema.clone(),
            physical_schema: schema,
            physical_predicate: PhysicalPredicate::None,
            transform_spec: None,
            column_mapping_mode: ColumnMappingMode::None,
            physical_stats_schema: None,
            physical_partition_schema: None,
            physical_stats_columns: HashSet::new(),
            // The property that must survive the serialization round trip.
            is_catalog_managed: true,
        });

        // Build a processor, serialize it, and rebuild a `ParallelState` from that form.
        let state = {
            let processor = ScanLogReplayProcessor::new(
                &engine,
                state_info,
                CheckpointReadInfo::without_stats_parsed(),
                ScanStatsOptions::default(),
            )
            .unwrap();
            let serialized = processor.into_serializable_state().unwrap();
            ParallelState::from_serializable_state(&engine, serialized).unwrap()
        };

        // Route emitted metric events into a capturing reporter, then emit.
        let reporter = Arc::new(CapturingReporter::default());
        let _guard = install_thread_local_metrics_reporter(reporter.clone());
        state.log_metrics();

        let completed = reporter
            .events()
            .into_iter()
            .find_map(|event| {
                if let MetricEvent::ScanMetadataCompleted(c) = event {
                    Some(c)
                } else {
                    None
                }
            })
            .expect("ScanMetadataCompleted emitted");
        assert_eq!(completed.table_type, TableType::CatalogManaged);
        assert_eq!(completed.scan_type, ScanType::ParallelPhase);
    }
}