1mod config;
12mod posting;
13mod vectors;
14
15pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
16
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
20
21use hashbrown::HashMap;
22use lasso::{Rodeo, Spur};
23use rayon::prelude::*;
24use rustc_hash::FxHashMap;
25
26use crate::compression::CompressionLevel;
27
28use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
29use crate::directories::{Directory, DirectoryWriter};
30use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
31use crate::structures::{PostingList, SSTableWriter, TermInfo};
32use crate::tokenizer::BoxedTokenizer;
33use crate::{DocId, Result};
34
35use posting::{
36 CompactPosting, PositionPostingListBuilder, PostingListBuilder, SerializedPosting, TermKey,
37};
38use vectors::{DenseVectorBuilder, SparseVectorBuilder};
39
40pub use super::vector_data::{FlatVectorData, IVFRaBitQIndexData, ScaNNIndexData};
42
/// Buffer size for the temporary on-disk document spool (16 MiB).
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Accumulates documents in memory (inverted index, vectors, positions) while
/// spooling serialized documents to a temp file, then serializes everything
/// into one immutable segment via [`SegmentBuilder::build`].
pub struct SegmentBuilder {
    // Field definitions/types for the index.
    schema: Schema,
    config: SegmentBuilderConfig,
    // Per-field tokenizers registered via `set_tokenizer`.
    // NOTE(review): not consulted by `index_text_field`, which does its own
    // whitespace/lowercase tokenization — confirm whether this is intended.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    // Interns term strings so postings key on a small `Spur` instead of a String.
    term_interner: Rodeo,

    // (field, term) -> in-memory posting list.
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    // Buffered writer spooling length-prefixed serialized documents to disk.
    store_file: BufWriter<File>,
    // Path of the temp spool file; removed in `build` and on `Drop`.
    store_path: PathBuf,

    // Next DocId to assign; equals the number of documents added so far.
    next_doc_id: DocId,

    // Per-field token/doc counts (keyed by field id).
    field_stats: FxHashMap<u32, FieldStats>,

    // Flattened [doc][slot] token counts: one row of `num_indexed_fields`
    // entries per document.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    // field id -> slot index within a `doc_field_lengths` row.
    field_to_slot: FxHashMap<u32, usize>,

    // Reusable scratch: term -> term frequency for the text value being indexed.
    local_tf_buffer: FxHashMap<Spur, u32>,

    // Reusable scratch for building one normalized token.
    token_buffer: String,

    // field id -> dense vector accumulator.
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    // field id -> sparse vector accumulator.
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    // (field, term) -> positions, for fields with positions enabled.
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    // field id -> configured position mode (only fields with positions enabled).
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    // Per-field ordinal of the current multi-valued element within the
    // document being added; cleared at the start of each `add_document`.
    current_element_ordinal: FxHashMap<u32, u32>,

