Skip to main content

lance_io/object_store/
providers.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::HashMap,
6    sync::{
7        Arc, RwLock, Weak,
8        atomic::{AtomicU64, Ordering},
9    },
10};
11
12use object_store::path::Path;
13use url::Url;
14
15use crate::object_store::WrappingObjectStore;
16use crate::object_store::uri_to_url;
17
18use super::{ObjectStore, ObjectStoreParams, tracing::ObjectStoreTracingExt};
19use lance_core::error::{Error, LanceOptionExt, Result};
20
21#[cfg(feature = "aws")]
22pub mod aws;
23#[cfg(feature = "azure")]
24pub mod azure;
25#[cfg(feature = "gcp")]
26pub mod gcp;
27#[cfg(feature = "huggingface")]
28pub mod huggingface;
29pub mod local;
30pub mod memory;
31#[cfg(feature = "oss")]
32pub mod oss;
33pub mod shared_memory;
34#[cfg(feature = "tencent")]
35pub mod tencent;
36
37#[async_trait::async_trait]
38pub trait ObjectStoreProvider: std::fmt::Debug + Sync + Send {
39    async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore>;
40
41    /// Extract the path relative to the base of the store.
42    ///
43    /// For example, in S3 the path is relative to the bucket. So a URL of
44    /// `s3://bucket/path/to/file` would return `path/to/file`.
45    ///
46    /// Meanwhile, for a file store, the path is relative to the filesystem root.
47    /// So a URL of `file:///path/to/file` would return `/path/to/file`.
48    fn extract_path(&self, url: &Url) -> Result<Path> {
49        Path::parse(url.path())
50            .map_err(|_| Error::invalid_input(format!("Invalid path in URL: {}", url.path())))
51    }
52
53    /// Calculate the unique prefix that should be used for this object store.
54    ///
55    /// For object stores that don't have the concept of buckets, this will just be something like
56    /// 'file' or 'memory'.
57    ///
58    /// In object stores where all bucket names are unique, like s3, this will be
59    /// simply 's3$my_bucket_name' or similar.
60    ///
61    /// In Azure, only the combination of (account name, container name) is unique, so
62    /// this will be something like 'az$account_name@container'
63    ///
64    /// Providers should override this if they have special requirements like Azure's.
65    fn calculate_object_store_prefix(
66        &self,
67        url: &Url,
68        _storage_options: Option<&HashMap<String, String>>,
69    ) -> Result<String> {
70        Ok(format!("{}${}", url.scheme(), url.authority()))
71    }
72}
73
/// Snapshot of the object store registry's cache counters.
#[derive(Clone, Debug, Default)]
pub struct ObjectStoreRegistryStats {
    /// How many lookups were served by an already-cached store.
    pub hits: u64,
    /// How many lookups required creating a new store.
    pub misses: u64,
    /// How many cached object stores were still alive when the snapshot was taken.
    pub active_stores: usize,
}
84
/// A registry of object store providers, keyed by URL scheme.
///
/// Use [`Self::default()`] to create one with the available default providers.
/// This includes (depending on features enabled and target platform):
/// - `memory`: An in-memory object store.
/// - `shared-memory`: The shared-memory object store provider.
/// - `file`: A local file object store, with optimized code paths.
/// - `file-object-store`: A local file object store that uses the ObjectStore API,
///   for all operations. Used for testing with ObjectStore wrappers.
/// - `file+uring`: A local file object store using io_uring (Linux only).
/// - `s3`: An S3 object store (`aws` feature).
/// - `s3+ddb`: An S3 object store with DynamoDB for metadata (`aws` feature).
/// - `az` / `abfss`: An Azure Blob Storage object store (`azure` feature).
/// - `gs`: A Google Cloud Storage object store (`gcp` feature).
/// - `oss`: The OSS provider (`oss` feature).
/// - `cos`: The Tencent provider (`tencent` feature).
/// - `hf`: The Hugging Face provider (`huggingface` feature).
///
/// Use [`Self::empty()`] to create an empty registry, with no providers registered.
///
/// The registry also caches object stores that are currently in use. It holds
/// weak references to the object stores, so they are not held onto. Entries
/// whose store has been dropped are purged in bulk by
/// [`Self::active_stores()`]; [`Self::get_store()`] only evicts the stale
/// entry for the key being requested.
#[derive(Debug)]
pub struct ObjectStoreRegistry {
    // Registered providers, keyed by URL scheme (e.g. "s3", "file").
    providers: RwLock<HashMap<String, Arc<dyn ObjectStoreProvider>>>,
    // Cache of object stores currently in use, keyed by
    // (object store prefix, params). We use a weak reference so the
    // cache itself doesn't keep them alive if no object store is actually using
    // it.
    active_stores: RwLock<HashMap<(String, ObjectStoreParams), Weak<ObjectStore>>>,
    // Cache statistics: lookups served from the cache vs. lookups that had to
    // create a new store.
    hits: AtomicU64,
    misses: AtomicU64,
}
116
117impl ObjectStoreRegistry {
118    /// Create a new registry with no providers registered.
119    ///
120    /// Typically, you want to use [`Self::default()`] instead, so you get the
121    /// default providers.
122    pub fn empty() -> Self {
123        Self {
124            providers: RwLock::new(HashMap::new()),
125            active_stores: RwLock::new(HashMap::new()),
126            hits: AtomicU64::new(0),
127            misses: AtomicU64::new(0),
128        }
129    }
130
131    /// Get the object store provider for a given scheme.
132    pub fn get_provider(&self, scheme: &str) -> Option<Arc<dyn ObjectStoreProvider>> {
133        self.providers
134            .read()
135            .expect("ObjectStoreRegistry lock poisoned")
136            .get(scheme)
137            .cloned()
138    }
139
140    /// Get a list of all active object stores.
141    ///
142    /// Calling this will also clean up any weak references to object stores that
143    /// are no longer valid.
144    pub fn active_stores(&self) -> Vec<Arc<ObjectStore>> {
145        let mut found_inactive = false;
146        let output = self
147            .active_stores
148            .read()
149            .expect("ObjectStoreRegistry lock poisoned")
150            .values()
151            .filter_map(|weak| match weak.upgrade() {
152                Some(store) => Some(store),
153                None => {
154                    found_inactive = true;
155                    None
156                }
157            })
158            .collect();
159
160        if found_inactive {
161            // Clean up the cache by removing any weak references that are no longer valid
162            let mut cache_lock = self
163                .active_stores
164                .write()
165                .expect("ObjectStoreRegistry lock poisoned");
166            cache_lock.retain(|_, weak| weak.upgrade().is_some());
167        }
168        output
169    }
170
171    /// Get cache statistics for monitoring and debugging.
172    ///
173    /// Returns the number of cache hits, misses, and currently active stores.
174    /// This is useful for detecting configuration issues that cause excessive
175    /// cache misses (e.g., storage options that vary per-request).
176    pub fn stats(&self) -> ObjectStoreRegistryStats {
177        let active_stores = self
178            .active_stores
179            .read()
180            .map(|s| s.values().filter(|w| w.strong_count() > 0).count())
181            .unwrap_or(0);
182        ObjectStoreRegistryStats {
183            hits: self.hits.load(Ordering::Relaxed),
184            misses: self.misses.load(Ordering::Relaxed),
185            active_stores,
186        }
187    }
188
189    fn scheme_not_found_error(&self, scheme: &str) -> Error {
190        let mut message = format!("No object store provider found for scheme: '{}'", scheme);
191        if let Ok(providers) = self.providers.read() {
192            let valid_schemes = providers.keys().cloned().collect::<Vec<_>>().join(", ");
193            message.push_str(&format!("\nValid schemes: {}", valid_schemes));
194        }
195        Error::invalid_input(message)
196    }
197
198    /// Get an object store for a given base path and parameters.
199    ///
200    /// If the object store is already in use, it will return a strong reference
201    /// to the object store. If the object store is not in use, it will create a
202    /// new object store and return a strong reference to it.
203    pub async fn get_store(
204        &self,
205        base_path: Url,
206        params: &ObjectStoreParams,
207    ) -> Result<Arc<ObjectStore>> {
208        let scheme = base_path.scheme();
209        let Some(provider) = self.get_provider(scheme) else {
210            return Err(self.scheme_not_found_error(scheme));
211        };
212
213        let cache_path =
214            provider.calculate_object_store_prefix(&base_path, params.storage_options())?;
215        let cache_key = (cache_path.clone(), params.clone());
216
217        // Check if we have a cached store for this base path and params
218        {
219            let maybe_store = self
220                .active_stores
221                .read()
222                .ok()
223                .expect_ok()?
224                .get(&cache_key)
225                .cloned();
226            if let Some(store) = maybe_store {
227                if let Some(store) = store.upgrade() {
228                    self.hits.fetch_add(1, Ordering::Relaxed);
229                    return Ok(store);
230                } else {
231                    // Remove the weak reference if it is no longer valid
232                    let mut cache_lock = self
233                        .active_stores
234                        .write()
235                        .expect("ObjectStoreRegistry lock poisoned");
236                    if let Some(store) = cache_lock.get(&cache_key)
237                        && store.upgrade().is_none()
238                    {
239                        // Remove the weak reference if it is no longer valid
240                        cache_lock.remove(&cache_key);
241                    }
242                }
243            }
244        }
245
246        self.misses.fetch_add(1, Ordering::Relaxed);
247
248        let mut store = provider.new_store(base_path, params).await?;
249
250        store.inner = store.inner.traced();
251
252        if let Some(wrapper) = &params.object_store_wrapper {
253            store.inner = wrapper.wrap(&cache_path, store.inner);
254        }
255
256        // Always wrap with IO tracking
257        store.inner = store.io_tracker.wrap("", store.inner);
258
259        let store = Arc::new(store);
260
261        {
262            // Insert the store into the cache
263            let mut cache_lock = self.active_stores.write().ok().expect_ok()?;
264            cache_lock.insert(cache_key, Arc::downgrade(&store));
265        }
266
267        Ok(store)
268    }
269
270    /// Calculate the datastore prefix based on the URI and the storage options.
271    /// The data store prefix should uniquely identify the datastore.
272    pub fn calculate_object_store_prefix(
273        &self,
274        uri: &str,
275        storage_options: Option<&HashMap<String, String>>,
276    ) -> Result<String> {
277        let url = uri_to_url(uri)?;
278        match self.get_provider(url.scheme()) {
279            None => {
280                if url.scheme() == "file" || url.scheme().len() == 1 {
281                    Ok("file".to_string())
282                } else {
283                    Err(self.scheme_not_found_error(url.scheme()))
284                }
285            }
286            Some(provider) => provider.calculate_object_store_prefix(&url, storage_options),
287        }
288    }
289}
290
291impl Default for ObjectStoreRegistry {
292    fn default() -> Self {
293        let mut providers: HashMap<String, Arc<dyn ObjectStoreProvider>> = HashMap::new();
294
295        providers.insert("memory".into(), Arc::new(memory::MemoryStoreProvider));
296        providers.insert(
297            "shared-memory".into(),
298            Arc::new(shared_memory::SharedMemoryStoreProvider::default()),
299        );
300        providers.insert("file".into(), Arc::new(local::FileStoreProvider));
301        // The "file" scheme has special optimized code paths that bypass
302        // the ObjectStore API for better performance. However, this can make it
303        // hard to test when using ObjectStore wrappers, such as IOTrackingStore.
304        // So we provide a "file-object-store" scheme that uses the ObjectStore API.
305        // The specialized code paths are differentiated by the scheme name.
306        providers.insert(
307            "file-object-store".into(),
308            Arc::new(local::FileStoreProvider),
309        );
310        #[cfg(target_os = "linux")]
311        providers.insert("file+uring".into(), Arc::new(local::FileStoreProvider));
312
313        #[cfg(feature = "aws")]
314        {
315            let aws = Arc::new(aws::AwsStoreProvider);
316            providers.insert("s3".into(), aws.clone());
317            providers.insert("s3+ddb".into(), aws);
318        }
319        #[cfg(feature = "azure")]
320        {
321            let azure = Arc::new(azure::AzureBlobStoreProvider);
322            providers.insert("az".into(), azure.clone());
323            providers.insert("abfss".into(), azure);
324        }
325        #[cfg(feature = "gcp")]
326        providers.insert("gs".into(), Arc::new(gcp::GcsStoreProvider));
327        #[cfg(feature = "oss")]
328        providers.insert("oss".into(), Arc::new(oss::OssStoreProvider));
329        #[cfg(feature = "tencent")]
330        providers.insert("cos".into(), Arc::new(tencent::TencentStoreProvider));
331        #[cfg(feature = "huggingface")]
332        providers.insert("hf".into(), Arc::new(huggingface::HuggingfaceStoreProvider));
333        Self {
334            providers: RwLock::new(providers),
335            active_stores: RwLock::new(HashMap::new()),
336            hits: AtomicU64::new(0),
337            misses: AtomicU64::new(0),
338        }
339    }
340}
341
342impl ObjectStoreRegistry {
343    /// Add a new object store provider to the registry. The provider will be used
344    /// in [`Self::get_store()`] when a URL is passed with a matching scheme.
345    pub fn insert(&self, scheme: &str, provider: Arc<dyn ObjectStoreProvider>) {
346        self.providers
347            .write()
348            .expect("ObjectStoreRegistry lock poisoned")
349            .insert(scheme.into(), provider);
350    }
351}
352
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;

    /// Provider that relies on the trait's default methods and must never be
    /// asked to create a store.
    #[derive(Debug)]
    struct DummyProvider;

    #[async_trait::async_trait]
    impl ObjectStoreProvider for DummyProvider {
        async fn new_store(
            &self,
            _base_path: Url,
            _params: &ObjectStoreParams,
        ) -> Result<ObjectStore> {
            unreachable!("This test doesn't create stores")
        }
    }

    // The default prefix is "<scheme>$<authority>".
    #[test]
    fn test_calculate_object_store_prefix() {
        let provider = DummyProvider;
        let url = Url::parse("dummy://blah/path").unwrap();
        assert_eq!(
            "dummy$blah",
            provider.calculate_object_store_prefix(&url, None).unwrap()
        );
    }

    // An unknown scheme produces an error that lists the registered schemes.
    #[test]
    fn test_calculate_object_store_scheme_not_found() {
        let registry = ObjectStoreRegistry::empty();
        registry.insert("dummy", Arc::new(DummyProvider));
        let s = "Invalid user input: No object store provider found for scheme: 'dummy2'\nValid schemes: dummy";
        let result = registry
            .calculate_object_store_prefix("dummy2://mybucket/my/long/path", None)
            .expect_err("expected error")
            .to_string();
        // Compare by prefix; `starts_with` reports a readable failure instead
        // of a slice-index panic when the message is shorter than expected.
        assert!(
            result.starts_with(s),
            "unexpected error message: {result}"
        );
    }

    // Test that paths without a scheme get treated as local paths.
    #[test]
    fn test_calculate_object_store_prefix_for_local() {
        let registry = ObjectStoreRegistry::empty();
        assert_eq!(
            "file",
            registry
                .calculate_object_store_prefix("/tmp/foobar", None)
                .unwrap()
        );
    }

    // Test that paths with a single-letter scheme that is not registered for anything get treated as local paths.
    #[test]
    fn test_calculate_object_store_prefix_for_local_windows_path() {
        let registry = ObjectStoreRegistry::empty();
        assert_eq!(
            "file",
            registry
                .calculate_object_store_prefix("c://dos/path", None)
                .unwrap()
        );
    }

    // Test that paths with a given scheme get mapped to that storage provider.
    #[test]
    fn test_calculate_object_store_prefix_for_dummy_path() {
        let registry = ObjectStoreRegistry::empty();
        registry.insert("dummy", Arc::new(DummyProvider));
        assert_eq!(
            "dummy$mybucket",
            registry
                .calculate_object_store_prefix("dummy://mybucket/my/long/path", None)
                .unwrap()
        );
    }

    // Test that the hit/miss counters and the active store count follow the
    // sequence of `get_store` calls.
    #[tokio::test]
    async fn test_stats_hit_miss_tracking() {
        use crate::object_store::StorageOptionsAccessor;
        let registry = ObjectStoreRegistry::default();
        let url = Url::parse("memory://test").unwrap();

        let params1 = ObjectStoreParams::default();
        let params2 = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([("k".into(), "v".into())]),
            ))),
            ..Default::default()
        };

        // (hits, misses, active)
        let cases: &[(&ObjectStoreParams, (u64, u64, usize))] = &[
            (&params1, (0, 1, 1)), // miss: new params
            (&params1, (1, 1, 1)), // hit: same params
            (&params2, (1, 2, 2)), // miss: different storage_options
        ];

        let mut stores = vec![]; // retain the stores
        for (params, (hits, misses, active)) in cases {
            stores.push(registry.get_store(url.clone(), params).await.unwrap());
            let s = registry.stats();
            assert_eq!(
                (s.hits, s.misses, s.active_stores),
                (*hits, *misses, *active)
            );
        }

        // Same params returns same instance
        assert!(Arc::ptr_eq(&stores[0], &stores[1]));
    }
}