use std::collections::BTreeSet;
use std::ops::Range;
use std::sync::Arc;
use futures::future::BoxFuture;
use futures::try_join;
use once_cell::sync::OnceCell;
use vortex_array::ArrayRef;
use vortex_array::IntoArray;
use vortex_array::MaskFuture;
use vortex_array::builtins::ArrayBuiltins;
use vortex_array::dtype::DType;
use vortex_array::dtype::FieldMask;
use vortex_array::expr::Expression;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_mask::Mask;
use vortex_session::VortexSession;
use crate::children::LayoutChildren;
use crate::segments::SegmentSource;
/// Shared, reference-counted handle to a type-erased [`LayoutReader`].
pub type LayoutReaderRef = Arc<dyn LayoutReader>;
/// Object-safe reader interface over a layout.
///
/// Implementors must be `'static + Send + Sync`, so a reader can be shared
/// behind an `Arc` (see [`LayoutReaderRef`]) and used from multiple threads.
pub trait LayoutReader: 'static + Send + Sync {
    /// Name associated with this reader.
    fn name(&self) -> &Arc<str>;

    /// Data type reported by this reader.
    fn dtype(&self) -> &DType;

    /// Row count reported by this reader.
    fn row_count(&self) -> u64;

    /// Records split points into `splits` for the given `field_mask` and
    /// `row_range`.
    ///
    /// NOTE(review): the exact meaning of a "split" (and whether offsets are
    /// absolute or relative to `row_range`) is not visible here — confirm
    /// against the implementors.
    ///
    /// # Errors
    ///
    /// Returns an error if the implementor fails to register its splits.
    fn register_splits(
        &self,
        field_mask: &[FieldMask],
        row_range: &Range<u64>,
        splits: &mut BTreeSet<u64>,
    ) -> VortexResult<()>;

    /// Builds a [`MaskFuture`] for `expr` over `row_range`, starting from an
    /// already-resolved `mask`.
    ///
    /// NOTE(review): presumably used to rule out rows that cannot satisfy
    /// `expr` without fully evaluating it — confirm against the implementors.
    ///
    /// # Errors
    ///
    /// Returns an error if the evaluation cannot be set up.
    fn pruning_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: Mask,
    ) -> VortexResult<MaskFuture>;

    /// Builds a [`MaskFuture`] for the filter `expr` over `row_range`, given a
    /// not-yet-resolved input `mask`.
    ///
    /// NOTE(review): presumably the result refines the input `mask` — confirm
    /// against the implementors.
    ///
    /// # Errors
    ///
    /// Returns an error if the evaluation cannot be set up.
    fn filter_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: MaskFuture,
    ) -> VortexResult<MaskFuture>;

    /// Builds an [`ArrayFuture`] that evaluates `expr` over `row_range`, given
    /// a not-yet-resolved input `mask`.
    ///
    /// # Errors
    ///
    /// Returns an error if the evaluation cannot be set up.
    fn projection_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: MaskFuture,
    ) -> VortexResult<ArrayFuture>;
}
/// Boxed `'static` future that resolves to a `VortexResult<ArrayRef>`.
pub type ArrayFuture = BoxFuture<'static, VortexResult<ArrayRef>>;
/// Extension methods for array-producing futures.
pub trait ArrayFutureExt {
    /// Combines this future with `mask`, returning a future of the same type
    /// whose output has the mask applied.
    fn masked(self, mask: MaskFuture) -> Self;
}
impl ArrayFutureExt for ArrayFuture {
    /// Returns a future that drives both `self` and `mask` via `try_join!`,
    /// then calls `mask` on the resolved array with the resolved mask
    /// converted into an array.
    ///
    /// If either input future yields an error, that error is the output.
    fn masked(self, mask: MaskFuture) -> Self {
        let masked_array = async move {
            // Both futures are awaited together; `?` propagates the first error.
            let (values, selection) = try_join!(self, mask)?;
            values.mask(selection.into_array())
        };
        Box::pin(masked_array)
    }
}
/// Builds child [`LayoutReaderRef`]s on demand and caches them per child index.
pub struct LazyReaderChildren {
    /// Source of the child layouts, looked up by index and dtype.
    children: Arc<dyn LayoutChildren>,
    /// Dtype passed to `children.child` for each child index.
    dtypes: Vec<DType>,
    /// Name handed to each child's reader, by child index.
    names: Vec<Arc<str>>,
    /// Segment source shared with every child reader that gets created.
    segment_source: Arc<dyn SegmentSource>,
    /// Session passed by reference when creating child readers.
    session: VortexSession,
    /// One write-once slot per child, filled on first successful access.
    cache: Vec<OnceCell<LayoutReaderRef>>,
}
impl LazyReaderChildren {
    /// Creates a lazy child-reader cache.
    ///
    /// The cache gets one empty slot per child reported by
    /// `children.nchildren()`. `dtypes` and `names` are expected to hold one
    /// entry per child, in child order; a missing entry is reported as an
    /// error by [`Self::get`] when that child is first requested.
    pub fn new(
        children: Arc<dyn LayoutChildren>,
        dtypes: Vec<DType>,
        names: Vec<Arc<str>>,
        segment_source: Arc<dyn SegmentSource>,
        session: VortexSession,
    ) -> Self {
        let cache = (0..children.nchildren())
            .map(|_| OnceCell::new())
            .collect();
        Self {
            children,
            dtypes,
            names,
            segment_source,
            session,
            cache,
        }
    }

    /// Returns the reader for child `idx`, creating and caching it on first use.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - `idx` is not a valid child index,
    /// - no dtype or name was supplied for `idx` at construction time,
    /// - looking up the child layout or constructing its reader fails.
    ///
    /// A failed initialization leaves the slot empty, so a later call will
    /// attempt the initialization again.
    pub fn get(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
        let Some(slot) = self.cache.get(idx) else {
            vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
        };

        slot.get_or_try_init(|| {
            // `dtypes` and `names` are supplied independently of the child
            // count, so use checked access instead of panicking on a mismatch.
            let Some(dtype) = self.dtypes.get(idx) else {
                vortex_bail!(
                    "Missing dtype for child {}: {} dtypes provided for {} children",
                    idx,
                    self.dtypes.len(),
                    self.cache.len()
                );
            };
            let child = self.children.child(idx, dtype)?;

            let Some(name) = self.names.get(idx) else {
                vortex_bail!(
                    "Missing name for child {}: {} names provided for {} children",
                    idx,
                    self.names.len(),
                    self.cache.len()
                );
            };
            child.new_reader(
                Arc::clone(name),
                Arc::clone(&self.segment_source),
                &self.session,
            )
        })
    }
}