use crossbeam_epoch::{self as epoch, Atomic, Guard, Owned, Shared};
use std::mem::MaybeUninit;
use std::ptr;
/// Builds the 48-slot child array used by `Node48`, with every slot null.
///
/// The standard library only implements `Default` for arrays of up to 32
/// elements, so the array is produced by mapping over a unit array instead.
/// Each slot is constructed individually in safe code; no `MaybeUninit` or
/// `transmute` is required.
fn init_atomic_array_48() -> [Atomic<ArtNode>; 48] {
    [(); 48].map(|()| Atomic::null())
}
/// Builds the 256-slot child array used by `Node256`, with every slot null.
///
/// The standard library only implements `Default` for arrays of up to 32
/// elements, so the array is produced by mapping over a unit array instead.
/// Each slot is constructed individually in safe code; no `MaybeUninit` or
/// `transmute` is required.
fn init_atomic_array_256() -> [Atomic<ArtNode>; 256] {
    [(); 256].map(|()| Atomic::null())
}
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
/// Number of compressed-prefix bytes stored inline in a [`NodeHeader`].
const MAX_PREFIX_LEN: usize = 10;

/// Bookkeeping shared by every inner node type.
#[repr(C)]
#[derive(Debug)]
pub struct NodeHeader {
    /// Number of child entries currently in use.
    pub num_children: AtomicUsize,
    /// Full length of the compressed prefix; may exceed [`MAX_PREFIX_LEN`],
    /// in which case only the leading bytes are kept in `prefix`.
    pub prefix_len: usize,
    /// The first `min(prefix_len, MAX_PREFIX_LEN)` prefix bytes, zero padded.
    pub prefix: [u8; MAX_PREFIX_LEN],
}

impl NodeHeader {
    /// Creates a header with no children and an empty prefix.
    fn new() -> Self {
        Self::with_prefix(&[])
    }

    /// Creates a header with no children that records `prefix`.
    ///
    /// At most [`MAX_PREFIX_LEN`] bytes are copied, but `prefix_len` keeps the
    /// untruncated length of `prefix`.
    fn with_prefix(prefix: &[u8]) -> Self {
        let stored = prefix.len().min(MAX_PREFIX_LEN);
        let mut bytes = [0u8; MAX_PREFIX_LEN];
        bytes[..stored].copy_from_slice(&prefix[..stored]);
        Self {
            num_children: AtomicUsize::new(0),
            prefix_len: prefix.len(),
            prefix: bytes,
        }
    }
}
/// Inner node with room for up to four children.
///
/// `keys[i]` is the key byte that routes to `children[i]`. The lookup code in
/// this file only consults the first `header.num_children` entries.
#[repr(C)]
pub struct Node4 {
/// Child count and compressed prefix for this node.
pub header: NodeHeader,
/// Key bytes; `add_child` in this file keeps the used entries in ascending order.
pub keys: [u8; 4],
/// Child pointers, parallel to `keys`.
pub children: [Atomic<ArtNode>; 4],
}
/// Inner node with room for up to sixteen children.
///
/// `keys[i]` is the key byte that routes to `children[i]`. `find_child` in this
/// file binary-searches the used keys, which relies on the ascending order
/// that `add_child` maintains.
#[repr(C)]
pub struct Node16 {
/// Child count and compressed prefix for this node.
pub header: NodeHeader,
/// Key bytes, kept in ascending order over the used entries.
pub keys: [u8; 16],
/// Child pointers, parallel to `keys`.
pub children: [Atomic<ArtNode>; 16],
}
/// Inner node with room for up to 48 children, addressed through a 256-entry
/// indirection table.
#[repr(C)]
pub struct Node48 {
/// Child count and compressed prefix for this node.
pub header: NodeHeader,
/// Indexed by key byte. `0` means "no child"; any other value is the
/// one-based position of the child inside `children`.
pub child_index: [u8; 256],
/// Child pointers, filled in insertion order.
pub children: [Atomic<ArtNode>; 48],
}
/// Inner node with one child slot per possible key byte.
#[repr(C)]
pub struct Node256 {
/// Child count and compressed prefix for this node.
pub header: NodeHeader,
/// Child pointers indexed directly by key byte; a null slot means "no child".
pub children: [Atomic<ArtNode>; 256],
}
/// Terminal node holding one complete key together with its payload.
#[derive(Debug)]
pub struct LeafNode {
    /// The full key, not just the suffix below the parent node.
    pub key: Vec<u8>,
    /// The stored value.
    // NOTE(review): `None` presumably marks a deletion tombstone; confirm with callers.
    pub value: Option<Vec<u8>>,
    /// Sequence number supplied by the caller at insertion time.
    pub seqno: u64,
}

