lance_io/object_store/providers/
shared_memory.rs1use std::{
5 collections::HashMap,
6 sync::{Arc, LazyLock, Mutex},
7};
8
9use crate::object_store::{
10 ObjectStore, ObjectStoreParams, ObjectStoreProvider, providers::memory::MemoryStoreProvider,
11};
12use lance_core::error::Result;
13use object_store::{memory::InMemory, path::Path};
14use url::Url;
15
16static SHARED_BACKENDS: LazyLock<Mutex<HashMap<String, Arc<InMemory>>>> =
22 LazyLock::new(|| Mutex::new(HashMap::new()));
23
24fn shared_backend_for(url: &Url) -> Arc<InMemory> {
25 SHARED_BACKENDS
26 .lock()
27 .expect("SHARED_BACKENDS mutex poisoned")
28 .entry(url.authority().to_string())
29 .or_insert_with(|| Arc::new(InMemory::new()))
30 .clone()
31}
32
33#[derive(Default, Debug)]
46pub struct SharedMemoryStoreProvider {
47 inner: MemoryStoreProvider,
48}
49
50#[async_trait::async_trait]
51impl ObjectStoreProvider for SharedMemoryStoreProvider {
52 async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
53 let mut store = self.inner.new_store(base_path.clone(), params).await?;
54 store.inner = shared_backend_for(&base_path);
55 store.scheme = String::from("shared-memory");
56 store.store_prefix = self.calculate_object_store_prefix(&base_path, None)?;
57 Ok(store)
58 }
59
60 fn extract_path(&self, url: &Url) -> Result<Path> {
61 Ok(Path::from(url.path().trim_start_matches('/')))
63 }
64
65 fn calculate_object_store_prefix(
66 &self,
67 url: &Url,
68 _storage_options: Option<&HashMap<String, String>>,
69 ) -> Result<String> {
70 Ok(format!("shared-memory${}", url.authority()))
71 }
72}
73
#[cfg(test)]
mod tests {
    use super::*;
    use crate::object_store::ObjectStoreRegistry;
    use bytes::Bytes;
    use object_store::{ObjectStoreExt as _, PutPayload};

    /// Opens `uri` through a freshly created default registry and returns the
    /// resulting store together with the path resolved from the URI.
    async fn store_for(uri: &str) -> (Arc<ObjectStore>, Path) {
        let registry = Arc::new(ObjectStoreRegistry::default());
        ObjectStore::from_uri_and_params(registry, uri, &Default::default())
            .await
            .unwrap()
    }

    /// Data written through one store is readable through a second store that
    /// was opened for the same authority via a different registry.
    #[tokio::test]
    async fn same_authority_shares_bytes_across_registries() {
        let location = Path::from("file");

        let (writer, _) = store_for("shared-memory://bucket-a/").await;
        let payload = PutPayload::from_static(b"hello");
        writer.inner.put(&location, payload).await.unwrap();

        let (reader, _) = store_for("shared-memory://bucket-a/").await;
        let fetched = reader.inner.get(&location).await.unwrap();
        let body = fetched.bytes().await.unwrap();
        assert_eq!(body, Bytes::from_static(b"hello"));
    }

    /// An object written under one authority is not visible under another.
    #[tokio::test]
    async fn different_authorities_are_isolated() {
        let (first, _) = store_for("shared-memory://iso-a/").await;
        let (second, _) = store_for("shared-memory://iso-b/").await;
        let key = Path::from("k");

        let payload = PutPayload::from_static(b"in-a");
        first.inner.put(&key, payload).await.unwrap();

        let lookup = second.inner.get(&key).await;
        assert!(lookup.is_err());
    }

    /// `extract_path` yields only the URL path, without the authority.
    #[tokio::test]
    async fn extract_path_strips_authority() {
        let url = Url::parse("shared-memory://bucket/foo/bar").unwrap();
        let provider = SharedMemoryStoreProvider::default();
        let extracted = provider.extract_path(&url).unwrap();
        assert_eq!(extracted, Path::from("foo/bar"));
    }

    /// The path resolved from a URI addresses the same object in a peer store
    /// opened from the same URI.
    #[tokio::test]
    async fn from_uri_and_params_resolves_path_correctly() {
        let uri = "shared-memory://path-test/sub/dir/obj";

        let (store, path) = store_for(uri).await;
        assert_eq!(path, Path::from("sub/dir/obj"));
        let payload = PutPayload::from_static(b"payload");
        store.inner.put(&path, payload).await.unwrap();

        let (peer, peer_path) = store_for(uri).await;
        let fetched = peer.inner.get(&peer_path).await.unwrap();
        let body = fetched.bytes().await.unwrap();
        assert_eq!(body, Bytes::from_static(b"payload"));
    }

    /// Store prefixes differ between authorities and have the
    /// `shared-memory$<authority>` form.
    #[test]
    fn calculate_prefix_is_per_authority() {
        let provider = SharedMemoryStoreProvider::default();
        let prefix_of = |uri: &str| {
            provider
                .calculate_object_store_prefix(&Url::parse(uri).unwrap(), None)
                .unwrap()
        };

        let a = prefix_of("shared-memory://x/p");
        let b = prefix_of("shared-memory://y/p");
        assert_ne!(a, b);
        assert_eq!(a, "shared-memory$x");
    }
}