use std::fs::File as SysFile;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
mod arena;
mod skiplist;
use arena::Arena;
use skiplist::{Compare, Error as SkiplistError, Skiplist, SkiplistIterator};
use crate::batch::Batch;
use crate::error::Result;
use crate::sstable::table::{Table, TableWriter};
use crate::vfs::File;
use crate::vlog::{VLog, ValueLocation};
use crate::{InternalKey, InternalKeyRef, LSMIterator, Options, Value, INTERNAL_KEY_SEQ_NUM_MAX};
/// `(encoded internal key, value bytes as written to the SSTable)` pairs that
/// `MemTable::flush` optionally collects alongside the SSTable it writes.
/// NOTE(review): presumably consumed to build a B+-tree index — confirm with callers.
pub(crate) type BPTreeEntries = Vec<(Vec<u8>, Vec<u8>)>;
/// A memtable registered in [`ImmutableMemtables`], together with the ids it was added under.
#[derive(Clone)]
pub(crate) struct ImmutableEntry {
/// Id the collection is sorted by and that `ImmutableMemtables::remove` looks up.
/// NOTE(review): presumably also the SSTable id later passed to `MemTable::flush` — confirm.
pub table_id: u64,
/// WAL number supplied when the entry was added.
pub wal_number: u64,
/// The registered memtable, shared via `Arc`.
pub memtable: Arc<MemTable>,
}
/// Collection of [`ImmutableEntry`] values kept sorted by ascending `table_id`.
/// The ordering invariant is maintained by `add`.
#[derive(Default)]
pub(crate) struct ImmutableMemtables(Vec<ImmutableEntry>);
impl ImmutableMemtables {
    /// Registers `memtable` under `table_id` / `wal_number`.
    ///
    /// Entries stay sorted by ascending `table_id`. A new entry whose `table_id` equals an
    /// existing one is placed after all entries with that id.
    pub(crate) fn add(&mut self, table_id: u64, wal_number: u64, memtable: Arc<MemTable>) {
        // The vector is always sorted, so binary-search the insertion point instead of
        // re-sorting the whole vector on every add (O(log n) search + one shift).
        let pos = self.0.partition_point(|entry| entry.table_id <= table_id);
        self.0.insert(
            pos,
            ImmutableEntry {
                table_id,
                wal_number,
                memtable,
            },
        );
    }

    /// Removes the entry registered under `id_to_remove`, if one exists.
    ///
    /// Relies on the sorted order maintained by [`ImmutableMemtables::add`]. When several
    /// entries share the id, only one of them is removed.
    pub(crate) fn remove(&mut self, id_to_remove: u64) {
        if let Ok(index) = self.0.binary_search_by_key(&id_to_remove, |entry| entry.table_id) {
            self.0.remove(index);
        }
    }

    /// Iterates over the entries in ascending `table_id` order (reversible).
    pub(crate) fn iter(&self) -> impl DoubleEndedIterator<Item = &ImmutableEntry> {
        self.0.iter()
    }

