datafusion_ffi/tests/
catalog.rs1use std::fmt::Debug;
29use std::sync::Arc;
30
31use arrow::datatypes::Schema;
32use async_trait::async_trait;
33use datafusion_catalog::{
34 CatalogProvider, CatalogProviderList, MemTable, MemoryCatalogProvider,
35 MemoryCatalogProviderList, MemorySchemaProvider, SchemaProvider, TableProvider,
36};
37use datafusion_common::{Result, exec_err};
38
39use crate::catalog_provider::FFI_CatalogProvider;
40use crate::catalog_provider_list::FFI_CatalogProviderList;
41use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
42
43#[derive(Debug)]
46pub struct FixedSchemaProvider {
47 inner: MemorySchemaProvider,
48}
49
50pub fn fruit_table() -> Arc<dyn TableProvider + 'static> {
51 use arrow::datatypes::{DataType, Field};
52 use datafusion_common::record_batch;
53
54 let schema = Arc::new(Schema::new(vec![
55 Field::new("units", DataType::Int32, true),
56 Field::new("price", DataType::Float64, true),
57 ]));
58
59 let partitions = vec![
60 record_batch!(
61 ("units", Int32, vec![10, 20, 30]),
62 ("price", Float64, vec![1.0, 2.0, 5.0])
63 )
64 .unwrap(),
65 record_batch!(
66 ("units", Int32, vec![5, 7]),
67 ("price", Float64, vec![1.5, 2.5])
68 )
69 .unwrap(),
70 ];
71
72 Arc::new(MemTable::try_new(schema, vec![partitions]).unwrap())
73}
74
75impl Default for FixedSchemaProvider {
76 fn default() -> Self {
77 let inner = MemorySchemaProvider::new();
78
79 let table = fruit_table();
80
81 let _ = inner
82 .register_table("purchases".to_string(), table)
83 .unwrap();
84
85 Self { inner }
86 }
87}
88
89#[async_trait]
90impl SchemaProvider for FixedSchemaProvider {
91 fn table_names(&self) -> Vec<String> {
92 self.inner.table_names()
93 }
94
95 async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
96 self.inner.table(name).await
97 }
98
99 fn table_exist(&self, name: &str) -> bool {
100 self.inner.table_exist(name)
101 }
102
103 fn register_table(
104 &self,
105 name: String,
106 table: Arc<dyn TableProvider>,
107 ) -> Result<Option<Arc<dyn TableProvider>>> {
108 if name.as_str() != "sales" && name.as_str() != "purchases" {
109 return exec_err!(
110 "FixedSchemaProvider only provides two tables: sales and purchases"
111 );
112 }
113
114 self.inner.register_table(name, table)
115 }
116
117 fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
118 self.inner.deregister_table(name)
119 }
120}
121
122#[derive(Debug)]
125pub struct FixedCatalogProvider {
126 inner: MemoryCatalogProvider,
127}
128
129impl Default for FixedCatalogProvider {
130 fn default() -> Self {
131 let inner = MemoryCatalogProvider::new();
132
133 let _ = inner.register_schema("apple", Arc::new(FixedSchemaProvider::default()));
134
135 Self { inner }
136 }
137}
138
139impl CatalogProvider for FixedCatalogProvider {
140 fn schema_names(&self) -> Vec<String> {
141 self.inner.schema_names()
142 }
143
144 fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
145 self.inner.schema(name)
146 }
147
148 fn register_schema(
149 &self,
150 name: &str,
151 schema: Arc<dyn SchemaProvider>,
152 ) -> Result<Option<Arc<dyn SchemaProvider>>> {
153 if !["apple", "banana", "cherry", "date"].contains(&name) {
154 return exec_err!(
155 "FixedCatalogProvider only provides four schemas: apple, banana, cherry, date"
156 );
157 }
158
159 self.inner.register_schema(name, schema)
160 }
161
162 fn deregister_schema(
163 &self,
164 name: &str,
165 cascade: bool,
166 ) -> Result<Option<Arc<dyn SchemaProvider>>> {
167 self.inner.deregister_schema(name, cascade)
168 }
169}
170
171pub(crate) extern "C" fn create_catalog_provider(
172 codec: FFI_LogicalExtensionCodec,
173) -> FFI_CatalogProvider {
174 let catalog_provider = Arc::new(FixedCatalogProvider::default());
175 FFI_CatalogProvider::new_with_ffi_codec(catalog_provider, None, codec)
176}
177
178#[derive(Debug)]
181pub struct FixedCatalogProviderList {
182 inner: MemoryCatalogProviderList,
183}
184
185impl Default for FixedCatalogProviderList {
186 fn default() -> Self {
187 let inner = MemoryCatalogProviderList::new();
188
189 let _ = inner.register_catalog(
190 "blue".to_owned(),
191 Arc::new(FixedCatalogProvider::default()),
192 );
193
194 Self { inner }
195 }
196}
197
198impl CatalogProviderList for FixedCatalogProviderList {
199 fn catalog_names(&self) -> Vec<String> {
200 self.inner.catalog_names()
201 }
202
203 fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
204 self.inner.catalog(name)
205 }
206
207 fn register_catalog(
208 &self,
209 name: String,
210 catalog: Arc<dyn CatalogProvider>,
211 ) -> Option<Arc<dyn CatalogProvider>> {
212 if !["blue", "red", "green", "yellow"].contains(&name.as_str()) {
213 log::warn!(
214 "FixedCatalogProviderList only provides four catalogs: blue, red, green, yellow"
215 );
216 return None;
217 }
218
219 self.inner.register_catalog(name, catalog)
220 }
221}
222
223pub(crate) extern "C" fn create_catalog_provider_list(
224 codec: FFI_LogicalExtensionCodec,
225) -> FFI_CatalogProviderList {
226 let catalog_provider_list = Arc::new(FixedCatalogProviderList::default());
227 FFI_CatalogProviderList::new_with_ffi_codec(catalog_provider_list, None, codec)
228}