use crate::{BTreeNode, FromBytes, NodeType, SplitInfo, ToBytes, TrieNode, BTREE_MAX_KEYS};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
/// Appends to `result` every `(key, value)` pair found in the trie rooted at
/// `node` whose key lies in the inclusive range `start..=end`.
///
/// `prefix` carries the bytes consumed on the way down to `node`; the key of a
/// stored value is rebuilt by decoding that prefix with `K::from_bytes`.
/// Prefixes that fail to decode are skipped.
///
/// Children are visited in ascending byte order. A child whose lock cannot be
/// read, or which is not a trie node, is skipped without reporting an error.
pub(crate) fn collect_trie_range<K: Clone + Ord + ToBytes + FromBytes, V: Clone>(
    node: &TrieNode<K, V>,
    prefix: Vec<u8>,
    start: &K,
    end: &K,
    result: &mut Vec<(K, V)>,
) {
    // Decode the key only when this node actually stores a value.
    let decoded = node
        .value
        .as_ref()
        .and_then(|stored| K::from_bytes(&prefix).map(|key| (key, stored)));
    if let Some((key, stored)) = decoded {
        if &key >= start && &key <= end {
            result.push((key, stored.clone()));
        }
    }

    for (slot, child) in node.children.iter().enumerate() {
        let child = match child {
            Some(child) => child,
            None => continue,
        };
        let guard = match child.read() {
            Ok(guard) => guard,
            Err(_) => continue,
        };
        if let NodeType::Trie(subtrie) = &*guard {
            // The child's prefix is ours plus the byte that selects the child.
            let mut extended = Vec::with_capacity(prefix.len() + 1);
            extended.extend_from_slice(&prefix);
            extended.push(slot as u8);
            collect_trie_range(subtrie, extended, start, end, result);
        }
    }
}
/// Appends to `result` every `(key, value)` pair in the subtree rooted at
/// `node` whose key lies in the inclusive range `start..=end`.
///
/// Leaves are scanned from the first key that is `>= start` until a key
/// exceeds `end`. Internal nodes descend into each child that may overlap the
/// range, delegating to `collect_trie_range` (with an empty prefix) when a
/// child is a trie. Children whose lock cannot be read are skipped.
///
/// NOTE(review): keys and values held by internal nodes are never emitted,
/// only those reached through leaves or trie children. This presumably relies
/// on every separator key also being present further down; confirm.
pub(crate) fn collect_btree_range<K: Clone + Ord + ToBytes + FromBytes, V: Clone>(
    node: &BTreeNode<K, V>,
    start: &K,
    end: &K,
    result: &mut Vec<(K, V)>,
) {
    if node.is_leaf {
        // Either the exact hit or the insertion point: the first key >= start.
        let first = node
            .keys
            .binary_search(start)
            .unwrap_or_else(|insert_at| insert_at);
        for (offset, key) in node.keys[first..].iter().enumerate() {
            if key > end {
                break;
            }
            result.push((key.clone(), node.values[first + offset].clone()));
        }
        return;
    }

    // A key equal to a separator lives in the child to the separator's right.
    let first_child = node
        .keys
        .binary_search(start)
        .map_or_else(|insert_at| insert_at, |hit| hit + 1);
    for (child_idx, child) in node.children.iter().enumerate().skip(first_child) {
        // The separator left of this child is a lower bound for its keys.
        if child_idx > 0 && &node.keys[child_idx - 1] > end {
            break;
        }
        if let Ok(child_guard) = child.read() {
            match &*child_guard {
                NodeType::BTree(child_btree) => {
                    collect_btree_range(child_btree, start, end, result);
                }
                NodeType::Trie(child_trie) => {
                    collect_trie_range(child_trie, Vec::new(), start, end, result);
                }
            }
        }
    }
}
/// Stores `value` under `key_bytes` in the trie rooted at `root`.
///
/// `current_depth` is the number of bytes of `key_bytes` already consumed on
/// the way to `root`; callers starting at the top of a trie pass `0`. Missing
/// child nodes along the path are created on demand. `key` is only forwarded
/// through the recursion.
///
/// The terminal node's `size` counter is incremented only when the node did
/// not already hold a value, so overwriting an existing key does not inflate
/// the count.
///
/// If a child's lock is poisoned, or the child is not a trie node, the
/// insertion stops at that point and `value` is dropped.
///
/// # Panics
///
/// Panics if `current_depth` is greater than `key_bytes.len()`, or if
/// `root.children` has no slot for the selected byte.
pub(crate) fn insert_into_trie<K: Clone + Ord + ToBytes + FromBytes, V: Clone>(
    root: &mut TrieNode<K, V>,
    key: &K,
    value: V,
    current_depth: usize,
    key_bytes: &[u8],
) {
    if current_depth == key_bytes.len() {
        // `replace` hands back the previous occupant, which tells us whether
        // this is a new entry or an overwrite.
        if root.value.replace(value).is_none() {
            root.size.fetch_add(1, Ordering::Relaxed);
        }
        return;
    }

    let current_byte = key_bytes[current_depth] as usize;
    let child_arc = root.children[current_byte].get_or_insert_with(|| {
        Arc::new(RwLock::new(NodeType::Trie(TrieNode {
            children: vec![None; 256],
            value: None,
            depth: current_depth + 1,
            size: AtomicUsize::new(0),
        })))
    });
    if let Ok(mut child_lock) = child_arc.write() {
        if let NodeType::Trie(child_trie) = &mut *child_lock {
            insert_into_trie(child_trie, key, value, current_depth + 1, key_bytes);
        }
    }
}
pub(crate) fn collect_leaf_pairs<K: Clone + Ord + ToBytes + FromBytes, V: Clone>(
node: &BTreeNode<K, V>,
pairs: &mut Vec<(K, V)>,
) {
if node.is_leaf {
pairs.extend(node.keys.iter().cloned().zip(node.values.iter().cloned()));
} else {
for child in &node.children {
if let Ok(child_guard) = child.read() {
if let NodeType::BTree(child_btree) = &*child_guard {
collect_leaf_pairs(child_btree, pairs);
}
}
}
}
}
/// Returns the byte encoding of `key` as produced by its `to_bytes` method.
pub(crate) fn key_to_bytes<K: Clone + Ord + ToBytes + FromBytes>(key: &K) -> Vec<u8> {
    K::to_bytes(key)
}
pub(crate) fn handle_split<K: Clone + Ord + ToBytes + FromBytes, V: Clone>(
parent: &mut BTreeNode<K, V>,
child_idx: usize,
split_info: SplitInfo<K, V>,
) {
parent.keys.insert(child_idx, split_info.median_key);
parent.values.insert(child_idx, split_info.median_value);
parent.children.insert(child_idx + 1, split_info.right_node);
}
/// Splits `node` around its middle key and returns the pieces the parent needs.
///
/// After the call `node` holds the lower half; the returned [`SplitInfo`]
/// carries the separator key/value and the newly created right-hand node.
///
/// * Leaf: entries from the middle onward move to the right node, and the
///   separator is a *copy* of the right node's first entry.
/// * Internal node: the middle entry is *moved* up as the separator, entries
///   after it move to the right node, and the children are divided to match.
///
/// Returns `None`, leaving `node` untouched, when the node cannot be split:
/// it has no keys, its keys and values differ in length, or (for an internal
/// node) it has too few children to divide.
pub(crate) fn split_btree_node<K: Clone + Ord + ToBytes + FromBytes, V: Clone>(
    node: &mut BTreeNode<K, V>,
) -> Option<SplitInfo<K, V>> {
    let mid: usize = node.keys.len() / 2;

    // Validate up front so that nothing is mutated on a failed split.
    if node.keys.is_empty() || node.keys.len() != node.values.len() {
        return None;
    }
    if !node.is_leaf && node.children.len() <= mid {
        return None;
    }

    let (median_key, median_value, right) = if node.is_leaf {
        let keys = node.keys.split_off(mid);
        let values = node.values.split_off(mid);
        // The separator stays in the leaf; the parent only gets a copy.
        (
            keys[0].clone(),
            values[0].clone(),
            BTreeNode {
                keys,
                values,
                children: Vec::new(),
                is_leaf: true,
            },
        )
    } else {
        let keys = node.keys.split_off(mid + 1);
        let values = node.values.split_off(mid + 1);
        let children = node.children.split_off(mid + 1);
        // What is now the last entry of `node` is the median; it moves up.
        let median_key = node
            .keys
            .pop()
            .expect("checked above: node holds more than `mid` keys");
        let median_value = node
            .values
            .pop()
            .expect("checked above: values mirror keys");
        (
            median_key,
            median_value,
            BTreeNode {
                keys,
                values,
                children,
                is_leaf: false,
            },
        )
    };

    Some(SplitInfo {
        median_key,
        median_value,
        right_node: Arc::new(RwLock::new(NodeType::BTree(right))),
    })
}
/// Appends to `pairs` every `(key, value)` pair stored in the trie rooted at
/// `node`.
///
/// `prefix` is the byte path leading to `node`. A stored value's key is
/// recovered by decoding that path with `K::from_bytes`; values whose path
/// does not decode are skipped. Children are visited in ascending byte order,
/// and a child that cannot be read-locked or is not a trie node is ignored.
pub(crate) fn collect_pairs<K: Clone + Ord + ToBytes + FromBytes, V: Clone>(
    node: &TrieNode<K, V>,
    prefix: Vec<u8>,
    pairs: &mut Vec<(K, V)>,
) {
    match &node.value {
        Some(stored) => {
            if let Some(key) = K::from_bytes(&prefix) {
                pairs.push((key, stored.clone()));
            }
        }
        None => {}
    }

    let occupied = node
        .children
        .iter()
        .enumerate()
        .filter_map(|(byte, slot)| slot.as_ref().map(|child| (byte as u8, child)));
    for (byte, child) in occupied {
        // Path to the child: our own path followed by the selecting byte.
        let child_prefix: Vec<u8> = [prefix.as_slice(), &[byte][..]].concat();
        if let Ok(guard) = child.read() {
            if let NodeType::Trie(child_trie) = &*guard {
                collect_pairs(child_trie, child_prefix, pairs);
            }
        }
    }
}
/// Builds a fresh trie containing the entries of `btree_node` and returns it
/// wrapped as `NodeType::Trie`.
///
/// A leaf contributes its own key/value pairs directly. For an internal node
/// the pairs are first gathered with `collect_leaf_pairs`. Every pair is then
/// inserted under the byte encoding of its key, starting at depth `0`.
pub(crate) fn convert_to_trie<K: Clone + Ord + ToBytes + FromBytes, V: Clone>(
    btree_node: &BTreeNode<K, V>,
) -> NodeType<K, V> {
    let mut trie_root = TrieNode {
        children: vec![None; 256],
        value: None,
        depth: 0,
        size: AtomicUsize::new(0),
    };

    // Shared insertion step for both the leaf and the internal-node case.
    let mut store = |key: &K, value: V| {
        let encoded = key.to_bytes();
        insert_into_trie(&mut trie_root, key, value, 0, &encoded);
    };

    if btree_node.is_leaf {
        for (key, value) in btree_node.keys.iter().zip(btree_node.values.iter()) {
            store(key, value.clone());
        }
    } else {
        let mut gathered: Vec<(K, V)> = Vec::new();
        collect_leaf_pairs(btree_node, &mut gathered);
        for (key, value) in gathered {
            store(&key, value);
        }
    }

    NodeType::Trie(trie_root)
}
/// Yields `groups` sizes that sum to `total` and differ by at most one, with
/// the larger groups placed last. `groups` must be non-zero.
fn balanced_group_sizes(total: usize, groups: usize) -> impl Iterator<Item = usize> {
    let base = total / groups;
    let first_larger = groups - total % groups;
    (0..groups).map(move |index| base + usize::from(index >= first_larger))
}

