use std::collections::HashMap;
use crate::analysis::Token;
use crate::core::{DocId, FieldId, SegmentId};
use crate::mapping::{FieldMapping, Mapping};
use crate::columnar::writer::{ColumnValue, ColumnarWriter};
use crate::inverted::norms::FieldNormsWriter;
use crate::inverted::postings::{BlockMaxPostingListWriter, PositionPostingListWriter};
use crate::inverted::term_dict::TermDictBuilder;
use crate::segment::format::{ComponentOffset, ComponentType, FieldMeta, SegmentHeader};
use crate::spatial::geo::{GeoPoint, GeoPointStore};
use crate::store::doc_store::DocStoreWriter;
/// One document's occurrences of a single term within a single field.
struct TermPosting {
/// Document in which the term occurs.
doc_id: DocId,
/// Term frequency; `add_document` sets it to the number of recorded positions.
tf: u32,
/// Token positions of each occurrence, in the order the tokens were supplied.
positions: Vec<u32>,
}
/// In-memory accumulator that collects documents and serializes them into a
/// single segment byte buffer via `build`.
pub struct SegmentBuilder {
/// Identifier written into the segment header.
segment_id: SegmentId,
/// Clone of the mapping passed to `new`.
schema: Mapping,
/// Number of documents added so far; also used as the next `DocId`.
doc_count: u32,
/// Field -> term -> postings, filled by `add_document`.
postings: HashMap<FieldId, HashMap<String, Vec<TermPosting>>>,
/// Field -> whether positions are written for it (`true` only for `Text` fields).
fields_with_positions: HashMap<FieldId, bool>,
/// Field -> token counts pushed per document; fed to `FieldNormsWriter` in `build`.
field_lengths: HashMap<FieldId, Vec<u32>>,
/// Receives the raw source bytes of every added document.
doc_store: DocStoreWriter,
/// Receives values passed to `add_column_value`.
columnar: ColumnarWriter,
/// One store per `GeoPoint` field declared in the schema.
geo_stores: HashMap<FieldId, GeoPointStore>,
/// One store per `GeoShape` field declared in the schema.
geo_shape_stores: HashMap<FieldId, crate::spatial::shape::GeoShapeStore>,
/// Per-document flags; `mark_parent` sets the entry of the latest document to `true`.
parent_bitset: Vec<bool>,
/// Whether the schema declares at least one `Nested` field.
has_nested: bool,
}
impl SegmentBuilder {
pub fn new(segment_id: SegmentId, schema: &Mapping) -> Self {
let mut fields_with_positions = HashMap::new();
for mapping in schema.fields() {
if let Some(fid) = schema.field_id(&mapping.name) {
let has_pos = mapping.field_type == crate::mapping::FieldType::Text;
fields_with_positions.insert(fid, has_pos);
}
}
Self {
segment_id,
schema: schema.clone(),
doc_count: 0,
postings: HashMap::new(),
fields_with_positions,
field_lengths: HashMap::new(),
doc_store: DocStoreWriter::new(),
columnar: ColumnarWriter::new(),
parent_bitset: Vec::new(),
has_nested: schema
.fields()
.iter()
.any(|f| matches!(f.field_type, crate::mapping::FieldType::Nested)),
geo_stores: {
let mut gs = HashMap::new();
for mapping in schema.fields() {
if matches!(mapping.field_type, crate::mapping::FieldType::GeoPoint) {
let fid = schema.field_id(&mapping.name).unwrap();
gs.insert(fid, GeoPointStore::new());
}
}
gs
},
geo_shape_stores: {
let mut gs = HashMap::new();
for mapping in schema.fields() {
if matches!(mapping.field_type, crate::mapping::FieldType::GeoShape) {
let fid = schema.field_id(&mapping.name).unwrap();
gs.insert(fid, crate::spatial::shape::GeoShapeStore::new());
}
}
gs
},
}
}
/// Adds one document to the segment and assigns it the next `DocId`.
///
/// * `analyzed_fields` - the tokens produced for each indexed field.
///   NOTE(review): each field id is assumed to appear at most once per call;
///   a repeated id would add a second posting and a second length entry for
///   the same document - confirm against the caller.
/// * `source` - raw document bytes, appended to the document store.
///
/// For every listed field the tokens are grouped by term, one `TermPosting`
/// is appended per distinct term, and the token count is recorded as the
/// field length of this document.
///
/// # Panics
/// Panics if a norms-enabled schema field has no id in the schema.
pub fn add_document(&mut self, analyzed_fields: &[(FieldId, Vec<Token>)], source: &[u8]) {
    let doc_id = DocId::new(self.doc_count);
    let doc_index = self.doc_count as usize;

    for (field_id, tokens) in analyzed_fields {
        // Group positions by term, borrowing the term text from the tokens.
        let mut term_positions: HashMap<&str, Vec<u32>> = HashMap::new();
        for token in tokens {
            term_positions
                .entry(token.text.as_str())
                .or_default()
                .push(token.position);
        }

        let field_postings = self.postings.entry(*field_id).or_default();
        // Consume the map so each position vector is moved rather than cloned.
        for (term, positions) in term_positions {
            let tf = positions.len() as u32;
            field_postings
                .entry(term.to_owned())
                .or_default()
                .push(TermPosting {
                    doc_id,
                    tf,
                    positions,
                });
        }

        let lengths = self.field_lengths.entry(*field_id).or_default();
        // Keep index == doc id: a field missing from earlier documents gets
        // zero-length entries for them before this document's length is pushed.
        if lengths.len() < doc_index {
            lengths.resize(doc_index, 0);
        }
        lengths.push(tokens.len() as u32);
    }

    // Norms-enabled fields get an entry for every document, even when the
    // document did not contain the field.
    for mapping in self.schema.fields() {
        if !mapping.norms {
            continue;
        }
        let field_id = self
            .schema
            .field_id(&mapping.name)
            .expect("every schema field must resolve to a field id");
        let lengths = self.field_lengths.entry(field_id).or_default();
        if lengths.len() <= doc_index {
            lengths.resize(doc_index + 1, 0);
        }
    }

    self.doc_store.add(source);
    self.doc_count += 1;
}
/// Flags the most recently added document as a parent document.
///
/// The bitset is first padded with `false` up to the current document count,
/// then the last entry is set to `true`. Calling this before any document has
/// been added is a no-op (previously it panicked on `doc_count - 1`).
pub fn mark_parent(&mut self) {
    let doc_count = self.doc_count as usize;
    if doc_count == 0 {
        // No document to mark yet.
        return;
    }
    if self.parent_bitset.len() < doc_count {
        self.parent_bitset.resize(doc_count, false);
    }
    self.parent_bitset[doc_count - 1] = true;
}
/// Pads the parent bitset with `false` entries up to the current document
/// count, leaving every document added since the last mark as non-parent.
pub fn mark_nested(&mut self) {
    let wanted = self.doc_count as usize;
    // Guarded so the bitset is only ever grown, never truncated.
    if self.parent_bitset.len() < wanted {
        self.parent_bitset.resize(wanted, false);
    }
}
/// Appends `point` to the geo-point store of `field_id`.
///
/// Silently does nothing when no store exists for `field_id`; stores are
/// only created in `new` for `GeoPoint` fields.
pub fn add_geo_point(&mut self, field_id: FieldId, point: GeoPoint) {
    let target = match self.geo_stores.get_mut(&field_id) {
        Some(existing) => existing,
        None => return,
    };
    target.add(point);
}
/// Appends `geom` to the geo-shape store of `field_id`.
///
/// Silently does nothing when no store exists for `field_id`; stores are
/// only created in `new` for `GeoShape` fields.
pub fn add_geo_shape(&mut self, field_id: FieldId, geom: &::geo::Geometry<f64>) {
    let target = match self.geo_shape_stores.get_mut(&field_id) {
        Some(existing) => existing,
        None => return,
    };
    target.add(geom);
}
/// Forwards `value` for `field_id` to the columnar writer.
pub fn add_column_value(&mut self, field_id: FieldId, value: ColumnValue) {
self.columnar.add(field_id, value);
}
/// Returns the number of documents added so far.
pub fn doc_count(&self) -> u32 {
self.doc_count
}
/// Returns the id this builder was created with.
pub fn segment_id(&self) -> SegmentId {
self.segment_id
}
/// Returns `true` while no document has been added.
pub fn is_empty(&self) -> bool {
self.doc_count == 0
}
pub fn build(self) -> Vec<u8> {
let field_metas = self.build_field_metas();
let has_columnar = !self.columnar.is_empty();
let has_spatial = (!self.geo_stores.is_empty()
&& self.geo_stores.values().any(|s| !s.is_empty()))
|| (!self.geo_shape_stores.is_empty()
&& self.geo_shape_stores.values().any(|s| !s.is_empty()));
let inverted_data = self.build_inverted_index();
let columnar_data = if has_columnar {
self.columnar.finish()
} else {
Vec::new()
};
let spatial_data = if has_spatial {
let mut buf = Vec::new();
let point_stores: Vec<_> = self
.geo_stores
.iter()
.filter(|(_, s)| !s.is_empty())
.collect();
let shape_stores: Vec<_> = self
.geo_shape_stores
.iter()
.filter(|(_, s)| !s.is_empty())
.collect();
let total = point_stores.len() + shape_stores.len();
buf.extend_from_slice(&(total as u16).to_le_bytes());
for (fid, store) in &point_stores {
buf.extend_from_slice(&fid.as_u16().to_le_bytes());
buf.push(0u8); let data = store.to_bytes();
buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
buf.extend_from_slice(&data);
}
for (fid, store) in &shape_stores {
buf.extend_from_slice(&fid.as_u16().to_le_bytes());
buf.push(1u8); let data = store.to_bytes();
buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
buf.extend_from_slice(&data);
}
buf
} else {
Vec::new()
};
let doc_store_data = self.doc_store.finish();
let num_components = (!inverted_data.is_empty() as usize)
+ (!columnar_data.is_empty() as usize)
+ (!spatial_data.is_empty() as usize)
+ 1;
let fields_size: usize = field_metas.iter().map(|f| f.to_bytes().len()).sum();
let parent_bitset_size = if self.has_nested && !self.parent_bitset.is_empty() {
1 + 4 + (self.parent_bitset.len() + 7) / 8 } else {
1 };
let header_size = 28
+ 1
+ num_components * ComponentOffset::SERIALIZED_SIZE
+ 2
+ fields_size
+ parent_bitset_size;
let mut offset = header_size as u64;
let mut components = Vec::new();
if !inverted_data.is_empty() {
let checksum = xxhash_rust::xxh3::xxh3_64(&inverted_data);
components.push(ComponentOffset {
component_type: ComponentType::InvertedIndex,
offset,
length: inverted_data.len() as u64,
checksum,
});
offset += inverted_data.len() as u64;
}
if !columnar_data.is_empty() {
let checksum = xxhash_rust::xxh3::xxh3_64(&columnar_data);
components.push(ComponentOffset {
component_type: ComponentType::Columnar,
offset,
length: columnar_data.len() as u64,
checksum,
});
offset += columnar_data.len() as u64;
}
if !spatial_data.is_empty() {
let checksum = xxhash_rust::xxh3::xxh3_64(&spatial_data);
components.push(ComponentOffset {
component_type: ComponentType::Spatial,
offset,
length: spatial_data.len() as u64,
checksum,
});
offset += spatial_data.len() as u64;
}
let doc_store_checksum = xxhash_rust::xxh3::xxh3_64(&doc_store_data);
components.push(ComponentOffset {
component_type: ComponentType::DocStore,
offset,
length: doc_store_data.len() as u64,
checksum: doc_store_checksum,
});
let pb = if self.has_nested && !self.parent_bitset.is_empty() {
let mut bs = self.parent_bitset.clone();
while bs.len() < self.doc_count as usize {
bs.push(false);
}
Some(bs)
} else {
None
};
let header = SegmentHeader {
segment_id: self.segment_id,
doc_count: self.doc_count,
max_doc: self.doc_count,
components,
fields: field_metas,
parent_bitset: pb,
};
let header_bytes = header.to_bytes();
debug_assert_eq!(
header_bytes.len(),
header_size,
"header size measurement mismatch"
);
let total_size = header_size
+ inverted_data.len()
+ columnar_data.len()
+ spatial_data.len()
+ doc_store_data.len();
let mut result = Vec::with_capacity(total_size);
result.extend_from_slice(&header_bytes);
result.extend_from_slice(&inverted_data);
result.extend_from_slice(&columnar_data);
result.extend_from_slice(&spatial_data);
result.extend_from_slice(&doc_store_data);
result
}
fn build_inverted_index(&self) -> Vec<u8> {
if self.postings.is_empty() {
return Vec::new();
}
let mut buf = Vec::new();
let mut indexed_field_ids: Vec<FieldId> = self.postings.keys().copied().collect();
indexed_field_ids.sort();
buf.extend_from_slice(&(indexed_field_ids.len() as u16).to_le_bytes());
for &field_id in &indexed_field_ids {
buf.extend_from_slice(&field_id.as_u16().to_le_bytes());
let field_postings = &self.postings[&field_id];
let mut terms: Vec<&String> = field_postings.keys().collect();
terms.sort();
let mut postings_data = Vec::new();
let mut term_dict_builder = TermDictBuilder::new();
let store_positions = self
.fields_with_positions
.get(&field_id)
.copied()
.unwrap_or(false);
for term in &terms {
let offset = postings_data.len() as u64;
let docs = &field_postings[*term];
if store_positions {
let mut plw = PositionPostingListWriter::new();
for posting in docs {
plw.add(posting.doc_id, &posting.positions);
}
postings_data.extend_from_slice(&plw.finish());
} else {
let mut plw = BlockMaxPostingListWriter::new();
for posting in docs {
plw.add(posting.doc_id, posting.tf);
}
postings_data.extend_from_slice(&plw.finish());
}
term_dict_builder.add(term, offset);
}
let term_dict_bytes = term_dict_builder.finish();
buf.extend_from_slice(&(term_dict_bytes.len() as u32).to_le_bytes());
buf.extend_from_slice(&term_dict_bytes);
buf.extend_from_slice(&(postings_data.len() as u32).to_le_bytes());
buf.extend_from_slice(&postings_data);
let has_norms = self.field_lengths.contains_key(&field_id);
buf.push(has_norms as u8);
if has_norms {
let lengths = &self.field_lengths[&field_id];
let mut norms_writer = FieldNormsWriter::new(field_id);
for &len in lengths {
norms_writer.add(len);
}
let norms_bytes = norms_writer.finish();
buf.extend_from_slice(&(norms_bytes.len() as u32).to_le_bytes());
buf.extend_from_slice(&norms_bytes);
}
}
buf
}
/// Builds one `FieldMeta` per schema field, in schema order.
///
/// The field id given to each meta is the field's position in
/// `schema.fields()`. NOTE(review): this presumably matches what
/// `Mapping::field_id` returns for the same field - confirm.
fn build_field_metas(&self) -> Vec<FieldMeta> {
    let mappings = self.schema.fields();
    let mut metas = Vec::with_capacity(mappings.len());
    for (index, mapping) in mappings.iter().enumerate() {
        let field_id = FieldId::new(index as u16);
        metas.push(field_meta_from_mapping(field_id, mapping));
    }
    metas
}
}
/// Converts a schema `FieldMapping` into the `FieldMeta` stored in the
/// segment header, copying the name, type and the four boolean flags.
fn field_meta_from_mapping(field_id: FieldId, mapping: &FieldMapping) -> FieldMeta {
    let name = mapping.name.clone();
    let field_type = mapping.field_type.clone();
    FieldMeta::new(
        field_id,
        name,
        field_type,
        mapping.stored,
        mapping.indexed,
        mapping.doc_values,
        mapping.norms,
    )
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mapping::FieldType;
    use crate::store::doc_store::DocStoreReader;

    /// Schema with a single `Text` field named `title`.
    fn simple_schema() -> Mapping {
        Mapping::builder().field("title", FieldType::Text).build()
    }

    /// Schema with a `Text` field `title` and a `Keyword` field `status`.
    fn two_field_schema() -> Mapping {
        Mapping::builder()
            .field("title", FieldType::Text)
            .field("status", FieldType::Keyword)
            .build()
    }

    /// Builds one token per term, using the term's index as its position.
    fn make_tokens(terms: &[&str]) -> Vec<Token> {
        terms
            .iter()
            .enumerate()
            .map(|(i, t)| Token::new(*t, 0, t.len(), i as u32))
            .collect()
    }

    /// Returns the bytes of `data` covered by a component that starts at
    /// `offset` and spans `length` bytes.
    fn component_bytes(data: &[u8], offset: u64, length: u64) -> &[u8] {
        &data[offset as usize..(offset + length) as usize]
    }

    #[test]
    fn single_doc_single_field() {
        let schema = simple_schema();
        let mut builder = SegmentBuilder::new(SegmentId::new(1), &schema);
        let tokens = make_tokens(&["hello", "world"]);
        builder.add_document(&[(FieldId::new(0), tokens)], br#"{"title":"hello world"}"#);
        assert_eq!(builder.doc_count(), 1);
        let data = builder.build();
        assert!(!data.is_empty());
        let (header, _) = SegmentHeader::from_bytes(&data).unwrap();
        assert_eq!(header.segment_id, SegmentId::new(1));
        assert_eq!(header.doc_count, 1);
        assert_eq!(header.max_doc, 1);
    }

    #[test]
    fn multiple_docs() {
        let schema = simple_schema();
        let mut builder = SegmentBuilder::new(SegmentId::new(2), &schema);
        for i in 0..10 {
            let tokens = make_tokens(&["term"]);
            let source = format!(r#"{{"id":{i}}}"#);
            builder.add_document(&[(FieldId::new(0), tokens)], source.as_bytes());
        }
        assert_eq!(builder.doc_count(), 10);
        let data = builder.build();
        let (header, _) = SegmentHeader::from_bytes(&data).unwrap();
        assert_eq!(header.doc_count, 10);
    }

    #[test]
    fn multiple_fields() {
        let schema = two_field_schema();
        let mut builder = SegmentBuilder::new(SegmentId::new(3), &schema);
        let title_tokens = make_tokens(&["hello", "world"]);
        let status_tokens = make_tokens(&["active"]);
        builder.add_document(
            &[
                (FieldId::new(0), title_tokens),
                (FieldId::new(1), status_tokens),
            ],
            br#"{"title":"hello world","status":"active"}"#,
        );
        let data = builder.build();
        let (header, _) = SegmentHeader::from_bytes(&data).unwrap();
        assert_eq!(header.fields.len(), 2);
        assert_eq!(header.fields[0].field_name, "title");
        assert_eq!(header.fields[1].field_name, "status");
    }

    #[test]
    fn component_offsets_valid() {
        let schema = simple_schema();
        let mut builder = SegmentBuilder::new(SegmentId::new(4), &schema);
        builder.add_document(&[(FieldId::new(0), make_tokens(&["test"]))], b"{}");
        let data = builder.build();
        let (header, header_size) = SegmentHeader::from_bytes(&data).unwrap();
        // The inverted index directly follows the header ...
        let inv = header.component(ComponentType::InvertedIndex).unwrap();
        assert_eq!(inv.offset as usize, header_size);
        assert!(inv.length > 0);
        // ... and the doc store directly follows it and ends the buffer.
        let ds = header.component(ComponentType::DocStore).unwrap();
        assert_eq!(ds.offset, inv.offset + inv.length);
        assert!(ds.length > 0);
        assert_eq!((ds.offset + ds.length) as usize, data.len());
    }

    #[test]
    fn term_dict_in_segment() {
        let schema = simple_schema();
        let mut builder = SegmentBuilder::new(SegmentId::new(5), &schema);
        builder.add_document(
            &[(FieldId::new(0), make_tokens(&["alpha", "beta", "gamma"]))],
            b"{}",
        );
        let data = builder.build();
        let (header, _) = SegmentHeader::from_bytes(&data).unwrap();
        let inv = header.component(ComponentType::InvertedIndex).unwrap();
        let inv_data = component_bytes(&data, inv.offset, inv.length);
        let checksum = xxhash_rust::xxh3::xxh3_64(inv_data);
        assert_eq!(checksum, inv.checksum);
    }

    #[test]
    fn doc_store_in_segment() {
        let schema = simple_schema();
        let mut builder = SegmentBuilder::new(SegmentId::new(6), &schema);
        let source = br#"{"title":"test doc"}"#;
        builder.add_document(&[(FieldId::new(0), make_tokens(&["test", "doc"]))], source);
        let data = builder.build();
        let (header, _) = SegmentHeader::from_bytes(&data).unwrap();
        let ds = header.component(ComponentType::DocStore).unwrap();
        let ds_data = component_bytes(&data, ds.offset, ds.length);
        let reader = DocStoreReader::open(ds_data);
        assert_eq!(reader.doc_count(), 1);
        assert_eq!(reader.get(0).unwrap(), source);
    }

    #[test]
    fn empty_builder() {
        let schema = simple_schema();
        let builder = SegmentBuilder::new(SegmentId::new(7), &schema);
        assert!(builder.is_empty());
        assert_eq!(builder.doc_count(), 0);
        let data = builder.build();
        let (header, _) = SegmentHeader::from_bytes(&data).unwrap();
        assert_eq!(header.doc_count, 0);
        assert_eq!(header.max_doc, 0);
        // Without postings no inverted-index component is emitted, while the
        // doc store component is always written.
        assert!(!header
            .components
            .iter()
            .any(|c| matches!(c.component_type, ComponentType::InvertedIndex)));
        assert!(header
            .components
            .iter()
            .any(|c| matches!(c.component_type, ComponentType::DocStore)));
    }

    #[test]
    fn norms_present_for_text_fields() {
        // Smoke test only: the norms bytes are not decoded here.
        let schema = simple_schema();
        let mut builder = SegmentBuilder::new(SegmentId::new(8), &schema);
        builder.add_document(&[(FieldId::new(0), make_tokens(&["a", "b", "c"]))], b"{}");
        builder.add_document(&[(FieldId::new(0), make_tokens(&["x"]))], b"{}");
        let data = builder.build();
        let (header, _) = SegmentHeader::from_bytes(&data).unwrap();
        assert_eq!(header.doc_count, 2);
    }

    #[test]
    fn posting_lists_have_correct_doc_ids() {
        let schema = simple_schema();
        let mut builder = SegmentBuilder::new(SegmentId::new(9), &schema);
        builder.add_document(
            &[(FieldId::new(0), make_tokens(&["hello", "world"]))],
            b"{}",
        );
        builder.add_document(&[(FieldId::new(0), make_tokens(&["hello"]))], b"{}");
        let data = builder.build();
        assert!(!data.is_empty());
        // The posting lists themselves are not decoded here; at least confirm
        // the segment carries both documents and a non-empty inverted index.
        let (header, _) = SegmentHeader::from_bytes(&data).unwrap();
        assert_eq!(header.doc_count, 2);
        let inv = header.component(ComponentType::InvertedIndex).unwrap();
        assert!(inv.length > 0);
    }

    #[test]
    fn multiple_docs_doc_store_all_retrievable() {
        let schema = simple_schema();
        let mut builder = SegmentBuilder::new(SegmentId::new(10), &schema);
        let sources: Vec<String> = (0..50).map(|i| format!(r#"{{"id":{i}}}"#)).collect();
        for source in &sources {
            builder.add_document(
                &[(FieldId::new(0), make_tokens(&["token"]))],
                source.as_bytes(),
            );
        }
        let data = builder.build();
        let (header, _) = SegmentHeader::from_bytes(&data).unwrap();
        let ds = header.component(ComponentType::DocStore).unwrap();
        let ds_data = component_bytes(&data, ds.offset, ds.length);
        let reader = DocStoreReader::open(ds_data);
        assert_eq!(reader.doc_count(), 50);
        for (i, expected) in sources.iter().enumerate() {
            assert_eq!(
                String::from_utf8(reader.get(i as u32).unwrap()).unwrap(),
                *expected
            );
        }
    }

    #[test]
    fn checksum_validates() {
        let schema = simple_schema();
        let mut builder = SegmentBuilder::new(SegmentId::new(11), &schema);
        builder.add_document(&[(FieldId::new(0), make_tokens(&["test"]))], b"{}");
        let data = builder.build();
        let (header, _) = SegmentHeader::from_bytes(&data).unwrap();
        for comp in &header.components {
            let comp_data = component_bytes(&data, comp.offset, comp.length);
            let checksum = xxhash_rust::xxh3::xxh3_64(comp_data);
            assert_eq!(
                checksum, comp.checksum,
                "component {:?} checksum mismatch",
                comp.component_type
            );
        }
    }
}