use std::collections::VecDeque;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use crate::PartitionedFile;
use crate::file_scan_config::FileScanConfig;
use arrow::datatypes::SchemaRef;
use datafusion_common::error::Result;
use datafusion_execution::RecordBatchStream;
use datafusion_physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
};
use arrow::record_batch::RecordBatch;
use datafusion_common::instant::Instant;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{FutureExt as _, Stream, StreamExt as _, ready};
/// A stream that reads record batches file-by-file from a partition's
/// file group, opening the next file in the background while the
/// current one is being scanned.
pub struct FileStream {
    /// Files remaining to be opened, in scan order
    file_iter: VecDeque<PartitionedFile>,
    /// Schema of the batches this stream yields (the projected schema
    /// computed from the scan config)
    projected_schema: SchemaRef,
    /// Remaining number of rows to yield (from `config.limit`);
    /// `None` means no limit
    remain: Option<usize>,
    /// Opens each [`PartitionedFile`], producing a [`FileOpenFuture`]
    file_opener: Arc<dyn FileOpener>,
    /// Current state of the streaming state machine
    state: FileStreamState,
    /// Per-partition timers and error counters for this stream
    file_stream_metrics: FileStreamMetrics,
    /// Standard baseline metrics (output rows, elapsed compute)
    baseline_metrics: BaselineMetrics,
    /// Behavior when opening or scanning a file fails
    on_error: OnError,
}
impl FileStream {
    /// Creates a `FileStream` over the `partition`-th file group of
    /// `config`, using `file_opener` to open each file and registering
    /// its metrics with `metrics`.
    ///
    /// The error behavior defaults to [`OnError::Fail`]; override with
    /// [`Self::with_on_error`].
    ///
    /// # Errors
    /// Returns an error if the projected schema cannot be computed from
    /// the scan configuration.
    ///
    /// # Panics
    /// Panics if `partition` is out of bounds for `config.file_groups`.
    pub fn new(
        config: &FileScanConfig,
        partition: usize,
        file_opener: Arc<dyn FileOpener>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> Result<Self> {
        let projected_schema = config.projected_schema()?;
        let file_group = config.file_groups[partition].clone();
        Ok(Self {
            file_iter: file_group.into_inner().into_iter().collect(),
            projected_schema,
            remain: config.limit,
            file_opener,
            state: FileStreamState::Idle,
            file_stream_metrics: FileStreamMetrics::new(metrics, partition),
            baseline_metrics: BaselineMetrics::new(metrics, partition),
            on_error: OnError::Fail,
        })
    }

    /// Sets the behavior when an error occurs while opening or scanning
    /// a file (defaults to [`OnError::Fail`]).
    pub fn with_on_error(mut self, on_error: OnError) -> Self {
        self.on_error = on_error;
        self
    }

    /// Begins opening the next file in the queue, returning `None` when
    /// no files remain. The returned future has not been polled yet.
    fn start_next_file(&mut self) -> Option<Result<FileOpenFuture>> {
        let part_file = self.file_iter.pop_front()?;
        Some(self.file_opener.open(part_file))
    }

    /// Advances the state machine and produces the next batch.
    ///
    /// Transitions (see [`FileStreamState`]):
    /// - `Idle`: start opening the next file, or finish if none remain
    /// - `Open`: await the open future; on success immediately start
    ///   opening the *following* file so opening overlaps with scanning
    /// - `Scan`: drain the current reader, applying the `remain` row
    ///   limit; when exhausted, switch to the queued `next` file
    /// - `Error` / `Limit`: terminal — always yields `None`
    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
        loop {
            match &mut self.state {
                FileStreamState::Idle => {
                    self.file_stream_metrics.time_opening.start();
                    match self.start_next_file().transpose() {
                        Ok(Some(future)) => self.state = FileStreamState::Open { future },
                        // No more files: end of stream
                        Ok(None) => return Poll::Ready(None),
                        Err(e) => {
                            self.state = FileStreamState::Error;
                            return Poll::Ready(Some(Err(e)));
                        }
                    }
                }
                FileStreamState::Open { future } => match ready!(future.poll_unpin(cx)) {
                    Ok(reader) => {
                        self.file_stream_metrics.time_opening.stop();
                        // Kick off the open of the *next* file before
                        // scanning this one, so open latency is hidden
                        // behind decoding.
                        let next = self.start_next_file().transpose();
                        self.file_stream_metrics.time_scanning_until_data.start();
                        self.file_stream_metrics.time_scanning_total.start();
                        match next {
                            Ok(Some(next_future)) => {
                                self.state = FileStreamState::Scan {
                                    reader,
                                    next: Some(NextOpen::Pending(next_future)),
                                };
                            }
                            Ok(None) => {
                                self.state = FileStreamState::Scan { reader, next: None };
                            }
                            Err(e) => {
                                self.state = FileStreamState::Error;
                                return Poll::Ready(Some(Err(e)));
                            }
                        }
                    }
                    Err(e) => {
                        self.file_stream_metrics.file_open_errors.add(1);
                        match self.on_error {
                            // Skip the failed file and move on to the next one
                            OnError::Skip => {
                                self.file_stream_metrics.time_opening.stop();
                                self.state = FileStreamState::Idle
                            }
                            OnError::Fail => {
                                self.state = FileStreamState::Error;
                                return Poll::Ready(Some(Err(e)));
                            }
                        }
                    }
                },
                FileStreamState::Scan { reader, next } => {
                    // Opportunistically drive the queued open future so the
                    // next reader is ready the moment this one is exhausted.
                    if let Some(next_open_future) = next
                        && let NextOpen::Pending(f) = next_open_future
                        && let Poll::Ready(reader) = f.as_mut().poll(cx)
                    {
                        *next_open_future = NextOpen::Ready(reader);
                    }
                    match ready!(reader.poll_next_unpin(cx)) {
                        Some(Ok(batch)) => {
                            // `stop` is a no-op if already stopped, so
                            // `time_scanning_until_data` only records the
                            // time until the first batch.
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();
                            // Enforce the remaining row limit, slicing the
                            // final batch if the limit falls inside it.
                            let batch = match &mut self.remain {
                                Some(remain) => {
                                    if *remain > batch.num_rows() {
                                        *remain -= batch.num_rows();
                                        batch
                                    } else {
                                        let batch = batch.slice(0, *remain);
                                        self.state = FileStreamState::Limit;
                                        *remain = 0;
                                        batch
                                    }
                                }
                                None => batch,
                            };
                            self.file_stream_metrics.time_scanning_total.start();
                            return Poll::Ready(Some(Ok(batch)));
                        }
                        Some(Err(err)) => {
                            self.file_stream_metrics.file_scan_errors.add(1);
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();
                            match self.on_error {
                                // Abandon the failed reader; continue with
                                // the queued next file, if any.
                                OnError::Skip => match mem::take(next) {
                                    Some(future) => {
                                        self.file_stream_metrics.time_opening.start();
                                        match future {
                                            NextOpen::Pending(future) => {
                                                self.state =
                                                    FileStreamState::Open { future }
                                            }
                                            // Already resolved: wrap back into a
                                            // ready future so `Open` can reuse it.
                                            NextOpen::Ready(reader) => {
                                                self.state = FileStreamState::Open {
                                                    future: Box::pin(std::future::ready(
                                                        reader,
                                                    )),
                                                }
                                            }
                                        }
                                    }
                                    None => return Poll::Ready(None),
                                },
                                OnError::Fail => {
                                    self.state = FileStreamState::Error;
                                    return Poll::Ready(Some(Err(err)));
                                }
                            }
                        }
                        // Current file exhausted: switch to the queued next
                        // file, or end the stream if there is none.
                        None => {
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();
                            match mem::take(next) {
                                Some(future) => {
                                    self.file_stream_metrics.time_opening.start();
                                    match future {
                                        NextOpen::Pending(future) => {
                                            self.state = FileStreamState::Open { future }
                                        }
                                        NextOpen::Ready(reader) => {
                                            self.state = FileStreamState::Open {
                                                future: Box::pin(std::future::ready(
                                                    reader,
                                                )),
                                            }
                                        }
                                    }
                                }
                                None => return Poll::Ready(None),
                            }
                        }
                    }
                }
                FileStreamState::Error | FileStreamState::Limit => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
impl Stream for FileStream {
    type Item = Result<RecordBatch>;

    /// Polls the inner state machine, timing the whole poll with the
    /// `time_processing` metric and recording the result (output rows,
    /// elapsed compute) in the baseline metrics.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        self.file_stream_metrics.time_processing.start();
        let poll = self.poll_inner(cx);
        self.file_stream_metrics.time_processing.stop();
        self.baseline_metrics.record_poll(poll)
    }
}
impl RecordBatchStream for FileStream {
    /// Returns the schema of the batches this stream yields
    /// (the projected schema from the scan config).
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.projected_schema)
    }
}
/// A fallible future that resolves to a stream of [`RecordBatch`]es
/// read from a single opened file.
pub type FileOpenFuture =
    BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch>>>>;
/// Behavior of a [`FileStream`] when opening or scanning a file fails.
#[derive(Default)]
pub enum OnError {
    /// Fail the entire stream, returning the underlying error (default)
    #[default]
    Fail,
    /// Skip the failed file and continue with the remaining files
    Skip,
}
/// Opens files for a [`FileStream`].
pub trait FileOpener: Unpin + Send + Sync {
    /// Asynchronously opens `partitioned_file`, returning a future that
    /// resolves to a stream of [`RecordBatch`]es. Errors may be reported
    /// either synchronously (from this call) or from the returned future.
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture>;
}
/// The open-state of the *next* file while the current one is being
/// scanned: either the still-pending open future, or its stored result
/// once the future has resolved.
pub enum NextOpen {
    /// The open future has not completed yet
    Pending(FileOpenFuture),
    /// The open future resolved; its result is stashed until the
    /// current reader is exhausted
    Ready(Result<BoxStream<'static, Result<RecordBatch>>>),
}
/// States of the [`FileStream`] state machine (driven by `poll_inner`).
pub enum FileStreamState {
    /// No file is open; the next poll starts opening one
    Idle,
    /// Waiting for a file-open future to complete
    Open {
        /// Future resolving to the file's batch stream
        future: FileOpenFuture,
    },
    /// Scanning batches from the currently open file
    Scan {
        /// Stream of batches from the current file
        reader: BoxStream<'static, Result<RecordBatch>>,
        /// Open-state of the next file, if any (opened in parallel
        /// with scanning)
        next: Option<NextOpen>,
    },
    /// Terminal: an error was returned to the caller
    Error,
    /// Terminal: the row limit was reached
    Limit,
}
/// A stopwatch that accumulates elapsed wall-clock time between
/// `start`/`stop` pairs into a [`Time`] metric.
pub struct StartableTime {
    /// Metric receiving the accumulated elapsed time
    pub metrics: Time,
    /// Instant of the last `start`; `None` while stopped
    pub start: Option<Instant>,
}
impl StartableTime {
    /// Begins timing.
    ///
    /// # Panics
    /// Panics if the timer is already running (a previous `start` was
    /// not matched by a `stop`).
    pub fn start(&mut self) {
        assert!(self.start.is_none());
        self.start = Some(Instant::now());
    }

