use std::any::Any;
use std::sync::Arc;
use async_trait::async_trait;
use datafusion_common::{not_impl_err, Constraints, Statistics};
use datafusion_expr::{CreateExternalTable, LogicalPlan};
pub use datafusion_expr::{TableProviderFilterPushDown, TableType};
use crate::arrow::datatypes::SchemaRef;
use crate::datasource::listing_table_factory::ListingTableFactory;
use crate::datasource::stream::StreamTableFactory;
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::logical_expr::Expr;
use crate::physical_plan::ExecutionPlan;
/// A table that can describe itself (schema, type, constraints, ...) and be
/// turned into a physical [`ExecutionPlan`].
///
/// Only [`Self::as_any`], [`Self::schema`], [`Self::table_type`] and
/// [`Self::scan`] have to be supplied by implementors; every other method has
/// a default body, described on the method itself.
#[async_trait]
pub trait TableProvider: Sync + Send {
    /// Returns `self` as [`Any`], which lets callers downcast to the concrete
    /// provider type.
    fn as_any(&self) -> &dyn Any;

    /// Returns the schema of this table.
    fn schema(&self) -> SchemaRef;

    /// Returns the constraints attached to this table, if any.
    ///
    /// The default implementation returns `None`.
    fn constraints(&self) -> Option<&Constraints> {
        None
    }

    /// Returns the [`TableType`] of this table.
    fn table_type(&self) -> TableType;

    /// Returns the textual definition of this table, if one is available.
    ///
    /// The default implementation returns `None`.
    fn get_table_definition(&self) -> Option<&str> {
        None
    }

    /// Returns a [`LogicalPlan`] associated with this table, if one exists.
    ///
    /// The default implementation returns `None`.
    fn get_logical_plan(&self) -> Option<&LogicalPlan> {
        None
    }

    /// Returns the default-value expression for the column named `_column`,
    /// if one exists.
    ///
    /// The default implementation ignores the column name and returns `None`.
    fn get_column_default(&self, _column: &str) -> Option<&Expr> {
        None
    }

    /// Builds the [`ExecutionPlan`] used to read this table.
    ///
    /// * `state` - the session the scan is planned in.
    /// * `projection` - optional column selection; presumably indices into
    ///   [`Self::schema`] with `None` meaning "all columns" (TODO confirm
    ///   against implementors).
    /// * `filters` - filter expressions offered to the scan.
    /// * `limit` - optional row limit hint.
    ///
    /// NOTE(review): whether `filters` and `limit` must be applied exactly or
    /// are only hints is not visible here; check the implementors and
    /// [`Self::supports_filters_pushdown`].
    async fn scan(
        &self,
        state: &SessionState,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Reports, for each expression in `filters`, how this provider supports
    /// pushing it down.
    ///
    /// The returned vector has one entry per input filter, in the same order.
    /// The default implementation answers
    /// [`TableProviderFilterPushDown::Unsupported`] for every filter.
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>> {
        let support = filters
            .iter()
            .map(|_| TableProviderFilterPushDown::Unsupported)
            .collect();
        Ok(support)
    }

    /// Returns statistics for this table, if available.
    ///
    /// The default implementation returns `None`.
    fn statistics(&self) -> Option<Statistics> {
        None
    }

    /// Returns an [`ExecutionPlan`] for inserting into this table.
    ///
    /// Going by the names, `_input` is the plan producing the rows to insert
    /// and `_overwrite` presumably selects replacing existing data rather
    /// than appending (TODO confirm against implementors).
    ///
    /// # Errors
    ///
    /// The default implementation ignores all arguments and always returns
    /// the error built by `not_impl_err!`.
    async fn insert_into(
        &self,
        _state: &SessionState,
        _input: Arc<dyn ExecutionPlan>,
        _overwrite: bool,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("Insert into not implemented for this table")
    }
}
/// A factory that builds [`TableProvider`]s from a [`CreateExternalTable`]
/// command.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait TableProviderFactory: Sync + Send {
/// Creates a [`TableProvider`] for `cmd`, using the session `state`.
///
/// Returns the new provider behind an `Arc`, or an error if the
/// implementor cannot build one.
async fn create(
&self,
state: &SessionState,
cmd: &CreateExternalTable,
) -> Result<Arc<dyn TableProvider>>;
}
/// A [`TableProviderFactory`] that bundles a [`StreamTableFactory`] and a
/// [`ListingTableFactory`]; its `create` implementation delegates to one of
/// the two.
#[derive(Debug, Default)]
pub struct DefaultTableFactory {
// Factory delegated to when the table is requested as unbounded.
stream: StreamTableFactory,
// Factory delegated to in every other case.
listing: ListingTableFactory,
}
impl DefaultTableFactory {
pub fn new() -> Self {
Self::default()
}
}
#[async_trait]
impl TableProviderFactory for DefaultTableFactory {
    /// Creates a table provider for `cmd` by delegating to one of the two
    /// wrapped factories.
    ///
    /// The stream factory is used when the table is unbounded, i.e. when
    /// either `cmd.unbounded` is set or `cmd.options` contains an entry whose
    /// key is `unbounded` and whose value is `true` (both compared
    /// ASCII-case-insensitively). Otherwise the listing factory is used.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the selected factory returns.
    async fn create(
        &self,
        state: &SessionState,
        cmd: &CreateExternalTable,
    ) -> Result<Arc<dyn TableProvider>> {
        // An `unbounded = true` option can turn streaming on, but no option
        // value can turn it off once `cmd.unbounded` is set.
        let requested_via_option = cmd.options.iter().any(|(key, value)| {
            key.eq_ignore_ascii_case("unbounded") && value.eq_ignore_ascii_case("true")
        });

        if cmd.unbounded || requested_via_option {
            self.stream.create(state, cmd).await
        } else {
            self.listing.create(state, cmd).await
        }
    }
}