datafusion_federation/sql/
schema.rs1use std::{any::Any, sync::Arc};
2
3use async_trait::async_trait;
4use datafusion::{catalog::SchemaProvider, datasource::TableProvider, error::Result};
5use futures::future::join_all;
6
7use super::{table::SQLTable, RemoteTableRef, SQLTableSource};
8use crate::{sql::SQLFederationProvider, FederatedTableProviderAdaptor};
9
10#[derive(Debug)]
12pub struct SQLSchemaProvider {
13 tables: Vec<Arc<SQLTableSource>>,
14}
15
16impl SQLSchemaProvider {
17 pub async fn new(provider: Arc<SQLFederationProvider>) -> Result<Self> {
20 let tables = Arc::clone(&provider.executor)
21 .table_names()
22 .await?
23 .iter()
24 .map(RemoteTableRef::try_from)
25 .collect::<Result<Vec<_>>>()?;
26
27 Self::new_with_table_references(provider, tables).await
28 }
29
30 pub async fn new_with_tables<T: AsRef<str>>(
33 provider: Arc<SQLFederationProvider>,
34 tables: impl IntoIterator<Item = T>,
35 ) -> Result<Self> {
36 let tables = tables
37 .into_iter()
38 .map(|x| RemoteTableRef::try_from(x.as_ref()))
39 .collect::<Result<Vec<_>>>()?;
40
41 let futures: Vec<_> = tables
42 .into_iter()
43 .map(|t| SQLTableSource::new(Arc::clone(&provider), t))
44 .collect();
45 let results: Result<Vec<_>> = join_all(futures).await.into_iter().collect();
46 let tables = results?.into_iter().map(Arc::new).collect();
47 Ok(Self { tables })
48 }
49
50 pub fn new_with_custom_tables(
52 provider: Arc<SQLFederationProvider>,
53 tables: Vec<Arc<dyn SQLTable>>,
54 ) -> Self {
55 Self {
56 tables: tables
57 .into_iter()
58 .map(|table| SQLTableSource::new_with_table(provider.clone(), table))
59 .map(Arc::new)
60 .collect(),
61 }
62 }
63
64 pub async fn new_with_table_references(
65 provider: Arc<SQLFederationProvider>,
66 tables: Vec<RemoteTableRef>,
67 ) -> Result<Self> {
68 let futures: Vec<_> = tables
69 .into_iter()
70 .map(|t| SQLTableSource::new(Arc::clone(&provider), t))
71 .collect();
72 let results: Result<Vec<_>> = join_all(futures).await.into_iter().collect();
73 let tables = results?.into_iter().map(Arc::new).collect();
74 Ok(Self { tables })
75 }
76}
77
78#[async_trait]
79impl SchemaProvider for SQLSchemaProvider {
80 fn as_any(&self) -> &dyn Any {
81 self
82 }
83
84 fn table_names(&self) -> Vec<String> {
85 self.tables
86 .iter()
87 .map(|source| source.table_reference().to_string())
88 .collect()
89 }
90
91 async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
92 if let Some(source) = self
93 .tables
94 .iter()
95 .find(|s| s.table_reference().to_string().eq(name))
96 {
97 let adaptor = FederatedTableProviderAdaptor::new(source.clone());
98 return Ok(Some(Arc::new(adaptor)));
99 }
100 Ok(None)
101 }
102
103 fn table_exist(&self, name: &str) -> bool {
104 self.tables
105 .iter()
106 .any(|source| source.table_reference().to_string().eq(name))
107 }
108}
109
110#[derive(Debug)]
111pub struct MultiSchemaProvider {
112 children: Vec<Arc<dyn SchemaProvider>>,
113}
114
115impl MultiSchemaProvider {
116 pub fn new(children: Vec<Arc<dyn SchemaProvider>>) -> Self {
117 Self { children }
118 }
119}
120
121#[async_trait]
122impl SchemaProvider for MultiSchemaProvider {
123 fn as_any(&self) -> &dyn Any {
124 self
125 }
126
127 fn table_names(&self) -> Vec<String> {
128 self.children.iter().flat_map(|p| p.table_names()).collect()
129 }
130
131 async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
132 for child in &self.children {
133 if let Ok(Some(table)) = child.table(name).await {
134 return Ok(Some(table));
135 }
136 }
137 Ok(None)
138 }
139
140 fn table_exist(&self, name: &str) -> bool {
141 self.children.iter().any(|p| p.table_exist(name))
142 }
143}