use crate::gpu::{GpuConfig, GpuDevice};
use anyhow::{anyhow, Result};
use parking_lot::{Mutex, RwLock};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::{debug, info, warn};
/// Shared cache mapping an index pair — presumably (query batch, database batch) —
/// to a precomputed distance matrix.
/// NOTE(review): stored in `GpuBatchDistanceComputer` but never read or written
/// in this file — confirm intended use before relying on it.
type ComputationCache = Arc<RwLock<HashMap<(usize, usize), Vec<Vec<f32>>>>>;
/// Configuration for GPU-accelerated HNSW index construction.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuIndexBuilderConfig {
    // Maximum neighbors per node per layer (HNSW "M"); layer 0 uses 2*M.
    pub m: usize,
    // Candidate-pool width used while inserting nodes (halved on upper layers).
    pub ef_construction: usize,
    // Upper bound on the number of HNSW layers a node can be assigned to.
    pub num_layers: usize,
    // Device ordinal passed to `GpuDevice::get_device_info`.
    pub gpu_device_id: i32,
    // Vectors per GPU batch; in this file only used for the batch-count stat.
    pub batch_size: usize,
    // Request FP16/mixed-precision compute (recorded in build stats).
    pub mixed_precision: bool,
    // Request tensor-core kernels (recorded in build stats).
    pub tensor_cores: bool,
    // Number of concurrent GPU streams — forwarded from `GpuConfig`; not
    // consumed by the CPU paths visible here.
    pub num_streams: usize,
    // Memory budget for device allocations, in MiB.
    pub gpu_memory_budget_mb: usize,
    // Overlap host/device copies with compute — not consumed by the code here.
    pub async_transfers: bool,
    // Distance function used for both graph construction and search.
    pub distance_metric: GpuDistanceMetric,
}
impl Default for GpuIndexBuilderConfig {
    /// Defaults sized for a mid-range discrete GPU: an M=16 / ef=200 graph,
    /// 1024-vector batches, four streams, and a 4 GiB memory budget.
    fn default() -> Self {
        Self {
            // Graph shape.
            m: 16,
            ef_construction: 200,
            num_layers: 4,
            // Device selection and batching.
            gpu_device_id: 0,
            batch_size: 1024,
            num_streams: 4,
            gpu_memory_budget_mb: 4096,
            // Feature toggles.
            mixed_precision: true,
            tensor_cores: true,
            async_transfers: true,
            // Metric.
            distance_metric: GpuDistanceMetric::Cosine,
        }
    }
}
/// Distance metrics supported by the index. Every variant is oriented so that
/// *smaller means closer* (inner product is negated).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum GpuDistanceMetric {
    // 1 - cos(a, b); near-zero-norm inputs map to 1.0.
    Cosine,
    // L2 distance.
    Euclidean,
    // Negated dot product.
    InnerProduct,
    // Half-precision variants; the CPU paths in this file treat them
    // identically to their f32 counterparts.
    CosineF16,
    EuclideanF16,
}
/// Timing and resource statistics captured by one `build()` run.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct GpuIndexBuildStats {
    // Number of vectors in the built graph.
    pub vectors_indexed: usize,
    // Wall-clock time of the whole build.
    pub build_time_ms: u64,
    // Time spent in the insertion loop (CPU fallback in this file).
    pub gpu_compute_time_ms: u64,
    // Time of the (simulated) host→device transfer.
    pub transfer_time_ms: u64,
    pub graph_assembly_time_ms: u64,
    // Ceil(vectors / batch_size).
    pub batches_processed: usize,
    // Estimated as dim * vectors * 4 bytes (f32 storage).
    pub peak_gpu_memory_bytes: usize,
    // NOTE(review): hard-coded to 85.0 in `build()` — placeholder, not measured.
    pub gpu_utilization_pct: f32,
    // Vectors per second over the whole build.
    pub throughput_vps: f64,
    // Echo of the config flags in effect for this build.
    pub used_mixed_precision: bool,
    pub used_tensor_cores: bool,
}
/// One node of the HNSW graph.
#[derive(Debug, Clone)]
pub struct HnswNode {
    // Caller-supplied vector id (may differ from the node's position in `nodes`).
    pub id: usize,
    pub vector: Vec<f32>,
    // neighbors[layer] holds node *indices* into `HnswGraph::nodes` (not ids),
    // one list for each layer 0..=max_layer.
    pub neighbors: Vec<Vec<usize>>,
    // Highest layer this node participates in.
    pub max_layer: usize,
}
/// A built HNSW graph, searchable via [`HnswGraph::search_knn`].
#[derive(Debug)]
pub struct HnswGraph {
    pub nodes: Vec<HnswNode>,
    // Index into `nodes` where searches start (always 0 as built by this file).
    pub entry_point: usize,
    // Highest layer present anywhere in the graph.
    pub max_layer: usize,
    // Configuration the graph was built with (supplies the distance metric).
    pub config: GpuIndexBuilderConfig,
    pub stats: GpuIndexBuildStats,
}
impl HnswGraph {
    /// Searches for the `k` approximate nearest neighbors of `query`.
    ///
    /// Greedy descent through the upper layers locates a good entry node,
    /// then a best-effort expansion over layer 0 maintains a result pool of
    /// `ef.max(k)` candidates. Returns `(node_index, distance)` pairs sorted
    /// by ascending distance.
    ///
    /// # Errors
    /// Returns an error if `query` has a different dimension than the indexed
    /// vectors. An empty graph yields an empty result.
    pub fn search_knn(&self, query: &[f32], k: usize, ef: usize) -> Result<Vec<(usize, f32)>> {
        if self.nodes.is_empty() {
            return Ok(Vec::new());
        }
        if query.len() != self.nodes[0].vector.len() {
            return Err(anyhow!(
                "Query dimension {} != index dimension {}",
                query.len(),
                self.nodes[0].vector.len()
            ));
        }
        let entry = self.entry_point;
        let mut current_best = entry;
        let mut current_dist = self.compute_distance(query, &self.nodes[entry].vector);
        // Phase 1: hill-climb to the locally closest node on each upper layer
        // before dropping down to the next one.
        for layer in (1..=self.max_layer).rev() {
            let mut improved = true;
            while improved {
                improved = false;
                // The current node may not participate in this layer.
                if layer >= self.nodes[current_best].neighbors.len() {
                    break;
                }
                for &neighbor_id in &self.nodes[current_best].neighbors[layer] {
                    let neighbor_dist =
                        self.compute_distance(query, &self.nodes[neighbor_id].vector);
                    if neighbor_dist < current_dist {
                        current_dist = neighbor_dist;
                        current_best = neighbor_id;
                        improved = true;
                    }
                }
            }
        }
        // Phase 2: layer-0 expansion. `candidates` is a FIFO work list;
        // `w` is the bounded result pool of at most `search_ef` entries.
        let search_ef = ef.max(k);
        let mut candidates: Vec<(f32, usize)> = Vec::with_capacity(search_ef * 2);
        let mut visited: std::collections::HashSet<usize> =
            std::collections::HashSet::with_capacity(search_ef * 4);
        candidates.push((current_dist, current_best));
        visited.insert(current_best);
        let mut w: Vec<(f32, usize)> = vec![(current_dist, current_best)];
        let mut c_idx = 0;
        while c_idx < candidates.len() {
            let (c_dist, c_node) = candidates[c_idx];
            c_idx += 1;
            if !w.is_empty() {
                let w_max = w.iter().map(|x| x.0).fold(f32::NEG_INFINITY, f32::max);
                // Stop once the next candidate cannot improve a full pool.
                if c_dist > w_max && w.len() >= search_ef {
                    break;
                }
            }
            if self.nodes[c_node].neighbors.is_empty() {
                continue;
            }
            for &neighbor_id in &self.nodes[c_node].neighbors[0] {
                if visited.contains(&neighbor_id) {
                    continue;
                }
                visited.insert(neighbor_id);
                let neighbor_dist = self.compute_distance(query, &self.nodes[neighbor_id].vector);
                let w_max = if !w.is_empty() {
                    w.iter().map(|x| x.0).fold(f32::NEG_INFINITY, f32::max)
                } else {
                    f32::INFINITY
                };
                if neighbor_dist < w_max || w.len() < search_ef {
                    candidates.push((neighbor_dist, neighbor_id));
                    w.push((neighbor_dist, neighbor_id));
                    // Evict the current worst entry to keep |w| <= search_ef.
                    if w.len() > search_ef {
                        if let Some(max_pos) = w
                            .iter()
                            .enumerate()
                            .max_by(|a, b| {
                                a.1 .0
                                    .partial_cmp(&b.1 .0)
                                    .unwrap_or(std::cmp::Ordering::Equal)
                            })
                            .map(|(i, _)| i)
                        {
                            w.remove(max_pos);
                        }
                    }
                }
            }
        }
        w.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
        w.truncate(k);
        Ok(w.into_iter().map(|(dist, id)| (id, dist)).collect())
    }

