sanitize_engine/store.rs
1//! Thread-safe, concurrent one-way replacement store.
2//!
3//! # Concurrency Model
4//!
//! The store uses [`dashmap::DashMap`] — a concurrent hash map with shard-level
//! locking (dashmap derives its default shard count from the number of
//! available CPUs). This gives us:
//!
//! - **Concurrent reads** for lookups of already-mapped values: a lookup takes
//!   only a shared (read) lock on one shard, so readers do not exclude each other.
//! - **Shard-level write locks** that are held only while inserting a new entry.
//!   With many shards relative to the number of worker threads, two threads
//!   rarely contend on the same shard.
12//! - **Atomic get-or-insert** via the `entry()` API, which prevents TOCTOU races
13//! and guarantees first-writer-wins semantics.
14//!
15//! # Structure
16//!
17//! The forward map is two-level: `Category → original → sanitized`.
18//!
19//! ```text
20//! DashMap<Category, Arc<DashMap<ZeroizingString, (CompactString, usize)>>>
21//! outer (~20 entries, always hot in cache)
22//! └── inner (one per category, holds the actual values)
23//! ```
24//!
25//! This lets the fast-path read call `inner.get(original: &str)` without
26//! constructing a temporary `String`, because `ZeroizingString: Borrow<str>`.
27//! For files where the same value appears thousands of times, this eliminates
28//! thousands of `malloc`/`free` cycles on the hot path.
29//!
30//! Replacements are **one-way only** — there is no reverse map, no mapping
31//! file, and no restore capability.
32//!
33//! # Memory Characteristics
34//!
35//! At 10M unique values with average key length 20 bytes and average value
36//! length 30 bytes:
37//! - Forward map: 10M × (20 + 30 + ~120 DashMap overhead) ≈ 1.7 GB
38//! - **Total: ~1.7 GB** — acceptable for server workloads.
39//!
40//! An optional `capacity_limit` can be set to prevent unbounded growth.
41
42use crate::allowlist::AllowlistMatcher;
43use crate::category::Category;
44use crate::error::{Result, SanitizeError};
45use crate::generator::ReplacementGenerator;
46use compact_str::CompactString;
47use dashmap::DashMap;
48use std::borrow::Borrow;
49use std::sync::atomic::{AtomicUsize, Ordering};
50use std::sync::Arc;
51use zeroize::Zeroize;
52
53// ---------------------------------------------------------------------------
54// ZeroizingString — map key for the inner (per-category) DashMap
55// ---------------------------------------------------------------------------
56
57/// A `String` that zeroizes its heap buffer on drop.
58///
59/// `Zeroizing<String>` from the `zeroize` crate does not implement `Hash`,
60/// so it cannot be used as a `HashMap` key. This newtype adds `Hash` while
61/// keeping the zeroize-on-drop guarantee via an explicit `Drop` impl.
62///
63/// Implementing `Borrow<str>` allows `DashMap<ZeroizingString, _>::get(s: &str)`
64/// to work without constructing a temporary `ZeroizingString` — the key insight
65/// that makes the fast-path read allocation-free.
66#[derive(Debug, Clone, PartialEq, Eq)]
67struct ZeroizingString(String);
68
69impl std::hash::Hash for ZeroizingString {
70 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
71 self.0.hash(state);
72 }
73}
74
75impl Drop for ZeroizingString {
76 fn drop(&mut self) {
77 self.0.zeroize();
78 }
79}
80
81/// Enables `DashMap<ZeroizingString, _>::get(s: &str)` — zero allocation on
82/// cache hits. Correct because `ZeroizingString` delegates `Hash` and `Eq`
83/// to its inner `String`, which is consistent with `str`'s `Hash` and `Eq`.
84impl Borrow<str> for ZeroizingString {
85 fn borrow(&self) -> &str {
86 &self.0
87 }
88}
89
90// ---------------------------------------------------------------------------
91// Convenience type alias for the inner map
92// ---------------------------------------------------------------------------
93
94type InnerMap = DashMap<ZeroizingString, (CompactString, usize)>;
95
96// ---------------------------------------------------------------------------
97// MappingStore
98// ---------------------------------------------------------------------------
99
100/// Thread-safe concurrent one-way replacement store.
101///
102/// Caches forward mappings for per-run consistency (same input always
103/// produces the same output within a run). There is no reverse map,
104/// no journal, and no persistence — replacements are one-way only.
105///
106/// See the [module-level documentation](self) for concurrency and memory details.
107pub struct MappingStore {
108 /// `category → original → (sanitized, insertion_index)`
109 ///
110 /// Two-level map: outer is keyed by `Category` (tiny, always in cache),
111 /// inner is keyed by `ZeroizingString` (actual values). The inner map is
112 /// behind an `Arc` so it can be obtained without holding the outer shard
113 /// lock during inner map operations.
114 forward: DashMap<Category, Arc<InnerMap>>,
115 /// Replacement generator (HMAC deterministic or CSPRNG random).
116 generator: Arc<dyn ReplacementGenerator>,
117 /// Current number of mappings (atomic for lock-free reads).
118 len: AtomicUsize,
119 /// Optional upper bound on the number of mappings.
120 capacity_limit: Option<usize>,
121 /// Optional allowlist — matched values pass through unchanged and are
122 /// not recorded in the forward map.
123 allowlist: Option<Arc<AllowlistMatcher>>,
124}
125
126impl MappingStore {
127 // ---------------- Construction ----------------
128
129 /// Create a new, empty mapping store.
130 ///
131 /// # Arguments
132 ///
133 /// - `generator` — replacement strategy (HMAC or random).
134 /// - `capacity_limit` — optional max number of unique mappings.
135 #[must_use]
136 pub fn new(generator: Arc<dyn ReplacementGenerator>, capacity_limit: Option<usize>) -> Self {
137 Self {
138 forward: DashMap::with_capacity(32),
139 generator,
140 len: AtomicUsize::new(0),
141 capacity_limit,
142 allowlist: None,
143 }
144 }
145
146 /// Create a new store with an allowlist. Values matching the allowlist
147 /// are returned unchanged and never recorded in the forward map.
148 #[must_use]
149 pub fn new_with_allowlist(
150 generator: Arc<dyn ReplacementGenerator>,
151 capacity_limit: Option<usize>,
152 allowlist: Arc<AllowlistMatcher>,
153 ) -> Self {
154 Self {
155 forward: DashMap::with_capacity(32),
156 generator,
157 len: AtomicUsize::new(0),
158 capacity_limit,
159 allowlist: Some(allowlist),
160 }
161 }
162
163 /// Return the allowlist attached to this store, if any.
164 pub fn allowlist(&self) -> Option<&Arc<AllowlistMatcher>> {
165 self.allowlist.as_ref()
166 }
167
168 // ---------------- Core API ----------------
169
170 /// Get or create the sanitized replacement for `(category, original)`.
171 ///
172 /// This is the primary API for one-way sanitization.
173 ///
174 /// **Hot-path allocation:** When the value is already cached, this method
175 /// is allocation-free. The inner `DashMap::get` accepts `&str` directly via
176 /// `ZeroizingString: Borrow<str>`, so no temporary `String` is constructed.
177 ///
178 /// **Thread-safety:** Uses `DashMap::entry()` which holds a shard-level
179 /// lock only for the duration of the insert closure. The generator is
180 /// called inside the lock, but generation is fast (one HMAC or one RNG
181 /// call). Capacity enforcement uses `compare_exchange` to prevent
182 /// TOCTOU over-insertion.
183 ///
184 /// **Per-run consistency:** Once a value is mapped, all subsequent
185 /// lookups return the same sanitized value (first-writer-wins).
186 ///
187 /// # Errors
188 ///
189 /// Returns [`SanitizeError::CapacityExceeded`] if the store has
190 /// reached its configured capacity limit.
191 pub fn get_or_insert(&self, category: &Category, original: &str) -> Result<CompactString> {
192 // Allowlist check: return the original value unchanged without recording it.
193 if let Some(al) = &self.allowlist {
194 if al.is_allowed(original) {
195 return Ok(CompactString::new(original));
196 }
197 }
198
199 // Fast path: already mapped — zero allocation.
200 // `inner.get(original)` accepts `&str` via `ZeroizingString: Borrow<str>`.
201 if let Some(inner) = self.forward.get(category) {
202 if let Some(existing) = inner.value().get(original) {
203 return Ok(existing.value().0.clone());
204 }
205 }
206
207 // Slow path: get or create the inner map for this category, then insert.
208 // Try a read-lock get first — inner maps are stable once created, so
209 // this is the common case for any category seen before. Only fall back
210 // to the write-locking entry() when creating a category's map for the
211 // first time (happens at most once per category per run).
212 let inner: Arc<InnerMap> = if let Some(existing) = self.forward.get(category) {
213 existing.value().clone()
214 } else {
215 self.forward
216 .entry(category.clone())
217 .or_insert_with(|| Arc::new(DashMap::new()))
218 .value()
219 .clone()
220 };
221
222 if let Some(limit) = self.capacity_limit {
223 // Atomically reserve a capacity slot *before* generating the value.
224 // This eliminates the TOCTOU race where multiple threads pass the
225 // capacity check and all insert.
226 loop {
227 let current = self.len.load(Ordering::Acquire);
228 if current >= limit {
229 // One more chance: key may have been inserted by another thread.
230 if let Some(existing) = inner.get(original) {
231 return Ok(existing.value().0.clone());
232 }
233 return Err(SanitizeError::CapacityExceeded { current, limit });
234 }
235 if self
236 .len
237 .compare_exchange_weak(
238 current,
239 current + 1,
240 Ordering::AcqRel,
241 Ordering::Acquire,
242 )
243 .is_ok()
244 {
245 break;
246 }
247 // CAS failed → another thread incremented; retry.
248 }
249
250 // Slot reserved — generate and insert (first-writer-wins).
251 let mut was_inserted = false;
252 let insertion_index = self.len.load(Ordering::Acquire).saturating_sub(1);
253 let result = inner
254 .entry(ZeroizingString(original.to_owned()))
255 .or_insert_with(|| {
256 was_inserted = true;
257 let val = self.generator.generate(category, original);
258 (CompactString::new(val), insertion_index)
259 })
260 .value()
261 .0
262 .clone();
263
264 if !was_inserted {
265 // Another thread inserted first — release our reserved slot.
266 self.len.fetch_sub(1, Ordering::Release);
267 }
268
269 Ok(result)
270 } else {
271 // No capacity limit — generate inside the entry lock so only the
272 // first writer calls the generator (first-writer-wins semantics).
273 let result = inner
274 .entry(ZeroizingString(original.to_owned()))
275 .or_insert_with(|| {
276 let insertion_index = self.len.fetch_add(1, Ordering::AcqRel);
277 let val = self.generator.generate(category, original);
278 (CompactString::new(val), insertion_index)
279 })
280 .value()
281 .0
282 .clone();
283
284 Ok(result)
285 }
286 }
287
288 /// Look up an existing forward mapping without creating one.
289 #[must_use]
290 pub fn forward_lookup(&self, category: &Category, original: &str) -> Option<CompactString> {
291 let inner = self.forward.get(category)?;
292 inner.value().get(original).map(|r| r.value().0.clone())
293 }
294
295 // ---------------- Metrics ----------------
296
297 /// Number of unique mappings in the store.
298 #[must_use]
299 pub fn len(&self) -> usize {
300 self.len.load(Ordering::Relaxed)
301 }
302
303 /// Whether the store is empty.
304 #[must_use]
305 pub fn is_empty(&self) -> bool {
306 self.len() == 0
307 }
308
309 /// Remove all mappings, zeroizing the original plaintexts.
310 ///
311 /// This is useful for resetting the store between runs without
312 /// dropping and recreating it.
313 pub fn clear(&mut self) {
314 // Dropping the map entries triggers ZeroizingString::drop on each inner key.
315 // If any cloned Arcs are still live (e.g., in a concurrent thread's stack),
316 // those inner maps survive until the last Arc drops — but `clear` is only
317 // called after all workers have finished, so this is safe in practice.
318 drop(std::mem::take(&mut self.forward));
319 self.len.store(0, Ordering::Release);
320 }
321
322 // ---------------- Snapshot / diff (for format-preserving pass) ----------------
323
324 /// Snapshot the current insertion count.
325 ///
326 /// Returns an opaque `usize` that can be passed to [`Self::iter_since`] to
327 /// iterate only the entries added *after* this point — useful for
328 /// finding which mappings a structured processor pass discovered without
329 /// building a full `HashSet` of all existing keys.
330 ///
331 /// O(1), no allocation.
332 #[must_use]
333 pub fn snapshot(&self) -> usize {
334 self.len.load(Ordering::Acquire)
335 }
336
337 /// Iterate over entries added at or after the given snapshot.
338 ///
339 /// `snapshot` is the value returned by a previous call to [`Self::snapshot`].
340 /// Entries whose insertion index is ≥ `snapshot` are yielded; older
341 /// entries are skipped. Still O(n) in total store size, but avoids
342 /// allocating a `HashSet` of all prior keys.
343 pub fn iter_since(
344 &self,
345 snapshot: usize,
346 ) -> impl Iterator<Item = (Category, CompactString, CompactString)> + '_ {
347 self.forward.iter().flat_map(move |outer| {
348 let cat = outer.key().clone();
349 outer
350 .value()
351 .iter()
352 .filter_map(move |inner| {
353 let (sanitized, idx) = inner.value();
354 if *idx >= snapshot {
355 Some((
356 cat.clone(),
357 CompactString::new(inner.key().0.as_str()),
358 sanitized.clone(),
359 ))
360 } else {
361 None
362 }
363 })
364 .collect::<Vec<_>>()
365 })
366 }
367
368 // ---------------- Iteration (for external use) ----------------
369
370 /// Iterate over all mappings. Yields `(category, original, sanitized)`.
371 ///
372 /// Note: iteration over `DashMap` is not snapshot-consistent if concurrent
373 /// inserts are happening. Call this after all workers have finished.
374 pub fn iter(&self) -> impl Iterator<Item = (Category, CompactString, CompactString)> + '_ {
375 self.forward.iter().flat_map(|outer| {
376 let cat = outer.key().clone();
377 outer
378 .value()
379 .iter()
380 .map(move |inner| {
381 (
382 cat.clone(),
383 CompactString::new(inner.key().0.as_str()),
384 inner.value().0.clone(),
385 )
386 })
387 .collect::<Vec<_>>()
388 })
389 }
390}
391
392/// Zeroize original keys stored in the forward map on drop.
393/// Dropping the outer `DashMap` triggers `Arc::drop` for each inner map; when
394/// the last Arc drops, `ZeroizingString::drop` runs for every key, overwriting
395/// the plaintext before the memory is freed.
396impl Drop for MappingStore {
397 fn drop(&mut self) {
398 drop(std::mem::take(&mut self.forward));
399 }
400}
401
402/// Compile-time assertion that a type is `Send + Sync`.
403macro_rules! static_assertions_send_sync {
404 ($t:ty) => {
405 const _: fn() = || {
406 fn assert_send<T: Send>() {}
407 fn assert_sync<T: Sync>() {}
408 assert_send::<$t>();
409 assert_sync::<$t>();
410 };
411 };
412}
413
414static_assertions_send_sync!(MappingStore);
415
416// ---------------------------------------------------------------------------
417// Unit tests
418// ---------------------------------------------------------------------------
419
420#[cfg(test)]
421mod tests {
422 use super::*;
423 use crate::generator::{HmacGenerator, RandomGenerator};
424 use std::sync::Arc;
425
426 fn hmac_store(limit: Option<usize>) -> MappingStore {
427 let gen = Arc::new(HmacGenerator::new([42u8; 32]));
428 MappingStore::new(gen, limit)
429 }
430
431 fn random_store() -> MappingStore {
432 let gen = Arc::new(RandomGenerator::new());
433 MappingStore::new(gen, None)
434 }
435
436 // --- Basic operations ---
437
438 #[test]
439 fn insert_and_lookup() {
440 let store = hmac_store(None);
441 let s1 = store
442 .get_or_insert(&Category::Email, "alice@corp.com")
443 .unwrap();
444 assert!(!s1.is_empty());
445 assert!(s1.contains("@corp.com"), "domain must be preserved");
446 assert_eq!(s1.len(), "alice@corp.com".len(), "length must be preserved");
447 assert_eq!(store.len(), 1);
448 }
449
450 #[test]
451 fn same_input_same_output() {
452 let store = hmac_store(None);
453 let s1 = store
454 .get_or_insert(&Category::Email, "alice@corp.com")
455 .unwrap();
456 let s2 = store
457 .get_or_insert(&Category::Email, "alice@corp.com")
458 .unwrap();
459 assert_eq!(s1, s2, "repeated insert must return cached value");
460 assert_eq!(store.len(), 1, "no duplicate entry");
461 }
462
463 #[test]
464 fn different_inputs_different_outputs() {
465 let store = hmac_store(None);
466 let s1 = store
467 .get_or_insert(&Category::Email, "alice@corp.com")
468 .unwrap();
469 let s2 = store
470 .get_or_insert(&Category::Email, "bob@corp.com")
471 .unwrap();
472 assert_ne!(s1, s2);
473 assert_eq!(store.len(), 2);
474 }
475
476 #[test]
477 fn different_categories_different_outputs() {
478 let store = hmac_store(None);
479 let s1 = store.get_or_insert(&Category::Email, "test").unwrap();
480 let s2 = store.get_or_insert(&Category::Name, "test").unwrap();
481 assert_ne!(s1, s2);
482 }
483
484 #[test]
485 fn forward_lookup_works() {
486 let store = hmac_store(None);
487 let sanitized = store.get_or_insert(&Category::IpV4, "192.168.1.1").unwrap();
488 let found = store.forward_lookup(&Category::IpV4, "192.168.1.1");
489 assert_eq!(found, Some(sanitized));
490 }
491
492 #[test]
493 fn forward_lookup_missing() {
494 let store = hmac_store(None);
495 assert!(store.forward_lookup(&Category::Email, "nope").is_none());
496 }
497
498 // --- Capacity limit ---
499
500 #[test]
501 fn capacity_limit_enforced() {
502 let store = hmac_store(Some(2));
503 store.get_or_insert(&Category::Email, "a@a.com").unwrap();
504 store.get_or_insert(&Category::Email, "b@b.com").unwrap();
505 let result = store.get_or_insert(&Category::Email, "c@c.com");
506 assert!(result.is_err());
507 match result.unwrap_err() {
508 SanitizeError::CapacityExceeded {
509 current: 2,
510 limit: 2,
511 } => {}
512 other => panic!("unexpected error: {:?}", other),
513 }
514 }
515
516 #[test]
517 fn capacity_limit_allows_duplicate() {
518 let store = hmac_store(Some(1));
519 store.get_or_insert(&Category::Email, "a@a.com").unwrap();
520 // Re-inserting same value should succeed (fast path).
521 let s2 = store.get_or_insert(&Category::Email, "a@a.com").unwrap();
522 assert!(!s2.is_empty());
523 }
524
525 // --- Random generator within store ---
526
527 #[test]
528 fn random_store_caches() {
529 let store = random_store();
530 let s1 = store
531 .get_or_insert(&Category::Email, "alice@corp.com")
532 .unwrap();
533 let s2 = store
534 .get_or_insert(&Category::Email, "alice@corp.com")
535 .unwrap();
536 assert_eq!(s1, s2, "random store must still cache the first result");
537 }
538
539 // --- Iteration ---
540
541 #[test]
542 fn iter_yields_all_mappings() {
543 let store = hmac_store(None);
544 store.get_or_insert(&Category::Email, "a@a.com").unwrap();
545 store.get_or_insert(&Category::IpV4, "1.2.3.4").unwrap();
546 let collected: Vec<_> = store.iter().collect();
547 assert_eq!(collected.len(), 2);
548 }
549
550 // --- Concurrent inserts (basic smoke test) ---
551
552 #[test]
553 fn concurrent_inserts_no_panic() {
554 use std::sync::Arc;
555 use std::thread;
556
557 let gen = Arc::new(HmacGenerator::new([99u8; 32]));
558 let store = Arc::new(MappingStore::new(gen, None));
559
560 let mut handles = vec![];
561 for t in 0..8 {
562 let store = Arc::clone(&store);
563 handles.push(thread::spawn(move || {
564 for i in 0..1000 {
565 let val = format!("thread{}-val{}", t, i);
566 store.get_or_insert(&Category::Email, &val).unwrap();
567 }
568 }));
569 }
570
571 for h in handles {
572 h.join().unwrap();
573 }
574
575 assert_eq!(store.len(), 8000);
576 }
577
578 #[test]
579 fn concurrent_inserts_same_key_idempotent() {
580 use std::sync::Arc;
581 use std::thread;
582
583 let gen = Arc::new(HmacGenerator::new([7u8; 32]));
584 let store = Arc::new(MappingStore::new(gen, None));
585
586 let mut handles = vec![];
587 for _ in 0..8 {
588 let store = Arc::clone(&store);
589 handles.push(thread::spawn(move || {
590 let mut results = Vec::new();
591 for i in 0..100 {
592 let val = format!("shared-{}", i);
593 let r = store.get_or_insert(&Category::Email, &val).unwrap();
594 results.push((val, r));
595 }
596 results
597 }));
598 }
599
600 let mut all_results: Vec<Vec<(String, CompactString)>> = vec![];
601 for h in handles {
602 all_results.push(h.join().unwrap());
603 }
604
605 // All threads must agree on every mapping.
606 assert_eq!(store.len(), 100);
607 for i in 0..100 {
608 let val = format!("shared-{}", i);
609 let expected = store.forward_lookup(&Category::Email, &val).unwrap();
610 for thread_results in &all_results {
611 let (_, got) = &thread_results[i];
612 assert_eq!(
613 got, &expected,
614 "all threads must see the same mapping for {}",
615 val
616 );
617 }
618 }
619 }
620}