// hermes_core/index/primary_key.rs
1//! Primary key deduplication index.
2//!
3//! Uses a bloom filter + `FxHashSet` for uncommitted keys to reject duplicates
4//! at `add_document()` time. Committed keys are checked via fast-field
5//! `TextDictReader::ordinal()` (binary search, O(log n)).
6//!
7//! The bloom filter is persisted to `pk_bloom.bin` so that restarts don't need
8//! to re-iterate every committed key. On load, only keys from segments that
9//! appeared since the last persist are iterated.
10
11use std::collections::HashSet;
12
13use byteorder::{LittleEndian, WriteBytesExt};
14use rustc_hash::{FxHashMap, FxHashSet};
15
16use crate::dsl::Field;
17use crate::error::{Error, Result};
18use crate::segment::SegmentSnapshot;
19use crate::structures::BloomFilter;
20
/// Bloom filter sizing: 10 bits/key ≈ 1% false positive rate.
const BLOOM_BITS_PER_KEY: usize = 10;

/// Extra capacity added to bloom filter beyond known keys.
///
/// Leaves room for keys inserted after construction (uncommitted adds) so the
/// false-positive rate stays near target until the filter is rebuilt.
const BLOOM_HEADROOM: usize = 100_000;

/// File name for the persisted primary-key bloom filter.
pub const PK_BLOOM_FILE: &str = "pk_bloom.bin";

/// Magic bytes for the persisted bloom file.
const PK_BLOOM_MAGIC: u32 = 0x504B424C; // "PKBL"
32
/// Lightweight per-segment data for primary key lookups.
///
/// Only holds fast-field readers (text dictionaries), not full `SegmentReader`s.
/// This avoids loading DimensionTables, SSTable FSTs, bloom filters, etc.
pub struct PkSegmentData {
    /// Identifier of the segment these readers came from (32-char hex string).
    pub segment_id: String,
    /// Fast-field readers keyed by field id (`Field.0`).
    pub fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldReader>,
}
41
/// Thread-safe primary key deduplication index.
///
/// Sync dedup in the hot path: `BloomFilter::may_contain()`,
/// `FxHashSet::contains()`, and `TextDictReader::ordinal()` are all sync.
///
/// Interior mutability for the mutable state (bloom + uncommitted set) is
/// behind `parking_lot::Mutex`. The committed data is only mutated via
/// `&mut self` methods (commit/abort path), so no lock is needed for it.
pub struct PrimaryKeyIndex {
    /// The schema field holding the primary key; values are treated as text
    /// and checked against per-segment text dictionaries.
    field: Field,
    /// Bloom filter + uncommitted key set, guarded by a mutex.
    state: parking_lot::Mutex<PrimaryKeyState>,
    /// Lightweight per-segment fast-field data for checking committed keys.
    /// Only mutated by `&mut self` methods (refresh/clear) — no lock needed.
    committed_data: Vec<PkSegmentData>,
    /// Holds ref counts so segments aren't deleted while we hold readers.
    _snapshot: Option<SegmentSnapshot>,
}
59
/// Mutable dedup state guarded by the mutex in [`PrimaryKeyIndex`].
struct PrimaryKeyState {
    /// Probabilistic superset of every key ever inserted (committed + uncommitted).
    bloom: BloomFilter,
    /// Exact set of keys registered since the last commit/refresh.
    uncommitted: FxHashSet<Vec<u8>>,
}
64
impl PrimaryKeyIndex {
    /// Create a new PrimaryKeyIndex by scanning committed segments.
    ///
    /// Iterates each segment's fast-field text dictionary to populate the bloom
    /// filter with all existing primary key values. The snapshot keeps ref counts
    /// alive so segments aren't deleted while we hold data.
    ///
    /// **CPU-intensive** — call from `spawn_blocking`, not the async runtime.
    pub fn new(field: Field, pk_data: Vec<PkSegmentData>, snapshot: SegmentSnapshot) -> Self {
        // Count total unique keys across all segments for bloom sizing.
        // NOTE(review): a key present in several segments is counted once per
        // segment here; that only overestimates the bloom size (harmless).
        let mut total_keys: usize = 0;
        for data in &pk_data {
            if let Some(ff) = data.fast_fields.get(&field.0)
                && let Some(dict) = ff.text_dict()
            {
                total_keys += dict.len() as usize;
            }
        }

        let mut bloom = BloomFilter::new(total_keys + BLOOM_HEADROOM, BLOOM_BITS_PER_KEY);

        // Insert all committed keys into the bloom filter.
        for data in &pk_data {
            if let Some(ff) = data.fast_fields.get(&field.0)
                && let Some(dict) = ff.text_dict()
            {
                for key in dict.iter() {
                    bloom.insert(key.as_bytes());
                }
            }
        }

        let bloom_bytes = bloom.size_bytes();
        log::info!(
            "[primary_key] bloom filter: {} keys, {:.2} MB",
            total_keys,
            bloom_bytes as f64 / (1024.0 * 1024.0),
        );

        Self {
            field,
            state: parking_lot::Mutex::new(PrimaryKeyState {
                bloom,
                uncommitted: FxHashSet::default(),
            }),
            committed_data: pk_data,
            _snapshot: Some(snapshot),
        }
    }

