1use std::fmt::Debug;
7
8#[cfg(feature = "native")]
9mod scheduler;
10#[cfg(feature = "native")]
11pub use scheduler::SegmentManager;
12
/// Metadata about one segment, as passed to [`MergePolicy::find_merges`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SegmentInfo {
    /// Segment identifier; merge candidates refer to segments by this id.
    pub id: String,
    /// Number of documents in the segment. The tiered policy assigns tiers and
    /// orders segments by this count.
    pub num_docs: u32,
    /// Size of the segment in bytes, if known.
    ///
    /// NOTE(review): no policy in this module reads this field; presumably it is
    /// informational for callers — confirm before relying on it.
    pub size_bytes: Option<u64>,
}
23
/// A group of segments that a [`MergePolicy`] proposes to merge together.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MergeCandidate {
    /// Ids (see [`SegmentInfo::id`]) of the segments selected for this merge.
    pub segment_ids: Vec<String>,
}
30
/// Strategy that decides which segments should be merged.
///
/// Implementors must be `Send + Sync` (shareable across threads) and `Debug`.
pub trait MergePolicy: Send + Sync + Debug {
    /// Inspects the given segments and returns the merges to propose.
    ///
    /// An empty vector means no merge is proposed. Each [`MergeCandidate`]
    /// lists the ids (from [`SegmentInfo::id`]) of the segments proposed to be
    /// merged together.
    fn find_merges(&self, segments: &[SegmentInfo]) -> Vec<MergeCandidate>;

    /// Returns a boxed copy of this policy.
    ///
    /// `Clone` cannot be a supertrait of an object-safe trait, so this method
    /// exists to back the `Clone` impl for `Box<dyn MergePolicy>`.
    fn clone_box(&self) -> Box<dyn MergePolicy>;
}
44
45impl Clone for Box<dyn MergePolicy> {
46 fn clone(&self) -> Self {
47 self.clone_box()
48 }
49}
50
51#[derive(Debug, Clone, Default)]
53pub struct NoMergePolicy;
54
55impl MergePolicy for NoMergePolicy {
56 fn find_merges(&self, _segments: &[SegmentInfo]) -> Vec<MergeCandidate> {
57 Vec::new()
58 }
59
60 fn clone_box(&self) -> Box<dyn MergePolicy> {
61 Box::new(self.clone())
62 }
63}
64
/// Merge policy that buckets segments into size tiers and merges within a tier.
///
/// Segments are assigned a tier by document count:
///
/// * tier 0: `num_docs <= tier_floor`;
/// * tier 1: `tier_floor < num_docs < tier_floor * tier_factor`;
/// * tier `k >= 2`: `tier_floor * tier_factor^(k-1) <= num_docs < tier_floor * tier_factor^k`.
///
/// With the defaults (`tier_floor = 1000`, `tier_factor = 10.0`) the tiers are
/// `0..=1000`, `1001..=9999`, `10_000..=99_999`, `100_000..=999_999`, and so on.
/// A `tier_floor` of 0 is treated as 1 when computing boundaries. If
/// `tier_factor` is not greater than 1.0 (or is NaN), every segment above the
/// floor is placed in tier 1.
///
/// Once a tier holds at least `segments_per_tier` segments, up to
/// `max_merge_at_once` of its smallest segments are proposed for merging, as
/// long as their combined document count does not exceed `max_merged_docs`.
#[derive(Debug, Clone)]
pub struct TieredMergePolicy {
    /// Minimum number of segments a tier must contain before it is merged.
    pub segments_per_tier: usize,
    /// Maximum number of segments combined in a single merge.
    pub max_merge_at_once: usize,
    /// Growth factor between consecutive tiers (expected to be `> 1.0`).
    pub tier_factor: f64,
    /// Document count at or below which a segment belongs to tier 0.
    pub tier_floor: u32,
    /// A proposed merge is dropped if its combined document count would exceed this.
    pub max_merged_docs: u32,
}

