use std::cell::Cell;
use std::collections::VecDeque;
use std::ops::{Bound, RangeBounds};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use bytes::Bytes;
use crossbeam_skiplist::map::Range;
use crossbeam_skiplist::SkipMap;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering::SeqCst;
use crate::bytes_range::BytesRange;
use crate::error::SlateDBError;
use crate::iter::{IterationOrder, KeyValueIterator, SeekToKey};
use crate::merge_iterator::MergeIterator;
use crate::types::RowEntry;
use crate::utils::WatchableOnceCell;
/// Key under which rows are stored in the memtable's skip list: a user key
/// paired with the sequence number of the write.
///
/// The manual `Ord` impl sorts by `user_key` ascending and then by `seq`
/// descending, so for one user key the entry with the highest sequence number
/// comes first.
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) struct KVTableInternalKey {
/// Key bytes taken from the row (`RowEntry::key`).
user_key: Bytes,
/// Sequence number taken from the row (`RowEntry::seq`).
seq: u64,
}
impl KVTableInternalKey {
pub fn new(user_key: Bytes, seq: u64) -> Self {
Self { user_key, seq }
}
}
impl Ord for KVTableInternalKey {
    /// Orders by `user_key` ascending; ties are broken by `seq` descending, so
    /// the newest write for a key sorts before older ones.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        match self.user_key.cmp(&other.user_key) {
            // Same user key: swap the operands so the larger `seq` sorts first.
            std::cmp::Ordering::Equal => other.seq.cmp(&self.seq),
            by_user_key => by_user_key,
        }
    }
}
impl PartialOrd for KVTableInternalKey {
    /// Total order: always `Some`, delegating to the `Ord` implementation.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// A range over [`KVTableInternalKey`]s, stored as an owned start and end bound.
///
/// Implements `RangeBounds<KVTableInternalKey>` and can be built from any
/// `RangeBounds<Bytes>` via the `From` impl in this file.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct KVTableInternalKeyRange {
/// Lower bound of the range.
start_bound: Bound<KVTableInternalKey>,
/// Upper bound of the range.
end_bound: Bound<KVTableInternalKey>,
}
impl RangeBounds<KVTableInternalKey> for KVTableInternalKeyRange {
    /// Borrowed view of the stored lower bound.
    fn start_bound(&self) -> Bound<&KVTableInternalKey> {
        match &self.start_bound {
            Bound::Included(key) => Bound::Included(key),
            Bound::Excluded(key) => Bound::Excluded(key),
            Bound::Unbounded => Bound::Unbounded,
        }
    }

