datafusion_federation/sql/schema.rs

1use std::{any::Any, sync::Arc};
2
3use async_trait::async_trait;
4use datafusion::{catalog::SchemaProvider, datasource::TableProvider, error::Result};
5use futures::future::join_all;
6
7use super::{table::SQLTable, RemoteTableRef, SQLTableSource};
8use crate::{sql::SQLFederationProvider, FederatedTableProviderAdaptor};
9
10/// An in-memory schema provider for SQL tables.
11#[derive(Debug)]
12pub struct SQLSchemaProvider {
13    tables: Vec<Arc<SQLTableSource>>,
14}
15
16impl SQLSchemaProvider {
17    /// Creates a new SQLSchemaProvider from a [`SQLFederationProvider`].
18    /// Initializes the schema provider by fetching table names and schema from the federation provider's executor,
19    pub async fn new(provider: Arc<SQLFederationProvider>) -> Result<Self> {
20        let tables = Arc::clone(&provider.executor)
21            .table_names()
22            .await?
23            .iter()
24            .map(RemoteTableRef::try_from)
25            .collect::<Result<Vec<_>>>()?;
26
27        Self::new_with_table_references(provider, tables).await
28    }
29
30    /// Creates a new SQLSchemaProvider from a SQLFederationProvider and a list of table references.
31    /// Fetches the schema for each table using the executor's implementation.
32    pub async fn new_with_tables<T: AsRef<str>>(
33        provider: Arc<SQLFederationProvider>,
34        tables: impl IntoIterator<Item = T>,
35    ) -> Result<Self> {
36        let tables = tables
37            .into_iter()
38            .map(|x| RemoteTableRef::try_from(x.as_ref()))
39            .collect::<Result<Vec<_>>>()?;
40
41        let futures: Vec<_> = tables
42            .into_iter()
43            .map(|t| SQLTableSource::new(Arc::clone(&provider), t))
44            .collect();
45        let results: Result<Vec<_>> = join_all(futures).await.into_iter().collect();
46        let tables = results?.into_iter().map(Arc::new).collect();
47        Ok(Self { tables })
48    }
49
50    /// Creates a new SQLSchemaProvider from a SQLFederationProvider and a list of custom table instances.
51    pub fn new_with_custom_tables(
52        provider: Arc<SQLFederationProvider>,
53        tables: Vec<Arc<dyn SQLTable>>,
54    ) -> Self {
55        Self {
56            tables: tables
57                .into_iter()
58                .map(|table| SQLTableSource::new_with_table(provider.clone(), table))
59                .map(Arc::new)
60                .collect(),
61        }
62    }
63
64    pub async fn new_with_table_references(
65        provider: Arc<SQLFederationProvider>,
66        tables: Vec<RemoteTableRef>,
67    ) -> Result<Self> {
68        let futures: Vec<_> = tables
69            .into_iter()
70            .map(|t| SQLTableSource::new(Arc::clone(&provider), t))
71            .collect();
72        let results: Result<Vec<_>> = join_all(futures).await.into_iter().collect();
73        let tables = results?.into_iter().map(Arc::new).collect();
74        Ok(Self { tables })
75    }
76}
77
78#[async_trait]
79impl SchemaProvider for SQLSchemaProvider {
80    fn as_any(&self) -> &dyn Any {
81        self
82    }
83
84    fn table_names(&self) -> Vec<String> {
85        self.tables
86            .iter()
87            .map(|source| source.table_reference().to_string())
88            .collect()
89    }
90
91    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
92        if let Some(source) = self
93            .tables
94            .iter()
95            .find(|s| s.table_reference().to_string().eq(name))
96        {
97            let adaptor = FederatedTableProviderAdaptor::new(source.clone());
98            return Ok(Some(Arc::new(adaptor)));
99        }
100        Ok(None)
101    }
102
103    fn table_exist(&self, name: &str) -> bool {
104        self.tables
105            .iter()
106            .any(|source| source.table_reference().to_string().eq(name))
107    }
108}
109
110#[derive(Debug)]
111pub struct MultiSchemaProvider {
112    children: Vec<Arc<dyn SchemaProvider>>,
113}
114
115impl MultiSchemaProvider {
116    pub fn new(children: Vec<Arc<dyn SchemaProvider>>) -> Self {
117        Self { children }
118    }
119}
120
121#[async_trait]
122impl SchemaProvider for MultiSchemaProvider {
123    fn as_any(&self) -> &dyn Any {
124        self
125    }
126
127    fn table_names(&self) -> Vec<String> {
128        self.children.iter().flat_map(|p| p.table_names()).collect()
129    }
130
131    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
132        for child in &self.children {
133            if let Ok(Some(table)) = child.table(name).await {
134                return Ok(Some(table));
135            }
136        }
137        Ok(None)
138    }
139
140    fn table_exist(&self, name: &str) -> bool {
141        self.children.iter().any(|p| p.table_exist(name))
142    }
143}