use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::client::VectorDBClient;
use crate::collection::Collection;
use crate::error::{Result, VectorDBError};
use crate::index::Index;
/// Handle to a single named database, bundling the client used to reach the
/// server with the database name that is sent in every request.
///
/// Cloning copies both the client and the name; collection handles created
/// from this type receive such a clone.
#[derive(Debug, Clone)]
pub struct Database {
    /// Client through which all requests for this database are issued.
    client: VectorDBClient,
    /// Name sent as the `database` field of request bodies.
    name: String,
}
/// JSON body posted to `/database/create`.
#[derive(Debug, Serialize)]
struct CreateDatabaseRequest {
    /// Name of the database to create.
    database: String,
}
#[derive(Debug, Serialize)]
struct CreateCollectionRequest {
database: String,
collection: String,
#[serde(rename = "shardNum")]
shard: u32,
#[serde(rename = "replicaNum")]
replicas: u32,
#[serde(skip_serializing_if = "Option::is_none")]
description: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
indexes: Option<Vec<serde_json::Value>>,
#[serde(skip_serializing_if = "Option::is_none")]
embedding: Option<Value>, #[serde(skip_serializing_if = "Option::is_none")]
ttl_config: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
filter_index_config: Option<Value>,
}
/// Typed response envelope: a status `code`, a message, and an optional
/// payload of type `T`.
///
/// NOTE(review): nothing in the code visible alongside this definition
/// deserializes into this type — confirm whether it is still needed.
#[derive(Debug, Deserialize)]
struct ApiResponse<T> {
    /// Status code reported in the response body.
    code: i32,
    /// Message accompanying the status code.
    msg: String,
    /// Payload; falls back to the default (`None`) when the key is absent.
    #[serde(default)]
    data: Option<T>,
}
impl Database {
    /// Creates a handle for the database called `name`, backed by `client`.
    ///
    /// No request is sent; call [`Database::create`] to create the database
    /// on the server.
    pub fn new(client: VectorDBClient, name: String) -> Self {
        Self { client, name }
    }

    /// Returns the database name this handle was constructed with.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Returns the client used to send this handle's requests.
    pub fn client(&self) -> &VectorDBClient {
        &self.client
    }

    /// Parses a raw response body as JSON and turns a non-zero `code` field
    /// into an error.
    ///
    /// A body without an integer `code` field is treated as successful and
    /// returned unchanged.
    ///
    /// # Errors
    ///
    /// Returns an error if `body` is not valid JSON, or a server error built
    /// from `code` and `msg` (`"Unknown error"` when `msg` is missing or not
    /// a string) if `code` is non-zero.
    fn parse_checked_response(body: &str) -> Result<Value> {
        let parsed: Value = serde_json::from_str(body)?;
        if let Some(code) = parsed.get("code").and_then(Value::as_i64) {
            if code != 0 {
                let msg = parsed
                    .get("msg")
                    .and_then(Value::as_str)
                    .unwrap_or("Unknown error");
                // Clamp before narrowing: a plain `as i32` would wrap
                // out-of-range codes, possibly to a misleading value such as 0.
                let code = code.clamp(i64::from(i32::MIN), i64::from(i32::MAX)) as i32;
                return Err(VectorDBError::server_error(code, msg));
            }
        }
        Ok(parsed)
    }

    /// Extracts the payload of a successful response: the `data` field if
    /// present, otherwise `affectedCount`, otherwise `Value::Null`.
    fn extract_payload(body: &Value) -> Value {
        body.get("data")
            .or_else(|| body.get("affectedCount"))
            .cloned()
            .unwrap_or(Value::Null)
    }

    /// Posts to `/database/create` with this database's name.
    ///
    /// Returns the response's `data` field, falling back to `affectedCount`
    /// and then to `Value::Null`.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be built or sent, if the
    /// response body is not valid JSON, or if the response carries a
    /// non-zero `code`.
    pub async fn create(&self) -> Result<Value> {
        let request = CreateDatabaseRequest {
            database: self.name.clone(),
        };
        let response = self
            .client
            .post("/database/create")
            .await?
            .json(&request)
            .send()
            .await?;
        let body = Self::parse_checked_response(&response.text().await?)?;
        Ok(Self::extract_payload(&body))
    }

    /// Posts to `/database/drop` with this database's name.
    ///
    /// Returns the response's `data` field, falling back to `affectedCount`
    /// and then to `Value::Null`.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be built or sent, if the
    /// response body is not valid JSON, or if the response carries a
    /// non-zero `code`.
    pub async fn drop(&self) -> Result<Value> {
        let request = serde_json::json!({
            "database": self.name
        });
        let response = self
            .client
            .post("/database/drop")
            .await?
            .json(&request)
            .send()
            .await?;
        let body = Self::parse_checked_response(&response.text().await?)?;
        Ok(Self::extract_payload(&body))
    }

