b_tree/
tree.rs

1use std::borrow::Borrow;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::{fmt, io};
5
6#[cfg(feature = "stream")]
7use async_trait::async_trait;
8use collate::{Collate, OverlapsValue};
9#[cfg(feature = "stream")]
10use destream::de;
11use freqfs::*;
12use futures::future::{self, Future, FutureExt};
13use futures::stream::{self, Stream, StreamExt, TryStreamExt};
14use futures::try_join;
15use futures::TryFutureExt;
16use safecast::AsType;
17use smallvec::SmallVec;
18use uuid::Uuid;
19
20use super::group::GroupBy;
21use super::node::Block;
22use super::range::Range;
23use super::{Collator, Key, Schema, NODE_STACK_SIZE};
24
/// A [`SmallVec`] of [`Key`]s with an inline capacity of `NODE_STACK_SIZE`
type NodeStack<V> = SmallVec<[Key<V>; NODE_STACK_SIZE]>;

/// A read guard acquired on a [`BTreeLock`]
pub type BTreeReadGuard<S, C, FE> = BTree<S, C, Arc<DirReadGuardOwned<FE>>>;

/// A write guard acquired on a [`BTreeLock`]
pub type BTreeWriteGuard<S, C, FE> = BTree<S, C, DirWriteGuardOwned<FE>>;

/// A stream of [`Key`]s in a [`BTree`]
pub type Keys<V> = Pin<Box<dyn Stream<Item = Result<Key<V>, io::Error>> + Send>>;

/// A boxed stream of read-locked [`Leaf`] nodes
type Nodes<FE, V> = Pin<Box<dyn Stream<Item = Result<Leaf<FE, V>, io::Error>> + Send>>;

/// The node type stored in the files of a [`BTree`], with each key held as a `Vec<V>`
type Node<V> = super::node::Node<Vec<Vec<V>>>;

/// The all-zero [`Uuid`] whose string form names the file of the root node
const ROOT: Uuid = Uuid::from_fields(0, 0, 0, &[0u8; 8]);
41
42/// A read guard on the content of a leaf node
43pub struct Leaf<FE, V> {
44    node: FileReadGuardOwned<FE, Node<V>>,
45    range: (usize, usize),
46}
47
48impl<FE, V> Leaf<FE, V> {
49    fn new(node: FileReadGuardOwned<FE, Node<V>>, range: (usize, usize)) -> Self {
50        debug_assert!(node.is_leaf());
51        Self { node, range }
52    }
53}
54
55impl<FE, V> AsRef<[Vec<V>]> for Leaf<FE, V> {
56    fn as_ref(&self) -> &[Vec<V>] {
57        match &*self.node {
58            Node::Leaf(keys) => &keys[self.range.0..self.range.1],
59            _ => panic!("not a leaf!"),
60        }
61    }
62}
63
64/// A futures-aware read-write lock on a [`BTree`]
65pub struct BTreeLock<S, C, FE> {
66    schema: Arc<S>,
67    collator: Collator<C>,
68    dir: DirLock<FE>,
69}
70
71impl<S, C, FE> Clone for BTreeLock<S, C, FE>
72where
73    C: Clone,
74{
75    fn clone(&self) -> Self {
76        Self {
77            schema: self.schema.clone(),
78            collator: self.collator.clone(),
79            dir: self.dir.clone(),
80        }
81    }
82}
83
84impl<S, C, FE> BTreeLock<S, C, FE> {
85    /// Borrow the [`Collator`] used by this B+Tree.
86    pub fn collator(&self) -> &Collator<C> {
87        &self.collator
88    }
89
90    /// Borrow the schema of the source B+Tree.
91    pub fn schema(&self) -> &S {
92        &self.schema
93    }
94}
95
96impl<S, C, FE> BTreeLock<S, C, FE>
97where
98    S: Schema,
99    FE: AsType<Node<S::Value>> + Send + Sync,
100    Node<S::Value>: FileLoad,
101{
102    fn new(schema: S, collator: C, dir: DirLock<FE>) -> Self {
103        Self {
104            schema: Arc::new(schema),
105            collator: Collator::new(collator),
106            dir,
107        }
108    }
109
110    /// Create a new [`BTreeLock`] in `dir` with the given `collator`.
111    pub fn create(schema: S, collator: C, dir: DirLock<FE>) -> Result<Self, io::Error> {
112        let mut nodes = dir.try_write_owned()?;
113
114        if nodes.is_empty() {
115            nodes.create_file::<Node<S::Value>>(ROOT.to_string(), Node::Leaf(vec![]), 0)?;
116
117            debug_assert!(nodes.contains(&ROOT), "B+Tree failed to create a root node");
118
119            Ok(Self::new(schema, collator, dir))
120        } else {
121            Err(io::Error::new(
122                io::ErrorKind::AlreadyExists,
123                "creating a new B+Tree requires an empty file",
124            ))
125        }
126    }
127
128    /// Load a [`BTreeLock`] with the given `schema` and `collator` from `dir`.
129    pub fn load(schema: S, collator: C, dir: DirLock<FE>) -> Result<Self, io::Error> {
130        let mut nodes = dir.try_write_owned()?;
131
132        if !nodes.contains(&ROOT) {
133            nodes.create_file(ROOT.to_string(), Node::Leaf(vec![]), 0)?;
134        }
135
136        debug_assert!(nodes.contains(&ROOT), "B+Tree failed to create a root node");
137
138        Ok(Self::new(schema, collator, dir))
139    }
140
141    /// Write any modified blocks of thie [`BTree`] to the filesystem.
142    pub async fn sync(&self) -> Result<(), io::Error>
143    where
144        FE: for<'a> freqfs::FileSave<'a>,
145    {
146        self.dir.sync().await
147    }
148}
149
150impl<S, C, FE> BTreeLock<S, C, FE>
151where
152    C: Clone,
153    FE: Send + Sync,
154{
155    /// Lock this B+Tree for reading, without borrowing.
156    pub async fn into_read(self) -> BTreeReadGuard<S, C, FE> {
157        #[cfg(feature = "logging")]
158        log::debug!("lock B+Tree at {:?} for reading...", self.dir);
159
160        self.dir
161            .into_read()
162            .map(Arc::new)
163            .map(|dir| BTree {
164                schema: self.schema.clone(),
165                collator: self.collator.clone(),
166                dir,
167            })
168            .await
169    }
170
171    /// Lock this B+Tree for reading.
172    pub async fn read(&self) -> BTreeReadGuard<S, C, FE> {
173        #[cfg(feature = "logging")]
174        log::debug!("lock B+Tree at {:?} for reading...", self.dir);
175
176        self.dir
177            .read_owned()
178            .map(Arc::new)
179            .map(|dir| BTree {
180                schema: self.schema.clone(),
181                collator: self.collator.clone(),
182                dir,
183            })
184            .await
185    }
186
187    /// Lock this B+Tree for reading synchronously, if possible.
188    pub fn try_read(&self) -> Result<BTreeReadGuard<S, C, FE>, io::Error> {
189        self.dir.try_read_owned().map(Arc::new).map(|dir| BTree {
190            schema: self.schema.clone(),
191            collator: self.collator.clone(),
192            dir,
193        })
194    }
195
196    /// Lock this B+Tree for writing, without borrowing.
197    pub async fn into_write(self) -> BTreeWriteGuard<S, C, FE> {
198        #[cfg(feature = "logging")]
199        log::debug!("lock B+Tree at {:?} for writing...", self.dir);
200
201        self.dir
202            .into_write()
203            .map(|dir| BTree {
204                schema: self.schema.clone(),
205                collator: self.collator.clone(),
206                dir,
207            })
208            .await
209    }
210
211    /// Lock this B+Tree for writing.
212    pub async fn write(&self) -> BTreeWriteGuard<S, C, FE> {
213        #[cfg(feature = "logging")]
214        log::debug!("lock B+Tree at {:?} for writing...", self.dir);
215
216        self.dir
217            .write_owned()
218            .map(|dir| BTree {
219                schema: self.schema.clone(),
220                collator: self.collator.clone(),
221                dir,
222            })
223            .await
224    }
225
226    /// Lock this B+Tree for writing synchronously, if possible.
227    pub fn try_write(&self) -> Result<BTreeWriteGuard<S, C, FE>, io::Error> {
228        self.dir.try_write_owned().map(|dir| BTree {
229            schema: self.schema.clone(),
230            collator: self.collator.clone(),
231            dir,
232        })
233    }
234}
235
/// A [`de::Visitor`] which decodes a sequence of keys into a [`BTreeLock`]
#[cfg(feature = "stream")]
struct BTreeVisitor<S, C, FE> {
    // the (initially empty) B+Tree into which decoded keys are inserted
    btree: BTreeLock<S, C, FE>,
}

