use std::cmp::Ordering;
use std::sync::Arc;
use super::skiplist_impl::ConcurrentSkipList;
use crate::iterator::merge::SeekableIterator;
use crate::types::{InternalKeyRef, LazyValue, ValueType, compare_internal_key};
#[derive(Clone, Debug)]
pub struct OrdInternalKey(Vec<u8>);
impl OrdInternalKey {
pub fn as_bytes(&self) -> &[u8] {
&self.0
}
}
impl PartialEq for OrdInternalKey {
fn eq(&self, other: &Self) -> bool {
compare_internal_key(&self.0, &other.0) == Ordering::Equal
}
}
impl Eq for OrdInternalKey {}
impl PartialOrd for OrdInternalKey {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for OrdInternalKey {
fn cmp(&self, other: &Self) -> Ordering {
compare_internal_key(&self.0, &other.0)
}
}
pub struct SkipListMemTable {
map: ConcurrentSkipList<OrdInternalKey, Vec<u8>>,
}
impl Default for SkipListMemTable {
fn default() -> Self {
Self::new()
}
}
impl SkipListMemTable {
pub fn new() -> Self {
Self {
map: ConcurrentSkipList::new(),
}
}
pub fn skiplist_ptr(&self) -> *const ConcurrentSkipList<OrdInternalKey, Vec<u8>> {
&self.map as *const _
}
pub fn insert(&self, encoded_key: Vec<u8>, value: Vec<u8>) {
self.map.insert(OrdInternalKey(encoded_key), value);
}
pub fn get(&self, search_key: &[u8], user_key: &[u8]) -> Option<Option<Vec<u8>>> {
self.get_with_seq(search_key, user_key)
.map(|(result, _seq)| result)
}
pub fn get_with_seq(
&self,
search_key: &[u8],
user_key: &[u8],
) -> Option<(Option<Vec<u8>>, crate::types::SequenceNumber)> {
let search = OrdInternalKey(search_key.to_vec());
let (k, v) = self.map.lower_bound(&search)?;
let kb = k.as_bytes();
if kb.len() < 8 {
return None;
}
let entry_uk = &kb[..kb.len() - 8];
if entry_uk == user_key {
let entry_ref = InternalKeyRef::new(kb);
let seq = entry_ref.sequence();
return Some((
match entry_ref.value_type() {
ValueType::Value => Some(v),
ValueType::Deletion | ValueType::RangeDeletion => None,
},
seq,
));
}
None
}
pub fn iter(&self) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> {
self.map.iter().map(|(k, v)| (k.0, v))
}
pub fn iter_rev(&self) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> {
self.map.iter().rev().map(|(k, v)| (k.0, v))
}
pub fn range(
&self,
bounds: impl std::ops::RangeBounds<OrdInternalKey>,
) -> impl DoubleEndedIterator<Item = (Vec<u8>, Vec<u8>)> {
self.map.range(bounds).map(|(k, v)| (k.0, v))
}
}
pub struct MemTableCursorIter {
_memtable: Arc<crate::memtable::MemTable>,
cursor: *const (),
skiplist: *const ConcurrentSkipList<OrdInternalKey, Vec<u8>>,
}
unsafe impl Send for MemTableCursorIter {}
impl MemTableCursorIter {
pub fn new(memtable: Arc<crate::memtable::MemTable>) -> Self {
let skiplist = memtable.skiplist_ref();
let head = unsafe { &*skiplist }.head_ptr();
Self {
_memtable: memtable,
cursor: head,
skiplist,
}
}
#[inline]
fn sl(&self) -> &ConcurrentSkipList<OrdInternalKey, Vec<u8>> {
unsafe { &*self.skiplist }
}
fn seek_internal(&mut self, target: &[u8]) {
let search = OrdInternalKey(target.to_vec());
self.cursor = self.sl().seek_ge_raw(&search);
}
}
impl Iterator for MemTableCursorIter {
type Item = (Vec<u8>, Vec<u8>);
fn next(&mut self) -> Option<Self::Item> {
if self.cursor.is_null() {
return None;
}
let (k, v) = unsafe { self.sl().node_kv(self.cursor) };
let result = (k.as_bytes().to_vec(), v.clone());
self.cursor = unsafe { self.sl().node_next0(self.cursor) };
Some(result)
}
}
impl SeekableIterator for MemTableCursorIter {
fn seek_to(&mut self, target: &[u8]) {
self.seek_internal(target);
}
fn current(&self) -> Option<(Vec<u8>, LazyValue)> {
if self.cursor.is_null() {
return None;
}
let (k, v) = unsafe { self.sl().node_kv(self.cursor) };
Some((k.as_bytes().to_vec(), LazyValue::Inline(v.clone())))
}
fn next_into(&mut self, key_buf: &mut Vec<u8>, value_buf: &mut Vec<u8>) -> bool {
if self.cursor.is_null() {
return false;
}
let (k, v) = unsafe { self.sl().node_kv(self.cursor) };
key_buf.clear();
key_buf.extend_from_slice(k.as_bytes());
value_buf.clear();
value_buf.extend_from_slice(v);
self.cursor = unsafe { self.sl().node_next0(self.cursor) };
true
}
fn next_lazy(&mut self, key_buf: &mut Vec<u8>) -> Option<LazyValue> {
if self.cursor.is_null() {
return None;
}
let (k, v) = unsafe { self.sl().node_kv(self.cursor) };
key_buf.clear();
key_buf.extend_from_slice(k.as_bytes());
let lv = LazyValue::Inline(v.clone());
self.cursor = unsafe { self.sl().node_next0(self.cursor) };
Some(lv)
}
fn prev(&mut self) -> Option<(Vec<u8>, LazyValue)> {
if self.cursor.is_null() {
let ptr = self.sl().tail_ptr();
if ptr.is_null() {
return None;
}
let (k, v) = unsafe { self.sl().node_kv(ptr) };
let result = (k.as_bytes().to_vec(), LazyValue::Inline(v.clone()));
self.cursor = unsafe { self.sl().node_prev0(ptr) };
return Some(result);
}
let prev_ptr = unsafe { self.sl().node_prev0(self.cursor) };
if prev_ptr.is_null() {
self.cursor = std::ptr::null();
return None;
}
let (pk, pv) = unsafe { self.sl().node_kv(prev_ptr) };
let result = (pk.as_bytes().to_vec(), LazyValue::Inline(pv.clone()));
self.cursor = prev_ptr;
Some(result)
}
fn seek_for_prev(&mut self, target: &[u8]) {
let search = OrdInternalKey(target.to_vec());
let ptr = self.sl().seek_le_raw(&search);
if ptr.is_null() {
self.cursor = std::ptr::null();
} else {
self.cursor = ptr;
}
}
fn seek_to_first(&mut self) {
self.cursor = self.sl().head_ptr();
}
fn seek_to_last(&mut self) {
let ptr = self.sl().tail_ptr();
self.cursor = ptr;
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::InternalKey;
#[test]
fn test_skiplist_insert_and_get() {
let sl = SkipListMemTable::new();
let ik = InternalKey::new(b"hello", 1, ValueType::Value);
sl.insert(ik.into_bytes(), b"world".to_vec());
let search = InternalKey::new(b"hello", 1, ValueType::Value);
let result = sl.get(search.as_bytes(), b"hello");
assert_eq!(result, Some(Some(b"world".to_vec())));
}
#[test]
fn test_skiplist_seq_visibility() {
let sl = SkipListMemTable::new();
let ik1 = InternalKey::new(b"key", 5, ValueType::Value);
sl.insert(ik1.into_bytes(), b"v5".to_vec());
let ik2 = InternalKey::new(b"key", 10, ValueType::Value);
sl.insert(ik2.into_bytes(), b"v10".to_vec());
let search = InternalKey::new(b"key", 10, ValueType::Value);
assert_eq!(
sl.get(search.as_bytes(), b"key"),
Some(Some(b"v10".to_vec()))
);
let search = InternalKey::new(b"key", 7, ValueType::Value);
assert_eq!(
sl.get(search.as_bytes(), b"key"),
Some(Some(b"v5".to_vec()))
);
let search = InternalKey::new(b"key", 3, ValueType::Value);
assert_eq!(sl.get(search.as_bytes(), b"key"), None);
}
#[test]
fn test_skiplist_iter_order() {
let sl = SkipListMemTable::new();
let ik1 = InternalKey::new(b"b", 1, ValueType::Value);
sl.insert(ik1.into_bytes(), b"2".to_vec());
let ik2 = InternalKey::new(b"a", 2, ValueType::Value);
sl.insert(ik2.into_bytes(), b"1".to_vec());
let ik3 = InternalKey::new(b"ab", 3, ValueType::Value);
sl.insert(ik3.into_bytes(), b"12".to_vec());
let entries: Vec<_> = sl.iter().collect();
assert_eq!(entries.len(), 3);
let user_keys: Vec<&[u8]> = entries.iter().map(|(k, _)| &k[..k.len() - 8]).collect();
assert_eq!(user_keys[0], b"a");
assert_eq!(user_keys[1], b"ab");
assert_eq!(user_keys[2], b"b");
}
#[test]
fn test_skiplist_same_key_seq_order() {
let sl = SkipListMemTable::new();
let ik1 = InternalKey::new(b"key", 1, ValueType::Value);
sl.insert(ik1.into_bytes(), b"v1".to_vec());
let ik2 = InternalKey::new(b"key", 5, ValueType::Value);
sl.insert(ik2.into_bytes(), b"v5".to_vec());
let ik3 = InternalKey::new(b"key", 3, ValueType::Value);
sl.insert(ik3.into_bytes(), b"v3".to_vec());
let entries: Vec<_> = sl.iter().collect();
assert_eq!(entries.len(), 3);
let seqs: Vec<u64> = entries
.iter()
.map(|(k, _)| InternalKeyRef::new(k).sequence())
.collect();
assert_eq!(seqs, vec![5, 3, 1]);
}
#[test]
fn test_skiplist_iter_rev() {
let sl = SkipListMemTable::new();
let ik1 = InternalKey::new(b"a", 1, ValueType::Value);
sl.insert(ik1.into_bytes(), b"1".to_vec());
let ik2 = InternalKey::new(b"b", 2, ValueType::Value);
sl.insert(ik2.into_bytes(), b"2".to_vec());
let ik3 = InternalKey::new(b"c", 3, ValueType::Value);
sl.insert(ik3.into_bytes(), b"3".to_vec());
let entries: Vec<_> = sl.iter_rev().collect();
assert_eq!(entries.len(), 3);
let user_keys: Vec<&[u8]> = entries.iter().map(|(k, _)| &k[..k.len() - 8]).collect();
assert_eq!(user_keys[0], b"c");
assert_eq!(user_keys[1], b"b");
assert_eq!(user_keys[2], b"a");
}
}