Skip to main content

lance_io/object_store/
providers.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::HashMap,
6    sync::{
7        Arc, RwLock, Weak,
8        atomic::{AtomicU64, Ordering},
9    },
10};
11
12use object_store::path::Path;
13use url::Url;
14
15use crate::object_store::WrappingObjectStore;
16use crate::object_store::uri_to_url;
17
18use super::{ObjectStore, ObjectStoreParams, tracing::ObjectStoreTracingExt};
19use lance_core::error::{Error, LanceOptionExt, Result};
20
21#[cfg(feature = "aws")]
22pub mod aws;
23#[cfg(feature = "azure")]
24pub mod azure;
25#[cfg(feature = "gcp")]
26pub mod gcp;
27#[cfg(feature = "huggingface")]
28pub mod huggingface;
29pub mod local;
30pub mod memory;
31#[cfg(feature = "oss")]
32pub mod oss;
33#[cfg(feature = "tencent")]
34pub mod tencent;
35
36#[async_trait::async_trait]
37pub trait ObjectStoreProvider: std::fmt::Debug + Sync + Send {
38    async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore>;
39
40    /// Extract the path relative to the base of the store.
41    ///
42    /// For example, in S3 the path is relative to the bucket. So a URL of
43    /// `s3://bucket/path/to/file` would return `path/to/file`.
44    ///
45    /// Meanwhile, for a file store, the path is relative to the filesystem root.
46    /// So a URL of `file:///path/to/file` would return `/path/to/file`.
47    fn extract_path(&self, url: &Url) -> Result<Path> {
48        Path::parse(url.path())
49            .map_err(|_| Error::invalid_input(format!("Invalid path in URL: {}", url.path())))
50    }
51
52    /// Calculate the unique prefix that should be used for this object store.
53    ///
54    /// For object stores that don't have the concept of buckets, this will just be something like
55    /// 'file' or 'memory'.
56    ///
57    /// In object stores where all bucket names are unique, like s3, this will be
58    /// simply 's3$my_bucket_name' or similar.
59    ///
60    /// In Azure, only the combination of (account name, container name) is unique, so
61    /// this will be something like 'az$account_name@container'
62    ///
63    /// Providers should override this if they have special requirements like Azure's.
64    fn calculate_object_store_prefix(
65        &self,
66        url: &Url,
67        _storage_options: Option<&HashMap<String, String>>,
68    ) -> Result<String> {
69        Ok(format!("{}${}", url.scheme(), url.authority()))
70    }
71}
72
/// Point-in-time snapshot of the counters kept by an [`ObjectStoreRegistry`]'s
/// store cache.
#[derive(Debug, Clone, Default)]
pub struct ObjectStoreRegistryStats {
    /// Lookups satisfied by an already-cached store that was still alive.
    pub hits: u64,
    /// Lookups that had to construct a fresh store.
    pub misses: u64,
    /// Cached stores that were still alive when the snapshot was taken.
    pub active_stores: usize,
}
83
84/// A registry of object store providers.
85///
86/// Use [`Self::default()`] to create one with the available default providers.
87/// This includes (depending on features enabled):
88/// - `memory`: An in-memory object store.
89/// - `file`: A local file object store, with optimized code paths.
90/// - `file-object-store`: A local file object store that uses the ObjectStore API,
91///   for all operations. Used for testing with ObjectStore wrappers.
92/// - `file+uring`: A local file object store using io_uring (Linux only).
93/// - `s3`: An S3 object store.
94/// - `s3+ddb`: An S3 object store with DynamoDB for metadata.
95/// - `az`: An Azure Blob Storage object store.
96/// - `gs`: A Google Cloud Storage object store.
97///
98/// Use [`Self::empty()`] to create an empty registry, with no providers registered.
99///
100/// The registry also caches object stores that are currently in use. It holds
101/// weak references to the object stores, so they are not held onto. If an object
102/// store is no longer in use, it will be removed from the cache on the next
103/// call to either [`Self::active_stores()`] or [`Self::get_store()`].
104#[derive(Debug)]
105pub struct ObjectStoreRegistry {
106    providers: RwLock<HashMap<String, Arc<dyn ObjectStoreProvider>>>,
107    // Cache of object stores currently in use. We use a weak reference so the
108    // cache itself doesn't keep them alive if no object store is actually using
109    // it.
110    active_stores: RwLock<HashMap<(String, ObjectStoreParams), Weak<ObjectStore>>>,
111    // Cache statistics
112    hits: AtomicU64,
113    misses: AtomicU64,
114}
115
116impl ObjectStoreRegistry {
117    /// Create a new registry with no providers registered.
118    ///
119    /// Typically, you want to use [`Self::default()`] instead, so you get the
120    /// default providers.
121    pub fn empty() -> Self {
122        Self {
123            providers: RwLock::new(HashMap::new()),
124            active_stores: RwLock::new(HashMap::new()),
125            hits: AtomicU64::new(0),
126            misses: AtomicU64::new(0),
127        }
128    }
129
130    /// Get the object store provider for a given scheme.
131    pub fn get_provider(&self, scheme: &str) -> Option<Arc<dyn ObjectStoreProvider>> {
132        self.providers
133            .read()
134            .expect("ObjectStoreRegistry lock poisoned")
135            .get(scheme)
136            .cloned()
137    }
138
139    /// Get a list of all active object stores.
140    ///
141    /// Calling this will also clean up any weak references to object stores that
142    /// are no longer valid.
143    pub fn active_stores(&self) -> Vec<Arc<ObjectStore>> {
144        let mut found_inactive = false;
145        let output = self
146            .active_stores
147            .read()
148            .expect("ObjectStoreRegistry lock poisoned")
149            .values()
150            .filter_map(|weak| match weak.upgrade() {
151                Some(store) => Some(store),
152                None => {
153                    found_inactive = true;
154                    None
155                }
156            })
157            .collect();
158
159        if found_inactive {
160            // Clean up the cache by removing any weak references that are no longer valid
161            let mut cache_lock = self
162                .active_stores
163                .write()
164                .expect("ObjectStoreRegistry lock poisoned");
165            cache_lock.retain(|_, weak| weak.upgrade().is_some());
166        }
167        output
168    }
169
170    /// Get cache statistics for monitoring and debugging.
171    ///
172    /// Returns the number of cache hits, misses, and currently active stores.
173    /// This is useful for detecting configuration issues that cause excessive
174    /// cache misses (e.g., storage options that vary per-request).
175    pub fn stats(&self) -> ObjectStoreRegistryStats {
176        let active_stores = self
177            .active_stores
178            .read()
179            .map(|s| s.values().filter(|w| w.strong_count() > 0).count())
180            .unwrap_or(0);
181        ObjectStoreRegistryStats {
182            hits: self.hits.load(Ordering::Relaxed),
183            misses: self.misses.load(Ordering::Relaxed),
184            active_stores,
185        }
186    }
187
188    fn scheme_not_found_error(&self, scheme: &str) -> Error {
189        let mut message = format!("No object store provider found for scheme: '{}'", scheme);
190        if let Ok(providers) = self.providers.read() {
191            let valid_schemes = providers.keys().cloned().collect::<Vec<_>>().join(", ");
192            message.push_str(&format!("\nValid schemes: {}", valid_schemes));
193        }
194        Error::invalid_input(message)
195    }
196
197    /// Get an object store for a given base path and parameters.
198    ///
199    /// If the object store is already in use, it will return a strong reference
200    /// to the object store. If the object store is not in use, it will create a
201    /// new object store and return a strong reference to it.
202    pub async fn get_store(
203        &self,
204        base_path: Url,
205        params: &ObjectStoreParams,
206    ) -> Result<Arc<ObjectStore>> {
207        let scheme = base_path.scheme();
208        let Some(provider) = self.get_provider(scheme) else {
209            return Err(self.scheme_not_found_error(scheme));
210        };
211
212        let cache_path =
213            provider.calculate_object_store_prefix(&base_path, params.storage_options())?;
214        let cache_key = (cache_path.clone(), params.clone());
215
216        // Check if we have a cached store for this base path and params
217        {
218            let maybe_store = self
219                .active_stores
220                .read()
221                .ok()
222                .expect_ok()?
223                .get(&cache_key)
224                .cloned();
225            if let Some(store) = maybe_store {
226                if let Some(store) = store.upgrade() {
227                    self.hits.fetch_add(1, Ordering::Relaxed);
228                    return Ok(store);
229                } else {
230                    // Remove the weak reference if it is no longer valid
231                    let mut cache_lock = self
232                        .active_stores
233                        .write()
234                        .expect("ObjectStoreRegistry lock poisoned");
235                    if let Some(store) = cache_lock.get(&cache_key)
236                        && store.upgrade().is_none()
237                    {
238                        // Remove the weak reference if it is no longer valid
239                        cache_lock.remove(&cache_key);
240                    }
241                }
242            }
243        }
244
245        self.misses.fetch_add(1, Ordering::Relaxed);
246
247        let mut store = provider.new_store(base_path, params).await?;
248
249        store.inner = store.inner.traced();
250
251        if let Some(wrapper) = &params.object_store_wrapper {
252            store.inner = wrapper.wrap(&cache_path, store.inner);
253        }
254
255        // Always wrap with IO tracking
256        store.inner = store.io_tracker.wrap("", store.inner);
257
258        let store = Arc::new(store);
259
260        {
261            // Insert the store into the cache
262            let mut cache_lock = self.active_stores.write().ok().expect_ok()?;
263            cache_lock.insert(cache_key, Arc::downgrade(&store));
264        }
265
266        Ok(store)
267    }
268
269    /// Calculate the datastore prefix based on the URI and the storage options.
270    /// The data store prefix should uniquely identify the datastore.
271    pub fn calculate_object_store_prefix(
272        &self,
273        uri: &str,
274        storage_options: Option<&HashMap<String, String>>,
275    ) -> Result<String> {
276        let url = uri_to_url(uri)?;
277        match self.get_provider(url.scheme()) {
278            None => {
279                if url.scheme() == "file" || url.scheme().len() == 1 {
280                    Ok("file".to_string())
281                } else {
282                    Err(self.scheme_not_found_error(url.scheme()))
283                }
284            }
285            Some(provider) => provider.calculate_object_store_prefix(&url, storage_options),
286        }
287    }
288}
289
290impl Default for ObjectStoreRegistry {
291    fn default() -> Self {
292        let mut providers: HashMap<String, Arc<dyn ObjectStoreProvider>> = HashMap::new();
293
294        providers.insert("memory".into(), Arc::new(memory::MemoryStoreProvider));
295        providers.insert("file".into(), Arc::new(local::FileStoreProvider));
296        // The "file" scheme has special optimized code paths that bypass
297        // the ObjectStore API for better performance. However, this can make it
298        // hard to test when using ObjectStore wrappers, such as IOTrackingStore.
299        // So we provide a "file-object-store" scheme that uses the ObjectStore API.
300        // The specialized code paths are differentiated by the scheme name.
301        providers.insert(
302            "file-object-store".into(),
303            Arc::new(local::FileStoreProvider),
304        );
305        #[cfg(target_os = "linux")]
306        providers.insert("file+uring".into(), Arc::new(local::FileStoreProvider));
307
308        #[cfg(feature = "aws")]
309        {
310            let aws = Arc::new(aws::AwsStoreProvider);
311            providers.insert("s3".into(), aws.clone());
312            providers.insert("s3+ddb".into(), aws);
313        }
314        #[cfg(feature = "azure")]
315        {
316            let azure = Arc::new(azure::AzureBlobStoreProvider);
317            providers.insert("az".into(), azure.clone());
318            providers.insert("abfss".into(), azure);
319        }
320        #[cfg(feature = "gcp")]
321        providers.insert("gs".into(), Arc::new(gcp::GcsStoreProvider));
322        #[cfg(feature = "oss")]
323        providers.insert("oss".into(), Arc::new(oss::OssStoreProvider));
324        #[cfg(feature = "tencent")]
325        providers.insert("cos".into(), Arc::new(tencent::TencentStoreProvider));
326        #[cfg(feature = "huggingface")]
327        providers.insert("hf".into(), Arc::new(huggingface::HuggingfaceStoreProvider));
328        Self {
329            providers: RwLock::new(providers),
330            active_stores: RwLock::new(HashMap::new()),
331            hits: AtomicU64::new(0),
332            misses: AtomicU64::new(0),
333        }
334    }
335}
336
337impl ObjectStoreRegistry {
338    /// Add a new object store provider to the registry. The provider will be used
339    /// in [`Self::get_store()`] when a URL is passed with a matching scheme.
340    pub fn insert(&self, scheme: &str, provider: Arc<dyn ObjectStoreProvider>) {
341        self.providers
342            .write()
343            .expect("ObjectStoreRegistry lock poisoned")
344            .insert(scheme.into(), provider);
345    }
346}
347
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;

    /// Provider used only for prefix calculations; it never builds a store.
    #[derive(Debug)]
    struct DummyProvider;

    #[async_trait::async_trait]
    impl ObjectStoreProvider for DummyProvider {
        async fn new_store(
            &self,
            _base_path: Url,
            _params: &ObjectStoreParams,
        ) -> Result<ObjectStore> {
            unreachable!("This test doesn't create stores")
        }
    }

    #[test]
    fn test_calculate_object_store_prefix() {
        let provider = DummyProvider;
        let url = Url::parse("dummy://blah/path").unwrap();
        assert_eq!(
            "dummy$blah",
            provider.calculate_object_store_prefix(&url, None).unwrap()
        );
    }

    #[test]
    fn test_calculate_object_store_scheme_not_found() {
        let registry = ObjectStoreRegistry::empty();
        registry.insert("dummy", Arc::new(DummyProvider));
        let expected_prefix = "Invalid user input: No object store provider found for scheme: 'dummy2'\nValid schemes: dummy";
        let result = registry
            .calculate_object_store_prefix("dummy2://mybucket/my/long/path", None)
            .expect_err("expected error")
            .to_string();
        // The message may carry a trailing location suffix, so compare only the
        // prefix. `starts_with` avoids an opaque slice-index panic if the message
        // is shorter than expected.
        assert!(
            result.starts_with(expected_prefix),
            "unexpected error message: {result}"
        );
    }

    // Test that paths without a scheme get treated as local paths.
    #[test]
    fn test_calculate_object_store_prefix_for_local() {
        let registry = ObjectStoreRegistry::empty();
        assert_eq!(
            "file",
            registry
                .calculate_object_store_prefix("/tmp/foobar", None)
                .unwrap()
        );
    }

    // Test that paths with a single-letter scheme that is not registered for anything get treated as local paths.
    #[test]
    fn test_calculate_object_store_prefix_for_local_windows_path() {
        let registry = ObjectStoreRegistry::empty();
        assert_eq!(
            "file",
            registry
                .calculate_object_store_prefix("c://dos/path", None)
                .unwrap()
        );
    }

    // Test that paths with a given scheme get mapped to that storage provider.
    #[test]
    fn test_calculate_object_store_prefix_for_dummy_path() {
        let registry = ObjectStoreRegistry::empty();
        registry.insert("dummy", Arc::new(DummyProvider));
        assert_eq!(
            "dummy$mybucket",
            registry
                .calculate_object_store_prefix("dummy://mybucket/my/long/path", None)
                .unwrap()
        );
    }

    #[tokio::test]
    async fn test_stats_hit_miss_tracking() {
        use crate::object_store::StorageOptionsAccessor;
        let registry = ObjectStoreRegistry::default();
        let url = Url::parse("memory://test").unwrap();

        let params1 = ObjectStoreParams::default();
        let params2 = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([("k".into(), "v".into())]),
            ))),
            ..Default::default()
        };

        // (hits, misses, active)
        let cases: &[(&ObjectStoreParams, (u64, u64, usize))] = &[
            (&params1, (0, 1, 1)), // miss: new params
            (&params1, (1, 1, 1)), // hit: same params
            (&params2, (1, 2, 2)), // miss: different storage_options
        ];

        let mut stores = vec![]; // retain the stores
        for (params, (hits, misses, active)) in cases {
            stores.push(registry.get_store(url.clone(), params).await.unwrap());
            let s = registry.stats();
            assert_eq!(
                (s.hits, s.misses, s.active_stores),
                (*hits, *misses, *active)
            );
        }

        // Same params returns same instance
        assert!(Arc::ptr_eq(&stores[0], &stores[1]));

        // The cache only holds weak references, so once every caller drops its
        // stores none of them are reported as active any more.
        drop(stores);
        assert_eq!(registry.stats().active_stores, 0);
    }
}