#![cfg(feature = "native-v3")]
use crate::backend::native::v3::{KvValue, V3Backend};
use crate::hnsw::errors::{HnswError, HnswStorageError};
use crate::hnsw::storage::{VectorBatch, VectorRecord, VectorStorage};
use crate::snapshot::SnapshotId;
use serde::{Deserialize, Serialize};
use serde_json::Value;
/// Serialized form of a vector record, converted with `serde_json::to_value` and
/// written to the V3 KV store as a `KvValue::Json` value.
///
/// Field order is kept stable so the serialized layout does not change.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct StoredVectorRecord {
    /// Vector identifier (the same id is embedded in the KV key).
    id: u64,
    /// Dimension reported by the source `VectorRecord`.
    dimension: usize,
    /// Raw vector components.
    data: Vec<f32>,
    /// Optional user metadata attached to the vector.
    metadata: Option<Value>,
    /// Creation timestamp copied from the source record (units not defined here).
    created_at: u64,
    /// Last-update timestamp copied from the source record (units not defined here).
    updated_at: u64,
}
impl From<&VectorRecord> for StoredVectorRecord {
fn from(record: &VectorRecord) -> Self {
Self {
id: record.id(),
dimension: record.dimension(),
data: record.data().to_vec(),
metadata: record.metadata().cloned(),
created_at: record.created_at(),
updated_at: record.updated_at(),
}
}
}
/// `VectorStorage` implementation that keeps each vector as a JSON value in a
/// [`V3Backend`]'s KV store, under keys of the form `hnsw:<index>:vector:<id>`.
///
/// The backend is held through a raw pointer, so the borrow checker does NOT
/// enforce that the backend outlives this handle; callers must guarantee it.
pub struct V3VectorStorageHandle {
    /// Backend captured in `new`; only dereferenced through `backend()`.
    backend_ptr: *const V3Backend,
    /// Index name embedded in every KV key written by this handle.
    index_name: String,
    /// Next id handed out by auto-id allocation (starts at 1).
    next_id: std::sync::atomic::AtomicU64,
    /// Number of vectors stored through this handle; in-memory only, starts at 0.
    count: std::sync::atomic::AtomicUsize,
}
// SAFETY: NOTE(review) — `backend_ptr` (a raw pointer) is the only field that is
// not automatically `Send`/`Sync`; `String` and the atomics already are. Moving or
// sharing this handle across threads is sound only if `V3Backend` is itself `Sync`
// and outlives every handle. Neither property is checked here — confirm.
unsafe impl Send for V3VectorStorageHandle {}
unsafe impl Sync for V3VectorStorageHandle {}
impl V3VectorStorageHandle {
    /// Builds a handle over `backend` for the index called `index_name`.
    ///
    /// Only a raw pointer to `backend` is kept; the borrow is not tracked, so the
    /// backend must stay alive (and not move) for as long as the handle is used.
    /// The auto-id counter starts at 1 and the vector count at 0.
    fn new(backend: &V3Backend, index_name: impl Into<String>) -> Self {
        let backend_ptr: *const V3Backend = backend;
        let index_name = index_name.into();
        Self {
            backend_ptr,
            index_name,
            next_id: std::sync::atomic::AtomicU64::new(1),
            count: std::sync::atomic::AtomicUsize::new(0),
        }
    }

    /// Returns the backend this handle was created from.
    ///
    /// # Safety
    /// The `V3Backend` passed to [`V3VectorStorageHandle::new`] must still be alive
    /// and at the same address for the whole lifetime of the returned reference.
    unsafe fn backend(&self) -> &V3Backend {
        // SAFETY: guaranteed by the caller per this function's contract.
        unsafe { &*self.backend_ptr }
    }

    /// Builds the KV key `hnsw:<index_name>:vector:<id>` for vector `id`.
    fn vector_key(&self, id: u64) -> Vec<u8> {
        let key = format!("hnsw:{}:vector:{id}", self.index_name);
        key.into_bytes()
    }

