// sanitize_engine/store.rs

//! Thread-safe, concurrent one-way replacement store.
//!
//! # Concurrency Model
//!
//! The store uses [`dashmap::DashMap`] — a concurrent hash map with shard-level
//! locking (default 64 shards). This gives us:
//!
//! - **Lock-free reads** for lookups of already-mapped values.
//! - **Shard-level write locks** that are held only while inserting a new
//!   entry. With 64 shards and 8–16 threads, the probability of two threads
//!   contending on the same shard is very low.
//! - **Atomic get-or-insert** via the `entry()` API, which prevents TOCTOU
//!   races and guarantees first-writer-wins semantics.
//!
//! The forward map is keyed by `(Category, original)` → `sanitized`.
//! Replacements are **one-way only** — there is no reverse map, no mapping
//! file, and no restore capability.
//!
//! # Memory Characteristics
//!
//! At 10M unique values with an average key length of 20 bytes and an average
//! value length of 30 bytes:
//! - Forward map: 10M × (20 + 30 + ~120 bytes DashMap overhead) ≈ 1.7 GB
//! - **Total: ~1.7 GB** — acceptable for server workloads.
//!
//! An optional `capacity_limit` can be set to prevent unbounded growth.

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use compact_str::CompactString;
use dashmap::DashMap;
use zeroize::Zeroize;

use crate::category::Category;
use crate::error::{Result, SanitizeError};
use crate::generator::ReplacementGenerator;
36
// ---------------------------------------------------------------------------
// Composite key for the forward map
// ---------------------------------------------------------------------------

41/// Composite key: `(Category, original_value)`.
42///
43/// Uses `String` for `original` (rather than `CompactString`) so that
44/// the Drop impl can zeroize sensitive plaintext via the `Zeroize`
45/// trait without unsafe code (F-09 fix).
46#[derive(Debug, Clone, PartialEq, Eq, Hash)]
47struct ForwardKey {
48 category: Category,
49 original: String,
50}
51
// ---------------------------------------------------------------------------
// MappingStore
// ---------------------------------------------------------------------------

