#[cfg_attr(not(feature = "native"), allow(dead_code))]
pub(crate) mod bmp;
mod config;
mod dense;
#[cfg(feature = "diagnostics")]
mod diagnostics;
#[cfg_attr(not(feature = "native"), allow(dead_code))]
pub(crate) mod graph_bisection;
mod postings;
mod sparse;
mod store;
pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
#[cfg(feature = "native")]
use std::fs::{File, OpenOptions};
#[cfg(feature = "native")]
use std::io::BufWriter;
use std::io::Write;
use std::mem::size_of;
#[cfg(feature = "native")]
use std::path::PathBuf;
use hashbrown::HashMap;
use rustc_hash::FxHashMap;
#[cfg(feature = "native")]
use lasso::{Rodeo, Spur};
#[cfg(not(feature = "native"))]
pub(crate) mod simple_interner {
    //! Minimal drop-in replacement for `lasso::{Rodeo, Spur}` on non-native
    //! builds: interns strings and hands out stable `u32` ids.
    use hashbrown::HashMap;

    /// Opaque handle to an interned string (index into `Rodeo::strings`).
    #[derive(Clone, Copy, PartialEq, Eq, Hash)]
    pub struct Spur(u32);

    pub struct Rodeo {
        // Owns every interned string. Entries are never removed or mutated,
        // so the heap data behind each `Box<str>` is stable for the lifetime
        // of the `Rodeo`.
        strings: Vec<Box<str>>,
        // Reverse lookup from string contents to id. Keys borrow from the
        // boxed strings above — see the SAFETY note in `get_or_intern`.
        map: HashMap<&'static str, u32>,
    }

    impl Rodeo {
        pub fn new() -> Self {
            Self {
                strings: Vec::new(),
                map: HashMap::new(),
            }
        }

        /// Returns the id for `key` if it has already been interned.
        pub fn get(&self, key: &str) -> Option<Spur> {
            self.map.get(key).map(|&id| Spur(id))
        }

        /// Returns the existing id for `key`, interning it first if needed.
        pub fn get_or_intern(&mut self, key: &str) -> Spur {
            if let Some(&id) = self.map.get(key) {
                return Spur(id);
            }
            let id = self.strings.len() as u32;
            let boxed: Box<str> = key.into();
            // SAFETY: the reference points at the `Box<str>` heap allocation,
            // which does not move when the box is pushed into `strings`, and
            // strings are never dropped or mutated while `self` is alive. The
            // `'static` lifetime is an internal fiction confined to this
            // private module; the reference never escapes the `Rodeo`.
            let static_ref: &'static str = unsafe { &*(boxed.as_ref() as *const str) };
            self.strings.push(boxed);
            self.map.insert(static_ref, id);
            Spur(id)
        }

        /// Resolves an id back to its string.
        /// Panics if `spur` did not come from this interner.
        pub fn resolve(&self, spur: &Spur) -> &str {
            &self.strings[spur.0 as usize]
        }

        /// Number of distinct strings interned so far.
        pub fn len(&self) -> usize {
            self.strings.len()
        }
    }
}
#[cfg(not(feature = "native"))]
use simple_interner::{Rodeo, Spur};
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use std::sync::Arc;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
use crate::tokenizer::BoxedTokenizer;
use crate::{DocId, Result};
use dense::{BinaryDenseVectorBuilder, DenseVectorBuilder};
use postings::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
use sparse::SparseVectorBuilder;
/// Buffer size for the temporary document-store writer (16 MiB).
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;
/// Estimated heap cost of inserting a brand-new term into the inverted index
/// (key + posting-list builder + assumed hashmap bookkeeping).
const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
/// Estimated per-string bookkeeping cost inside the term interner.
const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();
/// Estimated heap cost of a brand-new term in the position index.
const NEW_POS_TERM_OVERHEAD: usize =
    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
