datafusion_catalog/
listing_schema.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`ListingSchemaProvider`]: a [`SchemaProvider`] that automatically discovers tables by scanning an `ObjectStore`

20use std::any::Any;
21use std::collections::HashSet;
22use std::path::Path;
23use std::sync::{Arc, Mutex};
24
25use crate::{SchemaProvider, TableProvider, TableProviderFactory};
26
27use crate::Session;
28use datafusion_common::{DFSchema, DataFusionError, HashMap, TableReference};
29use datafusion_expr::CreateExternalTable;
30
31use async_trait::async_trait;
32use futures::TryStreamExt;
33use itertools::Itertools;
34use object_store::ObjectStore;
35
/// A [`SchemaProvider`] that scans an [`ObjectStore`] to automatically discover tables
///
/// A subfolder relationship is assumed, i.e. given:
/// - authority = `s3://host.example.com:3000`
/// - path = `/data/tpch`
/// - factory = `DeltaTableFactory`
///
/// A table called "customer" will be registered for the folder:
/// `s3://host.example.com:3000/data/tpch/customer`
///
/// assuming it contains valid deltalake data, e.g.:
/// - `s3://host.example.com:3000/data/tpch/customer/part-00000-xxxx.snappy.parquet`
/// - `s3://host.example.com:3000/data/tpch/customer/_delta_log/`
///
/// Objects stored directly under `path` (rather than inside a subfolder) are registered
/// as single-file tables as well. In both cases the table name is the child's name up
/// to its first `.`, so an object named `customer.csv` is registered as `customer`.
///
/// Tables are only discovered when [`ListingSchemaProvider::refresh`] is called; a newly
/// constructed provider starts with no tables.
///
/// [`ObjectStore`]: object_store::ObjectStore
#[derive(Debug)]
pub struct ListingSchemaProvider {
    /// Scheme and host (e.g. `s3://host.example.com:3000`), joined with `/` in front of
    /// each discovered table path to form the location handed to `factory`.
    authority: String,
    /// Root path whose direct children are treated as tables.
    path: object_store::path::Path,
    /// Factory used to create a [`TableProvider`] for each discovered table.
    factory: Arc<dyn TableProviderFactory>,
    /// Store that is listed to discover tables.
    store: Arc<dyn ObjectStore>,
    /// Registered tables keyed by table name, shared behind a mutex.
    tables: Arc<Mutex<HashMap<String, Arc<dyn TableProvider>>>>,
    /// File type forwarded to `factory` as `CreateExternalTable::file_type`.
    format: String,
}
60
61impl ListingSchemaProvider {
62    /// Create a new `ListingSchemaProvider`
63    ///
64    /// Arguments:
65    /// `authority`: The scheme (i.e. s3://) + host (i.e. example.com:3000)
66    /// `path`: The root path that contains subfolders which represent tables
67    /// `factory`: The `TableProviderFactory` to use to instantiate tables for each subfolder
68    /// `store`: The `ObjectStore` containing the table data
69    /// `format`: The `FileFormat` of the tables
70    /// `has_header`: Indicates whether the created external table has the has_header flag enabled
71    pub fn new(
72        authority: String,
73        path: object_store::path::Path,
74        factory: Arc<dyn TableProviderFactory>,
75        store: Arc<dyn ObjectStore>,
76        format: String,
77    ) -> Self {
78        Self {
79            authority,
80            path,
81            factory,
82            store,
83            tables: Arc::new(Mutex::new(HashMap::new())),
84            format,
85        }
86    }
87
88    /// Reload table information from ObjectStore
89    pub async fn refresh(&self, state: &dyn Session) -> datafusion_common::Result<()> {
90        let entries: Vec<_> = self.store.list(Some(&self.path)).try_collect().await?;
91        let base = Path::new(self.path.as_ref());
92        let mut tables = HashSet::new();
93        for file in entries.iter() {
94            // The listing will initially be a file. However if we've recursed up to match our base, we know our path is a directory.
95            let mut is_dir = false;
96            let mut parent = Path::new(file.location.as_ref());
97            while let Some(p) = parent.parent() {
98                if p == base {
99                    tables.insert(TablePath {
100                        is_dir,
101                        path: parent,
102                    });
103                }
104                parent = p;
105                is_dir = true;
106            }
107        }
108        for table in tables.iter() {
109            let file_name = table
110                .path
111                .file_name()
112                .ok_or_else(|| {
113                    DataFusionError::Internal("Cannot parse file name!".to_string())
114                })?
115                .to_str()
116                .ok_or_else(|| {
117                    DataFusionError::Internal("Cannot parse file name!".to_string())
118                })?;
119            let table_name = file_name.split('.').collect_vec()[0];
120            let table_path = table.to_string().ok_or_else(|| {
121                DataFusionError::Internal("Cannot parse file name!".to_string())
122            })?;
123
124            if !self.table_exist(table_name) {
125                let table_url = format!("{}/{}", self.authority, table_path);
126
127                let name = TableReference::bare(table_name);
128                let provider = self
129                    .factory
130                    .create(
131                        state,
132                        &CreateExternalTable {
133                            schema: Arc::new(DFSchema::empty()),
134                            name,
135                            location: table_url,
136                            file_type: self.format.clone(),
137                            table_partition_cols: vec![],
138                            if_not_exists: false,
139                            temporary: false,
140                            definition: None,
141                            order_exprs: vec![],
142                            unbounded: false,
143                            options: Default::default(),
144                            constraints: Default::default(),
145                            column_defaults: Default::default(),
146                        },
147                    )
148                    .await?;
149                let _ =
150                    self.register_table(table_name.to_string(), Arc::clone(&provider))?;
151            }
152        }
153        Ok(())
154    }
155}
156
157#[async_trait]
158impl SchemaProvider for ListingSchemaProvider {
159    fn as_any(&self) -> &dyn Any {
160        self
161    }
162
163    fn table_names(&self) -> Vec<String> {
164        self.tables
165            .lock()
166            .expect("Can't lock tables")
167            .keys()
168            .map(|it| it.to_string())
169            .collect()
170    }
171
172    async fn table(
173        &self,
174        name: &str,
175    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
176        Ok(self
177            .tables
178            .lock()
179            .expect("Can't lock tables")
180            .get(name)
181            .cloned())
182    }
183
184    fn register_table(
185        &self,
186        name: String,
187        table: Arc<dyn TableProvider>,
188    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
189        self.tables
190            .lock()
191            .expect("Can't lock tables")
192            .insert(name, Arc::clone(&table));
193        Ok(Some(table))
194    }
195
196    fn deregister_table(
197        &self,
198        name: &str,
199    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
200        Ok(self.tables.lock().expect("Can't lock tables").remove(name))
201    }
202
203    fn table_exist(&self, name: &str) -> bool {
204        self.tables
205            .lock()
206            .expect("Can't lock tables")
207            .contains_key(name)
208    }
209}
210
/// A candidate table location found while listing the object store.
///
/// Pairs the borrowed path with a flag recording whether it names a directory. The flag
/// determines whether [`TablePath::to_string`] renders a trailing slash.
#[derive(Eq, PartialEq, Hash, Debug)]
struct TablePath<'a> {
    /// Location of the table: either a single object or the folder holding its objects.
    path: &'a Path,
    /// `true` when `path` is a folder rather than a single object.
    is_dir: bool,
}

impl TablePath<'_> {
    /// Renders the path as a `String`, appending `/` when it names a directory.
    ///
    /// Clients (e.g. object_store listing) may use the presence of a trailing slash as a
    /// heuristic to tell directories from files, so it must be emitted here.
    ///
    /// Returns `None` if the path is not valid UTF-8.
    fn to_string(&self) -> Option<String> {
        let mut rendered = self.path.to_str()?.to_owned();
        if self.is_dir {
            rendered.push('/');
        }
        Some(rendered)
    }
}
232
#[cfg(test)]
mod tests {
    use super::*;

    /// Renders the fixed path `/file` with the given directory flag, panicking with
    /// `msg` if rendering fails.
    fn render_file(is_dir: bool, msg: &str) -> String {
        let table_path = TablePath {
            path: Path::new("/file"),
            is_dir,
        };
        table_path.to_string().expect(msg)
    }

    #[test]
    fn table_path_ends_with_slash_when_is_dir() {
        let rendered = render_file(true, "table path");
        assert!(rendered.ends_with('/'));
    }

    #[test]
    fn dir_table_path_str_does_not_end_with_slash_when_not_is_dir() {
        let rendered = render_file(false, "table_path");
        assert!(!rendered.ends_with('/'));
    }
}