56/// Thread-safe concurrent one-way replacement store.
57///
58/// Caches forward mappings for per-run consistency (same input always
59/// produces the same output within a run). There is no reverse map,
60/// no journal, and no persistence — replacements are one-way only.
61///
62/// See the [module-level documentation](self) for concurrency and memory details.
63pub struct MappingStore {
64 /// `(category, original) → sanitized`
65 forward: DashMap<ForwardKey, CompactString>,
66 /// Replacement generator (HMAC deterministic or CSPRNG random).
67 generator: Arc<dyn ReplacementGenerator>,
68 /// Current number of mappings (atomic for lock-free reads).
69 len: AtomicUsize,
70 /// Optional upper bound on the number of mappings.
71 capacity_limit: Option<usize>,
72}
73
74impl MappingStore {
75 // ---------------- Construction ----------------
76
77 /// Create a new, empty mapping store.
78 ///
79 /// # Arguments
80 ///
81 /// - `generator` — replacement strategy (HMAC or random).
82 /// - `capacity_limit` — optional max number of unique mappings.
83 #[must_use]
84 pub fn new(generator: Arc<dyn ReplacementGenerator>, capacity_limit: Option<usize>) -> Self {
85 Self {
86 forward: DashMap::with_capacity(1024),
87 generator,
88 len: AtomicUsize::new(0),
89 capacity_limit,
90 }
91 }
92
93 /// Create a store pre-sized for `expected` entries (avoids rehashing).
94 #[must_use]
95 pub fn with_expected_capacity(
96 generator: Arc<dyn ReplacementGenerator>,
97 capacity_limit: Option<usize>,
98 expected: usize,
99 ) -> Self {
100 Self {
101 forward: DashMap::with_capacity(expected),
102 generator,
103 len: AtomicUsize::new(0),
104 capacity_limit,
105 }
106 }
107
108 // ---------------- Core API ----------------
109
110 /// Get or create the sanitized replacement for `(category, original)`.
111 ///
112 /// This is the primary API for one-way sanitization.
113 ///
114 /// **Thread-safety:** Uses `DashMap::entry()` which holds a shard-level
115 /// lock only for the duration of the insert closure. The generator is
116 /// called inside the lock, but generation is fast (one HMAC or one RNG
117 /// call). Capacity enforcement uses `compare_exchange` to prevent
118 /// TOCTOU over-insertion (C-1 fix).
119 ///
120 /// **Per-run consistency:** Once a value is mapped, all subsequent
121 /// lookups return the same sanitized value (first-writer-wins).
122 ///
123 /// # Errors
124 ///
125 /// Returns [`SanitizeError::CapacityExceeded`] if the store has
126 /// reached its configured capacity limit.
127 pub fn get_or_insert(&self, category: &Category, original: &str) -> Result<CompactString> {
128 // Fast path: already mapped (lock-free read).
129 let key = ForwardKey {
130 category: category.clone(),
131 original: original.to_owned(),
132 };
133 if let Some(existing) = self.forward.get(&key) {
134 return Ok(existing.value().clone());
135 }
136
137 // Slow path: need to insert.
138 // Atomically reserve a capacity slot *before* generating the
139 // value. This eliminates the TOCTOU race (C-1) where multiple
140 // threads pass the capacity check and all insert.
141 if let Some(limit) = self.capacity_limit {
142 loop {
143 let current = self.len.load(Ordering::Acquire);
144 if current >= limit {
145 // One more chance: key may have been inserted by
146 // another thread while we were checking.
147 if let Some(existing) = self.forward.get(&key) {
148 return Ok(existing.value().clone());
149 }
150 return Err(SanitizeError::CapacityExceeded { current, limit });
151 }
152 // Try to reserve a slot atomically.
153 if self
154 .len
155 .compare_exchange_weak(
156 current,
157 current + 1,
158 Ordering::AcqRel,
159 Ordering::Acquire,
160 )
161 .is_ok()
162 {
163 break;
164 }
165 // CAS failed → another thread incremented; retry.
166 }
167
168 // Slot reserved — generate and insert.
169 // Use entry() for first-writer-wins semantics.
170 let mut was_inserted = false;
171 let result = self
172 .forward
173 .entry(key)
174 .or_insert_with(|| {
175 was_inserted = true;
176 let val = self.generator.generate(category, original);
177 CompactString::new(val)
178 })
179 .value()
180 .clone();
181
182 if !was_inserted {
183 // Another thread inserted first — release our reserved slot.
184 self.len.fetch_sub(1, Ordering::Release);
185 }
186
187 Ok(result)
188 } else {
189 // No capacity limit — generate inside the entry lock to
190 // avoid wasted work (C-2 fix: only the first writer generates).
191 let mut was_inserted = false;
192 let result = self
193 .forward
194 .entry(key)
195 .or_insert_with(|| {
196 was_inserted = true;
197 let val = self.generator.generate(category, original);
198 CompactString::new(val)
199 })
200 .value()
201 .clone();
202
203 if was_inserted {
204 self.len.fetch_add(1, Ordering::Release);
205 }
206
207 Ok(result)
208 }
209 }
210
211 /// Look up an existing forward mapping without creating one.
212 pub fn forward_lookup(&self, category: &Category, original: &str) -> Option<CompactString> {
213 let key = ForwardKey {
214 category: category.clone(),
215 original: original.to_owned(),
216 };
217 self.forward.get(&key).map(|r| r.value().clone())
218 }
219
220 // ---------------- Metrics ----------------
221
222 /// Number of unique mappings in the store.
223 #[must_use]
224 pub fn len(&self) -> usize {
225 self.len.load(Ordering::Relaxed)
226 }
227
228 /// Whether the store is empty.
229 #[must_use]
230 pub fn is_empty(&self) -> bool {
231 self.len() == 0
232 }
233
234 /// Remove all mappings, zeroizing the original plaintexts.
235 ///
236 /// This is useful for resetting the store between runs without
237 /// dropping and recreating it.
238 pub fn clear(&mut self) {
239 let old_map = std::mem::take(&mut self.forward);
240 for (mut key, _value) in old_map {
241 key.original.zeroize();
242 }
243 self.len.store(0, Ordering::Release);
244 }
245
246 // ---------------- Iteration (for external use) ----------------
247
248 /// Iterate over all mappings. Yields `(category, original, sanitized)`.
249 ///
250 /// Note: iteration over `DashMap` is not snapshot-consistent if concurrent
251 /// inserts are happening. Call this after all workers have finished.
252 pub fn iter(&self) -> impl Iterator<Item = (Category, CompactString, CompactString)> + '_ {
253 self.forward.iter().map(|entry| {
254 (
255 entry.key().category.clone(),
256 CompactString::new(&entry.key().original),
257 entry.value().clone(),
258 )
259 })
260 }
261}
262
263/// F-09 fix: zeroize original keys stored in the forward map on drop.
264/// This prevents sensitive plaintext values from lingering on the heap
265/// after the store is no longer needed. Uses safe Zeroize on Strings.
266impl Drop for MappingStore {
267 fn drop(&mut self) {
268 // DashMap::retain() visits each entry. We use it to overwrite
269 // the original plaintext before the entry is dropped.
270 // retain() gives us (&K, &mut V); the key is behind a shared
271 // reference. We extract the original, create a zeroizing copy,
272 // then clear the map. This ensures the String backing buffer
273 // is zeroed before being freed.
274 //
275 // Approach: swap the map contents with an empty map, consuming
276 // ownership of all entries via IntoIter.
277 let old_map = std::mem::take(&mut self.forward);
278 for (mut key, _value) in old_map {
279 key.original.zeroize();
280 }
281 }
282}
283
284/// Compile-time assertion that a type is `Send + Sync`.
285macro_rules! static_assertions_send_sync {
286 ($t:ty) => {
287 const _: fn() = || {
288 fn assert_send<T: Send>() {}
289 fn assert_sync<T: Sync>() {}
290 assert_send::<$t>();
291 assert_sync::<$t>();
292 };
293 };
294}
295
296// DashMap is Send + Sync, AtomicUsize is Send + Sync,
297// Arc<dyn ReplacementGenerator> is Send + Sync.
298// This is satisfied automatically, but let's assert it:
299static_assertions_send_sync!(MappingStore);
300
// ---------------------------------------------------------------------------
// Unit tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use crate::generator::{HmacGenerator, RandomGenerator};
    use std::sync::Arc;
    use std::thread;

    /// Store backed by the deterministic HMAC generator (fixed key).
    fn hmac_store(limit: Option<usize>) -> MappingStore {
        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
        MappingStore::new(gen, limit)
    }

    /// Store backed by the CSPRNG generator, no capacity limit.
    fn random_store() -> MappingStore {
        let gen = Arc::new(RandomGenerator::new());
        MappingStore::new(gen, None)
    }

    // --- Basic operations ---

    #[test]
    fn insert_and_lookup() {
        let store = hmac_store(None);
        let s1 = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        assert!(!s1.is_empty());
        assert!(s1.contains("@corp.com"), "domain must be preserved");
        assert_eq!(s1.len(), "alice@corp.com".len(), "length must be preserved");
        assert_eq!(store.len(), 1);
    }

    #[test]
    fn same_input_same_output() {
        let store = hmac_store(None);
        let s1 = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        let s2 = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        assert_eq!(s1, s2, "repeated insert must return cached value");
        assert_eq!(store.len(), 1, "no duplicate entry");
    }

    #[test]
    fn different_inputs_different_outputs() {
        let store = hmac_store(None);
        let s1 = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        let s2 = store
            .get_or_insert(&Category::Email, "bob@corp.com")
            .unwrap();
        assert_ne!(s1, s2);
        assert_eq!(store.len(), 2);
    }

    #[test]
    fn different_categories_different_outputs() {
        let store = hmac_store(None);
        let s1 = store.get_or_insert(&Category::Email, "test").unwrap();
        let s2 = store.get_or_insert(&Category::Name, "test").unwrap();
        assert_ne!(s1, s2);
    }

    #[test]
    fn forward_lookup_works() {
        let store = hmac_store(None);
        let sanitized = store.get_or_insert(&Category::IpV4, "192.168.1.1").unwrap();
        let found = store.forward_lookup(&Category::IpV4, "192.168.1.1");
        assert_eq!(found, Some(sanitized));
    }

    #[test]
    fn forward_lookup_missing() {
        let store = hmac_store(None);
        assert!(store.forward_lookup(&Category::Email, "nope").is_none());
    }

    // --- Capacity limit ---

    #[test]
    fn capacity_limit_enforced() {
        let store = hmac_store(Some(2));
        store.get_or_insert(&Category::Email, "a@a.com").unwrap();
        store.get_or_insert(&Category::Email, "b@b.com").unwrap();
        let result = store.get_or_insert(&Category::Email, "c@c.com");
        assert!(result.is_err());
        match result.unwrap_err() {
            SanitizeError::CapacityExceeded {
                current: 2,
                limit: 2,
            } => {}
            other => panic!("unexpected error: {:?}", other),
        }
    }

    #[test]
    fn capacity_limit_allows_duplicate() {
        let store = hmac_store(Some(1));
        store.get_or_insert(&Category::Email, "a@a.com").unwrap();
        // Re-inserting the same value must succeed (fast path, no new slot).
        let s2 = store.get_or_insert(&Category::Email, "a@a.com").unwrap();
        assert!(!s2.is_empty());
    }

    // --- Random generator within store ---

    #[test]
    fn random_store_caches() {
        let store = random_store();
        let s1 = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        let s2 = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        assert_eq!(s1, s2, "random store must still cache the first result");
    }

    // --- Iteration ---

    #[test]
    fn iter_yields_all_mappings() {
        let store = hmac_store(None);
        store.get_or_insert(&Category::Email, "a@a.com").unwrap();
        store.get_or_insert(&Category::IpV4, "1.2.3.4").unwrap();
        let collected: Vec<_> = store.iter().collect();
        assert_eq!(collected.len(), 2);
    }

    // --- Concurrent inserts (basic smoke test) ---

    #[test]
    fn concurrent_inserts_no_panic() {
        let gen = Arc::new(HmacGenerator::new([99u8; 32]));
        let store = Arc::new(MappingStore::new(gen, None));

        let mut handles = vec![];
        for t in 0..8 {
            let store = Arc::clone(&store);
            handles.push(thread::spawn(move || {
                for i in 0..1000 {
                    let val = format!("thread{}-val{}", t, i);
                    store.get_or_insert(&Category::Email, &val).unwrap();
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        // 8 threads × 1000 distinct values each, no overlap.
        assert_eq!(store.len(), 8000);
    }

    #[test]
    fn concurrent_inserts_same_key_idempotent() {
        let gen = Arc::new(HmacGenerator::new([7u8; 32]));
        let store = Arc::new(MappingStore::new(gen, None));

        let mut handles = vec![];
        for _ in 0..8 {
            let store = Arc::clone(&store);
            handles.push(thread::spawn(move || {
                let mut results = Vec::new();
                for i in 0..100 {
                    let val = format!("shared-{}", i);
                    let r = store.get_or_insert(&Category::Email, &val).unwrap();
                    results.push((val, r));
                }
                results
            }));
        }

        let mut all_results: Vec<Vec<(String, CompactString)>> = vec![];
        for h in handles {
            all_results.push(h.join().unwrap());
        }

        // All threads must agree on every mapping (first-writer-wins).
        assert_eq!(store.len(), 100);
        for i in 0..100 {
            let val = format!("shared-{}", i);
            let expected = store.forward_lookup(&Category::Email, &val).unwrap();
            for thread_results in &all_results {
                let (_, got) = &thread_results[i];
                assert_eq!(
                    got, &expected,
                    "all threads must see the same mapping for {}",
                    val
                );
            }
        }
    }
}