Skip to main content

vortex_datafusion/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Integrations between [`Vortex`] and [DataFusion].
5//!
6//! The crate exposes two main entry points:
7//!
8//! - [`VortexFormatFactory`] for the file-based integration used by SQL,
9//!   `CREATE EXTERNAL TABLE`, and
10//!   [`ListingTable`].
11//! - [`v2`] for direct integration from an existing Vortex
12//!   [`DataSourceRef`].
13//!
14//! # Registering The File Format
15//!
16//! Most applications register [`VortexFormatFactory`] with a DataFusion
17//! [`SessionContext`] and then let DataFusion create [`VortexFormat`] and
18//! [`VortexSource`] instances as queries are planned:
19//!
20//! ```no_run
21//! use std::sync::Arc;
22//!
23//! use datafusion::datasource::provider::DefaultTableFactory;
24//! use datafusion::execution::SessionStateBuilder;
25//! use datafusion::prelude::SessionContext;
26//! use datafusion_common::GetExt;
27//! use vortex_datafusion::VortexFormatFactory;
28//!
29//! # #[tokio::main]
30//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
31//! let factory = Arc::new(VortexFormatFactory::new());
32//! let mut state_builder = SessionStateBuilder::new()
33//!     .with_default_features()
34//!     .with_table_factory(
35//!         factory.get_ext().to_uppercase(),
36//!         Arc::new(DefaultTableFactory::new()),
37//!     );
38//!
39//! if let Some(file_formats) = state_builder.file_formats() {
40//!     file_formats.push(factory.clone() as _);
41//! }
42//!
43//! let ctx = SessionContext::new_with_state(state_builder.build()).enable_url_table();
44//! ctx.sql(
45//!     "CREATE EXTERNAL TABLE metrics (service VARCHAR, value BIGINT) \
46//!      STORED AS vortex LOCATION 'file:///tmp/metrics/'",
47//! )
48//! .await?;
49//! # Ok(())
50//! # }
51//! ```
52//!
53//! # Registering An Existing Vortex Data Source
54//!
55//! If your application already has a Vortex [`DataSourceRef`], use
56//! [`v2::VortexTable`] to register it directly with DataFusion:
57//!
58//! ```no_run
59//! use std::sync::Arc;
60//!
61//! use arrow_schema::Schema;
62//! use datafusion::prelude::SessionContext;
63//! use vortex::VortexSessionDefault;
64//! use vortex::scan::DataSourceRef;
65//! use vortex::session::VortexSession;
66//! use vortex_datafusion::v2::VortexTable;
67//!
68//! # let data_source: DataSourceRef = todo!();
69//! let table = Arc::new(VortexTable::new(
70//!     data_source,
71//!     VortexSession::default(),
72//!     Arc::new(Schema::empty()),
73//! ));
74//!
75//! let ctx = SessionContext::new();
76//! ctx.register_table("vortex_data", table)?;
77//! # Ok::<(), datafusion_common::DataFusionError>(())
78//! ```
79//!
80//! [`Vortex`]: https://docs.rs/crate/vortex/latest
81//! [DataFusion]: https://docs.rs/datafusion/latest/datafusion/
82//! [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
83//! [`DataSourceRef`]: vortex::scan::DataSourceRef
84//! [`SessionContext`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionContext.html
85#![deny(missing_docs)]
86use std::fmt::Debug;
87
88use datafusion_common::stats::Precision as DFPrecision;
89use vortex::expr::stats::Precision;
90
91mod convert;
92mod persistent;
93pub mod v2;
94
95#[cfg(test)]
96mod tests;
97
98pub use convert::exprs::ExpressionConvertor;
99pub use persistent::*;
100
101/// Extension trait to convert our [`Precision`] to DataFusion's
102/// [`DataFusionPrecision`].
103///
104/// [`Precision`]: vortex::expr::stats::Precision
105/// [`DataFusionPrecision`]: datafusion_common::stats::Precision
106trait PrecisionExt<T>
107where
108    T: Debug + Clone + PartialEq + Eq + PartialOrd,
109{
110    /// Convert `Precision` to the datafusion equivalent.
111    fn to_df(self) -> DFPrecision<T>;
112}
113
114impl<T> PrecisionExt<T> for Precision<T>
115where
116    T: Debug + Clone + PartialEq + Eq + PartialOrd,
117{
118    fn to_df(self) -> DFPrecision<T> {
119        match self {
120            Precision::Exact(v) => DFPrecision::Exact(v),
121            Precision::Inexact(v) => DFPrecision::Inexact(v),
122            Precision::Absent => DFPrecision::Absent,
123        }
124    }
125}
126
#[cfg(test)]
mod common_tests {
    use std::sync::Arc;
    use std::sync::LazyLock;

