use crate::value::UserValue;
use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicU32, Ordering};
// A u32 index is split into (segment, slot): the high bits select a segment,
// the low SEGMENT_SHIFT bits select a slot within it.
const SEGMENT_SHIFT: u32 = 16;
// Number of value slots per segment (65536).
const SEGMENT_SIZE: usize = 1 << SEGMENT_SHIFT;
// Mask extracting the within-segment slot from an index.
#[expect(
    clippy::cast_possible_truncation,
    reason = "SEGMENT_SIZE = 65536, fits in u32"
)]
const SEGMENT_MASK: u32 = SEGMENT_SIZE as u32 - 1;
// Segment count needed to cover the entire u32 index space (65536).
const MAX_SEGMENTS: usize = 1 << (32 - SEGMENT_SHIFT);
/// Append-only, concurrently appendable store of `UserValue`s addressed by
/// dense `u32` indices.
///
/// Values live in lazily allocated, fixed-size segments, so entries never
/// move once written (no reallocation/copying as the store grows).
pub struct ValueStore {
    /// One pointer per possible segment; null until a segment is allocated
    /// on first use. Published via compare-exchange in `ensure_segment`.
    segments: Box<[AtomicPtr<UserValue>]>,
    /// Next index to hand out; equivalently, the count of initialized entries.
    next_idx: AtomicU32,
}
impl ValueStore {
    /// Creates an empty store. All segment pointers start null; segment
    /// memory is allocated lazily by `append` on first use.
    pub fn new() -> Self {
        let mut segments = Vec::with_capacity(MAX_SEGMENTS);
        for _ in 0..MAX_SEGMENTS {
            segments.push(AtomicPtr::new(ptr::null_mut()));
        }
        Self {
            segments: segments.into_boxed_slice(),
            next_idx: AtomicU32::new(0),
        }
    }