    // Incrementally maintained rough estimate of heap usage, used by callers
    // to decide when to flush the segment.
    estimated_memory: usize,
}
108
109impl SegmentBuilder {
110 pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
112 let segment_id = uuid::Uuid::new_v4();
113 let store_path = config
114 .temp_dir
115 .join(format!("hermes_store_{}.tmp", segment_id));
116
117 let store_file = BufWriter::with_capacity(
118 STORE_BUFFER_SIZE,
119 OpenOptions::new()
120 .create(true)
121 .write(true)
122 .truncate(true)
123 .open(&store_path)?,
124 );
125
126 let mut num_indexed_fields = 0;
129 let mut field_to_slot = FxHashMap::default();
130 let mut position_enabled_fields = FxHashMap::default();
131 for (field, entry) in schema.fields() {
132 if entry.indexed && matches!(entry.field_type, FieldType::Text) {
133 field_to_slot.insert(field.0, num_indexed_fields);
134 num_indexed_fields += 1;
135 if entry.positions.is_some() {
136 position_enabled_fields.insert(field.0, entry.positions);
137 }
138 }
139 }
140
141 Ok(Self {
142 schema,
143 tokenizers: FxHashMap::default(),
144 term_interner: Rodeo::new(),
145 inverted_index: HashMap::with_capacity(config.posting_map_capacity),
146 store_file,
147 store_path,
148 next_doc_id: 0,
149 field_stats: FxHashMap::default(),
150 doc_field_lengths: Vec::new(),
151 num_indexed_fields,
152 field_to_slot,
153 local_tf_buffer: FxHashMap::default(),
154 token_buffer: String::with_capacity(64),
155 config,
156 dense_vectors: FxHashMap::default(),
157 sparse_vectors: FxHashMap::default(),
158 position_index: HashMap::new(),
159 position_enabled_fields,
160 current_element_ordinal: FxHashMap::default(),
161 estimated_memory: 0,
162 })
163 }
164
    /// Registers a tokenizer for `field`.
    ///
    /// NOTE(review): the visible indexing path (`index_text_field`) never reads
    /// `self.tokenizers` — confirm whether these are consumed elsewhere.
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }
168
    /// Number of documents added so far (doc ids are assigned sequentially
    /// from 0, so the next id equals the count).
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }
172
    /// Cheap, incrementally maintained estimate of heap usage in bytes.
    /// See `stats()` for a full (recomputed) breakdown.
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }
178
    /// Recomputes a full memory/size breakdown of the builder's in-memory state.
    ///
    /// All figures are heuristic estimates (capacity × entry size plus flat
    /// per-entry map overhead), not exact allocator measurements.
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        // Total postings currently buffered across all terms.
        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        let compact_posting_size = size_of::<CompactPosting>();
        let vec_overhead = size_of::<Vec<u8>>();
        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let spur_size = size_of::<lasso::Spur>();
        let sparse_entry_size = size_of::<(DocId, u16, f32)>();

        // Assumed per-entry bookkeeping overhead of a hash map bucket.
        let hashmap_entry_base_overhead = 8usize;
        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;

        // Posting vectors: use capacity (not len) to reflect allocations.
        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
            .sum();

        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);

        // Interner estimate assumes ~8 bytes per interned term on average.
        let interner_arena_overhead = 2 * size_of::<usize>();
        let avg_term_len = 8;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;

        // Dense vectors: raw f32 storage plus (doc_id, ordinal) rows.
        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;

        // Sparse vectors: per-dimension posting vectors plus both map layers.
        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
            }
            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
        }
        let outer_sparse_entry_size =
            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;

        // Position index: per-doc position vectors plus the term-level map.
        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
            }
            let pos_entry_size = size_of::<DocId>() + vec_overhead;
            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
        }
        let pos_index_entry_size =
            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
        position_index_bytes += self.position_index.len() * pos_index_entry_size;

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }
298
    /// Indexes one document and spools its serialized form to the temp store.
    ///
    /// Assigns the next sequential `DocId`, indexes every schema-known indexed
    /// field value, then appends the document to the on-disk spool. Multi-valued
    /// fields get a per-field element ordinal (0, 1, ...) within this document.
    ///
    /// # Errors
    /// Propagates indexing or store-serialization/IO errors. NOTE(review): on
    /// error the doc id has already been consumed and partial postings may
    /// remain — confirm callers discard the builder on error.
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve this document's row of per-field token counts (zero-filled).
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);

        // Element ordinals restart at 0 for every document.
        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            // Skip fields unknown to the schema or not marked as indexed.
            let entry = self.schema.get_field_entry(*field);
            if entry.is_none() || !entry.unwrap().indexed {
                continue;
            }

            let entry = entry.unwrap();
            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    // Ordinal of this value among the field's values in this doc.
                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
                    let token_count =
                        self.index_text_field(*field, doc_id, text, element_ordinal)?;
                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;

                    stats_update: {
                        // (labeled block only in comment narration — see below)
                    }
                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    stats.doc_count += 1;

                    // Record this field's token count in the document's row.
                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    // Raw bit cast: fine for exact-match terms, not for ordering.
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    // Indexed by IEEE-754 bit pattern (exact match only).
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec)) => {
                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
                    self.index_dense_vector_field(*field, doc_id, element_ordinal as u16, vec)?;
                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
                    self.index_sparse_vector_field(
                        *field,
                        doc_id,
                        element_ordinal as u16,
                        entries,
                    )?;
                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
                }
                // Type/value mismatches are silently ignored.
                _ => {}
            }
        }

        // Spool the full document for later store compaction in build().
        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }
