1#[cfg_attr(not(feature = "native"), allow(dead_code))]
12pub(crate) mod bmp;
13mod config;
14mod dense;
15#[cfg(feature = "diagnostics")]
16mod diagnostics;
17#[cfg_attr(not(feature = "native"), allow(dead_code))]
18pub(crate) mod graph_bisection;
19mod postings;
20mod sparse;
21mod store;
22
23pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
24
25#[cfg(feature = "native")]
26use std::fs::{File, OpenOptions};
27#[cfg(feature = "native")]
28use std::io::BufWriter;
29use std::io::Write;
30use std::mem::size_of;
31#[cfg(feature = "native")]
32use std::path::PathBuf;
33
34use hashbrown::HashMap;
35use rustc_hash::FxHashMap;
36
37#[cfg(feature = "native")]
39use lasso::{Rodeo, Spur};
40
#[cfg(not(feature = "native"))]
pub(crate) mod simple_interner {
    //! Minimal drop-in replacement for `lasso::{Rodeo, Spur}`, used when the
    //! `native` feature (and therefore the `lasso` dependency) is disabled.
    use hashbrown::HashMap;

    /// Opaque interned-string handle; the `u32` is an index into `Rodeo::strings`.
    #[derive(Clone, Copy, PartialEq, Eq, Hash)]
    pub struct Spur(u32);

    /// Append-only string interner: each unique string is stored once and is
    /// addressed by a stable `Spur` id.
    pub struct Rodeo {
        // Owned storage. Entries are never removed or mutated, so each boxed
        // str's heap allocation stays at a fixed address for `self`'s lifetime.
        strings: Vec<Box<str>>,
        // Content -> index into `strings`. The `'static` lifetime is a local
        // fiction upheld by the append-only invariant above; these borrows
        // must never escape this struct.
        map: HashMap<&'static str, u32>,
    }

    impl Rodeo {
        /// Creates an empty interner.
        pub fn new() -> Self {
            Self {
                strings: Vec::new(),
                map: HashMap::new(),
            }
        }

        /// Looks up an already-interned string without inserting it.
        pub fn get(&self, key: &str) -> Option<Spur> {
            self.map.get(key).map(|&id| Spur(id))
        }

        /// Returns the id for `key`, interning it on first sight.
        pub fn get_or_intern(&mut self, key: &str) -> Spur {
            if let Some(&id) = self.map.get(key) {
                return Spur(id);
            }
            let id = self.strings.len() as u32;
            let boxed: Box<str> = key.into();
            // SAFETY: `boxed`'s heap allocation never moves (Box is a stable
            // pointer) and is never freed before `self` is dropped, because
            // `strings` is append-only. The fabricated `'static` reference is
            // only stored in `map`, which dies together with `strings`.
            let static_ref: &'static str = unsafe { &*(boxed.as_ref() as *const str) };
            self.strings.push(boxed);
            self.map.insert(static_ref, id);
            Spur(id)
        }

        /// Resolves an id back to its string. Panics if `spur` did not come
        /// from this interner (index out of bounds).
        pub fn resolve(&self, spur: &Spur) -> &str {
            &self.strings[spur.0 as usize]
        }

        /// Number of distinct strings interned so far.
        pub fn len(&self) -> usize {
            self.strings.len()
        }
    }
}
93
94#[cfg(not(feature = "native"))]
95use simple_interner::{Rodeo, Spur};
96
97use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
98use std::sync::Arc;
99
100use crate::directories::{Directory, DirectoryWriter};
101use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
102use crate::tokenizer::BoxedTokenizer;
103use crate::{DocId, Result};
104
105use dense::{BinaryDenseVectorBuilder, DenseVectorBuilder};
106use postings::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
107use sparse::SparseVectorBuilder;
108
109const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
115
116const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();
118
119const NEW_POS_TERM_OVERHEAD: usize =
121 size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
122
/// Accumulates documents in memory (spilling to temp files on `native`) and
/// serializes them into an immutable segment via [`SegmentBuilder::build`].
pub struct SegmentBuilder {
    schema: Arc<Schema>,
    config: SegmentBuilderConfig,
    // Per-field custom tokenizers; fields without one use the built-in pipeline.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    // Interns term strings so postings key on a small `Spur` instead of a String.
    term_interner: Rodeo,

