1use std::collections::HashSet;
12
13use byteorder::{LittleEndian, WriteBytesExt};
14use rustc_hash::{FxHashMap, FxHashSet};
15
16use crate::dsl::Field;
17use crate::error::{Error, Result};
18use crate::segment::SegmentSnapshot;
19use crate::structures::BloomFilter;
20
/// Bits allocated per key when sizing the primary-key bloom filter.
const BLOOM_BITS_PER_KEY: usize = 10;

/// Extra capacity reserved beyond the committed key count so keys inserted
/// after startup do not degrade the filter (it is sized once and never grown).
const BLOOM_HEADROOM: usize = 100_000;

/// File name under which the serialized PK bloom filter is persisted.
pub const PK_BLOOM_FILE: &str = "pk_bloom.bin";
29
/// Magic number marking a serialized PK bloom file (ASCII "PKBL").
const PK_BLOOM_MAGIC: u32 = 0x504B424C;

/// Per-segment data the index keeps for exact committed-key lookups.
pub struct PkSegmentData {
    // Id of the segment these readers were built from.
    pub segment_id: String,
    // Fast-field readers keyed by field id; the PK field's text dictionary
    // is probed to test exact key membership in this segment.
    pub fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldReader>,
}
41
/// Enforces primary-key uniqueness across committed segments and
/// not-yet-committed (staged) inserts.
pub struct PrimaryKeyIndex {
    // The document field holding the primary key.
    field: Field,
    // Mutable state (bloom filter + staged keys), guarded by a single mutex.
    state: parking_lot::Mutex<PrimaryKeyState>,
    // Per-segment readers used for exact membership checks when the bloom
    // filter reports a possible hit.
    committed_data: Vec<PkSegmentData>,
    // NOTE(review): presumably pins the segment files backing
    // `committed_data` for the lifetime of this index — confirm.
    _snapshot: Option<SegmentSnapshot>,
}
59
/// State behind the mutex: a may-contain filter over all known keys plus the
/// exact set of keys staged since the last commit/refresh.
struct PrimaryKeyState {
    // Covers committed AND uncommitted keys; a miss is definitive, a hit may
    // be a false positive that must be resolved against exact data.
    bloom: BloomFilter,
    // Exact set of keys inserted since the last commit/refresh.
    uncommitted: FxHashSet<Vec<u8>>,
}
64
impl PrimaryKeyIndex {
    /// Builds the index from scratch, sizing and seeding the bloom filter
    /// from every key visible in `pk_data`.
    pub fn new(field: Field, pk_data: Vec<PkSegmentData>, snapshot: SegmentSnapshot) -> Self {
        // Pass 1: count committed keys so the filter can be sized once, up front.
        let mut total_keys: usize = 0;
        for data in &pk_data {
            if let Some(ff) = data.fast_fields.get(&field.0)
                && let Some(dict) = ff.text_dict()
            {
                total_keys += dict.len() as usize;
            }
        }

        // Headroom covers keys inserted after startup; the filter is never resized.
        let mut bloom = BloomFilter::new(total_keys + BLOOM_HEADROOM, BLOOM_BITS_PER_KEY);

        // Pass 2: seed the filter with every committed key.
        for data in &pk_data {
            if let Some(ff) = data.fast_fields.get(&field.0)
                && let Some(dict) = ff.text_dict()
            {
                for key in dict.iter() {
                    bloom.insert(key.as_bytes());
                }
            }
        }

        let bloom_bytes = bloom.size_bytes();
        log::info!(
            "[primary_key] bloom filter: {} keys, {:.2} MB",
            total_keys,
            bloom_bytes as f64 / (1024.0 * 1024.0),
        );

        Self {
            field,
            state: parking_lot::Mutex::new(PrimaryKeyState {
                bloom,
                uncommitted: FxHashSet::default(),
            }),
            committed_data: pk_data,
            _snapshot: Some(snapshot),
        }
    }

