pub mod binary;
use crate::config::RockSolidCompactionFilterRouterConfig;
use crate::error::{StoreError, StoreResult};
use matchit::{Params, Router};
use once_cell::sync::Lazy;
use parking_lot::RwLock;
use rocksdb::compaction_filter::Decision as RocksDbDecision;
use std::collections::binary_heap;
use std::sync::Arc;
/// Shared, thread-safe handler invoked for keys that match a registered route.
///
/// As called by `router_compaction_filter_fn`, the arguments are, in order:
/// the level, the raw key bytes, the raw value bytes, and the parameters the
/// router extracted from the matched key. The returned decision is passed
/// straight back to the caller of the filter function.
pub type CompactionFilterRouteHandlerFn =
Arc<dyn Fn(u32, &[u8], &[u8], &Params) -> RocksDbDecision + Send + Sync + 'static>;
/// Single route table shared by the whole module, created empty on first access.
///
/// `CompactionFilterRouterBuilder::add_route` inserts into it under the write
/// lock, `router_compaction_filter_fn` looks keys up under the read lock, and
/// `clear_compaction_filter_routes` replaces it with a fresh router.
static COMPACTION_FILTER_ROUTER: Lazy<RwLock<Router<CompactionFilterRouteHandlerFn>>> =
Lazy::new(|| RwLock::new(Router::new()));
/// Builder that collects an operator name and registers routes before
/// producing a `RockSolidCompactionFilterRouterConfig`.
///
/// Routes themselves are not stored here; the builder only remembers whether
/// at least one was registered successfully.
#[derive(Default)]
pub struct CompactionFilterRouterBuilder {
/// Name placed into the built config; `build` fails while this is `None`.
operator_name: Option<String>,
/// Set to `true` after a successful `add_route`; only used to decide
/// whether `build` logs a "no routes were added" warning.
routes_added: bool,
}
impl CompactionFilterRouterBuilder {
    /// Creates a builder with no operator name and no routes recorded.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the operator name that [`build`](Self::build) copies into the config.
    ///
    /// Calling this again replaces the previously stored name.
    pub fn operator_name(&mut self, name: impl Into<String>) -> &mut Self {
        let owned_name: String = name.into();
        self.operator_name = Some(owned_name);
        self
    }

    /// Registers `handler` for keys matching `route_pattern`.
    ///
    /// The route is inserted into the shared static router rather than kept
    /// in the builder, so it is visible to `router_compaction_filter_fn` as
    /// soon as this call returns, whether or not `build` is ever called.
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::InvalidConfiguration`] when the router rejects
    /// the insertion; the message contains the pattern and the router's error.
    /// In that case the builder is not marked as having routes.
    pub fn add_route(
        &mut self,
        route_pattern: &str,
        handler: CompactionFilterRouteHandlerFn,
    ) -> StoreResult<&mut Self> {
        // The write guard is a temporary of this statement, so the lock is
        // released before the outcome is inspected.
        let insertion = COMPACTION_FILTER_ROUTER
            .write()
            .insert(route_pattern.to_string(), handler);

        insertion.map_err(|e| {
            StoreError::InvalidConfiguration(format!(
                "Invalid compaction filter route pattern '{}': {}",
                route_pattern, e
            ))
        })?;

        self.routes_added = true;
        Ok(self)
    }

    /// Consumes the builder and produces the router configuration.
    ///
    /// The config carries the operator name and a pointer to
    /// `router_compaction_filter_fn`. If no route was registered through this
    /// builder a warning is logged, but the config is still returned.
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::InvalidConfiguration`] when no operator name was set.
    pub fn build(self) -> StoreResult<RockSolidCompactionFilterRouterConfig> {
        let missing_name = || {
            StoreError::InvalidConfiguration(
                "Compaction filter router operator name must be set".to_string(),
            )
        };
        let operator_name = self.operator_name.ok_or_else(missing_name)?;

        let has_routes = self.routes_added;
        if !has_routes {
            log::warn!(
                "Building compaction filter router config ('{}'), but no routes were added. The router will default to 'Keep' for all keys if no pattern matches.",
                operator_name
            );
        }

        let config = RockSolidCompactionFilterRouterConfig {
            name: operator_name,
            filter_fn_ptr: router_compaction_filter_fn,
        };
        Ok(config)
    }
}
/// Routes one compaction-filter call to the handler registered for its key.
///
/// The key is interpreted as UTF-8 and looked up in the shared static router.
/// On a match, the handler is called with `level`, the raw key and value
/// bytes, and the parameters extracted by the router; its decision is
/// returned unchanged.
///
/// Returns `RocksDbDecision::Keep` when no route matches, and also when the
/// key is not valid UTF-8 (a warning is logged in that case).
///
/// The router's read guard stays alive while the handler runs.
pub fn router_compaction_filter_fn(
    level: u32,
    key_bytes: &[u8],
    value_bytes: &[u8],
) -> RocksDbDecision {
    // Guard clause: keys that cannot be routed are always kept.
    let key_str = match std::str::from_utf8(key_bytes) {
        Ok(text) => text,
        Err(_) => {
            log::warn!(
                "Compaction filter key (hex: {:x?}) is not valid UTF-8. Cannot route. Defaulting to Keep.",
                key_bytes
            );
            return RocksDbDecision::Keep;
        }
    };

    let routes = COMPACTION_FILTER_ROUTER.read();
    match routes.at(key_str) {
        Ok(matched) => (matched.value)(level, key_bytes, value_bytes, &matched.params),
        // Any lookup failure means no handler applies to this key.
        Err(_) => RocksDbDecision::Keep,
    }
}
/// Replaces the shared compaction filter router with an empty one, discarding
/// every route registered so far.
///
/// The reset is compiled in only when the `test-utils` feature is enabled;
/// without that feature this function does nothing.
pub fn clear_compaction_filter_routes() {
    #[cfg(feature = "test-utils")]
    {
        // The write guard is a temporary, so the lock is released as soon as
        // the assignment statement finishes.
        *COMPACTION_FILTER_ROUTER.write() = Router::new();
    }
}