    // (field, term) -> in-memory posting list.
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    // Lazily-created temp file that oversized posting lists are spilled to.
    #[cfg(feature = "native")]
    posting_spill_file: Option<BufWriter<File>>,
    #[cfg(feature = "native")]
    posting_spill_path: PathBuf,
    // (field, term) -> (byte offset, posting count) runs already spilled to disk.
    #[cfg(feature = "native")]
    posting_spill_index: HashMap<TermKey, Vec<(u64, u32)>>,
    // Current write offset into the spill file.
    #[cfg(feature = "native")]
    posting_spill_offset: u64,

    // Serialized documents: streamed to a temp file on native, kept in RAM otherwise.
    #[cfg(feature = "native")]
    store_file: BufWriter<File>,
    #[cfg(feature = "native")]
    store_path: PathBuf,
    #[cfg(not(feature = "native"))]
    store_buffer: Vec<u8>,

    // Next doc id to assign; doubles as the number of docs added so far.
    next_doc_id: DocId,

    // Per-field token/doc counters, keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,

    // Flattened [doc][indexed-text-field slot] -> token count matrix.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    // Field id -> slot within a document's row of `doc_field_lengths`.
    field_to_slot: FxHashMap<u32, usize>,

    // Scratch: term -> frequency for the text value currently being indexed.
    local_tf_buffer: FxHashMap<Spur, u32>,

    // Scratch: term -> encoded positions for the current text value.
    local_positions: FxHashMap<Spur, Vec<u32>>,

    // Scratch buffer for building lowercased tokens (built-in tokenizer path).
    token_buffer: String,

    // Scratch buffer for formatting synthetic numeric terms ("__num_<v>").
    numeric_buffer: String,

    // Field id -> f32 dense-vector accumulator.
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    // Field id -> bit-packed binary dense-vector accumulator.
    binary_dense_vectors: FxHashMap<u32, BinaryDenseVectorBuilder>,

    // Field id -> sparse-vector posting accumulator.
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    // (field, term) -> per-doc position lists, for fields with positions enabled.
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    // Field id -> configured position mode (only fields with positions enabled).
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    // Field id -> next element ordinal within the document currently being added.
    current_element_ordinal: FxHashMap<u32, u32>,

    // Incrementally-maintained rough estimate of this builder's heap usage.
    estimated_memory: usize,

    // Reusable buffer for serializing each document before writing to the store.
    doc_serialize_buffer: Vec<u8>,

