1use std::fmt::Debug;
7
8#[cfg(feature = "native")]
9mod scheduler;
10#[cfg(feature = "native")]
11pub use scheduler::SegmentManager;
12
/// Summary of one index segment, as handed to [`MergePolicy::find_merges`].
#[derive(Clone, Debug)]
pub struct SegmentInfo {
    /// Identifier of the segment; copied into [`MergeCandidate::segment_ids`]
    /// when the segment is selected for a merge.
    pub id: String,
    /// Number of documents stored in the segment.
    pub num_docs: u32,
    /// Size of the segment in bytes, if known. None of the policies in this
    /// module read it.
    pub size_bytes: Option<u64>,
}
23
/// A proposed merge: the set of segments a policy wants combined into one.
#[derive(Clone, Debug)]
pub struct MergeCandidate {
    /// Ids of the segments that take part in this merge.
    pub segment_ids: Vec<String>,
}
30
31pub trait MergePolicy: Send + Sync + Debug {
35 fn find_merges(&self, segments: &[SegmentInfo]) -> Vec<MergeCandidate>;
40
41 fn clone_box(&self) -> Box<dyn MergePolicy>;
43}
44
45impl Clone for Box<dyn MergePolicy> {
46 fn clone(&self) -> Self {
47 self.clone_box()
48 }
49}
50
/// Merge policy that never proposes a merge.
#[derive(Clone, Debug, Default)]
pub struct NoMergePolicy;
54
55impl MergePolicy for NoMergePolicy {
56 fn find_merges(&self, _segments: &[SegmentInfo]) -> Vec<MergeCandidate> {
57 Vec::new()
58 }
59
60 fn clone_box(&self) -> Box<dyn MergePolicy> {
61 Box::new(self.clone())
62 }
63}
64
/// Merge policy that groups segments into tiers by document count and
/// proposes merges among segments of the same tier.
///
/// Tier 0 holds every segment with at most `tier_floor` documents. Tier
/// `k >= 1` holds segments whose document count `n` satisfies
/// `tier_floor * tier_factor^(k-1) <= n < tier_floor * tier_factor^k`.
#[derive(Debug, Clone)]
pub struct TieredMergePolicy {
    /// Minimum number of segments a tier must contain before a merge is
    /// proposed for it.
    pub segments_per_tier: usize,
    /// Maximum number of segments included in a single merge.
    pub max_merge_at_once: usize,
    /// Growth factor between consecutive tiers.
    pub tier_factor: f64,
    /// Segments with at most this many documents belong to tier 0.
    pub tier_floor: u32,
    /// Upper bound on the summed document count of a proposed merge.
    pub max_merged_docs: u32,
}

impl Default for TieredMergePolicy {
    /// Ten segments per tier, at most ten segments per merge, tiers growing
    /// by a factor of ten from a floor of 1000 documents, and merges capped
    /// at five million documents.
    fn default() -> Self {
        Self {
            segments_per_tier: 10,
            max_merge_at_once: 10,
            tier_factor: 10.0,
            tier_floor: 1000,
            max_merged_docs: 5_000_000,
        }
    }
}

impl TieredMergePolicy {
    /// Creates a policy with the default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the number of segments a tier needs before it is merged.
    pub fn with_segments_per_tier(mut self, n: usize) -> Self {
        self.segments_per_tier = n;
        self
    }

    /// Sets the maximum number of segments combined in one merge.
    pub fn with_max_merge_at_once(mut self, n: usize) -> Self {
        self.max_merge_at_once = n;
        self
    }

    /// Sets the growth factor between consecutive tiers.
    pub fn with_tier_factor(mut self, factor: f64) -> Self {
        self.tier_factor = factor;
        self
    }

