use std::sync::atomic::Ordering as AtomicOrdering;
use seize::LocalGuard;
use std::ptr as StdPtr;
use crate::Permuter;
use crate::leaf15::LeafNode15;
use crate::{
Linker, MassTreeGeneric, TreeAllocator, key::Key, leaf_trait::TreeLeafNode,
nodeversion::LockGuard, policy::LeafPolicy,
};
use super::{FindSlotResult, InsertSearchResultGeneric};
/// Capacity of the deferred-retire scratch buffer: one slot per key in a
/// 15-wide leaf plus one extra for the empty-leaf reuse path.
const MAX_BATCH_RETIRES: usize = crate::leaf15::WIDTH_15 + 1;

/// Fixed-size scratch buffer that collects suffix-bag pointers whose
/// retirement must be deferred until after the leaf lock is released.
struct RetireBuf {
    // Raw pointers gathered so far; only the first `len` slots are live.
    entries: [*mut u8; MAX_BATCH_RETIRES],
    // Number of live entries in `entries`.
    len: usize,
}

impl RetireBuf {
    /// Creates an empty buffer with every slot nulled out.
    #[inline]
    const fn new() -> Self {
        Self {
            entries: [StdPtr::null_mut(); MAX_BATCH_RETIRES],
            len: 0,
        }
    }

    /// Records one pointer for deferred retirement.
    #[inline]
    fn push(&mut self, ptr: *mut u8) {
        debug_assert!(self.len < self.entries.len());
        self.entries[self.len] = ptr;
        self.len += 1;
    }

    /// Returns the live prefix of recorded pointers.
    #[inline]
    fn as_slice(&self) -> &[*mut u8] {
        &self.entries[..self.len]
    }
}
/// A pre-processed batch insertion entry: the raw key bytes, the value
/// already converted into the policy's output representation, and the
/// key's cached 8-byte integer prefix used for sorting and leaf lookup.
#[must_use]
#[expect(
    missing_debug_implementations,
    reason = "Debug on P::Output may not be available"
)]
pub struct BatchEntry<P: LeafPolicy> {
    /// Full key bytes; anything beyond the first 8 bytes is the suffix.
    pub key: Vec<u8>,
    /// Value in the policy's stored (output) representation.
    pub output: P::Output,
    // First 8 key bytes packed big-endian, zero-padded on the right;
    // cached so sorting and comparisons avoid re-reading `key`.
    ikey: u64,
}
impl<P: LeafPolicy> BatchEntry<P> {
    /// Builds an entry from a raw key and value, converting the value into
    /// the policy's stored output representation.
    #[inline]
    pub fn new(key: Vec<u8>, value: P::Value) -> Self {
        let ikey = Self::compute_ikey(&key);
        Self {
            key,
            output: P::into_output(value),
            ikey,
        }
    }

    /// Builds an entry from a key and an already-converted output value.
    #[inline(always)]
    pub fn from_output(key: Vec<u8>, output: P::Output) -> Self {
        let ikey = Self::compute_ikey(&key);
        Self { key, output, ikey }
    }

    /// Packs up to the first 8 key bytes, big-endian and zero-padded on the
    /// right, into the `u64` used for in-leaf ordering.
    #[inline(always)]
    fn compute_ikey(key: &[u8]) -> u64 {
        let mut packed = [0u8; 8];
        // `zip` stops at the shorter side, so short keys stay zero-padded.
        for (dst, src) in packed.iter_mut().zip(key) {
            *dst = *src;
        }
        u64::from_be_bytes(packed)
    }

    /// Returns the cached 8-byte integer key.
    #[inline(always)]
    pub const fn ikey(&self) -> u64 {
        self.ikey
    }