    /// Borrowed view of the stored upper bound.
    fn end_bound(&self) -> Bound<&KVTableInternalKey> {
        match &self.end_bound {
            Bound::Included(key) => Bound::Included(key),
            Bound::Excluded(key) => Bound::Excluded(key),
            Bound::Unbounded => Bound::Unbounded,
        }
    }
}
impl<T: RangeBounds<Bytes>> From<T> for KVTableInternalKeyRange {
fn from(range: T) -> Self {
let start_bound = match range.start_bound() {
Bound::Included(key) => Bound::Included(KVTableInternalKey::new(key.clone(), u64::MAX)),
Bound::Excluded(key) => Bound::Excluded(KVTableInternalKey::new(key.clone(), 0)),
Bound::Unbounded => Bound::Unbounded,
};
let end_bound = match range.end_bound() {
Bound::Included(key) => Bound::Included(KVTableInternalKey::new(key.clone(), 0)),
Bound::Excluded(key) => Bound::Excluded(KVTableInternalKey::new(key.clone(), u64::MAX)),
Bound::Unbounded => Bound::Unbounded,
};
Self {
start_bound,
end_bound,
}
}
}
/// In-memory table of rows backed by a crossbeam `SkipMap`.
pub(crate) struct KVTable {
/// Rows keyed by `(user_key, seq)`.
map: SkipMap<KVTableInternalKey, RowEntry>,
/// Written by `notify_durable` and awaited by `await_durable`.
durable: WatchableOnceCell<Result<(), SlateDBError>>,
/// Running total of `RowEntry::estimated_size()` maintained by `put`.
size: AtomicUsize,
/// Largest `create_ts` seen by `put`; initialised to `i64::MIN`.
last_tick: AtomicI64,
/// Largest `seq` seen by `put`; initialised to 0.
last_seq: AtomicU64,
}
/// Handle to a [`KVTable`] that exposes `put`; it is consumed when the table is
/// turned into an [`ImmutableMemtable`] or [`ImmutableWal`].
pub(crate) struct WritableKVTable {
/// The shared underlying table.
table: Arc<KVTable>,
}
/// A memtable that no longer accepts writes, plus a one-shot cell reporting the
/// outcome of its flush.
pub(crate) struct ImmutableMemtable {
/// WAL id supplied at construction.
/// NOTE(review): presumably the last WAL whose contents are in this table — confirm with callers.
last_wal_id: u64,
/// The shared underlying table, taken from the `WritableKVTable`.
table: Arc<KVTable>,
/// Written by `notify_flush_to_l0` and awaited by `await_flush_to_l0`.
flushed: WatchableOnceCell<Result<(), SlateDBError>>,
}
/// A table taken from a `WritableKVTable` and exposed only through a shared handle.
pub(crate) struct ImmutableWal {
/// The shared underlying table.
table: Arc<KVTable>,
}
/// Iterator over a range of a [`KVTable`]'s skip list, borrowing the table for `'a`.
pub(crate) struct MemTableIterator<'a, T: RangeBounds<KVTableInternalKey>> {
/// Underlying skip-list range; it can be consumed from either end.
inner: Range<'a, KVTableInternalKey, T, KVTableInternalKey, RowEntry>,
/// Selects which end of `inner` each call to `next_entry_sync` consumes.
ordering: IterationOrder,
}
/// Key-value iterator over rows that have already been materialised into a queue.
pub(crate) struct VecDequeKeyValueIterator {
/// Remaining rows; iteration pops from the front.
rows: VecDeque<RowEntry>,
}
impl VecDequeKeyValueIterator {
    /// Wraps an already materialised queue of rows; they are yielded front to back.
    pub(crate) fn new(rows: VecDeque<RowEntry>) -> Self {
        Self { rows }
    }

    /// Eagerly merges the given tables over `range` and collects the result.
    ///
    /// Each table is scanned in ascending order, the scans are combined with a
    /// [`MergeIterator`], and every row it yields is stored in order.
    ///
    /// # Errors
    ///
    /// Returns the error if constructing or advancing the merge iterator fails.
    pub(crate) async fn materialize_range(
        tables: VecDeque<Arc<KVTable>>,
        range: BytesRange,
    ) -> Result<Self, SlateDBError> {
        let memtable_iters = tables
            .iter()
            .map(|t| t.range_ascending(range.clone()))
            .collect();
        let mut merge_iter = MergeIterator::new(memtable_iters).await?;
        let mut rows = VecDeque::new();
        while let Some(row_entry) = merge_iter.next_entry().await? {
            // The row is already owned here, so move it instead of cloning it.
            rows.push_back(row_entry);
        }
        Ok(VecDequeKeyValueIterator::new(rows))
    }
}
impl KeyValueIterator for VecDequeKeyValueIterator {
    /// Removes and returns the front row, or `None` once the queue is empty.
    /// This implementation never returns an error.
    async fn next_entry(&mut self) -> Result<Option<RowEntry>, SlateDBError> {
        let next_row = self.rows.pop_front();
        Ok(next_row)
    }
}
impl SeekToKey for VecDequeKeyValueIterator {
    /// Discards rows from the front while their key is strictly less than
    /// `next_key`, leaving the first row with `key >= next_key` (if any) at the
    /// front. This implementation never returns an error.
    async fn seek(&mut self, next_key: &[u8]) -> Result<(), SlateDBError> {
        while self
            .rows
            .front()
            .is_some_and(|row| row.key < next_key)
        {
            self.rows.pop_front();
        }
        Ok(())
    }
}
impl<T: RangeBounds<KVTableInternalKey>> KeyValueIterator for MemTableIterator<'_, T> {
    /// Async adapter over [`MemTableIterator::next_entry_sync`]; it never
    /// returns an error.
    async fn next_entry(&mut self) -> Result<Option<RowEntry>, SlateDBError> {
        let entry = self.next_entry_sync();
        Ok(entry)
    }
}
impl<T: RangeBounds<KVTableInternalKey>> MemTableIterator<'_, T> {
    /// Returns a clone of the next row in the configured direction, or `None`
    /// when the range is exhausted.
    ///
    /// Ascending iteration consumes from the front of the underlying range,
    /// descending iteration from the back.
    pub(crate) fn next_entry_sync(&mut self) -> Option<RowEntry> {
        let entry = match self.ordering {
            IterationOrder::Ascending => self.inner.next()?,
            IterationOrder::Descending => self.inner.next_back()?,
        };
        Some(entry.value().clone())
    }
}
impl ImmutableMemtable {
pub(crate) fn new(table: WritableKVTable, last_wal_id: u64) -> Self {
Self {
table: table.table,
last_wal_id,
flushed: WatchableOnceCell::new(),
}
}
pub(crate) fn table(&self) -> Arc<KVTable> {
self.table.clone()
}
pub(crate) fn last_wal_id(&self) -> u64 {
self.last_wal_id
}
pub(crate) async fn await_flush_to_l0(&self) -> Result<(), SlateDBError> {
self.flushed.reader().await_value().await
}
pub(crate) fn notify_flush_to_l0(&self, result: Result<(), SlateDBError>) {
self.flushed.write(result);
}
}
impl ImmutableWal {
pub(crate) fn new(table: WritableKVTable) -> Self {
Self { table: table.table }
}
pub(crate) fn table(&self) -> Arc<KVTable> {
self.table.clone()
}
}
impl WritableKVTable {
pub(crate) fn new() -> Self {
Self {
table: Arc::new(KVTable::new()),
}
}
pub(crate) fn table(&self) -> &Arc<KVTable> {
&self.table
}
pub(crate) fn put(&mut self, row: RowEntry) {
self.table.put(row);
}
pub(crate) fn size(&self) -> usize {
self.table.size()
}
pub(crate) fn is_empty(&self) -> bool {
self.size() == 0
}
}
impl KVTable {
    /// Creates an empty table: no rows, size 0, `last_tick` at `i64::MIN`,
    /// `last_seq` at 0 and an unset durability cell.
    pub(crate) fn new() -> Self {
        Self {
            map: SkipMap::new(),
            size: AtomicUsize::new(0),
            durable: WatchableOnceCell::new(),
            last_tick: AtomicI64::new(i64::MIN),
            last_seq: AtomicU64::new(0),
        }
    }

