Skip to main content

clayers_repo/store/
memory.rs

1//! In-memory storage backend for testing and prototyping.
2//!
3//! Thread-safe via `tokio::sync::RwLock`. Implements all three store traits.
4
5use std::collections::HashMap;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use clayers_xml::ContentHash;
10use tokio::sync::RwLock;
11
12use futures_core::stream::BoxStream;
13
14use super::{ObjectStore, RefStore, Transaction, subtree_walk};
15use crate::error::Result;
16use crate::object::{Object, ElementObject};
17use crate::query::{QueryStore, QueryMode, QueryResult, NamespaceMap, default_query_document};
18
/// Shared inner state for the memory store.
///
/// Each map is guarded by its own `RwLock`, so the three can be locked
/// independently of one another.
pub(crate) struct MemoryStoreInner {
    /// Stored objects, keyed by their content hash.
    pub(crate) objects: RwLock<HashMap<ContentHash, Object>>,
    /// Named refs: ref name -> content hash the ref points at.
    pub(crate) refs: RwLock<HashMap<String, ContentHash>>,
    /// Secondary index: Inclusive C14N hash -> identity (Exclusive C14N) hash.
    pub(crate) inclusive_index: RwLock<HashMap<ContentHash, ContentHash>>,
}
26
/// An in-memory object store, ref store, and remote store.
///
/// Suitable for testing and prototyping. All data lives in memory and is
/// lost when the store is dropped.
///
/// The state is held behind an `Arc` so that transactions created by this
/// store can share it.
// NOTE(review): this file implements `ObjectStore`, `QueryStore`, and
// `RefStore` for this type; no remote-store impl is visible here —
// presumably it lives in another module. Confirm.
pub struct MemoryStore {
    inner: Arc<MemoryStoreInner>,
}
34
35impl MemoryStore {
36    /// Create a new empty memory store.
37    #[must_use]
38    pub fn new() -> Self {
39        Self {
40            inner: Arc::new(MemoryStoreInner {
41                objects: RwLock::new(HashMap::new()),
42                refs: RwLock::new(HashMap::new()),
43                inclusive_index: RwLock::new(HashMap::new()),
44            }),
45        }
46    }
47}
48
impl Default for MemoryStore {
    /// Same as [`MemoryStore::new`]: an empty store.
    fn default() -> Self {
        Self::new()
    }
}
54
55#[async_trait]
56impl ObjectStore for MemoryStore {
57    async fn get(&self, hash: &ContentHash) -> Result<Option<Object>> {
58        let objects = self.inner.objects.read().await;
59        Ok(objects.get(hash).cloned())
60    }
61
62    async fn contains(&self, hash: &ContentHash) -> Result<bool> {
63        let objects = self.inner.objects.read().await;
64        Ok(objects.contains_key(hash))
65    }
66
67    async fn transaction(&self) -> Result<Box<dyn Transaction>> {
68        Ok(Box::new(MemoryTransaction::new(Arc::clone(&self.inner))))
69    }
70
71    fn subtree<'a>(
72        &'a self,
73        root: &ContentHash,
74    ) -> BoxStream<'a, Result<(ContentHash, Object)>> {
75        subtree_walk(self, root)
76    }
77
78    async fn get_by_inclusive_hash(
79        &self,
80        inclusive_hash: &ContentHash,
81    ) -> Result<Option<(ContentHash, Object)>> {
82        let index = self.inner.inclusive_index.read().await;
83        let Some(identity_hash) = index.get(inclusive_hash).copied() else {
84            return Ok(None);
85        };
86        drop(index);
87        let objects = self.inner.objects.read().await;
88        Ok(objects
89            .get(&identity_hash)
90            .map(|obj| (identity_hash, obj.clone())))
91    }
92}
93
#[async_trait]
impl QueryStore for MemoryStore {
    /// Run `xpath` against the document identified by `doc_hash`.
    ///
    /// There is no memory-specific query path: every argument is forwarded
    /// unchanged to `default_query_document`, with this store as the backing
    /// store.
    async fn query_document(
        &self,
        doc_hash: ContentHash,
        xpath: &str,
        mode: QueryMode,
        namespaces: &NamespaceMap,
    ) -> Result<QueryResult> {
        default_query_document(self, doc_hash, xpath, mode, namespaces).await
    }
}
106
107#[async_trait]
108impl RefStore for MemoryStore {
109    async fn get_ref(&self, name: &str) -> Result<Option<ContentHash>> {
110        let refs = self.inner.refs.read().await;
111        Ok(refs.get(name).copied())
112    }
113
114    async fn set_ref(&self, name: &str, hash: ContentHash) -> Result<()> {
115        let mut refs = self.inner.refs.write().await;
116        refs.insert(name.to_string(), hash);
117        Ok(())
118    }
119
120    async fn delete_ref(&self, name: &str) -> Result<()> {
121        let mut refs = self.inner.refs.write().await;
122        refs.remove(name);
123        Ok(())
124    }
125
126    async fn list_refs(&self, prefix: &str) -> Result<Vec<(String, ContentHash)>> {
127        let refs = self.inner.refs.read().await;
128        Ok(refs
129            .iter()
130            .filter(|(k, _)| k.starts_with(prefix))
131            .map(|(k, v)| (k.clone(), *v))
132            .collect())
133    }
134
135    async fn cas_ref(
136        &self,
137        name: &str,
138        expected: Option<ContentHash>,
139        new: ContentHash,
140    ) -> Result<bool> {
141        let mut refs = self.inner.refs.write().await;
142        let current = refs.get(name).copied();
143        if current == expected {
144            refs.insert(name.to_string(), new);
145            Ok(true)
146        } else {
147            Ok(false)
148        }
149    }
150}
151
/// Pending write entry: identity hash, object, and optional inclusive hash.
struct PendingEntry {
    /// Key the object will be stored under on commit.
    hash: ContentHash,
    /// The object to store.
    object: Object,
    /// Inclusive hash to index on commit; `Some` only for `Object::Element`.
    inclusive_hash: Option<ContentHash>,
}
158
/// An in-memory transaction that collects writes and flushes atomically on commit.
pub(crate) struct MemoryTransaction {
    /// Writes staged by `put`, in call order; emptied by `commit` and `rollback`.
    pending: Vec<PendingEntry>,
    /// Shared store state that `commit` writes into.
    inner: Arc<MemoryStoreInner>,
}
164
165impl MemoryTransaction {
166    pub(crate) fn new(inner: Arc<MemoryStoreInner>) -> Self {
167        Self {
168            pending: Vec::new(),
169            inner,
170        }
171    }
172}
173
174#[async_trait]
175impl Transaction for MemoryTransaction {
176    async fn put(&mut self, hash: ContentHash, object: Object) -> Result<()> {
177        let inclusive_hash = if let Object::Element(ElementObject { inclusive_hash, .. }) = &object {
178            Some(*inclusive_hash)
179        } else {
180            None
181        };
182        self.pending.push(PendingEntry {
183            hash,
184            object,
185            inclusive_hash,
186        });
187        Ok(())
188    }
189
190    async fn commit(&mut self) -> Result<()> {
191        let mut objects = self.inner.objects.write().await;
192        let mut inclusive_index = self.inner.inclusive_index.write().await;
193
194        for entry in self.pending.drain(..) {
195            objects.insert(entry.hash, entry.object);
196            if let Some(inclusive) = entry.inclusive_hash {
197                inclusive_index.insert(inclusive, entry.hash);
198            }
199        }
200
201        Ok(())
202    }
203
204    async fn rollback(&mut self) -> Result<()> {
205        self.pending.clear();
206        Ok(())
207    }
208}
209
// Instantiates `crate::store::tests::store_tests!` with a fresh `MemoryStore`.
// Presumably the macro expands to the store test suite shared across
// backends — see its definition to confirm what is covered.
#[cfg(test)]
mod tests {
    use super::MemoryStore;
    crate::store::tests::store_tests!(MemoryStore::new());
}
215
// Instantiates `crate::query::tests::query_tests!` with a fresh `MemoryStore`.
// Presumably the macro expands to the shared query test suite — see its
// definition to confirm what is covered.
#[cfg(test)]
mod query_tests {
    use super::MemoryStore;
    crate::query::tests::query_tests!(MemoryStore::new());
}
221
// Instantiates `crate::store::prop_tests::prop_store_tests!` with a fresh
// `MemoryStore`. Presumably the macro expands to property-based store tests —
// see its definition to confirm what is covered.
#[cfg(test)]
mod prop_tests {
    use super::MemoryStore;
    crate::store::prop_tests::prop_store_tests!(MemoryStore::new());
}