1use std::borrow::Borrow;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::{fmt, io};
5
6#[cfg(feature = "stream")]
7use async_trait::async_trait;
8use collate::{Collate, OverlapsValue};
9#[cfg(feature = "stream")]
10use destream::de;
11use freqfs::*;
12use futures::future::{self, Future, FutureExt};
13use futures::stream::{self, Stream, StreamExt, TryStreamExt};
14use futures::try_join;
15use futures::TryFutureExt;
16use safecast::AsType;
17use smallvec::SmallVec;
18use uuid::Uuid;
19
20use super::group::GroupBy;
21use super::node::Block;
22use super::range::Range;
23use super::{Collator, Key, Schema, NODE_STACK_SIZE};
24
25type NodeStack<V> = SmallVec<[Key<V>; NODE_STACK_SIZE]>;
26
27pub type BTreeReadGuard<S, C, FE> = BTree<S, C, Arc<DirReadGuardOwned<FE>>>;
29
30pub type BTreeWriteGuard<S, C, FE> = BTree<S, C, DirWriteGuardOwned<FE>>;
32
33pub type Keys<V> = Pin<Box<dyn Stream<Item = Result<Key<V>, io::Error>> + Send>>;
35
36type Nodes<FE, V> = Pin<Box<dyn Stream<Item = Result<Leaf<FE, V>, io::Error>> + Send>>;
37
38type Node<V> = super::node::Node<Vec<Vec<V>>>;
39
40const ROOT: Uuid = Uuid::from_fields(0, 0, 0, &[0u8; 8]);
41
42pub struct Leaf<FE, V> {
44 node: FileReadGuardOwned<FE, Node<V>>,
45 range: (usize, usize),
46}
47
48impl<FE, V> Leaf<FE, V> {
49 fn new(node: FileReadGuardOwned<FE, Node<V>>, range: (usize, usize)) -> Self {
50 debug_assert!(node.is_leaf());
51 Self { node, range }
52 }
53}
54
55impl<FE, V> AsRef<[Vec<V>]> for Leaf<FE, V> {
56 fn as_ref(&self) -> &[Vec<V>] {
57 match &*self.node {
58 Node::Leaf(keys) => &keys[self.range.0..self.range.1],
59 _ => panic!("not a leaf!"),
60 }
61 }
62}
63
64pub struct BTreeLock<S, C, FE> {
66 schema: Arc<S>,
67 collator: Collator<C>,
68 dir: DirLock<FE>,
69}
70
71impl<S, C, FE> Clone for BTreeLock<S, C, FE>
72where
73 C: Clone,
74{
75 fn clone(&self) -> Self {
76 Self {
77 schema: self.schema.clone(),
78 collator: self.collator.clone(),
79 dir: self.dir.clone(),
80 }
81 }
82}
83
84impl<S, C, FE> BTreeLock<S, C, FE> {
85 pub fn collator(&self) -> &Collator<C> {
87 &self.collator
88 }
89
90 pub fn schema(&self) -> &S {
92 &self.schema
93 }
94}
95
96impl<S, C, FE> BTreeLock<S, C, FE>
97where
98 S: Schema,
99 FE: AsType<Node<S::Value>> + Send + Sync,
100 Node<S::Value>: FileLoad,
101{
102 fn new(schema: S, collator: C, dir: DirLock<FE>) -> Self {
103 Self {
104 schema: Arc::new(schema),
105 collator: Collator::new(collator),
106 dir,
107 }
108 }
109
110 pub fn create(schema: S, collator: C, dir: DirLock<FE>) -> Result<Self, io::Error> {
112 let mut nodes = dir.try_write_owned()?;
113
114 if nodes.is_empty() {
115 nodes.create_file::<Node<S::Value>>(ROOT.to_string(), Node::Leaf(vec![]), 0)?;
116
117 debug_assert!(nodes.contains(&ROOT), "B+Tree failed to create a root node");
118
119 Ok(Self::new(schema, collator, dir))
120 } else {
121 Err(io::Error::new(
122 io::ErrorKind::AlreadyExists,
123 "creating a new B+Tree requires an empty file",
124 ))
125 }
126 }
127
128 pub fn load(schema: S, collator: C, dir: DirLock<FE>) -> Result<Self, io::Error> {
130 let mut nodes = dir.try_write_owned()?;
131
132 if !nodes.contains(&ROOT) {
133 nodes.create_file(ROOT.to_string(), Node::Leaf(vec![]), 0)?;
134 }
135
136 debug_assert!(nodes.contains(&ROOT), "B+Tree failed to create a root node");
137
138 Ok(Self::new(schema, collator, dir))
139 }
140
141 pub async fn sync(&self) -> Result<(), io::Error>
143 where
144 FE: for<'a> freqfs::FileSave<'a>,
145 {
146 self.dir.sync().await
147 }
148}
149
150impl<S, C, FE> BTreeLock<S, C, FE>
151where
152 C: Clone,
153 FE: Send + Sync,
154{
155 pub async fn into_read(self) -> BTreeReadGuard<S, C, FE> {
157 #[cfg(feature = "logging")]
158 log::debug!("lock B+Tree at {:?} for reading...", self.dir);
159
160 self.dir
161 .into_read()
162 .map(Arc::new)
163 .map(|dir| BTree {
164 schema: self.schema.clone(),
165 collator: self.collator.clone(),
166 dir,
167 })
168 .await
169 }
170
171 pub async fn read(&self) -> BTreeReadGuard<S, C, FE> {
173 #[cfg(feature = "logging")]
174 log::debug!("lock B+Tree at {:?} for reading...", self.dir);
175
176 self.dir
177 .read_owned()
178 .map(Arc::new)
179 .map(|dir| BTree {
180 schema: self.schema.clone(),
181 collator: self.collator.clone(),
182 dir,
183 })
184 .await
185 }
186
187 pub fn try_read(&self) -> Result<BTreeReadGuard<S, C, FE>, io::Error> {
189 self.dir.try_read_owned().map(Arc::new).map(|dir| BTree {
190 schema: self.schema.clone(),
191 collator: self.collator.clone(),
192 dir,
193 })
194 }
195
196 pub async fn into_write(self) -> BTreeWriteGuard<S, C, FE> {
198 #[cfg(feature = "logging")]
199 log::debug!("lock B+Tree at {:?} for writing...", self.dir);
200
201 self.dir
202 .into_write()
203 .map(|dir| BTree {
204 schema: self.schema.clone(),
205 collator: self.collator.clone(),
206 dir,
207 })
208 .await
209 }
210
211 pub async fn write(&self) -> BTreeWriteGuard<S, C, FE> {
213 #[cfg(feature = "logging")]
214 log::debug!("lock B+Tree at {:?} for writing...", self.dir);
215
216 self.dir
217 .write_owned()
218 .map(|dir| BTree {
219 schema: self.schema.clone(),
220 collator: self.collator.clone(),
221 dir,
222 })
223 .await
224 }
225
226 pub fn try_write(&self) -> Result<BTreeWriteGuard<S, C, FE>, io::Error> {
228 self.dir.try_write_owned().map(|dir| BTree {
229 schema: self.schema.clone(),
230 collator: self.collator.clone(),
231 dir,
232 })
233 }
234}
235
/// A `destream` visitor which decodes a sequence of keys into a freshly created B+Tree.
#[cfg(feature = "stream")]
struct BTreeVisitor<S, C, FE> {
    btree: BTreeLock<S, C, FE>,
}

