use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;
use arrow::datatypes::Schema;
use async_trait::async_trait;
use datafusion_catalog::{
CatalogProvider, CatalogProviderList, MemTable, MemoryCatalogProvider,
MemoryCatalogProviderList, MemorySchemaProvider, SchemaProvider, TableProvider,
};
use datafusion_common::{Result, exec_err};
use crate::catalog_provider::FFI_CatalogProvider;
use crate::catalog_provider_list::FFI_CatalogProviderList;
use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
/// A [`SchemaProvider`] backed by an in-memory [`MemorySchemaProvider`].
///
/// Its `register_table` implementation in this file only accepts the table
/// names `sales` and `purchases`; all other operations are forwarded to the
/// wrapped provider unchanged.
#[derive(Debug)]
pub struct FixedSchemaProvider {
// Wrapped in-memory provider that the trait methods delegate to.
inner: MemorySchemaProvider,
}
/// Builds an in-memory table with two nullable columns, `units` (`Int32`) and
/// `price` (`Float64`), holding two record batches inside a single partition.
///
/// # Panics
///
/// Panics if either record batch or the [`MemTable`] cannot be constructed.
pub fn fruit_table() -> Arc<dyn TableProvider + 'static> {
    use arrow::datatypes::{DataType, Field};
    use datafusion_common::record_batch;

    // Column layout shared by both batches below.
    let fields = vec![
        Field::new("units", DataType::Int32, true),
        Field::new("price", DataType::Float64, true),
    ];
    let table_schema = Arc::new(Schema::new(fields));

    let first_batch = record_batch!(
        ("units", Int32, vec![10, 20, 30]),
        ("price", Float64, vec![1.0, 2.0, 5.0])
    )
    .unwrap();
    let second_batch = record_batch!(
        ("units", Int32, vec![5, 7]),
        ("price", Float64, vec![1.5, 2.5])
    )
    .unwrap();

    // Both batches live in one partition (the outer vec has a single entry).
    let single_partition = vec![first_batch, second_batch];
    let mem_table = MemTable::try_new(table_schema, vec![single_partition]).unwrap();
    Arc::new(mem_table)
}
impl Default for FixedSchemaProvider {
    /// Creates a provider whose wrapped in-memory schema already contains the
    /// `purchases` table produced by [`fruit_table`].
    ///
    /// # Panics
    ///
    /// Panics if registering the `purchases` table fails.
    fn default() -> Self {
        let provider = Self {
            inner: MemorySchemaProvider::new(),
        };
        // Register directly on the wrapped provider; the returned previous
        // table (if any) is not needed.
        let _ = provider
            .inner
            .register_table(String::from("purchases"), fruit_table())
            .unwrap();
        provider
    }
}
#[async_trait]
impl SchemaProvider for FixedSchemaProvider {
    /// Returns `self` as [`Any`] so callers can downcast.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Lists the table names known to the wrapped provider.
    fn table_names(&self) -> Vec<String> {
        self.inner.table_names()
    }

    /// Looks up `name` in the wrapped provider.
    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
        self.inner.table(name).await
    }

    /// Reports whether the wrapped provider has a table called `name`.
    fn table_exist(&self, name: &str) -> bool {
        self.inner.table_exist(name)
    }

    /// Registers `table` under `name` in the wrapped provider.
    ///
    /// # Errors
    ///
    /// Returns an execution error when `name` is neither `sales` nor
    /// `purchases`; otherwise returns whatever the wrapped provider returns.
    fn register_table(
        &self,
        name: String,
        table: Arc<dyn TableProvider>,
    ) -> Result<Option<Arc<dyn TableProvider>>> {
        let is_allowed = matches!(name.as_str(), "sales" | "purchases");
        if is_allowed {
            self.inner.register_table(name, table)
        } else {
            exec_err!(
                "FixedSchemaProvider only provides two tables: sales and purchases"
            )
        }
    }

    /// Removes `name` from the wrapped provider; no name restriction applies here.
    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
        self.inner.deregister_table(name)
    }
}
/// A [`CatalogProvider`] backed by an in-memory [`MemoryCatalogProvider`].
///
/// Its `register_schema` implementation in this file only accepts the schema
/// names `apple`, `banana`, `cherry` and `date`; all other operations are
/// forwarded to the wrapped provider unchanged.
#[derive(Debug)]
pub struct FixedCatalogProvider {
// Wrapped in-memory catalog that the trait methods delegate to.
inner: MemoryCatalogProvider,
}
impl Default for FixedCatalogProvider {
    /// Creates a catalog whose wrapped in-memory provider is seeded with an
    /// `apple` schema built from [`FixedSchemaProvider::default`].
    fn default() -> Self {
        let catalog = Self {
            inner: MemoryCatalogProvider::new(),
        };
        // The outcome of the registration is deliberately discarded.
        let _ = catalog
            .inner
            .register_schema("apple", Arc::new(FixedSchemaProvider::default()));
        catalog
    }
}
impl CatalogProvider for FixedCatalogProvider {
    /// Returns `self` as [`Any`] so callers can downcast.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Lists the schema names known to the wrapped catalog.
    fn schema_names(&self) -> Vec<String> {
        self.inner.schema_names()
    }

