use std::path::Path;
use std::sync::Arc;

use rocksdb::{
    BlockBasedOptions, ColumnFamilyDescriptor, DBCompactionStyle, FifoCompactOptions, IteratorMode,
    Options, ReadOptions, SliceTransform, WriteBatch as RocksWriteBatch, WriteOptions, DB,
};

use crate::config::{CoinStoreConfig, BLOOM_FILTER_BITS_PER_KEY};
/// STO-004: memtable prefix-bloom size ratio for `BloomProfile::Prefix32` column families.
///
/// Passed to `Options::set_memtable_prefix_bloom_ratio` by `column_family_descriptor`, and only
/// when bloom filters are enabled in the store config.
pub const STO004_MEMTABLE_PREFIX_BLOOM_RATIO: f64 = 0.1;
use super::schema::ALL_COLUMN_FAMILIES;
use super::schema::{
CF_ARCHIVE_COIN_RECORDS, CF_COIN_BY_CONFIRMED_HEIGHT, CF_COIN_BY_PARENT,
CF_COIN_BY_PUZZLE_HASH, CF_COIN_BY_SPENT_HEIGHT, CF_COIN_RECORDS, CF_HINTS, CF_HINTS_BY_VALUE,
CF_MERKLE_NODES, CF_METADATA, CF_STATE_SNAPSHOTS, CF_UNSPENT_BY_PUZZLE_HASH,
STO002_ROCKS_WRITE_BUFFER_BYTES, STO006_ROCKS_MAX_WRITE_BUFFER_NUMBER,
};
use super::{StorageBackend, StorageError, WriteBatch, WriteOp};
/// [`StorageBackend`] implementation backed by a RocksDB database.
///
/// The database is opened with one column family per entry in `ALL_COLUMN_FAMILIES` (see
/// [`RocksDbBackend::open`]).
pub struct RocksDbBackend {
    // NOTE(review): held behind `Arc`, presumably so the handle can be shared with other owners;
    // nothing in this file clones it.
    db: Arc<DB>,
}
/// Builds the database-wide RocksDB [`Options`] used by both [`RocksDbBackend::open`] and
/// [`RocksDbBackend::list_column_families`].
///
/// Per-column-family tuning (write buffers, bloom filters, compaction) is layered on separately
/// in `column_family_descriptor`. The write-buffer size set here comes from
/// `config.rocksdb_write_buffer_size`.
fn db_options_for_open(config: &CoinStoreConfig) -> Options {
    const MIB: u64 = 1 << 20;

    let mut options = Options::default();

    // Create the database directory and any missing column families on first open.
    options.create_if_missing(true);
    options.create_missing_column_families(true);

    // Resource limits:
    // - unlimited open table files (-1),
    // - keep at most 10 info-log files,
    // - 4-way background parallelism.
    options.set_max_open_files(-1);
    options.set_keep_log_file_num(10);
    options.increase_parallelism(4);

    // Memtable size, incremental sync cadence for SST and WAL files, and the WAL size cap.
    options.set_write_buffer_size(config.rocksdb_write_buffer_size);
    options.set_bytes_per_sync(MIB);
    options.set_wal_bytes_per_sync(MIB);
    options.set_max_total_wal_size(256 * MIB);

    options.enable_statistics();
    options
}
/// STO-004 bloom-filter category of a column family (chosen by `cf_bloom_profile`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum BloomProfile {
    /// SST bloom filter with `BLOOM_FILTER_BITS_PER_KEY` bits per key.
    Full,
    /// SST bloom filter plus a memtable prefix bloom.
    /// Used for the CFs that also get a 32-byte fixed prefix extractor.
    Prefix32,
    /// No SST or memtable bloom filter.
    None,
}
/// STO-004 bloom configuration for one column family.
///
/// Produced by [`sto004_bloom_plan_for_column_family`] and applied in `column_family_descriptor`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Sto004BloomPlan {
    /// Bits per key for the SST bloom filter; `None` means no SST filter is configured.
    pub sst_bloom_bits_per_key: Option<i32>,
    /// Passed as the `block_based` flag of `BlockBasedOptions::set_bloom_filter`.
    /// It is `false` for every plan built in this file.
    pub sst_bloom_uses_block_based_builder: bool,
    /// When `true`, `BlockBasedOptions::set_pin_l0_filter_and_index_blocks_in_cache(true)` is set.
    pub pin_l0_filter_and_index_in_cache: bool,
    /// Memtable prefix-bloom size ratio; `None` leaves the RocksDB default untouched.
    pub memtable_prefix_bloom_ratio: Option<f64>,
}
/// Derives the STO-004 bloom-filter plan for column family `cf`.
///
/// When `config.bloom_filter` is `false`, every column family gets the all-off plan. Otherwise
/// the CF's `cf_bloom_profile` decides:
/// - `Full`: SST bloom with `BLOOM_FILTER_BITS_PER_KEY` bits per key, with L0 filter/index
///   pinning requested.
/// - `Prefix32`: the same as `Full`, plus a memtable prefix bloom sized by
///   `STO004_MEMTABLE_PREFIX_BLOOM_RATIO`.
/// - `None`: the all-off plan.
///
/// `sst_bloom_uses_block_based_builder` is always `false`.
pub fn sto004_bloom_plan_for_column_family(cf: &str, config: &CoinStoreConfig) -> Sto004BloomPlan {
    // A globally disabled bloom filter behaves exactly like a CF whose profile is `None`.
    let effective = if config.bloom_filter {
        cf_bloom_profile(cf)
    } else {
        BloomProfile::None
    };

    match effective {
        BloomProfile::None => Sto004BloomPlan {
            sst_bloom_bits_per_key: None,
            sst_bloom_uses_block_based_builder: false,
            pin_l0_filter_and_index_in_cache: false,
            memtable_prefix_bloom_ratio: None,
        },
        BloomProfile::Full | BloomProfile::Prefix32 => Sto004BloomPlan {
            sst_bloom_bits_per_key: Some(BLOOM_FILTER_BITS_PER_KEY),
            sst_bloom_uses_block_based_builder: false,
            pin_l0_filter_and_index_in_cache: true,
            memtable_prefix_bloom_ratio: if effective == BloomProfile::Prefix32 {
                Some(STO004_MEMTABLE_PREFIX_BLOOM_RATIO)
            } else {
                None
            },
        },
    }
}
fn cf_bloom_profile(cf: &str) -> BloomProfile {
match cf {
CF_COIN_BY_PUZZLE_HASH | CF_UNSPENT_BY_PUZZLE_HASH | CF_HINTS_BY_VALUE => {
BloomProfile::Prefix32
}
CF_COIN_BY_CONFIRMED_HEIGHT | CF_COIN_BY_SPENT_HEIGHT | CF_STATE_SNAPSHOTS => {
BloomProfile::None
}
CF_COIN_RECORDS
| CF_COIN_BY_PARENT
| CF_HINTS
| CF_MERKLE_NODES
| CF_ARCHIVE_COIN_RECORDS
| CF_METADATA => BloomProfile::Full,
_ => BloomProfile::Full,
}
}
fn cf_uses_fixed_prefix_32(cf: &str) -> bool {
matches!(
cf,
CF_COIN_BY_PUZZLE_HASH | CF_UNSPENT_BY_PUZZLE_HASH | CF_HINTS_BY_VALUE
)
}
/// STO-006: total SST size limit (1 GiB) for FIFO-compacted column families.
///
/// Passed to `FifoCompactOptions::set_max_table_files_size` in
/// [`sto006_apply_fifo_compaction_options`].
pub const STO006_FIFO_MAX_TABLE_FILES_SIZE: u64 = 1024 * 1024 * 1024;
/// STO-006 compaction style, chosen per column family by [`sto006_compaction_style_for_cf`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Sto006CompactionStyle {
    /// Leveled compaction configured from [`Sto006LevelCompactionParams::NORMATIVE`].
    Level,
    /// FIFO compaction bounded by [`STO006_FIFO_MAX_TABLE_FILES_SIZE`].
    Fifo,
}
/// STO-006 leveled-compaction parameters.
///
/// Each field is passed to the same-named RocksDB `Options` setter in
/// [`sto006_apply_level_compaction_options`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Sto006LevelCompactionParams {
    /// `Options::set_max_bytes_for_level_base` (bytes).
    pub max_bytes_for_level_base: u64,
    /// `Options::set_max_bytes_for_level_multiplier`.
    pub max_bytes_for_level_multiplier: f64,
    /// `Options::set_level_zero_file_num_compaction_trigger` (file count).
    pub level0_file_num_compaction_trigger: i32,
    /// `Options::set_level_zero_slowdown_writes_trigger` (file count).
    pub level0_slowdown_writes_trigger: i32,
    /// `Options::set_level_zero_stop_writes_trigger` (file count).
    pub level0_stop_writes_trigger: i32,
    /// `Options::set_target_file_size_base` (bytes).
    pub target_file_size_base: u64,
    /// `Options::set_num_levels`.
    pub num_levels: i32,
}
impl Sto006LevelCompactionParams {
    /// The STO-006 normative leveled-compaction values, applied to every `Level`-style CF.
    pub const NORMATIVE: Self = Self {
        // 256 MiB
        max_bytes_for_level_base: 256 * 1024 * 1024,
        max_bytes_for_level_multiplier: 10.0,
        level0_file_num_compaction_trigger: 4,
        level0_slowdown_writes_trigger: 20,
        level0_stop_writes_trigger: 36,
        // 64 MiB
        target_file_size_base: 64 * 1024 * 1024,
        num_levels: 7,
    };
}
pub fn sto006_compaction_style_for_cf(cf: &str) -> Sto006CompactionStyle {
match cf {
CF_ARCHIVE_COIN_RECORDS | CF_STATE_SNAPSHOTS => Sto006CompactionStyle::Fifo,
_ => Sto006CompactionStyle::Level,
}
}
/// Returns the STO-006 `max_write_buffer_number` for column family `cf`.
///
/// The value is looked up by the CF's position in `ALL_COLUMN_FAMILIES`.
///
/// # Panics
/// Panics if `cf` is not listed in `ALL_COLUMN_FAMILIES`.
pub fn sto006_max_write_buffer_number_for_cf(cf: &str) -> i32 {
    match ALL_COLUMN_FAMILIES.iter().position(|name| *name == cf) {
        Some(slot) => STO006_ROCKS_MAX_WRITE_BUFFER_NUMBER[slot],
        None => panic!("STO-006: unknown column family name {cf:?}"),
    }
}
pub fn sto006_apply_level_compaction_options(o: &mut Options) {
let p = Sto006LevelCompactionParams::NORMATIVE;
o.set_compaction_style(DBCompactionStyle::Level);
o.set_max_bytes_for_level_base(p.max_bytes_for_level_base);
o.set_max_bytes_for_level_multiplier(p.max_bytes_for_level_multiplier);
o.set_level_zero_file_num_compaction_trigger(p.level0_file_num_compaction_trigger);
o.set_level_zero_slowdown_writes_trigger(p.level0_slowdown_writes_trigger);
o.set_level_zero_stop_writes_trigger(p.level0_stop_writes_trigger);
o.set_target_file_size_base(p.target_file_size_base);
o.set_num_levels(p.num_levels);
}
/// Switches `o` to FIFO compaction, capped at [`STO006_FIFO_MAX_TABLE_FILES_SIZE`] of SST data.
///
/// NOTE(review): in RocksDB FIFO compaction, once the total SST size exceeds the cap, the oldest
/// SST files are deleted. This is applied to `CF_ARCHIVE_COIN_RECORDS` and `CF_STATE_SNAPSHOTS`
/// (see `sto006_compaction_style_for_cf`). Confirm that losing the oldest archive and snapshot
/// data beyond 1 GiB is intended.
pub fn sto006_apply_fifo_compaction_options(o: &mut Options) {
    let fifo_opts = {
        let mut opts = FifoCompactOptions::default();
        opts.set_max_table_files_size(STO006_FIFO_MAX_TABLE_FILES_SIZE);
        opts
    };
    o.set_fifo_compaction_options(&fifo_opts);
    o.set_compaction_style(DBCompactionStyle::Fifo);
}
/// Builds the [`ColumnFamilyDescriptor`] for `cf`. It combines:
/// - the STO-002 write-buffer size and the STO-006 `max_write_buffer_number`, both indexed by
///   the CF's position in `ALL_COLUMN_FAMILIES`;
/// - a 32-byte fixed prefix extractor for prefix-keyed CFs;
/// - the STO-004 bloom plan;
/// - the STO-006 compaction style.
///
/// # Panics
/// Panics if `cf` is not listed in `ALL_COLUMN_FAMILIES`.
fn column_family_descriptor(cf: &str, config: &CoinStoreConfig) -> ColumnFamilyDescriptor {
    let slot = match ALL_COLUMN_FAMILIES.iter().position(|name| *name == cf) {
        Some(slot) => slot,
        None => panic!("STO-002: unknown column family name {cf:?}"),
    };
    let plan = sto004_bloom_plan_for_column_family(cf, config);

    let mut cf_opts = Options::default();

    // Memtable sizing (STO-002 / STO-006 tables).
    cf_opts.set_write_buffer_size(STO002_ROCKS_WRITE_BUFFER_BYTES[slot]);
    cf_opts.set_max_write_buffer_number(STO006_ROCKS_MAX_WRITE_BUFFER_NUMBER[slot]);

    // The prefix extractor is set independently of whether bloom filters are enabled.
    if cf_uses_fixed_prefix_32(cf) {
        cf_opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(32));
    }
    if let Some(ratio) = plan.memtable_prefix_bloom_ratio {
        cf_opts.set_memtable_prefix_bloom_ratio(ratio);
    }

    // SST (block-based table) options. These must be fully configured before being installed,
    // because the factory takes a snapshot of them.
    let mut table_opts = BlockBasedOptions::default();
    if let Some(bits_per_key) = plan.sst_bloom_bits_per_key {
        table_opts.set_bloom_filter(
            f64::from(bits_per_key),
            plan.sst_bloom_uses_block_based_builder,
        );
    }
    // NOTE(review): RocksDB only honours pin_l0_filter_and_index_blocks_in_cache when
    // cache_index_and_filter_blocks is enabled, and that is never set here. Confirm whether this
    // pinning is meant to take effect.
    if plan.pin_l0_filter_and_index_in_cache {
        table_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
    }
    cf_opts.set_block_based_table_factory(&table_opts);

    match sto006_compaction_style_for_cf(cf) {
        Sto006CompactionStyle::Level => sto006_apply_level_compaction_options(&mut cf_opts),
        Sto006CompactionStyle::Fifo => sto006_apply_fifo_compaction_options(&mut cf_opts),
    }

    ColumnFamilyDescriptor::new(cf, cf_opts)
}
/// STO-005 write options for `batch_write`.
///
/// Starts from RocksDB defaults with `sync` enabled, so RocksDB syncs the write to disk before
/// reporting it complete.
fn sto005_batch_write_options() -> WriteOptions {
    let mut synced = WriteOptions::default();
    synced.set_sync(true);
    synced
}
impl RocksDbBackend {
    /// Opens the RocksDB database at `config.storage_path`, creating it if missing.
    ///
    /// Every column family in `ALL_COLUMN_FAMILIES` is opened, each configured by
    /// `column_family_descriptor`.
    ///
    /// # Errors
    /// Returns `StorageError::BackendError` if RocksDB fails to open the database.
    pub fn open(config: &CoinStoreConfig) -> Result<Self, StorageError> {
        let db_opts = db_options_for_open(config);
        let descriptors = ALL_COLUMN_FAMILIES
            .iter()
            .map(|name| column_family_descriptor(name, config))
            .collect::<Vec<ColumnFamilyDescriptor>>();
        let path: &Path = config.storage_path.as_path();

        match DB::open_cf_descriptors(&db_opts, path, descriptors) {
            Ok(db) => Ok(Self { db: Arc::new(db) }),
            Err(e) => Err(StorageError::BackendError(format!(
                "Failed to open RocksDB: {}",
                e
            ))),
        }
    }

