// datafusion_catalog/listing_schema.rs

use std::any::Any;
use std::collections::HashSet;
use std::path::Path;
use std::sync::{Arc, Mutex};

use crate::{SchemaProvider, TableProvider, TableProviderFactory};

use crate::Session;
use datafusion_common::{
    DFSchema, DataFusionError, HashMap, TableReference, internal_datafusion_err,
};
use datafusion_expr::CreateExternalTable;

use async_trait::async_trait;
use futures::TryStreamExt;
use itertools::Itertools;
use object_store::ObjectStore;
38#[derive(Debug)]
54pub struct ListingSchemaProvider {
55 authority: String,
56 path: object_store::path::Path,
57 factory: Arc<dyn TableProviderFactory>,
58 store: Arc<dyn ObjectStore>,
59 tables: Arc<Mutex<HashMap<String, Arc<dyn TableProvider>>>>,
60 format: String,
61}
62
63impl ListingSchemaProvider {
64 pub fn new(
74 authority: String,
75 path: object_store::path::Path,
76 factory: Arc<dyn TableProviderFactory>,
77 store: Arc<dyn ObjectStore>,
78 format: String,
79 ) -> Self {
80 Self {
81 authority,
82 path,
83 factory,
84 store,
85 tables: Arc::new(Mutex::new(HashMap::new())),
86 format,
87 }
88 }
89
90 pub async fn refresh(&self, state: &dyn Session) -> datafusion_common::Result<()> {
92 let entries: Vec<_> = self.store.list(Some(&self.path)).try_collect().await?;
93 let base = Path::new(self.path.as_ref());
94 let mut tables = HashSet::new();
95 for file in entries.iter() {
96 let mut is_dir = false;
98 let mut parent = Path::new(file.location.as_ref());
99 while let Some(p) = parent.parent() {
100 if p == base {
101 tables.insert(TablePath {
102 is_dir,
103 path: parent,
104 });
105 }
106 parent = p;
107 is_dir = true;
108 }
109 }
110 for table in tables.iter() {
111 let file_name = table
112 .path
113 .file_name()
114 .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?
115 .to_str()
116 .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?;
117 let table_name = file_name.split('.').collect_vec()[0];
118 let table_path = table
119 .to_string()
120 .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?;
121
122 if !self.table_exist(table_name) {
123 let table_url = format!("{}/{}", self.authority, table_path);
124
125 let name = TableReference::bare(table_name);
126 let provider = self
127 .factory
128 .create(
129 state,
130 &CreateExternalTable::builder(
131 name,
132 table_url,
133 self.format.clone(),
134 Arc::new(DFSchema::empty()),
135 )
136 .build(),
137 )
138 .await?;
139 let _ =
140 self.register_table(table_name.to_string(), Arc::clone(&provider))?;
141 }
142 }
143 Ok(())
144 }
145}
146
147#[async_trait]
148impl SchemaProvider for ListingSchemaProvider {
149 fn as_any(&self) -> &dyn Any {
150 self
151 }
152
153 fn table_names(&self) -> Vec<String> {
154 self.tables
155 .lock()
156 .expect("Can't lock tables")
157 .keys()
158 .map(|it| it.to_string())
159 .collect()
160 }
161
162 async fn table(
163 &self,
164 name: &str,
165 ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
166 Ok(self
167 .tables
168 .lock()
169 .expect("Can't lock tables")
170 .get(name)
171 .cloned())
172 }
173
174 fn register_table(
175 &self,
176 name: String,
177 table: Arc<dyn TableProvider>,
178 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
179 self.tables
180 .lock()
181 .expect("Can't lock tables")
182 .insert(name, Arc::clone(&table));
183 Ok(Some(table))
184 }
185
186 fn deregister_table(
187 &self,
188 name: &str,
189 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
190 Ok(self.tables.lock().expect("Can't lock tables").remove(name))
191 }
192
193 fn table_exist(&self, name: &str) -> bool {
194 self.tables
195 .lock()
196 .expect("Can't lock tables")
197 .contains_key(name)
198 }
199}
200
/// A table location discovered directly beneath the schema's base path.
///
/// Equality and hashing cover both fields, so a set of these collapses
/// duplicate discoveries of the same entry.
#[derive(Eq, PartialEq, Hash, Debug)]
struct TablePath<'a> {
    /// Path of the entry directly beneath the base path.
    path: &'a Path,
    /// `true` when the entry was reached from a nested object (a directory).
    is_dir: bool,
}

impl TablePath<'_> {
    /// Renders the path as a string, appending `/` when it is a directory.
    ///
    /// Returns `None` if the path is not valid UTF-8.
    fn to_string(&self) -> Option<String> {
        let text = self.path.to_str()?;
        let rendered = match self.is_dir {
            true => format!("{text}/"),
            false => text.to_string(),
        };
        Some(rendered)
    }
}
222
#[cfg(test)]
mod tests {
    use super::*;

    // Exact-value assertions: besides the trailing slash, they also catch a
    // doubled slash or any other change to the rendered path.

    #[test]
    fn table_path_ends_with_slash_when_is_dir() {
        let table_path = TablePath {
            path: Path::new("/file"),
            is_dir: true,
        };
        assert_eq!(table_path.to_string().expect("table path"), "/file/");
    }

    #[test]
    fn dir_table_path_str_does_not_end_with_slash_when_not_is_dir() {
        let table_path = TablePath {
            path: Path::new("/file"),
            is_dir: false,
        };
        assert_eq!(table_path.to_string().expect("table_path"), "/file");
    }
}