use std::cmp::Ordering;
use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering as AtomicOrdering};
use std::sync::Arc;
/// Number of forward-pointer slots in every tower; also the height of the head sentinel.
const MAX_HEIGHT: usize = 16;
/// Branching factor of the level distribution: each extra level is kept with
/// probability roughly `1 / P`.
const P: u32 = 4;
/// Hot-tier capacity used by `StratifiedSkipList::new`.
const DEFAULT_HOT_CAPACITY: usize = 4096;
/// Forward-pointer array for one skip-list node: `next[level]` is the node's
/// successor at `level`, or null when there is none.
#[repr(C)]
struct Tower<K, V> {
    /// Number of levels this tower was created with. Every tower still stores
    /// `MAX_HEIGHT` slots; the head sentinel is created with `MAX_HEIGHT`.
    height: usize,
    /// Successor pointers, one per level.
    next: [AtomicPtr<SkipNode<K, V>>; MAX_HEIGHT],
}

impl<K, V> Tower<K, V> {
    /// Creates a tower whose every slot is null.
    ///
    /// `height` must not exceed `MAX_HEIGHT` (checked in debug builds).
    fn new(height: usize) -> Self {
        debug_assert!(
            height <= MAX_HEIGHT,
            "tower height {height} exceeds MAX_HEIGHT"
        );
        Self {
            height,
            // `from_fn` already nulls every slot; no second initialization pass is needed.
            next: std::array::from_fn(|_| AtomicPtr::new(ptr::null_mut())),
        }
    }

    /// Loads the successor at `level` (Acquire, pairing with `set`/`cas`).
    #[inline]
    fn get(&self, level: usize) -> *mut SkipNode<K, V> {
        self.next[level].load(AtomicOrdering::Acquire)
    }

    /// Unconditionally stores `node` as the successor at `level` (Release).
    #[inline]
    fn set(&self, level: usize, node: *mut SkipNode<K, V>) {
        self.next[level].store(node, AtomicOrdering::Release);
    }

    /// Replaces the successor at `level` with `new` only if it is still `expected`.
    ///
    /// Returns `Ok(previous)` on success or `Err(actual)` with the value observed
    /// when the exchange failed.
    #[inline]
    fn cas(
        &self,
        level: usize,
        expected: *mut SkipNode<K, V>,
        new: *mut SkipNode<K, V>,
    ) -> Result<*mut SkipNode<K, V>, *mut SkipNode<K, V>> {
        self.next[level].compare_exchange(
            expected,
            new,
            AtomicOrdering::AcqRel,
            AtomicOrdering::Acquire,
        )
    }
}
/// One skip-list entry: the key, an atomically swappable boxed value, a version
/// counter and the node's tower of forward pointers.
#[repr(C)]
struct SkipNode<K, V> {
    key: K,
    /// Heap-allocated value; the whole allocation is replaced by `update_value`.
    value: AtomicPtr<V>,
    /// Starts at 1 and is incremented by every `update_value`.
    version: AtomicU64,
    tower: Tower<K, V>,
}

impl<K, V> SkipNode<K, V> {
    /// Heap-allocates a node (boxing `value` separately) and leaks it as a raw
    /// pointer. The caller becomes responsible for freeing both the node and the
    /// pointer held in its `value` slot.
    fn new(key: K, value: V, height: usize) -> *mut Self {
        let boxed_value = Box::new(value);
        Box::into_raw(Box::new(Self {
            key,
            value: AtomicPtr::new(Box::into_raw(boxed_value)),
            version: AtomicU64::new(1),
            tower: Tower::new(height),
        }))
    }

    /// Borrows the value currently installed in this node.
    ///
    /// # Safety
    /// The returned reference borrows `self`, not the value allocation itself:
    /// the caller must ensure no `update_value` on this node (which frees the
    /// previous allocation) runs while the reference is in use.
    #[inline]
    unsafe fn get_value(&self) -> &V {
        let current = self.value.load(AtomicOrdering::Acquire);
        unsafe { &*current }
    }

