1use std::fmt::Debug;
7
8#[cfg(feature = "native")]
9mod segment_manager;
10#[cfg(feature = "native")]
11pub use segment_manager::SegmentManager;
12
/// Description of one segment, as passed to [`MergePolicy::find_merges`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SegmentInfo {
    /// Identifier of the segment; merge policies report it back in
    /// [`MergeCandidate::segment_ids`].
    pub id: String,
    /// Number of documents in the segment; merge policies compare segments by this value.
    pub num_docs: u32,
}
21
/// A group of segments that a [`MergePolicy`] proposes to merge into one.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MergeCandidate {
    /// Ids (see [`SegmentInfo::id`]) of the segments to merge together.
    pub segment_ids: Vec<String>,
}
28
29pub trait MergePolicy: Send + Sync + Debug {
33 fn find_merges(&self, segments: &[SegmentInfo]) -> Vec<MergeCandidate>;
36
37 fn clone_box(&self) -> Box<dyn MergePolicy>;
39}
40
41impl Clone for Box<dyn MergePolicy> {
42 fn clone(&self) -> Self {
43 self.clone_box()
44 }
45}
46
/// Merge policy that never proposes a merge, whatever segments it is given.
#[derive(Default, Clone, Debug)]
pub struct NoMergePolicy;
50
51impl MergePolicy for NoMergePolicy {
52 fn find_merges(&self, _segments: &[SegmentInfo]) -> Vec<MergeCandidate> {
53 Vec::new()
54 }
55
56 fn clone_box(&self) -> Box<dyn MergePolicy> {
57 Box::new(self.clone())
58 }
59}
60
/// Merge policy that groups segments of similar size into merge candidates.
///
/// See the `Default` implementation and [`TieredMergePolicy::aggressive`] for presets.
#[derive(Debug, Clone, PartialEq)]
pub struct TieredMergePolicy {
    /// Minimum number of segments a group must contain before it is proposed as a merge.
    pub segments_per_tier: usize,
    /// Maximum number of segments combined into a single merge.
    pub max_merge_at_once: usize,
    /// Size ratio between tiers: a segment joins a group only if its document count is at
    /// most this multiple of the group's running document total.
    pub tier_factor: f64,
    /// Document count at or below which a segment is considered to be in the lowest tier.
    ///
    /// NOTE(review): `find_merges` does not read this field; in this file it is only used by
    /// the test helper `compute_tier`. Confirm whether that is intended.
    pub tier_floor: u32,
    /// Upper bound on the total number of documents in one merged segment.
    pub max_merged_docs: u32,
}
86
87impl Default for TieredMergePolicy {
88 fn default() -> Self {
89 Self {
90 segments_per_tier: 10,
91 max_merge_at_once: 10,
92 tier_factor: 10.0,
93 tier_floor: 1000,
94 max_merged_docs: 5_000_000,
95 }
96 }
97}
98
99impl TieredMergePolicy {
100 pub fn new() -> Self {
102 Self::default()
103 }
104
105 pub fn aggressive() -> Self {
111 Self {
112 segments_per_tier: 3,
113 max_merge_at_once: 10,
114 tier_factor: 10.0,
115 tier_floor: 500,
116 max_merged_docs: 10_000_000,
117 }
118 }
119}
120
121impl MergePolicy for TieredMergePolicy {
122 fn find_merges(&self, segments: &[SegmentInfo]) -> Vec<MergeCandidate> {
123 if segments.len() < 2 {
124 return Vec::new();
125 }
126
127 let mut sorted: Vec<&SegmentInfo> = segments.iter().collect();
131 sorted.sort_by_key(|s| s.num_docs);
132
133 let mut candidates = Vec::new();
134 let mut used = vec![false; sorted.len()];
135 let max_ratio = self.tier_factor as u64;
136
137 let mut start = 0;
138 loop {
139 while start < sorted.len() && used[start] {
141 start += 1;
142 }
143 if start >= sorted.len() {
144 break;
145 }
146
147 let mut group = vec![start];
154 let mut total_docs: u64 = sorted[start].num_docs as u64;
155
156 for j in (start + 1)..sorted.len() {
157 if used[j] {
158 continue;
159 }
160 if group.len() >= self.max_merge_at_once {
161 break;
162 }
163 let next_docs = sorted[j].num_docs as u64;
164 if total_docs + next_docs > self.max_merged_docs as u64 {
165 break;
166 }
167 if next_docs > total_docs.max(1) * max_ratio {
172 break;
173 }
174 group.push(j);
175 total_docs += next_docs;
176 }
177
178 if group.len() >= self.segments_per_tier && group.len() >= 2 {
179 for &i in &group {
180 used[i] = true;
181 }
182 candidates.push(MergeCandidate {
183 segment_ids: group.iter().map(|&i| sorted[i].id.clone()).collect(),
184 });
185 }
186
187 start += 1;
190 }
191
192 candidates
193 }
194
195 fn clone_box(&self) -> Box<dyn MergePolicy> {
196 Box::new(self.clone())
197 }
198}
199
#[cfg(test)]
mod tests {
    use super::*;

