datafusion_federation/
table_provider.rs

1use std::{any::Any, borrow::Cow, sync::Arc};
2
3use async_trait::async_trait;
4use datafusion::{
5    arrow::datatypes::SchemaRef,
6    catalog::Session,
7    common::Constraints,
8    datasource::TableProvider,
9    error::{DataFusionError, Result},
10    logical_expr::{
11        dml::InsertOp, Expr, LogicalPlan, TableProviderFilterPushDown, TableSource, TableType,
12    },
13    physical_plan::ExecutionPlan,
14};
15
16use crate::FederationProvider;
17
18// FederatedTableSourceWrapper helps to recover the FederatedTableSource
19// from a TableScan. This wrapper may be avoidable.
20#[derive(Debug)]
21pub struct FederatedTableProviderAdaptor {
22    pub source: Arc<dyn FederatedTableSource>,
23    pub table_provider: Option<Arc<dyn TableProvider>>,
24}
25
26impl FederatedTableProviderAdaptor {
27    pub fn new(source: Arc<dyn FederatedTableSource>) -> Self {
28        Self {
29            source,
30            table_provider: None,
31        }
32    }
33
34    /// Creates a new FederatedTableProviderAdaptor that falls back to the
35    /// provided TableProvider. This is useful if used within a DataFusion
36    /// context without the federation optimizer.
37    pub fn new_with_provider(
38        source: Arc<dyn FederatedTableSource>,
39        table_provider: Arc<dyn TableProvider>,
40    ) -> Self {
41        Self {
42            source,
43            table_provider: Some(table_provider),
44        }
45    }
46}
47
48#[async_trait]
49impl TableProvider for FederatedTableProviderAdaptor {
50    fn as_any(&self) -> &dyn Any {
51        self
52    }
53    fn schema(&self) -> SchemaRef {
54        if let Some(table_provider) = &self.table_provider {
55            return table_provider.schema();
56        }
57
58        self.source.schema()
59    }
60    fn constraints(&self) -> Option<&Constraints> {
61        if let Some(table_provider) = &self.table_provider {
62            return table_provider
63                .constraints()
64                .or_else(|| self.source.constraints());
65        }
66
67        self.source.constraints()
68    }
69    fn table_type(&self) -> TableType {
70        if let Some(table_provider) = &self.table_provider {
71            return table_provider.table_type();
72        }
73
74        self.source.table_type()
75    }
76    fn get_logical_plan(&self) -> Option<Cow<'_, LogicalPlan>> {
77        if let Some(table_provider) = &self.table_provider {
78            return table_provider
79                .get_logical_plan()
80                .or_else(|| self.source.get_logical_plan());
81        }
82
83        self.source.get_logical_plan()
84    }
85    fn get_column_default(&self, column: &str) -> Option<&Expr> {
86        if let Some(table_provider) = &self.table_provider {
87            return table_provider
88                .get_column_default(column)
89                .or_else(|| self.source.get_column_default(column));
90        }
91
92        self.source.get_column_default(column)
93    }
94    fn supports_filters_pushdown(
95        &self,
96        filters: &[&Expr],
97    ) -> Result<Vec<TableProviderFilterPushDown>> {
98        if let Some(table_provider) = &self.table_provider {
99            return table_provider.supports_filters_pushdown(filters);
100        }
101
102        Ok(vec![
103            TableProviderFilterPushDown::Unsupported;
104            filters.len()
105        ])
106    }
107
108    // Scan is not supported; the adaptor should be replaced
109    // with a virtual TableProvider that provides federation for a sub-plan.
110    async fn scan(
111        &self,
112        state: &dyn Session,
113        projection: Option<&Vec<usize>>,
114        filters: &[Expr],
115        limit: Option<usize>,
116    ) -> Result<Arc<dyn ExecutionPlan>> {
117        if let Some(table_provider) = &self.table_provider {
118            return table_provider.scan(state, projection, filters, limit).await;
119        }
120
121        Err(DataFusionError::NotImplemented(
122            "FederatedTableProviderAdaptor cannot scan".to_string(),
123        ))
124    }
125
126    async fn insert_into(
127        &self,
128        _state: &dyn Session,
129        input: Arc<dyn ExecutionPlan>,
130        insert_op: InsertOp,
131    ) -> Result<Arc<dyn ExecutionPlan>> {
132        if let Some(table_provider) = &self.table_provider {
133            return table_provider.insert_into(_state, input, insert_op).await;
134        }
135
136        Err(DataFusionError::NotImplemented(
137            "FederatedTableProviderAdaptor cannot insert_into".to_string(),
138        ))
139    }
140}
141
142// FederatedTableProvider extends DataFusion's TableProvider trait
143// to allow grouping of TableScans of the same FederationProvider.
144#[async_trait]
145pub trait FederatedTableSource: TableSource {
146    // Return the FederationProvider associated with this Table
147    fn federation_provider(&self) -> Arc<dyn FederationProvider>;
148}
149
150impl std::fmt::Debug for dyn FederatedTableSource {
151    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
152        write!(
153            f,
154            "FederatedTableSource: {:?}",
155            self.federation_provider().name()
156        )
157    }
158}