Skip to main content

lance_io/object_store/
providers.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::HashMap,
6    sync::{
7        Arc, RwLock, Weak,
8        atomic::{AtomicU64, Ordering},
9    },
10};
11
12use object_store::path::Path;
13use url::Url;
14
15use crate::object_store::WrappingObjectStore;
16use crate::object_store::uri_to_url;
17
18use super::{ObjectStore, ObjectStoreParams, tracing::ObjectStoreTracingExt};
19use lance_core::error::{Error, LanceOptionExt, Result};
20
21#[cfg(feature = "aws")]
22pub mod aws;
23#[cfg(feature = "azure")]
24pub mod azure;
25#[cfg(feature = "gcp")]
26pub mod gcp;
27#[cfg(feature = "goosefs")]
28pub mod goosefs;
29#[cfg(feature = "huggingface")]
30pub mod huggingface;
31pub mod local;
32pub mod memory;
33#[cfg(feature = "oss")]
34pub mod oss;
35pub mod shared_memory;
36#[cfg(feature = "tencent")]
37pub mod tencent;
38#[cfg(feature = "tos")]
39pub mod tos;
40
41#[async_trait::async_trait]
42pub trait ObjectStoreProvider: std::fmt::Debug + Sync + Send {
43    async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore>;
44
45    /// Extract the path relative to the base of the store.
46    ///
47    /// For example, in S3 the path is relative to the bucket. So a URL of
48    /// `s3://bucket/path/to/file` would return `path/to/file`.
49    ///
50    /// Meanwhile, for a file store, the path is relative to the filesystem root.
51    /// So a URL of `file:///path/to/file` would return `/path/to/file`.
52    fn extract_path(&self, url: &Url) -> Result<Path> {
53        Path::parse(url.path())
54            .map_err(|_| Error::invalid_input(format!("Invalid path in URL: {}", url.path())))
55    }
56
57    /// Calculate the unique prefix that should be used for this object store.
58    ///
59    /// For object stores that don't have the concept of buckets, this will just be something like
60    /// 'file' or 'memory'.
61    ///
62    /// In object stores where all bucket names are unique, like s3, this will be
63    /// simply 's3$my_bucket_name' or similar.
64    ///
65    /// In Azure, only the combination of (account name, container name) is unique, so
66    /// this will be something like 'az$account_name@container'
67    ///
68    /// Providers should override this if they have special requirements like Azure's.
69    fn calculate_object_store_prefix(
70        &self,
71        url: &Url,
72        _storage_options: Option<&HashMap<String, String>>,
73    ) -> Result<String> {
74        Ok(format!("{}${}", url.scheme(), url.authority()))
75    }
76}
77
/// Snapshot of the object store registry's cache counters.
#[derive(Clone, Debug, Default)]
pub struct ObjectStoreRegistryStats {
    /// How many lookups were served by an already cached store.
    pub hits: u64,
    /// How many lookups required creating a new store.
    pub misses: u64,
    /// How many cached object stores are currently alive.
    pub active_stores: usize,
}
88
89/// A registry of object store providers.
90///
91/// Use [`Self::default()`] to create one with the available default providers.
92/// This includes (depending on features enabled):
93/// - `memory`: An in-memory object store.
94/// - `file`: A local file object store, with optimized code paths.
95/// - `file-object-store`: A local file object store that uses the ObjectStore API,
96///   for all operations. Used for testing with ObjectStore wrappers.
97/// - `file+uring`: A local file object store using io_uring (Linux only).
98/// - `s3`: An S3 object store.
99/// - `s3+ddb`: An S3 object store with DynamoDB for metadata.
100/// - `az`: An Azure Blob Storage object store.
101/// - `gs`: A Google Cloud Storage object store.
102/// - `tos`: A Volcengine TOS object store.
103///
104/// Use [`Self::empty()`] to create an empty registry, with no providers registered.
105///
106/// The registry also caches object stores that are currently in use. It holds
107/// weak references to the object stores, so they are not held onto. If an object
108/// store is no longer in use, it will be removed from the cache on the next
109/// call to either [`Self::active_stores()`] or [`Self::get_store()`].
110#[derive(Debug)]
111pub struct ObjectStoreRegistry {
112    providers: RwLock<HashMap<String, Arc<dyn ObjectStoreProvider>>>,
113    // Cache of object stores currently in use. We use a weak reference so the
114    // cache itself doesn't keep them alive if no object store is actually using
115    // it.
116    active_stores: RwLock<HashMap<(String, ObjectStoreParams), Weak<ObjectStore>>>,
117    // Cache statistics
118    hits: AtomicU64,
119    misses: AtomicU64,
120}
121
122impl ObjectStoreRegistry {
123    /// Create a new registry with no providers registered.
124    ///
125    /// Typically, you want to use [`Self::default()`] instead, so you get the
126    /// default providers.
127    pub fn empty() -> Self {
128        Self {
129            providers: RwLock::new(HashMap::new()),
130            active_stores: RwLock::new(HashMap::new()),
131            hits: AtomicU64::new(0),
132            misses: AtomicU64::new(0),
133        }
134    }
135
136    /// Get the object store provider for a given scheme.
137    pub fn get_provider(&self, scheme: &str) -> Option<Arc<dyn ObjectStoreProvider>> {
138        self.providers
139            .read()
140            .expect("ObjectStoreRegistry lock poisoned")
141            .get(scheme)
142            .cloned()
143    }
144
145    /// Get a list of all active object stores.
146    ///
147    /// Calling this will also clean up any weak references to object stores that
148    /// are no longer valid.
149    pub fn active_stores(&self) -> Vec<Arc<ObjectStore>> {
150        let mut found_inactive = false;
151        let output = self
152            .active_stores
153            .read()
154            .expect("ObjectStoreRegistry lock poisoned")
155            .values()
156            .filter_map(|weak| match weak.upgrade() {
157                Some(store) => Some(store),
158                None => {
159                    found_inactive = true;
160                    None
161                }
162            })
163            .collect();
164
165        if found_inactive {
166            // Clean up the cache by removing any weak references that are no longer valid
167            let mut cache_lock = self
168                .active_stores
169                .write()
170                .expect("ObjectStoreRegistry lock poisoned");
171            cache_lock.retain(|_, weak| weak.upgrade().is_some());
172        }
173        output
174    }
175
176    /// Get cache statistics for monitoring and debugging.
177    ///
178    /// Returns the number of cache hits, misses, and currently active stores.
179    /// This is useful for detecting configuration issues that cause excessive
180    /// cache misses (e.g., storage options that vary per-request).
181    pub fn stats(&self) -> ObjectStoreRegistryStats {
182        let active_stores = self
183            .active_stores
184            .read()
185            .map(|s| s.values().filter(|w| w.strong_count() > 0).count())
186            .unwrap_or(0);
187        ObjectStoreRegistryStats {
188            hits: self.hits.load(Ordering::Relaxed),
189            misses: self.misses.load(Ordering::Relaxed),
190            active_stores,
191        }
192    }
193
194    fn scheme_not_found_error(&self, scheme: &str) -> Error {
195        let mut message = format!("No object store provider found for scheme: '{}'", scheme);
196        if let Ok(providers) = self.providers.read() {
197            let valid_schemes = providers.keys().cloned().collect::<Vec<_>>().join(", ");
198            message.push_str(&format!("\nValid schemes: {}", valid_schemes));
199        }
200        Error::invalid_input(message)
201    }
202
203    /// Get an object store for a given base path and parameters.
204    ///
205    /// If the object store is already in use, it will return a strong reference
206    /// to the object store. If the object store is not in use, it will create a
207    /// new object store and return a strong reference to it.
208    pub async fn get_store(
209        &self,
210        base_path: Url,
211        params: &ObjectStoreParams,
212    ) -> Result<Arc<ObjectStore>> {
213        let scheme = base_path.scheme();
214        let Some(provider) = self.get_provider(scheme) else {
215            return Err(self.scheme_not_found_error(scheme));
216        };
217
218        let cache_path =
219            provider.calculate_object_store_prefix(&base_path, params.storage_options())?;
220        let cache_key = (cache_path.clone(), params.clone());
221
222        // Check if we have a cached store for this base path and params
223        {
224            let maybe_store = self
225                .active_stores
226                .read()
227                .ok()
228                .expect_ok()?
229                .get(&cache_key)
230                .cloned();
231            if let Some(store) = maybe_store {
232                if let Some(store) = store.upgrade() {
233                    self.hits.fetch_add(1, Ordering::Relaxed);
234                    return Ok(store);
235                } else {
236                    // Remove the weak reference if it is no longer valid
237                    let mut cache_lock = self
238                        .active_stores
239                        .write()
240                        .expect("ObjectStoreRegistry lock poisoned");
241                    if let Some(store) = cache_lock.get(&cache_key)
242                        && store.upgrade().is_none()
243                    {
244                        // Remove the weak reference if it is no longer valid
245                        cache_lock.remove(&cache_key);
246                    }
247                }
248            }
249        }
250
251        self.misses.fetch_add(1, Ordering::Relaxed);
252
253        let mut store = provider.new_store(base_path, params).await?;
254
255        store.inner = store.inner.traced();
256
257        if let Some(wrapper) = &params.object_store_wrapper {
258            store.inner = wrapper.wrap(&cache_path, store.inner);
259        }
260
261        // Always wrap with IO tracking
262        store.inner = store.io_tracker.wrap("", store.inner);
263
264        let store = Arc::new(store);
265
266        {
267            // Insert the store into the cache
268            let mut cache_lock = self.active_stores.write().ok().expect_ok()?;
269            cache_lock.insert(cache_key, Arc::downgrade(&store));
270        }
271
272        Ok(store)
273    }
274
275    /// Calculate the datastore prefix based on the URI and the storage options.
276    /// The data store prefix should uniquely identify the datastore.
277    pub fn calculate_object_store_prefix(
278        &self,
279        uri: &str,
280        storage_options: Option<&HashMap<String, String>>,
281    ) -> Result<String> {
282        let url = uri_to_url(uri)?;
283        match self.get_provider(url.scheme()) {
284            None => {
285                if url.scheme() == "file" || url.scheme().len() == 1 {
286                    Ok("file".to_string())
287                } else {
288                    Err(self.scheme_not_found_error(url.scheme()))
289                }
290            }
291            Some(provider) => provider.calculate_object_store_prefix(&url, storage_options),
292        }
293    }
294}
295
296impl Default for ObjectStoreRegistry {
297    fn default() -> Self {
298        let mut providers: HashMap<String, Arc<dyn ObjectStoreProvider>> = HashMap::new();
299
300        providers.insert("memory".into(), Arc::new(memory::MemoryStoreProvider));
301        providers.insert(
302            "shared-memory".into(),
303            Arc::new(shared_memory::SharedMemoryStoreProvider::default()),
304        );
305        providers.insert("file".into(), Arc::new(local::FileStoreProvider));
306        // The "file" scheme has special optimized code paths that bypass
307        // the ObjectStore API for better performance. However, this can make it
308        // hard to test when using ObjectStore wrappers, such as IOTrackingStore.
309        // So we provide a "file-object-store" scheme that uses the ObjectStore API.
310        // The specialized code paths are differentiated by the scheme name.
311        providers.insert(
312            "file-object-store".into(),
313            Arc::new(local::FileStoreProvider),
314        );
315        #[cfg(target_os = "linux")]
316        providers.insert("file+uring".into(), Arc::new(local::FileStoreProvider));
317
318        #[cfg(feature = "aws")]
319        {
320            let aws = Arc::new(aws::AwsStoreProvider);
321            providers.insert("s3".into(), aws.clone());
322            providers.insert("s3+ddb".into(), aws);
323        }
324        #[cfg(feature = "azure")]
325        {
326            let azure = Arc::new(azure::AzureBlobStoreProvider);
327            providers.insert("az".into(), azure.clone());
328            providers.insert("abfss".into(), azure);
329        }
330        #[cfg(feature = "gcp")]
331        providers.insert("gs".into(), Arc::new(gcp::GcsStoreProvider));
332        #[cfg(feature = "goosefs")]
333        providers.insert("goosefs".into(), Arc::new(goosefs::GooseFsStoreProvider));
334        #[cfg(feature = "oss")]
335        providers.insert("oss".into(), Arc::new(oss::OssStoreProvider));
336        #[cfg(feature = "tencent")]
337        providers.insert("cos".into(), Arc::new(tencent::TencentStoreProvider));
338        #[cfg(feature = "huggingface")]
339        providers.insert("hf".into(), Arc::new(huggingface::HuggingfaceStoreProvider));
340        #[cfg(feature = "tos")]
341        providers.insert("tos".into(), Arc::new(tos::TosStoreProvider));
342        Self {
343            providers: RwLock::new(providers),
344            active_stores: RwLock::new(HashMap::new()),
345            hits: AtomicU64::new(0),
346            misses: AtomicU64::new(0),
347        }
348    }
349}
350
351impl ObjectStoreRegistry {
352    /// Add a new object store provider to the registry. The provider will be used
353    /// in [`Self::get_store()`] when a URL is passed with a matching scheme.
354    pub fn insert(&self, scheme: &str, provider: Arc<dyn ObjectStoreProvider>) {
355        self.providers
356            .write()
357            .expect("ObjectStoreRegistry lock poisoned")
358            .insert(scheme.into(), provider);
359    }
360}
361
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;

    /// Provider that relies on the trait's default methods and never builds a store.
    #[derive(Debug)]
    struct DummyProvider;

    #[async_trait::async_trait]
    impl ObjectStoreProvider for DummyProvider {
        async fn new_store(
            &self,
            _base_path: Url,
            _params: &ObjectStoreParams,
        ) -> Result<ObjectStore> {
            unreachable!("This test doesn't create stores")
        }
    }

    /// Registry whose only provider is `DummyProvider`, registered as `dummy`.
    fn registry_with_dummy() -> ObjectStoreRegistry {
        let registry = ObjectStoreRegistry::empty();
        registry.insert("dummy", Arc::new(DummyProvider));
        registry
    }

    // The default trait implementation joins scheme and authority with `$`.
    #[test]
    fn test_calculate_object_store_prefix() {
        let url = Url::parse("dummy://blah/path").unwrap();
        let prefix = DummyProvider
            .calculate_object_store_prefix(&url, None)
            .unwrap();
        assert_eq!("dummy$blah", prefix);
    }

    // An unknown scheme is rejected and the error lists the registered schemes.
    #[test]
    fn test_calculate_object_store_scheme_not_found() {
        let registry = registry_with_dummy();
        let expected = "Invalid user input: No object store provider found for scheme: 'dummy2'\nValid schemes: dummy";
        let message = registry
            .calculate_object_store_prefix("dummy2://mybucket/my/long/path", None)
            .expect_err("expected error")
            .to_string();
        // Only the leading part of the message is compared.
        assert_eq!(Some(expected), message.get(..expected.len()));
    }

    // Test that paths without a scheme get treated as local paths.
    #[test]
    fn test_calculate_object_store_prefix_for_local() {
        let prefix = ObjectStoreRegistry::empty()
            .calculate_object_store_prefix("/tmp/foobar", None)
            .unwrap();
        assert_eq!("file", prefix);
    }

    // Test that paths with a single-letter scheme that is not registered for
    // anything get treated as local paths.
    #[test]
    fn test_calculate_object_store_prefix_for_local_windows_path() {
        let prefix = ObjectStoreRegistry::empty()
            .calculate_object_store_prefix("c://dos/path", None)
            .unwrap();
        assert_eq!("file", prefix);
    }

    // Test that paths with a given scheme get mapped to that storage provider.
    #[test]
    fn test_calculate_object_store_prefix_for_dummy_path() {
        let prefix = registry_with_dummy()
            .calculate_object_store_prefix("dummy://mybucket/my/long/path", None)
            .unwrap();
        assert_eq!("dummy$mybucket", prefix);
    }

    // Hits, misses and the number of live stores are tracked per lookup.
    #[tokio::test]
    async fn test_stats_hit_miss_tracking() {
        use crate::object_store::StorageOptionsAccessor;

        let registry = ObjectStoreRegistry::default();
        let url = Url::parse("memory://test").unwrap();

        let default_params = ObjectStoreParams::default();
        let params_with_options = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([("k".into(), "v".into())]),
            ))),
            ..Default::default()
        };

        // (params, expected hits, expected misses, expected active stores)
        let expectations = [
            (&default_params, 0u64, 1u64, 1usize),      // miss: new params
            (&default_params, 1u64, 1u64, 1usize),      // hit: same params
            (&params_with_options, 1u64, 2u64, 2usize), // miss: different storage_options
        ];

        // Keep every returned store alive so the weak cache entries stay valid.
        let mut held = Vec::with_capacity(expectations.len());
        for (params, hits, misses, active) in expectations {
            let store = registry.get_store(url.clone(), params).await.unwrap();
            held.push(store);

            let stats = registry.stats();
            assert_eq!(
                (stats.hits, stats.misses, stats.active_stores),
                (hits, misses, active)
            );
        }

        // Same params returns same instance
        assert!(Arc::ptr_eq(&held[0], &held[1]));
    }
}