use std::sync::Arc;
use rayon::prelude::*;
use crate::error::{LaurusError, Result};
use crate::storage::Storage;
use crate::vector::core::vector::Vector;
use crate::vector::index::IvfIndexConfig;
use crate::vector::index::field::LegacyVectorFieldWriter;
use crate::vector::writer::{VectorIndexWriter, VectorIndexWriterConfig};
use serde::{Deserialize, Serialize};
/// Writer that builds an IVF (inverted-file) vector index.
///
/// Vectors are buffered, clustered with k-means into `n_clusters` centroids,
/// and distributed into one inverted list per centroid. The finalized index
/// can be persisted to `"{path}.ivf"` when storage is configured.
#[derive(Debug)]
pub struct IvfIndexWriter {
    /// Index-level parameters (dimension, n_clusters, n_probe, distance metric, ...).
    index_config: IvfIndexConfig,
    /// Writer-level parameters (parallel build, optional memory limit, ...).
    writer_config: VectorIndexWriterConfig,
    /// Backing storage; `None` means the index cannot be written or re-loaded.
    storage: Option<Arc<dyn Storage>>,
    /// Base path; the on-disk file name is `"{path}.ivf"`.
    path: String,
    /// Trained cluster centroids (empty until `finalize` or `load`).
    centroids: Vec<Vector>,
    /// One list per centroid holding `(doc_id, field_name, vector)` entries.
    inverted_lists: Vec<Vec<(u64, String, Vector)>>,
    /// All buffered vectors pending (re-)clustering.
    vectors: Vec<(u64, String, Vector)>,
    /// True once centroids and inverted lists have been built.
    is_finalized: bool,
    /// Expected total vector count; drives `progress()` reporting.
    total_vectors_to_add: Option<usize>,
    /// Next vector id to hand out (kept above every id seen so far).
    next_vec_id: u64,
}
impl IvfIndexWriter {
    /// Creates a new, empty writer without backing storage.
    ///
    /// The writer can buffer and finalize vectors, but `write()` will fail
    /// until a writer with storage is used (`with_storage` / `load`).
    pub fn new(
        index_config: IvfIndexConfig,
        writer_config: VectorIndexWriterConfig,
        path: impl Into<String>,
    ) -> Result<Self> {
        Ok(Self {
            index_config,
            writer_config,
            storage: None,
            path: path.into(),
            centroids: Vec::new(),
            inverted_lists: Vec::new(),
            vectors: Vec::new(),
            is_finalized: false,
            total_vectors_to_add: None,
            next_vec_id: 0,
        })
    }

    /// Creates a writer bound to `storage`.
    ///
    /// If `"{path}.ivf"` already exists on the storage, the persisted index is
    /// loaded instead of starting from an empty writer.
    pub fn with_storage(
        index_config: IvfIndexConfig,
        writer_config: VectorIndexWriterConfig,
        path: impl Into<String>,
        storage: Arc<dyn Storage>,
    ) -> Result<Self> {
        let path = path.into();
        let file_name = format!("{}.ivf", path);
        if storage.file_exists(&file_name) {
            // Resume from the existing on-disk index rather than overwrite it.
            return Self::load(index_config, writer_config, storage, &path);
        }
        Ok(Self {
            index_config,
            writer_config,
            storage: Some(storage),
            path,
            centroids: Vec::new(),
            inverted_lists: Vec::new(),
            vectors: Vec::new(),
            is_finalized: false,
            total_vectors_to_add: None,
            next_vec_id: 0,
        })
    }

    /// Wraps this writer in a legacy per-field adapter for `field_name`.
    pub fn into_field_writer(self, field_name: impl Into<String>) -> LegacyVectorFieldWriter<Self> {
        LegacyVectorFieldWriter::new(field_name, self)
    }