    // Field id -> columnar (fast-field) writer.
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
}
216
217impl SegmentBuilder {
    /// Creates an empty builder for `schema`.
    ///
    /// On `native`, opens a buffered temp file for the document store and
    /// reserves a (not-yet-created) spill path for oversized posting lists,
    /// both under `config.temp_dir`.
    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
        #[cfg(feature = "native")]
        let (store_file, store_path, spill_path) = {
            // Random name so concurrent builders sharing temp_dir never collide.
            let segment_id = uuid::Uuid::new_v4();
            let store_path = config
                .temp_dir
                .join(format!("hermes_store_{}.tmp", segment_id));
            let store_file = BufWriter::with_capacity(
                STORE_BUFFER_SIZE,
                OpenOptions::new()
                    .create(true)
                    .write(true)
                    .truncate(true)
                    .open(&store_path)?,
            );
            let spill_path = config
                .temp_dir
                .join(format!("hermes_spill_{}.tmp", segment_id));
            (store_file, store_path, spill_path)
        };

        // Assign a dense slot to every indexed text field (for per-doc token
        // counts), record position modes, and resolve named tokenizers.
        let registry = crate::tokenizer::TokenizerRegistry::new();
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        let mut tokenizers = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
                if let Some(ref tok_name) = entry.tokenizer
                    && let Some(tokenizer) = registry.get(tok_name)
                {
                    tokenizers.insert(field, tokenizer);
                }
            }
        }

        // Set up a columnar writer for every `fast` field whose type supports
        // it; unsupported types are simply skipped.
        use crate::structures::fast_field::{FastFieldColumnType, FastFieldWriter};
        let mut fast_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.fast {
                let writer = if entry.multi {
                    match entry.field_type {
                        FieldType::U64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64)
                        }
                        FieldType::I64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::I64)
                        }
                        FieldType::F64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::F64)
                        }
                        FieldType::Text => FastFieldWriter::new_text_multi(),
                        _ => continue,
                    }
                } else {
                    match entry.field_type {
                        FieldType::U64 => FastFieldWriter::new_numeric(FastFieldColumnType::U64),
                        FieldType::I64 => FastFieldWriter::new_numeric(FastFieldColumnType::I64),
                        FieldType::F64 => FastFieldWriter::new_numeric(FastFieldColumnType::F64),
                        FieldType::Text => FastFieldWriter::new_text(),
                        _ => continue,
                    }
                };
                fast_fields.insert(field.0, writer);
            }
        }

        Ok(Self {
            schema,
            tokenizers,
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            #[cfg(feature = "native")]
            posting_spill_file: None,
            #[cfg(feature = "native")]
            posting_spill_path: spill_path,
            #[cfg(feature = "native")]
            posting_spill_index: HashMap::new(),
            #[cfg(feature = "native")]
            posting_spill_offset: 0,
            #[cfg(feature = "native")]
            store_file,
            #[cfg(feature = "native")]
            store_path,
            #[cfg(not(feature = "native"))]
            store_buffer: Vec::with_capacity(STORE_BUFFER_SIZE),
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            local_positions: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            numeric_buffer: String::with_capacity(32),
            config,
            dense_vectors: FxHashMap::default(),
            binary_dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
            estimated_memory: 0,
            doc_serialize_buffer: Vec::with_capacity(256),
            fast_fields,
        })
    }
333
    /// Sets (or replaces) the tokenizer used for `field`.
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }
337
338 fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
341 let ordinal = *self.current_element_ordinal.get(&field_id).unwrap_or(&0);
342 *self.current_element_ordinal.entry(field_id).or_insert(0) += 1;
343 ordinal
344 }
345
    /// Number of documents added to this builder so far.
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }
349
    /// Incrementally-maintained rough estimate of heap usage; cheap to read,
    /// suitable for "time to flush a segment?" checks.
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }
355
    /// Total number of distinct (field, dimension) sparse posting lists.
    pub fn sparse_dim_count(&self) -> usize {
        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
    }
360
    /// Snapshot of size/occupancy counters plus a heap-usage estimate
    /// recomputed from scratch (unlike the incrementally-maintained
    /// `estimated_memory` field).
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        // Per-item size constants used throughout the estimates below.
        let compact_posting_size = size_of::<CompactPosting>();
        let vec_overhead = size_of::<Vec<u8>>();
        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let spur_size = size_of::<Spur>();
        let sparse_entry_size = size_of::<(DocId, u16, f32)>();

        // Rough per-entry hash-table bookkeeping cost.
        let hashmap_entry_base_overhead = 8usize;

        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;

        // Heap held by posting-list vectors; capacity (not len) is what is
        // actually allocated.
        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
            .sum();

        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);

        let interner_arena_overhead = 2 * size_of::<usize>();
        // Average term length is a heuristic; exact lengths are not tracked here.
        let avg_term_len = 8;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;

        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }
        // Binary vectors store raw bytes, so capacity is already a byte count.
        for b in self.binary_dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;

        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
            }
            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
        }
        let outer_sparse_entry_size =
            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;

        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
            }
            let pos_entry_size = size_of::<DocId>() + vec_overhead;
            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
        }
        let pos_index_entry_size =
            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
        position_index_bytes += self.position_index.len() * pos_index_entry_size;

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;

        // NOTE(review): `local_tf_buffer_bytes` is part of the total above but
        // is not broken out in MemoryBreakdown — confirm that is intentional.
        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }
