hermes_core/merge/
mod.rs

1//! Merge policies for background segment merging
2//!
3//! Merge policies determine when and which segments should be merged together.
4//! The default is a tiered/log-layered policy that groups segments by size tiers.
5
6use std::fmt::Debug;
7
8#[cfg(feature = "native")]
9mod scheduler;
10#[cfg(feature = "native")]
11pub use scheduler::SegmentManager;
12
/// Information about a segment, used as the input to merge decisions.
///
/// Equality is derived so that segment snapshots can be compared directly
/// (for example in tests, or to detect that a snapshot has not changed).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SegmentInfo {
    /// Segment ID (hex string)
    pub id: String,
    /// Number of documents in the segment
    pub num_docs: u32,
    /// Approximate size in bytes, or `None` when it is not known
    pub size_bytes: Option<u64>,
}
23
/// A merge operation: one group of segments that should be merged together.
///
/// Equality is derived so that merge proposals can be compared, e.g. when
/// asserting on the output of [`MergePolicy::find_merges`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MergeCandidate {
    /// Segment IDs to merge together
    pub segment_ids: Vec<String>,
}
30
31/// Trait for merge policies
32///
33/// Implementations decide when segments should be merged and which ones.
34pub trait MergePolicy: Send + Sync + Debug {
35    /// Given the current segments, return merge candidates (if any)
36    ///
37    /// Each `MergeCandidate` represents a group of segments that should be merged.
38    /// Multiple candidates can be returned for parallel merging.
39    fn find_merges(&self, segments: &[SegmentInfo]) -> Vec<MergeCandidate>;
40
41    /// Clone the policy into a boxed trait object
42    fn clone_box(&self) -> Box<dyn MergePolicy>;
43}
44
45impl Clone for Box<dyn MergePolicy> {
46    fn clone(&self) -> Self {
47        self.clone_box()
48    }
49}
50
51/// No-op merge policy - never merges automatically
52#[derive(Debug, Clone, Default)]
53pub struct NoMergePolicy;
54
55impl MergePolicy for NoMergePolicy {
56    fn find_merges(&self, _segments: &[SegmentInfo]) -> Vec<MergeCandidate> {
57        Vec::new()
58    }
59
60    fn clone_box(&self) -> Box<dyn MergePolicy> {
61        Box::new(self.clone())
62    }
63}
64
/// Tiered/Log-layered merge policy
///
/// Groups segments into tiers based on document count. Segments in the same tier
/// are merged when there are enough of them. This creates a logarithmic structure
/// where larger segments are merged less frequently.
///
/// With `floor = tier_floor` and `f = tier_factor`, a segment with `n` docs is
/// placed in:
/// - Tier 0: `n <= floor`
/// - Tier 1: `floor < n < floor * f`
/// - Tier 2: `floor * f <= n < floor * f^2`
/// - Tier k (k >= 2): `floor * f^(k-1) <= n < floor * f^k`
#[derive(Debug, Clone)]
pub struct TieredMergePolicy {
    /// Minimum number of segments in a tier before merging (default: 10)
    pub segments_per_tier: usize,
    /// Maximum number of segments to merge at once (default: 10)
    pub max_merge_at_once: usize,
    /// Factor between tier sizes; should be greater than 1.0 (default: 10.0)
    pub tier_factor: f64,
    /// Segments with at most this many docs fall into tier 0 (default: 1000)
    pub tier_floor: u32,
    /// Maximum total docs to merge at once (default: 5_000_000)
    pub max_merged_docs: u32,
}

impl Default for TieredMergePolicy {
    fn default() -> Self {
        Self {
            segments_per_tier: 10,
            max_merge_at_once: 10,
            tier_factor: 10.0,
            tier_floor: 1000,
            max_merged_docs: 5_000_000,
        }
    }
}

impl TieredMergePolicy {
    /// Create a new tiered merge policy with default settings
    pub fn new() -> Self {
        Self::default()
    }

    /// Set segments per tier
    #[must_use]
    pub fn with_segments_per_tier(mut self, n: usize) -> Self {
        self.segments_per_tier = n;
        self
    }

    /// Set max merge at once
    #[must_use]
    pub fn with_max_merge_at_once(mut self, n: usize) -> Self {
        self.max_merge_at_once = n;
        self
    }

    /// Set tier factor
    #[must_use]
    pub fn with_tier_factor(mut self, factor: f64) -> Self {
        self.tier_factor = factor;
        self
    }

    /// Set the tier floor (segments with at most this many docs are tier 0)
    #[must_use]
    pub fn with_tier_floor(mut self, floor: u32) -> Self {
        self.tier_floor = floor;
        self
    }

    /// Set the maximum total number of docs a single merge may produce
    #[must_use]
    pub fn with_max_merged_docs(mut self, n: u32) -> Self {
        self.max_merged_docs = n;
        self
    }

