use async_trait::async_trait;
use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
use opendal::{services, Operator};
use serde::{Deserialize, Serialize};
use crate::traits::{IndexStorage, IndexType, VectorStorage};
/// Per-namespace bookkeeping, persisted as `namespaces/<ns>/meta.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NamespaceMetadata {
    // Vector dimension for this namespace; `None` until the first upsert fixes it.
    dimension: Option<usize>,
    // Number of stored vectors; maintained incrementally on upsert/delete.
    vector_count: usize,
    // Unix timestamps in seconds (see `ObjectStorage::now`).
    created_at: u64,
    updated_at: u64,
}
/// On-disk JSON representation of a vector
/// (`namespaces/<ns>/vectors/<id>.json`). TTL fields of the domain
/// `Vector` are intentionally absent: they are not persisted.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredVector {
    id: String,
    values: Vec<f32>,
    // Arbitrary user metadata, stored verbatim as JSON.
    metadata: Option<serde_json::Value>,
}
impl From<Vector> for StoredVector {
fn from(v: Vector) -> Self {
Self {
id: v.id,
values: v.values,
metadata: v.metadata,
}
}
}
impl From<StoredVector> for Vector {
fn from(v: StoredVector) -> Self {
Self {
id: v.id,
values: v.values,
metadata: v.metadata,
ttl_seconds: None,
expires_at: None,
}
}
}
/// Backend selection for `ObjectStorage`. Defaults to the in-memory backend.
///
/// Optional fields that are `None` are simply not set on the underlying
/// OpenDAL builder (see `ObjectStorage::build_operator`), leaving the
/// backend's own defaults/credential resolution in effect.
#[derive(Debug, Clone, Default)]
pub enum ObjectStorageConfig {
    /// Volatile in-memory store (tests, local development).
    #[default]
    Memory,
    /// Local filesystem rooted at `root`.
    Filesystem { root: String },
    /// Amazon S3 or an S3-compatible service (via `endpoint`).
    S3 {
        bucket: String,
        region: Option<String>,
        endpoint: Option<String>,
        access_key_id: Option<String>,
        secret_access_key: Option<String>,
    },
    /// Azure Blob Storage; credentials via `account_key` and/or `sas_token`.
    Azure {
        container: String,
        account_name: String,
        account_key: Option<String>,
        sas_token: Option<String>,
        endpoint: Option<String>,
    },
    /// Google Cloud Storage.
    Gcs {
        bucket: String,
        credential_path: Option<String>,
        endpoint: Option<String>,
    },
}
/// Vector and index storage backed by an OpenDAL [`Operator`].
///
/// Object layout:
/// - `namespaces/<ns>/meta.json`          — namespace metadata
/// - `namespaces/<ns>/vectors/<id>.json`  — one object per vector
/// - `namespaces/<ns>/indexes/<type>.bin` — serialized index blobs
#[derive(Clone)]
pub struct ObjectStorage {
    operator: Operator,
}
impl ObjectStorage {
    /// Creates a storage instance from the given backend configuration.
    pub fn new(config: ObjectStorageConfig) -> Result<Self> {
        let operator = Self::build_operator(&config)?;
        Ok(Self { operator })
    }

    /// Convenience constructor: volatile in-memory backend.
    pub fn memory() -> Result<Self> {
        Self::new(ObjectStorageConfig::Memory)
    }

