vortex_datafusion/lib.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Integrations between [`Vortex`] and [DataFusion].
5//!
6//! The crate exposes two main entry points:
7//!
8//! - [`VortexFormatFactory`] for the file-based integration used by SQL,
9//! `CREATE EXTERNAL TABLE`, and
10//! [`ListingTable`].
11//! - [`v2`] for direct integration from an existing Vortex
12//! [`DataSourceRef`].
13//!
14//! # Registering The File Format
15//!
16//! Most applications register [`VortexFormatFactory`] with a DataFusion
17//! [`SessionContext`] and then let DataFusion create [`VortexFormat`] and
18//! [`VortexSource`] instances as queries are planned:
19//!
20//! ```no_run
21//! use std::sync::Arc;
22//!
23//! use datafusion::datasource::provider::DefaultTableFactory;
24//! use datafusion::execution::SessionStateBuilder;
25//! use datafusion::prelude::SessionContext;
26//! use datafusion_common::GetExt;
27//! use vortex_datafusion::VortexFormatFactory;
28//!
29//! # #[tokio::main]
30//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
31//! let factory = Arc::new(VortexFormatFactory::new());
32//! let mut state_builder = SessionStateBuilder::new()
33//! .with_default_features()
34//! .with_table_factory(
35//! factory.get_ext().to_uppercase(),
36//! Arc::new(DefaultTableFactory::new()),
37//! );
38//!
39//! if let Some(file_formats) = state_builder.file_formats() {
40//! file_formats.push(factory.clone() as _);
41//! }
42//!
43//! let ctx = SessionContext::new_with_state(state_builder.build()).enable_url_table();
44//! ctx.sql(
45//! "CREATE EXTERNAL TABLE metrics (service VARCHAR, value BIGINT) \
46//! STORED AS vortex LOCATION 'file:///tmp/metrics/'",
47//! )
48//! .await?;
49//! # Ok(())
50//! # }
51//! ```
52//!
53//! # Registering An Existing Vortex Data Source
54//!
55//! If your application already has a Vortex [`DataSourceRef`], use
56//! [`v2::VortexTable`] to register it directly with DataFusion:
57//!
58//! ```no_run
59//! use std::sync::Arc;
60//!
61//! use arrow_schema::Schema;
62//! use datafusion::prelude::SessionContext;
63//! use vortex::VortexSessionDefault;
64//! use vortex::scan::DataSourceRef;
65//! use vortex::session::VortexSession;
66//! use vortex_datafusion::v2::VortexTable;
67//!
68//! # let data_source: DataSourceRef = todo!();
69//! let table = Arc::new(VortexTable::new(
70//! data_source,
71//! VortexSession::default(),
72//! Arc::new(Schema::empty()),
73//! ));
74//!
75//! let ctx = SessionContext::new();
76//! ctx.register_table("vortex_data", table)?;
77//! # Ok::<(), datafusion_common::DataFusionError>(())
78//! ```
79//!
80//! [`Vortex`]: https://docs.rs/crate/vortex/latest
81//! [DataFusion]: https://docs.rs/datafusion/latest/datafusion/
82//! [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
83//! [`DataSourceRef`]: vortex::scan::DataSourceRef
84//! [`SessionContext`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionContext.html
85#![deny(missing_docs)]
86use std::fmt::Debug;
87
88use datafusion_common::stats::Precision as DFPrecision;
89use vortex::expr::stats::Precision;
90
91mod convert;
92mod persistent;
93pub mod v2;
94
95#[cfg(test)]
96mod tests;
97
98pub use convert::exprs::ExpressionConvertor;
99pub use persistent::*;
100
101/// Extension trait to convert our [`Precision`] to DataFusion's
102/// [`DataFusionPrecision`].
103///
104/// [`Precision`]: vortex::expr::stats::Precision
105/// [`DataFusionPrecision`]: datafusion_common::stats::Precision
106trait PrecisionExt<T>
107where
108 T: Debug + Clone + PartialEq + Eq + PartialOrd,
109{
110 /// Convert `Precision` to the datafusion equivalent.
111 fn to_df(self) -> DFPrecision<T>;
112}
113
114impl<T> PrecisionExt<T> for Precision<T>
115where
116 T: Debug + Clone + PartialEq + Eq + PartialOrd,
117{
118 fn to_df(self) -> DFPrecision<T> {
119 match self {
120 Precision::Exact(v) => DFPrecision::Exact(v),
121 Precision::Inexact(v) => DFPrecision::Inexact(v),
122 }
123 }
124}
125
126impl<T> PrecisionExt<T> for Option<Precision<T>>
127where
128 T: Debug + Clone + PartialEq + Eq + PartialOrd,
129{
130 fn to_df(self) -> DFPrecision<T> {
131 match self {
132 Some(v) => v.to_df(),
133 None => DFPrecision::Absent,
134 }
135 }
136}
137
#[cfg(test)]
mod common_tests {
    //! Shared fixtures for this crate's tests: a DataFusion session wired to an
    //! in-memory object store with the Vortex file format registered.

