use std::path::Path;
use std::sync::Arc;
use rocksdb::checkpoint::Checkpoint;
use rocksdb::{
BlockBasedIndexType, BlockBasedOptions, Cache, ColumnFamily, ColumnFamilyDescriptor, DB,
DBCompressionType, ErrorKind, IteratorMode, Options, PrefixRange, ReadOptions,
WaitForCompactOptions, WriteBatch, WriteOptions,
};
use crate::artifact::{ExportManifest, ExportStage, verify_and_stage_import};
use crate::kv::{KvEntry, KvUpdate, VersionToken, WatchCursor};
use crate::snapshot::{SnapshotError, SnapshotStore};
use crate::snapshot_record::{decode_entry, encode_value_into};
/// Name of the column family that stores the key/value entries.
const DATA_CF: &str = "data";
/// Name of the column family that stores store-level metadata (the cursor).
const META_CF: &str = "meta";
/// Key inside the `meta` column family under which the cursor's version bytes are kept.
const CURSOR_KEY: &[u8] = b"cursor";
/// Block size in bytes (16 KiB) for the data column family's block-based table.
const DATA_BLOCK_SIZE: usize = 16 * 1024;
/// Metadata block size in bytes (4 KiB) for the data column family's block-based table.
const METADATA_BLOCK_SIZE: usize = 4096;
/// Bits-per-key setting for the data column family's hybrid ribbon filter.
const FILTER_BITS_PER_KEY: f64 = 10.0;
/// Write buffer size in bytes (256 MiB) for the data column family.
const DATA_WRITE_BUFFER_BYTES: usize = 256 << 20;
/// Maximum number of write buffers for the data column family.
const DATA_MAX_WRITE_BUFFERS: i32 = 4;
/// Write buffer size in bytes (8 MiB) for the meta column family.
const META_WRITE_BUFFER_BYTES: usize = 8 << 20;
/// Value (1 GiB) passed to `set_max_total_wal_size` on the database options.
const MAX_TOTAL_WAL_BYTES: u64 = 1 << 30;
/// Value (1 MiB) passed to both `set_bytes_per_sync` and `set_wal_bytes_per_sync`.
const SYNC_SMOOTHING_BYTES: u64 = 1 << 20;
/// Upper bound applied to the detected core count before sizing background parallelism.
const MAX_COMPACTION_PARALLELISM: usize = 16;
/// Settings applied when opening and writing to a RocksDB-backed snapshot.
#[derive(Debug, Clone, Copy)]
pub struct RocksDbConfig {
    /// Forwarded to the write options' `set_sync` on every applied batch.
    pub sync: bool,
    /// Capacity in bytes of the block cache shared by both column families;
    /// `0` means no shared cache is created.
    pub cache_size_bytes: u64,
}

