Skip to main content

vortex_datafusion/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Connectors to enable [DataFusion](https://docs.rs/datafusion/latest/datafusion/) to read [`Vortex`](https://docs.rs/crate/vortex/latest) data.
5#![deny(missing_docs)]
6use std::fmt::Debug;
7
8use datafusion_common::stats::Precision as DFPrecision;
9use vortex::expr::stats::Precision;
10
11mod convert;
12mod persistent;
13
14#[cfg(test)]
15mod tests;
16
17pub use convert::exprs::ExpressionConvertor;
18pub use persistent::*;
19
20/// Extension trait to convert our [`Precision`](vortex::stats::Precision) to Datafusion's [`Precision`](datafusion_common::stats::Precision)
21trait PrecisionExt<T>
22where
23    T: Debug + Clone + PartialEq + Eq + PartialOrd,
24{
25    /// Convert `Precision` to the datafusion equivalent.
26    fn to_df(self) -> DFPrecision<T>;
27}
28
29impl<T> PrecisionExt<T> for Precision<T>
30where
31    T: Debug + Clone + PartialEq + Eq + PartialOrd,
32{
33    fn to_df(self) -> DFPrecision<T> {
34        match self {
35            Precision::Exact(v) => DFPrecision::Exact(v),
36            Precision::Inexact(v) => DFPrecision::Inexact(v),
37        }
38    }
39}
40
41impl<T> PrecisionExt<T> for Option<Precision<T>>
42where
43    T: Debug + Clone + PartialEq + Eq + PartialOrd,
44{
45    fn to_df(self) -> DFPrecision<T> {
46        match self {
47            Some(v) => v.to_df(),
48            None => DFPrecision::Absent,
49        }
50    }
51}
52
#[cfg(test)]
mod common_tests {
    use std::sync::Arc;
    use std::sync::LazyLock;

    use datafusion::arrow::array::RecordBatch;
    use datafusion::datasource::provider::DefaultTableFactory;
    use datafusion::execution::SessionStateBuilder;
    use datafusion::prelude::SessionContext;
    use datafusion_catalog::TableProvider;
    use datafusion_common::DFSchema;
    use datafusion_common::GetExt;
    use datafusion_expr::CreateExternalTable;
    use object_store::ObjectStore;
    use object_store::memory::InMemory;
    use url::Url;
    use vortex::VortexSessionDefault;
    use vortex::array::ArrayRef;
    use vortex::array::arrow::FromArrowArray;
    use vortex::file::WriteOptionsSessionExt;
    use vortex::io::VortexWrite;
    use vortex::io::object_store::ObjectStoreWrite;
    use vortex::session::VortexSession;

    use crate::VortexFormatFactory;
    use crate::VortexTableOptions;

    /// Vortex session used by the test helpers to write files; created lazily on first use.
    static VX_SESSION: LazyLock<VortexSession> = LazyLock::new(VortexSession::default);

    /// A DataFusion session wired up with the Vortex file format and an in-memory object store.
    pub struct TestSessionContext {
        /// In-memory object store, registered with the session under the `file://` URL.
        pub store: Arc<dyn ObjectStore>,
        /// DataFusion session with the Vortex format factory and a table factory registered.
        pub session: SessionContext,
    }

    impl Default for TestSessionContext {
        /// Equivalent to `TestSessionContext::new(false)` (projection pushdown disabled).
        fn default() -> Self {
            Self::new(false)
        }
    }

    impl TestSessionContext {
        /// Create a new test session context with the given projection pushdown setting.
        ///
        /// # Panics
        /// Panics only if the constant `"file://"` fails to parse as a URL, which would be a bug.
        pub fn new(projection_pushdown: bool) -> Self {
            let store = Arc::new(InMemory::new());
            let opts = VortexTableOptions {
                projection_pushdown,
                ..Default::default()
            };
            let factory = Arc::new(VortexFormatFactory::new().with_options(opts));
            let file_url = Url::try_from("file://").expect("\"file://\" is a valid URL");

            // The table factory is keyed by the upper-cased Vortex extension; `table_provider`
            // looks it up by that key.
            let mut session_state_builder = SessionStateBuilder::new()
                .with_default_features()
                .with_table_factory(
                    factory.get_ext().to_uppercase(),
                    Arc::new(DefaultTableFactory::new()),
                )
                .with_object_store(&file_url, store.clone());

            // Add the Vortex format next to the default formats when the builder holds a list.
            if let Some(file_formats) = session_state_builder.file_formats() {
                file_formats.push(factory as _);
            }

            let session: SessionContext =
                SessionContext::new_with_state(session_state_builder.build()).enable_url_table();

            Self { store, session }
        }

        /// Write `batch` as a Vortex file at `path` in this context's object store.
        ///
        /// # Errors
        /// Returns an error if the Arrow batch cannot be converted to a Vortex array, or if
        /// creating, writing to, or shutting down the object-store writer fails.
        pub async fn write_arrow_batch<P>(&self, path: P, batch: &RecordBatch) -> anyhow::Result<()>
        where
            P: Into<object_store::path::Path>,
        {
            // NOTE(review): the `false` argument presumably marks the top-level array as
            // non-nullable — confirm against `FromArrowArray`.
            let array = ArrayRef::from_arrow(batch, false)?;
            let path = path.into();
            let mut write = ObjectStoreWrite::new(self.store.clone(), &path).await?;
            VX_SESSION
                .write_options()
                .write(&mut write, array.to_array_stream())
                .await?;
            // Shut the writer down explicitly so any error from finishing the write is surfaced.
            write.shutdown().await?;

            Ok(())
        }

        /// Creates a ListingTable provider named `name` targeted at `location`, using `schema`.
        ///
        /// # Errors
        /// Returns an error if no `VORTEX` table factory is registered, if `schema` cannot be
        /// converted into a [`DFSchema`], or if the factory fails to create the table.
        pub async fn table_provider<S>(
            &self,
            name: &str,
            location: impl Into<String>,
            schema: S,
        ) -> anyhow::Result<Arc<dyn TableProvider>>
        where
            DFSchema: TryFrom<S>,
            anyhow::Error: From<<S as TryInto<DFSchema>>::Error>,
        {
            // Propagate a missing factory as an error rather than panicking, since this
            // function already reports failures through `anyhow::Result`.
            let factory = self
                .session
                .table_factory("VORTEX")
                .ok_or_else(|| anyhow::anyhow!("no VORTEX table factory registered"))?;

            let cmd = CreateExternalTable::builder(
                name,
                location.into(),
                "vortex",
                DFSchema::try_from(schema)?.into(),
            )
            .build();

            let table = factory.create(&self.session.state(), &cmd).await?;

            Ok(table)
        }
    }
}