#[cfg(feature = "gpu-index")]
use std::sync::Mutex;
#[cfg(feature = "gpu-index")]
use ndarray_015::Array2;
#[cfg(feature = "gpu-index")]
use thiserror::Error;
#[cfg(feature = "gpu-index")]
use crate::embedder::Embedding;
#[cfg(feature = "gpu-index")]
use crate::index::{IndexResult, VectorIndex};
/// Errors produced while building or querying the GPU-backed CAGRA index.
#[cfg(feature = "gpu-index")]
#[derive(Error, Debug)]
pub enum CagraError {
    /// Error surfaced by the underlying cuVS library (resource creation,
    /// index build, or search).
    #[error("cuVS error: {0}")]
    Cuvs(String),
    /// No usable GPU could be detected on this machine.
    #[error("No GPU available")]
    NoGpu,
    /// An embedding's length did not match the index dimensionality.
    #[error("Dimension mismatch: expected {expected}, got {actual}")]
    DimensionMismatch { expected: usize, actual: usize },
    /// Input validation or data preparation failed before the GPU build.
    #[error("Build error: {0}")]
    Build(String),
    /// A search was attempted before the index was constructed.
    #[error("Index not built")]
    NotBuilt,
}
#[cfg(feature = "gpu-index")]
/// Upper bound, in bytes, on the dataset size we will attempt to place on the
/// GPU for a CAGRA build.
///
/// Defaults to 2 GiB; override with the `CQS_CAGRA_MAX_BYTES` environment
/// variable (an unset or unparseable value falls back to the default). The
/// value is resolved once and cached for the lifetime of the process.
fn cagra_max_bytes() -> usize {
    const DEFAULT_MAX_BYTES: usize = 2 * 1024 * 1024 * 1024;
    static LIMIT: std::sync::OnceLock<usize> = std::sync::OnceLock::new();
    *LIMIT.get_or_init(|| match std::env::var("CQS_CAGRA_MAX_BYTES") {
        Ok(raw) => raw.parse().unwrap_or(DEFAULT_MAX_BYTES),
        Err(_) => DEFAULT_MAX_BYTES,
    })
}
/// GPU-resident approximate nearest-neighbor index backed by NVIDIA cuVS CAGRA.
#[cfg(feature = "gpu-index")]
pub struct CagraIndex {
    // Embedding dimensionality every stored and queried vector must match.
    dim: usize,
    // cuVS resources plus the built index. Wrapped in a Mutex because searches
    // share the same GPU handles and are serialized through it.
    gpu: Mutex<GpuState>,
    // Maps CAGRA's dense row indices (returned by search) back to the
    // caller-supplied string ids; position i corresponds to dataset row i.
    id_map: Vec<String>,
}
/// The cuVS handles that must only be touched while holding `CagraIndex::gpu`.
#[cfg(feature = "gpu-index")]
struct GpuState {
    // Device resources (streams/allocators) used by every cuVS call.
    resources: cuvs::Resources,
    // The built CAGRA graph index.
    index: cuvs::cagra::Index,
}
#[cfg(feature = "gpu-index")]
impl std::fmt::Debug for CagraIndex {
    /// Manual `Debug`: the cuVS handles inside `GpuState` do not implement
    /// `Debug`, so only the dimension and vector count are reported.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = formatter.debug_struct("CagraIndex");
        builder.field("dim", &self.dim);
        builder.field("len", &self.id_map.len());
        builder.finish()
    }
}
#[cfg(feature = "gpu-index")]
impl CagraIndex {
    /// Returns `true` when a cuVS resource handle can be created — the
    /// cheapest available probe for a usable GPU.
    pub fn gpu_available() -> bool {
        cuvs::Resources::new().is_ok()
    }