    /// `true` when the skip list holds no entries.
    pub(crate) fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Current value of the size counter maintained by `put`.
    pub(crate) fn size(&self) -> usize {
        self.size.load(Ordering::Relaxed)
    }

    /// Largest `create_ts` of any row inserted so far, or `i64::MIN` if no
    /// inserted row carried one.
    pub(crate) fn last_tick(&self) -> i64 {
        self.last_tick.load(SeqCst)
    }

    /// Largest sequence number inserted so far, or `None` while the table is empty.
    pub(crate) fn last_seq(&self) -> Option<u64> {
        if self.is_empty() {
            None
        } else {
            Some(self.last_seq.load(SeqCst))
        }
    }

    /// Returns a clone of the entry with the highest sequence number stored for
    /// `key`, or `None` if the table has no entry for it.
    ///
    /// The row is returned as stored; no filtering by row type happens here.
    pub(crate) fn get(&self, key: &[u8]) -> Option<RowEntry> {
        let user_key = Bytes::copy_from_slice(key);
        // A range spanning exactly this user key; entries within it are ordered
        // by descending `seq`, so the first one is the newest.
        let range = KVTableInternalKeyRange::from(BytesRange::new(
            Bound::Included(user_key.clone()),
            Bound::Included(user_key),
        ));
        self.map
            .range(range)
            .next()
            .map(|entry| entry.value().clone())
    }

    /// Iterates over every entry in ascending order.
    pub(crate) fn iter(&self) -> MemTableIterator<KVTableInternalKeyRange> {
        self.range_ascending(..)
    }

    /// Iterates over the entries whose user key falls in `range`, in ascending order.
    pub(crate) fn range_ascending<T: RangeBounds<Bytes>>(
        &self,
        range: T,
    ) -> MemTableIterator<KVTableInternalKeyRange> {
        self.range(range, IterationOrder::Ascending)
    }