    /// Convenience constructor: local-filesystem backend rooted at `root`.
    pub fn filesystem(root: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Filesystem { root: root.into() })
    }

    /// Convenience constructor: S3 backend with all optional settings unset
    /// (left to OpenDAL / environment defaults).
    pub fn s3(bucket: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::S3 {
            bucket: bucket.into(),
            region: None,
            endpoint: None,
            access_key_id: None,
            secret_access_key: None,
        })
    }

    /// Convenience constructor: Azure Blob backend with no credentials set.
    pub fn azure(container: impl Into<String>, account_name: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Azure {
            container: container.into(),
            account_name: account_name.into(),
            account_key: None,
            sas_token: None,
            endpoint: None,
        })
    }

    /// Convenience constructor: GCS backend with no credentials set.
    pub fn gcs(bucket: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Gcs {
            bucket: bucket.into(),
            credential_path: None,
            endpoint: None,
        })
    }

    /// Translates a config into a finished OpenDAL [`Operator`].
    ///
    /// Optional settings are applied only when present; any OpenDAL error is
    /// wrapped as [`DakeraError::Storage`].
    pub fn build_operator(config: &ObjectStorageConfig) -> Result<Operator> {
        match config {
            ObjectStorageConfig::Memory => {
                let builder = services::Memory::default();
                Operator::new(builder)
                    .map(|op| op.finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Filesystem { root } => {
                let builder = services::Fs::default().root(root);
                Operator::new(builder)
                    .map(|op| op.finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::S3 {
                bucket,
                region,
                endpoint,
                access_key_id,
                secret_access_key,
            } => {
                let mut builder = services::S3::default().bucket(bucket);
                if let Some(region) = region {
                    builder = builder.region(region);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }
                if let Some(key) = access_key_id {
                    builder = builder.access_key_id(key);
                }
                if let Some(secret) = secret_access_key {
                    builder = builder.secret_access_key(secret);
                }
                Operator::new(builder)
                    .map(|op| op.finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Azure {
                container,
                account_name,
                account_key,
                sas_token,
                endpoint,
            } => {
                let mut builder = services::Azblob::default()
                    .container(container)
                    .account_name(account_name);
                if let Some(key) = account_key {
                    builder = builder.account_key(key);
                }
                if let Some(token) = sas_token {
                    builder = builder.sas_token(token);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }
                Operator::new(builder)
                    .map(|op| op.finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Gcs {
                bucket,
                credential_path,
                endpoint,
            } => {
                let mut builder = services::Gcs::default().bucket(bucket);
                if let Some(cred_path) = credential_path {
                    builder = builder.credential_path(cred_path);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }
                Operator::new(builder)
                    .map(|op| op.finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
        }
    }

    /// Object path for a single vector: `namespaces/<ns>/vectors/<id>.json`.
    // NOTE(review): namespace/vector ids are interpolated unescaped — ids
    // containing `/` or `..` would change the path shape; confirm callers
    // validate ids upstream.
    fn vector_path(namespace: &str, vector_id: &str) -> String {
        format!("namespaces/{}/vectors/{}.json", namespace, vector_id)
    }

    /// Object path for the namespace metadata file.
    fn namespace_meta_path(namespace: &str) -> String {
        format!("namespaces/{}/meta.json", namespace)
    }

    /// Listing prefix under which all of a namespace's vectors live.
    fn namespace_vectors_prefix(namespace: &str) -> String {
        format!("namespaces/{}/vectors/", namespace)
    }

    /// Reads and deserializes the namespace metadata.
    ///
    /// Returns `Ok(None)` when the file is missing, empty, or corrupted
    /// (the latter two are logged and treated as recoverable so the
    /// metadata can be recreated on the next write); other storage errors
    /// are propagated.
    async fn read_namespace_meta(&self, namespace: &str) -> Result<Option<NamespaceMetadata>> {
        let path = Self::namespace_meta_path(namespace);
        match self.operator.read(&path).await {
            Ok(data) => {
                let bytes = data.to_vec();
                if bytes.is_empty() {
                    tracing::warn!(
                        namespace = %namespace,
                        path = %path,
                        "Empty namespace metadata file detected, treating as missing"
                    );
                    return Ok(None);
                }
                match serde_json::from_slice(&bytes) {
                    Ok(meta) => Ok(Some(meta)),
                    Err(e) => {
                        tracing::warn!(
                            namespace = %namespace,
                            path = %path,
                            error = %e,
                            bytes_len = bytes.len(),
                            "Corrupted namespace metadata, treating as missing and will be recreated"
                        );
                        Ok(None)
                    }
                }
            }
            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(DakeraError::Storage(e.to_string())),
        }
    }

    /// Serializes and writes the namespace metadata, overwriting any
    /// existing file.
    async fn write_namespace_meta(&self, namespace: &str, meta: &NamespaceMetadata) -> Result<()> {
        let path = Self::namespace_meta_path(namespace);
        let data = serde_json::to_vec(meta).map_err(|e| DakeraError::Storage(e.to_string()))?;
        self.operator
            .write(&path, data)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;
        Ok(())
    }

    /// Current Unix time in seconds; falls back to 0 if the system clock is
    /// before the epoch.
    fn now() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }
}
#[async_trait]
impl VectorStorage for ObjectStorage {
    /// Writes each vector as an individual JSON object and updates the
    /// namespace metadata (dimension, count, timestamps).
    ///
    /// Returns the number of vectors written. Every vector in the batch must
    /// match the namespace dimension; for a fresh namespace the dimension is
    /// fixed by the first vector of the batch and the rest of the batch is
    /// validated against it.
    async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
        if vectors.is_empty() {
            return Ok(0);
        }
        let mut meta = self
            .read_namespace_meta(namespace)
            .await?
            .unwrap_or_else(|| NamespaceMetadata {
                dimension: None,
                vector_count: 0,
                created_at: Self::now(),
                updated_at: Self::now(),
            });
        // Validate the whole batch against the namespace dimension. BUG FIX:
        // the previous code skipped validation for a fresh namespace
        // (dimension == None), so a first batch with mixed dimensions was
        // written unchecked.
        let dim = meta.dimension.unwrap_or_else(|| vectors[0].values.len());
        for v in &vectors {
            if v.values.len() != dim {
                return Err(DakeraError::DimensionMismatch {
                    expected: dim,
                    actual: v.values.len(),
                });
            }
        }
        meta.dimension = Some(dim);
        let mut upserted = 0;
        for vector in vectors {
            let path = Self::vector_path(namespace, &vector.id);
            let stored: StoredVector = vector.into();
            let data =
                serde_json::to_vec(&stored).map_err(|e| DakeraError::Storage(e.to_string()))?;
            // Existence check keeps `vector_count` accurate on overwrite.
            let exists = self
                .operator
                .exists(&path)
                .await
                .map_err(|e| DakeraError::Storage(e.to_string()))?;
            self.operator
                .write(&path, data)
                .await
                .map_err(|e| DakeraError::Storage(e.to_string()))?;
            if !exists {
                meta.vector_count += 1;
            }
            upserted += 1;
        }
        meta.updated_at = Self::now();
        self.write_namespace_meta(namespace, &meta).await?;
        tracing::debug!(
            namespace = namespace,
            upserted = upserted,
            "Upserted vectors to object storage"
        );
        Ok(upserted)
    }

    /// Fetches vectors by id. Missing ids are skipped silently; empty or
    /// corrupted files are skipped with a warning; expired vectors are
    /// filtered out at read time.
    async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
        let mut vectors = Vec::with_capacity(ids.len());
        // Use the shared clock helper instead of re-deriving the epoch inline.
        let now = Self::now();
        for id in ids {
            let path = Self::vector_path(namespace, id);
            match self.operator.read(&path).await {
                Ok(data) => {
                    let bytes = data.to_vec();
                    if bytes.is_empty() {
                        tracing::warn!(
                            namespace = %namespace,
                            vector_id = %id,
                            "Empty vector file detected, skipping"
                        );
                        continue;
                    }
                    match serde_json::from_slice::<StoredVector>(&bytes) {
                        Ok(stored) => {
                            let vector: Vector = stored.into();
                            if !vector.is_expired_at(now) {
                                vectors.push(vector);
                            }
                        }
                        Err(e) => {
                            tracing::warn!(
                                namespace = %namespace,
                                vector_id = %id,
                                error = %e,
                                bytes_len = bytes.len(),
                                "Corrupted vector file detected, skipping"
                            );
                        }
                    }
                }
                // A missing id is not an error; it is simply absent from the result.
                Err(e) if e.kind() == opendal::ErrorKind::NotFound => {}
                Err(e) => return Err(DakeraError::Storage(e.to_string())),
            }
        }
        Ok(vectors)
    }

    /// Lists and loads every non-expired vector in the namespace.
    async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
        let prefix = Self::namespace_vectors_prefix(namespace);
        let entries = self
            .operator
            .list(&prefix)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;
        let now = Self::now();
        let mut vectors = Vec::new();
        for entry in entries {
            let path = entry.path();
            if path.ends_with(".json") {
                match self.operator.read(path).await {
                    Ok(data) => {
                        let bytes = data.to_vec();
                        match serde_json::from_slice::<StoredVector>(&bytes) {
                            Ok(stored) => {
                                let vector: Vector = stored.into();
                                if !vector.is_expired_at(now) {
                                    vectors.push(vector);
                                }
                            }
                            Err(e) => {
                                // Consistent with `get`: skip corrupted files,
                                // but leave a trace instead of dropping silently.
                                tracing::warn!(
                                    path = path,
                                    error = %e,
                                    bytes_len = bytes.len(),
                                    "Corrupted vector file detected, skipping"
                                );
                            }
                        }
                    }
                    Err(e) => {
                        tracing::warn!(path = path, error = %e, "Failed to read vector");
                    }
                }
            }
        }
        Ok(vectors)
    }

    /// Deletes vectors by id and decrements the namespace count accordingly.
    /// Returns how many objects were actually removed.
    async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
        let mut deleted = 0;
        for id in ids {
            let path = Self::vector_path(namespace, id);
            let exists = self
                .operator
                .exists(&path)
                .await
                .map_err(|e| DakeraError::Storage(e.to_string()))?;
            if exists {
                match self.operator.delete(&path).await {
                    Ok(_) => deleted += 1,
                    // Raced with another deleter: the object is gone either way.
                    Err(e) if e.kind() == opendal::ErrorKind::NotFound => {}
                    Err(e) => return Err(DakeraError::Storage(e.to_string())),
                }
            }
        }
        if deleted > 0 {
            if let Some(mut meta) = self.read_namespace_meta(namespace).await? {
                // saturating_sub guards against a count that drifted low.
                meta.vector_count = meta.vector_count.saturating_sub(deleted);
                meta.updated_at = Self::now();
                self.write_namespace_meta(namespace, &meta).await?;
            }
        }
        Ok(deleted)
    }

    /// A namespace exists iff its metadata file is present and readable.
    async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
        Ok(self.read_namespace_meta(namespace).await?.is_some())
    }

    /// Creates empty namespace metadata if the namespace does not yet exist.
    async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
        if self.read_namespace_meta(namespace).await?.is_none() {
            let meta = NamespaceMetadata {
                dimension: None,
                vector_count: 0,
                created_at: Self::now(),
                updated_at: Self::now(),
            };
            self.write_namespace_meta(namespace, &meta).await?;
        }
        Ok(())
    }

    /// Returns the namespace dimension, or `None` if the namespace is
    /// missing or has not received any vectors yet.
    async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
        Ok(self
            .read_namespace_meta(namespace)
            .await?
            .and_then(|m| m.dimension))
    }

    /// Returns the tracked vector count (0 for a missing namespace).
    async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
        Ok(self
            .read_namespace_meta(namespace)
            .await?
            .map(|m| m.vector_count)
            .unwrap_or(0))
    }

    /// Lists namespaces by scanning `namespaces/`; only entries with valid
    /// metadata are reported, which filters out partially deleted residue.
    async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
        let entries = self
            .operator
            .list("namespaces/")
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;
        let mut namespaces = Vec::new();
        for entry in entries {
            let path = entry.path();
            if let Some(ns) = path.strip_prefix("namespaces/") {
                let ns = ns.trim_end_matches('/');
                // Only direct children count as namespaces, not nested objects.
                if !ns.is_empty() && !ns.contains('/') {
                    if self.read_namespace_meta(ns).await?.is_some() {
                        namespaces.push(ns.to_string());
                    }
                }
            }
        }
        Ok(namespaces)
    }

    /// Removes the namespace and everything under it (vectors, metadata,
    /// indexes). Returns `false` when the namespace did not exist.
    async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
        if !self.namespace_exists(namespace).await? {
            return Ok(false);
        }
        let prefix = format!("namespaces/{}/", namespace);
        self.operator
            .remove_all(&prefix)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;
        tracing::debug!(
            namespace = namespace,
            "Deleted namespace from object storage"
        );
        Ok(true)
    }

    /// No-op: expired vectors are filtered lazily in `get`/`get_all`;
    /// this backend performs no proactive cleanup.
    async fn cleanup_expired(&self, _namespace: &NamespaceId) -> Result<usize> {
        Ok(0)
    }

    /// No-op; see [`Self::cleanup_expired`].
    async fn cleanup_all_expired(&self) -> Result<usize> {
        Ok(0)
    }
}
/// Object path for a serialized index: `namespaces/<ns>/indexes/<name>.bin`.
///
/// Extracted so the four `IndexStorage` methods share one definition of the
/// layout instead of repeating the `format!` string.
fn index_path(namespace: &str, index_name: impl std::fmt::Display) -> String {
    format!("namespaces/{}/indexes/{}.bin", namespace, index_name)
}