    use std::sync::Arc;
    use std::sync::LazyLock;

    use datafusion::arrow::array::RecordBatch;
    use datafusion::datasource::provider::DefaultTableFactory;
    use datafusion::execution::SessionStateBuilder;
    use datafusion::prelude::SessionContext;
    use datafusion_catalog::TableProvider;
    use datafusion_common::DFSchema;
    use datafusion_common::GetExt;
    use datafusion_expr::CreateExternalTable;
    use object_store::ObjectStore;
    use object_store::memory::InMemory;
    use url::Url;
    use vortex::VortexSessionDefault;
    use vortex::array::ArrayRef;
    use vortex::array::arrow::FromArrowArray;
    use vortex::file::WriteOptionsSessionExt;
    use vortex::io::VortexWrite;
    use vortex::io::object_store::ObjectStoreWrite;
    use vortex::session::VortexSession;

    use crate::VortexFormatFactory;
    use crate::VortexTableOptions;

    /// Vortex session shared by every fixture write; created lazily on first use.
    static VX_SESSION: LazyLock<VortexSession> = LazyLock::new(VortexSession::default);

    /// A DataFusion session paired with the object store backing it.
    pub struct TestSessionContext {
        /// In-memory store registered with the session for `file://` URLs.
        pub store: Arc<dyn ObjectStore>,
        /// DataFusion session with the Vortex format and table factory registered.
        pub session: SessionContext,
    }

    impl Default for TestSessionContext {
        /// Equivalent to [`TestSessionContext::new`] with projection pushdown disabled.
        fn default() -> Self {
            Self::new(false)
        }
    }

    impl TestSessionContext {
        /// Create a new test session context with the given projection pushdown setting.
        ///
        /// The session is backed by a fresh [`InMemory`] store registered for the
        /// `file://` URL, and the Vortex format factory is registered both as a
        /// table factory (keyed by its upper-cased extension) and as a file format.
        pub fn new(projection_pushdown: bool) -> Self {
            let store = Arc::new(InMemory::new());
            let opts = VortexTableOptions {
                projection_pushdown,
                ..Default::default()
            };
            let factory = Arc::new(VortexFormatFactory::new().with_options(opts));
            let mut session_state_builder = SessionStateBuilder::new()
                .with_default_features()
                .with_table_factory(
                    factory.get_ext().to_uppercase(),
                    Arc::new(DefaultTableFactory::new()),
                )
                .with_object_store(
                    &Url::try_from("file://").expect("`file://` is a well-formed URL literal"),
                    Arc::<InMemory>::clone(&store),
                );

            // The format list is only present when default features populated it.
            if let Some(file_formats) = session_state_builder.file_formats() {
                file_formats.push(factory as _);
            }

            let session: SessionContext =
                SessionContext::new_with_state(session_state_builder.build()).enable_url_table();

            Self { store, session }
        }

        /// Write arrow data into a vortex file.
        ///
        /// The batch is converted to a Vortex array and streamed to `path` inside
        /// this context's object store using the shared [`VX_SESSION`].
        ///
        /// # Errors
        ///
        /// Returns an error if the conversion, the write, or the final shutdown of
        /// the writer fails.
        pub async fn write_arrow_batch<P>(&self, path: P, batch: &RecordBatch) -> anyhow::Result<()>
        where
            P: Into<object_store::path::Path>,
        {
            let array = ArrayRef::from_arrow(batch, false)?;
            let mut write = ObjectStoreWrite::new(Arc::clone(&self.store), &path.into()).await?;
            VX_SESSION
                .write_options()
                .write(&mut write, array.to_array_stream())
                .await?;
            // Shut the writer down explicitly so a failure surfaces as an error here.
            write.shutdown().await?;

            Ok(())
        }

        /// Creates a table provider named `name`, targeted at the provided
        /// `location`, through the session's `VORTEX` table factory.
        ///
        /// # Errors
        ///
        /// Returns an error if no `VORTEX` table factory is registered, if `schema`
        /// cannot be converted into a [`DFSchema`], or if the factory fails to
        /// create the table.
        pub async fn table_provider<S>(
            &self,
            name: &str,
            location: impl Into<String>,
            schema: S,
        ) -> anyhow::Result<Arc<dyn TableProvider>>
        where
            DFSchema: TryFrom<S>,
            anyhow::Error: From<<S as TryInto<DFSchema>>::Error>,
        {
            // Report a missing registration as an error rather than panicking,
            // since this function already returns a `Result`.
            let factory = self.session.table_factory("VORTEX").ok_or_else(|| {
                anyhow::anyhow!("no table factory registered for file type `VORTEX`")
            })?;

            let cmd = CreateExternalTable::builder(
                name,
                location.into(),
                "vortex",
                DFSchema::try_from(schema)?.into(),
            )
            .build();

            let table = factory.create(&self.session.state(), &cmd).await?;

            Ok(table)
        }
    }
}