    /// Builds a CAGRA index from `(id, embedding)` pairs.
    ///
    /// Input validation and flattening go through `prepare_index_data`
    /// (shared with the HNSW index); failures there surface as
    /// [`CagraError::Build`]. The GPU construction itself is delegated to
    /// [`Self::build_from_flat`] so the cuVS build sequence exists in exactly
    /// one place instead of being duplicated here.
    ///
    /// # Errors
    /// [`CagraError::Build`] on invalid input; [`CagraError::Cuvs`] when the
    /// input is empty or the GPU build fails.
    pub fn build(embeddings: Vec<(String, Embedding)>, dim: usize) -> Result<Self, CagraError> {
        let _span = tracing::debug_span!("cagra_build").entered();
        let (id_map, flat_data, _n_vectors) = crate::hnsw::prepare_index_data(embeddings, dim)
            .map_err(|e| CagraError::Build(e.to_string()))?;
        Self::build_from_flat(id_map, flat_data, dim)
    }

    /// Number of vectors held by the index.
    pub fn len(&self) -> usize {
        self.id_map.len()
    }

    /// Returns `true` when the index holds no vectors.
    pub fn is_empty(&self) -> bool {
        self.id_map.is_empty()
    }

    /// Searches for the `k` nearest neighbors of `query`.
    ///
    /// Degenerate inputs (empty index, `k == 0`, dimension mismatch) and GPU
    /// failures all yield an empty result vector rather than an error; GPU
    /// failures are additionally logged.
    pub fn search(&self, query: &Embedding, k: usize) -> Vec<IndexResult> {
        let _span = tracing::debug_span!("cagra_search", k).entered();
        if self.id_map.is_empty() || k == 0 {
            return Vec::new();
        }
        if query.len() != self.dim {
            tracing::warn!(
                "Query dimension mismatch: expected {}, got {}",
                self.dim,
                query.len()
            );
            return Vec::new();
        }
        // A poisoned mutex only means another search panicked; the GPU state
        // itself remains usable, so recover instead of propagating the panic.
        let gpu = self.gpu.lock().unwrap_or_else(|poisoned| {
            tracing::debug!("CAGRA GPU mutex poisoned, recovering");
            poisoned.into_inner()
        });
        self.search_impl(&gpu, query, k, None)
    }

