1pub mod streaming;
4
5pub use streaming::{
6 __path_stream_insert, __path_stream_upsert_points, stream_insert, stream_upsert_points,
7};
8
9use axum::{
10 extract::{Path, State},
11 http::StatusCode,
12 response::IntoResponse,
13 Json,
14};
15use std::sync::Arc;
16
17use crate::types::{
18 ErrorResponse, ScrollPoint, ScrollRequest, ScrollResponse, SparseVectorInput,
19 UpsertPointsRequest,
20};
21use crate::AppState;
22use velesdb_core::Point;
23
24use crate::handlers::helpers::{
25 auto_core_error_response, error_response, get_vector_collection_or_404,
26};
27
28use velesdb_core::index::sparse::SparseVector;
29
30fn convert_sparse_inputs(
35 sparse_vector: Option<SparseVectorInput>,
36 sparse_vectors: Option<std::collections::BTreeMap<String, SparseVectorInput>>,
37) -> Result<Option<std::collections::BTreeMap<String, SparseVector>>, String> {
38 let has_single = sparse_vector.is_some();
39 let has_named = sparse_vectors.as_ref().is_some_and(|m| !m.is_empty());
40
41 if !has_single && !has_named {
42 return Ok(None);
43 }
44
45 let mut result = std::collections::BTreeMap::new();
46
47 if let Some(sv_input) = sparse_vector {
49 let sv = sv_input.into_sparse_vector()?;
50 result.insert(String::new(), sv);
51 }
52
53 if let Some(named) = sparse_vectors {
57 for (name, sv_input) in named {
58 let sv = sv_input
59 .into_sparse_vector()
60 .map_err(|e| format!("sparse_vectors['{name}']: {e}"))?;
61 if name.is_empty() && result.contains_key("") {
62 tracing::debug!(
63 "sparse_vector (default \"\") is being overwritten by \
64 sparse_vectors[\"\"] — supply only one to avoid ambiguity"
65 );
66 }
67 result.insert(name, sv);
68 }
69 }
70
71 Ok(Some(result))
72}
73
74#[utoipa::path(
76 post,
77 path = "/collections/{name}/points",
78 tag = "points",
79 params(
80 ("name" = String, Path, description = "Collection name")
81 ),
82 request_body = UpsertPointsRequest,
83 responses(
84 (status = 200, description = "Points upserted", body = Object),
85 (status = 404, description = "Collection not found", body = ErrorResponse),
86 (status = 400, description = "Invalid request", body = ErrorResponse)
87 )
88)]
89pub async fn upsert_points(
90 State(state): State<Arc<AppState>>,
91 Path(name): Path<String>,
92 Json(req): Json<UpsertPointsRequest>,
93) -> impl IntoResponse {
94 let collection = match get_vector_collection_or_404(&state, &name) {
95 Ok(c) => c,
96 Err(resp) => return resp,
97 };
98
99 let points = match build_points_from_request(req) {
100 Ok(p) => p,
101 Err(e) => {
102 return error_response(StatusCode::BAD_REQUEST, e);
103 }
104 };
105
106 let result = tokio::task::spawn_blocking(move || collection.upsert_bulk(&points)).await;
109
110 match result {
111 Ok(Ok(inserted)) => {
112 state.db.notify_upsert(&name, inserted);
113 Json(serde_json::json!({
114 "message": "Points upserted",
115 "count": inserted
116 }))
117 .into_response()
118 }
119 Ok(Err(e)) => auto_core_error_response(&e),
120 Err(e) => error_response(
121 StatusCode::INTERNAL_SERVER_ERROR,
122 format!("Task panicked: {e}"),
123 ),
124 }
125}
126
127fn build_points_from_request(req: UpsertPointsRequest) -> Result<Vec<Point>, String> {
129 let mut points: Vec<Point> = Vec::with_capacity(req.points.len());
130 for p in req.points {
131 let sparse = convert_sparse_inputs(p.sparse_vector, p.sparse_vectors)?;
132 let mut point = Point::new(p.id, p.vector, p.payload);
133 point.sparse_vectors = sparse;
134 points.push(point);
135 }
136 Ok(points)
137}
138
139#[utoipa::path(
141 get,
142 path = "/collections/{name}/points/{id}",
143 tag = "points",
144 params(
145 ("name" = String, Path, description = "Collection name"),
146 ("id" = u64, Path, description = "Point ID")
147 ),
148 responses(
149 (status = 200, description = "Point found", body = Object),
150 (status = 404, description = "Point or collection not found", body = ErrorResponse)
151 )
152)]
153pub async fn get_point(
154 State(state): State<Arc<AppState>>,
155 Path((name, id)): Path<(String, u64)>,
156) -> impl IntoResponse {
157 let collection = match get_vector_collection_or_404(&state, &name) {
158 Ok(c) => c,
159 Err(resp) => return resp,
160 };
161
162 let points = collection.get(&[id]);
163
164 match points.into_iter().next().flatten() {
165 Some(point) => Json(serde_json::json!({
166 "id": point.id,
167 "vector": point.vector,
168 "payload": point.payload
169 }))
170 .into_response(),
171 None => auto_core_error_response(&velesdb_core::Error::PointNotFound(id)),
175 }
176}
177
178#[utoipa::path(
180 delete,
181 path = "/collections/{name}/points/{id}",
182 tag = "points",
183 params(
184 ("name" = String, Path, description = "Collection name"),
185 ("id" = u64, Path, description = "Point ID")
186 ),
187 responses(
188 (status = 200, description = "Point deleted", body = Object),
189 (status = 404, description = "Point or collection not found", body = ErrorResponse)
190 )
191)]
192pub async fn delete_point(
193 State(state): State<Arc<AppState>>,
194 Path((name, id)): Path<(String, u64)>,
195) -> impl IntoResponse {
196 let collection = match get_vector_collection_or_404(&state, &name) {
197 Ok(c) => c,
198 Err(resp) => return resp,
199 };
200
201 match collection.delete(&[id]) {
202 Ok(()) => Json(serde_json::json!({
203 "message": "Point deleted",
204 "id": id
205 }))
206 .into_response(),
207 Err(e) => auto_core_error_response(&e),
208 }
209}
210
211const MAX_SCROLL_BATCH_SIZE: u32 = 10_000;
213
214#[utoipa::path(
216 post,
217 path = "/collections/{name}/points/scroll",
218 tag = "points",
219 params(("name" = String, Path, description = "Collection name")),
220 request_body = ScrollRequest,
221 responses(
222 (status = 200, description = "Scroll batch", body = ScrollResponse),
223 (status = 400, description = "Invalid request", body = ErrorResponse),
224 (status = 404, description = "Collection not found", body = ErrorResponse)
225 )
226)]
227pub async fn scroll_points(
228 State(state): State<Arc<AppState>>,
229 Path(name): Path<String>,
230 Json(req): Json<ScrollRequest>,
231) -> impl IntoResponse {
232 if req.batch_size == 0 || req.batch_size > MAX_SCROLL_BATCH_SIZE {
233 return error_response(
234 StatusCode::BAD_REQUEST,
235 "batch_size must be between 1 and 10000".to_string(),
236 );
237 }
238
239 let collection = match get_vector_collection_or_404(&state, &name) {
240 Ok(c) => c,
241 Err(resp) => return resp,
242 };
243
244 let filter = match parse_scroll_filter(&req.filter) {
245 Ok(f) => f,
246 Err(resp) => return resp,
247 };
248
249 let batch_size = req.batch_size as usize;
250 let cursor = req.cursor;
251
252 let result = tokio::task::spawn_blocking(move || {
254 collection.scroll_batch(cursor, batch_size, filter.as_ref())
255 })
256 .await;
257
258 match result {
259 Ok(Ok(batch)) => build_scroll_response(batch),
260 Ok(Err(e)) => auto_core_error_response(&e),
261 Err(e) => error_response(
262 StatusCode::INTERNAL_SERVER_ERROR,
263 format!("Task panicked: {e}"),
264 ),
265 }
266}
267
268#[allow(clippy::result_large_err)]
270fn parse_scroll_filter(
271 filter_json: &Option<serde_json::Value>,
272) -> Result<Option<velesdb_core::Filter>, axum::response::Response> {
273 let Some(ref json) = filter_json else {
274 return Ok(None);
275 };
276 serde_json::from_value::<velesdb_core::Filter>(json.clone())
277 .map(Some)
278 .map_err(|e| error_response(StatusCode::BAD_REQUEST, format!("Invalid filter: {e}")))
279}
280
281fn build_scroll_response(batch: velesdb_core::ScrollBatch) -> axum::response::Response {
283 let points: Vec<ScrollPoint> = batch
284 .points
285 .into_iter()
286 .map(|p| ScrollPoint {
287 id: p.id,
288 vector: p.vector,
289 payload: p.payload,
290 })
291 .collect();
292 Json(ScrollResponse {
293 next_cursor: batch.next_cursor,
294 points,
295 })
296 .into_response()
297}
298
299const MAX_BULK_DELETE_SIZE: usize = 10_000;
301
302#[derive(serde::Deserialize, utoipa::ToSchema)]
304pub struct BulkDeleteRequest {
305 pub ids: Vec<u64>,
307}
308
309#[utoipa::path(
330 post,
331 path = "/collections/{name}/points/delete",
332 tag = "points",
333 params(
334 ("name" = String, Path, description = "Collection name")
335 ),
336 request_body = BulkDeleteRequest,
337 responses(
338 (status = 200, description = "Points deleted", body = Object),
339 (status = 400, description = "Batch too large", body = ErrorResponse),
340 (status = 404, description = "Collection not found", body = ErrorResponse),
341 (status = 500, description = "Delete failed", body = ErrorResponse)
342 )
343)]
344pub async fn bulk_delete_points(
345 State(state): State<Arc<AppState>>,
346 Path(name): Path<String>,
347 Json(req): Json<BulkDeleteRequest>,
348) -> impl IntoResponse {
349 if req.ids.is_empty() {
350 return Json(serde_json::json!({
351 "message": "No points to delete",
352 "collection": name,
353 "deleted_count": 0
354 }))
355 .into_response();
356 }
357
358 if req.ids.len() > MAX_BULK_DELETE_SIZE {
359 return error_response(
360 StatusCode::BAD_REQUEST,
361 format!(
362 "Batch too large: {} IDs (max {MAX_BULK_DELETE_SIZE})",
363 req.ids.len()
364 ),
365 );
366 }
367
368 let collection = match get_vector_collection_or_404(&state, &name) {
369 Ok(c) => c,
370 Err(resp) => return resp,
371 };
372
373 let ids = req.ids;
374 let count = ids.len();
375 let coll_name = name.clone();
376
377 let result = tokio::task::spawn_blocking(move || collection.delete(&ids)).await;
378 match result {
379 Ok(Ok(())) => Json(serde_json::json!({
380 "message": "Points deleted",
381 "collection": coll_name,
382 "deleted_count": count
383 }))
384 .into_response(),
385 Ok(Err(e)) => auto_core_error_response(&e),
386 Err(join_err) => error_response(
387 StatusCode::INTERNAL_SERVER_ERROR,
388 format!("bulk_delete task panicked: {join_err}"),
389 ),
390 }
391}