#[cfg(feature = "stream")]
#[async_trait]
impl<S, C, FE> de::Visitor for BTreeVisitor<S, C, FE>
where
    S: Schema + Send + Sync,
    C: Collate<Value = S::Value> + Clone + Send + Sync,
    FE: AsType<Node<S::Value>> + Send + Sync,
    S::Value: de::FromStream<Context = ()>,
    Node<S::Value>: FileLoad,
{
    type Value = BTreeLock<S, C, FE>;

    fn expecting() -> &'static str {
        "a BTree"
    }

    /// Insert every key in `seq` into the B+Tree, then return the B+Tree's lock handle.
    async fn visit_seq<A: de::SeqAccess>(self, mut seq: A) -> Result<Self::Value, A::Error> {
        {
            let mut guard = self.btree.write().await;

            loop {
                let next = seq.next_element(()).map_err(de::Error::custom).await?;

                match next {
                    Some(key) => {
                        guard.insert(key).map_err(de::Error::custom).await?;
                    }
                    None => break,
                }
            }
        }

        Ok(self.btree)
    }
}

#[cfg(feature = "stream")]
#[async_trait]
impl<S, C, FE> de::FromStream for BTreeLock<S, C, FE>
where
    S: Schema + Send + Sync,
    C: Collate<Value = S::Value> + Clone + Send + Sync,
    FE: AsType<Node<S::Value>> + Send + Sync,
    S::Value: de::FromStream<Context = ()>,
    Node<S::Value>: FileLoad,
{
    /// The schema, collator, and (empty) directory in which to create the new B+Tree.
    type Context = (S, C, DirLock<FE>);

    async fn from_stream<D: de::Decoder>(
        context: Self::Context,
        decoder: &mut D,
    ) -> Result<Self, D::Error> {
        let (schema, collator, dir) = context;

        let visitor = BTreeVisitor {
            btree: Self::create(schema, collator, dir).map_err(de::Error::custom)?,
        };

        decoder.decode_seq(visitor).await
    }
}
289
290pub struct BTree<S, C, G> {
292 schema: Arc<S>,
293 collator: Collator<C>,
294 dir: G,
295}
296
297impl<S, C, G> Clone for BTree<S, C, G>
298where
299 C: Clone,
300 G: Clone,
301{
302 fn clone(&self) -> Self {
303 Self {
304 schema: self.schema.clone(),
305 collator: self.collator.clone(),
306 dir: self.dir.clone(),
307 }
308 }
309}
310
311impl<S, C, G> BTree<S, C, G>
312where
313 S: Schema,
314 C: Collate<Value = S::Value>,
315{
316 pub fn schema(&self) -> &S {
318 &self.schema
319 }
320}
321
322impl<S, C, FE, G> BTree<S, C, G>
323where
324 S: Schema,
325 C: Collate<Value = S::Value>,
326 FE: AsType<Node<S::Value>> + Send + Sync,
327 G: DirDeref<Entry = FE>,
328 Node<S::Value>: FileLoad + fmt::Debug,
329{
330 pub async fn contains(&self, key: &[S::Value]) -> Result<bool, io::Error> {
332 debug_assert!(
333 self.dir.as_dir().contains(&ROOT),
334 "B+Tree is missing its root node"
335 );
336
337 let mut node = self.dir.as_dir().read_file(&ROOT).await?;
338
339 loop {
340 match &*node {
341 Node::Leaf(keys) => {
342 let i = keys.bisect_left(&key, &self.collator);
343
344 break Ok(if i < keys.len() {
345 match keys.get(i) {
346 Some(present) => present == key,
347 _ => false,
348 }
349 } else {
350 false
351 });
352 }
353 Node::Index(bounds, children) => {
354 let i = bounds.bisect_right(key, &self.collator);
355
356 if i == 0 {
357 return Ok(false);
358 } else {
359 node = self.dir.as_dir().read_file(&children[i - 1]).await?;
360 }
361 }
362 }
363 }
364 }
365
366 pub async fn count<BV>(&self, range: &Range<BV>) -> Result<u64, io::Error>
368 where
369 BV: Borrow<S::Value>,
370 {
371 debug_assert!(
372 self.dir.as_dir().contains(&ROOT),
373 "B+Tree is missing its root node"
374 );
375 let root = self.dir.as_dir().read_file(&ROOT).await?;
376 self.count_inner(range, root).await
377 }
378
379 fn count_inner<'a, BV>(
380 &'a self,
381 range: &'a Range<BV>,
382 node: FileReadGuard<'a, Node<S::Value>>,
383 ) -> Pin<Box<dyn Future<Output = Result<u64, io::Error>> + 'a>>
384 where
385 BV: Borrow<S::Value> + 'a,
386 {
387 Box::pin(async move {
388 match &*node {
389 Node::Leaf(keys) if range.is_default() => Ok(keys.len() as u64),
390 Node::Leaf(keys) => {
391 let (l, r) = keys.bisect(&range, &self.collator);
392
393 if l == keys.len() {
394 Ok(0)
395 } else if l == r {
396 if range.contains_value(&keys[l], &self.collator) {
397 Ok(1)
398 } else {
399 Ok(0)
400 }
401 } else {
402 Ok((r - l) as u64)
403 }
404 }
405 Node::Index(_bounds, children) if range.is_default() => {
406 stream::iter(children)
407 .then(|node_id| self.dir.as_dir().read_file(node_id))
408 .map_ok(|node| self.count_inner(range, node))
409 .try_buffer_unordered(num_cpus::get())
410 .try_fold(0, |sum, count| future::ready(Ok(sum + count)))
411 .await
412 }
413 Node::Index(bounds, children) => {
414 let (l, r) = bounds.bisect(&range, &self.collator);
415 let l = if l == 0 { l } else { l - 1 };
416
417 if l == children.len() {
418 let node = self
419 .dir
420 .as_dir()
421 .read_file(children.last().expect("last"))
422 .await?;
423
424 self.count_inner(range, node).await
425 } else if l == r || l + 1 == r {
426 let node = self.dir.as_dir().read_file(&children[l]).await?;
427 self.count_inner(range, node).await
428 } else {
429 let left = self
430 .dir
431 .as_dir()
432 .read_file(&children[l])
433 .and_then(|node| self.count_inner(range, node));
434
435 let default_range = Range::<S::Value>::default();
436
437 let middle = stream::iter(&children[(l + 1)..(r - 1)])
438 .then(|node_id| self.dir.as_dir().read_file(node_id))
439 .map_ok(|node| self.count_inner(&default_range, node))
440 .try_buffer_unordered(num_cpus::get())
441 .try_fold(0, |sum, count| future::ready(Ok(sum + count)));
442
443 let right = self
444 .dir
445 .as_dir()
446 .read_file(&children[r - 1])
447 .and_then(|node| self.count_inner(range, node));
448
449 let (left, middle, right) = try_join!(left, middle, right)?;
450
451 Ok(left + middle + right)
452 }
453 }
454 }
455 })
456 }
457
458 pub async fn first<BV>(&self, range: Range<BV>) -> Result<Option<Key<S::Value>>, io::Error>
460 where
461 BV: Borrow<S::Value>,
462 {
463 debug_assert!(
464 self.dir.as_dir().contains(&ROOT),
465 "B+Tree is missing its root node"
466 );
467
468 let mut node = self.dir.as_dir().read_file(&ROOT).await?;
469
470 if let Node::Leaf(keys) = &*node {
471 if keys.is_empty() {
472 return Ok(None);
473 }
474 }
475
476 Ok(loop {
477 match &*node {
478 Node::Leaf(keys) => {
479 let (l, _r) = keys.bisect(&range, &self.collator);
480
481 break if l == keys.len() {
482 None
483 } else if range.contains_value(&keys[l], &self.collator) {
484 Some(stack_key(&keys[l]))
485 } else {
486 None
487 };
488 }
489 Node::Index(bounds, children) => {
490 let (l, _r) = bounds.bisect(&range, &self.collator);
491
492 if l == bounds.len() {
493 node = self
494 .dir
495 .as_dir()
496 .read_file(children.last().expect("last"))
497 .await?;
498 } else if range.contains_value(&bounds[l], &self.collator) {
499 break Some(stack_key(&bounds[l]));
500 } else {
501 node = self.dir.as_dir().read_file(&children[l]).await?;
502 }
503 }
504 }
505 })
506 }
507
508 pub async fn last<BV>(&self, range: Range<BV>) -> Result<Option<Key<S::Value>>, io::Error>
510 where
511 BV: Borrow<S::Value>,
512 {
513 debug_assert!(
514 self.dir.as_dir().contains(&ROOT),
515 "B+Tree is missing its root node"
516 );
517
518 let mut node = self.dir.as_dir().read_file(&ROOT).await?;
519
520 if let Node::Leaf(keys) = &*node {
521 if keys.is_empty() {
522 return Ok(None);
523 }
524 }
525
526 Ok(loop {
527 match &*node {
528 Node::Leaf(keys) => {
529 let (_l, r) = keys.bisect(&range, &self.collator);
530
531 break if r == keys.len() {
532 if range.contains_value(&keys[r - 1], &self.collator) {
533 Some(stack_key(&keys[r - 1]))
534 } else {
535 None
536 }
537 } else if range.contains_value(&keys[r], &self.collator) {
538 Some(stack_key(&keys[r]))
539 } else {
540 None
541 };
542 }
543 Node::Index(bounds, children) => {
544 let (_l, r) = bounds.bisect(&range, &self.collator);
545
546 if r == 0 {
547 break None;
548 } else {
549 node = self.dir.as_dir().read_file(&children[r - 1]).await?;
550 }
551 }
552 }
553 })
554 }
555
556 pub async fn is_empty<R: Borrow<Range<S::Value>>>(&self, range: R) -> Result<bool, io::Error> {
558 let range = range.borrow();
559
560 debug_assert!(
561 self.dir.as_dir().contains(&ROOT),
562 "B+Tree is missing its root node"
563 );
564
565 let mut node = self.dir.as_dir().read_file(&ROOT).await?;
566
567 Ok(loop {
568 match &*node {
569 Node::Leaf(keys) => {
570 let (l, r) = keys.bisect(range, &self.collator);
571 break l == r;
572 }
573 Node::Index(bounds, children) => {
574 let (l, r) = bounds.bisect(range, &self.collator);
575
576 if l == children.len() {
577 node = self.dir.as_dir().read_file(&children[l - 1]).await?;
578 } else if l == r {
579 node = self.dir.as_dir().read_file(&children[l]).await?;
580 } else {
581 break false;
582 }
583 }
584 }
585 })
586 }
587
588 #[cfg(debug_assertions)]
589 fn is_valid_node<'a>(
590 &'a self,
591 node: &'a Node<S::Value>,
592 ) -> Pin<Box<dyn Future<Output = Result<bool, io::Error>> + 'a>> {
593 Box::pin(async move {
594 let order = self.schema.order();
595
596 match &*node {
597 Node::Leaf(keys) => {
598 assert!(keys.len() >= (order / 2) - 1);
599 assert!(keys.len() < order);
600 }
601 Node::Index(bounds, children) => {
602 assert_eq!(bounds.len(), children.len());
603 assert!(children.len() >= self.schema.order() / 2);
604 assert!(children.len() <= order);
605
606 for (left, node_id) in bounds.iter().zip(children) {
607 let node = self.dir.as_dir().read_file(node_id).await?;
608
609 match &*node {
610 Node::Leaf(keys) => assert_eq!(left, &keys[0]),
611 Node::Index(bounds, _) => assert_eq!(left, &bounds[0]),
612 }
613
614 assert!(self.is_valid_node(&*node).await?);
615 }
616 }
617 }
618
619 Ok(true)
620 })
621 }
622}
623
624impl<S, C, FE, G> BTree<S, C, G>
625where
626 S: Schema,
627 C: Collate<Value = S::Value> + Clone + Send + Sync + 'static,
628 FE: AsType<Node<S::Value>> + Send + Sync + 'static,
629 G: DirDeref<Entry = FE> + Clone + Send + Sync + 'static,
630 Node<S::Value>: FileLoad + fmt::Debug,
631{
632 pub async fn keys<BV>(
634 self,
635 range: Range<BV>,
636 reverse: bool,
637 ) -> Result<Keys<S::Value>, io::Error>
638 where
639 BV: Borrow<S::Value> + Clone + Send + Sync + 'static,
640 {
641 debug_assert!(
642 self.dir.as_dir().contains(&ROOT),
643 "B+Tree is missing its root node"
644 );
645
646 if reverse {
647 let nodes = nodes_reverse(self.dir, self.collator, range, ROOT).await?;
648
649 let keys = nodes
650 .map_ok(|leaf| async move {
651 let mut keys = NodeStack::with_capacity(leaf.as_ref().len());
652
653 for key in leaf.as_ref().into_iter().rev() {
654 keys.push(key.into_iter().cloned().collect());
655 }
656
657 Ok(stream::iter(keys).map(Ok))
658 })
659 .try_buffered(num_cpus::get())
660 .try_flatten();
661
662 Ok(Box::pin(keys))
663 } else {
664 let nodes = nodes_forward(self.dir, self.collator, range, ROOT).await?;
665
666 let keys = nodes
667 .map_ok(|leaf| async move {
668 let mut keys = NodeStack::with_capacity(leaf.as_ref().len());
669
670 for key in leaf.as_ref() {
671 keys.push(key.into_iter().cloned().collect());
672 }
673
674 Ok(stream::iter(keys).map(Ok))
675 })
676 .try_buffered(num_cpus::get())
677 .try_flatten();
678
679 Ok(Box::pin(keys))
680 }
681 }
682
683 pub async fn groups<BV>(
685 self,
686 range: Range<BV>,
687 n: usize,
688 reverse: bool,
689 ) -> Result<Keys<S::Value>, io::Error>
690 where
691 BV: Borrow<S::Value> + Clone + Send + Sync + 'static,
692 {
693 if n <= self.schema.len() {
694 let collator = self.collator.clone();
695
696 debug_assert!(
697 self.dir.as_dir().contains(&ROOT),
698 "B+Tree is missing its root node"
699 );
700
701 let nodes = if reverse {
702 nodes_reverse(self.dir, self.collator, range, ROOT).await?
703 } else {
704 nodes_forward(self.dir, self.collator, range, ROOT).await?
705 };
706
707 let groups = GroupBy::new(collator, nodes, n, reverse);
708 Ok(Box::pin(groups))
709 } else {
710 Err(io::Error::new(
711 io::ErrorKind::InvalidInput,
712 format!(
713 "a table with {} columns does not have prefix groups of length {n}",
714 self.schema.len()
715 ),
716 ))
717 }
718 }
719
720 #[cfg(debug_assertions)]
721 pub async fn is_valid(self) -> Result<bool, io::Error> {
722 {
723 debug_assert!(
724 self.dir.as_dir().contains(&ROOT),
725 "B+Tree is missing its root node"
726 );
727
728 let root = self.dir.as_dir().read_file(&ROOT).await?;
729
730 match &*root {
731 Node::Leaf(keys) => {
732 assert!(keys.len() <= self.schema.order());
733 }
734 Node::Index(bounds, children) => {
735 assert_eq!(bounds.len(), children.len());
736
737 for (left, node_id) in bounds.iter().zip(children) {
738 let node = self.dir.as_dir().read_file(node_id).await?;
739
740 match &*node {
741 Node::Leaf(keys) => assert_eq!(left, &keys[0]),
742 Node::Index(bounds, _) => assert_eq!(left, &bounds[0]),
743 }
744
745 assert!(self.is_valid_node(&*node).await?);
746 }
747 }
748 }
749 }
750
751 let default_range = Range::<S::Value>::default();
752 let count = self.count(&default_range).await? as usize;
753 let mut contents = Vec::with_capacity(count);
754 let mut stream = self.keys(default_range, false).await?;
755 while let Some(key) = stream.try_next().await? {
756 contents.push(key);
757 }
758
759 assert_eq!(count, contents.len());
760
761 Ok(true)
762 }
763}
764
765enum NodeRead {
766 Excluded,
767 Child(Uuid),
768 Children(SmallVec<[Uuid; NODE_STACK_SIZE]>),
769 Leaf((usize, usize)),
770}
771
772impl fmt::Debug for NodeRead {
773 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
774 match self {
775 Self::Excluded => f.write_str("(none)"),
776 Self::Child(id) => write!(f, "child {id}"),
777 Self::Children(ids) => write!(f, "children {ids:?}"),
778 Self::Leaf((start, end)) => write!(f, "leaf records [{start}..{end})"),
779 }
780 }
781}
782
/// Construct a stream of the leaves (each restricted to its in-range keys) which hold keys
/// in `range`, in ascending key order, starting from the node `node_id`.
///
/// Recurses into child nodes lazily: each child's subtree is itself read via a boxed
/// future returned by this function.
///
/// # Panics
/// Panics if `node_id` is not present in `dir` (`expect("node")`).
fn nodes_forward<C, V, BV, FE, G>(
    dir: G,
    collator: Collator<C>,
    range: Range<BV>,
    node_id: Uuid,
) -> Pin<Box<dyn Future<Output = Result<Nodes<FE, V>, io::Error>> + Send>>
where
    C: Collate<Value = V> + Clone + Send + Sync + 'static,
    V: Clone + PartialEq + fmt::Debug + Send + Sync + 'static,
    BV: Borrow<V> + Clone + Send + Sync + 'static,
    FE: AsType<Node<V>> + Send + Sync + 'static,
    G: DirDeref<Entry = FE> + Clone + Send + Sync + 'static,
    Node<V>: FileLoad,
{
    #[cfg(feature = "logging")]
    log::debug!("reading BTree keys in forward order");

    let file = dir.as_dir().get_file(&node_id).expect("node").clone();
    let fut = file.into_read().map_ok(move |node| {
        // decide which part of this node the scan needs
        let read = match &*node {
            Node::Leaf(keys) if range.is_default() => NodeRead::Leaf((0, keys.len())),
            Node::Leaf(keys) => {
                // NOTE(review): (l, r) is treated as the half-open interval of keys
                // overlapping the range — confirm against `Block::bisect`
                let (l, r) = keys.bisect(&range, &collator);

                if r == 0 || l == keys.len() {
                    NodeRead::Excluded
                } else if l == r {
                    if range.contains_value(&keys[l], &collator) {
                        NodeRead::Leaf((l, l + 1))
                    } else {
                        NodeRead::Excluded
                    }
                } else {
                    NodeRead::Leaf((l, r))
                }
            }
            Node::Index(_bounds, children) if range.is_default() => {
                debug_assert!(!children.is_empty());

                if children.len() == 1 {
                    NodeRead::Child(children[0])
                } else {
                    NodeRead::Children(SmallVec::from_slice(&children))
                }
            }
            Node::Index(bounds, children) => {
                let (l, r) = bounds.bisect(&range, &collator);
                // the child to the left of the first bound in range may also hold keys in range
                let l = if l == 0 { l } else { l - 1 };

                if r == 0 {
                    NodeRead::Excluded
                } else if l == children.len() {
                    NodeRead::Child(children[l - 1])
                } else if l == r || l + 1 == r {
                    NodeRead::Child(children[l])
                } else {
                    NodeRead::Children(SmallVec::from_slice(&children[l..r]))
                }
            }
        };

        #[cfg(feature = "logging")]
        log::trace!("read {read:?}");

        let nodes: Nodes<FE, V> = match read {
            NodeRead::Excluded => {
                let nodes = stream::empty();
                Box::pin(nodes)
            }
            NodeRead::Child(node_id) => {
                let nodes =
                    stream::once(nodes_forward(dir, collator, range, node_id)).try_flatten();

                Box::pin(nodes)
            }
            NodeRead::Children(children) => {
                let last_child = children.len() - 1;
                let nodes = stream::iter(children.into_iter().enumerate())
                    .map(move |(i, node_id)| {
                        // only the first and last children can be partially in range;
                        // the ones in between are scanned with a default (unbounded) range
                        if i == 0 || i == last_child {
                            nodes_forward(dir.clone(), collator.clone(), range.clone(), node_id)
                        } else {
                            nodes_forward(
                                dir.clone(),
                                collator.clone(),
                                Range::<V>::default(),
                                node_id,
                            )
                        }
                    })
                    // read up to two child subtrees ahead while preserving their order
                    .buffered(2)
                    .try_flatten();

                Box::pin(nodes)
            }
            NodeRead::Leaf(range) => {
                let nodes = stream::once(future::ready(Ok(Leaf::new(node, range))));
                Box::pin(nodes)
            }
        };

        nodes
    });

    Box::pin(fut)
}
889
/// Construct a stream of the leaves (each restricted to its in-range keys) which hold keys
/// in `range`, in descending order of leaf, starting from the node `node_id`.
///
/// Only the order of the leaves is reversed here; the keys within each yielded leaf are in
/// ascending order (`BTree::keys` reverses them).
///
/// # Panics
/// Panics if `node_id` is not present in `dir` (`expect("node")`).
fn nodes_reverse<C, V, BV, FE, G>(
    dir: G,
    collator: Collator<C>,
    range: Range<BV>,
    node_id: Uuid,
) -> Pin<Box<dyn Future<Output = Result<Nodes<FE, V>, io::Error>> + Send>>
where
    C: Collate<Value = V> + Clone + Send + Sync + 'static,
    V: Clone + PartialEq + fmt::Debug + Send + Sync + 'static,
    BV: Borrow<V> + Clone + Send + Sync + 'static,
    FE: AsType<Node<V>> + Send + Sync + 'static,
    G: DirDeref<Entry = FE> + Clone + Send + Sync + 'static,
    Node<V>: FileLoad,
{
    let file = dir.as_dir().get_file(&node_id).expect("node").clone();
    let fut = file.into_read().map_ok(move |node| {
        // decide which part of this node the scan needs (same logic as `nodes_forward`)
        let read = match &*node {
            Node::Leaf(keys) if range.is_default() => NodeRead::Leaf((0, keys.len())),
            Node::Leaf(keys) => {
                // NOTE(review): (l, r) is treated as the half-open interval of keys
                // overlapping the range — confirm against `Block::bisect`
                let (l, r) = keys.bisect(&range, &collator);

                if l == keys.len() || r == 0 {
                    NodeRead::Excluded
                } else if l == r {
                    if range.contains_value(&keys[l], &collator) {
                        NodeRead::Leaf((l, l + 1))
                    } else {
                        NodeRead::Excluded
                    }
                } else {
                    NodeRead::Leaf((l, r))
                }
            }
            Node::Index(_bounds, children) if range.is_default() => {
                debug_assert!(!children.is_empty());

                if children.len() == 1 {
                    NodeRead::Child(children[0])
                } else {
                    NodeRead::Children(SmallVec::from_slice(&children))
                }
            }
            Node::Index(bounds, children) => {
                debug_assert!(!children.is_empty());

                let (l, r) = bounds.bisect(&range, &collator);
                // the child to the left of the first bound in range may also hold keys in range
                let l = if l == 0 { l } else { l - 1 };

                if r == 0 {
                    NodeRead::Excluded
                } else if l == children.len() {
                    NodeRead::Child(children[l - 1])
                } else if l == r || l + 1 == r {
                    NodeRead::Child(children[l])
                } else {
                    NodeRead::Children(SmallVec::from_slice(&children[l..r]))
                }
            }
        };

        let nodes: Nodes<FE, V> = match read {
            NodeRead::Excluded => {
                let nodes = stream::empty();
                Box::pin(nodes)
            }
            NodeRead::Child(node_id) => {
                let nodes =
                    stream::once(nodes_reverse(dir, collator, range, node_id)).try_flatten();

                Box::pin(nodes)
            }
            NodeRead::Children(children) => {
                let last_child = children.len() - 1;
                // `enumerate` before `rev` so `i` keeps each child's forward position
                let nodes = stream::iter(children.into_iter().enumerate().rev())
                    .map(move |(i, node_id)| {
                        // only the first and last children can be partially in range;
                        // the ones in between are scanned with a default (unbounded) range
                        if i == 0 || i == last_child {
                            nodes_reverse(dir.clone(), collator.clone(), range.clone(), node_id)
                        } else {
                            nodes_reverse(
                                dir.clone(),
                                collator.clone(),
                                Range::<V>::default(),
                                node_id,
                            )
                        }
                    })
                    // read up to two child subtrees ahead while preserving their order
                    .buffered(2)
                    .try_flatten();

                Box::pin(nodes)
            }
            NodeRead::Leaf(range) => {
                let nodes = stream::once(future::ready(Ok(Leaf::new(node, range))));
                Box::pin(nodes)
            }
        };

        nodes
    });

    Box::pin(fut)
}
992
/// The outcome of rebalancing an underflowing index node with its right sibling
/// (`merge_index_left`); each variant carries the right sibling's new first bound.
enum MergeIndexLeft<V> {
    /// the first child of the right sibling was moved into the underflowing node
    Borrow(Vec<V>),
    /// the underflowing node's contents were moved into the right sibling
    Merge(Vec<V>),
}