    /// Query-time distance under the configured metric; smaller is closer for
    /// every metric (inner product is negated).
    fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
        match self.config.distance_metric {
            GpuDistanceMetric::Cosine | GpuDistanceMetric::CosineF16 => {
                let dot: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
                let norm_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
                let norm_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
                // Treat near-zero norms as maximally distant, using the SAME
                // 1e-9 threshold as the build-time `layer_distance` and the
                // batch `compute_single_distance`. The previous exact
                // `== 0.0` check let tiny-but-nonzero norms blow the
                // distance up at query time for vectors the build had
                // treated as degenerate.
                if norm_a < 1e-9 || norm_b < 1e-9 {
                    1.0
                } else {
                    1.0 - dot / (norm_a * norm_b)
                }
            }
            GpuDistanceMetric::Euclidean | GpuDistanceMetric::EuclideanF16 => a
                .iter()
                .zip(b.iter())
                .map(|(x, y)| (x - y).powi(2))
                .sum::<f32>()
                .sqrt(),
            GpuDistanceMetric::InnerProduct => {
                let dot: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
                -dot
            }
        }
    }
}
/// Builds HNSW graphs, staging vectors on the host until `build()` is called.
#[derive(Debug)]
pub struct GpuHnswIndexBuilder {
    config: GpuIndexBuilderConfig,
    // Queried once at construction; exposed via `device_info()`.
    device_info: Arc<GpuDevice>,
    // (id, vector) pairs staged for the next `build()`.
    pending_vectors: Vec<(usize, Vec<f32>)>,
    // HNSW level-generation factor mL = 1 / ln(M).
    ml_param: f64,
    // Shared so stats survive across phases; cloned out by `get_stats()`.
    stats: Arc<Mutex<GpuIndexBuildStats>>,
}
impl GpuHnswIndexBuilder {
    /// Creates a builder bound to the GPU device selected in `config`.
    ///
    /// # Errors
    /// Fails if the device info cannot be queried.
    pub fn new(config: GpuIndexBuilderConfig) -> Result<Self> {
        let device_info = Arc::new(GpuDevice::get_device_info(config.gpu_device_id)?);
        // Standard HNSW level-generation factor: mL = 1 / ln(M).
        let ml_param = 1.0 / (config.m as f64).ln();
        info!(
            "GPU HNSW builder initialized on device {} ({})",
            config.gpu_device_id, device_info.name
        );
        Ok(Self {
            config,
            device_info,
            pending_vectors: Vec::new(),
            ml_param,
            stats: Arc::new(Mutex::new(GpuIndexBuildStats::default())),
        })
    }

    /// Convenience constructor that derives a builder config from a generic
    /// `GpuConfig`, keeping all other defaults.
    pub fn with_gpu_config(gpu_config: GpuConfig) -> Result<Self> {
        let builder_config = GpuIndexBuilderConfig {
            gpu_device_id: gpu_config.device_id,
            num_streams: gpu_config.stream_count,
            ..GpuIndexBuilderConfig::default()
        };
        Self::new(builder_config)
    }

    /// Queues a vector for the next `build()`.
    ///
    /// # Errors
    /// Rejects empty vectors and vectors whose dimension differs from the
    /// first queued vector.
    pub fn add_vector(&mut self, id: usize, vector: Vec<f32>) -> Result<()> {
        if vector.is_empty() {
            return Err(anyhow!("Cannot add empty vector"));
        }
        if !self.pending_vectors.is_empty() {
            let expected_dim = self.pending_vectors[0].1.len();
            if vector.len() != expected_dim {
                return Err(anyhow!(
                    "Vector dimension {} != expected {}",
                    vector.len(),
                    expected_dim
                ));
            }
        }
        self.pending_vectors.push((id, vector));
        Ok(())
    }

