// summavy/indexer/segment_updater.rs

1use std::borrow::BorrowMut;
2use std::collections::HashSet;
3use std::io::Write;
4use std::ops::Deref;
5use std::path::PathBuf;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, RwLock};
8
9use fail::fail_point;
10use rayon::{ThreadPool, ThreadPoolBuilder};
11
12use super::segment_manager::SegmentManager;
13use crate::core::{
14    Index, IndexMeta, IndexSettings, Segment, SegmentId, SegmentMeta, META_FILEPATH,
15};
16use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult};
17use crate::fastfield::AliveBitSet;
18use crate::indexer::delete_queue::DeleteCursor;
19use crate::indexer::index_writer::advance_deletes;
20use crate::indexer::merge_operation::MergeOperationInventory;
21use crate::indexer::merger::IndexMerger;
22use crate::indexer::segment_manager::SegmentsStatus;
23use crate::indexer::stamper::Stamper;
24use crate::indexer::{
25    DefaultMergePolicy, MergeCandidate, MergeOperation, MergePolicy, SegmentEntry,
26    SegmentSerializer,
27};
28use crate::{FutureResult, Opstamp};
29
/// Number of threads in the dedicated merge thread pool, i.e. the maximum
/// number of segment merges that may run concurrently.
const NUM_MERGE_THREADS: usize = 4;
31
/// Save the index meta file.
/// This operation is atomic:
/// Either
///  - it fails, in which case an error is returned,
/// and the `meta.json` remains untouched,
/// - it succeeds, and `meta.json` is written
/// and flushed.
///
/// This method is not part of tantivy's public API
pub(crate) fn save_metas(metas: &IndexMeta, directory: &dyn Directory) -> crate::Result<()> {
    info!("save metas");
    let mut buffer = serde_json::to_vec_pretty(metas)?;
    // Just adding a new line at the end of the buffer.
    writeln!(&mut buffer)?;
    // Test-only failure injection point (no-op unless the `fail` crate's
    // failpoints are enabled).
    fail_point!("save_metas", |msg| Err(crate::TantivyError::from(
        std::io::Error::new(
            std::io::ErrorKind::Other,
            msg.unwrap_or_else(|| "Undefined".to_string())
        )
    )));
    // Sync the directory before atomically publishing the new meta.json,
    // presumably so previously written segment files are durable before the
    // meta file references them — NOTE(review): confirm against `Directory`
    // contract.
    directory.sync_directory()?;
    directory.atomic_write(&META_FILEPATH, &buffer[..])?;
    debug!("Saved metas {:?}", serde_json::to_string_pretty(&metas));
    Ok(())
}
57
// The segment update runner is in charge of processing all
// of the `SegmentUpdate`s.
//
// All this processing happens on a single thread
// consuming a common queue.
//
// We voluntarily pass a merge_operation ref to guarantee that
// the merge_operation is alive during the process.
//
// Cloning is cheap: all clones share the same `InnerSegmentUpdater`
// through the `Arc`.
#[derive(Clone)]
pub(crate) struct SegmentUpdater(Arc<InnerSegmentUpdater>);
68
69impl Deref for SegmentUpdater {
70    type Target = InnerSegmentUpdater;
71
72    #[inline]
73    fn deref(&self) -> &Self::Target {
74        &self.0
75    }
76}
77
78fn garbage_collect_files(
79    segment_updater: SegmentUpdater,
80) -> crate::Result<GarbageCollectionResult> {
81    info!("Running garbage collection");
82    let mut index = segment_updater.index.clone();
83    index
84        .directory_mut()
85        .garbage_collect(move || segment_updater.list_files())
86}
87
/// Merges the segments given in `segment_entries` into a single new segment.
/// This function runs in the calling thread and is computationally expensive.
///
/// Segment attributes of the resulting segment are computed by the index's
/// attribute merger, unless `override_segment_attributes` is provided, in
/// which case that value is used as-is.
///
/// Returns `Ok(None)` when the input segments contain no documents at all.
fn merge(
    index: &Index,
    mut segment_entries: Vec<SegmentEntry>,
    target_opstamp: Opstamp,
    override_segment_attributes: Option<serde_json::Value>,
) -> crate::Result<Option<SegmentEntry>> {
    // Nothing to merge if all of the candidate segments are empty.
    let num_docs = segment_entries
        .iter()
        .map(|segment| segment.meta().num_docs() as u64)
        .sum::<u64>();
    if num_docs == 0 {
        return Ok(None);
    }

    // Explicit override wins; otherwise merge the attributes of the input
    // segments with the index's configured attribute merger (if any).
    let segment_attributes = override_segment_attributes.or_else(|| {
        index
            .segment_attributes_merger()
            .as_ref()
            .map(|segment_attributes_merger| {
                let current_segment_attributes: Vec<_> = segment_entries
                    .iter()
                    .filter_map(|segment_entry| segment_entry.meta().segment_attributes().as_ref())
                    .collect();
                segment_attributes_merger.merge_json(current_segment_attributes)
            })
    });

    let delete_cursor = segment_entries[0].delete_cursor().clone();

    // Fast path: a single segment without deletes does not need a rewrite;
    // we only (possibly) swap in the new segment attributes.
    if segment_entries.len() == 1 && !segment_entries[0].meta().has_deletes() {
        let original_segment = segment_entries.into_iter().nth(0).unwrap();
        let new_segment_meta = match segment_attributes {
            Some(segment_attributes) => original_segment
                .meta()
                .clone()
                .with_segment_attributes(segment_attributes),
            None => original_segment.meta().clone(),
        };
        return Ok(Some(SegmentEntry::new(
            new_segment_meta,
            delete_cursor,
            None,
        )));
    }

    // first we need to apply deletes to our segment.
    let merged_segment = index.new_segment();

    // First we apply all of the deletes to the merged segment, up to the target opstamp.
    for segment_entry in &mut segment_entries {
        let segment = index.segment(segment_entry.meta().clone());
        advance_deletes(segment, segment_entry, target_opstamp)?;
    }

    let segments: Vec<Segment> = segment_entries
        .iter()
        .map(|segment_entry| index.segment(segment_entry.meta().clone()))
        .collect();

    // An IndexMerger is like a "view" of our merged segments.
    let merger: IndexMerger =
        IndexMerger::open(index.schema(), index.settings().clone(), &segments[..])?;

    // ... we just serialize this index merger in our new segment to merge the segments.
    let segment_serializer = SegmentSerializer::for_segment(merged_segment.clone(), true)?;

    let num_docs = merger.write(segment_serializer)?;

    let merged_segment_id = merged_segment.id();

    let segment_meta = index.new_segment_meta(merged_segment_id, num_docs, segment_attributes);
    Ok(Some(SegmentEntry::new(segment_meta, delete_cursor, None)))
}
165
166/// Advanced: Merges a list of segments from different indices in a new index.
167///
168/// Returns `TantivyError` if the indices list is empty or their
169/// schemas don't match.
170///
171/// `output_directory`: is assumed to be empty.
172///
173/// # Warning
174/// This function does NOT check or take the `IndexWriter` is running. It is not
175/// meant to work if you have an `IndexWriter` running for the origin indices, or
176/// the destination `Index`.
177#[doc(hidden)]
178pub fn merge_indices<T: Into<Box<dyn Directory>>>(
179    indices: &[Index],
180    output_directory: T,
181) -> crate::Result<Index> {
182    if indices.is_empty() {
183        // If there are no indices to merge, there is no need to do anything.
184        return Err(crate::TantivyError::InvalidArgument(
185            "No indices given to merge".to_string(),
186        ));
187    }
188
189    let target_settings = indices[0].settings().clone();
190
191    // let's check that all of the indices have the same index settings
192    if indices
193        .iter()
194        .skip(1)
195        .any(|index| index.settings() != &target_settings)
196    {
197        return Err(crate::TantivyError::InvalidArgument(
198            "Attempt to merge indices with different index_settings".to_string(),
199        ));
200    }
201
202    let mut segments: Vec<Segment> = Vec::new();
203    for index in indices {
204        segments.extend(index.searchable_segments()?);
205    }
206
207    let non_filter = segments.iter().map(|_| None).collect::<Vec<_>>();
208    merge_filtered_segments(&segments, target_settings, non_filter, output_directory)
209}
210
211/// Advanced: Merges a list of segments from different indices in a new index.
212/// Additional you can provide a delete bitset for each segment to ignore doc_ids.
213///
214/// Returns `TantivyError` if the indices list is empty or their
215/// schemas don't match.
216///
217/// `output_directory`: is assumed to be empty.
218///
219/// # Warning
220/// This function does NOT check or take the `IndexWriter` is running. It is not
221/// meant to work if you have an `IndexWriter` running for the origin indices, or
222/// the destination `Index`.
223#[doc(hidden)]
224pub fn merge_filtered_segments<T: Into<Box<dyn Directory>>>(
225    segments: &[Segment],
226    target_settings: IndexSettings,
227    filter_doc_ids: Vec<Option<AliveBitSet>>,
228    output_directory: T,
229) -> crate::Result<Index> {
230    if segments.is_empty() {
231        // If there are no indices to merge, there is no need to do anything.
232        return Err(crate::TantivyError::InvalidArgument(
233            "No segments given to merge".to_string(),
234        ));
235    }
236
237    let target_schema = segments[0].schema();
238
239    // let's check that all of the indices have the same schema
240    if segments
241        .iter()
242        .skip(1)
243        .any(|index| index.schema() != target_schema)
244    {
245        return Err(crate::TantivyError::InvalidArgument(
246            "Attempt to merge different schema indices".to_string(),
247        ));
248    }
249
250    let mut merged_index = Index::create(
251        output_directory,
252        target_schema.clone(),
253        target_settings.clone(),
254    )?;
255    let merged_segment = merged_index.new_segment();
256    let merged_segment_id = merged_segment.id();
257    let merger: IndexMerger = IndexMerger::open_with_custom_alive_set(
258        merged_index.schema(),
259        merged_index.settings().clone(),
260        segments,
261        filter_doc_ids,
262    )?;
263    let segment_serializer = SegmentSerializer::for_segment(merged_segment, true)?;
264    let num_docs = merger.write(segment_serializer)?;
265
266    let segment_meta = merged_index.new_segment_meta(merged_segment_id, num_docs, None);
267
268    let stats = format!(
269        "Segments Merge: [{}]",
270        segments
271            .iter()
272            .fold(String::new(), |sum, current| format!(
273                "{}{} ",
274                sum,
275                current.meta().id().uuid_string()
276            ))
277            .trim_end()
278    );
279
280    let index_meta = IndexMeta {
281        index_settings: target_settings, // index_settings of all segments should be the same
282        segments: vec![segment_meta],
283        schema: target_schema,
284        opstamp: 0u64,
285        payload: Some(stats),
286        index_attributes: None,
287    };
288
289    // save the meta.json
290    save_metas(&index_meta, merged_index.directory_mut())?;
291
292    Ok(merged_index)
293}
294
pub(crate) struct InnerSegmentUpdater {
    // we keep a copy of the current active IndexMeta to
    // avoid loading the file every time we need it in the
    // `SegmentUpdater`.
    //
    // This should be up to date as all updates happen through
    // the unique active `SegmentUpdater`.
    active_index_meta: RwLock<Arc<IndexMeta>>,
    // Single-threaded pool (see `create`) on which all segment updates are
    // serialized.
    pool: ThreadPool,
    // Pool of `NUM_MERGE_THREADS` threads on which merges run concurrently.
    merge_thread_pool: ThreadPool,

