Skip to main content

vortex_datafusion/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Connectors to enable [DataFusion](https://docs.rs/datafusion/latest/datafusion/) to read [`Vortex`](https://docs.rs/crate/vortex/latest) data.
5#![deny(missing_docs)]
6use std::fmt::Debug;
7
8use datafusion_common::stats::Precision as DFPrecision;
9use vortex::expr::stats::Precision;
10
11mod convert;
12mod persistent;
13pub mod v2;
14
15#[cfg(test)]
16mod tests;
17
18pub use convert::exprs::ExpressionConvertor;
19pub use persistent::*;
20
21/// Extension trait to convert our [`Precision`](vortex::stats::Precision) to Datafusion's [`Precision`](datafusion_common::stats::Precision)
22trait PrecisionExt<T>
23where
24    T: Debug + Clone + PartialEq + Eq + PartialOrd,
25{
26    /// Convert `Precision` to the datafusion equivalent.
27    fn to_df(self) -> DFPrecision<T>;
28}
29
30impl<T> PrecisionExt<T> for Precision<T>
31where
32    T: Debug + Clone + PartialEq + Eq + PartialOrd,
33{
34    fn to_df(self) -> DFPrecision<T> {
35        match self {
36            Precision::Exact(v) => DFPrecision::Exact(v),
37            Precision::Inexact(v) => DFPrecision::Inexact(v),
38        }
39    }
40}
41
42impl<T> PrecisionExt<T> for Option<Precision<T>>
43where
44    T: Debug + Clone + PartialEq + Eq + PartialOrd,
45{
46    fn to_df(self) -> DFPrecision<T> {
47        match self {
48            Some(v) => v.to_df(),
49            None => DFPrecision::Absent,
50        }
51    }
52}
53
/// Shared test scaffolding: a DataFusion session wired to an in-memory object store with the
/// Vortex file format registered.
#[cfg(test)]
mod common_tests {
    use std::sync::Arc;
    use std::sync::LazyLock;

    use datafusion::arrow::array::RecordBatch;
    use datafusion::datasource::provider::DefaultTableFactory;
    use datafusion::execution::SessionStateBuilder;
    use datafusion::prelude::SessionContext;
    use datafusion_catalog::TableProvider;
    use datafusion_common::DFSchema;
    use datafusion_common::GetExt;
    use datafusion_expr::CreateExternalTable;
    use object_store::ObjectStore;
    use object_store::memory::InMemory;
    use url::Url;
    use vortex::VortexSessionDefault;
    use vortex::array::ArrayRef;
    use vortex::array::arrow::FromArrowArray;
    use vortex::file::WriteOptionsSessionExt;
    use vortex::io::VortexWrite;
    use vortex::io::object_store::ObjectStoreWrite;
    use vortex::session::VortexSession;

    use crate::VortexFormatFactory;
    use crate::VortexTableOptions;

    /// Vortex session used to write test files; built lazily on first use and shared by all tests.
    static VX_SESSION: LazyLock<VortexSession> = LazyLock::new(VortexSession::default);

    /// A DataFusion [`SessionContext`] paired with the in-memory object store it reads from.
    pub struct TestSessionContext {
        /// In-memory store registered for the `file://` URL; test data is written here.
        pub store: Arc<dyn ObjectStore>,
        /// Session with the Vortex file format and a table factory for it registered.
        pub session: SessionContext,
    }

    impl Default for TestSessionContext {
        /// Builds a context with projection pushdown disabled.
        fn default() -> Self {
            Self::new(false)
        }
    }

    impl TestSessionContext {
        /// Create a new test session context with the given projection pushdown setting.
        ///
        /// A [`DefaultTableFactory`] is registered under the Vortex format's upper-cased file
        /// extension (the key [`Self::table_provider`] looks up), the in-memory store is mounted
        /// at `file://`, and URL tables are enabled on the resulting session.
        pub fn new(projection_pushdown: bool) -> Self {
            let store = Arc::new(InMemory::new());
            let opts = VortexTableOptions {
                projection_pushdown,
                ..Default::default()
            };
            let factory = Arc::new(VortexFormatFactory::new().with_options(opts));
            let mut session_state_builder = SessionStateBuilder::new()
                .with_default_features()
                .with_table_factory(
                    factory.get_ext().to_uppercase(),
                    Arc::new(DefaultTableFactory::new()),
                )
                .with_object_store(
                    &Url::try_from("file://").expect("`file://` is a valid URL"),
                    store.clone(),
                );

            // Append the Vortex format to the builder's file-format list. If the builder holds no
            // list (`None`), the format is silently not registered.
            // NOTE(review): presumably `with_default_features` guarantees `Some` here — confirm.
            if let Some(file_formats) = session_state_builder.file_formats() {
                file_formats.push(factory as _);
            }

            let session: SessionContext =
                SessionContext::new_with_state(session_state_builder.build()).enable_url_table();

            Self { store, session }
        }

        /// Write arrow data into a vortex file at `path` in this context's object store.
        ///
        /// # Errors
        /// Returns an error if the batch cannot be converted to a Vortex array, or if opening,
        /// writing or shutting down the object-store writer fails.
        pub async fn write_arrow_batch<P>(&self, path: P, batch: &RecordBatch) -> anyhow::Result<()>
        where
            P: Into<object_store::path::Path>,
        {
            let array = ArrayRef::from_arrow(batch, false)?;
            let mut write = ObjectStoreWrite::new(self.store.clone(), &path.into()).await?;
            VX_SESSION
                .write_options()
                .write(&mut write, array.to_array_stream())
                .await?;
            // Finalize the writer so the object becomes visible in the store.
            write.shutdown().await?;

            Ok(())
        }

        /// Creates a ListingTable provider targeted at the provided path
        ///
        /// # Errors
        /// Returns an error if no table factory is registered for the Vortex format, if `schema`
        /// cannot be converted into a [`DFSchema`], or if the factory fails to create the table.
        pub async fn table_provider<S>(
            &self,
            name: &str,
            location: impl Into<String>,
            schema: S,
        ) -> anyhow::Result<Arc<dyn TableProvider>>
        where
            DFSchema: TryFrom<S>,
            anyhow::Error: From<<S as TryInto<DFSchema>>::Error>,
        {
            // Derive the file type and factory key from the format's extension, exactly as `new`
            // registers them, instead of repeating hard-coded copies of those strings.
            let file_type = VortexFormatFactory::new().get_ext();
            let factory_key = file_type.to_uppercase();
            let factory = self.session.table_factory(&factory_key).ok_or_else(|| {
                anyhow::anyhow!("no table factory registered for `{}`", factory_key)
            })?;

            let cmd = CreateExternalTable::builder(
                name,
                location.into(),
                file_type.as_str(),
                DFSchema::try_from(schema)?.into(),
            )
            .build();

            let table = factory.create(&self.session.state(), &cmd).await?;

            Ok(table)
        }
    }
}