Skip to main content

velesdb_server/handlers/points/
mod.rs

1//! Point operations handlers.
2
3pub mod relations;
4pub mod streaming;
5
6pub use relations::{get_point_relations, relate_points, set_point_ttl, unrelate_points};
7pub use streaming::{
8    __path_stream_insert, __path_stream_upsert_points, stream_insert, stream_upsert_points,
9};
10
11use axum::{
12    extract::{Path, State},
13    http::StatusCode,
14    response::IntoResponse,
15    Json,
16};
17use std::sync::Arc;
18
19use crate::types::{
20    ErrorResponse, ScrollPoint, ScrollRequest, ScrollResponse, SparseVectorInput,
21    UpsertPointsRequest,
22};
23use crate::AppState;
24use velesdb_core::Point;
25
26use crate::handlers::helpers::{
27    auto_core_error_response, error_response, get_vector_collection_or_404,
28};
29
30use velesdb_core::index::sparse::SparseVector;
31
32/// Converts sparse vector input fields from a request into a `BTreeMap<String, SparseVector>`.
33///
34/// Merges `sparse_vector` (single, stored under `""`) and `sparse_vectors` (named map).
35/// Named map takes precedence if both provide the same key.
36fn convert_sparse_inputs(
37    sparse_vector: Option<SparseVectorInput>,
38    sparse_vectors: Option<std::collections::BTreeMap<String, SparseVectorInput>>,
39) -> Result<Option<std::collections::BTreeMap<String, SparseVector>>, String> {
40    let has_single = sparse_vector.is_some();
41    let has_named = sparse_vectors.as_ref().is_some_and(|m| !m.is_empty());
42
43    if !has_single && !has_named {
44        return Ok(None);
45    }
46
47    let mut result = std::collections::BTreeMap::new();
48
49    // Single sparse vector goes under default name ""
50    if let Some(sv_input) = sparse_vector {
51        let sv = sv_input.into_sparse_vector()?;
52        result.insert(String::new(), sv);
53    }
54
55    // Named sparse vectors (overwrite default if same key).
56    if let Some(named) = sparse_vectors {
57        merge_named_sparse_vectors(named, &mut result)?;
58    }
59
60    Ok(Some(result))
61}
62
63/// Merge a named-sparse-vector map into `result`, converting each input.
64///
65/// If both `sparse_vector` and `sparse_vectors[""]` are supplied, the named map
66/// wins; a debug trace is emitted so operators can spot this (usually
67/// unintentional) pattern.
68fn merge_named_sparse_vectors(
69    named: std::collections::BTreeMap<String, SparseVectorInput>,
70    result: &mut std::collections::BTreeMap<String, SparseVector>,
71) -> Result<(), String> {
72    for (name, sv_input) in named {
73        let sv = sv_input
74            .into_sparse_vector()
75            .map_err(|e| format!("sparse_vectors['{name}']: {e}"))?;
76        if name.is_empty() && result.contains_key("") {
77            tracing::debug!(
78                "sparse_vector (default \"\") is being overwritten by \
79                 sparse_vectors[\"\"] — supply only one to avoid ambiguity"
80            );
81        }
82        result.insert(name, sv);
83    }
84    Ok(())
85}
86
87/// Upsert points to a collection.
88#[utoipa::path(
89    post,
90    path = "/collections/{name}/points",
91    tag = "points",
92    params(
93        ("name" = String, Path, description = "Collection name")
94    ),
95    request_body = UpsertPointsRequest,
96    responses(
97        (status = 200, description = "Points upserted", body = Object),
98        (status = 404, description = "Collection not found", body = ErrorResponse),
99        (status = 400, description = "Invalid request", body = ErrorResponse)
100    )
101)]
102pub async fn upsert_points(
103    State(state): State<Arc<AppState>>,
104    Path(name): Path<String>,
105    Json(req): Json<UpsertPointsRequest>,
106) -> impl IntoResponse {
107    let collection = match get_vector_collection_or_404(&state, &name) {
108        Ok(c) => c,
109        Err(resp) => return resp,
110    };
111
112    let points = match build_points_from_request(req) {
113        Ok(p) => p,
114        Err(e) => {
115            return error_response(StatusCode::BAD_REQUEST, e);
116        }
117    };
118
119    // CRITICAL: upsert_bulk is blocking (HNSW insertion + I/O).
120    // Must use spawn_blocking to avoid blocking the async runtime.
121    let result = tokio::task::spawn_blocking(move || collection.upsert_bulk(&points)).await;
122
123    match result {
124        Ok(Ok(inserted)) => {
125            state.db.notify_upsert(&name, inserted);
126            Json(serde_json::json!({
127                "message": "Points upserted",
128                "count": inserted
129            }))
130            .into_response()
131        }
132        Ok(Err(e)) => auto_core_error_response(&e),
133        Err(e) => error_response(
134            StatusCode::INTERNAL_SERVER_ERROR,
135            format!("Task panicked: {e}"),
136        ),
137    }
138}
139
140/// Convert an `UpsertPointsRequest` into a `Vec<Point>`, merging sparse inputs.
141fn build_points_from_request(req: UpsertPointsRequest) -> Result<Vec<Point>, String> {
142    let mut points: Vec<Point> = Vec::with_capacity(req.points.len());
143    for p in req.points {
144        let sparse = convert_sparse_inputs(p.sparse_vector, p.sparse_vectors)?;
145        let mut point = Point::new(p.id, p.vector, p.payload);
146        point.sparse_vectors = sparse;
147        points.push(point);
148    }
149    Ok(points)
150}
151
152/// Get a point by ID.
153#[utoipa::path(
154    get,
155    path = "/collections/{name}/points/{id}",
156    tag = "points",
157    params(
158        ("name" = String, Path, description = "Collection name"),
159        ("id" = u64, Path, description = "Point ID")
160    ),
161    responses(
162        (status = 200, description = "Point found", body = Object),
163        (status = 404, description = "Point or collection not found", body = ErrorResponse)
164    )
165)]
166pub async fn get_point(
167    State(state): State<Arc<AppState>>,
168    Path((name, id)): Path<(String, u64)>,
169) -> impl IntoResponse {
170    let collection = match get_vector_collection_or_404(&state, &name) {
171        Ok(c) => c,
172        Err(resp) => return resp,
173    };
174
175    let points = collection.get(&[id]);
176
177    match points.into_iter().next().flatten() {
178        Some(point) => Json(serde_json::json!({
179            "id": point.id,
180            "vector": point.vector,
181            "payload": point.payload
182        }))
183        .into_response(),
184        // PR #586 Devin fix: emit `VELES-003 PointNotFound` via
185        // `auto_core_error_response` so typed-error clients surface
186        // `PointNotFoundError` instead of a generic fallback.
187        None => auto_core_error_response(&velesdb_core::Error::PointNotFound(id)),
188    }
189}
190
191/// Delete a point by ID.
192#[utoipa::path(
193    delete,
194    path = "/collections/{name}/points/{id}",
195    tag = "points",
196    params(
197        ("name" = String, Path, description = "Collection name"),
198        ("id" = u64, Path, description = "Point ID")
199    ),
200    responses(
201        (status = 200, description = "Point deleted", body = Object),
202        (status = 404, description = "Point or collection not found", body = ErrorResponse)
203    )
204)]
205pub async fn delete_point(
206    State(state): State<Arc<AppState>>,
207    Path((name, id)): Path<(String, u64)>,
208) -> impl IntoResponse {
209    let collection = match get_vector_collection_or_404(&state, &name) {
210        Ok(c) => c,
211        Err(resp) => return resp,
212    };
213
214    match collection.delete(&[id]) {
215        Ok(()) => Json(serde_json::json!({
216            "message": "Point deleted",
217            "id": id
218        }))
219        .into_response(),
220        Err(e) => auto_core_error_response(&e),
221    }
222}
223
224/// Maximum allowed batch size for scroll requests.
225const MAX_SCROLL_BATCH_SIZE: u32 = 10_000;
226
227/// Scroll through collection points with cursor-based pagination.
228#[utoipa::path(
229    post,
230    path = "/collections/{name}/points/scroll",
231    tag = "points",
232    params(("name" = String, Path, description = "Collection name")),
233    request_body = ScrollRequest,
234    responses(
235        (status = 200, description = "Scroll batch", body = ScrollResponse),
236        (status = 400, description = "Invalid request", body = ErrorResponse),
237        (status = 404, description = "Collection not found", body = ErrorResponse)
238    )
239)]
240pub async fn scroll_points(
241    State(state): State<Arc<AppState>>,
242    Path(name): Path<String>,
243    Json(req): Json<ScrollRequest>,
244) -> impl IntoResponse {
245    if req.batch_size == 0 || req.batch_size > MAX_SCROLL_BATCH_SIZE {
246        return error_response(
247            StatusCode::BAD_REQUEST,
248            "batch_size must be between 1 and 10000".to_string(),
249        );
250    }
251
252    let collection = match get_vector_collection_or_404(&state, &name) {
253        Ok(c) => c,
254        Err(resp) => return resp,
255    };
256
257    let filter = match parse_scroll_filter(&req.filter) {
258        Ok(f) => f,
259        Err(resp) => return resp,
260    };
261
262    let batch_size = req.batch_size as usize;
263    let cursor = req.cursor;
264
265    // scroll_batch is blocking (reads from storage).
266    let result = tokio::task::spawn_blocking(move || {
267        collection.scroll_batch(cursor, batch_size, filter.as_ref())
268    })
269    .await;
270
271    match result {
272        Ok(Ok(batch)) => build_scroll_response(batch),
273        Ok(Err(e)) => auto_core_error_response(&e),
274        Err(e) => error_response(
275            StatusCode::INTERNAL_SERVER_ERROR,
276            format!("Task panicked: {e}"),
277        ),
278    }
279}
280
281/// Parse the optional filter JSON into a core `Filter`.
282#[allow(clippy::result_large_err)]
283fn parse_scroll_filter(
284    filter_json: &Option<serde_json::Value>,
285) -> Result<Option<velesdb_core::Filter>, axum::response::Response> {
286    let Some(ref json) = filter_json else {
287        return Ok(None);
288    };
289    serde_json::from_value::<velesdb_core::Filter>(json.clone())
290        .map(Some)
291        .map_err(|e| error_response(StatusCode::BAD_REQUEST, format!("Invalid filter: {e}")))
292}
293
294/// Convert a core `ScrollBatch` into an HTTP JSON response.
295fn build_scroll_response(batch: velesdb_core::ScrollBatch) -> axum::response::Response {
296    let points: Vec<ScrollPoint> = batch
297        .points
298        .into_iter()
299        .map(|p| ScrollPoint {
300            id: p.id,
301            vector: p.vector,
302            payload: p.payload,
303        })
304        .collect();
305    Json(ScrollResponse {
306        next_cursor: batch.next_cursor,
307        points,
308    })
309    .into_response()
310}
311
312/// Maximum number of IDs in a single bulk delete request.
313const MAX_BULK_DELETE_SIZE: usize = 10_000;
314
315/// Request body for bulk point deletion.
316#[derive(serde::Deserialize, utoipa::ToSchema)]
317pub struct BulkDeleteRequest {
318    /// List of point IDs to delete.
319    pub ids: Vec<u64>,
320}
321
322/// Deletes multiple points by ID in a single request.
323///
324/// Accepts a JSON body with a list of point IDs. All IDs are passed to
325/// the underlying `Collection::delete(&[u64])` in one call, which is
326/// more efficient than individual deletions.
327///
328/// Returns the number of points that were requested for deletion.
329/// Points that do not exist are silently skipped (idempotent delete).
330///
331/// # Empty payload semantics
332///
333/// `{ "ids": [] }` is treated as a successful no-op: the response is
334/// `200 OK` with `deleted_count = 0`. This matches Kubernetes-style
335/// idempotent batch APIs and lets callers send empty batches without
336/// special-casing on the client side.
337///
338/// # Limits
339///
340/// Batches larger than `MAX_BULK_DELETE_SIZE` (10000) are rejected with
341/// `400 BAD_REQUEST`.
342#[utoipa::path(
343    post,
344    path = "/collections/{name}/points/delete",
345    tag = "points",
346    params(
347        ("name" = String, Path, description = "Collection name")
348    ),
349    request_body = BulkDeleteRequest,
350    responses(
351        (status = 200, description = "Points deleted", body = Object),
352        (status = 400, description = "Batch too large", body = ErrorResponse),
353        (status = 404, description = "Collection not found", body = ErrorResponse),
354        (status = 500, description = "Delete failed", body = ErrorResponse)
355    )
356)]
357pub async fn bulk_delete_points(
358    State(state): State<Arc<AppState>>,
359    Path(name): Path<String>,
360    Json(req): Json<BulkDeleteRequest>,
361) -> impl IntoResponse {
362    if req.ids.is_empty() {
363        return Json(serde_json::json!({
364            "message": "No points to delete",
365            "collection": name,
366            "deleted_count": 0
367        }))
368        .into_response();
369    }
370
371    if req.ids.len() > MAX_BULK_DELETE_SIZE {
372        return error_response(
373            StatusCode::BAD_REQUEST,
374            format!(
375                "Batch too large: {} IDs (max {MAX_BULK_DELETE_SIZE})",
376                req.ids.len()
377            ),
378        );
379    }
380
381    let collection = match get_vector_collection_or_404(&state, &name) {
382        Ok(c) => c,
383        Err(resp) => return resp,
384    };
385
386    let ids = req.ids;
387    let count = ids.len();
388    let coll_name = name.clone();
389
390    let result = tokio::task::spawn_blocking(move || collection.delete(&ids)).await;
391    match result {
392        Ok(Ok(())) => Json(serde_json::json!({
393            "message": "Points deleted",
394            "collection": coll_name,
395            "deleted_count": count
396        }))
397        .into_response(),
398        Ok(Err(e)) => auto_core_error_response(&e),
399        Err(join_err) => error_response(
400            StatusCode::INTERNAL_SERVER_ERROR,
401            format!("bulk_delete task panicked: {join_err}"),
402        ),
403    }
404}