datafusion_execution/
object_store.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! [`ObjectStoreRegistry`] holds all the object stores available at runtime, each identified
//! by a URL prefix such as `file://` or `s3://bucket`.
//! This allows the user to extend DataFusion with different storage systems such as S3 or HDFS
//! and query data inside these systems.
21
22use dashmap::DashMap;
23use datafusion_common::{exec_err, DataFusionError, Result};
24#[cfg(not(target_arch = "wasm32"))]
25use object_store::local::LocalFileSystem;
26use object_store::ObjectStore;
27use std::sync::Arc;
28use url::Url;
29
30/// A parsed URL identifying a particular [`ObjectStore`] instance
31///
32/// For example:
33/// * `file://` for local file system
34/// * `s3://bucket` for AWS S3 bucket
35/// * `oss://bucket` for Aliyun OSS bucket
36#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
37pub struct ObjectStoreUrl {
38    url: Url,
39}
40
41impl ObjectStoreUrl {
42    /// Parse an [`ObjectStoreUrl`] from a string
43    ///
44    /// # Example
45    /// ```
46    /// # use url::Url;
47    /// # use datafusion_execution::object_store::ObjectStoreUrl;
48    /// let object_store_url = ObjectStoreUrl::parse("s3://bucket").unwrap();
49    /// assert_eq!(object_store_url.as_str(), "s3://bucket/");
50    /// // can also access the underlying `Url`
51    /// let url: &Url = object_store_url.as_ref();
52    /// assert_eq!(url.scheme(), "s3");
53    /// assert_eq!(url.host_str(), Some("bucket"));
54    /// assert_eq!(url.path(), "/");
55    /// ```
56    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
57        let mut parsed =
58            Url::parse(s.as_ref()).map_err(|e| DataFusionError::External(Box::new(e)))?;
59
60        let remaining = &parsed[url::Position::BeforePath..];
61        if !remaining.is_empty() && remaining != "/" {
62            return exec_err!(
63                "ObjectStoreUrl must only contain scheme and authority, got: {remaining}"
64            );
65        }
66
67        // Always set path for consistency
68        parsed.set_path("/");
69        Ok(Self { url: parsed })
70    }
71
72    /// An [`ObjectStoreUrl`] for the local filesystem (`file://`)
73    ///
74    /// # Example
75    /// ```
76    /// # use datafusion_execution::object_store::ObjectStoreUrl;
77    /// let local_fs = ObjectStoreUrl::parse("file://").unwrap();
78    /// assert_eq!(local_fs, ObjectStoreUrl::local_filesystem())
79    /// ```
80    pub fn local_filesystem() -> Self {
81        Self::parse("file://").unwrap()
82    }
83
84    /// Returns this [`ObjectStoreUrl`] as a string
85    pub fn as_str(&self) -> &str {
86        self.as_ref()
87    }
88}
89
90impl AsRef<str> for ObjectStoreUrl {
91    fn as_ref(&self) -> &str {
92        self.url.as_ref()
93    }
94}
95
96impl AsRef<Url> for ObjectStoreUrl {
97    fn as_ref(&self) -> &Url {
98        &self.url
99    }
100}
101
102impl std::fmt::Display for ObjectStoreUrl {
103    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
104        self.as_str().fmt(f)
105    }
106}
107
108/// [`ObjectStoreRegistry`] maps a URL to an [`ObjectStore`] instance,
109/// and allows DataFusion to read from different [`ObjectStore`]
110/// instances. For example DataFusion might be configured so that
111///
112/// 1. `s3://my_bucket/lineitem/` mapped to the `/lineitem` path on an
113///    AWS S3 object store bound to `my_bucket`
114///
115/// 2. `s3://my_other_bucket/lineitem/` mapped to the (same)
116///    `/lineitem` path on a *different* AWS S3 object store bound to
117///    `my_other_bucket`
118///
119/// When given a [`ListingTableUrl`], DataFusion tries to find an
120/// appropriate [`ObjectStore`]. For example
121///
122/// ```sql
123/// create external table unicorns stored as parquet location 's3://my_bucket/lineitem/';
124/// ```
125///
126/// In this particular case, the url `s3://my_bucket/lineitem/` will be provided to
127/// [`ObjectStoreRegistry::get_store`] and one of three things will happen:
128///
129/// - If an [`ObjectStore`] has been registered with [`ObjectStoreRegistry::register_store`] with
130///   `s3://my_bucket`, that [`ObjectStore`] will be returned
131///
132/// - If an AWS S3 object store can be ad-hoc discovered by the url `s3://my_bucket/lineitem/`, this
133///   object store will be registered with key `s3://my_bucket` and returned.
134///
135/// - Otherwise an error will be returned, indicating that no suitable [`ObjectStore`] could
136///   be found
137///
138/// This allows for two different use-cases:
139///
140/// 1. Systems where object store buckets are explicitly created using DDL, can register these
141///    buckets using [`ObjectStoreRegistry::register_store`]
142///
143/// 2. Systems relying on ad-hoc discovery, without corresponding DDL, can create [`ObjectStore`]
144///    lazily by providing a custom implementation of [`ObjectStoreRegistry`]
145///
146/// <!-- is in a different crate so normal rustdoc links don't work -->
147/// [`ListingTableUrl`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTableUrl.html
148/// [`ObjectStore`]: object_store::ObjectStore
149pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static {
150    /// If a store with the same key existed before, it is replaced and returned
151    fn register_store(
152        &self,
153        url: &Url,
154        store: Arc<dyn ObjectStore>,
155    ) -> Option<Arc<dyn ObjectStore>>;
156
157    /// Get a suitable store for the provided URL. For example:
158    ///
159    /// - URL with scheme `file:///` or no scheme will return the default LocalFS store
160    /// - URL with scheme `s3://bucket/` will return the S3 store
161    /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store
162    ///
163    /// If no [`ObjectStore`] found for the `url`, ad-hoc discovery may be executed depending on
164    /// the `url` and [`ObjectStoreRegistry`] implementation. An [`ObjectStore`] may be lazily
165    /// created and registered.
166    fn get_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>>;
167}
168
169/// The default [`ObjectStoreRegistry`]
170pub struct DefaultObjectStoreRegistry {
171    /// A map from scheme to object store that serve list / read operations for the store
172    object_stores: DashMap<String, Arc<dyn ObjectStore>>,
173}
174
175impl std::fmt::Debug for DefaultObjectStoreRegistry {
176    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
177        f.debug_struct("DefaultObjectStoreRegistry")
178            .field(
179                "schemes",
180                &self
181                    .object_stores
182                    .iter()
183                    .map(|o| o.key().clone())
184                    .collect::<Vec<_>>(),
185            )
186            .finish()
187    }
188}
189
190impl Default for DefaultObjectStoreRegistry {
191    fn default() -> Self {
192        Self::new()
193    }
194}
195
196impl DefaultObjectStoreRegistry {
197    /// This will register [`LocalFileSystem`] to handle `file://` paths
198    #[cfg(not(target_arch = "wasm32"))]
199    pub fn new() -> Self {
200        let object_stores: DashMap<String, Arc<dyn ObjectStore>> = DashMap::new();
201        object_stores.insert("file://".to_string(), Arc::new(LocalFileSystem::new()));
202        Self { object_stores }
203    }
204
205    /// Default without any backend registered.
206    #[cfg(target_arch = "wasm32")]
207    pub fn new() -> Self {
208        let object_stores: DashMap<String, Arc<dyn ObjectStore>> = DashMap::new();
209        Self { object_stores }
210    }
211}
212
213///
214/// Stores are registered based on the scheme, host and port of the provided URL
215/// with a [`LocalFileSystem::new`] automatically registered for `file://` (if the
216/// target arch is not `wasm32`).
217///
218/// For example:
219///
220/// - `file:///my_path` will return the default LocalFS store
221/// - `s3://bucket/path` will return a store registered with `s3://bucket` if any
222/// - `hdfs://host:port/path` will return a store registered with `hdfs://host:port` if any
223impl ObjectStoreRegistry for DefaultObjectStoreRegistry {
224    fn register_store(
225        &self,
226        url: &Url,
227        store: Arc<dyn ObjectStore>,
228    ) -> Option<Arc<dyn ObjectStore>> {
229        let s = get_url_key(url);
230        self.object_stores.insert(s, store)
231    }
232
233    fn get_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
234        let s = get_url_key(url);
235        self.object_stores
236            .get(&s)
237            .map(|o| Arc::clone(o.value()))
238            .ok_or_else(|| {
239                DataFusionError::Internal(format!(
240                    "No suitable object store found for {url}. See `RuntimeEnv::register_object_store`"
241                ))
242            })
243    }
244}
245
246/// Get the key of a url for object store registration.
247/// The credential info will be removed
248fn get_url_key(url: &Url) -> String {
249    format!(
250        "{}://{}",
251        url.scheme(),
252        &url[url::Position::BeforeHost..url::Position::AfterPort],
253    )
254}
255
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::memory::InMemory;

    #[test]
    fn test_object_store_url() {
        let file = ObjectStoreUrl::parse("file://").unwrap();
        assert_eq!(file.as_str(), "file:///");

        let url = ObjectStoreUrl::parse("s3://bucket").unwrap();
        assert_eq!(url.as_str(), "s3://bucket/");

        let url = ObjectStoreUrl::parse("s3://username:password@host:123").unwrap();
        assert_eq!(url.as_str(), "s3://username:password@host:123/");

        let err = ObjectStoreUrl::parse("s3://bucket:invalid").unwrap_err();
        assert_eq!(err.strip_backtrace(), "External error: invalid port number");

        let err = ObjectStoreUrl::parse("s3://bucket?").unwrap_err();
        assert_eq!(err.strip_backtrace(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: ?");

        let err = ObjectStoreUrl::parse("s3://bucket?foo=bar").unwrap_err();
        assert_eq!(err.strip_backtrace(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: ?foo=bar");

        let err = ObjectStoreUrl::parse("s3://host:123/foo").unwrap_err();
        assert_eq!(err.strip_backtrace(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: /foo");

        let err =
            ObjectStoreUrl::parse("s3://username:password@host:123/foo").unwrap_err();
        assert_eq!(err.strip_backtrace(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: /foo");
    }

    #[test]
    fn test_object_store_url_display() {
        let url = ObjectStoreUrl::parse("s3://bucket").unwrap();
        assert_eq!(url.to_string(), "s3://bucket/");
        // `Display` forwards to `str`, so width and alignment flags are honoured
        assert_eq!(format!("{url:>13}"), " s3://bucket/");
    }

    #[test]
    fn test_get_url_key() {
        let file = ObjectStoreUrl::parse("file://").unwrap();
        let key = get_url_key(&file.url);
        assert_eq!(key.as_str(), "file://");

        let url = ObjectStoreUrl::parse("s3://bucket").unwrap();
        let key = get_url_key(&url.url);
        assert_eq!(key.as_str(), "s3://bucket");

        let url = ObjectStoreUrl::parse("s3://username:password@host:123").unwrap();
        let key = get_url_key(&url.url);
        assert_eq!(key.as_str(), "s3://host:123");
    }

    #[test]
    fn test_register_and_get_store() {
        let registry = DefaultObjectStoreRegistry::new();
        let bucket = Url::parse("s3://bucket").unwrap();
        let first: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let second: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

        // First registration has nothing to replace
        assert!(registry
            .register_store(&bucket, Arc::clone(&first))
            .is_none());
        let found = registry
            .get_store(&Url::parse("s3://bucket/path/file.parquet").unwrap())
            .unwrap();
        assert!(Arc::ptr_eq(&found, &first));

        // Re-registering the same key replaces and returns the previous store
        let previous = registry
            .register_store(&bucket, Arc::clone(&second))
            .expect("a store was already registered for s3://bucket");
        assert!(Arc::ptr_eq(&previous, &first));
        assert!(Arc::ptr_eq(&registry.get_store(&bucket).unwrap(), &second));
    }

    #[test]
    fn test_store_key_ignores_credentials_and_path() {
        let registry = DefaultObjectStoreRegistry::new();
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let with_credentials = Url::parse("s3://username:password@host:123").unwrap();
        assert!(registry
            .register_store(&with_credentials, Arc::clone(&store))
            .is_none());

        let lookup = Url::parse("s3://host:123/some/path").unwrap();
        let found = registry.get_store(&lookup).unwrap();
        assert!(Arc::ptr_eq(&found, &store));
    }

    #[test]
    fn test_get_store_missing() {
        let registry = DefaultObjectStoreRegistry::new();
        let err = registry
            .get_store(&Url::parse("s3://missing/x").unwrap())
            .unwrap_err();
        assert!(err
            .strip_backtrace()
            .contains("No suitable object store found for s3://missing/x"));
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[test]
    fn test_default_registry_has_local_filesystem() {
        let registry = DefaultObjectStoreRegistry::new();
        assert!(registry
            .get_store(&Url::parse("file:///tmp/data.parquet").unwrap())
            .is_ok());

        let local = ObjectStoreUrl::local_filesystem();
        let local_url: &Url = local.as_ref();
        assert!(registry.get_store(local_url).is_ok());
    }
}