impl LeafNode {
    /// Bundles `key`, `value` and `seqno` into a leaf without copying them.
    pub fn new(key: Vec<u8>, value: Option<Vec<u8>>, seqno: u64) -> Self {
        LeafNode { key, value, seqno }
    }
}
/// Any node of the tree: one of the four inner node sizes, or a leaf.
///
/// Every payload is boxed, so the enum itself stays pointer-sized plus a tag
/// regardless of which variant it holds.
pub enum ArtNode {
/// Inner node with up to 4 children.
Node4(Box<Node4>),
/// Inner node with up to 16 children.
Node16(Box<Node16>),
/// Inner node with up to 48 children.
Node48(Box<Node48>),
/// Inner node with up to 256 children.
Node256(Box<Node256>),
/// Terminal node carrying a key and its payload.
Leaf(Box<LeafNode>),
}
impl std::fmt::Debug for ArtNode {
    /// Writes the variant name for inner nodes and `Leaf(<key>)` for leaves.
    ///
    /// Children are intentionally not followed, so formatting never walks the tree.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let variant = match self {
            ArtNode::Leaf(leaf) => return write!(f, "Leaf({:?})", leaf.key),
            ArtNode::Node4(_) => "Node4",
            ArtNode::Node16(_) => "Node16",
            ArtNode::Node48(_) => "Node48",
            ArtNode::Node256(_) => "Node256",
        };
        f.write_str(variant)
    }
}
impl Node4 {
    /// Creates an empty node with no compressed prefix.
    pub fn new() -> Self {
        Self {
            header: NodeHeader::new(),
            keys: [0; 4],
            children: Default::default(),
        }
    }

    /// Creates an empty node whose header records `prefix`.
    pub fn with_prefix(prefix: &[u8]) -> Self {
        Self {
            header: NodeHeader::with_prefix(prefix),
            keys: [0; 4],
            children: Default::default(),
        }
    }

