use std::any::Any;
use std::collections::BTreeSet;
use std::ops::Range;
use std::sync::Arc;
use futures::future::BoxFuture;
use futures::try_join;
use once_cell::sync::OnceCell;
use vortex_array::ArrayRef;
use vortex_array::IntoArray;
use vortex_array::MaskFuture;
use vortex_array::builtins::ArrayBuiltins;
use vortex_array::dtype::DType;
use vortex_array::dtype::FieldMask;
use vortex_array::expr::Expression;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_mask::Mask;
use vortex_session::VortexSession;
use crate::children::LayoutChildren;
use crate::segments::SegmentSource;
pub type LayoutReaderRef = Arc<dyn LayoutReader>;
/// A range of rows expressed relative to a sub-layout that starts at `row_offset` in the root.
///
/// `row_range` is relative; [`SplitRange::root_row_range`] shifts it by `row_offset` to obtain
/// the corresponding root-relative range.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SplitRange {
    /// Amount added to `row_range` to translate it into root-relative rows.
    row_offset: u64,
    /// Relative row range; `try_new` validates `start <= end` and that shifting by
    /// `row_offset` cannot overflow.
    row_range: Range<u64>,
}

impl SplitRange {
    /// Creates a split range from an offset and a relative row range.
    ///
    /// # Errors
    ///
    /// Fails if `row_range.start > row_range.end`, or if `row_offset + row_range.end` does not
    /// fit in a `u64` (such a range could not be translated by [`Self::root_row_range`]).
    pub fn try_new(row_offset: u64, row_range: Range<u64>) -> VortexResult<Self> {
        if row_range.start > row_range.end {
            vortex_bail!("Invalid split range {:?}", row_range);
        }
        // Validate once here so `root_row_range` can add without overflow checks: since
        // `start <= end`, `row_offset + start` fits whenever `row_offset + end` does.
        if row_offset.checked_add(row_range.end).is_none() {
            vortex_bail!(
                "Split range {:?} with row offset {} overflows u64",
                row_range,
                row_offset
            );
        }
        Ok(Self {
            row_offset,
            row_range,
        })
    }

    /// Creates a split range with a zero offset, i.e. one already relative to the root.
    ///
    /// # Errors
    ///
    /// Same conditions as [`Self::try_new`].
    pub fn root(row_range: Range<u64>) -> VortexResult<Self> {
        Self::try_new(0, row_range)
    }

    /// Returns the offset added to the relative range to obtain root-relative rows.
    pub fn row_offset(&self) -> u64 {
        self.row_offset
    }

    /// Returns the relative row range.
    pub fn row_range(&self) -> &Range<u64> {
        &self.row_range
    }

    /// Returns the number of rows in the range.
    pub fn len(&self) -> u64 {
        // Cannot underflow: `try_new` guarantees `start <= end`.
        self.row_range.end - self.row_range.start
    }

    /// Returns `true` if the range contains no rows.
    pub fn is_empty(&self) -> bool {
        self.row_range.is_empty()
    }

    /// Returns the relative row range shifted by `row_offset`.
    pub fn root_row_range(&self) -> Range<u64> {
        // Overflow is ruled out by the `checked_add` validation in `try_new`.
        self.row_offset + self.row_range.start..self.row_offset + self.row_range.end
    }

    /// Checks that the relative row range ends at or before `row_count`.
    ///
    /// # Errors
    ///
    /// Fails if `row_range.end > row_count`.
    pub fn check_bounds(&self, row_count: u64) -> VortexResult<()> {
        if self.row_range.end > row_count {
            vortex_bail!(
                "Split range {:?} is out of bounds for row count {}",
                self.row_range,
                row_count
            );
        }
        Ok(())
    }
}
/// A reader over one layout that can evaluate expressions over ranges of its rows.
///
/// Readers are shared as [`LayoutReaderRef`] (an `Arc<dyn LayoutReader>`), so implementations
/// must be `Send + Sync` and must not hold non-`'static` borrows.
pub trait LayoutReader: Send + Sync + 'static {
    /// Returns this reader's name.
    fn name(&self) -> &Arc<str>;

    /// Returns this reader as [`Any`], e.g. so callers can downcast to a concrete reader type.
    fn as_any(&self) -> &dyn Any;

    /// Returns the data type associated with this reader.
    fn dtype(&self) -> &DType;

    /// Returns the number of rows this reader covers.
    fn row_count(&self) -> u64;

    /// Adds split points (row positions) to `splits` for the fields selected by `field_mask`
    /// within `split_range`.
    ///
    /// NOTE(review): whether the inserted positions are relative to `split_range` or
    /// root-relative is not visible here — confirm against implementations.
    fn register_splits(
        &self,
        field_mask: &[FieldMask],
        split_range: &SplitRange,
        splits: &mut BTreeSet<u64>,
    ) -> VortexResult<()>;

    /// Begins a pruning evaluation of `expr` over `row_range`, starting from an already
    /// resolved `mask`, and returns a future yielding the resulting mask.
    ///
    /// NOTE(review): the exact pruning contract (e.g. whether the result may only clear bits of
    /// `mask`) is defined by implementations — confirm.
    fn pruning_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: Mask,
    ) -> VortexResult<MaskFuture>;

    /// Begins a filter evaluation of `expr` over `row_range`, given the input mask as a future,
    /// and returns a future yielding the resulting mask.
    fn filter_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: MaskFuture,
    ) -> VortexResult<MaskFuture>;

    /// Begins evaluating `expr` as a projection over `row_range`, given the input mask as a
    /// future, and returns a future yielding the resulting array.
    fn projection_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: MaskFuture,
    ) -> VortexResult<ArrayFuture>;
}
/// A boxed, `Send`, `'static` future that resolves to an [`ArrayRef`] or an error.
pub type ArrayFuture = BoxFuture<'static, VortexResult<ArrayRef>>;