#[async_trait]
impl IndexStorage for ObjectStorage {
    /// Persists a serialized index blob, overwriting any previous version.
    async fn save_index(
        &self,
        namespace: &NamespaceId,
        index_type: IndexType,
        data: Vec<u8>,
    ) -> Result<()> {
        let path = index_path(namespace, index_type.as_str());
        self.operator
            .write(&path, data)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;
        tracing::debug!(
            namespace = namespace,
            index_type = index_type.as_str(),
            "Saved index to object storage"
        );
        Ok(())
    }

    /// Loads a serialized index blob; `Ok(None)` when it does not exist.
    async fn load_index(
        &self,
        namespace: &NamespaceId,
        index_type: IndexType,
    ) -> Result<Option<Vec<u8>>> {
        let path = index_path(namespace, index_type.as_str());
        match self.operator.read(&path).await {
            Ok(data) => {
                tracing::debug!(
                    namespace = namespace,
                    index_type = index_type.as_str(),
                    size = data.len(),
                    "Loaded index from object storage"
                );
                Ok(Some(data.to_vec()))
            }
            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(DakeraError::Storage(e.to_string())),
        }
    }

    /// Deletes an index blob. Returns whether it existed beforehand.
    async fn delete_index(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
        let path = index_path(namespace, index_type.as_str());
        let exists = self
            .operator
            .exists(&path)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;
        if exists {
            self.operator
                .delete(&path)
                .await
                .map_err(|e| DakeraError::Storage(e.to_string()))?;
            tracing::debug!(
                namespace = namespace,
                index_type = index_type.as_str(),
                "Deleted index from object storage"
            );
        }
        Ok(exists)
    }

