datafusion_ffi/tests/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Test fixtures for the FFI catalog bindings. This module defines a schema
//! provider, a catalog provider, and a catalog provider list. Each one wraps
//! DataFusion's in-memory implementation, starts out with a single entry, and
//! rejects registrations whose names fall outside a fixed allow-list. It also
//! defines `extern "C"` constructors that wrap the catalog provider and the
//! catalog provider list in `FFI_CatalogProvider` and
//! `FFI_CatalogProviderList`, so they can be exercised across the FFI
//! boundary.
27
28use std::any::Any;
29use std::fmt::Debug;
30use std::sync::Arc;
31
32use arrow::datatypes::Schema;
33use async_trait::async_trait;
34use datafusion_catalog::{
35    CatalogProvider, CatalogProviderList, MemTable, MemoryCatalogProvider,
36    MemoryCatalogProviderList, MemorySchemaProvider, SchemaProvider, TableProvider,
37};
38use datafusion_common::{Result, exec_err};
39
40use crate::catalog_provider::FFI_CatalogProvider;
41use crate::catalog_provider_list::FFI_CatalogProviderList;
42use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
43
44/// This schema provider is intended only for unit tests. It prepopulates with one
45/// table and only allows for tables named sales and purchases.
46#[derive(Debug)]
47pub struct FixedSchemaProvider {
48    inner: MemorySchemaProvider,
49}
50
51pub fn fruit_table() -> Arc<dyn TableProvider + 'static> {
52    use arrow::datatypes::{DataType, Field};
53    use datafusion_common::record_batch;
54
55    let schema = Arc::new(Schema::new(vec![
56        Field::new("units", DataType::Int32, true),
57        Field::new("price", DataType::Float64, true),
58    ]));
59
60    let partitions = vec![
61        record_batch!(
62            ("units", Int32, vec![10, 20, 30]),
63            ("price", Float64, vec![1.0, 2.0, 5.0])
64        )
65        .unwrap(),
66        record_batch!(
67            ("units", Int32, vec![5, 7]),
68            ("price", Float64, vec![1.5, 2.5])
69        )
70        .unwrap(),
71    ];
72
73    Arc::new(MemTable::try_new(schema, vec![partitions]).unwrap())
74}
75
76impl Default for FixedSchemaProvider {
77    fn default() -> Self {
78        let inner = MemorySchemaProvider::new();
79
80        let table = fruit_table();
81
82        let _ = inner
83            .register_table("purchases".to_string(), table)
84            .unwrap();
85
86        Self { inner }
87    }
88}
89
90#[async_trait]
91impl SchemaProvider for FixedSchemaProvider {
92    fn as_any(&self) -> &dyn Any {
93        self
94    }
95
96    fn table_names(&self) -> Vec<String> {
97        self.inner.table_names()
98    }
99
100    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
101        self.inner.table(name).await
102    }
103
104    fn table_exist(&self, name: &str) -> bool {
105        self.inner.table_exist(name)
106    }
107
108    fn register_table(
109        &self,
110        name: String,
111        table: Arc<dyn TableProvider>,
112    ) -> Result<Option<Arc<dyn TableProvider>>> {
113        if name.as_str() != "sales" && name.as_str() != "purchases" {
114            return exec_err!(
115                "FixedSchemaProvider only provides two tables: sales and purchases"
116            );
117        }
118
119        self.inner.register_table(name, table)
120    }
121
122    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
123        self.inner.deregister_table(name)
124    }
125}
126
127/// This catalog provider is intended only for unit tests. It prepopulates with one
128/// schema and only allows for schemas named after four types of fruit.
129#[derive(Debug)]
130pub struct FixedCatalogProvider {
131    inner: MemoryCatalogProvider,
132}
133
134impl Default for FixedCatalogProvider {
135    fn default() -> Self {
136        let inner = MemoryCatalogProvider::new();
137
138        let _ = inner.register_schema("apple", Arc::new(FixedSchemaProvider::default()));
139
140        Self { inner }
141    }
142}
143
144impl CatalogProvider for FixedCatalogProvider {
145    fn as_any(&self) -> &dyn Any {
146        self
147    }
148
149    fn schema_names(&self) -> Vec<String> {
150        self.inner.schema_names()
151    }
152
153    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
154        self.inner.schema(name)
155    }
156
157    fn register_schema(
158        &self,
159        name: &str,
160        schema: Arc<dyn SchemaProvider>,
161    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
162        if !["apple", "banana", "cherry", "date"].contains(&name) {
163            return exec_err!(
164                "FixedCatalogProvider only provides four schemas: apple, banana, cherry, date"
165            );
166        }
167
168        self.inner.register_schema(name, schema)
169    }
170
171    fn deregister_schema(
172        &self,
173        name: &str,
174        cascade: bool,
175    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
176        self.inner.deregister_schema(name, cascade)
177    }
178}
179
180pub(crate) extern "C" fn create_catalog_provider(
181    codec: FFI_LogicalExtensionCodec,
182) -> FFI_CatalogProvider {
183    let catalog_provider = Arc::new(FixedCatalogProvider::default());
184    FFI_CatalogProvider::new_with_ffi_codec(catalog_provider, None, codec)
185}
186
187/// This catalog provider list is intended only for unit tests. It prepopulates with one
188/// catalog and only allows for catalogs named after four colors.
189#[derive(Debug)]
190pub struct FixedCatalogProviderList {
191    inner: MemoryCatalogProviderList,
192}
193
194impl Default for FixedCatalogProviderList {
195    fn default() -> Self {
196        let inner = MemoryCatalogProviderList::new();
197
198        let _ = inner.register_catalog(
199            "blue".to_owned(),
200            Arc::new(FixedCatalogProvider::default()),
201        );
202
203        Self { inner }
204    }
205}
206
207impl CatalogProviderList for FixedCatalogProviderList {
208    fn as_any(&self) -> &dyn Any {
209        self
210    }
211
212    fn catalog_names(&self) -> Vec<String> {
213        self.inner.catalog_names()
214    }
215
216    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
217        self.inner.catalog(name)
218    }
219
220    fn register_catalog(
221        &self,
222        name: String,
223        catalog: Arc<dyn CatalogProvider>,
224    ) -> Option<Arc<dyn CatalogProvider>> {
225        if !["blue", "red", "green", "yellow"].contains(&name.as_str()) {
226            log::warn!(
227                "FixedCatalogProviderList only provides four catalogs: blue, red, green, yellow"
228            );
229            return None;
230        }
231
232        self.inner.register_catalog(name, catalog)
233    }
234}
235
236pub(crate) extern "C" fn create_catalog_provider_list(
237    codec: FFI_LogicalExtensionCodec,
238) -> FFI_CatalogProviderList {
239    let catalog_provider_list = Arc::new(FixedCatalogProviderList::default());
240    FFI_CatalogProviderList::new_with_ffi_codec(catalog_provider_list, None, codec)
241}