vortex_layout/scan/
multi.rs1use std::any::Any;
28use std::collections::VecDeque;
29use std::sync::Arc;
30
31use async_trait::async_trait;
32use futures::FutureExt;
33use futures::StreamExt;
34use futures::stream;
35use tracing::Instrument;
36use vortex_array::dtype::DType;
37use vortex_array::dtype::FieldPath;
38use vortex_array::expr::stats::Precision;
39use vortex_array::stats::StatsSet;
40use vortex_array::stream::ArrayStreamAdapter;
41use vortex_array::stream::ArrayStreamExt;
42use vortex_array::stream::SendableArrayStream;
43use vortex_error::VortexResult;
44use vortex_error::vortex_bail;
45use vortex_io::session::RuntimeSessionExt;
46use vortex_mask::Mask;
47use vortex_scan::DataSource;
48use vortex_scan::DataSourceScan;
49use vortex_scan::DataSourceScanRef;
50use vortex_scan::Partition;
51use vortex_scan::PartitionRef;
52use vortex_scan::PartitionStream;
53use vortex_scan::ScanRequest;
54use vortex_scan::selection::Selection;
55use vortex_session::VortexSession;
56use vortex_utils::parallelism::get_available_parallelism;
57
58use crate::LayoutReaderRef;
59use crate::scan::scan_builder::ScanBuilder;
60
/// Fallback limit on how many deferred readers are opened concurrently, used when
/// `get_available_parallelism()` does not yield a value.
const DEFAULT_CONCURRENCY: usize = 8;
63
/// A factory that produces a [`LayoutReaderRef`] on demand.
///
/// Implementations must be `Send + Sync + 'static` because factories are shared
/// behind an `Arc` and opened from spawned tasks.
#[async_trait]
pub trait LayoutReaderFactory: 'static + Send + Sync {
    /// Opens the reader.
    ///
    /// Returning `Ok(None)` means the factory has no reader to contribute; the scan
    /// in this file drops such results without emitting a partition.
    async fn open(&self) -> VortexResult<Option<LayoutReaderRef>>;
}
74
/// A [`DataSource`] backed by several layout readers, some of which may not have
/// been opened yet.
pub struct MultiLayoutDataSource {
    /// Dtype reported by `DataSource::dtype`; scan projections are resolved against it.
    dtype: DType,
    /// Session cloned into every scan created from this source.
    session: VortexSession,
    /// Child readers, each either already opened or deferred behind a factory.
    children: Vec<MultiLayoutChild>,
    /// Upper bound on how many deferred readers a scan opens concurrently.
    concurrency: usize,
}
88
/// One child of a [`MultiLayoutDataSource`].
pub enum MultiLayoutChild {
    /// A reader that is already open and can be scanned immediately.
    Opened(LayoutReaderRef),
    /// A reader that is opened lazily through its factory when a scan runs.
    Deferred(Arc<dyn LayoutReaderFactory>),
}
93
94impl MultiLayoutDataSource {
95 pub fn new_with_first(
100 first: LayoutReaderRef,
101 remaining: Vec<Arc<dyn LayoutReaderFactory>>,
102 session: &VortexSession,
103 ) -> Self {
104 let dtype = first.dtype().clone();
105 let concurrency = get_available_parallelism().unwrap_or(DEFAULT_CONCURRENCY);
106
107 let mut children = Vec::with_capacity(1 + remaining.len());
108 children.push(MultiLayoutChild::Opened(first));
109 children.extend(remaining.into_iter().map(MultiLayoutChild::Deferred));
110
111 Self {
112 dtype,
113 session: session.clone(),
114 children,
115 concurrency,
116 }
117 }
118
119 pub fn new_deferred(
125 dtype: DType,
126 factories: Vec<Arc<dyn LayoutReaderFactory>>,
127 session: &VortexSession,
128 ) -> Self {
129 let concurrency = get_available_parallelism().unwrap_or(DEFAULT_CONCURRENCY);
130
131 Self {
132 dtype,
133 session: session.clone(),
134 children: factories
135 .into_iter()
136 .map(MultiLayoutChild::Deferred)
137 .collect(),
138 concurrency,
139 }
140 }
141
142 pub fn children(&self) -> &Vec<MultiLayoutChild> {
143 &self.children
144 }
145
146 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
151 self.concurrency = concurrency;
152 self
153 }
154}
155
156#[async_trait]
157impl DataSource for MultiLayoutDataSource {
158 fn dtype(&self) -> &DType {
159 &self.dtype
160 }
161
162 fn row_count(&self) -> Option<Precision<u64>> {
163 let mut sum: u64 = 0;
164 let mut opened_count: u64 = 0;
165 let mut deferred_count: u64 = 0;
166
167 for child in &self.children {
168 match child {
169 MultiLayoutChild::Opened(reader) => {
170 opened_count += 1;
171 sum = sum.saturating_add(reader.row_count());
172 }
173 MultiLayoutChild::Deferred(_) => {
174 deferred_count += 1;
175 }
176 }
177 }
178
179 let total_count = opened_count + deferred_count;
180 if total_count == 0 {
181 return Some(Precision::exact(0u64));
182 }
183
184 if deferred_count == 0 {
185 Some(Precision::exact(sum))
186 } else if opened_count > 0 {
187 let avg = sum / opened_count;
188 let extrapolated = avg.saturating_mul(total_count);
189 Some(Precision::inexact(extrapolated))
190 } else {
191 None
192 }
193 }
194
195 fn deserialize_partition(
196 &self,
197 _data: &[u8],
198 _session: &VortexSession,
199 ) -> VortexResult<PartitionRef> {
200 vortex_bail!("MultiLayoutDataSource partitions are not yet serializable")
201 }
202
203 async fn scan(&self, scan_request: ScanRequest) -> VortexResult<DataSourceScanRef> {
204 let mut ready = VecDeque::new();
205 let mut deferred = VecDeque::new();
206
207 for child in &self.children {
208 match child {
209 MultiLayoutChild::Opened(reader) => ready.push_back(Arc::clone(reader)),
210 MultiLayoutChild::Deferred(factory) => deferred.push_back(Arc::clone(factory)),
211 }
212 }
213
214 let dtype = scan_request.projection.return_dtype(&self.dtype)?;
215
216 Ok(Box::new(MultiLayoutScan {
217 session: self.session.clone(),
218 dtype,
219 request: scan_request,
220 ready,
221 deferred,
222 handle: self.session.handle(),
223 concurrency: self.concurrency,
224 }))
225 }
226
227 async fn field_statistics(&self, _field_path: &FieldPath) -> VortexResult<StatsSet> {
228 Ok(StatsSet::default())
229 }
230}
231
/// An in-flight scan over the children of a [`MultiLayoutDataSource`].
struct MultiLayoutScan {
    /// Session cloned into each partition produced by the scan.
    session: VortexSession,
    /// Dtype of the scan output (the projection's return dtype).
    dtype: DType,
    /// The request this scan was created for; cloned into each partition.
    request: ScanRequest,
    /// Readers that were already open when the scan was created.
    ready: VecDeque<LayoutReaderRef>,
    /// Factories for readers that still have to be opened.
    deferred: VecDeque<Arc<dyn LayoutReaderFactory>>,
    /// Runtime handle on which the factory `open` calls are spawned.
    handle: vortex_io::runtime::Handle,
    /// Maximum number of factory `open` calls buffered at once.
    concurrency: usize,
}
241
242impl DataSourceScan for MultiLayoutScan {
243 fn dtype(&self) -> &DType {
244 &self.dtype
245 }
246
247 fn partition_count(&self) -> Option<Precision<usize>> {
248 let count = self.ready.len() + self.deferred.len();
249 if self.deferred.is_empty() {
250 Some(Precision::exact(count))
251 } else {
252 Some(Precision::inexact(count))
253 }
254 }
255
256 fn partitions(self: Box<Self>) -> PartitionStream {
257 let Self {
258 session,
259 dtype: _,
260 request,
261 ready,
262 deferred,
263 handle,
264 concurrency,
265 } = *self;
266
267 let ordered = request.ordered;
268
269 let ready_stream = stream::iter(ready).map(Ok);
271
272 let spawned = stream::iter(deferred).map(move |factory| {
276 handle.spawn(async move {
277 factory
278 .open()
279 .instrument(tracing::info_span!("LayoutReaderFactory::open"))
280 .await
281 })
282 });
283
284 let deferred_stream = if ordered {
285 spawned
286 .buffered(concurrency)
287 .filter_map(|result| async move {
288 match result {
289 Ok(Some(reader)) => Some(Ok(reader)),
290 Ok(None) => None,
291 Err(e) => Some(Err(e)),
292 }
293 })
294 .boxed()
295 } else {
296 spawned
297 .buffer_unordered(concurrency)
298 .filter_map(|result| async move {
299 match result {
300 Ok(Some(reader)) => Some(Ok(reader)),
301 Ok(None) => None,
302 Err(e) => Some(Err(e)),
303 }
304 })
305 .boxed()
306 };
307
308 ready_stream
312 .chain(deferred_stream)
313 .enumerate()
314 .flat_map(move |(i, reader_result)| match reader_result {
315 Ok(reader) => reader_partition(i, reader, session.clone(), request.clone()),
316 Err(e) => stream::once(async move { Err(e) }).boxed(),
317 })
318 .boxed()
319 }
320}
321
322fn reader_partition(
328 partition_idx: usize,
329 reader: LayoutReaderRef,
330 session: VortexSession,
331 request: ScanRequest,
332) -> PartitionStream {
333 let row_count = reader.row_count();
334 let row_range = request.row_range.clone().unwrap_or(0..row_count);
335
336 let partition_idx_u64: u64 = partition_idx as u64;
337 if let Some(range) = &request.partition_range
338 && !range.contains(&partition_idx_u64)
339 {
340 return stream::empty().boxed();
341 };
342 match &request.partition_selection {
343 Selection::IncludeByIndex(buffer) => {
344 if buffer.as_slice().binary_search(&partition_idx_u64).is_err() {
345 return stream::empty().boxed();
346 }
347 }
348 Selection::ExcludeByIndex(buffer) => {
349 if buffer.as_slice().binary_search(&partition_idx_u64).is_ok() {
350 return stream::empty().boxed();
351 }
352 }
353 _ => {}
354 };
355
356 if let Some(filter) = &request.filter {
359 let mask_len = usize::try_from(row_range.end - row_range.start).unwrap_or(usize::MAX);
360 let mask = Mask::new_true(mask_len);
361 if let Ok(pruning_future) = reader.pruning_evaluation(&row_range, filter, mask)
362 && let Some(Ok(result_mask)) = pruning_future.now_or_never()
363 && result_mask.all_false()
364 {
365 return stream::empty().boxed();
366 }
367 }
368
369 stream::once(async move {
370 Ok(Box::new(MultiLayoutPartition {
371 reader,
372 session,
373 request: ScanRequest {
374 row_range: Some(row_range),
375 ..request
376 },
377 index: partition_idx,
378 }) as PartitionRef)
379 })
380 .boxed()
381}
382
/// A single partition of a multi-layout scan: one reader plus the request to run
/// against it.
struct MultiLayoutPartition {
    /// The opened reader this partition scans.
    reader: LayoutReaderRef,
    /// Session handed to the `ScanBuilder` on execution.
    session: VortexSession,
    /// The scan request for this partition; `reader_partition` always sets `row_range`.
    request: ScanRequest,
    /// Index of this partition within the scan's partition stream.
    index: usize,
}
393
394impl Partition for MultiLayoutPartition {
395 fn as_any(&self) -> &dyn Any {
396 self
397 }
398
399 fn index(&self) -> usize {
400 self.index
401 }
402
403 fn row_count(&self) -> Option<Precision<u64>> {
404 let row_range = self.request.row_range.as_ref()?;
405 let row_count = row_range.end - row_range.start;
406 let row_count = self.request.selection.row_count(row_count);
407 let row_count = self
408 .request
409 .limit
410 .map_or(row_count, |limit| row_count.min(limit));
411
412 Some(if self.request.filter.is_some() {
413 Precision::inexact(row_count)
414 } else {
415 Precision::exact(row_count)
416 })
417 }
418
419 fn byte_size(&self) -> Option<Precision<u64>> {
420 None
421 }
422
423 fn execute(self: Box<Self>) -> VortexResult<SendableArrayStream> {
424 let request = self.request;
425 let mut builder = ScanBuilder::new(self.session, self.reader)
426 .with_selection(request.selection)
427 .with_projection(request.projection)
428 .with_some_filter(request.filter)
429 .with_some_limit(request.limit)
430 .with_ordered(request.ordered);
431
432 if let Some(row_range) = request.row_range {
433 builder = builder.with_row_range(row_range);
434 }
435
436 let dtype = builder.dtype()?;
437 let stream = builder.into_stream()?;
438
439 Ok(ArrayStreamExt::boxed(ArrayStreamAdapter::new(
440 dtype, stream,
441 )))
442 }
443}