    /// Loads a previously written index from `"{path}.ivf"`.
    ///
    /// Binary layout (all integers little-endian):
    /// - header: num_vectors (u32), dimension (u32), n_clusters (u32), n_probe (u32)
    /// - n_clusters centroids, each `dimension` f32 values
    /// - per cluster: list_size (u32), then for each entry:
    ///   doc_id (u64), field_name_len (u32), field_name (UTF-8 bytes),
    ///   `dimension` f32 values.
    ///
    /// The stored n_probe is read but discarded; the caller-supplied
    /// `index_config` value wins. Returns an error if the stored dimension
    /// does not match `index_config.dimension` or a field name is not UTF-8.
    pub fn load(
        index_config: IvfIndexConfig,
        writer_config: VectorIndexWriterConfig,
        storage: Arc<dyn Storage>,
        path: &str,
    ) -> Result<Self> {
        use std::io::Read;
        let file_name = format!("{}.ivf", path);
        let mut input = storage.open_input(&file_name)?;
        // --- fixed-size header ---
        let mut num_vectors_buf = [0u8; 4];
        input.read_exact(&mut num_vectors_buf)?;
        let num_vectors = u32::from_le_bytes(num_vectors_buf) as usize;
        let mut dimension_buf = [0u8; 4];
        input.read_exact(&mut dimension_buf)?;
        let dimension = u32::from_le_bytes(dimension_buf) as usize;
        let mut n_clusters_buf = [0u8; 4];
        input.read_exact(&mut n_clusters_buf)?;
        let n_clusters = u32::from_le_bytes(n_clusters_buf) as usize;
        let mut n_probe_buf = [0u8; 4];
        input.read_exact(&mut n_probe_buf)?;
        // Stored n_probe is intentionally ignored in favor of index_config.
        let _n_probe = u32::from_le_bytes(n_probe_buf) as usize;
        if dimension != index_config.dimension {
            return Err(LaurusError::InvalidOperation(format!(
                "Dimension mismatch: expected {}, found {}",
                index_config.dimension, dimension
            )));
        }
        // --- centroids: n_clusters x dimension f32 values ---
        let mut centroids = Vec::with_capacity(n_clusters);
        for _ in 0..n_clusters {
            let mut values = vec![0.0f32; dimension];
            for value in &mut values {
                let mut value_buf = [0u8; 4];
                input.read_exact(&mut value_buf)?;
                *value = f32::from_le_bytes(value_buf);
            }
            centroids.push(Vector::new(values));
        }
        // --- inverted lists: one length-prefixed list per cluster ---
        let mut inverted_lists = vec![Vec::new(); n_clusters];
        for list in &mut inverted_lists {
            let mut list_size_buf = [0u8; 4];
            input.read_exact(&mut list_size_buf)?;
            let list_size = u32::from_le_bytes(list_size_buf) as usize;
            for _ in 0..list_size {
                let mut doc_id_buf = [0u8; 8];
                input.read_exact(&mut doc_id_buf)?;
                let doc_id = u64::from_le_bytes(doc_id_buf);
                let mut field_name_len_buf = [0u8; 4];
                input.read_exact(&mut field_name_len_buf)?;
                let field_name_len = u32::from_le_bytes(field_name_len_buf) as usize;
                let mut field_name_buf = vec![0u8; field_name_len];
                input.read_exact(&mut field_name_buf)?;
                let field_name = String::from_utf8(field_name_buf).map_err(|e| {
                    LaurusError::InvalidOperation(format!("Invalid UTF-8 in field name: {}", e))
                })?;
                let mut values = vec![0.0f32; dimension];
                for value in &mut values {
                    let mut value_buf = [0u8; 4];
                    input.read_exact(&mut value_buf)?;
                    *value = f32::from_le_bytes(value_buf);
                }
                list.push((doc_id, field_name, Vector::new(values)));
            }
        }
        // Rebuild the flat vector list from the inverted lists so the writer
        // can re-cluster later without re-reading the file.
        let mut vectors = Vec::with_capacity(num_vectors);
        for list in &inverted_lists {
            vectors.extend(list.iter().cloned());
        }
        // Resume id allocation just past the highest persisted doc id.
        let max_id = vectors.iter().map(|(id, _, _)| *id).max().unwrap_or(0);
        let next_vec_id = if num_vectors > 0 { max_id + 1 } else { 0 };
        Ok(Self {
            index_config,
            writer_config,
            storage: Some(storage),
            path: path.to_string(),
            centroids,
            inverted_lists,
            vectors,
            // A loaded index is already clustered, so it starts finalized.
            is_finalized: true,
            total_vectors_to_add: Some(num_vectors),
            next_vec_id,
        })
    }

    /// Builder-style override of the cluster count and probe count.
    pub fn with_ivf_params(mut self, n_clusters: usize, n_probe: usize) -> Self {
        self.index_config.n_clusters = n_clusters;
        self.index_config.n_probe = n_probe;
        self
    }

    /// Records the expected vector count and derives a default cluster count
    /// from it (sqrt heuristic, clamped to [10, 10000]).
    pub fn set_expected_vector_count(&mut self, count: usize) {
        self.total_vectors_to_add = Some(count);
        self.index_config.n_clusters = Self::compute_default_clusters(count);
    }

    // sqrt(n) clusters is a common IVF rule of thumb; clamp keeps the count
    // sane for tiny and huge collections.
    fn compute_default_clusters(n_vectors: usize) -> usize {
        let clusters = (n_vectors as f64).sqrt() as usize;
        clusters.clamp(10, 10000)
    }

    // Rejects vectors whose dimension differs from the configured dimension
    // or that contain NaN/infinite components. Empty input is valid.
    fn validate_vectors(&self, vectors: &[(u64, String, Vector)]) -> Result<()> {
        if vectors.is_empty() {
            return Ok(());
        }
        for (doc_id, _field_name, vector) in vectors {
            if vector.dimension() != self.index_config.dimension {
                return Err(LaurusError::InvalidOperation(format!(
                    "Vector {} has dimension {}, expected {}",
                    doc_id,
                    vector.dimension(),
                    self.index_config.dimension
                )));
            }
            if !vector.is_valid() {
                return Err(LaurusError::InvalidOperation(format!(
                    "Vector {doc_id} contains invalid values (NaN or infinity)"
                )));
            }
        }
        Ok(())
    }