    /// Builds the HNSW graph from all queued vectors, then clears the queue.
    ///
    /// Phases: layer assignment → (simulated) device transfer → sequential
    /// insertion (greedy descent on upper layers, ef-bounded search plus
    /// heuristic neighbor selection per layer) → stats assembly.
    ///
    /// # Errors
    /// Fails if no vectors have been queued.
    pub fn build(&mut self) -> Result<HnswGraph> {
        if self.pending_vectors.is_empty() {
            return Err(anyhow!("No vectors to build index from"));
        }
        let build_start = Instant::now();
        let num_vectors = self.pending_vectors.len();
        let dim = self.pending_vectors[0].1.len();
        info!(
            "Building GPU HNSW index: {} vectors, dim={}, M={}, ef_construction={}",
            num_vectors, dim, self.config.m, self.config.ef_construction
        );
        let layer_assignments = self.assign_layers(num_vectors);
        // Pre-allocate every node with empty per-layer neighbor lists.
        let mut nodes: Vec<HnswNode> = self
            .pending_vectors
            .iter()
            .enumerate()
            .map(|(idx, (id, vec))| {
                let max_layer = layer_assignments[idx];
                let neighbors = vec![Vec::new(); max_layer + 1];
                HnswNode {
                    id: *id,
                    vector: vec.clone(),
                    neighbors,
                    max_layer,
                }
            })
            .collect();
        // The first node is the permanent entry point; searches tolerate it
        // missing upper layers.
        let entry_point = 0;
        let mut current_max_layer = nodes[0].max_layer;
        let mut stats = self.stats.lock();
        let transfer_start = Instant::now();
        let _ = self.simulate_gpu_transfer(dim, num_vectors);
        stats.transfer_time_ms = transfer_start.elapsed().as_millis() as u64;
        // Release the lock before the long insertion loop.
        drop(stats);
        let gpu_compute_start = Instant::now();
        for insert_idx in 1..num_vectors {
            let insert_max_layer = nodes[insert_idx].max_layer;
            let mut current_entry = entry_point;
            if insert_max_layer > current_max_layer {
                current_max_layer = insert_max_layer;
            }
            // Descend through layers above the node's top layer with a pure
            // greedy search.
            for layer in (insert_max_layer + 1..=current_max_layer).rev() {
                current_entry =
                    self.greedy_search_layer(&nodes, insert_idx, current_entry, layer, 1);
            }
            // Insert on each layer the node participates in.
            for layer in (0..=insert_max_layer).rev() {
                let ef = if layer == 0 {
                    self.config.ef_construction
                } else {
                    self.config.ef_construction / 2
                };
                let candidates = self.search_layer_ef(&nodes, insert_idx, current_entry, layer, ef);
                // Layer 0 allows twice the connectivity, per HNSW convention.
                let m_for_layer = if layer == 0 {
                    self.config.m * 2
                } else {
                    self.config.m
                };
                let selected = self.select_neighbors_heuristic(
                    &nodes,
                    insert_idx,
                    &candidates,
                    m_for_layer,
                    layer,
                );
                if layer < nodes[insert_idx].neighbors.len() {
                    nodes[insert_idx].neighbors[layer] = selected.clone();
                }
                // Add reciprocal links, pruning any neighbor that overflows.
                for &neighbor_id in &selected {
                    if layer < nodes[neighbor_id].neighbors.len()
                        && !nodes[neighbor_id].neighbors[layer].contains(&insert_idx)
                    {
                        nodes[neighbor_id].neighbors[layer].push(insert_idx);
                        let max_m = m_for_layer;
                        if nodes[neighbor_id].neighbors[layer].len() > max_m {
                            let pruned = self.prune_neighbors(&nodes, neighbor_id, layer, max_m);
                            nodes[neighbor_id].neighbors[layer] = pruned;
                        }
                    }
                }
                // Continue the descent from the best candidate found here.
                if !candidates.is_empty() {
                    current_entry = candidates[0].1;
                }
            }
        }
        let gpu_compute_ms = gpu_compute_start.elapsed().as_millis() as u64;
        let graph_assembly_start = Instant::now();
        let total_build_time = build_start.elapsed().as_millis() as u64;
        let throughput = if total_build_time > 0 {
            num_vectors as f64 * 1000.0 / total_build_time as f64
        } else {
            f64::INFINITY
        };
        let final_stats = GpuIndexBuildStats {
            vectors_indexed: num_vectors,
            build_time_ms: total_build_time,
            gpu_compute_time_ms: gpu_compute_ms,
            transfer_time_ms: self.stats.lock().transfer_time_ms,
            graph_assembly_time_ms: graph_assembly_start.elapsed().as_millis() as u64,
            batches_processed: (num_vectors + self.config.batch_size - 1) / self.config.batch_size,
            peak_gpu_memory_bytes: dim * num_vectors * 4,
            // NOTE(review): placeholder utilization — nothing here measures it.
            gpu_utilization_pct: 85.0,
            throughput_vps: throughput,
            used_mixed_precision: self.config.mixed_precision,
            used_tensor_cores: self.config.tensor_cores,
        };
        info!(
            "GPU HNSW build complete: {} vectors in {}ms ({:.1} vps)",
            num_vectors, total_build_time, throughput
        );
        let graph = HnswGraph {
            nodes,
            entry_point,
            max_layer: current_max_layer,
            config: self.config.clone(),
            stats: final_stats,
        };
        self.pending_vectors.clear();
        Ok(graph)
    }

    /// Snapshot of the stats recorded so far.
    pub fn get_stats(&self) -> GpuIndexBuildStats {
        self.stats.lock().clone()
    }

    /// Info for the device this builder was bound to at construction.
    pub fn device_info(&self) -> &GpuDevice {
        &self.device_info
    }

    /// Assigns a max layer to each vector using the standard HNSW exponential
    /// distribution, capped at `num_layers - 1`.
    fn assign_layers(&self, num_vectors: usize) -> Vec<usize> {
        (0..num_vectors)
            .map(|i| {
                let r = self.pseudo_random_01(i as u64);
                let layer = (-r.ln() * self.ml_param).floor() as usize;
                layer.min(self.config.num_layers.saturating_sub(1))
            })
            .collect()
    }

    /// Deterministic hash-based value in (0, 1) derived from `seed` (LCG
    /// constants from the PCG family).
    /// NOTE(review): `a >> 33` keeps only 31 bits, so the result never
    /// exceeds ~0.5; this slightly skews layer assignment upward versus a
    /// uniform draw — confirm whether that is intended.
    fn pseudo_random_01(&self, seed: u64) -> f64 {
        let a = seed
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        let b = a >> 33;
        (b as f64 + 1.0) / (u32::MAX as f64 + 2.0)
    }

    /// Greedy hill-climb on one layer: returns the local minimum reachable
    /// from `entry` with respect to the query vector at `query_idx`.
    fn greedy_search_layer(
        &self,
        nodes: &[HnswNode],
        query_idx: usize,
        entry: usize,
        layer: usize,
        _ef: usize,
    ) -> usize {
        let query_vec = &nodes[query_idx].vector;
        let mut current = entry;
        let mut current_dist = self.layer_distance(query_vec, &nodes[current].vector);
        loop {
            let mut improved = false;
            // The current node may not participate in this layer.
            if layer >= nodes[current].neighbors.len() {
                break;
            }
            for &neighbor_id in &nodes[current].neighbors[layer] {
                if neighbor_id >= nodes.len() {
                    continue;
                }
                let d = self.layer_distance(query_vec, &nodes[neighbor_id].vector);
                if d < current_dist {
                    current_dist = d;
                    current = neighbor_id;
                    improved = true;
                }
            }
            if !improved {
                break;
            }
        }
        current
    }

    /// Bounded expansion on one layer: returns up to `ef` nearest candidates
    /// to the query node, sorted by ascending distance. The query node itself
    /// is pre-marked visited so it never links to itself.
    fn search_layer_ef(
        &self,
        nodes: &[HnswNode],
        query_idx: usize,
        entry: usize,
        layer: usize,
        ef: usize,
    ) -> Vec<(f32, usize)> {
        let query_vec = &nodes[query_idx].vector;
        let entry_dist = self.layer_distance(query_vec, &nodes[entry].vector);
        let mut candidates: Vec<(f32, usize)> = vec![(entry_dist, entry)];
        let mut w: Vec<(f32, usize)> = vec![(entry_dist, entry)];
        let mut visited = std::collections::HashSet::new();
        visited.insert(entry);
        visited.insert(query_idx);
        let mut c_idx = 0;
        while c_idx < candidates.len() {
            let (c_dist, c_node) = candidates[c_idx];
            c_idx += 1;
            let w_max = w.iter().map(|x| x.0).fold(f32::NEG_INFINITY, f32::max);
            // Stop once the next candidate cannot improve a full pool.
            if c_dist > w_max && w.len() >= ef {
                break;
            }
            if layer >= nodes[c_node].neighbors.len() {
                continue;
            }
            for &neighbor_id in &nodes[c_node].neighbors[layer] {
                if neighbor_id >= nodes.len() || visited.contains(&neighbor_id) {
                    continue;
                }
                visited.insert(neighbor_id);
                let neighbor_dist = self.layer_distance(query_vec, &nodes[neighbor_id].vector);
                let w_max_inner = w.iter().map(|x| x.0).fold(f32::NEG_INFINITY, f32::max);
                if neighbor_dist < w_max_inner || w.len() < ef {
                    candidates.push((neighbor_dist, neighbor_id));
                    w.push((neighbor_dist, neighbor_id));
                    // Evict the worst entry to keep |w| <= ef.
                    if w.len() > ef {
                        if let Some(max_pos) = w
                            .iter()
                            .enumerate()
                            .max_by(|a, b| {
                                a.1 .0
                                    .partial_cmp(&b.1 .0)
                                    .unwrap_or(std::cmp::Ordering::Equal)
                            })
                            .map(|(i, _)| i)
                        {
                            w.remove(max_pos);
                        }
                    }
                }
            }
        }
        w.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
        w
    }