    /// Test-only helper that maps a document count to a tier index.
    ///
    /// Counts at or below `policy.tier_floor` are tier 0. Above the floor, the tier is
    /// `floor(log_{tier_factor}(num_docs / tier_floor)) + 1`. `find_merges` does not call
    /// this helper; it only exercises how `tier_floor` and `tier_factor` relate.
    fn compute_tier(policy: &TieredMergePolicy, num_docs: u32) -> usize {
        if num_docs <= policy.tier_floor {
            return 0;
        }
        // NOTE(review): exact tier boundaries (e.g. 10_000 -> 2) rely on `f64::log`
        // returning an exact integer for exact powers of `tier_factor`.
        let ratio = num_docs as f64 / policy.tier_floor as f64;
        (ratio.log(policy.tier_factor).floor() as usize) + 1
    }

    #[test]
    fn test_tiered_policy_compute_tier() {
        // Default policy: tier_floor = 1000, tier_factor = 10.0.
        let policy = TieredMergePolicy::default();

        // At or below the floor: tier 0.
        assert_eq!(compute_tier(&policy, 500), 0);
        assert_eq!(compute_tier(&policy, 1000), 0);

        // Above the floor but under 10x the floor: tier 1.
        assert_eq!(compute_tier(&policy, 1001), 1);
        assert_eq!(compute_tier(&policy, 5000), 1);
        assert_eq!(compute_tier(&policy, 9999), 1);

        // 10x..100x the floor: tier 2.
        assert_eq!(compute_tier(&policy, 10000), 2);
        assert_eq!(compute_tier(&policy, 50000), 2);

        // 100x the floor: tier 3.
        assert_eq!(compute_tier(&policy, 100000), 3);
    }

    #[test]
    fn test_tiered_policy_no_merge_few_segments() {
        // Default segments_per_tier is 10, so two segments can never form a candidate.
        let policy = TieredMergePolicy::default();

        let segments = vec![
            SegmentInfo {
                id: "a".into(),
                num_docs: 100,
            },
            SegmentInfo {
                id: "b".into(),
                num_docs: 200,
            },
        ];

        assert!(policy.find_merges(&segments).is_empty());
    }

