datafusion_catalog/
listing_schema.rs1use std::any::Any;
21use std::collections::HashSet;
22use std::path::Path;
23use std::sync::{Arc, Mutex};
24
25use crate::{SchemaProvider, TableProvider, TableProviderFactory};
26
27use crate::Session;
28use datafusion_common::{
29 Constraints, DFSchema, DataFusionError, HashMap, TableReference,
30};
31use datafusion_expr::CreateExternalTable;
32
33use async_trait::async_trait;
34use futures::TryStreamExt;
35use itertools::Itertools;
36use object_store::ObjectStore;
37
/// A [`SchemaProvider`] whose tables are discovered by listing an object store.
///
/// [`ListingSchemaProvider::refresh`] lists everything under `path` in `store`
/// and registers one table for each direct child (file or directory) of that
/// path, building each table through `factory`.
#[derive(Debug)]
pub struct ListingSchemaProvider {
    /// Prefix joined with `/` in front of each discovered table path to form
    /// the table location handed to the factory.
    authority: String,
    /// Object-store prefix that is listed; its direct children become tables.
    path: object_store::path::Path,
    /// Factory used to build a [`TableProvider`] for each discovered table.
    factory: Arc<dyn TableProviderFactory>,
    /// Object store that is listed during a refresh.
    store: Arc<dyn ObjectStore>,
    /// Registered tables keyed by table name, guarded by a mutex.
    tables: Arc<Mutex<HashMap<String, Arc<dyn TableProvider>>>>,
    /// File type passed to the factory as `file_type` for every table.
    format: String,
}
62
63impl ListingSchemaProvider {
64 pub fn new(
74 authority: String,
75 path: object_store::path::Path,
76 factory: Arc<dyn TableProviderFactory>,
77 store: Arc<dyn ObjectStore>,
78 format: String,
79 ) -> Self {
80 Self {
81 authority,
82 path,
83 factory,
84 store,
85 tables: Arc::new(Mutex::new(HashMap::new())),
86 format,
87 }
88 }
89
90 pub async fn refresh(&self, state: &dyn Session) -> datafusion_common::Result<()> {
92 let entries: Vec<_> = self.store.list(Some(&self.path)).try_collect().await?;
93 let base = Path::new(self.path.as_ref());
94 let mut tables = HashSet::new();
95 for file in entries.iter() {
96 let mut is_dir = false;
98 let mut parent = Path::new(file.location.as_ref());
99 while let Some(p) = parent.parent() {
100 if p == base {
101 tables.insert(TablePath {
102 is_dir,
103 path: parent,
104 });
105 }
106 parent = p;
107 is_dir = true;
108 }
109 }
110 for table in tables.iter() {
111 let file_name = table
112 .path
113 .file_name()
114 .ok_or_else(|| {
115 DataFusionError::Internal("Cannot parse file name!".to_string())
116 })?
117 .to_str()
118 .ok_or_else(|| {
119 DataFusionError::Internal("Cannot parse file name!".to_string())
120 })?;
121 let table_name = file_name.split('.').collect_vec()[0];
122 let table_path = table.to_string().ok_or_else(|| {
123 DataFusionError::Internal("Cannot parse file name!".to_string())
124 })?;
125
126 if !self.table_exist(table_name) {
127 let table_url = format!("{}/{}", self.authority, table_path);
128
129 let name = TableReference::bare(table_name);
130 let provider = self
131 .factory
132 .create(
133 state,
134 &CreateExternalTable {
135 schema: Arc::new(DFSchema::empty()),
136 name,
137 location: table_url,
138 file_type: self.format.clone(),
139 table_partition_cols: vec![],
140 if_not_exists: false,
141 temporary: false,
142 definition: None,
143 order_exprs: vec![],
144 unbounded: false,
145 options: Default::default(),
146 constraints: Constraints::empty(),
147 column_defaults: Default::default(),
148 },
149 )
150 .await?;
151 let _ =
152 self.register_table(table_name.to_string(), Arc::clone(&provider))?;
153 }
154 }
155 Ok(())
156 }
157}
158
159#[async_trait]
160impl SchemaProvider for ListingSchemaProvider {
161 fn as_any(&self) -> &dyn Any {
162 self
163 }
164
165 fn table_names(&self) -> Vec<String> {
166 self.tables
167 .lock()
168 .expect("Can't lock tables")
169 .keys()
170 .map(|it| it.to_string())
171 .collect()
172 }
173
174 async fn table(
175 &self,
176 name: &str,
177 ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
178 Ok(self
179 .tables
180 .lock()
181 .expect("Can't lock tables")
182 .get(name)
183 .cloned())
184 }
185
186 fn register_table(
187 &self,
188 name: String,
189 table: Arc<dyn TableProvider>,
190 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
191 self.tables
192 .lock()
193 .expect("Can't lock tables")
194 .insert(name, Arc::clone(&table));
195 Ok(Some(table))
196 }
197
198 fn deregister_table(
199 &self,
200 name: &str,
201 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
202 Ok(self.tables.lock().expect("Can't lock tables").remove(name))
203 }
204
205 fn table_exist(&self, name: &str) -> bool {
206 self.tables
207 .lock()
208 .expect("Can't lock tables")
209 .contains_key(name)
210 }
211}
212
/// A path paired with a flag saying whether it denotes a directory.
///
/// Equality and hashing cover both fields, so identical entries collapse to
/// one when collected into a hash set.
#[derive(Eq, PartialEq, Hash, Debug)]
struct TablePath<'a> {
    /// Location of the entry.
    path: &'a Path,
    /// `true` when the entry is a directory rather than a single file.
    is_dir: bool,
}

impl TablePath<'_> {
    /// Renders the path as a string, appending `/` when it is a directory.
    ///
    /// Returns `None` if the path is not valid UTF-8.
    fn to_string(&self) -> Option<String> {
        let rendered = self.path.to_str()?;
        Some(if self.is_dir {
            format!("{rendered}/")
        } else {
            rendered.to_string()
        })
    }
}
234
#[cfg(test)]
mod tests {
    use super::*;

    /// A directory entry must render with a trailing `/`.
    #[test]
    fn table_path_ends_with_slash_when_is_dir() {
        let dir_entry = TablePath {
            is_dir: true,
            path: Path::new("/file"),
        };
        let rendered = dir_entry.to_string().expect("table path");
        assert!(rendered.ends_with('/'));
    }

    /// A plain file entry must render without a trailing `/`.
    #[test]
    fn dir_table_path_str_does_not_end_with_slash_when_not_is_dir() {
        let file_entry = TablePath {
            is_dir: false,
            path: Path::new("/file"),
        };
        let rendered = file_entry.to_string().expect("table_path");
        assert!(!rendered.ends_with('/'));
    }
}