noosphere_collections/hamt/
hamt_implementation.rs

1// Adapted for Noosphere from https://github.com/filecoin-project/ref-fvm
2// Source copyright and license:
3// Copyright 2019-2022 ChainSafe Systems
4// SPDX-License-Identifier: Apache-2.0, MIT
5
6use anyhow::Result;
7use libipld_cbor::DagCborCodec;
8
9use noosphere_storage::BlockStore;
10use std::borrow::Borrow;
11use std::marker::PhantomData;
12use std::pin::Pin;
13use tokio_stream::{Stream, StreamExt};
14
15use cid::Cid;
16use forest_hash_utils::{BytesKey, Hash};
17use serde::de::DeserializeOwned;
18use serde::{Serialize, Serializer};
19
20use crate::hamt::node::Node;
21use crate::hamt::{HashAlgorithm, Sha256};
22
/// Maximum array width used by the HAMT implementation.
///
/// NOTE(review): presumably the largest number of key/value pairs a single
/// bucket may hold before it is split into a child node — confirm against
/// the `node` module, which is where this constant is consumed.
pub const MAX_ARRAY_WIDTH: usize = 3;

/// Bit width used for indexing a hash at each depth level when the caller
/// does not choose one explicitly.
pub const DEFAULT_BIT_WIDTH: u32 = 8;
27
/// Marker trait that stands for `Send + Sync` on every target except
/// `wasm32`.
///
/// A blanket implementation covers every `Send + Sync` type, so this trait
/// never needs to be implemented by hand. The blanket implementation is
/// deliberately `?Sized` so that unsized borrowed key forms such as `str` or
/// `[u8]` can be used wherever a `Q: ?Sized + TargetConditionalSendSync`
/// bound appears.
#[cfg(not(target_arch = "wasm32"))]
pub trait TargetConditionalSendSync: Send + Sync {}

#[cfg(not(target_arch = "wasm32"))]
impl<S> TargetConditionalSendSync for S where S: ?Sized + Send + Sync {}

/// Marker trait that places no requirement on the implementing type when
/// compiling for `wasm32`.
///
/// A blanket implementation covers every type (sized or not), so this trait
/// never needs to be implemented by hand.
#[cfg(target_arch = "wasm32")]
pub trait TargetConditionalSendSync {}

