1use std::fs::{File, OpenOptions};
12use std::io::{BufWriter, Write};
13use std::path::PathBuf;
14use std::sync::Arc;
15
16use hashbrown::HashMap;
17use lasso::{Rodeo, Spur};
18use rayon::prelude::*;
19use rustc_hash::FxHashMap;
20
21use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
22use crate::compression::CompressionLevel;
23use crate::directories::{Directory, DirectoryWriter};
24use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
25use crate::structures::{PostingList, SSTableWriter, TermInfo};
26use crate::tokenizer::BoxedTokenizer;
27use crate::wand::WandData;
28use crate::{DocId, Result};
29
/// Capacity, in bytes, of the `BufWriter` wrapped around the temporary
/// document store file (16 MiB).
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Key of the in-memory inverted index: a field id paired with an interned term.
///
/// Both members are small `Copy` values, so the key is cheap to hash and compare.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    /// Numeric id of the field the term occurred in (`Field.0`).
    field: u32,
    /// Handle of the term string inside the builder's `Rodeo` interner.
    term: Spur,
}
39
40#[derive(Clone, Copy)]
42struct CompactPosting {
43 doc_id: DocId,
44 term_freq: u16,
45}
46
47struct PostingListBuilder {
49 postings: Vec<CompactPosting>,
51}
52
53impl PostingListBuilder {
54 fn new() -> Self {
55 Self {
56 postings: Vec::new(),
57 }
58 }
59
60 #[inline]
62 fn add(&mut self, doc_id: DocId, term_freq: u32) {
63 if let Some(last) = self.postings.last_mut()
65 && last.doc_id == doc_id
66 {
67 last.term_freq = last.term_freq.saturating_add(term_freq as u16);
68 return;
69 }
70 self.postings.push(CompactPosting {
71 doc_id,
72 term_freq: term_freq.min(u16::MAX as u32) as u16,
73 });
74 }
75
76 fn len(&self) -> usize {
77 self.postings.len()
78 }
79}
80
/// Result of serializing one term's postings in `build_postings`.
enum SerializedPosting {
    /// The postings were accepted by `TermInfo::try_inline`; the returned
    /// `TermInfo` is written to the term dictionary as-is.
    Inline(TermInfo),
    /// The postings were serialized as a `BlockPostingList`; `bytes` is
    /// appended to the postings buffer and `doc_count` is recorded in the
    /// term's `TermInfo`.
    External { bytes: Vec<u8>, doc_count: u32 },
}
88
/// Snapshot of the builder's in-memory state, as returned by `SegmentBuilder::stats`.
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    /// Number of documents added so far.
    pub num_docs: u32,
    /// Number of distinct `(field, term)` entries in the inverted index.
    pub unique_terms: usize,
    /// Total number of postings held across all posting lists.
    pub postings_in_memory: usize,
    /// Number of strings held by the term interner.
    pub interned_strings: usize,
    /// Number of entries in the flat per-document field-length table.
    pub doc_field_lengths_size: usize,
}
103
/// Tunables for `SegmentBuilder`.
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory in which the temporary document store file is created.
    pub temp_dir: PathBuf,
    /// Compression level handed to the store writer when the segment is built.
    pub compression_level: CompressionLevel,
    /// Thread count handed to the store writer when the segment is built.
    pub num_compression_threads: usize,
    /// Capacity hint for the term interner.
    ///
    /// NOTE(review): no code in this part of the file reads this value (the
    /// interner is created with `Rodeo::new()`) — confirm whether it should be
    /// applied.
    pub interner_capacity: usize,
    /// Initial capacity of the `(field, term)` → postings map.
    pub posting_map_capacity: usize,
}
118
119impl Default for SegmentBuilderConfig {
120 fn default() -> Self {
121 Self {
122 temp_dir: std::env::temp_dir(),
123 compression_level: CompressionLevel(7),
124 num_compression_threads: num_cpus::get(),
125 interner_capacity: 1_000_000,
126 posting_map_capacity: 500_000,
127 }
128 }
129}
130
/// Builds a single index segment in memory.
///
/// Documents are indexed into an in-memory inverted index while their
/// serialized form is appended to a temporary file on disk; `build` turns both
/// into the final segment files.
pub struct SegmentBuilder {
    /// Schema used to look up field entries and to (de)serialize documents.
    schema: Schema,
    /// Builder configuration.
    config: SegmentBuilderConfig,
    /// Tokenizers registered through `set_tokenizer`, keyed by field.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interner mapping term strings to compact `Spur` handles.
    term_interner: Rodeo,

