use std::any::Any;
use std::collections::VecDeque;
use std::sync::Arc;
use async_trait::async_trait;
use futures::FutureExt;
use futures::StreamExt;
use futures::stream;
use tracing::Instrument;
use vortex_array::dtype::DType;
use vortex_array::dtype::FieldPath;
use vortex_array::expr::stats::Precision;
use vortex_array::stats::StatsSet;
use vortex_array::stream::ArrayStreamAdapter;
use vortex_array::stream::ArrayStreamExt;
use vortex_array::stream::SendableArrayStream;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_io::session::RuntimeSessionExt;
use vortex_mask::Mask;
use vortex_scan::DataSource;
use vortex_scan::DataSourceScan;
use vortex_scan::DataSourceScanRef;
use vortex_scan::Partition;
use vortex_scan::PartitionRef;
use vortex_scan::PartitionStream;
use vortex_scan::ScanRequest;
use vortex_session::VortexSession;
use crate::LayoutReaderRef;
use crate::scan::scan_builder::ScanBuilder;
/// Fallback concurrency for opening deferred readers, used when
/// `std::thread::available_parallelism` cannot report a value.
const DEFAULT_CONCURRENCY: usize = 8;
/// A deferred source of a `LayoutReaderRef` that is opened asynchronously on demand.
///
/// Factories are `'static + Send + Sync`: during a scan each `open` call is run on a task
/// spawned through the session's runtime handle.
#[async_trait]
pub trait LayoutReaderFactory: 'static + Send + Sync {
/// Opens the layout reader.
///
/// When this returns `Ok(None)` the scan skips the source and emits no partition for it.
/// An `Err` is forwarded as an item of the partition stream.
async fn open(&self) -> VortexResult<Option<LayoutReaderRef>>;
}
/// A `DataSource` that scans several layout readers, some already opened and some opened
/// lazily through a [`LayoutReaderFactory`].
pub struct MultiLayoutDataSource {
// Dtype reported by `DataSource::dtype` and used to resolve a projection's return dtype.
dtype: DType,
// Session cloned into every scan and partition created from this source.
session: VortexSession,
// Child readers in the order they were supplied to the constructor.
children: Vec<MultiLayoutChild>,
// Passed to `buffered` / `buffer_unordered` when opening deferred children during a scan.
concurrency: usize,
}
/// One child of a [`MultiLayoutDataSource`].
enum MultiLayoutChild {
/// A reader that is already open; its row count is available without I/O.
Opened(LayoutReaderRef),
/// A reader that is opened through its factory only when a scan requests partitions.
Deferred(Arc<dyn LayoutReaderFactory>),
}
impl MultiLayoutDataSource {
    /// Default number of deferred readers opened concurrently: the machine's available
    /// parallelism, or [`DEFAULT_CONCURRENCY`] when that cannot be determined.
    fn default_concurrency() -> usize {
        std::thread::available_parallelism()
            .map(|parallelism| parallelism.get())
            .unwrap_or(DEFAULT_CONCURRENCY)
    }

    /// Creates a data source from one already-opened reader plus deferred factories.
    ///
    /// The source's dtype is taken from `first`. The factories in `remaining` are opened only
    /// when a scan asks for partitions; nothing here checks that they share `first`'s dtype.
    pub fn new_with_first(
        first: LayoutReaderRef,
        remaining: Vec<Arc<dyn LayoutReaderFactory>>,
        session: &VortexSession,
    ) -> Self {
        let dtype = first.dtype().clone();
        let mut children = Vec::with_capacity(1 + remaining.len());
        children.push(MultiLayoutChild::Opened(first));
        children.extend(remaining.into_iter().map(MultiLayoutChild::Deferred));
        Self {
            dtype,
            session: session.clone(),
            children,
            concurrency: Self::default_concurrency(),
        }
    }

    /// Creates a data source whose readers are all deferred.
    ///
    /// Because no reader is open yet, the caller supplies the `dtype` up front.
    pub fn new_deferred(
        dtype: DType,
        factories: Vec<Arc<dyn LayoutReaderFactory>>,
        session: &VortexSession,
    ) -> Self {
        Self {
            dtype,
            session: session.clone(),
            children: factories
                .into_iter()
                .map(MultiLayoutChild::Deferred)
                .collect(),
            concurrency: Self::default_concurrency(),
        }
    }

    /// Sets how many deferred readers may be opened concurrently during a scan.
    ///
    /// A value of `0` is raised to `1`: a zero-sized `buffered` / `buffer_unordered` stage
    /// never polls its inner stream, so the partition stream would stay pending forever.
    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
        self.concurrency = concurrency.max(1);
        self
    }
}
#[async_trait]
impl DataSource for MultiLayoutDataSource {
    /// The dtype of this source before any projection is applied.
    fn dtype(&self) -> &DType {
        &self.dtype
    }

