mappy_core/
maplet.rs

1//! Core maplet implementation
2//!
3//! Implements the main Maplet data structure that provides space-efficient
4//! approximate key-value mappings with one-sided error guarantees.
5
6use crate::{
7    MapletError, MapletResult, MapletStats,
8    hash::{CollisionDetector, FingerprintHasher, HashFunction, PerfectHash},
9    operators::MergeOperator,
10    quotient_filter::QuotientFilter,
11    types::MapletConfig,
12};
13use std::hash::Hash;
14use std::marker::PhantomData;
15use std::sync::Arc;
16use tokio::sync::RwLock;
17
18/// Core maplet data structure
19///
20/// A maplet provides space-efficient approximate key-value mappings with
21/// one-sided error guarantees. When queried with a key k, it returns a
22/// value m[k] that is an approximation of the true value M[k].
23///
24/// The maplet guarantees that M[k] ≺ m[k] for some application-specific
25/// ordering relation ≺, and that m[k] = M[k] with probability at least 1-ε.
26#[derive(Debug)]
27pub struct Maplet<K, V, Op>
28where
29    K: Hash + Eq + Clone + std::fmt::Debug + Send + Sync,
30    V: Clone + std::fmt::Debug + Send + Sync,
31    Op: MergeOperator<V> + Send + Sync,
32{
33    /// Configuration for the maplet
34    config: MapletConfig,
35    /// Quotient filter for fingerprint storage
36    filter: Arc<RwLock<QuotientFilter>>,
37    /// Map of fingerprints to values (not aligned with slots)
38    values: Arc<RwLock<std::collections::HashMap<u64, V>>>,
39    /// Merge operator for combining values
40    operator: Op,
41    /// Collision detector for monitoring hash collisions
42    collision_detector: Arc<RwLock<CollisionDetector>>,
43    /// Perfect hash for slot mapping (same as quotient filter)
44    #[allow(dead_code)]
45    perfect_hash: PerfectHash,
46    /// Current number of items stored
47    len: Arc<RwLock<usize>>,
48    /// Phantom data to hold the key type
49    _phantom: PhantomData<K>,
50}
51
52impl<K, V, Op> Maplet<K, V, Op>
53where
54    K: Hash + Eq + Clone + std::fmt::Debug + Send + Sync,
55    V: Clone + PartialEq + std::fmt::Debug + Send + Sync,
56    Op: MergeOperator<V> + Default + Send + Sync,
57{
58    /// Create a new maplet with default configuration
59    pub fn new(capacity: usize, false_positive_rate: f64) -> MapletResult<Self> {
60        let config = MapletConfig::new(capacity, false_positive_rate);
61        Self::with_config(config)
62    }
63
64    /// Create a new maplet with custom operator
65    pub fn with_operator(
66        capacity: usize,
67        false_positive_rate: f64,
68        operator: Op,
69    ) -> MapletResult<Self> {
70        let config = MapletConfig::new(capacity, false_positive_rate);
71        Self::with_config_and_operator(config, operator)
72    }
73
74    /// Create a new maplet with custom configuration
75    pub fn with_config(config: MapletConfig) -> MapletResult<Self> {
76        let operator = Op::default();
77        Self::with_config_and_operator(config, operator)
78    }
79
80    /// Create a new maplet with custom configuration and operator
81    pub fn with_config_and_operator(config: MapletConfig, operator: Op) -> MapletResult<Self> {
82        config.validate()?;
83
84        let fingerprint_bits =
85            FingerprintHasher::optimal_fingerprint_size(config.false_positive_rate);
86        let filter =
87            QuotientFilter::new(config.capacity, fingerprint_bits, HashFunction::default())?;
88
89        let collision_detector = CollisionDetector::new(config.capacity / 4); // Allow 25% collisions
90        let perfect_hash = PerfectHash::new(config.capacity, HashFunction::default());
91
92        Ok(Self {
93            config,
94            filter: Arc::new(RwLock::new(filter)),
95            values: Arc::new(RwLock::new(std::collections::HashMap::new())),
96            operator,
97            collision_detector: Arc::new(RwLock::new(collision_detector)),
98            perfect_hash,
99            len: Arc::new(RwLock::new(0)),
100            _phantom: PhantomData,
101        })
102    }
103
104    /// Insert a key-value pair into the maplet
105    pub async fn insert(&self, key: K, value: V) -> MapletResult<()> {
106        let current_len = *self.len.read().await;
107        if current_len >= self.config.capacity {
108            if self.config.auto_resize {
109                self.resize(self.config.capacity * 2).await?;
110            } else {
111                return Err(MapletError::CapacityExceeded);
112            }
113        }
114
115        let fingerprint = self.hash_key(&key);
116
117        // Check if key already exists in values HashMap (more reliable than filter)
118        let values_guard = self.values.read().await;
119        let key_exists = values_guard.contains_key(&fingerprint);
120        drop(values_guard);
121
122        if key_exists {
123            // Key exists, merge with existing value
124            self.merge_value(fingerprint, value).await?;
125        } else {
126            // New key, insert into filter and store value
127            {
128                let mut filter_guard = self.filter.write().await;
129                filter_guard.insert(fingerprint)?;
130            }
131            self.store_value(fingerprint, value).await?;
132            {
133                let mut len_guard = self.len.write().await;
134                *len_guard += 1;
135            }
136        }
137
138        Ok(())
139    }
140
141    /// Query a key to get its associated value
142    pub async fn query(&self, key: &K) -> Option<V> {
143        let fingerprint = self.hash_key(key);
144
145        let filter_guard = self.filter.read().await;
146        if !filter_guard.query(fingerprint) {
147            return None;
148        }
149        drop(filter_guard);
150
151        // Get the value directly from the HashMap using the fingerprint
152        let values_guard = self.values.read().await;
153        values_guard.get(&fingerprint).cloned()
154    }
155
156    /// Check if a key exists in the maplet
157    pub async fn contains(&self, key: &K) -> bool {
158        let fingerprint = self.hash_key(key);
159        let filter_guard = self.filter.read().await;
160        filter_guard.query(fingerprint)
161    }
162
163    /// Delete a key-value pair from the maplet
164    pub async fn delete(&self, key: &K, value: &V) -> MapletResult<bool> {
165        if !self.config.enable_deletion {
166            return Err(MapletError::Internal("Deletion not enabled".to_string()));
167        }
168
169        let fingerprint = self.hash_key(key);
170
171        let filter_guard = self.filter.read().await;
172        if !filter_guard.query(fingerprint) {
173            return Ok(false);
174        }
175        drop(filter_guard);
176
177        {
178            let mut values_guard = self.values.write().await;
179            if let Some(existing_value) = values_guard.get(&fingerprint) {
180                // Check if the values match (for exact deletion)
181                if existing_value == value {
182                    // Remove from filter and clear value
183                    {
184                        let mut filter_guard = self.filter.write().await;
185                        filter_guard.delete(fingerprint)?;
186                    }
187                    values_guard.remove(&fingerprint);
188                    {
189                        let mut len_guard = self.len.write().await;
190                        *len_guard -= 1;
191                    }
192                    return Ok(true);
193                }
194            }
195        }
196
197        Ok(false)
198    }
199
200    /// Get the current number of items stored
201    pub async fn len(&self) -> usize {
202        *self.len.read().await
203    }
204
205    /// Check if the maplet is empty
206    pub async fn is_empty(&self) -> bool {
207        *self.len.read().await == 0
208    }
209
210    /// Get the configured false-positive rate
211    pub const fn error_rate(&self) -> f64 {
212        self.config.false_positive_rate
213    }
214
215    /// Get the current load factor
216    #[allow(clippy::cast_precision_loss)] // Acceptable for ratio calculation
217    pub async fn load_factor(&self) -> f64 {
218        let current_len = *self.len.read().await;
219        current_len as f64 / self.config.capacity as f64
220    }
221
222    /// Get statistics about the maplet
223    pub async fn stats(&self) -> MapletStats {
224        let filter_guard = self.filter.read().await;
225        let filter_stats = filter_guard.stats();
226        drop(filter_guard);
227
228        let memory_usage = self.estimate_memory_usage();
229        let current_len = *self.len.read().await;
230
231        let collision_guard = self.collision_detector.read().await;
232        let collision_count = collision_guard.collision_count() as u64;
233        drop(collision_guard);
234
235        let mut stats = MapletStats::new(
236            self.config.capacity,
237            current_len,
238            self.config.false_positive_rate,
239        );
240        stats.update(
241            current_len,
242            memory_usage,
243            collision_count,
244            filter_stats.runs,
245        );
246        stats
247    }
248
249    /// Resize the maplet to a new capacity
250    pub async fn resize(&self, new_capacity: usize) -> MapletResult<()> {
251        if new_capacity <= self.config.capacity {
252            return Err(MapletError::ResizeFailed(
253                "New capacity must be larger".to_string(),
254            ));
255        }
256
257        // Create new filter with larger capacity
258        let fingerprint_bits =
259            FingerprintHasher::optimal_fingerprint_size(self.config.false_positive_rate);
260        let new_filter =
261            QuotientFilter::new(new_capacity, fingerprint_bits, HashFunction::default())?;
262
263        // Replace the filter and resize values array
264        {
265            let mut filter_guard = self.filter.write().await;
266            *filter_guard = new_filter;
267        }
268
269        // HashMap doesn't need explicit resizing - it grows automatically
270
271        // Note: In a full implementation, config.capacity would also need to be updated
272        // For now, we rely on the actual filter and values array capacity
273
274        Ok(())
275    }
276
277    /// Merge another maplet into this one
278    pub fn merge(&self, _other: &Self) -> MapletResult<()> {
279        if !self.config.enable_merging {
280            return Err(MapletError::MergeFailed("Merging not enabled".to_string()));
281        }
282
283        // This is a simplified merge implementation
284        // In practice, we'd need to iterate through all items in the other maplet
285        // and insert them into this one using the merge operator
286        Err(MapletError::MergeFailed(
287            "Merge not fully implemented".to_string(),
288        ))
289    }
290
291    /// Hash a key to get its fingerprint
292    fn hash_key(&self, key: &K) -> u64 {
293        // Use the same hasher as the quotient filter
294        // We need to use the same hasher instance to ensure consistency
295        use ahash::RandomState;
296
297        // Create a consistent hasher - we need to use the same seed as the quotient filter
298        // For now, use a fixed seed to ensure consistency
299        let random_state = RandomState::with_seed(42);
300
301        random_state.hash_one(&key)
302    }
303
304    /// Find the slot index for a fingerprint
305    #[allow(dead_code)]
306    fn find_slot_for_fingerprint(&self, fingerprint: u64) -> usize {
307        // Use the same slot mapping as the quotient filter
308        let quotient = self.extract_quotient(fingerprint);
309
310        // Use the same perfect hash as the quotient filter
311        self.perfect_hash.slot_index(quotient)
312    }
313
314    /// Extract quotient from fingerprint (same as quotient filter)
315    #[allow(dead_code, clippy::cast_precision_loss)] // Acceptable for bit calculation
316    fn extract_quotient(&self, fingerprint: u64) -> u64 {
317        let quotient_bits = (self.config.capacity as f64).log2().ceil() as u32;
318        let quotient_mask = if quotient_bits >= 64 {
319            u64::MAX
320        } else {
321            (1u64 << quotient_bits) - 1
322        };
323        fingerprint & quotient_mask
324    }
325
326    /// Extract remainder from fingerprint (same as quotient filter)
327    #[allow(dead_code, clippy::cast_precision_loss)] // Acceptable for bit calculation
328    fn extract_remainder(&self, fingerprint: u64) -> u64 {
329        let quotient_bits = (self.config.capacity as f64).log2().ceil() as u32;
330        let remainder_bits = 64 - quotient_bits;
331        let remainder_mask = if remainder_bits >= 64 {
332            u64::MAX
333        } else {
334            (1u64 << remainder_bits) - 1
335        };
336        (fingerprint >> quotient_bits) & remainder_mask
337    }
338
339    /// Find the target slot for a quotient and remainder
340    #[allow(dead_code)]
341    fn find_target_slot(&self, quotient: u64, _remainder: u64) -> usize {
342        // Use the same perfect hash as the quotient filter
343        self.perfect_hash.slot_index(quotient)
344    }
345
346    /// Find the actual slot where a fingerprint is stored
347    /// This replicates the quotient filter's slot finding logic
348    #[cfg(feature = "quotient-filter")]
349    async fn find_actual_slot_for_fingerprint(
350        &self,
351        quotient: u64,
352        remainder: u64,
353    ) -> Option<usize> {
354        // We need to access the quotient filter to find the actual slot
355        // The quotient filter has the logic to find runs and search within them
356        let filter_guard = self.filter.read().await;
357
358        // Use the quotient filter's method to find the actual slot
359        // This handles runs, shifting, and all the complex logic
360        // The fingerprint is reconstructed by combining quotient and remainder
361        // The quotient goes in the lower bits, remainder in the upper bits
362        let fingerprint = quotient | (remainder << filter_guard.quotient_bits());
363        let actual_slot = filter_guard.get_actual_slot_for_fingerprint(fingerprint);
364
365        drop(filter_guard);
366        actual_slot
367    }
368
369    /// Find the actual slot where a key's fingerprint is stored
370    /// This is useful for debugging and advanced operations
371    #[cfg(feature = "quotient-filter")]
372    pub async fn find_slot_for_key(&self, key: &K) -> Option<usize> {
373        let fingerprint = self.hash_key(key);
374        let quotient = self.extract_quotient(fingerprint);
375        let remainder = self.extract_remainder(fingerprint);
376
377        self.find_actual_slot_for_fingerprint(quotient, remainder)
378            .await
379    }
380
381    /// Merge a value with an existing value at a fingerprint
382    async fn merge_value(&self, fingerprint: u64, value: V) -> MapletResult<()> {
383        {
384            let mut values_guard = self.values.write().await;
385            if let Some(existing_value) = values_guard.get(&fingerprint) {
386                let merged_value = self.operator.merge(existing_value.clone(), value)?;
387                values_guard.insert(fingerprint, merged_value);
388            } else {
389                // False positive or concurrent deletion - treat as new insertion
390                values_guard.insert(fingerprint, value);
391            }
392        }
393
394        Ok(())
395    }
396
397    /// Store a value at a fingerprint
398    async fn store_value(&self, fingerprint: u64, value: V) -> MapletResult<()> {
399        {
400            let mut values_guard = self.values.write().await;
401            values_guard.insert(fingerprint, value);
402        }
403
404        Ok(())
405    }
406
407    /// Estimate memory usage in bytes
408    const fn estimate_memory_usage(&self) -> usize {
409        // QuotientFilter slots: always allocated for full capacity
410        let filter_slots_size =
411            self.config.capacity * std::mem::size_of::<crate::quotient_filter::SlotMetadata>();
412
413        // For now, use a simpler estimation that doesn't require async access
414        // In a real implementation, we'd need to make this async or use a different approach
415        let estimated_values_count = self.config.capacity / 4; // Assume 25% load factor
416        let estimated_values_capacity = self.config.capacity / 2; // HashMap typically allocates 2x
417
418        // HashMap: capacity * (key_size + value_size) + overhead
419        let values_size =
420            estimated_values_capacity * (std::mem::size_of::<u64>() + std::mem::size_of::<V>());
421
422        // HashMap overhead (buckets, hash table)
423        let hashmap_overhead = estimated_values_capacity * std::mem::size_of::<usize>() / 2;
424
425        // Multiset counts in QuotientFilter (approximate)
426        let multiset_size =
427            estimated_values_count * (std::mem::size_of::<u64>() + std::mem::size_of::<usize>());
428
429        // Struct overhead
430        let overhead = std::mem::size_of::<Self>();
431
432        filter_slots_size + values_size + hashmap_overhead + multiset_size + overhead
433    }
434}
435
436// Implement Default for operators that support it
437impl<K, V, Op> Default for Maplet<K, V, Op>
438where
439    K: Hash + Eq + Clone + std::fmt::Debug + Send + Sync,
440    V: Clone + PartialEq + std::fmt::Debug + Send + Sync,
441    Op: MergeOperator<V> + Default + Send + Sync,
442{
443    fn default() -> Self {
444        Self::new(1000, 0.01).expect("Failed to create default maplet")
445    }
446}
447
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operators::CounterOperator;

    #[tokio::test]
    async fn test_maplet_creation() {
        let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01);
        assert!(maplet.is_ok());

        let maplet = maplet.unwrap();
        assert_eq!(maplet.len().await, 0);
        assert!(maplet.is_empty().await);
        assert!((maplet.error_rate() - 0.01).abs() < f64::EPSILON);
    }

    #[tokio::test]
    async fn test_maplet_insert_query() {
        let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();

        // Insert some key-value pairs
        assert!(maplet.insert("key1".to_string(), 5).await.is_ok());
        assert!(maplet.insert("key2".to_string(), 10).await.is_ok());

        assert_eq!(maplet.len().await, 2);
        assert!(!maplet.is_empty().await);

        // Query existing keys
        assert!(maplet.contains(&"key1".to_string()).await);
        assert!(maplet.contains(&"key2".to_string()).await);

        // Query non-existing key
        assert!(!maplet.contains(&"key3".to_string()).await);
    }

    #[tokio::test]
    async fn test_maplet_merge_values() {
        let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();

        // Insert same key multiple times
        assert!(maplet.insert("key1".to_string(), 5).await.is_ok());
        assert!(maplet.insert("key1".to_string(), 3).await.is_ok());

        assert_eq!(maplet.len().await, 1); // Still only one unique key

        // Hash collisions may inflate the merged value, but the one-sided error
        // guarantee means it can never fall below the first inserted value.
        let value = maplet
            .query(&"key1".to_string())
            .await
            .expect("inserted key must be found");
        assert!(
            value >= 5,
            "merged value {value} violates the one-sided guarantee"
        );
    }

    #[tokio::test]
    async fn test_maplet_stats() {
        let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();

        maplet.insert("key1".to_string(), 5).await.unwrap();
        maplet.insert("key2".to_string(), 10).await.unwrap();

        let stats = maplet.stats().await;
        assert_eq!(stats.capacity, 100);
        assert_eq!(stats.len, 2);
        assert!(stats.load_factor > 0.0);
        assert!(stats.memory_usage > 0);
    }

    #[tokio::test]
    async fn test_concurrent_insertions_no_filter_inconsistency() {
        use std::sync::Arc;
        use tokio::task;

        let maplet = Arc::new(Maplet::<String, u64, CounterOperator>::new(1000, 0.01).unwrap());
        let mut handles = vec![];

        // Spawn multiple concurrent tasks that insert the same 25 keys
        for i in 0..5 {
            let maplet_clone = Arc::clone(&maplet);
            let handle = task::spawn(async move {
                for j in 0..50 {
                    let key = format!("key_{}", j % 25); // Some keys will be duplicated
                    let value = u64::try_from(i * 50 + j).unwrap_or(0);
                    maplet_clone.insert(key, value).await.unwrap();
                }
            });
            handles.push(handle);
        }

        // Wait for all tasks to complete
        for handle in handles {
            handle.await.unwrap();
        }

        // Only 25 distinct keys were inserted; the important thing is that no
        // insert panicked with a filter inconsistency.
        let len = maplet.len().await;
        assert!(len > 0, "Maplet should have some items");
        assert!(len <= 1000, "Should not exceed capacity");

        // Every inserted key must be queryable: a maplet has no false negatives.
        for i in 0..25 {
            let key = format!("key_{i}");
            assert!(
                maplet.query(&key).await.is_some(),
                "inserted key {key} must be found"
            );
        }

        // Keys that were never inserted may or may not hit a false positive;
        // querying them must simply not panic.
        for i in 25..50 {
            let key = format!("key_{i}");
            let _ = maplet.query(&key).await;
        }
    }

    #[tokio::test]
    async fn test_memory_usage_accuracy() {
        let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();

        // Insert some items
        for i in 0..10 {
            let key = format!("key_{i}");
            maplet
                .insert(key, u64::try_from(i).unwrap_or(0))
                .await
                .unwrap();
        }

        let stats = maplet.stats().await;
        let memory_usage = stats.memory_usage;

        // Memory usage should be positive and within a sane bound
        assert!(memory_usage > 0, "Memory usage should be positive");
        assert!(
            memory_usage < 100_000,
            "Memory usage should be reasonable for 10 items"
        );

        println!("Memory usage for 10 items: {memory_usage} bytes");
    }

    #[cfg(feature = "quotient-filter")]
    #[tokio::test]
    async fn test_slot_finding_for_key() {
        let maplet = Maplet::<String, u64, CounterOperator>::new(100, 0.01).unwrap();

        let test_key = "test_key".to_string();
        maplet.insert(test_key.clone(), 42).await.unwrap();

        // Find the slot for the key
        let slot = maplet.find_slot_for_key(&test_key).await;
        assert!(slot.is_some(), "Should find a slot for existing key");

        // A non-existing key may still map to a slot because of false
        // positives; that is acceptable for a probabilistic structure.
        let non_existing_key = "non_existing".to_string();
        let _non_existing_slot = maplet.find_slot_for_key(&non_existing_key).await;
    }
}