1use super::index::*;
3use crate::{
4 forest::{
5 ChunkVisitor, Config, FilteredChunk, Forest, IndexIter, Secrets, Transaction, TreeIter,
6 TreeTypes,
7 },
8 store::{BanyanValue, BlockWriter},
9};
10use crate::{query::Query, store::ReadOnlyStore, util::IterExt, StreamBuilder, StreamBuilderState};
11use anyhow::Result;
12use core::fmt;
13use futures::prelude::*;
14use std::{collections::BTreeMap, iter, marker::PhantomData, usize};
15
/// Handle to a tree.
///
/// The first field is `None` for an empty tree; otherwise it holds the root
/// [`Index`], the [`Secrets`] that go with it, and a `u64` offset (the third
/// argument of `Tree::new`).
///
/// `V` is carried only as a [`PhantomData`] marker; no value of type `V` is
/// stored in the handle itself.
#[derive(Clone)]
pub struct Tree<T: TreeTypes, V>(Option<(Index<T>, Secrets, u64)>, PhantomData<V>);
18
19impl<T: TreeTypes, V> Tree<T, V> {
20 pub(crate) fn new(root: Index<T>, secrets: Secrets, offset: u64) -> Self {
21 Self(Some((root, secrets, offset)), PhantomData)
22 }
23
24 pub(crate) fn into_inner(self) -> Option<(Index<T>, Secrets, u64)> {
25 self.0
26 }
27
28 pub fn as_index_ref(&self) -> Option<&Index<T>> {
29 self.0.as_ref().map(|(r, _, _)| r)
30 }
31
32 pub fn link(&self) -> Option<T::Link> {
33 self.0.as_ref().and_then(|(r, _, _)| *r.link())
34 }
35
36 pub fn level(&self) -> i32 {
37 self.0
38 .as_ref()
39 .map(|(r, _, _)| r.level() as i32)
40 .unwrap_or(-1)
41 }
42
43 pub fn is_empty(&self) -> bool {
45 self.0.is_none()
46 }
47
48 pub fn count(&self) -> u64 {
50 self.0
51 .as_ref()
52 .map(|(r, _, _)| r.count())
53 .unwrap_or_default()
54 }
55
56 pub fn root(&self) -> Option<&T::Link> {
58 self.0.as_ref().and_then(|(r, _, _)| r.link().as_ref())
59 }
60
61 pub fn index(&self) -> Option<&Index<T>> {
63 self.0.as_ref().map(|(r, _, _)| r)
64 }
65
66 pub fn secrets(&self) -> Option<&Secrets> {
67 self.0.as_ref().map(|(_, secrets, _)| secrets)
68 }
69}
70
71impl<T: TreeTypes, V> Default for Tree<T, V> {
72 fn default() -> Self {
73 Self(None, PhantomData)
74 }
75}
76
77impl<T: TreeTypes, V> fmt::Debug for Tree<T, V> {
78 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79 match &self.0 {
80 Some((root, ..)) => f
81 .debug_struct("Tree")
82 .field("count", &self.count())
83 .field("key_bytes", &root.key_bytes())
84 .field("value_bytes", &root.value_bytes())
85 .field("link", &root.link())
86 .finish(),
87 None => f
88 .debug_struct("Tree")
89 .field("count", &self.count())
90 .finish(),
91 }
92 }
93}
94
95impl<T: TreeTypes, V> fmt::Display for Tree<T, V> {
96 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97 match &self.0 {
98 Some((root, ..)) => write!(f, "{:?}", root.link(),),
99 None => write!(f, "empty tree"),
100 }
101 }
102}
103
/// Edges of a dumped tree graph, each stored as a `(parent_id, child_id)` pair
/// of node ids (see `dump_graph0`).
pub type GraphEdges = Vec<(usize, usize)>;
/// Nodes of a dumped tree graph: maps a node id to the value of type `S`
/// that the caller-supplied function produced for that node.
pub type GraphNodes<S> = BTreeMap<usize, S>;
106
107impl<T: TreeTypes, R: ReadOnlyStore<T::Link>> Forest<T, R> {
108 pub fn load_stream_builder<V>(
109 &self,
110 secrets: Secrets,
111 config: Config,
112 link: T::Link,
113 ) -> Result<StreamBuilder<T, V>> {
114 let (index, byte_range) = self.create_index_from_link(
115 &secrets,
116 |items, level| config.branch_sealed(items, level),
117 link,
118 )?;
119 let state = StreamBuilderState::new(byte_range.end, secrets, config);
120 Ok(StreamBuilder::new_from_index(Some(index), state))
121 }
122
123 pub fn load_tree<V>(&self, secrets: Secrets, link: T::Link) -> Result<Tree<T, V>> {
124 let (index, byte_range) = self.create_index_from_link(&secrets, |_, _| true, link)?;
126 Ok(Tree::new(index, secrets, byte_range.end))
128 }
129
130 pub fn dump<V>(&self, tree: &Tree<T, V>) -> Result<()> {
132 match &tree.0 {
133 Some((index, secrets, _)) => self.dump0(secrets, index, ""),
134 None => Ok(()),
135 }
136 }
137
138 pub fn dump_graph<S, V>(
140 &self,
141 tree: &Tree<T, V>,
142 f: impl Fn((usize, &NodeInfo<T, R>)) -> S + Clone,
143 ) -> Result<(GraphEdges, GraphNodes<S>)> {
144 match &tree.0 {
145 Some((index, secrets, _)) => self.dump_graph0(secrets, None, 0, index, f),
146 None => anyhow::bail!("Tree must not be empty"),
147 }
148 }
149
150 pub(crate) fn traverse0<
151 Q: Query<T>,
152 V: BanyanValue,
153 E: Send + 'static,
154 F: Fn(&NodeInfo<T, R>) -> E + Send + Sync + 'static,
155 >(
156 &self,
157 secrets: Secrets,
158 query: Q,
159 index: Index<T>,
160 mk_extra: &'static F,
161 ) -> impl Iterator<Item = Result<FilteredChunk<(u64, T::Key, V), E>>> {
162 TreeIter::new(
163 self.clone(),
164 secrets,
165 query,
166 ChunkVisitor::new(mk_extra),
167 index,
168 )
169 }
170
171 pub(crate) fn traverse_rev0<
172 Q: Query<T>,
173 V: BanyanValue,
174 E: Send + 'static,
175 F: Fn(&NodeInfo<T, R>) -> E + Send + Sync + 'static,
176 >(
177 &self,
178 secrets: Secrets,
179 query: Q,
180 index: Index<T>,
181 mk_extra: &'static F,
182 ) -> impl Iterator<Item = Result<FilteredChunk<(u64, T::Key, V), E>>> {
183 TreeIter::new_rev(
184 self.clone(),
185 secrets,
186 query,
187 ChunkVisitor::new(mk_extra),
188 index,
189 )
190 }
191
192 fn index_iter0<Q: Query<T> + Clone + Send + 'static>(
193 &self,
194 secrets: Secrets,
195 query: Q,
196 index: Index<T>,
197 ) -> impl Iterator<Item = Result<Index<T>>> {
198 IndexIter::new(self.clone(), secrets, query, index)
199 }
200
201 fn index_iter_rev0<Q: Query<T> + Clone + Send + 'static>(
202 &self,
203 secrets: Secrets,
204 query: Q,
205 index: Index<T>,
206 ) -> impl Iterator<Item = Result<Index<T>>> {
207 IndexIter::new_rev(self.clone(), secrets, query, index)
208 }
209
210 pub(crate) fn dump_graph0<S>(
211 &self,
212 secrets: &Secrets,
213 parent_id: Option<usize>,
214 next_id: usize,
215 index: &Index<T>,
216 f: impl Fn((usize, &NodeInfo<T, R>)) -> S + Clone,
217 ) -> Result<(GraphEdges, GraphNodes<S>)> {
218 let mut edges = vec![];
219 let mut nodes: BTreeMap<usize, S> = Default::default();
220
221 let node = self.node_info(secrets, index);
222 if let Some(p) = parent_id {
223 edges.push((p, next_id));
224 }
225 nodes.insert(next_id, f((next_id, &node)));
226 if let NodeInfo::Branch(_, branch) = node {
227 let branch = branch.load_cached()?;
228 let mut cur = next_id;
229 for x in branch.children.iter() {
230 let (mut e, mut n) =
231 self.dump_graph0(secrets, Some(next_id), cur + 1, x, f.clone())?;
232 cur += n.len();
233 edges.append(&mut e);
234 nodes.append(&mut n);
235 }
236 }
237
238 Ok((edges, nodes))
239 }
240
241 pub fn roots<V>(&self, tree: &StreamBuilder<T, V>) -> Result<Vec<Index<T>>> {
243 match tree.index() {
244 Some(index) => self.roots_impl(tree.state().secrets(), index),
245 None => Ok(Vec::new()),
246 }
247 }
248
249 pub fn left_roots<V>(&self, tree: &Tree<T, V>) -> Result<Vec<Tree<T, V>>> {
251 Ok(if let Some((index, secrets, _)) = &tree.0 {
252 self.left_roots0(secrets, index)?
253 .into_iter()
254 .map(|x| Tree::new(x, secrets.clone(), u64::max_value()))
255 .collect()
256 } else {
257 Vec::new()
258 })
259 }
260
261 fn left_roots0(&self, secrets: &Secrets, index: &Index<T>) -> Result<Vec<Index<T>>> {
263 let mut result = if let Index::Branch(branch) = index {
264 if let Some(link) = branch.link {
265 let branch = self.load_branch_cached_from_link(secrets, &link)?;
266 self.left_roots0(secrets, branch.first_child())?
267 } else {
268 Vec::new()
269 }
270 } else {
271 Vec::new()
272 };
273 result.push(index.clone());
274 Ok(result)
275 }
276
277 pub fn check_invariants<V>(&self, tree: &StreamBuilder<T, V>) -> Result<Vec<String>> {
278 let mut msgs = Vec::new();
279 if let Some(root) = tree.index() {
280 let mut level = i32::max_value();
281 self.check_invariants0(
282 tree.state().secrets(),
283 tree.state().config(),
284 root,
285 &mut level,
286 &mut msgs,
287 )?;
288 }
289 Ok(msgs)
290 }
291
292 pub fn is_packed<V>(&self, tree: &Tree<T, V>) -> Result<bool> {
293 if let Some((root, secrets, _)) = &tree.0 {
294 self.is_packed0(secrets, root)
295 } else {
296 Ok(true)
297 }
298 }
299
300 pub fn assert_invariants<V>(&self, tree: &StreamBuilder<T, V>) -> Result<()> {
301 let msgs = self.check_invariants(tree)?;
302 if !msgs.is_empty() {
303 let invariants = msgs.join(",");
304 for msg in msgs {
305 tracing::error!("Invariant failed: {}", msg);
306 }
307 panic!("assert_invariants failed {}", invariants);
308 }
309 Ok(())
310 }
311
312 pub fn stream_filtered<V: BanyanValue>(
313 &self,
314 tree: &Tree<T, V>,
315 query: impl Query<T> + Clone + 'static,
316 ) -> impl Stream<Item = Result<(u64, T::Key, V)>> + 'static {
317 match &tree.0 {
318 Some((index, secrets, _)) => self
319 .stream_filtered0(secrets.clone(), query, index.clone())
320 .left_stream(),
321 None => stream::empty().right_stream(),
322 }
323 }
324
325 pub fn iter_index<V>(
328 &self,
329 tree: &Tree<T, V>,
330 query: impl Query<T> + Clone + 'static,
331 ) -> impl Iterator<Item = Result<Index<T>>> + 'static {
332 match &tree.0 {
333 Some((index, secrets, _)) => self
334 .index_iter0(secrets.clone(), query, index.clone())
335 .left_iter(),
336 None => iter::empty().right_iter(),
337 }
338 }
339
340 pub fn iter_index_reverse<V>(
343 &self,
344 tree: &Tree<T, V>,
345 query: impl Query<T> + Clone + 'static,
346 ) -> impl Iterator<Item = Result<Index<T>>> + 'static {
347 match &tree.0 {
348 Some((index, secrets, _)) => self
349 .index_iter_rev0(secrets.clone(), query, index.clone())
350 .left_iter(),
351 None => iter::empty().right_iter(),
352 }
353 }
354
355 pub fn iter_filtered<V: BanyanValue>(
356 &self,
357 tree: &Tree<T, V>,
358 query: impl Query<T> + Clone + 'static,
359 ) -> impl Iterator<Item = Result<(u64, T::Key, V)>> + 'static {
360 match &tree.0 {
361 Some((index, secrets, _)) => self
362 .iter_filtered0(secrets.clone(), query, index.clone())
363 .left_iter(),
364 None => iter::empty().right_iter(),
365 }
366 }
367
368 pub fn iter_filtered_reverse<V: BanyanValue>(
369 &self,
370 tree: &Tree<T, V>,
371 query: impl Query<T> + Clone + 'static,
372 ) -> impl Iterator<Item = Result<(u64, T::Key, V)>> + 'static {
373 match &tree.0 {
374 Some((index, secrets, _)) => self
375 .iter_filtered_reverse0(secrets.clone(), query, index.clone())
376 .left_iter(),
377 None => iter::empty().right_iter(),
378 }
379 }
380
381 pub fn iter_from<V: BanyanValue>(
382 &self,
383 tree: &Tree<T, V>,
384 ) -> impl Iterator<Item = Result<(u64, T::Key, V)>> + 'static {
385 match &tree.0 {
386 Some((index, secrets, _)) => self
387 .iter_filtered0(secrets.clone(), crate::query::AllQuery, index.clone())
388 .left_iter(),
389 None => iter::empty().right_iter(),
390 }
391 }
392
393 pub fn iter_filtered_chunked<Q, V, E, F>(
394 &self,
395 tree: &Tree<T, V>,
396 query: Q,
397 mk_extra: &'static F,
398 ) -> impl Iterator<Item = Result<FilteredChunk<(u64, T::Key, V), E>>> + 'static
399 where
400 Q: Query<T>,
401 V: BanyanValue,
402 E: Send + 'static,
403 F: Fn(&NodeInfo<T, R>) -> E + Send + Sync + 'static,
404 {
405 match &tree.0 {
406 Some((index, secrets, _)) => self
407 .traverse0(secrets.clone(), query, index.clone(), mk_extra)
408 .left_iter(),
409 None => iter::empty().right_iter(),
410 }
411 }
412
413 pub fn iter_filtered_chunked_reverse<Q, V, E, F>(
414 &self,
415 tree: &Tree<T, V>,
416 query: Q,
417 mk_extra: &'static F,
418 ) -> impl Iterator<Item = Result<FilteredChunk<(u64, T::Key, V), E>>> + 'static
419 where
420 Q: Query<T>,
421 V: BanyanValue,
422 E: Send + 'static,
423 F: Fn(&NodeInfo<T, R>) -> E + Send + Sync + 'static,
424 {
425 match &tree.0 {
426 Some((index, secrets, _)) => self
427 .traverse_rev0(secrets.clone(), query, index.clone(), mk_extra)
428 .left_iter(),
429 None => iter::empty().right_iter(),
430 }
431 }
432
433 pub fn stream_filtered_chunked<Q, V, E, F>(
434 &self,
435 tree: &Tree<T, V>,
436 query: Q,
437 mk_extra: &'static F,
438 ) -> impl Stream<Item = Result<FilteredChunk<(u64, T::Key, V), E>>> + 'static
439 where
440 Q: Query<T>,
441 V: BanyanValue,
442 E: Send + 'static,
443 F: Fn(&NodeInfo<T, R>) -> E + Send + Sync + 'static,
444 {
445 match &tree.0 {
446 Some((index, secrets, _)) => self
447 .stream_filtered_chunked0(secrets.clone(), query, index.clone(), mk_extra)
448 .left_stream(),
449 None => stream::empty().right_stream(),
450 }
451 }
452
453 pub fn stream_filtered_chunked_reverse<Q, V, E, F>(
454 &self,
455 tree: &Tree<T, V>,
456 query: Q,
457 mk_extra: &'static F,
458 ) -> impl Stream<Item = Result<FilteredChunk<(u64, T::Key, V), E>>> + 'static
459 where
460 Q: Query<T>,
461 V: BanyanValue,
462 E: Send + 'static,
463 F: Fn(&NodeInfo<T, R>) -> E + Send + Sync + 'static,
464 {
465 match &tree.0 {
466 Some((index, secrets, _)) => self
467 .stream_filtered_chunked_reverse0(secrets, query, index.clone(), mk_extra)
468 .left_stream(),
469 None => stream::empty().right_stream(),
470 }
471 }
472
473 pub fn get<V: BanyanValue>(
479 &self,
480 tree: &Tree<T, V>,
481 offset: u64,
482 ) -> Result<Option<(T::Key, V)>> {
483 Ok(match &tree.0 {
484 Some((index, secrets, _)) => self.get0(secrets, index, offset)?,
485 None => None,
486 })
487 }
488
489 #[allow(clippy::type_complexity)]
491 pub fn collect<V: BanyanValue>(&self, tree: &Tree<T, V>) -> Result<Vec<Option<(T::Key, V)>>> {
492 self.collect_from(tree, 0)
493 }
494
495 #[allow(clippy::type_complexity)]
497 pub fn collect_from<V: BanyanValue>(
498 &self,
499 tree: &Tree<T, V>,
500 offset: u64,
501 ) -> Result<Vec<Option<(T::Key, V)>>> {
502 let mut res = Vec::new();
503 if let Some((index, secrets, _)) = &tree.0 {
504 self.collect0(secrets, index, offset, &mut res)?;
505 }
506 Ok(res)
507 }
508}
509
510impl<T: TreeTypes, R: ReadOnlyStore<T::Link>, W: BlockWriter<T::Link>> Transaction<T, R, W> {
511 pub(crate) fn tree_from_roots<V>(
512 &mut self,
513 mut roots: Vec<Index<T>>,
514 stream: &mut StreamBuilder<T, V>,
515 ) -> Result<()> {
516 assert!(roots.iter().all(|x| x.sealed()));
517 assert!(is_sorted(roots.iter().map(|x| x.level()).rev()));
518 while roots.len() > 1 {
519 self.simplify_roots(&mut roots, 0, stream.state_mut())?;
520 }
521 stream.set_index(roots.pop());
522 Ok(())
523 }
524
525 pub fn pack<V: BanyanValue>(&mut self, tree: &mut StreamBuilder<T, V>) -> Result<()> {
533 let initial = tree.snapshot();
534 let roots = self.roots(tree)?;
535 self.tree_from_roots(roots, tree)?;
536 let remainder: Vec<_> = self
537 .collect_from(&initial, tree.count())?
538 .into_iter()
539 .collect::<Option<Vec<_>>>()
540 .ok_or_else(|| anyhow::anyhow!("found purged data"))?;
541 self.extend(tree, remainder)?;
542 Ok(())
543 }
544
545 pub fn push<V: BanyanValue>(
547 &mut self,
548 tree: &mut StreamBuilder<T, V>,
549 key: T::Key,
550 value: V,
551 ) -> Result<()> {
552 self.extend(tree, Some((key, value)))
553 }
554
555 pub fn extend<I, V>(&mut self, tree: &mut StreamBuilder<T, V>, from: I) -> Result<()>
559 where
560 I: IntoIterator<Item = (T::Key, V)>,
561 I::IntoIter: Send,
562 V: BanyanValue,
563 {
564 let mut from = from.into_iter().peekable();
565 if from.peek().is_none() {
566 return Ok(());
568 }
569 let index = tree.as_index_ref().cloned();
570 let index = self.extend_above(
571 index.as_ref(),
572 u32::max_value(),
573 from.by_ref(),
574 tree.state_mut(),
575 )?;
576 tree.set_index(Some(index));
577 Ok(())
578 }
579
580 pub fn extend_unpacked<I, V>(&mut self, tree: &mut StreamBuilder<T, V>, from: I) -> Result<()>
590 where
591 I: IntoIterator<Item = (T::Key, V)>,
592 I::IntoIter: Send,
593 V: BanyanValue,
594 {
595 let index = tree.as_index_ref().cloned();
596 let index = self.extend_unpacked0(index.as_ref(), from, tree.state_mut())?;
597 tree.set_index(index);
598 Ok(())
599 }
600
601 pub fn retain<'a, Q: Query<T> + Send + Sync, V>(
612 &'a mut self,
613 tree: &mut StreamBuilder<T, V>,
614 query: &'a Q,
615 ) -> Result<()> {
616 let index = tree.index().cloned();
617 if let Some(index) = index {
618 let mut level: i32 = i32::max_value();
619 let index = self.retain0(0, query, &index, &mut level, tree.state_mut())?;
620 tree.set_index(Some(index));
621 }
622 Ok(())
623 }
624
625 pub fn repair<V>(&mut self, tree: &mut StreamBuilder<T, V>) -> Result<Vec<String>> {
633 let mut report = Vec::new();
634 let index = tree.index().cloned();
635 if let Some(index) = index {
636 let mut level: i32 = i32::max_value();
637 let repaired = self.repair0(&index, &mut report, &mut level, tree.state_mut())?;
638 tree.set_index(Some(repaired));
639 }
640 Ok(report)
641 }
642}
643
/// Returns `true` if the items of `iter` are in non-decreasing order.
///
/// Empty and single-element iterators count as sorted. The check walks the
/// iterator once, compares each item with its predecessor, and stops at the
/// first out-of-order pair, so nothing is buffered.
fn is_sorted<T: Ord>(iter: impl Iterator<Item = T>) -> bool {
    let mut iter = iter;
    let mut prev = match iter.next() {
        Some(first) => first,
        None => return true,
    };
    for item in iter {
        if prev > item {
            return false;
        }
        prev = item;
    }
    true
}