    index: Index,
    // Tracks committed / uncommitted segment entries.
    segment_manager: SegmentManager,
    // Current merge policy; swappable at runtime via `set_merge_policy`.
    merge_policy: RwLock<Arc<dyn MergePolicy>>,
    // Set by `kill()`; once true, no further tasks are scheduled.
    killed: AtomicBool,
    stamper: Stamper,
    // Inventory of in-flight merge operations; a segment in merge is not a
    // candidate for another merge.
    merge_operations: MergeOperationInventory,
}
313
impl SegmentUpdater {
    /// Builds a `SegmentUpdater` for the given index.
    ///
    /// Loads the searchable segment metas into a fresh `SegmentManager`,
    /// spawns the single-threaded update pool and the merge thread pool,
    /// and caches the on-disk `IndexMeta`.
    pub fn create(
        index: Index,
        stamper: Stamper,
        delete_cursor: &DeleteCursor,
    ) -> crate::Result<SegmentUpdater> {
        let segments = index.searchable_segment_metas()?;
        let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
        // All segment updates are serialized on this single thread.
        let pool = ThreadPoolBuilder::new()
            .thread_name(|_| "segment_updater".to_string())
            .num_threads(1)
            .build()
            .map_err(|_| {
                crate::TantivyError::SystemError(
                    "Failed to spawn segment updater thread".to_string(),
                )
            })?;
        // Merges are expensive and run concurrently on this dedicated pool.
        let merge_thread_pool = ThreadPoolBuilder::new()
            .thread_name(|i| format!("merge_thread_{i}"))
            .num_threads(NUM_MERGE_THREADS)
            .build()
            .map_err(|_| {
                crate::TantivyError::SystemError(
                    "Failed to spawn segment merging thread".to_string(),
                )
            })?;
        let index_meta = index.load_metas()?;
        Ok(SegmentUpdater(Arc::new(InnerSegmentUpdater {
            active_index_meta: RwLock::new(Arc::new(index_meta)),
            pool,
            merge_thread_pool,
            index,
            segment_manager,
            merge_policy: RwLock::new(Arc::new(DefaultMergePolicy::default())),
            killed: AtomicBool::new(false),
            stamper,
            merge_operations: Default::default(),
        })))
    }

