Skip to main content

datafusion_federation/sql/
table.rs

1use crate::sql::SQLFederationProvider;
2use crate::FederatedTableSource;
3use crate::FederationProvider;
4use datafusion::arrow::datatypes::SchemaRef;
5use datafusion::error::Result;
6use datafusion::logical_expr::TableSource;
7use datafusion::logical_expr::TableType;
8use datafusion::sql::TableReference;
9use std::any::Any;
10use std::sync::Arc;
11
12use super::ast_analyzer;
13use super::executor::LogicalOptimizer;
14use super::executor::SqlQueryRewriter;
15use super::AstAnalyzer;
16use super::RemoteTableRef;
17
18/// Trait to represent a SQL remote table inside [`SQLTableSource`].
19/// A remote table provides information such as schema, table reference, and
20/// provides hooks for rewriting the logical plan and AST before execution.
21/// This crate provides [`RemoteTable`] as a default ready-to-use type.
22pub trait SQLTable: std::fmt::Debug + Send + Sync {
23    /// Returns a reference as a trait object.
24    fn as_any(&self) -> &dyn Any;
25    /// Provides the [`TableReference`](`datafusion::sql::TableReference`) used to identify the table in SQL queries.
26    /// This TableReference is used for registering the table with the [`SQLSchemaProvider`](`super::SQLSchemaProvider`).
27    /// If the table provider is registered in the Datafusion context under a different name,
28    /// the logical plan will be rewritten to use this table reference during execution.
29    /// Therefore, any AST analyzer should match against this table reference.
30    fn table_reference(&self) -> TableReference;
31    /// Schema of the remote table
32    fn schema(&self) -> SchemaRef;
33    /// Returns a logical optimizer specific to this table, will be used to modify the logical plan before execution
34    fn logical_optimizer(&self) -> Option<LogicalOptimizer> {
35        None
36    }
37    /// Returns an AST analyzer specific to this table, will be used to modify the AST before execution
38    fn ast_analyzer(&self) -> Option<AstAnalyzer> {
39        None
40    }
41    /// Returns a SQL query rewriter specific to this table.
42    /// This hook is applied after AST rewrites and can directly alter the final SQL string.
43    fn sql_query_rewriter(&self) -> Option<SqlQueryRewriter> {
44        None
45    }
46}
47
48/// Represents a remote table with a reference and schema.
49#[derive(Debug, Clone, PartialEq, Eq, Hash)]
50pub struct RemoteTable {
51    remote_table_ref: RemoteTableRef,
52    schema: SchemaRef,
53}
54
55impl RemoteTable {
56    /// Creates a new `RemoteTable` instance.
57    ///
58    /// Examples:
59    /// ```ignore
60    /// use datafusion::sql::TableReference;
61    ///
62    /// RemoteTable::new("myschema.table".try_into()?, schema);
63    /// RemoteTable::new(r#"myschema."Table""#.try_into()?, schema);
64    /// RemoteTable::new(TableReference::partial("myschema", "table").into(), schema);
65    /// RemoteTable::new("myschema.view('obj')".try_into()?, schema);
66    /// RemoteTable::new("myschema.view(name => 'obj')".try_into()?, schema);
67    /// RemoteTable::new("myschema.view(name = 'obj')".try_into()?, schema);
68    /// ```
69    pub fn new(table_ref: RemoteTableRef, schema: SchemaRef) -> Self {
70        Self {
71            remote_table_ref: table_ref,
72            schema,
73        }
74    }
75
76    /// Return table reference of this remote table.
77    /// Only returns the object name, ignoring functional params if any
78    pub fn table_reference(&self) -> &TableReference {
79        self.remote_table_ref.table_ref()
80    }
81
82    pub fn schema(&self) -> &SchemaRef {
83        &self.schema
84    }
85}
86
87impl SQLTable for RemoteTable {
88    fn as_any(&self) -> &dyn Any {
89        self
90    }
91
92    fn table_reference(&self) -> TableReference {
93        Self::table_reference(self).clone()
94    }
95
96    fn schema(&self) -> SchemaRef {
97        Arc::clone(&self.schema)
98    }
99
100    fn logical_optimizer(&self) -> Option<LogicalOptimizer> {
101        None
102    }
103
104    /// Returns ast analyzer that modifies table that contains functional args after table ident
105    fn ast_analyzer(&self) -> Option<AstAnalyzer> {
106        if let Some(args) = self.remote_table_ref.args() {
107            Some(
108                ast_analyzer::TableArgReplace::default()
109                    .with(self.remote_table_ref.table_ref().clone(), args.to_vec())
110                    .into_analyzer(),
111            )
112        } else {
113            None
114        }
115    }
116}
117
118#[derive(Debug, Clone)]
119pub struct SQLTableSource {
120    pub provider: Arc<SQLFederationProvider>,
121    pub table: Arc<dyn SQLTable>,
122}
123
124impl SQLTableSource {
125    // creates a SQLTableSource and infers the table schema
126    pub async fn new(
127        provider: Arc<SQLFederationProvider>,
128        table_ref: RemoteTableRef,
129    ) -> Result<Self> {
130        let table_name = table_ref.to_quoted_string();
131        let schema = provider.executor.get_table_schema(&table_name).await?;
132        Ok(Self::new_with_schema(provider, table_ref, schema))
133    }
134
135    /// Create a SQLTableSource with a table reference and schema
136    pub fn new_with_schema(
137        provider: Arc<SQLFederationProvider>,
138        table_ref: RemoteTableRef,
139        schema: SchemaRef,
140    ) -> Self {
141        Self {
142            provider,
143            table: Arc::new(RemoteTable::new(table_ref, schema)),
144        }
145    }
146
147    /// Create new with a custom SQLtable instance.
148    pub fn new_with_table(provider: Arc<SQLFederationProvider>, table: Arc<dyn SQLTable>) -> Self {
149        Self { provider, table }
150    }
151
152    /// Return associated table reference of stored remote table
153    pub fn table_reference(&self) -> TableReference {
154        self.table.table_reference()
155    }
156}
157
158impl TableSource for SQLTableSource {
159    fn as_any(&self) -> &dyn Any {
160        self
161    }
162
163    fn schema(&self) -> SchemaRef {
164        self.table.schema()
165    }
166
167    fn table_type(&self) -> TableType {
168        TableType::Temporary
169    }
170}
171
172impl FederatedTableSource for SQLTableSource {
173    fn federation_provider(&self) -> Arc<dyn FederationProvider> {
174        Arc::clone(&self.provider) as Arc<dyn FederationProvider>
175    }
176}