    // Normalizes every vector in place when the config asks for it.
    // Uses rayon for batches above 100 vectors when parallel_build is set.
    fn normalize_vectors(&self, vectors: &mut [(u64, String, Vector)]) {
        if !self.index_config.normalize_vectors {
            return;
        }
        if self.writer_config.parallel_build && vectors.len() > 100 {
            vectors.par_iter_mut().for_each(|(_, _, vector)| {
                vector.normalize();
            });
        } else {
            for (_, _, vector) in vectors {
                vector.normalize();
            }
        }
    }

    // Trains `n_clusters` centroids over the buffered vectors with Lloyd's
    // k-means, seeded by k-means++. Runs at most 100 iterations and stops
    // early once average centroid movement drops below 1e-6.
    // Errors when there are no vectors or fewer vectors than clusters.
    fn train_centroids(&mut self) -> Result<()> {
        if self.vectors.is_empty() {
            return Err(LaurusError::InvalidOperation(
                "Cannot train centroids on empty vector set".to_string(),
            ));
        }
        if self.vectors.len() < self.index_config.n_clusters {
            return Err(LaurusError::InvalidOperation(format!(
                "Cannot create {} clusters from {} vectors",
                self.index_config.n_clusters,
                self.vectors.len() as u64
            )));
        }
        // NOTE(review): println! in library code — consider a logging facade.
        println!(
            "Training {} centroids using k-means...",
            self.index_config.n_clusters
        );
        self.init_centroids_kmeans_plus_plus()?;
        let max_iterations = 100;
        let convergence_threshold = 1e-6;
        for iteration in 0..max_iterations {
            let old_centroids = self.centroids.clone();
            let assignments = self.assign_vectors_to_clusters();
            self.update_centroids(&assignments)?;
            let convergence = self.compute_convergence(&old_centroids);
            if convergence < convergence_threshold {
                println!("K-means converged after {} iterations", iteration + 1);
                break;
            }
        }
        Ok(())
    }

    // k-means++ seeding: first centroid uniformly at random, subsequent
    // centroids sampled with probability proportional to the squared distance
    // to the nearest already-chosen centroid (D^2 weighting).
    fn init_centroids_kmeans_plus_plus(&mut self) -> Result<()> {
        use rand::prelude::*;
        let mut rng = rand::rng();
        self.centroids.clear();
        let first_idx = rng.random_range(0..self.vectors.len());
        self.centroids.push(self.vectors[first_idx].2.clone());
        for _ in 1..self.index_config.n_clusters {
            let mut distances = Vec::with_capacity(self.vectors.len());
            let mut total_weight = 0.0;
            for (_, _, vector) in &self.vectors {
                // Distance to the closest existing centroid; metric failures
                // are treated as infinitely far.
                let min_dist = self
                    .centroids
                    .iter()
                    .map(|centroid| {
                        self.index_config
                            .distance_metric
                            .distance(&vector.data, &centroid.data)
                            .unwrap_or(f32::INFINITY)
                    })
                    .fold(f32::INFINITY, f32::min);
                let weight = min_dist * min_dist;
                distances.push(weight);
                total_weight += weight;
            }
            if total_weight == 0.0 {
                // All vectors coincide with existing centroids (or all weights
                // are zero): fall back to a uniform random pick.
                let idx = rng.random_range(0..self.vectors.len());
                self.centroids.push(self.vectors[idx].2.clone());
                continue;
            }
            // Roulette-wheel selection over the D^2 weights.
            let target = rng.random::<f32>() * total_weight;
            let mut cumsum = 0.0;
            let mut selected = false;
            for (i, &weight) in distances.iter().enumerate() {
                cumsum += weight;
                if cumsum >= target {
                    self.centroids.push(self.vectors[i].2.clone());
                    selected = true;
                    break;
                }
            }
            if !selected {
                // Floating-point rounding can leave cumsum just below target;
                // default to the last vector so a centroid is always added.
                self.centroids
                    .push(self.vectors[self.vectors.len() - 1].2.clone());
            }
        }
        Ok(())
    }

    // Returns, for each buffered vector, the index of its nearest centroid.
    // Parallelized with rayon above 1000 vectors when parallel_build is set.
    fn assign_vectors_to_clusters(&self) -> Vec<usize> {
        if self.writer_config.parallel_build && self.vectors.len() as u64 > 1000 {
            self.vectors
                .par_iter()
                .map(|(_, _, vector)| self.find_nearest_centroid(vector))
                .collect()
        } else {
            self.vectors
                .iter()
                .map(|(_, _, vector)| self.find_nearest_centroid(vector))
                .collect()
        }
    }