375
    /// Tokenizes `text` and merges its terms into the inverted (and, if
    /// enabled, position) index. Returns the number of tokens produced.
    ///
    /// Tokenization is built-in: split on whitespace, keep alphanumeric chars,
    /// lowercase. Registered `tokenizers` are not consulted here.
    ///
    /// Position encoding per `PositionMode`:
    /// - `Ordinal`: element ordinal in the high bits (`ordinal << 20`).
    /// - `TokenPosition`: raw token position.
    /// - `Full`: `(ordinal << 20) | token_position`.
    ///   NOTE(review): assumes token_position < 2^20 and ordinal fits the
    ///   remaining bits; neither is checked here — confirm upstream limits.
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        // None unless the schema enabled positions for this field.
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        // Reused scratch map: term -> tf within this text value only.
        self.local_tf_buffer.clear();

        // term -> encoded positions within this text value.
        let mut local_positions: FxHashMap<Spur, Vec<u32>> = FxHashMap::default();

        let mut token_position = 0u32;

        for word in text.split_whitespace() {
            // Normalize: keep alphanumerics, lowercase (may expand to
            // multiple chars, e.g. 'İ').
            self.token_buffer.clear();
            for c in word.chars() {
                if c.is_alphanumeric() {
                    for lc in c.to_lowercase() {
                        self.token_buffer.push(lc);
                    }
                }
            }

            // Words that normalize to nothing do not consume a position.
            if self.token_buffer.is_empty() {
                continue;
            }

            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

            if let Some(mode) = position_mode {
                let encoded_pos = match mode {
                    PositionMode::Ordinal => element_ordinal << 20,
                    PositionMode::TokenPosition => token_position,
                    PositionMode::Full => (element_ordinal << 20) | token_position,
                };
                local_positions
                    .entry(term_spur)
                    .or_default()
                    .push(encoded_pos);
            }

            token_position += 1;
        }

        // Merge the per-value tallies into the segment-wide indexes.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let is_new_term = !self.inverted_index.contains_key(&term_key);
            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);

            // Incremental memory estimate: one posting, plus map-entry
            // overhead (est. 24 bytes) the first time a term is seen.
            use std::mem::size_of;
            self.estimated_memory += size_of::<CompactPosting>();
            if is_new_term {
                self.estimated_memory +=
                    size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
            }

            if position_mode.is_some()
                && let Some(positions) = local_positions.get(&term_spur)
            {
                let pos_posting = self
                    .position_index
                    .entry(term_key)
                    .or_insert_with(PositionPostingListBuilder::new);
                for &pos in positions {
                    pos_posting.add_position(doc_id, pos);
                }
            }
        }

        // token_position only advanced for non-empty tokens == token count.
        Ok(token_position)
    }