    /// Shared GPU search path for filtered and unfiltered queries.
    ///
    /// `bitset_device` is an optional pre-uploaded inclusion bitset; when
    /// present, the cuVS filtered-search entry point is used. Every failure is
    /// logged and reported as an empty result vector.
    fn search_impl(
        &self,
        gpu: &GpuState,
        query: &Embedding,
        k: usize,
        bitset_device: Option<&cuvs::ManagedTensor>,
    ) -> Vec<IndexResult> {
        // CAGRA's internal candidate list must cover k; 2*k clamped to
        // [128, 512] trades recall against search cost.
        let itopk_size = (k * 2).clamp(128, 512);
        if k * 2 > 512 {
            tracing::debug!(k, "CAGRA itopk_size clamped to 512, recall may degrade");
        }
        let search_params = match cuvs::cagra::SearchParams::new() {
            Ok(params) => params.set_itopk_size(itopk_size),
            Err(e) => {
                tracing::error!(error = %e, "Failed to create search params");
                return Vec::new();
            }
        };
        // cuVS expects a (n_queries, dim) row-major matrix; we search a single
        // query at a time.
        let query_host = match Array2::from_shape_vec((1, self.dim), query.as_slice().to_vec()) {
            Ok(arr) => arr,
            Err(e) => {
                tracing::error!(expected_dim = self.dim, error = %e, "Invalid query shape");
                return Vec::new();
            }
        };
        // Host-side output buffers; copied back from the device after search.
        let mut neighbors_host: Array2<u32> = Array2::zeros((1, k));
        let mut distances_host: Array2<f32> = Array2::zeros((1, k));
        let query_device = match cuvs::ManagedTensor::from(&query_host).to_device(&gpu.resources) {
            Ok(t) => t,
            Err(e) => {
                tracing::error!(error = %e, "Failed to copy query to device");
                return Vec::new();
            }
        };
        let neighbors_device =
            match cuvs::ManagedTensor::from(&neighbors_host).to_device(&gpu.resources) {
                Ok(t) => t,
                Err(e) => {
                    tracing::error!(error = %e, "Failed to allocate neighbors on device");
                    return Vec::new();
                }
            };
        let distances_device =
            match cuvs::ManagedTensor::from(&distances_host).to_device(&gpu.resources) {
                Ok(t) => t,
                Err(e) => {
                    tracing::error!(error = %e, "Failed to allocate distances on device");
                    return Vec::new();
                }
            };
        let result = if let Some(bitset) = bitset_device {
            gpu.index.search_with_filter(
                &gpu.resources,
                &search_params,
                &query_device,
                &neighbors_device,
                &distances_device,
                bitset,
            )
        } else {
            gpu.index.search(
                &gpu.resources,
                &search_params,
                &query_device,
                &neighbors_device,
                &distances_device,
            )
        };
        if let Err(e) = result {
            tracing::error!(error = %e, "CAGRA search failed");
            return Vec::new();
        }
        if let Err(e) = neighbors_device.to_host(&gpu.resources, &mut neighbors_host) {
            tracing::error!(error = %e, "Failed to copy neighbors from device");
            return Vec::new();
        }
        if let Err(e) = distances_device.to_host(&gpu.resources, &mut distances_host) {
            tracing::error!(error = %e, "Failed to copy distances from device");
            return Vec::new();
        }
        let mut results = Vec::with_capacity(k);
        let neighbor_row = neighbors_host.row(0);
        let distance_row = distances_host.row(0);
        for i in 0..k {
            let idx = neighbor_row[i] as usize;
            // Out-of-range ids mark invalid/padded neighbor slots (e.g. when
            // fewer than k candidates exist); skip them.
            if idx < self.id_map.len() {
                let dist = distance_row[i];
                // Map distance to a similarity score: 1 - d/2 converts squared
                // L2 distance on unit-norm vectors to cosine similarity.
                // NOTE(review): assumes the cuVS default L2 metric and
                // normalized embeddings — confirm against the embedder.
                let score = 1.0 - dist / 2.0;
                results.push(IndexResult {
                    id: self.id_map[idx].clone(),
                    score,
                });
            }
        }
        results
    }
}
#[cfg(feature = "gpu-index")]
impl VectorIndex for CagraIndex {
    /// Trait entry point; forwards to the inherent [`CagraIndex::search`].
    fn search(&self, query: &Embedding, k: usize) -> Vec<IndexResult> {
        CagraIndex::search(self, query, k)
    }
    /// Number of vectors held by the index.
    fn len(&self) -> usize {
        CagraIndex::len(self)
    }
    /// Returns `true` when the index holds no vectors.
    fn is_empty(&self) -> bool {
        CagraIndex::is_empty(self)
    }
    /// Human-readable backend name used in logs and diagnostics.
    fn name(&self) -> &'static str {
        "CAGRA"
    }
    /// Embedding dimensionality this index was built with.
    fn dim(&self) -> usize {
        self.dim
    }
    /// Searches for the `k` nearest neighbors among vectors whose id passes
    /// `filter`, using a GPU-side prefilter bitset.
    ///
    /// Fast paths: if the filter accepts everything, fall back to the plain
    /// search; if it accepts nothing, return empty without touching the GPU.
    /// Degenerate inputs (empty index, `k == 0`, dimension mismatch) and GPU
    /// failures yield an empty result vector.
    fn search_with_filter(
        &self,
        query: &Embedding,
        k: usize,
        filter: &dyn Fn(&str) -> bool,
    ) -> Vec<IndexResult> {
        let _span = tracing::debug_span!("cagra_search_filtered", k).entered();
        if self.id_map.is_empty() || k == 0 {
            return Vec::new();
        }
        if query.len() != self.dim {
            tracing::warn!(
                "Query dimension mismatch: expected {}, got {}",
                self.dim,
                query.len()
            );
            return Vec::new();
        }
        let n = self.id_map.len();
        // One bit per indexed vector, packed 32 to a u32 word.
        let n_words = n.div_ceil(32);
        let mut bitset = vec![0u32; n_words];
        let mut included = 0usize;
        for (i, id) in self.id_map.iter().enumerate() {
            if filter(id) {
                // Bit i set => row i is a candidate. NOTE(review): presumably
                // cuVS's prefilter convention (set = included, little-endian
                // bit order within each word) — confirm against the cuvs docs.
                bitset[i / 32] |= 1u32 << (i % 32);
                included += 1;
            }
        }
        if included == n {
            return CagraIndex::search(self, query, k);
        }
        if included == 0 {
            return Vec::new();
        }
        tracing::debug!(
            total = n,
            included,
            excluded = n - included,
            "CAGRA bitset filter"
        );
        // Same poisoned-lock recovery as the unfiltered path: a panic in a
        // previous search does not invalidate the GPU state.
        let gpu = self.gpu.lock().unwrap_or_else(|poisoned| {
            tracing::debug!("CAGRA GPU mutex poisoned, recovering");
            poisoned.into_inner()
        });
        let bitset_host = ndarray_015::Array1::from_vec(bitset);
        let bitset_device = match cuvs::ManagedTensor::from(&bitset_host).to_device(&gpu.resources)
        {
            Ok(t) => t,
            Err(e) => {
                tracing::error!(error = %e, "Failed to upload bitset to device");
                return Vec::new();
            }
        };
        self.search_impl(&gpu, query, k, Some(&bitset_device))
    }
}
// SAFETY(review): CagraIndex is only non-auto-Send/Sync because of the raw
// cuVS handles inside GpuState; all access to those handles is serialized
// through the `gpu` Mutex, and the remaining fields (`dim`, `id_map`) are
// plain owned data. This relies on cuVS resources/indices being usable from a
// thread other than the one that created them — TODO confirm against cuVS's
// documented threading guarantees.
#[cfg(feature = "gpu-index")]
unsafe impl Send for CagraIndex {}
#[cfg(feature = "gpu-index")]
unsafe impl Sync for CagraIndex {}
#[cfg(feature = "gpu-index")]
impl CagraIndex {
    /// Builds a CAGRA index by streaming every chunk embedding out of `store`.
    ///
    /// Fails early when the store is empty, or when the estimated dataset
    /// size (`chunk_count * dim * 4` bytes of f32) would exceed
    /// [`cagra_max_bytes`], to avoid a GPU out-of-memory during the build.
    ///
    /// # Errors
    /// [`CagraError::Cuvs`] for store/count/size failures and
    /// [`CagraError::DimensionMismatch`] when a stored embedding has the
    /// wrong length.
    pub fn build_from_store(store: &crate::Store, dim: usize) -> Result<Self, CagraError> {
        let _span = tracing::debug_span!("cagra_build_from_store").entered();
        let chunk_count = store
            .chunk_count()
            .map_err(|e| CagraError::Cuvs(format!("Failed to count chunks: {}", e)))?
            as usize;
        if chunk_count == 0 {
            return Err(CagraError::Cuvs("No embeddings in store".into()));
        }
        tracing::info!(chunk_count, "Building CAGRA index from chunk embeddings");
        // OOM guard: refuse datasets whose f32 footprint exceeds the
        // configured GPU budget. saturating_mul keeps the estimate safe even
        // for absurd counts.
        let max_bytes = cagra_max_bytes();
        let estimated_bytes = chunk_count.saturating_mul(dim).saturating_mul(4);
        if estimated_bytes > max_bytes {
            return Err(CagraError::Cuvs(format!(
                "Dataset too large for GPU indexing: {}MB estimated (limit {}MB)",
                estimated_bytes / (1024 * 1024),
                max_bytes / (1024 * 1024)
            )));
        }
        // Preallocation is safe: the guard above bounds chunk_count * dim.
        let mut id_map = Vec::with_capacity(chunk_count);
        let mut flat_data = Vec::with_capacity(chunk_count * dim);
        const BATCH_SIZE: usize = 10_000;
        let mut loaded_chunks = 0usize;
        for batch_result in store.embedding_batches(BATCH_SIZE) {
            let batch = batch_result
                .map_err(|e| CagraError::Cuvs(format!("Failed to fetch batch: {}", e)))?;
            let batch_len = batch.len();
            for (chunk_id, embedding) in batch {
                if embedding.len() != dim {
                    return Err(CagraError::DimensionMismatch {
                        expected: dim,
                        actual: embedding.len(),
                    });
                }
                id_map.push(chunk_id);
                flat_data.extend(embedding.into_inner());
            }
            loaded_chunks += batch_len;
            // chunk_count > 0 is guaranteed by the early return above, so the
            // percentage is always well-defined (previous `else { 100 }`
            // fallback was unreachable).
            let progress_pct = (loaded_chunks * 100) / chunk_count;
            tracing::info!(
                "CAGRA loading progress: {} / {} chunks ({}%)",
                loaded_chunks,
                chunk_count,
                progress_pct
            );
        }
        Self::build_from_flat(id_map, flat_data, dim)
    }

