// sanitize_engine/store.rs
//! Thread-safe, concurrent one-way replacement store.
//!
//! # Concurrency Model
//!
//! The store uses [`dashmap::DashMap`] — a concurrent hash map with shard-level
//! locking (default 64 shards). This gives us:
//!
//! - **Lock-free reads** for lookups of already-mapped values.
//! - **Shard-level write locks** that are held only while inserting a new entry.
//!   With 64 shards and 8–16 threads, the probability of two threads contending
//!   on the same shard is very low.
//! - **Atomic get-or-insert** via the `entry()` API, which prevents TOCTOU races
//!   and guarantees first-writer-wins semantics.
//!
//! The forward map is keyed by `(Category, original)` → `sanitized`.
//! Replacements are **one-way only** — there is no reverse map, no mapping
//! file, and no restore capability.
//!
//! # Memory Characteristics
//!
//! At 10M unique values with average key length 20 bytes and average value
//! length 30 bytes:
//! - Forward map: 10M × (20 + 30 + ~120 DashMap overhead) ≈ 1.7 GB
//! - **Total: ~1.7 GB** — acceptable for server workloads.
//!
//! An optional `capacity_limit` can be set to prevent unbounded growth.
27
28use crate::category::Category;
29use crate::error::{Result, SanitizeError};
30use crate::generator::ReplacementGenerator;
31use compact_str::CompactString;
32use dashmap::DashMap;
33use std::sync::atomic::{AtomicUsize, Ordering};
34use std::sync::Arc;
35use zeroize::Zeroize;
36
// ---------------------------------------------------------------------------
// Composite key for the forward map
// ---------------------------------------------------------------------------

/// Composite key: `(Category, original_value)`.
///
/// Uses `String` for `original` (rather than `CompactString`) so that the
/// plaintext can be wiped via the safe `Zeroize` impl for `String` — no
/// unsafe code needed (F-09 fix). The actual zeroization is performed by
/// `MappingStore::clear` and `MappingStore`'s `Drop` impl, which consume
/// the keys by value.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ForwardKey {
    // Sanitization category; keying on it lets the same literal value map
    // to different replacements per category.
    category: Category,
    // Original (sensitive) plaintext value.
    original: String,
}
51
// ---------------------------------------------------------------------------
// MappingStore
// ---------------------------------------------------------------------------

