#![allow(unsafe_code)]
use crossbeam_epoch::{self as epoch, Guard, Shared};
use crossbeam_utils::Backoff;
use crate::config::Config;
use crate::key::Key;
use crate::model::LinearModel;
use crate::node::{is_child, Node, SLOT_DATA, SLOT_TOMBSTONE, SLOT_WRITING};
/// Outcome of an insert-style operation on the index.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InsertResult {
    /// The key was not present before; a new entry was created.
    Inserted,
    /// The key was already present; `insert` replaced its value, while
    /// `get_or_insert` left the existing value unchanged.
    Updated,
}
/// Inserts `key` → `value` into the subtree rooted at `node`, returning
/// whether a new entry was created or an existing one was updated.
///
/// The descent is lock-free: each step predicts a slot via the node's model,
/// acts on the observed slot state with a CAS, snoozes on transient states
/// (`SLOT_WRITING`, tagged child pointers), and restarts the whole descent
/// (`'retry`) when the first traversed parent→child link changed underneath
/// us (e.g. a concurrent subtree rebuild).
#[allow(clippy::too_many_lines, clippy::needless_pass_by_value)]
pub fn insert<K: Key, V: Clone + Send + Sync>(
    node: &Node<K, V>,
    key: K,
    value: &V,
    config: &Config,
    guard: &Guard,
) -> InsertResult {
    // Set when an outer retry happens AFTER the key was already physically
    // inserted; a later pass that then observes the key as present must still
    // report `Inserted` to the caller.
    let mut retry_was_new = false;
    'retry: loop {
        let backoff = Backoff::new();
        let mut current_node = node;
        let mut depth: usize = 0;
        // Shallowest (parent, slot) where a child link was followed — the
        // point at which a localized rebuild could flatten this path.
        let mut rebuild_candidate: Option<(&Node<K, V>, usize)> = None;
        // First traversed parent link plus the child pointer we observed;
        // re-checked after a successful write to detect a concurrent rebuild
        // that may have detached the subtree we wrote into.
        #[allow(clippy::type_complexity)]
        let mut descent_snapshot: Option<(&Node<K, V>, usize, Shared<'_, Node<K, V>>)> = None;
        loop {
            let slot_idx = current_node.predict_slot(&key);
            let state = current_node.slot_state(slot_idx);
            if state == SLOT_WRITING {
                // Another thread is mid-write on this slot; wait and re-read.
                backoff.snooze();
                continue;
            }
            if state == crate::node::SLOT_EMPTY {
                // Claim the empty slot directly with key + value.
                if current_node.cas_empty_to_data(slot_idx, key.clone(), value.clone()) {
                    current_node.inc_keys();
                    if let Some((parent, pidx, expected)) = descent_snapshot {
                        if parent.load_child(pidx, guard) != expected {
                            // The subtree we wrote into may have been swapped
                            // out; redo the insert, remembering the key was
                            // new so the final result stays `Inserted`.
                            retry_was_new = true;
                            continue 'retry;
                        }
                    }
                    if config.auto_rebuild && depth > config.rebuild_depth_threshold {
                        if let Some((parent, idx)) = rebuild_candidate {
                            crate::rebuild::try_rebuild_subtree(parent, idx, config, guard);
                        }
                    }
                    return InsertResult::Inserted;
                }
                // CAS lost to a concurrent writer; re-read the slot state.
                continue;
            }
            if state == SLOT_TOMBSTONE {
                // Deleted slot: replace the tombstone with a one-entry child
                // node rather than reusing the data cells in place.
                let child = build_single_entry_child(&key, value.clone());
                if current_node.cas_tombstone_to_child_stale(slot_idx, child, guard) {
                    current_node.dec_tombstones();
                    if let Some((parent, pidx, expected)) = descent_snapshot {
                        if parent.load_child(pidx, guard) != expected {
                            retry_was_new = true;
                            continue 'retry;
                        }
                    }
                    if config.auto_rebuild && depth > config.rebuild_depth_threshold {
                        if let Some((parent, idx)) = rebuild_candidate {
                            crate::rebuild::try_rebuild_subtree(parent, idx, config, guard);
                        }
                    }
                    return InsertResult::Inserted;
                }
                continue;
            }
            if state == SLOT_DATA {
                // SAFETY: the slot was observed in SLOT_DATA state, which per
                // the node's slot protocol means the key cell is initialized —
                // assumes that state never regresses to uninitialized.
                let existing_key = unsafe { current_node.read_key(slot_idx) };
                if existing_key == &key {
                    // Same key: publish the new value by swapping the data
                    // slot for a one-entry child holding the new value.
                    let child = build_single_entry_child(&key, value.clone());
                    if current_node.cas_data_to_child_stale(slot_idx, child, guard) {
                        if let Some((parent, pidx, expected)) = descent_snapshot {
                            if parent.load_child(pidx, guard) != expected {
                                // The key already existed, so a retry should
                                // still report `Updated`; `retry_was_new`
                                // intentionally stays false here.
                                continue 'retry;
                            }
                        }
                        return if retry_was_new {
                            InsertResult::Inserted
                        } else {
                            InsertResult::Updated
                        };
                    }
                    continue;
                }
                // Different key collided on this slot: push both entries down
                // into a fresh conflict child with a finer model.
                let ek = existing_key.clone();
                // SAFETY: same SLOT_DATA observation as above implies the
                // value cell is initialized.
                let ev = unsafe { current_node.read_value(slot_idx) }.clone();
                let child = build_conflict_node(ek, ev, key.clone(), value.clone(), config);
                if current_node.cas_data_to_child_stale(slot_idx, child, guard) {
                    // The displaced key now lives (and is counted) in the
                    // child, so decrement this node's key count.
                    current_node.dec_keys();
                    if let Some((parent, pidx, expected)) = descent_snapshot {
                        if parent.load_child(pidx, guard) != expected {
                            retry_was_new = true;
                            continue 'retry;
                        }
                    }
                    if config.auto_rebuild && depth > config.rebuild_depth_threshold {
                        if let Some((parent, idx)) = rebuild_candidate {
                            crate::rebuild::try_rebuild_subtree(parent, idx, config, guard);
                        }
                    }
                    return InsertResult::Inserted;
                }
                continue;
            }
            if is_child(state) {
                let child_shared = current_node.load_child(slot_idx, guard);
                if child_shared.tag() != 0 {
                    // Tagged pointer — presumably marks a child mid-rebuild or
                    // retirement; back off and re-read.
                    backoff.snooze();
                    continue;
                }
                if child_shared.is_null() {
                    // State says child but the pointer isn't visible yet.
                    continue;
                }
                depth += 1;
                if rebuild_candidate.is_none() {
                    rebuild_candidate = Some((current_node, slot_idx));
                }
                if descent_snapshot.is_none() {
                    descent_snapshot = Some((current_node, slot_idx, child_shared));
                }
                // SAFETY: non-null, untagged child loaded under `guard`; the
                // epoch guard keeps the node from being reclaimed while we
                // hold a reference into it.
                current_node = unsafe { child_shared.deref() };
                continue;
            }
            // Unknown/transient slot state; back off and re-read.
            backoff.snooze();
        }
    } }