    /// HNSW neighbor-selection heuristic: accept a candidate only if it is
    /// closer to the query than to every already-accepted neighbor (keeps the
    /// neighborhood spread out), then back-fill with the remaining nearest
    /// candidates if fewer than `m` survived.
    fn select_neighbors_heuristic(
        &self,
        nodes: &[HnswNode],
        query_idx: usize,
        candidates: &[(f32, usize)],
        m: usize,
        _layer: usize,
    ) -> Vec<usize> {
        if candidates.is_empty() {
            return Vec::new();
        }
        let query_vec = &nodes[query_idx].vector;
        let mut result: Vec<usize> = Vec::with_capacity(m);
        let mut working: Vec<(f32, usize)> = candidates.to_vec();
        working.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
        for (_, candidate_id) in &working {
            if result.len() >= m {
                break;
            }
            let candidate_dist = self.layer_distance(query_vec, &nodes[*candidate_id].vector);
            let keep = result.iter().all(|&res_id| {
                let dist_to_result =
                    self.layer_distance(&nodes[*candidate_id].vector, &nodes[res_id].vector);
                candidate_dist <= dist_to_result
            });
            if keep {
                result.push(*candidate_id);
            }
        }
        // Back-fill so we do not end up under-connected.
        if result.len() < m.min(candidates.len()) {
            for (_, candidate_id) in &working {
                if result.len() >= m {
                    break;
                }
                if !result.contains(candidate_id) {
                    result.push(*candidate_id);
                }
            }
        }
        result
    }

    /// Re-selects the best `max_m` neighbors for `node_idx` on `layer` after
    /// its list overflowed.
    fn prune_neighbors(
        &self,
        nodes: &[HnswNode],
        node_idx: usize,
        layer: usize,
        max_m: usize,
    ) -> Vec<usize> {
        let current_neighbors: Vec<(f32, usize)> = nodes[node_idx].neighbors[layer]
            .iter()
            .map(|&n_id| {
                let dist = self.layer_distance(&nodes[node_idx].vector, &nodes[n_id].vector);
                (dist, n_id)
            })
            .collect();
        // Fixed: this argument was corrupted to `¤t_neighbors` (an HTML
        // entity garble of `&current_neighbors`), which does not compile.
        self.select_neighbors_heuristic(nodes, node_idx, &current_neighbors, max_m, layer)
    }

    /// Build-time distance under the configured metric; smaller is closer.
    fn layer_distance(&self, a: &[f32], b: &[f32]) -> f32 {
        match self.config.distance_metric {
            GpuDistanceMetric::Cosine | GpuDistanceMetric::CosineF16 => {
                let dot: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
                let norm_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
                let norm_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
                // Near-zero norms are treated as maximally distant.
                if norm_a < 1e-9 || norm_b < 1e-9 {
                    1.0
                } else {
                    1.0 - dot / (norm_a * norm_b)
                }
            }
            GpuDistanceMetric::Euclidean | GpuDistanceMetric::EuclideanF16 => a
                .iter()
                .zip(b.iter())
                .map(|(x, y)| (x - y).powi(2))
                .sum::<f32>()
                .sqrt(),
            GpuDistanceMetric::InnerProduct => {
                let dot: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
                -dot
            }
        }
    }

    /// Placeholder for the host→device copy. Models a ~10 GB/s link, capped
    /// at 10 ms; the returned duration is currently ignored by `build()`.
    fn simulate_gpu_transfer(&self, dim: usize, num_vectors: usize) -> Duration {
        let bytes = dim * num_vectors * 4;
        debug!(
            "GPU transfer simulation: {} bytes ({} vectors x {} dims x 4 bytes)",
            bytes, num_vectors, dim
        );
        let transfer_ns = (bytes as f64 / 10e9 * 1e9) as u64;
        Duration::from_nanos(transfer_ns.min(10_000_000))
    }
}
/// Wraps [`GpuHnswIndexBuilder`] to accept vectors in small micro-batches.
#[derive(Debug)]
pub struct IncrementalGpuIndexBuilder {
    inner: GpuHnswIndexBuilder,
    // Vectors buffered since the last flush.
    micro_batch: Vec<(usize, Vec<f32>)>,
    // Flush automatically once the buffer reaches this size.
    micro_batch_threshold: usize,
    // Count of vectors handed to the inner builder so far.
    total_committed: usize,
    // NOTE(review): never assigned after construction (always `None`) —
    // confirm whether incremental rebuilds were meant to use this.
    base_graph: Option<HnswGraph>,
}
impl IncrementalGpuIndexBuilder {
    /// Creates an incremental wrapper that flushes to the inner builder every
    /// `micro_batch_threshold` vectors.
    ///
    /// # Errors
    /// Propagates failures from [`GpuHnswIndexBuilder::new`].
    pub fn new(config: GpuIndexBuilderConfig, micro_batch_threshold: usize) -> Result<Self> {
        Ok(Self {
            inner: GpuHnswIndexBuilder::new(config)?,
            micro_batch: Vec::new(),
            micro_batch_threshold,
            total_committed: 0,
            base_graph: None,
        })
    }

    /// Buffers a vector, flushing automatically once the threshold is hit.
    pub fn add_vector(&mut self, id: usize, vector: Vec<f32>) -> Result<()> {
        self.micro_batch.push((id, vector));
        if self.micro_batch.len() >= self.micro_batch_threshold {
            self.flush_micro_batch()?;
        }
        Ok(())
    }

    /// Moves the buffered micro-batch into the inner builder.
    ///
    /// Bug fix: `total_committed` previously accumulated the inner builder's
    /// *cumulative* pending count (`inner.pending_vectors.len()`) on every
    /// flush, double-counting all earlier batches (e.g. three flushes of 5
    /// reported 30 committed instead of 15). It now adds only the size of
    /// the batch flushed here.
    pub fn flush_micro_batch(&mut self) -> Result<()> {
        if self.micro_batch.is_empty() {
            return Ok(());
        }
        let batch = std::mem::take(&mut self.micro_batch);
        let batch_len = batch.len();
        for (id, vec) in batch {
            self.inner.add_vector(id, vec)?;
        }
        self.total_committed += batch_len;
        info!(
            "Flushing micro-batch, total committed: {}",
            self.total_committed
        );
        Ok(())
    }

    /// Flushes any remainder and builds the final graph, consuming `self`.
    pub fn build(mut self) -> Result<HnswGraph> {
        self.flush_micro_batch()?;
        self.inner.build()
    }

    /// Vectors currently buffered but not yet flushed.
    pub fn pending_count(&self) -> usize {
        self.micro_batch.len()
    }

    /// Vectors flushed to the inner builder so far.
    pub fn total_committed(&self) -> usize {
        self.total_committed
    }
}
/// Batch distance computation between query and database vector sets.
/// Currently always runs the CPU fallback path.
#[derive(Debug)]
pub struct GpuBatchDistanceComputer {
    // Supplies the distance metric.
    config: GpuIndexBuilderConfig,
    // NOTE(review): allocated at construction but never read or written in
    // this file — presumably intended for result memoization.
    computation_cache: ComputationCache,
}
impl GpuBatchDistanceComputer {
    /// Creates a computer with an empty shared result cache.
    pub fn new(config: GpuIndexBuilderConfig) -> Result<Self> {
        let computation_cache = Arc::new(RwLock::new(HashMap::new()));
        Ok(Self {
            config,
            computation_cache,
        })
    }