    /// Reports whether a collection named `collection_name` appears in the
    /// result of [`Database::list_collections`].
    ///
    /// # Errors
    ///
    /// Propagates any error from listing the collections.
    pub async fn exists_collection(&self, collection_name: &str) -> Result<bool> {
        let collections = self.list_collections().await?;
        Ok(collections.iter().any(|c| c.name() == collection_name))
    }

    /// Posts to `/collection/create` and returns a handle to the new
    /// collection.
    ///
    /// * `name` - name of the collection to create.
    /// * `shard` / `replicas` - shard and replica counts sent in the request.
    /// * `description` - optional description; omitted from the body when `None`.
    /// * `index` - optional index definition. It is serialized to a JSON
    ///   object and the object's values (without their keys) are sent as the
    ///   `indexes` list.
    /// * `embedding` - optional embedding configuration, serialized to JSON.
    /// * `ttl_config` - optional TTL configuration, forwarded as given.
    ///
    /// # Errors
    ///
    /// Returns an error if `index` or `embedding` cannot be serialized, if
    /// the request cannot be built or sent, if the response body is not valid
    /// JSON, or if the response carries a non-zero `code`.
    pub async fn create_collection(
        &self,
        name: impl Into<String>,
        shard: u32,
        replicas: u32,
        description: Option<String>,
        index: Option<Index>,
        embedding: Option<crate::enums::Embedding>,
        ttl_config: Option<Value>,
    ) -> Result<Collection> {
        let collection_name = name.into();
        let indexes = index
            .map(|idx| -> Result<Vec<Value>> {
                let entries: std::collections::HashMap<String, Value> =
                    serde_json::from_value(serde_json::to_value(idx)?)?;
                Ok(entries.into_values().collect())
            })
            .transpose()?;
        // Propagate serialization failures instead of panicking on them.
        let embedding = embedding.map(serde_json::to_value).transpose()?;
        let request = CreateCollectionRequest {
            database: self.name.clone(),
            collection: collection_name.clone(),
            shard,
            replicas,
            description,
            indexes,
            embedding,
            ttl_config,
            filter_index_config: None,
        };
        let response = self
            .client
            .post("/collection/create")
            .await?
            .json(&request)
            .send()
            .await?;
        Self::parse_checked_response(&response.text().await?)?;
        Ok(Collection::new(self.clone(), collection_name))
    }

    /// Returns a handle to the collection `name`, creating it first when
    /// [`Database::exists_collection`] reports that it is missing.
    ///
    /// The existence check and the creation are two separate requests. The
    /// remaining arguments are only used when the collection is created; see
    /// [`Database::create_collection`].
    ///
    /// # Errors
    ///
    /// Propagates any error from the existence check or from the creation.
    pub async fn create_collection_if_not_exists(
        &self,
        name: impl Into<String>,
        shard: u32,
        replicas: u32,
        description: Option<String>,
        index: Option<Index>,
        embedding: Option<crate::enums::Embedding>,
        ttl_config: Option<Value>,
    ) -> Result<Collection> {
        let collection_name = name.into();
        if self.exists_collection(&collection_name).await? {
            return Ok(Collection::new(self.clone(), collection_name));
        }
        self.create_collection(
            collection_name,
            shard,
            replicas,
            description,
            index,
            embedding,
            ttl_config,
        )
        .await
    }

    /// Posts to `/collection/list` and returns one handle per collection
    /// named in the response's `collections` array.
    ///
    /// Each array entry may be a plain string, or an object whose
    /// `collection` (preferred) or `name` key holds the name as a string.
    /// Entries of any other shape are skipped. A missing or non-array
    /// `collections` field yields an empty list.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be built or sent, if the
    /// response body is not valid JSON, or if the response carries a
    /// non-zero `code`.
    pub async fn list_collections(&self) -> Result<Vec<Collection>> {
        let request = serde_json::json!({
            "database": self.name
        });
        let response = self
            .client
            .post("/collection/list")
            .await?
            .json(&request)
            .send()
            .await?;
        let body = Self::parse_checked_response(&response.text().await?)?;
        let collections = body
            .get("collections")
            .and_then(Value::as_array)
            .into_iter()
            .flatten()
            .filter_map(|entry| match entry {
                Value::String(name) => Some(name.as_str()),
                Value::Object(fields) => fields
                    .get("collection")
                    .or_else(|| fields.get("name"))
                    .and_then(Value::as_str),
                _ => None,
            })
            .map(|name| Collection::new(self.clone(), name.to_string()))
            .collect();
        Ok(collections)
    }