/// Extension methods for [`ArrayFuture`].
pub trait ArrayFutureExt {
    /// Returns a future that awaits both this array and `mask`, then applies the resolved mask
    /// to the resolved array.
    fn masked(self, mask: MaskFuture) -> Self;
}

impl ArrayFutureExt for ArrayFuture {
    fn masked(self, mask: MaskFuture) -> Self {
        let combined = async move {
            // Poll both futures concurrently; `try_join!` completes early on the first error.
            let (array, resolved_mask) = try_join!(self, mask)?;
            let mask_array = resolved_mask.into_array();
            array.mask(mask_array)
        };
        Box::pin(combined)
    }
}
/// Lazily builds and caches one [`LayoutReader`] per child of a layout.
///
/// A child's reader is only constructed on the first successful call to
/// [`LazyReaderChildren::get`] for its index; later calls return the cached reader.
pub struct LazyReaderChildren {
    /// Provides the child layouts.
    children: Arc<dyn LayoutChildren>,
    /// Data type requested for each child, indexed by child position.
    dtypes: Vec<DType>,
    /// Name given to each child's reader, indexed by child position.
    names: Vec<Arc<str>>,
    /// Segment source shared with every child reader.
    segment_source: Arc<dyn SegmentSource>,
    /// Session passed to every child reader on construction.
    session: VortexSession,
    /// One lazily-initialized slot per child, sized from `children.nchildren()`.
    cache: Vec<OnceCell<LayoutReaderRef>>,
}

impl LazyReaderChildren {
    /// Creates a lazy child-reader set with one empty cache slot per child.
    ///
    /// `dtypes` and `names` are expected to hold one entry per child
    /// (`children.nchildren()`). A missing entry is reported as an error by [`Self::get`]
    /// rather than causing a panic.
    pub fn new(
        children: Arc<dyn LayoutChildren>,
        dtypes: Vec<DType>,
        names: Vec<Arc<str>>,
        segment_source: Arc<dyn SegmentSource>,
        session: VortexSession,
    ) -> Self {
        let nchildren = children.nchildren();
        let cache = (0..nchildren).map(|_| OnceCell::new()).collect();
        Self {
            children,
            dtypes,
            names,
            segment_source,
            session,
            cache,
        }
    }

    /// Returns the reader for child `idx`, building and caching it on first access.
    ///
    /// # Errors
    ///
    /// Fails if `idx` is not a valid child index, if no dtype or name was supplied for `idx`,
    /// or if resolving the child layout or constructing its reader fails. A failed
    /// construction leaves the slot empty, so a later call retries it.
    pub fn get(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
        let slot = match self.cache.get(idx) {
            Some(slot) => slot,
            None => {
                vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
            }
        };
        slot.get_or_try_init(|| {
            // `dtypes` and `names` are supplied independently of `children`, so their lengths
            // are not guaranteed to match `nchildren()`; report a mismatch instead of panicking.
            let (dtype, name) = match (self.dtypes.get(idx), self.names.get(idx)) {
                (Some(dtype), Some(name)) => (dtype, name),
                _ => {
                    vortex_bail!(
                        "Missing dtype or name for child {}: have {} dtypes and {} names",
                        idx,
                        self.dtypes.len(),
                        self.names.len()
                    );
                }
            };
            let child = self.children.child(idx, dtype)?;
            child.new_reader(
                Arc::clone(name),
                Arc::clone(&self.segment_source),
                &self.session,
            )
        })
    }
}