// hermes_core/index/primary_key.rs
1//! Primary key deduplication index.
2//!
3//! Uses a bloom filter + `FxHashSet` for uncommitted keys to reject duplicates
4//! at `add_document()` time. Committed keys are checked via fast-field
5//! `TextDictReader::ordinal()` (binary search, O(log n)).
6
7use std::sync::Arc;
8
9use rustc_hash::FxHashSet;
10
11use crate::dsl::Field;
12use crate::error::{Error, Result};
13use crate::segment::{SegmentReader, SegmentSnapshot};
14use crate::structures::BloomFilter;
15
/// Bloom filter sizing: 10 bits/key ≈ 1% false positive rate.
const BLOOM_BITS_PER_KEY: usize = 10;

/// Extra capacity added to bloom filter beyond known keys, so keys inserted
/// after construction (uncommitted adds) don't degrade the false-positive rate.
const BLOOM_HEADROOM: usize = 100_000;
21
/// Thread-safe primary key deduplication index.
///
/// Sync dedup in the hot path: `BloomFilter::may_contain()`,
/// `FxHashSet::contains()`, and `TextDictReader::ordinal()` are all sync.
///
/// Interior mutability for the mutable state (bloom + uncommitted set) is
/// behind `parking_lot::Mutex`. The committed readers are only mutated via
/// `&mut self` methods (commit/abort path), so no lock is needed for them.
pub struct PrimaryKeyIndex {
    /// Which document field holds the primary key.
    field: Field,
    /// Bloom filter + uncommitted key set; see `PrimaryKeyState`.
    state: parking_lot::Mutex<PrimaryKeyState>,
    /// Segment readers for checking committed keys.
    /// Only mutated by `&mut self` methods (refresh/clear) — no lock needed.
    committed_readers: Vec<Arc<SegmentReader>>,
    /// Holds ref counts so segments aren't deleted while we hold readers.
    _snapshot: Option<SegmentSnapshot>,
}
39
/// Mutable dedup state guarded by the mutex inside `PrimaryKeyIndex`.
struct PrimaryKeyState {
    /// Probabilistic superset of every registered key (committed or not).
    /// A negative answer is definitive; a positive one triggers exact checks.
    bloom: BloomFilter,
    /// Exact set of keys accepted since the last commit/refresh.
    uncommitted: FxHashSet<Vec<u8>>,
}
44
impl PrimaryKeyIndex {
    /// Create a new PrimaryKeyIndex by scanning committed segments.
    ///
    /// Iterates each reader's fast-field text dictionary to populate the bloom
    /// filter with all existing primary key values. The snapshot keeps ref counts
    /// alive so segments aren't deleted while we hold readers.
    ///
    /// Readers that lack a fast field (or text dictionary) for `field` are
    /// silently skipped in both passes below.
    pub fn new(field: Field, readers: Vec<Arc<SegmentReader>>, snapshot: SegmentSnapshot) -> Self {
        // Count total unique keys across all segments for bloom sizing.
        // A key present in several segments is counted once per segment; that
        // only oversizes the bloom filter, which is harmless.
        let mut total_keys: usize = 0;
        for reader in &readers {
            if let Some(ff) = reader.fast_field(field.0)
                && let Some(dict) = ff.text_dict()
            {
                total_keys += dict.len() as usize;
            }
        }

        // Sized before insertion: bloom filters cannot grow after creation.
        let mut bloom = BloomFilter::new(total_keys + BLOOM_HEADROOM, BLOOM_BITS_PER_KEY);

        // Insert all committed keys into the bloom filter.
        for reader in &readers {
            if let Some(ff) = reader.fast_field(field.0)
                && let Some(dict) = ff.text_dict()
            {
                for key in dict.iter() {
                    bloom.insert(key.as_bytes());
                }
            }
        }

        Self {
            field,
            state: parking_lot::Mutex::new(PrimaryKeyState {
                bloom,
                uncommitted: FxHashSet::default(),
            }),
            committed_readers: readers,
            _snapshot: Some(snapshot),
        }
    }

