mod arena_sharded;
mod height;
mod iter;
mod node;
mod skiplist;
mod util;
use std::marker::PhantomData;
use std::sync::atomic::{fence, AtomicBool, AtomicUsize, Ordering};
use crate::arena_sharded::ConcurrentArena;
use crate::node::*;
use crate::skiplist::{InsertResult, SkipList};
pub use crate::iter::{Cursor, Entry, Iter, Snapshot, SnapshotIter};
/// Error returned by [`ConcurrentSkipList::try_insert`] when an insert is rejected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InsertError {
    /// The key is already present in the list.
    DuplicateKey,
    /// The arena could not satisfy the allocation (or a configured limit was hit).
    OutOfMemory,
}

impl std::fmt::Display for InsertError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let msg = match self {
            Self::DuplicateKey => "duplicate key",
            Self::OutOfMemory => "out of memory",
        };
        f.write_str(msg)
    }
}

impl std::error::Error for InsertError {}
/// Error returned by [`ConcurrentSkipList::insert_batch`] when not every entry
/// could be inserted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchError {
    /// Some entries were inserted and `failed` entries were rejected
    /// (duplicates or allocation failures).
    PartialFailure { succeeded: usize, failed: usize },
}

impl std::fmt::Display for BatchError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Single-variant enum: destructure irrefutably instead of matching.
        let BatchError::PartialFailure { succeeded, failed } = self;
        write!(
            f,
            "partial failure: {} succeeded, {} failed",
            succeeded, failed
        )
    }
}

impl std::error::Error for BatchError {}
/// Error returned by [`ConcurrentSkipList::seal`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SealError {
    /// `seal` was called on a memtable that had already been sealed.
    AlreadySealed,
}

impl std::fmt::Display for SealError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Single-variant enum: destructure irrefutably instead of matching.
        let SealError::AlreadySealed = self;
        f.write_str("memtable is already sealed")
    }
}

