atrium_repo/
mst.rs

1use std::{cmp::Ordering, collections::HashSet, convert::Infallible};
2
3use algos::FindPathResult;
4use async_stream::try_stream;
5use futures::{Stream, StreamExt};
6use ipld_core::cid::Cid;
7use serde::{Deserialize, Serialize};
8use sha2::Digest;
9
10use crate::blockstore::{AsyncBlockStoreRead, AsyncBlockStoreWrite, DAG_CBOR, SHA2_256};
11
mod schema {
    use super::*;

    /// The [IPLD schema] for an MST node.
    ///
    /// This is the serialized form of a node; field names are shortened on the wire
    /// through the `serde(rename = ...)` attributes below.
    ///
    /// [IPLD schema]: https://atproto.com/specs/repository#mst-structure
    #[derive(Deserialize, Serialize, Clone, PartialEq)]
    pub struct Node {
        /// ("left", CID link, nullable): link to sub-tree [`Node`] on a lower level and with
        /// all keys sorting before keys at this node.
        #[serde(rename = "l")]
        pub left: Option<Cid>,

        /// ("entries", array of objects, required): ordered list of [`TreeEntry`] objects.
        #[serde(rename = "e")]
        pub entries: Vec<TreeEntry>,
    }

    /// A single serialized entry of an MST [`Node`].
    #[derive(Deserialize, Serialize, Clone, PartialEq)]
    pub struct TreeEntry {
        /// ("prefixlen", integer, required): count of bytes shared with previous [`TreeEntry`]
        /// in this [`Node`] (if any).
        #[serde(rename = "p")]
        pub prefix_len: usize,

        /// ("keysuffix", byte array, required): remainder of key for this [`TreeEntry`],
        /// after the first "prefixlen" bytes have been removed.
        ///
        /// This field is (de)serialized through `serde_bytes` instead of serde's default
        /// handling of `Vec<u8>`, because serde maps the latter to CBOR Major Type 4
        /// (array of data items) instead of Major Type 2 (byte string).
        #[serde(rename = "k", with = "serde_bytes")]
        pub key_suffix: Vec<u8>,

        /// ("value", CID Link, required): link to the record data (CBOR) for this entry.
        #[serde(rename = "v")]
        pub value: Cid,