491
492 fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
493 let term_str = format!("__num_{}", value);
495 let term_spur = self.term_interner.get_or_intern(&term_str);
496
497 let term_key = TermKey {
498 field: field.0,
499 term: term_spur,
500 };
501
502 let posting = self
503 .inverted_index
504 .entry(term_key)
505 .or_insert_with(PostingListBuilder::new);
506 posting.add(doc_id, 1);
507
508 Ok(())
509 }
510
511 fn index_dense_vector_field(
513 &mut self,
514 field: Field,
515 doc_id: DocId,
516 ordinal: u16,
517 vector: &[f32],
518 ) -> Result<()> {
519 let dim = vector.len();
520
521 let builder = self
522 .dense_vectors
523 .entry(field.0)
524 .or_insert_with(|| DenseVectorBuilder::new(dim));
525
526 if builder.dim != dim && builder.len() > 0 {
528 return Err(crate::Error::Schema(format!(
529 "Dense vector dimension mismatch: expected {}, got {}",
530 builder.dim, dim
531 )));
532 }
533
534 builder.add(doc_id, ordinal, vector);
535
536 use std::mem::{size_of, size_of_val};
538 self.estimated_memory += size_of_val(vector) + size_of::<(DocId, u16)>();
539
540 Ok(())
541 }
542
543 fn index_sparse_vector_field(
550 &mut self,
551 field: Field,
552 doc_id: DocId,
553 ordinal: u16,
554 entries: &[(u32, f32)],
555 ) -> Result<()> {
556 let weight_threshold = self
558 .schema
559 .get_field_entry(field)
560 .and_then(|entry| entry.sparse_vector_config.as_ref())
561 .map(|config| config.weight_threshold)
562 .unwrap_or(0.0);
563
564 let builder = self
565 .sparse_vectors
566 .entry(field.0)
567 .or_insert_with(SparseVectorBuilder::new);
568
569 for &(dim_id, weight) in entries {
570 if weight.abs() < weight_threshold {
572 continue;
573 }
574
575 use std::mem::size_of;
577 let is_new_dim = !builder.postings.contains_key(&dim_id);
578 builder.add(dim_id, doc_id, ordinal, weight);
579 self.estimated_memory += size_of::<(DocId, u16, f32)>();
580 if is_new_dim {
581 self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; }
584 }
585
586 Ok(())
587 }
588
589 fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
591 use byteorder::{LittleEndian, WriteBytesExt};
592
593 let doc_bytes = super::store::serialize_document(doc, &self.schema)?;
594
595 self.store_file
596 .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
597 self.store_file.write_all(&doc_bytes)?;
598
599 Ok(())
600 }
601
    /// Finalizes the segment: serializes postings, store, positions, and
    /// vector data, writes every segment file to `dir`, and returns the
    /// segment metadata. Consumes the builder.
    ///
    /// Postings serialization and store compaction run concurrently on the
    /// rayon pool; file writes happen afterwards on the async directory.
    ///
    /// # Errors
    /// Propagates serialization and directory-write errors.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        // Flush buffered frames so build_store_parallel sees the full spool.
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Positions must be built first: build_postings embeds each term's
        // (offset, len) into its TermInfo.
        let (positions_data, position_offsets) = self.build_positions_file()?;

        // Clone the inputs the store task needs so the two rayon closures
        // don't contend over `self`.
        let store_path = self.store_path.clone();
        let schema = self.schema.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;

        // Postings serialization and store compaction are independent.
        let (postings_result, store_result) = rayon::join(
            || self.build_postings(&position_offsets),
            || {
                Self::build_store_parallel(
                    &store_path,
                    &schema,
                    num_compression_threads,
                    compression_level,
                )
            },
        );

        let (term_dict_data, postings_data) = postings_result?;
        let store_data = store_result?;

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        // Optional files are only written when non-empty.
        if !positions_data.is_empty() {
            dir.write(&files.positions, &positions_data).await?;
        }

        if !self.dense_vectors.is_empty() {
            let vectors_data = self.build_vectors_file()?;
            if !vectors_data.is_empty() {
                dir.write(&files.vectors, &vectors_data).await?;
            }
        }

        if !self.sparse_vectors.is_empty() {
            let sparse_data = self.build_sparse_file()?;
            if !sparse_data.is_empty() {
                dir.write(&files.sparse, &sparse_data).await?;
            }
        }

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Best-effort removal of the temp spool; Drop also retries.
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }
677
678 fn build_vectors_file(&self) -> Result<Vec<u8>> {
685 use byteorder::{LittleEndian, WriteBytesExt};
686
687 let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();
689
690 for (&field_id, builder) in &self.dense_vectors {
691 if builder.len() == 0 {
692 continue;
693 }
694
695 let field = crate::dsl::Field(field_id);
696
697 let dense_config = self
699 .schema
700 .get_field_entry(field)
701 .and_then(|e| e.dense_vector_config.as_ref());
702
703 let index_dim = dense_config.map(|c| c.index_dim()).unwrap_or(builder.dim);
705 let vectors = if index_dim < builder.dim {
706 builder.get_vectors_trimmed(index_dim)
708 } else {
709 builder.get_vectors()
710 };
711
712 let flat_data = FlatVectorData {
716 dim: index_dim,
717 vectors: vectors.clone(),
718 doc_ids: builder.doc_ids.clone(),
719 };
720 let index_bytes = serde_json::to_vec(&flat_data)
721 .map_err(|e| crate::Error::Serialization(e.to_string()))?;
722 let index_type = 3u8; field_indexes.push((field_id, index_type, index_bytes));
725 }
726
727 if field_indexes.is_empty() {
728 return Ok(Vec::new());
729 }
730
731 field_indexes.sort_by_key(|(id, _, _)| *id);
733
734 let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
736
737 let mut output = Vec::new();
739
740 output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;
742
743 let mut current_offset = header_size as u64;
745 for (field_id, index_type, data) in &field_indexes {
746 output.write_u32::<LittleEndian>(*field_id)?;
747 output.write_u8(*index_type)?;
748 output.write_u64::<LittleEndian>(current_offset)?;
749 output.write_u64::<LittleEndian>(data.len() as u64)?;
750 current_offset += data.len() as u64;
751 }
752
753 for (_, _, data) in field_indexes {
755 output.extend_from_slice(&data);
756 }
757
758 Ok(output)
759 }
760
    /// Serializes all sparse-vector fields into one file.
    ///
    /// Layout: u32 field count; then per field (ordered by id) a header of
    /// (u32 field id, u8 quantization, u32 dim table size) followed by a dense
    /// dim table of (u64 absolute offset, u32 length) — (0, 0) for absent
    /// dims; finally the concatenated per-dimension posting-list payloads.
    fn build_sparse_file(&self) -> Result<Vec<u8>> {
        use crate::structures::{BlockSparsePostingList, WeightQuantization};
        use byteorder::{LittleEndian, WriteBytesExt};

        if self.sparse_vectors.is_empty() {
            return Ok(Vec::new());
        }

        // (field_id, quantization, dim table size = max_dim_id + 1,
        //  dim_id -> serialized posting list)
        type SparseFieldData = (u32, WeightQuantization, u32, FxHashMap<u32, Vec<u8>>);
        let mut field_data: Vec<SparseFieldData> = Vec::new();

        for (&field_id, builder) in &self.sparse_vectors {
            if builder.is_empty() {
                continue;
            }

            let field = crate::dsl::Field(field_id);

            let sparse_config = self
                .schema
                .get_field_entry(field)
                .and_then(|e| e.sparse_vector_config.as_ref());

            // Defaults when the field has no sparse config.
            let quantization = sparse_config
                .map(|c| c.weight_quantization)
                .unwrap_or(WeightQuantization::Float32);

            let block_size = sparse_config.map(|c| c.block_size).unwrap_or(128);

            let max_dim_id = builder.postings.keys().max().copied().unwrap_or(0);

            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();

            for (&dim_id, postings) in &builder.postings {
                // Postings must be sorted by (doc_id, ordinal) before block
                // encoding.
                let mut sorted_postings = postings.clone();
                sorted_postings.sort_by_key(|(doc_id, ordinal, _)| (*doc_id, *ordinal));

                let block_list = BlockSparsePostingList::from_postings_with_block_size(
                    &sorted_postings,
                    quantization,
                    block_size,
                )
                .map_err(crate::Error::Io)?;

                let mut bytes = Vec::new();
                block_list.serialize(&mut bytes).map_err(crate::Error::Io)?;

                dim_bytes.insert(dim_id, bytes);
            }

            field_data.push((field_id, quantization, max_dim_id + 1, dim_bytes));
        }

        if field_data.is_empty() {
            return Ok(Vec::new());
        }

        // Deterministic file layout: fields ordered by id.
        field_data.sort_by_key(|(id, _, _, _)| *id);

        // Compute the total header size first: payload offsets are absolute,
        // so they depend on where the header ends.
        let mut header_size = 4u64;
        for (_, _, max_dim_id, _) in &field_data {
            header_size += 4 + 1 + 4; // field id + quantization + table size
            header_size += (*max_dim_id as u64) * 12; // (u64, u32) per dim
        }

        let mut output = Vec::new();

        output.write_u32::<LittleEndian>(field_data.len() as u32)?;

        let mut current_offset = header_size;

        // First pass: concatenate payloads and record each dim's (offset, len).
        let mut all_data: Vec<u8> = Vec::new();
        let mut field_tables: Vec<Vec<(u64, u32)>> = Vec::new();

        for (_, _, max_dim_id, dim_bytes) in &field_data {
            // Dense table indexed by dim id; absent dims stay (0, 0).
            let mut table: Vec<(u64, u32)> = vec![(0, 0); *max_dim_id as usize];

            for dim_id in 0..*max_dim_id {
                if let Some(bytes) = dim_bytes.get(&dim_id) {
                    table[dim_id as usize] = (current_offset, bytes.len() as u32);
                    current_offset += bytes.len() as u64;
                    all_data.extend_from_slice(bytes);
                }
            }

            field_tables.push(table);
        }

        // Second pass: emit per-field headers and dim tables.
        for (i, (field_id, quantization, max_dim_id, _)) in field_data.iter().enumerate() {
            output.write_u32::<LittleEndian>(*field_id)?;
            output.write_u8(*quantization as u8)?;
            output.write_u32::<LittleEndian>(*max_dim_id)?;

            for &(offset, length) in &field_tables[i] {
                output.write_u64::<LittleEndian>(offset)?;
                output.write_u32::<LittleEndian>(length)?;
            }
        }

        output.extend_from_slice(&all_data);

        Ok(output)
    }