impl std::error::Error for SealError {}
/// A concurrent skip list backed by a sharded bump-allocating arena, intended
/// for use as an LSM-style memtable (insert/delete/get, sealing, snapshots).
pub struct ConcurrentSkipList {
    pub(crate) skiplist: SkipList, // index structure over arena-allocated nodes
    arena: ConcurrentArena,        // owns all node memory; reset only via `reset`
    live_count: AtomicUsize,       // inserts minus deletes; backs `len()`
    sealed: AtomicBool,            // once true, `insert`/`delete` are rejected
    max_memory_bytes: usize,       // 0 = unlimited; otherwise auto-reject threshold
    max_entries: usize,            // 0 = unlimited; otherwise auto-reject threshold
    total_inserts: AtomicUsize,    // counts insert attempts, incl. duplicates/OOM
}
// SAFETY: assumed sound because all node memory is owned by `arena` (moved with
// the struct) and cross-thread access goes through atomics — TODO(review):
// confirm against the SkipList/ConcurrentArena internals, which are not visible here.
unsafe impl Send for ConcurrentSkipList {}
unsafe impl Sync for ConcurrentSkipList {}
impl ConcurrentSkipList {
pub fn new() -> Self {
Self::with_shards(num_cpus())
}
pub fn with_capacity(arena_bytes: usize) -> Self {
Self::with_capacity_and_shards(arena_bytes, num_cpus(), 0, 0)
}
pub fn with_shards(num_shards: usize) -> Self {
Self::with_capacity_and_shards(64 * 1024, num_shards, 0, 0)
}
pub fn with_capacity_and_shards(
arena_bytes: usize,
num_shards: usize,
max_memory_bytes: usize,
max_entries: usize,
) -> Self {
let arena = ConcurrentArena::with_block_size(num_shards, arena_bytes);
let head = Self::alloc_sentinel(&arena);
let skiplist = unsafe { SkipList::new(head) };
ConcurrentSkipList {
skiplist,
arena,
live_count: AtomicUsize::new(0),
sealed: AtomicBool::new(false),
max_memory_bytes,
max_entries,
total_inserts: AtomicUsize::new(0),
}
}
fn alloc_sentinel(arena: &ConcurrentArena) -> *const u8 {
let local = arena.local();
let head_size = node_alloc_size(MAX_HEIGHT, 0, 0);
let head_ptr = local.alloc_raw(head_size, 8).as_ptr();
unsafe {
init_node(head_ptr, MAX_HEIGHT, b"", b"", false, 0);
}
fence(Ordering::Release);
head_ptr
}
#[inline]
pub fn insert(&self, key: &[u8], value: &[u8]) -> bool {
if self.sealed.load(Ordering::Acquire) {
return false;
}
if self.should_seal() {
return false;
}
let arena = self.arena.local();
match self.skiplist.insert(key, value, arena) {
(InsertResult::Success, size) => {
self.total_inserts.fetch_add(1, Ordering::Relaxed);
self.live_count.fetch_add(1, Ordering::Relaxed);
if self.max_memory_bytes > 0 {
self.arena.record_alloc(size);
}
true
}
(InsertResult::Duplicate | InsertResult::Oom, _) => {
self.total_inserts.fetch_add(1, Ordering::Relaxed);
false
}
}
}
#[inline]
pub fn try_insert(&self, key: &[u8], value: &[u8]) -> Result<(), InsertError> {
if self.sealed.load(Ordering::Acquire) {
return Err(InsertError::DuplicateKey);
}
if self.should_seal() {
return Err(InsertError::OutOfMemory);
}
let arena = self.arena.local();
match self.skiplist.insert(key, value, arena) {
(InsertResult::Success, size) => {
self.total_inserts.fetch_add(1, Ordering::Relaxed);
self.live_count.fetch_add(1, Ordering::Relaxed);
if self.max_memory_bytes > 0 {
self.arena.record_alloc(size);
}
Ok(())
}
(InsertResult::Duplicate, _) => {
self.total_inserts.fetch_add(1, Ordering::Relaxed);
Err(InsertError::DuplicateKey)
}
(InsertResult::Oom, _) => {
self.total_inserts.fetch_add(1, Ordering::Relaxed);
Err(InsertError::OutOfMemory)
}
}
}
#[cold]
fn should_seal(&self) -> bool {
if self.sealed.load(Ordering::Acquire) {
return true;
}
if self.max_memory_bytes > 0 && self.memory_usage() >= self.max_memory_bytes {
return true;
}
if self.max_entries > 0 && self.len() >= self.max_entries {
return true;
}
false
}
pub fn is_under_backpressure(&self) -> bool {
if self.max_memory_bytes > 0 {
let threshold = self.max_memory_bytes - self.max_memory_bytes / 10;
if self.memory_usage() >= threshold {
return true;
}
}
if self.max_entries > 0 {
let threshold = self.max_entries - self.max_entries / 10;
if self.len() >= threshold {
return true;
}
}
false
}
#[inline]
pub fn get_or_insert<'a>(&'a self, key: &[u8], value: &'a [u8]) -> (&'a [u8], bool) {
if let Some((v, false)) = self.get(key) {
return (v, false);
}
if self.insert(key, value) {
(value, true)
} else {
match self.get(key) {
Some((v, false)) => (v, false),
_ => (value, true), }
}
}
pub fn insert_batch(&self, entries: &[(&[u8], &[u8])]) -> Result<usize, BatchError> {
if self.sealed.load(Ordering::Acquire) {
return Err(BatchError::PartialFailure {
succeeded: 0,
failed: entries.len(),
});
}
let track_memory = self.max_memory_bytes > 0;
let mut succeeded = 0;
let arena = self.arena.local();
for (key, value) in entries {
match self.skiplist.insert(key, value, arena) {
(InsertResult::Success, size) => {
succeeded += 1;
self.live_count.fetch_add(1, Ordering::Relaxed);
if track_memory {
self.arena.record_alloc(size);
}
}
(InsertResult::Duplicate | InsertResult::Oom, _) => {}
}
}
self.total_inserts
.fetch_add(entries.len(), Ordering::Relaxed);
let failed = entries.len() - succeeded;
if failed > 0 {
Err(BatchError::PartialFailure { succeeded, failed })
} else {
Ok(succeeded)
}
}
pub fn get_many<'a>(&'a self, keys: &[&[u8]]) -> Vec<Option<&'a [u8]>> {
keys.iter().map(|k| self.get_live(k)).collect()
}
#[inline]
pub fn delete(&self, key: &[u8]) -> bool {
if self.sealed.load(Ordering::Acquire) {
return false;
}
if self.skiplist.delete(key) {
self.live_count.fetch_sub(1, Ordering::Relaxed);
true
} else {
false
}
}
#[inline]
pub fn get(&self, key: &[u8]) -> Option<(&[u8], bool)> {
self.skiplist.get(key)
}
#[inline]
pub fn get_live(&self, key: &[u8]) -> Option<&[u8]> {
match self.get(key) {
Some((value, false)) => Some(value),
_ => None,
}
}
#[inline]
pub fn contains_key(&self, key: &[u8]) -> bool {
matches!(self.get(key), Some((_, false)))
}
pub fn iter(&self) -> Iter<'_> {
Iter {
current: self.skiplist.head,
_owner: PhantomData,
}
}
pub fn snapshot(&self) -> Snapshot<'_> {
let snap_seq = self.skiplist.next_snapshot_seq();
Snapshot {
head: self.skiplist.head,
snap_seq,
_owner: PhantomData,
}
}
pub fn cursor(&self) -> Cursor<'_> {
Cursor {
current: self.skiplist.head,
_owner: PhantomData,
}
}
pub fn cursor_at(&self, target: &[u8]) -> Option<Cursor<'_>> {
let mut c = Cursor {
current: std::ptr::null(),
_owner: PhantomData,
};
if c.seek(self, target) {
Some(c)
} else {
None
}
}
pub fn len(&self) -> usize {
self.live_count.load(Ordering::Relaxed)
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn is_sealed(&self) -> bool {
self.sealed.load(Ordering::Acquire)
}
pub fn memory_usage(&self) -> usize {
let fast = self.arena.bytes_allocated_fast();
if fast > 0 {
fast
} else {
self.arena.stats().bytes_allocated
}
}
pub fn memory_reserved(&self) -> usize {
self.arena.stats().bytes_reserved
}
pub fn memory_utilization(&self) -> f64 {
self.arena.stats().utilization()
}
pub fn memory_idle(&self) -> usize {
self.arena.stats().bytes_idle()
}
pub fn avg_key_size(&self) -> f64 {
let len = self.len();
if len == 0 {
return 0.0;
}
let total = self.memory_usage() as f64;
let key_value_overhead = 32.0 + 16.0;
let avg_per_entry = total / len as f64;
((avg_per_entry - key_value_overhead) / 2.0).max(0.0)
}
pub fn avg_value_size(&self) -> f64 {
self.avg_key_size()
}
pub fn total_inserts(&self) -> usize {
self.total_inserts.load(Ordering::Relaxed)
}
pub fn max_memory_bytes(&self) -> usize {
self.max_memory_bytes
}
pub fn max_entries(&self) -> usize {
self.max_entries
}
pub fn seal(self) -> Result<(FrozenMemtable, ConcurrentSkipList), SealError> {
if self.sealed.swap(true, Ordering::Acquire) {
std::mem::drop(FrozenMemtable { inner: self });
return Err(SealError::AlreadySealed);
}
let num_shards = self.arena.num_shards();
let stats = self.arena.stats();
let block_size = if stats.block_count > 0 {
stats.bytes_reserved / stats.block_count
} else {
64 * 1024
};
let fresh = ConcurrentSkipList::with_capacity_and_shards(
block_size,
num_shards,
self.max_memory_bytes,
self.max_entries,
);
Ok((FrozenMemtable { inner: self }, fresh))
}
pub unsafe fn reset(&mut self) {
self.arena.reset_all();
self.live_count.store(0, Ordering::Relaxed);
self.sealed.store(false, Ordering::Release);
self.total_inserts.store(0, Ordering::Relaxed);
let head = Self::alloc_sentinel(&self.arena);
self.skiplist.reset(head);
}
}
impl Default for ConcurrentSkipList {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for ConcurrentSkipList {
    // Cheap summary for logging: counters only, never walks the list.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ConcurrentSkipList")
            .field("len", &self.len())
            .field("sealed", &self.is_sealed())
            .field("memory_usage", &self.memory_usage())
            .finish()
    }
}
/// Default shard count: the system's available parallelism, falling back to 4
/// when it cannot be determined. Under Miri the syscall is avoided entirely
/// and 4 is returned directly.
fn num_cpus() -> usize {
    #[cfg(miri)]
    {
        4
    }
    #[cfg(not(miri))]
    {
        std::thread::available_parallelism().map_or(4, |n| n.get())
    }
}
/// A sealed, read-only memtable produced by `ConcurrentSkipList::seal`.
/// Exposes only the read paths of the wrapped list.
pub struct FrozenMemtable {
    inner: ConcurrentSkipList, // sealed list; write methods are not re-exported
}
// NOTE(review): `ConcurrentSkipList` is already declared `Send + Sync`, so the
// auto traits should apply to this wrapper and these unsafe impls look
// redundant — confirm before removing them.
unsafe impl Send for FrozenMemtable {}
unsafe impl Sync for FrozenMemtable {}
// Read-only delegation: every method forwards to the sealed inner list.
impl FrozenMemtable {
    /// Iterates over all entries, including tombstones, in key order.
    pub fn iter(&self) -> Iter<'_> {
        self.inner.iter()
    }
    /// Point-in-time snapshot (sealed, so it sees all entries).
    pub fn snapshot(&self) -> Snapshot<'_> {
        self.inner.snapshot()
    }
    /// Cursor positioned before the first entry.
    pub fn cursor(&self) -> Cursor<'_> {
        self.inner.cursor()
    }
    /// Cursor at the first entry with key >= `target`, or `None`.
    pub fn cursor_at(&self, target: &[u8]) -> Option<Cursor<'_>> {
        self.inner.cursor_at(target)
    }
    /// Number of live (non-tombstone) entries.
    pub fn len(&self) -> usize {
        self.inner.len()
    }
    /// `true` when there are no live entries.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
    /// Bytes of arena memory consumed by entries.
    pub fn memory_usage(&self) -> usize {
        self.inner.memory_usage()
    }
    /// Total bytes reserved by the arena.
    pub fn memory_reserved(&self) -> usize {
        self.inner.memory_reserved()
    }
    /// Fraction of reserved arena memory in use.
    pub fn memory_utilization(&self) -> f64 {
        self.inner.memory_utilization()
    }
    /// Reserved-but-unused arena bytes.
    pub fn memory_idle(&self) -> usize {
        self.inner.memory_idle()
    }
    /// Returns `(value, is_tombstone)` for `key`, if present.
    pub fn get(&self, key: &[u8]) -> Option<(&[u8], bool)> {
        self.inner.get(key)
    }
    /// Returns the value for `key` only if it is not tombstoned.
    pub fn get_live(&self, key: &[u8]) -> Option<&[u8]> {
        self.inner.get_live(key)
    }
}
impl std::fmt::Debug for FrozenMemtable {
    // Cheap summary for logging: counters only, never walks the list.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FrozenMemtable")
            .field("len", &self.len())
            .field("memory_usage", &self.memory_usage())
            .finish()
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for the public memtable API: CRUD, iteration, snapshots,
    //! cursors, sealing, limits, and memory accounting.
    use super::*;