    /// Stops timing and adds the elapsed duration to the metric.
    /// A no-op when the timer was never started (or already stopped),
    /// which makes repeated `stop` calls safe.
    pub fn stop(&mut self) {
        let Some(started_at) = self.start.take() else {
            return;
        };
        self.metrics.add_elapsed(started_at);
    }
}
/// Timers and error counters recorded by a [`FileStream`] for one partition.
pub struct FileStreamMetrics {
    /// Time spent opening files (`time_elapsed_opening`)
    pub time_opening: StartableTime,
    /// Time from the start of scanning a file until its first batch
    /// (`time_elapsed_scanning_until_data`)
    pub time_scanning_until_data: StartableTime,
    /// Total time spent scanning files (`time_elapsed_scanning_total`)
    pub time_scanning_total: StartableTime,
    /// Total time spent inside `poll_next` (`time_elapsed_processing`)
    pub time_processing: StartableTime,
    /// Number of files that failed to open
    pub file_open_errors: Count,
    /// Number of files that failed while being scanned
    pub file_scan_errors: Count,
}
impl FileStreamMetrics {
    /// Registers all `FileStream` timers and error counters for
    /// `partition` with the given metrics set.
    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        // All four timers are built identically: a named subset-time
        // metric, initially stopped.
        let subset_timer = |name: &'static str| StartableTime {
            metrics: MetricBuilder::new(metrics).subset_time(name, partition),
            start: None,
        };