/// The outcome of rebalancing an underflowing index node with its left sibling
/// (`merge_index_right`).
enum MergeIndexRight {
    /// the last child of the left sibling was moved into the underflowing node
    Borrow,
    /// the underflowing node's contents were moved into the left sibling
    Merge,
}

/// The outcome of rebalancing an underflowing leaf with its right sibling
/// (`merge_leaf_left`); each variant carries the bound the parent stores for the right
/// sibling after rebalancing (see its use in `merge_leaf`).
enum MergeLeafLeft<V> {
    /// keys were borrowed from the right sibling
    Borrow(Vec<V>),
    /// the underflowing leaf was merged into the right sibling
    Merge(Vec<V>),
}

/// The outcome of rebalancing an underflowing leaf with its left sibling
/// (`merge_leaf_right`).
enum MergeLeafRight {
    /// keys were borrowed from the left sibling
    Borrow,
    /// the underflowing leaf was merged into the left sibling
    Merge,
}

/// The outcome of deleting a key from a subtree (`delete_inner`).
enum Delete<FE, V> {
    /// the key was not found
    None,
    /// the key was deleted and the subtree's first key changed to this value
    Left(Vec<V>),
    /// the key was deleted and the subtree's first key is unchanged
    Right,
    /// the key was deleted and the node is now below its minimum size; the caller
    /// must rebalance it using the returned write guard
    Underflow(FileWriteGuardOwned<FE, Node<V>>),
}

