1use std::hash::{Hash, BuildHasher};
7use std::marker::PhantomData;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10use crate::{
11 MapletError, MapletResult, MapletStats,
12 types::MapletConfig,
13 hash::{FingerprintHasher, HashFunction, CollisionDetector, PerfectHash},
14 quotient_filter::QuotientFilter,
15 operators::MergeOperator,
16};
17
18#[derive(Debug)]
27pub struct Maplet<K, V, Op>
28where
29 K: Hash + Eq + Clone + std::fmt::Debug + Send + Sync,
30 V: Clone + std::fmt::Debug + Send + Sync,
31 Op: MergeOperator<V> + Send + Sync,
32{
33 config: MapletConfig,
35 filter: Arc<RwLock<QuotientFilter>>,
37 values: Arc<RwLock<std::collections::HashMap<u64, V>>>,
39 operator: Op,
41 collision_detector: Arc<RwLock<CollisionDetector>>,
43 #[allow(dead_code)]
45 perfect_hash: PerfectHash,
46 len: Arc<RwLock<usize>>,
48 _phantom: PhantomData<K>,
50}
51
52impl<K, V, Op> Maplet<K, V, Op>
53where
54 K: Hash + Eq + Clone + std::fmt::Debug + Send + Sync,
55 V: Clone + PartialEq + std::fmt::Debug + Send + Sync,
56 Op: MergeOperator<V> + Default + Send + Sync,
57{
58 pub fn new(capacity: usize, false_positive_rate: f64) -> MapletResult<Self> {
60 let config = MapletConfig::new(capacity, false_positive_rate);
61 Self::with_config(config)
62 }
63
64 pub fn with_operator(capacity: usize, false_positive_rate: f64, operator: Op) -> MapletResult<Self> {
66 let config = MapletConfig::new(capacity, false_positive_rate);
67 Self::with_config_and_operator(config, operator)
68 }
69
70 pub fn with_config(config: MapletConfig) -> MapletResult<Self> {
72 let operator = Op::default();
73 Self::with_config_and_operator(config, operator)
74 }
75
76 pub fn with_config_and_operator(config: MapletConfig, operator: Op) -> MapletResult<Self> {
78 config.validate()?;
79
80 let fingerprint_bits = FingerprintHasher::optimal_fingerprint_size(config.false_positive_rate);
81 let filter = QuotientFilter::new(config.capacity, fingerprint_bits, HashFunction::default())?;
82
83 let collision_detector = CollisionDetector::new(config.capacity / 4); let perfect_hash = PerfectHash::new(config.capacity, HashFunction::default());
85
86 Ok(Self {
87 config,
88 filter: Arc::new(RwLock::new(filter)),
89 values: Arc::new(RwLock::new(std::collections::HashMap::new())),
90 operator,
91 collision_detector: Arc::new(RwLock::new(collision_detector)),
92 perfect_hash,
93 len: Arc::new(RwLock::new(0)),
94 _phantom: PhantomData,
95 })
96 }
97
98 pub async fn insert(&self, key: K, value: V) -> MapletResult<()> {
100 let current_len = *self.len.read().await;
101 if current_len >= self.config.capacity {
102 if self.config.auto_resize {
103 self.resize(self.config.capacity * 2).await?;
104 } else {
105 return Err(MapletError::CapacityExceeded);
106 }
107 }
108
109 let fingerprint = self.hash_key(&key);
110
111 let values_guard = self.values.read().await;
113 let key_exists = values_guard.contains_key(&fingerprint);
114 drop(values_guard);
115
116 if key_exists {
117 self.merge_value(fingerprint, value).await?;
119 } else {
120 {
122 let mut filter_guard = self.filter.write().await;
123 filter_guard.insert(fingerprint)?;
124 }
125 self.store_value(fingerprint, value).await?;
126 {
127 let mut len_guard = self.len.write().await;
128 *len_guard += 1;
129 }
130 }
131
132 Ok(())
133 }
134
135 pub async fn query(&self, key: &K) -> Option<V> {
137 let fingerprint = self.hash_key(key);
138
139 let filter_guard = self.filter.read().await;
140 if !filter_guard.query(fingerprint) {
141 return None;
142 }
143 drop(filter_guard);
144
145 let values_guard = self.values.read().await;
147 values_guard.get(&fingerprint).cloned()
148 }
149
150 pub async fn contains(&self, key: &K) -> bool {
152 let fingerprint = self.hash_key(key);
153 let filter_guard = self.filter.read().await;
154 filter_guard.query(fingerprint)
155 }
156
157 pub async fn delete(&self, key: &K, value: &V) -> MapletResult<bool> {
159 if !self.config.enable_deletion {
160 return Err(MapletError::Internal("Deletion not enabled".to_string()));
161 }
162
163 let fingerprint = self.hash_key(key);
164
165 let filter_guard = self.filter.read().await;
166 if !filter_guard.query(fingerprint) {
167 return Ok(false);
168 }
169 drop(filter_guard);
170
171 let mut values_guard = self.values.write().await;
172 if let Some(existing_value) = values_guard.get(&fingerprint) {
173 if existing_value == value {
175 {
177 let mut filter_guard = self.filter.write().await;
178 filter_guard.delete(fingerprint)?;
179 }
180 values_guard.remove(&fingerprint);
181 {
182 let mut len_guard = self.len.write().await;
183 *len_guard -= 1;
184 }
185 return Ok(true);
186 }
187 }
188
189 Ok(false)
190 }
191
192 pub async fn len(&self) -> usize {
194 *self.len.read().await
195 }
196
197 pub async fn is_empty(&self) -> bool {
199 *self.len.read().await == 0
200 }
201
202 pub fn error_rate(&self) -> f64 {
204 self.config.false_positive_rate
205 }
206
207 pub async fn load_factor(&self) -> f64 {
209 let current_len = *self.len.read().await;
210 current_len as f64 / self.config.capacity as f64
211 }
212
213 pub async fn stats(&self) -> MapletStats {
215 let filter_guard = self.filter.read().await;
216 let filter_stats = filter_guard.stats();
217 drop(filter_guard);
218
219 let memory_usage = self.estimate_memory_usage();
220 let current_len = *self.len.read().await;
221
222 let collision_guard = self.collision_detector.read().await;
223 let collision_count = collision_guard.collision_count() as u64;
224 drop(collision_guard);
225
226 let mut stats = MapletStats::new(
227 self.config.capacity,
228 current_len,
229 self.config.false_positive_rate,
230 );
231 stats.update(
232 current_len,
233 memory_usage,
234 collision_count,
235 filter_stats.runs,
236 );
237 stats
238 }
239
240 pub async fn resize(&self, new_capacity: usize) -> MapletResult<()> {
242 if new_capacity <= self.config.capacity {
243 return Err(MapletError::ResizeFailed("New capacity must be larger".to_string()));
244 }
245
246 let fingerprint_bits = FingerprintHasher::optimal_fingerprint_size(self.config.false_positive_rate);
248 let new_filter = QuotientFilter::new(
249 new_capacity,
250 fingerprint_bits,
251 HashFunction::default(),
252 )?;
253
254 {
256 let mut filter_guard = self.filter.write().await;
257 *filter_guard = new_filter;
258 }
259
260 Ok(())
266 }
267
268 pub fn merge(&self, _other: &Maplet<K, V, Op>) -> MapletResult<()> {
270 if !self.config.enable_merging {
271 return Err(MapletError::MergeFailed("Merging not enabled".to_string()));
272 }
273
274 Err(MapletError::MergeFailed("Merge not fully implemented".to_string()))
278 }
279
280 fn hash_key(&self, key: &K) -> u64 {
282 use ahash::RandomState;
285 use std::hash::Hasher;
286
287 let random_state = RandomState::with_seed(42);
290 let mut hasher = random_state.build_hasher();
291 key.hash(&mut hasher);
292 hasher.finish()
293 }
294
295 #[allow(dead_code)]
297 fn find_slot_for_fingerprint(&self, fingerprint: u64) -> usize {
298 let quotient = self.extract_quotient(fingerprint);
300
301 self.perfect_hash.slot_index(quotient)
303 }
304
305 #[allow(dead_code)]
307 fn extract_quotient(&self, fingerprint: u64) -> u64 {
308 let quotient_bits = (self.config.capacity as f64).log2().ceil() as u32;
309 let quotient_mask = if quotient_bits >= 64 {
310 u64::MAX
311 } else {
312 (1u64 << quotient_bits) - 1
313 };
314 fingerprint & quotient_mask
315 }
316
317 #[allow(dead_code)]
319 fn extract_remainder(&self, fingerprint: u64) -> u64 {
320 let quotient_bits = (self.config.capacity as f64).log2().ceil() as u32;
321 let remainder_bits = 64 - quotient_bits;
322 let remainder_mask = if remainder_bits >= 64 {
323 u64::MAX
324 } else {
325 (1u64 << remainder_bits) - 1
326 };
327 (fingerprint >> quotient_bits) & remainder_mask
328 }
329
330 #[allow(dead_code)]
332 fn find_target_slot(&self, quotient: u64, _remainder: u64) -> usize {
333 self.perfect_hash.slot_index(quotient)
335 }
336
/// Asks the filter which slot actually holds the fingerprint rebuilt from
/// `quotient` and `remainder`.
///
/// The fingerprint is reassembled as `quotient | (remainder << q)`, where
/// `q` is the quotient width reported by the filter itself. The filter read
/// lock is held for the duration of the lookup.
#[cfg(feature = "quotient-filter")]
async fn find_actual_slot_for_fingerprint(&self, quotient: u64, remainder: u64) -> Option<usize> {
    let filter = self.filter.read().await;
    let shifted_remainder = remainder << filter.quotient_bits();
    filter.get_actual_slot_for_fingerprint(quotient | shifted_remainder)
}
355
/// Returns the filter slot that holds `key`, if the filter reports one.
///
/// The key is hashed, split into quotient and remainder, and handed to
/// `find_actual_slot_for_fingerprint`.
#[cfg(feature = "quotient-filter")]
pub async fn find_slot_for_key(&self, key: &K) -> Option<usize> {
    let fingerprint = self.hash_key(key);
    self.find_actual_slot_for_fingerprint(
        self.extract_quotient(fingerprint),
        self.extract_remainder(fingerprint),
    )
    .await
}
366
367 async fn merge_value(&self, fingerprint: u64, value: V) -> MapletResult<()> {
369 let mut values_guard = self.values.write().await;
370 if let Some(existing_value) = values_guard.get(&fingerprint) {
371 let merged_value = self.operator.merge(existing_value.clone(), value)?;
372 values_guard.insert(fingerprint, merged_value);
373 } else {
374 values_guard.insert(fingerprint, value);
376 }
377
378 Ok(())
379 }
380
381 async fn store_value(&self, fingerprint: u64, value: V) -> MapletResult<()> {
383 let mut values_guard = self.values.write().await;
384 values_guard.insert(fingerprint, value);
385
386 Ok(())
387 }
388
389 fn estimate_memory_usage(&self) -> usize {
391 let filter_slots_size = self.config.capacity * std::mem::size_of::<crate::quotient_filter::SlotMetadata>();
393
394 let estimated_values_count = self.config.capacity / 4; let estimated_values_capacity = self.config.capacity / 2; let values_size = estimated_values_capacity * (std::mem::size_of::<u64>() + std::mem::size_of::<V>());
401
402 let hashmap_overhead = estimated_values_capacity * std::mem::size_of::<usize>() / 2;
404
405 let multiset_size = estimated_values_count * (std::mem::size_of::<u64>() + std::mem::size_of::<usize>());
407
408 let overhead = std::mem::size_of::<Self>();
410
411 filter_slots_size + values_size + hashmap_overhead + multiset_size + overhead
412 }
413}
414
415impl<K, V, Op> Default for Maplet<K, V, Op>
417where
418 K: Hash + Eq + Clone + std::fmt::Debug + Send + Sync,
419 V: Clone + PartialEq + std::fmt::Debug + Send + Sync,
420 Op: MergeOperator<V> + Default + Send + Sync,
421{
422 fn default() -> Self {
423 Self::new(1000, 0.01).expect("Failed to create default maplet")
424 }
425}
426
427#[cfg(test)]
428mod tests {
429 use super::*;
430 use crate::operators::CounterOperator;
431
432 #[tokio::test]
433 async fn test_maplet_creation() {
434 let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01);
435 assert!(maplet.is_ok());
436
437 let maplet = maplet.unwrap();
438 assert_eq!(maplet.len().await, 0);
439 assert!(maplet.is_empty().await);
440 assert!((maplet.error_rate() - 0.01).abs() < f64::EPSILON);
441 }
442
443 #[tokio::test]
444 async fn test_maplet_insert_query() {
445 let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();
446
447 assert!(maplet.insert("key1".to_string(), 5).await.is_ok());
449 assert!(maplet.insert("key2".to_string(), 10).await.is_ok());
450
451 assert_eq!(maplet.len().await, 2);
452 assert!(!maplet.is_empty().await);
453
454 assert!(maplet.contains(&"key1".to_string()).await);
456 assert!(maplet.contains(&"key2".to_string()).await);
457
458 assert!(!maplet.contains(&"key3".to_string()).await);
460 }
461
462 #[tokio::test]
463 async fn test_maplet_merge_values() {
464 let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();
465
466 assert!(maplet.insert("key1".to_string(), 5).await.is_ok());
468 assert!(maplet.insert("key1".to_string(), 3).await.is_ok());
469
470 assert_eq!(maplet.len().await, 1); let value = maplet.query(&"key1".to_string()).await;
474 assert!(value.is_some());
475 }
478
479 #[tokio::test]
480 async fn test_maplet_stats() {
481 let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();
482
483 maplet.insert("key1".to_string(), 5).await.unwrap();
484 maplet.insert("key2".to_string(), 10).await.unwrap();
485
486 let stats = maplet.stats().await;
487 assert_eq!(stats.capacity, 100);
488 assert_eq!(stats.len, 2);
489 assert!(stats.load_factor > 0.0);
490 assert!(stats.memory_usage > 0);
491 }
492
493 #[tokio::test]
494 async fn test_concurrent_insertions_no_filter_inconsistency() {
495 use std::sync::Arc;
496 use tokio::task;
497
498 let maplet = Arc::new(Maplet::<String, u64, CounterOperator>::new(1000, 0.01).unwrap());
499 let mut handles = vec![];
500
501 for i in 0..5 {
503 let maplet_clone = Arc::clone(&maplet);
504 let handle = task::spawn(async move {
505 for j in 0..50 {
506 let key = format!("key_{}", j % 25); let value = u64::try_from(i * 50 + j).unwrap_or(0);
508 maplet_clone.insert(key, value).await.unwrap();
509 }
510 });
511 handles.push(handle);
512 }
513
514 for handle in handles {
516 handle.await.unwrap();
517 }
518
519 let len = maplet.len().await;
521 assert!(len > 0, "Maplet should have some items");
522 assert!(len <= 1000, "Should not exceed capacity");
525
526 for i in 0..50 {
528 let key = format!("key_{i}");
529 let result = maplet.query(&key).await;
530 assert!(result.is_some() || result.is_none());
532 }
533 }
534
535 #[tokio::test]
536 async fn test_memory_usage_accuracy() {
537 let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();
538
539 for i in 0..10 {
541 let key = format!("key_{i}");
542 maplet.insert(key, u64::try_from(i).unwrap_or(0)).await.unwrap();
543 }
544
545 let stats = maplet.stats().await;
546 let memory_usage = stats.memory_usage;
547
548 assert!(memory_usage > 0, "Memory usage should be positive");
550 assert!(memory_usage < 100_000, "Memory usage should be reasonable for 10 items");
551
552 println!("Memory usage for 10 items: {memory_usage} bytes");
555 }
556
/// A slot is reported for an inserted key; looking up a key that was never
/// inserted must simply not panic (its result is not asserted).
#[cfg(feature = "quotient-filter")]
#[tokio::test]
async fn test_slot_finding_for_key() {
    let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();

    let present = String::from("test_key");
    maplet.insert(present.clone(), 42).await.unwrap();

    let found = maplet.find_slot_for_key(&present).await;
    assert!(found.is_some(), "Should find a slot for existing key");

    let absent = String::from("non_existing");
    let _unasserted = maplet.find_slot_for_key(&absent).await;
}
578}