Skip to main content

vortex_scan/
layout.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::ops::Range;
6use std::pin::Pin;
7use std::sync::Arc;
8use std::task::Context;
9use std::task::Poll;
10
11use async_trait::async_trait;
12use futures::FutureExt;
13use futures::Stream;
14use futures::stream;
15use futures::stream::StreamExt;
16use vortex_array::IntoArray;
17use vortex_array::arrays::ConstantArray;
18use vortex_array::dtype::DType;
19use vortex_array::dtype::FieldPath;
20use vortex_array::dtype::Nullability;
21use vortex_array::expr::Expression;
22use vortex_array::expr::stats::Precision;
23use vortex_array::scalar::Scalar;
24use vortex_array::stats::StatsSet;
25use vortex_array::stream::ArrayStreamAdapter;
26use vortex_array::stream::ArrayStreamExt;
27use vortex_array::stream::SendableArrayStream;
28use vortex_error::VortexResult;
29use vortex_error::vortex_bail;
30use vortex_layout::LayoutReaderRef;
31use vortex_mask::Mask;
32use vortex_metrics::MetricsRegistry;
33use vortex_session::VortexSession;
34
35use crate::ScanBuilder;
36use crate::Selection;
37use crate::api::DataSource;
38use crate::api::DataSourceScan;
39use crate::api::DataSourceScanRef;
40use crate::api::Partition;
41use crate::api::PartitionRef;
42use crate::api::PartitionStream;
43use crate::api::ScanRequest;
44
45/// An implementation of a [`DataSource`] that reads data from a [`LayoutReaderRef`].
46pub struct LayoutReaderDataSource {
47    reader: LayoutReaderRef,
48    session: VortexSession,
49    split_max_row_count: u64,
50    metrics_registry: Option<Arc<dyn MetricsRegistry>>,
51}
52
53impl LayoutReaderDataSource {
54    /// Creates a new [`LayoutReaderDataSource`].
55    ///
56    /// By default, the entire scan is returned as a single split. This best preserves V1
57    /// `ScanBuilder` behavior where one scan covers the full row range, allowing the internal
58    /// I/O pipeline and `SplitBy::Layout` chunking to operate without per-split overhead from
59    /// redundant expression resolution and layout tree traversal.
60    pub fn new(reader: LayoutReaderRef, session: VortexSession) -> Self {
61        Self {
62            reader,
63            session,
64            split_max_row_count: u64::MAX,
65            metrics_registry: None,
66        }
67    }
68
69    /// Sets the maximum number of rows per Scan API split.
70    ///
71    /// Each split drives a [`ScanBuilder`] over its row range, which internally handles
72    /// physical layout alignment and I/O pipelining. This controls the engine-level
73    /// parallelism granularity, not the I/O granularity.
74    pub fn with_split_max_row_count(mut self, row_count: u64) -> Self {
75        self.split_max_row_count = row_count;
76        self
77    }
78
79    /// Sets the metrics registry for tracking scan performance.
80    pub fn with_metrics_registry(mut self, metrics: Arc<dyn MetricsRegistry>) -> Self {
81        self.metrics_registry = Some(metrics);
82        self
83    }
84
85    /// Optionally sets the metrics registry for tracking scan performance.
86    pub fn with_some_metrics_registry(mut self, metrics: Option<Arc<dyn MetricsRegistry>>) -> Self {
87        self.metrics_registry = metrics;
88        self
89    }
90}
91
92#[async_trait]
93impl DataSource for LayoutReaderDataSource {
94    fn dtype(&self) -> &DType {
95        self.reader.dtype()
96    }
97
98    fn row_count(&self) -> Option<Precision<u64>> {
99        Some(Precision::exact(self.reader.row_count()))
100    }
101
102    fn byte_size(&self) -> Option<Precision<u64>> {
103        None
104    }
105
106    fn deserialize_partition(
107        &self,
108        _data: &[u8],
109        _session: &VortexSession,
110    ) -> VortexResult<PartitionRef> {
111        vortex_bail!("LayoutReader splits are not yet serializable");
112    }
113
114    async fn scan(&self, scan_request: ScanRequest) -> VortexResult<DataSourceScanRef> {
115        let total_rows = self.reader.row_count();
116        let row_range = scan_request.row_range.unwrap_or(0..total_rows);
117
118        let dtype = scan_request.projection.return_dtype(self.reader.dtype())?;
119
120        // If the dtype is an empty struct, and there is no filter, we can return a special
121        // length-only scan.
122        if let DType::Struct(fields, Nullability::NonNullable) = &dtype
123            && fields.nfields() == 0
124            && scan_request.filter.is_none()
125        {
126            // FIXME(ngates): extract out maybe?
127            let row_count = row_range.end - row_range.start;
128            let row_count = scan_request.selection.row_count(row_count);
129
130            // Apply the limit.
131            let row_count = if let Some(limit) = scan_request.limit {
132                row_count.min(limit)
133            } else {
134                row_count
135            };
136
137            return Ok(Box::new(Empty { dtype, row_count }));
138        }
139
140        // Check file-level pruning: if the filter can be proven false for the entire row range
141        // using file-level statistics (e.g. via FileStatsLayoutReader), skip the scan entirely.
142        if let Some(ref filter) = scan_request.filter {
143            let mask = Mask::new_true(
144                usize::try_from(row_range.end - row_range.start).unwrap_or(usize::MAX),
145            );
146            let pruning_result = self
147                .reader
148                .pruning_evaluation(&row_range, filter, mask)?
149                .now_or_never();
150            if let Some(Ok(result_mask)) = pruning_result
151                && result_mask.all_false()
152            {
153                return Ok(Box::new(Empty {
154                    dtype,
155                    row_count: 0,
156                }));
157            }
158        }
159
160        Ok(Box::new(LayoutReaderScan {
161            reader: self.reader.clone(),
162            session: self.session.clone(),
163            dtype,
164            projection: scan_request.projection,
165            filter: scan_request.filter,
166            limit: scan_request.limit,
167            selection: scan_request.selection,
168            ordered: scan_request.ordered,
169            metrics_registry: self.metrics_registry.clone(),
170            next_row: row_range.start,
171            end_row: row_range.end,
172            split_size: self.split_max_row_count,
173        }))
174    }
175
176    async fn field_statistics(&self, _field_path: &FieldPath) -> VortexResult<StatsSet> {
177        Ok(StatsSet::default())
178    }
179}
180
181struct LayoutReaderScan {
182    reader: LayoutReaderRef,
183    session: VortexSession,
184    dtype: DType,
185    projection: Expression,
186    filter: Option<Expression>,
187    limit: Option<u64>,
188    ordered: bool,
189    selection: Selection,
190    metrics_registry: Option<Arc<dyn MetricsRegistry>>,
191    next_row: u64,
192    end_row: u64,
193    split_size: u64,
194}
195
196impl DataSourceScan for LayoutReaderScan {
197    fn dtype(&self) -> &DType {
198        &self.dtype
199    }
200
201    fn partition_count(&self) -> Option<Precision<usize>> {
202        let (lower, upper) = self.size_hint();
203        match upper {
204            Some(u) if u == lower => Some(Precision::exact(lower)),
205            Some(u) => Some(Precision::inexact(u)),
206            None => Some(Precision::inexact(lower)),
207        }
208    }
209
210    fn partitions(self: Box<Self>) -> PartitionStream {
211        (*self).boxed()
212    }
213}
214
215impl Stream for LayoutReaderScan {
216    type Item = VortexResult<PartitionRef>;
217
218    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
219        let this = self.get_mut();
220
221        if this.next_row >= this.end_row {
222            return Poll::Ready(None);
223        }
224
225        if this.limit.is_some_and(|limit| limit == 0) {
226            return Poll::Ready(None);
227        }
228
229        let split_end = this
230            .next_row
231            .saturating_add(this.split_size)
232            .min(this.end_row);
233        let row_range = this.next_row..split_end;
234        let split_rows = split_end - this.next_row;
235
236        let split_limit = this.limit;
237        // Only decrement the remaining limit when there is no filter. With a filter,
238        // the actual output row count is unknown (could be anywhere from 0 to split_rows),
239        // so decrementing by split_rows would be too aggressive and could stop producing
240        // splits before the limit is reached. Instead, pass the full remaining limit to
241        // each split and let the engine enforce the exact limit at the stream level.
242        if this.filter.is_none()
243            && let Some(ref mut limit) = this.limit
244        {
245            *limit = limit.saturating_sub(split_rows);
246        }
247
248        let split = Box::new(LayoutReaderSplit {
249            reader: this.reader.clone(),
250            session: this.session.clone(),
251            projection: this.projection.clone(),
252            filter: this.filter.clone(),
253            limit: split_limit,
254            ordered: this.ordered,
255            row_range,
256            selection: this.selection.clone(),
257            metrics_registry: this.metrics_registry.clone(),
258        }) as PartitionRef;
259
260        this.next_row = split_end;
261
262        Poll::Ready(Some(Ok(split)))
263    }
264
265    fn size_hint(&self) -> (usize, Option<usize>) {
266        if self.next_row >= self.end_row {
267            return (0, Some(0));
268        }
269        let remaining_rows = self.end_row - self.next_row;
270        let splits = remaining_rows.div_ceil(self.split_size);
271        (0, Some(usize::try_from(splits).unwrap_or(usize::MAX)))
272    }
273}
274
275struct LayoutReaderSplit {
276    reader: LayoutReaderRef,
277    session: VortexSession,
278    projection: Expression,
279    filter: Option<Expression>,
280    limit: Option<u64>,
281    ordered: bool,
282    row_range: Range<u64>,
283    selection: Selection,
284    metrics_registry: Option<Arc<dyn MetricsRegistry>>,
285}
286
287impl Partition for LayoutReaderSplit {
288    fn as_any(&self) -> &dyn Any {
289        self
290    }
291
292    fn row_count(&self) -> Option<Precision<u64>> {
293        let row_count = self.row_range.end - self.row_range.start;
294        let row_count = self.selection.row_count(row_count);
295        let row_count = self.limit.map_or(row_count, |limit| row_count.min(limit));
296
297        Some(if self.filter.is_some() {
298            Precision::inexact(row_count)
299        } else {
300            Precision::exact(row_count)
301        })
302    }
303
304    fn byte_size(&self) -> Option<Precision<u64>> {
305        None
306    }
307
308    fn execute(self: Box<Self>) -> VortexResult<SendableArrayStream> {
309        let builder = ScanBuilder::new(self.session, self.reader)
310            .with_row_range(self.row_range)
311            .with_selection(self.selection)
312            .with_projection(self.projection)
313            .with_some_filter(self.filter)
314            .with_some_limit(self.limit)
315            .with_some_metrics_registry(self.metrics_registry)
316            .with_ordered(self.ordered);
317
318        let dtype = builder.dtype()?;
319        // Use into_stream() which creates a LazyScanStream that spawns individual I/O
320        // tasks onto the runtime, enabling parallel execution across executor threads.
321        let stream = builder.into_stream()?;
322
323        Ok(ArrayStreamExt::boxed(ArrayStreamAdapter::new(
324            dtype, stream,
325        )))
326    }
327}
328
329/// A scan that produces no data, only empty arrays with the correct row count.
330struct Empty {
331    dtype: DType,
332    row_count: u64,
333}
334
335impl DataSourceScan for Empty {
336    fn dtype(&self) -> &DType {
337        &self.dtype
338    }
339
340    fn partition_count(&self) -> Option<Precision<usize>> {
341        Some(Precision::exact(1usize))
342    }
343
344    fn partitions(self: Box<Self>) -> PartitionStream {
345        stream::iter([Ok(self as _)]).boxed()
346    }
347}
348
349impl Partition for Empty {
350    fn as_any(&self) -> &dyn Any {
351        self
352    }
353
354    fn row_count(&self) -> Option<Precision<u64>> {
355        Some(Precision::exact(self.row_count))
356    }
357
358    fn byte_size(&self) -> Option<Precision<u64>> {
359        Some(Precision::exact(0u64))
360    }
361
362    fn execute(mut self: Box<Self>) -> VortexResult<SendableArrayStream> {
363        let scalar = Scalar::default_value(&self.dtype);
364        let dtype = self.dtype.clone();
365
366        // Create an iterator of arrays with the correct row count, respecting u64::MAX limits.
367        let iter = std::iter::from_fn(move || {
368            if self.row_count == 0 {
369                return None;
370            }
371            let chunk_size = usize::try_from(self.row_count).unwrap_or(usize::MAX);
372            self.row_count -= chunk_size as u64;
373            Some(VortexResult::Ok(
374                ConstantArray::new(scalar.clone(), chunk_size).into_array(),
375            ))
376        });
377
378        Ok(ArrayStreamExt::boxed(ArrayStreamAdapter::new(
379            dtype,
380            stream::iter(iter),
381        )))
382    }
383}