Skip to main content

triblespace_core/blob/
memoryblobstore.rs

1use crate::blob::schemas::UnknownBlob;
2use crate::blob::Blob;
3use crate::blob::BlobSchema;
4use crate::blob::ToBlob;
5use crate::repo::BlobStore;
6use crate::repo::BlobStoreGet;
7use crate::repo::BlobStoreKeep;
8use crate::repo::BlobStoreList;
9use crate::repo::BlobStorePut;
10use crate::value::schemas::hash::Handle;
11use crate::value::schemas::hash::HashProtocol;
12use crate::value::Value;
13use crate::value::VALUE_LEN;
14
15use std::collections::{BTreeMap, HashSet};
16use std::convert::Infallible;
17use std::error::Error;
18use std::fmt::Debug;
19use std::fmt::{self};
20use std::iter::FromIterator;
21use std::ops::Bound;
22
23use reft_light::Apply;
24use reft_light::ReadHandle;
25use reft_light::WriteHandle;
26
27use super::TryFromBlob;
28
29enum MemoryBlobStoreOps<H: HashProtocol> {
30    Insert(Value<Handle<H, UnknownBlob>>, Blob<UnknownBlob>),
31    Keep(HashSet<[u8; VALUE_LEN]>),
32}
33
34type MemoryBlobStoreMap<H> = BTreeMap<Value<Handle<H, UnknownBlob>>, Blob<UnknownBlob>>;
35
36impl<H: HashProtocol> Apply<MemoryBlobStoreMap<H>, ()> for MemoryBlobStoreOps<H> {
37    fn apply_first(
38        &mut self,
39        first: &mut MemoryBlobStoreMap<H>,
40        _second: &MemoryBlobStoreMap<H>,
41        _auxiliary: &mut (),
42    ) {
43        match self {
44            MemoryBlobStoreOps::Insert(handle, blob) => {
45                // This operation is indempotent, so we can just
46                // ignore it if the blob is already present.
47                first.entry(*handle).or_insert(blob.clone());
48            }
49            MemoryBlobStoreOps::Keep(retain) => {
50                first.retain(|handle, _| retain.contains(&handle.raw))
51            }
52        }
53    }
54}
55
56/// A mapping from [Handle]s to [Blob]s.
57pub struct MemoryBlobStore<H: HashProtocol> {
58    write_handle: WriteHandle<MemoryBlobStoreOps<H>, MemoryBlobStoreMap<H>, ()>,
59}
60
61impl<H: HashProtocol> Debug for MemoryBlobStore<H>
62where
63    H: HashProtocol,
64{
65    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
66        write!(f, "MemoryBlobStore")
67    }
68}
69
70#[derive(Debug)]
71/// Clonable view into a [`MemoryBlobStore`] that only exposes read operations.
72///
73/// Clones of this struct share access to the underlying store and may be used
74/// concurrently.
75pub struct MemoryBlobStoreReader<H: HashProtocol> {
76    read_handle: ReadHandle<MemoryBlobStoreMap<H>>,
77}
78
79impl<H: HashProtocol> Clone for MemoryBlobStoreReader<H> {
80    fn clone(&self) -> Self {
81        MemoryBlobStoreReader {
82            read_handle: self.read_handle.clone(),
83        }
84    }
85}
86
87impl<H: HashProtocol> PartialEq for MemoryBlobStoreReader<H> {
88    fn eq(&self, other: &Self) -> bool {
89        self.read_handle == other.read_handle
90    }
91}
92
93impl<H: HashProtocol> Eq for MemoryBlobStoreReader<H> {}
94
95impl<H: HashProtocol> MemoryBlobStoreReader<H> {
96    fn new(read_handle: ReadHandle<MemoryBlobStoreMap<H>>) -> Self {
97        MemoryBlobStoreReader { read_handle }
98    }
99
100    /// Returns how many blobs are currently stored in the underlying map.
101    pub fn len(&self) -> usize {
102        self.read_handle
103            .enter()
104            .map(|blobs| blobs.len())
105            .unwrap_or(0)
106    }
107
108    /// Returns an iterator over all blobs currently in the store.
109    ///
110    /// The iteration order is unspecified and should not be relied on.
111    pub fn iter(&self) -> MemoryBlobStoreIter<H> {
112        let read_handle = self.read_handle.clone();
113
114        MemoryBlobStoreIter {
115            read_handle,
116            cursor: None,
117        }
118    }
119}
120
121impl<H: HashProtocol> Default for MemoryBlobStore<H> {
122    fn default() -> Self {
123        Self::new()
124    }
125}
126
127impl<H: HashProtocol> MemoryBlobStore<H> {
128    /// Creates a new [`MemoryBlobStore`] with no blobs.
129    ///
130    /// The store keeps all data in memory and is primarily intended for tests
131    /// or other transient repositories such as workspaces.
132    pub fn new() -> MemoryBlobStore<H> {
133        let write_storage = reft_light::new::<MemoryBlobStoreOps<H>, MemoryBlobStoreMap<H>, ()>(
134            MemoryBlobStoreMap::new(),
135            (),
136        );
137        MemoryBlobStore {
138            write_handle: write_storage,
139        }
140    }
141
142    /// Inserts `blob` into the store and returns the newly computed handle.
143    ///
144    /// The handle is derived from hashing the blob's bytes using the hash
145    /// protocol associated with this store.
146    pub fn insert<S>(&mut self, blob: Blob<S>) -> Value<Handle<H, S>>
147    where
148        S: BlobSchema,
149    {
150        let handle: Value<Handle<H, S>> = blob.get_handle();
151        let unknown_handle: Value<Handle<H, UnknownBlob>> = handle.transmute();
152        let blob: Blob<UnknownBlob> = blob.transmute();
153        self.write_handle
154            .append(MemoryBlobStoreOps::Insert(unknown_handle, blob));
155        handle
156    }
157
158    // Note that keep is conservative and keeps every blob for which there exists
159    // a corresponding trible value, irrespective of that tribles attribute type.
160    // This could theoretically allow an attacker to DOS blob garbage collection
161    // by introducting values that look like existing hashes, but are actually of
162    // a different type. But this is under the assumption that an attacker is only
163    // allowed to write non-handle typed triples, otherwise they might as well
164    // introduce blobs directly.
165    /// Drops any blobs that are not referenced by one of the provided tribles.
166    ///
167    /// This is a simple mark-and-sweep style GC used to prune unreferenced
168    /// blobs from long lived stores.
169    pub fn keep<I>(&mut self, handles: I)
170    where
171        I: IntoIterator<Item = Value<Handle<H, UnknownBlob>>>,
172    {
173        let retain: HashSet<[u8; VALUE_LEN]> = handles.into_iter().map(|h| h.raw).collect();
174        self.write_handle.append(MemoryBlobStoreOps::Keep(retain));
175    }
176}
177
178impl<H: HashProtocol> BlobStoreKeep<H> for MemoryBlobStore<H> {
179    fn keep<I>(&mut self, handles: I)
180    where
181        I: IntoIterator<Item = Value<Handle<H, UnknownBlob>>>,
182    {
183        MemoryBlobStore::keep(self, handles);
184    }
185}
186
187impl<H> FromIterator<(Value<Handle<H, UnknownBlob>>, Blob<UnknownBlob>)> for MemoryBlobStore<H>
188where
189    H: HashProtocol,
190{
191    fn from_iter<I: IntoIterator<Item = (Value<Handle<H, UnknownBlob>>, Blob<UnknownBlob>)>>(
192        iter: I,
193    ) -> Self {
194        let mut set = MemoryBlobStore::new();
195
196        for (handle, blob) in iter {
197            set.write_handle
198                .append(MemoryBlobStoreOps::Insert(handle, blob));
199        }
200
201        set
202    }
203}
204
205impl<H> IntoIterator for MemoryBlobStoreReader<H>
206where
207    H: HashProtocol,
208{
209    type Item = (Value<Handle<H, UnknownBlob>>, Blob<UnknownBlob>);
210    type IntoIter = MemoryBlobStoreIter<H>;
211    fn into_iter(self) -> Self::IntoIter {
212        self.iter()
213    }
214}
215
/// Error returned when fetching a blob from a [`MemoryBlobStoreReader`].
///
/// `E` is the error type of the blob-to-value conversion.
#[derive(Debug)]
pub enum MemoryStoreGetError<E: Error> {
    /// No blob is stored under the requested handle.
    NotFound(),
    /// A blob exists under the handle but could not be converted into the
    /// requested type; carries the conversion error.
    ConversionFailed(E),
}

