datafusion_federation/sql/
table.rs1use crate::sql::SQLFederationProvider;
2use crate::FederatedTableSource;
3use crate::FederationProvider;
4use datafusion::arrow::datatypes::SchemaRef;
5use datafusion::error::Result;
6use datafusion::logical_expr::TableSource;
7use datafusion::logical_expr::TableType;
8use datafusion::sql::TableReference;
9use std::any::Any;
10use std::sync::Arc;
11
12use super::ast_analyzer;
13use super::executor::LogicalOptimizer;
14use super::executor::SqlQueryRewriter;
15use super::AstAnalyzer;
16use super::RemoteTableRef;
17
18pub trait SQLTable: std::fmt::Debug + Send + Sync {
23 fn as_any(&self) -> &dyn Any;
25 fn table_reference(&self) -> TableReference;
31 fn schema(&self) -> SchemaRef;
33 fn logical_optimizer(&self) -> Option<LogicalOptimizer> {
35 None
36 }
37 fn ast_analyzer(&self) -> Option<AstAnalyzer> {
39 None
40 }
41 fn sql_query_rewriter(&self) -> Option<SqlQueryRewriter> {
44 None
45 }
46}
47
48#[derive(Debug, Clone, PartialEq, Eq, Hash)]
50pub struct RemoteTable {
51 remote_table_ref: RemoteTableRef,
52 schema: SchemaRef,
53}
54
55impl RemoteTable {
56 pub fn new(table_ref: RemoteTableRef, schema: SchemaRef) -> Self {
70 Self {
71 remote_table_ref: table_ref,
72 schema,
73 }
74 }
75
76 pub fn table_reference(&self) -> &TableReference {
79 self.remote_table_ref.table_ref()
80 }
81
82 pub fn schema(&self) -> &SchemaRef {
83 &self.schema
84 }
85}
86
87impl SQLTable for RemoteTable {
88 fn as_any(&self) -> &dyn Any {
89 self
90 }
91
92 fn table_reference(&self) -> TableReference {
93 Self::table_reference(self).clone()
94 }
95
96 fn schema(&self) -> SchemaRef {
97 Arc::clone(&self.schema)
98 }
99
100 fn logical_optimizer(&self) -> Option<LogicalOptimizer> {
101 None
102 }
103
104 fn ast_analyzer(&self) -> Option<AstAnalyzer> {
106 if let Some(args) = self.remote_table_ref.args() {
107 Some(
108 ast_analyzer::TableArgReplace::default()
109 .with(self.remote_table_ref.table_ref().clone(), args.to_vec())
110 .into_analyzer(),
111 )
112 } else {
113 None
114 }
115 }
116}
117
118#[derive(Debug, Clone)]
119pub struct SQLTableSource {
120 pub provider: Arc<SQLFederationProvider>,
121 pub table: Arc<dyn SQLTable>,
122}
123
124impl SQLTableSource {
125 pub async fn new(
127 provider: Arc<SQLFederationProvider>,
128 table_ref: RemoteTableRef,
129 ) -> Result<Self> {
130 let table_name = table_ref.to_quoted_string();
131 let schema = provider.executor.get_table_schema(&table_name).await?;
132 Ok(Self::new_with_schema(provider, table_ref, schema))
133 }
134
135 pub fn new_with_schema(
137 provider: Arc<SQLFederationProvider>,
138 table_ref: RemoteTableRef,
139 schema: SchemaRef,
140 ) -> Self {
141 Self {
142 provider,
143 table: Arc::new(RemoteTable::new(table_ref, schema)),
144 }
145 }
146
147 pub fn new_with_table(provider: Arc<SQLFederationProvider>, table: Arc<dyn SQLTable>) -> Self {
149 Self { provider, table }
150 }
151
152 pub fn table_reference(&self) -> TableReference {
154 self.table.table_reference()
155 }
156}
157
158impl TableSource for SQLTableSource {
159 fn as_any(&self) -> &dyn Any {
160 self
161 }
162
163 fn schema(&self) -> SchemaRef {
164 self.table.schema()
165 }
166
167 fn table_type(&self) -> TableType {
168 TableType::Temporary
169 }
170}
171
172impl FederatedTableSource for SQLTableSource {
173 fn federation_provider(&self) -> Arc<dyn FederationProvider> {
174 Arc::clone(&self.provider) as Arc<dyn FederationProvider>
175 }
176}