use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::RwLock;
use crate::error::Result;
/// Tuning knobs consulted by [`SegmentManager`] when deciding whether and what to merge.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SegmentManagerConfig {
/// Upper bound on vectors per segment.
/// NOTE(review): not read anywhere in this file — presumably enforced by the
/// segment writer elsewhere; confirm.
pub max_vectors_per_segment: u64,
/// Segments holding strictly fewer vectors than this are reported by
/// `ManagedSegmentInfo::should_merge`.
pub min_vectors_per_segment: u64,
/// Segment count above which `SegmentManager::needs_merge` returns `true`;
/// also the baseline for the urgency computed by `create_merge_plan`.
pub max_segments: u32,
/// Maximum number of segments `create_merge_plan` puts into one merge candidate.
pub merge_factor: u32,
}
impl Default for SegmentManagerConfig {
fn default() -> Self {
Self {
max_vectors_per_segment: 1000000,
min_vectors_per_segment: 10000,
max_segments: 100,
merge_factor: 10,
}
}
}
/// Bookkeeping record that [`SegmentManager`] keeps for one segment.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ManagedSegmentInfo {
/// Key under which `SegmentManager` stores this record.
pub segment_id: String,
/// Number of vectors in the segment; compared against `min_vectors_per_segment`
/// and used to rank segments for the `Smallest` merge strategy.
pub vector_count: u64,
/// Offset used to order segments for the `Adjacent` merge strategy.
/// NOTE(review): presumably the global index of the segment's first vector — confirm.
pub vector_offset: u64,
/// NOTE(review): not read in this file; presumably a creation/merge generation
/// counter — confirm with callers.
pub generation: u64,
/// Whether the segment contains deletions; ranked first by `MostDeletions` and
/// counted in `SegmentManagerStats::segments_with_deletions`.
pub has_deletions: bool,
/// Segment size in bytes (0 when built via `ManagedSegmentInfo::new`); summed
/// into merge candidates and stats.
pub size_bytes: u64,
}
impl ManagedSegmentInfo {
    /// Builds a record for a newly registered segment.
    ///
    /// The record starts with `has_deletions == false` and `size_bytes == 0`;
    /// both are public fields and can be filled in afterwards.
    pub fn new(segment_id: String, vector_count: u64, vector_offset: u64, generation: u64) -> Self {
        let has_deletions = false;
        let size_bytes = 0;
        Self {
            segment_id,
            vector_count,
            vector_offset,
            generation,
            has_deletions,
            size_bytes,
        }
    }

    /// Reports whether this segment is undersized under `config`, i.e. holds
    /// strictly fewer than `min_vectors_per_segment` vectors.
    pub fn should_merge(&self, config: &SegmentManagerConfig) -> bool {
        let threshold = config.min_vectors_per_segment;
        threshold > self.vector_count
    }
}
/// A group of segments proposed to be merged into one.
#[derive(Debug, Clone)]
pub struct MergeCandidate {
/// Ids of the selected segments, in the order the merge strategy ranked them.
pub segment_ids: Vec<String>,
/// Sum of `vector_count` over the selected segments.
pub total_vectors: u64,
/// Sum of `size_bytes` over the selected segments.
pub total_size: u64,
}
/// How `SegmentManager::create_merge_plan` ranks segments before taking the
/// first `merge_factor` of them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MergeStrategy {
/// Fewest vectors (`vector_count`) first.
Smallest,
/// Segments with `has_deletions` set first.
MostDeletions,
/// Lowest `vector_offset` first, so the selected segments are consecutive in offset order.
Adjacent,
}
/// How pressing a merge plan is, based on how far the segment count exceeds
/// `SegmentManagerConfig::max_segments`. The derived `Ord` follows declaration
/// order, so `Low < Medium < High`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum MergeUrgency {
/// Segment count is at or below `max_segments`.
Low,
/// Segment count exceeds `max_segments` but not twice that value.
Medium,
/// Segment count exceeds twice `max_segments`.
High,
}
/// Output of `SegmentManager::create_merge_plan`.
#[derive(Debug, Clone)]
pub struct MergePlan {
/// Proposed merges; `create_merge_plan` in this file emits a single candidate.
pub candidates: Vec<MergeCandidate>,
/// Strategy that was used to rank the segments.
pub strategy: MergeStrategy,
/// How urgently the plan should be executed.
pub urgency: MergeUrgency,
}
/// Aggregate snapshot returned by `SegmentManager::stats`.
#[derive(Debug, Clone)]
pub struct SegmentManagerStats {
/// Number of registered segments.
pub segment_count: u32,
/// Sum of `vector_count` across all segments.
pub total_vectors: u64,
/// Sum of `size_bytes` across all segments.
pub total_size: u64,
/// Number of segments with `has_deletions` set.
pub segments_with_deletions: u32,
/// `total_vectors / segment_count`, or `0.0` when there are no segments.
pub avg_vectors_per_segment: f64,
}
/// Registry of segments plus the `SegmentManagerConfig` policy used to decide
/// when and what to merge. Methods take `&self`; mutable state sits behind
/// `parking_lot::RwLock`s.
pub struct SegmentManager {
/// Thresholds consulted by `needs_merge` and `create_merge_plan`.
config: SegmentManagerConfig,
/// Registered segments keyed by `segment_id`.
segments: Arc<RwLock<HashMap<String, ManagedSegmentInfo>>>,
/// Next number handed out by `generate_segment_id`.
next_segment_id: Arc<RwLock<u64>>,
}
impl SegmentManager {
    /// Creates an empty manager that applies `config` to its merge decisions.
    ///
    /// Ids handed out by [`SegmentManager::generate_segment_id`] start at `segment_000000`.
    pub fn new(config: SegmentManagerConfig) -> Self {
        Self {
            config,
            segments: Arc::new(RwLock::new(HashMap::new())),
            next_segment_id: Arc::new(RwLock::new(0)),
        }
    }

