1use std::hash::{Hash, BuildHasher};
7use std::marker::PhantomData;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10use crate::{
11 MapletError, MapletResult, MapletStats,
12 types::MapletConfig,
13 hash::{FingerprintHasher, HashFunction, CollisionDetector, PerfectHash},
14 quotient_filter::QuotientFilter,
15 operators::MergeOperator,
16};
17
18#[derive(Debug)]
27pub struct Maplet<K, V, Op>
28where
29 K: Hash + Eq + Clone + std::fmt::Debug + Send + Sync,
30 V: Clone + std::fmt::Debug + Send + Sync,
31 Op: MergeOperator<V> + Send + Sync,
32{
33 config: MapletConfig,
35 filter: Arc<RwLock<QuotientFilter>>,
37 values: Arc<RwLock<std::collections::HashMap<u64, V>>>,
39 operator: Op,
41 collision_detector: Arc<RwLock<CollisionDetector>>,
43 perfect_hash: PerfectHash,
45 len: Arc<RwLock<usize>>,
47 _phantom: PhantomData<K>,
49}
50
51impl<K, V, Op> Maplet<K, V, Op>
52where
53 K: Hash + Eq + Clone + std::fmt::Debug + Send + Sync,
54 V: Clone + PartialEq + std::fmt::Debug + Send + Sync,
55 Op: MergeOperator<V> + Default + Send + Sync,
56{
57 pub fn new(capacity: usize, false_positive_rate: f64) -> MapletResult<Self> {
59 let config = MapletConfig::new(capacity, false_positive_rate);
60 Self::with_config(config)
61 }
62
63 pub fn with_operator(capacity: usize, false_positive_rate: f64, operator: Op) -> MapletResult<Self> {
65 let config = MapletConfig::new(capacity, false_positive_rate);
66 Self::with_config_and_operator(config, operator)
67 }
68
69 pub fn with_config(config: MapletConfig) -> MapletResult<Self> {
71 let operator = Op::default();
72 Self::with_config_and_operator(config, operator)
73 }
74
75 pub fn with_config_and_operator(config: MapletConfig, operator: Op) -> MapletResult<Self> {
77 config.validate()?;
78
79 let fingerprint_bits = FingerprintHasher::optimal_fingerprint_size(config.false_positive_rate);
80 let filter = QuotientFilter::new(config.capacity, fingerprint_bits, HashFunction::default())?;
81
82 let collision_detector = CollisionDetector::new(config.capacity / 4); let perfect_hash = PerfectHash::new(config.capacity, HashFunction::default());
84
85 Ok(Self {
86 config,
87 filter: Arc::new(RwLock::new(filter)),
88 values: Arc::new(RwLock::new(std::collections::HashMap::new())),
89 operator,
90 collision_detector: Arc::new(RwLock::new(collision_detector)),
91 perfect_hash,
92 len: Arc::new(RwLock::new(0)),
93 _phantom: PhantomData,
94 })
95 }
96
97 pub async fn insert(&self, key: K, value: V) -> MapletResult<()> {
99 let current_len = *self.len.read().await;
100 if current_len >= self.config.capacity {
101 if self.config.auto_resize {
102 self.resize(self.config.capacity * 2).await?;
103 } else {
104 return Err(MapletError::CapacityExceeded);
105 }
106 }
107
108 let fingerprint = self.hash_key(&key);
109
110 let filter_guard = self.filter.read().await;
112 let key_exists = filter_guard.query(fingerprint);
113 drop(filter_guard);
114
115 if key_exists {
116 self.merge_value(fingerprint, value).await?;
118 } else {
119 {
121 let mut filter_guard = self.filter.write().await;
122 filter_guard.insert(fingerprint)?;
123 }
124 self.store_value(fingerprint, value).await?;
125 {
126 let mut len_guard = self.len.write().await;
127 *len_guard += 1;
128 }
129 }
130
131 Ok(())
132 }
133
134 pub async fn query(&self, key: &K) -> Option<V> {
136 let fingerprint = self.hash_key(key);
137
138 let filter_guard = self.filter.read().await;
139 if !filter_guard.query(fingerprint) {
140 return None;
141 }
142 drop(filter_guard);
143
144 let values_guard = self.values.read().await;
146 values_guard.get(&fingerprint).cloned()
147 }
148
149 pub async fn contains(&self, key: &K) -> bool {
151 let fingerprint = self.hash_key(key);
152 let filter_guard = self.filter.read().await;
153 filter_guard.query(fingerprint)
154 }
155
156 pub async fn delete(&self, key: &K, value: &V) -> MapletResult<bool> {
158 if !self.config.enable_deletion {
159 return Err(MapletError::Internal("Deletion not enabled".to_string()));
160 }
161
162 let fingerprint = self.hash_key(key);
163
164 let filter_guard = self.filter.read().await;
165 if !filter_guard.query(fingerprint) {
166 return Ok(false);
167 }
168 drop(filter_guard);
169
170 let mut values_guard = self.values.write().await;
171 if let Some(existing_value) = values_guard.get(&fingerprint) {
172 if existing_value == value {
174 {
176 let mut filter_guard = self.filter.write().await;
177 filter_guard.delete(fingerprint)?;
178 }
179 values_guard.remove(&fingerprint);
180 {
181 let mut len_guard = self.len.write().await;
182 *len_guard -= 1;
183 }
184 return Ok(true);
185 }
186 }
187
188 Ok(false)
189 }
190
191 pub async fn len(&self) -> usize {
193 *self.len.read().await
194 }
195
196 pub async fn is_empty(&self) -> bool {
198 *self.len.read().await == 0
199 }
200
201 pub fn error_rate(&self) -> f64 {
203 self.config.false_positive_rate
204 }
205
206 pub async fn load_factor(&self) -> f64 {
208 let current_len = *self.len.read().await;
209 current_len as f64 / self.config.capacity as f64
210 }
211
212 pub async fn stats(&self) -> MapletStats {
214 let filter_guard = self.filter.read().await;
215 let filter_stats = filter_guard.stats();
216 drop(filter_guard);
217
218 let memory_usage = self.estimate_memory_usage();
219 let current_len = *self.len.read().await;
220
221 let collision_guard = self.collision_detector.read().await;
222 let collision_count = collision_guard.collision_count() as u64;
223 drop(collision_guard);
224
225 let mut stats = MapletStats::new(
226 self.config.capacity,
227 current_len,
228 self.config.false_positive_rate,
229 );
230 stats.update(
231 current_len,
232 memory_usage,
233 collision_count,
234 filter_stats.runs,
235 );
236 stats
237 }
238
239 pub async fn resize(&self, new_capacity: usize) -> MapletResult<()> {
241 if new_capacity <= self.config.capacity {
242 return Err(MapletError::ResizeFailed("New capacity must be larger".to_string()));
243 }
244
245 let fingerprint_bits = FingerprintHasher::optimal_fingerprint_size(self.config.false_positive_rate);
247 let new_filter = QuotientFilter::new(
248 new_capacity,
249 fingerprint_bits,
250 HashFunction::default(),
251 )?;
252
253 {
255 let mut filter_guard = self.filter.write().await;
256 *filter_guard = new_filter;
257 }
258
259 Ok(())
265 }
266
267 pub async fn merge(&self, _other: &Maplet<K, V, Op>) -> MapletResult<()> {
269 if !self.config.enable_merging {
270 return Err(MapletError::MergeFailed("Merging not enabled".to_string()));
271 }
272
273 Err(MapletError::MergeFailed("Merge not fully implemented".to_string()))
277 }
278
279 fn hash_key(&self, key: &K) -> u64 {
281 use ahash::RandomState;
284 use std::hash::Hasher;
285
286 let random_state = RandomState::with_seed(42);
289 let mut hasher = random_state.build_hasher();
290 key.hash(&mut hasher);
291 hasher.finish()
292 }
293
294 fn find_slot_for_fingerprint(&self, fingerprint: u64) -> usize {
296 let quotient = self.extract_quotient(fingerprint);
298
299 self.perfect_hash.slot_index(quotient)
301 }
302
303 fn extract_quotient(&self, fingerprint: u64) -> u64 {
305 let quotient_bits = (self.config.capacity as f64).log2().ceil() as u32;
306 let quotient_mask = if quotient_bits >= 64 {
307 u64::MAX
308 } else {
309 (1u64 << quotient_bits) - 1
310 };
311 fingerprint & quotient_mask
312 }
313
314 fn extract_remainder(&self, fingerprint: u64) -> u64 {
316 let quotient_bits = (self.config.capacity as f64).log2().ceil() as u32;
317 let remainder_bits = 64 - quotient_bits;
318 let remainder_mask = if remainder_bits >= 64 {
319 u64::MAX
320 } else {
321 (1u64 << remainder_bits) - 1
322 };
323 (fingerprint >> quotient_bits) & remainder_mask
324 }
325
326 fn find_target_slot(&self, quotient: u64, _remainder: u64) -> usize {
328 self.perfect_hash.slot_index(quotient)
330 }
331
332 fn find_actual_slot_for_fingerprint(&self, quotient: u64, _remainder: u64) -> usize {
335 let target_slot = self.perfect_hash.slot_index(quotient);
337
338 target_slot
345 }
346
347 async fn merge_value(&self, fingerprint: u64, value: V) -> MapletResult<()> {
349 let mut values_guard = self.values.write().await;
350 if let Some(existing_value) = values_guard.get(&fingerprint) {
351 let merged_value = self.operator.merge(existing_value.clone(), value)?;
352 values_guard.insert(fingerprint, merged_value);
353 } else {
354 return Err(MapletError::Internal("Filter inconsistency detected".to_string()));
356 }
357
358 Ok(())
359 }
360
361 async fn store_value(&self, fingerprint: u64, value: V) -> MapletResult<()> {
363 let mut values_guard = self.values.write().await;
364 values_guard.insert(fingerprint, value);
365
366 Ok(())
367 }
368
369 fn estimate_memory_usage(&self) -> usize {
371 let filter_size = self.config.capacity * std::mem::size_of::<crate::quotient_filter::SlotMetadata>();
373 let values_size = self.config.capacity * std::mem::size_of::<Option<V>>();
374 let overhead = std::mem::size_of::<Self>();
375
376 filter_size + values_size + overhead
377 }
378}
379
380impl<K, V, Op> Default for Maplet<K, V, Op>
382where
383 K: Hash + Eq + Clone + std::fmt::Debug + Send + Sync,
384 V: Clone + PartialEq + std::fmt::Debug + Send + Sync,
385 Op: MergeOperator<V> + Default + Send + Sync,
386{
387 fn default() -> Self {
388 Self::new(1000, 0.01).expect("Failed to create default maplet")
389 }
390}
391
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operators::CounterOperator;

    /// Maplet type exercised by every test in this module.
    type CounterMaplet = Maplet<String, u64, CounterOperator>;

    /// A fresh counter maplet with 100 slots and a 1% target false-positive rate.
    fn counter_maplet() -> CounterMaplet {
        CounterMaplet::new(100, 0.01).unwrap()
    }

    #[tokio::test]
    async fn test_maplet_creation() {
        let created = CounterMaplet::new(100, 0.01);
        assert!(created.is_ok());

        let maplet = created.unwrap();
        assert_eq!(maplet.len().await, 0);
        assert!(maplet.is_empty().await);
        assert_eq!(maplet.error_rate(), 0.01);
    }

    #[tokio::test]
    async fn test_maplet_insert_query() {
        let maplet = counter_maplet();

        for (name, amount) in [("key1", 5), ("key2", 10)] {
            assert!(maplet.insert(name.to_string(), amount).await.is_ok());
        }

        assert_eq!(maplet.len().await, 2);
        assert!(!maplet.is_empty().await);

        for name in ["key1", "key2"] {
            assert!(maplet.contains(&name.to_string()).await);
        }

        assert!(!maplet.contains(&"key3".to_string()).await);
    }

    #[tokio::test]
    async fn test_maplet_merge_values() {
        let maplet = counter_maplet();
        let key = "key1".to_string();

        // Two inserts under the same key collapse into one merged entry.
        for amount in [5, 3] {
            assert!(maplet.insert(key.clone(), amount).await.is_ok());
        }

        assert_eq!(maplet.len().await, 1);
        let merged = maplet.query(&key).await;
        assert!(merged.is_some());
    }

    #[tokio::test]
    async fn test_maplet_stats() {
        let maplet = counter_maplet();

        for (name, amount) in [("key1", 5), ("key2", 10)] {
            maplet.insert(name.to_string(), amount).await.unwrap();
        }

        let stats = maplet.stats().await;
        assert_eq!(stats.capacity, 100);
        assert_eq!(stats.len, 2);
        assert!(stats.load_factor > 0.0);
        assert!(stats.memory_usage > 0);
    }
}