noosphere_collections/hamt/hamt_implementation.rs
1// Adapted for Noosphere from https://github.com/filecoin-project/ref-fvm
2// Source copyright and license:
3// Copyright 2019-2022 ChainSafe Systems
4// SPDX-License-Identifier: Apache-2.0, MIT
5
6use anyhow::Result;
7use libipld_cbor::DagCborCodec;
8
9use noosphere_storage::BlockStore;
10use std::borrow::Borrow;
11use std::marker::PhantomData;
12use std::pin::Pin;
13use tokio_stream::{Stream, StreamExt};
14
15use cid::Cid;
16use forest_hash_utils::{BytesKey, Hash};
17use serde::de::DeserializeOwned;
18use serde::{Serialize, Serializer};
19
20use crate::hamt::node::Node;
21use crate::hamt::{HashAlgorithm, Sha256};
22
/// Bucket width limit for HAMT nodes.
///
/// NOTE(review): this constant is not referenced in this file. It is presumably
/// the maximum number of key/value pairs a node keeps in a single bucket before
/// the bucket is pushed down into a child node — confirm against `node.rs`.
pub const MAX_ARRAY_WIDTH: usize = 3;

/// Default bit width for indexing a hash at each depth level.
///
/// Used by [`Hamt::new`] and [`Hamt::load`] when no explicit width is given.
pub const DEFAULT_BIT_WIDTH: u32 = 8;
27
/// Marker bound that means `Send + Sync` on every target except `wasm32`.
///
/// On non-`wasm32` targets this trait has `Send + Sync` as supertraits and is
/// implemented for every type that satisfies them.
#[cfg(not(target_arch = "wasm32"))]
pub trait TargetConditionalSendSync: Send + Sync {}

// Blanket impl: any `Send + Sync` type qualifies on non-wasm targets.
#[cfg(not(target_arch = "wasm32"))]
impl<S> TargetConditionalSendSync for S where S: Send + Sync {}

/// Marker bound that means `Send + Sync` on every target except `wasm32`.
///
/// On `wasm32` this trait carries no supertraits and is implemented for every
/// type, so the same generic bounds compile without requiring `Send + Sync`.
#[cfg(target_arch = "wasm32")]
pub trait TargetConditionalSendSync {}

