use std::collections::VecDeque;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use datafusion_common::ScalarValue;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{ready, FutureExt, Stream, StreamExt};
use crate::datasource::listing::PartitionedFile;
use crate::error::Result;
use crate::physical_plan::file_format::{
FileMeta, FileScanConfig, PartitionColumnProjector,
};
use crate::physical_plan::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, Time,
};
use crate::physical_plan::RecordBatchStream;
/// A future that resolves to a stream of [`RecordBatch`] results read from a
/// single opened file.
pub type FileOpenFuture =
BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>>;
/// Opens files for a [`FileStream`]; implemented once per file format.
pub trait FileOpener: Unpin {
/// Asynchronously open the file described by `file_meta`, returning a
/// future that resolves to that file's stream of record batches.
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture>;
}
/// A stream of [`RecordBatch`]es produced by reading a partition's files one
/// after another, pre-opening the next file while the current one is scanned.
pub struct FileStream<F: FileOpener> {
// Files remaining to be read, consumed front-to-back.
file_iter: VecDeque<PartitionedFile>,
// Schema of the batches this stream emits (after projection).
projected_schema: SchemaRef,
// Rows still allowed by the query limit; `None` means unlimited.
remain: Option<usize>,
// Opens each file into a stream of batches.
file_reader: F,
// Appends the table partition-column values to every batch.
pc_projector: PartitionColumnProjector,
// Current position in the Idle/Open/Scan/Error/Limit state machine.
state: FileStreamState,
// Phase timers specific to file opening and scanning.
file_stream_metrics: FileStreamMetrics,
// Standard per-operator output/elapsed-time metrics.
baseline_metrics: BaselineMetrics,
}
/// State machine driven by [`FileStream::poll_inner`].
enum FileStreamState {
/// Initial state: no file has been opened yet.
Idle,
/// A file-open future is in flight.
Open {
/// Resolves to the batch stream of the file being opened.
future: FileOpenFuture,
/// Partition-column values belonging to that file.
partition_values: Vec<ScalarValue>,
},
/// Batches are being read from an open file.
Scan {
/// Partition-column values of the file currently being scanned.
partition_values: Vec<ScalarValue>,
/// The stream of batches for the current file.
reader: BoxStream<'static, Result<RecordBatch, ArrowError>>,
/// The next file, already being opened in the background (read-ahead).
next: Option<(FileOpenFuture, Vec<ScalarValue>)>,
},
/// A terminal error occurred; the stream yields nothing further.
Error,
/// The row limit was reached; the stream yields nothing further.
Limit,
}
/// A [`Time`] metric paired with the start instant of an in-progress
/// measurement, so elapsed intervals can be accumulated piecewise.
struct StartableTime {
// Accumulated elapsed time.
metrics: Time,
// `Some` while a measurement is in progress.
start: Option<Instant>,
}
impl StartableTime {
    /// Begin a new measurement interval.
    ///
    /// Panics if an interval is already in progress, since overlapping
    /// intervals would be double-counted in the metric.
    fn start(&mut self) {
        assert!(self.start.is_none());
        self.start = Some(Instant::now());
    }