/// Builds a B-tree containing every entry of the trie rooted at `trie_node`
/// and returns it wrapped as `NodeType::BTree`.
///
/// The entries are gathered with `collect_pairs` and sorted by key. If they
/// fit in one node (at most `BTREE_MAX_KEYS`), a single leaf is returned.
/// Otherwise the tree is built bottom-up: the sorted entries are dealt into
/// evenly sized leaves of at most `BTREE_MAX_KEYS` keys, and layers of
/// internal nodes are stacked on top until a single root remains. No node
/// ends up holding more than `BTREE_MAX_KEYS` keys, however many entries the
/// trie contained.
///
/// Each separator in an internal node is a copy of the smallest key (and its
/// value) of the subtree to its right, matching what a leaf split produces.
pub(crate) fn convert_to_btree<K: Clone + Ord + ToBytes + FromBytes, V: Clone>(
    trie_node: &TrieNode<K, V>,
) -> NodeType<K, V> {
    let mut pairs: Vec<(K, V)> = Vec::new();
    collect_pairs(trie_node, Vec::new(), &mut pairs);
    pairs.sort_by(|a, b| a.0.cmp(&b.0));

    if pairs.len() <= BTREE_MAX_KEYS {
        let mut leaf = BTreeNode {
            keys: Vec::with_capacity(BTREE_MAX_KEYS),
            values: Vec::with_capacity(BTREE_MAX_KEYS),
            children: Vec::new(),
            is_leaf: true,
        };
        for (key, value) in pairs {
            leaf.keys.push(key);
            leaf.values.push(value);
        }
        return NodeType::BTree(leaf);
    }

    // Clamp so a degenerate limit of zero can neither divide by zero nor
    // stop the level loop below from making progress.
    let leaf_capacity = BTREE_MAX_KEYS.max(1);
    let fanout = leaf_capacity + 1;

    // Each entry of `level` is (smallest key in subtree, its value, subtree).
    let total = pairs.len();
    let leaf_count = (total + leaf_capacity - 1) / leaf_capacity;
    let mut level: Vec<(K, V, BTreeNode<K, V>)> = Vec::with_capacity(leaf_count);
    let mut remaining = pairs.into_iter();
    for size in balanced_group_sizes(total, leaf_count) {
        let mut keys = Vec::with_capacity(leaf_capacity);
        let mut values = Vec::with_capacity(leaf_capacity);
        for (key, value) in remaining.by_ref().take(size) {
            keys.push(key);
            values.push(value);
        }
        if keys.is_empty() {
            continue;
        }
        let smallest_key = keys[0].clone();
        let smallest_value = values[0].clone();
        level.push((
            smallest_key,
            smallest_value,
            BTreeNode {
                keys,
                values,
                children: Vec::new(),
                is_leaf: true,
            },
        ));
    }

    // Group the current level under parents until one root is left.
    while level.len() > 1 {
        let child_total = level.len();
        let parent_count = (child_total + fanout - 1) / fanout;
        let mut parents = Vec::with_capacity(parent_count);
        let mut children = level.into_iter();
        for size in balanced_group_sizes(child_total, parent_count) {
            let mut group = children.by_ref().take(size);
            if let Some((min_key, min_value, first_child)) = group.next() {
                let mut parent = BTreeNode {
                    keys: Vec::with_capacity(leaf_capacity),
                    values: Vec::with_capacity(leaf_capacity),
                    children: Vec::with_capacity(fanout),
                    is_leaf: false,
                };
                parent
                    .children
                    .push(Arc::new(RwLock::new(NodeType::BTree(first_child))));
                // Every later child contributes its smallest entry as the
                // separator that sits to its left.
                for (key, value, child) in group {
                    parent.keys.push(key);
                    parent.values.push(value);
                    parent
                        .children
                        .push(Arc::new(RwLock::new(NodeType::BTree(child))));
                }
                parents.push((min_key, min_value, parent));
            }
        }
        level = parents;
    }

    match level.pop() {
        Some((_, _, root)) => NodeType::BTree(root),
        // Not reachable: at least one leaf is always built above.
        None => NodeType::BTree(BTreeNode {
            keys: Vec::new(),
            values: Vec::new(),
            children: Vec::new(),
            is_leaf: true,
        }),
    }
}
/// Reports whether `sibling` is a readable B-tree node holding more than
/// `BTREE_MAX_KEYS / 2` keys, i.e. whether it can give one away.
fn btree_sibling_can_lend<K: Clone + Ord + ToBytes + FromBytes, V: Clone>(
    sibling: &Arc<RwLock<NodeType<K, V>>>,
) -> bool {
    let lendable = match sibling.read() {
        Ok(guard) => match &*guard {
            NodeType::BTree(candidate) => candidate.keys.len() > BTREE_MAX_KEYS / 2,
            _ => false,
        },
        Err(_) => false,
    };
    lendable
}

