use std::any::Any;
use std::borrow::Cow;
use std::fmt::Debug;
use std::sync::Arc;
use crate::session::Session;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_common::Result;
use datafusion_common::{Constraints, Statistics, not_impl_err};
use datafusion_expr::Expr;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{
CreateExternalTable, LogicalPlan, TableProviderFilterPushDown, TableType,
};
use datafusion_physical_plan::ExecutionPlan;
/// A source of table data that can be described, planned and scanned.
///
/// Implementors must supply [`as_any`](Self::as_any), [`schema`](Self::schema),
/// [`table_type`](Self::table_type) and [`scan`](Self::scan); every other
/// method has a default implementation that either reports "nothing available"
/// (`None` / `Unsupported`) or returns a not-implemented error.
#[async_trait]
pub trait TableProvider: Debug + Send + Sync {
    /// Returns `self` as [`Any`] so callers can downcast to the concrete provider.
    fn as_any(&self) -> &dyn Any;

    /// Returns the schema of this table.
    fn schema(&self) -> SchemaRef;

    /// Returns the constraints declared on this table, if any.
    ///
    /// The default implementation reports none (`None`).
    fn constraints(&self) -> Option<&Constraints> {
        None
    }

    /// Returns the kind of table this provider represents.
    fn table_type(&self) -> TableType;

    /// Returns optional text describing how this table was defined.
    ///
    /// The default implementation returns `None`.
    fn get_table_definition(&self) -> Option<&str> {
        None
    }

    /// Returns an optional logical plan associated with this table.
    ///
    /// The default implementation returns `None`.
    fn get_logical_plan(&'_ self) -> Option<Cow<'_, LogicalPlan>> {
        None
    }

    /// Returns the default-value expression for `_column`, if one exists.
    ///
    /// The default implementation returns `None` for every column.
    fn get_column_default(&self, _column: &str) -> Option<&Expr> {
        None
    }

    /// Creates an [`ExecutionPlan`] that reads this table.
    ///
    /// * `state` – the session the scan is planned in.
    /// * `projection` – optional list of column indices to read (assumed to
    ///   index into [`Self::schema`]).
    /// * `filters` – filter expressions offered to the scan.
    /// * `limit` – optional row limit.
    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Variant of [`scan`](Self::scan) that takes its inputs bundled in a
    /// [`ScanArgs`] and returns a [`ScanResult`].
    ///
    /// The default implementation forwards to [`scan`](Self::scan). Missing
    /// filters become an empty slice, and the borrowed projection is copied
    /// into an owned `Vec` because `scan` expects `Option<&Vec<usize>>`.
    async fn scan_with_args<'a>(
        &self,
        state: &dyn Session,
        args: ScanArgs<'a>,
    ) -> Result<ScanResult> {
        // Owned copy so we can hand `scan` an `Option<&Vec<usize>>` that
        // outlives the await below.
        let owned_projection: Option<Vec<usize>> = args.projection().map(<[usize]>::to_vec);
        let offered_filters: &[Expr] = args.filters().unwrap_or_default();
        let physical = self
            .scan(state, owned_projection.as_ref(), offered_filters, args.limit())
            .await?;
        Ok(ScanResult::new(physical))
    }

    /// Reports, for each entry in `filters`, how this provider handles it.
    ///
    /// The returned vector has one entry per input filter. The default
    /// implementation marks every filter as
    /// [`TableProviderFilterPushDown::Unsupported`].
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>> {
        let verdicts: Vec<TableProviderFilterPushDown> = filters
            .iter()
            .map(|_| TableProviderFilterPushDown::Unsupported)
            .collect();
        Ok(verdicts)
    }

    /// Returns statistics for this table, if available.
    ///
    /// The default implementation returns `None`.
    fn statistics(&self) -> Option<Statistics> {
        None
    }

    /// Plans an insert of the rows produced by `_input` into this table,
    /// using `_insert_op`.
    ///
    /// # Errors
    /// The default implementation always returns a not-implemented error.
    async fn insert_into(
        &self,
        _state: &dyn Session,
        _input: Arc<dyn ExecutionPlan>,
        _insert_op: InsertOp,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("Insert into not implemented for this table")
    }

    /// Plans a `DELETE` against this table with the given `_filters`.
    ///
    /// # Errors
    /// The default implementation always returns a not-implemented error
    /// that names this table's [`TableType`].
    async fn delete_from(
        &self,
        _state: &dyn Session,
        _filters: Vec<Expr>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("DELETE not supported for {} table", self.table_type())
    }

