Skip to main content

velesdb_server/handlers/
collections.rs

1//! Collection management handlers.
2
3use axum::{
4    extract::{Path, State},
5    http::StatusCode,
6    response::IntoResponse,
7    Json,
8};
9use std::sync::Arc;
10
11use crate::types::{CollectionResponse, CreateCollectionRequest, ErrorResponse};
12use crate::AppState;
13use velesdb_core::index::HnswParams;
14use velesdb_core::{DistanceMetric, StorageMode};
15
16use super::helpers::{auto_core_error_response, error_response, get_collection_or_404};
17
18/// List all collections.
19#[utoipa::path(
20    get,
21    path = "/collections",
22    tag = "collections",
23    responses(
24        (status = 200, description = "List of collections", body = Object)
25    )
26)]
27pub async fn list_collections(State(state): State<Arc<AppState>>) -> impl IntoResponse {
28    let collections = state.db.list_collections();
29    Json(serde_json::json!({ "collections": collections }))
30}
31
32/// Create a new collection.
33#[utoipa::path(
34    post,
35    path = "/collections",
36    tag = "collections",
37    request_body = CreateCollectionRequest,
38    responses(
39        (status = 201, description = "Collection created", body = Object),
40        (status = 400, description = "Invalid request", body = ErrorResponse)
41    )
42)]
43pub async fn create_collection(
44    State(state): State<Arc<AppState>>,
45    Json(req): Json<CreateCollectionRequest>,
46) -> impl IntoResponse {
47    let metric = match parse_distance_metric(&req.metric) {
48        Ok(m) => m,
49        Err(resp) => return resp,
50    };
51
52    let storage_mode = match parse_storage_mode(&req.storage_mode) {
53        Ok(s) => s,
54        Err(resp) => return resp,
55    };
56
57    let result = match dispatch_create(&state, &req, metric, storage_mode) {
58        Ok(r) => r,
59        Err(resp) => return resp,
60    };
61
62    match result {
63        Ok(()) => create_collection_success_response(&req),
64        Err(e) => auto_core_error_response(&e),
65    }
66}
67
68/// Parse a distance metric string into the core enum.
69///
70/// Delegates to [`DistanceMetric::from_str`] to keep alias parsing in one place.
71#[allow(clippy::result_large_err)]
72fn parse_distance_metric(raw: &str) -> Result<DistanceMetric, axum::response::Response> {
73    raw.parse::<DistanceMetric>()
74        .map_err(|e| error_response(StatusCode::BAD_REQUEST, e.to_string()))
75}
76
77/// Parse a storage mode string into the core enum.
78///
79/// Delegates to [`StorageMode::from_str`] (single source of truth in `velesdb-core`).
80#[allow(clippy::result_large_err)]
81fn parse_storage_mode(raw: &str) -> Result<StorageMode, axum::response::Response> {
82    raw.parse::<StorageMode>()
83        .map_err(|e| error_response(StatusCode::BAD_REQUEST, e))
84}
85
86/// Build a full `HnswParams` override from the request fields, or return
87/// `None` when the caller supplied no HNSW tuning fields at all.
88///
89/// The base parameters come from `HnswParams::auto(dimension)` so that
90/// unspecified fields inherit the engine's dimension-aware defaults.
91/// `storage_mode` always mirrors the top-level collection `storage_mode`
92/// — callers cannot desync the HNSW inner storage mode from the
93/// collection's advertised quantisation (the `HnswParams::storage_mode`
94/// field is a denormalised copy that the engine keeps in sync).
95fn build_hnsw_params_override(
96    req: &CreateCollectionRequest,
97    dimension: usize,
98    storage_mode: StorageMode,
99) -> Option<HnswParams> {
100    if req.hnsw_m.is_none()
101        && req.hnsw_ef_construction.is_none()
102        && req.hnsw_alpha.is_none()
103        && req.hnsw_max_elements.is_none()
104    {
105        return None;
106    }
107    let base = HnswParams::auto(dimension);
108    Some(HnswParams {
109        max_connections: req.hnsw_m.unwrap_or(base.max_connections),
110        ef_construction: req.hnsw_ef_construction.unwrap_or(base.ef_construction),
111        max_elements: req.hnsw_max_elements.unwrap_or(base.max_elements),
112        storage_mode,
113        alpha: req.hnsw_alpha.unwrap_or(base.alpha),
114    })
115}
116
117/// Create a vector collection, requiring a dimension in the request.
118///
119/// Applies advanced configuration overrides (pq_rescore_oversampling,
120/// deferred_indexing, async_index_builder) in a second pass via
121/// `VectorCollection::apply_advanced_config` once the base collection
122/// has been registered. This two-step approach keeps the core
123/// `Database::create_vector_collection_*` API stable while still
124/// honouring the full PROP-CONFIG-ADVANCED field set on the REST
125/// surface.
126#[allow(clippy::result_large_err)]
127fn create_vector_collection(
128    state: &AppState,
129    req: &CreateCollectionRequest,
130    metric: DistanceMetric,
131    storage_mode: StorageMode,
132) -> Result<velesdb_core::error::Result<()>, axum::response::Response> {
133    let dimension = req.dimension.ok_or_else(|| {
134        error_response(
135            StatusCode::BAD_REQUEST,
136            "dimension is required for vector collections".to_string(),
137        )
138    })?;
139
140    // Parse the advanced override fields up-front so a malformed JSON
141    // payload fails the request before any collection is created on
142    // disk. This avoids the half-initialised state where the base
143    // collection exists but the advanced fields are missing.
144    let advanced = parse_advanced_config(req)?;
145
146    // Phase 1: create the base collection with HNSW params.
147    //
148    // Any of `hnsw_m`, `hnsw_ef_construction`, `hnsw_alpha`, or
149    // `hnsw_max_elements` being present triggers the "with_params"
150    // path so the caller-supplied values flow into a full `HnswParams`
151    // starting from the engine's dimension-aware auto defaults. The
152    // legacy `with_hnsw` helper cannot carry alpha/max_elements and
153    // would silently drop them, re-introducing the PROP-HNSW-ALPHA gap.
154    let base_result = if let Some(hnsw_params) =
155        build_hnsw_params_override(req, dimension, storage_mode)
156    {
157        // Reject out-of-range tunables (e.g. hnsw_alpha < 1.0 or non-finite)
158        // before any collection is created on disk.
159        hnsw_params
160            .validate()
161            .map_err(|e| error_response(StatusCode::BAD_REQUEST, e.to_string()))?;
162        state.db.create_vector_collection_with_params(
163            &req.name,
164            dimension,
165            metric,
166            storage_mode,
167            hnsw_params,
168            None,
169        )
170    } else {
171        state
172            .db
173            .create_vector_collection_with_options(&req.name, dimension, metric, storage_mode)
174    };
175    if let Err(e) = base_result {
176        return Ok(Err(e));
177    }
178
179    // Phase 2: persist advanced overrides if any were requested.
180    if advanced.has_any() {
181        return Ok(apply_advanced_with_rollback(state, &req.name, advanced));
182    }
183
184    Ok(Ok(()))
185}
186
187/// Applies advanced config overrides with rollback on failure.
188///
189/// If `apply_advanced_config` fails, deletes the collection to avoid
190/// orphaned half-initialised state, and logs diagnostics for operators.
191fn apply_advanced_with_rollback(
192    state: &AppState,
193    name: &str,
194    advanced: AdvancedCreateOverrides,
195) -> velesdb_core::error::Result<()> {
196    let Some(coll) = state.db.get_vector_collection(name) else {
197        return Err(velesdb_core::error::Error::CollectionNotFound(
198            name.to_string(),
199        ));
200    };
201    if let Err(phase_two_err) = coll.apply_advanced_config(
202        advanced.pq_rescore_oversampling,
203        #[cfg(feature = "persistence")]
204        advanced.deferred_indexing,
205        advanced.async_index_builder,
206    ) {
207        drop(coll);
208        let rollback_outcome = state.db.delete_collection(name);
209        if let Err(ref rollback_err) = rollback_outcome {
210            tracing::warn!(
211                collection = %name,
212                rollback_error = %rollback_err,
213                phase_two_error = %phase_two_err,
214                "failed to roll back collection after apply_advanced_config error"
215            );
216        }
217        log_rollback_invariant(state, name, &rollback_outcome, &phase_two_err);
218        return Err(phase_two_err);
219    }
220    Ok(())
221}
222
223/// Logs a critical diagnostic if a collection survives rollback.
224fn log_rollback_invariant(
225    state: &AppState,
226    name: &str,
227    rollback_outcome: &velesdb_core::error::Result<()>,
228    phase_two_err: &velesdb_core::error::Error,
229) {
230    if state.db.get_any_collection(name).is_some() {
231        tracing::error!(
232            collection = %name,
233            rollback_outcome = ?rollback_outcome,
234            phase_two_error = %phase_two_err,
235            "post-rollback invariant violated: collection still present in \
236             registry after delete_collection was attempted. Manual \
237             reconciliation required — client retries will fail with \
238             CollectionExists until the orphaned collection is cleaned up."
239        );
240    }
241}
242
243/// Parsed advanced override fields for the create-collection pipeline.
244///
245/// The outer `Option` signals whether the field was present in the
246/// request body; the inner `Option` carries the value the caller
247/// wanted to persist (including explicit `null` → `Some(None)`).
248/// A local clippy allow is applied because the three-state semantics
249/// are the intended contract here.
250#[allow(clippy::option_option)]
251#[derive(Default)]
252struct AdvancedCreateOverrides {
253    pq_rescore_oversampling: Option<Option<u32>>,
254    #[cfg(feature = "persistence")]
255    deferred_indexing: Option<Option<velesdb_core::collection::streaming::DeferredIndexerConfig>>,
256    async_index_builder:
257        Option<Option<velesdb_core::collection::streaming::AsyncIndexBuilderConfig>>,
258}
259
260impl AdvancedCreateOverrides {
261    #[cfg(feature = "persistence")]
262    fn has_any(&self) -> bool {
263        self.pq_rescore_oversampling.is_some()
264            || self.deferred_indexing.is_some()
265            || self.async_index_builder.is_some()
266    }
267
268    #[cfg(not(feature = "persistence"))]
269    fn has_any(&self) -> bool {
270        self.pq_rescore_oversampling.is_some() || self.async_index_builder.is_some()
271    }
272}
273
274/// Parses the advanced override JSON fields on `CreateCollectionRequest`
275/// into typed `CollectionConfig` fragments. A malformed JSON payload
276/// becomes a 400 response.
277#[allow(clippy::result_large_err)]
278fn parse_advanced_config(
279    req: &CreateCollectionRequest,
280) -> Result<AdvancedCreateOverrides, axum::response::Response> {
281    let mut overrides = AdvancedCreateOverrides {
282        pq_rescore_oversampling: req.pq_rescore_oversampling.map(Some),
283        ..Default::default()
284    };
285
286    #[cfg(feature = "persistence")]
287    if let Some(ref value) = req.deferred_indexing {
288        let parsed: velesdb_core::collection::streaming::DeferredIndexerConfig =
289            serde_json::from_value(value.clone()).map_err(|e| {
290                error_response(
291                    StatusCode::BAD_REQUEST,
292                    format!("Invalid 'deferred_indexing' configuration: {e}"),
293                )
294            })?;
295        overrides.deferred_indexing = Some(Some(parsed));
296    }
297
298    if let Some(ref value) = req.async_index_builder {
299        let parsed: velesdb_core::collection::streaming::AsyncIndexBuilderConfig =
300            serde_json::from_value(value.clone()).map_err(|e| {
301                error_response(
302                    StatusCode::BAD_REQUEST,
303                    format!("Invalid 'async_index_builder' configuration: {e}"),
304                )
305            })?;
306        overrides.async_index_builder = Some(Some(parsed));
307    }
308
309    Ok(overrides)
310}
311
312/// Parses the optional `graph_schema` JSON field on
313/// `CreateCollectionRequest` into a typed `GraphSchema`. When the field
314/// is absent the schemaless default is returned, preserving backward
315/// compatibility with callers that relied on the previous behaviour.
316#[allow(clippy::result_large_err)]
317fn parse_graph_schema(
318    req: &CreateCollectionRequest,
319) -> Result<velesdb_core::GraphSchema, axum::response::Response> {
320    match req.graph_schema.as_ref() {
321        Some(value) => serde_json::from_value(value.clone()).map_err(|e| {
322            error_response(
323                StatusCode::BAD_REQUEST,
324                format!("Invalid 'graph_schema' payload: {e}"),
325            )
326        }),
327        None => Ok(velesdb_core::GraphSchema::schemaless()),
328    }
329}
330
331/// Dispatch collection creation based on `collection_type`.
332#[allow(clippy::result_large_err)]
333fn dispatch_create(
334    state: &AppState,
335    req: &CreateCollectionRequest,
336    metric: DistanceMetric,
337    storage_mode: StorageMode,
338) -> Result<velesdb_core::error::Result<()>, axum::response::Response> {
339    match req.collection_type.to_lowercase().as_str() {
340        "metadata_only" | "metadata-only" | "metadata" => {
341            Ok(state.db.create_metadata_collection(&req.name))
342        }
343        "graph" | "knowledge_graph" | "kg" => {
344            let schema = parse_graph_schema(req)?;
345            Ok(state.db.create_graph_collection(&req.name, schema))
346        }
347        "vector" | "" => create_vector_collection(state, req, metric, storage_mode),
348        _ => Err(error_response(
349            StatusCode::BAD_REQUEST,
350            format!(
351                "Invalid collection_type: {}. Valid: vector, graph, metadata_only",
352                req.collection_type
353            ),
354        )),
355    }
356}
357
358/// Build a 201 Created response for successful collection creation.
359fn create_collection_success_response(req: &CreateCollectionRequest) -> axum::response::Response {
360    let mut warnings = Vec::new();
361    let is_vector = matches!(req.collection_type.to_lowercase().as_str(), "vector" | "");
362    if is_vector {
363        warnings.push("Collection dimension and metric are immutable after creation. If your embedding model changes, create a new collection and reindex data.");
364        warnings.push("For first queries, start without strict filters/thresholds, then tighten progressively.");
365    }
366
367    (
368        StatusCode::CREATED,
369        Json(serde_json::json!({
370            "message": "Collection created",
371            "name": req.name,
372            "type": req.collection_type,
373            "warnings": warnings
374        })),
375    )
376        .into_response()
377}
378
379/// Get collection information.
380#[utoipa::path(
381    get,
382    path = "/collections/{name}",
383    tag = "collections",
384    params(
385        ("name" = String, Path, description = "Collection name")
386    ),
387    responses(
388        (status = 200, description = "Collection details", body = CollectionResponse),
389        (status = 404, description = "Collection not found", body = ErrorResponse)
390    )
391)]
392pub async fn get_collection(
393    State(state): State<Arc<AppState>>,
394    Path(name): Path<String>,
395) -> impl IntoResponse {
396    let collection = match get_collection_or_404(&state, &name) {
397        Ok(c) => c,
398        Err(resp) => return resp,
399    };
400
401    let config = collection.config();
402    Json(CollectionResponse {
403        name: config.name,
404        dimension: config.dimension,
405        metric: format!("{:?}", config.metric).to_lowercase(),
406        point_count: config.point_count,
407        storage_mode: format!("{:?}", config.storage_mode).to_lowercase(),
408    })
409    .into_response()
410}
411
412/// Run a quick sanity check for onboarding and troubleshooting.
413#[utoipa::path(
414    get,
415    path = "/collections/{name}/sanity",
416    tag = "collections",
417    params(
418        ("name" = String, Path, description = "Collection name")
419    ),
420    responses(
421        (status = 200, description = "Collection sanity status", body = Object),
422        (status = 404, description = "Collection not found", body = ErrorResponse)
423    )
424)]
425pub async fn collection_sanity(
426    State(state): State<Arc<AppState>>,
427    Path(name): Path<String>,
428) -> impl IntoResponse {
429    let collection = match get_collection_or_404(&state, &name) {
430        Ok(c) => c,
431        Err(resp) => return resp,
432    };
433
434    let config = collection.config();
435    build_sanity_response(&state, &config, &collection)
436}
437
438/// Build the JSON sanity check response body.
439fn build_sanity_response(
440    state: &AppState,
441    config: &velesdb_core::collection::CollectionConfig,
442    collection: &velesdb_core::AnyCollection,
443) -> axum::response::Response {
444    let has_data = config.point_count > 0;
445    Json(serde_json::json!({
446        "collection": config.name,
447        "dimension": config.dimension,
448        "metric": format!("{:?}", config.metric).to_lowercase(),
449        "point_count": config.point_count,
450        "is_empty": collection.is_empty(),
451        "checks": {
452            "has_vectors": has_data,
453            "search_ready": has_data,
454            "dimension_configured": config.dimension > 0
455        },
456        "diagnostics": {
457            "search_requests_total": state.onboarding_metrics.search_requests_total.load(std::sync::atomic::Ordering::Relaxed),
458            "dimension_mismatch_total": state.onboarding_metrics.dimension_mismatch_total.load(std::sync::atomic::Ordering::Relaxed),
459            "empty_search_results_total": state.onboarding_metrics.empty_search_results_total.load(std::sync::atomic::Ordering::Relaxed),
460            "filter_parse_errors_total": state.onboarding_metrics.filter_parse_errors_total.load(std::sync::atomic::Ordering::Relaxed)
461        },
462        "hints": if has_data {
463            vec![
464                "Run a search without strict filters first, then tighten filters progressively."
465            ]
466        } else {
467            vec![
468                "Insert at least one known vector before evaluating search quality.",
469                "Verify you are querying the intended collection."
470            ]
471        }
472    }))
473    .into_response()
474}
475
476/// Delete a collection.
477#[utoipa::path(
478    delete,
479    path = "/collections/{name}",
480    tag = "collections",
481    params(
482        ("name" = String, Path, description = "Collection name")
483    ),
484    responses(
485        (status = 200, description = "Collection deleted", body = Object),
486        (status = 404, description = "Collection not found", body = ErrorResponse)
487    )
488)]
489pub async fn delete_collection(
490    State(state): State<Arc<AppState>>,
491    Path(name): Path<String>,
492) -> impl IntoResponse {
493    match state.db.delete_collection(&name) {
494        Ok(()) => Json(serde_json::json!({
495            "message": "Collection deleted",
496            "name": name
497        }))
498        .into_response(),
499        Err(e) => auto_core_error_response(&e),
500    }
501}
502
503/// Check if a collection is empty.
504#[utoipa::path(
505    get,
506    path = "/collections/{name}/empty",
507    tag = "collections",
508    params(
509        ("name" = String, Path, description = "Collection name")
510    ),
511    responses(
512        (status = 200, description = "Empty status", body = Object),
513        (status = 404, description = "Collection not found", body = ErrorResponse)
514    )
515)]
516pub async fn is_empty(
517    State(state): State<Arc<AppState>>,
518    Path(name): Path<String>,
519) -> impl IntoResponse {
520    let collection = match get_collection_or_404(&state, &name) {
521        Ok(c) => c,
522        Err(resp) => return resp,
523    };
524
525    Json(serde_json::json!({
526        "is_empty": collection.is_empty()
527    }))
528    .into_response()
529}
530
531/// Flush pending changes to disk.
532#[utoipa::path(
533    post,
534    path = "/collections/{name}/flush",
535    tag = "collections",
536    params(
537        ("name" = String, Path, description = "Collection name")
538    ),
539    responses(
540        (status = 200, description = "Flushed successfully", body = Object),
541        (status = 404, description = "Collection not found", body = ErrorResponse),
542        (status = 500, description = "Flush failed", body = ErrorResponse)
543    )
544)]
545pub async fn flush_collection(
546    State(state): State<Arc<AppState>>,
547    Path(name): Path<String>,
548) -> impl IntoResponse {
549    let collection = match get_collection_or_404(&state, &name) {
550        Ok(c) => c,
551        Err(resp) => return resp,
552    };
553
554    let result = tokio::task::spawn_blocking(move || collection.flush()).await;
555    match result {
556        Ok(Ok(())) => Json(serde_json::json!({
557            "message": "Flushed successfully",
558            "collection": name
559        }))
560        .into_response(),
561        Ok(Err(e)) => auto_core_error_response(&e),
562        Err(join_err) => error_response(
563            StatusCode::INTERNAL_SERVER_ERROR,
564            format!("flush task panicked: {join_err}"),
565        ),
566    }
567}