    /// Builds the index from already-flattened, row-major embedding data.
    ///
    /// `flat_data` must hold `id_map.len() * dim` f32 values; row i belongs to
    /// `id_map[i]`. This is the single canonical cuVS build path used by both
    /// [`Self::build`] and [`Self::build_from_store`].
    ///
    /// # Errors
    /// [`CagraError::Cuvs`] when the input is empty, the data cannot be shaped
    /// into an (n, dim) matrix, or any cuVS call fails.
    pub(crate) fn build_from_flat(
        id_map: Vec<String>,
        flat_data: Vec<f32>,
        dim: usize,
    ) -> Result<Self, CagraError> {
        let n_vectors = id_map.len();
        if n_vectors == 0 {
            return Err(CagraError::Cuvs("Cannot build empty index".into()));
        }
        tracing::info!(n_vectors, "Building CAGRA index");
        let resources = cuvs::Resources::new().map_err(|e| CagraError::Cuvs(e.to_string()))?;
        let dataset = Array2::from_shape_vec((n_vectors, dim), flat_data)
            .map_err(|e| CagraError::Cuvs(format!("Failed to create array: {}", e)))?;
        let build_params =
            cuvs::cagra::IndexParams::new().map_err(|e| CagraError::Cuvs(e.to_string()))?;
        let index = cuvs::cagra::Index::build(&resources, &build_params, &dataset)
            .map_err(|e| CagraError::Cuvs(e.to_string()))?;
        tracing::info!("CAGRA index built successfully");
        Ok(Self {
            dim,
            gpu: Mutex::new(GpuState { resources, index }),
            id_map,
        })
    }
}
// GPU integration tests: each test acquires GPU_LOCK to serialize GPU access
// across the parallel test harness, and skips itself (rather than failing)
// when no GPU is present.
#[cfg(all(test, feature = "gpu-index"))]
mod tests {
    use super::*;
    use crate::index::VectorIndex;
    use crate::EMBEDDING_DIM;
    use std::sync::Mutex;
    // Serializes all GPU-touching tests; cargo runs tests in parallel by
    // default and the tests share one device.
    static GPU_LOCK: Mutex<()> = Mutex::new(());
    // Deterministic pseudo-random unit-norm embedding; distinct seeds give
    // distinct, repeatable vectors so self-match assertions are stable.
    fn make_embedding(seed: u32) -> Embedding {
        let mut v = vec![0.0f32; EMBEDDING_DIM];
        for (i, val) in v.iter_mut().enumerate() {
            *val = ((seed as f32 * 10.0) + (i as f32 * 0.001)).sin();
        }
        // Normalize so the distance-to-score conversion behaves as expected.
        let norm: f32 = v.iter().map(|x| x * x).sum::<f32>().sqrt();
        if norm > 0.0 {
            v.iter_mut().for_each(|x| *x /= norm);
        }
        Embedding::new(v)
    }
    // Returns false (after logging) when there is no GPU, letting tests
    // early-return instead of failing on CI machines without CUDA.
    fn require_gpu() -> bool {
        if !CagraIndex::gpu_available() {
            eprintln!("Skipping CAGRA test: no GPU available");
            return false;
        }
        true
    }
    // Builds an index of n deterministic embeddings with ids "chunk_0"…"chunk_{n-1}".
    fn build_test_index(n: u32) -> CagraIndex {
        let embeddings: Vec<(String, Embedding)> = (0..n)
            .map(|i| (format!("chunk_{}", i), make_embedding(i)))
            .collect();
        CagraIndex::build(embeddings, EMBEDDING_DIM).expect("Failed to build test index")
    }
    // Smoke test: the probe must not panic regardless of GPU presence.
    #[test]
    fn test_gpu_available() {
        let _ = CagraIndex::gpu_available();
    }
    #[test]
    fn test_build_simple() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let index = build_test_index(5);
        assert_eq!(index.len(), 5);
        assert!(!index.is_empty());
    }
    // Building with no embeddings must be rejected.
    #[test]
    fn test_build_empty() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let result = CagraIndex::build(vec![], EMBEDDING_DIM);
        assert!(result.is_err());
    }
    // Wrong-dimension input surfaces as CagraError::Build (from prepare_index_data).
    #[test]
    fn test_build_dimension_mismatch() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let bad_embedding = Embedding::new(vec![1.0; 100]);
        let result = CagraIndex::build(vec![("bad".into(), bad_embedding)], EMBEDDING_DIM);
        match result {
            Err(CagraError::Build(_)) => {}
            Err(e) => panic!("Expected Build error, got: {:?}", e),
            Ok(_) => panic!("Expected error, got Ok"),
        }
    }
    // Querying with an indexed vector must return that vector first, with a
    // near-perfect similarity score.
    #[test]
    fn test_search_self_match() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let index = build_test_index(10);
        let query = make_embedding(3);
        let results = index.search(&query, 5);
        assert!(!results.is_empty(), "Search returned no results");
        assert_eq!(results[0].id, "chunk_3", "Top result should be chunk_3");
        assert!(
            results[0].score > 0.9,
            "Self-match score should be high, got {}",
            results[0].score
        );
    }
    #[test]
    fn test_search_k_limiting() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let index = build_test_index(10);
        let query = make_embedding(0);
        let results = index.search(&query, 3);
        assert!(results.len() <= 3);
    }
    // Results must come back sorted by descending score.
    #[test]
    fn test_search_ordering() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let index = build_test_index(10);
        let query = make_embedding(0);
        let results = index.search(&query, 5);
        for window in results.windows(2) {
            assert!(
                window[0].score >= window[1].score,
                "Results not sorted: {} < {}",
                window[0].score,
                window[1].score
            );
        }
    }
    // A query of the wrong dimensionality yields an empty result, not a panic.
    #[test]
    fn test_search_dimension_mismatch_query() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let index = build_test_index(5);
        let bad_query = Embedding::new(vec![1.0; 100]);
        let results = index.search(&bad_query, 3);
        assert!(results.is_empty());
    }
    // The index must survive repeated searches (GPU state reuse via the mutex).
    #[test]
    fn test_multiple_searches() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let index = build_test_index(10);
        let results1 = index.search(&make_embedding(0), 3);
        assert!(!results1.is_empty());
        let results2 = index.search(&make_embedding(5), 3);
        assert!(!results2.is_empty());
        assert_eq!(results2[0].id, "chunk_5");
    }
    #[test]
    fn test_consecutive_searches() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let index = build_test_index(20);
        for i in 0..10 {
            let query = make_embedding(i);
            let results = index.search(&query, 5);
            assert!(!results.is_empty(), "Search {} should return results", i);
            assert!(results.len() <= 5);
        }
    }
    // k == 0 is a no-op; a subsequent valid search must still work.
    #[test]
    fn test_search_with_invalid_k() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let index = build_test_index(5);
        let results = index.search(&make_embedding(0), 0);
        assert!(results.is_empty());
        let results = index.search(&make_embedding(1), 3);
        assert!(!results.is_empty());
    }
    // Trait-object dispatch must report the backend name.
    #[test]
    fn test_name_returns_cagra() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let index = build_test_index(5);
        let vi: &dyn VectorIndex = &index;
        assert_eq!(vi.name(), "CAGRA");
    }
    // Pure-CPU check of the OOM-guard arithmetic in build_from_store: the
    // largest chunk count under the byte limit passes, one more fails.
    #[test]
    fn test_oom_guard_arithmetic() {
        let max_bytes = super::cagra_max_bytes();
        let max_chunks = max_bytes / (EMBEDDING_DIM * 4);
        let under = max_chunks.saturating_mul(EMBEDDING_DIM).saturating_mul(4);
        assert!(under <= max_bytes);
        let over = (max_chunks + 1)
            .saturating_mul(EMBEDDING_DIM)
            .saturating_mul(4);
        assert!(over > max_bytes);
    }
}