#[cfg(feature = "native")]
use std::fs::{File, OpenOptions};
#[cfg(feature = "native")]
use std::io::{BufWriter, Write};
#[cfg(feature = "native")]
use std::path::PathBuf;
#[cfg(feature = "native")]
use std::sync::Arc;

#[cfg(feature = "native")]
use hashbrown::HashMap;
#[cfg(feature = "native")]
use lasso::{Rodeo, Spur};
#[cfg(feature = "native")]
use rustc_hash::FxHashMap;

#[cfg(feature = "native")]
use super::store::StoreWriter;
#[cfg(feature = "native")]
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
#[cfg(feature = "native")]
use crate::directories::{Directory, DirectoryWriter};
#[cfg(feature = "native")]
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
#[cfg(feature = "native")]
use crate::structures::{PostingList, SSTableWriter, TermInfo};
#[cfg(feature = "native")]
use crate::tokenizer::{BoxedTokenizer, LowercaseTokenizer};
#[cfg(feature = "native")]
use crate::wand::WandData;
#[cfg(feature = "native")]
use crate::{DocId, Result};

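/// Maximum number of in-memory postings a single term may accumulate before its
/// list is spilled to the temporary spill file.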
#[cfg(feature = "native")]
const POSTING_FLUSH_THRESHOLD: usize = 100_000;

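/// Capacity of the write buffers used for the temporary store and spill files.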
#[cfg(feature = "native")]
const SPILL_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Number of shards in the in-memory inverted index. Must be a power of two so
/// `TermKey::shard` can select a shard with a bit mask.
#[cfg(feature = "native")]
const NUM_INDEX_SHARDS: usize = 64;

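/// Key for one posting list: an indexed field id paired with an interned term.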
#[cfg(feature = "native")]
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    field: u32,
    term: Spur,
}

#[cfg(feature = "native")]
impl TermKey {
    #[inline]
    fn shard(&self) -> usize {
        (self.term.into_inner().get() as usize) & (NUM_INDEX_SHARDS - 1)
    }
}

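/// In-memory inverted index split across `NUM_INDEX_SHARDS` hash maps, with each
/// `TermKey` routed to a shard by `TermKey::shard`.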
#[cfg(feature = "native")]
struct ShardedInvertedIndex {
    shards: Vec<HashMap<TermKey, SpillablePostingList>>,
}

#[cfg(feature = "native")]
impl ShardedInvertedIndex {
    fn new(capacity_per_shard: usize) -> Self {
        let mut shards = Vec::with_capacity(NUM_INDEX_SHARDS);
        for _ in 0..NUM_INDEX_SHARDS {
            shards.push(HashMap::with_capacity(capacity_per_shard));
        }
        Self { shards }
    }

    #[inline]
    fn get_mut(&mut self, key: &TermKey) -> Option<&mut SpillablePostingList> {
        self.shards[key.shard()].get_mut(key)
    }

    #[inline]
    fn get_or_insert(&mut self, key: TermKey) -> &mut SpillablePostingList {
        self.shards[key.shard()]
            .entry(key)
            .or_insert_with(SpillablePostingList::new)
    }

    #[allow(dead_code)]
    fn len(&self) -> usize {
        self.shards.iter().map(|s| s.len()).sum()
    }
}

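/// Iterator over every `(TermKey, SpillablePostingList)` entry in the index,
/// visiting the shards one after another.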
#[cfg(feature = "native")]
struct ShardedIndexIter<'a> {
    shards: std::slice::Iter<'a, HashMap<TermKey, SpillablePostingList>>,
    current: Option<hashbrown::hash_map::Iter<'a, TermKey, SpillablePostingList>>,
}

#[cfg(feature = "native")]
impl<'a> Iterator for ShardedIndexIter<'a> {
    type Item = (&'a TermKey, &'a SpillablePostingList);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(ref mut current) = self.current
                && let Some(item) = current.next()
            {
                return Some(item);
            }
            match self.shards.next() {
                Some(shard) => self.current = Some(shard.iter()),
                None => return None,
            }
        }
    }
}

#[cfg(feature = "native")]
impl<'a> IntoIterator for &'a ShardedInvertedIndex {
    type Item = (&'a TermKey, &'a SpillablePostingList);
    type IntoIter = ShardedIndexIter<'a>;

    fn into_iter(self) -> Self::IntoIter {
        ShardedIndexIter {
            shards: self.shards.iter(),
            current: None,
        }
    }
}

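/// A single in-memory posting: a document id and a term frequency saturated to 16 bits.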
#[cfg(feature = "native")]
#[derive(Clone, Copy)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16,
}

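/// Posting list that keeps recent postings in memory and spills full batches to a
/// shared temporary file once they exceed `POSTING_FLUSH_THRESHOLD`.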
#[cfg(feature = "native")]
struct SpillablePostingList {
    /// Postings that have not been spilled yet, in ascending doc-id order.
    memory: Vec<CompactPosting>,
    /// Byte offset and posting count of every batch already written to the spill
    /// file. Batches of different terms interleave in the file, so each batch
    /// records its own offset rather than assuming contiguity.
    spill_blocks: Vec<(u64, u32)>,
}

#[cfg(feature = "native")]
impl SpillablePostingList {
    fn new() -> Self {
        Self {
            memory: Vec::new(),
            spill_blocks: Vec::new(),
        }
    }

    #[allow(dead_code)]
    fn with_capacity(capacity: usize) -> Self {
        Self {
            memory: Vec::with_capacity(capacity),
            spill_blocks: Vec::new(),
        }
    }

    #[inline]
    fn add(&mut self, doc_id: DocId, term_freq: u32) {
        if let Some(last) = self.memory.last_mut()
            && last.doc_id == doc_id
        {
            last.term_freq = last
                .term_freq
                .saturating_add(term_freq.min(u16::MAX as u32) as u16);
            return;
        }
        self.memory.push(CompactPosting {
            doc_id,
            term_freq: term_freq.min(u16::MAX as u32) as u16,
        });
    }

    fn total_count(&self) -> usize {
        let spilled: usize = self.spill_blocks.iter().map(|&(_, count)| count as usize).sum();
        self.memory.len() + spilled
    }

    fn needs_spill(&self) -> bool {
        self.memory.len() >= POSTING_FLUSH_THRESHOLD
    }
}

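/// Tuning parameters for [`SegmentBuilder`].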
#[cfg(feature = "native")]
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    pub temp_dir: PathBuf,
    pub compression_level: CompressionLevel,
    pub num_compression_threads: usize,
    pub interner_capacity: usize,
    pub posting_map_capacity: usize,
}

#[cfg(feature = "native")]
impl Default for SegmentBuilderConfig {
    fn default() -> Self {
        Self {
            temp_dir: std::env::temp_dir(),
            compression_level: CompressionLevel(7),
            num_compression_threads: num_cpus::get(),
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}

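/// Builds a single segment: documents are tokenized into a sharded in-memory
/// inverted index (spilling large posting lists to a temp file), serialized to a
/// temporary store file, and finally written out as term dictionary, postings,
/// store, and metadata files.
///
/// A minimal usage sketch. It assumes a `Schema` with an indexed text field and a
/// directory implementing `Directory + DirectoryWriter`; the variable names and
/// the `SegmentId` construction below are illustrative, not a tested API:
///
/// ```ignore
/// let mut builder = SegmentBuilder::new(schema, SegmentBuilderConfig::default())?;
/// builder.add_document(doc)?;
/// let meta = builder.build(&dir, SegmentId(uuid::Uuid::new_v4())).await?;
/// ```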
#[cfg(feature = "native")]
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interns term strings so postings can key on a compact `Spur`.
    term_interner: Rodeo,

    /// Sharded map from `(field, term)` to its (possibly spilled) posting list.
    inverted_index: ShardedInvertedIndex,

    /// Temporary file holding length-prefixed serialized documents.
    store_file: BufWriter<File>,
    store_path: PathBuf,

    /// Shared spill file for oversized posting lists, plus the current write offset.
    spill_file: Option<BufWriter<File>>,
    spill_path: PathBuf,
    spill_offset: u64,

    next_doc_id: DocId,

    /// Per-field token and document counts.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flattened `[doc][indexed-field-slot]` token counts.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    /// Optional WAND statistics supplied by the caller.
    wand_data: Option<Arc<WandData>>,

    /// Scratch buffer: term frequencies for the document currently being indexed.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch buffer: terms whose posting lists crossed the spill threshold.
    terms_to_spill_buffer: Vec<TermKey>,
}

#[cfg(feature = "native")]
impl SegmentBuilder {
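    /// Creates a builder whose temporary store and spill files live in `config.temp_dir`.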
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));
        let spill_path = config
            .temp_dir
            .join(format!("hermes_spill_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            SPILL_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: ShardedInvertedIndex::new(
                config.posting_map_capacity / NUM_INDEX_SHARDS,
            ),
            store_file,
            store_path,
            spill_file: None,
            spill_path,
            spill_offset: 0,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            wand_data: None,
            local_tf_buffer: FxHashMap::default(),
            terms_to_spill_buffer: Vec::new(),
            config,
        })
    }

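    /// Like [`SegmentBuilder::new`], but carries precomputed WAND data alongside the builder.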
    pub fn with_wand_data(
        schema: Schema,
        config: SegmentBuilderConfig,
        wand_data: Arc<WandData>,
    ) -> Result<Self> {
        let mut builder = Self::new(schema, config)?;
        builder.wand_data = Some(wand_data);
        Ok(builder)
    }

    /// Overrides the tokenizer used for `field` (the default is `LowercaseTokenizer`).
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    /// Number of documents added so far.
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

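    /// Adds a document: indexes its text and numeric fields, records field-length
    /// statistics, and appends the serialized document to the temporary store file.
    /// Returns the document's id within this segment.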
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };
            if !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let token_count = self.index_text_field(*field, doc_id, text)?;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    stats.doc_count += 1;

                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

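    /// Tokenizes `text`, aggregates term frequencies for this document, merges them
    /// into the inverted index, and spills any posting list that grew past
    /// `POSTING_FLUSH_THRESHOLD`. Returns the number of tokens in the field.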
    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
        let default_tokenizer = LowercaseTokenizer;
        let tokenizer: &dyn crate::tokenizer::TokenizerClone = self
            .tokenizers
            .get(&field)
            .map(|t| t.as_ref())
            .unwrap_or(&default_tokenizer);

        let tokens = tokenizer.tokenize(text);
        let token_count = tokens.len() as u32;

        self.local_tf_buffer.clear();

        for token in tokens {
            let term_spur = self.term_interner.get_or_intern(&token.text);
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
        }

        let field_id = field.0;
        self.terms_to_spill_buffer.clear();

        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let posting = self.inverted_index.get_or_insert(term_key);
            posting.add(doc_id, tf);

            if posting.needs_spill() {
                self.terms_to_spill_buffer.push(term_key);
            }
        }

        // Index-based loop: spill_posting_list takes &mut self, so we cannot hold an
        // iterator over the buffer while calling it.
        for i in 0..self.terms_to_spill_buffer.len() {
            let term_key = self.terms_to_spill_buffer[i];
            self.spill_posting_list(term_key)?;
        }

        Ok(token_count)
    }

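    /// Indexes a numeric value as an exact-match term of the form `__num_<value>`.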
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        let term_str = format!("__num_{}", value);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let posting = self.inverted_index.get_or_insert(term_key);
        posting.add(doc_id, 1);

        Ok(())
    }

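    /// Appends the document to the temporary store file as a little-endian `u32`
    /// length prefix followed by the serialized bytes.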
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

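    /// Writes the in-memory postings of one term to the shared spill file (six bytes
    /// per posting: `u32` doc id + `u16` term frequency), records the batch's offset
    /// and length, and clears the in-memory buffer.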
    fn spill_posting_list(&mut self, term_key: TermKey) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let posting = self.inverted_index.get_mut(&term_key).unwrap();

        if self.spill_file.is_none() {
            let file = OpenOptions::new()
                .create(true)
                .write(true)
                .read(true)
                .truncate(true)
                .open(&self.spill_path)?;
            self.spill_file = Some(BufWriter::with_capacity(SPILL_BUFFER_SIZE, file));
        }

        let spill_file = self.spill_file.as_mut().unwrap();

        let batch_offset = self.spill_offset;
        for p in &posting.memory {
            spill_file.write_u32::<LittleEndian>(p.doc_id)?;
            spill_file.write_u16::<LittleEndian>(p.term_freq)?;
            self.spill_offset += 6; // 4 bytes doc id + 2 bytes term frequency
        }

        posting
            .spill_blocks
            .push((batch_offset, posting.memory.len() as u32));
        posting.memory.clear();
        posting.memory.shrink_to(POSTING_FLUSH_THRESHOLD / 4);

        Ok(())
    }

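    /// Finalizes the segment: flushes the temporary files, builds the term dictionary,
    /// postings, and document store, writes them through `dir`, and cleans up the
    /// temporary files.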
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        self.store_file.flush()?;
        if let Some(ref mut spill) = self.spill_file {
            spill.flush()?;
        }

        let files = SegmentFiles::new(segment_id.0);

        let (term_dict_data, postings_data) = self.build_postings()?;

        let store_data = self.build_store_from_stream()?;

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let _ = std::fs::remove_file(&self.store_path);
        let _ = std::fs::remove_file(&self.spill_path);

        Ok(meta)
    }

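    /// Builds the term dictionary and postings blob: terms are sorted by their
    /// `(field, term)` key, spilled batches are re-read from the spill file via mmap,
    /// merged with the in-memory tail, and encoded either inline or as external
    /// block posting lists.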
    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
        use std::collections::BTreeMap;

        let mut sorted_terms: BTreeMap<Vec<u8>, &SpillablePostingList> = BTreeMap::new();

        for (term_key, posting_list) in &self.inverted_index {
            let term_str = self.term_interner.resolve(&term_key.term);
            let mut key = Vec::with_capacity(4 + term_str.len());
            key.extend_from_slice(&term_key.field.to_le_bytes());
            key.extend_from_slice(term_str.as_bytes());
            sorted_terms.insert(key, posting_list);
        }

        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        let spill_mmap = if self.spill_file.is_some() {
            drop(self.spill_file.take());
            let file = File::open(&self.spill_path)?;
            Some(unsafe { memmap2::Mmap::map(&file)? })
        } else {
            None
        };

        for (key, spill_posting) in sorted_terms {
            let mut full_postings = PostingList::with_capacity(spill_posting.total_count());

            // Spilled batches were written before anything still in memory, so
            // replaying them first keeps doc ids in ascending order.
            if let Some(ref mmap) = spill_mmap {
                for &(block_offset, block_count) in &spill_posting.spill_blocks {
                    let mut offset = block_offset as usize;
                    for _ in 0..block_count {
                        let doc_id = u32::from_le_bytes([
                            mmap[offset],
                            mmap[offset + 1],
                            mmap[offset + 2],
                            mmap[offset + 3],
                        ]);
                        let term_freq =
                            u16::from_le_bytes([mmap[offset + 4], mmap[offset + 5]]);
                        full_postings.push(doc_id, term_freq as u32);
                        offset += 6;
                    }
                }
            }

            for p in &spill_posting.memory {
                full_postings.push(p.doc_id, p.term_freq as u32);
            }

            let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
            let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

            let term_info = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                inline
            } else {
                let posting_offset = postings.len() as u64;
                let block_list =
                    crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
                block_list.serialize(&mut postings)?;
                TermInfo::external(
                    posting_offset,
                    (postings.len() as u64 - posting_offset) as u32,
                    full_postings.doc_count(),
                )
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }

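    /// Re-reads the temporary store file via mmap and re-encodes each document
    /// through `StoreWriter` into the final compressed store.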
    fn build_store_from_stream(&mut self) -> Result<Vec<u8>> {
        // Swap the writer out for a throwaway handle so the temporary store file is
        // flushed and closed before it is reopened for reading below.
        drop(std::mem::replace(
            &mut self.store_file,
            BufWriter::new(File::create("/dev/null")?),
        ));

        let file = File::open(&self.store_path)?;
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        let mut store_data = Vec::new();
        let mut store_writer =
            StoreWriter::with_compression_level(&mut store_data, self.config.compression_level);

        let mut offset = 0usize;
        while offset < mmap.len() {
            if offset + 4 > mmap.len() {
                break;
            }

            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            let doc_bytes = &mmap[offset..offset + doc_len];
            offset += doc_len;

            if let Ok(doc) = super::store::deserialize_document(doc_bytes, &self.schema) {
                store_writer.store(&doc, &self.schema)?;
            }
        }

        store_writer.finish()?;
        Ok(store_data)
    }
}

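// Remove the temporary files if the builder is dropped without `build` completing.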
#[cfg(feature = "native")]
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        let _ = std::fs::remove_file(&self.store_path);
        let _ = std::fs::remove_file(&self.spill_path);
    }
}