use serde::{Deserialize, Serialize};
use std::sync::Arc;
use crate::error::Result;
use crate::storage::Storage;
use crate::vector::core::vector::Vector;
use super::manager::ManagedSegmentInfo;
use crate::maintenance::deletion::DeletionBitmap;
use crate::vector::index::config::HnswIndexConfig;
use crate::vector::index::hnsw::reader::HnswIndexReader;
use crate::vector::index::hnsw::writer::HnswIndexWriter;
use crate::vector::reader::VectorIndexReader;
use crate::vector::writer::{VectorIndexWriter, VectorIndexWriterConfig};
/// Tuning knobs for segment merging.
///
/// NOTE(review): none of these fields are read by `MergeEngine::merge_segments`
/// in this file; the only visible consumer is the `MergeEngine::config` accessor.
/// Field order is part of the serde representation, so do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MergeConfig {
/// Upper bound on how many segments one merge should combine
/// (presumably enforced by the caller that selects segments — verify).
pub max_merge_segments: u32,
/// Desired size of a merged segment; units are not stated here
/// (possibly bytes — confirm against the segment manager).
pub target_segment_size: u64,
/// Flag requesting parallel merging (not consulted by the merge path visible here).
pub parallel_merge: bool,
/// Thread count intended for parallel merging (not consulted by the merge path visible here).
pub num_threads: usize,
}
impl Default for MergeConfig {
fn default() -> Self {
Self {
max_merge_segments: 10,
target_segment_size: 1000000,
parallel_merge: true,
num_threads: 4,
}
}
}
/// Counters describing one completed merge, as filled in by `MergeEngine::merge_segments`.
///
/// Field order is part of the serde representation, so do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MergeStats {
/// Number of input segments that were merged.
pub segments_merged: u32,
/// Number of vectors written to the merged segment (deleted vectors excluded).
///
/// NOTE(review): `compression_ratio` divides by this field as if it counted the
/// vectors *read*; the two interpretations disagree — confirm the intended meaning.
pub vectors_merged: u64,
/// Number of input vectors skipped because the deletion bitmap marked their doc id deleted.
pub deletions_removed: u64,
/// Wall-clock duration of the merge, in milliseconds.
pub merge_time_ms: u64,
/// Size of the merged segment in bytes. This is an estimate
/// (128 bytes per merged vector), not a measured on-disk size.
pub merged_size_bytes: u64,
}
impl MergeStats {
pub fn new() -> Self {
Self {
segments_merged: 0,
vectors_merged: 0,
deletions_removed: 0,
merge_time_ms: 0,
merged_size_bytes: 0,
}
}
pub fn compression_ratio(&self) -> f64 {
if self.vectors_merged == 0 {
return 1.0;
}
1.0 - (self.deletions_removed as f64 / self.vectors_merged as f64)
}
}
impl Default for MergeStats {
fn default() -> Self {
Self::new()
}
}
/// Outcome of `MergeEngine::merge_segments`.
#[derive(Debug, Clone)]
pub struct MergeResult {
/// Metadata describing the newly written merged segment.
pub merged_segment: ManagedSegmentInfo,
/// Counters collected while merging.
pub stats: MergeStats,
/// Ids of the input segments that were merged, in input order
/// (presumably retired by the caller once the merged segment is live — verify).
pub merged_segment_ids: Vec<String>,
}
/// Merges several HNSW vector segments into one, optionally dropping deleted documents.
pub struct MergeEngine {
/// Merge tuning parameters (exposed through `config()`).
config: MergeConfig,
/// Backend from which input segments are loaded and to which the merged segment is written.
storage: Arc<dyn Storage>,
/// HNSW settings. Its `distance_metric` is used when loading inputs; the whole
/// config is passed to the writer for the output segment.
index_config: HnswIndexConfig,
/// Writer settings used when building the merged segment.
writer_config: VectorIndexWriterConfig,
/// When set, vectors whose doc id this bitmap marks as deleted are dropped during a merge.
deletion_bitmap: Option<Arc<DeletionBitmap>>,
}
impl MergeEngine {
    /// Creates a merge engine with no deletion bitmap attached.
    ///
    /// Call [`MergeEngine::set_deletion_bitmap`] before merging if deleted
    /// documents should be purged from the merged segment.
    pub fn new(
        config: MergeConfig,
        storage: Arc<dyn Storage>,
        index_config: HnswIndexConfig,
        writer_config: VectorIndexWriterConfig,
    ) -> Self {
        Self {
            config,
            storage,
            index_config,
            writer_config,
            deletion_bitmap: None,
        }
    }

    /// Attaches the deletion bitmap consulted by [`MergeEngine::merge_segments`],
    /// replacing any bitmap set previously.
    pub fn set_deletion_bitmap(&mut self, bitmap: Arc<DeletionBitmap>) {
        self.deletion_bitmap = Some(bitmap);
    }

