1pub mod relations;
4pub mod streaming;
5
6pub use relations::{get_point_relations, relate_points, set_point_ttl, unrelate_points};
7pub use streaming::{
8 __path_stream_insert, __path_stream_upsert_points, stream_insert, stream_upsert_points,
9};
10
11use axum::{
12 extract::{Path, State},
13 http::StatusCode,
14 response::IntoResponse,
15 Json,
16};
17use std::sync::Arc;
18
19use crate::types::{
20 ErrorResponse, ScrollPoint, ScrollRequest, ScrollResponse, SparseVectorInput,
21 UpsertPointsRequest,
22};
23use crate::AppState;
24use velesdb_core::Point;
25
26use crate::handlers::helpers::{
27 auto_core_error_response, error_response, get_vector_collection_or_404,
28};
29
30use velesdb_core::index::sparse::SparseVector;
31
32fn convert_sparse_inputs(
37 sparse_vector: Option<SparseVectorInput>,
38 sparse_vectors: Option<std::collections::BTreeMap<String, SparseVectorInput>>,
39) -> Result<Option<std::collections::BTreeMap<String, SparseVector>>, String> {
40 let has_single = sparse_vector.is_some();
41 let has_named = sparse_vectors.as_ref().is_some_and(|m| !m.is_empty());
42
43 if !has_single && !has_named {
44 return Ok(None);
45 }
46
47 let mut result = std::collections::BTreeMap::new();
48
49 if let Some(sv_input) = sparse_vector {
51 let sv = sv_input.into_sparse_vector()?;
52 result.insert(String::new(), sv);
53 }
54
55 if let Some(named) = sparse_vectors {
57 merge_named_sparse_vectors(named, &mut result)?;
58 }
59
60 Ok(Some(result))
61}
62
63fn merge_named_sparse_vectors(
69 named: std::collections::BTreeMap<String, SparseVectorInput>,
70 result: &mut std::collections::BTreeMap<String, SparseVector>,
71) -> Result<(), String> {
72 for (name, sv_input) in named {
73 let sv = sv_input
74 .into_sparse_vector()
75 .map_err(|e| format!("sparse_vectors['{name}']: {e}"))?;
76 if name.is_empty() && result.contains_key("") {
77 tracing::debug!(
78 "sparse_vector (default \"\") is being overwritten by \
79 sparse_vectors[\"\"] — supply only one to avoid ambiguity"
80 );
81 }
82 result.insert(name, sv);
83 }
84 Ok(())
85}
86
87#[utoipa::path(
89 post,
90 path = "/collections/{name}/points",
91 tag = "points",
92 params(
93 ("name" = String, Path, description = "Collection name")
94 ),
95 request_body = UpsertPointsRequest,
96 responses(
97 (status = 200, description = "Points upserted", body = Object),
98 (status = 404, description = "Collection not found", body = ErrorResponse),
99 (status = 400, description = "Invalid request", body = ErrorResponse)
100 )
101)]
102pub async fn upsert_points(
103 State(state): State<Arc<AppState>>,
104 Path(name): Path<String>,
105 Json(req): Json<UpsertPointsRequest>,
106) -> impl IntoResponse {
107 let collection = match get_vector_collection_or_404(&state, &name) {
108 Ok(c) => c,
109 Err(resp) => return resp,
110 };
111
112 let points = match build_points_from_request(req) {
113 Ok(p) => p,
114 Err(e) => {
115 return error_response(StatusCode::BAD_REQUEST, e);
116 }
117 };
118
119 let result = tokio::task::spawn_blocking(move || collection.upsert_bulk(&points)).await;
122
123 match result {
124 Ok(Ok(inserted)) => {
125 state.db.notify_upsert(&name, inserted);
126 Json(serde_json::json!({
127 "message": "Points upserted",
128 "count": inserted
129 }))
130 .into_response()
131 }
132 Ok(Err(e)) => auto_core_error_response(&e),
133 Err(e) => error_response(
134 StatusCode::INTERNAL_SERVER_ERROR,
135 format!("Task panicked: {e}"),
136 ),
137 }
138}
139
140fn build_points_from_request(req: UpsertPointsRequest) -> Result<Vec<Point>, String> {
142 let mut points: Vec<Point> = Vec::with_capacity(req.points.len());
143 for p in req.points {
144 let sparse = convert_sparse_inputs(p.sparse_vector, p.sparse_vectors)?;
145 let mut point = Point::new(p.id, p.vector, p.payload);
146 point.sparse_vectors = sparse;
147 points.push(point);
148 }
149 Ok(points)
150}
151
152#[utoipa::path(
154 get,
155 path = "/collections/{name}/points/{id}",
156 tag = "points",
157 params(
158 ("name" = String, Path, description = "Collection name"),
159 ("id" = u64, Path, description = "Point ID")
160 ),
161 responses(
162 (status = 200, description = "Point found", body = Object),
163 (status = 404, description = "Point or collection not found", body = ErrorResponse)
164 )
165)]
166pub async fn get_point(
167 State(state): State<Arc<AppState>>,
168 Path((name, id)): Path<(String, u64)>,
169) -> impl IntoResponse {
170 let collection = match get_vector_collection_or_404(&state, &name) {
171 Ok(c) => c,
172 Err(resp) => return resp,
173 };
174
175 let points = collection.get(&[id]);
176
177 match points.into_iter().next().flatten() {
178 Some(point) => Json(serde_json::json!({
179 "id": point.id,
180 "vector": point.vector,
181 "payload": point.payload
182 }))
183 .into_response(),
184 None => auto_core_error_response(&velesdb_core::Error::PointNotFound(id)),
188 }
189}
190
191#[utoipa::path(
193 delete,
194 path = "/collections/{name}/points/{id}",
195 tag = "points",
196 params(
197 ("name" = String, Path, description = "Collection name"),
198 ("id" = u64, Path, description = "Point ID")
199 ),
200 responses(
201 (status = 200, description = "Point deleted", body = Object),
202 (status = 404, description = "Point or collection not found", body = ErrorResponse)
203 )
204)]
205pub async fn delete_point(
206 State(state): State<Arc<AppState>>,
207 Path((name, id)): Path<(String, u64)>,
208) -> impl IntoResponse {
209 let collection = match get_vector_collection_or_404(&state, &name) {
210 Ok(c) => c,
211 Err(resp) => return resp,
212 };
213
214 match collection.delete(&[id]) {
215 Ok(()) => Json(serde_json::json!({
216 "message": "Point deleted",
217 "id": id
218 }))
219 .into_response(),
220 Err(e) => auto_core_error_response(&e),
221 }
222}
223
224const MAX_SCROLL_BATCH_SIZE: u32 = 10_000;
226
227#[utoipa::path(
229 post,
230 path = "/collections/{name}/points/scroll",
231 tag = "points",
232 params(("name" = String, Path, description = "Collection name")),
233 request_body = ScrollRequest,
234 responses(
235 (status = 200, description = "Scroll batch", body = ScrollResponse),
236 (status = 400, description = "Invalid request", body = ErrorResponse),
237 (status = 404, description = "Collection not found", body = ErrorResponse)
238 )
239)]
240pub async fn scroll_points(
241 State(state): State<Arc<AppState>>,
242 Path(name): Path<String>,
243 Json(req): Json<ScrollRequest>,
244) -> impl IntoResponse {
245 if req.batch_size == 0 || req.batch_size > MAX_SCROLL_BATCH_SIZE {
246 return error_response(
247 StatusCode::BAD_REQUEST,
248 "batch_size must be between 1 and 10000".to_string(),
249 );
250 }
251
252 let collection = match get_vector_collection_or_404(&state, &name) {
253 Ok(c) => c,
254 Err(resp) => return resp,
255 };
256
257 let filter = match parse_scroll_filter(&req.filter) {
258 Ok(f) => f,
259 Err(resp) => return resp,
260 };
261
262 let batch_size = req.batch_size as usize;
263 let cursor = req.cursor;
264
265 let result = tokio::task::spawn_blocking(move || {
267 collection.scroll_batch(cursor, batch_size, filter.as_ref())
268 })
269 .await;
270
271 match result {
272 Ok(Ok(batch)) => build_scroll_response(batch),
273 Ok(Err(e)) => auto_core_error_response(&e),
274 Err(e) => error_response(
275 StatusCode::INTERNAL_SERVER_ERROR,
276 format!("Task panicked: {e}"),
277 ),
278 }
279}
280
281#[allow(clippy::result_large_err)]
283fn parse_scroll_filter(
284 filter_json: &Option<serde_json::Value>,
285) -> Result<Option<velesdb_core::Filter>, axum::response::Response> {
286 let Some(ref json) = filter_json else {
287 return Ok(None);
288 };
289 serde_json::from_value::<velesdb_core::Filter>(json.clone())
290 .map(Some)
291 .map_err(|e| error_response(StatusCode::BAD_REQUEST, format!("Invalid filter: {e}")))
292}
293
294fn build_scroll_response(batch: velesdb_core::ScrollBatch) -> axum::response::Response {
296 let points: Vec<ScrollPoint> = batch
297 .points
298 .into_iter()
299 .map(|p| ScrollPoint {
300 id: p.id,
301 vector: p.vector,
302 payload: p.payload,
303 })
304 .collect();
305 Json(ScrollResponse {
306 next_cursor: batch.next_cursor,
307 points,
308 })
309 .into_response()
310}
311
312const MAX_BULK_DELETE_SIZE: usize = 10_000;
314
315#[derive(serde::Deserialize, utoipa::ToSchema)]
317pub struct BulkDeleteRequest {
318 pub ids: Vec<u64>,
320}
321
322#[utoipa::path(
343 post,
344 path = "/collections/{name}/points/delete",
345 tag = "points",
346 params(
347 ("name" = String, Path, description = "Collection name")
348 ),
349 request_body = BulkDeleteRequest,
350 responses(
351 (status = 200, description = "Points deleted", body = Object),
352 (status = 400, description = "Batch too large", body = ErrorResponse),
353 (status = 404, description = "Collection not found", body = ErrorResponse),
354 (status = 500, description = "Delete failed", body = ErrorResponse)
355 )
356)]
357pub async fn bulk_delete_points(
358 State(state): State<Arc<AppState>>,
359 Path(name): Path<String>,
360 Json(req): Json<BulkDeleteRequest>,
361) -> impl IntoResponse {
362 if req.ids.is_empty() {
363 return Json(serde_json::json!({
364 "message": "No points to delete",
365 "collection": name,
366 "deleted_count": 0
367 }))
368 .into_response();
369 }
370
371 if req.ids.len() > MAX_BULK_DELETE_SIZE {
372 return error_response(
373 StatusCode::BAD_REQUEST,
374 format!(
375 "Batch too large: {} IDs (max {MAX_BULK_DELETE_SIZE})",
376 req.ids.len()
377 ),
378 );
379 }
380
381 let collection = match get_vector_collection_or_404(&state, &name) {
382 Ok(c) => c,
383 Err(resp) => return resp,
384 };
385
386 let ids = req.ids;
387 let count = ids.len();
388 let coll_name = name.clone();
389
390 let result = tokio::task::spawn_blocking(move || collection.delete(&ids)).await;
391 match result {
392 Ok(Ok(())) => Json(serde_json::json!({
393 "message": "Points deleted",
394 "collection": coll_name,
395 "deleted_count": count
396 }))
397 .into_response(),
398 Ok(Err(e)) => auto_core_error_response(&e),
399 Err(join_err) => error_response(
400 StatusCode::INTERNAL_SERVER_ERROR,
401 format!("bulk_delete task panicked: {join_err}"),
402 ),
403 }
404}