Skip to main content

sanitize_engine/
store.rs

1//! Thread-safe, concurrent one-way replacement store.
2//!
3//! # Concurrency Model
4//!
//! The store uses [`dashmap::DashMap`] — a concurrent hash map with shard-level
//! locking (by default the shard count is a power of two derived from the
//! available CPU parallelism, e.g. 64 shards on a 16-thread machine). This gives us:
//!
//! - **Shared (read) shard locks** for lookups of already-mapped values, so
//!   concurrent readers never block one another.
//! - **Exclusive shard write locks** that are held only while inserting a new entry.
//!   With dozens of shards and 8–16 threads, the probability of two threads
//!   contending on the same shard is low.
12//! - **Atomic get-or-insert** via the `entry()` API, which prevents TOCTOU races
13//!   and guarantees first-writer-wins semantics.
14//!
15//! # Structure
16//!
17//! The forward map is two-level: `Category → original → sanitized`.
18//!
19//! ```text
20//! DashMap<Category, Arc<DashMap<ZeroizingString, (CompactString, usize)>>>
21//!    outer (~20 entries, always hot in cache)
22//!               └── inner (one per category, holds the actual values)
23//! ```
24//!
25//! This lets the fast-path read call `inner.get(original: &str)` without
26//! constructing a temporary `String`, because `ZeroizingString: Borrow<str>`.
27//! For files where the same value appears thousands of times, this eliminates
28//! thousands of `malloc`/`free` cycles on the hot path.
29//!
30//! Replacements are **one-way only** — there is no reverse map, no mapping
31//! file, and no restore capability.
32//!
33//! # Memory Characteristics
34//!
35//! At 10M unique values with average key length 20 bytes and average value
36//! length 30 bytes:
37//! - Forward map: 10M × (20 + 30 + ~120 DashMap overhead) ≈ 1.7 GB
38//! - **Total: ~1.7 GB** — acceptable for server workloads.
39//!
40//! An optional `capacity_limit` can be set to prevent unbounded growth.
41
42use crate::allowlist::AllowlistMatcher;
43use crate::category::Category;
44use crate::error::{Result, SanitizeError};
45use crate::generator::ReplacementGenerator;
46use compact_str::CompactString;
47use dashmap::DashMap;
48use std::borrow::Borrow;
49use std::sync::atomic::{AtomicUsize, Ordering};
50use std::sync::Arc;
51use zeroize::Zeroize;
52
53// ---------------------------------------------------------------------------
54// ZeroizingString — map key for the inner (per-category) DashMap
55// ---------------------------------------------------------------------------
56
/// A `String` that zeroizes its heap buffer on drop.
///
/// `Zeroizing<String>` from the `zeroize` crate does not implement `Hash`,
/// so it cannot be used as a `HashMap` key. This newtype adds `Hash` while
/// keeping the zeroize-on-drop guarantee via an explicit `Drop` impl.
///
/// Implementing `Borrow<str>` allows `DashMap<ZeroizingString, _>::get(s: &str)`
/// to work without constructing a temporary `ZeroizingString` — the key insight
/// that makes the fast-path read allocation-free.
///
/// `Debug` is implemented by hand and redacts the contents. This type exists
/// to keep original plaintext from lingering in memory, so it must not leak
/// that plaintext through `{:?}` formatting either (panic messages, `dbg!`,
/// debug-printing a map that uses it as a key).
#[derive(Clone, PartialEq, Eq)]
struct ZeroizingString(String);