    /// Appends a clone of `value` and returns its index.
    ///
    /// Safe to call concurrently: each call reserves a unique index from the
    /// atomic counter, so no two threads ever write the same slot.
    ///
    /// # Panics
    ///
    /// Panics once `u32::MAX` entries have been appended.
    #[expect(
        clippy::indexing_slicing,
        reason = "seg_idx < MAX_SEGMENTS enforced by u32 index range"
    )]
    pub fn append(&self, value: &UserValue) -> u32 {
        #[expect(
            clippy::expect_used,
            reason = "a memtable with 4 billion entries would exhaust memory long before this"
        )]
        // Reserve a unique index. checked_add turns counter exhaustion into
        // an Err (-> panic) instead of silently wrapping back to index 0.
        let idx = self
            .next_idx
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                current.checked_add(1)
            })
            .expect("ValueStore::append: exceeded u32::MAX entries");
        let seg_idx = (idx >> SEGMENT_SHIFT) as usize;
        let slot = (idx & SEGMENT_MASK) as usize;
        // Makes segments[seg_idx] non-null before we write into it.
        self.ensure_segment(seg_idx);
        // SAFETY: ensure_segment published a SEGMENT_SIZE-slot allocation
        // (the Acquire load here pairs with the AcqRel CAS there), and `idx`
        // was handed to this thread alone, so `slot` is untouched by any
        // other thread. ptr::write does not drop the previous contents,
        // which is required because the slot's zeroed bytes were never a
        // live UserValue.
        unsafe {
            let seg_ptr = self.segments[seg_idx].load(Ordering::Acquire);
            debug_assert!(!seg_ptr.is_null());
            ptr::write(seg_ptr.add(slot), value.clone());
        }
        idx
    }

    /// Returns a clone of the value stored at `idx`.
    ///
    /// # Safety
    ///
    /// `idx` must have been returned by [`Self::append`] on this same store,
    /// and the completion of that `append` call must happen-before this call
    /// (e.g. the index was received through a synchronizing channel or the
    /// appending thread was joined). Otherwise the slot may be unwritten or
    /// mid-write.
    #[expect(
        clippy::indexing_slicing,
        reason = "seg_idx < MAX_SEGMENTS enforced by u32 index range"
    )]
    pub unsafe fn get(&self, idx: u32) -> UserValue {
        let seg_idx = (idx >> SEGMENT_SHIFT) as usize;
        let slot = (idx & SEGMENT_MASK) as usize;
        // SAFETY: per the caller contract above, the segment is allocated and
        // the slot holds a fully initialized UserValue. We only clone, never
        // move out, so the slot stays valid for later readers and for Drop.
        unsafe {
            let seg_ptr = self.segments[seg_idx].load(Ordering::Acquire);
            debug_assert!(!seg_ptr.is_null());
            (*seg_ptr.add(slot)).clone()
        }
    }

    /// Ensures the segment at `seg_idx` is allocated. Several threads may
    /// race to allocate the same segment; exactly one wins the CAS and the
    /// losers free their unused allocation.
    #[expect(
        clippy::indexing_slicing,
        reason = "seg_idx < MAX_SEGMENTS enforced by caller"
    )]
    fn ensure_segment(&self, seg_idx: usize) {
        if self.segments[seg_idx].load(Ordering::Acquire).is_null() {
            #[expect(
                clippy::expect_used,
                reason = "Layout::array with compile-time-known size cannot fail"
            )]
            let layout =
                std::alloc::Layout::array::<UserValue>(SEGMENT_SIZE).expect("segment layout");
            #[expect(
                clippy::cast_ptr_alignment,
                reason = "Layout::array ensures correct alignment"
            )]
            // The zeroed bytes are never read as a UserValue: append writes a
            // slot via ptr::write before its index is published, and get's
            // safety contract restricts reads to published indices.
            let raw = unsafe { std::alloc::alloc_zeroed(layout) }.cast::<UserValue>();
            if raw.is_null() {
                std::alloc::handle_alloc_error(layout);
            }
            // Publish only if the pointer is still null. AcqRel (on success)
            // makes the fresh allocation visible to the Acquire loads in
            // append/get that observe this pointer.
            if self.segments[seg_idx]
                .compare_exchange(ptr::null_mut(), raw, Ordering::AcqRel, Ordering::Acquire)
                .is_err()
            {
                // Lost the race: another thread installed its segment first.
                // Our allocation holds no values, so a plain dealloc with no
                // drops is correct.
                // SAFETY: `raw` came from alloc_zeroed with this exact layout
                // and was never published to any other thread.
                unsafe {
                    std::alloc::dealloc(raw.cast::<u8>(), layout);
                }
            }
        }
    }
}
impl Default for ValueStore {
fn default() -> Self {
Self::new()
}
}
impl Drop for ValueStore {
    #[expect(
        clippy::cast_possible_truncation,
        reason = "seg_idx < MAX_SEGMENTS (65536), fits in u32"
    )]
    fn drop(&mut self) {
        // `&mut self` rules out in-flight appends, so a Relaxed load sees
        // the final entry count.
        let total = self.next_idx.load(Ordering::Relaxed);
        if total == 0 {
            // Nothing was appended, so ensure_segment was never called and
            // there is nothing to free.
            return;
        }
        // One past the index of the last segment holding at least one entry.
        // Segments at or beyond this were never allocated (ensure_segment is
        // only reached for reserved indices).
        let max_seg_idx = ((total - 1) >> SEGMENT_SHIFT) as usize + 1;
        for seg_idx in 0..max_seg_idx {
            #[expect(
                clippy::indexing_slicing,
                reason = "seg_idx < max_seg_idx <= MAX_SEGMENTS"
            )]
            let seg_ptr = self.segments[seg_idx].load(Ordering::Relaxed);
            if seg_ptr.is_null() {
                continue;
            }
            // First index covered by this segment.
            let seg_start = (seg_idx as u32) << SEGMENT_SHIFT;
            // saturating_add: for the topmost segment of the u32 index space
            // seg_start + SEGMENT_SIZE would overflow u32; saturating to
            // u32::MAX is harmless because total <= u32::MAX, so min(total)
            // still yields the correct end.
            let seg_end = seg_start.saturating_add(SEGMENT_SIZE as u32).min(total);
            if seg_start < total {
                // Number of slots in this segment that append initialized.
                let count = (seg_end - seg_start) as usize;
                for i in 0..count {
                    // SAFETY: slots [0, count) were each written exactly once
                    // by append and never moved out (get only clones), so
                    // each holds a live UserValue to drop exactly once.
                    unsafe {
                        ptr::drop_in_place(seg_ptr.add(i));
                    }
                }
            }
            #[expect(
                clippy::expect_used,
                reason = "Layout::array with compile-time-known size cannot fail"
            )]
            let layout =
                std::alloc::Layout::array::<UserValue>(SEGMENT_SIZE).expect("segment layout");
            // SAFETY: seg_ptr was produced by alloc_zeroed with this exact
            // layout in ensure_segment.
            unsafe {
                std::alloc::dealloc(seg_ptr.cast::<u8>(), layout);
            }
        }
    }
}
#[cfg(test)]
#[allow(
    clippy::expect_used,
    clippy::unwrap_used,
    clippy::cast_possible_truncation,
    reason = "tests use expect/unwrap and narrow casts for brevity"
)]
mod tests {
    use super::*;