    /// True when the key extends beyond the 8 bytes covered by `ikey`.
    #[inline(always)]
    pub const fn has_suffix(&self) -> bool {
        self.key.len() > 8
    }
}
/// Internal batch entry for write-through policies: holds the plain value
/// (wrapped in an `Option` so it can be moved out exactly once) plus the
/// cached 8-byte integer key prefix.
#[must_use]
struct BatchValueEntry<P: LeafPolicy> {
    // Full key bytes.
    key: Vec<u8>,
    // Value to insert; becomes `None` once consumed by `take_value`.
    value: Option<P::Value>,
    // First 8 key bytes packed big-endian, zero-padded on the right.
    ikey: u64,
}
impl<P: LeafPolicy> BatchValueEntry<P> {
    /// Builds an entry, caching the 8-byte integer prefix of `key`.
    #[inline]
    fn new(key: Vec<u8>, value: P::Value) -> Self {
        let ikey: u64 = BatchEntry::<P>::compute_ikey(&key);
        Self {
            key,
            value: Some(value),
            ikey,
        }
    }
    /// Returns the cached 8-byte integer key.
    #[inline(always)]
    const fn ikey(&self) -> u64 {
        self.ikey
    }
    /// Borrows the not-yet-consumed value.
    ///
    /// Must not be called after `take_value`; the invariant is checked only
    /// in debug builds.
    #[inline(always)]
    fn value_ref(&self) -> &P::Value {
        debug_assert!(self.value.is_some(), "batch value already consumed");
        // SAFETY: the batch driver only borrows a value before it has been
        // taken (asserted above); `unwrap_unchecked` on `None` would be UB.
        unsafe { self.value.as_ref().unwrap_unchecked() }
    }
    /// Moves the value out, leaving `None`; callable at most once per entry.
    #[inline(always)]
    fn take_value(&mut self) -> P::Value {
        debug_assert!(self.value.is_some(), "batch value already consumed");
        // SAFETY: each entry's value is taken at most once during a batch
        // pass (asserted above); `unwrap_unchecked` on `None` would be UB.
        unsafe { self.value.take().unwrap_unchecked() }
    }
}
/// Aggregate outcome of a batch insertion: how many entries inserted a new
/// key, overwrote an existing key, or failed, plus the old values displaced
/// by each update (in processing order).
#[derive(Debug, Clone)]
#[must_use]
pub struct BatchInsertResult<O> {
    /// Entries that created a new key.
    pub inserted: usize,
    /// Entries that overwrote an existing key.
    pub updated: usize,
    /// Previous values returned by updates, in processing order.
    pub old_values: Vec<O>,
    /// Entries that could not be applied.
    pub failed: usize,
}

impl<O> Default for BatchInsertResult<O> {
    /// An all-zero result. Implemented manually so `O` needs no `Default`
    /// bound; `with_capacity(0)` performs no allocation.
    fn default() -> Self {
        Self::with_capacity(0)
    }
}

