pub use rocksdb::{BlockBasedOptions as RocksBlockOptions, WriteOptions as RocksDBWriteOptions};
use crossbeam::sync::{ShardedLock, ShardedLockReadGuard};
use rocksdb::{
self, checkpoint::Checkpoint, ColumnFamily, DBIterator, Options as RocksDbOptions, WriteBatch,
};
use smallvec::SmallVec;
use std::{fmt, iter::Peekable, mem, path::Path, sync::Arc};
use crate::{
db::{check_database, Change},
Database, DbOptions, Iter, Iterator, Patch, ResolvedAddress, Snapshot,
};
/// Byte length of a column-family key prefix: one `u64` id
/// (treated as a big-endian counter by `next_id_bytes`).
pub const ID_SIZE: usize = mem::size_of::<u64>();
/// Database implementation backed by RocksDB.
///
/// The DB handle sits behind a `ShardedLock`: reads take the cheap shared
/// lock, while creating a column family (`create_cf`) takes the write lock.
pub struct RocksDB {
    /// Shared, lock-protected RocksDB handle (also cloned into snapshots).
    db: Arc<ShardedLock<rocksdb::DB>>,
    /// Options the DB was opened with; reused when creating column families.
    options: DbOptions,
}
impl From<DbOptions> for RocksDbOptions {
    /// Converts owned options by delegating to the by-reference conversion.
    fn from(opts: DbOptions) -> Self {
        (&opts).into()
    }
}
impl From<&DbOptions> for RocksDbOptions {
    /// Translates crate-level `DbOptions` into native RocksDB options.
    fn from(opts: &DbOptions) -> Self {
        let mut rocksdb_opts = Self::default();
        rocksdb_opts.create_if_missing(opts.create_if_missing);
        rocksdb_opts.set_compression_type(opts.compression_type.into());
        // `-1` keeps all files open (no limit) when the option is unset.
        let max_open_files = opts.max_open_files.unwrap_or(-1);
        rocksdb_opts.set_max_open_files(max_open_files);
        // `0` disables the total-WAL-size cap when the option is unset.
        let wal_size_cap = opts.max_total_wal_size.unwrap_or(0);
        rocksdb_opts.set_max_total_wal_size(wal_size_cap);
        rocksdb_opts
    }
}
/// Point-in-time snapshot of a [`RocksDB`] database.
pub struct RocksDBSnapshot {
    /// The `'static` lifetime here is fabricated via `mem::transmute` in
    /// `RocksDB::rocksdb_snapshot`; it is sound only because `db` below keeps
    /// the database alive, and because `snapshot` is declared first (Rust
    /// drops fields in declaration order, so the snapshot dies before the DB).
    snapshot: rocksdb::Snapshot<'static>,
    /// Keeps the underlying DB alive for the snapshot's whole lifetime.
    db: Arc<ShardedLock<rocksdb::DB>>,
}
/// Iterator over a (possibly id-prefixed) key range of a snapshot.
struct RocksDBIterator<'a> {
    /// Underlying RocksDB iterator, peekable so `peek` need not advance it.
    iter: Peekable<DBIterator<'a>>,
    /// Owned copy of the current entry's key; `next` hands out slices into it.
    key: Option<Box<[u8]>>,
    /// Owned copy of the current entry's value, same borrowing scheme as `key`.
    value: Option<Box<[u8]>>,
    /// Id prefix of the iterated address; `None` for non-prefixed addresses.
    prefix: Option<[u8; ID_SIZE]>,
    /// Set once a key outside `prefix`'s range is seen; iteration is then over.
    ended: bool,
}
impl RocksDB {
    /// Opens the database at `path`, creating it if `options` allow.
    ///
    /// Any column families already present on disk are reopened, and
    /// `check_database` is run on the handle before it is returned.
    pub fn open<P: AsRef<Path>>(path: P, options: &DbOptions) -> crate::Result<Self> {
        let inner = {
            // An existing DB must be opened with *all* of its column families
            // listed. `list_cf` fails for a non-existent DB, in which case we
            // fall back to a plain open (which may create it, per `options`).
            if let Ok(names) = rocksdb::DB::list_cf(&RocksDbOptions::default(), &path) {
                let cf_names = names.iter().map(String::as_str).collect::<Vec<_>>();
                rocksdb::DB::open_cf(&options.into(), path, cf_names)?
            } else {
                rocksdb::DB::open(&options.into(), path)?
            }
        };
        let mut db = Self {
            db: Arc::new(ShardedLock::new(inner)),
            options: *options,
        };
        check_database(&mut db)?;
        Ok(db)
    }

