1use std::borrow::BorrowMut;
2use std::collections::HashSet;
3use std::io::Write;
4use std::ops::Deref;
5use std::path::PathBuf;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, RwLock};
8
9use fail::fail_point;
10use rayon::{ThreadPool, ThreadPoolBuilder};
11
12use super::segment_manager::SegmentManager;
13use crate::core::{
14 Index, IndexMeta, IndexSettings, Segment, SegmentId, SegmentMeta, META_FILEPATH,
15};
16use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult};
17use crate::fastfield::AliveBitSet;
18use crate::indexer::delete_queue::DeleteCursor;
19use crate::indexer::index_writer::advance_deletes;
20use crate::indexer::merge_operation::MergeOperationInventory;
21use crate::indexer::merger::IndexMerger;
22use crate::indexer::segment_manager::SegmentsStatus;
23use crate::indexer::stamper::Stamper;
24use crate::indexer::{
25 DefaultMergePolicy, MergeCandidate, MergeOperation, MergePolicy, SegmentEntry,
26 SegmentSerializer,
27};
28use crate::{FutureResult, Opstamp};
29
/// Number of threads in the rayon pool that runs segment merges
/// (the `merge_thread_{i}` pool built in `SegmentUpdater::create`).
const NUM_MERGE_THREADS: usize = 4;
31
32pub(crate) fn save_metas(metas: &IndexMeta, directory: &dyn Directory) -> crate::Result<()> {
42 info!("save metas");
43 let mut buffer = serde_json::to_vec_pretty(metas)?;
44 writeln!(&mut buffer)?;
46 fail_point!("save_metas", |msg| Err(crate::TantivyError::from(
47 std::io::Error::new(
48 std::io::ErrorKind::Other,
49 msg.unwrap_or_else(|| "Undefined".to_string())
50 )
51 )));
52 directory.sync_directory()?;
53 directory.atomic_write(&META_FILEPATH, &buffer[..])?;
54 debug!("Saved metas {:?}", serde_json::to_string_pretty(&metas));
55 Ok(())
56}
57
/// Cheaply cloneable handle to the segment updater.
///
/// Every clone shares the same `InnerSegmentUpdater` through an `Arc`. The
/// `Deref` implementation gives direct access to the inner fields and methods.
#[derive(Clone)]
pub(crate) struct SegmentUpdater(Arc<InnerSegmentUpdater>);
68
69impl Deref for SegmentUpdater {
70 type Target = InnerSegmentUpdater;
71
72 #[inline]
73 fn deref(&self) -> &Self::Target {
74 &self.0
75 }
76}
77
78fn garbage_collect_files(
79 segment_updater: SegmentUpdater,
80) -> crate::Result<GarbageCollectionResult> {
81 info!("Running garbage collection");
82 let mut index = segment_updater.index.clone();
83 index
84 .directory_mut()
85 .garbage_collect(move || segment_updater.list_files())
86}
87
88fn merge(
93 index: &Index,
94 mut segment_entries: Vec<SegmentEntry>,
95 target_opstamp: Opstamp,
96 override_segment_attributes: Option<serde_json::Value>,
97) -> crate::Result<Option<SegmentEntry>> {
98 let num_docs = segment_entries
99 .iter()
100 .map(|segment| segment.meta().num_docs() as u64)
101 .sum::<u64>();
102 if num_docs == 0 {
103 return Ok(None);
104 }
105
106 let segment_attributes = override_segment_attributes.or_else(|| {
107 index
108 .segment_attributes_merger()
109 .as_ref()
110 .map(|segment_attributes_merger| {
111 let current_segment_attributes: Vec<_> = segment_entries
112 .iter()
113 .filter_map(|segment_entry| segment_entry.meta().segment_attributes().as_ref())
114 .collect();
115 segment_attributes_merger.merge_json(current_segment_attributes)
116 })
117 });
118
119 let delete_cursor = segment_entries[0].delete_cursor().clone();
120
121 if segment_entries.len() == 1 && !segment_entries[0].meta().has_deletes() {
122 let original_segment = segment_entries.into_iter().nth(0).unwrap();
123 let new_segment_meta = match segment_attributes {
124 Some(segment_attributes) => original_segment
125 .meta()
126 .clone()
127 .with_segment_attributes(segment_attributes),
128 None => original_segment.meta().clone(),
129 };
130 return Ok(Some(SegmentEntry::new(
131 new_segment_meta,
132 delete_cursor,
133 None,
134 )));
135 }
136
137 let merged_segment = index.new_segment();
139
140 for segment_entry in &mut segment_entries {
142 let segment = index.segment(segment_entry.meta().clone());
143 advance_deletes(segment, segment_entry, target_opstamp)?;
144 }
145
146 let segments: Vec<Segment> = segment_entries
147 .iter()
148 .map(|segment_entry| index.segment(segment_entry.meta().clone()))
149 .collect();
150
151 let merger: IndexMerger =
153 IndexMerger::open(index.schema(), index.settings().clone(), &segments[..])?;
154
155 let segment_serializer = SegmentSerializer::for_segment(merged_segment.clone(), true)?;
157
158 let num_docs = merger.write(segment_serializer)?;
159
160 let merged_segment_id = merged_segment.id();
161
162 let segment_meta = index.new_segment_meta(merged_segment_id, num_docs, segment_attributes);
163 Ok(Some(SegmentEntry::new(segment_meta, delete_cursor, None)))
164}
165
166#[doc(hidden)]
178pub fn merge_indices<T: Into<Box<dyn Directory>>>(
179 indices: &[Index],
180 output_directory: T,
181) -> crate::Result<Index> {
182 if indices.is_empty() {
183 return Err(crate::TantivyError::InvalidArgument(
185 "No indices given to merge".to_string(),
186 ));
187 }
188
189 let target_settings = indices[0].settings().clone();
190
191 if indices
193 .iter()
194 .skip(1)
195 .any(|index| index.settings() != &target_settings)
196 {
197 return Err(crate::TantivyError::InvalidArgument(
198 "Attempt to merge indices with different index_settings".to_string(),
199 ));
200 }
201
202 let mut segments: Vec<Segment> = Vec::new();
203 for index in indices {
204 segments.extend(index.searchable_segments()?);
205 }
206
207 let non_filter = segments.iter().map(|_| None).collect::<Vec<_>>();
208 merge_filtered_segments(&segments, target_settings, non_filter, output_directory)
209}
210
211#[doc(hidden)]
224pub fn merge_filtered_segments<T: Into<Box<dyn Directory>>>(
225 segments: &[Segment],
226 target_settings: IndexSettings,
227 filter_doc_ids: Vec<Option<AliveBitSet>>,
228 output_directory: T,
229) -> crate::Result<Index> {
230 if segments.is_empty() {
231 return Err(crate::TantivyError::InvalidArgument(
233 "No segments given to merge".to_string(),
234 ));
235 }
236
237 let target_schema = segments[0].schema();
238
239 if segments
241 .iter()
242 .skip(1)
243 .any(|index| index.schema() != target_schema)
244 {
245 return Err(crate::TantivyError::InvalidArgument(
246 "Attempt to merge different schema indices".to_string(),
247 ));
248 }
249
250 let mut merged_index = Index::create(
251 output_directory,
252 target_schema.clone(),
253 target_settings.clone(),
254 )?;
255 let merged_segment = merged_index.new_segment();
256 let merged_segment_id = merged_segment.id();
257 let merger: IndexMerger = IndexMerger::open_with_custom_alive_set(
258 merged_index.schema(),
259 merged_index.settings().clone(),
260 segments,
261 filter_doc_ids,
262 )?;
263 let segment_serializer = SegmentSerializer::for_segment(merged_segment, true)?;
264 let num_docs = merger.write(segment_serializer)?;
265
266 let segment_meta = merged_index.new_segment_meta(merged_segment_id, num_docs, None);
267
268 let stats = format!(
269 "Segments Merge: [{}]",
270 segments
271 .iter()
272 .fold(String::new(), |sum, current| format!(
273 "{}{} ",
274 sum,
275 current.meta().id().uuid_string()
276 ))
277 .trim_end()
278 );
279
280 let index_meta = IndexMeta {
281 index_settings: target_settings, segments: vec![segment_meta],
283 schema: target_schema,
284 opstamp: 0u64,
285 payload: Some(stats),
286 index_attributes: None,
287 };
288
289 save_metas(&index_meta, merged_index.directory_mut())?;
291
292 Ok(merged_index)
293}
294
/// Shared state behind every clone of a `SegmentUpdater` handle.
pub(crate) struct InnerSegmentUpdater {
    /// Index meta loaded when the updater was created, then replaced each time
    /// `save_metas` writes a new one. Its opstamp serves as the last committed
    /// opstamp, e.g. for merges of committed segments.
    active_index_meta: RwLock<Arc<IndexMeta>>,
    /// Single-threaded pool running the tasks submitted through
    /// `schedule_task`, so those tasks execute one at a time.
    pool: ThreadPool,
    /// Pool of `NUM_MERGE_THREADS` threads on which merges run.
    merge_thread_pool: ThreadPool,