487
    /// Indexes one document and appends it to the document store, returning
    /// its segment-local doc id. Field values whose type does not match the
    /// schema entry are silently skipped (the catch-all `_` arm).
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve this document's row of per-field token counts.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);
        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();

        // Element ordinals (multi-valued fields) restart at 0 per document.
        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };

            // Dense-vector fields pass through even when neither indexed nor
            // fast (they may still be stored); everything else must be
            // indexed or fast to be worth processing here.
            if !matches!(
                &entry.field_type,
                FieldType::DenseVector | FieldType::BinaryDenseVector
            ) && !entry.indexed
                && !entry.fast
            {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    if entry.indexed {
                        let element_ordinal = self.next_element_ordinal(field.0);
                        let token_count =
                            self.index_text_field(*field, doc_id, text, element_ordinal)?;

                        let stats = self.field_stats.entry(field.0).or_default();
                        stats.total_tokens += token_count as u64;
                        // Count the document once per field, not once per element.
                        if element_ordinal == 0 {
                            stats.doc_count += 1;
                        }

                        // NOTE(review): for multi-valued text fields this keeps
                        // only the last element's token count in the slot
                        // (assignment, not +=) — confirm that is intended.
                        if let Some(&slot) = self.field_to_slot.get(&field.0) {
                            self.doc_field_lengths[base_idx + slot] = token_count;
                        }
                    }

                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_text(doc_id, text);
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    if entry.indexed {
                        self.index_numeric_field(*field, doc_id, *v)?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_u64(doc_id, *v);
                    }
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    // Indexed term uses the raw bit reinterpretation of the i64.
                    if entry.indexed {
                        self.index_numeric_field(*field, doc_id, *v as u64)?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_i64(doc_id, *v);
                    }
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    // Indexed term uses the f64's IEEE-754 bit pattern.
                    if entry.indexed {
                        self.index_numeric_field(*field, doc_id, v.to_bits())?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_f64(doc_id, *v);
                    }
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec))
                    if entry.indexed || entry.stored =>
                {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
                }
                (FieldType::BinaryDenseVector, FieldValue::BinaryDenseVector(bytes))
                    if entry.indexed || entry.stored =>
                {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_binary_dense_vector_field(*field, doc_id, ordinal as u16, bytes)?;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }
