lance_io/object_store/
providers.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::HashMap,
6    sync::{Arc, RwLock, Weak},
7};
8
9use object_store::path::Path;
10use snafu::location;
11use url::Url;
12
13use crate::object_store::uri_to_url;
14use crate::object_store::WrappingObjectStore;
15
16use super::{tracing::ObjectStoreTracingExt, ObjectStore, ObjectStoreParams};
17use lance_core::error::{Error, LanceOptionExt, Result};
18
19#[cfg(feature = "aws")]
20pub mod aws;
21#[cfg(feature = "azure")]
22pub mod azure;
23#[cfg(feature = "gcp")]
24pub mod gcp;
25#[cfg(feature = "huggingface")]
26pub mod huggingface;
27pub mod local;
28pub mod memory;
29#[cfg(feature = "oss")]
30pub mod oss;
31
32#[async_trait::async_trait]
33pub trait ObjectStoreProvider: std::fmt::Debug + Sync + Send {
34    async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore>;
35
36    /// Extract the path relative to the base of the store.
37    ///
38    /// For example, in S3 the path is relative to the bucket. So a URL of
39    /// `s3://bucket/path/to/file` would return `path/to/file`.
40    ///
41    /// Meanwhile, for a file store, the path is relative to the filesystem root.
42    /// So a URL of `file:///path/to/file` would return `/path/to/file`.
43    fn extract_path(&self, url: &Url) -> Result<Path> {
44        Path::parse(url.path()).map_err(|_| {
45            Error::invalid_input(format!("Invalid path in URL: {}", url.path()), location!())
46        })
47    }
48
49    /// Calculate the unique prefix that should be used for this object store.
50    ///
51    /// For object stores that don't have the concept of buckets, this will just be something like
52    /// 'file' or 'memory'.
53    ///
54    /// In object stores where all bucket names are unique, like s3, this will be
55    /// simply 's3$my_bucket_name' or similar.
56    ///
57    /// In Azure, only the combination of (account name, container name) is unique, so
58    /// this will be something like 'az$account_name@container'
59    ///
60    /// Providers should override this if they have special requirements like Azure's.
61    fn calculate_object_store_prefix(
62        &self,
63        url: &Url,
64        _storage_options: Option<&HashMap<String, String>>,
65    ) -> Result<String> {
66        Ok(format!("{}${}", url.scheme(), url.authority()))
67    }
68}
69
70/// A registry of object store providers.
71///
72/// Use [`Self::default()`] to create one with the available default providers.
73/// This includes (depending on features enabled):
74/// - `memory`: An in-memory object store.
75/// - `file`: A local file object store, with optimized code paths.
76/// - `file-object-store`: A local file object store that uses the ObjectStore API,
77///   for all operations. Used for testing with ObjectStore wrappers.
78/// - `s3`: An S3 object store.
79/// - `s3+ddb`: An S3 object store with DynamoDB for metadata.
80/// - `az`: An Azure Blob Storage object store.
81/// - `gs`: A Google Cloud Storage object store.
82///
83/// Use [`Self::empty()`] to create an empty registry, with no providers registered.
84///
85/// The registry also caches object stores that are currently in use. It holds
86/// weak references to the object stores, so they are not held onto. If an object
87/// store is no longer in use, it will be removed from the cache on the next
88/// call to either [`Self::active_stores()`] or [`Self::get_store()`].
89#[derive(Debug)]
90pub struct ObjectStoreRegistry {
91    providers: RwLock<HashMap<String, Arc<dyn ObjectStoreProvider>>>,
92    // Cache of object stores currently in use. We use a weak reference so the
93    // cache itself doesn't keep them alive if no object store is actually using
94    // it.
95    active_stores: RwLock<HashMap<(String, ObjectStoreParams), Weak<ObjectStore>>>,
96}
97
98impl ObjectStoreRegistry {
99    /// Create a new registry with no providers registered.
100    ///
101    /// Typically, you want to use [`Self::default()`] instead, so you get the
102    /// default providers.
103    pub fn empty() -> Self {
104        Self {
105            providers: RwLock::new(HashMap::new()),
106            active_stores: RwLock::new(HashMap::new()),
107        }
108    }
109
110    /// Get the object store provider for a given scheme.
111    pub fn get_provider(&self, scheme: &str) -> Option<Arc<dyn ObjectStoreProvider>> {
112        self.providers
113            .read()
114            .expect("ObjectStoreRegistry lock poisoned")
115            .get(scheme)
116            .cloned()
117    }
118
119    /// Get a list of all active object stores.
120    ///
121    /// Calling this will also clean up any weak references to object stores that
122    /// are no longer valid.
123    pub fn active_stores(&self) -> Vec<Arc<ObjectStore>> {
124        let mut found_inactive = false;
125        let output = self
126            .active_stores
127            .read()
128            .expect("ObjectStoreRegistry lock poisoned")
129            .values()
130            .filter_map(|weak| match weak.upgrade() {
131                Some(store) => Some(store),
132                None => {
133                    found_inactive = true;
134                    None
135                }
136            })
137            .collect();
138
139        if found_inactive {
140            // Clean up the cache by removing any weak references that are no longer valid
141            let mut cache_lock = self
142                .active_stores
143                .write()
144                .expect("ObjectStoreRegistry lock poisoned");
145            cache_lock.retain(|_, weak| weak.upgrade().is_some());
146        }
147        output
148    }
149
150    fn scheme_not_found_error(&self, scheme: &str) -> Error {
151        let mut message = format!("No object store provider found for scheme: '{}'", scheme);
152        if let Ok(providers) = self.providers.read() {
153            let valid_schemes = providers.keys().cloned().collect::<Vec<_>>().join(", ");
154            message.push_str(&format!("\nValid schemes: {}", valid_schemes));
155        }
156        Error::invalid_input(message, location!())
157    }
158
159    /// Get an object store for a given base path and parameters.
160    ///
161    /// If the object store is already in use, it will return a strong reference
162    /// to the object store. If the object store is not in use, it will create a
163    /// new object store and return a strong reference to it.
164    pub async fn get_store(
165        &self,
166        base_path: Url,
167        params: &ObjectStoreParams,
168    ) -> Result<Arc<ObjectStore>> {
169        let scheme = base_path.scheme();
170        let Some(provider) = self.get_provider(scheme) else {
171            return Err(self.scheme_not_found_error(scheme));
172        };
173
174        let cache_path =
175            provider.calculate_object_store_prefix(&base_path, params.storage_options.as_ref())?;
176        let cache_key = (cache_path.clone(), params.clone());
177
178        // Check if we have a cached store for this base path and params
179        {
180            let maybe_store = self
181                .active_stores
182                .read()
183                .ok()
184                .expect_ok()?
185                .get(&cache_key)
186                .cloned();
187            if let Some(store) = maybe_store {
188                if let Some(store) = store.upgrade() {
189                    return Ok(store);
190                } else {
191                    // Remove the weak reference if it is no longer valid
192                    let mut cache_lock = self
193                        .active_stores
194                        .write()
195                        .expect("ObjectStoreRegistry lock poisoned");
196                    if let Some(store) = cache_lock.get(&cache_key) {
197                        if store.upgrade().is_none() {
198                            // Remove the weak reference if it is no longer valid
199                            cache_lock.remove(&cache_key);
200                        }
201                    }
202                }
203            }
204        }
205
206        let mut store = provider.new_store(base_path, params).await?;
207
208        store.inner = store.inner.traced();
209
210        if let Some(wrapper) = &params.object_store_wrapper {
211            store.inner = wrapper.wrap(&cache_path, store.inner);
212        }
213
214        // Always wrap with IO tracking
215        store.inner = store.io_tracker.wrap("", store.inner);
216
217        let store = Arc::new(store);
218
219        {
220            // Insert the store into the cache
221            let mut cache_lock = self.active_stores.write().ok().expect_ok()?;
222            cache_lock.insert(cache_key, Arc::downgrade(&store));
223        }
224
225        Ok(store)
226    }
227
228    /// Calculate the datastore prefix based on the URI and the storage options.
229    /// The data store prefix should uniquely identify the datastore.
230    pub fn calculate_object_store_prefix(
231        &self,
232        uri: &str,
233        storage_options: Option<&HashMap<String, String>>,
234    ) -> Result<String> {
235        let url = uri_to_url(uri)?;
236        match self.get_provider(url.scheme()) {
237            None => {
238                if url.scheme() == "file" || url.scheme().len() == 1 {
239                    Ok("file".to_string())
240                } else {
241                    Err(self.scheme_not_found_error(url.scheme()))
242                }
243            }
244            Some(provider) => provider.calculate_object_store_prefix(&url, storage_options),
245        }
246    }
247}
248
249impl Default for ObjectStoreRegistry {
250    fn default() -> Self {
251        let mut providers: HashMap<String, Arc<dyn ObjectStoreProvider>> = HashMap::new();
252
253        providers.insert("memory".into(), Arc::new(memory::MemoryStoreProvider));
254        providers.insert("file".into(), Arc::new(local::FileStoreProvider));
255        // The "file" scheme has special optimized code paths that bypass
256        // the ObjectStore API for better performance. However, this can make it
257        // hard to test when using ObjectStore wrappers, such as IOTrackingStore.
258        // So we provide a "file-object-store" scheme that uses the ObjectStore API.
259        // The specialized code paths are differentiated by the scheme name.
260        providers.insert(
261            "file-object-store".into(),
262            Arc::new(local::FileStoreProvider),
263        );
264
265        #[cfg(feature = "aws")]
266        {
267            let aws = Arc::new(aws::AwsStoreProvider);
268            providers.insert("s3".into(), aws.clone());
269            providers.insert("s3+ddb".into(), aws);
270        }
271        #[cfg(feature = "azure")]
272        providers.insert("az".into(), Arc::new(azure::AzureBlobStoreProvider));
273        #[cfg(feature = "gcp")]
274        providers.insert("gs".into(), Arc::new(gcp::GcsStoreProvider));
275        #[cfg(feature = "oss")]
276        providers.insert("oss".into(), Arc::new(oss::OssStoreProvider));
277        #[cfg(feature = "huggingface")]
278        providers.insert("hf".into(), Arc::new(huggingface::HuggingfaceStoreProvider));
279        Self {
280            providers: RwLock::new(providers),
281            active_stores: RwLock::new(HashMap::new()),
282        }
283    }
284}
285
286impl ObjectStoreRegistry {
287    /// Add a new object store provider to the registry. The provider will be used
288    /// in [`Self::get_store()`] when a URL is passed with a matching scheme.
289    pub fn insert(&self, scheme: &str, provider: Arc<dyn ObjectStoreProvider>) {
290        self.providers
291            .write()
292            .expect("ObjectStoreRegistry lock poisoned")
293            .insert(scheme.into(), provider);
294    }
295}
296
#[cfg(test)]
mod tests {
    use super::*;