impl Default for RocksDbConfig {
    /// Non-synced writes with a 1 GiB block cache.
    fn default() -> Self {
        const ONE_GIB: u64 = 1 << 30;
        RocksDbConfig {
            sync: false,
            cache_size_bytes: ONE_GIB,
        }
    }
}
/// Snapshot store backed by a RocksDB database with a `data` and a `meta`
/// column family.
pub struct RocksDbSnapshot {
/// Shared database handle; `reader()` clones this `Arc` for read handles.
db: Arc<DB>,
/// Configuration given to `open`; its `sync` flag is applied to each `apply` write.
config: RocksDbConfig,
/// Cursor loaded at `open` and replaced after every successful `apply`.
cursor: WatchCursor,
}
impl RocksDbSnapshot {
/// Opens the RocksDB store at `path`, creating the directory, the database
/// and both column families when they do not exist yet.
///
/// Returns the cursor stored under `CURSOR_KEY` in the `meta` column family
/// (or `WatchCursor::none()` when nothing is stored) together with the store.
///
/// # Errors
/// - the error produced by `std::fs::create_dir_all` if the directory cannot
///   be created;
/// - `SnapshotError::InvalidFormat` if `config.cache_size_bytes` does not fit
///   in `usize`, or if the stored cursor bytes are rejected by
///   `VersionToken::from_raw`;
/// - whatever `map_rocksdb` produces for a failed open or read.
pub fn open(
    path: &Path,
    config: RocksDbConfig,
) -> Result<(WatchCursor, Self), SnapshotError> {
    std::fs::create_dir_all(path)?;

    // Thread budget: detected cores (4 when detection fails), capped.
    let detected = std::thread::available_parallelism().map_or(4, |n| n.get());
    let parallelism = detected.min(MAX_COMPACTION_PARALLELISM);
    let subcompactions = std::cmp::max(parallelism / 2, 1);

    // Database-wide options.
    let mut db_opts = Options::default();
    db_opts.create_if_missing(true);
    db_opts.create_missing_column_families(true);
    db_opts.increase_parallelism(parallelism as i32);
    db_opts.set_max_subcompactions(subcompactions as u32);
    db_opts.set_max_total_wal_size(MAX_TOTAL_WAL_BYTES);
    db_opts.set_bytes_per_sync(SYNC_SMOOTHING_BYTES);
    db_opts.set_wal_bytes_per_sync(SYNC_SMOOTHING_BYTES);

    // One block cache shared by both column families; none when the
    // configured size is zero.
    let shared_cache = match config.cache_size_bytes {
        0 => None,
        requested => {
            let capacity = usize::try_from(requested).map_err(|_| {
                SnapshotError::InvalidFormat(format!(
                    "cache_size_bytes {} exceeds usize on this platform",
                    requested
                ))
            })?;
            Some(Cache::new_hyper_clock_cache(capacity, 0))
        }
    };

    // Block-based table layout for the data column family.
    let mut data_table = BlockBasedOptions::default();
    if let Some(cache) = shared_cache.as_ref() {
        data_table.set_block_cache(cache);
    }
    data_table.set_block_size(DATA_BLOCK_SIZE);
    data_table.set_hybrid_ribbon_filter(FILTER_BITS_PER_KEY, 1);
    data_table.set_optimize_filters_for_memory(true);
    data_table.set_index_type(BlockBasedIndexType::TwoLevelIndexSearch);
    data_table.set_partition_filters(true);
    data_table.set_metadata_block_size(METADATA_BLOCK_SIZE);
    data_table.set_cache_index_and_filter_blocks(true);
    data_table.set_pin_l0_filter_and_index_blocks_in_cache(true);
    data_table.set_pin_top_level_index_and_filter(true);

    let mut data_opts = Options::default();
    data_opts.set_compression_type(DBCompressionType::Lz4);
    data_opts.set_bottommost_compression_type(DBCompressionType::Zstd);
    data_opts.set_write_buffer_size(DATA_WRITE_BUFFER_BYTES);
    data_opts.set_max_write_buffer_number(DATA_MAX_WRITE_BUFFERS);
    data_opts.set_block_based_table_factory(&data_table);

    // The meta column family only gets a custom table factory when there is
    // a shared cache to attach to it.
    let mut meta_opts = Options::default();
    meta_opts.set_compression_type(DBCompressionType::Lz4);
    meta_opts.set_write_buffer_size(META_WRITE_BUFFER_BYTES);
    if let Some(cache) = shared_cache.as_ref() {
        let mut meta_table = BlockBasedOptions::default();
        meta_table.set_block_cache(cache);
        meta_opts.set_block_based_table_factory(&meta_table);
    }

    let families = [
        ColumnFamilyDescriptor::new(DATA_CF, data_opts),
        ColumnFamilyDescriptor::new(META_CF, meta_opts),
    ];
    let db = DB::open_cf_descriptors(&db_opts, path, families).map_err(map_rocksdb)?;

    // Recover the persisted cursor, if any.
    let stored = db
        .get_cf(cf(&db, META_CF)?, CURSOR_KEY)
        .map_err(map_rocksdb)?;
    let cursor = if let Some(raw) = stored {
        match VersionToken::from_raw(&raw).map(WatchCursor::from_version) {
            Some(recovered) => recovered,
            None => {
                return Err(SnapshotError::InvalidFormat(format!(
                    "stored cursor is {} bytes, exceeds version token capacity",
                    raw.len()
                )));
            }
        }
    } else {
        WatchCursor::none()
    };

    let reported = cursor.clone();
    let store = Self {
        db: Arc::new(db),
        config,
        cursor,
    };
    Ok((reported, store))
}
/// Returns a read handle that shares this store's database via a cloned `Arc`.
pub fn reader(&self) -> RocksDbReader {
    let db = Arc::clone(&self.db);
    RocksDbReader { db }
}
pub fn settle(&self) -> Result<(), SnapshotError> {
let mut opts = WaitForCompactOptions::default();
opts.set_flush(true);
self.db.wait_for_compact(&opts).map_err(map_rocksdb)
}
/// Imports an exported artifact from `artifact_dir` into `dest_dir` and opens
/// the result with `config`.
///
/// The artifact is first passed to `verify_and_stage_import`. The staged
/// payload is then opened once (without a shared block cache) purely to
/// compare its stored cursor with the manifest's cursor; only when they agree
/// is the stage finalized and `dest_dir` opened.
///
/// # Errors
/// Returns `SnapshotError::ArtifactInvalid` when the payload cursor and the
/// manifest cursor differ, and propagates any error from staging, opening or
/// finalizing.
pub fn import(
    artifact_dir: &Path,
    dest_dir: &Path,
    config: RocksDbConfig,
) -> Result<(WatchCursor, Self), SnapshotError> {
    let (manifest, stage) =
        verify_and_stage_import(artifact_dir, dest_dir, Self::BACKEND, |_| Ok(()))?;

    let verify_config = RocksDbConfig {
        sync: config.sync,
        cache_size_bytes: 0,
    };
    // Scoped so the verification database is closed before the stage is
    // finalized.
    let mismatch = {
        let (staged_cursor, verify) = Self::open(&stage.payload(), verify_config)?;
        verify.db.cancel_all_background_work(true);
        (staged_cursor != manifest.cursor).then_some(staged_cursor)
    };
    if let Some(staged_cursor) = mismatch {
        return Err(SnapshotError::ArtifactInvalid(format!(
            "payload cursor {staged_cursor:?} disagrees with manifest cursor {:?}",
            manifest.cursor
        )));
    }

    stage.finalize_dir()?;
    Self::open(dest_dir, config)
}
}
impl RocksDbSnapshot {
/// Backend identifier handed to `verify_and_stage_import` on import and to
/// `seal_and_finalize` on export.
pub(crate) const BACKEND: &'static str = "rocksdb";
/// Backend version string handed to `seal_and_finalize` on export.
// NOTE(review): presumably tracks the on-disk/crate version of the backend — confirm.
pub(crate) const BACKEND_VERSION: &'static str = "0.50";
}
/// Cloneable handle offering lookups and prefix scans over the `data` column
/// family of a shared database.
#[derive(Clone)]
pub struct RocksDbReader {
    /// Database shared with the `RocksDbSnapshot` this reader came from.
    db: Arc<DB>,
}

