use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
/// Maximum number of levels a skiplist node tower may have.
pub(crate) const MAX_HEIGHT: usize = 20;
/// Size of the fixed node header that precedes the tower slots.
/// Layout established by `init_node` and the accessors below:
///   bytes  0..8  : key_len  (high u32) | key_offset   (low u32), packed u64
///   bytes  8..16 : value_len(high u32) | value_offset (low u32), packed u64
///   byte   16    : flags (tombstone bit), accessed as AtomicU8
///   byte   17    : tower height
///   bytes 18..24 : reserved (zeroed by init_node)
///   bytes 24..32 : sequence number (u64)
/// NOTE(review): `init_node` writes flags/height via one packed u64, which
/// matches this byte layout only on little-endian targets — confirm.
pub(crate) const NODE_HEADER_SIZE: usize = 32;
/// Bit in the flags byte marking a logically deleted (tombstoned) entry.
pub(crate) const TOMBSTONE_BIT: u8 = 0x01;
/// Byte offset of the flags byte within the node header.
pub(crate) const FLAGS_OFFSET: usize = 16;
/// Byte offset of the sequence number within the node header.
pub(crate) const NODE_SEQ_OFFSET: usize = 24;
/// A tagged, nullable pointer to a node, stored as a raw `u64` so it can live
/// inside an `AtomicU64` tower slot. Only the low 48 bits are treated as the
/// address; the high 16 bits are preserved by `raw()` but stripped by `ptr()`.
#[repr(transparent)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) struct TowerPtr(u64);

impl TowerPtr {
    /// Mask selecting the 48 address bits of the packed word.
    const ADDR_MASK: u64 = 0x0000_FFFF_FFFF_FFFF;

    /// The null tower pointer (all bits zero).
    pub const NULL: Self = Self(0);

    /// Wraps a raw node address. The high bits of the word start out zero.
    #[inline]
    pub fn new(ptr: *const u8) -> Self {
        let addr = ptr as usize as u64;
        Self(addr)
    }

    /// Reinterprets a packed word (as loaded from a tower slot).
    #[inline]
    pub fn from_raw(raw: u64) -> Self {
        Self(raw)
    }

    /// The full packed word, including any high tag bits.
    #[inline]
    pub fn raw(self) -> u64 {
        self.0
    }

    /// The address portion as a pointer, with tag bits masked off.
    #[inline]
    pub fn ptr(self) -> *const u8 {
        let addr = self.0 & Self::ADDR_MASK;
        addr as usize as *const u8
    }