impl Default for TieredMergePolicy {
    /// 10 segments per tier, up to 10 segments per merge, tier factor 10,
    /// floor of 1000 docs, and merged segments capped at 5 million docs.
    fn default() -> Self {
        Self {
            segments_per_tier: 10,
            max_merge_at_once: 10,
            tier_factor: 10.0,
            tier_floor: 1000,
            max_merged_docs: 5_000_000,
        }
    }
}

impl TieredMergePolicy {
    /// Creates a policy with the default settings; same as [`Default::default`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a policy that merges more eagerly than the default.
    ///
    /// Compared with the default, a tier needs only 3 segments to trigger a
    /// merge, the tier floor is 500 docs, and merged segments may hold up to
    /// 10 million docs.
    pub fn aggressive() -> Self {
        Self {
            segments_per_tier: 3,
            max_merge_at_once: 10,
            tier_factor: 10.0,
            tier_floor: 500,
            max_merged_docs: 10_000_000,
        }
    }

    /// Sets [`segments_per_tier`](Self::segments_per_tier).
    pub fn with_segments_per_tier(mut self, n: usize) -> Self {
        self.segments_per_tier = n;
        self
    }

    /// Sets [`max_merge_at_once`](Self::max_merge_at_once).
    pub fn with_max_merge_at_once(mut self, n: usize) -> Self {
        self.max_merge_at_once = n;
        self
    }

    /// Sets [`tier_factor`](Self::tier_factor).
    pub fn with_tier_factor(mut self, factor: f64) -> Self {
        self.tier_factor = factor;
        self
    }

    /// Sets [`tier_floor`](Self::tier_floor).
    pub fn with_tier_floor(mut self, floor: u32) -> Self {
        self.tier_floor = floor;
        self
    }

    /// Sets [`max_merged_docs`](Self::max_merged_docs).
    pub fn with_max_merged_docs(mut self, max_docs: u32) -> Self {
        self.max_merged_docs = max_docs;
        self
    }