895
    /// Serializes the position index and returns `(file bytes, offsets)`,
    /// where `offsets` maps each term's byte key (LE field id + term bytes)
    /// to that term's (offset, length) within the file. `build_postings`
    /// embeds these into each term's `TermInfo`.
    #[allow(clippy::type_complexity)]
    fn build_positions_file(&self) -> Result<(Vec<u8>, FxHashMap<Vec<u8>, (u64, u32)>)> {
        use crate::structures::PositionPostingList;

        let mut position_offsets: FxHashMap<Vec<u8>, (u64, u32)> = FxHashMap::default();

        if self.position_index.is_empty() {
            return Ok((Vec::new(), position_offsets));
        }

        // Build the same byte key build_postings uses: 4-byte LE field id
        // followed by the resolved term string.
        let mut entries: Vec<(Vec<u8>, &PositionPostingListBuilder)> = self
            .position_index
            .iter()
            .map(|(term_key, pos_list)| {
                let term_str = self.term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(4 + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, pos_list)
            })
            .collect();

        // Deterministic layout: terms in key order.
        entries.sort_by(|a, b| a.0.cmp(&b.0));

        let mut output = Vec::new();

        for (key, pos_builder) in entries {
            let mut pos_list = PositionPostingList::with_capacity(pos_builder.postings.len());
            for (doc_id, positions) in &pos_builder.postings {
                pos_list.push(*doc_id, positions.clone());
            }

            // Record this term's span within the output buffer.
            let offset = output.len() as u64;
            pos_list.serialize(&mut output).map_err(crate::Error::Io)?;
            let len = (output.len() as u64 - offset) as u32;

            position_offsets.insert(key, (offset, len));
        }

        Ok((output, position_offsets))
    }
