// mappy_core/maplet.rs

//! Core maplet implementation
//!
//! Implements the main Maplet data structure that provides space-efficient
//! approximate key-value mappings with one-sided error guarantees.

use std::hash::{Hash, BuildHasher};
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::RwLock;
use crate::{
    MapletError, MapletResult, MapletStats,
    types::MapletConfig,
    hash::{FingerprintHasher, HashFunction, CollisionDetector, PerfectHash},
    quotient_filter::QuotientFilter,
    operators::MergeOperator,
};

18/// Core maplet data structure
19/// 
20/// A maplet provides space-efficient approximate key-value mappings with
21/// one-sided error guarantees. When queried with a key k, it returns a
22/// value m[k] that is an approximation of the true value M[k].
23/// 
24/// The maplet guarantees that M[k] ≺ m[k] for some application-specific
25/// ordering relation ≺, and that m[k] = M[k] with probability at least 1-ε.
26#[derive(Debug)]
27pub struct Maplet<K, V, Op> 
28where
29    K: Hash + Eq + Clone + std::fmt::Debug + Send + Sync,
30    V: Clone + std::fmt::Debug + Send + Sync,
31    Op: MergeOperator<V> + Send + Sync,
32{
33    /// Configuration for the maplet
34    config: MapletConfig,
35    /// Quotient filter for fingerprint storage
36    filter: Arc<RwLock<QuotientFilter>>,
37    /// Map of fingerprints to values (not aligned with slots)
38    values: Arc<RwLock<std::collections::HashMap<u64, V>>>,
39    /// Merge operator for combining values
40    operator: Op,
41    /// Collision detector for monitoring hash collisions
42    collision_detector: Arc<RwLock<CollisionDetector>>,
43    /// Perfect hash for slot mapping (same as quotient filter)
44    #[allow(dead_code)]
45    perfect_hash: PerfectHash,
46    /// Current number of items stored
47    len: Arc<RwLock<usize>>,
48    /// Phantom data to hold the key type
49    _phantom: PhantomData<K>,
50}
51
52impl<K, V, Op> Maplet<K, V, Op>
53where
54    K: Hash + Eq + Clone + std::fmt::Debug + Send + Sync,
55    V: Clone + PartialEq + std::fmt::Debug + Send + Sync,
56    Op: MergeOperator<V> + Default + Send + Sync,
57{
58    /// Create a new maplet with default configuration
59    pub fn new(capacity: usize, false_positive_rate: f64) -> MapletResult<Self> {
60        let config = MapletConfig::new(capacity, false_positive_rate);
61        Self::with_config(config)
62    }
63    
64    /// Create a new maplet with custom operator
65    pub fn with_operator(capacity: usize, false_positive_rate: f64, operator: Op) -> MapletResult<Self> {
66        let config = MapletConfig::new(capacity, false_positive_rate);
67        Self::with_config_and_operator(config, operator)
68    }
69    
70    /// Create a new maplet with custom configuration
71    pub fn with_config(config: MapletConfig) -> MapletResult<Self> {
72        let operator = Op::default();
73        Self::with_config_and_operator(config, operator)
74    }
75    
76    /// Create a new maplet with custom configuration and operator
77    pub fn with_config_and_operator(config: MapletConfig, operator: Op) -> MapletResult<Self> {
78        config.validate()?;
79        
80        let fingerprint_bits = FingerprintHasher::optimal_fingerprint_size(config.false_positive_rate);
81        let filter = QuotientFilter::new(config.capacity, fingerprint_bits, HashFunction::default())?;
82        
83        let collision_detector = CollisionDetector::new(config.capacity / 4); // Allow 25% collisions
84        let perfect_hash = PerfectHash::new(config.capacity, HashFunction::default());
85        
86        Ok(Self {
87            config,
88            filter: Arc::new(RwLock::new(filter)),
89            values: Arc::new(RwLock::new(std::collections::HashMap::new())),
90            operator,
91            collision_detector: Arc::new(RwLock::new(collision_detector)),
92            perfect_hash,
93            len: Arc::new(RwLock::new(0)),
94            _phantom: PhantomData,
95        })
96    }
97    
98    /// Insert a key-value pair into the maplet
99    pub async fn insert(&self, key: K, value: V) -> MapletResult<()> {
100        let current_len = *self.len.read().await;
101        if current_len >= self.config.capacity {
102            if self.config.auto_resize {
103                self.resize(self.config.capacity * 2).await?;
104            } else {
105                return Err(MapletError::CapacityExceeded);
106            }
107        }
108        
109        let fingerprint = self.hash_key(&key);
110        
111        // Check if key already exists in values HashMap (more reliable than filter)
112        let values_guard = self.values.read().await;
113        let key_exists = values_guard.contains_key(&fingerprint);
114        drop(values_guard);
115        
116        if key_exists {
117            // Key exists, merge with existing value
118            self.merge_value(fingerprint, value).await?;
119        } else {
120            // New key, insert into filter and store value
121            {
122                let mut filter_guard = self.filter.write().await;
123                filter_guard.insert(fingerprint)?;
124            }
125            self.store_value(fingerprint, value).await?;
126            {
127                let mut len_guard = self.len.write().await;
128                *len_guard += 1;
129            }
130        }
131        
132        Ok(())
133    }
134    
135    /// Query a key to get its associated value
136    pub async fn query(&self, key: &K) -> Option<V> {
137        let fingerprint = self.hash_key(key);
138        
139        let filter_guard = self.filter.read().await;
140        if !filter_guard.query(fingerprint) {
141            return None;
142        }
143        drop(filter_guard);
144        
145        // Get the value directly from the HashMap using the fingerprint
146        let values_guard = self.values.read().await;
147        values_guard.get(&fingerprint).cloned()
148    }
149    
150    /// Check if a key exists in the maplet
151    pub async fn contains(&self, key: &K) -> bool {
152        let fingerprint = self.hash_key(key);
153        let filter_guard = self.filter.read().await;
154        filter_guard.query(fingerprint)
155    }
156    
157    /// Delete a key-value pair from the maplet
158    pub async fn delete(&self, key: &K, value: &V) -> MapletResult<bool> {
159        if !self.config.enable_deletion {
160            return Err(MapletError::Internal("Deletion not enabled".to_string()));
161        }
162        
163        let fingerprint = self.hash_key(key);
164        
165        let filter_guard = self.filter.read().await;
166        if !filter_guard.query(fingerprint) {
167            return Ok(false);
168        }
169        drop(filter_guard);
170        
171        let mut values_guard = self.values.write().await;
172        if let Some(existing_value) = values_guard.get(&fingerprint) {
173            // Check if the values match (for exact deletion)
174            if existing_value == value {
175                // Remove from filter and clear value
176                {
177                    let mut filter_guard = self.filter.write().await;
178                    filter_guard.delete(fingerprint)?;
179                }
180                values_guard.remove(&fingerprint);
181                {
182                    let mut len_guard = self.len.write().await;
183                    *len_guard -= 1;
184                }
185                return Ok(true);
186            }
187        }
188        
189        Ok(false)
190    }
191    
192    /// Get the current number of items stored
193    pub async fn len(&self) -> usize {
194        *self.len.read().await
195    }
196    
197    /// Check if the maplet is empty
198    pub async fn is_empty(&self) -> bool {
199        *self.len.read().await == 0
200    }
201    
202    /// Get the configured false-positive rate
203    pub fn error_rate(&self) -> f64 {
204        self.config.false_positive_rate
205    }
206    
207    /// Get the current load factor
208    pub async fn load_factor(&self) -> f64 {
209        let current_len = *self.len.read().await;
210        current_len as f64 / self.config.capacity as f64
211    }
212    
213    /// Get statistics about the maplet
214    pub async fn stats(&self) -> MapletStats {
215        let filter_guard = self.filter.read().await;
216        let filter_stats = filter_guard.stats();
217        drop(filter_guard);
218        
219        let memory_usage = self.estimate_memory_usage();
220        let current_len = *self.len.read().await;
221        
222        let collision_guard = self.collision_detector.read().await;
223        let collision_count = collision_guard.collision_count() as u64;
224        drop(collision_guard);
225        
226        let mut stats = MapletStats::new(
227            self.config.capacity,
228            current_len,
229            self.config.false_positive_rate,
230        );
231        stats.update(
232            current_len,
233            memory_usage,
234            collision_count,
235            filter_stats.runs,
236        );
237        stats
238    }
239    
240    /// Resize the maplet to a new capacity
241    pub async fn resize(&self, new_capacity: usize) -> MapletResult<()> {
242        if new_capacity <= self.config.capacity {
243            return Err(MapletError::ResizeFailed("New capacity must be larger".to_string()));
244        }
245        
246        // Create new filter with larger capacity
247        let fingerprint_bits = FingerprintHasher::optimal_fingerprint_size(self.config.false_positive_rate);
248        let new_filter = QuotientFilter::new(
249            new_capacity,
250            fingerprint_bits,
251            HashFunction::default(),
252        )?;
253        
254        // Replace the filter and resize values array
255        {
256            let mut filter_guard = self.filter.write().await;
257            *filter_guard = new_filter;
258        }
259        
260        // HashMap doesn't need explicit resizing - it grows automatically
261        
262        // Note: In a full implementation, config.capacity would also need to be updated
263        // For now, we rely on the actual filter and values array capacity
264        
265        Ok(())
266    }
267    
268    /// Merge another maplet into this one
269    pub fn merge(&self, _other: &Maplet<K, V, Op>) -> MapletResult<()> {
270        if !self.config.enable_merging {
271            return Err(MapletError::MergeFailed("Merging not enabled".to_string()));
272        }
273        
274        // This is a simplified merge implementation
275        // In practice, we'd need to iterate through all items in the other maplet
276        // and insert them into this one using the merge operator
277        Err(MapletError::MergeFailed("Merge not fully implemented".to_string()))
278    }
279    
280    /// Hash a key to get its fingerprint
281    fn hash_key(&self, key: &K) -> u64 {
282        // Use the same hasher as the quotient filter
283        // We need to use the same hasher instance to ensure consistency
284        use ahash::RandomState;
285        use std::hash::Hasher;
286        
287        // Create a consistent hasher - we need to use the same seed as the quotient filter
288        // For now, use a fixed seed to ensure consistency
289        let random_state = RandomState::with_seed(42);
290        let mut hasher = random_state.build_hasher();
291        key.hash(&mut hasher);
292        hasher.finish()
293    }
294    
295    /// Find the slot index for a fingerprint
296    #[allow(dead_code)]
297    fn find_slot_for_fingerprint(&self, fingerprint: u64) -> usize {
298        // Use the same slot mapping as the quotient filter
299        let quotient = self.extract_quotient(fingerprint);
300        
301        // Use the same perfect hash as the quotient filter
302        self.perfect_hash.slot_index(quotient)
303    }
304    
305    /// Extract quotient from fingerprint (same as quotient filter)
306    #[allow(dead_code)]
307    fn extract_quotient(&self, fingerprint: u64) -> u64 {
308        let quotient_bits = (self.config.capacity as f64).log2().ceil() as u32;
309        let quotient_mask = if quotient_bits >= 64 {
310            u64::MAX
311        } else {
312            (1u64 << quotient_bits) - 1
313        };
314        fingerprint & quotient_mask
315    }
316    
317    /// Extract remainder from fingerprint (same as quotient filter)
318    #[allow(dead_code)]
319    fn extract_remainder(&self, fingerprint: u64) -> u64 {
320        let quotient_bits = (self.config.capacity as f64).log2().ceil() as u32;
321        let remainder_bits = 64 - quotient_bits;
322        let remainder_mask = if remainder_bits >= 64 {
323            u64::MAX
324        } else {
325            (1u64 << remainder_bits) - 1
326        };
327        (fingerprint >> quotient_bits) & remainder_mask
328    }
329    
330    /// Find the target slot for a quotient and remainder
331    #[allow(dead_code)]
332    fn find_target_slot(&self, quotient: u64, _remainder: u64) -> usize {
333        // Use the same perfect hash as the quotient filter
334        self.perfect_hash.slot_index(quotient)
335    }
336    
337    /// Find the actual slot where a fingerprint is stored
338    /// This replicates the quotient filter's slot finding logic
339    #[cfg(feature = "quotient-filter")]
340    async fn find_actual_slot_for_fingerprint(&self, quotient: u64, remainder: u64) -> Option<usize> {
341        // We need to access the quotient filter to find the actual slot
342        // The quotient filter has the logic to find runs and search within them
343        let filter_guard = self.filter.read().await;
344        
345        // Use the quotient filter's method to find the actual slot
346        // This handles runs, shifting, and all the complex logic
347        // The fingerprint is reconstructed by combining quotient and remainder
348        // The quotient goes in the lower bits, remainder in the upper bits
349        let fingerprint = quotient | (remainder << filter_guard.quotient_bits());
350        let actual_slot = filter_guard.get_actual_slot_for_fingerprint(fingerprint);
351        
352        drop(filter_guard);
353        actual_slot
354    }
355    
356    /// Find the actual slot where a key's fingerprint is stored
357    /// This is useful for debugging and advanced operations
358    #[cfg(feature = "quotient-filter")]
359    pub async fn find_slot_for_key(&self, key: &K) -> Option<usize> {
360        let fingerprint = self.hash_key(key);
361        let quotient = self.extract_quotient(fingerprint);
362        let remainder = self.extract_remainder(fingerprint);
363        
364        self.find_actual_slot_for_fingerprint(quotient, remainder).await
365    }
366    
367    /// Merge a value with an existing value at a fingerprint
368    async fn merge_value(&self, fingerprint: u64, value: V) -> MapletResult<()> {
369        let mut values_guard = self.values.write().await;
370        if let Some(existing_value) = values_guard.get(&fingerprint) {
371            let merged_value = self.operator.merge(existing_value.clone(), value)?;
372            values_guard.insert(fingerprint, merged_value);
373        } else {
374            // False positive or concurrent deletion - treat as new insertion
375            values_guard.insert(fingerprint, value);
376        }
377        
378        Ok(())
379    }
380    
381    /// Store a value at a fingerprint
382    async fn store_value(&self, fingerprint: u64, value: V) -> MapletResult<()> {
383        let mut values_guard = self.values.write().await;
384        values_guard.insert(fingerprint, value);
385        
386        Ok(())
387    }
388    
389    /// Estimate memory usage in bytes
390    fn estimate_memory_usage(&self) -> usize {
391        // QuotientFilter slots: always allocated for full capacity
392        let filter_slots_size = self.config.capacity * std::mem::size_of::<crate::quotient_filter::SlotMetadata>();
393        
394        // For now, use a simpler estimation that doesn't require async access
395        // In a real implementation, we'd need to make this async or use a different approach
396        let estimated_values_count = self.config.capacity / 4; // Assume 25% load factor
397        let estimated_values_capacity = self.config.capacity / 2; // HashMap typically allocates 2x
398        
399        // HashMap: capacity * (key_size + value_size) + overhead
400        let values_size = estimated_values_capacity * (std::mem::size_of::<u64>() + std::mem::size_of::<V>());
401        
402        // HashMap overhead (buckets, hash table)
403        let hashmap_overhead = estimated_values_capacity * std::mem::size_of::<usize>() / 2;
404        
405        // Multiset counts in QuotientFilter (approximate)
406        let multiset_size = estimated_values_count * (std::mem::size_of::<u64>() + std::mem::size_of::<usize>());
407        
408        // Struct overhead
409        let overhead = std::mem::size_of::<Self>();
410        
411        filter_slots_size + values_size + hashmap_overhead + multiset_size + overhead
412    }
413}
414
415// Implement Default for operators that support it
416impl<K, V, Op> Default for Maplet<K, V, Op>
417where
418    K: Hash + Eq + Clone + std::fmt::Debug + Send + Sync,
419    V: Clone + PartialEq + std::fmt::Debug + Send + Sync,
420    Op: MergeOperator<V> + Default + Send + Sync,
421{
422    fn default() -> Self {
423        Self::new(1000, 0.01).expect("Failed to create default maplet")
424    }
425}
426
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operators::CounterOperator;

    #[tokio::test]
    async fn test_maplet_creation() {
        let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01);
        assert!(maplet.is_ok());

        let maplet = maplet.unwrap();
        assert_eq!(maplet.len().await, 0);
        assert!(maplet.is_empty().await);
        assert!((maplet.error_rate() - 0.01).abs() < f64::EPSILON);
    }

    #[tokio::test]
    async fn test_maplet_insert_query() {
        let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();

        // Insert some key-value pairs
        assert!(maplet.insert("key1".to_string(), 5).await.is_ok());
        assert!(maplet.insert("key2".to_string(), 10).await.is_ok());

        assert_eq!(maplet.len().await, 2);
        assert!(!maplet.is_empty().await);

        // Query existing keys
        assert!(maplet.contains(&"key1".to_string()).await);
        assert!(maplet.contains(&"key2".to_string()).await);

        // Query non-existing key
        assert!(!maplet.contains(&"key3".to_string()).await);
    }

    #[tokio::test]
    async fn test_maplet_merge_values() {
        let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();

        // Insert same key multiple times
        assert!(maplet.insert("key1".to_string(), 5).await.is_ok());
        assert!(maplet.insert("key1".to_string(), 3).await.is_ok());

        assert_eq!(maplet.len().await, 1); // Still only one unique key

        // Query should return the merged value. Under the one-sided error
        // guarantee it may over-estimate, but it must not fall below the
        // first inserted value.
        let value = maplet
            .query(&"key1".to_string())
            .await
            .expect("an inserted key must be queryable");
        assert!(value >= 5, "merged value {value} should be at least 5");
    }

    #[tokio::test]
    async fn test_maplet_stats() {
        let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();

        maplet.insert("key1".to_string(), 5).await.unwrap();
        maplet.insert("key2".to_string(), 10).await.unwrap();

        let stats = maplet.stats().await;
        assert_eq!(stats.capacity, 100);
        assert_eq!(stats.len, 2);
        assert!(stats.load_factor > 0.0);
        assert!(stats.memory_usage > 0);
    }

    #[tokio::test]
    async fn test_concurrent_insertions_no_filter_inconsistency() {
        use std::sync::Arc;
        use tokio::task;

        let maplet = Arc::new(Maplet::<String, u64, CounterOperator>::new(1000, 0.01).unwrap());
        let mut handles = vec![];

        // Spawn multiple concurrent tasks that insert the same 25 keys
        for i in 0..5 {
            let maplet_clone = Arc::clone(&maplet);
            let handle = task::spawn(async move {
                for j in 0..50 {
                    let key = format!("key_{}", j % 25); // Some keys will be duplicated
                    let value = u64::try_from(i * 50 + j).unwrap_or(0);
                    maplet_clone.insert(key, value).await.unwrap();
                }
            });
            handles.push(handle);
        }

        // Wait for all tasks to complete
        for handle in handles {
            handle.await.unwrap();
        }

        // 25 distinct keys were inserted, so at least 25 entries must be counted.
        let len = maplet.len().await;
        assert!(len >= 25, "expected at least 25 entries, got {len}");
        assert!(len <= 1000, "Should not exceed capacity");

        // Every inserted key must be visible both in the filter and in the value map.
        for i in 0..25 {
            let key = format!("key_{i}");
            assert!(maplet.contains(&key).await, "{key} should be in the filter");
            assert!(maplet.query(&key).await.is_some(), "{key} should have a value");
        }
    }

    #[tokio::test]
    async fn test_memory_usage_accuracy() {
        let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();

        // Insert some items
        for i in 0..10 {
            let key = format!("key_{i}");
            maplet.insert(key, u64::try_from(i).unwrap_or(0)).await.unwrap();
        }

        let stats = maplet.stats().await;
        let memory_usage = stats.memory_usage;

        // The estimate should be positive and stay small for a 100-slot maplet
        // holding 10 items.
        assert!(memory_usage > 0, "Memory usage should be positive");
        assert!(memory_usage < 100_000, "Memory usage should be reasonable for 10 items");

        println!("Memory usage for 10 items: {memory_usage} bytes");
    }

    #[cfg(feature = "quotient-filter")]
    #[tokio::test]
    async fn test_slot_finding_for_key() {
        let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();

        // Insert some items
        let test_key = "test_key".to_string();
        maplet.insert(test_key.clone(), 42).await.unwrap();

        // Find the slot for the key
        let slot = maplet.find_slot_for_key(&test_key).await;
        assert!(slot.is_some(), "Should find a slot for existing key");

        // Try to find slot for non-existing key
        // Note: Due to false positives, the quotient filter might return a slot
        // even for non-existing keys. This is expected behavior.
        let non_existing_key = "non_existing".to_string();
        let _non_existing_slot = maplet.find_slot_for_key(&non_existing_key).await;
        // The slot might or might not be found due to false positives
        // This is acceptable behavior for a probabilistic data structure
    }
}