Skip to main content

hermes_core/merge/
mod.rs

1//! Merge policies for background segment merging
2//!
3//! Merge policies determine when and which segments should be merged together.
4//! The default is a tiered/log-layered policy that groups segments by size tiers.
5
6use std::fmt::Debug;
7
8#[cfg(feature = "native")]
9mod segment_manager;
10#[cfg(feature = "native")]
11pub use segment_manager::SegmentManager;
12
/// Summary of one segment, as seen by a merge policy.
///
/// Only the fields needed to decide whether a segment should take part in a
/// merge are carried here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SegmentInfo {
    /// Segment ID (hex string)
    pub id: String,
    /// Number of documents in the segment
    pub num_docs: u32,
}
21
/// One proposed merge: the set of segments that should be combined.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MergeCandidate {
    /// IDs of the segments to merge together
    pub segment_ids: Vec<String>,
}
28
29/// Trait for merge policies
30///
31/// Implementations decide when segments should be merged and which ones.
32pub trait MergePolicy: Send + Sync + Debug {
33    /// Given the current segments, return all eligible merge candidates.
34    /// Multiple candidates can run concurrently as long as they don't share segments.
35    fn find_merges(&self, segments: &[SegmentInfo]) -> Vec<MergeCandidate>;
36
37    /// Clone the policy into a boxed trait object
38    fn clone_box(&self) -> Box<dyn MergePolicy>;
39}
40
41impl Clone for Box<dyn MergePolicy> {
42    fn clone(&self) -> Self {
43        self.clone_box()
44    }
45}
46
/// Merge policy that never proposes a merge.
///
/// Its [`MergePolicy`] implementation always returns an empty candidate list.
#[derive(Clone, Debug, Default)]
pub struct NoMergePolicy;
50
51impl MergePolicy for NoMergePolicy {
52    fn find_merges(&self, _segments: &[SegmentInfo]) -> Vec<MergeCandidate> {
53        Vec::new()
54    }
55
56    fn clone_box(&self) -> Box<dyn MergePolicy> {
57        Box::new(self.clone())
58    }
59}
60
/// Tiered/Log-layered merge policy
///
/// Merges segments of similar size so that larger segments are rewritten less
/// often than small ones, giving a roughly logarithmic segment structure.
///
/// The `find_merges` implementation in this module does not bucket segments
/// into fixed tiers. It sorts segments by document count and greedily grows a
/// group from the smallest segment upwards, stopping when the group reaches
/// `max_merge_at_once` segments, when adding the next segment would exceed
/// `max_merged_docs`, or when the next segment is more than `tier_factor`
/// times the group's accumulated document count. A group is emitted only if
/// it holds at least `segments_per_tier` segments (and never fewer than two).
///
/// Conceptual tier boundaries, for reference when choosing values:
/// - Tier 0: 0 to tier_floor docs
/// - Tier 1: tier_floor to tier_floor * tier_factor docs
/// - Tier 2: tier_floor * tier_factor to tier_floor * tier_factor^2 docs
/// - etc.
#[derive(Debug, Clone, PartialEq)]
pub struct TieredMergePolicy {
    /// Minimum number of segments a group must contain before it is emitted
    /// as a merge candidate (default: 10)
    pub segments_per_tier: usize,
    /// Maximum number of segments to merge at once (default: 10).
    /// Should be close to segments_per_tier to prevent giant merges; a value
    /// below segments_per_tier means no group can ever qualify.
    pub max_merge_at_once: usize,
    /// Factor between tier sizes (default: 10.0). Used as the ratio guard:
    /// a segment larger than the accumulated group size times this factor is
    /// not added to the group.
    pub tier_factor: f64,
    /// Minimum segment size (docs) to consider for tiering (default: 1000).
    /// Not read by `find_merges` in this module.
    pub tier_floor: u32,
    /// Maximum total docs to merge at once (default: 5_000_000)
    pub max_merged_docs: u32,
}
86
87impl Default for TieredMergePolicy {
88    fn default() -> Self {
89        Self {
90            segments_per_tier: 10,
91            max_merge_at_once: 10,
92            tier_factor: 10.0,
93            tier_floor: 1000,
94            max_merged_docs: 5_000_000,
95        }
96    }
97}
98
99impl TieredMergePolicy {
100    /// Create a new tiered merge policy with default settings
101    pub fn new() -> Self {
102        Self::default()
103    }
104
105    /// Create an aggressive merge policy that merges more frequently
106    ///
107    /// - Merges when 3 segments in same tier (vs 10 default)
108    /// - Lower tier floor (500 docs vs 1000)
109    /// - Good for reducing segment count quickly
110    pub fn aggressive() -> Self {
111        Self {
112            segments_per_tier: 3,
113            max_merge_at_once: 10,
114            tier_factor: 10.0,
115            tier_floor: 500,
116            max_merged_docs: 10_000_000,
117        }
118    }
119}
120
121impl MergePolicy for TieredMergePolicy {
122    fn find_merges(&self, segments: &[SegmentInfo]) -> Vec<MergeCandidate> {
123        if segments.len() < 2 {
124            return Vec::new();
125        }
126
127        // Sort by size ascending — greedily merge from smallest.
128        // This replaces per-tier grouping, allowing cross-tier promotion:
129        // many small segments can jump several tiers in one merge.
130        let mut sorted: Vec<&SegmentInfo> = segments.iter().collect();
131        sorted.sort_by_key(|s| s.num_docs);
132
133        let mut candidates = Vec::new();
134        let mut used = vec![false; sorted.len()];
135        let max_ratio = self.tier_factor as u64;
136
137        let mut start = 0;
138        loop {
139            // Find next unused segment
140            while start < sorted.len() && used[start] {
141                start += 1;
142            }
143            if start >= sorted.len() {
144                break;
145            }
146
147            // Build a merge group starting from the smallest unused segment.
148            // Accumulate segments as long as:
149            //   - group size < max_merge_at_once
150            //   - total docs < max_merged_docs
151            //   - the next segment isn't disproportionately larger than the group
152            //     (ratio guard prevents rewriting a huge segment to absorb tiny ones)
153            let mut group = vec![start];
154            let mut total_docs: u64 = sorted[start].num_docs as u64;
155
156            for j in (start + 1)..sorted.len() {
157                if used[j] {
158                    continue;
159                }
160                if group.len() >= self.max_merge_at_once {
161                    break;
162                }
163                let next_docs = sorted[j].num_docs as u64;
164                if total_docs + next_docs > self.max_merged_docs as u64 {
165                    break;
166                }
167                // Ratio guard: don't include a segment that dwarfs the accumulated group.
168                // Uses the actual accumulated total — NOT inflated by tier_floor — so
169                // a group of tiny segments won't attract a previously-merged large segment.
170                // max(1) prevents a zero-doc starting segment from blocking all accumulation.
171                if next_docs > total_docs.max(1) * max_ratio {
172                    break;
173                }
174                group.push(j);
175                total_docs += next_docs;
176            }
177
178            if group.len() >= self.segments_per_tier && group.len() >= 2 {
179                for &i in &group {
180                    used[i] = true;
181                }
182                candidates.push(MergeCandidate {
183                    segment_ids: group.iter().map(|&i| sorted[i].id.clone()).collect(),
184                });
185            }
186
187            // Always advance past start (whether or not we formed a group)
188            // so we try starting from the next unused segment.
189            start += 1;
190        }
191
192        candidates
193    }
194
195    fn clone_box(&self) -> Box<dyn MergePolicy> {
196        Box::new(self.clone())
197    }
198}
199
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a segment with the given id and document count.
    fn seg(id: impl Into<String>, num_docs: u32) -> SegmentInfo {
        SegmentInfo {
            id: id.into(),
            num_docs,
        }
    }

    /// Build one segment per `i` in `indices`, named `{prefix}_{i}` and
    /// holding `base + i * step` documents.
    fn series(
        prefix: &str,
        indices: std::ops::Range<u32>,
        base: u32,
        step: u32,
    ) -> Vec<SegmentInfo> {
        indices
            .map(|i| seg(format!("{}_{}", prefix, i), base + i * step))
            .collect()
    }

    /// Compute tier for a segment (used only in tests to verify tier math)
    fn compute_tier(policy: &TieredMergePolicy, num_docs: u32) -> usize {
        if num_docs <= policy.tier_floor {
            return 0;
        }
        let ratio = num_docs as f64 / policy.tier_floor as f64;
        (ratio.log(policy.tier_factor).floor() as usize) + 1
    }

    #[test]
    fn test_tiered_policy_compute_tier() {
        let policy = TieredMergePolicy::default();

        // Tier 0: <= 1000 docs (tier_floor)
        assert_eq!(compute_tier(&policy, 500), 0);
        assert_eq!(compute_tier(&policy, 1000), 0);

        // Tier 1: 1001 - 9999 docs (ratio < 10)
        assert_eq!(compute_tier(&policy, 1001), 1);
        assert_eq!(compute_tier(&policy, 5000), 1);
        assert_eq!(compute_tier(&policy, 9999), 1);

        // Tier 2: 10000 - 99999 docs (ratio 10-100)
        assert_eq!(compute_tier(&policy, 10000), 2);
        assert_eq!(compute_tier(&policy, 50000), 2);

        // Tier 3: 100000+ docs
        assert_eq!(compute_tier(&policy, 100000), 3);
    }

    #[test]
    fn test_tiered_policy_no_merge_few_segments() {
        let policy = TieredMergePolicy::default();

        let segments = vec![seg("a", 100), seg("b", 200)];

        assert!(policy.find_merges(&segments).is_empty());
    }

    #[test]
    fn test_tiered_policy_merge_same_size() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            ..Default::default()
        };

        // 5 small segments — all similar size, should merge into one group
        let segments = series("seg", 0..5, 100, 10);

        let candidates = policy.find_merges(&segments);
        assert_eq!(candidates.len(), 1);
        assert_eq!(candidates[0].segment_ids.len(), 5);
    }

    #[test]
    fn test_tiered_policy_cross_tier_promotion() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            tier_factor: 10.0,
            tier_floor: 1000,
            max_merge_at_once: 20,
            max_merged_docs: 5_000_000,
        };

        // 4 small (tier 0) + 3 medium (tier 1) — should merge ALL into one group
        // because the small segments accumulate and the medium ones pass the ratio check
        let mut segments = series("small", 0..4, 100, 10);
        segments.extend(series("medium", 0..3, 2000, 500));

        let candidates = policy.find_merges(&segments);
        assert_eq!(
            candidates.len(),
            1,
            "should merge all into one cross-tier group"
        );
        assert_eq!(
            candidates[0].segment_ids.len(),
            7,
            "all 7 segments should be in the merge"
        );
    }

    #[test]
    fn test_tiered_policy_ratio_guard_separates_groups() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            tier_factor: 10.0,
            tier_floor: 100,
            max_merge_at_once: 20,
            max_merged_docs: 5_000_000,
        };

        // 4 tiny (10 docs) + 4 large (100_000 docs)
        // Ratio guard should prevent merging tiny with large:
        // group total after 4 tiny = 40 (tier_floor does not inflate it)
        // next segment is 100_000 > 40 * 10 = 400 → blocked
        // So tiny segments (4) form one group, large segments (4) form another.
        let mut segments = series("tiny", 0..4, 10, 0);
        segments.extend(series("large", 0..4, 100_000, 100));

        let candidates = policy.find_merges(&segments);
        assert_eq!(candidates.len(), 2, "should produce two separate groups");

        // First group: the 4 tiny segments
        assert_eq!(candidates[0].segment_ids.len(), 4);
        assert!(candidates[0].segment_ids[0].starts_with("tiny_"));

        // Second group: the 4 large segments
        assert_eq!(candidates[1].segment_ids.len(), 4);
        assert!(candidates[1].segment_ids[0].starts_with("large_"));
    }

    #[test]
    fn test_tiered_policy_small_segments_skip_to_large_group() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            tier_factor: 10.0,
            tier_floor: 1000,
            max_merge_at_once: 10,
            max_merged_docs: 5_000_000,
        };

        // 2 tiny segments (can't form a group) + 5 medium segments (can)
        // The tiny segments should be skipped, and the medium ones should merge.
        let mut segments = vec![seg("tiny_0", 10), seg("tiny_1", 20)];
        segments.extend(series("medium", 0..5, 5000, 100));

        let candidates = policy.find_merges(&segments);
        assert!(
            !candidates.is_empty(),
            "should find a merge even though tiny segments can't form a group"
        );
        // The medium segments should be merged (possibly with the tiny ones bridging in)
        let total_segs: usize = candidates.iter().map(|c| c.segment_ids.len()).sum();
        assert!(
            total_segs >= 5,
            "should merge at least the 5 medium segments"
        );
    }

    #[test]
    fn test_tiered_policy_respects_max_merged_docs() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            max_merge_at_once: 100,
            tier_factor: 10.0,
            tier_floor: 1000,
            max_merged_docs: 500,
        };

        // 10 segments of 100 docs each — total would be 1000 but max_merged_docs=500
        let segments = series("seg", 0..10, 100, 0);

        let candidates = policy.find_merges(&segments);
        // Without this the per-candidate check below would pass vacuously.
        assert!(
            !candidates.is_empty(),
            "the cap should split the merge, not suppress it"
        );
        for c in &candidates {
            let total: u64 = c
                .segment_ids
                .iter()
                .map(|id| segments.iter().find(|s| s.id == *id).unwrap().num_docs as u64)
                .sum();
            assert!(
                total <= 500,
                "merge total {} exceeds max_merged_docs 500",
                total
            );
        }
    }

    #[test]
    fn test_tiered_policy_large_segment_not_remerged_with_small() {
        // Simulates the user scenario: after merging, we have one large segment
        // and a few new small segments from recent commits. The large segment
        // should NOT be re-merged — only the small ones should merge together
        // once there are enough of them.
        let policy = TieredMergePolicy::default(); // segments_per_tier=10

        // 1 large segment (from previous merge) + 5 new small segments
        let mut segments = vec![seg("large_merged", 50_000)];
        segments.extend(series("new", 0..5, 500, 0));

        // Should NOT merge: only 5 small segments (< segments_per_tier=10),
        // and the large segment is too big to join their group.
        let candidates = policy.find_merges(&segments);
        assert!(
            candidates.is_empty(),
            "should not re-merge large segment with 5 small ones: {:?}",
            candidates
        );

        // Now add 5 more small segments (total 10) — those should merge together,
        // but the large segment should still be excluded.
        segments.extend(series("new", 5..10, 500, 0));

        let candidates = policy.find_merges(&segments);
        assert_eq!(candidates.len(), 1, "should merge the 10 small segments");
        assert!(
            !candidates[0].segment_ids.contains(&"large_merged".into()),
            "large segment must NOT be in the merge group"
        );
        assert_eq!(
            candidates[0].segment_ids.len(),
            10,
            "all 10 small segments should be merged"
        );
    }

    #[test]
    fn test_no_merge_policy() {
        let policy = NoMergePolicy;

        let segments = vec![seg("a", 100), seg("b", 200)];

        assert!(policy.find_merges(&segments).is_empty());
    }
}