Skip to main content

hermes_core/merge/
mod.rs

1//! Merge policies for background segment merging
2//!
3//! Merge policies determine when and which segments should be merged together.
4//! The default is a tiered/log-layered policy that groups segments by size tiers.
5
6use std::fmt::Debug;
7
8#[cfg(feature = "native")]
9mod scheduler;
10#[cfg(feature = "native")]
11pub use scheduler::SegmentManager;
12
/// Per-segment metadata that merge policies inspect when choosing merges.
#[derive(Clone, Debug)]
pub struct SegmentInfo {
    /// Identifier of the segment (a hex string).
    pub id: String,
    /// Count of documents stored in the segment.
    pub num_docs: u32,
    /// Approximate size of the segment in bytes; `None` when it is not known.
    pub size_bytes: Option<u64>,
}
23
/// One proposed merge: the set of segments that should be combined.
#[derive(Clone, Debug)]
pub struct MergeCandidate {
    /// IDs of the segments to be merged together.
    pub segment_ids: Vec<String>,
}
30
31/// Trait for merge policies
32///
33/// Implementations decide when segments should be merged and which ones.
34pub trait MergePolicy: Send + Sync + Debug {
35    /// Given the current segments, return merge candidates (if any)
36    ///
37    /// Each `MergeCandidate` represents a group of segments that should be merged.
38    /// Multiple candidates can be returned for parallel merging.
39    fn find_merges(&self, segments: &[SegmentInfo]) -> Vec<MergeCandidate>;
40
41    /// Clone the policy into a boxed trait object
42    fn clone_box(&self) -> Box<dyn MergePolicy>;
43}
44
45impl Clone for Box<dyn MergePolicy> {
46    fn clone(&self) -> Self {
47        self.clone_box()
48    }
49}
50
51/// No-op merge policy - never merges automatically
52#[derive(Debug, Clone, Default)]
53pub struct NoMergePolicy;
54
55impl MergePolicy for NoMergePolicy {
56    fn find_merges(&self, _segments: &[SegmentInfo]) -> Vec<MergeCandidate> {
57        Vec::new()
58    }
59
60    fn clone_box(&self) -> Box<dyn MergePolicy> {
61        Box::new(self.clone())
62    }
63}
64
/// Tiered/Log-layered merge policy
///
/// Groups segments into tiers based on document count. Segments in the same tier
/// are merged when there are enough of them. This creates a logarithmic structure
/// where larger segments are merged less frequently.
///
/// Tiers are defined by powers of `tier_factor`:
/// - Tier 0: 0 to `tier_floor` docs (inclusive)
/// - Tier 1: more than `tier_floor`, less than `tier_floor * tier_factor` docs
/// - Tier 2: `tier_floor * tier_factor` up to (excluding) `tier_floor * tier_factor^2` docs
/// - etc.
#[derive(Debug, Clone)]
pub struct TieredMergePolicy {
    /// Minimum number of segments in a tier before merging (default: 10)
    pub segments_per_tier: usize,
    /// Maximum number of segments to merge at once (default: 10)
    pub max_merge_at_once: usize,
    /// Factor between tier sizes (default: 10.0)
    pub tier_factor: f64,
    /// Minimum segment size (docs) to consider for tiering (default: 1000)
    pub tier_floor: u32,
    /// Maximum total docs to merge at once (default: 5_000_000)
    pub max_merged_docs: u32,
}

impl Default for TieredMergePolicy {
    /// Default settings: 10 segments per tier, merge at most 10 at once,
    /// tier factor 10.0, tier floor 1000 docs, at most 5_000_000 merged docs.
    fn default() -> Self {
        Self {
            segments_per_tier: 10,
            max_merge_at_once: 10,
            tier_factor: 10.0,
            tier_floor: 1000,
            max_merged_docs: 5_000_000,
        }
    }
}

impl TieredMergePolicy {
    /// Create a new tiered merge policy with default settings
    pub fn new() -> Self {
        Self::default()
    }

    /// Create an aggressive merge policy that merges more frequently
    ///
    /// - Merges when 3 segments in same tier (vs 10 default)
    /// - Lower tier floor (500 docs vs 1000)
    /// - Higher merged-docs cap (10_000_000 vs 5_000_000)
    /// - Good for reducing segment count quickly
    pub fn aggressive() -> Self {
        Self {
            segments_per_tier: 3,
            max_merge_at_once: 10,
            tier_factor: 10.0,
            tier_floor: 500,
            max_merged_docs: 10_000_000,
        }
    }

    /// Set segments per tier
    pub fn with_segments_per_tier(mut self, n: usize) -> Self {
        self.segments_per_tier = n;
        self
    }

    /// Set max merge at once
    pub fn with_max_merge_at_once(mut self, n: usize) -> Self {
        self.max_merge_at_once = n;
        self
    }

    /// Set tier factor
    pub fn with_tier_factor(mut self, factor: f64) -> Self {
        self.tier_factor = factor;
        self
    }

    /// Set tier floor (minimum docs before tiering starts)
    pub fn with_tier_floor(mut self, floor: u32) -> Self {
        self.tier_floor = floor;
        self
    }