    /// Iterates over the entries whose user key falls in `range`, in the
    /// direction given by `ordering`.
    pub(crate) fn range<T: RangeBounds<Bytes>>(
        &self,
        range: T,
        ordering: IterationOrder,
    ) -> MemTableIterator<KVTableInternalKeyRange> {
        MemTableIterator {
            inner: self.map.range(range.into()),
            ordering,
        }
    }

    /// Inserts `row` under `(row.key, row.seq)`, replacing any entry already
    /// stored under that exact pair, and updates the size counter, `last_tick`
    /// and `last_seq`.
    fn put(&self, row: RowEntry) {
        // Count the new row first; the size of a replaced row is subtracted
        // after the insert, so that subtraction cannot take the counter below zero.
        self.size.fetch_add(row.estimated_size(), Ordering::Relaxed);
        let internal_key = KVTableInternalKey::new(row.key.clone(), row.seq);
        // `Cell` lets the comparison closure below record a value while only
        // holding a shared borrow.
        let previous_size = Cell::new(None);
        if let Some(create_ts) = row.create_ts {
            self.last_tick.fetch_max(create_ts, SeqCst);
        }
        self.last_seq.fetch_max(row.seq, SeqCst);
        self.map.compare_insert(internal_key, row, |previous_row| {
            previous_size.set(Some(previous_row.estimated_size()));
            // Always replace an existing entry stored under the same key.
            true
        });
        if let Some(size) = previous_size.take() {
            self.size.fetch_sub(size, Ordering::Relaxed);
        }
    }

    /// Waits for the durability result and returns it.
    pub(crate) async fn await_durable(&self) -> Result<(), SlateDBError> {
        self.durable.reader().await_value().await
    }