    /// Atomically reserves and returns the next auto-generated id.
    fn next_id(&self) -> u64 {
        let reserved = std::sync::atomic::AtomicU64::fetch_add(
            &self.next_id,
            1,
            std::sync::atomic::Ordering::SeqCst,
        );
        reserved
    }
}
impl VectorStorage for V3VectorStorageHandle {
    /// Validates `vector`, assigns it the next auto-generated id and writes it to
    /// the KV store as JSON.
    ///
    /// # Errors
    /// Propagates the `VectorRecord::validate` error, or returns
    /// `HnswStorageError::IoError` if the record cannot be serialized.
    fn store_vector(&mut self, vector: &[f32], metadata: Option<Value>) -> Result<u64, HnswError> {
        let id = self.next_id();
        let record = VectorRecord::new(id, vector.to_vec(), metadata);
        record.validate()?;
        self.persist_stored_record(id, &StoredVectorRecord::from(&record))?;
        self.count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        Ok(id)
    }

    /// Writes a vector under a caller-chosen `id`.
    ///
    /// Afterwards the auto-id counter is strictly greater than `id` (saturating at
    /// `u64::MAX`), so later `store_vector` calls do not reuse it. The vector count
    /// is only incremented when no record was stored under `id` beforehand.
    ///
    /// # Errors
    /// Same as `store_vector`. On error nothing is written and the auto-id counter
    /// is left unchanged.
    fn store_vector_with_id(
        &mut self,
        id: u64,
        vector: Vec<f32>,
        metadata: Option<Value>,
    ) -> Result<(), HnswError> {
        let record = VectorRecord::new(id, vector, metadata);
        record.validate()?;
        let already_stored = self.has_stored_record(id);
        self.persist_stored_record(id, &StoredVectorRecord::from(&record))?;
        // `fetch_max` raises the counter in one atomic step; `saturating_add` avoids
        // the overflow that `id + 1` hit at `u64::MAX`.
        self.next_id
            .fetch_max(id.saturating_add(1), std::sync::atomic::Ordering::SeqCst);
        if !already_stored {
            self.count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        }
        Ok(())
    }

    /// Returns the vector data stored under `id`, or `None` if no record exists.
    ///
    /// # Errors
    /// `HnswStorageError::IoError` if the stored value is not JSON or cannot be
    /// decoded as a `StoredVectorRecord`.
    fn get_vector(&self, id: u64) -> Result<Option<Vec<f32>>, HnswError> {
        Ok(self.load_stored_record(id)?.map(|stored| stored.data))
    }

    /// Returns the vector data and metadata stored under `id`, or `None` if absent.
    ///
    /// Missing metadata is reported as `Value::Null`.
    ///
    /// # Errors
    /// Same as `get_vector`.
    fn get_vector_with_metadata(&self, id: u64) -> Result<Option<(Vec<f32>, Value)>, HnswError> {
        Ok(self
            .load_stored_record(id)?
            .map(|stored| (stored.data, stored.metadata.unwrap_or(Value::Null))))
    }

    /// Stores every record of `batch` under freshly allocated ids, in order.
    ///
    /// Each record's own id is ignored and replaced by a newly allocated one.
    /// All records are validated before anything is written. An invalid record
    /// therefore no longer leaves earlier records persisted under ids the caller
    /// never receives.
    ///
    /// # Errors
    /// The first validation error, or a serialization error.
    fn store_batch(&mut self, batch: VectorBatch) -> Result<Vec<u64>, HnswError> {
        for record in &batch.vectors {
            record.validate()?;
        }
        let mut ids = Vec::with_capacity(batch.len());
        for record in batch.vectors {
            let id = self.next_id();
            let stored = StoredVectorRecord {
                id,
                ..StoredVectorRecord::from(&record)
            };
            self.persist_stored_record(id, &stored)?;
            ids.push(id);
            self.count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        }
        Ok(ids)
    }

    /// Deletes the record stored under `id`.
    ///
    /// The vector count is decremented only if a record was actually present. It
    /// never drops below zero.
    fn delete_vector(&mut self, id: u64) -> Result<(), HnswError> {
        let existed = self.has_stored_record(id);
        let key = self.vector_key(id);
        // SAFETY: `backend_ptr` came from a live `&V3Backend` in `new`; the caller of
        // `create_hnsw_storage` must keep that backend alive while this handle exists.
        unsafe {
            self.backend().kv_delete_v3(&key);
        }
        if existed {
            // `checked_sub` makes this a no-op at zero instead of wrapping.
            let _ = self.count.fetch_update(
                std::sync::atomic::Ordering::SeqCst,
                std::sync::atomic::Ordering::SeqCst,
                |current| current.checked_sub(1),
            );
        }
        Ok(())
    }