    index: Index,
    /// Holds the committed and uncommitted segment entries.
    segment_manager: SegmentManager,
    /// Policy consulted to find merge candidates; replaceable at runtime.
    merge_policy: RwLock<Arc<dyn MergePolicy>>,
    /// Set by `kill()`. Once set, `schedule_task` rejects new tasks and
    /// `save_metas` does nothing.
    killed: AtomicBool,
    /// Supplies the opstamp used for merges of uncommitted segments.
    stamper: Stamper,
    /// Inventory of in-flight merge operations.
    merge_operations: MergeOperationInventory,
}
313
314impl SegmentUpdater {
315 pub fn create(
316 index: Index,
317 stamper: Stamper,
318 delete_cursor: &DeleteCursor,
319 ) -> crate::Result<SegmentUpdater> {
320 let segments = index.searchable_segment_metas()?;
321 let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
322 let pool = ThreadPoolBuilder::new()
323 .thread_name(|_| "segment_updater".to_string())
324 .num_threads(1)
325 .build()
326 .map_err(|_| {
327 crate::TantivyError::SystemError(
328 "Failed to spawn segment updater thread".to_string(),
329 )
330 })?;
331 let merge_thread_pool = ThreadPoolBuilder::new()
332 .thread_name(|i| format!("merge_thread_{i}"))
333 .num_threads(NUM_MERGE_THREADS)
334 .build()
335 .map_err(|_| {
336 crate::TantivyError::SystemError(
337 "Failed to spawn segment merging thread".to_string(),
338 )
339 })?;
340 let index_meta = index.load_metas()?;
341 Ok(SegmentUpdater(Arc::new(InnerSegmentUpdater {
342 active_index_meta: RwLock::new(Arc::new(index_meta)),
343 pool,
344 merge_thread_pool,
345 index,
346 segment_manager,
347 merge_policy: RwLock::new(Arc::new(DefaultMergePolicy::default())),
348 killed: AtomicBool::new(false),
349 stamper,
350 merge_operations: Default::default(),
351 })))
352 }
353
354 pub fn get_merge_policy(&self) -> Arc<dyn MergePolicy> {
355 self.merge_policy.read().unwrap().clone()
356 }
357
358 pub fn set_merge_policy(&self, merge_policy: Box<dyn MergePolicy>) {
359 let arc_merge_policy = Arc::from(merge_policy);
360 *self.merge_policy.write().unwrap() = arc_merge_policy;
361 }
362
363 fn schedule_task<T: 'static + Send, F: FnOnce() -> crate::Result<T> + 'static + Send>(
364 &self,
365 task: F,
366 ) -> FutureResult<T> {
367 if !self.is_alive() {
368 return crate::TantivyError::SystemError("Segment updater killed".to_string()).into();
369 }
370 let (scheduled_result, sender) = FutureResult::create(
371 "A segment_updater future did not succeed. This should never happen.",
372 );
373 self.pool.spawn(|| {
374 let task_result = task();
375 let _ = sender.send(task_result);
376 });
377 scheduled_result
378 }
379
380 pub fn schedule_add_segment(&self, segment_entry: SegmentEntry) -> FutureResult<()> {
381 let segment_updater = self.clone();
382 self.schedule_task(move || {
383 segment_updater.segment_manager.add_segment(segment_entry);
384 segment_updater.consider_merge_options();
385 Ok(())
386 })
387 }
388
389 pub(crate) fn remove_all_segments(&self) {
391 self.segment_manager.remove_all_segments();
392 }
393
394 pub fn kill(&mut self) {
395 self.killed.store(true, Ordering::Release);
396 }
397
398 pub fn is_alive(&self) -> bool {
399 !self.killed.load(Ordering::Acquire)
400 }
401
402 fn purge_deletes(&self, target_opstamp: Opstamp) -> crate::Result<Vec<SegmentEntry>> {
407 let mut segment_entries = self.segment_manager.segment_entries();
408 for segment_entry in &mut segment_entries {
409 let segment = self.index.segment(segment_entry.meta().clone());
410 advance_deletes(segment, segment_entry, target_opstamp)?;
411 }
412 Ok(segment_entries)
413 }
414
415 pub fn save_metas(
416 &self,
417 opstamp: Opstamp,
418 commit_message: Option<String>,
419 ) -> crate::Result<()> {
420 if self.is_alive() {
421 let index = &self.index;
422 let directory = index.directory();
423 let mut commited_segment_metas = self.segment_manager.committed_segment_metas();
424
425 commited_segment_metas.sort_by_key(|segment_meta| -(segment_meta.max_doc() as i32));
439 let index_meta = IndexMeta {
440 index_settings: index.settings().clone(),
441 segments: commited_segment_metas,
442 schema: index.schema(),
443 opstamp,
444 payload: commit_message,
445 index_attributes: self
446 .active_index_meta
447 .read()
448 .unwrap()
449 .index_attributes
450 .clone(),
451 };
452 save_metas(&index_meta, directory.box_clone().borrow_mut())?;
454 self.store_meta(&index_meta);
455 }
456 Ok(())
457 }
458
459 pub fn schedule_garbage_collect(&self) -> FutureResult<GarbageCollectionResult> {
460 let self_clone = self.clone();
461 self.schedule_task(move || garbage_collect_files(self_clone))
462 }
463
464 fn list_files(&self) -> HashSet<PathBuf> {
469 let mut files: HashSet<PathBuf> = self
470 .index
471 .list_all_segment_metas()
472 .into_iter()
473 .flat_map(|segment_meta| segment_meta.list_files())
474 .collect();
475 files.insert(META_FILEPATH.to_path_buf());
476 files
477 }
478
479 pub(crate) fn schedule_commit(
480 &self,
481 opstamp: Opstamp,
482 payload: Option<String>,
483 ) -> FutureResult<Opstamp> {
484 let segment_updater: SegmentUpdater = self.clone();
485 self.schedule_task(move || {
486 let segment_entries = segment_updater.purge_deletes(opstamp)?;
487 segment_updater.segment_manager.commit(segment_entries);
488 segment_updater.save_metas(opstamp, payload)?;
489 let _ = garbage_collect_files(segment_updater.clone());
490 segment_updater.consider_merge_options();
491 Ok(opstamp)
492 })
493 }
494
495 fn store_meta(&self, index_meta: &IndexMeta) {
496 *self.active_index_meta.write().unwrap() = Arc::new(index_meta.clone());
497 }
498
499 fn load_meta(&self) -> Arc<IndexMeta> {
500 self.active_index_meta.read().unwrap().clone()
501 }
502
503 pub(crate) fn make_merge_operation(
504 &self,
505 segment_ids: &[SegmentId],
506 segment_attributes: Option<serde_json::Value>,
507 ) -> MergeOperation {
508 let commit_opstamp = self.load_meta().opstamp;
509 MergeOperation::new(
510 &self.merge_operations,
511 commit_opstamp,
512 segment_ids.to_vec(),
513 segment_attributes,
514 )
515 }
516
517 pub fn start_merge(
535 &self,
536 merge_operation: MergeOperation,
537 ) -> FutureResult<Option<SegmentMeta>> {
538 assert!(
539 !merge_operation.segment_ids().is_empty(),
540 "Segment_ids cannot be empty."
541 );
542
543 let segment_updater = self.clone();
544 let segment_entries: Vec<SegmentEntry> = match self
545 .segment_manager
546 .start_merge(merge_operation.segment_ids())
547 {
548 Ok(segment_entries) => segment_entries,
549 Err(err) => {
550 warn!(
551 "Starting the merge failed for the following reason. This is not fatal. {}",
552 err
553 );
554 return err.into();
555 }
556 };
557
558 info!("Starting merge - {:?}", merge_operation.segment_ids());
559
560 let (scheduled_result, merging_future_send) =
561 FutureResult::create("Merge operation failed.");
562
563 self.merge_thread_pool.spawn(move || {
564 match merge(
569 &segment_updater.index,
570 segment_entries,
571 merge_operation.target_opstamp(),
572 merge_operation.segment_attributes().clone(),
573 ) {
574 Ok(after_merge_segment_entry) => {
575 let res = segment_updater.end_merge(merge_operation, after_merge_segment_entry);
576 let _send_result = merging_future_send.send(res);
577 }
578 Err(merge_error) => {
579 warn!(
580 "Merge of {:?} was cancelled: {:?}",
581 merge_operation.segment_ids().to_vec(),
582 merge_error
583 );
584 if cfg!(test) {
585 panic!("{:?}", merge_error);
586 }
587 let _send_result = merging_future_send.send(Err(merge_error));
588 }
589 }
590 });
591
592 scheduled_result
593 }
594
595 pub(crate) fn get_mergeable_segments(&self) -> (Vec<SegmentMeta>, Vec<SegmentMeta>) {
596 let merge_segment_ids: HashSet<SegmentId> = self.merge_operations.segment_in_merge();
597 self.segment_manager
598 .get_mergeable_segments(&merge_segment_ids)
599 }
600
601 fn consider_merge_options(&self) {
602 let (committed_segments, uncommitted_segments) = self.get_mergeable_segments();
603
604 let merge_policy = self.get_merge_policy();
607
608 let current_opstamp = self.stamper.stamp();
609 let mut merge_candidates: Vec<MergeOperation> = merge_policy
610 .compute_merge_candidates(&uncommitted_segments)
611 .into_iter()
612 .map(|merge_candidate| {
613 MergeOperation::new(
614 &self.merge_operations,
615 current_opstamp,
616 merge_candidate.0,
617 None,
618 )
619 })
620 .collect();
621
622 let commit_opstamp = self.load_meta().opstamp;
623 let committed_merge_candidates = merge_policy
624 .compute_merge_candidates(&committed_segments)
625 .into_iter()
626 .map(|merge_candidate: MergeCandidate| {
627 MergeOperation::new(
628 &self.merge_operations,
629 commit_opstamp,
630 merge_candidate.0,
631 None,
632 )
633 });
634 merge_candidates.extend(committed_merge_candidates);
635
636 for merge_operation in merge_candidates {
637 drop(self.start_merge(merge_operation));
640 }
641 }
642
643 fn end_merge(
645 &self,
646 merge_operation: MergeOperation,
647 mut after_merge_segment_entry: Option<SegmentEntry>,
648 ) -> crate::Result<Option<SegmentMeta>> {
649 let segment_updater = self.clone();
650 let after_merge_segment_meta = after_merge_segment_entry
651 .as_ref()
652 .map(|after_merge_segment_entry| after_merge_segment_entry.meta().clone());
653 self.schedule_task(move || {
654 info!(
655 "End merge {:?}",
656 after_merge_segment_entry.as_ref().map(|entry| entry.meta())
657 );
658 {
659 if let Some(after_merge_segment_entry) = after_merge_segment_entry.as_mut() {
660 let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
661 if let Some(delete_operation) = delete_cursor.get() {
662 let committed_opstamp = segment_updater.load_meta().opstamp;
663 if delete_operation.opstamp < committed_opstamp {
664 let index = &segment_updater.index;
665 let segment = index.segment(after_merge_segment_entry.meta().clone());
666 if let Err(advance_deletes_err) = advance_deletes(
667 segment,
668 after_merge_segment_entry,
669 committed_opstamp,
670 ) {
671 error!(
672 "Merge of {:?} was cancelled (advancing deletes failed): {:?}",
673 merge_operation.segment_ids(),
674 advance_deletes_err
675 );
676 assert!(!cfg!(test), "Merge failed.");
677
678 return Err(advance_deletes_err);
682 }
683 }
684 }
685 }
686 let previous_metas = segment_updater.load_meta();
687 let segments_status = segment_updater
688 .segment_manager
689 .end_merge(merge_operation.segment_ids(), after_merge_segment_entry)?;
690
691 if segments_status == SegmentsStatus::Committed {
692 segment_updater
693 .save_metas(previous_metas.opstamp, previous_metas.payload.clone())?;
694 }
695
696 segment_updater.consider_merge_options();
697 } let _ = garbage_collect_files(segment_updater);
700 Ok(())
701 })
702 .wait()?;
703 Ok(after_merge_segment_meta)
704 }
705
706 pub fn wait_merging_thread(&self) -> crate::Result<()> {
722 self.merge_operations.wait_until_empty();
723 Ok(())
724 }
725}
726
#[cfg(test)]
mod tests {
    use super::merge_indices;
    use crate::collector::TopDocs;
    use crate::directory::RamDirectory;
    use crate::fastfield::AliveBitSet;
    use crate::indexer::merge_policy::tests::MergeWheneverPossible;
    use crate::indexer::merger::IndexMerger;
    use crate::indexer::segment_updater::merge_filtered_segments;
    use crate::query::QueryParser;
    use crate::schema::*;
    use crate::{Directory, DocAddress, Index, Segment};