// Blanket impl: every type qualifies on wasm32.
#[cfg(target_arch = "wasm32")]
impl<S> TargetConditionalSendSync for S {}
39
40pub type HamtStream<'a, K, V> = Pin<Box<dyn Stream<Item = Result<(&'a K, &'a V)>> + 'a>>;
41
42/// Implementation of the HAMT data structure for IPLD.
43///
44/// # Examples
45///
46/// ```
47/// use noosphere_collections::hamt::Hamt;
48/// use noosphere_storage::MemoryStore;
49/// use tokio;
50///
51/// #[tokio::main(flavor = "multi_thread")]
52/// async fn main() {
53/// let store = MemoryStore::default();
54///
55/// let mut map: Hamt<_, _, usize> = Hamt::new(store);
56/// map.set(1, "a".to_string()).await.unwrap();
57/// assert_eq!(map.get(&1).await.unwrap(), Some(&"a".to_string()));
58/// assert_eq!(map.delete(&1).await.unwrap(), Some((1, "a".to_string())));
59/// assert_eq!(map.get::<_>(&1).await.unwrap(), None);
60/// let cid = map.flush().await.unwrap();
61/// }
62/// ```
63#[derive(Debug)]
64pub struct Hamt<BS, V, K = BytesKey, H = Sha256>
65where
66 K: TargetConditionalSendSync,
67 V: TargetConditionalSendSync,
68{
69 root: Node<K, V, H>,
70 store: BS,
71
72 bit_width: u32,
73 hash: PhantomData<H>,
74}
75
76impl<BS, V, K, H> Serialize for Hamt<BS, V, K, H>
77where
78 K: Serialize + TargetConditionalSendSync,
79 V: Serialize + TargetConditionalSendSync,
80 H: HashAlgorithm,
81{
82 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
83 where
84 S: Serializer,
85 {
86 self.root.serialize(serializer)
87 }
88}
89
90impl<
91 K: PartialEq + TargetConditionalSendSync,
92 V: PartialEq + TargetConditionalSendSync,
93 S: BlockStore,
94 H: HashAlgorithm,
95 > PartialEq for Hamt<S, V, K, H>
96{
97 fn eq(&self, other: &Self) -> bool {
98 self.root == other.root
99 }
100}
101
102impl<BS, V, K, H> Hamt<BS, V, K, H>
103where
104 K: Hash + Eq + PartialOrd + Serialize + DeserializeOwned + TargetConditionalSendSync,
105 V: Serialize + DeserializeOwned + TargetConditionalSendSync + PartialEq,
106 BS: BlockStore,
107 H: HashAlgorithm,
108{
109 pub fn new(store: BS) -> Self {
110 Self::new_with_bit_width(store, DEFAULT_BIT_WIDTH)
111 }
112
113 /// Construct hamt with a bit width
114 pub fn new_with_bit_width(store: BS, bit_width: u32) -> Self {
115 Self {
116 root: Node::default(),
117 store,
118 bit_width,
119 hash: Default::default(),
120 }
121 }
122
123 /// Lazily instantiate a hamt from this root Cid.
124 pub async fn load(cid: &Cid, store: BS) -> Result<Self> {
125 Self::load_with_bit_width(cid, store, DEFAULT_BIT_WIDTH).await
126 }
127
128 /// Lazily instantiate a hamt from this root Cid with a specified bit width.
129 pub async fn load_with_bit_width(cid: &Cid, store: BS, bit_width: u32) -> Result<Self> {
130 let root: Node<K, V, H> = store.load::<DagCborCodec, _>(cid).await?;
131 Ok(Self {
132 root,
133 store,
134 bit_width,
135 hash: Default::default(),
136 })
137 }
138
139 /// Sets the root based on the Cid of the root node using the Hamt store
140 pub async fn set_root(&mut self, cid: &Cid) -> Result<()> {
141 self.root = self.store.load::<DagCborCodec, _>(cid).await?;
142
143 Ok(())
144 }
145
146 /// Returns a reference to the underlying store of the Hamt.
147 pub fn store(&self) -> &BS {
148 &self.store
149 }
150
151 /// Inserts a key-value pair into the HAMT.
152 ///
153 /// If the HAMT did not have this key present, `None` is returned.
154 ///
155 /// If the HAMT did have this key present, the value is updated, and the old
156 /// value is returned. The key is not updated, though;
157 ///
158 /// # Examples
159 ///
160 /// ```
161 /// use noosphere_collections::hamt::Hamt;
162 /// use noosphere_storage::MemoryStore;
163 /// use tokio;
164 ///
165 /// #[tokio::main(flavor = "multi_thread")]
166 /// async fn main() {
167 /// let store = MemoryStore::default();
168 ///
169 /// let mut map: Hamt<_, _, usize> = Hamt::new(store);
170 /// map.set(37, "a".to_string()).await.unwrap();
171 /// assert_eq!(map.is_empty(), false);
172 ///
173 /// map.set(37, "b".to_string()).await.unwrap();
174 /// map.set(37, "c".to_string()).await.unwrap();
175 /// }
176 /// ```
177 pub async fn set(&mut self, key: K, value: V) -> Result<Option<V>> {
178 self.root
179 .set(key, value, &self.store, self.bit_width, true)
180 .await
181 .map(|(r, _)| r)
182 }
183
184 /// Inserts a key-value pair into the HAMT only if that key does not already exist.
185 ///
186 /// If the HAMT did not have this key present, `true` is returned and the key/value is added.
187 ///
188 /// If the HAMT did have this key present, this function will return false
189 ///
190 /// # Examples
191 ///
192 /// ```
193 /// use noosphere_collections::hamt::Hamt;
194 /// use noosphere_storage::MemoryStore;
195 /// use tokio;
196 ///
197 /// #[tokio::main(flavor = "multi_thread")]
198 /// async fn main() {
199 /// let store = MemoryStore::default();
200 ///
201 /// let mut map: Hamt<_, _, usize> = Hamt::new(store);
202 /// let a = map.set_if_absent(37, "a".to_string()).await.unwrap();
203 /// assert_eq!(map.is_empty(), false);
204 /// assert_eq!(a, true);
205 ///
206 /// let b = map.set_if_absent(37, "b".to_string()).await.unwrap();
207 /// assert_eq!(b, false);
208 /// assert_eq!(map.get(&37).await.unwrap(), Some(&"a".to_string()));
209 ///
210 /// let c = map.set_if_absent(30, "c".to_string()).await.unwrap();
211 /// assert_eq!(c, true);
212 /// }
213 /// ```
214 pub async fn set_if_absent(&mut self, key: K, value: V) -> Result<bool>
215 where
216 V: PartialEq,
217 {
218 self.root
219 .set(key, value, &self.store, self.bit_width, false)
220 .await
221 .map(|(_, set)| set)
222 }
223
224 /// Returns a reference to the value corresponding to the key.
225 ///
226 /// The key may be any borrowed form of the map's key type, but
227 /// `Hash` and `Eq` on the borrowed form *must* match those for
228 /// the key type.
229 ///
230 /// # Examples
231 ///
232 /// ```
233 /// use noosphere_collections::hamt::Hamt;
234 /// use noosphere_storage::MemoryStore;
235 /// use tokio;
236 ///
237 /// #[tokio::main(flavor = "multi_thread")]
238 /// async fn main() {
239 /// let store = MemoryStore::default();
240 ///
241 /// let mut map: Hamt<_, _, usize> = Hamt::new(store);
242 /// map.set(1, "a".to_string()).await.unwrap();
243 /// assert_eq!(map.get(&1).await.unwrap(), Some(&"a".to_string()));
244 /// assert_eq!(map.get(&2).await.unwrap(), None);
245 /// }
246 /// ```
247 #[inline]
248 pub async fn get<Q: ?Sized>(&self, k: &Q) -> Result<Option<&V>>
249 where
250 K: Borrow<Q>,
251 Q: Hash + Eq + TargetConditionalSendSync,
252 V: DeserializeOwned,
253 {
254 match self.root.get(k, &self.store, self.bit_width).await? {
255 Some(v) => Ok(Some(v)),
256 None => Ok(None),
257 }
258 }
259
260 /// Returns `true` if a value exists for the given key in the HAMT.
261 ///
262 /// The key may be any borrowed form of the map's key type, but
263 /// `Hash` and `Eq` on the borrowed form *must* match those for
264 /// the key type.
265 ///
266 /// # Examples
267 ///
268 /// ```
269 /// use noosphere_collections::hamt::Hamt;
270 /// use noosphere_storage::MemoryStore;
271 /// use tokio;
272 ///
273 /// #[tokio::main(flavor = "multi_thread")]
274 /// async fn main() {
275 /// let store = MemoryStore::default();
276 ///
277 /// let mut map: Hamt<_, _, usize> = Hamt::new(store);
278 /// map.set(1, "a".to_string()).await.unwrap();
279 /// assert_eq!(map.contains_key(&1).await.unwrap(), true);
280 /// assert_eq!(map.contains_key(&2).await.unwrap(), false);
281 /// }
282 /// ```
283 #[inline]
284 pub async fn contains_key<Q: ?Sized>(&self, k: &Q) -> Result<bool>
285 where
286 K: Borrow<Q>,
287 Q: Hash + Eq + TargetConditionalSendSync,
288 {
289 Ok(self
290 .root
291 .get(k, &self.store, self.bit_width)
292 .await?
293 .is_some())
294 }
295
296 /// Removes a key from the HAMT, returning the value at the key if the key
297 /// was previously in the HAMT.
298 ///
299 /// The key may be any borrowed form of the HAMT's key type, but
300 /// `Hash` and `Eq` on the borrowed form *must* match those for
301 /// the key type.
302 ///
303 /// # Examples
304 ///
305 /// ```
306 /// use noosphere_collections::hamt::Hamt;
307 /// use noosphere_storage::MemoryStore;
308 /// use tokio;
309 ///
310 /// #[tokio::main(flavor = "multi_thread")]
311 /// async fn main() {
312 /// let store = MemoryStore::default();
313 ///
314 /// let mut map: Hamt<_, _, usize> = Hamt::new(store);
315 /// map.set(1, "a".to_string()).await.unwrap();
316 /// assert_eq!(map.delete(&1).await.unwrap(), Some((1, "a".to_string())));
317 /// assert_eq!(map.delete(&1).await.unwrap(), None);
318 /// }
319 /// ```
320 pub async fn delete<Q: ?Sized>(&mut self, k: &Q) -> Result<Option<(K, V)>>
321 where
322 K: Borrow<Q>,
323 Q: Hash + Eq + TargetConditionalSendSync,
324 {
325 self.root.remove_entry(k, &self.store, self.bit_width).await
326 }
327
328 /// Flush root and return Cid for hamt
329 pub async fn flush(&mut self) -> Result<Cid> {
330 self.root.flush(&mut self.store).await?;
331 self.store.save::<DagCborCodec, _>(&self.root).await
332 }
333
334 /// Returns true if the HAMT has no entries
335 pub fn is_empty(&self) -> bool {
336 self.root.is_empty()
337 }
338
339 /// Iterates over each KV in the Hamt and runs a function on the values.
340 ///
341 /// This function will constrain all values to be of the same type
342 ///
343 /// # Examples
344 ///
345 /// ```
346 /// use noosphere_collections::hamt::Hamt;
347 /// use noosphere_storage::MemoryStore;
348 /// use tokio;
349 ///
350 /// #[tokio::main(flavor = "multi_thread")]
351 /// async fn main() {
352 /// let store = MemoryStore::default();
353 ///
354 /// let mut map: Hamt<_, _, usize> = Hamt::new(store);
355 /// map.set(1, 1).await.unwrap();
356 /// map.set(4, 2).await.unwrap();
357 ///
358 /// let mut total = 0;
359 /// map.for_each(|_, v: &u64| {
360 /// total += v;
361 /// Ok(())
362 /// }).await.unwrap();
363 /// assert_eq!(total, 3);
364 /// }
365 /// ```
366 #[inline]
367 pub async fn for_each<F>(&self, mut f: F) -> Result<()>
368 where
369 V: DeserializeOwned,
370 F: FnMut(&K, &V) -> anyhow::Result<()>,
371 {
372 let mut stream = self.stream();
373
374 while let Some(Ok((key, value))) = stream.next().await {
375 f(key, value)?;
376 }
377
378 Ok(())
379 }
380
381 pub fn stream(&self) -> HamtStream<K, V> {
382 self.root.stream(&self.store)
383 }
384
385 pub fn into_stream(self) -> impl Stream<Item = Result<(K, V)>> {
386 self.root.into_stream(self.store)
387 }
388
389 /// Consumes this HAMT and returns the Blockstore it owns.
390 pub fn into_store(self) -> BS {
391 self.store
392 }
393}