// sanitize_engine/store.rs
1//! Thread-safe, concurrent one-way replacement store.
2//!
3//! # Concurrency Model
4//!
5//! The store uses [`dashmap::DashMap`] — a concurrent hash map with shard-level
6//! locking (default 64 shards). This gives us:
7//!
//! - **Cheap shared reads** for lookups of already-mapped values (a
//!   shard-level read lock that many readers may hold concurrently).
9//! - **Shard-level write locks** that are held only while inserting a new entry.
10//!   With 64 shards and 8–16 threads, the probability of two threads contending
11//!   on the same shard is very low.
12//! - **Atomic get-or-insert** via the `entry()` API, which prevents TOCTOU races
13//!   and guarantees first-writer-wins semantics.
14//!
15//! The forward map is keyed by `(Category, original)` → `sanitized`.
16//! Replacements are **one-way only** — there is no reverse map, no mapping
17//! file, and no restore capability.
18//!
19//! # Memory Characteristics
20//!
21//! At 10M unique values with average key length 20 bytes and average value
22//! length 30 bytes:
23//! - Forward map: 10M × (20 + 30 + ~120 DashMap overhead) ≈ 1.7 GB
24//! - **Total: ~1.7 GB** — acceptable for server workloads.
25//!
26//! An optional `capacity_limit` can be set to prevent unbounded growth.
27
28use crate::category::Category;
29use crate::error::{Result, SanitizeError};
30use crate::generator::ReplacementGenerator;
31use compact_str::CompactString;
32use dashmap::DashMap;
33use std::sync::atomic::{AtomicUsize, Ordering};
34use std::sync::Arc;
35use zeroize::Zeroize;
36
37// ---------------------------------------------------------------------------
38// Composite key for the forward map
39// ---------------------------------------------------------------------------
40
/// Composite key: `(Category, original_value)`.
///
/// Uses `String` for `original` (rather than `CompactString`) so that
/// the Drop impl can zeroize sensitive plaintext via the `Zeroize`
/// trait without unsafe code (F-09 fix).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ForwardKey {
    // Category the original value was classified as; part of the key so the
    // same plaintext can map to different replacements per category.
    category: Category,
    // Original (sensitive) plaintext; zeroized by `clear()` and by the
    // store's Drop impl before the backing buffer is freed.
    original: String,
}
51
52// ---------------------------------------------------------------------------
53// MappingStore
54// ---------------------------------------------------------------------------
55
/// Thread-safe concurrent one-way replacement store.
///
/// Caches forward mappings for per-run consistency (same input always
/// produces the same output within a run). There is no reverse map,
/// no journal, and no persistence — replacements are one-way only.
///
/// See the [module-level documentation](self) for concurrency and memory details.
pub struct MappingStore {
    /// `(category, original) → sanitized`
    forward: DashMap<ForwardKey, CompactString>,
    /// Replacement generator (HMAC deterministic or CSPRNG random).
    generator: Arc<dyn ReplacementGenerator>,
    /// Current number of mappings (atomic for lock-free reads).
    /// May transiently over-count while `get_or_insert` holds a reserved
    /// capacity slot that it later releases.
    len: AtomicUsize,
    /// Optional upper bound on the number of mappings.
    capacity_limit: Option<usize>,
}
73
74impl MappingStore {
75    // ---------------- Construction ----------------
76
77    /// Create a new, empty mapping store.
78    ///
79    /// # Arguments
80    ///
81    /// - `generator` — replacement strategy (HMAC or random).
82    /// - `capacity_limit` — optional max number of unique mappings.
83    #[must_use]
84    pub fn new(generator: Arc<dyn ReplacementGenerator>, capacity_limit: Option<usize>) -> Self {
85        Self {
86            forward: DashMap::with_capacity(1024),
87            generator,
88            len: AtomicUsize::new(0),
89            capacity_limit,
90        }
91    }
92
93    /// Create a store pre-sized for `expected` entries (avoids rehashing).
94    #[must_use]
95    pub fn with_expected_capacity(
96        generator: Arc<dyn ReplacementGenerator>,
97        capacity_limit: Option<usize>,
98        expected: usize,
99    ) -> Self {
100        Self {
101            forward: DashMap::with_capacity(expected),
102            generator,
103            len: AtomicUsize::new(0),
104            capacity_limit,
105        }
106    }
107
108    // ---------------- Core API ----------------
109
110    /// Get or create the sanitized replacement for `(category, original)`.
111    ///
112    /// This is the primary API for one-way sanitization.
113    ///
114    /// **Thread-safety:** Uses `DashMap::entry()` which holds a shard-level
115    /// lock only for the duration of the insert closure. The generator is
116    /// called inside the lock, but generation is fast (one HMAC or one RNG
117    /// call). Capacity enforcement uses `compare_exchange` to prevent
118    /// TOCTOU over-insertion (C-1 fix).
119    ///
120    /// **Per-run consistency:** Once a value is mapped, all subsequent
121    /// lookups return the same sanitized value (first-writer-wins).
122    ///
123    /// # Errors
124    ///
125    /// Returns [`SanitizeError::CapacityExceeded`] if the store has
126    /// reached its configured capacity limit.
127    pub fn get_or_insert(&self, category: &Category, original: &str) -> Result<CompactString> {
128        // Fast path: already mapped (lock-free read).
129        let key = ForwardKey {
130            category: category.clone(),
131            original: original.to_owned(),
132        };
133        if let Some(existing) = self.forward.get(&key) {
134            return Ok(existing.value().clone());
135        }
136
137        // Slow path: need to insert.
138        // Atomically reserve a capacity slot *before* generating the
139        // value.  This eliminates the TOCTOU race (C-1) where multiple
140        // threads pass the capacity check and all insert.
141        if let Some(limit) = self.capacity_limit {
142            loop {
143                let current = self.len.load(Ordering::Acquire);
144                if current >= limit {
145                    // One more chance: key may have been inserted by
146                    // another thread while we were checking.
147                    if let Some(existing) = self.forward.get(&key) {
148                        return Ok(existing.value().clone());
149                    }
150                    return Err(SanitizeError::CapacityExceeded { current, limit });
151                }
152                // Try to reserve a slot atomically.
153                if self
154                    .len
155                    .compare_exchange_weak(
156                        current,
157                        current + 1,
158                        Ordering::AcqRel,
159                        Ordering::Acquire,
160                    )
161                    .is_ok()
162                {
163                    break;
164                }
165                // CAS failed → another thread incremented; retry.
166            }
167
168            // Slot reserved — generate and insert.
169            // Use entry() for first-writer-wins semantics.
170            let mut was_inserted = false;
171            let result = self
172                .forward
173                .entry(key)
174                .or_insert_with(|| {
175                    was_inserted = true;
176                    let val = self.generator.generate(category, original);
177                    CompactString::new(val)
178                })
179                .value()
180                .clone();
181
182            if !was_inserted {
183                // Another thread inserted first — release our reserved slot.
184                self.len.fetch_sub(1, Ordering::Release);
185            }
186
187            Ok(result)
188        } else {
189            // No capacity limit — generate inside the entry lock to
190            // avoid wasted work (C-2 fix: only the first writer generates).
191            let mut was_inserted = false;
192            let result = self
193                .forward
194                .entry(key)
195                .or_insert_with(|| {
196                    was_inserted = true;
197                    let val = self.generator.generate(category, original);
198                    CompactString::new(val)
199                })
200                .value()
201                .clone();
202
203            if was_inserted {
204                self.len.fetch_add(1, Ordering::Release);
205            }
206
207            Ok(result)
208        }
209    }
210
211    /// Look up an existing forward mapping without creating one.
212    pub fn forward_lookup(&self, category: &Category, original: &str) -> Option<CompactString> {
213        let key = ForwardKey {
214            category: category.clone(),
215            original: original.to_owned(),
216        };
217        self.forward.get(&key).map(|r| r.value().clone())
218    }
219
220    // ---------------- Metrics ----------------
221
222    /// Number of unique mappings in the store.
223    #[must_use]
224    pub fn len(&self) -> usize {
225        self.len.load(Ordering::Relaxed)
226    }
227
228    /// Whether the store is empty.
229    #[must_use]
230    pub fn is_empty(&self) -> bool {
231        self.len() == 0
232    }
233
234    /// Remove all mappings, zeroizing the original plaintexts.
235    ///
236    /// This is useful for resetting the store between runs without
237    /// dropping and recreating it.
238    pub fn clear(&mut self) {
239        let old_map = std::mem::take(&mut self.forward);
240        for (mut key, _value) in old_map {
241            key.original.zeroize();
242        }
243        self.len.store(0, Ordering::Release);
244    }
245
246    // ---------------- Iteration (for external use) ----------------
247
248    /// Iterate over all mappings. Yields `(category, original, sanitized)`.
249    ///
250    /// Note: iteration over `DashMap` is not snapshot-consistent if concurrent
251    /// inserts are happening. Call this after all workers have finished.
252    pub fn iter(&self) -> impl Iterator<Item = (Category, CompactString, CompactString)> + '_ {
253        self.forward.iter().map(|entry| {
254            (
255                entry.key().category.clone(),
256                CompactString::new(&entry.key().original),
257                entry.value().clone(),
258            )
259        })
260    }
261}
262
263/// F-09 fix: zeroize original keys stored in the forward map on drop.
264/// This prevents sensitive plaintext values from lingering on the heap
265/// after the store is no longer needed. Uses safe Zeroize on Strings.
266impl Drop for MappingStore {
267    fn drop(&mut self) {
268        // DashMap::retain() visits each entry. We use it to overwrite
269        // the original plaintext before the entry is dropped.
270        // retain() gives us (&K, &mut V); the key is behind a shared
271        // reference.  We extract the original, create a zeroizing copy,
272        // then clear the map. This ensures the String backing buffer
273        // is zeroed before being freed.
274        //
275        // Approach: swap the map contents with an empty map, consuming
276        // ownership of all entries via IntoIter.
277        let old_map = std::mem::take(&mut self.forward);
278        for (mut key, _value) in old_map {
279            key.original.zeroize();
280        }
281    }
282}
283
/// Compile-time assertion that a type is `Send + Sync`.
///
/// Expands to an unnamed `const` holding a function pointer whose body
/// only type-checks when `$t` satisfies both bounds; nothing is ever
/// executed at runtime.
macro_rules! static_assertions_send_sync {
    ($t:ty) => {
        const _: fn() = || {
            fn require_send_sync<T: Send + Sync>() {}
            require_send_sync::<$t>();
        };
    };
}
295
// DashMap and AtomicUsize are Send + Sync; the Arc<dyn ReplacementGenerator>
// field is presumably Send + Sync as well (the trait object would need those
// bounds — this assertion compiling confirms it). Assert at compile time so
// a future field change cannot silently make MappingStore non-thread-safe:
static_assertions_send_sync!(MappingStore);
300
301// ---------------------------------------------------------------------------
302// Unit tests
303// ---------------------------------------------------------------------------
304
#[cfg(test)]
mod tests {
    use super::*;
    use crate::generator::{HmacGenerator, RandomGenerator};
    use std::sync::Arc;

