use super::arena::Arena;
use super::value_store::ValueStore;
use crate::comparator::SharedComparator;
use crate::key::InternalKey;
use crate::value::{SeqNo, UserValue};
use crate::{UserKey, ValueType};
use std::cmp::Ordering as CmpOrdering;
use std::ops::{Bound, RangeBounds};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
/// Maximum tower height of any node; the head sentinel always has this height.
const MAX_HEIGHT: usize = 20;
/// Null link value. NOTE(review): relies on no data node ever living at arena
/// offset 0 (the head sentinel is allocated first) — confirm Arena guarantees.
const UNSET: u32 = 0;
/// Byte offset of the height field within a node's metadata header.
const OFF_HEIGHT: u32 = 11;
/// Byte offset where the link tower begins; the metadata header occupies
/// bytes 0..24 (key offset 0..4, value index 4..8, key length 8..10,
/// value type 10, height 11, padding 12..16, seqno 16..24).
const OFF_TOWER: u32 = 24;
#[expect(
    clippy::cast_possible_truncation,
    reason = "height <= MAX_HEIGHT (20), always fits in u32"
)]
/// Total byte size of a node with `height` tower levels: the fixed metadata
/// header plus one 4-byte atomic link per level.
const fn node_size(height: usize) -> u32 {
    let tower_bytes = 4 * height as u32;
    OFF_TOWER + tower_bytes
}
/// A lock-free concurrent skip list mapping internal keys to user values,
/// backed by an append-only arena. Inserts and reads may run concurrently;
/// entries are never removed, so links only ever go from UNSET to a node.
pub struct SkipMap {
// Append-only byte arena holding node metadata, towers, and key bytes.
arena: Arena,
// Side table holding the values; nodes store a u32 index into it.
values: ValueStore,
// User-key comparator; equal user keys are tie-broken by descending seqno.
comparator: SharedComparator,
// Arena offset of the head sentinel (full-height tower, no key).
head: u32,
// Current number of active levels; monotonically non-decreasing.
height: AtomicUsize,
// Count of inserted entries (relaxed counter).
len: AtomicUsize,
// Counter state feeding random_height's SplitMix64-style mixer.
rng_state: AtomicU64,
}
impl SkipMap {
/// Creates an empty skip list using `comparator` for user-key ordering.
///
/// Allocates the head sentinel (a full-height node with no key/value) and
/// seeds the height RNG from the stack address of the local `arena` binding.
pub fn new(comparator: SharedComparator) -> Self {
let arena = Arena::new();
let head_size = node_size(MAX_HEIGHT);
#[expect(
clippy::expect_used,
reason = "arena capacity is a fixed configuration; exhaustion is fatal"
)]
let head = arena
.alloc(head_size, 4)
.expect("arena must fit at least the head sentinel");
unsafe {
let bytes = arena.get_bytes_mut(head, head_size);
#[expect(
clippy::indexing_slicing,
reason = "OFF_HEIGHT (11) < head_size (104) by construction"
)]
{
#[expect(
clippy::cast_possible_truncation,
reason = "MAX_HEIGHT = 20, fits in u8"
)]
{
// Only the height byte is written; the tower links are presumably
// zero-initialized (== UNSET) by the arena — TODO confirm.
bytes[OFF_HEIGHT as usize] = MAX_HEIGHT as u8;
}
}
}
let seed = {
// Cheap entropy: the stack address of `arena`. Any nonzero value works;
// the fallback only guards the (practically impossible) zero address.
let p = (&raw const arena) as u64;
if p == 0 { 0xDEAD_BEEF } else { p }
};
Self {
arena,
values: ValueStore::new(),
comparator,
head,
height: AtomicUsize::new(1),
len: AtomicUsize::new(0),
rng_state: AtomicU64::new(seed),
}
}
#[expect(
clippy::indexing_slicing,
reason = "preds/succs are [u32; MAX_HEIGHT]; level < height <= MAX_HEIGHT"
)]
/// Inserts `key` -> `value`. Lock-free; safe to call from multiple threads.
///
/// Entries sort by user key ascending, seqno descending. Inserting the exact
/// same internal key twice yields two adjacent entries (no replacement).
pub fn insert(&self, key: &InternalKey, value: &UserValue) {
let height = self.random_height();
let node = self.alloc_node(key, value, height);
// Raise the list height first so find_splice below observes a height
// covering every level this node will be linked at.
let mut list_h = self.height.load(Ordering::Relaxed);
while height > list_h {
match self.height.compare_exchange_weak(
list_h,
height,
Ordering::AcqRel,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(h) => list_h = h,
}
}
let mut preds = [self.head; MAX_HEIGHT];
let mut succs = [UNSET; MAX_HEIGHT];
self.find_splice(key, &mut preds, &mut succs);
// Link bottom-up: once the level-0 CAS succeeds the node is visible to
// readers; upper levels are an index and may lag harmlessly.
for level in 0..height {
loop {
unsafe {
// Point the new node at its successor before publishing it.
self.tower_atomic(node, level)
.store(succs[level], Ordering::Release);
}
let pred_next = unsafe { self.tower_atomic(preds[level], level) };
match pred_next.compare_exchange_weak(
succs[level],
node,
Ordering::AcqRel,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(_) => {
// A concurrent insert raced us at this level (or the weak CAS
// failed spuriously); recompute this level's splice and retry.
self.find_splice_for_level(key, &mut preds, &mut succs, level);
}
}
}
}
self.len.fetch_add(1, Ordering::Relaxed);
}
/// Number of inserted entries (relaxed read; may lag concurrent inserts).
pub fn len(&self) -> usize {
self.len.load(Ordering::Relaxed)
}
/// Whether the map currently holds no entries.
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// Returns a double-ended iterator over all entries in internal-key order.
pub fn iter(&self) -> Iter<'_> {
Iter {
map: self,
front: self.first_node(),
// The back cursor is resolved lazily on the first next_back() call.
back: UNSET,
back_init: false,
done: false,
}
}
/// Returns a double-ended iterator over the entries within `range`
/// (bounds are expressed in internal-key order).
pub fn range<R: RangeBounds<InternalKey>>(&self, range: R) -> Range<'_> {
    // Resolve the lower bound to a concrete starting node eagerly; the
    // upper bound is kept and enforced lazily during iteration.
    let front = match range.start_bound() {
        Bound::Included(k) => self.seek_ge(k),
        Bound::Excluded(k) => self.seek_gt(k),
        Bound::Unbounded => self.first_node(),
    };
    Range {
        map: self,
        // Bound::cloned converts Bound<&InternalKey> into an owned bound,
        // replacing the manual three-arm clone match.
        end_bound: range.end_bound().cloned(),
        front,
        back: UNSET,
        back_init: false,
        done: false,
    }
}
/// Allocates and initializes a new node: copies the user-key bytes into the
/// arena, appends the value to the side table, and writes the metadata
/// header. Tower links are left UNSET; `insert` publishes them.
#[expect(
    clippy::cast_possible_truncation,
    reason = "key_bytes.len() <= u16::MAX (asserted), value idx <= u32::MAX, height <= MAX_HEIGHT (20)"
)]
fn alloc_node(&self, key: &InternalKey, value: &UserValue, height: usize) -> u32 {
    let key_bytes: &[u8] = &key.user_key;
    // Guard the `as u16` narrowing below: silently truncating an oversized
    // key length would store a corrupt key and break ordering for every
    // subsequent comparison, so fail loudly instead.
    assert!(
        key_bytes.len() <= usize::from(u16::MAX),
        "user key too large for skip list node: {} bytes",
        key_bytes.len(),
    );
    #[expect(
        clippy::expect_used,
        reason = "arena capacity is fixed; exhaustion is fatal"
    )]
    let key_offset = self
        .arena
        .alloc(key_bytes.len() as u32, 1)
        .expect("arena exhausted (key data)");
    unsafe {
        self.arena
            .get_bytes_mut(key_offset, key_bytes.len() as u32)
            .copy_from_slice(key_bytes);
    }
    let value_idx = self.values.append(value);
    let n_size = node_size(height);
    #[expect(
        clippy::expect_used,
        reason = "arena capacity is fixed; exhaustion is fatal"
    )]
    let node = self.arena.alloc(n_size, 4).expect("arena exhausted (node)");
    #[expect(
        clippy::indexing_slicing,
        reason = "meta is exactly OFF_TOWER (24) bytes by construction"
    )]
    unsafe {
        // Header layout: key offset 0..4, value index 4..8, key len 8..10,
        // value type 10, height 11, padding 12..16, seqno 16..24.
        let meta = self.arena.get_bytes_mut(node, OFF_TOWER);
        meta[0..4].copy_from_slice(&key_offset.to_ne_bytes());
        meta[4..8].copy_from_slice(&value_idx.to_ne_bytes());
        meta[8..10].copy_from_slice(&(key_bytes.len() as u16).to_ne_bytes());
        meta[10] = u8::from(key.value_type);
        meta[11] = height as u8;
        meta[16..24].copy_from_slice(&key.seqno.to_ne_bytes());
    }
    node
}
/// Returns the 24-byte metadata header of `node`.
///
/// # Safety
/// `node` must be an offset returned by `alloc_node` (or the head sentinel),
/// so that `OFF_TOWER` bytes of initialized metadata exist at that offset.
unsafe fn meta(&self, node: u32) -> &[u8] {
unsafe { self.arena.get_bytes(node, OFF_TOWER) }
}
#[expect(
clippy::indexing_slicing,
reason = "metadata is exactly OFF_TOWER (24) bytes by construction"
)]
#[expect(
clippy::expect_used,
reason = "infallible: 4-byte slice always converts to [u8; 4]"
)]
/// Arena offset of the node's user-key bytes (header bytes 0..4).
fn node_key_offset(&self, node: u32) -> u32 {
let m = unsafe { self.meta(node) };
u32::from_ne_bytes(m[0..4].try_into().expect("4 bytes"))
}
#[expect(
clippy::indexing_slicing,
reason = "metadata is exactly OFF_TOWER (24) bytes by construction"
)]
#[expect(
clippy::expect_used,
reason = "infallible: 2-byte slice always converts to [u8; 2]"
)]
/// Length of the node's user key in bytes (header bytes 8..10).
fn node_key_len(&self, node: u32) -> u16 {
let m = unsafe { self.meta(node) };
u16::from_ne_bytes(m[8..10].try_into().expect("2 bytes"))
}
#[expect(
clippy::indexing_slicing,
reason = "metadata is exactly OFF_TOWER (24) bytes by construction"
)]
#[expect(
clippy::expect_used,
reason = "ValueType discriminant written during alloc_node is always valid"
)]
/// The node's ValueType, decoded from the discriminant byte (header byte 10).
fn node_value_type(&self, node: u32) -> ValueType {
let m = unsafe { self.meta(node) };
let byte = m[10];
debug_assert!(
byte <= 4,
"invalid ValueType byte {byte} at node offset {node}, meta={m:?}",
);
ValueType::try_from(byte).expect("valid ValueType discriminant")
}
#[expect(
clippy::indexing_slicing,
reason = "metadata is exactly OFF_TOWER (24) bytes by construction"
)]
#[expect(
clippy::expect_used,
reason = "infallible: 4-byte slice always converts to [u8; 4]"
)]
/// Index of the node's value in the ValueStore side table (header bytes 4..8).
fn node_value_idx(&self, node: u32) -> u32 {
let m = unsafe { self.meta(node) };
u32::from_ne_bytes(m[4..8].try_into().expect("4 bytes"))
}
#[expect(
clippy::indexing_slicing,
reason = "metadata is exactly OFF_TOWER (24) bytes by construction"
)]
#[expect(
clippy::expect_used,
reason = "infallible: 8-byte slice always converts to [u8; 8]"
)]
/// The node's sequence number (header bytes 16..24).
fn node_seqno(&self, node: u32) -> SeqNo {
let m = unsafe { self.meta(node) };
u64::from_ne_bytes(m[16..24].try_into().expect("8 bytes"))
}
/// Returns the raw user-key bytes stored in the arena for `node`.
fn node_user_key_bytes(&self, node: u32) -> &[u8] {
let off = self.node_key_offset(node);
let len = u32::from(self.node_key_len(node));
unsafe { self.arena.get_bytes(off, len) }
}
fn node_internal_key(&self, node: u32) -> InternalKey {
let user_key: UserKey = self.node_user_key_bytes(node).into();
let seqno = self.node_seqno(node);
let vt = self.node_value_type(node);
InternalKey {
user_key,
seqno,
value_type: vt,
}
}
/// Returns the value for `node` from the value side table.
fn node_value(&self, node: u32) -> UserValue {
unsafe { self.values.get(self.node_value_idx(node)) }
}
#[expect(
clippy::cast_possible_truncation,
reason = "level < MAX_HEIGHT (20), fits in u32"
)]
/// Returns the atomic link slot at `level` of `node`'s tower.
///
/// # Safety
/// `node` must be a valid node offset and `level` strictly less than the
/// node's allocated height, so the 4-byte slot lies inside the node.
unsafe fn tower_atomic(&self, node: u32, level: usize) -> &std::sync::atomic::AtomicU32 {
unsafe {
self.arena
.get_atomic_u32(node + OFF_TOWER + (level as u32) * 4)
}
}
/// Loads the successor of `node` at `level` (UNSET when none).
fn next_at(&self, node: u32, level: usize) -> u32 {
// SAFETY: callers only pass levels within the node's height; links
// stored with Release in insert() are observed via this Acquire load.
unsafe { self.tower_atomic(node, level).load(Ordering::Acquire) }
}
/// Returns the first real node (level-0 successor of the head), or UNSET.
fn first_node(&self) -> u32 {
self.next_at(self.head, 0)
}
/// Compares `node`'s internal key against `target`.
///
/// User keys compare via the configured comparator; equal user keys are
/// ordered by DESCENDING seqno. Note the deliberately reversed operands in
/// the Equal arm: the node is "Less" when its seqno is larger, so newer
/// entries for the same user key sort first.
fn compare_key(&self, node: u32, target: &InternalKey) -> CmpOrdering {
let node_uk = self.node_user_key_bytes(node);
let target_uk: &[u8] = &target.user_key;
match self.comparator.compare(node_uk, target_uk) {
CmpOrdering::Equal => {
let node_seq = self.node_seqno(node);
target.seqno.cmp(&node_seq)
}
other => other,
}
}
/// Orders two nodes: user key ascending, then seqno descending (higher
/// seqno sorts first among equal user keys).
fn compare_nodes(&self, a: u32, b: u32) -> CmpOrdering {
    let key_order = self
        .comparator
        .compare(self.node_user_key_bytes(a), self.node_user_key_bytes(b));
    if key_order == CmpOrdering::Equal {
        // Reversed operands on purpose: newer (larger seqno) comes first.
        self.node_seqno(b).cmp(&self.node_seqno(a))
    } else {
        key_order
    }
}
#[expect(clippy::indexing_slicing, reason = "level < list_h <= MAX_HEIGHT")]
/// Fills `preds`/`succs` with, per level, the last node ordered before `key`
/// and its successor. Levels at or above the current list height keep the
/// caller-provided defaults (head / UNSET).
fn find_splice(
&self,
key: &InternalKey,
preds: &mut [u32; MAX_HEIGHT],
succs: &mut [u32; MAX_HEIGHT],
) {
let list_h = self.height.load(Ordering::Acquire);
let mut node = self.head;
// Standard skip-list descent: move right while next < key, then drop a level.
for level in (0..list_h).rev() {
let mut next = self.next_at(node, level);
while next != UNSET && self.compare_key(next, key) == CmpOrdering::Less {
node = next;
next = self.next_at(node, level);
}
preds[level] = node;
succs[level] = next;
}
}
#[expect(
clippy::indexing_slicing,
reason = "level < MAX_HEIGHT; preds/succs are [u32; MAX_HEIGHT]"
)]
/// Recomputes the splice point for a single `level` after a failed CAS,
/// descending from the top again. Only `preds[level]`/`succs[level]` are
/// updated; upper levels are walked purely to position the cursor.
fn find_splice_for_level(
&self,
key: &InternalKey,
preds: &mut [u32; MAX_HEIGHT],
succs: &mut [u32; MAX_HEIGHT],
level: usize,
) {
let mut node = self.head;
let list_h = self.height.load(Ordering::Acquire);
// Positioning pass over the levels above `level` (results discarded).
for lv in (level + 1..list_h).rev() {
let mut next = self.next_at(node, lv);
while next != UNSET && self.compare_key(next, key) == CmpOrdering::Less {
node = next;
next = self.next_at(node, lv);
}
}
// Final scan at the target level records the fresh splice pair.
let mut next = self.next_at(node, level);
while next != UNSET && self.compare_key(next, key) == CmpOrdering::Less {
node = next;
next = self.next_at(node, level);
}
preds[level] = node;
succs[level] = next;
}
/// Returns the offset of the first node whose key is `>= target` in
/// internal-key order, or UNSET when no such node exists.
fn seek_ge(&self, target: &InternalKey) -> u32 {
    let levels = self.height.load(Ordering::Acquire);
    let mut cursor = self.head;
    // Descend: advance right while the successor is still < target.
    for lvl in (0..levels).rev() {
        let mut candidate = self.next_at(cursor, lvl);
        while candidate != UNSET && self.compare_key(candidate, target) == CmpOrdering::Less {
            cursor = candidate;
            candidate = self.next_at(cursor, lvl);
        }
    }
    // `cursor` is now the last node < target; its level-0 successor is the answer.
    self.next_at(cursor, 0)
}
/// Returns the offset of the first node whose key is strictly `> target`,
/// or UNSET when no such node exists.
fn seek_gt(&self, target: &InternalKey) -> u32 {
    let levels = self.height.load(Ordering::Acquire);
    let mut cursor = self.head;
    // Descend: advance right while the successor is still <= target.
    for lvl in (0..levels).rev() {
        let mut candidate = self.next_at(cursor, lvl);
        while candidate != UNSET && self.compare_key(candidate, target) != CmpOrdering::Greater {
            cursor = candidate;
            candidate = self.next_at(cursor, lvl);
        }
    }
    // `cursor` is the last node <= target; its level-0 successor is the answer.
    self.next_at(cursor, 0)
}
/// Returns the offset of the last node whose key is `<= target`,
/// or UNSET when every node is greater (or the map is empty).
fn seek_le(&self, target: &InternalKey) -> u32 {
    let levels = self.height.load(Ordering::Acquire);
    let mut cursor = self.head;
    // Descend: advance right while the successor is still <= target.
    for lvl in (0..levels).rev() {
        let mut candidate = self.next_at(cursor, lvl);
        while candidate != UNSET && self.compare_key(candidate, target) != CmpOrdering::Greater {
            cursor = candidate;
            candidate = self.next_at(cursor, lvl);
        }
    }
    // Never left the sentinel => nothing is <= target.
    if cursor == self.head { UNSET } else { cursor }
}
/// Returns the offset of the last node whose key is strictly `< target`,
/// or UNSET when no node precedes `target`.
fn seek_lt(&self, target: &InternalKey) -> u32 {
    let levels = self.height.load(Ordering::Acquire);
    let mut cursor = self.head;
    // Descend: advance right while the successor is still < target.
    for lvl in (0..levels).rev() {
        let mut candidate = self.next_at(cursor, lvl);
        while candidate != UNSET && self.compare_key(candidate, target) == CmpOrdering::Less {
            cursor = candidate;
            candidate = self.next_at(cursor, lvl);
        }
    }
    // Never left the sentinel => nothing is < target.
    if cursor == self.head { UNSET } else { cursor }
}
/// Returns the offset of the last node in the list, or UNSET when empty.
fn last_node(&self) -> u32 {
    let levels = self.height.load(Ordering::Acquire);
    let mut cursor = self.head;
    // Ride each level to its end before descending; finishes at the tail.
    for lvl in (0..levels).rev() {
        let mut candidate = self.next_at(cursor, lvl);
        while candidate != UNSET {
            cursor = candidate;
            candidate = self.next_at(cursor, lvl);
        }
    }
    if cursor == self.head { UNSET } else { cursor }
}
/// Returns the offset of the node immediately before `target_node` at
/// level 0, or UNSET when `target_node` is the first node.
fn find_predecessor(&self, target_node: u32) -> u32 {
    let mut node = self.head;
    let list_h = self.height.load(Ordering::Acquire);
    // Descend all levels, stopping at each level just before `target_node`.
    // The level-0 pass of this loop already lands on the immediate
    // predecessor, so no separate bottom-level scan is needed (the original
    // repeated the level-0 iteration verbatim afterwards — dead work).
    for level in (0..list_h).rev() {
        loop {
            let next = self.next_at(node, level);
            if next == UNSET || next == target_node {
                break;
            }
            if self.compare_nodes(next, target_node) == CmpOrdering::Less {
                node = next;
            } else {
                break;
            }
        }
    }
    if node == self.head { UNSET } else { node }
}
/// Draws a tower height in 1..=MAX_HEIGHT; each additional level has
/// probability 1/4, giving the usual skip-list height distribution.
fn random_height(&self) -> usize {
    // Monotone counter hashed through the SplitMix64 finalizer: cheap,
    // lock-free, and uniformly scrambled per call.
    let counter = self.rng_state.fetch_add(1, Ordering::Relaxed);
    let mut x = counter.wrapping_add(0x9E37_79B9_7F4A_7C15);
    x = (x ^ (x >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    x = (x ^ (x >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    x ^= x >> 31;
    // Two trailing zero bits per extra level => 1/4 chance per level.
    let extra = (x.trailing_zeros() as usize) / 2;
    (1 + extra).min(MAX_HEIGHT)
}
}
/// A borrowed view of one skip-list entry: a node offset plus the owning map.
pub struct Entry<'a> {
map: &'a SkipMap,
node: u32,
}
impl Entry<'_> {
/// Reconstructs the full internal key (allocates an owned UserKey).
pub fn key(&self) -> InternalKey {
self.map.node_internal_key(self.node)
}
/// Returns the user-key bytes without allocating.
pub fn user_key_bytes(&self) -> &[u8] {
self.map.node_user_key_bytes(self.node)
}
/// Returns the entry's value.
pub fn value(&self) -> UserValue {
self.map.node_value(self.node)
}
}
/// Double-ended iterator over an entire SkipMap.
///
/// `front`/`back` are node offsets. `back` is resolved lazily on the first
/// next_back() call (tracked by `back_init`); `done` marks cursor crossover.
pub struct Iter<'a> {
map: &'a SkipMap,
front: u32,
back: u32,
back_init: bool,
done: bool,
}
impl<'a> Iterator for Iter<'a> {
type Item = Entry<'a>;
fn next(&mut self) -> Option<Self::Item> {
if self.done || self.front == UNSET {
return None;
}
let node = self.front;
// If the front cursor has met the (initialized) back cursor, this is
// the final element shared by both ends — yield it and stop.
if self.back_init && node == self.back {
self.done = true;
} else {
self.front = self.map.next_at(node, 0);
if self.front == UNSET {
self.done = true;
}
}
Some(Entry {
map: self.map,
node,
})
}
}
impl DoubleEndedIterator for Iter<'_> {
fn next_back(&mut self) -> Option<Self::Item> {
if self.done {
return None;
}
// Resolve the back cursor on first use (one full descent).
if !self.back_init {
self.back = self.map.last_node();
self.back_init = true;
}
if self.back == UNSET {
self.done = true;
return None;
}
let node = self.back;
// Meeting the front cursor means this is the last shared element.
if node == self.front {
self.done = true;
} else {
// The list is singly linked, so stepping backwards re-searches from
// the top: each next_back() costs O(log n).
self.back = self.map.find_predecessor(node);
}
Some(Entry {
map: self.map,
node,
})
}
}
/// Double-ended iterator over a key range of a SkipMap.
///
/// The lower bound was resolved to `front` at construction time; the upper
/// bound is kept as `end_bound` and enforced lazily while iterating, and
/// `back` is resolved on the first next_back() call.
pub struct Range<'a> {
map: &'a SkipMap,
end_bound: Bound<InternalKey>,
front: u32,
back: u32,
back_init: bool,
done: bool,
}
impl Range<'_> {
    /// Whether `node` still falls inside the range's upper bound.
    fn within_end(&self, node: u32) -> bool {
        match &self.end_bound {
            Bound::Included(k) => matches!(
                self.map.compare_key(node, k),
                CmpOrdering::Less | CmpOrdering::Equal
            ),
            Bound::Excluded(k) => matches!(self.map.compare_key(node, k), CmpOrdering::Less),
            Bound::Unbounded => true,
        }
    }
}
impl<'a> Iterator for Range<'a> {
type Item = Entry<'a>;
fn next(&mut self) -> Option<Self::Item> {
if self.done || self.front == UNSET {
return None;
}
let node = self.front;
// Lazily enforce the upper bound: stop as soon as a node exceeds it.
if !self.within_end(node) {
self.front = UNSET;
self.done = true;
return None;
}
// Front met the (initialized) back cursor: final shared element.
if self.back_init && node == self.back {
self.done = true;
} else {
self.front = self.map.next_at(node, 0);
if self.front == UNSET {
self.done = true;
}
}
Some(Entry {
map: self.map,
node,
})
}
}
impl DoubleEndedIterator for Range<'_> {
fn next_back(&mut self) -> Option<Self::Item> {
if self.done {
return None;
}
// Resolve the back cursor from the end bound on first use.
if !self.back_init {
self.back = match &self.end_bound {
Bound::Unbounded => self.map.last_node(),
Bound::Included(k) => self.map.seek_le(k),
Bound::Excluded(k) => self.map.seek_lt(k),
};
self.back_init = true;
}
if self.back == UNSET || self.front == UNSET {
self.done = true;
return None;
}
// Cursors crossed: the back fell before the lower-bounded front, so the
// range is exhausted (also covers an empty lo..hi with lo > hi).
if self.map.compare_nodes(self.back, self.front) == CmpOrdering::Less {
self.done = true;
return None;
}
let node = self.back;
if node == self.front {
self.done = true;
} else {
self.back = self.map.find_predecessor(node);
}
Some(Entry {
map: self.map,
node,
})
}
}
#[cfg(test)]
#[allow(
clippy::unwrap_used,
clippy::indexing_slicing,
clippy::expect_used,
clippy::doc_markdown,
clippy::cast_sign_loss,
reason = "tests use unwrap/indexing/expect for brevity"
)]
mod tests {
use super::*;
use crate::ValueType;
// Helpers: map with the default comparator plus key/value constructors.
fn new_map() -> SkipMap {
SkipMap::new(crate::comparator::default_comparator())
}
fn make_key(user_key: &[u8], seqno: SeqNo) -> InternalKey {
InternalKey::new(user_key.to_vec(), seqno, ValueType::Value)
}
fn make_value(data: &[u8]) -> UserValue {
UserValue::from(data)
}
#[test]
fn insert_and_get_single() {
let map = new_map();
let key = make_key(b"hello", 1);
let val = make_value(b"world");
map.insert(&key, &val);
assert_eq!(map.len(), 1);
assert!(!map.is_empty());
let mut iter = map.iter();
let entry = iter.next().expect("one entry");
assert_eq!(&*entry.key().user_key, b"hello");
assert_eq!(entry.key().seqno, 1);
assert_eq!(&*entry.value(), b"world");
assert!(iter.next().is_none());
}
// Same user key must order by descending seqno; distinct keys ascending.
#[test]
fn ordering_user_key_asc_seqno_desc() {
let map = new_map();
map.insert(&make_key(b"abc", 1), &make_value(b"v1"));
map.insert(&make_key(b"abc", 3), &make_value(b"v3"));
map.insert(&make_key(b"abc", 2), &make_value(b"v2"));
let seqnos: Vec<SeqNo> = map.iter().map(|e| e.key().seqno).collect();
assert_eq!(seqnos, vec![3, 2, 1]);
map.insert(&make_key(b"zzz", 10), &make_value(b"z"));
map.insert(&make_key(b"aaa", 10), &make_value(b"a"));
let keys: Vec<Vec<u8>> = map.iter().map(|e| e.key().user_key.to_vec()).collect();
assert_eq!(
keys,
vec![
b"aaa".to_vec(),
b"abc".to_vec(),
b"abc".to_vec(),
b"abc".to_vec(),
b"zzz".to_vec(),
]
);
}
#[test]
fn range_lower_bound() {
let map = new_map();
for i in 0u8..10 {
let key = vec![b'a' + i];
map.insert(&make_key(&key, 0), &make_value(&[i]));
}
// MAX_SEQNO as the bound seqno makes the bound sort before all real
// entries of the same user key (seqno is ordered descending).
let bound = make_key(b"e", crate::MAX_SEQNO);
let keys: Vec<u8> = map.range(bound..).map(|e| e.key().user_key[0]).collect();
assert_eq!(keys, vec![b'e', b'f', b'g', b'h', b'i', b'j']);
}
#[test]
fn range_bounded() {
let map = new_map();
for i in 0u8..10 {
let key = vec![b'a' + i];
map.insert(&make_key(&key, 0), &make_value(&[i]));
}
let lo = make_key(b"c", crate::MAX_SEQNO);
let hi = make_key(b"f", 0);
let keys: Vec<u8> = map.range(lo..=hi).map(|e| e.key().user_key[0]).collect();
assert_eq!(keys, vec![b'c', b'd', b'e', b'f']);
}
// Alternating next/next_back must yield each element exactly once.
#[test]
fn double_ended_iter() {
let map = new_map();
for i in 0u8..5 {
let key = vec![b'a' + i];
map.insert(&make_key(&key, 0), &make_value(&[i]));
}
let mut iter = map.iter();
assert_eq!(iter.next().unwrap().key().user_key[0], b'a');
assert_eq!(iter.next_back().unwrap().key().user_key[0], b'e');
assert_eq!(iter.next().unwrap().key().user_key[0], b'b');
assert_eq!(iter.next_back().unwrap().key().user_key[0], b'd');
assert_eq!(iter.next().unwrap().key().user_key[0], b'c');
assert!(iter.next().is_none());
assert!(iter.next_back().is_none());
}
#[test]
fn double_ended_range() {
let map = new_map();
for i in 0u8..10 {
let key = vec![b'a' + i];
map.insert(&make_key(&key, 0), &make_value(&[i]));
}
let lo = make_key(b"c", crate::MAX_SEQNO);
let hi = make_key(b"g", 0);
let rev: Vec<u8> = map
.range(lo..=hi)
.rev()
.map(|e| e.key().user_key[0])
.collect();
assert_eq!(rev, vec![b'g', b'f', b'e', b'd', b'c']);
}
#[test]
fn empty_value() {
let map = new_map();
map.insert(&make_key(b"k", 0), &make_value(b""));
let entry = map.iter().next().unwrap();
assert!(entry.value().is_empty());
}
// Many writers, disjoint key prefixes: final count and order must hold.
#[test]
fn concurrent_inserts() {
use std::sync::Arc;
let map = Arc::new(new_map());
let n_threads = 8;
let n_per_thread = 1000;
let handles: Vec<_> = (0..n_threads)
.map(|t| {
let map = Arc::clone(&map);
std::thread::spawn(move || {
for i in 0..n_per_thread {
let key = format!("t{t:02}_k{i:05}");
map.insert(&make_key(key.as_bytes(), i as u64), &make_value(b"v"));
}
})
})
.collect();
for h in handles {
h.join().expect("thread panicked");
}
assert_eq!(map.len(), n_threads * n_per_thread);
let entries: Vec<_> = map.iter().collect();
for pair in entries.windows(2) {
let a = pair[0].key();
let b = pair[1].key();
assert!(a <= b, "out of order: {a:?} > {b:?}");
}
}
// Snapshot reads: a range starting at (key, seqno) must surface the newest
// version with seqno <= the requested one.
#[test]
fn mvcc_point_lookup_via_range() {
let map = new_map();
map.insert(&make_key(b"key", 1), &make_value(b"v1"));
map.insert(&make_key(b"key", 2), &make_value(b"v2"));
map.insert(&make_key(b"key", 3), &make_value(b"v3"));
let lower = InternalKey::new(b"key".to_vec(), 2, ValueType::Value);
let mut iter = map.range(lower..);
let entry = iter
.next()
.filter(|e| &*e.key().user_key == b"key")
.expect("should find key");
assert_eq!(entry.key().seqno, 2);
assert_eq!(&*entry.value(), b"v2");
let lower2 = InternalKey::new(b"key".to_vec(), 1, ValueType::Value);
let entry2 = map
.range(lower2..)
.next()
.filter(|e| &*e.key().user_key == b"key")
.expect("should find key");
assert_eq!(entry2.key().seqno, 1);
assert_eq!(&*entry2.value(), b"v1");
let lower3 = InternalKey::new(b"key".to_vec(), crate::MAX_SEQNO - 1, ValueType::Value);
let entry3 = map
.range(lower3..)
.next()
.filter(|e| &*e.key().user_key == b"key")
.expect("should find key");
assert_eq!(entry3.key().seqno, 3);
assert_eq!(&*entry3.value(), b"v3");
}
#[test]
fn empty_iter_next_back() {
let map = new_map();
let mut iter = map.iter();
assert!(iter.next().is_none());
assert!(iter.next_back().is_none());
}
#[test]
fn empty_range_next_back() {
let map = new_map();
let lo = make_key(b"a", crate::MAX_SEQNO);
let hi = make_key(b"z", 0);
let mut range = map.range(lo..=hi);
assert!(range.next().is_none());
assert!(range.next_back().is_none());
}
#[test]
fn range_excluded_end_next_back() {
let map = new_map();
for i in 0u8..5 {
map.insert(&make_key(&[b'a' + i], 0), &make_value(&[i]));
}
let lo = make_key(b"a", crate::MAX_SEQNO);
let hi = make_key(b"d", crate::MAX_SEQNO);
let rev: Vec<u8> = map
.range(lo..hi)
.rev()
.map(|e| e.key().user_key[0])
.collect();
assert_eq!(rev, vec![b'c', b'b', b'a']);
}
#[test]
fn seek_le_all_greater_returns_none() {
let map = new_map();
map.insert(&make_key(b"m", 0), &make_value(b"v"));
let hi = make_key(b"a", 0);
let mut range = map.range(..=hi);
assert!(range.next_back().is_none());
}
#[test]
fn next_back_on_first_element() {
let map = new_map();
map.insert(&make_key(b"only", 0), &make_value(b"v"));
let mut iter = map.iter();
let entry = iter.next_back().expect("one entry");
assert_eq!(&*entry.key().user_key, b"only");
assert!(iter.next().is_none());
assert!(iter.next_back().is_none());
}
// Reader iterates repeatedly while writers insert: exercises publication
// safety of arena offsets and tower links (would crash on a bad offset).
#[test]
fn concurrent_insert_and_iter_no_sigbus() {
use std::sync::{Arc, Barrier};
let map = Arc::new(new_map());
let barrier = Arc::new(Barrier::new(9));
let writers: Vec<_> = (0..8)
.map(|t| {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
std::thread::spawn(move || {
barrier.wait();
for i in 0..500 {
let key = format!("t{t:02}_k{i:04}");
map.insert(&make_key(key.as_bytes(), i as u64), &make_value(b"val"));
}
})
})
.collect();
let reader = {
let map = Arc::clone(&map);
let barrier = Arc::clone(&barrier);
std::thread::spawn(move || {
barrier.wait();
let mut count = 0u64;
for _ in 0..100 {
for entry in map.iter() {
let _ = entry.key();
let _ = entry.value();
count += 1;
}
}
count
})
};
for w in writers {
w.join().expect("writer panicked");
}
let reads = reader.join().expect("reader panicked");
assert_eq!(map.len(), 4000);
let _ = reads;
}
}