    /// Creates a RocksDB checkpoint (consistent on-disk copy) at `path`.
    pub fn create_checkpoint<T: AsRef<Path>>(&self, path: T) -> crate::Result<()> {
        let checkpoint = Checkpoint::new(&*self.get_lock_guard())?;
        checkpoint.create_checkpoint(path)?;
        Ok(())
    }

    /// Returns `true` if a column family named `cf_name` exists.
    fn cf_exists(&self, cf_name: &str) -> bool {
        self.get_lock_guard().cf_handle(cf_name).is_some()
    }

    /// Creates a column family; takes the *write* lock on the DB handle, so
    /// no read guard may be held by the caller.
    fn create_cf(&self, cf_name: &str) -> crate::Result<()> {
        self.db
            .write()
            .expect("Couldn't get write lock to DB")
            .create_cf(cf_name, &self.options.into())
            .map_err(Into::into)
    }

    /// Acquires the shared read lock on the DB handle.
    ///
    /// Panics only if the lock is poisoned (a writer panicked).
    pub(super) fn get_lock_guard(&self) -> ShardedLockReadGuard<'_, rocksdb::DB> {
        self.db.read().expect("Couldn't get read lock to DB")
    }

    /// Queues deletion of *every* key in `cf` into `batch`.
    pub(super) fn clear_column_family(&self, batch: &mut WriteBatch, cf: &ColumnFamily) {
        // A key lexicographically above anything shorter than 1024 bytes.
        const LARGER_KEY: &[u8] = &[u8::max_value(); 1_024];
        let db_reader = self.get_lock_guard();
        let mut iter = db_reader.raw_iterator_cf(cf);
        iter.seek_to_last();
        if iter.valid() {
            if let Some(key) = iter.key() {
                if key.len() < LARGER_KEY.len() {
                    // The greatest key sorts below `LARGER_KEY`, so one
                    // half-open range `[.., LARGER_KEY)` covers the whole CF.
                    batch.delete_range_cf::<&[u8]>(cf, &[], LARGER_KEY);
                } else {
                    // `delete_range` excludes its upper bound, so the last
                    // key must be deleted separately.
                    batch.delete_range_cf::<&[u8]>(cf, &[], key);
                    batch.delete_cf(cf, &key);
                }
            }
        }
    }

    /// Applies `patch` atomically through a single `WriteBatch`, creating any
    /// missing column families on the fly, then writes it with `w_opts`.
    fn do_merge(&self, patch: Patch, w_opts: &RocksDBWriteOptions) -> crate::Result<()> {
        let mut batch = WriteBatch::default();
        for (resolved, changes) in patch.into_changes() {
            if !self.cf_exists(&resolved.name) {
                // Must happen before `get_lock_guard` below: `create_cf`
                // takes the write lock and would deadlock against our own
                // read guard.
                self.create_cf(&resolved.name)?;
            }
            let db_reader = self.get_lock_guard();
            let cf = db_reader.cf_handle(&resolved.name).unwrap();
            if changes.is_cleared() {
                self.clear_prefix(&mut batch, cf, &resolved);
            }
            if let Some(id_bytes) = resolved.id_to_bytes() {
                // Keys of this address are namespaced by an `ID_SIZE`-byte id;
                // build `id ++ key` in a reusable stack-allocated buffer.
                let mut buffer: SmallVec<[u8; 1_024]> = SmallVec::new();
                buffer.extend_from_slice(&id_bytes);
                for (key, change) in changes.into_data() {
                    // Keep the id prefix, replace the key suffix.
                    buffer.truncate(ID_SIZE);
                    buffer.extend_from_slice(&key);
                    match change {
                        Change::Put(ref value) => batch.put_cf(cf, &buffer, value),
                        Change::Delete => batch.delete_cf(cf, &buffer),
                    }
                }
            } else {
                // Non-prefixed address: keys are used verbatim.
                for (key, change) in changes.into_data() {
                    match change {
                        Change::Put(ref value) => batch.put_cf(cf, &key, value),
                        Change::Delete => batch.delete_cf(cf, &key),
                    }
                }
            }
        }
        self.get_lock_guard()
            .write_opt(batch, w_opts)
            .map_err(Into::into)
    }

    /// Queues deletion of all entries belonging to `resolved` into `batch`.
    fn clear_prefix(&self, batch: &mut WriteBatch, cf: &ColumnFamily, resolved: &ResolvedAddress) {
        if let Some(id_bytes) = resolved.id_to_bytes() {
            // Delete the half-open range `[id, id + 1)`, i.e. every key with
            // this id prefix. NOTE(review): if the id is `u64::MAX`,
            // `next_id_bytes` wraps to zero and the range is empty —
            // presumably ids never reach that value; worth confirming.
            let next_bytes = next_id_bytes(id_bytes);
            batch.delete_range_cf(cf, id_bytes, next_bytes);
        } else {
            self.clear_column_family(batch, cf);
        }
    }

    /// Takes a snapshot whose lifetime is detached from the read-lock guard.
    #[allow(unsafe_code)]
    #[allow(clippy::useless_transmute)]
    pub(super) fn rocksdb_snapshot(&self) -> RocksDBSnapshot {
        RocksDBSnapshot {
            // SAFETY: the snapshot borrows the DB behind the guard, and its
            // lifetime is transmuted to `'static`. This is sound only because
            // the returned struct also holds an `Arc` clone of the same DB,
            // keeping it alive for as long as the snapshot exists (and the
            // snapshot field is dropped before the `Arc`).
            snapshot: unsafe { mem::transmute(self.get_lock_guard().snapshot()) },
            db: Arc::clone(&self.db),
        }
    }
}
impl RocksDBSnapshot {
    /// Acquires the shared read lock on the underlying DB handle.
    fn get_lock_guard(&self) -> ShardedLockReadGuard<'_, rocksdb::DB> {
        self.db.read().expect("Couldn't get read lock to DB")
    }

    /// Builds a forward iterator over `name`'s entries, positioned at the
    /// first key >= `from` within the address's key space.
    fn rocksdb_iter(&self, name: &ResolvedAddress, from: &[u8]) -> RocksDBIterator<'_> {
        use rocksdb::{Direction, IteratorMode};
        // Prepend the address's id prefix (if any) to the start key.
        let from = name.keyed(from);
        let iter = match self.get_lock_guard().cf_handle(&name.name) {
            Some(cf) => self
                .snapshot
                .iterator_cf(cf, IteratorMode::From(from.as_ref(), Direction::Forward)),
            // NOTE(review): a missing column family falls back to iterating
            // the *default* CF from the start; the id-prefix check inside
            // `RocksDBIterator` is what stops it from yielding foreign data.
            // Confirm this is intended for non-prefixed addresses too.
            None => self.snapshot.iterator(IteratorMode::Start),
        };
        RocksDBIterator {
            iter: iter.peekable(),
            prefix: name.id_to_bytes(),
            key: None,
            value: None,
            ended: false,
        }
    }
}
impl Database for RocksDB {
    /// Produces a boxed snapshot of the current database state.
    fn snapshot(&self) -> Box<dyn Snapshot> {
        let snapshot = self.rocksdb_snapshot();
        Box::new(snapshot)
    }

