use crate::sql::SQLFederationProvider;
use crate::FederatedTableSource;
use crate::FederationProvider;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::error::Result;
use datafusion::logical_expr::TableSource;
use datafusion::logical_expr::TableType;
use datafusion::sql::TableReference;
use std::any::Any;
use std::sync::Arc;
use super::ast_analyzer;
use super::executor::LogicalOptimizer;
use super::executor::SqlQueryRewriter;
use super::AstAnalyzer;
use super::RemoteTableRef;
pub trait SQLTable: std::fmt::Debug + Send + Sync {
fn as_any(&self) -> &dyn Any;
fn table_reference(&self) -> TableReference;
fn schema(&self) -> SchemaRef;
fn logical_optimizer(&self) -> Option<LogicalOptimizer> {
None
}
fn ast_analyzer(&self) -> Option<AstAnalyzer> {
None
}
fn sql_query_rewriter(&self) -> Option<SqlQueryRewriter> {
None
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RemoteTable {
remote_table_ref: RemoteTableRef,
schema: SchemaRef,
}
impl RemoteTable {
pub fn new(table_ref: RemoteTableRef, schema: SchemaRef) -> Self {
Self {
remote_table_ref: table_ref,
schema,
}
}
pub fn table_reference(&self) -> &TableReference {
self.remote_table_ref.table_ref()
}
pub fn schema(&self) -> &SchemaRef {
&self.schema
}
}
impl SQLTable for RemoteTable {
fn as_any(&self) -> &dyn Any {
self
}
fn table_reference(&self) -> TableReference {
Self::table_reference(self).clone()
}
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
fn logical_optimizer(&self) -> Option<LogicalOptimizer> {
None
}
fn ast_analyzer(&self) -> Option<AstAnalyzer> {
if let Some(args) = self.remote_table_ref.args() {
Some(
ast_analyzer::TableArgReplace::default()
.with(self.remote_table_ref.table_ref().clone(), args.to_vec())
.into_analyzer(),
)
} else {
None
}
}
}
#[derive(Debug, Clone)]
pub struct SQLTableSource {
pub provider: Arc<SQLFederationProvider>,
pub table: Arc<dyn SQLTable>,
}
impl SQLTableSource {
pub async fn new(
provider: Arc<SQLFederationProvider>,
table_ref: RemoteTableRef,
) -> Result<Self> {
let table_name = table_ref.to_quoted_string();
let schema = provider.executor.get_table_schema(&table_name).await?;
Ok(Self::new_with_schema(provider, table_ref, schema))
}
pub fn new_with_schema(
provider: Arc<SQLFederationProvider>,
table_ref: RemoteTableRef,
schema: SchemaRef,
) -> Self {
Self {
provider,
table: Arc::new(RemoteTable::new(table_ref, schema)),
}
}
pub fn new_with_table(provider: Arc<SQLFederationProvider>, table: Arc<dyn SQLTable>) -> Self {
Self { provider, table }
}
pub fn table_reference(&self) -> TableReference {
self.table.table_reference()
}
}
impl TableSource for SQLTableSource {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.table.schema()
}
fn table_type(&self) -> TableType {
TableType::Temporary
}
}
impl FederatedTableSource for SQLTableSource {
fn federation_provider(&self) -> Arc<dyn FederationProvider> {
Arc::clone(&self.provider) as Arc<dyn FederationProvider>
}
}