use std::collections::VecDeque;
use std::mem;
use std::ops::Range;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use futures::future::BoxFuture;
use futures::stream::{BoxStream, Stream, StreamExt};
use futures::FutureExt;
use crate::arrow::array::RecordBatch;
use crate::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use crate::{DeltaResult, FileMeta};
/// Boxed, `'static` future produced by [`FileOpener::open`].
///
/// When it resolves successfully it yields the stream of record batches for one file; each
/// item of that stream is itself fallible.
pub type FileOpenFuture =
    BoxFuture<'static, DeltaResult<BoxStream<'static, DeltaResult<RecordBatch>>>>;
/// Something that knows how to open one file and turn it into a stream of record batches.
///
/// Implementors must be `Send + Unpin` so a boxed opener can live inside a [`FileStream`].
pub trait FileOpener: Send + Unpin {
    /// Begins opening `file_meta`.
    ///
    /// Failures can surface in two places: synchronously through the outer [`DeltaResult`],
    /// or later when the returned [`FileOpenFuture`] resolves. `range` is an optional `i64`
    /// range (presumably a byte range within the file — confirm against implementors);
    /// `FileStream` always passes `None`.
    fn open(&self, file_meta: FileMeta, range: Option<Range<i64>>) -> DeltaResult<FileOpenFuture>;
}
/// Policy a [`FileStream`] applies when opening or reading one of its files fails.
///
/// All variants are unit variants, so `Debug`, `Copy` and equality can simply be derived
/// rather than silencing `missing_debug_implementations`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum OnError {
    /// Yield the error and then end the stream. This is the default policy.
    #[default]
    Fail,
    /// Drop the file whose open future or record-batch stream returned an error and continue
    /// with the next file; the error itself is not yielded.
    Skip,
}
/// Progress of the look-ahead open for the file that follows the one being scanned.
enum NextOpen {
    /// The open was started, but its future has not completed yet.
    Pending(FileOpenFuture),
    /// The open future has completed; this is its (possibly failed) result, kept so the
    /// future is not polled again after completion.
    Ready(DeltaResult<BoxStream<'static, DeltaResult<RecordBatch>>>),
}
/// Where a [`FileStream`] currently is in its open → scan → next-file cycle.
enum FileStreamState {
    /// No file is in progress; the next poll takes a file from the queue or ends the stream.
    Idle,
    /// Waiting for the current file's open future to resolve.
    Open { future: FileOpenFuture },
    /// Reading record batches from the current file.
    Scan {
        /// Batch stream of the file being read.
        reader: BoxStream<'static, DeltaResult<RecordBatch>>,
        /// Look-ahead open of the following file; `None` when no files remained to open.
        next: Option<NextOpen>,
    },
    /// Terminal state entered after an error has been yielded; later polls end the stream.
    Error,
}
/// A stream of [`RecordBatch`]es read from a queue of files, one file after another.
///
/// While one file is being scanned, the open of the following file is already started so
/// the two can overlap.
// The boxed opener, futures and streams held here do not implement `Debug`.
#[allow(missing_debug_implementations)]
pub struct FileStream {
    /// Files not yet handed to the opener, in the order they will be read.
    file_iter: VecDeque<FileMeta>,
    /// Schema supplied at construction; currently not read (hence the `allow`).
    #[allow(unused)]
    projected_schema: ArrowSchemaRef,
    /// Turns each queued file into a future of its batch stream.
    file_opener: Box<dyn FileOpener>,
    /// Current position in the open/scan state machine.
    state: FileStreamState,
    /// What to do when a file fails to open or read.
    on_error: OnError,
}
impl FileStream {
pub fn new(
files: impl IntoIterator<Item = FileMeta>,
schema: ArrowSchemaRef,
file_opener: Box<dyn FileOpener>,
) -> DeltaResult<Self> {
Ok(Self {
file_iter: files.into_iter().collect(),
projected_schema: schema,
file_opener,
state: FileStreamState::Idle,
on_error: OnError::Fail,
})
}
pub fn with_on_error(mut self, on_error: OnError) -> Self {
self.on_error = on_error;
self
}
fn start_next_file(&mut self) -> Option<DeltaResult<FileOpenFuture>> {
let file_meta = self.file_iter.pop_front()?;
Some(self.file_opener.open(file_meta, None))
}
fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<DeltaResult<RecordBatch>>> {
loop {
match &mut self.state {
FileStreamState::Idle => match self.start_next_file().transpose() {
Ok(Some(future)) => self.state = FileStreamState::Open { future },
Ok(None) => return Poll::Ready(None),
Err(e) => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e)));
}
},
FileStreamState::Open { future } => match ready!(future.poll_unpin(cx)) {
Ok(reader) => {
let next = self.start_next_file().transpose();
match next {
Ok(Some(next_future)) => {
self.state = FileStreamState::Scan {
reader,
next: Some(NextOpen::Pending(next_future)),
};
}
Ok(None) => {
self.state = FileStreamState::Scan { reader, next: None };
}
Err(e) => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e)));
}
}
}
Err(e) => match self.on_error {
OnError::Skip => self.state = FileStreamState::Idle,
OnError::Fail => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e)));
}
},
},
FileStreamState::Scan { reader, next } => {
if let Some(next_open_future) = next {
if let NextOpen::Pending(f) = next_open_future {
if let Poll::Ready(reader) = f.as_mut().poll(cx) {
*next_open_future = NextOpen::Ready(reader);
}
}
}
match ready!(reader.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
return Poll::Ready(Some(Ok(batch)));
}
Some(Err(err)) => {
match self.on_error {
OnError::Skip => match mem::take(next) {
Some(future) => match future {
NextOpen::Pending(future) => {
self.state = FileStreamState::Open { future }
}
NextOpen::Ready(reader) => {
self.state = FileStreamState::Open {
future: Box::pin(std::future::ready(reader)),
}
}
},
None => return Poll::Ready(None),
},
OnError::Fail => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(err)));
}
}
}
None => match mem::take(next) {
Some(future) => match future {
NextOpen::Pending(future) => {
self.state = FileStreamState::Open { future }
}
NextOpen::Ready(reader) => {
self.state = FileStreamState::Open {
future: Box::pin(std::future::ready(reader)),
}
}
},
None => return Poll::Ready(None),
},
}
}
FileStreamState::Error => return Poll::Ready(None),
}
}
}
}
impl Stream for FileStream {
    type Item = DeltaResult<RecordBatch>;

    /// Forwards to the inner state machine.
    ///
    /// `FileStream` is `Unpin`, so the pin can be unwrapped to a plain mutable reference.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this: &mut Self = Pin::into_inner(self);
        this.poll_inner(cx)
    }
}