    /// End the in-progress interval (if any), folding its elapsed time into
    /// the underlying metric. Calling `stop` while no interval is running is
    /// a deliberate no-op.
    fn stop(&mut self) {
        if let Some(begun_at) = self.start.take() {
            self.metrics.add_elapsed(begun_at);
        }
    }
}
/// Timers covering the distinct phases of a [`FileStream`]'s work.
struct FileStreamMetrics {
/// Time spent opening files.
pub time_opening: StartableTime,
/// Time from the start of scanning a file until its first batch arrives.
pub time_scanning_until_data: StartableTime,
/// Total time spent scanning (decoding) files.
pub time_scanning_total: StartableTime,
/// Wall-clock time spent inside `poll_next` itself.
pub time_processing: StartableTime,
}
impl FileStreamMetrics {
    /// Register the four phase timers for `partition` in `metrics` and
    /// return them bundled together. All timers start in the "not running"
    /// state.
    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        // Factory for one named subset timer scoped to this partition;
        // avoids repeating the struct literal four times.
        let subset_timer = |name: &'static str| StartableTime {
            metrics: MetricBuilder::new(metrics).subset_time(name, partition),
            start: None,
        };
        Self {
            time_opening: subset_timer("time_elapsed_opening"),
            time_scanning_until_data: subset_timer("time_elapsed_scanning_until_data"),
            time_scanning_total: subset_timer("time_elapsed_scanning_total"),
            time_processing: subset_timer("time_elapsed_processing"),
        }
    }
}
impl<F: FileOpener> FileStream<F> {
/// Create a `FileStream` over the file group of `partition` in `config`,
/// using `file_reader` to open each file. Execution metrics are
/// registered in `metrics` under the given partition index.
pub fn new(
config: &FileScanConfig,
partition: usize,
file_reader: F,
metrics: &ExecutionPlanMetricsSet,
) -> Result<Self> {
let (projected_schema, _) = config.project();
// Projector that appends the table partition-column values to each
// batch read from a file.
let pc_projector = PartitionColumnProjector::new(
projected_schema.clone(),
&config
.table_partition_cols
.iter()
.map(|x| x.0.clone())
.collect::<Vec<_>>(),
);
let files = config.file_groups[partition].clone();
Ok(Self {
file_iter: files.into(),
projected_schema,
remain: config.limit,
file_reader,
pc_projector,
state: FileStreamState::Idle,
file_stream_metrics: FileStreamMetrics::new(metrics, partition),
baseline_metrics: BaselineMetrics::new(metrics, partition),
})
}
/// Begin opening the next queued file.
///
/// Returns `None` when no files remain; otherwise the open future paired
/// with that file's partition-column values, or the error returned by
/// [`FileOpener::open`].
fn start_next_file(&mut self) -> Option<Result<(FileOpenFuture, Vec<ScalarValue>)>> {
let part_file = self.file_iter.pop_front()?;
let file_meta = FileMeta {
object_meta: part_file.object_meta,
range: part_file.range,
extensions: part_file.extensions,
};
Some(
self.file_reader
.open(file_meta)
.map(|future| (future, part_file.partition_values)),
)
}
/// Drive the `Idle -> Open -> Scan` state machine until a batch, an
/// error, or end-of-stream is produced.
///
/// While one file is being scanned, the next file is already being opened
/// (the `next` field of [`FileStreamState::Scan`]) so open latency
/// overlaps with decoding. Errors are terminal: the state moves to
/// `Error` and no further files are read.
fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
loop {
match &mut self.state {
FileStreamState::Idle => {
self.file_stream_metrics.time_opening.start();
match self.start_next_file().transpose() {
Ok(Some((future, partition_values))) => {
self.state = FileStreamState::Open {
future,
partition_values,
}
}
// No files at all: the stream is empty.
Ok(None) => return Poll::Ready(None),
Err(e) => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e)));
}
}
}
FileStreamState::Open {
future,
partition_values,
} => match ready!(future.poll_unpin(cx)) {
Ok(reader) => {
// Take ownership of the partition values before the
// state is overwritten below.
let partition_values = mem::take(partition_values);
// Kick off opening the next file so it proceeds in the
// background while this one is scanned.
let next = self.start_next_file().transpose();
self.file_stream_metrics.time_opening.stop();
self.file_stream_metrics.time_scanning_until_data.start();
self.file_stream_metrics.time_scanning_total.start();
match next {
Ok(Some((next_future, next_partition_values))) => {
self.state = FileStreamState::Scan {
partition_values,
reader,
next: Some((next_future, next_partition_values)),
};
}
// This was the last file: scan with no read-ahead.
Ok(None) => {
self.state = FileStreamState::Scan {
reader,
partition_values,
next: None,
};
}
Err(e) => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e)));
}
}
}
Err(e) => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e)));
}
},
FileStreamState::Scan {
reader,
partition_values,
next,
} => match ready!(reader.poll_next_unpin(cx)) {
Some(result) => {
// First data has arrived (no-op after the first batch,
// since `stop` on an idle timer does nothing).
self.file_stream_metrics.time_scanning_until_data.stop();
self.file_stream_metrics.time_scanning_total.stop();
let result = result
.and_then(|b| {
// Attach partition-column values to the batch.
self.pc_projector
.project(b, partition_values)
.map_err(|e| ArrowError::ExternalError(e.into()))
})
.map(|batch| match &mut self.remain {
// Enforce the LIMIT: pass whole batches through
// until the remaining budget is met or exceeded,
// then truncate and switch to the terminal
// `Limit` state.
Some(remain) => {
if *remain > batch.num_rows() {
*remain -= batch.num_rows();
batch
} else {
let batch = batch.slice(0, *remain);
self.state = FileStreamState::Limit;
*remain = 0;
batch
}
}
None => batch,
});
if result.is_err() {
self.state = FileStreamState::Error
}
// Resume the total-scan timer while the batch is
// handed to the caller.
self.file_stream_metrics.time_scanning_total.start();
return Poll::Ready(Some(result.map_err(Into::into)));
}
// Current file exhausted: move to the pre-opened next
// file, or finish if there is none.
None => {
self.file_stream_metrics.time_scanning_until_data.stop();
self.file_stream_metrics.time_scanning_total.stop();
match mem::take(next) {
Some((future, partition_values)) => {
self.file_stream_metrics.time_opening.start();
self.state = FileStreamState::Open {
future,
partition_values,
}
}
None => return Poll::Ready(None),
}
}
},
// Terminal states: nothing more will ever be produced.
FileStreamState::Error | FileStreamState::Limit => {
return Poll::Ready(None)
}
}
}
}
}
impl<F: FileOpener> Stream for FileStream<F> {
type Item = Result<RecordBatch>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
// Bracket the inner poll so `time_processing` measures wall-clock time
// spent doing work inside this stream (including pending polls).
self.file_stream_metrics.time_processing.start();
let result = self.poll_inner(cx);
self.file_stream_metrics.time_processing.stop();
// Fold output rows and elapsed time into the baseline metrics.
self.baseline_metrics.record_poll(result)
}
}
impl<F: FileOpener> RecordBatchStream for FileStream<F> {
/// Schema of every batch this stream yields: the projected schema
/// computed in [`FileStream::new`].
fn schema(&self) -> SchemaRef {
self.projected_schema.clone()
}
}
#[cfg(test)]
mod tests {
use futures::StreamExt;
use super::*;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
use crate::prelude::SessionContext;
use crate::{
error::Result,
test::{make_partition, object_store::register_test_store},
};
/// A [`FileOpener`] that ignores the file entirely and replays a fixed
/// set of in-memory record batches.
struct TestOpener {
records: Vec<RecordBatch>,
}
impl FileOpener for TestOpener {
fn open(&self, _file_meta: FileMeta) -> Result<FileOpenFuture> {
// Each "file" yields a fresh clone of the same batches, wrapped in
// an already-resolved open future.
let iterator = self.records.clone().into_iter().map(Ok);
let stream = futures::stream::iter(iterator).boxed();
Ok(futures::future::ready(Ok(stream)).boxed())
}
}
/// Build a `FileStream` over two mock files — each replaying the same two
/// batches (presumably 3 and 2 rows via `make_partition`; confirm against
/// the test helper) — apply `limit`, and collect all emitted batches.
async fn create_and_collect(limit: Option<usize>) -> Vec<RecordBatch> {
let records = vec![make_partition(3), make_partition(2)];
let file_schema = records[0].schema();
let reader = TestOpener { records };
let ctx = SessionContext::new();
register_test_store(&ctx, &[("mock_file1", 10), ("mock_file2", 20)]);
let config = FileScanConfig {
object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
file_schema,
file_groups: vec![vec![
PartitionedFile::new("mock_file1".to_owned(), 10),
PartitionedFile::new("mock_file2".to_owned(), 20),
]],
statistics: Default::default(),
projection: None,
limit,
table_partition_cols: vec![],
output_ordering: None,
infinite_source: false,
};
let metrics_set = ExecutionPlanMetricsSet::new();
let file_stream = FileStream::new(&config, 0, reader, &metrics_set).unwrap();
file_stream
.map(|b| b.expect("No error expected in stream"))
.collect::<Vec<_>>()
.await
}
/// With no limit, both files are read completely (10 rows total).
#[tokio::test]
async fn without_limit() -> Result<()> {
let batches = create_and_collect(None).await;
#[rustfmt::skip]
crate::assert_batches_eq!(&[
"+---+",
"| i |",
"+---+",
"| 0 |",
"| 1 |",
"| 2 |",
"| 0 |",
"| 1 |",
"| 0 |",
"| 1 |",
"| 2 |",
"| 0 |",
"| 1 |",
"+---+",
], &batches);
Ok(())
}
/// A limit that coincides exactly with the end of the first file stops
/// the stream without emitting anything from the second file.
#[tokio::test]
async fn with_limit_between_files() -> Result<()> {
let batches = create_and_collect(Some(5)).await;
#[rustfmt::skip]
crate::assert_batches_eq!(&[
"+---+",
"| i |",
"+---+",
"| 0 |",
"| 1 |",
"| 2 |",
"| 0 |",
"| 1 |",
"+---+",
], &batches);
Ok(())
}
/// A limit that lands mid-batch truncates the final batch via
/// `RecordBatch::slice`.
#[tokio::test]
async fn with_limit_at_middle_of_batch() -> Result<()> {
let batches = create_and_collect(Some(6)).await;
#[rustfmt::skip]
crate::assert_batches_eq!(&[
"+---+",
"| i |",
"+---+",
"| 0 |",
"| 1 |",
"| 2 |",
"| 0 |",
"| 1 |",
"| 0 |",
"+---+",
], &batches);
Ok(())
}
}