/// Thread-safe concurrent one-way replacement store.
///
/// Caches forward mappings for per-run consistency (same input always
/// produces the same output within a run). There is no reverse map,
/// no journal, and no persistence — replacements are one-way only.
///
/// See the [module-level documentation](self) for concurrency and memory details.
pub struct MappingStore {
    /// Forward map: `(category, original) → sanitized`.
    forward: DashMap<ForwardKey, CompactString>,
    /// Replacement generator (HMAC deterministic or CSPRNG random).
    generator: Arc<dyn ReplacementGenerator>,
    /// Current number of mappings, kept in a separate atomic so `len()` is
    /// a lock-free read and capacity enforcement can CAS on it (DashMap has
    /// no O(1) atomic length).
    len: AtomicUsize,
    /// Optional upper bound on the number of mappings.
    capacity_limit: Option<usize>,
}
73
74impl MappingStore {
75    // ---------------- Construction ----------------
76
77    /// Create a new, empty mapping store.
78    ///
79    /// # Arguments
80    ///
81    /// - `generator` — replacement strategy (HMAC or random).
82    /// - `capacity_limit` — optional max number of unique mappings.
83    #[must_use]
84    pub fn new(generator: Arc<dyn ReplacementGenerator>, capacity_limit: Option<usize>) -> Self {
85        Self {
86            forward: DashMap::with_capacity(1024),
87            generator,
88            len: AtomicUsize::new(0),
89            capacity_limit,
90        }
91    }
92
93    // ---------------- Core API ----------------
94
95    /// Get or create the sanitized replacement for `(category, original)`.
96    ///
97    /// This is the primary API for one-way sanitization.
98    ///
99    /// **Thread-safety:** Uses `DashMap::entry()` which holds a shard-level
100    /// lock only for the duration of the insert closure. The generator is
101    /// called inside the lock, but generation is fast (one HMAC or one RNG
102    /// call). Capacity enforcement uses `compare_exchange` to prevent
103    /// TOCTOU over-insertion (C-1 fix).
104    ///
105    /// **Per-run consistency:** Once a value is mapped, all subsequent
106    /// lookups return the same sanitized value (first-writer-wins).
107    ///
108    /// # Errors
109    ///
110    /// Returns [`SanitizeError::CapacityExceeded`] if the store has
111    /// reached its configured capacity limit.
112    pub fn get_or_insert(&self, category: &Category, original: &str) -> Result<CompactString> {
113        // Fast path: already mapped (lock-free read).
114        let key = ForwardKey {
115            category: category.clone(),
116            original: original.to_owned(),
117        };
118        if let Some(existing) = self.forward.get(&key) {
119            return Ok(existing.value().clone());
120        }
121
122        // Slow path: need to insert.
123        // Atomically reserve a capacity slot *before* generating the
124        // value.  This eliminates the TOCTOU race (C-1) where multiple
125        // threads pass the capacity check and all insert.
126        if let Some(limit) = self.capacity_limit {
127            loop {
128                let current = self.len.load(Ordering::Acquire);
129                if current >= limit {
130                    // One more chance: key may have been inserted by
131                    // another thread while we were checking.
132                    if let Some(existing) = self.forward.get(&key) {
133                        return Ok(existing.value().clone());
134                    }
135                    return Err(SanitizeError::CapacityExceeded { current, limit });
136                }
137                // Try to reserve a slot atomically.
138                if self
139                    .len
140                    .compare_exchange_weak(
141                        current,
142                        current + 1,
143                        Ordering::AcqRel,
144                        Ordering::Acquire,
145                    )
146                    .is_ok()
147                {
148                    break;
149                }
150                // CAS failed → another thread incremented; retry.
151            }
152
153            // Slot reserved — generate and insert.
154            // Use entry() for first-writer-wins semantics.
155            let mut was_inserted = false;
156            let result = self
157                .forward
158                .entry(key)
159                .or_insert_with(|| {
160                    was_inserted = true;
161                    let val = self.generator.generate(category, original);
162                    CompactString::new(val)
163                })
164                .value()
165                .clone();
166
167            if !was_inserted {
168                // Another thread inserted first — release our reserved slot.
169                self.len.fetch_sub(1, Ordering::Release);
170            }
171
172            Ok(result)
173        } else {
174            // No capacity limit — generate inside the entry lock to
175            // avoid wasted work (C-2 fix: only the first writer generates).
176            let mut was_inserted = false;
177            let result = self
178                .forward
179                .entry(key)
180                .or_insert_with(|| {
181                    was_inserted = true;
182                    let val = self.generator.generate(category, original);
183                    CompactString::new(val)
184                })
185                .value()
186                .clone();
187
188            if was_inserted {
189                self.len.fetch_add(1, Ordering::Release);
190            }
191
192            Ok(result)
193        }
194    }
195
196    /// Look up an existing forward mapping without creating one.
197    #[must_use]
198    pub fn forward_lookup(&self, category: &Category, original: &str) -> Option<CompactString> {
199        let key = ForwardKey {
200            category: category.clone(),
201            original: original.to_owned(),
202        };
203        self.forward.get(&key).map(|r| r.value().clone())
204    }
205
206    // ---------------- Metrics ----------------
207
208    /// Number of unique mappings in the store.
209    #[must_use]
210    pub fn len(&self) -> usize {
211        self.len.load(Ordering::Relaxed)
212    }
213
214    /// Whether the store is empty.
215    #[must_use]
216    pub fn is_empty(&self) -> bool {
217        self.len() == 0
218    }
219
220    /// Remove all mappings, zeroizing the original plaintexts.
221    ///
222    /// This is useful for resetting the store between runs without
223    /// dropping and recreating it.
224    pub fn clear(&mut self) {
225        let old_map = std::mem::take(&mut self.forward);
226        for (mut key, _value) in old_map {
227            key.original.zeroize();
228        }
229        self.len.store(0, Ordering::Release);
230    }
231
232    // ---------------- Snapshot / diff (for format-preserving pass) ----------------
233
234    /// Snapshot the set of `(category, original)` keys currently in the store.
235    ///
236    /// Call this before structured processing; diff against `iter()` afterwards
237    /// to find which mappings were added by the processor.
238    pub fn snapshot_keys(&self) -> std::collections::HashSet<(Category, String)> {
239        self.forward
240            .iter()
241            .map(|e| (e.key().category.clone(), e.key().original.clone()))
242            .collect()
243    }
244
245    // ---------------- Iteration (for external use) ----------------
246
247    /// Iterate over all mappings. Yields `(category, original, sanitized)`.
248    ///
249    /// Note: iteration over `DashMap` is not snapshot-consistent if concurrent
250    /// inserts are happening. Call this after all workers have finished.
251    pub fn iter(&self) -> impl Iterator<Item = (Category, CompactString, CompactString)> + '_ {
252        self.forward.iter().map(|entry| {
253            (
254                entry.key().category.clone(),
255                CompactString::new(&entry.key().original),
256                entry.value().clone(),
257            )
258        })
259    }
260}
261
/// F-09 fix: zeroize original keys stored in the forward map on drop.
/// This prevents sensitive plaintext values from lingering on the heap
/// after the store is no longer needed. Uses safe Zeroize on Strings.
impl Drop for MappingStore {
    fn drop(&mut self) {
        // Swap the map with an empty one so we take ownership of every
        // entry, then consume the old map by value. Owning the
        // `ForwardKey` lets us zeroize the `String` backing buffer (via
        // the safe `Zeroize` impl) before it is freed. The sanitized
        // values are not sensitive and are dropped normally.
        let old_map = std::mem::take(&mut self.forward);
        for (mut key, _value) in old_map {
            key.original.zeroize();
        }
    }
}
282
283/// Compile-time assertion that a type is `Send + Sync`.
284macro_rules! static_assertions_send_sync {
285    ($t:ty) => {
286        const _: fn() = || {
287            fn assert_send<T: Send>() {}
288            fn assert_sync<T: Sync>() {}
289            assert_send::<$t>();
290            assert_sync::<$t>();
291        };
292    };
293}
294
295// DashMap is Send + Sync, AtomicUsize is Send + Sync,
296// Arc<dyn ReplacementGenerator> is Send + Sync.
297// This is satisfied automatically, but let's assert it:
298static_assertions_send_sync!(MappingStore);
299
300// ---------------------------------------------------------------------------
301// Unit tests
302// ---------------------------------------------------------------------------
303
#[cfg(test)]
mod tests {
    use super::*;
    use crate::generator::{HmacGenerator, RandomGenerator};
    use std::sync::Arc;
    use std::thread;