    /// Create from a pre-loaded bloom filter (loaded from `pk_bloom.bin`).
    ///
    /// Skips dictionary iteration entirely when the persisted bloom covers
    /// all current segments. `pk_data` contains data for ALL current segments.
    /// If `new_data` is non-empty, their keys are inserted into the bloom
    /// before returning (incremental update). `new_data` is a borrowed slice
    /// pointing to the subset of segments not covered by the persisted bloom.
    pub fn from_persisted(
        field: Field,
        mut bloom: BloomFilter,
        pk_data: Vec<PkSegmentData>,
        new_data: &[PkSegmentData],
        snapshot: SegmentSnapshot,
    ) -> Self {
        // `added` counts insert operations (one per dictionary entry), not
        // globally unique keys — used only for logging.
        let mut added = 0usize;
        for data in new_data {
            if let Some(ff) = data.fast_fields.get(&field.0)
                && let Some(dict) = ff.text_dict()
            {
                for key in dict.iter() {
                    bloom.insert(key.as_bytes());
                    added += 1;
                }
            }
        }

        log::info!(
            "[primary_key] bloom filter loaded from cache: {:.2} MB{}",
            bloom.size_bytes() as f64 / (1024.0 * 1024.0),
            if added > 0 {
                format!(
                    ", added {} keys from {} new segment(s)",
                    added,
                    new_data.len()
                )
            } else {
                String::new()
            },
        );

        Self {
            field,
            state: parking_lot::Mutex::new(PrimaryKeyState {
                bloom,
                uncommitted: FxHashSet::default(),
            }),
            committed_data: pk_data,
            _snapshot: Some(snapshot),
        }
    }

    /// Serialize the bloom filter for persistence to `pk_bloom.bin`.
    pub fn bloom_to_bytes(&self) -> Vec<u8> {
        self.state.lock().bloom.to_bytes()
    }

    /// Memory used by the bloom filter and uncommitted set.
    ///
    /// The uncommitted-set figure is an estimate, not an exact measurement.
    pub fn memory_bytes(&self) -> usize {
        let state = self.state.lock();
        state.bloom.size_bytes() + state.uncommitted.len() * 32 // estimate 32 bytes per key
    }