impl std::fmt::Debug for ZeroizingString {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("ZeroizingString(<redacted>)")
    }
}
68
69impl std::hash::Hash for ZeroizingString {
70    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
71        self.0.hash(state);
72    }
73}
74
75impl Drop for ZeroizingString {
76    fn drop(&mut self) {
77        self.0.zeroize();
78    }
79}
80
81/// Enables `DashMap<ZeroizingString, _>::get(s: &str)` — zero allocation on
82/// cache hits. Correct because `ZeroizingString` delegates `Hash` and `Eq`
83/// to its inner `String`, which is consistent with `str`'s `Hash` and `Eq`.
84impl Borrow<str> for ZeroizingString {
85    fn borrow(&self) -> &str {
86        &self.0
87    }
88}
89
90// ---------------------------------------------------------------------------
91// Convenience type alias for the inner map
92// ---------------------------------------------------------------------------
93
/// Per-category map: original value (zeroized on drop) → `(sanitized, insertion_index)`.
///
/// `insertion_index` is taken from the store's `len` counter at insertion time
/// and is compared against a snapshot by `MappingStore::iter_since`.
type InnerMap = DashMap<ZeroizingString, (CompactString, usize)>;
95
96// ---------------------------------------------------------------------------
97// MappingStore
98// ---------------------------------------------------------------------------
99
/// Thread-safe concurrent one-way replacement store.
///
/// Caches forward mappings for per-run consistency (same input always
/// produces the same output within a run). There is no reverse map,
/// no journal, and no persistence — replacements are one-way only.
///
/// See the [module-level documentation](self) for concurrency and memory details.
pub struct MappingStore {
    /// `category → original → (sanitized, insertion_index)`
    ///
    /// Two-level map: outer is keyed by `Category` (tiny, always in cache),
    /// inner is keyed by `ZeroizingString` (actual values). The inner map is
    /// behind an `Arc` so it can be obtained without holding the outer shard
    /// lock during inner map operations.
    forward: DashMap<Category, Arc<InnerMap>>,
    /// Strategy that produces the replacement for a newly seen value
    /// (invoked only by the first writer of each key).
    generator: Arc<dyn ReplacementGenerator>,
    /// Number of mappings, readable without taking any map lock.
    ///
    /// With a capacity limit, this also counts slots reserved by inserts
    /// that are still in flight (a reservation is released again if another
    /// thread wins the insert race). Its value at insertion time is recorded
    /// as each entry's insertion index.
    len: AtomicUsize,
    /// Optional upper bound on the number of mappings.
    capacity_limit: Option<usize>,
    /// Optional allowlist — matched values pass through unchanged and are
    /// not recorded in the forward map.
    allowlist: Option<Arc<AllowlistMatcher>>,
}
125
126impl MappingStore {
127    // ---------------- Construction ----------------
128
129    /// Create a new, empty mapping store.
130    ///
131    /// # Arguments
132    ///
133    /// - `generator` — replacement strategy (HMAC or random).
134    /// - `capacity_limit` — optional max number of unique mappings.
135    #[must_use]
136    pub fn new(generator: Arc<dyn ReplacementGenerator>, capacity_limit: Option<usize>) -> Self {
137        Self {
138            forward: DashMap::with_capacity(32),
139            generator,
140            len: AtomicUsize::new(0),
141            capacity_limit,
142            allowlist: None,
143        }
144    }
145
146    /// Create a new store with an allowlist. Values matching the allowlist
147    /// are returned unchanged and never recorded in the forward map.
148    #[must_use]
149    pub fn new_with_allowlist(
150        generator: Arc<dyn ReplacementGenerator>,
151        capacity_limit: Option<usize>,
152        allowlist: Arc<AllowlistMatcher>,
153    ) -> Self {
154        Self {
155            forward: DashMap::with_capacity(32),
156            generator,
157            len: AtomicUsize::new(0),
158            capacity_limit,
159            allowlist: Some(allowlist),
160        }
161    }
162
163    /// Return the allowlist attached to this store, if any.
164    pub fn allowlist(&self) -> Option<&Arc<AllowlistMatcher>> {
165        self.allowlist.as_ref()
166    }
167
168    // ---------------- Core API ----------------
169
170    /// Get or create the sanitized replacement for `(category, original)`.
171    ///
172    /// This is the primary API for one-way sanitization.
173    ///
174    /// **Hot-path allocation:** When the value is already cached, this method
175    /// is allocation-free. The inner `DashMap::get` accepts `&str` directly via
176    /// `ZeroizingString: Borrow<str>`, so no temporary `String` is constructed.
177    ///
178    /// **Thread-safety:** Uses `DashMap::entry()` which holds a shard-level
179    /// lock only for the duration of the insert closure. The generator is
180    /// called inside the lock, but generation is fast (one HMAC or one RNG
181    /// call). Capacity enforcement uses `compare_exchange` to prevent
182    /// TOCTOU over-insertion.
183    ///
184    /// **Per-run consistency:** Once a value is mapped, all subsequent
185    /// lookups return the same sanitized value (first-writer-wins).
186    ///
187    /// # Errors
188    ///
189    /// Returns [`SanitizeError::CapacityExceeded`] if the store has
190    /// reached its configured capacity limit.
191    pub fn get_or_insert(&self, category: &Category, original: &str) -> Result<CompactString> {
192        // Allowlist check: return the original value unchanged without recording it.
193        if let Some(al) = &self.allowlist {
194            if al.is_allowed(original) {
195                return Ok(CompactString::new(original));
196            }
197        }
198
199        // Fast path: already mapped — zero allocation.
200        // `inner.get(original)` accepts `&str` via `ZeroizingString: Borrow<str>`.
201        if let Some(inner) = self.forward.get(category) {
202            if let Some(existing) = inner.value().get(original) {
203                return Ok(existing.value().0.clone());
204            }
205        }
206
207        // Slow path: get or create the inner map for this category, then insert.
208        // Try a read-lock get first — inner maps are stable once created, so
209        // this is the common case for any category seen before. Only fall back
210        // to the write-locking entry() when creating a category's map for the
211        // first time (happens at most once per category per run).
212        let inner: Arc<InnerMap> = if let Some(existing) = self.forward.get(category) {
213            existing.value().clone()
214        } else {
215            self.forward
216                .entry(category.clone())
217                .or_insert_with(|| Arc::new(DashMap::new()))
218                .value()
219                .clone()
220        };
221
222        if let Some(limit) = self.capacity_limit {
223            // Atomically reserve a capacity slot *before* generating the value.
224            // This eliminates the TOCTOU race where multiple threads pass the
225            // capacity check and all insert.
226            loop {
227                let current = self.len.load(Ordering::Acquire);
228                if current >= limit {
229                    // One more chance: key may have been inserted by another thread.
230                    if let Some(existing) = inner.get(original) {
231                        return Ok(existing.value().0.clone());
232                    }
233                    return Err(SanitizeError::CapacityExceeded { current, limit });
234                }
235                if self
236                    .len
237                    .compare_exchange_weak(
238                        current,
239                        current + 1,
240                        Ordering::AcqRel,
241                        Ordering::Acquire,
242                    )
243                    .is_ok()
244                {
245                    break;
246                }
247                // CAS failed → another thread incremented; retry.
248            }
249
250            // Slot reserved — generate and insert (first-writer-wins).
251            let mut was_inserted = false;
252            let insertion_index = self.len.load(Ordering::Acquire).saturating_sub(1);
253            let result = inner
254                .entry(ZeroizingString(original.to_owned()))
255                .or_insert_with(|| {
256                    was_inserted = true;
257                    let val = self.generator.generate(category, original);
258                    (CompactString::new(val), insertion_index)
259                })
260                .value()
261                .0
262                .clone();
263
264            if !was_inserted {
265                // Another thread inserted first — release our reserved slot.
266                self.len.fetch_sub(1, Ordering::Release);
267            }
268
269            Ok(result)
270        } else {
271            // No capacity limit — generate inside the entry lock so only the
272            // first writer calls the generator (first-writer-wins semantics).
273            let result = inner
274                .entry(ZeroizingString(original.to_owned()))
275                .or_insert_with(|| {
276                    let insertion_index = self.len.fetch_add(1, Ordering::AcqRel);
277                    let val = self.generator.generate(category, original);
278                    (CompactString::new(val), insertion_index)
279                })
280                .value()
281                .0
282                .clone();
283
284            Ok(result)
285        }
286    }
287
288    /// Look up an existing forward mapping without creating one.
289    #[must_use]
290    pub fn forward_lookup(&self, category: &Category, original: &str) -> Option<CompactString> {
291        let inner = self.forward.get(category)?;
292        inner.value().get(original).map(|r| r.value().0.clone())
293    }
294
295    // ---------------- Metrics ----------------
296
297    /// Number of unique mappings in the store.
298    #[must_use]
299    pub fn len(&self) -> usize {
300        self.len.load(Ordering::Relaxed)
301    }
302
303    /// Whether the store is empty.
304    #[must_use]
305    pub fn is_empty(&self) -> bool {
306        self.len() == 0
307    }
308
309    /// Remove all mappings, zeroizing the original plaintexts.
310    ///
311    /// This is useful for resetting the store between runs without
312    /// dropping and recreating it.
313    pub fn clear(&mut self) {
314        // Dropping the map entries triggers ZeroizingString::drop on each inner key.
315        // If any cloned Arcs are still live (e.g., in a concurrent thread's stack),
316        // those inner maps survive until the last Arc drops — but `clear` is only
317        // called after all workers have finished, so this is safe in practice.
318        drop(std::mem::take(&mut self.forward));
319        self.len.store(0, Ordering::Release);
320    }
321
322    // ---------------- Snapshot / diff (for format-preserving pass) ----------------
323
324    /// Snapshot the current insertion count.
325    ///
326    /// Returns an opaque `usize` that can be passed to [`Self::iter_since`] to
327    /// iterate only the entries added *after* this point — useful for
328    /// finding which mappings a structured processor pass discovered without
329    /// building a full `HashSet` of all existing keys.
330    ///
331    /// O(1), no allocation.
332    #[must_use]
333    pub fn snapshot(&self) -> usize {
334        self.len.load(Ordering::Acquire)
335    }
336
337    /// Iterate over entries added at or after the given snapshot.
338    ///
339    /// `snapshot` is the value returned by a previous call to [`Self::snapshot`].
340    /// Entries whose insertion index is ≥ `snapshot` are yielded; older
341    /// entries are skipped. Still O(n) in total store size, but avoids
342    /// allocating a `HashSet` of all prior keys.
343    pub fn iter_since(
344        &self,
345        snapshot: usize,
346    ) -> impl Iterator<Item = (Category, CompactString, CompactString)> + '_ {
347        self.forward.iter().flat_map(move |outer| {
348            let cat = outer.key().clone();
349            outer
350                .value()
351                .iter()
352                .filter_map(move |inner| {
353                    let (sanitized, idx) = inner.value();
354                    if *idx >= snapshot {
355                        Some((
356                            cat.clone(),
357                            CompactString::new(inner.key().0.as_str()),
358                            sanitized.clone(),
359                        ))
360                    } else {
361                        None
362                    }
363                })
364                .collect::<Vec<_>>()
365        })
366    }
367
368    // ---------------- Iteration (for external use) ----------------
369
370    /// Iterate over all mappings. Yields `(category, original, sanitized)`.
371    ///
372    /// Note: iteration over `DashMap` is not snapshot-consistent if concurrent
373    /// inserts are happening. Call this after all workers have finished.
374    pub fn iter(&self) -> impl Iterator<Item = (Category, CompactString, CompactString)> + '_ {
375        self.forward.iter().flat_map(|outer| {
376            let cat = outer.key().clone();
377            outer
378                .value()
379                .iter()
380                .map(move |inner| {
381                    (
382                        cat.clone(),
383                        CompactString::new(inner.key().0.as_str()),
384                        inner.value().0.clone(),
385                    )
386                })
387                .collect::<Vec<_>>()
388        })
389    }
390}
391
392/// Zeroize original keys stored in the forward map on drop.
393/// Dropping the outer `DashMap` triggers `Arc::drop` for each inner map; when
394/// the last Arc drops, `ZeroizingString::drop` runs for every key, overwriting
395/// the plaintext before the memory is freed.
396impl Drop for MappingStore {
397    fn drop(&mut self) {
398        drop(std::mem::take(&mut self.forward));
399    }
400}
401
/// Compile-time assertion that a type is `Send + Sync`.
///
/// Expands to an anonymous `const` holding a closure that is never called;
/// the closure only has to type-check, which proves both bounds at compile
/// time without emitting runtime code.
macro_rules! static_assertions_send_sync {
    ($t:ty) => {
        const _: fn() = || {
            fn require_send_and_sync<T: Send + Sync>() {}
            require_send_and_sync::<$t>();
        };
    };
}
413
// Compile-time guard: the build fails if a future field makes `MappingStore`
// lose `Send` or `Sync`, since the store is meant to be shared across worker
// threads (e.g. behind an `Arc`).
static_assertions_send_sync!(MappingStore);
415
416// ---------------------------------------------------------------------------
417// Unit tests
418// ---------------------------------------------------------------------------
419
#[cfg(test)]
mod tests {
    use super::*;
    use crate::generator::{HmacGenerator, RandomGenerator};
    use std::sync::Arc;
    use std::thread;

