use serde::{Deserialize, Serialize};
use std::sync::Arc;
use crate::error::Result;
use crate::storage::Storage;
use crate::vector::core::vector::Vector;
use super::manager::ManagedSegmentInfo;
/// Tuning knobs for segment merging.
///
/// Serializable so it can be persisted/loaded alongside index settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MergeConfig {
    /// Upper bound on how many segments may be combined in one merge.
    pub max_merge_segments: u32,
    /// Desired size of a merged segment.
    /// NOTE(review): unit (vectors vs bytes) is not evident from this file — confirm at call sites.
    pub target_segment_size: u64,
    /// Whether merges may run in parallel.
    pub parallel_merge: bool,
    /// Thread count used when `parallel_merge` is enabled.
    pub num_threads: usize,
}
impl Default for MergeConfig {
fn default() -> Self {
Self {
max_merge_segments: 10,
target_segment_size: 1000000,
parallel_merge: true,
num_threads: 4,
}
}
}
/// Counters describing the outcome of a single merge operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MergeStats {
    /// Number of input segments combined.
    pub segments_merged: u32,
    /// Total vectors across all input segments.
    pub vectors_merged: u64,
    /// Vectors dropped because they were marked deleted.
    pub deletions_removed: u64,
    /// Wall-clock duration of the merge, in milliseconds.
    pub merge_time_ms: u64,
    /// Size of the resulting merged segment, in bytes.
    pub merged_size_bytes: u64,
}
impl MergeStats {
pub fn new() -> Self {
Self {
segments_merged: 0,
vectors_merged: 0,
deletions_removed: 0,
merge_time_ms: 0,
merged_size_bytes: 0,
}
}
pub fn compression_ratio(&self) -> f64 {
if self.vectors_merged == 0 {
return 1.0;
}
1.0 - (self.deletions_removed as f64 / self.vectors_merged as f64)
}
}
impl Default for MergeStats {
    /// Same as [`MergeStats::new`]: all counters zero.
    fn default() -> Self {
        Self::new()
    }
}
/// Everything produced by a merge: the new segment's metadata, the run's
/// statistics, and which input segments it replaced.
#[derive(Debug, Clone)]
pub struct MergeResult {
    /// Metadata describing the newly created merged segment.
    pub merged_segment: ManagedSegmentInfo,
    /// Counters collected while merging.
    pub stats: MergeStats,
    /// IDs of the input segments that were merged (and can now be retired).
    pub merged_segment_ids: Vec<String>,
}
/// Performs segment merges against a storage backend according to a
/// [`MergeConfig`].
pub struct MergeEngine {
    // Merge tuning parameters (exposed read-only via `config()`).
    config: MergeConfig,
    // Shared storage backend (exposed read-only via `storage()`).
    storage: Arc<dyn Storage>,
}
impl MergeEngine {
    /// Creates an engine using the given configuration and storage backend.
    pub fn new(config: MergeConfig, storage: Arc<dyn Storage>) -> Self {
        Self { config, storage }
    }

    /// Combines the *metadata* of `segments` into a single segment description
    /// named `new_segment_id`.
    ///
    /// Vector counts and byte sizes are summed, and the merged segment's
    /// generation is one past the highest input generation. Note that this
    /// method does not move vector data and does not inspect `has_deletions`
    /// on the inputs: `deletions_removed` is always reported as 0 and the
    /// merged segment is marked deletion-free.
    ///
    /// # Errors
    /// Currently always returns `Ok`; the `Result` reserves room for real
    /// merge I/O to fail.
    pub fn merge_segments(
        &self,
        segments: Vec<ManagedSegmentInfo>,
        new_segment_id: String,
    ) -> Result<MergeResult> {
        let start_time = crate::util::time::Timer::now();

        let segments_merged = segments.len() as u32;
        let vectors_merged: u64 = segments.iter().map(|s| s.vector_count).sum();
        let total_size: u64 = segments.iter().map(|s| s.size_bytes).sum();

        let merged_segment = ManagedSegmentInfo {
            segment_id: new_segment_id,
            vector_count: vectors_merged,
            vector_offset: 0,
            // The merged segment supersedes all inputs, so it gets a
            // generation one past the newest input (0 for an empty merge).
            generation: segments.iter().map(|s| s.generation).max().unwrap_or(0) + 1,
            has_deletions: false,
            size_bytes: total_size,
        };

        let merge_time_ms = start_time.elapsed_ms();
        let stats = MergeStats {
            segments_merged,
            vectors_merged,
            deletions_removed: 0,
            merge_time_ms,
            merged_size_bytes: total_size,
        };

        Ok(MergeResult {
            merged_segment,
            stats,
            merged_segment_ids: segments.iter().map(|s| s.segment_id.clone()).collect(),
        })
    }

    /// Flattens per-segment `(id, vector)` lists into one id-sorted list,
    /// dropping deleted ids and keeping only the first occurrence of each id
    /// (stable sort guarantees "first" means earliest in the concatenated
    /// input order).
    #[allow(dead_code)]
    fn merge_vectors(
        &self,
        vectors: Vec<Vec<(u64, Vector)>>,
        deleted_ids: &[u64],
    ) -> Vec<(u64, Vector)> {
        // Build a hash set once so the per-vector deletion check is O(1)
        // instead of a linear scan of `deleted_ids` for every vector.
        let deleted: std::collections::HashSet<u64> = deleted_ids.iter().copied().collect();

        let mut all_vectors: Vec<(u64, Vector)> = vectors.into_iter().flatten().collect();
        all_vectors.retain(|(id, _)| !deleted.contains(id));
        // Stable sort: duplicates of an id keep their original relative order,
        // so dedup retains the earliest one.
        all_vectors.sort_by_key(|(id, _)| *id);
        all_vectors.dedup_by_key(|(id, _)| *id);
        all_vectors
    }

    /// Read-only access to the storage backend.
    pub fn storage(&self) -> &Arc<dyn Storage> {
        &self.storage
    }

    /// Read-only access to the merge configuration.
    pub fn config(&self) -> &MergeConfig {
        &self.config
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::memory::{MemoryStorage, MemoryStorageConfig};

    /// Merging two deletion-free segments sums their counts and bumps the
    /// generation past the newest input.
    #[test]
    fn test_merge_engine_basic() {
        let engine = MergeEngine::new(
            MergeConfig::default(),
            Arc::new(MemoryStorage::new(MemoryStorageConfig::default())),
        );

        let first = ManagedSegmentInfo {
            segment_id: "seg1".to_string(),
            vector_count: 1000,
            vector_offset: 0,
            generation: 0,
            has_deletions: false,
            size_bytes: 128000,
        };
        let second = ManagedSegmentInfo {
            segment_id: "seg2".to_string(),
            vector_count: 2000,
            vector_offset: 1000,
            generation: 1,
            has_deletions: false,
            size_bytes: 256000,
        };

        let result = engine
            .merge_segments(vec![first, second], "merged_seg".to_string())
            .unwrap();

        assert_eq!(result.stats.segments_merged, 2);
        assert_eq!(result.stats.vectors_merged, 3000);
        assert_eq!(result.merged_segment.vector_count, 3000);
        assert_eq!(result.merged_segment.generation, 2);
    }

    /// 200 deletions out of 1000 vectors leaves 80% retained.
    #[test]
    fn test_merge_stats() {
        let stats = MergeStats {
            segments_merged: 3,
            vectors_merged: 1000,
            deletions_removed: 200,
            merge_time_ms: 100,
            merged_size_bytes: 102400,
        };
        assert_eq!(stats.compression_ratio(), 0.8);
    }
}