use std::future::Future;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use futures::Stream;
use futures_util::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use pin_project_lite::pin_project;
use vortex_array::stats::{Stat, StatsSet};
use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
use vortex_array::ContextRef;
use vortex_buffer::Buffer;
use vortex_dtype::{DType, Field, FieldMask, FieldPath};
use vortex_error::{vortex_err, VortexExpect, VortexResult};
use vortex_expr::transform::immediate_access::immediate_scope_access;
use vortex_expr::transform::simplify_typed::simplify_typed;
use vortex_expr::{ident, ExprRef};
use vortex_layout::{ExprEvaluator, LayoutReader};
use vortex_mask::Mask;
use vortex_scan::{RowMask, Scanner};
use crate::exec::ExecDriver;
use crate::io::IoDriver;
use crate::segments::channel::SegmentChannel;
use crate::{FileLayout, SplitBy};
/// A Vortex file that can be scanned, generic over the I/O driver `I`.
///
/// Bundles the file's layout with the drivers used to evaluate expressions
/// against it and to service the reads that evaluation requests.
pub struct VortexFile<I> {
/// Context handed to the root layout whenever a layout reader is created.
pub(crate) ctx: ContextRef,
/// Layout of the file; the source of its row count, dtype and root layout.
pub(crate) file_layout: FileLayout,
/// Driver that turns the segment channel's stream into the I/O stream polled during scans.
pub(crate) io_driver: I,
/// Driver that runs the per-split evaluation futures produced by a scan.
pub(crate) exec_driver: Arc<dyn ExecDriver>,
/// Strategy used to compute the row ranges (splits) a scan is broken into.
pub(crate) split_by: SplitBy,
}
/// Description of a scan over a file: what to compute and which rows to read.
pub struct Scan {
// Expression evaluated to produce the scan's output.
projection: ExprRef,
// Optional filter expression handed to the scanner together with the projection.
filter: Option<ExprRef>,
// Optional selection of absolute row indices. The scan binary-searches this
// buffer, so it is assumed to be sorted ascending — TODO confirm with callers.
row_indices: Option<Buffer<u64>>,
}
impl Scan {
    /// Builds a scan using the `ident()` projection, with no filter and no
    /// row-index selection.
    pub fn all() -> Self {
        Self::new(ident())
    }

    /// Builds a scan that evaluates `projection`, with no filter and no
    /// row-index selection.
    pub fn new(projection: ExprRef) -> Self {
        Self {
            row_indices: None,
            filter: None,
            projection,
        }
    }

    /// Builds a scan using the `ident()` projection and the given `filter`.
    pub fn filtered(filter: ExprRef) -> Self {
        Self::all().with_filter(filter)
    }

    /// Returns the scan with its filter replaced by `filter`.
    pub fn with_filter(self, filter: ExprRef) -> Self {
        self.with_some_filter(Some(filter))
    }

    /// Returns the scan with its filter set to exactly `filter`; passing `None`
    /// removes any filter that was set before.
    pub fn with_some_filter(self, filter: Option<ExprRef>) -> Self {
        Self { filter, ..self }
    }

    /// Returns the scan with its projection replaced by `projection`.
    pub fn with_projection(self, projection: ExprRef) -> Self {
        Self { projection, ..self }
    }

    /// Returns the scan restricted to the given row indices.
    pub fn with_row_indices(self, row_indices: Buffer<u64>) -> Self {
        Self {
            row_indices: Some(row_indices),
            ..self
        }
    }