    /// Registers a segment, replacing any existing entry with the same `segment_id`.
    ///
    /// # Errors
    /// Currently always returns `Ok(())`.
    pub fn add_segment(&self, info: ManagedSegmentInfo) -> Result<()> {
        self.segments.write().insert(info.segment_id.clone(), info);
        Ok(())
    }

    /// Forgets the segment with `segment_id`; unknown ids are ignored.
    ///
    /// # Errors
    /// Currently always returns `Ok(())`, even when nothing was removed.
    pub fn remove_segment(&self, segment_id: &str) -> Result<()> {
        self.segments.write().remove(segment_id);
        Ok(())
    }

    /// Returns a copy of the record registered under `segment_id`, if any.
    pub fn get_segment(&self, segment_id: &str) -> Option<ManagedSegmentInfo> {
        self.segments.read().get(segment_id).cloned()
    }

    /// Returns copies of all registered segments, in unspecified (hash-map) order.
    pub fn list_segments(&self) -> Vec<ManagedSegmentInfo> {
        self.segments.read().values().cloned().collect()
    }

    /// Allocates the next sequential id (`segment_000000`, `segment_000001`, ...).
    ///
    /// Numbers are zero-padded to at least six digits; larger values simply grow wider.
    pub fn generate_segment_id(&self) -> String {
        let mut next_id = self.next_segment_id.write();
        let id = *next_id;
        *next_id += 1;
        format!("segment_{:06}", id)
    }

    /// Returns `true` once the number of registered segments exceeds `max_segments`.
    pub fn needs_merge(&self) -> bool {
        self.segments.read().len() > self.max_segments()
    }