/// Returns a reference to the value for `key`, inserting `value` first if the
/// key is absent. Unlike [`insert`], an existing entry is never overwritten.
///
/// The returned `InsertResult` distinguishes "we created the entry"
/// (`Inserted`) from "the entry already existed" (`Updated`). Descent and
/// retry logic mirror [`insert`]; the guard lifetime `'g` ties the returned
/// `&V` to the caller's epoch pin.
#[allow(clippy::too_many_lines, clippy::needless_pass_by_value)]
pub fn get_or_insert<'g, K: Key, V: Clone + Send + Sync>(
    node: &'g Node<K, V>,
    key: K,
    value: &V,
    config: &Config,
    guard: &'g Guard,
) -> (&'g V, InsertResult) {
    // True once a retry follows a physically-successful insert, so a later
    // pass that finds the key present still reports `Inserted`.
    let mut retry_was_new = false;
    'retry: loop {
        let backoff = Backoff::new();
        let mut current_node: &'g Node<K, V> = node;
        let mut depth: usize = 0;
        // Shallowest traversed (parent, slot) — target for localized rebuild.
        let mut rebuild_candidate: Option<(&'g Node<K, V>, usize)> = None;
        // First traversed parent link + observed child pointer; re-validated
        // after each successful write to catch concurrent subtree swaps.
        #[allow(clippy::type_complexity)]
        let mut descent_snapshot: Option<(&'g Node<K, V>, usize, Shared<'g, Node<K, V>>)> = None;
        loop {
            let slot_idx = current_node.predict_slot(&key);
            let state = current_node.slot_state(slot_idx);
            if state == SLOT_WRITING {
                // Slot is mid-write by another thread; wait and re-read.
                backoff.snooze();
                continue;
            }
            if state == crate::node::SLOT_EMPTY {
                if current_node.cas_empty_to_data(slot_idx, key.clone(), value.clone()) {
                    current_node.inc_keys();
                    if let Some((parent, pidx, expected)) = descent_snapshot {
                        if parent.load_child(pidx, guard) != expected {
                            retry_was_new = true;
                            continue 'retry;
                        }
                    }
                    if config.auto_rebuild && depth > config.rebuild_depth_threshold {
                        if let Some((parent, idx)) = rebuild_candidate {
                            crate::rebuild::try_rebuild_subtree(parent, idx, config, guard);
                        }
                    }
                    // SAFETY: we just stored the value into this slot via the
                    // successful CAS. NOTE(review): assumes no concurrent
                    // transition invalidates the value cell before this read.
                    let val = unsafe { current_node.read_value(slot_idx) };
                    return (val, InsertResult::Inserted);
                }
                continue;
            }
            if state == SLOT_TOMBSTONE {
                // Deleted slot: link a one-entry child in its place.
                let child = build_single_entry_child(&key, value.clone());
                if current_node.cas_tombstone_to_child_stale(slot_idx, child, guard) {
                    current_node.dec_tombstones();
                    if let Some((parent, pidx, expected)) = descent_snapshot {
                        if parent.load_child(pidx, guard) != expected {
                            retry_was_new = true;
                            continue 'retry;
                        }
                    }
                    if config.auto_rebuild && depth > config.rebuild_depth_threshold {
                        if let Some((parent, idx)) = rebuild_candidate {
                            crate::rebuild::try_rebuild_subtree(parent, idx, config, guard);
                        }
                    }
                    // Re-read the just-linked child to hand back a reference
                    // to the stored value.
                    let child_shared = current_node.load_child(slot_idx, guard);
                    if !child_shared.is_null() {
                        // SAFETY: non-null child loaded under `guard`.
                        let child_node = unsafe { child_shared.deref() };
                        if let Some(val) = crate::lookup::get(child_node, &key, guard) {
                            return (val, InsertResult::Inserted);
                        }
                    }
                    // NOTE(review): reaching here means our insert succeeded
                    // but the value could not be located again (e.g. a
                    // concurrent rebuild moved it). We fall through to
                    // `continue` WITHOUT setting `retry_was_new`, so a later
                    // pass that finds the key reports `Updated` — confirm
                    // that is the intended result in this race.
                }
                continue;
            }
            if state == SLOT_DATA {
                // SAFETY: SLOT_DATA state implies the key cell is initialized
                // (per the node's slot protocol).
                let existing_key = unsafe { current_node.read_key(slot_idx) };
                if existing_key == &key {
                    // Key already present: return the existing value without
                    // overwriting (get-or-insert semantics).
                    // SAFETY: same SLOT_DATA observation for the value cell.
                    let existing_value = unsafe { current_node.read_value(slot_idx) };
                    let result = if retry_was_new {
                        InsertResult::Inserted
                    } else {
                        InsertResult::Updated
                    };
                    return (existing_value, result);
                }
                // Collision with a different key: push both entries into a
                // fresh conflict child with a finer model.
                let ek = existing_key.clone();
                // SAFETY: SLOT_DATA observation implies value is initialized.
                let ev = unsafe { current_node.read_value(slot_idx) }.clone();
                let child = build_conflict_node(ek, ev, key.clone(), value.clone(), config);
                if current_node.cas_data_to_child_stale(slot_idx, child, guard) {
                    // Displaced key is now counted in the child.
                    current_node.dec_keys();
                    if let Some((parent, pidx, expected)) = descent_snapshot {
                        if parent.load_child(pidx, guard) != expected {
                            retry_was_new = true;
                            continue 'retry;
                        }
                    }
                    if config.auto_rebuild && depth > config.rebuild_depth_threshold {
                        if let Some((parent, idx)) = rebuild_candidate {
                            crate::rebuild::try_rebuild_subtree(parent, idx, config, guard);
                        }
                    }
                    // Find our value inside the freshly-linked conflict child.
                    let child_shared = current_node.load_child(slot_idx, guard);
                    if !child_shared.is_null() {
                        // SAFETY: non-null child loaded under `guard`.
                        let child_node = unsafe { child_shared.deref() };
                        if let Some(val) = crate::lookup::get(child_node, &key, guard) {
                            return (val, InsertResult::Inserted);
                        }
                    }
                    // NOTE(review): as in the tombstone path, a failed re-read
                    // after a successful CAS retries without `retry_was_new`.
                }
                continue;
            }
            if is_child(state) {
                let child_shared = current_node.load_child(slot_idx, guard);
                if child_shared.tag() != 0 {
                    // Tagged child pointer — transient (rebuild in progress?);
                    // back off and re-read.
                    backoff.snooze();
                    continue;
                }
                if child_shared.is_null() {
                    continue;
                }
                depth += 1;
                if rebuild_candidate.is_none() {
                    rebuild_candidate = Some((current_node, slot_idx));
                }
                if descent_snapshot.is_none() {
                    descent_snapshot = Some((current_node, slot_idx, child_shared));
                }
                // SAFETY: non-null, untagged child pointer loaded under the
                // caller's epoch guard, so the node stays live.
                current_node = unsafe { child_shared.deref() };
                continue;
            }
            // Unknown/transient state; back off and re-read.
            backoff.snooze();
        }
    } }