    // Linear scan over all centroids; metric errors are skipped, and if every
    // metric call fails the result defaults to cluster 0.
    fn find_nearest_centroid(&self, vector: &Vector) -> usize {
        let mut best_cluster = 0;
        let mut best_distance = f32::INFINITY;
        for (i, centroid) in self.centroids.iter().enumerate() {
            if let Ok(distance) = self
                .index_config
                .distance_metric
                .distance(&vector.data, &centroid.data)
                && distance < best_distance
            {
                best_distance = distance;
                best_cluster = i;
            }
        }
        best_cluster
    }

    // Lloyd update step: each centroid becomes the component-wise mean of its
    // assigned vectors. Clusters with no members keep their old centroid.
    fn update_centroids(&mut self, assignments: &[usize]) -> Result<()> {
        let mut cluster_sums =
            vec![vec![0.0; self.index_config.dimension]; self.index_config.n_clusters];
        let mut cluster_counts = vec![0; self.index_config.n_clusters];
        for (i, (_, _, vector)) in self.vectors.iter().enumerate() {
            let cluster = assignments[i];
            cluster_counts[cluster] += 1;
            for (j, &value) in vector.data.iter().enumerate() {
                cluster_sums[cluster][j] += value;
            }
        }
        for (i, (sum, count)) in cluster_sums.iter().zip(cluster_counts.iter()).enumerate() {
            if *count == 0 {
                // Empty cluster: leave the previous centroid untouched.
                continue;
            }
            let centroid_data: Vec<f32> = sum.iter().map(|&s| s / *count as f32).collect();
            self.centroids[i] = Vector::new(centroid_data);
        }
        Ok(())
    }

    // Average distance each centroid moved since the previous iteration;
    // INFINITY when the centroid count changed (forces another iteration).
    fn compute_convergence(&self, old_centroids: &[Vector]) -> f32 {
        if old_centroids.len() != self.centroids.len() {
            return f32::INFINITY;
        }
        let mut total_movement = 0.0;
        for (old, new) in old_centroids.iter().zip(self.centroids.iter()) {
            if let Ok(distance) = self
                .index_config
                .distance_metric
                .distance(&old.data, &new.data)
            {
                total_movement += distance;
            }
        }
        total_movement / self.centroids.len() as f32
    }

    // Distributes every buffered vector into the inverted list of its nearest
    // centroid, then sorts each list by doc_id for deterministic output.
    fn build_inverted_lists(&mut self) -> Result<()> {
        self.inverted_lists = vec![Vec::new(); self.index_config.n_clusters];
        for (doc_id, field_name, vector) in &self.vectors {
            let cluster = self.find_nearest_centroid(vector);
            self.inverted_lists[cluster].push((*doc_id, field_name.clone(), vector.clone()));
        }
        if self.writer_config.parallel_build {
            self.inverted_lists.par_iter_mut().for_each(|list| {
                list.sort_by_key(|(doc_id, _, _)| *doc_id);
            });
        } else {
            for list in &mut self.inverted_lists {
                list.sort_by_key(|(doc_id, _, _)| *doc_id);
            }
        }
        Ok(())
    }

    // Fails with ResourceExhausted when the estimated footprint exceeds the
    // configured limit; a writer without a limit always passes.
    fn check_memory_limit(&self) -> Result<()> {
        if let Some(limit) = self.writer_config.memory_limit {
            let current_usage = self.estimated_memory_usage();
            if current_usage > limit {
                return Err(LaurusError::ResourceExhausted(format!(
                    "Memory usage {current_usage} bytes exceeds limit {limit} bytes"
                )));
            }
        }
        Ok(())
    }

    /// All buffered `(doc_id, field_name, vector)` entries.
    pub fn vectors(&self) -> &[(u64, String, Vector)] {
        &self.vectors
    }

    /// Current `(n_clusters, n_probe)` configuration.
    pub fn ivf_params(&self) -> (usize, usize) {
        (self.index_config.n_clusters, self.index_config.n_probe)
    }

    /// Trained centroids (empty before finalize/load).
    pub fn centroids(&self) -> &[Vector] {
        &self.centroids
    }