    /// Returns `true` when no memtables are registered.
    pub(crate) fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Returns the entry with the smallest `table_id`, if any.
    pub(crate) fn first(&self) -> Option<&ImmutableEntry> {
        self.0.first()
    }
}
/// In-memory table of written entries, stored in an arena-backed skiplist and later
/// written out as an SSTable by `MemTable::flush`.
pub(crate) struct MemTable {
/// Ordered store holding every entry added to this memtable.
skiplist: Skiplist,
/// Highest sequence number applied via `add` so far (0 until something is added).
latest_seq_num: AtomicU64,
/// Sequence number supplied at construction and reported by `earliest_seq()`.
earliest_seq: u64,
/// WAL number associated with this memtable; 0 until `set_wal_number` is called.
wal_number: AtomicU64,
}
impl Default for MemTable {
fn default() -> Self {
Self::new(1024 * 1024, 0)
}
}
impl MemTable {
pub(crate) fn new(arena_capacity: usize, earliest_seq: u64) -> Self {
let arena = Arc::new(Arena::new(arena_capacity));
let cmp: Compare = |a, b| a.cmp(b);
let skiplist = Skiplist::new(arena, cmp);
MemTable {
skiplist,
latest_seq_num: AtomicU64::new(0),
earliest_seq,
wal_number: AtomicU64::new(0),
}
}
pub(crate) fn earliest_seq(&self) -> u64 {
self.earliest_seq
}
pub(crate) fn set_wal_number(&self, wal_number: u64) {
self.wal_number.store(wal_number, Ordering::Release);
}
pub(crate) fn get_wal_number(&self) -> u64 {
self.wal_number.load(Ordering::Acquire)
}
pub(crate) fn get(&self, key: &[u8], seq_no: Option<u64>) -> Option<(InternalKey, Value)> {
let max_seq = seq_no.unwrap_or(INTERNAL_KEY_SEQ_NUM_MAX);
let mut iter = self.skiplist.iter();
iter.seek_ge(key);
while iter.is_valid() {
let found_key = iter.key_bytes();
if found_key != key {
break; }
let found_trailer = iter.trailer();
let found_seq = found_trailer >> 8;
if found_seq <= max_seq {
let internal_key = InternalKey {
user_key: found_key.to_vec(),
timestamp: 0,
trailer: found_trailer,
};
return Some((internal_key, iter.value_bytes().to_vec()));
}
iter.advance();
}
None
}
pub(crate) fn is_empty(&self) -> bool {
let mut iter = self.skiplist.iter();
iter.first();
!iter.is_valid()
}
pub(crate) fn size(&self) -> usize {
self.skiplist.size() as usize
}
pub(crate) fn add(&self, batch: &Batch) -> Result<()> {
let highest_seq_num = self.apply_batch_to_memtable(batch)?;
self.update_latest_sequence_number(highest_seq_num);
Ok(())
}
fn apply_batch_to_memtable(&self, batch: &Batch) -> Result<u64> {
let empty_val = Value::new();
for (_i, entry, current_seq_num, timestamp) in batch.entries_with_seq_nums()? {
let ikey = InternalKey::new(entry.key.clone(), current_seq_num, entry.kind, timestamp);
let val = if let Some(encoded_value) = &entry.value {
encoded_value.clone()
} else {
empty_val.clone()
};
self.insert_into_memtable(&ikey, &val)?;
}
let highest_seq_num = batch.get_highest_seq_num();
Ok(highest_seq_num)
}
fn insert_into_memtable(&self, key: &InternalKey, value: &Value) -> Result<()> {
let trailer = (key.seq_num() << 8) | (key.kind() as u64);
match self.skiplist.add(&key.user_key, trailer, key.timestamp, value) {
Ok(()) => Ok(()),
Err(SkiplistError::RecordExists) => Ok(()), Err(SkiplistError::ArenaFull) => Err(crate::Error::ArenaFull),
}
}
fn update_latest_sequence_number(&self, current_seq_num: u64) {
let mut prev_seq_num = self.latest_seq_num.load(Ordering::Acquire);
while current_seq_num > prev_seq_num {
match self.latest_seq_num.compare_exchange_weak(
prev_seq_num,
current_seq_num,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => break,
Err(x) => prev_seq_num = x,
}
}
}
#[allow(unused)]
pub(crate) fn lsn(&self) -> u64 {
self.latest_seq_num.load(Ordering::Acquire)
}
pub(crate) fn flush(
&self,
table_id: u64,
lsm_opts: Arc<Options>,
vlog: Option<&Arc<VLog>>,
vlog_threshold: usize,
collect_bptree_entries: bool,
) -> Result<(Arc<Table>, BPTreeEntries)> {
let table_file_path = lsm_opts.sstable_file_path(table_id);
let mut bptree_entries = Vec::new();
{
let file = SysFile::create(&table_file_path)?;
let mut table_writer = TableWriter::new(file, table_id, Arc::clone(&lsm_opts), 0);
let mut iter = self.iter();
iter.seek_first()?;
while iter.valid() {
let key = iter.key().to_owned();
let raw_encoded = iter.value_encoded()?;
let sst_value = maybe_separate_to_vlog(raw_encoded, &key, vlog, vlog_threshold)?;
if collect_bptree_entries {
bptree_entries.push((key.encode(), sst_value.clone()));
}
table_writer.add(key, &sst_value)?;
iter.next()?;
}
table_writer.finish()?;
}
if let Some(vlog) = vlog {
vlog.sync()?;
}
let file = crate::vfs::open_for_sync(&table_file_path)?;
file.sync_all()?;
let file: Arc<dyn File> = Arc::new(file);
let file_size = file.size()?;
let created_table = Arc::new(Table::new(table_id, lsm_opts, file, file_size)?);
Ok((created_table, bptree_entries))
}
pub(crate) fn iter(&self) -> MemTableIterator<'_> {
self.range(None, None)
}
pub(crate) fn range(
&self,
lower: Option<&[u8]>, upper: Option<&[u8]>, ) -> MemTableIterator<'_> {
let mut iter = self.skiplist.new_iter(lower, upper);
if let Some(lower_key) = lower {
iter.seek_ge(lower_key);
} else {
iter.first();
}
MemTableIterator {
iter,
}
}
}
/// Decides what value bytes to store in the SSTable for `key`.
///
/// Empty values and values that already decode to a value-log pointer are passed through
/// unchanged. Otherwise, when a value log is supplied and the decoded inline value is
/// longer than `vlog_threshold`, the value is appended to the log under the encoded key,
/// and the encoded pointer is returned instead. In every other case the original encoded
/// bytes are returned as-is.
///
/// # Errors
/// Propagates decoding errors from `ValueLocation::decode` and append errors from the log.
fn maybe_separate_to_vlog(
    encoded_value: &[u8],
    key: &InternalKey,
    vlog: Option<&Arc<VLog>>,
    vlog_threshold: usize,
) -> Result<Vec<u8>> {
    if !encoded_value.is_empty() {
        let location = ValueLocation::decode(encoded_value)?;
        if !location.is_value_pointer() {
            match vlog {
                Some(log) if location.value.len() > vlog_threshold => {
                    let pointer = log.append(&key.encode(), &location.value)?;
                    return Ok(ValueLocation::with_pointer(pointer).encode());
                }
                _ => {}
            }
        }
    }
    Ok(encoded_value.to_vec())
}
/// Cursor over a [`MemTable`], exposed through the crate's [`LSMIterator`] interface.
///
/// Every operation is forwarded unchanged to the wrapped skiplist iterator.
pub(crate) struct MemTableIterator<'a> {
    iter: SkiplistIterator<'a>,
}

impl LSMIterator for MemTableIterator<'_> {
    fn seek(&mut self, target: &[u8]) -> Result<bool> {
        let Self { iter } = self;
        iter.seek(target)
    }

    fn seek_first(&mut self) -> Result<bool> {
        let Self { iter } = self;
        iter.seek_first()
    }

    fn seek_last(&mut self) -> Result<bool> {
        let Self { iter } = self;
        iter.seek_last()
    }

    fn next(&mut self) -> Result<bool> {
        let Self { iter } = self;
        iter.next()
    }

    fn prev(&mut self) -> Result<bool> {
        let Self { iter } = self;
        iter.prev()
    }

    fn valid(&self) -> bool {
        let Self { iter } = self;
        iter.valid()
    }

    fn key(&self) -> InternalKeyRef<'_> {
        let Self { iter } = self;
        iter.key()
    }

    fn value_encoded(&self) -> Result<&[u8]> {
        let Self { iter } = self;
        iter.value_encoded()
    }
}