sanitize_engine/store.rs
//! Thread-safe, concurrent one-way replacement store.
//!
//! # Concurrency Model
//!
//! The store uses [`dashmap::DashMap`] — a concurrent hash map with shard-level
//! locking (the default shard count is a power of two derived from the number
//! of logical CPUs). This gives us:
//!
//! - **Cheap concurrent reads**: a lookup of an already-mapped value takes only
//!   a shard read lock, so readers never block each other.
//! - **Shard-level write locks** that are held only while inserting a new entry.
//!   With dozens of shards and 8–16 worker threads, the probability of two
//!   threads contending on the same shard is low.
//! - **Atomic get-or-insert** via the `entry()` API, which prevents TOCTOU races
//!   and guarantees first-writer-wins semantics.
//!
//! The forward map is keyed by `(Category, original)` → `sanitized`.
//! Replacements are **one-way only** — there is no reverse map, no mapping
//! file, and no restore capability.
//!
//! # Memory Characteristics
//!
//! At 10M unique values with an average key length of 20 bytes and an average
//! value length of 30 bytes:
//! - Forward map: 10M × (20 + 30 + ~120 bytes of DashMap overhead) ≈ 1.7 GB
//! - **Total: ~1.7 GB** — acceptable for server workloads.
//!
//! An optional `capacity_limit` can be set to prevent unbounded growth.
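//!
//! # Example
//!
//! A minimal usage sketch. The generator construction mirrors this module's
//! unit tests; substitute whichever [`ReplacementGenerator`] you actually use.
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! // Deterministic HMAC generator keyed with a fixed 32-byte secret.
//! let gen = Arc::new(HmacGenerator::new([42u8; 32]));
//! let store = MappingStore::new(gen, None);
//!
//! // The first call generates and caches a replacement ...
//! let a = store.get_or_insert(&Category::Email, "alice@corp.com").unwrap();
//! // ... and repeated calls for the same input return the cached value.
//! let b = store.get_or_insert(&Category::Email, "alice@corp.com").unwrap();
//! assert_eq!(a, b);
//! ```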

use crate::category::Category;
use crate::error::{Result, SanitizeError};
use crate::generator::ReplacementGenerator;
use compact_str::CompactString;
use dashmap::DashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use zeroize::Zeroize;

// ---------------------------------------------------------------------------
// Composite key for the forward map
// ---------------------------------------------------------------------------

/// Composite key: `(Category, original_value)`.
///
/// Uses `String` for `original` (rather than `CompactString`) so that
/// `MappingStore`'s `Drop` impl can zeroize the sensitive plaintext via
/// the `Zeroize` trait without unsafe code (F-09 fix).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ForwardKey {
    category: Category,
    original: String,
}

// ---------------------------------------------------------------------------
// MappingStore
// ---------------------------------------------------------------------------

/// Thread-safe, concurrent one-way replacement store.
///
/// Caches forward mappings for per-run consistency (same input always
/// produces the same output within a run). There is no reverse map,
/// no journal, and no persistence — replacements are one-way only.
///
/// See the [module-level documentation](self) for concurrency and memory details.
pub struct MappingStore {
    /// `(category, original) → sanitized`
    forward: DashMap<ForwardKey, CompactString>,
    /// Replacement generator (HMAC deterministic or CSPRNG random).
    generator: Arc<dyn ReplacementGenerator>,
    /// Current number of mappings (atomic for lock-free reads).
    len: AtomicUsize,
    /// Optional upper bound on the number of mappings.
    capacity_limit: Option<usize>,
}

impl MappingStore {
    // ---------------- Construction ----------------

    /// Create a new, empty mapping store.
    ///
    /// # Arguments
    ///
    /// - `generator` — replacement strategy (HMAC or random).
    /// - `capacity_limit` — optional max number of unique mappings.
    #[must_use]
    pub fn new(generator: Arc<dyn ReplacementGenerator>, capacity_limit: Option<usize>) -> Self {
        Self {
            forward: DashMap::with_capacity(1024),
            generator,
            len: AtomicUsize::new(0),
            capacity_limit,
        }
    }

    // ---------------- Core API ----------------

    /// Get or create the sanitized replacement for `(category, original)`.
    ///
    /// This is the primary API for one-way sanitization.
    ///
    /// **Thread-safety:** Uses `DashMap::entry()`, which holds a shard-level
    /// write lock only while the entry is looked up and, if missing, inserted.
    /// The generator runs inside that lock, but generation is fast (one HMAC
    /// or one RNG call). Capacity enforcement reserves a slot up front with
    /// `compare_exchange`, preventing TOCTOU over-insertion (C-1 fix).
    ///
    /// **Per-run consistency:** Once a value is mapped, all subsequent
    /// lookups return the same sanitized value (first-writer-wins).
    ///
    /// # Errors
    ///
    /// Returns [`SanitizeError::CapacityExceeded`] if the store has
    /// reached its configured capacity limit.
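    ///
    /// # Example
    ///
    /// A sketch of the capacity behavior, mirroring the unit tests at the
    /// bottom of this file (`HmacGenerator` is the deterministic generator
    /// those tests use):
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// // Store capped at a single unique mapping.
    /// let store = MappingStore::new(Arc::new(HmacGenerator::new([42u8; 32])), Some(1));
    ///
    /// // The first unique value takes the only slot.
    /// let first = store.get_or_insert(&Category::Email, "a@a.com").unwrap();
    /// // Re-inserting the same value hits the cached fast path and still succeeds.
    /// assert_eq!(store.get_or_insert(&Category::Email, "a@a.com").unwrap(), first);
    /// // A second unique value exceeds the limit and is rejected.
    /// assert!(store.get_or_insert(&Category::Email, "b@b.com").is_err());
    /// ```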
    pub fn get_or_insert(&self, category: &Category, original: &str) -> Result<CompactString> {
        // Fast path: already mapped (takes only a shard read lock).
        let key = ForwardKey {
            category: category.clone(),
            original: original.to_owned(),
        };
        if let Some(existing) = self.forward.get(&key) {
            return Ok(existing.value().clone());
        }

        // Slow path: need to insert.
        // Atomically reserve a capacity slot *before* generating the
        // value. This eliminates the TOCTOU race (C-1) where multiple
        // threads pass the capacity check and all insert.
        if let Some(limit) = self.capacity_limit {
            loop {
                let current = self.len.load(Ordering::Acquire);
                if current >= limit {
                    // One more chance: key may have been inserted by
                    // another thread while we were checking.
                    if let Some(existing) = self.forward.get(&key) {
                        return Ok(existing.value().clone());
                    }
                    return Err(SanitizeError::CapacityExceeded { current, limit });
                }
                // Try to reserve a slot atomically.
                if self
                    .len
                    .compare_exchange_weak(
                        current,
                        current + 1,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    )
                    .is_ok()
                {
                    break;
                }
                // CAS failed → another thread incremented; retry.
            }

            // Slot reserved — generate and insert.
            // Use entry() for first-writer-wins semantics.
            let mut was_inserted = false;
            let result = self
                .forward
                .entry(key)
                .or_insert_with(|| {
                    was_inserted = true;
                    let val = self.generator.generate(category, original);
                    CompactString::new(val)
                })
                .value()
                .clone();

            if !was_inserted {
                // Another thread inserted first — release our reserved slot.
                self.len.fetch_sub(1, Ordering::Release);
            }

            Ok(result)
        } else {
            // No capacity limit — generate inside the entry lock to
            // avoid wasted work (C-2 fix: only the first writer generates).
            let mut was_inserted = false;
            let result = self
                .forward
                .entry(key)
                .or_insert_with(|| {
                    was_inserted = true;
                    let val = self.generator.generate(category, original);
                    CompactString::new(val)
                })
                .value()
                .clone();

            if was_inserted {
                self.len.fetch_add(1, Ordering::Release);
            }

            Ok(result)
        }
    }

    /// Look up an existing forward mapping without creating one.
    #[must_use]
    pub fn forward_lookup(&self, category: &Category, original: &str) -> Option<CompactString> {
        let key = ForwardKey {
            category: category.clone(),
            original: original.to_owned(),
        };
        self.forward.get(&key).map(|r| r.value().clone())
    }

    // ---------------- Metrics ----------------

    /// Number of unique mappings in the store.
    #[must_use]
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }

    /// Whether the store is empty.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Remove all mappings, zeroizing the original plaintexts.
    ///
    /// This is useful for resetting the store between runs without
    /// dropping and recreating it.
    pub fn clear(&mut self) {
        let old_map = std::mem::take(&mut self.forward);
        for (mut key, _value) in old_map {
            key.original.zeroize();
        }
        self.len.store(0, Ordering::Release);
    }

    // ---------------- Snapshot / diff (for format-preserving pass) ----------------

    /// Snapshot the set of `(category, original)` keys currently in the store.
    ///
    /// Call this before structured processing; diff against `iter()` afterwards
    /// to find which mappings were added by the processor.
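    ///
    /// # Example
    ///
    /// A sketch of the snapshot/diff workflow; `process_structured` is a
    /// hypothetical placeholder for whatever processing populates the store:
    ///
    /// ```ignore
    /// // Remember which keys existed before the structured pass.
    /// let before = store.snapshot_keys();
    ///
    /// process_structured(&store); // hypothetical processing step
    ///
    /// // Anything not in the snapshot was added by the processor.
    /// let added: Vec<_> = store
    ///     .iter()
    ///     .filter(|(cat, orig, _)| !before.contains(&(cat.clone(), orig.to_string())))
    ///     .collect();
    /// ```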
    pub fn snapshot_keys(&self) -> std::collections::HashSet<(Category, String)> {
        self.forward
            .iter()
            .map(|e| (e.key().category.clone(), e.key().original.clone()))
            .collect()
    }

    // ---------------- Iteration (for external use) ----------------

    /// Iterate over all mappings. Yields `(category, original, sanitized)`.
    ///
    /// Note: iteration over `DashMap` is not snapshot-consistent if concurrent
    /// inserts are happening. Call this after all workers have finished.
    pub fn iter(&self) -> impl Iterator<Item = (Category, CompactString, CompactString)> + '_ {
        self.forward.iter().map(|entry| {
            (
                entry.key().category.clone(),
                CompactString::new(&entry.key().original),
                entry.value().clone(),
            )
        })
    }
}

/// F-09 fix: zeroize original keys stored in the forward map on drop.
/// This prevents sensitive plaintext values from lingering on the heap
/// after the store is no longer needed. Uses safe Zeroize on Strings.
impl Drop for MappingStore {
    fn drop(&mut self) {
        // `DashMap::retain()` only exposes the key as `&K`, so it cannot
        // overwrite the plaintext in place. Instead, swap the map with an
        // empty one and consume every entry by value via `IntoIter`; owning
        // each `ForwardKey` lets us zeroize the `String` backing buffer
        // before it is freed.
        let old_map = std::mem::take(&mut self.forward);
        for (mut key, _value) in old_map {
            key.original.zeroize();
        }
    }
}

/// Compile-time assertion that a type is `Send + Sync`.
macro_rules! static_assertions_send_sync {
    ($t:ty) => {
        const _: fn() = || {
            fn assert_send<T: Send>() {}
            fn assert_sync<T: Sync>() {}
            assert_send::<$t>();
            assert_sync::<$t>();
        };
    };
}

// DashMap is Send + Sync, AtomicUsize is Send + Sync,
// Arc<dyn ReplacementGenerator> is Send + Sync.
// This is satisfied automatically, but let's assert it:
static_assertions_send_sync!(MappingStore);

// ---------------------------------------------------------------------------
// Unit tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use crate::generator::{HmacGenerator, RandomGenerator};
    use std::sync::Arc;

    fn hmac_store(limit: Option<usize>) -> MappingStore {
        let gen = Arc::new(HmacGenerator::new([42u8; 32]));
        MappingStore::new(gen, limit)
    }

    fn random_store() -> MappingStore {
        let gen = Arc::new(RandomGenerator::new());
        MappingStore::new(gen, None)
    }

    // --- Basic operations ---

    #[test]
    fn insert_and_lookup() {
        let store = hmac_store(None);
        let s1 = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        assert!(!s1.is_empty());
        assert!(s1.contains("@corp.com"), "domain must be preserved");
        assert_eq!(s1.len(), "alice@corp.com".len(), "length must be preserved");
        assert_eq!(store.len(), 1);
    }

    #[test]
    fn same_input_same_output() {
        let store = hmac_store(None);
        let s1 = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        let s2 = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        assert_eq!(s1, s2, "repeated insert must return cached value");
        assert_eq!(store.len(), 1, "no duplicate entry");
    }

    #[test]
    fn different_inputs_different_outputs() {
        let store = hmac_store(None);
        let s1 = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        let s2 = store
            .get_or_insert(&Category::Email, "bob@corp.com")
            .unwrap();
        assert_ne!(s1, s2);
        assert_eq!(store.len(), 2);
    }

    #[test]
    fn different_categories_different_outputs() {
        let store = hmac_store(None);
        let s1 = store.get_or_insert(&Category::Email, "test").unwrap();
        let s2 = store.get_or_insert(&Category::Name, "test").unwrap();
        assert_ne!(s1, s2);
    }

    #[test]
    fn forward_lookup_works() {
        let store = hmac_store(None);
        let sanitized = store.get_or_insert(&Category::IpV4, "192.168.1.1").unwrap();
        let found = store.forward_lookup(&Category::IpV4, "192.168.1.1");
        assert_eq!(found, Some(sanitized));
    }

    #[test]
    fn forward_lookup_missing() {
        let store = hmac_store(None);
        assert!(store.forward_lookup(&Category::Email, "nope").is_none());
    }

    // --- Capacity limit ---

    #[test]
    fn capacity_limit_enforced() {
        let store = hmac_store(Some(2));
        store.get_or_insert(&Category::Email, "a@a.com").unwrap();
        store.get_or_insert(&Category::Email, "b@b.com").unwrap();
        let result = store.get_or_insert(&Category::Email, "c@c.com");
        assert!(result.is_err());
        match result.unwrap_err() {
            SanitizeError::CapacityExceeded {
                current: 2,
                limit: 2,
            } => {}
            other => panic!("unexpected error: {:?}", other),
        }
    }

    #[test]
    fn capacity_limit_allows_duplicate() {
        let store = hmac_store(Some(1));
        store.get_or_insert(&Category::Email, "a@a.com").unwrap();
        // Re-inserting same value should succeed (fast path).
        let s2 = store.get_or_insert(&Category::Email, "a@a.com").unwrap();
        assert!(!s2.is_empty());
    }

    // --- Random generator within store ---

    #[test]
    fn random_store_caches() {
        let store = random_store();
        let s1 = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        let s2 = store
            .get_or_insert(&Category::Email, "alice@corp.com")
            .unwrap();
        assert_eq!(s1, s2, "random store must still cache the first result");
    }

    // --- Iteration ---

    #[test]
    fn iter_yields_all_mappings() {
        let store = hmac_store(None);
        store.get_or_insert(&Category::Email, "a@a.com").unwrap();
        store.get_or_insert(&Category::IpV4, "1.2.3.4").unwrap();
        let collected: Vec<_> = store.iter().collect();
        assert_eq!(collected.len(), 2);
    }

    // --- Concurrent inserts (basic smoke test) ---

    #[test]
    fn concurrent_inserts_no_panic() {
        use std::sync::Arc;
        use std::thread;

        let gen = Arc::new(HmacGenerator::new([99u8; 32]));
        let store = Arc::new(MappingStore::new(gen, None));

        let mut handles = vec![];
        for t in 0..8 {
            let store = Arc::clone(&store);
            handles.push(thread::spawn(move || {
                for i in 0..1000 {
                    let val = format!("thread{}-val{}", t, i);
                    store.get_or_insert(&Category::Email, &val).unwrap();
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(store.len(), 8000);
    }

    #[test]
    fn concurrent_inserts_same_key_idempotent() {
        use std::sync::Arc;
        use std::thread;

        let gen = Arc::new(HmacGenerator::new([7u8; 32]));
        let store = Arc::new(MappingStore::new(gen, None));

        let mut handles = vec![];
        for _ in 0..8 {
            let store = Arc::clone(&store);
            handles.push(thread::spawn(move || {
                let mut results = Vec::new();
                for i in 0..100 {
                    let val = format!("shared-{}", i);
                    let r = store.get_or_insert(&Category::Email, &val).unwrap();
                    results.push((val, r));
                }
                results
            }));
        }

        let mut all_results: Vec<Vec<(String, CompactString)>> = vec![];
        for h in handles {
            all_results.push(h.join().unwrap());
        }

        // All threads must agree on every mapping.
        assert_eq!(store.len(), 100);
        for i in 0..100 {
            let val = format!("shared-{}", i);
            let expected = store.forward_lookup(&Category::Email, &val).unwrap();
            for thread_results in &all_results {
                let (_, got) = &thread_results[i];
                assert_eq!(
                    got, &expected,
                    "all threads must see the same mapping for {}",
                    val
                );
            }
        }
    }
}
504}