1use axum::{
4 extract::{Path, State},
5 http::StatusCode,
6 response::IntoResponse,
7 Json,
8};
9use std::sync::Arc;
10
11use crate::types::{
12 CollectionConfigResponse, CollectionDiagnosticsResponse, CollectionStatsResponse,
13 ColumnStatsResponse, ErrorResponse, GuardRailsConfigRequest, GuardRailsConfigResponse,
14 IndexStatsResponse,
15};
16use crate::AppState;
17
18use super::helpers::{
19 auto_core_error_response, error_response, get_collection_or_404, get_vector_collection_or_404,
20};
21
22#[utoipa::path(
24 get,
25 path = "/collections/{name}/config",
26 tag = "collections",
27 params(
28 ("name" = String, Path, description = "Collection name")
29 ),
30 responses(
31 (status = 200, description = "Collection configuration", body = CollectionConfigResponse),
32 (status = 404, description = "Collection not found", body = ErrorResponse)
33 )
34)]
35pub async fn get_collection_config(
36 State(state): State<Arc<AppState>>,
37 Path(name): Path<String>,
38) -> impl IntoResponse {
39 let collection = match get_collection_or_404(&state, &name) {
40 Ok(c) => c,
41 Err(resp) => return resp,
42 };
43
44 let config = collection.config();
45 let graph_schema = config
46 .graph_schema
47 .as_ref()
48 .and_then(|gs| serde_json::to_value(gs).ok());
49 let hnsw_params = config
50 .hnsw_params
51 .as_ref()
52 .and_then(|p| serde_json::to_value(p).ok());
53
54 #[cfg(feature = "persistence")]
57 let deferred_indexing = config
58 .deferred_indexing
59 .as_ref()
60 .and_then(|d| serde_json::to_value(d).ok());
61 #[cfg(not(feature = "persistence"))]
62 let deferred_indexing = None;
63
64 let async_index_builder = config
65 .async_index_builder
66 .as_ref()
67 .and_then(|a| serde_json::to_value(a).ok());
68
69 Json(CollectionConfigResponse {
70 name: config.name,
71 dimension: config.dimension,
72 metric: format!("{:?}", config.metric).to_lowercase(),
73 storage_mode: format!("{:?}", config.storage_mode).to_lowercase(),
74 point_count: config.point_count,
75 metadata_only: config.metadata_only,
76 graph_schema,
77 embedding_dimension: config.embedding_dimension,
78 schema_version: config.schema_version,
79 pq_rescore_oversampling: config.pq_rescore_oversampling,
80 hnsw_params,
81 deferred_indexing,
82 async_index_builder,
83 })
84 .into_response()
85}
86
87#[utoipa::path(
95 post,
96 path = "/collections/{name}/index/rebuild",
97 tag = "collections",
98 params(
99 ("name" = String, Path, description = "Collection name")
100 ),
101 responses(
102 (status = 200, description = "Index rebuilt", body = Object),
103 (status = 404, description = "Collection not found", body = ErrorResponse),
104 (status = 500, description = "Rebuild failed", body = ErrorResponse)
105 )
106)]
107pub async fn rebuild_index(
108 State(state): State<Arc<AppState>>,
109 Path(name): Path<String>,
110) -> impl IntoResponse {
111 let collection = match get_vector_collection_or_404(&state, &name) {
112 Ok(c) => c,
113 Err(resp) => return resp,
114 };
115
116 let result = tokio::task::spawn_blocking(move || collection.rebuild_index()).await;
117 match result {
118 Ok(Ok(compacted)) => (
119 StatusCode::OK,
120 Json(serde_json::json!({
121 "message": "Index rebuilt",
122 "collection": name,
123 "compacted_entries": compacted
124 })),
125 )
126 .into_response(),
127 Ok(Err(e)) => auto_core_error_response(&e),
128 Err(join_err) => error_response(
129 StatusCode::INTERNAL_SERVER_ERROR,
130 format!("rebuild_index task panicked: {join_err}"),
131 ),
132 }
133}
134
135#[utoipa::path(
144 post,
145 path = "/collections/{name}/vacuum",
146 tag = "collections",
147 params(
148 ("name" = String, Path, description = "Collection name")
149 ),
150 responses(
151 (status = 200, description = "Index vacuumed", body = Object),
152 (status = 404, description = "Collection not found", body = ErrorResponse),
153 (status = 500, description = "Vacuum failed", body = ErrorResponse)
154 )
155)]
156pub async fn vacuum_collection(
157 State(state): State<Arc<AppState>>,
158 Path(name): Path<String>,
159) -> impl IntoResponse {
160 let collection = match get_vector_collection_or_404(&state, &name) {
161 Ok(c) => c,
162 Err(resp) => return resp,
163 };
164
165 let result = tokio::task::spawn_blocking(move || collection.rebuild_index()).await;
166 match result {
167 Ok(Ok(compacted)) => (
168 StatusCode::OK,
169 Json(serde_json::json!({
170 "message": "Index vacuumed",
171 "collection": name,
172 "compacted_entries": compacted
173 })),
174 )
175 .into_response(),
176 Ok(Err(e)) => auto_core_error_response(&e),
177 Err(join_err) => error_response(
178 StatusCode::INTERNAL_SERVER_ERROR,
179 format!("vacuum task panicked: {join_err}"),
180 ),
181 }
182}
183
184#[utoipa::path(
190 post,
191 path = "/collections/{name}/compact",
192 tag = "collections",
193 params(
194 ("name" = String, Path, description = "Collection name")
195 ),
196 responses(
197 (status = 200, description = "Storage compacted", body = Object),
198 (status = 404, description = "Collection not found", body = ErrorResponse),
199 (status = 500, description = "Compaction failed", body = ErrorResponse)
200 )
201)]
202pub async fn compact_collection(
203 State(state): State<Arc<AppState>>,
204 Path(name): Path<String>,
205) -> impl IntoResponse {
206 let collection = match get_vector_collection_or_404(&state, &name) {
207 Ok(c) => c,
208 Err(resp) => return resp,
209 };
210
211 let result = tokio::task::spawn_blocking(move || collection.compact_storage()).await;
212 match result {
213 Ok(Ok(bytes_reclaimed)) => (
214 StatusCode::OK,
215 Json(serde_json::json!({
216 "message": "Storage compacted",
217 "collection": name,
218 "bytes_reclaimed": bytes_reclaimed
219 })),
220 )
221 .into_response(),
222 Ok(Err(e)) => auto_core_error_response(&e),
223 Err(join_err) => error_response(
224 StatusCode::INTERNAL_SERVER_ERROR,
225 format!("compact task panicked: {join_err}"),
226 ),
227 }
228}
229
230#[utoipa::path(
238 post,
239 path = "/collections/{name}/locality/reorder",
240 tag = "collections",
241 params(
242 ("name" = String, Path, description = "Collection name")
243 ),
244 responses(
245 (status = 200, description = "Locality reordered", body = Object),
246 (status = 404, description = "Collection not found", body = ErrorResponse),
247 (status = 500, description = "Reorder failed", body = ErrorResponse)
248 )
249)]
250pub async fn reorder_for_locality(
251 State(state): State<Arc<AppState>>,
252 Path(name): Path<String>,
253) -> impl IntoResponse {
254 let collection = match get_vector_collection_or_404(&state, &name) {
255 Ok(c) => c,
256 Err(resp) => return resp,
257 };
258
259 let result = tokio::task::spawn_blocking(move || collection.reorder_for_locality()).await;
260 match result {
261 Ok(Ok(())) => (
262 StatusCode::OK,
263 Json(serde_json::json!({
264 "message": "Locality reordered",
265 "collection": name
266 })),
267 )
268 .into_response(),
269 Ok(Err(e)) => auto_core_error_response(&e),
270 Err(join_err) => error_response(
271 StatusCode::INTERNAL_SERVER_ERROR,
272 format!("reorder task panicked: {join_err}"),
273 ),
274 }
275}
276
277#[utoipa::path(
279 post,
280 path = "/collections/{name}/analyze",
281 tag = "collections",
282 params(
283 ("name" = String, Path, description = "Collection name")
284 ),
285 responses(
286 (status = 200, description = "Collection analyzed", body = CollectionStatsResponse),
287 (status = 404, description = "Collection not found", body = ErrorResponse),
288 (status = 500, description = "Analysis failed", body = ErrorResponse)
289 )
290)]
291pub async fn analyze_collection(
292 State(state): State<Arc<AppState>>,
293 Path(name): Path<String>,
294) -> impl IntoResponse {
295 let coll_name = name.clone();
296 let state_clone = state.clone();
297 let result =
298 tokio::task::spawn_blocking(move || state_clone.db.analyze_collection(&coll_name)).await;
299 match result {
300 Ok(Ok(stats)) => {
301 let response = map_stats_to_response(&stats);
302 (StatusCode::OK, Json(response)).into_response()
303 }
304 Ok(Err(e)) => auto_core_error_response(&e),
305 Err(join_err) => error_response(
306 StatusCode::INTERNAL_SERVER_ERROR,
307 format!("analyze_collection task panicked: {join_err}"),
308 ),
309 }
310}
311
312#[utoipa::path(
314 get,
315 path = "/collections/{name}/stats",
316 tag = "collections",
317 params(
318 ("name" = String, Path, description = "Collection name")
319 ),
320 responses(
321 (status = 200, description = "Collection statistics", body = CollectionStatsResponse),
322 (status = 404, description = "No statistics available", body = ErrorResponse),
323 (status = 500, description = "Failed to read stats", body = ErrorResponse)
324 )
325)]
326pub async fn get_collection_stats(
327 State(state): State<Arc<AppState>>,
328 Path(name): Path<String>,
329) -> impl IntoResponse {
330 match state.db.get_collection_stats(&name) {
331 Ok(Some(stats)) => {
332 let response = map_stats_to_response(&stats);
333 (StatusCode::OK, Json(response)).into_response()
334 }
335 Ok(None) => error_response(
336 StatusCode::NOT_FOUND,
337 format!("No stats for '{name}'. Run POST /collections/{name}/analyze first."),
338 ),
339 Err(e) => auto_core_error_response(&e),
340 }
341}
342
343#[utoipa::path(
345 get,
346 path = "/collections/{name}/diagnostics",
347 tag = "collections",
348 params(
349 ("name" = String, Path, description = "Collection name")
350 ),
351 responses(
352 (status = 200, description = "Collection diagnostics", body = CollectionDiagnosticsResponse),
353 (status = 404, description = "Collection not found", body = ErrorResponse)
354 )
355)]
356pub async fn collection_diagnostics(
357 State(state): State<Arc<AppState>>,
358 Path(name): Path<String>,
359) -> impl IntoResponse {
360 match state.db.collection_diagnostics(&name) {
361 Ok(diag) => (StatusCode::OK, Json(diagnostics_to_response(&diag))).into_response(),
362 Err(e) => auto_core_error_response(&e),
363 }
364}
365
366fn diagnostics_to_response(
368 diag: &velesdb_core::collection::CollectionDiagnostics,
369) -> CollectionDiagnosticsResponse {
370 use velesdb_core::collection::IndexHealth;
371 let (index_health, index_health_detail) = match &diag.index_health {
372 IndexHealth::Healthy => ("healthy".to_string(), None),
373 IndexHealth::Empty => ("empty".to_string(), None),
374 IndexHealth::NeedsRebuild(reason) => ("needs_rebuild".to_string(), Some(reason.clone())),
375 _ => ("unknown".to_string(), None),
376 };
377 CollectionDiagnosticsResponse {
378 has_vectors: diag.has_vectors,
379 search_ready: diag.search_ready,
380 dimension_configured: diag.dimension_configured,
381 point_count: diag.point_count,
382 index_health,
383 index_health_detail,
384 }
385}
386
387#[utoipa::path(
389 get,
390 path = "/guardrails",
391 tag = "guardrails",
392 responses(
393 (status = 200, description = "Current guard-rails config", body = GuardRailsConfigResponse)
394 )
395)]
396pub async fn get_guardrails(State(state): State<Arc<AppState>>) -> impl IntoResponse {
397 let limits = state.query_limits.read();
398 Json(limits_to_response(&limits))
399}
400
401#[utoipa::path(
403 put,
404 path = "/guardrails",
405 tag = "guardrails",
406 request_body = GuardRailsConfigRequest,
407 responses(
408 (status = 200, description = "Updated guard-rails config", body = GuardRailsConfigResponse)
409 )
410)]
411pub async fn update_guardrails(
412 State(state): State<Arc<AppState>>,
413 Json(req): Json<GuardRailsConfigRequest>,
414) -> impl IntoResponse {
415 let mut limits = state.query_limits.write();
416 apply_guardrails_update(&mut limits, &req);
417
418 state.db.update_guardrails(&limits);
421
422 Json(limits_to_response(&limits))
423}
424
425fn limits_to_response(limits: &velesdb_core::guardrails::QueryLimits) -> GuardRailsConfigResponse {
427 GuardRailsConfigResponse {
428 max_depth: limits.max_depth,
429 max_cardinality: limits.max_cardinality,
430 memory_limit_bytes: limits.memory_limit_bytes,
431 timeout_ms: limits.timeout_ms,
432 rate_limit_qps: limits.rate_limit_qps,
433 circuit_failure_threshold: limits.circuit_failure_threshold,
434 circuit_recovery_seconds: limits.circuit_recovery_seconds,
435 }
436}
437
438fn map_stats_to_response(
440 stats: &velesdb_core::collection::stats::CollectionStats,
441) -> CollectionStatsResponse {
442 let column_stats = stats
443 .column_stats
444 .iter()
445 .map(|(k, v)| {
446 (
447 k.clone(),
448 ColumnStatsResponse {
449 name: v.name.clone(),
450 null_count: v.null_count,
451 distinct_count: v.distinct_count,
452 min_value: v.min_value.clone(),
453 max_value: v.max_value.clone(),
454 avg_size_bytes: v.avg_size_bytes,
455 histogram_buckets: v.histogram.as_ref().map(|h| h.buckets.len()),
456 histogram_stale: v.histogram.as_ref().map(|h| h.stale),
457 },
458 )
459 })
460 .collect();
461
462 let index_stats = stats
463 .index_stats
464 .iter()
465 .map(|(k, v)| {
466 (
467 k.clone(),
468 IndexStatsResponse {
469 name: v.name.clone(),
470 index_type: v.index_type.clone(),
471 entry_count: v.entry_count,
472 depth: v.depth,
473 size_bytes: v.size_bytes,
474 },
475 )
476 })
477 .collect();
478
479 CollectionStatsResponse {
480 total_points: stats.total_points,
481 total_size_bytes: stats.total_size_bytes,
482 row_count: stats.row_count,
483 deleted_count: stats.deleted_count,
484 avg_row_size_bytes: stats.avg_row_size_bytes,
485 payload_size_bytes: stats.payload_size_bytes,
486 last_analyzed_epoch_ms: stats.last_analyzed_epoch_ms,
487 column_stats,
488 index_stats,
489 }
490}
491
492fn apply_guardrails_update(
494 limits: &mut velesdb_core::guardrails::QueryLimits,
495 req: &GuardRailsConfigRequest,
496) {
497 if let Some(v) = req.max_depth {
498 limits.max_depth = v;
499 }
500 if let Some(v) = req.max_cardinality {
501 limits.max_cardinality = v;
502 }
503 if let Some(v) = req.memory_limit_bytes {
504 limits.memory_limit_bytes = v;
505 }
506 if let Some(v) = req.timeout_ms {
507 limits.timeout_ms = v;
508 }
509 if let Some(v) = req.rate_limit_qps {
510 limits.rate_limit_qps = v;
511 }
512 if let Some(v) = req.circuit_failure_threshold {
513 limits.circuit_failure_threshold = v;
514 }
515 if let Some(v) = req.circuit_recovery_seconds {
516 limits.circuit_recovery_seconds = v;
517 }
518}
519
#[cfg(test)]
mod tests {
    use super::*;
    use velesdb_core::guardrails::QueryLimits;

