use crate::{
MapletError, MapletResult, MapletStats,
hash::{CollisionDetector, FingerprintHasher, HashFunction, PerfectHash},
operators::MergeOperator,
quotient_filter::QuotientFilter,
types::MapletConfig,
};
use std::hash::Hash;
use std::marker::PhantomData;
use std::sync::Arc;
use tokio::sync::RwLock;
/// Key-value map that tracks key membership in a quotient filter and keeps one
/// (merged) value per 64-bit key hash.
///
/// Keys themselves are never stored: every operation hashes the key to a `u64`
/// fingerprint and works on that fingerprint only. All mutable state sits behind
/// `Arc<RwLock<..>>`, so the public API takes `&self`.
#[derive(Debug)]
pub struct Maplet<K, V, Op>
where
K: Hash + Eq + Clone + std::fmt::Debug + Send + Sync,
V: Clone + std::fmt::Debug + Send + Sync,
Op: MergeOperator<V> + Send + Sync,
{
/// Settings read by the operations: capacity, false-positive rate and the
/// `auto_resize` / `enable_deletion` / `enable_merging` switches.
config: MapletConfig,
/// Filter holding the fingerprints of inserted keys; consulted before `values`.
filter: Arc<RwLock<QuotientFilter>>,
/// Stored value for each key fingerprint.
values: Arc<RwLock<std::collections::HashMap<u64, V>>>,
/// Combines an existing value with a newly inserted one for the same fingerprint.
operator: Op,
/// Only its collision count is read (for `stats`); created with `capacity / 4`.
collision_detector: Arc<RwLock<CollisionDetector>>,
/// Maps a quotient to a slot index; used only by the slot-lookup helpers.
#[allow(dead_code)]
perfect_hash: PerfectHash,
/// Entry counter: incremented when a new fingerprint is inserted, decremented on delete.
len: Arc<RwLock<usize>>,
/// Binds the key type `K` to the struct without storing any key.
_phantom: PhantomData<K>,
}
impl<K, V, Op> Maplet<K, V, Op>
where
K: Hash + Eq + Clone + std::fmt::Debug + Send + Sync,
V: Clone + PartialEq + std::fmt::Debug + Send + Sync,
Op: MergeOperator<V> + Default + Send + Sync,
{
/// Creates a maplet for `capacity` entries and the given target
/// false-positive rate, using `Op::default()` as the merge operator.
///
/// # Errors
///
/// Returns whatever [`Self::with_config`] returns for the derived configuration.
pub fn new(capacity: usize, false_positive_rate: f64) -> MapletResult<Self> {
    Self::with_config(MapletConfig::new(capacity, false_positive_rate))
}
/// Creates a maplet for `capacity` entries and the given target
/// false-positive rate, merging values with the supplied `operator`.
///
/// # Errors
///
/// Returns whatever [`Self::with_config_and_operator`] returns for the
/// derived configuration.
pub fn with_operator(
    capacity: usize,
    false_positive_rate: f64,
    operator: Op,
) -> MapletResult<Self> {
    Self::with_config_and_operator(MapletConfig::new(capacity, false_positive_rate), operator)
}
/// Creates a maplet from an explicit configuration, using `Op::default()`
/// as the merge operator.
///
/// # Errors
///
/// Returns whatever [`Self::with_config_and_operator`] returns.
pub fn with_config(config: MapletConfig) -> MapletResult<Self> {
    Self::with_config_and_operator(config, Op::default())
}
/// Creates a maplet from an explicit configuration and merge operator.
///
/// The configuration is validated first; the filter is then sized from
/// `config.capacity` with a fingerprint width chosen from
/// `config.false_positive_rate`. The new maplet starts empty.
///
/// # Errors
///
/// Propagates the error from `config.validate()` or from building the
/// quotient filter.
pub fn with_config_and_operator(config: MapletConfig, operator: Op) -> MapletResult<Self> {
    config.validate()?;

    let capacity = config.capacity;
    let bits = FingerprintHasher::optimal_fingerprint_size(config.false_positive_rate);
    let quotient_filter = QuotientFilter::new(capacity, bits, HashFunction::default())?;
    let detector = CollisionDetector::new(capacity / 4);
    let slot_hasher = PerfectHash::new(capacity, HashFunction::default());

    Ok(Self {
        config,
        filter: Arc::new(RwLock::new(quotient_filter)),
        values: Arc::new(RwLock::new(std::collections::HashMap::new())),
        operator,
        collision_detector: Arc::new(RwLock::new(detector)),
        perfect_hash: slot_hasher,
        len: Arc::new(RwLock::new(0)),
        _phantom: PhantomData,
    })
}
/// Inserts `value` under `key`, merging with the stored value when the key's
/// fingerprint is already present.
///
/// Capacity is only enforced when a *new* entry would be added: merging into
/// an existing entry does not grow the maplet, so it is never rejected with
/// `CapacityExceeded` and never triggers a resize.
///
/// # Errors
///
/// * `MapletError::CapacityExceeded` if a new entry is needed, the maplet is
///   full and `auto_resize` is disabled.
/// * Any error from `resize`, from the filter insertion, or from the merge
///   operator.
pub async fn insert(&self, key: K, value: V) -> MapletResult<()> {
    let fingerprint = self.hash_key(&key);

    // Existing entry: merge in place. `len` stays the same, so the capacity
    // limit is irrelevant on this path.
    let already_stored = self.values.read().await.contains_key(&fingerprint);
    if already_stored {
        return self.merge_value(fingerprint, value).await;
    }

    // New entry: make sure there is room before touching any state.
    let current_len = *self.len.read().await;
    if current_len >= self.config.capacity {
        if !self.config.auto_resize {
            return Err(MapletError::CapacityExceeded);
        }
        self.resize(self.config.capacity * 2).await?;
    }

    // NOTE(review): the existence check above and the writes below are
    // separate critical sections, so two concurrent inserts of the same new
    // key can both reach this point. Closing that window needs the value
    // lock held across the whole operation.
    self.filter.write().await.insert(fingerprint)?;
    self.store_value(fingerprint, value).await?;
    *self.len.write().await += 1;
    Ok(())
}
/// Looks up the value stored for `key`.
///
/// The filter is consulted first; only when it reports the fingerprint is
/// the value store read. Returns a clone of the stored value, or `None`
/// when either lookup misses.
pub async fn query(&self, key: &K) -> Option<V> {
    let fingerprint = self.hash_key(key);

    // The filter guard is a temporary, released before the value lock is taken.
    let maybe_present = self.filter.read().await.query(fingerprint);
    if maybe_present {
        let stored = self.values.read().await;
        stored.get(&fingerprint).cloned()
    } else {
        None
    }
}
/// Reports whether the filter contains the fingerprint of `key`.
///
/// The answer comes from the filter alone; the value store is not consulted.
pub async fn contains(&self, key: &K) -> bool {
    let fingerprint = self.hash_key(key);
    self.filter.read().await.query(fingerprint)
}
/// Removes the entry for `key`, but only if its stored value equals `value`.
///
/// Returns `Ok(true)` when an entry was removed and `Ok(false)` when the
/// filter does not report the key, no value is stored for it, or the stored
/// value differs from `value`.
///
/// # Errors
///
/// * `MapletError::Internal` when deletion is disabled in the configuration.
/// * Any error from removing the fingerprint from the filter.
pub async fn delete(&self, key: &K, value: &V) -> MapletResult<bool> {
    if !self.config.enable_deletion {
        return Err(MapletError::Internal("Deletion not enabled".to_string()));
    }

    let fingerprint = self.hash_key(key);
    let maybe_present = self.filter.read().await.query(fingerprint);
    if !maybe_present {
        return Ok(false);
    }

    // The value lock is held for the rest of the call so the comparison and
    // the removal see the same entry.
    let mut stored = self.values.write().await;
    if stored.get(&fingerprint) != Some(value) {
        return Ok(false);
    }

    self.filter.write().await.delete(fingerprint)?;
    stored.remove(&fingerprint);
    *self.len.write().await -= 1;
    Ok(true)
}
/// Returns the current value of the entry counter.
pub async fn len(&self) -> usize {
    let counter = self.len.read().await;
    *counter
}
/// Returns `true` when the entry counter is zero.
pub async fn is_empty(&self) -> bool {
    self.len().await == 0
}
/// Returns the false-positive rate this maplet was configured with.
///
/// This is the configured value, not a rate measured at runtime.
pub const fn error_rate(&self) -> f64 {
self.config.false_positive_rate
}
/// Returns the entry counter divided by the configured capacity.
// Precision loss in the usize -> f64 conversion is acceptable for a ratio.
#[allow(clippy::cast_precision_loss)]
pub async fn load_factor(&self) -> f64 {
    let occupied = *self.len.read().await as f64;
    let capacity = self.config.capacity as f64;
    occupied / capacity
}
/// Collects a statistics snapshot.
///
/// The snapshot combines the configured capacity and false-positive rate,
/// the entry counter, the estimated memory usage, the collision count and
/// the filter's run count. Each lock is taken and released in turn, so the
/// individual figures are not read atomically as a group.
pub async fn stats(&self) -> MapletStats {
    let filter_stats = self.filter.read().await.stats();
    let memory_usage = self.estimate_memory_usage();
    let current_len = *self.len.read().await;
    let collision_count = self.collision_detector.read().await.collision_count() as u64;

    let mut snapshot = MapletStats::new(
        self.config.capacity,
        current_len,
        self.config.false_positive_rate,
    );
    snapshot.update(
        current_len,
        memory_usage,
        collision_count,
        filter_stats.runs,
    );
    snapshot
}
/// Replaces the filter with a larger one sized for `new_capacity`, carrying
/// over every fingerprint that currently has a stored value.
///
/// Without the carry-over the new filter would start empty, and every
/// existing key would stop being reported by `query`, `contains` and
/// `delete` even though its value is still stored.
///
/// The new filter is fully built before it is swapped in, so a failure
/// leaves the current filter untouched. This method does not modify
/// `config.capacity`.
///
/// # Errors
///
/// * `MapletError::ResizeFailed` if `new_capacity` is not larger than the
///   configured capacity.
/// * Any error from constructing the new filter or re-inserting a
///   fingerprint into it.
pub async fn resize(&self, new_capacity: usize) -> MapletResult<()> {
    if new_capacity <= self.config.capacity {
        return Err(MapletError::ResizeFailed(
            "New capacity must be larger".to_string(),
        ));
    }

    let fingerprint_bits =
        FingerprintHasher::optimal_fingerprint_size(self.config.false_positive_rate);
    let mut new_filter =
        QuotientFilter::new(new_capacity, fingerprint_bits, HashFunction::default())?;

    // Hold the value lock (values -> filter, the same order `delete` uses)
    // so the set of fingerprints cannot change between the copy and the swap.
    let values_guard = self.values.read().await;
    for &fingerprint in values_guard.keys() {
        new_filter.insert(fingerprint)?;
    }
    *self.filter.write().await = new_filter;
    drop(values_guard);

    Ok(())
}
/// Placeholder for merging another maplet into this one.
///
/// # Errors
///
/// Always returns `MapletError::MergeFailed`: with "Merging not enabled"
/// when the configuration disables merging, otherwise with
/// "Merge not fully implemented".
pub fn merge(&self, _other: &Self) -> MapletResult<()> {
    let reason = if self.config.enable_merging {
        "Merge not fully implemented"
    } else {
        "Merging not enabled"
    };
    Err(MapletError::MergeFailed(reason.to_string()))
}
fn hash_key(&self, key: &K) -> u64 {
use ahash::RandomState;
let random_state = RandomState::with_seed(42);
random_state.hash_one(&key)
}
/// Maps a fingerprint to a slot index by passing its quotient through the
/// perfect hash.
// Not called from the visible code paths; kept for slot-level lookups.
#[allow(dead_code)]
fn find_slot_for_fingerprint(&self, fingerprint: u64) -> usize {
    self.perfect_hash
        .slot_index(self.extract_quotient(fingerprint))
}
/// Returns the low `ceil(log2(capacity))` bits of `fingerprint`.
///
/// When that width is 64 or more the fingerprint is returned unchanged.
// The usize -> f64 conversion only feeds a logarithm, so precision loss is fine.
#[allow(dead_code, clippy::cast_precision_loss)]
fn extract_quotient(&self, fingerprint: u64) -> u64 {
    let quotient_bits = (self.config.capacity as f64).log2().ceil() as u32;
    // `checked_shl` yields `None` for shifts of 64 or more, which is exactly
    // the "keep every bit" case.
    let mask = 1u64
        .checked_shl(quotient_bits)
        .map_or(u64::MAX, |bit| bit - 1);
    fingerprint & mask
}
/// Returns the bits of `fingerprint` above the low `ceil(log2(capacity))`
/// quotient bits, shifted down to start at bit 0.
///
/// When the quotient width is 64 or more there are no remainder bits left
/// and the result is 0. A plain `>>` would overflow in that case; the
/// checked shift keeps this helper consistent with `extract_quotient`,
/// which also guards the 64-bit case.
// The usize -> f64 conversion only feeds a logarithm, so precision loss is fine.
#[allow(dead_code, clippy::cast_precision_loss)]
fn extract_remainder(&self, fingerprint: u64) -> u64 {
    let quotient_bits = (self.config.capacity as f64).log2().ceil() as u32;
    // A logical right shift already clears the upper `quotient_bits` bits,
    // so no extra mask over the remaining `64 - quotient_bits` bits is needed.
    fingerprint.checked_shr(quotient_bits).unwrap_or(0)
}
/// Returns the slot index the perfect hash assigns to `quotient`.
///
/// The remainder is accepted but not used.
#[allow(dead_code)]
fn find_target_slot(&self, quotient: u64, _remainder: u64) -> usize {
self.perfect_hash.slot_index(quotient)
}
/// Asks the filter for the slot that actually holds the fingerprint formed
/// from `quotient` and `remainder`.
///
/// The fingerprint is reassembled as `quotient | (remainder << quotient_bits)`
/// using the filter's own quotient width.
// NOTE(review): the caller in this file splits the hash with a width derived
// from `config.capacity`; presumably that equals `filter.quotient_bits()` -
// confirm, otherwise the reassembled fingerprint differs from the original hash.
#[cfg(feature = "quotient-filter")]
async fn find_actual_slot_for_fingerprint(
    &self,
    quotient: u64,
    remainder: u64,
) -> Option<usize> {
    let filter = self.filter.read().await;
    let shift = filter.quotient_bits();
    filter.get_actual_slot_for_fingerprint(quotient | (remainder << shift))
}
/// Returns the filter slot that holds `key`'s fingerprint, if the filter
/// reports one.
///
/// The key is hashed, split into quotient and remainder, and handed to the
/// filter-level slot lookup.
#[cfg(feature = "quotient-filter")]
pub async fn find_slot_for_key(&self, key: &K) -> Option<usize> {
    let fingerprint = self.hash_key(key);
    let (quotient, remainder) = (
        self.extract_quotient(fingerprint),
        self.extract_remainder(fingerprint),
    );
    self.find_actual_slot_for_fingerprint(quotient, remainder)
        .await
}
/// Combines `value` with the value stored under `fingerprint` and writes the
/// result back; when nothing is stored yet, `value` is stored as-is.
///
/// The value lock is held for the whole read-merge-write sequence.
///
/// # Errors
///
/// Propagates the merge operator's error; the stored value is left unchanged
/// in that case.
async fn merge_value(&self, fingerprint: u64, value: V) -> MapletResult<()> {
    let mut stored = self.values.write().await;
    let next = match stored.get(&fingerprint) {
        Some(current) => self.operator.merge(current.clone(), value)?,
        None => value,
    };
    stored.insert(fingerprint, next);
    Ok(())
}
/// Stores `value` under `fingerprint`, replacing any value already there.
///
/// Always returns `Ok(())`.
async fn store_value(&self, fingerprint: u64, value: V) -> MapletResult<()> {
    self.values.write().await.insert(fingerprint, value);
    Ok(())
}
/// Estimates the memory footprint in bytes from the configured capacity.
///
/// The figure is derived from `config.capacity` and type sizes only; it does
/// not inspect how many entries are actually stored. It sums:
/// one `SlotMetadata` per capacity slot, `capacity / 2` value-map entries
/// (key + value) plus half a `usize` of overhead each, `capacity / 4`
/// multiset entries (key + counter), and the size of the struct itself.
const fn estimate_memory_usage(&self) -> usize {
    let capacity = self.config.capacity;
    let key_bytes = std::mem::size_of::<u64>();
    let word_bytes = std::mem::size_of::<usize>();

    let filter_bytes = capacity * std::mem::size_of::<crate::quotient_filter::SlotMetadata>();

    let multiset_entries = capacity / 4;
    let map_slots = capacity / 2;

    let value_bytes = map_slots * (key_bytes + std::mem::size_of::<V>());
    let map_overhead_bytes = map_slots * word_bytes / 2;
    let multiset_bytes = multiset_entries * (key_bytes + word_bytes);
    let struct_bytes = std::mem::size_of::<Self>();

    filter_bytes + value_bytes + map_overhead_bytes + multiset_bytes + struct_bytes
}
}
impl<K, V, Op> Default for Maplet<K, V, Op>
where
    K: Hash + Eq + Clone + std::fmt::Debug + Send + Sync,
    V: Clone + PartialEq + std::fmt::Debug + Send + Sync,
    Op: MergeOperator<V> + Default + Send + Sync,
{
    /// Builds a maplet with capacity 1000 and a 1% false-positive rate.
    ///
    /// # Panics
    ///
    /// Panics if construction with these defaults fails.
    fn default() -> Self {
        const DEFAULT_CAPACITY: usize = 1000;
        const DEFAULT_FALSE_POSITIVE_RATE: f64 = 0.01;

        Self::new(DEFAULT_CAPACITY, DEFAULT_FALSE_POSITIVE_RATE)
            .expect("Failed to create default maplet")
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::operators::CounterOperator;
/// A freshly created maplet is empty and reports the configured error rate.
#[tokio::test]
async fn test_maplet_creation() {
    let created = Maplet::<String, u64, CounterOperator>::new(100, 0.01);
    assert!(created.is_ok());

    let maplet = created.unwrap();
    assert!(maplet.is_empty().await);
    assert_eq!(maplet.len().await, 0);

    let rate_delta = (maplet.error_rate() - 0.01).abs();
    assert!(rate_delta < f64::EPSILON);
}
/// Inserted keys are reported by `contains`; a key never inserted is not.
#[tokio::test]
async fn test_maplet_insert_query() {
    let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();

    for (name, amount) in [("key1", 5), ("key2", 10)] {
        assert!(maplet.insert(name.to_string(), amount).await.is_ok());
    }

    assert_eq!(maplet.len().await, 2);
    assert!(!maplet.is_empty().await);

    for present in ["key1", "key2"] {
        assert!(maplet.contains(&present.to_string()).await);
    }
    assert!(!maplet.contains(&"key3".to_string()).await);
}
/// Inserting the same key twice keeps a single entry that can still be queried.
#[tokio::test]
async fn test_maplet_merge_values() {
    let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();
    let key = "key1".to_string();

    assert!(maplet.insert(key.clone(), 5).await.is_ok());
    assert!(maplet.insert(key.clone(), 3).await.is_ok());
    assert_eq!(maplet.len().await, 1);

    assert!(maplet.query(&key).await.is_some());
}
/// Statistics reflect the configured capacity and the number of inserted keys.
#[tokio::test]
async fn test_maplet_stats() {
    let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();
    for (name, amount) in [("key1", 5), ("key2", 10)] {
        maplet.insert(name.to_string(), amount).await.unwrap();
    }

    let snapshot = maplet.stats().await;
    assert_eq!(snapshot.capacity, 100);
    assert_eq!(snapshot.len, 2);
    assert!(snapshot.load_factor > 0.0);
    assert!(snapshot.memory_usage > 0);
}
/// Five tasks insert overlapping keys concurrently; afterwards the filter and
/// the value store must agree for every key that yields a value.
#[tokio::test]
async fn test_concurrent_insertions_no_filter_inconsistency() {
    use std::sync::Arc;
    use tokio::task;

    let maplet = Arc::new(Maplet::<String, u64, CounterOperator>::new(1000, 0.01).unwrap());
    let mut handles = vec![];
    for i in 0..5 {
        let maplet_clone = Arc::clone(&maplet);
        let handle = task::spawn(async move {
            for j in 0..50 {
                // Only 25 distinct keys, so every task hits the same entries.
                let key = format!("key_{}", j % 25);
                let value = u64::try_from(i * 50 + j).unwrap_or(0);
                maplet_clone.insert(key, value).await.unwrap();
            }
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.await.unwrap();
    }

    let len = maplet.len().await;
    assert!(len > 0, "Maplet should have some items");
    assert!(len <= 1000, "Should not exceed capacity");

    for i in 0..50 {
        let key = format!("key_{i}");
        // `query` only returns a value after the filter reports the key, so a
        // hit that `contains` denies would mean the filter is inconsistent.
        // (The previous `is_some() || is_none()` check could never fail.)
        if maplet.query(&key).await.is_some() {
            assert!(
                maplet.contains(&key).await,
                "filter and value store disagree for {key}"
            );
        }
    }
}
/// The memory estimate for a small maplet is positive and below 100 000 bytes.
#[tokio::test]
async fn test_memory_usage_accuracy() {
    let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();
    for i in 0..10u64 {
        maplet.insert(format!("key_{i}"), i).await.unwrap();
    }

    let memory_usage = maplet.stats().await.memory_usage;
    assert!(memory_usage > 0, "Memory usage should be positive");
    assert!(
        memory_usage < 100_000,
        "Memory usage should be reasonable for 10 items"
    );
    println!("Memory usage for 10 items: {memory_usage} bytes");
}
/// An inserted key resolves to a filter slot; the lookup for a key that was
/// never inserted must simply complete.
#[cfg(feature = "quotient-filter")]
#[tokio::test]
async fn test_slot_finding_for_key() {
    let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();

    let present = "test_key".to_string();
    maplet.insert(present.clone(), 42).await.unwrap();
    assert!(
        maplet.find_slot_for_key(&present).await.is_some(),
        "Should find a slot for existing key"
    );

    // No assertion on the result: only exercising the lookup path.
    let absent = "non_existing".to_string();
    let _non_existing_slot = maplet.find_slot_for_key(&absent).await;
}
}