datafusion_catalog/
listing_schema.rs1use std::collections::HashSet;
21use std::path::Path;
22use std::sync::{Arc, Mutex};
23
24use crate::{SchemaProvider, TableProvider, TableProviderFactory};
25
26use crate::Session;
27use datafusion_common::{
28 DFSchema, DataFusionError, HashMap, TableReference, internal_datafusion_err,
29};
30use datafusion_expr::CreateExternalTable;
31
32use async_trait::async_trait;
33use futures::TryStreamExt;
34use itertools::Itertools;
35use object_store::ObjectStore;
36
37#[derive(Debug)]
53pub struct ListingSchemaProvider {
54 authority: String,
55 path: object_store::path::Path,
56 factory: Arc<dyn TableProviderFactory>,
57 store: Arc<dyn ObjectStore>,
58 tables: Arc<Mutex<HashMap<String, Arc<dyn TableProvider>>>>,
59 format: String,
60}
61
62impl ListingSchemaProvider {
63 pub fn new(
73 authority: String,
74 path: object_store::path::Path,
75 factory: Arc<dyn TableProviderFactory>,
76 store: Arc<dyn ObjectStore>,
77 format: String,
78 ) -> Self {
79 Self {
80 authority,
81 path,
82 factory,
83 store,
84 tables: Arc::new(Mutex::new(HashMap::new())),
85 format,
86 }
87 }
88
89 pub async fn refresh(&self, state: &dyn Session) -> datafusion_common::Result<()> {
91 let entries: Vec<_> = self.store.list(Some(&self.path)).try_collect().await?;
92 let base = Path::new(self.path.as_ref());
93 let mut tables = HashSet::new();
94 for file in entries.iter() {
95 let mut is_dir = false;
97 let mut parent = Path::new(file.location.as_ref());
98 while let Some(p) = parent.parent() {
99 if p == base {
100 tables.insert(TablePath {
101 is_dir,
102 path: parent,
103 });
104 }
105 parent = p;
106 is_dir = true;
107 }
108 }
109 for table in tables.iter() {
110 let file_name = table
111 .path
112 .file_name()
113 .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?
114 .to_str()
115 .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?;
116 let table_name = file_name.split('.').collect_vec()[0];
117 let table_path = table
118 .to_string()
119 .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?;
120
121 if !self.table_exist(table_name) {
122 let table_url = format!("{}/{}", self.authority, table_path);
123
124 let name = TableReference::bare(table_name);
125 let provider = self
126 .factory
127 .create(
128 state,
129 &CreateExternalTable::builder(
130 name,
131 table_url,
132 self.format.clone(),
133 Arc::new(DFSchema::empty()),
134 )
135 .build(),
136 )
137 .await?;
138 let _ =
139 self.register_table(table_name.to_string(), Arc::clone(&provider))?;
140 }
141 }
142 Ok(())
143 }
144}
145
146#[async_trait]
147impl SchemaProvider for ListingSchemaProvider {
148 fn table_names(&self) -> Vec<String> {
149 self.tables
150 .lock()
151 .expect("Can't lock tables")
152 .keys()
153 .map(|it| it.to_string())
154 .collect()
155 }
156
157 async fn table(
158 &self,
159 name: &str,
160 ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
161 Ok(self
162 .tables
163 .lock()
164 .expect("Can't lock tables")
165 .get(name)
166 .cloned())
167 }
168
169 fn register_table(
170 &self,
171 name: String,
172 table: Arc<dyn TableProvider>,
173 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
174 self.tables
175 .lock()
176 .expect("Can't lock tables")
177 .insert(name, Arc::clone(&table));
178 Ok(Some(table))
179 }
180
181 fn deregister_table(
182 &self,
183 name: &str,
184 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
185 Ok(self.tables.lock().expect("Can't lock tables").remove(name))
186 }
187
188 fn table_exist(&self, name: &str) -> bool {
189 self.tables
190 .lock()
191 .expect("Can't lock tables")
192 .contains_key(name)
193 }
194}
195
/// A path paired with a flag saying whether it denotes a directory.
#[derive(Eq, PartialEq, Hash, Debug)]
struct TablePath<'a> {
    /// The borrowed path.
    path: &'a Path,
    /// `true` when `path` refers to a directory rather than a file.
    is_dir: bool,
}

impl TablePath<'_> {
    /// Render the path as a string, appending `'/'` when it is a directory.
    ///
    /// Returns `None` when the path is not valid UTF-8.
    fn to_string(&self) -> Option<String> {
        let raw = self.path.to_str()?;
        let mut rendered = String::from(raw);
        if self.is_dir {
            rendered.push('/');
        }
        Some(rendered)
    }
}
217
#[cfg(test)]
mod tests {
    use super::*;

    /// A directory path is rendered with a trailing slash.
    #[test]
    fn table_path_ends_with_slash_when_is_dir() {
        let rendered = TablePath {
            path: Path::new("/file"),
            is_dir: true,
        }
        .to_string()
        .expect("table path");

        assert!(rendered.ends_with('/'));
    }

    /// A non-directory path is rendered without a trailing slash.
    #[test]
    fn dir_table_path_str_does_not_end_with_slash_when_not_is_dir() {
        let rendered = TablePath {
            path: Path::new("/file"),
            is_dir: false,
        }
        .to_string()
        .expect("table_path");

        assert!(!rendered.ends_with('/'));
    }
}