vortex_scan/lib.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4#![deny(missing_docs)]
5
6//! The Vortex Scan API implements an abstract table scan interface that can be used to
7//! read data from various data sources.
8//!
9//! It supports arbitrary projection expressions, filter expressions, and limit pushdown as well
10//! as mechanisms for parallel and distributed execution via partitions.
11//!
//! The API is currently under development and may change in future releases; however, we hope to
//! stabilize it into a stable C ABI for use within foreign language bindings.
14//!
15//! If you are looking to scan Vortex files or layouts, the Vortex implementation of the Scan API
16//! can be found in the `vortex-layout` crate.
17//!
18//! ## Open Issues
19//!
20//! * We probably want to make the DataSource serializable as well, so that we can share
21//! source-level state with workers, separately from partition serialization.
22//! * We should add a way for the client to negotiate capabilities with the data source, for
23//! example which encodings it knows about.
24
25pub mod row_mask;
26pub mod selection;
27
28use std::any::Any;
29use std::ops::Range;
30use std::sync::Arc;
31
32use async_trait::async_trait;
33use futures::stream::BoxStream;
34use selection::Selection;
35use vortex_array::dtype::DType;
36use vortex_array::dtype::FieldPath;
37use vortex_array::expr::Expression;
38use vortex_array::expr::root;
39use vortex_array::expr::stats::Precision;
40use vortex_array::stats::StatsSet;
41use vortex_array::stream::SendableArrayStream;
42use vortex_error::VortexResult;
43use vortex_error::vortex_bail;
44use vortex_session::VortexSession;
45
46/// A sendable stream of partitions.
47pub type PartitionStream = BoxStream<'static, VortexResult<PartitionRef>>;
48
49/// Opens a Vortex [`DataSource`] from a URI.
50///
51/// Configuration can be passed via the URI query parameters, similar to JDBC / ADBC.
52/// Providers can be registered with the [`VortexSession`] to support additional URI schemes.
53#[async_trait]
54pub trait DataSourceOpener: 'static {
55 /// Attempt to open a new data source from a URI.
56 async fn open(&self, uri: String, session: &VortexSession) -> VortexResult<DataSourceRef>;
57}
58
59/// Supports deserialization of a Vortex [`DataSource`] on a remote worker.
60#[async_trait]
61pub trait DataSourceRemote: 'static {
62 /// Attempt to deserialize the source.
63 fn deserialize_data_source(
64 &self,
65 data: &[u8],
66 session: &VortexSession,
67 ) -> VortexResult<DataSourceRef>;
68}
69
70/// A reference-counted data source.
71pub type DataSourceRef = Arc<dyn DataSource>;
72
73/// A data source represents a streamable dataset that can be scanned with projection and filter
74/// expressions. Each scan produces partitions that can be executed in parallel to read data. Each
75/// partition can be serialized for remote execution.
76///
77/// The DataSource may be used multiple times to create multiple scans, whereas each scan and each
78/// partition of a scan can only be consumed once.
79///
80/// Partitions have indices. Partition index is stable throughout DataSource's
81/// lifetime. For every scan requested on the DataSource, the index will stay
82/// the same.
83/// However, this means you should create another instance of a DataSource if
84/// your environment changes e.g. you have a glob and another file is added to
85/// the filesystem this glob references.
86/// See MultiFileDataSource in vortex-file/src/multi/mod.rs
87#[async_trait]
88pub trait DataSource: 'static + Send + Sync {
89 /// Returns the dtype of the source.
90 fn dtype(&self) -> &DType;
91
92 /// Returns an estimate of the row count of the un-filtered source.
93 fn row_count(&self) -> Option<Precision<u64>> {
94 None
95 }
96
97 /// Returns an estimate of the byte size of the un-filtered source.
98 fn byte_size(&self) -> Option<Precision<u64>> {
99 None
100 }
101
102 /// Serialize the [`DataSource`] to pass to a remote worker.
103 fn serialize(&self) -> VortexResult<Option<Vec<u8>>> {
104 Ok(None)
105 }
106
107 /// Deserialize a partition that was previously serialized from a compatible data source.
108 fn deserialize_partition(
109 &self,
110 data: &[u8],
111 session: &VortexSession,
112 ) -> VortexResult<PartitionRef> {
113 let _ = (data, session);
114 vortex_bail!("DataSource does not support deserialization")
115 }
116
117 /// Returns a scan over the source.
118 async fn scan(&self, scan_request: ScanRequest) -> VortexResult<DataSourceScanRef>;
119
120 /// Returns the statistics for a given field.
121 async fn field_statistics(&self, field_path: &FieldPath) -> VortexResult<StatsSet>;
122}
123
124/// A request to scan a data source.
125#[derive(Debug, Clone)]
126pub struct ScanRequest {
127 /// Projection expression. Defaults to `root()` which returns all columns.
128 pub projection: Expression,
129 /// Filter expression, `None` implies no filter.
130 pub filter: Option<Expression>,
131 /// The per-partition row range to read. Row range will be applied
132 /// over every partition you scan.
133 pub row_range: Option<Range<u64>>,
134 /// The per-partition row selection to read. Row selection will be applied
135 /// over every partition you scan.
136 pub selection: Selection,
137 /// Partition selection to scan, which allows readers to skip unwanted partitions.
138 pub partition_selection: Selection,
139 /// Partition range to scan, which allows readers to skip unwanted partitions.
140 pub partition_range: Option<Range<u64>>,
141 /// Whether the scan should preserve row order. If false, the scan may produce rows in any
142 /// order, for example to enable parallel execution across partitions.
143 pub ordered: bool,
144 /// Optional limit on the number of rows returned by scan. Limits are applied after all
145 /// filtering and row selection.
146 pub limit: Option<u64>,
147}
148
149impl Default for ScanRequest {
150 fn default() -> Self {
151 Self {
152 projection: root(),
153 filter: None,
154 row_range: None,
155 selection: Selection::default(),
156 partition_selection: Selection::default(),
157 ordered: false,
158 limit: None,
159 partition_range: None,
160 }
161 }
162}
163
164/// A boxed data source scan.
165pub type DataSourceScanRef = Box<dyn DataSourceScan>;
166
167/// A data source scan produces partitions that can be executed to read data from the source.
168pub trait DataSourceScan: 'static + Send {
169 /// The returned dtype of the scan.
170 fn dtype(&self) -> &DType;
171
172 /// Returns an estimate of the total number of partitions the scan will produce.
173 fn partition_count(&self) -> Option<Precision<usize>>;
174
175 /// Returns a stream of partitions to be processed.
176 fn partitions(self: Box<Self>) -> PartitionStream;
177}
178
179/// A reference-counted partition.
180pub type PartitionRef = Box<dyn Partition>;
181
182/// A partition represents a unit of work that can be executed to produce a stream of arrays.
183pub trait Partition: 'static + Send {
184 /// Downcast the partition to a concrete type.
185 fn as_any(&self) -> &dyn Any;
186
187 /// Some unique identifier for partition.
188 /// If you have an instance of a DataSource, the indices of emitted
189 /// partitions will stay stable for every scan in this DataSource.
190 fn index(&self) -> usize;
191
192 /// Returns an estimate of the row count for this partition.
193 fn row_count(&self) -> Option<Precision<u64>>;
194
195 /// Returns an estimate of the byte size for this partition.
196 fn byte_size(&self) -> Option<Precision<u64>>;
197
198 /// Serialize this partition for a remote worker.
199 fn serialize(&self) -> VortexResult<Option<Vec<u8>>> {
200 Ok(None)
201 }
202
203 /// Executes the partition, returning an array stream.
204 ///
205 /// This method must be fast. The returned stream should be lazy — all non-trivial work
206 /// (I/O, decoding, filtering) must be deferred to when the stream is polled. Expensive
207 /// operations should be spawned onto the runtime to enable parallel execution across
208 /// threads.
209 fn execute(self: Box<Self>) -> VortexResult<SendableArrayStream>;
210}