    /// Applies `patch` with default (non-synced) write options.
    fn merge(&self, patch: Patch) -> crate::Result<()> {
        self.do_merge(patch, &RocksDBWriteOptions::default())
    }

    /// Applies `patch` with `sync = true`, so the write-ahead log is flushed
    /// to durable storage before this call returns.
    fn merge_sync(&self, patch: Patch) -> crate::Result<()> {
        let mut write_options = RocksDBWriteOptions::default();
        write_options.set_sync(true);
        self.do_merge(patch, &write_options)
    }
}
impl Snapshot for RocksDBSnapshot {
    /// Looks up `key` within `resolved_addr`; `None` when the column family
    /// does not exist or the key is absent. Panics on a RocksDB read error.
    fn get(&self, resolved_addr: &ResolvedAddress, key: &[u8]) -> Option<Vec<u8>> {
        let guard = self.get_lock_guard();
        let cf = guard.cf_handle(&resolved_addr.name)?;
        self.snapshot
            .get_cf(cf, resolved_addr.keyed(key))
            .unwrap_or_else(|e| panic!("{}", e))
            .map(|v| v.to_vec())
    }

    /// Returns a boxed iterator over `name`'s entries starting at `from`.
    fn iter(&self, name: &ResolvedAddress, from: &[u8]) -> Iter<'_> {
        let iterator = self.rocksdb_iter(name, from);
        Box::new(iterator)
    }
}
impl<'a> Iterator for RocksDBIterator<'a> {
    /// Advances the iterator and returns borrowed `(key, value)` slices.
    ///
    /// The owned boxes coming from RocksDB are stashed in `self.key` /
    /// `self.value` so the returned slices can borrow from `&mut self`. For
    /// prefixed addresses the `ID_SIZE`-byte id is stripped from the returned
    /// key, and iteration ends permanently at the first key whose prefix
    /// differs.
    fn next(&mut self) -> Option<(&[u8], &[u8])> {
        if self.ended {
            return None;
        }
        let (key, value) = self.iter.next()?;
        if let Some(ref prefix) = self.prefix {
            // Left the id prefix's key range: the iterator is exhausted.
            // NOTE(review): `key[..ID_SIZE]` panics on keys shorter than
            // `ID_SIZE` — presumably all keys in prefixed ranges carry the
            // full prefix; confirm against the writers in `do_merge`.
            if &key[..ID_SIZE] != prefix {
                self.ended = true;
                return None;
            }
        }
        self.key = Some(key);
        let key = if self.prefix.is_some() {
            // Strip the id prefix before handing the key to the caller.
            &self.key.as_ref()?[ID_SIZE..]
        } else {
            &self.key.as_ref()?[..]
        };
        self.value = Some(value);
        Some((key, self.value.as_ref()?))
    }

