#[cfg(feature = "gpu-index")]
use std::path::Path;
#[cfg(feature = "gpu-index")]
use std::sync::atomic::{AtomicBool, Ordering};
#[cfg(feature = "gpu-index")]
use std::sync::Mutex;
#[cfg(feature = "gpu-index")]
use ndarray_015::Array2;
#[cfg(feature = "gpu-index")]
use thiserror::Error;
#[cfg(feature = "gpu-index")]
use crate::embedder::Embedding;
#[cfg(feature = "gpu-index")]
use crate::index::{IndexResult, VectorIndex};
#[cfg(feature = "gpu-index")]
// Sidecar magic string identifying the CAGRA persistence format; loads reject
// sidecars whose magic differs.
const CAGRA_META_MAGIC: &str = "CAGRA01";
#[cfg(feature = "gpu-index")]
// Sidecar schema version; loads fail with `CagraError::Stale` on mismatch.
const CAGRA_META_VERSION: u32 = 1;
#[cfg(feature = "gpu-index")]
// Sentinel used to pre-fill host-side distance buffers; any slot still
// non-finite after a search is treated as "no neighbour" and skipped.
const INVALID_DISTANCE: f32 = f32::INFINITY;
#[cfg(feature = "gpu-index")]
/// Errors produced while building, searching, or persisting a GPU CAGRA index.
#[derive(Error, Debug)]
pub enum CagraError {
    /// An error string surfaced by the cuVS library.
    #[error("cuVS error: {0}")]
    Cuvs(String),
    /// No usable GPU / cuVS resource handle could be created.
    #[error("No GPU available")]
    NoGpu,
    /// An embedding's dimensionality did not match the index dimension.
    #[error("Dimension mismatch: expected {expected}, got {actual}")]
    DimensionMismatch { expected: usize, actual: usize },
    /// Input preparation failed (empty input, malformed vectors, ...).
    #[error("Build error: {0}")]
    Build(String),
    /// An operation required a built index but none was present.
    #[error("Index not built")]
    NotBuilt,
    /// Filesystem-level failure during save/load.
    #[error("Persistence IO error: {0}")]
    Io(String),
    /// The persisted sidecar could not be parsed or is internally inconsistent.
    #[error("Persistence metadata invalid: {0}")]
    BadMeta(String),
    /// The persisted index no longer matches the live store (version/dim/count).
    #[error("Persisted CAGRA index is stale ({reason})")]
    Stale { reason: String },
    /// The blob's BLAKE3 checksum did not match the sidecar's recorded hash.
    #[error("Persisted CAGRA index checksum mismatch (file: {0})")]
    ChecksumMismatch(String),
}
#[cfg(feature = "gpu-index")]
/// Upper bound, in bytes, on the flat dataset this process will attempt to
/// place on the GPU. Defaults to 2 GiB; override with the
/// `CQS_CAGRA_MAX_BYTES` environment variable. The value is read once and
/// cached for the lifetime of the process (unparsable overrides fall back to
/// the default).
fn cagra_max_bytes() -> usize {
    const DEFAULT_MAX_BYTES: usize = 2 * 1024 * 1024 * 1024;
    static CACHED: std::sync::OnceLock<usize> = std::sync::OnceLock::new();
    *CACHED.get_or_init(|| match std::env::var("CQS_CAGRA_MAX_BYTES") {
        Ok(raw) => raw.parse().unwrap_or(DEFAULT_MAX_BYTES),
        Err(_) => DEFAULT_MAX_BYTES,
    })
}
#[cfg(feature = "gpu-index")]
/// Heuristic default for the CAGRA `itopk` upper bound: grows with the log2
/// of the dataset size (32 per doubling) and is clamped into [128, 4096].
/// `n_vectors == 0` is treated as 1 so the log is well-defined.
fn cagra_itopk_max_default(n_vectors: usize) -> usize {
    let doubling_count = (n_vectors.max(1) as f64).log2();
    ((doubling_count * 32.0) as usize).clamp(128, 4096)
}
#[cfg(feature = "gpu-index")]
/// Assembles cuVS CAGRA build parameters, honouring the
/// `CQS_CAGRA_GRAPH_DEGREE` (default 64) and
/// `CQS_CAGRA_INTERMEDIATE_GRAPH_DEGREE` (default 128) environment
/// overrides. Unparsable overrides silently fall back to the defaults.
fn cagra_build_params() -> Result<cuvs::cagra::IndexParams, CagraError> {
    // Small local helper: read a usize env var with a fallback.
    fn env_usize(key: &str, default: usize) -> usize {
        std::env::var(key)
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(default)
    }
    let graph_degree = env_usize("CQS_CAGRA_GRAPH_DEGREE", 64);
    let intermediate_graph_degree = env_usize("CQS_CAGRA_INTERMEDIATE_GRAPH_DEGREE", 128);
    let params = cuvs::cagra::IndexParams::new()
        .map_err(|e| CagraError::Cuvs(e.to_string()))?
        .set_graph_degree(graph_degree)
        .set_intermediate_graph_degree(intermediate_graph_degree);
    tracing::info!(
        graph_degree,
        intermediate_graph_degree,
        "CAGRA build params"
    );
    Ok(params)
}
#[cfg(feature = "gpu-index")]
/// GPU-resident CAGRA approximate-nearest-neighbour index over chunk
/// embeddings.
pub struct CagraIndex {
    // Embedding dimensionality every stored and query vector must match.
    dim: usize,
    // cuVS resource + index handles; the Mutex serializes all GPU access.
    gpu: Mutex<GpuState>,
    // Row index -> chunk id, used to translate GPU neighbour ids back to ids.
    id_map: Vec<String>,
    // Set once the GPU mutex is observed poisoned; searches then return empty
    // and callers are expected to rebuild the index.
    poisoned: AtomicBool,
}
#[cfg(feature = "gpu-index")]
/// The cuVS handles that must only be touched while holding
/// `CagraIndex::gpu`: the resource context and the built index.
struct GpuState {
    resources: cuvs::Resources,
    index: cuvs::cagra::Index,
}
#[cfg(feature = "gpu-index")]
impl std::fmt::Debug for CagraIndex {
    /// Debug output deliberately omits the GPU handles and the full id map,
    /// showing only the dimension and vector count.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("CagraIndex");
        dbg.field("dim", &self.dim);
        dbg.field("len", &self.id_map.len());
        dbg.finish()
    }
}
#[cfg(feature = "gpu-index")]
impl CagraIndex {
pub fn gpu_available() -> bool {
cuvs::Resources::new().is_ok()
}
pub fn build(embeddings: Vec<(String, Embedding)>, dim: usize) -> Result<Self, CagraError> {
let _span = tracing::debug_span!("cagra_build").entered();
let (id_map, flat_data, n_vectors) = crate::hnsw::prepare_index_data(embeddings, dim)
.map_err(|e| CagraError::Build(e.to_string()))?;
tracing::info!(n_vectors, "Building CAGRA index");
let resources = cuvs::Resources::new().map_err(|e| CagraError::Cuvs(e.to_string()))?;
let dataset = Array2::from_shape_vec((n_vectors, dim), flat_data)
.map_err(|e| CagraError::Cuvs(format!("Failed to create array: {}", e)))?;
let build_params = cagra_build_params()?;
let index = cuvs::cagra::Index::build(&resources, &build_params, &dataset)
.map_err(|e| CagraError::Cuvs(e.to_string()))?;
tracing::info!("CAGRA index built successfully");
Ok(Self {
dim,
gpu: Mutex::new(GpuState { resources, index }),
id_map,
poisoned: AtomicBool::new(false),
})
}
pub fn len(&self) -> usize {
self.id_map.len()
}
pub fn is_empty(&self) -> bool {
self.id_map.is_empty()
}
pub fn search(&self, query: &Embedding, k: usize) -> Vec<IndexResult> {
let _span = tracing::debug_span!("cagra_search", k).entered();
if self.id_map.is_empty() || k == 0 {
return Vec::new();
}
if query.len() != self.dim {
tracing::warn!(
expected_dim = self.dim,
actual_dim = query.len(),
"Query dimension mismatch"
);
return Vec::new();
}
let gpu = self.gpu.lock().unwrap_or_else(|poisoned| {
self.poisoned.store(true, Ordering::Release);
tracing::warn!(
"CAGRA GPU mutex poisoned — results from this call are discarded \
and the index will be rebuilt on the next vector_index() access"
);
poisoned.into_inner()
});
if self.poisoned.load(Ordering::Acquire) {
return Vec::new();
}
self.search_impl(&gpu, query, k, None)
}
fn search_impl(
&self,
gpu: &GpuState,
query: &Embedding,
k: usize,
bitset_device: Option<&cuvs::ManagedTensor>,
) -> Vec<IndexResult> {
let itopk_min = std::env::var("CQS_CAGRA_ITOPK_MIN")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(128);
let itopk_max = std::env::var("CQS_CAGRA_ITOPK_MAX")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or_else(|| cagra_itopk_max_default(self.len()));
let itopk_size = (k * 2).clamp(itopk_min, itopk_max);
tracing::debug!(
itopk_size,
itopk_min,
itopk_max,
k,
n_vectors = self.len(),
"CAGRA itopk resolved"
);
let search_params = match cuvs::cagra::SearchParams::new() {
Ok(params) => params.set_itopk_size(itopk_size),
Err(e) => {
tracing::error!(error = %e, "Failed to create search params");
return Vec::new();
}
};
let query_host = match Array2::from_shape_vec((1, self.dim), query.as_slice().to_vec()) {
Ok(arr) => arr,
Err(e) => {
tracing::error!(expected_dim = self.dim, error = %e, "Invalid query shape");
return Vec::new();
}
};
let mut neighbors_host: Array2<u32> = Array2::zeros((1, k));
let mut distances_host: Array2<f32> = Array2::from_elem((1, k), INVALID_DISTANCE);
let query_device = match cuvs::ManagedTensor::from(&query_host).to_device(&gpu.resources) {
Ok(t) => t,
Err(e) => {
tracing::error!(error = %e, "Failed to copy query to device");
return Vec::new();
}
};
let neighbors_device =
match cuvs::ManagedTensor::from(&neighbors_host).to_device(&gpu.resources) {
Ok(t) => t,
Err(e) => {
tracing::error!(error = %e, "Failed to allocate neighbors on device");
return Vec::new();
}
};
let distances_device =
match cuvs::ManagedTensor::from(&distances_host).to_device(&gpu.resources) {
Ok(t) => t,
Err(e) => {
tracing::error!(error = %e, "Failed to allocate distances on device");
return Vec::new();
}
};
let result = if let Some(bitset) = bitset_device {
gpu.index.search_with_filter(
&gpu.resources,
&search_params,
&query_device,
&neighbors_device,
&distances_device,
bitset,
)
} else {
gpu.index.search(
&gpu.resources,
&search_params,
&query_device,
&neighbors_device,
&distances_device,
)
};
if let Err(e) = result {
tracing::error!(error = %e, "CAGRA search failed");
return Vec::new();
}
if let Err(e) = neighbors_device.to_host(&gpu.resources, &mut neighbors_host) {
tracing::error!(error = %e, "Failed to copy neighbors from device");
return Vec::new();
}
if let Err(e) = distances_device.to_host(&gpu.resources, &mut distances_host) {
tracing::error!(error = %e, "Failed to copy distances from device");
return Vec::new();
}
let mut results = Vec::with_capacity(k);
let neighbor_row = neighbors_host.row(0);
let distance_row = distances_host.row(0);
for i in 0..k {
let idx = neighbor_row[i] as usize;
let dist = distance_row[i];
if !dist.is_finite() {
continue;
}
if idx < self.id_map.len() {
let score = 1.0 - dist / 2.0;
results.push(IndexResult {
id: self.id_map[idx].clone(),
score,
});
}
}
results
}
}
#[cfg(feature = "gpu-index")]
impl VectorIndex for CagraIndex {
    /// Delegates to the inherent [`CagraIndex::search`].
    fn search(&self, query: &Embedding, k: usize) -> Vec<IndexResult> {
        CagraIndex::search(self, query, k)
    }
    fn len(&self) -> usize {
        CagraIndex::len(self)
    }
    fn is_empty(&self) -> bool {
        CagraIndex::is_empty(self)
    }
    fn name(&self) -> &'static str {
        "CAGRA"
    }
    /// True once a GPU mutex poisoning has been observed; callers are
    /// expected to rebuild the index when this is set.
    fn is_poisoned(&self) -> bool {
        self.poisoned.load(Ordering::Acquire)
    }
    fn dim(&self) -> usize {
        self.dim
    }
    /// Filtered k-NN search: only ids for which `filter` returns true are
    /// eligible. The filter runs on the CPU over the whole id map and is
    /// encoded as an inclusion bitset uploaded to the device for cuVS
    /// pre-filtering.
    fn search_with_filter(
        &self,
        query: &Embedding,
        k: usize,
        filter: &dyn Fn(&str) -> bool,
    ) -> Vec<IndexResult> {
        let _span = tracing::debug_span!("cagra_search_filtered", k).entered();
        if self.id_map.is_empty() || k == 0 {
            return Vec::new();
        }
        if query.len() != self.dim {
            tracing::warn!(
                expected_dim = self.dim,
                actual_dim = query.len(),
                "Query dimension mismatch"
            );
            return Vec::new();
        }
        // One bit per row, packed into u32 words: bit (i % 32) of word
        // (i / 32) marks row i as included.
        // NOTE(review): assumes this word/bit layout matches what the cuVS
        // filtered-search entry point expects — confirm against cuVS docs.
        let n = self.id_map.len();
        let n_words = n.div_ceil(32);
        let mut bitset = vec![0u32; n_words];
        let mut included = 0usize;
        for (i, id) in self.id_map.iter().enumerate() {
            if filter(id) {
                bitset[i / 32] |= 1u32 << (i % 32);
                included += 1;
            }
        }
        // Everything passes: the unfiltered path is cheaper.
        if included == n {
            return CagraIndex::search(self, query, k);
        }
        // Nothing passes: no GPU work needed.
        if included == 0 {
            return Vec::new();
        }
        tracing::debug!(
            total = n,
            included,
            excluded = n - included,
            "CAGRA bitset filter"
        );
        // Same poisoning protocol as the unfiltered search path.
        let gpu = self.gpu.lock().unwrap_or_else(|poisoned| {
            self.poisoned.store(true, Ordering::Release);
            tracing::warn!(
                "CAGRA GPU mutex poisoned (filtered path) — results discarded \
                 and index will be rebuilt on next vector_index()"
            );
            poisoned.into_inner()
        });
        if self.poisoned.load(Ordering::Acquire) {
            return Vec::new();
        }
        let bitset_host = ndarray_015::Array1::from_vec(bitset);
        let bitset_device = match cuvs::ManagedTensor::from(&bitset_host).to_device(&gpu.resources)
        {
            Ok(t) => t,
            Err(e) => {
                tracing::error!(error = %e, "Failed to upload bitset to device");
                return Vec::new();
            }
        };
        self.search_impl(&gpu, query, k, Some(&bitset_device))
    }
}
#[cfg(feature = "gpu-index")]
// SAFETY: all access to the raw cuVS handles goes through the
// `gpu: Mutex<GpuState>` field, which serializes GPU operations; the
// remaining fields (`dim`, `id_map`, `poisoned`) are immutable after
// construction or atomic.
// NOTE(review): this also relies on cuVS resource/index handles being safe
// to move between threads — confirm against the cuVS C API threading
// guarantees.
unsafe impl Send for CagraIndex {}
#[cfg(feature = "gpu-index")]
// SAFETY: same reasoning as the `Send` impl above — shared access is safe
// because every mutation of GPU state is behind the internal Mutex.
unsafe impl Sync for CagraIndex {}
#[cfg(feature = "gpu-index")]
impl CagraIndex {
    /// Builds a CAGRA index by streaming all chunk embeddings out of `store`.
    ///
    /// Refuses to build when the estimated flat dataset
    /// (`chunk_count * dim * 4` bytes) would exceed the `cagra_max_bytes()`
    /// GPU budget.
    ///
    /// # Errors
    /// `CagraError::DimensionMismatch` when a stored embedding has the wrong
    /// dimension; `CagraError::Cuvs` for store access failures, an empty
    /// store, an oversized dataset, or GPU-side build errors.
    pub fn build_from_store<Mode>(
        store: &crate::Store<Mode>,
        dim: usize,
    ) -> Result<Self, CagraError> {
        let _span = tracing::debug_span!("cagra_build_from_store").entered();
        let chunk_count = store
            .chunk_count()
            .map_err(|e| CagraError::Cuvs(format!("Failed to count chunks: {}", e)))?
            as usize;
        if chunk_count == 0 {
            return Err(CagraError::Cuvs("No embeddings in store".into()));
        }
        tracing::info!(chunk_count, "Building CAGRA index from chunk embeddings");
        // OOM guard: n_vectors * dim * sizeof(f32). Saturating arithmetic so
        // absurd counts cannot overflow the estimate itself.
        let max_bytes = cagra_max_bytes();
        let estimated_bytes = chunk_count.saturating_mul(dim).saturating_mul(4);
        if estimated_bytes > max_bytes {
            return Err(CagraError::Cuvs(format!(
                "Dataset too large for GPU indexing: {}MB estimated (limit {}MB)",
                estimated_bytes / (1024 * 1024),
                max_bytes / (1024 * 1024)
            )));
        }
        let mut id_map = Vec::with_capacity(chunk_count);
        let mut flat_data = Vec::with_capacity(chunk_count * dim);
        // Stream embeddings in batches to bound peak store-side memory use.
        const BATCH_SIZE: usize = 10_000;
        let mut loaded_chunks = 0usize;
        for batch_result in store.embedding_batches(BATCH_SIZE) {
            let batch = batch_result
                .map_err(|e| CagraError::Cuvs(format!("Failed to fetch batch: {}", e)))?;
            let batch_len = batch.len();
            for (chunk_id, embedding) in batch {
                if embedding.len() != dim {
                    return Err(CagraError::DimensionMismatch {
                        expected: dim,
                        actual: embedding.len(),
                    });
                }
                id_map.push(chunk_id);
                flat_data.extend(embedding.into_inner());
            }
            loaded_chunks += batch_len;
            let progress_pct = if chunk_count > 0 {
                (loaded_chunks * 100) / chunk_count
            } else {
                100
            };
            tracing::info!(
                "CAGRA loading progress: {} / {} chunks ({}%)",
                loaded_chunks,
                chunk_count,
                progress_pct
            );
        }
        Self::build_from_flat(id_map, flat_data, dim)
    }
    /// Builds a CAGRA index from an already-flattened row-major dataset.
    ///
    /// `flat_data.len()` must equal `id_map.len() * dim`; the shape check is
    /// enforced by `Array2::from_shape_vec` below.
    pub(crate) fn build_from_flat(
        id_map: Vec<String>,
        flat_data: Vec<f32>,
        dim: usize,
    ) -> Result<Self, CagraError> {
        let n_vectors = id_map.len();
        if n_vectors == 0 {
            return Err(CagraError::Cuvs("Cannot build empty index".into()));
        }
        tracing::info!(n_vectors, "Building CAGRA index");
        let resources = cuvs::Resources::new().map_err(|e| CagraError::Cuvs(e.to_string()))?;
        let dataset = Array2::from_shape_vec((n_vectors, dim), flat_data)
            .map_err(|e| CagraError::Cuvs(format!("Failed to create array: {}", e)))?;
        let build_params = cagra_build_params()?;
        let index = cuvs::cagra::Index::build(&resources, &build_params, &dataset)
            .map_err(|e| CagraError::Cuvs(e.to_string()))?;
        tracing::info!("CAGRA index built successfully");
        Ok(Self {
            dim,
            gpu: Mutex::new(GpuState { resources, index }),
            id_map,
            poisoned: AtomicBool::new(false),
        })
    }
}
#[cfg(feature = "gpu-index")]
/// JSON sidecar persisted next to the serialized CAGRA blob; validated on
/// load to detect format changes, reindexing, and blob corruption.
#[derive(serde::Serialize, serde::Deserialize, Debug)]
struct CagraMeta {
    /// Format magic; must equal `CAGRA_META_MAGIC`.
    magic: String,
    /// Sidecar schema version; must equal `CAGRA_META_VERSION`.
    version: u32,
    /// Embedding dimensionality of the persisted index.
    dim: usize,
    /// Vector count; must match the caller's expected chunk count on load.
    chunk_count: usize,
    /// Store generation captured at save time (0 when unknown).
    splade_generation: u64,
    /// Row index -> chunk id mapping, in index order.
    id_map: Vec<String>,
    /// Hex-encoded BLAKE3 checksum of the serialized blob.
    blake3: String,
}
#[cfg(feature = "gpu-index")]
/// Whether CAGRA persistence (save/load of the serialized index) is enabled.
/// Disabled by setting `CQS_CAGRA_PERSIST` to `0`, `false`, or `no`; any
/// other value (or an unset variable) enables it. The decision is made once
/// and cached for the process lifetime.
pub fn cagra_persist_enabled() -> bool {
    static ENABLED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    *ENABLED.get_or_init(|| {
        let disabled = matches!(
            std::env::var("CQS_CAGRA_PERSIST").as_deref(),
            Ok("0") | Ok("false") | Ok("no")
        );
        if disabled {
            tracing::info!("CQS_CAGRA_PERSIST=0 — CAGRA persistence disabled");
        }
        !disabled
    })
}
#[cfg(feature = "gpu-index")]
impl CagraIndex {
    /// Serializes the index blob to `path` and writes a `.meta` JSON sidecar
    /// (dim, chunk count, id map, BLAKE3 of the blob) used to validate loads.
    /// The sidecar records `splade_generation: 0`; use [`Self::save_with_store`]
    /// to capture the store's actual generation.
    ///
    /// The old sidecar is removed *before* serializing so a crash mid-write
    /// can never leave a valid-looking sidecar next to a corrupt blob.
    ///
    /// # Errors
    /// `CagraError::Io` when persistence is disabled, the mutex is poisoned,
    /// or a filesystem step fails; `CagraError::Cuvs` when serialization
    /// fails on the GPU side.
    pub fn save(&self, path: &Path) -> Result<(), CagraError> {
        let _span = tracing::info_span!("cagra_save", path = %path.display()).entered();
        if !cagra_persist_enabled() {
            return Err(CagraError::Io(
                "CAGRA persistence disabled via CQS_CAGRA_PERSIST=0".to_string(),
            ));
        }
        // Hold the GPU lock for the whole save so no search runs concurrently
        // with serialization.
        let gpu = self.gpu.lock().map_err(|_| {
            self.poisoned.store(true, Ordering::Release);
            CagraError::Io("CAGRA mutex poisoned, refusing to save".to_string())
        })?;
        if self.poisoned.load(Ordering::Acquire) {
            return Err(CagraError::Io(
                "CAGRA index is poisoned, refusing to save".to_string(),
            ));
        }
        if let Some(parent) = path.parent() {
            if let Err(e) = std::fs::create_dir_all(parent) {
                return Err(CagraError::Io(format!(
                    "Failed to create parent dir {}: {}",
                    parent.display(),
                    e
                )));
            }
        }
        let meta_path = meta_path_for(path);
        // Invalidate any existing sidecar first: if the blob write below is
        // interrupted, no stale sidecar can vouch for a corrupt blob.
        let _ = std::fs::remove_file(&meta_path);
        let path_str = path.to_str().ok_or_else(|| {
            CagraError::Io(format!(
                "CAGRA save path is not valid UTF-8: {}",
                path.display()
            ))
        })?;
        tracing::info!(
            n_vectors = self.id_map.len(),
            dim = self.dim,
            "Serializing CAGRA index to disk"
        );
        gpu.index
            .serialize(&gpu.resources, path_str, true)
            .map_err(|e| CagraError::Cuvs(format!("cuvsCagraSerialize failed: {}", e)))?;
        // Checksum the freshly written blob so a later load can detect
        // corruption or truncation.
        let blob_hash = blake3_of_path(path)?;
        let meta = CagraMeta {
            magic: CAGRA_META_MAGIC.to_string(),
            version: CAGRA_META_VERSION,
            dim: self.dim,
            chunk_count: self.id_map.len(),
            splade_generation: 0,
            id_map: self.id_map.clone(),
            blake3: blob_hash,
        };
        if let Err(e) = write_meta_atomic(&meta_path, &meta) {
            // Keep the blob/sidecar pair consistent: no sidecar => no blob.
            let _ = std::fs::remove_file(path);
            return Err(e);
        }
        tracing::info!(
            path = %path.display(),
            n_vectors = self.id_map.len(),
            "CAGRA index persisted"
        );
        Ok(())
    }
    /// Like [`Self::save`], but records the store's current splade generation
    /// in the sidecar (falling back to 0 if the store cannot report it).
    ///
    /// # Errors
    /// Same as [`Self::save`].
    pub fn save_with_store<Mode>(
        &self,
        path: &Path,
        store: &crate::Store<Mode>,
    ) -> Result<(), CagraError> {
        let _span = tracing::info_span!("cagra_save_with_store", path = %path.display()).entered();
        if !cagra_persist_enabled() {
            return Err(CagraError::Io(
                "CAGRA persistence disabled via CQS_CAGRA_PERSIST=0".to_string(),
            ));
        }
        let gpu = self.gpu.lock().map_err(|_| {
            self.poisoned.store(true, Ordering::Release);
            CagraError::Io("CAGRA mutex poisoned, refusing to save".to_string())
        })?;
        if self.poisoned.load(Ordering::Acquire) {
            return Err(CagraError::Io(
                "CAGRA index is poisoned, refusing to save".to_string(),
            ));
        }
        if let Some(parent) = path.parent() {
            if let Err(e) = std::fs::create_dir_all(parent) {
                return Err(CagraError::Io(format!(
                    "Failed to create parent dir {}: {}",
                    parent.display(),
                    e
                )));
            }
        }
        let meta_path = meta_path_for(path);
        // Same crash-safety ordering as `save`: drop the old sidecar first.
        let _ = std::fs::remove_file(&meta_path);
        let path_str = path.to_str().ok_or_else(|| {
            CagraError::Io(format!(
                "CAGRA save path is not valid UTF-8: {}",
                path.display()
            ))
        })?;
        // Best-effort generation lookup; a failure downgrades to 0 rather
        // than aborting the save.
        let generation = match store.splade_generation() {
            Ok(g) => g,
            Err(e) => {
                tracing::warn!(
                    error = %e,
                    "Failed to read splade_generation for CAGRA meta; defaulting to 0"
                );
                0
            }
        };
        tracing::info!(
            n_vectors = self.id_map.len(),
            dim = self.dim,
            splade_generation = generation,
            "Serializing CAGRA index to disk"
        );
        gpu.index
            .serialize(&gpu.resources, path_str, true)
            .map_err(|e| CagraError::Cuvs(format!("cuvsCagraSerialize failed: {}", e)))?;
        let blob_hash = blake3_of_path(path)?;
        let meta = CagraMeta {
            magic: CAGRA_META_MAGIC.to_string(),
            version: CAGRA_META_VERSION,
            dim: self.dim,
            chunk_count: self.id_map.len(),
            splade_generation: generation,
            id_map: self.id_map.clone(),
            blake3: blob_hash,
        };
        if let Err(e) = write_meta_atomic(&meta_path, &meta) {
            let _ = std::fs::remove_file(path);
            return Err(e);
        }
        tracing::info!(
            path = %path.display(),
            n_vectors = self.id_map.len(),
            "CAGRA index persisted"
        );
        Ok(())
    }
    /// Loads a previously persisted index, validating the sidecar (magic,
    /// version, dim, chunk count, id-map length) and the blob's BLAKE3
    /// checksum before deserializing onto the GPU.
    ///
    /// # Errors
    /// `CagraError::Io` for missing blob or unreadable files;
    /// `CagraError::BadMeta` for a missing/oversized/unparsable/inconsistent
    /// sidecar; `CagraError::Stale` when version/dim/chunk-count disagree
    /// with expectations; `CagraError::ChecksumMismatch` on blob corruption;
    /// `CagraError::Cuvs` for GPU-side deserialization failures.
    pub fn load(
        path: &Path,
        expected_dim: usize,
        expected_chunks: usize,
    ) -> Result<Self, CagraError> {
        let _span = tracing::info_span!("cagra_load", path = %path.display()).entered();
        if !cagra_persist_enabled() {
            return Err(CagraError::Io(
                "CAGRA persistence disabled via CQS_CAGRA_PERSIST=0".to_string(),
            ));
        }
        if !path.exists() {
            return Err(CagraError::Io(format!(
                "CAGRA blob not found at {}",
                path.display()
            )));
        }
        let meta_path = meta_path_for(path);
        if !meta_path.exists() {
            return Err(CagraError::BadMeta(format!(
                "CAGRA sidecar missing at {}",
                meta_path.display()
            )));
        }
        // Sanity cap on sidecar size before parsing it into memory.
        const MAX_META_SIZE: u64 = 128 * 1024 * 1024;
        let meta_size = std::fs::metadata(&meta_path)
            .map_err(|e| {
                CagraError::Io(format!(
                    "Failed to stat CAGRA sidecar {}: {}",
                    meta_path.display(),
                    e
                ))
            })?
            .len();
        if meta_size > MAX_META_SIZE {
            return Err(CagraError::BadMeta(format!(
                "CAGRA sidecar {} is {}MB, exceeds {}MB limit",
                meta_path.display(),
                meta_size / (1024 * 1024),
                MAX_META_SIZE / (1024 * 1024)
            )));
        }
        let meta_file = std::fs::File::open(&meta_path).map_err(|e| {
            CagraError::Io(format!(
                "Failed to open CAGRA sidecar {}: {}",
                meta_path.display(),
                e
            ))
        })?;
        let meta: CagraMeta =
            serde_json::from_reader(std::io::BufReader::new(meta_file)).map_err(|e| {
                CagraError::BadMeta(format!(
                    "Failed to parse CAGRA sidecar {}: {}",
                    meta_path.display(),
                    e
                ))
            })?;
        if meta.magic != CAGRA_META_MAGIC {
            return Err(CagraError::BadMeta(format!(
                "Unexpected magic {:?} (want {:?})",
                meta.magic, CAGRA_META_MAGIC
            )));
        }
        if meta.version != CAGRA_META_VERSION {
            return Err(CagraError::Stale {
                reason: format!(
                    "sidecar version {} != current {}",
                    meta.version, CAGRA_META_VERSION
                ),
            });
        }
        if meta.dim != expected_dim {
            return Err(CagraError::Stale {
                reason: format!("dim {} != expected {}", meta.dim, expected_dim),
            });
        }
        if meta.chunk_count != expected_chunks {
            return Err(CagraError::Stale {
                reason: format!(
                    "chunk_count {} != expected {} (reindex occurred)",
                    meta.chunk_count, expected_chunks
                ),
            });
        }
        // Internal consistency of the sidecar itself.
        if meta.id_map.len() != meta.chunk_count {
            return Err(CagraError::BadMeta(format!(
                "sidecar id_map has {} entries but claims {} chunks",
                meta.id_map.len(),
                meta.chunk_count
            )));
        }
        // Verify the blob matches the checksum taken at save time before
        // handing it to the GPU deserializer.
        let actual_hash = blake3_of_path(path)?;
        if actual_hash != meta.blake3 {
            return Err(CagraError::ChecksumMismatch(path.display().to_string()));
        }
        let path_str = path.to_str().ok_or_else(|| {
            CagraError::Io(format!(
                "CAGRA load path is not valid UTF-8: {}",
                path.display()
            ))
        })?;
        let resources = cuvs::Resources::new().map_err(|e| CagraError::Cuvs(e.to_string()))?;
        let index = cuvs::cagra::Index::deserialize(&resources, path_str)
            .map_err(|e| CagraError::Cuvs(format!("cuvsCagraDeserialize failed: {}", e)))?;
        tracing::info!(
            n_vectors = meta.chunk_count,
            dim = meta.dim,
            splade_generation = meta.splade_generation,
            "CAGRA index loaded from disk"
        );
        Ok(Self {
            dim: meta.dim,
            gpu: Mutex::new(GpuState { resources, index }),
            id_map: meta.id_map,
            poisoned: AtomicBool::new(false),
        })
    }
    /// Best-effort removal of both the blob and its sidecar; missing files
    /// are ignored, so this is safe to call repeatedly.
    pub fn delete_persisted(path: &Path) {
        let _span =
            tracing::debug_span!("cagra_delete_persisted", path = %path.display()).entered();
        let _ = std::fs::remove_file(path);
        let _ = std::fs::remove_file(meta_path_for(path));
    }
}
#[cfg(feature = "gpu-index")]
/// Derives the sidecar metadata path for a CAGRA blob by appending ".meta"
/// to the full file name (e.g. `foo.cagra` -> `foo.cagra.meta`), preserving
/// any non-UTF-8 bytes in the original path.
fn meta_path_for(path: &Path) -> std::path::PathBuf {
    let mut raw = path.as_os_str().to_os_string();
    raw.push(".meta");
    raw.into()
}
#[cfg(feature = "gpu-index")]
/// Computes the BLAKE3 hash of the file at `path`, returned as a lowercase
/// hex string. Used to detect blob corruption or truncation on load.
fn blake3_of_path(path: &Path) -> Result<String, CagraError> {
    // Both failure modes share the same message shape; only the verb differs.
    let io_err = |verb: &str, e: std::io::Error| {
        CagraError::Io(format!(
            "Failed to {} {} for checksum: {}",
            verb,
            path.display(),
            e
        ))
    };
    let file = std::fs::File::open(path).map_err(|e| io_err("open", e))?;
    let mut hasher = blake3::Hasher::new();
    hasher.update_reader(file).map_err(|e| io_err("read", e))?;
    Ok(hasher.finalize().to_hex().to_string())
}
#[cfg(feature = "gpu-index")]
/// Writes the sidecar as JSON via a temp-file-then-rename so readers never
/// observe a partially written sidecar.
///
/// fsync of the temp file is best-effort (logged, non-fatal); the rename is
/// the atomicity point. On rename failure the temp file is cleaned up.
fn write_meta_atomic(path: &Path, meta: &CagraMeta) -> Result<(), CagraError> {
    let parent = path.parent().ok_or_else(|| {
        CagraError::Io(format!(
            "CAGRA sidecar has no parent dir: {}",
            path.display()
        ))
    })?;
    std::fs::create_dir_all(parent).map_err(|e| {
        CagraError::Io(format!(
            "Failed to create parent {} for sidecar: {}",
            parent.display(),
            e
        ))
    })?;
    // Hidden, per-call-unique temp name in the same directory so the final
    // rename stays on one filesystem (required for atomicity).
    let tmp = parent.join(format!(
        ".{}.{:016x}.tmp",
        path.file_name()
            .and_then(|n| n.to_str())
            .unwrap_or("cagra_meta"),
        crate::temp_suffix()
    ));
    {
        let file = std::fs::File::create(&tmp).map_err(|e| {
            CagraError::Io(format!(
                "Failed to create sidecar temp {}: {}",
                tmp.display(),
                e
            ))
        })?;
        let mut writer = std::io::BufWriter::new(file);
        serde_json::to_writer(&mut writer, meta)
            .map_err(|e| CagraError::Io(format!("Failed to serialize sidecar: {}", e)))?;
        use std::io::Write as _;
        // Flush explicitly: BufWriter's Drop swallows flush errors.
        writer
            .flush()
            .map_err(|e| CagraError::Io(format!("Failed to flush sidecar: {}", e)))?;
        if let Err(e) = writer.get_ref().sync_all() {
            tracing::debug!(error = %e, "fsync of CAGRA sidecar temp failed (non-fatal)");
        }
    }
    if let Err(e) = std::fs::rename(&tmp, path) {
        let _ = std::fs::remove_file(&tmp);
        return Err(CagraError::Io(format!(
            "Failed to rename sidecar {} -> {}: {}",
            tmp.display(),
            path.display(),
            e
        )));
    }
    Ok(())
}
#[cfg(all(test, feature = "gpu-index"))]
mod tests {
    //! GPU-dependent CAGRA tests. Each test takes `GPU_LOCK` to serialize
    //! GPU access across the test harness's threads, and skips itself
    //! (returns early) when no GPU is available.
    use super::*;
    use crate::index::VectorIndex;
    use crate::EMBEDDING_DIM;
    use std::sync::Mutex;
    // Serializes all GPU-touching tests; cargo runs tests concurrently.
    static GPU_LOCK: Mutex<()> = Mutex::new(());
    // Deterministic unit-norm embedding derived from `seed`; distinct seeds
    // produce distinct directions so self-match assertions are meaningful.
    fn make_embedding(seed: u32) -> Embedding {
        let mut v = vec![0.0f32; EMBEDDING_DIM];
        for (i, val) in v.iter_mut().enumerate() {
            *val = ((seed as f32 * 10.0) + (i as f32 * 0.001)).sin();
        }
        let norm: f32 = v.iter().map(|x| x * x).sum::<f32>().sqrt();
        if norm > 0.0 {
            v.iter_mut().for_each(|x| *x /= norm);
        }
        Embedding::new(v)
    }
    // Returns false (and logs) when the test should be skipped for lack of
    // a GPU.
    fn require_gpu() -> bool {
        if !CagraIndex::gpu_available() {
            eprintln!("Skipping CAGRA test: no GPU available");
            return false;
        }
        true
    }
    // Builds an index with ids "chunk_0".."chunk_{n-1}".
    fn build_test_index(n: u32) -> CagraIndex {
        let embeddings: Vec<(String, Embedding)> = (0..n)
            .map(|i| (format!("chunk_{}", i), make_embedding(i)))
            .collect();
        CagraIndex::build(embeddings, EMBEDDING_DIM).expect("Failed to build test index")
    }
    #[test]
    fn test_gpu_available() {
        // Smoke test: must not panic regardless of GPU presence.
        let _ = CagraIndex::gpu_available();
    }
    #[test]
    fn test_build_simple() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let index = build_test_index(5);
        assert_eq!(index.len(), 5);
        assert!(!index.is_empty());
    }
    #[test]
    fn test_build_empty() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let result = CagraIndex::build(vec![], EMBEDDING_DIM);
        assert!(result.is_err());
    }
    #[test]
    fn test_build_dimension_mismatch() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let bad_embedding = Embedding::new(vec![1.0; 100]);
        let result = CagraIndex::build(vec![("bad".into(), bad_embedding)], EMBEDDING_DIM);
        match result {
            Err(CagraError::Build(_)) => {}
            Err(e) => panic!("Expected Build error, got: {:?}", e),
            Ok(_) => panic!("Expected error, got Ok"),
        }
    }
    #[test]
    fn test_search_self_match() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let index = build_test_index(10);
        let query = make_embedding(3);
        let results = index.search(&query, 5);
        assert!(!results.is_empty(), "Search returned no results");
        assert_eq!(results[0].id, "chunk_3", "Top result should be chunk_3");
        assert!(
            results[0].score > 0.9,
            "Self-match score should be high, got {}",
            results[0].score
        );
    }
    #[test]
    fn test_search_k_limiting() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let index = build_test_index(10);
        let query = make_embedding(0);
        let results = index.search(&query, 3);
        assert!(results.len() <= 3);
    }
    #[test]
    fn test_search_ordering() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let index = build_test_index(10);
        let query = make_embedding(0);
        let results = index.search(&query, 5);
        // Scores must be non-increasing.
        for window in results.windows(2) {
            assert!(
                window[0].score >= window[1].score,
                "Results not sorted: {} < {}",
                window[0].score,
                window[1].score
            );
        }
    }
    #[test]
    fn test_search_dimension_mismatch_query() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let index = build_test_index(5);
        let bad_query = Embedding::new(vec![1.0; 100]);
        let results = index.search(&bad_query, 3);
        assert!(results.is_empty());
    }
    #[test]
    fn test_multiple_searches() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let index = build_test_index(10);
        let results1 = index.search(&make_embedding(0), 3);
        assert!(!results1.is_empty());
        let results2 = index.search(&make_embedding(5), 3);
        assert!(!results2.is_empty());
        assert_eq!(results2[0].id, "chunk_5");
    }
    #[test]
    fn test_consecutive_searches() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let index = build_test_index(20);
        for i in 0..10 {
            let query = make_embedding(i);
            let results = index.search(&query, 5);
            assert!(!results.is_empty(), "Search {} should return results", i);
            assert!(results.len() <= 5);
        }
    }
    #[test]
    fn test_search_with_invalid_k() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let index = build_test_index(5);
        let results = index.search(&make_embedding(0), 0);
        assert!(results.is_empty());
        let results = index.search(&make_embedding(1), 3);
        assert!(!results.is_empty());
    }
    #[test]
    fn test_search_k_greater_than_len_drops_phantoms() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        // With k > len, unfilled result slots keep the INVALID_DISTANCE
        // sentinel and must be dropped during decoding.
        let index = build_test_index(3);
        let results = index.search(&make_embedding(0), 10);
        assert!(
            results.len() <= 3,
            "expected at most 3 results, got {}: {:?}",
            results.len(),
            results
        );
        for r in &results {
            assert!(
                matches!(r.id.as_str(), "chunk_0" | "chunk_1" | "chunk_2"),
                "phantom id leaked past sentinel check: {}",
                r.id
            );
        }
        // No id may appear twice.
        let mut ids: Vec<&str> = results.iter().map(|r| r.id.as_str()).collect();
        ids.sort_unstable();
        let before = ids.len();
        ids.dedup();
        assert_eq!(before, ids.len(), "duplicate ids in results: {:?}", results);
    }
    #[test]
    fn test_name_returns_cagra() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let index = build_test_index(5);
        let vi: &dyn VectorIndex = &index;
        assert_eq!(vi.name(), "CAGRA");
    }
    #[test]
    fn test_oom_guard_arithmetic() {
        // Pure arithmetic check of the build_from_store OOM estimate; no GPU.
        let max_bytes = super::cagra_max_bytes();
        let max_chunks = max_bytes / (EMBEDDING_DIM * 4);
        let under = max_chunks.saturating_mul(EMBEDDING_DIM).saturating_mul(4);
        assert!(under <= max_bytes);
        let over = (max_chunks + 1)
            .saturating_mul(EMBEDDING_DIM)
            .saturating_mul(4);
        assert!(over > max_bytes);
    }
    #[test]
    fn test_save_load_round_trip() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("test.cagra");
        let original = build_test_index(32);
        original
            .save(&path)
            .expect("CAGRA persist save should succeed");
        assert!(path.exists(), "CAGRA blob should be written");
        let meta_path = super::meta_path_for(&path);
        assert!(meta_path.exists(), "CAGRA sidecar should be written");
        // Capture results before dropping the original so the round-trip can
        // be compared bit-for-bit.
        let queries: Vec<Embedding> = (0..5).map(make_embedding).collect();
        let original_results: Vec<Vec<IndexResult>> =
            queries.iter().map(|q| original.search(q, 5)).collect();
        drop(original);
        let loaded =
            CagraIndex::load(&path, EMBEDDING_DIM, 32).expect("CAGRA persist load should succeed");
        assert_eq!(loaded.len(), 32, "loaded index should have 32 vectors");
        assert_eq!(loaded.dim, EMBEDDING_DIM);
        for (i, query) in queries.iter().enumerate() {
            let got = loaded.search(query, 5);
            let expected = &original_results[i];
            assert_eq!(
                got.len(),
                expected.len(),
                "query {} returned different neighbour count",
                i
            );
            for (a, b) in got.iter().zip(expected.iter()) {
                assert_eq!(
                    a.id, b.id,
                    "query {} neighbour id differs after round-trip",
                    i
                );
                // Exact bit equality: a deserialized graph must score
                // identically.
                assert_eq!(
                    a.score.to_bits(),
                    b.score.to_bits(),
                    "query {} score {} != {} after round-trip",
                    i,
                    a.score,
                    b.score
                );
            }
        }
    }
    #[test]
    fn test_load_rejects_chunk_count_mismatch() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("test.cagra");
        build_test_index(10)
            .save(&path)
            .expect("save should succeed");
        match CagraIndex::load(&path, EMBEDDING_DIM, 20) {
            Err(CagraError::Stale { reason }) => {
                assert!(reason.contains("chunk_count"), "reason: {}", reason);
            }
            other => panic!("expected Stale, got {:?}", other),
        }
    }
    #[test]
    fn test_load_rejects_dim_mismatch() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("test.cagra");
        build_test_index(10)
            .save(&path)
            .expect("save should succeed");
        match CagraIndex::load(&path, EMBEDDING_DIM + 1, 10) {
            Err(CagraError::Stale { reason }) => {
                assert!(reason.contains("dim"), "reason: {}", reason);
            }
            other => panic!("expected Stale, got {:?}", other),
        }
    }
    #[test]
    fn test_load_rejects_corrupted_blob() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("test.cagra");
        build_test_index(10)
            .save(&path)
            .expect("save should succeed");
        // Flip one byte near the end of the blob; the BLAKE3 check must
        // catch it before cuVS ever sees the file.
        let mut bytes = std::fs::read(&path).unwrap();
        let pos = bytes.len().saturating_sub(16);
        bytes[pos] ^= 0xff;
        std::fs::write(&path, &bytes).unwrap();
        match CagraIndex::load(&path, EMBEDDING_DIM, 10) {
            Err(CagraError::ChecksumMismatch(p)) => {
                assert!(
                    p.contains("test.cagra"),
                    "checksum error should reference file: {}",
                    p
                );
            }
            other => panic!("expected ChecksumMismatch, got {:?}", other),
        }
    }
    #[test]
    fn test_load_requires_sidecar() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("test.cagra");
        build_test_index(5)
            .save(&path)
            .expect("save should succeed");
        std::fs::remove_file(super::meta_path_for(&path)).unwrap();
        match CagraIndex::load(&path, EMBEDDING_DIM, 5) {
            Err(CagraError::BadMeta(_)) => {}
            other => panic!("expected BadMeta, got {:?}", other),
        }
    }
    #[test]
    fn test_delete_persisted_removes_both_files() {
        let _guard = GPU_LOCK.lock().unwrap();
        if !require_gpu() {
            return;
        }
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("test.cagra");
        build_test_index(3)
            .save(&path)
            .expect("save should succeed");
        let meta = super::meta_path_for(&path);
        assert!(path.exists() && meta.exists());
        CagraIndex::delete_persisted(&path);
        assert!(!path.exists() && !meta.exists());
        // Second call must be a no-op, not a panic.
        CagraIndex::delete_persisted(&path);
    }
    #[test]
    fn test_meta_path_for() {
        let p = std::path::Path::new("/tmp/foo.cagra");
        let meta = super::meta_path_for(p);
        assert_eq!(meta.to_str().unwrap(), "/tmp/foo.cagra.meta");
    }
    #[test]
    fn test_persistence_env_override_blocks_save() {
        // NOTE(review): effectively a smoke test — cagra_persist_enabled()
        // caches its first result in a OnceLock, so mutating the env var here
        // could not change the outcome anyway; it only verifies the call does
        // not panic and the env var round-trips.
        let saved = std::env::var("CQS_CAGRA_PERSIST").ok();
        let enabled = super::cagra_persist_enabled();
        match saved {
            Some(v) => std::env::set_var("CQS_CAGRA_PERSIST", v),
            None => std::env::remove_var("CQS_CAGRA_PERSIST"),
        }
        let _: bool = enabled;
    }
}