vortex_scan/api.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4#![deny(missing_docs)]
5
6//! The Vortex Scan API implements an abstract table scan interface that can be used to
7//! read data from various data sources.
8//!
9//! It supports arbitrary projection expressions, filter expressions, and limit pushdown as well
10//! as mechanisms for parallel and distributed execution via partitions.
11//!
//! The API is currently under development and may change in future releases; however, we hope to
//! stabilize it into a stable C ABI for use by foreign language bindings.
14//!
15//! ## Open Issues
16//!
17//! * We probably want to make the DataSource serializable as well, so that we can share
18//! source-level state with workers, separately from partition serialization.
19//! * We should add a way for the client to negotiate capabilities with the data source, for
20//! example which encodings it knows about.
21
22use std::any::Any;
23use std::ops::Range;
24use std::sync::Arc;
25
26use async_trait::async_trait;
27use futures::stream::BoxStream;
28use vortex_array::dtype::DType;
29use vortex_array::dtype::FieldPath;
30use vortex_array::expr::Expression;
31use vortex_array::expr::root;
32use vortex_array::expr::stats::Precision;
33use vortex_array::stats::StatsSet;
34use vortex_array::stream::SendableArrayStream;
35use vortex_error::VortexResult;
36use vortex_error::vortex_bail;
37use vortex_session::VortexSession;
38
39use crate::Selection;
40
41/// A sendable stream of partitions.
42pub type PartitionStream = BoxStream<'static, VortexResult<PartitionRef>>;
43
44/// Opens a Vortex [`DataSource`] from a URI.
45///
46/// Configuration can be passed via the URI query parameters, similar to JDBC / ADBC.
47/// Providers can be registered with the [`VortexSession`] to support additional URI schemes.
48#[async_trait]
49pub trait DataSourceOpener: 'static {
50 /// Attempt to open a new data source from a URI.
51 async fn open(&self, uri: String, session: &VortexSession) -> VortexResult<DataSourceRef>;
52}
53
54/// Supports deserialization of a Vortex [`DataSource`] on a remote worker.
55#[async_trait]
56pub trait DataSourceRemote: 'static {
57 /// Attempt to deserialize the source.
58 fn deserialize_data_source(
59 &self,
60 data: &[u8],
61 session: &VortexSession,
62 ) -> VortexResult<DataSourceRef>;
63}
64
65/// A reference-counted data source.
66pub type DataSourceRef = Arc<dyn DataSource>;
67
68/// A data source represents a streamable dataset that can be scanned with projection and filter
69/// expressions. Each scan produces partitions that can be executed in parallel to read data. Each
70/// partition can be serialized for remote execution.
71///
72/// The DataSource may be used multiple times to create multiple scans, whereas each scan and each
73/// partition of a scan can only be consumed once.
74#[async_trait]
75pub trait DataSource: 'static + Send + Sync {
76 /// Returns the dtype of the source.
77 fn dtype(&self) -> &DType;
78
79 /// Returns an estimate of the row count of the un-filtered source.
80 fn row_count(&self) -> Option<Precision<u64>> {
81 None
82 }
83
84 /// Returns an estimate of the byte size of the un-filtered source.
85 fn byte_size(&self) -> Option<Precision<u64>> {
86 None
87 }
88
89 /// Serialize the [`DataSource`] to pass to a remote worker.
90 fn serialize(&self) -> VortexResult<Option<Vec<u8>>> {
91 Ok(None)
92 }
93
94 /// Deserialize a partition that was previously serialized from a compatible data source.
95 fn deserialize_partition(
96 &self,
97 data: &[u8],
98 session: &VortexSession,
99 ) -> VortexResult<PartitionRef> {
100 let _ = (data, session);
101 vortex_bail!("DataSource does not support deserialization")
102 }
103
104 /// Returns a scan over the source.
105 async fn scan(&self, scan_request: ScanRequest) -> VortexResult<DataSourceScanRef>;
106
107 /// Returns the statistics for a given field.
108 async fn field_statistics(&self, field_path: &FieldPath) -> VortexResult<StatsSet>;
109}
110
111/// A request to scan a data source.
112#[derive(Debug, Clone)]
113pub struct ScanRequest {
114 /// Projection expression. Defaults to `root()` which returns all columns.
115 pub projection: Expression,
116 /// Filter expression, `None` implies no filter.
117 pub filter: Option<Expression>,
118 /// The row range to read.
119 pub row_range: Option<Range<u64>>,
120 /// A row selection to apply to the scan. The selection identifies rows within the specified
121 /// row range.
122 pub selection: Selection,
123 /// Whether the scan should preserve row order. If false, the scan may produce rows in any
124 /// order, for example to enable parallel execution across partitions.
125 pub ordered: bool,
126 /// Optional limit on the number of rows returned by scan. Limits are applied after all
127 /// filtering and row selection.
128 pub limit: Option<u64>,
129}
130
131impl Default for ScanRequest {
132 fn default() -> Self {
133 Self {
134 projection: root(),
135 filter: None,
136 row_range: None,
137 selection: Selection::default(),
138 ordered: false,
139 limit: None,
140 }
141 }
142}
143
144/// A boxed data source scan.
145pub type DataSourceScanRef = Box<dyn DataSourceScan>;
146
147/// A data source scan produces partitions that can be executed to read data from the source.
148pub trait DataSourceScan: 'static + Send {
149 /// The returned dtype of the scan.
150 fn dtype(&self) -> &DType;
151
152 /// Returns an estimate of the total number of partitions the scan will produce.
153 fn partition_count(&self) -> Option<Precision<usize>>;
154
155 /// Returns a stream of partitions to be processed.
156 fn partitions(self: Box<Self>) -> PartitionStream;
157}
158
159/// A reference-counted partition.
160pub type PartitionRef = Box<dyn Partition>;
161
162/// A partition represents a unit of work that can be executed to produce a stream of arrays.
163pub trait Partition: 'static + Send {
164 /// Downcast the partition to a concrete type.
165 fn as_any(&self) -> &dyn Any;
166
167 /// Returns an estimate of the row count for this partition.
168 fn row_count(&self) -> Option<Precision<u64>>;
169
170 /// Returns an estimate of the byte size for this partition.
171 fn byte_size(&self) -> Option<Precision<u64>>;
172
173 /// Serialize this partition for a remote worker.
174 fn serialize(&self) -> VortexResult<Option<Vec<u8>>> {
175 Ok(None)
176 }
177
178 /// Executes the partition, returning an array stream.
179 ///
180 /// This method must be fast. The returned stream should be lazy — all non-trivial work
181 /// (I/O, decoding, filtering) must be deferred to when the stream is polled. Expensive
182 /// operations should be spawned onto the runtime to enable parallel execution across
183 /// threads.
184 fn execute(self: Box<Self>) -> VortexResult<SendableArrayStream>;
185}