1use std::fs::{File, OpenOptions};
12use std::io::{BufWriter, Write};
13use std::path::PathBuf;
14use std::sync::Arc;
15
16use hashbrown::HashMap;
17use lasso::{Rodeo, Spur};
18use rayon::prelude::*;
19use rustc_hash::FxHashMap;
20
21use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
22use crate::compression::CompressionLevel;
23use crate::directories::{Directory, DirectoryWriter};
24use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
25use crate::structures::{PostingList, SSTableWriter, TermInfo};
26use crate::tokenizer::BoxedTokenizer;
27use crate::wand::WandData;
28use crate::{DocId, Result};
29
30const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; #[derive(Clone, Copy, PartialEq, Eq, Hash)]
35struct TermKey {
36 field: u32,
37 term: Spur,
38}
39
40#[derive(Clone, Copy)]
42struct CompactPosting {
43 doc_id: DocId,
44 term_freq: u16,
45}
46
47struct PostingListBuilder {
49 postings: Vec<CompactPosting>,
51}
52
53impl PostingListBuilder {
54 fn new() -> Self {
55 Self {
56 postings: Vec::new(),
57 }
58 }
59
60 #[inline]
62 fn add(&mut self, doc_id: DocId, term_freq: u32) {
63 if let Some(last) = self.postings.last_mut()
65 && last.doc_id == doc_id
66 {
67 last.term_freq = last.term_freq.saturating_add(term_freq as u16);
68 return;
69 }
70 self.postings.push(CompactPosting {
71 doc_id,
72 term_freq: term_freq.min(u16::MAX as u32) as u16,
73 });
74 }
75
76 fn len(&self) -> usize {
77 self.postings.len()
78 }
79}
80
81enum SerializedPosting {
83 Inline(TermInfo),
85 External { bytes: Vec<u8>, doc_count: u32 },
87}
88
/// Snapshot of a builder's in-memory state, as reported by `SegmentBuilder::stats`.
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    /// Documents added so far.
    pub num_docs: u32,
    /// Distinct `(field, term)` keys held in the inverted index.
    pub unique_terms: usize,
    /// Total number of postings across all in-memory posting lists.
    pub postings_in_memory: usize,
    /// Number of strings held by the term interner.
    pub interned_strings: usize,
    /// Number of entries in the flat per-document field-length table.
    pub doc_field_lengths_size: usize,
}
103
104#[derive(Clone)]
106pub struct SegmentBuilderConfig {
107 pub temp_dir: PathBuf,
109 pub compression_level: CompressionLevel,
111 pub num_compression_threads: usize,
113 pub interner_capacity: usize,
115 pub posting_map_capacity: usize,
117}
118
119impl Default for SegmentBuilderConfig {
120 fn default() -> Self {
121 Self {
122 temp_dir: std::env::temp_dir(),
123 compression_level: CompressionLevel(7),
124 num_compression_threads: num_cpus::get(),
125 interner_capacity: 1_000_000,
126 posting_map_capacity: 500_000,
127 }
128 }
129}
130
131pub struct SegmentBuilder {
138 schema: Schema,
139 config: SegmentBuilderConfig,
140 tokenizers: FxHashMap<Field, BoxedTokenizer>,
141
142 term_interner: Rodeo,
144
145 inverted_index: HashMap<TermKey, PostingListBuilder>,
147
148 store_file: BufWriter<File>,
150 store_path: PathBuf,
151
152 next_doc_id: DocId,
154
155 field_stats: FxHashMap<u32, FieldStats>,
157
158 doc_field_lengths: Vec<u32>,
162 num_indexed_fields: usize,
163 field_to_slot: FxHashMap<u32, usize>,
164
165 wand_data: Option<Arc<WandData>>,
167
168 local_tf_buffer: FxHashMap<Spur, u32>,
171
172 token_buffer: String,
174
175 dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
178}
179
180struct DenseVectorBuilder {
182 dim: usize,
184 doc_ids: Vec<DocId>,
186 vectors: Vec<f32>,
188}
189
190impl DenseVectorBuilder {
191 fn new(dim: usize) -> Self {
192 Self {
193 dim,
194 doc_ids: Vec::new(),
195 vectors: Vec::new(),
196 }
197 }
198
199 fn add(&mut self, doc_id: DocId, vector: &[f32]) {
200 debug_assert_eq!(vector.len(), self.dim, "Vector dimension mismatch");
201 self.doc_ids.push(doc_id);
202 self.vectors.extend_from_slice(vector);
203 }
204
205 fn len(&self) -> usize {
206 self.doc_ids.len()
207 }
208
209 fn get_vectors(&self) -> Vec<Vec<f32>> {
211 self.doc_ids
212 .iter()
213 .enumerate()
214 .map(|(i, _)| {
215 let start = i * self.dim;
216 self.vectors[start..start + self.dim].to_vec()
217 })
218 .collect()
219 }
220}
221
222impl SegmentBuilder {
223 pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
225 let segment_id = uuid::Uuid::new_v4();
226 let store_path = config
227 .temp_dir
228 .join(format!("hermes_store_{}.tmp", segment_id));
229
230 let store_file = BufWriter::with_capacity(
231 STORE_BUFFER_SIZE,
232 OpenOptions::new()
233 .create(true)
234 .write(true)
235 .truncate(true)
236 .open(&store_path)?,
237 );
238
239 let mut num_indexed_fields = 0;
241 let mut field_to_slot = FxHashMap::default();
242 for (field, entry) in schema.fields() {
243 if entry.indexed && matches!(entry.field_type, FieldType::Text) {
244 field_to_slot.insert(field.0, num_indexed_fields);
245 num_indexed_fields += 1;
246 }
247 }
248
249 Ok(Self {
250 schema,
251 tokenizers: FxHashMap::default(),
252 term_interner: Rodeo::new(),
253 inverted_index: HashMap::with_capacity(config.posting_map_capacity),
254 store_file,
255 store_path,
256 next_doc_id: 0,
257 field_stats: FxHashMap::default(),
258 doc_field_lengths: Vec::new(),
259 num_indexed_fields,
260 field_to_slot,
261 wand_data: None,
262 local_tf_buffer: FxHashMap::default(),
263 token_buffer: String::with_capacity(64),
264 config,
265 dense_vectors: FxHashMap::default(),
266 })
267 }
268
269 pub fn with_wand_data(
271 schema: Schema,
272 config: SegmentBuilderConfig,
273 wand_data: Arc<WandData>,
274 ) -> Result<Self> {
275 let mut builder = Self::new(schema, config)?;
276 builder.wand_data = Some(wand_data);
277 Ok(builder)
278 }
279
280 pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
281 self.tokenizers.insert(field, tokenizer);
282 }
283
284 pub fn num_docs(&self) -> u32 {
285 self.next_doc_id
286 }
287
288 pub fn stats(&self) -> SegmentBuilderStats {
290 let postings_in_memory: usize =
291 self.inverted_index.values().map(|p| p.postings.len()).sum();
292 SegmentBuilderStats {
293 num_docs: self.next_doc_id,
294 unique_terms: self.inverted_index.len(),
295 postings_in_memory,
296 interned_strings: self.term_interner.len(),
297 doc_field_lengths_size: self.doc_field_lengths.len(),
298 }
299 }
300
301 pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
303 let doc_id = self.next_doc_id;
304 self.next_doc_id += 1;
305
306 let base_idx = self.doc_field_lengths.len();
308 self.doc_field_lengths
309 .resize(base_idx + self.num_indexed_fields, 0);
310
311 for (field, value) in doc.field_values() {
312 let entry = self.schema.get_field_entry(*field);
313 if entry.is_none() || !entry.unwrap().indexed {
314 continue;
315 }
316
317 let entry = entry.unwrap();
318 match (&entry.field_type, value) {
319 (FieldType::Text, FieldValue::Text(text)) => {
320 let token_count = self.index_text_field(*field, doc_id, text)?;
321
322 let stats = self.field_stats.entry(field.0).or_default();
324 stats.total_tokens += token_count as u64;
325 stats.doc_count += 1;
326
327 if let Some(&slot) = self.field_to_slot.get(&field.0) {
329 self.doc_field_lengths[base_idx + slot] = token_count;
330 }
331 }
332 (FieldType::U64, FieldValue::U64(v)) => {
333 self.index_numeric_field(*field, doc_id, *v)?;
334 }
335 (FieldType::I64, FieldValue::I64(v)) => {
336 self.index_numeric_field(*field, doc_id, *v as u64)?;
337 }
338 (FieldType::F64, FieldValue::F64(v)) => {
339 self.index_numeric_field(*field, doc_id, v.to_bits())?;
340 }
341 (FieldType::DenseVector, FieldValue::DenseVector(vec)) => {
342 self.index_dense_vector_field(*field, doc_id, vec)?;
343 }
344 (FieldType::SparseVector, FieldValue::SparseVector { indices, values }) => {
345 self.index_sparse_vector_field(*field, doc_id, indices, values)?;
346 }
347 _ => {}
348 }
349 }
350
351 self.write_document_to_store(&doc)?;
353
354 Ok(doc_id)
355 }
356
357 fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
365 self.local_tf_buffer.clear();
368
369 let mut token_count = 0u32;
370
371 for word in text.split_whitespace() {
373 self.token_buffer.clear();
375 for c in word.chars() {
376 if c.is_alphanumeric() {
377 for lc in c.to_lowercase() {
378 self.token_buffer.push(lc);
379 }
380 }
381 }
382
383 if self.token_buffer.is_empty() {
384 continue;
385 }
386
387 token_count += 1;
388
389 let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
391 *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
392 }
393
394 let field_id = field.0;
397
398 for (&term_spur, &tf) in &self.local_tf_buffer {
399 let term_key = TermKey {
400 field: field_id,
401 term: term_spur,
402 };
403
404 let posting = self
405 .inverted_index
406 .entry(term_key)
407 .or_insert_with(PostingListBuilder::new);
408 posting.add(doc_id, tf);
409 }
410
411 Ok(token_count)
412 }
413
414 fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
415 let term_str = format!("__num_{}", value);
417 let term_spur = self.term_interner.get_or_intern(&term_str);
418
419 let term_key = TermKey {
420 field: field.0,
421 term: term_spur,
422 };
423
424 let posting = self
425 .inverted_index
426 .entry(term_key)
427 .or_insert_with(PostingListBuilder::new);
428 posting.add(doc_id, 1);
429
430 Ok(())
431 }
432
433 fn index_dense_vector_field(
435 &mut self,
436 field: Field,
437 doc_id: DocId,
438 vector: &[f32],
439 ) -> Result<()> {
440 let dim = vector.len();
441
442 let builder = self
443 .dense_vectors
444 .entry(field.0)
445 .or_insert_with(|| DenseVectorBuilder::new(dim));
446
447 if builder.dim != dim && builder.len() > 0 {
449 return Err(crate::Error::Schema(format!(
450 "Dense vector dimension mismatch: expected {}, got {}",
451 builder.dim, dim
452 )));
453 }
454
455 builder.add(doc_id, vector);
456 Ok(())
457 }
458
459 fn index_sparse_vector_field(
465 &mut self,
466 field: Field,
467 doc_id: DocId,
468 indices: &[u32],
469 values: &[f32],
470 ) -> Result<()> {
471 for (&dim_id, &weight) in indices.iter().zip(values.iter()) {
472 let term_str = format!("__sparse_{}", dim_id);
474 let term_spur = self.term_interner.get_or_intern(&term_str);
475
476 let term_key = TermKey {
477 field: field.0,
478 term: term_spur,
479 };
480
481 let quantized_weight = (weight.abs() * 1000.0).min(u16::MAX as f32) as u32;
484
485 let posting = self
486 .inverted_index
487 .entry(term_key)
488 .or_insert_with(PostingListBuilder::new);
489 posting.add(doc_id, quantized_weight.max(1));
490 }
491
492 Ok(())
493 }
494
495 fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
497 use byteorder::{LittleEndian, WriteBytesExt};
498
499 let doc_bytes = super::store::serialize_document(doc, &self.schema)?;
500
501 self.store_file
502 .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
503 self.store_file.write_all(&doc_bytes)?;
504
505 Ok(())
506 }
507
508 pub async fn build<D: Directory + DirectoryWriter>(
510 mut self,
511 dir: &D,
512 segment_id: SegmentId,
513 ) -> Result<SegmentMeta> {
514 self.store_file.flush()?;
516
517 let files = SegmentFiles::new(segment_id.0);
518
519 let store_path = self.store_path.clone();
521 let schema = self.schema.clone();
522 let num_compression_threads = self.config.num_compression_threads;
523 let compression_level = self.config.compression_level;
524
525 let (postings_result, store_result) = rayon::join(
527 || self.build_postings(),
528 || {
529 Self::build_store_parallel(
530 &store_path,
531 &schema,
532 num_compression_threads,
533 compression_level,
534 )
535 },
536 );
537
538 let (term_dict_data, postings_data) = postings_result?;
539 let store_data = store_result?;
540
541 dir.write(&files.term_dict, &term_dict_data).await?;
543 dir.write(&files.postings, &postings_data).await?;
544 dir.write(&files.store, &store_data).await?;
545
546 if !self.dense_vectors.is_empty() {
548 let vectors_data = self.build_vectors_file()?;
549 if !vectors_data.is_empty() {
550 dir.write(&files.vectors, &vectors_data).await?;
551 }
552 }
553
554 let meta = SegmentMeta {
555 id: segment_id.0,
556 num_docs: self.next_doc_id,
557 field_stats: self.field_stats.clone(),
558 };
559
560 dir.write(&files.meta, &meta.serialize()?).await?;
561
562 let _ = std::fs::remove_file(&self.store_path);
564
565 Ok(meta)
566 }
567
568 fn build_vectors_file(&self) -> Result<Vec<u8>> {
575 use byteorder::{LittleEndian, WriteBytesExt};
576
577 let mut field_indexes: Vec<(u32, Vec<u8>)> = Vec::new();
579
580 for (&field_id, builder) in &self.dense_vectors {
581 if builder.len() == 0 {
582 continue;
583 }
584
585 let vectors = builder.get_vectors();
586 let field = crate::dsl::Field(field_id);
587
588 let ivf_config = self
590 .schema
591 .get_field_entry(field)
592 .and_then(|e| e.dense_vector_config.as_ref())
593 .filter(|c| c.uses_ivf());
594
595 let index_bytes = if let Some(dense_config) = ivf_config {
596 let centroids_path = dense_config.coarse_centroids_path.as_ref().unwrap();
598 match crate::structures::CoarseCentroids::load(std::path::Path::new(centroids_path))
599 {
600 Ok(coarse_centroids) => {
601 let ivf_cfg = crate::structures::IVFConfig::new(builder.dim);
602 let doc_ids: Vec<u32> = builder.doc_ids.clone();
603 let ivf_index = crate::structures::IVFRaBitQIndex::build(
604 ivf_cfg,
605 &coarse_centroids,
606 &vectors,
607 Some(&doc_ids),
608 );
609 serde_json::to_vec(&ivf_index)
610 .map_err(|e| crate::Error::Serialization(e.to_string()))?
611 }
612 Err(e) => {
613 log::warn!("Failed to load centroids: {}, using RaBitQ", e);
614 let cfg = crate::structures::RaBitQConfig::new(builder.dim);
615 let idx = crate::structures::RaBitQIndex::build(cfg, &vectors, true);
616 serde_json::to_vec(&idx)
617 .map_err(|e| crate::Error::Serialization(e.to_string()))?
618 }
619 }
620 } else {
621 let cfg = crate::structures::RaBitQConfig::new(builder.dim);
623 let idx = crate::structures::RaBitQIndex::build(cfg, &vectors, true);
624 serde_json::to_vec(&idx).map_err(|e| crate::Error::Serialization(e.to_string()))?
625 };
626
627 field_indexes.push((field_id, index_bytes));
628 }
629
630 if field_indexes.is_empty() {
631 return Ok(Vec::new());
632 }
633
634 field_indexes.sort_by_key(|(id, _)| *id);
636
637 let header_size = 4 + field_indexes.len() * (4 + 8 + 8);
639
640 let mut output = Vec::new();
642
643 output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;
645
646 let mut current_offset = header_size as u64;
648 for (field_id, data) in &field_indexes {
649 output.write_u32::<LittleEndian>(*field_id)?;
650 output.write_u64::<LittleEndian>(current_offset)?;
651 output.write_u64::<LittleEndian>(data.len() as u64)?;
652 current_offset += data.len() as u64;
653 }
654
655 for (_, data) in field_indexes {
657 output.extend_from_slice(&data);
658 }
659
660 Ok(output)
661 }
662
663 fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
667 let mut term_entries: Vec<(Vec<u8>, &PostingListBuilder)> = self
670 .inverted_index
671 .iter()
672 .map(|(term_key, posting_list)| {
673 let term_str = self.term_interner.resolve(&term_key.term);
674 let mut key = Vec::with_capacity(4 + term_str.len());
675 key.extend_from_slice(&term_key.field.to_le_bytes());
676 key.extend_from_slice(term_str.as_bytes());
677 (key, posting_list)
678 })
679 .collect();
680
681 term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
683
684 let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
687 .into_par_iter()
688 .map(|(key, posting_builder)| {
689 let mut full_postings = PostingList::with_capacity(posting_builder.len());
691 for p in &posting_builder.postings {
692 full_postings.push(p.doc_id, p.term_freq as u32);
693 }
694
695 let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
697 let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();
698
699 let result = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
700 SerializedPosting::Inline(inline)
701 } else {
702 let mut posting_bytes = Vec::new();
704 let block_list =
705 crate::structures::BlockPostingList::from_posting_list(&full_postings)
706 .expect("BlockPostingList creation failed");
707 block_list
708 .serialize(&mut posting_bytes)
709 .expect("BlockPostingList serialization failed");
710 SerializedPosting::External {
711 bytes: posting_bytes,
712 doc_count: full_postings.doc_count(),
713 }
714 };
715
716 (key, result)
717 })
718 .collect();
719
720 let mut term_dict = Vec::new();
722 let mut postings = Vec::new();
723 let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);
724
725 for (key, serialized_posting) in serialized {
726 let term_info = match serialized_posting {
727 SerializedPosting::Inline(info) => info,
728 SerializedPosting::External { bytes, doc_count } => {
729 let posting_offset = postings.len() as u64;
730 let posting_len = bytes.len() as u32;
731 postings.extend_from_slice(&bytes);
732 TermInfo::external(posting_offset, posting_len, doc_count)
733 }
734 };
735
736 writer.insert(&key, &term_info)?;
737 }
738
739 writer.finish()?;
740 Ok((term_dict, postings))
741 }
742
743 fn build_store_parallel(
747 store_path: &PathBuf,
748 schema: &Schema,
749 num_compression_threads: usize,
750 compression_level: CompressionLevel,
751 ) -> Result<Vec<u8>> {
752 use super::store::EagerParallelStoreWriter;
753
754 let file = File::open(store_path)?;
755 let mmap = unsafe { memmap2::Mmap::map(&file)? };
756
757 let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
759 let mut offset = 0usize;
760 while offset + 4 <= mmap.len() {
761 let doc_len = u32::from_le_bytes([
762 mmap[offset],
763 mmap[offset + 1],
764 mmap[offset + 2],
765 mmap[offset + 3],
766 ]) as usize;
767 offset += 4;
768
769 if offset + doc_len > mmap.len() {
770 break;
771 }
772
773 doc_ranges.push((offset, doc_len));
774 offset += doc_len;
775 }
776
777 let docs: Vec<Document> = doc_ranges
779 .into_par_iter()
780 .filter_map(|(start, len)| {
781 let doc_bytes = &mmap[start..start + len];
782 super::store::deserialize_document(doc_bytes, schema).ok()
783 })
784 .collect();
785
786 let mut store_data = Vec::new();
788 let mut store_writer = EagerParallelStoreWriter::with_compression_level(
789 &mut store_data,
790 num_compression_threads,
791 compression_level,
792 );
793
794 for doc in &docs {
795 store_writer.store(doc, schema)?;
796 }
797
798 store_writer.finish()?;
799 Ok(store_data)
800 }
801}
802
803impl Drop for SegmentBuilder {
804 fn drop(&mut self) {
805 let _ = std::fs::remove_file(&self.store_path);
807 }
808}