    /// Returns the first non-null child registered under `key`, if any.
    ///
    /// Only the first `num_children` entries are examined.
    pub fn find_child<'g>(&self, key: u8, guard: &'g Guard) -> Option<Shared<'g, ArtNode>> {
        let used = self.header.num_children.load(Ordering::Acquire);
        self.keys
            .iter()
            .take(used)
            .zip(&self.children)
            .filter(|(candidate, _)| **candidate == key)
            .map(|(_, slot)| slot.load(Ordering::Acquire, guard))
            .find(|child| !child.is_null())
    }

    /// Inserts `child` under `key`, keeping the used keys in ascending order.
    ///
    /// Returns `false` (dropping `child`) when the node already holds four
    /// children. The caller is expected to have checked that `key` is not
    /// present yet; duplicates are not detected here.
    pub fn add_child(&self, key: u8, child: Owned<ArtNode>, guard: &Guard) -> bool {
        let used = self.header.num_children.load(Ordering::Acquire);
        if used >= 4 {
            return false;
        }
        // First entry that sorts after `key`; everything from there moves up one.
        let pos = self
            .keys
            .iter()
            .take(used)
            .position(|&existing| existing > key)
            .unwrap_or(used);

        // NOTE(review): `keys` is a plain `[u8; 4]`, so writing to it through
        // `&self` is not sound Rust; the field would need to become atomic or
        // sit in an `UnsafeCell` (changing its public type). In addition, the
        // shift below is observable mid-way by concurrent readers and is not
        // safe against a second concurrent writer on the same node.
        let keys_ptr = self.keys.as_ptr() as *mut u8;
        for i in (pos..used).rev() {
            // SAFETY (bounds only): `i + 1 <= used <= 3`, inside `keys`.
            unsafe {
                *keys_ptr.add(i + 1) = *keys_ptr.add(i);
            }
            // `Atomic::store` takes `&self`, so the child array needs no raw pointers.
            self.children[i + 1].store(
                self.children[i].load(Ordering::Relaxed, guard),
                Ordering::Relaxed,
            );
        }
        // SAFETY (bounds only): `pos <= used <= 3`, inside `keys`.
        unsafe {
            *keys_ptr.add(pos) = key;
        }
        self.children[pos].store(child, Ordering::Release);

        // Publish the new entry to readers last.
        self.header.num_children.fetch_add(1, Ordering::Release);
        true
    }

    /// Reports whether all four child entries are in use.
    pub fn is_full(&self) -> bool {
        self.header.num_children.load(Ordering::Acquire) >= 4
    }
}

impl Default for Node4 {
    /// Equivalent to [`Node4::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl Node16 {
    /// Creates an empty node with no compressed prefix.
    pub fn new() -> Self {
        Self {
            header: NodeHeader::new(),
            keys: [0; 16],
            children: Default::default(),
        }
    }

    /// Creates an empty node whose header records `prefix`.
    pub fn with_prefix(prefix: &[u8]) -> Self {
        Self {
            header: NodeHeader::with_prefix(prefix),
            keys: [0; 16],
            children: Default::default(),
        }
    }

    /// Returns the non-null child registered under `key`, if any.
    ///
    /// Binary-searches the first `num_children` keys, relying on the ascending
    /// order maintained by [`Node16::add_child`].
    pub fn find_child<'g>(&self, key: u8, guard: &'g Guard) -> Option<Shared<'g, ArtNode>> {
        let used = self.header.num_children.load(Ordering::Acquire);
        let active = &self.keys[..used];
        // Lower bound: index of the first key that is not smaller than `key`.
        let at = active.partition_point(|&existing| existing < key);
        if active.get(at) != Some(&key) {
            return None;
        }
        let child = self.children[at].load(Ordering::Acquire, guard);
        if child.is_null() {
            None
        } else {
            Some(child)
        }
    }

    /// Inserts `child` under `key`, keeping the used keys in ascending order.
    ///
    /// Returns `false` (dropping `child`) when the node already holds sixteen
    /// children. The caller is expected to have checked that `key` is not
    /// present yet; duplicates are not detected here.
    pub fn add_child(&self, key: u8, child: Owned<ArtNode>, guard: &Guard) -> bool {
        let used = self.header.num_children.load(Ordering::Acquire);
        if used >= 16 {
            return false;
        }
        // First entry that sorts after `key`; everything from there moves up one.
        let pos = self
            .keys
            .iter()
            .take(used)
            .position(|&existing| existing > key)
            .unwrap_or(used);

        // NOTE(review): `keys` is a plain `[u8; 16]`, so writing to it through
        // `&self` is not sound Rust; the field would need to become atomic or
        // sit in an `UnsafeCell` (changing its public type). In addition, the
        // shift below is observable mid-way by concurrent readers and is not
        // safe against a second concurrent writer on the same node.
        let keys_ptr = self.keys.as_ptr() as *mut u8;
        for i in (pos..used).rev() {
            // SAFETY (bounds only): `i + 1 <= used <= 15`, inside `keys`.
            unsafe {
                *keys_ptr.add(i + 1) = *keys_ptr.add(i);
            }
            // `Atomic::store` takes `&self`, so the child array needs no raw pointers.
            self.children[i + 1].store(
                self.children[i].load(Ordering::Relaxed, guard),
                Ordering::Relaxed,
            );
        }
        // SAFETY (bounds only): `pos <= used <= 15`, inside `keys`.
        unsafe {
            *keys_ptr.add(pos) = key;
        }
        self.children[pos].store(child, Ordering::Release);

        // Publish the new entry to readers last.
        self.header.num_children.fetch_add(1, Ordering::Release);
        true
    }

    /// Reports whether all sixteen child entries are in use.
    pub fn is_full(&self) -> bool {
        self.header.num_children.load(Ordering::Acquire) >= 16
    }
}

