1#![deny(missing_docs)]
6use std::fmt::Debug;
7
8use datafusion_common::stats::Precision as DFPrecision;
9use vortex::expr::stats::Precision;
10
11mod convert;
12mod persistent;
13
14#[cfg(test)]
15mod tests;
16
17pub use convert::exprs::ExpressionConvertor;
18pub use persistent::*;
19
20trait PrecisionExt<T>
22where
23 T: Debug + Clone + PartialEq + Eq + PartialOrd,
24{
25 fn to_df(self) -> DFPrecision<T>;
27}
28
29impl<T> PrecisionExt<T> for Precision<T>
30where
31 T: Debug + Clone + PartialEq + Eq + PartialOrd,
32{
33 fn to_df(self) -> DFPrecision<T> {
34 match self {
35 Precision::Exact(v) => DFPrecision::Exact(v),
36 Precision::Inexact(v) => DFPrecision::Inexact(v),
37 }
38 }
39}
40
41impl<T> PrecisionExt<T> for Option<Precision<T>>
42where
43 T: Debug + Clone + PartialEq + Eq + PartialOrd,
44{
45 fn to_df(self) -> DFPrecision<T> {
46 match self {
47 Some(v) => v.to_df(),
48 None => DFPrecision::Absent,
49 }
50 }
51}
52
#[cfg(test)]
mod common_tests {
    //! Shared fixtures for this crate's tests: a DataFusion session wired to an
    //! in-memory object store, plus helpers to write Vortex files into it.

    use std::sync::Arc;
    use std::sync::LazyLock;

    use datafusion::arrow::array::RecordBatch;
    use datafusion::datasource::provider::DefaultTableFactory;
    use datafusion::execution::SessionStateBuilder;
    use datafusion::prelude::SessionContext;
    use datafusion_catalog::TableProvider;
    use datafusion_common::DFSchema;
    use datafusion_common::GetExt;
    use datafusion_expr::CreateExternalTable;
    use object_store::ObjectStore;
    use object_store::memory::InMemory;
    use url::Url;
    use vortex::VortexSessionDefault;
    use vortex::array::ArrayRef;
    use vortex::array::arrow::FromArrowArray;
    use vortex::file::WriteOptionsSessionExt;
    use vortex::io::VortexWrite;
    use vortex::io::object_store::ObjectStoreWrite;
    use vortex::session::VortexSession;

    use crate::VortexFormatFactory;
    use crate::VortexTableOptions;

    /// Lazily-initialised Vortex session used by [`TestSessionContext::write_arrow_batch`].
    static VX_SESSION: LazyLock<VortexSession> = LazyLock::new(VortexSession::default);

    /// Test fixture pairing a DataFusion [`SessionContext`] with the object
    /// store that was registered on it.
    pub struct TestSessionContext {
        /// Object store registered with `session` for the `file://` URL.
        pub store: Arc<dyn ObjectStore>,
        /// DataFusion session built by [`TestSessionContext::new`].
        pub session: SessionContext,
    }

    impl Default for TestSessionContext {
        /// Equivalent to `TestSessionContext::new(false)`.
        fn default() -> Self {
            Self::new(false)
        }
    }

    impl TestSessionContext {
        /// Builds a session backed by a fresh in-memory object store.
        ///
        /// `projection_pushdown` is forwarded to [`VortexTableOptions`]; every
        /// other option keeps its default value.
        ///
        /// # Panics
        ///
        /// Panics if the literal `"file://"` fails to parse as a URL.
        pub fn new(projection_pushdown: bool) -> Self {
            let store = Arc::new(InMemory::new());
            let opts = VortexTableOptions {
                projection_pushdown,
                ..Default::default()
            };
            let factory = Arc::new(VortexFormatFactory::new().with_options(opts));

            // The table factory is keyed by the upper-cased format extension.
            let mut session_state_builder = SessionStateBuilder::new()
                .with_default_features()
                .with_table_factory(
                    factory.get_ext().to_uppercase(),
                    Arc::new(DefaultTableFactory::new()),
                )
                .with_object_store(
                    &Url::try_from("file://").expect("\"file://\" is a valid URL literal"),
                    // `store.clone()` (not `Arc::clone`) so the result can coerce
                    // from `Arc<InMemory>` to `Arc<dyn ObjectStore>`.
                    store.clone(),
                );

            // Append the Vortex format to the builder's file-format list when
            // the builder already holds one; otherwise nothing is registered.
            if let Some(file_formats) = session_state_builder.file_formats() {
                file_formats.push(factory as _);
            }

            let session: SessionContext =
                SessionContext::new_with_state(session_state_builder.build()).enable_url_table();

            Self { store, session }
        }

        /// Converts `batch` to a Vortex array and writes it as a Vortex file at
        /// `path` in this context's object store.
        ///
        /// # Errors
        ///
        /// Returns an error if the Arrow conversion, opening the writer, the
        /// write itself, or shutting the writer down fails.
        pub async fn write_arrow_batch<P>(&self, path: P, batch: &RecordBatch) -> anyhow::Result<()>
        where
            P: Into<object_store::path::Path>,
        {
            // NOTE(review): the `false` argument is presumably the `nullable`
            // flag of `from_arrow` — confirm against the Vortex API.
            let array = ArrayRef::from_arrow(batch, false)?;
            let mut write = ObjectStoreWrite::new(self.store.clone(), &path.into()).await?;
            VX_SESSION
                .write_options()
                .write(&mut write, array.to_array_stream())
                .await?;
            write.shutdown().await?;

            Ok(())
        }

        /// Creates a table provider named `name` for the data at `location`,
        /// using the table factory registered under `"VORTEX"`.
        ///
        /// # Errors
        ///
        /// Returns an error if no `"VORTEX"` table factory is registered on the
        /// session, if `schema` cannot be converted into a [`DFSchema`], or if
        /// the factory fails to create the table.
        pub async fn table_provider<S>(
            &self,
            name: &str,
            location: impl Into<String>,
            schema: S,
        ) -> anyhow::Result<Arc<dyn TableProvider>>
        where
            DFSchema: TryFrom<S>,
            anyhow::Error: From<<S as TryInto<DFSchema>>::Error>,
        {
            // Report a missing registration through the `Result` instead of
            // panicking, since this function already returns one.
            let factory = self.session.table_factory("VORTEX").ok_or_else(|| {
                anyhow::anyhow!("no table factory registered under \"VORTEX\"")
            })?;

            let cmd = CreateExternalTable::builder(
                name,
                location.into(),
                "vortex",
                DFSchema::try_from(schema)?.into(),
            )
            .build();

            let table = factory.create(&self.session.state(), &cmd).await?;

            Ok(table)
        }
    }
}