#[cfg(feature = "stream")]
#[async_trait]
impl<S, C, FE> de::Visitor for BTreeVisitor<S, C, FE>
where
    S: Schema + Send + Sync,
    C: Collate<Value = S::Value> + Clone + Send + Sync,
    FE: AsType<Node<S::Value>> + Send + Sync,
    S::Value: de::FromStream<Context = ()>,
    Node<S::Value>: FileLoad,
{
    type Value = BTreeLock<S, C, FE>;

    fn expecting() -> &'static str {
        "a BTree"
    }

    /// Insert each element of `seq` into the B+Tree as a key, then return the B+Tree.
    async fn visit_seq<A: de::SeqAccess>(self, mut seq: A) -> Result<Self::Value, A::Error> {
        let lock = self.btree;

        // hold a single write guard for the whole sequence
        let mut tree = lock.write().await;

        loop {
            let next = seq.next_element(()).map_err(de::Error::custom).await?;

            match next {
                Some(key) => {
                    tree.insert(key).map_err(de::Error::custom).await?;
                }
                None => break,
            }
        }

        Ok(lock)
    }
}
267
#[cfg(feature = "stream")]
#[async_trait]
impl<S, C, FE> de::FromStream for BTreeLock<S, C, FE>
where
    S: Schema + Send + Sync,
    C: Collate<Value = S::Value> + Clone + Send + Sync,
    FE: AsType<Node<S::Value>> + Send + Sync,
    S::Value: de::FromStream<Context = ()>,
    Node<S::Value>: FileLoad,
{
    /// The schema, collator, and (empty) directory with which to create the B+Tree
    type Context = (S, C, DirLock<FE>);

    /// Create a new B+Tree from `context` and fill it with the sequence of keys in `decoder`.
    async fn from_stream<D: de::Decoder>(
        context: Self::Context,
        decoder: &mut D,
    ) -> Result<Self, D::Error> {
        let visitor = {
            let (schema, collator, dir) = context;

            match BTreeLock::create(schema, collator, dir) {
                Ok(btree) => BTreeVisitor { btree },
                Err(cause) => return Err(de::Error::custom(cause)),
            }
        };

        decoder.decode_seq(visitor).await
    }
}
289
290/// A B+Tree
291pub struct BTree<S, C, G> {
292    schema: Arc<S>,
293    collator: Collator<C>,
294    dir: G,
295}
296
297impl<S, C, G> Clone for BTree<S, C, G>
298where
299    C: Clone,
300    G: Clone,
301{
302    fn clone(&self) -> Self {
303        Self {
304            schema: self.schema.clone(),
305            collator: self.collator.clone(),
306            dir: self.dir.clone(),
307        }
308    }
309}
310
311impl<S, C, G> BTree<S, C, G>
312where
313    S: Schema,
314    C: Collate<Value = S::Value>,
315{
316    /// Borrow the [`Schema`] of this [`BTree`].
317    pub fn schema(&self) -> &S {
318        &self.schema
319    }
320}
321
322impl<S, C, FE, G> BTree<S, C, G>
323where
324    S: Schema,
325    C: Collate<Value = S::Value>,
326    FE: AsType<Node<S::Value>> + Send + Sync,
327    G: DirDeref<Entry = FE>,
328    Node<S::Value>: FileLoad + fmt::Debug,
329{
330    /// Return `true` if this B+Tree contains the given `key`.
331    pub async fn contains(&self, key: &[S::Value]) -> Result<bool, io::Error> {
332        debug_assert!(
333            self.dir.as_dir().contains(&ROOT),
334            "B+Tree is missing its root node"
335        );
336
337        let mut node = self.dir.as_dir().read_file(&ROOT).await?;
338
339        loop {
340            match &*node {
341                Node::Leaf(keys) => {
342                    let i = keys.bisect_left(&key, &self.collator);
343
344                    break Ok(if i < keys.len() {
345                        match keys.get(i) {
346                            Some(present) => present == key,
347                            _ => false,
348                        }
349                    } else {
350                        false
351                    });
352                }
353                Node::Index(bounds, children) => {
354                    let i = bounds.bisect_right(key, &self.collator);
355
356                    if i == 0 {
357                        return Ok(false);
358                    } else {
359                        node = self.dir.as_dir().read_file(&children[i - 1]).await?;
360                    }
361                }
362            }
363        }
364    }
365
366    /// Count how many keys lie within the given `range` of this B+Tree.
367    pub async fn count<BV>(&self, range: &Range<BV>) -> Result<u64, io::Error>
368    where
369        BV: Borrow<S::Value>,
370    {
371        debug_assert!(
372            self.dir.as_dir().contains(&ROOT),
373            "B+Tree is missing its root node"
374        );
375        let root = self.dir.as_dir().read_file(&ROOT).await?;
376        self.count_inner(range, root).await
377    }
378
    /// Recursively count the keys within `range` in the subtree rooted at `node`.
    ///
    /// The future is boxed because this function calls itself for each selected child.
    fn count_inner<'a, BV>(
        &'a self,
        range: &'a Range<BV>,
        node: FileReadGuard<'a, Node<S::Value>>,
    ) -> Pin<Box<dyn Future<Output = Result<u64, io::Error>> + 'a>>
    where
        BV: Borrow<S::Value> + 'a,
    {
        Box::pin(async move {
            match &*node {
                // a default range selects every key in the leaf
                Node::Leaf(keys) if range.is_default() => Ok(keys.len() as u64),
                Node::Leaf(keys) => {
                    let (l, r) = keys.bisect(&range, &self.collator);

                    if l == keys.len() {
                        Ok(0)
                    } else if l == r {
                        // the bisection is empty, so check the single key at `l` directly
                        if range.contains_value(&keys[l], &self.collator) {
                            Ok(1)
                        } else {
                            Ok(0)
                        }
                    } else {
                        Ok((r - l) as u64)
                    }
                }
                // a default range selects every child: sum the counts of all of them,
                // buffering up to `num_cpus::get()` child counts at a time
                Node::Index(_bounds, children) if range.is_default() => {
                    stream::iter(children)
                        .then(|node_id| self.dir.as_dir().read_file(node_id))
                        .map_ok(|node| self.count_inner(range, node))
                        .try_buffer_unordered(num_cpus::get())
                        .try_fold(0, |sum, count| future::ready(Ok(sum + count)))
                        .await
                }
                Node::Index(bounds, children) => {
                    let (l, r) = bounds.bisect(&range, &self.collator);
                    // step back one child from the left bisection index, unless it is already 0
                    // (presumably because matching keys may begin in the preceding child -- TODO confirm)
                    let l = if l == 0 { l } else { l - 1 };

                    if l == children.len() {
                        let node = self
                            .dir
                            .as_dir()
                            .read_file(children.last().expect("last"))
                            .await?;

                        self.count_inner(range, node).await
                    } else if l == r || l + 1 == r {
                        // only a single child is selected
                        let node = self.dir.as_dir().read_file(&children[l]).await?;
                        self.count_inner(range, node).await
                    } else {
                        // the leftmost and rightmost selected children are counted using `range`
                        let left = self
                            .dir
                            .as_dir()
                            .read_file(&children[l])
                            .and_then(|node| self.count_inner(range, node));

                        let default_range = Range::<S::Value>::default();

                        // the children strictly in between are counted in full, using a default range
                        let middle = stream::iter(&children[(l + 1)..(r - 1)])
                            .then(|node_id| self.dir.as_dir().read_file(node_id))
                            .map_ok(|node| self.count_inner(&default_range, node))
                            .try_buffer_unordered(num_cpus::get())
                            .try_fold(0, |sum, count| future::ready(Ok(sum + count)));

                        let right = self
                            .dir
                            .as_dir()
                            .read_file(&children[r - 1])
                            .and_then(|node| self.count_inner(range, node));

                        let (left, middle, right) = try_join!(left, middle, right)?;

                        Ok(left + middle + right)
                    }
                }
            }
        })
    }