    /// Check whether a document's primary key is unique, and if so, register it.
    ///
    /// Returns `Ok(())` if the key is new (inserted into bloom + uncommitted set).
    /// Returns `Err(DuplicatePrimaryKey)` if the key already exists.
    /// Returns `Err(Document)` if the primary key field is missing, non-text,
    /// or empty.
    pub fn check_and_insert(&self, doc: &crate::dsl::Document) -> Result<()> {
        let value = doc
            .get_first(self.field)
            .ok_or_else(|| Error::Document("Missing primary key field".into()))?;
        let key = value
            .as_text()
            .ok_or_else(|| Error::Document("Primary key must be text".into()))?;
        if key.is_empty() {
            return Err(Error::Document("Primary key must not be empty".into()));
        }

        let key_bytes = key.as_bytes();

        // Scope the lock so it is released before the committed-segment scan.
        {
            let mut state = self.state.lock();

            // Fast path: bloom says definitely not present → new key.
            // Registering here (under the lock) makes insert-then-check atomic,
            // so a concurrent caller with the same key sees a bloom positive.
            if !state.bloom.may_contain(key_bytes) {
                state.bloom.insert(key_bytes);
                state.uncommitted.insert(key_bytes.to_vec());
                return Ok(());
            }

            // Bloom positive → check uncommitted set first (fast, in-memory).
            if state.uncommitted.contains(key_bytes) {
                return Err(Error::DuplicatePrimaryKey(key.to_string()));
            }
        }
        // Lock released — check committed segments without holding mutex.
        // committed_readers is immutable (only changed via &mut self methods).
        // Per the module docs, `dict.ordinal()` is a binary search per segment.
        for reader in &self.committed_readers {
            if let Some(ff) = reader.fast_field(self.field.0)
                && let Some(dict) = ff.text_dict()
                && dict.ordinal(key).is_some()
            {
                return Err(Error::DuplicatePrimaryKey(key.to_string()));
            }
        }

        // Re-acquire lock to insert. Re-check uncommitted in case another
        // thread inserted the same key while we were scanning committed segments.
        // (Any such thread saw the same bloom positive and came through here,
        // so one of the two re-checks is guaranteed to observe the other's insert.)
        let mut state = self.state.lock();
        if state.uncommitted.contains(key_bytes) {
            return Err(Error::DuplicatePrimaryKey(key.to_string()));
        }

        // Bloom false positive — key is genuinely new.
        // The bloom insert is redundant here (it already reports positive for
        // this key) but harmless; kept for uniformity with the fast path.
        state.bloom.insert(key_bytes);
        state.uncommitted.insert(key_bytes.to_vec());
        Ok(())
    }

    /// Refresh after commit: replace committed readers and clear uncommitted set.
    ///
    /// The new readers include the just-committed segments, so their text
    /// dictionaries already contain the previously-uncommitted keys.
    /// The snapshot keeps ref counts alive so segments aren't deleted while
    /// we hold readers to them.
    ///
    /// NOTE(review): the bloom filter is NOT rebuilt here. Correctness relies
    /// on the invariant that every key in the new readers previously passed
    /// through `check_and_insert` (and is therefore already in the bloom).
    /// A segment introducing keys this index never saw could bypass dedup via
    /// the bloom fast path — confirm callers never refresh with such segments.
    pub fn refresh(&mut self, new_readers: Vec<Arc<SegmentReader>>, snapshot: SegmentSnapshot) {
        self.committed_readers = new_readers;
        self._snapshot = Some(snapshot);
        // get_mut() bypasses the mutex — safe because we have &mut self.
        let state = self.state.get_mut();
        state.uncommitted.clear();
    }

    /// Roll back an uncommitted key registration (e.g. when channel send fails
    /// after check_and_insert succeeded). Bloom may retain the key but that only
    /// causes harmless false positives, never missed duplicates.
    ///
    /// Silently does nothing if the document lacks a text primary key.
    pub fn rollback_uncommitted_key(&self, doc: &crate::dsl::Document) {
        if let Some(value) = doc.get_first(self.field)
            && let Some(key) = value.as_text()
        {
            self.state.lock().uncommitted.remove(key.as_bytes());
        }
    }

