hermes_core/index/
primary_key.rs1use std::sync::Arc;
8
9use rustc_hash::FxHashSet;
10
11use crate::dsl::Field;
12use crate::error::{Error, Result};
13use crate::segment::{SegmentReader, SegmentSnapshot};
14use crate::structures::BloomFilter;
15
/// Per-key bit budget passed to `BloomFilter::new` when the primary-key filter
/// is built in `PrimaryKeyIndex::new`.
const BLOOM_BITS_PER_KEY: usize = 10;

/// Extra key capacity added on top of the committed-key count when sizing the
/// bloom filter, to leave room for keys accepted after construction.
const BLOOM_HEADROOM: usize = 100_000;
21
/// Enforces primary-key uniqueness across committed segments and documents
/// that were accepted but not yet committed.
///
/// A bloom filter over every key seen so far provides a fast path: a filter
/// miss proves the key is new. A filter hit falls back to an exact check
/// against the uncommitted set and the committed segments' text dictionaries.
pub struct PrimaryKeyIndex {
    /// Schema field that holds the (text) primary key.
    field: Field,
    /// Bloom filter and uncommitted-key set, guarded together by one mutex.
    state: parking_lot::Mutex<PrimaryKeyState>,
    /// Readers of the committed segments, probed on bloom-filter hits.
    committed_readers: Vec<Arc<SegmentReader>>,
    /// Stored but never read in this file. Presumably held to keep the
    /// snapshot's segments alive while `committed_readers` uses them — confirm.
    _snapshot: Option<SegmentSnapshot>,
}
39
/// Mutable state of a `PrimaryKeyIndex`, kept behind a single mutex so the
/// filter and the uncommitted set are always updated together.
struct PrimaryKeyState {
    /// Filter over committed keys seen at construction plus every key accepted
    /// since. Nothing in this file removes keys from it.
    bloom: BloomFilter,
    /// Exact set of keys accepted since the last refresh/clear that are not
    /// yet part of the committed readers.
    uncommitted: FxHashSet<Vec<u8>>,
}
44
45impl PrimaryKeyIndex {
46 pub fn new(field: Field, readers: Vec<Arc<SegmentReader>>, snapshot: SegmentSnapshot) -> Self {
52 let mut total_keys: usize = 0;
54 for reader in &readers {
55 if let Some(ff) = reader.fast_field(field.0)
56 && let Some(dict) = ff.text_dict()
57 {
58 total_keys += dict.len() as usize;
59 }
60 }
61
62 let mut bloom = BloomFilter::new(total_keys + BLOOM_HEADROOM, BLOOM_BITS_PER_KEY);
63
64 for reader in &readers {
66 if let Some(ff) = reader.fast_field(field.0)
67 && let Some(dict) = ff.text_dict()
68 {
69 for key in dict.iter() {
70 bloom.insert(key.as_bytes());
71 }
72 }
73 }
74
75 Self {
76 field,
77 state: parking_lot::Mutex::new(PrimaryKeyState {
78 bloom,
79 uncommitted: FxHashSet::default(),
80 }),
81 committed_readers: readers,
82 _snapshot: Some(snapshot),
83 }
84 }
85
86 pub fn check_and_insert(&self, doc: &crate::dsl::Document) -> Result<()> {
92 let value = doc
93 .get_first(self.field)
94 .ok_or_else(|| Error::Document("Missing primary key field".into()))?;
95 let key = value
96 .as_text()
97 .ok_or_else(|| Error::Document("Primary key must be text".into()))?;
98 if key.is_empty() {
99 return Err(Error::Document("Primary key must not be empty".into()));
100 }
101
102 let key_bytes = key.as_bytes();
103
104 {
105 let mut state = self.state.lock();
106
107 if !state.bloom.may_contain(key_bytes) {
109 state.bloom.insert(key_bytes);
110 state.uncommitted.insert(key_bytes.to_vec());
111 return Ok(());
112 }
113
114 if state.uncommitted.contains(key_bytes) {
116 return Err(Error::DuplicatePrimaryKey(key.to_string()));
117 }
118 }
119 for reader in &self.committed_readers {
122 if let Some(ff) = reader.fast_field(self.field.0)
123 && let Some(dict) = ff.text_dict()
124 && dict.ordinal(key).is_some()
125 {
126 return Err(Error::DuplicatePrimaryKey(key.to_string()));
127 }
128 }
129
130 let mut state = self.state.lock();
133 if state.uncommitted.contains(key_bytes) {
134 return Err(Error::DuplicatePrimaryKey(key.to_string()));
135 }
136
137 state.bloom.insert(key_bytes);
139 state.uncommitted.insert(key_bytes.to_vec());
140 Ok(())
141 }
142
143 pub fn refresh(&mut self, new_readers: Vec<Arc<SegmentReader>>, snapshot: SegmentSnapshot) {
150 self.committed_readers = new_readers;
151 self._snapshot = Some(snapshot);
152 let state = self.state.get_mut();
154 state.uncommitted.clear();
155 }
156
157 pub fn rollback_uncommitted_key(&self, doc: &crate::dsl::Document) {
161 if let Some(value) = doc.get_first(self.field)
162 && let Some(key) = value.as_text()
163 {
164 self.state.lock().uncommitted.remove(key.as_bytes());
165 }
166 }
167
168 pub fn clear_uncommitted(&mut self) {
172 self.state.get_mut().uncommitted.clear();
173 }
174}
175
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dsl::{Document, Field};
    use crate::segment::SegmentTracker;

    /// Builds a document whose only value is `key`, stored under `field`.
    fn make_doc(field: Field, key: &str) -> Document {
        let mut doc = Document::new();
        doc.add_text(field, key);
        doc
    }

    /// Snapshot over no segments, for indexes built without committed readers.
    fn empty_snapshot() -> SegmentSnapshot {
        SegmentSnapshot::new(Arc::new(SegmentTracker::new()), vec![])
    }

    /// Asserts that `result` is a duplicate-key error naming `expected_key`.
    fn assert_duplicate(result: Result<()>, expected_key: &str) {
        match result {
            Err(Error::DuplicatePrimaryKey(k)) => assert_eq!(k, expected_key),
            other => panic!("Expected DuplicatePrimaryKey, got {:?}", other),
        }
    }

    #[test]
    fn test_new_empty_readers() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());
        let doc = make_doc(field, "key1");
        assert!(pk.check_and_insert(&doc).is_ok());
    }

    #[test]
    fn test_unique_keys_accepted() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        assert!(pk.check_and_insert(&make_doc(field, "a")).is_ok());
        assert!(pk.check_and_insert(&make_doc(field, "b")).is_ok());
        assert!(pk.check_and_insert(&make_doc(field, "c")).is_ok());
    }

    #[test]
    fn test_duplicate_uncommitted_rejected() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_ok());
        assert_duplicate(pk.check_and_insert(&make_doc(field, "key1")), "key1");
    }

    #[test]
    fn test_missing_field_rejected() {
        let field = Field(0);
        let other_field = Field(1);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        let doc = make_doc(other_field, "value");
        let result = pk.check_and_insert(&doc);
        assert!(result.is_err());
        match result.unwrap_err() {
            Error::Document(msg) => assert!(msg.contains("Missing"), "{}", msg),
            other => panic!("Expected Document error, got {:?}", other),
        }
    }

    #[test]
    fn test_empty_key_rejected() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        let result = pk.check_and_insert(&make_doc(field, ""));
        assert!(result.is_err());
        match result.unwrap_err() {
            Error::Document(msg) => assert!(msg.contains("empty"), "{}", msg),
            other => panic!("Expected Document error, got {:?}", other),
        }
    }

    #[test]
    fn test_clear_uncommitted() {
        let field = Field(0);
        let mut pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_ok());
        assert_duplicate(pk.check_and_insert(&make_doc(field, "key1")), "key1");

        pk.clear_uncommitted();

        // The key is still in the bloom filter, so this exercises the slow path.
        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_ok());
    }

    #[test]
    fn test_rollback_uncommitted_key() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());
        let doc = make_doc(field, "key1");

        assert!(pk.check_and_insert(&doc).is_ok());
        pk.rollback_uncommitted_key(&doc);

        // Re-reserving succeeds once, then the key is protected again.
        assert!(pk.check_and_insert(&doc).is_ok());
        assert_duplicate(pk.check_and_insert(&doc), "key1");
    }

    #[test]
    fn test_rollback_ignores_doc_without_key() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_ok());
        // A document lacking the primary-key field must not release anything.
        pk.rollback_uncommitted_key(&make_doc(Field(1), "key1"));
        assert_duplicate(pk.check_and_insert(&make_doc(field, "key1")), "key1");
    }

    #[test]
    fn test_many_unique_keys() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        for i in 0..1000 {
            let key = format!("key_{}", i);
            assert!(pk.check_and_insert(&make_doc(field, &key)).is_ok());
        }

        for i in 0..1000 {
            let key = format!("key_{}", i);
            assert_duplicate(pk.check_and_insert(&make_doc(field, &key)), &key);
        }
    }

    #[test]
    fn test_refresh_clears_uncommitted() {
        let field = Field(0);
        let mut pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_ok());
        assert_duplicate(pk.check_and_insert(&make_doc(field, "key1")), "key1");

        pk.refresh(vec![], empty_snapshot());

        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_ok());
    }

    #[test]
    fn test_concurrent_access() {
        // `Arc` is already in scope through `use super::*`.
        let field = Field(0);
        let pk = Arc::new(PrimaryKeyIndex::new(field, vec![], empty_snapshot()));

        let handles: Vec<_> = (0..10)
            .map(|_| {
                let pk = Arc::clone(&pk);
                std::thread::spawn(move || pk.check_and_insert(&make_doc(field, "contested_key")))
            })
            .collect();

        let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        let successes = results.iter().filter(|r| r.is_ok()).count();
        let duplicates = results
            .iter()
            .filter(|r| matches!(r, Err(Error::DuplicatePrimaryKey(_))))
            .count();

        assert_eq!(successes, 1, "Exactly one insert should succeed");
        assert_eq!(duplicates, 9, "Rest should fail with duplicate");
    }
}