baildon/btree/
baildon.rs

1//! B+Tree implementation
2//!
3//! This is the main data structure exposed by the library.
4//!
5
6use std::collections::HashMap;
7use std::fmt::Display;
8use std::io::ErrorKind;
9use std::ops::ControlFlow;
10use std::path::{Path, PathBuf};
11use std::sync::atomic::Ordering;
12use std::sync::atomic::{AtomicBool, AtomicUsize};
13
14use anyhow::Result;
15use futures::StreamExt;
16use serde::de::DeserializeOwned;
17use serde::Serialize;
18use strum::EnumString;
19use thiserror::Error;
20use tokio::io;
21use tokio::sync::{Mutex, MutexGuard};
22
23use super::node::Node;
24use super::sparse::BuildIdentityHasher;
25use crate::command::Command;
26use crate::io::file::BTreeFile;
27use crate::io::wal::WalFile;
28
29/// When accessing tree contents serially, ascending or descending order.
30#[derive(Clone, Copy, Debug, EnumString, PartialEq)]
31#[strum(ascii_case_insensitive)]
32pub enum Direction {
33    /// Process in ascending order.
34    Ascending,
35    /// Process in descending order.
36    Descending,
37}
38
/// Size argument handed to `BTreeFile::try_new` when a store is created and to
/// `BTreeFile::reset` when it is cleared (presumably bytes — confirm in `BTreeFile`).
const BAILDON_FILE_SIZE: u64 = 512 * 1_000;
40
41/// Keys which we wish to store in a Baildon tree.
42pub trait BaildonKey: Clone + Ord + Serialize + DeserializeOwned + std::fmt::Debug {}
43
44// Blanket implementation which satisfies the compiler
45impl<K> BaildonKey for K
46where
47    K: Clone + Ord + Serialize + DeserializeOwned + std::fmt::Debug,
48{
49    // Nothing to implement, since A already supports the other traits.
50    // It has the functions it needs already
51}
52
53/// Values which we wish to store in a Baildon tree.
54pub trait BaildonValue: Clone + Serialize + DeserializeOwned + std::fmt::Debug {}
55
56// Blanket implementation which satisfies the compiler
57impl<V> BaildonValue for V
58where
59    V: Clone + Serialize + DeserializeOwned + std::fmt::Debug,
60{
61    // Nothing to implement, since A already supports the other traits.
62    // It has the functions it needs already
63}
64
65/// Baildon specific errors.
66#[derive(Error, Debug)]
67pub enum BaildonError {
68    /// Supplied branching factor too small
69    #[error("branch: {0} must be >=2")]
70    BranchTooSmall(u64),
71
72    /// Could not find a node's child
73    #[error("could not find child for node with index: {0}")]
74    LostChild(usize),
75
76    /// Could not find a node's parent
77    #[error("could not find parent for node with index: {0}")]
78    LostParent(usize),
79}
80
81/// A B+Tree.
82pub struct Baildon<K, V>
83// Constraints are required because Drop is implemented
84where
85    K: BaildonKey + Send + Sync,
86    V: BaildonValue + Send + Sync,
87{
88    file: Mutex<BTreeFile>,
89    path: PathBuf,
90    root: Mutex<usize>,
91    pub(crate) nodes: Mutex<HashMap<usize, Node<K, V>, BuildIdentityHasher>>,
92    branch: u64,
93    pub(crate) index: AtomicUsize,
94    wal: Mutex<WalFile>,
95}
96
97impl<K, V> Baildon<K, V>
98where
99    K: BaildonKey + Send + Sync,
100    V: BaildonValue + Send + Sync,
101{
102    /// Create a new store at the specified path with the specified branching factor.
103    pub async fn try_new<P: AsRef<Path>>(origin: P, branch: u64) -> Result<Self> {
104        if branch < 2 {
105            return Err(BaildonError::BranchTooSmall(branch).into());
106        }
107        let path: &Path = origin.as_ref();
108
109        tracing::info!("Creating B+Tree at: {}", path.display());
110
111        let mut file = BTreeFile::try_new(path, BAILDON_FILE_SIZE).await?;
112
113        let root = Node::<K, V>::root(branch);
114
115        let s_root = root.serialize()?;
116
117        file.write_data(1, &s_root).await?;
118
119        let mut nodes: HashMap<_, _, BuildIdentityHasher> = HashMap::default();
120        nodes.insert(1, root);
121
122        // If we can't create a new WalFile, we should fail because we might be trying to create a
123        // store over a failed WAL. That will require manual clean up first.
124        let mut wal_path = PathBuf::new();
125        wal_path.push(origin.as_ref());
126        wal_path.set_extension("wal");
127        let wal = WalFile::try_new(&wal_path).await?;
128
129        let this = Self {
130            file: Mutex::new(file),
131            path: path.into(),
132            root: Mutex::new(1),
133            nodes: Mutex::new(nodes),
134            branch,
135            index: AtomicUsize::new(2),
136            wal: Mutex::new(wal),
137        };
138        this.inner_flush_to_disk(false).await?;
139        Ok(this)
140    }
141
142    /// Open an exisiting store at the specified path.
143    pub async fn try_open<P: AsRef<Path>>(origin: P) -> Result<Self> {
144        let path: &Path = origin.as_ref();
145
146        tracing::info!("Opening B+Tree at: {}", path.display());
147
148        let mut file = BTreeFile::try_open(path).await?;
149
150        let index = AtomicUsize::new(file.get_tree_index().await);
151
152        let buf = file.read_data(file.get_root_index().await).await?;
153        let root: Node<K, V> = Node::<K, V>::deserialize(&buf)?;
154        let branch = root.branch();
155
156        let mut nodes: HashMap<_, _, BuildIdentityHasher> = HashMap::default();
157        let idx = root.index();
158        nodes.insert(root.index(), root);
159
160        // If we can open a WalFile, then we should replay it before allowing the open to complete
161        // If not, last shutdown was fine, so create a new WalFile
162        let mut wal_path = PathBuf::new();
163        wal_path.push(origin.as_ref());
164        wal_path.set_extension("wal");
165        let mut recover = false;
166        let wal = match WalFile::try_open(&wal_path).await {
167            Ok(wal) => {
168                recover = true;
169                wal
170            }
171            Err(err) => {
172                // If the error is NotFound, we can ignore the error since this is the happy path
173                if let Some(io_error) = err.downcast_ref::<std::io::Error>() {
174                    if io_error.kind() != ErrorKind::NotFound {
175                        return Err(err);
176                    }
177                } else {
178                    return Err(err);
179                }
180                WalFile::try_new(&wal_path).await?
181            }
182        };
183
184        let this = Self {
185            file: Mutex::new(file),
186            path: path.into(),
187            root: Mutex::new(idx),
188            nodes: Mutex::new(nodes),
189            branch,
190            index,
191            wal: Mutex::new(wal),
192        };
193
194        if recover {
195            let mut wal = this.wal.lock().await;
196
197            // Process wal file
198            tracing::info!("Recovering from wal...");
199            loop {
200                match wal.read_data().await {
201                    Ok(data) => {
202                        let cmd: Command<K, V> = Command::deserialize(&data)?;
203                        match cmd {
204                            Command::Upsert(key, value) => {
205                                // We don't care about the updated value, so ignore the
206                                // function result
207                                let _ = this.inner_insert(key, value).await;
208                            }
209                            Command::Delete(key) => {
210                                // We don't care about the deleted value, so ignore the
211                                // function result
212                                let _ = this.inner_delete(&key).await;
213                            }
214                        }
215                    }
216                    Err(e) => {
217                        // XXX This is perhaps a bit sketchy...
218                        if let Some(down_e) = e.downcast_ref::<io::Error>() {
219                            if down_e.kind() == io::ErrorKind::UnexpectedEof {
220                                std::fs::remove_file(&wal_path)?;
221                                *wal = WalFile::try_new(&wal_path).await?;
222                                break;
223                            }
224                        }
225                        tracing::info!("Recovering failed, data read error: {e:?}");
226                        return Err(e);
227                    }
228                }
229            }
230            tracing::info!("Recovered!");
231        }
232        Ok(this)
233    }
234
235    /// Clear our tree.
236    pub async fn clear(&self) -> Result<()> {
237        let mut file_lock = self.file.lock().await;
238        file_lock.reset(BAILDON_FILE_SIZE).await?;
239
240        // Can't fail from here
241        let mut nodes_lock = self.nodes.lock().await;
242        nodes_lock.clear();
243        self.index.store(1, Ordering::SeqCst);
244        let root = Node::<K, V>::root(self.branch);
245        self.add_node(&mut nodes_lock, root).await;
246        let mut root_lock = self.root.lock().await;
247        *root_lock = 1;
248        Ok(())
249    }
250
251    /// Does the tree contain this key?
252    pub async fn contains(&self, key: &K) -> bool {
253        let mut nodes_lock = self.nodes.lock().await;
254        let node = match self.search_node_with_lock(&mut nodes_lock, key).await {
255            Ok(v) => v,
256            Err(_) => return false,
257        };
258        node.key_index(key).is_some()
259    }
260
261    /// Return count of entries.
262    pub async fn count(&self) -> usize {
263        let count = AtomicUsize::new(0);
264        let callback = |node: &Node<K, V>| {
265            count.fetch_add(node.len(), Ordering::SeqCst);
266            ControlFlow::Continue(())
267        };
268        self.traverse_leaf_nodes(Direction::Ascending, callback)
269            .await;
270        count.load(Ordering::SeqCst)
271    }
272
273    /// Delete a Key and return an optional previous Value.
274    pub async fn delete(&self, key: &K) -> Result<Option<V>, anyhow::Error> {
275        let cmd: Command<K, V> = Command::Delete(key.clone());
276        let s_cmd = cmd.serialize()?;
277        let mut wal_lock = self.wal.lock().await;
278        wal_lock.write_data(&s_cmd).await?;
279        self.inner_delete(key).await
280    }
281
    /// Remove `key` from the tree without touching the WAL, returning its previous value.
    ///
    /// Returns `Ok(None)` when the leaf found for `key` does not contain it.
    ///
    /// After the pair is removed, the loop walks upward while the current node
    /// `is_minimum()`:
    /// - It first looks for a sibling under the same parent, trying the `Ascending` neighbour
    ///   before the `Descending` one.
    /// - If that sibling is not at minimum, one pair is moved from it into the current node, and
    ///   the parent's key for the affected child is updated.
    /// - Otherwise the sibling is merged into the current node. The sibling is then removed from
    ///   the cache and freed in the file. If the parent is the root and is left with one child,
    ///   that child becomes the new root and the old root is freed.
    /// - After either fix-up, the loop continues with the parent.
    ///
    /// # Errors
    ///
    /// Propagates errors from node lookup and `free_data`. Returns
    /// `BaildonError::LostParent` when a node being rebalanced has no parent index.
    async fn inner_delete(&self, key: &K) -> Result<Option<V>> {
        let mut nodes_lock = self.nodes.lock().await;

        let mut node = self.search_node_with_lock(&mut nodes_lock, key).await?;

        // REMEMBER if search_node() finds a node, we still need to confirm
        // that our node contains the key we are looking for.
        if node.key_index(key).is_none() {
            return Ok(None);
        }

        // XXX: This will return None if the key can't be found. Arguably, that's not quite the correct
        // logic, but correct enough for now.
        let value = node.remove_value(key);

        loop {
            if !node.is_minimum() {
                break;
            }
            // Process this node
            // Try to find a donor node
            let (neighbour_opt, direction) = match self
                .neighbour_same_parent_with_lock(
                    &mut nodes_lock,
                    node.index(),
                    Direction::Ascending,
                )
                .await
            {
                Some(n) => (Some(n), Direction::Ascending),
                None => (
                    self.neighbour_same_parent_with_lock(
                        &mut nodes_lock,
                        node.index(),
                        Direction::Descending,
                    )
                    .await,
                    Direction::Descending,
                ),
            };
            // Process this node
            match neighbour_opt {
                Some(mut neighbour) => {
                    // If our neighbour isn't at minimum we can simply take a k/v pair
                    if !neighbour.is_minimum() {
                        let p_idx = node
                            .parent()
                            .ok_or(BaildonError::LostParent(node.index()))?;
                        // Taking a key involves complex parent updates for both node and neighbour
                        // If taking from the Ascending:
                        //  - Take the first pair from the neighbour
                        //  - Push those onto our node
                        //  - Update our parent node entry key
                        // If taking from the Descending:
                        //  - Take the last pair from the neighbour
                        //  - Prepend those onto our node
                        //  - Update our parent neighbour entry key

                        // Update the neighbour
                        assert_eq!(neighbour.parent(), node.parent());
                        match &mut neighbour {
                            Node::Internal(data) => {
                                // NOTE(review): in the Descending case `k` is the key of the pair
                                // just moved INTO `node`, yet it is installed as the neighbour's
                                // key in the parent. If parent keys are each child's max key (as
                                // the Ascending merge path below assumes), the neighbour's new max
                                // key may be what belongs here — confirm against
                                // `update_child_key` and the search logic.
                                let (tgt_idx, (k, child)) = if direction == Direction::Ascending {
                                    (node.index(), (data.remove_pair(0)))
                                } else {
                                    (data.index(), (data.remove_pair(data.len() - 1)))
                                };

                                // Update our node
                                node.set_child(&k, child);

                                // Update the child (set its parent)
                                let closure = |child: &mut Node<K, V>| {
                                    child.set_parent(Some(node.index()));
                                    None
                                };
                                self.update_node(&mut nodes_lock, child, closure).await;

                                // Update the parent:
                                self.update_node(
                                    &mut nodes_lock,
                                    p_idx,
                                    |parent: &mut Node<K, V>| {
                                        parent.update_child_key(tgt_idx, k);
                                        None
                                    },
                                )
                                .await;
                            }
                            Node::Leaf(data) => {
                                // NOTE(review): same Descending-key question as the Internal arm
                                // above — `k` now lives in `node`, not in the neighbour.
                                let (tgt_idx, (k, value)) = if direction == Direction::Ascending {
                                    (node.index(), (data.remove_pair(0)))
                                } else {
                                    (data.index(), (data.remove_pair(data.len() - 1)))
                                };

                                // Update our node
                                node.set_value(&k, value);

                                // Update the parent:
                                self.update_node(
                                    &mut nodes_lock,
                                    p_idx,
                                    |parent: &mut Node<K, V>| {
                                        parent.update_child_key(tgt_idx, k);
                                        None
                                    },
                                )
                                .await;
                            }
                        }
                        // Replace our modified neighbour
                        self.replace_node(&mut nodes_lock, neighbour);
                    } else {
                        // We need to merge our neighbour
                        assert_ne!(neighbour.index(), node.index());
                        assert_eq!(neighbour.parent(), node.parent());
                        // Before we merge nodes, we need to make sure that every child has
                        // the same parent
                        if let Node::Internal(data) = &neighbour {
                            let p_idx = Some(node.index());
                            // `move` makes the closure Copy, so it can be passed by value on
                            // every iteration.
                            let closure_update_parent = move |child: &mut Node<K, V>| {
                                child.set_parent(p_idx);
                                None
                            };
                            for child in data.children() {
                                // NOTE(review): update_node yields None both when the closure
                                // does and when the child cannot be read, so an unreadable
                                // child is silently skipped here.
                                let _ = self
                                    .update_node(&mut nodes_lock, child, closure_update_parent)
                                    .await;
                            }
                        }
                        // Capture various useful bits of data before the merge
                        let neighbour_idx = neighbour.index();
                        let neighbour_max_key = neighbour.max_key().clone();
                        node.merge(neighbour);
                        // Update our parent
                        // We (may) need to adjust our parent to clean out our neighbour
                        // An AtomicBool lets the closure (moved into update_node) report back
                        // through a shared reference.
                        let update_root = AtomicBool::new(false);
                        let closure_cleanup_parent = |parent: &mut Node<K, V>| {
                            // If we merged to the ascending:
                            //  - We take the max key from the neighbour and update our node key to
                            //    that value
                            //  - Remove the neighbour
                            // If we merged to the descending:
                            //  - Remove the neighbour
                            // The `?` returns None early from the closure (without flagging a
                            // root change) if remove_child finds nothing.
                            if direction == Direction::Ascending {
                                let _idx = parent.remove_child(neighbour_idx)?;
                                parent.update_child_key(node.index(), neighbour_max_key);
                            } else {
                                let _idx = parent.remove_child(neighbour_idx)?;
                            }

                            // A parentless parent (the root) left with one child is redundant:
                            // that child will become the new root.
                            if parent.len() == 1 && parent.parent().is_none() {
                                assert_eq!(parent.len(), 1);
                                assert_eq!(
                                    parent.children().next().expect("HAS A 0"),
                                    node.index()
                                );
                                update_root.store(true, Ordering::SeqCst);
                            }
                            None
                        };
                        let p_idx = node
                            .parent()
                            .ok_or(BaildonError::LostParent(node.index()))?;
                        let _ = self
                            .update_node(&mut nodes_lock, p_idx, closure_cleanup_parent)
                            .await;
                        // Remove the lost node
                        nodes_lock.remove(&neighbour_idx);
                        // WE ARE VERY CAREFUL TO ONLY HOLD THE FILE LOCK BRIEFLY HERE
                        // (it is dropped at the end of this else block, before the
                        // find_node_with_lock call below).
                        let mut file_lock = self.file.lock().await;
                        file_lock.free_data(neighbour_idx)?;

                        // Check if we need to update our root
                        if update_root.load(Ordering::SeqCst) {
                            let mut root_lock = self.root.lock().await;
                            *root_lock = node.index();
                            node.set_parent(None);
                            nodes_lock.remove(&p_idx);
                            file_lock.free_data(p_idx)?;
                            break;
                        }
                    }
                    let node_parent = node
                        .parent()
                        .ok_or(BaildonError::LostParent(node.index()))?;
                    // Replace our modified node
                    self.replace_node(&mut nodes_lock, node);
                    // Now, update our node for next loop
                    node = self
                        .find_node_with_lock(&mut nodes_lock, node_parent)
                        .await?;
                }
                // If we don't have a neighbour, we can't have a parent, so job done
                None => break,
            }
        }
        // Replace our modified node
        self.replace_node(&mut nodes_lock, node);
        Ok(value)
    }
484
485    /// Serialize and store all our updated nodes to disk.
486    pub async fn flush_to_disk(&self) -> Result<()> {
487        self.inner_flush_to_disk(true).await
488    }
489
490    async fn inner_flush_to_disk(&self, remove_wal: bool) -> Result<()> {
491        let mut nodes_lock = self.nodes.lock().await;
492        let mut file_lock = self.file.lock().await;
493
494        tracing::debug!("About to examine {} nodes", nodes_lock.len());
495        for node in nodes_lock.values_mut().filter(|n| !n.clean()) {
496            tracing::debug!("Storing node: {:?}", node);
497            // Update root offset if required.
498            if node.parent().is_none() {
499                *self.root.lock().await = node.index();
500                // *root_lock = node.index();
501            }
502            tracing::debug!("Storing dirty node {:?}", node);
503            node.set_clean(true);
504            let s_node = (*node).serialize()?;
505            file_lock.write_data(node.index(), &s_node).await?;
506        }
507        // Update the file header
508        let index = self.index.load(Ordering::SeqCst);
509        file_lock
510            .write_header_with_indices(*self.root.lock().await, index)
511            .await?;
512
513        tracing::debug!("Tree index: {}", self.index.load(Ordering::SeqCst));
514        nodes_lock.clear();
515
516        let result = file_lock.flush().await;
517        if result.is_ok() && remove_wal {
518            let mut wal_path = self.path.clone();
519            wal_path.set_extension("wal");
520            if let Err(e) = std::fs::remove_file(&wal_path) {
521                tracing::error!("Error when removing WAL: {e}");
522            }
523        }
524        result
525    }
526
527    /// Get the value.
528    pub async fn get(&self, key: &K) -> Option<V> {
529        let mut nodes_lock = self.nodes.lock().await;
530        let node = self
531            .search_node_with_lock(&mut nodes_lock, key)
532            .await
533            .ok()?;
534        node.value(key)
535    }
536
537    /// Log basic information about our B+Tree.
538    pub async fn info(&self) {
539        tracing::info!(
540            path = %self.path.display(),
541            branching = self.branch,
542            node_count = self.count().await,
543            "B+Tree"
544        );
545    }
546
547    /// Insert a Key and Value.
548    pub async fn insert(&self, key: K, value: V) -> Result<Option<V>, anyhow::Error> {
549        let cmd = Command::Upsert(key.clone(), value.clone());
550        let s_cmd = cmd.serialize()?;
551        let mut wal_lock = self.wal.lock().await;
552        wal_lock.write_data(&s_cmd).await?;
553        Ok(self.inner_insert(key, value).await)
554    }
555
556    /// Insert a Key and Value.
557    async fn inner_insert(&self, mut key: K, value: V) -> Option<V> {
558        tracing::debug!("INSERTING: {:?}, {:?}", key, value);
559        let mut nodes_lock = self.nodes.lock().await;
560
561        let mut node = self
562            .search_node_with_lock(&mut nodes_lock, &key)
563            .await
564            .ok()?;
565
566        assert!(node.is_leaf());
567
568        let value = node.set_value(&key, value);
569
570        if node.is_full() {
571            // Split the Node
572            let new = node.split();
573            key = node.max_key().clone();
574            let mut new_key = new.max_key().clone();
575            // Insert our new leaf node to the list of nodes
576            let mut new_idx = self.add_node(&mut nodes_lock, new).await;
577            loop {
578                let p_opt = node.parent();
579                match p_opt {
580                    Some(p_idx) => {
581                        // Help the borrow check by ensuring tmp will drop
582                        let tmp_idx = node.index();
583                        // Sync out our node and get ready to loop
584                        self.replace_node(&mut nodes_lock, node);
585                        // Process this parent
586                        node = self
587                            .find_node_as_option_with_lock(&mut nodes_lock, p_idx)
588                            .await?;
589                        node.set_child(&key, tmp_idx);
590                        node.set_child(&new_key, new_idx);
591                        if node.is_full() {
592                            // Now split our node and prepare to add it next
593                            // time around.
594                            let new = node.split();
595                            key = node.max_key().clone();
596                            new_key = new.max_key().clone();
597                            new_idx = self.add_node(&mut nodes_lock, new).await;
598                        } else {
599                            break;
600                        }
601                    }
602                    None => {
603                        let keys = vec![key, new_key];
604                        let children = vec![node.index(), new_idx];
605                        node.set_parent(Some(self.add_root(&mut nodes_lock, children, keys).await));
606                        break;
607                    }
608                }
609            }
610        }
611        // Finally, sync out our node and get ready to loop
612        self.replace_node(&mut nodes_lock, node);
613        value
614    }
615
616    /// Print to stdout all the nodes in the tree.
617    pub async fn print_nodes(&self, direction: Direction) {
618        let callback = |node: &Node<K, V>| {
619            println!("node: {node:?}");
620            ControlFlow::Continue(())
621        };
622        self.traverse_nodes(direction, callback).await
623    }
624
625    /// Traverse entries until stream exhausted or callback returns break.
626    pub async fn traverse_entries(
627        &self,
628        direction: Direction,
629        mut f: impl FnMut((K, V)) -> ControlFlow<()>,
630    ) {
631        let mut streamer = self.entries(direction).await;
632        while let Some(entry) = streamer.next().await {
633            match f(entry) {
634                ControlFlow::Break(_) => break,
635                ControlFlow::Continue(_) => continue,
636            }
637        }
638    }
639
640    /// Traverse keys until stream exhausted or callback returns break.
641    pub async fn traverse_keys(
642        &self,
643        direction: Direction,
644        mut f: impl FnMut(K) -> ControlFlow<()>,
645    ) {
646        let mut streamer = self.keys(direction).await;
647        while let Some(key) = streamer.next().await {
648            match f(key) {
649                ControlFlow::Break(_) => break,
650                ControlFlow::Continue(_) => continue,
651            }
652        }
653    }
654
655    /// Traverse values until stream exhausted or callback returns break.
656    pub async fn traverse_values(
657        &self,
658        direction: Direction,
659        mut f: impl FnMut(V) -> ControlFlow<()>,
660    ) {
661        let mut streamer = self.values(direction).await;
662        while let Some(value) = streamer.next().await {
663            match f(value) {
664                ControlFlow::Break(_) => break,
665                ControlFlow::Continue(_) => continue,
666            }
667        }
668    }
669
670    /// Return leaf node utilization.
671    pub async fn utilization(&self) -> f64 {
672        let used = AtomicUsize::new(0);
673        let total = AtomicUsize::new(0);
674
675        let callback = |node: &Node<K, V>| {
676            used.fetch_add(node.len(), Ordering::SeqCst);
677            total.fetch_add(self.branch as usize, Ordering::SeqCst);
678            ControlFlow::Continue(())
679        };
680        self.traverse_leaf_nodes(Direction::Ascending, callback)
681            .await;
682        used.load(Ordering::SeqCst) as f64 / total.load(Ordering::SeqCst) as f64
683    }
684
685    /// Verify all the nodes in the tree.
686    pub async fn verify(&self, direction: Direction) -> Result<()> {
687        let callback = |node: &Node<K, V>| {
688            let mut seen_keys: Vec<K> = vec![];
689            if node.is_leaf() {
690                for key in node.keys() {
691                    assert!(!seen_keys.contains(key));
692                }
693                seen_keys.extend(node.keys().cloned());
694            } else {
695                futures::executor::block_on(async {
696                    let mut nodes_lock = self.nodes.lock().await;
697                    for child in node.children() {
698                        let child = match self.find_node_with_lock(&mut nodes_lock, child).await {
699                            Ok(c) => c,
700                            Err(e) => {
701                                tracing::error!("could not find node: {e}");
702                                return ControlFlow::Break(());
703                            }
704                        };
705                        assert_eq!(Some(node.index()), child.parent());
706                    }
707                    ControlFlow::Continue(())
708                });
709            }
710            node.verify_keys();
711            ControlFlow::Continue(())
712        };
713        self.traverse_nodes(direction, callback).await;
714        Ok(())
715    }
716
717    /// Return last key.
718    #[allow(dead_code)]
719    async fn last_key(&self) -> Option<K> {
720        self.last_leaf().await.keys().last().cloned()
721    }
722
723    /// Return first key.
724    #[allow(dead_code)]
725    async fn first_key(&self) -> Option<K> {
726        self.first_leaf().await.keys().next().cloned()
727    }
728
729    /// Traverse all nodes in a tree using the callback.
730    async fn traverse_nodes(
731        &self,
732        direction: Direction,
733        mut f: impl FnMut(&Node<K, V>) -> ControlFlow<()>,
734    ) {
735        let mut streamer = self.stream_all_nodes(direction).await;
736        while let Some(leaf) = streamer.next().await {
737            match f(&leaf) {
738                ControlFlow::Break(_) => break,
739                ControlFlow::Continue(_) => continue,
740            }
741        }
742    }
743
744    /// Traverse all leaf nodes in a tree using the callback.
745    async fn traverse_leaf_nodes(
746        &self,
747        direction: Direction,
748        mut f: impl FnMut(&Node<K, V>) -> ControlFlow<()>,
749    ) {
750        let mut streamer = self.stream_all_leaf_nodes(direction).await;
751        while let Some(leaf) = streamer.next().await {
752            match f(&leaf) {
753                ControlFlow::Break(_) => break,
754                ControlFlow::Continue(_) => continue,
755            }
756        }
757    }
758
759    async fn add_node(
760        &self,
761        nodes_lock: &mut MutexGuard<'_, HashMap<usize, Node<K, V>, BuildIdentityHasher>>,
762        mut node: Node<K, V>,
763    ) -> usize {
764        let idx = self.index.fetch_add(1, Ordering::SeqCst);
765        node.set_index(idx);
766        if let Node::Internal(data) = &node {
767            for c_idx in data.children() {
768                self.update_node(nodes_lock, c_idx, |node: &mut Node<K, V>| {
769                    node.set_parent(Some(idx));
770                    None
771                })
772                .await;
773            }
774        }
775        nodes_lock.insert(idx, node);
776        idx
777    }
778
779    fn replace_node(
780        &self,
781        nodes_lock: &mut MutexGuard<'_, HashMap<usize, Node<K, V>, BuildIdentityHasher>>,
782        mut node: Node<K, V>,
783    ) -> Option<Node<K, V>> {
784        node.set_clean(false);
785        nodes_lock.insert(node.index(), node)
786    }
787
788    async fn update_node(
789        &self,
790        nodes_lock: &mut MutexGuard<'_, HashMap<usize, Node<K, V>, BuildIdentityHasher>>,
791        idx: usize,
792        f: impl FnOnce(&mut Node<K, V>) -> Option<V>,
793    ) -> Option<V> {
794        // Add the node to our cache if it isn't already there
795        if nodes_lock.get(&idx).is_none() {
796            let node = self.read_node(idx).await.ok()?;
797            nodes_lock.insert(idx, node);
798        }
799        let node = nodes_lock.get_mut(&idx).unwrap();
800        tracing::debug!("Updating node: {:?}", node);
801        // Always mark an updated node as not clean
802        node.set_clean(false);
803        f(node)
804    }
805
806    async fn add_root<'a>(
807        &self,
808        nodes_lock: &mut MutexGuard<'a, HashMap<usize, Node<K, V>, BuildIdentityHasher>>,
809        children: Vec<usize>,
810        keys: Vec<K>,
811    ) -> usize {
812        tracing::debug!(
813            "Adding a new root: children: {:?}, keys: {:?}",
814            children,
815            keys
816        );
817        let root: Node<K, V> = Node::internal(self.branch, None, keys, children.clone());
818        let root_idx = self.add_node(nodes_lock, root).await;
819        let closure = |node: &mut Node<K, V>| {
820            node.set_parent(Some(root_idx));
821            None
822        };
823        self.update_node(nodes_lock, children[0], closure).await;
824        self.update_node(nodes_lock, children[1], closure).await;
825        let mut root_lock = self.root.lock().await;
826        *root_lock = root_idx;
827        root_idx
828    }
829
830    /// Search our tree from the root for
831    ///  - a leaf node to which our key will be added
832    ///
833    /// This will return the last node in the tree if an earlier node doesn't match first.
834    #[inline]
835    async fn search_node_with_lock(
836        &self,
837        nodes_lock: &'_ mut MutexGuard<'_, HashMap<usize, Node<K, V>, BuildIdentityHasher>>,
838        key: &K,
839    ) -> Result<Node<K, V>> {
840        let mut target_node = self
841            .find_node_with_lock(nodes_lock, *self.root.lock().await)
842            .await?;
843        loop {
844            tracing::debug!("TARGET NODE: {:?}", target_node);
845            if target_node.is_leaf() {
846                return Ok(target_node.clone());
847            }
848            let t_idx = target_node
849                .child(key)
850                .ok_or(BaildonError::LostChild(target_node.index()))?;
851            target_node = self.find_node_with_lock(nodes_lock, t_idx).await?;
852        }
853    }
854
855    /// Find a node from cache (or disk).
856    pub(crate) async fn find_node_as_option_with_lock(
857        &self,
858        nodes_lock: &'_ mut MutexGuard<'_, HashMap<usize, Node<K, V>, BuildIdentityHasher>>,
859        idx: usize,
860    ) -> Option<Node<K, V>> {
861        match self.find_node_with_lock(nodes_lock, idx).await {
862            Ok(n) => Some(n),
863            Err(e) => {
864                tracing::error!("could not find node with index {idx}: {e}");
865                None
866            }
867        }
868    }
869
870    /// Find a node from cache (or disk).
871    pub(crate) async fn find_node_with_lock(
872        &self,
873        nodes_lock: &'_ mut MutexGuard<'_, HashMap<usize, Node<K, V>, BuildIdentityHasher>>,
874        idx: usize,
875    ) -> Result<Node<K, V>> {
876        let child = match nodes_lock.get(&idx) {
877            Some(c) => c.clone(),
878            None => {
879                let node = self.read_node(idx).await?;
880                nodes_lock.insert(idx, node.clone());
881                node
882            }
883        };
884        Ok(child)
885    }
886
887    /// Read a node from disk.
888    async fn read_node(&self, idx: usize) -> Result<Node<K, V>> {
889        let mut file_lock = self.file.lock().await;
890        let buf = file_lock.read_data(idx).await?;
891        Node::<K, V>::deserialize(&buf)
892    }
893
894    pub(crate) async fn first_leaf(&self) -> Node<K, V> {
895        let mut nodes_lock = self.nodes.lock().await;
896        let root_lock = self.root.lock().await;
897        let mut node = self
898            .find_node_with_lock(&mut nodes_lock, *root_lock)
899            .await
900            .expect("FLUFFED NODE");
901        loop {
902            if node.is_leaf() {
903                return node;
904            }
905            let idx = node.first_child();
906            node = self
907                .find_node_with_lock(&mut nodes_lock, idx)
908                .await
909                .expect("FLUFFED NODE");
910        }
911    }
912
913    pub(crate) async fn last_leaf(&self) -> Node<K, V> {
914        let mut nodes_lock = self.nodes.lock().await;
915        let root_lock = self.root.lock().await;
916        let mut node = self
917            .find_node_with_lock(&mut nodes_lock, *root_lock)
918            .await
919            .expect("FLUFFED NODE");
920        loop {
921            if node.is_leaf() {
922                return node;
923            }
924            let idx = node.last_child();
925            node = self
926                .find_node_with_lock(&mut nodes_lock, idx)
927                .await
928                .expect("FLUFFED NODE");
929        }
930    }
931
932    pub(crate) async fn neighbour(&self, idx: usize, direction: Direction) -> Option<Node<K, V>> {
933        let mut nodes_lock = self.nodes.lock().await;
934        self.neighbour_with_lock(&mut nodes_lock, idx, direction)
935            .await
936    }
937
938    async fn neighbour_same_parent_with_lock(
939        &self,
940        nodes_lock: &'_ mut MutexGuard<'_, HashMap<usize, Node<K, V>, BuildIdentityHasher>>,
941        idx: usize,
942        direction: Direction,
943    ) -> Option<Node<K, V>> {
944        let node = self.find_node_as_option_with_lock(nodes_lock, idx).await?;
945        match node.parent() {
946            Some(p_idx) => {
947                let parent = self
948                    .find_node_as_option_with_lock(nodes_lock, p_idx)
949                    .await?;
950
951                // Can't have a neighbour if we have fewer than 2 children
952                if parent.len() < 2 {
953                    return None;
954                }
955
956                // Find my position
957                let my_pos = parent.children().position(|x| x == idx)?;
958
959                let neighbour_idx = if direction == Direction::Ascending {
960                    // If last child, can't have a ascending neighbour
961                    if my_pos == parent.len() - 1 {
962                        return None;
963                    }
964
965                    // We know we can't get an error here
966                    parent.children().nth(my_pos + 1)?
967                } else {
968                    // If first child, can't have a descending neighbour
969                    if my_pos == 0 {
970                        return None;
971                    }
972                    // We know we can't get an error here
973                    parent.children().nth(my_pos - 1)?
974                };
975                self.find_node_as_option_with_lock(nodes_lock, neighbour_idx)
976                    .await
977            }
978            None => None,
979        }
980    }
981
982    async fn neighbour_with_lock(
983        &self,
984        nodes_lock: &'_ mut MutexGuard<'_, HashMap<usize, Node<K, V>, BuildIdentityHasher>>,
985        mut idx: usize,
986        direction: Direction,
987    ) -> Option<Node<K, V>> {
988        let mut node = self.find_node_as_option_with_lock(nodes_lock, idx).await?;
989
990        let original_node = node.clone();
991
992        // The logic for finding a neighbour is:
993        // If we don't have a parent, we can't have a neighbour
994        // If we do have a parent, check if we are the last child:
995        //  - if we aren't - return the next child and then return
996        //    this if it's a leaf or keep checking the first child until
997        //    it is a leaf
998        //  - if we are - keep looping up the tree and checking if we are
999        //    the last child until either we aren't or we reach the top of
1000        //    the tree. If we aren't, do as first clause.
1001
1002        loop {
1003            match node.parent() {
1004                Some(p_idx) => {
1005                    // Process this parent
1006                    node = self
1007                        .find_node_as_option_with_lock(nodes_lock, p_idx)
1008                        .await?;
1009                    // Not a direct descendent?
1010                    if !node.children().any(|x| x == idx) {
1011                        idx = p_idx;
1012                        continue;
1013                    }
1014                    if direction == Direction::Ascending {
1015                        // Is it the last child?
1016                        if node.last_child() == idx {
1017                            idx = p_idx;
1018                            continue;
1019                        }
1020                        // Get the next index
1021                        let c_pos = node.children().position(|x| x == idx)? + 1;
1022
1023                        // Now keep looping down the children until we find a leaf node from c_pos
1024                        idx = node.children().nth(c_pos)?;
1025                        loop {
1026                            node = self.find_node_as_option_with_lock(nodes_lock, idx).await?;
1027                            if original_node.is_leaf() == node.is_leaf() {
1028                                break;
1029                            }
1030                            idx = node.children().next().unwrap();
1031                        }
1032                    } else {
1033                        // Is it the first child?
1034                        if node.first_child() == idx {
1035                            idx = p_idx;
1036                            continue;
1037                        }
1038                        // Get the previous index
1039                        let c_pos = node.children().position(|x| x == idx)? - 1;
1040
1041                        // Now keep looping down the children until we find a node from c_pos
1042                        idx = node.children().nth(c_pos)?;
1043                        loop {
1044                            node = self.find_node_as_option_with_lock(nodes_lock, idx).await?;
1045                            if original_node.is_leaf() == node.is_leaf() {
1046                                break;
1047                            }
1048                            idx = node.children().last().unwrap();
1049                        }
1050                    }
1051                    return Some(node);
1052                }
1053                None => return None,
1054            }
1055        }
1056    }
1057}
1058
1059impl<K, V> Baildon<K, V>
1060where
1061    K: BaildonKey + Send + Sync + Display,
1062    V: BaildonValue + Send + Sync + Display,
1063{
1064    /// Print to stdout all the keys and values in the tree.
1065    pub async fn print_entries(&self, direction: Direction) {
1066        let mut sep = "";
1067        let callback = |(key, value)| {
1068            print!("{sep}{key}:{value}");
1069            sep = ", ";
1070            ControlFlow::Continue(())
1071        };
1072        self.traverse_entries(direction, callback).await;
1073        println!();
1074    }
1075
1076    /// Print to stdout all the keys in the tree.
1077    pub async fn print_keys(&self, direction: Direction) {
1078        let mut sep = "";
1079        let callback = |key| {
1080            print!("{sep}{key}");
1081            sep = ", ";
1082            ControlFlow::Continue(())
1083        };
1084        self.traverse_keys(direction, callback).await;
1085        println!();
1086    }
1087
1088    /// Print to stdout all the values in the tree.
1089    pub async fn print_values(&self, direction: Direction) {
1090        let mut sep = "";
1091        let callback = |value| {
1092            print!("{sep}{value}");
1093            sep = ", ";
1094            ControlFlow::Continue(())
1095        };
1096        self.traverse_values(direction, callback).await;
1097        println!();
1098    }
1099}
1100
1101impl<K, V> Drop for Baildon<K, V>
1102where
1103    K: BaildonKey + Send + Sync,
1104    V: BaildonValue + Send + Sync,
1105{
1106    fn drop(&mut self) {
1107        std::thread::scope(|s| {
1108            let hdl = s.spawn(|| {
1109                let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
1110                if let Err(e) = runtime.block_on(self.flush_to_disk()) {
1111                    tracing::warn!("could not flush data file to disk: {}", e);
1112                }
1113            });
1114            hdl.join().expect("thread finished");
1115        });
1116    }
1117}
1118
1119#[cfg(test)]
1120mod tests;