    /// Returns the size tier of a segment holding `num_docs` documents.
    ///
    /// See the type-level docs for the tier boundaries. A document count that
    /// sits exactly on a boundary (e.g. 1_000_000 with the defaults) belongs to
    /// the higher tier.
    fn compute_tier(&self, num_docs: u32) -> usize {
        if num_docs <= self.tier_floor {
            return 0;
        }

        let factor = self.tier_factor;
        // A factor <= 1.0 (or NaN) has no geometric growth, so there are no
        // higher tiers to tell apart; this also avoids a log-base-1 division by zero.
        if factor.is_nan() || factor <= 1.0 {
            return 1;
        }

        // A floor of 0 would make the ratio infinite; treat it as 1.
        let floor = f64::from(self.tier_floor.max(1));
        let docs = f64::from(num_docs);

        // `log` is ln(x)/ln(base) and can land just below an exact integer
        // (`1000.0_f64.log(10.0)` is 2.9999999999999996), so use it only as an
        // estimate and correct it against the actual boundary values.
        let estimate = (docs / floor).log(factor).floor().max(0.0);
        let exponent = if floor * factor.powf(estimate + 1.0) <= docs {
            estimate + 1.0
        } else if estimate >= 1.0 && floor * factor.powf(estimate) > docs {
            estimate - 1.0
        } else {
            estimate
        };

        // `exponent` is a non-negative whole number, so the cast drops no fraction.
        exponent as usize + 1
    }
}
157
158impl MergePolicy for TieredMergePolicy {
159 fn find_merges(&self, segments: &[SegmentInfo]) -> Vec<MergeCandidate> {
160 if segments.len() < 2 {
161 return Vec::new();
162 }
163
164 let mut tiers: std::collections::HashMap<usize, Vec<&SegmentInfo>> =
166 std::collections::HashMap::new();
167
168 for seg in segments {
169 let tier = self.compute_tier(seg.num_docs);
170 tiers.entry(tier).or_default().push(seg);
171 }
172
173 let mut candidates = Vec::new();
174
175 for (_tier, tier_segments) in tiers {
177 if tier_segments.len() >= self.segments_per_tier {
178 let mut sorted: Vec<_> = tier_segments;
180 sorted.sort_by_key(|s| s.num_docs);
181
182 let to_merge: Vec<_> = sorted.into_iter().take(self.max_merge_at_once).collect();
184
185 let total_docs: u32 = to_merge.iter().map(|s| s.num_docs).sum();
187 if total_docs <= self.max_merged_docs && to_merge.len() >= 2 {
188 candidates.push(MergeCandidate {
189 segment_ids: to_merge.into_iter().map(|s| s.id.clone()).collect(),
190 });
191 }
192 }
193 }
194
195 candidates
196 }
197
198 fn clone_box(&self) -> Box<dyn MergePolicy> {
199 Box::new(self.clone())
200 }
201}
202
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a segment with the given id and document count and no size.
    fn seg(id: &str, num_docs: u32) -> SegmentInfo {
        SegmentInfo {
            id: id.to_string(),
            num_docs,
            size_bytes: None,
        }
    }

    #[test]
    fn test_tiered_policy_compute_tier() {
        let policy = TieredMergePolicy::default();

        // At or below the floor (1000 docs): tier 0.
        assert_eq!(policy.compute_tier(500), 0);
        assert_eq!(policy.compute_tier(1000), 0);

        // Above the floor but below floor * factor: tier 1.
        assert_eq!(policy.compute_tier(1001), 1);
        assert_eq!(policy.compute_tier(5000), 1);
        assert_eq!(policy.compute_tier(9999), 1);

        // Each further factor of 10 moves up one tier.
        assert_eq!(policy.compute_tier(10000), 2);
        assert_eq!(policy.compute_tier(50000), 2);

        assert_eq!(policy.compute_tier(100000), 3);
    }

    #[test]
    fn test_tiered_policy_no_merge_few_segments() {
        let policy = TieredMergePolicy::default();

        // Two tier-0 segments are fewer than the default segments_per_tier (10).
        let segments = [seg("a", 100), seg("b", 200)];

        assert!(policy.find_merges(&segments).is_empty());
    }

    #[test]
    fn test_tiered_policy_merge_same_tier() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            ..Default::default()
        };

        let segments: Vec<_> = (0..5u32)
            .map(|i| seg(&format!("seg_{}", i), 100 + i * 10))
            .collect();

        let merges = policy.find_merges(&segments);
        assert_eq!(merges.len(), 1);
        // All five share tier 0 and fit within max_merge_at_once (10), so every
        // segment is selected, smallest first.
        assert_eq!(
            merges[0].segment_ids,
            ["seg_0", "seg_1", "seg_2", "seg_3", "seg_4"]
        );
    }

    #[test]
    fn test_tiered_policy_takes_smallest_up_to_max_merge_at_once() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            max_merge_at_once: 2,
            ..Default::default()
        };

        let segments = [seg("big", 300), seg("small", 100), seg("mid", 200)];

        let merges = policy.find_merges(&segments);
        assert_eq!(merges.len(), 1);
        assert_eq!(merges[0].segment_ids, ["small", "mid"]);
    }

    #[test]
    fn test_tiered_policy_skips_merge_over_max_merged_docs() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            max_merged_docs: 500,
            ..Default::default()
        };

        // 3 * 200 = 600 docs would exceed the 500-doc cap.
        let segments = [seg("a", 200), seg("b", 200), seg("c", 200)];

        assert!(policy.find_merges(&segments).is_empty());
    }

    #[test]
    fn test_boxed_policy_clone() {
        let policy: Box<dyn MergePolicy> = Box::new(TieredMergePolicy {
            segments_per_tier: 2,
            ..Default::default()
        });
        let cloned = policy.clone();

        let segments = [seg("a", 100), seg("b", 200)];
        let merges = cloned.find_merges(&segments);
        assert_eq!(merges.len(), 1);
        assert_eq!(merges[0].segment_ids, ["a", "b"]);
    }

    #[test]
    fn test_no_merge_policy() {
        let policy = NoMergePolicy;

        let segments = [seg("a", 100), seg("b", 200)];

        assert!(policy.find_merges(&segments).is_empty());
    }
}