use std::path::Path;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use sochdb_storage::database::{Database, DatabaseConfig};
use crate::config::EngineConfig;
use crate::error::{Error, Result};
use crate::types::*;
pub struct Catalog {
db: Arc<Database>,
}
impl Catalog {
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
let config = DatabaseConfig {
group_commit: true,
..Default::default()
};
let db =
Database::open_with_config(path, config).map_err(|e| Error::Storage(e.to_string()))?;
Ok(Self { db })
}
pub fn open_memory() -> Result<Self> {
let temp_dir = tempfile::tempdir().map_err(|e| Error::Storage(e.to_string()))?;
Self::open(temp_dir.path())
}
fn now_secs() -> i64 {
use std::time::{SystemTime, UNIX_EPOCH};
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as i64
}
pub fn create_collection(&self, name: &str, config: &EngineConfig) -> Result<i64> {
let config_json =
serde_json::to_string(config).map_err(|e| Error::Serialization(e.to_string()))?;
let metric = match config.metric {
Metric::DotProduct => "dot_product",
Metric::Cosine => "cosine",
};
let txn = self
.db
.begin_transaction()
.map_err(|e| Error::Storage(e.to_string()))?;
let id = Self::now_secs();
let key = format!("collections/{}", name);
let value = serde_json::json!({
"id": id,
"name": name,
"dim": config.dim,
"metric": metric,
"config_json": config_json,
"created_at": Self::now_secs()
});
self.db
.put(txn, key.as_bytes(), value.to_string().as_bytes())
.map_err(|e| Error::Storage(e.to_string()))?;
self.db
.commit(txn)
.map_err(|e| Error::Storage(e.to_string()))?;
Ok(id)
}
pub fn get_collection(&self, name: &str) -> Result<CollectionInfo> {
let key = format!("collections/{}", name);
let txn = self
.db
.begin_transaction()
.map_err(|e| Error::Storage(e.to_string()))?;
let value = self
.db
.get(txn, key.as_bytes())
.map_err(|e| Error::Storage(e.to_string()))?
.ok_or_else(|| Error::CollectionNotFound(name.to_string()))?;
self.db
.commit(txn)
.map_err(|e| Error::Storage(e.to_string()))?;
let json: serde_json::Value =
serde_json::from_slice(&value).map_err(|e| Error::Serialization(e.to_string()))?;
Ok(CollectionInfo {
id: json["id"].as_i64().unwrap_or(0),
name: json["name"].as_str().unwrap_or("").to_string(),
dim: json["dim"].as_u64().unwrap_or(0) as u32,
metric: json["metric"].as_str().unwrap_or("dot_product").to_string(),
config_json: json["config_json"].as_str().unwrap_or("{}").to_string(),
})
}
pub fn list_collections(&self) -> Result<Vec<CollectionInfo>> {
let txn = self
.db
.begin_transaction()
.map_err(|e| Error::Storage(e.to_string()))?;
let prefix = b"collections/";
let entries = self
.db
.scan(txn, prefix)
.map_err(|e| Error::Storage(e.to_string()))?;
self.db
.commit(txn)
.map_err(|e| Error::Storage(e.to_string()))?;
let mut collections = Vec::new();
for (_key, value) in entries {
if let Ok(json) = serde_json::from_slice::<serde_json::Value>(&value) {
collections.push(CollectionInfo {
id: json["id"].as_i64().unwrap_or(0),
name: json["name"].as_str().unwrap_or("").to_string(),
dim: json["dim"].as_u64().unwrap_or(0) as u32,
metric: json["metric"].as_str().unwrap_or("dot_product").to_string(),
config_json: json["config_json"].as_str().unwrap_or("{}").to_string(),
});
}
}
Ok(collections)
}
pub fn add_segment(&self, collection_id: i64, segment: &SegmentInfo) -> Result<()> {
let txn = self
.db
.begin_transaction()
.map_err(|e| Error::Storage(e.to_string()))?;
let key = format!("segments/{}/{}", collection_id, segment.id);
let value = serde_json::json!({
"id": segment.id,
"collection_id": collection_id,
"path": segment.path,
"state": segment.state.to_string(),
"n_vec": segment.n_vec,
"min_vec_id": segment.min_vec_id,
"max_vec_id": segment.max_vec_id,
"created_at": Self::now_secs()
});
self.db
.put(txn, key.as_bytes(), value.to_string().as_bytes())
.map_err(|e| Error::Storage(e.to_string()))?;
self.db
.commit(txn)
.map_err(|e| Error::Storage(e.to_string()))?;
Ok(())
}
pub fn get_segments(&self, collection_id: i64) -> Result<Vec<SegmentInfo>> {
let txn = self
.db
.begin_transaction()
.map_err(|e| Error::Storage(e.to_string()))?;
let prefix = format!("segments/{}/", collection_id);
let entries = self
.db
.scan(txn, prefix.as_bytes())
.map_err(|e| Error::Storage(e.to_string()))?;
self.db
.commit(txn)
.map_err(|e| Error::Storage(e.to_string()))?;
let mut segments = Vec::new();
for (_key, value) in entries {
if let Ok(json) = serde_json::from_slice::<serde_json::Value>(&value) {
let state_str = json["state"].as_str().unwrap_or("sealed");
if state_str != "deleted" {
segments.push(SegmentInfo {
id: json["id"].as_u64().unwrap_or(0),
path: json["path"].as_str().unwrap_or("").to_string(),
state: SegmentState::from_str(state_str),
n_vec: json["n_vec"].as_u64().unwrap_or(0) as u32,
min_vec_id: json["min_vec_id"].as_u64().map(|v| v as u32),
max_vec_id: json["max_vec_id"].as_u64().map(|v| v as u32),
});
}
}
}
segments.sort_by(|a, b| b.id.cmp(&a.id));
Ok(segments)
}
pub fn update_segment_state(&self, segment_id: u64, state: SegmentState) -> Result<()> {
let txn = self
.db
.begin_transaction()
.map_err(|e| Error::Storage(e.to_string()))?;
let prefix = b"segments/";
let entries = self
.db
.scan(txn, prefix)
.map_err(|e| Error::Storage(e.to_string()))?;
let mut found = false;
for (key, value) in &entries {
if let Ok(mut json) = serde_json::from_slice::<serde_json::Value>(value) {
if json["id"].as_u64() == Some(segment_id) {
json["state"] = serde_json::Value::String(state.to_string().to_owned());
let txn2 = self
.db
.begin_transaction()
.map_err(|e| Error::Storage(e.to_string()))?;
self.db
.put(txn2, key, json.to_string().as_bytes())
.map_err(|e| Error::Storage(e.to_string()))?;
self.db
.commit(txn2)
.map_err(|e| Error::Storage(e.to_string()))?;
found = true;
break;
}
}
}
self.db
.commit(txn)
.map_err(|e| Error::Storage(e.to_string()))?;
if !found {
tracing::warn!("update_segment_state: segment {} not found", segment_id);
}
Ok(())
}
pub fn add_tombstone(&self, collection_id: i64, segment_id: u64, vec_id: u32) -> Result<()> {
let txn = self
.db
.begin_transaction()
.map_err(|e| Error::Storage(e.to_string()))?;
let key = format!("tombstones/{}/{}/{}", collection_id, segment_id, vec_id);
let value = serde_json::json!({
"collection_id": collection_id,
"segment_id": segment_id,
"vec_id": vec_id,
"created_at": Self::now_secs()
});
self.db
.put(txn, key.as_bytes(), value.to_string().as_bytes())
.map_err(|e| Error::Storage(e.to_string()))?;
self.db
.commit(txn)
.map_err(|e| Error::Storage(e.to_string()))?;
Ok(())
}
pub fn get_tombstones(&self, segment_id: u64) -> Result<Vec<u32>> {
let txn = self
.db
.begin_transaction()
.map_err(|e| Error::Storage(e.to_string()))?;
let prefix = b"tombstones/";
let entries = self
.db
.scan(txn, prefix)
.map_err(|e| Error::Storage(e.to_string()))?;
self.db
.commit(txn)
.map_err(|e| Error::Storage(e.to_string()))?;
let mut tombstones = Vec::new();
for (_key, value) in entries {
if let Ok(json) = serde_json::from_slice::<serde_json::Value>(&value) {
if json["segment_id"].as_u64() == Some(segment_id) {
if let Some(vec_id) = json["vec_id"].as_u64() {
tombstones.push(vec_id as u32);
}
}
}
}
tombstones.sort();
Ok(tombstones)
}
pub fn clear_tombstones(&self, segment_id: u64) -> Result<()> {
let txn = self
.db
.begin_transaction()
.map_err(|e| Error::Storage(e.to_string()))?;
let prefix = b"tombstones/";
let entries = self
.db
.scan(txn, prefix)
.map_err(|e| Error::Storage(e.to_string()))?;
self.db
.commit(txn)
.map_err(|e| Error::Storage(e.to_string()))?;
let mut keys_to_delete = Vec::new();
for (key, value) in &entries {
if let Ok(json) = serde_json::from_slice::<serde_json::Value>(value) {
if json["segment_id"].as_u64() == Some(segment_id) {
keys_to_delete.push(key.clone());
}
}
}
if !keys_to_delete.is_empty() {
let txn = self
.db
.begin_transaction()
.map_err(|e| Error::Storage(e.to_string()))?;
for key in &keys_to_delete {
self.db
.delete(txn, key)
.map_err(|e| Error::Storage(e.to_string()))?;
}
self.db
.commit(txn)
.map_err(|e| Error::Storage(e.to_string()))?;
tracing::info!(
"clear_tombstones: removed {} tombstones for segment {}",
keys_to_delete.len(),
segment_id
);
}
Ok(())
}
pub fn get_vector_count(&self, collection_id: i64) -> Result<u64> {
let segments = self.get_segments(collection_id)?;
let total: u64 = segments.iter().map(|s| s.n_vec as u64).sum();
Ok(total)
}
pub fn begin_transaction(&self) -> Result<()> {
Ok(())
}
pub fn commit(&self) -> Result<()> {
Ok(())
}
pub fn rollback(&self) -> Result<()> {
Ok(())
}
pub fn checkpoint(&self) -> Result<()> {
Ok(())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CollectionInfo {
pub id: i64,
pub name: String,
pub dim: u32,
pub metric: String,
pub config_json: String,
}
impl CollectionInfo {
pub fn config(&self) -> Result<EngineConfig> {
serde_json::from_str(&self.config_json).map_err(|e| Error::Serialization(e.to_string()))
}
}
#[derive(Debug, Clone)]
pub struct SegmentInfo {
pub id: u64,
pub path: String,
pub state: SegmentState,
pub n_vec: u32,
pub min_vec_id: Option<u32>,
pub max_vec_id: Option<u32>,
}
impl SegmentState {
fn to_string(&self) -> &'static str {
match self {
SegmentState::Mutable => "mutable",
SegmentState::Sealing => "sealing",
SegmentState::Sealed => "sealed",
SegmentState::Compacting => "compacting",
SegmentState::Deleted => "deleted",
}
}
fn from_str(s: &str) -> Self {
match s {
"mutable" => SegmentState::Mutable,
"sealing" => SegmentState::Sealing,
"sealed" => SegmentState::Sealed,
"compacting" => SegmentState::Compacting,
"deleted" => SegmentState::Deleted,
_ => SegmentState::Sealed,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_catalog_operations() {
let catalog = Catalog::open_memory().unwrap();
let config = EngineConfig::with_dim(768);
let collection_id = catalog.create_collection("test", &config).unwrap();
assert!(collection_id > 0);
let info = catalog.get_collection("test").unwrap();
assert_eq!(info.dim, 768);
let segment = SegmentInfo {
id: 1,
path: "/data/segment_1.seg".to_string(),
state: SegmentState::Sealed,
n_vec: 10000,
min_vec_id: Some(0),
max_vec_id: Some(9999),
};
catalog.add_segment(collection_id, &segment).unwrap();
let segments = catalog.get_segments(collection_id).unwrap();
assert_eq!(segments.len(), 1);
assert_eq!(segments[0].n_vec, 10000);
catalog.add_tombstone(collection_id, 1, 500).unwrap();
let tombstones = catalog.get_tombstones(1).unwrap();
assert_eq!(tombstones, vec![500]);
}
#[test]
fn test_collection_not_found() {
let catalog = Catalog::open_memory().unwrap();
let result = catalog.get_collection("nonexistent");
assert!(matches!(result, Err(Error::CollectionNotFound(_))));
}
}