/// The outcome of inserting a key into a subtree.
/// NOTE(review): the insert path is not visible here; these variant meanings are inferred
/// from their names and the parallel `Delete` enum — confirm against the insert implementation.
enum Insert<V> {
    /// presumably: nothing changed (e.g. the key was already present)
    None,
    /// presumably: the key was inserted and the subtree's first key changed to this value
    Left(Vec<V>),
    /// presumably: the key was inserted and the subtree's first key is unchanged
    Right,
    /// presumably: the node split, its first key changed, and a new sibling was created
    OverflowLeft(Vec<V>, Vec<V>, Uuid),
    /// presumably: the node split into a new sibling with the given first key and ID
    Overflow(Vec<V>, Uuid),
}
1027
1028impl<S, C, FE> BTree<S, C, DirWriteGuardOwned<FE>> {
1029 pub fn downgrade(self) -> BTreeReadGuard<S, C, FE> {
1031 BTreeReadGuard {
1032 schema: self.schema,
1033 collator: self.collator,
1034 dir: Arc::new(self.dir.downgrade()),
1035 }
1036 }
1037}
1038
1039impl<S, C, FE> BTree<S, C, DirWriteGuardOwned<FE>>
1040where
1041 S: Schema + Send + Sync,
1042 C: Collate<Value = S::Value> + Send + Sync,
1043 FE: AsType<Node<S::Value>> + Send + Sync,
1044 Node<S::Value>: FileLoad,
1045{
    /// Delete `key` from this B+Tree, returning `true` if it was present.
    ///
    /// If rebalancing leaves the root index node with a single child, that child's contents
    /// are moved into the root and the child's file is deleted.
    ///
    /// # Errors
    /// Returns an `io::Error` if a node cannot be locked or read.
    pub async fn delete<V>(&mut self, key: &[V]) -> Result<bool, io::Error>
    where
        V: Borrow<S::Value> + Send + Sync,
    {
        debug_assert!(self.dir.contains(&ROOT), "B+Tree is missing its root node");
        let mut root = self.dir.write_file_owned(&ROOT).await?;

        // `new_root` is the ID of the root's only remaining child, if the tree should shrink
        let new_root = match &mut *root {
            Node::Leaf(keys) => {
                let i = keys.bisect_left(&key, &self.collator);
                // NOTE(review): `zip` stops at the shorter of the two keys, so this compares
                // only their common prefix — presumably keys always have the schema's length
                if i < keys.len() && keys[i].iter().zip(key).all(|(l, r)| l == r.borrow()) {
                    keys.remove(i);
                    return Ok(true);
                } else {
                    return Ok(false);
                }
            }
            Node::Index(bounds, children) => {
                // the child to search is the one whose bound is the last bound <= key
                let i = match bounds.bisect_right(&key, &self.collator) {
                    0 => return Ok(false),
                    i => i - 1,
                };

                let node = self.dir.write_file_owned(&children[i]).await?;
                match self.delete_inner(node, &key).await? {
                    Delete::None => return Ok(false),
                    Delete::Right => return Ok(true),
                    Delete::Left(bound) => {
                        bounds[i] = bound;
                        return Ok(true);
                    }
                    // the child is now too small: borrow from or merge with a sibling
                    Delete::Underflow(mut node) => match &mut *node {
                        Node::Leaf(new_keys) => {
                            self.merge_leaf(new_keys, i, bounds, children).await?
                        }
                        Node::Index(new_bounds, new_children) => {
                            self.merge_index(new_bounds, new_children, i, bounds, children)
                                .await?
                        }
                    },
                }

                if children.len() == 1 {
                    bounds.pop();
                    children.pop()
                } else {
                    None
                }
            }
        };

        if let Some(only_child) = new_root {
            // move the only child's contents into the root node, then delete the child
            let new_root = {
                let mut child = self.dir.write_file(&only_child).await?;
                match &mut *child {
                    Node::Leaf(keys) => Node::Leaf(keys.drain(..).collect()),
                    Node::Index(bounds, children) => {
                        Node::Index(bounds.drain(..).collect(), children.drain(..).collect())
                    }
                }
            };

            self.dir.delete(&only_child).await;

            *root = new_root;
        }

        Ok(true)
    }
1116
    /// Recursively delete `key` from the subtree rooted at `node`.
    ///
    /// Returns a [`Delete`] describing how the parent must update its bound for this node,
    /// or `Delete::Underflow` (with the node's write guard) if the parent must rebalance it.
    ///
    /// # Errors
    /// Returns an `io::Error` if a child node cannot be locked or read.
    fn delete_inner<'a, V>(
        &'a mut self,
        mut node: FileWriteGuardOwned<FE, Node<S::Value>>,
        key: &'a [V],
    ) -> Pin<Box<dyn Future<Output = Result<Delete<FE, S::Value>, io::Error>> + Send + 'a>>
    where
        V: Borrow<S::Value> + Send + Sync,
    {
        // boxed because an async fn cannot recurse directly
        Box::pin(async move {
            match &mut *node {
                Node::Leaf(keys) => {
                    let i = keys.bisect_left(&key, &self.collator);

                    if i < keys.len() && keys[i].iter().zip(key).all(|(l, r)| l == r.borrow()) {
                        keys.remove(i);

                        if keys.len() < (self.schema.order() / 2) {
                            Ok(Delete::Underflow(node))
                        } else if i == 0 {
                            // the first key was removed, so the parent's bound must change
                            Ok(Delete::Left(keys[0].to_vec()))
                        } else {
                            Ok(Delete::Right)
                        }
                    } else {
                        Ok(Delete::None)
                    }
                }
                Node::Index(bounds, children) => {
                    // the child to search is the one whose bound is the last bound <= key
                    let i = match bounds.bisect_right(key, &self.collator) {
                        0 => return Ok(Delete::None),
                        i => i - 1,
                    };

                    let child = self.dir.write_file_owned(&children[i]).await?;
                    match self.delete_inner(child, key).await? {
                        Delete::None => return Ok(Delete::None),
                        Delete::Right => return Ok(Delete::Right),
                        Delete::Left(bound) => {
                            bounds[i] = bound;

                            // only a change to this node's first bound propagates upward
                            return if i == 0 {
                                Ok(Delete::Left(bounds[0].to_vec()))
                            } else {
                                Ok(Delete::Right)
                            };
                        }
                        Delete::Underflow(mut node) => match &mut *node {
                            Node::Leaf(new_keys) => {
                                self.merge_leaf(new_keys, i, bounds, children).await?
                            }
                            Node::Index(new_bounds, new_children) => {
                                self.merge_index(new_bounds, new_children, i, bounds, children)
                                    .await?
                            }
                        },
                    }

                    // after rebalancing a child, this node may itself be too small
                    if children.len() > (self.schema.order() / 2) {
                        if i == 0 {
                            Ok(Delete::Left(bounds[0].to_vec()))
                        } else {
                            Ok(Delete::Right)
                        }
                    } else {
                        Ok(Delete::Underflow(node))
                    }
                }
            }
        })
    }