457
458    /// Return the first key in this B+Tree within the given `range`, if any.
459    pub async fn first<BV>(&self, range: Range<BV>) -> Result<Option<Key<S::Value>>, io::Error>
460    where
461        BV: Borrow<S::Value>,
462    {
463        debug_assert!(
464            self.dir.as_dir().contains(&ROOT),
465            "B+Tree is missing its root node"
466        );
467
468        let mut node = self.dir.as_dir().read_file(&ROOT).await?;
469
470        if let Node::Leaf(keys) = &*node {
471            if keys.is_empty() {
472                return Ok(None);
473            }
474        }
475
476        Ok(loop {
477            match &*node {
478                Node::Leaf(keys) => {
479                    let (l, _r) = keys.bisect(&range, &self.collator);
480
481                    break if l == keys.len() {
482                        None
483                    } else if range.contains_value(&keys[l], &self.collator) {
484                        Some(stack_key(&keys[l]))
485                    } else {
486                        None
487                    };
488                }
489                Node::Index(bounds, children) => {
490                    let (l, _r) = bounds.bisect(&range, &self.collator);
491
492                    if l == bounds.len() {
493                        node = self
494                            .dir
495                            .as_dir()
496                            .read_file(children.last().expect("last"))
497                            .await?;
498                    } else if range.contains_value(&bounds[l], &self.collator) {
499                        break Some(stack_key(&bounds[l]));
500                    } else {
501                        node = self.dir.as_dir().read_file(&children[l]).await?;
502                    }
503                }
504            }
505        })
506    }
507
508    /// Return the last key in this B+Tree with the given `prefix`, if any.
509    pub async fn last<BV>(&self, range: Range<BV>) -> Result<Option<Key<S::Value>>, io::Error>
510    where
511        BV: Borrow<S::Value>,
512    {
513        debug_assert!(
514            self.dir.as_dir().contains(&ROOT),
515            "B+Tree is missing its root node"
516        );
517
518        let mut node = self.dir.as_dir().read_file(&ROOT).await?;
519
520        if let Node::Leaf(keys) = &*node {
521            if keys.is_empty() {
522                return Ok(None);
523            }
524        }
525
526        Ok(loop {
527            match &*node {
528                Node::Leaf(keys) => {
529                    let (_l, r) = keys.bisect(&range, &self.collator);
530
531                    break if r == keys.len() {
532                        if range.contains_value(&keys[r - 1], &self.collator) {
533                            Some(stack_key(&keys[r - 1]))
534                        } else {
535                            None
536                        }
537                    } else if range.contains_value(&keys[r], &self.collator) {
538                        Some(stack_key(&keys[r]))
539                    } else {
540                        None
541                    };
542                }
543                Node::Index(bounds, children) => {
544                    let (_l, r) = bounds.bisect(&range, &self.collator);
545
546                    if r == 0 {
547                        break None;
548                    } else {
549                        node = self.dir.as_dir().read_file(&children[r - 1]).await?;
550                    }
551                }
552            }
553        })
554    }
555
556    /// Return `true` if the given `range` of this B+Tree contains zero keys.
557    pub async fn is_empty<R: Borrow<Range<S::Value>>>(&self, range: R) -> Result<bool, io::Error> {
558        let range = range.borrow();
559
560        debug_assert!(
561            self.dir.as_dir().contains(&ROOT),
562            "B+Tree is missing its root node"
563        );
564
565        let mut node = self.dir.as_dir().read_file(&ROOT).await?;
566
567        Ok(loop {
568            match &*node {
569                Node::Leaf(keys) => {
570                    let (l, r) = keys.bisect(range, &self.collator);
571                    break l == r;
572                }
573                Node::Index(bounds, children) => {
574                    let (l, r) = bounds.bisect(range, &self.collator);
575
576                    if l == children.len() {
577                        node = self.dir.as_dir().read_file(&children[l - 1]).await?;
578                    } else if l == r {
579                        node = self.dir.as_dir().read_file(&children[l]).await?;
580                    } else {
581                        break false;
582                    }
583                }
584            }
585        })
586    }
587
588    #[cfg(debug_assertions)]
589    fn is_valid_node<'a>(
590        &'a self,
591        node: &'a Node<S::Value>,
592    ) -> Pin<Box<dyn Future<Output = Result<bool, io::Error>> + 'a>> {
593        Box::pin(async move {
594            let order = self.schema.order();
595
596            match &*node {
597                Node::Leaf(keys) => {
598                    assert!(keys.len() >= (order / 2) - 1);
599                    assert!(keys.len() < order);
600                }
601                Node::Index(bounds, children) => {
602                    assert_eq!(bounds.len(), children.len());
603                    assert!(children.len() >= self.schema.order() / 2);
604                    assert!(children.len() <= order);
605
606                    for (left, node_id) in bounds.iter().zip(children) {
607                        let node = self.dir.as_dir().read_file(node_id).await?;
608
609                        match &*node {
610                            Node::Leaf(keys) => assert_eq!(left, &keys[0]),
611                            Node::Index(bounds, _) => assert_eq!(left, &bounds[0]),
612                        }
613
614                        assert!(self.is_valid_node(&*node).await?);
615                    }
616                }
617            }
618
619            Ok(true)
620        })
621    }
622}
623
624impl<S, C, FE, G> BTree<S, C, G>
625where
626    S: Schema,
627    C: Collate<Value = S::Value> + Clone + Send + Sync + 'static,
628    FE: AsType<Node<S::Value>> + Send + Sync + 'static,
629    G: DirDeref<Entry = FE> + Clone + Send + Sync + 'static,
630    Node<S::Value>: FileLoad + fmt::Debug,
631{
632    /// Construct a [`Stream`] of all the keys in the given `range` of this B+Tree.
633    pub async fn keys<BV>(
634        self,
635        range: Range<BV>,
636        reverse: bool,
637    ) -> Result<Keys<S::Value>, io::Error>
638    where
639        BV: Borrow<S::Value> + Clone + Send + Sync + 'static,
640    {
641        debug_assert!(
642            self.dir.as_dir().contains(&ROOT),
643            "B+Tree is missing its root node"
644        );
645
646        if reverse {
647            let nodes = nodes_reverse(self.dir, self.collator, range, ROOT).await?;
648
649            let keys = nodes
650                .map_ok(|leaf| async move {
651                    let mut keys = NodeStack::with_capacity(leaf.as_ref().len());
652
653                    for key in leaf.as_ref().into_iter().rev() {
654                        keys.push(key.into_iter().cloned().collect());
655                    }
656
657                    Ok(stream::iter(keys).map(Ok))
658                })
659                .try_buffered(num_cpus::get())
660                .try_flatten();
661
662            Ok(Box::pin(keys))
663        } else {
664            let nodes = nodes_forward(self.dir, self.collator, range, ROOT).await?;
665
666            let keys = nodes
667                .map_ok(|leaf| async move {
668                    let mut keys = NodeStack::with_capacity(leaf.as_ref().len());
669
670                    for key in leaf.as_ref() {
671                        keys.push(key.into_iter().cloned().collect());
672                    }
673
674                    Ok(stream::iter(keys).map(Ok))
675                })
676                .try_buffered(num_cpus::get())
677                .try_flatten();
678
679            Ok(Box::pin(keys))
680        }
681    }
682
683    /// Construct a [`Stream`] of unique length-`n` prefixes within the given `range`.
684    pub async fn groups<BV>(
685        self,
686        range: Range<BV>,
687        n: usize,
688        reverse: bool,
689    ) -> Result<Keys<S::Value>, io::Error>
690    where
691        BV: Borrow<S::Value> + Clone + Send + Sync + 'static,
692    {
693        if n <= self.schema.len() {
694            let collator = self.collator.clone();
695
696            debug_assert!(
697                self.dir.as_dir().contains(&ROOT),
698                "B+Tree is missing its root node"
699            );
700
701            let nodes = if reverse {
702                nodes_reverse(self.dir, self.collator, range, ROOT).await?
703            } else {
704                nodes_forward(self.dir, self.collator, range, ROOT).await?
705            };
706
707            let groups = GroupBy::new(collator, nodes, n, reverse);
708            Ok(Box::pin(groups))
709        } else {
710            Err(io::Error::new(
711                io::ErrorKind::InvalidInput,
712                format!(
713                    "a table with {} columns does not have prefix groups of length {n}",
714                    self.schema.len()
715                ),
716            ))
717        }
718    }
719
720    #[cfg(debug_assertions)]
721    pub async fn is_valid(self) -> Result<bool, io::Error> {
722        {
723            debug_assert!(
724                self.dir.as_dir().contains(&ROOT),
725                "B+Tree is missing its root node"
726            );
727
728            let root = self.dir.as_dir().read_file(&ROOT).await?;
729
730            match &*root {
731                Node::Leaf(keys) => {
732                    assert!(keys.len() <= self.schema.order());
733                }
734                Node::Index(bounds, children) => {
735                    assert_eq!(bounds.len(), children.len());
736
737                    for (left, node_id) in bounds.iter().zip(children) {
738                        let node = self.dir.as_dir().read_file(node_id).await?;
739
740                        match &*node {
741                            Node::Leaf(keys) => assert_eq!(left, &keys[0]),
742                            Node::Index(bounds, _) => assert_eq!(left, &bounds[0]),
743                        }
744
745                        assert!(self.is_valid_node(&*node).await?);
746                    }
747                }
748            }
749        }
750
751        let default_range = Range::<S::Value>::default();
752        let count = self.count(&default_range).await? as usize;
753        let mut contents = Vec::with_capacity(count);
754        let mut stream = self.keys(default_range, false).await?;
755        while let Some(key) = stream.try_next().await? {
756            contents.push(key);
757        }
758
759        assert_eq!(count, contents.len());
760
761        Ok(true)
762    }
763}
764
765enum NodeRead {
766    Excluded,
767    Child(Uuid),
768    Children(SmallVec<[Uuid; NODE_STACK_SIZE]>),
769    Leaf((usize, usize)),
770}
771
772impl fmt::Debug for NodeRead {
773    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
774        match self {
775            Self::Excluded => f.write_str("(none)"),
776            Self::Child(id) => write!(f, "child {id}"),
777            Self::Children(ids) => write!(f, "children {ids:?}"),
778            Self::Leaf((start, end)) => write!(f, "leaf records [{start}..{end})"),
779        }
780    }
781}
782
/// Construct a future which resolves to a stream of the leaves beneath the node `node_id`
/// which are selected by `range`, read in forward order.
///
/// The future is boxed because this function calls itself for each selected child.
///
/// # Panics
/// Panics if `dir` has no file named `node_id`.
fn nodes_forward<C, V, BV, FE, G>(
    dir: G,
    collator: Collator<C>,
    range: Range<BV>,
    node_id: Uuid,
) -> Pin<Box<dyn Future<Output = Result<Nodes<FE, V>, io::Error>> + Send>>
where
    C: Collate<Value = V> + Clone + Send + Sync + 'static,
    V: Clone + PartialEq + fmt::Debug + Send + Sync + 'static,
    BV: Borrow<V> + Clone + Send + Sync + 'static,
    FE: AsType<Node<V>> + Send + Sync + 'static,
    G: DirDeref<Entry = FE> + Clone + Send + Sync + 'static,
    Node<V>: FileLoad,
{
    #[cfg(feature = "logging")]
    log::debug!("reading BTree keys in forward order");

    let file = dir.as_dir().get_file(&node_id).expect("node").clone();
    let fut = file.into_read().map_ok(move |node| {
        // first, decide which part of this node the range selects
        let read = match &*node {
            // a default range selects every key in the leaf
            Node::Leaf(keys) if range.is_default() => NodeRead::Leaf((0, keys.len())),
            Node::Leaf(keys) => {
                let (l, r) = keys.bisect(&range, &collator);

                if r == 0 || l == keys.len() {
                    NodeRead::Excluded
                } else if l == r {
                    // the bisection is empty, so check the single key at `l` directly
                    if range.contains_value(&keys[l], &collator) {
                        NodeRead::Leaf((l, l + 1))
                    } else {
                        NodeRead::Excluded
                    }
                } else {
                    NodeRead::Leaf((l, r))
                }
            }
            // a default range selects every child
            Node::Index(_bounds, children) if range.is_default() => {
                debug_assert!(!children.is_empty());

                if children.len() == 1 {
                    NodeRead::Child(children[0])
                } else {
                    NodeRead::Children(SmallVec::from_slice(&children))
                }
            }
            Node::Index(bounds, children) => {
                let (l, r) = bounds.bisect(&range, &collator);
                // step back one child from the left bisection index, unless it is already 0
                // (presumably because matching keys may begin in the preceding child -- TODO confirm)
                let l = if l == 0 { l } else { l - 1 };

                if r == 0 {
                    NodeRead::Excluded
                } else if l == children.len() {
                    NodeRead::Child(children[l - 1])
                } else if l == r || l + 1 == r {
                    NodeRead::Child(children[l])
                } else {
                    NodeRead::Children(SmallVec::from_slice(&children[l..r]))
                }
            }
        };

        #[cfg(feature = "logging")]
        log::trace!("read {read:?}");

        // then, construct the stream of leaves for the selected part
        let nodes: Nodes<FE, V> = match read {
            NodeRead::Excluded => {
                let nodes = stream::empty();
                Box::pin(nodes)
            }
            NodeRead::Child(node_id) => {
                let nodes =
                    stream::once(nodes_forward(dir, collator, range, node_id)).try_flatten();

                Box::pin(nodes)
            }
            NodeRead::Children(children) => {
                let last_child = children.len() - 1;
                let nodes = stream::iter(children.into_iter().enumerate())
                    .map(move |(i, node_id)| {
                        // only the first and last selected children are filtered by `range`;
                        // the children in between are read in full, using a default range
                        if i == 0 || i == last_child {
                            nodes_forward(dir.clone(), collator.clone(), range.clone(), node_id)
                        } else {
                            nodes_forward(
                                dir.clone(),
                                collator.clone(),
                                Range::<V>::default(),
                                node_id,
                            )
                        }
                    })
                    // buffer up to two child reads at a time, yielding them in their original order
                    .buffered(2)
                    .try_flatten();

                Box::pin(nodes)
            }
            NodeRead::Leaf(range) => {
                // hand the read guard on this leaf to the consumer of the stream
                let nodes = stream::once(future::ready(Ok(Leaf::new(node, range))));
                Box::pin(nodes)
            }
        };

        nodes
    });

    Box::pin(fut)
}
889
/// Recursively collect the leaves at or below the node `node_id` which are selected
/// by `range`, visiting child nodes from last to first.
///
/// The returned future acquires a read lock on the node and resolves to a stream of
/// [`Leaf`] read guards, each limited to the span of keys selected within that leaf.
///
/// Panics if `dir` contains no file for `node_id`.
fn nodes_reverse<C, V, BV, FE, G>(
    dir: G,
    collator: Collator<C>,
    range: Range<BV>,
    node_id: Uuid,
) -> Pin<Box<dyn Future<Output = Result<Nodes<FE, V>, io::Error>> + Send>>
where
    C: Collate<Value = V> + Clone + Send + Sync + 'static,
    V: Clone + PartialEq + fmt::Debug + Send + Sync + 'static,
    BV: Borrow<V> + Clone + Send + Sync + 'static,
    FE: AsType<Node<V>> + Send + Sync + 'static,
    G: DirDeref<Entry = FE> + Clone + Send + Sync + 'static,
    Node<V>: FileLoad,
{
    let file = dir.as_dir().get_file(&node_id).expect("node").clone();
    let fut = file.into_read().map_ok(move |node| {
        // first decide which part of this node (if any) needs to be read
        let read = match &*node {
            // a default range takes the whole leaf
            Node::Leaf(keys) if range.is_default() => NodeRead::Leaf((0, keys.len())),
            Node::Leaf(keys) => {
                let (l, r) = keys.bisect(&range, &collator);

                if l == keys.len() || r == 0 {
                    // the bisected span is empty at one end of this leaf
                    NodeRead::Excluded
                } else if l == r {
                    // the bisected span is empty, so test the key at `l` against the range directly
                    if range.contains_value(&keys[l], &collator) {
                        NodeRead::Leaf((l, l + 1))
                    } else {
                        NodeRead::Excluded
                    }
                } else {
                    NodeRead::Leaf((l, r))
                }
            }
            // a default range takes every child of this index node
            Node::Index(_bounds, children) if range.is_default() => {
                debug_assert!(!children.is_empty());

                if children.len() == 1 {
                    NodeRead::Child(children[0])
                } else {
                    NodeRead::Children(SmallVec::from_slice(&children))
                }
            }
            Node::Index(bounds, children) => {
                debug_assert!(!children.is_empty());

                let (l, r) = bounds.bisect(&range, &collator);
                // NOTE(review): steps back one child, presumably because the child whose bound
                // precedes the start of the range may still hold keys within it -- confirm
                let l = if l == 0 { l } else { l - 1 };

                if r == 0 {
                    NodeRead::Excluded
                } else if l == children.len() {
                    NodeRead::Child(children[l - 1])
                } else if l == r || l + 1 == r {
                    NodeRead::Child(children[l])
                } else {
                    NodeRead::Children(SmallVec::from_slice(&children[l..r]))
                }
            }
        };

        // then build the stream of leaves for the selected part
        let nodes: Nodes<FE, V> = match read {
            NodeRead::Excluded => {
                let nodes = stream::empty();
                Box::pin(nodes)
            }
            NodeRead::Child(node_id) => {
                let nodes =
                    stream::once(nodes_reverse(dir, collator, range, node_id)).try_flatten();

                Box::pin(nodes)
            }
            NodeRead::Children(children) => {
                let last_child = children.len() - 1;
                // iterate the selected children from last to first
                let nodes = stream::iter(children.into_iter().enumerate().rev())
                    .map(move |(i, node_id)| {
                        // the first and last selected children are searched with `range`;
                        // every child in between is read with a default range
                        if i == 0 || i == last_child {
                            nodes_reverse(dir.clone(), collator.clone(), range.clone(), node_id)
                        } else {
                            nodes_reverse(
                                dir.clone(),
                                collator.clone(),
                                Range::<V>::default(),
                                node_id,
                            )
                        }
                    })
                    // resolve up to two child futures at a time, keeping the iteration order
                    .buffered(2)
                    .try_flatten();

                Box::pin(nodes)
            }
            NodeRead::Leaf(range) => {
                // hand the read guard on this leaf itself to the caller
                let nodes = stream::once(future::ready(Ok(Leaf::new(node, range))));
                Box::pin(nodes)
            }
        };

        nodes
    });

    Box::pin(fut)
}
992
/// The outcome of `merge_index_left`, which reconciles an index node
/// with the index node to its right.
enum MergeIndexLeft<V> {
    /// The first bound and child of the right node were moved onto the end of the left node;
    /// carries the new first bound of the right node.
    Borrow(Vec<V>),
    /// All bounds and children of the left node were moved to the front of the right node;
    /// carries the first bound of the combined node.
    Merge(Vec<V>),
}
997
/// The outcome of `merge_index_right`, which reconciles an index node
/// with the index node to its left.
enum MergeIndexRight {
    /// The last bound and child of the left node were moved to the front of the right node.
    Borrow,
    /// All bounds and children of the right node were appended to the left node.
    Merge,
}
1002
/// The outcome of `merge_leaf_left`, which reconciles a leaf with the leaf to its right.
enum MergeLeafLeft<V> {
    /// The first key of the right leaf was moved onto the end of the left leaf;
    /// carries the new first key of the right leaf.
    Borrow(Vec<V>),
    /// All keys of the left leaf were moved to the front of the right leaf;
    /// carries the first key of the combined leaf.
    Merge(Vec<V>),
}
1007
/// The outcome of `merge_leaf_right`, which reconciles a leaf with the leaf to its left.
enum MergeLeafRight {
    /// The last key of the left leaf was moved to the front of the right leaf.
    Borrow,
    /// All keys of the right leaf were appended to the left leaf.
    Merge,
}
1012
/// The result of deleting a key from a subtree, as reported by `delete_inner` to the parent node.
enum Delete<FE, V> {
    /// No matching key was found, so nothing was deleted.
    None,
    /// A key was deleted at position 0 of the node; carries the node's current first key
    /// (or first bound), which the parent stores as its bound for this child.
    Left(Vec<V>),
    /// A key was deleted at a position other than 0 of the node.
    Right,
    /// A key was deleted and the node is now under-filled; carries the write guard on the node
    /// so that the parent can merge it with (or refill it from) a sibling.
    Underflow(FileWriteGuardOwned<FE, Node<V>>),
}
1019
/// The result of inserting a key into a subtree, as reported by `insert_inner` to the parent node.
enum Insert<V> {
    /// The key was already present, so nothing was inserted.
    None,
    /// The key was inserted at position 0 of the node without splitting it;
    /// carries the node's current first key (or first bound).
    Left(Vec<V>),
    /// The key was inserted at a position other than 0 without splitting the node.
    Right,
    /// The node was split after an insert at position 0; carries the first key (or bound) of
    /// the original node, the first key (or bound) of the new node, and the new node's ID.
    OverflowLeft(Vec<V>, Vec<V>, Uuid),
    /// The node was split; carries the first key (or bound) of the new node and the new node's ID.
    Overflow(Vec<V>, Uuid),
}
1027
1028impl<S, C, FE> BTree<S, C, DirWriteGuardOwned<FE>> {
1029    /// Downgrade this [`BTreeWriteGuard`] into a [`BTreeReadGuard`].
1030    pub fn downgrade(self) -> BTreeReadGuard<S, C, FE> {
1031        BTreeReadGuard {
1032            schema: self.schema,
1033            collator: self.collator,
1034            dir: Arc::new(self.dir.downgrade()),
1035        }
1036    }
1037}
1038
1039impl<S, C, FE> BTree<S, C, DirWriteGuardOwned<FE>>
1040where
1041    S: Schema + Send + Sync,
1042    C: Collate<Value = S::Value> + Send + Sync,
1043    FE: AsType<Node<S::Value>> + Send + Sync,
1044    Node<S::Value>: FileLoad,
1045{
    /// Delete the given `key` from this B+Tree.
    ///
    /// Returns `Ok(true)` if a key was removed, or `Ok(false)` if no matching key was found.
    pub async fn delete<V>(&mut self, key: &[V]) -> Result<bool, io::Error>
    where
        V: Borrow<S::Value> + Send + Sync,
    {
        debug_assert!(self.dir.contains(&ROOT), "B+Tree is missing its root node");
        let mut root = self.dir.write_file_owned(&ROOT).await?;

        // `new_root` is `Some(child_id)` only if the root is left with a single child
        let new_root = match &mut *root {
            Node::Leaf(keys) => {
                let i = keys.bisect_left(&key, &self.collator);
                // NOTE(review): `zip` stops at the shorter side, so a `key` with fewer values
                // than the stored key matches on its prefix alone -- confirm this is intended
                if i < keys.len() && keys[i].iter().zip(key).all(|(l, r)| l == r.borrow()) {
                    keys.remove(i);
                    return Ok(true);
                } else {
                    return Ok(false);
                }
            }
            Node::Index(bounds, children) => {
                // find the last child whose bound is not greater than the key, if there is one
                let i = match bounds.bisect_right(&key, &self.collator) {
                    0 => return Ok(false),
                    i => i - 1,
                };

                let node = self.dir.write_file_owned(&children[i]).await?;
                match self.delete_inner(node, &key).await? {
                    Delete::None => return Ok(false),
                    Delete::Right => return Ok(true),
                    Delete::Left(bound) => {
                        // the first key of the child changed, so update its bound
                        bounds[i] = bound;
                        return Ok(true);
                    }
                    // the child is under-filled: merge it with (or refill it from) a sibling
                    Delete::Underflow(mut node) => match &mut *node {
                        Node::Leaf(new_keys) => {
                            self.merge_leaf(new_keys, i, bounds, children).await?
                        }
                        Node::Index(new_bounds, new_children) => {
                            self.merge_index(new_bounds, new_children, i, bounds, children)
                                .await?
                        }
                    },
                }

                if children.len() == 1 {
                    // only one child remains, so empty the root in order to replace it below
                    bounds.pop();
                    children.pop()
                } else {
                    None
                }
            }
        };

        if let Some(only_child) = new_root {
            // move the contents of the only remaining child into the root,
            // then delete the child's file
            let new_root = {
                let mut child = self.dir.write_file(&only_child).await?;
                match &mut *child {
                    Node::Leaf(keys) => Node::Leaf(keys.drain(..).collect()),
                    Node::Index(bounds, children) => {
                        Node::Index(bounds.drain(..).collect(), children.drain(..).collect())
                    }
                }
            };

            self.dir.delete(&only_child).await;

            *root = new_root;
        }

        Ok(true)
    }
1116
    /// Delete `key` from the subtree rooted at the (non-root) `node`.
    ///
    /// Resolves to a [`Delete`] which tells the caller whether a key was removed and whether
    /// the caller must update its bound for `node` or handle an under-filled `node`.
    fn delete_inner<'a, V>(
        &'a mut self,
        mut node: FileWriteGuardOwned<FE, Node<S::Value>>,
        key: &'a [V],
    ) -> Pin<Box<dyn Future<Output = Result<Delete<FE, S::Value>, io::Error>> + Send + 'a>>
    where
        V: Borrow<S::Value> + Send + Sync,
    {
        Box::pin(async move {
            match &mut *node {
                Node::Leaf(keys) => {
                    let i = keys.bisect_left(&key, &self.collator);

                    // NOTE(review): `zip` stops at the shorter side, so a `key` with fewer values
                    // than the stored key matches on its prefix alone -- confirm this is intended
                    if i < keys.len() && keys[i].iter().zip(key).all(|(l, r)| l == r.borrow()) {
                        keys.remove(i);

                        if keys.len() < (self.schema.order() / 2) {
                            // too few keys remain: give the write guard back to the caller
                            Ok(Delete::Underflow(node))
                        } else if i == 0 {
                            // the first key changed, so report the new one
                            Ok(Delete::Left(keys[0].to_vec()))
                        } else {
                            Ok(Delete::Right)
                        }
                    } else {
                        Ok(Delete::None)
                    }
                }
                Node::Index(bounds, children) => {
                    // find the last child whose bound is not greater than the key, if there is one
                    let i = match bounds.bisect_right(key, &self.collator) {
                        0 => return Ok(Delete::None),
                        i => i - 1,
                    };

                    let child = self.dir.write_file_owned(&children[i]).await?;
                    match self.delete_inner(child, key).await? {
                        Delete::None => return Ok(Delete::None),
                        Delete::Right => return Ok(Delete::Right),
                        Delete::Left(bound) => {
                            bounds[i] = bound;

                            // a new bound at position 0 is also this node's new first bound
                            return if i == 0 {
                                Ok(Delete::Left(bounds[0].to_vec()))
                            } else {
                                Ok(Delete::Right)
                            };
                        }
                        // the child is under-filled: merge it with (or refill it from) a sibling
                        Delete::Underflow(mut node) => match &mut *node {
                            Node::Leaf(new_keys) => {
                                self.merge_leaf(new_keys, i, bounds, children).await?
                            }
                            Node::Index(new_bounds, new_children) => {
                                self.merge_index(new_bounds, new_children, i, bounds, children)
                                    .await?
                            }
                        },
                    }

                    // NOTE(review): a leaf underflows at `len < order / 2` but an index node
                    // at `len <= order / 2` -- confirm that the difference is intended
                    if children.len() > (self.schema.order() / 2) {
                        if i == 0 {
                            Ok(Delete::Left(bounds[0].to_vec()))
                        } else {
                            Ok(Delete::Right)
                        }
                    } else {
                        Ok(Delete::Underflow(node))
                    }
                }
            }
        })
    }