    // `generator` rather than `gen`: `gen` is a reserved keyword in Rust 2024.
    fn hmac_store(limit: Option<usize>) -> MappingStore {
        let generator = Arc::new(HmacGenerator::new([42u8; 32]));
        MappingStore::new(generator, limit)
    }

    fn random_store() -> MappingStore {
        let generator = Arc::new(RandomGenerator::new());
        MappingStore::new(generator, None)
    }

    // --- Basic operations ---

    #[test]
    fn insert_and_lookup() {
        let store = hmac_store(None);
        let s1 = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        assert!(!s1.is_empty());
        assert!(s1.contains("@corp.com"), "domain must be preserved");
        assert_eq!(s1.len(), "alice@corp.com".len(), "length must be preserved");
        assert_eq!(store.len(), 1);
    }

    #[test]
    fn same_input_same_output() {
        let store = hmac_store(None);
        let s1 = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        let s2 = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        assert_eq!(s1, s2, "repeated insert must return cached value");
        assert_eq!(store.len(), 1, "no duplicate entry");
    }

    #[test]
    fn different_inputs_different_outputs() {
        let store = hmac_store(None);
        let s1 = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        let s2 = store
            .get_or_insert(&Category::Email, "bob@corp.com")
            .unwrap();
        assert_ne!(s1, s2);
        assert_eq!(store.len(), 2);
    }

