1use crate::{
7 MapletError, MapletResult, MapletStats,
8 hash::{CollisionDetector, FingerprintHasher, HashFunction, PerfectHash},
9 operators::MergeOperator,
10 quotient_filter::QuotientFilter,
11 types::MapletConfig,
12};
13use std::hash::Hash;
14use std::marker::PhantomData;
15use std::sync::Arc;
16use tokio::sync::RwLock;
17
/// Approximate key-value map: a quotient filter of key fingerprints plus a
/// side table holding one value per fingerprint.
///
/// Keys themselves are never stored. Every operation first reduces the key to
/// a `u64` fingerprint (see `hash_key`); that fingerprint is what goes into
/// `filter` and what indexes `values`.
#[derive(Debug)]
pub struct Maplet<K, V, Op>
where
    K: Hash + Eq + Clone + std::fmt::Debug + Send + Sync,
    V: Clone + std::fmt::Debug + Send + Sync,
    Op: MergeOperator<V> + Send + Sync,
{
    /// Settings supplied at construction; read for the capacity, the
    /// false-positive rate and the resize / deletion / merging flags.
    config: MapletConfig,
    /// Quotient filter that receives the fingerprint of every newly inserted key.
    filter: Arc<RwLock<QuotientFilter>>,
    /// Value stored for each fingerprint.
    values: Arc<RwLock<std::collections::HashMap<u64, V>>>,
    /// Combines the stored value with the incoming one when a fingerprint is
    /// inserted again.
    operator: Op,
    /// Collision tracker; in this file it is only read, to report
    /// `collision_count` from `stats`.
    collision_detector: Arc<RwLock<CollisionDetector>>,
    /// Maps a quotient to a slot index; only reached from helpers that are
    /// themselves marked `dead_code`.
    #[allow(dead_code)]
    perfect_hash: PerfectHash,
    /// Number of entries, incremented by `insert` for new fingerprints and
    /// decremented by `delete`.
    len: Arc<RwLock<usize>>,
    /// Ties the key type `K` to the maplet without storing any key.
    _phantom: PhantomData<K>,
}
51
52impl<K, V, Op> Maplet<K, V, Op>
53where
54 K: Hash + Eq + Clone + std::fmt::Debug + Send + Sync,
55 V: Clone + PartialEq + std::fmt::Debug + Send + Sync,
56 Op: MergeOperator<V> + Default + Send + Sync,
57{
58 pub fn new(capacity: usize, false_positive_rate: f64) -> MapletResult<Self> {
60 let config = MapletConfig::new(capacity, false_positive_rate);
61 Self::with_config(config)
62 }
63
64 pub fn with_operator(
66 capacity: usize,
67 false_positive_rate: f64,
68 operator: Op,
69 ) -> MapletResult<Self> {
70 let config = MapletConfig::new(capacity, false_positive_rate);
71 Self::with_config_and_operator(config, operator)
72 }
73
74 pub fn with_config(config: MapletConfig) -> MapletResult<Self> {
76 let operator = Op::default();
77 Self::with_config_and_operator(config, operator)
78 }
79
80 pub fn with_config_and_operator(config: MapletConfig, operator: Op) -> MapletResult<Self> {
82 config.validate()?;
83
84 let fingerprint_bits =
85 FingerprintHasher::optimal_fingerprint_size(config.false_positive_rate);
86 let filter =
87 QuotientFilter::new(config.capacity, fingerprint_bits, HashFunction::default())?;
88
89 let collision_detector = CollisionDetector::new(config.capacity / 4); let perfect_hash = PerfectHash::new(config.capacity, HashFunction::default());
91
92 Ok(Self {
93 config,
94 filter: Arc::new(RwLock::new(filter)),
95 values: Arc::new(RwLock::new(std::collections::HashMap::new())),
96 operator,
97 collision_detector: Arc::new(RwLock::new(collision_detector)),
98 perfect_hash,
99 len: Arc::new(RwLock::new(0)),
100 _phantom: PhantomData,
101 })
102 }
103
104 pub async fn insert(&self, key: K, value: V) -> MapletResult<()> {
106 let current_len = *self.len.read().await;
107 if current_len >= self.config.capacity {
108 if self.config.auto_resize {
109 self.resize(self.config.capacity * 2).await?;
110 } else {
111 return Err(MapletError::CapacityExceeded);
112 }
113 }
114
115 let fingerprint = self.hash_key(&key);
116
117 let values_guard = self.values.read().await;
119 let key_exists = values_guard.contains_key(&fingerprint);
120 drop(values_guard);
121
122 if key_exists {
123 self.merge_value(fingerprint, value).await?;
125 } else {
126 {
128 let mut filter_guard = self.filter.write().await;
129 filter_guard.insert(fingerprint)?;
130 }
131 self.store_value(fingerprint, value).await?;
132 {
133 let mut len_guard = self.len.write().await;
134 *len_guard += 1;
135 }
136 }
137
138 Ok(())
139 }
140
141 pub async fn query(&self, key: &K) -> Option<V> {
143 let fingerprint = self.hash_key(key);
144
145 let filter_guard = self.filter.read().await;
146 if !filter_guard.query(fingerprint) {
147 return None;
148 }
149 drop(filter_guard);
150
151 let values_guard = self.values.read().await;
153 values_guard.get(&fingerprint).cloned()
154 }
155
156 pub async fn contains(&self, key: &K) -> bool {
158 let fingerprint = self.hash_key(key);
159 let filter_guard = self.filter.read().await;
160 filter_guard.query(fingerprint)
161 }
162
163 pub async fn delete(&self, key: &K, value: &V) -> MapletResult<bool> {
165 if !self.config.enable_deletion {
166 return Err(MapletError::Internal("Deletion not enabled".to_string()));
167 }
168
169 let fingerprint = self.hash_key(key);
170
171 let filter_guard = self.filter.read().await;
172 if !filter_guard.query(fingerprint) {
173 return Ok(false);
174 }
175 drop(filter_guard);
176
177 {
178 let mut values_guard = self.values.write().await;
179 if let Some(existing_value) = values_guard.get(&fingerprint) {
180 if existing_value == value {
182 {
184 let mut filter_guard = self.filter.write().await;
185 filter_guard.delete(fingerprint)?;
186 }
187 values_guard.remove(&fingerprint);
188 {
189 let mut len_guard = self.len.write().await;
190 *len_guard -= 1;
191 }
192 return Ok(true);
193 }
194 }
195 }
196
197 Ok(false)
198 }
199
200 pub async fn len(&self) -> usize {
202 *self.len.read().await
203 }
204
205 pub async fn is_empty(&self) -> bool {
207 *self.len.read().await == 0
208 }
209
210 pub const fn error_rate(&self) -> f64 {
212 self.config.false_positive_rate
213 }
214
215 #[allow(clippy::cast_precision_loss)] pub async fn load_factor(&self) -> f64 {
218 let current_len = *self.len.read().await;
219 current_len as f64 / self.config.capacity as f64
220 }
221
222 pub async fn stats(&self) -> MapletStats {
224 let filter_guard = self.filter.read().await;
225 let filter_stats = filter_guard.stats();
226 drop(filter_guard);
227
228 let memory_usage = self.estimate_memory_usage();
229 let current_len = *self.len.read().await;
230
231 let collision_guard = self.collision_detector.read().await;
232 let collision_count = collision_guard.collision_count() as u64;
233 drop(collision_guard);
234
235 let mut stats = MapletStats::new(
236 self.config.capacity,
237 current_len,
238 self.config.false_positive_rate,
239 );
240 stats.update(
241 current_len,
242 memory_usage,
243 collision_count,
244 filter_stats.runs,
245 );
246 stats
247 }
248
249 pub async fn resize(&self, new_capacity: usize) -> MapletResult<()> {
251 if new_capacity <= self.config.capacity {
252 return Err(MapletError::ResizeFailed(
253 "New capacity must be larger".to_string(),
254 ));
255 }
256
257 let fingerprint_bits =
259 FingerprintHasher::optimal_fingerprint_size(self.config.false_positive_rate);
260 let new_filter =
261 QuotientFilter::new(new_capacity, fingerprint_bits, HashFunction::default())?;
262
263 {
265 let mut filter_guard = self.filter.write().await;
266 *filter_guard = new_filter;
267 }
268
269 Ok(())
275 }
276
277 pub fn merge(&self, _other: &Self) -> MapletResult<()> {
279 if !self.config.enable_merging {
280 return Err(MapletError::MergeFailed("Merging not enabled".to_string()));
281 }
282
283 Err(MapletError::MergeFailed(
287 "Merge not fully implemented".to_string(),
288 ))
289 }
290
291 fn hash_key(&self, key: &K) -> u64 {
293 use ahash::RandomState;
296
297 let random_state = RandomState::with_seed(42);
300
301 random_state.hash_one(&key)
302 }
303
304 #[allow(dead_code)]
306 fn find_slot_for_fingerprint(&self, fingerprint: u64) -> usize {
307 let quotient = self.extract_quotient(fingerprint);
309
310 self.perfect_hash.slot_index(quotient)
312 }
313
314 #[allow(dead_code, clippy::cast_precision_loss)] fn extract_quotient(&self, fingerprint: u64) -> u64 {
317 let quotient_bits = (self.config.capacity as f64).log2().ceil() as u32;
318 let quotient_mask = if quotient_bits >= 64 {
319 u64::MAX
320 } else {
321 (1u64 << quotient_bits) - 1
322 };
323 fingerprint & quotient_mask
324 }
325
326 #[allow(dead_code, clippy::cast_precision_loss)] fn extract_remainder(&self, fingerprint: u64) -> u64 {
329 let quotient_bits = (self.config.capacity as f64).log2().ceil() as u32;
330 let remainder_bits = 64 - quotient_bits;
331 let remainder_mask = if remainder_bits >= 64 {
332 u64::MAX
333 } else {
334 (1u64 << remainder_bits) - 1
335 };
336 (fingerprint >> quotient_bits) & remainder_mask
337 }
338
339 #[allow(dead_code)]
341 fn find_target_slot(&self, quotient: u64, _remainder: u64) -> usize {
342 self.perfect_hash.slot_index(quotient)
344 }
345
346 #[cfg(feature = "quotient-filter")]
349 async fn find_actual_slot_for_fingerprint(
350 &self,
351 quotient: u64,
352 remainder: u64,
353 ) -> Option<usize> {
354 let filter_guard = self.filter.read().await;
357
358 let fingerprint = quotient | (remainder << filter_guard.quotient_bits());
363 let actual_slot = filter_guard.get_actual_slot_for_fingerprint(fingerprint);
364
365 drop(filter_guard);
366 actual_slot
367 }
368
369 #[cfg(feature = "quotient-filter")]
372 pub async fn find_slot_for_key(&self, key: &K) -> Option<usize> {
373 let fingerprint = self.hash_key(key);
374 let quotient = self.extract_quotient(fingerprint);
375 let remainder = self.extract_remainder(fingerprint);
376
377 self.find_actual_slot_for_fingerprint(quotient, remainder)
378 .await
379 }
380
381 async fn merge_value(&self, fingerprint: u64, value: V) -> MapletResult<()> {
383 {
384 let mut values_guard = self.values.write().await;
385 if let Some(existing_value) = values_guard.get(&fingerprint) {
386 let merged_value = self.operator.merge(existing_value.clone(), value)?;
387 values_guard.insert(fingerprint, merged_value);
388 } else {
389 values_guard.insert(fingerprint, value);
391 }
392 }
393
394 Ok(())
395 }
396
397 async fn store_value(&self, fingerprint: u64, value: V) -> MapletResult<()> {
399 {
400 let mut values_guard = self.values.write().await;
401 values_guard.insert(fingerprint, value);
402 }
403
404 Ok(())
405 }
406
407 const fn estimate_memory_usage(&self) -> usize {
409 let filter_slots_size =
411 self.config.capacity * std::mem::size_of::<crate::quotient_filter::SlotMetadata>();
412
413 let estimated_values_count = self.config.capacity / 4; let estimated_values_capacity = self.config.capacity / 2; let values_size =
420 estimated_values_capacity * (std::mem::size_of::<u64>() + std::mem::size_of::<V>());
421
422 let hashmap_overhead = estimated_values_capacity * std::mem::size_of::<usize>() / 2;
424
425 let multiset_size =
427 estimated_values_count * (std::mem::size_of::<u64>() + std::mem::size_of::<usize>());
428
429 let overhead = std::mem::size_of::<Self>();
431
432 filter_slots_size + values_size + hashmap_overhead + multiset_size + overhead
433 }
434}
435
436impl<K, V, Op> Default for Maplet<K, V, Op>
438where
439 K: Hash + Eq + Clone + std::fmt::Debug + Send + Sync,
440 V: Clone + PartialEq + std::fmt::Debug + Send + Sync,
441 Op: MergeOperator<V> + Default + Send + Sync,
442{
443 fn default() -> Self {
444 Self::new(1000, 0.01).expect("Failed to create default maplet")
445 }
446}
447
448#[cfg(test)]
449mod tests {
450 use super::*;
451 use crate::operators::CounterOperator;
452
453 #[tokio::test]
454 async fn test_maplet_creation() {
455 let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01);
456 assert!(maplet.is_ok());
457
458 let maplet = maplet.unwrap();
459 assert_eq!(maplet.len().await, 0);
460 assert!(maplet.is_empty().await);
461 assert!((maplet.error_rate() - 0.01).abs() < f64::EPSILON);
462 }
463
464 #[tokio::test]
465 async fn test_maplet_insert_query() {
466 let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();
467
468 assert!(maplet.insert("key1".to_string(), 5).await.is_ok());
470 assert!(maplet.insert("key2".to_string(), 10).await.is_ok());
471
472 assert_eq!(maplet.len().await, 2);
473 assert!(!maplet.is_empty().await);
474
475 assert!(maplet.contains(&"key1".to_string()).await);
477 assert!(maplet.contains(&"key2".to_string()).await);
478
479 assert!(!maplet.contains(&"key3".to_string()).await);
481 }
482
483 #[tokio::test]
484 async fn test_maplet_merge_values() {
485 let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();
486
487 assert!(maplet.insert("key1".to_string(), 5).await.is_ok());
489 assert!(maplet.insert("key1".to_string(), 3).await.is_ok());
490
491 assert_eq!(maplet.len().await, 1); let value = maplet.query(&"key1".to_string()).await;
495 assert!(value.is_some());
496 }
499
500 #[tokio::test]
501 async fn test_maplet_stats() {
502 let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();
503
504 maplet.insert("key1".to_string(), 5).await.unwrap();
505 maplet.insert("key2".to_string(), 10).await.unwrap();
506
507 let stats = maplet.stats().await;
508 assert_eq!(stats.capacity, 100);
509 assert_eq!(stats.len, 2);
510 assert!(stats.load_factor > 0.0);
511 assert!(stats.memory_usage > 0);
512 }
513
514 #[tokio::test]
515 async fn test_concurrent_insertions_no_filter_inconsistency() {
516 use std::sync::Arc;
517 use tokio::task;
518
519 let maplet = Arc::new(Maplet::<String, u64, CounterOperator>::new(1000, 0.01).unwrap());
520 let mut handles = vec![];
521
522 for i in 0..5 {
524 let maplet_clone = Arc::clone(&maplet);
525 let handle = task::spawn(async move {
526 for j in 0..50 {
527 let key = format!("key_{}", j % 25); let value = u64::try_from(i * 50 + j).unwrap_or(0);
529 maplet_clone.insert(key, value).await.unwrap();
530 }
531 });
532 handles.push(handle);
533 }
534
535 for handle in handles {
537 handle.await.unwrap();
538 }
539
540 let len = maplet.len().await;
542 assert!(len > 0, "Maplet should have some items");
543 assert!(len <= 1000, "Should not exceed capacity");
546
547 for i in 0..50 {
549 let key = format!("key_{i}");
550 let result = maplet.query(&key).await;
551 assert!(result.is_some() || result.is_none());
553 }
554 }
555
556 #[tokio::test]
557 async fn test_memory_usage_accuracy() {
558 let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();
559
560 for i in 0..10 {
562 let key = format!("key_{i}");
563 maplet
564 .insert(key, u64::try_from(i).unwrap_or(0))
565 .await
566 .unwrap();
567 }
568
569 let stats = maplet.stats().await;
570 let memory_usage = stats.memory_usage;
571
572 assert!(memory_usage > 0, "Memory usage should be positive");
574 assert!(
575 memory_usage < 100_000,
576 "Memory usage should be reasonable for 10 items"
577 );
578
579 println!("Memory usage for 10 items: {memory_usage} bytes");
582 }
583
584 #[cfg(feature = "quotient-filter")]
585 #[tokio::test]
586 async fn test_slot_finding_for_key() {
587 let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();
588
589 let test_key = "test_key".to_string();
591 maplet.insert(test_key.clone(), 42).await.unwrap();
592
593 let slot = maplet.find_slot_for_key(&test_key).await;
595 assert!(slot.is_some(), "Should find a slot for existing key");
596
597 let non_existing_key = "non_existing".to_string();
601 let _non_existing_slot = maplet.find_slot_for_key(&non_existing_key).await;
602 }
605}