Skip to main content

datafusion_ffi/tests/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Fixed-name catalog fixtures for testing the FFI catalog wrappers. This
//! module defines a schema provider, a catalog provider and a catalog provider
//! list that each wrap their in-memory counterpart, start out prepopulated
//! with a single entry, and only accept registrations under a small fixed set
//! of names. It also exposes `extern "C"` constructors that wrap the catalog
//! provider and the catalog provider list in their FFI types.
27
28use std::fmt::Debug;
29use std::sync::Arc;
30
31use arrow::datatypes::Schema;
32use async_trait::async_trait;
33use datafusion_catalog::{
34    CatalogProvider, CatalogProviderList, MemTable, MemoryCatalogProvider,
35    MemoryCatalogProviderList, MemorySchemaProvider, SchemaProvider, TableProvider,
36};
37use datafusion_common::{Result, exec_err};
38
39use crate::catalog_provider::FFI_CatalogProvider;
40use crate::catalog_provider_list::FFI_CatalogProviderList;
41use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
42
43/// This schema provider is intended only for unit tests. It prepopulates with one
44/// table and only allows for tables named sales and purchases.
45#[derive(Debug)]
46pub struct FixedSchemaProvider {
47    inner: MemorySchemaProvider,
48}
49
50pub fn fruit_table() -> Arc<dyn TableProvider + 'static> {
51    use arrow::datatypes::{DataType, Field};
52    use datafusion_common::record_batch;
53
54    let schema = Arc::new(Schema::new(vec![
55        Field::new("units", DataType::Int32, true),
56        Field::new("price", DataType::Float64, true),
57    ]));
58
59    let partitions = vec![
60        record_batch!(
61            ("units", Int32, vec![10, 20, 30]),
62            ("price", Float64, vec![1.0, 2.0, 5.0])
63        )
64        .unwrap(),
65        record_batch!(
66            ("units", Int32, vec![5, 7]),
67            ("price", Float64, vec![1.5, 2.5])
68        )
69        .unwrap(),
70    ];
71
72    Arc::new(MemTable::try_new(schema, vec![partitions]).unwrap())
73}
74
75impl Default for FixedSchemaProvider {
76    fn default() -> Self {
77        let inner = MemorySchemaProvider::new();
78
79        let table = fruit_table();
80
81        let _ = inner
82            .register_table("purchases".to_string(), table)
83            .unwrap();
84
85        Self { inner }
86    }
87}
88
89#[async_trait]
90impl SchemaProvider for FixedSchemaProvider {
91    fn table_names(&self) -> Vec<String> {
92        self.inner.table_names()
93    }
94
95    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
96        self.inner.table(name).await
97    }
98
99    fn table_exist(&self, name: &str) -> bool {
100        self.inner.table_exist(name)
101    }
102
103    fn register_table(
104        &self,
105        name: String,
106        table: Arc<dyn TableProvider>,
107    ) -> Result<Option<Arc<dyn TableProvider>>> {
108        if name.as_str() != "sales" && name.as_str() != "purchases" {
109            return exec_err!(
110                "FixedSchemaProvider only provides two tables: sales and purchases"
111            );
112        }
113
114        self.inner.register_table(name, table)
115    }
116
117    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
118        self.inner.deregister_table(name)
119    }
120}
121
122/// This catalog provider is intended only for unit tests. It prepopulates with one
123/// schema and only allows for schemas named after four types of fruit.
124#[derive(Debug)]
125pub struct FixedCatalogProvider {
126    inner: MemoryCatalogProvider,
127}
128
129impl Default for FixedCatalogProvider {
130    fn default() -> Self {
131        let inner = MemoryCatalogProvider::new();
132
133        let _ = inner.register_schema("apple", Arc::new(FixedSchemaProvider::default()));
134
135        Self { inner }
136    }
137}
138
139impl CatalogProvider for FixedCatalogProvider {
140    fn schema_names(&self) -> Vec<String> {
141        self.inner.schema_names()
142    }
143
144    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
145        self.inner.schema(name)
146    }
147
148    fn register_schema(
149        &self,
150        name: &str,
151        schema: Arc<dyn SchemaProvider>,
152    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
153        if !["apple", "banana", "cherry", "date"].contains(&name) {
154            return exec_err!(
155                "FixedCatalogProvider only provides four schemas: apple, banana, cherry, date"
156            );
157        }
158
159        self.inner.register_schema(name, schema)
160    }
161
162    fn deregister_schema(
163        &self,
164        name: &str,
165        cascade: bool,
166    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
167        self.inner.deregister_schema(name, cascade)
168    }
169}
170
171pub(crate) extern "C" fn create_catalog_provider(
172    codec: FFI_LogicalExtensionCodec,
173) -> FFI_CatalogProvider {
174    let catalog_provider = Arc::new(FixedCatalogProvider::default());
175    FFI_CatalogProvider::new_with_ffi_codec(catalog_provider, None, codec)
176}
177
178/// This catalog provider list is intended only for unit tests. It prepopulates with one
179/// catalog and only allows for catalogs named after four colors.
180#[derive(Debug)]
181pub struct FixedCatalogProviderList {
182    inner: MemoryCatalogProviderList,
183}
184
185impl Default for FixedCatalogProviderList {
186    fn default() -> Self {
187        let inner = MemoryCatalogProviderList::new();
188
189        let _ = inner.register_catalog(
190            "blue".to_owned(),
191            Arc::new(FixedCatalogProvider::default()),
192        );
193
194        Self { inner }
195    }
196}
197
198impl CatalogProviderList for FixedCatalogProviderList {
199    fn catalog_names(&self) -> Vec<String> {
200        self.inner.catalog_names()
201    }
202
203    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
204        self.inner.catalog(name)
205    }
206
207    fn register_catalog(
208        &self,
209        name: String,
210        catalog: Arc<dyn CatalogProvider>,
211    ) -> Option<Arc<dyn CatalogProvider>> {
212        if !["blue", "red", "green", "yellow"].contains(&name.as_str()) {
213            log::warn!(
214                "FixedCatalogProviderList only provides four catalogs: blue, red, green, yellow"
215            );
216            return None;
217        }
218
219        self.inner.register_catalog(name, catalog)
220    }
221}
222
223pub(crate) extern "C" fn create_catalog_provider_list(
224    codec: FFI_LogicalExtensionCodec,
225) -> FFI_CatalogProviderList {
226    let catalog_provider_list = Arc::new(FixedCatalogProviderList::default());
227    FFI_CatalogProviderList::new_with_ffi_codec(catalog_provider_list, None, codec)
228}