use std::{any::Any, borrow::Cow, sync::Arc};
use async_trait::async_trait;
use datafusion::{
arrow::datatypes::SchemaRef,
catalog::Session,
common::Constraints,
datasource::TableProvider,
error::{DataFusionError, Result},
logical_expr::{Expr, LogicalPlan, TableProviderFilterPushDown, TableSource, TableType},
physical_plan::ExecutionPlan,
};
use crate::FederationProvider;
pub struct FederatedTableProviderAdaptor {
pub source: Arc<dyn FederatedTableSource>,
pub table_provider: Option<Arc<dyn TableProvider>>,
}
impl FederatedTableProviderAdaptor {
pub fn new(source: Arc<dyn FederatedTableSource>) -> Self {
Self {
source,
table_provider: None,
}
}
pub fn new_with_provider(
source: Arc<dyn FederatedTableSource>,
table_provider: Arc<dyn TableProvider>,
) -> Self {
Self {
source,
table_provider: Some(table_provider),
}
}
}
#[async_trait]
impl TableProvider for FederatedTableProviderAdaptor {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
if let Some(table_provider) = &self.table_provider {
return table_provider.schema();
}
self.source.schema()
}
fn constraints(&self) -> Option<&Constraints> {
if let Some(table_provider) = &self.table_provider {
return table_provider
.constraints()
.or_else(|| self.source.constraints());
}
self.source.constraints()
}
fn table_type(&self) -> TableType {
if let Some(table_provider) = &self.table_provider {
return table_provider.table_type();
}
self.source.table_type()
}
fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
if let Some(table_provider) = &self.table_provider {
return table_provider
.get_logical_plan()
.or_else(|| self.source.get_logical_plan());
}
self.source.get_logical_plan()
}
fn get_column_default(&self, column: &str) -> Option<&Expr> {
if let Some(table_provider) = &self.table_provider {
return table_provider
.get_column_default(column)
.or_else(|| self.source.get_column_default(column));
}
self.source.get_column_default(column)
}
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
if let Some(table_provider) = &self.table_provider {
return table_provider.supports_filters_pushdown(filters);
}
Ok(vec![
TableProviderFilterPushDown::Unsupported;
filters.len()
])
}
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
if let Some(table_provider) = &self.table_provider {
return table_provider.scan(state, projection, filters, limit).await;
}
Err(DataFusionError::NotImplemented(
"FederatedTableProviderAdaptor cannot scan".to_string(),
))
}
async fn insert_into(
&self,
_state: &dyn Session,
input: Arc<dyn ExecutionPlan>,
overwrite: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
if let Some(table_provider) = &self.table_provider {
return table_provider.insert_into(_state, input, overwrite).await;
}
Err(DataFusionError::NotImplemented(
"FederatedTableProviderAdaptor cannot insert_into".to_string(),
))
}
}
#[async_trait]
pub trait FederatedTableSource: TableSource {
fn federation_provider(&self) -> Arc<dyn FederationProvider>;
}