590
    /// Tokenizes and indexes one text value, returning the token count used
    /// for field-length tracking.
    ///
    /// Uses the field's custom tokenizer when registered, otherwise a
    /// built-in pipeline (whitespace split, keep alphanumerics, lowercase).
    /// Note the two paths count slightly differently: the custom path returns
    /// `tokens.len()`, the built-in path counts only non-empty tokens.
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        // Reset scratch state. Position Vecs are cleared rather than dropped
        // so their allocations are reused across calls.
        self.local_tf_buffer.clear();
        for v in self.local_positions.values_mut() {
            v.clear();
        }

        let mut token_position = 0u32;

        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));

        if let Some(tokens) = custom_tokens {
            for token in &tokens {
                // Intern, charging estimated memory only for new strings.
                let term_spur = if let Some(spur) = self.term_interner.get(&token.text) {
                    spur
                } else {
                    let spur = self.term_interner.get_or_intern(&token.text);
                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
                    spur
                };
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    // Element ordinal is packed into the upper bits (<< 20),
                    // leaving ~1M token positions per element.
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token.position,
                        PositionMode::Full => (element_ordinal << 20) | token.position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }
            }
            token_position = tokens.len() as u32;
        } else {
            // Built-in tokenizer path.
            for word in text.split_whitespace() {
                self.token_buffer.clear();
                for c in word.chars() {
                    if c.is_alphanumeric() {
                        // to_lowercase may expand to multiple chars.
                        for lc in c.to_lowercase() {
                            self.token_buffer.push(lc);
                        }
                    }
                }

                if self.token_buffer.is_empty() {
                    continue;
                }

                let term_spur = if let Some(spur) = self.term_interner.get(&self.token_buffer) {
                    spur
                } else {
                    let spur = self.term_interner.get_or_intern(&self.token_buffer);
                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
                    spur
                };
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token_position,
                        PositionMode::Full => (element_ordinal << 20) | token_position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }

                token_position += 1;
            }
        }

        // Merge this value's term frequencies into the segment-wide index.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            match self.inverted_index.entry(term_key) {
                hashbrown::hash_map::Entry::Occupied(mut o) => {
                    o.get_mut().add(doc_id, tf);
                    self.estimated_memory += size_of::<CompactPosting>();
                    // Oversized posting lists are spilled to a temp file so
                    // very frequent terms don't dominate RAM.
                    #[cfg(feature = "native")]
                    if o.get().should_spill() {
                        use byteorder::{LittleEndian, WriteBytesExt};

                        let builder = o.get_mut();
                        let count = builder.postings.len() as u32;
                        let offset = self.posting_spill_offset;

                        // The spill file is opened lazily on first spill.
                        let spill_file = if let Some(ref mut f) = self.posting_spill_file {
                            f
                        } else {
                            self.posting_spill_file = Some(BufWriter::with_capacity(
                                256 * 1024,
                                OpenOptions::new()
                                    .create(true)
                                    .write(true)
                                    .truncate(true)
                                    .open(&self.posting_spill_path)?,
                            ));
                            self.posting_spill_file.as_mut().unwrap()
                        };
                        for p in &builder.postings {
                            spill_file.write_u32::<LittleEndian>(p.doc_id)?;
                            spill_file.write_u16::<LittleEndian>(p.term_freq)?;
                        }
                        // 6 bytes per posting: u32 doc_id + u16 term_freq.
                        self.posting_spill_offset += count as u64 * 6;
                        self.posting_spill_index
                            .entry(term_key)
                            .or_default()
                            .push((offset, count));

                        let freed = builder.postings.len() * size_of::<CompactPosting>();
                        builder.spilled_count += count;
                        builder.postings.clear();
                        self.estimated_memory -= freed;
                    }
                }
                hashbrown::hash_map::Entry::Vacant(v) => {
                    let mut posting = PostingListBuilder::new();
                    posting.add(doc_id, tf);
                    v.insert(posting);
                    self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
                }
            }

            // Mirror the per-term positions into the position index.
            if position_mode.is_some()
                && let Some(positions) = self.local_positions.get(&term_spur)
            {
                match self.position_index.entry(term_key) {
                    hashbrown::hash_map::Entry::Occupied(mut o) => {
                        for &pos in positions {
                            o.get_mut().add_position(doc_id, pos);
                        }
                        self.estimated_memory += positions.len() * size_of::<u32>();
                    }
                    hashbrown::hash_map::Entry::Vacant(v) => {
                        let mut pos_posting = PositionPostingListBuilder::new();
                        for &pos in positions {
                            pos_posting.add_position(doc_id, pos);
                        }
                        self.estimated_memory +=
                            positions.len() * size_of::<u32>() + NEW_POS_TERM_OVERHEAD;
                        v.insert(pos_posting);
                    }
                }
            }
        }

        Ok(token_position)
    }
