1mod warming;
2
3use std::sync::atomic::AtomicU64;
4use std::sync::{atomic, Arc, Weak};
5
6use arc_swap::ArcSwap;
7pub use warming::Warmer;
8
9use self::warming::WarmingState;
10use crate::core::searcher::{SearcherGeneration, SearcherInner};
11use crate::directory::{Directory, WatchCallback, WatchHandle, META_LOCK};
12use crate::store::DOCSTORE_CACHE_CAPACITY;
13use crate::{Index, Inventory, Searcher, SegmentReader, TrackedObject};
14
15#[derive(Clone, Copy)]
21pub enum ReloadPolicy {
22 Manual,
28 OnCommitWithDelay, }
32
33#[derive(Clone)]
41pub struct IndexReaderBuilder {
42 reload_policy: ReloadPolicy,
43 index: Index,
44 warmers: Vec<Weak<dyn Warmer>>,
45 num_warming_threads: usize,
46 doc_store_cache_num_blocks: usize,
47}
48
49impl IndexReaderBuilder {
50 #[must_use]
51 pub(crate) fn new(index: Index) -> IndexReaderBuilder {
52 IndexReaderBuilder {
53 reload_policy: ReloadPolicy::OnCommitWithDelay,
54 index,
55 warmers: Vec::new(),
56 num_warming_threads: 1,
57 doc_store_cache_num_blocks: DOCSTORE_CACHE_CAPACITY,
58 }
59 }
60
61 pub fn try_into(self) -> crate::Result<IndexReader> {
67 let searcher_generation_inventory = Inventory::default();
68 let warming_state = WarmingState::new(
69 self.num_warming_threads,
70 self.warmers,
71 searcher_generation_inventory.clone(),
72 )?;
73 let inner_reader = InnerIndexReader::new(
74 self.doc_store_cache_num_blocks,
75 self.index,
76 warming_state,
77 searcher_generation_inventory,
78 )?;
79 let inner_reader_arc = Arc::new(inner_reader);
80 let watch_handle_opt: Option<WatchHandle> = match self.reload_policy {
81 ReloadPolicy::Manual => {
82 None
84 }
85 ReloadPolicy::OnCommitWithDelay => {
86 let inner_reader_arc_clone = inner_reader_arc.clone();
87 let callback = move || {
88 if let Err(err) = inner_reader_arc_clone.reload() {
89 error!("Error while loading searcher after commit was detected. {err:?}");
90 }
91 };
92 let watch_handle = inner_reader_arc
93 .index
94 .directory()
95 .watch(WatchCallback::new(callback))?;
96 Some(watch_handle)
97 }
98 };
99 Ok(IndexReader {
100 inner: inner_reader_arc,
101 _watch_handle_opt: watch_handle_opt,
102 })
103 }
104
105 #[must_use]
109 pub fn reload_policy(mut self, reload_policy: ReloadPolicy) -> IndexReaderBuilder {
110 self.reload_policy = reload_policy;
111 self
112 }
113
114 #[must_use]
118 pub fn doc_store_cache_num_blocks(
119 mut self,
120 doc_store_cache_num_blocks: usize,
121 ) -> IndexReaderBuilder {
122 self.doc_store_cache_num_blocks = doc_store_cache_num_blocks;
123 self
124 }
125
126 #[must_use]
128 pub fn warmers(mut self, warmers: Vec<Weak<dyn Warmer>>) -> IndexReaderBuilder {
129 self.warmers = warmers;
130 self
131 }
132
133 #[must_use]
138 pub fn num_warming_threads(mut self, num_warming_threads: usize) -> IndexReaderBuilder {
139 self.num_warming_threads = num_warming_threads;
140 self
141 }
142}
143
144impl TryInto<IndexReader> for IndexReaderBuilder {
145 type Error = crate::TantivyError;
146
147 fn try_into(self) -> crate::Result<IndexReader> {
148 IndexReaderBuilder::try_into(self)
149 }
150}
151
152struct InnerIndexReader {
153 doc_store_cache_num_blocks: usize,
154 index: Index,
155 warming_state: WarmingState,
156 searcher: arc_swap::ArcSwap<SearcherInner>,
157 searcher_generation_counter: Arc<AtomicU64>,
158 searcher_generation_inventory: Inventory<SearcherGeneration>,
159}
160
161impl InnerIndexReader {
162 fn new(
163 doc_store_cache_num_blocks: usize,
164 index: Index,
165 warming_state: WarmingState,
166 searcher_generation_inventory: Inventory<SearcherGeneration>,
169 ) -> crate::Result<Self> {
170 let searcher_generation_counter: Arc<AtomicU64> = Default::default();
171
172 let searcher = Self::create_searcher(
173 &index,
174 doc_store_cache_num_blocks,
175 &warming_state,
176 &searcher_generation_counter,
177 &searcher_generation_inventory,
178 )?;
179 Ok(InnerIndexReader {
180 doc_store_cache_num_blocks,
181 index,
182 warming_state,
183 searcher: ArcSwap::from(searcher),
184 searcher_generation_counter,
185 searcher_generation_inventory,
186 })
187 }
188 fn open_segment_readers(index: &Index) -> crate::Result<Vec<SegmentReader>> {
193 let _meta_lock = index.directory().acquire_lock(&META_LOCK)?;
195 let searchable_segments = index.searchable_segments()?;
196 let segment_readers = searchable_segments
197 .iter()
198 .map(SegmentReader::open)
199 .collect::<crate::Result<_>>()?;
200 Ok(segment_readers)
201 }
202
203 fn track_segment_readers_in_inventory(
204 segment_readers: &[SegmentReader],
205 searcher_generation_counter: &Arc<AtomicU64>,
206 searcher_generation_inventory: &Inventory<SearcherGeneration>,
207 ) -> TrackedObject<SearcherGeneration> {
208 let generation_id = searcher_generation_counter.fetch_add(1, atomic::Ordering::AcqRel);
209 let searcher_generation =
210 SearcherGeneration::from_segment_readers(segment_readers, generation_id);
211 searcher_generation_inventory.track(searcher_generation)
212 }
213
214 fn create_searcher(
215 index: &Index,
216 doc_store_cache_num_blocks: usize,
217 warming_state: &WarmingState,
218 searcher_generation_counter: &Arc<AtomicU64>,
219 searcher_generation_inventory: &Inventory<SearcherGeneration>,
220 ) -> crate::Result<Arc<SearcherInner>> {
221 let segment_readers = Self::open_segment_readers(index)?;
222 let searcher_generation = Self::track_segment_readers_in_inventory(
223 &segment_readers,
224 searcher_generation_counter,
225 searcher_generation_inventory,
226 );
227
228 let schema = index.schema();
229 let searcher = Arc::new(SearcherInner::new(
230 schema,
231 index.clone(),
232 segment_readers,
233 searcher_generation,
234 doc_store_cache_num_blocks,
235 )?);
236
237 warming_state.warm_new_searcher_generation(&searcher.clone().into())?;
238 Ok(searcher)
239 }
240
241 fn reload(&self) -> crate::Result<()> {
242 let searcher = Self::create_searcher(
243 &self.index,
244 self.doc_store_cache_num_blocks,
245 &self.warming_state,
246 &self.searcher_generation_counter,
247 &self.searcher_generation_inventory,
248 )?;
249
250 self.searcher.store(searcher);
251
252 Ok(())
253 }
254
255 fn searcher(&self) -> Searcher {
256 self.searcher.load().clone().into()
257 }
258}
259
260#[derive(Clone)]
267pub struct IndexReader {
268 inner: Arc<InnerIndexReader>,
269 _watch_handle_opt: Option<WatchHandle>,
270}
271
272impl IndexReader {
273 #[cfg(test)]
274 pub(crate) fn index(&self) -> Index {
275 self.inner.index.clone()
276 }
277
278 pub fn reload(&self) -> crate::Result<()> {
288 self.inner.reload()
289 }
290
291 pub fn searcher(&self) -> Searcher {
299 self.inner.searcher()
300 }
301}