1mod config;
12mod dense;
13#[cfg(feature = "diagnostics")]
14mod diagnostics;
15mod postings;
16mod sparse;
17mod store;
18
19pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
20
21use std::fs::{File, OpenOptions};
22use std::io::{BufWriter, Write};
23use std::mem::size_of;
24use std::path::PathBuf;
25
26use hashbrown::HashMap;
27use lasso::{Rodeo, Spur};
28use rustc_hash::FxHashMap;
29
30use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
31use std::sync::Arc;
32
33use crate::directories::{Directory, DirectoryWriter};
34use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
35use crate::tokenizer::BoxedTokenizer;
36use crate::{DocId, Result};
37
38use dense::DenseVectorBuilder;
39use postings::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
40use sparse::SparseVectorBuilder;
41
42const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
48
49const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();
51
52const NEW_POS_TERM_OVERHEAD: usize =
54 size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
55
/// Accumulates documents in memory (postings, vectors, fast-field columns)
/// plus a temporary on-disk document store, then serializes everything into
/// an immutable segment via [`SegmentBuilder::build`].
pub struct SegmentBuilder {
    schema: Arc<Schema>,
    config: SegmentBuilderConfig,
    /// Per-field custom tokenizers; fields without one use the built-in
    /// lowercasing whitespace fallback in `index_text_field`.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interns term strings so postings key on a compact `Spur`.
    term_interner: Rodeo,

    /// (field, term) -> in-memory posting list (doc ids + term frequencies).
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Buffered writer over the temporary store file at `store_path`.
    store_file: BufWriter<File>,
    store_path: PathBuf,

    /// Doc id assigned to the next added document (== docs added so far).
    next_doc_id: DocId,

    /// Per-field token/document statistics, keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flattened [docs x indexed-text-fields] table of token counts.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    /// Field id -> slot index within one row of `doc_field_lengths`.
    field_to_slot: FxHashMap<u32, usize>,

    /// Scratch: term -> frequency for the text value currently being indexed.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch: term -> encoded positions for the current text value.
    local_positions: FxHashMap<Spur, Vec<u32>>,

    /// Scratch buffer for the fallback tokenizer's lowercased token.
    token_buffer: String,

    /// Scratch buffer for formatting numeric terms ("__num_<value>").
    numeric_buffer: String,

    /// Field id -> buffered dense vectors.
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Field id -> buffered sparse-vector postings.
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// (field, term) -> per-doc position lists, position-enabled fields only.
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Field id -> position mode for text fields that record positions.
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Per-field counter of multi-valued elements in the current document;
    /// cleared at the start of each `add_document`.
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Incrementally maintained estimate of this builder's heap usage, bytes.
    estimated_memory: usize,

    /// Reusable buffer for serializing documents into the store file.
    doc_serialize_buffer: Vec<u8>,

