Skip to main content

vortex_datafusion/
lib.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Integrations between [`Vortex`] and [DataFusion].
//!
//! The crate exposes two main entry points:
//!
//! - [`VortexFormatFactory`] for the file-based integration used by SQL,
//!   `CREATE EXTERNAL TABLE`, and
//!   [`ListingTable`].
//! - [`v2`] for direct integration from an existing Vortex
//!   [`DataSourceRef`].
//!
//! # Registering The File Format
//!
//! Most applications register [`VortexFormatFactory`] with a DataFusion
//! [`SessionContext`] and then let DataFusion create [`VortexFormat`] and
//! [`VortexSource`] instances as queries are planned:
//!
20//! ```no_run
21//! use std::sync::Arc;
22//!
23//! use datafusion::datasource::provider::DefaultTableFactory;
24//! use datafusion::execution::SessionStateBuilder;
25//! use datafusion::prelude::SessionContext;
26//! use datafusion_common::GetExt;
27//! use vortex_datafusion::VortexFormatFactory;
28//!
29//! # #[tokio::main]
30//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
31//! let factory = Arc::new(VortexFormatFactory::new());
32//! let mut state_builder = SessionStateBuilder::new()
33//!     .with_default_features()
34//!     .with_table_factory(
35//!         factory.get_ext().to_uppercase(),
36//!         Arc::new(DefaultTableFactory::new()),
37//!     );
38//!
39//! if let Some(file_formats) = state_builder.file_formats() {
40//!     file_formats.push(factory.clone() as _);
41//! }
42//!
43//! let ctx = SessionContext::new_with_state(state_builder.build()).enable_url_table();
44//! ctx.sql(
45//!     "CREATE EXTERNAL TABLE metrics (service VARCHAR, value BIGINT) \
46//!      STORED AS vortex LOCATION 'file:///tmp/metrics/'",
47//! )
48//! .await?;
49//! # Ok(())
50//! # }
51//! ```
//!
//! # Registering An Existing Vortex Data Source
//!
//! If your application already has a Vortex [`DataSourceRef`], use
//! [`v2::VortexTable`] to register it directly with DataFusion:
//!
58//! ```no_run
59//! use std::sync::Arc;
60//!
61//! use arrow_schema::Schema;
62//! use datafusion::prelude::SessionContext;
63//! use vortex::VortexSessionDefault;
64//! use vortex::scan::DataSourceRef;
65//! use vortex::session::VortexSession;
66//! use vortex_datafusion::v2::VortexTable;
67//!
68//! # let data_source: DataSourceRef = todo!();
69//! let table = Arc::new(VortexTable::new(
70//!     data_source,
71//!     VortexSession::default(),
72//!     Arc::new(Schema::empty()),
73//! ));
74//!
75//! let ctx = SessionContext::new();
76//! ctx.register_table("vortex_data", table)?;
77//! # Ok::<(), datafusion_common::DataFusionError>(())
78//! ```
//!
//! [`Vortex`]: https://docs.rs/crate/vortex/latest
//! [DataFusion]: https://docs.rs/datafusion/latest/datafusion/
//! [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
//! [`DataSourceRef`]: vortex::scan::DataSourceRef
//! [`SessionContext`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionContext.html
85#![deny(missing_docs)]
86use std::fmt::Debug;
87
88use datafusion_common::stats::Precision as DFPrecision;
89use vortex::expr::stats::Precision;
90
91mod convert;
92mod persistent;
93pub mod v2;
94
95#[cfg(test)]
96mod tests;
97
98pub use convert::exprs::ExpressionConvertor;
99pub use persistent::*;
100
101/// Extension trait to convert our [`Precision`] to DataFusion's
102/// [`DataFusionPrecision`].
103///
104/// [`Precision`]: vortex::expr::stats::Precision
105/// [`DataFusionPrecision`]: datafusion_common::stats::Precision
106trait PrecisionExt<T>
107where
108    T: Debug + Clone + PartialEq + Eq + PartialOrd,
109{
110    /// Convert `Precision` to the datafusion equivalent.
111    fn to_df(self) -> DFPrecision<T>;
112}
113
114impl<T> PrecisionExt<T> for Precision<T>
115where
116    T: Debug + Clone + PartialEq + Eq + PartialOrd,
117{
118    fn to_df(self) -> DFPrecision<T> {
119        match self {
120            Precision::Exact(v) => DFPrecision::Exact(v),
121            Precision::Inexact(v) => DFPrecision::Inexact(v),
122        }
123    }
124}
125
126impl<T> PrecisionExt<T> for Option<Precision<T>>
127where
128    T: Debug + Clone + PartialEq + Eq + PartialOrd,
129{
130    fn to_df(self) -> DFPrecision<T> {
131        match self {
132            Some(v) => v.to_df(),
133            None => DFPrecision::Absent,
134        }
135    }
136}
137
#[cfg(test)]
mod common_tests {
    use std::sync::Arc;
    use std::sync::LazyLock;

