use std::collections::VecDeque;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use crate::file_meta::FileMeta;
use crate::file_scan_config::{FileScanConfig, PartitionColumnProjector};
use crate::PartitionedFile;
use arrow::datatypes::SchemaRef;
use datafusion_common::error::Result;
use datafusion_execution::RecordBatchStream;
use datafusion_physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use datafusion_common::instant::Instant;
use datafusion_common::ScalarValue;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{ready, FutureExt as _, Stream, StreamExt as _};
/// A stream that reads the files of one partition one after another and
/// yields their record batches.
///
/// While one file is being scanned, opening of the following file has already
/// been started (see `poll_inner`), so the open can make progress during the scan.
pub struct FileStream {
/// Files that have not been opened yet; consumed from the front.
file_iter: VecDeque<PartitionedFile>,
/// Schema reported by `RecordBatchStream::schema`; obtained from
/// `FileScanConfig::projected_schema` in `new`.
projected_schema: SchemaRef,
/// Number of rows that may still be emitted before the limit is reached;
/// `None` means no limit was configured.
remain: Option<usize>,
/// Opener that turns a file's metadata into a future of a record batch stream.
file_opener: Arc<dyn FileOpener>,
/// Projector applied to every scanned batch together with the partition
/// values of the file it came from.
pc_projector: PartitionColumnProjector,
/// Current state of the open/scan state machine.
state: FileStreamState,
/// Timers and error counters specific to file streams.
file_stream_metrics: FileStreamMetrics,
/// Baseline metrics; every poll result is passed through `record_poll`.
baseline_metrics: BaselineMetrics,
/// What to do when opening or scanning a file fails.
on_error: OnError,
}
impl FileStream {
pub fn new(
config: &FileScanConfig,
partition: usize,
file_opener: Arc<dyn FileOpener>,
metrics: &ExecutionPlanMetricsSet,
) -> Result<Self> {
let projected_schema = config.projected_schema();
let pc_projector = PartitionColumnProjector::new(
Arc::clone(&projected_schema),
&config
.table_partition_cols
.iter()
.map(|x| x.name().clone())
.collect::<Vec<_>>(),
);
let file_group = config.file_groups[partition].clone();
Ok(Self {
file_iter: file_group.into_inner().into_iter().collect(),
projected_schema,
remain: config.limit,
file_opener,
pc_projector,
state: FileStreamState::Idle,
file_stream_metrics: FileStreamMetrics::new(metrics, partition),
baseline_metrics: BaselineMetrics::new(metrics, partition),
on_error: OnError::Fail,
})
}
pub fn with_on_error(mut self, on_error: OnError) -> Self {
self.on_error = on_error;
self
}
fn start_next_file(&mut self) -> Option<Result<(FileOpenFuture, Vec<ScalarValue>)>> {
let part_file = self.file_iter.pop_front()?;
let file_meta = FileMeta {
object_meta: part_file.object_meta,
range: part_file.range,
extensions: part_file.extensions,
metadata_size_hint: part_file.metadata_size_hint,
};
Some(
self.file_opener
.open(file_meta)
.map(|future| (future, part_file.partition_values)),
)
}
fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
loop {
match &mut self.state {
FileStreamState::Idle => {
self.file_stream_metrics.time_opening.start();
match self.start_next_file().transpose() {
Ok(Some((future, partition_values))) => {
self.state = FileStreamState::Open {
future,
partition_values,
}
}
Ok(None) => return Poll::Ready(None),
Err(e) => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e)));
}
}
}
FileStreamState::Open {
future,
partition_values,
} => match ready!(future.poll_unpin(cx)) {
Ok(reader) => {
let partition_values = mem::take(partition_values);
self.file_stream_metrics.time_opening.stop();
let next = self.start_next_file().transpose();
self.file_stream_metrics.time_scanning_until_data.start();
self.file_stream_metrics.time_scanning_total.start();
match next {
Ok(Some((next_future, next_partition_values))) => {
self.state = FileStreamState::Scan {
partition_values,
reader,
next: Some((
NextOpen::Pending(next_future),
next_partition_values,
)),
};
}
Ok(None) => {
self.state = FileStreamState::Scan {
reader,
partition_values,
next: None,
};
}
Err(e) => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e)));
}
}
}
Err(e) => {
self.file_stream_metrics.file_open_errors.add(1);
match self.on_error {
OnError::Skip => {
self.file_stream_metrics.time_opening.stop();
self.state = FileStreamState::Idle
}
OnError::Fail => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e)));
}
}
}
},
FileStreamState::Scan {
reader,
partition_values,
next,
} => {
if let Some((next_open_future, _)) = next {
if let NextOpen::Pending(f) = next_open_future {
if let Poll::Ready(reader) = f.as_mut().poll(cx) {
*next_open_future = NextOpen::Ready(reader);
}
}
}
match ready!(reader.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
self.file_stream_metrics.time_scanning_until_data.stop();
self.file_stream_metrics.time_scanning_total.stop();
let result = self
.pc_projector
.project(batch, partition_values)
.map_err(|e| ArrowError::ExternalError(e.into()))
.map(|batch| match &mut self.remain {
Some(remain) => {
if *remain > batch.num_rows() {
*remain -= batch.num_rows();
batch
} else {
let batch = batch.slice(0, *remain);
self.state = FileStreamState::Limit;
*remain = 0;
batch
}
}
None => batch,
});
if result.is_err() {
self.state = FileStreamState::Error
}
self.file_stream_metrics.time_scanning_total.start();
return Poll::Ready(Some(result.map_err(Into::into)));
}
Some(Err(err)) => {
self.file_stream_metrics.file_scan_errors.add(1);
self.file_stream_metrics.time_scanning_until_data.stop();
self.file_stream_metrics.time_scanning_total.stop();
match self.on_error {
OnError::Skip => match mem::take(next) {
Some((future, partition_values)) => {
self.file_stream_metrics.time_opening.start();
match future {
NextOpen::Pending(future) => {
self.state = FileStreamState::Open {
future,
partition_values,
}
}
NextOpen::Ready(reader) => {
self.state = FileStreamState::Open {
future: Box::pin(std::future::ready(
reader,
)),
partition_values,
}
}
}
}
None => return Poll::Ready(None),
},
OnError::Fail => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(err.into())));
}
}
}
None => {
self.file_stream_metrics.time_scanning_until_data.stop();
self.file_stream_metrics.time_scanning_total.stop();
match mem::take(next) {
Some((future, partition_values)) => {
self.file_stream_metrics.time_opening.start();
match future {
NextOpen::Pending(future) => {
self.state = FileStreamState::Open {
future,
partition_values,
}
}
NextOpen::Ready(reader) => {
self.state = FileStreamState::Open {
future: Box::pin(std::future::ready(
reader,
)),
partition_values,
}
}
}
}
None => return Poll::Ready(None),
}
}
}
}
FileStreamState::Error | FileStreamState::Limit => {
return Poll::Ready(None)
}
}
}
}
}
impl Stream for FileStream {
    type Item = Result<RecordBatch>;

