clayers_repo/store/
memory.rs1use std::collections::HashMap;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use clayers_xml::ContentHash;
10use tokio::sync::RwLock;
11
12use futures_core::stream::BoxStream;
13
14use super::{ObjectStore, RefStore, Transaction, subtree_walk};
15use crate::error::Result;
16use crate::object::{Object, ElementObject};
17use crate::query::{QueryStore, QueryMode, QueryResult, NamespaceMap, default_query_document};
18
19pub(crate) struct MemoryStoreInner {
21 pub(crate) objects: RwLock<HashMap<ContentHash, Object>>,
22 pub(crate) refs: RwLock<HashMap<String, ContentHash>>,
23 pub(crate) inclusive_index: RwLock<HashMap<ContentHash, ContentHash>>,
25}
26
27pub struct MemoryStore {
32 inner: Arc<MemoryStoreInner>,
33}
34
35impl MemoryStore {
36 #[must_use]
38 pub fn new() -> Self {
39 Self {
40 inner: Arc::new(MemoryStoreInner {
41 objects: RwLock::new(HashMap::new()),
42 refs: RwLock::new(HashMap::new()),
43 inclusive_index: RwLock::new(HashMap::new()),
44 }),
45 }
46 }
47}
48
49impl Default for MemoryStore {
50 fn default() -> Self {
51 Self::new()
52 }
53}
54
55#[async_trait]
56impl ObjectStore for MemoryStore {
57 async fn get(&self, hash: &ContentHash) -> Result<Option<Object>> {
58 let objects = self.inner.objects.read().await;
59 Ok(objects.get(hash).cloned())
60 }
61
62 async fn contains(&self, hash: &ContentHash) -> Result<bool> {
63 let objects = self.inner.objects.read().await;
64 Ok(objects.contains_key(hash))
65 }
66
67 async fn transaction(&self) -> Result<Box<dyn Transaction>> {
68 Ok(Box::new(MemoryTransaction::new(Arc::clone(&self.inner))))
69 }
70
71 fn subtree<'a>(
72 &'a self,
73 root: &ContentHash,
74 ) -> BoxStream<'a, Result<(ContentHash, Object)>> {
75 subtree_walk(self, root)
76 }
77
78 async fn get_by_inclusive_hash(
79 &self,
80 inclusive_hash: &ContentHash,
81 ) -> Result<Option<(ContentHash, Object)>> {
82 let index = self.inner.inclusive_index.read().await;
83 let Some(identity_hash) = index.get(inclusive_hash).copied() else {
84 return Ok(None);
85 };
86 drop(index);
87 let objects = self.inner.objects.read().await;
88 Ok(objects
89 .get(&identity_hash)
90 .map(|obj| (identity_hash, obj.clone())))
91 }
92}
93
94#[async_trait]
95impl QueryStore for MemoryStore {
96 async fn query_document(
97 &self,
98 doc_hash: ContentHash,
99 xpath: &str,
100 mode: QueryMode,
101 namespaces: &NamespaceMap,
102 ) -> Result<QueryResult> {
103 default_query_document(self, doc_hash, xpath, mode, namespaces).await
104 }
105}
106
107#[async_trait]
108impl RefStore for MemoryStore {
109 async fn get_ref(&self, name: &str) -> Result<Option<ContentHash>> {
110 let refs = self.inner.refs.read().await;
111 Ok(refs.get(name).copied())
112 }
113
114 async fn set_ref(&self, name: &str, hash: ContentHash) -> Result<()> {
115 let mut refs = self.inner.refs.write().await;
116 refs.insert(name.to_string(), hash);
117 Ok(())
118 }
119
120 async fn delete_ref(&self, name: &str) -> Result<()> {
121 let mut refs = self.inner.refs.write().await;
122 refs.remove(name);
123 Ok(())
124 }
125
126 async fn list_refs(&self, prefix: &str) -> Result<Vec<(String, ContentHash)>> {
127 let refs = self.inner.refs.read().await;
128 Ok(refs
129 .iter()
130 .filter(|(k, _)| k.starts_with(prefix))
131 .map(|(k, v)| (k.clone(), *v))
132 .collect())
133 }
134
135 async fn cas_ref(
136 &self,
137 name: &str,
138 expected: Option<ContentHash>,
139 new: ContentHash,
140 ) -> Result<bool> {
141 let mut refs = self.inner.refs.write().await;
142 let current = refs.get(name).copied();
143 if current == expected {
144 refs.insert(name.to_string(), new);
145 Ok(true)
146 } else {
147 Ok(false)
148 }
149 }
150}
151
152struct PendingEntry {
154 hash: ContentHash,
155 object: Object,
156 inclusive_hash: Option<ContentHash>,
157}
158
159pub(crate) struct MemoryTransaction {
161 pending: Vec<PendingEntry>,
162 inner: Arc<MemoryStoreInner>,
163}
164
165impl MemoryTransaction {
166 pub(crate) fn new(inner: Arc<MemoryStoreInner>) -> Self {
167 Self {
168 pending: Vec::new(),
169 inner,
170 }
171 }
172}
173
174#[async_trait]
175impl Transaction for MemoryTransaction {
176 async fn put(&mut self, hash: ContentHash, object: Object) -> Result<()> {
177 let inclusive_hash = if let Object::Element(ElementObject { inclusive_hash, .. }) = &object {
178 Some(*inclusive_hash)
179 } else {
180 None
181 };
182 self.pending.push(PendingEntry {
183 hash,
184 object,
185 inclusive_hash,
186 });
187 Ok(())
188 }
189
190 async fn commit(&mut self) -> Result<()> {
191 let mut objects = self.inner.objects.write().await;
192 let mut inclusive_index = self.inner.inclusive_index.write().await;
193
194 for entry in self.pending.drain(..) {
195 objects.insert(entry.hash, entry.object);
196 if let Some(inclusive) = entry.inclusive_hash {
197 inclusive_index.insert(inclusive, entry.hash);
198 }
199 }
200
201 Ok(())
202 }
203
204 async fn rollback(&mut self) -> Result<()> {
205 self.pending.clear();
206 Ok(())
207 }
208}
209
// Instantiates the store test macro with a fresh `MemoryStore`.
#[cfg(test)]
mod tests {
    use super::MemoryStore;

    crate::store::tests::store_tests!(MemoryStore::new());
}
215
// Instantiates the query test macro with a fresh `MemoryStore`.
#[cfg(test)]
mod query_tests {
    use super::MemoryStore;

    crate::query::tests::query_tests!(MemoryStore::new());
}
221
// Instantiates the property-test macro with a fresh `MemoryStore`.
#[cfg(test)]
mod prop_tests {
    use super::MemoryStore;

    crate::store::prop_tests::prop_store_tests!(MemoryStore::new());
}