    /// Provider that relies on the trait's default methods and never builds a store.
    #[derive(Debug)]
    struct DummyProvider;

    #[async_trait::async_trait]
    impl ObjectStoreProvider for DummyProvider {
        async fn new_store(
            &self,
            _base_path: Url,
            _params: &ObjectStoreParams,
        ) -> Result<ObjectStore> {
            unreachable!("This test doesn't create stores")
        }
    }

    // The default prefix is "<scheme>$<authority>".
    #[test]
    fn test_calculate_object_store_prefix() {
        let provider = DummyProvider;
        let url = Url::parse("dummy://blah/path").unwrap();
        assert_eq!(
            "dummy$blah",
            provider.calculate_object_store_prefix(&url, None).unwrap()
        );
    }

    // An unknown scheme is reported together with the list of valid schemes.
    #[test]
    fn test_calculate_object_store_scheme_not_found() {
        let registry = ObjectStoreRegistry::empty();
        registry.insert("dummy", Arc::new(DummyProvider));
        let expected_prefix = "Invalid user input: No object store provider found for scheme: 'dummy2'\nValid schemes: dummy";
        let message = registry
            .calculate_object_store_prefix("dummy2://mybucket/my/long/path", None)
            .expect_err("expected error")
            .to_string();
        // Only the start of the message is compared, since text may follow it.
        // `starts_with` cannot panic on a message shorter than the expected
        // prefix, unlike slicing, and the failure output shows the message.
        assert!(
            message.starts_with(expected_prefix),
            "unexpected error message: {message}"
        );
    }

    // Test that paths without a scheme get treated as local paths.
    #[test]
    fn test_calculate_object_store_prefix_for_local() {
        let registry = ObjectStoreRegistry::empty();
        assert_eq!(
            "file",
            registry
                .calculate_object_store_prefix("/tmp/foobar", None)
                .unwrap()
        );
    }

    // Test that paths with a single-letter scheme that is not registered for anything get treated as local paths.
    #[test]
    fn test_calculate_object_store_prefix_for_local_windows_path() {
        let registry = ObjectStoreRegistry::empty();
        assert_eq!(
            "file",
            registry
                .calculate_object_store_prefix("c://dos/path", None)
                .unwrap()
        );
    }

    // Test that paths with a given scheme get mapped to that storage provider.
    #[test]
    fn test_calculate_object_store_prefix_for_dummy_path() {
        let registry = ObjectStoreRegistry::empty();
        registry.insert("dummy", Arc::new(DummyProvider));
        assert_eq!(
            "dummy$mybucket",
            registry
                .calculate_object_store_prefix("dummy://mybucket/my/long/path", None)
                .unwrap()
        );
    }
}
373}