tantivy/reader/
mod.rs

1mod warming;
2
3use std::sync::atomic::AtomicU64;
4use std::sync::{atomic, Arc, Weak};
5
6use arc_swap::ArcSwap;
7pub use warming::Warmer;
8
9use self::warming::WarmingState;
10use crate::core::searcher::{SearcherGeneration, SearcherInner};
11use crate::directory::{Directory, WatchCallback, WatchHandle, META_LOCK};
12use crate::store::DOCSTORE_CACHE_CAPACITY;
13use crate::{Index, Inventory, Searcher, SegmentReader, TrackedObject};
14
/// Defines when a new version of the index should be reloaded.
///
/// Whether you search and index in the same process or not, tantivy does not necessarily
/// reflect the changes that are committed to your index right away. `ReloadPolicy` lets you
/// define precisely when you want your index to be reloaded.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ReloadPolicy {
    /// The index is entirely reloaded manually.
    /// All updates of the index should be manual.
    ///
    /// No change is reflected automatically. You are required to call [`IndexReader::reload()`]
    /// manually.
    Manual,
    /// The index is reloaded within milliseconds after a new commit is available.
    /// This is made possible by watching changes in the `meta.json` file.
    OnCommitWithDelay, // TODO add NEAR_REAL_TIME(target_ms)
}
32
33/// [`IndexReader`] builder
34///
35/// It makes it possible to configure:
36/// - [`ReloadPolicy`] defining when new index versions are detected
37/// - [`Warmer`] implementations
38/// - number of warming threads, for parallelizing warming work
39/// - The cache size of the underlying doc store readers.
40#[derive(Clone)]
41pub struct IndexReaderBuilder {
42    reload_policy: ReloadPolicy,
43    index: Index,
44    warmers: Vec<Weak<dyn Warmer>>,
45    num_warming_threads: usize,
46    doc_store_cache_num_blocks: usize,
47}
48
49impl IndexReaderBuilder {
50    #[must_use]
51    pub(crate) fn new(index: Index) -> IndexReaderBuilder {
52        IndexReaderBuilder {
53            reload_policy: ReloadPolicy::OnCommitWithDelay,
54            index,
55            warmers: Vec::new(),
56            num_warming_threads: 1,
57            doc_store_cache_num_blocks: DOCSTORE_CACHE_CAPACITY,
58        }
59    }
60
61    /// Builds the reader.
62    ///
63    /// Building the reader is a non-trivial operation that requires
64    /// to open different segment readers. It may take hundreds of milliseconds
65    /// of time and it may return an error.
66    pub fn try_into(self) -> crate::Result<IndexReader> {
67        let searcher_generation_inventory = Inventory::default();
68        let warming_state = WarmingState::new(
69            self.num_warming_threads,
70            self.warmers,
71            searcher_generation_inventory.clone(),
72        )?;
73        let inner_reader = InnerIndexReader::new(
74            self.doc_store_cache_num_blocks,
75            self.index,
76            warming_state,
77            searcher_generation_inventory,
78        )?;
79        let inner_reader_arc = Arc::new(inner_reader);
80        let watch_handle_opt: Option<WatchHandle> = match self.reload_policy {
81            ReloadPolicy::Manual => {
82                // No need to set anything...
83                None
84            }
85            ReloadPolicy::OnCommitWithDelay => {
86                let inner_reader_arc_clone = inner_reader_arc.clone();
87                let callback = move || {
88                    if let Err(err) = inner_reader_arc_clone.reload() {
89                        error!("Error while loading searcher after commit was detected. {err:?}");
90                    }
91                };
92                let watch_handle = inner_reader_arc
93                    .index
94                    .directory()
95                    .watch(WatchCallback::new(callback))?;
96                Some(watch_handle)
97            }
98        };
99        Ok(IndexReader {
100            inner: inner_reader_arc,
101            _watch_handle_opt: watch_handle_opt,
102        })
103    }
104
105    /// Sets the reload_policy.
106    ///
107    /// See [`ReloadPolicy`] for more details.
108    #[must_use]
109    pub fn reload_policy(mut self, reload_policy: ReloadPolicy) -> IndexReaderBuilder {
110        self.reload_policy = reload_policy;
111        self
112    }
113
114    /// Sets the cache size of the doc store readers.
115    ///
116    /// The doc store readers cache by default DOCSTORE_CACHE_CAPACITY(100) decompressed blocks.
117    #[must_use]
118    pub fn doc_store_cache_num_blocks(
119        mut self,
120        doc_store_cache_num_blocks: usize,
121    ) -> IndexReaderBuilder {
122        self.doc_store_cache_num_blocks = doc_store_cache_num_blocks;
123        self
124    }
125
126    /// Set the [`Warmer`]s that are invoked when reloading searchable segments.
127    #[must_use]
128    pub fn warmers(mut self, warmers: Vec<Weak<dyn Warmer>>) -> IndexReaderBuilder {
129        self.warmers = warmers;
130        self
131    }
132
133    /// Sets the number of warming threads.
134    ///
135    /// This allows parallelizing warming work when there are multiple [`Warmer`] registered with
136    /// the [`IndexReader`].
137    #[must_use]
138    pub fn num_warming_threads(mut self, num_warming_threads: usize) -> IndexReaderBuilder {
139        self.num_warming_threads = num_warming_threads;
140        self
141    }
142}
143
144impl TryInto<IndexReader> for IndexReaderBuilder {
145    type Error = crate::TantivyError;
146
147    fn try_into(self) -> crate::Result<IndexReader> {
148        IndexReaderBuilder::try_into(self)
149    }
150}
151
152struct InnerIndexReader {
153    doc_store_cache_num_blocks: usize,
154    index: Index,
155    warming_state: WarmingState,
156    searcher: arc_swap::ArcSwap<SearcherInner>,
157    searcher_generation_counter: Arc<AtomicU64>,
158    searcher_generation_inventory: Inventory<SearcherGeneration>,
159}
160
161impl InnerIndexReader {
162    fn new(
163        doc_store_cache_num_blocks: usize,
164        index: Index,
165        warming_state: WarmingState,
166        // The searcher_generation_inventory is not used as source, but as target to track the
167        // loaded segments.
168        searcher_generation_inventory: Inventory<SearcherGeneration>,
169    ) -> crate::Result<Self> {
170        let searcher_generation_counter: Arc<AtomicU64> = Default::default();
171
172        let searcher = Self::create_searcher(
173            &index,
174            doc_store_cache_num_blocks,
175            &warming_state,
176            &searcher_generation_counter,
177            &searcher_generation_inventory,
178        )?;
179        Ok(InnerIndexReader {
180            doc_store_cache_num_blocks,
181            index,
182            warming_state,
183            searcher: ArcSwap::from(searcher),
184            searcher_generation_counter,
185            searcher_generation_inventory,
186        })
187    }
188    /// Opens the freshest segments [`SegmentReader`].
189    ///
190    /// This function acquires a lock to prevent GC from removing files
191    /// as we are opening our index.
192    fn open_segment_readers(index: &Index) -> crate::Result<Vec<SegmentReader>> {
193        // Prevents segment files from getting deleted while we are in the process of opening them
194        let _meta_lock = index.directory().acquire_lock(&META_LOCK)?;
195        let searchable_segments = index.searchable_segments()?;
196        let segment_readers = searchable_segments
197            .iter()
198            .map(SegmentReader::open)
199            .collect::<crate::Result<_>>()?;
200        Ok(segment_readers)
201    }
202
203    fn track_segment_readers_in_inventory(
204        segment_readers: &[SegmentReader],
205        searcher_generation_counter: &Arc<AtomicU64>,
206        searcher_generation_inventory: &Inventory<SearcherGeneration>,
207    ) -> TrackedObject<SearcherGeneration> {
208        let generation_id = searcher_generation_counter.fetch_add(1, atomic::Ordering::AcqRel);
209        let searcher_generation =
210            SearcherGeneration::from_segment_readers(segment_readers, generation_id);
211        searcher_generation_inventory.track(searcher_generation)
212    }
213
214    fn create_searcher(
215        index: &Index,
216        doc_store_cache_num_blocks: usize,
217        warming_state: &WarmingState,
218        searcher_generation_counter: &Arc<AtomicU64>,
219        searcher_generation_inventory: &Inventory<SearcherGeneration>,
220    ) -> crate::Result<Arc<SearcherInner>> {
221        let segment_readers = Self::open_segment_readers(index)?;
222        let searcher_generation = Self::track_segment_readers_in_inventory(
223            &segment_readers,
224            searcher_generation_counter,
225            searcher_generation_inventory,
226        );
227
228        let schema = index.schema();
229        let searcher = Arc::new(SearcherInner::new(
230            schema,
231            index.clone(),
232            segment_readers,
233            searcher_generation,
234            doc_store_cache_num_blocks,
235        )?);
236
237        warming_state.warm_new_searcher_generation(&searcher.clone().into())?;
238        Ok(searcher)
239    }
240
241    fn reload(&self) -> crate::Result<()> {
242        let searcher = Self::create_searcher(
243            &self.index,
244            self.doc_store_cache_num_blocks,
245            &self.warming_state,
246            &self.searcher_generation_counter,
247            &self.searcher_generation_inventory,
248        )?;
249
250        self.searcher.store(searcher);
251
252        Ok(())
253    }
254
255    fn searcher(&self) -> Searcher {
256        self.searcher.load().clone().into()
257    }
258}
259
260/// `IndexReader` is your entry point to read and search the index.
261///
262/// It controls when a new version of the index should be loaded and lends
263/// you instances of `Searcher` for the last loaded version.
264///
265/// `IndexReader` just wraps an `Arc`.
266#[derive(Clone)]
267pub struct IndexReader {
268    inner: Arc<InnerIndexReader>,
269    _watch_handle_opt: Option<WatchHandle>,
270}
271
272impl IndexReader {
273    #[cfg(test)]
274    pub(crate) fn index(&self) -> Index {
275        self.inner.index.clone()
276    }
277
278    /// Update searchers so that they reflect the state of the last
279    /// `.commit()`.
280    ///
281    /// If you set up the [`ReloadPolicy::OnCommitWithDelay`] (which is the default)
282    /// every commit should be rapidly reflected on your `IndexReader` and you should
283    /// not need to call `reload()` at all.
284    ///
285    /// This automatic reload can take 10s of milliseconds to kick in however, and in unit tests
286    /// it can be nice to deterministically force the reload of searchers.
287    pub fn reload(&self) -> crate::Result<()> {
288        self.inner.reload()
289    }
290
291    /// Returns a searcher
292    ///
293    /// This method should be called every single time a search
294    /// query is performed.
295    ///
296    /// The same searcher must be used for a given query, as it ensures
297    /// the use of a consistent segment set.
298    pub fn searcher(&self) -> Searcher {
299        self.inner.searcher()
300    }
301}