    use anyhow::Context as _;
    use datafusion::arrow::array::RecordBatch;
    use datafusion::datasource::provider::DefaultTableFactory;
    use datafusion::execution::SessionStateBuilder;
    use datafusion::prelude::SessionContext;
    use datafusion_catalog::TableProvider;
    use datafusion_common::DFSchema;
    use datafusion_common::GetExt;
    use datafusion_expr::CreateExternalTable;
    use object_store::ObjectStore;
    use object_store::memory::InMemory;
    use url::Url;
    use vortex::VortexSessionDefault;
    use vortex::array::ArrayRef;
    use vortex::array::arrow::FromArrowArray;
    use vortex::file::WriteOptionsSessionExt;
    use vortex::io::VortexWrite;
    use vortex::io::object_store::ObjectStoreWrite;
    use vortex::session::VortexSession;

    use crate::VortexFormatFactory;
    use crate::VortexTableOptions;

    /// Vortex session shared by the test helpers in this module; used to obtain
    /// file write options in `write_arrow_batch`.
    static VX_SESSION: LazyLock<VortexSession> = LazyLock::new(VortexSession::default);

    /// A DataFusion session wired up for Vortex tests.
    ///
    /// An in-memory object store is registered under the `file://` URL, and the
    /// Vortex format factory is registered both as a file format and, through
    /// [`DefaultTableFactory`], as a table factory.
    pub struct TestSessionContext {
        /// In-memory object store registered under `file://` on `session`.
        pub store: Arc<dyn ObjectStore>,
        /// DataFusion session with the Vortex format registered.
        pub session: SessionContext,
    }

    impl Default for TestSessionContext {
        /// Same as `TestSessionContext::new(false)`, i.e. projection pushdown disabled.
        fn default() -> Self {
            Self::new(false)
        }
    }

    impl TestSessionContext {
        /// Create a new test session context with the given projection pushdown setting.
        ///
        /// `projection_pushdown` is forwarded to [`VortexTableOptions`]; every other
        /// option keeps its default. URL tables are enabled on the resulting session.
        pub fn new(projection_pushdown: bool) -> Self {
            let store = Arc::new(InMemory::new());
            let opts = VortexTableOptions {
                projection_pushdown,
                ..Default::default()
            };
            let factory = Arc::new(VortexFormatFactory::new().with_options(opts));
            // The table factory is keyed by the upper-cased format extension
            // (presumably `VORTEX`, the key `table_provider` looks up).
            let mut session_state_builder = SessionStateBuilder::new()
                .with_default_features()
                .with_table_factory(
                    factory.get_ext().to_uppercase(),
                    Arc::new(DefaultTableFactory::new()),
                )
                .with_object_store(
                    &Url::try_from("file://").expect("`file://` is a valid URL"),
                    Arc::<InMemory>::clone(&store),
                );

            // Append the Vortex format to the builder's file formats, if it has a list.
            if let Some(file_formats) = session_state_builder.file_formats() {
                file_formats.push(factory as _);
            }

            let session: SessionContext =
                SessionContext::new_with_state(session_state_builder.build()).enable_url_table();

            Self { store, session }
        }

        /// Write arrow data into a vortex file at `path` in `self.store`.
        ///
        /// # Errors
        ///
        /// Fails if the batch cannot be converted into a Vortex array, or if
        /// creating, writing to, or shutting down the object-store writer fails.
        pub async fn write_arrow_batch<P>(&self, path: P, batch: &RecordBatch) -> anyhow::Result<()>
        where
            P: Into<object_store::path::Path>,
        {
            let array = ArrayRef::from_arrow(batch, false)?;
            let mut write = ObjectStoreWrite::new(Arc::clone(&self.store), &path.into()).await?;
            VX_SESSION
                .write_options()
                .write(&mut write, array.to_array_stream())
                .await?;
            write.shutdown().await?;

            Ok(())
        }

        /// Creates a ListingTable provider targeted at the provided path.
        ///
        /// Builds a `CreateExternalTable` command for `location` with the `vortex`
        /// file type and hands it to the session's `VORTEX` table factory.
        ///
        /// # Errors
        ///
        /// Fails if no `VORTEX` table factory is registered on the session, if
        /// `schema` cannot be converted into a [`DFSchema`], or if the factory
        /// fails to create the table.
        pub async fn table_provider<S>(
            &self,
            name: &str,
            location: impl Into<String>,
            schema: S,
        ) -> anyhow::Result<Arc<dyn TableProvider>>
        where
            DFSchema: TryFrom<S>,
            anyhow::Error: From<<S as TryInto<DFSchema>>::Error>,
        {
            // Propagate instead of panicking: the caller already handles `anyhow::Result`.
            let factory = self
                .session
                .table_factory("VORTEX")
                .context("no `VORTEX` table factory is registered on the test session")?;

            let cmd = CreateExternalTable::builder(
                name,
                location.into(),
                "vortex",
                DFSchema::try_from(schema)?.into(),
            )
            .build();

            let table = factory.create(&self.session.state(), &cmd).await?;

            Ok(table)
        }
    }
}