    /// Returns the tier a segment with `num_docs` documents belongs to.
    ///
    /// Segments at or below `tier_floor` are in tier 0. Above the floor the
    /// tier is `floor(log_tier_factor(num_docs / tier_floor)) + 1`, computed
    /// so that exact tier boundaries are not lost to floating-point rounding.
    ///
    /// Degenerate settings never panic: a `tier_floor` of 0 is treated as 1
    /// for the ratio, and a `tier_factor` that is NaN, infinite or `<= 1.0`
    /// puts every segment above the floor into tier 1.
    fn compute_tier(&self, num_docs: u32) -> usize {
        if num_docs <= self.tier_floor {
            return 0;
        }

        let factor = self.tier_factor;
        if !factor.is_finite() || factor <= 1.0 {
            // Such a factor cannot describe growing tiers.
            return 1;
        }

        // `max(1)` keeps the ratio finite when the floor is configured as 0.
        let ratio = f64::from(num_docs) / f64::from(self.tier_floor.max(1));

        // `log` is evaluated as `ln(ratio) / ln(factor)` and may land just
        // below an integer at exact boundaries (1000f64.log(10.0) is
        // 2.9999999999999996), so it is only used as a first estimate.
        // The upper clamp keeps `exponent + 1` within `i32`.
        let estimate = ratio
            .log(factor)
            .floor()
            .clamp(0.0, f64::from(i32::MAX - 1));
        let mut exponent = estimate as i32;

        // Correct the estimate by at most one step using direct comparisons
        // against the actual powers of the factor.
        if factor.powi(exponent + 1) <= ratio {
            exponent += 1;
        } else if exponent > 0 && factor.powi(exponent) > ratio {
            exponent -= 1;
        }

        // `exponent` is non-negative here, so the cast is lossless.
        exponent as usize + 1
    }
}
136
137impl MergePolicy for TieredMergePolicy {
138 fn find_merges(&self, segments: &[SegmentInfo]) -> Vec<MergeCandidate> {
139 if segments.len() < 2 {
140 return Vec::new();
141 }
142
143 let mut tiers: std::collections::HashMap<usize, Vec<&SegmentInfo>> =
145 std::collections::HashMap::new();
146
147 for seg in segments {
148 let tier = self.compute_tier(seg.num_docs);
149 tiers.entry(tier).or_default().push(seg);
150 }
151
152 let mut candidates = Vec::new();
153
154 for (_tier, tier_segments) in tiers {
156 if tier_segments.len() >= self.segments_per_tier {
157 let mut sorted: Vec<_> = tier_segments;
159 sorted.sort_by_key(|s| s.num_docs);
160
161 let to_merge: Vec<_> = sorted.into_iter().take(self.max_merge_at_once).collect();
163
164 let total_docs: u32 = to_merge.iter().map(|s| s.num_docs).sum();
166 if total_docs <= self.max_merged_docs && to_merge.len() >= 2 {
167 candidates.push(MergeCandidate {
168 segment_ids: to_merge.into_iter().map(|s| s.id.clone()).collect(),
169 });
170 }
171 }
172 }
173
174 candidates
175 }
176
177 fn clone_box(&self) -> Box<dyn MergePolicy> {
178 Box::new(self.clone())
179 }
180}
181
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a segment with the given id and document count and no size.
    fn segment(id: &str, num_docs: u32) -> SegmentInfo {
        SegmentInfo {
            id: id.into(),
            num_docs,
            size_bytes: None,
        }
    }

    /// Two small segments used by the "nothing to merge" tests.
    fn two_small_segments() -> Vec<SegmentInfo> {
        vec![segment("a", 100), segment("b", 200)]
    }

    #[test]
    fn test_tiered_policy_compute_tier() {
        let policy = TieredMergePolicy::default();

        // (document count, expected tier) with the default floor of 1000 and
        // factor of 10.
        let expectations: [(u32, usize); 8] = [
            (500, 0),
            (1000, 0),
            (1001, 1),
            (5000, 1),
            (9999, 1),
            (10000, 2),
            (50000, 2),
            (100000, 3),
        ];

        for &(num_docs, expected_tier) in expectations.iter() {
            assert_eq!(policy.compute_tier(num_docs), expected_tier);
        }
    }

    #[test]
    fn test_tiered_policy_no_merge_few_segments() {
        let policy = TieredMergePolicy::default();

        // Two segments are far below the default ten segments per tier.
        let proposed = policy.find_merges(&two_small_segments());
        assert!(proposed.is_empty());
    }

    #[test]
    fn test_tiered_policy_merge_same_tier() {
        let policy = TieredMergePolicy {
            segments_per_tier: 3,
            ..Default::default()
        };

        // Five segments of 100..=140 documents all fall into tier 0.
        let segments: Vec<SegmentInfo> = (0..5u32)
            .map(|i| segment(&format!("seg_{}", i), 100 + i * 10))
            .collect();

        let proposed = policy.find_merges(&segments);
        assert_eq!(proposed.len(), 1);
        assert!(proposed[0].segment_ids.len() >= 3);
    }

    #[test]
    fn test_no_merge_policy() {
        let policy = NoMergePolicy;

        let proposed = policy.find_merges(&two_small_segments());
        assert!(proposed.is_empty());
    }
}