Skip to main content

vortex_scan/
multi.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! A [`DataSource`] that combines multiple [`LayoutReaderRef`]s into a single scannable source.
5//!
//! Readers may be pre-opened or deferred via [`LayoutReaderFactory`]. Deferred readers are opened
//! concurrently during scanning: up to `concurrency` file opens run in parallel as spawned tasks
//! on the session runtime, using `buffered` for ordered scans (which preserves child order) and
//! `buffer_unordered` otherwise. Once opened, each reader yields a single partition covering its
//! full row range; internal I/O pipelining and chunking are handled by [`ScanBuilder`].
11//!
12//! # Schema Resolution
13//!
14//! Currently, all children must share the exact same [`DType`]. A dtype
15//! mismatch produces an error.
16//!
17//! # Future Work
18//!
19//! - **Schema union**: Allow missing columns (filled with nulls) and compatible type upcasts
20//!   across sources instead of requiring exact dtype matches.
21//! - **Hive-style partitioning**: Extract partition values from file paths (e.g. `year=2024/month=01/`)
22//!   and expose them as virtual columns.
23//! - **Virtual columns**: `filename`, `file_row_number`, `file_index`.
24//! - **Per-file statistics**: Merge column statistics across sources for planner hints.
25//! - **Error resilience**: Skip failed sources instead of aborting the entire scan.
26
27use std::any::Any;
28use std::collections::VecDeque;
29use std::sync::Arc;
30
31use async_trait::async_trait;
32use futures::FutureExt;
33use futures::StreamExt;
34use futures::stream;
35use tracing::Instrument;
36use vortex_array::dtype::DType;
37use vortex_array::dtype::FieldPath;
38use vortex_array::expr::stats::Precision;
39use vortex_array::stats::StatsSet;
40use vortex_array::stream::ArrayStreamAdapter;
41use vortex_array::stream::ArrayStreamExt;
42use vortex_array::stream::SendableArrayStream;
43use vortex_error::VortexResult;
44use vortex_error::vortex_bail;
45use vortex_io::session::RuntimeSessionExt;
46use vortex_layout::LayoutReaderRef;
47use vortex_mask::Mask;
48use vortex_session::VortexSession;
49
50use crate::ScanBuilder;
51use crate::api::DataSource;
52use crate::api::DataSourceScan;
53use crate::api::DataSourceScanRef;
54use crate::api::Partition;
55use crate::api::PartitionRef;
56use crate::api::PartitionStream;
57use crate::api::ScanRequest;
58
/// Fallback number of deferred readers opened in parallel.
///
/// Used only when the number of available CPU cores cannot be determined via
/// `std::thread::available_parallelism`.
const DEFAULT_CONCURRENCY: usize = 8;
61
62/// An async factory that produces a [`LayoutReaderRef`].
63///
64/// Implementations handle file opening, footer caching, and statistics-based pruning.
65/// Returns `None` if the source should be skipped (e.g., pruned based on file-level
66/// statistics before the reader is fully constructed).
67#[async_trait]
68pub trait LayoutReaderFactory: 'static + Send + Sync {
69    /// Opens the layout reader, or returns `None` if it should be skipped.
70    async fn open(&self) -> VortexResult<Option<LayoutReaderRef>>;
71}
72
73/// A [`DataSource`] that combines multiple [`LayoutReaderRef`]s into a single scannable source.
74///
75/// Readers may be pre-opened or deferred via [`LayoutReaderFactory`]. Deferred readers are opened
76/// concurrently during scanning using `buffer_unordered`, mirroring the DuckDB scan pattern: up
77/// to `concurrency` file opens run in parallel as spawned tasks on the session runtime. Once
78/// opened, each reader yields a single partition covering its full row range; internal I/O
79/// pipelining and chunking are handled by [`ScanBuilder`].
80pub struct MultiLayoutDataSource {
81    dtype: DType,
82    session: VortexSession,
83    children: Vec<MultiLayoutChild>,
84    concurrency: usize,
85}
86
87enum MultiLayoutChild {
88    Opened(LayoutReaderRef),
89    Deferred(Arc<dyn LayoutReaderFactory>),
90}
91
92impl MultiLayoutDataSource {
93    /// Creates a multi-layout data source with the first reader pre-opened.
94    ///
95    /// The first reader determines the dtype. Remaining readers are opened lazily during
96    /// scanning via their factories.
97    pub fn new_with_first(
98        first: LayoutReaderRef,
99        remaining: Vec<Arc<dyn LayoutReaderFactory>>,
100        session: &VortexSession,
101    ) -> Self {
102        let dtype = first.dtype().clone();
103        let concurrency = std::thread::available_parallelism()
104            .map(|v| v.get())
105            .unwrap_or(DEFAULT_CONCURRENCY);
106
107        let mut children = Vec::with_capacity(1 + remaining.len());
108        children.push(MultiLayoutChild::Opened(first));
109        children.extend(remaining.into_iter().map(MultiLayoutChild::Deferred));
110
111        Self {
112            dtype,
113            session: session.clone(),
114            children,
115            concurrency,
116        }
117    }
118
119    /// Creates a multi-layout data source where all children are deferred.
120    ///
121    /// The dtype must be provided externally since there is no pre-opened reader to infer it
122    /// from. This avoids eagerly opening any file when the schema is already known (e.g. from
123    /// a catalog or a prior scan).
124    pub fn new_deferred(
125        dtype: DType,
126        factories: Vec<Arc<dyn LayoutReaderFactory>>,
127        session: &VortexSession,
128    ) -> Self {
129        let concurrency = std::thread::available_parallelism()
130            .map(|v| v.get())
131            .unwrap_or(DEFAULT_CONCURRENCY);
132
133        Self {
134            dtype,
135            session: session.clone(),
136            children: factories
137                .into_iter()
138                .map(MultiLayoutChild::Deferred)
139                .collect(),
140            concurrency,
141        }
142    }
143
144    /// Sets the concurrency for opening deferred readers.
145    ///
146    /// Controls how many file opens run in parallel via `buffer_unordered`.
147    /// Defaults to the number of available CPU cores.
148    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
149        self.concurrency = concurrency;
150        self
151    }
152}
153
154#[async_trait]
155impl DataSource for MultiLayoutDataSource {
156    fn dtype(&self) -> &DType {
157        &self.dtype
158    }
159
160    fn row_count(&self) -> Option<Precision<u64>> {
161        let mut sum: u64 = 0;
162        let mut opened_count: u64 = 0;
163        let mut deferred_count: u64 = 0;
164
165        for child in &self.children {
166            match child {
167                MultiLayoutChild::Opened(reader) => {
168                    opened_count += 1;
169                    sum = sum.saturating_add(reader.row_count());
170                }
171                MultiLayoutChild::Deferred(_) => {
172                    deferred_count += 1;
173                }
174            }
175        }
176
177        let total_count = opened_count + deferred_count;
178        if total_count == 0 {
179            return Some(Precision::exact(0u64));
180        }
181
182        if deferred_count == 0 {
183            Some(Precision::exact(sum))
184        } else if opened_count > 0 {
185            let avg = sum / opened_count;
186            let extrapolated = avg.saturating_mul(total_count);
187            Some(Precision::inexact(extrapolated))
188        } else {
189            None
190        }
191    }
192
193    fn deserialize_partition(
194        &self,
195        _data: &[u8],
196        _session: &VortexSession,
197    ) -> VortexResult<PartitionRef> {
198        vortex_bail!("MultiLayoutDataSource partitions are not yet serializable")
199    }
200
201    async fn scan(&self, scan_request: ScanRequest) -> VortexResult<DataSourceScanRef> {
202        let mut ready = VecDeque::new();
203        let mut deferred = VecDeque::new();
204
205        for child in &self.children {
206            match child {
207                MultiLayoutChild::Opened(reader) => ready.push_back(reader.clone()),
208                MultiLayoutChild::Deferred(factory) => deferred.push_back(factory.clone()),
209            }
210        }
211
212        let dtype = scan_request.projection.return_dtype(&self.dtype)?;
213
214        Ok(Box::new(MultiLayoutScan {
215            session: self.session.clone(),
216            dtype,
217            request: scan_request,
218            ready,
219            deferred,
220            handle: self.session.handle(),
221            concurrency: self.concurrency,
222        }))
223    }
224
225    async fn field_statistics(&self, _field_path: &FieldPath) -> VortexResult<StatsSet> {
226        Ok(StatsSet::default())
227    }
228}
229
230struct MultiLayoutScan {
231    session: VortexSession,
232    dtype: DType,
233    request: ScanRequest,
234    ready: VecDeque<LayoutReaderRef>,
235    deferred: VecDeque<Arc<dyn LayoutReaderFactory>>,
236    handle: vortex_io::runtime::Handle,
237    concurrency: usize,
238}
239
240impl DataSourceScan for MultiLayoutScan {
241    fn dtype(&self) -> &DType {
242        &self.dtype
243    }
244
245    fn partition_count(&self) -> Option<Precision<usize>> {
246        let count = self.ready.len() + self.deferred.len();
247        if self.deferred.is_empty() {
248            Some(Precision::exact(count))
249        } else {
250            Some(Precision::inexact(count))
251        }
252    }
253
254    fn partitions(self: Box<Self>) -> PartitionStream {
255        let Self {
256            session,
257            dtype: _,
258            request,
259            ready,
260            deferred,
261            handle,
262            concurrency,
263        } = *self;
264
265        let ordered = request.ordered;
266
267        // Pre-opened readers are immediately available.
268        let ready_stream = stream::iter(ready).map(Ok);
269
270        // Deferred readers are opened concurrently via spawned tasks.
271        // When ordered, we use `buffered` to preserve the original partition order.
272        // When unordered, we use `buffer_unordered` to yield partitions as they open.
273        let spawned = stream::iter(deferred).map(move |factory| {
274            handle.spawn(async move {
275                factory
276                    .open()
277                    .instrument(tracing::info_span!("LayoutReaderFactory::open"))
278                    .await
279            })
280        });
281
282        let deferred_stream = if ordered {
283            spawned
284                .buffered(concurrency)
285                .filter_map(|result| async move {
286                    match result {
287                        Ok(Some(reader)) => Some(Ok(reader)),
288                        Ok(None) => None,
289                        Err(e) => Some(Err(e)),
290                    }
291                })
292                .boxed()
293        } else {
294            spawned
295                .buffer_unordered(concurrency)
296                .filter_map(|result| async move {
297                    match result {
298                        Ok(Some(reader)) => Some(Ok(reader)),
299                        Ok(None) => None,
300                        Err(e) => Some(Err(e)),
301                    }
302                })
303                .boxed()
304        };
305
306        // For each reader (ready or just-opened), generate a partition.
307        // Partition generation is synchronous (just creates structs with row ranges), so
308        // `flat_map` is appropriate here. The real I/O work happens when `execute()` is called.
309        ready_stream
310            .chain(deferred_stream)
311            .flat_map(move |reader_result| match reader_result {
312                Ok(reader) => reader_partition(reader, session.clone(), request.clone()),
313                Err(e) => stream::once(async move { Err(e) }).boxed(),
314            })
315            .boxed()
316    }
317}
318
319/// Generates a partition stream for a single layout reader.
320///
321/// Checks file-level pruning first (via `pruning_evaluation`). If the filter proves no rows
322/// can match, returns an empty stream. Otherwise, yields a single partition covering the
323/// reader's full row range.
324fn reader_partition(
325    reader: LayoutReaderRef,
326    session: VortexSession,
327    request: ScanRequest,
328) -> PartitionStream {
329    let row_count = reader.row_count();
330    let row_range = request.row_range.clone().unwrap_or(0..row_count);
331
332    // Check file-level pruning: if the filter can be proven false for the entire row range
333    // using file-level statistics, skip this reader entirely.
334    if let Some(ref filter) = request.filter {
335        let mask_len = usize::try_from(row_range.end - row_range.start).unwrap_or(usize::MAX);
336        let mask = Mask::new_true(mask_len);
337        if let Ok(pruning_future) = reader.pruning_evaluation(&row_range, filter, mask)
338            && let Some(Ok(result_mask)) = pruning_future.now_or_never()
339            && result_mask.all_false()
340        {
341            return stream::empty().boxed();
342        }
343    }
344
345    stream::once(async move {
346        Ok(Box::new(MultiLayoutPartition {
347            reader,
348            session,
349            request: ScanRequest {
350                row_range: Some(row_range),
351                ..request
352            },
353        }) as PartitionRef)
354    })
355    .boxed()
356}
357
358/// A partition backed by a single [`LayoutReaderRef`] and a row range.
359///
360/// On `execute()`, creates a [`ScanBuilder`][crate::ScanBuilder] over the row range, enabling
361/// internal I/O pipelining and split-level parallelism within the reader.
362struct MultiLayoutPartition {
363    reader: LayoutReaderRef,
364    session: VortexSession,
365    request: ScanRequest,
366}
367
368impl Partition for MultiLayoutPartition {
369    fn as_any(&self) -> &dyn Any {
370        self
371    }
372
373    fn row_count(&self) -> Option<Precision<u64>> {
374        let row_range = self.request.row_range.as_ref()?;
375        let row_count = row_range.end - row_range.start;
376        let row_count = self.request.selection.row_count(row_count);
377        let row_count = self
378            .request
379            .limit
380            .map_or(row_count, |limit| row_count.min(limit));
381
382        Some(if self.request.filter.is_some() {
383            Precision::inexact(row_count)
384        } else {
385            Precision::exact(row_count)
386        })
387    }
388
389    fn byte_size(&self) -> Option<Precision<u64>> {
390        None
391    }
392
393    fn execute(self: Box<Self>) -> VortexResult<SendableArrayStream> {
394        let request = self.request;
395        let mut builder = ScanBuilder::new(self.session, self.reader)
396            .with_selection(request.selection)
397            .with_projection(request.projection)
398            .with_some_filter(request.filter)
399            .with_some_limit(request.limit)
400            .with_ordered(request.ordered);
401
402        if let Some(row_range) = request.row_range {
403            builder = builder.with_row_range(row_range);
404        }
405
406        let dtype = builder.dtype()?;
407        let stream = builder.into_stream()?;
408
409        Ok(ArrayStreamExt::boxed(ArrayStreamAdapter::new(
410            dtype, stream,
411        )))
412    }
413}