    /// Per-centroid inverted lists (empty before finalize/load).
    pub fn inverted_lists(&self) -> &[Vec<(u64, String, Vector)>] {
        &self.inverted_lists
    }
}
// Result of splitting one cluster into two:
// (centroid_1, members_1, centroid_2, members_2).
type SplitClusterResult = (
    Vector,
    Vec<(u64, String, Vector)>,
    Vector,
    Vec<(u64, String, Vector)>,
);
/// Per-cluster occupancy statistics produced by `get_cluster_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterStats {
    /// Index of the cluster within `centroids` / `inverted_lists`.
    pub cluster_id: usize,
    /// Number of vectors assigned to the cluster.
    pub count: usize,
    /// Mean distance from member vectors to the centroid (0.0 for empty clusters).
    pub avg_distance: f32,
}
impl IvfIndexWriter {
    /// Computes per-cluster statistics (member count and average distance to
    /// the centroid). Returns an empty vec when the index is not finalized or
    /// has no centroids. Metric failures count as distance 0.0.
    pub fn get_cluster_stats(&self) -> Vec<ClusterStats> {
        if !self.is_finalized || self.centroids.is_empty() {
            return Vec::new();
        }
        let mut stats = Vec::with_capacity(self.centroids.len());
        for (i, (centroid, list)) in self
            .centroids
            .iter()
            .zip(self.inverted_lists.iter())
            .enumerate()
        {
            let count = list.len();
            let total_dist: f32 = list
                .iter()
                .map(|(_, _, vec)| {
                    self.index_config
                        .distance_metric
                        .distance(&vec.data, &centroid.data)
                        .unwrap_or(0.0)
                })
                .sum();
            stats.push(ClusterStats {
                cluster_id: i,
                count,
                avg_distance: if count > 0 {
                    total_dist / count as f32
                } else {
                    0.0
                },
            });
        }
        stats
    }

    /// Merges every cluster with fewer than `threshold` members into its
    /// nearest non-sparse cluster, drops the emptied clusters, updates
    /// `n_clusters`, and recomputes the centroid of every surviving
    /// non-empty cluster as the mean of its members.
    ///
    /// Returns the number of clusters removed. No-op (Ok(0)) when the index
    /// is not finalized, has no centroids, nothing is sparse, or *everything*
    /// is sparse (there would be no merge target).
    pub fn merge_sparse_clusters(&mut self, threshold: usize) -> Result<usize> {
        if !self.is_finalized || self.centroids.is_empty() {
            return Ok(0);
        }
        let stats = self.get_cluster_stats();
        let sparse_indices: Vec<usize> = stats
            .iter()
            .filter(|s| s.count < threshold)
            .map(|s| s.cluster_id)
            .collect();
        if sparse_indices.is_empty() || sparse_indices.len() == self.centroids.len() {
            return Ok(0);
        }
        let non_sparse_indices: Vec<usize> = stats
            .iter()
            .filter(|s| s.count >= threshold)
            .map(|s| s.cluster_id)
            .collect();
        let merged_count = sparse_indices.len();
        // Phase 1: decide, for each sparse cluster, the nearest non-sparse
        // target by centroid distance (defaults to the first target if every
        // metric call fails).
        let mut moves = Vec::new();
        for &sparse_idx in &sparse_indices {
            let sparse_centroid = &self.centroids[sparse_idx];
            let mut best_target = non_sparse_indices[0];
            let mut best_dist = f32::INFINITY;
            for &target_idx in &non_sparse_indices {
                if let Ok(dist) = self
                    .index_config
                    .distance_metric
                    .distance(&sparse_centroid.data, &self.centroids[target_idx].data)
                    && dist < best_dist
                {
                    best_dist = dist;
                    best_target = target_idx;
                }
            }
            moves.push((sparse_idx, best_target));
        }
        // Phase 2: move the members; `take` leaves the sparse lists empty.
        for (sparse_idx, target_idx) in moves {
            let mut vectors_to_move = std::mem::take(&mut self.inverted_lists[sparse_idx]);
            self.inverted_lists[target_idx].append(&mut vectors_to_move);
        }
        // Phase 3: compact — keep only non-sparse clusters, preserving order.
        let mut new_centroids = Vec::new();
        let mut new_inverted_lists = Vec::new();
        for i in 0..self.centroids.len() {
            if !sparse_indices.contains(&i) {
                new_centroids.push(self.centroids[i].clone());
                new_inverted_lists.push(std::mem::take(&mut self.inverted_lists[i]));
            }
        }
        self.centroids = new_centroids;
        self.inverted_lists = new_inverted_lists;
        self.index_config.n_clusters = self.centroids.len();
        // Phase 4: re-center surviving clusters on the mean of their
        // (possibly enlarged) member sets.
        for (i, list) in self.inverted_lists.iter().enumerate() {
            if !list.is_empty() {
                let dim = self.index_config.dimension;
                let mut sum = vec![0.0; dim];
                for (_, _, vec) in list {
                    for (j, &val) in vec.data.iter().enumerate() {
                        sum[j] += val;
                    }
                }
                let new_data: Vec<f32> = sum.iter().map(|&s| s / list.len() as f32).collect();
                self.centroids[i] = Vector::new(new_data);
            }
        }
        Ok(merged_count)
    }