    #[test]
    fn different_categories_different_outputs() {
        let store = hmac_store(None);
        let s1 = store.get_or_insert(&Category::Email, "test").unwrap();
        let s2 = store.get_or_insert(&Category::Name, "test").unwrap();
        assert_ne!(s1, s2);
    }

    #[test]
    fn forward_lookup_works() {
        let store = hmac_store(None);
        let sanitized = store.get_or_insert(&Category::IpV4, "192.168.1.1").unwrap();
        let found = store.forward_lookup(&Category::IpV4, "192.168.1.1");
        assert_eq!(found, Some(sanitized));
    }

    #[test]
    fn forward_lookup_missing() {
        let store = hmac_store(None);
        assert!(store.forward_lookup(&Category::Email, "nope").is_none());
    }

    // --- Capacity limit ---

    #[test]
    fn capacity_limit_enforced() {
        let store = hmac_store(Some(2));
        store.get_or_insert(&Category::Email, "a@a.com").unwrap();
        store.get_or_insert(&Category::Email, "b@b.com").unwrap();
        let result = store.get_or_insert(&Category::Email, "c@c.com");
        assert!(result.is_err());
        match result.unwrap_err() {
            SanitizeError::CapacityExceeded {
                current: 2,
                limit: 2,
            } => {}
            other => panic!("unexpected error: {:?}", other),
        }
    }