    /// Lists the column-family names stored in the database at `config.storage_path`.
    ///
    /// # Errors
    /// Returns `StorageError::BackendError` if RocksDB cannot read the column-family list.
    pub fn list_column_families(config: &CoinStoreConfig) -> Result<Vec<String>, StorageError> {
        let db_opts = db_options_for_open(config);
        match DB::list_cf(&db_opts, config.storage_path.as_path()) {
            Ok(names) => Ok(names),
            Err(e) => Err(StorageError::BackendError(format!(
                "Failed to list RocksDB column families: {}",
                e
            ))),
        }
    }

    /// Resolves `cf` to an open column-family handle.
    ///
    /// Returns `StorageError::UnknownColumnFamily` if no such column family is open.
    fn cf_handle(&self, cf: &str) -> Result<&rocksdb::ColumnFamily, StorageError> {
        match self.db.cf_handle(cf) {
            Some(handle) => Ok(handle),
            None => Err(StorageError::UnknownColumnFamily(cf.to_string())),
        }
    }
}
impl StorageBackend for RocksDbBackend {
fn get(&self, cf: &str, key: &[u8]) -> Result<Option<Vec<u8>>, StorageError> {
let cf_handle = self.cf_handle(cf)?;
self.db
.get_cf(cf_handle, key)
.map_err(|e| StorageError::BackendError(format!("RocksDB get error: {}", e)))
}
fn put(&self, cf: &str, key: &[u8], value: &[u8]) -> Result<(), StorageError> {
let cf_handle = self.cf_handle(cf)?;
self.db
.put_cf(cf_handle, key, value)
.map_err(|e| StorageError::BackendError(format!("RocksDB put error: {}", e)))
}
fn delete(&self, cf: &str, key: &[u8]) -> Result<(), StorageError> {
let cf_handle = self.cf_handle(cf)?;
self.db
.delete_cf(cf_handle, key)
.map_err(|e| StorageError::BackendError(format!("RocksDB delete error: {}", e)))
}
fn batch_write(&self, batch: WriteBatch) -> Result<(), StorageError> {
if batch.is_empty() {
return Ok(());
}
let mut rocks_batch = RocksWriteBatch::default();
for op in &batch.ops {
match op {
WriteOp::Put { cf, key, value } => {
let cf_handle = self.cf_handle(cf)?;
rocks_batch.put_cf(cf_handle, key, value);
}
WriteOp::Delete { cf, key } => {
let cf_handle = self.cf_handle(cf)?;
rocks_batch.delete_cf(cf_handle, key);
}
}
}
let opts = sto005_batch_write_options();
self.db
.write_opt(rocks_batch, &opts)
.map_err(|e| StorageError::BackendError(format!("RocksDB batch write error: {}", e)))
}
fn prefix_scan(&self, cf: &str, prefix: &[u8]) -> Result<Vec<super::KvPair>, StorageError> {
let cf_handle = self.cf_handle(cf)?;
let iter = self.db.iterator_cf(
cf_handle,
IteratorMode::From(prefix, rocksdb::Direction::Forward),
);
let mut results = Vec::new();
for item in iter {
let (key, value) =
item.map_err(|e| StorageError::BackendError(format!("Iterator error: {}", e)))?;
if !key.starts_with(prefix) {
break;
}
results.push((key.to_vec(), value.to_vec()));
}
Ok(results)
}
fn flush(&self) -> Result<(), StorageError> {
self.db
.flush()
.map_err(|e| StorageError::BackendError(format!("RocksDB flush error: {}", e)))
}
fn compact(&self, cf: &str) -> Result<(), StorageError> {
let cf_handle = self.cf_handle(cf)?;
self.db
.compact_range_cf(cf_handle, None::<&[u8]>, None::<&[u8]>);
Ok(())
}
}