lance_io/object_store/
providers.rs1use std::{
5 collections::HashMap,
6 sync::{Arc, RwLock, Weak},
7};
8
9use object_store::path::Path;
10use snafu::location;
11use url::Url;
12
13use crate::object_store::uri_to_url;
14use crate::object_store::WrappingObjectStore;
15
16use super::{tracing::ObjectStoreTracingExt, ObjectStore, ObjectStoreParams};
17use lance_core::error::{Error, LanceOptionExt, Result};
18
19#[cfg(feature = "aws")]
20pub mod aws;
21#[cfg(feature = "azure")]
22pub mod azure;
23#[cfg(feature = "gcp")]
24pub mod gcp;
25#[cfg(feature = "huggingface")]
26pub mod huggingface;
27pub mod local;
28pub mod memory;
29#[cfg(feature = "oss")]
30pub mod oss;
31
32#[async_trait::async_trait]
33pub trait ObjectStoreProvider: std::fmt::Debug + Sync + Send {
34 async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore>;
35
36 fn extract_path(&self, url: &Url) -> Result<Path> {
44 Path::parse(url.path()).map_err(|_| {
45 Error::invalid_input(format!("Invalid path in URL: {}", url.path()), location!())
46 })
47 }
48
49 fn calculate_object_store_prefix(
62 &self,
63 url: &Url,
64 _storage_options: Option<&HashMap<String, String>>,
65 ) -> Result<String> {
66 Ok(format!("{}${}", url.scheme(), url.authority()))
67 }
68}
69
70#[derive(Debug)]
90pub struct ObjectStoreRegistry {
91 providers: RwLock<HashMap<String, Arc<dyn ObjectStoreProvider>>>,
92 active_stores: RwLock<HashMap<(String, ObjectStoreParams), Weak<ObjectStore>>>,
96}
97
98impl ObjectStoreRegistry {
99 pub fn empty() -> Self {
104 Self {
105 providers: RwLock::new(HashMap::new()),
106 active_stores: RwLock::new(HashMap::new()),
107 }
108 }
109
110 pub fn get_provider(&self, scheme: &str) -> Option<Arc<dyn ObjectStoreProvider>> {
112 self.providers
113 .read()
114 .expect("ObjectStoreRegistry lock poisoned")
115 .get(scheme)
116 .cloned()
117 }
118
119 pub fn active_stores(&self) -> Vec<Arc<ObjectStore>> {
124 let mut found_inactive = false;
125 let output = self
126 .active_stores
127 .read()
128 .expect("ObjectStoreRegistry lock poisoned")
129 .values()
130 .filter_map(|weak| match weak.upgrade() {
131 Some(store) => Some(store),
132 None => {
133 found_inactive = true;
134 None
135 }
136 })
137 .collect();
138
139 if found_inactive {
140 let mut cache_lock = self
142 .active_stores
143 .write()
144 .expect("ObjectStoreRegistry lock poisoned");
145 cache_lock.retain(|_, weak| weak.upgrade().is_some());
146 }
147 output
148 }
149
150 fn scheme_not_found_error(&self, scheme: &str) -> Error {
151 let mut message = format!("No object store provider found for scheme: '{}'", scheme);
152 if let Ok(providers) = self.providers.read() {
153 let valid_schemes = providers.keys().cloned().collect::<Vec<_>>().join(", ");
154 message.push_str(&format!("\nValid schemes: {}", valid_schemes));
155 }
156 Error::invalid_input(message, location!())
157 }
158
159 pub async fn get_store(
165 &self,
166 base_path: Url,
167 params: &ObjectStoreParams,
168 ) -> Result<Arc<ObjectStore>> {
169 let scheme = base_path.scheme();
170 let Some(provider) = self.get_provider(scheme) else {
171 return Err(self.scheme_not_found_error(scheme));
172 };
173
174 let cache_path =
175 provider.calculate_object_store_prefix(&base_path, params.storage_options.as_ref())?;
176 let cache_key = (cache_path.clone(), params.clone());
177
178 {
180 let maybe_store = self
181 .active_stores
182 .read()
183 .ok()
184 .expect_ok()?
185 .get(&cache_key)
186 .cloned();
187 if let Some(store) = maybe_store {
188 if let Some(store) = store.upgrade() {
189 return Ok(store);
190 } else {
191 let mut cache_lock = self
193 .active_stores
194 .write()
195 .expect("ObjectStoreRegistry lock poisoned");
196 if let Some(store) = cache_lock.get(&cache_key) {
197 if store.upgrade().is_none() {
198 cache_lock.remove(&cache_key);
200 }
201 }
202 }
203 }
204 }
205
206 let mut store = provider.new_store(base_path, params).await?;
207
208 store.inner = store.inner.traced();
209
210 if let Some(wrapper) = ¶ms.object_store_wrapper {
211 store.inner = wrapper.wrap(&cache_path, store.inner);
212 }
213
214 store.inner = store.io_tracker.wrap("", store.inner);
216
217 let store = Arc::new(store);
218
219 {
220 let mut cache_lock = self.active_stores.write().ok().expect_ok()?;
222 cache_lock.insert(cache_key, Arc::downgrade(&store));
223 }
224
225 Ok(store)
226 }
227
228 pub fn calculate_object_store_prefix(
231 &self,
232 uri: &str,
233 storage_options: Option<&HashMap<String, String>>,
234 ) -> Result<String> {
235 let url = uri_to_url(uri)?;
236 match self.get_provider(url.scheme()) {
237 None => {
238 if url.scheme() == "file" || url.scheme().len() == 1 {
239 Ok("file".to_string())
240 } else {
241 Err(self.scheme_not_found_error(url.scheme()))
242 }
243 }
244 Some(provider) => provider.calculate_object_store_prefix(&url, storage_options),
245 }
246 }
247}
248
249impl Default for ObjectStoreRegistry {
250 fn default() -> Self {
251 let mut providers: HashMap<String, Arc<dyn ObjectStoreProvider>> = HashMap::new();
252
253 providers.insert("memory".into(), Arc::new(memory::MemoryStoreProvider));
254 providers.insert("file".into(), Arc::new(local::FileStoreProvider));
255 providers.insert(
261 "file-object-store".into(),
262 Arc::new(local::FileStoreProvider),
263 );
264
265 #[cfg(feature = "aws")]
266 {
267 let aws = Arc::new(aws::AwsStoreProvider);
268 providers.insert("s3".into(), aws.clone());
269 providers.insert("s3+ddb".into(), aws);
270 }
271 #[cfg(feature = "azure")]
272 providers.insert("az".into(), Arc::new(azure::AzureBlobStoreProvider));
273 #[cfg(feature = "gcp")]
274 providers.insert("gs".into(), Arc::new(gcp::GcsStoreProvider));
275 #[cfg(feature = "oss")]
276 providers.insert("oss".into(), Arc::new(oss::OssStoreProvider));
277 #[cfg(feature = "huggingface")]
278 providers.insert("hf".into(), Arc::new(huggingface::HuggingfaceStoreProvider));
279 Self {
280 providers: RwLock::new(providers),
281 active_stores: RwLock::new(HashMap::new()),
282 }
283 }
284}
285
286impl ObjectStoreRegistry {
287 pub fn insert(&self, scheme: &str, provider: Arc<dyn ObjectStoreProvider>) {
290 self.providers
291 .write()
292 .expect("ObjectStoreRegistry lock poisoned")
293 .insert(scheme.into(), provider);
294 }
295}
296
#[cfg(test)]
mod tests {
    use super::*;