    /// Splits every cluster with more than `threshold` members into two via a
    /// local k=2 k-means, replacing the original cluster with the two halves
    /// and updating `n_clusters`.
    ///
    /// Returns the number of additional clusters created. No-op (Ok(0)) when
    /// the index is not finalized, has no centroids, or nothing is dense.
    /// Clusters with fewer than 2 members are kept unchanged even if flagged.
    pub fn split_dense_clusters(&mut self, threshold: usize) -> Result<usize> {
        if !self.is_finalized || self.centroids.is_empty() {
            return Ok(0);
        }
        let stats = self.get_cluster_stats();
        let dense_indices: Vec<usize> = stats
            .iter()
            .filter(|s| s.count > threshold)
            .map(|s| s.cluster_id)
            .collect();
        if dense_indices.is_empty() {
            return Ok(0);
        }
        let mut additional_clusters = 0;
        let mut new_centroids = Vec::new();
        let mut new_inverted_lists = Vec::new();
        for i in 0..self.centroids.len() {
            if dense_indices.contains(&i) {
                let list = std::mem::take(&mut self.inverted_lists[i]);
                if list.len() < 2 {
                    // Cannot split a 0/1-element cluster; keep it as-is.
                    new_centroids.push(self.centroids[i].clone());
                    new_inverted_lists.push(list);
                    continue;
                }
                let (c1, l1, c2, l2) = self.split_cluster_kmeans_k2(list)?;
                new_centroids.push(c1);
                new_inverted_lists.push(l1);
                new_centroids.push(c2);
                new_inverted_lists.push(l2);
                additional_clusters += 1;
            } else {
                new_centroids.push(self.centroids[i].clone());
                new_inverted_lists.push(std::mem::take(&mut self.inverted_lists[i]));
            }
        }
        self.centroids = new_centroids;
        self.inverted_lists = new_inverted_lists;
        self.index_config.n_clusters = self.centroids.len();
        Ok(additional_clusters)
    }

    // Local 2-means: seed two centroids from two random (distinct when
    // possible) members, run 10 fixed Lloyd iterations on *copies* of the
    // vectors, then do a final assignment pass that moves the original
    // entries (with their real doc ids) into the two result lists.
    fn split_cluster_kmeans_k2(
        &self,
        vectors: Vec<(u64, String, Vector)>,
    ) -> Result<SplitClusterResult> {
        use rand::prelude::*;
        let mut rng = rand::rng();
        let idx1 = rng.random_range(0..vectors.len());
        let mut idx2 = rng.random_range(0..vectors.len());
        // Re-draw until the seeds differ (only possible with >= 2 vectors).
        while idx1 == idx2 && vectors.len() > 1 {
            idx2 = rng.random_range(0..vectors.len());
        }
        let mut c1 = vectors[idx1].2.clone();
        let mut c2 = vectors[idx2].2.clone();
        let mut l1 = Vec::new();
        let mut l2 = Vec::new();
        for _ in 0..10 {
            l1.clear();
            l2.clear();
            for (_, _, vec) in &vectors {
                let d1 = self
                    .index_config
                    .distance_metric
                    .distance(&vec.data, &c1.data)
                    .unwrap_or(f32::INFINITY);
                let d2 = self
                    .index_config
                    .distance_metric
                    .distance(&vec.data, &c2.data)
                    .unwrap_or(f32::INFINITY);
                // Placeholder ids/names during iteration; only the vector
                // matters for computing the means below.
                if d1 < d2 {
                    l1.push((0, String::new(), vec.clone()));
                } else {
                    l2.push((0, String::new(), vec.clone()));
                }
            }
            if !l1.is_empty() {
                c1 = self.calculate_mean_vector(&l1);
            }
            if !l2.is_empty() {
                c2 = self.calculate_mean_vector(&l2);
            }
        }
        // Final pass: partition the real entries by the converged centroids.
        l1.clear();
        l2.clear();
        for item in vectors {
            let d1 = self
                .index_config
                .distance_metric
                .distance(&item.2.data, &c1.data)
                .unwrap_or(f32::INFINITY);
            let d2 = self
                .index_config
                .distance_metric
                .distance(&item.2.data, &c2.data)
                .unwrap_or(f32::INFINITY);
            if d1 < d2 {
                l1.push(item);
            } else {
                l2.push(item);
            }
        }
        Ok((c1, l1, c2, l2))
    }

    // Component-wise mean of the vectors in `list`; the zero vector of the
    // configured dimension when the list is empty.
    fn calculate_mean_vector(&self, list: &[(u64, String, Vector)]) -> Vector {
        let dim = self.index_config.dimension;
        if list.is_empty() {
            return Vector::new(vec![0.0; dim]);
        }
        let mut sum = vec![0.0; dim];
        for (_, _, vec) in list {
            for (j, &val) in vec.data.iter().enumerate() {
                sum[j] += val;
            }
        }
        let data: Vec<f32> = sum.iter().map(|&s| s / list.len() as f32).collect();
        Vector::new(data)
    }
}
#[async_trait::async_trait]
impl VectorIndexWriter for IvfIndexWriter {
    // Next id that would be assigned; always greater than any id seen so far.
    fn next_vector_id(&self) -> u64 {
        self.next_vec_id
    }

