// vortex_layout/scan/multi.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! A [`DataSource`] that combines multiple [`LayoutReaderRef`]s into a single scannable source.
5//!
6//! Readers may be pre-opened or deferred via [`LayoutReaderFactory`]. Deferred readers are opened
7//! concurrently during scanning using `buffer_unordered`: up to `concurrency` file opens run in
8//! parallel as spawned tasks on the session runtime. Once opened, each reader yields a single
9//! partition covering its full row range; internal I/O pipelining and chunking are handled by
10//! [`ScanBuilder`].
11//!
12//! # Schema Resolution
13//!
14//! Currently, all children must share the exact same [`DType`]. A dtype
15//! mismatch produces an error.
16//!
17//! # Future Work
18//!
19//! - **Schema union**: Allow missing columns (filled with nulls) and compatible type upcasts
20//!   across sources instead of requiring exact dtype matches.
21//! - **Hive-style partitioning**: Extract partition values from file paths (e.g. `year=2024/month=01/`)
22//!   and expose them as virtual columns.
23//! - **Virtual columns**: `filename`, `file_row_number`, `file_index`.
24//! - **Per-file statistics**: Merge column statistics across sources for planner hints.
25//! - **Error resilience**: Skip failed sources instead of aborting the entire scan.
26
27use std::any::Any;
28use std::collections::VecDeque;
29use std::sync::Arc;
30
31use async_trait::async_trait;
32use futures::FutureExt;
33use futures::StreamExt;
34use futures::stream;
35use tracing::Instrument;
36use vortex_array::dtype::DType;
37use vortex_array::dtype::FieldPath;
38use vortex_array::expr::stats::Precision;
39use vortex_array::stats::StatsSet;
40use vortex_array::stream::ArrayStreamAdapter;
41use vortex_array::stream::ArrayStreamExt;
42use vortex_array::stream::SendableArrayStream;
43use vortex_error::VortexResult;
44use vortex_error::vortex_bail;
45use vortex_io::session::RuntimeSessionExt;
46use vortex_mask::Mask;
47use vortex_scan::DataSource;
48use vortex_scan::DataSourceScan;
49use vortex_scan::DataSourceScanRef;
50use vortex_scan::Partition;
51use vortex_scan::PartitionRef;
52use vortex_scan::PartitionStream;
53use vortex_scan::ScanRequest;
54use vortex_scan::selection::Selection;
55use vortex_session::VortexSession;
56use vortex_utils::parallelism::get_available_parallelism;
57
58use crate::LayoutReaderRef;
59use crate::scan::scan_builder::ScanBuilder;
60
/// Default concurrency for opening deferred readers, used as a fallback when the
/// available CPU parallelism cannot be determined.
const DEFAULT_CONCURRENCY: usize = 8;
63
/// An async factory that produces a [`LayoutReaderRef`].
///
/// Implementations handle file opening, footer caching, and statistics-based pruning.
/// Returns `None` if the source should be skipped (e.g., pruned based on file-level
/// statistics before the reader is fully constructed).
#[async_trait]
pub trait LayoutReaderFactory: 'static + Send + Sync {
    /// Opens the layout reader, or returns `None` if the source should be skipped.
    ///
    /// `Ok(None)` means "skip this source" rather than failure; errors are surfaced
    /// through the scan's partition stream.
    async fn open(&self) -> VortexResult<Option<LayoutReaderRef>>;
}
74
/// A [`DataSource`] that combines multiple [`LayoutReaderRef`]s into a single scannable source.
///
/// Readers may be pre-opened or deferred via [`LayoutReaderFactory`]. Deferred readers are opened
/// concurrently during scanning using `buffer_unordered`, mirroring the DuckDB scan pattern: up
/// to `concurrency` file opens run in parallel as spawned tasks on the session runtime. Once
/// opened, each reader yields a single partition covering its full row range; internal I/O
/// pipelining and chunking are handled by [`ScanBuilder`].
pub struct MultiLayoutDataSource {
    /// The dtype shared by all children; currently every child must match it exactly.
    dtype: DType,
    /// Session cloned into each scan; also supplies the runtime handle for spawning opens.
    session: VortexSession,
    /// Child sources in scan order: pre-opened readers or deferred factories.
    children: Vec<MultiLayoutChild>,
    /// Maximum number of deferred reader opens in flight at once.
    concurrency: usize,
}
88
/// A child of a [`MultiLayoutDataSource`]: either an already-opened reader, or a
/// factory that opens one lazily during the scan.
pub enum MultiLayoutChild {
    /// A reader that is already open and ready to scan.
    Opened(LayoutReaderRef),
    /// A factory that opens a reader on demand (and may skip it by returning `None`).
    Deferred(Arc<dyn LayoutReaderFactory>),
}
93
94impl MultiLayoutDataSource {
95    /// Creates a multi-layout data source with the first reader pre-opened.
96    ///
97    /// The first reader determines the dtype. Remaining readers are opened lazily during
98    /// scanning via their factories.
99    pub fn new_with_first(
100        first: LayoutReaderRef,
101        remaining: Vec<Arc<dyn LayoutReaderFactory>>,
102        session: &VortexSession,
103    ) -> Self {
104        let dtype = first.dtype().clone();
105        let concurrency = get_available_parallelism().unwrap_or(DEFAULT_CONCURRENCY);
106
107        let mut children = Vec::with_capacity(1 + remaining.len());
108        children.push(MultiLayoutChild::Opened(first));
109        children.extend(remaining.into_iter().map(MultiLayoutChild::Deferred));
110
111        Self {
112            dtype,
113            session: session.clone(),
114            children,
115            concurrency,
116        }
117    }
118
119    /// Creates a multi-layout data source where all children are deferred.
120    ///
121    /// The dtype must be provided externally since there is no pre-opened reader to infer it
122    /// from. This avoids eagerly opening any file when the schema is already known (e.g. from
123    /// a catalog or a prior scan).
124    pub fn new_deferred(
125        dtype: DType,
126        factories: Vec<Arc<dyn LayoutReaderFactory>>,
127        session: &VortexSession,
128    ) -> Self {
129        let concurrency = get_available_parallelism().unwrap_or(DEFAULT_CONCURRENCY);
130
131        Self {
132            dtype,
133            session: session.clone(),
134            children: factories
135                .into_iter()
136                .map(MultiLayoutChild::Deferred)
137                .collect(),
138            concurrency,
139        }
140    }
141
142    pub fn children(&self) -> &Vec<MultiLayoutChild> {
143        &self.children
144    }
145
146    /// Sets the concurrency for opening deferred readers.
147    ///
148    /// Controls how many file opens run in parallel via `buffer_unordered`.
149    /// Defaults to the number of available CPU cores.
150    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
151        self.concurrency = concurrency;
152        self
153    }
154}
155
156#[async_trait]
157impl DataSource for MultiLayoutDataSource {
158    fn dtype(&self) -> &DType {
159        &self.dtype
160    }
161
162    fn row_count(&self) -> Option<Precision<u64>> {
163        let mut sum: u64 = 0;
164        let mut opened_count: u64 = 0;
165        let mut deferred_count: u64 = 0;
166
167        for child in &self.children {
168            match child {
169                MultiLayoutChild::Opened(reader) => {
170                    opened_count += 1;
171                    sum = sum.saturating_add(reader.row_count());
172                }
173                MultiLayoutChild::Deferred(_) => {
174                    deferred_count += 1;
175                }
176            }
177        }
178
179        let total_count = opened_count + deferred_count;
180        if total_count == 0 {
181            return Some(Precision::exact(0u64));
182        }
183
184        if deferred_count == 0 {
185            Some(Precision::exact(sum))
186        } else if opened_count > 0 {
187            let avg = sum / opened_count;
188            let extrapolated = avg.saturating_mul(total_count);
189            Some(Precision::inexact(extrapolated))
190        } else {
191            None
192        }
193    }
194
195    fn deserialize_partition(
196        &self,
197        _data: &[u8],
198        _session: &VortexSession,
199    ) -> VortexResult<PartitionRef> {
200        vortex_bail!("MultiLayoutDataSource partitions are not yet serializable")
201    }
202
203    async fn scan(&self, scan_request: ScanRequest) -> VortexResult<DataSourceScanRef> {
204        let mut ready = VecDeque::new();
205        let mut deferred = VecDeque::new();
206
207        for child in &self.children {
208            match child {
209                MultiLayoutChild::Opened(reader) => ready.push_back(Arc::clone(reader)),
210                MultiLayoutChild::Deferred(factory) => deferred.push_back(Arc::clone(factory)),
211            }
212        }
213
214        let dtype = scan_request.projection.return_dtype(&self.dtype)?;
215
216        Ok(Box::new(MultiLayoutScan {
217            session: self.session.clone(),
218            dtype,
219            request: scan_request,
220            ready,
221            deferred,
222            handle: self.session.handle(),
223            concurrency: self.concurrency,
224        }))
225    }
226
227    async fn field_statistics(&self, _field_path: &FieldPath) -> VortexResult<StatsSet> {
228        Ok(StatsSet::default())
229    }
230}
231
/// In-flight scan state for [`MultiLayoutDataSource`], produced by its `scan()` method.
struct MultiLayoutScan {
    /// Session passed through to each partition's scan.
    session: VortexSession,
    /// Output dtype after applying the request's projection.
    dtype: DType,
    /// The scan request applied to every child reader.
    request: ScanRequest,
    /// Readers that were already open when the scan was created.
    ready: VecDeque<LayoutReaderRef>,
    /// Factories still to be opened, in original child order.
    deferred: VecDeque<Arc<dyn LayoutReaderFactory>>,
    /// Runtime handle used to spawn the deferred open tasks.
    handle: vortex_io::runtime::Handle,
    /// Maximum number of deferred opens in flight at once.
    concurrency: usize,
}
241
242impl DataSourceScan for MultiLayoutScan {
243    fn dtype(&self) -> &DType {
244        &self.dtype
245    }
246
247    fn partition_count(&self) -> Option<Precision<usize>> {
248        let count = self.ready.len() + self.deferred.len();
249        if self.deferred.is_empty() {
250            Some(Precision::exact(count))
251        } else {
252            Some(Precision::inexact(count))
253        }
254    }
255
256    fn partitions(self: Box<Self>) -> PartitionStream {
257        let Self {
258            session,
259            dtype: _,
260            request,
261            ready,
262            deferred,
263            handle,
264            concurrency,
265        } = *self;
266
267        let ordered = request.ordered;
268
269        // Pre-opened readers are immediately available.
270        let ready_stream = stream::iter(ready).map(Ok);
271
272        // Deferred readers are opened concurrently via spawned tasks.
273        // When ordered, we use `buffered` to preserve the original partition order.
274        // When unordered, we use `buffer_unordered` to yield partitions as they open.
275        let spawned = stream::iter(deferred).map(move |factory| {
276            handle.spawn(async move {
277                factory
278                    .open()
279                    .instrument(tracing::info_span!("LayoutReaderFactory::open"))
280                    .await
281            })
282        });
283
284        let deferred_stream = if ordered {
285            spawned
286                .buffered(concurrency)
287                .filter_map(|result| async move {
288                    match result {
289                        Ok(Some(reader)) => Some(Ok(reader)),
290                        Ok(None) => None,
291                        Err(e) => Some(Err(e)),
292                    }
293                })
294                .boxed()
295        } else {
296            spawned
297                .buffer_unordered(concurrency)
298                .filter_map(|result| async move {
299                    match result {
300                        Ok(Some(reader)) => Some(Ok(reader)),
301                        Ok(None) => None,
302                        Err(e) => Some(Err(e)),
303                    }
304                })
305                .boxed()
306        };
307
308        // For each reader (ready or just-opened), generate a partition.
309        // Partition generation is synchronous (just creates structs with row ranges), so
310        // `flat_map` is appropriate here. The real I/O work happens when `execute()` is called.
311        ready_stream
312            .chain(deferred_stream)
313            .enumerate()
314            .flat_map(move |(i, reader_result)| match reader_result {
315                Ok(reader) => reader_partition(i, reader, session.clone(), request.clone()),
316                Err(e) => stream::once(async move { Err(e) }).boxed(),
317            })
318            .boxed()
319    }
320}
321
/// Generates a partition stream for a single layout reader.
///
/// Checks file-level pruning first (via `pruning_evaluation`). If the filter proves no rows
/// can match, returns an empty stream. Otherwise, yields a single partition covering the
/// reader's full row range.
fn reader_partition(
    partition_idx: usize,
    reader: LayoutReaderRef,
    session: VortexSession,
    request: ScanRequest,
) -> PartitionStream {
    let row_count = reader.row_count();
    // Default to the reader's full row range when the request does not constrain it.
    // NOTE(review): a request-level `row_range` is applied verbatim to every reader,
    // i.e. interpreted per-file rather than across the concatenated sources — confirm
    // this is the intended semantics for callers that set it.
    let row_range = request.row_range.clone().unwrap_or(0..row_count);

    // Skip this partition entirely if it falls outside the requested partition range.
    let partition_idx_u64: u64 = partition_idx as u64;
    if let Some(range) = &request.partition_range
        && !range.contains(&partition_idx_u64)
    {
        return stream::empty().boxed();
    };
    // Apply explicit partition-index selection. `binary_search` assumes the index
    // buffers are sorted ascending — presumably guaranteed where the request is
    // constructed; TODO confirm.
    match &request.partition_selection {
        Selection::IncludeByIndex(buffer) => {
            if buffer.as_slice().binary_search(&partition_idx_u64).is_err() {
                return stream::empty().boxed();
            }
        }
        Selection::ExcludeByIndex(buffer) => {
            if buffer.as_slice().binary_search(&partition_idx_u64).is_ok() {
                return stream::empty().boxed();
            }
        }
        _ => {}
    };

    // Check file-level pruning: if the filter can be proven false for the entire row range
    // using file-level statistics, skip this reader entirely.
    if let Some(filter) = &request.filter {
        // `unwrap_or(usize::MAX)` only matters where the u64 range width exceeds usize
        // (e.g. 32-bit targets); it then falls back to a maximum-length all-true mask.
        let mask_len = usize::try_from(row_range.end - row_range.start).unwrap_or(usize::MAX);
        let mask = Mask::new_true(mask_len);
        // `now_or_never` means pruning only applies when the evaluation completes
        // synchronously (e.g. from already-available statistics); a pending or failed
        // evaluation falls through to a normal scan rather than blocking or erroring.
        if let Ok(pruning_future) = reader.pruning_evaluation(&row_range, filter, mask)
            && let Some(Ok(result_mask)) = pruning_future.now_or_never()
            && result_mask.all_false()
        {
            return stream::empty().boxed();
        }
    }

    // Emit a single partition covering the (possibly constrained) row range; the real
    // I/O happens when the partition's `execute()` is called.
    stream::once(async move {
        Ok(Box::new(MultiLayoutPartition {
            reader,
            session,
            request: ScanRequest {
                row_range: Some(row_range),
                ..request
            },
            index: partition_idx,
        }) as PartitionRef)
    })
    .boxed()
}
382
/// A partition backed by a single [`LayoutReaderRef`] and a row range.
///
/// On `execute()`, creates a [`ScanBuilder`][crate::ScanBuilder] over the row range, enabling
/// internal I/O pipelining and split-level parallelism within the reader.
struct MultiLayoutPartition {
    /// The reader this partition scans.
    reader: LayoutReaderRef,
    /// Session handed to the `ScanBuilder` on execution.
    session: VortexSession,
    /// The scan request, with `row_range` pinned to this partition's range.
    request: ScanRequest,
    /// This partition's position in the overall partition stream.
    index: usize,
}
393
394impl Partition for MultiLayoutPartition {
395    fn as_any(&self) -> &dyn Any {
396        self
397    }
398
399    fn index(&self) -> usize {
400        self.index
401    }
402
403    fn row_count(&self) -> Option<Precision<u64>> {
404        let row_range = self.request.row_range.as_ref()?;
405        let row_count = row_range.end - row_range.start;
406        let row_count = self.request.selection.row_count(row_count);
407        let row_count = self
408            .request
409            .limit
410            .map_or(row_count, |limit| row_count.min(limit));
411
412        Some(if self.request.filter.is_some() {
413            Precision::inexact(row_count)
414        } else {
415            Precision::exact(row_count)
416        })
417    }
418
419    fn byte_size(&self) -> Option<Precision<u64>> {
420        None
421    }
422
423    fn execute(self: Box<Self>) -> VortexResult<SendableArrayStream> {
424        let request = self.request;
425        let mut builder = ScanBuilder::new(self.session, self.reader)
426            .with_selection(request.selection)
427            .with_projection(request.projection)
428            .with_some_filter(request.filter)
429            .with_some_limit(request.limit)
430            .with_ordered(request.ordered);
431
432        if let Some(row_range) = request.row_range {
433            builder = builder.with_row_range(row_range);
434        }
435
436        let dtype = builder.dtype()?;
437        let stream = builder.into_stream()?;
438
439        Ok(ArrayStreamExt::boxed(ArrayStreamAdapter::new(
440            dtype, stream,
441        )))
442    }
443}