    /// Build a store backed by a deterministic HMAC generator (fixed key).
    fn hmac_store(limit: Option<usize>) -> MappingStore {
        MappingStore::new(Arc::new(HmacGenerator::new([42u8; 32])), limit)
    }

    /// Build an unbounded store backed by a CSPRNG generator.
    fn random_store() -> MappingStore {
        MappingStore::new(Arc::new(RandomGenerator::new()), None)
    }

    // --- Basic operations ---

    #[test]
    fn insert_and_lookup() {
        let store = hmac_store(None);
        let sanitized = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        assert!(!sanitized.is_empty());
        assert!(sanitized.contains("@corp.com"), "domain must be preserved");
        assert_eq!(
            sanitized.len(),
            "alice@corp.com".len(),
            "length must be preserved"
        );
        assert_eq!(store.len(), 1);
    }

    #[test]
    fn same_input_same_output() {
        let store = hmac_store(None);
        let first = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        let second = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        assert_eq!(first, second, "repeated insert must return cached value");
        assert_eq!(store.len(), 1, "no duplicate entry");
    }

    #[test]
    fn different_inputs_different_outputs() {
        let store = hmac_store(None);
        let alice = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        let bob = store
            .get_or_insert(&Category::Email, "bob@corp.com")
            .unwrap();
        assert_ne!(alice, bob);
        assert_eq!(store.len(), 2);
    }