    /// Store backed by the deterministic HMAC generator (fixed key).
    fn hmac_store(limit: Option<usize>) -> MappingStore {
        MappingStore::new(Arc::new(HmacGenerator::new([42u8; 32])), limit)
    }

    /// Unbounded store backed by the CSPRNG generator.
    fn random_store() -> MappingStore {
        MappingStore::new(Arc::new(RandomGenerator::new()), None)
    }

    // --- Basic operations ---

    #[test]
    fn insert_and_lookup() {
        let store = hmac_store(None);
        let out = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        assert!(!out.is_empty());
        assert!(out.contains("@corp.com"), "domain must be preserved");
        assert_eq!(out.len(), "alice@corp.com".len(), "length must be preserved");
        assert_eq!(store.len(), 1);
    }

    #[test]
    fn same_input_same_output() {
        let store = hmac_store(None);
        let first = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        let second = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        assert_eq!(first, second, "repeated insert must return cached value");
        assert_eq!(store.len(), 1, "no duplicate entry");
    }

    #[test]
    fn different_inputs_different_outputs() {
        let store = hmac_store(None);
        let for_alice = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        let for_bob = store
            .get_or_insert(&Category::Email, "bob@corp.com")
            .unwrap();
        assert_ne!(for_alice, for_bob);
        assert_eq!(store.len(), 2);
    }

