Skip to main content

datafusion_catalog/
listing_schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ListingSchemaProvider`]: [`SchemaProvider`] that scans ObjectStores for tables automatically
19
20use std::collections::HashSet;
21use std::path::Path;
22use std::sync::{Arc, Mutex};
23
24use crate::{SchemaProvider, TableProvider, TableProviderFactory};
25
26use crate::Session;
27use datafusion_common::{
28    DFSchema, DataFusionError, HashMap, TableReference, internal_datafusion_err,
29};
30use datafusion_expr::CreateExternalTable;
31
32use async_trait::async_trait;
33use futures::TryStreamExt;
34use itertools::Itertools;
35use object_store::ObjectStore;
36
37/// A [`SchemaProvider`] that scans an [`ObjectStore`] to automatically discover tables
38///
39/// A subfolder relationship is assumed, i.e. given:
40/// - authority = `s3://host.example.com:3000`
41/// - path = `/data/tpch`
42/// - factory = `DeltaTableFactory`
43///
44/// A table called "customer" will be registered for the folder:
45/// `s3://host.example.com:3000/data/tpch/customer`
46///
47/// assuming it contains valid deltalake data, i.e:
48/// - `s3://host.example.com:3000/data/tpch/customer/part-00000-xxxx.snappy.parquet`
49/// - `s3://host.example.com:3000/data/tpch/customer/_delta_log/`
50///
51/// [`ObjectStore`]: object_store::ObjectStore
52#[derive(Debug)]
53pub struct ListingSchemaProvider {
54    authority: String,
55    path: object_store::path::Path,
56    factory: Arc<dyn TableProviderFactory>,
57    store: Arc<dyn ObjectStore>,
58    tables: Arc<Mutex<HashMap<String, Arc<dyn TableProvider>>>>,
59    format: String,
60}
61
62impl ListingSchemaProvider {
63    /// Create a new `ListingSchemaProvider`
64    ///
65    /// Arguments:
66    /// `authority`: The scheme (i.e. s3://) + host (i.e. example.com:3000)
67    /// `path`: The root path that contains subfolders which represent tables
68    /// `factory`: The `TableProviderFactory` to use to instantiate tables for each subfolder
69    /// `store`: The `ObjectStore` containing the table data
70    /// `format`: The `FileFormat` of the tables
71    /// `has_header`: Indicates whether the created external table has the has_header flag enabled
72    pub fn new(
73        authority: String,
74        path: object_store::path::Path,
75        factory: Arc<dyn TableProviderFactory>,
76        store: Arc<dyn ObjectStore>,
77        format: String,
78    ) -> Self {
79        Self {
80            authority,
81            path,
82            factory,
83            store,
84            tables: Arc::new(Mutex::new(HashMap::new())),
85            format,
86        }
87    }
88
89    /// Reload table information from ObjectStore
90    pub async fn refresh(&self, state: &dyn Session) -> datafusion_common::Result<()> {
91        let entries: Vec<_> = self.store.list(Some(&self.path)).try_collect().await?;
92        let base = Path::new(self.path.as_ref());
93        let mut tables = HashSet::new();
94        for file in entries.iter() {
95            // The listing will initially be a file. However if we've recursed up to match our base, we know our path is a directory.
96            let mut is_dir = false;
97            let mut parent = Path::new(file.location.as_ref());
98            while let Some(p) = parent.parent() {
99                if p == base {
100                    tables.insert(TablePath {
101                        is_dir,
102                        path: parent,
103                    });
104                }
105                parent = p;
106                is_dir = true;
107            }
108        }
109        for table in tables.iter() {
110            let file_name = table
111                .path
112                .file_name()
113                .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?
114                .to_str()
115                .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?;
116            let table_name = file_name.split('.').collect_vec()[0];
117            let table_path = table
118                .to_string()
119                .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?;
120
121            if !self.table_exist(table_name) {
122                let table_url = format!("{}/{}", self.authority, table_path);
123
124                let name = TableReference::bare(table_name);
125                let provider = self
126                    .factory
127                    .create(
128                        state,
129                        &CreateExternalTable::builder(
130                            name,
131                            table_url,
132                            self.format.clone(),
133                            Arc::new(DFSchema::empty()),
134                        )
135                        .build(),
136                    )
137                    .await?;
138                let _ =
139                    self.register_table(table_name.to_string(), Arc::clone(&provider))?;
140            }
141        }
142        Ok(())
143    }
144}
145
146#[async_trait]
147impl SchemaProvider for ListingSchemaProvider {
148    fn table_names(&self) -> Vec<String> {
149        self.tables
150            .lock()
151            .expect("Can't lock tables")
152            .keys()
153            .map(|it| it.to_string())
154            .collect()
155    }
156
157    async fn table(
158        &self,
159        name: &str,
160    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
161        Ok(self
162            .tables
163            .lock()
164            .expect("Can't lock tables")
165            .get(name)
166            .cloned())
167    }
168
169    fn register_table(
170        &self,
171        name: String,
172        table: Arc<dyn TableProvider>,
173    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
174        self.tables
175            .lock()
176            .expect("Can't lock tables")
177            .insert(name, Arc::clone(&table));
178        Ok(Some(table))
179    }
180
181    fn deregister_table(
182        &self,
183        name: &str,
184    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
185        Ok(self.tables.lock().expect("Can't lock tables").remove(name))
186    }
187
188    fn table_exist(&self, name: &str) -> bool {
189        self.tables
190            .lock()
191            .expect("Can't lock tables")
192            .contains_key(name)
193    }
194}
195
/// A table location paired with a flag saying whether that location is a
/// directory or a single file.
#[derive(Eq, PartialEq, Hash, Debug)]
struct TablePath<'a> {
    /// Location of the table
    path: &'a Path,
    /// `true` when `path` refers to a directory rather than a file
    is_dir: bool,
}

impl TablePath<'_> {
    /// Render the path as a string, with a trailing '/' when it is a directory.
    /// Clients (eg. object_store listing) can and will use the presence of trailing slash as a heuristic
    ///
    /// Returns `None` if the path is not valid UTF-8.
    fn to_string(&self) -> Option<String> {
        let mut rendered = self.path.to_str()?.to_owned();
        if self.is_dir {
            rendered.push('/');
        }
        Some(rendered)
    }
}
217
#[cfg(test)]
mod tests {
    use super::*;

    /// A directory path is rendered with a trailing '/'.
    #[test]
    fn table_path_ends_with_slash_when_is_dir() {
        let rendered = TablePath {
            path: Path::new("/file"),
            is_dir: true,
        }
        .to_string()
        .expect("table path");

        assert!(rendered.ends_with('/'));
    }

    /// A non-directory path is rendered without a trailing '/'.
    #[test]
    fn dir_table_path_str_does_not_end_with_slash_when_not_is_dir() {
        let rendered = TablePath {
            path: Path::new("/file"),
            is_dir: false,
        }
        .to_string()
        .expect("table_path");

        assert!(!rendered.ends_with('/'));
    }
}