    /// Compute the tier for a segment based on its doc count.
    ///
    /// Tier 0 holds segments with at most `tier_floor` docs. Above that the
    /// tier is `k + 1`, where `k` is the largest integer such that
    /// `tier_floor * tier_factor^k <= num_docs`. A `tier_floor` of 0 is
    /// treated as 1 for this calculation.
    ///
    /// If `tier_factor` is NaN or not greater than 1.0, tiers cannot grow, so
    /// every segment above the floor is placed in tier 1.
    fn compute_tier(&self, num_docs: u32) -> usize {
        if num_docs <= self.tier_floor {
            return 0;
        }

        let factor = self.tier_factor;
        // A factor of exactly 1.0 would make ln(factor) zero and the tier
        // infinite; smaller or NaN factors cannot define growing tiers either.
        if factor.is_nan() || factor <= 1.0 {
            return 1;
        }

        // Clamp the floor so `tier_floor == 0` cannot produce an infinite ratio.
        let ratio = f64::from(num_docs) / f64::from(self.tier_floor.max(1));

        // `ln(ratio) / ln(factor)` can land just below an integer at an exact
        // tier boundary (`1000f64.log(10.0)` is 2.9999999999999996), which would
        // put e.g. a 1_000_000-doc segment one tier too low with the defaults.
        // Bump the estimate when the next power of `factor` is actually reached.
        let mut exponent = ratio.log(factor).floor();
        if factor.powf(exponent + 1.0) <= ratio {
            exponent += 1.0;
        }

        // `as` saturates; `saturating_add` keeps factors barely above 1.0 from
        // overflowing the final `+ 1`.
        (exponent as usize).saturating_add(1)
    }
}
136
137impl MergePolicy for TieredMergePolicy {
138    fn find_merges(&self, segments: &[SegmentInfo]) -> Vec<MergeCandidate> {
139        if segments.len() < 2 {
140            return Vec::new();
141        }
142
143        // Group segments by tier
144        let mut tiers: std::collections::HashMap<usize, Vec<&SegmentInfo>> =
145            std::collections::HashMap::new();
146
147        for seg in segments {
148            let tier = self.compute_tier(seg.num_docs);
149            tiers.entry(tier).or_default().push(seg);
150        }
151
152        let mut candidates = Vec::new();
153
154        // Find tiers with enough segments to merge
155        for (_tier, tier_segments) in tiers {
156            if tier_segments.len() >= self.segments_per_tier {
157                // Sort by doc count (merge smaller ones first)
158                let mut sorted: Vec<_> = tier_segments;
159                sorted.sort_by_key(|s| s.num_docs);
160
161                // Take up to max_merge_at_once segments
162                let to_merge: Vec<_> = sorted.into_iter().take(self.max_merge_at_once).collect();
163
164                // Check total docs limit
165                let total_docs: u32 = to_merge.iter().map(|s| s.num_docs).sum();
166                if total_docs <= self.max_merged_docs && to_merge.len() >= 2 {
167                    candidates.push(MergeCandidate {
168                        segment_ids: to_merge.into_iter().map(|s| s.id.clone()).collect(),
169                    });
170                }
171            }
172        }
173
174        candidates
175    }
176
177    fn clone_box(&self) -> Box<dyn MergePolicy> {
178        Box::new(self.clone())
179    }
180}
181
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a segment with the given id and doc count and no byte size.
    fn segment(id: &str, num_docs: u32) -> SegmentInfo {
        SegmentInfo {
            id: id.to_string(),
            num_docs,
            size_bytes: None,
        }
    }

    #[test]
    fn test_tiered_policy_compute_tier() {
        let policy = TieredMergePolicy::default();

        // (num_docs, expected tier) with the default floor (1000) and factor (10.0):
        // tier 0 is <= 1000, tier 1 is 1001..=9999, tier 2 is 10000..=99999,
        // and tier 3 starts at 100000.
        let cases: [(u32, usize); 8] = [
            (500, 0),
            (1000, 0),
            (1001, 1),
            (5000, 1),
            (9999, 1),
            (10000, 2),
            (50000, 2),
            (100000, 3),
        ];
        for &(num_docs, expected) in cases.iter() {
            assert_eq!(policy.compute_tier(num_docs), expected);
        }
    }

    #[test]
    fn test_tiered_policy_no_merge_few_segments() {
        let policy = TieredMergePolicy::default();
        let segments = [segment("a", 100), segment("b", 200)];

        assert!(policy.find_merges(&segments).is_empty());
    }

    #[test]
    fn test_tiered_policy_merge_same_tier() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            ..Default::default()
        };

        // Five segments of 100..=140 docs, all below the floor, so all in tier 0.
        let segments: Vec<SegmentInfo> = (0..5u32)
            .map(|i| segment(&format!("seg_{}", i), 100 + i * 10))
            .collect();

        let merges = policy.find_merges(&segments);
        assert_eq!(merges.len(), 1);
        assert!(merges[0].segment_ids.len() >= 3);
    }

    #[test]
    fn test_no_merge_policy() {
        let segments = [segment("a", 100), segment("b", 200)];

        assert!(NoMergePolicy.find_merges(&segments).is_empty());
    }
}