    /// Returns (a clone of) the merge policy currently in use.
    pub fn get_merge_policy(&self) -> Arc<dyn MergePolicy> {
        self.merge_policy.read().unwrap().clone()
    }

    /// Replaces the merge policy used to compute merge candidates.
    pub fn set_merge_policy(&self, merge_policy: Box<dyn MergePolicy>) {
        let arc_merge_policy = Arc::from(merge_policy);
        *self.merge_policy.write().unwrap() = arc_merge_policy;
    }

    /// Schedules `task` on the single segment-updater thread and returns a
    /// handle to its eventual result.
    ///
    /// Fails immediately (without scheduling) if the updater was killed.
    fn schedule_task<T: 'static + Send, F: FnOnce() -> crate::Result<T> + 'static + Send>(
        &self,
        task: F,
    ) -> FutureResult<T> {
        if !self.is_alive() {
            return crate::TantivyError::SystemError("Segment updater killed".to_string()).into();
        }
        let (scheduled_result, sender) = FutureResult::create(
            "A segment_updater future did not succeed. This should never happen.",
        );
        self.pool.spawn(|| {
            let task_result = task();
            // The receiving end may have been dropped; ignore send failures.
            let _ = sender.send(task_result);
        });
        scheduled_result
    }

    /// Registers a new segment with the `SegmentManager` and looks for new
    /// merge opportunities.
    pub fn schedule_add_segment(&self, segment_entry: SegmentEntry) -> FutureResult<()> {
        let segment_updater = self.clone();
        self.schedule_task(move || {
            segment_updater.segment_manager.add_segment(segment_entry);
            segment_updater.consider_merge_options();
            Ok(())
        })
    }

    /// Orders `SegmentManager` to remove all segments
    pub(crate) fn remove_all_segments(&self) {
        self.segment_manager.remove_all_segments();
    }

    /// Marks the updater as killed; subsequent `schedule_task` calls fail.
    pub fn kill(&mut self) {
        self.killed.store(true, Ordering::Release);
    }

    /// Returns `true` as long as `kill` has not been called.
    pub fn is_alive(&self) -> bool {
        !self.killed.load(Ordering::Acquire)
    }

    /// Apply deletes up to the target opstamp to all segments.
    ///
    /// The method returns copies of the segment entries,
    /// updated with the delete information.
    fn purge_deletes(&self, target_opstamp: Opstamp) -> crate::Result<Vec<SegmentEntry>> {
        let mut segment_entries = self.segment_manager.segment_entries();
        for segment_entry in &mut segment_entries {
            let segment = self.index.segment(segment_entry.meta().clone());
            advance_deletes(segment, segment_entry, target_opstamp)?;
        }
        Ok(segment_entries)
    }