impl Default for Node16 {
    /// Equivalent to [`Node16::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl Node48 {
    /// Creates an empty node with no compressed prefix.
    pub fn new() -> Self {
        Self {
            header: NodeHeader::new(),
            child_index: [0; 256],
            children: init_atomic_array_48(),
        }
    }

    /// Creates an empty node whose header records `prefix`.
    pub fn with_prefix(prefix: &[u8]) -> Self {
        Self {
            header: NodeHeader::with_prefix(prefix),
            child_index: [0; 256],
            children: init_atomic_array_48(),
        }
    }

    /// Returns the non-null child registered under `key`, if any.
    ///
    /// `child_index` stores one-based positions, so an entry of `0` means that
    /// no child has been registered for `key`.
    pub fn find_child<'g>(&self, key: u8, guard: &'g Guard) -> Option<Shared<'g, ArtNode>> {
        let position = usize::from(self.child_index[usize::from(key)]).checked_sub(1)?;
        let child = self.children[position].load(Ordering::Acquire, guard);
        if child.is_null() {
            None
        } else {
            Some(child)
        }
    }

    /// Appends `child` to the child array and points `key` at it.
    ///
    /// Returns `false` (dropping `child`) when all 48 positions are taken. The
    /// caller is expected to have checked that `key` is not present yet.
    pub fn add_child(&self, key: u8, child: Owned<ArtNode>, guard: &Guard) -> bool {
        // `guard` is unused here; it is kept so that all node types share one signature.
        let _ = guard;
        let used = self.header.num_children.load(Ordering::Acquire);
        if used >= 48 {
            return false;
        }

        // NOTE(review): `child_index` is a plain `[u8; 256]`, so writing to it
        // through `&self` is not sound Rust; the field would need to become
        // atomic or sit in an `UnsafeCell` (changing its public type). Two
        // concurrent writers can also pick the same position.
        let index_ptr = self.child_index.as_ptr() as *mut u8;
        // SAFETY (bounds only): `key` is a `u8`, so the offset is below 256.
        unsafe {
            // `used < 48`, so `used + 1` always fits in a `u8`.
            *index_ptr.add(usize::from(key)) = (used + 1) as u8;
        }
        // `Atomic::store` takes `&self`, so the child array needs no raw pointers.
        self.children[used].store(child, Ordering::Release);

        self.header.num_children.fetch_add(1, Ordering::Release);
        true
    }

    /// Reports whether all 48 child positions are in use.
    pub fn is_full(&self) -> bool {
        self.header.num_children.load(Ordering::Acquire) >= 48
    }
}