    /// Computes the full `queries x database` distance matrix.
    ///
    /// Empty inputs yield an empty matrix.
    ///
    /// # Errors
    /// Fails when query and database dimensions differ.
    pub fn compute_distances(
        &self,
        queries: &[Vec<f32>],
        database: &[Vec<f32>],
    ) -> Result<Vec<Vec<f32>>> {
        if queries.is_empty() || database.is_empty() {
            return Ok(Vec::new());
        }
        let q_dim = queries[0].len();
        let db_dim = database[0].len();
        if q_dim != db_dim {
            return Err(anyhow!(
                "Query dimension {} != database dimension {}",
                q_dim,
                db_dim
            ));
        }
        warn!("GPU distance computation running in CPU fallback mode");
        self.compute_distances_cpu(queries, database)
    }

    /// CPU reference path: one row of distances per query, short-circuiting
    /// on the first per-pair error.
    fn compute_distances_cpu(
        &self,
        queries: &[Vec<f32>],
        database: &[Vec<f32>],
    ) -> Result<Vec<Vec<f32>>> {
        let metric = self.config.distance_metric;
        let mut matrix = Vec::with_capacity(queries.len());
        for q in queries {
            let mut row = Vec::with_capacity(database.len());
            for d in database {
                row.push(Self::compute_single_distance(metric, q, d)?);
            }
            matrix.push(row);
        }
        Ok(matrix)
    }

    /// Distance between two equal-length vectors under `metric`; smaller is
    /// closer for every metric (inner product is negated).
    fn compute_single_distance(metric: GpuDistanceMetric, a: &[f32], b: &[f32]) -> Result<f32> {
        if a.len() != b.len() {
            return Err(anyhow!("Dimension mismatch: {} != {}", a.len(), b.len()));
        }
        let value = match metric {
            GpuDistanceMetric::Cosine | GpuDistanceMetric::CosineF16 => {
                let dot: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
                let na = a.iter().map(|x| x * x).sum::<f32>().sqrt();
                let nb = b.iter().map(|x| x * x).sum::<f32>().sqrt();
                // Near-zero norms are treated as maximally distant.
                if na < 1e-9 || nb < 1e-9 {
                    1.0
                } else {
                    1.0 - dot / (na * nb)
                }
            }
            GpuDistanceMetric::Euclidean | GpuDistanceMetric::EuclideanF16 => {
                let sum_sq: f32 = a.iter().zip(b.iter()).map(|(x, y)| (x - y).powi(2)).sum();
                sum_sq.sqrt()
            }
            GpuDistanceMetric::InnerProduct => {
                let dot: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
                -dot
            }
        };
        Ok(value)
    }
}
/// Stateless helpers for sizing GPU upload batches.
#[derive(Debug, Clone)]
pub struct BatchSizeCalculator;

impl BatchSizeCalculator {
    /// How many f32 vectors of `vector_dim` fit in 75% of `gpu_memory_mb`,
    /// clamped to the range [1, 65536]. A zero dimension falls back to a
    /// default of 1024.
    pub fn calculate_batch_size(vector_dim: usize, gpu_memory_mb: u64) -> usize {
        if vector_dim == 0 {
            return 1024;
        }
        let per_vector: u64 = vector_dim as u64 * 4;
        let usable = (gpu_memory_mb as f64 * 1024.0 * 1024.0 * 0.75) as u64;
        let fit = usable / per_vector;
        (fit.min(65536) as usize).max(1)
    }

    /// Solves `4*b^2 + 4*dim*b = 0.70 * memory` for the batch size `b` —
    /// i.e. a b×b f32 matrix plus b f32 vectors filling 70% of `memory_mb`
    /// (presumed model; confirm against the GPU kernel's actual footprint) —
    /// clamped to [1, 65536]. A zero dimension falls back to 512.
    pub fn optimal_batch_for_float32(dim: usize, memory_mb: u64) -> usize {
        if dim == 0 {
            return 512;
        }
        let budget = memory_mb as f64 * 1024.0 * 1024.0 * 0.70;
        // Quadratic coefficients for a*b^2 + b_coef*b + c = 0.
        let a = 4.0f64;
        let b_coef = 4.0 * dim as f64;
        let c = -budget;
        let discriminant = b_coef * b_coef - 4.0 * a * c;
        if discriminant < 0.0 {
            return 1;
        }
        let root = (-b_coef + discriminant.sqrt()) / (2.0 * a);
        (root.floor() as usize).clamp(1, 65536)
    }
}
/// GPU memory accounting: a total, a reserved slice, and the derived
/// available amount, all in MiB.
#[derive(Debug, Clone)]
pub struct GpuMemoryBudget {
    pub total_mb: u64,
    pub reserved_mb: u64,
    pub available_mb: u64,
}

impl GpuMemoryBudget {
    /// Builds a budget; `available_mb` saturates at zero when reservations
    /// exceed the total.
    pub fn new(total_mb: u64, reserved_mb: u64) -> Self {
        let available_mb = total_mb.saturating_sub(reserved_mb);
        Self {
            total_mb,
            reserved_mb,
            available_mb,
        }
    }

    /// Whether `batch_size` f32 vectors of `dim` fit in the available memory.
    pub fn can_fit_batch(&self, batch_size: usize, dim: usize) -> bool {
        let needed = self.bytes_per_vector(dim) * batch_size as u64;
        needed <= self.available_mb * 1024 * 1024
    }

    /// Bytes one f32 vector of `dim` occupies.
    pub fn bytes_per_vector(&self, dim: usize) -> u64 {
        dim as u64 * 4
    }
}
/// Thin facade combining a [`GpuMemoryBudget`] with batch-size heuristics.
#[derive(Debug, Clone)]
pub struct GpuIndexOptimizer {
    budget: GpuMemoryBudget,
}

impl GpuIndexOptimizer {
    /// Creates an optimizer over `total_mb` with `reserved_mb` held back.
    pub fn new(total_mb: u64, reserved_mb: u64) -> Self {
        let budget = GpuMemoryBudget::new(total_mb, reserved_mb);
        Self { budget }
    }

    /// Read-only view of the underlying budget.
    pub fn memory_budget(&self) -> &GpuMemoryBudget {
        &self.budget
    }

    /// Recommended batch size for f32 vectors of `vector_dim` given the
    /// available memory.
    pub fn recommend_batch_size(&self, vector_dim: usize) -> usize {
        BatchSizeCalculator::calculate_batch_size(vector_dim, self.budget.available_mb)
    }

    /// Whether a batch of `batch_size` vectors of `vector_dim` fits.
    pub fn batch_fits(&self, batch_size: usize, vector_dim: usize) -> bool {
        self.budget.can_fit_batch(batch_size, vector_dim)
    }
}
/// Output of pipeline stage A: an L2-normalized, flattened batch.
#[derive(Debug)]
pub struct PreparedBatch {
    pub data: Vec<f32>,
    pub num_vectors: usize,
    pub dim: usize,
    pub prepared_at: std::time::Instant,
}

