1use std::{
5 collections::HashMap,
6 sync::{
7 Arc, RwLock, Weak,
8 atomic::{AtomicU64, Ordering},
9 },
10};
11
12use object_store::path::Path;
13use url::Url;
14
15use crate::object_store::WrappingObjectStore;
16use crate::object_store::uri_to_url;
17
18use super::{ObjectStore, ObjectStoreParams, tracing::ObjectStoreTracingExt};
19use lance_core::error::{Error, LanceOptionExt, Result};
20
21#[cfg(feature = "aws")]
22pub mod aws;
23#[cfg(feature = "azure")]
24pub mod azure;
25#[cfg(feature = "gcp")]
26pub mod gcp;
27#[cfg(feature = "goosefs")]
28pub mod goosefs;
29#[cfg(feature = "huggingface")]
30pub mod huggingface;
31pub mod local;
32pub mod memory;
33#[cfg(feature = "oss")]
34pub mod oss;
35pub mod shared_memory;
36#[cfg(feature = "tencent")]
37pub mod tencent;
38#[cfg(feature = "tos")]
39pub mod tos;
40
41#[async_trait::async_trait]
42pub trait ObjectStoreProvider: std::fmt::Debug + Sync + Send {
43 async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore>;
44
45 fn extract_path(&self, url: &Url) -> Result<Path> {
53 Path::parse(url.path())
54 .map_err(|_| Error::invalid_input(format!("Invalid path in URL: {}", url.path())))
55 }
56
57 fn calculate_object_store_prefix(
70 &self,
71 url: &Url,
72 _storage_options: Option<&HashMap<String, String>>,
73 ) -> Result<String> {
74 Ok(format!("{}${}", url.scheme(), url.authority()))
75 }
76}
77
/// A point-in-time snapshot of an object store registry's cache counters.
#[derive(Debug, Clone, Default)]
pub struct ObjectStoreRegistryStats {
    /// Number of store requests that were served from the cache.
    pub hits: u64,
    /// Number of store requests that required building a new store.
    pub misses: u64,
    /// Number of cached stores that still had at least one strong reference
    /// when the snapshot was taken.
    pub active_stores: usize,
}
88
89#[derive(Debug)]
111pub struct ObjectStoreRegistry {
112 providers: RwLock<HashMap<String, Arc<dyn ObjectStoreProvider>>>,
113 active_stores: RwLock<HashMap<(String, ObjectStoreParams), Weak<ObjectStore>>>,
117 hits: AtomicU64,
119 misses: AtomicU64,
120}
121
122impl ObjectStoreRegistry {
123 pub fn empty() -> Self {
128 Self {
129 providers: RwLock::new(HashMap::new()),
130 active_stores: RwLock::new(HashMap::new()),
131 hits: AtomicU64::new(0),
132 misses: AtomicU64::new(0),
133 }
134 }
135
136 pub fn get_provider(&self, scheme: &str) -> Option<Arc<dyn ObjectStoreProvider>> {
138 self.providers
139 .read()
140 .expect("ObjectStoreRegistry lock poisoned")
141 .get(scheme)
142 .cloned()
143 }
144
145 pub fn active_stores(&self) -> Vec<Arc<ObjectStore>> {
150 let mut found_inactive = false;
151 let output = self
152 .active_stores
153 .read()
154 .expect("ObjectStoreRegistry lock poisoned")
155 .values()
156 .filter_map(|weak| match weak.upgrade() {
157 Some(store) => Some(store),
158 None => {
159 found_inactive = true;
160 None
161 }
162 })
163 .collect();
164
165 if found_inactive {
166 let mut cache_lock = self
168 .active_stores
169 .write()
170 .expect("ObjectStoreRegistry lock poisoned");
171 cache_lock.retain(|_, weak| weak.upgrade().is_some());
172 }
173 output
174 }
175
176 pub fn stats(&self) -> ObjectStoreRegistryStats {
182 let active_stores = self
183 .active_stores
184 .read()
185 .map(|s| s.values().filter(|w| w.strong_count() > 0).count())
186 .unwrap_or(0);
187 ObjectStoreRegistryStats {
188 hits: self.hits.load(Ordering::Relaxed),
189 misses: self.misses.load(Ordering::Relaxed),
190 active_stores,
191 }
192 }
193
194 fn scheme_not_found_error(&self, scheme: &str) -> Error {
195 let mut message = format!("No object store provider found for scheme: '{}'", scheme);
196 if let Ok(providers) = self.providers.read() {
197 let valid_schemes = providers.keys().cloned().collect::<Vec<_>>().join(", ");
198 message.push_str(&format!("\nValid schemes: {}", valid_schemes));
199 }
200 Error::invalid_input(message)
201 }
202
203 pub async fn get_store(
209 &self,
210 base_path: Url,
211 params: &ObjectStoreParams,
212 ) -> Result<Arc<ObjectStore>> {
213 let scheme = base_path.scheme();
214 let Some(provider) = self.get_provider(scheme) else {
215 return Err(self.scheme_not_found_error(scheme));
216 };
217
218 let cache_path =
219 provider.calculate_object_store_prefix(&base_path, params.storage_options())?;
220 let cache_key = (cache_path.clone(), params.clone());
221
222 {
224 let maybe_store = self
225 .active_stores
226 .read()
227 .ok()
228 .expect_ok()?
229 .get(&cache_key)
230 .cloned();
231 if let Some(store) = maybe_store {
232 if let Some(store) = store.upgrade() {
233 self.hits.fetch_add(1, Ordering::Relaxed);
234 return Ok(store);
235 } else {
236 let mut cache_lock = self
238 .active_stores
239 .write()
240 .expect("ObjectStoreRegistry lock poisoned");
241 if let Some(store) = cache_lock.get(&cache_key)
242 && store.upgrade().is_none()
243 {
244 cache_lock.remove(&cache_key);
246 }
247 }
248 }
249 }
250
251 self.misses.fetch_add(1, Ordering::Relaxed);
252
253 let mut store = provider.new_store(base_path, params).await?;
254
255 store.inner = store.inner.traced();
256
257 if let Some(wrapper) = ¶ms.object_store_wrapper {
258 store.inner = wrapper.wrap(&cache_path, store.inner);
259 }
260
261 store.inner = store.io_tracker.wrap("", store.inner);
263
264 let store = Arc::new(store);
265
266 {
267 let mut cache_lock = self.active_stores.write().ok().expect_ok()?;
269 cache_lock.insert(cache_key, Arc::downgrade(&store));
270 }
271
272 Ok(store)
273 }
274
275 pub fn calculate_object_store_prefix(
278 &self,
279 uri: &str,
280 storage_options: Option<&HashMap<String, String>>,
281 ) -> Result<String> {
282 let url = uri_to_url(uri)?;
283 match self.get_provider(url.scheme()) {
284 None => {
285 if url.scheme() == "file" || url.scheme().len() == 1 {
286 Ok("file".to_string())
287 } else {
288 Err(self.scheme_not_found_error(url.scheme()))
289 }
290 }
291 Some(provider) => provider.calculate_object_store_prefix(&url, storage_options),
292 }
293 }
294}
295
296impl Default for ObjectStoreRegistry {
297 fn default() -> Self {
298 let mut providers: HashMap<String, Arc<dyn ObjectStoreProvider>> = HashMap::new();
299
300 providers.insert("memory".into(), Arc::new(memory::MemoryStoreProvider));
301 providers.insert(
302 "shared-memory".into(),
303 Arc::new(shared_memory::SharedMemoryStoreProvider::default()),
304 );
305 providers.insert("file".into(), Arc::new(local::FileStoreProvider));
306 providers.insert(
312 "file-object-store".into(),
313 Arc::new(local::FileStoreProvider),
314 );
315 #[cfg(target_os = "linux")]
316 providers.insert("file+uring".into(), Arc::new(local::FileStoreProvider));
317
318 #[cfg(feature = "aws")]
319 {
320 let aws = Arc::new(aws::AwsStoreProvider);
321 providers.insert("s3".into(), aws.clone());
322 providers.insert("s3+ddb".into(), aws);
323 }
324 #[cfg(feature = "azure")]
325 {
326 let azure = Arc::new(azure::AzureBlobStoreProvider);
327 providers.insert("az".into(), azure.clone());
328 providers.insert("abfss".into(), azure);
329 }
330 #[cfg(feature = "gcp")]
331 providers.insert("gs".into(), Arc::new(gcp::GcsStoreProvider));
332 #[cfg(feature = "goosefs")]
333 providers.insert("goosefs".into(), Arc::new(goosefs::GooseFsStoreProvider));
334 #[cfg(feature = "oss")]
335 providers.insert("oss".into(), Arc::new(oss::OssStoreProvider));
336 #[cfg(feature = "tencent")]
337 providers.insert("cos".into(), Arc::new(tencent::TencentStoreProvider));
338 #[cfg(feature = "huggingface")]
339 providers.insert("hf".into(), Arc::new(huggingface::HuggingfaceStoreProvider));
340 #[cfg(feature = "tos")]
341 providers.insert("tos".into(), Arc::new(tos::TosStoreProvider));
342 Self {
343 providers: RwLock::new(providers),
344 active_stores: RwLock::new(HashMap::new()),
345 hits: AtomicU64::new(0),
346 misses: AtomicU64::new(0),
347 }
348 }
349}
350
351impl ObjectStoreRegistry {
352 pub fn insert(&self, scheme: &str, provider: Arc<dyn ObjectStoreProvider>) {
355 self.providers
356 .write()
357 .expect("ObjectStoreRegistry lock poisoned")
358 .insert(scheme.into(), provider);
359 }
360}
361
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;

    /// A provider that only exercises the trait's default methods; it never
    /// builds a store.
    #[derive(Debug)]
    struct DummyProvider;

    #[async_trait::async_trait]
    impl ObjectStoreProvider for DummyProvider {
        async fn new_store(
            &self,
            _base_path: Url,
            _params: &ObjectStoreParams,
        ) -> Result<ObjectStore> {
            unreachable!("This test doesn't create stores")
        }
    }

    /// The default prefix is `<scheme>$<authority>`.
    #[test]
    fn test_calculate_object_store_prefix() {
        let provider = DummyProvider;
        let url = Url::parse("dummy://blah/path").unwrap();
        assert_eq!(
            "dummy$blah",
            provider.calculate_object_store_prefix(&url, None).unwrap()
        );
    }

    /// An unknown scheme produces an error that lists the registered schemes.
    #[test]
    fn test_calculate_object_store_scheme_not_found() {
        let registry = ObjectStoreRegistry::empty();
        registry.insert("dummy", Arc::new(DummyProvider));
        let s = "Invalid user input: No object store provider found for scheme: 'dummy2'\nValid schemes: dummy";
        let result = registry
            .calculate_object_store_prefix("dummy2://mybucket/my/long/path", None)
            .expect_err("expected error")
            .to_string();
        // Only the prefix is compared. `starts_with` cannot panic on a short
        // message the way slicing `result[..s.len()]` would.
        assert!(
            result.starts_with(s),
            "unexpected error message: {result}"
        );
    }

    /// A plain local path maps to the `file` prefix even with no providers registered.
    #[test]
    fn test_calculate_object_store_prefix_for_local() {
        let registry = ObjectStoreRegistry::empty();
        assert_eq!(
            "file",
            registry
                .calculate_object_store_prefix("/tmp/foobar", None)
                .unwrap()
        );
    }

    /// A single-letter scheme (a Windows drive letter) maps to the `file` prefix.
    #[test]
    fn test_calculate_object_store_prefix_for_local_windows_path() {
        let registry = ObjectStoreRegistry::empty();
        assert_eq!(
            "file",
            registry
                .calculate_object_store_prefix("c://dos/path", None)
                .unwrap()
        );
    }

    /// A registered scheme delegates to its provider.
    #[test]
    fn test_calculate_object_store_prefix_for_dummy_path() {
        let registry = ObjectStoreRegistry::empty();
        registry.insert("dummy", Arc::new(DummyProvider));
        assert_eq!(
            "dummy$mybucket",
            registry
                .calculate_object_store_prefix("dummy://mybucket/my/long/path", None)
                .unwrap()
        );
    }

    /// Repeated requests with equal params hit the cache; different params miss.
    #[tokio::test]
    async fn test_stats_hit_miss_tracking() {
        use crate::object_store::StorageOptionsAccessor;
        let registry = ObjectStoreRegistry::default();
        let url = Url::parse("memory://test").unwrap();

        let params1 = ObjectStoreParams::default();
        let params2 = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([("k".into(), "v".into())]),
            ))),
            ..Default::default()
        };

        // (params, expected (hits, misses, active_stores) after the request)
        let cases: &[(&ObjectStoreParams, (u64, u64, usize))] = &[
            (&params1, (0, 1, 1)), // first request: miss
            (&params1, (1, 1, 1)), // same params: hit
            (&params2, (1, 2, 2)), // different params: miss
        ];

        // Hold strong references: the registry only keeps weak ones, so a
        // dropped store would no longer count as active.
        let mut stores = vec![];
        for (params, (hits, misses, active)) in cases {
            stores.push(registry.get_store(url.clone(), params).await.unwrap());
            let s = registry.stats();
            assert_eq!(
                (s.hits, s.misses, s.active_stores),
                (*hits, *misses, *active)
            );
        }

        // The cache hit must hand back the very same store instance.
        assert!(Arc::ptr_eq(&stores[0], &stores[1]));
    }
}