use crate::error::Result;
use crate::types::RowId;
use crate::distance::DistanceKind;
use std::path::PathBuf;
use std::collections::HashMap;
use super::{
FreshVamanaGraph, FreshGraphConfig,
VamanaSSTFile, Candidate, VectorNode,
MultiLevelSearch, CompactionStrategy, CompactionTrigger,
};
/// Configuration for the FreshDiskANN LSM-style vector index.
#[derive(Debug, Clone)]
pub struct FreshDiskANNConfig {
/// Settings for the in-memory fresh Vamana graph (L0).
pub fresh_config: FreshGraphConfig,
/// Policy deciding when L1 SSTs are compacted into the merged L2 index.
pub compaction_trigger: CompactionTrigger,
/// Directory where L1/L2 SST files are created.
pub data_dir: PathBuf,
}
impl Default for FreshDiskANNConfig {
fn default() -> Self {
Self {
fresh_config: FreshGraphConfig::default(),
compaction_trigger: CompactionTrigger::default(),
data_dir: PathBuf::from("."),
}
}
}
/// LSM-style DiskANN index: a mutable in-memory "fresh" graph (L0) that is
/// flushed into immutable on-disk L1 SSTs, which are periodically compacted
/// into a single merged L2 SST.
pub struct FreshDiskANNIndex {
// L0: in-memory Vamana graph receiving all new inserts.
fresh_graph: FreshVamanaGraph,
// L1: SST files produced by flushing the fresh graph.
level1_ssts: Vec<VamanaSSTFile>,
// L2: single merged SST produced by compaction, if any has run.
merged_index: Option<VamanaSSTFile>,
config: FreshDiskANNConfig,
// Distance metric shared by all levels.
metric: DistanceKind,
// Combines per-level search results into one ranked list.
multi_level: MultiLevelSearch,
// Decides when L1 SSTs should be merged into L2.
compaction: CompactionStrategy,
}
impl FreshDiskANNIndex {
/// Builds an empty index from `config`, using `metric` for every distance
/// computation. No files are touched until the first flush.
pub fn create(config: FreshDiskANNConfig, metric: DistanceKind) -> Result<Self> {
    let compaction = CompactionStrategy::new(config.compaction_trigger.clone());
    let fresh_graph = FreshVamanaGraph::new(config.fresh_config.clone(), metric);
    Ok(Self {
        fresh_graph,
        level1_ssts: Vec::new(),
        merged_index: None,
        multi_level: MultiLevelSearch::new(),
        compaction,
        config,
        metric,
    })
}
/// Inserts one vector into the in-memory fresh graph; if the graph reports
/// it is full afterwards, it is flushed to a new L1 SST.
pub fn insert(&mut self, id: RowId, vector: Vec<f32>) -> Result<()> {
    self.fresh_graph.insert(id, vector)?;
    match self.fresh_graph.should_flush() {
        true => self.flush_fresh_graph(),
        false => Ok(()),
    }
}
/// Bulk insert into the fresh graph; a no-op for an empty slice. Flushes to
/// an L1 SST afterwards if the graph is full.
pub fn batch_insert(&mut self, vectors: &[(RowId, Vec<f32>)]) -> Result<()> {
    if !vectors.is_empty() {
        self.fresh_graph.batch_insert(vectors)?;
        if self.fresh_graph.should_flush() {
            self.flush_fresh_graph()?;
        }
    }
    Ok(())
}
/// k-NN search across all levels (fresh graph, every L1 SST, and the L2
/// merged index), merging per-level results into one top-k list.
///
/// The caller-supplied `ef` is only a lower bound: it is widened based on
/// `k`, and each level is queried for more than `k` hits (`expanded_k`,
/// scaled by total index size) so the final merge has enough candidates.
pub fn search(&self, query: &[f32], k: usize, ef: usize) -> Result<Vec<Candidate>> {
    // Minimum beam width grows with k.
    let ef_floor = match k {
        0..=10 => 80,
        11..=30 => 100,
        _ => 120,
    };
    let ef = ef.max(k * 3).max(ef_floor);
    // Total live vectors across all levels, used to size per-level over-fetch.
    let total_vectors = self.fresh_graph.node_count()
        + self.level1_ssts.iter().map(|s| s.active_node_count()).sum::<usize>()
        + self.merged_index.as_ref().map(|m| m.active_node_count()).unwrap_or(0);
    let expanded_k = match total_vectors {
        0..=999 => (k * 3).max(k + 30).min(200),
        1_000..=99_999 => (k * 5).max(k + 50).min(500),
        _ => (k * 10).max(k + 100).min(2000),
    };
    // L0: in-memory fresh graph.
    let fresh_results = if self.fresh_graph.is_empty() {
        Vec::new()
    } else {
        self.fresh_graph.search(query, expanded_k, ef)?
    };
    // L1: every SST contributes candidates.
    let mut l1_results = Vec::new();
    for sst in &self.level1_ssts {
        l1_results.extend(sst.search(query, expanded_k, ef)?);
    }
    // L2: merged index, if a compaction has produced one.
    let l2_results = match self.merged_index {
        Some(ref merged) => merged.search(query, expanded_k, ef)?,
        None => Vec::new(),
    };
    self.multi_level.merge(fresh_results, l1_results, l2_results, k)
}
/// Flushes the in-memory fresh graph into a new L1 SST file, clears the
/// graph, and triggers an L1→L2 compaction when the strategy asks for one.
/// A completely empty fresh graph is skipped without creating a file.
fn flush_fresh_graph(&mut self) -> Result<()> {
    println!("[FreshDiskANN] Flushing Fresh Graph to L1 SST...");
    let nodes = self.fresh_graph.export_nodes()?;
    let medoid = self.fresh_graph.medoid();
    if nodes.is_empty() {
        println!("[FreshDiskANN] Fresh Graph is empty, skip flush");
        return Ok(());
    }
    // File name is derived from the current number of L1 SSTs.
    let sst_path = self
        .config
        .data_dir
        .join(format!("l1_{:06}.sst", self.level1_ssts.len()));
    let sst_file = VamanaSSTFile::create(&sst_path, nodes, medoid)?;
    println!(
        "[FreshDiskANN] Created L1 SST: {:?}, {} nodes",
        sst_path,
        sst_file.metadata().node_count
    );
    self.level1_ssts.push(sst_file);
    self.fresh_graph.clear()?;
    if self.compaction.should_compact(&self.level1_ssts) {
        self.compact_l1_to_l2()?;
    }
    Ok(())
}
fn compact_l1_to_l2(&mut self) -> Result<()> {
println!("[FreshDiskANN] Compacting {} L1 SSTs to L2...", self.level1_ssts.len());
if self.level1_ssts.is_empty() {
return Ok(());
}
let mut all_nodes = Vec::new();
let mut total_before = 0usize;
let mut active_after = 0usize;
for sst in &self.level1_ssts {
let node_count = sst.metadata().node_count as usize;
total_before += node_count;
let nodes = sst.export_active_nodes()?;
active_after += nodes.len();
all_nodes.extend(nodes);
}
if let Some(ref l2_sst) = self.merged_index {
let l2_nodes = l2_sst.export_active_nodes()?;
total_before += l2_sst.metadata().node_count as usize;
active_after += l2_nodes.len();
all_nodes.extend(l2_nodes);
}
println!(
"[FreshDiskANN] Compaction: collected {} active nodes (removed {} tombstones)",
active_after,
total_before - active_after
);
if all_nodes.is_empty() {
println!("[FreshDiskANN] No active nodes after compaction, skip");
return Ok(());
}
use std::collections::HashMap;
let mut dedup_map: HashMap<RowId, VectorNode> = HashMap::new();
for (row_id, node) in all_nodes {
dedup_map.insert(row_id, node);
}
let merged_nodes: Vec<_> = dedup_map.into_iter().collect();
println!("[FreshDiskANN] After dedup: {} unique nodes", merged_nodes.len());
let num_anchors = 8.min(merged_nodes.len()); let anchor_points = self.select_anchor_points(&merged_nodes, num_anchors)?;
let primary_medoid = anchor_points[0];
println!("[FreshDiskANN] Selected {} anchor points, primary medoid: {}",
anchor_points.len(), primary_medoid);
println!(" Anchor points: {:?}", anchor_points);
println!("[FreshDiskANN] Phase 9: Rebuilding graph with multi-anchor Vamana (conservative)...");
let rebuild_start = std::time::Instant::now();
let rebuilt_nodes = self.rebuild_graph_with_medoid_fallback(merged_nodes, primary_medoid, &anchor_points)?;
println!(
"[FreshDiskANN] Graph rebuild completed in {:.2?}",
rebuild_start.elapsed()
);
let l2_path = self.config.data_dir.join("l2_merged.sst");
if l2_path.exists() {
std::fs::remove_file(&l2_path)?;
}
let new_l2 = VamanaSSTFile::create(&l2_path, rebuilt_nodes, primary_medoid)?;
println!(
"[FreshDiskANN] L2 merged index created: {:?}, {} nodes",
l2_path,
new_l2.metadata().node_count
);
for sst in self.level1_ssts.drain(..) {
let path = sst.path().to_path_buf();
drop(sst);
if let Err(e) = std::fs::remove_file(&path) {
eprintln!("[FreshDiskANN] Failed to remove L1 SST {:?}: {}", path, e);
}
}
self.merged_index = Some(new_l2);
println!("[FreshDiskANN] Compaction complete");
Ok(())
}
/// Rebuilds the Vamana graph over `nodes` during compaction.
///
/// Strategy: seed every node with random bootstrap edges, then run one pass
/// of greedy search + top-k truncation (Round 1). When the configuration is
/// aggressive enough (max_degree >= 100 and search_list_size >= 500), a
/// second pass applies RobustPrune with reverse-edge insertion (Round 2).
/// Returns the same nodes with their `neighbors` lists replaced.
fn rebuild_graph_with_medoid_fallback(&self, nodes: Vec<(RowId, VectorNode)>, medoid: RowId, anchor_points: &[RowId]) -> Result<Vec<(RowId, VectorNode)>> {
let max_degree = self.config.fresh_config.max_degree;
let search_list_size = self.config.fresh_config.search_list_size;
let node_count = nodes.len();
// Log label only; the same thresholds gate Round 2 below.
let phase = if max_degree >= 100 && search_list_size >= 500 {
"Phase 10"
} else {
"Phase 9"
};
println!("[FreshDiskANN] {}: Graph rebuild (max_degree={}, search_list={})",
phase, max_degree, search_list_size);
use std::collections::HashMap;
// RowId -> position in `nodes`; used for all neighbor lookups below.
let mut id_to_idx: HashMap<RowId, usize> = HashMap::new();
for (idx, (id, _)) in nodes.iter().enumerate() {
id_to_idx.insert(*id, idx);
}
// Validate that the medoid exists; its index itself is unused here since
// searches start from the anchor set instead.
let _medoid_idx = *id_to_idx.get(&medoid).ok_or_else(|| {
crate::error::StorageError::InvalidData("Medoid not found".into())
})?;
// Anchors not present in `nodes` are silently dropped.
let anchor_indices: Vec<usize> = anchor_points.iter()
.filter_map(|&id| id_to_idx.get(&id).copied())
.collect();
// Adjacency lists under construction, indexed like `nodes`.
let mut graph: Vec<Vec<RowId>> = vec![Vec::new(); node_count];
println!("[FreshDiskANN] Phase 9: Bootstrap - Random initialization...");
// Seed up to 20 random neighbors per node so greedy search can traverse
// the graph before any optimized edges exist. Dedup may lower the actual
// degree slightly.
let bootstrap_neighbors = 20;
use rand::Rng;
let mut rng = rand::thread_rng();
for i in 0..node_count {
let mut random_neighbors = Vec::new();
for _ in 0..bootstrap_neighbors.min(node_count - 1) {
let random_idx = rng.gen_range(0..node_count);
if random_idx != i {
random_neighbors.push(nodes[random_idx].0);
}
}
random_neighbors.sort();
random_neighbors.dedup();
graph[i] = random_neighbors;
}
let bootstrap_avg_degree: f32 = graph.iter().map(|g| g.len()).sum::<usize>() as f32 / node_count as f32;
println!(" Bootstrap complete: avg degree = {:.2}", bootstrap_avg_degree);
println!("[FreshDiskANN] Round 1: Building base graph with top-k...");
// Round 1: for each node, greedy-search the evolving graph and keep the
// `max_degree` nearest results as its out-edges (no diversity pruning yet).
for (i, (id, node)) in nodes.iter().enumerate() {
if (i + 1) % 1000 == 0 {
println!(" Round 1: {} / {} nodes...", i + 1, node_count);
}
let candidates = self.multi_anchor_greedy_search(
&node.vector,
&nodes,
&graph,
&id_to_idx,
search_list_size,
*id,
&anchor_indices,
)?;
let mut candidates_sorted = candidates;
candidates_sorted.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
let pruned_neighbors: Vec<RowId> = candidates_sorted
.into_iter()
.take(max_degree)
.map(|(id, _)| id)
.collect();
graph[i] = pruned_neighbors;
}
let avg_degree_round1: f32 = graph.iter().map(|g| g.len()).sum::<usize>() as f32 / node_count as f32;
println!(" Round 1 complete: avg degree = {:.2}", avg_degree_round1);
// Round 2 (aggressive configs only): re-search and apply RobustPrune for
// edge diversity, then insert reverse edges; a neighbor whose list is full
// is itself re-pruned with the new edge as a candidate.
if max_degree >= 100 && search_list_size >= 500 {
println!("[FreshDiskANN] Round 2: RobustPrune optimization...");
for (i, (id, node)) in nodes.iter().enumerate() {
if (i + 1) % 1000 == 0 {
println!(" Round 2: {} / {} nodes...", i + 1, node_count);
}
let candidates = self.multi_anchor_greedy_search(
&node.vector,
&nodes,
&graph,
&id_to_idx,
search_list_size,
*id,
&anchor_indices,
)?;
let pruned_neighbors = self.robust_prune(
&node.vector,
candidates,
max_degree,
1.2,
&nodes,
&id_to_idx,
)?;
graph[i] = pruned_neighbors.clone();
// Reverse-edge maintenance for each new out-neighbor.
for &neighbor_id in &pruned_neighbors {
if let Some(&neighbor_idx) = id_to_idx.get(&neighbor_id) {
if graph[neighbor_idx].len() < max_degree && !graph[neighbor_idx].contains(id) {
graph[neighbor_idx].push(*id);
} else if graph[neighbor_idx].len() >= max_degree {
// Neighbor list is full: recompute distances to its current
// edges plus the new candidate, then RobustPrune down.
let neighbor_node = &nodes[neighbor_idx];
let mut new_candidates: Vec<(RowId, f32)> = graph[neighbor_idx]
.iter()
.map(|&nid| {
let nidx = id_to_idx[&nid];
let dist = self.compute_distance(&neighbor_node.1.vector, &nodes[nidx].1.vector);
(nid, dist)
})
.collect();
let dist_to_current = self.compute_distance(&neighbor_node.1.vector, &node.vector);
new_candidates.push((*id, dist_to_current));
let updated_neighbors = self.robust_prune(
&neighbor_node.1.vector,
new_candidates,
max_degree,
1.2,
&nodes,
&id_to_idx,
)?;
graph[neighbor_idx] = updated_neighbors;
}
}
}
}
let avg_degree_round2: f32 = graph.iter().map(|g| g.len()).sum::<usize>() as f32 / node_count as f32;
println!(" Round 2 complete: avg degree = {:.2}", avg_degree_round2);
}
let empty_neighbors = graph.iter().filter(|g| g.is_empty()).count();
let avg_degree_final: f32 = graph.iter().map(|g| g.len()).sum::<usize>() as f32 / node_count as f32;
println!(" Final graph: avg degree = {:.2}, empty = {}", avg_degree_final, empty_neighbors);
// Write the computed adjacency lists back into the nodes.
let mut rebuilt_nodes = Vec::with_capacity(node_count);
for (i, (id, node)) in nodes.into_iter().enumerate() {
let mut updated_node = node;
updated_node.neighbors = graph[i].clone();
rebuilt_nodes.push((id, updated_node));
}
println!("[FreshDiskANN] Graph rebuild complete!");
Ok(rebuilt_nodes)
}
/// Older single-round variant of the compaction graph rebuild (currently
/// unused; `rebuild_graph_with_medoid_fallback` is the live path).
///
/// Same shape as the live version: random bootstrap edges, then one round
/// of multi-anchor greedy search + RobustPrune (alpha=2.5) with
/// reverse-edge maintenance. Degree is hard-coded to 64 here rather than
/// taken from the configuration.
#[allow(dead_code)]
fn rebuild_graph_with_anchors(&self, nodes: Vec<(RowId, VectorNode)>, anchor_points: Vec<RowId>) -> Result<Vec<(RowId, VectorNode)>> {
let max_degree = 64; let node_count = nodes.len();
println!(
"[FreshDiskANN] Phase 9: Rebuilding graph with multi-anchor Vamana...",
);
println!(
" Nodes: {}, Max Degree: {} (increased from 32), Anchors: {}",
node_count, max_degree, anchor_points.len()
);
use std::collections::HashMap;
// RowId -> position in `nodes`.
let mut id_to_idx: HashMap<RowId, usize> = HashMap::new();
for (idx, (id, _)) in nodes.iter().enumerate() {
id_to_idx.insert(*id, idx);
}
let anchor_indices: Vec<usize> = anchor_points.iter()
.filter_map(|&id| id_to_idx.get(&id).copied())
.collect();
// Unlike the live path, this variant requires at least one valid anchor.
if anchor_indices.is_empty() {
return Err(crate::error::StorageError::InvalidData("No valid anchor points".into()));
}
println!(" Using {} anchor indices: {:?}", anchor_indices.len(), &anchor_indices[..anchor_indices.len().min(5)]);
let mut graph: Vec<Vec<RowId>> = vec![Vec::new(); node_count];
println!("[FreshDiskANN] Phase 9: Bootstrap - Random initialization...");
// Random seeding so greedy search can traverse before optimization.
let bootstrap_neighbors = 20;
use rand::Rng;
let mut rng = rand::thread_rng();
for i in 0..node_count {
let mut random_neighbors = Vec::new();
for _ in 0..bootstrap_neighbors.min(node_count - 1) {
let random_idx = rng.gen_range(0..node_count);
if random_idx != i {
random_neighbors.push(nodes[random_idx].0);
}
}
random_neighbors.sort();
random_neighbors.dedup();
graph[i] = random_neighbors;
}
let bootstrap_avg_degree: f32 = graph.iter().map(|g| g.len()).sum::<usize>() as f32 / node_count as f32;
println!(" Bootstrap complete: avg degree = {:.2}", bootstrap_avg_degree);
println!("[FreshDiskANN] Phase 9: Round 1 - Multi-anchor Vamana optimization...");
// Beam width fixed at 2x degree in this variant.
let search_list_size = (max_degree as f32 * 2.0) as usize;
for (i, (id, node)) in nodes.iter().enumerate() {
if (i + 1) % 1000 == 0 {
println!(" Round 1: {} / {} nodes...", i + 1, node_count);
}
let candidates = self.multi_anchor_greedy_search(
&node.vector,
&nodes,
&graph,
&id_to_idx,
search_list_size,
*id,
&anchor_indices,
)?;
let candidates_count2 = candidates.len();
// NOTE: forward pruning uses a loose alpha (2.5); reverse-edge pruning
// below uses the tighter 1.2.
let pruned_neighbors = self.robust_prune(
&node.vector,
candidates,
max_degree,
2.5, &nodes,
&id_to_idx,
)?;
// Debug insight for the first few nodes only.
if i < 5 {
println!(" Node {} candidates: {}, pruned: {}", i, candidates_count2, pruned_neighbors.len());
}
graph[i] = pruned_neighbors.clone();
// Reverse-edge maintenance, re-pruning full neighbor lists.
for &neighbor_id in &pruned_neighbors {
if let Some(&neighbor_idx) = id_to_idx.get(&neighbor_id) {
if graph[neighbor_idx].len() < max_degree && !graph[neighbor_idx].contains(id) {
graph[neighbor_idx].push(*id);
} else if graph[neighbor_idx].len() >= max_degree {
let neighbor_node = &nodes[neighbor_idx];
let mut new_candidates: Vec<(RowId, f32)> = graph[neighbor_idx]
.iter()
.map(|&nid| {
let nidx = id_to_idx[&nid];
let dist = self.compute_distance(&neighbor_node.1.vector, &nodes[nidx].1.vector);
(nid, dist)
})
.collect();
let dist_to_current = self.compute_distance(&neighbor_node.1.vector, &node.vector);
new_candidates.push((*id, dist_to_current));
let updated_neighbors = self.robust_prune(
&neighbor_node.1.vector,
new_candidates,
max_degree,
1.2,
&nodes,
&id_to_idx,
)?;
graph[neighbor_idx] = updated_neighbors;
}
}
}
}
println!("[FreshDiskANN] Round 1 complete!");
// Write the computed adjacency lists back into the nodes.
let mut rebuilt_nodes = Vec::with_capacity(node_count);
for (i, (id, node)) in nodes.into_iter().enumerate() {
let mut updated_node = node;
updated_node.neighbors = graph[i].clone();
rebuilt_nodes.push((id, updated_node));
}
println!("[FreshDiskANN] Vamana graph rebuild complete!");
Ok(rebuilt_nodes)
}
/// Greedy best-first search over the under-construction graph, started from
/// all anchor points simultaneously.
///
/// BUGFIX: restored `&current_idx` / `&current.id`, which had been mangled
/// into the mojibake `¤t_idx` / `¤t.id` (an HTML-entity corruption of
/// `&curren...`) and did not compile.
///
/// Pops the closest frontier node, records it (skipping `exclude_id`), and
/// expands its neighbors until `search_list_size` results are collected.
/// If graph connectivity is too sparse to fill the list, falls back to a
/// strided brute-force scan over `nodes`. Returns (id, distance) pairs
/// sorted by ascending distance, truncated to `search_list_size`.
fn multi_anchor_greedy_search(
    &self,
    query: &[f32],
    nodes: &[(RowId, VectorNode)],
    graph: &[Vec<RowId>],
    id_to_idx: &HashMap<RowId, usize>,
    search_list_size: usize,
    exclude_id: RowId,
    anchor_indices: &[usize],
) -> Result<Vec<(RowId, f32)>> {
    use std::collections::{BinaryHeap, HashSet};
    use std::cmp::Ordering;
    #[derive(Clone)]
    struct SearchCandidate {
        id: RowId,
        distance: f32,
    }
    impl PartialEq for SearchCandidate {
        fn eq(&self, other: &Self) -> bool {
            self.distance == other.distance
        }
    }
    impl Eq for SearchCandidate {}
    impl PartialOrd for SearchCandidate {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            // Reversed so BinaryHeap (a max-heap) pops the smallest distance.
            other.distance.partial_cmp(&self.distance)
        }
    }
    impl Ord for SearchCandidate {
        fn cmp(&self, other: &Self) -> Ordering {
            self.partial_cmp(other).unwrap_or(Ordering::Equal)
        }
    }
    let mut visited = HashSet::new();
    let mut candidates = BinaryHeap::new();
    // Seed the frontier with every anchor.
    for &anchor_idx in anchor_indices {
        let anchor_id = nodes[anchor_idx].0;
        let dist = self.compute_distance(query, &nodes[anchor_idx].1.vector);
        candidates.push(SearchCandidate {
            id: anchor_id,
            distance: dist,
        });
        visited.insert(anchor_id);
    }
    let mut result = Vec::new();
    while let Some(current) = candidates.pop() {
        if current.id != exclude_id {
            result.push((current.id, current.distance));
        }
        if result.len() >= search_list_size {
            break;
        }
        // Expand the popped node's neighbors into the frontier.
        if let Some(&current_idx) = id_to_idx.get(&current.id) {
            for &neighbor_id in &graph[current_idx] {
                if visited.contains(&neighbor_id) {
                    continue;
                }
                visited.insert(neighbor_id);
                if let Some(&neighbor_idx) = id_to_idx.get(&neighbor_id) {
                    let dist = self.compute_distance(query, &nodes[neighbor_idx].1.vector);
                    candidates.push(SearchCandidate {
                        id: neighbor_id,
                        distance: dist,
                    });
                }
            }
        }
    }
    // Sparse-graph fallback: sample the full node list at a fixed stride so
    // the candidate list can still be (approximately) filled.
    if result.len() < search_list_size {
        let max_brute_force = (search_list_size * 2).min(nodes.len());
        let step = if nodes.len() > max_brute_force {
            nodes.len() / max_brute_force
        } else {
            1
        };
        for (_idx, (id, node)) in nodes.iter().enumerate().step_by(step) {
            if *id == exclude_id || visited.contains(id) {
                continue;
            }
            let dist = self.compute_distance(query, &node.vector);
            result.push((*id, dist));
            visited.insert(*id);
            if result.len() >= max_brute_force {
                break;
            }
        }
    }
    result.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal));
    result.truncate(search_list_size);
    Ok(result)
}
/// Single-entry-point variant of the greedy search, starting only from the
/// medoid (currently unused; `multi_anchor_greedy_search` is the live path).
///
/// BUGFIX: restored `&current_idx` / `&current.id`, which had been mangled
/// into the mojibake `¤t_idx` / `¤t.id` (an HTML-entity corruption of
/// `&curren...`) and did not compile.
#[allow(dead_code)]
fn vamana_greedy_search(
    &self,
    query: &[f32],
    nodes: &[(RowId, VectorNode)],
    graph: &[Vec<RowId>],
    id_to_idx: &HashMap<RowId, usize>,
    search_list_size: usize,
    exclude_id: RowId,
    medoid_idx: usize,
) -> Result<Vec<(RowId, f32)>> {
    use std::collections::BinaryHeap;
    use std::cmp::Ordering;
    #[derive(Clone)]
    struct SearchCandidate {
        id: RowId,
        distance: f32,
    }
    impl PartialEq for SearchCandidate {
        fn eq(&self, other: &Self) -> bool {
            self.distance == other.distance
        }
    }
    impl Eq for SearchCandidate {}
    impl PartialOrd for SearchCandidate {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            // Reversed so BinaryHeap (a max-heap) pops the smallest distance.
            other.distance.partial_cmp(&self.distance)
        }
    }
    impl Ord for SearchCandidate {
        fn cmp(&self, other: &Self) -> Ordering {
            self.partial_cmp(other).unwrap_or(Ordering::Equal)
        }
    }
    // Frontier starts at the medoid only.
    let start_id = nodes[medoid_idx].0;
    let start_dist = self.compute_distance(query, &nodes[medoid_idx].1.vector);
    let mut visited = std::collections::HashSet::new();
    let mut candidates = BinaryHeap::new();
    candidates.push(SearchCandidate {
        id: start_id,
        distance: start_dist,
    });
    visited.insert(start_id);
    let mut result = Vec::new();
    while let Some(current) = candidates.pop() {
        if current.id != exclude_id {
            result.push((current.id, current.distance));
        }
        if result.len() >= search_list_size {
            break;
        }
        // Expand the popped node's neighbors into the frontier.
        if let Some(&current_idx) = id_to_idx.get(&current.id) {
            for &neighbor_id in &graph[current_idx] {
                if visited.contains(&neighbor_id) {
                    continue;
                }
                visited.insert(neighbor_id);
                if let Some(&neighbor_idx) = id_to_idx.get(&neighbor_id) {
                    let dist = self.compute_distance(query, &nodes[neighbor_idx].1.vector);
                    candidates.push(SearchCandidate {
                        id: neighbor_id,
                        distance: dist,
                    });
                }
            }
        }
    }
    // Fallback: linear scan over all nodes until the list is full.
    if result.len() < search_list_size {
        for (id, node) in nodes.iter() {
            if *id == exclude_id || visited.contains(id) {
                continue;
            }
            let dist = self.compute_distance(query, &node.vector);
            result.push((*id, dist));
            if result.len() >= search_list_size {
                break;
            }
        }
    }
    result.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal));
    result.truncate(search_list_size);
    Ok(result)
}
/// Vamana RobustPrune: walks candidates in ascending distance order and
/// keeps a candidate only if no already-kept neighbor "dominates" it, i.e.
/// lies within `alpha * candidate_dist` of it. Produces a diverse neighbor
/// list of at most `max_degree` entries. Candidates absent from `id_to_idx`
/// are skipped.
fn robust_prune(
    &self,
    _query: &[f32],
    mut candidates: Vec<(RowId, f32)>,
    max_degree: usize,
    alpha: f32, nodes: &[(RowId, VectorNode)],
    id_to_idx: &HashMap<RowId, usize>,
) -> Result<Vec<RowId>> {
    // Closest candidates are considered first.
    candidates.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
    let mut kept: Vec<RowId> = Vec::new();
    for (cand_id, cand_dist) in candidates {
        if kept.len() >= max_degree {
            break;
        }
        if let Some(&cand_idx) = id_to_idx.get(&cand_id) {
            let cand_vec = &nodes[cand_idx].1.vector;
            // A candidate is dominated when any selected neighbor sits
            // closer than alpha times the candidate's own distance.
            let dominated = kept.iter().any(|kept_id| match id_to_idx.get(kept_id) {
                Some(&kept_idx) => {
                    self.compute_distance(cand_vec, &nodes[kept_idx].1.vector)
                        < alpha * cand_dist
                }
                None => false,
            });
            if !dominated {
                kept.push(cand_id);
            }
        }
    }
    Ok(kept)
}
/// Unconditional two-round rebuild with hard-coded aggressive parameters
/// (degree 128, beam 1000). Currently unused; the live path is
/// `rebuild_graph_with_medoid_fallback`, which reads these values from the
/// configuration instead.
#[allow(dead_code)]
fn rebuild_graph_phase10(&self, nodes: Vec<(RowId, VectorNode)>, _medoid: RowId, anchor_points: &[RowId]) -> Result<Vec<(RowId, VectorNode)>> {
let max_degree = 128;
let search_list_size = 1000; let node_count = nodes.len();
println!("[FreshDiskANN] Phase 10: Two-round graph optimization...");
println!(" Nodes: {}, Max Degree: 128, Search List: 1000, Anchors: {}",
node_count, anchor_points.len());
use std::collections::HashMap;
// RowId -> position in `nodes`.
let mut id_to_idx: HashMap<RowId, usize> = HashMap::new();
for (idx, (id, _)) in nodes.iter().enumerate() {
id_to_idx.insert(*id, idx);
}
let anchor_indices: Vec<usize> = anchor_points.iter()
.filter_map(|&id| id_to_idx.get(&id).copied())
.collect();
let mut graph: Vec<Vec<RowId>> = vec![Vec::new(); node_count];
println!("[FreshDiskANN] Phase 10: Bootstrap initialization...");
// Larger random seed degree (30) than the Phase 9 variants (20).
let bootstrap_neighbors = 30;
use rand::Rng;
let mut rng = rand::thread_rng();
for i in 0..node_count {
let mut random_neighbors = Vec::new();
for _ in 0..bootstrap_neighbors.min(node_count - 1) {
let random_idx = rng.gen_range(0..node_count);
if random_idx != i {
random_neighbors.push(nodes[random_idx].0);
}
}
random_neighbors.sort();
random_neighbors.dedup();
graph[i] = random_neighbors;
}
let bootstrap_avg_degree: f32 = graph.iter().map(|g| g.len()).sum::<usize>() as f32 / node_count as f32;
println!(" Bootstrap complete: avg degree = {:.2}", bootstrap_avg_degree);
println!("[FreshDiskANN] Phase 10 Round 1: Building base graph...");
// Round 1: greedy search + top-k truncation (no diversity pruning).
for (i, (id, node)) in nodes.iter().enumerate() {
if (i + 1) % 1000 == 0 {
println!(" Round 1: {} / {} nodes...", i + 1, node_count);
}
let candidates = self.multi_anchor_greedy_search(
&node.vector,
&nodes,
&graph,
&id_to_idx,
search_list_size,
*id,
&anchor_indices,
)?;
let mut candidates_sorted = candidates;
candidates_sorted.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
let pruned_neighbors: Vec<RowId> = candidates_sorted
.into_iter()
.take(max_degree)
.map(|(id, _)| id)
.collect();
graph[i] = pruned_neighbors;
}
let avg_degree_round1: f32 = graph.iter().map(|g| g.len()).sum::<usize>() as f32 / node_count as f32;
println!(" Round 1 complete: avg degree = {:.2}", avg_degree_round1);
println!("[FreshDiskANN] Phase 10 Round 2: RobustPrune optimization...");
// Round 2: re-search and RobustPrune. Unlike the live path, this variant
// does not add reverse edges.
for (i, (id, node)) in nodes.iter().enumerate() {
if (i + 1) % 1000 == 0 {
println!(" Round 2: {} / {} nodes...", i + 1, node_count);
}
let candidates = self.multi_anchor_greedy_search(
&node.vector,
&nodes,
&graph,
&id_to_idx,
search_list_size,
*id,
&anchor_indices,
)?;
let pruned_neighbors = self.robust_prune(
&node.vector,
candidates,
max_degree,
1.2,
&nodes,
&id_to_idx,
)?;
graph[i] = pruned_neighbors;
}
let avg_degree_final: f32 = graph.iter().map(|g| g.len()).sum::<usize>() as f32 / node_count as f32;
let empty_neighbors = graph.iter().filter(|g| g.is_empty()).count();
println!(" Round 2 complete: avg degree = {:.2}, empty = {}", avg_degree_final, empty_neighbors);
// Write the computed adjacency lists back into the nodes.
let mut rebuilt_nodes = Vec::with_capacity(node_count);
for (i, (id, node)) in nodes.into_iter().enumerate() {
let mut updated_node = node;
updated_node.neighbors = graph[i].clone();
rebuilt_nodes.push((id, updated_node));
}
println!("[FreshDiskANN] Phase 10 graph rebuild complete!");
Ok(rebuilt_nodes)
}
/// Picks `k` anchor points via K-Means (at most 10 iterations), then maps
/// each final centroid to the nearest actual data point. Errors on an empty
/// node set; `k` is clamped to the node count.
///
/// BUGFIX: the centroid-shift call previously read `¢ers[c_idx]` —
/// mojibake (HTML-entity corruption) of `&centers[c_idx]` — which does not
/// compile. Restored the intended `&centers[c_idx]`.
fn select_anchor_points(&self, nodes: &[(RowId, VectorNode)], k: usize) -> Result<Vec<RowId>> {
    if nodes.is_empty() {
        return Err(crate::error::StorageError::InvalidData("No nodes to select anchors".into()));
    }
    let k = k.min(nodes.len());
    println!("[FreshDiskANN] Phase 9: Selecting {} anchor points using K-Means...", k);
    let dim = nodes[0].1.vector.len();
    // Initialize centers from k distinct random nodes.
    use rand::seq::SliceRandom;
    let mut rng = rand::thread_rng();
    let mut centers: Vec<Vec<f32>> = nodes
        .choose_multiple(&mut rng, k)
        .map(|(_, node)| node.vector.clone())
        .collect();
    let max_iterations = 10;
    for iter in 0..max_iterations {
        // Assignment step: each node joins its nearest center's cluster.
        let mut clusters: Vec<Vec<usize>> = vec![Vec::new(); k];
        for (idx, (_, node)) in nodes.iter().enumerate() {
            let mut min_dist = f32::MAX;
            let mut best_cluster = 0;
            for (c_idx, center) in centers.iter().enumerate() {
                let dist = self.compute_distance(&node.vector, center);
                if dist < min_dist {
                    min_dist = dist;
                    best_cluster = c_idx;
                }
            }
            clusters[best_cluster].push(idx);
        }
        // Update step: recompute each non-empty cluster's mean; declare
        // convergence when every center moved by at most 0.01.
        let mut converged = true;
        for (c_idx, cluster) in clusters.iter().enumerate() {
            if cluster.is_empty() {
                continue;
            }
            let mut new_center = vec![0.0f32; dim];
            for &node_idx in cluster {
                for (d, val) in nodes[node_idx].1.vector.iter().enumerate() {
                    new_center[d] += val;
                }
            }
            for d in 0..dim {
                new_center[d] /= cluster.len() as f32;
            }
            let shift = self.compute_distance(&centers[c_idx], &new_center);
            if shift > 0.01 {
                converged = false;
            }
            centers[c_idx] = new_center;
        }
        println!(" K-Means iteration {}: {} non-empty clusters", iter + 1,
            clusters.iter().filter(|c| !c.is_empty()).count());
        if converged {
            println!(" Converged after {} iterations", iter + 1);
            break;
        }
    }
    // Snap each centroid to the closest real node; those are the anchors.
    let mut anchor_points = Vec::new();
    for center in centers.iter() {
        let mut min_dist = f32::MAX;
        let mut best_id = nodes[0].0;
        for (id, node) in nodes.iter() {
            let dist = self.compute_distance(&node.vector, center);
            if dist < min_dist {
                min_dist = dist;
                best_id = *id;
            }
        }
        anchor_points.push(best_id);
    }
    println!("[FreshDiskANN] Selected {} anchor points: {:?}", anchor_points.len(), anchor_points);
    Ok(anchor_points)
}
/// Convenience wrapper: the single best anchor point serves as the medoid.
#[allow(dead_code)]
fn select_medoid(&self, nodes: &[(RowId, VectorNode)]) -> Result<RowId> {
    self.select_anchor_points(nodes, 1).map(|anchors| anchors[0])
}
// Thin wrapper delegating to the index's configured distance metric.
fn compute_distance(&self, v1: &[f32], v2: &[f32]) -> f32 {
self.metric.distance(v1, v2)
}
/// Forces any buffered in-memory vectors out to an L1 SST. A no-op when
/// the fresh graph holds nothing.
pub fn flush(&mut self) -> Result<()> {
    if self.fresh_graph.is_empty() {
        return Ok(());
    }
    self.flush_fresh_graph()
}
pub fn stats(&self) -> FreshDiskANNStats {
FreshDiskANNStats {
fresh_count: self.fresh_graph.node_count(),
fresh_memory: self.fresh_graph.memory_usage(),
l1_sst_count: self.level1_ssts.len(),
l1_total_nodes: self.level1_ssts.iter()
.map(|sst| sst.metadata().node_count)
.sum(),
l2_nodes: self.merged_index.as_ref()
.map(|sst| sst.metadata().node_count)
.unwrap_or(0),
}
}
}
/// Per-level size snapshot returned by [`FreshDiskANNIndex::stats`].
#[derive(Debug)]
pub struct FreshDiskANNStats {
// Vectors currently held in the in-memory fresh graph (L0).
pub fresh_count: usize,
// Approximate bytes used by the fresh graph.
pub fresh_memory: usize,
// Number of L1 SST files on disk.
pub l1_sst_count: usize,
// Total node count across all L1 SSTs (including tombstoned nodes).
pub l1_total_nodes: u64,
// Node count of the merged L2 index, or 0 if no compaction has run.
pub l2_nodes: u64,
}
use crate::index::builder::{IndexBuilder, BuildStats};
use crate::types::{Row, Value};
impl IndexBuilder for FreshDiskANNIndex {
/// IndexBuilder hook: extracts the first vector-typed value from each row
/// and feeds the batch into the index. Large batches (>= 1000) use the bulk
/// Vamana build; smaller ones insert incrementally.
fn build_from_memtable(&mut self, rows: &[(RowId, Row)]) -> Result<()> {
    use std::time::Instant;
    let start = Instant::now();
    let mut vectors: Vec<(RowId, Vec<f32>)> = Vec::with_capacity(rows.len());
    for (row_id, row) in rows {
        // Only the first vector column per row is indexed.
        let first_vector = row.iter().find_map(|value| match value {
            Value::Vector(v) => Some(v.to_vec()),
            _ => None,
        });
        if let Some(v) = first_vector {
            vectors.push((*row_id, v));
        }
    }
    if vectors.is_empty() {
        return Ok(());
    }
    println!("[FreshDiskANN] Batch building {} vectors", vectors.len());
    if vectors.len() >= 1000 {
        println!("[FreshDiskANN] Using batch Vamana build (>= 1000 vectors)");
        self.batch_vamana_build(&vectors)?;
    } else {
        println!("[FreshDiskANN] Using incremental insert (< 1000 vectors)");
        for (row_id, vec) in vectors {
            self.fresh_graph.insert(row_id, vec)?;
        }
        if self.fresh_graph.should_flush() {
            self.flush_fresh_graph()?;
        }
    }
    println!("[FreshDiskANN] Batch build complete in {:?}", start.elapsed());
    Ok(())
}
/// IndexBuilder hook: pushes any remaining in-memory vectors to disk and
/// logs how long that took.
fn persist(&mut self) -> Result<()> {
    let start = std::time::Instant::now();
    if !self.fresh_graph.is_empty() {
        self.flush_fresh_graph()?;
    }
    println!("[FreshDiskANN] Persist complete in {:?}", start.elapsed());
    Ok(())
}
// Human-readable builder name used in logs and diagnostics.
fn name(&self) -> &str {
"FreshDiskANN"
}
fn stats(&self) -> BuildStats {
let stats = self.stats();
BuildStats {
rows_processed: stats.fresh_count + stats.l1_total_nodes as usize,
build_time_ms: 0, persist_time_ms: 0,
index_size_bytes: stats.fresh_memory,
}
}
}
impl FreshDiskANNIndex {
/// Bulk-load path for large batches: inserts every vector into the
/// in-memory fresh graph, which builds its own Vamana connectivity.
///
/// CLEANUP: the previous version also ran an O(n^2) brute-force k-NN pass
/// into a local `graph_data` map whose contents were never read before
/// being dropped — that dead computation has been removed. Observable
/// behavior (log output and resulting index state) is unchanged.
fn batch_vamana_build(&mut self, vectors: &[(RowId, Vec<f32>)]) -> Result<()> {
    use std::time::Instant;
    let start = Instant::now();
    // Medoid selection is kept for its diagnostic value (logged below);
    // the fresh graph picks its own entry point internally.
    let medoid = self.select_medoid_from_batch(vectors)?;
    println!("[FreshDiskANN] Selected medoid: {}", medoid);
    for (row_id, vec) in vectors {
        self.fresh_graph.insert(*row_id, vec.clone())?;
    }
    let duration = start.elapsed();
    println!("[FreshDiskANN] Batch Vamana build: {} vectors in {:?}",
        vectors.len(), duration);
    Ok(())
}
/// Returns the RowId of the vector closest to the batch centroid.
/// Errors on an empty batch.
///
/// BUGFIX: the distance call previously read `¢er` — mojibake
/// (HTML-entity corruption) of `&center` — which does not compile.
/// Restored the intended `&center`.
fn select_medoid_from_batch(&self, vectors: &[(RowId, Vec<f32>)]) -> Result<RowId> {
    if vectors.is_empty() {
        return Err(crate::StorageError::InvalidData("Empty vector batch".into()));
    }
    // Component-wise mean of all vectors. NOTE(review): dimensionality is
    // taken from the first vector; a longer vector later in the batch would
    // index out of bounds — confirm upstream guarantees uniform dims.
    let dim = vectors[0].1.len();
    let mut center = vec![0.0f32; dim];
    for (_, vec) in vectors {
        for (i, &v) in vec.iter().enumerate() {
            center[i] += v;
        }
    }
    let count = vectors.len() as f32;
    for c in center.iter_mut() {
        *c /= count;
    }
    // Medoid = the actual data point nearest to the centroid.
    let mut min_dist = f32::MAX;
    let mut medoid = vectors[0].0;
    for (row_id, vec) in vectors {
        let dist = self.metric.distance(vec, &center);
        if dist < min_dist {
            min_dist = dist;
            medoid = *row_id;
        }
    }
    Ok(medoid)
}
}