1use std::{cmp::Ordering, collections::HashSet, convert::Infallible};
2
3use algos::FindPathResult;
4use async_stream::try_stream;
5use futures::{Stream, StreamExt};
6use ipld_core::cid::Cid;
7use serde::{Deserialize, Serialize};
8use sha2::Digest;
9
10use crate::blockstore::{AsyncBlockStoreRead, AsyncBlockStoreWrite, DAG_CBOR, SHA2_256};
11
mod schema {
    //! Wire format of MST nodes as stored in DAG-CBOR blocks.
    //!
    //! Field names on the wire are the single-letter keys given by the `rename`
    //! attributes. `Node::parse` converts this form into the in-memory node and
    //! `Node::serialize_into` converts it back.
    use super::*;

    /// A serialized MST node.
    #[derive(Deserialize, Serialize, Clone, PartialEq)]
    pub struct Node {
        /// `l`: optional pointer to the subtree that precedes the first entry.
        #[serde(rename = "l")]
        pub left: Option<Cid>,

        /// `e`: the node's leaf entries, in node order.
        #[serde(rename = "e")]
        pub entries: Vec<TreeEntry>,
    }

    /// A serialized leaf entry, with its key prefix-compressed against the
    /// previous entry's key in the same node.
    #[derive(Deserialize, Serialize, Clone, PartialEq)]
    pub struct TreeEntry {
        /// `p`: number of leading key bytes shared with the previous entry's key.
        #[serde(rename = "p")]
        pub prefix_len: usize,

        /// `k`: the key bytes remaining after the shared prefix, encoded as a byte
        /// string through `serde_bytes`.
        #[serde(rename = "k", with = "serde_bytes")]
        pub key_suffix: Vec<u8>,

        /// `v`: CID of the value stored under this key.
        #[serde(rename = "v")]
        pub value: Cid,