    /// A delete committed while merges may be running must be reflected both
    /// before and after the merges finish.
    #[test]
    fn test_delete_during_merge() -> crate::Result<()> {
        let mut schema_builder = Schema::builder();
        let text_field = schema_builder.add_text_field("text", TEXT);
        let index = Index::create_in_ram(schema_builder.build());

        let mut index_writer = index.writer_for_tests()?;
        // Merge as eagerly as possible so merges overlap with the delete.
        index_writer.set_merge_policy(Box::new(MergeWheneverPossible));

        // 200 docs: 100 "a" + 100 "b".
        for _ in 0..100 {
            index_writer.add_document(doc!(text_field=>"a"))?;
            index_writer.add_document(doc!(text_field=>"b"))?;
        }
        index_writer.commit()?;

        // 200 more docs: 100 "c" + 100 "d".
        for _ in 0..100 {
            index_writer.add_document(doc!(text_field=>"c"))?;
            index_writer.add_document(doc!(text_field=>"d"))?;
        }
        index_writer.commit()?;

        // 2 more docs, 402 in total.
        index_writer.add_document(doc!(text_field=>"e"))?;
        index_writer.add_document(doc!(text_field=>"f"))?;
        index_writer.commit()?;

        // Deleting the 100 "a" docs leaves 302.
        let term = Term::from_field_text(text_field, "a");
        index_writer.delete_term(term);
        index_writer.commit()?;

        let reader = index.reader()?;
        assert_eq!(reader.searcher().num_docs(), 302);

        index_writer.wait_merging_threads()?;

        // After all merges, a single segment still holds the 302 alive docs.
        reader.reload()?;
        assert_eq!(reader.searcher().segment_readers().len(), 1);
        assert_eq!(reader.searcher().num_docs(), 302);
        Ok(())
    }