    /// Reports the total row count across all children.
    ///
    /// The count is exact when every child is opened (or there are no children). With a mix
    /// of opened and deferred children it is an inexact extrapolation of the opened readers'
    /// average. With only deferred children it is unknown.
    fn row_count(&self) -> Option<Precision<u64>> {
        // Accumulate (rows in opened readers, opened readers, deferred readers) in one pass.
        let (opened_rows, opened, deferred) = self.children.iter().fold(
            (0u64, 0u64, 0u64),
            |(rows, opened, deferred), child| match child {
                MultiLayoutChild::Opened(reader) => (
                    rows.saturating_add(reader.row_count()),
                    opened + 1,
                    deferred,
                ),
                MultiLayoutChild::Deferred(_) => (rows, opened, deferred + 1),
            },
        );

        match (opened, deferred) {
            (0, 0) => Some(Precision::exact(0u64)),
            (_, 0) => Some(Precision::exact(opened_rows)),
            (0, _) => None,
            (_, _) => {
                let rows_per_reader = opened_rows / opened;
                Some(Precision::inexact(
                    rows_per_reader.saturating_mul(opened + deferred),
                ))
            }
        }
    }

    /// Partition deserialization is not supported; this always returns an error.
    fn deserialize_partition(
        &self,
        _data: &[u8],
        _session: &VortexSession,
    ) -> VortexResult<PartitionRef> {
        vortex_bail!("MultiLayoutDataSource partitions are not yet serializable")
    }

    /// Builds a scan over all children for `scan_request`.
    ///
    /// Opened readers and deferred factories go into separate queues, each keeping the
    /// children's relative order. Fails if the projection cannot be resolved against the
    /// source dtype.
    async fn scan(&self, scan_request: ScanRequest) -> VortexResult<DataSourceScanRef> {
        let dtype = scan_request.projection.return_dtype(&self.dtype)?;

        let ready: VecDeque<_> = self
            .children
            .iter()
            .filter_map(|child| match child {
                MultiLayoutChild::Opened(reader) => Some(Arc::clone(reader)),
                MultiLayoutChild::Deferred(_) => None,
            })
            .collect();
        let deferred: VecDeque<_> = self
            .children
            .iter()
            .filter_map(|child| match child {
                MultiLayoutChild::Opened(_) => None,
                MultiLayoutChild::Deferred(factory) => Some(Arc::clone(factory)),
            })
            .collect();

        let scan = MultiLayoutScan {
            session: self.session.clone(),
            dtype,
            request: scan_request,
            ready,
            deferred,
            handle: self.session.handle(),
            concurrency: self.concurrency,
        };
        Ok(Box::new(scan))
    }

    /// No per-field statistics are tracked; an empty stats set is returned for every path.
    async fn field_statistics(&self, _field_path: &FieldPath) -> VortexResult<StatsSet> {
        Ok(StatsSet::default())
    }
}
/// A single scan over the children of a [`MultiLayoutDataSource`].
struct MultiLayoutScan {
// Session handed to every partition produced by this scan.
session: VortexSession,
// Return dtype of the request's projection applied to the source dtype.
dtype: DType,
// The request this scan was created for; cloned into each partition.
request: ScanRequest,
// Readers that were already open when the scan was created.
ready: VecDeque<LayoutReaderRef>,
// Factories for readers that still have to be opened.
deferred: VecDeque<Arc<dyn LayoutReaderFactory>>,
// Runtime handle used to spawn the `open` calls of deferred factories.
handle: vortex_io::runtime::Handle,
// Upper bound on concurrently running `open` calls.
concurrency: usize,
}
impl DataSourceScan for MultiLayoutScan {
    /// The dtype produced by this scan, i.e. after projection.
    fn dtype(&self) -> &DType {
        &self.dtype
    }

    /// Reports how many partitions this scan yields.
    ///
    /// The count is exact only when no partition can be dropped. A deferred factory may
    /// open to `None`, and with a filter present any reader may be pruned away completely
    /// by `reader_partition`. In both cases the number of readers is only an estimate.
    fn partition_count(&self) -> Option<Precision<usize>> {
        let count = self.ready.len() + self.deferred.len();
        let may_drop_partitions = !self.deferred.is_empty() || self.request.filter.is_some();
        Some(if may_drop_partitions {
            Precision::inexact(count)
        } else {
            Precision::exact(count)
        })
    }