    #[test]
    fn different_categories_different_outputs() {
        let store = hmac_store(None);
        let as_email = store.get_or_insert(&Category::Email, "test").unwrap();
        let as_name = store.get_or_insert(&Category::Name, "test").unwrap();
        assert_ne!(as_email, as_name);
    }

    #[test]
    fn forward_lookup_works() {
        let store = hmac_store(None);
        let sanitized = store.get_or_insert(&Category::IpV4, "192.168.1.1").unwrap();
        assert_eq!(
            store.forward_lookup(&Category::IpV4, "192.168.1.1"),
            Some(sanitized)
        );
    }

    #[test]
    fn forward_lookup_missing() {
        let store = hmac_store(None);
        assert!(store.forward_lookup(&Category::Email, "nope").is_none());
    }

    // --- Capacity limit ---

    #[test]
    fn capacity_limit_enforced() {
        let store = hmac_store(Some(2));
        store.get_or_insert(&Category::Email, "a@a.com").unwrap();
        store.get_or_insert(&Category::Email, "b@b.com").unwrap();
        let err = store
            .get_or_insert(&Category::Email, "c@c.com")
            .unwrap_err();
        match err {
            SanitizeError::CapacityExceeded {
                current: 2,
                limit: 2,
            } => {}
            other => panic!("unexpected error: {:?}", other),
        }
    }