    #[test]
    fn capacity_limit_allows_duplicate() {
        let store = hmac_store(Some(1));
        store.get_or_insert(&Category::Email, "a@a.com").unwrap();
        // Re-inserting same value should succeed (fast path).
        let s2 = store.get_or_insert(&Category::Email, "a@a.com").unwrap();
        assert!(!s2.is_empty());
    }

    #[test]
    fn capacity_limit_zero_rejects_everything() {
        let store = hmac_store(Some(0));
        match store.get_or_insert(&Category::Email, "a@a.com") {
            Err(SanitizeError::CapacityExceeded {
                current: 0,
                limit: 0,
            }) => {}
            other => panic!("unexpected result: {:?}", other),
        }
        assert!(store.is_empty());
    }

    // --- Random generator within store ---

    #[test]
    fn random_store_caches() {
        let store = random_store();
        let s1 = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        let s2 = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        assert_eq!(s1, s2, "random store must still cache the first result");
    }

    // --- Iteration ---

    #[test]
    fn iter_yields_all_mappings() {
        let store = hmac_store(None);
        store.get_or_insert(&Category::Email, "a@a.com").unwrap();
        store.get_or_insert(&Category::IpV4, "1.2.3.4").unwrap();
        let collected: Vec<_> = store.iter().collect();
        assert_eq!(collected.len(), 2);
    }

    // --- Snapshot / iter_since ---