    /// Merges `segments` into a single new HNSW segment stored as `new_segment_id`.
    ///
    /// Every vector of every input segment is read back in input order. A vector
    /// is skipped and counted in `deletions_removed` when a deletion bitmap is
    /// attached and marks its doc id deleted. The survivors are written,
    /// finalized and persisted through an [`HnswIndexWriter`].
    ///
    /// The merged segment's `generation` is one greater than the highest input
    /// generation (1 when `segments` is empty). `has_deletions` is `false`
    /// because deleted vectors are physically dropped. `size_bytes` is an
    /// estimate of `ESTIMATED_BYTES_PER_VECTOR` bytes per merged vector, not a
    /// measured size.
    ///
    /// # Errors
    ///
    /// Propagates any error from loading or iterating an input segment, or from
    /// creating, filling, finalizing or writing the merged segment.
    pub fn merge_segments(
        &self,
        segments: Vec<ManagedSegmentInfo>,
        new_segment_id: String,
    ) -> Result<MergeResult> {
        // The writer does not report the real on-disk size here, so the merged
        // size is estimated from a fixed per-vector cost.
        const ESTIMATED_BYTES_PER_VECTOR: u64 = 128;

        let start_time = std::time::Instant::now();
        let segments_merged = u32::try_from(segments.len()).unwrap_or(u32::MAX);

        let mut deletions_removed: u64 = 0;
        let mut all_vectors: Vec<(u64, String, Vector)> = Vec::new();
        for segment in &segments {
            let reader = HnswIndexReader::load(
                self.storage.as_ref(),
                &segment.segment_id,
                self.index_config.distance_metric,
            )?;
            let mut iterator = reader.vector_iterator()?;
            while let Some((doc_id, field, vector)) = iterator.next()? {
                if let Some(bitmap) = &self.deletion_bitmap
                    && bitmap.is_deleted(doc_id)
                {
                    deletions_removed += 1;
                    continue;
                }
                all_vectors.push((doc_id, field, vector));
            }
        }

        // Record the count before handing the vectors to the writer by value;
        // this avoids cloning every merged vector just to read its length later.
        let vectors_merged = all_vectors.len() as u64;

        let mut writer = HnswIndexWriter::with_storage(
            self.index_config.clone(),
            self.writer_config.clone(),
            &new_segment_id,
            self.storage.clone(),
        )?;
        writer.add_vectors(all_vectors)?;
        writer.finalize()?;
        writer.write()?;

        let merge_time_ms = u64::try_from(start_time.elapsed().as_millis()).unwrap_or(u64::MAX);
        let merged_size_bytes = vectors_merged.saturating_mul(ESTIMATED_BYTES_PER_VECTOR);
        let generation = segments.iter().map(|s| s.generation).max().unwrap_or(0) + 1;
        let merged_segment_ids: Vec<String> =
            segments.into_iter().map(|s| s.segment_id).collect();

        Ok(MergeResult {
            merged_segment: ManagedSegmentInfo {
                segment_id: new_segment_id,
                vector_count: vectors_merged,
                vector_offset: 0,
                generation,
                // Deleted vectors were skipped above, so the new segment has none.
                has_deletions: false,
                size_bytes: merged_size_bytes,
            },
            stats: MergeStats {
                segments_merged,
                vectors_merged,
                deletions_removed,
                merge_time_ms,
                merged_size_bytes,
            },
            merged_segment_ids,
        })
    }

    /// Flattens per-segment `(id, vector)` lists and drops every id listed in
    /// `deleted_ids`. Keeps one entry per id: the occurrence that appears last
    /// in the flattened input. The result is sorted ascending by id.
    // Not called by the merge path in this file; kept as a helper, hence the allow.
    #[allow(dead_code)]
    fn merge_vectors(
        &self,
        vectors: Vec<Vec<(u64, Vector)>>,
        deleted_ids: &[u64],
    ) -> Vec<(u64, Vector)> {
        // O(1) membership checks instead of a linear scan of `deleted_ids` per vector.
        let deleted: std::collections::HashSet<u64> = deleted_ids.iter().copied().collect();
        let mut all_vectors: Vec<(u64, Vector)> = vectors
            .into_iter()
            .flatten()
            .filter(|(id, _)| !deleted.contains(id))
            .collect();
        // Reversing first makes the stable sort place the latest occurrence of each
        // id at the front of its run; `dedup_by_key` then keeps exactly that one.
        all_vectors.reverse();
        all_vectors.sort_by_key(|(id, _)| *id);
        all_vectors.dedup_by_key(|(id, _)| *id);
        all_vectors
    }

    /// Returns the storage backend segments are read from and written to.
    pub fn storage(&self) -> &Arc<dyn Storage> {
        &self.storage
    }

    /// Returns the merge configuration this engine was created with.
    pub fn config(&self) -> &MergeConfig {
        &self.config
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::memory::{MemoryStorage, MemoryStorageConfig};

    /// Constructing an engine keeps the supplied merge configuration unchanged.
    #[test]
    fn test_merge_engine_basic() {
        let storage = Arc::new(MemoryStorage::new(MemoryStorageConfig::default()));
        let engine = MergeEngine::new(
            MergeConfig::default(),
            storage,
            HnswIndexConfig::default(),
            VectorIndexWriterConfig::default(),
        );

        let config = engine.config();
        assert_eq!(config.max_merge_segments, 10);
        assert_eq!(config.target_segment_size, 1_000_000);
        assert!(config.parallel_merge);
        assert_eq!(config.num_threads, 4);
    }

    /// 200 removed out of 1000 gives a retained fraction of 0.8.
    #[test]
    fn test_merge_stats() {
        let stats = MergeStats {
            segments_merged: 3,
            vectors_merged: 1000,
            deletions_removed: 200,
            merge_time_ms: 100,
            merged_size_bytes: 102400,
        };
        // Tolerance instead of exact equality: the value is the result of float arithmetic.
        assert!((stats.compression_ratio() - 0.8).abs() < 1e-12);
    }

    /// A default record is all zeros and reports that nothing was lost.
    #[test]
    fn test_merge_stats_default_is_zero() {
        let stats = MergeStats::default();
        assert_eq!(stats.segments_merged, 0);
        assert_eq!(stats.vectors_merged, 0);
        assert_eq!(stats.deletions_removed, 0);
        assert_eq!(stats.merge_time_ms, 0);
        assert_eq!(stats.merged_size_bytes, 0);
        assert!((stats.compression_ratio() - 1.0).abs() < 1e-12);
    }
}