    /// Compute the tier for a segment based on its doc count
    ///
    /// Returns 0 when `num_docs <= tier_floor`. Otherwise returns `k + 1`, where
    /// `k` is the largest integer with `tier_floor * tier_factor^k <= num_docs`.
    ///
    /// Degenerate configurations are handled without panicking:
    /// - a `tier_factor` that is NaN or `<= 1.0` cannot define growing tiers, so
    ///   every segment above the floor lands in tier 1;
    /// - a `tier_floor` of 0 is treated as 1 when computing the ratio, avoiding
    ///   a division by zero.
    fn compute_tier(&self, num_docs: u32) -> usize {
        if num_docs <= self.tier_floor {
            return 0;
        }

        // With factor == 1.0 the logarithm below would be infinite; with
        // factor < 1.0 or NaN it would be negative/NaN. Collapse to one tier.
        if self.tier_factor.is_nan() || self.tier_factor <= 1.0 {
            return 1;
        }

        // `num_docs > tier_floor` here, so the ratio is always >= 1.0.
        let floor = f64::from(self.tier_floor.max(1));
        let ratio = f64::from(num_docs) / floor;

        // `log` is evaluated as ln(ratio) / ln(factor), which can land just
        // below an exact integer (e.g. 1000f64.log(10.0) == 2.9999999999999996).
        // Verify the estimate against the actual power boundaries and nudge it.
        let estimate = ratio.log(self.tier_factor).floor();
        let exponent = if self.tier_factor.powf(estimate + 1.0) <= ratio {
            estimate + 1.0
        } else if estimate > 0.0 && self.tier_factor.powf(estimate) > ratio {
            estimate - 1.0
        } else {
            estimate
        };

        // Float-to-int `as` saturates; saturating_add keeps the `+ 1` overflow-free.
        (exponent as usize).saturating_add(1)
    }
}
157
158impl MergePolicy for TieredMergePolicy {
159    fn find_merges(&self, segments: &[SegmentInfo]) -> Vec<MergeCandidate> {
160        if segments.len() < 2 {
161            return Vec::new();
162        }
163
164        // Group segments by tier
165        let mut tiers: std::collections::HashMap<usize, Vec<&SegmentInfo>> =
166            std::collections::HashMap::new();
167
168        for seg in segments {
169            let tier = self.compute_tier(seg.num_docs);
170            tiers.entry(tier).or_default().push(seg);
171        }
172
173        let mut candidates = Vec::new();
174
175        // Find tiers with enough segments to merge
176        for (_tier, tier_segments) in tiers {
177            if tier_segments.len() >= self.segments_per_tier {
178                // Sort by doc count (merge smaller ones first)
179                let mut sorted: Vec<_> = tier_segments;
180                sorted.sort_by_key(|s| s.num_docs);
181
182                // Take up to max_merge_at_once segments
183                let to_merge: Vec<_> = sorted.into_iter().take(self.max_merge_at_once).collect();
184
185                // Check total docs limit
186                let total_docs: u32 = to_merge.iter().map(|s| s.num_docs).sum();
187                if total_docs <= self.max_merged_docs && to_merge.len() >= 2 {
188                    candidates.push(MergeCandidate {
189                        segment_ids: to_merge.into_iter().map(|s| s.id.clone()).collect(),
190                    });
191                }
192            }
193        }
194
195        candidates
196    }
197
198    fn clone_box(&self) -> Box<dyn MergePolicy> {
199        Box::new(self.clone())
200    }
201}
202
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `SegmentInfo` with the given id and doc count and no size information.
    fn segment(id: &str, num_docs: u32) -> SegmentInfo {
        SegmentInfo {
            id: id.into(),
            num_docs,
            size_bytes: None,
        }
    }

    /// Tier assignment with the default floor (1000) and factor (10.0).
    #[test]
    fn test_tiered_policy_compute_tier() {
        let policy = TieredMergePolicy::default();

        // (doc count, expected tier)
        let expectations: [(u32, usize); 8] = [
            // Tier 0: at or below the tier floor
            (500, 0),
            (1000, 0),
            // Tier 1: above the floor, ratio below 10
            (1001, 1),
            (5000, 1),
            (9999, 1),
            // Tier 2: ratio in 10..100
            (10000, 2),
            (50000, 2),
            // Tier 3: ratio of 100
            (100000, 3),
        ];

        for &(num_docs, expected_tier) in expectations.iter() {
            assert_eq!(policy.compute_tier(num_docs), expected_tier);
        }
    }

    /// Two segments are far below the default `segments_per_tier`, so nothing merges.
    #[test]
    fn test_tiered_policy_no_merge_few_segments() {
        let policy = TieredMergePolicy::default();
        let segments = vec![segment("a", 100), segment("b", 200)];

        assert!(policy.find_merges(&segments).is_empty());
    }

    /// Five tier-0 segments with `segments_per_tier = 3` yield exactly one merge.
    #[test]
    fn test_tiered_policy_merge_same_tier() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            ..Default::default()
        };

        // Doc counts 100..=140 all sit in tier 0.
        let mut segments = Vec::new();
        for i in 0..5u32 {
            segments.push(segment(&format!("seg_{}", i), 100 + i * 10));
        }

        let merges = policy.find_merges(&segments);
        assert_eq!(merges.len(), 1);
        assert!(merges[0].segment_ids.len() >= 3);
    }

    /// `NoMergePolicy` never proposes a merge.
    #[test]
    fn test_no_merge_policy() {
        let policy = NoMergePolicy;
        let segments = vec![segment("a", 100), segment("b", 200)];

        assert!(policy.find_merges(&segments).is_empty());
    }
}