    /// Polls the inner state machine while the `time_processing` timer is
    /// running, then records the outcome in the baseline metrics.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        this.file_stream_metrics.time_processing.start();
        let polled = this.poll_inner(cx);
        this.file_stream_metrics.time_processing.stop();
        this.baseline_metrics.record_poll(polled)
    }
}
impl RecordBatchStream for FileStream {
    /// Returns a new handle to the projected schema stored at construction.
    fn schema(&self) -> SchemaRef {
        let schema = &self.projected_schema;
        Arc::clone(schema)
    }
}
/// A boxed future that resolves either to a boxed stream of record batches
/// (each item may itself be an `ArrowError`) or to an error from opening.
pub type FileOpenFuture =
BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>>;
/// Describes what a `FileStream` does when opening or scanning a file fails.
#[derive(Default)]
pub enum OnError {
    /// Fail the entire stream and return the underlying error. This is the
    /// default.
    #[default]
    Fail,
    /// Ignore the failed file and continue with the next one.
    Skip,
}
/// Generic API for opening a file described by a `FileMeta` and resolving it
/// to a stream of `RecordBatch`.
pub trait FileOpener: Unpin + Send + Sync {
/// Starts opening the file described by `file_meta`.
///
/// Returns a `FileOpenFuture` that resolves to the file's batch stream, or
/// an error if the open could not even be started.
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture>;
}
/// State of the open future for the file that follows the one currently being
/// scanned. `FileStream` polls that future during the scan and stores its
/// output here once it is ready.
pub enum NextOpen {
/// The open future has not resolved yet.
Pending(FileOpenFuture),
/// The open future has resolved; holds its output, which is either the
/// file's batch stream or the error from opening.
Ready(Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>),
}
/// States of the `FileStream` state machine.
pub enum FileStreamState {
/// No file is open or being opened; the next poll tries to open the next
/// file in the queue.
Idle,
/// A file is being opened; waiting for its `FileOpenFuture` to resolve.
Open {
/// Future that resolves to the file's batch stream.
future: FileOpenFuture,
/// Partition values of the file being opened.
partition_values: Vec<ScalarValue>,
},
/// A file is being scanned.
Scan {
/// Partition values of the file being scanned.
partition_values: Vec<ScalarValue>,
/// Batch stream of the file being scanned.
reader: BoxStream<'static, Result<RecordBatch, ArrowError>>,
/// Open state and partition values of the following file, started while
/// this one is scanned; `None` if there is no further file.
next: Option<(NextOpen, Vec<ScalarValue>)>,
},
/// An error ended the stream; further polls yield `None`.
Error,
/// The row limit was reached; further polls yield `None`.
Limit,
}
/// A timer that can be started and stopped, adding each measured interval to
/// a `Time` metric.
pub struct StartableTime {
/// Metric that accumulates the elapsed time of every start/stop interval.
pub metrics: Time,
/// Instant at which the timer was started; `None` while it is stopped.
pub start: Option<Instant>,
}
impl StartableTime {
    /// Starts the timer.
    ///
    /// # Panics
    ///
    /// Panics if the timer is already running.
    pub fn start(&mut self) {
        assert!(self.start.is_none());
        let now = Instant::now();
        self.start = Some(now);
    }

