vortex_layout/scan/
layout.rs1use std::any::Any;
5use std::ops::Range;
6use std::pin::Pin;
7use std::sync::Arc;
8use std::task::Context;
9use std::task::Poll;
10
11use async_trait::async_trait;
12use futures::FutureExt;
13use futures::Stream;
14use futures::stream;
15use futures::stream::StreamExt;
16use vortex_array::IntoArray;
17use vortex_array::arrays::ConstantArray;
18use vortex_array::dtype::DType;
19use vortex_array::dtype::FieldPath;
20use vortex_array::dtype::Nullability;
21use vortex_array::expr::Expression;
22use vortex_array::expr::stats::Precision;
23use vortex_array::scalar::Scalar;
24use vortex_array::stats::StatsSet;
25use vortex_array::stream::ArrayStreamAdapter;
26use vortex_array::stream::ArrayStreamExt;
27use vortex_array::stream::SendableArrayStream;
28use vortex_error::VortexResult;
29use vortex_error::vortex_bail;
30use vortex_mask::Mask;
31use vortex_metrics::MetricsRegistry;
32use vortex_scan::DataSource;
33use vortex_scan::DataSourceScan;
34use vortex_scan::DataSourceScanRef;
35use vortex_scan::Partition;
36use vortex_scan::PartitionRef;
37use vortex_scan::PartitionStream;
38use vortex_scan::ScanRequest;
39use vortex_scan::selection::Selection;
40use vortex_session::VortexSession;
41
42use crate::LayoutReaderRef;
43use crate::scan::scan_builder::ScanBuilder;
44
45pub struct LayoutReaderDataSource {
47 reader: LayoutReaderRef,
48 session: VortexSession,
49 split_max_row_count: u64,
50 metrics_registry: Option<Arc<dyn MetricsRegistry>>,
51}
52
53impl LayoutReaderDataSource {
54 pub fn new(reader: LayoutReaderRef, session: VortexSession) -> Self {
61 Self {
62 reader,
63 session,
64 split_max_row_count: u64::MAX,
65 metrics_registry: None,
66 }
67 }
68
69 pub fn with_split_max_row_count(mut self, row_count: u64) -> Self {
75 self.split_max_row_count = row_count;
76 self
77 }
78
79 pub fn with_metrics_registry(mut self, metrics: Arc<dyn MetricsRegistry>) -> Self {
81 self.metrics_registry = Some(metrics);
82 self
83 }
84
85 pub fn with_some_metrics_registry(mut self, metrics: Option<Arc<dyn MetricsRegistry>>) -> Self {
87 self.metrics_registry = metrics;
88 self
89 }
90}
91
92#[async_trait]
93impl DataSource for LayoutReaderDataSource {
94 fn dtype(&self) -> &DType {
95 self.reader.dtype()
96 }
97
98 fn row_count(&self) -> Option<Precision<u64>> {
99 Some(Precision::exact(self.reader.row_count()))
100 }
101
102 fn byte_size(&self) -> Option<Precision<u64>> {
103 None
104 }
105
106 fn deserialize_partition(
107 &self,
108 _data: &[u8],
109 _session: &VortexSession,
110 ) -> VortexResult<PartitionRef> {
111 vortex_bail!("LayoutReader splits are not yet serializable");
112 }
113
114 async fn scan(&self, scan_request: ScanRequest) -> VortexResult<DataSourceScanRef> {
115 let total_rows = self.reader.row_count();
116 let row_range = scan_request.row_range.unwrap_or(0..total_rows);
117
118 let dtype = scan_request.projection.return_dtype(self.reader.dtype())?;
119
120 if let DType::Struct(fields, Nullability::NonNullable) = &dtype
123 && fields.nfields() == 0
124 && scan_request.filter.is_none()
125 {
126 let row_count = row_range.end - row_range.start;
128 let row_count = scan_request.selection.row_count(row_count);
129
130 let row_count = if let Some(limit) = scan_request.limit {
132 row_count.min(limit)
133 } else {
134 row_count
135 };
136
137 return Ok(Box::new(Empty { dtype, row_count }));
138 }
139
140 if let Some(filter) = &scan_request.filter {
143 let mask = Mask::new_true(
144 usize::try_from(row_range.end - row_range.start).unwrap_or(usize::MAX),
145 );
146 let pruning_result = self
147 .reader
148 .pruning_evaluation(&row_range, filter, mask)?
149 .now_or_never();
150 if let Some(Ok(result_mask)) = pruning_result
151 && result_mask.all_false()
152 {
153 return Ok(Box::new(Empty {
154 dtype,
155 row_count: 0,
156 }));
157 }
158 }
159
160 Ok(Box::new(LayoutReaderScan {
161 reader: Arc::clone(&self.reader),
162 session: self.session.clone(),
163 dtype,
164 projection: scan_request.projection,
165 filter: scan_request.filter,
166 limit: scan_request.limit,
167 selection: scan_request.selection,
168 ordered: scan_request.ordered,
169 metrics_registry: self.metrics_registry.clone(),
170 next_row: row_range.start,
171 end_row: row_range.end,
172 split_size: self.split_max_row_count,
173 }))
174 }
175
176 async fn field_statistics(&self, _field_path: &FieldPath) -> VortexResult<StatsSet> {
177 Ok(StatsSet::default())
178 }
179}
180
181struct LayoutReaderScan {
182 reader: LayoutReaderRef,
183 session: VortexSession,
184 dtype: DType,
185 projection: Expression,
186 filter: Option<Expression>,
187 limit: Option<u64>,
188 ordered: bool,
189 selection: Selection,
190 metrics_registry: Option<Arc<dyn MetricsRegistry>>,
191 next_row: u64,
192 end_row: u64,
193 split_size: u64,
194}
195
196impl DataSourceScan for LayoutReaderScan {
197 fn dtype(&self) -> &DType {
198 &self.dtype
199 }
200
201 fn partition_count(&self) -> Option<Precision<usize>> {
202 let (lower, upper) = self.size_hint();
203 match upper {
204 Some(u) if u == lower => Some(Precision::exact(lower)),
205 Some(u) => Some(Precision::inexact(u)),
206 None => Some(Precision::inexact(lower)),
207 }
208 }
209
210 fn partitions(self: Box<Self>) -> PartitionStream {
211 (*self).boxed()
212 }
213}
214
215impl Stream for LayoutReaderScan {
216 type Item = VortexResult<PartitionRef>;
217
218 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
219 let this = self.get_mut();
220
221 if this.next_row >= this.end_row {
222 return Poll::Ready(None);
223 }
224
225 if this.limit.is_some_and(|limit| limit == 0) {
226 return Poll::Ready(None);
227 }
228
229 let split_end = this
230 .next_row
231 .saturating_add(this.split_size)
232 .min(this.end_row);
233 let row_range = this.next_row..split_end;
234 let split_rows = split_end - this.next_row;
235
236 let split_limit = this.limit;
237 if this.filter.is_none()
243 && let Some(ref mut limit) = this.limit
244 {
245 *limit = limit.saturating_sub(split_rows);
246 }
247
248 let split = Box::new(LayoutReaderSplit {
249 reader: Arc::clone(&this.reader),
250 session: this.session.clone(),
251 projection: this.projection.clone(),
252 filter: this.filter.clone(),
253 limit: split_limit,
254 ordered: this.ordered,
255 row_range,
256 selection: this.selection.clone(),
257 metrics_registry: this.metrics_registry.clone(),
258 }) as PartitionRef;
259
260 this.next_row = split_end;
261
262 Poll::Ready(Some(Ok(split)))
263 }
264
265 fn size_hint(&self) -> (usize, Option<usize>) {
266 if self.next_row >= self.end_row {
267 return (0, Some(0));
268 }
269 let remaining_rows = self.end_row - self.next_row;
270 let splits = remaining_rows.div_ceil(self.split_size);
271 (0, Some(usize::try_from(splits).unwrap_or(usize::MAX)))
272 }
273}
274
275struct LayoutReaderSplit {
276 reader: LayoutReaderRef,
277 session: VortexSession,
278 projection: Expression,
279 filter: Option<Expression>,
280 limit: Option<u64>,
281 ordered: bool,
282 row_range: Range<u64>,
283 selection: Selection,
284 metrics_registry: Option<Arc<dyn MetricsRegistry>>,
285}
286
287impl Partition for LayoutReaderSplit {
288 fn as_any(&self) -> &dyn Any {
289 self
290 }
291
292 #[expect(clippy::cast_possible_truncation)]
293 fn index(&self) -> usize {
294 self.row_range.start as usize
296 }
297
298 fn row_count(&self) -> Option<Precision<u64>> {
299 let row_count = self.row_range.end - self.row_range.start;
300 let row_count = self.selection.row_count(row_count);
301 let row_count = self.limit.map_or(row_count, |limit| row_count.min(limit));
302
303 Some(if self.filter.is_some() {
304 Precision::inexact(row_count)
305 } else {
306 Precision::exact(row_count)
307 })
308 }
309
310 fn byte_size(&self) -> Option<Precision<u64>> {
311 None
312 }
313
314 fn execute(self: Box<Self>) -> VortexResult<SendableArrayStream> {
315 let builder = ScanBuilder::new(self.session, self.reader)
316 .with_row_range(self.row_range)
317 .with_selection(self.selection)
318 .with_projection(self.projection)
319 .with_some_filter(self.filter)
320 .with_some_limit(self.limit)
321 .with_some_metrics_registry(self.metrics_registry)
322 .with_ordered(self.ordered);
323
324 let dtype = builder.dtype()?;
325 let stream = builder.into_stream()?;
328
329 Ok(ArrayStreamExt::boxed(ArrayStreamAdapter::new(
330 dtype, stream,
331 )))
332 }
333}
334
335struct Empty {
337 dtype: DType,
338 row_count: u64,
339}
340
341impl DataSourceScan for Empty {
342 fn dtype(&self) -> &DType {
343 &self.dtype
344 }
345
346 fn partition_count(&self) -> Option<Precision<usize>> {
347 Some(Precision::exact(1usize))
348 }
349
350 fn partitions(self: Box<Self>) -> PartitionStream {
351 stream::iter([Ok(self as _)]).boxed()
352 }
353}
354
355impl Partition for Empty {
356 fn as_any(&self) -> &dyn Any {
357 self
358 }
359
360 fn index(&self) -> usize {
361 0
362 }
363
364 fn row_count(&self) -> Option<Precision<u64>> {
365 Some(Precision::exact(self.row_count))
366 }
367
368 fn byte_size(&self) -> Option<Precision<u64>> {
369 Some(Precision::exact(0u64))
370 }
371
372 fn execute(mut self: Box<Self>) -> VortexResult<SendableArrayStream> {
373 let scalar = Scalar::default_value(&self.dtype);
374 let dtype = self.dtype.clone();
375
376 let iter = std::iter::from_fn(move || {
378 if self.row_count == 0 {
379 return None;
380 }
381 let chunk_size = usize::try_from(self.row_count).unwrap_or(usize::MAX);
382 self.row_count -= chunk_size as u64;
383 Some(VortexResult::Ok(
384 ConstantArray::new(scalar.clone(), chunk_size).into_array(),
385 ))
386 });
387
388 Ok(ArrayStreamExt::boxed(ArrayStreamAdapter::new(
389 dtype,
390 stream::iter(iter),
391 )))
392 }
393}