    /// Proposes one merge of up to `merge_factor` segments, ranked by `strategy`.
    ///
    /// Returns `None` when fewer than two segments are registered or when
    /// `merge_factor` is below two, because merging zero or one segment does nothing.
    /// Ties in the strategy's ranking are broken by `segment_id`, so the same set of
    /// segments always produces the same plan.
    ///
    /// Urgency is `High` above `2 * max_segments` segments, `Medium` above
    /// `max_segments`, and `Low` otherwise.
    pub fn create_merge_plan(&self, strategy: MergeStrategy) -> Option<MergePlan> {
        // Snapshot under the read lock, then rank without holding it so writers
        // are not blocked while the plan is computed.
        let mut ordered: Vec<ManagedSegmentInfo> =
            self.segments.read().values().cloned().collect();
        let segment_count = ordered.len();
        if segment_count <= 1 {
            return None;
        }

        // Each ordering ends on `segment_id` (the unique map key) so ties are not
        // resolved by HashMap iteration order, which differs between runs.
        match strategy {
            MergeStrategy::Smallest => ordered.sort_unstable_by(|a, b| {
                a.vector_count
                    .cmp(&b.vector_count)
                    .then_with(|| a.segment_id.cmp(&b.segment_id))
            }),
            MergeStrategy::MostDeletions => ordered.sort_unstable_by(|a, b| {
                // Segments with deletions first; among equals, smaller (cheaper) merges first.
                b.has_deletions
                    .cmp(&a.has_deletions)
                    .then_with(|| a.vector_count.cmp(&b.vector_count))
                    .then_with(|| a.segment_id.cmp(&b.segment_id))
            }),
            MergeStrategy::Adjacent => ordered.sort_unstable_by(|a, b| {
                a.vector_offset
                    .cmp(&b.vector_offset)
                    .then_with(|| a.segment_id.cmp(&b.segment_id))
            }),
        }

        let merge_factor = usize::try_from(self.config.merge_factor).unwrap_or(usize::MAX);
        let merge_count = merge_factor.min(segment_count);
        // A merge_factor of 0 or 1 would produce an empty or single-segment "merge".
        if merge_count < 2 {
            return None;
        }
        ordered.truncate(merge_count);

        let total_vectors: u64 = ordered.iter().map(|s| s.vector_count).sum();
        let total_size: u64 = ordered.iter().map(|s| s.size_bytes).sum();
        let segment_ids: Vec<String> = ordered.into_iter().map(|s| s.segment_id).collect();
        let candidate = MergeCandidate {
            segment_ids,
            total_vectors,
            total_size,
        };

        // saturating_mul: `max_segments * 2` must not overflow for huge configs.
        let max_segments = self.max_segments();
        let urgency = if segment_count > max_segments.saturating_mul(2) {
            MergeUrgency::High
        } else if segment_count > max_segments {
            MergeUrgency::Medium
        } else {
            MergeUrgency::Low
        };

        Some(MergePlan {
            candidates: vec![candidate],
            strategy,
            urgency,
        })
    }

    /// Computes aggregate counters over all registered segments.
    ///
    /// Counts that do not fit in `u32` saturate at `u32::MAX` instead of wrapping.
    pub fn stats(&self) -> SegmentManagerStats {
        let segments = self.segments.read();
        let len = segments.len();
        let total_vectors: u64 = segments.values().map(|s| s.vector_count).sum();
        let total_size: u64 = segments.values().map(|s| s.size_bytes).sum();
        let with_deletions = segments.values().filter(|s| s.has_deletions).count();
        let avg_vectors_per_segment = if len > 0 {
            total_vectors as f64 / len as f64
        } else {
            0.0
        };
        SegmentManagerStats {
            segment_count: u32::try_from(len).unwrap_or(u32::MAX),
            total_vectors,
            total_size,
            segments_with_deletions: u32::try_from(with_deletions).unwrap_or(u32::MAX),
            avg_vectors_per_segment,
        }
    }