    /// In-memory inverted index: `(field, term)` → postings.
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Buffered writer over the temporary document store file.
    store_file: BufWriter<File>,
    /// Path of the temporary document store file (removed after `build` and on drop).
    store_path: PathBuf,

    /// Id that will be assigned to the next document; also the number of
    /// documents added so far.
    next_doc_id: DocId,

    /// Per-field statistics (token and document counts), keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flat table of field lengths: `num_indexed_fields` consecutive entries
    /// per document, indexed by the slot from `field_to_slot`.
    doc_field_lengths: Vec<u32>,
    /// Number of indexed text fields in the schema (row width of `doc_field_lengths`).
    num_indexed_fields: usize,
    /// Field id → column within a document's row of `doc_field_lengths`.
    field_to_slot: FxHashMap<u32, usize>,

    /// Optional WAND data supplied through `with_wand_data`.
    wand_data: Option<Arc<WandData>>,

    /// Scratch map of per-document term frequencies, reused across calls to
    /// avoid reallocating.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch string for the normalized form of the current token.
    token_buffer: String,

    /// Dense vectors collected per field id.
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
}
179
180struct DenseVectorBuilder {
182 dim: usize,
184 doc_ids: Vec<DocId>,
186 vectors: Vec<f32>,
188}
189
190impl DenseVectorBuilder {
191 fn new(dim: usize) -> Self {
192 Self {
193 dim,
194 doc_ids: Vec::new(),
195 vectors: Vec::new(),
196 }
197 }
198
199 fn add(&mut self, doc_id: DocId, vector: &[f32]) {
200 debug_assert_eq!(vector.len(), self.dim, "Vector dimension mismatch");
201 self.doc_ids.push(doc_id);
202 self.vectors.extend_from_slice(vector);
203 }
204
205 fn len(&self) -> usize {
206 self.doc_ids.len()
207 }
208
209 fn get_vectors(&self) -> Vec<Vec<f32>> {
211 self.doc_ids
212 .iter()
213 .enumerate()
214 .map(|(i, _)| {
215 let start = i * self.dim;
216 self.vectors[start..start + self.dim].to_vec()
217 })
218 .collect()
219 }
220
221 fn get_vectors_trimmed(&self, trim_dim: usize) -> Vec<Vec<f32>> {
223 debug_assert!(trim_dim <= self.dim, "trim_dim must be <= dim");
224 self.doc_ids
225 .iter()
226 .enumerate()
227 .map(|(i, _)| {
228 let start = i * self.dim;
229 self.vectors[start..start + trim_dim].to_vec()
230 })
231 .collect()
232 }
233}
234
235impl SegmentBuilder {
236 pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
238 let segment_id = uuid::Uuid::new_v4();
239 let store_path = config
240 .temp_dir
241 .join(format!("hermes_store_{}.tmp", segment_id));
242
243 let store_file = BufWriter::with_capacity(
244 STORE_BUFFER_SIZE,
245 OpenOptions::new()
246 .create(true)
247 .write(true)
248 .truncate(true)
249 .open(&store_path)?,
250 );
251
252 let mut num_indexed_fields = 0;
254 let mut field_to_slot = FxHashMap::default();
255 for (field, entry) in schema.fields() {
256 if entry.indexed && matches!(entry.field_type, FieldType::Text) {
257 field_to_slot.insert(field.0, num_indexed_fields);
258 num_indexed_fields += 1;
259 }
260 }
261
262 Ok(Self {
263 schema,
264 tokenizers: FxHashMap::default(),
265 term_interner: Rodeo::new(),
266 inverted_index: HashMap::with_capacity(config.posting_map_capacity),
267 store_file,
268 store_path,
269 next_doc_id: 0,
270 field_stats: FxHashMap::default(),
271 doc_field_lengths: Vec::new(),
272 num_indexed_fields,
273 field_to_slot,
274 wand_data: None,
275 local_tf_buffer: FxHashMap::default(),
276 token_buffer: String::with_capacity(64),
277 config,
278 dense_vectors: FxHashMap::default(),
279 })
280 }
281
282 pub fn with_wand_data(
284 schema: Schema,
285 config: SegmentBuilderConfig,
286 wand_data: Arc<WandData>,
287 ) -> Result<Self> {
288 let mut builder = Self::new(schema, config)?;
289 builder.wand_data = Some(wand_data);
290 Ok(builder)
291 }
292
    /// Registers `tokenizer` for `field`, replacing any tokenizer previously
    /// registered for the same field.
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }
296
    /// Number of documents added so far (equal to the next document id to be assigned).
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }
300
301 pub fn stats(&self) -> SegmentBuilderStats {
303 let postings_in_memory: usize =
304 self.inverted_index.values().map(|p| p.postings.len()).sum();
305 SegmentBuilderStats {
306 num_docs: self.next_doc_id,
307 unique_terms: self.inverted_index.len(),
308 postings_in_memory,
309 interned_strings: self.term_interner.len(),
310 doc_field_lengths_size: self.doc_field_lengths.len(),
311 }
312 }
313
314 pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
316 let doc_id = self.next_doc_id;
317 self.next_doc_id += 1;
318
319 let base_idx = self.doc_field_lengths.len();
321 self.doc_field_lengths
322 .resize(base_idx + self.num_indexed_fields, 0);
323
324 for (field, value) in doc.field_values() {
325 let entry = self.schema.get_field_entry(*field);
326 if entry.is_none() || !entry.unwrap().indexed {
327 continue;
328 }
329
330 let entry = entry.unwrap();
331 match (&entry.field_type, value) {
332 (FieldType::Text, FieldValue::Text(text)) => {
333 let token_count = self.index_text_field(*field, doc_id, text)?;
334
335 let stats = self.field_stats.entry(field.0).or_default();
337 stats.total_tokens += token_count as u64;
338 stats.doc_count += 1;
339
340 if let Some(&slot) = self.field_to_slot.get(&field.0) {
342 self.doc_field_lengths[base_idx + slot] = token_count;
343 }
344 }
345 (FieldType::U64, FieldValue::U64(v)) => {
346 self.index_numeric_field(*field, doc_id, *v)?;
347 }
348 (FieldType::I64, FieldValue::I64(v)) => {
349 self.index_numeric_field(*field, doc_id, *v as u64)?;
350 }
351 (FieldType::F64, FieldValue::F64(v)) => {
352 self.index_numeric_field(*field, doc_id, v.to_bits())?;
353 }
354 (FieldType::DenseVector, FieldValue::DenseVector(vec)) => {
355 self.index_dense_vector_field(*field, doc_id, vec)?;
356 }
357 (FieldType::SparseVector, FieldValue::SparseVector { indices, values }) => {
358 self.index_sparse_vector_field(*field, doc_id, indices, values)?;
359 }
360 _ => {}
361 }
362 }
363
364 self.write_document_to_store(&doc)?;
366
367 Ok(doc_id)
368 }
369
370 fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
378 self.local_tf_buffer.clear();
381
382 let mut token_count = 0u32;
383
384 for word in text.split_whitespace() {
386 self.token_buffer.clear();
388 for c in word.chars() {
389 if c.is_alphanumeric() {
390 for lc in c.to_lowercase() {
391 self.token_buffer.push(lc);
392 }
393 }
394 }
395
396 if self.token_buffer.is_empty() {
397 continue;
398 }
399
400 token_count += 1;
401
402 let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
404 *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
405 }
406
407 let field_id = field.0;
410
411 for (&term_spur, &tf) in &self.local_tf_buffer {
412 let term_key = TermKey {
413 field: field_id,
414 term: term_spur,
415 };
416
417 let posting = self
418 .inverted_index
419 .entry(term_key)
420 .or_insert_with(PostingListBuilder::new);
421 posting.add(doc_id, tf);
422 }
423
424 Ok(token_count)
425 }
426
427 fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
428 let term_str = format!("__num_{}", value);
430 let term_spur = self.term_interner.get_or_intern(&term_str);
431
432 let term_key = TermKey {
433 field: field.0,
434 term: term_spur,
435 };
436
437 let posting = self
438 .inverted_index
439 .entry(term_key)
440 .or_insert_with(PostingListBuilder::new);
441 posting.add(doc_id, 1);
442
443 Ok(())
444 }
445
446 fn index_dense_vector_field(
448 &mut self,
449 field: Field,
450 doc_id: DocId,
451 vector: &[f32],
452 ) -> Result<()> {
453 let dim = vector.len();
454
455 let builder = self
456 .dense_vectors
457 .entry(field.0)
458 .or_insert_with(|| DenseVectorBuilder::new(dim));
459
460 if builder.dim != dim && builder.len() > 0 {
462 return Err(crate::Error::Schema(format!(
463 "Dense vector dimension mismatch: expected {}, got {}",
464 builder.dim, dim
465 )));
466 }
467
468 builder.add(doc_id, vector);
469 Ok(())
470 }
471
472 fn index_sparse_vector_field(
480 &mut self,
481 field: Field,
482 doc_id: DocId,
483 indices: &[u32],
484 values: &[f32],
485 ) -> Result<()> {
486 let weight_threshold = self
488 .schema
489 .get_field_entry(field)
490 .and_then(|entry| entry.sparse_vector_config.as_ref())
491 .map(|config| config.weight_threshold)
492 .unwrap_or(0.0);
493
494 for (&dim_id, &weight) in indices.iter().zip(values.iter()) {
495 if weight.abs() < weight_threshold {
497 continue;
498 }
499
500 let term_str = format!("__sparse_{}", dim_id);
502 let term_spur = self.term_interner.get_or_intern(&term_str);
503
504 let term_key = TermKey {
505 field: field.0,
506 term: term_spur,
507 };
508
509 let quantized_weight = (weight.abs() * 1000.0).min(u16::MAX as f32) as u32;
512
513 let posting = self
514 .inverted_index
515 .entry(term_key)
516 .or_insert_with(PostingListBuilder::new);
517 posting.add(doc_id, quantized_weight.max(1));
518 }
519
520 Ok(())
521 }
522
523 fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
525 use byteorder::{LittleEndian, WriteBytesExt};
526
527 let doc_bytes = super::store::serialize_document(doc, &self.schema)?;
528
529 self.store_file
530 .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
531 self.store_file.write_all(&doc_bytes)?;
532
533 Ok(())
534 }
535
536 pub async fn build<D: Directory + DirectoryWriter>(
538 mut self,
539 dir: &D,
540 segment_id: SegmentId,
541 ) -> Result<SegmentMeta> {
542 self.store_file.flush()?;
544
545 let files = SegmentFiles::new(segment_id.0);
546
547 let store_path = self.store_path.clone();
549 let schema = self.schema.clone();
550 let num_compression_threads = self.config.num_compression_threads;
551 let compression_level = self.config.compression_level;
552
553 let (postings_result, store_result) = rayon::join(
555 || self.build_postings(),
556 || {
557 Self::build_store_parallel(
558 &store_path,
559 &schema,
560 num_compression_threads,
561 compression_level,
562 )
563 },
564 );
565
566 let (term_dict_data, postings_data) = postings_result?;
567 let store_data = store_result?;
568
569 dir.write(&files.term_dict, &term_dict_data).await?;
571 dir.write(&files.postings, &postings_data).await?;
572 dir.write(&files.store, &store_data).await?;
573
574 if !self.dense_vectors.is_empty() {
576 let vectors_data = self.build_vectors_file()?;
577 if !vectors_data.is_empty() {
578 dir.write(&files.vectors, &vectors_data).await?;
579 }
580 }
581
582 let meta = SegmentMeta {
583 id: segment_id.0,
584 num_docs: self.next_doc_id,
585 field_stats: self.field_stats.clone(),
586 };
587
588 dir.write(&files.meta, &meta.serialize()?).await?;
589
590 let _ = std::fs::remove_file(&self.store_path);
592
593 Ok(meta)
594 }
595
    /// Builds the contents of the segment's vectors file from all dense-vector fields.
    ///
    /// Layout of the returned buffer (all integers little-endian):
    ///
    /// * `u32` — number of fields;
    /// * per field, ordered by field id: `u32` field id, `u8` index type,
    ///   `u64` absolute offset of the field's data, `u64` length of that data;
    /// * the per-field index data, concatenated in the same order.
    ///
    /// Index type tags written here: `2` for the IVF-PQ index built for
    /// `VectorIndexType::ScaNN`, `1` for IVF-RaBitQ, `0` for a plain RaBitQ
    /// index (serialized with `serde_json`).
    ///
    /// Returns an empty buffer when no field holds any vectors.
    ///
    /// # Errors
    ///
    /// * Schema error when a required centroid / codebook path is not configured.
    /// * I/O error when the ScaNN centroids or codebook cannot be loaded.
    /// * Serialization error when an index cannot be serialized.
    fn build_vectors_file(&self) -> Result<Vec<u8>> {
        use crate::dsl::VectorIndexType;
        use byteorder::{LittleEndian, WriteBytesExt};

        // (field id, index type tag, serialized index)
        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (&field_id, builder) in &self.dense_vectors {
            if builder.len() == 0 {
                continue;
            }

            let field = crate::dsl::Field(field_id);

            // Optional per-field dense-vector configuration from the schema.
            let dense_config = self
                .schema
                .get_field_entry(field)
                .and_then(|e| e.dense_vector_config.as_ref());

            // Index only the leading `index_dim` components when the configured
            // index dimension is smaller than the stored dimension.
            let index_dim = dense_config.map(|c| c.index_dim()).unwrap_or(builder.dim);
            let vectors = if index_dim < builder.dim {
                builder.get_vectors_trimmed(index_dim)
            } else {
                builder.get_vectors()
            };

            let (index_type, index_bytes) = match dense_config.map(|c| c.index_type) {
                Some(VectorIndexType::ScaNN) => {
                    // `dense_config` is `Some` here: the matched value was mapped from it.
                    let config = dense_config.unwrap();
                    let centroids_path =
                        config.coarse_centroids_path.as_ref().ok_or_else(|| {
                            crate::Error::Schema("ScaNN requires coarse_centroids_path".into())
                        })?;
                    let codebook_path = config.pq_codebook_path.as_ref().ok_or_else(|| {
                        crate::Error::Schema("ScaNN requires pq_codebook_path".into())
                    })?;

                    let coarse_centroids = crate::structures::CoarseCentroids::load(
                        std::path::Path::new(centroids_path),
                    )
                    .map_err(crate::Error::Io)?;

                    let pq_codebook =
                        crate::structures::PQCodebook::load(std::path::Path::new(codebook_path))
                            .map_err(crate::Error::Io)?;

                    let doc_ids: Vec<u32> = builder.doc_ids.clone();
                    let ivfpq_config = crate::structures::IVFPQConfig::new(index_dim)
                        .with_store_raw(config.store_raw);

                    let ivfpq_index = crate::structures::IVFPQIndex::build(
                        ivfpq_config,
                        &coarse_centroids,
                        &pq_codebook,
                        &vectors,
                        Some(doc_ids.as_slice()),
                    );

                    let bytes = ivfpq_index
                        .to_bytes()
                        .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                    // Tag 2: IVF-PQ index.
                    (2u8, bytes)
                }
                Some(VectorIndexType::IvfRaBitQ) => {
                    // `dense_config` is `Some` here: the matched value was mapped from it.
                    let config = dense_config.unwrap();
                    let centroids_path =
                        config.coarse_centroids_path.as_ref().ok_or_else(|| {
                            crate::Error::Schema("IVF-RaBitQ requires coarse_centroids_path".into())
                        })?;

                    match crate::structures::CoarseCentroids::load(std::path::Path::new(
                        centroids_path,
                    )) {
                        Ok(coarse_centroids) => {
                            let ivf_cfg = crate::structures::IVFRaBitQConfig::new(index_dim)
                                .with_store_raw(config.store_raw);
                            let rabitq_codebook = crate::structures::RaBitQCodebook::new(
                                crate::structures::RaBitQConfig::new(index_dim),
                            );
                            let doc_ids: Vec<u32> = builder.doc_ids.clone();
                            let ivf_index = crate::structures::IVFRaBitQIndex::build(
                                ivf_cfg,
                                &coarse_centroids,
                                &rabitq_codebook,
                                &vectors,
                                Some(doc_ids.as_slice()),
                            );
                            let bytes = ivf_index
                                .to_bytes()
                                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                            // Tag 1: IVF-RaBitQ index.
                            (1u8, bytes)
                        }
                        Err(e) => {
                            // Unlike ScaNN, a centroid load failure is not fatal:
                            // log it and build a plain RaBitQ index instead. The
                            // fallback passes `true` rather than `config.store_raw`.
                            log::warn!("Failed to load centroids: {}, falling back to RaBitQ", e);
                            let cfg = crate::structures::RaBitQConfig::new(index_dim);
                            let idx = crate::structures::RaBitQIndex::build(cfg, &vectors, true);
                            let bytes = serde_json::to_vec(&idx)
                                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                            // Tag 0: plain RaBitQ index.
                            (0u8, bytes)
                        }
                    }
                }
                _ => {
                    // Any other index type, or no configuration at all: plain RaBitQ.
                    let store_raw = dense_config.map(|c| c.store_raw).unwrap_or(true);
                    let cfg = crate::structures::RaBitQConfig::new(index_dim);
                    let idx = crate::structures::RaBitQIndex::build(cfg, &vectors, store_raw);
                    let bytes = serde_json::to_vec(&idx)
                        .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                    // Tag 0: plain RaBitQ index.
                    (0u8, bytes)
                }
            };

            field_indexes.push((field_id, index_type, index_bytes));
        }

        if field_indexes.is_empty() {
            return Ok(Vec::new());
        }

        // Map iteration order is unspecified; sort so the file layout is deterministic.
        field_indexes.sort_by_key(|(id, _, _)| *id);

        // Field count (4) + per field: id (4) + type (1) + offset (8) + length (8).
        let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);

        let mut output = Vec::new();

        output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

        // Data starts right after the header; offsets are absolute within the buffer.
        let mut current_offset = header_size as u64;
        for (field_id, index_type, data) in &field_indexes {
            output.write_u32::<LittleEndian>(*field_id)?;
            output.write_u8(*index_type)?;
            output.write_u64::<LittleEndian>(current_offset)?;
            output.write_u64::<LittleEndian>(data.len() as u64)?;
            current_offset += data.len() as u64;
        }

        for (_, _, data) in field_indexes {
            output.extend_from_slice(&data);
        }

        Ok(output)
    }