/// Output of stage B: per-vector norms ("distances") plus the carried data.
#[derive(Debug)]
pub struct ComputedBatch {
    pub distances: Vec<f32>,
    pub num_vectors: usize,
    pub dim: usize,
    pub data: Vec<f32>,
    pub computed_at: std::time::Instant,
}

/// Output of stage C: neighbor index lists per vector.
#[derive(Debug)]
pub struct IndexedBatch {
    pub neighbor_ids: Vec<Vec<usize>>,
    pub num_vectors: usize,
    pub finalized_at: std::time::Instant,
}

/// Three-stage prepare → compute → finalize pipeline over single batches.
#[derive(Debug, Clone)]
pub struct PipelinedIndexBuilder;

impl PipelinedIndexBuilder {
    /// Stage A: L2-normalizes `vectors` (treated as one vector of dimension
    /// `vectors.len()`); a near-zero norm passes the data through unchanged.
    pub fn stage_a_prepare(vectors: &[f32]) -> PreparedBatch {
        let dim = vectors.len();
        let norm = vectors.iter().map(|x| x * x).sum::<f32>().sqrt();
        let mut data = vectors.to_vec();
        if norm > 1e-9 {
            for value in &mut data {
                *value /= norm;
            }
        }
        PreparedBatch {
            num_vectors: 1,
            dim,
            data,
            prepared_at: std::time::Instant::now(),
        }
    }

    /// Stage B: computes the L2 norm of each vector's slice, with slice
    /// bounds clamped to the data length so short batches cannot panic.
    pub fn stage_b_compute(batch: PreparedBatch) -> ComputedBatch {
        let mut distances = Vec::with_capacity(batch.num_vectors);
        for i in 0..batch.num_vectors {
            let start = (i * batch.dim).min(batch.data.len());
            let end = (start + batch.dim).min(batch.data.len());
            let norm = batch.data[start..end]
                .iter()
                .map(|x| x * x)
                .sum::<f32>()
                .sqrt();
            distances.push(norm);
        }
        ComputedBatch {
            distances,
            num_vectors: batch.num_vectors,
            dim: batch.dim,
            data: batch.data,
            computed_at: std::time::Instant::now(),
        }
    }

    /// Stage C: ranks all vectors by distance and gives every vector the
    /// same top-`min(16, n)` ranked ids as its neighbor list.
    pub fn stage_c_finalize(batch: ComputedBatch) -> IndexedBatch {
        let mut ranked: Vec<(usize, f32)> =
            batch.distances.iter().copied().enumerate().collect();
        ranked.sort_by(|l, r| l.1.partial_cmp(&r.1).unwrap_or(std::cmp::Ordering::Equal));
        let keep = 16_usize.min(batch.num_vectors);
        let top: Vec<usize> = ranked.iter().take(keep).map(|&(id, _)| id).collect();
        let neighbor_ids = vec![top; batch.num_vectors];
        IndexedBatch {
            neighbor_ids,
            num_vectors: batch.num_vectors,
            finalized_at: std::time::Instant::now(),
        }
    }
}
#[cfg(test)]
mod tests {
    // Unit tests. The GPU work in this file runs through simulated/CPU
    // fallback paths, so everything here is deterministic and
    // hardware-independent.
    use super::*;
    use anyhow::Result;

    // Deterministic pseudo-random test vectors (LCG hash per element),
    // roughly in [-0.5, 0.5) — reproducible across runs.
    fn make_test_vectors(n: usize, dim: usize) -> Vec<Vec<f32>> {
        (0..n)
            .map(|i| {
                (0..dim)
                    .map(|j| {
                        let seed = (i * 1000 + j) as u64;
                        let a = seed
                            .wrapping_mul(6364136223846793005)
                            .wrapping_add(1442695040888963407);
                        (a >> 33) as f32 / u32::MAX as f32 - 0.5
                    })
                    .collect()
            })
            .collect()
    }

    #[test]
    fn test_gpu_index_builder_config_default() {
        let config = GpuIndexBuilderConfig::default();
        assert_eq!(config.m, 16);
        assert_eq!(config.ef_construction, 200);
        assert!(config.mixed_precision);
        assert!(config.tensor_cores);
    }

    #[test]
    fn test_gpu_index_builder_new() {
        let config = GpuIndexBuilderConfig::default();
        let builder = GpuHnswIndexBuilder::new(config);
        assert!(builder.is_ok(), "Builder creation should succeed");
    }

    #[test]
    fn test_add_vector_dimension_check() -> Result<()> {
        let config = GpuIndexBuilderConfig::default();
        let mut builder = GpuHnswIndexBuilder::new(config)?;
        builder.add_vector(0, vec![1.0, 2.0, 3.0])?;
        let result = builder.add_vector(1, vec![1.0, 2.0]);
        assert!(result.is_err(), "Should reject mismatched dimensions");
        Ok(())
    }

    #[test]
    fn test_add_empty_vector_fails() -> Result<()> {
        let config = GpuIndexBuilderConfig::default();
        let mut builder = GpuHnswIndexBuilder::new(config)?;
        let result = builder.add_vector(0, vec![]);
        assert!(result.is_err(), "Should reject empty vector");
        Ok(())
    }

    #[test]
    fn test_build_small_index() -> Result<()> {
        let config = GpuIndexBuilderConfig {
            m: 4,
            ef_construction: 10,
            num_layers: 3,
            ..Default::default()
        };
        let mut builder = GpuHnswIndexBuilder::new(config)?;
        let vectors = make_test_vectors(20, 8);
        for (i, v) in vectors.iter().enumerate() {
            builder.add_vector(i, v.clone())?;
        }
        let graph = builder.build()?;
        assert_eq!(graph.nodes.len(), 20);
        assert!(graph.stats.vectors_indexed == 20);
        Ok(())
    }

    #[test]
    fn test_build_produces_valid_graph() -> Result<()> {
        let config = GpuIndexBuilderConfig {
            m: 4,
            ef_construction: 20,
            num_layers: 2,
            ..Default::default()
        };
        let mut builder = GpuHnswIndexBuilder::new(config)?;
        let vectors = make_test_vectors(50, 16);
        for (i, v) in vectors.iter().enumerate() {
            builder.add_vector(i, v.clone())?;
        }
        let graph = builder.build()?;
        // Every stored neighbor must be a valid node index.
        for node in &graph.nodes {
            for layer_neighbors in &node.neighbors {
                for &neighbor_id in layer_neighbors {
                    assert!(
                        neighbor_id < graph.nodes.len(),
                        "Neighbor ID {} out of range (max {})",
                        neighbor_id,
                        graph.nodes.len()
                    );
                }
            }
        }
        Ok(())
    }

    #[test]
    fn test_hnsw_graph_search() -> Result<()> {
        let config = GpuIndexBuilderConfig {
            m: 8,
            ef_construction: 50,
            num_layers: 3,
            distance_metric: GpuDistanceMetric::Euclidean,
            ..Default::default()
        };
        let mut builder = GpuHnswIndexBuilder::new(config)?;
        let vectors = make_test_vectors(100, 8);
        for (i, v) in vectors.iter().enumerate() {
            builder.add_vector(i, v.clone())?;
        }
        let graph = builder.build()?;
        let query = vectors[5].clone();
        let results = graph.search_knn(&query, 5, 50)?;
        assert!(!results.is_empty(), "Search should return results");
        assert!(results.len() <= 5, "Should return at most k results");
        if !results.is_empty() {
            assert!(results[0].1 >= 0.0, "Distance should be non-negative");
        }
        Ok(())
    }

