vortex_datafusion/lib.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Integrations between [`Vortex`] and [DataFusion].
5//!
6//! The crate exposes two main entry points:
7//!
8//! - [`VortexFormatFactory`] for the file-based integration used by SQL,
9//! `CREATE EXTERNAL TABLE`, and
10//! [`ListingTable`].
11//! - [`v2`] for direct integration from an existing Vortex
12//! [`DataSourceRef`].
13//!
14//! # Registering The File Format
15//!
16//! Most applications register [`VortexFormatFactory`] with a DataFusion
17//! [`SessionContext`] and then let DataFusion create [`VortexFormat`] and
18//! [`VortexSource`] instances as queries are planned:
19//!
20//! ```no_run
21//! use std::sync::Arc;
22//!
23//! use datafusion::datasource::provider::DefaultTableFactory;
24//! use datafusion::execution::SessionStateBuilder;
25//! use datafusion::prelude::SessionContext;
26//! use datafusion_common::GetExt;
27//! use vortex_datafusion::VortexFormatFactory;
28//!
29//! # #[tokio::main]
30//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
31//! let factory = Arc::new(VortexFormatFactory::new());
32//! let mut state_builder = SessionStateBuilder::new()
33//! .with_default_features()
34//! .with_table_factory(
35//! factory.get_ext().to_uppercase(),
36//! Arc::new(DefaultTableFactory::new()),
37//! );
38//!
39//! if let Some(file_formats) = state_builder.file_formats() {
40//! file_formats.push(factory.clone() as _);
41//! }
42//!
43//! let ctx = SessionContext::new_with_state(state_builder.build()).enable_url_table();
44//! ctx.sql(
45//! "CREATE EXTERNAL TABLE metrics (service VARCHAR, value BIGINT) \
46//! STORED AS vortex LOCATION 'file:///tmp/metrics/'",
47//! )
48//! .await?;
49//! # Ok(())
50//! # }
51//! ```
52//!
53//! # Registering An Existing Vortex Data Source
54//!
55//! If your application already has a Vortex [`DataSourceRef`], use
56//! [`v2::VortexTable`] to register it directly with DataFusion:
57//!
58//! ```no_run
59//! use std::sync::Arc;
60//!
61//! use arrow_schema::Schema;
62//! use datafusion::prelude::SessionContext;
63//! use vortex::VortexSessionDefault;
64//! use vortex::scan::DataSourceRef;
65//! use vortex::session::VortexSession;
66//! use vortex_datafusion::v2::VortexTable;
67//!
68//! # let data_source: DataSourceRef = todo!();
69//! let table = Arc::new(VortexTable::new(
70//! data_source,
71//! VortexSession::default(),
72//! Arc::new(Schema::empty()),
73//! ));
74//!
75//! let ctx = SessionContext::new();
76//! ctx.register_table("vortex_data", table)?;
77//! # Ok::<(), datafusion_common::DataFusionError>(())
78//! ```
79//!
80//! [`Vortex`]: https://docs.rs/crate/vortex/latest
81//! [DataFusion]: https://docs.rs/datafusion/latest/datafusion/
82//! [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
83//! [`DataSourceRef`]: vortex::scan::DataSourceRef
84//! [`SessionContext`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionContext.html
85#![deny(missing_docs)]
86use std::fmt::Debug;
87
88use datafusion_common::stats::Precision as DFPrecision;
89use vortex::expr::stats::Precision;
90
91mod convert;
92mod persistent;
93pub mod v2;
94
95#[cfg(test)]
96mod tests;
97
98pub use convert::exprs::ExpressionConvertor;
99pub use persistent::*;
100
101/// Extension trait to convert our [`Precision`] to DataFusion's
102/// [`DataFusionPrecision`].
103///
104/// [`Precision`]: vortex::expr::stats::Precision
105/// [`DataFusionPrecision`]: datafusion_common::stats::Precision
106trait PrecisionExt<T>
107where
108 T: Debug + Clone + PartialEq + Eq + PartialOrd,
109{
110 /// Convert `Precision` to the datafusion equivalent.
111 fn to_df(self) -> DFPrecision<T>;
112}
113
114impl<T> PrecisionExt<T> for Precision<T>
115where
116 T: Debug + Clone + PartialEq + Eq + PartialOrd,
117{
118 fn to_df(self) -> DFPrecision<T> {
119 match self {
120 Precision::Exact(v) => DFPrecision::Exact(v),
121 Precision::Inexact(v) => DFPrecision::Inexact(v),
122 Precision::Absent => DFPrecision::Absent,
123 }
124 }
125}
126
#[cfg(test)]
mod common_tests {
    use std::sync::Arc;
    use std::sync::LazyLock;