    /// The response built from default limits mirrors every limit field.
    #[test]
    fn test_limits_to_response_roundtrip() {
        let defaults = QueryLimits::default();
        let mapped = limits_to_response(&defaults);

        assert_eq!(mapped.max_depth, defaults.max_depth);
        assert_eq!(mapped.max_cardinality, defaults.max_cardinality);
        assert_eq!(mapped.memory_limit_bytes, defaults.memory_limit_bytes);
        assert_eq!(mapped.timeout_ms, defaults.timeout_ms);
        assert_eq!(mapped.rate_limit_qps, defaults.rate_limit_qps);
        assert_eq!(
            mapped.circuit_failure_threshold,
            defaults.circuit_failure_threshold
        );
        assert_eq!(
            mapped.circuit_recovery_seconds,
            defaults.circuit_recovery_seconds
        );
    }

    /// Only the fields present in the request change; the others keep their values.
    #[test]
    fn test_apply_guardrails_partial_update() {
        let mut limits = QueryLimits::default();
        let timeout_before = limits.timeout_ms;

        let patch = GuardRailsConfigRequest {
            max_depth: Some(20),
            max_cardinality: None,
            memory_limit_bytes: None,
            timeout_ms: None,
            rate_limit_qps: Some(500),
            circuit_failure_threshold: None,
            circuit_recovery_seconds: None,
        };
        apply_guardrails_update(&mut limits, &patch);

        assert_eq!(limits.max_depth, 20);
        assert_eq!(limits.rate_limit_qps, 500);
        // A field the request left as `None` keeps its previous value.
        assert_eq!(limits.timeout_ms, timeout_before);
    }

