use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::sync::Arc;

use hashbrown::HashMap;
use lasso::{Rodeo, Spur};
use rayon::prelude::*;
use rustc_hash::FxHashMap;

use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::compression::CompressionLevel;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
use crate::structures::{PostingList, SSTableWriter, TermInfo};
use crate::tokenizer::BoxedTokenizer;
use crate::wand::WandData;
use crate::{DocId, Result};

/// Buffer size for the temporary on-disk document store (16 MiB).
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Key for the in-memory inverted index: an indexed field plus an interned term.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    field: u32,
    term: Spur,
}

/// A single in-memory posting: a document id and its clamped term frequency.
#[derive(Clone, Copy)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16,
}

/// Accumulates the postings for one `(field, term)` pair during indexing.
struct PostingListBuilder {
    postings: Vec<CompactPosting>,
}

impl PostingListBuilder {
    fn new() -> Self {
        Self {
            postings: Vec::new(),
        }
    }

    /// Records an occurrence of the term in `doc_id`. Doc ids are assigned in
    /// ascending order, so a repeated id can only be the most recent entry;
    /// in that case the term frequencies are merged instead of pushing a
    /// duplicate posting. Frequencies are clamped to `u16::MAX` on both paths.
    #[inline]
    fn add(&mut self, doc_id: DocId, term_freq: u32) {
        if let Some(last) = self.postings.last_mut()
            && last.doc_id == doc_id
        {
            last.term_freq = last
                .term_freq
                .saturating_add(term_freq.min(u16::MAX as u32) as u16);
            return;
        }
        self.postings.push(CompactPosting {
            doc_id,
            term_freq: term_freq.min(u16::MAX as u32) as u16,
        });
    }

    fn len(&self) -> usize {
        self.postings.len()
    }
}
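
// Illustrative sketch (not part of the build path): postings for the same
// document merge rather than duplicate, so the term frequency accumulates:
//
//     let mut b = PostingListBuilder::new();
//     b.add(7, 1);
//     b.add(7, 2); // merged with the previous entry: doc 7 now has tf = 3
//     b.add(9, 1);
//     assert_eq!(b.len(), 2); // two docs, not three postings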

/// A posting list after serialization: small lists are inlined directly into
/// the term dictionary entry; larger ones go to the external postings file.
enum SerializedPosting {
    Inline(TermInfo),
    External { bytes: Vec<u8>, doc_count: u32 },
}

/// Snapshot of the builder's size and memory usage.
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    pub num_docs: u32,
    pub unique_terms: usize,
    pub postings_in_memory: usize,
    pub interned_strings: usize,
    pub doc_field_lengths_size: usize,
    pub estimated_memory_bytes: usize,
    pub memory_breakdown: MemoryBreakdown,
}

#[derive(Debug, Clone, Default)]
pub struct MemoryBreakdown {
    pub postings_bytes: usize,
    pub index_overhead_bytes: usize,
    pub interner_bytes: usize,
    pub field_lengths_bytes: usize,
    pub dense_vectors_bytes: usize,
    pub dense_vector_count: usize,
}

/// Configuration for [`SegmentBuilder`].
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory for the temporary document store file.
    pub temp_dir: PathBuf,
    /// Compression level used for the final document store.
    pub compression_level: CompressionLevel,
    pub num_compression_threads: usize,
    pub interner_capacity: usize,
    pub posting_map_capacity: usize,
}

impl Default for SegmentBuilderConfig {
    fn default() -> Self {
        Self {
            temp_dir: std::env::temp_dir(),
            compression_level: CompressionLevel(7),
            num_compression_threads: num_cpus::get(),
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}
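
// A minimal configuration sketch; the overridden values here are illustrative
// assumptions, not tuned recommendations:
//
//     let config = SegmentBuilderConfig {
//         compression_level: CompressionLevel(3),
//         num_compression_threads: 4,
//         ..SegmentBuilderConfig::default()
//     };
//     let builder = SegmentBuilder::new(schema, config)?;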

pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interner mapping term strings to compact `Spur` keys.
    term_interner: Rodeo,

    /// In-memory inverted index, keyed by (field, interned term).
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Temporary on-disk store of serialized documents, rewritten with
    /// compression at build time.
    store_file: BufWriter<File>,
    store_path: PathBuf,

    next_doc_id: DocId,

    field_stats: FxHashMap<u32, FieldStats>,

    /// Per-document token counts, one slot per indexed text field:
    /// `doc_field_lengths[doc_id * num_indexed_fields + slot]`.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    wand_data: Option<Arc<WandData>>,

    /// Scratch map reused per text field to aggregate term frequencies.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch buffer reused while normalizing tokens.
    token_buffer: String,

    /// Per-field builders for dense vector indexes.
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
}

/// Collects dense vectors for one field, stored row-major in a flat buffer.
struct DenseVectorBuilder {
    dim: usize,
    doc_ids: Vec<DocId>,
    vectors: Vec<f32>,
}

impl DenseVectorBuilder {
    fn new(dim: usize) -> Self {
        Self {
            dim,
            doc_ids: Vec::new(),
            vectors: Vec::new(),
        }
    }

    fn add(&mut self, doc_id: DocId, vector: &[f32]) {
        debug_assert_eq!(vector.len(), self.dim, "Vector dimension mismatch");
        self.doc_ids.push(doc_id);
        self.vectors.extend_from_slice(vector);
    }

    fn len(&self) -> usize {
        self.doc_ids.len()
    }

    /// Splits the flat buffer back into one `Vec<f32>` per document.
    fn get_vectors(&self) -> Vec<Vec<f32>> {
        self.doc_ids
            .iter()
            .enumerate()
            .map(|(i, _)| {
                let start = i * self.dim;
                self.vectors[start..start + self.dim].to_vec()
            })
            .collect()
    }

    /// Like [`get_vectors`](Self::get_vectors), but keeps only the first
    /// `trim_dim` components of each vector.
    fn get_vectors_trimmed(&self, trim_dim: usize) -> Vec<Vec<f32>> {
        debug_assert!(trim_dim <= self.dim, "trim_dim must be <= dim");
        self.doc_ids
            .iter()
            .enumerate()
            .map(|(i, _)| {
                let start = i * self.dim;
                self.vectors[start..start + trim_dim].to_vec()
            })
            .collect()
    }
}
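
// Layout sketch: vectors live back to back in one flat, row-major buffer, so
// document i's vector occupies `vectors[i * dim..(i + 1) * dim]`. With dim = 2,
// after add(0, &[1.0, 2.0]) and add(3, &[4.0, 5.0]):
//
//     doc_ids = [0, 3]
//     vectors = [1.0, 2.0, 4.0, 5.0]
//
// get_vectors_trimmed(1) would then yield [[1.0], [4.0]].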

impl SegmentBuilder {
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Assign each indexed text field a dense slot for per-document
        // field-length tracking.
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            wand_data: None,
            local_tf_buffer: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            config,
            dense_vectors: FxHashMap::default(),
        })
    }

    pub fn with_wand_data(
        schema: Schema,
        config: SegmentBuilderConfig,
        wand_data: Arc<WandData>,
    ) -> Result<Self> {
        let mut builder = Self::new(schema, config)?;
        builder.wand_data = Some(wand_data);
        Ok(builder)
    }

    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    /// Estimates the builder's current memory footprint. These are heuristic
    /// estimates based on container capacities, not exact measurements.
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        let compact_posting_size = size_of::<CompactPosting>();

        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| {
                p.postings.capacity() * compact_posting_size + size_of::<Vec<CompactPosting>>()
            })
            .sum();

        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        // Rough per-entry hash table overhead (control bytes, padding).
        let hashmap_entry_overhead = 24;
        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_overhead);

        // Assume an average term length of 8 bytes plus per-string overhead.
        let avg_term_len = 8;
        let interner_overhead_per_string = size_of::<lasso::Spur>() + 16;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + interner_overhead_per_string);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + size_of::<Vec<u32>>();

        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * size_of::<DocId>()
                + size_of::<Vec<f32>>()
                + size_of::<Vec<DocId>>();
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_buffer_bytes =
            self.local_tf_buffer.capacity() * (size_of::<lasso::Spur>() + size_of::<u32>() + 16);

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }
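
    // Hypothetical caller-side sketch: an indexer can poll `stats()` to decide
    // when a builder should be flushed into a segment (the 256 MiB threshold
    // below is an assumption, not a recommendation):
    //
    //     if builder.stats().estimated_memory_bytes > 256 * 1024 * 1024 {
    //         let meta = builder.build(&dir, segment_id).await?;
    //         // ...start a fresh SegmentBuilder for subsequent documents
    //     }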

    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve this document's field-length slots up front.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };
            if !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let token_count = self.index_text_field(*field, doc_id, text)?;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    stats.doc_count += 1;

                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec)) => {
                    self.index_dense_vector_field(*field, doc_id, vec)?;
                }
                (FieldType::SparseVector, FieldValue::SparseVector { indices, values }) => {
                    self.index_sparse_vector_field(*field, doc_id, indices, values)?;
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

    /// Indexes one text field with a fast inline tokenizer: split on
    /// whitespace, keep alphanumeric characters, lowercase. Term frequencies
    /// are aggregated in `local_tf_buffer` first, so each (field, term) pair
    /// gets at most one posting per document.
    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
        self.local_tf_buffer.clear();

        let mut token_count = 0u32;

        for word in text.split_whitespace() {
            self.token_buffer.clear();
            for c in word.chars() {
                if c.is_alphanumeric() {
                    for lc in c.to_lowercase() {
                        self.token_buffer.push(lc);
                    }
                }
            }

            if self.token_buffer.is_empty() {
                continue;
            }

            token_count += 1;

            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
        }

        let field_id = field.0;

        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);
        }

        Ok(token_count)
    }
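
    // Worked example: indexing "Hello, World! Hello" produces the normalized
    // tokens ["hello", "world", "hello"], so `local_tf_buffer` ends up as
    // {hello: 2, world: 1}, two postings are added, and the returned
    // token_count is 3.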

    /// Indexes a numeric value as an exact-match term of the form
    /// `__num_<value>` with a term frequency of 1.
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        let term_str = format!("__num_{}", value);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let posting = self
            .inverted_index
            .entry(term_key)
            .or_insert_with(PostingListBuilder::new);
        posting.add(doc_id, 1);

        Ok(())
    }

    fn index_dense_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        vector: &[f32],
    ) -> Result<()> {
        let dim = vector.len();

        let builder = self
            .dense_vectors
            .entry(field.0)
            .or_insert_with(|| DenseVectorBuilder::new(dim));

        // The first vector fixes the field's dimensionality; later vectors
        // must match it.
        if builder.dim != dim && builder.len() > 0 {
            return Err(crate::Error::Schema(format!(
                "Dense vector dimension mismatch: expected {}, got {}",
                builder.dim, dim
            )));
        }

        builder.add(doc_id, vector);
        Ok(())
    }

    /// Indexes a sparse vector by treating each active dimension as a term
    /// (`__sparse_<dim>`) whose "term frequency" is the quantized weight.
    fn index_sparse_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        indices: &[u32],
        values: &[f32],
    ) -> Result<()> {
        let weight_threshold = self
            .schema
            .get_field_entry(field)
            .and_then(|entry| entry.sparse_vector_config.as_ref())
            .map(|config| config.weight_threshold)
            .unwrap_or(0.0);

        for (&dim_id, &weight) in indices.iter().zip(values.iter()) {
            // Skip weights below the configured pruning threshold.
            if weight.abs() < weight_threshold {
                continue;
            }

            let term_str = format!("__sparse_{}", dim_id);
            let term_spur = self.term_interner.get_or_intern(&term_str);

            let term_key = TermKey {
                field: field.0,
                term: term_spur,
            };

            // Quantize to fixed point (3 decimal digits), clamped to u16::MAX
            // so it fits the compact posting representation.
            let quantized_weight = (weight.abs() * 1000.0).min(u16::MAX as f32) as u32;

            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, quantized_weight.max(1));
        }

        Ok(())
    }
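
    // Quantization arithmetic, for reference: a surviving weight of 0.5371
    // maps to (0.5371 * 1000.0) as u32 = 537; weights above ~65.535 clamp to
    // u16::MAX (65535); and `.max(1)` ensures even tiny weights that pass the
    // threshold still produce a nonzero posting.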

    /// Appends a document to the temporary store as a length-prefixed record:
    /// a little-endian u32 byte length followed by the serialized document.
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

    /// Finalizes the segment: builds postings and the compressed document
    /// store in parallel, then writes all segment files to `dir`.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        let store_path = self.store_path.clone();
        let schema = self.schema.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;

        // Posting serialization and store compression are independent, so
        // run them on separate rayon tasks.
        let (postings_result, store_result) = rayon::join(
            || self.build_postings(),
            || {
                Self::build_store_parallel(
                    &store_path,
                    &schema,
                    num_compression_threads,
                    compression_level,
                )
            },
        );

        let (term_dict_data, postings_data) = postings_result?;
        let store_data = store_result?;

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        if !self.dense_vectors.is_empty() {
            let vectors_data = self.build_vectors_file()?;
            if !vectors_data.is_empty() {
                dir.write(&files.vectors, &vectors_data).await?;
            }
        }

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }
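
    // Hypothetical end-to-end sketch (the directory value and segment id are
    // assumptions for illustration):
    //
    //     let mut builder = SegmentBuilder::new(schema, SegmentBuilderConfig::default())?;
    //     for doc in docs {
    //         builder.add_document(doc)?;
    //     }
    //     let meta = builder.build(&dir, segment_id).await?;
    //     println!("segment {} has {} docs", meta.id, meta.num_docs);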

    /// Serializes all dense vector indexes into a single file. See the layout
    /// sketch after this function.
    fn build_vectors_file(&self) -> Result<Vec<u8>> {
        use crate::dsl::VectorIndexType;
        use byteorder::{LittleEndian, WriteBytesExt};

        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (&field_id, builder) in &self.dense_vectors {
            if builder.len() == 0 {
                continue;
            }

            let field = crate::dsl::Field(field_id);

            let dense_config = self
                .schema
                .get_field_entry(field)
                .and_then(|e| e.dense_vector_config.as_ref());

            // Optionally index a truncated prefix of each vector.
            let index_dim = dense_config.map(|c| c.index_dim()).unwrap_or(builder.dim);
            let vectors = if index_dim < builder.dim {
                builder.get_vectors_trimmed(index_dim)
            } else {
                builder.get_vectors()
            };

            let (index_type, index_bytes) = match dense_config.map(|c| c.index_type) {
                Some(VectorIndexType::ScaNN) => {
                    let config = dense_config.unwrap();
                    let centroids_path =
                        config.coarse_centroids_path.as_ref().ok_or_else(|| {
                            crate::Error::Schema("ScaNN requires coarse_centroids_path".into())
                        })?;
                    let codebook_path = config.pq_codebook_path.as_ref().ok_or_else(|| {
                        crate::Error::Schema("ScaNN requires pq_codebook_path".into())
                    })?;

                    let coarse_centroids = crate::structures::CoarseCentroids::load(
                        std::path::Path::new(centroids_path),
                    )
                    .map_err(crate::Error::Io)?;

                    let pq_codebook =
                        crate::structures::PQCodebook::load(std::path::Path::new(codebook_path))
                            .map_err(crate::Error::Io)?;

                    let doc_ids: Vec<u32> = builder.doc_ids.clone();
                    let ivfpq_config = crate::structures::IVFPQConfig::new(index_dim)
                        .with_store_raw(config.store_raw);

                    let ivfpq_index = crate::structures::IVFPQIndex::build(
                        ivfpq_config,
                        &coarse_centroids,
                        &pq_codebook,
                        &vectors,
                        Some(doc_ids.as_slice()),
                    );

                    let bytes = ivfpq_index
                        .to_bytes()
                        .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                    (2u8, bytes)
                }
                Some(VectorIndexType::IvfRaBitQ) => {
                    let config = dense_config.unwrap();
                    let centroids_path =
                        config.coarse_centroids_path.as_ref().ok_or_else(|| {
                            crate::Error::Schema("IVF-RaBitQ requires coarse_centroids_path".into())
                        })?;

                    match crate::structures::CoarseCentroids::load(std::path::Path::new(
                        centroids_path,
                    )) {
                        Ok(coarse_centroids) => {
                            let ivf_cfg = crate::structures::IVFRaBitQConfig::new(index_dim)
                                .with_store_raw(config.store_raw);
                            let rabitq_codebook = crate::structures::RaBitQCodebook::new(
                                crate::structures::RaBitQConfig::new(index_dim),
                            );
                            let doc_ids: Vec<u32> = builder.doc_ids.clone();
                            let ivf_index = crate::structures::IVFRaBitQIndex::build(
                                ivf_cfg,
                                &coarse_centroids,
                                &rabitq_codebook,
                                &vectors,
                                Some(doc_ids.as_slice()),
                            );
                            let bytes = ivf_index
                                .to_bytes()
                                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                            (1u8, bytes)
                        }
                        Err(e) => {
                            log::warn!("Failed to load centroids: {}, falling back to RaBitQ", e);
                            let cfg = crate::structures::RaBitQConfig::new(index_dim);
                            let idx = crate::structures::RaBitQIndex::build(cfg, &vectors, true);
                            let bytes = serde_json::to_vec(&idx)
                                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                            (0u8, bytes)
                        }
                    }
                }
                _ => {
                    // Default: a flat RaBitQ index.
                    let store_raw = dense_config.map(|c| c.store_raw).unwrap_or(true);
                    let cfg = crate::structures::RaBitQConfig::new(index_dim);
                    let idx = crate::structures::RaBitQIndex::build(cfg, &vectors, store_raw);
                    let bytes = serde_json::to_vec(&idx)
                        .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                    (0u8, bytes)
                }
            };

            field_indexes.push((field_id, index_type, index_bytes));
        }

        if field_indexes.is_empty() {
            return Ok(Vec::new());
        }

        // Deterministic order by field id.
        field_indexes.sort_by_key(|(id, _, _)| *id);

        // Header: field count, then one fixed-size entry per field.
        let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);

        let mut output = Vec::new();

        output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

        let mut current_offset = header_size as u64;
        for (field_id, index_type, data) in &field_indexes {
            output.write_u32::<LittleEndian>(*field_id)?;
            output.write_u8(*index_type)?;
            output.write_u64::<LittleEndian>(current_offset)?;
            output.write_u64::<LittleEndian>(data.len() as u64)?;
            current_offset += data.len() as u64;
        }

        for (_, _, data) in field_indexes {
            output.extend_from_slice(&data);
        }

        Ok(output)
    }
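
    // On-disk layout produced by `build_vectors_file`:
    //
    //     [u32 field_count]
    //     field_count x [u32 field_id][u8 index_type][u64 offset][u64 length]
    //     ...each field's index bytes, back to back...
    //
    // where index_type is 0 = RaBitQ (JSON), 1 = IVF-RaBitQ, 2 = ScaNN/IVF-PQ,
    // and offsets are absolute from the start of the file.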

    /// Builds the term dictionary and postings file from the in-memory index.
    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
        // Materialize (key, postings) pairs; keys are field id + term bytes.
        let mut term_entries: Vec<(Vec<u8>, &PostingListBuilder)> = self
            .inverted_index
            .iter()
            .map(|(term_key, posting_list)| {
                let term_str = self.term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(4 + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, posting_list)
            })
            .collect();

        // The SSTable writer requires keys in sorted order.
        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));

        // Serialize each posting list in parallel, inlining small lists.
        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
            .into_par_iter()
            .map(|(key, posting_builder)| {
                let mut full_postings = PostingList::with_capacity(posting_builder.len());
                for p in &posting_builder.postings {
                    full_postings.push(p.doc_id, p.term_freq as u32);
                }

                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

                let result = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                    SerializedPosting::Inline(inline)
                } else {
                    let mut posting_bytes = Vec::new();
                    let block_list =
                        crate::structures::BlockPostingList::from_posting_list(&full_postings)
                            .expect("BlockPostingList creation failed");
                    block_list
                        .serialize(&mut posting_bytes)
                        .expect("BlockPostingList serialization failed");
                    SerializedPosting::External {
                        bytes: posting_bytes,
                        doc_count: full_postings.doc_count(),
                    }
                };

                (key, result)
            })
            .collect();

        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        for (key, serialized_posting) in serialized {
            let term_info = match serialized_posting {
                SerializedPosting::Inline(info) => info,
                SerializedPosting::External { bytes, doc_count } => {
                    let posting_offset = postings.len() as u64;
                    let posting_len = bytes.len() as u32;
                    postings.extend_from_slice(&bytes);
                    TermInfo::external(posting_offset, posting_len, doc_count)
                }
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }
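
    // Key layout used by `build_postings`: 4 bytes of field id (little endian)
    // followed by the raw term bytes. For field 2 and term "rust":
    //
    //     [0x02, 0x00, 0x00, 0x00, b'r', b'u', b's', b't']
    //
    // Byte-wise sorting therefore groups every term of a field together.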

    /// Re-reads the temporary store (length-prefixed records) via mmap and
    /// rewrites it as the final store using parallel compression.
    fn build_store_parallel(
        store_path: &PathBuf,
        schema: &Schema,
        num_compression_threads: usize,
        compression_level: CompressionLevel,
    ) -> Result<Vec<u8>> {
        use super::store::EagerParallelStoreWriter;

        let file = File::open(store_path)?;
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        // Scan the length-prefixed records to find each document's range.
        let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
        let mut offset = 0usize;
        while offset + 4 <= mmap.len() {
            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            doc_ranges.push((offset, doc_len));
            offset += doc_len;
        }

        // Deserialize documents in parallel; collect preserves input order.
        let docs: Vec<Document> = doc_ranges
            .into_par_iter()
            .filter_map(|(start, len)| {
                let doc_bytes = &mmap[start..start + len];
                super::store::deserialize_document(doc_bytes, schema).ok()
            })
            .collect();

        let mut store_data = Vec::new();
        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
            &mut store_data,
            num_compression_threads,
            compression_level,
        );

        for doc in &docs {
            store_writer.store(doc, schema)?;
        }

        store_writer.finish()?;
        Ok(store_data)
    }
}

impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Best-effort cleanup of the temporary store file.
        let _ = std::fs::remove_file(&self.store_path);
    }
}