781
782 fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
783 use std::fmt::Write;
784
785 self.numeric_buffer.clear();
786 write!(self.numeric_buffer, "__num_{}", value).unwrap();
787 let term_spur = if let Some(spur) = self.term_interner.get(&self.numeric_buffer) {
788 spur
789 } else {
790 let spur = self.term_interner.get_or_intern(&self.numeric_buffer);
791 self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
792 spur
793 };
794
795 let term_key = TermKey {
796 field: field.0,
797 term: term_spur,
798 };
799
800 match self.inverted_index.entry(term_key) {
801 hashbrown::hash_map::Entry::Occupied(mut o) => {
802 o.get_mut().add(doc_id, 1);
803 self.estimated_memory += size_of::<CompactPosting>();
804 }
805 hashbrown::hash_map::Entry::Vacant(v) => {
806 let mut posting = PostingListBuilder::new();
807 posting.add(doc_id, 1);
808 v.insert(posting);
809 self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
810 }
811 }
812
813 Ok(())
814 }
815
816 fn index_dense_vector_field(
818 &mut self,
819 field: Field,
820 doc_id: DocId,
821 ordinal: u16,
822 vector: &[f32],
823 ) -> Result<()> {
824 let dim = vector.len();
825
826 let builder = self
827 .dense_vectors
828 .entry(field.0)
829 .or_insert_with(|| DenseVectorBuilder::new(dim));
830
831 if builder.dim != dim && builder.len() > 0 {
833 return Err(crate::Error::Schema(format!(
834 "Dense vector dimension mismatch: expected {}, got {}",
835 builder.dim, dim
836 )));
837 }
838
839 builder.add(doc_id, ordinal, vector);
840
841 self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();
842
843 Ok(())
844 }
845
846 fn index_binary_dense_vector_field(
848 &mut self,
849 field: Field,
850 doc_id: DocId,
851 ordinal: u16,
852 bytes: &[u8],
853 ) -> Result<()> {
854 let dim_bits = self
855 .schema
856 .get_field_entry(field)
857 .and_then(|e| e.binary_dense_vector_config.as_ref())
858 .map(|c| c.dim)
859 .ok_or_else(|| {
860 crate::Error::Schema("BinaryDenseVector field missing config".to_string())
861 })?;
862
863 let expected_byte_len = dim_bits.div_ceil(8);
864 if bytes.len() != expected_byte_len {
865 return Err(crate::Error::Schema(format!(
866 "Binary vector byte length mismatch: expected {} (dim={}), got {}",
867 expected_byte_len,
868 dim_bits,
869 bytes.len()
870 )));
871 }
872
873 let builder = self
874 .binary_dense_vectors
875 .entry(field.0)
876 .or_insert_with(|| BinaryDenseVectorBuilder::new(dim_bits));
877
878 builder.add(doc_id, ordinal, bytes);
879 self.estimated_memory += bytes.len() + size_of::<(DocId, u16)>();
880
881 Ok(())
882 }
883
884 fn index_sparse_vector_field(
891 &mut self,
892 field: Field,
893 doc_id: DocId,
894 ordinal: u16,
895 entries: &[(u32, f32)],
896 ) -> Result<()> {
897 let weight_threshold = self
899 .schema
900 .get_field_entry(field)
901 .and_then(|entry| entry.sparse_vector_config.as_ref())
902 .map(|config| config.weight_threshold)
903 .unwrap_or(0.0);
904
905 let builder = self
906 .sparse_vectors
907 .entry(field.0)
908 .or_insert_with(SparseVectorBuilder::new);
909
910 builder.inc_vector_count();
911
912 for &(dim_id, weight) in entries {
913 if weight.abs() < weight_threshold {
915 continue;
916 }
917
918 let is_new_dim = !builder.postings.contains_key(&dim_id);
919 builder.add(dim_id, doc_id, ordinal, weight);
920 self.estimated_memory += size_of::<(DocId, u16, f32)>();
921 if is_new_dim {
922 self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; }
925 }
926
927 Ok(())
928 }
929
    /// Serializes `doc` into the reusable scratch buffer and appends it,
    /// prefixed with a little-endian u32 length, to the document store:
    /// a temp file on `native`, an in-memory buffer otherwise.
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;

        #[cfg(feature = "native")]
        {
            self.store_file
                .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
            self.store_file.write_all(&self.doc_serialize_buffer)?;
        }
        #[cfg(not(feature = "native"))]
        {
            self.store_buffer
                .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
            self.store_buffer.write_all(&self.doc_serialize_buffer)?;
        }

        Ok(())
    }
