Skip to main content

velesdb_server/handlers/points/
mod.rs

1//! Point operations handlers.
2
3pub mod streaming;
4
5pub use streaming::{
6    __path_stream_insert, __path_stream_upsert_points, stream_insert, stream_upsert_points,
7};
8
9use axum::{
10    extract::{Path, State},
11    http::StatusCode,
12    response::IntoResponse,
13    Json,
14};
15use std::sync::Arc;
16
17use crate::types::{
18    ErrorResponse, ScrollPoint, ScrollRequest, ScrollResponse, SparseVectorInput,
19    UpsertPointsRequest,
20};
21use crate::AppState;
22use velesdb_core::Point;
23
24use crate::handlers::helpers::{
25    auto_core_error_response, error_response, get_vector_collection_or_404,
26};
27
28use velesdb_core::index::sparse::SparseVector;
29
30/// Converts sparse vector input fields from a request into a `BTreeMap<String, SparseVector>`.
31///
32/// Merges `sparse_vector` (single, stored under `""`) and `sparse_vectors` (named map).
33/// Named map takes precedence if both provide the same key.
34fn convert_sparse_inputs(
35    sparse_vector: Option<SparseVectorInput>,
36    sparse_vectors: Option<std::collections::BTreeMap<String, SparseVectorInput>>,
37) -> Result<Option<std::collections::BTreeMap<String, SparseVector>>, String> {
38    let has_single = sparse_vector.is_some();
39    let has_named = sparse_vectors.as_ref().is_some_and(|m| !m.is_empty());
40
41    if !has_single && !has_named {
42        return Ok(None);
43    }
44
45    let mut result = std::collections::BTreeMap::new();
46
47    // Single sparse vector goes under default name ""
48    if let Some(sv_input) = sparse_vector {
49        let sv = sv_input.into_sparse_vector()?;
50        result.insert(String::new(), sv);
51    }
52
53    // Named sparse vectors (overwrite default if same key).
54    // If both `sparse_vector` and `sparse_vectors[""]` are supplied, the named map wins.
55    // A debug trace is emitted so operators can detect this (usually unintentional) pattern.
56    if let Some(named) = sparse_vectors {
57        for (name, sv_input) in named {
58            let sv = sv_input
59                .into_sparse_vector()
60                .map_err(|e| format!("sparse_vectors['{name}']: {e}"))?;
61            if name.is_empty() && result.contains_key("") {
62                tracing::debug!(
63                    "sparse_vector (default \"\") is being overwritten by \
64                     sparse_vectors[\"\"] — supply only one to avoid ambiguity"
65                );
66            }
67            result.insert(name, sv);
68        }
69    }
70
71    Ok(Some(result))
72}
73
74/// Upsert points to a collection.
75#[utoipa::path(
76    post,
77    path = "/collections/{name}/points",
78    tag = "points",
79    params(
80        ("name" = String, Path, description = "Collection name")
81    ),
82    request_body = UpsertPointsRequest,
83    responses(
84        (status = 200, description = "Points upserted", body = Object),
85        (status = 404, description = "Collection not found", body = ErrorResponse),
86        (status = 400, description = "Invalid request", body = ErrorResponse)
87    )
88)]
89pub async fn upsert_points(
90    State(state): State<Arc<AppState>>,
91    Path(name): Path<String>,
92    Json(req): Json<UpsertPointsRequest>,
93) -> impl IntoResponse {
94    let collection = match get_vector_collection_or_404(&state, &name) {
95        Ok(c) => c,
96        Err(resp) => return resp,
97    };
98
99    let points = match build_points_from_request(req) {
100        Ok(p) => p,
101        Err(e) => {
102            return error_response(StatusCode::BAD_REQUEST, e);
103        }
104    };
105
106    // CRITICAL: upsert_bulk is blocking (HNSW insertion + I/O).
107    // Must use spawn_blocking to avoid blocking the async runtime.
108    let result = tokio::task::spawn_blocking(move || collection.upsert_bulk(&points)).await;
109
110    match result {
111        Ok(Ok(inserted)) => {
112            state.db.notify_upsert(&name, inserted);
113            Json(serde_json::json!({
114                "message": "Points upserted",
115                "count": inserted
116            }))
117            .into_response()
118        }
119        Ok(Err(e)) => auto_core_error_response(&e),
120        Err(e) => error_response(
121            StatusCode::INTERNAL_SERVER_ERROR,
122            format!("Task panicked: {e}"),
123        ),
124    }
125}
126
127/// Convert an `UpsertPointsRequest` into a `Vec<Point>`, merging sparse inputs.
128fn build_points_from_request(req: UpsertPointsRequest) -> Result<Vec<Point>, String> {
129    let mut points: Vec<Point> = Vec::with_capacity(req.points.len());
130    for p in req.points {
131        let sparse = convert_sparse_inputs(p.sparse_vector, p.sparse_vectors)?;
132        let mut point = Point::new(p.id, p.vector, p.payload);
133        point.sparse_vectors = sparse;
134        points.push(point);
135    }
136    Ok(points)
137}
138
139/// Get a point by ID.
140#[utoipa::path(
141    get,
142    path = "/collections/{name}/points/{id}",
143    tag = "points",
144    params(
145        ("name" = String, Path, description = "Collection name"),
146        ("id" = u64, Path, description = "Point ID")
147    ),
148    responses(
149        (status = 200, description = "Point found", body = Object),
150        (status = 404, description = "Point or collection not found", body = ErrorResponse)
151    )
152)]
153pub async fn get_point(
154    State(state): State<Arc<AppState>>,
155    Path((name, id)): Path<(String, u64)>,
156) -> impl IntoResponse {
157    let collection = match get_vector_collection_or_404(&state, &name) {
158        Ok(c) => c,
159        Err(resp) => return resp,
160    };
161
162    let points = collection.get(&[id]);
163
164    match points.into_iter().next().flatten() {
165        Some(point) => Json(serde_json::json!({
166            "id": point.id,
167            "vector": point.vector,
168            "payload": point.payload
169        }))
170        .into_response(),
171        // PR #586 Devin fix: emit `VELES-003 PointNotFound` via
172        // `auto_core_error_response` so typed-error clients surface
173        // `PointNotFoundError` instead of a generic fallback.
174        None => auto_core_error_response(&velesdb_core::Error::PointNotFound(id)),
175    }
176}
177
178/// Delete a point by ID.
179#[utoipa::path(
180    delete,
181    path = "/collections/{name}/points/{id}",
182    tag = "points",
183    params(
184        ("name" = String, Path, description = "Collection name"),
185        ("id" = u64, Path, description = "Point ID")
186    ),
187    responses(
188        (status = 200, description = "Point deleted", body = Object),
189        (status = 404, description = "Point or collection not found", body = ErrorResponse)
190    )
191)]
192pub async fn delete_point(
193    State(state): State<Arc<AppState>>,
194    Path((name, id)): Path<(String, u64)>,
195) -> impl IntoResponse {
196    let collection = match get_vector_collection_or_404(&state, &name) {
197        Ok(c) => c,
198        Err(resp) => return resp,
199    };
200
201    match collection.delete(&[id]) {
202        Ok(()) => Json(serde_json::json!({
203            "message": "Point deleted",
204            "id": id
205        }))
206        .into_response(),
207        Err(e) => auto_core_error_response(&e),
208    }
209}
210
211/// Maximum allowed batch size for scroll requests.
212const MAX_SCROLL_BATCH_SIZE: u32 = 10_000;
213
214/// Scroll through collection points with cursor-based pagination.
215#[utoipa::path(
216    post,
217    path = "/collections/{name}/points/scroll",
218    tag = "points",
219    params(("name" = String, Path, description = "Collection name")),
220    request_body = ScrollRequest,
221    responses(
222        (status = 200, description = "Scroll batch", body = ScrollResponse),
223        (status = 400, description = "Invalid request", body = ErrorResponse),
224        (status = 404, description = "Collection not found", body = ErrorResponse)
225    )
226)]
227pub async fn scroll_points(
228    State(state): State<Arc<AppState>>,
229    Path(name): Path<String>,
230    Json(req): Json<ScrollRequest>,
231) -> impl IntoResponse {
232    if req.batch_size == 0 || req.batch_size > MAX_SCROLL_BATCH_SIZE {
233        return error_response(
234            StatusCode::BAD_REQUEST,
235            "batch_size must be between 1 and 10000".to_string(),
236        );
237    }
238
239    let collection = match get_vector_collection_or_404(&state, &name) {
240        Ok(c) => c,
241        Err(resp) => return resp,
242    };
243
244    let filter = match parse_scroll_filter(&req.filter) {
245        Ok(f) => f,
246        Err(resp) => return resp,
247    };
248
249    let batch_size = req.batch_size as usize;
250    let cursor = req.cursor;
251
252    // scroll_batch is blocking (reads from storage).
253    let result = tokio::task::spawn_blocking(move || {
254        collection.scroll_batch(cursor, batch_size, filter.as_ref())
255    })
256    .await;
257
258    match result {
259        Ok(Ok(batch)) => build_scroll_response(batch),
260        Ok(Err(e)) => auto_core_error_response(&e),
261        Err(e) => error_response(
262            StatusCode::INTERNAL_SERVER_ERROR,
263            format!("Task panicked: {e}"),
264        ),
265    }
266}
267
268/// Parse the optional filter JSON into a core `Filter`.
269#[allow(clippy::result_large_err)]
270fn parse_scroll_filter(
271    filter_json: &Option<serde_json::Value>,
272) -> Result<Option<velesdb_core::Filter>, axum::response::Response> {
273    let Some(ref json) = filter_json else {
274        return Ok(None);
275    };
276    serde_json::from_value::<velesdb_core::Filter>(json.clone())
277        .map(Some)
278        .map_err(|e| error_response(StatusCode::BAD_REQUEST, format!("Invalid filter: {e}")))
279}
280
281/// Convert a core `ScrollBatch` into an HTTP JSON response.
282fn build_scroll_response(batch: velesdb_core::ScrollBatch) -> axum::response::Response {
283    let points: Vec<ScrollPoint> = batch
284        .points
285        .into_iter()
286        .map(|p| ScrollPoint {
287            id: p.id,
288            vector: p.vector,
289            payload: p.payload,
290        })
291        .collect();
292    Json(ScrollResponse {
293        next_cursor: batch.next_cursor,
294        points,
295    })
296    .into_response()
297}
298
/// Maximum number of IDs in a single bulk delete request.
///
/// `bulk_delete_points` rejects any request whose `ids` list is longer than this
/// with `400 BAD_REQUEST`; a list of exactly this length is still accepted.
const MAX_BULK_DELETE_SIZE: usize = 10_000;
301
// NOTE: the `///` comments below are left untouched so the derived schema
// descriptions stay the same; extra notes are plain `//` comments.
/// Request body for bulk point deletion.
#[derive(serde::Deserialize, utoipa::ToSchema)]
pub struct BulkDeleteRequest {
    // An empty list is answered by `bulk_delete_points` as a successful no-op;
    // more than `MAX_BULK_DELETE_SIZE` entries is rejected with 400.
    /// List of point IDs to delete.
    pub ids: Vec<u64>,
}
308
309/// Deletes multiple points by ID in a single request.
310///
311/// Accepts a JSON body with a list of point IDs. All IDs are passed to
312/// the underlying `Collection::delete(&[u64])` in one call, which is
313/// more efficient than individual deletions.
314///
315/// Returns the number of points that were requested for deletion.
316/// Points that do not exist are silently skipped (idempotent delete).
317///
318/// # Empty payload semantics
319///
320/// `{ "ids": [] }` is treated as a successful no-op: the response is
321/// `200 OK` with `deleted_count = 0`. This matches Kubernetes-style
322/// idempotent batch APIs and lets callers send empty batches without
323/// special-casing on the client side.
324///
325/// # Limits
326///
327/// Batches larger than `MAX_BULK_DELETE_SIZE` (10000) are rejected with
328/// `400 BAD_REQUEST`.
329#[utoipa::path(
330    post,
331    path = "/collections/{name}/points/delete",
332    tag = "points",
333    params(
334        ("name" = String, Path, description = "Collection name")
335    ),
336    request_body = BulkDeleteRequest,
337    responses(
338        (status = 200, description = "Points deleted", body = Object),
339        (status = 400, description = "Batch too large", body = ErrorResponse),
340        (status = 404, description = "Collection not found", body = ErrorResponse),
341        (status = 500, description = "Delete failed", body = ErrorResponse)
342    )
343)]
344pub async fn bulk_delete_points(
345    State(state): State<Arc<AppState>>,
346    Path(name): Path<String>,
347    Json(req): Json<BulkDeleteRequest>,
348) -> impl IntoResponse {
349    if req.ids.is_empty() {
350        return Json(serde_json::json!({
351            "message": "No points to delete",
352            "collection": name,
353            "deleted_count": 0
354        }))
355        .into_response();
356    }
357
358    if req.ids.len() > MAX_BULK_DELETE_SIZE {
359        return error_response(
360            StatusCode::BAD_REQUEST,
361            format!(
362                "Batch too large: {} IDs (max {MAX_BULK_DELETE_SIZE})",
363                req.ids.len()
364            ),
365        );
366    }
367
368    let collection = match get_vector_collection_or_404(&state, &name) {
369        Ok(c) => c,
370        Err(resp) => return resp,
371    };
372
373    let ids = req.ids;
374    let count = ids.len();
375    let coll_name = name.clone();
376
377    let result = tokio::task::spawn_blocking(move || collection.delete(&ids)).await;
378    match result {
379        Ok(Ok(())) => Json(serde_json::json!({
380            "message": "Points deleted",
381            "collection": coll_name,
382            "deleted_count": count
383        }))
384        .into_response(),
385        Ok(Err(e)) => auto_core_error_response(&e),
386        Err(join_err) => error_response(
387            StatusCode::INTERNAL_SERVER_ERROR,
388            format!("bulk_delete task panicked: {join_err}"),
389        ),
390    }
391}