    // Replaces the entire buffered vector set (unlike `add_vectors`, which
    // appends). Validates and optionally normalizes the input, bumps
    // `next_vec_id` past the largest incoming id, and shrinks `n_clusters`
    // when there are fewer vectors than clusters. Un-finalizes the index.
    fn build(&mut self, mut vectors: Vec<(u64, String, Vector)>) -> Result<()> {
        if self.is_finalized {
            self.is_finalized = false;
        }
        self.validate_vectors(&vectors)?;
        self.normalize_vectors(&mut vectors);
        if let Some(max_id) = vectors.iter().map(|(id, _, _)| *id).max()
            && max_id >= self.next_vec_id
        {
            self.next_vec_id = max_id + 1;
        }
        self.vectors = vectors;
        self.total_vectors_to_add = Some(self.vectors.len());
        if self.vectors.len() < self.index_config.n_clusters {
            // .max(1) keeps at least one cluster for a non-empty build.
            self.index_config.n_clusters = self.vectors.len().max(1);
        }
        self.check_memory_limit()?;
        Ok(())
    }

    // Appends vectors to the buffer (validating/normalizing like `build`).
    // NOTE(review): unlike `build`, this does not shrink n_clusters or update
    // total_vectors_to_add — confirm that is intentional.
    fn add_vectors(&mut self, mut vectors: Vec<(u64, String, Vector)>) -> Result<()> {
        if self.is_finalized {
            self.is_finalized = false;
        }
        self.validate_vectors(&vectors)?;
        self.normalize_vectors(&mut vectors);
        if let Some(max_id) = vectors.iter().map(|(id, _, _)| *id).max()
            && max_id >= self.next_vec_id
        {
            self.next_vec_id = max_id + 1;
        }
        self.vectors.extend(vectors);
        self.check_memory_limit()?;
        Ok(())
    }

    // Trains centroids and builds the inverted lists. Idempotent when already
    // finalized; errors on an empty buffer.
    fn finalize(&mut self) -> Result<()> {
        if self.is_finalized {
            return Ok(());
        }
        if self.vectors.is_empty() {
            return Err(LaurusError::InvalidOperation(
                "Cannot finalize empty index".to_string(),
            ));
        }
        self.train_centroids()?;
        self.build_inverted_lists()?;
        self.is_finalized = true;
        Ok(())
    }

    // Fraction of expected work done in [0.0, 1.0]. While unfinalized, the
    // ratio buffered/expected is capped at 0.99 so only finalize reports 1.0.
    fn progress(&self) -> f32 {
        if let Some(total) = self.total_vectors_to_add {
            if total == 0 {
                if self.is_finalized { 1.0 } else { 0.0 }
            } else {
                let current = self.vectors.len() as u64 as f32;
                let progress = current / total as f32;
                if self.is_finalized {
                    1.0
                } else {
                    progress.min(0.99)
                }
            }
        } else if self.is_finalized {
            1.0
        } else {
            0.0
        }
    }

    // Rough heap-footprint estimate in bytes: per-vector payload + struct
    // overhead, centroid data, inverted-list containers, and a flat 64-byte
    // metadata allowance per vector. Heuristic, not an exact accounting.
    fn estimated_memory_usage(&self) -> usize {
        let vector_memory = self.vectors.len()
            * (
                8 + self.index_config.dimension * 4 + std::mem::size_of::<Vector>()
            );
        let centroid_memory = self.centroids.len()
            * (self.index_config.dimension * 4 + std::mem::size_of::<Vector>());
        let inverted_list_memory =
            self.inverted_lists.len() * (std::mem::size_of::<Vec<(u64, String, Vector)>>() + 64);
        let metadata_memory = self.vectors.len() * 64;
        vector_memory + centroid_memory + inverted_list_memory + metadata_memory
    }

    // Trait-level accessor mirroring the inherent `vectors()` method.
    fn vectors(&self) -> &[(u64, String, Vector)] {
        &self.vectors
    }

