Skip to main content

lance_io/object_store/
providers.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::HashMap,
6    sync::{
7        atomic::{AtomicU64, Ordering},
8        Arc, RwLock, Weak,
9    },
10};
11
12use object_store::path::Path;
13use snafu::location;
14use url::Url;
15
16use crate::object_store::uri_to_url;
17use crate::object_store::WrappingObjectStore;
18
19use super::{tracing::ObjectStoreTracingExt, ObjectStore, ObjectStoreParams};
20use lance_core::error::{Error, LanceOptionExt, Result};
21
22#[cfg(feature = "aws")]
23pub mod aws;
24#[cfg(feature = "azure")]
25pub mod azure;
26#[cfg(feature = "gcp")]
27pub mod gcp;
28#[cfg(feature = "huggingface")]
29pub mod huggingface;
30pub mod local;
31pub mod memory;
32#[cfg(feature = "oss")]
33pub mod oss;
34
35#[async_trait::async_trait]
36pub trait ObjectStoreProvider: std::fmt::Debug + Sync + Send {
37    async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore>;
38
39    /// Extract the path relative to the base of the store.
40    ///
41    /// For example, in S3 the path is relative to the bucket. So a URL of
42    /// `s3://bucket/path/to/file` would return `path/to/file`.
43    ///
44    /// Meanwhile, for a file store, the path is relative to the filesystem root.
45    /// So a URL of `file:///path/to/file` would return `/path/to/file`.
46    fn extract_path(&self, url: &Url) -> Result<Path> {
47        Path::parse(url.path()).map_err(|_| {
48            Error::invalid_input(format!("Invalid path in URL: {}", url.path()), location!())
49        })
50    }
51
52    /// Calculate the unique prefix that should be used for this object store.
53    ///
54    /// For object stores that don't have the concept of buckets, this will just be something like
55    /// 'file' or 'memory'.
56    ///
57    /// In object stores where all bucket names are unique, like s3, this will be
58    /// simply 's3$my_bucket_name' or similar.
59    ///
60    /// In Azure, only the combination of (account name, container name) is unique, so
61    /// this will be something like 'az$account_name@container'
62    ///
63    /// Providers should override this if they have special requirements like Azure's.
64    fn calculate_object_store_prefix(
65        &self,
66        url: &Url,
67        _storage_options: Option<&HashMap<String, String>>,
68    ) -> Result<String> {
69        Ok(format!("{}${}", url.scheme(), url.authority()))
70    }
71}
72
/// Snapshot of the object store registry's cache counters.
#[derive(Debug, Clone, Default)]
pub struct ObjectStoreRegistryStats {
    /// How many lookups were served by an already-cached store.
    pub hits: u64,
    /// How many lookups required a new store to be created.
    pub misses: u64,
    /// How many object stores in the cache are currently alive.
    pub active_stores: usize,
}
83
84/// A registry of object store providers.
85///
86/// Use [`Self::default()`] to create one with the available default providers.
87/// This includes (depending on features enabled):
88/// - `memory`: An in-memory object store.
89/// - `file`: A local file object store, with optimized code paths.
90/// - `file-object-store`: A local file object store that uses the ObjectStore API,
91///   for all operations. Used for testing with ObjectStore wrappers.
92/// - `s3`: An S3 object store.
93/// - `s3+ddb`: An S3 object store with DynamoDB for metadata.
94/// - `az`: An Azure Blob Storage object store.
95/// - `gs`: A Google Cloud Storage object store.
96///
97/// Use [`Self::empty()`] to create an empty registry, with no providers registered.
98///
99/// The registry also caches object stores that are currently in use. It holds
100/// weak references to the object stores, so they are not held onto. If an object
101/// store is no longer in use, it will be removed from the cache on the next
102/// call to either [`Self::active_stores()`] or [`Self::get_store()`].
103#[derive(Debug)]
104pub struct ObjectStoreRegistry {
105    providers: RwLock<HashMap<String, Arc<dyn ObjectStoreProvider>>>,
106    // Cache of object stores currently in use. We use a weak reference so the
107    // cache itself doesn't keep them alive if no object store is actually using
108    // it.
109    active_stores: RwLock<HashMap<(String, ObjectStoreParams), Weak<ObjectStore>>>,
110    // Cache statistics
111    hits: AtomicU64,
112    misses: AtomicU64,
113}
114
115impl ObjectStoreRegistry {
116    /// Create a new registry with no providers registered.
117    ///
118    /// Typically, you want to use [`Self::default()`] instead, so you get the
119    /// default providers.
120    pub fn empty() -> Self {
121        Self {
122            providers: RwLock::new(HashMap::new()),
123            active_stores: RwLock::new(HashMap::new()),
124            hits: AtomicU64::new(0),
125            misses: AtomicU64::new(0),
126        }
127    }
128
129    /// Get the object store provider for a given scheme.
130    pub fn get_provider(&self, scheme: &str) -> Option<Arc<dyn ObjectStoreProvider>> {
131        self.providers
132            .read()
133            .expect("ObjectStoreRegistry lock poisoned")
134            .get(scheme)
135            .cloned()
136    }
137
138    /// Get a list of all active object stores.
139    ///
140    /// Calling this will also clean up any weak references to object stores that
141    /// are no longer valid.
142    pub fn active_stores(&self) -> Vec<Arc<ObjectStore>> {
143        let mut found_inactive = false;
144        let output = self
145            .active_stores
146            .read()
147            .expect("ObjectStoreRegistry lock poisoned")
148            .values()
149            .filter_map(|weak| match weak.upgrade() {
150                Some(store) => Some(store),
151                None => {
152                    found_inactive = true;
153                    None
154                }
155            })
156            .collect();
157
158        if found_inactive {
159            // Clean up the cache by removing any weak references that are no longer valid
160            let mut cache_lock = self
161                .active_stores
162                .write()
163                .expect("ObjectStoreRegistry lock poisoned");
164            cache_lock.retain(|_, weak| weak.upgrade().is_some());
165        }
166        output
167    }
168
169    /// Get cache statistics for monitoring and debugging.
170    ///
171    /// Returns the number of cache hits, misses, and currently active stores.
172    /// This is useful for detecting configuration issues that cause excessive
173    /// cache misses (e.g., storage options that vary per-request).
174    pub fn stats(&self) -> ObjectStoreRegistryStats {
175        let active_stores = self
176            .active_stores
177            .read()
178            .map(|s| s.values().filter(|w| w.strong_count() > 0).count())
179            .unwrap_or(0);
180        ObjectStoreRegistryStats {
181            hits: self.hits.load(Ordering::Relaxed),
182            misses: self.misses.load(Ordering::Relaxed),
183            active_stores,
184        }
185    }
186
187    fn scheme_not_found_error(&self, scheme: &str) -> Error {
188        let mut message = format!("No object store provider found for scheme: '{}'", scheme);
189        if let Ok(providers) = self.providers.read() {
190            let valid_schemes = providers.keys().cloned().collect::<Vec<_>>().join(", ");
191            message.push_str(&format!("\nValid schemes: {}", valid_schemes));
192        }
193        Error::invalid_input(message, location!())
194    }
195
196    /// Get an object store for a given base path and parameters.
197    ///
198    /// If the object store is already in use, it will return a strong reference
199    /// to the object store. If the object store is not in use, it will create a
200    /// new object store and return a strong reference to it.
201    pub async fn get_store(
202        &self,
203        base_path: Url,
204        params: &ObjectStoreParams,
205    ) -> Result<Arc<ObjectStore>> {
206        let scheme = base_path.scheme();
207        let Some(provider) = self.get_provider(scheme) else {
208            return Err(self.scheme_not_found_error(scheme));
209        };
210
211        let cache_path =
212            provider.calculate_object_store_prefix(&base_path, params.storage_options())?;
213        let cache_key = (cache_path.clone(), params.clone());
214
215        // Check if we have a cached store for this base path and params
216        {
217            let maybe_store = self
218                .active_stores
219                .read()
220                .ok()
221                .expect_ok()?
222                .get(&cache_key)
223                .cloned();
224            if let Some(store) = maybe_store {
225                if let Some(store) = store.upgrade() {
226                    self.hits.fetch_add(1, Ordering::Relaxed);
227                    return Ok(store);
228                } else {
229                    // Remove the weak reference if it is no longer valid
230                    let mut cache_lock = self
231                        .active_stores
232                        .write()
233                        .expect("ObjectStoreRegistry lock poisoned");
234                    if let Some(store) = cache_lock.get(&cache_key) {
235                        if store.upgrade().is_none() {
236                            // Remove the weak reference if it is no longer valid
237                            cache_lock.remove(&cache_key);
238                        }
239                    }
240                }
241            }
242        }
243
244        self.misses.fetch_add(1, Ordering::Relaxed);
245
246        let mut store = provider.new_store(base_path, params).await?;
247
248        store.inner = store.inner.traced();
249
250        if let Some(wrapper) = &params.object_store_wrapper {
251            store.inner = wrapper.wrap(&cache_path, store.inner);
252        }
253
254        // Always wrap with IO tracking
255        store.inner = store.io_tracker.wrap("", store.inner);
256
257        let store = Arc::new(store);
258
259        {
260            // Insert the store into the cache
261            let mut cache_lock = self.active_stores.write().ok().expect_ok()?;
262            cache_lock.insert(cache_key, Arc::downgrade(&store));
263        }
264
265        Ok(store)
266    }
267
268    /// Calculate the datastore prefix based on the URI and the storage options.
269    /// The data store prefix should uniquely identify the datastore.
270    pub fn calculate_object_store_prefix(
271        &self,
272        uri: &str,
273        storage_options: Option<&HashMap<String, String>>,
274    ) -> Result<String> {
275        let url = uri_to_url(uri)?;
276        match self.get_provider(url.scheme()) {
277            None => {
278                if url.scheme() == "file" || url.scheme().len() == 1 {
279                    Ok("file".to_string())
280                } else {
281                    Err(self.scheme_not_found_error(url.scheme()))
282                }
283            }
284            Some(provider) => provider.calculate_object_store_prefix(&url, storage_options),
285        }
286    }
287}
288
289impl Default for ObjectStoreRegistry {
290    fn default() -> Self {
291        let mut providers: HashMap<String, Arc<dyn ObjectStoreProvider>> = HashMap::new();
292
293        providers.insert("memory".into(), Arc::new(memory::MemoryStoreProvider));
294        providers.insert("file".into(), Arc::new(local::FileStoreProvider));
295        // The "file" scheme has special optimized code paths that bypass
296        // the ObjectStore API for better performance. However, this can make it
297        // hard to test when using ObjectStore wrappers, such as IOTrackingStore.
298        // So we provide a "file-object-store" scheme that uses the ObjectStore API.
299        // The specialized code paths are differentiated by the scheme name.
300        providers.insert(
301            "file-object-store".into(),
302            Arc::new(local::FileStoreProvider),
303        );
304
305        #[cfg(feature = "aws")]
306        {
307            let aws = Arc::new(aws::AwsStoreProvider);
308            providers.insert("s3".into(), aws.clone());
309            providers.insert("s3+ddb".into(), aws);
310        }
311        #[cfg(feature = "azure")]
312        providers.insert("az".into(), Arc::new(azure::AzureBlobStoreProvider));
313        #[cfg(feature = "gcp")]
314        providers.insert("gs".into(), Arc::new(gcp::GcsStoreProvider));
315        #[cfg(feature = "oss")]
316        providers.insert("oss".into(), Arc::new(oss::OssStoreProvider));
317        #[cfg(feature = "huggingface")]
318        providers.insert("hf".into(), Arc::new(huggingface::HuggingfaceStoreProvider));
319        Self {
320            providers: RwLock::new(providers),
321            active_stores: RwLock::new(HashMap::new()),
322            hits: AtomicU64::new(0),
323            misses: AtomicU64::new(0),
324        }
325    }
326}
327
328impl ObjectStoreRegistry {
329    /// Add a new object store provider to the registry. The provider will be used
330    /// in [`Self::get_store()`] when a URL is passed with a matching scheme.
331    pub fn insert(&self, scheme: &str, provider: Arc<dyn ObjectStoreProvider>) {
332        self.providers
333            .write()
334            .expect("ObjectStoreRegistry lock poisoned")
335            .insert(scheme.into(), provider);
336    }
337}
338
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;

    /// Provider that relies on the trait's default methods and never creates
    /// a store.
    #[derive(Debug)]
    struct DummyProvider;

    #[async_trait::async_trait]
    impl ObjectStoreProvider for DummyProvider {
        async fn new_store(
            &self,
            _base_path: Url,
            _params: &ObjectStoreParams,
        ) -> Result<ObjectStore> {
            unreachable!("This test doesn't create stores")
        }
    }

    // Test the default prefix calculation of the provider trait.
    #[test]
    fn test_calculate_object_store_prefix() {
        let provider = DummyProvider;
        let url = Url::parse("dummy://blah/path").unwrap();
        assert_eq!(
            "dummy$blah",
            provider.calculate_object_store_prefix(&url, None).unwrap()
        );
    }

    // Test that an unknown scheme produces an error that lists the valid schemes.
    #[test]
    fn test_calculate_object_store_scheme_not_found() {
        let registry = ObjectStoreRegistry::empty();
        registry.insert("dummy", Arc::new(DummyProvider));
        let expected_prefix = "Invalid user input: No object store provider found for scheme: 'dummy2'\nValid schemes: dummy";
        let message = registry
            .calculate_object_store_prefix("dummy2://mybucket/my/long/path", None)
            .expect_err("expected error")
            .to_string();
        // Only the start of the message is checked; anything after the
        // expected prefix is ignored. `starts_with` reports a readable failure
        // even when the message is shorter than the expected prefix, where
        // slicing would panic.
        assert!(
            message.starts_with(expected_prefix),
            "unexpected error message: {}",
            message
        );
    }

    // Test that paths without a scheme get treated as local paths.
    #[test]
    fn test_calculate_object_store_prefix_for_local() {
        let registry = ObjectStoreRegistry::empty();
        assert_eq!(
            "file",
            registry
                .calculate_object_store_prefix("/tmp/foobar", None)
                .unwrap()
        );
    }

    // Test that paths with a single-letter scheme that is not registered for anything get treated as local paths.
    #[test]
    fn test_calculate_object_store_prefix_for_local_windows_path() {
        let registry = ObjectStoreRegistry::empty();
        assert_eq!(
            "file",
            registry
                .calculate_object_store_prefix("c://dos/path", None)
                .unwrap()
        );
    }

    // Test that paths with a given scheme get mapped to that storage provider.
    #[test]
    fn test_calculate_object_store_prefix_for_dummy_path() {
        let registry = ObjectStoreRegistry::empty();
        registry.insert("dummy", Arc::new(DummyProvider));
        assert_eq!(
            "dummy$mybucket",
            registry
                .calculate_object_store_prefix("dummy://mybucket/my/long/path", None)
                .unwrap()
        );
    }

    // Test that the hit/miss counters and the active store count follow the
    // cache behavior of `get_store`.
    #[tokio::test]
    async fn test_stats_hit_miss_tracking() {
        use crate::object_store::StorageOptionsAccessor;
        let registry = ObjectStoreRegistry::default();
        let url = Url::parse("memory://test").unwrap();

        let params1 = ObjectStoreParams::default();
        let params2 = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([("k".into(), "v".into())]),
            ))),
            ..Default::default()
        };

        // (hits, misses, active)
        let cases: &[(&ObjectStoreParams, (u64, u64, usize))] = &[
            (&params1, (0, 1, 1)), // miss: new params
            (&params1, (1, 1, 1)), // hit: same params
            (&params2, (1, 2, 2)), // miss: different storage_options
        ];

        let mut stores = vec![]; // retain the stores
        for (params, (hits, misses, active)) in cases {
            stores.push(registry.get_store(url.clone(), params).await.unwrap());
            let s = registry.stats();
            assert_eq!(
                (s.hits, s.misses, s.active_stores),
                (*hits, *misses, *active)
            );
        }

        // Same params returns same instance
        assert!(Arc::ptr_eq(&stores[0], &stores[1]));
    }
}