mod warming;
use std::sync::atomic::AtomicU64;
use std::sync::{atomic, Arc, Weak};
use arc_swap::ArcSwap;
pub use warming::Warmer;
use self::warming::WarmingState;
use crate::core::searcher::{SearcherGeneration, SearcherInner};
use crate::directory::{Directory, WatchCallback, WatchHandle, META_LOCK};
use crate::store::DOCSTORE_CACHE_CAPACITY;
use crate::{Index, Inventory, Searcher, SegmentReader, TrackedObject};
#[derive(Clone, Copy)]
pub enum ReloadPolicy {
Manual,
OnCommitWithDelay, }
#[derive(Clone)]
pub struct IndexReaderBuilder {
reload_policy: ReloadPolicy,
index: Index,
warmers: Vec<Weak<dyn Warmer>>,
num_warming_threads: usize,
doc_store_cache_num_blocks: usize,
}
impl IndexReaderBuilder {
#[must_use]
pub(crate) fn new(index: Index) -> IndexReaderBuilder {
IndexReaderBuilder {
reload_policy: ReloadPolicy::OnCommitWithDelay,
index,
warmers: Vec::new(),
num_warming_threads: 1,
doc_store_cache_num_blocks: DOCSTORE_CACHE_CAPACITY,
}
}
pub fn try_into(self) -> crate::Result<IndexReader> {
let searcher_generation_inventory = Inventory::default();
let warming_state = WarmingState::new(
self.num_warming_threads,
self.warmers,
searcher_generation_inventory.clone(),
)?;
let inner_reader = InnerIndexReader::new(
self.doc_store_cache_num_blocks,
self.index,
warming_state,
searcher_generation_inventory,
)?;
let inner_reader_arc = Arc::new(inner_reader);
let watch_handle_opt: Option<WatchHandle> = match self.reload_policy {
ReloadPolicy::Manual => {
None
}
ReloadPolicy::OnCommitWithDelay => {
let inner_reader_arc_clone = inner_reader_arc.clone();
let callback = move || {
if let Err(err) = inner_reader_arc_clone.reload() {
error!("Error while loading searcher after commit was detected. {err:?}");
}
};
let watch_handle = inner_reader_arc
.index
.directory()
.watch(WatchCallback::new(callback))?;
Some(watch_handle)
}
};
Ok(IndexReader {
inner: inner_reader_arc,
_watch_handle_opt: watch_handle_opt,
})
}
#[must_use]
pub fn reload_policy(mut self, reload_policy: ReloadPolicy) -> IndexReaderBuilder {
self.reload_policy = reload_policy;
self
}
#[must_use]
pub fn doc_store_cache_num_blocks(
mut self,
doc_store_cache_num_blocks: usize,
) -> IndexReaderBuilder {
self.doc_store_cache_num_blocks = doc_store_cache_num_blocks;
self
}
#[must_use]
pub fn warmers(mut self, warmers: Vec<Weak<dyn Warmer>>) -> IndexReaderBuilder {
self.warmers = warmers;
self
}
#[must_use]
pub fn num_warming_threads(mut self, num_warming_threads: usize) -> IndexReaderBuilder {
self.num_warming_threads = num_warming_threads;
self
}
}
impl TryInto<IndexReader> for IndexReaderBuilder {
type Error = crate::LucivyError;
fn try_into(self) -> crate::Result<IndexReader> {
IndexReaderBuilder::try_into(self)
}
}
struct InnerIndexReader {
doc_store_cache_num_blocks: usize,
index: Index,
warming_state: WarmingState,
searcher: arc_swap::ArcSwap<SearcherInner>,
searcher_generation_counter: Arc<AtomicU64>,
searcher_generation_inventory: Inventory<SearcherGeneration>,
}
impl InnerIndexReader {
fn new(
doc_store_cache_num_blocks: usize,
index: Index,
warming_state: WarmingState,
searcher_generation_inventory: Inventory<SearcherGeneration>,
) -> crate::Result<Self> {
let searcher_generation_counter: Arc<AtomicU64> = Default::default();
let searcher = Self::create_searcher(
&index,
doc_store_cache_num_blocks,
&warming_state,
&searcher_generation_counter,
&searcher_generation_inventory,
)?;
Ok(InnerIndexReader {
doc_store_cache_num_blocks,
index,
warming_state,
searcher: ArcSwap::from(searcher),
searcher_generation_counter,
searcher_generation_inventory,
})
}
fn open_segment_readers(index: &Index) -> crate::Result<Vec<SegmentReader>> {
let _meta_lock = index.directory().acquire_lock(&META_LOCK)?;
let searchable_segments = index.searchable_segments()?;
let segment_readers = searchable_segments
.iter()
.map(SegmentReader::open)
.collect::<crate::Result<_>>()?;
Ok(segment_readers)
}
#[cfg(feature = "quickwit")]
async fn open_segment_readers_async(index: &Index) -> crate::Result<Vec<SegmentReader>> {
let _meta_lock = index.directory().acquire_lock(&META_LOCK)?;
let searchable_segments = index.searchable_segments_async().await?;
let segment_readers =
futures::future::try_join_all(searchable_segments.iter().map(SegmentReader::open_async))
.await?;
Ok(segment_readers)
}
fn track_segment_readers_in_inventory(
segment_readers: &[SegmentReader],
searcher_generation_counter: &Arc<AtomicU64>,
searcher_generation_inventory: &Inventory<SearcherGeneration>,
) -> TrackedObject<SearcherGeneration> {
let generation_id = searcher_generation_counter.fetch_add(1, atomic::Ordering::AcqRel);
let searcher_generation =
SearcherGeneration::from_segment_readers(segment_readers, generation_id);
searcher_generation_inventory.track(searcher_generation)
}
fn create_searcher(
index: &Index,
doc_store_cache_num_blocks: usize,
warming_state: &WarmingState,
searcher_generation_counter: &Arc<AtomicU64>,
searcher_generation_inventory: &Inventory<SearcherGeneration>,
) -> crate::Result<Arc<SearcherInner>> {
let segment_readers = Self::open_segment_readers(index)?;
let searcher_generation = Self::track_segment_readers_in_inventory(
&segment_readers,
searcher_generation_counter,
searcher_generation_inventory,
);
let schema = index.schema();
let searcher = Arc::new(SearcherInner::new(
schema,
index.clone(),
segment_readers,
searcher_generation,
doc_store_cache_num_blocks,
)?);
warming_state.warm_new_searcher_generation(&searcher.clone().into())?;
Ok(searcher)
}
fn reload(&self) -> crate::Result<()> {
let searcher = Self::create_searcher(
&self.index,
self.doc_store_cache_num_blocks,
&self.warming_state,
&self.searcher_generation_counter,
&self.searcher_generation_inventory,
)?;
self.searcher.store(searcher);
Ok(())
}
fn searcher(&self) -> Searcher {
self.searcher.load().clone().into()
}
}
#[derive(Clone)]
pub struct IndexReader {
inner: Arc<InnerIndexReader>,
_watch_handle_opt: Option<WatchHandle>,
}
impl IndexReader {
#[cfg(test)]
pub(crate) fn index(&self) -> Index {
self.inner.index.clone()
}
pub fn reload(&self) -> crate::Result<()> {
self.inner.reload()
}
pub fn searcher(&self) -> Searcher {
self.inner.searcher()
}
}