#[cfg(target_arch = "wasm32")]
impl<S> TargetConditionalSendSync for S where S: ?Sized {}
39
40pub type HamtStream<'a, K, V> = Pin<Box<dyn Stream<Item = Result<(&'a K, &'a V)>> + 'a>>;
41
42/// Implementation of the HAMT data structure for IPLD.
43///
44/// # Examples
45///
46/// ```
47/// use noosphere_collections::hamt::Hamt;
48/// use noosphere_storage::MemoryStore;
49/// use tokio;
50///
51/// #[tokio::main(flavor = "multi_thread")]
52/// async fn main() {
53///     let store = MemoryStore::default();
54///
55///     let mut map: Hamt<_, _, usize> = Hamt::new(store);
56///     map.set(1, "a".to_string()).await.unwrap();
57///     assert_eq!(map.get(&1).await.unwrap(), Some(&"a".to_string()));
58///     assert_eq!(map.delete(&1).await.unwrap(), Some((1, "a".to_string())));
59///     assert_eq!(map.get::<_>(&1).await.unwrap(), None);
60///     let cid = map.flush().await.unwrap();
61/// }
62/// ```
63#[derive(Debug)]
64pub struct Hamt<BS, V, K = BytesKey, H = Sha256>
65where
66    K: TargetConditionalSendSync,
67    V: TargetConditionalSendSync,
68{
69    root: Node<K, V, H>,
70    store: BS,
71
72    bit_width: u32,
73    hash: PhantomData<H>,
74}
75
76impl<BS, V, K, H> Serialize for Hamt<BS, V, K, H>
77where
78    K: Serialize + TargetConditionalSendSync,
79    V: Serialize + TargetConditionalSendSync,
80    H: HashAlgorithm,
81{
82    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
83    where
84        S: Serializer,
85    {
86        self.root.serialize(serializer)
87    }
88}
89
90impl<
91        K: PartialEq + TargetConditionalSendSync,
92        V: PartialEq + TargetConditionalSendSync,
93        S: BlockStore,
94        H: HashAlgorithm,
95    > PartialEq for Hamt<S, V, K, H>
96{
97    fn eq(&self, other: &Self) -> bool {
98        self.root == other.root
99    }
100}
101
102impl<BS, V, K, H> Hamt<BS, V, K, H>
103where
104    K: Hash + Eq + PartialOrd + Serialize + DeserializeOwned + TargetConditionalSendSync,
105    V: Serialize + DeserializeOwned + TargetConditionalSendSync + PartialEq,
106    BS: BlockStore,
107    H: HashAlgorithm,
108{
109    pub fn new(store: BS) -> Self {
110        Self::new_with_bit_width(store, DEFAULT_BIT_WIDTH)
111    }
112
113    /// Construct hamt with a bit width
114    pub fn new_with_bit_width(store: BS, bit_width: u32) -> Self {
115        Self {
116            root: Node::default(),
117            store,
118            bit_width,
119            hash: Default::default(),
120        }
121    }
122
123    /// Lazily instantiate a hamt from this root Cid.
124    pub async fn load(cid: &Cid, store: BS) -> Result<Self> {
125        Self::load_with_bit_width(cid, store, DEFAULT_BIT_WIDTH).await
126    }
127
128    /// Lazily instantiate a hamt from this root Cid with a specified bit width.
129    pub async fn load_with_bit_width(cid: &Cid, store: BS, bit_width: u32) -> Result<Self> {
130        let root: Node<K, V, H> = store.load::<DagCborCodec, _>(cid).await?;
131        Ok(Self {
132            root,
133            store,
134            bit_width,
135            hash: Default::default(),
136        })
137    }
138
139    /// Sets the root based on the Cid of the root node using the Hamt store
140    pub async fn set_root(&mut self, cid: &Cid) -> Result<()> {
141        self.root = self.store.load::<DagCborCodec, _>(cid).await?;
142
143        Ok(())
144    }
145
146    /// Returns a reference to the underlying store of the Hamt.
147    pub fn store(&self) -> &BS {
148        &self.store
149    }
150
151    /// Inserts a key-value pair into the HAMT.
152    ///
153    /// If the HAMT did not have this key present, `None` is returned.
154    ///
155    /// If the HAMT did have this key present, the value is updated, and the old
156    /// value is returned. The key is not updated, though;
157    ///
158    /// # Examples
159    ///
160    /// ```
161    /// use noosphere_collections::hamt::Hamt;
162    /// use noosphere_storage::MemoryStore;
163    /// use tokio;
164    ///
165    /// #[tokio::main(flavor = "multi_thread")]
166    /// async fn main() {
167    ///     let store = MemoryStore::default();
168    ///
169    ///     let mut map: Hamt<_, _, usize> = Hamt::new(store);
170    ///     map.set(37, "a".to_string()).await.unwrap();
171    ///     assert_eq!(map.is_empty(), false);
172    ///
173    ///     map.set(37, "b".to_string()).await.unwrap();
174    ///     map.set(37, "c".to_string()).await.unwrap();
175    /// }
176    /// ```
177    pub async fn set(&mut self, key: K, value: V) -> Result<Option<V>> {
178        self.root
179            .set(key, value, &self.store, self.bit_width, true)
180            .await
181            .map(|(r, _)| r)
182    }
183
184    /// Inserts a key-value pair into the HAMT only if that key does not already exist.
185    ///
186    /// If the HAMT did not have this key present, `true` is returned and the key/value is added.
187    ///
188    /// If the HAMT did have this key present, this function will return false
189    ///
190    /// # Examples
191    ///
192    /// ```
193    /// use noosphere_collections::hamt::Hamt;
194    /// use noosphere_storage::MemoryStore;
195    /// use tokio;
196    ///
197    /// #[tokio::main(flavor = "multi_thread")]
198    /// async fn main() {
199    ///     let store = MemoryStore::default();
200    ///
201    ///     let mut map: Hamt<_, _, usize> = Hamt::new(store);
202    ///     let a = map.set_if_absent(37, "a".to_string()).await.unwrap();
203    ///     assert_eq!(map.is_empty(), false);
204    ///     assert_eq!(a, true);
205    ///
206    ///     let b = map.set_if_absent(37, "b".to_string()).await.unwrap();
207    ///     assert_eq!(b, false);
208    ///     assert_eq!(map.get(&37).await.unwrap(), Some(&"a".to_string()));
209    ///
210    ///     let c = map.set_if_absent(30, "c".to_string()).await.unwrap();
211    ///     assert_eq!(c, true);
212    /// }
213    /// ```
214    pub async fn set_if_absent(&mut self, key: K, value: V) -> Result<bool>
215    where
216        V: PartialEq,
217    {
218        self.root
219            .set(key, value, &self.store, self.bit_width, false)
220            .await
221            .map(|(_, set)| set)
222    }
223
224    /// Returns a reference to the value corresponding to the key.
225    ///
226    /// The key may be any borrowed form of the map's key type, but
227    /// `Hash` and `Eq` on the borrowed form *must* match those for
228    /// the key type.
229    ///
230    /// # Examples
231    ///
232    /// ```
233    /// use noosphere_collections::hamt::Hamt;
234    /// use noosphere_storage::MemoryStore;
235    /// use tokio;
236    ///
237    /// #[tokio::main(flavor = "multi_thread")]
238    /// async fn main() {
239    ///     let store = MemoryStore::default();
240    ///
241    ///     let mut map: Hamt<_, _, usize> = Hamt::new(store);
242    ///     map.set(1, "a".to_string()).await.unwrap();
243    ///     assert_eq!(map.get(&1).await.unwrap(), Some(&"a".to_string()));
244    ///     assert_eq!(map.get(&2).await.unwrap(), None);
245    /// }
246    /// ```
247    #[inline]
248    pub async fn get<Q: ?Sized>(&self, k: &Q) -> Result<Option<&V>>
249    where
250        K: Borrow<Q>,
251        Q: Hash + Eq + TargetConditionalSendSync,
252        V: DeserializeOwned,
253    {
254        match self.root.get(k, &self.store, self.bit_width).await? {
255            Some(v) => Ok(Some(v)),
256            None => Ok(None),
257        }
258    }
259
260    /// Returns `true` if a value exists for the given key in the HAMT.
261    ///
262    /// The key may be any borrowed form of the map's key type, but
263    /// `Hash` and `Eq` on the borrowed form *must* match those for
264    /// the key type.
265    ///
266    /// # Examples
267    ///
268    /// ```
269    /// use noosphere_collections::hamt::Hamt;
270    /// use noosphere_storage::MemoryStore;
271    /// use tokio;
272    ///
273    /// #[tokio::main(flavor = "multi_thread")]
274    /// async fn main() {
275    ///     let store = MemoryStore::default();
276    ///
277    ///     let mut map: Hamt<_, _, usize> = Hamt::new(store);
278    ///     map.set(1, "a".to_string()).await.unwrap();
279    ///     assert_eq!(map.contains_key(&1).await.unwrap(), true);
280    ///     assert_eq!(map.contains_key(&2).await.unwrap(), false);
281    /// }
282    /// ```
283    #[inline]
284    pub async fn contains_key<Q: ?Sized>(&self, k: &Q) -> Result<bool>
285    where
286        K: Borrow<Q>,
287        Q: Hash + Eq + TargetConditionalSendSync,
288    {
289        Ok(self
290            .root
291            .get(k, &self.store, self.bit_width)
292            .await?
293            .is_some())
294    }
295
296    /// Removes a key from the HAMT, returning the value at the key if the key
297    /// was previously in the HAMT.
298    ///
299    /// The key may be any borrowed form of the HAMT's key type, but
300    /// `Hash` and `Eq` on the borrowed form *must* match those for
301    /// the key type.
302    ///
303    /// # Examples
304    ///
305    /// ```
306    /// use noosphere_collections::hamt::Hamt;
307    /// use noosphere_storage::MemoryStore;
308    /// use tokio;
309    ///
310    /// #[tokio::main(flavor = "multi_thread")]
311    /// async fn main() {
312    ///     let store = MemoryStore::default();
313    ///
314    ///     let mut map: Hamt<_, _, usize> = Hamt::new(store);
315    ///     map.set(1, "a".to_string()).await.unwrap();
316    ///     assert_eq!(map.delete(&1).await.unwrap(), Some((1, "a".to_string())));
317    ///     assert_eq!(map.delete(&1).await.unwrap(), None);
318    /// }
319    /// ```
320    pub async fn delete<Q: ?Sized>(&mut self, k: &Q) -> Result<Option<(K, V)>>
321    where
322        K: Borrow<Q>,
323        Q: Hash + Eq + TargetConditionalSendSync,
324    {
325        self.root.remove_entry(k, &self.store, self.bit_width).await
326    }
327
328    /// Flush root and return Cid for hamt
329    pub async fn flush(&mut self) -> Result<Cid> {
330        self.root.flush(&mut self.store).await?;
331        self.store.save::<DagCborCodec, _>(&self.root).await
332    }
333
334    /// Returns true if the HAMT has no entries
335    pub fn is_empty(&self) -> bool {
336        self.root.is_empty()
337    }
338
339    /// Iterates over each KV in the Hamt and runs a function on the values.
340    ///
341    /// This function will constrain all values to be of the same type
342    ///
343    /// # Examples
344    ///
345    /// ```
346    /// use noosphere_collections::hamt::Hamt;
347    /// use noosphere_storage::MemoryStore;
348    /// use tokio;
349    ///
350    /// #[tokio::main(flavor = "multi_thread")]
351    /// async fn main() {
352    ///     let store = MemoryStore::default();
353    ///
354    ///     let mut map: Hamt<_, _, usize> = Hamt::new(store);
355    ///     map.set(1, 1).await.unwrap();
356    ///     map.set(4, 2).await.unwrap();
357    ///
358    ///     let mut total = 0;
359    ///     map.for_each(|_, v: &u64| {
360    ///         total += v;
361    ///         Ok(())
362    ///     }).await.unwrap();
363    ///     assert_eq!(total, 3);
364    /// }
365    /// ```
366    #[inline]
367    pub async fn for_each<F>(&self, mut f: F) -> Result<()>
368    where
369        V: DeserializeOwned,
370        F: FnMut(&K, &V) -> anyhow::Result<()>,
371    {
372        let mut stream = self.stream();
373
374        while let Some(Ok((key, value))) = stream.next().await {
375            f(key, value)?;
376        }
377
378        Ok(())
379    }
380
381    pub fn stream(&self) -> HamtStream<K, V> {
382        self.root.stream(&self.store)
383    }
384
385    pub fn into_stream(self) -> impl Stream<Item = Result<(K, V)>> {
386        self.root.into_stream(self.store)
387    }
388
389    /// Consumes this HAMT and returns the Blockstore it owns.
390    pub fn into_store(self) -> BS {
391        self.store
392    }
393}