datafusion_ffi/tests/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Test fixtures for exercising catalog and schema providers across the FFI
//! boundary. [`FixedCatalogProvider`] and [`FixedSchemaProvider`] wrap
//! DataFusion's in-memory providers but only accept a fixed set of schema and
//! table names when registering new entries. `create_catalog_provider` exposes
//! a default [`FixedCatalogProvider`] as an [`FFI_CatalogProvider`].
27
28use std::{any::Any, fmt::Debug, sync::Arc};
29
30use crate::catalog_provider::FFI_CatalogProvider;
31use arrow::datatypes::Schema;
32use async_trait::async_trait;
33use datafusion::{
34    catalog::{
35        CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider,
36        TableProvider,
37    },
38    common::exec_err,
39    datasource::MemTable,
40    error::{DataFusionError, Result},
41};
42
/// A [`SchemaProvider`] intended only for unit tests.
///
/// Wraps a [`MemorySchemaProvider`] and forwards table listing, lookup, and
/// (de)registration to it. The exception is registration: registering a table
/// under any name other than `sales` or `purchases` fails with an execution
/// error. The [`Default`] implementation pre-populates it with one table,
/// `purchases`, built by [`fruit_table`].
#[derive(Debug)]
pub struct FixedSchemaProvider {
    /// In-memory schema that actually stores the registered tables.
    inner: MemorySchemaProvider,
}
49
50pub fn fruit_table() -> Arc<dyn TableProvider + 'static> {
51    use arrow::datatypes::{DataType, Field};
52    use datafusion::common::record_batch;
53
54    let schema = Arc::new(Schema::new(vec![
55        Field::new("units", DataType::Int32, true),
56        Field::new("price", DataType::Float64, true),
57    ]));
58
59    let partitions = vec![
60        record_batch!(
61            ("units", Int32, vec![10, 20, 30]),
62            ("price", Float64, vec![1.0, 2.0, 5.0])
63        )
64        .unwrap(),
65        record_batch!(
66            ("units", Int32, vec![5, 7]),
67            ("price", Float64, vec![1.5, 2.5])
68        )
69        .unwrap(),
70    ];
71
72    Arc::new(MemTable::try_new(schema, vec![partitions]).unwrap())
73}
74
75impl Default for FixedSchemaProvider {
76    fn default() -> Self {
77        let inner = MemorySchemaProvider::new();
78
79        let table = fruit_table();
80
81        let _ = inner
82            .register_table("purchases".to_string(), table)
83            .unwrap();
84
85        Self { inner }
86    }
87}
88
89#[async_trait]
90impl SchemaProvider for FixedSchemaProvider {
91    fn as_any(&self) -> &dyn Any {
92        self
93    }
94
95    fn table_names(&self) -> Vec<String> {
96        self.inner.table_names()
97    }
98
99    async fn table(
100        &self,
101        name: &str,
102    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
103        self.inner.table(name).await
104    }
105
106    fn table_exist(&self, name: &str) -> bool {
107        self.inner.table_exist(name)
108    }
109
110    fn register_table(
111        &self,
112        name: String,
113        table: Arc<dyn TableProvider>,
114    ) -> Result<Option<Arc<dyn TableProvider>>> {
115        if name.as_str() != "sales" && name.as_str() != "purchases" {
116            return exec_err!(
117                "FixedSchemaProvider only provides two tables: sales and purchases"
118            );
119        }
120
121        self.inner.register_table(name, table)
122    }
123
124    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
125        self.inner.deregister_table(name)
126    }
127}
128
/// A [`CatalogProvider`] intended only for unit tests.
///
/// Wraps a [`MemoryCatalogProvider`] and forwards schema listing, lookup, and
/// (de)registration to it. The exception is registration: registering a schema
/// under any name other than `apple`, `banana`, `cherry`, or `date` fails with
/// an execution error. The [`Default`] implementation pre-populates it with an
/// `apple` schema backed by a default [`FixedSchemaProvider`].
#[derive(Debug)]
pub struct FixedCatalogProvider {
    /// In-memory catalog that actually stores the registered schemas.
    inner: MemoryCatalogProvider,
}
135
136impl Default for FixedCatalogProvider {
137    fn default() -> Self {
138        let inner = MemoryCatalogProvider::new();
139
140        let _ = inner.register_schema("apple", Arc::new(FixedSchemaProvider::default()));
141
142        Self { inner }
143    }
144}
145
146impl CatalogProvider for FixedCatalogProvider {
147    fn as_any(&self) -> &dyn Any {
148        self
149    }
150
151    fn schema_names(&self) -> Vec<String> {
152        self.inner.schema_names()
153    }
154
155    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
156        self.inner.schema(name)
157    }
158
159    fn register_schema(
160        &self,
161        name: &str,
162        schema: Arc<dyn SchemaProvider>,
163    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
164        if !["apple", "banana", "cherry", "date"].contains(&name) {
165            return exec_err!("FixedCatalogProvider only provides four schemas: apple, banana, cherry, date");
166        }
167
168        self.inner.register_schema(name, schema)
169    }
170
171    fn deregister_schema(
172        &self,
173        name: &str,
174        cascade: bool,
175    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
176        self.inner.deregister_schema(name, cascade)
177    }
178}
179
180pub(crate) extern "C" fn create_catalog_provider() -> FFI_CatalogProvider {
181    let catalog_provider = Arc::new(FixedCatalogProvider::default());
182    FFI_CatalogProvider::new(catalog_provider, None)
183}