    /// Persists the current committed segments to `meta.json` (with the given
    /// opstamp and optional commit message) and refreshes the cached
    /// `IndexMeta`. No-op if the updater was killed.
    pub fn save_metas(
        &self,
        opstamp: Opstamp,
        commit_message: Option<String>,
    ) -> crate::Result<()> {
        if self.is_alive() {
            let index = &self.index;
            let directory = index.directory();
            let mut commited_segment_metas = self.segment_manager.committed_segment_metas();

            // We sort segment_readers by number of documents.
            // This is an heuristic to make multithreading more efficient.
            //
            // This is not done at the searcher level because I had a strange
            // use case in which I was dealing with a large static index,
            // dispatched over 5 SSD drives.
            //
            // A `UnionDirectory` makes it possible to read from these
            // 5 different drives and creates a meta.json on the fly.
            // In order to optimize the throughput, it creates a lasagna of segments
            // from the different drives.
            //
            // Segment 1 from disk 1, Segment 1 from disk 2, etc.
            //
            // NOTE(review): `max_doc() as i32` would wrap for segments with
            // more than i32::MAX docs — presumably unreachable; confirm.
            commited_segment_metas.sort_by_key(|segment_meta| -(segment_meta.max_doc() as i32));
            let index_meta = IndexMeta {
                index_settings: index.settings().clone(),
                segments: commited_segment_metas,
                schema: index.schema(),
                opstamp,
                payload: commit_message,
                // Index attributes are carried over from the cached meta.
                index_attributes: self
                    .active_index_meta
                    .read()
                    .unwrap()
                    .index_attributes
                    .clone(),
            };
            // TODO add context to the error.
            save_metas(&index_meta, directory.box_clone().borrow_mut())?;
            self.store_meta(&index_meta);
        }
        Ok(())
    }

    /// Schedules a garbage-collection pass on the segment-updater thread.
    pub fn schedule_garbage_collect(&self) -> FutureResult<GarbageCollectionResult> {
        let self_clone = self.clone();
        self.schedule_task(move || garbage_collect_files(self_clone))
    }

    /// List the files that are useful to the index.
    ///
    /// This does not include lock files, or files that are obsolete
    /// but have not yet been deleted by the garbage collector.
    fn list_files(&self) -> HashSet<PathBuf> {
        let mut files: HashSet<PathBuf> = self
            .index
            .list_all_segment_metas()
            .into_iter()
            .flat_map(|segment_meta| segment_meta.list_files())
            .collect();
        // meta.json itself is always useful.
        files.insert(META_FILEPATH.to_path_buf());
        files
    }

    /// Schedules a commit: applies pending deletes up to `opstamp`, commits
    /// the resulting segment entries, saves `meta.json`, garbage-collects
    /// (best effort) and looks for new merge opportunities.
    pub(crate) fn schedule_commit(
        &self,
        opstamp: Opstamp,
        payload: Option<String>,
    ) -> FutureResult<Opstamp> {
        let segment_updater: SegmentUpdater = self.clone();
        self.schedule_task(move || {
            let segment_entries = segment_updater.purge_deletes(opstamp)?;
            segment_updater.segment_manager.commit(segment_entries);
            segment_updater.save_metas(opstamp, payload)?;
            // Garbage collection is best effort; its failure does not fail
            // the commit.
            let _ = garbage_collect_files(segment_updater.clone());
            segment_updater.consider_merge_options();
            Ok(opstamp)
        })
    }

    /// Replaces the cached active `IndexMeta`.
    fn store_meta(&self, index_meta: &IndexMeta) {
        *self.active_index_meta.write().unwrap() = Arc::new(index_meta.clone());
    }

    /// Returns the cached active `IndexMeta`.
    fn load_meta(&self) -> Arc<IndexMeta> {
        self.active_index_meta.read().unwrap().clone()
    }

    /// Builds a `MergeOperation` for the given segments, registered in the
    /// merge-operation inventory and stamped with the last committed opstamp.
    pub(crate) fn make_merge_operation(
        &self,
        segment_ids: &[SegmentId],
        segment_attributes: Option<serde_json::Value>,
    ) -> MergeOperation {
        let commit_opstamp = self.load_meta().opstamp;
        MergeOperation::new(
            &self.merge_operations,
            commit_opstamp,
            segment_ids.to_vec(),
            segment_attributes,
        )
    }

    // Starts a merge operation. This function will block until the merge operation is effectively
    // started. Note that it does not wait for the merge to terminate.
    // The calling thread should not be blocked for a long time, as this only involves waiting for the
    // `SegmentUpdater` queue which in turn only contains lightweight operations.
    //
    // The merge itself happens on a different thread.
    //
    // When successful, this function returns a `Future` for a `Result<SegmentMeta>` that represents
    // the actual outcome of the merge operation.
    //
    // It returns an error if for some reason the merge operation could not be started.
    //
    // At this point an error is not necessarily the sign of a malfunction.
    // (e.g. A rollback could have happened, between the instant when the merge operation was
    // suggested and the moment when it ended up being executed.)
    //
    // `segment_ids` is required to be non-empty.
    pub fn start_merge(
        &self,
        merge_operation: MergeOperation,
    ) -> FutureResult<Option<SegmentMeta>> {
        assert!(
            !merge_operation.segment_ids().is_empty(),
            "Segment_ids cannot be empty."
        );

        let segment_updater = self.clone();
        // Reserve the segments in the `SegmentManager`; this can fail e.g.
        // if the segments are no longer available for merge.
        let segment_entries: Vec<SegmentEntry> = match self
            .segment_manager
            .start_merge(merge_operation.segment_ids())
        {
            Ok(segment_entries) => segment_entries,
            Err(err) => {
                warn!(
                    "Starting the merge failed for the following reason. This is not fatal. {}",
                    err
                );
                return err.into();
            }
        };

        info!("Starting merge  - {:?}", merge_operation.segment_ids());

        let (scheduled_result, merging_future_send) =
            FutureResult::create("Merge operation failed.");

        self.merge_thread_pool.spawn(move || {
            // The fact that `merge_operation` is moved here is important.
            // Its lifetime is used to track how many merging threads are currently running,
            // as well as which segment is currently in merge and therefore should not be
            // candidate for another merge.
            match merge(
                &segment_updater.index,
                segment_entries,
                merge_operation.target_opstamp(),
                merge_operation.segment_attributes().clone(),
            ) {
                Ok(after_merge_segment_entry) => {
                    let res = segment_updater.end_merge(merge_operation, after_merge_segment_entry);
                    let _send_result = merging_future_send.send(res);
                }
                Err(merge_error) => {
                    warn!(
                        "Merge of {:?} was cancelled: {:?}",
                        merge_operation.segment_ids().to_vec(),
                        merge_error
                    );
                    // In tests a failed merge is considered a bug.
                    if cfg!(test) {
                        panic!("{:?}", merge_error);
                    }
                    let _send_result = merging_future_send.send(Err(merge_error));
                }
            }
        });

        scheduled_result
    }

