use crate::config::RockSolidCompactionFilterRouterConfig;
use crate::error::{StoreError, StoreResult};
use fast_radix_trie::RadixMap;
use once_cell::sync::Lazy;
use parking_lot::RwLock;
use rocksdb::compaction_filter::Decision as RocksDbDecision;
use std::sync::Arc;
/// Shared, thread-safe callback that decides the fate of a single entry during compaction.
///
/// `binary_router_fn` invokes the handler with, in order, the compaction level, the key
/// bytes and the value bytes, and returns the handler's `RocksDbDecision` unchanged.
/// The handler is wrapped in an `Arc` so it can be stored in the router table.
pub type BinaryCompactionFilterHandlerFn =
Arc<dyn Fn(u32, &[u8], &[u8]) -> RocksDbDecision + Send + Sync + 'static>;
/// Process-wide routing table from key prefixes to compaction filter handlers.
///
/// Created empty on first access. `BinaryCompactionFilterBuilder::add_prefix_route` inserts
/// into it under the write lock and `binary_router_fn` looks handlers up under the read lock.
/// Because this is a single static, every builder instance shares the same table.
static BINARY_COMPACTION_FILTER_ROUTER: Lazy<RwLock<RadixMap<BinaryCompactionFilterHandlerFn>>> =
Lazy::new(|| RwLock::new(RadixMap::new()));
/// Builder for a compaction filter configuration that routes keys by byte prefix.
///
/// The builder only tracks the filter name and whether this instance registered any
/// route; the routes themselves are not stored on the builder.
#[derive(Debug, Default)]
pub struct BinaryCompactionFilterBuilder {
    /// Name given to the resulting filter configuration; `None` until explicitly set.
    operator_name: Option<String>,
    /// `true` once this builder instance has registered at least one prefix route.
    routes_added: bool,
}
impl BinaryCompactionFilterBuilder {
    /// Creates a builder with no operator name and no routes recorded.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the name used for the resulting filter configuration.
    ///
    /// Calling this again replaces the previously set name.
    pub fn operator_name(&mut self, name: impl Into<String>) -> &mut Self {
        self.operator_name = Some(name.into());
        self
    }

    /// Registers `handler` for keys matched by `prefix`.
    ///
    /// The route is written straight into the process-wide router table, so it takes
    /// effect immediately and is visible to every filter that uses `binary_router_fn`,
    /// not only to the configuration produced by this builder. If a handler was already
    /// registered for exactly the same prefix it is replaced and a warning is logged.
    ///
    /// # Errors
    ///
    /// Currently never fails; the `StoreResult` return type is kept for callers.
    pub fn add_prefix_route(
        &mut self,
        prefix: &[u8],
        handler: BinaryCompactionFilterHandlerFn,
    ) -> StoreResult<&mut Self> {
        // The write guard is a temporary, so the lock is released at the end of this
        // statement, before any logging happens.
        let replaced_existing = BINARY_COMPACTION_FILTER_ROUTER
            .write()
            .insert(prefix.to_vec(), handler)
            .is_some();
        if replaced_existing {
            // The table is global: surface overwrites instead of silently dropping
            // the previously registered handler.
            log::warn!(
                "Binary compaction filter route for prefix {:?} was already registered; the previous handler has been replaced.",
                prefix
            );
        }
        self.routes_added = true;
        Ok(self)
    }

    /// Finalizes the builder into a `RockSolidCompactionFilterRouterConfig` whose filter
    /// function is `binary_router_fn`.
    ///
    /// Logs a warning when this builder instance registered no routes. Routes added
    /// through other builder instances are not taken into account by that check.
    ///
    /// # Errors
    ///
    /// Returns `StoreError::InvalidConfiguration` if no operator name was set.
    pub fn build(self) -> StoreResult<RockSolidCompactionFilterRouterConfig> {
        let operator_name = self.operator_name.ok_or_else(|| {
            StoreError::InvalidConfiguration(
                "Binary compaction filter operator name must be set".to_string(),
            )
        })?;
        if !self.routes_added {
            log::warn!(
                "Building binary compaction filter config ('{}'), but no routes were added. The filter will default to 'Keep' for all keys.",
                operator_name
            );
        }
        Ok(RockSolidCompactionFilterRouterConfig {
            name: operator_name,
            filter_fn_ptr: binary_router_fn,
        })
    }
}
pub fn binary_router_fn(
level: u32,
key_bytes: &[u8],
value_bytes: &[u8],
) -> RocksDbDecision {
let router_guard = BINARY_COMPACTION_FILTER_ROUTER.read();
if let Some((_prefix, handler)) = router_guard.get_longest_common_prefix(key_bytes) {
return handler(level, key_bytes, value_bytes);
}
RocksDbDecision::Keep
}