1use std::collections::HashMap;
7use std::fmt::Display;
8use std::io::ErrorKind;
9use std::ops::ControlFlow;
10use std::path::{Path, PathBuf};
11use std::sync::atomic::Ordering;
12use std::sync::atomic::{AtomicBool, AtomicUsize};
13
14use anyhow::Result;
15use futures::StreamExt;
16use serde::de::DeserializeOwned;
17use serde::Serialize;
18use strum::EnumString;
19use thiserror::Error;
20use tokio::io;
21use tokio::sync::{Mutex, MutexGuard};
22
23use super::node::Node;
24use super::sparse::BuildIdentityHasher;
25use crate::command::Command;
26use crate::io::file::BTreeFile;
27use crate::io::wal::WalFile;
28
/// Direction in which nodes, keys, values or entries are traversed.
///
/// The `strum` attribute makes string parsing of the variant names ASCII case-insensitive.
#[derive(Clone, Copy, Debug, EnumString, PartialEq)]
#[strum(ascii_case_insensitive)]
pub enum Direction {
    /// Traverse forwards (towards later child positions).
    Ascending,
    /// Traverse backwards (towards earlier child positions).
    Descending,
}
38
/// Size handed to `BTreeFile::try_new` and `BTreeFile::reset` when the backing file is created
/// or cleared. NOTE(review): presumably an initial size in bytes — confirm against `BTreeFile`.
const BAILDON_FILE_SIZE: u64 = 512_000;
40
/// Bounds required of a key type: cloneable, totally ordered, (de)serializable and debuggable.
pub trait BaildonKey: Clone + Ord + Serialize + DeserializeOwned + std::fmt::Debug {}

/// Blanket implementation: every type meeting the bounds is automatically a `BaildonKey`.
impl<K> BaildonKey for K
where
    K: Clone + Ord + Serialize + DeserializeOwned + std::fmt::Debug,
{
}
52
/// Bounds required of a value type: cloneable, (de)serializable and debuggable.
pub trait BaildonValue: Clone + Serialize + DeserializeOwned + std::fmt::Debug {}

/// Blanket implementation: every type meeting the bounds is automatically a `BaildonValue`.
impl<V> BaildonValue for V
where
    V: Clone + Serialize + DeserializeOwned + std::fmt::Debug,
{
}
64
/// Errors raised by the tree itself (as opposed to errors bubbling up from file or WAL I/O).
#[derive(Error, Debug)]
pub enum BaildonError {
    /// The requested branching factor is below the supported minimum of 2.
    #[error("branch: {0} must be >=2")]
    BranchTooSmall(u64),

    /// An internal node (identified by its index) had no child for a searched key.
    #[error("could not find child for node with index: {0}")]
    LostChild(usize),