    /// Publishes the durability result to the durability cell.
    pub(crate) fn notify_durable(&self, result: Result<(), SlateDBError>) {
        self.durable.write(result);
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::proptest_util::{arbitrary, sample};
use crate::test_utils::assert_iterator;
use crate::{proptest_util, test_utils};
use rstest::rstest;
use tokio::runtime::Runtime;
// Rows inserted out of key order must come back sorted by key, and `last_seq`
// must report the highest sequence number written.
#[tokio::test]
async fn test_memtable_iter() {
    let mut table = WritableKVTable::new();
    for (key, value, seq) in [
        (b"abc333", b"value3", 1),
        (b"abc111", b"value1", 2),
        (b"abc555", b"value5", 3),
        (b"abc444", b"value4", 4),
        (b"abc222", b"value2", 5),
    ] {
        table.put(RowEntry::new_value(key, value, seq));
    }
    assert_eq!(table.table().last_seq(), Some(5));

    let expected = vec![
        RowEntry::new_value(b"abc111", b"value1", 2),
        RowEntry::new_value(b"abc222", b"value2", 5),
        RowEntry::new_value(b"abc333", b"value3", 1),
        RowEntry::new_value(b"abc444", b"value4", 4),
        RowEntry::new_value(b"abc555", b"value5", 3),
    ];
    let mut iter = table.table().iter();
    assert_iterator(&mut iter, expected).await;
}
// Two rows inserted in reverse key order come back sorted, each with the value
// and sequence number it was written with.
#[tokio::test]
async fn test_memtable_iter_entry_attrs() {
    let mut table = WritableKVTable::new();
    for (key, value, seq) in [(b"abc333", b"value3", 1), (b"abc111", b"value1", 2)] {
        table.put(RowEntry::new_value(key, value, seq));
    }

    let expected = vec![
        RowEntry::new_value(b"abc111", b"value1", 2),
        RowEntry::new_value(b"abc333", b"value3", 1),
    ];
    let mut iter = table.table().iter();
    assert_iterator(&mut iter, expected).await;
}
// A range whose inclusive lower bound is a key present in the table starts at
// that key.
#[tokio::test]
async fn test_memtable_range_from_existing_key() {
    let mut table = WritableKVTable::new();
    for (key, value, seq) in [
        (b"abc333", b"value3", 1),
        (b"abc111", b"value1", 2),
        (b"abc555", b"value5", 3),
        (b"abc444", b"value4", 4),
        (b"abc222", b"value2", 5),
    ] {
        table.put(RowEntry::new_value(key, value, seq));
    }

    let lower = Bytes::from_static(b"abc333");
    let mut iter = table.table().range_ascending(BytesRange::from(lower..));
    let expected = vec![
        RowEntry::new_value(b"abc333", b"value3", 1),
        RowEntry::new_value(b"abc444", b"value4", 4),
        RowEntry::new_value(b"abc555", b"value5", 3),
    ];
    assert_iterator(&mut iter, expected).await;
}
// A range whose lower bound falls between two stored keys starts at the next
// larger key.
#[tokio::test]
async fn test_memtable_range_from_nonexisting_key() {
    let mut table = WritableKVTable::new();
    for (key, value, seq) in [
        (b"abc333", b"value3", 1),
        (b"abc111", b"value1", 2),
        (b"abc555", b"value5", 3),
        (b"abc444", b"value4", 4),
        (b"abc222", b"value2", 5),
    ] {
        table.put(RowEntry::new_value(key, value, seq));
    }

    let lower = Bytes::from_static(b"abc334");
    let mut iter = table.table().range_ascending(BytesRange::from(lower..));
    let expected = vec![
        RowEntry::new_value(b"abc444", b"value4", 4),
        RowEntry::new_value(b"abc555", b"value5", 3),
    ];
    assert_iterator(&mut iter, expected).await;
}
// A tombstone at seq 2 and a value at seq 1 are written for the same key.
// Read through a MergeIterator, only the tombstone is expected for that key.
#[tokio::test]
async fn test_memtable_iter_delete() {
let mut table = WritableKVTable::new();
// Same user key twice: the tombstone carries the higher sequence number.
table.put(RowEntry::new_tombstone(b"abc333", 2));
table.put(RowEntry::new_value(b"abc333", b"value3", 1));
table.put(RowEntry::new_value(b"abc444", b"value4", 4));
let iter = table.table().iter();
// Wrap the single memtable iterator in a MergeIterator.
// NOTE(review): the assertion below implies the merge step keeps only the
// newest version per key — confirm against MergeIterator.
let mut merge_iter = MergeIterator::new(VecDeque::from(vec![iter]))
.await
.unwrap();
assert_iterator(
&mut merge_iter,
vec![
RowEntry::new_tombstone(b"abc333", 2),
RowEntry::new_value(b"abc444", b"value4", 4),
],
)
.await;
}
// Checks the running size counter after each put. The expected deltas are
// consistent with key length + value length + 8 bytes per row.
// NOTE(review): confirm that breakdown against RowEntry::estimated_size.
#[tokio::test]
async fn test_memtable_track_sz() {
let mut table = WritableKVTable::new();
assert_eq!(table.table.size(), 0);
// +16 for a value row with a 5-byte key and a 3-byte value.
table.put(RowEntry::new_value(b"first", b"foo", 1));
assert_eq!(table.table.size(), 16);
// +13: a tombstone under a new (key, seq) pair adds to the total.
table.put(RowEntry::new_tombstone(b"first", 2));
assert_eq!(table.table.size(), 29);
// Same (key, seq) again: the old row is replaced, so the total is unchanged.
table.put(RowEntry::new_tombstone(b"first", 2));
assert_eq!(table.table.size(), 29);
// +18
table.put(RowEntry::new_value(b"abc333", b"val1", 1));
assert_eq!(table.table.size(), 47);
// +23
table.put(RowEntry::new_value(b"def456", b"blablabla", 2));
assert_eq!(table.table.size(), 70);
// +20: same user key but a new seq, so the older version stays counted.
table.put(RowEntry::new_value(b"def456", b"blabla", 3));
assert_eq!(table.table.size(), 90);
// +14
table.put(RowEntry::new_tombstone(b"abc333", 4));
assert_eq!(table.table.size(), 104);
}
// Table-driven check of the user-key-range to internal-key-range conversion.
// Each case supplies the input range, the exact expected internal range, keys
// that must be inside it and keys that must be outside it.
#[rstest]
// Case 1: fully unbounded range; nothing is excluded.
#[case(
BytesRange::from(..),
KVTableInternalKeyRange {
start_bound: Bound::Unbounded,
end_bound: Bound::Unbounded,
},
vec![KVTableInternalKey::new(Bytes::from_static(b"abc111"), 1)],
vec![]
)]
// Case 2: inclusive on both ends; every seq of the end key (0 through u64::MAX)
// must be inside.
#[case(
BytesRange::from(Bytes::from_static(b"abc111")..=Bytes::from_static(b"abc333")),
KVTableInternalKeyRange {
start_bound: Bound::Included(KVTableInternalKey::new(Bytes::from_static(b"abc111"), u64::MAX)),
end_bound: Bound::Included(KVTableInternalKey::new(Bytes::from_static(b"abc333"), 0)),
},
vec![
KVTableInternalKey::new(Bytes::from_static(b"abc111"), 1),
KVTableInternalKey::new(Bytes::from_static(b"abc222"), 2),
KVTableInternalKey::new(Bytes::from_static(b"abc333"), 3),
KVTableInternalKey::new(Bytes::from_static(b"abc333"), 0),
KVTableInternalKey::new(Bytes::from_static(b"abc333"), u64::MAX),
],
vec![KVTableInternalKey::new(Bytes::from_static(b"abc444"), 4)]
)]
// Case 3: half-open range; no seq of the excluded end key may be inside.
#[case(
BytesRange::from(Bytes::from_static(b"abc222")..Bytes::from_static(b"abc444")),
KVTableInternalKeyRange {
start_bound: Bound::Included(KVTableInternalKey::new(Bytes::from_static(b"abc222"), u64::MAX)),
end_bound: Bound::Excluded(KVTableInternalKey::new(Bytes::from_static(b"abc444"), u64::MAX)),
},
vec![
KVTableInternalKey::new(Bytes::from_static(b"abc222"), 1),
KVTableInternalKey::new(Bytes::from_static(b"abc333"), 2),
],
vec![
KVTableInternalKey::new(Bytes::from_static(b"abc444"), 0),
KVTableInternalKey::new(Bytes::from_static(b"abc444"), u64::MAX),
KVTableInternalKey::new(Bytes::from_static(b"abc555"), u64::MAX),
]
)]
// Case 4: unbounded start with an inclusive end.
#[case(
BytesRange::from(..=Bytes::from_static(b"abc333")),
KVTableInternalKeyRange {
start_bound: Bound::Unbounded,
end_bound: Bound::Included(KVTableInternalKey::new(Bytes::from_static(b"abc333"), 0)),
},
vec![
KVTableInternalKey::new(Bytes::from_static(b"abc111"), 1),
KVTableInternalKey::new(Bytes::from_static(b"abc222"), 2),
KVTableInternalKey::new(Bytes::from_static(b"abc333"), 3),
KVTableInternalKey::new(Bytes::from_static(b"abc333"), u64::MAX),
],
vec![KVTableInternalKey::new(Bytes::from_static(b"abc444"), 4)]
)]
fn test_from_internal_key_range(
#[case] range: BytesRange,
#[case] expected: KVTableInternalKeyRange,
#[case] should_contains: Vec<KVTableInternalKey>,
#[case] should_not_contains: Vec<KVTableInternalKey>,
) {
// The converted range must match the expected bounds exactly...
let range = KVTableInternalKeyRange::from(range);
assert_eq!(range, expected);
// ...and classify the probe keys as the case prescribes.
for key in should_contains {
assert!(range.contains(&key));
}
for key in should_not_contains {
assert!(!range.contains(&key));
}
}
// Property test: for generated (range, ordering) pairs, scanning the memtable
// must agree with the sample table the memtable was filled from.
#[test]
fn should_iterate_arbitrary_range() {
let mut runner = proptest_util::runner::new(file!(), None);
// A dedicated runtime is needed because the check helper is async and this
// test function is synchronous.
let runtime = Runtime::new().unwrap();
// NOTE(review): 500 and 10 are presumably the table size and a key-length
// bound — confirm against proptest_util::sample::table.
let sample_table = sample::table(runner.rng(), 500, 10);
let mut kv_table = WritableKVTable::new();
// Give every row its own sequence number, starting at 1.
let mut seq = 1;
for (key, value) in &sample_table {
let row_entry = RowEntry::new_value(key, value, seq);
kv_table.put(row_entry);
seq += 1;
}
runner
.run(
&(arbitrary::nonempty_range(10), arbitrary::iteration_order()),
|(range, ordering)| {
// Scan the memtable for this case and compare it with the sample table.
let mut kv_iter = kv_table.table.range(range.clone(), ordering);
runtime.block_on(test_utils::assert_ranged_kv_scan(
&sample_table,
&range,
ordering,
&mut kv_iter,
));
Ok(())
},
)
.unwrap();
}
}