    /// Computes the field masks touched by this scan's projection and filter.
    ///
    /// Both expressions are first simplified against `scope_dtype`. If the
    /// scope is not a struct, the result is a single [`FieldMask::All`].
    /// Otherwise every field name reported by `immediate_scope_access` for the
    /// projection or the filter becomes one [`FieldMask::Prefix`] entry.
    ///
    /// # Errors
    ///
    /// Propagates any error from `simplify_typed` or `immediate_scope_access`.
    pub fn field_mask(&self, scope_dtype: &DType) -> VortexResult<Vec<FieldMask>> {
        let simplified_projection = simplify_typed(self.projection.clone(), scope_dtype)?;
        let simplified_filter = match &self.filter {
            Some(expr) => Some(simplify_typed(expr.clone(), scope_dtype)?),
            None => None,
        };

        let struct_dtype = match scope_dtype.as_struct() {
            Some(fields) => fields,
            None => return Ok(vec![FieldMask::All]),
        };

        let projected = immediate_scope_access(&simplified_projection, struct_dtype)?;
        let filtered = match simplified_filter {
            Some(expr) => immediate_scope_access(&expr, struct_dtype)?,
            None => Default::default(),
        };

        let masks = projected
            .union(&filtered)
            .map(|name| FieldMask::Prefix(FieldPath::from(Field::Name(name.clone()))))
            .collect_vec();
        Ok(masks)
    }
}
impl<I: IoDriver> VortexFile<I> {
pub fn row_count(&self) -> u64 {
self.file_layout.row_count()
}
pub fn dtype(&self) -> &DType {
self.file_layout.dtype()
}
pub fn file_layout(&self) -> &FileLayout {
&self.file_layout
}
pub fn scan(&self, scan: Scan) -> VortexResult<impl ArrayStream + 'static + use<'_, I>> {
let field_mask = scan.field_mask(self.dtype())?;
let splits: Arc<[Range<u64>]> = self
.split_by
.splits(self.file_layout().root_layout(), &field_mask)?
.into_iter()
.collect_vec()
.into();
let row_masks = ArcIter::new(splits).filter_map(move |row_range| {
let Some(row_indices) = &scan.row_indices else {
return Some(RowMask::new_valid_between(row_range.start, row_range.end));
};
if row_indices
.first()
.is_some_and(|&first| first >= row_range.end)
|| row_indices
.last()
.is_some_and(|&last| row_range.start >= last)
{
return None;
}
let start_idx = row_indices
.binary_search(&row_range.start)
.unwrap_or_else(|x| x);
let end_idx = row_indices
.binary_search(&row_range.end)
.unwrap_or_else(|x| x);
if start_idx == end_idx {
return None;
}
let filter_mask = Mask::from_indices(
usize::try_from(row_range.end - row_range.start)
.vortex_expect("Split ranges are within usize"),
row_indices[start_idx..end_idx]
.iter()
.map(|&idx| {
usize::try_from(idx - row_range.start).vortex_expect("index within range")
})
.collect(),
);
Some(RowMask::new(filter_mask, row_range.start))
});
self.scan_with_masks(row_masks, scan.projection, scan.filter)
}
fn scan_with_masks<R>(
&self,
row_masks: R,
projection: ExprRef,
filter: Option<ExprRef>,
) -> VortexResult<impl ArrayStream + 'static + use<'_, I, R>>
where
R: Iterator<Item = RowMask> + Send + 'static,
{
let scanner = Arc::new(Scanner::new(self.dtype().clone(), projection, filter)?);
let result_dtype = scanner.result_dtype().clone();
let segment_channel = SegmentChannel::new();
let reader: Arc<dyn LayoutReader> = self
.file_layout
.root_layout()
.reader(segment_channel.reader(), self.ctx.clone())?;
let exec_stream = stream::iter(row_masks)
.map(
move |row_mask| match scanner.clone().range_scanner(row_mask) {
Ok(range_scan) => {
let reader = reader.clone();
async move {
range_scan
.evaluate_async(|row_mask, expr| {
reader.evaluate_expr(row_mask, expr)
})
.await
}
.boxed()
}
Err(e) => futures::future::ready(Err(e)).boxed(),
},
)
.boxed();
let exec_stream = self.exec_driver.drive(exec_stream);
let io_stream = self.io_driver.drive(segment_channel.into_stream());
Ok(ArrayStreamAdapter::new(
result_dtype,
UnifiedDriverStream {
exec_stream,
io_stream,
},
))
}
pub fn statistics(
&self,
field_paths: Arc<[FieldPath]>,
stats: Arc<[Stat]>,
) -> VortexResult<impl Future<Output = VortexResult<Vec<StatsSet>>> + 'static + use<'_, I>>
{
let segment_channel = SegmentChannel::new();
let reader: Arc<dyn LayoutReader> = self
.file_layout
.root_layout()
.reader(segment_channel.reader(), self.ctx.clone())?;
let exec_future = async move { reader.evaluate_stats(field_paths, stats).await }.boxed();
let io_stream = self.io_driver.drive(segment_channel.into_stream());
Ok(UnifiedDriverFuture {
exec_future,
io_stream,
})
}
}
/// Owning iterator over the elements of a shared `Arc<[T]>`.
///
/// Elements are cloned out one at a time, so the iterator holds no borrow of
/// the slice and is `'static` whenever `T` is.
struct ArcIter<T> {
    /// Shared backing slice.
    inner: Arc<[T]>,
    /// Index of the next element to yield; never exceeds `inner.len()`.
    pos: usize,
}