1187
1188    fn merge_index<'a>(
1189        &'a mut self,
1190        new_bounds: &'a mut Vec<Vec<S::Value>>,
1191        new_children: &'a mut Vec<Uuid>,
1192        i: usize,
1193        bounds: &'a mut Vec<Vec<S::Value>>,
1194        children: &'a mut Vec<Uuid>,
1195    ) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send + 'a>> {
1196        Box::pin(async move {
1197            if i == 0 {
1198                match self
1199                    .merge_index_left(new_bounds, new_children, &children[i + 1])
1200                    .await?
1201                {
1202                    MergeIndexLeft::Borrow(bound) => {
1203                        bounds[i] = new_bounds[0].to_vec();
1204                        bounds[i + 1] = bound;
1205                    }
1206                    MergeIndexLeft::Merge(bound) => {
1207                        self.dir.delete(&children[0]).await;
1208                        children.remove(0);
1209                        bounds.remove(0);
1210                        bounds[0] = bound;
1211                    }
1212                }
1213            } else {
1214                match self
1215                    .merge_index_right(new_bounds, new_children, &children[i - 1])
1216                    .await?
1217                {
1218                    MergeIndexRight::Borrow => {
1219                        bounds[i] = new_bounds[0].to_vec();
1220                    }
1221                    MergeIndexRight::Merge => {
1222                        self.dir.delete(&children[i]).await;
1223                        children.remove(i);
1224                        bounds.remove(i);
1225                    }
1226                }
1227            }
1228
1229            Ok(())
1230        })
1231    }
1232
1233    fn merge_index_left<'a>(
1234        &'a self,
1235        left_bounds: &'a mut Vec<Vec<S::Value>>,
1236        left_children: &'a mut Vec<Uuid>,
1237        node_id: &'a Uuid,
1238    ) -> Pin<Box<dyn Future<Output = Result<MergeIndexLeft<S::Value>, io::Error>> + Send + 'a>>
1239    {
1240        Box::pin(async move {
1241            let mut node = self.dir.write_file(node_id).await?;
1242
1243            match &mut *node {
1244                Node::Leaf(_right_keys) => unreachable!("merge a leaf node with an index node"),
1245                Node::Index(right_bounds, right_children) => {
1246                    if right_bounds.len() > (self.schema.order() / 2) {
1247                        left_bounds.push(right_bounds.remove(0));
1248                        left_children.push(right_children.remove(0));
1249                        Ok(MergeIndexLeft::Borrow(right_bounds[0].to_vec()))
1250                    } else {
1251                        let mut new_bounds =
1252                            Vec::with_capacity(left_bounds.len() + right_bounds.len());
1253
1254                        new_bounds.extend(left_bounds.drain(..));
1255                        new_bounds.extend(right_bounds.drain(..));
1256                        *right_bounds = new_bounds;
1257
1258                        let mut new_children = Vec::with_capacity(right_bounds.len());
1259
1260                        new_children.extend(left_children.drain(..));
1261                        new_children.extend(right_children.drain(..));
1262                        *right_children = new_children;
1263
1264                        Ok(MergeIndexLeft::Merge(right_bounds[0].to_vec()))
1265                    }
1266                }
1267            }
1268        })
1269    }
1270
1271    fn merge_index_right<'a>(
1272        &'a self,
1273        right_bounds: &'a mut Vec<Vec<S::Value>>,
1274        right_children: &'a mut Vec<Uuid>,
1275        node_id: &'a Uuid,
1276    ) -> Pin<Box<dyn Future<Output = Result<MergeIndexRight, io::Error>> + Send + 'a>> {
1277        Box::pin(async move {
1278            let mut node = self.dir.write_file(node_id).await?;
1279
1280            match &mut *node {
1281                Node::Leaf(_left_keys) => unreachable!("merge a leaf node with an index node"),
1282                Node::Index(left_bounds, left_children) => {
1283                    if left_children.len() > (self.schema.order() / 2) {
1284                        let right = left_bounds.pop().expect("right");
1285                        right_bounds.insert(0, right);
1286
1287                        let right = left_children.pop().expect("right");
1288                        right_children.insert(0, right);
1289
1290                        Ok(MergeIndexRight::Borrow)
1291                    } else {
1292                        left_bounds.extend(right_bounds.drain(..));
1293                        left_children.extend(right_children.drain(..));
1294                        Ok(MergeIndexRight::Merge)
1295                    }
1296                }
1297            }
1298        })
1299    }
1300
1301    fn merge_leaf<'a>(
1302        &'a mut self,
1303        new_keys: &'a mut Vec<Vec<S::Value>>,
1304        i: usize,
1305        bounds: &'a mut Vec<Vec<S::Value>>,
1306        children: &'a mut Vec<Uuid>,
1307    ) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send + 'a>> {
1308        Box::pin(async move {
1309            if i == 0 {
1310                match self.merge_leaf_left(new_keys, &children[i + 1]).await? {
1311                    MergeLeafLeft::Borrow(bound) => {
1312                        bounds[i] = new_keys[0].to_vec();
1313                        bounds[i + 1] = bound;
1314                    }
1315                    MergeLeafLeft::Merge(bound) => {
1316                        self.dir.delete(&children[0]).await;
1317                        children.remove(0);
1318                        bounds.remove(0);
1319                        bounds[0] = bound;
1320                    }
1321                }
1322            } else {
1323                match self.merge_leaf_right(new_keys, &children[i - 1]).await? {
1324                    MergeLeafRight::Borrow => {
1325                        bounds[i] = new_keys[0].to_vec();
1326                    }
1327                    MergeLeafRight::Merge => {
1328                        self.dir.delete(&children[i]).await;
1329                        children.remove(i);
1330                        bounds.remove(i);
1331                    }
1332                }
1333            }
1334
1335            Ok(())
1336        })
1337    }
1338
1339    fn merge_leaf_left<'a>(
1340        &'a self,
1341        left_keys: &'a mut Vec<Vec<S::Value>>,
1342        node_id: &'a Uuid,
1343    ) -> Pin<Box<dyn Future<Output = Result<MergeLeafLeft<S::Value>, io::Error>> + Send + 'a>> {
1344        Box::pin(async move {
1345            let mut node = self.dir.write_file(node_id).await?;
1346
1347            match &mut *node {
1348                Node::Leaf(right_keys) => {
1349                    if right_keys.len() > (self.schema.order() / 2) {
1350                        left_keys.push(right_keys.remove(0));
1351                        Ok(MergeLeafLeft::Borrow(right_keys[0].to_vec()))
1352                    } else {
1353                        let mut new_keys = Vec::with_capacity(left_keys.len() + right_keys.len());
1354                        new_keys.extend(left_keys.drain(..));
1355                        new_keys.extend(right_keys.drain(..));
1356                        *right_keys = new_keys;
1357
1358                        Ok(MergeLeafLeft::Merge(right_keys[0].to_vec()))
1359                    }
1360                }
1361                Node::Index(bounds, children) => {
1362                    match self.merge_leaf_left(left_keys, &children[0]).await? {
1363                        MergeLeafLeft::Borrow(left) => {
1364                            bounds[0] = left.to_vec();
1365                            Ok(MergeLeafLeft::Borrow(left))
1366                        }
1367                        MergeLeafLeft::Merge(left) => {
1368                            bounds[0] = left.to_vec();
1369                            Ok(MergeLeafLeft::Merge(left))
1370                        }
1371                    }
1372                }
1373            }
1374        })
1375    }
1376
1377    fn merge_leaf_right<'a>(
1378        &'a self,
1379        right_keys: &'a mut Vec<Vec<S::Value>>,
1380        node_id: &'a Uuid,
1381    ) -> Pin<Box<dyn Future<Output = Result<MergeLeafRight, io::Error>> + Send + 'a>> {
1382        Box::pin(async move {
1383            let mut node = self.dir.write_file(node_id).await?;
1384
1385            match &mut *node {
1386                Node::Leaf(left_keys) => {
1387                    if left_keys.len() > (self.schema.order() / 2) {
1388                        let right = left_keys.pop().expect("right");
1389                        right_keys.insert(0, right);
1390                        Ok(MergeLeafRight::Borrow)
1391                    } else {
1392                        left_keys.extend(right_keys.drain(..));
1393                        Ok(MergeLeafRight::Merge)
1394                    }
1395                }
1396                Node::Index(_bounds, _children) => unreachable!("merge with the rightmost leaf"),
1397            }
1398        })
1399    }
1400
1401    /// Insert the given `key` into this B+Tree.
1402    /// Return `false` if te given `key` is already present.
1403    pub async fn insert(&mut self, key: Vec<S::Value>) -> Result<bool, io::Error> {
1404        let key = validate_key(&*self.schema, key)?;
1405        self.insert_root(key).await
1406    }
1407
    /// Insert `key` into this B+Tree, starting from the root node.
    ///
    /// This does not validate `key` against the schema.
    /// Returns `Ok(false)` if `key` is already present, otherwise `Ok(true)`.
    async fn insert_root(&mut self, key: Vec<S::Value>) -> Result<bool, io::Error> {
        let order = self.schema.order();

        debug_assert!(self.dir.contains(&ROOT), "B+Tree is missing its root node");
        let mut root = self.dir.write_file_owned(&ROOT).await?;

        // `new_root` is `Some` only if the root overflowed and must be replaced
        let new_root = match &mut *root {
            Node::Leaf(keys) => {
                let i = keys.bisect_left(&key, &self.collator);

                if keys.get(i) == Some(&key) {
                    // no-op
                    return Ok(false);
                }

                keys.insert(i, key);

                // NOTE(review): the root leaf splits at `len > order` but `insert_inner` splits
                // a leaf at `len >= order` -- confirm that the difference is intended
                if keys.len() > order {
                    let mid = div_ceil(order, 2);
                    let size = self.schema.block_size() / 2;

                    // move the upper part of the keys into a new leaf
                    let right: Vec<_> = keys.drain(mid..).collect();
                    debug_assert!(right.len() >= mid);

                    let right_key = right[0].clone();
                    let (right, _) = self.dir.create_file_unique(Node::Leaf(right), size)?;

                    // move the remaining keys into a second new leaf
                    let left: Vec<_> = keys.drain(..).collect();
                    debug_assert!(left.len() >= mid);

                    let left_key = left[0].clone();
                    let (left, _) = self.dir.create_file_unique(Node::Leaf(left), size)?;

                    // the root becomes an index over the two new leaves
                    Some(Node::Index(vec![left_key, right_key], vec![left, right]))
                } else {
                    None
                }
            }
            Node::Index(bounds, children) => {
                debug_assert_eq!(bounds.len(), children.len());

                // NOTE(review): `delete` chooses the child with `bisect_right`; if `bisect_left`
                // returns the index of a bound equal to `key`, then a key equal to `bounds[i]`
                // (i > 0) is routed to child `i - 1` rather than child `i` -- confirm
                let i = match bounds.bisect_left(&key, &self.collator) {
                    0 => 0,
                    i => i - 1,
                };

                let mut child = self.dir.write_file_owned(&children[i]).await?;
                let result = self.insert_inner(&mut child, key).await?;

                match result {
                    Insert::None => return Ok(false),
                    Insert::Right => {}
                    Insert::Left(key) => {
                        // the first key of the child changed, so update its bound
                        bounds[i] = key;
                    }
                    Insert::OverflowLeft(left, middle, child_id) => {
                        // the child split and its first key changed
                        bounds[i] = left;
                        bounds.insert(i + 1, middle);
                        children.insert(i + 1, child_id);
                    }
                    Insert::Overflow(bound, child_id) => {
                        // the child split: add the new node directly after it
                        bounds.insert(i + 1, bound);
                        children.insert(i + 1, child_id);
                    }
                }

                // debug_assert!(self.collator.is_sorted(&bounds));
                debug_assert_eq!(bounds.len(), children.len());

                if children.len() > order {
                    let size = self.schema.block_size() / 2;
                    // move the upper part of the root's entries into a new index node
                    let right_bounds: Vec<_> = bounds.drain(div_ceil(order, 2)..).collect();
                    let right_children: Vec<_> = children.drain(div_ceil(order, 2)..).collect();
                    let right_bound = right_bounds[0].clone();
                    let (right_node_id, _) = self
                        .dir
                        .create_file_unique(Node::Index(right_bounds, right_children), size)?;

                    // move the remaining entries into a second new index node
                    let left_bounds: Vec<_> = bounds.drain(..).collect();
                    let left_children: Vec<_> = children.drain(..).collect();
                    let left_bound = left_bounds[0].clone();
                    let (left_node_id, _) = self
                        .dir
                        .create_file_unique(Node::Index(left_bounds, left_children), size)?;

                    // the root becomes an index over the two new index nodes
                    Some(Node::Index(
                        vec![left_bound, right_bound],
                        vec![left_node_id, right_node_id],
                    ))
                } else {
                    None
                }
            }
        };

        if let Some(new_root) = new_root {
            *root = new_root;
        }

        Ok(true)
    }