    /// Builds a `UserValue` from a byte slice.
    fn val(s: &[u8]) -> UserValue {
        UserValue::from(s)
    }

    #[test]
    fn append_and_get() {
        let store = ValueStore::new();
        let i0 = store.append(&val(b"hello"));
        let i1 = store.append(&val(b"world"));
        assert_eq!(&*unsafe { store.get(i0) }, b"hello");
        assert_eq!(&*unsafe { store.get(i1) }, b"world");
    }

    #[test]
    fn empty_value() {
        let store = ValueStore::new();
        let i = store.append(&val(b""));
        assert!(unsafe { store.get(i) }.is_empty());
    }

    /// Appends SEGMENT_SIZE + 1 values so the last one lands in segment 1,
    /// exercising the lazy second-segment allocation path.
    #[test]
    fn crosses_segment_boundary() {
        let store = ValueStore::new();
        for i in 0..=SEGMENT_SIZE {
            store.append(&val(format!("v{i}").as_bytes()));
        }
        let last_idx = u32::try_from(SEGMENT_SIZE).unwrap();
        assert_eq!(
            &*unsafe { store.get(last_idx) },
            format!("v{SEGMENT_SIZE}").as_bytes()
        );
    }

    #[test]
    fn concurrent_append_and_read() {
        use std::sync::Arc;
        let store = Arc::new(ValueStore::new());
        let n_threads = 8usize;
        let n_per_thread = 1000usize;
        // BUG FIX: spawn every thread before joining any of them. The
        // previous version chained `map(spawn).flat_map(join)` on a lazy
        // iterator, which spawned and immediately joined one thread at a
        // time — the test ran fully serially and never exercised contended
        // appends or the ensure_segment allocation race. Collecting the
        // handles into a Vec forces all spawns to happen up front.
        let handles: Vec<_> = (0..n_threads)
            .map(|t| {
                let store = Arc::clone(&store);
                std::thread::spawn(move || {
                    let mut indices = Vec::with_capacity(n_per_thread);
                    for i in 0..n_per_thread {
                        let v = format!("t{t}_v{i}");
                        indices.push((store.append(&val(v.as_bytes())), v));
                    }
                    indices
                })
            })
            .collect();
        // Joining the appender threads establishes the happens-before edge
        // that `get`'s safety contract requires.
        let all: Vec<(u32, String)> = handles
            .into_iter()
            .flat_map(|h| h.join().expect("thread ok"))
            .collect();
        for (idx, expected) in &all {
            assert_eq!(&*unsafe { store.get(*idx) }, expected.as_bytes());
        }
        assert_eq!(all.len(), n_threads * n_per_thread);
    }
}