use crate::collection::graph::{EdgeType, GraphSchema, NodeType, ValueType};
use crate::velesql::{
AlterCollectionStatement, AnalyzeStatement, CreateCollectionKind, CreateIndexStatement,
DdlStatement, DropIndexStatement, GraphSchemaMode, SchemaDefinition, TruncateStatement,
};
use crate::{Error, Result, SearchResult};
use super::Database;
impl Database {
pub(super) fn execute_ddl(&self, ddl: &DdlStatement) -> Result<Vec<SearchResult>> {
if let Some(ref observer) = self.observer {
let (operation, name) = ddl_operation_info(ddl);
observer.on_ddl_request(operation, &name)?;
}
match ddl {
DdlStatement::CreateCollection(stmt) => self.execute_create_collection(stmt),
DdlStatement::DropCollection(stmt) => self.execute_drop_collection(stmt),
DdlStatement::CreateIndex(stmt) => self.execute_create_index(stmt),
DdlStatement::DropIndex(stmt) => self.execute_drop_index(stmt),
DdlStatement::Analyze(stmt) => self.execute_analyze(stmt),
DdlStatement::Truncate(stmt) => self.execute_truncate(stmt),
DdlStatement::AlterCollection(stmt) => self.execute_alter_collection(stmt),
}
}
fn execute_create_collection(
&self,
stmt: &crate::velesql::CreateCollectionStatement,
) -> Result<Vec<SearchResult>> {
match &stmt.kind {
CreateCollectionKind::Vector(params) => self.create_vector_from_ddl(&stmt.name, params),
CreateCollectionKind::Graph(params) => self.create_graph_from_ddl(&stmt.name, params),
CreateCollectionKind::Metadata => {
self.create_metadata_collection(&stmt.name)?;
Ok(Vec::new())
}
}
}
fn create_vector_from_ddl(
&self,
name: &str,
params: &crate::velesql::VectorCollectionParams,
) -> Result<Vec<SearchResult>> {
let metric = resolve_metric(¶ms.metric)?;
let storage = resolve_storage_mode(params.storage.as_deref())?;
if params.m.is_some() || params.ef_construction.is_some() {
self.create_vector_collection_with_hnsw(
name,
params.dimension,
metric,
storage,
params.m,
params.ef_construction,
)?;
} else {
self.create_vector_collection_with_options(name, params.dimension, metric, storage)?;
}
Ok(Vec::new())
}
fn create_graph_from_ddl(
&self,
name: &str,
params: &crate::velesql::GraphCollectionParams,
) -> Result<Vec<SearchResult>> {
let schema = build_graph_schema(¶ms.schema_mode);
if let Some(dim) = params.dimension {
let metric_str = params.metric.as_deref().unwrap_or("cosine");
let metric = resolve_metric(metric_str)?;
self.create_graph_collection_with_embeddings(name, schema, dim, metric)?;
} else {
self.create_graph_collection(name, schema)?;
}
Ok(Vec::new())
}
fn execute_drop_collection(
&self,
stmt: &crate::velesql::DropCollectionStatement,
) -> Result<Vec<SearchResult>> {
match self.delete_collection(&stmt.name) {
Ok(()) => Ok(Vec::new()),
Err(Error::CollectionNotFound(_)) if stmt.if_exists => Ok(Vec::new()),
Err(e) => Err(e),
}
}
fn execute_create_index(&self, stmt: &CreateIndexStatement) -> Result<Vec<SearchResult>> {
let collection = self.resolve_writable_collection(&stmt.collection)?;
collection.create_index(&stmt.field)?;
Ok(Vec::new())
}
fn execute_drop_index(&self, stmt: &DropIndexStatement) -> Result<Vec<SearchResult>> {
let collection = self.resolve_writable_collection(&stmt.collection)?;
let _ = collection.drop_secondary_index(&stmt.field);
Ok(Vec::new())
}
fn execute_analyze(&self, stmt: &AnalyzeStatement) -> Result<Vec<SearchResult>> {
let stats = self.analyze_collection(&stmt.collection)?;
let stats_json = serde_json::to_value(&stats)
.unwrap_or_else(|_| serde_json::json!({"error": "failed to serialize stats"}));
let result = SearchResult::new(crate::Point::metadata_only(0, stats_json), 0.0);
Ok(vec![result])
}
fn execute_truncate(&self, stmt: &TruncateStatement) -> Result<Vec<SearchResult>> {
if let Some(gc) = self.get_graph_collection(&stmt.collection) {
return Self::truncate_graph(&gc);
}
let collection = self
.resolve_writable_collection(&stmt.collection)
.or_else(|_| self.resolve_collection(&stmt.collection))?;
let ids = collection.all_point_ids();
let count = ids.len();
if !ids.is_empty() {
collection.delete(&ids)?;
}
let payload = serde_json::json!({"deleted_count": count});
let result = SearchResult::new(crate::Point::metadata_only(0, payload), 0.0);
Ok(vec![result])
}
fn truncate_graph(gc: &crate::collection::GraphCollection) -> Result<Vec<SearchResult>> {
let edges = gc.get_edges(None);
let edge_count = edges.len();
for edge in &edges {
let _ = gc.remove_edge(edge.id());
}
let node_ids = gc.all_node_ids();
let node_count = node_ids.len();
if !node_ids.is_empty() {
gc.delete(&node_ids)?;
}
let payload = serde_json::json!({
"deleted_nodes": node_count,
"deleted_edges": edge_count,
"deleted_count": node_count + edge_count,
});
let result = SearchResult::new(crate::Point::metadata_only(0, payload), 0.0);
Ok(vec![result])
}
fn execute_alter_collection(
&self,
stmt: &AlterCollectionStatement,
) -> Result<Vec<SearchResult>> {
let _collection = self.resolve_writable_collection(&stmt.collection)?;
for (key, value) in &stmt.options {
apply_alter_option(key, value)?;
}
let option_list = stmt
.options
.iter()
.map(|(k, v)| format!("{k}={v}"))
.collect::<Vec<_>>()
.join(", ");
Err(Error::Query(format!(
"ALTER COLLECTION SET is parsed but not yet implemented \
(tracked under US-300). Requested options on collection '{}': \
[{}]. Workaround: drop and recreate the collection with \
CREATE COLLECTION.",
stmt.collection, option_list
)))
}
}
fn ddl_operation_info(ddl: &DdlStatement) -> (&str, String) {
match ddl {
DdlStatement::CreateCollection(stmt) => ("CREATE", stmt.name.clone()),
DdlStatement::DropCollection(stmt) => ("DROP", stmt.name.clone()),
DdlStatement::CreateIndex(stmt) => ("CREATE_INDEX", stmt.collection.clone()),
DdlStatement::DropIndex(stmt) => ("DROP_INDEX", stmt.collection.clone()),
DdlStatement::Analyze(stmt) => ("ANALYZE", stmt.collection.clone()),
DdlStatement::Truncate(stmt) => ("TRUNCATE", stmt.collection.clone()),
DdlStatement::AlterCollection(stmt) => ("ALTER", stmt.collection.clone()),
}
}
fn resolve_metric(s: &str) -> Result<crate::DistanceMetric> {
crate::DistanceMetric::parse_alias(s).ok_or_else(|| {
Error::Query(format!(
"Unknown metric '{s}'. Use: cosine, euclidean, dot, hamming, jaccard"
))
})
}
fn resolve_storage_mode(s: Option<&str>) -> Result<crate::StorageMode> {
let Some(name) = s else {
return Ok(crate::StorageMode::default());
};
crate::StorageMode::parse_alias(name).ok_or_else(|| {
Error::Query(format!(
"Unknown storage mode '{name}'. Use: full, sq8, binary, pq, rabitq"
))
})
}
fn resolve_value_type(s: &str) -> ValueType {
match s.to_uppercase().as_str() {
"INTEGER" | "INT" => ValueType::Integer,
"FLOAT" | "DOUBLE" => ValueType::Float,
"BOOLEAN" | "BOOL" => ValueType::Boolean,
"VECTOR" | "EMBEDDING" => ValueType::Vector,
_ => ValueType::String,
}
}
fn build_graph_schema(mode: &GraphSchemaMode) -> GraphSchema {
match mode {
GraphSchemaMode::Schemaless => GraphSchema::schemaless(),
GraphSchemaMode::Typed(definitions) => build_typed_schema(definitions),
}
}
fn build_typed_schema(definitions: &[SchemaDefinition]) -> GraphSchema {
let mut schema = GraphSchema::new();
for def in definitions {
match def {
SchemaDefinition::Node { name, properties } => {
let props: std::collections::HashMap<String, ValueType> = properties
.iter()
.map(|(k, v)| (k.clone(), resolve_value_type(v)))
.collect();
schema = schema.with_node_type(NodeType::new(name).with_properties(props));
}
SchemaDefinition::Edge {
name,
from_type,
to_type,
} => {
schema = schema.with_edge_type(EdgeType::new(name, from_type, to_type));
}
}
}
schema
}
fn apply_alter_option(key: &str, value: &str) -> Result<()> {
match key {
"auto_reindex" => value.parse::<bool>().map(|_| ()).map_err(|_| {
Error::Query(format!(
"auto_reindex must be 'true' or 'false', got '{value}'"
))
}),
_ => Err(Error::Query(format!(
"Unsupported ALTER option: '{key}'. Supported: auto_reindex"
))),
}
}