impl RocksDbReader {
    /// Looks up a single key; `Ok(None)` when the key is absent.
    pub fn get(&self, key: &str) -> Result<Option<KvEntry>, SnapshotError> {
        let db: &DB = &self.db;
        get_entry(db, key)
    }

    /// Looks up several keys with one batched read.
    ///
    /// The returned vector has one slot per requested key, in request order.
    /// The first read or decode failure aborts the call and is returned.
    pub fn multi_get<'k>(
        &self,
        keys: impl IntoIterator<Item = &'k str>,
    ) -> Result<Vec<Option<KvEntry>>, SnapshotError> {
        let requested: Vec<&str> = keys.into_iter().collect();
        let data = cf(&self.db, DATA_CF)?;
        let raw_keys = requested.iter().map(|key| key.as_bytes());
        let fetched = self.db.batched_multi_get_cf(data, raw_keys, false);

        let mut entries = Vec::with_capacity(requested.len());
        for (key, outcome) in requested.iter().zip(fetched) {
            let entry = match outcome.map_err(map_rocksdb)? {
                Some(raw) => Some(decode_entry(key, &raw)?),
                None => None,
            };
            entries.push(entry);
        }
        Ok(entries)
    }

    /// Invokes `f` for every entry whose key starts with `prefix`, stopping at
    /// the first error.
    pub fn for_each_in_range(
        &self,
        prefix: &str,
        f: impl FnMut(KvEntry) -> Result<(), SnapshotError>,
    ) -> Result<(), SnapshotError> {
        let db: &DB = &self.db;
        scan_prefix(db, prefix, f)
    }

    /// Collects every entry whose key starts with `prefix` into a vector.
    pub fn range(&self, prefix: &str) -> Result<Vec<KvEntry>, SnapshotError> {
        let mut collected = Vec::new();
        scan_prefix(&self.db, prefix, |entry| {
            collected.push(entry);
            Ok(())
        })?;
        Ok(collected)
    }
}
impl SnapshotStore for RocksDbSnapshot {
fn load(path: &Path) -> Result<(WatchCursor, Self), SnapshotError> {
Self::open(path, RocksDbConfig::default())
}
fn apply(&mut self, batch: &[KvUpdate], cursor: &WatchCursor) -> Result<(), SnapshotError> {
let data = cf(&self.db, DATA_CF)?;
let meta = cf(&self.db, META_CF)?;
let mut wb = WriteBatch::default();
let mut scratch = Vec::new();
for update in batch {
match update {
KvUpdate::Put(entry) => {
encode_value_into(&mut scratch, &entry.value, &entry.version)?;
wb.put_cf(data, entry.key.as_bytes(), scratch.as_slice());
}
KvUpdate::Delete { key, .. } | KvUpdate::Purge { key, .. } => {
wb.delete_cf(data, key.as_bytes());
}
}
}
wb.put_cf(meta, CURSOR_KEY, cursor.version().as_bytes());
let mut wo = WriteOptions::default();
wo.set_sync(self.config.sync);
self.db.write_opt(&wb, &wo).map_err(map_rocksdb)?;
self.cursor = cursor.clone();
Ok(())
}
fn get(&self, key: &str) -> Result<Option<KvEntry>, SnapshotError> {
get_entry(&self.db, key)
}
fn range(&self, prefix: &str) -> Result<Vec<KvEntry>, SnapshotError> {
let mut out = Vec::new();
self.for_each_in_range(prefix, |entry| {
out.push(entry);
Ok(())
})?;
Ok(out)
}
fn for_each_in_range(
&self,
prefix: &str,
f: impl FnMut(KvEntry) -> Result<(), SnapshotError>,
) -> Result<(), SnapshotError> {
scan_prefix(&self.db, prefix, f)
}
fn cursor(&self) -> WatchCursor {
self.cursor.clone()
}
fn export_to(&mut self, dest_dir: &Path) -> Result<ExportManifest, SnapshotError> {
let stage = ExportStage::new(dest_dir)?;
Checkpoint::new(&self.db)
.and_then(|cp| cp.create_checkpoint(stage.payload()))
.map_err(map_rocksdb)?;
{
let (staged_cursor, verify) = Self::open(
&stage.payload(),
RocksDbConfig {
sync: true,
cache_size_bytes: 0,
},
)?;
verify.db.cancel_all_background_work(true);
if staged_cursor != self.cursor {
return Err(SnapshotError::ArtifactInvalid(format!(
"checkpoint recovered cursor {staged_cursor:?}, live fold is at {:?}",
self.cursor
)));
}
}
stage.seal_and_finalize(Self::BACKEND, Self::BACKEND_VERSION, &self.cursor)
}
}
/// Resolves the handle of column family `name`, turning a missing family into
/// `SnapshotError::Backend`.
fn cf<'a>(db: &'a DB, name: &str) -> Result<&'a ColumnFamily, SnapshotError> {
    match db.cf_handle(name) {
        Some(handle) => Ok(handle),
        None => Err(SnapshotError::Backend(format!(
            "missing column family: {name}"
        ))),
    }
}
/// Reads `key` from the `data` column family and decodes the stored bytes.
///
/// Returns `Ok(None)` when the key is absent; read failures go through
/// `map_rocksdb` and decode failures are propagated.
fn get_entry(db: &DB, key: &str) -> Result<Option<KvEntry>, SnapshotError> {
    let stored = db
        .get_cf(cf(db, DATA_CF)?, key.as_bytes())
        .map_err(map_rocksdb)?;
    let Some(raw) = stored else {
        return Ok(None);
    };
    let entry = decode_entry(key, &raw)?;
    Ok(Some(entry))
}
/// Iterates the `data` column family over the key range covered by `prefix`
/// and hands each decoded entry to `f`.
///
/// An empty prefix is a scan with `fill_cache(false)` set on the read
/// options. Iteration stops at the first iterator, UTF-8, decode or callback
/// error, which is returned.
fn scan_prefix(
    db: &DB,
    prefix: &str,
    mut f: impl FnMut(KvEntry) -> Result<(), SnapshotError>,
) -> Result<(), SnapshotError> {
    let mut read_opts = ReadOptions::default();
    read_opts.set_iterate_range(PrefixRange(prefix.as_bytes()));
    if prefix.is_empty() {
        read_opts.fill_cache(false);
    }

    let data = cf(db, DATA_CF)?;
    db.iterator_cf_opt(data, read_opts, IteratorMode::Start)
        .try_for_each(|item| -> Result<(), SnapshotError> {
            let (raw_key, raw_val) = item.map_err(map_rocksdb)?;
            let key = match std::str::from_utf8(&raw_key) {
                Ok(text) => text,
                Err(e) => {
                    return Err(SnapshotError::InvalidFormat(format!(
                        "non-UTF-8 key in rocksdb store: {e}"
                    )));
                }
            };
            let entry = decode_entry(key, &raw_val)?;
            f(entry)
        })
}
fn map_rocksdb(e: rocksdb::Error) -> SnapshotError {
match e.kind() {
ErrorKind::IOError => SnapshotError::Io(std::io::Error::other(e.into_string())),
_ => SnapshotError::Backend(e.into_string()),
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Reopening a store whose persisted cursor is an 11-byte value must fail
    /// with `SnapshotError::InvalidFormat`.
    #[test]
    fn open_rejects_corrupted_cursor() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("store");

        // First open creates the store; the inner scope closes it again after
        // the cursor has been overwritten.
        {
            let (_c, store) =
                RocksDbSnapshot::open(&path, RocksDbConfig::default()).expect("initial open");
            let meta = cf(&store.db, META_CF).expect("meta cf");
            store
                .db
                .put_cf(meta, CURSOR_KEY, [0u8; 11])
                .expect("insert oversized cursor");
        }

        let reopened = RocksDbSnapshot::open(&path, RocksDbConfig::default());
        let Err(other) = reopened else {
            panic!("expected open to reject the oversized cursor");
        };
        if !matches!(other, SnapshotError::InvalidFormat(_)) {
            panic!("expected InvalidFormat, got {other:?}");
        }
    }
}