use crate::gpu::index_builder_types::{
ComputationCache, GpuDistanceMetric, GpuIndexBuildStats, GpuIndexBuilderConfig, HnswGraph,
HnswNode,
};
use crate::gpu::{GpuConfig, GpuDevice};
use anyhow::{anyhow, Result};
use parking_lot::Mutex;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::{debug, info, warn};
/// Builds an HNSW graph from vectors queued with `add_vector` and linked
/// together by `build`.
#[derive(Debug)]
pub struct GpuHnswIndexBuilder {
/// Build parameters (M, ef_construction, layer count, distance metric, ...).
pub(crate) config: GpuIndexBuilderConfig,
/// Description of the device looked up in `new`; returned by `device_info()`.
device_info: Arc<GpuDevice>,
/// `(id, vector)` pairs waiting for the next `build`; `add_vector` enforces a
/// common, non-zero dimension.
pub(crate) pending_vectors: Vec<(usize, Vec<f32>)>,
/// Layer-assignment multiplier, computed in `new` as `1 / ln(m)`.
ml_param: f64,
/// Build statistics read by `get_stats()` and written during `build()`.
stats: Arc<Mutex<GpuIndexBuildStats>>,
}
impl GpuHnswIndexBuilder {
/// Creates a builder for `config`, looking up the device named by
/// `config.gpu_device_id`.
///
/// The layer-assignment multiplier is derived from `config.m` as `1 / ln(m)`.
///
/// # Errors
///
/// Propagates the error returned by `GpuDevice::get_device_info` when the
/// device lookup fails.
pub fn new(config: GpuIndexBuilderConfig) -> Result<Self> {
    let device = GpuDevice::get_device_info(config.gpu_device_id)?;
    info!(
        "GPU HNSW builder initialized on device {} ({})",
        config.gpu_device_id, device.name
    );
    // `recip()` is defined as `1.0 / self`.
    let ml_param = (config.m as f64).ln().recip();
    Ok(Self {
        device_info: Arc::new(device),
        pending_vectors: Vec::new(),
        stats: Arc::new(Mutex::new(GpuIndexBuildStats::default())),
        ml_param,
        config,
    })
}
/// Creates a builder with default build settings, except for the device id and
/// stream count, which are taken from `gpu_config`.
///
/// # Errors
///
/// Same as [`Self::new`].
pub fn with_gpu_config(gpu_config: GpuConfig) -> Result<Self> {
    Self::new(GpuIndexBuilderConfig {
        gpu_device_id: gpu_config.device_id,
        num_streams: gpu_config.stream_count,
        ..Default::default()
    })
}
/// Queues `vector` under `id` for the next [`Self::build`].
///
/// # Errors
///
/// Fails if `vector` is empty, or if its length differs from the vectors
/// already queued.
pub fn add_vector(&mut self, id: usize, vector: Vec<f32>) -> Result<()> {
    if vector.is_empty() {
        return Err(anyhow!("Cannot add empty vector"));
    }
    // The first queued vector fixes the dimension for the whole batch.
    if let Some((_, reference)) = self.pending_vectors.first() {
        let expected_dim = reference.len();
        if expected_dim != vector.len() {
            return Err(anyhow!(
                "Vector dimension {} != expected {}",
                vector.len(),
                expected_dim
            ));
        }
    }
    self.pending_vectors.push((id, vector));
    Ok(())
}
/// Builds an HNSW graph from every vector queued with [`Self::add_vector`].
///
/// Vectors are inserted in the order they were queued. The first vector seeds
/// the graph as its entry point. Any later vector whose assigned layer exceeds
/// the current top layer becomes the new entry point, so the returned
/// `entry_point` is always a node that lives on `max_layer`. Neighbour ids
/// stored in the graph are positions in `nodes`, not the caller-supplied ids.
///
/// On success the pending queue is empty and [`Self::get_stats`] returns the
/// statistics of this build.
///
/// # Errors
///
/// Returns an error if no vectors are queued.
pub fn build(&mut self) -> Result<HnswGraph> {
    if self.pending_vectors.is_empty() {
        return Err(anyhow!("No vectors to build index from"));
    }
    let build_start = Instant::now();
    let num_vectors = self.pending_vectors.len();
    let dim = self.pending_vectors[0].1.len();
    info!(
        "Building GPU HNSW index: {} vectors, dim={}, M={}, ef_construction={}",
        num_vectors, dim, self.config.m, self.config.ef_construction
    );
    let layer_assignments = self.assign_layers(num_vectors);
    // A build always empties the queue, so move the vectors into the nodes
    // instead of cloning every one of them.
    let mut nodes: Vec<HnswNode> = std::mem::take(&mut self.pending_vectors)
        .into_iter()
        .zip(layer_assignments)
        .map(|((id, vector), max_layer)| HnswNode {
            id,
            vector,
            neighbors: vec![Vec::new(); max_layer + 1],
            max_layer,
        })
        .collect();

    let transfer_start = Instant::now();
    let _ = self.simulate_gpu_transfer(dim, num_vectors);
    let transfer_time_ms = transfer_start.elapsed().as_millis() as u64;
    self.stats.lock().transfer_time_ms = transfer_time_ms;

    let gpu_compute_start = Instant::now();
    let mut entry_point = 0;
    let mut current_max_layer = nodes[0].max_layer;
    for insert_idx in 1..num_vectors {
        self.insert_node(&mut nodes, insert_idx, entry_point, current_max_layer);
        // Promote the new node only after it has been linked into the layers it
        // shares with the existing graph; above those it is alone, so it must
        // become the entry point or those layers would be unreachable.
        let insert_max_layer = nodes[insert_idx].max_layer;
        if insert_max_layer > current_max_layer {
            entry_point = insert_idx;
            current_max_layer = insert_max_layer;
        }
    }
    let gpu_compute_ms = gpu_compute_start.elapsed().as_millis() as u64;

    let graph_assembly_start = Instant::now();
    let total_build_time = build_start.elapsed().as_millis() as u64;
    let throughput = if total_build_time > 0 {
        num_vectors as f64 * 1000.0 / total_build_time as f64
    } else {
        f64::INFINITY
    };
    // Guard against a zero batch size so the ceiling division cannot panic.
    let batch_size = self.config.batch_size.max(1);
    let final_stats = GpuIndexBuildStats {
        vectors_indexed: num_vectors,
        build_time_ms: total_build_time,
        gpu_compute_time_ms: gpu_compute_ms,
        transfer_time_ms,
        graph_assembly_time_ms: graph_assembly_start.elapsed().as_millis() as u64,
        batches_processed: (num_vectors + batch_size - 1) / batch_size,
        peak_gpu_memory_bytes: dim * num_vectors * 4,
        // Fixed placeholder value; utilisation is not measured here.
        gpu_utilization_pct: 85.0,
        throughput_vps: throughput,
        used_mixed_precision: self.config.mixed_precision,
        used_tensor_cores: self.config.tensor_cores,
    };
    // Publish the full statistics so `get_stats()` reflects this build.
    *self.stats.lock() = final_stats.clone();
    info!(
        "GPU HNSW build complete: {} vectors in {}ms ({:.1} vps)",
        num_vectors, total_build_time, throughput
    );
    Ok(HnswGraph {
        nodes,
        entry_point,
        max_layer: current_max_layer,
        config: self.config.clone(),
        stats: final_stats,
    })
}

/// Links `nodes[insert_idx]` into the graph whose entry point is `entry_point`
/// and whose top layer is `top_layer`.
fn insert_node(
    &self,
    nodes: &mut [HnswNode],
    insert_idx: usize,
    entry_point: usize,
    top_layer: usize,
) {
    let insert_max_layer = nodes[insert_idx].max_layer;
    let mut current_entry = entry_point;
    // Above the new node's own top layer: only descend greedily towards it.
    for layer in (insert_max_layer + 1..=top_layer).rev() {
        current_entry = self.greedy_search_layer(nodes, insert_idx, current_entry, layer, 1);
    }
    // Connect on every layer the new node shares with the existing graph. Layers
    // above `top_layer` stay empty; no other node exists there yet.
    for layer in (0..=insert_max_layer.min(top_layer)).rev() {
        let ef = if layer == 0 {
            self.config.ef_construction
        } else {
            self.config.ef_construction / 2
        };
        let candidates = self.search_layer_ef(nodes, insert_idx, current_entry, layer, ef);
        // Layer 0 allows twice as many links as the upper layers.
        let m_for_layer = if layer == 0 {
            self.config.m * 2
        } else {
            self.config.m
        };
        let selected =
            self.select_neighbors_heuristic(nodes, insert_idx, &candidates, m_for_layer, layer);
        nodes[insert_idx].neighbors[layer] = selected.clone();
        // Add the reverse edges, pruning any neighbour that now has too many links.
        for &neighbor_id in &selected {
            let links = &mut nodes[neighbor_id].neighbors;
            if layer >= links.len() || links[layer].contains(&insert_idx) {
                continue;
            }
            links[layer].push(insert_idx);
            if links[layer].len() > m_for_layer {
                let pruned = self.prune_neighbors(nodes, neighbor_id, layer, m_for_layer);
                nodes[neighbor_id].neighbors[layer] = pruned;
            }
        }
        // `candidates` is sorted nearest-first; start the next layer down there.
        if let Some(&(_, nearest)) = candidates.first() {
            current_entry = nearest;
        }
    }
}
pub fn get_stats(&self) -> GpuIndexBuildStats {
self.stats.lock().clone()
}
pub fn device_info(&self) -> &GpuDevice {
&self.device_info
}
/// Assigns a top layer to each of `num_vectors` nodes.
///
/// Node `i` gets `floor(-ln(r_i) * ml_param)`, where `r_i` is the
/// deterministic value `pseudo_random_01(i)`. The result is capped at
/// `config.num_layers - 1`, or at 0 when `num_layers` is 0.
pub(crate) fn assign_layers(&self, num_vectors: usize) -> Vec<usize> {
    let top_layer = self.config.num_layers.saturating_sub(1);
    let mut layers = Vec::with_capacity(num_vectors);
    for index in 0..num_vectors {
        let uniform = self.pseudo_random_01(index as u64);
        let level = (-uniform.ln() * self.ml_param).floor() as usize;
        layers.push(level.min(top_layer));
    }
    layers
}
/// Deterministically maps `seed` to a pseudo-random `f64` strictly inside (0, 1).
///
/// Applies one step of a 64-bit linear congruential generator (Knuth's MMIX
/// multiplier and increment) to `seed` and scales the high 32 bits of the state
/// into the unit interval.
fn pseudo_random_01(&self, seed: u64) -> f64 {
    let state = seed
        .wrapping_mul(6364136223846793005)
        .wrapping_add(1442695040888963407);
    // Keep exactly 32 bits so the numerator spans 0..=u32::MAX, matching the
    // denominator. With only 31 bits every result would sit at or below 0.5,
    // doubling the probability that `assign_layers` puts a node above layer 0.
    let high_bits = state >> 32;
    // The +1 / +2 offsets exclude both 0 and 1, so `ln()` of the result is finite.
    (high_bits as f64 + 1.0) / (u32::MAX as f64 + 2.0)
}
/// Walks `layer` greedily from `entry` towards `nodes[query_idx]`.
///
/// Each pass scans every neighbour of the current node and moves to the closest
/// one that is strictly nearer. The walk stops when a pass yields no
/// improvement, or when the current node has no adjacency list for `layer`.
/// Neighbour ids outside `nodes` are ignored. `_ef` is unused.
fn greedy_search_layer(
    &self,
    nodes: &[HnswNode],
    query_idx: usize,
    entry: usize,
    layer: usize,
    _ef: usize,
) -> usize {
    let query = &nodes[query_idx].vector;
    let mut best = entry;
    let mut best_dist = self.layer_distance(query, &nodes[best].vector);
    loop {
        let adjacent = match nodes[best].neighbors.get(layer) {
            Some(list) => list,
            None => return best,
        };
        let pass_start = best;
        for &candidate in adjacent.iter().filter(|&&c| c < nodes.len()) {
            let dist = self.layer_distance(query, &nodes[candidate].vector);
            if dist < best_dist {
                best_dist = dist;
                best = candidate;
            }
        }
        // Distances only ever decrease strictly, so returning to `pass_start`
        // is impossible; an unchanged `best` means no improvement this pass.
        if best == pass_start {
            return best;
        }
    }
}
/// Best-first beam search on `layer` for the nodes closest to `nodes[query_idx]`.
///
/// Returns up to `ef` `(distance, node_index)` pairs (at least the entry),
/// sorted by ascending distance. The query node itself is never returned
/// unless it is the entry. An `ef` of 0 is treated as 1.
fn search_layer_ef(
    &self,
    nodes: &[HnswNode],
    query_idx: usize,
    entry: usize,
    layer: usize,
    ef: usize,
) -> Vec<(f32, usize)> {
    let ef = ef.max(1);
    let query_vec = &nodes[query_idx].vector;
    let entry_dist = self.layer_distance(query_vec, &nodes[entry].vector);

    // Nodes still to expand, kept sorted by DESCENDING distance so that `pop()`
    // always yields the closest unexpanded node.
    let mut frontier: Vec<(f32, usize)> = vec![(entry_dist, entry)];
    // Best nodes found so far, kept sorted by ASCENDING distance, at most `ef` long.
    let mut results: Vec<(f32, usize)> = vec![(entry_dist, entry)];
    let mut visited = std::collections::HashSet::new();
    visited.insert(entry);
    // Never report the query node as its own neighbour.
    visited.insert(query_idx);

    while let Some((c_dist, c_node)) = frontier.pop() {
        let worst = results.last().map_or(f32::INFINITY, |r| r.0);
        // Expansion is closest-first, so once the nearest remaining candidate is
        // farther than the worst kept result, nothing left can improve `results`.
        if c_dist > worst && results.len() >= ef {
            break;
        }
        let adjacent = match nodes[c_node].neighbors.get(layer) {
            Some(list) => list,
            None => continue,
        };
        for &neighbor_id in adjacent {
            if neighbor_id >= nodes.len() || !visited.insert(neighbor_id) {
                continue;
            }
            let neighbor_dist = self.layer_distance(query_vec, &nodes[neighbor_id].vector);
            let worst = results.last().map_or(f32::INFINITY, |r| r.0);
            if results.len() < ef || neighbor_dist < worst {
                let at = results.partition_point(|r| r.0 <= neighbor_dist);
                results.insert(at, (neighbor_dist, neighbor_id));
                results.truncate(ef);
                let at = frontier.partition_point(|c| c.0 > neighbor_dist);
                frontier.insert(at, (neighbor_dist, neighbor_id));
            }
        }
    }
    results
}
/// Picks up to `m` neighbours for `nodes[query_idx]` out of `candidates`.
///
/// First pass, in ascending candidate-distance order: keep a candidate only if
/// it is no farther from the query than from every already-kept node. If that
/// yields fewer than `min(m, candidates.len())` nodes, a second pass tops up
/// with the nearest candidates not yet chosen. `_layer` is unused.
fn select_neighbors_heuristic(
    &self,
    nodes: &[HnswNode],
    query_idx: usize,
    candidates: &[(f32, usize)],
    m: usize,
    _layer: usize,
) -> Vec<usize> {
    if candidates.is_empty() {
        return Vec::new();
    }
    let mut by_distance = candidates.to_vec();
    by_distance.sort_by(|lhs, rhs| {
        lhs.0
            .partial_cmp(&rhs.0)
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    let query = &nodes[query_idx].vector;
    let mut chosen: Vec<usize> = Vec::with_capacity(m);

    // Pass 1: diversity filter.
    for &(_, cand) in &by_distance {
        if chosen.len() >= m {
            break;
        }
        let cand_vec = &nodes[cand].vector;
        let to_query = self.layer_distance(query, cand_vec);
        let diverse = chosen
            .iter()
            .all(|&picked| to_query <= self.layer_distance(cand_vec, &nodes[picked].vector));
        if diverse {
            chosen.push(cand);
        }
    }

    // Pass 2: top up with the nearest leftovers.
    let target = m.min(candidates.len());
    if chosen.len() < target {
        for &(_, cand) in &by_distance {
            if chosen.len() >= m {
                break;
            }
            if !chosen.contains(&cand) {
                chosen.push(cand);
            }
        }
    }
    chosen
}
fn prune_neighbors(
&self,
nodes: &[HnswNode],
node_idx: usize,
layer: usize,
max_m: usize,
) -> Vec<usize> {
let current_neighbors: Vec<(f32, usize)> = nodes[node_idx].neighbors[layer]
.iter()
.map(|&n_id| {
let dist = self.layer_distance(&nodes[node_idx].vector, &nodes[n_id].vector);
(dist, n_id)
})
.collect();
self.select_neighbors_heuristic(nodes, node_idx, ¤t_neighbors, max_m, layer)
}
/// Distance between `a` and `b` under `config.distance_metric`, in `f32`.
///
/// - Cosine (and CosineF16): `1 - cos(a, b)`, or `1.0` if either norm is below 1e-9.
/// - Euclidean (and EuclideanF16): L2 distance.
/// - InnerProduct: the negated dot product.
///
/// Pairs are formed with `zip`, so extra trailing elements are ignored.
fn layer_distance(&self, a: &[f32], b: &[f32]) -> f32 {
    let dot = || -> f32 { a.iter().zip(b).map(|(x, y)| x * y).sum() };
    match self.config.distance_metric {
        GpuDistanceMetric::InnerProduct => -dot(),
        GpuDistanceMetric::Euclidean | GpuDistanceMetric::EuclideanF16 => {
            let squared: f32 = a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum();
            squared.sqrt()
        }
        GpuDistanceMetric::Cosine | GpuDistanceMetric::CosineF16 => {
            let norm = |v: &[f32]| -> f32 { v.iter().map(|x| x * x).sum::<f32>().sqrt() };
            let (norm_a, norm_b) = (norm(a), norm(b));
            if norm_a < 1e-9 || norm_b < 1e-9 {
                1.0
            } else {
                1.0 - dot() / (norm_a * norm_b)
            }
        }
    }
}
/// Returns how long moving `num_vectors` `f32` vectors of length `dim` would
/// take over a modelled 10 GB/s link, capped at 10 ms.
///
/// Nothing is copied and nothing sleeps; the size is only logged at debug level.
fn simulate_gpu_transfer(&self, dim: usize, num_vectors: usize) -> Duration {
    let bytes = dim * num_vectors * 4;
    debug!(
        "GPU transfer simulation: {} bytes ({} vectors x {} dims x 4 bytes)",
        bytes, num_vectors, dim
    );
    // bytes / (10e9 bytes per second) seconds, expressed in nanoseconds.
    let modeled_ns = (bytes as f64 / 10e9 * 1e9) as u64;
    Duration::from_nanos(std::cmp::min(modeled_ns, 10_000_000))
}
}
/// Buffers vectors locally and hands them to an inner [`GpuHnswIndexBuilder`]
/// in micro-batches of `micro_batch_threshold` vectors.
#[derive(Debug)]
pub struct IncrementalGpuIndexBuilder {
    inner: GpuHnswIndexBuilder,
    micro_batch: Vec<(usize, Vec<f32>)>,
    micro_batch_threshold: usize,
    /// Number of vectors successfully handed to `inner`.
    total_committed: usize,
    // Initialised to `None` in `new` and not read anywhere in this impl.
    #[allow(dead_code)]
    base_graph: Option<HnswGraph>,
}
impl IncrementalGpuIndexBuilder {
    /// Creates a builder that flushes once `micro_batch_threshold` vectors are
    /// buffered. A threshold of 0 or 1 flushes on every `add_vector` call.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`GpuHnswIndexBuilder::new`].
    pub fn new(config: GpuIndexBuilderConfig, micro_batch_threshold: usize) -> Result<Self> {
        Ok(Self {
            inner: GpuHnswIndexBuilder::new(config)?,
            micro_batch: Vec::new(),
            micro_batch_threshold,
            total_committed: 0,
            base_graph: None,
        })
    }

    /// Buffers `vector` under `id`, flushing when the threshold is reached.
    ///
    /// # Errors
    ///
    /// Returns any error raised while flushing (see [`Self::flush_micro_batch`]).
    pub fn add_vector(&mut self, id: usize, vector: Vec<f32>) -> Result<()> {
        self.micro_batch.push((id, vector));
        if self.micro_batch.len() >= self.micro_batch_threshold {
            self.flush_micro_batch()?;
        }
        Ok(())
    }

    /// Moves every buffered vector into the inner builder.
    ///
    /// # Errors
    ///
    /// Stops at the first vector the inner builder rejects (empty vector or
    /// dimension mismatch) and returns that error. Vectors accepted before it
    /// stay committed and are counted; the rest of the batch is dropped.
    pub fn flush_micro_batch(&mut self) -> Result<()> {
        if self.micro_batch.is_empty() {
            return Ok(());
        }
        for (id, vector) in std::mem::take(&mut self.micro_batch) {
            self.inner.add_vector(id, vector)?;
            // Count each hand-off individually: `inner.pending_vectors` keeps
            // growing across flushes, so adding its length would re-count
            // earlier batches.
            self.total_committed += 1;
        }
        info!(
            "Flushing micro-batch, total committed: {}",
            self.total_committed
        );
        Ok(())
    }

    /// Flushes any buffered vectors and builds the graph with the inner builder.
    ///
    /// # Errors
    ///
    /// Returns flush errors or errors from [`GpuHnswIndexBuilder::build`].
    pub fn build(mut self) -> Result<HnswGraph> {
        self.flush_micro_batch()?;
        self.inner.build()
    }

    /// Number of vectors buffered locally and not yet flushed.
    pub fn pending_count(&self) -> usize {
        self.micro_batch.len()
    }

    /// Number of vectors successfully handed to the inner builder so far.
    pub fn total_committed(&self) -> usize {
        self.total_committed
    }
}
/// Computes full query-by-database distance matrices.
///
/// No GPU path exists in this type: `compute_distances` logs a warning and
/// evaluates every pair on the CPU.
#[derive(Debug)]
pub struct GpuBatchDistanceComputer {
    config: GpuIndexBuilderConfig,
    // Created empty in `new` and never consulted by the methods below.
    #[allow(dead_code)]
    computation_cache: ComputationCache,
}
impl GpuBatchDistanceComputer {
    /// Creates a computer that uses `config.distance_metric`.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok`.
    pub fn new(config: GpuIndexBuilderConfig) -> Result<Self> {
        Ok(Self {
            computation_cache: Arc::new(parking_lot::RwLock::new(std::collections::HashMap::new())),
            config,
        })
    }

    /// Returns `out[i][j] = distance(queries[i], database[j])`.
    ///
    /// Returns an empty matrix if either input is empty.
    ///
    /// # Errors
    ///
    /// Fails if the first query and the first database row differ in length,
    /// or if any later pair differs in length.
    pub fn compute_distances(
        &self,
        queries: &[Vec<f32>],
        database: &[Vec<f32>],
    ) -> Result<Vec<Vec<f32>>> {
        let (first_query, first_row) = match (queries.first(), database.first()) {
            (Some(q), Some(d)) => (q, d),
            _ => return Ok(Vec::new()),
        };
        if first_query.len() != first_row.len() {
            return Err(anyhow!(
                "Query dimension {} != database dimension {}",
                first_query.len(),
                first_row.len()
            ));
        }
        warn!("GPU distance computation running in CPU fallback mode");
        self.compute_distances_cpu(queries, database)
    }

    /// Row-by-row CPU evaluation; stops at the first pair with mismatched lengths.
    fn compute_distances_cpu(
        &self,
        queries: &[Vec<f32>],
        database: &[Vec<f32>],
    ) -> Result<Vec<Vec<f32>>> {
        let metric = self.config.distance_metric;
        let mut matrix = Vec::with_capacity(queries.len());
        for query in queries {
            let mut row = Vec::with_capacity(database.len());
            for entry in database {
                row.push(Self::compute_single_distance(metric, query, entry)?);
            }
            matrix.push(row);
        }
        Ok(matrix)
    }

    /// Distance between `a` and `b` under `metric`, computed in `f32`.
    ///
    /// Cosine variants give `1 - cos(a, b)` (or `1.0` when either norm is below
    /// 1e-9), Euclidean variants give the L2 distance, and InnerProduct gives the
    /// negated dot product.
    ///
    /// # Errors
    ///
    /// Fails when `a` and `b` have different lengths.
    fn compute_single_distance(metric: GpuDistanceMetric, a: &[f32], b: &[f32]) -> Result<f32> {
        if a.len() != b.len() {
            return Err(anyhow!("Dimension mismatch: {} != {}", a.len(), b.len()));
        }
        let pairs = || a.iter().zip(b.iter());
        let dot = || pairs().map(|(x, y)| x * y).sum::<f32>();
        Ok(match metric {
            GpuDistanceMetric::InnerProduct => -dot(),
            GpuDistanceMetric::Euclidean | GpuDistanceMetric::EuclideanF16 => pairs()
                .map(|(x, y)| (x - y).powi(2))
                .sum::<f32>()
                .sqrt(),
            GpuDistanceMetric::Cosine | GpuDistanceMetric::CosineF16 => {
                let na = a.iter().map(|x| x * x).sum::<f32>().sqrt();
                let nb = b.iter().map(|x| x * x).sum::<f32>().sqrt();
                if na < 1e-9 || nb < 1e-9 {
                    1.0
                } else {
                    1.0 - dot() / (na * nb)
                }
            }
        })
    }
}
/// Heuristics for how many `f32` vectors to process per GPU batch.
#[derive(Debug, Clone)]
pub struct BatchSizeCalculator;
impl BatchSizeCalculator {
    /// Number of `vector_dim`-length `f32` vectors that fit in 75% of
    /// `gpu_memory_mb` MiB, clamped to `1..=65536`.
    ///
    /// Returns 1024 when `vector_dim` is 0.
    pub fn calculate_batch_size(vector_dim: usize, gpu_memory_mb: u64) -> usize {
        const MAX_BATCH: u64 = 65536;
        if vector_dim == 0 {
            return 1024;
        }
        // Only three quarters of the memory is treated as usable.
        let usable_bytes = (gpu_memory_mb as f64 * 1024.0 * 1024.0 * 0.75) as u64;
        let per_vector = vector_dim as u64 * 4;
        let fitting = usable_bytes / per_vector;
        std::cmp::max(std::cmp::min(fitting, MAX_BATCH) as usize, 1)
    }

    /// Largest `n` with `4·n² + 4·dim·n` at most 70% of `memory_mb` MiB, clamped
    /// to `1..=65536`. Presumably this models `n` vectors of `dim` `f32`s plus an
    /// `n × n` `f32` distance matrix.
    ///
    /// Returns 512 when `dim` is 0.
    pub fn optimal_batch_for_float32(dim: usize, memory_mb: u64) -> usize {
        if dim == 0 {
            return 512;
        }
        let budget = memory_mb as f64 * 1024.0 * 1024.0 * 0.70;
        // Positive root of qa·n² + qb·n + qc = 0.
        let (qa, qb, qc) = (4.0f64, 4.0 * dim as f64, -budget);
        let discriminant = qb * qb - 4.0 * qa * qc;
        if discriminant < 0.0 {
            return 1;
        }
        let root = (-qb + discriminant.sqrt()) / (2.0 * qa);
        (root.floor() as usize).clamp(1, 65536)
    }
}
/// Splits GPU memory (in MiB) into a reserved part and what remains for batches.
#[derive(Debug, Clone)]
pub struct GpuMemoryBudget {
    pub total_mb: u64,
    pub reserved_mb: u64,
    /// `total_mb - reserved_mb`, floored at zero.
    pub available_mb: u64,
}
impl GpuMemoryBudget {
    /// Creates a budget; reserving more than the total leaves 0 MiB available.
    pub fn new(total_mb: u64, reserved_mb: u64) -> Self {
        let available_mb = total_mb.saturating_sub(reserved_mb);
        Self {
            total_mb,
            reserved_mb,
            available_mb,
        }
    }

    /// Whether `batch_size` vectors of `dim` `f32`s fit in the available memory.
    ///
    /// Sizes too large to represent in a `u64` byte count never fit. (The previous
    /// plain multiplication panicked in debug builds and wrapped in release builds.)
    pub fn can_fit_batch(&self, batch_size: usize, dim: usize) -> bool {
        const BYTES_PER_MB: u64 = 1024 * 1024;
        // If this saturates, the real capacity already exceeds any u64 byte count.
        let available_bytes = self.available_mb.saturating_mul(BYTES_PER_MB);
        let needed_bytes = (dim as u64)
            .checked_mul(4)
            .and_then(|per_vector| per_vector.checked_mul(batch_size as u64));
        match needed_bytes {
            Some(needed) => needed <= available_bytes,
            None => false,
        }
    }

    /// Bytes occupied by one `f32` vector of length `dim` (saturating at `u64::MAX`).
    pub fn bytes_per_vector(&self, dim: usize) -> u64 {
        (dim as u64).saturating_mul(4)
    }
}
/// Recommends batch sizes that respect a [`GpuMemoryBudget`].
#[derive(Debug, Clone)]
pub struct GpuIndexOptimizer {
    budget: GpuMemoryBudget,
}
impl GpuIndexOptimizer {
    /// Creates an optimizer whose budget is `total_mb` minus `reserved_mb` MiB.
    pub fn new(total_mb: u64, reserved_mb: u64) -> Self {
        let budget = GpuMemoryBudget::new(total_mb, reserved_mb);
        Self { budget }
    }

    /// The underlying memory budget.
    pub fn memory_budget(&self) -> &GpuMemoryBudget {
        let budget = &self.budget;
        budget
    }

    /// Batch size from [`BatchSizeCalculator::calculate_batch_size`] for the
    /// budget's available memory.
    pub fn recommend_batch_size(&self, vector_dim: usize) -> usize {
        let available_mb = self.budget.available_mb;
        BatchSizeCalculator::calculate_batch_size(vector_dim, available_mb)
    }

    /// Whether `batch_size` vectors of `vector_dim` `f32`s fit in the budget.
    pub fn batch_fits(&self, batch_size: usize, vector_dim: usize) -> bool {
        GpuMemoryBudget::can_fit_batch(&self.budget, batch_size, vector_dim)
    }
}
/// Output of [`PipelinedIndexBuilder::stage_a_prepare`]: row-major `f32` data
/// holding `num_vectors` rows of `dim` values.
#[derive(Debug)]
pub struct PreparedBatch {
    pub data: Vec<f32>,
    pub num_vectors: usize,
    pub dim: usize,
    pub prepared_at: std::time::Instant,
}
/// Output of [`PipelinedIndexBuilder::stage_b_compute`]; `distances[i]` is the
/// L2 norm of row `i` of `data`.
#[derive(Debug)]
pub struct ComputedBatch {
    pub distances: Vec<f32>,
    pub num_vectors: usize,
    pub dim: usize,
    pub data: Vec<f32>,
    pub computed_at: std::time::Instant,
}
/// Output of [`PipelinedIndexBuilder::stage_c_finalize`].
#[derive(Debug)]
pub struct IndexedBatch {
    pub neighbor_ids: Vec<Vec<usize>>,
    pub num_vectors: usize,
    pub finalized_at: std::time::Instant,
}
/// Three-stage prepare / compute / finalize pipeline over batches of vectors.
#[derive(Debug, Clone)]
pub struct PipelinedIndexBuilder;
impl PipelinedIndexBuilder {
    /// Treats `vectors` as a single vector and L2-normalises it. If its norm is
    /// not above 1e-9 it is copied unchanged.
    pub fn stage_a_prepare(vectors: &[f32]) -> PreparedBatch {
        let magnitude = vectors.iter().map(|v| v * v).sum::<f32>().sqrt();
        let data: Vec<f32> = if magnitude > 1e-9 {
            vectors.iter().map(|v| v / magnitude).collect()
        } else {
            vectors.to_vec()
        };
        PreparedBatch {
            data,
            num_vectors: 1,
            dim: vectors.len(),
            prepared_at: std::time::Instant::now(),
        }
    }

    /// Computes the L2 norm of each `dim`-length row of `batch.data`. Rows that
    /// run past the end of `data` are truncated (or empty, giving 0).
    pub fn stage_b_compute(batch: PreparedBatch) -> ComputedBatch {
        let len = batch.data.len();
        let mut distances = Vec::with_capacity(batch.num_vectors);
        for row in 0..batch.num_vectors {
            let lo = (row * batch.dim).min(len);
            let hi = (row * batch.dim + batch.dim).min(len);
            let squared: f32 = batch.data[lo..hi].iter().map(|v| v * v).sum();
            distances.push(squared.sqrt());
        }
        ComputedBatch {
            distances,
            num_vectors: batch.num_vectors,
            dim: batch.dim,
            data: batch.data,
            computed_at: std::time::Instant::now(),
        }
    }

    /// Ranks rows by `distances` (ascending, stable on ties) and gives every row
    /// the same list: the indices of the first `min(16, num_vectors)` ranked rows.
    pub fn stage_c_finalize(batch: ComputedBatch) -> IndexedBatch {
        let mut ranked: Vec<(usize, f32)> = batch.distances.iter().copied().enumerate().collect();
        ranked.sort_by(|x, y| x.1.partial_cmp(&y.1).unwrap_or(std::cmp::Ordering::Equal));
        let keep = batch.num_vectors.min(16);
        let nearest: Vec<usize> = ranked.iter().take(keep).map(|&(idx, _)| idx).collect();
        IndexedBatch {
            neighbor_ids: vec![nearest; batch.num_vectors],
            num_vectors: batch.num_vectors,
            finalized_at: std::time::Instant::now(),
        }
    }
}