impl<T> ArcIter<T> {
    /// Creates an iterator positioned at the first element of `inner`.
    fn new(inner: Arc<[T]>) -> Self {
        Self { inner, pos: 0 }
    }
}

impl<T: Clone> Iterator for ArcIter<T> {
    type Item = T;

    /// Yields a clone of the next element, or `None` once the slice is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        let item = self.inner.get(self.pos)?.clone();
        self.pos += 1;
        Some(item)
    }

    /// Reports the exact number of elements left, letting consumers pre-size
    /// their buffers instead of relying on the default `(0, None)`.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.inner.len().saturating_sub(self.pos);
        (remaining, Some(remaining))
    }
}
pin_project! {
/// Stream that polls an execution stream and an I/O stream together.
///
/// Its `Stream` impl yields the items of `exec_stream`, polling `io_stream`
/// whenever the execution stream is not ready.
struct UnifiedDriverStream<R, S> {
// Stream whose items are the results handed back to the consumer.
#[pin]
exec_stream: R,
// Stream of `VortexResult<()>` I/O events; only its errors and its end are surfaced.
#[pin]
io_stream: S,
}
}
impl<T, R, S> Stream for UnifiedDriverStream<R, S>
where
    R: Stream<Item = VortexResult<T>>,
    S: Stream<Item = VortexResult<()>>,
{
    type Item = VortexResult<T>;

    /// Polls the execution stream first and returns whatever it has ready,
    /// including its end. While it is pending, the I/O stream is polled:
    /// each successful I/O event triggers another attempt on the execution
    /// stream, an I/O error is yielded as an item, and the I/O stream ending
    /// is reported as an error item.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        loop {
            match this.exec_stream.try_poll_next_unpin(cx) {
                Poll::Ready(item) => return Poll::Ready(item),
                Poll::Pending => {}
            }

            let io_event = match this.io_stream.as_mut().try_poll_next_unpin(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(event) => event,
            };

            match io_event {
                // I/O made progress, so the execution stream may be ready now.
                Some(Ok(())) => continue,
                Some(Err(result)) => return Poll::Ready(Some(Err(result))),
                None => {
                    return Poll::Ready(Some(Err(vortex_err!("unexpected end of I/O stream"))));
                }
            }
        }
    }
}
pin_project! {
/// Future that polls an execution future and an I/O stream together.
///
/// Its `Future` impl resolves to the output of `exec_future`, polling
/// `io_stream` whenever the execution future is not ready.
struct UnifiedDriverFuture<R, S> {
// Future whose output is handed back to the caller.
#[pin]
exec_future: R,
// Stream of `VortexResult<()>` I/O events; only its errors and its end are surfaced.
#[pin]
io_stream: S,
}
}
impl<T, R, S> Future for UnifiedDriverFuture<R, S>
where
    R: Future<Output = VortexResult<T>>,
    S: Stream<Item = VortexResult<()>>,
{
    type Output = VortexResult<T>;

    /// Polls the execution future first and resolves with its output when
    /// ready. While it is pending, the I/O stream is polled: each successful
    /// I/O event triggers another attempt on the execution future, an I/O
    /// error resolves the future with that error, and the I/O stream ending
    /// resolves it with an error.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        loop {
            match this.exec_future.try_poll_unpin(cx) {
                Poll::Ready(output) => return Poll::Ready(output),
                Poll::Pending => {}
            }

            let io_event = match this.io_stream.as_mut().try_poll_next_unpin(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(event) => event,
            };

            match io_event {
                // I/O made progress, so the execution future may be ready now.
                Some(Ok(())) => continue,
                Some(Err(result)) => return Poll::Ready(Err(result)),
                None => {
                    return Poll::Ready(Err(vortex_err!("unexpected end of I/O stream")));
                }
            }
        }
    }
}