    #[test]
    fn test_tiered_policy_merge_same_size() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            ..Default::default()
        };

        // Sizes 100..=140: each one is well within 10x the running total, so all five
        // join a single group (5 >= segments_per_tier 3, 5 <= default max_merge_at_once 10).
        let segments: Vec<_> = (0..5)
            .map(|i| SegmentInfo {
                id: format!("seg_{}", i),
                num_docs: 100 + i * 10,
            })
            .collect();

        let candidates = policy.find_merges(&segments);
        assert_eq!(candidates.len(), 1);
        assert_eq!(candidates[0].segment_ids.len(), 5);
    }

    #[test]
    fn test_tiered_policy_cross_tier_promotion() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            tier_factor: 10.0,
            tier_floor: 1000,
            max_merge_at_once: 20,
            max_merged_docs: 5_000_000,
        };

        // Four small segments (100, 110, 120, 130; total 460) below tier_floor...
        let mut segments: Vec<_> = (0..4)
            .map(|i| SegmentInfo {
                id: format!("small_{}", i),
                num_docs: 100 + i * 10,
            })
            .collect();
        // ...and three medium ones (2000, 2500, 3000). The first medium segment is within
        // 10x the small group's running total (2000 <= 4600), so it is absorbed and the
        // group keeps growing across the tier boundary.
        for i in 0..3 {
            segments.push(SegmentInfo {
                id: format!("medium_{}", i),
                num_docs: 2000 + i * 500,
            });
        }

        let candidates = policy.find_merges(&segments);
        assert_eq!(
            candidates.len(),
            1,
            "should merge all into one cross-tier group"
        );
        assert_eq!(
            candidates[0].segment_ids.len(),
            7,
            "all 7 segments should be in the merge"
        );
    }

    #[test]
    fn test_tiered_policy_ratio_guard_separates_groups() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            tier_factor: 10.0,
            tier_floor: 100,
            max_merge_at_once: 20,
            max_merged_docs: 5_000_000,
        };

        // Four 10-doc segments: running total 40, so the ratio guard admits at most 400 docs,
        // far below the 100_000-doc segments. The two size classes form separate groups.
        let mut segments: Vec<_> = (0..4)
            .map(|i| SegmentInfo {
                id: format!("tiny_{}", i),
                num_docs: 10,
            })
            .collect();
        for i in 0..4 {
            segments.push(SegmentInfo {
                id: format!("large_{}", i),
                num_docs: 100_000 + i * 100,
            });
        }

        let candidates = policy.find_merges(&segments);
        assert_eq!(candidates.len(), 2, "should produce two separate groups");

        // Candidates come out smallest-first because segments are sorted ascending.
        assert_eq!(candidates[0].segment_ids.len(), 4);
        assert!(candidates[0].segment_ids[0].starts_with("tiny_"));

        assert_eq!(candidates[1].segment_ids.len(), 4);
        assert!(candidates[1].segment_ids[0].starts_with("large_"));
    }

    #[test]
    fn test_tiered_policy_small_segments_skip_to_large_group() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            tier_factor: 10.0,
            tier_floor: 1000,
            max_merge_at_once: 10,
            max_merged_docs: 5_000_000,
        };

        // The two tiny segments (total 30) cannot pull in a 5000-doc segment (limit 300),
        // and a group of 2 is below segments_per_tier. The scan must move past them and
        // still group the medium segments.
        let mut segments = vec![
            SegmentInfo {
                id: "tiny_0".into(),
                num_docs: 10,
            },
            SegmentInfo {
                id: "tiny_1".into(),
                num_docs: 20,
            },
        ];
        for i in 0..5 {
            segments.push(SegmentInfo {
                id: format!("medium_{}", i),
                num_docs: 5000 + i * 100,
            });
        }

        let candidates = policy.find_merges(&segments);
        assert!(
            !candidates.is_empty(),
            "should find a merge even though tiny segments can't form a group"
        );
        let total_segs: usize = candidates.iter().map(|c| c.segment_ids.len()).sum();
        assert!(
            total_segs >= 5,
            "should merge at least the 5 medium segments"
        );
    }

    #[test]
    fn test_tiered_policy_respects_max_merged_docs() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            max_merge_at_once: 100,
            tier_factor: 10.0,
            tier_floor: 1000,
            max_merged_docs: 500,
        };

        // Ten 100-doc segments with a 500-doc cap: at most five fit in one group.
        let segments: Vec<_> = (0..10)
            .map(|i| SegmentInfo {
                id: format!("seg_{}", i),
                num_docs: 100,
            })
            .collect();

        let candidates = policy.find_merges(&segments);
        // Sum each candidate's doc count by looking its ids back up in the input.
        for c in &candidates {
            let total: u64 = c
                .segment_ids
                .iter()
                .map(|id| segments.iter().find(|s| s.id == *id).unwrap().num_docs as u64)
                .sum();
            assert!(
                total <= 500,
                "merge total {} exceeds max_merged_docs 500",
                total
            );
        }
    }

    #[test]
    fn test_tiered_policy_large_segment_not_remerged_with_small() {
        // Default policy: segments_per_tier 10, max_merge_at_once 10, tier_factor 10.0.
        let policy = TieredMergePolicy::default();
        let mut segments = vec![SegmentInfo {
            id: "large_merged".into(),
            num_docs: 50_000,
        }];
        for i in 0..5 {
            segments.push(SegmentInfo {
                id: format!("new_{}", i),
                num_docs: 500,
            });
        }

        // Five 500-doc segments (total 2500, ratio limit 25_000) cannot absorb the
        // 50_000-doc segment, and 5 < segments_per_tier, so nothing is proposed.
        let candidates = policy.find_merges(&segments);
        assert!(
            candidates.is_empty(),
            "should not re-merge large segment with 5 small ones: {:?}",
            candidates
        );

        for i in 5..10 {
            segments.push(SegmentInfo {
                id: format!("new_{}", i),
                num_docs: 500,
            });
        }

        // With ten small segments the group reaches max_merge_at_once (10) before the large
        // segment is considered. The ratio guard alone would admit it
        // (50_000 <= 5_000 * 10), so this exclusion depends on max_merge_at_once.
        let candidates = policy.find_merges(&segments);
        assert_eq!(candidates.len(), 1, "should merge the 10 small segments");
        assert!(
            !candidates[0].segment_ids.contains(&"large_merged".into()),
            "large segment must NOT be in the merge group"
        );
        assert_eq!(
            candidates[0].segment_ids.len(),
            10,
            "all 10 small segments should be merged"
        );
    }

    #[test]
    fn test_no_merge_policy() {
        let policy = NoMergePolicy;

        let segments = vec![
            SegmentInfo {
                id: "a".into(),
                num_docs: 100,
            },
            SegmentInfo {
                id: "b".into(),
                num_docs: 200,
            },
        ];

        assert!(policy.find_merges(&segments).is_empty());
    }
}