datafusion_catalog/
listing_schema.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`ListingSchemaProvider`]: a [`SchemaProvider`] that automatically discovers tables by scanning an ObjectStore
19
20use std::any::Any;
21use std::collections::HashSet;
22use std::path::Path;
23use std::sync::{Arc, Mutex};
24
25use crate::{SchemaProvider, TableProvider, TableProviderFactory};
26
27use crate::Session;
28use datafusion_common::{
29    DFSchema, DataFusionError, HashMap, TableReference, internal_datafusion_err,
30};
31use datafusion_expr::CreateExternalTable;
32
33use async_trait::async_trait;
34use futures::TryStreamExt;
35use itertools::Itertools;
36use object_store::ObjectStore;
37
38/// A [`SchemaProvider`] that scans an [`ObjectStore`] to automatically discover tables
39///
40/// A subfolder relationship is assumed, i.e. given:
41/// - authority = `s3://host.example.com:3000`
42/// - path = `/data/tpch`
43/// - factory = `DeltaTableFactory`
44///
45/// A table called "customer" will be registered for the folder:
46/// `s3://host.example.com:3000/data/tpch/customer`
47///
48/// assuming it contains valid deltalake data, i.e:
49/// - `s3://host.example.com:3000/data/tpch/customer/part-00000-xxxx.snappy.parquet`
50/// - `s3://host.example.com:3000/data/tpch/customer/_delta_log/`
51///
52/// [`ObjectStore`]: object_store::ObjectStore
53#[derive(Debug)]
54pub struct ListingSchemaProvider {
55    authority: String,
56    path: object_store::path::Path,
57    factory: Arc<dyn TableProviderFactory>,
58    store: Arc<dyn ObjectStore>,
59    tables: Arc<Mutex<HashMap<String, Arc<dyn TableProvider>>>>,
60    format: String,
61}
62
63impl ListingSchemaProvider {
64    /// Create a new `ListingSchemaProvider`
65    ///
66    /// Arguments:
67    /// `authority`: The scheme (i.e. s3://) + host (i.e. example.com:3000)
68    /// `path`: The root path that contains subfolders which represent tables
69    /// `factory`: The `TableProviderFactory` to use to instantiate tables for each subfolder
70    /// `store`: The `ObjectStore` containing the table data
71    /// `format`: The `FileFormat` of the tables
72    /// `has_header`: Indicates whether the created external table has the has_header flag enabled
73    pub fn new(
74        authority: String,
75        path: object_store::path::Path,
76        factory: Arc<dyn TableProviderFactory>,
77        store: Arc<dyn ObjectStore>,
78        format: String,
79    ) -> Self {
80        Self {
81            authority,
82            path,
83            factory,
84            store,
85            tables: Arc::new(Mutex::new(HashMap::new())),
86            format,
87        }
88    }
89
90    /// Reload table information from ObjectStore
91    pub async fn refresh(&self, state: &dyn Session) -> datafusion_common::Result<()> {
92        let entries: Vec<_> = self.store.list(Some(&self.path)).try_collect().await?;
93        let base = Path::new(self.path.as_ref());
94        let mut tables = HashSet::new();
95        for file in entries.iter() {
96            // The listing will initially be a file. However if we've recursed up to match our base, we know our path is a directory.
97            let mut is_dir = false;
98            let mut parent = Path::new(file.location.as_ref());
99            while let Some(p) = parent.parent() {
100                if p == base {
101                    tables.insert(TablePath {
102                        is_dir,
103                        path: parent,
104                    });
105                }
106                parent = p;
107                is_dir = true;
108            }
109        }
110        for table in tables.iter() {
111            let file_name = table
112                .path
113                .file_name()
114                .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?
115                .to_str()
116                .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?;
117            let table_name = file_name.split('.').collect_vec()[0];
118            let table_path = table
119                .to_string()
120                .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?;
121
122            if !self.table_exist(table_name) {
123                let table_url = format!("{}/{}", self.authority, table_path);
124
125                let name = TableReference::bare(table_name);
126                let provider = self
127                    .factory
128                    .create(
129                        state,
130                        &CreateExternalTable::builder(
131                            name,
132                            table_url,
133                            self.format.clone(),
134                            Arc::new(DFSchema::empty()),
135                        )
136                        .build(),
137                    )
138                    .await?;
139                let _ =
140                    self.register_table(table_name.to_string(), Arc::clone(&provider))?;
141            }
142        }
143        Ok(())
144    }
145}
146
147#[async_trait]
148impl SchemaProvider for ListingSchemaProvider {
149    fn as_any(&self) -> &dyn Any {
150        self
151    }
152
153    fn table_names(&self) -> Vec<String> {
154        self.tables
155            .lock()
156            .expect("Can't lock tables")
157            .keys()
158            .map(|it| it.to_string())
159            .collect()
160    }
161
162    async fn table(
163        &self,
164        name: &str,
165    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
166        Ok(self
167            .tables
168            .lock()
169            .expect("Can't lock tables")
170            .get(name)
171            .cloned())
172    }
173
174    fn register_table(
175        &self,
176        name: String,
177        table: Arc<dyn TableProvider>,
178    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
179        self.tables
180            .lock()
181            .expect("Can't lock tables")
182            .insert(name, Arc::clone(&table));
183        Ok(Some(table))
184    }
185
186    fn deregister_table(
187        &self,
188        name: &str,
189    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
190        Ok(self.tables.lock().expect("Can't lock tables").remove(name))
191    }
192
193    fn table_exist(&self, name: &str) -> bool {
194        self.tables
195            .lock()
196            .expect("Can't lock tables")
197            .contains_key(name)
198    }
199}
200
/// A candidate table location found while listing the store, borrowed from the
/// listing results, together with a flag saying whether it is a directory.
///
/// Derives `Eq`/`Hash` so that `refresh` can deduplicate candidates in a `HashSet`.
#[derive(Eq, PartialEq, Hash, Debug)]
struct TablePath<'a> {
    path: &'a Path,
    is_dir: bool,
}

impl TablePath<'_> {
    /// Renders the path as a string, appending a trailing '/' for directories.
    ///
    /// Object-store clients may use that trailing slash to decide whether a
    /// location is a prefix (directory) or a single object.
    /// Returns `None` if the path is not valid UTF-8.
    fn to_string(&self) -> Option<String> {
        let text = self.path.to_str()?;
        let suffix = if self.is_dir { "/" } else { "" };
        Some(format!("{text}{suffix}"))
    }
}
222
#[cfg(test)]
mod tests {
    use super::*;

    // Checks the exact rendering, not just the suffix, so a mangled path body
    // would also be caught.
    #[test]
    fn table_path_ends_with_slash_when_is_dir() {
        let table_path = TablePath {
            path: Path::new("/file"),
            is_dir: true,
        };
        assert_eq!(table_path.to_string().as_deref(), Some("/file/"));
    }

    #[test]
    fn dir_table_path_str_does_not_end_with_slash_when_not_is_dir() {
        let table_path = TablePath {
            path: Path::new("/file"),
            is_dir: false,
        };
        assert_eq!(table_path.to_string().as_deref(), Some("/file"));
    }

    // `refresh` relies on equal candidates collapsing inside a `HashSet`.
    #[test]
    fn equal_table_paths_deduplicate_in_hash_set() {
        let mut set = HashSet::new();
        set.insert(TablePath {
            path: Path::new("/file"),
            is_dir: true,
        });
        set.insert(TablePath {
            path: Path::new("/file"),
            is_dir: true,
        });
        assert_eq!(set.len(), 1);
    }
}