    /// Returns the (committed, uncommitted) segments that are not currently
    /// involved in a merge.
    pub(crate) fn get_mergeable_segments(&self) -> (Vec<SegmentMeta>, Vec<SegmentMeta>) {
        let merge_segment_ids: HashSet<SegmentId> = self.merge_operations.segment_in_merge();
        self.segment_manager
            .get_mergeable_segments(&merge_segment_ids)
    }

    /// Asks the merge policy for merge candidates and starts a merge for
    /// each of them (failures to start are logged, not fatal).
    fn consider_merge_options(&self) {
        let (committed_segments, uncommitted_segments) = self.get_mergeable_segments();

        // Committed segments cannot be merged with uncommitted_segments.
        // We therefore consider merges using these two sets of segments independently.
        let merge_policy = self.get_merge_policy();

        let current_opstamp = self.stamper.stamp();
        let mut merge_candidates: Vec<MergeOperation> = merge_policy
            .compute_merge_candidates(&uncommitted_segments)
            .into_iter()
            .map(|merge_candidate| {
                MergeOperation::new(
                    &self.merge_operations,
                    current_opstamp,
                    merge_candidate.0,
                    None,
                )
            })
            .collect();

        // Committed candidates use the last committed opstamp instead.
        let commit_opstamp = self.load_meta().opstamp;
        let committed_merge_candidates = merge_policy
            .compute_merge_candidates(&committed_segments)
            .into_iter()
            .map(|merge_candidate: MergeCandidate| {
                MergeOperation::new(
                    &self.merge_operations,
                    commit_opstamp,
                    merge_candidate.0,
                    None,
                )
            });
        merge_candidates.extend(committed_merge_candidates);

        for merge_operation in merge_candidates {
            // If a merge cannot be started this is not a fatal error.
            // We do log a warning in `start_merge`.
            drop(self.start_merge(merge_operation));
        }
    }

    /// Queues a `end_merge` in the segment updater and blocks until it is successfully processed.
    ///
    /// Returns the meta of the merged segment (or `None` if the merge
    /// produced an empty segment).
    fn end_merge(
        &self,
        merge_operation: MergeOperation,
        mut after_merge_segment_entry: Option<SegmentEntry>,
    ) -> crate::Result<Option<SegmentMeta>> {
        let segment_updater = self.clone();
        let after_merge_segment_meta = after_merge_segment_entry
            .as_ref()
            .map(|after_merge_segment_entry| after_merge_segment_entry.meta().clone());
        self.schedule_task(move || {
            info!(
                "End merge {:?}",
                after_merge_segment_entry.as_ref().map(|entry| entry.meta())
            );
            {
                // Deletes committed while the merge was running must be
                // replayed on the merged segment before it is published.
                if let Some(after_merge_segment_entry) = after_merge_segment_entry.as_mut() {
                    let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
                    if let Some(delete_operation) = delete_cursor.get() {
                        let committed_opstamp = segment_updater.load_meta().opstamp;
                        if delete_operation.opstamp < committed_opstamp {
                            let index = &segment_updater.index;
                            let segment = index.segment(after_merge_segment_entry.meta().clone());
                            if let Err(advance_deletes_err) = advance_deletes(
                                segment,
                                after_merge_segment_entry,
                                committed_opstamp,
                            ) {
                                error!(
                                    "Merge of {:?} was cancelled (advancing deletes failed): {:?}",
                                    merge_operation.segment_ids(),
                                    advance_deletes_err
                                );
                                assert!(!cfg!(test), "Merge failed.");

                                // ... cancel merge
                                // `merge_operations` are tracked. As it is dropped, the
                                // segment_ids will be available again for merge.
                                return Err(advance_deletes_err);
                            }
                        }
                    }
                }
                let previous_metas = segment_updater.load_meta();
                let segments_status = segment_updater
                    .segment_manager
                    .end_merge(merge_operation.segment_ids(), after_merge_segment_entry)?;

                // Only a merge of committed segments requires rewriting
                // meta.json.
                if segments_status == SegmentsStatus::Committed {
                    segment_updater
                        .save_metas(previous_metas.opstamp, previous_metas.payload.clone())?;
                }

                segment_updater.consider_merge_options();
            } // we drop all possible handle to a now useless `SegmentMeta`.

            // Best-effort cleanup of the now-unreferenced source segments.
            let _ = garbage_collect_files(segment_updater);
            Ok(())
        })
        .wait()?;
        Ok(after_merge_segment_meta)
    }