    #[test]
    fn different_categories_different_outputs() {
        let store = hmac_store(None);
        let as_email = store.get_or_insert(&Category::Email, "test").unwrap();
        let as_name = store.get_or_insert(&Category::Name, "test").unwrap();
        assert_ne!(as_email, as_name);
    }

    #[test]
    fn forward_lookup_works() {
        let store = hmac_store(None);
        let sanitized = store.get_or_insert(&Category::IpV4, "192.168.1.1").unwrap();
        assert_eq!(
            store.forward_lookup(&Category::IpV4, "192.168.1.1"),
            Some(sanitized)
        );
    }

    #[test]
    fn forward_lookup_missing() {
        let store = hmac_store(None);
        assert!(store.forward_lookup(&Category::Email, "nope").is_none());
    }

    // --- Capacity limit ---

    #[test]
    fn capacity_limit_enforced() {
        let store = hmac_store(Some(2));
        store.get_or_insert(&Category::Email, "a@a.com").unwrap();
        store.get_or_insert(&Category::Email, "b@b.com").unwrap();
        let err = store
            .get_or_insert(&Category::Email, "c@c.com")
            .unwrap_err();
        assert!(
            matches!(
                err,
                SanitizeError::CapacityExceeded {
                    current: 2,
                    limit: 2,
                }
            ),
            "unexpected error: {:?}",
            err
        );
    }

    #[test]
    fn capacity_limit_allows_duplicate() {
        let store = hmac_store(Some(1));
        store.get_or_insert(&Category::Email, "a@a.com").unwrap();
        // Re-inserting the same value must hit the fast path and succeed.
        let again = store.get_or_insert(&Category::Email, "a@a.com").unwrap();
        assert!(!again.is_empty());
    }

    // --- Random generator within store ---

    #[test]
    fn random_store_caches() {
        let store = random_store();
        let first = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        let second = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        assert_eq!(first, second, "random store must still cache the first result");
    }

    // --- Iteration ---

    #[test]
    fn iter_yields_all_mappings() {
        let store = hmac_store(None);
        store.get_or_insert(&Category::Email, "a@a.com").unwrap();
        store.get_or_insert(&Category::IpV4, "1.2.3.4").unwrap();
        assert_eq!(store.iter().count(), 2);
    }

    // --- Concurrent inserts (basic smoke test) ---

    #[test]
    fn concurrent_inserts_no_panic() {
        let store = Arc::new(MappingStore::new(
            Arc::new(HmacGenerator::new([99u8; 32])),
            None,
        ));

        let handles: Vec<_> = (0..8)
            .map(|t| {
                let store = Arc::clone(&store);
                thread::spawn(move || {
                    for i in 0..1000 {
                        let val = format!("thread{}-val{}", t, i);
                        store.get_or_insert(&Category::Email, &val).unwrap();
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(store.len(), 8000);
    }

    #[test]
    fn concurrent_inserts_same_key_idempotent() {
        let store = Arc::new(MappingStore::new(
            Arc::new(HmacGenerator::new([7u8; 32])),
            None,
        ));

        let handles: Vec<_> = (0..8)
            .map(|_| {
                let store = Arc::clone(&store);
                thread::spawn(move || -> Vec<(String, CompactString)> {
                    (0..100)
                        .map(|i| {
                            let val = format!("shared-{}", i);
                            let mapped = store.get_or_insert(&Category::Email, &val).unwrap();
                            (val, mapped)
                        })
                        .collect()
                })
            })
            .collect();

        let all_results: Vec<Vec<(String, CompactString)>> = handles
            .into_iter()
            .map(|h| h.join().unwrap())
            .collect();

        // All threads must agree on every mapping.
        assert_eq!(store.len(), 100);
        for i in 0..100 {
            let val = format!("shared-{}", i);
            let expected = store.forward_lookup(&Category::Email, &val).unwrap();
            for thread_results in &all_results {
                let (_, got) = &thread_results[i];
                assert_eq!(
                    got, &expected,
                    "all threads must see the same mapping for {}",
                    val
                );
            }
        }
    }
}