    /// Streams one partition per reader that survives opening and pruning.
    ///
    /// Already-opened readers come first, followed by the deferred ones. Deferred factories
    /// are opened on spawned tasks, at most `concurrency` at a time. For an ordered request
    /// their results are yielded in factory order, otherwise in completion order.
    fn partitions(self: Box<Self>) -> PartitionStream {
        let Self {
            session,
            dtype: _,
            request,
            ready,
            deferred,
            handle,
            concurrency,
        } = *self;
        let ordered = request.ordered;

        let ready_stream = stream::iter(ready).map(Ok);

        // `map` is lazy, so a task is only spawned once the buffering stage asks for it.
        let spawned = stream::iter(deferred).map(move |factory| {
            handle.spawn(async move {
                factory
                    .open()
                    .instrument(tracing::info_span!("LayoutReaderFactory::open"))
                    .await
            })
        });
        let opened = if ordered {
            spawned.buffered(concurrency).boxed()
        } else {
            spawned.buffer_unordered(concurrency).boxed()
        };
        // `transpose` drops `Ok(None)` (factory had nothing to open) and keeps readers and
        // errors.
        let deferred_stream = opened.filter_map(|opened| async move { opened.transpose() });

        ready_stream
            .chain(deferred_stream)
            .flat_map(move |reader_result| match reader_result {
                Ok(reader) => reader_partition(reader, session.clone(), request.clone()),
                Err(e) => stream::once(async move { Err(e) }).boxed(),
            })
            .boxed()
    }
}
/// Produces the partition stream for a single reader: either one partition or none.
///
/// The partition covers the request's row range, or the reader's full range `0..row_count`
/// when the request has none. If the request has a filter and pruning can be evaluated
/// synchronously with an all-false result, the reader is skipped and the stream is empty.
/// Pruning errors and not-yet-ready pruning futures are ignored and the partition is kept.
fn reader_partition(
    reader: LayoutReaderRef,
    session: VortexSession,
    request: ScanRequest,
) -> PartitionStream {
    let total_rows = reader.row_count();
    let row_range = request.row_range.clone().unwrap_or(0..total_rows);

    let fully_pruned = request.filter.as_ref().is_some_and(|filter| {
        let span = row_range.end - row_range.start;
        let all_rows = Mask::new_true(usize::try_from(span).unwrap_or(usize::MAX));
        reader
            .pruning_evaluation(&row_range, filter, all_rows)
            .ok()
            // Only use the pruning result if it is available without awaiting.
            .and_then(|pending| pending.now_or_never())
            .and_then(|polled| polled.ok())
            .is_some_and(|surviving| surviving.all_false())
    });
    if fully_pruned {
        return stream::empty().boxed();
    }

    let partition = MultiLayoutPartition {
        reader,
        session,
        request: ScanRequest {
            row_range: Some(row_range),
            ..request
        },
    };
    stream::once(async move { Ok(Box::new(partition) as PartitionRef) }).boxed()
}
/// A partition covering one layout reader of a multi-layout scan.
struct MultiLayoutPartition {
// The reader this partition scans.
reader: LayoutReaderRef,
// Session passed to the `ScanBuilder` on execution.
session: VortexSession,
// The scan request; `reader_partition` always sets its `row_range` to `Some(..)`.
request: ScanRequest,
}
impl Partition for MultiLayoutPartition {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Reports the number of rows this partition yields.
    ///
    /// Starts from the row range length, applies the selection, then caps by the limit.
    /// The result is inexact when a filter is present and unknown when the request has no
    /// row range.
    fn row_count(&self) -> Option<Precision<u64>> {
        let request = &self.request;
        let range = request.row_range.as_ref()?;
        let selected = request.selection.row_count(range.end - range.start);
        let capped = match request.limit {
            Some(limit) => selected.min(limit),
            None => selected,
        };
        let estimate = if request.filter.is_some() {
            Precision::inexact(capped)
        } else {
            Precision::exact(capped)
        };
        Some(estimate)
    }

    /// The byte size of a partition is not tracked.
    fn byte_size(&self) -> Option<Precision<u64>> {
        None
    }

    /// Runs the scan for this partition and returns the resulting array stream.
    ///
    /// All request options are forwarded to a `ScanBuilder` over this partition's reader.
    fn execute(self: Box<Self>) -> VortexResult<SendableArrayStream> {
        let Self {
            reader,
            session,
            request,
        } = *self;

        let configured = ScanBuilder::new(session, reader)
            .with_selection(request.selection)
            .with_projection(request.projection)
            .with_some_filter(request.filter)
            .with_some_limit(request.limit)
            .with_ordered(request.ordered);
        let builder = match request.row_range {
            Some(range) => configured.with_row_range(range),
            None => configured,
        };

        let dtype = builder.dtype()?;
        let arrays = builder.into_stream()?;
        Ok(ArrayStreamExt::boxed(ArrayStreamAdapter::new(
            dtype, arrays,
        )))
    }
}