    #[test]
    fn test_basic_insert_get() {
        let sl = ConcurrentSkipList::new();
        assert!(sl.insert(b"key1", b"value1"));
        assert!(sl.insert(b"key2", b"value2"));
        let (v, tomb) = sl.get(b"key1").unwrap();
        assert_eq!(v, b"value1");
        assert!(!tomb);
        assert_eq!(sl.get_live(b"key1"), Some(b"value1".as_slice()));
        assert_eq!(sl.get_live(b"missing"), None);
    }

    #[test]
    fn test_delete_tombstone() {
        let sl = ConcurrentSkipList::new();
        sl.insert(b"key1", b"value1");
        assert!(sl.delete(b"key1"));
        let (_, tomb) = sl.get(b"key1").unwrap();
        assert!(tomb);
        assert_eq!(sl.get_live(b"key1"), None);
    }

    #[test]
    fn test_snapshot_iter() {
        let sl = ConcurrentSkipList::new();
        sl.insert(b"alpha", b"1");
        sl.insert(b"beta", b"2");
        sl.insert(b"gamma", b"3");
        let snap = sl.snapshot();
        let entries: Vec<_> = snap.iter().collect();
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0].key, b"alpha");
        assert_eq!(entries[1].key, b"beta");
        assert_eq!(entries[2].key, b"gamma");
    }

    #[test]
    fn test_debug_iter() {
        let sl = ConcurrentSkipList::new();
        let r1 = sl.insert(b"x", b"1");
        let r2 = sl.insert(b"y", b"2");
        assert!(r1, "insert x failed");
        assert!(r2, "insert y failed");
        // Walk level 0 manually and dump it, then cross-check against iter().
        let mut cur = sl.skiplist.head;
        let mut count = 0;
        loop {
            let next = unsafe { crate::node::tower_load(cur, 0) };
            if next.is_null() {
                break;
            }
            let node = next.ptr();
            let key = unsafe { crate::node::node_key(node) };
            let seq = unsafe { crate::node::node_seq(node) };
            eprintln!(
                "  [{}] key={:?} seq={}",
                count,
                std::str::from_utf8(key).unwrap_or("<non-utf8>"),
                seq
            );
            cur = node;
            count += 1;
        }
        eprintln!("  total level-0 nodes: {}", count);
        let entries: Vec<_> = sl.iter().collect();
        for (i, e) in entries.iter().enumerate() {
            eprintln!(
                "  iter[{}] key={:?}",
                i,
                std::str::from_utf8(e.key).unwrap_or("<non-utf8>")
            );
        }
        assert_eq!(entries.len(), 2, "expected 2 iter entries");
    }

    #[test]
    fn test_live_iter() {
        let sl = ConcurrentSkipList::new();
        let r1 = sl.insert(b"x", b"1");
        let r2 = sl.insert(b"y", b"2");
        assert!(r1, "first insert should succeed");
        assert!(r2, "second insert should succeed");
        assert!(sl.get(b"x").is_some(), "x should be found");
        assert!(sl.get(b"y").is_some(), "y should be found");
        let entries: Vec<_> = sl.iter().collect();
        assert_eq!(
            entries.len(),
            2,
            "expected 2 entries, got {}",
            entries.len()
        );
    }

    #[test]
    fn test_duplicate_insert() {
        let sl = ConcurrentSkipList::new();
        assert!(sl.insert(b"key", b"v1"));
        assert!(!sl.insert(b"key", b"v2"));
        // First writer wins; the duplicate does not overwrite.
        let (v, _) = sl.get(b"key").unwrap();
        assert_eq!(v, b"v1");
    }

    #[test]
    fn test_insert_batch() {
        let sl = ConcurrentSkipList::new();
        let entries: &[(&[u8], &[u8])] = &[(b"a", b"1"), (b"b", b"2"), (b"c", b"3")];
        let result = sl.insert_batch(entries);
        assert_eq!(result, Ok(3));
        assert_eq!(sl.len(), 3);
        // A batch containing a duplicate reports a partial failure.
        let dup_entries: &[(&[u8], &[u8])] = &[(b"a", b"x"), (b"d", b"4")];
        let result = sl.insert_batch(dup_entries);
        assert!(result.is_err());
    }

    #[test]
    fn test_get_many() {
        let sl = ConcurrentSkipList::new();
        sl.insert(b"a", b"1");
        sl.insert(b"b", b"2");
        sl.insert(b"c", b"3");
        sl.delete(b"b");
        let keys: &[&[u8]] = &[b"a", b"b", b"c", b"d"];
        let results = sl.get_many(keys);
        assert_eq!(results[0], Some(b"1".as_slice()));
        assert_eq!(results[1], None); // tombstoned
        assert_eq!(results[2], Some(b"3".as_slice()));
        assert_eq!(results[3], None); // never inserted
    }

    #[test]
    fn test_contains_key() {
        let sl = ConcurrentSkipList::new();
        assert!(!sl.contains_key(b"missing"));
        sl.insert(b"key", b"val");
        assert!(sl.contains_key(b"key"));
        sl.delete(b"key");
        assert!(!sl.contains_key(b"key"));
    }

    #[test]
    fn test_memory_usage() {
        let sl = ConcurrentSkipList::new();
        let before = sl.memory_usage();
        for i in 0..100 {
            let k = format!("k{:04}", i);
            let v = format!("v{:04}", i);
            sl.insert(k.as_bytes(), v.as_bytes());
        }
        assert!(sl.memory_usage() > before);
    }

    #[test]
    fn test_memory_stats() {
        let sl = ConcurrentSkipList::new();
        sl.insert(b"key1", b"value1");
        let reserved = sl.memory_reserved();
        let utilization = sl.memory_utilization();
        let idle = sl.memory_idle();
        assert!(reserved > 0);
        assert!(utilization > 0.0 && utilization <= 1.0);
        assert_eq!(reserved - sl.memory_usage(), idle);
        let (frozen, _) = sl.seal().unwrap();
        assert!(frozen.memory_reserved() > 0);
        assert!(frozen.memory_utilization() > 0.0);
    }

    #[test]
    fn test_empty_list() {
        let sl = ConcurrentSkipList::new();
        assert!(sl.is_empty());
        assert_eq!(sl.get(b"nope"), None);
        assert_eq!(sl.iter().count(), 0);
        let snap = sl.snapshot();
        assert_eq!(snap.iter().count(), 0);
    }

    #[test]
    fn test_empty_key_in_iter() {
        // The empty key must be distinguishable from the head sentinel.
        let sl = ConcurrentSkipList::new();
        assert!(sl.insert(b"", b"empty_key_val"));
        assert!(sl.insert(b"a", b"val_a"));
        let entries: Vec<_> = sl.iter().collect();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].key, b"");
        assert_eq!(entries[0].value, b"empty_key_val");
        assert_eq!(entries[1].key, b"a");
        let snap = sl.snapshot();
        let snap_entries: Vec<_> = snap.iter().collect();
        assert_eq!(snap_entries.len(), 2);
        assert_eq!(snap_entries[0].key, b"");
    }

    #[test]
    fn test_snapshot_excludes_concurrent_inserts() {
        let sl = ConcurrentSkipList::new();
        for i in 0..10 {
            let k = format!("key_{:04}", i);
            let v = format!("val_{:04}", i);
            sl.insert(k.as_bytes(), v.as_bytes());
        }
        let snap = sl.snapshot();
        let snap_seq = snap.snap_seq;
        // These land after the snapshot's sequence cut and must be invisible.
        for i in 10..20 {
            let k = format!("key_{:04}", i);
            let v = format!("val_{:04}", i);
            sl.insert(k.as_bytes(), v.as_bytes());
        }
        let entries: Vec<_> = snap.iter().collect();
        assert_eq!(entries.len(), 10);
        for (i, entry) in entries.iter().enumerate() {
            let expected = format!("key_{:04}", i);
            assert_eq!(entry.key, expected.as_bytes());
        }
        let live: Vec<_> = sl.iter().collect();
        assert_eq!(live.len(), 20);
        assert!(snap_seq < sl.skiplist.next_seq.load(Ordering::Relaxed) as u64);
    }

    #[test]
    fn test_len_decrements_on_delete() {
        let sl = ConcurrentSkipList::new();
        sl.insert(b"a", b"1");
        sl.insert(b"b", b"2");
        assert_eq!(sl.len(), 2);
        sl.delete(b"a");
        assert_eq!(sl.len(), 1);
        sl.delete(b"b");
        assert_eq!(sl.len(), 0);
        assert!(sl.is_empty());
    }

    #[test]
    fn test_seal_creates_fresh_memtable() {
        let sl = ConcurrentSkipList::new();
        sl.insert(b"key1", b"val1");
        sl.insert(b"key2", b"val2");
        let (frozen, fresh) = sl.seal().unwrap();
        assert_eq!(frozen.len(), 2);
        assert_eq!(frozen.get_live(b"key1"), Some(b"val1".as_slice()));
        assert!(fresh.is_empty());
        assert!(fresh.insert(b"key3", b"val3"));
        assert_eq!(fresh.get_live(b"key3"), Some(b"val3".as_slice()));
        let entries: Vec<_> = frozen.iter().collect();
        assert_eq!(entries.len(), 2);
    }

    #[test]
    fn test_seal_double_returns_error() {
        // Previously this test sealed once and never exercised the
        // double-seal path its name promises. Simulate a prior seal via the
        // flag (as test_sealed_rejects_writes does) and verify the error.
        let sl = ConcurrentSkipList::new();
        sl.insert(b"key", b"val");
        sl.sealed.store(true, Ordering::Release);
        assert_eq!(sl.seal().unwrap_err(), SealError::AlreadySealed);
    }

    #[test]
    fn test_sealed_rejects_writes() {
        let sl = ConcurrentSkipList::new();
        sl.insert(b"key", b"val");
        sl.sealed.store(true, Ordering::Release);
        assert!(!sl.insert(b"new", b"val"));
        assert!(!sl.delete(b"key"));
    }

    #[test]
    fn test_cursor_basic() {
        let sl = ConcurrentSkipList::new();
        sl.insert(b"b", b"2");
        sl.insert(b"a", b"1");
        sl.insert(b"c", b"3");
        let mut cursor = sl.cursor();
        assert!(cursor.next_entry());
        let e = cursor.entry().unwrap();
        assert_eq!(e.key, b"a");
        assert!(cursor.next_entry());
        let e = cursor.entry().unwrap();
        assert_eq!(e.key, b"b");
        assert!(cursor.next_entry());
        let e = cursor.entry().unwrap();
        assert_eq!(e.key, b"c");
        assert!(!cursor.next_entry());
    }

    #[test]
    fn test_cursor_seek() {
        let sl = ConcurrentSkipList::new();
        sl.insert(b"a", b"1");
        sl.insert(b"c", b"3");
        sl.insert(b"e", b"5");
        // Exact hit.
        let cursor = sl.cursor_at(b"c").unwrap();
        let e = cursor.entry().unwrap();
        assert_eq!(e.key, b"c");
        // Seek lands on the next greater key.
        let cursor = sl.cursor_at(b"b").unwrap();
        let e = cursor.entry().unwrap();
        assert_eq!(e.key, b"c");
        // Past the end.
        assert!(sl.cursor_at(b"z").is_none());
        // Empty target seeks to the first entry.
        let cursor = sl.cursor_at(b"").unwrap();
        let e = cursor.entry().unwrap();
        assert_eq!(e.key, b"a");
    }

    #[test]
    fn test_cursor_as_iterator() {
        let sl = ConcurrentSkipList::new();
        sl.insert(b"a", b"1");
        sl.insert(b"b", b"2");
        sl.insert(b"c", b"3");
        let cursor = sl.cursor_at(b"b").unwrap();
        let keys: Vec<_> = cursor.map(|e| e.key.to_vec()).collect();
        assert_eq!(keys, vec![b"b".to_vec(), b"c".to_vec()]);
    }

    #[test]
    fn test_cursor_with_tombstones() {
        let sl = ConcurrentSkipList::new();
        sl.insert(b"a", b"1");
        sl.insert(b"b", b"2");
        sl.insert(b"c", b"3");
        sl.insert(b"d", b"4");
        sl.insert(b"e", b"5");
        assert!(sl.delete(b"b"));
        assert!(sl.delete(b"d"));
        // Cursors surface tombstones; callers filter them.
        let cursor = sl.cursor_at(b"a").unwrap();
        let entries: Vec<_> = cursor.collect();
        assert_eq!(entries.len(), 5);
        assert!(!entries[0].is_tombstone);
        assert!(entries[1].is_tombstone);
        assert!(!entries[2].is_tombstone);
        assert!(entries[3].is_tombstone);
        assert!(!entries[4].is_tombstone);
        let live_keys: Vec<_> = sl
            .cursor_at(b"a")
            .unwrap()
            .filter(|e| !e.is_tombstone)
            .map(|e| e.key.to_vec())
            .collect();
        assert_eq!(live_keys, vec![b"a".to_vec(), b"c".to_vec(), b"e".to_vec()]);
        let cursor = sl.cursor_at(b"b").unwrap();
        let e = cursor.entry().unwrap();
        assert_eq!(e.key, b"b");
        assert!(e.is_tombstone);
    }

    #[test]
    fn test_get_or_insert_new_key() {
        let sl = ConcurrentSkipList::new();
        let (val, is_new) = sl.get_or_insert(b"key", b"value");
        assert!(is_new);
        assert_eq!(val, b"value");
        assert_eq!(sl.len(), 1);
    }

    #[test]
    fn test_get_or_insert_existing_key() {
        let sl = ConcurrentSkipList::new();
        sl.insert(b"key", b"original");
        let (val, is_new) = sl.get_or_insert(b"key", b"replacement");
        assert!(!is_new);
        assert_eq!(val, b"original");
        assert_eq!(sl.len(), 1);
        let (v, _) = sl.get(b"key").unwrap();
        assert_eq!(v, b"original");
    }

    #[test]
    fn test_debug_format() {
        let sl = ConcurrentSkipList::new();
        sl.insert(b"key", b"val");
        let debug = format!("{:?}", sl);
        assert!(debug.contains("ConcurrentSkipList"));
        assert!(debug.contains("len: 1"));
        assert!(debug.contains("sealed: false"));
    }

    #[test]
    fn test_frozen_memtable_debug() {
        let sl = ConcurrentSkipList::new();
        sl.insert(b"key", b"val");
        let (frozen, _fresh) = sl.seal().unwrap();
        let debug = format!("{:?}", frozen);
        assert!(debug.contains("FrozenMemtable"));
        assert!(debug.contains("len: 1"));
    }

    #[test]
    fn test_seal_full_lifecycle() {
        let sl = ConcurrentSkipList::new();
        for i in 0..100 {
            let k = format!("key_{:04}", i);
            let v = format!("val_{:04}", i);
            sl.insert(k.as_bytes(), v.as_bytes());
        }
        assert_eq!(sl.len(), 100);
        let (frozen, fresh) = sl.seal().unwrap();
        // Simulate a flush: count live entries out of the frozen table.
        let mut flushed = 0;
        for entry in frozen.iter() {
            if !entry.is_tombstone {
                flushed += 1;
            }
        }
        assert_eq!(flushed, 100);
        std::mem::drop(frozen);
        assert!(fresh.is_empty());
        for i in 0..50 {
            let k = format!("new_{:04}", i);
            let v = format!("val_{:04}", i);
            fresh.insert(k.as_bytes(), v.as_bytes());
        }
        assert_eq!(fresh.len(), 50);
    }

    #[test]
    fn test_size_based_auto_seal() {
        // 512-byte memory limit: inserts must start failing well before 100.
        let sl = ConcurrentSkipList::with_capacity_and_shards(256, 4, 512, 0);
        for i in 0..100 {
            let k = format!("key:{}", i);
            let v = format!("value:{}", i);
            if !sl.insert(k.as_bytes(), v.as_bytes()) {
                break;
            }
        }
        let k = "newkey".as_bytes();
        let v = "newvalue".as_bytes();
        assert!(!sl.insert(k, v), "should reject after hitting limit");
    }

    #[test]
    fn test_max_entries_auto_seal() {
        let sl = ConcurrentSkipList::with_capacity_and_shards(64 * 1024 / 4, 4, 0, 5);
        for i in 0..10 {
            let k = format!("key:{}", i);
            let v = format!("value:{}", i);
            if !sl.insert(k.as_bytes(), v.as_bytes()) {
                break;
            }
        }
        assert!(sl.len() <= 5, "len={} should be <= max_entries=5", sl.len());
    }

    #[test]
    fn test_backpressure() {
        let sl = ConcurrentSkipList::with_capacity_and_shards(256, 4, 1000, 0);
        assert!(!sl.is_under_backpressure());
        for i in 0..50 {
            let k = format!("key:{:04}", i);
            let v = "x".repeat(20);
            sl.insert(k.as_bytes(), v.as_bytes());
        }
        assert!(
            sl.is_under_backpressure(),
            "should be under backpressure after 90%"
        );
    }

    #[test]
    fn test_total_inserts() {
        let sl = ConcurrentSkipList::new();
        assert_eq!(sl.total_inserts(), 0);
        sl.insert(b"a", b"1");
        sl.insert(b"a", b"2"); // duplicate still counts as an attempt
        sl.insert(b"b", b"3");
        assert_eq!(sl.total_inserts(), 3);
    }

    #[test]
    fn test_max_config() {
        let sl = ConcurrentSkipList::with_capacity_and_shards(1024, 2, 2048, 100);
        assert_eq!(sl.max_memory_bytes(), 2048);
        assert_eq!(sl.max_entries(), 100);
    }
}