1#![deny(missing_docs)]
6use std::fmt::Debug;
7
8use datafusion_common::stats::Precision as DFPrecision;
9use vortex::expr::stats::Precision;
10
11mod convert;
12mod persistent;
13pub mod v2;
14
15#[cfg(test)]
16mod tests;
17
18pub use convert::exprs::ExpressionConvertor;
19pub use persistent::*;
20
21trait PrecisionExt<T>
23where
24 T: Debug + Clone + PartialEq + Eq + PartialOrd,
25{
26 fn to_df(self) -> DFPrecision<T>;
28}
29
30impl<T> PrecisionExt<T> for Precision<T>
31where
32 T: Debug + Clone + PartialEq + Eq + PartialOrd,
33{
34 fn to_df(self) -> DFPrecision<T> {
35 match self {
36 Precision::Exact(v) => DFPrecision::Exact(v),
37 Precision::Inexact(v) => DFPrecision::Inexact(v),
38 }
39 }
40}
41
42impl<T> PrecisionExt<T> for Option<Precision<T>>
43where
44 T: Debug + Clone + PartialEq + Eq + PartialOrd,
45{
46 fn to_df(self) -> DFPrecision<T> {
47 match self {
48 Some(v) => v.to_df(),
49 None => DFPrecision::Absent,
50 }
51 }
52}
53
54#[cfg(test)]
55mod common_tests {
56 use std::sync::Arc;
57 use std::sync::LazyLock;
58
59 use datafusion::arrow::array::RecordBatch;
60 use datafusion::datasource::provider::DefaultTableFactory;
61 use datafusion::execution::SessionStateBuilder;
62 use datafusion::prelude::SessionContext;
63 use datafusion_catalog::TableProvider;
64 use datafusion_common::DFSchema;
65 use datafusion_common::GetExt;
66 use datafusion_expr::CreateExternalTable;
67 use object_store::ObjectStore;
68 use object_store::memory::InMemory;
69 use url::Url;
70 use vortex::VortexSessionDefault;
71 use vortex::array::ArrayRef;
72 use vortex::array::arrow::FromArrowArray;
73 use vortex::file::WriteOptionsSessionExt;
74 use vortex::io::VortexWrite;
75 use vortex::io::object_store::ObjectStoreWrite;
76 use vortex::session::VortexSession;
77
78 use crate::VortexFormatFactory;
79 use crate::VortexTableOptions;
80
81 static VX_SESSION: LazyLock<VortexSession> = LazyLock::new(VortexSession::default);
82
83 pub struct TestSessionContext {
84 pub store: Arc<dyn ObjectStore>,
85 pub session: SessionContext,
86 }
87
88 impl Default for TestSessionContext {
89 fn default() -> Self {
90 Self::new(false)
91 }
92 }
93
94 impl TestSessionContext {
95 pub fn new(projection_pushdown: bool) -> Self {
97 let store = Arc::new(InMemory::new());
98 let opts = VortexTableOptions {
99 projection_pushdown,
100 ..Default::default()
101 };
102 let factory = Arc::new(VortexFormatFactory::new().with_options(opts));
103 let mut session_state_builder = SessionStateBuilder::new()
104 .with_default_features()
105 .with_table_factory(
106 factory.get_ext().to_uppercase(),
107 Arc::new(DefaultTableFactory::new()),
108 )
109 .with_object_store(&Url::try_from("file://").unwrap(), store.clone());
110
111 if let Some(file_formats) = session_state_builder.file_formats() {
112 file_formats.push(factory as _);
113 }
114
115 let session: SessionContext =
116 SessionContext::new_with_state(session_state_builder.build()).enable_url_table();
117
118 Self { store, session }
119 }
120
121 pub async fn write_arrow_batch<P>(&self, path: P, batch: &RecordBatch) -> anyhow::Result<()>
123 where
124 P: Into<object_store::path::Path>,
125 {
126 let array = ArrayRef::from_arrow(batch, false)?;
127 let mut write = ObjectStoreWrite::new(self.store.clone(), &path.into()).await?;
128 VX_SESSION
129 .write_options()
130 .write(&mut write, array.to_array_stream())
131 .await?;
132 write.shutdown().await?;
133
134 Ok(())
135 }
136
137 pub async fn table_provider<S>(
139 &self,
140 name: &str,
141 location: impl Into<String>,
142 schema: S,
143 ) -> anyhow::Result<Arc<dyn TableProvider>>
144 where
145 DFSchema: TryFrom<S>,
146 anyhow::Error: From<<S as TryInto<DFSchema>>::Error>,
147 {
148 let factory = self.session.table_factory("VORTEX").unwrap();
149
150 let cmd = CreateExternalTable::builder(
151 name,
152 location.into(),
153 "vortex",
154 DFSchema::try_from(schema)?.into(),
155 )
156 .build();
157
158 let table = factory.create(&self.session.state(), &cmd).await?;
159
160 Ok(table)
161 }
162 }
163}