    /// Clear uncommitted keys (e.g. on abort). Bloom may retain stale entries
    /// but that only causes harmless false positives (extra committed-segment
    /// lookups), never missed duplicates.
    pub fn clear_uncommitted(&mut self) {
        self.state.get_mut().uncommitted.clear();
    }
}
175
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dsl::{Document, Field};
    use crate::segment::SegmentTracker;

    /// Build a single-field document carrying `key` as its only value.
    fn make_doc(field: Field, key: &str) -> Document {
        let mut doc = Document::new();
        doc.add_text(field, key);
        doc
    }

    /// Snapshot over a fresh tracker holding no segments.
    fn empty_snapshot() -> SegmentSnapshot {
        SegmentSnapshot::new(Arc::new(SegmentTracker::new()), vec![])
    }

    #[test]
    fn test_new_empty_readers() {
        let field = Field(0);
        let index = PrimaryKeyIndex::new(field, vec![], empty_snapshot());
        // Constructing with zero readers must still yield a usable index.
        assert!(index.check_and_insert(&make_doc(field, "key1")).is_ok());
    }

    #[test]
    fn test_unique_keys_accepted() {
        let field = Field(0);
        let index = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        // Three distinct keys — all must be accepted.
        for key in ["a", "b", "c"] {
            assert!(index.check_and_insert(&make_doc(field, key)).is_ok());
        }
    }

    #[test]
    fn test_duplicate_uncommitted_rejected() {
        let field = Field(0);
        let index = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        index
            .check_and_insert(&make_doc(field, "key1"))
            .expect("first insert must succeed");
        let err = index
            .check_and_insert(&make_doc(field, "key1"))
            .expect_err("second insert must be rejected");
        match err {
            Error::DuplicatePrimaryKey(k) => assert_eq!(k, "key1"),
            other => panic!("Expected DuplicatePrimaryKey, got {:?}", other),
        }
    }

    #[test]
    fn test_missing_field_rejected() {
        let pk_field = Field(0);
        let other_field = Field(1);
        let index = PrimaryKeyIndex::new(pk_field, vec![], empty_snapshot());

        // The document only carries a different field; the PK field is absent.
        let err = index
            .check_and_insert(&make_doc(other_field, "value"))
            .expect_err("missing primary key must be rejected");
        match err {
            Error::Document(msg) => assert!(msg.contains("Missing"), "{}", msg),
            other => panic!("Expected Document error, got {:?}", other),
        }
    }

    #[test]
    fn test_empty_key_rejected() {
        let field = Field(0);
        let index = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        let err = index
            .check_and_insert(&make_doc(field, ""))
            .expect_err("empty primary key must be rejected");
        match err {
            Error::Document(msg) => assert!(msg.contains("empty"), "{}", msg),
            other => panic!("Expected Document error, got {:?}", other),
        }
    }

    #[test]
    fn test_clear_uncommitted() {
        let field = Field(0);
        let mut index = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        // key1 goes in once; a second attempt is a duplicate.
        assert!(index.check_and_insert(&make_doc(field, "key1")).is_ok());
        assert!(index.check_and_insert(&make_doc(field, "key1")).is_err());

        index.clear_uncommitted();

        // After clearing, the bloom still remembers key1 but the exact set
        // doesn't. With no committed readers, the lookup chain is:
        // bloom positive → uncommitted miss → committed (empty) miss → accept.
        assert!(index.check_and_insert(&make_doc(field, "key1")).is_ok());
    }

    #[test]
    fn test_many_unique_keys() {
        let field = Field(0);
        let index = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        let keys: Vec<String> = (0..1000).map(|i| format!("key_{}", i)).collect();

        // First pass: every key is fresh.
        for key in &keys {
            assert!(index.check_and_insert(&make_doc(field, key)).is_ok());
        }
        // Second pass: every key is now a duplicate.
        for key in &keys {
            assert!(index.check_and_insert(&make_doc(field, key)).is_err());
        }
    }

    #[test]
    fn test_refresh_clears_uncommitted() {
        let field = Field(0);
        let mut index = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        assert!(index.check_and_insert(&make_doc(field, "key1")).is_ok());
        assert!(index.check_and_insert(&make_doc(field, "key1")).is_err());

        // Simulate a commit whose segment readers expose no fast fields
        // (edge case: refresh with an empty reader set).
        index.refresh(vec![], empty_snapshot());

        // Uncommitted was cleared and no committed reader knows the key,
        // so it is accepted again.
        assert!(index.check_and_insert(&make_doc(field, "key1")).is_ok());
    }

    #[test]
    fn test_concurrent_access() {
        use std::sync::Arc;

        let field = Field(0);
        let index = Arc::new(PrimaryKeyIndex::new(field, vec![], empty_snapshot()));

        // Ten threads race to register the same key.
        let handles: Vec<_> = (0..10)
            .map(|_| {
                let index = Arc::clone(&index);
                std::thread::spawn(move || {
                    index.check_and_insert(&make_doc(field, "contested_key"))
                })
            })
            .collect();

        let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        let ok_count = results.iter().filter(|r| r.is_ok()).count();
        let err_count = results.iter().filter(|r| r.is_err()).count();

        // Exactly one winner; everyone else sees a duplicate.
        assert_eq!(ok_count, 1, "Exactly one insert should succeed");
        assert_eq!(err_count, 9, "Rest should fail with duplicate");
    }
}