datafusion_ffi/tests/
catalog.rs1use std::any::Any;
29use std::fmt::Debug;
30use std::sync::Arc;
31
32use arrow::datatypes::Schema;
33use async_trait::async_trait;
34use datafusion_catalog::{
35 CatalogProvider, CatalogProviderList, MemTable, MemoryCatalogProvider,
36 MemoryCatalogProviderList, MemorySchemaProvider, SchemaProvider, TableProvider,
37};
38use datafusion_common::{Result, exec_err};
39
40use crate::catalog_provider::FFI_CatalogProvider;
41use crate::catalog_provider_list::FFI_CatalogProviderList;
42use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
43
44#[derive(Debug)]
47pub struct FixedSchemaProvider {
48 inner: MemorySchemaProvider,
49}
50
51pub fn fruit_table() -> Arc<dyn TableProvider + 'static> {
52 use arrow::datatypes::{DataType, Field};
53 use datafusion_common::record_batch;
54
55 let schema = Arc::new(Schema::new(vec![
56 Field::new("units", DataType::Int32, true),
57 Field::new("price", DataType::Float64, true),
58 ]));
59
60 let partitions = vec![
61 record_batch!(
62 ("units", Int32, vec![10, 20, 30]),
63 ("price", Float64, vec![1.0, 2.0, 5.0])
64 )
65 .unwrap(),
66 record_batch!(
67 ("units", Int32, vec![5, 7]),
68 ("price", Float64, vec![1.5, 2.5])
69 )
70 .unwrap(),
71 ];
72
73 Arc::new(MemTable::try_new(schema, vec![partitions]).unwrap())
74}
75
76impl Default for FixedSchemaProvider {
77 fn default() -> Self {
78 let inner = MemorySchemaProvider::new();
79
80 let table = fruit_table();
81
82 let _ = inner
83 .register_table("purchases".to_string(), table)
84 .unwrap();
85
86 Self { inner }
87 }
88}
89
90#[async_trait]
91impl SchemaProvider for FixedSchemaProvider {
92 fn as_any(&self) -> &dyn Any {
93 self
94 }
95
96 fn table_names(&self) -> Vec<String> {
97 self.inner.table_names()
98 }
99
100 async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
101 self.inner.table(name).await
102 }
103
104 fn table_exist(&self, name: &str) -> bool {
105 self.inner.table_exist(name)
106 }
107
108 fn register_table(
109 &self,
110 name: String,
111 table: Arc<dyn TableProvider>,
112 ) -> Result<Option<Arc<dyn TableProvider>>> {
113 if name.as_str() != "sales" && name.as_str() != "purchases" {
114 return exec_err!(
115 "FixedSchemaProvider only provides two tables: sales and purchases"
116 );
117 }
118
119 self.inner.register_table(name, table)
120 }
121
122 fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
123 self.inner.deregister_table(name)
124 }
125}
126
127#[derive(Debug)]
130pub struct FixedCatalogProvider {
131 inner: MemoryCatalogProvider,
132}
133
134impl Default for FixedCatalogProvider {
135 fn default() -> Self {
136 let inner = MemoryCatalogProvider::new();
137
138 let _ = inner.register_schema("apple", Arc::new(FixedSchemaProvider::default()));
139
140 Self { inner }
141 }
142}
143
144impl CatalogProvider for FixedCatalogProvider {
145 fn as_any(&self) -> &dyn Any {
146 self
147 }
148
149 fn schema_names(&self) -> Vec<String> {
150 self.inner.schema_names()
151 }
152
153 fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
154 self.inner.schema(name)
155 }
156
157 fn register_schema(
158 &self,
159 name: &str,
160 schema: Arc<dyn SchemaProvider>,
161 ) -> Result<Option<Arc<dyn SchemaProvider>>> {
162 if !["apple", "banana", "cherry", "date"].contains(&name) {
163 return exec_err!(
164 "FixedCatalogProvider only provides four schemas: apple, banana, cherry, date"
165 );
166 }
167
168 self.inner.register_schema(name, schema)
169 }
170
171 fn deregister_schema(
172 &self,
173 name: &str,
174 cascade: bool,
175 ) -> Result<Option<Arc<dyn SchemaProvider>>> {
176 self.inner.deregister_schema(name, cascade)
177 }
178}
179
180pub(crate) extern "C" fn create_catalog_provider(
181 codec: FFI_LogicalExtensionCodec,
182) -> FFI_CatalogProvider {
183 let catalog_provider = Arc::new(FixedCatalogProvider::default());
184 FFI_CatalogProvider::new_with_ffi_codec(catalog_provider, None, codec)
185}
186
187#[derive(Debug)]
190pub struct FixedCatalogProviderList {
191 inner: MemoryCatalogProviderList,
192}
193
194impl Default for FixedCatalogProviderList {
195 fn default() -> Self {
196 let inner = MemoryCatalogProviderList::new();
197
198 let _ = inner.register_catalog(
199 "blue".to_owned(),
200 Arc::new(FixedCatalogProvider::default()),
201 );
202
203 Self { inner }
204 }
205}
206
207impl CatalogProviderList for FixedCatalogProviderList {
208 fn as_any(&self) -> &dyn Any {
209 self
210 }
211
212 fn catalog_names(&self) -> Vec<String> {
213 self.inner.catalog_names()
214 }
215
216 fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
217 self.inner.catalog(name)
218 }
219
220 fn register_catalog(
221 &self,
222 name: String,
223 catalog: Arc<dyn CatalogProvider>,
224 ) -> Option<Arc<dyn CatalogProvider>> {
225 if !["blue", "red", "green", "yellow"].contains(&name.as_str()) {
226 log::warn!(
227 "FixedCatalogProviderList only provides four catalogs: blue, red, green, yellow"
228 );
229 return None;
230 }
231
232 self.inner.register_catalog(name, catalog)
233 }
234}
235
236pub(crate) extern "C" fn create_catalog_provider_list(
237 codec: FFI_LogicalExtensionCodec,
238) -> FFI_CatalogProviderList {
239 let catalog_provider_list = Arc::new(FixedCatalogProviderList::default());
240 FFI_CatalogProviderList::new_with_ffi_codec(catalog_provider_list, None, codec)
241}