    /// Field id -> columnar writer for fields flagged `fast` in the schema.
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
}
131
132impl SegmentBuilder {
133 pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
135 let segment_id = uuid::Uuid::new_v4();
136 let store_path = config
137 .temp_dir
138 .join(format!("hermes_store_{}.tmp", segment_id));
139
140 let store_file = BufWriter::with_capacity(
141 STORE_BUFFER_SIZE,
142 OpenOptions::new()
143 .create(true)
144 .write(true)
145 .truncate(true)
146 .open(&store_path)?,
147 );
148
149 let mut num_indexed_fields = 0;
152 let mut field_to_slot = FxHashMap::default();
153 let mut position_enabled_fields = FxHashMap::default();
154 for (field, entry) in schema.fields() {
155 if entry.indexed && matches!(entry.field_type, FieldType::Text) {
156 field_to_slot.insert(field.0, num_indexed_fields);
157 num_indexed_fields += 1;
158 if entry.positions.is_some() {
159 position_enabled_fields.insert(field.0, entry.positions);
160 }
161 }
162 }
163
164 use crate::structures::fast_field::{FastFieldColumnType, FastFieldWriter};
166 let mut fast_fields = FxHashMap::default();
167 for (field, entry) in schema.fields() {
168 if entry.fast {
169 let writer = match entry.field_type {
170 FieldType::U64 => FastFieldWriter::new_numeric(FastFieldColumnType::U64),
171 FieldType::I64 => FastFieldWriter::new_numeric(FastFieldColumnType::I64),
172 FieldType::F64 => FastFieldWriter::new_numeric(FastFieldColumnType::F64),
173 FieldType::Text => FastFieldWriter::new_text(),
174 _ => continue, };
176 fast_fields.insert(field.0, writer);
177 }
178 }
179
180 Ok(Self {
181 schema,
182 tokenizers: FxHashMap::default(),
183 term_interner: Rodeo::new(),
184 inverted_index: HashMap::with_capacity(config.posting_map_capacity),
185 store_file,
186 store_path,
187 next_doc_id: 0,
188 field_stats: FxHashMap::default(),
189 doc_field_lengths: Vec::new(),
190 num_indexed_fields,
191 field_to_slot,
192 local_tf_buffer: FxHashMap::default(),
193 local_positions: FxHashMap::default(),
194 token_buffer: String::with_capacity(64),
195 numeric_buffer: String::with_capacity(32),
196 config,
197 dense_vectors: FxHashMap::default(),
198 sparse_vectors: FxHashMap::default(),
199 position_index: HashMap::new(),
200 position_enabled_fields,
201 current_element_ordinal: FxHashMap::default(),
202 estimated_memory: 0,
203 doc_serialize_buffer: Vec::with_capacity(256),
204 fast_fields,
205 })
206 }
207
    /// Registers a custom tokenizer for `field`, replacing any previous one.
    /// Fields without a registered tokenizer use the built-in
    /// alphanumeric/lowercase fallback in `index_text_field`.
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }
211
212 fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
215 let ordinal = *self.current_element_ordinal.get(&field_id).unwrap_or(&0);
216 *self.current_element_ordinal.entry(field_id).or_insert(0) += 1;
217 ordinal
218 }
219
    /// Number of documents added so far. Doc ids are assigned sequentially
    /// from 0, so the next id to hand out equals the count.
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }
223
    /// Cheap, incrementally maintained estimate of this builder's heap
    /// usage in bytes. See `stats()` for a fuller recomputation based on
    /// container capacities.
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }
229
    /// Total number of distinct (field, dimension) posting lists across all
    /// buffered sparse-vector fields.
    pub fn sparse_dim_count(&self) -> usize {
        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
    }