    /// Installs `new_value`, bumps `version`, and returns the replaced value by move.
    ///
    /// # Safety
    /// The previous allocation is freed before returning, so no reference obtained
    /// from `get_value` on this node may still be alive on any thread.
    /// NOTE(review): `StratifiedSkipList::get` reads through `get_value` while
    /// `insert` may call this concurrently — needs deferred reclamation.
    #[inline]
    unsafe fn update_value(&self, new_value: V) -> V {
        let previous = self
            .value
            .swap(Box::into_raw(Box::new(new_value)), AtomicOrdering::AcqRel);
        self.version.fetch_add(1, AtomicOrdering::Release);
        let reclaimed = unsafe { Box::from_raw(previous) };
        *reclaimed
    }
}
/// Draws a tower height in `1..=MAX_HEIGHT` from a geometric distribution:
/// each additional level is added with probability `1 / P`.
///
/// Uses a per-thread xorshift64* generator seeded once from std's randomly keyed
/// `RandomState` hasher (mixed with the wall clock). Consecutive calls therefore
/// yield independent heights even when the clock has not advanced between them.
/// The previous version reseeded from a stack address and `SystemTime` on every
/// call, which repeats heights within one clock tick and costs a clock read per
/// insert.
fn random_height() -> usize {
    use std::cell::Cell;
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};

    thread_local! {
        static RNG_STATE: Cell<u64> = Cell::new({
            let mut hasher = RandomState::new().build_hasher();
            hasher.write_u64(
                std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .map(|d| d.as_nanos() as u64)
                    .unwrap_or(0),
            );
            // xorshift gets stuck at zero, so force the seed to be odd.
            hasher.finish() | 1
        });
    }

    /// Advances the xorshift64* state and returns a well-mixed 64-bit output.
    fn next_u64(state: &Cell<u64>) -> u64 {
        let mut x = state.get();
        x ^= x >> 12;
        x ^= x << 25;
        x ^= x >> 27;
        state.set(x);
        x.wrapping_mul(0x2545_F491_4F6C_DD1D)
    }

    RNG_STATE.with(|state| {
        let mut height = 1;
        // Use the high half of the output: xorshift64*'s low bits are its weakest.
        while height < MAX_HEIGHT && (next_u64(state) >> 32) % u64::from(P) == 0 {
            height += 1;
        }
        height
    })
}
/// Concurrent skip list, linked with atomic compare-and-swap, that serves as the
/// hot tier of a tiered index. There is no per-entry removal; nodes are freed
/// when the list is dropped.
pub struct StratifiedSkipList<K: Ord + Clone, V: Clone> {
    /// Sentinel node with a full-height tower; its key and value are placeholders.
    head: *mut SkipNode<K, V>,
    /// Highest tower height published so far; `get` scans only levels below it.
    max_height: AtomicUsize,
    /// Keys inserted since creation or since the last `drain`.
    len: AtomicUsize,
    /// `insert` attempts a promotion once `len` reaches this value.
    capacity: usize,
    /// Optional callback that receives drained batches.
    promoter: Option<Arc<dyn Fn(Vec<(K, V)>) + Send + Sync>>,
}
impl<K, V> StratifiedSkipList<K, V>
where
K: Ord + Clone + Default,
V: Clone + Default,
{
pub fn new() -> Self {
Self::with_capacity(DEFAULT_HOT_CAPACITY)
}
pub fn with_capacity(capacity: usize) -> Self {
let head = SkipNode::new(K::default(), V::default(), MAX_HEIGHT);
Self {
head,
max_height: AtomicUsize::new(1),
len: AtomicUsize::new(0),
capacity,
promoter: None,
}
}
pub fn set_promoter<F>(&mut self, promoter: F)
where
F: Fn(Vec<(K, V)>) + Send + Sync + 'static,
{
self.promoter = Some(Arc::new(promoter));
}
pub fn insert(&self, key: K, value: V) -> Option<V> {
if self.len() >= self.capacity {
self.try_promote();
}
let height = random_height();
let mut prev = [ptr::null_mut::<SkipNode<K, V>>(); MAX_HEIGHT];
let mut next = [ptr::null_mut::<SkipNode<K, V>>(); MAX_HEIGHT];
loop {
self.find_position(&key, &mut prev, &mut next);
if !next[0].is_null() {
let existing = unsafe { &*next[0] };
if existing.key == key {
let old = unsafe { existing.update_value(value.clone()) };
return Some(old);
}
}
let new_node = SkipNode::new(key.clone(), value.clone(), height);
unsafe {
(*new_node).tower.set(0, next[0]);
}
let prev_ptr = if prev[0].is_null() { self.head } else { prev[0] };
match unsafe { (*prev_ptr).tower.cas(0, next[0], new_node) } {
Ok(_) => {
for level in 1..height {
loop {
unsafe {
(*new_node).tower.set(level, next[level]);
}
let prev_at_level = if prev[level].is_null() { self.head } else { prev[level] };
if unsafe { (*prev_at_level).tower.cas(level, next[level], new_node) }.is_ok() {
break;
}
self.find_position(&key, &mut prev, &mut next);
}
}
loop {
let current_max = self.max_height.load(AtomicOrdering::Relaxed);
if height <= current_max {
break;
}
if self.max_height
.compare_exchange_weak(current_max, height, AtomicOrdering::Release, AtomicOrdering::Relaxed)
.is_ok()
{
break;
}
}
self.len.fetch_add(1, AtomicOrdering::Release);
return None;
}
Err(_) => {
unsafe {
let value_ptr = (*new_node).value.load(AtomicOrdering::Relaxed);
drop(Box::from_raw(value_ptr));
drop(Box::from_raw(new_node));
}
continue;
}
}
}
}
fn find_position(
&self,
key: &K,
prev: &mut [*mut SkipNode<K, V>; MAX_HEIGHT],
next: &mut [*mut SkipNode<K, V>; MAX_HEIGHT],
) {
let max_height = self.max_height.load(AtomicOrdering::Acquire);
let mut current = self.head;
for level in (0..max_height).rev() {
loop {
let next_node = unsafe { (*current).tower.get(level) };
if next_node.is_null() {
break;
}
let next_key = unsafe { &(*next_node).key };
match next_key.cmp(key) {
Ordering::Less => {
current = next_node;
}
Ordering::Equal | Ordering::Greater => {
break;
}
}
}
prev[level] = if current == self.head { ptr::null_mut() } else { current };
next[level] = unsafe { (*current).tower.get(level) };
}
}
pub fn get(&self, key: &K) -> Option<V> {
let max_height = self.max_height.load(AtomicOrdering::Acquire);
let mut current = self.head;
for level in (0..max_height).rev() {
loop {
let next_node = unsafe { (*current).tower.get(level) };
if next_node.is_null() {
break;
}
let next_key = unsafe { &(*next_node).key };
match next_key.cmp(key) {
Ordering::Less => {
current = next_node;
}
Ordering::Equal => {
let value = unsafe { (*next_node).get_value().clone() };
return Some(value);
}
Ordering::Greater => {
break;
}
}
}
}
None
}
#[inline]
pub fn len(&self) -> usize {
self.len.load(AtomicOrdering::Acquire)
}
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
fn try_promote(&self) {
if let Some(ref promoter) = self.promoter {
let entries = self.drain();
if !entries.is_empty() {
promoter(entries);
}
}
}
pub fn drain(&self) -> Vec<(K, V)> {
let mut entries = Vec::with_capacity(self.len());
let mut current = unsafe { (*self.head).tower.get(0) };
while !current.is_null() {
let node = unsafe { &*current };
let key = node.key.clone();
let value = unsafe { node.get_value().clone() };
entries.push((key, value));
current = node.tower.get(0);
}
self.len.store(0, AtomicOrdering::Release);
entries
}
pub fn iter(&self) -> impl Iterator<Item = (K, V)> + '_ {
SkipListIter {
current: unsafe { (*self.head).tower.get(0) },
_marker: std::marker::PhantomData,
}
}
}
impl<K, V> Default for StratifiedSkipList<K, V>
where
K: Ord + Clone + Default,
V: Clone + Default,
{
fn default() -> Self {
Self::new()
}
}
impl<K, V> Drop for StratifiedSkipList<K, V>
where
K: Ord + Clone,
V: Clone,
{
fn drop(&mut self) {
let mut current = unsafe { (*self.head).tower.get(0) };
while !current.is_null() {
let next = unsafe { (*current).tower.get(0) };
unsafe {
let value_ptr = (*current).value.load(AtomicOrdering::Relaxed);
drop(Box::from_raw(value_ptr));
drop(Box::from_raw(current));
}
current = next;
}
unsafe {
let value_ptr = (*self.head).value.load(AtomicOrdering::Relaxed);
drop(Box::from_raw(value_ptr));
drop(Box::from_raw(self.head));
}
}
}
// SAFETY: `StratifiedSkipList` reaches its nodes only through raw pointers, which
// are neither `Send` nor `Sync`, so both traits are asserted by hand. Keys and
// values are cloned out of shared nodes by any thread and dropped by whichever
// thread drops the list, hence the `Send + Sync` bounds on both.
// NOTE(review): soundness also assumes no thread frees a value another thread is
// still reading, which `SkipNode::update_value` does not currently guarantee.
unsafe impl<K, V> Send for StratifiedSkipList<K, V>
where
    K: Send + Sync + Ord + Clone,
    V: Send + Sync + Clone,
{
}
unsafe impl<K, V> Sync for StratifiedSkipList<K, V>
where
    K: Send + Sync + Ord + Clone,
    V: Send + Sync + Clone,
{
}
/// Forward iterator over a skip list's level-0 chain, yielding cloned pairs.
struct SkipListIter<'a, K, V> {
    /// Next node to yield; null once the chain is exhausted.
    current: *mut SkipNode<K, V>,
    /// Carries the lifetime of the list borrow the iterator was created from.
    _marker: std::marker::PhantomData<&'a ()>,
}

