use datafusion_common::internal_datafusion_err;
use std::collections::VecDeque;
use std::task::{Context, Poll};
use crate::morsel::{Morsel, MorselPlanner, Morselizer, PendingMorselPlanner};
use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, Result};
use datafusion_physical_plan::metrics::ScopedTimerGuard;
use futures::stream::BoxStream;
use futures::{FutureExt as _, StreamExt as _};
use super::work_source::WorkSource;
use super::{FileStreamMetrics, OnError};
/// Mutable state for turning files from a `WorkSource` into record batches.
///
/// Files are planned by the `morselizer` into planners, planners yield morsels
/// (and possibly further ready or pending planners), and each morsel is turned
/// into a record-batch stream that is read until it ends, fails, or the row
/// limit is reached. `ScanState::poll_scan` advances this pipeline one step at
/// a time.
pub(super) struct ScanState {
    /// Files still to be scanned; `pop_front` yields the next one.
    work_source: WorkSource,
    /// Rows that may still be emitted, or `None` for no row limit.
    remain: Option<usize>,
    /// Produces a planner for each file taken from `work_source`.
    morselizer: Box<dyn Morselizer>,
    /// Whether open/plan/read failures are skipped or returned as errors.
    on_error: OnError,
    /// Planners whose `plan()` can be called now, consumed in FIFO order.
    ready_planners: VecDeque<Box<dyn MorselPlanner>>,
    /// Morsels waiting to become the active `reader`, consumed in FIFO order.
    ready_morsels: VecDeque<Box<dyn Morsel>>,
    /// Stream of batches for the morsel currently being read, if any.
    reader: Option<BoxStream<'static, Result<RecordBatch>>>,
    /// At most one planner future being awaited; while it is set,
    /// `poll_scan` does not plan any further ready planners.
    pending_planner: Option<PendingMorselPlanner>,
    /// Counters and timers updated as the scan progresses.
    metrics: FileStreamMetrics,
}
impl ScanState {
pub(super) fn new(
work_source: WorkSource,
remain: Option<usize>,
morselizer: Box<dyn Morselizer>,
on_error: OnError,
metrics: FileStreamMetrics,
) -> Self {
Self {
work_source,
remain,
morselizer,
on_error,
ready_planners: Default::default(),
ready_morsels: Default::default(),
reader: None,
pending_planner: None,
metrics,
}
}
pub(super) fn set_on_error(&mut self, on_error: OnError) {
self.on_error = on_error;
}
pub(super) fn poll_scan(&mut self, cx: &mut Context<'_>) -> ScanAndReturn {
let _processing_timer: ScopedTimerGuard<'_> =
self.metrics.time_processing.timer();
if let Some(mut pending_planner) = self.pending_planner.take() {
match pending_planner.poll_unpin(cx) {
Poll::Pending => {
self.pending_planner = Some(pending_planner);
}
Poll::Ready(Ok(planner)) => {
self.ready_planners.push_back(planner);
}
Poll::Ready(Err(err)) => {
self.metrics.file_open_errors.add(1);
self.metrics.time_opening.stop();
return match self.on_error {
OnError::Skip => {
self.metrics.files_processed.add(1);
ScanAndReturn::Continue
}
OnError::Fail => ScanAndReturn::Error(err),
};
}
}
}
if let Some(reader) = self.reader.as_mut() {
match reader.poll_next_unpin(cx) {
Poll::Pending => return ScanAndReturn::Return(Poll::Pending),
Poll::Ready(Some(Ok(batch))) => {
self.metrics.time_scanning_until_data.stop();
self.metrics.time_scanning_total.stop();
let (batch, finished) = match &mut self.remain {
Some(remain) => {
if *remain > batch.num_rows() {
*remain -= batch.num_rows();
self.metrics.time_scanning_total.start();
(batch, false)
} else {
let batch = batch.slice(0, *remain);
let done = 1 + self.work_source.skipped_on_limit();
self.metrics.files_processed.add(done);
*remain = 0;
(batch, true)
}
}
None => {
self.metrics.time_scanning_total.start();
(batch, false)
}
};
return if finished {
ScanAndReturn::Done(Some(Ok(batch)))
} else {
ScanAndReturn::Return(Poll::Ready(Some(Ok(batch))))
};
}
Poll::Ready(Some(Err(err))) => {
self.reader = None;
self.metrics.file_scan_errors.add(1);
self.metrics.time_scanning_until_data.stop();
self.metrics.time_scanning_total.stop();
return match self.on_error {
OnError::Skip => {
self.metrics.files_processed.add(1);
ScanAndReturn::Continue
}
OnError::Fail => ScanAndReturn::Error(err),
};
}
Poll::Ready(None) => {
self.reader = None;
self.metrics.files_processed.add(1);
self.metrics.time_scanning_until_data.stop();
self.metrics.time_scanning_total.stop();
return ScanAndReturn::Continue;
}
}
}
if let Some(morsel) = self.ready_morsels.pop_front() {
self.metrics.time_opening.stop();
self.metrics.time_scanning_until_data.start();
self.metrics.time_scanning_total.start();
self.reader = Some(morsel.into_stream());
return ScanAndReturn::Continue;
}
if self.pending_planner.is_some() {
return ScanAndReturn::Return(Poll::Pending);
}
if let Some(planner) = self.ready_planners.pop_front() {
return match planner.plan() {
Ok(Some(mut plan)) => {
self.ready_morsels.extend(plan.take_morsels());
self.ready_planners.extend(plan.take_ready_planners());
if let Some(pending_planner) = plan.take_pending_planner() {
if self.pending_planner.is_some() {
return ScanAndReturn::Error(internal_datafusion_err!(
"Conflicting pending planner state in FileStream ScanState"
));
}
self.pending_planner = Some(pending_planner);
}
ScanAndReturn::Continue
}
Ok(None) => {
self.metrics.files_processed.add(1);
self.metrics.time_opening.stop();
ScanAndReturn::Continue
}
Err(err) => {
self.metrics.file_open_errors.add(1);
self.metrics.time_opening.stop();
match self.on_error {
OnError::Skip => {
self.metrics.files_processed.add(1);
ScanAndReturn::Continue
}
OnError::Fail => ScanAndReturn::Error(err),
}
}
};
}
let part_file = match self.work_source.pop_front() {
Some(part_file) => part_file,
None => return ScanAndReturn::Done(None),
};
self.metrics.time_opening.start();
match self.morselizer.plan_file(part_file) {
Ok(planner) => {
self.metrics.files_opened.add(1);
self.ready_planners.push_back(planner);
ScanAndReturn::Continue
}
Err(err) => match self.on_error {
OnError::Skip => {
self.metrics.file_open_errors.add(1);
self.metrics.time_opening.stop();
self.metrics.files_processed.add(1);
ScanAndReturn::Continue
}
OnError::Fail => ScanAndReturn::Error(err),
},
}
}
}
/// Outcome of a single `ScanState::poll_scan` step, telling the caller how to
/// proceed.
pub(super) enum ScanAndReturn {
    /// Internal progress was made and nothing needs to be surfaced yet;
    /// presumably the caller polls the scan again (confirm at the call site).
    Continue,
    /// A poll result to hand back to the consumer as-is: a batch or `Pending`.
    Return(Poll<Option<Result<RecordBatch>>>),
    /// A failure to surface: produced under `OnError::Fail`, or when the
    /// pending-planner state is inconsistent.
    Error(DataFusionError),
    /// The scan is finished: the row limit was reached (carrying the final,
    /// possibly truncated batch) or no work remains (`None`).
    Done(Option<Result<RecordBatch>>),
}