use std::collections::HashMap;
use std::sync::RwLock;
use crate::core::{FieldId, Result, SegmentId};
use crate::inverted::norms::FieldNormsReader;
use crate::inverted::postings::{
BlockMaxPostingListReader, PositionPostingListReader, PostingListReader, has_block_max,
has_positions,
};
use crate::inverted::term_dict::TermDict;
use crate::segment::format::{ComponentType, SegmentHeader};
use crate::spatial::geo::GeoPointStore;
use crate::store::doc_store::DocStoreReader;
/// Borrowed view of a single field's section of the inverted-index component.
///
/// Produced by `SegmentReader::field_index`; every slice borrows from the
/// reader's segment bytes.
struct FieldIndex<'a> {
/// Term dictionary opened over the field's dictionary bytes.
term_dict: TermDict<'a>,
/// Raw postings bytes for the field; values looked up in `term_dict` are used
/// as byte offsets into this slice.
postings_data: &'a [u8],
/// Raw norms bytes, present only when the field's has-norms flag byte is non-zero.
norms_data: Option<&'a [u8]>,
}
/// Read-only accessor over one serialized segment held entirely in memory.
pub struct SegmentReader {
/// The complete serialized segment; component offsets from the header index into it.
data: Vec<u8>,
/// Header decoded from `data` by `SegmentHeader::from_bytes`.
header: SegmentHeader,
/// Second value returned by `SegmentHeader::from_bytes`
/// (presumably the encoded header length in bytes — TODO confirm).
// Stored but not read by any method visible in this file, hence the lint allowance.
#[allow(dead_code)]
header_size: usize,
/// Geo-point stores already decoded by `geo_points`, keyed by field.
geo_cache: RwLock<HashMap<FieldId, GeoPointStore>>,
/// Geo-shape stores already decoded by `geo_shapes`, keyed by field.
geo_shape_cache: RwLock<HashMap<FieldId, crate::spatial::shape::GeoShapeStore>>,
}
impl SegmentReader {
/// Wraps a serialized segment, taking ownership of its bytes.
///
/// The header is decoded eagerly; both geo caches start out empty.
///
/// # Errors
/// Propagates whatever error `SegmentHeader::from_bytes` reports for `data`.
pub fn open(data: Vec<u8>) -> Result<Self> {
    let parsed = SegmentHeader::from_bytes(&data)?;
    let reader = Self {
        header: parsed.0,
        header_size: parsed.1,
        data,
        geo_cache: RwLock::new(HashMap::new()),
        geo_shape_cache: RwLock::new(HashMap::new()),
    };
    Ok(reader)
}
/// Returns the segment identifier recorded in the header.
pub fn segment_id(&self) -> SegmentId {
    let header = &self.header;
    header.segment_id
}
/// Returns the header's `doc_count` value.
pub fn doc_count(&self) -> u32 {
    let header = &self.header;
    header.doc_count
}
/// Returns the header's `max_doc` value.
pub fn max_doc(&self) -> u32 {
    let header = &self.header;
    header.max_doc
}
pub fn header(&self) -> &SegmentHeader {
&self.header
}
/// Opens a document-store reader over the segment's DocStore component.
///
/// # Panics
/// Panics if the header lists no DocStore component, or if the component's
/// byte range falls outside the segment data.
pub fn doc_store(&self) -> DocStoreReader<'_> {
    let component = self
        .header
        .component(ComponentType::DocStore)
        .expect("segment must have a DocStore component");
    let begin = component.offset as usize;
    let bytes = &self.data[begin..begin + component.length as usize];
    DocStoreReader::open(bytes)
}
/// Looks up the posting list of `term` within `field_id`.
///
/// Returns `None` when `field_index` finds no section for the field, when the
/// term is missing from the field's dictionary, or when the dictionary's offset
/// lies beyond the end of the field's postings bytes.
pub fn postings(&self, field_id: FieldId, term: &str) -> Option<PostingListReader<'_>> {
    let field_index = self.field_index(field_id)?;
    let posting_offset = field_index.term_dict.get(term)?;
    // Checked slicing: an out-of-range offset yields `None` instead of a panic.
    let postings_data = field_index.postings_data.get(posting_offset as usize..)?;
    Some(PostingListReader::new(postings_data))
}
/// Looks up the posting list of `term` within `field_id` as a block-max reader.
///
/// Returns `None` when the field or term is not found, when the dictionary's
/// offset lies beyond the field's postings bytes, or when `has_block_max`
/// reports `false` for the list's bytes.
pub fn postings_block_max(
    &self,
    field_id: FieldId,
    term: &str,
) -> Option<BlockMaxPostingListReader<'_>> {
    let field_index = self.field_index(field_id)?;
    let posting_offset = field_index.term_dict.get(term)?;
    // Checked slicing: an out-of-range offset yields `None` instead of a panic.
    let postings_data = field_index.postings_data.get(posting_offset as usize..)?;
    has_block_max(postings_data).then(|| BlockMaxPostingListReader::new(postings_data))
}
/// Looks up the posting list of `term` within `field_id` as a position-aware reader.
///
/// Returns `None` when the field or term is not found, when the dictionary's
/// offset lies beyond the field's postings bytes, or when `has_positions`
/// reports `false` for the list's bytes.
pub fn postings_with_positions(
    &self,
    field_id: FieldId,
    term: &str,
) -> Option<PositionPostingListReader<'_>> {
    let field_index = self.field_index(field_id)?;
    let posting_offset = field_index.term_dict.get(term)?;
    // Checked slicing: an out-of-range offset yields `None` instead of a panic.
    let postings_data = field_index.postings_data.get(posting_offset as usize..)?;
    has_positions(postings_data).then(|| PositionPostingListReader::new(postings_data))
}
/// Lists every dictionary term of `field_id` that `prefix_iter(prefix)` yields,
/// paired with the length reported by that term's posting list.
///
/// Returns an empty vector when the field has no index section.
pub fn terms_with_prefix(&self, field_id: FieldId, prefix: &str) -> Vec<(String, u32)> {
    let mut matches = Vec::new();
    if let Some(field_index) = self.field_index(field_id) {
        for (term, offset) in field_index.term_dict.prefix_iter(prefix) {
            // The dictionary value is a byte offset into the field's postings.
            let reader = PostingListReader::new(&field_index.postings_data[offset as usize..]);
            matches.push((term, reader.len()));
        }
    }
    matches
}
/// Runs `aut` against the term dictionary of `field_id` and returns each
/// matching term paired with the length reported by its posting list.
///
/// Returns an empty vector when the field has no index section.
pub fn automaton_search<A: fst::Automaton>(
    &self,
    field_id: FieldId,
    aut: A,
) -> Vec<(String, u32)> {
    let mut matches = Vec::new();
    if let Some(field_index) = self.field_index(field_id) {
        for (term, offset) in field_index.term_dict.automaton_search(aut) {
            // The dictionary value is a byte offset into the field's postings.
            let reader = PostingListReader::new(&field_index.postings_data[offset as usize..]);
            matches.push((term, reader.len()));
        }
    }
    matches
}
/// Returns the length of the posting list for `term` in `field_id`,
/// or `0` when `postings` finds no list.
pub fn doc_freq(&self, field_id: FieldId, term: &str) -> u32 {
    self.postings(field_id, term)
        .map_or(0, |reader| reader.len())
}
/// Borrows the header's parent bitset as a slice, if the header carries one.
pub fn parent_bitset(&self) -> Option<&[bool]> {
    self.header.parent_bitset.as_ref().map(|bits| &bits[..])
}
/// Returns the geo-point store for `field_id`, decoding it on first use and
/// caching the result for later calls.
///
/// Returns `None` when the segment has no Spatial component, when the component
/// holds no point entry (sub-type tag `0`) for the field, or when the component's
/// directory is truncated.
///
/// # Panics
/// Panics if the cache lock has been poisoned.
pub fn geo_points(&self, field_id: FieldId) -> Option<GeoPointStore> {
    // Sub-type tag that marks a geo-point entry in the Spatial directory.
    const POINT_SUB_TYPE: u8 = 0;

    // The read guard is released at the end of this statement, before any write.
    let cached = self.geo_cache.read().unwrap().get(&field_id).cloned();
    if cached.is_some() {
        return cached;
    }
    let payload = self.spatial_payload(field_id, POINT_SUB_TYPE)?;
    let store = GeoPointStore::from_bytes(payload);
    self.geo_cache
        .write()
        .unwrap()
        .insert(field_id, store.clone());
    Some(store)
}
/// Returns the geo-shape store for `field_id`, decoding it on first use and
/// caching the result for later calls.
///
/// Returns `None` when the segment has no Spatial component, when the component
/// holds no shape entry (sub-type tag `1`) for the field, or when the component's
/// directory is truncated.
///
/// # Panics
/// Panics if the cache lock has been poisoned.
pub fn geo_shapes(&self, field_id: FieldId) -> Option<crate::spatial::shape::GeoShapeStore> {
    // Sub-type tag that marks a geo-shape entry in the Spatial directory.
    const SHAPE_SUB_TYPE: u8 = 1;

    // The read guard is released at the end of this statement, before any write.
    let cached = self.geo_shape_cache.read().unwrap().get(&field_id).cloned();
    if cached.is_some() {
        return cached;
    }
    let payload = self.spatial_payload(field_id, SHAPE_SUB_TYPE)?;
    let store = crate::spatial::shape::GeoShapeStore::from_bytes(payload);
    self.geo_shape_cache
        .write()
        .unwrap()
        .insert(field_id, store.clone());
    Some(store)
}
/// Finds the payload bytes of the first Spatial-directory entry whose field id
/// and sub-type tag match the arguments.
///
/// The component starts with a little-endian `u16` entry count. Each entry is a
/// `u16` field id, a `u8` sub-type tag and a `u32` payload length (both integers
/// little-endian), followed by the payload itself.
///
/// Every read is bounds-checked: a missing component, a truncated directory or
/// a payload running past the component all produce `None`.
fn spatial_payload(&self, field_id: FieldId, sub_type: u8) -> Option<&[u8]> {
    // Bytes in an entry header: field id (2) + sub-type tag (1) + payload length (4).
    const ENTRY_HEADER_LEN: usize = 7;

    let comp = self.header.component(ComponentType::Spatial)?;
    let start = comp.offset as usize;
    let end = start.checked_add(comp.length as usize)?;
    let spatial_data = self.data.get(start..end)?;

    let count_bytes = spatial_data.get(..2)?;
    let num_fields = u16::from_le_bytes([count_bytes[0], count_bytes[1]]);
    let mut pos = 2usize;
    for _ in 0..num_fields {
        let header_end = pos.checked_add(ENTRY_HEADER_LEN)?;
        let entry = spatial_data.get(pos..header_end)?;
        let fid = FieldId::new(u16::from_le_bytes([entry[0], entry[1]]));
        let tag = entry[2];
        let data_len = u32::from_le_bytes([entry[3], entry[4], entry[5], entry[6]]) as usize;
        let payload_end = header_end.checked_add(data_len)?;
        if fid == field_id && tag == sub_type {
            return spatial_data.get(header_end..payload_end);
        }
        // Not the entry we want: skip its payload and continue with the next one.
        pos = payload_end;
    }
    None
}
/// Opens the Columnar component and returns its column for `field_id`.
///
/// Returns `None` when the segment has no Columnar component or when the
/// columnar reader has no column for the field.
///
/// # Panics
/// Panics if the component's byte range falls outside the segment data.
pub fn column(&self, field_id: FieldId) -> Option<crate::columnar::reader::ColumnReader<'_>> {
    let component = self.header.component(ComponentType::Columnar)?;
    let begin = component.offset as usize;
    let bytes = &self.data[begin..begin + component.length as usize];
    let reader = crate::columnar::reader::ColumnarReader::open(bytes);
    reader.column(field_id)
}
/// Opens a norms reader for `field_id`.
///
/// Returns `None` when the field has no index section or that section carries
/// no norms bytes.
pub fn norms(&self, field_id: FieldId) -> Option<FieldNormsReader<'_>> {
    self.field_index(field_id)
        .and_then(|index| index.norms_data)
        .map(FieldNormsReader::open)
}
/// Computes the mean of the field's norm values across the documents the
/// norms reader reports.
///
/// Returns `0.0` when the field has no norms or the norms reader reports zero
/// documents. The sum is accumulated in `f64` and narrowed to `f32` at the end.
pub fn avg_field_length(&self, field_id: FieldId) -> f32 {
    let Some(norms_reader) = self.norms(field_id) else {
        return 0.0;
    };
    let docs = norms_reader.doc_count();
    if docs == 0 {
        return 0.0;
    }
    let total = (0..docs).fold(0.0f64, |acc, i| {
        acc + norms_reader.norm(crate::core::DocId::new(i)) as f64
    });
    (total / docs as f64) as f32
}
/// Locates the inverted-index section of `field_id` and returns borrowed views
/// of its term dictionary, postings and optional norms.
///
/// The InvertedIndex component starts with a little-endian `u16` field count.
/// Each field record is: `u16` field id, `u32`-length-prefixed dictionary bytes,
/// `u32`-length-prefixed postings bytes, a one-byte has-norms flag, and — only
/// when that flag is non-zero — `u32`-length-prefixed norms bytes. All integers
/// are little-endian.
///
/// Returns `None` when the segment has no InvertedIndex component, when no
/// record carries `field_id`, or when the component is truncated. Every read is
/// bounds-checked, so short data never panics.
fn field_index(&self, field_id: FieldId) -> Option<FieldIndex<'_>> {
    // Splits `len` bytes off the front of `cursor`; `None` if fewer remain.
    fn take<'d>(cursor: &mut &'d [u8], len: usize) -> Option<&'d [u8]> {
        // Copy the inner reference out so the result keeps the data's lifetime.
        let data: &'d [u8] = *cursor;
        if len > data.len() {
            return None;
        }
        let (head, tail) = data.split_at(len);
        *cursor = tail;
        Some(head)
    }
    // Reads a little-endian `u16` and advances the cursor.
    fn read_u16(cursor: &mut &[u8]) -> Option<u16> {
        let bytes = take(cursor, 2)?;
        Some(u16::from_le_bytes([bytes[0], bytes[1]]))
    }
    // Reads a little-endian `u32` length followed by that many bytes.
    fn read_block<'d>(cursor: &mut &'d [u8]) -> Option<&'d [u8]> {
        let len_bytes = take(cursor, 4)?;
        let len =
            u32::from_le_bytes([len_bytes[0], len_bytes[1], len_bytes[2], len_bytes[3]]) as usize;
        take(cursor, len)
    }

    let comp = self.header.component(ComponentType::InvertedIndex)?;
    let inv_start = comp.offset as usize;
    let inv_end = inv_start.checked_add(comp.length as usize)?;
    let mut cursor: &[u8] = self.data.get(inv_start..inv_end)?;

    let num_fields = read_u16(&mut cursor)?;
    for _ in 0..num_fields {
        // A record has to be consumed in full even when it is not the one we
        // want, because that is the only way to reach the next record.
        let fid = FieldId::new(read_u16(&mut cursor)?);
        let td_data = read_block(&mut cursor)?;
        let pd_data = read_block(&mut cursor)?;
        let has_norms = take(&mut cursor, 1)?[0] != 0;
        let norms_data = if has_norms {
            Some(read_block(&mut cursor)?)
        } else {
            None
        };
        if fid == field_id {
            return Some(FieldIndex {
                term_dict: TermDict::open(td_data),
                postings_data: pd_data,
                norms_data,
            });
        }
    }
    None
}
pub fn terms(&self, field_id: FieldId) -> Vec<String> {
let Some(field_index) = self.field_index(field_id) else {
return Vec::new();
};
let td = &field_index.term_dict;
let mut result = Vec::new();
if td.len() == 0 {
return result;
}
let comp = self.header.component(ComponentType::InvertedIndex).unwrap();
let inv_start = comp.offset as usize;
let inv_data = &self.data[inv_start..inv_start + comp.length as usize];
let mut pos = 2;
for _ in 0..u16::from_le_bytes([inv_data[0], inv_data[1]]) {
let fid = FieldId::new(u16::from_le_bytes([inv_data[pos], inv_data[pos + 1]]));
pos += 2;
let td_len = u32::from_le_bytes(inv_data[pos..pos + 4].try_into().unwrap()) as usize;
pos += 4;
let td_data = &inv_data[pos..pos + td_len];
pos += td_len;
let pd_len = u32::from_le_bytes(inv_data[pos..pos + 4].try_into().unwrap()) as usize;
pos += 4 + pd_len;
let has_norms = inv_data[pos] != 0;
pos += 1;
if has_norms {
let n_len = u32::from_le_bytes(inv_data[pos..pos + 4].try_into().unwrap()) as usize;
pos += 4 + n_len;
}
if fid == field_id {
let td = crate::inverted::term_dict::TermDict::open(td_data);
for (term, _) in td.prefix_iter("") {
result.push(term);
}
break;
}
}
result
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::analysis::Token;
use crate::core::DocId;
use crate::mapping::{FieldType, Mapping};
use crate::segment::builder::SegmentBuilder;
// Builds one token per term; the term's index in the slice is passed as the
// last `Token::new` argument, with `0` and the term's byte length before it.
fn make_tokens(terms: &[&str]) -> Vec<Token> {
    let mut tokens = Vec::with_capacity(terms.len());
    for (index, term) in terms.iter().enumerate() {
        tokens.push(Token::new(*term, 0, term.len(), index as u32));
    }
    tokens
}
// Serializes segment 1 holding a single document whose only text field
// ("title", field 0) contains the tokens "hello" and "world".
fn build_single_doc_segment() -> Vec<u8> {
    let mapping = Mapping::builder().field("title", FieldType::Text).build();
    let mut segment = SegmentBuilder::new(SegmentId::new(1), &mapping);
    let title_tokens = make_tokens(&["hello", "world"]);
    segment.add_document(
        &[(FieldId::new(0), title_tokens)],
        br#"{"title":"hello world"}"#,
    );
    segment.build()
}
// A freshly built segment opens and reports its id and document count.
#[test]
fn open_valid_segment() {
    let segment = SegmentReader::open(build_single_doc_segment()).unwrap();
    assert_eq!(segment.segment_id(), SegmentId::new(1));
    assert_eq!(segment.doc_count(), 1);
}
// Overwriting the first byte of the segment must make `open` fail.
#[test]
fn reject_invalid_magic() {
    let mut bytes = build_single_doc_segment();
    bytes[0] = b'X';
    let outcome = SegmentReader::open(bytes);
    assert!(outcome.is_err());
}
// Inverting every bit of byte 12 must make `open` fail.
// NOTE(review): byte 12 is presumably covered by (or part of) the header
// checksum — confirm against the segment format.
#[test]
fn reject_bad_checksum() {
    let mut bytes = build_single_doc_segment();
    bytes[12] ^= 0xFF;
    let outcome = SegmentReader::open(bytes);
    assert!(outcome.is_err());
}
// Both indexed tokens appear in the field's term listing.
#[test]
fn term_lookup() {
    let segment = SegmentReader::open(build_single_doc_segment()).unwrap();
    let listed = segment.terms(FieldId::new(0));
    for expected in ["hello", "world"] {
        assert!(listed.contains(&expected.to_string()));
    }
}
// The posting list for "hello" yields exactly one entry: doc 0 with tf 1.
#[test]
fn posting_iteration() {
    let segment = SegmentReader::open(build_single_doc_segment()).unwrap();
    let mut list = segment.postings(FieldId::new(0), "hello").unwrap();
    let first = list.next().unwrap();
    assert_eq!(first.0, DocId::new(0));
    assert_eq!(first.1, 1);
    assert!(list.next().is_none());
}
// The stored source of document 0 round-trips byte for byte.
#[test]
fn doc_retrieval() {
    let segment = SegmentReader::open(build_single_doc_segment()).unwrap();
    let documents = segment.doc_store();
    let stored = documents.get(0).unwrap();
    assert_eq!(stored, br#"{"title":"hello world"}"#);
}
// A two-token field yields a norm of 2.0 for its document.
#[test]
fn norms_lookup() {
    let segment = SegmentReader::open(build_single_doc_segment()).unwrap();
    let field_norms = segment.norms(FieldId::new(0)).unwrap();
    let norm = field_norms.norm(DocId::new(0));
    assert_eq!(norm, 2.0);
}
// Looking up a term that was never indexed yields no posting list.
#[test]
fn missing_term_returns_none() {
    let segment = SegmentReader::open(build_single_doc_segment()).unwrap();
    let lookup = segment.postings(FieldId::new(0), "nonexistent");
    assert!(lookup.is_none());
}
// Looking up a field id that was never indexed yields no posting list.
#[test]
fn missing_field_returns_none() {
    let segment = SegmentReader::open(build_single_doc_segment()).unwrap();
    let lookup = segment.postings(FieldId::new(99), "hello");
    assert!(lookup.is_none());
}
// Builds a three-document, two-field segment and checks postings, stored
// documents, norms and document frequencies through the reader.
#[test]
fn end_to_end_multi_doc() {
    let body = FieldId::new(0);
    let tag = FieldId::new(1);
    let mapping = Mapping::builder()
        .field("body", FieldType::Text)
        .field("tag", FieldType::Keyword)
        .build();

    let mut segment = SegmentBuilder::new(SegmentId::new(42), &mapping);
    segment.add_document(
        &[
            (body, make_tokens(&["the", "quick", "brown", "fox"])),
            (tag, make_tokens(&["animal"])),
        ],
        br#"{"body":"the quick brown fox","tag":"animal"}"#,
    );
    segment.add_document(
        &[
            (body, make_tokens(&["the", "lazy", "dog"])),
            (tag, make_tokens(&["animal"])),
        ],
        br#"{"body":"the lazy dog","tag":"animal"}"#,
    );
    segment.add_document(
        &[
            (body, make_tokens(&["quick", "search", "engine"])),
            (tag, make_tokens(&["tech"])),
        ],
        br#"{"body":"quick search engine","tag":"tech"}"#,
    );

    let reader = SegmentReader::open(segment.build()).unwrap();
    assert_eq!(reader.doc_count(), 3);

    // "the" occurs in documents 0 and 1.
    let mut the_list = reader.postings(body, "the").unwrap();
    assert_eq!(the_list.next().unwrap().0, DocId::new(0));
    assert_eq!(the_list.next().unwrap().0, DocId::new(1));
    assert!(the_list.next().is_none());

    // "quick" occurs in documents 0 and 2.
    let mut quick_list = reader.postings(body, "quick").unwrap();
    assert_eq!(quick_list.next().unwrap().0, DocId::new(0));
    assert_eq!(quick_list.next().unwrap().0, DocId::new(2));
    assert!(quick_list.next().is_none());

    // The keyword "animal" tags documents 0 and 1.
    let mut animal_list = reader.postings(tag, "animal").unwrap();
    assert_eq!(animal_list.next().unwrap().0, DocId::new(0));
    assert_eq!(animal_list.next().unwrap().0, DocId::new(1));
    assert!(animal_list.next().is_none());

    let documents = reader.doc_store();
    let first: serde_json::Value = serde_json::from_slice(&documents.get(0).unwrap()).unwrap();
    assert_eq!(first["tag"], "animal");
    let third: serde_json::Value = serde_json::from_slice(&documents.get(2).unwrap()).unwrap();
    assert_eq!(third["tag"], "tech");

    // Each norm equals the number of body tokens in that document.
    let body_norms = reader.norms(body).unwrap();
    assert_eq!(body_norms.norm(DocId::new(0)), 4.0);
    assert_eq!(body_norms.norm(DocId::new(1)), 3.0);
    assert_eq!(body_norms.norm(DocId::new(2)), 3.0);

    assert_eq!(reader.doc_freq(body, "the"), 2);
    assert_eq!(reader.doc_freq(body, "quick"), 2);
    assert_eq!(reader.doc_freq(body, "fox"), 1);
    assert_eq!(reader.doc_freq(body, "missing"), 0);
}
}