951
    /// Consumes the builder and writes every segment file (positions, term
    /// dict, postings, store, vectors, sparse, fast fields, meta) into `dir`,
    /// returning the segment's metadata.
    ///
    /// On `native`, the independent file builds run in parallel via nested
    /// `rayon::join`; on other targets they run sequentially.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
        trained: Option<&super::TrainedVectorStructures>,
    ) -> Result<SegmentMeta> {
        #[cfg(feature = "native")]
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Positions are built first: the postings build below needs the
        // resulting per-term offsets.
        let position_index = std::mem::take(&mut self.position_index);
        let position_offsets = if !position_index.is_empty() {
            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
            let offsets = postings::build_positions_streaming(
                position_index,
                &self.term_interner,
                &mut *pos_writer,
            )?;
            pos_writer.finish()?;
            offsets
        } else {
            FxHashMap::default()
        };

        // Move the large structures out of `self` so they can be handed to
        // the (possibly parallel) build closures by value.
        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        #[cfg(feature = "native")]
        let store_path = self.store_path.clone();
        #[cfg(feature = "native")]
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;
        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        let binary_dense_vectors = std::mem::take(&mut self.binary_dense_vectors);
        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        let schema = &self.schema;

        // Offset-tracking writers; the optional files (vectors, sparse, fast)
        // are only created when there is data for them.
        let mut term_dict_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);
        let mut postings_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
        let mut store_writer = super::OffsetWriter::new(dir.streaming_writer(&files.store).await?);
        let mut vectors_writer = if !dense_vectors.is_empty() || !binary_dense_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.vectors).await?,
            ))
        } else {
            None
        };
        let mut sparse_writer = if !sparse_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.sparse).await?,
            ))
        } else {
            None
        };
        let mut fast_fields = std::mem::take(&mut self.fast_fields);
        let num_docs = self.next_doc_id;
        let mut fast_writer = if !fast_fields.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.fast).await?,
            ))
        } else {
            None
        };

        #[cfg(feature = "native")]
        {
            // Make spilled postings visible to the reader opened below.
            if let Some(ref mut f) = self.posting_spill_file {
                f.flush()?;
            }
            let posting_spill_index = std::mem::take(&mut self.posting_spill_index);
            let mut spill_reader_opt = if !posting_spill_index.is_empty() {
                let spill_file = std::fs::File::open(&self.posting_spill_path)?;
                Some((std::io::BufReader::new(spill_file), posting_spill_index))
            } else {
                None
            };

            // Nested joins fan out into five independent tasks:
            // (postings+term dict, store) and ((vectors, sparse), fast).
            let ((postings_result, store_result), ((vectors_result, sparse_result), fast_result)) =
                rayon::join(
                    || {
                        rayon::join(
                            || {
                                let spill_arg = spill_reader_opt.as_mut().map(|(r, idx)| {
                                    (
                                        r as &mut std::io::BufReader<std::fs::File>,
                                        idx as &postings::SpillIndex,
                                    )
                                });
                                postings::build_postings_streaming(
                                    inverted_index,
                                    term_interner,
                                    &position_offsets,
                                    &mut term_dict_writer,
                                    &mut postings_writer,
                                    spill_arg,
                                )
                            },
                            || {
                                store::build_store_streaming(
                                    &store_path,
                                    num_compression_threads,
                                    compression_level,
                                    &mut store_writer,
                                    num_docs,
                                )
                            },
                        )
                    },
                    || {
                        rayon::join(
                            || {
                                rayon::join(
                                    || -> Result<()> {
                                        if let Some(ref mut w) = vectors_writer {
                                            dense::build_vectors_streaming(
                                                dense_vectors,
                                                binary_dense_vectors,
                                                schema,
                                                trained,
                                                w,
                                            )?;
                                        }
                                        Ok(())
                                    },
                                    || -> Result<()> {
                                        if let Some(ref mut w) = sparse_writer {
                                            sparse::build_sparse_streaming(
                                                &mut sparse_vectors,
                                                schema,
                                                w,
                                            )?;
                                        }
                                        Ok(())
                                    },
                                )
                            },
                            || -> Result<()> {
                                if let Some(ref mut w) = fast_writer {
                                    build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
                                }
                                Ok(())
                            },
                        )
                    },
                );
            // Surface the first failure from any parallel branch.
            postings_result?;
            store_result?;
            vectors_result?;
            sparse_result?;
            fast_result?;
        }

        // Non-native targets run the same builds sequentially, reading the
        // store from the in-memory buffer (no spill support here).
        #[cfg(not(feature = "native"))]
        {
            postings::build_postings_streaming(
                inverted_index,
                term_interner,
                &position_offsets,
                &mut term_dict_writer,
                &mut postings_writer,
            )?;
            store::build_store_streaming_from_buffer(
                &self.store_buffer,
                compression_level,
                &mut store_writer,
                num_docs,
            )?;
            if let Some(ref mut w) = vectors_writer {
                dense::build_vectors_streaming(
                    dense_vectors,
                    binary_dense_vectors,
                    schema,
                    trained,
                    w,
                )?;
            }
            if let Some(ref mut w) = sparse_writer {
                sparse::build_sparse_streaming(&mut sparse_vectors, schema, w)?;
            }
            if let Some(ref mut w) = fast_writer {
                build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
            }
        }

        // Capture final sizes for logging before the writers are consumed.
        let term_dict_bytes = term_dict_writer.offset() as usize;
        let postings_bytes = postings_writer.offset() as usize;
        let store_bytes = store_writer.offset() as usize;
        let vectors_bytes = vectors_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let sparse_bytes = sparse_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let fast_bytes = fast_writer.as_ref().map_or(0, |w| w.offset() as usize);

        term_dict_writer.finish()?;
        postings_writer.finish()?;
        store_writer.finish()?;
        if let Some(w) = vectors_writer {
            w.finish()?;
        }
        if let Some(w) = sparse_writer {
            w.finish()?;
        }
        if let Some(w) = fast_writer {
            w.finish()?;
        }
        drop(position_offsets);
        drop(sparse_vectors);

        log::info!(
            "[segment_build] {} docs: term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
            num_docs,
            super::format_bytes(term_dict_bytes),
            super::format_bytes(postings_bytes),
            super::format_bytes(store_bytes),
            super::format_bytes(vectors_bytes),
            super::format_bytes(sparse_bytes),
            super::format_bytes(fast_bytes),
        );

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Eagerly remove the store temp file; the Drop impl repeats this
        // (and removes the spill file) best-effort when `self` is dropped.
        #[cfg(feature = "native")]
        {
            let _ = std::fs::remove_file(&self.store_path);
        }

        Ok(meta)
    }