757
758 fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
762 let mut term_entries: Vec<(Vec<u8>, &PostingListBuilder)> = self
765 .inverted_index
766 .iter()
767 .map(|(term_key, posting_list)| {
768 let term_str = self.term_interner.resolve(&term_key.term);
769 let mut key = Vec::with_capacity(4 + term_str.len());
770 key.extend_from_slice(&term_key.field.to_le_bytes());
771 key.extend_from_slice(term_str.as_bytes());
772 (key, posting_list)
773 })
774 .collect();
775
776 term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
778
779 let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
782 .into_par_iter()
783 .map(|(key, posting_builder)| {
784 let mut full_postings = PostingList::with_capacity(posting_builder.len());
786 for p in &posting_builder.postings {
787 full_postings.push(p.doc_id, p.term_freq as u32);
788 }
789
790 let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
792 let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();
793
794 let result = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
795 SerializedPosting::Inline(inline)
796 } else {
797 let mut posting_bytes = Vec::new();
799 let block_list =
800 crate::structures::BlockPostingList::from_posting_list(&full_postings)
801 .expect("BlockPostingList creation failed");
802 block_list
803 .serialize(&mut posting_bytes)
804 .expect("BlockPostingList serialization failed");
805 SerializedPosting::External {
806 bytes: posting_bytes,
807 doc_count: full_postings.doc_count(),
808 }
809 };
810
811 (key, result)
812 })
813 .collect();
814
815 let mut term_dict = Vec::new();
817 let mut postings = Vec::new();
818 let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);
819
820 for (key, serialized_posting) in serialized {
821 let term_info = match serialized_posting {
822 SerializedPosting::Inline(info) => info,
823 SerializedPosting::External { bytes, doc_count } => {
824 let posting_offset = postings.len() as u64;
825 let posting_len = bytes.len() as u32;
826 postings.extend_from_slice(&bytes);
827 TermInfo::external(posting_offset, posting_len, doc_count)
828 }
829 };
830
831 writer.insert(&key, &term_info)?;
832 }
833
834 writer.finish()?;
835 Ok((term_dict, postings))
836 }
837
838 fn build_store_parallel(
842 store_path: &PathBuf,
843 schema: &Schema,
844 num_compression_threads: usize,
845 compression_level: CompressionLevel,
846 ) -> Result<Vec<u8>> {
847 use super::store::EagerParallelStoreWriter;
848
849 let file = File::open(store_path)?;
850 let mmap = unsafe { memmap2::Mmap::map(&file)? };
851
852 let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
854 let mut offset = 0usize;
855 while offset + 4 <= mmap.len() {
856 let doc_len = u32::from_le_bytes([
857 mmap[offset],
858 mmap[offset + 1],
859 mmap[offset + 2],
860 mmap[offset + 3],
861 ]) as usize;
862 offset += 4;
863
864 if offset + doc_len > mmap.len() {
865 break;
866 }
867
868 doc_ranges.push((offset, doc_len));
869 offset += doc_len;
870 }
871
872 let docs: Vec<Document> = doc_ranges
874 .into_par_iter()
875 .filter_map(|(start, len)| {
876 let doc_bytes = &mmap[start..start + len];
877 super::store::deserialize_document(doc_bytes, schema).ok()
878 })
879 .collect();
880
881 let mut store_data = Vec::new();
883 let mut store_writer = EagerParallelStoreWriter::with_compression_level(
884 &mut store_data,
885 num_compression_threads,
886 compression_level,
887 );
888
889 for doc in &docs {
890 store_writer.store(doc, schema)?;
891 }
892
893 store_writer.finish()?;
894 Ok(store_data)
895 }
896}
897
898impl Drop for SegmentBuilder {
899 fn drop(&mut self) {
900 let _ = std::fs::remove_file(&self.store_path);
902 }
903}