    /// Restores the index from a persisted bloom filter, folding in only the
    /// keys from `new_data` — segments created after the filter was saved.
    ///
    /// `pk_data` is the full committed segment set; `new_data` must be the
    /// subset whose keys are not yet represented in `bloom`.
    pub fn from_persisted(
        field: Field,
        mut bloom: BloomFilter,
        pk_data: Vec<PkSegmentData>,
        new_data: &[PkSegmentData],
        snapshot: SegmentSnapshot,
    ) -> Self {
        let mut added = 0usize;
        for data in new_data {
            if let Some(ff) = data.fast_fields.get(&field.0)
                && let Some(dict) = ff.text_dict()
            {
                for key in dict.iter() {
                    bloom.insert(key.as_bytes());
                    added += 1;
                }
            }
        }

        log::info!(
            "[primary_key] bloom filter loaded from cache: {:.2} MB{}",
            bloom.size_bytes() as f64 / (1024.0 * 1024.0),
            if added > 0 {
                format!(
                    ", added {} keys from {} new segment(s)",
                    added,
                    new_data.len()
                )
            } else {
                String::new()
            },
        );

        Self {
            field,
            state: parking_lot::Mutex::new(PrimaryKeyState {
                bloom,
                uncommitted: FxHashSet::default(),
            }),
            committed_data: pk_data,
            _snapshot: Some(snapshot),
        }
    }

    /// Serializes the current bloom filter for persistence.
    pub fn bloom_to_bytes(&self) -> Vec<u8> {
        self.state.lock().bloom.to_bytes()
    }

    /// Approximate heap usage of the index.
    pub fn memory_bytes(&self) -> usize {
        let state = self.state.lock();
        // 32 is a rough per-entry estimate for the uncommitted set
        // (key bytes + hash-set overhead) — an approximation, not exact.
        state.bloom.size_bytes() + state.uncommitted.len() * 32
    }

    /// Validates `doc`'s primary key and stages it, rejecting duplicates.
    ///
    /// # Errors
    ///
    /// * `Error::Document` — PK field missing, non-text, or empty.
    /// * `Error::DuplicatePrimaryKey` — key already staged or committed.
    pub fn check_and_insert(&self, doc: &crate::dsl::Document) -> Result<()> {
        let value = doc
            .get_first(self.field)
            .ok_or_else(|| Error::Document("Missing primary key field".into()))?;
        let key = value
            .as_text()
            .ok_or_else(|| Error::Document("Primary key must be text".into()))?;
        if key.is_empty() {
            return Err(Error::Document("Primary key must not be empty".into()));
        }

        let key_bytes = key.as_bytes();

        {
            let mut state = self.state.lock();

            // Fast path: a bloom miss is definitive (no false negatives),
            // so the key is certainly new — stage it and return.
            if !state.bloom.may_contain(key_bytes) {
                state.bloom.insert(key_bytes);
                state.uncommitted.insert(key_bytes.to_vec());
                return Ok(());
            }

            // Bloom hit: either a real duplicate or a false positive.
            // Check the staged set first; it is exact.
            if state.uncommitted.contains(key_bytes) {
                return Err(Error::DuplicatePrimaryKey(key.to_string()));
            }
        } // Lock released here so the dictionary probes below run unlocked.

        // Exact check against each committed segment's term dictionary.
        for data in &self.committed_data {
            if let Some(ff) = data.fast_fields.get(&self.field.0)
                && let Some(dict) = ff.text_dict()
                && dict.ordinal(key).is_some()
            {
                return Err(Error::DuplicatePrimaryKey(key.to_string()));
            }
        }

        // Re-acquire and re-check: another writer may have staged this same
        // key while the lock was released above. Any such writer saw the same
        // bloom hit and took this slow path too, so this double-check makes
        // exactly one of the racers win.
        let mut state = self.state.lock();
        if state.uncommitted.contains(key_bytes) {
            return Err(Error::DuplicatePrimaryKey(key.to_string()));
        }

        state.bloom.insert(key_bytes);
        state.uncommitted.insert(key_bytes.to_vec());
        Ok(())
    }

    /// Refreshes the committed view after a commit: folds keys from
    /// `new_data` into the bloom filter, drops segment data absent from
    /// `snapshot`, and clears the staged key set.
    pub fn refresh_incremental(&mut self, new_data: Vec<PkSegmentData>, snapshot: SegmentSnapshot) {
        let new_seg_ids: HashSet<&str> =
            snapshot.segment_ids().iter().map(|s| s.as_str()).collect();

        // `&mut self` guarantees exclusive access, so no lock round-trip is needed.
        let state = self.state.get_mut();
        for data in &new_data {
            if let Some(ff) = data.fast_fields.get(&self.field.0)
                && let Some(dict) = ff.text_dict()
            {
                for key in dict.iter() {
                    state.bloom.insert(key.as_bytes());
                }
            }
        }
        // NOTE(review): assumes refresh always follows a commit/rollback
        // boundary, so staged keys are now covered by committed segments
        // (or intentionally discarded) — confirm against callers.
        state.uncommitted.clear();

        // Keep only segment data still present in the new snapshot, then
        // append the freshly added segments.
        let mut kept: Vec<PkSegmentData> = self
            .committed_data
            .drain(..)
            .filter(|d| new_seg_ids.contains(d.segment_id.as_str()))
            .collect();
        kept.extend(new_data);
        self.committed_data = kept;
        self._snapshot = Some(snapshot);
    }

