1use std::{
5 collections::HashMap,
6 sync::{
7 Arc, RwLock, Weak,
8 atomic::{AtomicU64, Ordering},
9 },
10};
11
12use object_store::path::Path;
13use url::Url;
14
15use crate::object_store::WrappingObjectStore;
16use crate::object_store::uri_to_url;
17
18use super::{ObjectStore, ObjectStoreParams, tracing::ObjectStoreTracingExt};
19use lance_core::error::{Error, LanceOptionExt, Result};
20
21#[cfg(feature = "aws")]
22pub mod aws;
23#[cfg(feature = "azure")]
24pub mod azure;
25#[cfg(feature = "gcp")]
26pub mod gcp;
27#[cfg(feature = "huggingface")]
28pub mod huggingface;
29pub mod local;
30pub mod memory;
31#[cfg(feature = "oss")]
32pub mod oss;
33pub mod shared_memory;
34#[cfg(feature = "tencent")]
35pub mod tencent;
36
37#[async_trait::async_trait]
38pub trait ObjectStoreProvider: std::fmt::Debug + Sync + Send {
39 async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore>;
40
41 fn extract_path(&self, url: &Url) -> Result<Path> {
49 Path::parse(url.path())
50 .map_err(|_| Error::invalid_input(format!("Invalid path in URL: {}", url.path())))
51 }
52
53 fn calculate_object_store_prefix(
66 &self,
67 url: &Url,
68 _storage_options: Option<&HashMap<String, String>>,
69 ) -> Result<String> {
70 Ok(format!("{}${}", url.scheme(), url.authority()))
71 }
72}
73
/// Point-in-time snapshot of an [`ObjectStoreRegistry`]'s cache counters, as
/// returned by [`ObjectStoreRegistry::stats`].
#[derive(Debug, Clone, Default)]
pub struct ObjectStoreRegistryStats {
    /// Number of `get_store` calls answered with a still-live cached store.
    pub hits: u64,
    /// Number of `get_store` calls that found no live cached store. This is
    /// counted before the provider builds the store, so failed builds count too.
    pub misses: u64,
    /// Number of cache entries whose store still has at least one strong reference.
    pub active_stores: usize,
}
84
/// Registry of [`ObjectStoreProvider`]s keyed by URL scheme, plus a cache of the
/// stores they have created.
///
/// Created stores are cached as [`Weak`] references. A cached store is reused
/// only while some caller still holds an [`Arc`] to it. Once the last `Arc` is
/// dropped, the next request builds a fresh store.
#[derive(Debug)]
pub struct ObjectStoreRegistry {
    // Providers keyed by URL scheme (e.g. "file", "memory").
    providers: RwLock<HashMap<String, Arc<dyn ObjectStoreProvider>>>,
    // Store cache keyed by (provider-computed object store prefix, params).
    active_stores: RwLock<HashMap<(String, ObjectStoreParams), Weak<ObjectStore>>>,
    // Number of `get_store` cache hits.
    hits: AtomicU64,
    // Number of `get_store` cache misses.
    misses: AtomicU64,
}
116
117impl ObjectStoreRegistry {
118 pub fn empty() -> Self {
123 Self {
124 providers: RwLock::new(HashMap::new()),
125 active_stores: RwLock::new(HashMap::new()),
126 hits: AtomicU64::new(0),
127 misses: AtomicU64::new(0),
128 }
129 }
130
131 pub fn get_provider(&self, scheme: &str) -> Option<Arc<dyn ObjectStoreProvider>> {
133 self.providers
134 .read()
135 .expect("ObjectStoreRegistry lock poisoned")
136 .get(scheme)
137 .cloned()
138 }
139
140 pub fn active_stores(&self) -> Vec<Arc<ObjectStore>> {
145 let mut found_inactive = false;
146 let output = self
147 .active_stores
148 .read()
149 .expect("ObjectStoreRegistry lock poisoned")
150 .values()
151 .filter_map(|weak| match weak.upgrade() {
152 Some(store) => Some(store),
153 None => {
154 found_inactive = true;
155 None
156 }
157 })
158 .collect();
159
160 if found_inactive {
161 let mut cache_lock = self
163 .active_stores
164 .write()
165 .expect("ObjectStoreRegistry lock poisoned");
166 cache_lock.retain(|_, weak| weak.upgrade().is_some());
167 }
168 output
169 }
170
171 pub fn stats(&self) -> ObjectStoreRegistryStats {
177 let active_stores = self
178 .active_stores
179 .read()
180 .map(|s| s.values().filter(|w| w.strong_count() > 0).count())
181 .unwrap_or(0);
182 ObjectStoreRegistryStats {
183 hits: self.hits.load(Ordering::Relaxed),
184 misses: self.misses.load(Ordering::Relaxed),
185 active_stores,
186 }
187 }
188
189 fn scheme_not_found_error(&self, scheme: &str) -> Error {
190 let mut message = format!("No object store provider found for scheme: '{}'", scheme);
191 if let Ok(providers) = self.providers.read() {
192 let valid_schemes = providers.keys().cloned().collect::<Vec<_>>().join(", ");
193 message.push_str(&format!("\nValid schemes: {}", valid_schemes));
194 }
195 Error::invalid_input(message)
196 }
197
198 pub async fn get_store(
204 &self,
205 base_path: Url,
206 params: &ObjectStoreParams,
207 ) -> Result<Arc<ObjectStore>> {
208 let scheme = base_path.scheme();
209 let Some(provider) = self.get_provider(scheme) else {
210 return Err(self.scheme_not_found_error(scheme));
211 };
212
213 let cache_path =
214 provider.calculate_object_store_prefix(&base_path, params.storage_options())?;
215 let cache_key = (cache_path.clone(), params.clone());
216
217 {
219 let maybe_store = self
220 .active_stores
221 .read()
222 .ok()
223 .expect_ok()?
224 .get(&cache_key)
225 .cloned();
226 if let Some(store) = maybe_store {
227 if let Some(store) = store.upgrade() {
228 self.hits.fetch_add(1, Ordering::Relaxed);
229 return Ok(store);
230 } else {
231 let mut cache_lock = self
233 .active_stores
234 .write()
235 .expect("ObjectStoreRegistry lock poisoned");
236 if let Some(store) = cache_lock.get(&cache_key)
237 && store.upgrade().is_none()
238 {
239 cache_lock.remove(&cache_key);
241 }
242 }
243 }
244 }
245
246 self.misses.fetch_add(1, Ordering::Relaxed);
247
248 let mut store = provider.new_store(base_path, params).await?;
249
250 store.inner = store.inner.traced();
251
252 if let Some(wrapper) = ¶ms.object_store_wrapper {
253 store.inner = wrapper.wrap(&cache_path, store.inner);
254 }
255
256 store.inner = store.io_tracker.wrap("", store.inner);
258
259 let store = Arc::new(store);
260
261 {
262 let mut cache_lock = self.active_stores.write().ok().expect_ok()?;
264 cache_lock.insert(cache_key, Arc::downgrade(&store));
265 }
266
267 Ok(store)
268 }
269
270 pub fn calculate_object_store_prefix(
273 &self,
274 uri: &str,
275 storage_options: Option<&HashMap<String, String>>,
276 ) -> Result<String> {
277 let url = uri_to_url(uri)?;
278 match self.get_provider(url.scheme()) {
279 None => {
280 if url.scheme() == "file" || url.scheme().len() == 1 {
281 Ok("file".to_string())
282 } else {
283 Err(self.scheme_not_found_error(url.scheme()))
284 }
285 }
286 Some(provider) => provider.calculate_object_store_prefix(&url, storage_options),
287 }
288 }
289}
290
291impl Default for ObjectStoreRegistry {
292 fn default() -> Self {
293 let mut providers: HashMap<String, Arc<dyn ObjectStoreProvider>> = HashMap::new();
294
295 providers.insert("memory".into(), Arc::new(memory::MemoryStoreProvider));
296 providers.insert(
297 "shared-memory".into(),
298 Arc::new(shared_memory::SharedMemoryStoreProvider::default()),
299 );
300 providers.insert("file".into(), Arc::new(local::FileStoreProvider));
301 providers.insert(
307 "file-object-store".into(),
308 Arc::new(local::FileStoreProvider),
309 );
310 #[cfg(target_os = "linux")]
311 providers.insert("file+uring".into(), Arc::new(local::FileStoreProvider));
312
313 #[cfg(feature = "aws")]
314 {
315 let aws = Arc::new(aws::AwsStoreProvider);
316 providers.insert("s3".into(), aws.clone());
317 providers.insert("s3+ddb".into(), aws);
318 }
319 #[cfg(feature = "azure")]
320 {
321 let azure = Arc::new(azure::AzureBlobStoreProvider);
322 providers.insert("az".into(), azure.clone());
323 providers.insert("abfss".into(), azure);
324 }
325 #[cfg(feature = "gcp")]
326 providers.insert("gs".into(), Arc::new(gcp::GcsStoreProvider));
327 #[cfg(feature = "oss")]
328 providers.insert("oss".into(), Arc::new(oss::OssStoreProvider));
329 #[cfg(feature = "tencent")]
330 providers.insert("cos".into(), Arc::new(tencent::TencentStoreProvider));
331 #[cfg(feature = "huggingface")]
332 providers.insert("hf".into(), Arc::new(huggingface::HuggingfaceStoreProvider));
333 Self {
334 providers: RwLock::new(providers),
335 active_stores: RwLock::new(HashMap::new()),
336 hits: AtomicU64::new(0),
337 misses: AtomicU64::new(0),
338 }
339 }
340}
341
342impl ObjectStoreRegistry {
343 pub fn insert(&self, scheme: &str, provider: Arc<dyn ObjectStoreProvider>) {
346 self.providers
347 .write()
348 .expect("ObjectStoreRegistry lock poisoned")
349 .insert(scheme.into(), provider);
350 }
351}
352
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;

    #[derive(Debug)]
    struct DummyProvider;

    #[async_trait::async_trait]
    impl ObjectStoreProvider for DummyProvider {
        async fn new_store(
            &self,
            _base_path: Url,
            _params: &ObjectStoreParams,
        ) -> Result<ObjectStore> {
            unreachable!("This test doesn't create stores")
        }
    }

    #[test]
    fn test_calculate_object_store_prefix() {
        let provider = DummyProvider;
        let url = Url::parse("dummy://blah/path").unwrap();
        assert_eq!(
            "dummy$blah",
            provider.calculate_object_store_prefix(&url, None).unwrap()
        );
    }

    #[test]
    fn test_calculate_object_store_scheme_not_found() {
        let registry = ObjectStoreRegistry::empty();
        registry.insert("dummy", Arc::new(DummyProvider));
        let expected_prefix = "Invalid user input: No object store provider found for scheme: 'dummy2'\nValid schemes: dummy";
        let result = registry
            .calculate_object_store_prefix("dummy2://mybucket/my/long/path", None)
            .expect_err("expected error")
            .to_string();
        // `starts_with` instead of slicing: a shorter message (or one where the slice
        // would not land on a char boundary) fails the assertion rather than panicking.
        assert!(
            result.starts_with(expected_prefix),
            "unexpected error message: {result}"
        );
    }

    #[test]
    fn test_calculate_object_store_prefix_for_local() {
        let registry = ObjectStoreRegistry::empty();
        assert_eq!(
            "file",
            registry
                .calculate_object_store_prefix("/tmp/foobar", None)
                .unwrap()
        );
    }

    #[test]
    fn test_calculate_object_store_prefix_for_local_windows_path() {
        let registry = ObjectStoreRegistry::empty();
        assert_eq!(
            "file",
            registry
                .calculate_object_store_prefix("c://dos/path", None)
                .unwrap()
        );
    }

    #[test]
    fn test_calculate_object_store_prefix_for_dummy_path() {
        let registry = ObjectStoreRegistry::empty();
        registry.insert("dummy", Arc::new(DummyProvider));
        assert_eq!(
            "dummy$mybucket",
            registry
                .calculate_object_store_prefix("dummy://mybucket/my/long/path", None)
                .unwrap()
        );
    }

    #[tokio::test]
    async fn test_stats_hit_miss_tracking() {
        use crate::object_store::StorageOptionsAccessor;
        let registry = ObjectStoreRegistry::default();
        let url = Url::parse("memory://test").unwrap();

        let params1 = ObjectStoreParams::default();
        let params2 = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([("k".into(), "v".into())]),
            ))),
            ..Default::default()
        };

        // Each case: params to request, then expected (hits, misses, active_stores).
        let cases: &[(&ObjectStoreParams, (u64, u64, usize))] = &[
            (&params1, (0, 1, 1)), // first request: miss, creates a store
            (&params1, (1, 1, 1)), // same params: hit, reuses it
            (&params2, (1, 2, 2)), // different params: miss, second store
        ];

        // Hold every returned store so the registry's weak entries stay alive.
        let mut stores = vec![];
        for (params, (hits, misses, active)) in cases {
            stores.push(registry.get_store(url.clone(), params).await.unwrap());
            let s = registry.stats();
            assert_eq!(
                (s.hits, s.misses, s.active_stores),
                (*hits, *misses, *active)
            );
        }

        assert!(Arc::ptr_eq(&stores[0], &stores[1]));
    }

    #[tokio::test]
    async fn test_dropped_store_is_recreated() {
        let registry = ObjectStoreRegistry::default();
        let url = Url::parse("memory://test").unwrap();
        let params = ObjectStoreParams::default();

        let store = registry.get_store(url.clone(), &params).await.unwrap();
        assert_eq!(registry.active_stores().len(), 1);

        // Dropping the only strong reference makes the cache entry dead; listing the
        // active stores prunes it.
        drop(store);
        assert!(registry.active_stores().is_empty());

        let _store = registry.get_store(url, &params).await.unwrap();
        let s = registry.stats();
        assert_eq!((s.hits, s.misses, s.active_stores), (0, 2, 1));
    }
}