    /// Deleting every document of a single segment, in two commits, leaves
    /// the index without any searchable segment.
    #[test]
    fn delete_all_docs_min() -> crate::Result<()> {
        let mut schema_builder = Schema::builder();
        let text_field = schema_builder.add_text_field("text", TEXT);
        let index = Index::create_in_ram(schema_builder.build());

        let mut index_writer = index.writer_for_tests()?;

        // 20 docs: 10 "a" + 10 "b".
        for _ in 0..10 {
            index_writer.add_document(doc!(text_field=>"a"))?;
            index_writer.add_document(doc!(text_field=>"b"))?;
        }
        index_writer.commit()?;

        let seg_ids = index.searchable_segment_ids()?;
        assert!(!seg_ids.is_empty());

        let term = Term::from_field_text(text_field, "a");
        index_writer.delete_term(term);
        index_writer.commit()?;

        let term = Term::from_field_text(text_field, "b");
        index_writer.delete_term(term);
        index_writer.commit()?;

        index_writer.wait_merging_threads()?;

        let reader = index.reader()?;
        assert_eq!(reader.searcher().num_docs(), 0);

        // Fully deleted segments are expected to disappear from the metas.
        let seg_ids = index.searchable_segment_ids()?;
        assert!(seg_ids.is_empty());

        reader.reload()?;
        assert_eq!(reader.searcher().num_docs(), 0);
        assert!(index.searchable_segment_metas()?.is_empty());
        assert!(reader.searcher().segment_readers().is_empty());

        Ok(())
    }

