// datafusion_federation/table_provider.rs
use std::{any::Any, borrow::Cow, sync::Arc};
2
3use async_trait::async_trait;
4use datafusion::{
5 arrow::datatypes::SchemaRef,
6 catalog::Session,
7 common::Constraints,
8 datasource::TableProvider,
9 error::{DataFusionError, Result},
10 logical_expr::{
11 dml::InsertOp, Expr, LogicalPlan, TableProviderFilterPushDown, TableSource, TableType,
12 },
13 physical_plan::ExecutionPlan,
14};
15
16use crate::FederationProvider;
17
18#[derive(Debug)]
21pub struct FederatedTableProviderAdaptor {
22 pub source: Arc<dyn FederatedTableSource>,
23 pub table_provider: Option<Arc<dyn TableProvider>>,
24}
25
26impl FederatedTableProviderAdaptor {
27 pub fn new(source: Arc<dyn FederatedTableSource>) -> Self {
28 Self {
29 source,
30 table_provider: None,
31 }
32 }
33
34 pub fn new_with_provider(
38 source: Arc<dyn FederatedTableSource>,
39 table_provider: Arc<dyn TableProvider>,
40 ) -> Self {
41 Self {
42 source,
43 table_provider: Some(table_provider),
44 }
45 }
46}
47
48#[async_trait]
49impl TableProvider for FederatedTableProviderAdaptor {
50 fn as_any(&self) -> &dyn Any {
51 self
52 }
53 fn schema(&self) -> SchemaRef {
54 if let Some(table_provider) = &self.table_provider {
55 return table_provider.schema();
56 }
57
58 self.source.schema()
59 }
60 fn constraints(&self) -> Option<&Constraints> {
61 if let Some(table_provider) = &self.table_provider {
62 return table_provider
63 .constraints()
64 .or_else(|| self.source.constraints());
65 }
66
67 self.source.constraints()
68 }
69 fn table_type(&self) -> TableType {
70 if let Some(table_provider) = &self.table_provider {
71 return table_provider.table_type();
72 }
73
74 self.source.table_type()
75 }
76 fn get_logical_plan(&self) -> Option<Cow<'_, LogicalPlan>> {
77 if let Some(table_provider) = &self.table_provider {
78 return table_provider
79 .get_logical_plan()
80 .or_else(|| self.source.get_logical_plan());
81 }
82
83 self.source.get_logical_plan()
84 }
85 fn get_column_default(&self, column: &str) -> Option<&Expr> {
86 if let Some(table_provider) = &self.table_provider {
87 return table_provider
88 .get_column_default(column)
89 .or_else(|| self.source.get_column_default(column));
90 }
91
92 self.source.get_column_default(column)
93 }
94 fn supports_filters_pushdown(
95 &self,
96 filters: &[&Expr],
97 ) -> Result<Vec<TableProviderFilterPushDown>> {
98 if let Some(table_provider) = &self.table_provider {
99 return table_provider.supports_filters_pushdown(filters);
100 }
101
102 Ok(vec![
103 TableProviderFilterPushDown::Unsupported;
104 filters.len()
105 ])
106 }
107
108 async fn scan(
111 &self,
112 state: &dyn Session,
113 projection: Option<&Vec<usize>>,
114 filters: &[Expr],
115 limit: Option<usize>,
116 ) -> Result<Arc<dyn ExecutionPlan>> {
117 if let Some(table_provider) = &self.table_provider {
118 return table_provider.scan(state, projection, filters, limit).await;
119 }
120
121 Err(DataFusionError::NotImplemented(
122 "FederatedTableProviderAdaptor cannot scan".to_string(),
123 ))
124 }
125
126 async fn insert_into(
127 &self,
128 _state: &dyn Session,
129 input: Arc<dyn ExecutionPlan>,
130 insert_op: InsertOp,
131 ) -> Result<Arc<dyn ExecutionPlan>> {
132 if let Some(table_provider) = &self.table_provider {
133 return table_provider.insert_into(_state, input, insert_op).await;
134 }
135
136 Err(DataFusionError::NotImplemented(
137 "FederatedTableProviderAdaptor cannot insert_into".to_string(),
138 ))
139 }
140}
141
142#[async_trait]
145pub trait FederatedTableSource: TableSource {
146 fn federation_provider(&self) -> Arc<dyn FederationProvider>;
148}
149
150impl std::fmt::Debug for dyn FederatedTableSource {
151 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
152 write!(
153 f,
154 "FederatedTableSource: {:?}",
155 self.federation_provider().name()
156 )
157 }
158}