1use std::any::Any;
28use std::collections::VecDeque;
29use std::sync::Arc;
30
31use async_trait::async_trait;
32use futures::FutureExt;
33use futures::StreamExt;
34use futures::stream;
35use tracing::Instrument;
36use vortex_array::dtype::DType;
37use vortex_array::dtype::FieldPath;
38use vortex_array::expr::stats::Precision;
39use vortex_array::stats::StatsSet;
40use vortex_array::stream::ArrayStreamAdapter;
41use vortex_array::stream::ArrayStreamExt;
42use vortex_array::stream::SendableArrayStream;
43use vortex_error::VortexResult;
44use vortex_error::vortex_bail;
45use vortex_io::session::RuntimeSessionExt;
46use vortex_layout::LayoutReaderRef;
47use vortex_mask::Mask;
48use vortex_session::VortexSession;
49
50use crate::ScanBuilder;
51use crate::api::DataSource;
52use crate::api::DataSourceScan;
53use crate::api::DataSourceScanRef;
54use crate::api::Partition;
55use crate::api::PartitionRef;
56use crate::api::PartitionStream;
57use crate::api::ScanRequest;
58
/// Fallback concurrency for opening deferred layouts, used when
/// [`std::thread::available_parallelism`] cannot report the number of available threads.
const DEFAULT_CONCURRENCY: usize = 8;
61
/// Lazily opens one child layout of a [`MultiLayoutDataSource`].
///
/// `open` is not called when the data source is built; it is spawned on the session's runtime
/// handle while a scan's partition stream is being consumed.
///
/// Returning `Ok(None)` means the child contributes nothing: it is skipped and yields no
/// partitions. Returning `Err` forwards the error as an item of the partition stream.
#[async_trait]
pub trait LayoutReaderFactory: 'static + Send + Sync {
    /// Opens the layout reader for this child, or returns `None` to skip the child.
    async fn open(&self) -> VortexResult<Option<LayoutReaderRef>>;
}
72
/// A [`DataSource`] made of several child layouts that are scanned one after another.
///
/// Each child is either an already-opened reader or a [`LayoutReaderFactory`] that opens its
/// reader lazily at scan time; deferred children are opened concurrently, bounded by
/// `concurrency`.
pub struct MultiLayoutDataSource {
    /// Logical type reported for the whole source. Presumably every child has this dtype —
    /// NOTE(review): this is not validated here.
    dtype: DType,
    /// Session handed to every partition scan and used to obtain the runtime handle.
    session: VortexSession,
    /// Child layouts. During a scan, opened children are emitted first, followed by the
    /// deferred ones.
    children: Vec<MultiLayoutChild>,
    /// Maximum number of deferred factories opened at the same time.
    concurrency: usize,
}
86
/// One child of a [`MultiLayoutDataSource`].
enum MultiLayoutChild {
    /// A reader that is already open, so its row count is known up front.
    Opened(LayoutReaderRef),
    /// A factory whose reader is opened only when partitions are requested.
    Deferred(Arc<dyn LayoutReaderFactory>),
}
91
92impl MultiLayoutDataSource {
93 pub fn new_with_first(
98 first: LayoutReaderRef,
99 remaining: Vec<Arc<dyn LayoutReaderFactory>>,
100 session: &VortexSession,
101 ) -> Self {
102 let dtype = first.dtype().clone();
103 let concurrency = std::thread::available_parallelism()
104 .map(|v| v.get())
105 .unwrap_or(DEFAULT_CONCURRENCY);
106
107 let mut children = Vec::with_capacity(1 + remaining.len());
108 children.push(MultiLayoutChild::Opened(first));
109 children.extend(remaining.into_iter().map(MultiLayoutChild::Deferred));
110
111 Self {
112 dtype,
113 session: session.clone(),
114 children,
115 concurrency,
116 }
117 }
118
119 pub fn new_deferred(
125 dtype: DType,
126 factories: Vec<Arc<dyn LayoutReaderFactory>>,
127 session: &VortexSession,
128 ) -> Self {
129 let concurrency = std::thread::available_parallelism()
130 .map(|v| v.get())
131 .unwrap_or(DEFAULT_CONCURRENCY);
132
133 Self {
134 dtype,
135 session: session.clone(),
136 children: factories
137 .into_iter()
138 .map(MultiLayoutChild::Deferred)
139 .collect(),
140 concurrency,
141 }
142 }
143
144 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
149 self.concurrency = concurrency;
150 self
151 }
152}
153
154#[async_trait]
155impl DataSource for MultiLayoutDataSource {
156 fn dtype(&self) -> &DType {
157 &self.dtype
158 }
159
160 fn row_count(&self) -> Option<Precision<u64>> {
161 let mut sum: u64 = 0;
162 let mut opened_count: u64 = 0;
163 let mut deferred_count: u64 = 0;
164
165 for child in &self.children {
166 match child {
167 MultiLayoutChild::Opened(reader) => {
168 opened_count += 1;
169 sum = sum.saturating_add(reader.row_count());
170 }
171 MultiLayoutChild::Deferred(_) => {
172 deferred_count += 1;
173 }
174 }
175 }
176
177 let total_count = opened_count + deferred_count;
178 if total_count == 0 {
179 return Some(Precision::exact(0u64));
180 }
181
182 if deferred_count == 0 {
183 Some(Precision::exact(sum))
184 } else if opened_count > 0 {
185 let avg = sum / opened_count;
186 let extrapolated = avg.saturating_mul(total_count);
187 Some(Precision::inexact(extrapolated))
188 } else {
189 None
190 }
191 }
192
193 fn deserialize_partition(
194 &self,
195 _data: &[u8],
196 _session: &VortexSession,
197 ) -> VortexResult<PartitionRef> {
198 vortex_bail!("MultiLayoutDataSource partitions are not yet serializable")
199 }
200
201 async fn scan(&self, scan_request: ScanRequest) -> VortexResult<DataSourceScanRef> {
202 let mut ready = VecDeque::new();
203 let mut deferred = VecDeque::new();
204
205 for child in &self.children {
206 match child {
207 MultiLayoutChild::Opened(reader) => ready.push_back(reader.clone()),
208 MultiLayoutChild::Deferred(factory) => deferred.push_back(factory.clone()),
209 }
210 }
211
212 let dtype = scan_request.projection.return_dtype(&self.dtype)?;
213
214 Ok(Box::new(MultiLayoutScan {
215 session: self.session.clone(),
216 dtype,
217 request: scan_request,
218 ready,
219 deferred,
220 handle: self.session.handle(),
221 concurrency: self.concurrency,
222 }))
223 }
224
225 async fn field_statistics(&self, _field_path: &FieldPath) -> VortexResult<StatsSet> {
226 Ok(StatsSet::default())
227 }
228}
229
/// A single scan over a [`MultiLayoutDataSource`], produced by its `scan` method.
struct MultiLayoutScan {
    /// Session forwarded to every partition.
    session: VortexSession,
    /// Output dtype after applying the request's projection.
    dtype: DType,
    /// The scan request; a clone is handed to each partition.
    request: ScanRequest,
    /// Children that were already open when the scan was created.
    ready: VecDeque<LayoutReaderRef>,
    /// Factories still to be opened when partitions are streamed.
    deferred: VecDeque<Arc<dyn LayoutReaderFactory>>,
    /// Runtime handle on which deferred factories are opened.
    handle: vortex_io::runtime::Handle,
    /// Maximum number of deferred factories opened concurrently.
    concurrency: usize,
}
239
240impl DataSourceScan for MultiLayoutScan {
241 fn dtype(&self) -> &DType {
242 &self.dtype
243 }
244
245 fn partition_count(&self) -> Option<Precision<usize>> {
246 let count = self.ready.len() + self.deferred.len();
247 if self.deferred.is_empty() {
248 Some(Precision::exact(count))
249 } else {
250 Some(Precision::inexact(count))
251 }
252 }
253
254 fn partitions(self: Box<Self>) -> PartitionStream {
255 let Self {
256 session,
257 dtype: _,
258 request,
259 ready,
260 deferred,
261 handle,
262 concurrency,
263 } = *self;
264
265 let ordered = request.ordered;
266
267 let ready_stream = stream::iter(ready).map(Ok);
269
270 let spawned = stream::iter(deferred).map(move |factory| {
274 handle.spawn(async move {
275 factory
276 .open()
277 .instrument(tracing::info_span!("LayoutReaderFactory::open"))
278 .await
279 })
280 });
281
282 let deferred_stream = if ordered {
283 spawned
284 .buffered(concurrency)
285 .filter_map(|result| async move {
286 match result {
287 Ok(Some(reader)) => Some(Ok(reader)),
288 Ok(None) => None,
289 Err(e) => Some(Err(e)),
290 }
291 })
292 .boxed()
293 } else {
294 spawned
295 .buffer_unordered(concurrency)
296 .filter_map(|result| async move {
297 match result {
298 Ok(Some(reader)) => Some(Ok(reader)),
299 Ok(None) => None,
300 Err(e) => Some(Err(e)),
301 }
302 })
303 .boxed()
304 };
305
306 ready_stream
310 .chain(deferred_stream)
311 .flat_map(move |reader_result| match reader_result {
312 Ok(reader) => reader_partition(reader, session.clone(), request.clone()),
313 Err(e) => stream::once(async move { Err(e) }).boxed(),
314 })
315 .boxed()
316 }
317}
318
319fn reader_partition(
325 reader: LayoutReaderRef,
326 session: VortexSession,
327 request: ScanRequest,
328) -> PartitionStream {
329 let row_count = reader.row_count();
330 let row_range = request.row_range.clone().unwrap_or(0..row_count);
331
332 if let Some(ref filter) = request.filter {
335 let mask_len = usize::try_from(row_range.end - row_range.start).unwrap_or(usize::MAX);
336 let mask = Mask::new_true(mask_len);
337 if let Ok(pruning_future) = reader.pruning_evaluation(&row_range, filter, mask)
338 && let Some(Ok(result_mask)) = pruning_future.now_or_never()
339 && result_mask.all_false()
340 {
341 return stream::empty().boxed();
342 }
343 }
344
345 stream::once(async move {
346 Ok(Box::new(MultiLayoutPartition {
347 reader,
348 session,
349 request: ScanRequest {
350 row_range: Some(row_range),
351 ..request
352 },
353 }) as PartitionRef)
354 })
355 .boxed()
356}
357
/// A partition that scans a single child layout of a [`MultiLayoutDataSource`].
struct MultiLayoutPartition {
    /// Reader for the child layout this partition scans.
    reader: LayoutReaderRef,
    /// Session used to build the [`ScanBuilder`] when the partition is executed.
    session: VortexSession,
    /// Request for this partition; `reader_partition` always sets `row_range` to `Some`.
    request: ScanRequest,
}
367
368impl Partition for MultiLayoutPartition {
369 fn as_any(&self) -> &dyn Any {
370 self
371 }
372
373 fn row_count(&self) -> Option<Precision<u64>> {
374 let row_range = self.request.row_range.as_ref()?;
375 let row_count = row_range.end - row_range.start;
376 let row_count = self.request.selection.row_count(row_count);
377 let row_count = self
378 .request
379 .limit
380 .map_or(row_count, |limit| row_count.min(limit));
381
382 Some(if self.request.filter.is_some() {
383 Precision::inexact(row_count)
384 } else {
385 Precision::exact(row_count)
386 })
387 }
388
389 fn byte_size(&self) -> Option<Precision<u64>> {
390 None
391 }
392
393 fn execute(self: Box<Self>) -> VortexResult<SendableArrayStream> {
394 let request = self.request;
395 let mut builder = ScanBuilder::new(self.session, self.reader)
396 .with_selection(request.selection)
397 .with_projection(request.projection)
398 .with_some_filter(request.filter)
399 .with_some_limit(request.limit)
400 .with_ordered(request.ordered);
401
402 if let Some(row_range) = request.row_range {
403 builder = builder.with_row_range(row_range);
404 }
405
406 let dtype = builder.dtype()?;
407 let stream = builder.into_stream()?;
408
409 Ok(ArrayStreamExt::boxed(ArrayStreamAdapter::new(
410 dtype, stream,
411 )))
412 }
413}