Skip to main content

lance_io/object_store/providers/
shared_memory.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::HashMap,
6    sync::{Arc, LazyLock, Mutex},
7};
8
9use crate::object_store::{
10    ObjectStore, ObjectStoreParams, ObjectStoreProvider, providers::memory::MemoryStoreProvider,
11};
12use lance_core::error::Result;
13use object_store::{memory::InMemory, path::Path};
14use url::Url;
15
16/// Process-global pool of in-memory backends keyed by URL authority.
17///
18/// Different authorities map to different backends (act as "buckets"); same
19/// authority across any caller in the process resolves to the same `Arc<InMemory>`.
20/// The pool grows for the lifetime of the process — entries are never evicted.
21static SHARED_BACKENDS: LazyLock<Mutex<HashMap<String, Arc<InMemory>>>> =
22    LazyLock::new(|| Mutex::new(HashMap::new()));
23
24fn shared_backend_for(url: &Url) -> Arc<InMemory> {
25    SHARED_BACKENDS
26        .lock()
27        .expect("SHARED_BACKENDS mutex poisoned")
28        .entry(url.authority().to_string())
29        .or_insert_with(|| Arc::new(InMemory::new()))
30        .clone()
31}
32
33/// Like [`MemoryStoreProvider`], but every caller resolving a `shared-memory://<authority>/...`
34/// URL with the same `<authority>` sees the same backing bytes — across `ObjectStoreRegistry`
35/// instances, threads, and unrelated components in the same process.
36///
37/// Intended for tests and harnesses that need multiple actors to coordinate through a
38/// common in-memory object store (e.g. a writer and an independent reader, multi-pod
39/// fence simulations). Choose distinct authorities for isolation
40/// (`shared-memory://test-a` vs `shared-memory://test-b`).
41///
42/// Unlike `memory://` — which mints a fresh `InMemory` per `new_store` call — this
43/// provider is opt-in precisely so existing tests relying on per-call isolation are
44/// unaffected.
45#[derive(Default, Debug)]
46pub struct SharedMemoryStoreProvider {
47    inner: MemoryStoreProvider,
48}
49
50#[async_trait::async_trait]
51impl ObjectStoreProvider for SharedMemoryStoreProvider {
52    async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
53        let mut store = self.inner.new_store(base_path.clone(), params).await?;
54        store.inner = shared_backend_for(&base_path);
55        store.scheme = String::from("shared-memory");
56        store.store_prefix = self.calculate_object_store_prefix(&base_path, None)?;
57        Ok(store)
58    }
59
60    fn extract_path(&self, url: &Url) -> Result<Path> {
61        // The authority is the bucket; the URL path is the object path within it.
62        Ok(Path::from(url.path().trim_start_matches('/')))
63    }
64
65    fn calculate_object_store_prefix(
66        &self,
67        url: &Url,
68        _storage_options: Option<&HashMap<String, String>>,
69    ) -> Result<String> {
70        Ok(format!("shared-memory${}", url.authority()))
71    }
72}
73
#[cfg(test)]
mod tests {
    use super::*;
    use crate::object_store::ObjectStoreRegistry;
    use bytes::Bytes;
    use object_store::{ObjectStoreExt as _, PutPayload};

    // The backend pool is process-global, so each test below uses its own authority.

    /// Resolves `uri` through a brand-new default registry, so nothing is shared between
    /// calls at the registry layer.
    async fn store_for(uri: &str) -> (Arc<ObjectStore>, Path) {
        let fresh_registry = Arc::new(ObjectStoreRegistry::default());
        ObjectStore::from_uri_and_params(fresh_registry, uri, &Default::default())
            .await
            .unwrap()
    }

    /// Reads the full contents stored at `location`, panicking if the object is missing.
    async fn read_all(store: &ObjectStore, location: &Path) -> Bytes {
        let fetched = store.inner.get(location).await.unwrap();
        fetched.bytes().await.unwrap()
    }

    #[tokio::test]
    async fn same_authority_shares_bytes_across_registries() {
        let key = Path::from("file");

        let (writer, _) = store_for("shared-memory://bucket-a/").await;
        writer
            .inner
            .put(&key, PutPayload::from_static(b"hello"))
            .await
            .unwrap();

        // The reader comes from a *separate* registry, so any sharing happens below it.
        let (reader, _) = store_for("shared-memory://bucket-a/").await;
        let contents = read_all(&reader, &key).await;
        assert_eq!(contents, Bytes::from_static(b"hello"));
    }

    #[tokio::test]
    async fn different_authorities_are_isolated() {
        let key = Path::from("k");
        let (a, _) = store_for("shared-memory://iso-a/").await;
        let (b, _) = store_for("shared-memory://iso-b/").await;

        a.inner
            .put(&key, PutPayload::from_static(b"in-a"))
            .await
            .unwrap();

        // The object written under `iso-a` must not be visible under `iso-b`.
        let lookup_in_b = b.inner.get(&key).await;
        assert!(lookup_in_b.is_err());
    }

    #[tokio::test]
    async fn extract_path_strips_authority() {
        let url = Url::parse("shared-memory://bucket/foo/bar").unwrap();
        let extracted = SharedMemoryStoreProvider::default()
            .extract_path(&url)
            .unwrap();
        assert_eq!(extracted, Path::from("foo/bar"));
    }

    #[tokio::test]
    async fn from_uri_and_params_resolves_path_correctly() {
        let uri = "shared-memory://path-test/sub/dir/obj";

        let (store, path) = store_for(uri).await;
        assert_eq!(path, Path::from("sub/dir/obj"));
        store
            .inner
            .put(&path, PutPayload::from_static(b"payload"))
            .await
            .unwrap();

        // A second, independent resolution of the same URI must see the same object.
        let (peer, peer_path) = store_for(uri).await;
        let contents = read_all(&peer, &peer_path).await;
        assert_eq!(contents, Bytes::from_static(b"payload"));
    }

    #[test]
    fn calculate_prefix_is_per_authority() {
        let provider = SharedMemoryStoreProvider::default();
        let prefix_for = |uri: &str| {
            provider
                .calculate_object_store_prefix(&Url::parse(uri).unwrap(), None)
                .unwrap()
        };

        let x_prefix = prefix_for("shared-memory://x/p");
        let y_prefix = prefix_for("shared-memory://y/p");
        assert_ne!(x_prefix, y_prefix);
        assert_eq!(x_prefix, "shared-memory$x");
    }
}