impl<'a, K: Clone, V: Clone> Iterator for SkipListIter<'a, K, V> {
    type Item = (K, V);

    fn next(&mut self) -> Option<Self::Item> {
        // SAFETY: non-null pointers in the level-0 chain are live nodes for as long
        // as the list is borrowed.
        let node = unsafe { self.current.as_ref() }?;
        let pair = (node.key.clone(), unsafe { node.get_value().clone() });
        self.current = node.tower.get(0);
        Some(pair)
    }
}
/// Running totals describing batches promoted out of the hot tier, as maintained
/// by `BatchPromoter::force_promote`.
#[derive(Clone, Debug, Default)]
pub struct PromotionStats {
    /// Number of non-empty batches promoted so far.
    pub promotion_count: u64,
    /// Total entries across all promoted batches.
    pub entries_promoted: u64,
    /// `entries_promoted / promotion_count`; `0.0` before the first promotion.
    pub avg_batch_size: f64,
}
/// Wraps a hot `StratifiedSkipList` and records statistics about manual promotions.
pub struct BatchPromoter<K: Ord + Clone + Default, V: Clone + Default> {
    /// Buffer that receives `insert_hot` writes.
    hot_buffer: StratifiedSkipList<K, V>,
    /// Only initialized in the code shown here; presumably reserved for queued
    /// batches. TODO confirm or remove.
    #[allow(dead_code)] // field is not read yet; see note above
    pending: std::sync::Mutex<Vec<Vec<(K, V)>>>,
    /// Running totals updated by `force_promote`.
    stats: std::sync::Mutex<PromotionStats>,
    /// Constructed as `None` here; presumably meant for a background worker thread.
    _background: Option<std::thread::JoinHandle<()>>,
}
impl<K, V> BatchPromoter<K, V>
where
    K: Ord + Clone + Default + Send + Sync + 'static,
    V: Clone + Default + Send + Sync + 'static,
{
    /// Creates a promoter with an empty hot buffer of `hot_capacity`, wrapped in an `Arc`.
    ///
    /// No promoter callback is registered on the buffer, so reaching `hot_capacity`
    /// does not drain anything automatically; call [`Self::force_promote`].
    pub fn new(hot_capacity: usize) -> Arc<Self> {
        Arc::new(Self {
            hot_buffer: StratifiedSkipList::with_capacity(hot_capacity),
            pending: std::sync::Mutex::new(Vec::new()),
            stats: std::sync::Mutex::new(PromotionStats::default()),
            _background: None,
        })
    }

    /// Inserts into the hot buffer, returning the replaced value if the key existed.
    pub fn insert_hot(&self, key: K, value: V) -> Option<V> {
        self.hot_buffer.insert(key, value)
    }

    /// Looks `key` up in the hot buffer only.
    pub fn get(&self, key: &K) -> Option<V> {
        self.hot_buffer.get(key)
    }

    /// Drains the hot buffer and returns the batch. Statistics are updated only
    /// when the batch is non-empty.
    pub fn force_promote(&self) -> Vec<(K, V)> {
        let batch = self.hot_buffer.drain();
        if batch.is_empty() {
            return batch;
        }
        {
            let mut totals = self.stats.lock().unwrap();
            totals.promotion_count += 1;
            totals.entries_promoted += batch.len() as u64;
            totals.avg_batch_size =
                totals.entries_promoted as f64 / totals.promotion_count as f64;
        }
        batch
    }

    /// Snapshot of the promotion statistics.
    pub fn stats(&self) -> PromotionStats {
        let totals = self.stats.lock().unwrap();
        totals.clone()
    }

    /// Current length counter of the hot buffer.
    pub fn hot_size(&self) -> usize {
        self.hot_buffer.len()
    }
}
/// Two-tier index: writes land in a hot `StratifiedSkipList` and are moved in bulk
/// to a `Cold` store once enough inserts have accumulated.
pub struct DeferredIndex<K: Ord + Clone + Default, V: Clone + Default, Cold> {
    /// Write buffer, consulted first on lookups.
    hot: StratifiedSkipList<K, V>,
    /// Backing store that receives promoted batches.
    cold: Cold,
    /// Number of counted inserts after which the next insert promotes first.
    promotion_threshold: usize,
    /// Inserts counted since the last successful promotion.
    insert_count: AtomicUsize,
}
impl<K, V, Cold> DeferredIndex<K, V, Cold>
where
    K: Ord + Clone + Default,
    V: Clone + Default,
    Cold: ColdStorage<K, V>,
{
    /// Creates an index with an empty hot tier sized to `promotion_threshold`.
    pub fn new(cold: Cold, promotion_threshold: usize) -> Self {
        let hot = StratifiedSkipList::with_capacity(promotion_threshold);
        Self {
            hot,
            cold,
            promotion_threshold,
            insert_count: AtomicUsize::new(0),
        }
    }

    /// Inserts into the hot tier. The previous value is reported from the hot tier only.
    ///
    /// Once `promotion_threshold` inserts have been counted, the hot tier is
    /// promoted before this insert is applied.
    /// NOTE(review): several threads can observe the threshold at once and each
    /// promote; this is harmless for idempotent cold stores, but it duplicates work.
    pub fn insert(&self, key: K, value: V) -> Option<V> {
        let counted = self.insert_count.fetch_add(1, AtomicOrdering::Relaxed);
        if counted >= self.promotion_threshold {
            self.promote();
        }
        self.hot.insert(key, value)
    }

    /// Looks in the hot tier first, falling back to cold storage.
    pub fn get(&self, key: &K) -> Option<V> {
        self.hot.get(key).or_else(|| self.cold.get(key))
    }

    /// Sends the hot tier's drained entries to cold storage. The insert counter is
    /// reset only when there was something to send.
    pub fn promote(&self) {
        let batch = self.hot.drain();
        if batch.is_empty() {
            return;
        }
        self.cold.insert_batch(batch);
        self.insert_count.store(0, AtomicOrdering::Release);
    }

    /// Current length counter of the hot tier.
    pub fn hot_size(&self) -> usize {
        self.hot.len()
    }
}
/// Backing store for entries promoted out of the hot tier.
///
/// Implementors must be shareable across threads (`Send + Sync`).
pub trait ColdStorage<K, V>: Send + Sync {
    /// Looks up `key`, returning an owned value if present.
    fn get(&self, key: &K) -> Option<V>;

