banyan/
tree.rs

1//! creation and traversal of banyan trees
2use super::index::*;
3use crate::{
4    forest::{
5        ChunkVisitor, Config, FilteredChunk, Forest, IndexIter, Secrets, Transaction, TreeIter,
6        TreeTypes,
7    },
8    store::{BanyanValue, BlockWriter},
9};
10use crate::{query::Query, store::ReadOnlyStore, util::IterExt, StreamBuilder, StreamBuilderState};
11use anyhow::Result;
12use core::fmt;
13use futures::prelude::*;
14use std::{collections::BTreeMap, iter, marker::PhantomData, usize};
15
16#[derive(Clone)]
17pub struct Tree<T: TreeTypes, V>(Option<(Index<T>, Secrets, u64)>, PhantomData<V>);
18
19impl<T: TreeTypes, V> Tree<T, V> {
20    pub(crate) fn new(root: Index<T>, secrets: Secrets, offset: u64) -> Self {
21        Self(Some((root, secrets, offset)), PhantomData)
22    }
23
24    pub(crate) fn into_inner(self) -> Option<(Index<T>, Secrets, u64)> {
25        self.0
26    }
27
28    pub fn as_index_ref(&self) -> Option<&Index<T>> {
29        self.0.as_ref().map(|(r, _, _)| r)
30    }
31
32    pub fn link(&self) -> Option<T::Link> {
33        self.0.as_ref().and_then(|(r, _, _)| *r.link())
34    }
35
36    pub fn level(&self) -> i32 {
37        self.0
38            .as_ref()
39            .map(|(r, _, _)| r.level() as i32)
40            .unwrap_or(-1)
41    }
42
43    /// true for an empty tree
44    pub fn is_empty(&self) -> bool {
45        self.0.is_none()
46    }
47
48    /// number of elements in the tree
49    pub fn count(&self) -> u64 {
50        self.0
51            .as_ref()
52            .map(|(r, _, _)| r.count())
53            .unwrap_or_default()
54    }
55
56    /// root of a non-empty tree
57    pub fn root(&self) -> Option<&T::Link> {
58        self.0.as_ref().and_then(|(r, _, _)| r.link().as_ref())
59    }
60
61    /// root of a non-empty tree
62    pub fn index(&self) -> Option<&Index<T>> {
63        self.0.as_ref().map(|(r, _, _)| r)
64    }
65
66    pub fn secrets(&self) -> Option<&Secrets> {
67        self.0.as_ref().map(|(_, secrets, _)| secrets)
68    }
69}
70
71impl<T: TreeTypes, V> Default for Tree<T, V> {
72    fn default() -> Self {
73        Self(None, PhantomData)
74    }
75}
76
77impl<T: TreeTypes, V> fmt::Debug for Tree<T, V> {
78    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79        match &self.0 {
80            Some((root, ..)) => f
81                .debug_struct("Tree")
82                .field("count", &self.count())
83                .field("key_bytes", &root.key_bytes())
84                .field("value_bytes", &root.value_bytes())
85                .field("link", &root.link())
86                .finish(),
87            None => f
88                .debug_struct("Tree")
89                .field("count", &self.count())
90                .finish(),
91        }
92    }
93}
94
95impl<T: TreeTypes, V> fmt::Display for Tree<T, V> {
96    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97        match &self.0 {
98            Some((root, ..)) => write!(f, "{:?}", root.link(),),
99            None => write!(f, "empty tree"),
100        }
101    }
102}
103
/// Edges of a dumped tree graph, as `(parent id, child id)` pairs.
pub type GraphEdges = Vec<(usize, usize)>;
/// Nodes of a dumped tree graph: node id mapped to the value produced for that node.
pub type GraphNodes<S> = BTreeMap<usize, S>;
106
107impl<T: TreeTypes, R: ReadOnlyStore<T::Link>> Forest<T, R> {
108    pub fn load_stream_builder<V>(
109        &self,
110        secrets: Secrets,
111        config: Config,
112        link: T::Link,
113    ) -> Result<StreamBuilder<T, V>> {
114        let (index, byte_range) = self.create_index_from_link(
115            &secrets,
116            |items, level| config.branch_sealed(items, level),
117            link,
118        )?;
119        let state = StreamBuilderState::new(byte_range.end, secrets, config);
120        Ok(StreamBuilder::new_from_index(Some(index), state))
121    }
122
123    pub fn load_tree<V>(&self, secrets: Secrets, link: T::Link) -> Result<Tree<T, V>> {
124        // we pass in a predicate that makes the nodes sealed, since we don't care
125        let (index, byte_range) = self.create_index_from_link(&secrets, |_, _| true, link)?;
126        // store the offset with the snapshot. Snapshots are immutable, so this won't change.
127        Ok(Tree::new(index, secrets, byte_range.end))
128    }
129
130    /// dumps the tree structure
131    pub fn dump<V>(&self, tree: &Tree<T, V>) -> Result<()> {
132        match &tree.0 {
133            Some((index, secrets, _)) => self.dump0(secrets, index, ""),
134            None => Ok(()),
135        }
136    }
137
138    /// dumps the tree structure
139    pub fn dump_graph<S, V>(
140        &self,
141        tree: &Tree<T, V>,
142        f: impl Fn((usize, &NodeInfo<T, R>)) -> S + Clone,
143    ) -> Result<(GraphEdges, GraphNodes<S>)> {
144        match &tree.0 {
145            Some((index, secrets, _)) => self.dump_graph0(secrets, None, 0, index, f),
146            None => anyhow::bail!("Tree must not be empty"),
147        }
148    }
149
150    pub(crate) fn traverse0<
151        Q: Query<T>,
152        V: BanyanValue,
153        E: Send + 'static,
154        F: Fn(&NodeInfo<T, R>) -> E + Send + Sync + 'static,
155    >(
156        &self,
157        secrets: Secrets,
158        query: Q,
159        index: Index<T>,
160        mk_extra: &'static F,
161    ) -> impl Iterator<Item = Result<FilteredChunk<(u64, T::Key, V), E>>> {
162        TreeIter::new(
163            self.clone(),
164            secrets,
165            query,
166            ChunkVisitor::new(mk_extra),
167            index,
168        )
169    }
170
171    pub(crate) fn traverse_rev0<
172        Q: Query<T>,
173        V: BanyanValue,
174        E: Send + 'static,
175        F: Fn(&NodeInfo<T, R>) -> E + Send + Sync + 'static,
176    >(
177        &self,
178        secrets: Secrets,
179        query: Q,
180        index: Index<T>,
181        mk_extra: &'static F,
182    ) -> impl Iterator<Item = Result<FilteredChunk<(u64, T::Key, V), E>>> {
183        TreeIter::new_rev(
184            self.clone(),
185            secrets,
186            query,
187            ChunkVisitor::new(mk_extra),
188            index,
189        )
190    }
191
192    fn index_iter0<Q: Query<T> + Clone + Send + 'static>(
193        &self,
194        secrets: Secrets,
195        query: Q,
196        index: Index<T>,
197    ) -> impl Iterator<Item = Result<Index<T>>> {
198        IndexIter::new(self.clone(), secrets, query, index)
199    }
200
201    fn index_iter_rev0<Q: Query<T> + Clone + Send + 'static>(
202        &self,
203        secrets: Secrets,
204        query: Q,
205        index: Index<T>,
206    ) -> impl Iterator<Item = Result<Index<T>>> {
207        IndexIter::new_rev(self.clone(), secrets, query, index)
208    }
209
210    pub(crate) fn dump_graph0<S>(
211        &self,
212        secrets: &Secrets,
213        parent_id: Option<usize>,
214        next_id: usize,
215        index: &Index<T>,
216        f: impl Fn((usize, &NodeInfo<T, R>)) -> S + Clone,
217    ) -> Result<(GraphEdges, GraphNodes<S>)> {
218        let mut edges = vec![];
219        let mut nodes: BTreeMap<usize, S> = Default::default();
220
221        let node = self.node_info(secrets, index);
222        if let Some(p) = parent_id {
223            edges.push((p, next_id));
224        }
225        nodes.insert(next_id, f((next_id, &node)));
226        if let NodeInfo::Branch(_, branch) = node {
227            let branch = branch.load_cached()?;
228            let mut cur = next_id;
229            for x in branch.children.iter() {
230                let (mut e, mut n) =
231                    self.dump_graph0(secrets, Some(next_id), cur + 1, x, f.clone())?;
232                cur += n.len();
233                edges.append(&mut e);
234                nodes.append(&mut n);
235            }
236        }
237
238        Ok((edges, nodes))
239    }
240
241    /// sealed roots of the tree
242    pub fn roots<V>(&self, tree: &StreamBuilder<T, V>) -> Result<Vec<Index<T>>> {
243        match tree.index() {
244            Some(index) => self.roots_impl(tree.state().secrets(), index),
245            None => Ok(Vec::new()),
246        }
247    }
248
249    /// leftmost branches of the tree as separate trees
250    pub fn left_roots<V>(&self, tree: &Tree<T, V>) -> Result<Vec<Tree<T, V>>> {
251        Ok(if let Some((index, secrets, _)) = &tree.0 {
252            self.left_roots0(secrets, index)?
253                .into_iter()
254                .map(|x| Tree::new(x, secrets.clone(), u64::max_value()))
255                .collect()
256        } else {
257            Vec::new()
258        })
259    }
260
261    /// leftmost branches of the tree as separate trees
262    fn left_roots0(&self, secrets: &Secrets, index: &Index<T>) -> Result<Vec<Index<T>>> {
263        let mut result = if let Index::Branch(branch) = index {
264            if let Some(link) = branch.link {
265                let branch = self.load_branch_cached_from_link(secrets, &link)?;
266                self.left_roots0(secrets, branch.first_child())?
267            } else {
268                Vec::new()
269            }
270        } else {
271            Vec::new()
272        };
273        result.push(index.clone());
274        Ok(result)
275    }
276
277    pub fn check_invariants<V>(&self, tree: &StreamBuilder<T, V>) -> Result<Vec<String>> {
278        let mut msgs = Vec::new();
279        if let Some(root) = tree.index() {
280            let mut level = i32::max_value();
281            self.check_invariants0(
282                tree.state().secrets(),
283                tree.state().config(),
284                root,
285                &mut level,
286                &mut msgs,
287            )?;
288        }
289        Ok(msgs)
290    }
291
292    pub fn is_packed<V>(&self, tree: &Tree<T, V>) -> Result<bool> {
293        if let Some((root, secrets, _)) = &tree.0 {
294            self.is_packed0(secrets, root)
295        } else {
296            Ok(true)
297        }
298    }
299
300    pub fn assert_invariants<V>(&self, tree: &StreamBuilder<T, V>) -> Result<()> {
301        let msgs = self.check_invariants(tree)?;
302        if !msgs.is_empty() {
303            let invariants = msgs.join(",");
304            for msg in msgs {
305                tracing::error!("Invariant failed: {}", msg);
306            }
307            panic!("assert_invariants failed {}", invariants);
308        }
309        Ok(())
310    }
311
312    pub fn stream_filtered<V: BanyanValue>(
313        &self,
314        tree: &Tree<T, V>,
315        query: impl Query<T> + Clone + 'static,
316    ) -> impl Stream<Item = Result<(u64, T::Key, V)>> + 'static {
317        match &tree.0 {
318            Some((index, secrets, _)) => self
319                .stream_filtered0(secrets.clone(), query, index.clone())
320                .left_stream(),
321            None => stream::empty().right_stream(),
322        }
323    }
324
325    /// Returns an iterator yielding all indexes that have values matching the
326    /// provided query.
327    pub fn iter_index<V>(
328        &self,
329        tree: &Tree<T, V>,
330        query: impl Query<T> + Clone + 'static,
331    ) -> impl Iterator<Item = Result<Index<T>>> + 'static {
332        match &tree.0 {
333            Some((index, secrets, _)) => self
334                .index_iter0(secrets.clone(), query, index.clone())
335                .left_iter(),
336            None => iter::empty().right_iter(),
337        }
338    }
339
340    /// Returns an iterator yielding all indexes that have values matching the
341    /// provided query in reverse order.
342    pub fn iter_index_reverse<V>(
343        &self,
344        tree: &Tree<T, V>,
345        query: impl Query<T> + Clone + 'static,
346    ) -> impl Iterator<Item = Result<Index<T>>> + 'static {
347        match &tree.0 {
348            Some((index, secrets, _)) => self
349                .index_iter_rev0(secrets.clone(), query, index.clone())
350                .left_iter(),
351            None => iter::empty().right_iter(),
352        }
353    }
354
355    pub fn iter_filtered<V: BanyanValue>(
356        &self,
357        tree: &Tree<T, V>,
358        query: impl Query<T> + Clone + 'static,
359    ) -> impl Iterator<Item = Result<(u64, T::Key, V)>> + 'static {
360        match &tree.0 {
361            Some((index, secrets, _)) => self
362                .iter_filtered0(secrets.clone(), query, index.clone())
363                .left_iter(),
364            None => iter::empty().right_iter(),
365        }
366    }
367
368    pub fn iter_filtered_reverse<V: BanyanValue>(
369        &self,
370        tree: &Tree<T, V>,
371        query: impl Query<T> + Clone + 'static,
372    ) -> impl Iterator<Item = Result<(u64, T::Key, V)>> + 'static {
373        match &tree.0 {
374            Some((index, secrets, _)) => self
375                .iter_filtered_reverse0(secrets.clone(), query, index.clone())
376                .left_iter(),
377            None => iter::empty().right_iter(),
378        }
379    }
380
381    pub fn iter_from<V: BanyanValue>(
382        &self,
383        tree: &Tree<T, V>,
384    ) -> impl Iterator<Item = Result<(u64, T::Key, V)>> + 'static {
385        match &tree.0 {
386            Some((index, secrets, _)) => self
387                .iter_filtered0(secrets.clone(), crate::query::AllQuery, index.clone())
388                .left_iter(),
389            None => iter::empty().right_iter(),
390        }
391    }
392
393    pub fn iter_filtered_chunked<Q, V, E, F>(
394        &self,
395        tree: &Tree<T, V>,
396        query: Q,
397        mk_extra: &'static F,
398    ) -> impl Iterator<Item = Result<FilteredChunk<(u64, T::Key, V), E>>> + 'static
399    where
400        Q: Query<T>,
401        V: BanyanValue,
402        E: Send + 'static,
403        F: Fn(&NodeInfo<T, R>) -> E + Send + Sync + 'static,
404    {
405        match &tree.0 {
406            Some((index, secrets, _)) => self
407                .traverse0(secrets.clone(), query, index.clone(), mk_extra)
408                .left_iter(),
409            None => iter::empty().right_iter(),
410        }
411    }
412
413    pub fn iter_filtered_chunked_reverse<Q, V, E, F>(
414        &self,
415        tree: &Tree<T, V>,
416        query: Q,
417        mk_extra: &'static F,
418    ) -> impl Iterator<Item = Result<FilteredChunk<(u64, T::Key, V), E>>> + 'static
419    where
420        Q: Query<T>,
421        V: BanyanValue,
422        E: Send + 'static,
423        F: Fn(&NodeInfo<T, R>) -> E + Send + Sync + 'static,
424    {
425        match &tree.0 {
426            Some((index, secrets, _)) => self
427                .traverse_rev0(secrets.clone(), query, index.clone(), mk_extra)
428                .left_iter(),
429            None => iter::empty().right_iter(),
430        }
431    }
432
433    pub fn stream_filtered_chunked<Q, V, E, F>(
434        &self,
435        tree: &Tree<T, V>,
436        query: Q,
437        mk_extra: &'static F,
438    ) -> impl Stream<Item = Result<FilteredChunk<(u64, T::Key, V), E>>> + 'static
439    where
440        Q: Query<T>,
441        V: BanyanValue,
442        E: Send + 'static,
443        F: Fn(&NodeInfo<T, R>) -> E + Send + Sync + 'static,
444    {
445        match &tree.0 {
446            Some((index, secrets, _)) => self
447                .stream_filtered_chunked0(secrets.clone(), query, index.clone(), mk_extra)
448                .left_stream(),
449            None => stream::empty().right_stream(),
450        }
451    }
452
453    pub fn stream_filtered_chunked_reverse<Q, V, E, F>(
454        &self,
455        tree: &Tree<T, V>,
456        query: Q,
457        mk_extra: &'static F,
458    ) -> impl Stream<Item = Result<FilteredChunk<(u64, T::Key, V), E>>> + 'static
459    where
460        Q: Query<T>,
461        V: BanyanValue,
462        E: Send + 'static,
463        F: Fn(&NodeInfo<T, R>) -> E + Send + Sync + 'static,
464    {
465        match &tree.0 {
466            Some((index, secrets, _)) => self
467                .stream_filtered_chunked_reverse0(secrets, query, index.clone(), mk_extra)
468                .left_stream(),
469            None => stream::empty().right_stream(),
470        }
471    }
472
473    /// element at index
474    ///
475    /// returns Ok(None) when offset is larger than count, or when hitting a purged
476    /// part of the tree. Returns an error when part of the tree should be there, but could
477    /// not be read.
478    pub fn get<V: BanyanValue>(
479        &self,
480        tree: &Tree<T, V>,
481        offset: u64,
482    ) -> Result<Option<(T::Key, V)>> {
483        Ok(match &tree.0 {
484            Some((index, secrets, _)) => self.get0(secrets, index, offset)?,
485            None => None,
486        })
487    }
488
489    /// Collects all elements from a stream. Might produce an OOM for large streams.
490    #[allow(clippy::type_complexity)]
491    pub fn collect<V: BanyanValue>(&self, tree: &Tree<T, V>) -> Result<Vec<Option<(T::Key, V)>>> {
492        self.collect_from(tree, 0)
493    }
494
495    /// Collects all elements from the given offset. Might produce an OOM for large streams.
496    #[allow(clippy::type_complexity)]
497    pub fn collect_from<V: BanyanValue>(
498        &self,
499        tree: &Tree<T, V>,
500        offset: u64,
501    ) -> Result<Vec<Option<(T::Key, V)>>> {
502        let mut res = Vec::new();
503        if let Some((index, secrets, _)) = &tree.0 {
504            self.collect0(secrets, index, offset, &mut res)?;
505        }
506        Ok(res)
507    }
508}
509
510impl<T: TreeTypes, R: ReadOnlyStore<T::Link>, W: BlockWriter<T::Link>> Transaction<T, R, W> {
511    pub(crate) fn tree_from_roots<V>(
512        &mut self,
513        mut roots: Vec<Index<T>>,
514        stream: &mut StreamBuilder<T, V>,
515    ) -> Result<()> {
516        assert!(roots.iter().all(|x| x.sealed()));
517        assert!(is_sorted(roots.iter().map(|x| x.level()).rev()));
518        while roots.len() > 1 {
519            self.simplify_roots(&mut roots, 0, stream.state_mut())?;
520        }
521        stream.set_index(roots.pop());
522        Ok(())
523    }
524
525    /// Packs the tree to the left.
526    ///
527    /// For an already packed tree, this is a noop.
528    /// Otherwise, all packed subtrees will be reused without touching them.
529    /// Likewise, sealed subtrees or leafs will be reused if possible.
530    ///
531    /// ![packing illustration](https://ipfs.io/ipfs/QmaEDTjHSdCKyGQ3cFMCf73kE67NvffLA5agquLW5qSEVn/packing.jpg)
532    pub fn pack<V: BanyanValue>(&mut self, tree: &mut StreamBuilder<T, V>) -> Result<()> {
533        let initial = tree.snapshot();
534        let roots = self.roots(tree)?;
535        self.tree_from_roots(roots, tree)?;
536        let remainder: Vec<_> = self
537            .collect_from(&initial, tree.count())?
538            .into_iter()
539            .collect::<Option<Vec<_>>>()
540            .ok_or_else(|| anyhow::anyhow!("found purged data"))?;
541        self.extend(tree, remainder)?;
542        Ok(())
543    }
544
545    /// append a single element. This is just a shortcut for extend.
546    pub fn push<V: BanyanValue>(
547        &mut self,
548        tree: &mut StreamBuilder<T, V>,
549        key: T::Key,
550        value: V,
551    ) -> Result<()> {
552        self.extend(tree, Some((key, value)))
553    }
554
555    /// extend the node with the given iterator of key/value pairs
556    ///
557    /// ![extend illustration](https://ipfs.io/ipfs/QmaEDTjHSdCKyGQ3cFMCf73kE67NvffLA5agquLW5qSEVn/extend.jpg)
558    pub fn extend<I, V>(&mut self, tree: &mut StreamBuilder<T, V>, from: I) -> Result<()>
559    where
560        I: IntoIterator<Item = (T::Key, V)>,
561        I::IntoIter: Send,
562        V: BanyanValue,
563    {
564        let mut from = from.into_iter().peekable();
565        if from.peek().is_none() {
566            // nothing to do
567            return Ok(());
568        }
569        let index = tree.as_index_ref().cloned();
570        let index = self.extend_above(
571            index.as_ref(),
572            u32::max_value(),
573            from.by_ref(),
574            tree.state_mut(),
575        )?;
576        tree.set_index(Some(index));
577        Ok(())
578    }
579
580    /// extend the node with the given iterator of key/value pairs
581    ///
582    /// This variant will not pack the tree, but just create a new tree from the new values and join it
583    /// with the previous tree via an unpacked branch node. Essentially this will produce a degenerate tree
584    /// that resembles a linked list.
585    ///
586    /// To pack a tree, use the pack method.
587    ///
588    /// ![extend_unpacked illustration](https://ipfs.io/ipfs/QmaEDTjHSdCKyGQ3cFMCf73kE67NvffLA5agquLW5qSEVn/extend_unpacked.jpg)
589    pub fn extend_unpacked<I, V>(&mut self, tree: &mut StreamBuilder<T, V>, from: I) -> Result<()>
590    where
591        I: IntoIterator<Item = (T::Key, V)>,
592        I::IntoIter: Send,
593        V: BanyanValue,
594    {
595        let index = tree.as_index_ref().cloned();
596        let index = self.extend_unpacked0(index.as_ref(), from, tree.state_mut())?;
597        tree.set_index(index);
598        Ok(())
599    }
600
601    /// Retain just data matching the query
602    ///
603    /// this is done as best effort and will not be precise. E.g. if a chunk of data contains
604    /// just a tiny bit that needs to be retained, the entire chunk will be retained.
605    ///
606    /// from this follows that this is not a suitable method if you want to ensure that the
607    /// non-matching data is completely gone.
608    ///
609    /// note that offsets will not be affected by this. Also, unsealed nodes will not be forgotten
610    /// even if they do not match the query.
611    pub fn retain<'a, Q: Query<T> + Send + Sync, V>(
612        &'a mut self,
613        tree: &mut StreamBuilder<T, V>,
614        query: &'a Q,
615    ) -> Result<()> {
616        let index = tree.index().cloned();
617        if let Some(index) = index {
618            let mut level: i32 = i32::max_value();
619            let index = self.retain0(0, query, &index, &mut level, tree.state_mut())?;
620            tree.set_index(Some(index));
621        }
622        Ok(())
623    }
624
625    /// repair a tree by purging parts of the tree that can not be resolved.
626    ///
627    /// produces a report of links that could not be resolved.
628    ///
629    /// Note that this is an emergency measure to recover data if the tree is not completely
630    /// available. It might result in a degenerate tree that can no longer be safely added to,
631    /// especially if there are repaired blocks in the non-packed part.
632    pub fn repair<V>(&mut self, tree: &mut StreamBuilder<T, V>) -> Result<Vec<String>> {
633        let mut report = Vec::new();
634        let index = tree.index().cloned();
635        if let Some(index) = index {
636            let mut level: i32 = i32::max_value();
637            let repaired = self.repair0(&index, &mut report, &mut level, tree.state_mut())?;
638            tree.set_index(Some(repaired));
639        }
640        Ok(report)
641    }
642}
643
/// Returns `true` if the items produced by `iter` are in non-decreasing order.
///
/// Empty and single-element iterators count as sorted. Adjacent items are
/// compared on the fly, so nothing is buffered and iteration stops at the
/// first out-of-order pair.
fn is_sorted<T: Ord>(iter: impl Iterator<Item = T>) -> bool {
    let mut iter = iter;
    let mut prev = match iter.next() {
        Some(first) => first,
        // nothing to compare
        None => return true,
    };
    for item in iter {
        if prev > item {
            return false;
        }
        prev = item;
    }
    true
}