1use axum::{
4 extract::{Path, State},
5 http::StatusCode,
6 response::IntoResponse,
7 Json,
8};
9use std::sync::Arc;
10
11use crate::types::{CollectionResponse, CreateCollectionRequest, ErrorResponse};
12use crate::AppState;
13use velesdb_core::index::HnswParams;
14use velesdb_core::{DistanceMetric, StorageMode};
15
16use super::helpers::{auto_core_error_response, error_response, get_collection_or_404};
17
18#[utoipa::path(
20 get,
21 path = "/collections",
22 tag = "collections",
23 responses(
24 (status = 200, description = "List of collections", body = Object)
25 )
26)]
27pub async fn list_collections(State(state): State<Arc<AppState>>) -> impl IntoResponse {
28 let collections = state.db.list_collections();
29 Json(serde_json::json!({ "collections": collections }))
30}
31
32#[utoipa::path(
34 post,
35 path = "/collections",
36 tag = "collections",
37 request_body = CreateCollectionRequest,
38 responses(
39 (status = 201, description = "Collection created", body = Object),
40 (status = 400, description = "Invalid request", body = ErrorResponse)
41 )
42)]
43pub async fn create_collection(
44 State(state): State<Arc<AppState>>,
45 Json(req): Json<CreateCollectionRequest>,
46) -> impl IntoResponse {
47 let metric = match parse_distance_metric(&req.metric) {
48 Ok(m) => m,
49 Err(resp) => return resp,
50 };
51
52 let storage_mode = match parse_storage_mode(&req.storage_mode) {
53 Ok(s) => s,
54 Err(resp) => return resp,
55 };
56
57 let result = match dispatch_create(&state, &req, metric, storage_mode) {
58 Ok(r) => r,
59 Err(resp) => return resp,
60 };
61
62 match result {
63 Ok(()) => create_collection_success_response(&req),
64 Err(e) => auto_core_error_response(&e),
65 }
66}
67
68#[allow(clippy::result_large_err)]
72fn parse_distance_metric(raw: &str) -> Result<DistanceMetric, axum::response::Response> {
73 raw.parse::<DistanceMetric>()
74 .map_err(|e| error_response(StatusCode::BAD_REQUEST, e.to_string()))
75}
76
77#[allow(clippy::result_large_err)]
81fn parse_storage_mode(raw: &str) -> Result<StorageMode, axum::response::Response> {
82 raw.parse::<StorageMode>()
83 .map_err(|e| error_response(StatusCode::BAD_REQUEST, e))
84}
85
86fn build_hnsw_params_override(
96 req: &CreateCollectionRequest,
97 dimension: usize,
98 storage_mode: StorageMode,
99) -> Option<HnswParams> {
100 if req.hnsw_m.is_none()
101 && req.hnsw_ef_construction.is_none()
102 && req.hnsw_alpha.is_none()
103 && req.hnsw_max_elements.is_none()
104 {
105 return None;
106 }
107 let base = HnswParams::auto(dimension);
108 Some(HnswParams {
109 max_connections: req.hnsw_m.unwrap_or(base.max_connections),
110 ef_construction: req.hnsw_ef_construction.unwrap_or(base.ef_construction),
111 max_elements: req.hnsw_max_elements.unwrap_or(base.max_elements),
112 storage_mode,
113 alpha: req.hnsw_alpha.unwrap_or(base.alpha),
114 })
115}
116
117#[allow(clippy::result_large_err)]
127fn create_vector_collection(
128 state: &AppState,
129 req: &CreateCollectionRequest,
130 metric: DistanceMetric,
131 storage_mode: StorageMode,
132) -> Result<velesdb_core::error::Result<()>, axum::response::Response> {
133 let dimension = req.dimension.ok_or_else(|| {
134 error_response(
135 StatusCode::BAD_REQUEST,
136 "dimension is required for vector collections".to_string(),
137 )
138 })?;
139
140 let advanced = parse_advanced_config(req)?;
145
146 let base_result = if let Some(hnsw_params) =
155 build_hnsw_params_override(req, dimension, storage_mode)
156 {
157 hnsw_params
160 .validate()
161 .map_err(|e| error_response(StatusCode::BAD_REQUEST, e.to_string()))?;
162 state.db.create_vector_collection_with_params(
163 &req.name,
164 dimension,
165 metric,
166 storage_mode,
167 hnsw_params,
168 None,
169 )
170 } else {
171 state
172 .db
173 .create_vector_collection_with_options(&req.name, dimension, metric, storage_mode)
174 };
175 if let Err(e) = base_result {
176 return Ok(Err(e));
177 }
178
179 if advanced.has_any() {
181 return Ok(apply_advanced_with_rollback(state, &req.name, advanced));
182 }
183
184 Ok(Ok(()))
185}
186
187fn apply_advanced_with_rollback(
192 state: &AppState,
193 name: &str,
194 advanced: AdvancedCreateOverrides,
195) -> velesdb_core::error::Result<()> {
196 let Some(coll) = state.db.get_vector_collection(name) else {
197 return Err(velesdb_core::error::Error::CollectionNotFound(
198 name.to_string(),
199 ));
200 };
201 if let Err(phase_two_err) = coll.apply_advanced_config(
202 advanced.pq_rescore_oversampling,
203 #[cfg(feature = "persistence")]
204 advanced.deferred_indexing,
205 advanced.async_index_builder,
206 ) {
207 drop(coll);
208 let rollback_outcome = state.db.delete_collection(name);
209 if let Err(ref rollback_err) = rollback_outcome {
210 tracing::warn!(
211 collection = %name,
212 rollback_error = %rollback_err,
213 phase_two_error = %phase_two_err,
214 "failed to roll back collection after apply_advanced_config error"
215 );
216 }
217 log_rollback_invariant(state, name, &rollback_outcome, &phase_two_err);
218 return Err(phase_two_err);
219 }
220 Ok(())
221}
222
223fn log_rollback_invariant(
225 state: &AppState,
226 name: &str,
227 rollback_outcome: &velesdb_core::error::Result<()>,
228 phase_two_err: &velesdb_core::error::Error,
229) {
230 if state.db.get_any_collection(name).is_some() {
231 tracing::error!(
232 collection = %name,
233 rollback_outcome = ?rollback_outcome,
234 phase_two_error = %phase_two_err,
235 "post-rollback invariant violated: collection still present in \
236 registry after delete_collection was attempted. Manual \
237 reconciliation required — client retries will fail with \
238 CollectionExists until the orphaned collection is cleaned up."
239 );
240 }
241}
242
243#[allow(clippy::option_option)]
251#[derive(Default)]
252struct AdvancedCreateOverrides {
253 pq_rescore_oversampling: Option<Option<u32>>,
254 #[cfg(feature = "persistence")]
255 deferred_indexing: Option<Option<velesdb_core::collection::streaming::DeferredIndexerConfig>>,
256 async_index_builder:
257 Option<Option<velesdb_core::collection::streaming::AsyncIndexBuilderConfig>>,
258}
259
260impl AdvancedCreateOverrides {
261 #[cfg(feature = "persistence")]
262 fn has_any(&self) -> bool {
263 self.pq_rescore_oversampling.is_some()
264 || self.deferred_indexing.is_some()
265 || self.async_index_builder.is_some()
266 }
267
268 #[cfg(not(feature = "persistence"))]
269 fn has_any(&self) -> bool {
270 self.pq_rescore_oversampling.is_some() || self.async_index_builder.is_some()
271 }
272}
273
274#[allow(clippy::result_large_err)]
278fn parse_advanced_config(
279 req: &CreateCollectionRequest,
280) -> Result<AdvancedCreateOverrides, axum::response::Response> {
281 let mut overrides = AdvancedCreateOverrides {
282 pq_rescore_oversampling: req.pq_rescore_oversampling.map(Some),
283 ..Default::default()
284 };
285
286 #[cfg(feature = "persistence")]
287 if let Some(ref value) = req.deferred_indexing {
288 let parsed: velesdb_core::collection::streaming::DeferredIndexerConfig =
289 serde_json::from_value(value.clone()).map_err(|e| {
290 error_response(
291 StatusCode::BAD_REQUEST,
292 format!("Invalid 'deferred_indexing' configuration: {e}"),
293 )
294 })?;
295 overrides.deferred_indexing = Some(Some(parsed));
296 }
297
298 if let Some(ref value) = req.async_index_builder {
299 let parsed: velesdb_core::collection::streaming::AsyncIndexBuilderConfig =
300 serde_json::from_value(value.clone()).map_err(|e| {
301 error_response(
302 StatusCode::BAD_REQUEST,
303 format!("Invalid 'async_index_builder' configuration: {e}"),
304 )
305 })?;
306 overrides.async_index_builder = Some(Some(parsed));
307 }
308
309 Ok(overrides)
310}
311
312#[allow(clippy::result_large_err)]
317fn parse_graph_schema(
318 req: &CreateCollectionRequest,
319) -> Result<velesdb_core::GraphSchema, axum::response::Response> {
320 match req.graph_schema.as_ref() {
321 Some(value) => serde_json::from_value(value.clone()).map_err(|e| {
322 error_response(
323 StatusCode::BAD_REQUEST,
324 format!("Invalid 'graph_schema' payload: {e}"),
325 )
326 }),
327 None => Ok(velesdb_core::GraphSchema::schemaless()),
328 }
329}
330
331#[allow(clippy::result_large_err)]
333fn dispatch_create(
334 state: &AppState,
335 req: &CreateCollectionRequest,
336 metric: DistanceMetric,
337 storage_mode: StorageMode,
338) -> Result<velesdb_core::error::Result<()>, axum::response::Response> {
339 match req.collection_type.to_lowercase().as_str() {
340 "metadata_only" | "metadata-only" | "metadata" => {
341 Ok(state.db.create_metadata_collection(&req.name))
342 }
343 "graph" | "knowledge_graph" | "kg" => {
344 let schema = parse_graph_schema(req)?;
345 Ok(state.db.create_graph_collection(&req.name, schema))
346 }
347 "vector" | "" => create_vector_collection(state, req, metric, storage_mode),
348 _ => Err(error_response(
349 StatusCode::BAD_REQUEST,
350 format!(
351 "Invalid collection_type: {}. Valid: vector, graph, metadata_only",
352 req.collection_type
353 ),
354 )),
355 }
356}
357
358fn create_collection_success_response(req: &CreateCollectionRequest) -> axum::response::Response {
360 let mut warnings = Vec::new();
361 let is_vector = matches!(req.collection_type.to_lowercase().as_str(), "vector" | "");
362 if is_vector {
363 warnings.push("Collection dimension and metric are immutable after creation. If your embedding model changes, create a new collection and reindex data.");
364 warnings.push("For first queries, start without strict filters/thresholds, then tighten progressively.");
365 }
366
367 (
368 StatusCode::CREATED,
369 Json(serde_json::json!({
370 "message": "Collection created",
371 "name": req.name,
372 "type": req.collection_type,
373 "warnings": warnings
374 })),
375 )
376 .into_response()
377}
378
379#[utoipa::path(
381 get,
382 path = "/collections/{name}",
383 tag = "collections",
384 params(
385 ("name" = String, Path, description = "Collection name")
386 ),
387 responses(
388 (status = 200, description = "Collection details", body = CollectionResponse),
389 (status = 404, description = "Collection not found", body = ErrorResponse)
390 )
391)]
392pub async fn get_collection(
393 State(state): State<Arc<AppState>>,
394 Path(name): Path<String>,
395) -> impl IntoResponse {
396 let collection = match get_collection_or_404(&state, &name) {
397 Ok(c) => c,
398 Err(resp) => return resp,
399 };
400
401 let config = collection.config();
402 Json(CollectionResponse {
403 name: config.name,
404 dimension: config.dimension,
405 metric: format!("{:?}", config.metric).to_lowercase(),
406 point_count: config.point_count,
407 storage_mode: format!("{:?}", config.storage_mode).to_lowercase(),
408 })
409 .into_response()
410}
411
412#[utoipa::path(
414 get,
415 path = "/collections/{name}/sanity",
416 tag = "collections",
417 params(
418 ("name" = String, Path, description = "Collection name")
419 ),
420 responses(
421 (status = 200, description = "Collection sanity status", body = Object),
422 (status = 404, description = "Collection not found", body = ErrorResponse)
423 )
424)]
425pub async fn collection_sanity(
426 State(state): State<Arc<AppState>>,
427 Path(name): Path<String>,
428) -> impl IntoResponse {
429 let collection = match get_collection_or_404(&state, &name) {
430 Ok(c) => c,
431 Err(resp) => return resp,
432 };
433
434 let config = collection.config();
435 build_sanity_response(&state, &config, &collection)
436}
437
438fn build_sanity_response(
440 state: &AppState,
441 config: &velesdb_core::collection::CollectionConfig,
442 collection: &velesdb_core::AnyCollection,
443) -> axum::response::Response {
444 let has_data = config.point_count > 0;
445 Json(serde_json::json!({
446 "collection": config.name,
447 "dimension": config.dimension,
448 "metric": format!("{:?}", config.metric).to_lowercase(),
449 "point_count": config.point_count,
450 "is_empty": collection.is_empty(),
451 "checks": {
452 "has_vectors": has_data,
453 "search_ready": has_data,
454 "dimension_configured": config.dimension > 0
455 },
456 "diagnostics": {
457 "search_requests_total": state.onboarding_metrics.search_requests_total.load(std::sync::atomic::Ordering::Relaxed),
458 "dimension_mismatch_total": state.onboarding_metrics.dimension_mismatch_total.load(std::sync::atomic::Ordering::Relaxed),
459 "empty_search_results_total": state.onboarding_metrics.empty_search_results_total.load(std::sync::atomic::Ordering::Relaxed),
460 "filter_parse_errors_total": state.onboarding_metrics.filter_parse_errors_total.load(std::sync::atomic::Ordering::Relaxed)
461 },
462 "hints": if has_data {
463 vec![
464 "Run a search without strict filters first, then tighten filters progressively."
465 ]
466 } else {
467 vec![
468 "Insert at least one known vector before evaluating search quality.",
469 "Verify you are querying the intended collection."
470 ]
471 }
472 }))
473 .into_response()
474}
475
476#[utoipa::path(
478 delete,
479 path = "/collections/{name}",
480 tag = "collections",
481 params(
482 ("name" = String, Path, description = "Collection name")
483 ),
484 responses(
485 (status = 200, description = "Collection deleted", body = Object),
486 (status = 404, description = "Collection not found", body = ErrorResponse)
487 )
488)]
489pub async fn delete_collection(
490 State(state): State<Arc<AppState>>,
491 Path(name): Path<String>,
492) -> impl IntoResponse {
493 match state.db.delete_collection(&name) {
494 Ok(()) => Json(serde_json::json!({
495 "message": "Collection deleted",
496 "name": name
497 }))
498 .into_response(),
499 Err(e) => auto_core_error_response(&e),
500 }
501}
502
503#[utoipa::path(
505 get,
506 path = "/collections/{name}/empty",
507 tag = "collections",
508 params(
509 ("name" = String, Path, description = "Collection name")
510 ),
511 responses(
512 (status = 200, description = "Empty status", body = Object),
513 (status = 404, description = "Collection not found", body = ErrorResponse)
514 )
515)]
516pub async fn is_empty(
517 State(state): State<Arc<AppState>>,
518 Path(name): Path<String>,
519) -> impl IntoResponse {
520 let collection = match get_collection_or_404(&state, &name) {
521 Ok(c) => c,
522 Err(resp) => return resp,
523 };
524
525 Json(serde_json::json!({
526 "is_empty": collection.is_empty()
527 }))
528 .into_response()
529}
530
531#[utoipa::path(
533 post,
534 path = "/collections/{name}/flush",
535 tag = "collections",
536 params(
537 ("name" = String, Path, description = "Collection name")
538 ),
539 responses(
540 (status = 200, description = "Flushed successfully", body = Object),
541 (status = 404, description = "Collection not found", body = ErrorResponse),
542 (status = 500, description = "Flush failed", body = ErrorResponse)
543 )
544)]
545pub async fn flush_collection(
546 State(state): State<Arc<AppState>>,
547 Path(name): Path<String>,
548) -> impl IntoResponse {
549 let collection = match get_collection_or_404(&state, &name) {
550 Ok(c) => c,
551 Err(resp) => return resp,
552 };
553
554 let result = tokio::task::spawn_blocking(move || collection.flush()).await;
555 match result {
556 Ok(Ok(())) => Json(serde_json::json!({
557 "message": "Flushed successfully",
558 "collection": name
559 }))
560 .into_response(),
561 Ok(Err(e)) => auto_core_error_response(&e),
562 Err(join_err) => error_response(
563 StatusCode::INTERNAL_SERVER_ERROR,
564 format!("flush task panicked: {join_err}"),
565 ),
566 }
567}