Skip to main content

vortex_datafusion/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Connectors to enable [DataFusion](https://docs.rs/datafusion/latest/datafusion/) to read [`Vortex`](https://docs.rs/crate/vortex/latest) data.
5#![deny(missing_docs)]
6use std::fmt::Debug;
7
8use datafusion_common::stats::Precision as DFPrecision;
9use vortex::expr::stats::Precision;
10
11mod convert;
12mod persistent;
13pub mod v2;
14
15#[cfg(test)]
16mod tests;
17
18pub use convert::exprs::ExpressionConvertor;
19pub use persistent::*;
20
21/// Extension trait to convert our [`Precision`](vortex::stats::Precision) to Datafusion's [`Precision`](datafusion_common::stats::Precision)
22trait PrecisionExt<T>
23where
24    T: Debug + Clone + PartialEq + Eq + PartialOrd,
25{
26    /// Convert `Precision` to the datafusion equivalent.
27    fn to_df(self) -> DFPrecision<T>;
28}
29
30impl<T> PrecisionExt<T> for Precision<T>
31where
32    T: Debug + Clone + PartialEq + Eq + PartialOrd,
33{
34    fn to_df(self) -> DFPrecision<T> {
35        match self {
36            Precision::Exact(v) => DFPrecision::Exact(v),
37            Precision::Inexact(v) => DFPrecision::Inexact(v),
38        }
39    }
40}
41
42impl<T> PrecisionExt<T> for Option<Precision<T>>
43where
44    T: Debug + Clone + PartialEq + Eq + PartialOrd,
45{
46    fn to_df(self) -> DFPrecision<T> {
47        match self {
48            Some(v) => v.to_df(),
49            None => DFPrecision::Absent,
50        }
51    }
52}
53
#[cfg(test)]
mod common_tests {
    use std::sync::Arc;
    use std::sync::LazyLock;

    use datafusion::arrow::array::RecordBatch;
    use datafusion::datasource::provider::DefaultTableFactory;
    use datafusion::execution::SessionStateBuilder;
    use datafusion::prelude::SessionContext;
    use datafusion_catalog::TableProvider;
    use datafusion_common::DFSchema;
    use datafusion_common::GetExt;
    use datafusion_expr::CreateExternalTable;
    use object_store::ObjectStore;
    use object_store::memory::InMemory;
    use url::Url;
    use vortex::VortexSessionDefault;
    use vortex::array::ArrayRef;
    use vortex::array::arrow::FromArrowArray;
    use vortex::file::WriteOptionsSessionExt;
    use vortex::io::VortexWrite;
    use vortex::io::object_store::ObjectStoreWrite;
    use vortex::session::VortexSession;

    use crate::VortexFormatFactory;
    use crate::VortexTableOptions;

    /// Lazily-initialized Vortex session shared by the helpers in this module when writing
    /// Vortex files.
    static VX_SESSION: LazyLock<VortexSession> = LazyLock::new(VortexSession::default);

    /// Test fixture pairing a DataFusion session with the object store registered on it.
    pub struct TestSessionContext {
        /// In-memory object store registered with `session` under the `file://` URL.
        pub store: Arc<dyn ObjectStore>,
        /// DataFusion session configured by [`TestSessionContext::new`].
        pub session: SessionContext,
    }

    impl Default for TestSessionContext {
        /// Equivalent to `TestSessionContext::new(false)`, i.e. projection pushdown disabled.
        fn default() -> Self {
            Self::new(false)
        }
    }

    impl TestSessionContext {
        /// Create a new test session context with the given projection pushdown setting.
        ///
        /// The session is backed by a fresh [`InMemory`] object store registered for the
        /// `file://` URL, has a [`DefaultTableFactory`] registered under the upper-cased
        /// extension of the Vortex format factory, and has URL tables enabled.
        ///
        /// # Panics
        ///
        /// Panics if the constant `file://` URL fails to parse.
        pub fn new(projection_pushdown: bool) -> Self {
            let store = Arc::new(InMemory::new());
            let opts = VortexTableOptions {
                projection_pushdown,
                ..Default::default()
            };
            let factory = Arc::new(VortexFormatFactory::new().with_options(opts));
            let mut session_state_builder = SessionStateBuilder::new()
                .with_default_features()
                .with_table_factory(
                    factory.get_ext().to_uppercase(),
                    Arc::new(DefaultTableFactory::new()),
                )
                .with_object_store(
                    &Url::try_from("file://").expect("`file://` is a valid constant URL"),
                    Arc::<InMemory>::clone(&store),
                );

            // Register the Vortex format alongside whatever formats the builder already holds.
            // If the builder exposes no format list, nothing is registered here.
            if let Some(file_formats) = session_state_builder.file_formats() {
                file_formats.push(factory as _);
            }

            let session: SessionContext =
                SessionContext::new_with_state(session_state_builder.build()).enable_url_table();

            Self { store, session }
        }

        /// Write arrow data into a vortex file.
        ///
        /// Converts `batch` into a Vortex array, streams it to `path` inside this context's
        /// object store using the shared [`VX_SESSION`] write options, then shuts the writer
        /// down.
        ///
        /// # Errors
        ///
        /// Returns an error if the Arrow conversion, opening the writer, the write itself, or
        /// the final shutdown fails.
        pub async fn write_arrow_batch<P>(&self, path: P, batch: &RecordBatch) -> anyhow::Result<()>
        where
            P: Into<object_store::path::Path>,
        {
            let array = ArrayRef::from_arrow(batch, false)?;
            let mut write = ObjectStoreWrite::new(Arc::clone(&self.store), &path.into()).await?;
            VX_SESSION
                .write_options()
                .write(&mut write, array.to_array_stream())
                .await?;
            write.shutdown().await?;

            Ok(())
        }

        /// Creates a ListingTable provider targeted at the provided path
        ///
        /// Looks up the table factory registered under `"VORTEX"` and uses it to build an
        /// external table named `name` at `location` with the `"vortex"` file type and the
        /// supplied `schema`.
        ///
        /// # Errors
        ///
        /// Returns an error if no table factory is registered under `"VORTEX"`, if `schema`
        /// cannot be converted into a [`DFSchema`], or if the factory fails to create the
        /// table.
        pub async fn table_provider<S>(
            &self,
            name: &str,
            location: impl Into<String>,
            schema: S,
        ) -> anyhow::Result<Arc<dyn TableProvider>>
        where
            DFSchema: TryFrom<S>,
            anyhow::Error: From<<S as TryInto<DFSchema>>::Error>,
        {
            // Surface a missing registration as an error rather than panicking, since this
            // function already returns a `Result`.
            let factory = self
                .session
                .table_factory("VORTEX")
                .ok_or_else(|| anyhow::anyhow!("no table factory registered for `VORTEX`"))?;

            let cmd = CreateExternalTable::builder(
                name,
                location.into(),
                "vortex",
                DFSchema::try_from(schema)?.into(),
            )
            .build();

            let table = factory.create(&self.session.state(), &cmd).await?;

            Ok(table)
        }
    }
}