    /// A node (identified by its index) was expected to have a parent but did not.
    #[error("could not find parent for node with index: {0}")]
    LostParent(usize),
}
80
/// A B+Tree persisted in a `BTreeFile`, with an in-memory node map and a write-ahead log.
pub struct Baildon<K, V>
where
    K: BaildonKey + Send + Sync,
    V: BaildonValue + Send + Sync,
{
    /// Backing data file that nodes are read from and written to.
    file: Mutex<BTreeFile>,
    /// Path of the data file; the WAL path is derived from it by swapping the extension to `wal`.
    path: PathBuf,
    /// Index of the current root node.
    root: Mutex<usize>,
    /// Nodes currently held in memory, keyed by node index. Missing nodes are read from `file`
    /// on demand and the map is emptied when the tree is flushed to disk.
    pub(crate) nodes: Mutex<HashMap<usize, Node<K, V>, BuildIdentityHasher>>,
    /// Branching factor used when creating nodes.
    branch: u64,
    /// Next node index to hand out (incremented each time a node is added).
    pub(crate) index: AtomicUsize,
    /// Write-ahead log that records inserts and deletes before they are applied.
    wal: Mutex<WalFile>,
}
96
97impl<K, V> Baildon<K, V>
98where
99 K: BaildonKey + Send + Sync,
100 V: BaildonValue + Send + Sync,
101{
102 pub async fn try_new<P: AsRef<Path>>(origin: P, branch: u64) -> Result<Self> {
104 if branch < 2 {
105 return Err(BaildonError::BranchTooSmall(branch).into());
106 }
107 let path: &Path = origin.as_ref();
108
109 tracing::info!("Creating B+Tree at: {}", path.display());
110
111 let mut file = BTreeFile::try_new(path, BAILDON_FILE_SIZE).await?;
112
113 let root = Node::<K, V>::root(branch);
114
115 let s_root = root.serialize()?;
116
117 file.write_data(1, &s_root).await?;
118
119 let mut nodes: HashMap<_, _, BuildIdentityHasher> = HashMap::default();
120 nodes.insert(1, root);
121
122 let mut wal_path = PathBuf::new();
125 wal_path.push(origin.as_ref());
126 wal_path.set_extension("wal");
127 let wal = WalFile::try_new(&wal_path).await?;
128
129 let this = Self {
130 file: Mutex::new(file),
131 path: path.into(),
132 root: Mutex::new(1),
133 nodes: Mutex::new(nodes),
134 branch,
135 index: AtomicUsize::new(2),
136 wal: Mutex::new(wal),
137 };
138 this.inner_flush_to_disk(false).await?;
139 Ok(this)
140 }
141
142 pub async fn try_open<P: AsRef<Path>>(origin: P) -> Result<Self> {
144 let path: &Path = origin.as_ref();
145
146 tracing::info!("Opening B+Tree at: {}", path.display());
147
148 let mut file = BTreeFile::try_open(path).await?;
149
150 let index = AtomicUsize::new(file.get_tree_index().await);
151
152 let buf = file.read_data(file.get_root_index().await).await?;
153 let root: Node<K, V> = Node::<K, V>::deserialize(&buf)?;
154 let branch = root.branch();
155
156 let mut nodes: HashMap<_, _, BuildIdentityHasher> = HashMap::default();
157 let idx = root.index();
158 nodes.insert(root.index(), root);
159
160 let mut wal_path = PathBuf::new();
163 wal_path.push(origin.as_ref());
164 wal_path.set_extension("wal");
165 let mut recover = false;
166 let wal = match WalFile::try_open(&wal_path).await {
167 Ok(wal) => {
168 recover = true;
169 wal
170 }
171 Err(err) => {
172 if let Some(io_error) = err.downcast_ref::<std::io::Error>() {
174 if io_error.kind() != ErrorKind::NotFound {
175 return Err(err);
176 }
177 } else {
178 return Err(err);
179 }
180 WalFile::try_new(&wal_path).await?
181 }
182 };
183
184 let this = Self {
185 file: Mutex::new(file),
186 path: path.into(),
187 root: Mutex::new(idx),
188 nodes: Mutex::new(nodes),
189 branch,
190 index,
191 wal: Mutex::new(wal),
192 };
193
194 if recover {
195 let mut wal = this.wal.lock().await;
196
197 tracing::info!("Recovering from wal...");
199 loop {
200 match wal.read_data().await {
201 Ok(data) => {
202 let cmd: Command<K, V> = Command::deserialize(&data)?;
203 match cmd {
204 Command::Upsert(key, value) => {
205 let _ = this.inner_insert(key, value).await;
208 }
209 Command::Delete(key) => {
210 let _ = this.inner_delete(&key).await;
213 }
214 }
215 }
216 Err(e) => {
217 if let Some(down_e) = e.downcast_ref::<io::Error>() {
219 if down_e.kind() == io::ErrorKind::UnexpectedEof {
220 std::fs::remove_file(&wal_path)?;
221 *wal = WalFile::try_new(&wal_path).await?;
222 break;
223 }
224 }
225 tracing::info!("Recovering failed, data read error: {e:?}");
226 return Err(e);
227 }
228 }
229 }
230 tracing::info!("Recovered!");
231 }
232 Ok(this)
233 }
234
235 pub async fn clear(&self) -> Result<()> {
237 let mut file_lock = self.file.lock().await;
238 file_lock.reset(BAILDON_FILE_SIZE).await?;
239
240 let mut nodes_lock = self.nodes.lock().await;
242 nodes_lock.clear();
243 self.index.store(1, Ordering::SeqCst);
244 let root = Node::<K, V>::root(self.branch);
245 self.add_node(&mut nodes_lock, root).await;
246 let mut root_lock = self.root.lock().await;
247 *root_lock = 1;
248 Ok(())
249 }
250
251 pub async fn contains(&self, key: &K) -> bool {
253 let mut nodes_lock = self.nodes.lock().await;
254 let node = match self.search_node_with_lock(&mut nodes_lock, key).await {
255 Ok(v) => v,
256 Err(_) => return false,
257 };
258 node.key_index(key).is_some()
259 }
260
261 pub async fn count(&self) -> usize {
263 let count = AtomicUsize::new(0);
264 let callback = |node: &Node<K, V>| {
265 count.fetch_add(node.len(), Ordering::SeqCst);
266 ControlFlow::Continue(())
267 };
268 self.traverse_leaf_nodes(Direction::Ascending, callback)
269 .await;
270 count.load(Ordering::SeqCst)
271 }
272
273 pub async fn delete(&self, key: &K) -> Result<Option<V>, anyhow::Error> {
275 let cmd: Command<K, V> = Command::Delete(key.clone());
276 let s_cmd = cmd.serialize()?;
277 let mut wal_lock = self.wal.lock().await;
278 wal_lock.write_data(&s_cmd).await?;
279 self.inner_delete(key).await
280 }
281
    /// Remove `key` from the tree without touching the WAL, rebalancing as required.
    ///
    /// Returns `Ok(None)` when the key is not in its leaf, otherwise the result of
    /// `Node::remove_value`. After removal the function walks from the leaf towards the root:
    /// while the current node reports `is_minimum()`, it either takes one pair from a sibling
    /// under the same parent (when that sibling is not at minimum) or merges the sibling into the
    /// current node, then repeats the check on the parent.
    ///
    /// # Errors
    ///
    /// Fails if the leaf cannot be located, if a node that must have a parent has none
    /// ([`BaildonError::LostParent`]), if the parent cannot be loaded, or if freeing file space
    /// for a merged node fails.
    async fn inner_delete(&self, key: &K) -> Result<Option<V>> {
        let mut nodes_lock = self.nodes.lock().await;

        let mut node = self.search_node_with_lock(&mut nodes_lock, key).await?;

        // Nothing to do if the leaf does not hold the key.
        if node.key_index(key).is_none() {
            return Ok(None);
        }

        let value = node.remove_value(key);

        // Rebalance upwards for as long as the current node is at its minimum.
        loop {
            if !node.is_minimum() {
                break;
            }
            // Prefer the ascending sibling; fall back to the descending one.
            let (neighbour_opt, direction) = match self
                .neighbour_same_parent_with_lock(
                    &mut nodes_lock,
                    node.index(),
                    Direction::Ascending,
                )
                .await
            {
                Some(n) => (Some(n), Direction::Ascending),
                None => (
                    self.neighbour_same_parent_with_lock(
                        &mut nodes_lock,
                        node.index(),
                        Direction::Descending,
                    )
                    .await,
                    Direction::Descending,
                ),
            };
            match neighbour_opt {
                Some(mut neighbour) => {
                    if !neighbour.is_minimum() {
                        // The sibling can spare a pair: move one across instead of merging.
                        let p_idx = node
                            .parent()
                            .ok_or(BaildonError::LostParent(node.index()))?;
                        assert_eq!(neighbour.parent(), node.parent());
                        match &mut neighbour {
                            Node::Internal(data) => {
                                // Take the sibling's first pair when it is the ascending sibling,
                                // its last pair otherwise. `tgt_idx` is the child whose key in the
                                // parent is rewritten with the moved key.
                                let (tgt_idx, (k, child)) = if direction == Direction::Ascending {
                                    (node.index(), (data.remove_pair(0)))
                                } else {
                                    (data.index(), (data.remove_pair(data.len() - 1)))
                                };

                                node.set_child(&k, child);

                                // The moved child now belongs to `node`.
                                let closure = |child: &mut Node<K, V>| {
                                    child.set_parent(Some(node.index()));
                                    None
                                };
                                self.update_node(&mut nodes_lock, child, closure).await;

                                self.update_node(
                                    &mut nodes_lock,
                                    p_idx,
                                    |parent: &mut Node<K, V>| {
                                        parent.update_child_key(tgt_idx, k);
                                        None
                                    },
                                )
                                .await;
                            }
                            Node::Leaf(data) => {
                                // Same selection rule as the internal case, moving a key/value pair.
                                let (tgt_idx, (k, value)) = if direction == Direction::Ascending {
                                    (node.index(), (data.remove_pair(0)))
                                } else {
                                    (data.index(), (data.remove_pair(data.len() - 1)))
                                };

                                node.set_value(&k, value);

                                self.update_node(
                                    &mut nodes_lock,
                                    p_idx,
                                    |parent: &mut Node<K, V>| {
                                        parent.update_child_key(tgt_idx, k);
                                        None
                                    },
                                )
                                .await;
                            }
                        }
                        // Store the modified sibling back (this also marks it dirty).
                        self.replace_node(&mut nodes_lock, neighbour);
                    } else {
                        // The sibling is at minimum too: merge it into `node`.
                        assert_ne!(neighbour.index(), node.index());
                        assert_eq!(neighbour.parent(), node.parent());
                        if let Node::Internal(data) = &neighbour {
                            // The sibling's children are about to be owned by `node`.
                            let p_idx = Some(node.index());
                            let closure_update_parent = move |child: &mut Node<K, V>| {
                                child.set_parent(p_idx);
                                None
                            };
                            for child in data.children() {
                                let _ = self
                                    .update_node(&mut nodes_lock, child, closure_update_parent)
                                    .await;
                            }
                        }
                        let neighbour_idx = neighbour.index();
                        let neighbour_max_key = neighbour.max_key().clone();
                        node.merge(neighbour);
                        // Set by the closure below when the parent is a root left with one child.
                        let update_root = AtomicBool::new(false);
                        let closure_cleanup_parent = |parent: &mut Node<K, V>| {
                            if direction == Direction::Ascending {
                                // After absorbing the ascending sibling, `node` is keyed in the
                                // parent by the sibling's former maximum key.
                                let _idx = parent.remove_child(neighbour_idx)?;
                                parent.update_child_key(node.index(), neighbour_max_key);
                            } else {
                                let _idx = parent.remove_child(neighbour_idx)?;
                            }

                            if parent.len() == 1 && parent.parent().is_none() {
                                assert_eq!(parent.len(), 1);
                                assert_eq!(
                                    parent.children().next().expect("HAS A 0"),
                                    node.index()
                                );
                                update_root.store(true, Ordering::SeqCst);
                            }
                            None
                        };
                        let p_idx = node
                            .parent()
                            .ok_or(BaildonError::LostParent(node.index()))?;
                        let _ = self
                            .update_node(&mut nodes_lock, p_idx, closure_cleanup_parent)
                            .await;
                        // The merged sibling no longer exists: drop it from memory and from the file.
                        nodes_lock.remove(&neighbour_idx);
                        let mut file_lock = self.file.lock().await;
                        file_lock.free_data(neighbour_idx)?;

                        if update_root.load(Ordering::SeqCst) {
                            // The old root has a single child left; that child becomes the root
                            // and the old root is discarded.
                            let mut root_lock = self.root.lock().await;
                            *root_lock = node.index();
                            node.set_parent(None);
                            nodes_lock.remove(&p_idx);
                            file_lock.free_data(p_idx)?;
                            break;
                        }
                    }
                    // Save this node and continue the minimum check one level up.
                    let node_parent = node
                        .parent()
                        .ok_or(BaildonError::LostParent(node.index()))?;
                    self.replace_node(&mut nodes_lock, node);
                    node = self
                        .find_node_with_lock(&mut nodes_lock, node_parent)
                        .await?;
                }
                // No sibling under the same parent in either direction: nothing to rebalance with.
                None => break,
            }
        }
        self.replace_node(&mut nodes_lock, node);
        Ok(value)
    }
484
485 pub async fn flush_to_disk(&self) -> Result<()> {
487 self.inner_flush_to_disk(true).await
488 }
489
490 async fn inner_flush_to_disk(&self, remove_wal: bool) -> Result<()> {
491 let mut nodes_lock = self.nodes.lock().await;
492 let mut file_lock = self.file.lock().await;
493
494 tracing::debug!("About to examine {} nodes", nodes_lock.len());
495 for node in nodes_lock.values_mut().filter(|n| !n.clean()) {
496 tracing::debug!("Storing node: {:?}", node);
497 if node.parent().is_none() {
499 *self.root.lock().await = node.index();
500 }
502 tracing::debug!("Storing dirty node {:?}", node);
503 node.set_clean(true);
504 let s_node = (*node).serialize()?;
505 file_lock.write_data(node.index(), &s_node).await?;
506 }
507 let index = self.index.load(Ordering::SeqCst);
509 file_lock
510 .write_header_with_indices(*self.root.lock().await, index)
511 .await?;
512
513 tracing::debug!("Tree index: {}", self.index.load(Ordering::SeqCst));
514 nodes_lock.clear();
515
516 let result = file_lock.flush().await;
517 if result.is_ok() && remove_wal {
518 let mut wal_path = self.path.clone();
519 wal_path.set_extension("wal");
520 if let Err(e) = std::fs::remove_file(&wal_path) {
521 tracing::error!("Error when removing WAL: {e}");
522 }
523 }
524 result
525 }
526
527 pub async fn get(&self, key: &K) -> Option<V> {
529 let mut nodes_lock = self.nodes.lock().await;
530 let node = self
531 .search_node_with_lock(&mut nodes_lock, key)
532 .await
533 .ok()?;
534 node.value(key)
535 }
536
    /// Log (at info level) the tree's path, branching factor and size.
    ///
    /// NOTE(review): the `node_count` field is populated from `count()`, which sums leaf
    /// lengths, so it appears to report the number of entries rather than nodes — confirm.
    pub async fn info(&self) {
        tracing::info!(
            path = %self.path.display(),
            branching = self.branch,
            node_count = self.count().await,
            "B+Tree"
        );
    }
546
547 pub async fn insert(&self, key: K, value: V) -> Result<Option<V>, anyhow::Error> {
549 let cmd = Command::Upsert(key.clone(), value.clone());
550 let s_cmd = cmd.serialize()?;
551 let mut wal_lock = self.wal.lock().await;
552 wal_lock.write_data(&s_cmd).await?;
553 Ok(self.inner_insert(key, value).await)
554 }
555
    /// Insert `key`/`value` into the tree without touching the WAL, splitting nodes as required.
    ///
    /// Returns the result of `Node::set_value` on the target leaf. Returns `None` early if the
    /// leaf cannot be located or if a parent cannot be loaded while propagating a split.
    ///
    /// # Panics
    ///
    /// Panics if the node found for `key` is not a leaf.
    async fn inner_insert(&self, mut key: K, value: V) -> Option<V> {
        tracing::debug!("INSERTING: {:?}, {:?}", key, value);
        let mut nodes_lock = self.nodes.lock().await;

        let mut node = self
            .search_node_with_lock(&mut nodes_lock, &key)
            .await
            .ok()?;

        assert!(node.is_leaf());

        let value = node.set_value(&key, value);

        if node.is_full() {
            // Split the leaf. From here on `key` is the max key of the existing half and
            // `new_key`/`new_idx` describe the newly created half.
            let new = node.split();
            key = node.max_key().clone();
            let mut new_key = new.max_key().clone();
            let mut new_idx = self.add_node(&mut nodes_lock, new).await;
            // Propagate the split upwards until a parent has room or a new root is created.
            loop {
                let p_opt = node.parent();
                match p_opt {
                    Some(p_idx) => {
                        let tmp_idx = node.index();
                        // Store the current node before moving on to its parent.
                        self.replace_node(&mut nodes_lock, node);
                        node = self
                            .find_node_as_option_with_lock(&mut nodes_lock, p_idx)
                            .await?;
                        // Register both halves in the parent under their max keys.
                        node.set_child(&key, tmp_idx);
                        node.set_child(&new_key, new_idx);
                        if node.is_full() {
                            // The parent overflowed as well: split it and keep climbing.
                            let new = node.split();
                            key = node.max_key().clone();
                            new_key = new.max_key().clone();
                            new_idx = self.add_node(&mut nodes_lock, new).await;
                        } else {
                            break;
                        }
                    }
                    None => {
                        // The root itself was split: create a new root above both halves.
                        let keys = vec![key, new_key];
                        let children = vec![node.index(), new_idx];
                        node.set_parent(Some(self.add_root(&mut nodes_lock, children, keys).await));
                        break;
                    }
                }
            }
        }
        self.replace_node(&mut nodes_lock, node);
        value
    }
615
616 pub async fn print_nodes(&self, direction: Direction) {
618 let callback = |node: &Node<K, V>| {
619 println!("node: {node:?}");
620 ControlFlow::Continue(())
621 };
622 self.traverse_nodes(direction, callback).await
623 }
624
625 pub async fn traverse_entries(
627 &self,
628 direction: Direction,
629 mut f: impl FnMut((K, V)) -> ControlFlow<()>,
630 ) {
631 let mut streamer = self.entries(direction).await;
632 while let Some(entry) = streamer.next().await {
633 match f(entry) {
634 ControlFlow::Break(_) => break,
635 ControlFlow::Continue(_) => continue,
636 }
637 }
638 }
639
640 pub async fn traverse_keys(
642 &self,
643 direction: Direction,
644 mut f: impl FnMut(K) -> ControlFlow<()>,
645 ) {
646 let mut streamer = self.keys(direction).await;
647 while let Some(key) = streamer.next().await {
648 match f(key) {
649 ControlFlow::Break(_) => break,
650 ControlFlow::Continue(_) => continue,
651 }
652 }
653 }
654
655 pub async fn traverse_values(
657 &self,
658 direction: Direction,
659 mut f: impl FnMut(V) -> ControlFlow<()>,
660 ) {
661 let mut streamer = self.values(direction).await;
662 while let Some(value) = streamer.next().await {
663 match f(value) {
664 ControlFlow::Break(_) => break,
665 ControlFlow::Continue(_) => continue,
666 }
667 }
668 }
669
670 pub async fn utilization(&self) -> f64 {
672 let used = AtomicUsize::new(0);
673 let total = AtomicUsize::new(0);
674
675 let callback = |node: &Node<K, V>| {
676 used.fetch_add(node.len(), Ordering::SeqCst);
677 total.fetch_add(self.branch as usize, Ordering::SeqCst);
678 ControlFlow::Continue(())
679 };
680 self.traverse_leaf_nodes(Direction::Ascending, callback)
681 .await;
682 used.load(Ordering::SeqCst) as f64 / total.load(Ordering::SeqCst) as f64
683 }
684
685 pub async fn verify(&self, direction: Direction) -> Result<()> {
687 let callback = |node: &Node<K, V>| {
688 let mut seen_keys: Vec<K> = vec![];
689 if node.is_leaf() {
690 for key in node.keys() {
691 assert!(!seen_keys.contains(key));
692 }
693 seen_keys.extend(node.keys().cloned());
694 } else {
695 futures::executor::block_on(async {
696 let mut nodes_lock = self.nodes.lock().await;
697 for child in node.children() {
698 let child = match self.find_node_with_lock(&mut nodes_lock, child).await {
699 Ok(c) => c,
700 Err(e) => {
701 tracing::error!("could not find node: {e}");
702 return ControlFlow::Break(());
703 }
704 };
705 assert_eq!(Some(node.index()), child.parent());
706 }
707 ControlFlow::Continue(())
708 });
709 }
710 node.verify_keys();
711 ControlFlow::Continue(())
712 };
713 self.traverse_nodes(direction, callback).await;
714 Ok(())
715 }
716
717 #[allow(dead_code)]
719 async fn last_key(&self) -> Option<K> {
720 self.last_leaf().await.keys().last().cloned()
721 }
722
723 #[allow(dead_code)]
725 async fn first_key(&self) -> Option<K> {
726 self.first_leaf().await.keys().next().cloned()
727 }
728
729 async fn traverse_nodes(
731 &self,
732 direction: Direction,
733 mut f: impl FnMut(&Node<K, V>) -> ControlFlow<()>,
734 ) {
735 let mut streamer = self.stream_all_nodes(direction).await;
736 while let Some(leaf) = streamer.next().await {
737 match f(&leaf) {
738 ControlFlow::Break(_) => break,
739 ControlFlow::Continue(_) => continue,
740 }
741 }
742 }
743
744 async fn traverse_leaf_nodes(
746 &self,
747 direction: Direction,
748 mut f: impl FnMut(&Node<K, V>) -> ControlFlow<()>,
749 ) {
750 let mut streamer = self.stream_all_leaf_nodes(direction).await;
751 while let Some(leaf) = streamer.next().await {
752 match f(&leaf) {
753 ControlFlow::Break(_) => break,
754 ControlFlow::Continue(_) => continue,
755 }
756 }
757 }
758
759 async fn add_node(
760 &self,
761 nodes_lock: &mut MutexGuard<'_, HashMap<usize, Node<K, V>, BuildIdentityHasher>>,
762 mut node: Node<K, V>,
763 ) -> usize {
764 let idx = self.index.fetch_add(1, Ordering::SeqCst);
765 node.set_index(idx);
766 if let Node::Internal(data) = &node {
767 for c_idx in data.children() {
768 self.update_node(nodes_lock, c_idx, |node: &mut Node<K, V>| {
769 node.set_parent(Some(idx));
770 None
771 })
772 .await;
773 }
774 }
775 nodes_lock.insert(idx, node);
776 idx
777 }
778
779 fn replace_node(
780 &self,
781 nodes_lock: &mut MutexGuard<'_, HashMap<usize, Node<K, V>, BuildIdentityHasher>>,
782 mut node: Node<K, V>,
783 ) -> Option<Node<K, V>> {
784 node.set_clean(false);
785 nodes_lock.insert(node.index(), node)
786 }
787
788 async fn update_node(
789 &self,
790 nodes_lock: &mut MutexGuard<'_, HashMap<usize, Node<K, V>, BuildIdentityHasher>>,
791 idx: usize,
792 f: impl FnOnce(&mut Node<K, V>) -> Option<V>,
793 ) -> Option<V> {
794 if nodes_lock.get(&idx).is_none() {
796 let node = self.read_node(idx).await.ok()?;
797 nodes_lock.insert(idx, node);
798 }
799 let node = nodes_lock.get_mut(&idx).unwrap();
800 tracing::debug!("Updating node: {:?}", node);
801 node.set_clean(false);
803 f(node)
804 }
805
806 async fn add_root<'a>(
807 &self,
808 nodes_lock: &mut MutexGuard<'a, HashMap<usize, Node<K, V>, BuildIdentityHasher>>,
809 children: Vec<usize>,
810 keys: Vec<K>,
811 ) -> usize {
812 tracing::debug!(
813 "Adding a new root: children: {:?}, keys: {:?}",
814 children,
815 keys
816 );
817 let root: Node<K, V> = Node::internal(self.branch, None, keys, children.clone());
818 let root_idx = self.add_node(nodes_lock, root).await;
819 let closure = |node: &mut Node<K, V>| {
820 node.set_parent(Some(root_idx));
821 None
822 };
823 self.update_node(nodes_lock, children[0], closure).await;
824 self.update_node(nodes_lock, children[1], closure).await;
825 let mut root_lock = self.root.lock().await;
826 *root_lock = root_idx;
827 root_idx
828 }
829
830 #[inline]
835 async fn search_node_with_lock(
836 &self,
837 nodes_lock: &'_ mut MutexGuard<'_, HashMap<usize, Node<K, V>, BuildIdentityHasher>>,
838 key: &K,
839 ) -> Result<Node<K, V>> {
840 let mut target_node = self
841 .find_node_with_lock(nodes_lock, *self.root.lock().await)
842 .await?;
843 loop {
844 tracing::debug!("TARGET NODE: {:?}", target_node);
845 if target_node.is_leaf() {
846 return Ok(target_node.clone());
847 }
848 let t_idx = target_node
849 .child(key)
850 .ok_or(BaildonError::LostChild(target_node.index()))?;
851 target_node = self.find_node_with_lock(nodes_lock, t_idx).await?;
852 }
853 }
854
855 pub(crate) async fn find_node_as_option_with_lock(
857 &self,
858 nodes_lock: &'_ mut MutexGuard<'_, HashMap<usize, Node<K, V>, BuildIdentityHasher>>,
859 idx: usize,
860 ) -> Option<Node<K, V>> {
861 match self.find_node_with_lock(nodes_lock, idx).await {
862 Ok(n) => Some(n),
863 Err(e) => {
864 tracing::error!("could not find node with index {idx}: {e}");
865 None
866 }
867 }
868 }
869
870 pub(crate) async fn find_node_with_lock(
872 &self,
873 nodes_lock: &'_ mut MutexGuard<'_, HashMap<usize, Node<K, V>, BuildIdentityHasher>>,
874 idx: usize,
875 ) -> Result<Node<K, V>> {
876 let child = match nodes_lock.get(&idx) {
877 Some(c) => c.clone(),
878 None => {
879 let node = self.read_node(idx).await?;
880 nodes_lock.insert(idx, node.clone());
881 node
882 }
883 };
884 Ok(child)
885 }
886
887 async fn read_node(&self, idx: usize) -> Result<Node<K, V>> {
889 let mut file_lock = self.file.lock().await;
890 let buf = file_lock.read_data(idx).await?;
891 Node::<K, V>::deserialize(&buf)
892 }
893
894 pub(crate) async fn first_leaf(&self) -> Node<K, V> {
895 let mut nodes_lock = self.nodes.lock().await;
896 let root_lock = self.root.lock().await;
897 let mut node = self
898 .find_node_with_lock(&mut nodes_lock, *root_lock)
899 .await
900 .expect("FLUFFED NODE");
901 loop {
902 if node.is_leaf() {
903 return node;
904 }
905 let idx = node.first_child();
906 node = self
907 .find_node_with_lock(&mut nodes_lock, idx)
908 .await
909 .expect("FLUFFED NODE");
910 }
911 }
912
913 pub(crate) async fn last_leaf(&self) -> Node<K, V> {
914 let mut nodes_lock = self.nodes.lock().await;
915 let root_lock = self.root.lock().await;
916 let mut node = self
917 .find_node_with_lock(&mut nodes_lock, *root_lock)
918 .await
919 .expect("FLUFFED NODE");
920 loop {
921 if node.is_leaf() {
922 return node;
923 }
924 let idx = node.last_child();
925 node = self
926 .find_node_with_lock(&mut nodes_lock, idx)
927 .await
928 .expect("FLUFFED NODE");
929 }
930 }
931
932 pub(crate) async fn neighbour(&self, idx: usize, direction: Direction) -> Option<Node<K, V>> {
933 let mut nodes_lock = self.nodes.lock().await;
934 self.neighbour_with_lock(&mut nodes_lock, idx, direction)
935 .await
936 }
937
938 async fn neighbour_same_parent_with_lock(
939 &self,
940 nodes_lock: &'_ mut MutexGuard<'_, HashMap<usize, Node<K, V>, BuildIdentityHasher>>,
941 idx: usize,
942 direction: Direction,
943 ) -> Option<Node<K, V>> {
944 let node = self.find_node_as_option_with_lock(nodes_lock, idx).await?;
945 match node.parent() {
946 Some(p_idx) => {
947 let parent = self
948 .find_node_as_option_with_lock(nodes_lock, p_idx)
949 .await?;
950
951 if parent.len() < 2 {
953 return None;
954 }
955
956 let my_pos = parent.children().position(|x| x == idx)?;
958
959 let neighbour_idx = if direction == Direction::Ascending {
960 if my_pos == parent.len() - 1 {
962 return None;
963 }
964
965 parent.children().nth(my_pos + 1)?
967 } else {
968 if my_pos == 0 {
970 return None;
971 }
972 parent.children().nth(my_pos - 1)?
974 };
975 self.find_node_as_option_with_lock(nodes_lock, neighbour_idx)
976 .await
977 }
978 None => None,
979 }
980 }
981
    /// Find the neighbour of node `idx` in the given direction, crossing parents if needed.
    ///
    /// The search climbs towards the root until it reaches an ancestor in which the current
    /// index is not the outermost child for `direction`, steps to the adjacent child there and
    /// then descends again until it reaches a node of the same kind (leaf or internal) as the
    /// starting node.
    ///
    /// Returns `None` when there is no such neighbour (the root is reached) or when a node
    /// cannot be loaded along the way.
    async fn neighbour_with_lock(
        &self,
        nodes_lock: &'_ mut MutexGuard<'_, HashMap<usize, Node<K, V>, BuildIdentityHasher>>,
        mut idx: usize,
        direction: Direction,
    ) -> Option<Node<K, V>> {
        let mut node = self.find_node_as_option_with_lock(nodes_lock, idx).await?;

        // Kept so the descent below knows which kind of node to stop at.
        let original_node = node.clone();

        loop {
            match node.parent() {
                Some(p_idx) => {
                    node = self
                        .find_node_as_option_with_lock(nodes_lock, p_idx)
                        .await?;
                    // `idx` is not a child of this node: continue climbing from this node.
                    if !node.children().any(|x| x == idx) {
                        idx = p_idx;
                        continue;
                    }
                    if direction == Direction::Ascending {
                        // Already the last child here: the neighbour is further up the tree.
                        if node.last_child() == idx {
                            idx = p_idx;
                            continue;
                        }
                        let c_pos = node.children().position(|x| x == idx)? + 1;

                        idx = node.children().nth(c_pos)?;
                        // Descend through first children to the starting node's kind.
                        loop {
                            node = self.find_node_as_option_with_lock(nodes_lock, idx).await?;
                            if original_node.is_leaf() == node.is_leaf() {
                                break;
                            }
                            idx = node.children().next().unwrap();
                        }
                    } else {
                        // Already the first child here: the neighbour is further up the tree.
                        if node.first_child() == idx {
                            idx = p_idx;
                            continue;
                        }
                        // Cannot underflow: position 0 was handled by the check above.
                        let c_pos = node.children().position(|x| x == idx)? - 1;

                        idx = node.children().nth(c_pos)?;
                        // Descend through last children to the starting node's kind.
                        loop {
                            node = self.find_node_as_option_with_lock(nodes_lock, idx).await?;
                            if original_node.is_leaf() == node.is_leaf() {
                                break;
                            }
                            idx = node.children().last().unwrap();
                        }
                    }
                    return Some(node);
                }
                // Reached a node without a parent: there is no neighbour in this direction.
                None => return None,
            }
        }
    }