    /// Shared body for the snapshot tests: only values first mapped after the
    /// snapshot may be reported by `iter_since`.
    fn assert_iter_since_reports_only_new(store: &MappingStore) {
        store.get_or_insert(&Category::Email, "old@a.com").unwrap();
        let snap = store.snapshot();
        store.get_or_insert(&Category::Email, "new@b.com").unwrap();
        store.get_or_insert(&Category::IpV4, "10.0.0.1").unwrap();
        // Re-requesting an old value must not make it look new.
        store.get_or_insert(&Category::Email, "old@a.com").unwrap();

        let mut fresh: Vec<String> = store
            .iter_since(snap)
            .map(|(_, original, _)| original.to_string())
            .collect();
        fresh.sort();
        assert_eq!(fresh, ["10.0.0.1", "new@b.com"]);
        assert_eq!(store.iter_since(0).count(), 3);
    }

    #[test]
    fn iter_since_reports_only_new_entries() {
        assert_iter_since_reports_only_new(&hmac_store(None));
    }

    #[test]
    fn iter_since_reports_only_new_entries_with_capacity_limit() {
        assert_iter_since_reports_only_new(&hmac_store(Some(10)));
    }

    // --- Clear ---

    #[test]
    fn clear_resets_store() {
        let mut store = hmac_store(Some(1));
        store.get_or_insert(&Category::Email, "a@a.com").unwrap();
        store.clear();
        assert!(store.is_empty());
        assert!(store.forward_lookup(&Category::Email, "a@a.com").is_none());
        assert_eq!(store.iter().count(), 0);
        // The capacity slot is released as well.
        store.get_or_insert(&Category::Email, "b@b.com").unwrap();
        assert_eq!(store.len(), 1);
    }

    // --- Concurrent inserts (basic smoke test) ---

    #[test]
    fn concurrent_inserts_no_panic() {
        let generator = Arc::new(HmacGenerator::new([99u8; 32]));
        let store = Arc::new(MappingStore::new(generator, None));

        let mut handles = vec![];
        for t in 0..8 {
            let store = Arc::clone(&store);
            handles.push(thread::spawn(move || {
                for i in 0..1000 {
                    let val = format!("thread{}-val{}", t, i);
                    store.get_or_insert(&Category::Email, &val).unwrap();
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(store.len(), 8000);
    }

    #[test]
    fn concurrent_inserts_same_key_idempotent() {
        let generator = Arc::new(HmacGenerator::new([7u8; 32]));
        let store = Arc::new(MappingStore::new(generator, None));

        let mut handles = vec![];
        for _ in 0..8 {
            let store = Arc::clone(&store);
            handles.push(thread::spawn(move || {
                let mut results = Vec::new();
                for i in 0..100 {
                    let val = format!("shared-{}", i);
                    let r = store.get_or_insert(&Category::Email, &val).unwrap();
                    results.push((val, r));
                }
                results
            }));
        }

        let mut all_results: Vec<Vec<(String, CompactString)>> = vec![];
        for h in handles {
            all_results.push(h.join().unwrap());
        }

        // All threads must agree on every mapping.
        assert_eq!(store.len(), 100);
        for i in 0..100 {
            let val = format!("shared-{}", i);
            let expected = store.forward_lookup(&Category::Email, &val).unwrap();
            for thread_results in &all_results {
                let (_, got) = &thread_results[i];
                assert_eq!(
                    got, &expected,
                    "all threads must see the same mapping for {}",
                    val
                );
            }
        }
    }

    #[test]
    fn concurrent_inserts_respect_capacity_limit() {
        const LIMIT: usize = 50;
        let store = Arc::new(hmac_store(Some(LIMIT)));

        let handles: Vec<_> = (0..8)
            .map(|t| {
                let store = Arc::clone(&store);
                thread::spawn(move || {
                    (0..100)
                        .filter(|i| {
                            let val = format!("thread{}-val{}", t, i);
                            store.get_or_insert(&Category::Email, &val).is_ok()
                        })
                        .count()
                })
            })
            .collect();

        let successes: usize = handles.into_iter().map(|h| h.join().unwrap()).sum();
        // All values are distinct, so no reservation is ever rolled back.
        assert_eq!(successes, LIMIT, "exactly LIMIT distinct values may be mapped");
        assert_eq!(store.len(), LIMIT);
        assert_eq!(store.iter().count(), LIMIT);
    }
}