Skip to main content

velesdb_server/handlers/points/
mod.rs

1//! Point operations handlers.
2
3pub mod raw;
4pub mod relations;
5pub mod streaming;
6
7pub use raw::upsert_points_raw;
8pub use relations::{get_point_relations, relate_points, set_point_ttl, unrelate_points};
9pub use streaming::{
10    __path_enable_streaming, __path_stream_insert, __path_stream_upsert_points, enable_streaming,
11    stream_insert, stream_upsert_points,
12};
13
14use axum::{
15    extract::{Path, State},
16    http::StatusCode,
17    response::IntoResponse,
18    Json,
19};
20use std::sync::Arc;
21
22use crate::types::{
23    ErrorResponse, ScrollPoint, ScrollRequest, ScrollResponse, SparseVectorInput,
24    UpsertPointsRequest,
25};
26use crate::AppState;
27use velesdb_core::Point;
28
29use crate::handlers::helpers::{
30    auto_core_error_response, error_response, get_vector_collection_or_404,
31};
32
33use velesdb_core::index::sparse::SparseVector;
34
35/// Converts sparse vector input fields from a request into a `BTreeMap<String, SparseVector>`.
36///
37/// Merges `sparse_vector` (single, stored under `""`) and `sparse_vectors` (named map).
38/// Named map takes precedence if both provide the same key.
39fn convert_sparse_inputs(
40    sparse_vector: Option<SparseVectorInput>,
41    sparse_vectors: Option<std::collections::BTreeMap<String, SparseVectorInput>>,
42) -> Result<Option<std::collections::BTreeMap<String, SparseVector>>, String> {
43    let has_single = sparse_vector.is_some();
44    let has_named = sparse_vectors.as_ref().is_some_and(|m| !m.is_empty());
45
46    if !has_single && !has_named {
47        return Ok(None);
48    }
49
50    let mut result = std::collections::BTreeMap::new();
51
52    // Single sparse vector goes under default name ""
53    if let Some(sv_input) = sparse_vector {
54        let sv = sv_input.into_sparse_vector()?;
55        result.insert(String::new(), sv);
56    }
57
58    // Named sparse vectors (overwrite default if same key).
59    if let Some(named) = sparse_vectors {
60        merge_named_sparse_vectors(named, &mut result)?;
61    }
62
63    Ok(Some(result))
64}
65
66/// Merge a named-sparse-vector map into `result`, converting each input.
67///
68/// If both `sparse_vector` and `sparse_vectors[""]` are supplied, the named map
69/// wins; a debug trace is emitted so operators can spot this (usually
70/// unintentional) pattern.
71fn merge_named_sparse_vectors(
72    named: std::collections::BTreeMap<String, SparseVectorInput>,
73    result: &mut std::collections::BTreeMap<String, SparseVector>,
74) -> Result<(), String> {
75    for (name, sv_input) in named {
76        let sv = sv_input
77            .into_sparse_vector()
78            .map_err(|e| format!("sparse_vectors['{name}']: {e}"))?;
79        if name.is_empty() && result.contains_key("") {
80            tracing::debug!(
81                "sparse_vector (default \"\") is being overwritten by \
82                 sparse_vectors[\"\"] — supply only one to avoid ambiguity"
83            );
84        }
85        result.insert(name, sv);
86    }
87    Ok(())
88}
89
90/// Upsert points to a collection.
91#[utoipa::path(
92    post,
93    path = "/collections/{name}/points",
94    tag = "points",
95    params(
96        ("name" = String, Path, description = "Collection name")
97    ),
98    request_body = UpsertPointsRequest,
99    responses(
100        (status = 200, description = "Points upserted", body = Object),
101        (status = 404, description = "Collection not found", body = ErrorResponse),
102        (status = 400, description = "Invalid request", body = ErrorResponse)
103    )
104)]
105pub async fn upsert_points(
106    State(state): State<Arc<AppState>>,
107    Path(name): Path<String>,
108    Json(req): Json<UpsertPointsRequest>,
109) -> impl IntoResponse {
110    let collection = match get_vector_collection_or_404(&state, &name) {
111        Ok(c) => c,
112        Err(resp) => return resp,
113    };
114
115    let points = match build_points_from_request(req) {
116        Ok(p) => p,
117        Err(e) => {
118            return error_response(StatusCode::BAD_REQUEST, e);
119        }
120    };
121
122    // CRITICAL: upsert_bulk is blocking (HNSW insertion + I/O).
123    // Must use spawn_blocking to avoid blocking the async runtime.
124    let result = tokio::task::spawn_blocking(move || collection.upsert_bulk(&points)).await;
125
126    upsert_result_to_response(&state, &name, result)
127}
128
129/// Convert a `spawn_blocking` bulk-upsert result into an HTTP response.
130///
131/// On success it notifies the observer and returns `{message, count}`; a core
132/// error maps via [`auto_core_error_response`] and a task panic yields a 500.
133/// Shared by [`upsert_points`] and [`raw::upsert_points_raw`].
134pub(super) fn upsert_result_to_response(
135    state: &AppState,
136    name: &str,
137    result: Result<velesdb_core::Result<usize>, tokio::task::JoinError>,
138) -> axum::response::Response {
139    match result {
140        Ok(Ok(inserted)) => {
141            state.db.notify_upsert(name, inserted);
142            Json(serde_json::json!({
143                "message": "Points upserted",
144                "count": inserted
145            }))
146            .into_response()
147        }
148        Ok(Err(e)) => auto_core_error_response(&e),
149        Err(e) => error_response(
150            StatusCode::INTERNAL_SERVER_ERROR,
151            format!("Task panicked: {e}"),
152        ),
153    }
154}
155
156/// Convert an `UpsertPointsRequest` into a `Vec<Point>`, merging sparse inputs.
157fn build_points_from_request(req: UpsertPointsRequest) -> Result<Vec<Point>, String> {
158    let mut points: Vec<Point> = Vec::with_capacity(req.points.len());
159    for p in req.points {
160        let sparse = convert_sparse_inputs(p.sparse_vector, p.sparse_vectors)?;
161        let mut point = Point::new(p.id, p.vector, p.payload);
162        point.sparse_vectors = sparse;
163        points.push(point);
164    }
165    Ok(points)
166}
167
168/// Get a point by ID.
169#[utoipa::path(
170    get,
171    path = "/collections/{name}/points/{id}",
172    tag = "points",
173    params(
174        ("name" = String, Path, description = "Collection name"),
175        ("id" = u64, Path, description = "Point ID")
176    ),
177    responses(
178        (status = 200, description = "Point found", body = Object),
179        (status = 404, description = "Point or collection not found", body = ErrorResponse)
180    )
181)]
182pub async fn get_point(
183    State(state): State<Arc<AppState>>,
184    Path((name, id)): Path<(String, u64)>,
185) -> impl IntoResponse {
186    let collection = match get_vector_collection_or_404(&state, &name) {
187        Ok(c) => c,
188        Err(resp) => return resp,
189    };
190
191    let points = collection.get(&[id]);
192
193    match points.into_iter().next().flatten() {
194        Some(point) => Json(serde_json::json!({
195            "id": point.id,
196            "vector": point.vector,
197            "payload": point.payload
198        }))
199        .into_response(),
200        // PR #586 Devin fix: emit `VELES-003 PointNotFound` via
201        // `auto_core_error_response` so typed-error clients surface
202        // `PointNotFoundError` instead of a generic fallback.
203        None => auto_core_error_response(&velesdb_core::Error::PointNotFound(id)),
204    }
205}
206
207/// Delete a point by ID.
208#[utoipa::path(
209    delete,
210    path = "/collections/{name}/points/{id}",
211    tag = "points",
212    params(
213        ("name" = String, Path, description = "Collection name"),
214        ("id" = u64, Path, description = "Point ID")
215    ),
216    responses(
217        (status = 200, description = "Point deleted", body = Object),
218        (status = 404, description = "Point or collection not found", body = ErrorResponse)
219    )
220)]
221pub async fn delete_point(
222    State(state): State<Arc<AppState>>,
223    Path((name, id)): Path<(String, u64)>,
224) -> impl IntoResponse {
225    let collection = match get_vector_collection_or_404(&state, &name) {
226        Ok(c) => c,
227        Err(resp) => return resp,
228    };
229
230    match collection.delete(&[id]) {
231        Ok(()) => Json(serde_json::json!({
232            "message": "Point deleted",
233            "id": id
234        }))
235        .into_response(),
236        Err(e) => auto_core_error_response(&e),
237    }
238}
239
240/// Maximum allowed batch size for scroll requests.
241const MAX_SCROLL_BATCH_SIZE: u32 = 10_000;
242
243/// Scroll through collection points with cursor-based pagination.
244#[utoipa::path(
245    post,
246    path = "/collections/{name}/points/scroll",
247    tag = "points",
248    params(("name" = String, Path, description = "Collection name")),
249    request_body = ScrollRequest,
250    responses(
251        (status = 200, description = "Scroll batch", body = ScrollResponse),
252        (status = 400, description = "Invalid request", body = ErrorResponse),
253        (status = 404, description = "Collection not found", body = ErrorResponse)
254    )
255)]
256pub async fn scroll_points(
257    State(state): State<Arc<AppState>>,
258    Path(name): Path<String>,
259    Json(req): Json<ScrollRequest>,
260) -> impl IntoResponse {
261    if req.batch_size == 0 || req.batch_size > MAX_SCROLL_BATCH_SIZE {
262        return error_response(
263            StatusCode::BAD_REQUEST,
264            "batch_size must be between 1 and 10000".to_string(),
265        );
266    }
267
268    let collection = match get_vector_collection_or_404(&state, &name) {
269        Ok(c) => c,
270        Err(resp) => return resp,
271    };
272
273    let filter = match parse_scroll_filter(&req.filter) {
274        Ok(f) => f,
275        Err(resp) => return resp,
276    };
277
278    let batch_size = req.batch_size as usize;
279    let cursor = req.cursor;
280
281    // scroll_batch is blocking (reads from storage).
282    let result = tokio::task::spawn_blocking(move || {
283        collection.scroll_batch(cursor, batch_size, filter.as_ref())
284    })
285    .await;
286
287    match result {
288        Ok(Ok(batch)) => build_scroll_response(batch),
289        Ok(Err(e)) => auto_core_error_response(&e),
290        Err(e) => error_response(
291            StatusCode::INTERNAL_SERVER_ERROR,
292            format!("Task panicked: {e}"),
293        ),
294    }
295}
296
297/// Parse the optional filter JSON into a core `Filter`.
298#[allow(clippy::result_large_err)]
299fn parse_scroll_filter(
300    filter_json: &Option<serde_json::Value>,
301) -> Result<Option<velesdb_core::Filter>, axum::response::Response> {
302    let Some(ref json) = filter_json else {
303        return Ok(None);
304    };
305    serde_json::from_value::<velesdb_core::Filter>(json.clone())
306        .map(Some)
307        .map_err(|e| error_response(StatusCode::BAD_REQUEST, format!("Invalid filter: {e}")))
308}
309
310/// Convert a core `ScrollBatch` into an HTTP JSON response.
311fn build_scroll_response(batch: velesdb_core::ScrollBatch) -> axum::response::Response {
312    let points: Vec<ScrollPoint> = batch
313        .points
314        .into_iter()
315        .map(|p| ScrollPoint {
316            id: p.id,
317            vector: p.vector,
318            payload: p.payload,
319        })
320        .collect();
321    Json(ScrollResponse {
322        next_cursor: batch.next_cursor,
323        points,
324    })
325    .into_response()
326}
327
328/// Maximum number of IDs in a single bulk delete request.
329const MAX_BULK_DELETE_SIZE: usize = 10_000;
330
331/// Request body for bulk point deletion.
332#[derive(serde::Deserialize, utoipa::ToSchema)]
333pub struct BulkDeleteRequest {
334    /// List of point IDs to delete.
335    pub ids: Vec<u64>,
336}
337
338/// Deletes multiple points by ID in a single request.
339///
340/// Accepts a JSON body with a list of point IDs. All IDs are passed to
341/// the underlying `Collection::delete(&[u64])` in one call, which is
342/// more efficient than individual deletions.
343///
344/// Returns the number of points that were requested for deletion.
345/// Points that do not exist are silently skipped (idempotent delete).
346///
347/// # Empty payload semantics
348///
349/// `{ "ids": [] }` is treated as a successful no-op: the response is
350/// `200 OK` with `deleted_count = 0`. This matches Kubernetes-style
351/// idempotent batch APIs and lets callers send empty batches without
352/// special-casing on the client side.
353///
354/// # Limits
355///
356/// Batches larger than `MAX_BULK_DELETE_SIZE` (10000) are rejected with
357/// `400 BAD_REQUEST`.
358#[utoipa::path(
359    post,
360    path = "/collections/{name}/points/delete",
361    tag = "points",
362    params(
363        ("name" = String, Path, description = "Collection name")
364    ),
365    request_body = BulkDeleteRequest,
366    responses(
367        (status = 200, description = "Points deleted", body = Object),
368        (status = 400, description = "Batch too large", body = ErrorResponse),
369        (status = 404, description = "Collection not found", body = ErrorResponse),
370        (status = 500, description = "Delete failed", body = ErrorResponse)
371    )
372)]
373pub async fn bulk_delete_points(
374    State(state): State<Arc<AppState>>,
375    Path(name): Path<String>,
376    Json(req): Json<BulkDeleteRequest>,
377) -> impl IntoResponse {
378    if req.ids.is_empty() {
379        return Json(serde_json::json!({
380            "message": "No points to delete",
381            "collection": name,
382            "deleted_count": 0
383        }))
384        .into_response();
385    }
386
387    if req.ids.len() > MAX_BULK_DELETE_SIZE {
388        return error_response(
389            StatusCode::BAD_REQUEST,
390            format!(
391                "Batch too large: {} IDs (max {MAX_BULK_DELETE_SIZE})",
392                req.ids.len()
393            ),
394        );
395    }
396
397    let collection = match get_vector_collection_or_404(&state, &name) {
398        Ok(c) => c,
399        Err(resp) => return resp,
400    };
401
402    let ids = req.ids;
403    let count = ids.len();
404    let coll_name = name.clone();
405
406    let result = tokio::task::spawn_blocking(move || collection.delete(&ids)).await;
407    match result {
408        Ok(Ok(())) => Json(serde_json::json!({
409            "message": "Points deleted",
410            "collection": coll_name,
411            "deleted_count": count
412        }))
413        .into_response(),
414        Ok(Err(e)) => auto_core_error_response(&e),
415        Err(join_err) => error_response(
416            StatusCode::INTERNAL_SERVER_ERROR,
417            format!("bulk_delete task panicked: {join_err}"),
418        ),
419    }
420}