    use datafusion::arrow::array::RecordBatch;
    use datafusion::datasource::provider::DefaultTableFactory;
    use datafusion::execution::SessionStateBuilder;
    use datafusion::prelude::SessionContext;
    use datafusion_catalog::TableProvider;
    use datafusion_common::DFSchema;
    use datafusion_common::GetExt;
    use datafusion_expr::CreateExternalTable;
    use object_store::ObjectStore;
    use object_store::memory::InMemory;
    use url::Url;
    use vortex::VortexSessionDefault;
    use vortex::array::ArrayRef;
    use vortex::array::arrow::FromArrowArray;
    use vortex::file::WriteOptionsSessionExt;
    use vortex::io::VortexWrite;
    use vortex::io::object_store::ObjectStoreWrite;
    use vortex::session::VortexSession;

    use crate::VortexFormatFactory;
    use crate::VortexTableOptions;

    /// Vortex session shared by the helpers in this module; built lazily on first access.
    static VX_SESSION: LazyLock<VortexSession> = LazyLock::new(VortexSession::default);

    /// Test fixture pairing a DataFusion session with the in-memory object store behind it.
    pub struct TestSessionContext {
        /// Object store that `new` registers on `session` for the `file://` URL.
        pub store: Arc<dyn ObjectStore>,
        /// DataFusion session with the Vortex file format registered and URL tables enabled.
        pub session: SessionContext,
    }

    impl Default for TestSessionContext {
        /// Same as `TestSessionContext::new(false)`.
        fn default() -> Self {
            Self::new(false)
        }
    }

    impl TestSessionContext {
        /// Create a new test session context with the given projection pushdown setting.
        pub fn new(projection_pushdown: bool) -> Self {
            let store = Arc::new(InMemory::new());

            let format_factory = Arc::new(VortexFormatFactory::new().with_options(
                VortexTableOptions {
                    projection_pushdown,
                    ..Default::default()
                },
            ));

            // The table factory is keyed by the upper-cased file extension, which is the
            // key `table_provider` looks up ("VORTEX").
            let table_factory_key = format_factory.get_ext().to_uppercase();
            let root_url = Url::try_from("file://").unwrap();

            let mut builder = SessionStateBuilder::new()
                .with_default_features()
                .with_table_factory(table_factory_key, Arc::new(DefaultTableFactory::new()))
                .with_object_store(&root_url, Arc::<InMemory>::clone(&store));

            // Append the Vortex format to whatever formats the builder already carries.
            if let Some(formats) = builder.file_formats() {
                formats.push(format_factory as _);
            }

            let session = SessionContext::new_with_state(builder.build()).enable_url_table();

            Self { store, session }
        }

        /// Write arrow data into a vortex file.
        ///
        /// The batch is converted to a Vortex array, streamed to `path` inside the
        /// fixture's object store, and the writer is shut down before returning.
        pub async fn write_arrow_batch<P>(&self, path: P, batch: &RecordBatch) -> anyhow::Result<()>
        where
            P: Into<object_store::path::Path>,
        {
            let array = ArrayRef::from_arrow(batch, false)?;

            let destination: object_store::path::Path = path.into();
            let mut writer = ObjectStoreWrite::new(Arc::clone(&self.store), &destination).await?;

            VX_SESSION
                .write_options()
                .write(&mut writer, array.to_array_stream())
                .await?;
            writer.shutdown().await?;

            Ok(())
        }

        /// Creates a ListingTable provider targeted at the provided path
        ///
        /// Builds a `CreateExternalTable` command for a `vortex` table called `name` at
        /// `location` and hands it to the table factory registered under `"VORTEX"`.
        ///
        /// # Panics
        ///
        /// Panics if the session has no table factory registered under `"VORTEX"`.
        pub async fn table_provider<S>(
            &self,
            name: &str,
            location: impl Into<String>,
            schema: S,
        ) -> anyhow::Result<Arc<dyn TableProvider>>
        where
            DFSchema: TryFrom<S>,
            anyhow::Error: From<<S as TryInto<DFSchema>>::Error>,
        {
            let vortex_factory = self.session.table_factory("VORTEX").unwrap();

            let create_cmd = CreateExternalTable::builder(
                name,
                location.into(),
                "vortex",
                DFSchema::try_from(schema)?.into(),
            )
            .build();

            Ok(vortex_factory
                .create(&self.session.state(), &create_cmd)
                .await?)
        }
    }
}