        Self {
            time_opening: subset_timer("time_elapsed_opening"),
            time_scanning_until_data: subset_timer("time_elapsed_scanning_until_data"),
            time_scanning_total: subset_timer("time_elapsed_scanning_total"),
            time_processing: subset_timer("time_elapsed_processing"),
            file_open_errors: MetricBuilder::new(metrics)
                .counter("file_open_errors", partition),
            file_scan_errors: MetricBuilder::new(metrics)
                .counter("file_scan_errors", partition),
        }
    }
}
#[cfg(test)]
mod tests {
    use crate::PartitionedFile;
    use crate::file_scan_config::FileScanConfigBuilder;
    use crate::tests::make_partition;
    use datafusion_common::error::Result;
    use datafusion_execution::object_store::ObjectStoreUrl;
    use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
    use futures::{FutureExt as _, StreamExt as _};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use crate::file_stream::{FileOpenFuture, FileOpener, FileStream, OnError};
    use crate::test_util::MockSource;
    use arrow::array::RecordBatch;
    use arrow::datatypes::Schema;
    use datafusion_common::{assert_batches_eq, exec_err, internal_err};

    /// Test [`FileOpener`] that returns the configured `records` for
    /// every file, injecting open/scan errors at the configured
    /// (0-based, open-order) file indexes.
    #[derive(Default)]
    struct TestOpener {
        /// File indexes that fail while opening
        error_opening_idx: Vec<usize>,
        /// File indexes that open but then fail while scanning
        error_scanning_idx: Vec<usize>,
        /// Count of files opened so far (the current file index)
        current_idx: AtomicUsize,
        /// Batches yielded by every successfully scanned file
        records: Vec<RecordBatch>,
    }