    /// Like `next`, but does not advance; borrows straight from the peeked
    /// item held inside the `Peekable`.
    fn peek(&mut self) -> Option<(&[u8], &[u8])> {
        if self.ended {
            return None;
        }
        let (key, value) = self.iter.peek()?;
        let key = if let Some(prefix) = self.prefix {
            if key[..ID_SIZE] != prefix {
                self.ended = true;
                return None;
            }
            &key[ID_SIZE..]
        } else {
            &key[..]
        };
        Some((key, &value[..]))
    }
}
impl From<RocksDB> for Arc<dyn Database> {
    /// Wraps the database in an `Arc`, unsizing it to the trait object.
    fn from(db: RocksDB) -> Self {
        Arc::new(db)
    }
}
impl fmt::Debug for RocksDB {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The raw DB handle has no useful textual form; print the type name only.
        formatter.debug_struct("RocksDB").finish()
    }
}
impl fmt::Debug for RocksDBSnapshot {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Snapshots carry no printable state; emit the type name only.
        formatter.debug_struct("RocksDBSnapshot").finish()
    }
}
/// Returns the big-endian successor of `id_bytes`.
///
/// The array is interpreted as a big-endian `u64` and incremented by one; an
/// all-`0xFF` input wraps around to all zeroes, exactly matching the original
/// byte-wise carry loop.
pub fn next_id_bytes(id_bytes: [u8; ID_SIZE]) -> [u8; ID_SIZE] {
    u64::from_be_bytes(id_bytes).wrapping_add(1).to_be_bytes()
}
#[test]
fn test_next_id_bytes() {
    // (input, expected big-endian successor) pairs, including carry chains.
    let cases = [
        ([1, 0, 0, 0, 0, 0, 0, 0], [1, 0, 0, 0, 0, 0, 0, 1]),
        ([1, 2, 3, 4, 5, 6, 7, 8], [1, 2, 3, 4, 5, 6, 7, 9]),
        ([1, 0, 0, 0, 0, 0, 0, 254], [1, 0, 0, 0, 0, 0, 0, 255]),
        ([1, 0, 0, 0, 0, 0, 41, 255], [1, 0, 0, 0, 0, 0, 42, 0]),
        ([1, 2, 3, 4, 5, 255, 255, 255], [1, 2, 3, 4, 6, 0, 0, 0]),
    ];
    for &(input, expected) in &cases {
        assert_eq!(next_id_bytes(input), expected);
    }
}