1use std::sync::Arc;
8
9use rustc_hash::FxHashSet;
10
11use crate::dsl::Field;
12use crate::error::{Error, Result};
13use crate::segment::{SegmentReader, SegmentSnapshot};
14use crate::structures::BloomFilter;
15
/// Bits allocated in the bloom filter per expected key; controls the
/// false-positive rate (a false positive only costs a dictionary scan,
/// never a wrong rejection).
const BLOOM_BITS_PER_KEY: usize = 10;

/// Extra key capacity reserved beyond the committed key count, so keys
/// inserted before the next commit/refresh do not degrade the filter's
/// false-positive rate.
const BLOOM_HEADROOM: usize = 100_000;
/// Enforces primary-key uniqueness for incoming documents against both
/// committed segments and keys inserted since the last commit.
pub struct PrimaryKeyIndex {
    // Field that holds the primary key (looked up as a text fast field).
    field: Field,
    // Bloom filter over all known keys plus the exact uncommitted set,
    // guarded by one mutex so check-and-insert is atomic per key.
    state: parking_lot::Mutex<PrimaryKeyState>,
    // Readers for committed segments, consulted on bloom-filter hits.
    committed_readers: Vec<Arc<SegmentReader>>,
    // Never read directly — presumably retained to keep the segment data
    // behind `committed_readers` alive; TODO(review) confirm.
    _snapshot: Option<SegmentSnapshot>,
}
39
/// Mutable duplicate-detection state shared behind the index mutex.
struct PrimaryKeyState {
    // Approximate-membership filter seeded with every committed key and
    // updated on every accepted insert; may yield false positives but
    // never false negatives.
    bloom: BloomFilter,
    // Exact set of keys accepted since the last commit/refresh.
    uncommitted: FxHashSet<Vec<u8>>,
}
44
45impl PrimaryKeyIndex {
46 pub fn new(field: Field, readers: Vec<Arc<SegmentReader>>, snapshot: SegmentSnapshot) -> Self {
52 let mut total_keys: usize = 0;
54 for reader in &readers {
55 if let Some(ff) = reader.fast_field(field.0)
56 && let Some(dict) = ff.text_dict()
57 {
58 total_keys += dict.len() as usize;
59 }
60 }
61
62 let mut bloom = BloomFilter::new(total_keys + BLOOM_HEADROOM, BLOOM_BITS_PER_KEY);
63
64 for reader in &readers {
66 if let Some(ff) = reader.fast_field(field.0)
67 && let Some(dict) = ff.text_dict()
68 {
69 for key in dict.iter() {
70 bloom.insert(key.as_bytes());
71 }
72 }
73 }
74
75 let bloom_bytes = bloom.size_bytes();
76 log::info!(
77 "[primary_key] bloom filter: {} keys, {:.2} MB",
78 total_keys,
79 bloom_bytes as f64 / (1024.0 * 1024.0),
80 );
81
82 Self {
83 field,
84 state: parking_lot::Mutex::new(PrimaryKeyState {
85 bloom,
86 uncommitted: FxHashSet::default(),
87 }),
88 committed_readers: readers,
89 _snapshot: Some(snapshot),
90 }
91 }
92
93 pub fn memory_bytes(&self) -> usize {
95 let state = self.state.lock();
96 state.bloom.size_bytes() + state.uncommitted.len() * 32 }
98
99 pub fn check_and_insert(&self, doc: &crate::dsl::Document) -> Result<()> {
105 let value = doc
106 .get_first(self.field)
107 .ok_or_else(|| Error::Document("Missing primary key field".into()))?;
108 let key = value
109 .as_text()
110 .ok_or_else(|| Error::Document("Primary key must be text".into()))?;
111 if key.is_empty() {
112 return Err(Error::Document("Primary key must not be empty".into()));
113 }
114
115 let key_bytes = key.as_bytes();
116
117 {
118 let mut state = self.state.lock();
119
120 if !state.bloom.may_contain(key_bytes) {
122 state.bloom.insert(key_bytes);
123 state.uncommitted.insert(key_bytes.to_vec());
124 return Ok(());
125 }
126
127 if state.uncommitted.contains(key_bytes) {
129 return Err(Error::DuplicatePrimaryKey(key.to_string()));
130 }
131 }
132 for reader in &self.committed_readers {
135 if let Some(ff) = reader.fast_field(self.field.0)
136 && let Some(dict) = ff.text_dict()
137 && dict.ordinal(key).is_some()
138 {
139 return Err(Error::DuplicatePrimaryKey(key.to_string()));
140 }
141 }
142
143 let mut state = self.state.lock();
146 if state.uncommitted.contains(key_bytes) {
147 return Err(Error::DuplicatePrimaryKey(key.to_string()));
148 }
149
150 state.bloom.insert(key_bytes);
152 state.uncommitted.insert(key_bytes.to_vec());
153 Ok(())
154 }
155
156 pub fn refresh(&mut self, new_readers: Vec<Arc<SegmentReader>>, snapshot: SegmentSnapshot) {
163 self.committed_readers = new_readers;
164 self._snapshot = Some(snapshot);
165 let state = self.state.get_mut();
167 state.uncommitted.clear();
168 }
169
170 pub fn rollback_uncommitted_key(&self, doc: &crate::dsl::Document) {
174 if let Some(value) = doc.get_first(self.field)
175 && let Some(key) = value.as_text()
176 {
177 self.state.lock().uncommitted.remove(key.as_bytes());
178 }
179 }
180
181 pub fn clear_uncommitted(&mut self) {
185 self.state.get_mut().uncommitted.clear();
186 }
187}
188
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dsl::{Document, Field};
    use crate::segment::SegmentTracker;

    /// Builds a single-field document carrying `key` as its primary key.
    fn make_doc(field: Field, key: &str) -> Document {
        let mut document = Document::new();
        document.add_text(field, key);
        document
    }

    /// Produces a snapshot over a fresh, empty segment tracker.
    fn empty_snapshot() -> SegmentSnapshot {
        let tracker = Arc::new(SegmentTracker::new());
        SegmentSnapshot::new(tracker, vec![])
    }

    #[test]
    fn test_new_empty_readers() {
        let field = Field(0);
        let index = PrimaryKeyIndex::new(field, vec![], empty_snapshot());
        assert!(index.check_and_insert(&make_doc(field, "key1")).is_ok());
    }

    #[test]
    fn test_unique_keys_accepted() {
        let field = Field(0);
        let index = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        // Distinct keys must all be admitted.
        for key in ["a", "b", "c"] {
            assert!(index.check_and_insert(&make_doc(field, key)).is_ok());
        }
    }

    #[test]
    fn test_duplicate_uncommitted_rejected() {
        let field = Field(0);
        let index = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        index
            .check_and_insert(&make_doc(field, "key1"))
            .expect("first insert must succeed");
        let err = index
            .check_and_insert(&make_doc(field, "key1"))
            .expect_err("second insert of the same key must be rejected");
        assert!(matches!(err, Error::DuplicatePrimaryKey(ref k) if k == "key1"));
    }

    #[test]
    fn test_missing_field_rejected() {
        let pk_field = Field(0);
        let unrelated_field = Field(1);
        let index = PrimaryKeyIndex::new(pk_field, vec![], empty_snapshot());

        // Document populates a different field, so the key is absent.
        let err = index
            .check_and_insert(&make_doc(unrelated_field, "value"))
            .expect_err("document without the key field must be rejected");
        assert!(matches!(err, Error::Document(ref msg) if msg.contains("Missing")));
    }

    #[test]
    fn test_empty_key_rejected() {
        let field = Field(0);
        let index = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        let err = index
            .check_and_insert(&make_doc(field, ""))
            .expect_err("empty key must be rejected");
        assert!(matches!(err, Error::Document(ref msg) if msg.contains("empty")));
    }

    #[test]
    fn test_clear_uncommitted() {
        let field = Field(0);
        let mut index = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        assert!(index.check_and_insert(&make_doc(field, "key1")).is_ok());
        assert!(index.check_and_insert(&make_doc(field, "key1")).is_err());

        index.clear_uncommitted();

        // The key left the pending set, so it may be inserted again.
        assert!(index.check_and_insert(&make_doc(field, "key1")).is_ok());
    }

    #[test]
    fn test_many_unique_keys() {
        let field = Field(0);
        let index = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        let keys: Vec<String> = (0..1000).map(|i| format!("key_{}", i)).collect();

        // First round: every key is new and must be accepted.
        for key in &keys {
            assert!(index.check_and_insert(&make_doc(field, key)).is_ok());
        }
        // Second round: every key is a duplicate and must be rejected.
        for key in &keys {
            assert!(index.check_and_insert(&make_doc(field, key)).is_err());
        }
    }

    #[test]
    fn test_refresh_clears_uncommitted() {
        let field = Field(0);
        let mut index = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        assert!(index.check_and_insert(&make_doc(field, "key1")).is_ok());
        assert!(index.check_and_insert(&make_doc(field, "key1")).is_err());

        // Refresh with no readers: pending keys are forgotten.
        index.refresh(vec![], empty_snapshot());

        assert!(index.check_and_insert(&make_doc(field, "key1")).is_ok());
    }

    #[test]
    fn test_concurrent_access() {
        use std::sync::Arc;

        let field = Field(0);
        let index = Arc::new(PrimaryKeyIndex::new(field, vec![], empty_snapshot()));

        // Ten threads race to claim the same key.
        let handles: Vec<_> = (0..10)
            .map(|_| {
                let index = Arc::clone(&index);
                std::thread::spawn(move || {
                    index.check_and_insert(&make_doc(field, "contested_key"))
                })
            })
            .collect();

        let outcomes: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        let ok_count = outcomes.iter().filter(|r| r.is_ok()).count();

        assert_eq!(ok_count, 1, "Exactly one insert should succeed");
        assert_eq!(outcomes.len() - ok_count, 9, "Rest should fail with duplicate");
    }
}