234
    /// Recomputes a detailed snapshot of builder size and memory usage by
    /// walking every container and applying fixed per-entry overhead
    /// estimates. Heavier than `estimated_memory_bytes` but accounts for
    /// allocated capacities rather than incremental additions.
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        let compact_posting_size = size_of::<CompactPosting>();
        let vec_overhead = size_of::<Vec<u8>>();
        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let spur_size = size_of::<lasso::Spur>();
        let sparse_entry_size = size_of::<(DocId, u16, f32)>();

        // Rough fixed per-entry hash-map bookkeeping cost (control bytes,
        // padding), applied uniformly to hashbrown and FxHashMap entries.
        let hashmap_entry_base_overhead = 8usize;

        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;

        // Posting payloads, charged at allocated capacity (not length).
        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
            .sum();

        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);

        // Interner model: assume an average term of 8 bytes, plus the Spur
        // key and two words of arena bookkeeping per interned string.
        let interner_arena_overhead = 2 * size_of::<usize>();
        let avg_term_len = 8;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;

        // Dense vectors: the float payload plus the (doc id, ordinal) list,
        // both at capacity.
        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;

        // Sparse vectors: inner posting vectors, inner map entries, and the
        // outer per-field map entries.
        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
            }
            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
        }
        let outer_sparse_entry_size =
            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;

        // Position index: per-doc position vectors plus map entries at both
        // the per-term and index-wide levels.
        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
            }
            let pos_entry_size = size_of::<DocId>() + vec_overhead;
            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
        }
        let pos_index_entry_size =
            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
        position_index_bytes += self.position_index.len() * pos_index_entry_size;

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }
354
    /// Adds a document, assigning it the next sequential doc id.
    ///
    /// Indexes each field value according to its schema type (text terms,
    /// numeric exact-match terms, dense/sparse vectors, fast-field columns),
    /// then appends the serialized document to the temporary store file.
    ///
    /// # Errors
    /// Propagates indexing, serialization, and store I/O errors. Note that
    /// on error the doc id has already been consumed and earlier fields of
    /// the document may have been partially indexed.
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve one zeroed row of per-field token lengths for this doc.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);
        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();

        // Element ordinals for multi-valued fields restart per document.
        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };

            // Dense vectors may be stored without being indexed; every
            // other type is skipped unless the field is indexed.
            if !matches!(&entry.field_type, FieldType::DenseVector) && !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let element_ordinal = self.next_element_ordinal(field.0);
                    let token_count =
                        self.index_text_field(*field, doc_id, text, element_ordinal)?;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    // Count the doc once even when the field is multi-valued.
                    if element_ordinal == 0 {
                        stats.doc_count += 1;
                    }

                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }

                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_text(doc_id, text);
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_u64(doc_id, *v);
                    }
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    // Indexed by raw bit pattern (i64 -> u64 cast).
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_i64(doc_id, *v);
                    }
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    // Indexed by IEEE-754 bit pattern for exact matching.
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_f64(doc_id, *v);
                    }
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec))
                    if entry.indexed || entry.stored =>
                {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
                }
                // Type/value mismatches are silently ignored.
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }
438
    /// Tokenizes one text value of `field` and merges its term frequencies
    /// (and positions, when the field records them) into the segment-wide
    /// indexes. Returns the number of tokens produced.
    ///
    /// Positions are encoded per the field's `PositionMode`, packing the
    /// element ordinal into the upper bits (`element_ordinal << 20`), which
    /// effectively limits token positions to 2^20 per element.
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        // Reset scratch state. Position vectors keep their allocations
        // (contents cleared only) so they can be reused across values.
        self.local_tf_buffer.clear();
        for v in self.local_positions.values_mut() {
            v.clear();
        }

        let mut token_position = 0u32;

        // Prefer the field's registered tokenizer when one exists.
        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));

        if let Some(tokens) = custom_tokens {
            for token in &tokens {
                // Charge interner memory only for never-seen strings.
                let is_new_string = !self.term_interner.contains(&token.text);
                let term_spur = self.term_interner.get_or_intern(&token.text);
                if is_new_string {
                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
                }
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token.position,
                        PositionMode::Full => (element_ordinal << 20) | token.position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }
            }
            token_position = tokens.len() as u32;
        } else {
            // Fallback tokenizer: split on whitespace, keep alphanumeric
            // chars, lowercase them (one char may lowercase to several).
            for word in text.split_whitespace() {
                self.token_buffer.clear();
                for c in word.chars() {
                    if c.is_alphanumeric() {
                        for lc in c.to_lowercase() {
                            self.token_buffer.push(lc);
                        }
                    }
                }

                // Words with no alphanumeric characters emit no token.
                if self.token_buffer.is_empty() {
                    continue;
                }

                let is_new_string = !self.term_interner.contains(&self.token_buffer);
                let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
                if is_new_string {
                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
                }
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token_position,
                        PositionMode::Full => (element_ordinal << 20) | token_position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }

                token_position += 1;
            }
        }

        // Merge this value's per-term counts into the segment-wide indexes,
        // keeping the incremental memory estimate up to date.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let is_new_term = !self.inverted_index.contains_key(&term_key);
            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);

            self.estimated_memory += size_of::<CompactPosting>();
            if is_new_term {
                self.estimated_memory += NEW_TERM_OVERHEAD;
            }

            if position_mode.is_some()
                && let Some(positions) = self.local_positions.get(&term_spur)
            {
                let is_new_pos_term = !self.position_index.contains_key(&term_key);
                let pos_posting = self
                    .position_index
                    .entry(term_key)
                    .or_insert_with(PositionPostingListBuilder::new);
                for &pos in positions {
                    pos_posting.add_position(doc_id, pos);
                }
                self.estimated_memory += positions.len() * size_of::<u32>();
                if is_new_pos_term {
                    self.estimated_memory += NEW_POS_TERM_OVERHEAD;
                }
            }
        }

        Ok(token_position)
    }