        /// ("tree", CID Link, nullable): link to a sub-tree [`Node`] at a lower level which
        /// has keys sorting after this [`TreeEntry`]'s key (to the "right"), but before the
        /// next [`TreeEntry`]'s key in this [`Node`] (if any).
        #[serde(rename = "t")]
        pub tree: Option<Cid>,
    }
}
58
59/// Merkle search tree helper algorithms.
60mod algos {
61    use super::*;
62
    /// Decision returned by the visitor closure passed to [`traverse`] for each node.
    ///
    /// `R` is the final result of the traversal; `M` is per-step metadata that
    /// [`traverse`] stores next to the visited node in the returned path.
    pub enum TraverseAction<R, M> {
        /// Continue traversal into the specified `Cid`, recording `M` alongside the
        /// node that was just visited.
        Continue((Cid, M)),
        /// Stop traversal and return `R`.
        Stop(R),
    }
69
    /// Outcome of a key lookup performed with [`traverse_find_path`].
    pub enum FindPathResult {
        /// The key was found
        Found {
            /// The containing MST node
            node: Cid,
            /// The value's [Cid] (i.e. the `value` of the matching leaf entry)
            path: Cid,
        },
        /// The key was not found
        NotFound {
            /// The MST node at which the search stopped
            node: Cid,
        },
    }
84
85    /// Compute the depth of the specified node.
86    ///
87    /// If both the node and its nested subtrees do not contain leaves, this will return `None`.
88    pub async fn compute_depth(
89        mut bs: impl AsyncBlockStoreRead,
90        node: Cid,
91    ) -> Result<Option<usize>, Error> {
92        // Recursively iterate through the tree until we encounter a leaf node, and then
93        // use that to calculate the depth of the entire tree.
94        let mut subtrees = vec![(node, 0usize)];
95
96        loop {
97            if let Some((subtree, depth)) = subtrees.pop() {
98                let node = Node::read_from(&mut bs, subtree).await?;
99                if let Some(layer) = node.layer() {
100                    return Ok(Some(depth + layer));
101                }
102
103                subtrees.extend(node.trees().cloned().zip(std::iter::repeat(depth + 1)));
104            } else {
105                return Ok(None);
106            }
107        }
108    }
109
110    /// Traverse a merkle search tree.
111    ///
112    /// This executes the closure provided in `f` and takes the action
113    /// returned by the closure.
114    /// This also keeps track of "seen" nodes, and if a node is seen twice, traversal
115    /// is immediately halted and an error is returned.
116    pub async fn traverse<R, M>(
117        mut bs: impl AsyncBlockStoreRead,
118        root: Cid,
119        mut f: impl FnMut(Node, Cid) -> Result<TraverseAction<R, M>, Error>,
120    ) -> Result<(Vec<(Node, M)>, R), Error> {
121        let mut node_cid = root;
122        let mut node_path = vec![];
123        let mut seen = HashSet::new();
124
125        loop {
126            let node = Node::read_from(&mut bs, node_cid).await?;
127            if !seen.insert(node_cid) {
128                // This CID was already seen. There is a cycle in the graph.
129                panic!();
130            }
131
132            match f(node.clone(), node_cid)? {
133                TraverseAction::Continue((cid, meta)) => {
134                    node_path.push((node, meta));
135                    node_cid = cid;
136                }
137                TraverseAction::Stop(r) => {
138                    return Ok((node_path, r));
139                }
140            }
141        }
142    }
143
144    /// Traverse through the tree, finding the node that contains a key.
145    pub fn traverse_find(
146        key: &str,
147    ) -> impl FnMut(Node, Cid) -> Result<TraverseAction<(Node, usize), usize>, Error> + '_ {
148        move |node, _cid| -> Result<_, Error> {
149            if let Some(index) = node.find_ge(key) {
150                if let Some(NodeEntry::Leaf(e)) = node.entries.get(index) {
151                    if e.key == key {
152                        return Ok(TraverseAction::Stop((node, index)));
153                    }
154                }
155
156                // Check if the left neighbor is a tree, and if so, recurse into it.
157                if let Some(index) = index.checked_sub(1) {
158                    if let Some(subtree) = node.entries.get(index).unwrap().tree() {
159                        Ok(TraverseAction::Continue((*subtree, index)))
160                    } else {
161                        Err(Error::KeyNotFound)
162                    }
163                } else {
164                    // There is no left neighbor. The key is not present.
165                    Err(Error::KeyNotFound)
166                }
167            } else {
168                // We've recursed into an empty node, so the key is not present in the tree.
169                Err(Error::KeyNotFound)
170            }
171        }
172    }
173
174    /// Traverse through the tree, finding the node that contains a key. This will record
175    /// the CIDs of all nodes traversed.
176    pub fn traverse_find_path(
177        key: &str,
178    ) -> impl FnMut(Node, Cid) -> Result<TraverseAction<FindPathResult, Cid>, Error> + '_ {
179        move |node, cid| -> Result<_, Error> {
180            if let Some(index) = node.find_ge(key) {
181                if let Some(NodeEntry::Leaf(e)) = node.entries.get(index) {
182                    if e.key == key {
183                        return Ok(TraverseAction::Stop(FindPathResult::Found {
184                            node: cid,
185                            path: e.value,
186                        }));
187                    }
188                }
189
190                // Check if the left neighbor is a tree, and if so, recurse into it.
191                if let Some(index) = index.checked_sub(1) {
192                    if let Some(subtree) = node.entries.get(index).unwrap().tree() {
193                        Ok(TraverseAction::Continue((*subtree, cid)))
194                    } else {
195                        Ok(TraverseAction::Stop(FindPathResult::NotFound { node: cid }))
196                    }
197                } else {
198                    // There is no left neighbor. The key is not present.
199                    Ok(TraverseAction::Stop(FindPathResult::NotFound { node: cid }))
200                }
201            } else {
202                // We've recursed into an empty node, so the key is not present in the tree.
203                Ok(TraverseAction::Stop(FindPathResult::NotFound { node: cid }))
204            }
205        }
206    }
207
208    /// Traverse through the tree, finding the first node that consists of more than just a single
209    /// nested tree entry.
210    pub fn traverse_prune() -> impl FnMut(Node, Cid) -> Result<TraverseAction<Cid, usize>, Error> {
211        move |node, cid| -> Result<_, Error> {
212            if node.entries.len() == 1 {
213                if let Some(NodeEntry::Tree(cid)) = node.entries.first() {
214                    return Ok(TraverseAction::Continue((*cid, 0)));
215                }
216            }
217
218            Ok(TraverseAction::Stop(cid))
219        }
220    }
221
222    /// Recursively merge two subtrees into one.
223    pub async fn merge_subtrees(
224        mut bs: impl AsyncBlockStoreRead + AsyncBlockStoreWrite,
225        mut lc: Cid,
226        mut rc: Cid,
227    ) -> Result<Cid, Error> {
228        let mut node_path = vec![];
229
230        let (ln, rn) = loop {
231            // Traverse down both the left and right trees until we reach the first leaf node on either side.
232            let ln = Node::read_from(&mut bs, lc).await?;
233            let rn = Node::read_from(&mut bs, rc).await?;
234
235            if let (Some(NodeEntry::Tree(l)), Some(NodeEntry::Tree(r))) =
236                (ln.entries.last(), rn.entries.first())
237            {
238                node_path.push((ln.clone(), rn.clone()));
239
240                lc = *l;
241                rc = *r;
242            } else {
243                break (ln, rn);
244            }
245        };
246
247        // Merge the two nodes.
248        let node = Node { entries: ln.entries.into_iter().chain(rn.entries).collect() };
249        let mut cid = node.serialize_into(&mut bs).await?;
250
251        // Now go back up the node path chain and update parent entries.
252        for (ln, rn) in node_path.into_iter().rev() {
253            let node = Node {
254                entries: ln.entries[..ln.entries.len() - 1]
255                    .iter()
256                    .cloned()
257                    .chain([NodeEntry::Tree(cid)])
258                    .chain(rn.entries[1..].iter().cloned())
259                    .collect(),
260            };
261
262            cid = node.serialize_into(&mut bs).await?;
263        }
264
265        Ok(cid)
266    }
267
268    /// Recursively split a node based on a key.
269    ///
270    /// If the key is found within the subtree, this will return an error.
271    pub async fn split_subtree(
272        mut bs: impl AsyncBlockStoreRead + AsyncBlockStoreWrite,
273        node: Cid,
274        key: &str,
275    ) -> Result<(Option<Cid>, Option<Cid>), Error> {
276        let (node_path, (mut left, mut right)) = traverse(&mut bs, node, |mut node, _cid| {
277            if let Some(partition) = node.find_ge(key) {
278                // Ensure that the key does not already exist.
279                if let Some(NodeEntry::Leaf(e)) = node.entries.get(partition) {
280                    if e.key == key {
281                        return Err(Error::KeyAlreadyExists);
282                    }
283                }
284
285                // Determine if the left neighbor is a subtree. If so, we need to recursively split that tree.
286                if let Some(partition) = partition.checked_sub(1) {
287                    match node.entries.get(partition) {
288                        Some(NodeEntry::Leaf(_e)) => {
289                            // Left neighbor is a leaf, so we can split the current node into two and we are done.
290                            let right = node.entries.split_off(partition + 1);
291
292                            Ok(TraverseAction::Stop((
293                                Some(node),
294                                (!right.is_empty()).then_some(Node { entries: right }),
295                            )))
296                        }
297                        Some(NodeEntry::Tree(e)) => Ok(TraverseAction::Continue((*e, partition))),
298                        // This should not happen; node.find_ge() should return `None` in this case.
299                        None => panic!(),
300                    }
301                } else {
302                    Ok(TraverseAction::Stop((None, Some(node))))
303                }
304            } else {
305                todo!()
306            }
307        })
308        .await?;
309
310        // If the node was split into two, walk back up the path chain and split all parents.
311        for (mut parent, i) in node_path.into_iter().rev() {
312            // Remove the tree entry at the partition point.
313            parent.entries.remove(i);
314            let (e_left, e_right) = parent.entries.split_at(i);
315
316            if let Some(left) = left.as_mut() {
317                let left_cid = left.serialize_into(&mut bs).await?;
318                *left = Node {
319                    entries: e_left.iter().cloned().chain([NodeEntry::Tree(left_cid)]).collect(),
320                };
321            }
322
323            if let Some(right) = right.as_mut() {
324                let right_cid = right.serialize_into(&mut bs).await?;
325                *right = Node {
326                    entries: [NodeEntry::Tree(right_cid)]
327                        .into_iter()
328                        .chain(e_right.iter().cloned())
329                        .collect::<Vec<_>>(),
330                };
331            }
332        }
333
334        // Serialize the two new subtrees.
335        let left =
336            if let Some(left) = left { Some(left.serialize_into(&mut bs).await?) } else { None };
337        let right =
338            if let Some(right) = right { Some(right.serialize_into(&mut bs).await?) } else { None };
339
340        Ok((left, right))
341    }
342
343    /// Prune entries that contain a single nested tree entry from the root.
344    pub async fn prune(
345        mut bs: impl AsyncBlockStoreRead + AsyncBlockStoreWrite,
346        root: Cid,
347    ) -> Result<Cid, Error> {
348        let (_node_path, cid) = algos::traverse(&mut bs, root, algos::traverse_prune()).await?;
349        Ok(cid)
350    }
351
    /// Insert `key` with `value` into the tree rooted at `root`.
    ///
    /// The layer the key belongs to is derived from `leading_zeroes(key)`. Depending on
    /// how that layer relates to the current depth of the tree, the tree is either
    /// wrapped in new parent nodes, searched for the node on the target layer, or
    /// extended downwards with new nodes. Every modified node is written to `bs`.
    ///
    /// Returns the CID of the new root node.
    ///
    /// # Errors
    ///
    /// Returns [`Error::KeyAlreadyExists`] if the key is already present, and propagates
    /// errors from reading or writing nodes.
    ///
    /// # Panics
    ///
    /// Panics if `find_ge` yields `None` for a node visited while descending
    /// (presumably only for an empty node — confirm against `Node::find_ge`).
    pub async fn add(
        mut bs: impl AsyncBlockStoreRead + AsyncBlockStoreWrite,
        root: Cid,
        key: &str,
        value: Cid,
    ) -> Result<Cid, Error> {
        // Compute the layer where this node should be added.
        let target_layer = leading_zeroes(key.as_bytes());

        // Now traverse to the node containing the target layer.
        // `node_path` collects (parent, index of the child link) pairs, root first.
        let mut node_path = vec![];
        let mut node_cid = root;

        // There are three cases we need to handle:
        // 1) The target layer is above the tree (and our entire tree needs to be pushed down).
        // 2) The target layer is contained within the tree (and we will traverse to find it).
        // 3) The tree is currently empty (trivial).
        let mut node = match compute_depth(&mut bs, root).await {
            Ok(Some(layer)) => {
                match layer.cmp(&target_layer) {
                    // The new key can be inserted into the root node.
                    Ordering::Equal => Node::read_from(&mut bs, node_cid).await?,
                    // The entire tree needs to be shifted down.
                    Ordering::Less => {
                        let mut layer = layer + 1;

                        // Wrap the current root in single-entry nodes, one per layer. Only
                        // the intermediate wrappers are serialized here; the topmost one is
                        // returned so the key can be inserted into it below.
                        loop {
                            let node = Node { entries: vec![NodeEntry::Tree(node_cid)] };

                            if layer < target_layer {
                                node_cid = node.serialize_into(&mut bs).await?;
                                layer += 1;
                            } else {
                                break node;
                            }
                        }
                    }
                    // Search in a subtree (most common).
                    Ordering::Greater => {
                        let mut layer = layer;

                        // Traverse to the lowest possible layer in the tree.
                        let (path, (mut node, partition)) =
                            algos::traverse(&mut bs, node_cid, |node, _cid| {
                                if layer == target_layer {
                                    Ok(algos::TraverseAction::Stop((node, 0)))
                                } else {
                                    let partition = node.find_ge(key).unwrap();

                                    // If left neighbor is a subtree, recurse through.
                                    if let Some(partition) = partition.checked_sub(1) {
                                        if let Some(subtree) =
                                            node.entries.get(partition).unwrap().tree()
                                        {
                                            layer -= 1;
                                            return Ok(algos::TraverseAction::Continue((
                                                *subtree, partition,
                                            )));
                                        }
                                    }

                                    Ok(algos::TraverseAction::Stop((node, partition)))
                                }
                            })
                            .await?;

                        node_path = path;
                        if layer == target_layer {
                            // A pre-existing node was found on the same layer.
                            node
                        } else {
                            // Insert a new dummy tree entry and push the last node onto the node path.
                            // The placeholder CID is overwritten when the path is walked back up.
                            node.entries.insert(partition, NodeEntry::Tree(Cid::default()));
                            node_path.push((node, partition));
                            layer -= 1;

                            // Insert empty nodes until we reach the target layer.
                            while layer != target_layer {
                                let node = Node { entries: vec![NodeEntry::Tree(Cid::default())] };

                                node_path.push((node.clone(), 0));
                                layer -= 1;
                            }

                            // Insert the new leaf node.
                            Node { entries: vec![] }
                        }
                    }
                }
            }
            Ok(None) => {
                // The tree is currently empty.
                Node { entries: vec![] }
            }
            Err(e) => return Err(e),
        };

        if let Some(partition) = node.find_ge(key) {
            // Check if the key is already present in the node.
            if let Some(NodeEntry::Leaf(e)) = node.entries.get(partition) {
                if e.key == key {
                    return Err(Error::KeyAlreadyExists);
                }
            }

            if let Some(partition) = partition.checked_sub(1) {
                match node.entries.get(partition) {
                    Some(NodeEntry::Leaf(_)) => {
                        // Left neighbor is a leaf, so we can simply insert this leaf to its right.
                        node.entries.insert(
                            partition + 1,
                            NodeEntry::Leaf(TreeEntry { key: key.to_string(), value }),
                        );
                    }
                    Some(NodeEntry::Tree(e)) => {
                        // Need to split the subtree into two based on the node's key.
                        let (left, right) = algos::split_subtree(&mut bs, *e, key).await?;

                        // Insert the new node in between the two subtrees.
                        let right_subvec = node.entries.split_off(partition + 1);

                        // Drop the link to the old (unsplit) subtree.
                        node.entries.pop();
                        if let Some(left) = left {
                            node.entries.push(NodeEntry::Tree(left));
                        }
                        node.entries
                            .extend([NodeEntry::Leaf(TreeEntry { key: key.to_string(), value })]);
                        if let Some(right) = right {
                            node.entries.push(NodeEntry::Tree(right));
                        }
                        node.entries.extend(right_subvec.into_iter());
                    }
                    // Should be impossible. The node is empty in this case, and that is handled below.
                    None => unreachable!(),
                }
            } else {
                // Key is already located at leftmost position, so we can simply prepend the new node.
                node.entries.insert(0, NodeEntry::Leaf(TreeEntry { key: key.to_string(), value }));
            }
        } else {
            // The node is empty! Just append the new key to this node's entries.
            node.entries.push(NodeEntry::Leaf(TreeEntry { key: key.to_string(), value }));
        }

        let mut cid = node.serialize_into(&mut bs).await?;

        // Now walk back up the node path chain and update parent entries to point to the new node's CID.
        for (mut parent, i) in node_path.into_iter().rev() {
            parent.entries[i] = NodeEntry::Tree(cid);
            cid = parent.serialize_into(&mut bs).await?;
        }

        Ok(cid)
    }
