mappy_core/
maplet.rs

1//! Core maplet implementation
2//! 
3//! Implements the main Maplet data structure that provides space-efficient
4//! approximate key-value mappings with one-sided error guarantees.
5
6use std::hash::{Hash, BuildHasher};
7use std::marker::PhantomData;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10use crate::{
11    MapletError, MapletResult, MapletStats,
12    types::MapletConfig,
13    hash::{FingerprintHasher, HashFunction, CollisionDetector, PerfectHash},
14    quotient_filter::QuotientFilter,
15    operators::MergeOperator,
16};
17
18/// Core maplet data structure
19/// 
20/// A maplet provides space-efficient approximate key-value mappings with
21/// one-sided error guarantees. When queried with a key k, it returns a
22/// value m[k] that is an approximation of the true value M[k].
23/// 
24/// The maplet guarantees that M[k] ≺ m[k] for some application-specific
25/// ordering relation ≺, and that m[k] = M[k] with probability at least 1-ε.
26#[derive(Debug)]
27pub struct Maplet<K, V, Op> 
28where
29    K: Hash + Eq + Clone + std::fmt::Debug + Send + Sync,
30    V: Clone + std::fmt::Debug + Send + Sync,
31    Op: MergeOperator<V> + Send + Sync,
32{
33    /// Configuration for the maplet
34    config: MapletConfig,
35    /// Quotient filter for fingerprint storage
36    filter: Arc<RwLock<QuotientFilter>>,
37    /// Map of fingerprints to values (not aligned with slots)
38    values: Arc<RwLock<std::collections::HashMap<u64, V>>>,
39    /// Merge operator for combining values
40    operator: Op,
41    /// Collision detector for monitoring hash collisions
42    collision_detector: Arc<RwLock<CollisionDetector>>,
43    /// Perfect hash for slot mapping (same as quotient filter)
44    perfect_hash: PerfectHash,
45    /// Current number of items stored
46    len: Arc<RwLock<usize>>,
47    /// Phantom data to hold the key type
48    _phantom: PhantomData<K>,
49}
50
51impl<K, V, Op> Maplet<K, V, Op>
52where
53    K: Hash + Eq + Clone + std::fmt::Debug + Send + Sync,
54    V: Clone + PartialEq + std::fmt::Debug + Send + Sync,
55    Op: MergeOperator<V> + Default + Send + Sync,
56{
57    /// Create a new maplet with default configuration
58    pub fn new(capacity: usize, false_positive_rate: f64) -> MapletResult<Self> {
59        let config = MapletConfig::new(capacity, false_positive_rate);
60        Self::with_config(config)
61    }
62    
63    /// Create a new maplet with custom operator
64    pub fn with_operator(capacity: usize, false_positive_rate: f64, operator: Op) -> MapletResult<Self> {
65        let config = MapletConfig::new(capacity, false_positive_rate);
66        Self::with_config_and_operator(config, operator)
67    }
68    
69    /// Create a new maplet with custom configuration
70    pub fn with_config(config: MapletConfig) -> MapletResult<Self> {
71        let operator = Op::default();
72        Self::with_config_and_operator(config, operator)
73    }
74    
75    /// Create a new maplet with custom configuration and operator
76    pub fn with_config_and_operator(config: MapletConfig, operator: Op) -> MapletResult<Self> {
77        config.validate()?;
78        
79        let fingerprint_bits = FingerprintHasher::optimal_fingerprint_size(config.false_positive_rate);
80        let filter = QuotientFilter::new(config.capacity, fingerprint_bits, HashFunction::default())?;
81        
82        let collision_detector = CollisionDetector::new(config.capacity / 4); // Allow 25% collisions
83        let perfect_hash = PerfectHash::new(config.capacity, HashFunction::default());
84        
85        Ok(Self {
86            config,
87            filter: Arc::new(RwLock::new(filter)),
88            values: Arc::new(RwLock::new(std::collections::HashMap::new())),
89            operator,
90            collision_detector: Arc::new(RwLock::new(collision_detector)),
91            perfect_hash,
92            len: Arc::new(RwLock::new(0)),
93            _phantom: PhantomData,
94        })
95    }
96    
97    /// Insert a key-value pair into the maplet
98    pub async fn insert(&self, key: K, value: V) -> MapletResult<()> {
99        let current_len = *self.len.read().await;
100        if current_len >= self.config.capacity {
101            if self.config.auto_resize {
102                self.resize(self.config.capacity * 2).await?;
103            } else {
104                return Err(MapletError::CapacityExceeded);
105            }
106        }
107        
108        let fingerprint = self.hash_key(&key);
109        
110        // Check if key already exists
111        let filter_guard = self.filter.read().await;
112        let key_exists = filter_guard.query(fingerprint);
113        drop(filter_guard);
114        
115        if key_exists {
116            // Key exists, merge with existing value
117            self.merge_value(fingerprint, value).await?;
118        } else {
119            // New key, insert into filter and store value
120            {
121                let mut filter_guard = self.filter.write().await;
122                filter_guard.insert(fingerprint)?;
123            }
124            self.store_value(fingerprint, value).await?;
125            {
126                let mut len_guard = self.len.write().await;
127                *len_guard += 1;
128            }
129        }
130        
131        Ok(())
132    }
133    
134    /// Query a key to get its associated value
135    pub async fn query(&self, key: &K) -> Option<V> {
136        let fingerprint = self.hash_key(key);
137        
138        let filter_guard = self.filter.read().await;
139        if !filter_guard.query(fingerprint) {
140            return None;
141        }
142        drop(filter_guard);
143        
144        // Get the value directly from the HashMap using the fingerprint
145        let values_guard = self.values.read().await;
146        values_guard.get(&fingerprint).cloned()
147    }
148    
149    /// Check if a key exists in the maplet
150    pub async fn contains(&self, key: &K) -> bool {
151        let fingerprint = self.hash_key(key);
152        let filter_guard = self.filter.read().await;
153        filter_guard.query(fingerprint)
154    }
155    
156    /// Delete a key-value pair from the maplet
157    pub async fn delete(&self, key: &K, value: &V) -> MapletResult<bool> {
158        if !self.config.enable_deletion {
159            return Err(MapletError::Internal("Deletion not enabled".to_string()));
160        }
161        
162        let fingerprint = self.hash_key(key);
163        
164        let filter_guard = self.filter.read().await;
165        if !filter_guard.query(fingerprint) {
166            return Ok(false);
167        }
168        drop(filter_guard);
169        
170        let mut values_guard = self.values.write().await;
171        if let Some(existing_value) = values_guard.get(&fingerprint) {
172            // Check if the values match (for exact deletion)
173            if existing_value == value {
174                // Remove from filter and clear value
175                {
176                    let mut filter_guard = self.filter.write().await;
177                    filter_guard.delete(fingerprint)?;
178                }
179                values_guard.remove(&fingerprint);
180                {
181                    let mut len_guard = self.len.write().await;
182                    *len_guard -= 1;
183                }
184                return Ok(true);
185            }
186        }
187        
188        Ok(false)
189    }
190    
191    /// Get the current number of items stored
192    pub async fn len(&self) -> usize {
193        *self.len.read().await
194    }
195    
196    /// Check if the maplet is empty
197    pub async fn is_empty(&self) -> bool {
198        *self.len.read().await == 0
199    }
200    
201    /// Get the configured false-positive rate
202    pub fn error_rate(&self) -> f64 {
203        self.config.false_positive_rate
204    }
205    
206    /// Get the current load factor
207    pub async fn load_factor(&self) -> f64 {
208        let current_len = *self.len.read().await;
209        current_len as f64 / self.config.capacity as f64
210    }
211    
212    /// Get statistics about the maplet
213    pub async fn stats(&self) -> MapletStats {
214        let filter_guard = self.filter.read().await;
215        let filter_stats = filter_guard.stats();
216        drop(filter_guard);
217        
218        let memory_usage = self.estimate_memory_usage();
219        let current_len = *self.len.read().await;
220        
221        let collision_guard = self.collision_detector.read().await;
222        let collision_count = collision_guard.collision_count() as u64;
223        drop(collision_guard);
224        
225        let mut stats = MapletStats::new(
226            self.config.capacity,
227            current_len,
228            self.config.false_positive_rate,
229        );
230        stats.update(
231            current_len,
232            memory_usage,
233            collision_count,
234            filter_stats.runs,
235        );
236        stats
237    }
238    
239    /// Resize the maplet to a new capacity
240    pub async fn resize(&self, new_capacity: usize) -> MapletResult<()> {
241        if new_capacity <= self.config.capacity {
242            return Err(MapletError::ResizeFailed("New capacity must be larger".to_string()));
243        }
244        
245        // Create new filter with larger capacity
246        let fingerprint_bits = FingerprintHasher::optimal_fingerprint_size(self.config.false_positive_rate);
247        let new_filter = QuotientFilter::new(
248            new_capacity,
249            fingerprint_bits,
250            HashFunction::default(),
251        )?;
252        
253        // Replace the filter and resize values array
254        {
255            let mut filter_guard = self.filter.write().await;
256            *filter_guard = new_filter;
257        }
258        
259        // HashMap doesn't need explicit resizing - it grows automatically
260        
261        // Note: In a full implementation, config.capacity would also need to be updated
262        // For now, we rely on the actual filter and values array capacity
263        
264        Ok(())
265    }
266    
267    /// Merge another maplet into this one
268    pub async fn merge(&self, _other: &Maplet<K, V, Op>) -> MapletResult<()> {
269        if !self.config.enable_merging {
270            return Err(MapletError::MergeFailed("Merging not enabled".to_string()));
271        }
272        
273        // This is a simplified merge implementation
274        // In practice, we'd need to iterate through all items in the other maplet
275        // and insert them into this one using the merge operator
276        Err(MapletError::MergeFailed("Merge not fully implemented".to_string()))
277    }
278    
279    /// Hash a key to get its fingerprint
280    fn hash_key(&self, key: &K) -> u64 {
281        // Use the same hasher as the quotient filter
282        // The quotient filter uses AHash by default, so we need to use the same
283        use ahash::RandomState;
284        use std::hash::Hasher;
285        
286        // Create a consistent hasher - we need to use the same seed as the quotient filter
287        // For now, use a fixed seed to ensure consistency
288        let random_state = RandomState::with_seed(42);
289        let mut hasher = random_state.build_hasher();
290        key.hash(&mut hasher);
291        hasher.finish()
292    }
293    
294    /// Find the slot index for a fingerprint
295    fn find_slot_for_fingerprint(&self, fingerprint: u64) -> usize {
296        // Use the same slot mapping as the quotient filter
297        let quotient = self.extract_quotient(fingerprint);
298        
299        // Use the same perfect hash as the quotient filter
300        self.perfect_hash.slot_index(quotient)
301    }
302    
303    /// Extract quotient from fingerprint (same as quotient filter)
304    fn extract_quotient(&self, fingerprint: u64) -> u64 {
305        let quotient_bits = (self.config.capacity as f64).log2().ceil() as u32;
306        let quotient_mask = if quotient_bits >= 64 {
307            u64::MAX
308        } else {
309            (1u64 << quotient_bits) - 1
310        };
311        fingerprint & quotient_mask
312    }
313    
314    /// Extract remainder from fingerprint (same as quotient filter)
315    fn extract_remainder(&self, fingerprint: u64) -> u64 {
316        let quotient_bits = (self.config.capacity as f64).log2().ceil() as u32;
317        let remainder_bits = 64 - quotient_bits;
318        let remainder_mask = if remainder_bits >= 64 {
319            u64::MAX
320        } else {
321            (1u64 << remainder_bits) - 1
322        };
323        (fingerprint >> quotient_bits) & remainder_mask
324    }
325    
326    /// Find the target slot for a quotient and remainder
327    fn find_target_slot(&self, quotient: u64, _remainder: u64) -> usize {
328        // Use the same perfect hash as the quotient filter
329        self.perfect_hash.slot_index(quotient)
330    }
331    
332    /// Find the actual slot where a fingerprint is stored
333    /// This replicates the quotient filter's slot finding logic
334    fn find_actual_slot_for_fingerprint(&self, quotient: u64, _remainder: u64) -> usize {
335        // Get the target slot from the perfect hash
336        let target_slot = self.perfect_hash.slot_index(quotient);
337        
338        // The quotient filter stores fingerprints in runs
339        // We need to find the actual slot within the run where this remainder is stored
340        // This is a simplified version - in practice, we'd need access to the quotient filter's internal state
341        
342        // For now, let's use a simple approach: assume the remainder is stored at the target slot
343        // This is not correct but will help us identify the issue
344        target_slot
345    }
346    
347    /// Merge a value with an existing value at a fingerprint
348    async fn merge_value(&self, fingerprint: u64, value: V) -> MapletResult<()> {
349        let mut values_guard = self.values.write().await;
350        if let Some(existing_value) = values_guard.get(&fingerprint) {
351            let merged_value = self.operator.merge(existing_value.clone(), value)?;
352            values_guard.insert(fingerprint, merged_value);
353        } else {
354            // This shouldn't happen if the filter is consistent
355            return Err(MapletError::Internal("Filter inconsistency detected".to_string()));
356        }
357        
358        Ok(())
359    }
360    
361    /// Store a value at a fingerprint
362    async fn store_value(&self, fingerprint: u64, value: V) -> MapletResult<()> {
363        let mut values_guard = self.values.write().await;
364        values_guard.insert(fingerprint, value);
365        
366        Ok(())
367    }
368    
369    /// Estimate memory usage in bytes
370    fn estimate_memory_usage(&self) -> usize {
371        // Rough estimate: filter size + values size + overhead
372        let filter_size = self.config.capacity * std::mem::size_of::<crate::quotient_filter::SlotMetadata>();
373        let values_size = self.config.capacity * std::mem::size_of::<Option<V>>();
374        let overhead = std::mem::size_of::<Self>();
375        
376        filter_size + values_size + overhead
377    }
378}
379
380// Implement Default for operators that support it
381impl<K, V, Op> Default for Maplet<K, V, Op>
382where
383    K: Hash + Eq + Clone + std::fmt::Debug + Send + Sync,
384    V: Clone + PartialEq + std::fmt::Debug + Send + Sync,
385    Op: MergeOperator<V> + Default + Send + Sync,
386{
387    fn default() -> Self {
388        Self::new(1000, 0.01).expect("Failed to create default maplet")
389    }
390}
391
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operators::CounterOperator;

    #[tokio::test]
    async fn test_maplet_creation() {
        let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01);
        assert!(maplet.is_ok());

        let maplet = maplet.unwrap();
        assert_eq!(maplet.len().await, 0);
        assert!(maplet.is_empty().await);
        assert_eq!(maplet.error_rate(), 0.01);
    }

    #[tokio::test]
    async fn test_maplet_insert_query() {
        let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();

        // Insert some key-value pairs
        assert!(maplet.insert("key1".to_string(), 5).await.is_ok());
        assert!(maplet.insert("key2".to_string(), 10).await.is_ok());

        assert_eq!(maplet.len().await, 2);
        assert!(!maplet.is_empty().await);

        // Membership of existing keys
        assert!(maplet.contains(&"key1".to_string()).await);
        assert!(maplet.contains(&"key2".to_string()).await);

        // Membership of a non-existing key
        assert!(!maplet.contains(&"key3".to_string()).await);

        // Queries return exactly the stored values; absent keys return None
        assert_eq!(maplet.query(&"key1".to_string()).await, Some(5));
        assert_eq!(maplet.query(&"key2".to_string()).await, Some(10));
        assert_eq!(maplet.query(&"key3".to_string()).await, None);
    }

    #[tokio::test]
    async fn test_maplet_merge_values() {
        let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();

        // Insert same key multiple times
        assert!(maplet.insert("key1".to_string(), 5).await.is_ok());
        assert!(maplet.insert("key1".to_string(), 3).await.is_ok());

        assert_eq!(maplet.len().await, 1); // Still only one unique key

        // Query should return the merged value. The exact result depends on
        // CounterOperator's merge, but the one-sided guarantee means it can
        // never fall below the first value inserted.
        let value = maplet
            .query(&"key1".to_string())
            .await
            .expect("key1 was inserted, so it must be queryable");
        assert!(value >= 5, "merged value {value} fell below the first insert");
    }

    #[tokio::test]
    async fn test_maplet_stats() {
        let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();

        maplet.insert("key1".to_string(), 5).await.unwrap();
        maplet.insert("key2".to_string(), 10).await.unwrap();

        let stats = maplet.stats().await;
        assert_eq!(stats.capacity, 100);
        assert_eq!(stats.len, 2);
        assert!(stats.load_factor > 0.0);
        assert!(stats.memory_usage > 0);
    }
}