use std::fmt::Debug;
#[cfg(feature = "native")]
mod scheduler;
#[cfg(feature = "native")]
pub use scheduler::SegmentManager;
/// Summary of a single segment, as handed to a [`MergePolicy`].
#[derive(Debug, Clone)]
pub struct SegmentInfo {
    /// Identifier of the segment; copied into [`MergeCandidate::segment_ids`]
    /// when the segment is selected for merging.
    pub id: String,
    /// Number of documents in the segment.
    pub num_docs: u32,
    /// Optional size in bytes.
    // NOTE(review): not read by any policy in this file; presumably the
    // on-disk size — confirm against the producer of this struct.
    pub size_bytes: Option<u64>,
}
/// A group of segments that a [`MergePolicy`] proposes to merge together.
#[derive(Debug, Clone)]
pub struct MergeCandidate {
    /// Ids of the segments that make up this merge.
    pub segment_ids: Vec<String>,
}
/// Strategy that decides which segments should be merged.
///
/// Implementations must be `Send + Sync + Debug`. Cloning a boxed policy goes
/// through [`MergePolicy::clone_box`].
pub trait MergePolicy: Send + Sync + Debug {
    /// Inspects `segments` and returns the merges that should be performed.
    ///
    /// An empty vector means no merge is proposed.
    fn find_merges(&self, segments: &[SegmentInfo]) -> Vec<MergeCandidate>;

    /// Returns a boxed copy of this policy.
    fn clone_box(&self) -> Box<dyn MergePolicy>;
}
/// Makes `Box<dyn MergePolicy>` cloneable by delegating to the policy's own
/// [`MergePolicy::clone_box`].
impl Clone for Box<dyn MergePolicy> {
    fn clone(&self) -> Self {
        // Deref through the box explicitly so the call is dispatched on the
        // inner trait object.
        (**self).clone_box()
    }
}
/// A policy that never proposes any merge.
#[derive(Debug, Clone, Default)]
pub struct NoMergePolicy;

impl MergePolicy for NoMergePolicy {
    /// Always returns an empty list, whatever `_segments` contains.
    fn find_merges(&self, _segments: &[SegmentInfo]) -> Vec<MergeCandidate> {
        vec![]
    }

    /// Returns a boxed `NoMergePolicy`; the type is a unit struct, so there is
    /// no state to copy.
    fn clone_box(&self) -> Box<dyn MergePolicy> {
        Box::new(NoMergePolicy)
    }
}
/// Merge policy that buckets segments into tiers by document count and
/// proposes merging the smallest segments of any tier that has grown large
/// enough.
#[derive(Debug, Clone)]
pub struct TieredMergePolicy {
    /// Minimum number of segments a tier must contain before a merge is
    /// proposed for it.
    pub segments_per_tier: usize,
    /// Maximum number of segments placed in a single merge candidate.
    pub max_merge_at_once: usize,
    /// Growth factor between consecutive tier boundaries (the base of the
    /// logarithm used to assign tiers).
    pub tier_factor: f64,
    /// Segments with at most this many documents belong to tier 0.
    pub tier_floor: u32,
    /// A merge is only proposed when the combined document count of its
    /// segments does not exceed this value.
    pub max_merged_docs: u32,
}

impl Default for TieredMergePolicy {
    /// Ten segments per tier, at most ten segments per merge, a tier factor
    /// of 10, a tier floor of 1000 documents and a 5,000,000-document cap on
    /// merged segments.
    fn default() -> Self {
        Self {
            segments_per_tier: 10,
            max_merge_at_once: 10,
            tier_factor: 10.0,
            tier_floor: 1000,
            max_merged_docs: 5_000_000,
        }
    }
}

impl TieredMergePolicy {
    /// Creates a policy with the default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the number of segments a tier must hold before it is merged.
    pub fn with_segments_per_tier(mut self, n: usize) -> Self {
        self.segments_per_tier = n;
        self
    }

    /// Sets the maximum number of segments in one merge candidate.
    pub fn with_max_merge_at_once(mut self, n: usize) -> Self {
        self.max_merge_at_once = n;
        self
    }

    /// Sets the growth factor between tier boundaries.
    pub fn with_tier_factor(mut self, factor: f64) -> Self {
        self.tier_factor = factor;
        self
    }

