datafusion_federation/sql/
table.rs1use crate::sql::SQLFederationProvider;
2use crate::FederatedTableSource;
3use crate::FederationProvider;
4use datafusion::arrow::datatypes::SchemaRef;
5use datafusion::error::Result;
6use datafusion::logical_expr::TableSource;
7use datafusion::logical_expr::TableType;
8use datafusion::sql::TableReference;
9use std::any::Any;
10use std::sync::Arc;
11
12use super::ast_analyzer;
13use super::executor::LogicalOptimizer;
14use super::AstAnalyzer;
15use super::RemoteTableRef;
16
17pub trait SQLTable: std::fmt::Debug + Send + Sync {
22 fn as_any(&self) -> &dyn Any;
24 fn table_reference(&self) -> TableReference;
30 fn schema(&self) -> SchemaRef;
32 fn logical_optimizer(&self) -> Option<LogicalOptimizer> {
34 None
35 }
36 fn ast_analyzer(&self) -> Option<AstAnalyzer> {
38 None
39 }
40}
41
42#[derive(Debug, Clone, PartialEq, Eq, Hash)]
44pub struct RemoteTable {
45 remote_table_ref: RemoteTableRef,
46 schema: SchemaRef,
47}
48
49impl RemoteTable {
50 pub fn new(table_ref: RemoteTableRef, schema: SchemaRef) -> Self {
64 Self {
65 remote_table_ref: table_ref,
66 schema,
67 }
68 }
69
70 pub fn table_reference(&self) -> &TableReference {
73 self.remote_table_ref.table_ref()
74 }
75
76 pub fn schema(&self) -> &SchemaRef {
77 &self.schema
78 }
79}
80
81impl SQLTable for RemoteTable {
82 fn as_any(&self) -> &dyn Any {
83 self
84 }
85
86 fn table_reference(&self) -> TableReference {
87 Self::table_reference(self).clone()
88 }
89
90 fn schema(&self) -> SchemaRef {
91 Arc::clone(&self.schema)
92 }
93
94 fn logical_optimizer(&self) -> Option<LogicalOptimizer> {
95 None
96 }
97
98 fn ast_analyzer(&self) -> Option<AstAnalyzer> {
100 if let Some(args) = self.remote_table_ref.args() {
101 Some(
102 ast_analyzer::TableArgReplace::default()
103 .with(self.remote_table_ref.table_ref().clone(), args.to_vec())
104 .into_analyzer(),
105 )
106 } else {
107 None
108 }
109 }
110}
111
112#[derive(Debug, Clone)]
113pub struct SQLTableSource {
114 pub provider: Arc<SQLFederationProvider>,
115 pub table: Arc<dyn SQLTable>,
116}
117
118impl SQLTableSource {
119 pub async fn new(
121 provider: Arc<SQLFederationProvider>,
122 table_ref: RemoteTableRef,
123 ) -> Result<Self> {
124 let table_name = table_ref.to_quoted_string();
125 let schema = provider.executor.get_table_schema(&table_name).await?;
126 Ok(Self::new_with_schema(provider, table_ref, schema))
127 }
128
129 pub fn new_with_schema(
131 provider: Arc<SQLFederationProvider>,
132 table_ref: RemoteTableRef,
133 schema: SchemaRef,
134 ) -> Self {
135 Self {
136 provider,
137 table: Arc::new(RemoteTable::new(table_ref, schema)),
138 }
139 }
140
141 pub fn new_with_table(provider: Arc<SQLFederationProvider>, table: Arc<dyn SQLTable>) -> Self {
143 Self { provider, table }
144 }
145
146 pub fn table_reference(&self) -> TableReference {
148 self.table.table_reference()
149 }
150}
151
152impl TableSource for SQLTableSource {
153 fn as_any(&self) -> &dyn Any {
154 self
155 }
156
157 fn schema(&self) -> SchemaRef {
158 self.table.schema()
159 }
160
161 fn table_type(&self) -> TableType {
162 TableType::Temporary
163 }
164}
165
166impl FederatedTableSource for SQLTableSource {
167 fn federation_provider(&self) -> Arc<dyn FederationProvider> {
168 Arc::clone(&self.provider) as Arc<dyn FederationProvider>
169 }
170}