// hermes_core/index/primary_key.rs
//! Primary key deduplication index.
//!
//! Uses a bloom filter + `FxHashSet` for uncommitted keys to reject duplicates
//! at `add_document()` time. Committed keys are checked via fast-field
//! `TextDictReader::ordinal()` (binary search, O(log n)).

7use std::sync::Arc;
8
9use rustc_hash::FxHashSet;
10
11use crate::dsl::Field;
12use crate::error::{Error, Result};
13use crate::segment::{SegmentReader, SegmentSnapshot};
14use crate::structures::BloomFilter;
15
/// Bloom filter sizing: 10 bits per key gives roughly a 1% false-positive rate.
const BLOOM_BITS_PER_KEY: usize = 10;

/// Extra capacity added to the bloom filter beyond the known committed keys,
/// leaving headroom for keys inserted before the next refresh resizes it.
const BLOOM_HEADROOM: usize = 100_000;
21
/// Thread-safe primary key deduplication index.
///
/// Sync dedup in the hot path: `BloomFilter::may_contain()`,
/// `FxHashSet::contains()`, and `TextDictReader::ordinal()` are all sync.
///
/// Interior mutability for the mutable state (bloom + uncommitted set) is
/// behind `parking_lot::Mutex`. The committed readers are only mutated via
/// `&mut self` methods (commit/abort path), so no lock is needed for them.
pub struct PrimaryKeyIndex {
    /// Field that holds the primary key value in each document.
    field: Field,
    /// Mutable dedup state (bloom filter + uncommitted key set), lock-guarded.
    state: parking_lot::Mutex<PrimaryKeyState>,
    /// Segment readers for checking committed keys.
    /// Only mutated by `&mut self` methods (refresh/clear) — no lock needed.
    committed_readers: Vec<Arc<SegmentReader>>,
    /// Holds ref counts so segments aren't deleted while we hold readers.
    _snapshot: Option<SegmentSnapshot>,
}
39
/// Mutable dedup state guarded by the mutex in [`PrimaryKeyIndex`].
struct PrimaryKeyState {
    /// Probabilistic membership over committed + uncommitted keys; may retain
    /// stale entries after rollback/clear (harmless false positives only).
    bloom: BloomFilter,
    /// Raw key bytes registered since the last commit/refresh.
    uncommitted: FxHashSet<Vec<u8>>,
}
44
45impl PrimaryKeyIndex {
46    /// Create a new PrimaryKeyIndex by scanning committed segments.
47    ///
48    /// Iterates each reader's fast-field text dictionary to populate the bloom
49    /// filter with all existing primary key values. The snapshot keeps ref counts
50    /// alive so segments aren't deleted while we hold readers.
51    pub fn new(field: Field, readers: Vec<Arc<SegmentReader>>, snapshot: SegmentSnapshot) -> Self {
52        // Count total unique keys across all segments for bloom sizing.
53        let mut total_keys: usize = 0;
54        for reader in &readers {
55            if let Some(ff) = reader.fast_field(field.0)
56                && let Some(dict) = ff.text_dict()
57            {
58                total_keys += dict.len() as usize;
59            }
60        }
61
62        let mut bloom = BloomFilter::new(total_keys + BLOOM_HEADROOM, BLOOM_BITS_PER_KEY);
63
64        // Insert all committed keys into the bloom filter.
65        for reader in &readers {
66            if let Some(ff) = reader.fast_field(field.0)
67                && let Some(dict) = ff.text_dict()
68            {
69                for key in dict.iter() {
70                    bloom.insert(key.as_bytes());
71                }
72            }
73        }
74
75        let bloom_bytes = bloom.size_bytes();
76        log::info!(
77            "[primary_key] bloom filter: {} keys, {:.2} MB",
78            total_keys,
79            bloom_bytes as f64 / (1024.0 * 1024.0),
80        );
81
82        Self {
83            field,
84            state: parking_lot::Mutex::new(PrimaryKeyState {
85                bloom,
86                uncommitted: FxHashSet::default(),
87            }),
88            committed_readers: readers,
89            _snapshot: Some(snapshot),
90        }
91    }
92
93    /// Memory used by the bloom filter and uncommitted set.
94    pub fn memory_bytes(&self) -> usize {
95        let state = self.state.lock();
96        state.bloom.size_bytes() + state.uncommitted.len() * 32 // estimate 32 bytes per key
97    }
98
    /// Check whether a document's primary key is unique, and if so, register it.
    ///
    /// Lock discipline: the mutex is held for the in-memory checks, released
    /// while scanning committed segments, then re-acquired for the final
    /// insert with a re-check of `uncommitted` to close the race with a
    /// concurrent inserter of the same key.
    ///
    /// Returns `Ok(())` if the key is new (inserted into bloom + uncommitted set).
    /// Returns `Err(DuplicatePrimaryKey)` if the key already exists.
    /// Returns `Err(Document)` if the primary key field is missing, non-text, or empty.
    pub fn check_and_insert(&self, doc: &crate::dsl::Document) -> Result<()> {
        // Extract and validate the key: must be present, text-typed, non-empty.
        let value = doc
            .get_first(self.field)
            .ok_or_else(|| Error::Document("Missing primary key field".into()))?;
        let key = value
            .as_text()
            .ok_or_else(|| Error::Document("Primary key must be text".into()))?;
        if key.is_empty() {
            return Err(Error::Document("Primary key must not be empty".into()));
        }

        let key_bytes = key.as_bytes();

        {
            let mut state = self.state.lock();

            // Fast path: bloom says definitely not present → new key.
            // Bloom insert + set insert happen under the same lock hold, so a
            // concurrent caller with the same key cannot also take this path.
            if !state.bloom.may_contain(key_bytes) {
                state.bloom.insert(key_bytes);
                state.uncommitted.insert(key_bytes.to_vec());
                return Ok(());
            }

            // Bloom positive → check uncommitted set first (fast, in-memory).
            if state.uncommitted.contains(key_bytes) {
                return Err(Error::DuplicatePrimaryKey(key.to_string()));
            }
        }
        // Lock released — check committed segments without holding mutex.
        // committed_readers is immutable here (only changed via &mut self
        // methods), so the unlocked scan is safe.
        for reader in &self.committed_readers {
            if let Some(ff) = reader.fast_field(self.field.0)
                && let Some(dict) = ff.text_dict()
                && dict.ordinal(key).is_some()
            {
                return Err(Error::DuplicatePrimaryKey(key.to_string()));
            }
        }

        // Re-acquire lock to insert. Re-check uncommitted in case another
        // thread inserted the same key while we were scanning committed
        // segments (it would have taken one of the paths above).
        let mut state = self.state.lock();
        if state.uncommitted.contains(key_bytes) {
            return Err(Error::DuplicatePrimaryKey(key.to_string()));
        }

        // Bloom hit was a false positive (or a stale entry left by
        // rollback/clear) — the key is genuinely new.
        state.bloom.insert(key_bytes);
        state.uncommitted.insert(key_bytes.to_vec());
        Ok(())
    }
155
156    /// Refresh after commit: replace committed readers and clear uncommitted set.
157    ///
158    /// The new readers include the just-committed segments, so their text
159    /// dictionaries already contain the previously-uncommitted keys.
160    /// The snapshot keeps ref counts alive so segments aren't deleted while
161    /// we hold readers to them.
162    pub fn refresh(&mut self, new_readers: Vec<Arc<SegmentReader>>, snapshot: SegmentSnapshot) {
163        self.committed_readers = new_readers;
164        self._snapshot = Some(snapshot);
165        // get_mut() bypasses the mutex — safe because we have &mut self.
166        let state = self.state.get_mut();
167        state.uncommitted.clear();
168    }
169
170    /// Roll back an uncommitted key registration (e.g. when channel send fails
171    /// after check_and_insert succeeded). Bloom may retain the key but that only
172    /// causes harmless false positives, never missed duplicates.
173    pub fn rollback_uncommitted_key(&self, doc: &crate::dsl::Document) {
174        if let Some(value) = doc.get_first(self.field)
175            && let Some(key) = value.as_text()
176        {
177            self.state.lock().uncommitted.remove(key.as_bytes());
178        }
179    }
180
181    /// Clear uncommitted keys (e.g. on abort). Bloom may retain stale entries
182    /// but that only causes harmless false positives (extra committed-segment
183    /// lookups), never missed duplicates.
184    pub fn clear_uncommitted(&mut self) {
185        self.state.get_mut().uncommitted.clear();
186    }
187}
188
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dsl::{Document, Field};
    use crate::segment::SegmentTracker;

    /// Build a document whose only content is `key` stored under `field`.
    fn make_doc(field: Field, key: &str) -> Document {
        let mut doc = Document::new();
        doc.add_text(field, key);
        doc
    }

    /// A snapshot over a brand-new tracker holding no segments.
    fn empty_snapshot() -> SegmentSnapshot {
        SegmentSnapshot::new(Arc::new(SegmentTracker::new()), vec![])
    }

    #[test]
    fn test_new_empty_readers() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());
        // Construction with zero readers must not panic, and a first key is accepted.
        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_ok());
    }

    #[test]
    fn test_unique_keys_accepted() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        for key in ["a", "b", "c"] {
            assert!(pk.check_and_insert(&make_doc(field, key)).is_ok());
        }
    }

    #[test]
    fn test_duplicate_uncommitted_rejected() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_ok());
        let err = pk
            .check_and_insert(&make_doc(field, "key1"))
            .expect_err("second insert of the same key must fail");
        match err {
            Error::DuplicatePrimaryKey(k) => assert_eq!(k, "key1"),
            other => panic!("Expected DuplicatePrimaryKey, got {:?}", other),
        }
    }

    #[test]
    fn test_missing_field_rejected() {
        let field = Field(0);
        let other_field = Field(1);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        // The document only carries an unrelated field, not the primary key field.
        let err = pk
            .check_and_insert(&make_doc(other_field, "value"))
            .expect_err("missing primary key field must be rejected");
        match err {
            Error::Document(msg) => assert!(msg.contains("Missing"), "{}", msg),
            other => panic!("Expected Document error, got {:?}", other),
        }
    }

    #[test]
    fn test_empty_key_rejected() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        let err = pk
            .check_and_insert(&make_doc(field, ""))
            .expect_err("empty primary key must be rejected");
        match err {
            Error::Document(msg) => assert!(msg.contains("empty"), "{}", msg),
            other => panic!("Expected Document error, got {:?}", other),
        }
    }

    #[test]
    fn test_clear_uncommitted() {
        let field = Field(0);
        let mut pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        // First insert succeeds; the duplicate is rejected.
        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_ok());
        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_err());

        pk.clear_uncommitted();

        // Bloom still holds key1 but uncommitted no longer does; with no
        // committed readers the chain (bloom positive → uncommitted miss →
        // empty committed scan) accepts the key again.
        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_ok());
    }

    #[test]
    fn test_many_unique_keys() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        let keys: Vec<String> = (0..1000).map(|i| format!("key_{}", i)).collect();

        for key in &keys {
            assert!(pk.check_and_insert(&make_doc(field, key)).is_ok());
        }
        // Every key is now registered, so re-inserting any of them must fail.
        for key in &keys {
            assert!(pk.check_and_insert(&make_doc(field, key)).is_err());
        }
    }

    #[test]
    fn test_refresh_clears_uncommitted() {
        let field = Field(0);
        let mut pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_ok());
        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_err());

        // Refresh with empty readers (simulates a commit whose segment
        // readers expose no fast fields — edge case).
        pk.refresh(vec![], empty_snapshot());

        // Uncommitted is cleared and no committed reader knows the key,
        // so it is accepted once more.
        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_ok());
    }

    #[test]
    fn test_concurrent_access() {
        use std::sync::Arc;

        let field = Field(0);
        let pk = Arc::new(PrimaryKeyIndex::new(field, vec![], empty_snapshot()));

        // Race ten threads on one key; dedup must admit exactly one winner.
        let handles: Vec<_> = (0..10)
            .map(|_| {
                let pk = Arc::clone(&pk);
                std::thread::spawn(move || pk.check_and_insert(&make_doc(field, "contested_key")))
            })
            .collect();

        let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        let (ok, err): (Vec<_>, Vec<_>) = results.into_iter().partition(|r| r.is_ok());

        assert_eq!(ok.len(), 1, "Exactly one insert should succeed");
        assert_eq!(err.len(), 9, "Rest should fail with duplicate");
    }
}
344}