    /// Ids of the segments currently backing the committed-key checks.
    pub fn committed_segment_ids(&self) -> impl Iterator<Item = &str> {
        self.committed_data.iter().map(|d| d.segment_id.as_str())
    }

    /// Un-stages `doc`'s key (e.g. after a failed insert) so it can be
    /// retried. Bloom filters cannot remove entries, so the key remains a
    /// harmless possible false positive resolved by the exact checks.
    pub fn rollback_uncommitted_key(&self, doc: &crate::dsl::Document) {
        if let Some(value) = doc.get_first(self.field)
            && let Some(key) = value.as_text()
        {
            self.state.lock().uncommitted.remove(key.as_bytes());
        }
    }

    /// Drops all staged keys (e.g. on rollback). Their bloom entries persist,
    /// degrading only to false positives handled by the slow path.
    pub fn clear_uncommitted(&mut self) {
        self.state.get_mut().uncommitted.clear();
    }
}
292
293pub fn serialize_pk_bloom(segment_ids: &[String], bloom_bytes: &[u8]) -> Vec<u8> {
297 let mut data = Vec::with_capacity(8 + segment_ids.len() * 32 + bloom_bytes.len());
298 data.write_u32::<LittleEndian>(PK_BLOOM_MAGIC).unwrap();
299 data.write_u32::<LittleEndian>(segment_ids.len() as u32)
300 .unwrap();
301 for seg_id in segment_ids {
302 let bytes = seg_id.as_bytes();
303 data.extend_from_slice(bytes);
304 data.extend(std::iter::repeat_n(0u8, 32 - bytes.len()));
306 }
307 data.extend_from_slice(bloom_bytes);
308 data
309}
310
311pub fn deserialize_pk_bloom(data: &[u8]) -> Option<(HashSet<String>, BloomFilter)> {
314 if data.len() < 8 {
315 return None;
316 }
317 let magic = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
318 if magic != PK_BLOOM_MAGIC {
319 return None;
320 }
321 let num_segments = u32::from_le_bytes([data[4], data[5], data[6], data[7]]) as usize;
322 let header_end = 8 + num_segments * 32;
323 if data.len() < header_end + 12 {
324 return None;
325 }
326 let mut segment_ids = HashSet::with_capacity(num_segments);
327 for i in 0..num_segments {
328 let start = 8 + i * 32;
329 let raw = &data[start..start + 32];
330 let end = raw.iter().position(|&b| b == 0).unwrap_or(32);
331 let hex = std::str::from_utf8(&raw[..end]).ok()?;
332 segment_ids.insert(hex.to_string());
333 }
334 let bloom = BloomFilter::from_bytes_mutable(&data[header_end..]).ok()?;
335 Some((segment_ids, bloom))
336}
337
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::dsl::{Document, Field};
    use crate::segment::SegmentTracker;

    /// Builds a document whose only content is `key` stored in `field`.
    fn make_doc(field: Field, key: &str) -> Document {
        let mut doc = Document::new();
        doc.add_text(field, key);
        doc
    }

    /// Snapshot over zero segments — the index starts with no committed keys.
    fn empty_snapshot() -> SegmentSnapshot {
        SegmentSnapshot::new(Arc::new(SegmentTracker::new()), vec![])
    }

    // An index built with no segment data accepts the first key.
    #[test]
    fn test_new_empty_readers() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());
        let doc = make_doc(field, "key1");
        assert!(pk.check_and_insert(&doc).is_ok());
    }

    // Distinct keys are all accepted.
    #[test]
    fn test_unique_keys_accepted() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        assert!(pk.check_and_insert(&make_doc(field, "a")).is_ok());
        assert!(pk.check_and_insert(&make_doc(field, "b")).is_ok());
        assert!(pk.check_and_insert(&make_doc(field, "c")).is_ok());
    }

    // Re-inserting a staged (uncommitted) key yields DuplicatePrimaryKey.
    #[test]
    fn test_duplicate_uncommitted_rejected() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_ok());
        let result = pk.check_and_insert(&make_doc(field, "key1"));
        assert!(result.is_err());
        match result.unwrap_err() {
            Error::DuplicatePrimaryKey(k) => assert_eq!(k, "key1"),
            other => panic!("Expected DuplicatePrimaryKey, got {:?}", other),
        }
    }

    // A document lacking the PK field is rejected with a Document error.
    #[test]
    fn test_missing_field_rejected() {
        let field = Field(0);
        let other_field = Field(1);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        let doc = make_doc(other_field, "value");
        let result = pk.check_and_insert(&doc);
        assert!(result.is_err());
        match result.unwrap_err() {
            Error::Document(msg) => assert!(msg.contains("Missing"), "{}", msg),
            other => panic!("Expected Document error, got {:?}", other),
        }
    }

    // An empty-string key is rejected with a Document error.
    #[test]
    fn test_empty_key_rejected() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        let result = pk.check_and_insert(&make_doc(field, ""));
        assert!(result.is_err());
        match result.unwrap_err() {
            Error::Document(msg) => assert!(msg.contains("empty"), "{}", msg),
            other => panic!("Expected Document error, got {:?}", other),
        }
    }

    // clear_uncommitted forgets staged keys so they can be inserted again,
    // even though their bloom entries persist (resolved via the slow path).
    #[test]
    fn test_clear_uncommitted() {
        let field = Field(0);
        let mut pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_ok());
        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_err());

        pk.clear_uncommitted();

        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_ok());
    }

    // Bulk sanity check: 1000 unique keys accepted once, then all rejected.
    #[test]
    fn test_many_unique_keys() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        for i in 0..1000 {
            let key = format!("key_{}", i);
            assert!(pk.check_and_insert(&make_doc(field, &key)).is_ok());
        }

        for i in 0..1000 {
            let key = format!("key_{}", i);
            assert!(pk.check_and_insert(&make_doc(field, &key)).is_err());
        }
    }

    // refresh_incremental clears the staged set, like a commit boundary.
    #[test]
    fn test_refresh_clears_uncommitted() {
        let field = Field(0);
        let mut pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());

        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_ok());
        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_err());

        pk.refresh_incremental(vec![], empty_snapshot());

        assert!(pk.check_and_insert(&make_doc(field, "key1")).is_ok());
    }

    // serialize + deserialize round-trips segment ids and bloom contents.
    #[test]
    fn test_pk_bloom_serialize_roundtrip() {
        let field = Field(0);
        let pk = PrimaryKeyIndex::new(field, vec![], empty_snapshot());
        for i in 0..100 {
            pk.check_and_insert(&make_doc(field, &format!("key_{}", i)))
                .unwrap();
        }

        let seg_ids = vec![
            "00000000000000000000000000000001".to_string(),
            "00000000000000000000000000000002".to_string(),
        ];
        let bloom_bytes = pk.bloom_to_bytes();
        let data = serialize_pk_bloom(&seg_ids, &bloom_bytes);
        let (got_ids, got_bloom) = deserialize_pk_bloom(&data).expect("deserialize failed");

        assert_eq!(got_ids.len(), 2);
        assert!(got_ids.contains(&seg_ids[0]));
        assert!(got_ids.contains(&seg_ids[1]));

        // Every inserted key must survive the round trip (no false negatives).
        for i in 0..100 {
            let key = format!("key_{}", i);
            assert!(
                got_bloom.may_contain(key.as_bytes()),
                "bloom miss for {}",
                key
            );
        }
    }

    // Truncated or malformed inputs must fail gracefully, not panic.
    #[test]
    fn test_pk_bloom_deserialize_bad_data() {
        assert!(deserialize_pk_bloom(&[]).is_none());
        assert!(deserialize_pk_bloom(&[0; 7]).is_none());
        // Full-length header, but zero magic and no bloom payload.
        assert!(deserialize_pk_bloom(&[0; 8]).is_none());
    }

    // Under contention, exactly one writer wins a contested key; the rest
    // are rejected via the double-checked slow path.
    #[test]
    fn test_concurrent_access() {
        use std::sync::Arc;

        let field = Field(0);
        let pk = Arc::new(PrimaryKeyIndex::new(field, vec![], empty_snapshot()));

        let mut handles = vec![];
        for _ in 0..10 {
            let pk = Arc::clone(&pk);
            handles.push(std::thread::spawn(move || {
                pk.check_and_insert(&make_doc(field, "contested_key"))
            }));
        }

        let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        let successes = results.iter().filter(|r| r.is_ok()).count();
        let failures = results.iter().filter(|r| r.is_err()).count();

        assert_eq!(successes, 1, "Exactly one insert should succeed");
        assert_eq!(failures, 9, "Rest should fail with duplicate");
    }
}