    #[test]
    fn test_hnsw_graph_search_cosine() -> Result<()> {
        let config = GpuIndexBuilderConfig {
            m: 4,
            ef_construction: 20,
            num_layers: 2,
            distance_metric: GpuDistanceMetric::Cosine,
            ..Default::default()
        };
        let mut builder = GpuHnswIndexBuilder::new(config)?;
        // One-hot basis vectors: pairwise orthogonal under cosine.
        for i in 0..10 {
            let mut v = vec![0.0f32; 10];
            v[i] = 1.0;
            builder.add_vector(i, v)?;
        }
        let graph = builder.build()?;
        let query = vec![1.0f32, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0];
        let results = graph.search_knn(&query, 3, 30)?;
        assert!(!results.is_empty());
        Ok(())
    }

    #[test]
    fn test_build_empty_fails() -> Result<()> {
        let config = GpuIndexBuilderConfig::default();
        let mut builder = GpuHnswIndexBuilder::new(config)?;
        assert!(
            builder.build().is_err(),
            "Build with no vectors should fail"
        );
        Ok(())
    }

    #[test]
    fn test_build_stats_populated() -> Result<()> {
        let config = GpuIndexBuilderConfig {
            m: 4,
            ef_construction: 10,
            num_layers: 2,
            mixed_precision: true,
            tensor_cores: false,
            ..Default::default()
        };
        let mut builder = GpuHnswIndexBuilder::new(config)?;
        let vectors = make_test_vectors(10, 4);
        for (i, v) in vectors.iter().enumerate() {
            builder.add_vector(i, v.clone())?;
        }
        let graph = builder.build()?;
        assert_eq!(graph.stats.vectors_indexed, 10);
        assert!(graph.stats.used_mixed_precision);
        assert!(!graph.stats.used_tensor_cores);
        assert!(graph.stats.batches_processed > 0);
        Ok(())
    }

    #[test]
    fn test_incremental_builder_flush() -> Result<()> {
        let config = GpuIndexBuilderConfig {
            m: 4,
            ef_construction: 10,
            num_layers: 2,
            ..Default::default()
        };
        // Threshold 5 with 15 vectors forces three automatic flushes.
        let mut inc_builder = IncrementalGpuIndexBuilder::new(config, 5)?;
        let vectors = make_test_vectors(15, 4);
        for (i, v) in vectors.iter().enumerate() {
            inc_builder.add_vector(i, v.clone())?;
        }
        let graph = inc_builder.build()?;
        assert_eq!(graph.nodes.len(), 15);
        Ok(())
    }

    #[test]
    fn test_batch_distance_computer_cosine() -> Result<()> {
        let config = GpuIndexBuilderConfig {
            distance_metric: GpuDistanceMetric::Cosine,
            ..Default::default()
        };
        let computer = GpuBatchDistanceComputer::new(config)?;
        let queries = vec![vec![1.0f32, 0.0, 0.0], vec![0.0, 1.0, 0.0]];
        let database = vec![
            vec![1.0f32, 0.0, 0.0],
            vec![0.0, 1.0, 0.0],
            vec![0.0, 0.0, 1.0],
        ];
        let distances = computer.compute_distances(&queries, &database)?;
        assert_eq!(distances.len(), 2);
        assert_eq!(distances[0].len(), 3);
        assert!(
            distances[0][0].abs() < 1e-5,
            "Identical vectors should have distance 0"
        );
        assert!(
            (distances[0][1] - 1.0).abs() < 1e-5,
            "Orthogonal vectors should have cosine distance 1.0"
        );
        Ok(())
    }

    #[test]
    fn test_batch_distance_computer_euclidean() -> Result<()> {
        let config = GpuIndexBuilderConfig {
            distance_metric: GpuDistanceMetric::Euclidean,
            ..Default::default()
        };
        let computer = GpuBatchDistanceComputer::new(config)?;
        // Classic 3-4-5 right triangle.
        let queries = vec![vec![0.0f32, 0.0, 0.0]];
        let database = vec![vec![3.0f32, 4.0, 0.0]];
        let distances = computer.compute_distances(&queries, &database)?;
        assert!(
            (distances[0][0] - 5.0).abs() < 1e-4,
            "Expected Euclidean distance of 5.0"
        );
        Ok(())
    }

    #[test]
    fn test_batch_distance_dimension_mismatch() -> Result<()> {
        let config = GpuIndexBuilderConfig::default();
        let computer = GpuBatchDistanceComputer::new(config)?;
        let queries = vec![vec![1.0f32, 2.0]];
        let database = vec![vec![1.0f32, 2.0, 3.0]];
        let result = computer.compute_distances(&queries, &database);
        assert!(result.is_err(), "Should fail on dimension mismatch");
        Ok(())
    }

    #[test]
    fn test_distance_metric_inner_product() -> Result<()> {
        let config = GpuIndexBuilderConfig {
            distance_metric: GpuDistanceMetric::InnerProduct,
            ..Default::default()
        };
        let computer = GpuBatchDistanceComputer::new(config)?;
        // dot([1,2,3],[4,5,6]) = 32, negated by the metric.
        let queries = vec![vec![1.0f32, 2.0, 3.0]];
        let database = vec![vec![4.0f32, 5.0, 6.0]];
        let distances = computer.compute_distances(&queries, &database)?;
        assert!(
            (distances[0][0] + 32.0).abs() < 1e-4,
            "Inner product distance should be -32"
        );
        Ok(())
    }

    #[test]
    fn test_builder_clears_after_build() -> Result<()> {
        let config = GpuIndexBuilderConfig {
            m: 4,
            ef_construction: 10,
            num_layers: 2,
            ..Default::default()
        };
        let mut builder = GpuHnswIndexBuilder::new(config)?;
        let vectors = make_test_vectors(10, 4);
        for (i, v) in vectors.iter().enumerate() {
            builder.add_vector(i, v.clone())?;
        }
        let _ = builder.build()?;
        assert!(
            builder.pending_vectors.is_empty(),
            "Pending vectors should be cleared after build"
        );
        Ok(())
    }

    #[test]
    fn test_layer_assignment_distribution() -> Result<()> {
        let config = GpuIndexBuilderConfig {
            m: 16,
            num_layers: 5,
            ..Default::default()
        };
        let builder = GpuHnswIndexBuilder::new(config.clone())?;
        let layers = builder.assign_layers(1000);
        // The exponential layer distribution puts most nodes on layer 0.
        let layer_0_count = layers.iter().filter(|&&l| l == 0).count();
        assert!(
            layer_0_count > 500,
            "More than half should be at layer 0, got {}",
            layer_0_count
        );
        for &l in &layers {
            assert!(l < config.num_layers, "Layer {} exceeds num_layers", l);
        }
        Ok(())
    }

    #[test]
    fn test_search_dimension_mismatch_error() -> Result<()> {
        let config = GpuIndexBuilderConfig {
            m: 4,
            ef_construction: 10,
            num_layers: 2,
            ..Default::default()
        };
        let mut builder = GpuHnswIndexBuilder::new(config)?;
        for i in 0..5 {
            builder.add_vector(i, vec![1.0f32; 8])?;
        }
        let graph = builder.build()?;
        let result = graph.search_knn(&[1.0, 2.0], 3, 10);
        assert!(
            result.is_err(),
            "Should fail on dimension mismatch in search"
        );
        Ok(())
    }