impl<O> BatchInsertResult<O> {
    /// Creates an empty result.
    #[inline]
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates an empty result with `old_values` pre-sized for roughly
    /// `capacity` expected updates.
    #[inline(always)]
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            inserted: 0,
            updated: 0,
            old_values: Vec::with_capacity(capacity),
            failed: 0,
        }
    }

    /// Counts one successful insertion of a new key.
    #[inline(always)]
    pub const fn record_insert(&mut self) {
        self.inserted += 1;
    }

    /// Counts one update, retaining the displaced value.
    #[inline]
    pub fn record_update(&mut self, old_value: O) {
        self.updated += 1;
        self.old_values.push(old_value);
    }

    /// Counts one entry that could not be applied.
    #[inline(always)]
    pub const fn record_failure(&mut self) {
        self.failed += 1;
    }

    /// Total number of entries accounted for.
    #[must_use]
    #[inline(always)]
    pub const fn total(&self) -> usize {
        self.inserted + self.updated + self.failed
    }

    /// True when no entry failed.
    #[must_use]
    #[inline(always)]
    pub const fn all_succeeded(&self) -> bool {
        self.failed == 0
    }
}
/// Outcome of applying a single batch entry to a locked leaf.
enum BatchEntryResult<O> {
    /// New key inserted; carries a suffix-bag pointer to retire after the
    /// leaf lock is released (null when nothing needs retiring).
    Inserted(*mut u8),
    /// Existing key overwritten; carries the displaced old value.
    Updated(O),
    /// The leaf has no usable slot; the entry must take the split-capable
    /// fallback path.
    NeedsSplit,
    /// The key collides into a deeper trie layer; handled by the fallback.
    NeedsLayerDescent,
    /// Transient state observed (e.g. a found slot with an empty value);
    /// retry the entry through the general insert path.
    Retry,
}
/// Abstraction over the two batch flavours — output-typed `GenericBatch`
/// and write-through `ValueBatch` — so the core batch loop can be shared.
trait BatchOps<P: LeafPolicy, A: TreeAllocator<P>> {
    /// Type returned to the caller when an update displaces an old value.
    type OldValue;
    /// Number of entries in the batch.
    fn len(&self) -> usize;
    /// Cached 8-byte integer key of entry `index`.
    fn entry_ikey(&self, index: usize) -> u64;
    /// Raw key bytes of entry `index`.
    fn entry_key_bytes(&self, index: usize) -> &[u8];
    /// Whether entry `index` qualifies for the empty-leaf fast path.
    fn can_reuse_empty_at(
        &self,
        index: usize,
        tree: &MassTreeGeneric<P, A>,
        leaf: &LeafNode15<P>,
    ) -> bool;
    /// Inserts entry `index` into an empty, locked leaf; returns a
    /// suffix-bag pointer to retire after unlock (null when none).
    fn insert_empty_leaf_at(
        &mut self,
        index: usize,
        tree: &MassTreeGeneric<P, A>,
        leaf: &LeafNode15<P>,
        lock: &mut LockGuard<'_>,
        guard: &LocalGuard<'_>,
    ) -> *mut u8;
    /// Attempts to insert or update entry `index` in the locked leaf,
    /// consuming `pre_allocated` as suffix storage when provided.
    #[expect(
        clippy::too_many_arguments,
        reason = "Batch entry insertion requires full context"
    )]
    fn try_insert_at(
        &mut self,
        index: usize,
        tree: &MassTreeGeneric<P, A>,
        leaf: &LeafNode15<P>,
        lock: &mut LockGuard<'_>,
        perm: &mut Permuter,
        guard: &LocalGuard<'_>,
        pre_allocated: Option<Vec<u8>>,
    ) -> BatchEntryResult<Self::OldValue>;
    /// Slow path: applies entry `index` through the general single-key
    /// concurrent insert and records the outcome in `result`.
    fn fallback_insert_at(
        &mut self,
        index: usize,
        tree: &MassTreeGeneric<P, A>,
        result: &mut BatchInsertResult<Self::OldValue>,
        guard: &LocalGuard<'_>,
    );
}
/// Batch adaptor over shared, already output-converted `BatchEntry` slices.
struct GenericBatch<'a, P: LeafPolicy>(&'a [BatchEntry<P>]);

