mod config;
mod posting;
mod vectors;

pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};

use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::PathBuf;

use hashbrown::HashMap;
use lasso::{Rodeo, Spur};
use rayon::prelude::*;
use rustc_hash::FxHashMap;

use crate::compression::CompressionLevel;

use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
use crate::structures::{PostingList, SSTableWriter, TermInfo};
use crate::tokenizer::BoxedTokenizer;
use crate::{DocId, Result};

use posting::{
    CompactPosting, PositionPostingListBuilder, PostingListBuilder, SerializedPosting, TermKey,
};
use vectors::{DenseVectorBuilder, SparseVectorBuilder};

pub use super::vector_data::{FlatVectorData, IVFRaBitQIndexData, ScaNNIndexData};

/// Size of the write buffer for the temporary document store file (16 MiB).
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Incrementally builds one segment in memory and serializes it to a
/// `Directory` when `build` is called.
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interns term strings so postings store a compact `Spur` key.
    term_interner: Rodeo,

    /// In-memory inverted index: (field, term) -> posting list under construction.
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Temporary on-disk spool of serialized documents, rewritten into the
    /// final compressed store during `build`.
    store_file: BufWriter<File>,
    store_path: PathBuf,

    /// Next document id to assign; doubles as the number of documents added.
    next_doc_id: DocId,

    /// Per-field token and document statistics, keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flattened per-document field lengths: one slot per indexed text field.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    /// Scratch buffer for per-document term frequencies, reused across documents.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch buffer for the current token, reused across tokens.
    token_buffer: String,

    /// Dense vector builders, keyed by field id.
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Sparse vector builders, keyed by field id.
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// Position postings for fields with position recording enabled.
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Position mode per field id, for fields that record positions.
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Ordinal of the current element within a multi-valued field, per field id.
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Running estimate of heap memory used by the builder.
    estimated_memory: usize,
}

impl SegmentBuilder {
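    /// Creates a new builder for `schema`, spooling serialized documents to a
    /// temporary store file under `config.temp_dir`.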
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Assign a dense slot to each indexed text field and record which
        // fields need position tracking.
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            config,
            dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
            estimated_memory: 0,
        })
    }

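    /// Registers the tokenizer to use for `field`.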
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

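    /// Current estimate of heap memory used by in-memory index structures,
    /// tracked incrementally during indexing.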
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }

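    /// Number of sparse dimensions with postings, summed across all sparse
    /// vector fields.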
    pub fn sparse_dim_count(&self) -> usize {
        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
    }

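    /// Snapshot of builder statistics, including a size-based memory estimate
    /// recomputed from the current capacities of each structure.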
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        let compact_posting_size = size_of::<CompactPosting>();
        // Per-Vec struct overhead: pointer, length, capacity.
        let vec_overhead = size_of::<Vec<u8>>();
        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let spur_size = size_of::<lasso::Spur>();
        let sparse_entry_size = size_of::<(DocId, u16, f32)>();

        // Rough per-entry overhead for hashbrown-style maps (control bytes plus padding).
        let hashmap_entry_base_overhead = 8usize;

        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;

        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
            .sum();

        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);

        let interner_arena_overhead = 2 * size_of::<usize>();
        // Assumed average length of an interned term, in bytes.
        let avg_term_len = 8;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;

        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;

        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
            }
            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
        }
        let outer_sparse_entry_size =
            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;

        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
            }
            let pos_entry_size = size_of::<DocId>() + vec_overhead;
            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
        }
        let pos_index_entry_size =
            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
        position_index_bytes += self.position_index.len() * pos_index_entry_size;

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }

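    /// Indexes a document and appends it to the temporary store. Returns the
    /// segment-local `DocId` assigned to it.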
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);
        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();

        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };
            if !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
                    let token_count =
                        self.index_text_field(*field, doc_id, text, element_ordinal)?;
                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    if element_ordinal == 0 {
                        stats.doc_count += 1;
                    }

                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec)) => {
                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
                    self.index_dense_vector_field(*field, doc_id, element_ordinal as u16, vec)?;
                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
                    self.index_sparse_vector_field(
                        *field,
                        doc_id,
                        element_ordinal as u16,
                        entries,
                    )?;
                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

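    /// Tokenizes `text` and adds its terms to the inverted index (and, when
    /// enabled, the position index). Returns the number of tokens indexed.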
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        self.local_tf_buffer.clear();

        let mut local_positions: FxHashMap<Spur, Vec<u32>> = FxHashMap::default();

        let mut token_position = 0u32;

        // Inline tokenization: split on whitespace, keep alphanumeric
        // characters, lowercase. Reuses `token_buffer` to avoid allocation.
        for word in text.split_whitespace() {
            self.token_buffer.clear();
            for c in word.chars() {
                if c.is_alphanumeric() {
                    for lc in c.to_lowercase() {
                        self.token_buffer.push(lc);
                    }
                }
            }

            if self.token_buffer.is_empty() {
                continue;
            }

            let is_new_string = !self.term_interner.contains(&self.token_buffer);
            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
            if is_new_string {
                use std::mem::size_of;
                self.estimated_memory +=
                    self.token_buffer.len() + size_of::<Spur>() + 2 * size_of::<usize>();
            }
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

            if let Some(mode) = position_mode {
                // Encoded positions pack the element ordinal into the upper
                // bits (shifted left by 20) and the token position below it.
                let encoded_pos = match mode {
                    PositionMode::Ordinal => element_ordinal << 20,
                    PositionMode::TokenPosition => token_position,
                    PositionMode::Full => (element_ordinal << 20) | token_position,
                };
                local_positions
                    .entry(term_spur)
                    .or_default()
                    .push(encoded_pos);
            }

            token_position += 1;
        }

        // Flush the per-document term frequencies into the global index.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let is_new_term = !self.inverted_index.contains_key(&term_key);
            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);

            use std::mem::size_of;
            self.estimated_memory += size_of::<CompactPosting>();
            if is_new_term {
                self.estimated_memory +=
                    size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
            }

            if position_mode.is_some()
                && let Some(positions) = local_positions.get(&term_spur)
            {
                let is_new_pos_term = !self.position_index.contains_key(&term_key);
                let pos_posting = self
                    .position_index
                    .entry(term_key)
                    .or_insert_with(PositionPostingListBuilder::new);
                for &pos in positions {
                    pos_posting.add_position(doc_id, pos);
                }
                self.estimated_memory += positions.len() * size_of::<u32>();
                if is_new_pos_term {
                    self.estimated_memory +=
                        size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
                }
            }
        }

        Ok(token_position)
    }

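    /// Indexes a numeric value as the synthetic term `__num_<value>`, reusing
    /// the text posting machinery.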
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        use std::mem::size_of;

        let term_str = format!("__num_{}", value);
        let is_new_string = !self.term_interner.contains(&term_str);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let is_new_term = !self.inverted_index.contains_key(&term_key);
        let posting = self
            .inverted_index
            .entry(term_key)
            .or_insert_with(PostingListBuilder::new);
        posting.add(doc_id, 1);

        self.estimated_memory += size_of::<CompactPosting>();
        if is_new_term {
            self.estimated_memory += size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
        }
        if is_new_string {
            self.estimated_memory += term_str.len() + size_of::<Spur>() + 2 * size_of::<usize>();
        }

        Ok(())
    }

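    /// Appends a dense vector, enforcing a consistent dimension per field.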
    fn index_dense_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        vector: &[f32],
    ) -> Result<()> {
        let dim = vector.len();

        let builder = self
            .dense_vectors
            .entry(field.0)
            .or_insert_with(|| DenseVectorBuilder::new(dim));

        if builder.dim != dim && builder.len() > 0 {
            return Err(crate::Error::Schema(format!(
                "Dense vector dimension mismatch: expected {}, got {}",
                builder.dim, dim
            )));
        }

        builder.add(doc_id, ordinal, vector);

        use std::mem::{size_of, size_of_val};
        self.estimated_memory += size_of_val(vector) + size_of::<(DocId, u16)>();

        Ok(())
    }

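    /// Adds sparse vector entries, skipping weights below the field's
    /// configured `weight_threshold`.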
    fn index_sparse_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        entries: &[(u32, f32)],
    ) -> Result<()> {
        use std::mem::size_of;

        let weight_threshold = self
            .schema
            .get_field_entry(field)
            .and_then(|entry| entry.sparse_vector_config.as_ref())
            .map(|config| config.weight_threshold)
            .unwrap_or(0.0);

        let builder = self
            .sparse_vectors
            .entry(field.0)
            .or_insert_with(SparseVectorBuilder::new);

        for &(dim_id, weight) in entries {
            if weight.abs() < weight_threshold {
                continue;
            }

            let is_new_dim = !builder.postings.contains_key(&dim_id);
            builder.add(dim_id, doc_id, ordinal, weight);
            self.estimated_memory += size_of::<(DocId, u16, f32)>();
            if is_new_dim {
                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8;
            }
        }

        Ok(())
    }

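    /// Appends a length-prefixed serialized document to the temporary store file.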
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

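    /// Consumes the builder and writes all segment files (term dictionary,
    /// postings, store, and optional positions/vectors/sparse files) to `dir`.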
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Positions are built first so their offsets can be embedded in the
        // term dictionary alongside the postings.
        let position_index = std::mem::take(&mut self.position_index);
        let (positions_data, position_offsets) =
            Self::build_positions_owned(position_index, &self.term_interner)?;

        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        let store_path = self.store_path.clone();
        let schema_clone = self.schema.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;

        // Build the postings and the compressed document store in parallel.
        let (postings_result, store_result) = rayon::join(
            || Self::build_postings_owned(inverted_index, term_interner, &position_offsets),
            || {
                Self::build_store_batched(
                    &store_path,
                    &schema_clone,
                    num_compression_threads,
                    compression_level,
                )
            },
        );
        let (term_dict_data, postings_data) = postings_result?;
        let store_data = store_result?;

        // Write each file and drop its buffer immediately to bound peak memory.
        dir.write(&files.term_dict, &term_dict_data).await?;
        drop(term_dict_data);
        dir.write(&files.postings, &postings_data).await?;
        drop(postings_data);
        dir.write(&files.store, &store_data).await?;
        drop(store_data);

        if !positions_data.is_empty() {
            dir.write(&files.positions, &positions_data).await?;
        }
        drop(positions_data);
        drop(position_offsets);

        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        if !dense_vectors.is_empty() {
            let vectors_data = Self::build_vectors_file_binary(dense_vectors, &self.schema)?;
            if !vectors_data.is_empty() {
                dir.write(&files.vectors, &vectors_data).await?;
            }
        }

        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        if !sparse_vectors.is_empty() {
            let sparse_data = Self::build_sparse_file_inplace(&mut sparse_vectors, &self.schema)?;
            drop(sparse_vectors);
            if !sparse_data.is_empty() {
                dir.write(&files.sparse, &sparse_data).await?;
            }
        }

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }

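    /// Serializes dense vectors as flat vector data with a directory header
    /// mapping each field to its byte range in the file.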
    fn build_vectors_file_binary(
        dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
        schema: &Schema,
    ) -> Result<Vec<u8>> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field_id, builder) in dense_vectors {
            if builder.len() == 0 {
                continue;
            }

            let field = crate::dsl::Field(field_id);
            let dense_config = schema
                .get_field_entry(field)
                .and_then(|e| e.dense_vector_config.as_ref());

            let index_dim = dense_config.map(|c| c.index_dim()).unwrap_or(builder.dim);

            let index_bytes = FlatVectorData::serialize_binary_from_flat(
                index_dim,
                &builder.vectors,
                builder.dim,
                &builder.doc_ids,
            );

            // Segment build always emits flat (unindexed) vector data; the
            // index-type tag (4) distinguishes it for the reader.
            field_indexes.push((field_id, 4u8, index_bytes));
        }

        if field_indexes.is_empty() {
            return Ok(Vec::new());
        }

        field_indexes.sort_by_key(|(id, _, _)| *id);
        // Header: count (u32), then per field: field_id (u32), index type
        // (u8), offset (u64), length (u64).
        let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);

        let mut output = Vec::new();
        output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

        let mut current_offset = header_size as u64;
        for (field_id, index_type, data) in &field_indexes {
            output.write_u32::<LittleEndian>(*field_id)?;
            output.write_u8(*index_type)?;
            output.write_u64::<LittleEndian>(current_offset)?;
            output.write_u64::<LittleEndian>(data.len() as u64)?;
            current_offset += data.len() as u64;
        }

        for (_, _, data) in field_indexes {
            output.extend_from_slice(&data);
        }

        Ok(output)
    }

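    /// Serializes sparse vectors per (field, dimension) as block posting
    /// lists, sorting and optionally pruning the builders in place.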
    fn build_sparse_file_inplace(
        sparse_vectors: &mut FxHashMap<u32, SparseVectorBuilder>,
        schema: &Schema,
    ) -> Result<Vec<u8>> {
        use crate::structures::{BlockSparsePostingList, WeightQuantization};
        use byteorder::{LittleEndian, WriteBytesExt};

        if sparse_vectors.is_empty() {
            return Ok(Vec::new());
        }

        type SparseFieldData = (u32, WeightQuantization, u32, FxHashMap<u32, Vec<u8>>);
        let mut field_data: Vec<SparseFieldData> = Vec::new();

        for (&field_id, builder) in sparse_vectors.iter_mut() {
            if builder.is_empty() {
                continue;
            }

            let field = crate::dsl::Field(field_id);
            let sparse_config = schema
                .get_field_entry(field)
                .and_then(|e| e.sparse_vector_config.as_ref());

            let quantization = sparse_config
                .map(|c| c.weight_quantization)
                .unwrap_or(WeightQuantization::Float32);

            let block_size = sparse_config.map(|c| c.block_size).unwrap_or(128);
            let pruning_fraction = sparse_config.and_then(|c| c.posting_list_pruning);

            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();

            for (&dim_id, postings) in builder.postings.iter_mut() {
                postings.sort_unstable_by_key(|(doc_id, ordinal, _)| (*doc_id, *ordinal));

                // Optional pruning: keep only the highest-|weight| fraction of
                // the posting list, then restore (doc_id, ordinal) order.
                if let Some(fraction) = pruning_fraction
                    && postings.len() > 1
                    && fraction < 1.0
                {
                    let original_len = postings.len();
                    postings.sort_by(|a, b| {
                        b.2.abs()
                            .partial_cmp(&a.2.abs())
                            .unwrap_or(std::cmp::Ordering::Equal)
                    });
                    let keep = ((original_len as f64 * fraction as f64).ceil() as usize).max(1);
                    postings.truncate(keep);
                    postings.sort_unstable_by_key(|(d, o, _)| (*d, *o));
                }

                let block_list = BlockSparsePostingList::from_postings_with_block_size(
                    postings,
                    quantization,
                    block_size,
                )
                .map_err(crate::Error::Io)?;

                let mut bytes = Vec::new();
                block_list.serialize(&mut bytes).map_err(crate::Error::Io)?;
                dim_bytes.insert(dim_id, bytes);
            }

            field_data.push((field_id, quantization, dim_bytes.len() as u32, dim_bytes));
        }

        if field_data.is_empty() {
            return Ok(Vec::new());
        }

        field_data.sort_by_key(|(id, _, _, _)| *id);

        // Header: field count (u32), then per field: field_id (u32),
        // quantization (u8), num_dims (u32), followed by num_dims entries of
        // (dim_id: u32, offset: u64, length: u32) = 16 bytes each.
        let mut header_size = 4u64;
        for (_, _, num_dims, _) in &field_data {
            header_size += 4 + 1 + 4;
            header_size += (*num_dims as u64) * 16;
        }

        let mut output = Vec::new();
        output.write_u32::<LittleEndian>(field_data.len() as u32)?;

        let mut current_offset = header_size;
        let mut all_data: Vec<u8> = Vec::new();
        let mut field_tables: Vec<Vec<(u32, u64, u32)>> = Vec::new();

        for (_, _, _, dim_bytes) in &field_data {
            let mut table: Vec<(u32, u64, u32)> = Vec::with_capacity(dim_bytes.len());
            let mut dims: Vec<_> = dim_bytes.keys().copied().collect();
            dims.sort();

            for dim_id in dims {
                let bytes = &dim_bytes[&dim_id];
                table.push((dim_id, current_offset, bytes.len() as u32));
                current_offset += bytes.len() as u64;
                all_data.extend_from_slice(bytes);
            }
            field_tables.push(table);
        }

        for (i, (field_id, quantization, num_dims, _)) in field_data.iter().enumerate() {
            output.write_u32::<LittleEndian>(*field_id)?;
            output.write_u8(*quantization as u8)?;
            output.write_u32::<LittleEndian>(*num_dims)?;

            for &(dim_id, offset, length) in &field_tables[i] {
                output.write_u32::<LittleEndian>(dim_id)?;
                output.write_u64::<LittleEndian>(offset)?;
                output.write_u32::<LittleEndian>(length)?;
            }
        }

        output.extend_from_slice(&all_data);
        Ok(output)
    }

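    /// Serializes position posting lists in term-key order and returns the
    /// buffer plus a map from term key to (offset, length).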
    #[allow(clippy::type_complexity)]
    fn build_positions_owned(
        position_index: HashMap<TermKey, PositionPostingListBuilder>,
        term_interner: &Rodeo,
    ) -> Result<(Vec<u8>, FxHashMap<Vec<u8>, (u64, u32)>)> {
        use crate::structures::PositionPostingList;

        let mut position_offsets: FxHashMap<Vec<u8>, (u64, u32)> = FxHashMap::default();

        if position_index.is_empty() {
            return Ok((Vec::new(), position_offsets));
        }

        let mut entries: Vec<(Vec<u8>, PositionPostingListBuilder)> = position_index
            .into_iter()
            .map(|(term_key, pos_builder)| {
                let term_str = term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(4 + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, pos_builder)
            })
            .collect();

        entries.sort_by(|a, b| a.0.cmp(&b.0));

        let mut output = Vec::new();

        for (key, pos_builder) in entries {
            let mut pos_list = PositionPostingList::with_capacity(pos_builder.postings.len());
            for (doc_id, positions) in pos_builder.postings {
                pos_list.push(doc_id, positions);
            }

            let offset = output.len() as u64;
            pos_list.serialize(&mut output).map_err(crate::Error::Io)?;
            let len = (output.len() as u64 - offset) as u32;

            position_offsets.insert(key, (offset, len));
        }

        Ok((output, position_offsets))
    }

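    /// Builds the term dictionary (SSTable) and postings file. Terms are
    /// keyed by field id (little-endian) followed by the term bytes.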
    fn build_postings_owned(
        inverted_index: HashMap<TermKey, PostingListBuilder>,
        term_interner: Rodeo,
        position_offsets: &FxHashMap<Vec<u8>, (u64, u32)>,
    ) -> Result<(Vec<u8>, Vec<u8>)> {
        let mut term_entries: Vec<(Vec<u8>, PostingListBuilder)> = inverted_index
            .into_iter()
            .map(|(term_key, posting_list)| {
                let term_str = term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(4 + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, posting_list)
            })
            .collect();

        drop(term_interner);

        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));

        // Serialize each posting list in parallel. Small lists without
        // positions are inlined directly into the term dictionary entry;
        // everything else goes to the postings file as a block posting list.
        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
            .into_par_iter()
            .map(|(key, posting_builder)| {
                let mut full_postings = PostingList::with_capacity(posting_builder.len());
                for p in &posting_builder.postings {
                    full_postings.push(p.doc_id, p.term_freq as u32);
                }
                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

                let has_positions = position_offsets.contains_key(&key);
                let result = if !has_positions
                    && let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs)
                {
                    SerializedPosting::Inline(inline)
                } else {
                    let mut posting_bytes = Vec::new();
                    let block_list =
                        crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
                    block_list.serialize(&mut posting_bytes)?;
                    SerializedPosting::External {
                        bytes: posting_bytes,
                        doc_count: full_postings.doc_count(),
                    }
                };

                Ok((key, result))
            })
            .collect::<Result<Vec<_>>>()?;

        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        for (key, serialized_posting) in serialized {
            let term_info = match serialized_posting {
                SerializedPosting::Inline(info) => info,
                SerializedPosting::External { bytes, doc_count } => {
                    let posting_offset = postings.len() as u64;
                    let posting_len = bytes.len() as u32;
                    postings.extend_from_slice(&bytes);

                    if let Some(&(pos_offset, pos_len)) = position_offsets.get(&key) {
                        TermInfo::external_with_positions(
                            posting_offset,
                            posting_len,
                            doc_count,
                            pos_offset,
                            pos_len,
                        )
                    } else {
                        TermInfo::external(posting_offset, posting_len, doc_count)
                    }
                }
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }

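    /// Re-reads the temporary store file via mmap and rewrites it as the
    /// final compressed document store.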
    fn build_store_batched(
        store_path: &PathBuf,
        schema: &Schema,
        num_compression_threads: usize,
        compression_level: CompressionLevel,
    ) -> Result<Vec<u8>> {
        use super::store::EagerParallelStoreWriter;

        let file = File::open(store_path)?;
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        // Scan the length-prefixed records to find each document's byte range.
        let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
        let mut offset = 0usize;
        while offset + 4 <= mmap.len() {
            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            doc_ranges.push((offset, doc_len));
            offset += doc_len;
        }

        // Deserialize in parallel batches, then feed the writer in order.
        const BATCH_SIZE: usize = 10_000;
        let mut store_data = Vec::new();
        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
            &mut store_data,
            num_compression_threads,
            compression_level,
        );

        for batch in doc_ranges.chunks(BATCH_SIZE) {
            let batch_docs: Vec<Document> = batch
                .par_iter()
                .filter_map(|&(start, len)| {
                    let doc_bytes = &mmap[start..start + len];
                    super::store::deserialize_document(doc_bytes, schema).ok()
                })
                .collect();

            for doc in &batch_docs {
                store_writer.store(doc, schema)?;
            }
        }

        store_writer.finish()?;
        Ok(store_data)
    }
}

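// Best-effort cleanup of the temporary store file if the builder is dropped
// without completing `build`.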
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        let _ = std::fs::remove_file(&self.store_path);
    }
}