use std::collections::BTreeMap;
use std::sync::Arc;
use std::{fmt, io};
use crate::collector::Collector;
use crate::core::Executor;
use crate::index::{SegmentId, SegmentReader};
use crate::query::{Bm25StatisticsProvider, EnableScoring, Query};
use crate::schema::document::DocumentDeserialize;
use crate::schema::{Schema, Term};
use crate::space_usage::SearcherSpaceUsage;
use crate::store::{CacheStats, StoreReader};
use crate::{DocAddress, Index, Opstamp, TrackedObject};
/// Snapshot identifying which segments a searcher was built from.
///
/// Each segment id is mapped to the delete opstamp its reader reported, and the
/// snapshot carries a caller-supplied `generation_id`.
// NOTE: the derived `PartialOrd`/`Ord` compare fields in declaration order
// (`segments` first, then `generation_id`), so reordering the fields would
// change the ordering of generations.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SearcherGeneration {
    /// Segment id -> delete opstamp, as reported by each `SegmentReader`.
    segments: BTreeMap<SegmentId, Option<Opstamp>>,
    /// Identifier supplied when the generation was created.
    generation_id: u64,
}
impl SearcherGeneration {
pub(crate) fn from_segment_readers(
segment_readers: &[SegmentReader],
generation_id: u64,
) -> Self {
let mut segment_id_to_del_opstamp = BTreeMap::new();
for segment_reader in segment_readers {
segment_id_to_del_opstamp
.insert(segment_reader.segment_id(), segment_reader.delete_opstamp());
}
Self {
segments: segment_id_to_del_opstamp,
generation_id,
}
}
pub fn generation_id(&self) -> u64 {
self.generation_id
}
pub fn segments(&self) -> &BTreeMap<SegmentId, Option<Opstamp>> {
&self.segments
}
}
/// Handle used to run queries and fetch documents from a fixed set of segments.
///
/// Cloning is cheap: all clones share the same `SearcherInner` through an `Arc`.
#[derive(Clone)]
pub struct Searcher {
    // Shared state; wrapped in `Arc` so `Clone` only bumps a reference count.
    inner: Arc<SearcherInner>,
}
impl Searcher {
pub fn index(&self) -> &Index {
&self.inner.index
}
pub fn generation(&self) -> &SearcherGeneration {
self.inner.generation.as_ref()
}
pub fn doc<D: DocumentDeserialize>(&self, doc_address: DocAddress) -> crate::Result<D> {
let store_reader = &self.inner.store_readers[doc_address.segment_ord as usize];
store_reader.get(doc_address.doc_id)
}
pub fn doc_store_cache_stats(&self) -> CacheStats {
let cache_stats: CacheStats = self
.inner
.store_readers
.iter()
.map(|reader| reader.cache_stats())
.sum();
cache_stats
}
#[cfg(feature = "quickwit")]
pub async fn doc_async<D: DocumentDeserialize>(
&self,
doc_address: DocAddress,
) -> crate::Result<D> {
let executor = self.inner.index.search_executor();
let store_reader = &self.inner.store_readers[doc_address.segment_ord as usize];
store_reader.get_async(doc_address.doc_id, executor).await
}
pub fn schema(&self) -> &Schema {
&self.inner.schema
}
pub fn num_docs(&self) -> u64 {
self.inner
.segment_readers
.iter()
.map(|segment_reader| u64::from(segment_reader.num_docs()))
.sum::<u64>()
}
pub fn doc_freq(&self, term: &Term) -> crate::Result<u64> {
let mut total_doc_freq = 0;
for segment_reader in &self.inner.segment_readers {
let inverted_index = segment_reader.inverted_index(term.field())?;
let doc_freq = inverted_index.doc_freq(term)?;
total_doc_freq += u64::from(doc_freq);
}
Ok(total_doc_freq)
}
#[cfg(feature = "quickwit")]
pub async fn doc_freq_async(&self, term: &Term) -> crate::Result<u64> {
let mut total_doc_freq = 0;
for segment_reader in &self.inner.segment_readers {
let inverted_index = segment_reader.inverted_index(term.field())?;
let doc_freq = inverted_index.doc_freq_async(term).await?;
total_doc_freq += u64::from(doc_freq);
}
Ok(total_doc_freq)
}
pub fn segment_readers(&self) -> &[SegmentReader] {
&self.inner.segment_readers
}
pub fn segment_reader(&self, segment_ord: u32) -> &SegmentReader {
&self.inner.segment_readers[segment_ord as usize]
}
pub fn search<C: Collector>(
&self,
query: &dyn Query,
collector: &C,
) -> crate::Result<C::Fruit> {
self.search_with_statistics_provider(query, collector, self)
}
pub fn search_with_statistics_provider<C: Collector>(
&self,
query: &dyn Query,
collector: &C,
statistics_provider: &dyn Bm25StatisticsProvider,
) -> crate::Result<C::Fruit> {
let enabled_scoring = if collector.requires_scoring() {
EnableScoring::enabled_from_statistics_provider(statistics_provider, self)
} else {
EnableScoring::disabled_from_searcher(self)
};
let executor = self.inner.index.search_executor();
self.search_with_executor(query, collector, executor, enabled_scoring)
}
pub fn search_with_executor<C: Collector>(
&self,
query: &dyn Query,
collector: &C,
executor: &Executor,
enabled_scoring: EnableScoring,
) -> crate::Result<C::Fruit> {
let weight = query.weight(enabled_scoring)?;
collector.check_schema(self.schema())?;
let segment_readers = self.segment_readers();
let fruits = executor.map(
|(segment_ord, segment_reader)| {
collector.collect_segment(weight.as_ref(), segment_ord as u32, segment_reader)
},
segment_readers.iter().enumerate(),
)?;
collector.merge_fruits(fruits)
}
pub fn space_usage(&self) -> io::Result<SearcherSpaceUsage> {
let mut space_usage = SearcherSpaceUsage::new();
for segment_reader in self.segment_readers() {
space_usage.add_segment(segment_reader.space_usage()?);
}
Ok(space_usage)
}
#[cfg(feature = "quickwit")]
pub async fn search_async<C: Collector + Sync>(
&self,
query: &dyn Query,
collector: &C,
) -> crate::Result<C::Fruit>
where
C::Child: Send,
{
let enabled_scoring = if collector.requires_scoring() {
EnableScoring::enabled_from_searcher(self)
} else {
EnableScoring::disabled_from_searcher(self)
};
let weight = query.weight(enabled_scoring)?;
collector.check_schema(self.schema())?;
let segment_readers = self.segment_readers();
let mut fruits = Vec::with_capacity(segment_readers.len());
for (segment_ord, segment_reader) in segment_readers.iter().enumerate() {
let fruit = collector
.collect_segment_async(weight.as_ref(), segment_ord as u32, segment_reader)
.await?;
fruits.push(fruit);
}
collector.merge_fruits(fruits)
}
}
impl From<Arc<SearcherInner>> for Searcher {
fn from(inner: Arc<SearcherInner>) -> Self {
Searcher { inner }
}
}
/// State shared by every clone of a [`Searcher`].
pub(crate) struct SearcherInner {
    // Schema returned by `Searcher::schema`.
    schema: Schema,
    // Index the searcher was opened on; also supplies the search executor.
    index: Index,
    // One reader per segment; a reader's position here is its segment ordinal.
    segment_readers: Vec<SegmentReader>,
    // Doc store readers, built in the same order as `segment_readers`, so
    // `store_readers[i]` belongs to `segment_readers[i]`.
    store_readers: Vec<StoreReader>,
    // Generation describing the same segment set as `segment_readers`.
    generation: TrackedObject<SearcherGeneration>,
}
impl SearcherInner {
pub(crate) fn new(
schema: Schema,
index: Index,
segment_readers: Vec<SegmentReader>,
generation: TrackedObject<SearcherGeneration>,
doc_store_cache_num_blocks: usize,
) -> io::Result<SearcherInner> {
assert_eq!(
&segment_readers
.iter()
.map(|reader| (reader.segment_id(), reader.delete_opstamp()))
.collect::<BTreeMap<_, _>>(),
generation.segments(),
"Set of segments referenced by this Searcher and its SearcherGeneration must match"
);
let store_readers: Vec<StoreReader> = segment_readers
.iter()
.map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_num_blocks))
.collect::<io::Result<Vec<_>>>()?;
Ok(SearcherInner {
schema,
index,
segment_readers,
store_readers,
generation,
})
}
}
impl fmt::Debug for Searcher {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let segment_ids = self
.segment_readers()
.iter()
.map(SegmentReader::segment_id)
.collect::<Vec<_>>();
write!(f, "Searcher({segment_ids:?})")
}
}