1use std::fmt::Debug;
7
8#[cfg(feature = "native")]
9mod segment_manager;
10#[cfg(feature = "native")]
11pub use segment_manager::SegmentManager;
12
/// Lightweight description of a single index segment as seen by a merge policy.
#[derive(Clone, Debug)]
pub struct SegmentInfo {
    /// Identifier of the segment; merge candidates refer to segments by this id.
    pub id: String,
    /// Number of documents stored in the segment (the size measure the policies use).
    pub num_docs: u32,
}
21
/// A group of segments that a merge policy proposes to merge in one operation.
#[derive(Clone, Debug)]
pub struct MergeCandidate {
    /// Ids of the segments that belong to this merge.
    pub segment_ids: Vec<String>,
}
28
29pub trait MergePolicy: Send + Sync + Debug {
33 fn find_merges(&self, segments: &[SegmentInfo]) -> Vec<MergeCandidate>;
36
37 fn clone_box(&self) -> Box<dyn MergePolicy>;
39}
40
41impl Clone for Box<dyn MergePolicy> {
42 fn clone(&self) -> Self {
43 self.clone_box()
44 }
45}
46
/// Merge policy that never proposes any merge.
#[derive(Clone, Debug, Default)]
pub struct NoMergePolicy;
50
51impl MergePolicy for NoMergePolicy {
52 fn find_merges(&self, _segments: &[SegmentInfo]) -> Vec<MergeCandidate> {
53 Vec::new()
54 }
55
56 fn clone_box(&self) -> Box<dyn MergePolicy> {
57 Box::new(self.clone())
58 }
59}
60
/// Configurable merge policy that groups size-sorted segments into merges.
#[derive(Clone, Debug)]
pub struct TieredMergePolicy {
    /// Minimum number of segments a group must contain to be proposed as a
    /// merge (a hard minimum of two also applies). It is also the per-tier
    /// multiplier of the ideal segment count used by `budget_trigger`.
    pub segments_per_tier: usize,
    /// Maximum number of segments placed in one merge group.
    pub max_merge_at_once: usize,
    /// Logarithm base used to count tiers for the segment budget. Truncated
    /// to an integer, it is also the largest allowed ratio between the next
    /// segment and the documents already accumulated in a group.
    pub tier_factor: f64,
    /// NOTE(review): not read by the selection code in this module; only the
    /// unit tests use it (as the upper document count of tier 0). Confirm
    /// whether other callers rely on it.
    pub tier_floor: u32,
    /// Upper bound on the summed document count of one merge group.
    pub max_merged_docs: u32,