948
949 fn build_postings(
954 &mut self,
955 position_offsets: &FxHashMap<Vec<u8>, (u64, u32)>,
956 ) -> Result<(Vec<u8>, Vec<u8>)> {
957 let mut term_entries: Vec<(Vec<u8>, &PostingListBuilder)> = self
960 .inverted_index
961 .iter()
962 .map(|(term_key, posting_list)| {
963 let term_str = self.term_interner.resolve(&term_key.term);
964 let mut key = Vec::with_capacity(4 + term_str.len());
965 key.extend_from_slice(&term_key.field.to_le_bytes());
966 key.extend_from_slice(term_str.as_bytes());
967 (key, posting_list)
968 })
969 .collect();
970
971 term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
973
974 let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
977 .into_par_iter()
978 .map(|(key, posting_builder)| {
979 let mut full_postings = PostingList::with_capacity(posting_builder.len());
981 for p in &posting_builder.postings {
982 full_postings.push(p.doc_id, p.term_freq as u32);
983 }
984
985 let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
987 let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();
988
989 let has_positions = position_offsets.contains_key(&key);
991 let result = if !has_positions
992 && let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs)
993 {
994 SerializedPosting::Inline(inline)
995 } else {
996 let mut posting_bytes = Vec::new();
998 let block_list =
999 crate::structures::BlockPostingList::from_posting_list(&full_postings)
1000 .expect("BlockPostingList creation failed");
1001 block_list
1002 .serialize(&mut posting_bytes)
1003 .expect("BlockPostingList serialization failed");
1004 SerializedPosting::External {
1005 bytes: posting_bytes,
1006 doc_count: full_postings.doc_count(),
1007 }
1008 };
1009
1010 (key, result)
1011 })
1012 .collect();
1013
1014 let mut term_dict = Vec::new();
1016 let mut postings = Vec::new();
1017 let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);
1018
1019 for (key, serialized_posting) in serialized {
1020 let term_info = match serialized_posting {
1021 SerializedPosting::Inline(info) => info,
1022 SerializedPosting::External { bytes, doc_count } => {
1023 let posting_offset = postings.len() as u64;
1024 let posting_len = bytes.len() as u32;
1025 postings.extend_from_slice(&bytes);
1026
1027 if let Some(&(pos_offset, pos_len)) = position_offsets.get(&key) {
1029 TermInfo::external_with_positions(
1030 posting_offset,
1031 posting_len,
1032 doc_count,
1033 pos_offset,
1034 pos_len,
1035 )
1036 } else {
1037 TermInfo::external(posting_offset, posting_len, doc_count)
1038 }
1039 }
1040 };
1041
1042 writer.insert(&key, &term_info)?;
1043 }
1044
1045 writer.finish()?;
1046 Ok((term_dict, postings))
1047 }
1048
1049 fn build_store_parallel(
1053 store_path: &PathBuf,
1054 schema: &Schema,
1055 num_compression_threads: usize,
1056 compression_level: CompressionLevel,
1057 ) -> Result<Vec<u8>> {
1058 use super::store::EagerParallelStoreWriter;
1059
1060 let file = File::open(store_path)?;
1061 let mmap = unsafe { memmap2::Mmap::map(&file)? };
1062
1063 let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
1065 let mut offset = 0usize;
1066 while offset + 4 <= mmap.len() {
1067 let doc_len = u32::from_le_bytes([
1068 mmap[offset],
1069 mmap[offset + 1],
1070 mmap[offset + 2],
1071 mmap[offset + 3],
1072 ]) as usize;
1073 offset += 4;
1074
1075 if offset + doc_len > mmap.len() {
1076 break;
1077 }
1078
1079 doc_ranges.push((offset, doc_len));
1080 offset += doc_len;
1081 }
1082
1083 let docs: Vec<Document> = doc_ranges
1085 .into_par_iter()
1086 .filter_map(|(start, len)| {
1087 let doc_bytes = &mmap[start..start + len];
1088 super::store::deserialize_document(doc_bytes, schema).ok()
1089 })
1090 .collect();
1091
1092 let mut store_data = Vec::new();
1094 let mut store_writer = EagerParallelStoreWriter::with_compression_level(
1095 &mut store_data,
1096 num_compression_threads,
1097 compression_level,
1098 );
1099
1100 for doc in &docs {
1101 store_writer.store(doc, schema)?;
1102 }
1103
1104 store_writer.finish()?;
1105 Ok(store_data)
1106 }
1107}
1108
impl Drop for SegmentBuilder {
    /// Best-effort removal of the temp spool file. The error is ignored:
    /// drop cannot fail, and a successful `build()` already removed the file.
    fn drop(&mut self) {
        let _ = std::fs::remove_file(&self.store_path);
    }
}