datafusion_execution/
object_store.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! ObjectStoreRegistry holds all the object stores registered at runtime, each associated with the
//! URL (scheme and authority) it serves. This allows users to extend DataFusion with different
//! storage systems such as S3 or HDFS and to query the data stored inside them.
21
22use dashmap::DashMap;
23use datafusion_common::{
24    DataFusionError, Result, exec_err, internal_datafusion_err, not_impl_err,
25};
26use object_store::ObjectStore;
27#[cfg(not(target_arch = "wasm32"))]
28use object_store::local::LocalFileSystem;
29use std::sync::Arc;
30use url::Url;
31
32/// A parsed URL identifying a particular [`ObjectStore`] instance
33///
34/// For example:
35/// * `file://` for local file system
36/// * `s3://bucket` for AWS S3 bucket
37/// * `oss://bucket` for Aliyun OSS bucket
38#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
39pub struct ObjectStoreUrl {
40    url: Url,
41}
42
43impl ObjectStoreUrl {
44    /// Parse an [`ObjectStoreUrl`] from a string
45    ///
46    /// # Example
47    /// ```
48    /// # use url::Url;
49    /// # use datafusion_execution::object_store::ObjectStoreUrl;
50    /// let object_store_url = ObjectStoreUrl::parse("s3://bucket").unwrap();
51    /// assert_eq!(object_store_url.as_str(), "s3://bucket/");
52    /// // can also access the underlying `Url`
53    /// let url: &Url = object_store_url.as_ref();
54    /// assert_eq!(url.scheme(), "s3");
55    /// assert_eq!(url.host_str(), Some("bucket"));
56    /// assert_eq!(url.path(), "/");
57    /// ```
58    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
59        let mut parsed =
60            Url::parse(s.as_ref()).map_err(|e| DataFusionError::External(Box::new(e)))?;
61
62        let remaining = &parsed[url::Position::BeforePath..];
63        if !remaining.is_empty() && remaining != "/" {
64            return exec_err!(
65                "ObjectStoreUrl must only contain scheme and authority, got: {remaining}"
66            );
67        }
68
69        // Always set path for consistency
70        parsed.set_path("/");
71        Ok(Self { url: parsed })
72    }
73
74    /// An [`ObjectStoreUrl`] for the local filesystem (`file://`)
75    ///
76    /// # Example
77    /// ```
78    /// # use datafusion_execution::object_store::ObjectStoreUrl;
79    /// let local_fs = ObjectStoreUrl::parse("file://").unwrap();
80    /// assert_eq!(local_fs, ObjectStoreUrl::local_filesystem())
81    /// ```
82    pub fn local_filesystem() -> Self {
83        Self::parse("file://").unwrap()
84    }
85
86    /// Returns this [`ObjectStoreUrl`] as a string
87    pub fn as_str(&self) -> &str {
88        self.as_ref()
89    }
90}
91
92impl AsRef<str> for ObjectStoreUrl {
93    fn as_ref(&self) -> &str {
94        self.url.as_ref()
95    }
96}
97
98impl AsRef<Url> for ObjectStoreUrl {
99    fn as_ref(&self) -> &Url {
100        &self.url
101    }
102}
103
104impl std::fmt::Display for ObjectStoreUrl {
105    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
106        self.as_str().fmt(f)
107    }
108}
109
110/// [`ObjectStoreRegistry`] maps a URL to an [`ObjectStore`] instance,
111/// and allows DataFusion to read from different [`ObjectStore`]
112/// instances. For example DataFusion might be configured so that
113///
114/// 1. `s3://my_bucket/lineitem/` mapped to the `/lineitem` path on an
115///    AWS S3 object store bound to `my_bucket`
116///
117/// 2. `s3://my_other_bucket/lineitem/` mapped to the (same)
118///    `/lineitem` path on a *different* AWS S3 object store bound to
119///    `my_other_bucket`
120///
121/// When given a [`ListingTableUrl`], DataFusion tries to find an
122/// appropriate [`ObjectStore`]. For example
123///
124/// ```sql
125/// create external table unicorns stored as parquet location 's3://my_bucket/lineitem/';
126/// ```
127///
128/// In this particular case, the url `s3://my_bucket/lineitem/` will be provided to
129/// [`ObjectStoreRegistry::get_store`] and one of three things will happen:
130///
131/// - If an [`ObjectStore`] has been registered with [`ObjectStoreRegistry::register_store`] with
132///   `s3://my_bucket`, that [`ObjectStore`] will be returned
133///
134/// - If an AWS S3 object store can be ad-hoc discovered by the url `s3://my_bucket/lineitem/`, this
135///   object store will be registered with key `s3://my_bucket` and returned.
136///
137/// - Otherwise an error will be returned, indicating that no suitable [`ObjectStore`] could
138///   be found
139///
140/// This allows for two different use-cases:
141///
142/// 1. Systems where object store buckets are explicitly created using DDL, can register these
143///    buckets using [`ObjectStoreRegistry::register_store`]
144///
145/// 2. Systems relying on ad-hoc discovery, without corresponding DDL, can create [`ObjectStore`]
146///    lazily by providing a custom implementation of [`ObjectStoreRegistry`]
147///
148/// <!-- is in a different crate so normal rustdoc links don't work -->
149/// [`ListingTableUrl`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTableUrl.html
150/// [`ObjectStore`]: object_store::ObjectStore
151pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static {
152    /// If a store with the same key existed before, it is replaced and returned
153    fn register_store(
154        &self,
155        url: &Url,
156        store: Arc<dyn ObjectStore>,
157    ) -> Option<Arc<dyn ObjectStore>>;
158
159    /// Deregister the store previously registered with the same key. Returns the
160    /// deregistered store if it existed.
161    #[expect(unused_variables)]
162    fn deregister_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
163        not_impl_err!(
164            "ObjectStoreRegistry::deregister_store is not implemented for this ObjectStoreRegistry"
165        )
166    }
167
168    /// Get a suitable store for the provided URL. For example:
169    ///
170    /// - URL with scheme `file:///` or no scheme will return the default LocalFS store
171    /// - URL with scheme `s3://bucket/` will return the S3 store
172    /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store
173    ///
174    /// If no [`ObjectStore`] found for the `url`, ad-hoc discovery may be executed depending on
175    /// the `url` and [`ObjectStoreRegistry`] implementation. An [`ObjectStore`] may be lazily
176    /// created and registered.
177    fn get_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>>;
178}
179
180/// The default [`ObjectStoreRegistry`]
181pub struct DefaultObjectStoreRegistry {
182    /// A map from scheme to object store that serve list / read operations for the store
183    object_stores: DashMap<String, Arc<dyn ObjectStore>>,
184}
185
186impl std::fmt::Debug for DefaultObjectStoreRegistry {
187    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
188        f.debug_struct("DefaultObjectStoreRegistry")
189            .field(
190                "schemes",
191                &self
192                    .object_stores
193                    .iter()
194                    .map(|o| o.key().clone())
195                    .collect::<Vec<_>>(),
196            )
197            .finish()
198    }
199}
200
201impl Default for DefaultObjectStoreRegistry {
202    fn default() -> Self {
203        Self::new()
204    }
205}
206
207impl DefaultObjectStoreRegistry {
208    /// This will register [`LocalFileSystem`] to handle `file://` paths
209    #[cfg(not(target_arch = "wasm32"))]
210    pub fn new() -> Self {
211        let object_stores: DashMap<String, Arc<dyn ObjectStore>> = DashMap::new();
212        object_stores.insert("file://".to_string(), Arc::new(LocalFileSystem::new()));
213        Self { object_stores }
214    }
215
216    /// Default without any backend registered.
217    #[cfg(target_arch = "wasm32")]
218    pub fn new() -> Self {
219        let object_stores: DashMap<String, Arc<dyn ObjectStore>> = DashMap::new();
220        Self { object_stores }
221    }
222}
223
224///
225/// Stores are registered based on the scheme, host and port of the provided URL
226/// with a [`LocalFileSystem::new`] automatically registered for `file://` (if the
227/// target arch is not `wasm32`).
228///
229/// For example:
230///
231/// - `file:///my_path` will return the default LocalFS store
232/// - `s3://bucket/path` will return a store registered with `s3://bucket` if any
233/// - `hdfs://host:port/path` will return a store registered with `hdfs://host:port` if any
234impl ObjectStoreRegistry for DefaultObjectStoreRegistry {
235    fn register_store(
236        &self,
237        url: &Url,
238        store: Arc<dyn ObjectStore>,
239    ) -> Option<Arc<dyn ObjectStore>> {
240        let s = get_url_key(url);
241        self.object_stores.insert(s, store)
242    }
243
244    fn deregister_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
245        let s = get_url_key(url);
246        let (_, object_store) = self.object_stores
247            .remove(&s)
248            .ok_or_else(|| {
249                internal_datafusion_err!("Failed to deregister object store. No suitable object store found for {url}. See `RuntimeEnv::register_object_store`")
250            })?;
251
252        Ok(object_store)
253    }
254
255    fn get_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
256        let s = get_url_key(url);
257        self.object_stores
258            .get(&s)
259            .map(|o| Arc::clone(o.value()))
260            .ok_or_else(|| {
261                internal_datafusion_err!("No suitable object store found for {url}. See `RuntimeEnv::register_object_store`")
262            })
263    }
264}
265
266/// Get the key of a url for object store registration.
267/// The credential info will be removed
268fn get_url_key(url: &Url) -> String {
269    format!(
270        "{}://{}",
271        url.scheme(),
272        &url[url::Position::BeforeHost..url::Position::AfterPort],
273    )
274}
275
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that parsing `input` fails with exactly `expected` (backtrace stripped).
    fn assert_parse_err(input: &str, expected: &str) {
        let err = ObjectStoreUrl::parse(input).unwrap_err();
        assert_eq!(err.strip_backtrace(), expected);
    }

    #[test]
    fn test_object_store_url() {
        let accepted = [
            ("file://", "file:///"),
            ("s3://bucket", "s3://bucket/"),
            (
                "s3://username:password@host:123",
                "s3://username:password@host:123/",
            ),
        ];
        for (input, normalized) in accepted {
            assert_eq!(ObjectStoreUrl::parse(input).unwrap().as_str(), normalized);
        }

        assert_parse_err("s3://bucket:invalid", "External error: invalid port number");

        let rejected = [
            ("s3://bucket?", "?"),
            ("s3://bucket?foo=bar", "?foo=bar"),
            ("s3://host:123/foo", "/foo"),
            ("s3://username:password@host:123/foo", "/foo"),
        ];
        for (input, leftover) in rejected {
            let expected = format!(
                "Execution error: ObjectStoreUrl must only contain scheme and authority, got: {leftover}"
            );
            assert_parse_err(input, &expected);
        }
    }

    #[test]
    fn test_get_url_key() {
        let cases = [
            ("file://", "file://"),
            ("s3://bucket", "s3://bucket"),
            ("s3://username:password@host:123", "s3://host:123"),
        ];
        for (input, expected_key) in cases {
            let object_store_url = ObjectStoreUrl::parse(input).unwrap();
            assert_eq!(get_url_key(&object_store_url.url), expected_key);
        }
    }
}