impl<P: LeafPolicy, A: TreeAllocator<P>> BatchOps<P, A> for GenericBatch<'_, P> {
    type OldValue = P::Output;

    #[inline(always)]
    fn len(&self) -> usize {
        self.0.len()
    }

    #[inline(always)]
    fn entry_ikey(&self, index: usize) -> u64 {
        self.0[index].ikey()
    }

    #[inline(always)]
    fn entry_key_bytes(&self, index: usize) -> &[u8] {
        &self.0[index].key
    }

    #[inline]
    fn can_reuse_empty_at(
        &self,
        index: usize,
        tree: &MassTreeGeneric<P, A>,
        leaf: &LeafNode15<P>,
    ) -> bool {
        let key: Key<'_> = Key::new(&self.0[index].key);
        tree.can_reuse_empty_leaf(leaf, &key)
    }

    #[inline]
    fn insert_empty_leaf_at(
        &mut self,
        index: usize,
        tree: &MassTreeGeneric<P, A>,
        leaf: &LeafNode15<P>,
        lock: &mut LockGuard<'_>,
        guard: &LocalGuard<'_>,
    ) -> *mut u8 {
        let entry: &BatchEntry<P> = &self.0[index];
        let key: Key<'_> = Key::new(&entry.key);
        tree.insert_into_empty_leaf_batch(leaf, lock, &key, &entry.output, guard)
    }

    /// Core per-entry attempt; caller holds the leaf lock.
    #[inline]
    fn try_insert_at(
        &mut self,
        index: usize,
        tree: &MassTreeGeneric<P, A>,
        leaf: &LeafNode15<P>,
        lock: &mut LockGuard<'_>,
        perm: &mut Permuter,
        guard: &LocalGuard<'_>,
        pre_allocated: Option<Vec<u8>>,
    ) -> BatchEntryResult<P::Output> {
        let entry: &BatchEntry<P> = &self.0[index];
        let key: Key<'_> = Key::new(&entry.key);
        // Keys with no suffix (<= 8 bytes) take the cheaper single-layer
        // search; longer keys use the layer-aware search.
        let single_layer_mode: bool = !key.has_suffix();
        let search_result: InsertSearchResultGeneric = if single_layer_mode {
            tree.search_for_insert_single_layer(leaf, &key, perm)
        } else {
            tree.search_for_insert_generic(leaf, &key, perm)
        };
        match search_result {
            InsertSearchResultGeneric::Found { slot } => {
                // A found slot with an empty value is a transient state;
                // punt to the general insert path instead of updating.
                if leaf.is_value_empty(slot) {
                    return BatchEntryResult::Retry;
                }
                let old_value: P::Output =
                    tree.update_existing_value(leaf, lock, slot, &entry.output, guard);
                BatchEntryResult::Updated(old_value)
            }
            InsertSearchResultGeneric::NotFound { logical_pos } => {
                let ikey: u64 = key.ikey();
                match tree.find_usable_slot(leaf, perm, ikey) {
                    FindSlotResult::Found { slot, back_offset } => {
                        // Returns a suffix-bag pointer whose retirement is
                        // deferred until the leaf lock is dropped.
                        let deferred_retire: *mut u8 = tree.insert_new_value(
                            leaf,
                            lock,
                            slot,
                            back_offset,
                            logical_pos,
                            *perm,
                            &key,
                            &entry.output,
                            guard,
                            pre_allocated,
                        );
                        tree.count.increment();
                        // Re-read the permutation published by the insert so
                        // subsequent entries in this run see the new slot.
                        *perm = leaf.permutation();
                        BatchEntryResult::Inserted(deferred_retire)
                    }
                    FindSlotResult::NeedsSplit => BatchEntryResult::NeedsSplit,
                }
            }
            InsertSearchResultGeneric::Layer { .. }
            | InsertSearchResultGeneric::Conflict { .. } => {
                // In single-layer mode these results are treated as
                // transient; otherwise hand off to the fallback, which can
                // descend into sub-layers.
                if single_layer_mode {
                    BatchEntryResult::Retry
                } else {
                    BatchEntryResult::NeedsLayerDescent
                }
            }
        }
    }

    /// One-off fallback through the fully general concurrent insert; the
    /// output is cloned because the batch slice is shared.
    fn fallback_insert_at(
        &mut self,
        index: usize,
        tree: &MassTreeGeneric<P, A>,
        result: &mut BatchInsertResult<P::Output>,
        guard: &LocalGuard<'_>,
    ) {
        let entry: &BatchEntry<P> = &self.0[index];
        let mut key: Key<'_> = Key::new(&entry.key);
        match tree.insert_concurrent_generic(&mut key, entry.output.clone(), guard) {
            Ok(old) => {
                if let Some(old_value) = old {
                    result.record_update(old_value);
                } else {
                    result.record_insert();
                }
            }
            Err(e) => {
                panic!("Batch insert failed unexpectedly: {e:?}. This indicates a bug.");
            }
        }
    }
}
/// Batch adaptor over mutable value entries for write-through policies;
/// values are moved out of the entries as they are consumed.
struct ValueBatch<'a, P: LeafPolicy>(&'a mut [BatchValueEntry<P>]);

