Skip to main content

vortex_scan/
layout.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::collections::VecDeque;
6
7use async_trait::async_trait;
8use futures::StreamExt;
9use futures::future::BoxFuture;
10use futures::stream;
11use vortex_array::ArrayRef;
12use vortex_array::stream::ArrayStreamAdapter;
13use vortex_array::stream::SendableArrayStream;
14use vortex_dtype::DType;
15use vortex_error::VortexExpect;
16use vortex_error::VortexResult;
17use vortex_error::vortex_bail;
18use vortex_layout::LayoutReaderRef;
19use vortex_session::VortexSession;
20
21use crate::ScanBuilder;
22use crate::api::DataSource;
23use crate::api::DataSourceScan;
24use crate::api::DataSourceScanRef;
25use crate::api::Estimate;
26use crate::api::ScanRequest;
27use crate::api::Split;
28use crate::api::SplitRef;
29
30/// An implementation of a [`DataSource`] that reads data from a [`LayoutReaderRef`].
31pub struct LayoutReaderDataSource {
32    reader: LayoutReaderRef,
33    session: VortexSession,
34}
35
36impl LayoutReaderDataSource {
37    /// Creates a new [`LayoutReaderDataSource`].
38    pub fn new(reader: LayoutReaderRef, session: VortexSession) -> Self {
39        Self { reader, session }
40    }
41}
42
43#[async_trait]
44impl DataSource for LayoutReaderDataSource {
45    fn dtype(&self) -> &DType {
46        self.reader.dtype()
47    }
48
49    fn row_count_estimate(&self) -> Estimate<u64> {
50        Estimate::Exact(self.reader.row_count())
51    }
52
53    async fn scan(&self, scan_request: ScanRequest) -> VortexResult<DataSourceScanRef> {
54        let mut builder = ScanBuilder::new(self.session.clone(), self.reader.clone());
55
56        if let Some(projection) = scan_request.projection {
57            builder = builder.with_projection(projection);
58        }
59
60        if let Some(filter) = scan_request.filter {
61            builder = builder.with_filter(filter);
62        }
63
64        if let Some(limit) = scan_request.limit {
65            builder = builder.with_limit(limit);
66        }
67
68        let scan = builder.prepare()?;
69        let dtype = scan.dtype().clone();
70        let splits = scan.execute(None)?;
71
72        Ok(Box::new(LayoutReaderScan {
73            dtype,
74            splits: VecDeque::from_iter(splits),
75        }))
76    }
77
78    fn serialize_split(&self, _split: &dyn Split) -> VortexResult<Vec<u8>> {
79        vortex_bail!("LayoutReader splits are not yet serializable");
80    }
81
82    fn deserialize_split(&self, _split: &[u8]) -> VortexResult<SplitRef> {
83        vortex_bail!("LayoutReader splits are not yet serializable");
84    }
85}
86
87struct LayoutReaderScan {
88    dtype: DType,
89    splits: VecDeque<BoxFuture<'static, VortexResult<Option<ArrayRef>>>>,
90}
91
92#[async_trait]
93impl DataSourceScan for LayoutReaderScan {
94    fn dtype(&self) -> &DType {
95        &self.dtype
96    }
97
98    fn remaining_splits_estimate(&self) -> Estimate<usize> {
99        Estimate::Exact(self.splits.len())
100    }
101
102    async fn next_splits(&mut self, max_splits: usize) -> VortexResult<Vec<SplitRef>> {
103        let n = std::cmp::min(max_splits, self.splits.len());
104
105        let mut splits = Vec::with_capacity(n);
106        for _ in 0..n {
107            let fut = self
108                .splits
109                .pop_front()
110                .vortex_expect("Checked length above ensures we have enough splits");
111            splits.push(Box::new(LayoutReaderSplit {
112                dtype: self.dtype.clone(),
113                fut,
114            }) as SplitRef);
115        }
116
117        Ok(splits)
118    }
119}
120
121struct LayoutReaderSplit {
122    dtype: DType,
123    fut: BoxFuture<'static, VortexResult<Option<ArrayRef>>>,
124}
125
126#[async_trait]
127impl Split for LayoutReaderSplit {
128    fn as_any(&self) -> &dyn Any {
129        self
130    }
131
132    fn execute(self: Box<Self>) -> VortexResult<SendableArrayStream> {
133        Ok(Box::pin(ArrayStreamAdapter::new(
134            self.dtype,
135            stream::once(self.fut).filter_map(|a| async move { a.transpose() }),
136        )))
137    }
138
139    fn row_count_estimate(&self) -> Estimate<u64> {
140        Estimate::Unknown
141    }
142
143    fn byte_size_estimate(&self) -> Estimate<u64> {
144        Estimate::Unknown
145    }
146}