        /// `t`: optional pointer to the subtree that follows this entry.
        #[serde(rename = "t")]
        pub tree: Option<Cid>,
    }
}
58
59mod algos {
61 use super::*;
62
63 pub enum TraverseAction<R, M> {
64 Continue((Cid, M)),
66 Stop(R),
68 }
69
70 pub enum FindPathResult {
71 Found {
73 node: Cid,
75 path: Cid,
77 },
78 NotFound {
80 node: Cid,
82 },
83 }
84
85 pub async fn compute_depth(
89 mut bs: impl AsyncBlockStoreRead,
90 node: Cid,
91 ) -> Result<Option<usize>, Error> {
92 let mut subtrees = vec![(node, 0usize)];
95
96 loop {
97 if let Some((subtree, depth)) = subtrees.pop() {
98 let node = Node::read_from(&mut bs, subtree).await?;
99 if let Some(layer) = node.layer() {
100 return Ok(Some(depth + layer));
101 }
102
103 subtrees.extend(node.trees().cloned().zip(std::iter::repeat(depth + 1)));
104 } else {
105 return Ok(None);
106 }
107 }
108 }
109
110 pub async fn traverse<R, M>(
117 mut bs: impl AsyncBlockStoreRead,
118 root: Cid,
119 mut f: impl FnMut(Node, Cid) -> Result<TraverseAction<R, M>, Error>,
120 ) -> Result<(Vec<(Node, M)>, R), Error> {
121 let mut node_cid = root;
122 let mut node_path = vec![];
123 let mut seen = HashSet::new();
124
125 loop {
126 let node = Node::read_from(&mut bs, node_cid).await?;
127 if !seen.insert(node_cid) {
128 panic!();
130 }
131
132 match f(node.clone(), node_cid)? {
133 TraverseAction::Continue((cid, meta)) => {
134 node_path.push((node, meta));
135 node_cid = cid;
136 }
137 TraverseAction::Stop(r) => {
138 return Ok((node_path, r));
139 }
140 }
141 }
142 }
143
144 pub fn traverse_find(
146 key: &str,
147 ) -> impl FnMut(Node, Cid) -> Result<TraverseAction<(Node, usize), usize>, Error> + '_ {
148 move |node, _cid| -> Result<_, Error> {
149 if let Some(index) = node.find_ge(key) {
150 if let Some(NodeEntry::Leaf(e)) = node.entries.get(index) {
151 if e.key == key {
152 return Ok(TraverseAction::Stop((node, index)));
153 }
154 }
155
156 if let Some(index) = index.checked_sub(1) {
158 if let Some(subtree) = node.entries.get(index).unwrap().tree() {
159 Ok(TraverseAction::Continue((*subtree, index)))
160 } else {
161 Err(Error::KeyNotFound)
162 }
163 } else {
164 Err(Error::KeyNotFound)
166 }
167 } else {
168 Err(Error::KeyNotFound)
170 }
171 }
172 }
173
174 pub fn traverse_find_path(
177 key: &str,
178 ) -> impl FnMut(Node, Cid) -> Result<TraverseAction<FindPathResult, Cid>, Error> + '_ {
179 move |node, cid| -> Result<_, Error> {
180 if let Some(index) = node.find_ge(key) {
181 if let Some(NodeEntry::Leaf(e)) = node.entries.get(index) {
182 if e.key == key {
183 return Ok(TraverseAction::Stop(FindPathResult::Found {
184 node: cid,
185 path: e.value,
186 }));
187 }
188 }
189
190 if let Some(index) = index.checked_sub(1) {
192 if let Some(subtree) = node.entries.get(index).unwrap().tree() {
193 Ok(TraverseAction::Continue((*subtree, cid)))
194 } else {
195 Ok(TraverseAction::Stop(FindPathResult::NotFound { node: cid }))
196 }
197 } else {
198 Ok(TraverseAction::Stop(FindPathResult::NotFound { node: cid }))
200 }
201 } else {
202 Ok(TraverseAction::Stop(FindPathResult::NotFound { node: cid }))
204 }
205 }
206 }
207
208 pub fn traverse_prune() -> impl FnMut(Node, Cid) -> Result<TraverseAction<Cid, usize>, Error> {
211 move |node, cid| -> Result<_, Error> {
212 if node.entries.len() == 1 {
213 if let Some(NodeEntry::Tree(cid)) = node.entries.first() {
214 return Ok(TraverseAction::Continue((*cid, 0)));
215 }
216 }
217
218 Ok(TraverseAction::Stop(cid))
219 }
220 }
221
222 pub async fn merge_subtrees(
224 mut bs: impl AsyncBlockStoreRead + AsyncBlockStoreWrite,
225 mut lc: Cid,
226 mut rc: Cid,
227 ) -> Result<Cid, Error> {
228 let mut node_path = vec![];
229
230 let (ln, rn) = loop {
231 let ln = Node::read_from(&mut bs, lc).await?;
233 let rn = Node::read_from(&mut bs, rc).await?;
234
235 if let (Some(NodeEntry::Tree(l)), Some(NodeEntry::Tree(r))) =
236 (ln.entries.last(), rn.entries.first())
237 {
238 node_path.push((ln.clone(), rn.clone()));
239
240 lc = *l;
241 rc = *r;
242 } else {
243 break (ln, rn);
244 }
245 };
246
247 let node = Node { entries: ln.entries.into_iter().chain(rn.entries).collect() };
249 let mut cid = node.serialize_into(&mut bs).await?;
250
251 for (ln, rn) in node_path.into_iter().rev() {
253 let node = Node {
254 entries: ln.entries[..ln.entries.len() - 1]
255 .iter()
256 .cloned()
257 .chain([NodeEntry::Tree(cid)])
258 .chain(rn.entries[1..].iter().cloned())
259 .collect(),
260 };
261
262 cid = node.serialize_into(&mut bs).await?;
263 }
264
265 Ok(cid)
266 }
267
268 pub async fn split_subtree(
272 mut bs: impl AsyncBlockStoreRead + AsyncBlockStoreWrite,
273 node: Cid,
274 key: &str,
275 ) -> Result<(Option<Cid>, Option<Cid>), Error> {
276 let (node_path, (mut left, mut right)) = traverse(&mut bs, node, |mut node, _cid| {
277 if let Some(partition) = node.find_ge(key) {
278 if let Some(NodeEntry::Leaf(e)) = node.entries.get(partition) {
280 if e.key == key {
281 return Err(Error::KeyAlreadyExists);
282 }
283 }
284
285 if let Some(partition) = partition.checked_sub(1) {
287 match node.entries.get(partition) {
288 Some(NodeEntry::Leaf(_e)) => {
289 let right = node.entries.split_off(partition + 1);
291
292 Ok(TraverseAction::Stop((
293 Some(node),
294 (!right.is_empty()).then_some(Node { entries: right }),
295 )))
296 }
297 Some(NodeEntry::Tree(e)) => Ok(TraverseAction::Continue((*e, partition))),
298 None => panic!(),
300 }
301 } else {
302 Ok(TraverseAction::Stop((None, Some(node))))
303 }
304 } else {
305 todo!()
306 }
307 })
308 .await?;
309
310 for (mut parent, i) in node_path.into_iter().rev() {
312 parent.entries.remove(i);
314 let (e_left, e_right) = parent.entries.split_at(i);
315
316 if let Some(left) = left.as_mut() {
317 let left_cid = left.serialize_into(&mut bs).await?;
318 *left = Node {
319 entries: e_left.iter().cloned().chain([NodeEntry::Tree(left_cid)]).collect(),
320 };
321 }
322
323 if let Some(right) = right.as_mut() {
324 let right_cid = right.serialize_into(&mut bs).await?;
325 *right = Node {
326 entries: [NodeEntry::Tree(right_cid)]
327 .into_iter()
328 .chain(e_right.iter().cloned())
329 .collect::<Vec<_>>(),
330 };
331 }
332 }
333
334 let left =
336 if let Some(left) = left { Some(left.serialize_into(&mut bs).await?) } else { None };
337 let right =
338 if let Some(right) = right { Some(right.serialize_into(&mut bs).await?) } else { None };
339
340 Ok((left, right))
341 }
342
343 pub async fn prune(
345 mut bs: impl AsyncBlockStoreRead + AsyncBlockStoreWrite,
346 root: Cid,
347 ) -> Result<Cid, Error> {
348 let (_node_path, cid) = algos::traverse(&mut bs, root, algos::traverse_prune()).await?;
349 Ok(cid)
350 }
351
352 pub async fn add(
353 mut bs: impl AsyncBlockStoreRead + AsyncBlockStoreWrite,
354 root: Cid,
355 key: &str,
356 value: Cid,
357 ) -> Result<Cid, Error> {
358 let target_layer = leading_zeroes(key.as_bytes());
360
361 let mut node_path = vec![];
363 let mut node_cid = root;
364
365 let mut node = match compute_depth(&mut bs, root).await {
370 Ok(Some(layer)) => {
371 match layer.cmp(&target_layer) {
372 Ordering::Equal => Node::read_from(&mut bs, node_cid).await?,
374 Ordering::Less => {
376 let mut layer = layer + 1;
377
378 loop {
379 let node = Node { entries: vec![NodeEntry::Tree(node_cid)] };
380
381 if layer < target_layer {
382 node_cid = node.serialize_into(&mut bs).await?;
383 layer += 1;
384 } else {
385 break node;
386 }
387 }
388 }
389 Ordering::Greater => {
391 let mut layer = layer;
392
393 let (path, (mut node, partition)) =
395 algos::traverse(&mut bs, node_cid, |node, _cid| {
396 if layer == target_layer {
397 Ok(algos::TraverseAction::Stop((node, 0)))
398 } else {
399 let partition = node.find_ge(key).unwrap();
400
401 if let Some(partition) = partition.checked_sub(1) {
403 if let Some(subtree) =
404 node.entries.get(partition).unwrap().tree()
405 {
406 layer -= 1;
407 return Ok(algos::TraverseAction::Continue((
408 *subtree, partition,
409 )));
410 }
411 }
412
413 Ok(algos::TraverseAction::Stop((node, partition)))
414 }
415 })
416 .await?;
417
418 node_path = path;
419 if layer == target_layer {
420 node
422 } else {
423 node.entries.insert(partition, NodeEntry::Tree(Cid::default()));
425 node_path.push((node, partition));
426 layer -= 1;
427
428 while layer != target_layer {
430 let node = Node { entries: vec![NodeEntry::Tree(Cid::default())] };
431
432 node_path.push((node.clone(), 0));
433 layer -= 1;
434 }
435
436 Node { entries: vec![] }
438 }
439 }
440 }
441 }
442 Ok(None) => {
443 Node { entries: vec![] }
445 }
446 Err(e) => return Err(e),
447 };
448
449 if let Some(partition) = node.find_ge(key) {
450 if let Some(NodeEntry::Leaf(e)) = node.entries.get(partition) {
452 if e.key == key {
453 return Err(Error::KeyAlreadyExists);
454 }
455 }
456
457 if let Some(partition) = partition.checked_sub(1) {
458 match node.entries.get(partition) {
459 Some(NodeEntry::Leaf(_)) => {
460 node.entries.insert(
462 partition + 1,
463 NodeEntry::Leaf(TreeEntry { key: key.to_string(), value }),
464 );
465 }
466 Some(NodeEntry::Tree(e)) => {
467 let (left, right) = algos::split_subtree(&mut bs, *e, key).await?;
469
470 let right_subvec = node.entries.split_off(partition + 1);
472
473 node.entries.pop();
474 if let Some(left) = left {
475 node.entries.push(NodeEntry::Tree(left));
476 }
477 node.entries
478 .extend([NodeEntry::Leaf(TreeEntry { key: key.to_string(), value })]);
479 if let Some(right) = right {
480 node.entries.push(NodeEntry::Tree(right));
481 }
482 node.entries.extend(right_subvec.into_iter());
483 }
484 None => unreachable!(),
486 }
487 } else {
488 node.entries.insert(0, NodeEntry::Leaf(TreeEntry { key: key.to_string(), value }));
490 }
491 } else {
492 node.entries.push(NodeEntry::Leaf(TreeEntry { key: key.to_string(), value }));
494 }
495
496 let mut cid = node.serialize_into(&mut bs).await?;
497
498 for (mut parent, i) in node_path.into_iter().rev() {
500 parent.entries[i] = NodeEntry::Tree(cid);
501 cid = parent.serialize_into(&mut bs).await?;
502 }
503
504 Ok(cid)
505 }
506
507 pub async fn update(
508 mut bs: impl AsyncBlockStoreRead + AsyncBlockStoreWrite,
509 root: Cid,
510 key: &str,
511 value: Cid,
512 ) -> Result<Cid, Error> {
513 let (node_path, (mut node, index)) =
514 algos::traverse(&mut bs, root, algos::traverse_find(key)).await?;
515
516 node.entries[index] = NodeEntry::Leaf(TreeEntry { key: key.to_string(), value });
518
519 let mut cid = node.serialize_into(&mut bs).await?;
520
521 for (mut parent, i) in node_path.into_iter().rev() {
523 parent.entries[i] = NodeEntry::Tree(cid);
524 cid = parent.serialize_into(&mut bs).await?;
525 }
526
527 Ok(cid)
528 }
529
530 pub async fn delete(
531 mut bs: impl AsyncBlockStoreRead + AsyncBlockStoreWrite,
532 root: Cid,
533 key: &str,
534 ) -> Result<Cid, Error> {
535 let (node_path, (mut node, index)) =
536 algos::traverse(&mut bs, root, algos::traverse_find(key)).await?;
537
538 node.entries.remove(index);
540
541 if let Some(index) = index.checked_sub(1) {
542 if let (Some(NodeEntry::Tree(lc)), Some(NodeEntry::Tree(rc))) =
544 (node.entries.get(index), node.entries.get(index + 1))
545 {
546 let cid = algos::merge_subtrees(&mut bs, *lc, *rc).await?;
547 node.entries[index] = NodeEntry::Tree(cid);
548 node.entries.remove(index + 1);
549 }
550 }
551
552 let node = (!node.entries.is_empty()).then_some(node);
554
555 let mut cid =
556 if let Some(node) = node { Some(node.serialize_into(&mut bs).await?) } else { None };
557
558 for (mut parent, i) in node_path.into_iter().rev() {
560 if let Some(cid) = cid.as_mut() {
561 parent.entries[i] = NodeEntry::Tree(*cid);
562 *cid = parent.serialize_into(&mut bs).await?;
563 } else {
564 parent.entries.remove(i);
568
569 cid = if parent.entries.is_empty() {
571 None
572 } else {
573 Some(parent.serialize_into(&mut bs).await?)
574 };
575 }
576 }
577
578 let cid = if let Some(cid) = cid {
579 cid
580 } else {
581 let node = Node { entries: vec![] };
583 node.serialize_into(&mut bs).await?
584 };
585
586 let cid = prune(&mut bs, cid).await?;
587 Ok(cid)
588 }
589}
590
591fn prefix(xs: &[u8], ys: &[u8]) -> usize {
593 prefix_chunks::<128>(xs, ys)
594}
595
/// Returns the length of the longest common prefix of `xs` and `ys`.
///
/// Whole `N`-byte chunks are compared first, and the remainder after the last
/// matching chunk is compared byte by byte.
fn prefix_chunks<const N: usize>(xs: &[u8], ys: &[u8]) -> usize {
    let mut matched = 0;
    for (a, b) in xs.chunks_exact(N).zip(ys.chunks_exact(N)) {
        if a != b {
            break;
        }
        matched += N;
    }

    let (xs_rest, ys_rest) = (&xs[matched..], &ys[matched..]);
    let tail = xs_rest
        .iter()
        .zip(ys_rest)
        .position(|(a, b)| a != b)
        .unwrap_or(xs_rest.len().min(ys_rest.len()));

    matched + tail
}
603
604fn leading_zeroes(key: &[u8]) -> usize {
608 let digest = sha2::Sha256::digest(key);
609 let mut zeroes = 0;
610
611 for byte in digest.iter() {
612 zeroes += (*byte < 0b0100_0000) as usize; zeroes += (*byte < 0b0001_0000) as usize; zeroes += (*byte < 0b0000_0100) as usize; zeroes += (*byte < 0b0000_0001) as usize; if *byte != 0 {
618 break;
620 }
621 }
622
623 zeroes
624}
625
/// A Merkle search tree whose nodes are stored in the block store `S`.
///
/// The tree is identified by the CID of its root node. Mutating operations write
/// new nodes and then replace `root` with the new root CID.
pub struct Tree<S> {
    /// Block store the tree's nodes are read from (and written to by mutations).
    storage: S,
    /// CID of the current root node.
    root: Cid,
}
658
659impl<S: Clone> Clone for Tree<S> {
662 fn clone(&self) -> Self {
663 Self { storage: self.storage.clone(), root: self.root }
664 }
665}
666
667impl<S> std::fmt::Debug for Tree<S> {
668 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
669 f.debug_struct("Tree").field("root", &self.root).finish_non_exhaustive()
670 }
671}
672
673impl<S: AsyncBlockStoreRead + AsyncBlockStoreWrite> Tree<S> {
674 pub async fn create(mut storage: S) -> Result<Self, Error> {
676 let node = Node { entries: vec![] };
677 let cid = node.serialize_into(&mut storage).await?;
678
679 Ok(Self { storage, root: cid })
680 }
681
682 pub async fn add(&mut self, key: &str, value: Cid) -> Result<(), Error> {
684 self.root = algos::add(&mut self.storage, self.root, key, value).await?;
685 Ok(())
686 }
687
688 pub async fn update(&mut self, key: &str, value: Cid) -> Result<(), Error> {
690 self.root = algos::update(&mut self.storage, self.root, key, value).await?;
691 Ok(())
692 }
693
694 pub async fn delete(&mut self, key: &str) -> Result<(), Error> {
696 self.root = algos::delete(&mut self.storage, self.root, key).await?;
697 Ok(())
698 }
699}
700
701impl<S: AsyncBlockStoreRead> Tree<S> {
702 pub fn open(storage: S, root: Cid) -> Self {
708 Self { storage, root }
709 }
710
711 pub fn root(&self) -> Cid {
713 self.root
714 }
715
716 pub async fn depth(&mut self, node: Option<Cid>) -> Result<Option<usize>, Error> {
718 algos::compute_depth(&mut self.storage, node.unwrap_or(self.root)).await
719 }
720
721 pub fn export(&mut self) -> impl Stream<Item = Result<Cid, Error>> + '_ {
723 let mut stack = vec![Located::InSubtree(self.root)];
725
726 try_stream! {
727 while let Some(e) = stack.pop() {
728 match e {
729 Located::InSubtree(cid) => {
730 let node = Node::read_from(&mut self.storage, cid).await?;
731 yield cid;
732
733 for entry in node.entries.iter().rev() {
734 match entry {
735 NodeEntry::Tree(entry) => {
736 stack.push(Located::InSubtree(*entry));
737 }
738 NodeEntry::Leaf(entry) => {
739 stack.push(Located::Entry(entry.value));
740 }
741 }
742 }
743 }
744 Located::Entry(value) => yield value,
745 }
746 }
747 }
748 }
749
750 pub fn entries(&mut self) -> impl Stream<Item = Result<(String, Cid), Error>> + '_ {
755 let mut stack = vec![Located::InSubtree(self.root)];
757
758 try_stream! {
759 while let Some(e) = stack.pop() {
760 match e {
761 Located::InSubtree(cid) => {
762 let node = Node::read_from(&mut self.storage, cid).await?;
763 for entry in node.entries.iter().rev() {
764 match entry {
765 NodeEntry::Tree(entry) => {
766 stack.push(Located::InSubtree(*entry));
767 }
768 NodeEntry::Leaf(entry) => {
769 stack.push(Located::Entry((entry.key.clone(), entry.value)));
770 }
771 }
772 }
773 }
774 Located::Entry((key, value)) => yield (key, value),
775 }
776 }
777 }
778 }
779
780 pub fn entries_prefixed<'a>(
785 &'a mut self,
786 prefix: &'a str,
787 ) -> impl Stream<Item = Result<(String, Cid), Error>> + 'a {
788 let mut stack = vec![Located::InSubtree(self.root)];
790
791 try_stream! {
792 while let Some(e) = stack.pop() {
793 match e {
794 Located::InSubtree(cid) => {
795 let node = Node::read_from(&mut self.storage, cid).await?;
796 for entry in node.entries_with_prefix(prefix).rev() {
797 match entry {
798 NodeEntry::Tree(entry) => {
799 stack.push(Located::InSubtree(entry));
800 }
801 NodeEntry::Leaf(entry) => {
802 stack.push(Located::Entry((entry.key.clone(), entry.value)));
803 }
804 }
805 }
806 }
807 Located::Entry((key, value)) => yield (key, value),
808 }
809 }
810 }
811 }
812
813 pub fn keys(&mut self) -> impl Stream<Item = Result<String, Error>> + '_ {
818 self.entries().map(|e| e.map(|(k, _)| k))
819 }
820
821 pub fn keys_prefixed<'a>(
826 &'a mut self,
827 prefix: &'a str,
828 ) -> impl Stream<Item = Result<String, Error>> + 'a {
829 self.entries_prefixed(prefix).map(|e| e.map(|(k, _)| k))
830 }
831
832 pub async fn get(&mut self, key: &str) -> Result<Option<Cid>, Error> {
834 match algos::traverse(&mut self.storage, self.root, algos::traverse_find(key)).await {
835 Ok((_node_path, (node, index))) => Ok(Some(node.entries[index].leaf().unwrap().value)),
837 Err(Error::KeyNotFound) => Ok(None),
838 Err(e) => Err(e),
839 }
840 }
841
842 pub async fn extract_path(&mut self, key: &str) -> Result<impl Iterator<Item = Cid>, Error> {
849 let mut r = Vec::new();
851
852 match algos::traverse(&mut self.storage, self.root, algos::traverse_find_path(key)).await {
853 Ok((node_path, FindPathResult::Found { node, path })) => {
854 r.extend(node_path.into_iter().map(|(_, cid)| cid).chain([node, path]));
855 Ok(r.into_iter())
856 }
857 Ok((node_path, FindPathResult::NotFound { node })) => {
858 r.extend(node_path.into_iter().map(|(_, cid)| cid).chain([node]));
859 Ok(r.into_iter())
860 }
861 Err(e) => Err(e),
862 }
863 }
864}
865
/// Either an item that has already been resolved or a subtree still to be
/// searched for it.
#[derive(Debug)]
pub enum Located<E> {
    /// A resolved item (for example a value CID or a key/value pair).
    Entry(E),
    /// A subtree, identified by the CID of its root node, that may hold the item.
    InSubtree(Cid),
}
874
/// One element of an in-memory node: a child subtree pointer or a key/value
/// leaf.
#[derive(Debug, Clone, PartialEq)]
enum NodeEntry {
    /// CID of a child subtree node.
    Tree(Cid),
    /// A key/value pair stored directly in this node.
    Leaf(TreeEntry),
}
882
883impl NodeEntry {
884 fn tree(&self) -> Option<&Cid> {
885 match self {
886 NodeEntry::Tree(cid) => Some(cid),
887 _ => None,
888 }
889 }
890
891 fn leaf(&self) -> Option<&TreeEntry> {
892 match self {
893 NodeEntry::Leaf(entry) => Some(entry),
894 _ => None,
895 }
896 }
897}
898
/// In-memory form of an MST node.
///
/// Unlike the wire format (`schema::Node`), subtree pointers and leaves are kept
/// in one flat list in node order: an optional leading subtree, then each leaf
/// followed by its optional right subtree.
#[derive(Debug, Clone)]
struct Node {
    /// Subtree pointers and leaves, in node order.
    entries: Vec<NodeEntry>,
}
907
908impl Node {
909 pub fn parse(bytes: &[u8]) -> Result<Self, Error> {
911 let node: schema::Node = serde_ipld_dagcbor::from_slice(bytes)?;
912
913 let mut entries = vec![];
914 if let Some(left) = &node.left {
915 entries.push(NodeEntry::Tree(*left));
916 }
917
918 let mut prev_key = vec![];
919 for entry in &node.entries {
920 let parsed_entry = TreeEntry::parse(entry.clone(), &prev_key)?;
921 prev_key = parsed_entry.key.as_bytes().to_vec();
922
923 entries.push(NodeEntry::Leaf(parsed_entry));
924
925 if let Some(tree) = &entry.tree {
927 entries.push(NodeEntry::Tree(*tree));
928 }
929 }
930
931 Ok(Self { entries })
932 }
933
934 pub async fn read_from(mut bs: impl AsyncBlockStoreRead, cid: Cid) -> Result<Self, Error> {
936 let bytes = bs.read_block(cid).await?;
937 Self::parse(&bytes)
938 }
939
940 pub async fn serialize_into(&self, mut bs: impl AsyncBlockStoreWrite) -> Result<Cid, Error> {
941 let mut node = schema::Node { left: None, entries: vec![] };
942
943 let ents = match self.entries.first() {
945 Some(NodeEntry::Tree(cid)) => {
946 node.left = Some(*cid);
947 &self.entries[1..]
948 }
949 _ => &self.entries,
950 };
951
952 let mut prev_key = vec![];
953 let mut i = 0usize;
954 while i != ents.len() {
955 let (leaf, tree) = match (ents.get(i), ents.get(i + 1)) {
956 (Some(NodeEntry::Tree(_)), Some(NodeEntry::Tree(_))) => {
957 panic!("attempted to serialize node with two adjacent trees")
959 }
960 (Some(NodeEntry::Leaf(leaf)), Some(NodeEntry::Tree(tree))) => (leaf, Some(tree)),
961 (Some(NodeEntry::Leaf(leaf)), _) => (leaf, None),
962 _ => {
964 i += 1;
965 continue;
966 }
967 };
968
969 let prefix = prefix(&prev_key, leaf.key.as_bytes());
970
971 node.entries.push(schema::TreeEntry {
972 prefix_len: prefix,
973 key_suffix: leaf.key.as_bytes()[prefix..].to_vec(),
974 value: leaf.value,
975 tree: tree.cloned(),
976 });
977
978 prev_key = leaf.key.as_bytes().to_vec();
979 i += 1;
980 }
981
982 let bytes = serde_ipld_dagcbor::to_vec(&node).unwrap();
983 Ok(bs.write_block(DAG_CBOR, SHA2_256, &bytes).await?)
984 }
985
986 fn trees(&self) -> impl Iterator<Item = &Cid> {
988 self.entries.iter().filter_map(|entry| match entry {
989 NodeEntry::Tree(entry) => Some(entry),
990 _ => None,
991 })
992 }
993
994 fn leaves(&self) -> impl Iterator<Item = &TreeEntry> {
996 self.entries.iter().filter_map(|entry| match entry {
997 NodeEntry::Leaf(entry) => Some(entry),
998 _ => None,
999 })
1000 }
1001
1002 fn layer(&self) -> Option<usize> {
1004 self.leaves().next().map(|e| leading_zeroes(e.key.as_bytes()))
1005 }
1006
1007 fn find_ge(&self, key: &str) -> Option<usize> {
1012 let mut e = self.entries.iter().enumerate().filter_map(|(i, e)| e.leaf().map(|e| (i, e)));
1013
1014 if let Some((i, _e)) = e.find(|(_i, e)| e.key.as_str() >= key) {
1015 Some(i)
1016 } else if !self.entries.is_empty() {
1017 Some(self.entries.len())
1018 } else {
1019 None
1020 }
1021 }
1022
1023 #[allow(dead_code)]
1027 pub fn get(&self, key: &str) -> Option<Located<Cid>> {
1028 let i = self.find_ge(key)?;
1029
1030 if let Some(NodeEntry::Leaf(e)) = self.entries.get(i) {
1031 if e.key == key {
1032 return Some(Located::Entry(e.value));
1033 }
1034 }
1035
1036 if let Some(NodeEntry::Tree(cid)) = self.entries.get(i - 1) {
1037 Some(Located::InSubtree(*cid))
1038 } else {
1039 None
1040 }
1041 }
1042
1043 pub fn entries_with_prefix<'a>(
1046 &'a self,
1047 prefix: &str,
1048 ) -> impl DoubleEndedIterator<Item = NodeEntry> + 'a {
1049 let mut list = Vec::new();
1050
1051 let index = if let Some(i) = self.find_ge(prefix) {
1052 i
1053 } else {
1054 return list.into_iter();
1056 };
1057
1058 if let Some(index) = index.checked_sub(1) {
1060 if let Some(NodeEntry::Tree(cid)) = self.entries.get(index) {
1061 list.push(NodeEntry::Tree(*cid));
1062 }
1063 }
1064
1065 if let Some(e) = self.entries.get(index..) {
1066 for e in e {
1067 if let NodeEntry::Leaf(t) = e {
1068 if let Some(kp) = t.key.get(..prefix.len()) {
1071 match kp.cmp(prefix) {
1072 Ordering::Less => (),
1073 Ordering::Equal => {
1074 list.push(NodeEntry::Leaf(t.clone()));
1075 continue;
1076 }
1077 Ordering::Greater => {
1078 break;
1082 }
1083 }
1084 }
1085 }
1086
1087 list.push(e.clone());
1088 }
1089 }
1090
1091 list.into_iter()
1092 }
1093}
1094
/// A leaf of the tree: a full (decompressed) key and its value CID.
#[derive(Debug, Clone, PartialEq)]
struct TreeEntry {
    /// The complete key, with the wire format's prefix compression undone.
    key: String,
    /// CID of the value stored under `key`.
    value: Cid,
}
1100
1101impl TreeEntry {
1102 fn parse(entry: schema::TreeEntry, prev_key: &[u8]) -> Result<Self, Error> {
1103 let key = if entry.prefix_len == 0 {
1104 entry.key_suffix
1105 } else if prev_key.len() < entry.prefix_len {
1106 return Err(Error::InvalidPrefixLen);
1107 } else {
1108 let mut key_bytes = prev_key[..entry.prefix_len].to_vec();
1109 key_bytes.extend(entry.key_suffix);
1110 key_bytes
1111 };
1112
1113 let key = String::from_utf8(key).map_err(|e| e.utf8_error())?;
1114
1115 Ok(Self { key, value: entry.value })
1116 }
1117}
1118
/// Errors returned by tree operations.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// A serialized entry's `prefix_len` is longer than the previous entry's key.
    #[error("Invalid prefix_len")]
    InvalidPrefixLen,
    /// An insertion targeted a key that is already present.
    #[error("the key is already present in the tree")]
    KeyAlreadyExists,
    /// The key targeted by a lookup, update or delete is not present.
    #[error("the key is not present in the tree")]
    KeyNotFound,
    /// A rebuilt key is not valid UTF-8.
    #[error("Invalid key: {0}")]
    InvalidKey(#[from] std::str::Utf8Error),
    /// Failure reported by the underlying block store.
    #[error("blockstore error: {0}")]
    BlockStore(#[from] crate::blockstore::Error),
    /// A block could not be decoded as a DAG-CBOR node.
    #[error("serde_ipld_dagcbor decoding error: {0}")]
    Parse(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
}
1135
1136#[cfg(test)]
1137mod test {
1138 use std::str::FromStr;
1139
1140 use futures::TryStreamExt;
1141 use ipld_core::cid::multihash::Multihash;
1142
1143 use crate::blockstore::{MemoryBlockStore, SHA2_256};
1144
1145 use super::*;
1146
1147 fn value_cid() -> Cid {
1151 Cid::new_v1(
1152 DAG_CBOR,
1153 match Multihash::wrap(
1154 SHA2_256,
1155 &[
1156 0x9d, 0x15, 0x6b, 0xc3, 0xf3, 0xa5, 0x20, 0x06, 0x62, 0x52, 0xc7, 0x08, 0xa9,
1157 0x36, 0x1f, 0xd3, 0xd0, 0x89, 0x22, 0x38, 0x42, 0x50, 0x0e, 0x37, 0x13, 0xd4,
1158 0x04, 0xfd, 0xcc, 0xb3, 0x3c, 0xef,
1159 ],
1160 ) {
1161 Ok(h) => h,
1162 Err(_e) => panic!(),
1163 },
1164 )
1165 }
1166
1167 #[test]
1168 fn test_prefix() {
1169 assert_eq!(
1170 prefix(b"com.example.record/3jqfcqzm3fo2j", b"com.example.record/3jqfcqzm3fo2j"),
1171 32
1172 );
1173 assert_eq!(
1174 prefix(b"com.example.record/3jqfcqzm3fo2j", b"com.example.record/7jqfcqzm3fo2j"),
1175 19
1176 );
1177 }
1178
1179 #[test]
1180 fn test_clz() {
1181 assert_eq!(leading_zeroes(b""), 0);
1182 assert_eq!(leading_zeroes(b"com.example.record/3jqfcqzm3fn2j"), 0); assert_eq!(leading_zeroes(b"com.example.record/3jqfcqzm3fo2j"), 0); assert_eq!(leading_zeroes(b"com.example.record/3jqfcqzm3fp2j"), 0); assert_eq!(leading_zeroes(b"com.example.record/3jqfcqzm3fs2j"), 1); assert_eq!(leading_zeroes(b"com.example.record/3jqfcqzm3ft2j"), 0); assert_eq!(leading_zeroes(b"com.example.record/3jqfcqzm3fu2j"), 0); assert_eq!(leading_zeroes(b"com.example.record/3jqfcqzm3fx2j"), 2); }
1190
1191 #[test]
1192 fn node_find_ge() {
1193 let node = Node { entries: vec![] };
1194 assert_eq!(node.find_ge("com.example.record/3jqfcqzm3fp2j"), None);
1195
1196 let node = Node {
1197 entries: vec![NodeEntry::Leaf(TreeEntry {
1198 key: "com.example.record/3jqfcqzm3fs2j".to_string(), value: value_cid(),
1200 })],
1201 };
1202
1203 assert_eq!(node.find_ge("com.example.record/3jqfcqzm3fp2j"), Some(0)); assert_eq!(node.find_ge("com.example.record/3jqfcqzm3fs2j"), Some(0)); assert_eq!(node.find_ge("com.example.record/3jqfcqzm3ft2j"), Some(1)); assert_eq!(node.find_ge("com.example.record/3jqfcqzm4fc2j"), Some(1)); }
1208
1209 #[tokio::test]
1210 async fn mst_create() {
1211 let bs = MemoryBlockStore::new();
1212 let tree = Tree::create(bs).await.unwrap();
1213
1214 assert_eq!(
1215 tree.root,
1216 Cid::from_str("bafyreie5737gdxlw5i64vzichcalba3z2v5n6icifvx5xytvske7mr3hpm").unwrap()
1217 );
1218 }
1219
1220 #[tokio::test]
1221 async fn mst_create_trivial() {
1222 let bs = MemoryBlockStore::new();
1223 let mut tree = Tree::create(bs).await.unwrap();
1224
1225 tree.add("com.example.record/3jqfcqzm3fo2j", value_cid()).await.unwrap();
1226
1227 assert_eq!(
1228 tree.root,
1229 Cid::from_str("bafyreibj4lsc3aqnrvphp5xmrnfoorvru4wynt6lwidqbm2623a6tatzdu").unwrap()
1230 );
1231 }
1232
1233 #[tokio::test]
1234 async fn mst_create_singlelayer2() {
1235 let bs = MemoryBlockStore::new();
1236 let mut tree = Tree::create(bs).await.unwrap();
1237
1238 tree.add("com.example.record/3jqfcqzm3fx2j", value_cid()).await.unwrap();
1239
1240 assert_eq!(
1241 tree.root,
1242 Cid::from_str("bafyreih7wfei65pxzhauoibu3ls7jgmkju4bspy4t2ha2qdjnzqvoy33ai").unwrap()
1243 );
1244 }
1245
1246 #[tokio::test]
1247 async fn mst_create_simple() {
1248 let bs = MemoryBlockStore::new();
1249 let mut tree = Tree::create(bs).await.unwrap();
1250
1251 tree.add("com.example.record/3jqfcqzm3fp2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm3fr2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm3fs2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm3ft2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm4fc2j", value_cid()).await.unwrap(); assert_eq!(
1258 tree.root,
1259 Cid::from_str("bafyreicmahysq4n6wfuxo522m6dpiy7z7qzym3dzs756t5n7nfdgccwq7m").unwrap()
1260 );
1261
1262 let keys = tree.keys().try_collect::<Vec<_>>().await.unwrap();
1264 assert_eq!(
1265 keys.as_slice(),
1266 &[
1267 "com.example.record/3jqfcqzm3fp2j",
1268 "com.example.record/3jqfcqzm3fr2j",
1269 "com.example.record/3jqfcqzm3fs2j",
1270 "com.example.record/3jqfcqzm3ft2j",
1271 "com.example.record/3jqfcqzm4fc2j",
1272 ]
1273 )
1274 }
1275
1276 #[tokio::test]
1277 async fn mst_trim_top() {
1278 let bs = MemoryBlockStore::new();
1279 let mut tree = Tree::create(bs).await.unwrap();
1280
1281 tree.add("com.example.record/3jqfcqzm3fn2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm3fo2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm3fp2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm3fs2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm3ft2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm3fu2j", value_cid()).await.unwrap(); assert_eq!(
1289 tree.root,
1290 Cid::from_str("bafyreifnqrwbk6ffmyaz5qtujqrzf5qmxf7cbxvgzktl4e3gabuxbtatv4").unwrap()
1291 );
1292 assert_eq!(tree.depth(None).await.unwrap(), Some(1));
1293
1294 tree.delete("com.example.record/3jqfcqzm3fs2j").await.unwrap(); assert_eq!(
1297 tree.root,
1298 Cid::from_str("bafyreie4kjuxbwkhzg2i5dljaswcroeih4dgiqq6pazcmunwt2byd725vi").unwrap()
1299 );
1300 }
1301
1302 #[tokio::test]
1303 async fn mst_insertion_split() {
1304 let bs = MemoryBlockStore::new();
1305 let mut tree = Tree::create(bs).await.unwrap();
1306
1307 let root11 =
1308 Cid::from_str("bafyreiettyludka6fpgp33stwxfuwhkzlur6chs4d2v4nkmq2j3ogpdjem").unwrap();
1309 let root12 =
1310 Cid::from_str("bafyreid2x5eqs4w4qxvc5jiwda4cien3gw2q6cshofxwnvv7iucrmfohpm").unwrap();
1311
1312 tree.add("com.example.record/3jqfcqzm3fo2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm3fp2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm3fr2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm3fs2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm3ft2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm3fz2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm4fc2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm4fd2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm4fg2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm4ff2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm4fh2j", value_cid()).await.unwrap(); assert_eq!(tree.root, root11);
1340
1341 tree.add("com.example.record/3jqfcqzm3fx2j", value_cid()).await.unwrap(); assert_eq!(tree.root, root12);
1345
1346 assert!(matches!(
1348 tree.add("com.example.record/3jqfcqzm4fg2j", value_cid()).await.unwrap_err(), Error::KeyAlreadyExists
1350 ));
1351
1352 assert_eq!(tree.root, root12);
1353
1354 tree.delete("com.example.record/3jqfcqzm3fx2j").await.unwrap(); assert_eq!(tree.root, root11);
1358 }
1359
1360 #[tokio::test]
1361 async fn mst_two_layers() {
1362 let bs = MemoryBlockStore::new();
1363 let mut tree = Tree::create(bs).await.unwrap();
1364
1365 let root10 =
1366 Cid::from_str("bafyreidfcktqnfmykz2ps3dbul35pepleq7kvv526g47xahuz3rqtptmky").unwrap();
1367 let root12 =
1368 Cid::from_str("bafyreiavxaxdz7o7rbvr3zg2liox2yww46t7g6hkehx4i4h3lwudly7dhy").unwrap();
1369 let root12_2 =
1370 Cid::from_str("bafyreig4jv3vuajbsybhyvb7gggvpwh2zszwfyttjrj6qwvcsp24h6popu").unwrap();
1371
1372 tree.add("com.example.record/3jqfcqzm3ft2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm3fz2j", value_cid()).await.unwrap(); assert_eq!(tree.root, root10);
1388
1389 tree.add("com.example.record/3jqfcqzm3fx2j", value_cid()).await.unwrap(); assert_eq!(tree.root, root12);
1393
1394 tree.delete("com.example.record/3jqfcqzm3fx2j").await.unwrap(); assert_eq!(tree.root, root10);
1398
1399 tree.add("com.example.record/3jqfcqzm3fx2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm4fd2j", value_cid()).await.unwrap(); assert_eq!(tree.root, root12_2);
1404
1405 tree.delete("com.example.record/3jqfcqzm4fd2j").await.unwrap(); assert_eq!(tree.root, root12);
1409 }
1410
1411 #[tokio::test]
1412 async fn mst_two_layers_rev() {
1413 let bs = MemoryBlockStore::new();
1414 let mut tree = Tree::create(bs).await.unwrap();
1415
1416 let root10 =
1417 Cid::from_str("bafyreidfcktqnfmykz2ps3dbul35pepleq7kvv526g47xahuz3rqtptmky").unwrap();
1418 let root12 =
1419 Cid::from_str("bafyreiavxaxdz7o7rbvr3zg2liox2yww46t7g6hkehx4i4h3lwudly7dhy").unwrap();
1420
1421 tree.add("com.example.record/3jqfcqzm3fx2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm3ft2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm3fz2j", value_cid()).await.unwrap(); assert_eq!(tree.root, root12);
1427
1428 tree.delete("com.example.record/3jqfcqzm3fx2j").await.unwrap(); assert_eq!(tree.root, root10);
1432 }
1433
1434 #[tokio::test]
1435 async fn mst_two_layers_del() {
1436 let bs = MemoryBlockStore::new();
1437 let mut tree = Tree::create(bs).await.unwrap();
1438
1439 let root00 =
1440 Cid::from_str("bafyreie5737gdxlw5i64vzichcalba3z2v5n6icifvx5xytvske7mr3hpm").unwrap();
1441 let root10 =
1442 Cid::from_str("bafyreih7wfei65pxzhauoibu3ls7jgmkju4bspy4t2ha2qdjnzqvoy33ai").unwrap();
1443 let root11 =
1444 Cid::from_str("bafyreidjq27sf6pi5pq2relsiwis64k2jzu7yuxukovehvtc6cranqkxcy").unwrap();
1445 let root12 =
1446 Cid::from_str("bafyreiavxaxdz7o7rbvr3zg2liox2yww46t7g6hkehx4i4h3lwudly7dhy").unwrap();
1447
1448 tree.add("com.example.record/3jqfcqzm3fx2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm3ft2j", value_cid()).await.unwrap(); tree.add("com.example.record/3jqfcqzm3fz2j", value_cid()).await.unwrap(); assert_eq!(tree.root, root12);
1453 assert_eq!(tree.depth(None).await.unwrap(), Some(2));
1454
1455 tree.delete("com.example.record/3jqfcqzm3ft2j").await.unwrap(); assert_eq!(tree.root, root11);
1459
1460 tree.add("com.example.record/3jqfcqzm3ft2j", value_cid()).await.unwrap(); assert_eq!(tree.root, root12);
1464
1465 tree.delete("com.example.record/3jqfcqzm3ft2j").await.unwrap(); tree.delete("com.example.record/3jqfcqzm3fz2j").await.unwrap(); assert_eq!(tree.root, root10);
1469
1470 tree.delete("com.example.record/3jqfcqzm3fx2j").await.unwrap(); assert_eq!(tree.root, root00);
1473 }
1474
1475 #[tokio::test]
1476 async fn mst_insert() {
1477 let bs = MemoryBlockStore::new();
1478 let mut tree = Tree::create(bs).await.unwrap();
1479
1480 tree.add("com.example.record/3jqfcqzm3fo2j", Cid::default()).await.unwrap();
1481 }
1482
1483 #[tokio::test]
1484 async fn mst_enum_prefixed() {
1485 let bs = MemoryBlockStore::new();
1486 let mut tree = Tree::create(bs).await.unwrap();
1487
1488 tree.add("com.example.abcd/2222222222222", value_cid()).await.unwrap();
1489 tree.add("com.example.abcd/2222222222223", value_cid()).await.unwrap();
1490 tree.add("com.example.bbcd/2222222222222", value_cid()).await.unwrap();
1491 tree.add("com.example.bbcd/2222222222223", value_cid()).await.unwrap();
1492 tree.add("com.example.bbcd/2222222222224", value_cid()).await.unwrap();
1493 tree.add("com.example.cbcd/2222222222222", value_cid()).await.unwrap();
1494 tree.add("com.example.cbcd/2222222222223", value_cid()).await.unwrap();
1495
1496 let keys = tree
1498 .entries_prefixed("com.example.abcd")
1499 .map_ok(|(k, _v)| k)
1500 .try_collect::<Vec<_>>()
1501 .await
1502 .unwrap();
1503 assert_eq!(
1504 keys.as_slice(),
1505 &["com.example.abcd/2222222222222", "com.example.abcd/2222222222223",]
1506 );
1507
1508 let keys = tree
1509 .entries_prefixed("com.example.bbcd")
1510 .map_ok(|(k, _v)| k)
1511 .try_collect::<Vec<_>>()
1512 .await
1513 .unwrap();
1514
1515 assert_eq!(
1516 keys.as_slice(),
1517 &[
1518 "com.example.bbcd/2222222222222",
1519 "com.example.bbcd/2222222222223",
1520 "com.example.bbcd/2222222222224",
1521 ]
1522 );
1523 }
1524}