    /// Plans an `UPDATE` against this table.
    ///
    /// `_assignments` holds `(String, Expr)` pairs, presumably a column name and
    /// its new-value expression. `_filters` restricts the affected rows.
    ///
    /// # Errors
    /// The default implementation always returns a not-implemented error
    /// that names this table's [`TableType`].
    async fn update(
        &self,
        _state: &dyn Session,
        _assignments: Vec<(String, Expr)>,
        _filters: Vec<Expr>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("UPDATE not supported for {} table", self.table_type())
    }

    /// Plans a `TRUNCATE` of this table.
    ///
    /// # Errors
    /// The default implementation always returns a not-implemented error
    /// that names this table's [`TableType`].
    async fn truncate(&self, _state: &dyn Session) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("TRUNCATE not supported for {} table", self.table_type())
    }
}
/// Inputs for [`TableProvider::scan_with_args`].
///
/// Start from [`ScanArgs::default`] (everything unset, i.e. `None`) and chain
/// the `with_*` setters. Each setter consumes the value and returns an updated
/// copy.
#[derive(Clone, Debug, Default)]
pub struct ScanArgs<'a> {
    /// Filter expressions offered to the scan, if any.
    filters: Option<&'a [Expr]>,
    /// Projection as a slice of `usize` column indices, if any.
    projection: Option<&'a [usize]>,
    /// Row limit, if any.
    limit: Option<usize>,
}

impl<'a> ScanArgs<'a> {
    /// Returns these arguments with the projection replaced by `projection`.
    pub fn with_projection(self, projection: Option<&'a [usize]>) -> Self {
        Self { projection, ..self }
    }

    /// Returns the projection, if one was set.
    pub fn projection(&self) -> Option<&'a [usize]> {
        self.projection
    }

    /// Returns these arguments with the filters replaced by `filters`.
    pub fn with_filters(self, filters: Option<&'a [Expr]>) -> Self {
        Self { filters, ..self }
    }

    /// Returns the filters, if any were set.
    pub fn filters(&self) -> Option<&'a [Expr]> {
        self.filters
    }

    /// Returns these arguments with the row limit replaced by `limit`.
    pub fn with_limit(self, limit: Option<usize>) -> Self {
        Self { limit, ..self }
    }

    /// Returns the row limit, if one was set.
    pub fn limit(&self) -> Option<usize> {
        self.limit
    }
}
/// Output of [`TableProvider::scan_with_args`], which wraps the execution plan
/// produced by the scan.
#[derive(Clone, Debug)]
pub struct ScanResult {
    /// The execution plan returned by the scan.
    plan: Arc<dyn ExecutionPlan>,
}

impl ScanResult {
    /// Wraps `plan` in a `ScanResult`.
    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
        ScanResult { plan }
    }

    /// Borrows the wrapped execution plan.
    pub fn plan(&self) -> &Arc<dyn ExecutionPlan> {
        let Self { plan } = self;
        plan
    }

    /// Consumes the result and returns the wrapped execution plan.
    pub fn into_inner(self) -> Arc<dyn ExecutionPlan> {
        let Self { plan } = self;
        plan
    }
}

impl From<Arc<dyn ExecutionPlan>> for ScanResult {
    /// Same as [`ScanResult::new`].
    fn from(plan: Arc<dyn ExecutionPlan>) -> Self {
        ScanResult { plan }
    }
}
/// Builds [`TableProvider`]s from [`CreateExternalTable`] commands.
#[async_trait]
pub trait TableProviderFactory: Debug + Send + Sync {
    /// Creates a provider for the table described by `cmd`.
    ///
    /// `state` is the session in which the command is being executed.
    async fn create(
        &self,
        state: &dyn Session,
        cmd: &CreateExternalTable,
    ) -> Result<Arc<dyn TableProvider>>;
}
/// Implementation behind a table function: maps call arguments to a provider.
pub trait TableFunctionImpl: Debug + Send + Sync {
    /// Creates a [`TableProvider`] from the function's argument expressions.
    fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
}
/// A table function: a name paired with the [`TableFunctionImpl`] that runs it.
#[derive(Clone, Debug)]
pub struct TableFunction {
    /// Name the function is registered under.
    name: String,
    /// Implementation invoked by [`TableFunction::create_table_provider`].
    fun: Arc<dyn TableFunctionImpl>,
}

impl TableFunction {
    /// Creates a table function called `name` backed by `fun`.
    pub fn new(name: String, fun: Arc<dyn TableFunctionImpl>) -> Self {
        TableFunction { name, fun }
    }

    /// Returns the function's name.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns the underlying implementation.
    pub fn function(&self) -> &Arc<dyn TableFunctionImpl> {
        &self.fun
    }

    /// Calls the implementation with `args` and returns the provider it creates.
    pub fn create_table_provider(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        let Self { fun, .. } = self;
        fun.call(args)
    }
}