    /// Provider that relies entirely on the trait's default methods; it never
    /// builds a store.
    #[derive(Debug)]
    struct DummyProvider;

    #[async_trait::async_trait]
    impl ObjectStoreProvider for DummyProvider {
        async fn new_store(
            &self,
            _base_path: Url,
            _params: &ObjectStoreParams,
        ) -> Result<ObjectStore> {
            unreachable!("This test doesn't create stores")
        }
    }

    #[test]
    fn test_calculate_object_store_prefix() {
        let provider = DummyProvider;
        let url = Url::parse("dummy://blah/path").unwrap();
        assert_eq!(
            "dummy$blah",
            provider.calculate_object_store_prefix(&url, None).unwrap()
        );
    }

    #[test]
    fn test_calculate_object_store_scheme_not_found() {
        let registry = ObjectStoreRegistry::empty();
        registry.insert("dummy", Arc::new(DummyProvider));
        let expected_prefix = "Invalid user input: No object store provider found for scheme: 'dummy2'\nValid schemes: dummy";
        let message = registry
            .calculate_object_store_prefix("dummy2://mybucket/my/long/path", None)
            .expect_err("expected error")
            .to_string();
        // Only the leading part is checked because the rendered error may append
        // further text. `starts_with` also avoids a slicing panic (and reports
        // the actual message) when the text is shorter than expected.
        assert!(
            message.starts_with(expected_prefix),
            "unexpected error message: {message}"
        );
    }

    #[test]
    fn test_calculate_object_store_prefix_for_local() {
        // A plain local path has no registered provider but maps to "file".
        let registry = ObjectStoreRegistry::empty();
        assert_eq!(
            "file",
            registry
                .calculate_object_store_prefix("/tmp/foobar", None)
                .unwrap()
        );
    }

    #[test]
    fn test_calculate_object_store_prefix_for_local_windows_path() {
        // A single-letter scheme (drive letter) is treated as a local path.
        let registry = ObjectStoreRegistry::empty();
        assert_eq!(
            "file",
            registry
                .calculate_object_store_prefix("c://dos/path", None)
                .unwrap()
        );
    }

    #[test]
    fn test_calculate_object_store_prefix_for_dummy_path() {
        let registry = ObjectStoreRegistry::empty();
        registry.insert("dummy", Arc::new(DummyProvider));
        assert_eq!(
            "dummy$mybucket",
            registry
                .calculate_object_store_prefix("dummy://mybucket/my/long/path", None)
                .unwrap()
        );
    }
}