1198}
1199
1200fn build_fast_fields_streaming(
1202 fast_fields: &mut FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
1203 num_docs: u32,
1204 writer: &mut dyn Write,
1205) -> Result<()> {
1206 use crate::structures::fast_field::{FastFieldTocEntry, write_fast_field_toc_and_footer};
1207
1208 if fast_fields.is_empty() {
1209 return Ok(());
1210 }
1211
1212 let mut field_ids: Vec<u32> = fast_fields.keys().copied().collect();
1214 field_ids.sort_unstable();
1215
1216 let mut toc_entries: Vec<FastFieldTocEntry> = Vec::with_capacity(field_ids.len());
1217 let mut current_offset = 0u64;
1218
1219 for &field_id in &field_ids {
1220 let ff = fast_fields.get_mut(&field_id).unwrap();
1221 ff.pad_to(num_docs);
1222
1223 let (mut toc, bytes_written) = ff.serialize(writer, current_offset)?;
1224 toc.field_id = field_id;
1225 current_offset += bytes_written;
1226 toc_entries.push(toc);
1227 }
1228
1229 let toc_offset = current_offset;
1231 write_fast_field_toc_and_footer(writer, toc_offset, &toc_entries)?;
1232
1233 Ok(())
1234}
1235
#[cfg(feature = "native")]
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Best-effort cleanup of temp files; errors are ignored because a
        // successful `build` already removed the store file.
        let _ = std::fs::remove_file(&self.store_path);
        // The spill file is created lazily — only remove it if it was opened.
        if self.posting_spill_file.is_some() {
            let _ = std::fs::remove_file(&self.posting_spill_path);
        }
    }
}