#![forbid(unsafe_code)]
#![deny(missing_docs)]
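//! This library implements a schematized layer on top of [RocksDB](https://rocksdb.org/).
//! All keys and values pass through the codecs declared by a [`Schema`], and every schema
//! names the column family it is stored in, so callers never touch raw bytes directly.
//!
//! A minimal usage sketch, assuming `AccountSchema` is a hypothetical type implementing
//! [`Schema`] (with matching `KeyCodec`/`ValueCodec` impls) and `db_opts` is configured to
//! create missing databases and column families:
//!
//! ```ignore
//! let db = DB::open("/tmp/example-db", "example", vec![DEFAULT_COLUMN_FAMILY_NAME], &db_opts)?;
//! db.put::<AccountSchema>(&key, &value)?;
//! let stored = db.get::<AccountSchema>(&key)?;
//! ```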
mod iterator;
mod metrics;
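/// Defines the [`Schema`] trait plus the key/value codec traits and column family names
/// that give the database its typed interface.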
pub mod schema;
use std::collections::HashMap;
use std::path::Path;
use std::sync::Mutex;
use anyhow::{format_err, Result};
use iterator::ScanDirection;
pub use iterator::{SchemaIterator, SeekKeyEncoder};
use metrics::{
SCHEMADB_BATCH_COMMIT_BYTES, SCHEMADB_BATCH_COMMIT_LATENCY_SECONDS,
SCHEMADB_BATCH_PUT_LATENCY_SECONDS, SCHEMADB_DELETES, SCHEMADB_GET_BYTES,
SCHEMADB_GET_LATENCY_SECONDS, SCHEMADB_PUT_BYTES,
};
pub use rocksdb::DEFAULT_COLUMN_FAMILY_NAME;
use rocksdb::{ColumnFamilyDescriptor, ReadOptions};
use thiserror::Error;
use tracing::info;
pub use crate::schema::Schema;
use crate::schema::{ColumnFamilyName, KeyCodec, ValueCodec};
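/// A schema-aware wrapper around [`rocksdb::DB`]: all reads and writes go through typed
/// [`Schema`] APIs instead of raw byte keys and values.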
#[derive(Debug)]
pub struct DB {
    name: &'static str,
    inner: rocksdb::DB,
}
impl DB {
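    /// Opens a database at `path` with the given options, creating a descriptor with LZ4
    /// compression for each of the listed column families.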
pub fn open(
path: impl AsRef<Path>,
name: &'static str,
column_families: impl IntoIterator<Item = impl Into<String>>,
db_opts: &rocksdb::Options,
) -> Result<Self> {
let db = DB::open_cf(
db_opts,
path,
name,
column_families.into_iter().map(|cf_name| {
let mut cf_opts = rocksdb::Options::default();
cf_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
                ColumnFamilyDescriptor::new(cf_name, cf_opts)
}),
)?;
Ok(db)
}
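    /// Opens a database at `path` using explicit [`ColumnFamilyDescriptor`]s, giving the
    /// caller full control over per-column-family options.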
pub fn open_cf(
db_opts: &rocksdb::Options,
path: impl AsRef<Path>,
name: &'static str,
cfds: impl IntoIterator<Item = ColumnFamilyDescriptor>,
) -> Result<DB> {
let inner = rocksdb::DB::open_cf_descriptors(db_opts, path, cfds)?;
Ok(Self::log_construct(name, inner))
}
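    /// Opens the database at `path` in read-only mode; any write through the returned
    /// handle will fail.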
pub fn open_cf_readonly(
opts: &rocksdb::Options,
path: impl AsRef<Path>,
name: &'static str,
cfs: Vec<ColumnFamilyName>,
) -> Result<DB> {
let error_if_log_file_exists = false;
let inner = rocksdb::DB::open_cf_for_read_only(opts, path, cfs, error_if_log_file_exists)?;
Ok(Self::log_construct(name, inner))
}
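    /// Opens the database as a secondary instance that can catch up with the primary at
    /// `primary_path`, storing its own state under `secondary_path`.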
pub fn open_cf_as_secondary<P: AsRef<Path>>(
opts: &rocksdb::Options,
primary_path: P,
secondary_path: P,
name: &'static str,
cfs: Vec<ColumnFamilyName>,
) -> Result<DB> {
let inner = rocksdb::DB::open_cf_as_secondary(opts, primary_path, secondary_path, cfs)?;
Ok(Self::log_construct(name, inner))
}
fn log_construct(name: &'static str, inner: rocksdb::DB) -> DB {
info!(rocksdb_name = name, "Opened RocksDB.");
DB { name, inner }
}
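    /// Reads the value stored under `schema_key` in the column family associated with `S`,
    /// decoding it if present.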
pub fn get<S: Schema>(&self, schema_key: &impl KeyCodec<S>) -> Result<Option<S::Value>> {
let _timer = SCHEMADB_GET_LATENCY_SECONDS
.with_label_values(&[S::COLUMN_FAMILY_NAME])
.start_timer();
let k = schema_key.encode_key()?;
let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;
let result = self.inner.get_cf(cf_handle, k)?;
SCHEMADB_GET_BYTES
.with_label_values(&[S::COLUMN_FAMILY_NAME])
.observe(result.as_ref().map_or(0.0, |v| v.len() as f64));
result
.map(|raw_value| <S::Value as ValueCodec<S>>::decode_value(&raw_value))
.transpose()
.map_err(|err| err.into())
}
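    /// Writes a single key-value pair, implemented as a one-entry [`SchemaBatch`] commit.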
pub fn put<S: Schema>(&self, key: &impl KeyCodec<S>, value: &impl ValueCodec<S>) -> Result<()> {
let batch = SchemaBatch::new();
batch.put::<S>(key, value)?;
self.write_schemas(batch)
}
fn iter_with_direction<S: Schema>(
&self,
opts: ReadOptions,
direction: ScanDirection,
) -> Result<SchemaIterator<S>> {
let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;
Ok(SchemaIterator::new(
self.inner.raw_iterator_cf_opt(cf_handle, opts),
direction,
))
}
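    /// Returns a forward iterator over the column family associated with `S`, using default
    /// read options.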
pub fn iter<S: Schema>(&self) -> Result<SchemaIterator<S>> {
self.iter_with_direction::<S>(Default::default(), ScanDirection::Forward)
}
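    /// Returns a forward iterator over the column family associated with `S`, using the
    /// given read options.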
pub fn iter_with_opts<S: Schema>(&self, opts: ReadOptions) -> Result<SchemaIterator<S>> {
self.iter_with_direction::<S>(opts, ScanDirection::Forward)
}
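    /// Returns a backward iterator over the column family associated with `S`, using
    /// default read options.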
pub fn rev_iter<S: Schema>(&self) -> Result<SchemaIterator<S>> {
self.iter_with_direction::<S>(Default::default(), ScanDirection::Backward)
}
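    /// Returns a backward iterator over the column family associated with `S`, using the
    /// given read options.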
pub fn rev_iter_with_opts<S: Schema>(&self, opts: ReadOptions) -> Result<SchemaIterator<S>> {
self.iter_with_direction::<S>(opts, ScanDirection::Backward)
}
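    /// Atomically commits every operation accumulated in `batch` as a single RocksDB write.
    /// The write options force a WAL sync, so the batch is durable once this returns.
    ///
    /// A minimal sketch, assuming `MySchema` is a hypothetical [`Schema`] implementation
    /// and `key`, `value`, and `stale_key` implement its codecs:
    ///
    /// ```ignore
    /// let batch = SchemaBatch::new();
    /// batch.put::<MySchema>(&key, &value)?;
    /// batch.delete::<MySchema>(&stale_key)?;
    /// db.write_schemas(batch)?;
    /// ```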
pub fn write_schemas(&self, batch: SchemaBatch) -> Result<()> {
let _timer = SCHEMADB_BATCH_COMMIT_LATENCY_SECONDS
.with_label_values(&[self.name])
.start_timer();
let rows_locked = batch.rows.lock().expect("Lock must not be poisoned");
let mut db_batch = rocksdb::WriteBatch::default();
for (cf_name, rows) in rows_locked.iter() {
let cf_handle = self.get_cf_handle(cf_name)?;
for write_op in rows {
match write_op {
WriteOp::Value { key, value } => db_batch.put_cf(cf_handle, key, value),
WriteOp::Deletion { key } => db_batch.delete_cf(cf_handle, key),
}
}
}
let serialized_size = db_batch.size_in_bytes();
self.inner.write_opt(db_batch, &default_write_options())?;
for (cf_name, rows) in rows_locked.iter() {
for write_op in rows {
match write_op {
WriteOp::Value { key, value } => {
SCHEMADB_PUT_BYTES
.with_label_values(&[cf_name])
.observe((key.len() + value.len()) as f64);
}
WriteOp::Deletion { key: _ } => {
SCHEMADB_DELETES.with_label_values(&[cf_name]).inc();
}
}
}
}
SCHEMADB_BATCH_COMMIT_BYTES
.with_label_values(&[self.name])
.observe(serialized_size as f64);
Ok(())
}
fn get_cf_handle(&self, cf_name: &str) -> Result<&rocksdb::ColumnFamily> {
self.inner.cf_handle(cf_name).ok_or_else(|| {
format_err!(
"DB::cf_handle not found for column family name: {}",
cf_name
)
})
}
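    /// Flushes the memtable of the named column family to SST files on disk.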
pub fn flush_cf(&self, cf_name: &str) -> Result<()> {
Ok(self.inner.flush_cf(self.get_cf_handle(cf_name)?)?)
}
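    /// Returns the integer value of the named RocksDB property for the given column family,
    /// e.g. `"rocksdb.num-files-at-level0"`.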
pub fn get_property(&self, cf_name: &str, property_name: &str) -> Result<u64> {
self.inner
.property_int_value_cf(self.get_cf_handle(cf_name)?, property_name)?
.ok_or_else(|| {
format_err!(
"Unable to get property \"{}\" of column family \"{}\".",
property_name,
cf_name,
)
})
}
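    /// Creates an openable snapshot (checkpoint) of the database at `path`.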
pub fn create_checkpoint<P: AsRef<Path>>(&self, path: P) -> Result<()> {
rocksdb::checkpoint::Checkpoint::new(&self.inner)?.create_checkpoint(path)?;
Ok(())
}
}
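/// A single put or delete recorded in a [`SchemaBatch`], with key and value already
/// encoded to bytes.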
#[derive(Debug)]
enum WriteOp {
Value { key: Vec<u8>, value: Vec<u8> },
Deletion { key: Vec<u8> },
}
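/// A thread-safe collection of write operations, grouped by column family, that is
/// committed atomically via [`DB::write_schemas`].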
#[derive(Debug, Default)]
pub struct SchemaBatch {
rows: Mutex<HashMap<ColumnFamilyName, Vec<WriteOp>>>,
}
impl SchemaBatch {
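    /// Creates an empty batch.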
pub fn new() -> Self {
Self::default()
}
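    /// Adds a put operation for the given key-value pair to the batch.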
pub fn put<S: Schema>(&self, key: &impl KeyCodec<S>, value: &impl ValueCodec<S>) -> Result<()> {
let _timer = SCHEMADB_BATCH_PUT_LATENCY_SECONDS
.with_label_values(&["unknown"])
.start_timer();
let key = key.encode_key()?;
let value = value.encode_value()?;
self.rows
.lock()
.expect("Lock must not be poisoned")
.entry(S::COLUMN_FAMILY_NAME)
            .or_default()
.push(WriteOp::Value { key, value });
Ok(())
}
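    /// Adds a delete operation for the given key to the batch.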
pub fn delete<S: Schema>(&self, key: &impl KeyCodec<S>) -> Result<()> {
let key = key.encode_key()?;
self.rows
.lock()
.expect("Lock must not be poisoned")
.entry(S::COLUMN_FAMILY_NAME)
            .or_default()
.push(WriteOp::Deletion { key });
Ok(())
}
}
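/// Errors that can arise while encoding or decoding [`Schema`] keys and values.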
#[derive(Error, Debug)]
pub enum CodecError {
#[error("Invalid key length. Expected {expected:}, got {got:}")]
#[allow(missing_docs)] InvalidKeyLength { expected: usize, got: usize },
#[error(transparent)]
Wrapped(#[from] anyhow::Error),
#[error(transparent)]
Io(#[from] std::io::Error),
}
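/// Write options applied to every commit: `sync` is enabled so the WAL is synced to disk
/// before the write is acknowledged.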
fn default_write_options() -> rocksdb::WriteOptions {
let mut opts = rocksdb::WriteOptions::default();
opts.set_sync(true);
opts
}