581
582 fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
583 use std::fmt::Write;
584
585 self.numeric_buffer.clear();
586 write!(self.numeric_buffer, "__num_{}", value).unwrap();
587 let is_new_string = !self.term_interner.contains(&self.numeric_buffer);
588 let term_spur = self.term_interner.get_or_intern(&self.numeric_buffer);
589
590 let term_key = TermKey {
591 field: field.0,
592 term: term_spur,
593 };
594
595 let is_new_term = !self.inverted_index.contains_key(&term_key);
596 let posting = self
597 .inverted_index
598 .entry(term_key)
599 .or_insert_with(PostingListBuilder::new);
600 posting.add(doc_id, 1);
601
602 self.estimated_memory += size_of::<CompactPosting>();
603 if is_new_term {
604 self.estimated_memory += NEW_TERM_OVERHEAD;
605 }
606 if is_new_string {
607 self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
608 }
609
610 Ok(())
611 }
612
613 fn index_dense_vector_field(
615 &mut self,
616 field: Field,
617 doc_id: DocId,
618 ordinal: u16,
619 vector: &[f32],
620 ) -> Result<()> {
621 let dim = vector.len();
622
623 let builder = self
624 .dense_vectors
625 .entry(field.0)
626 .or_insert_with(|| DenseVectorBuilder::new(dim));
627
628 if builder.dim != dim && builder.len() > 0 {
630 return Err(crate::Error::Schema(format!(
631 "Dense vector dimension mismatch: expected {}, got {}",
632 builder.dim, dim
633 )));
634 }
635
636 builder.add(doc_id, ordinal, vector);
637
638 self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();
639
640 Ok(())
641 }
642
643 fn index_sparse_vector_field(
650 &mut self,
651 field: Field,
652 doc_id: DocId,
653 ordinal: u16,
654 entries: &[(u32, f32)],
655 ) -> Result<()> {
656 let weight_threshold = self
658 .schema
659 .get_field_entry(field)
660 .and_then(|entry| entry.sparse_vector_config.as_ref())
661 .map(|config| config.weight_threshold)
662 .unwrap_or(0.0);
663
664 let builder = self
665 .sparse_vectors
666 .entry(field.0)
667 .or_insert_with(SparseVectorBuilder::new);
668
669 builder.inc_vector_count();
670
671 for &(dim_id, weight) in entries {
672 if weight.abs() < weight_threshold {
674 continue;
675 }
676
677 let is_new_dim = !builder.postings.contains_key(&dim_id);
678 builder.add(dim_id, doc_id, ordinal, weight);
679 self.estimated_memory += size_of::<(DocId, u16, f32)>();
680 if is_new_dim {
681 self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; }
684 }
685
686 Ok(())
687 }
688
    /// Serializes `doc` into the reusable buffer and appends it to the
    /// temporary store file as a little-endian u32 length prefix followed
    /// by the payload bytes.
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        // NOTE(review): assumes `serialize_document_into` resets/overwrites
        // the buffer rather than appending — confirm in store.rs.
        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;

        self.store_file
            .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
        self.store_file.write_all(&self.doc_serialize_buffer)?;

        Ok(())
    }