    /// Same as `delete_all_docs_min`, but across three commits/segments, with
    /// one delete commit per term.
    #[test]
    fn delete_all_docs() -> crate::Result<()> {
        let mut schema_builder = Schema::builder();
        let text_field = schema_builder.add_text_field("text", TEXT);
        let index = Index::create_in_ram(schema_builder.build());

        let mut index_writer = index.writer_for_tests()?;

        for _ in 0..100 {
            index_writer.add_document(doc!(text_field=>"a"))?;
            index_writer.add_document(doc!(text_field=>"b"))?;
        }
        index_writer.commit()?;

        for _ in 0..100 {
            index_writer.add_document(doc!(text_field=>"c"))?;
            index_writer.add_document(doc!(text_field=>"d"))?;
        }
        index_writer.commit()?;

        index_writer.add_document(doc!(text_field=>"e"))?;
        index_writer.add_document(doc!(text_field=>"f"))?;
        index_writer.commit()?;

        let seg_ids = index.searchable_segment_ids()?;
        assert!(!seg_ids.is_empty());

        // Every document contains exactly one of these terms.
        let term_vals = vec!["a", "b", "c", "d", "e", "f"];
        for term_val in term_vals {
            let term = Term::from_field_text(text_field, term_val);
            index_writer.delete_term(term);
            index_writer.commit()?;
        }

        index_writer.wait_merging_threads()?;

        let reader = index.reader()?;
        assert_eq!(reader.searcher().num_docs(), 0);

        let seg_ids = index.searchable_segment_ids()?;
        assert!(seg_ids.is_empty());

        reader.reload()?;
        assert_eq!(reader.searcher().num_docs(), 0);
        assert!(index.searchable_segment_metas()?.is_empty());
        assert!(reader.searcher().segment_readers().is_empty());

        Ok(())
    }