1057}
1058
1059impl<K, V> Baildon<K, V>
1060where
1061 K: BaildonKey + Send + Sync + Display,
1062 V: BaildonValue + Send + Sync + Display,
1063{
1064 pub async fn print_entries(&self, direction: Direction) {
1066 let mut sep = "";
1067 let callback = |(key, value)| {
1068 print!("{sep}{key}:{value}");
1069 sep = ", ";
1070 ControlFlow::Continue(())
1071 };
1072 self.traverse_entries(direction, callback).await;
1073 println!();
1074 }
1075
1076 pub async fn print_keys(&self, direction: Direction) {
1078 let mut sep = "";
1079 let callback = |key| {
1080 print!("{sep}{key}");
1081 sep = ", ";
1082 ControlFlow::Continue(())
1083 };
1084 self.traverse_keys(direction, callback).await;
1085 println!();
1086 }
1087
1088 pub async fn print_values(&self, direction: Direction) {
1090 let mut sep = "";
1091 let callback = |value| {
1092 print!("{sep}{value}");
1093 sep = ", ";
1094 ControlFlow::Continue(())
1095 };
1096 self.traverse_values(direction, callback).await;
1097 println!();
1098 }
1099}
1100
1101impl<K, V> Drop for Baildon<K, V>
1102where
1103 K: BaildonKey + Send + Sync,
1104 V: BaildonValue + Send + Sync,
1105{
1106 fn drop(&mut self) {
1107 std::thread::scope(|s| {
1108 let hdl = s.spawn(|| {
1109 let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
1110 if let Err(e) = runtime.block_on(self.flush_to_disk()) {
1111 tracing::warn!("could not flush data file to disk: {}", e);
1112 }
1113 });
1114 hdl.join().expect("thread finished");
1115 });
1116 }
1117}
1118
1119#[cfg(test)]
1120mod tests;