    /// A request with every field set overwrites every limit.
    #[test]
    fn test_apply_guardrails_full_update() {
        let mut limits = QueryLimits::default();

        let patch = GuardRailsConfigRequest {
            max_depth: Some(5),
            max_cardinality: Some(50_000),
            memory_limit_bytes: Some(1024 * 1024),
            timeout_ms: Some(10_000),
            rate_limit_qps: Some(200),
            circuit_failure_threshold: Some(3),
            circuit_recovery_seconds: Some(60),
        };
        apply_guardrails_update(&mut limits, &patch);

        assert_eq!(limits.max_depth, 5);
        assert_eq!(limits.max_cardinality, 50_000);
        assert_eq!(limits.memory_limit_bytes, 1024 * 1024);
        assert_eq!(limits.timeout_ms, 10_000);
        assert_eq!(limits.rate_limit_qps, 200);
        assert_eq!(limits.circuit_failure_threshold, 3);
        assert_eq!(limits.circuit_recovery_seconds, 60);
    }

    /// The response serializes its fields under their own names.
    #[test]
    fn test_guardrails_response_serialization() {
        let response = GuardRailsConfigResponse {
            max_depth: 10,
            max_cardinality: 100_000,
            memory_limit_bytes: 104_857_600,
            timeout_ms: 30_000,
            rate_limit_qps: 100,
            circuit_failure_threshold: 5,
            circuit_recovery_seconds: 30,
        };

        let encoded = serde_json::to_string(&response).expect("serialize");
        for fragment in ["\"max_depth\":10", "\"rate_limit_qps\":100"] {
            assert!(
                encoded.contains(fragment),
                "missing {fragment} in {encoded}"
            );
        }
    }
}