lance_io/object_store/providers.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::HashMap,
6    sync::{Arc, RwLock, Weak},
7};
8
9use object_store::path::Path;
10use snafu::location;
11use url::Url;
12
13use super::{tracing::ObjectStoreTracingExt, ObjectStore, ObjectStoreParams};
14use lance_core::error::{Error, LanceOptionExt, Result};
15
16#[cfg(feature = "aws")]
17pub mod aws;
18#[cfg(feature = "azure")]
19pub mod azure;
20#[cfg(feature = "gcp")]
21pub mod gcp;
22pub mod local;
23pub mod memory;
24#[cfg(feature = "oss")]
25pub mod oss;
26
27#[async_trait::async_trait]
28pub trait ObjectStoreProvider: std::fmt::Debug + Sync + Send {
29    async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore>;
30
31    /// Extract the path relative to the base of the store.
32    ///
33    /// For example, in S3 the path is relative to the bucket. So a URL of
34    /// `s3://bucket/path/to/file` would return `path/to/file`.
35    ///
36    /// Meanwhile, for a file store, the path is relative to the filesystem root.
37    /// So a URL of `file:///path/to/file` would return `/path/to/file`.
38    fn extract_path(&self, url: &Url) -> Result<Path> {
39        Path::parse(url.path()).map_err(|_| {
40            Error::invalid_input(format!("Invalid path in URL: {}", url.path()), location!())
41        })
42    }
43}
44
45/// A registry of object store providers.
46///
47/// Use [`Self::default()`] to create one with the available default providers.
48/// This includes (depending on features enabled):
49/// - `memory`: An in-memory object store.
50/// - `file`: A local file object store, with optimized code paths.
51/// - `file-object-store`: A local file object store that uses the ObjectStore API,
52///   for all operations. Used for testing with ObjectStore wrappers.
53/// - `s3`: An S3 object store.
54/// - `s3+ddb`: An S3 object store with DynamoDB for metadata.
55/// - `az`: An Azure Blob Storage object store.
56/// - `gs`: A Google Cloud Storage object store.
57///
58/// Use [`Self::empty()`] to create an empty registry, with no providers registered.
59///
60/// The registry also caches object stores that are currently in use. It holds
61/// weak references to the object stores, so they are not held onto. If an object
62/// store is no longer in use, it will be removed from the cache on the next
63/// call to either [`Self::active_stores()`] or [`Self::get_store()`].
64#[derive(Debug)]
65pub struct ObjectStoreRegistry {
66    providers: RwLock<HashMap<String, Arc<dyn ObjectStoreProvider>>>,
67    // Cache of object stores currently in use. We use a weak reference so the
68    // cache itself doesn't keep them alive if no object store is actually using
69    // it.
70    active_stores: RwLock<HashMap<(String, ObjectStoreParams), Weak<ObjectStore>>>,
71}
72
73/// Convert a URL to a cache key.
74///
75/// We truncate to the first path segment. This should capture
76/// buckets and prefixes. We keep URL params since those might be
77/// important.
78///
79/// * s3://bucket/path?param=value -> s3://bucket/path?param=value
80/// * file:///path/to/file -> file:///
81fn cache_url(url: &Url) -> String {
82    if ["file", "file-object-store", "memory"].contains(&url.scheme()) {
83        // For file URLs, we want to cache the URL without the path.
84        // This is because the path can be different for different
85        // object stores, but we want to cache the object store itself.
86        format!("{}://", url.scheme())
87    } else {
88        // Bucket is parsed as domain, so we just drop the path.
89        let mut url = url.clone();
90        url.set_path("");
91        url.to_string()
92    }
93}
94
95impl ObjectStoreRegistry {
96    /// Create a new registry with no providers registered.
97    ///
98    /// Typically, you want to use [`Self::default()`] instead, so you get the
99    /// default providers.
100    pub fn empty() -> Self {
101        Self {
102            providers: RwLock::new(HashMap::new()),
103            active_stores: RwLock::new(HashMap::new()),
104        }
105    }
106
107    /// Get the object store provider for a given scheme.
108    pub fn get_provider(&self, scheme: &str) -> Option<Arc<dyn ObjectStoreProvider>> {
109        self.providers
110            .read()
111            .expect("ObjectStoreRegistry lock poisoned")
112            .get(scheme)
113            .cloned()
114    }
115
116    /// Get a list of all active object stores.
117    ///
118    /// Calling this will also clean up any weak references to object stores that
119    /// are no longer valid.
120    pub fn active_stores(&self) -> Vec<Arc<ObjectStore>> {
121        let mut found_inactive = false;
122        let output = self
123            .active_stores
124            .read()
125            .expect("ObjectStoreRegistry lock poisoned")
126            .values()
127            .filter_map(|weak| match weak.upgrade() {
128                Some(store) => Some(store),
129                None => {
130                    found_inactive = true;
131                    None
132                }
133            })
134            .collect();
135
136        if found_inactive {
137            // Clean up the cache by removing any weak references that are no longer valid
138            let mut cache_lock = self
139                .active_stores
140                .write()
141                .expect("ObjectStoreRegistry lock poisoned");
142            cache_lock.retain(|_, weak| weak.upgrade().is_some());
143        }
144        output
145    }
146
147    /// Get an object store for a given base path and parameters.
148    ///
149    /// If the object store is already in use, it will return a strong reference
150    /// to the object store. If the object store is not in use, it will create a
151    /// new object store and return a strong reference to it.
152    pub async fn get_store(
153        &self,
154        base_path: Url,
155        params: &ObjectStoreParams,
156    ) -> Result<Arc<ObjectStore>> {
157        let cache_path = cache_url(&base_path);
158        let cache_key = (cache_path, params.clone());
159
160        // Check if we have a cached store for this base path and params
161        {
162            let maybe_store = self
163                .active_stores
164                .read()
165                .ok()
166                .expect_ok()?
167                .get(&cache_key)
168                .cloned();
169            if let Some(store) = maybe_store {
170                if let Some(store) = store.upgrade() {
171                    return Ok(store);
172                } else {
173                    // Remove the weak reference if it is no longer valid
174                    let mut cache_lock = self
175                        .active_stores
176                        .write()
177                        .expect("ObjectStoreRegistry lock poisoned");
178                    if let Some(store) = cache_lock.get(&cache_key) {
179                        if store.upgrade().is_none() {
180                            // Remove the weak reference if it is no longer valid
181                            cache_lock.remove(&cache_key);
182                        }
183                    }
184                }
185            }
186        }
187
188        let scheme = base_path.scheme();
189        let Some(provider) = self.get_provider(scheme) else {
190            let mut message = format!("No object store provider found for scheme: '{}'", scheme);
191            if let Ok(providers) = self.providers.read() {
192                let valid_schemes = providers.keys().cloned().collect::<Vec<_>>().join(", ");
193                message.push_str(&format!("\nValid schemes: {}", valid_schemes));
194            }
195
196            return Err(Error::invalid_input(message, location!()));
197        };
198        let mut store = provider.new_store(base_path, params).await?;
199
200        store.inner = store.inner.traced();
201
202        if let Some(wrapper) = &params.object_store_wrapper {
203            store.inner = wrapper.wrap(store.inner);
204        }
205
206        let store = Arc::new(store);
207
208        {
209            // Insert the store into the cache
210            let mut cache_lock = self.active_stores.write().ok().expect_ok()?;
211            cache_lock.insert(cache_key, Arc::downgrade(&store));
212        }
213
214        Ok(store)
215    }
216}
217
218impl Default for ObjectStoreRegistry {
219    fn default() -> Self {
220        let mut providers: HashMap<String, Arc<dyn ObjectStoreProvider>> = HashMap::new();
221
222        providers.insert("memory".into(), Arc::new(memory::MemoryStoreProvider));
223        providers.insert("file".into(), Arc::new(local::FileStoreProvider));
224        // The "file" scheme has special optimized code paths that bypass
225        // the ObjectStore API for better performance. However, this can make it
226        // hard to test when using ObjectStore wrappers, such as IOTrackingStore.
227        // So we provide a "file-object-store" scheme that uses the ObjectStore API.
228        // The specialized code paths are differentiated by the scheme name.
229        providers.insert(
230            "file-object-store".into(),
231            Arc::new(local::FileStoreProvider),
232        );
233
234        #[cfg(feature = "aws")]
235        {
236            let aws = Arc::new(aws::AwsStoreProvider);
237            providers.insert("s3".into(), aws.clone());
238            providers.insert("s3+ddb".into(), aws);
239        }
240        #[cfg(feature = "azure")]
241        providers.insert("az".into(), Arc::new(azure::AzureBlobStoreProvider));
242        #[cfg(feature = "gcp")]
243        providers.insert("gs".into(), Arc::new(gcp::GcsStoreProvider));
244        #[cfg(feature = "oss")]
245        providers.insert("oss".into(), Arc::new(oss::OssStoreProvider));
246        Self {
247            providers: RwLock::new(providers),
248            active_stores: RwLock::new(HashMap::new()),
249        }
250    }
251}
252
253impl ObjectStoreRegistry {
254    /// Add a new object store provider to the registry. The provider will be used
255    /// in [`Self::get_store()`] when a URL is passed with a matching scheme.
256    pub fn insert(&self, scheme: &str, provider: Arc<dyn ObjectStoreProvider>) {
257        self.providers
258            .write()
259            .expect("ObjectStoreRegistry lock poisoned")
260            .insert(scheme.into(), provider);
261    }
262}
263
#[cfg(test)]
mod tests {
    use super::*;

    /// Each input URL maps to the expected cache key.
    #[test]
    fn test_cache_url() {
        let expectations = [
            ("s3://bucket/path?param=value", "s3://bucket?param=value"),
            ("file:///path/to/file", "file://"),
            ("file-object-store:///path/to/file", "file-object-store://"),
            ("memory:///", "memory://"),
            (
                "http://example.com/path?param=value",
                "http://example.com/?param=value",
            ),
        ];

        for (input, expected) in expectations {
            let parsed = Url::parse(input).unwrap();
            assert_eq!(cache_url(&parsed), expected, "cache key for {input}");
        }
    }
}