506
507    pub async fn update(
508        mut bs: impl AsyncBlockStoreRead + AsyncBlockStoreWrite,
509        root: Cid,
510        key: &str,
511        value: Cid,
512    ) -> Result<Cid, Error> {
513        let (node_path, (mut node, index)) =
514            algos::traverse(&mut bs, root, algos::traverse_find(key)).await?;
515
516        // Update the value.
517        node.entries[index] = NodeEntry::Leaf(TreeEntry { key: key.to_string(), value });
518
519        let mut cid = node.serialize_into(&mut bs).await?;
520
521        // Now walk up the node path chain and update parent entries to point to the new node's CID.
522        for (mut parent, i) in node_path.into_iter().rev() {
523            parent.entries[i] = NodeEntry::Tree(cid);
524            cid = parent.serialize_into(&mut bs).await?;
525        }
526
527        Ok(cid)
528    }
529
530    pub async fn delete(
531        mut bs: impl AsyncBlockStoreRead + AsyncBlockStoreWrite,
532        root: Cid,
533        key: &str,
534    ) -> Result<Cid, Error> {
535        let (node_path, (mut node, index)) =
536            algos::traverse(&mut bs, root, algos::traverse_find(key)).await?;
537
538        // Remove the key.
539        node.entries.remove(index);
540
541        if let Some(index) = index.checked_sub(1) {
542            // Check to see if the left and right neighbors are both trees. If so, merge them.
543            if let (Some(NodeEntry::Tree(lc)), Some(NodeEntry::Tree(rc))) =
544                (node.entries.get(index), node.entries.get(index + 1))
545            {
546                let cid = algos::merge_subtrees(&mut bs, *lc, *rc).await?;
547                node.entries[index] = NodeEntry::Tree(cid);
548                node.entries.remove(index + 1);
549            }
550        }
551
552        // Option-alize the node depending on whether or not it is empty.
553        let node = (!node.entries.is_empty()).then_some(node);
554
555        let mut cid =
556            if let Some(node) = node { Some(node.serialize_into(&mut bs).await?) } else { None };
557
558        // Now walk back up the node path chain and update parent entries to point to the new node's CID.
559        for (mut parent, i) in node_path.into_iter().rev() {
560            if let Some(cid) = cid.as_mut() {
561                parent.entries[i] = NodeEntry::Tree(*cid);
562                *cid = parent.serialize_into(&mut bs).await?;
563            } else {
564                // The node ended up becoming empty, so it will be orphaned.
565                // Note that we can safely delete this entry from the parent because it's guaranteed that
566                // two trees will never be adjacent (and thus no merging is required).
567                parent.entries.remove(i);
568
569                // If the parent also becomes empty, orphan it.
570                cid = if parent.entries.is_empty() {
571                    None
572                } else {
573                    Some(parent.serialize_into(&mut bs).await?)
574                };
575            }
576        }
577
578        let cid = if let Some(cid) = cid {
579            cid
580        } else {
581            // The tree is now empty. Create a new empty node.
582            let node = Node { entries: vec![] };
583            node.serialize_into(&mut bs).await?
584        };
585
586        let cid = prune(&mut bs, cid).await?;
587        Ok(cid)
588    }
589}
590
591// https://users.rust-lang.org/t/how-to-find-common-prefix-of-two-byte-slices-effectively/25815/3
592fn prefix(xs: &[u8], ys: &[u8]) -> usize {
593    prefix_chunks::<128>(xs, ys)
594}
595
/// Return the length (in bytes) of the common prefix of `xs` and `ys`, comparing
/// `N` bytes at a time for as long as whole chunks match.
fn prefix_chunks<const N: usize>(xs: &[u8], ys: &[u8]) -> usize {
    // N.B: We take exact chunks here to entice the compiler to autovectorize this loop.
    let matching_chunks =
        xs.chunks_exact(N).zip(ys.chunks_exact(N)).take_while(|(a, b)| a == b).count();
    let bulk = matching_chunks * N;

    // Finish byte by byte from the first chunk that differed (or was incomplete).
    let tail = xs[bulk..].iter().zip(&ys[bulk..]).take_while(|(a, b)| a == b).count();

    bulk + tail
}
603
604/// Calculate the number of leading zeroes from the sha256 hash of a byte array
605///
606/// Reference: https://github.com/bluesky-social/atproto/blob/13636ba963225407f63c20253b983a92dcfe1bfa/packages/repo/src/mst/util.ts#L8-L23
607fn leading_zeroes(key: &[u8]) -> usize {
608    let digest = sha2::Sha256::digest(key);
609    let mut zeroes = 0;
610
611    for byte in digest.iter() {
612        zeroes += (*byte < 0b0100_0000) as usize; // 64
613        zeroes += (*byte < 0b0001_0000) as usize; // 16
614        zeroes += (*byte < 0b0000_0100) as usize; // 4
615        zeroes += (*byte < 0b0000_0001) as usize; // 1
616
617        if *byte != 0 {
618            // If the byte is nonzero, then there cannot be any more leading zeroes.
619            break;
620        }
621    }
622
623    zeroes
624}
625
/// A merkle search tree data structure, backed by storage implementing
/// [`AsyncBlockStoreRead`] and optionally [`AsyncBlockStoreWrite`].
///
/// This data structure is merely a convenience structure that implements
/// algorithms that handle certain common operations one may want to perform
/// against a MST.
///
/// The structure does not actually load the merkle search tree into memory
/// or perform any deep copies. The tree itself lives entirely inside of the
/// provided backing storage. This also carries the implication that any operation
/// performed against the tree will have performance that reflects that of accesses
/// to the backing storage.
///
/// If your backing storage is implemented by a cloud service, such as a
/// database or block storage service, you will likely want to insert a
/// caching layer in your block storage to ensure that performance remains
/// fast.
///
/// ---
///
/// There are two factors that determine the placement of nodes inside of
/// a merkle search tree:
/// - The number of leading zeroes in the SHA256 hash of the key
/// - The key's lexicographic position inside of a layer
///
/// # Reference
/// * Official documentation: <https://atproto.com/guides/data-repos>
/// * Useful reading: <https://interjectedfuture.com/crdts-turned-inside-out/>
pub struct Tree<S> {
    /// Backing block storage that holds every node of the tree.
    storage: S,
    /// CID of the current root node.
    root: Cid,
}
658
659// N.B: It's trivial to clone the tree if it's trivial to clone the backing storage,
660// so implement clone if the storage also implements it.
661impl<S: Clone> Clone for Tree<S> {
662    fn clone(&self) -> Self {
663        Self { storage: self.storage.clone(), root: self.root }
664    }
665}
666
667impl<S> std::fmt::Debug for Tree<S> {
668    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
669        f.debug_struct("Tree").field("root", &self.root).finish_non_exhaustive()
670    }
671}
672
673impl<S: AsyncBlockStoreRead + AsyncBlockStoreWrite> Tree<S> {
674    /// Create a new MST with an empty root node
675    pub async fn create(mut storage: S) -> Result<Self, Error> {
676        let node = Node { entries: vec![] };
677        let cid = node.serialize_into(&mut storage).await?;
678
679        Ok(Self { storage, root: cid })
680    }
681
682    /// Add a new key with the specified value to the tree.
683    pub async fn add(&mut self, key: &str, value: Cid) -> Result<(), Error> {
684        self.root = algos::add(&mut self.storage, self.root, key, value).await?;
685        Ok(())
686    }
687
688    /// Update an existing key with a new value.
689    pub async fn update(&mut self, key: &str, value: Cid) -> Result<(), Error> {
690        self.root = algos::update(&mut self.storage, self.root, key, value).await?;
691        Ok(())
692    }
693
694    /// Delete a key from the tree.
695    pub async fn delete(&mut self, key: &str) -> Result<(), Error> {
696        self.root = algos::delete(&mut self.storage, self.root, key).await?;
697        Ok(())
698    }
699}
700
701impl<S: AsyncBlockStoreRead> Tree<S> {
702    /// Open a pre-existing merkle search tree.
703    ///
704    /// This is a very cheap operation that does not actually load the MST
705    /// or check its validity. You should only use this with data from a trusted
706    /// source.
707    pub fn open(storage: S, root: Cid) -> Self {
708        Self { storage, root }
709    }
710
711    /// Return the CID of the root node.
712    pub fn root(&self) -> Cid {
713        self.root
714    }
715
716    /// Compute the depth of the merkle search tree from either the specified node or the root
717    pub async fn depth(&mut self, node: Option<Cid>) -> Result<Option<usize>, Error> {
718        algos::compute_depth(&mut self.storage, node.unwrap_or(self.root)).await
719    }
720
721    /// Returns a stream of all CIDs in the tree or referenced by the tree.
722    pub fn export(&mut self) -> impl Stream<Item = Result<Cid, Error>> + '_ {
723        // Start from the root of the tree.
724        let mut stack = vec![Located::InSubtree(self.root)];
725
726        try_stream! {
727            while let Some(e) = stack.pop() {
728                match e {
729                    Located::InSubtree(cid) => {
730                        let node = Node::read_from(&mut self.storage, cid).await?;
731                        yield cid;
732
733                        for entry in node.entries.iter().rev() {
734                            match entry {
735                                NodeEntry::Tree(entry) => {
736                                    stack.push(Located::InSubtree(*entry));
737                                }
738                                NodeEntry::Leaf(entry) => {
739                                    stack.push(Located::Entry(entry.value));
740                                }
741                            }
742                        }
743                    }
744                    Located::Entry(value) => yield value,
745                }
746            }
747        }
748    }
749
750    /// Returns a stream of all entries in this tree, in lexicographic order.
751    ///
752    /// This function will _not_ work with a partial MST, such as one received from
753    /// a firehose record.
754    pub fn entries(&mut self) -> impl Stream<Item = Result<(String, Cid), Error>> + '_ {
755        // Start from the root of the tree.
756        let mut stack = vec![Located::InSubtree(self.root)];
757
758        try_stream! {
759            while let Some(e) = stack.pop() {
760                match e {
761                    Located::InSubtree(cid) => {
762                        let node = Node::read_from(&mut self.storage, cid).await?;
763                        for entry in node.entries.iter().rev() {
764                            match entry {
765                                NodeEntry::Tree(entry) => {
766                                    stack.push(Located::InSubtree(*entry));
767                                }
768                                NodeEntry::Leaf(entry) => {
769                                    stack.push(Located::Entry((entry.key.clone(), entry.value)));
770                                }
771                            }
772                        }
773                    }
774                    Located::Entry((key, value)) => yield (key, value),
775                }
776            }
777        }
778    }
779
780    /// Returns a stream of all keys starting with the specified prefix, in lexicographic order.
781    ///
782    /// This function will _not_ work with a partial MST, such as one received from
783    /// a firehose record.
784    pub fn entries_prefixed<'a>(
785        &'a mut self,
786        prefix: &'a str,
787    ) -> impl Stream<Item = Result<(String, Cid), Error>> + 'a {
788        // Start from the root of the tree.
789        let mut stack = vec![Located::InSubtree(self.root)];
790
791        try_stream! {
792            while let Some(e) = stack.pop() {
793                match e {
794                    Located::InSubtree(cid) => {
795                        let node = Node::read_from(&mut self.storage, cid).await?;
796                        for entry in node.entries_with_prefix(prefix).rev() {
797                            match entry {
798                                NodeEntry::Tree(entry) => {
799                                    stack.push(Located::InSubtree(entry));
800                                }
801                                NodeEntry::Leaf(entry) => {
802                                    stack.push(Located::Entry((entry.key.clone(), entry.value)));
803                                }
804                            }
805                        }
806                    }
807                    Located::Entry((key, value)) => yield (key, value),
808                }
809            }
810        }
811    }
812
813    /// Returns a stream of all keys in this tree, in lexicographic order.
814    ///
815    /// This function will _not_ work with a partial MST, such as one received from
816    /// a firehose record.
817    pub fn keys(&mut self) -> impl Stream<Item = Result<String, Error>> + '_ {
818        self.entries().map(|e| e.map(|(k, _)| k))
819    }
820
821    /// Returns a stream of all keys in this tree with the specified prefix, in lexicographic order.
822    ///
823    /// This function will _not_ work with a partial MST, such as one received from
824    /// a firehose record.
825    pub fn keys_prefixed<'a>(
826        &'a mut self,
827        prefix: &'a str,
828    ) -> impl Stream<Item = Result<String, Error>> + 'a {
829        self.entries_prefixed(prefix).map(|e| e.map(|(k, _)| k))
830    }
831
832    /// Returns the specified record from the repository, or `None` if it does not exist.
833    pub async fn get(&mut self, key: &str) -> Result<Option<Cid>, Error> {
834        match algos::traverse(&mut self.storage, self.root, algos::traverse_find(key)).await {
835            // FIXME: The `unwrap` call here isn't preferable, but it is guaranteed to succeed.
836            Ok((_node_path, (node, index))) => Ok(Some(node.entries[index].leaf().unwrap().value)),
837            Err(Error::KeyNotFound) => Ok(None),
838            Err(e) => Err(e),
839        }
840    }
841
842    /// Returns the full path to a node that contains the specified key (including the containing node).
843    ///
844    /// If the key is not present in the tree, this will return the path to the node that would've contained
845    /// the key.
846    ///
847    /// This is useful for exporting portions of the MST for e.g. generating firehose records.
848    pub async fn extract_path(&mut self, key: &str) -> Result<impl Iterator<Item = Cid>, Error> {
849        // HACK: Create a common vector type that can be returned on all paths.
850        let mut r = Vec::new();
851
852        match algos::traverse(&mut self.storage, self.root, algos::traverse_find_path(key)).await {
853            Ok((node_path, FindPathResult::Found { node, path })) => {
854                r.extend(node_path.into_iter().map(|(_, cid)| cid).chain([node, path]));
855                Ok(r.into_iter())
856            }
857            Ok((node_path, FindPathResult::NotFound { node })) => {
858                r.extend(node_path.into_iter().map(|(_, cid)| cid).chain([node]));
859                Ok(r.into_iter())
860            }
861            Err(e) => Err(e),
862        }
863    }
864}
865
/// The location of an entry in a Merkle Search Tree.
///
/// `E` is the payload carried once an entry has actually been reached. Within this
/// file it is instantiated both with a bare [`Cid`] and with `(String, Cid)` pairs.
#[derive(Debug)]
pub enum Located<E> {
    /// The tree entry corresponding to a key.
    Entry(E),
    /// The CID of the [`Node`] containing the sub-tree in which a key is located.
    InSubtree(Cid),
}
874
/// A single slot of a parsed [`Node`]: either a link to a nested node or a
/// key/value leaf.
#[derive(Debug, Clone, PartialEq)]
enum NodeEntry {
    /// A nested node, referenced by the CID of its block.
    Tree(Cid),
    /// A tree entry (full key plus the CID of its value).
    Leaf(TreeEntry),
}
882
883impl NodeEntry {
884    fn tree(&self) -> Option<&Cid> {
885        match self {
886            NodeEntry::Tree(cid) => Some(cid),
887            _ => None,
888        }
889    }
890
891    fn leaf(&self) -> Option<&TreeEntry> {
892        match self {
893            NodeEntry::Leaf(entry) => Some(entry),
894            _ => None,
895        }
896    }
897}
898
/// A node in a Merkle Search Tree.
///
/// This is the in-memory form of a `schema::Node`: the optional `left` link and
/// the per-entry `tree` links are flattened into one ordered list together with
/// the leaves (see `Node::parse`).
#[derive(Debug, Clone)]
struct Node {
    /// The entries within this node.
    ///
    /// This list has the special property that no two `Tree` variants can be adjacent.
    entries: Vec<NodeEntry>,
}
907
908impl Node {
909    /// Parses an MST node from its DAG-CBOR encoding.
910    pub fn parse(bytes: &[u8]) -> Result<Self, Error> {
911        let node: schema::Node = serde_ipld_dagcbor::from_slice(bytes)?;
912
913        let mut entries = vec![];
914        if let Some(left) = &node.left {
915            entries.push(NodeEntry::Tree(*left));
916        }
917
918        let mut prev_key = vec![];
919        for entry in &node.entries {
920            let parsed_entry = TreeEntry::parse(entry.clone(), &prev_key)?;
921            prev_key = parsed_entry.key.as_bytes().to_vec();
922
923            entries.push(NodeEntry::Leaf(parsed_entry));
924
925            // Nested subtrees are located to the right of the entry.
926            if let Some(tree) = &entry.tree {
927                entries.push(NodeEntry::Tree(*tree));
928            }
929        }
930
931        Ok(Self { entries })
932    }
933
934    /// Read and parse a node from block storage
935    pub async fn read_from(mut bs: impl AsyncBlockStoreRead, cid: Cid) -> Result<Self, Error> {
936        let bytes = bs.read_block(cid).await?;
937        Self::parse(&bytes)
938    }
939
940    pub async fn serialize_into(&self, mut bs: impl AsyncBlockStoreWrite) -> Result<Cid, Error> {
941        let mut node = schema::Node { left: None, entries: vec![] };
942
943        // Special case: if the first entry is a tree, that gets inserted into the node directly.
944        let ents = match self.entries.first() {
945            Some(NodeEntry::Tree(cid)) => {
946                node.left = Some(*cid);
947                &self.entries[1..]
948            }
949            _ => &self.entries,
950        };
951
952        let mut prev_key = vec![];
953        let mut i = 0usize;
954        while i != ents.len() {
955            let (leaf, tree) = match (ents.get(i), ents.get(i + 1)) {
956                (Some(NodeEntry::Tree(_)), Some(NodeEntry::Tree(_))) => {
957                    // We should never encounter this. If this is hit, something went wrong when modifying the tree.
958                    panic!("attempted to serialize node with two adjacent trees")
959                }
960                (Some(NodeEntry::Leaf(leaf)), Some(NodeEntry::Tree(tree))) => (leaf, Some(tree)),
961                (Some(NodeEntry::Leaf(leaf)), _) => (leaf, None),
962                // Skip this window if the first entry is not a leaf.
963                _ => {
964                    i += 1;
965                    continue;
966                }
967            };
968
969            let prefix = prefix(&prev_key, leaf.key.as_bytes());
970
971            node.entries.push(schema::TreeEntry {
972                prefix_len: prefix,
973                key_suffix: leaf.key.as_bytes()[prefix..].to_vec(),
974                value: leaf.value,
975                tree: tree.cloned(),
976            });
977
978            prev_key = leaf.key.as_bytes().to_vec();
979            i += 1;
980        }
981
982        let bytes = serde_ipld_dagcbor::to_vec(&node).unwrap();
983        Ok(bs.write_block(DAG_CBOR, SHA2_256, &bytes).await?)
984    }
985
986    /// Return an iterator of the subtrees contained within this node
987    fn trees(&self) -> impl Iterator<Item = &Cid> {
988        self.entries.iter().filter_map(|entry| match entry {
989            NodeEntry::Tree(entry) => Some(entry),
990            _ => None,
991        })
992    }
993
994    /// Return an iterator of the leaves contained within this node
995    fn leaves(&self) -> impl Iterator<Item = &TreeEntry> {
996        self.entries.iter().filter_map(|entry| match entry {
997            NodeEntry::Leaf(entry) => Some(entry),
998            _ => None,
999        })
1000    }
1001
1002    /// Computes the node's layer, or returns `None` if this node has no leaves.
1003    fn layer(&self) -> Option<usize> {
1004        self.leaves().next().map(|e| leading_zeroes(e.key.as_bytes()))
1005    }
1006
1007    /// Find the index of the first leaf node that has a key greater than or equal to the provided key.
1008    ///
1009    /// This may return an index that is equal to the length of `self.entries` (or in other words, OOB).
1010    /// If the node has no entries, this will return `None`.
1011    fn find_ge(&self, key: &str) -> Option<usize> {
1012        let mut e = self.entries.iter().enumerate().filter_map(|(i, e)| e.leaf().map(|e| (i, e)));
1013
1014        if let Some((i, _e)) = e.find(|(_i, e)| e.key.as_str() >= key) {
1015            Some(i)
1016        } else if !self.entries.is_empty() {
1017            Some(self.entries.len())
1018        } else {
1019            None
1020        }
1021    }
1022
1023    /// Finds the location of the given key's value within this sub-tree.
1024    ///
1025    /// Returns `None` if the key does not exist within this sub-tree.
1026    #[allow(dead_code)]
1027    pub fn get(&self, key: &str) -> Option<Located<Cid>> {
1028        let i = self.find_ge(key)?;
1029
1030        if let Some(NodeEntry::Leaf(e)) = self.entries.get(i) {
1031            if e.key == key {
1032                return Some(Located::Entry(e.value));
1033            }
1034        }
1035
1036        if let Some(NodeEntry::Tree(cid)) = self.entries.get(i - 1) {
1037            Some(Located::InSubtree(*cid))
1038        } else {
1039            None
1040        }
1041    }
1042
1043    /// Returns the locations of values for all keys within this sub-tree with the given
1044    /// prefix.
1045    pub fn entries_with_prefix<'a>(
1046        &'a self,
1047        prefix: &str,
1048    ) -> impl DoubleEndedIterator<Item = NodeEntry> + 'a {
1049        let mut list = Vec::new();
1050
1051        let index = if let Some(i) = self.find_ge(prefix) {
1052            i
1053        } else {
1054            // Special case: The tree is empty.
1055            return list.into_iter();
1056        };
1057
1058        // Check to see if the left neighbor is a subtree.
1059        if let Some(index) = index.checked_sub(1) {
1060            if let Some(NodeEntry::Tree(cid)) = self.entries.get(index) {
1061                list.push(NodeEntry::Tree(*cid));
1062            }
1063        }
1064
1065        if let Some(e) = self.entries.get(index..) {
1066            for e in e {
1067                if let NodeEntry::Leaf(t) = e {
1068                    // Ensure the specified prefix is not longer than the key.
1069                    // If the key is shorter than the prefix, it is always lexicographically lesser.
1070                    if let Some(kp) = t.key.get(..prefix.len()) {
1071                        match kp.cmp(prefix) {
1072                            Ordering::Less => (),
1073                            Ordering::Equal => {
1074                                list.push(NodeEntry::Leaf(t.clone()));
1075                                continue;
1076                            }
1077                            Ordering::Greater => {
1078                                // This leaf node has a key that is lexicographically greater than
1079                                // the prefix. Stop the search now since we won't find any more
1080                                // matching entries.
1081                                break;
1082                            }
1083                        }
1084                    }
1085                }
1086
1087                list.push(e.clone());
1088            }
1089        }
1090
1091        list.into_iter()
1092    }
1093}
1094
/// A leaf of a parsed node: a complete key together with the CID of its value.
#[derive(Debug, Clone, PartialEq)]
struct TreeEntry {
    /// The full key, already reassembled from the shared prefix and the stored suffix.
    key: String,
    /// Link to the value stored under `key`.
    value: Cid,
}
1100
1101impl TreeEntry {
1102    fn parse(entry: schema::TreeEntry, prev_key: &[u8]) -> Result<Self, Error> {
1103        let key = if entry.prefix_len == 0 {
1104            entry.key_suffix
1105        } else if prev_key.len() < entry.prefix_len {
1106            return Err(Error::InvalidPrefixLen);
1107        } else {
1108            let mut key_bytes = prev_key[..entry.prefix_len].to_vec();
1109            key_bytes.extend(entry.key_suffix);
1110            key_bytes
1111        };
1112
1113        let key = String::from_utf8(key).map_err(|e| e.utf8_error())?;
1114
1115        Ok(Self { key, value: entry.value })
1116    }
1117}
1118
/// Errors that can occur while interacting with an MST.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// An entry claims to share more bytes with the previous key than that key has.
    #[error("Invalid prefix_len")]
    InvalidPrefixLen,
    /// The key being added is already present in the tree.
    #[error("the key is already present in the tree")]
    KeyAlreadyExists,
    /// The requested key is not present in the tree.
    #[error("the key is not present in the tree")]
    KeyNotFound,
    /// A key reassembled from a node is not valid UTF-8.
    #[error("Invalid key: {0}")]
    InvalidKey(#[from] std::str::Utf8Error),
    /// The underlying block store reported an error.
    #[error("blockstore error: {0}")]
    BlockStore(#[from] crate::blockstore::Error),
    /// A block could not be decoded as DAG-CBOR.
    #[error("serde_ipld_dagcbor decoding error: {0}")]
    Parse(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
}
1135
1136#[cfg(test)]
1137mod test {
1138    use std::str::FromStr;
1139
1140    use futures::TryStreamExt;
1141    use ipld_core::cid::multihash::Multihash;
1142
1143    use crate::blockstore::{MemoryBlockStore, SHA2_256};
1144
1145    use super::*;
1146
1147    /// Returns a dummy value Cid used for testing.
1148    ///
1149    /// b"bafyreie5cvv4h45feadgeuwhbcutmh6t2ceseocckahdoe6uat64zmz454"
1150    fn value_cid() -> Cid {
1151        Cid::new_v1(
1152            DAG_CBOR,
1153            match Multihash::wrap(
1154                SHA2_256,
1155                &[
1156                    0x9d, 0x15, 0x6b, 0xc3, 0xf3, 0xa5, 0x20, 0x06, 0x62, 0x52, 0xc7, 0x08, 0xa9,
1157                    0x36, 0x1f, 0xd3, 0xd0, 0x89, 0x22, 0x38, 0x42, 0x50, 0x0e, 0x37, 0x13, 0xd4,
1158                    0x04, 0xfd, 0xcc, 0xb3, 0x3c, 0xef,
1159                ],
1160            ) {
1161                Ok(h) => h,
1162                Err(_e) => panic!(),
1163            },
1164        )
1165    }
1166
1167    #[test]
1168    fn test_prefix() {
1169        assert_eq!(
1170            prefix(b"com.example.record/3jqfcqzm3fo2j", b"com.example.record/3jqfcqzm3fo2j"),
1171            32
1172        );
1173        assert_eq!(
1174            prefix(b"com.example.record/3jqfcqzm3fo2j", b"com.example.record/7jqfcqzm3fo2j"),
1175            19
1176        );
1177    }
1178
1179    #[test]
1180    fn test_clz() {
1181        assert_eq!(leading_zeroes(b""), 0);
1182        assert_eq!(leading_zeroes(b"com.example.record/3jqfcqzm3fn2j"), 0); // level 0
1183        assert_eq!(leading_zeroes(b"com.example.record/3jqfcqzm3fo2j"), 0); // level 0
1184        assert_eq!(leading_zeroes(b"com.example.record/3jqfcqzm3fp2j"), 0); // level 0
1185        assert_eq!(leading_zeroes(b"com.example.record/3jqfcqzm3fs2j"), 1); // level 1
1186        assert_eq!(leading_zeroes(b"com.example.record/3jqfcqzm3ft2j"), 0); // level 0
1187        assert_eq!(leading_zeroes(b"com.example.record/3jqfcqzm3fu2j"), 0); // level 0
1188        assert_eq!(leading_zeroes(b"com.example.record/3jqfcqzm3fx2j"), 2); // level 2
1189    }
1190
1191    #[test]
1192    fn node_find_ge() {
1193        let node = Node { entries: vec![] };
1194        assert_eq!(node.find_ge("com.example.record/3jqfcqzm3fp2j"), None);
1195
1196        let node = Node {
1197            entries: vec![NodeEntry::Leaf(TreeEntry {
1198                key: "com.example.record/3jqfcqzm3fs2j".to_string(), // '3..s'
1199                value: value_cid(),
1200            })],
1201        };
1202
1203        assert_eq!(node.find_ge("com.example.record/3jqfcqzm3fp2j"), Some(0)); // '3..p'
1204        assert_eq!(node.find_ge("com.example.record/3jqfcqzm3fs2j"), Some(0)); // '3..s'
1205        assert_eq!(node.find_ge("com.example.record/3jqfcqzm3ft2j"), Some(1)); // '3..t'
1206        assert_eq!(node.find_ge("com.example.record/3jqfcqzm4fc2j"), Some(1)); // '4..c'
1207    }
1208
1209    #[tokio::test]
1210    async fn mst_create() {
1211        let bs = MemoryBlockStore::new();
1212        let tree = Tree::create(bs).await.unwrap();
1213
1214        assert_eq!(
1215            tree.root,
1216            Cid::from_str("bafyreie5737gdxlw5i64vzichcalba3z2v5n6icifvx5xytvske7mr3hpm").unwrap()
1217        );
1218    }
1219
1220    #[tokio::test]
1221    async fn mst_create_trivial() {
1222        let bs = MemoryBlockStore::new();
1223        let mut tree = Tree::create(bs).await.unwrap();
1224
1225        tree.add("com.example.record/3jqfcqzm3fo2j", value_cid()).await.unwrap();
1226
1227        assert_eq!(
1228            tree.root,
1229            Cid::from_str("bafyreibj4lsc3aqnrvphp5xmrnfoorvru4wynt6lwidqbm2623a6tatzdu").unwrap()
1230        );
1231    }
1232
1233    #[tokio::test]
1234    async fn mst_create_singlelayer2() {
1235        let bs = MemoryBlockStore::new();
1236        let mut tree = Tree::create(bs).await.unwrap();
1237
1238        tree.add("com.example.record/3jqfcqzm3fx2j", value_cid()).await.unwrap();
1239
1240        assert_eq!(
1241            tree.root,
1242            Cid::from_str("bafyreih7wfei65pxzhauoibu3ls7jgmkju4bspy4t2ha2qdjnzqvoy33ai").unwrap()
1243        );
1244    }
1245
1246    #[tokio::test]
1247    async fn mst_create_simple() {
1248        let bs = MemoryBlockStore::new();
1249        let mut tree = Tree::create(bs).await.unwrap();
1250
1251        tree.add("com.example.record/3jqfcqzm3fp2j", value_cid()).await.unwrap(); // level 0
1252        tree.add("com.example.record/3jqfcqzm3fr2j", value_cid()).await.unwrap(); // level 0
1253        tree.add("com.example.record/3jqfcqzm3fs2j", value_cid()).await.unwrap(); // level 1
1254        tree.add("com.example.record/3jqfcqzm3ft2j", value_cid()).await.unwrap(); // level 0
1255        tree.add("com.example.record/3jqfcqzm4fc2j", value_cid()).await.unwrap(); // level 0
1256
1257        assert_eq!(
1258            tree.root,
1259            Cid::from_str("bafyreicmahysq4n6wfuxo522m6dpiy7z7qzym3dzs756t5n7nfdgccwq7m").unwrap()
1260        );
1261
1262        // Ensure keys are returned in lexicographic order.
1263        let keys = tree.keys().try_collect::<Vec<_>>().await.unwrap();
1264        assert_eq!(
1265            keys.as_slice(),
1266            &[
1267                "com.example.record/3jqfcqzm3fp2j",
1268                "com.example.record/3jqfcqzm3fr2j",
1269                "com.example.record/3jqfcqzm3fs2j",
1270                "com.example.record/3jqfcqzm3ft2j",
1271                "com.example.record/3jqfcqzm4fc2j",
1272            ]
1273        )
1274    }
1275
1276    #[tokio::test]
1277    async fn mst_trim_top() {
1278        let bs = MemoryBlockStore::new();
1279        let mut tree = Tree::create(bs).await.unwrap();
1280
1281        tree.add("com.example.record/3jqfcqzm3fn2j", value_cid()).await.unwrap(); // level 0
1282        tree.add("com.example.record/3jqfcqzm3fo2j", value_cid()).await.unwrap(); // level 0
1283        tree.add("com.example.record/3jqfcqzm3fp2j", value_cid()).await.unwrap(); // level 0
1284        tree.add("com.example.record/3jqfcqzm3fs2j", value_cid()).await.unwrap(); // level 1
1285        tree.add("com.example.record/3jqfcqzm3ft2j", value_cid()).await.unwrap(); // level 0
1286        tree.add("com.example.record/3jqfcqzm3fu2j", value_cid()).await.unwrap(); // level 0
1287
1288        assert_eq!(
1289            tree.root,
1290            Cid::from_str("bafyreifnqrwbk6ffmyaz5qtujqrzf5qmxf7cbxvgzktl4e3gabuxbtatv4").unwrap()
1291        );
1292        assert_eq!(tree.depth(None).await.unwrap(), Some(1));
1293
1294        tree.delete("com.example.record/3jqfcqzm3fs2j").await.unwrap(); // level 1
1295
1296        assert_eq!(
1297            tree.root,
1298            Cid::from_str("bafyreie4kjuxbwkhzg2i5dljaswcroeih4dgiqq6pazcmunwt2byd725vi").unwrap()
1299        );
1300    }
1301
1302    #[tokio::test]
1303    async fn mst_insertion_split() {
1304        let bs = MemoryBlockStore::new();
1305        let mut tree = Tree::create(bs).await.unwrap();
1306
1307        let root11 =
1308            Cid::from_str("bafyreiettyludka6fpgp33stwxfuwhkzlur6chs4d2v4nkmq2j3ogpdjem").unwrap();
1309        let root12 =
1310            Cid::from_str("bafyreid2x5eqs4w4qxvc5jiwda4cien3gw2q6cshofxwnvv7iucrmfohpm").unwrap();
1311
1312        /*
1313         *
1314         *                *                                  *
1315         *       _________|________                      ____|_____
1316         *       |   |    |    |   |                    |    |     |
1317         *       *   d    *    i   *       ->           *    f     *
1318         *     __|__    __|__    __|__                __|__      __|___
1319         *    |  |  |  |  |  |  |  |  |              |  |  |    |  |   |
1320         *    a  b  c  e  g  h  j  k  l              *  d  *    *  i   *
1321         *                                         __|__   |   _|_   __|__
1322         *                                        |  |  |  |  |   | |  |  |
1323         *                                        a  b  c  e  g   h j  k  l
1324         *
1325         */
1326        tree.add("com.example.record/3jqfcqzm3fo2j", value_cid()).await.unwrap(); // A; level 0
1327        tree.add("com.example.record/3jqfcqzm3fp2j", value_cid()).await.unwrap(); // B; level 0
1328        tree.add("com.example.record/3jqfcqzm3fr2j", value_cid()).await.unwrap(); // C; level 0
1329        tree.add("com.example.record/3jqfcqzm3fs2j", value_cid()).await.unwrap(); // D; level 1
1330        tree.add("com.example.record/3jqfcqzm3ft2j", value_cid()).await.unwrap(); // E; level 0
1331                                                                                  // GAP for F
1332        tree.add("com.example.record/3jqfcqzm3fz2j", value_cid()).await.unwrap(); // G; level 0
1333        tree.add("com.example.record/3jqfcqzm4fc2j", value_cid()).await.unwrap(); // H; level 0
1334        tree.add("com.example.record/3jqfcqzm4fd2j", value_cid()).await.unwrap(); // I; level 1
1335        tree.add("com.example.record/3jqfcqzm4fg2j", value_cid()).await.unwrap(); // K; level 0
1336        tree.add("com.example.record/3jqfcqzm4ff2j", value_cid()).await.unwrap(); // J; level 0
1337        tree.add("com.example.record/3jqfcqzm4fh2j", value_cid()).await.unwrap(); // L; level 0
1338
1339        assert_eq!(tree.root, root11);
1340
1341        // insert F, which will push E out of the node with G+H to a new node under D
1342        tree.add("com.example.record/3jqfcqzm3fx2j", value_cid()).await.unwrap(); // F; level 2
1343
1344        assert_eq!(tree.root, root12);
1345
1346        // insert K again. An error should be returned.
1347        assert!(matches!(
1348            tree.add("com.example.record/3jqfcqzm4fg2j", value_cid()).await.unwrap_err(), // K; level 0
1349            Error::KeyAlreadyExists
1350        ));
1351
1352        assert_eq!(tree.root, root12);
1353
1354        // remove F, which should push E back over with G+H
1355        tree.delete("com.example.record/3jqfcqzm3fx2j").await.unwrap(); // F; level 2
1356
1357        assert_eq!(tree.root, root11);
1358    }
1359
1360    #[tokio::test]
1361    async fn mst_two_layers() {
1362        let bs = MemoryBlockStore::new();
1363        let mut tree = Tree::create(bs).await.unwrap();
1364
1365        let root10 =
1366            Cid::from_str("bafyreidfcktqnfmykz2ps3dbul35pepleq7kvv526g47xahuz3rqtptmky").unwrap();
1367        let root12 =
1368            Cid::from_str("bafyreiavxaxdz7o7rbvr3zg2liox2yww46t7g6hkehx4i4h3lwudly7dhy").unwrap();
1369        let root12_2 =
1370            Cid::from_str("bafyreig4jv3vuajbsybhyvb7gggvpwh2zszwfyttjrj6qwvcsp24h6popu").unwrap();
1371
1372        /*
1373         *
1374         *          *        ->            *
1375         *        __|__                  __|__
1376         *       |     |                |  |  |
1377         *       a     c                *  b  *
1378         *                              |     |
1379         *                              *     *
1380         *                              |     |
1381         *                              a     c
1382         *
1383         */
1384        tree.add("com.example.record/3jqfcqzm3ft2j", value_cid()).await.unwrap(); // A; level 0
1385        tree.add("com.example.record/3jqfcqzm3fz2j", value_cid()).await.unwrap(); // C; level 0
1386
1387        assert_eq!(tree.root, root10);
1388
1389        // insert B, which is two levels above
1390        tree.add("com.example.record/3jqfcqzm3fx2j", value_cid()).await.unwrap(); // B; level 2
1391
1392        assert_eq!(tree.root, root12);
1393
1394        // remove B
1395        tree.delete("com.example.record/3jqfcqzm3fx2j").await.unwrap(); // B; level 2
1396
1397        assert_eq!(tree.root, root10);
1398
1399        // insert B (level=2) and D (level=1)
1400        tree.add("com.example.record/3jqfcqzm3fx2j", value_cid()).await.unwrap(); // B; level 2
1401        tree.add("com.example.record/3jqfcqzm4fd2j", value_cid()).await.unwrap(); // D; level 1
1402
1403        assert_eq!(tree.root, root12_2);
1404
1405        // remove D
1406        tree.delete("com.example.record/3jqfcqzm4fd2j").await.unwrap(); // D; level 1
1407
1408        assert_eq!(tree.root, root12);
1409    }
1410
1411    #[tokio::test]
1412    async fn mst_two_layers_rev() {
1413        let bs = MemoryBlockStore::new();
1414        let mut tree = Tree::create(bs).await.unwrap();
1415
1416        let root10 =
1417            Cid::from_str("bafyreidfcktqnfmykz2ps3dbul35pepleq7kvv526g47xahuz3rqtptmky").unwrap();
1418        let root12 =
1419            Cid::from_str("bafyreiavxaxdz7o7rbvr3zg2liox2yww46t7g6hkehx4i4h3lwudly7dhy").unwrap();
1420
1421        // This is the same test as `mst_two_layers`, but with the top level entry inserted first.
1422        tree.add("com.example.record/3jqfcqzm3fx2j", value_cid()).await.unwrap(); // B; level 2
1423        tree.add("com.example.record/3jqfcqzm3ft2j", value_cid()).await.unwrap(); // A; level 0
1424        tree.add("com.example.record/3jqfcqzm3fz2j", value_cid()).await.unwrap(); // C; level 0
1425
1426        assert_eq!(tree.root, root12);
1427
1428        // remove B
1429        tree.delete("com.example.record/3jqfcqzm3fx2j").await.unwrap(); // B; level 2
1430
1431        assert_eq!(tree.root, root10);
1432    }
1433
1434    #[tokio::test]
1435    async fn mst_two_layers_del() {
1436        let bs = MemoryBlockStore::new();
1437        let mut tree = Tree::create(bs).await.unwrap();
1438
1439        let root00 =
1440            Cid::from_str("bafyreie5737gdxlw5i64vzichcalba3z2v5n6icifvx5xytvske7mr3hpm").unwrap();
1441        let root10 =
1442            Cid::from_str("bafyreih7wfei65pxzhauoibu3ls7jgmkju4bspy4t2ha2qdjnzqvoy33ai").unwrap();
1443        let root11 =
1444            Cid::from_str("bafyreidjq27sf6pi5pq2relsiwis64k2jzu7yuxukovehvtc6cranqkxcy").unwrap();
1445        let root12 =
1446            Cid::from_str("bafyreiavxaxdz7o7rbvr3zg2liox2yww46t7g6hkehx4i4h3lwudly7dhy").unwrap();
1447
1448        tree.add("com.example.record/3jqfcqzm3fx2j", value_cid()).await.unwrap(); // B; level 2
1449        tree.add("com.example.record/3jqfcqzm3ft2j", value_cid()).await.unwrap(); // A; level 0
1450        tree.add("com.example.record/3jqfcqzm3fz2j", value_cid()).await.unwrap(); // C; level 0
1451
1452        assert_eq!(tree.root, root12);
1453        assert_eq!(tree.depth(None).await.unwrap(), Some(2));
1454
1455        // remove A. This should remove the entire left side of the tree.
1456        tree.delete("com.example.record/3jqfcqzm3ft2j").await.unwrap(); // A; level 0
1457
1458        assert_eq!(tree.root, root11);
1459
1460        // add it back and compare.
1461        tree.add("com.example.record/3jqfcqzm3ft2j", value_cid()).await.unwrap(); // A; level 0
1462
1463        assert_eq!(tree.root, root12);
1464
1465        tree.delete("com.example.record/3jqfcqzm3ft2j").await.unwrap(); // A; level 0
1466        tree.delete("com.example.record/3jqfcqzm3fz2j").await.unwrap(); // C; level 0
1467
1468        assert_eq!(tree.root, root10);
1469
1470        tree.delete("com.example.record/3jqfcqzm3fx2j").await.unwrap(); // B; level 2
1471
1472        assert_eq!(tree.root, root00);
1473    }
1474
1475    #[tokio::test]
1476    async fn mst_insert() {
1477        let bs = MemoryBlockStore::new();
1478        let mut tree = Tree::create(bs).await.unwrap();
1479
1480        tree.add("com.example.record/3jqfcqzm3fo2j", Cid::default()).await.unwrap();
1481    }
1482
1483    #[tokio::test]
1484    async fn mst_enum_prefixed() {
1485        let bs = MemoryBlockStore::new();
1486        let mut tree = Tree::create(bs).await.unwrap();
1487
1488        tree.add("com.example.abcd/2222222222222", value_cid()).await.unwrap();
1489        tree.add("com.example.abcd/2222222222223", value_cid()).await.unwrap();
1490        tree.add("com.example.bbcd/2222222222222", value_cid()).await.unwrap();
1491        tree.add("com.example.bbcd/2222222222223", value_cid()).await.unwrap();
1492        tree.add("com.example.bbcd/2222222222224", value_cid()).await.unwrap();
1493        tree.add("com.example.cbcd/2222222222222", value_cid()).await.unwrap();
1494        tree.add("com.example.cbcd/2222222222223", value_cid()).await.unwrap();
1495
1496        // Ensure keys are returned in lexicographic order.
1497        let keys = tree
1498            .entries_prefixed("com.example.abcd")
1499            .map_ok(|(k, _v)| k)
1500            .try_collect::<Vec<_>>()
1501            .await
1502            .unwrap();
1503        assert_eq!(
1504            keys.as_slice(),
1505            &["com.example.abcd/2222222222222", "com.example.abcd/2222222222223",]
1506        );
1507
1508        let keys = tree
1509            .entries_prefixed("com.example.bbcd")
1510            .map_ok(|(k, _v)| k)
1511            .try_collect::<Vec<_>>()
1512            .await
1513            .unwrap();
1514
1515        assert_eq!(
1516            keys.as_slice(),
1517            &[
1518                "com.example.bbcd/2222222222222",
1519                "com.example.bbcd/2222222222223",
1520                "com.example.bbcd/2222222222224",
1521            ]
1522        );
1523    }
1524}