    /// Looks up the schema called `name` in the wrapped catalog.
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        self.inner.schema(name)
    }

    /// Registers `schema` under `name` in the wrapped catalog.
    ///
    /// # Errors
    ///
    /// Returns an execution error when `name` is not one of `apple`,
    /// `banana`, `cherry` or `date`; otherwise returns whatever the wrapped
    /// catalog returns.
    fn register_schema(
        &self,
        name: &str,
        schema: Arc<dyn SchemaProvider>,
    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
        if matches!(name, "apple" | "banana" | "cherry" | "date") {
            self.inner.register_schema(name, schema)
        } else {
            exec_err!(
                "FixedCatalogProvider only provides four schemas: apple, banana, cherry, date"
            )
        }
    }

    /// Removes the schema called `name` from the wrapped catalog, forwarding
    /// `cascade` as given; no name restriction applies here.
    fn deregister_schema(
        &self,
        name: &str,
        cascade: bool,
    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
        self.inner.deregister_schema(name, cascade)
    }
}
/// C-ABI entry point that wraps a default [`FixedCatalogProvider`] in an
/// [`FFI_CatalogProvider`] using the supplied `codec`.
///
/// The second constructor argument is passed as `None`.
pub(crate) extern "C" fn create_catalog_provider(
    codec: FFI_LogicalExtensionCodec,
) -> FFI_CatalogProvider {
    FFI_CatalogProvider::new_with_ffi_codec(
        Arc::new(FixedCatalogProvider::default()),
        None,
        codec,
    )
}
/// A [`CatalogProviderList`] backed by an in-memory [`MemoryCatalogProviderList`].
///
/// Its `register_catalog` implementation in this file only accepts the catalog
/// names `blue`, `red`, `green` and `yellow`; lookups are forwarded to the
/// wrapped list unchanged.
#[derive(Debug)]
pub struct FixedCatalogProviderList {
// Wrapped in-memory catalog list that the trait methods delegate to.
inner: MemoryCatalogProviderList,
}
impl Default for FixedCatalogProviderList {
    /// Creates a list whose wrapped in-memory list is seeded with a `blue`
    /// catalog built from [`FixedCatalogProvider::default`].
    fn default() -> Self {
        let list = Self {
            inner: MemoryCatalogProviderList::new(),
        };
        // Any previously registered catalog returned here is not needed.
        let _ = list.inner.register_catalog(
            String::from("blue"),
            Arc::new(FixedCatalogProvider::default()),
        );
        list
    }
}
impl CatalogProviderList for FixedCatalogProviderList {
    /// Returns `self` as [`Any`] so callers can downcast.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Lists the catalog names known to the wrapped list.
    fn catalog_names(&self) -> Vec<String> {
        self.inner.catalog_names()
    }

    /// Looks up the catalog called `name` in the wrapped list.
    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
        self.inner.catalog(name)
    }

    /// Registers `catalog` under `name` in the wrapped list.
    ///
    /// When `name` is not one of `blue`, `red`, `green` or `yellow`, a warning
    /// is logged, nothing is registered, and `None` is returned.
    fn register_catalog(
        &self,
        name: String,
        catalog: Arc<dyn CatalogProvider>,
    ) -> Option<Arc<dyn CatalogProvider>> {
        let is_allowed = matches!(name.as_str(), "blue" | "red" | "green" | "yellow");
        if is_allowed {
            self.inner.register_catalog(name, catalog)
        } else {
            log::warn!(
                "FixedCatalogProviderList only provides four catalogs: blue, red, green, yellow"
            );
            None
        }
    }
}
/// C-ABI entry point that wraps a default [`FixedCatalogProviderList`] in an
/// [`FFI_CatalogProviderList`] using the supplied `codec`.
///
/// The second constructor argument is passed as `None`.
pub(crate) extern "C" fn create_catalog_provider_list(
    codec: FFI_LogicalExtensionCodec,
) -> FFI_CatalogProviderList {
    FFI_CatalogProviderList::new_with_ffi_codec(
        Arc::new(FixedCatalogProviderList::default()),
        None,
        codec,
    )
}