    /// Stops the timer and adds the time elapsed since `start` to the metric.
    /// Does nothing if the timer is not running.
    pub fn stop(&mut self) {
        let Some(started_at) = self.start.take() else {
            return;
        };
        self.metrics.add_elapsed(started_at);
    }
}
/// Metrics collected by a `FileStream`: four wall-clock timers and two error
/// counters. The metric names are assigned in `FileStreamMetrics::new`.
#[allow(rustdoc::broken_intra_doc_links)]
pub struct FileStreamMetrics {
/// Time spent opening files: started before `FileOpener::open` is called
/// for a file (or when switching to a prefetched file) and stopped when the
/// open future resolves.
pub time_opening: StartableTime,
/// Time from the moment a file's stream becomes available until it yields
/// its first item or ends.
pub time_scanning_until_data: StartableTime,
/// Time spent scanning: started when a file's stream becomes available and
/// restarted after each emitted batch; stopped whenever the stream yields
/// an item or ends.
pub time_scanning_total: StartableTime,
/// Time spent inside `FileStream::poll_inner`, measured around every call
/// made by `poll_next`.
pub time_processing: StartableTime,
/// Number of files whose open future resolved to an error.
pub file_open_errors: Count,
/// Number of errors yielded by file batch streams during scanning.
pub file_scan_errors: Count,
}
impl FileStreamMetrics {
    /// Registers all file stream metrics for `partition` in `metrics` and
    /// returns them with every timer stopped.
    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        // Builds a stopped timer backed by a subset-time metric called `name`.
        let timer = |name: &'static str| StartableTime {
            metrics: MetricBuilder::new(metrics).subset_time(name, partition),
            start: None,
        };
        // Builds a counter metric called `name`.
        let error_counter =
            |name: &'static str| MetricBuilder::new(metrics).counter(name, partition);

        // Field initializers run in the order written, so the metrics are
        // registered in the same order as before.
        Self {
            time_opening: timer("time_elapsed_opening"),
            time_scanning_until_data: timer("time_elapsed_scanning_until_data"),
            time_scanning_total: timer("time_elapsed_scanning_total"),
            time_processing: timer("time_elapsed_processing"),
            file_open_errors: error_counter("file_open_errors"),
            file_scan_errors: error_counter("file_scan_errors"),
        }
    }
}
// Tests for `FileStream`, driven by a mock `FileOpener` that can be told to
// fail on opening or on scanning for chosen files.
#[cfg(test)]
mod tests {
use crate::file_scan_config::FileScanConfigBuilder;
use crate::tests::make_partition;
use crate::PartitionedFile;
use arrow::error::ArrowError;
use datafusion_common::error::Result;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::{FutureExt as _, StreamExt as _};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use crate::file_meta::FileMeta;
use crate::file_stream::{FileOpenFuture, FileOpener, FileStream, OnError};
use crate::test_util::MockSource;
use arrow::array::RecordBatch;
use arrow::datatypes::Schema;
use datafusion_common::{assert_batches_eq, internal_err};
/// Mock `FileOpener` that returns the same batches for every file and can
/// simulate open or scan failures for selected files.
#[derive(Default)]
struct TestOpener {
/// Indices (in order of `open` calls) of files whose open future fails.
error_opening_idx: Vec<usize>,
/// Indices (in order of `open` calls) of files whose stream yields an error.
error_scanning_idx: Vec<usize>,
/// Number of `open` calls made so far; the index of the next file.
current_idx: AtomicUsize,
/// Batches returned for every file that neither fails to open nor to scan.
records: Vec<RecordBatch>,
}
impl FileOpener for TestOpener {
fn open(&self, _file_meta: FileMeta) -> Result<FileOpenFuture> {
// The file's identity is ignored; only the call order decides the outcome.
let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);
if self.error_opening_idx.contains(&idx) {
Ok(futures::future::ready(internal_err!("error opening")).boxed())
} else if self.error_scanning_idx.contains(&idx) {
// Opening succeeds, but the stream consists of a single error item.
let error = futures::future::ready(Err(ArrowError::IpcError(
"error scanning".to_owned(),
)));
let stream = futures::stream::once(error).boxed();
Ok(futures::future::ready(Ok(stream)).boxed())
} else {
let iterator = self.records.clone().into_iter().map(Ok);
let stream = futures::stream::iter(iterator).boxed();
Ok(futures::future::ready(Ok(stream)).boxed())
}
}
}
/// Builder that configures a `FileStream` over mock files and collects its output.
#[derive(Default)]
struct FileStreamTest {
/// Number of mock files in the single file group.
num_files: usize,
/// Row limit passed to the scan configuration.
limit: Option<usize>,
/// Error policy applied to the stream.
on_error: OnError,
/// Mock opener that produces the data and the simulated failures.
opener: TestOpener,
}
impl FileStreamTest {
pub fn new() -> Self {
Self::default()
}
/// Sets the number of mock files to scan.
pub fn with_num_files(mut self, num_files: usize) -> Self {
self.num_files = num_files;
self
}
/// Sets the row limit of the scan.
pub fn with_limit(mut self, limit: Option<usize>) -> Self {
self.limit = limit;
self
}
/// Sets the indices of the files that fail while being opened.
pub fn with_open_errors(mut self, idx: Vec<usize>) -> Self {
self.opener.error_opening_idx = idx;
self
}
/// Sets the indices of the files that fail while being scanned.
pub fn with_scan_errors(mut self, idx: Vec<usize>) -> Self {
self.opener.error_scanning_idx = idx;
self
}
/// Sets the error policy of the stream.
pub fn with_on_error(mut self, on_error: OnError) -> Self {
self.on_error = on_error;
self
}
/// Sets the batches every successfully scanned file returns.
pub fn with_records(mut self, records: Vec<RecordBatch>) -> Self {
self.opener.records = records;
self
}
/// Builds the `FileStream`, drains it, and returns all batches, or the
/// first error the stream yielded.
pub async fn result(self) -> Result<Vec<RecordBatch>> {
// Use the schema of the first record batch, or an empty schema if there are no records.
let file_schema = self
.opener
.records
.first()
.map(|batch| batch.schema())
.unwrap_or_else(|| Arc::new(Schema::empty()));
let mock_files: Vec<(String, u64)> = (0..self.num_files)
.map(|idx| (format!("mock_file{idx}"), 10_u64))
.collect();
let file_group = mock_files
.into_iter()
.map(|(name, size)| PartitionedFile::new(name, size))
.collect();
let on_error = self.on_error;
let config = FileScanConfigBuilder::new(
ObjectStoreUrl::parse("test:///").unwrap(),
file_schema,
Arc::new(MockSource::default()),
)
.with_file_group(file_group)
.with_limit(self.limit)
.build();
let metrics_set = ExecutionPlanMetricsSet::new();
let file_stream =
FileStream::new(&config, 0, Arc::new(self.opener), &metrics_set)
.unwrap()
.with_on_error(on_error);
file_stream
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<Result<Vec<_>>>()
}
}
/// Scans two files, each returning a 3-row and a 2-row batch, with the given limit.
async fn create_and_collect(limit: Option<usize>) -> Vec<RecordBatch> {
FileStreamTest::new()
.with_records(vec![make_partition(3), make_partition(2)])
.with_num_files(2)
.with_limit(limit)
.result()
.await
.expect("error executing stream")
}
/// With `OnError::Skip`, files that fail to open are skipped.
#[tokio::test]
async fn on_error_opening() -> Result<()> {
// First file fails to open: only the second file's rows are returned.
let batches = FileStreamTest::new()
.with_records(vec![make_partition(3), make_partition(2)])
.with_num_files(2)
.with_on_error(OnError::Skip)
.with_open_errors(vec![0])
.result()
.await?;
#[rustfmt::skip]
assert_batches_eq!(&[
"+---+",
"| i |",
"+---+",
"| 0 |",
"| 1 |",
"| 2 |",
"| 0 |",
"| 1 |",
"+---+",
], &batches);
// Second file fails to open: only the first file's rows are returned.
let batches = FileStreamTest::new()
.with_records(vec![make_partition(3), make_partition(2)])
.with_num_files(2)
.with_on_error(OnError::Skip)
.with_open_errors(vec![1])
.result()
.await?;
#[rustfmt::skip]
assert_batches_eq!(&[
"+---+",
"| i |",
"+---+",
"| 0 |",
"| 1 |",
"| 2 |",
"| 0 |",
"| 1 |",
"+---+",
], &batches);
// Both files fail to open: the result is empty.
let batches = FileStreamTest::new()
.with_records(vec![make_partition(3), make_partition(2)])
.with_num_files(2)
.with_on_error(OnError::Skip)
.with_open_errors(vec![0, 1])
.result()
.await?;
#[rustfmt::skip]
assert_batches_eq!(&[
"++",
"++",
], &batches);
Ok(())
}
/// With `OnError::Fail`, a scan error makes the whole stream fail.
#[tokio::test]
async fn on_error_scanning_fail() -> Result<()> {
let result = FileStreamTest::new()
.with_records(vec![make_partition(3), make_partition(2)])
.with_num_files(2)
.with_on_error(OnError::Fail)
.with_scan_errors(vec![1])
.result()
.await;
assert!(result.is_err());
Ok(())
}
/// With `OnError::Fail`, an open error makes the whole stream fail.
#[tokio::test]
async fn on_error_opening_fail() -> Result<()> {
let result = FileStreamTest::new()
.with_records(vec![make_partition(3), make_partition(2)])
.with_num_files(2)
.with_on_error(OnError::Fail)
.with_open_errors(vec![1])
.result()
.await;
assert!(result.is_err());
Ok(())
}
/// With `OnError::Skip`, files that fail while being scanned are skipped.
#[tokio::test]
async fn on_error_scanning() -> Result<()> {
// First file fails to scan: only the second file's rows are returned.
let batches = FileStreamTest::new()
.with_records(vec![make_partition(3), make_partition(2)])
.with_num_files(2)
.with_on_error(OnError::Skip)
.with_scan_errors(vec![0])
.result()
.await?;
#[rustfmt::skip]
assert_batches_eq!(&[
"+---+",
"| i |",
"+---+",
"| 0 |",
"| 1 |",
"| 2 |",
"| 0 |",
"| 1 |",
"+---+",
], &batches);
// Second file fails to scan: only the first file's rows are returned.
let batches = FileStreamTest::new()
.with_records(vec![make_partition(3), make_partition(2)])
.with_num_files(2)
.with_on_error(OnError::Skip)
.with_scan_errors(vec![1])
.result()
.await?;
#[rustfmt::skip]
assert_batches_eq!(&[
"+---+",
"| i |",
"+---+",
"| 0 |",
"| 1 |",
"| 2 |",
"| 0 |",
"| 1 |",
"+---+",
], &batches);
// Both files fail to scan: the result is empty.
let batches = FileStreamTest::new()
.with_records(vec![make_partition(3), make_partition(2)])
.with_num_files(2)
.with_on_error(OnError::Skip)
.with_scan_errors(vec![0, 1])
.result()
.await?;
#[rustfmt::skip]
assert_batches_eq!(&[
"++",
"++",
], &batches);
Ok(())
}
/// With `OnError::Skip`, open and scan failures can be mixed across three files.
#[tokio::test]
async fn on_error_mixed() -> Result<()> {
// File 0 fails to scan and file 1 fails to open: only file 2 is returned.
let batches = FileStreamTest::new()
.with_records(vec![make_partition(3), make_partition(2)])
.with_num_files(3)
.with_on_error(OnError::Skip)
.with_open_errors(vec![1])
.with_scan_errors(vec![0])
.result()
.await?;
#[rustfmt::skip]
assert_batches_eq!(&[
"+---+",
"| i |",
"+---+",
"| 0 |",
"| 1 |",
"| 2 |",
"| 0 |",
"| 1 |",
"+---+",
], &batches);
// File 0 fails to open and file 1 fails to scan: only file 2 is returned.
let batches = FileStreamTest::new()
.with_records(vec![make_partition(3), make_partition(2)])
.with_num_files(3)
.with_on_error(OnError::Skip)
.with_open_errors(vec![0])
.with_scan_errors(vec![1])
.result()
.await?;
#[rustfmt::skip]
assert_batches_eq!(&[
"+---+",
"| i |",
"+---+",
"| 0 |",
"| 1 |",
"| 2 |",
"| 0 |",
"| 1 |",
"+---+",
], &batches);
// Files 0 and 1 fail to scan and file 2 fails to open: the result is empty.
let batches = FileStreamTest::new()
.with_records(vec![make_partition(3), make_partition(2)])
.with_num_files(3)
.with_on_error(OnError::Skip)
.with_open_errors(vec![2])
.with_scan_errors(vec![0, 1])
.result()
.await?;
#[rustfmt::skip]
assert_batches_eq!(&[
"++",
"++",
], &batches);
// Files 0 and 2 fail to open and file 1 fails to scan: the result is empty.
let batches = FileStreamTest::new()
.with_records(vec![make_partition(3), make_partition(2)])
.with_num_files(3)
.with_on_error(OnError::Skip)
.with_open_errors(vec![0, 2])
.with_scan_errors(vec![1])
.result()
.await?;
#[rustfmt::skip]
assert_batches_eq!(&[
"++",
"++",
], &batches);
Ok(())
}
/// Without a limit, all rows of both files are returned.
#[tokio::test]
async fn without_limit() -> Result<()> {
let batches = create_and_collect(None).await;
#[rustfmt::skip]
assert_batches_eq!(&[
"+---+",
"| i |",
"+---+",
"| 0 |",
"| 1 |",
"| 2 |",
"| 0 |",
"| 1 |",
"| 0 |",
"| 1 |",
"| 2 |",
"| 0 |",
"| 1 |",
"+---+",
], &batches);
Ok(())
}
/// A limit of 5 equals the row count of the first file, so the stream stops
/// exactly at the boundary between the two files.
#[tokio::test]
async fn with_limit_between_files() -> Result<()> {
let batches = create_and_collect(Some(5)).await;
#[rustfmt::skip]
assert_batches_eq!(&[
"+---+",
"| i |",
"+---+",
"| 0 |",
"| 1 |",
"| 2 |",
"| 0 |",
"| 1 |",
"+---+",
], &batches);
Ok(())
}
/// A limit of 6 ends inside the first batch of the second file, so that
/// batch is truncated to a single row.
#[tokio::test]
async fn with_limit_at_middle_of_batch() -> Result<()> {
let batches = create_and_collect(Some(6)).await;
#[rustfmt::skip]
assert_batches_eq!(&[
"+---+",
"| i |",
"+---+",
"| 0 |",
"| 1 |",
"| 2 |",
"| 0 |",
"| 1 |",
"| 0 |",
"+---+",
], &batches);
Ok(())
}
}