use std::sync::atomic::fence;
use std::sync::atomic::{AtomicUsize, Ordering};
use fastarena::Arena;
use crate::height::random_height;
use crate::node::*;
use crate::util::{compare_keys, prefetch_read};
/// A skip list whose nodes are addressed through raw byte pointers.
///
/// The list itself only stores the entry point and two counters; node memory
/// is allocated by the caller-supplied arena in `insert_inner`.
pub(crate) struct SkipList {
/// Raw pointer to the head node. Every traversal (`get`, `delete`,
/// `find_less`) starts here and reads its tower via `tower_load`.
pub(crate) head: *const u8,
/// Current list height. Searches start at level `height - 1`; an insert
/// raises it when the new node's height exceeds the stored value.
pub(crate) height: AtomicUsize,
/// Sequence counter. Each insert attempt takes the current value with
/// `fetch_add(1)` and stores it in the node it initialises.
pub(crate) next_seq: AtomicUsize,
}
/// Outcome of `SkipList::insert` / `SkipList::insert_inner`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum InsertResult {
/// The new node was linked into the list at level 0.
Success,
/// A node with an equal key was already present; nothing was inserted.
Duplicate,
/// The arena could not provide memory for the new node.
Oom,
}
impl SkipList {
pub(crate) unsafe fn new(head: *const u8) -> Self {
SkipList {
head,
height: AtomicUsize::new(1),
next_seq: AtomicUsize::new(1), }
}
/// Looks up `key` and, if a node with an equal key is found at level 0,
/// returns its value together with its tombstone flag.
///
/// The search starts at the top level recorded in `height` and moves right
/// while the next key is smaller than `key`; otherwise it drops one level.
/// Returns `None` when level 0 is exhausted without an equal key.
#[inline]
pub(crate) fn get(&self, key: &[u8]) -> Option<(&[u8], bool)> {
    use std::cmp::Ordering as KeyOrder;

    let mut level = self.height.load(Ordering::Relaxed).saturating_sub(1);
    let mut cursor = self.head;
    loop {
        let link = unsafe { tower_load(cursor, level) };
        if !link.is_null() {
            let candidate = link.ptr();
            match compare_keys(unsafe { node_key(candidate) }, key) {
                KeyOrder::Less => {
                    // Still left of the target: keep walking this level.
                    cursor = candidate;
                    continue;
                }
                KeyOrder::Equal if level == 0 => {
                    let val = unsafe { node_value(candidate) };
                    let tomb = unsafe { is_tombstone(candidate) };
                    return Some((val, tomb));
                }
                // Equal above level 0, or Greater: descend below.
                KeyOrder::Equal | KeyOrder::Greater => {}
            }
        }
        // End of level or overshoot: go down, or give up at the bottom.
        if level == 0 {
            return None;
        }
        level -= 1;
    }
}
/// Inserts `key` → `value` as a regular (non-tombstone) entry.
///
/// Delegates to `insert_inner` with the tombstone flag cleared and returns
/// its result unchanged.
#[inline]
pub(crate) fn insert(
    &self,
    key: &[u8],
    value: &[u8],
    arena: &mut Arena,
) -> (InsertResult, usize) {
    let is_tombstone = false;
    self.insert_inner(key, value, arena, is_tombstone)
}
/// Inserts `key` → `value` (optionally flagged as a tombstone) into the list.
///
/// Returns `(InsertResult::Success, bytes)` where `bytes` is the node size
/// requested from `arena`, `(InsertResult::Duplicate, 0)` if an equal key is
/// already present, or `(InsertResult::Oom, 0)` if the arena allocation fails.
///
/// The node is published with a CAS on the level-0 predecessor; if that CAS
/// loses a race the search is repeated. The node memory is allocated at most
/// once per call and re-initialised on retry, so lost races do not consume
/// additional arena space. Upper levels are linked best-effort afterwards.
pub(crate) fn insert_inner(
    &self,
    key: &[u8],
    value: &[u8],
    arena: &mut Arena,
    is_tombstone: bool,
) -> (InsertResult, usize) {
    /// CAS attempts per upper level before that level is left unlinked.
    /// The node stays reachable through level 0 either way.
    const MAX_LINK_ATTEMPTS: u8 = 2;

    let h = random_height();
    let size = node_alloc_size(h, key.len(), value.len());
    // Node memory, reused across level-0 retries once allocated.
    let mut allocated = None;

    loop {
        let seq = self.next_seq.fetch_add(1, Ordering::Relaxed) as u64;
        let (preds, succs) = self.find_less(key);
        let succ0 = succs[0];
        if !succ0.is_null() {
            let succ_key = unsafe { node_key(succ0.ptr()) };
            if compare_keys(succ_key, key) == std::cmp::Ordering::Equal {
                return (InsertResult::Duplicate, 0);
            }
        }

        let node_ptr = match allocated {
            Some(ptr) => ptr,
            None => match arena.try_alloc_raw(size, 8) {
                Some(ptr) => {
                    let raw = ptr.as_ptr();
                    allocated = Some(raw);
                    raw
                }
                None => return (InsertResult::Oom, 0),
            },
        };

        // SAFETY: `node_ptr` refers to `size` bytes obtained from the arena for
        // a node of height `h`, and the node has not been published yet, so
        // (re)initialising it cannot race with readers.
        unsafe {
            init_node(node_ptr, h, key, value, is_tombstone, seq);
            for (level, succ) in succs.iter().enumerate().take(h) {
                tower_store(node_ptr, level, *succ);
            }
        }
        // Make the node contents visible before the pointer is published.
        fence(Ordering::Release);

        let new_ptr = TowerPtr::new(node_ptr);
        let published = unsafe { tower_cas(preds[0].ptr(), 0, succ0, new_ptr) }.is_ok();
        if !published {
            // Someone changed the level-0 predecessor; search again and
            // retry with the same node memory.
            core::hint::spin_loop();
            continue;
        }

        for (level, found_pred) in preds.iter().enumerate().take(h).skip(1) {
            // Levels at or above the list height seen by `find_less` have no
            // recorded predecessor; the head is the predecessor there. The
            // head's tower is already read at level `height - 1` by every
            // search once `height` is raised to `h`, so these levels exist.
            let mut pred = if found_pred.is_null() {
                TowerPtr::new(self.head)
            } else {
                *found_pred
            };

            for _ in 0..MAX_LINK_ATTEMPTS {
                // The recorded predecessor may be stale: skip any nodes with
                // smaller keys that were linked after it in the meantime, so
                // the new node is never spliced in front of a smaller key.
                let mut succ = unsafe { tower_load(pred.ptr(), level) };
                while !succ.is_null() {
                    let succ_key = unsafe { node_key(succ.ptr()) };
                    if compare_keys(succ_key, key) != std::cmp::Ordering::Less {
                        break;
                    }
                    pred = succ;
                    succ = unsafe { tower_load(pred.ptr(), level) };
                }

                // The node is not yet reachable at `level`, so its tower slot
                // can be rewritten before the linking CAS.
                unsafe { tower_store(node_ptr, level, succ) };
                let linked = unsafe { tower_cas(pred.ptr(), level, succ, new_ptr) }.is_ok();
                if linked {
                    break;
                }
            }
        }

        // Raise the list height if this node is taller than anything so far.
        let mut old_h = self.height.load(Ordering::Relaxed);
        while h > old_h {
            match self.height.compare_exchange_weak(
                old_h,
                h,
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(current) => old_h = current,
            }
        }
        return (InsertResult::Success, size);
    }
}
/// Searches for `key` and, if a node with an equal key is found at level 0,
/// calls `set_tombstone` on it and returns that call's result.
///
/// Returns `false` when the search reaches the end of level 0 (or a larger
/// key there) without finding an equal key.
#[inline]
pub(crate) fn delete(&self, key: &[u8]) -> bool {
    let top = self.height.load(Ordering::Relaxed);
    let mut level = top.saturating_sub(1);
    let mut cursor = self.head;
    loop {
        let link = unsafe { tower_load(cursor, level) };
        if !link.is_null() {
            let candidate = link.ptr();
            let order = compare_keys(unsafe { node_key(candidate) }, key);
            if order == std::cmp::Ordering::Less {
                // Target lies further right on this level.
                cursor = candidate;
                continue;
            }
            if order == std::cmp::Ordering::Equal && level == 0 {
                return unsafe { set_tombstone(candidate) };
            }
        }
        // Level exhausted, overshoot, or match above level 0: descend.
        if level == 0 {
            return false;
        }
        level -= 1;
    }
}
/// Locates the insertion position of `key` on every level currently in use.
///
/// Returns `(preds, succs)`. For each level from `height - 1` down to 0,
/// `preds[level]` is the last node visited whose key is smaller than `key`
/// (or the head), and `succs[level]` is the link that followed it — NULL at
/// the end of a level. Levels above the current height stay NULL in both
/// arrays.
#[inline]
pub(crate) fn find_less(&self, key: &[u8]) -> ([TowerPtr; MAX_HEIGHT], [TowerPtr; MAX_HEIGHT]) {
    let mut preds = [TowerPtr::NULL; MAX_HEIGHT];
    let mut succs = [TowerPtr::NULL; MAX_HEIGHT];
    let mut level = self.height.load(Ordering::Relaxed).saturating_sub(1);
    let mut cursor = self.head;
    loop {
        let link = unsafe { tower_load(cursor, level) };
        let successor = if link.is_null() {
            TowerPtr::NULL
        } else {
            let candidate = link.ptr();
            // Prefetch the node after the candidate before comparing keys.
            let after = unsafe { tower_load(candidate, level) };
            if !after.is_null() {
                prefetch_read(after.ptr());
            }
            let candidate_key = unsafe { node_key(candidate) };
            if compare_keys(candidate_key, key) == std::cmp::Ordering::Less {
                cursor = candidate;
                continue;
            }
            link
        };
        // `cursor` is the rightmost node left of `key` on this level.
        preds[level] = TowerPtr::new(cursor);
        succs[level] = successor;
        if level == 0 {
            break;
        }
        level -= 1;
    }
    (preds, succs)
}
/// Returns the current value of the sequence counter, i.e. the number the
/// next insert attempt will take, without advancing it.
pub(crate) fn next_snapshot_seq(&self) -> u64 {
    let upcoming = self.next_seq.load(Ordering::Relaxed);
    upcoming as u64
}
/// Re-roots the list at `head` and restores height and sequence counter to
/// their initial value of 1.
///
/// # Safety
///
/// `head` replaces the stored head pointer and is subsequently passed to
/// `tower_load` by every traversal, so it presumably must point to a valid,
/// initialised head node — confirm against the node module.
pub(crate) unsafe fn reset(&mut self, head: *const u8) {
    // `&mut self` gives exclusive access, so the order of these writes is
    // not observable.
    self.next_seq.store(1, Ordering::Relaxed);
    self.height.store(1, Ordering::Relaxed);
    self.head = head;
}
}