datafusion_catalog/
listing_schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ListingSchemaProvider`]: [`SchemaProvider`] that scans ObjectStores for tables automatically
19
20use std::any::Any;
21use std::collections::HashSet;
22use std::path::Path;
23use std::sync::{Arc, Mutex};
24
25use crate::{SchemaProvider, TableProvider, TableProviderFactory};
26
27use crate::Session;
28use datafusion_common::{
29    Constraints, DFSchema, DataFusionError, HashMap, TableReference,
30};
31use datafusion_expr::CreateExternalTable;
32
33use async_trait::async_trait;
34use futures::TryStreamExt;
35use itertools::Itertools;
36use object_store::ObjectStore;
37
38/// A [`SchemaProvider`] that scans an [`ObjectStore`] to automatically discover tables
39///
40/// A subfolder relationship is assumed, i.e. given:
41/// - authority = `s3://host.example.com:3000`
42/// - path = `/data/tpch`
43/// - factory = `DeltaTableFactory`
44///
45/// A table called "customer" will be registered for the folder:
46/// `s3://host.example.com:3000/data/tpch/customer`
47///
48/// assuming it contains valid deltalake data, i.e:
49/// - `s3://host.example.com:3000/data/tpch/customer/part-00000-xxxx.snappy.parquet`
50/// - `s3://host.example.com:3000/data/tpch/customer/_delta_log/`
51///
52/// [`ObjectStore`]: object_store::ObjectStore
53#[derive(Debug)]
54pub struct ListingSchemaProvider {
55    authority: String,
56    path: object_store::path::Path,
57    factory: Arc<dyn TableProviderFactory>,
58    store: Arc<dyn ObjectStore>,
59    tables: Arc<Mutex<HashMap<String, Arc<dyn TableProvider>>>>,
60    format: String,
61}
62
63impl ListingSchemaProvider {
64    /// Create a new `ListingSchemaProvider`
65    ///
66    /// Arguments:
67    /// `authority`: The scheme (i.e. s3://) + host (i.e. example.com:3000)
68    /// `path`: The root path that contains subfolders which represent tables
69    /// `factory`: The `TableProviderFactory` to use to instantiate tables for each subfolder
70    /// `store`: The `ObjectStore` containing the table data
71    /// `format`: The `FileFormat` of the tables
72    /// `has_header`: Indicates whether the created external table has the has_header flag enabled
73    pub fn new(
74        authority: String,
75        path: object_store::path::Path,
76        factory: Arc<dyn TableProviderFactory>,
77        store: Arc<dyn ObjectStore>,
78        format: String,
79    ) -> Self {
80        Self {
81            authority,
82            path,
83            factory,
84            store,
85            tables: Arc::new(Mutex::new(HashMap::new())),
86            format,
87        }
88    }
89
90    /// Reload table information from ObjectStore
91    pub async fn refresh(&self, state: &dyn Session) -> datafusion_common::Result<()> {
92        let entries: Vec<_> = self.store.list(Some(&self.path)).try_collect().await?;
93        let base = Path::new(self.path.as_ref());
94        let mut tables = HashSet::new();
95        for file in entries.iter() {
96            // The listing will initially be a file. However if we've recursed up to match our base, we know our path is a directory.
97            let mut is_dir = false;
98            let mut parent = Path::new(file.location.as_ref());
99            while let Some(p) = parent.parent() {
100                if p == base {
101                    tables.insert(TablePath {
102                        is_dir,
103                        path: parent,
104                    });
105                }
106                parent = p;
107                is_dir = true;
108            }
109        }
110        for table in tables.iter() {
111            let file_name = table
112                .path
113                .file_name()
114                .ok_or_else(|| {
115                    DataFusionError::Internal("Cannot parse file name!".to_string())
116                })?
117                .to_str()
118                .ok_or_else(|| {
119                    DataFusionError::Internal("Cannot parse file name!".to_string())
120                })?;
121            let table_name = file_name.split('.').collect_vec()[0];
122            let table_path = table.to_string().ok_or_else(|| {
123                DataFusionError::Internal("Cannot parse file name!".to_string())
124            })?;
125
126            if !self.table_exist(table_name) {
127                let table_url = format!("{}/{}", self.authority, table_path);
128
129                let name = TableReference::bare(table_name);
130                let provider = self
131                    .factory
132                    .create(
133                        state,
134                        &CreateExternalTable {
135                            schema: Arc::new(DFSchema::empty()),
136                            name,
137                            location: table_url,
138                            file_type: self.format.clone(),
139                            table_partition_cols: vec![],
140                            if_not_exists: false,
141                            temporary: false,
142                            definition: None,
143                            order_exprs: vec![],
144                            unbounded: false,
145                            options: Default::default(),
146                            constraints: Constraints::empty(),
147                            column_defaults: Default::default(),
148                        },
149                    )
150                    .await?;
151                let _ =
152                    self.register_table(table_name.to_string(), Arc::clone(&provider))?;
153            }
154        }
155        Ok(())
156    }
157}
158
159#[async_trait]
160impl SchemaProvider for ListingSchemaProvider {
161    fn as_any(&self) -> &dyn Any {
162        self
163    }
164
165    fn table_names(&self) -> Vec<String> {
166        self.tables
167            .lock()
168            .expect("Can't lock tables")
169            .keys()
170            .map(|it| it.to_string())
171            .collect()
172    }
173
174    async fn table(
175        &self,
176        name: &str,
177    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
178        Ok(self
179            .tables
180            .lock()
181            .expect("Can't lock tables")
182            .get(name)
183            .cloned())
184    }
185
186    fn register_table(
187        &self,
188        name: String,
189        table: Arc<dyn TableProvider>,
190    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
191        self.tables
192            .lock()
193            .expect("Can't lock tables")
194            .insert(name, Arc::clone(&table));
195        Ok(Some(table))
196    }
197
198    fn deregister_table(
199        &self,
200        name: &str,
201    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
202        Ok(self.tables.lock().expect("Can't lock tables").remove(name))
203    }
204
205    fn table_exist(&self, name: &str) -> bool {
206        self.tables
207            .lock()
208            .expect("Can't lock tables")
209            .contains_key(name)
210    }
211}
212
/// A table's path together with a flag recording whether that path is a
/// directory.
#[derive(Debug, PartialEq, Eq, Hash)]
struct TablePath<'a> {
    path: &'a Path,
    is_dir: bool,
}

impl TablePath<'_> {
    /// Renders the path as a string, appending a '/' when it is a directory.
    ///
    /// Clients (e.g. object_store listing) can and will use the presence of a
    /// trailing slash as a heuristic for "this is a directory".
    ///
    /// Returns `None` when the path is not valid UTF-8.
    fn to_string(&self) -> Option<String> {
        let raw = self.path.to_str()?;
        let rendered = if self.is_dir {
            format!("{raw}/")
        } else {
            raw.to_string()
        };
        Some(rendered)
    }
}
234
#[cfg(test)]
mod tests {
    use super::*;

    /// A directory path must be rendered with a trailing slash.
    #[test]
    fn table_path_ends_with_slash_when_is_dir() {
        let rendered = TablePath {
            path: Path::new("/file"),
            is_dir: true,
        }
        .to_string()
        .expect("table path");

        assert!(rendered.ends_with('/'));
    }

    /// A non-directory path must be rendered without a trailing slash.
    #[test]
    fn dir_table_path_str_does_not_end_with_slash_when_not_is_dir() {
        let rendered = TablePath {
            path: Path::new("/file"),
            is_dir: false,
        }
        .to_string()
        .expect("table_path");

        assert!(!rendered.ends_with('/'));
    }
}