    /// Returns the tier a segment with `num_docs` documents belongs to.
    ///
    /// Tier 0 holds every segment with at most `tier_floor` documents. Above
    /// the floor, tier `k` (for `k >= 1`) covers
    /// `tier_floor * tier_factor^(k-1) <= num_docs < tier_floor * tier_factor^k`.
    ///
    /// Degenerate configurations never panic:
    /// * a `tier_factor` that is not a finite number greater than 1 puts
    ///   every segment above the floor into tier 1;
    /// * a `tier_floor` of 0 is treated as 1 when computing the ratio.
    fn compute_tier(&self, num_docs: u32) -> usize {
        if num_docs <= self.tier_floor {
            return 0;
        }

        let factor = self.tier_factor;
        if !factor.is_finite() || factor <= 1.0 {
            // No meaningful geometric progression: a logarithm in such a base
            // is non-positive, NaN, or (for exactly 1.0) infinite.
            return 1;
        }

        // `max(1)` avoids a division by zero when the floor is 0.
        let ratio = f64::from(num_docs) / f64::from(self.tier_floor.max(1));

        // `log` is computed as ln(ratio) / ln(factor), which can land one ULP
        // below an exact integer (e.g. log10(1000) == 2.9999999999999996).
        // Verify the floored estimate against the actual powers of `factor`
        // and nudge it by one step if rounding put it on the wrong side.
        let mut exponent = ratio.log(factor).floor();
        if factor.powf(exponent + 1.0) <= ratio {
            exponent += 1.0;
        } else if exponent >= 1.0 && factor.powf(exponent) > ratio {
            exponent -= 1.0;
        }

        // Float-to-int `as` saturates; `saturating_add` keeps the `+ 1` from
        // overflowing on top of a saturated value.
        (exponent as usize).saturating_add(1)
    }
}
impl MergePolicy for TieredMergePolicy {
    /// Proposes at most one merge per tier.
    ///
    /// Segments are grouped by [`TieredMergePolicy::compute_tier`]. For every
    /// tier holding at least `segments_per_tier` segments, the
    /// `max_merge_at_once` smallest segments (by document count) are
    /// selected. They become a candidate when there are at least two of them
    /// and their combined document count does not exceed `max_merged_docs`.
    ///
    /// Candidates are returned in ascending tier order. Fewer than two input
    /// segments always yields an empty result.
    fn find_merges(&self, segments: &[SegmentInfo]) -> Vec<MergeCandidate> {
        if segments.len() < 2 {
            return Vec::new();
        }

        // An ordered map keeps the output order stable between runs; a
        // `HashMap` would yield the tiers in arbitrary order.
        let mut tiers: std::collections::BTreeMap<usize, Vec<&SegmentInfo>> =
            std::collections::BTreeMap::new();
        for seg in segments {
            tiers
                .entry(self.compute_tier(seg.num_docs))
                .or_default()
                .push(seg);
        }

        let mut candidates = Vec::new();
        for (_tier, mut tier_segments) in tiers {
            if tier_segments.len() < self.segments_per_tier {
                continue;
            }

            // Smallest segments first; the sort is stable, so equally sized
            // segments keep their input order.
            tier_segments.sort_by_key(|s| s.num_docs);
            tier_segments.truncate(self.max_merge_at_once);

            // Sum in u64: several u32 document counts can exceed u32::MAX,
            // which would panic in debug builds and wrap in release builds.
            let total_docs: u64 = tier_segments
                .iter()
                .map(|s| u64::from(s.num_docs))
                .sum();

            if tier_segments.len() >= 2 && total_docs <= u64::from(self.max_merged_docs) {
                candidates.push(MergeCandidate {
                    segment_ids: tier_segments.iter().map(|s| s.id.clone()).collect(),
                });
            }
        }
        candidates
    }

    /// Returns a boxed clone of this policy.
    fn clone_box(&self) -> Box<dyn MergePolicy> {
        Box::new(self.clone())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a segment with the given id and document count and no size
    /// information.
    fn segment(id: &str, num_docs: u32) -> SegmentInfo {
        SegmentInfo {
            id: id.to_string(),
            num_docs,
            size_bytes: None,
        }
    }

    /// Two small segments, used by the tests that expect no merge.
    fn two_small_segments() -> Vec<SegmentInfo> {
        vec![segment("a", 100), segment("b", 200)]
    }

    #[test]
    fn test_tiered_policy_compute_tier() {
        let policy = TieredMergePolicy::default();
        // (document count, expected tier) with the default floor and factor.
        let expectations = [
            (500, 0),
            (1000, 0),
            (1001, 1),
            (5000, 1),
            (9999, 1),
            (10000, 2),
            (50000, 2),
            (100000, 3),
        ];
        for &(num_docs, expected_tier) in expectations.iter() {
            assert_eq!(
                policy.compute_tier(num_docs),
                expected_tier,
                "num_docs = {}",
                num_docs
            );
        }
    }

    #[test]
    fn test_tiered_policy_no_merge_few_segments() {
        let policy = TieredMergePolicy::default();
        let proposed = policy.find_merges(&two_small_segments());
        assert!(proposed.is_empty());
    }

    #[test]
    fn test_tiered_policy_merge_same_tier() {
        let policy = TieredMergePolicy::new().with_segments_per_tier(3);
        // Five segments that all sit below the tier floor.
        let mut segments = Vec::new();
        for i in 0..5u32 {
            segments.push(segment(&format!("seg_{}", i), 100 + i * 10));
        }
        let proposed = policy.find_merges(&segments);
        assert_eq!(proposed.len(), 1);
        assert!(proposed[0].segment_ids.len() >= 3);
    }

    #[test]
    fn test_no_merge_policy() {
        let policy = NoMergePolicy;
        let proposed = policy.find_merges(&two_small_segments());
        assert!(proposed.is_empty());
    }
}