impl<E: Error> fmt::Display for MemoryStoreGetError<E> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::NotFound() => f.write_str("Blob not found in memory store"),
            Self::ConversionFailed(inner) => write!(f, "Blob conversion failed: {inner}"),
        }
    }
}

impl<E: Error> Error for MemoryStoreGetError<E> {}
235
236/// Iterator returned by [`MemoryBlobStoreReader::iter`].
237///
238/// Yields `(Handle, Blob)` pairs for each entry currently in the store.
239pub struct MemoryBlobStoreIter<H>
240where
241    H: HashProtocol,
242{
243    read_handle: ReadHandle<MemoryBlobStoreMap<H>>,
244    cursor: Option<Value<Handle<H, UnknownBlob>>>,
245}
246
247impl<H> Iterator for MemoryBlobStoreIter<H>
248where
249    H: HashProtocol,
250{
251    type Item = (Value<Handle<H, UnknownBlob>>, Blob<UnknownBlob>);
252
253    fn next(&mut self) -> Option<Self::Item> {
254        let read_handle = self.read_handle.enter()?;
255        let mut iter = if let Some(cursor) = self.cursor.take() {
256            // If we have a cursor, we start from the cursor.
257            // We use `Bound::Excluded` to skip the cursor itself.
258            read_handle.range((Bound::Excluded(cursor), Bound::Unbounded))
259        } else {
260            // If we don't have a cursor, we start from the beginning.
261            read_handle.range((
262                Bound::Unbounded::<Value<Handle<H, UnknownBlob>>>,
263                Bound::Unbounded,
264            ))
265        };
266
267        let (handle, blob) = iter.next()?;
268        self.cursor = Some(*handle);
269        Some((*handle, blob.clone()))
270        // Note: we may want to use batching in the future to gain more performance and amortize
271        // the cost of creating the iterator over the BTreeMap.
272    }
273}
274
275/// Adapter over [`MemoryBlobStoreIter`] that yields only blob handles.
276pub struct MemoryBlobStoreListIter<H>
277where
278    H: HashProtocol,
279{
280    inner: MemoryBlobStoreIter<H>,
281}
282
283impl<H> Iterator for MemoryBlobStoreListIter<H>
284where
285    H: HashProtocol,
286{
287    type Item = Result<Value<Handle<H, UnknownBlob>>, Infallible>;
288
289    fn next(&mut self) -> Option<Self::Item> {
290        let (handle, _) = self.inner.next()?;
291        Some(Ok(handle))
292    }
293}
294
295impl<H> BlobStoreList<H> for MemoryBlobStoreReader<H>
296where
297    H: HashProtocol,
298{
299    type Iter<'a> = MemoryBlobStoreListIter<H>;
300    type Err = Infallible;
301
302    fn blobs(&self) -> Self::Iter<'static> {
303        MemoryBlobStoreListIter { inner: self.iter() }
304    }
305}
306
307impl<H> BlobStoreGet<H> for MemoryBlobStoreReader<H>
308where
309    H: HashProtocol,
310{
311    type GetError<E: Error> = MemoryStoreGetError<E>;
312
313    fn get<T, S>(
314        &self,
315        handle: Value<Handle<H, S>>,
316    ) -> Result<T, Self::GetError<<T as TryFromBlob<S>>::Error>>
317    where
318        S: BlobSchema,
319        T: TryFromBlob<S>,
320    {
321        let handle: Value<Handle<H, UnknownBlob>> = handle.transmute();
322
323        let Some(read_guard) = self.read_handle.enter() else {
324            return Err(MemoryStoreGetError::NotFound());
325        };
326        let Some(blob) = read_guard.get(&handle) else {
327            return Err(MemoryStoreGetError::NotFound());
328        };
329
330        let blob: Blob<S> = blob.clone().transmute();
331
332        match blob.try_from_blob() {
333            Ok(value) => Ok(value),
334            Err(e) => Err(MemoryStoreGetError::ConversionFailed(e)),
335        }
336    }
337}
338
339impl<H> BlobStorePut<H> for MemoryBlobStore<H>
340where
341    H: HashProtocol,
342{
343    type PutError = Infallible;
344
345    fn put<S, T>(&mut self, item: T) -> Result<Value<Handle<H, S>>, Self::PutError>
346    where
347        S: BlobSchema,
348        T: ToBlob<S>,
349    {
350        let blob = item.to_blob();
351        let handle = blob.get_handle();
352        self.insert(blob);
353        Ok(handle)
354    }
355}
356
357impl<H: HashProtocol> BlobStore<H> for MemoryBlobStore<H> {
358    type Reader = MemoryBlobStoreReader<H>;
359    type ReaderError = Infallible;
360
361    fn reader(&mut self) -> Result<Self::Reader, Self::ReaderError> {
362        Ok(MemoryBlobStoreReader::new(
363            self.write_handle.publish().clone(),
364        ))
365    }
366}
367
#[cfg(test)]
mod tests {
    use crate::prelude::*;