    #[test]
    fn test_search_empty_graph() -> Result<()> {
        let config = GpuIndexBuilderConfig::default();
        let graph = HnswGraph {
            nodes: Vec::new(),
            entry_point: 0,
            max_layer: 0,
            config,
            stats: GpuIndexBuildStats::default(),
        };
        let results = graph.search_knn(&[1.0, 2.0], 5, 10)?;
        assert!(
            results.is_empty(),
            "Empty graph search should return no results"
        );
        Ok(())
    }

    #[test]
    fn test_incremental_builder_pending_count() -> Result<()> {
        let config = GpuIndexBuilderConfig {
            m: 4,
            ef_construction: 10,
            num_layers: 2,
            ..Default::default()
        };
        let mut inc_builder = IncrementalGpuIndexBuilder::new(config, 100)?;
        assert_eq!(inc_builder.pending_count(), 0);
        inc_builder.add_vector(0, vec![1.0f32; 4])?;
        inc_builder.add_vector(1, vec![2.0f32; 4])?;
        assert_eq!(inc_builder.pending_count(), 2);
        Ok(())
    }

    #[test]
    fn test_gpu_distance_metric_variants() -> Result<()> {
        let metrics = [
            GpuDistanceMetric::Cosine,
            GpuDistanceMetric::Euclidean,
            GpuDistanceMetric::InnerProduct,
            GpuDistanceMetric::CosineF16,
            GpuDistanceMetric::EuclideanF16,
        ];
        for metric in &metrics {
            let config = GpuIndexBuilderConfig {
                distance_metric: *metric,
                m: 4,
                ef_construction: 10,
                num_layers: 2,
                ..Default::default()
            };
            let computer = GpuBatchDistanceComputer::new(config)?;
            let queries = vec![vec![1.0f32, 0.0]];
            let db = vec![vec![0.0f32, 1.0]];
            let result = computer.compute_distances(&queries, &db);
            assert!(
                result.is_ok(),
                "Distance computation failed for {:?}",
                metric
            );
        }
        Ok(())
    }

    #[test]
    fn test_batch_size_calculator_basic() {
        let size = BatchSizeCalculator::calculate_batch_size(128, 4096);
        assert!(size >= 1, "Batch size should be at least 1");
    }

    #[test]
    fn test_batch_size_calculator_zero_dim_returns_default() {
        let size = BatchSizeCalculator::calculate_batch_size(0, 4096);
        assert!(
            size > 0,
            "Zero-dim should return positive default batch size"
        );
    }

    #[test]
    fn test_batch_size_calculator_large_dim() {
        let size = BatchSizeCalculator::calculate_batch_size(16384, 256);
        assert!(size >= 1, "Even large dim should yield at least 1");
        assert!(
            size <= 8192,
            "Very large dim with limited memory should give reduced batch: got {}",
            size
        );
    }

    #[test]
    fn test_optimal_batch_for_float32() {
        let size = BatchSizeCalculator::optimal_batch_for_float32(512, 8192);
        assert!(size >= 1);
    }

    #[test]
    fn test_optimal_batch_increases_with_memory() {
        let small = BatchSizeCalculator::optimal_batch_for_float32(128, 256);
        let large = BatchSizeCalculator::optimal_batch_for_float32(128, 8192);
        assert!(
            large >= small,
            "More memory should yield at least as large a batch: small={} large={}",
            small,
            large
        );
    }

    #[test]
    fn test_gpu_memory_budget_bytes_per_vector() {
        let budget = GpuMemoryBudget::new(4096, 512);
        assert_eq!(budget.bytes_per_vector(128), 512);
        assert_eq!(budget.bytes_per_vector(1), 4);
    }

    #[test]
    fn test_gpu_memory_budget_available() {
        let budget = GpuMemoryBudget::new(4096, 512);
        assert_eq!(budget.available_mb, 3584);
    }

    #[test]
    fn test_gpu_memory_budget_can_fit_batch_true() {
        let budget = GpuMemoryBudget::new(4096, 512);
        assert!(budget.can_fit_batch(1000, 128));
    }

    #[test]
    fn test_gpu_memory_budget_can_fit_batch_false() {
        let budget = GpuMemoryBudget::new(64, 32);
        assert!(!budget.can_fit_batch(1200, 8192));
    }

    #[test]
    fn test_gpu_memory_budget_zero_reserved() {
        let budget = GpuMemoryBudget::new(1024, 0);
        assert_eq!(budget.available_mb, 1024);
    }

    #[test]
    fn test_gpu_index_optimizer_creates_budget() {
        let optimizer = GpuIndexOptimizer::new(4096, 512);
        let budget = optimizer.memory_budget();
        assert_eq!(budget.total_mb, 4096);
        assert_eq!(budget.reserved_mb, 512);
    }

    #[test]
    fn test_gpu_index_optimizer_recommend_batch_size() {
        let optimizer = GpuIndexOptimizer::new(4096, 512);
        let size = optimizer.recommend_batch_size(256);
        assert!(size >= 1);
    }

    #[test]
    fn test_pipelined_index_builder_prepare() {
        let batch = PipelinedIndexBuilder::stage_a_prepare(&[1.0f32, 2.0, 3.0, 4.0]);
        assert_eq!(batch.data.len(), 4);
        assert!(batch.prepared_at.elapsed().as_secs() < 5);
    }

    #[test]
    fn test_pipelined_index_builder_compute() {
        let prepared = PipelinedIndexBuilder::stage_a_prepare(&[1.0f32, 0.0, 0.0, 0.0]);
        let computed = PipelinedIndexBuilder::stage_b_compute(prepared);
        assert!(!computed.distances.is_empty());
    }

    #[test]
    fn test_pipelined_index_builder_finalize() {
        let prepared = PipelinedIndexBuilder::stage_a_prepare(&[1.0f32, 2.0, 3.0, 4.0]);
        let computed = PipelinedIndexBuilder::stage_b_compute(prepared);
        let indexed = PipelinedIndexBuilder::stage_c_finalize(computed);
        // NOTE(review): this assertion is a tautology — it only smoke-tests
        // that finalize does not panic.
        assert!(!indexed.neighbor_ids.is_empty() || indexed.neighbor_ids.is_empty());
        assert!(indexed.finalized_at.elapsed().as_secs() < 5);
    }

    #[test]
    fn test_pipelined_index_builder_full_pipeline() {
        let data: Vec<f32> = (0..128).map(|i| i as f32 / 128.0).collect();
        let prepared = PipelinedIndexBuilder::stage_a_prepare(&data);
        let computed = PipelinedIndexBuilder::stage_b_compute(prepared);
        let indexed = PipelinedIndexBuilder::stage_c_finalize(computed);
        let _ = indexed;
    }

    #[test]
    fn test_pipelined_builder_stage_b_distances_nonnegative() {
        let data: Vec<f32> = vec![3.0, 4.0, 0.0]; let prepared = PipelinedIndexBuilder::stage_a_prepare(&data);
        let computed = PipelinedIndexBuilder::stage_b_compute(prepared);
        for &d in &computed.distances {
            assert!(d >= 0.0, "Distance should be non-negative, got {}", d);
        }
    }

    #[test]
    fn test_batch_size_calculator_reasonable_bounds() {
        let size = BatchSizeCalculator::calculate_batch_size(768, 16_384);
        assert!(
            size >= 1_000,
            "Should support large batches on big GPU: {}",
            size
        );
        assert!(
            size <= 1_000_000,
            "Batch size should be capped reasonably: {}",
            size
        );
    }
}