    /// True only for the all-zero word (a tagged null is not null).
    #[inline]
    pub fn is_null(self) -> bool {
        self.0 == Self::NULL.0
    }
}
/// Total allocation size in bytes for a node with the given tower height,
/// key length, and value length: header, then one 8-byte tower slot per
/// level, then the key bytes, then the value bytes.
#[inline]
pub(crate) fn node_alloc_size(height: usize, key_len: usize, value_len: usize) -> usize {
    let tower_bytes = height * 8;
    NODE_HEADER_SIZE + tower_bytes + key_len + value_len
}
/// Estimated node allocation size assuming a two-level tower
/// (16 == 2 * 8 bytes of tower slots).
/// NOTE(review): presumably 2 is the expected average tower height for arena
/// sizing estimates — confirm against the level generator before relying on it.
#[allow(dead_code)]
#[inline(always)]
pub(crate) fn node_alloc_size_expected(key_len: usize, value_len: usize) -> usize {
    let expected_tower_bytes = 16;
    NODE_HEADER_SIZE + expected_tower_bytes + key_len + value_len
}
#[inline(always)]
pub(crate) unsafe fn init_node(
ptr: *mut u8,
height: usize,
key: &[u8],
value: &[u8],
is_tombstone: bool,
seq: u64,
) {
let key_len = key.len() as u32;
let value_len = value.len() as u32;
let h = height as u8;
let key_offset = (NODE_HEADER_SIZE + height * 8) as u32;
let value_offset = key_offset + key_len;
let flags: u8 = if is_tombstone { TOMBSTONE_BIT } else { 0 };
let header0 = (key_len as u64) << 32 | key_offset as u64;
let header1 = (value_len as u64) << 32 | value_offset as u64;
let header2 = (h as u64) << 8 | flags as u64;
ptr.cast::<u64>().write(header0);
ptr.add(8).cast::<u64>().write(header1);
ptr.add(16).cast::<u64>().write(header2);
ptr.add(NODE_SEQ_OFFSET).cast::<u64>().write(seq);
std::ptr::write_bytes(ptr.add(NODE_HEADER_SIZE).cast::<u64>(), 0, height);
std::ptr::copy_nonoverlapping(key.as_ptr(), ptr.add(key_offset as usize), key.len());
std::ptr::copy_nonoverlapping(value.as_ptr(), ptr.add(value_offset as usize), value.len());
}
/// Returns the key bytes of the node at `ptr`, using the packed header word
/// at offset 0 (length in the high 32 bits, offset in the low 32).
///
/// # Safety
/// `ptr` must point to a node initialized by `init_node`, and the node's
/// allocation must outlive the returned slice — the `'static` lifetime is a
/// promise the caller makes, not one this function can enforce.
#[inline]
pub(crate) unsafe fn node_key(ptr: *const u8) -> &'static [u8] {
    let header = ptr.cast::<u64>().read();
    let offset = header as u32 as usize;
    let len = (header >> 32) as usize;
    std::slice::from_raw_parts(ptr.add(offset), len)
}
/// Returns the value bytes of the node at `ptr`, using the packed header word
/// at offset 8 (length in the high 32 bits, offset in the low 32).
///
/// # Safety
/// Same contract as `node_key`: `ptr` must be an `init_node`-initialized node
/// whose allocation outlives the returned slice.
#[inline]
pub(crate) unsafe fn node_value(ptr: *const u8) -> &'static [u8] {
    let header = ptr.add(8).cast::<u64>().read();
    let offset = header as u32 as usize;
    let len = (header >> 32) as usize;
    std::slice::from_raw_parts(ptr.add(offset), len)
}
/// Atomically reads the flags byte and reports whether the tombstone bit is set.
///
/// # Safety
/// `ptr` must point to an initialized node header.
#[inline]
pub(crate) unsafe fn is_tombstone(ptr: *const u8) -> bool {
    let flags = (*ptr.add(FLAGS_OFFSET).cast::<AtomicU8>()).load(Ordering::Acquire);
    flags & TOMBSTONE_BIT == TOMBSTONE_BIT
}
/// Atomically sets the tombstone bit on the node's flags byte.
///
/// Returns `true` if this call set the bit, `false` if it was already set
/// (so exactly one concurrent caller wins).
///
/// Simplified from a manual CAS loop to a single `fetch_or`: that is exactly
/// the set-if-clear semantics the loop implemented, in one RMW. `AcqRel` is at
/// least as strong as the loop's Release-on-success / Acquire-on-reload pair.
///
/// # Safety
/// `ptr` must point to an initialized node header.
#[inline]
pub(crate) unsafe fn set_tombstone(ptr: *const u8) -> bool {
    let atomic = &*ptr.add(FLAGS_OFFSET).cast::<AtomicU8>();
    atomic.fetch_or(TOMBSTONE_BIT, Ordering::AcqRel) & TOMBSTONE_BIT == 0
}
/// Reads the node's tower height from byte 17 of the header (the byte after
/// the flags byte).
///
/// # Safety
/// `ptr` must point to an initialized node header.
#[inline]
#[allow(dead_code)]
pub(crate) unsafe fn node_height(ptr: *const u8) -> usize {
    usize::from(ptr.add(17).read())
}
/// Atomically loads the node's sequence number.
///
/// # Safety
/// `ptr` must point to an initialized, 8-byte-aligned node header.
#[inline]
pub(crate) unsafe fn node_seq(ptr: *const u8) -> u64 {
    let slot = ptr.add(NODE_SEQ_OFFSET).cast::<AtomicU64>();
    (*slot).load(Ordering::Relaxed)
}
/// Atomically loads the tower pointer at `level` with Acquire ordering, so a
/// non-null result synchronizes with the Release that published the node.
///
/// # Safety
/// `node` must point to an initialized node whose height is greater than `level`.
#[inline]
pub(crate) unsafe fn tower_load(node: *const u8, level: usize) -> TowerPtr {
    let slot = node.add(NODE_HEADER_SIZE + level * 8).cast::<AtomicU64>();
    let raw = (*slot).load(Ordering::Acquire);
    TowerPtr::from_raw(raw)
}
/// Atomically stores a tower pointer at `level` with Relaxed ordering.
///
/// NOTE(review): Relaxed provides no publication guarantee; this looks safe
/// only for pre-publication writes (e.g. preparing a node before it becomes
/// reachable), with publication going through `tower_cas`'s Release — confirm
/// against the callers.
///
/// # Safety
/// `node` must point to an initialized node whose height is greater than `level`.
#[inline]
pub(crate) unsafe fn tower_store(node: *mut u8, level: usize, val: TowerPtr) {
    let slot = node.add(NODE_HEADER_SIZE + level * 8).cast::<AtomicU64>();
    (*slot).store(val.raw(), Ordering::Relaxed);
}
/// Atomically swings the tower pointer at `level` from `current` to `new`.
///
/// Returns `Ok(previous)` on success and `Err(actual)` when the slot did not
/// hold `current`. Release on success publishes the new node; Acquire on
/// failure synchronizes with the competing writer.
///
/// Uses the strong `compare_exchange` rather than `compare_exchange_weak`:
/// the weak form may fail spuriously even when the slot matches, which is only
/// acceptable inside a retry loop — but this helper is called one-shot (the
/// unit test asserts `Ok` directly, and was gated out of miri because of the
/// resulting flakiness). With the strong form, `Err` always means a genuine
/// value mismatch.
///
/// # Safety
/// `node` must point to an initialized node whose height is greater than `level`.
#[inline]
pub(crate) unsafe fn tower_cas(
    node: *const u8,
    level: usize,
    current: TowerPtr,
    new: TowerPtr,
) -> Result<TowerPtr, TowerPtr> {
    let offset = NODE_HEADER_SIZE + level * 8;
    let atomic = node.add(offset).cast::<AtomicU64>();
    (*atomic)
        .compare_exchange(current.raw(), new.raw(), Ordering::Release, Ordering::Acquire)
        .map(TowerPtr::from_raw)
        .map_err(TowerPtr::from_raw)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Allocates an 8-byte-aligned buffer sized for a node; caller must
    /// `dealloc` with the returned layout.
    fn alloc_node(height: usize, key: &[u8], value: &[u8]) -> (*mut u8, std::alloc::Layout) {
        let size = node_alloc_size(height, key.len(), value.len());
        let layout = std::alloc::Layout::from_size_align(size, 8).unwrap();
        let ptr = unsafe { std::alloc::alloc(layout) };
        assert!(!ptr.is_null());
        (ptr, layout)
    }

    #[test]
    fn test_node_alloc_size() {
        assert_eq!(
            node_alloc_size(4, 10, 20),
            NODE_HEADER_SIZE + 4 * 8 + 10 + 20
        );
    }

    #[test]
    fn test_init_and_read_roundtrip() {
        let key = b"hello";
        let value = b"world";
        let (ptr, layout) = alloc_node(3, key, value);
        unsafe {
            init_node(ptr, 3, key, value, false, 42);
            assert_eq!(node_key(ptr), key);
            assert_eq!(node_value(ptr), value);
            assert!(!is_tombstone(ptr));
            assert_eq!(node_seq(ptr), 42);
            // Every tower slot must start out null.
            for level in 0..3 {
                assert!(tower_load(ptr, level).is_null());
            }
            std::alloc::dealloc(ptr, layout);
        }
    }

    #[test]
    fn test_tombstone_flag() {
        let (ptr, layout) = alloc_node(1, b"key", b"val");
        unsafe {
            init_node(ptr, 1, b"key", b"val", false, 1);
            assert!(!is_tombstone(ptr));
            // First set wins; the second reports "already set".
            assert!(set_tombstone(ptr));
            assert!(is_tombstone(ptr));
            assert!(!set_tombstone(ptr));
            std::alloc::dealloc(ptr, layout);
        }
    }

    #[test]
    fn test_tower_cas() {
        let (ptr, layout) = alloc_node(4, b"", b"");
        unsafe {
            init_node(ptr, 4, b"", b"", false, 1);
            let null = TowerPtr::NULL;
            let node_ptr = TowerPtr::new(ptr);
            // tower_cas may be backed by a weak CAS, which is allowed to fail
            // spuriously even when the slot holds `null`. Retry until a
            // definitive outcome so the test is deterministic (the previous
            // single-shot `assert_eq!(.., Ok(null))` was flaky and had to be
            // gated out of miri).
            loop {
                match tower_cas(ptr, 0, null, node_ptr) {
                    Ok(prev) => {
                        assert_eq!(prev, null);
                        break;
                    }
                    // A spurious failure still reports the current value,
                    // which must be `null`; anything else is a real bug.
                    Err(actual) => assert_eq!(actual, null),
                }
            }
            assert_eq!(tower_load(ptr, 0), node_ptr);
            // The slot no longer holds `null`, so this CAS must fail.
            assert!(tower_cas(ptr, 0, null, node_ptr).is_err());
            std::alloc::dealloc(ptr, layout);
        }
    }

    #[test]
    fn test_tower_ptr_basic() {
        let dummy = Box::into_raw(Box::new(42u8));
        let p = TowerPtr::new(dummy);
        assert!(!p.is_null());
        assert_eq!(p.ptr(), dummy);
        assert!(TowerPtr::NULL.is_null());
        assert_eq!(TowerPtr::NULL.ptr(), std::ptr::null());
        unsafe { drop(Box::from_raw(dummy)) };
    }

    #[test]
    fn test_tower_store_atomic() {
        let (ptr, layout) = alloc_node(2, b"", b"");
        unsafe {
            init_node(ptr, 2, b"", b"", false, 1);
            let dummy = Box::into_raw(Box::new(99u8));
            let target = TowerPtr::new(dummy);
            tower_store(ptr, 0, target);
            assert_eq!(tower_load(ptr, 0), target);
            // The slot is readable as a raw AtomicU64 holding the packed word.
            let atomic = ptr.add(NODE_HEADER_SIZE).cast::<AtomicU64>();
            assert_eq!((*atomic).load(Ordering::Acquire), target.raw());
            drop(Box::from_raw(dummy));
            std::alloc::dealloc(ptr, layout);
        }
    }
}