Skip to main content

velesdb_server/handlers/
admin.rs

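//! Admin and diagnostic handlers: collection config, stats, analyze, diagnostics,
//! index/storage maintenance (rebuild, vacuum, compact, locality reorder), and guard-rails.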
2
3use axum::{
4    extract::{Path, State},
5    http::StatusCode,
6    response::IntoResponse,
7    Json,
8};
9use std::sync::Arc;
10
11use crate::types::{
12    CollectionConfigResponse, CollectionDiagnosticsResponse, CollectionStatsResponse,
13    ColumnStatsResponse, ErrorResponse, GuardRailsConfigRequest, GuardRailsConfigResponse,
14    IndexStatsResponse,
15};
16use crate::AppState;
17
18use super::helpers::{
19    auto_core_error_response, error_response, get_collection_or_404, get_vector_collection_or_404,
20};
21
22/// Get detailed collection configuration (HNSW params, storage mode, schema, etc.).
23#[utoipa::path(
24    get,
25    path = "/collections/{name}/config",
26    tag = "collections",
27    params(
28        ("name" = String, Path, description = "Collection name")
29    ),
30    responses(
31        (status = 200, description = "Collection configuration", body = CollectionConfigResponse),
32        (status = 404, description = "Collection not found", body = ErrorResponse)
33    )
34)]
35pub async fn get_collection_config(
36    State(state): State<Arc<AppState>>,
37    Path(name): Path<String>,
38) -> impl IntoResponse {
39    let collection = match get_collection_or_404(&state, &name) {
40        Ok(c) => c,
41        Err(resp) => return resp,
42    };
43
44    let config = collection.config();
45    let graph_schema = config
46        .graph_schema
47        .as_ref()
48        .and_then(|gs| serde_json::to_value(gs).ok());
49    let hnsw_params = config
50        .hnsw_params
51        .as_ref()
52        .and_then(|p| serde_json::to_value(p).ok());
53
54    // `deferred_indexing` is only populated under the `persistence`
55    // feature because the core config field is gated the same way.
56    #[cfg(feature = "persistence")]
57    let deferred_indexing = config
58        .deferred_indexing
59        .as_ref()
60        .and_then(|d| serde_json::to_value(d).ok());
61    #[cfg(not(feature = "persistence"))]
62    let deferred_indexing = None;
63
64    let async_index_builder = config
65        .async_index_builder
66        .as_ref()
67        .and_then(|a| serde_json::to_value(a).ok());
68
69    Json(CollectionConfigResponse {
70        name: config.name,
71        dimension: config.dimension,
72        metric: format!("{:?}", config.metric).to_lowercase(),
73        storage_mode: format!("{:?}", config.storage_mode).to_lowercase(),
74        point_count: config.point_count,
75        metadata_only: config.metadata_only,
76        graph_schema,
77        embedding_dimension: config.embedding_dimension,
78        schema_version: config.schema_version,
79        pq_rescore_oversampling: config.pq_rescore_oversampling,
80        hnsw_params,
81        deferred_indexing,
82        async_index_builder,
83    })
84    .into_response()
85}
86
87/// Rebuilds the HNSW index of a vector collection, reclaiming memory
88/// occupied by tombstoned entries and producing a fresh graph from
89/// the current vector storage.
90///
91/// This is a blocking operation: for large collections it may take
92/// several seconds. The response includes the number of entries that
93/// were compacted during the rebuild.
94#[utoipa::path(
95    post,
96    path = "/collections/{name}/index/rebuild",
97    tag = "collections",
98    params(
99        ("name" = String, Path, description = "Collection name")
100    ),
101    responses(
102        (status = 200, description = "Index rebuilt", body = Object),
103        (status = 404, description = "Collection not found", body = ErrorResponse),
104        (status = 500, description = "Rebuild failed", body = ErrorResponse)
105    )
106)]
107pub async fn rebuild_index(
108    State(state): State<Arc<AppState>>,
109    Path(name): Path<String>,
110) -> impl IntoResponse {
111    let collection = match get_vector_collection_or_404(&state, &name) {
112        Ok(c) => c,
113        Err(resp) => return resp,
114    };
115
116    let result = tokio::task::spawn_blocking(move || collection.rebuild_index()).await;
117    match result {
118        Ok(Ok(compacted)) => (
119            StatusCode::OK,
120            Json(serde_json::json!({
121                "message": "Index rebuilt",
122                "collection": name,
123                "compacted_entries": compacted
124            })),
125        )
126            .into_response(),
127        Ok(Err(e)) => auto_core_error_response(&e),
128        Err(join_err) => error_response(
129            StatusCode::INTERNAL_SERVER_ERROR,
130            format!("rebuild_index task panicked: {join_err}"),
131        ),
132    }
133}
134
135/// Vacuums the HNSW index of a vector collection, removing tombstoned
136/// entries and rebuilding the graph from current vectors.
137///
138/// Semantically equivalent to `POST /collections/{name}/index/rebuild`
139/// but exposed under a more intuitive maintenance-oriented name.
140///
141/// This is a blocking operation: for large collections it may take
142/// several seconds.
143#[utoipa::path(
144    post,
145    path = "/collections/{name}/vacuum",
146    tag = "collections",
147    params(
148        ("name" = String, Path, description = "Collection name")
149    ),
150    responses(
151        (status = 200, description = "Index vacuumed", body = Object),
152        (status = 404, description = "Collection not found", body = ErrorResponse),
153        (status = 500, description = "Vacuum failed", body = ErrorResponse)
154    )
155)]
156pub async fn vacuum_collection(
157    State(state): State<Arc<AppState>>,
158    Path(name): Path<String>,
159) -> impl IntoResponse {
160    let collection = match get_vector_collection_or_404(&state, &name) {
161        Ok(c) => c,
162        Err(resp) => return resp,
163    };
164
165    let result = tokio::task::spawn_blocking(move || collection.rebuild_index()).await;
166    match result {
167        Ok(Ok(compacted)) => (
168            StatusCode::OK,
169            Json(serde_json::json!({
170                "message": "Index vacuumed",
171                "collection": name,
172                "compacted_entries": compacted
173            })),
174        )
175            .into_response(),
176        Ok(Err(e)) => auto_core_error_response(&e),
177        Err(join_err) => error_response(
178            StatusCode::INTERNAL_SERVER_ERROR,
179            format!("vacuum task panicked: {join_err}"),
180        ),
181    }
182}
183
184/// Compacts the vector storage of a collection, rewriting active vectors
185/// into a contiguous layout and reclaiming disk space from deleted entries.
186///
187/// This is a blocking operation that may involve significant I/O for
188/// large, fragmented collections.
189#[utoipa::path(
190    post,
191    path = "/collections/{name}/compact",
192    tag = "collections",
193    params(
194        ("name" = String, Path, description = "Collection name")
195    ),
196    responses(
197        (status = 200, description = "Storage compacted", body = Object),
198        (status = 404, description = "Collection not found", body = ErrorResponse),
199        (status = 500, description = "Compaction failed", body = ErrorResponse)
200    )
201)]
202pub async fn compact_collection(
203    State(state): State<Arc<AppState>>,
204    Path(name): Path<String>,
205) -> impl IntoResponse {
206    let collection = match get_vector_collection_or_404(&state, &name) {
207        Ok(c) => c,
208        Err(resp) => return resp,
209    };
210
211    let result = tokio::task::spawn_blocking(move || collection.compact_storage()).await;
212    match result {
213        Ok(Ok(bytes_reclaimed)) => (
214            StatusCode::OK,
215            Json(serde_json::json!({
216                "message": "Storage compacted",
217                "collection": name,
218                "bytes_reclaimed": bytes_reclaimed
219            })),
220        )
221            .into_response(),
222        Ok(Err(e)) => auto_core_error_response(&e),
223        Err(join_err) => error_response(
224            StatusCode::INTERNAL_SERVER_ERROR,
225            format!("compact task panicked: {join_err}"),
226        ),
227    }
228}
229
230/// Reorders the HNSW adjacency lists and vector storage for cache
231/// locality so nodes traversed together during search sit close in
232/// memory (issue #377). No-op for collections with fewer than 1 000
233/// vectors. Recall is preserved — only the physical layout changes.
234///
235/// This is a blocking operation that may involve significant I/O for
236/// large collections.
237#[utoipa::path(
238    post,
239    path = "/collections/{name}/locality/reorder",
240    tag = "collections",
241    params(
242        ("name" = String, Path, description = "Collection name")
243    ),
244    responses(
245        (status = 200, description = "Locality reordered", body = Object),
246        (status = 404, description = "Collection not found", body = ErrorResponse),
247        (status = 500, description = "Reorder failed", body = ErrorResponse)
248    )
249)]
250pub async fn reorder_for_locality(
251    State(state): State<Arc<AppState>>,
252    Path(name): Path<String>,
253) -> impl IntoResponse {
254    let collection = match get_vector_collection_or_404(&state, &name) {
255        Ok(c) => c,
256        Err(resp) => return resp,
257    };
258
259    let result = tokio::task::spawn_blocking(move || collection.reorder_for_locality()).await;
260    match result {
261        Ok(Ok(())) => (
262            StatusCode::OK,
263            Json(serde_json::json!({
264                "message": "Locality reordered",
265                "collection": name
266            })),
267        )
268            .into_response(),
269        Ok(Err(e)) => auto_core_error_response(&e),
270        Err(join_err) => error_response(
271            StatusCode::INTERNAL_SERVER_ERROR,
272            format!("reorder task panicked: {join_err}"),
273        ),
274    }
275}
276
277/// Analyze a collection, computing and persisting statistics.
278#[utoipa::path(
279    post,
280    path = "/collections/{name}/analyze",
281    tag = "collections",
282    params(
283        ("name" = String, Path, description = "Collection name")
284    ),
285    responses(
286        (status = 200, description = "Collection analyzed", body = CollectionStatsResponse),
287        (status = 404, description = "Collection not found", body = ErrorResponse),
288        (status = 500, description = "Analysis failed", body = ErrorResponse)
289    )
290)]
291pub async fn analyze_collection(
292    State(state): State<Arc<AppState>>,
293    Path(name): Path<String>,
294) -> impl IntoResponse {
295    let coll_name = name.clone();
296    let state_clone = state.clone();
297    let result =
298        tokio::task::spawn_blocking(move || state_clone.db.analyze_collection(&coll_name)).await;
299    match result {
300        Ok(Ok(stats)) => {
301            let response = map_stats_to_response(&stats);
302            (StatusCode::OK, Json(response)).into_response()
303        }
304        Ok(Err(e)) => auto_core_error_response(&e),
305        Err(join_err) => error_response(
306            StatusCode::INTERNAL_SERVER_ERROR,
307            format!("analyze_collection task panicked: {join_err}"),
308        ),
309    }
310}
311
312/// Get cached collection statistics (returns 404 if never analyzed).
313#[utoipa::path(
314    get,
315    path = "/collections/{name}/stats",
316    tag = "collections",
317    params(
318        ("name" = String, Path, description = "Collection name")
319    ),
320    responses(
321        (status = 200, description = "Collection statistics", body = CollectionStatsResponse),
322        (status = 404, description = "No statistics available", body = ErrorResponse),
323        (status = 500, description = "Failed to read stats", body = ErrorResponse)
324    )
325)]
326pub async fn get_collection_stats(
327    State(state): State<Arc<AppState>>,
328    Path(name): Path<String>,
329) -> impl IntoResponse {
330    match state.db.get_collection_stats(&name) {
331        Ok(Some(stats)) => {
332            let response = map_stats_to_response(&stats);
333            (StatusCode::OK, Json(response)).into_response()
334        }
335        Ok(None) => error_response(
336            StatusCode::NOT_FOUND,
337            format!("No stats for '{name}'. Run POST /collections/{name}/analyze first."),
338        ),
339        Err(e) => auto_core_error_response(&e),
340    }
341}
342
343/// Get health diagnostics for a collection (index readiness, point count).
344#[utoipa::path(
345    get,
346    path = "/collections/{name}/diagnostics",
347    tag = "collections",
348    params(
349        ("name" = String, Path, description = "Collection name")
350    ),
351    responses(
352        (status = 200, description = "Collection diagnostics", body = CollectionDiagnosticsResponse),
353        (status = 404, description = "Collection not found", body = ErrorResponse)
354    )
355)]
356pub async fn collection_diagnostics(
357    State(state): State<Arc<AppState>>,
358    Path(name): Path<String>,
359) -> impl IntoResponse {
360    match state.db.collection_diagnostics(&name) {
361        Ok(diag) => (StatusCode::OK, Json(diagnostics_to_response(&diag))).into_response(),
362        Err(e) => auto_core_error_response(&e),
363    }
364}
365
366/// Maps the core diagnostics snapshot to the REST response DTO.
367fn diagnostics_to_response(
368    diag: &velesdb_core::collection::CollectionDiagnostics,
369) -> CollectionDiagnosticsResponse {
370    use velesdb_core::collection::IndexHealth;
371    let (index_health, index_health_detail) = match &diag.index_health {
372        IndexHealth::Healthy => ("healthy".to_string(), None),
373        IndexHealth::Empty => ("empty".to_string(), None),
374        IndexHealth::NeedsRebuild(reason) => ("needs_rebuild".to_string(), Some(reason.clone())),
375        _ => ("unknown".to_string(), None),
376    };
377    CollectionDiagnosticsResponse {
378        has_vectors: diag.has_vectors,
379        search_ready: diag.search_ready,
380        dimension_configured: diag.dimension_configured,
381        point_count: diag.point_count,
382        index_health,
383        index_health_detail,
384    }
385}
386
387/// Get current guard-rails configuration.
388#[utoipa::path(
389    get,
390    path = "/guardrails",
391    tag = "guardrails",
392    responses(
393        (status = 200, description = "Current guard-rails config", body = GuardRailsConfigResponse)
394    )
395)]
396pub async fn get_guardrails(State(state): State<Arc<AppState>>) -> impl IntoResponse {
397    let limits = state.query_limits.read();
398    Json(limits_to_response(&limits))
399}
400
401/// Update guard-rails configuration (partial update).
402#[utoipa::path(
403    put,
404    path = "/guardrails",
405    tag = "guardrails",
406    request_body = GuardRailsConfigRequest,
407    responses(
408        (status = 200, description = "Updated guard-rails config", body = GuardRailsConfigResponse)
409    )
410)]
411pub async fn update_guardrails(
412    State(state): State<Arc<AppState>>,
413    Json(req): Json<GuardRailsConfigRequest>,
414) -> impl IntoResponse {
415    let mut limits = state.query_limits.write();
416    apply_guardrails_update(&mut limits, &req);
417
418    // Propagate the updated limits to all active collections so that
419    // subsequent queries use the new thresholds (EPIC-048).
420    state.db.update_guardrails(&limits);
421
422    Json(limits_to_response(&limits))
423}
424
425/// Convert `QueryLimits` to the REST response type.
426fn limits_to_response(limits: &velesdb_core::guardrails::QueryLimits) -> GuardRailsConfigResponse {
427    GuardRailsConfigResponse {
428        max_depth: limits.max_depth,
429        max_cardinality: limits.max_cardinality,
430        memory_limit_bytes: limits.memory_limit_bytes,
431        timeout_ms: limits.timeout_ms,
432        rate_limit_qps: limits.rate_limit_qps,
433        circuit_failure_threshold: limits.circuit_failure_threshold,
434        circuit_recovery_seconds: limits.circuit_recovery_seconds,
435    }
436}
437
438/// Convert core `CollectionStats` to the REST response type.
439fn map_stats_to_response(
440    stats: &velesdb_core::collection::stats::CollectionStats,
441) -> CollectionStatsResponse {
442    let column_stats = stats
443        .column_stats
444        .iter()
445        .map(|(k, v)| {
446            (
447                k.clone(),
448                ColumnStatsResponse {
449                    name: v.name.clone(),
450                    null_count: v.null_count,
451                    distinct_count: v.distinct_count,
452                    min_value: v.min_value.clone(),
453                    max_value: v.max_value.clone(),
454                    avg_size_bytes: v.avg_size_bytes,
455                    histogram_buckets: v.histogram.as_ref().map(|h| h.buckets.len()),
456                    histogram_stale: v.histogram.as_ref().map(|h| h.stale),
457                },
458            )
459        })
460        .collect();
461
462    let index_stats = stats
463        .index_stats
464        .iter()
465        .map(|(k, v)| {
466            (
467                k.clone(),
468                IndexStatsResponse {
469                    name: v.name.clone(),
470                    index_type: v.index_type.clone(),
471                    entry_count: v.entry_count,
472                    depth: v.depth,
473                    size_bytes: v.size_bytes,
474                },
475            )
476        })
477        .collect();
478
479    CollectionStatsResponse {
480        total_points: stats.total_points,
481        total_size_bytes: stats.total_size_bytes,
482        row_count: stats.row_count,
483        deleted_count: stats.deleted_count,
484        avg_row_size_bytes: stats.avg_row_size_bytes,
485        payload_size_bytes: stats.payload_size_bytes,
486        last_analyzed_epoch_ms: stats.last_analyzed_epoch_ms,
487        column_stats,
488        index_stats,
489    }
490}
491
492/// Apply partial update fields to query limits.
493fn apply_guardrails_update(
494    limits: &mut velesdb_core::guardrails::QueryLimits,
495    req: &GuardRailsConfigRequest,
496) {
497    if let Some(v) = req.max_depth {
498        limits.max_depth = v;
499    }
500    if let Some(v) = req.max_cardinality {
501        limits.max_cardinality = v;
502    }
503    if let Some(v) = req.memory_limit_bytes {
504        limits.memory_limit_bytes = v;
505    }
506    if let Some(v) = req.timeout_ms {
507        limits.timeout_ms = v;
508    }
509    if let Some(v) = req.rate_limit_qps {
510        limits.rate_limit_qps = v;
511    }
512    if let Some(v) = req.circuit_failure_threshold {
513        limits.circuit_failure_threshold = v;
514    }
515    if let Some(v) = req.circuit_recovery_seconds {
516        limits.circuit_recovery_seconds = v;
517    }
518}
519
#[cfg(test)]
mod tests {
    // Unit tests for the pure guard-rails helpers (conversion and partial update).
    use super::*;
    use velesdb_core::guardrails::QueryLimits;

    /// Request with every field unset: applying it must change nothing.
    fn empty_request() -> GuardRailsConfigRequest {
        GuardRailsConfigRequest {
            max_depth: None,
            max_cardinality: None,
            memory_limit_bytes: None,
            timeout_ms: None,
            rate_limit_qps: None,
            circuit_failure_threshold: None,
            circuit_recovery_seconds: None,
        }
    }

    #[test]
    fn test_limits_to_response_roundtrip() {
        let limits = QueryLimits::default();
        let response = limits_to_response(&limits);
        assert_eq!(response.max_depth, limits.max_depth);
        assert_eq!(response.max_cardinality, limits.max_cardinality);
        assert_eq!(response.memory_limit_bytes, limits.memory_limit_bytes);
        assert_eq!(response.timeout_ms, limits.timeout_ms);
        assert_eq!(response.rate_limit_qps, limits.rate_limit_qps);
        assert_eq!(
            response.circuit_failure_threshold,
            limits.circuit_failure_threshold
        );
        assert_eq!(
            response.circuit_recovery_seconds,
            limits.circuit_recovery_seconds
        );
    }

    #[test]
    fn test_apply_guardrails_partial_update() {
        let defaults = QueryLimits::default();
        let mut limits = QueryLimits::default();

        let req = GuardRailsConfigRequest {
            max_depth: Some(20),
            rate_limit_qps: Some(500),
            ..empty_request()
        };

        apply_guardrails_update(&mut limits, &req);

        assert_eq!(limits.max_depth, 20);
        assert_eq!(limits.rate_limit_qps, 500);
        // Every field absent from the request must keep its default value.
        assert_eq!(limits.max_cardinality, defaults.max_cardinality);
        assert_eq!(limits.memory_limit_bytes, defaults.memory_limit_bytes);
        assert_eq!(limits.timeout_ms, defaults.timeout_ms);
        assert_eq!(
            limits.circuit_failure_threshold,
            defaults.circuit_failure_threshold
        );
        assert_eq!(
            limits.circuit_recovery_seconds,
            defaults.circuit_recovery_seconds
        );
    }

    #[test]
    fn test_apply_guardrails_empty_update_is_noop() {
        let defaults = QueryLimits::default();
        let mut limits = QueryLimits::default();

        apply_guardrails_update(&mut limits, &empty_request());

        assert_eq!(limits.max_depth, defaults.max_depth);
        assert_eq!(limits.max_cardinality, defaults.max_cardinality);
        assert_eq!(limits.memory_limit_bytes, defaults.memory_limit_bytes);
        assert_eq!(limits.timeout_ms, defaults.timeout_ms);
        assert_eq!(limits.rate_limit_qps, defaults.rate_limit_qps);
        assert_eq!(
            limits.circuit_failure_threshold,
            defaults.circuit_failure_threshold
        );
        assert_eq!(
            limits.circuit_recovery_seconds,
            defaults.circuit_recovery_seconds
        );
    }

    #[test]
    fn test_apply_guardrails_full_update() {
        let mut limits = QueryLimits::default();

        let req = GuardRailsConfigRequest {
            max_depth: Some(5),
            max_cardinality: Some(50_000),
            memory_limit_bytes: Some(1024 * 1024),
            timeout_ms: Some(10_000),
            rate_limit_qps: Some(200),
            circuit_failure_threshold: Some(3),
            circuit_recovery_seconds: Some(60),
        };

        apply_guardrails_update(&mut limits, &req);

        assert_eq!(limits.max_depth, 5);
        assert_eq!(limits.max_cardinality, 50_000);
        assert_eq!(limits.memory_limit_bytes, 1024 * 1024);
        assert_eq!(limits.timeout_ms, 10_000);
        assert_eq!(limits.rate_limit_qps, 200);
        assert_eq!(limits.circuit_failure_threshold, 3);
        assert_eq!(limits.circuit_recovery_seconds, 60);
    }

    #[test]
    fn test_guardrails_response_serialization() {
        let response = GuardRailsConfigResponse {
            max_depth: 10,
            max_cardinality: 100_000,
            memory_limit_bytes: 104_857_600,
            timeout_ms: 30_000,
            rate_limit_qps: 100,
            circuit_failure_threshold: 5,
            circuit_recovery_seconds: 30,
        };
        let json = serde_json::to_string(&response).expect("serialize");
        assert!(json.contains("\"max_depth\":10"));
        assert!(json.contains("\"rate_limit_qps\":100"));
    }
}