1187
1188 fn merge_index<'a>(
1189 &'a mut self,
1190 new_bounds: &'a mut Vec<Vec<S::Value>>,
1191 new_children: &'a mut Vec<Uuid>,
1192 i: usize,
1193 bounds: &'a mut Vec<Vec<S::Value>>,
1194 children: &'a mut Vec<Uuid>,
1195 ) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send + 'a>> {
1196 Box::pin(async move {
1197 if i == 0 {
1198 match self
1199 .merge_index_left(new_bounds, new_children, &children[i + 1])
1200 .await?
1201 {
1202 MergeIndexLeft::Borrow(bound) => {
1203 bounds[i] = new_bounds[0].to_vec();
1204 bounds[i + 1] = bound;
1205 }
1206 MergeIndexLeft::Merge(bound) => {
1207 self.dir.delete(&children[0]).await;
1208 children.remove(0);
1209 bounds.remove(0);
1210 bounds[0] = bound;
1211 }
1212 }
1213 } else {
1214 match self
1215 .merge_index_right(new_bounds, new_children, &children[i - 1])
1216 .await?
1217 {
1218 MergeIndexRight::Borrow => {
1219 bounds[i] = new_bounds[0].to_vec();
1220 }
1221 MergeIndexRight::Merge => {
1222 self.dir.delete(&children[i]).await;
1223 children.remove(i);
1224 bounds.remove(i);
1225 }
1226 }
1227 }
1228
1229 Ok(())
1230 })
1231 }
1232
1233 fn merge_index_left<'a>(
1234 &'a self,
1235 left_bounds: &'a mut Vec<Vec<S::Value>>,
1236 left_children: &'a mut Vec<Uuid>,
1237 node_id: &'a Uuid,
1238 ) -> Pin<Box<dyn Future<Output = Result<MergeIndexLeft<S::Value>, io::Error>> + Send + 'a>>
1239 {
1240 Box::pin(async move {
1241 let mut node = self.dir.write_file(node_id).await?;
1242
1243 match &mut *node {
1244 Node::Leaf(_right_keys) => unreachable!("merge a leaf node with an index node"),
1245 Node::Index(right_bounds, right_children) => {
1246 if right_bounds.len() > (self.schema.order() / 2) {
1247 left_bounds.push(right_bounds.remove(0));
1248 left_children.push(right_children.remove(0));
1249 Ok(MergeIndexLeft::Borrow(right_bounds[0].to_vec()))
1250 } else {
1251 let mut new_bounds =
1252 Vec::with_capacity(left_bounds.len() + right_bounds.len());
1253
1254 new_bounds.extend(left_bounds.drain(..));
1255 new_bounds.extend(right_bounds.drain(..));
1256 *right_bounds = new_bounds;
1257
1258 let mut new_children = Vec::with_capacity(right_bounds.len());
1259
1260 new_children.extend(left_children.drain(..));
1261 new_children.extend(right_children.drain(..));
1262 *right_children = new_children;
1263
1264 Ok(MergeIndexLeft::Merge(right_bounds[0].to_vec()))
1265 }
1266 }
1267 }
1268 })
1269 }
1270
1271 fn merge_index_right<'a>(
1272 &'a self,
1273 right_bounds: &'a mut Vec<Vec<S::Value>>,
1274 right_children: &'a mut Vec<Uuid>,
1275 node_id: &'a Uuid,
1276 ) -> Pin<Box<dyn Future<Output = Result<MergeIndexRight, io::Error>> + Send + 'a>> {
1277 Box::pin(async move {
1278 let mut node = self.dir.write_file(node_id).await?;
1279
1280 match &mut *node {
1281 Node::Leaf(_left_keys) => unreachable!("merge a leaf node with an index node"),
1282 Node::Index(left_bounds, left_children) => {
1283 if left_children.len() > (self.schema.order() / 2) {
1284 let right = left_bounds.pop().expect("right");
1285 right_bounds.insert(0, right);
1286
1287 let right = left_children.pop().expect("right");
1288 right_children.insert(0, right);
1289
1290 Ok(MergeIndexRight::Borrow)
1291 } else {
1292 left_bounds.extend(right_bounds.drain(..));
1293 left_children.extend(right_children.drain(..));
1294 Ok(MergeIndexRight::Merge)
1295 }
1296 }
1297 }
1298 })
1299 }
1300
1301 fn merge_leaf<'a>(
1302 &'a mut self,
1303 new_keys: &'a mut Vec<Vec<S::Value>>,
1304 i: usize,
1305 bounds: &'a mut Vec<Vec<S::Value>>,
1306 children: &'a mut Vec<Uuid>,
1307 ) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send + 'a>> {
1308 Box::pin(async move {
1309 if i == 0 {
1310 match self.merge_leaf_left(new_keys, &children[i + 1]).await? {
1311 MergeLeafLeft::Borrow(bound) => {
1312 bounds[i] = new_keys[0].to_vec();
1313 bounds[i + 1] = bound;
1314 }
1315 MergeLeafLeft::Merge(bound) => {
1316 self.dir.delete(&children[0]).await;
1317 children.remove(0);
1318 bounds.remove(0);
1319 bounds[0] = bound;
1320 }
1321 }
1322 } else {
1323 match self.merge_leaf_right(new_keys, &children[i - 1]).await? {
1324 MergeLeafRight::Borrow => {
1325 bounds[i] = new_keys[0].to_vec();
1326 }
1327 MergeLeafRight::Merge => {
1328 self.dir.delete(&children[i]).await;
1329 children.remove(i);
1330 bounds.remove(i);
1331 }
1332 }
1333 }
1334
1335 Ok(())
1336 })
1337 }
1338
1339 fn merge_leaf_left<'a>(
1340 &'a self,
1341 left_keys: &'a mut Vec<Vec<S::Value>>,
1342 node_id: &'a Uuid,
1343 ) -> Pin<Box<dyn Future<Output = Result<MergeLeafLeft<S::Value>, io::Error>> + Send + 'a>> {
1344 Box::pin(async move {
1345 let mut node = self.dir.write_file(node_id).await?;
1346
1347 match &mut *node {
1348 Node::Leaf(right_keys) => {
1349 if right_keys.len() > (self.schema.order() / 2) {
1350 left_keys.push(right_keys.remove(0));
1351 Ok(MergeLeafLeft::Borrow(right_keys[0].to_vec()))
1352 } else {
1353 let mut new_keys = Vec::with_capacity(left_keys.len() + right_keys.len());
1354 new_keys.extend(left_keys.drain(..));
1355 new_keys.extend(right_keys.drain(..));
1356 *right_keys = new_keys;
1357
1358 Ok(MergeLeafLeft::Merge(right_keys[0].to_vec()))
1359 }
1360 }
1361 Node::Index(bounds, children) => {
1362 match self.merge_leaf_left(left_keys, &children[0]).await? {
1363 MergeLeafLeft::Borrow(left) => {
1364 bounds[0] = left.to_vec();
1365 Ok(MergeLeafLeft::Borrow(left))
1366 }
1367 MergeLeafLeft::Merge(left) => {
1368 bounds[0] = left.to_vec();
1369 Ok(MergeLeafLeft::Merge(left))
1370 }
1371 }
1372 }
1373 }
1374 })
1375 }
1376
1377 fn merge_leaf_right<'a>(
1378 &'a self,
1379 right_keys: &'a mut Vec<Vec<S::Value>>,
1380 node_id: &'a Uuid,
1381 ) -> Pin<Box<dyn Future<Output = Result<MergeLeafRight, io::Error>> + Send + 'a>> {
1382 Box::pin(async move {
1383 let mut node = self.dir.write_file(node_id).await?;
1384
1385 match &mut *node {
1386 Node::Leaf(left_keys) => {
1387 if left_keys.len() > (self.schema.order() / 2) {
1388 let right = left_keys.pop().expect("right");
1389 right_keys.insert(0, right);
1390 Ok(MergeLeafRight::Borrow)
1391 } else {
1392 left_keys.extend(right_keys.drain(..));
1393 Ok(MergeLeafRight::Merge)
1394 }
1395 }
1396 Node::Index(_bounds, _children) => unreachable!("merge with the rightmost leaf"),
1397 }
1398 })
1399 }
1400
1401 pub async fn insert(&mut self, key: Vec<S::Value>) -> Result<bool, io::Error> {
1404 let key = validate_key(&*self.schema, key)?;
1405 self.insert_root(key).await
1406 }
1407
1408 async fn insert_root(&mut self, key: Vec<S::Value>) -> Result<bool, io::Error> {
1409 let order = self.schema.order();
1410
1411 debug_assert!(self.dir.contains(&ROOT), "B+Tree is missing its root node");
1412 let mut root = self.dir.write_file_owned(&ROOT).await?;
1413
1414 let new_root = match &mut *root {
1415 Node::Leaf(keys) => {
1416 let i = keys.bisect_left(&key, &self.collator);
1417
1418 if keys.get(i) == Some(&key) {
1419 return Ok(false);
1421 }
1422
1423 keys.insert(i, key);
1424
1425 if keys.len() > order {
1426 let mid = div_ceil(order, 2);
1427 let size = self.schema.block_size() / 2;
1428
1429 let right: Vec<_> = keys.drain(mid..).collect();
1430 debug_assert!(right.len() >= mid);
1431
1432 let right_key = right[0].clone();
1433 let (right, _) = self.dir.create_file_unique(Node::Leaf(right), size)?;
1434
1435 let left: Vec<_> = keys.drain(..).collect();
1436 debug_assert!(left.len() >= mid);
1437
1438 let left_key = left[0].clone();
1439 let (left, _) = self.dir.create_file_unique(Node::Leaf(left), size)?;
1440
1441 Some(Node::Index(vec![left_key, right_key], vec![left, right]))
1442 } else {
1443 None
1444 }
1445 }
1446 Node::Index(bounds, children) => {
1447 debug_assert_eq!(bounds.len(), children.len());
1448
1449 let i = match bounds.bisect_left(&key, &self.collator) {
1450 0 => 0,
1451 i => i - 1,
1452 };
1453
1454 let mut child = self.dir.write_file_owned(&children[i]).await?;
1455 let result = self.insert_inner(&mut child, key).await?;
1456
1457 match result {
1458 Insert::None => return Ok(false),
1459 Insert::Right => {}
1460 Insert::Left(key) => {
1461 bounds[i] = key;
1462 }
1463 Insert::OverflowLeft(left, middle, child_id) => {
1464 bounds[i] = left;
1465 bounds.insert(i + 1, middle);
1466 children.insert(i + 1, child_id);
1467 }
1468 Insert::Overflow(bound, child_id) => {
1469 bounds.insert(i + 1, bound);
1470 children.insert(i + 1, child_id);
1471 }
1472 }
1473
1474 debug_assert_eq!(bounds.len(), children.len());
1476
1477 if children.len() > order {
1478 let size = self.schema.block_size() / 2;
1479 let right_bounds: Vec<_> = bounds.drain(div_ceil(order, 2)..).collect();
1480 let right_children: Vec<_> = children.drain(div_ceil(order, 2)..).collect();
1481 let right_bound = right_bounds[0].clone();
1482 let (right_node_id, _) = self
1483 .dir
1484 .create_file_unique(Node::Index(right_bounds, right_children), size)?;
1485
1486 let left_bounds: Vec<_> = bounds.drain(..).collect();
1487 let left_children: Vec<_> = children.drain(..).collect();
1488 let left_bound = left_bounds[0].clone();
1489 let (left_node_id, _) = self
1490 .dir
1491 .create_file_unique(Node::Index(left_bounds, left_children), size)?;
1492
1493 Some(Node::Index(
1494 vec![left_bound, right_bound],
1495 vec![left_node_id, right_node_id],
1496 ))
1497 } else {
1498 None
1499 }
1500 }
1501 };
1502
1503 if let Some(new_root) = new_root {
1504 *root = new_root;
1505 }
1506
1507 Ok(true)
1508 }
1509
1510 fn insert_inner<'a>(
1511 &'a mut self,
1512 node: &'a mut Node<S::Value>,
1513 key: Vec<S::Value>,
1514 ) -> Pin<Box<dyn Future<Output = Result<Insert<S::Value>, io::Error>> + Send + 'a>> {
1515 Box::pin(async move {
1516 let order = self.schema.order();
1517
1518 match node {
1519 Node::Leaf(keys) => {
1520 let i = keys.bisect_left(&key, &self.collator);
1521
1522 if i < keys.len() && keys[i] == key {
1523 return Ok(Insert::None);
1525 }
1526
1527 keys.insert(i, key);
1528
1529 let mid = order / 2;
1530
1531 if keys.len() >= order {
1532 let size = self.schema.block_size() / 2;
1533 let new_leaf: Vec<_> = keys.drain(mid..).collect();
1534
1535 debug_assert!(new_leaf.len() >= mid);
1536 debug_assert!(keys.len() >= mid);
1537
1538 let middle_key = new_leaf[0].to_vec();
1539 let node = Node::Leaf(new_leaf);
1540 let (new_node_id, _) = self.dir.create_file_unique(node, size)?;
1541
1542 if i == 0 {
1543 Ok(Insert::OverflowLeft(
1544 keys[0].to_vec(),
1545 middle_key,
1546 new_node_id,
1547 ))
1548 } else {
1549 Ok(Insert::Overflow(middle_key, new_node_id))
1550 }
1551 } else {
1552 debug_assert!(keys.len() > mid);
1553
1554 if i == 0 {
1555 Ok(Insert::Left(keys[0].to_vec()))
1556 } else {
1557 Ok(Insert::Right)
1558 }
1559 }
1560 }
1561 Node::Index(bounds, children) => {
1562 debug_assert_eq!(bounds.len(), children.len());
1563 let size = self.schema.block_size() >> 1;
1564
1565 let i = match bounds.bisect_left(&key, &self.collator) {
1566 0 => 0,
1567 i => i - 1,
1568 };
1569
1570 let mut child = self.dir.write_file_owned(&children[i]).await?;
1571
1572 let overflow_left = match self.insert_inner(&mut child, key).await? {
1573 Insert::None => return Ok(Insert::None),
1574 Insert::Right => return Ok(Insert::Right),
1575 Insert::Left(key) => {
1576 bounds[i] = key;
1577
1578 return if i == 0 {
1579 Ok(Insert::Left(bounds[i].to_vec()))
1580 } else {
1581 Ok(Insert::Right)
1582 };
1583 }
1584 Insert::OverflowLeft(left, middle, child_id) => {
1585 bounds[i] = left;
1586 bounds.insert(i + 1, middle);
1587 children.insert(i + 1, child_id);
1588 i == 0
1589 }
1590 Insert::Overflow(bound, child_id) => {
1591 bounds.insert(i + 1, bound);
1592 children.insert(i + 1, child_id);
1593 false
1594 }
1595 };
1596
1597 if children.len() > order {
1600 let mid = div_ceil(self.schema.order(), 2);
1601 let new_bounds: Vec<_> = bounds.drain(mid..).collect();
1602 let new_children: Vec<_> = children.drain(mid..).collect();
1603
1604 let left_bound = new_bounds[0].to_vec();
1605 let node = Node::Index(new_bounds, new_children);
1606 let (node_id, _) = self.dir.create_file_unique(node, size)?;
1607
1608 if overflow_left {
1609 Ok(Insert::OverflowLeft(
1610 bounds[0].to_vec(),
1611 left_bound,
1612 node_id,
1613 ))
1614 } else {
1615 Ok(Insert::Overflow(left_bound, node_id))
1616 }
1617 } else if i == 0 {
1618 Ok(Insert::Left(bounds[0].to_vec()))
1619 } else {
1620 Ok(Insert::Right)
1621 }
1622 }
1623 }
1624 })
1625 }
1626
1627 pub async fn truncate(&mut self) -> Result<(), io::Error> {
1629 self.dir.truncate().await;
1630
1631 self.dir
1632 .create_file(ROOT.to_string(), Node::Leaf(vec![]), 0)?;
1633
1634 debug_assert!(
1635 self.dir.contains(&ROOT),
1636 "B+Tree failed to re-create its root node"
1637 );
1638
1639 Ok(())
1640 }
1641}
1642
1643impl<S, C, FE> BTree<S, C, DirWriteGuardOwned<FE>>
1644where
1645 S: Schema + Send + Sync,
1646 C: Collate<Value = S::Value> + Clone + Send + Sync + 'static,
1647 FE: AsType<Node<S::Value>> + Send + Sync + 'static,
1648 Node<S::Value>: FileLoad,
1649{
1650 pub async fn merge<G>(&mut self, other: BTree<S, C, G>) -> Result<(), io::Error>
1654 where
1655 G: DirDeref<Entry = FE> + Clone + Send + Sync + 'static,
1656 {
1657 validate_collator_eq(&self.collator, &other.collator)?;
1658 validate_schema_eq(&self.schema, &other.schema)?;
1659
1660 let mut keys = other.keys(Range::<S::Value>::default(), false).await?;
1661 while let Some(key) = keys.try_next().await? {
1662 self.insert_root(key.into_vec()).await?;
1663 }
1664
1665 Ok(())
1666 }
1667
1668 pub async fn delete_all<G>(&mut self, other: BTree<S, C, G>) -> Result<(), io::Error>
1672 where
1673 G: DirDeref<Entry = FE> + Clone + Send + Sync + 'static,
1674 {
1675 validate_collator_eq(&self.collator, &other.collator)?;
1676 validate_schema_eq(&self.schema, &other.schema)?;
1677
1678 let mut keys = other.keys(Range::<S::Value>::default(), false).await?;
1679 while let Some(key) = keys.try_next().await? {
1680 self.delete(&key).await?;
1681 }
1682
1683 Ok(())
1684 }
1685}
1686
1687impl<S: fmt::Debug, C, G> fmt::Debug for BTree<S, C, G> {
1688 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1689 write!(f, "B+Tree index with schema {:?}", self.schema)
1690 }
1691}
1692
1693#[inline]
1694fn validate_key<S: Schema>(schema: &S, key: Vec<S::Value>) -> Result<Vec<S::Value>, io::Error> {
1695 schema
1696 .validate_key(key)
1697 .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))
1698}
1699
/// Check that two collators are equal, as required before combining two B+Trees.
///
/// # Errors
/// Returns an `io::Error` of kind `InvalidData` if `this != that`.
#[inline]
fn validate_collator_eq<T>(this: &T, that: &T) -> Result<(), io::Error>
where
    T: PartialEq,
{
    if this == that {
        Ok(())
    } else {
        Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "B+Tree to merge must have the same collation",
        ))
    }
}
1715
/// Check that two schemas are equal, as required before combining two B+Trees.
///
/// # Errors
/// Returns an `io::Error` of kind `InvalidData` whose message names both schemas if
/// `this != that`.
#[inline]
fn validate_schema_eq<S>(this: &S, that: &S) -> Result<(), io::Error>
where
    S: PartialEq + fmt::Debug,
{
    if this == that {
        Ok(())
    } else {
        Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!(
                "cannot merge a B+Tree with schema {:?} into one with schema {:?}",
                that, this
            ),
        ))
    }
}
1734
/// Integer division of `num` by `denom`, rounded up.
///
/// # Panics
/// Panics if `denom` is zero.
#[inline]
fn div_ceil(num: usize, denom: usize) -> usize {
    // the remainder is computed first so a zero divisor panics exactly where it always has
    let rounds_up = num % denom != 0;
    num / denom + usize::from(rounds_up)
}
1742
1743#[inline]
1744fn stack_key<'a, T, A>(iter: A) -> SmallVec<[T; NODE_STACK_SIZE]>
1745where
1746 T: Clone + 'a,
1747 A: IntoIterator<Item = &'a T>,
1748{
1749 iter.into_iter().cloned().collect()
1750}