    /// Check whether a document's primary key is unique, and if so, register it.
    ///
    /// Returns `Ok(())` if the key is new (inserted into bloom + uncommitted set).
    /// Returns `Err(DuplicatePrimaryKey)` if the key already exists.
    /// Returns `Err(Document)` if the primary key field is missing or empty.
    ///
    /// Locking protocol: the mutex is taken for the in-memory fast path,
    /// released during the committed-segment scan (which can binary-search
    /// several dictionaries), then re-taken to insert — with a re-check of the
    /// uncommitted set to close the race window opened while unlocked.
    pub fn check_and_insert(&self, doc: &crate::dsl::Document) -> Result<()> {
        let value = doc
            .get_first(self.field)
            .ok_or_else(|| Error::Document("Missing primary key field".into()))?;
        let key = value
            .as_text()
            .ok_or_else(|| Error::Document("Primary key must be text".into()))?;
        if key.is_empty() {
            return Err(Error::Document("Primary key must not be empty".into()));
        }

        let key_bytes = key.as_bytes();

        {
            let mut state = self.state.lock();

            // Fast path: bloom says definitely not present → new key.
            if !state.bloom.may_contain(key_bytes) {
                state.bloom.insert(key_bytes);
                state.uncommitted.insert(key_bytes.to_vec());
                return Ok(());
            }

            // Bloom positive → check uncommitted set first (fast, in-memory).
            if state.uncommitted.contains(key_bytes) {
                return Err(Error::DuplicatePrimaryKey(key.to_string()));
            }
        }
        // Lock released — check committed segments without holding mutex.
        // committed_data is immutable (only changed via &mut self methods).
        for data in &self.committed_data {
            if let Some(ff) = data.fast_fields.get(&self.field.0)
                && let Some(dict) = ff.text_dict()
                && dict.ordinal(key).is_some()
            {
                return Err(Error::DuplicatePrimaryKey(key.to_string()));
            }
        }

        // Re-acquire lock to insert. Re-check uncommitted in case another
        // thread inserted the same key while we were scanning committed segments.
        let mut state = self.state.lock();
        if state.uncommitted.contains(key_bytes) {
            return Err(Error::DuplicatePrimaryKey(key.to_string()));
        }

        // Bloom false positive — key is genuinely new.
        state.bloom.insert(key_bytes);
        state.uncommitted.insert(key_bytes.to_vec());
        Ok(())
    }

    /// Refresh after commit: merge new segment data, prune removed segments,
    /// insert new keys into bloom, and clear uncommitted set.
    ///
    /// Only `new_data` (segments not already held) need to be loaded by the
    /// caller. Existing data for segments still in `snapshot` is retained.
    /// The snapshot keeps ref counts alive so segments aren't deleted.
    pub fn refresh_incremental(&mut self, new_data: Vec<PkSegmentData>, snapshot: SegmentSnapshot) {
        // ALL segment IDs in the new snapshot — used below to prune data for
        // segments that no longer exist (e.g. merged away).
        let new_seg_ids: HashSet<&str> =
            snapshot.segment_ids().iter().map(|s| s.as_str()).collect();

        // Insert new segments' keys into bloom (these were uncommitted before).
        // get_mut() bypasses the mutex — safe because we have &mut self.
        let state = self.state.get_mut();
        for data in &new_data {
            if let Some(ff) = data.fast_fields.get(&self.field.0)
                && let Some(dict) = ff.text_dict()
            {
                for key in dict.iter() {
                    state.bloom.insert(key.as_bytes());
                }
            }
        }
        state.uncommitted.clear();

        // Keep existing data for segments still in the snapshot
        let mut kept: Vec<PkSegmentData> = self
            .committed_data
            .drain(..)
            .filter(|d| new_seg_ids.contains(d.segment_id.as_str()))
            .collect();
        kept.extend(new_data);
        self.committed_data = kept;
        self._snapshot = Some(snapshot);
    }

    /// Iterator over segment IDs already held in this PK index.
    pub fn committed_segment_ids(&self) -> impl Iterator<Item = &str> {
        self.committed_data.iter().map(|d| d.segment_id.as_str())
    }

    /// Roll back an uncommitted key registration (e.g. when channel send fails
    /// after check_and_insert succeeded). Bloom may retain the key but that only
    /// causes harmless false positives, never missed duplicates.
    pub fn rollback_uncommitted_key(&self, doc: &crate::dsl::Document) {
        if let Some(value) = doc.get_first(self.field)
            && let Some(key) = value.as_text()
        {
            self.state.lock().uncommitted.remove(key.as_bytes());
        }
    }