    /// Segments smaller than this count as this size when candidates are
    /// scored and when the ideal segment count is computed (treated as at
    /// least 1).
    pub floor_segment_docs: u32,
    /// Fraction of `max_merged_docs`; segments holding more documents than
    /// `max_merged_docs * oversized_threshold` are left out of merging. When
    /// that product truncates to 0, no segment is excluded.
    pub oversized_threshold: f64,
    /// A group is rejected unless its total document count is at least
    /// `(1 + min_growth_ratio)` times its largest member. Values `<= 0.0`
    /// disable the check.
    pub min_growth_ratio: f64,
    /// When `true`, merges are only proposed once the number of eligible
    /// segments exceeds the computed ideal segment count.
    pub budget_trigger: bool,
    /// When `true`, candidate groups are scored and picked best-first;
    /// otherwise a single greedy pass is used.
    pub scored_selection: bool,
}
109
110impl Default for TieredMergePolicy {
111 fn default() -> Self {
112 Self {
113 segments_per_tier: 10,
114 max_merge_at_once: 10,
115 tier_factor: 10.0,
116 tier_floor: 1000,
117 max_merged_docs: 5_000_000,
118 floor_segment_docs: 1000,
119 oversized_threshold: 0.5,
120 min_growth_ratio: 0.0,
121 budget_trigger: false,
122 scored_selection: false,
123 }
124 }
125}
126
127impl TieredMergePolicy {
128 pub fn new() -> Self {
130 Self::default()
131 }
132
133 pub fn aggressive() -> Self {
139 Self {
140 segments_per_tier: 3,
141 max_merge_at_once: 10,
142 tier_factor: 10.0,
143 tier_floor: 500,
144 max_merged_docs: 10_000_000,
145 ..Default::default()
146 }
147 }
148
149 pub fn large_scale() -> Self {
157 Self {
158 segments_per_tier: 10,
159 max_merge_at_once: 10,
160 tier_factor: 10.0,
161 tier_floor: 50_000,
162 max_merged_docs: 20_000_000,
163 floor_segment_docs: 50_000,
164 oversized_threshold: 0.5,
165 min_growth_ratio: 0.5,
166 budget_trigger: true,
167 scored_selection: true,
168 }
169 }
170
171 pub fn bulk_indexing() -> Self {
176 Self {
177 segments_per_tier: 20,
178 max_merge_at_once: 20,
179 tier_factor: 10.0,
180 tier_floor: 100_000,
181 max_merged_docs: 50_000_000,
182 floor_segment_docs: 100_000,
183 oversized_threshold: 0.5,
184 min_growth_ratio: 0.75,
185 budget_trigger: true,
186 scored_selection: true,
187 }
188 }
189}
190
191impl TieredMergePolicy {
192 fn compute_ideal_segment_count(&self, total_docs: u64) -> usize {
196 if total_docs == 0 {
197 return 0;
198 }
199 let floor = self.floor_segment_docs.max(1) as f64;
200 let num_tiers = ((total_docs as f64 / floor).max(1.0))
202 .log(self.tier_factor)
203 .ceil() as usize;
204 let num_tiers = num_tiers.max(1);
205 num_tiers * self.segments_per_tier
206 }
207
208 fn score_candidate(&self, group: &[usize], sorted: &[&SegmentInfo]) -> f64 {
213 let floor = self.floor_segment_docs.max(1) as f64;
214 let mut total_floored = 0.0f64;
215 let mut largest_floored = 0.0f64;
216 for &idx in group {
217 let floored = (sorted[idx].num_docs as f64).max(floor);
218 total_floored += floored;
219 if floored > largest_floored {
220 largest_floored = floored;
221 }
222 }
223 if total_floored == 0.0 {
224 return f64::MAX;
225 }
226 let skew = largest_floored / total_floored;
227 skew * total_floored.powf(0.05)
228 }
229
230 fn passes_min_growth(&self, group: &[usize], sorted: &[&SegmentInfo]) -> bool {
233 if self.min_growth_ratio <= 0.0 || group.len() < 2 {
234 return true;
235 }
236 let largest = group
237 .iter()
238 .map(|&i| sorted[i].num_docs as u64)
239 .max()
240 .unwrap_or(0);
241 let total: u64 = group.iter().map(|&i| sorted[i].num_docs as u64).sum();
242 total as f64 >= (1.0 + self.min_growth_ratio) * largest as f64
243 }
244
245 fn find_merges_greedy(&self, sorted: &[&SegmentInfo]) -> Vec<MergeCandidate> {
247 let mut candidates = Vec::new();
248 let mut used = vec![false; sorted.len()];
249 let max_ratio = self.tier_factor as u64;
250
251 let mut start = 0;
252 loop {
253 while start < sorted.len() && used[start] {
254 start += 1;
255 }
256 if start >= sorted.len() {
257 break;
258 }
259
260 let mut group = vec![start];
261 let mut total_docs: u64 = sorted[start].num_docs as u64;
262
263 for j in (start + 1)..sorted.len() {
264 if used[j] {
265 continue;
266 }
267 if group.len() >= self.max_merge_at_once {
268 break;
269 }
270 let next_docs = sorted[j].num_docs as u64;
271 if total_docs + next_docs > self.max_merged_docs as u64 {
272 break;
273 }
274 if next_docs > total_docs.max(1) * max_ratio {
275 break;
276 }
277 group.push(j);
278 total_docs += next_docs;
279 }
280
281 if group.len() >= self.segments_per_tier
282 && group.len() >= 2
283 && self.passes_min_growth(&group, sorted)
284 {
285 for &i in &group {
286 used[i] = true;
287 }
288 candidates.push(MergeCandidate {
289 segment_ids: group.iter().map(|&i| sorted[i].id.clone()).collect(),
290 });
291 }
292
293 start += 1;
294 }
295
296 candidates
297 }
298
299 fn find_merges_scored(&self, sorted: &[&SegmentInfo]) -> Vec<MergeCandidate> {
302 let max_ratio = self.tier_factor as u64;
303
304 let mut scored_groups: Vec<(f64, Vec<usize>)> = Vec::new();
306
307 for start in 0..sorted.len() {
308 let mut group = vec![start];
309 let mut total_docs: u64 = sorted[start].num_docs as u64;
310
311 for j in (start + 1)..sorted.len() {
312 if group.len() >= self.max_merge_at_once {
313 break;
314 }
315 let next_docs = sorted[j].num_docs as u64;
316 if total_docs + next_docs > self.max_merged_docs as u64 {
317 break;
318 }
319 if next_docs > total_docs.max(1) * max_ratio {
320 break;
321 }
322 group.push(j);
323 total_docs += next_docs;
324
325 if group.len() >= self.segments_per_tier
327 && group.len() >= 2
328 && self.passes_min_growth(&group, sorted)
329 {
330 let score = self.score_candidate(&group, sorted);
331 scored_groups.push((score, group.clone()));
332 }
333 }
334 }
335
336 scored_groups.sort_by(|a, b| a.0.total_cmp(&b.0));
338
339 let mut used = vec![false; sorted.len()];
341 let mut candidates = Vec::new();
342
343 for (_score, group) in scored_groups {
344 if group.iter().any(|&i| used[i]) {
345 continue;
346 }
347 for &i in &group {
348 used[i] = true;
349 }
350 candidates.push(MergeCandidate {
351 segment_ids: group.iter().map(|&i| sorted[i].id.clone()).collect(),
352 });
353 }
354
355 candidates
356 }
357}
358
359impl MergePolicy for TieredMergePolicy {
360 fn find_merges(&self, segments: &[SegmentInfo]) -> Vec<MergeCandidate> {
361 if segments.len() < 2 {
362 return Vec::new();
363 }
364
365 let oversized_limit = (self.max_merged_docs as f64 * self.oversized_threshold) as u64;
367 let eligible: Vec<&SegmentInfo> = segments
368 .iter()
369 .filter(|s| (s.num_docs as u64) <= oversized_limit || oversized_limit == 0)
370 .collect();
371
372 if eligible.len() < 2 {
373 return Vec::new();
374 }
375
376 if self.budget_trigger {
378 let total_docs: u64 = segments.iter().map(|s| s.num_docs as u64).sum();
379 let ideal = self.compute_ideal_segment_count(total_docs);
380 if eligible.len() <= ideal {
381 return Vec::new();
382 }
383 }
384
385 let mut sorted = eligible;
387 sorted.sort_by_key(|s| s.num_docs);
388
389 if self.scored_selection {
391 self.find_merges_scored(&sorted)
392 } else {
393 self.find_merges_greedy(&sorted)
394 }
395 }
396
397 fn clone_box(&self) -> Box<dyn MergePolicy> {
398 Box::new(self.clone())
399 }
400}
401
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a segment fixture.
    fn seg(id: impl Into<String>, num_docs: u32) -> SegmentInfo {
        SegmentInfo {
            id: id.into(),
            num_docs,
        }
    }

    /// Document counts of the segments referenced by `candidate`, looked up by
    /// id in `segments`. Panics when an id is not present.
    fn doc_counts(candidate: &MergeCandidate, segments: &[SegmentInfo]) -> Vec<u64> {
        candidate
            .segment_ids
            .iter()
            .map(|id| {
                let owner = segments.iter().find(|s| s.id == *id).unwrap();
                u64::from(owner.num_docs)
            })
            .collect()
    }

    /// Test-local tier function: tier 0 up to `tier_floor`, then one more tier
    /// per factor of `tier_factor`.
    fn compute_tier(policy: &TieredMergePolicy, num_docs: u32) -> usize {
        if num_docs > policy.tier_floor {
            let ratio = f64::from(num_docs) / f64::from(policy.tier_floor);
            ratio.log(policy.tier_factor).floor() as usize + 1
        } else {
            0
        }
    }

    #[test]
    fn test_tiered_policy_compute_tier() {
        let policy = TieredMergePolicy::default();

        let expectations: [(u32, usize); 8] = [
            // At or below the floor.
            (500, 0),
            (1000, 0),
            // First tier above the floor.
            (1001, 1),
            (5000, 1),
            (9999, 1),
            // Second tier.
            (10000, 2),
            (50000, 2),
            // Third tier.
            (100000, 3),
        ];
        for (num_docs, tier) in expectations {
            assert_eq!(compute_tier(&policy, num_docs), tier);
        }
    }

    #[test]
    fn test_tiered_policy_no_merge_few_segments() {
        let policy = TieredMergePolicy::default();
        let segments = vec![seg("a", 100), seg("b", 200)];

        assert!(policy.find_merges(&segments).is_empty());
    }

    #[test]
    fn test_tiered_policy_merge_same_size() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            ..Default::default()
        };

        let segments: Vec<_> = (0..5)
            .map(|i| seg(format!("seg_{}", i), 100 + i * 10))
            .collect();

        let candidates = policy.find_merges(&segments);
        assert_eq!(candidates.len(), 1);
        assert_eq!(candidates[0].segment_ids.len(), 5);
    }

    #[test]
    fn test_tiered_policy_cross_tier_promotion() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            tier_factor: 10.0,
            tier_floor: 1000,
            max_merge_at_once: 20,
            max_merged_docs: 5_000_000,
            ..Default::default()
        };

        let mut segments: Vec<_> = (0..4)
            .map(|i| seg(format!("small_{}", i), 100 + i * 10))
            .collect();
        segments.extend((0..3).map(|i| seg(format!("medium_{}", i), 2000 + i * 500)));

        let candidates = policy.find_merges(&segments);
        assert_eq!(
            candidates.len(),
            1,
            "should merge all into one cross-tier group"
        );
        assert_eq!(
            candidates[0].segment_ids.len(),
            7,
            "all 7 segments should be in the merge"
        );
    }

    #[test]
    fn test_tiered_policy_ratio_guard_separates_groups() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            tier_factor: 10.0,
            tier_floor: 100,
            max_merge_at_once: 20,
            max_merged_docs: 5_000_000,
            ..Default::default()
        };

        let mut segments: Vec<_> = (0..4).map(|i| seg(format!("tiny_{}", i), 10)).collect();
        segments.extend((0..4).map(|i| seg(format!("large_{}", i), 100_000 + i * 100)));

        let candidates = policy.find_merges(&segments);
        assert_eq!(candidates.len(), 2, "should produce two separate groups");

        let tiny_group = &candidates[0];
        assert_eq!(tiny_group.segment_ids.len(), 4);
        assert!(tiny_group.segment_ids[0].starts_with("tiny_"));

        let large_group = &candidates[1];
        assert_eq!(large_group.segment_ids.len(), 4);
        assert!(large_group.segment_ids[0].starts_with("large_"));
    }

    #[test]
    fn test_tiered_policy_small_segments_skip_to_large_group() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            tier_factor: 10.0,
            tier_floor: 1000,
            max_merge_at_once: 10,
            max_merged_docs: 5_000_000,
            ..Default::default()
        };

        let mut segments = vec![seg("tiny_0", 10), seg("tiny_1", 20)];
        segments.extend((0..5).map(|i| seg(format!("medium_{}", i), 5000 + i * 100)));

        let candidates = policy.find_merges(&segments);
        assert!(
            !candidates.is_empty(),
            "should find a merge even though tiny segments can't form a group"
        );
        let total_segs: usize = candidates.iter().map(|c| c.segment_ids.len()).sum();
        assert!(
            total_segs >= 5,
            "should merge at least the 5 medium segments"
        );
    }

    #[test]
    fn test_tiered_policy_respects_max_merged_docs() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            max_merge_at_once: 100,
            tier_factor: 10.0,
            tier_floor: 1000,
            max_merged_docs: 500,
            ..Default::default()
        };

        let segments: Vec<_> = (0..10).map(|i| seg(format!("seg_{}", i), 100)).collect();

        for candidate in &policy.find_merges(&segments) {
            let total: u64 = doc_counts(candidate, &segments).iter().sum();
            assert!(
                total <= 500,
                "merge total {} exceeds max_merged_docs 500",
                total
            );
        }
    }

    #[test]
    fn test_tiered_policy_large_segment_not_remerged_with_small() {
        let policy = TieredMergePolicy::default();

        // Phase 1: one large segment plus five small ones; too few to merge.
        let mut segments = vec![seg("large_merged", 50_000)];
        segments.extend((0..5).map(|i| seg(format!("new_{}", i), 500)));

        let candidates = policy.find_merges(&segments);
        assert!(
            candidates.is_empty(),
            "should not re-merge large segment with 5 small ones: {:?}",
            candidates
        );

        // Phase 2: five more small segments arrive.
        segments.extend((5..10).map(|i| seg(format!("new_{}", i), 500)));

        let candidates = policy.find_merges(&segments);
        assert_eq!(candidates.len(), 1, "should merge the 10 small segments");
        let merged = &candidates[0].segment_ids;
        assert!(
            !merged.iter().any(|id| id.as_str() == "large_merged"),
            "large segment must NOT be in the merge group"
        );
        assert_eq!(merged.len(), 10, "all 10 small segments should be merged");
    }

    #[test]
    fn test_no_merge_policy() {
        let policy = NoMergePolicy;
        let segments = vec![seg("a", 100), seg("b", 200)];

        assert!(policy.find_merges(&segments).is_empty());
    }

    #[test]
    fn test_oversized_exclusion() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            max_merged_docs: 1_000_000,
            oversized_threshold: 0.5,
            ..Default::default()
        };

        let mut segments: Vec<_> = (0..4).map(|i| seg(format!("small_{}", i), 1000)).collect();
        segments.push(seg("oversized_0", 600_000));
        segments.push(seg("oversized_1", 700_000));

        for candidate in &policy.find_merges(&segments) {
            let holds = |wanted: &str| candidate.segment_ids.iter().any(|id| id.as_str() == wanted);
            assert!(!holds("oversized_0"), "oversized_0 should be excluded");
            assert!(!holds("oversized_1"), "oversized_1 should be excluded");
        }
    }

    #[test]
    fn test_budget_trigger_prevents_unnecessary_merge() {
        let policy = TieredMergePolicy {
            segments_per_tier: 10,
            tier_factor: 10.0,
            tier_floor: 1000,
            floor_segment_docs: 1000,
            budget_trigger: true,
            ..Default::default()
        };

        let segments: Vec<_> = (0..5).map(|i| seg(format!("seg_{}", i), 10_000)).collect();

        let candidates = policy.find_merges(&segments);
        assert!(
            candidates.is_empty(),
            "should not merge when under budget: {:?}",
            candidates
        );
    }

    #[test]
    fn test_budget_trigger_allows_merge_when_over_budget() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            tier_factor: 10.0,
            tier_floor: 1000,
            floor_segment_docs: 1000,
            budget_trigger: true,
            ..Default::default()
        };

        let segments: Vec<_> = (0..10).map(|i| seg(format!("seg_{}", i), 1000)).collect();

        let candidates = policy.find_merges(&segments);
        assert!(!candidates.is_empty(), "should merge when over budget");
    }

    #[test]
    fn test_min_growth_ratio_rejects_wasteful_merge() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            min_growth_ratio: 0.5,
            max_merge_at_once: 10,
            ..Default::default()
        };

        let mut segments = vec![seg("big", 100_000)];
        segments.extend((0..3).map(|i| seg(format!("tiny_{}", i), 10)));

        let candidates = policy.find_merges(&segments);
        let with_big = candidates
            .iter()
            .filter(|c| c.segment_ids.iter().any(|id| id.as_str() == "big"));
        for candidate in with_big {
            let counts = doc_counts(candidate, &segments);
            let total: u64 = counts.iter().sum();
            let largest: u64 = *counts.iter().max().unwrap();
            assert!(
                total as f64 >= 1.5 * largest as f64,
                "merge with 'big' segment violates min_growth_ratio: total={}, largest={}",
                total,
                largest
            );
        }
    }

    #[test]
    fn test_scored_selection_prefers_balanced_merge() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            max_merge_at_once: 5,
            scored_selection: true,
            ..Default::default()
        };

        let segments = vec![
            seg("unbal_0", 100),
            seg("unbal_1", 100),
            seg("bal_0", 1000),
            seg("bal_1", 1100),
            seg("bal_2", 1200),
            seg("unbal_2", 5000),
        ];

        let candidates = policy.find_merges(&segments);
        assert!(!candidates.is_empty(), "should find at least one merge");

        let first = &candidates[0];
        let has_balanced = first.segment_ids.iter().any(|id| id.starts_with("bal_"));
        assert!(
            has_balanced,
            "scored selection should prefer balanced group, got: {:?}",
            first.segment_ids
        );
    }

    #[test]
    fn test_large_scale_preset_values() {
        let preset = TieredMergePolicy::large_scale();
        assert_eq!(preset.tier_floor, 50_000);
        assert_eq!(preset.max_merged_docs, 20_000_000);
        assert_eq!(preset.floor_segment_docs, 50_000);
        assert!(preset.budget_trigger);
        assert!(preset.scored_selection);
        assert_eq!(preset.segments_per_tier, 10);
        assert!((preset.min_growth_ratio - 0.5).abs() < f64::EPSILON);
        assert!((preset.oversized_threshold - 0.5).abs() < f64::EPSILON);
    }

    #[test]
    fn test_bulk_indexing_preset_values() {
        let preset = TieredMergePolicy::bulk_indexing();
        assert_eq!(preset.segments_per_tier, 20);
        assert_eq!(preset.max_merge_at_once, 20);
        assert_eq!(preset.tier_floor, 100_000);
        assert_eq!(preset.max_merged_docs, 50_000_000);
        assert_eq!(preset.floor_segment_docs, 100_000);
        assert!(preset.budget_trigger);
        assert!(preset.scored_selection);
        assert!((preset.min_growth_ratio - 0.75).abs() < f64::EPSILON);
    }
}