    /// Checks whether an index blob exists for the given type.
    async fn index_exists(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
        let path = index_path(namespace, index_type.as_str());
        self.operator
            .exists(&path)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))
    }

    /// Lists the index types present for a namespace by scanning
    /// `namespaces/<ns>/indexes/`. Unrecognized `.bin` file names are ignored.
    async fn list_indexes(&self, namespace: &NamespaceId) -> Result<Vec<IndexType>> {
        let prefix = format!("namespaces/{}/indexes/", namespace);
        let entries = self
            .operator
            .list(&prefix)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;
        let mut indexes = Vec::new();
        for entry in entries {
            let path = entry.path();
            if path.ends_with(".bin") {
                if let Some(filename) = path.strip_prefix(&prefix) {
                    let name = filename.trim_end_matches(".bin");
                    // Reverse of `IndexType::as_str`; keep in sync with it.
                    match name {
                        "hnsw" => indexes.push(IndexType::Hnsw),
                        "pq" => indexes.push(IndexType::Pq),
                        "ivf" => indexes.push(IndexType::Ivf),
                        "spfresh" => indexes.push(IndexType::SpFresh),
                        "fulltext" => indexes.push(IndexType::FullText),
                        _ => {}
                    }
                }
            }
        }
        Ok(indexes)
    }
}
/// Builds a standalone OpenDAL [`Operator`] from a storage configuration
/// without wrapping it in an [`ObjectStorage`].
pub fn create_operator(config: &ObjectStorageConfig) -> Result<Operator> {
    ObjectStorage::build_operator(config)
}
#[cfg(test)]
mod tests {
    use super::*;