    /// `config.max_segments` as a `usize`, so it can be compared with collection
    /// lengths without truncating them (saturates on targets narrower than 32 bits).
    fn max_segments(&self) -> usize {
        usize::try_from(self.config.max_segments).unwrap_or(usize::MAX)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a config overriding only the segment cap and merge factor.
    fn config_with(max_segments: u32, merge_factor: u32) -> SegmentManagerConfig {
        SegmentManagerConfig {
            max_segments,
            merge_factor,
            ..Default::default()
        }
    }

    /// Builds a manager holding `count` segments. Segment `i` has
    /// `1000 * (i + 1)` vectors, offset `i * 1000` and `size_bytes` 100.
    fn populated_manager(config: SegmentManagerConfig, count: u64) -> SegmentManager {
        let manager = SegmentManager::new(config);
        for i in 0..count {
            let segment_id = manager.generate_segment_id();
            let mut info = ManagedSegmentInfo::new(segment_id, 1000 * (i + 1), i * 1000, 0);
            info.size_bytes = 100;
            manager.add_segment(info).unwrap();
        }
        manager
    }

    #[test]
    fn test_segment_manager_basic() {
        let manager = SegmentManager::new(SegmentManagerConfig::default());
        let segment_id = manager.generate_segment_id();
        assert_eq!(segment_id, "segment_000000");
        assert_eq!(manager.generate_segment_id(), "segment_000001");

        let info = ManagedSegmentInfo::new(segment_id.clone(), 1000, 0, 0);
        manager.add_segment(info).unwrap();
        let retrieved = manager.get_segment(&segment_id).unwrap();
        assert_eq!(retrieved.vector_count, 1000);
        assert_eq!(manager.list_segments().len(), 1);

        manager.remove_segment(&segment_id).unwrap();
        assert!(manager.get_segment(&segment_id).is_none());
        // Removing an unknown id is tolerated.
        manager.remove_segment(&segment_id).unwrap();
    }

    #[test]
    fn test_merge_plan_creation() {
        let manager = populated_manager(config_with(5, 3), 10);
        assert!(manager.needs_merge());

        let plan = manager
            .create_merge_plan(MergeStrategy::Smallest)
            .expect("10 segments should yield a plan");
        assert_eq!(plan.strategy, MergeStrategy::Smallest);
        assert_eq!(plan.candidates.len(), 1);
        let candidate = &plan.candidates[0];
        assert_eq!(
            candidate.segment_ids,
            vec!["segment_000000", "segment_000001", "segment_000002"]
        );
        assert_eq!(candidate.total_vectors, 1000 + 2000 + 3000);
        assert_eq!(candidate.total_size, 300);
        // 10 segments exceed max_segments (5) but not twice it.
        assert_eq!(plan.urgency, MergeUrgency::Medium);
    }

    #[test]
    fn test_merge_plan_strategies() {
        let manager = populated_manager(config_with(5, 3), 10);

        let adjacent = manager.create_merge_plan(MergeStrategy::Adjacent).unwrap();
        assert_eq!(
            adjacent.candidates[0].segment_ids,
            vec!["segment_000000", "segment_000001", "segment_000002"]
        );

        let mut info = manager.get_segment("segment_000007").unwrap();
        info.has_deletions = true;
        manager.add_segment(info).unwrap();
        let deletions = manager.create_merge_plan(MergeStrategy::MostDeletions).unwrap();
        assert_eq!(deletions.candidates[0].segment_ids.len(), 3);
        assert_eq!(deletions.candidates[0].segment_ids[0], "segment_000007");
    }

    #[test]
    fn test_merge_plan_requires_multiple_segments() {
        let empty = SegmentManager::new(config_with(5, 3));
        assert!(empty.create_merge_plan(MergeStrategy::Smallest).is_none());

        let single = populated_manager(config_with(5, 3), 1);
        assert!(single.create_merge_plan(MergeStrategy::Smallest).is_none());
    }

    #[test]
    fn test_merge_urgency_levels() {
        let high = populated_manager(config_with(2, 2), 5);
        assert_eq!(
            high.create_merge_plan(MergeStrategy::Smallest).unwrap().urgency,
            MergeUrgency::High
        );

        let low = populated_manager(config_with(20, 2), 3);
        assert!(!low.needs_merge());
        assert_eq!(
            low.create_merge_plan(MergeStrategy::Smallest).unwrap().urgency,
            MergeUrgency::Low
        );
    }

    #[test]
    fn test_needs_merge_boundary() {
        let manager = populated_manager(config_with(5, 3), 5);
        assert!(!manager.needs_merge());
        let extra_id = manager.generate_segment_id();
        manager
            .add_segment(ManagedSegmentInfo::new(extra_id, 1, 0, 0))
            .unwrap();
        assert!(manager.needs_merge());
    }

    #[test]
    fn test_stats() {
        let empty = SegmentManager::new(SegmentManagerConfig::default()).stats();
        assert_eq!(empty.segment_count, 0);
        assert_eq!(empty.avg_vectors_per_segment, 0.0);

        let manager = populated_manager(SegmentManagerConfig::default(), 4);
        let mut info = manager.get_segment("segment_000001").unwrap();
        info.has_deletions = true;
        manager.add_segment(info).unwrap();

        let stats = manager.stats();
        assert_eq!(stats.segment_count, 4);
        assert_eq!(stats.total_vectors, 10_000);
        assert_eq!(stats.total_size, 400);
        assert_eq!(stats.segments_with_deletions, 1);
        assert_eq!(stats.avg_vectors_per_segment, 2500.0);
    }

    #[test]
    fn test_should_merge() {
        let config = SegmentManagerConfig::default();
        let small = ManagedSegmentInfo::new("a".to_string(), 500, 0, 0);
        let at_min = ManagedSegmentInfo::new("b".to_string(), config.min_vectors_per_segment, 0, 0);
        assert!(small.should_merge(&config));
        assert!(!at_min.should_merge(&config));
    }
}