1509
    /// Insert `key` into the subtree rooted at the (non-root) `node`.
    ///
    /// Resolves to an [`Insert`] which tells the caller whether the key was inserted and
    /// whether the caller must update its bound for `node` or add a newly split-off node.
    fn insert_inner<'a>(
        &'a mut self,
        node: &'a mut Node<S::Value>,
        key: Vec<S::Value>,
    ) -> Pin<Box<dyn Future<Output = Result<Insert<S::Value>, io::Error>> + Send + 'a>> {
        Box::pin(async move {
            let order = self.schema.order();

            match node {
                Node::Leaf(keys) => {
                    let i = keys.bisect_left(&key, &self.collator);

                    if i < keys.len() && keys[i] == key {
                        // no-op
                        return Ok(Insert::None);
                    }

                    keys.insert(i, key);

                    let mid = order / 2;

                    // NOTE(review): this splits at `len >= order` but `insert_root` splits the
                    // root leaf at `len > order` -- confirm that the difference is intended
                    if keys.len() >= order {
                        let size = self.schema.block_size() / 2;
                        // move the upper part of the keys into a new leaf
                        let new_leaf: Vec<_> = keys.drain(mid..).collect();

                        debug_assert!(new_leaf.len() >= mid);
                        debug_assert!(keys.len() >= mid);

                        let middle_key = new_leaf[0].to_vec();
                        let node = Node::Leaf(new_leaf);
                        let (new_node_id, _) = self.dir.create_file_unique(node, size)?;

                        if i == 0 {
                            // the first key of this leaf changed as well
                            Ok(Insert::OverflowLeft(
                                keys[0].to_vec(),
                                middle_key,
                                new_node_id,
                            ))
                        } else {
                            Ok(Insert::Overflow(middle_key, new_node_id))
                        }
                    } else {
                        debug_assert!(keys.len() > mid);

                        if i == 0 {
                            // the first key of this leaf changed
                            Ok(Insert::Left(keys[0].to_vec()))
                        } else {
                            Ok(Insert::Right)
                        }
                    }
                }
                Node::Index(bounds, children) => {
                    debug_assert_eq!(bounds.len(), children.len());
                    let size = self.schema.block_size() >> 1;

                    // NOTE(review): `delete_inner` chooses the child with `bisect_right`; if
                    // `bisect_left` returns the index of a bound equal to `key`, then a key equal
                    // to `bounds[i]` (i > 0) is routed to child `i - 1`, not child `i` -- confirm
                    let i = match bounds.bisect_left(&key, &self.collator) {
                        0 => 0,
                        i => i - 1,
                    };

                    let mut child = self.dir.write_file_owned(&children[i]).await?;

                    // `overflow_left` records whether the first bound of this node changed
                    // as part of a split of its first child
                    let overflow_left = match self.insert_inner(&mut child, key).await? {
                        Insert::None => return Ok(Insert::None),
                        Insert::Right => return Ok(Insert::Right),
                        Insert::Left(key) => {
                            bounds[i] = key;

                            return if i == 0 {
                                Ok(Insert::Left(bounds[i].to_vec()))
                            } else {
                                Ok(Insert::Right)
                            };
                        }
                        Insert::OverflowLeft(left, middle, child_id) => {
                            bounds[i] = left;
                            bounds.insert(i + 1, middle);
                            children.insert(i + 1, child_id);
                            i == 0
                        }
                        Insert::Overflow(bound, child_id) => {
                            bounds.insert(i + 1, bound);
                            children.insert(i + 1, child_id);
                            false
                        }
                    };

                    // debug_assert!(self.collator.is_sorted(bounds));

                    if children.len() > order {
                        // move the upper part of this node's entries into a new index node
                        let mid = div_ceil(self.schema.order(), 2);
                        let new_bounds: Vec<_> = bounds.drain(mid..).collect();
                        let new_children: Vec<_> = children.drain(mid..).collect();

                        let left_bound = new_bounds[0].to_vec();
                        let node = Node::Index(new_bounds, new_children);
                        let (node_id, _) = self.dir.create_file_unique(node, size)?;

                        if overflow_left {
                            Ok(Insert::OverflowLeft(
                                bounds[0].to_vec(),
                                left_bound,
                                node_id,
                            ))
                        } else {
                            Ok(Insert::Overflow(left_bound, node_id))
                        }
                    } else if i == 0 {
                        Ok(Insert::Left(bounds[0].to_vec()))
                    } else {
                        Ok(Insert::Right)
                    }
                }
            }
        })
    }
