mod adapters;
#[cfg(test)]
pub(crate) mod mocks;
use crate::PartitionedFile;
pub(crate) use adapters::FileOpenerMorselizer;
use arrow::array::RecordBatch;
use datafusion_common::Result;
use futures::FutureExt;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use std::fmt::Debug;
use std::pin::Pin;
use std::task::{Context, Poll};
/// A unit of work that can be converted into a stream of [`RecordBatch`]es.
///
/// Implementors must be `Send` and `Debug`.
pub trait Morsel: Send + Debug {
/// Consumes the boxed morsel and returns a `'static` boxed stream whose
/// items are `Result<RecordBatch>`.
fn into_stream(self: Box<Self>) -> BoxStream<'static, Result<RecordBatch>>;
}
/// Produces a [`MorselPlanner`] for a given [`PartitionedFile`].
///
/// Implementors must be `Send`, `Sync` and `Debug`; `plan_file` only needs
/// shared (`&self`) access.
pub trait Morselizer: Send + Sync + Debug {
/// Takes ownership of `file` and returns a boxed planner for it.
///
/// # Errors
///
/// Returns an error if the implementor cannot create a planner for `file`.
fn plan_file(&self, file: PartitionedFile) -> Result<Box<dyn MorselPlanner>>;
}
/// A one-shot planning step that may yield a [`MorselPlan`].
///
/// Implementors must be `Send` and `Debug`.
pub trait MorselPlanner: Send + Debug {
/// Consumes the boxed planner and returns `Ok(Some(plan))`, `Ok(None)`, or
/// an error.
///
/// NOTE(review): `Ok(None)` presumably signals that this planner has no
/// further work to contribute — confirm against the implementors.
fn plan(self: Box<Self>) -> Result<Option<MorselPlan>>;
}
/// The output of a [`MorselPlanner::plan`] call: a bundle of morsels,
/// further planners, and at most one future-backed planner.
///
/// The derived `Default` yields an empty plan (empty vectors, no pending
/// planner).
#[derive(Default)]
pub struct MorselPlan {
/// Morsels carried by this plan; each can be turned into a record batch
/// stream via [`Morsel::into_stream`].
morsels: Vec<Box<dyn Morsel>>,
/// Additional planners carried by this plan.
/// NOTE(review): "ready" presumably means they can be planned without
/// awaiting anything first — confirm with the consumer of this type.
ready_planners: Vec<Box<dyn MorselPlanner>>,
/// An optional planner that only becomes available once its wrapped
/// future resolves.
pending_planner: Option<PendingMorselPlanner>,
}
impl MorselPlan {
    /// Creates an empty plan: no morsels, no ready planners and no pending
    /// planner.
    pub fn new() -> Self {
        Self {
            morsels: Vec::new(),
            ready_planners: Vec::new(),
            pending_planner: None,
        }
    }

    /// Builder-style setter: replaces (does not extend) the plan's morsels
    /// with `morsels` and returns the updated plan.
    pub fn with_morsels(self, morsels: Vec<Box<dyn Morsel>>) -> Self {
        let mut plan = self;
        plan.morsels = morsels;
        plan
    }

    /// Builder-style setter: replaces (does not extend) the plan's ready
    /// planners with `planners` and returns the updated plan.
    pub fn with_planners(self, planners: Vec<Box<dyn MorselPlanner>>) -> Self {
        let mut plan = self;
        plan.ready_planners = planners;
        plan
    }

    /// Builder-style variant of [`Self::set_pending_planner`]: installs
    /// `io_future` as the pending planner and returns the updated plan.
    pub fn with_pending_planner<F>(self, io_future: F) -> Self
    where
        F: Future<Output = Result<Box<dyn MorselPlanner>>> + Send + 'static,
    {
        let mut plan = self;
        plan.set_pending_planner(io_future);
        plan
    }

    /// Wraps `io_future` in a [`PendingMorselPlanner`] and stores it,
    /// overwriting any pending planner that was previously set.
    pub fn set_pending_planner<F>(&mut self, io_future: F)
    where
        F: Future<Output = Result<Box<dyn MorselPlanner>>> + Send + 'static,
    {
        let pending = PendingMorselPlanner::new(io_future);
        self.pending_planner = Some(pending);
    }

    /// Moves the morsels out of the plan, leaving an empty vector behind.
    pub fn take_morsels(&mut self) -> Vec<Box<dyn Morsel>> {
        let slot = &mut self.morsels;
        std::mem::take(slot)
    }

    /// Moves the ready planners out of the plan, leaving an empty vector
    /// behind.
    pub fn take_ready_planners(&mut self) -> Vec<Box<dyn MorselPlanner>> {
        let slot = &mut self.ready_planners;
        std::mem::take(slot)
    }

    /// Moves the pending planner out of the plan, if there is one, leaving
    /// `None` behind.
    pub fn take_pending_planner(&mut self) -> Option<PendingMorselPlanner> {
        let slot = &mut self.pending_planner;
        slot.take()
    }

    /// Returns `true` while a pending planner is stored in the plan.
    pub fn has_io_future(&self) -> bool {
        let pending = &self.pending_planner;
        pending.is_some()
    }
}
/// A [`MorselPlanner`] that is not available yet: it wraps a boxed future
/// which resolves to the planner (or to an error).
pub struct PendingMorselPlanner {
/// The type-erased, `Send`, `'static` future producing the planner.
future: BoxFuture<'static, Result<Box<dyn MorselPlanner>>>,
}
impl PendingMorselPlanner {
    /// Boxes `future` and wraps it in a `PendingMorselPlanner`.
    ///
    /// The future must be `Send + 'static` and resolve to a boxed
    /// [`MorselPlanner`] or an error.
    pub fn new<F>(future: F) -> Self
    where
        F: Future<Output = Result<Box<dyn MorselPlanner>>> + Send + 'static,
    {
        let future = future.boxed();
        Self { future }
    }

    /// Unwraps this value, handing back the boxed future it was built from.
    pub fn into_future(self) -> BoxFuture<'static, Result<Box<dyn MorselPlanner>>> {
        let Self { future } = self;
        future
    }
}
/// Polling a `PendingMorselPlanner` polls the wrapped boxed future directly.
impl Future for PendingMorselPlanner {
    type Output = Result<Box<dyn MorselPlanner>>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // The only field is a `Pin<Box<..>>`, so `Self` is `Unpin` and a plain
        // mutable reference can be obtained from the pinned receiver.
        let inner = &mut self.get_mut().future;
        inner.as_mut().poll(cx)
    }
}