/// Builds a new node holding `left`'s entries, then the separator pair, then
/// `right`'s entries, with the children of both concatenated in that order.
fn btree_join_around_separator<K: Clone + Ord + ToBytes + FromBytes, V: Clone>(
    left: &BTreeNode<K, V>,
    separator_key: K,
    separator_value: V,
    right: &BTreeNode<K, V>,
) -> BTreeNode<K, V> {
    let entry_count = left.keys.len() + 1 + right.keys.len();
    let mut keys = Vec::with_capacity(entry_count);
    keys.extend(left.keys.iter().cloned());
    keys.push(separator_key);
    keys.extend(right.keys.iter().cloned());

    let mut values = Vec::with_capacity(entry_count);
    values.extend(left.values.iter().cloned());
    values.push(separator_value);
    values.extend(right.values.iter().cloned());

    let mut children = Vec::with_capacity(left.children.len() + right.children.len());
    children.extend(left.children.iter().cloned());
    children.extend(right.children.iter().cloned());

    BTreeNode {
        keys,
        values,
        children,
        is_leaf: left.is_leaf,
    }
}

/// Rebalances `node` against its siblings after it has lost an entry.
///
/// The last element of `path` names the parent and the index of `node` among
/// the parent's children. The following are tried in order, stopping at the
/// first that applies:
///
/// 1. Borrow from the left sibling if it has more than `BTREE_MAX_KEYS / 2`
///    keys: the parent's separator moves to the front of `node`, and the
///    sibling's last entry becomes the new separator.
/// 2. Borrow from the right sibling under the same condition, mirrored.
/// 3. Merge with the left sibling: a new node made of the sibling, the
///    separator and `node` replaces the sibling's slot, and the separator and
///    `node`'s slot are removed from the parent.
/// 4. Otherwise merge with the right sibling in the same way, replacing
///    `node`'s slot.
///
/// When borrowing between internal nodes, the sibling's adjacent child
/// pointer is moved together with the entry so that both nodes keep one more
/// child than keys.
///
/// Does nothing when `path` is empty, when the parent is not a B-tree node,
/// or when the recorded index does not refer to one of the parent's children.
/// Siblings that cannot be locked or are not B-tree nodes are treated as
/// unavailable.
///
/// # Panics
///
/// Panics if the parent's lock is poisoned.
pub fn merge_btree_nodes<K: Clone + Ord + ToBytes + FromBytes, V: Clone>(
    node: &mut BTreeNode<K, V>,
    path: &[(Arc<RwLock<NodeType<K, V>>>, usize)],
) {
    let (parent_arc, idx) = match path.last() {
        Some((parent_arc, idx)) => (parent_arc, *idx),
        None => return,
    };
    let mut parent = parent_arc.write().unwrap();
    let parent_node = match &mut *parent {
        NodeType::BTree(parent_node) => parent_node,
        _ => return,
    };

    let child_count = parent_node.children.len();
    if idx >= child_count {
        return;
    }
    let has_left = idx > 0;
    // Written as an addition so an empty child list cannot underflow.
    let has_right = idx + 1 < child_count;

    if has_left && btree_sibling_can_lend(&parent_node.children[idx - 1]) {
        if let Ok(mut left_guard) = parent_node.children[idx - 1].write() {
            if let NodeType::BTree(left) = &mut *left_guard {
                let lent_key = left.keys.pop().unwrap();
                let lent_value = left.values.pop().unwrap();
                // Rotate right: separator drops into `node`, sibling's last
                // entry takes the separator's place.
                let old_key = std::mem::replace(&mut parent_node.keys[idx - 1], lent_key);
                let old_value = std::mem::replace(&mut parent_node.values[idx - 1], lent_value);
                node.keys.insert(0, old_key);
                node.values.insert(0, old_value);
                if !left.is_leaf {
                    if let Some(moved_child) = left.children.pop() {
                        node.children.insert(0, moved_child);
                    }
                }
                return;
            }
        }
    }

    if has_right && btree_sibling_can_lend(&parent_node.children[idx + 1]) {
        if let Ok(mut right_guard) = parent_node.children[idx + 1].write() {
            if let NodeType::BTree(right) = &mut *right_guard {
                let lent_key = right.keys.remove(0);
                let lent_value = right.values.remove(0);
                // Rotate left: mirror image of the case above.
                let old_key = std::mem::replace(&mut parent_node.keys[idx], lent_key);
                let old_value = std::mem::replace(&mut parent_node.values[idx], lent_value);
                node.keys.push(old_key);
                node.values.push(old_value);
                if !right.is_leaf && !right.children.is_empty() {
                    node.children.push(right.children.remove(0));
                }
                return;
            }
        }
    }

    if has_left {
        let merged = match parent_node.children[idx - 1].read() {
            Ok(left_guard) => match &*left_guard {
                NodeType::BTree(left) => Some(btree_join_around_separator(
                    left,
                    parent_node.keys[idx - 1].clone(),
                    parent_node.values[idx - 1].clone(),
                    &*node,
                )),
                _ => None,
            },
            Err(_) => None,
        };
        if let Some(merged) = merged {
            parent_node.children[idx - 1] = Arc::new(RwLock::new(NodeType::BTree(merged)));
            parent_node.keys.remove(idx - 1);
            parent_node.values.remove(idx - 1);
            parent_node.children.remove(idx);
        }
    } else if has_right {
        let merged = match parent_node.children[idx + 1].read() {
            Ok(right_guard) => match &*right_guard {
                NodeType::BTree(right) => Some(btree_join_around_separator(
                    &*node,
                    parent_node.keys[idx].clone(),
                    parent_node.values[idx].clone(),
                    right,
                )),
                _ => None,
            },
            Err(_) => None,
        };
        if let Some(merged) = merged {
            parent_node.children[idx] = Arc::new(RwLock::new(NodeType::BTree(merged)));
            parent_node.keys.remove(idx);
            parent_node.values.remove(idx);
            parent_node.children.remove(idx + 1);
        }
    }
}
/// Prunes empty trie nodes along `path`, working from the deepest entry back
/// towards the first.
///
/// Each path element is a node together with the index of one of its
/// children. If that child is a trie node with no value and no children of
/// its own, the slot is cleared. Entries whose node is not a trie, whose slot
/// is already empty, or whose child cannot be read-locked are left untouched.
///
/// # Panics
///
/// Panics if the lock of a node on the path is poisoned, or if an index is
/// out of range for that node's `children`.
pub fn cleanup_empty_trie_nodes<K: Clone + Ord + ToBytes + FromBytes, V: Clone>(
    path: &[(Arc<RwLock<NodeType<K, V>>>, usize)],
) {
    for (node_arc, idx) in path.iter().rev() {
        let mut guard = node_arc.write().unwrap();
        let trie_node = match &mut *guard {
            NodeType::Trie(trie_node) => trie_node,
            _ => continue,
        };

        let child_is_vacant = match &trie_node.children[*idx] {
            Some(child) => match child.read() {
                Ok(child_guard) => match &*child_guard {
                    NodeType::Trie(child_trie) => {
                        child_trie.value.is_none()
                            && child_trie.children.iter().all(Option::is_none)
                    }
                    _ => false,
                },
                Err(_) => false,
            },
            None => false,
        };

        if child_is_vacant {
            trie_node.children[*idx] = None;
        }
    }
}