    // End-to-end smoke test of the in-memory backend:
    // ensure/exists, upsert, get, get_all, count, delete.
    #[tokio::test]
    async fn test_object_storage_memory() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();
        assert!(storage.namespace_exists(&namespace).await.unwrap());
        let vectors = vec![
            Vector {
                id: "v1".to_string(),
                values: vec![1.0, 2.0, 3.0],
                metadata: None,
                ttl_seconds: None,
                expires_at: None,
            },
            Vector {
                id: "v2".to_string(),
                values: vec![4.0, 5.0, 6.0],
                metadata: Some(serde_json::json!({"key": "value"})),
                ttl_seconds: None,
                expires_at: None,
            },
        ];
        let count = storage.upsert(&namespace, vectors).await.unwrap();
        assert_eq!(count, 2);
        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "v1");
        assert_eq!(results[0].values, vec![1.0, 2.0, 3.0]);
        let all = storage.get_all(&namespace).await.unwrap();
        assert_eq!(all.len(), 2);
        assert_eq!(storage.count(&namespace).await.unwrap(), 2);
        let deleted = storage
            .delete(&namespace, &["v1".to_string()])
            .await
            .unwrap();
        assert_eq!(deleted, 1);
        // Deleted vector must no longer be retrievable and count must drop.
        assert!(storage
            .get(&namespace, &["v1".to_string()])
            .await
            .unwrap()
            .is_empty());
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    // Upserting a vector whose dimension differs from the namespace's
    // established dimension must be rejected.
    #[tokio::test]
    async fn test_object_storage_dimension_mismatch() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();
        let v1 = vec![Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0, 3.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1).await.unwrap();
        let v2 = vec![Vector {
            id: "v2".to_string(),
            values: vec![1.0, 2.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        let result = storage.upsert(&namespace, v2).await;
        assert!(result.is_err());
    }

    // Re-upserting the same id overwrites the values and does not
    // inflate the namespace count.
    #[tokio::test]
    async fn test_object_storage_upsert() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();
        let v1 = vec![Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1).await.unwrap();
        let v1_updated = vec![Vector {
            id: "v1".to_string(),
            values: vec![3.0, 4.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1_updated).await.unwrap();
        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].values, vec![3.0, 4.0]);
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    // Round-trips index blobs through save/exists/load/list/delete and
    // checks deleting a missing index reports `false`.
    #[tokio::test]
    async fn test_index_storage() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test_index".to_string();
        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());
        let index_data = b"fake hnsw index data for testing".to_vec();
        storage
            .save_index(&namespace, IndexType::Hnsw, index_data.clone())
            .await
            .unwrap();
        assert!(storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());
        assert!(!storage
            .index_exists(&namespace, IndexType::Pq)
            .await
            .unwrap());
        let loaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(loaded.is_some());
        assert_eq!(loaded.unwrap(), index_data);
        let pq_data = b"fake pq index data".to_vec();
        storage
            .save_index(&namespace, IndexType::Pq, pq_data)
            .await
            .unwrap();
        let indexes = storage.list_indexes(&namespace).await.unwrap();
        assert_eq!(indexes.len(), 2);
        assert!(indexes.contains(&IndexType::Hnsw));
        assert!(indexes.contains(&IndexType::Pq));
        let deleted = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(deleted);
        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());
        // Second delete of the same index is a no-op and reports false.
        let deleted = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(!deleted);
        let loaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(loaded.is_none());
    }
}