    use datafusion::arrow::array::RecordBatch;
    use datafusion::datasource::provider::DefaultTableFactory;
    use datafusion::execution::SessionStateBuilder;
    use datafusion::prelude::SessionContext;
    use datafusion_catalog::TableProvider;
    use datafusion_common::DFSchema;
    use datafusion_common::GetExt;
    use datafusion_expr::CreateExternalTable;
    use object_store::ObjectStore;
    use object_store::memory::InMemory;
    use url::Url;
    use vortex::VortexSessionDefault;
    use vortex::array::ArrayRef;
    use vortex::array::arrow::FromArrowArray;
    use vortex::file::WriteOptionsSessionExt;
    use vortex::io::VortexWrite;
    use vortex::io::object_store::ObjectStoreWrite;
    use vortex::session::VortexSession;

    use crate::VortexFormatFactory;
    use crate::VortexTableOptions;

    /// Vortex session used by [`TestSessionContext::write_arrow_batch`]; built
    /// lazily on first use and then reused.
    static VX_SESSION: LazyLock<VortexSession> = LazyLock::new(VortexSession::default);

    /// A DataFusion [`SessionContext`] with the Vortex file format registered
    /// and an in-memory object store standing in for the `file://` scheme.
    pub struct TestSessionContext {
        /// In-memory store registered with `session` for the `file://` URL.
        pub store: Arc<dyn ObjectStore>,
        /// DataFusion session with the Vortex file format and table factory registered.
        pub session: SessionContext,
    }

    impl Default for TestSessionContext {
        /// Equivalent to `TestSessionContext::new(false)` (projection pushdown disabled).
        fn default() -> Self {
            Self::new(false)
        }
    }

    impl TestSessionContext {
        /// Create a new test session context with the given projection pushdown setting.
        ///
        /// The Vortex table factory is registered under the upper-cased extension reported by
        /// [`VortexFormatFactory`], the factory itself is added to the session's file formats,
        /// and an [`InMemory`] store is registered for `file://` so table locations resolve
        /// to `store`.
        ///
        /// # Panics
        ///
        /// Panics only if the constant `"file://"` fails to parse as a URL, which would be a bug.
        pub fn new(projection_pushdown: bool) -> Self {
            let store = Arc::new(InMemory::new());
            let opts = VortexTableOptions {
                projection_pushdown,
                ..Default::default()
            };
            let factory = Arc::new(VortexFormatFactory::new().with_options(opts));
            let mut session_state_builder = SessionStateBuilder::new()
                .with_default_features()
                .with_table_factory(
                    factory.get_ext().to_uppercase(),
                    Arc::new(DefaultTableFactory::new()),
                )
                .with_object_store(
                    &Url::try_from("file://").expect("\"file://\" is a valid URL"),
                    Arc::<InMemory>::clone(&store),
                );

            if let Some(file_formats) = session_state_builder.file_formats() {
                file_formats.push(factory as _);
            }

            let session: SessionContext =
                SessionContext::new_with_state(session_state_builder.build()).enable_url_table();

            Self { store, session }
        }

        /// Write `batch` as a Vortex file at `path` in the in-memory store.
        ///
        /// # Errors
        ///
        /// Returns an error if the Arrow batch cannot be converted into a Vortex array, or if
        /// opening, writing to, or shutting down the object-store writer fails.
        pub async fn write_arrow_batch<P>(&self, path: P, batch: &RecordBatch) -> anyhow::Result<()>
        where
            P: Into<object_store::path::Path>,
        {
            let array = ArrayRef::from_arrow(batch, false)?;
            let mut write = ObjectStoreWrite::new(Arc::clone(&self.store), &path.into()).await?;
            VX_SESSION
                .write_options()
                .write(&mut write, array.to_array_stream())
                .await?;
            // Flush and finalize the object-store upload; without this the file may be incomplete.
            write.shutdown().await?;

            Ok(())
        }

        /// Creates a ListingTable provider named `name` over the `vortex` files at `location`,
        /// using `schema` (converted into a [`DFSchema`]) as the table schema.
        ///
        /// # Errors
        ///
        /// Returns an error if no `VORTEX` table factory is registered on the session, if
        /// `schema` cannot be converted into a [`DFSchema`], or if the factory fails to create
        /// the table.
        pub async fn table_provider<S>(
            &self,
            name: &str,
            location: impl Into<String>,
            schema: S,
        ) -> anyhow::Result<Arc<dyn TableProvider>>
        where
            DFSchema: TryFrom<S>,
            anyhow::Error: From<<DFSchema as TryFrom<S>>::Error>,
        {
            // This key must match the one `new` registers: `factory.get_ext().to_uppercase()`.
            let factory = self
                .session
                .table_factory("VORTEX")
                .ok_or_else(|| anyhow::anyhow!("no table factory registered for \"VORTEX\""))?;

            let cmd = CreateExternalTable::builder(
                name,
                location.into(),
                "vortex",
                DFSchema::try_from(schema)?.into(),
            )
            .build();

            let table = factory.create(&self.session.state(), &cmd).await?;

            Ok(table)
        }
    }
}