    /// Wait for current merging threads.
    ///
    /// Upon termination of the current merging threads,
    /// merge opportunity may appear.
    ///
    /// We keep waiting until the merge policy judges that
    /// no opportunity is available.
    ///
    /// Note that it is not required to call this
    /// method in your application.
    /// Terminating your application without letting
    /// merge terminate is perfectly safe.
    ///
    /// Obsolete files will eventually be cleaned up
    /// by the directory garbage collector.
    pub fn wait_merging_thread(&self) -> crate::Result<()> {
        self.merge_operations.wait_until_empty();
        Ok(())
    }
}
726
727#[cfg(test)]
728mod tests {
729    use super::merge_indices;
730    use crate::collector::TopDocs;
731    use crate::directory::RamDirectory;
732    use crate::fastfield::AliveBitSet;
733    use crate::indexer::merge_policy::tests::MergeWheneverPossible;
734    use crate::indexer::merger::IndexMerger;
735    use crate::indexer::segment_updater::merge_filtered_segments;
736    use crate::query::QueryParser;
737    use crate::schema::*;
738    use crate::{Directory, DocAddress, Index, Segment};
739
740    #[test]
741    fn test_delete_during_merge() -> crate::Result<()> {
742        let mut schema_builder = Schema::builder();
743        let text_field = schema_builder.add_text_field("text", TEXT);
744        let index = Index::create_in_ram(schema_builder.build());
745
746        // writing the segment
747        let mut index_writer = index.writer_for_tests()?;
748        index_writer.set_merge_policy(Box::new(MergeWheneverPossible));
749
750        for _ in 0..100 {
751            index_writer.add_document(doc!(text_field=>"a"))?;
752            index_writer.add_document(doc!(text_field=>"b"))?;
753        }
754        index_writer.commit()?;
755
756        for _ in 0..100 {
757            index_writer.add_document(doc!(text_field=>"c"))?;
758            index_writer.add_document(doc!(text_field=>"d"))?;
759        }
760        index_writer.commit()?;
761
762        index_writer.add_document(doc!(text_field=>"e"))?;
763        index_writer.add_document(doc!(text_field=>"f"))?;
764        index_writer.commit()?;
765
766        let term = Term::from_field_text(text_field, "a");
767        index_writer.delete_term(term);
768        index_writer.commit()?;
769
770        let reader = index.reader()?;
771        assert_eq!(reader.searcher().num_docs(), 302);
772
773        index_writer.wait_merging_threads()?;
774
775        reader.reload()?;
776        assert_eq!(reader.searcher().segment_readers().len(), 1);
777        assert_eq!(reader.searcher().num_docs(), 302);
778        Ok(())
779    }
780
781    #[test]
782    fn delete_all_docs_min() -> crate::Result<()> {
783        let mut schema_builder = Schema::builder();
784        let text_field = schema_builder.add_text_field("text", TEXT);
785        let index = Index::create_in_ram(schema_builder.build());
786
787        // writing the segment
788        let mut index_writer = index.writer_for_tests()?;
789
790        for _ in 0..10 {
791            index_writer.add_document(doc!(text_field=>"a"))?;
792            index_writer.add_document(doc!(text_field=>"b"))?;
793        }
794        index_writer.commit()?;
795
796        let seg_ids = index.searchable_segment_ids()?;
797        // docs exist, should have at least 1 segment
798        assert!(!seg_ids.is_empty());
799
800        let term = Term::from_field_text(text_field, "a");
801        index_writer.delete_term(term);
802        index_writer.commit()?;
803
804        let term = Term::from_field_text(text_field, "b");
805        index_writer.delete_term(term);
806        index_writer.commit()?;
807
808        index_writer.wait_merging_threads()?;
809
810        let reader = index.reader()?;
811        assert_eq!(reader.searcher().num_docs(), 0);
812
813        let seg_ids = index.searchable_segment_ids()?;
814        assert!(seg_ids.is_empty());
815
816        reader.reload()?;
817        assert_eq!(reader.searcher().num_docs(), 0);
818        // empty segments should be erased
819        assert!(index.searchable_segment_metas()?.is_empty());
820        assert!(reader.searcher().segment_readers().is_empty());
821
822        Ok(())
823    }
824
825    #[test]
826    fn delete_all_docs() -> crate::Result<()> {
827        let mut schema_builder = Schema::builder();
828        let text_field = schema_builder.add_text_field("text", TEXT);
829        let index = Index::create_in_ram(schema_builder.build());
830
831        // writing the segment
832        let mut index_writer = index.writer_for_tests()?;
833
834        for _ in 0..100 {
835            index_writer.add_document(doc!(text_field=>"a"))?;
836            index_writer.add_document(doc!(text_field=>"b"))?;
837        }
838        index_writer.commit()?;
839
840        for _ in 0..100 {
841            index_writer.add_document(doc!(text_field=>"c"))?;
842            index_writer.add_document(doc!(text_field=>"d"))?;
843        }
844        index_writer.commit()?;
845
846        index_writer.add_document(doc!(text_field=>"e"))?;
847        index_writer.add_document(doc!(text_field=>"f"))?;
848        index_writer.commit()?;
849
850        let seg_ids = index.searchable_segment_ids()?;
851        // docs exist, should have at least 1 segment
852        assert!(!seg_ids.is_empty());
853
854        let term_vals = vec!["a", "b", "c", "d", "e", "f"];
855        for term_val in term_vals {
856            let term = Term::from_field_text(text_field, term_val);
857            index_writer.delete_term(term);
858            index_writer.commit()?;
859        }
860
861        index_writer.wait_merging_threads()?;
862
863        let reader = index.reader()?;
864        assert_eq!(reader.searcher().num_docs(), 0);
865
866        let seg_ids = index.searchable_segment_ids()?;
867        assert!(seg_ids.is_empty());
868
869        reader.reload()?;
870        assert_eq!(reader.searcher().num_docs(), 0);
871        // empty segments should be erased
872        assert!(index.searchable_segment_metas()?.is_empty());
873        assert!(reader.searcher().segment_readers().is_empty());
874
875        Ok(())
876    }
877
878    #[test]
879    fn test_remove_all_segments() -> crate::Result<()> {
880        let mut schema_builder = Schema::builder();
881        let text_field = schema_builder.add_text_field("text", TEXT);
882        let index = Index::create_in_ram(schema_builder.build());
883
884        // writing the segment
885        let mut index_writer = index.writer_for_tests()?;
886        for _ in 0..100 {
887            index_writer.add_document(doc!(text_field=>"a"))?;
888            index_writer.add_document(doc!(text_field=>"b"))?;
889        }
890        index_writer.commit()?;
891
892        index_writer.segment_updater().remove_all_segments();
893        let seg_vec = index_writer
894            .segment_updater()
895            .segment_manager
896            .segment_entries();
897        assert!(seg_vec.is_empty());
898        Ok(())
899    }
900
901    #[test]
902    fn test_merge_segments() -> crate::Result<()> {
903        let mut indices = vec![];
904        let mut schema_builder = Schema::builder();
905        let text_field = schema_builder.add_text_field("text", TEXT);
906        let schema = schema_builder.build();
907
908        for _ in 0..3 {
909            let index = Index::create_in_ram(schema.clone());
910
911            // writing two segments
912            let mut index_writer = index.writer_for_tests()?;
913            for _ in 0..100 {
914                index_writer.add_document(doc!(text_field=>"fizz"))?;
915                index_writer.add_document(doc!(text_field=>"buzz"))?;
916            }
917            index_writer.commit()?;
918
919            for _ in 0..1000 {
920                index_writer.add_document(doc!(text_field=>"foo"))?;
921                index_writer.add_document(doc!(text_field=>"bar"))?;
922            }
923            index_writer.commit()?;
924            indices.push(index);
925        }
926
927        assert_eq!(indices.len(), 3);
928        let output_directory: Box<dyn Directory> = Box::<RamDirectory>::default();
929        let index = merge_indices(&indices, output_directory)?;
930        assert_eq!(index.schema(), schema);
931
932        let segments = index.searchable_segments()?;
933        assert_eq!(segments.len(), 1);
934
935        let segment_metas = segments[0].meta();
936        assert_eq!(segment_metas.num_deleted_docs(), 0);
937        assert_eq!(segment_metas.num_docs(), 6600);
938        Ok(())
939    }
940
941    #[test]
942    fn test_merge_empty_indices_array() {
943        let merge_result = merge_indices(&[], RamDirectory::default());
944        assert!(merge_result.is_err());
945    }
946
947    #[test]
948    fn test_merge_mismatched_schema() -> crate::Result<()> {
949        let first_index = {
950            let mut schema_builder = Schema::builder();
951            let text_field = schema_builder.add_text_field("text", TEXT);
952            let index = Index::create_in_ram(schema_builder.build());
953            let mut index_writer = index.writer_for_tests()?;
954            index_writer.add_document(doc!(text_field=>"some text"))?;
955            index_writer.commit()?;
956            index
957        };
958
959        let second_index = {
960            let mut schema_builder = Schema::builder();
961            let body_field = schema_builder.add_text_field("body", TEXT);
962            let index = Index::create_in_ram(schema_builder.build());
963            let mut index_writer = index.writer_for_tests()?;
964            index_writer.add_document(doc!(body_field=>"some body"))?;
965            index_writer.commit()?;
966            index
967        };
968
969        // mismatched schema index list
970        let result = merge_indices(&[first_index, second_index], RamDirectory::default());
971        assert!(result.is_err());
972
973        Ok(())
974    }
975
976    #[test]
977    fn test_merge_filtered_segments() -> crate::Result<()> {
978        let first_index = {
979            let mut schema_builder = Schema::builder();
980            let text_field = schema_builder.add_text_field("text", TEXT);
981            let index = Index::create_in_ram(schema_builder.build());
982            let mut index_writer = index.writer_for_tests()?;
983            index_writer.add_document(doc!(text_field=>"some text 1"))?;
984            index_writer.add_document(doc!(text_field=>"some text 2"))?;
985            index_writer.commit()?;
986            index
987        };
988
989        let second_index = {
990            let mut schema_builder = Schema::builder();
991            let text_field = schema_builder.add_text_field("text", TEXT);
992            let index = Index::create_in_ram(schema_builder.build());
993            let mut index_writer = index.writer_for_tests()?;
994            index_writer.add_document(doc!(text_field=>"some text 3"))?;
995            index_writer.add_document(doc!(text_field=>"some text 4"))?;
996            index_writer.delete_term(Term::from_field_text(text_field, "4"));
997
998            index_writer.commit()?;
999            index
1000        };
1001
1002        let mut segments: Vec<Segment> = Vec::new();
1003        segments.extend(first_index.searchable_segments()?);
1004        segments.extend(second_index.searchable_segments()?);
1005
1006        let target_settings = first_index.settings().clone();
1007
1008        let filter_segment_1 = AliveBitSet::for_test_from_deleted_docs(&[1], 2);
1009        let filter_segment_2 = AliveBitSet::for_test_from_deleted_docs(&[0], 2);
1010
1011        let filter_segments = vec![Some(filter_segment_1), Some(filter_segment_2)];
1012
1013        let merged_index = merge_filtered_segments(
1014            &segments,
1015            target_settings,
1016            filter_segments,
1017            RamDirectory::default(),
1018        )?;
1019
1020        let segments = merged_index.searchable_segments()?;
1021        assert_eq!(segments.len(), 1);
1022
1023        let segment_metas = segments[0].meta();
1024        assert_eq!(segment_metas.num_deleted_docs(), 0);
1025        assert_eq!(segment_metas.num_docs(), 1);
1026
1027        Ok(())
1028    }
1029
1030    #[test]
1031    fn test_merge_single_filtered_segments() -> crate::Result<()> {
1032        let first_index = {
1033            let mut schema_builder = Schema::builder();
1034            let text_field = schema_builder.add_text_field("text", TEXT);
1035            let index = Index::create_in_ram(schema_builder.build());
1036            let mut index_writer = index.writer_for_tests()?;
1037            index_writer.add_document(doc!(text_field=>"test text"))?;
1038            index_writer.add_document(doc!(text_field=>"some text 2"))?;
1039
1040            index_writer.add_document(doc!(text_field=>"some text 3"))?;
1041            index_writer.add_document(doc!(text_field=>"some text 4"))?;
1042
1043            index_writer.delete_term(Term::from_field_text(text_field, "4"));
1044
1045            index_writer.commit()?;
1046            index
1047        };
1048
1049        let mut segments: Vec<Segment> = Vec::new();
1050        segments.extend(first_index.searchable_segments()?);
1051
1052        let target_settings = first_index.settings().clone();
1053
1054        let filter_segment = AliveBitSet::for_test_from_deleted_docs(&[0], 4);
1055
1056        let filter_segments = vec![Some(filter_segment)];
1057
1058        let index = merge_filtered_segments(
1059            &segments,
1060            target_settings,
1061            filter_segments,
1062            RamDirectory::default(),
1063        )?;
1064
1065        let segments = index.searchable_segments()?;
1066        assert_eq!(segments.len(), 1);
1067
1068        let segment_metas = segments[0].meta();
1069        assert_eq!(segment_metas.num_deleted_docs(), 0);
1070        assert_eq!(segment_metas.num_docs(), 2);
1071
1072        let searcher = index.reader()?.searcher();
1073        {
1074            let text_field = index.schema().get_field("text").unwrap();
1075
1076            let do_search = |term: &str| {
1077                let query = QueryParser::for_index(&index, vec![text_field])
1078                    .parse_query(term)
1079                    .unwrap();
1080                let top_docs: Vec<(f32, DocAddress)> =
1081                    searcher.search(&query, &TopDocs::with_limit(3)).unwrap();
1082
1083                top_docs.iter().map(|el| el.1.doc_id).collect::<Vec<_>>()
1084            };
1085
1086            assert_eq!(do_search("test"), vec![] as Vec<u32>);
1087            assert_eq!(do_search("text"), vec![0, 1]);
1088        }
1089
1090        Ok(())
1091    }
1092
    /// Checks that `IndexMerger::open_with_custom_alive_set` intersects a
    /// provided alive bitset with the segment's own deletes, and that a
    /// `None` filter leaves only the segment's deletes in effect.
    #[test]
    fn test_apply_doc_id_filter_in_merger() -> crate::Result<()> {
        let first_index = {
            let mut schema_builder = Schema::builder();
            let text_field = schema_builder.add_text_field("text", TEXT);
            let index = Index::create_in_ram(schema_builder.build());
            let mut index_writer = index.writer_for_tests()?;
            index_writer.add_document(doc!(text_field=>"some text 1"))?;
            index_writer.add_document(doc!(text_field=>"some text 2"))?;

            index_writer.add_document(doc!(text_field=>"some text 3"))?;
            index_writer.add_document(doc!(text_field=>"some text 4"))?;

            // Deletes doc 3, the only doc containing the token "4".
            index_writer.delete_term(Term::from_field_text(text_field, "4"));

            index_writer.commit()?;
            index
        };

        let mut segments: Vec<Segment> = Vec::new();
        segments.extend(first_index.searchable_segments()?);

        let target_settings = first_index.settings().clone();
        {
            // Case 1: an explicit filter drops doc 1; combined with the
            // segment's delete of doc 3, only docs 0 and 2 stay alive.
            let filter_segment = AliveBitSet::for_test_from_deleted_docs(&[1], 4);
            let filter_segments = vec![Some(filter_segment)];
            let target_schema = segments[0].schema();
            let merged_index = Index::create(
                RamDirectory::default(),
                target_schema,
                target_settings.clone(),
            )?;
            let merger: IndexMerger = IndexMerger::open_with_custom_alive_set(
                merged_index.schema(),
                merged_index.settings().clone(),
                &segments[..],
                filter_segments,
            )?;

            let doc_ids_alive: Vec<_> = merger.readers[0].doc_ids_alive().collect();
            assert_eq!(doc_ids_alive, vec![0, 2]);
        }

        {
            // Case 2: no filter (`None`) — only the segment's own delete
            // (doc 3) applies, so docs 0, 1 and 2 stay alive.
            let filter_segments = vec![None];
            let target_schema = segments[0].schema();
            let merged_index =
                Index::create(RamDirectory::default(), target_schema, target_settings)?;
            let merger: IndexMerger = IndexMerger::open_with_custom_alive_set(
                merged_index.schema(),
                merged_index.settings().clone(),
                &segments[..],
                filter_segments,
            )?;

            let doc_ids_alive: Vec<_> = merger.readers[0].doc_ids_alive().collect();
            assert_eq!(doc_ids_alive, vec![0, 1, 2]);
        }

        Ok(())
    }
1154}