1use std::any::Any;
5use std::ops::Range;
6use std::pin::Pin;
7use std::sync::Arc;
8use std::task::Context;
9use std::task::Poll;
10
11use async_trait::async_trait;
12use futures::FutureExt;
13use futures::Stream;
14use futures::stream;
15use futures::stream::StreamExt;
16use vortex_array::IntoArray;
17use vortex_array::arrays::ConstantArray;
18use vortex_array::dtype::DType;
19use vortex_array::dtype::FieldPath;
20use vortex_array::dtype::Nullability;
21use vortex_array::expr::Expression;
22use vortex_array::expr::stats::Precision;
23use vortex_array::scalar::Scalar;
24use vortex_array::stats::StatsSet;
25use vortex_array::stream::ArrayStreamAdapter;
26use vortex_array::stream::ArrayStreamExt;
27use vortex_array::stream::SendableArrayStream;
28use vortex_error::VortexResult;
29use vortex_error::vortex_bail;
30use vortex_layout::LayoutReaderRef;
31use vortex_mask::Mask;
32use vortex_metrics::MetricsRegistry;
33use vortex_session::VortexSession;
34
35use crate::ScanBuilder;
36use crate::Selection;
37use crate::api::DataSource;
38use crate::api::DataSourceScan;
39use crate::api::DataSourceScanRef;
40use crate::api::Partition;
41use crate::api::PartitionRef;
42use crate::api::PartitionStream;
43use crate::api::ScanRequest;
44
/// A [`DataSource`] backed by a single layout reader.
///
/// Scans are split into row-range partitions of at most `split_max_row_count` rows, each of
/// which is executed through a [`ScanBuilder`].
pub struct LayoutReaderDataSource {
    /// Layout reader that provides the dtype and row count; cloned into every scan and split.
    reader: LayoutReaderRef,
    /// Session cloned into every scan and handed to each split's [`ScanBuilder`].
    session: VortexSession,
    /// Maximum number of rows per split. `u64::MAX` (the value set by `new`) results in a single
    /// split covering the whole scanned range.
    split_max_row_count: u64,
    /// Optional metrics registry forwarded to each split's [`ScanBuilder`].
    metrics_registry: Option<Arc<dyn MetricsRegistry>>,
}
52
53impl LayoutReaderDataSource {
54 pub fn new(reader: LayoutReaderRef, session: VortexSession) -> Self {
61 Self {
62 reader,
63 session,
64 split_max_row_count: u64::MAX,
65 metrics_registry: None,
66 }
67 }
68
69 pub fn with_split_max_row_count(mut self, row_count: u64) -> Self {
75 self.split_max_row_count = row_count;
76 self
77 }
78
79 pub fn with_metrics_registry(mut self, metrics: Arc<dyn MetricsRegistry>) -> Self {
81 self.metrics_registry = Some(metrics);
82 self
83 }
84
85 pub fn with_some_metrics_registry(mut self, metrics: Option<Arc<dyn MetricsRegistry>>) -> Self {
87 self.metrics_registry = metrics;
88 self
89 }
90}
91
92#[async_trait]
93impl DataSource for LayoutReaderDataSource {
94 fn dtype(&self) -> &DType {
95 self.reader.dtype()
96 }
97
98 fn row_count(&self) -> Option<Precision<u64>> {
99 Some(Precision::exact(self.reader.row_count()))
100 }
101
102 fn byte_size(&self) -> Option<Precision<u64>> {
103 None
104 }
105
106 fn deserialize_partition(
107 &self,
108 _data: &[u8],
109 _session: &VortexSession,
110 ) -> VortexResult<PartitionRef> {
111 vortex_bail!("LayoutReader splits are not yet serializable");
112 }
113
114 async fn scan(&self, scan_request: ScanRequest) -> VortexResult<DataSourceScanRef> {
115 let total_rows = self.reader.row_count();
116 let row_range = scan_request.row_range.unwrap_or(0..total_rows);
117
118 let dtype = scan_request.projection.return_dtype(self.reader.dtype())?;
119
120 if let DType::Struct(fields, Nullability::NonNullable) = &dtype
123 && fields.nfields() == 0
124 && scan_request.filter.is_none()
125 {
126 let row_count = row_range.end - row_range.start;
128 let row_count = scan_request.selection.row_count(row_count);
129
130 let row_count = if let Some(limit) = scan_request.limit {
132 row_count.min(limit)
133 } else {
134 row_count
135 };
136
137 return Ok(Box::new(Empty { dtype, row_count }));
138 }
139
140 if let Some(ref filter) = scan_request.filter {
143 let mask = Mask::new_true(
144 usize::try_from(row_range.end - row_range.start).unwrap_or(usize::MAX),
145 );
146 let pruning_result = self
147 .reader
148 .pruning_evaluation(&row_range, filter, mask)?
149 .now_or_never();
150 if let Some(Ok(result_mask)) = pruning_result
151 && result_mask.all_false()
152 {
153 return Ok(Box::new(Empty {
154 dtype,
155 row_count: 0,
156 }));
157 }
158 }
159
160 Ok(Box::new(LayoutReaderScan {
161 reader: self.reader.clone(),
162 session: self.session.clone(),
163 dtype,
164 projection: scan_request.projection,
165 filter: scan_request.filter,
166 limit: scan_request.limit,
167 selection: scan_request.selection,
168 ordered: scan_request.ordered,
169 metrics_registry: self.metrics_registry.clone(),
170 next_row: row_range.start,
171 end_row: row_range.end,
172 split_size: self.split_max_row_count,
173 }))
174 }
175
176 async fn field_statistics(&self, _field_path: &FieldPath) -> VortexResult<StatsSet> {
177 Ok(StatsSet::default())
178 }
179}
180
/// A planned scan over `next_row..end_row` that lazily emits [`LayoutReaderSplit`] partitions.
///
/// Implements [`Stream`]: each poll yields the next split of at most `split_size` rows.
struct LayoutReaderScan {
    /// Layout reader cloned into each emitted split.
    reader: LayoutReaderRef,
    /// Session cloned into each emitted split.
    session: VortexSession,
    /// Output dtype of the projection, reported by [`DataSourceScan::dtype`].
    dtype: DType,
    /// Projection expression cloned into each emitted split.
    projection: Expression,
    /// Optional filter expression cloned into each emitted split.
    filter: Option<Expression>,
    /// Remaining row limit. Each split receives the value held when it is emitted.
    /// `poll_next` may decrement it as splits are emitted, and stops emitting once it is zero.
    limit: Option<u64>,
    /// Passed to each split's `ScanBuilder::with_ordered`.
    ordered: bool,
    /// Row selection cloned into each split and passed to `ScanBuilder::with_selection`.
    selection: Selection,
    /// Optional metrics registry forwarded to each split.
    metrics_registry: Option<Arc<dyn MetricsRegistry>>,
    /// First row of the next split to emit; advanced past each emitted split.
    next_row: u64,
    /// Exclusive end of the scanned row range.
    end_row: u64,
    /// Maximum number of rows per emitted split.
    split_size: u64,
}
195
196impl DataSourceScan for LayoutReaderScan {
197 fn dtype(&self) -> &DType {
198 &self.dtype
199 }
200
201 fn partition_count(&self) -> Option<Precision<usize>> {
202 let (lower, upper) = self.size_hint();
203 match upper {
204 Some(u) if u == lower => Some(Precision::exact(lower)),
205 Some(u) => Some(Precision::inexact(u)),
206 None => Some(Precision::inexact(lower)),
207 }
208 }
209
210 fn partitions(self: Box<Self>) -> PartitionStream {
211 (*self).boxed()
212 }
213}
214
215impl Stream for LayoutReaderScan {
216 type Item = VortexResult<PartitionRef>;
217
218 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
219 let this = self.get_mut();
220
221 if this.next_row >= this.end_row {
222 return Poll::Ready(None);
223 }
224
225 if this.limit.is_some_and(|limit| limit == 0) {
226 return Poll::Ready(None);
227 }
228
229 let split_end = this
230 .next_row
231 .saturating_add(this.split_size)
232 .min(this.end_row);
233 let row_range = this.next_row..split_end;
234 let split_rows = split_end - this.next_row;
235
236 let split_limit = this.limit;
237 if this.filter.is_none()
243 && let Some(ref mut limit) = this.limit
244 {
245 *limit = limit.saturating_sub(split_rows);
246 }
247
248 let split = Box::new(LayoutReaderSplit {
249 reader: this.reader.clone(),
250 session: this.session.clone(),
251 projection: this.projection.clone(),
252 filter: this.filter.clone(),
253 limit: split_limit,
254 ordered: this.ordered,
255 row_range,
256 selection: this.selection.clone(),
257 metrics_registry: this.metrics_registry.clone(),
258 }) as PartitionRef;
259
260 this.next_row = split_end;
261
262 Poll::Ready(Some(Ok(split)))
263 }
264
265 fn size_hint(&self) -> (usize, Option<usize>) {
266 if self.next_row >= self.end_row {
267 return (0, Some(0));
268 }
269 let remaining_rows = self.end_row - self.next_row;
270 let splits = remaining_rows.div_ceil(self.split_size);
271 (0, Some(usize::try_from(splits).unwrap_or(usize::MAX)))
272 }
273}
274
/// A single partition of a [`LayoutReaderScan`], covering `row_range`.
///
/// Executing it builds a [`ScanBuilder`] from the captured scan parameters.
struct LayoutReaderSplit {
    /// Layout reader the split's scan reads from.
    reader: LayoutReaderRef,
    /// Session passed to `ScanBuilder::new`.
    session: VortexSession,
    /// Projection expression passed to `ScanBuilder::with_projection`.
    projection: Expression,
    /// Optional filter passed to `ScanBuilder::with_some_filter`. When present, the reported
    /// row count is only an estimate.
    filter: Option<Expression>,
    /// Optional row limit passed to `ScanBuilder::with_some_limit`; also caps the reported row count.
    limit: Option<u64>,
    /// Passed to `ScanBuilder::with_ordered`.
    ordered: bool,
    /// Rows covered by this split, passed to `ScanBuilder::with_row_range`.
    row_range: Range<u64>,
    /// Row selection passed to `ScanBuilder::with_selection`.
    selection: Selection,
    /// Optional metrics registry passed to `ScanBuilder::with_some_metrics_registry`.
    metrics_registry: Option<Arc<dyn MetricsRegistry>>,
}
286
287impl Partition for LayoutReaderSplit {
288 fn as_any(&self) -> &dyn Any {
289 self
290 }
291
292 fn row_count(&self) -> Option<Precision<u64>> {
293 let row_count = self.row_range.end - self.row_range.start;
294 let row_count = self.selection.row_count(row_count);
295 let row_count = self.limit.map_or(row_count, |limit| row_count.min(limit));
296
297 Some(if self.filter.is_some() {
298 Precision::inexact(row_count)
299 } else {
300 Precision::exact(row_count)
301 })
302 }
303
304 fn byte_size(&self) -> Option<Precision<u64>> {
305 None
306 }
307
308 fn execute(self: Box<Self>) -> VortexResult<SendableArrayStream> {
309 let builder = ScanBuilder::new(self.session, self.reader)
310 .with_row_range(self.row_range)
311 .with_selection(self.selection)
312 .with_projection(self.projection)
313 .with_some_filter(self.filter)
314 .with_some_limit(self.limit)
315 .with_some_metrics_registry(self.metrics_registry)
316 .with_ordered(self.ordered);
317
318 let dtype = builder.dtype()?;
319 let stream = builder.into_stream()?;
322
323 Ok(ArrayStreamExt::boxed(ArrayStreamAdapter::new(
324 dtype, stream,
325 )))
326 }
327}
328
/// A scan, and its single partition, that produces `row_count` rows without reading any data.
///
/// Rows are emitted as [`ConstantArray`]s of the dtype's default value. `scan` uses it in two
/// cases: when the projection is an empty struct (only the row count matters), and when pruning
/// rules out every row (`row_count` is then zero).
struct Empty {
    /// Dtype of the produced arrays.
    dtype: DType,
    /// Number of rows this partition produces.
    row_count: u64,
}
334
335impl DataSourceScan for Empty {
336 fn dtype(&self) -> &DType {
337 &self.dtype
338 }
339
340 fn partition_count(&self) -> Option<Precision<usize>> {
341 Some(Precision::exact(1usize))
342 }
343
344 fn partitions(self: Box<Self>) -> PartitionStream {
345 stream::iter([Ok(self as _)]).boxed()
346 }
347}
348
349impl Partition for Empty {
350 fn as_any(&self) -> &dyn Any {
351 self
352 }
353
354 fn row_count(&self) -> Option<Precision<u64>> {
355 Some(Precision::exact(self.row_count))
356 }
357
358 fn byte_size(&self) -> Option<Precision<u64>> {
359 Some(Precision::exact(0u64))
360 }
361
362 fn execute(mut self: Box<Self>) -> VortexResult<SendableArrayStream> {
363 let scalar = Scalar::default_value(&self.dtype);
364 let dtype = self.dtype.clone();
365
366 let iter = std::iter::from_fn(move || {
368 if self.row_count == 0 {
369 return None;
370 }
371 let chunk_size = usize::try_from(self.row_count).unwrap_or(usize::MAX);
372 self.row_count -= chunk_size as u64;
373 Some(VortexResult::Ok(
374 ConstantArray::new(scalar.clone(), chunk_size).into_array(),
375 ))
376 });
377
378 Ok(ArrayStreamExt::boxed(ArrayStreamAdapter::new(
379 dtype,
380 stream::iter(iter),
381 )))
382 }
383}