    /// Number of vectors stored through this handle since it was created.
    ///
    /// This is an in-memory counter; records written by earlier handles are not
    /// included.
    fn vector_count(&self) -> Result<usize, HnswError> {
        Ok(self.count.load(std::sync::atomic::Ordering::SeqCst))
    }

    /// NOTE(review): stub — always returns an empty list because this handle has no
    /// way to enumerate the `hnsw:<index>:vector:` keys. Callers must not rely on it.
    fn list_vectors(&self) -> Result<Vec<u64>, HnswError> {
        Ok(Vec::new())
    }

    /// Resets the auto-id counter to 1 and the vector count to 0.
    ///
    /// NOTE(review): the stored KV records are NOT deleted, so `get_vector` can still
    /// return them. Later `store_vector` calls will overwrite them starting at id 1.
    /// Fixing this needs a key-prefix scan or delete API on the backend.
    fn clear_vectors(&mut self) -> Result<(), HnswError> {
        self.next_id.store(1, std::sync::atomic::Ordering::SeqCst);
        self.count.store(0, std::sync::atomic::Ordering::SeqCst);
        Ok(())
    }

    /// Reports the in-memory vector count under the storage type `"V3KV"`.
    fn get_statistics(&self) -> Result<crate::hnsw::storage::VectorStorageStats, HnswError> {
        let count = self.count.load(std::sync::atomic::Ordering::SeqCst);
        Ok(crate::hnsw::storage::VectorStorageStats::new(
            count,
            0,
            "V3KV".to_string(),
        ))
    }
}

impl V3VectorStorageHandle {
    /// Serializes `stored` to JSON and writes it under the KV key for `id`.
    fn persist_stored_record(&self, id: u64, stored: &StoredVectorRecord) -> Result<(), HnswError> {
        let json_value = serde_json::to_value(stored).map_err(|e| {
            HnswError::Storage(HnswStorageError::IoError(format!(
                "Serialization error: {}",
                e
            )))
        })?;
        let key = self.vector_key(id);
        // SAFETY: `backend_ptr` came from a live `&V3Backend` in `new`; the caller of
        // `create_hnsw_storage` must keep that backend alive while this handle exists.
        unsafe {
            self.backend()
                .kv_set_v3(key, KvValue::Json(json_value), None);
        }
        Ok(())
    }

    /// Reads and decodes the record stored under `id` at the current snapshot.
    fn load_stored_record(&self, id: u64) -> Result<Option<StoredVectorRecord>, HnswError> {
        let key = self.vector_key(id);
        let snapshot_id = SnapshotId::current();
        // SAFETY: see `persist_stored_record`.
        let value = unsafe { self.backend().kv_get_v3(snapshot_id, &key) };
        match value {
            Some(KvValue::Json(json)) => serde_json::from_value(json).map(Some).map_err(|e| {
                HnswError::Storage(HnswStorageError::IoError(format!(
                    "Deserialization error: {}",
                    e
                )))
            }),
            Some(_) => Err(HnswError::Storage(HnswStorageError::IoError(
                "Unexpected KV value type".to_string(),
            ))),
            None => Ok(None),
        }
    }

    /// Returns whether any value is stored under the key for `id` at the current
    /// snapshot, regardless of its type.
    fn has_stored_record(&self, id: u64) -> bool {
        let key = self.vector_key(id);
        let snapshot_id = SnapshotId::current();
        // SAFETY: see `persist_stored_record`.
        unsafe { self.backend().kv_get_v3(snapshot_id, &key) }.is_some()
    }
}
impl V3Backend {
    /// Creates KV-backed vector storage for the HNSW index named `index_name`.
    ///
    /// Always returns `Some`.
    ///
    /// NOTE(review): the handle keeps a raw pointer to `self`, and the returned box
    /// is `'static`. Nothing stops it from outliving this backend, so callers must
    /// drop the storage before the backend. Confirm at call sites.
    pub fn create_hnsw_storage(
        &self,
        index_name: impl Into<String>,
    ) -> Option<Box<dyn VectorStorage>> {
        let handle = V3VectorStorageHandle::new(self, index_name);
        let storage: Box<dyn VectorStorage> = Box::new(handle);
        Some(storage)
    }
}