use std::sync::Arc;
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use crate::dsl::Schema;
use crate::error::Result;
use crate::segment::{SegmentId, SegmentReader, SegmentSnapshot};
use crate::structures::{CoarseCentroids, PQCodebook};
#[cfg(feature = "native")]
use crate::directories::DirectoryWriter;
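/// A point-in-time view over a set of immutable segments.
///
/// Retains the snapshot it was built from so the underlying segment files stay
/// pinned while this searcher is alive, plus any pre-trained vector-search
/// structures (coarse centroids, PQ codebooks) handed over by the index.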
#[cfg(feature = "native")]
pub struct Searcher<D: DirectoryWriter + 'static> {
_snapshot: SegmentSnapshot<D>,
segments: Vec<Arc<SegmentReader>>,
schema: Arc<Schema>,
default_fields: Vec<crate::Field>,
tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
}
#[cfg(feature = "native")]
impl<D: DirectoryWriter + 'static> Searcher<D> {
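    /// Builds a searcher from a segment snapshot.
    ///
    /// Opens a reader for every segment id in the snapshot, assigning each a
    /// contiguous global doc-id range. Segment ids that fail to parse are
    /// skipped silently; segments that fail to open are skipped with a
    /// warning. Default query fields are the schema's explicit defaults, or
    /// every indexed text field when none are declared.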
pub(crate) async fn from_snapshot(
directory: Arc<D>,
schema: Arc<Schema>,
snapshot: SegmentSnapshot<D>,
trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
term_cache_blocks: usize,
) -> Result<Self> {
let mut segments = Vec::new();
let mut doc_id_offset = 0u32;
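        // Open segments in snapshot order; each reader receives the running
        // doc-id offset so doc ids are globally contiguous across segments.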
for id_str in snapshot.segment_ids() {
let Some(segment_id) = SegmentId::from_hex(id_str) else {
continue;
};
match SegmentReader::open(
directory.as_ref(),
segment_id,
Arc::clone(&schema),
doc_id_offset,
term_cache_blocks,
)
.await
{
Ok(reader) => {
doc_id_offset += reader.meta().num_docs;
segments.push(Arc::new(reader));
}
Err(e) => {
log::warn!("Failed to open segment {}: {:?}", id_str, e);
}
}
}
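        // Prefer the schema's explicit default fields; otherwise fall back to
        // every indexed text field.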
let default_fields: Vec<crate::Field> = if !schema.default_fields().is_empty() {
schema.default_fields().to_vec()
} else {
schema
.fields()
.filter(|(_, entry)| {
entry.indexed && entry.field_type == crate::dsl::FieldType::Text
})
.map(|(field, _)| field)
.collect()
};
Ok(Self {
_snapshot: snapshot,
segments,
schema,
default_fields,
tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
trained_centroids,
trained_codebooks,
})
}
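    /// The schema this searcher was opened with.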
pub fn schema(&self) -> &Schema {
&self.schema
}
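    /// The open segment readers, in global doc-id order.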
pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
&self.segments
}
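    /// Fields searched when a query does not name a field explicitly.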
pub fn default_fields(&self) -> &[crate::Field] {
&self.default_fields
}
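    /// Tokenizer registry used to analyze query text.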
pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
&self.tokenizers
}
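    /// Pre-trained coarse centroids available to this searcher.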
pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
&self.trained_centroids
}
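    /// Pre-trained product-quantization codebooks available to this searcher.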
pub fn trained_codebooks(&self) -> &FxHashMap<u32, Arc<PQCodebook>> {
&self.trained_codebooks
}
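    /// Total number of documents across all open segments.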
pub fn num_docs(&self) -> u32 {
self.segments.iter().map(|s| s.meta().num_docs).sum()
}
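    /// Number of open segments.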
pub fn num_segments(&self) -> usize {
self.segments.len()
}
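    /// Fetches a stored document by global doc id.
    ///
    /// The id is translated to the owning segment's local doc id; returns
    /// `Ok(None)` when the id is past the end of the last segment.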
pub async fn doc(&self, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
let mut offset = 0u32;
for segment in &self.segments {
let segment_docs = segment.meta().num_docs;
if doc_id < offset + segment_docs {
let local_doc_id = doc_id - offset;
return segment.doc(local_doc_id).await;
}
offset += segment_docs;
}
Ok(None)
}
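    /// Runs `query` against every segment and returns the top `limit` hits,
    /// ranked by score across the whole index.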
pub async fn search(
&self,
query: &dyn crate::query::Query,
limit: usize,
) -> Result<Vec<crate::query::SearchResult>> {
self.search_with_offset(query, limit, 0).await
}
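    /// Like [`search`](Self::search), but skips the first `offset` hits after
    /// the global merge, so `offset`/`limit` paginate the combined ranking
    /// rather than any single segment.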
pub async fn search_with_offset(
&self,
query: &dyn crate::query::Query,
limit: usize,
offset: usize,
) -> Result<Vec<crate::query::SearchResult>> {
        let fetch_limit = offset + limit;
        // Each segment must return enough hits to cover the requested page.
        let mut all_results: Vec<crate::query::SearchResult> = Vec::new();
        for segment in &self.segments {
            let results =
                crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?;
            all_results.extend(results);
        }
        // Merge: stable sort by score descending, then apply offset/limit to
        // the combined ranking.
        all_results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        Ok(all_results
            .into_iter()
            .skip(offset)
            .take(limit)
            .collect())
}
}
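/// Shared read handle over an index.
///
/// Holds the current [`Searcher`] behind a lock and rebuilds it from the
/// segment manager's latest snapshot once the reload interval has elapsed.
///
/// A minimal usage sketch, assuming an already-constructed `reader:
/// IndexReader<D>` and some `query` (both illustrative, not defined here):
///
/// ```ignore
/// let searcher = reader.searcher().await?;        // reloads if stale
/// let hits = searcher.search(&query, 10).await?;  // top-10 across segments
/// ```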
#[cfg(feature = "native")]
pub struct IndexReader<D: DirectoryWriter + 'static> {
schema: Arc<Schema>,
segment_manager: Arc<crate::merge::SegmentManager<D>>,
searcher: RwLock<Arc<Searcher<D>>>,
trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
term_cache_blocks: usize,
last_reload: RwLock<std::time::Instant>,
reload_interval: std::time::Duration,
}
#[cfg(feature = "native")]
impl<D: DirectoryWriter + 'static> IndexReader<D> {
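    /// Creates a reader bound to `segment_manager`, building an initial
    /// searcher from its current snapshot. The searcher is refreshed lazily,
    /// at most once per reload interval (1 second by default).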
pub async fn from_segment_manager(
schema: Arc<Schema>,
segment_manager: Arc<crate::merge::SegmentManager<D>>,
trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
term_cache_blocks: usize,
) -> Result<Self> {
let reader = Self::create_reader(
&schema,
&segment_manager,
&trained_centroids,
&trained_codebooks,
term_cache_blocks,
)
.await?;
Ok(Self {
schema,
segment_manager,
searcher: RwLock::new(Arc::new(reader)),
trained_centroids,
trained_codebooks,
term_cache_blocks,
last_reload: RwLock::new(std::time::Instant::now()),
reload_interval: std::time::Duration::from_secs(1),
})
}
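    /// Acquires a fresh snapshot from the segment manager and opens a
    /// searcher over it.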
async fn create_reader(
schema: &Arc<Schema>,
segment_manager: &Arc<crate::merge::SegmentManager<D>>,
trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
trained_codebooks: &FxHashMap<u32, Arc<PQCodebook>>,
term_cache_blocks: usize,
) -> Result<Searcher<D>> {
let snapshot = segment_manager.acquire_snapshot().await;
Searcher::from_snapshot(
segment_manager.directory(),
Arc::clone(schema),
snapshot,
trained_centroids.clone(),
trained_codebooks.clone(),
term_cache_blocks,
)
.await
}
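    /// Sets how stale a cached searcher may become before `searcher()`
    /// triggers a reload.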
pub fn set_reload_interval(&mut self, interval: std::time::Duration) {
self.reload_interval = interval;
}
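    /// Returns the current searcher, first reloading it if the reload
    /// interval has elapsed since the last refresh.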
pub async fn searcher(&self) -> Result<Arc<Searcher<D>>> {
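        // Check staleness under a short-lived read lock; only reload (and
        // take write locks) when the interval has actually elapsed.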
let should_reload = {
let last = self.last_reload.read();
last.elapsed() >= self.reload_interval
};
if should_reload {
self.reload().await?;
}
Ok(Arc::clone(&*self.searcher.read()))
}
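    /// Rebuilds the searcher from the latest snapshot and resets the reload
    /// timer. Called automatically by `searcher()`, but can be invoked
    /// directly to pick up newly committed segments immediately.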
pub async fn reload(&self) -> Result<()> {
let new_reader = Self::create_reader(
&self.schema,
&self.segment_manager,
&self.trained_centroids,
&self.trained_codebooks,
self.term_cache_blocks,
)
.await?;
*self.searcher.write() = Arc::new(new_reader);
*self.last_reload.write() = std::time::Instant::now();
Ok(())
}
pub fn schema(&self) -> &Schema {
&self.schema
}
}