datafusion_ffi/tests/
catalog.rs1use std::{any::Any, fmt::Debug, sync::Arc};
29
30use crate::catalog_provider::FFI_CatalogProvider;
31use arrow::datatypes::Schema;
32use async_trait::async_trait;
33use datafusion::{
34 catalog::{
35 CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider,
36 TableProvider,
37 },
38 common::exec_err,
39 datasource::MemTable,
40 error::{DataFusionError, Result},
41};
42
43#[derive(Debug)]
46pub struct FixedSchemaProvider {
47 inner: MemorySchemaProvider,
48}
49
50pub fn fruit_table() -> Arc<dyn TableProvider + 'static> {
51 use arrow::datatypes::{DataType, Field};
52 use datafusion::common::record_batch;
53
54 let schema = Arc::new(Schema::new(vec![
55 Field::new("units", DataType::Int32, true),
56 Field::new("price", DataType::Float64, true),
57 ]));
58
59 let partitions = vec![
60 record_batch!(
61 ("units", Int32, vec![10, 20, 30]),
62 ("price", Float64, vec![1.0, 2.0, 5.0])
63 )
64 .unwrap(),
65 record_batch!(
66 ("units", Int32, vec![5, 7]),
67 ("price", Float64, vec![1.5, 2.5])
68 )
69 .unwrap(),
70 ];
71
72 Arc::new(MemTable::try_new(schema, vec![partitions]).unwrap())
73}
74
75impl Default for FixedSchemaProvider {
76 fn default() -> Self {
77 let inner = MemorySchemaProvider::new();
78
79 let table = fruit_table();
80
81 let _ = inner
82 .register_table("purchases".to_string(), table)
83 .unwrap();
84
85 Self { inner }
86 }
87}
88
89#[async_trait]
90impl SchemaProvider for FixedSchemaProvider {
91 fn as_any(&self) -> &dyn Any {
92 self
93 }
94
95 fn table_names(&self) -> Vec<String> {
96 self.inner.table_names()
97 }
98
99 async fn table(
100 &self,
101 name: &str,
102 ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
103 self.inner.table(name).await
104 }
105
106 fn table_exist(&self, name: &str) -> bool {
107 self.inner.table_exist(name)
108 }
109
110 fn register_table(
111 &self,
112 name: String,
113 table: Arc<dyn TableProvider>,
114 ) -> Result<Option<Arc<dyn TableProvider>>> {
115 if name.as_str() != "sales" && name.as_str() != "purchases" {
116 return exec_err!(
117 "FixedSchemaProvider only provides two tables: sales and purchases"
118 );
119 }
120
121 self.inner.register_table(name, table)
122 }
123
124 fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
125 self.inner.deregister_table(name)
126 }
127}
128
129#[derive(Debug)]
132pub struct FixedCatalogProvider {
133 inner: MemoryCatalogProvider,
134}
135
136impl Default for FixedCatalogProvider {
137 fn default() -> Self {
138 let inner = MemoryCatalogProvider::new();
139
140 let _ = inner.register_schema("apple", Arc::new(FixedSchemaProvider::default()));
141
142 Self { inner }
143 }
144}
145
146impl CatalogProvider for FixedCatalogProvider {
147 fn as_any(&self) -> &dyn Any {
148 self
149 }
150
151 fn schema_names(&self) -> Vec<String> {
152 self.inner.schema_names()
153 }
154
155 fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
156 self.inner.schema(name)
157 }
158
159 fn register_schema(
160 &self,
161 name: &str,
162 schema: Arc<dyn SchemaProvider>,
163 ) -> Result<Option<Arc<dyn SchemaProvider>>> {
164 if !["apple", "banana", "cherry", "date"].contains(&name) {
165 return exec_err!("FixedCatalogProvider only provides four schemas: apple, banana, cherry, date");
166 }
167
168 self.inner.register_schema(name, schema)
169 }
170
171 fn deregister_schema(
172 &self,
173 name: &str,
174 cascade: bool,
175 ) -> Result<Option<Arc<dyn SchemaProvider>>> {
176 self.inner.deregister_schema(name, cascade)
177 }
178}
179
180pub(crate) extern "C" fn create_catalog_provider() -> FFI_CatalogProvider {
181 let catalog_provider = Arc::new(FixedCatalogProvider::default());
182 FFI_CatalogProvider::new(catalog_provider, None)
183}