    impl FileOpener for TestOpener {
        fn open(&self, _partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
            let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);

            if self.error_opening_idx.contains(&idx) {
                // Fail from the open future itself
                Ok(futures::future::ready(internal_err!("error opening")).boxed())
            } else if self.error_scanning_idx.contains(&idx) {
                // Open succeeds but the stream's first item is an error
                let error = futures::future::ready(exec_err!("error scanning"));
                let stream = futures::stream::once(error).boxed();
                Ok(futures::future::ready(Ok(stream)).boxed())
            } else {
                // Normal case: yield the configured record batches
                let iterator = self.records.clone().into_iter().map(Ok);
                let stream = futures::stream::iter(iterator).boxed();
                Ok(futures::future::ready(Ok(stream)).boxed())
            }
        }
    }

    /// Builder for driving a [`FileStream`] over mock files with a
    /// [`TestOpener`] and collecting its output.
    #[derive(Default)]
    struct FileStreamTest {
        /// Number of mock files in the file group
        num_files: usize,
        /// Optional row limit passed to the scan config
        limit: Option<usize>,
        /// Error behavior under test
        on_error: OnError,
        /// The opener used by the stream
        opener: TestOpener,
    }

    impl FileStreamTest {
        pub fn new() -> Self {
            Self::default()
        }

        /// Sets the number of files in the stream
        pub fn with_num_files(mut self, num_files: usize) -> Self {
            self.num_files = num_files;
            self
        }

        /// Sets the row limit
        pub fn with_limit(mut self, limit: Option<usize>) -> Self {
            self.limit = limit;
            self
        }

        /// Marks the files at `idx` as failing to open
        pub fn with_open_errors(mut self, idx: Vec<usize>) -> Self {
            self.opener.error_opening_idx = idx;
            self
        }

        /// Marks the files at `idx` as failing while scanning
        pub fn with_scan_errors(mut self, idx: Vec<usize>) -> Self {
            self.opener.error_scanning_idx = idx;
            self
        }

        /// Sets the stream's error behavior
        pub fn with_on_error(mut self, on_error: OnError) -> Self {
            self.on_error = on_error;
            self
        }

        /// Sets the batches each successful file yields
        pub fn with_records(mut self, records: Vec<RecordBatch>) -> Self {
            self.opener.records = records;
            self
        }

        /// Builds the [`FileStream`] and collects all of its output,
        /// short-circuiting on the first error.
        pub async fn result(self) -> Result<Vec<RecordBatch>> {
            // Derive the file schema from the first record batch
            // (empty schema if no records were configured)
            let file_schema = self
                .opener
                .records
                .first()
                .map(|batch| batch.schema())
                .unwrap_or_else(|| Arc::new(Schema::empty()));

            // Create a fixed-size mock file group
            let mock_files: Vec<(String, u64)> = (0..self.num_files)
                .map(|idx| (format!("mock_file{idx}"), 10_u64))
                .collect();

            let file_group = mock_files
                .into_iter()
                .map(|(name, size)| PartitionedFile::new(name, size))
                .collect();

            let on_error = self.on_error;

            let table_schema = crate::table_schema::TableSchema::new(file_schema, vec![]);
            let config = FileScanConfigBuilder::new(
                ObjectStoreUrl::parse("test:///").unwrap(),
                Arc::new(MockSource::new(table_schema)),
            )
            .with_file_group(file_group)
            .with_limit(self.limit)
            .build();
            let metrics_set = ExecutionPlanMetricsSet::new();
            let file_stream =
                FileStream::new(&config, 0, Arc::new(self.opener), &metrics_set)
                    .unwrap()
                    .with_on_error(on_error);

            file_stream
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .collect::<Result<Vec<_>>>()
        }
    }

    /// Streams two files of [3, 2] rows each with the given limit and
    /// collects the output, panicking on error.
    async fn create_and_collect(limit: Option<usize>) -> Vec<RecordBatch> {
        FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_limit(limit)
            .result()
            .await
            .expect("error executing stream")
    }

    /// `OnError::Skip` drops files that fail to open, keeping the rest
    #[tokio::test]
    async fn on_error_opening() -> Result<()> {
        // First file fails to open: only the second file's rows remain
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        // Second file fails to open
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        // Both files fail to open: empty result
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0, 1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    /// `OnError::Fail` surfaces a scan error to the caller
    #[tokio::test]
    async fn on_error_scanning_fail() -> Result<()> {
        let result = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Fail)
            .with_scan_errors(vec![1])
            .result()
            .await;

        assert!(result.is_err());

        Ok(())
    }

    /// `OnError::Fail` surfaces an open error to the caller
    #[tokio::test]
    async fn on_error_opening_fail() -> Result<()> {
        let result = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Fail)
            .with_open_errors(vec![1])
            .result()
            .await;

        assert!(result.is_err());

        Ok(())
    }

    /// `OnError::Skip` drops files that fail while scanning
    #[tokio::test]
    async fn on_error_scanning() -> Result<()> {
        // First file fails while scanning
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        // Second file fails while scanning
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        // Both files fail while scanning: empty result
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![0, 1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    /// `OnError::Skip` handles mixes of open and scan failures
    #[tokio::test]
    async fn on_error_mixed() -> Result<()> {
        // Open error after a scan error
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![1])
            .with_scan_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        // Scan error after an open error
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0])
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        // Every file fails one way or the other: empty result
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![2])
            .with_scan_errors(vec![0, 1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        // Every file fails one way or the other: empty result
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0, 2])
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    /// With no limit, all rows from all files are returned
    #[tokio::test]
    async fn without_limit() -> Result<()> {
        let batches = create_and_collect(None).await;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        Ok(())
    }

    /// A limit that falls exactly on a file boundary stops the stream
    /// before opening more files
    #[tokio::test]
    async fn with_limit_between_files() -> Result<()> {
        let batches = create_and_collect(Some(5)).await;
        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        Ok(())
    }

    /// A limit that falls inside a batch slices that batch
    #[tokio::test]
    async fn with_limit_at_middle_of_batch() -> Result<()> {
        let batches = create_and_collect(Some(6)).await;
        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "| 0 |",
            "+---+",
        ], &batches);

        Ok(())
    }
}