    /// `remove_all_segments` empties the segment manager.
    #[test]
    fn test_remove_all_segments() -> crate::Result<()> {
        let mut schema_builder = Schema::builder();
        let text_field = schema_builder.add_text_field("text", TEXT);
        let index = Index::create_in_ram(schema_builder.build());

        let mut index_writer = index.writer_for_tests()?;
        for _ in 0..100 {
            index_writer.add_document(doc!(text_field=>"a"))?;
            index_writer.add_document(doc!(text_field=>"b"))?;
        }
        index_writer.commit()?;

        index_writer.segment_updater().remove_all_segments();
        let seg_vec = index_writer
            .segment_updater()
            .segment_manager
            .segment_entries();
        assert!(seg_vec.is_empty());
        Ok(())
    }

    /// `merge_indices` combines three indices into a single segment holding
    /// every document: 3 * (200 + 2000) = 6600.
    #[test]
    fn test_merge_segments() -> crate::Result<()> {
        let mut indices = vec![];
        let mut schema_builder = Schema::builder();
        let text_field = schema_builder.add_text_field("text", TEXT);
        let schema = schema_builder.build();

        for _ in 0..3 {
            let index = Index::create_in_ram(schema.clone());

            let mut index_writer = index.writer_for_tests()?;
            for _ in 0..100 {
                index_writer.add_document(doc!(text_field=>"fizz"))?;
                index_writer.add_document(doc!(text_field=>"buzz"))?;
            }
            index_writer.commit()?;

            for _ in 0..1000 {
                index_writer.add_document(doc!(text_field=>"foo"))?;
                index_writer.add_document(doc!(text_field=>"bar"))?;
            }
            index_writer.commit()?;
            indices.push(index);
        }

        assert_eq!(indices.len(), 3);
        let output_directory: Box<dyn Directory> = Box::<RamDirectory>::default();
        let index = merge_indices(&indices, output_directory)?;
        assert_eq!(index.schema(), schema);

        let segments = index.searchable_segments()?;
        assert_eq!(segments.len(), 1);

        let segment_metas = segments[0].meta();
        assert_eq!(segment_metas.num_deleted_docs(), 0);
        assert_eq!(segment_metas.num_docs(), 6600);
        Ok(())
    }

