use std::collections::BTreeMap;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use ahash::AHashMap;
use crate::analysis::analyzer::analyzer::Analyzer;
use crate::analysis::analyzer::standard::StandardAnalyzer;
use crate::analysis::token::Token;
use crate::error::{LaurusError, Result};
use crate::lexical::core::document::Document;
use crate::lexical::core::field::FieldValue;
use crate::lexical::index::inverted::core::posting::{Posting, PostingList};
use crate::lexical::index::inverted::core::terms::{
InvertedIndexTerms, MergedInvertedIndexTerms, TermDictionaryAccess, Terms,
};
use crate::lexical::index::inverted::segment::SegmentInfo;
use crate::lexical::index::structures::bkd_tree::{BKDReader, BKDTree};
use crate::lexical::index::structures::dictionary::{HybridTermDictionary, TermInfo};
use crate::lexical::index::structures::doc_values::DocValuesReader;
use crate::lexical::reader::FieldStats;
use crate::lexical::reader::PostingIterator;
use crate::maintenance::deletion::DeletionBitmap;
use crate::storage::Storage;
use crate::storage::structured::StructReader;
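/// Tuning knobs for [`InvertedIndexReader`]: cache sizing and toggles,
/// eager segment preloading, and the analyzer used at query time.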
#[derive(Clone)]
pub struct InvertedIndexReaderConfig {
pub max_cache_memory: usize,
pub enable_term_cache: bool,
pub enable_posting_cache: bool,
pub preload_segments: bool,
pub max_cached_terms_per_field: usize,
pub analyzer: Arc<dyn Analyzer>,
}
impl std::fmt::Debug for InvertedIndexReaderConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InvertedIndexReaderConfig")
.field("max_cache_memory", &self.max_cache_memory)
.field("enable_term_cache", &self.enable_term_cache)
.field("enable_posting_cache", &self.enable_posting_cache)
.field("preload_segments", &self.preload_segments)
.field(
"max_cached_terms_per_field",
&self.max_cached_terms_per_field,
)
.field("analyzer", &self.analyzer.name())
.finish()
}
}
impl Default for InvertedIndexReaderConfig {
fn default() -> Self {
InvertedIndexReaderConfig {
            max_cache_memory: 128 * 1024 * 1024, // 128 MiB
            enable_term_cache: true,
enable_posting_cache: true,
preload_segments: false,
max_cached_terms_per_field: 10000,
analyzer: Arc::new(
StandardAnalyzer::new().expect("StandardAnalyzer should be creatable"),
),
}
}
}
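/// Iterator over an in-memory posting list with optional [`PostingBlock`]
/// skip metadata, so `skip_to` can jump near its target instead of scanning
/// from the start.
///
/// A minimal sketch (`postings` stands in for a decoded `Vec<Posting>`):
///
/// ```ignore
/// let mut it = InvertedIndexPostingIterator::with_blocks(postings, 64);
/// if it.skip_to(42)? {
///     assert!(it.doc_id() >= 42); // first posting with doc id >= 42
/// }
/// ```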
#[derive(Debug)]
pub struct InvertedIndexPostingIterator {
    postings: Vec<Posting>,
position: usize,
block_cache: Option<Vec<PostingBlock>>,
current_block: usize,
started: bool,
}
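/// Skip metadata for a contiguous run of postings: the doc-id range it
/// covers, where it starts, and how many entries it holds.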
#[derive(Debug, Clone)]
pub struct PostingBlock {
pub min_doc_id: u64,
pub max_doc_id: u64,
pub start_position: usize,
pub count: usize,
}
impl InvertedIndexPostingIterator {
pub fn new(postings: Vec<Posting>) -> Self {
InvertedIndexPostingIterator {
postings,
position: 0,
block_cache: None,
current_block: 0,
started: false,
}
}
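    /// Like [`Self::new`], but also builds `block_size`-sized skip blocks so
    /// `skip_to` can jump close to the target before scanning.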
pub fn with_blocks(postings: Vec<Posting>, block_size: usize) -> Self {
let blocks = Self::create_blocks(&postings, block_size);
InvertedIndexPostingIterator {
postings,
position: 0,
block_cache: Some(blocks),
current_block: 0,
started: false,
}
}
fn create_blocks(postings: &[Posting], block_size: usize) -> Vec<PostingBlock> {
let mut blocks = Vec::new();
let mut start = 0;
while start < postings.len() {
let end = (start + block_size).min(postings.len());
let block_postings = &postings[start..end];
if !block_postings.is_empty() {
blocks.push(PostingBlock {
min_doc_id: block_postings[0].doc_id,
max_doc_id: block_postings[block_postings.len() - 1].doc_id,
start_position: start,
count: end - start,
});
}
start = end;
}
blocks
}
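    /// Finds the block that may contain `target`: the block whose range
    /// covers it, else the first block starting beyond it, else the last
    /// block (or `None` when no block metadata exists).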
fn find_block(&self, target: u64) -> Option<usize> {
if let Some(blocks) = &self.block_cache {
for (i, block) in blocks.iter().enumerate() {
if target >= block.min_doc_id && target <= block.max_doc_id {
return Some(i);
}
if target < block.min_doc_id {
return Some(i);
}
}
if !blocks.is_empty() {
Some(blocks.len() - 1)
} else {
None
}
} else {
None
}
}
}
impl crate::lexical::reader::PostingIterator for InvertedIndexPostingIterator {
fn doc_id(&self) -> u64 {
if self.position < self.postings.len() {
self.postings[self.position].doc_id
} else {
            u64::MAX
        }
}
fn term_freq(&self) -> u64 {
if self.position < self.postings.len() {
self.postings[self.position].frequency as u64
} else {
0
}
}
    fn positions(&self) -> Result<Vec<u64>> {
        if self.position < self.postings.len() {
            Ok(self.postings[self.position]
                .positions
                .as_deref()
                .unwrap_or(&[])
                .iter()
                .map(|&p| p as u64)
                .collect())
        } else {
            Ok(Vec::new())
        }
    }
fn next(&mut self) -> Result<bool> {
if self.postings.is_empty() {
return Ok(false);
}
if !self.started {
self.started = true;
Ok(true)
} else {
self.position += 1;
Ok(self.position < self.postings.len())
}
}
    fn skip_to(&mut self, target_doc_id: u64) -> Result<bool> {
        self.started = true;
        if let Some(block_idx) = self.find_block(target_doc_id)
            && let Some(blocks) = &self.block_cache
        {
            let block = &blocks[block_idx];
            // Never move backwards: skip_to must only advance the iterator.
            self.position = self.position.max(block.start_position);
            self.current_block = block_idx;
        }
while self.position < self.postings.len() {
if self.postings[self.position].doc_id >= target_doc_id {
return Ok(true);
}
self.position += 1;
}
Ok(false)
}
fn cost(&self) -> u64 {
self.postings.len() as u64
}
}
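/// Lazily-loaded reader for a single on-disk segment.
///
/// Each component (term dictionary, stored documents, field lengths and
/// stats, doc values, deletion bitmap, BKD trees) lives behind its own
/// `RwLock` and is loaded on first use. File names are derived from the
/// segment id: `<id>.dict`, `<id>.json`/`<id>.docs`, `<id>.lens`,
/// `<id>.fstats`, `<id>.delmap`, and `<id>.<field>.bkd`.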
#[derive(Debug)]
pub struct SegmentReader {
info: SegmentInfo,
storage: Arc<dyn Storage>,
term_dictionary: RwLock<Option<Arc<HybridTermDictionary>>>,
stored_documents: RwLock<Option<BTreeMap<u64, Document>>>,
field_lengths: RwLock<Option<BTreeMap<u64, AHashMap<String, u32>>>>,
    field_stats: RwLock<Option<AHashMap<String, FieldStats>>>,
doc_values: RwLock<Option<Arc<DocValuesReader>>>,
deletion_bitmap: RwLock<Option<Arc<DeletionBitmap>>>,
bkd_trees: RwLock<AHashMap<String, Arc<dyn BKDTree>>>,
loaded: AtomicBool,
}
impl SegmentReader {
pub fn open(info: SegmentInfo, storage: Arc<dyn Storage>) -> Result<Self> {
let reader = SegmentReader {
info,
storage,
term_dictionary: RwLock::new(None),
stored_documents: RwLock::new(None),
field_lengths: RwLock::new(None),
field_stats: RwLock::new(None),
doc_values: RwLock::new(None),
deletion_bitmap: RwLock::new(None),
bkd_trees: RwLock::new(AHashMap::new()),
loaded: AtomicBool::new(false),
};
Ok(reader)
}
pub fn doc_ids(&self) -> Result<Vec<u64>> {
        if self.stored_documents.read().unwrap().is_none() {
            self.load_stored_documents()?;
        }
let docs = self.stored_documents.read().unwrap();
if let Some(ref documents) = *docs {
Ok(documents.keys().cloned().collect())
} else {
Ok(Vec::new())
}
}
#[deprecated(
since = "0.2.0",
note = "Use `open()` instead. Schema is no longer required."
)]
pub fn open_with_schema(
info: SegmentInfo,
_schema: Arc<()>,
storage: Arc<dyn Storage>,
) -> Result<Self> {
Self::open(info, storage)
}
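    /// Eagerly loads the term dictionary, stored documents, doc values, and
    /// deletion bitmap; field statistics and BKD trees remain lazy.
    /// Subsequent calls are no-ops.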
pub fn load(&mut self) -> Result<()> {
if self.loaded.load(Ordering::Acquire) {
return Ok(());
}
self.load_term_dictionary()?;
self.load_stored_documents()?;
self.load_doc_values()?;
self.load_deletion_bitmap()?;
self.loaded.store(true, Ordering::Release);
Ok(())
}
fn load_term_dictionary(&self) -> Result<()> {
let dict_file = format!("{}.dict", self.info.segment_id);
if let Ok(input) = self.storage.open_input(&dict_file) {
let mut reader = StructReader::new(input)?;
let dictionary = HybridTermDictionary::read_from_storage(&mut reader).map_err(|e| {
LaurusError::index(format!(
"Failed to read term dictionary from {dict_file}: {e}"
))
})?;
*self.term_dictionary.write().unwrap() = Some(Arc::new(dictionary));
}
Ok(())
}
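    /// Loads stored documents, preferring the JSON form (`<id>.json`) and
    /// falling back to the binary `.docs` format, whose field values carry a
    /// one-byte type tag: 0 text, 1 i64, 2 f64, 3 bool, 4 bytes, 5 datetime,
    /// 6 geo, 7 null.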
fn load_stored_documents(&self) -> Result<()> {
let json_file = format!("{}.json", self.info.segment_id);
if self.storage.file_exists(&json_file) {
let mut input = self.storage.open_input(&json_file)?;
let mut json_data = String::new();
std::io::Read::read_to_string(&mut input, &mut json_data)?;
let docs: Vec<Document> = serde_json::from_str(&json_data)
.map_err(|e| LaurusError::index(format!("Failed to parse JSON documents: {e}")))?;
let mut documents = BTreeMap::new();
for (idx, doc) in docs.into_iter().enumerate() {
let doc_id = self.info.min_doc_id + idx as u64;
documents.insert(doc_id, doc);
}
*self.stored_documents.write().unwrap() = Some(documents);
return Ok(());
}
let docs_file = format!("{}.docs", self.info.segment_id);
if let Ok(input) = self.storage.open_input(&docs_file) {
let mut reader = StructReader::new(input)?;
let doc_count = reader.read_varint()? as usize;
let mut documents = BTreeMap::new();
for _ in 0..doc_count {
let doc_id = reader.read_u64()?;
let field_count = reader.read_varint()? as usize;
let mut doc = Document::new();
for _ in 0..field_count {
let field_name = reader.read_string()?;
let type_tag = reader.read_u8()?;
let field_value = match type_tag {
0 => {
let text = reader.read_string()?;
FieldValue::Text(text)
}
1 => {
let num = reader.read_u64()? as i64;
FieldValue::Int64(num)
}
2 => {
let num = reader.read_f64()?;
FieldValue::Float64(num)
}
3 => {
let b = reader.read_u8()? != 0;
FieldValue::Bool(b)
}
4 => {
let mime = reader.read_string()?;
let data = reader.read_bytes()?;
FieldValue::Bytes(data, if mime.is_empty() { None } else { Some(mime) })
}
5 => {
let dt_str = reader.read_string()?;
let dt = chrono::DateTime::parse_from_rfc3339(&dt_str)
.map_err(|e| {
LaurusError::index(format!("Failed to parse DateTime: {e}"))
})?
.with_timezone(&chrono::Utc);
FieldValue::DateTime(dt)
}
6 => {
let lat = reader.read_f64()?;
let lon = reader.read_f64()?;
FieldValue::Geo(lat, lon)
}
7 => {
FieldValue::Null
}
_ => {
return Err(LaurusError::index(format!(
"Unknown field type tag: {type_tag}"
)));
}
};
doc.fields.insert(field_name, field_value);
}
documents.insert(doc_id, doc);
}
*self.stored_documents.write().unwrap() = Some(documents);
}
Ok(())
}
fn load_doc_values(&self) -> Result<()> {
let reader = DocValuesReader::load(self.storage.clone(), &self.info.segment_id)?;
let mut doc_values = self.doc_values.write().unwrap();
*doc_values = Some(Arc::new(reader));
Ok(())
}
fn load_deletion_bitmap(&self) -> Result<()> {
if !self.info.has_deletions {
return Ok(());
}
if self.deletion_bitmap.read().unwrap().is_some() {
return Ok(());
}
let bitmap_file = format!("{}.delmap", self.info.segment_id);
if !self.storage.file_exists(&bitmap_file) {
return Ok(());
}
let input = self.storage.open_input(&bitmap_file)?;
let mut reader = StructReader::new(input)?;
let bitmap = DeletionBitmap::read_from_storage(&mut reader)?;
*self.deletion_bitmap.write().unwrap() = Some(Arc::new(bitmap));
Ok(())
}
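    /// Returns whether `doc_id` is deleted, lazily loading the deletion
    /// bitmap if the segment is known to have deletions.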
pub fn is_deleted(&self, doc_id: u64) -> Result<bool> {
if self.deletion_bitmap.read().unwrap().is_none() {
if self.info.has_deletions {
self.load_deletion_bitmap()?;
} else {
return Ok(false);
}
}
let bitmap_lock = self.deletion_bitmap.read().unwrap();
if let Some(ref bitmap) = *bitmap_lock {
Ok(bitmap.is_deleted(doc_id))
} else {
Ok(false)
}
}
    fn get_doc_value(&self, field: &str, doc_id: u64) -> Result<Option<FieldValue>> {
        self.ensure_loaded()?;
let doc_values = self.doc_values.read().unwrap();
if let Some(reader) = doc_values.as_ref() {
Ok(reader.get_value(field, doc_id).cloned())
} else {
Ok(None)
}
}
    fn ensure_loaded(&self) -> Result<()> {
        // Check the component itself rather than the segment-wide `loaded`
        // flag, so doc values are not re-read on every call before `load()`.
        if self.doc_values.read().unwrap().is_none() {
            self.load_doc_values()?;
        }
        Ok(())
    }
fn has_doc_values(&self, field: &str) -> bool {
let doc_values = self.doc_values.read().unwrap();
if let Some(reader) = doc_values.as_ref() {
reader.has_field(field)
} else {
false
}
}
fn load_field_lengths(&self) -> Result<()> {
let lens_file = format!("{}.lens", self.info.segment_id);
if !self.storage.file_exists(&lens_file) {
*self.field_lengths.write().unwrap() = Some(BTreeMap::new());
return Ok(());
}
let lens_input = self.storage.open_input(&lens_file)?;
let mut lens_reader = StructReader::new(lens_input)?;
let doc_count = lens_reader.read_varint()? as usize;
let mut all_field_lengths = BTreeMap::new();
for _ in 0..doc_count {
let doc_id = lens_reader.read_u64()?;
let field_count = lens_reader.read_varint()? as usize;
let mut field_lens = AHashMap::new();
for _ in 0..field_count {
let field_name = lens_reader.read_string()?;
let length = lens_reader.read_u32()?;
field_lens.insert(field_name, length);
}
all_field_lengths.insert(doc_id, field_lens);
}
*self.field_lengths.write().unwrap() = Some(all_field_lengths);
Ok(())
}
fn load_field_stats(&self) -> Result<()> {
let fstats_file = format!("{}.fstats", self.info.segment_id);
if !self.storage.file_exists(&fstats_file) {
*self.field_stats.write().unwrap() = Some(AHashMap::new());
return Ok(());
}
let fstats_input = self.storage.open_input(&fstats_file)?;
let mut fstats_reader = StructReader::new(fstats_input)?;
let field_count = fstats_reader.read_varint()? as usize;
let mut all_field_stats = AHashMap::new();
for _ in 0..field_count {
let field_name = fstats_reader.read_string()?;
let doc_count = fstats_reader.read_u64()?;
let avg_length = fstats_reader.read_f64()?;
let min_length = fstats_reader.read_u64()?;
let max_length = fstats_reader.read_u64()?;
            all_field_stats.insert(
                field_name.clone(),
                FieldStats {
                    field: field_name,
                    // Per-field term counts are not stored in the `.fstats`
                    // file, so they are reported as zero here.
                    unique_terms: 0,
                    total_terms: 0,
                    doc_count,
                    avg_length,
                    min_length,
                    max_length,
                },
            );
}
*self.field_stats.write().unwrap() = Some(all_field_stats);
Ok(())
}
pub fn field_stats(&self, field: &str) -> Result<Option<FieldStats>> {
if self.field_stats.read().unwrap().is_none() {
self.load_field_stats()?;
}
let field_stats = self.field_stats.read().unwrap();
if let Some(ref stats_map) = *field_stats {
return Ok(stats_map.get(field).cloned());
}
Ok(None)
}
pub fn field_length(&self, doc_id: u64, field: &str) -> Result<Option<u32>> {
if self.field_lengths.read().unwrap().is_none() {
self.load_field_lengths()?;
}
if self.is_deleted(doc_id)? {
return Ok(None);
}
let field_lengths = self.field_lengths.read().unwrap();
if let Some(ref lengths_map) = *field_lengths
&& let Some(doc_lengths) = lengths_map.get(&doc_id)
{
return Ok(doc_lengths.get(field).copied());
}
Ok(None)
}
pub fn document(&self, doc_id: u64) -> Result<Option<Document>> {
        if self.stored_documents.read().unwrap().is_none() {
            self.load_stored_documents()?;
        }
if self.is_deleted(doc_id)? {
return Ok(None);
}
let docs = self.stored_documents.read().unwrap();
if let Some(ref documents) = *docs {
Ok(documents.get(&doc_id).cloned())
} else {
Ok(None)
}
}
pub fn term_info(&self, field: &str, term: &str) -> Result<Option<TermInfo>> {
if self.term_dictionary.read().unwrap().is_none() && !self.loaded.load(Ordering::Acquire) {
self.load_term_dictionary()?;
}
if let Some(ref dict) = *self.term_dictionary.read().unwrap() {
let full_term = format!("{field}:{term}");
Ok(dict.get(&full_term).cloned())
} else {
Ok(None)
}
}
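    /// Returns a posting iterator for `term` in `field` with deleted
    /// documents filtered out, or `None` if the term is absent. Falls back
    /// to [`Self::scan_documents_for_term`] when the segment has no `.post`
    /// file.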
pub fn postings(&self, field: &str, term: &str) -> Result<Option<Box<dyn PostingIterator>>> {
let postings_file = format!("{}.post", self.info.segment_id);
if !self.storage.file_exists(&postings_file) {
return self.scan_documents_for_term(field, term);
}
if let Some(term_info) = self.term_info(field, term)? {
let input = self.storage.open_input(&postings_file)?;
let mut reader = StructReader::new(input)?;
if term_info.posting_offset > 0 {
reader.seek(std::io::SeekFrom::Start(term_info.posting_offset))?;
}
let posting_list = PostingList::decode(&mut reader)?;
let mut filtered = Vec::new();
for posting in posting_list.postings.into_iter() {
if !self.is_deleted(posting.doc_id)? {
filtered.push(posting);
}
}
if filtered.is_empty() {
Ok(None)
} else {
Ok(Some(Box::new(InvertedIndexPostingIterator::with_blocks(
filtered, 64,
))))
}
} else {
Ok(None)
}
}
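    /// Slow-path fallback used when no postings file exists: re-analyzes
    /// stored text fields with a fresh `StandardAnalyzer` and rebuilds the
    /// postings for `term` on the fly.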
fn scan_documents_for_term(
&self,
field: &str,
term: &str,
) -> Result<Option<Box<dyn PostingIterator>>> {
        if self.stored_documents.read().unwrap().is_none() {
            self.load_stored_documents()?;
        }
let docs = self.stored_documents.read().unwrap();
if let Some(ref documents) = *docs {
let mut postings = Vec::new();
let default_analyzer = StandardAnalyzer::new()?;
for (doc_id, doc) in documents.iter() {
if self.is_deleted(*doc_id)? {
continue;
}
if let Some(field_value) = doc.get_field(field)
&& let Some(text) = field_value.as_text()
{
let token_stream = default_analyzer.analyze(text)?;
let tokens: Vec<Token> = token_stream.collect();
let mut positions = Vec::new();
for token in tokens.iter() {
if token.text == term {
positions.push(token.position as u32);
}
}
if !positions.is_empty() {
postings.push(Posting {
doc_id: *doc_id,
frequency: positions.len() as u32,
positions: Some(positions),
weight: 1.0,
});
}
}
}
if postings.is_empty() {
Ok(None)
} else {
Ok(Some(Box::new(InvertedIndexPostingIterator::with_blocks(
postings, 64,
))))
}
} else {
Ok(None)
}
}
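    /// Live document count: the segment's total minus deletions, falling
    /// back to the raw count if the deletion bitmap cannot be loaded.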
pub fn doc_count(&self) -> u64 {
if !self.info.has_deletions {
return self.info.doc_count;
}
if let Some(bitmap) = self.deletion_bitmap.read().unwrap().clone() {
return bitmap.live_count();
}
if self.load_deletion_bitmap().is_ok()
&& let Some(bitmap) = self.deletion_bitmap.read().unwrap().clone()
{
return bitmap.live_count();
}
self.info.doc_count
}
pub fn get_bkd_tree(&self, field: &str) -> Result<Option<Arc<dyn BKDTree>>> {
if let Some(tree) = self.bkd_trees.read().unwrap().get(field) {
return Ok(Some(tree.clone()));
}
let bkd_file = format!("{}.{}.bkd", self.info.segment_id, field);
if self.storage.file_exists(&bkd_file) {
let reader = BKDReader::open(self.storage.clone(), &bkd_file)?;
let tree: Arc<dyn BKDTree> = Arc::new(reader);
self.bkd_trees
.write()
.unwrap()
.insert(field.to_string(), tree.clone());
return Ok(Some(tree));
}
Ok(None)
}
}
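/// Fans a range search out to each segment's BKD tree and returns the
/// sorted, deduplicated union of the results.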
#[derive(Debug)]
struct MultiSegmentBKDTree {
trees: Vec<Arc<dyn BKDTree>>,
}
impl BKDTree for MultiSegmentBKDTree {
fn range_search(
&self,
mins: &[Option<f64>],
maxs: &[Option<f64>],
include_min: bool,
include_max: bool,
) -> Result<Vec<u64>> {
let mut results = Vec::new();
for tree in &self.trees {
let mut tree_results = tree.range_search(mins, maxs, include_min, include_max)?;
results.append(&mut tree_results);
}
results.sort_unstable();
results.dedup();
Ok(results)
}
}
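/// Shared cache for term metadata (and, when enabled, posting lists) with a
/// soft memory limit and hit/miss counters.
///
/// A minimal usage sketch (`info` stands in for a previously loaded
/// [`TermInfo`]):
///
/// ```ignore
/// let cache = CacheManager::new(1024 * 1024); // 1 MiB soft limit
/// cache.cache_term_info("body:rust".to_string(), info);
/// assert!(cache.get_term_info("body:rust").is_some());
/// ```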
#[derive(Debug)]
pub struct CacheManager {
term_cache: RwLock<AHashMap<String, TermInfo>>,
#[allow(dead_code)]
    posting_cache: RwLock<AHashMap<String, Arc<Vec<Posting>>>>,
memory_usage: AtomicUsize,
memory_limit: usize,
cache_hits: AtomicUsize,
cache_misses: AtomicUsize,
}
impl CacheManager {
pub fn new(memory_limit: usize) -> Self {
CacheManager {
term_cache: RwLock::new(AHashMap::new()),
posting_cache: RwLock::new(AHashMap::new()),
memory_usage: AtomicUsize::new(0),
memory_limit,
cache_hits: AtomicUsize::new(0),
cache_misses: AtomicUsize::new(0),
}
}
pub fn get_term_info(&self, key: &str) -> Option<TermInfo> {
let cache = self.term_cache.read().unwrap();
if let Some(info) = cache.get(key) {
self.cache_hits.fetch_add(1, Ordering::Relaxed);
Some(info.clone())
} else {
self.cache_misses.fetch_add(1, Ordering::Relaxed);
None
}
}
pub fn cache_term_info(&self, key: String, info: TermInfo) {
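        // Soft-limit eviction: entry sizes are estimated at a flat 64 bytes,
        // and roughly a quarter of the cache is dropped at once.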
if self.memory_usage.load(Ordering::Relaxed) >= self.memory_limit {
let mut cache = self.term_cache.write().unwrap();
let evict_count = cache.len() / 4;
if evict_count > 0 {
let keys_to_remove: Vec<String> = cache.keys().take(evict_count).cloned().collect();
for k in &keys_to_remove {
cache.remove(k);
}
self.memory_usage
.fetch_sub(evict_count * 64, Ordering::Relaxed);
}
}
let mut cache = self.term_cache.write().unwrap();
cache.insert(key, info);
self.memory_usage.fetch_add(64, Ordering::Relaxed);
}
pub fn stats(&self) -> CacheStats {
CacheStats {
hits: self.cache_hits.load(Ordering::Relaxed),
misses: self.cache_misses.load(Ordering::Relaxed),
memory_usage: self.memory_usage.load(Ordering::Relaxed),
memory_limit: self.memory_limit,
}
}
}
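/// Point-in-time snapshot of [`CacheManager`] counters.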
#[derive(Debug, Clone)]
pub struct CacheStats {
pub hits: usize,
pub misses: usize,
pub memory_usage: usize,
pub memory_limit: usize,
}
impl CacheStats {
pub fn hit_ratio(&self) -> f64 {
if self.hits + self.misses == 0 {
0.0
} else {
self.hits as f64 / (self.hits + self.misses) as f64
}
}
}
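/// Multi-segment reader over an inverted index: aggregates per-segment
/// [`SegmentReader`]s behind a single
/// [`crate::lexical::reader::LexicalIndexReader`] implementation, sharing one
/// [`CacheManager`] across segments.
///
/// A sketch of typical usage (`open_storage` and the segment list are
/// illustrative, not part of this module's API):
///
/// ```ignore
/// let storage: Arc<dyn Storage> = open_storage("/tmp/index")?;
/// let reader = InvertedIndexReader::new(
///     segments,
///     storage,
///     InvertedIndexReaderConfig::default(),
/// )?;
/// if let Some(mut postings) = reader.postings("body", "rust")? {
///     while postings.next()? {
///         println!("doc={} tf={}", postings.doc_id(), postings.term_freq());
///     }
/// }
/// ```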
#[derive(Debug, Clone)]
pub struct InvertedIndexReader {
segment_readers: Vec<Arc<RwLock<SegmentReader>>>,
cache_manager: Arc<CacheManager>,
config: InvertedIndexReaderConfig,
closed: Arc<AtomicBool>,
total_doc_count: u64,
}
impl InvertedIndexReader {
pub fn new(
segments: Vec<SegmentInfo>,
storage: Arc<dyn Storage>,
config: InvertedIndexReaderConfig,
) -> Result<Self> {
let cache_manager = Arc::new(CacheManager::new(config.max_cache_memory));
let mut segment_readers = Vec::new();
let mut total_doc_count = 0;
for segment_info in &segments {
total_doc_count += segment_info.doc_count;
let mut reader = SegmentReader::open(segment_info.clone(), storage.clone())?;
if config.preload_segments {
reader.load()?;
}
segment_readers.push(Arc::new(RwLock::new(reader)));
}
Ok(InvertedIndexReader {
segment_readers,
cache_manager,
config,
closed: Arc::new(AtomicBool::new(false)),
total_doc_count,
})
}
pub fn cache_stats(&self) -> CacheStats {
self.cache_manager.stats()
}
pub fn analyzer(&self) -> &Arc<dyn Analyzer> {
&self.config.analyzer
}
fn check_closed(&self) -> Result<()> {
if self.closed.load(Ordering::Acquire) {
Err(LaurusError::index("Reader is closed"))
} else {
Ok(())
}
}
pub fn field_length(&self, doc_id: u64, field: &str) -> Result<Option<u32>> {
self.check_closed()?;
for segment_reader in &self.segment_readers {
let reader = segment_reader.read().unwrap();
if let Ok(Some(length)) = reader.field_length(doc_id, field) {
return Ok(Some(length));
}
}
Ok(None)
}
}
impl crate::lexical::reader::LexicalIndexReader for InvertedIndexReader {
fn doc_count(&self) -> u64 {
self.segment_readers
.iter()
.map(|sr| sr.read().unwrap().doc_count())
.sum()
}
fn max_doc(&self) -> u64 {
self.total_doc_count
}
fn is_deleted(&self, doc_id: u64) -> bool {
for segment_reader in &self.segment_readers {
let reader = segment_reader.read().unwrap();
if let Ok(true) = reader.is_deleted(doc_id) {
return true;
}
}
false
}
fn document(&self, doc_id: u64) -> Result<Option<Document>> {
self.check_closed()?;
for segment_reader in &self.segment_readers {
let reader = segment_reader.read().unwrap();
if let Ok(Some(doc)) = reader.document(doc_id) {
return Ok(Some(doc));
}
}
Ok(None)
}
fn doc_ids(&self) -> Result<Vec<u64>> {
self.check_closed()?;
let mut all_ids = Vec::new();
for segment_reader in &self.segment_readers {
let reader = segment_reader.read().unwrap();
all_ids.extend(reader.doc_ids()?);
}
Ok(all_ids)
}
fn term_info(
&self,
field: &str,
term: &str,
) -> Result<Option<crate::lexical::reader::ReaderTermInfo>> {
self.check_closed()?;
let cache_key = format!("{field}:{term}");
if let Some(cached_info) = self.cache_manager.get_term_info(&cache_key) {
return Ok(Some(crate::lexical::reader::ReaderTermInfo {
field: field.to_string(),
term: term.to_string(),
doc_freq: cached_info.doc_frequency,
total_freq: cached_info.total_frequency,
posting_offset: cached_info.posting_offset,
posting_size: cached_info.posting_length,
}));
}
let mut total_doc_freq = 0;
let mut total_term_freq = 0;
let mut found = false;
for segment_reader in &self.segment_readers {
let reader = segment_reader.read().unwrap();
if let Some(term_info) = reader.term_info(field, term)? {
total_doc_freq += term_info.doc_frequency;
total_term_freq += term_info.total_frequency;
found = true;
}
}
if found {
let reader_info = crate::lexical::reader::ReaderTermInfo {
field: field.to_string(),
term: term.to_string(),
doc_freq: total_doc_freq,
total_freq: total_term_freq,
                // Offsets are per-segment; a merged view has no single
                // on-disk posting location.
                posting_offset: 0,
                posting_size: 0,
            };
let term_info = TermInfo {
posting_offset: 0,
posting_length: 0,
doc_frequency: total_doc_freq,
total_frequency: total_term_freq,
};
self.cache_manager.cache_term_info(cache_key, term_info);
Ok(Some(reader_info))
} else {
Ok(None)
}
}
fn postings(
&self,
field: &str,
term: &str,
) -> Result<Option<Box<dyn crate::lexical::reader::PostingIterator>>> {
self.check_closed()?;
let mut iterators = Vec::new();
for segment_reader in &self.segment_readers {
let reader = segment_reader.read().unwrap();
if let Some(iter) = reader.postings(field, term)? {
iterators.push(iter);
}
}
if iterators.is_empty() {
Ok(None)
} else if iterators.len() == 1 {
Ok(Some(iterators.into_iter().next().unwrap()))
} else {
let merged = MergedPostingIterator::new(iterators)?;
Ok(Some(Box::new(merged)))
}
}
fn field_stats(&self, field: &str) -> Result<Option<crate::lexical::reader::FieldStats>> {
self.check_closed()?;
let mut total_doc_count = 0u64;
        let mut total_length_sum = 0u64;
        let mut min_length = u64::MAX;
let mut max_length = 0u64;
let mut found = false;
for segment_reader in &self.segment_readers {
let reader = segment_reader.read().unwrap();
if let Some(segment_stats) = reader.field_stats(field)? {
total_doc_count += segment_stats.doc_count;
total_length_sum +=
(segment_stats.avg_length * segment_stats.doc_count as f64) as u64;
min_length = min_length.min(segment_stats.min_length);
max_length = max_length.max(segment_stats.max_length);
found = true;
}
}
if found {
            Ok(Some(FieldStats {
                field: field.to_string(),
                unique_terms: 0,
                total_terms: 0,
                doc_count: total_doc_count,
avg_length: if total_doc_count > 0 {
total_length_sum as f64 / total_doc_count as f64
} else {
0.0
},
min_length: if min_length == u64::MAX {
0
} else {
min_length
},
max_length,
}))
} else {
Ok(None)
}
}
fn close(&mut self) -> Result<()> {
self.closed.store(true, Ordering::Release);
Ok(())
}
fn is_closed(&self) -> bool {
self.closed.load(Ordering::Acquire)
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn get_doc_value(&self, field: &str, doc_id: u64) -> Result<Option<FieldValue>> {
for segment_lock in &self.segment_readers {
let segment = segment_lock.read().unwrap();
if let Ok(Some(value)) = segment.get_doc_value(field, doc_id) {
return Ok(Some(value));
}
}
Ok(None)
}
fn has_doc_values(&self, field: &str) -> bool {
self.segment_readers.iter().any(|seg_lock| {
let seg = seg_lock.read().unwrap();
seg.has_doc_values(field)
})
}
fn get_bkd_tree(&self, field: &str) -> Result<Option<Arc<dyn BKDTree>>> {
self.check_closed()?;
let mut trees = Vec::new();
for segment_reader in &self.segment_readers {
let reader = segment_reader.read().unwrap();
if let Some(tree) = reader.get_bkd_tree(field)? {
trees.push(tree);
}
}
if trees.is_empty() {
Ok(None)
} else {
Ok(Some(Arc::new(MultiSegmentBKDTree { trees })))
}
}
}
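/// Term enumeration across segments: a single segment's dictionary is exposed
/// directly, multiple dictionaries through a merged view.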
impl TermDictionaryAccess for InvertedIndexReader {
fn terms(&self, field: &str) -> Result<Option<Box<dyn Terms>>> {
let mut dicts = Vec::new();
for seg_lock in &self.segment_readers {
let seg = seg_lock.read().unwrap();
if seg.term_dictionary.read().unwrap().is_none() {
seg.load_term_dictionary()?;
}
if let Some(dict) = seg.term_dictionary.read().unwrap().clone() {
dicts.push(dict);
}
}
if dicts.is_empty() {
return Ok(None);
}
if dicts.len() == 1 {
let terms = InvertedIndexTerms::new(field, dicts.into_iter().next().unwrap());
return Ok(Some(Box::new(terms)));
}
let terms = MergedInvertedIndexTerms::new(field, &dicts);
Ok(Some(Box::new(terms)))
}
}
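/// K-way merge of per-segment posting iterators, driven by a binary heap
/// keyed on each iterator's current doc id (smallest first). If the same doc
/// id occurs in more than one segment it is yielded once per segment.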
#[derive(Debug)]
pub struct MergedPostingIterator {
heap: std::collections::BinaryHeap<IteratorWrapper>,
current_doc: u64,
started: bool,
}
#[derive(Debug)]
struct IteratorWrapper {
iter: Box<dyn crate::lexical::reader::PostingIterator>,
current_doc: u64,
}
impl PartialEq for IteratorWrapper {
fn eq(&self, other: &Self) -> bool {
self.current_doc == other.current_doc
}
}
impl Eq for IteratorWrapper {}
impl PartialOrd for IteratorWrapper {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
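// Compare in reverse so that `BinaryHeap`, a max-heap, behaves as a
// min-heap on `current_doc`.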
impl Ord for IteratorWrapper {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
other.current_doc.cmp(&self.current_doc)
}
}
impl MergedPostingIterator {
pub fn new(iterators: Vec<Box<dyn crate::lexical::reader::PostingIterator>>) -> Result<Self> {
let mut heap = std::collections::BinaryHeap::new();
for mut iter in iterators {
if iter.next()? {
let doc_id = iter.doc_id();
heap.push(IteratorWrapper {
iter,
current_doc: doc_id,
});
}
}
let current_doc = if let Some(wrapper) = heap.peek() {
wrapper.current_doc
} else {
u64::MAX
};
Ok(MergedPostingIterator {
heap,
current_doc,
started: false,
})
}
fn advance(&mut self) -> Result<bool> {
if let Some(mut wrapper) = self.heap.pop() {
if wrapper.iter.next()? {
wrapper.current_doc = wrapper.iter.doc_id();
self.heap.push(wrapper);
}
if let Some(new_top) = self.heap.peek() {
self.current_doc = new_top.current_doc;
Ok(true)
} else {
self.current_doc = u64::MAX;
Ok(false)
}
} else {
self.current_doc = u64::MAX;
Ok(false)
}
}
}
impl crate::lexical::reader::PostingIterator for MergedPostingIterator {
fn doc_id(&self) -> u64 {
self.current_doc
}
fn term_freq(&self) -> u64 {
if let Some(wrapper) = self.heap.peek() {
wrapper.iter.term_freq()
} else {
0
}
}
fn positions(&self) -> Result<Vec<u64>> {
if let Some(wrapper) = self.heap.peek() {
wrapper.iter.positions()
} else {
Ok(Vec::new())
}
}
fn next(&mut self) -> Result<bool> {
if !self.started {
self.started = true;
return Ok(!self.heap.is_empty());
}
self.advance()
}
fn skip_to(&mut self, target: u64) -> Result<bool> {
if !self.started {
self.started = true;
}
while self.doc_id() < target {
if !self.advance()? {
return Ok(false);
}
}
Ok(self.doc_id() != u64::MAX)
}
fn cost(&self) -> u64 {
self.heap.iter().map(|w| w.iter.cost()).sum()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::lexical::reader::PostingIterator;
#[test]
fn test_advanced_posting_iterator() {
        fn posting(doc_id: u64) -> Posting {
            Posting {
                doc_id,
                frequency: 1,
                positions: Some(vec![0]),
                weight: 1.0,
            }
        }
        let postings = vec![posting(1), posting(3), posting(5), posting(7), posting(9)];
let mut iter = InvertedIndexPostingIterator::with_blocks(postings, 2);
assert!(iter.skip_to(5).unwrap());
assert_eq!(iter.doc_id(), 5);
assert!(iter.next().unwrap());
assert_eq!(iter.doc_id(), 7);
assert!(!iter.skip_to(15).unwrap());
assert_eq!(iter.doc_id(), u64::MAX);
}
#[test]
fn test_cache_manager() {
let cache = CacheManager::new(1024);
let key = "field:term".to_string();
let term_info = TermInfo::new(100, 50, 5, 10);
assert!(cache.get_term_info(&key).is_none());
cache.cache_term_info(key.clone(), term_info.clone());
let cached = cache.get_term_info(&key).unwrap();
assert_eq!(cached.doc_frequency, term_info.doc_frequency);
let stats = cache.stats();
assert_eq!(stats.hits, 1);
assert_eq!(stats.misses, 1);
assert!(stats.hit_ratio() > 0.0);
}
#[test]
fn test_segment_info() {
let info = SegmentInfo {
segment_id: "seg_000001".to_string(),
doc_count: 1000,
min_doc_id: 0,
max_doc_id: 999,
generation: 1,
has_deletions: false,
shard_id: 0,
};
assert_eq!(info.segment_id, "seg_000001");
assert_eq!(info.doc_count, 1000);
assert_eq!(info.min_doc_id, 0);
assert_eq!(info.max_doc_id, 999);
assert!(!info.has_deletions);
}
}