    // Serializes the finalized index to `"{path}.ivf"` in the same layout
    // that `load` reads (see `load` for the format). Requires storage and a
    // finalized index; rejects more than u32::MAX vectors.
    fn write(&self) -> Result<()> {
        use std::io::Write;
        if !self.is_finalized {
            return Err(LaurusError::InvalidOperation(
                "Index must be finalized before writing".to_string(),
            ));
        }
        let storage = self
            .storage
            .as_ref()
            .ok_or_else(|| LaurusError::InvalidOperation("No storage configured".to_string()))?;
        let file_name = format!("{}.ivf", self.path);
        let mut output = storage.create_output(&file_name)?;
        let vector_count: u32 = self.vectors.len().try_into().map_err(|_| {
            LaurusError::InvalidOperation(format!(
                "Vector count {} exceeds u32::MAX",
                self.vectors.len()
            ))
        })?;
        // Header: counts and dimensions, all little-endian u32.
        output.write_all(&vector_count.to_le_bytes())?;
        output.write_all(&(self.index_config.dimension as u32).to_le_bytes())?;
        output.write_all(&(self.index_config.n_clusters as u32).to_le_bytes())?;
        output.write_all(&(self.index_config.n_probe as u32).to_le_bytes())?;
        // Centroid data: n_clusters x dimension f32 values.
        for centroid in &self.centroids {
            for value in &centroid.data {
                output.write_all(&value.to_le_bytes())?;
            }
        }
        // Length-prefixed inverted lists with per-entry id, name, and data.
        for list in &self.inverted_lists {
            output.write_all(&(list.len() as u32).to_le_bytes())?;
            for (doc_id, field_name, vector) in list {
                output.write_all(&doc_id.to_le_bytes())?;
                let field_name_bytes = field_name.as_bytes();
                output.write_all(&(field_name_bytes.len() as u32).to_le_bytes())?;
                output.write_all(field_name_bytes)?;
                for value in &vector.data {
                    output.write_all(&value.to_le_bytes())?;
                }
            }
        }
        output.flush()?;
        Ok(())
    }

    fn has_storage(&self) -> bool {
        self.storage.is_some()
    }

    // Removes every buffered entry with `doc_id` and un-finalizes the index
    // so the caller must re-finalize (inverted lists are not touched here;
    // they are rebuilt on the next finalize).
    fn delete_document(&mut self, doc_id: u64) -> Result<()> {
        if self.is_finalized {
            self.is_finalized = false;
        }
        self.vectors.retain(|(id, _, _)| *id != doc_id);
        Ok(())
    }

    // NOTE(review): stub — never deletes anything and always reports 0.
    // It also errors when finalized, whereas `delete_document` un-finalizes
    // instead; confirm the asymmetry is intended.
    fn delete_documents(&mut self, _field: &str, _value: &str) -> Result<usize> {
        if self.is_finalized {
            return Err(LaurusError::InvalidOperation(
                "Cannot delete documents from finalized index".to_string(),
            ));
        }
        Ok(0)
    }

    // Discards all buffered vectors and resets id allocation and the
    // finalized flag. Centroids/inverted lists are left as-is.
    fn rollback(&mut self) -> Result<()> {
        self.vectors.clear();
        self.is_finalized = false;
        self.next_vec_id = 0;
        Ok(())
    }

    // Number of vectors buffered but not yet folded into a finalized index.
    fn pending_docs(&self) -> u64 {
        if self.is_finalized {
            0
        } else {
            self.vectors.len() as u64
        }
    }

    // Drops the buffer and marks the writer finalized so `is_closed` holds.
    // NOTE(review): does not flush to storage — callers must `write()` first.
    fn close(&mut self) -> Result<()> {
        self.vectors.clear();
        self.is_finalized = true;
        Ok(())
    }

    fn is_closed(&self) -> bool {
        self.is_finalized && self.vectors.is_empty()
    }

    // Post-finalize maintenance: merges clusters below 1/4 of the average
    // occupancy (floored at 2) into neighbors, splits clusters above 4x the
    // average, then releases excess capacity. Requires a finalized index.
    fn optimize(&mut self) -> Result<()> {
        if !self.is_finalized {
            return Err(LaurusError::InvalidOperation(
                "Index must be finalized before optimization".to_string(),
            ));
        }
        println!("Optimizing IVF index...");
        let total_vectors = self.vectors.len();
        let avg_vectors_per_cluster = total_vectors / self.index_config.n_clusters.max(1);
        let sparse_threshold = avg_vectors_per_cluster / 4;
        let dense_threshold = avg_vectors_per_cluster * 4;
        let merged = self.merge_sparse_clusters(sparse_threshold.max(2))?;
        if merged > 0 {
            println!("Merged {} sparse clusters", merged);
        }
        let split = self.split_dense_clusters(dense_threshold)?;
        if split > 0 {
            println!("Split {} dense clusters", split);
        }
        self.vectors.shrink_to_fit();
        self.centroids.shrink_to_fit();
        for list in &mut self.inverted_lists {
            list.shrink_to_fit();
        }
        Ok(())
    }

    // Opens a reader over the on-disk index file; requires storage and
    // assumes `write()` has already persisted the index at `self.path`.
    fn build_reader(&self) -> Result<Arc<dyn crate::vector::reader::VectorIndexReader>> {
        use crate::vector::index::ivf::reader::IvfIndexReader;
        let storage = self.storage.as_ref().ok_or_else(|| {
            LaurusError::InvalidOperation("Cannot build reader: storage not configured".to_string())
        })?;
        let reader = IvfIndexReader::load(
            storage.as_ref(),
            &self.path,
            self.index_config.distance_metric,
        )?;
        Ok(Arc::new(reader))
    }
}