1626
1627    /// Delete all the keys in this [`BTree`].
1628    pub async fn truncate(&mut self) -> Result<(), io::Error> {
1629        self.dir.truncate().await;
1630
1631        self.dir
1632            .create_file(ROOT.to_string(), Node::Leaf(vec![]), 0)?;
1633
1634        debug_assert!(
1635            self.dir.contains(&ROOT),
1636            "B+Tree failed to re-create its root node"
1637        );
1638
1639        Ok(())
1640    }
1641}
1642
1643impl<S, C, FE> BTree<S, C, DirWriteGuardOwned<FE>>
1644where
1645    S: Schema + Send + Sync,
1646    C: Collate<Value = S::Value> + Clone + Send + Sync + 'static,
1647    FE: AsType<Node<S::Value>> + Send + Sync + 'static,
1648    Node<S::Value>: FileLoad,
1649{
1650    /// Merge the keys from the `other` B+Tree range into this one.
1651    ///
1652    /// The source B+Tree **must** have an identical schema and collation.
1653    pub async fn merge<G>(&mut self, other: BTree<S, C, G>) -> Result<(), io::Error>
1654    where
1655        G: DirDeref<Entry = FE> + Clone + Send + Sync + 'static,
1656    {
1657        validate_collator_eq(&self.collator, &other.collator)?;
1658        validate_schema_eq(&self.schema, &other.schema)?;
1659
1660        let mut keys = other.keys(Range::<S::Value>::default(), false).await?;
1661        while let Some(key) = keys.try_next().await? {
1662            self.insert_root(key.into_vec()).await?;
1663        }
1664
1665        Ok(())
1666    }
1667
1668    /// Delete the keys in the `other` B+Tree from this one.
1669    ///
1670    /// The source B+Tree **must** have an identical schema and collation.
1671    pub async fn delete_all<G>(&mut self, other: BTree<S, C, G>) -> Result<(), io::Error>
1672    where
1673        G: DirDeref<Entry = FE> + Clone + Send + Sync + 'static,
1674    {
1675        validate_collator_eq(&self.collator, &other.collator)?;
1676        validate_schema_eq(&self.schema, &other.schema)?;
1677
1678        let mut keys = other.keys(Range::<S::Value>::default(), false).await?;
1679        while let Some(key) = keys.try_next().await? {
1680            self.delete(&key).await?;
1681        }
1682
1683        Ok(())
1684    }
1685}
1686
1687impl<S: fmt::Debug, C, G> fmt::Debug for BTree<S, C, G> {
1688    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1689        write!(f, "B+Tree index with schema {:?}", self.schema)
1690    }
1691}
1692
1693#[inline]
1694fn validate_key<S: Schema>(schema: &S, key: Vec<S::Value>) -> Result<Vec<S::Value>, io::Error> {
1695    schema
1696        .validate_key(key)
1697        .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))
1698}
1699
/// Check that two collators are equal before the B+Trees which they order are combined.
///
/// Returns `Ok(())` if `this == that`, otherwise an [`io::Error`] of kind
/// [`io::ErrorKind::InvalidData`].
#[inline]
fn validate_collator_eq<S>(this: &S, that: &S) -> Result<(), io::Error>
where
    S: PartialEq,
{
    if this == that {
        Ok(())
    } else {
        // `io::Error::new` already constructs the error type returned here,
        // so no conversion is needed
        Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "B+Tree to merge must have the same collation",
        ))
    }
}
1715
/// Check that two schemata are equal before the B+Trees which they describe are combined.
///
/// Returns `Ok(())` if `this == that`, otherwise an [`io::Error`] of kind
/// [`io::ErrorKind::InvalidData`] whose message includes the debug representation of both.
#[inline]
fn validate_schema_eq<S>(this: &S, that: &S) -> Result<(), io::Error>
where
    S: PartialEq + fmt::Debug,
{
    if this == that {
        Ok(())
    } else {
        // `io::Error::new` already constructs the error type returned here,
        // so no conversion is needed
        Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!(
                "cannot merge a B+Tree with schema {:?} into one with schema {:?}",
                that, this
            ),
        ))
    }
}
1734
/// Divide `num` by `denom`, rounding the quotient up to the next whole number.
///
/// Panics if `denom` is zero.
#[inline]
fn div_ceil(num: usize, denom: usize) -> usize {
    let remainder = num % denom;
    let quotient = num / denom;

    if remainder == 0 {
        quotient
    } else {
        quotient + 1
    }
}
1742
1743#[inline]
1744fn stack_key<'a, T, A>(iter: A) -> SmallVec<[T; NODE_STACK_SIZE]>
1745where
1746    T: Clone + 'a,
1747    A: IntoIterator<Item = &'a T>,
1748{
1749    iter.into_iter().cloned().collect()
1750}