    /// Clear uncommitted keys (e.g. on abort). Bloom may retain stale entries
    /// but that only causes harmless false positives (extra committed-segment
    /// lookups), never missed duplicates.
    pub fn clear_uncommitted(&mut self) {
        self.state.get_mut().uncommitted.clear();
    }
}
292
293/// Serialize a bloom filter with the segment IDs it covers into `pk_bloom.bin` format.
294///
295/// Layout: `[magic:u32][num_segs:u32][seg_id_hex × 32 bytes each...][bloom_bytes...]`
296pub fn serialize_pk_bloom(segment_ids: &[String], bloom_bytes: &[u8]) -> Vec<u8> {
297    let mut data = Vec::with_capacity(8 + segment_ids.len() * 32 + bloom_bytes.len());
298    data.write_u32::<LittleEndian>(PK_BLOOM_MAGIC).unwrap();
299    data.write_u32::<LittleEndian>(segment_ids.len() as u32)
300        .unwrap();
301    for seg_id in segment_ids {
302        let bytes = seg_id.as_bytes();
303        data.extend_from_slice(bytes);
304        // Pad to 32 bytes (segment IDs are 32-char hex strings)
305        data.extend(std::iter::repeat_n(0u8, 32 - bytes.len()));
306    }
307    data.extend_from_slice(bloom_bytes);
308    data
309}
310
311/// Deserialize `pk_bloom.bin`. Returns the set of covered segment IDs and the bloom filter,
312/// or `None` if the data is corrupt / wrong magic.
313pub fn deserialize_pk_bloom(data: &[u8]) -> Option<(HashSet<String>, BloomFilter)> {
314    if data.len() < 8 {
315        return None;
316    }
317    let magic = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
318    if magic != PK_BLOOM_MAGIC {
319        return None;
320    }
321    let num_segments = u32::from_le_bytes([data[4], data[5], data[6], data[7]]) as usize;
322    let header_end = 8 + num_segments * 32;
323    if data.len() < header_end + 12 {
324        return None;
325    }
326    let mut segment_ids = HashSet::with_capacity(num_segments);
327    for i in 0..num_segments {
328        let start = 8 + i * 32;
329        let raw = &data[start..start + 32];
330        let end = raw.iter().position(|&b| b == 0).unwrap_or(32);
331        let hex = std::str::from_utf8(&raw[..end]).ok()?;
332        segment_ids.insert(hex.to_string());
333    }
334    let bloom = BloomFilter::from_bytes_mutable(&data[header_end..]).ok()?;
335    Some((segment_ids, bloom))
336}
337
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::dsl::{Document, Field};
    use crate::segment::SegmentTracker;

    /// Build a one-field document carrying `key` as its primary-key value.
    fn make_doc(field: Field, key: &str) -> Document {
        let mut doc = Document::new();
        doc.add_text(field, key);
        doc
    }

    /// Snapshot over a fresh tracker: no committed segments at all.
    fn empty_snapshot() -> SegmentSnapshot {
        SegmentSnapshot::new(Arc::new(SegmentTracker::new()), vec![])
    }

    #[test]
    fn test_new_empty_readers() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());
        // Should construct without panicking
        let doc = make_doc(field, "key1");
        assert!(pk.check_and_insert(&doc).is_ok());
    }

    #[test]
    fn test_unique_keys_accepted() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        assert!(pk.check_and_insert(&make_doc(field, "a")).is_ok());
        assert!(pk.check_and_insert(&make_doc(field, "b")).is_ok());
        assert!(pk.check_and_insert(&make_doc(field, "c")).is_ok());
    }

    #[test]
    fn test_duplicate_uncommitted_rejected() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_ok());
        let result = pk.check_and_insert(&make_doc(field, "key1"));
        assert!(result.is_err());
        match result.unwrap_err() {
            Error::DuplicatePrimaryKey(k) => assert_eq!(k, "key1"),
            other => panic!("Expected DuplicatePrimaryKey, got {:?}", other),
        }
    }

    #[test]
    fn test_missing_field_rejected() {
        let field = Field(0);
        let other_field = Field(1);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        // Document has a different field, not the primary key field
        let doc = make_doc(other_field, "value");
        let result = pk.check_and_insert(&doc);
        assert!(result.is_err());
        match result.unwrap_err() {
            Error::Document(msg) => assert!(msg.contains("Missing"), "{}", msg),
            other => panic!("Expected Document error, got {:?}", other),
        }
    }

    #[test]
    fn test_empty_key_rejected() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        let result = pk.check_and_insert(&make_doc(field, ""));
        assert!(result.is_err());
        match result.unwrap_err() {
            Error::Document(msg) => assert!(msg.contains("empty"), "{}", msg),
            other => panic!("Expected Document error, got {:?}", other),
        }
    }

    #[test]
    fn test_clear_uncommitted() {
        let field = Field(0);
        let mut pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        // Insert key1
        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_ok());
        // Duplicate should fail
        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_err());

        // Clear uncommitted
        pk.clear_uncommitted();

        // After clear, bloom still has key1 but uncommitted doesn't.
        // With no committed readers, the key should be allowed again
        // (bloom positive → check uncommitted (not found) → check committed (empty) → accept)
        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_ok());
    }

    #[test]
    fn test_many_unique_keys() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        for i in 0..1000 {
            let key = format!("key_{}", i);
            assert!(pk.check_and_insert(&make_doc(field, &key)).is_ok());
        }

        // All should be duplicates now
        for i in 0..1000 {
            let key = format!("key_{}", i);
            assert!(pk.check_and_insert(&make_doc(field, &key)).is_err());
        }
    }

    #[test]
    fn test_refresh_clears_uncommitted() {
        let field = Field(0);
        let mut pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_ok());
        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_err());

        // Refresh with empty data (simulates commit where segments
        // don't have fast fields — edge case)
        pk.refresh_incremental(vec![], empty_snapshot());

        // After refresh, uncommitted is cleared and no committed data has
        // the key, so it should be accepted again
        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_ok());
    }

    #[test]
    fn test_pk_bloom_serialize_roundtrip() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());
        for i in 0..100 {
            pk.check_and_insert(&make_doc(field, &format!("key_{}", i)))
                .unwrap();
        }

        let seg_ids = vec![
            "00000000000000000000000000000001".to_string(),
            "00000000000000000000000000000002".to_string(),
        ];
        let bloom_bytes = pk.bloom_to_bytes();
        let data = serialize_pk_bloom(&seg_ids, &bloom_bytes);
        let (got_ids, got_bloom) = deserialize_pk_bloom(&data).expect("deserialize failed");

        assert_eq!(got_ids.len(), 2);
        assert!(got_ids.contains(&seg_ids[0]));
        assert!(got_ids.contains(&seg_ids[1]));

        // Verify the loaded bloom recognizes previously inserted keys.
        for i in 0..100 {
            let key = format!("key_{}", i);
            assert!(
                got_bloom.may_contain(key.as_bytes()),
                "bloom miss for {}",
                key
            );
        }
    }

    #[test]
    fn test_pk_bloom_deserialize_bad_data() {
        assert!(deserialize_pk_bloom(&[]).is_none());
        assert!(deserialize_pk_bloom(&[0; 7]).is_none());
        assert!(deserialize_pk_bloom(&[0; 8]).is_none()); // wrong magic
    }

    #[test]
    fn test_concurrent_access() {
        use std::sync::Arc;

        let field = Field(0);
        let pk = Arc::new(PrimaryKeyIndex::new(field, vec![], empty_snapshot()));

        // Spawn multiple threads trying to insert the same key
        let mut handles = vec![];
        for _ in 0..10 {
            let pk = Arc::clone(&pk);
            handles.push(std::thread::spawn(move || {
                pk.check_and_insert(&make_doc(field, "contested_key"))
            }));
        }

        let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        let successes = results.iter().filter(|r| r.is_ok()).count();
        let failures = results.iter().filter(|r| r.is_err()).count();

        // Exactly one thread should succeed, rest should get DuplicatePrimaryKey
        assert_eq!(successes, 1, "Exactly one insert should succeed");
        assert_eq!(failures, 9, "Rest should fail with duplicate");
    }
}