/// Builds a fresh two-slot child node that holds exactly one entry.
///
/// The linear model uses slope 1.0 and intercept `-key.to_model_input()`, so
/// `key` itself predicts into the low end of the new node.
fn build_single_entry_child<K: Key, V: Clone + Send + Sync>(key: &K, value: V) -> Node<K, V> {
    let intercept = -key.to_model_input();
    let child = Node::with_capacity(LinearModel::new(1.0, intercept), 2);
    let idx = child.predict_slot(key);
    child.store_data(idx, key.clone(), value);
    child.inc_keys();
    child
}
/// Builds a four-slot child node containing two entries that collided in the
/// parent's model.
///
/// The pair is ordered first; the new node's model is fit over `[lo, hi]`.
/// When both keys collapse to the same `f64` model input, exact integer
/// ordinals are used to split them, and if even those agree, a dedicated
/// split-key node separates the entries.
fn build_conflict_node<K: Key, V: Clone + Send + Sync>(
    k1: K,
    v1: V,
    k2: K,
    v2: V,
    config: &Config,
) -> Node<K, V> {
    // Order the pair so `lo_k <= hi_k`.
    let (lo_k, lo_v, hi_k, hi_v) = if k1 < k2 {
        (k1, v1, k2, v2)
    } else {
        (k2, v2, k1, v1)
    };
    let array_size = 4;
    let lo_f = lo_k.to_model_input();
    let span = hi_k.to_model_input() - lo_f;
    let node = if span.abs() < f64::EPSILON {
        // Model inputs are indistinguishable in f64; fall back to exact
        // integer ordinals to tell the keys apart.
        let lo_ord = lo_k.to_exact_ordinal();
        let hi_ord = hi_k.to_exact_ordinal();
        if lo_ord == hi_ord {
            // Even the ordinals agree: use a split-key node keyed on `lo_k`.
            Node::with_split_key(lo_k.clone(), array_size)
        } else {
            Node::with_capacity(
                LinearModel::binary_split(lo_ord + (hi_ord - lo_ord) / 2),
                array_size,
            )
        }
    } else {
        // Linear fit mapping lo -> slot 0 and hi -> the last slot.
        let slope = (array_size - 1) as f64 / span;
        Node::with_capacity(LinearModel::new(slope, -slope * lo_f), array_size)
    };
    let lo_slot = node.predict_slot(&lo_k);
    let hi_slot = node.predict_slot(&hi_k);
    node.store_data(lo_slot, lo_k, lo_v);
    node.inc_keys();
    if lo_slot == hi_slot {
        // The keys still collide inside the fresh node; go through the normal
        // insert path so a deeper child gets created. `node` has not been
        // published to other threads yet, hence the unprotected guard.
        unsafe {
            let guard = epoch::unprotected();
            insert(&node, hi_k, &hi_v, config, guard);
        }
    } else {
        node.store_data(hi_slot, hi_k, hi_v);
        node.inc_keys();
    }
    node
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Config;
    use crossbeam_epoch as epoch;

    /// Pins the current thread and returns an epoch guard.
    fn guard() -> epoch::Guard {
        epoch::pin()
    }

    /// Default configuration shared by most tests.
    fn cfg() -> Config {
        Config::default()
    }

    /// A 100-slot root whose model (slope 0.1) spreads keys 0..1000.
    fn empty_root() -> Node<u64, u64> {
        let model = LinearModel::new(0.1, 0.0);
        Node::with_capacity(model, 100)
    }

    #[test]
    fn insert_into_empty_slot() {
        let pin = guard();
        let root = empty_root();
        assert_eq!(insert(&root, 50, &500, &cfg(), &pin), InsertResult::Inserted);
        assert_eq!(root.total_keys(&pin), 1);
    }

    #[test]
    fn insert_duplicate_returns_updated() {
        let pin = guard();
        let root = empty_root();
        insert(&root, 50, &500, &cfg(), &pin);
        assert_eq!(insert(&root, 50, &5000, &cfg(), &pin), InsertResult::Updated);
        assert_eq!(root.total_keys(&pin), 1);
    }

    #[test]
    fn insert_conflict_creates_child() {
        let pin = guard();
        let seed: Vec<(u64, &str)> = vec![(10, "a"), (20, "b")];
        let root = crate::build::bulk_load(&seed, &Config::default()).unwrap();
        let before = root.total_keys(&pin);
        insert(&root, 15, &"c", &cfg(), &pin);
        assert_eq!(root.total_keys(&pin), before + 1);
        assert_eq!(crate::lookup::get(&root, &10, &pin), Some(&"a"));
        assert_eq!(crate::lookup::get(&root, &15, &pin), Some(&"c"));
        assert_eq!(crate::lookup::get(&root, &20, &pin), Some(&"b"));
    }

    #[test]
    fn insert_many_sequential() {
        let pin = guard();
        let conf = cfg();
        let root = empty_root();
        for i in 0..100u64 {
            insert(&root, i, &i, &conf, &pin);
        }
        assert_eq!(root.total_keys(&pin), 100);
        for i in 0..100u64 {
            assert_eq!(
                crate::lookup::get(&root, &i, &pin),
                Some(&i),
                "key {i} not found after sequential insert"
            );
        }
    }

    #[test]
    fn insert_reverse_order() {
        let pin = guard();
        let conf = cfg();
        let root = empty_root();
        for i in (0..50u64).rev() {
            insert(&root, i, &(i * 10), &conf, &pin);
        }
        assert_eq!(root.total_keys(&pin), 50);
        for i in 0..50u64 {
            assert_eq!(crate::lookup::get(&root, &i, &pin), Some(&(i * 10)));
        }
    }

    #[test]
    fn insert_update_preserves_count() {
        let pin = guard();
        let conf = cfg();
        let root = empty_root();
        for (k, v) in [(1u64, 10u64), (2, 20), (3, 30)] {
            insert(&root, k, &v, &conf, &pin);
        }
        assert_eq!(root.total_keys(&pin), 3);
        insert(&root, 2, &200, &conf, &pin);
        assert_eq!(root.total_keys(&pin), 3);
        assert_eq!(crate::lookup::get(&root, &2, &pin), Some(&200));
    }

    #[test]
    fn insert_into_bulk_loaded_tree() {
        let pin = guard();
        let conf = cfg();
        // Bulk-load even keys, then insert the odd keys in between.
        let evens: Vec<(u64, u64)> = (0..100).map(|i| (i * 2, i)).collect();
        let root = crate::build::bulk_load(&evens, &Config::default()).unwrap();
        for i in 0..100u64 {
            insert(&root, i * 2 + 1, &(i + 1000), &conf, &pin);
        }
        assert_eq!(root.total_keys(&pin), 200);
        for i in 0..200u64 {
            assert!(
                crate::lookup::get(&root, &i, &pin).is_some(),
                "key {i} not found after mixed bulk_load + insert"
            );
        }
    }

    #[test]
    fn conflict_node_is_small() {
        let pin = guard();
        let child = build_conflict_node(10u64, "a", 20u64, "b", &cfg());
        assert_eq!(child.capacity(), 4);
        assert_eq!(child.total_keys(&pin), 2);
    }

    #[test]
    fn conflict_node_both_findable() {
        let pin = guard();
        let child = build_conflict_node(100u64, 1, 200u64, 2, &cfg());
        assert_eq!(crate::lookup::get(&child, &100, &pin), Some(&1));
        assert_eq!(crate::lookup::get(&child, &200, &pin), Some(&2));
    }

    #[test]
    fn insert_update_returns_correct_result() {
        let pin = guard();
        let conf = cfg();
        let root = empty_root();
        assert_eq!(insert(&root, 1, &10, &conf, &pin), InsertResult::Inserted);
        assert_eq!(insert(&root, 1, &20, &conf, &pin), InsertResult::Updated);
        assert_eq!(insert(&root, 2, &30, &conf, &pin), InsertResult::Inserted);
    }

    #[test]
    fn insert_value_is_updated() {
        let pin = guard();
        let conf = cfg();
        let root = empty_root();
        insert(&root, 42, &100, &conf, &pin);
        assert_eq!(crate::lookup::get(&root, &42, &pin), Some(&100));
        insert(&root, 42, &999, &conf, &pin);
        assert_eq!(crate::lookup::get(&root, &42, &pin), Some(&999));
    }

    #[test]
    fn conflict_node_same_f64_keys() {
        let pin = guard();
        // Two u64 keys large enough to collapse to the same f64 model input.
        let base: u64 = 1_700_000_000_000_000_000;
        #[allow(clippy::float_cmp)]
        {
            assert_eq!(base as f64, (base + 1) as f64, "precondition: same f64");
        }
        let child = build_conflict_node(base, "a", base + 1, "b", &cfg());
        assert_eq!(child.total_keys(&pin), 2);
        assert_eq!(crate::lookup::get(&child, &base, &pin), Some(&"a"));
        assert_eq!(crate::lookup::get(&child, &(base + 1), &pin), Some(&"b"));
    }

    #[test]
    fn conflict_node_many_same_f64_keys() {
        let pin = guard();
        let conf = cfg();
        let base: u64 = 1_700_000_000_000_000_000;
        let root = empty_root();
        for i in 0..20u64 {
            insert(&root, base + i, &(base + i), &conf, &pin);
        }
        assert_eq!(root.total_keys(&pin), 20);
        for i in 0..20u64 {
            assert_eq!(
                crate::lookup::get(&root, &(base + i), &pin),
                Some(&(base + i)),
                "key base+{i} not found"
            );
        }
    }

    #[test]
    fn get_or_insert_empty_slot() {
        let pin = guard();
        let root = empty_root();
        let (out, res) = get_or_insert(&root, 50, &500, &cfg(), &pin);
        assert_eq!(res, InsertResult::Inserted);
        assert_eq!(*out, 500);
        assert_eq!(root.total_keys(&pin), 1);
    }

    #[test]
    fn get_or_insert_existing_no_update() {
        let pin = guard();
        let conf = cfg();
        let root = empty_root();
        insert(&root, 50, &500, &conf, &pin);
        let (out, res) = get_or_insert(&root, 50, &999, &conf, &pin);
        assert_eq!(res, InsertResult::Updated);
        assert_eq!(*out, 500);
        assert_eq!(root.total_keys(&pin), 1);
    }

    #[test]
    fn get_or_insert_conflict_creates_child() {
        let pin = guard();
        let conf = cfg();
        let seed: Vec<(u64, &str)> = vec![(10, "a"), (20, "b")];
        let root = crate::build::bulk_load(&seed, &Config::default()).unwrap();
        let before = root.total_keys(&pin);
        let (out, res) = get_or_insert(&root, 15, &"c", &conf, &pin);
        assert_eq!(res, InsertResult::Inserted);
        assert_eq!(*out, "c");
        assert_eq!(root.total_keys(&pin), before + 1);
        assert_eq!(crate::lookup::get(&root, &10, &pin), Some(&"a"));
        assert_eq!(crate::lookup::get(&root, &15, &pin), Some(&"c"));
        assert_eq!(crate::lookup::get(&root, &20, &pin), Some(&"b"));
    }

    #[test]
    fn get_or_insert_idempotent() {
        let pin = guard();
        let conf = cfg();
        let root = empty_root();
        let (first, r1) = get_or_insert(&root, 42, &100, &conf, &pin);
        let (second, r2) = get_or_insert(&root, 42, &999, &conf, &pin);
        assert_eq!(r1, InsertResult::Inserted);
        assert_eq!(r2, InsertResult::Updated);
        assert_eq!(*first, 100);
        assert_eq!(*second, 100);
        assert_eq!(root.total_keys(&pin), 1);
    }

    #[test]
    fn get_or_insert_many_sequential() {
        let pin = guard();
        let conf = cfg();
        let root = empty_root();
        for i in 0..100u64 {
            let (out, res) = get_or_insert(&root, i, &i, &conf, &pin);
            assert_eq!(res, InsertResult::Inserted);
            assert_eq!(*out, i);
        }
        assert_eq!(root.total_keys(&pin), 100);
        for i in 0..100u64 {
            let (out, res) = get_or_insert(&root, i, &(i + 1000), &conf, &pin);
            assert_eq!(res, InsertResult::Updated);
            assert_eq!(*out, i);
        }
    }

    #[test]
    fn insert_triggers_localized_rebuild() {
        let pin = guard();
        let conf = Config::new().rebuild_depth_threshold(4);
        let root = empty_root();
        for i in 0..50u64 {
            insert(&root, i, &(i * 10), &conf, &pin);
        }
        let depth = root.max_depth(&pin);
        assert!(
            depth <= 10,
            "depth {depth} too high with rebuild_depth_threshold=4"
        );
        for i in 0..50u64 {
            assert_eq!(
                crate::lookup::get(&root, &i, &pin),
                Some(&(i * 10)),
                "key {i} missing after localized rebuild"
            );
        }
    }
}