impl<P: LeafPolicy, A: TreeAllocator<P>> BatchOps<P, A> for ValueBatch<'_, P> {
    type OldValue = P::Value;

    #[inline(always)]
    fn len(&self) -> usize {
        self.0.len()
    }

    #[inline(always)]
    fn entry_ikey(&self, index: usize) -> u64 {
        self.0[index].ikey()
    }

    #[inline(always)]
    fn entry_key_bytes(&self, index: usize) -> &[u8] {
        &self.0[index].key
    }

    #[inline]
    fn can_reuse_empty_at(
        &self,
        index: usize,
        tree: &MassTreeGeneric<P, A>,
        leaf: &LeafNode15<P>,
    ) -> bool {
        let key: Key<'_> = Key::new(&self.0[index].key);
        tree.can_reuse_empty_leaf(leaf, &key)
    }

    #[inline]
    fn insert_empty_leaf_at(
        &mut self,
        index: usize,
        tree: &MassTreeGeneric<P, A>,
        leaf: &LeafNode15<P>,
        lock: &mut LockGuard<'_>,
        guard: &LocalGuard<'_>,
    ) -> *mut u8 {
        // Take the value first (mutable borrow), then borrow the key bytes.
        let value: P::Value = self.0[index].take_value();
        let key: Key<'_> = Key::new(&self.0[index].key);
        let output: P::Output = P::into_output(value);
        tree.insert_into_empty_leaf_batch(leaf, lock, &key, &output, guard)
    }

    /// Core per-entry attempt; caller holds the leaf lock. Updates only
    /// borrow the value (write-through); inserts consume it.
    #[inline]
    fn try_insert_at(
        &mut self,
        index: usize,
        tree: &MassTreeGeneric<P, A>,
        leaf: &LeafNode15<P>,
        lock: &mut LockGuard<'_>,
        perm: &mut Permuter,
        guard: &LocalGuard<'_>,
        pre_allocated: Option<Vec<u8>>,
    ) -> BatchEntryResult<P::Value> {
        let key: Key<'_> = Key::new(&self.0[index].key);
        // Keys with no suffix (<= 8 bytes) take the cheaper single-layer
        // search; longer keys use the layer-aware search.
        let single_layer_mode: bool = !key.has_suffix();
        let search_result: InsertSearchResultGeneric = if single_layer_mode {
            tree.search_for_insert_single_layer(leaf, &key, perm)
        } else {
            tree.search_for_insert_generic(leaf, &key, perm)
        };
        match search_result {
            InsertSearchResultGeneric::Found { slot } => {
                // A found slot with an empty value is a transient state;
                // punt to the general insert path instead of updating.
                if leaf.is_value_empty(slot) {
                    return BatchEntryResult::Retry;
                }
                let old_value: P::Value = tree.update_existing_value_write_through(
                    leaf,
                    lock,
                    slot,
                    self.0[index].value_ref(),
                );
                BatchEntryResult::Updated(old_value)
            }
            InsertSearchResultGeneric::NotFound { logical_pos } => {
                let ikey: u64 = key.ikey();
                match tree.find_usable_slot(leaf, perm, ikey) {
                    FindSlotResult::Found { slot, back_offset } => {
                        // Consume the value, then re-borrow the key bytes
                        // after the mutable `take_value` access.
                        let output: P::Output = P::into_output(self.0[index].take_value());
                        let insert_key: Key<'_> = Key::new(&self.0[index].key);
                        let deferred_retire: *mut u8 = tree.insert_new_value(
                            leaf,
                            lock,
                            slot,
                            back_offset,
                            logical_pos,
                            *perm,
                            &insert_key,
                            &output,
                            guard,
                            pre_allocated,
                        );
                        tree.count.increment();
                        // Re-read the permutation published by the insert so
                        // subsequent entries in this run see the new slot.
                        *perm = leaf.permutation();
                        BatchEntryResult::Inserted(deferred_retire)
                    }
                    FindSlotResult::NeedsSplit => BatchEntryResult::NeedsSplit,
                }
            }
            InsertSearchResultGeneric::Layer { .. }
            | InsertSearchResultGeneric::Conflict { .. } => {
                // In single-layer mode these results are treated as
                // transient; otherwise hand off to the fallback, which can
                // descend into sub-layers.
                if single_layer_mode {
                    BatchEntryResult::Retry
                } else {
                    BatchEntryResult::NeedsLayerDescent
                }
            }
        }
    }

    /// One-off fallback through the general value-typed concurrent insert;
    /// consumes the entry's value.
    fn fallback_insert_at(
        &mut self,
        index: usize,
        tree: &MassTreeGeneric<P, A>,
        result: &mut BatchInsertResult<P::Value>,
        guard: &LocalGuard<'_>,
    ) {
        let entry: &mut BatchValueEntry<P> = &mut self.0[index];
        let value: P::Value = entry.take_value();
        let mut key: Key<'_> = Key::new(&entry.key);
        match tree.insert_concurrent_value(&mut key, value, guard) {
            Ok(old) => {
                if let Some(old_value) = old {
                    result.record_update(old_value);
                } else {
                    result.record_insert();
                }
            }
            Err(e) => {
                panic!("Batch insert failed unexpectedly: {e:?}. This indicates a bug.");
            }
        }
    }
}
impl<P, A> MassTreeGeneric<P, A>
where
P: LeafPolicy,
A: TreeAllocator<P>,
{
/// Inserts a batch of `(key, value)` pairs, returning per-entry outcome
/// counts plus the previous values displaced by updates.
///
/// Acquires its own epoch guard; use `insert_batch_with_guard` to supply
/// one from the caller.
pub fn insert_batch<I>(&self, entries: I) -> BatchInsertResult<P::Value>
where
    I: IntoIterator<Item = (Vec<u8>, P::Value)>,
    P::Value: Clone,
{
    let guard = self.guard();
    // Write-through policies yield plain values directly; otherwise run
    // the output-typed path and clone values back out of the outputs.
    if !P::CAN_WRITE_THROUGH {
        let output_result = self.insert_batch_with_guard(entries, &guard);
        let old_values: Vec<P::Value> = output_result
            .old_values
            .iter()
            .map(P::clone_value_from_output)
            .collect();
        return BatchInsertResult {
            inserted: output_result.inserted,
            updated: output_result.updated,
            old_values,
            failed: output_result.failed,
        };
    }
    self.insert_batch_values_with_guard(entries, &guard)
}
/// Inserts a batch under a caller-supplied epoch guard, returning displaced
/// values in the policy's output representation.
pub fn insert_batch_with_guard<I>(
    &self,
    entries: I,
    guard: &LocalGuard<'_>,
) -> BatchInsertResult<P::Output>
where
    I: IntoIterator<Item = (Vec<u8>, P::Value)>,
{
    self.verify_guard(guard);
    if P::CAN_WRITE_THROUGH {
        // Delegate to the value-typed path and lift old values into outputs.
        let value_result = self.insert_batch_values_with_guard(entries, guard);
        let old_values: Vec<P::Output> = value_result
            .old_values
            .into_iter()
            .map(P::into_output)
            .collect();
        return BatchInsertResult {
            inserted: value_result.inserted,
            updated: value_result.updated,
            old_values,
            failed: value_result.failed,
        };
    }
    let mut batch: Vec<BatchEntry<P>> = entries
        .into_iter()
        .map(|(key, value)| BatchEntry::new(key, value))
        .collect();
    if batch.is_empty() {
        return BatchInsertResult::new();
    }
    // Sorting by ikey clusters entries destined for the same leaf.
    batch.sort_unstable_by_key(BatchEntry::ikey);
    self.process_sorted_batch(&batch, guard)
}
/// Builds, sorts, and applies a batch on the write-through (plain-value)
/// path. Guard verification is done by the public entry points.
fn insert_batch_values_with_guard<I>(
    &self,
    entries: I,
    guard: &LocalGuard<'_>,
) -> BatchInsertResult<P::Value>
where
    I: IntoIterator<Item = (Vec<u8>, P::Value)>,
{
    let mut batch: Vec<BatchValueEntry<P>> = entries
        .into_iter()
        .map(|(key, value)| BatchValueEntry::new(key, value))
        .collect();
    if batch.is_empty() {
        BatchInsertResult::new()
    } else {
        // Sorting by ikey clusters entries destined for the same leaf.
        batch.sort_unstable_by_key(BatchValueEntry::ikey);
        self.process_sorted_value_batch(&mut batch, guard)
    }
}
/// Sorts and applies a caller-constructed batch of `BatchEntry` values
/// under the supplied guard.
pub fn insert_batch_entries(
    &self,
    mut entries: Vec<BatchEntry<P>>,
    guard: &LocalGuard<'_>,
) -> BatchInsertResult<P::Output> {
    self.verify_guard(guard);
    if entries.is_empty() {
        BatchInsertResult::new()
    } else {
        entries.sort_unstable_by_key(BatchEntry::ikey);
        self.process_sorted_batch(&entries, guard)
    }
}
/// Runs the shared batch loop over output-typed (sorted) entries.
fn process_sorted_batch(
    &self,
    batch: &[BatchEntry<P>],
    guard: &LocalGuard<'_>,
) -> BatchInsertResult<P::Output> {
    self.process_sorted_batch_inner(&mut GenericBatch(batch), guard)
}
/// Runs the shared batch loop over write-through (sorted) value entries.
fn process_sorted_value_batch(
    &self,
    batch: &mut [BatchValueEntry<P>],
    guard: &LocalGuard<'_>,
) -> BatchInsertResult<P::Value> {
    self.process_sorted_batch_inner(&mut ValueBatch(batch), guard)
}
/// Shared batch driver: walks the sorted batch, locking one leaf at a time
/// and applying as many consecutive entries as fit before moving on.
fn process_sorted_batch_inner<B: BatchOps<P, A>>(
    &self,
    batch: &mut B,
    guard: &LocalGuard<'_>,
) -> BatchInsertResult<B::OldValue> {
    // Pre-size `old_values` assuming roughly a quarter of entries update.
    let mut result: BatchInsertResult<B::OldValue> =
        BatchInsertResult::with_capacity(batch.len() / 4);
    let mut index: usize = 0;
    while index < batch.len() {
        let key: Key<'_> = Key::new(batch.entry_key_bytes(index));
        let mut layer_root: *const u8 = self.root_ptr.load(AtomicOrdering::Acquire);
        // Optimistic-concurrency retry loop; breaks with the number of
        // entries applied to the locked leaf (0 => take the fallback path).
        let entries_processed: usize = 'retry: loop {
            layer_root = self.maybe_parent_generic(layer_root);
            let mut leaf_ptr: *mut LeafNode15<P> =
                self.reach_leaf_concurrent_generic(layer_root, &key, false, guard);
            // Walk the leaf chain toward the key's leaf; restart from a
            // freshly loaded root if too many hops were chased.
            let (advanced_ptr, exceeded_hop_limit) =
                self.advance_to_key_by_bound_generic(leaf_ptr, &key, guard);
            if exceeded_hop_limit {
                layer_root = self.root_ptr.load(AtomicOrdering::Acquire);
                continue 'retry;
            }
            leaf_ptr = advanced_ptr;
            // SAFETY: leaf_ptr was obtained from a traversal under `guard`;
            // presumably the epoch guard keeps the node alive for this
            // dereference — confirm against the traversal's contract.
            let leaf: &LeafNode15<P> = unsafe { &*leaf_ptr };
            // Snapshot version and permutation before locking so mutations
            // that raced the lock acquisition can be detected afterwards.
            let pre_lock_version: u32 = leaf.version().stable();
            let pre_lock_perm_raw: u64 = leaf.permutation_raw();
            let pre_lock_perm: Permuter = leaf.permutation();
            // Pre-allocate suffix storage outside the lock when the key has
            // a suffix, to shorten the critical section.
            let pre_allocated_vec: Option<Vec<u8>> = if key.has_suffix() {
                Self::maybe_pre_allocate_suffix(&key, pre_lock_perm.size())
            } else {
                None
            };
            let mut lock = leaf.version().lock_bounded();
            if !self.validate_post_lock(leaf, pre_lock_version, pre_lock_perm_raw) {
                drop(lock);
                continue 'retry;
            }
            // The leaf's whole layer was deleted; restart from the root.
            if leaf.deleted_layer() {
                drop(lock);
                layer_root = self.root_ptr.load(AtomicOrdering::Acquire);
                continue 'retry;
            }
            // The key may have migrated to a neighbour during a racing
            // split; re-resolve the leaf if membership no longer holds.
            if self.validate_membership(leaf, &key).is_err() {
                drop(lock);
                continue 'retry;
            }
            let mut retire_buf = RetireBuf::new();
            let processed: usize = self.insert_batch_into_locked_leaf_inner(
                leaf,
                &mut lock,
                batch,
                index,
                &mut result,
                &mut retire_buf,
                guard,
                pre_allocated_vec,
            );
            drop(lock);
            // Suffix bags are retired only after the leaf lock is released.
            for &ptr in retire_buf.as_slice() {
                // SAFETY: each pointer was returned by an insert on this
                // leaf for deferred retirement and is retired exactly once.
                unsafe {
                    LeafNode15::<P>::retire_suffix_bag_ptr(ptr, guard);
                }
            }
            if processed == 0 {
                break 'retry 0;
            }
            break 'retry processed;
        };
        if entries_processed == 0 {
            // Entry needs a split or layer descent: push it through the
            // general single-key insert, then resume with the next entry.
            batch.fallback_insert_at(index, self, &mut result, guard);
            index += 1;
        } else {
            index += entries_processed;
        }
    }
    result
}
/// Applies as many consecutive batch entries as possible to a single locked
/// leaf, starting at `start_index`; returns how many entries were applied.
#[expect(
    clippy::too_many_arguments,
    reason = "Batch insertion requires context"
)]
fn insert_batch_into_locked_leaf_inner<B: BatchOps<P, A>>(
    &self,
    leaf: &LeafNode15<P>,
    lock: &mut LockGuard<'_>,
    batch: &mut B,
    start_index: usize,
    result: &mut BatchInsertResult<B::OldValue>,
    retire_buf: &mut RetireBuf,
    guard: &LocalGuard<'_>,
    pre_allocated: Option<Vec<u8>>,
) -> usize {
    let mut processed: usize = 0;
    let mut perm: Permuter = leaf.permutation();
    // Suffix buffer pre-allocated for the first entry; consumed at most once
    // via `take()` below.
    let mut pre_allocated_vec: Option<Vec<u8>> = pre_allocated;
    // SAFETY: the leaf lock is held, which presumably makes the unguarded
    // read of the next pointer sound — confirm `next_raw_unguarded`'s
    // contract.
    let next_raw: *mut LeafNode15<P> = unsafe { leaf.next_raw_unguarded() };
    let next_ptr: *mut LeafNode15<P> = Linker::unmark_ptr(next_raw);
    // Entries whose ikey reaches the next leaf's lower bound belong to a
    // later leaf and terminate this run.
    let upper_bound: Option<u64> = if next_ptr.is_null() {
        None
    } else {
        // SAFETY: next_ptr is non-null and unmarked; presumably the guard /
        // lock keep the successor alive for this read — confirm.
        Some(unsafe { (*next_ptr).ikey_bound() })
    };
    // Fast path: a completely empty leaf can take the first entry directly.
    if leaf.is_empty()
        && start_index < batch.len()
        && batch.can_reuse_empty_at(start_index, self, leaf)
    {
        let deferred: *mut u8 =
            batch.insert_empty_leaf_at(start_index, self, leaf, lock, guard);
        if !deferred.is_null() {
            retire_buf.push(deferred);
        }
        result.record_insert();
        processed = 1;
        perm = leaf.permutation();
    }
    while start_index + processed < batch.len() {
        let entry_index: usize = start_index + processed;
        // Stop at the boundary with the next leaf.
        if let Some(bound) = upper_bound
            && batch.entry_ikey(entry_index) >= bound
        {
            break;
        }
        // Leaf full: remaining entries go through the split-capable path.
        if perm.size() >= LeafNode15::<P>::WIDTH {
            break;
        }
        let insert_result = batch.try_insert_at(
            entry_index,
            self,
            leaf,
            lock,
            &mut perm,
            guard,
            pre_allocated_vec.take(),
        );
        match insert_result {
            BatchEntryResult::Inserted(deferred) => {
                // Null means no suffix bag needs deferred retirement.
                if !deferred.is_null() {
                    retire_buf.push(deferred);
                }
                result.record_insert();
                processed += 1;
            }
            BatchEntryResult::Updated(old_value) => {
                result.record_update(old_value);
                processed += 1;
            }
            BatchEntryResult::NeedsSplit
            | BatchEntryResult::NeedsLayerDescent
            | BatchEntryResult::Retry => {
                // Anything that cannot be handled under this lock ends the
                // run; the caller falls back for the current entry.
                break;
            }
        }
    }
    processed
}
/// Inserts `key`/`value` into slot 0 of an empty, locked leaf and publishes
/// a one-element permutation; returns a suffix-bag pointer to retire after
/// the lock is dropped (null when none).
fn insert_into_empty_leaf_batch(
    &self,
    leaf: &LeafNode15<P>,
    lock: &mut LockGuard<'_>,
    key: &Key<'_>,
    value: &P::Output,
    guard: &LocalGuard<'_>,
) -> *mut u8 {
    leaf.clear_empty_state();
    // An empty leaf always receives its first entry in slot 0.
    let slot: usize = 0;
    let deferred_retire: *mut u8 =
        self.assign_slot_generic(leaf, lock, slot, key, value, guard, None);
    // Publish a sorted permutation containing exactly the one new slot.
    let new_perm: Permuter = <LeafNode15<P> as TreeLeafNode<P>>::Perm::make_sorted(1);
    leaf.set_permutation_relaxed(new_perm);
    self.count.increment();
    deferred_retire
}
}