    use super::*;
    use anybytes::Bytes;
    use fake::faker::name::raw::Name;
    use fake::locales::EN;
    use fake::Fake;

    use blobschemas::LongString;
    use valueschemas::Blake3;
    use valueschemas::Handle;

    attributes! {
        "5AD0FAFB1FECBC197A385EC20166899E" as description: Handle<Blake3, LongString>;
    }

    /// `keep` must retain every blob referenced from the trible set and drop
    /// blobs that nothing references.
    #[test]
    fn keep() {
        use crate::repo::potential_handles;
        use crate::trible::TribleSet;

        let mut kb = TribleSet::new();
        let mut blobs = MemoryBlobStore::new();
        for _i in 0..200 {
            kb += entity! {
               description: blobs.put(Bytes::from_source(Name(EN).fake::<String>()).view().unwrap()).unwrap()
            };
        }

        // A blob that no trible in `kb` refers to. Its text cannot collide
        // with a generated person name.
        let _orphan: Value<Handle<Blake3, LongString>> = blobs
            .put(Bytes::from_source("unreferenced blob".to_string()).view().unwrap())
            .unwrap();

        let before = blobs.reader().unwrap().len();
        blobs.keep(potential_handles::<Blake3>(&kb));
        let after = blobs.reader().unwrap().len();

        // Exactly the orphan is dropped; every referenced blob survives.
        assert_eq!(after + 1, before);
    }
}