701
    /// Finalizes the segment: flushes the temporary store, then streams all
    /// in-memory structures (positions, term dictionary + postings,
    /// compressed store, dense/sparse vectors, fast fields) into the
    /// directory's writers, parallelizing independent builds with rayon.
    ///
    /// Consumes the builder. The segment metadata file is written last, and
    /// the temporary store file is removed on success.
    ///
    /// # Errors
    /// Propagates any I/O or serialization error from the sub-builders or
    /// directory writers.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
        trained: Option<&super::TrainedVectorStructures>,
    ) -> Result<SegmentMeta> {
        // Ensure every buffered document byte is on disk before the store
        // builder re-reads the temp file below.
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Positions are built first: the postings serialization below needs
        // the resulting per-term offsets.
        let position_index = std::mem::take(&mut self.position_index);
        let position_offsets = if !position_index.is_empty() {
            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
            let offsets = postings::build_positions_streaming(
                position_index,
                &self.term_interner,
                &mut *pos_writer,
            )?;
            pos_writer.finish()?;
            offsets
        } else {
            FxHashMap::default()
        };

        // Move the large structures out of `self` so the rayon closures can
        // consume them independently.
        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        let store_path = self.store_path.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;
        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        let schema = &self.schema;

        // Open one streaming writer per output file; optional files are
        // only created when there is data for them.
        let mut term_dict_writer = dir.streaming_writer(&files.term_dict).await?;
        let mut postings_writer = dir.streaming_writer(&files.postings).await?;
        let mut store_writer = dir.streaming_writer(&files.store).await?;
        let mut vectors_writer = if !dense_vectors.is_empty() {
            Some(dir.streaming_writer(&files.vectors).await?)
        } else {
            None
        };
        let mut sparse_writer = if !sparse_vectors.is_empty() {
            Some(dir.streaming_writer(&files.sparse).await?)
        } else {
            None
        };
        let mut fast_fields = std::mem::take(&mut self.fast_fields);
        let num_docs = self.next_doc_id;
        let mut fast_writer = if !fast_fields.is_empty() {
            Some(dir.streaming_writer(&files.fast).await?)
        } else {
            None
        };

        // Run the five independent serializations concurrently; nested
        // joins let rayon schedule them across worker threads.
        let ((postings_result, store_result), ((vectors_result, sparse_result), fast_result)) =
            rayon::join(
                || {
                    rayon::join(
                        || {
                            postings::build_postings_streaming(
                                inverted_index,
                                term_interner,
                                &position_offsets,
                                &mut *term_dict_writer,
                                &mut *postings_writer,
                            )
                        },
                        || {
                            store::build_store_streaming(
                                &store_path,
                                num_compression_threads,
                                compression_level,
                                &mut *store_writer,
                                num_docs,
                            )
                        },
                    )
                },
                || {
                    rayon::join(
                        || {
                            rayon::join(
                                || -> Result<()> {
                                    if let Some(ref mut w) = vectors_writer {
                                        dense::build_vectors_streaming(
                                            dense_vectors,
                                            schema,
                                            trained,
                                            &mut **w,
                                        )?;
                                    }
                                    Ok(())
                                },
                                || -> Result<()> {
                                    if let Some(ref mut w) = sparse_writer {
                                        sparse::build_sparse_streaming(
                                            &mut sparse_vectors,
                                            schema,
                                            &mut **w,
                                        )?;
                                    }
                                    Ok(())
                                },
                            )
                        },
                        || -> Result<()> {
                            if let Some(ref mut w) = fast_writer {
                                build_fast_fields_streaming(&mut fast_fields, num_docs, &mut **w)?;
                            }
                            Ok(())
                        },
                    )
                },
            );
        // Surface the first error (if any) from each parallel task.
        postings_result?;
        store_result?;
        vectors_result?;
        sparse_result?;
        fast_result?;
        // Finalize writers only after every task succeeded.
        term_dict_writer.finish()?;
        postings_writer.finish()?;
        store_writer.finish()?;
        if let Some(w) = vectors_writer {
            w.finish()?;
        }
        if let Some(w) = sparse_writer {
            w.finish()?;
        }
        if let Some(w) = fast_writer {
            w.finish()?;
        }
        drop(position_offsets);
        drop(sparse_vectors);

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        // Metadata is written last so a segment only becomes discoverable
        // once all of its data files are complete.
        dir.write(&files.meta, &meta.serialize()?).await?;

        // Best effort; Drop also attempts this removal.
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }
859}
860
861fn build_fast_fields_streaming(
863 fast_fields: &mut FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
864 num_docs: u32,
865 writer: &mut dyn Write,
866) -> Result<()> {
867 use crate::structures::fast_field::{FastFieldTocEntry, write_fast_field_toc_and_footer};
868
869 if fast_fields.is_empty() {
870 return Ok(());
871 }
872
873 let mut field_ids: Vec<u32> = fast_fields.keys().copied().collect();
875 field_ids.sort_unstable();
876
877 let mut toc_entries: Vec<FastFieldTocEntry> = Vec::with_capacity(field_ids.len());
878 let mut current_offset = 0u64;
879
880 for &field_id in &field_ids {
881 let ff = fast_fields.get_mut(&field_id).unwrap();
882 ff.pad_to(num_docs);
883
884 let (mut toc, bytes_written) = ff.serialize(writer, current_offset)?;
885 toc.field_id = field_id;
886 current_offset += bytes_written;
887 toc_entries.push(toc);
888 }
889
890 let toc_offset = current_offset;
892 write_fast_field_toc_and_footer(writer, toc_offset, &toc_entries)?;
893
894 log::debug!(
895 "[fast-fields] wrote {} columns, {} docs, toc at offset {}",
896 toc_entries.len(),
897 num_docs,
898 toc_offset
899 );
900
901 Ok(())
902}
903
/// Best-effort cleanup of the temporary store file when the builder is
/// dropped without `build` completing (`build` also removes it on success).
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Errors ignored: the file may already have been removed by `build`.
        let _ = std::fs::remove_file(&self.store_path);
    }
}