// datafusion_federation/sql/table.rs

1use crate::sql::SQLFederationProvider;
2use crate::FederatedTableSource;
3use crate::FederationProvider;
4use datafusion::arrow::datatypes::SchemaRef;
5use datafusion::error::Result;
6use datafusion::logical_expr::TableSource;
7use datafusion::logical_expr::TableType;
8use datafusion::sql::TableReference;
9use std::any::Any;
10use std::sync::Arc;
11
12use super::ast_analyzer;
13use super::executor::LogicalOptimizer;
14use super::AstAnalyzer;
15use super::RemoteTableRef;
16
17/// Trait to represent a SQL remote table inside [`SQLTableSource`].
18/// A remote table provides information such as schema, table reference, and
19/// provides hooks for rewriting the logical plan and AST before execution.
20/// This crate provides [`RemoteTable`] as a default ready-to-use type.
21pub trait SQLTable: std::fmt::Debug + Send + Sync {
22    /// Returns a reference as a trait object.
23    fn as_any(&self) -> &dyn Any;
24    /// Provides the [`TableReference`](`datafusion::sql::TableReference`) used to identify the table in SQL queries.
25    /// This TableReference is used for registering the table with the [`SQLSchemaProvider`](`super::SQLSchemaProvider`).
26    /// If the table provider is registered in the Datafusion context under a different name,
27    /// the logical plan will be rewritten to use this table reference during execution.
28    /// Therefore, any AST analyzer should match against this table reference.
29    fn table_reference(&self) -> TableReference;
30    /// Schema of the remote table
31    fn schema(&self) -> SchemaRef;
32    /// Returns a logical optimizer specific to this table, will be used to modify the logical plan before execution
33    fn logical_optimizer(&self) -> Option<LogicalOptimizer> {
34        None
35    }
36    /// Returns an AST analyzer specific to this table, will be used to modify the AST before execution
37    fn ast_analyzer(&self) -> Option<AstAnalyzer> {
38        None
39    }
40}
41
42/// Represents a remote table with a reference and schema.
43#[derive(Debug, Clone, PartialEq, Eq, Hash)]
44pub struct RemoteTable {
45    remote_table_ref: RemoteTableRef,
46    schema: SchemaRef,
47}
48
49impl RemoteTable {
50    /// Creates a new `RemoteTable` instance.
51    ///
52    /// Examples:
53    /// ```ignore
54    /// use datafusion::sql::TableReference;
55    ///
56    /// RemoteTable::new("myschema.table".try_into()?, schema);
57    /// RemoteTable::new(r#"myschema."Table""#.try_into()?, schema);
58    /// RemoteTable::new(TableReference::partial("myschema", "table").into(), schema);
59    /// RemoteTable::new("myschema.view('obj')".try_into()?, schema);
60    /// RemoteTable::new("myschema.view(name => 'obj')".try_into()?, schema);
61    /// RemoteTable::new("myschema.view(name = 'obj')".try_into()?, schema);
62    /// ```
63    pub fn new(table_ref: RemoteTableRef, schema: SchemaRef) -> Self {
64        Self {
65            remote_table_ref: table_ref,
66            schema,
67        }
68    }
69
70    /// Return table reference of this remote table.
71    /// Only returns the object name, ignoring functional params if any
72    pub fn table_reference(&self) -> &TableReference {
73        self.remote_table_ref.table_ref()
74    }
75
76    pub fn schema(&self) -> &SchemaRef {
77        &self.schema
78    }
79}
80
81impl SQLTable for RemoteTable {
82    fn as_any(&self) -> &dyn Any {
83        self
84    }
85
86    fn table_reference(&self) -> TableReference {
87        Self::table_reference(self).clone()
88    }
89
90    fn schema(&self) -> SchemaRef {
91        Arc::clone(&self.schema)
92    }
93
94    fn logical_optimizer(&self) -> Option<LogicalOptimizer> {
95        None
96    }
97
98    /// Returns ast analyzer that modifies table that contains functional args after table ident
99    fn ast_analyzer(&self) -> Option<AstAnalyzer> {
100        if let Some(args) = self.remote_table_ref.args() {
101            Some(
102                ast_analyzer::TableArgReplace::default()
103                    .with(self.remote_table_ref.table_ref().clone(), args.to_vec())
104                    .into_analyzer(),
105            )
106        } else {
107            None
108        }
109    }
110}
111
112#[derive(Debug, Clone)]
113pub struct SQLTableSource {
114    pub provider: Arc<SQLFederationProvider>,
115    pub table: Arc<dyn SQLTable>,
116}
117
118impl SQLTableSource {
119    // creates a SQLTableSource and infers the table schema
120    pub async fn new(
121        provider: Arc<SQLFederationProvider>,
122        table_ref: RemoteTableRef,
123    ) -> Result<Self> {
124        let table_name = table_ref.to_quoted_string();
125        let schema = provider.executor.get_table_schema(&table_name).await?;
126        Ok(Self::new_with_schema(provider, table_ref, schema))
127    }
128
129    /// Create a SQLTableSource with a table reference and schema
130    pub fn new_with_schema(
131        provider: Arc<SQLFederationProvider>,
132        table_ref: RemoteTableRef,
133        schema: SchemaRef,
134    ) -> Self {
135        Self {
136            provider,
137            table: Arc::new(RemoteTable::new(table_ref, schema)),
138        }
139    }
140
141    /// Create new with a custom SQLtable instance.
142    pub fn new_with_table(provider: Arc<SQLFederationProvider>, table: Arc<dyn SQLTable>) -> Self {
143        Self { provider, table }
144    }
145
146    /// Return associated table reference of stored remote table
147    pub fn table_reference(&self) -> TableReference {
148        self.table.table_reference()
149    }
150}
151
152impl TableSource for SQLTableSource {
153    fn as_any(&self) -> &dyn Any {
154        self
155    }
156
157    fn schema(&self) -> SchemaRef {
158        self.table.schema()
159    }
160
161    fn table_type(&self) -> TableType {
162        TableType::Temporary
163    }
164}
165
166impl FederatedTableSource for SQLTableSource {
167    fn federation_provider(&self) -> Arc<dyn FederationProvider> {
168        Arc::clone(&self.provider) as Arc<dyn FederationProvider>
169    }
170}