    #[test]
    fn capacity_limit_allows_duplicate() {
        let store = hmac_store(Some(1));
        store.get_or_insert(&Category::Email, "a@a.com").unwrap();
        // Re-inserting the same value must hit the fast path and succeed.
        let cached = store.get_or_insert(&Category::Email, "a@a.com").unwrap();
        assert!(!cached.is_empty());
    }

    // --- Random generator within store ---

    #[test]
    fn random_store_caches() {
        let store = random_store();
        let first = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        let second = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        assert_eq!(first, second, "random store must still cache the first result");
    }

    // --- Iteration ---

    #[test]
    fn iter_yields_all_mappings() {
        let store = hmac_store(None);
        store.get_or_insert(&Category::Email, "a@a.com").unwrap();
        store.get_or_insert(&Category::IpV4, "1.2.3.4").unwrap();
        assert_eq!(store.iter().count(), 2);
    }

    // --- Concurrent inserts (basic smoke test) ---

    #[test]
    fn concurrent_inserts_no_panic() {
        use std::thread;

        let store = Arc::new(MappingStore::new(
            Arc::new(HmacGenerator::new([99u8; 32])),
            None,
        ));

        let workers: Vec<_> = (0..8)
            .map(|t| {
                let store = Arc::clone(&store);
                thread::spawn(move || {
                    for i in 0..1000 {
                        let val = format!("thread{}-val{}", t, i);
                        store.get_or_insert(&Category::Email, &val).unwrap();
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(store.len(), 8000);
    }

    #[test]
    fn concurrent_inserts_same_key_idempotent() {
        use std::thread;

        let store = Arc::new(MappingStore::new(
            Arc::new(HmacGenerator::new([7u8; 32])),
            None,
        ));

        let workers: Vec<_> = (0..8)
            .map(|_| {
                let store = Arc::clone(&store);
                thread::spawn(move || -> Vec<(String, CompactString)> {
                    (0..100)
                        .map(|i| {
                            let val = format!("shared-{}", i);
                            let mapped = store.get_or_insert(&Category::Email, &val).unwrap();
                            (val, mapped)
                        })
                        .collect()
                })
            })
            .collect();

        let all_results: Vec<Vec<(String, CompactString)>> = workers
            .into_iter()
            .map(|worker| worker.join().unwrap())
            .collect();

        // All threads must agree on every mapping.
        assert_eq!(store.len(), 100);
        for i in 0..100 {
            let val = format!("shared-{}", i);
            let expected = store.forward_lookup(&Category::Email, &val).unwrap();
            for thread_results in &all_results {
                let (_, got) = &thread_results[i];
                assert_eq!(
                    got, &expected,
                    "all threads must see the same mapping for {}",
                    val
                );
            }
        }
    }
}
505}