datafusion_catalog/
listing_schema.rs1use std::any::Any;
21use std::collections::HashSet;
22use std::path::Path;
23use std::sync::{Arc, Mutex};
24
25use crate::{SchemaProvider, TableProvider, TableProviderFactory};
26
27use crate::Session;
28use datafusion_common::{DFSchema, DataFusionError, HashMap, TableReference};
29use datafusion_expr::CreateExternalTable;
30
31use async_trait::async_trait;
32use futures::TryStreamExt;
33use itertools::Itertools;
34use object_store::ObjectStore;
35
36#[derive(Debug)]
52pub struct ListingSchemaProvider {
53 authority: String,
54 path: object_store::path::Path,
55 factory: Arc<dyn TableProviderFactory>,
56 store: Arc<dyn ObjectStore>,
57 tables: Arc<Mutex<HashMap<String, Arc<dyn TableProvider>>>>,
58 format: String,
59}
60
61impl ListingSchemaProvider {
62 pub fn new(
72 authority: String,
73 path: object_store::path::Path,
74 factory: Arc<dyn TableProviderFactory>,
75 store: Arc<dyn ObjectStore>,
76 format: String,
77 ) -> Self {
78 Self {
79 authority,
80 path,
81 factory,
82 store,
83 tables: Arc::new(Mutex::new(HashMap::new())),
84 format,
85 }
86 }
87
88 pub async fn refresh(&self, state: &dyn Session) -> datafusion_common::Result<()> {
90 let entries: Vec<_> = self.store.list(Some(&self.path)).try_collect().await?;
91 let base = Path::new(self.path.as_ref());
92 let mut tables = HashSet::new();
93 for file in entries.iter() {
94 let mut is_dir = false;
96 let mut parent = Path::new(file.location.as_ref());
97 while let Some(p) = parent.parent() {
98 if p == base {
99 tables.insert(TablePath {
100 is_dir,
101 path: parent,
102 });
103 }
104 parent = p;
105 is_dir = true;
106 }
107 }
108 for table in tables.iter() {
109 let file_name = table
110 .path
111 .file_name()
112 .ok_or_else(|| {
113 DataFusionError::Internal("Cannot parse file name!".to_string())
114 })?
115 .to_str()
116 .ok_or_else(|| {
117 DataFusionError::Internal("Cannot parse file name!".to_string())
118 })?;
119 let table_name = file_name.split('.').collect_vec()[0];
120 let table_path = table.to_string().ok_or_else(|| {
121 DataFusionError::Internal("Cannot parse file name!".to_string())
122 })?;
123
124 if !self.table_exist(table_name) {
125 let table_url = format!("{}/{}", self.authority, table_path);
126
127 let name = TableReference::bare(table_name);
128 let provider = self
129 .factory
130 .create(
131 state,
132 &CreateExternalTable {
133 schema: Arc::new(DFSchema::empty()),
134 name,
135 location: table_url,
136 file_type: self.format.clone(),
137 table_partition_cols: vec![],
138 if_not_exists: false,
139 temporary: false,
140 definition: None,
141 order_exprs: vec![],
142 unbounded: false,
143 options: Default::default(),
144 constraints: Default::default(),
145 column_defaults: Default::default(),
146 },
147 )
148 .await?;
149 let _ =
150 self.register_table(table_name.to_string(), Arc::clone(&provider))?;
151 }
152 }
153 Ok(())
154 }
155}
156
157#[async_trait]
158impl SchemaProvider for ListingSchemaProvider {
159 fn as_any(&self) -> &dyn Any {
160 self
161 }
162
163 fn table_names(&self) -> Vec<String> {
164 self.tables
165 .lock()
166 .expect("Can't lock tables")
167 .keys()
168 .map(|it| it.to_string())
169 .collect()
170 }
171
172 async fn table(
173 &self,
174 name: &str,
175 ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
176 Ok(self
177 .tables
178 .lock()
179 .expect("Can't lock tables")
180 .get(name)
181 .cloned())
182 }
183
184 fn register_table(
185 &self,
186 name: String,
187 table: Arc<dyn TableProvider>,
188 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
189 self.tables
190 .lock()
191 .expect("Can't lock tables")
192 .insert(name, Arc::clone(&table));
193 Ok(Some(table))
194 }
195
196 fn deregister_table(
197 &self,
198 name: &str,
199 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
200 Ok(self.tables.lock().expect("Can't lock tables").remove(name))
201 }
202
203 fn table_exist(&self, name: &str) -> bool {
204 self.tables
205 .lock()
206 .expect("Can't lock tables")
207 .contains_key(name)
208 }
209}
210
/// One table candidate: the path of an immediate child of the schema root and
/// whether that child is a directory rather than a single file.
#[derive(Debug, Eq, Hash, PartialEq)]
struct TablePath<'a> {
    /// Path of the table, borrowed from the listed object location.
    path: &'a Path,
    /// `true` when the table is a directory of files.
    is_dir: bool,
}

impl TablePath<'_> {
    /// Render the path as a string, appending a trailing `/` for directories.
    ///
    /// Returns `None` when the path is not valid UTF-8.
    fn to_string(&self) -> Option<String> {
        let rendered = self.path.to_str()?;
        let suffix = if self.is_dir { "/" } else { "" };
        Some(format!("{rendered}{suffix}"))
    }
}
232
#[cfg(test)]
mod tests {
    use super::*;

    /// Render a `TablePath` for `/file`, panicking with `msg` if rendering
    /// returns `None`.
    fn render_file_path(is_dir: bool, msg: &str) -> String {
        TablePath {
            path: Path::new("/file"),
            is_dir,
        }
        .to_string()
        .expect(msg)
    }

    #[test]
    fn table_path_ends_with_slash_when_is_dir() {
        let rendered = render_file_path(true, "table path");
        assert!(rendered.ends_with('/'));
    }

    #[test]
    fn dir_table_path_str_does_not_end_with_slash_when_not_is_dir() {
        let rendered = render_file_path(false, "table_path");
        assert!(!rendered.ends_with('/'));
    }
}