    /// Merging an empty list of indices is an error.
    #[test]
    fn test_merge_empty_indices_array() {
        let merge_result = merge_indices(&[], RamDirectory::default());
        assert!(merge_result.is_err());
    }

    /// Indices whose schemas differ ("text" vs "body" field) cannot be merged.
    #[test]
    fn test_merge_mismatched_schema() -> crate::Result<()> {
        let first_index = {
            let mut schema_builder = Schema::builder();
            let text_field = schema_builder.add_text_field("text", TEXT);
            let index = Index::create_in_ram(schema_builder.build());
            let mut index_writer = index.writer_for_tests()?;
            index_writer.add_document(doc!(text_field=>"some text"))?;
            index_writer.commit()?;
            index
        };

        let second_index = {
            let mut schema_builder = Schema::builder();
            let body_field = schema_builder.add_text_field("body", TEXT);
            let index = Index::create_in_ram(schema_builder.build());
            let mut index_writer = index.writer_for_tests()?;
            index_writer.add_document(doc!(body_field=>"some body"))?;
            index_writer.commit()?;
            index
        };

        let result = merge_indices(&[first_index, second_index], RamDirectory::default());
        assert!(result.is_err());

        Ok(())
    }

    /// Custom alive sets are applied on top of existing deletes. The first
    /// segment keeps doc 0 only (doc 1 filtered). The second keeps nothing:
    /// doc 0 is filtered and doc 1 was deleted via the "4" term. One doc
    /// remains.
    #[test]
    fn test_merge_filtered_segments() -> crate::Result<()> {
        let first_index = {
            let mut schema_builder = Schema::builder();
            let text_field = schema_builder.add_text_field("text", TEXT);
            let index = Index::create_in_ram(schema_builder.build());
            let mut index_writer = index.writer_for_tests()?;
            index_writer.add_document(doc!(text_field=>"some text 1"))?;
            index_writer.add_document(doc!(text_field=>"some text 2"))?;
            index_writer.commit()?;
            index
        };

        let second_index = {
            let mut schema_builder = Schema::builder();
            let text_field = schema_builder.add_text_field("text", TEXT);
            let index = Index::create_in_ram(schema_builder.build());
            let mut index_writer = index.writer_for_tests()?;
            index_writer.add_document(doc!(text_field=>"some text 3"))?;
            index_writer.add_document(doc!(text_field=>"some text 4"))?;
            index_writer.delete_term(Term::from_field_text(text_field, "4"));

            index_writer.commit()?;
            index
        };

        let mut segments: Vec<Segment> = Vec::new();
        segments.extend(first_index.searchable_segments()?);
        segments.extend(second_index.searchable_segments()?);

        let target_settings = first_index.settings().clone();

        let filter_segment_1 = AliveBitSet::for_test_from_deleted_docs(&[1], 2);
        let filter_segment_2 = AliveBitSet::for_test_from_deleted_docs(&[0], 2);

        let filter_segments = vec![Some(filter_segment_1), Some(filter_segment_2)];

        let merged_index = merge_filtered_segments(
            &segments,
            target_settings,
            filter_segments,
            RamDirectory::default(),
        )?;

        let segments = merged_index.searchable_segments()?;
        assert_eq!(segments.len(), 1);

        let segment_metas = segments[0].meta();
        assert_eq!(segment_metas.num_deleted_docs(), 0);
        assert_eq!(segment_metas.num_docs(), 1);

        Ok(())
    }