    /// Receives a batch of promoted `(key, value)` pairs.
    fn insert_batch(&self, entries: Vec<(K, V)>);
}
/// `ColdStorage` implementation backed by a `HashMap` behind a `parking_lot` read-write lock.
pub struct HashMapCold<K, V> {
    data: parking_lot::RwLock<std::collections::HashMap<K, V>>,
}

impl<K, V> HashMapCold<K, V>
where
    K: Eq + std::hash::Hash + Clone,
{
    /// Creates an empty store.
    pub fn new() -> Self {
        let empty = std::collections::HashMap::new();
        Self {
            data: parking_lot::RwLock::new(empty),
        }
    }
}

impl<K, V> Default for HashMapCold<K, V>
where
    K: Eq + std::hash::Hash + Clone,
{
    /// Same as `HashMapCold::new`.
    fn default() -> Self {
        HashMapCold::new()
    }
}
impl<K, V> ColdStorage<K, V> for HashMapCold<K, V>
where
    K: Eq + std::hash::Hash + Clone + Send + Sync,
    V: Clone + Send + Sync,
{
    /// Clones the stored value under a shared read lock.
    fn get(&self, key: &K) -> Option<V> {
        let guard = self.data.read();
        guard.get(key).cloned()
    }

    /// Inserts the whole batch under one write lock. Later pairs overwrite earlier
    /// ones with the same key.
    fn insert_batch(&self, entries: Vec<(K, V)>) {
        self.data.write().extend(entries);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn test_skiplist_basic() {
        let list: StratifiedSkipList<i32, String> = StratifiedSkipList::new();
        let words = [(1, "one"), (2, "two"), (3, "three")];
        for (key, word) in words {
            assert!(list.insert(key, word.to_string()).is_none());
        }
        assert_eq!(list.len(), 3);
        for (key, word) in words {
            assert_eq!(list.get(&key), Some(word.to_string()));
        }
        assert_eq!(list.get(&4), None);
    }

    #[test]
    fn test_skiplist_update() {
        let list: StratifiedSkipList<i32, String> = StratifiedSkipList::new();
        let first = list.insert(1, "one".to_string());
        assert!(first.is_none());
        let replaced = list.insert(1, "ONE".to_string());
        assert_eq!(replaced, Some("one".to_string()));
        assert_eq!(list.len(), 1);
        assert_eq!(list.get(&1), Some("ONE".to_string()));
    }

    #[test]
    fn test_skiplist_concurrent() {
        const THREADS: i32 = 4;
        const PER_THREAD: i32 = 1000;
        let list = Arc::new(StratifiedSkipList::<i32, i32>::with_capacity(100_000));
        let workers: Vec<_> = (0..THREADS)
            .map(|t| {
                let shared = Arc::clone(&list);
                thread::spawn(move || {
                    for key in t * PER_THREAD..(t + 1) * PER_THREAD {
                        shared.insert(key, key * 2);
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }
        assert_eq!(list.len(), 4000);
        assert_eq!(list.get(&0), Some(0));
        assert_eq!(list.get(&1000), Some(2000));
        assert_eq!(list.get(&2000), Some(4000));
    }

    #[test]
    fn test_skiplist_drain() {
        let list: StratifiedSkipList<i32, i32> = StratifiedSkipList::new();
        (0..100).for_each(|i| {
            list.insert(i, i * 2);
        });
        let entries = list.drain();
        assert_eq!(entries.len(), 100);
        for (expected, (k, v)) in (0..).zip(entries) {
            assert_eq!(k, expected);
            assert_eq!(v, expected * 2);
        }
    }

    #[test]
    fn test_batch_promoter() {
        let promoter = BatchPromoter::<i32, i32>::new(100);
        for i in 0..50 {
            promoter.insert_hot(i, i * 2);
        }
        assert_eq!(promoter.hot_size(), 50);
        assert_eq!(promoter.get(&10), Some(20));
        let promoted = promoter.force_promote();
        assert_eq!(promoted.len(), 50);
        let PromotionStats {
            promotion_count,
            entries_promoted,
            ..
        } = promoter.stats();
        assert_eq!(promotion_count, 1);
        assert_eq!(entries_promoted, 50);
    }

    #[test]
    fn test_deferred_index() {
        let index = DeferredIndex::new(HashMapCold::<i32, i32>::new(), 10);
        for i in 0..5 {
            index.insert(i, i * 10);
        }
        assert_eq!(index.get(&3), Some(30));
        for i in 5..15 {
            index.insert(i, i * 10);
        }
        assert_eq!(index.get(&0), Some(0));
        assert_eq!(index.get(&12), Some(120));
    }

    #[test]
    fn test_hot_key_absorption() {
        let list: StratifiedSkipList<String, i32> = StratifiedSkipList::new();
        let hot = String::from("hot_key");
        for _ in 0..100 {
            list.insert(hot.clone(), 42);
        }
        assert_eq!(list.len(), 1);
        assert_eq!(list.get(&hot), Some(42));
    }
}