impl Default for Node48 {
    /// Equivalent to [`Node48::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl Node256 {
pub fn new() -> Self {
Self {
header: NodeHeader::new(),
children: init_atomic_array_256(),
}
}
pub fn with_prefix(prefix: &[u8]) -> Self {
Self {
header: NodeHeader::with_prefix(prefix),
children: init_atomic_array_256(),
}
}
pub fn find_child<'g>(&self, key: u8, guard: &'g Guard) -> Option<Shared<'g, ArtNode>> {
let child = self.children[key as usize].load(Ordering::Acquire, guard);
if !child.is_null() {
Some(child)
} else {
None
}
}
pub fn add_child(&self, key: u8, child: Owned<ArtNode>, guard: &Guard) -> bool {
let old = self.children[key as usize].swap(child, Ordering::AcqRel, guard);
if old.is_null() {
self.header.num_children.fetch_add(1, Ordering::Release);
}
true
}
pub fn is_full(&self) -> bool {
false }
}
impl Default for Node256 {
fn default() -> Self {
Self::new()
}
}
pub struct ConcurrentART {
root: Atomic<ArtNode>,
size: AtomicU64,
memory_usage: AtomicU64,
}
impl ConcurrentART {
pub fn new() -> Self {
Self {
root: Atomic::null(),
size: AtomicU64::new(0),
memory_usage: AtomicU64::new(0),
}
}
pub fn get(&self, key: &[u8]) -> Option<(Option<Vec<u8>>, u64)> {
let guard = &epoch::pin();
let mut node = self.root.load(Ordering::Acquire, guard);
let mut depth = 0;
while !node.is_null() {
let node_ref = unsafe { node.deref() };
match node_ref {
ArtNode::Leaf(leaf) => {
if leaf.key == key {
return Some((leaf.value.clone(), leaf.seqno));
}
return None;
}
ArtNode::Node4(n) => {
if !self.check_prefix(&n.header, key, depth) {
return None;
}
depth += n.header.prefix_len;
if depth >= key.len() {
return None;
}
match n.find_child(key[depth], guard) {
Some(child) => {
node = child;
depth += 1;
}
None => return None,
}
}
ArtNode::Node16(n) => {
if !self.check_prefix(&n.header, key, depth) {
return None;
}
depth += n.header.prefix_len;
if depth >= key.len() {
return None;
}
match n.find_child(key[depth], guard) {
Some(child) => {
node = child;
depth += 1;
}
None => return None,
}
}
ArtNode::Node48(n) => {
if !self.check_prefix(&n.header, key, depth) {
return None;
}
depth += n.header.prefix_len;
if depth >= key.len() {
return None;
}
match n.find_child(key[depth], guard) {
Some(child) => {
node = child;
depth += 1;
}
None => return None,
}
}
ArtNode::Node256(n) => {
if !self.check_prefix(&n.header, key, depth) {
return None;
}
depth += n.header.prefix_len;
if depth >= key.len() {
return None;
}
match n.find_child(key[depth], guard) {
Some(child) => {
node = child;
depth += 1;
}
None => return None,
}
}
}
}
None
}
fn check_prefix(&self, header: &NodeHeader, key: &[u8], depth: usize) -> bool {
let prefix_len = header.prefix_len.min(MAX_PREFIX_LEN);
if depth + prefix_len > key.len() {
return false;
}
for i in 0..prefix_len {
if header.prefix[i] != key[depth + i] {
return false;
}
}
true
}
pub fn insert(&self, key: Vec<u8>, value: Option<Vec<u8>>, seqno: u64) -> Option<(Option<Vec<u8>>, u64)> {
let guard = &epoch::pin();
loop {
let root = self.root.load(Ordering::Acquire, guard);
if root.is_null() {
let leaf = Box::new(LeafNode::new(key.clone(), value.clone(), seqno));
let leaf_node = Owned::new(ArtNode::Leaf(leaf));
match self.root.compare_exchange(
Shared::null(),
leaf_node,
Ordering::AcqRel,
Ordering::Relaxed,
guard,
) {
Ok(_) => {
self.size.fetch_add(1, Ordering::Relaxed);
self.memory_usage.fetch_add(
key.len() as u64 + std::mem::size_of::<LeafNode>() as u64,
Ordering::Relaxed,
);
return None;
}
Err(_e) => {
continue;
}
}
}
match self.insert_recursive(root, &key, value.clone(), seqno, 0, guard) {
InsertResult::Success(old) => return old,
InsertResult::Retry => continue,
}
}
}
fn insert_recursive<'g>(
&self,
node: Shared<'g, ArtNode>,
key: &[u8],
value: Option<Vec<u8>>,
seqno: u64,
depth: usize,
guard: &'g Guard,
) -> InsertResult {
if node.is_null() {
return InsertResult::Retry;
}
let node_ref = unsafe { node.deref() };
match node_ref {
ArtNode::Leaf(existing_leaf) => {
if existing_leaf.key == key {
return InsertResult::Success(Some((existing_leaf.value.clone(), existing_leaf.seqno)));
}
let existing_key = &existing_leaf.key;
let mut common_depth = depth;
while common_depth < key.len().min(existing_key.len())
&& key[common_depth] == existing_key[common_depth]
{
common_depth += 1;
}
let new_node = Node4::new();
let new_leaf = Box::new(LeafNode::new(key.to_vec(), value, seqno));
let new_leaf_node = Owned::new(ArtNode::Leaf(new_leaf));
if common_depth < key.len() {
let _ = new_node.add_child(key[common_depth], new_leaf_node, guard);
}
if common_depth < existing_key.len() {
let existing_leaf_clone = Box::new(LeafNode::new(
existing_key.clone(),
existing_leaf.value.clone(),
existing_leaf.seqno,
));
let existing_leaf_node = Owned::new(ArtNode::Leaf(existing_leaf_clone));
let _ = new_node.add_child(existing_key[common_depth], existing_leaf_node, guard);
}
self.size.fetch_add(1, Ordering::Relaxed);
InsertResult::Success(None)
}
ArtNode::Node4(n) => {
if depth < key.len() {
if let Some(child) = n.find_child(key[depth], guard) {
return self.insert_recursive(child, key, value, seqno, depth + 1, guard);
}
if !n.is_full() {
let leaf = Box::new(LeafNode::new(key.to_vec(), value, seqno));
let leaf_node = Owned::new(ArtNode::Leaf(leaf));
if n.add_child(key[depth], leaf_node, guard) {
self.size.fetch_add(1, Ordering::Relaxed);
return InsertResult::Success(None);
}
}
}
InsertResult::Retry
}
_ => InsertResult::Retry,
}
}
pub fn len(&self) -> u64 {
self.size.load(Ordering::Relaxed)
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn memory_usage(&self) -> u64 {
self.memory_usage.load(Ordering::Relaxed)
}
}
impl Default for ConcurrentART {
fn default() -> Self {
Self::new()
}
}
enum InsertResult {
Success(Option<(Option<Vec<u8>>, u64)>),
Retry,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_art_empty() {
let art = ConcurrentART::new();
assert!(art.is_empty());
assert_eq!(art.get(b"key"), None);
}
#[test]
fn test_art_insert_get() {
let art = ConcurrentART::new();
let old = art.insert(b"hello".to_vec(), Some(b"world".to_vec()), 1);
assert!(old.is_none());
assert_eq!(art.len(), 1);
let result = art.get(b"hello");
assert!(result.is_some());
let (value, seqno) = result.unwrap();
assert_eq!(value, Some(b"world".to_vec()));
assert_eq!(seqno, 1);
}
#[test]
fn test_art_multiple_keys() {
let art = ConcurrentART::new();
art.insert(b"key1".to_vec(), Some(b"value1".to_vec()), 1);
art.insert(b"key2".to_vec(), Some(b"value2".to_vec()), 2);
art.insert(b"key3".to_vec(), Some(b"value3".to_vec()), 3);
assert_eq!(art.len(), 3);
}
#[test]
fn test_node4_operations() {
let node = Node4::new();
let guard = &epoch::pin();
assert!(!node.is_full());
assert_eq!(node.find_child(b'a', guard), None);
}
}