/// Accumulates documents for a single segment in memory (spilling the store
/// and oversized posting lists to disk on native builds) and writes the
/// segment files out in [`SegmentBuilder::build`].
pub struct SegmentBuilder {
    schema: Arc<Schema>,
    config: SegmentBuilderConfig,
    /// Per-field tokenizer overrides (schema-resolved or set explicitly).
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// Interns term strings so postings key on a small `Spur` id.
    term_interner: Rodeo,
    /// Term -> in-memory posting list under construction.
    inverted_index: HashMap<TermKey, PostingListBuilder>,
    /// Lazily created spill file for oversized posting lists (native only).
    #[cfg(feature = "native")]
    posting_spill_file: Option<BufWriter<File>>,
    #[cfg(feature = "native")]
    posting_spill_path: PathBuf,
    /// Term -> (byte offset, posting count) runs already spilled to disk.
    #[cfg(feature = "native")]
    posting_spill_index: HashMap<TermKey, Vec<(u64, u32)>>,
    /// Current write offset into the spill file, in bytes.
    #[cfg(feature = "native")]
    posting_spill_offset: u64,
    /// Temporary on-disk document store (length-prefixed records).
    #[cfg(feature = "native")]
    store_file: BufWriter<File>,
    #[cfg(feature = "native")]
    store_path: PathBuf,
    /// In-memory stand-in for the store file on non-native builds.
    #[cfg(not(feature = "native"))]
    store_buffer: Vec<u8>,
    /// Next doc id to assign; equals the number of docs added so far.
    next_doc_id: DocId,
    /// Per-field token totals and doc counts, keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,
    /// Flat (num_docs x num_indexed_fields) matrix of per-field token counts.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    /// Field id -> column index in `doc_field_lengths`.
    field_to_slot: FxHashMap<u32, usize>,
    /// Scratch: term frequencies for the field currently being indexed.
    local_tf_buffer: FxHashMap<Spur, u32>,
    /// Scratch: encoded positions for the field currently being indexed.
    local_positions: FxHashMap<Spur, Vec<u32>>,
    /// Scratch buffers reused across tokens / numeric term formatting.
    token_buffer: String,
    numeric_buffer: String,
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
    binary_dense_vectors: FxHashMap<u32, BinaryDenseVectorBuilder>,
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,
    /// Term -> per-document position lists (position-enabled fields only).
    position_index: HashMap<TermKey, PositionPostingListBuilder>,
    /// Field id -> configured position mode for fields that record positions.
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,
    /// Field id -> next element ordinal within the current document.
    current_element_ordinal: FxHashMap<u32, u32>,
    /// Incrementally maintained heap-usage estimate, in bytes.
    estimated_memory: usize,
    /// Scratch buffer for document serialization.
    doc_serialize_buffer: Vec<u8>,
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
}
impl SegmentBuilder {
    /// Creates a fresh builder for one segment.
    ///
    /// On native builds this eagerly opens a temporary on-disk store file and
    /// reserves a path for the posting spill file (both under
    /// `config.temp_dir`, named with a random UUID); on other builds documents
    /// are staged in an in-memory buffer instead.
    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
        #[cfg(feature = "native")]
        let (store_file, store_path, spill_path) = {
            let segment_id = uuid::Uuid::new_v4();
            let store_path = config
                .temp_dir
                .join(format!("hermes_store_{}.tmp", segment_id));
            let store_file = BufWriter::with_capacity(
                STORE_BUFFER_SIZE,
                OpenOptions::new()
                    .create(true)
                    .write(true)
                    .truncate(true)
                    .open(&store_path)?,
            );
            // The spill file itself is created lazily, only when a posting
            // list actually spills (see `index_text_field`).
            let spill_path = config
                .temp_dir
                .join(format!("hermes_spill_{}.tmp", segment_id));
            (store_file, store_path, spill_path)
        };
        let registry = crate::tokenizer::TokenizerRegistry::new();
        // Assign each indexed text field a dense slot so per-document field
        // lengths can live in one flat Vec (`doc_field_lengths`).
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        let mut tokenizers = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
                // Resolve schema-declared tokenizers; names the registry does
                // not know are silently skipped (the field then falls back to
                // the built-in whitespace tokenization in `index_text_field`).
                if let Some(ref tok_name) = entry.tokenizer
                    && let Some(tokenizer) = registry.get(tok_name)
                {
                    tokenizers.insert(field, tokenizer);
                }
            }
        }
        use crate::structures::fast_field::{FastFieldColumnType, FastFieldWriter};
        // Pre-create one fast-field writer per `fast` schema field;
        // unsupported field types are simply skipped.
        let mut fast_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.fast {
                let writer = if entry.multi {
                    match entry.field_type {
                        FieldType::U64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64)
                        }
                        FieldType::I64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::I64)
                        }
                        FieldType::F64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::F64)
                        }
                        FieldType::Text => FastFieldWriter::new_text_multi(),
                        _ => continue,
                    }
                } else {
                    match entry.field_type {
                        FieldType::U64 => FastFieldWriter::new_numeric(FastFieldColumnType::U64),
                        FieldType::I64 => FastFieldWriter::new_numeric(FastFieldColumnType::I64),
                        FieldType::F64 => FastFieldWriter::new_numeric(FastFieldColumnType::F64),
                        FieldType::Text => FastFieldWriter::new_text(),
                        _ => continue,
                    }
                };
                fast_fields.insert(field.0, writer);
            }
        }
        Ok(Self {
            schema,
            tokenizers,
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            #[cfg(feature = "native")]
            posting_spill_file: None,
            #[cfg(feature = "native")]
            posting_spill_path: spill_path,
            #[cfg(feature = "native")]
            posting_spill_index: HashMap::new(),
            #[cfg(feature = "native")]
            posting_spill_offset: 0,
            #[cfg(feature = "native")]
            store_file,
            #[cfg(feature = "native")]
            store_path,
            #[cfg(not(feature = "native"))]
            store_buffer: Vec::with_capacity(STORE_BUFFER_SIZE),
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            local_positions: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            numeric_buffer: String::with_capacity(32),
            config,
            dense_vectors: FxHashMap::default(),
            binary_dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
            estimated_memory: 0,
            doc_serialize_buffer: Vec::with_capacity(256),
            fast_fields,
        })
    }
pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
self.tokenizers.insert(field, tokenizer);
}
fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
let ordinal = *self.current_element_ordinal.get(&field_id).unwrap_or(&0);
*self.current_element_ordinal.entry(field_id).or_insert(0) += 1;
ordinal
}
    /// Number of documents added so far (also the next doc id to be assigned).
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }
    /// Running estimate of heap memory held by the builder, maintained
    /// incrementally while indexing (much cheaper than recomputing `stats`).
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }
pub fn sparse_dim_count(&self) -> usize {
self.sparse_vectors.values().map(|b| b.postings.len()).sum()
}
    /// Computes a detailed size/memory snapshot of the builder.
    ///
    /// All byte figures are back-of-envelope estimates assembled from
    /// `size_of` plus fixed per-entry overhead guesses; they are meant for
    /// reporting and flush decisions, not exact accounting.
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;
        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();
        let compact_posting_size = size_of::<CompactPosting>();
        let vec_overhead = size_of::<Vec<u8>>();
        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let spur_size = size_of::<Spur>();
        let sparse_entry_size = size_of::<(DocId, u16, f32)>();
        // Assumed per-entry bookkeeping cost for hash maps.
        let hashmap_entry_base_overhead = 8usize;
        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;
        // In-memory postings: counted by capacity (not len) so reserved but
        // unused space is included.
        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
            .sum();
        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);
        let interner_arena_overhead = 2 * size_of::<usize>();
        // Interned strings: assume an 8-byte average term length.
        let avg_term_len = 8;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);
        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;
        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }
        // Binary vectors store raw bytes, so capacity is already a byte count.
        for b in self.binary_dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }
        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;
        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
            }
            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
        }
        let outer_sparse_entry_size =
            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;
        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
            }
            let pos_entry_size = size_of::<DocId>() + vec_overhead;
            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
        }
        let pos_index_entry_size =
            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
        position_index_bytes += self.position_index.len() * pos_index_entry_size;
        // NOTE(review): unlike the incremental `estimated_memory` counter,
        // this total omits the fast-field writers and the store buffer/file.
        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;
        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };
        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }
    /// Adds one document and returns its newly assigned sequential `DocId`.
    ///
    /// Each field value is routed by its schema entry: text and numerics into
    /// the inverted index and/or fast fields, vector values into their
    /// respective builders. Fields absent from the schema — and fields that
    /// are neither indexed, fast, nor dense-vector-typed — are skipped for
    /// indexing, but the full document is still appended to the store.
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;
        // Reserve one length slot per indexed text field for this document.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);
        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();
        // Element ordinals for multi-valued fields restart at 0 per document.
        self.current_element_ordinal.clear();
        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };
            // Dense/binary vectors are exempt from the skip because they may
            // be kept (stored) even when not indexed.
            if !matches!(
                &entry.field_type,
                FieldType::DenseVector | FieldType::BinaryDenseVector
            ) && !entry.indexed
                && !entry.fast
            {
                continue;
            }
            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    if entry.indexed {
                        let element_ordinal = self.next_element_ordinal(field.0);
                        let token_count =
                            self.index_text_field(*field, doc_id, text, element_ordinal)?;
                        let stats = self.field_stats.entry(field.0).or_default();
                        stats.total_tokens += token_count as u64;
                        // Count the document once even if the field repeats.
                        if element_ordinal == 0 {
                            stats.doc_count += 1;
                        }
                        // NOTE(review): for multi-valued text fields later
                        // values overwrite the slot, so only the last value's
                        // token count is kept — confirm this is intended.
                        if let Some(&slot) = self.field_to_slot.get(&field.0) {
                            self.doc_field_lengths[base_idx + slot] = token_count;
                        }
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_text(doc_id, text);
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    if entry.indexed {
                        self.index_numeric_field(*field, doc_id, *v)?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_u64(doc_id, *v);
                    }
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    if entry.indexed {
                        // Term key is the raw two's-complement bit pattern.
                        self.index_numeric_field(*field, doc_id, *v as u64)?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_i64(doc_id, *v);
                    }
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    if entry.indexed {
                        // Term key is the IEEE-754 bit pattern (exact match).
                        self.index_numeric_field(*field, doc_id, v.to_bits())?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_f64(doc_id, *v);
                    }
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec))
                    if entry.indexed || entry.stored =>
                {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
                }
                (FieldType::BinaryDenseVector, FieldValue::BinaryDenseVector(bytes))
                    if entry.indexed || entry.stored =>
                {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_binary_dense_vector_field(*field, doc_id, ordinal as u16, bytes)?;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
                }
                // Type/value mismatches and unsupported combinations are
                // silently ignored.
                _ => {}
            }
        }
        self.write_document_to_store(&doc)?;
        Ok(doc_id)
    }
    /// Tokenizes one text value and merges its terms into the inverted index
    /// (and, when enabled, the position index). Returns the token count.
    ///
    /// Uses the field's registered tokenizer when present; otherwise falls
    /// back to whitespace splitting with lowercased alphanumeric characters.
    /// Term frequencies and positions are first accumulated per-value in the
    /// reusable `local_tf_buffer` / `local_positions` scratch maps, then
    /// flushed into the shared indexes in a single pass.
    ///
    /// Position encoding packs the element ordinal into the high bits
    /// (`ordinal << 20`) and the token position into the low 20 bits,
    /// depending on the field's `PositionMode`.
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;
        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();
        // Reset scratch state; position Vecs are kept (cleared, not dropped)
        // so their allocations are reused across values.
        self.local_tf_buffer.clear();
        for v in self.local_positions.values_mut() {
            v.clear();
        }
        let mut token_position = 0u32;
        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));
        if let Some(tokens) = custom_tokens {
            // Custom tokenizer path: positions come from the tokens themselves.
            for token in &tokens {
                let term_spur = if let Some(spur) = self.term_interner.get(&token.text) {
                    spur
                } else {
                    // First sighting of this term: charge interner memory.
                    let spur = self.term_interner.get_or_intern(&token.text);
                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
                    spur
                };
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token.position,
                        PositionMode::Full => (element_ordinal << 20) | token.position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }
            }
            token_position = tokens.len() as u32;
        } else {
            // Fallback tokenization: split on whitespace, keep lowercased
            // alphanumerics, drop everything else.
            for word in text.split_whitespace() {
                self.token_buffer.clear();
                for c in word.chars() {
                    if c.is_alphanumeric() {
                        // to_lowercase may expand to multiple chars (Unicode).
                        for lc in c.to_lowercase() {
                            self.token_buffer.push(lc);
                        }
                    }
                }
                if self.token_buffer.is_empty() {
                    continue;
                }
                let term_spur = if let Some(spur) = self.term_interner.get(&self.token_buffer) {
                    spur
                } else {
                    let spur = self.term_interner.get_or_intern(&self.token_buffer);
                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
                    spur
                };
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token_position,
                        PositionMode::Full => (element_ordinal << 20) | token_position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }
                token_position += 1;
            }
        }
        // Flush the per-value scratch maps into the shared indexes.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };
            match self.inverted_index.entry(term_key) {
                hashbrown::hash_map::Entry::Occupied(mut o) => {
                    o.get_mut().add(doc_id, tf);
                    self.estimated_memory += size_of::<CompactPosting>();
                    // Large posting lists are spilled to a temp file so the
                    // in-memory map stays bounded (native builds only).
                    #[cfg(feature = "native")]
                    if o.get().should_spill() {
                        use byteorder::{LittleEndian, WriteBytesExt};
                        let builder = o.get_mut();
                        let count = builder.postings.len() as u32;
                        let offset = self.posting_spill_offset;
                        // Create the spill file on first use.
                        let spill_file = if let Some(ref mut f) = self.posting_spill_file {
                            f
                        } else {
                            self.posting_spill_file = Some(BufWriter::with_capacity(
                                256 * 1024,
                                OpenOptions::new()
                                    .create(true)
                                    .write(true)
                                    .truncate(true)
                                    .open(&self.posting_spill_path)?,
                            ));
                            self.posting_spill_file.as_mut().unwrap()
                        };
                        // Each spilled record is 6 bytes: doc_id u32 + tf u16.
                        for p in &builder.postings {
                            spill_file.write_u32::<LittleEndian>(p.doc_id)?;
                            spill_file.write_u16::<LittleEndian>(p.term_freq)?;
                        }
                        self.posting_spill_offset += count as u64 * 6;
                        // Remember where this term's run lives in the file.
                        self.posting_spill_index
                            .entry(term_key)
                            .or_default()
                            .push((offset, count));
                        let freed = builder.postings.len() * size_of::<CompactPosting>();
                        builder.spilled_count += count;
                        builder.postings.clear();
                        self.estimated_memory -= freed;
                    }
                }
                hashbrown::hash_map::Entry::Vacant(v) => {
                    let mut posting = PostingListBuilder::new();
                    posting.add(doc_id, tf);
                    v.insert(posting);
                    self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
                }
            }
            if position_mode.is_some()
                && let Some(positions) = self.local_positions.get(&term_spur)
            {
                match self.position_index.entry(term_key) {
                    hashbrown::hash_map::Entry::Occupied(mut o) => {
                        for &pos in positions {
                            o.get_mut().add_position(doc_id, pos);
                        }
                        self.estimated_memory += positions.len() * size_of::<u32>();
                    }
                    hashbrown::hash_map::Entry::Vacant(v) => {
                        let mut pos_posting = PositionPostingListBuilder::new();
                        for &pos in positions {
                            pos_posting.add_position(doc_id, pos);
                        }
                        self.estimated_memory +=
                            positions.len() * size_of::<u32>() + NEW_POS_TERM_OVERHEAD;
                        v.insert(pos_posting);
                    }
                }
            }
        }
        Ok(token_position)
    }
fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
use std::fmt::Write;
self.numeric_buffer.clear();
write!(self.numeric_buffer, "__num_{}", value).unwrap();
let term_spur = if let Some(spur) = self.term_interner.get(&self.numeric_buffer) {
spur
} else {
let spur = self.term_interner.get_or_intern(&self.numeric_buffer);
self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
spur
};
let term_key = TermKey {
field: field.0,
term: term_spur,
};
match self.inverted_index.entry(term_key) {
hashbrown::hash_map::Entry::Occupied(mut o) => {
o.get_mut().add(doc_id, 1);
self.estimated_memory += size_of::<CompactPosting>();
}
hashbrown::hash_map::Entry::Vacant(v) => {
let mut posting = PostingListBuilder::new();
posting.add(doc_id, 1);
v.insert(posting);
self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
}
}
Ok(())
}
fn index_dense_vector_field(
&mut self,
field: Field,
doc_id: DocId,
ordinal: u16,
vector: &[f32],
) -> Result<()> {
let dim = vector.len();
let builder = self
.dense_vectors
.entry(field.0)
.or_insert_with(|| DenseVectorBuilder::new(dim));
if builder.dim != dim && builder.len() > 0 {
return Err(crate::Error::Schema(format!(
"Dense vector dimension mismatch: expected {}, got {}",
builder.dim, dim
)));
}
builder.add(doc_id, ordinal, vector);
self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();
Ok(())
}
fn index_binary_dense_vector_field(
&mut self,
field: Field,
doc_id: DocId,
ordinal: u16,
bytes: &[u8],
) -> Result<()> {
let dim_bits = self
.schema
.get_field_entry(field)
.and_then(|e| e.binary_dense_vector_config.as_ref())
.map(|c| c.dim)
.ok_or_else(|| {
crate::Error::Schema("BinaryDenseVector field missing config".to_string())
})?;
let expected_byte_len = dim_bits.div_ceil(8);
if bytes.len() != expected_byte_len {
return Err(crate::Error::Schema(format!(
"Binary vector byte length mismatch: expected {} (dim={}), got {}",
expected_byte_len,
dim_bits,
bytes.len()
)));
}
let builder = self
.binary_dense_vectors
.entry(field.0)
.or_insert_with(|| BinaryDenseVectorBuilder::new(dim_bits));
builder.add(doc_id, ordinal, bytes);
self.estimated_memory += bytes.len() + size_of::<(DocId, u16)>();
Ok(())
}
fn index_sparse_vector_field(
&mut self,
field: Field,
doc_id: DocId,
ordinal: u16,
entries: &[(u32, f32)],
) -> Result<()> {
let weight_threshold = self
.schema
.get_field_entry(field)
.and_then(|entry| entry.sparse_vector_config.as_ref())
.map(|config| config.weight_threshold)
.unwrap_or(0.0);
let builder = self
.sparse_vectors
.entry(field.0)
.or_insert_with(SparseVectorBuilder::new);
builder.inc_vector_count();
for &(dim_id, weight) in entries {
if weight.abs() < weight_threshold {
continue;
}
let is_new_dim = !builder.postings.contains_key(&dim_id);
builder.add(dim_id, doc_id, ordinal, weight);
self.estimated_memory += size_of::<(DocId, u16, f32)>();
if is_new_dim {
self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; }
}
Ok(())
}
    /// Serializes `doc` and appends it, prefixed with its little-endian u32
    /// length, to the temporary store: the on-disk file on native builds,
    /// the in-memory buffer otherwise.
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};
        // NOTE(review): assumes `serialize_document_into` resets the reused
        // `doc_serialize_buffer` before writing — confirm in `store`.
        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;
        #[cfg(feature = "native")]
        {
            self.store_file
                .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
            self.store_file.write_all(&self.doc_serialize_buffer)?;
        }
        #[cfg(not(feature = "native"))]
        {
            self.store_buffer
                .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
            self.store_buffer.write_all(&self.doc_serialize_buffer)?;
        }
        Ok(())
    }
    /// Finalizes the segment: writes every segment file for `segment_id` into
    /// `dir` and returns the resulting `SegmentMeta`.
    ///
    /// Positions are built first because the postings builder embeds their
    /// per-term offsets. On native builds the postings, store, vectors,
    /// sparse and fast-field sections are then built concurrently with nested
    /// `rayon::join`; on other builds they are built sequentially from the
    /// in-memory buffers. Consumes `self`; on native builds the `Drop` impl
    /// additionally cleans up any leftover temp files.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
        trained: Option<&super::TrainedVectorStructures>,
    ) -> Result<SegmentMeta> {
        // Ensure every buffered store record is on disk before it is re-read.
        #[cfg(feature = "native")]
        self.store_file.flush()?;
        let files = SegmentFiles::new(segment_id.0);
        // Build positions first: `build_postings_streaming` needs the offsets.
        let position_index = std::mem::take(&mut self.position_index);
        let position_offsets = if !position_index.is_empty() {
            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
            let offsets = postings::build_positions_streaming(
                position_index,
                &self.term_interner,
                &mut *pos_writer,
            )?;
            pos_writer.finish()?;
            offsets
        } else {
            FxHashMap::default()
        };
        // Move the large structures out of `self` so the closures below can
        // own them without borrowing `self`.
        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        #[cfg(feature = "native")]
        let store_path = self.store_path.clone();
        #[cfg(feature = "native")]
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;
        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        let binary_dense_vectors = std::mem::take(&mut self.binary_dense_vectors);
        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        let schema = &self.schema;
        // Offset-tracking writers double as byte counters for the log below.
        let mut term_dict_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);
        let mut postings_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
        let mut store_writer = super::OffsetWriter::new(dir.streaming_writer(&files.store).await?);
        // Optional sections are only created when they have data.
        let mut vectors_writer = if !dense_vectors.is_empty() || !binary_dense_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.vectors).await?,
            ))
        } else {
            None
        };
        let mut sparse_writer = if !sparse_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.sparse).await?,
            ))
        } else {
            None
        };
        let mut fast_fields = std::mem::take(&mut self.fast_fields);
        let num_docs = self.next_doc_id;
        let mut fast_writer = if !fast_fields.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.fast).await?,
            ))
        } else {
            None
        };
        #[cfg(feature = "native")]
        {
            // Flush spilled postings so the reader below sees every run.
            if let Some(ref mut f) = self.posting_spill_file {
                f.flush()?;
            }
            let posting_spill_index = std::mem::take(&mut self.posting_spill_index);
            let mut spill_reader_opt = if !posting_spill_index.is_empty() {
                let spill_file = std::fs::File::open(&self.posting_spill_path)?;
                Some((std::io::BufReader::new(spill_file), posting_spill_index))
            } else {
                None
            };
            // Fan out the independent section builders:
            // (postings | store) joined with ((vectors | sparse) | fast).
            let ((postings_result, store_result), ((vectors_result, sparse_result), fast_result)) =
                rayon::join(
                    || {
                        rayon::join(
                            || {
                                let spill_arg = spill_reader_opt.as_mut().map(|(r, idx)| {
                                    (
                                        r as &mut std::io::BufReader<std::fs::File>,
                                        idx as &postings::SpillIndex,
                                    )
                                });
                                postings::build_postings_streaming(
                                    inverted_index,
                                    term_interner,
                                    &position_offsets,
                                    &mut term_dict_writer,
                                    &mut postings_writer,
                                    spill_arg,
                                )
                            },
                            || {
                                // Re-reads the temp store file and writes the
                                // compressed store section.
                                store::build_store_streaming(
                                    &store_path,
                                    num_compression_threads,
                                    compression_level,
                                    &mut store_writer,
                                    num_docs,
                                )
                            },
                        )
                    },
                    || {
                        rayon::join(
                            || {
                                rayon::join(
                                    || -> Result<()> {
                                        if let Some(ref mut w) = vectors_writer {
                                            dense::build_vectors_streaming(
                                                dense_vectors,
                                                binary_dense_vectors,
                                                schema,
                                                trained,
                                                w,
                                            )?;
                                        }
                                        Ok(())
                                    },
                                    || -> Result<()> {
                                        if let Some(ref mut w) = sparse_writer {
                                            sparse::build_sparse_streaming(
                                                &mut sparse_vectors,
                                                schema,
                                                w,
                                            )?;
                                        }
                                        Ok(())
                                    },
                                )
                            },
                            || -> Result<()> {
                                if let Some(ref mut w) = fast_writer {
                                    build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
                                }
                                Ok(())
                            },
                        )
                    },
                );
            // Propagate the first failure from any parallel branch.
            postings_result?;
            store_result?;
            vectors_result?;
            sparse_result?;
            fast_result?;
        }
        #[cfg(not(feature = "native"))]
        {
            // Sequential fallback: same section order, no spill support.
            postings::build_postings_streaming(
                inverted_index,
                term_interner,
                &position_offsets,
                &mut term_dict_writer,
                &mut postings_writer,
            )?;
            store::build_store_streaming_from_buffer(
                &self.store_buffer,
                compression_level,
                &mut store_writer,
                num_docs,
            )?;
            if let Some(ref mut w) = vectors_writer {
                dense::build_vectors_streaming(
                    dense_vectors,
                    binary_dense_vectors,
                    schema,
                    trained,
                    w,
                )?;
            }
            if let Some(ref mut w) = sparse_writer {
                sparse::build_sparse_streaming(&mut sparse_vectors, schema, w)?;
            }
            if let Some(ref mut w) = fast_writer {
                build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
            }
        }
        // Capture byte counts before the writers are consumed by `finish`.
        let term_dict_bytes = term_dict_writer.offset() as usize;
        let postings_bytes = postings_writer.offset() as usize;
        let store_bytes = store_writer.offset() as usize;
        let vectors_bytes = vectors_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let sparse_bytes = sparse_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let fast_bytes = fast_writer.as_ref().map_or(0, |w| w.offset() as usize);
        term_dict_writer.finish()?;
        postings_writer.finish()?;
        store_writer.finish()?;
        if let Some(w) = vectors_writer {
            w.finish()?;
        }
        if let Some(w) = sparse_writer {
            w.finish()?;
        }
        if let Some(w) = fast_writer {
            w.finish()?;
        }
        // Release the large intermediate maps before logging/meta work.
        drop(position_offsets);
        drop(sparse_vectors);
        log::info!(
            "[segment_build] {} docs: term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
            num_docs,
            super::format_bytes(term_dict_bytes),
            super::format_bytes(postings_bytes),
            super::format_bytes(store_bytes),
            super::format_bytes(vectors_bytes),
            super::format_bytes(sparse_bytes),
            super::format_bytes(fast_bytes),
        );
        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };
        dir.write(&files.meta, &meta.serialize()?).await?;
        // The temp store file is no longer needed; ignore removal errors
        // (the Drop impl retries the same cleanup).
        #[cfg(feature = "native")]
        {
            let _ = std::fs::remove_file(&self.store_path);
        }
        Ok(meta)
    }
}
/// Serializes every fast-field writer into `writer` in ascending field-id
/// order, then appends the table of contents and footer.
///
/// Each writer is padded to `num_docs` rows first so all columns cover every
/// document. No-op when there are no fast fields.
fn build_fast_fields_streaming(
    fast_fields: &mut FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
    num_docs: u32,
    writer: &mut dyn Write,
) -> Result<()> {
    use crate::structures::fast_field::{FastFieldTocEntry, write_fast_field_toc_and_footer};
    if fast_fields.is_empty() {
        return Ok(());
    }
    // Deterministic output order regardless of hash-map iteration order.
    let mut ordered: Vec<u32> = fast_fields.keys().copied().collect();
    ordered.sort_unstable();
    let mut toc_entries: Vec<FastFieldTocEntry> = Vec::with_capacity(ordered.len());
    let mut current_offset = 0u64;
    for field_id in ordered {
        let ff = fast_fields
            .get_mut(&field_id)
            .expect("field id was collected from this map");
        ff.pad_to(num_docs);
        let (mut toc, bytes_written) = ff.serialize(writer, current_offset)?;
        toc.field_id = field_id;
        current_offset += bytes_written;
        toc_entries.push(toc);
    }
    // The TOC starts right after the last column's data.
    write_fast_field_toc_and_footer(writer, current_offset, &toc_entries)?;
    Ok(())
}
#[cfg(feature = "native")]
impl Drop for SegmentBuilder {
    /// Best-effort cleanup of the temporary files; errors are ignored because
    /// drop cannot propagate them (and `build` may already have removed them).
    fn drop(&mut self) {
        let _ = std::fs::remove_file(&self.store_path);
        // The spill file exists only if a posting list actually spilled.
        if self.posting_spill_file.is_some() {
            let _ = std::fs::remove_file(&self.posting_spill_path);
        }
    }
}