    /// Filtering doc 0 ("test text") of a 4-doc segment whose doc 3 was
    /// deleted leaves docs "some text 2" and "some text 3". Those docs are
    /// renumbered 0 and 1 in the merged segment.
    #[test]
    fn test_merge_single_filtered_segments() -> crate::Result<()> {
        let first_index = {
            let mut schema_builder = Schema::builder();
            let text_field = schema_builder.add_text_field("text", TEXT);
            let index = Index::create_in_ram(schema_builder.build());
            let mut index_writer = index.writer_for_tests()?;
            index_writer.add_document(doc!(text_field=>"test text"))?;
            index_writer.add_document(doc!(text_field=>"some text 2"))?;

            index_writer.add_document(doc!(text_field=>"some text 3"))?;
            index_writer.add_document(doc!(text_field=>"some text 4"))?;

            index_writer.delete_term(Term::from_field_text(text_field, "4"));

            index_writer.commit()?;
            index
        };

        let mut segments: Vec<Segment> = Vec::new();
        segments.extend(first_index.searchable_segments()?);

        let target_settings = first_index.settings().clone();

        let filter_segment = AliveBitSet::for_test_from_deleted_docs(&[0], 4);

        let filter_segments = vec![Some(filter_segment)];

        let index = merge_filtered_segments(
            &segments,
            target_settings,
            filter_segments,
            RamDirectory::default(),
        )?;

        let segments = index.searchable_segments()?;
        assert_eq!(segments.len(), 1);

        let segment_metas = segments[0].meta();
        assert_eq!(segment_metas.num_deleted_docs(), 0);
        assert_eq!(segment_metas.num_docs(), 2);

        let searcher = index.reader()?.searcher();
        {
            let text_field = index.schema().get_field("text").unwrap();

            // Returns the doc ids (within their segment) of the top 3 hits.
            let do_search = |term: &str| {
                let query = QueryParser::for_index(&index, vec![text_field])
                    .parse_query(term)
                    .unwrap();
                let top_docs: Vec<(f32, DocAddress)> =
                    searcher.search(&query, &TopDocs::with_limit(3)).unwrap();

                top_docs.iter().map(|el| el.1.doc_id).collect::<Vec<_>>()
            };

            // The only "test" doc was filtered out.
            assert_eq!(do_search("test"), vec![] as Vec<u32>);
            assert_eq!(do_search("text"), vec![0, 1]);
        }

        Ok(())
    }

    /// `IndexMerger::open_with_custom_alive_set` combines the custom filter
    /// with the segment's own deletes. With doc 1 filtered, doc 3 ("4")
    /// deleted: alive [0, 2]. Without a filter: [0, 1, 2].
    #[test]
    fn test_apply_doc_id_filter_in_merger() -> crate::Result<()> {
        let first_index = {
            let mut schema_builder = Schema::builder();
            let text_field = schema_builder.add_text_field("text", TEXT);
            let index = Index::create_in_ram(schema_builder.build());
            let mut index_writer = index.writer_for_tests()?;
            index_writer.add_document(doc!(text_field=>"some text 1"))?;
            index_writer.add_document(doc!(text_field=>"some text 2"))?;

            index_writer.add_document(doc!(text_field=>"some text 3"))?;
            index_writer.add_document(doc!(text_field=>"some text 4"))?;

            index_writer.delete_term(Term::from_field_text(text_field, "4"));

            index_writer.commit()?;
            index
        };

        let mut segments: Vec<Segment> = Vec::new();
        segments.extend(first_index.searchable_segments()?);

        let target_settings = first_index.settings().clone();
        {
            let filter_segment = AliveBitSet::for_test_from_deleted_docs(&[1], 4);
            let filter_segments = vec![Some(filter_segment)];
            let target_schema = segments[0].schema();
            let merged_index = Index::create(
                RamDirectory::default(),
                target_schema,
                target_settings.clone(),
            )?;
            let merger: IndexMerger = IndexMerger::open_with_custom_alive_set(
                merged_index.schema(),
                merged_index.settings().clone(),
                &segments[..],
                filter_segments,
            )?;

            let doc_ids_alive: Vec<_> = merger.readers[0].doc_ids_alive().collect();
            assert_eq!(doc_ids_alive, vec![0, 2]);
        }

        {
            let filter_segments = vec![None];
            let target_schema = segments[0].schema();
            let merged_index =
                Index::create(RamDirectory::default(), target_schema, target_settings)?;
            let merger: IndexMerger = IndexMerger::open_with_custom_alive_set(
                merged_index.schema(),
                merged_index.settings().clone(),
                &segments[..],
                filter_segments,
            )?;

            let doc_ids_alive: Vec<_> = merger.readers[0].doc_ids_alive().collect();
            assert_eq!(doc_ids_alive, vec![0, 1, 2]);
        }

        Ok(())
    }
}