    /// Returns a handle to the collection `collection_name` after confirming
    /// through [`Database::exists_collection`] that it exists.
    ///
    /// # Errors
    ///
    /// Returns a parameter error with code `14100` if the collection is not
    /// listed, and propagates any error from the existence check.
    pub async fn collection(&self, collection_name: impl Into<String>) -> Result<Collection> {
        let collection_name = collection_name.into();
        if !self.exists_collection(&collection_name).await? {
            return Err(VectorDBError::param_error(
                14100,
                format!("Collection not exist: {}", collection_name),
            ));
        }
        Ok(Collection::new(self.clone(), collection_name))
    }

    /// Posts to `/collection/describe` and returns a handle to the
    /// collection.
    ///
    /// Only the response's status `code` is inspected; the returned handle is
    /// built from `collection_name`, not from the response payload.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be built or sent, if the
    /// response body is not valid JSON, or if the response carries a
    /// non-zero `code`.
    pub async fn describe_collection(
        &self,
        collection_name: impl Into<String>,
    ) -> Result<Collection> {
        let collection_name = collection_name.into();
        let request = serde_json::json!({
            "database": self.name,
            "collection": collection_name
        });
        let response = self
            .client
            .post("/collection/describe")
            .await?
            .json(&request)
            .send()
            .await?;
        Self::parse_checked_response(&response.text().await?)?;
        Ok(Collection::new(self.clone(), collection_name))
    }

    /// Posts to `/collection/drop` for `collection_name` in this database.
    ///
    /// Returns the response's `data` field, falling back to `affectedCount`
    /// and then to `Value::Null`.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be built or sent, if the
    /// response body is not valid JSON, or if the response carries a
    /// non-zero `code`.
    pub async fn drop_collection(&self, collection_name: impl Into<String>) -> Result<Value> {
        let collection_name = collection_name.into();
        let request = serde_json::json!({
            "database": self.name,
            "collection": collection_name
        });
        let response = self
            .client
            .post("/collection/drop")
            .await?
            .json(&request)
            .send()
            .await?;
        let body = Self::parse_checked_response(&response.text().await?)?;
        Ok(Self::extract_payload(&body))
    }

    /// Posts to `/collection/truncate` for `collection_name` in this
    /// database.
    ///
    /// Returns the response's `data` field, falling back to `affectedCount`
    /// and then to `Value::Null`.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be built or sent, if the
    /// response body is not valid JSON, or if the response carries a
    /// non-zero `code`.
    pub async fn truncate_collection(&self, collection_name: impl Into<String>) -> Result<Value> {
        let collection_name = collection_name.into();
        let request = serde_json::json!({
            "database": self.name,
            "collection": collection_name
        });
        let response = self
            .client
            .post("/collection/truncate")
            .await?
            .json(&request)
            .send()
            .await?;
        let body = Self::parse_checked_response(&response.text().await?)?;
        Ok(Self::extract_payload(&body))
    }

    /// Posts to `/alias/set`, sending `alias` together with
    /// `collection_name` and this database's name.
    ///
    /// Returns the response's `data` field, falling back to `affectedCount`
    /// and then to `Value::Null`.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be built or sent, if the
    /// response body is not valid JSON, or if the response carries a
    /// non-zero `code`.
    pub async fn set_alias(
        &self,
        collection_name: impl Into<String>,
        alias: impl Into<String>,
    ) -> Result<Value> {
        let collection_name = collection_name.into();
        let alias = alias.into();
        let request = serde_json::json!({
            "database": self.name,
            "collection": collection_name,
            "alias": alias
        });
        let response = self
            .client
            .post("/alias/set")
            .await?
            .json(&request)
            .send()
            .await?;
        let body = Self::parse_checked_response(&response.text().await?)?;
        Ok(Self::extract_payload(&body))
    }

    /// Posts to `/alias/delete`, sending `alias` together with this
    /// database's name.
    ///
    /// Returns the response's `data` field, falling back to `affectedCount`
    /// and then to `Value::Null`.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be built or sent, if the
    /// response body is not valid JSON, or if the response carries a
    /// non-zero `code`.
    pub async fn delete_alias(&self, alias: impl Into<String>) -> Result<Value> {
        let alias = alias.into();
        let request = serde_json::json!({
            "database": self.name,
            "alias": alias
        });
        let response = self
            .client
            .post("/alias/delete")
            .await?
            .json(&request)
            .send()
            .await?;
        let body = Self::parse_checked_response(&response.text().await?)?;
        Ok(Self::extract_payload(&body))
    }
}