1pub mod raw;
4pub mod relations;
5pub mod streaming;
6
7pub use raw::upsert_points_raw;
8pub use relations::{get_point_relations, relate_points, set_point_ttl, unrelate_points};
9pub use streaming::{
10 __path_enable_streaming, __path_stream_insert, __path_stream_upsert_points, enable_streaming,
11 stream_insert, stream_upsert_points,
12};
13
14use axum::{
15 extract::{Path, State},
16 http::StatusCode,
17 response::IntoResponse,
18 Json,
19};
20use std::sync::Arc;
21
22use crate::types::{
23 ErrorResponse, ScrollPoint, ScrollRequest, ScrollResponse, SparseVectorInput,
24 UpsertPointsRequest,
25};
26use crate::AppState;
27use velesdb_core::Point;
28
29use crate::handlers::helpers::{
30 auto_core_error_response, error_response, get_vector_collection_or_404,
31};
32
33use velesdb_core::index::sparse::SparseVector;
34
35fn convert_sparse_inputs(
40 sparse_vector: Option<SparseVectorInput>,
41 sparse_vectors: Option<std::collections::BTreeMap<String, SparseVectorInput>>,
42) -> Result<Option<std::collections::BTreeMap<String, SparseVector>>, String> {
43 let has_single = sparse_vector.is_some();
44 let has_named = sparse_vectors.as_ref().is_some_and(|m| !m.is_empty());
45
46 if !has_single && !has_named {
47 return Ok(None);
48 }
49
50 let mut result = std::collections::BTreeMap::new();
51
52 if let Some(sv_input) = sparse_vector {
54 let sv = sv_input.into_sparse_vector()?;
55 result.insert(String::new(), sv);
56 }
57
58 if let Some(named) = sparse_vectors {
60 merge_named_sparse_vectors(named, &mut result)?;
61 }
62
63 Ok(Some(result))
64}
65
66fn merge_named_sparse_vectors(
72 named: std::collections::BTreeMap<String, SparseVectorInput>,
73 result: &mut std::collections::BTreeMap<String, SparseVector>,
74) -> Result<(), String> {
75 for (name, sv_input) in named {
76 let sv = sv_input
77 .into_sparse_vector()
78 .map_err(|e| format!("sparse_vectors['{name}']: {e}"))?;
79 if name.is_empty() && result.contains_key("") {
80 tracing::debug!(
81 "sparse_vector (default \"\") is being overwritten by \
82 sparse_vectors[\"\"] — supply only one to avoid ambiguity"
83 );
84 }
85 result.insert(name, sv);
86 }
87 Ok(())
88}
89
90#[utoipa::path(
92 post,
93 path = "/collections/{name}/points",
94 tag = "points",
95 params(
96 ("name" = String, Path, description = "Collection name")
97 ),
98 request_body = UpsertPointsRequest,
99 responses(
100 (status = 200, description = "Points upserted", body = Object),
101 (status = 404, description = "Collection not found", body = ErrorResponse),
102 (status = 400, description = "Invalid request", body = ErrorResponse)
103 )
104)]
105pub async fn upsert_points(
106 State(state): State<Arc<AppState>>,
107 Path(name): Path<String>,
108 Json(req): Json<UpsertPointsRequest>,
109) -> impl IntoResponse {
110 let collection = match get_vector_collection_or_404(&state, &name) {
111 Ok(c) => c,
112 Err(resp) => return resp,
113 };
114
115 let points = match build_points_from_request(req) {
116 Ok(p) => p,
117 Err(e) => {
118 return error_response(StatusCode::BAD_REQUEST, e);
119 }
120 };
121
122 let result = tokio::task::spawn_blocking(move || collection.upsert_bulk(&points)).await;
125
126 upsert_result_to_response(&state, &name, result)
127}
128
129pub(super) fn upsert_result_to_response(
135 state: &AppState,
136 name: &str,
137 result: Result<velesdb_core::Result<usize>, tokio::task::JoinError>,
138) -> axum::response::Response {
139 match result {
140 Ok(Ok(inserted)) => {
141 state.db.notify_upsert(name, inserted);
142 Json(serde_json::json!({
143 "message": "Points upserted",
144 "count": inserted
145 }))
146 .into_response()
147 }
148 Ok(Err(e)) => auto_core_error_response(&e),
149 Err(e) => error_response(
150 StatusCode::INTERNAL_SERVER_ERROR,
151 format!("Task panicked: {e}"),
152 ),
153 }
154}
155
156fn build_points_from_request(req: UpsertPointsRequest) -> Result<Vec<Point>, String> {
158 let mut points: Vec<Point> = Vec::with_capacity(req.points.len());
159 for p in req.points {
160 let sparse = convert_sparse_inputs(p.sparse_vector, p.sparse_vectors)?;
161 let mut point = Point::new(p.id, p.vector, p.payload);
162 point.sparse_vectors = sparse;
163 points.push(point);
164 }
165 Ok(points)
166}
167
168#[utoipa::path(
170 get,
171 path = "/collections/{name}/points/{id}",
172 tag = "points",
173 params(
174 ("name" = String, Path, description = "Collection name"),
175 ("id" = u64, Path, description = "Point ID")
176 ),
177 responses(
178 (status = 200, description = "Point found", body = Object),
179 (status = 404, description = "Point or collection not found", body = ErrorResponse)
180 )
181)]
182pub async fn get_point(
183 State(state): State<Arc<AppState>>,
184 Path((name, id)): Path<(String, u64)>,
185) -> impl IntoResponse {
186 let collection = match get_vector_collection_or_404(&state, &name) {
187 Ok(c) => c,
188 Err(resp) => return resp,
189 };
190
191 let points = collection.get(&[id]);
192
193 match points.into_iter().next().flatten() {
194 Some(point) => Json(serde_json::json!({
195 "id": point.id,
196 "vector": point.vector,
197 "payload": point.payload
198 }))
199 .into_response(),
200 None => auto_core_error_response(&velesdb_core::Error::PointNotFound(id)),
204 }
205}
206
207#[utoipa::path(
209 delete,
210 path = "/collections/{name}/points/{id}",
211 tag = "points",
212 params(
213 ("name" = String, Path, description = "Collection name"),
214 ("id" = u64, Path, description = "Point ID")
215 ),
216 responses(
217 (status = 200, description = "Point deleted", body = Object),
218 (status = 404, description = "Point or collection not found", body = ErrorResponse)
219 )
220)]
221pub async fn delete_point(
222 State(state): State<Arc<AppState>>,
223 Path((name, id)): Path<(String, u64)>,
224) -> impl IntoResponse {
225 let collection = match get_vector_collection_or_404(&state, &name) {
226 Ok(c) => c,
227 Err(resp) => return resp,
228 };
229
230 match collection.delete(&[id]) {
231 Ok(()) => Json(serde_json::json!({
232 "message": "Point deleted",
233 "id": id
234 }))
235 .into_response(),
236 Err(e) => auto_core_error_response(&e),
237 }
238}
239
240const MAX_SCROLL_BATCH_SIZE: u32 = 10_000;
242
243#[utoipa::path(
245 post,
246 path = "/collections/{name}/points/scroll",
247 tag = "points",
248 params(("name" = String, Path, description = "Collection name")),
249 request_body = ScrollRequest,
250 responses(
251 (status = 200, description = "Scroll batch", body = ScrollResponse),
252 (status = 400, description = "Invalid request", body = ErrorResponse),
253 (status = 404, description = "Collection not found", body = ErrorResponse)
254 )
255)]
256pub async fn scroll_points(
257 State(state): State<Arc<AppState>>,
258 Path(name): Path<String>,
259 Json(req): Json<ScrollRequest>,
260) -> impl IntoResponse {
261 if req.batch_size == 0 || req.batch_size > MAX_SCROLL_BATCH_SIZE {
262 return error_response(
263 StatusCode::BAD_REQUEST,
264 "batch_size must be between 1 and 10000".to_string(),
265 );
266 }
267
268 let collection = match get_vector_collection_or_404(&state, &name) {
269 Ok(c) => c,
270 Err(resp) => return resp,
271 };
272
273 let filter = match parse_scroll_filter(&req.filter) {
274 Ok(f) => f,
275 Err(resp) => return resp,
276 };
277
278 let batch_size = req.batch_size as usize;
279 let cursor = req.cursor;
280
281 let result = tokio::task::spawn_blocking(move || {
283 collection.scroll_batch(cursor, batch_size, filter.as_ref())
284 })
285 .await;
286
287 match result {
288 Ok(Ok(batch)) => build_scroll_response(batch),
289 Ok(Err(e)) => auto_core_error_response(&e),
290 Err(e) => error_response(
291 StatusCode::INTERNAL_SERVER_ERROR,
292 format!("Task panicked: {e}"),
293 ),
294 }
295}
296
297#[allow(clippy::result_large_err)]
299fn parse_scroll_filter(
300 filter_json: &Option<serde_json::Value>,
301) -> Result<Option<velesdb_core::Filter>, axum::response::Response> {
302 let Some(ref json) = filter_json else {
303 return Ok(None);
304 };
305 serde_json::from_value::<velesdb_core::Filter>(json.clone())
306 .map(Some)
307 .map_err(|e| error_response(StatusCode::BAD_REQUEST, format!("Invalid filter: {e}")))
308}
309
310fn build_scroll_response(batch: velesdb_core::ScrollBatch) -> axum::response::Response {
312 let points: Vec<ScrollPoint> = batch
313 .points
314 .into_iter()
315 .map(|p| ScrollPoint {
316 id: p.id,
317 vector: p.vector,
318 payload: p.payload,
319 })
320 .collect();
321 Json(ScrollResponse {
322 next_cursor: batch.next_cursor,
323 points,
324 })
325 .into_response()
326}
327
328const MAX_BULK_DELETE_SIZE: usize = 10_000;
330
331#[derive(serde::Deserialize, utoipa::ToSchema)]
333pub struct BulkDeleteRequest {
334 pub ids: Vec<u64>,
336}
337
338#[utoipa::path(
359 post,
360 path = "/collections/{name}/points/delete",
361 tag = "points",
362 params(
363 ("name" = String, Path, description = "Collection name")
364 ),
365 request_body = BulkDeleteRequest,
366 responses(
367 (status = 200, description = "Points deleted", body = Object),
368 (status = 400, description = "Batch too large", body = ErrorResponse),
369 (status = 404, description = "Collection not found", body = ErrorResponse),
370 (status = 500, description = "Delete failed", body = ErrorResponse)
371 )
372)]
373pub async fn bulk_delete_points(
374 State(state): State<Arc<AppState>>,
375 Path(name): Path<String>,
376 Json(req): Json<BulkDeleteRequest>,
377) -> impl IntoResponse {
378 if req.ids.is_empty() {
379 return Json(serde_json::json!({
380 "message": "No points to delete",
381 "collection": name,
382 "deleted_count": 0
383 }))
384 .into_response();
385 }
386
387 if req.ids.len() > MAX_BULK_DELETE_SIZE {
388 return error_response(
389 StatusCode::BAD_REQUEST,
390 format!(
391 "Batch too large: {} IDs (max {MAX_BULK_DELETE_SIZE})",
392 req.ids.len()
393 ),
394 );
395 }
396
397 let collection = match get_vector_collection_or_404(&state, &name) {
398 Ok(c) => c,
399 Err(resp) => return resp,
400 };
401
402 let ids = req.ids;
403 let count = ids.len();
404 let coll_name = name.clone();
405
406 let result = tokio::task::spawn_blocking(move || collection.delete(&ids)).await;
407 match result {
408 Ok(Ok(())) => Json(serde_json::json!({
409 "message": "Points deleted",
410 "collection": coll_name,
411 "deleted_count": count
412 }))
413 .into_response(),
414 Ok(Err(e)) => auto_core_error_response(&e),
415 Err(join_err) => error_response(
416 StatusCode::INTERNAL_SERVER_ERROR,
417 format!("bulk_delete task panicked: {join_err}"),
418 ),
419 }
420}