1use actix_web::{web, App, HttpServer, HttpResponse, Result as ActixResult};
2use actix_cors::Cors;
3use actix_files::Files;
4use actix_multipart::Multipart;
5use chrono::Utc;
6use vectx_core::{CollectionConfig, Collection, Distance, Point, PointId, Vector, PayloadFilter, FilterCondition, Filter, MultiVector};
7use vectx_storage::StorageManager;
8use serde::{Deserialize, Deserializer, Serialize};
9use std::sync::Arc;
10use std::path::Path;
11use std::collections::HashMap;
12use std::time::Instant;
13use futures_util::StreamExt;
14
15fn qdrant_response<T: Serialize>(result: T, start_time: Instant) -> HttpResponse {
17 let elapsed = start_time.elapsed().as_secs_f64();
18 HttpResponse::Ok().json(serde_json::json!({
19 "result": result,
20 "status": "ok",
21 "time": elapsed
22 }))
23}
24
25fn qdrant_error(error: &str, start_time: Instant) -> HttpResponse {
27 let elapsed = start_time.elapsed().as_secs_f64();
28 HttpResponse::BadRequest().json(serde_json::json!({
29 "status": {
30 "error": error
31 },
32 "time": elapsed
33 }))
34}
35
36fn qdrant_not_found(error: &str, start_time: Instant) -> HttpResponse {
38 let elapsed = start_time.elapsed().as_secs_f64();
39 HttpResponse::NotFound().json(serde_json::json!({
40 "status": {
41 "error": error
42 },
43 "time": elapsed
44 }))
45}
46
/// Default directory of static dashboard assets, used by `RestApi::start`.
const STATIC_DIR: &str = "./static";
/// URL prefix under which the static dashboard files are mounted.
const DASHBOARD_PATH: &str = "/dashboard";
50
/// JSON body accepted by the `create_collection` handler.
#[derive(Deserialize)]
struct CreateCollectionRequest {
    /// Dense vector configuration. Parsed by `deserialize_vectors_optional`, which
    /// accepts either a single config object or a map of named configs.
    #[serde(default, deserialize_with = "deserialize_vectors_optional")]
    vectors: Option<VectorConfig>,
    /// Forwarded to `CollectionConfig::use_hnsw`; defaults to `false` when absent.
    #[serde(default)]
    use_hnsw: bool,
    /// Forwarded to `CollectionConfig::enable_bm25`; defaults to `false` when absent.
    #[serde(default)]
    enable_bm25: bool,
    /// Raw sparse-vector configuration. `create_collection` only checks whether it
    /// is present: presence enables BM25 and allows omitting `vectors`.
    #[serde(default)]
    sparse_vectors: Option<serde_json::Value>,
}
64
/// Configuration of a single dense vector space as sent by the client.
///
/// Only `size` and `distance` are read by the code visible in this file; the
/// remaining fields are accepted so that requests carrying them still
/// deserialize (NOTE(review): presumably kept for Qdrant client compatibility —
/// confirm).
#[derive(Deserialize, Clone)]
struct VectorConfig {
    /// Dimensionality of the vectors.
    size: usize,
    /// Name of the distance metric; interpreted by `create_collection`.
    distance: Option<String>,
    /// Accepted but not read in the visible code.
    #[serde(default)]
    on_disk: Option<bool>,
    /// Accepted but not read in the visible code.
    #[serde(default)]
    hnsw_config: Option<serde_json::Value>,
    /// Accepted but not read in the visible code.
    #[serde(default)]
    quantization_config: Option<serde_json::Value>,
    /// Accepted but not read in the visible code.
    #[serde(default)]
    multivector_config: Option<serde_json::Value>,
    /// Accepted but not read in the visible code.
    #[serde(default)]
    datatype: Option<String>,
}
81
82fn deserialize_vectors_optional<'de, D>(deserializer: D) -> Result<Option<VectorConfig>, D::Error>
84where
85 D: Deserializer<'de>,
86{
87 let value = Option::<serde_json::Value>::deserialize(deserializer)?;
88
89 let Some(value) = value else {
90 return Ok(None);
91 };
92
93 if let Ok(config) = serde_json::from_value::<VectorConfig>(value.clone()) {
95 return Ok(Some(config));
96 }
97
98 if let Ok(named) = serde_json::from_value::<HashMap<String, VectorConfig>>(value.clone()) {
100 if let Some(config) = named.into_values().next() {
102 return Ok(Some(config));
103 }
104 }
105
106 Err(serde::de::Error::custom("Invalid vectors configuration: expected either {\"size\": N, \"distance\": \"...\"} or {\"name\": {\"size\": N, ...}}"))
107}
108
/// Serializable summary of a collection.
///
/// Not constructed by any code visible in this file, hence the `dead_code`
/// allowance.
#[allow(dead_code)]
#[derive(Serialize)]
struct CollectionInfo {
    /// Collection name.
    name: String,
    /// Dense vector parameters of the collection.
    vectors: VectorConfigResponse,
    /// Number of points stored in the collection.
    points_count: usize,
}
117
/// Serializable description of a dense vector space (used by `CollectionInfo`).
///
/// Not constructed by any code visible in this file, hence the `dead_code`
/// allowance.
#[allow(dead_code)]
#[derive(Serialize)]
struct VectorConfigResponse {
    /// Dimensionality of the vectors.
    size: usize,
    /// Name of the distance metric.
    distance: String,
}
124
/// JSON body accepted by the `upsert_points` handler.
#[derive(Deserialize)]
struct UpsertPointsRequest {
    /// Points to insert or update.
    points: Vec<PointRequest>,
}
129
/// A named sparse vector extracted from a point's `vector` object.
struct ParsedSparseVector {
    /// Key under which the sparse vector appeared in the request.
    name: String,
    /// Entries of the sparse vector's `indices` array.
    indices: Vec<u32>,
    /// Entries of the sparse vector's `values` array.
    values: Vec<f32>,
}
139
/// Result of parsing a point's `vector` field in any of its supported shapes.
struct ParsedVector {
    /// Dense vector used to build the point. For multivector input this is the
    /// first row; for an object without any dense entry it is the `[0.0]`
    /// placeholder set by the parser.
    primary: Vec<f32>,
    /// All rows when the input was an array of arrays, otherwise `None`.
    multivector: Option<Vec<Vec<f32>>>,
    /// Sparse vectors found in an object-shaped `vector` field.
    sparse_vectors: Vec<ParsedSparseVector>,
}
149
/// A single point inside an upsert request.
#[derive(Deserialize)]
struct PointRequest {
    /// Point identifier; `upsert_points` accepts a JSON string or an unsigned
    /// integer and rejects anything else.
    id: serde_json::Value,
    /// Optional vector data, parsed by `deserialize_vector_optional`.
    #[serde(default, deserialize_with = "deserialize_vector_optional")]
    vector: Option<ParsedVector>,
    /// Optional arbitrary JSON payload stored with the point.
    payload: Option<serde_json::Value>,
}
158
159fn deserialize_vector<'de, D>(deserializer: D) -> Result<ParsedVector, D::Error>
164where
165 D: Deserializer<'de>,
166{
167 let value = serde_json::Value::deserialize(deserializer)?;
168
169 fn parse_simple_vector(arr: &[serde_json::Value]) -> Result<Vec<f32>, String> {
170 arr.iter()
171 .map(|v| v.as_f64().map(|f| f as f32).ok_or_else(|| "expected f32".to_string()))
172 .collect()
173 }
174
175 fn parse_multivector(arr: &[serde_json::Value]) -> Result<Vec<Vec<f32>>, String> {
176 arr.iter()
177 .map(|sub| {
178 if let serde_json::Value::Array(sub_arr) = sub {
179 parse_simple_vector(sub_arr)
180 } else {
181 Err("expected array of arrays for multivector".to_string())
182 }
183 })
184 .collect()
185 }
186
187 match &value {
188 serde_json::Value::Array(arr) if !arr.is_empty() => {
190 match arr.first() {
191 Some(serde_json::Value::Number(_)) => {
193 let primary = parse_simple_vector(arr).map_err(serde::de::Error::custom)?;
194 Ok(ParsedVector { primary, multivector: None, sparse_vectors: Vec::new() })
195 }
196 Some(serde_json::Value::Array(_)) => {
198 let multivec = parse_multivector(arr).map_err(serde::de::Error::custom)?;
199 let primary = multivec.first().cloned().unwrap_or_default();
200 Ok(ParsedVector { primary, multivector: Some(multivec), sparse_vectors: Vec::new() })
201 }
202 _ => Err(serde::de::Error::custom("invalid vector format: expected number or array"))
203 }
204 }
205 serde_json::Value::Array(_) => {
207 Err(serde::de::Error::custom("vector cannot be empty"))
208 }
209 serde_json::Value::Object(obj) => {
212 let mut sparse_vectors = Vec::new();
213 let mut primary = vec![0.0];
214 let mut multivector = None;
215
216 for (name, vec_value) in obj.iter() {
217 match vec_value {
218 serde_json::Value::Array(arr) if !arr.is_empty() => {
220 match arr.first() {
221 Some(serde_json::Value::Number(_)) => {
222 primary = parse_simple_vector(arr).map_err(serde::de::Error::custom)?;
223 }
224 Some(serde_json::Value::Array(_)) => {
225 let multivec = parse_multivector(arr).map_err(serde::de::Error::custom)?;
227 primary = multivec.first().cloned().unwrap_or_default();
228 multivector = Some(multivec);
229 }
230 _ => {}
231 }
232 }
233 serde_json::Value::Object(sparse_obj) => {
235 if let (Some(indices_arr), Some(values_arr)) = (
236 sparse_obj.get("indices").and_then(|i| i.as_array()),
237 sparse_obj.get("values").and_then(|v| v.as_array())
238 ) {
239 let indices: Vec<u32> = indices_arr.iter()
240 .filter_map(|i| i.as_u64().map(|n| n as u32))
241 .collect();
242 let values: Vec<f32> = values_arr.iter()
243 .filter_map(|v| v.as_f64().map(|f| f as f32))
244 .collect();
245
246 if !indices.is_empty() && !values.is_empty() {
247 sparse_vectors.push(ParsedSparseVector {
248 name: name.clone(),
249 indices,
250 values,
251 });
252 }
253 }
254 }
255 serde_json::Value::Array(_) => {}
257 _ => {}
258 }
259 }
260
261 Ok(ParsedVector { primary, multivector, sparse_vectors })
262 }
263 _ => Err(serde::de::Error::custom("vector must be an array or object")),
264 }
265}
266
267fn deserialize_vector_optional<'de, D>(deserializer: D) -> Result<Option<ParsedVector>, D::Error>
269where
270 D: Deserializer<'de>,
271{
272 let value = Option::<serde_json::Value>::deserialize(deserializer)?;
273
274 let Some(value) = value else {
275 return Ok(None);
276 };
277
278 fn parse_simple_vector(arr: &[serde_json::Value]) -> Result<Vec<f32>, String> {
280 arr.iter()
281 .map(|v| v.as_f64().map(|f| f as f32).ok_or_else(|| "expected f32".to_string()))
282 .collect()
283 }
284
285 fn parse_multivector(arr: &[serde_json::Value]) -> Result<Vec<Vec<f32>>, String> {
286 arr.iter()
287 .map(|sub| {
288 if let serde_json::Value::Array(sub_arr) = sub {
289 parse_simple_vector(sub_arr)
290 } else {
291 Err("expected array of arrays for multivector".to_string())
292 }
293 })
294 .collect()
295 }
296
297 match &value {
298 serde_json::Value::Array(arr) if !arr.is_empty() => {
299 match arr.first() {
300 Some(serde_json::Value::Number(_)) => {
301 let primary = parse_simple_vector(arr).map_err(serde::de::Error::custom)?;
302 Ok(Some(ParsedVector { primary, multivector: None, sparse_vectors: Vec::new() }))
303 }
304 Some(serde_json::Value::Array(_)) => {
305 let multivec = parse_multivector(arr).map_err(serde::de::Error::custom)?;
306 let primary = multivec.first().cloned().unwrap_or_default();
307 Ok(Some(ParsedVector { primary, multivector: Some(multivec), sparse_vectors: Vec::new() }))
308 }
309 _ => Err(serde::de::Error::custom("invalid vector format"))
310 }
311 }
312 serde_json::Value::Array(_) => Ok(None), serde_json::Value::Object(obj) => {
314 let mut sparse_vectors = Vec::new();
315 let mut primary = vec![0.0];
316 let mut multivector = None;
317
318 for (name, vec_value) in obj.iter() {
319 match vec_value {
320 serde_json::Value::Array(arr) if !arr.is_empty() => {
321 match arr.first() {
322 Some(serde_json::Value::Number(_)) => {
323 primary = parse_simple_vector(arr).map_err(serde::de::Error::custom)?;
324 }
325 Some(serde_json::Value::Array(_)) => {
326 let multivec = parse_multivector(arr).map_err(serde::de::Error::custom)?;
327 primary = multivec.first().cloned().unwrap_or_default();
328 multivector = Some(multivec);
329 }
330 _ => {}
331 }
332 }
333 serde_json::Value::Object(sparse_obj) => {
334 if let (Some(indices_arr), Some(values_arr)) = (
335 sparse_obj.get("indices").and_then(|i| i.as_array()),
336 sparse_obj.get("values").and_then(|v| v.as_array())
337 ) {
338 let indices: Vec<u32> = indices_arr.iter()
339 .filter_map(|i| i.as_u64().map(|n| n as u32))
340 .collect();
341 let values: Vec<f32> = values_arr.iter()
342 .filter_map(|v| v.as_f64().map(|f| f as f32))
343 .collect();
344
345 if !indices.is_empty() && !values.is_empty() {
346 sparse_vectors.push(ParsedSparseVector {
347 name: name.clone(),
348 indices,
349 values,
350 });
351 }
352 }
353 }
354 _ => {}
355 }
356 }
357
358 Ok(Some(ParsedVector { primary, multivector, sparse_vectors }))
359 }
360 serde_json::Value::Null => Ok(None),
361 _ => Err(serde::de::Error::custom("vector must be an array, object, or null")),
362 }
363}
364
/// JSON body accepted by the `search_points` handler.
#[derive(Deserialize)]
struct SearchRequest {
    /// Dense query vector; used when `text` is absent.
    vector: Option<Vec<f32>>,
    /// Text query; when present it takes precedence over `vector`.
    text: Option<String>,
    /// Maximum number of results; also accepted under the name `top`.
    /// `search_points` falls back to 10 when absent.
    #[serde(alias = "top")]
    limit: Option<usize>,
    /// Raw payload filter, only applied to vector searches.
    filter: Option<serde_json::Value>,
    /// Whether to include payloads in the results (handler default: `true`).
    #[serde(default)]
    with_payload: Option<bool>,
    /// Whether to include vectors in the results (handler default: `false`).
    #[serde(default)]
    with_vector: Option<bool>,
    /// Results scoring below this value are dropped.
    #[serde(default)]
    score_threshold: Option<f32>,
    /// Number of leading results to skip (handler default: 0).
    #[serde(default)]
    offset: Option<usize>,
}
381
/// Serializable search hit.
///
/// Not constructed by any code visible in this file (handlers build their
/// results with `serde_json::json!`), hence the `dead_code` allowance.
#[allow(dead_code)]
#[derive(Serialize)]
struct SearchResult {
    /// Identifier of the matching point.
    id: serde_json::Value,
    /// Score of the match.
    score: f32,
    /// Payload of the matching point, if any.
    payload: Option<serde_json::Value>,
}
389
/// Entry point for the HTTP REST server.
pub struct RestApi;

impl RestApi {
    /// Starts the REST server on `port`, serving the dashboard from `STATIC_DIR`.
    ///
    /// Equivalent to calling `start_with_static_dir` with the default directory.
    pub async fn start(
        storage: Arc<StorageManager>,
        port: u16,
    ) -> std::io::Result<()> {
        Self::start_with_static_dir(storage, port, STATIC_DIR).await
    }

    /// Starts the REST server on `port`, serving the dashboard from `static_dir`.
    ///
    /// The server binds to `0.0.0.0`, shares `storage` with every handler through
    /// `web::Data`, and applies a CORS policy that allows any origin, method and
    /// header. The returned future resolves when the server stops.
    ///
    /// # Errors
    ///
    /// Returns the I/O error raised while binding the port or running the server.
    pub async fn start_with_static_dir(
        storage: Arc<StorageManager>,
        port: u16,
        static_dir: &str,
    ) -> std::io::Result<()> {
        // Owned copy so the `move` factory closure below does not borrow `static_dir`.
        let static_folder = static_dir.to_string();

        HttpServer::new(move || {
            // Fully permissive CORS; preflight responses may be cached for an hour.
            let cors = Cors::default()
                .allow_any_origin()
                .allow_any_method()
                .allow_any_header()
                .max_age(3600);

            let mut app = App::new()
                .wrap(cors)
                .app_data(web::Data::new(storage.clone()))
                // Service information, health probes and metrics.
                .route("/", web::get().to(root_info))
                .route("/healthz", web::get().to(health_check))
                .route("/livez", web::get().to(livez_check))
                .route("/readyz", web::get().to(readyz_check))
                .route("/metrics", web::get().to(metrics_endpoint))
                // Collection management.
                .route("/collections", web::get().to(list_collections))
                .route("/collections/{name}", web::get().to(get_collection))
                .route("/collections/{name}", web::put().to(create_collection))
                .route("/collections/{name}", web::delete().to(delete_collection))
                // Point upsert, scroll, delete, search, query and lookup.
                .route("/collections/{name}/points", web::put().to(upsert_points))
                .route("/collections/{name}/points/scroll", web::post().to(scroll_points))
                .route("/collections/{name}/points/delete", web::post().to(delete_points_by_filter))
                .route("/collections/{name}/points/search", web::post().to(search_points))
                .route("/collections/{name}/points/query", web::post().to(query_points))
                .route("/collections/{name}/points/{id}", web::get().to(get_point))
                .route("/collections/{name}/points/{id}", web::delete().to(delete_point))
                .route("/collections/{name}/exists", web::get().to(collection_exists))
                // Aliases.
                .route("/aliases", web::get().to(list_aliases))
                .route("/collections/aliases", web::post().to(update_aliases))
                .route("/collections/{name}/aliases", web::get().to(list_collection_aliases))
                // Cluster and telemetry information.
                .route("/cluster", web::get().to(cluster_info))
                .route("/collections/{name}/cluster", web::get().to(collection_cluster_info))
                .route("/telemetry", web::get().to(telemetry_info))
                // Further point operations: retrieval by id, counting, payload and
                // vector updates, batch/grouped search and query, discovery.
                .route("/collections/{name}/points", web::post().to(get_points_by_ids))
                .route("/collections/{name}/points/count", web::post().to(count_points))
                .route("/collections/{name}/points/payload", web::post().to(set_payload))
                .route("/collections/{name}/points/payload", web::put().to(overwrite_payload))
                .route("/collections/{name}/points/payload/delete", web::post().to(delete_payload))
                .route("/collections/{name}/points/payload/clear", web::post().to(clear_payload))
                .route("/collections/{name}/points/vectors", web::put().to(update_vectors))
                .route("/collections/{name}/points/vectors/delete", web::post().to(delete_vectors))
                .route("/collections/{name}/points/batch", web::post().to(batch_update))
                .route("/collections/{name}/points/search/batch", web::post().to(batch_search))
                .route("/collections/{name}/points/search/groups", web::post().to(search_groups))
                .route("/collections/{name}/points/query/batch", web::post().to(batch_query))
                .route("/collections/{name}/points/query/groups", web::post().to(query_groups))
                .route("/collections/{name}/points/discover", web::post().to(discover_points))
                .route("/collections/{name}/points/discover/batch", web::post().to(discover_batch))
                .route("/collections/{name}/facet", web::post().to(facet_counts))
                // Payload field indexes.
                .route("/collections/{name}/index", web::put().to(create_field_index))
                .route("/collections/{name}/index/{field_name}", web::delete().to(delete_field_index))
                // Recommendations.
                .route("/collections/{name}/points/recommend", web::post().to(recommend_points))
                // Per-collection snapshots.
                .route("/collections/{name}/snapshots", web::get().to(list_snapshots))
                .route("/collections/{name}/snapshots", web::post().to(create_snapshot))
                .route("/collections/{name}/snapshots/upload", web::post().to(upload_snapshot))
                .route("/collections/{name}/snapshots/recover", web::put().to(recover_snapshot))
                .route("/collections/{name}/snapshots/{snapshot_name}", web::get().to(get_snapshot))
                .route("/collections/{name}/snapshots/{snapshot_name}", web::delete().to(delete_snapshot))
                // Full-storage snapshots.
                .route("/snapshots", web::get().to(list_all_snapshots))
                .route("/snapshots", web::post().to(create_full_snapshot))
                .route("/snapshots/{snapshot_name}", web::get().to(get_full_snapshot))
                .route("/snapshots/{snapshot_name}", web::delete().to(delete_full_snapshot))
                // Collection parameter updates.
                .route("/collections/{name}", web::patch().to(update_collection))
                // Issue reporting.
                .route("/issues", web::get().to(get_issues))
                .route("/issues", web::delete().to(clear_issues));

            // Mount the dashboard only when the static directory actually exists,
            // so the API still works without the dashboard assets.
            let static_path = Path::new(&static_folder);
            if static_path.exists() && static_path.is_dir() {
                app = app.service(
                    Files::new(DASHBOARD_PATH, static_folder.clone())
                        .index_file("index.html")
                        .use_last_modified(true)
                );
            }

            app
        })
        .bind(("0.0.0.0", port))?
        .run()
        .await
    }
}
500
501async fn root_info() -> ActixResult<HttpResponse> {
502 Ok(HttpResponse::Ok().json(serde_json::json!({
503 "title": "vectx - vector search engine",
504 "version": "0.2.1",
505 "commit": ""
506 })))
507}
508
509async fn health_check() -> ActixResult<HttpResponse> {
510 Ok(HttpResponse::Ok().json(serde_json::json!({
511 "title": "vectx",
512 "version": "0.2.1"
513 })))
514}
515
516async fn livez_check() -> ActixResult<HttpResponse> {
518 Ok(HttpResponse::Ok()
519 .content_type("text/plain")
520 .body("healthz check passed"))
521}
522
523async fn readyz_check() -> ActixResult<HttpResponse> {
525 Ok(HttpResponse::Ok()
526 .content_type("text/plain")
527 .body("healthz check passed"))
528}
529
530async fn metrics_endpoint(
532 storage: web::Data<Arc<StorageManager>>,
533) -> ActixResult<HttpResponse> {
534 let collections = storage.list_collections();
535 let collections_count = collections.len();
536
537 let mut total_points = 0u64;
539 for name in &collections {
540 if let Some(collection) = storage.get_collection(name) {
541 total_points += collection.count() as u64;
542 }
543 }
544
545 let metrics = format!(
546 "# HELP app_info information about vectx server\n\
547 # TYPE app_info gauge\n\
548 app_info{{name=\"vectx\",version=\"{}\"}} 1\n\
549 # HELP cluster_enabled is cluster support enabled\n\
550 # TYPE cluster_enabled gauge\n\
551 cluster_enabled 0\n\
552 # HELP collections_total number of collections\n\
553 # TYPE collections_total gauge\n\
554 collections_total {}\n\
555 # HELP points_total total number of points across all collections\n\
556 # TYPE points_total gauge\n\
557 points_total {}\n",
558 env!("CARGO_PKG_VERSION"),
559 collections_count,
560 total_points
561 );
562
563 Ok(HttpResponse::Ok()
564 .content_type("text/plain")
565 .body(metrics))
566}
567
568async fn list_collections(
569 storage: web::Data<Arc<StorageManager>>,
570) -> ActixResult<HttpResponse> {
571 let start_time = Instant::now();
572 let collection_names = storage.list_collections();
573
574 let collections: Vec<serde_json::Value> = collection_names.into_iter()
576 .map(|name| serde_json::json!({ "name": name }))
577 .collect();
578
579 Ok(qdrant_response(serde_json::json!({
580 "collections": collections
581 }), start_time))
582}
583
584async fn get_collection(
585 storage: web::Data<Arc<StorageManager>>,
586 path: web::Path<String>,
587) -> ActixResult<HttpResponse> {
588 let start_time = Instant::now();
589 let name = path.into_inner();
590
591 if let Some(collection) = storage.get_collection(&name) {
592 let distance_str = format!("{:?}", collection.distance());
593 let vector_dim = collection.vector_dim();
594 let points_count = collection.count();
595
596 Ok(qdrant_response(serde_json::json!({
598 "status": "green",
599 "optimizer_status": "ok",
600 "vectors_count": points_count,
601 "indexed_vectors_count": points_count,
602 "points_count": points_count,
603 "segments_count": 1,
604 "config": {
605 "params": {
606 "vectors": {
607 "size": vector_dim,
608 "distance": distance_str
609 },
610 "shard_number": 1,
611 "replication_factor": 1,
612 "write_consistency_factor": 1,
613 "on_disk_payload": true
614 },
615 "hnsw_config": {
616 "m": 16,
617 "ef_construct": 100,
618 "full_scan_threshold": 10000,
619 "max_indexing_threads": 0,
620 "on_disk": false
621 },
622 "optimizer_config": {
623 "deleted_threshold": 0.2,
624 "vacuum_min_vector_number": 1000,
625 "default_segment_number": 0,
626 "indexing_threshold": 10000,
627 "flush_interval_sec": 5,
628 "max_segment_size": null,
629 "memmap_threshold": null,
630 "max_optimization_threads": null
631 },
632 "wal_config": {
633 "wal_capacity_mb": 32,
634 "wal_segments_ahead": 0,
635 "wal_retain_closed": 1
636 },
637 "quantization_config": null
638 },
639 "payload_schema": {}
640 }), start_time))
641 } else {
642 Ok(qdrant_not_found("Collection not found", start_time))
643 }
644}
645
646async fn create_collection(
647 storage: web::Data<Arc<StorageManager>>,
648 path: web::Path<String>,
649 req: web::Json<CreateCollectionRequest>,
650) -> ActixResult<HttpResponse> {
651 let start_time = Instant::now();
652 let name = path.into_inner();
653
654 let (vector_dim, distance) = if let Some(ref vectors) = req.vectors {
657 let dist = match vectors.distance.as_deref() {
658 Some("Cosine") | Some("cosine") => Distance::Cosine,
659 Some("Euclidean") | Some("euclidean") => Distance::Euclidean,
660 Some("Dot") | Some("dot") => Distance::Dot,
661 _ => Distance::Cosine,
662 };
663 (vectors.size, dist)
664 } else if req.sparse_vectors.is_some() {
665 (0, Distance::Cosine)
667 } else {
668 return Ok(qdrant_error("'vectors' configuration is required. Clients must provide embedding vectors.", start_time));
669 };
670
671 let config = CollectionConfig {
672 name: name.clone(),
673 vector_dim,
674 distance,
675 use_hnsw: req.use_hnsw,
676 enable_bm25: req.enable_bm25 || req.sparse_vectors.is_some(),
678 };
679
680 match storage.create_collection(config) {
681 Ok(_) => Ok(qdrant_response(true, start_time)),
682 Err(e) => Ok(qdrant_error(&e.to_string(), start_time)),
683 }
684}
685
686async fn delete_collection(
687 storage: web::Data<Arc<StorageManager>>,
688 path: web::Path<String>,
689) -> ActixResult<HttpResponse> {
690 let start_time = Instant::now();
691 let name = path.into_inner();
692
693 match storage.delete_collection(&name) {
694 Ok(true) => Ok(qdrant_response(true, start_time)),
695 Ok(false) => Ok(qdrant_not_found("Collection not found", start_time)),
696 Err(e) => Ok(qdrant_error(&e.to_string(), start_time)),
697 }
698}
699
700async fn upsert_points(
701 storage: web::Data<Arc<StorageManager>>,
702 path: web::Path<String>,
703 req: web::Json<UpsertPointsRequest>,
704) -> ActixResult<HttpResponse> {
705 let start_time = Instant::now();
706 let name = path.into_inner();
707
708 let collection = match storage.get_collection(&name) {
709 Some(c) => c,
710 None => {
711 return Ok(qdrant_not_found("Collection not found", start_time));
712 }
713 };
714
715 let points: Result<Vec<Point>, &str> = req.points.iter().map(|point_req| {
716 let id = match &point_req.id {
717 serde_json::Value::String(s) => PointId::String(s.clone()),
718 serde_json::Value::Number(n) => {
719 if let Some(u) = n.as_u64() {
720 PointId::Integer(u)
721 } else {
722 return Err("Invalid point ID");
723 }
724 }
725 _ => return Err("Invalid point ID"),
726 };
727
728 let mut point = match &point_req.vector {
730 Some(parsed_vector) => {
731 if let Some(ref multivec_data) = parsed_vector.multivector {
732 match MultiVector::new(multivec_data.clone()) {
734 Ok(mv) => Point::new_multi(id, mv, point_req.payload.clone()),
735 Err(_) => {
736 let vector = Vector::new(parsed_vector.primary.clone());
737 Point::new(id, vector, point_req.payload.clone())
738 }
739 }
740 } else {
741 let vector = Vector::new(parsed_vector.primary.clone());
743 Point::new(id, vector, point_req.payload.clone())
744 }
745 }
746 None => {
747 let vector = Vector::new(vec![]);
749 Point::new(id, vector, point_req.payload.clone())
750 }
751 };
752
753 if let Some(ref parsed_vector) = point_req.vector {
755 for sparse in &parsed_vector.sparse_vectors {
756 let sparse_vec = vectx_core::SparseVector::new(
757 sparse.indices.clone(),
758 sparse.values.clone()
759 );
760 point.add_sparse_vector(sparse.name.clone(), sparse_vec);
761 }
762 }
763
764 Ok(point)
765 }).collect();
766
767 match points {
768 Ok(points_vec) => {
769 if points_vec.len() > 1 {
770 const PREWARM_THRESHOLD: usize = 1000;
771 let should_prewarm = points_vec.len() >= PREWARM_THRESHOLD;
772
773 let result = if should_prewarm {
774 collection.batch_upsert_with_prewarm(points_vec, true)
775 } else {
776 collection.batch_upsert(points_vec)
777 };
778
779 if let Err(e) = result {
780 return Ok(qdrant_error(&e.to_string(), start_time));
781 }
782 } else if let Some(point) = points_vec.first() {
783 if let Err(e) = collection.upsert(point.clone()) {
784 return Ok(qdrant_error(&e.to_string(), start_time));
785 }
786 }
787 }
788 Err(e) => {
789 return Ok(qdrant_error(e, start_time));
790 }
791 }
792
793 let operation_id = collection.next_operation_id();
794 Ok(qdrant_response(serde_json::json!({
795 "operation_id": operation_id,
796 "status": "acknowledged"
797 }), start_time))
798}
799
800async fn search_points(
801 storage: web::Data<Arc<StorageManager>>,
802 path: web::Path<String>,
803 req: web::Json<SearchRequest>,
804) -> ActixResult<HttpResponse> {
805 let start_time = Instant::now();
806 let name = path.into_inner();
807
808 let collection = match storage.get_collection(&name) {
809 Some(c) => c,
810 None => {
811 return Ok(qdrant_not_found("Collection not found", start_time));
812 }
813 };
814
815 let limit = req.limit.unwrap_or(10);
816 let with_payload = req.with_payload.unwrap_or(true);
817 let with_vector = req.with_vector.unwrap_or(false);
818 let score_threshold = req.score_threshold;
819 let offset = req.offset.unwrap_or(0);
820
821 if let Some(text) = &req.text {
822 let results = collection.search_text(text, limit + offset);
823 let search_results: Vec<serde_json::Value> = results
824 .into_iter()
825 .skip(offset)
826 .filter(|(_, score)| score_threshold.map(|t| *score >= t).unwrap_or(true))
827 .filter_map(|(doc_id, score)| {
828 collection.get(&doc_id).map(|point| {
829 let mut result = serde_json::json!({
830 "id": point_id_to_json(&point.id),
831 "version": point.version,
832 "score": score,
833 });
834 if with_payload {
835 result["payload"] = point.payload.clone().unwrap_or(serde_json::Value::Null);
836 }
837 if with_vector {
838 result["vector"] = serde_json::json!(point.vector.as_slice());
839 }
840 result
841 })
842 })
843 .collect();
844
845 return Ok(qdrant_response(search_results, start_time));
846 }
847
848 if let Some(vector_data) = &req.vector {
849 let query_vector = Vector::new(vector_data.clone());
850
851 let filter: Option<Box<dyn Filter>> = req.filter.as_ref().and_then(|f| {
852 parse_filter(f).map(|cond| Box::new(PayloadFilter::new(cond)) as Box<dyn Filter>)
853 });
854
855 let results = if let Some(f) = filter.as_deref() {
856 collection.search(&query_vector, limit + offset, Some(f))
857 } else {
858 collection.search(&query_vector, limit + offset, None)
859 };
860
861 let search_results: Vec<serde_json::Value> = results
862 .into_iter()
863 .skip(offset)
864 .filter(|(_, score)| score_threshold.map(|t| *score >= t).unwrap_or(true))
865 .map(|(point, score)| {
866 let mut result = serde_json::json!({
867 "id": point_id_to_json(&point.id),
868 "version": point.version,
869 "score": score,
870 });
871 if with_payload {
872 result["payload"] = point.payload.clone().unwrap_or(serde_json::Value::Null);
873 }
874 if with_vector {
875 result["vector"] = serde_json::json!(point.vector.as_slice());
876 }
877 result
878 })
879 .collect();
880
881 return Ok(qdrant_response(search_results, start_time));
882 }
883
884 Ok(qdrant_error("Either 'vector' or 'text' must be provided", start_time))
885}
886
887fn point_id_to_json(id: &vectx_core::PointId) -> serde_json::Value {
889 match id {
890 vectx_core::PointId::String(s) => serde_json::Value::String(s.clone()),
891 vectx_core::PointId::Integer(i) => serde_json::Value::Number((*i).into()),
892 vectx_core::PointId::Uuid(u) => serde_json::Value::String(u.to_string()),
893 }
894}
895
/// One prefetch sub-query of a fusion request (see `execute_fusion_query`).
#[derive(Deserialize, Clone)]
struct PrefetchQuery {
    /// Raw query: a dense vector, a multivector, or a sparse vector object, as
    /// interpreted by `parse_and_search`.
    query: serde_json::Value,
    /// Name of the vector to search; used for sparse queries.
    #[serde(default)]
    using: Option<String>,
    /// Maximum number of candidates for this sub-query (default 20 when fused).
    #[serde(default)]
    limit: Option<usize>,
    /// Raw payload filter applied to this sub-query only.
    #[serde(default)]
    filter: Option<serde_json::Value>,
}
908
/// JSON body accepted by the `query_points` handler.
#[derive(Deserialize)]
struct QueryRequest {
    /// Raw query. An object containing a `fusion` key selects the fusion path
    /// when `prefetch` is also present; otherwise it is passed to
    /// `execute_simple_query`.
    query: serde_json::Value,
    /// Maximum number of results (handler default: 10).
    #[serde(default)]
    limit: Option<usize>,
    /// Whether to include payloads in the results (handler default: `true`).
    #[serde(default)]
    with_payload: Option<bool>,
    /// Whether to include vectors in the results (handler default: `false`).
    #[serde(default)]
    with_vector: Option<bool>,
    /// Raw payload filter; only used on the non-fusion path.
    #[serde(default)]
    filter: Option<serde_json::Value>,
    /// Sub-queries whose results are fused on the fusion path.
    #[serde(default)]
    prefetch: Option<Vec<PrefetchQuery>>,
    /// Name of the vector to query; only used on the non-fusion path.
    #[serde(default)]
    using: Option<String>,
}
930
931async fn query_points(
934 storage: web::Data<Arc<StorageManager>>,
935 path: web::Path<String>,
936 req: web::Json<QueryRequest>,
937) -> ActixResult<HttpResponse> {
938 let start_time = Instant::now();
939 let name = path.into_inner();
940
941 let collection = match storage.get_collection(&name) {
942 Some(c) => c,
943 None => {
944 return Ok(qdrant_not_found("Collection not found", start_time));
945 }
946 };
947
948 let limit = req.limit.unwrap_or(10);
949 let with_payload = req.with_payload.unwrap_or(true);
950 let with_vector = req.with_vector.unwrap_or(false);
951
952 let is_fusion = req.query.as_object()
954 .and_then(|o| o.get("fusion"))
955 .is_some();
956
957 let results = if is_fusion && req.prefetch.is_some() {
958 match execute_fusion_query(&collection, &req, limit) {
960 Ok(r) => r,
961 Err(e) => return Ok(qdrant_error(&e, start_time)),
962 }
963 } else {
964 let filter: Option<Box<dyn Filter>> = req.filter.as_ref().and_then(|f| {
966 parse_filter(f).map(|cond| Box::new(PayloadFilter::new(cond)) as Box<dyn Filter>)
967 });
968
969 let using = req.using.as_deref();
971
972 match execute_simple_query(&collection, &req.query, limit, filter.as_deref(), using) {
974 Ok(r) => r,
975 Err(e) => return Ok(qdrant_error(&e, start_time)),
976 }
977 };
978
979 let search_results: Vec<serde_json::Value> = results
981 .into_iter()
982 .map(|(point, score)| {
983 let mut result = serde_json::json!({
984 "id": point_id_to_json(&point.id),
985 "version": point.version,
986 "score": score,
987 });
988
989 if with_payload {
990 result["payload"] = point.payload.clone().unwrap_or(serde_json::Value::Null);
991 }
992
993 if with_vector {
994 result["vector"] = serde_json::json!(point.vector.as_slice());
995 if let Some(mv) = &point.multivector {
996 result["multivector"] = serde_json::json!(mv.vectors());
997 }
998 }
999
1000 result
1001 })
1002 .collect();
1003
1004 Ok(qdrant_response(serde_json::json!({
1005 "points": search_results
1006 }), start_time))
1007}
1008
1009fn execute_fusion_query(
1011 collection: &Arc<Collection>,
1012 req: &QueryRequest,
1013 limit: usize,
1014) -> Result<Vec<(Point, f32)>, String> {
1015 use std::collections::HashMap;
1016
1017 let prefetch = req.prefetch.as_ref().ok_or("Fusion requires prefetch")?;
1018
1019 let mut all_results: Vec<Vec<(Point, f32)>> = Vec::new();
1021
1022 for pf in prefetch {
1023 let pf_limit = pf.limit.unwrap_or(20);
1024 let filter: Option<Box<dyn Filter>> = pf.filter.as_ref().and_then(|f| {
1025 parse_filter(f).map(|cond| Box::new(PayloadFilter::new(cond)) as Box<dyn Filter>)
1026 });
1027
1028 let using = pf.using.as_deref();
1030 let pf_results = parse_and_search(collection, &pf.query, pf_limit, filter.as_deref(), using)?;
1031 all_results.push(pf_results);
1032 }
1033
1034 const K: f32 = 1.0;
1038
1039 let mut rrf_scores: HashMap<String, (Point, f32)> = HashMap::new();
1040
1041 for result_set in &all_results {
1042 for (rank, (point, _original_score)) in result_set.iter().enumerate() {
1043 let rrf_contribution = 1.0 / (K + rank as f32 + 1.0);
1044 let point_id = point.id.to_string();
1045
1046 rrf_scores
1047 .entry(point_id)
1048 .and_modify(|(_, score)| *score += rrf_contribution)
1049 .or_insert_with(|| (point.clone(), rrf_contribution));
1050 }
1051 }
1052
1053 let mut fused: Vec<(Point, f32)> = rrf_scores.into_values().collect();
1055 fused.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1056 fused.truncate(limit);
1057
1058 Ok(fused)
1059}
1060
1061fn parse_and_search(
1063 collection: &Arc<Collection>,
1064 query: &serde_json::Value,
1065 limit: usize,
1066 filter: Option<&dyn Filter>,
1067 using: Option<&str>,
1068) -> Result<Vec<(Point, f32)>, String> {
1069 match query {
1070 serde_json::Value::Object(obj) if obj.contains_key("indices") && obj.contains_key("values") => {
1072 let indices = obj.get("indices")
1073 .and_then(|i| i.as_array())
1074 .ok_or("Invalid sparse vector: missing indices")?;
1075 let values = obj.get("values")
1076 .and_then(|v| v.as_array())
1077 .ok_or("Invalid sparse vector: missing values")?;
1078
1079 let indices_vec: Vec<u32> = indices.iter()
1080 .filter_map(|i| i.as_u64().map(|n| n as u32))
1081 .collect();
1082 let values_vec: Vec<f32> = values.iter()
1083 .filter_map(|v| v.as_f64().map(|f| f as f32))
1084 .collect();
1085
1086 if indices_vec.is_empty() || values_vec.is_empty() {
1087 return Ok(Vec::new());
1088 }
1089
1090 let vector_name = using.unwrap_or("default");
1092 let query_sparse = vectx_core::SparseVector::new(indices_vec, values_vec);
1093
1094 Ok(collection.search_sparse(&query_sparse, vector_name, limit, filter))
1096 }
1097 serde_json::Value::Array(arr) if !arr.is_empty() => {
1099 match arr.first() {
1100 Some(serde_json::Value::Array(_)) => {
1102 let multivec_data: Result<Vec<Vec<f32>>, _> = arr.iter()
1103 .map(|sub| {
1104 if let serde_json::Value::Array(sub_arr) = sub {
1105 sub_arr.iter()
1106 .map(|v| v.as_f64().map(|f| f as f32).ok_or("expected f32"))
1107 .collect::<Result<Vec<f32>, _>>()
1108 } else {
1109 Err("expected array")
1110 }
1111 })
1112 .collect();
1113
1114 let data = multivec_data.map_err(|e| format!("Invalid multivector: {}", e))?;
1115 let query_mv = MultiVector::new(data).map_err(|e| format!("Invalid multivector: {}", e))?;
1116 Ok(collection.search_multivector(&query_mv, limit, filter))
1117 }
1118 Some(serde_json::Value::Number(_)) => {
1120 let vector_data: Result<Vec<f32>, _> = arr.iter()
1121 .map(|v| v.as_f64().map(|f| f as f32).ok_or("expected f32"))
1122 .collect();
1123
1124 let data = vector_data.map_err(|e| format!("Invalid vector: {}", e))?;
1125 let query_vector = Vector::new(data);
1126 Ok(collection.search(&query_vector, limit, filter))
1127 }
1128 _ => Err("Invalid query format".to_string())
1129 }
1130 }
1131 _ => Err("Invalid query format".to_string())
1132 }
1133}
1134
1135fn execute_simple_query(
1137 collection: &Arc<Collection>,
1138 query: &serde_json::Value,
1139 limit: usize,
1140 filter: Option<&dyn Filter>,
1141 using: Option<&str>,
1142) -> Result<Vec<(Point, f32)>, String> {
1143 match query {
1144 serde_json::Value::Number(n) => {
1146 let point_id_str = if let Some(id) = n.as_u64() {
1147 id.to_string()
1148 } else if let Some(id) = n.as_i64() {
1149 id.to_string()
1150 } else {
1151 return Err("Invalid point ID format".to_string());
1152 };
1153
1154 if let Some(source_point) = collection.get(&point_id_str) {
1156 let query_vector = source_point.vector.clone();
1157 let mut search_results = collection.search(&query_vector, limit + 1, filter);
1158 search_results.retain(|(p, _)| p.id.to_string() != point_id_str);
1160 search_results.truncate(limit);
1161 Ok(search_results)
1162 } else {
1163 Err(format!("Point with ID '{}' not found", point_id_str))
1164 }
1165 }
1166 serde_json::Value::String(s) => {
1168 if let Some(source_point) = collection.get(s) {
1169 let query_vector = source_point.vector.clone();
1170 let mut search_results = collection.search(&query_vector, limit + 1, filter);
1171 search_results.retain(|(p, _)| p.id.to_string() != *s);
1173 search_results.truncate(limit);
1174 Ok(search_results)
1175 } else {
1176 Err(format!("Point with ID '{}' not found", s))
1177 }
1178 }
1179 _ => parse_and_search(collection, query, limit, filter, using)
1181 }
1182}
1183
1184fn parse_filter(filter_json: &serde_json::Value) -> Option<FilterCondition> {
1186 if let Some(obj) = filter_json.as_object() {
1187 let mut all_conditions: Vec<FilterCondition> = Vec::new();
1188
1189 if let Some(must) = obj.get("must") {
1191 if let Some(arr) = must.as_array() {
1192 let must_conditions: Vec<FilterCondition> = arr.iter()
1193 .filter_map(parse_field_condition)
1194 .collect();
1195 if !must_conditions.is_empty() {
1196 if must_conditions.len() == 1 {
1198 all_conditions.push(must_conditions.into_iter().next().unwrap());
1199 } else {
1200 all_conditions.push(FilterCondition::And(must_conditions));
1201 }
1202 }
1203 }
1204 }
1205
1206 if let Some(should) = obj.get("should") {
1208 if let Some(arr) = should.as_array() {
1209 let should_conditions: Vec<FilterCondition> = arr.iter()
1210 .filter_map(parse_field_condition)
1211 .collect();
1212 if !should_conditions.is_empty() {
1213 if should_conditions.len() == 1 {
1215 all_conditions.push(should_conditions.into_iter().next().unwrap());
1216 } else {
1217 all_conditions.push(FilterCondition::Or(should_conditions));
1218 }
1219 }
1220 }
1221 }
1222
1223 if let Some(must_not) = obj.get("must_not") {
1225 if let Some(arr) = must_not.as_array() {
1226 for cond in arr {
1227 if let Some(fc) = parse_field_condition(cond) {
1228 all_conditions.push(FilterCondition::Not(Box::new(fc)));
1229 }
1230 }
1231 }
1232 }
1233
1234 if !all_conditions.is_empty() {
1236 return if all_conditions.len() == 1 {
1237 Some(all_conditions.into_iter().next().unwrap())
1238 } else {
1239 Some(FilterCondition::And(all_conditions))
1240 };
1241 }
1242
1243 if let Some(field) = obj.get("field").and_then(|v| v.as_str()) {
1245 if let Some(value) = obj.get("value") {
1246 if let Some(op) = obj.get("operator").and_then(|v| v.as_str()) {
1247 match op {
1248 "eq" => return Some(FilterCondition::Equals { field: field.to_string(), value: value.clone() }),
1249 "ne" => return Some(FilterCondition::NotEquals { field: field.to_string(), value: value.clone() }),
1250 "gt" => return value.as_f64().map(|v| FilterCondition::GreaterThan { field: field.to_string(), value: v }),
1251 "lt" => return value.as_f64().map(|v| FilterCondition::LessThan { field: field.to_string(), value: v }),
1252 "gte" => return value.as_f64().map(|v| FilterCondition::GreaterEqual { field: field.to_string(), value: v }),
1253 "lte" => return value.as_f64().map(|v| FilterCondition::LessEqual { field: field.to_string(), value: v }),
1254 _ => {}
1255 }
1256 }
1257 }
1258 }
1259
1260 if let Some(fc) = parse_field_condition(filter_json) {
1262 return Some(fc);
1263 }
1264 }
1265 None
1266}
1267
1268fn parse_field_condition(cond: &serde_json::Value) -> Option<FilterCondition> {
1270 let obj = cond.as_object()?;
1271 let key = obj.get("key")?.as_str()?;
1272
1273 if let Some(match_obj) = obj.get("match").and_then(|m| m.as_object()) {
1275 if let Some(value) = match_obj.get("value") {
1276 return Some(FilterCondition::Equals {
1277 field: key.to_string(),
1278 value: value.clone()
1279 });
1280 }
1281 if let Some(any_arr) = match_obj.get("any").and_then(|a| a.as_array()) {
1283 if let Some(first) = any_arr.first() {
1284 return Some(FilterCondition::Equals {
1285 field: key.to_string(),
1286 value: first.clone()
1287 });
1288 }
1289 }
1290 if let Some(text) = match_obj.get("text") {
1292 return Some(FilterCondition::Equals {
1293 field: key.to_string(),
1294 value: text.clone()
1295 });
1296 }
1297 }
1298
1299 if let Some(range_obj) = obj.get("range").and_then(|r| r.as_object()) {
1301 if let Some(gt) = range_obj.get("gt").and_then(|v| v.as_f64()) {
1302 return Some(FilterCondition::GreaterThan { field: key.to_string(), value: gt });
1303 }
1304 if let Some(gte) = range_obj.get("gte").and_then(|v| v.as_f64()) {
1305 return Some(FilterCondition::GreaterEqual { field: key.to_string(), value: gte });
1306 }
1307 if let Some(lt) = range_obj.get("lt").and_then(|v| v.as_f64()) {
1308 return Some(FilterCondition::LessThan { field: key.to_string(), value: lt });
1309 }
1310 if let Some(lte) = range_obj.get("lte").and_then(|v| v.as_f64()) {
1311 return Some(FilterCondition::LessEqual { field: key.to_string(), value: lte });
1312 }
1313 }
1314
1315 None
1316}
1317
1318fn matches_filter(point: &Point, filter: &serde_json::Value) -> bool {
1320 let obj = match filter.as_object() {
1321 Some(o) => o,
1322 None => return true, };
1324
1325 if let Some(must) = obj.get("must").and_then(|m| m.as_array()) {
1327 for cond in must {
1328 if !matches_condition(point, cond) {
1329 return false; }
1331 }
1332 }
1333
1334 if let Some(should) = obj.get("should").and_then(|s| s.as_array()) {
1336 if !should.is_empty() {
1337 let any_match = should.iter().any(|cond| matches_condition(point, cond));
1338 if !any_match {
1339 return false; }
1341 }
1342 }
1343
1344 if let Some(must_not) = obj.get("must_not").and_then(|m| m.as_array()) {
1346 for cond in must_not {
1347 if matches_condition(point, cond) {
1348 return false; }
1350 }
1351 }
1352
1353 true
1354}
1355
1356fn matches_condition(point: &Point, cond: &serde_json::Value) -> bool {
1358 let obj = match cond.as_object() {
1359 Some(o) => o,
1360 None => return false,
1361 };
1362
1363 if let Some(ids) = obj.get("has_id").and_then(|h| h.as_array()) {
1365 let point_id_str = point.id.to_string();
1366 return ids.iter().any(|id| {
1367 match id {
1368 serde_json::Value::Number(n) => n.to_string() == point_id_str,
1369 serde_json::Value::String(s) => s == &point_id_str,
1370 _ => false,
1371 }
1372 });
1373 }
1374
1375 if let Some(nested_obj) = obj.get("nested").and_then(|n| n.as_object()) {
1377 let nested_key = match nested_obj.get("key").and_then(|k| k.as_str()) {
1378 Some(k) => k,
1379 None => return false,
1380 };
1381 let nested_filter = match nested_obj.get("filter") {
1382 Some(f) => f,
1383 None => return false,
1384 };
1385
1386 let array_value = match &point.payload {
1388 Some(payload) => payload.get(nested_key).and_then(|v| v.as_array()),
1389 None => None,
1390 };
1391
1392 if let Some(arr) = array_value {
1394 return arr.iter().any(|element| {
1395 matches_nested_element(element, nested_filter)
1396 });
1397 }
1398 return false;
1399 }
1400
1401 let key = match obj.get("key").and_then(|k| k.as_str()) {
1403 Some(k) => k,
1404 None => {
1405 if obj.contains_key("must") || obj.contains_key("should") || obj.contains_key("must_not") {
1407 return matches_filter(point, cond);
1408 }
1409 return false;
1410 }
1411 };
1412
1413 let payload_value = get_nested_value(&point.payload, key);
1415
1416 if let Some(match_obj) = obj.get("match").and_then(|m| m.as_object()) {
1418 if let Some(expected) = match_obj.get("value") {
1420 return match &payload_value {
1421 Some(actual) => values_equal(actual, expected),
1422 None => false,
1423 };
1424 }
1425
1426 if let Some(any_arr) = match_obj.get("any").and_then(|a| a.as_array()) {
1428 return match &payload_value {
1429 Some(actual) => any_arr.iter().any(|expected| values_equal(actual, expected)),
1430 None => false,
1431 };
1432 }
1433
1434 if let Some(text) = match_obj.get("text").and_then(|t| t.as_str()) {
1436 return match &payload_value {
1437 Some(serde_json::Value::String(s)) => {
1438 let s_lower = s.to_lowercase();
1439 text.split_whitespace()
1441 .any(|word| s_lower.contains(&word.to_lowercase()))
1442 }
1443 _ => false,
1444 };
1445 }
1446 }
1447
1448 if let Some(range_obj) = obj.get("range").and_then(|r| r.as_object()) {
1450 let actual_num = match &payload_value {
1451 Some(serde_json::Value::Number(n)) => n.as_f64(),
1452 _ => None,
1453 };
1454
1455 if let Some(actual) = actual_num {
1456 if let Some(gt) = range_obj.get("gt").and_then(|v| v.as_f64()) {
1457 if actual <= gt { return false; }
1458 }
1459 if let Some(gte) = range_obj.get("gte").and_then(|v| v.as_f64()) {
1460 if actual < gte { return false; }
1461 }
1462 if let Some(lt) = range_obj.get("lt").and_then(|v| v.as_f64()) {
1463 if actual >= lt { return false; }
1464 }
1465 if let Some(lte) = range_obj.get("lte").and_then(|v| v.as_f64()) {
1466 if actual > lte { return false; }
1467 }
1468 return true;
1469 }
1470 return false;
1471 }
1472
1473 false
1474}
1475
1476fn get_nested_value(payload: &Option<serde_json::Value>, path: &str) -> Option<serde_json::Value> {
1478 let payload = payload.as_ref()?;
1479
1480 if path.contains("[]") {
1482 let parts: Vec<&str> = path.split("[]").collect();
1483 if parts.len() >= 2 {
1484 let array_key = parts[0];
1485 let field_path = parts[1].trim_start_matches('.');
1486
1487 if let Some(arr) = payload.get(array_key).and_then(|v| v.as_array()) {
1488 let values: Vec<serde_json::Value> = arr.iter()
1490 .filter_map(|element| {
1491 if field_path.is_empty() {
1492 Some(element.clone())
1493 } else {
1494 element.get(field_path).cloned()
1495 }
1496 })
1497 .collect();
1498
1499 if !values.is_empty() {
1500 return Some(serde_json::Value::Array(values));
1501 }
1502 }
1503 }
1504 return None;
1505 }
1506
1507 if path.contains('.') {
1509 let mut current = payload;
1510 for part in path.split('.') {
1511 current = current.get(part)?;
1512 }
1513 return Some(current.clone());
1514 }
1515
1516 payload.get(path).cloned()
1518}
1519
1520fn matches_nested_element(element: &serde_json::Value, filter: &serde_json::Value) -> bool {
1522 let filter_obj = match filter.as_object() {
1523 Some(o) => o,
1524 None => return false,
1525 };
1526
1527 if let Some(must) = filter_obj.get("must").and_then(|m| m.as_array()) {
1529 for cond in must {
1530 if !matches_element_condition(element, cond) {
1531 return false;
1532 }
1533 }
1534 }
1535
1536 if let Some(should) = filter_obj.get("should").and_then(|s| s.as_array()) {
1538 if !should.is_empty() {
1539 let any_match = should.iter().any(|cond| matches_element_condition(element, cond));
1540 if !any_match {
1541 return false;
1542 }
1543 }
1544 }
1545
1546 if let Some(must_not) = filter_obj.get("must_not").and_then(|m| m.as_array()) {
1548 for cond in must_not {
1549 if matches_element_condition(element, cond) {
1550 return false;
1551 }
1552 }
1553 }
1554
1555 true
1556}
1557
1558fn matches_element_condition(element: &serde_json::Value, cond: &serde_json::Value) -> bool {
1560 let obj = match cond.as_object() {
1561 Some(o) => o,
1562 None => return false,
1563 };
1564
1565 let key = match obj.get("key").and_then(|k| k.as_str()) {
1566 Some(k) => k,
1567 None => return false,
1568 };
1569
1570 let element_value = element.get(key);
1571
1572 if let Some(match_obj) = obj.get("match").and_then(|m| m.as_object()) {
1573 if let Some(expected) = match_obj.get("value") {
1574 return match element_value {
1575 Some(actual) => values_equal(actual, expected),
1576 None => false,
1577 };
1578 }
1579 }
1580
1581 false
1582}
1583
1584fn values_equal(a: &serde_json::Value, b: &serde_json::Value) -> bool {
1586 match (a, b) {
1587 (serde_json::Value::String(s1), serde_json::Value::String(s2)) => s1 == s2,
1588 (serde_json::Value::Number(n1), serde_json::Value::Number(n2)) => {
1589 n1.as_f64() == n2.as_f64()
1590 }
1591 (serde_json::Value::Bool(b1), serde_json::Value::Bool(b2)) => b1 == b2,
1592 (serde_json::Value::Array(arr), val) | (val, serde_json::Value::Array(arr)) => {
1593 arr.iter().any(|item| values_equal(item, val))
1595 }
1596 _ => a == b,
1597 }
1598}
1599
/// JSON body accepted by `scroll_points`.
#[derive(Deserialize)]
struct ScrollRequest {
    /// Maximum number of points per page; `scroll_points` uses 10 when absent.
    limit: Option<usize>,
    /// Pagination cursor, given as a JSON number or string. The handler returns
    /// points that sort after it.
    offset: Option<serde_json::Value>,
    /// Whether to include payloads in the response; treated as `true` when absent.
    with_payload: Option<bool>,
    /// Whether to include vectors in the response; treated as `false` when absent.
    with_vector: Option<bool>,
    /// Optional filter, evaluated per point with `matches_filter`.
    #[serde(default)]
    filter: Option<serde_json::Value>,
}
1609
1610async fn scroll_points(
1611 storage: web::Data<Arc<StorageManager>>,
1612 path: web::Path<String>,
1613 req: web::Json<ScrollRequest>,
1614) -> ActixResult<HttpResponse> {
1615 let start_time = Instant::now();
1616 let collection_name = path.into_inner();
1617
1618 let collection = match storage.get_collection(&collection_name) {
1619 Some(c) => c,
1620 None => {
1621 return Ok(qdrant_not_found("Collection not found", start_time));
1622 }
1623 };
1624
1625 let limit = req.limit.unwrap_or(10);
1626 let with_payload = req.with_payload.unwrap_or(true);
1627 let with_vector = req.with_vector.unwrap_or(false);
1628
1629 let offset_id: Option<i64> = req.offset.as_ref().and_then(|v| {
1631 match v {
1632 serde_json::Value::Number(n) => n.as_i64(),
1633 serde_json::Value::String(s) => s.parse().ok(),
1634 _ => None,
1635 }
1636 });
1637
1638 let all_points = collection.get_all_points();
1640
1641 let filtered_points: Vec<_> = if let Some(filter_json) = &req.filter {
1643 all_points.iter()
1644 .filter(|p| matches_filter(p, filter_json))
1645 .collect()
1646 } else {
1647 all_points.iter().collect()
1648 };
1649
1650 let mut points_with_ids: Vec<_> = filtered_points.iter()
1651 .map(|p| {
1652 let id_num: i64 = match &p.id {
1653 vectx_core::PointId::Integer(i) => *i as i64,
1654 vectx_core::PointId::String(s) => s.parse::<i64>().unwrap_or(0),
1655 vectx_core::PointId::Uuid(_) => 0,
1656 };
1657 (id_num, *p)
1658 })
1659 .collect();
1660
1661 points_with_ids.sort_by_key(|(id, _)| *id);
1662
1663 let start_idx = if let Some(offset) = offset_id {
1665 points_with_ids.iter().position(|(id, _)| *id > offset).unwrap_or(points_with_ids.len())
1666 } else {
1667 0
1668 };
1669
1670 let page: Vec<_> = points_with_ids.iter()
1672 .skip(start_idx)
1673 .take(limit)
1674 .collect();
1675
1676 let next_offset = if page.len() == limit && start_idx + limit < points_with_ids.len() {
1678 page.last().map(|(id, _)| serde_json::json!(*id))
1679 } else {
1680 None
1681 };
1682
1683 let results: Vec<serde_json::Value> = page.iter().map(|(_, point)| {
1685 let mut obj = serde_json::json!({
1686 "id": point_id_to_json(&point.id),
1687 "version": point.version,
1688 });
1689
1690 if with_payload {
1691 obj["payload"] = point.payload.clone().unwrap_or(serde_json::json!({}));
1692 }
1693 if with_vector {
1694 obj["vector"] = serde_json::json!(point.vector.as_slice());
1695 }
1696
1697 obj
1698 }).collect();
1699
1700 Ok(qdrant_response(serde_json::json!({
1701 "points": results,
1702 "next_page_offset": next_offset
1703 }), start_time))
1704}
1705
1706async fn get_point(
1707 storage: web::Data<Arc<StorageManager>>,
1708 path: web::Path<(String, String)>,
1709) -> ActixResult<HttpResponse> {
1710 let start_time = Instant::now();
1711 let (collection_name, point_id) = path.into_inner();
1712
1713 let collection = match storage.get_collection(&collection_name) {
1714 Some(c) => c,
1715 None => {
1716 return Ok(qdrant_not_found("Collection not found", start_time));
1717 }
1718 };
1719
1720 match collection.get(&point_id) {
1721 Some(point) => {
1722 let mut result = serde_json::json!({
1724 "id": point_id_to_json(&point.id),
1725 "version": point.version,
1726 "vector": point.vector.as_slice(),
1727 "payload": point.payload.clone().unwrap_or(serde_json::Value::Null),
1728 });
1729
1730 if let Some(mv) = &point.multivector {
1732 result["multivector"] = serde_json::json!(mv.vectors());
1733 }
1734
1735 Ok(qdrant_response(result, start_time))
1736 }
1737 None => Ok(qdrant_not_found("Point not found", start_time)),
1738 }
1739}
1740
1741async fn delete_point(
1742 storage: web::Data<Arc<StorageManager>>,
1743 path: web::Path<(String, String)>,
1744) -> ActixResult<HttpResponse> {
1745 let start_time = Instant::now();
1746 let (collection_name, point_id) = path.into_inner();
1747
1748 let collection = match storage.get_collection(&collection_name) {
1749 Some(c) => c,
1750 None => {
1751 return Ok(qdrant_not_found("Collection not found", start_time));
1752 }
1753 };
1754
1755 match collection.delete(&point_id) {
1756 Ok(true) => {
1757 let operation_id = collection.next_operation_id();
1758 Ok(qdrant_response(serde_json::json!({
1759 "operation_id": operation_id,
1760 "status": "acknowledged"
1761 }), start_time))
1762 }
1763 Ok(false) => Ok(qdrant_not_found("Point not found", start_time)),
1764 Err(e) => Ok(qdrant_error(&e.to_string(), start_time)),
1765 }
1766}
1767
/// JSON body accepted by `delete_points_by_filter`. Both selectors are
/// optional and are processed independently (filter first, then `points`).
#[derive(Deserialize)]
struct DeletePointsRequest {
    /// Payload-based selection of points to delete.
    filter: Option<DeleteFilter>,
    /// Explicit point IDs to delete; string and number entries are used,
    /// other JSON types are skipped by the handler.
    points: Option<Vec<serde_json::Value>>,
}
1773
/// Filter shape understood by the delete endpoint: only a `must` list is modelled.
#[derive(Deserialize)]
struct DeleteFilter {
    /// Field/value conditions used to select points for deletion.
    must: Option<Vec<FilterMust>>,
}
1778
/// One `{"key": ..., "match": {"value": ...}}` condition of a delete filter.
#[derive(Deserialize)]
struct FilterMust {
    /// Payload field name to compare.
    key: String,
    /// Expected value; deserialized from the JSON field named `match`
    /// (`match` is a Rust keyword, hence the rename).
    #[serde(rename = "match")]
    match_value: MatchValue,
}
1785
/// The `match` object of a delete-filter condition.
#[derive(Deserialize)]
struct MatchValue {
    /// JSON value the payload field is compared against.
    value: serde_json::Value,
}
1790
1791async fn delete_points_by_filter(
1792 storage: web::Data<Arc<StorageManager>>,
1793 path: web::Path<String>,
1794 req: web::Json<DeletePointsRequest>,
1795) -> ActixResult<HttpResponse> {
1796 let collection_name = path.into_inner();
1797
1798 let start_time = Instant::now();
1799 let collection = match storage.get_collection(&collection_name) {
1800 Some(c) => c,
1801 None => {
1802 return Ok(qdrant_not_found("Collection not found", start_time));
1803 }
1804 };
1805
1806 let mut _deleted_count = 0;
1807
1808 if let Some(filter) = &req.filter {
1810 if let Some(must_conditions) = &filter.must {
1811 if let Some(condition) = must_conditions.first() {
1813 let field_key = &condition.key;
1814 let match_value = &condition.match_value.value;
1815
1816 let all_points = collection.get_all_points();
1818 let mut points_to_delete = Vec::new();
1819
1820 for point in all_points {
1821 if let Some(payload) = &point.payload {
1822 if let Some(field_value) = payload.get(field_key) {
1823 if field_value == match_value {
1824 points_to_delete.push(point.id.clone());
1825 }
1826 }
1827 }
1828 }
1829
1830 for point_id in points_to_delete {
1832 let id_str = match &point_id {
1833 vectx_core::PointId::String(s) => s.clone(),
1834 vectx_core::PointId::Integer(i) => i.to_string(),
1835 vectx_core::PointId::Uuid(u) => u.to_string(),
1836 };
1837 if collection.delete(&id_str).is_ok() {
1838 _deleted_count += 1;
1839 }
1840 }
1841 }
1842 }
1843 }
1844
1845 if let Some(point_ids) = &req.points {
1847 for point_id in point_ids {
1848 let id_str = match point_id {
1849 serde_json::Value::String(s) => s.clone(),
1850 serde_json::Value::Number(n) => n.to_string(),
1851 _ => continue,
1852 };
1853 if collection.delete(&id_str).is_ok() {
1854 _deleted_count += 1;
1855 }
1856 }
1857 }
1858
1859 let operation_id = collection.next_operation_id();
1860 Ok(qdrant_response(serde_json::json!({
1861 "operation_id": operation_id,
1862 "status": "acknowledged"
1863 }), start_time))
1864}
1865
1866async fn collection_exists(
1867 storage: web::Data<Arc<StorageManager>>,
1868 path: web::Path<String>,
1869) -> ActixResult<HttpResponse> {
1870 let start_time = Instant::now();
1871 let name = path.into_inner();
1872 let exists = storage.collection_exists(&name);
1873
1874 Ok(qdrant_response(serde_json::json!({
1875 "exists": exists
1876 }), start_time))
1877}
1878
1879async fn list_aliases(
1882 storage: web::Data<Arc<StorageManager>>,
1883) -> ActixResult<HttpResponse> {
1884 let start_time = Instant::now();
1885 let aliases: Vec<serde_json::Value> = storage.list_aliases()
1886 .into_iter()
1887 .map(|(alias, collection)| serde_json::json!({
1888 "alias_name": alias,
1889 "collection_name": collection
1890 }))
1891 .collect();
1892 Ok(qdrant_response(serde_json::json!({
1893 "aliases": aliases
1894 }), start_time))
1895}
1896
1897async fn cluster_info() -> ActixResult<HttpResponse> {
1898 let start_time = Instant::now();
1899 Ok(qdrant_response(serde_json::json!({
1901 "status": "disabled"
1902 }), start_time))
1903}
1904
1905async fn telemetry_info() -> ActixResult<HttpResponse> {
1906 let start_time = Instant::now();
1907 Ok(qdrant_response(serde_json::json!({
1908 "id": "vectx-single-node",
1909 "app": {
1910 "name": "vectx",
1911 "version": "0.2.1"
1912 }
1913 }), start_time))
1914}
1915
1916async fn list_snapshots(
1919 storage: web::Data<Arc<StorageManager>>,
1920 path: web::Path<String>,
1921) -> ActixResult<HttpResponse> {
1922 let start_time = Instant::now();
1923 let collection_name = path.into_inner();
1924
1925 match storage.list_collection_snapshots(&collection_name) {
1926 Ok(snapshots) => Ok(qdrant_response(snapshots, start_time)),
1927 Err(e) => Ok(qdrant_error(&e.to_string(), start_time)),
1928 }
1929}
1930
1931async fn list_all_snapshots(
1932 storage: web::Data<Arc<StorageManager>>,
1933) -> ActixResult<HttpResponse> {
1934 let start_time = Instant::now();
1935 match storage.list_all_snapshots() {
1936 Ok(snapshots) => Ok(qdrant_response(snapshots, start_time)),
1937 Err(e) => Ok(qdrant_error(&e.to_string(), start_time)),
1938 }
1939}
1940
1941async fn create_full_snapshot(
1943 storage: web::Data<Arc<StorageManager>>,
1944) -> ActixResult<HttpResponse> {
1945 let start_time = Instant::now();
1946
1947 let collections = storage.list_collections();
1949 let mut created_snapshots = Vec::new();
1950
1951 for collection_name in collections {
1952 match storage.create_collection_snapshot(&collection_name) {
1953 Ok(snapshot) => created_snapshots.push(snapshot),
1954 Err(e) => {
1955 return Ok(qdrant_error(&format!("Failed to snapshot {}: {}", collection_name, e), start_time));
1956 }
1957 }
1958 }
1959
1960 let snapshot_name = format!("full-snapshot-{}.snapshot", Utc::now().format("%Y-%m-%d-%H-%M-%S"));
1962
1963 Ok(qdrant_response(serde_json::json!({
1964 "name": snapshot_name,
1965 "creation_time": Utc::now().to_rfc3339(),
1966 "size": 0,
1967 "collections": created_snapshots.len()
1968 }), start_time))
1969}
1970
1971async fn get_full_snapshot(
1973 _path: web::Path<String>,
1974) -> ActixResult<HttpResponse> {
1975 let start_time = Instant::now();
1976 Ok(qdrant_error("Full storage snapshot download not yet implemented", start_time))
1977}
1978
1979async fn delete_full_snapshot(
1981 _path: web::Path<String>,
1982) -> ActixResult<HttpResponse> {
1983 let start_time = Instant::now();
1984 Ok(qdrant_response(true, start_time))
1986}
1987
/// JSON body accepted by `update_collection`.
///
/// Every section is optional and kept as raw JSON; `update_collection` does
/// not read any of these fields.
#[derive(Deserialize)]
struct UpdateCollectionRequest {
    /// Raw `optimizers_config` section, if supplied.
    #[serde(default)]
    optimizers_config: Option<serde_json::Value>,
    /// Raw `params` section, if supplied.
    #[serde(default)]
    params: Option<serde_json::Value>,
    /// Raw `hnsw_config` section, if supplied.
    #[serde(default)]
    hnsw_config: Option<serde_json::Value>,
    /// Raw `vectors` section, if supplied.
    #[serde(default)]
    vectors: Option<serde_json::Value>,
    /// Raw `quantization_config` section, if supplied.
    #[serde(default)]
    quantization_config: Option<serde_json::Value>,
}
2002
2003async fn update_collection(
2004 storage: web::Data<Arc<StorageManager>>,
2005 path: web::Path<String>,
2006 _req: web::Json<UpdateCollectionRequest>,
2007) -> ActixResult<HttpResponse> {
2008 let start_time = Instant::now();
2009 let name = path.into_inner();
2010
2011 if !storage.collection_exists(&name) {
2012 return Ok(qdrant_not_found("Collection not found", start_time));
2013 }
2014
2015 Ok(qdrant_response(true, start_time))
2017}
2018
2019async fn get_issues() -> ActixResult<HttpResponse> {
2021 let start_time = Instant::now();
2022 Ok(qdrant_response(serde_json::json!({
2023 "issues": []
2024 }), start_time))
2025}
2026
2027async fn clear_issues() -> ActixResult<HttpResponse> {
2029 let start_time = Instant::now();
2030 Ok(qdrant_response(true, start_time))
2031}
2032
2033async fn create_snapshot(
2034 storage: web::Data<Arc<StorageManager>>,
2035 path: web::Path<String>,
2036) -> ActixResult<HttpResponse> {
2037 let start_time = Instant::now();
2038 let collection_name = path.into_inner();
2039
2040 if !storage.collection_exists(&collection_name) {
2042 return Ok(qdrant_not_found(&format!("Collection '{}' not found", collection_name), start_time));
2043 }
2044
2045 match storage.create_collection_snapshot(&collection_name) {
2046 Ok(snapshot) => Ok(qdrant_response(snapshot, start_time)),
2047 Err(e) => Ok(qdrant_error(&e.to_string(), start_time)),
2048 }
2049}
2050
/// JSON body accepted by `recover_snapshot`.
#[derive(Deserialize)]
struct RecoverSnapshotRequest {
    /// Where to recover from: an `http://` / `https://` URL, a `file://` URI,
    /// or any other string, which the handler passes on as a snapshot name.
    location: String,
    /// Optional priority value; not read by `recover_snapshot`.
    #[serde(default)]
    priority: Option<String>,
    /// Optional checksum, forwarded to storage when recovering from a URL.
    #[serde(default)]
    checksum: Option<String>,
}
2059
2060async fn recover_snapshot(
2061 storage: web::Data<Arc<StorageManager>>,
2062 path: web::Path<String>,
2063 req: web::Json<RecoverSnapshotRequest>,
2064) -> ActixResult<HttpResponse> {
2065 let start_time = Instant::now();
2066 let collection_name = path.into_inner();
2067 let location = &req.location;
2068
2069 fn build_recovery_result(collection: &vectx_core::Collection) -> serde_json::Value {
2071 let points_count = collection.count();
2072 let vector_dim = collection.vector_dim();
2073
2074 if points_count == 0 {
2075 serde_json::json!({
2076 "recovered": true,
2077 "collection": collection.name(),
2078 "vector_dim": vector_dim,
2079 "points_count": 0,
2080 "note": "Collection created with config only. If this was a Qdrant snapshot, points must be migrated separately using the scroll API."
2081 })
2082 } else {
2083 serde_json::json!({
2084 "recovered": true,
2085 "collection": collection.name(),
2086 "vector_dim": vector_dim,
2087 "points_count": points_count
2088 })
2089 }
2090 }
2091
2092 if location.starts_with("http://") || location.starts_with("https://") {
2094 match storage.recover_from_url(
2096 &collection_name,
2097 location,
2098 req.checksum.as_deref(),
2099 ).await {
2100 Ok(collection) => Ok(qdrant_response(build_recovery_result(&collection), start_time)),
2101 Err(e) => Ok(qdrant_error(&format!("Failed to recover from URL: {}", e), start_time)),
2102 }
2103 } else if location.starts_with("file://") {
2104 let snapshot_name = location
2106 .trim_start_matches("file://")
2107 .rsplit('/')
2108 .next()
2109 .unwrap_or(location);
2110
2111 match storage.recover_from_snapshot(&collection_name, snapshot_name) {
2112 Ok(collection) => Ok(qdrant_response(build_recovery_result(&collection), start_time)),
2113 Err(e) => Ok(qdrant_error(&format!("Failed to recover from snapshot: {}", e), start_time)),
2114 }
2115 } else {
2116 match storage.recover_from_snapshot(&collection_name, location) {
2118 Ok(collection) => Ok(qdrant_response(build_recovery_result(&collection), start_time)),
2119 Err(e) => Ok(qdrant_error(&format!("Failed to recover from snapshot: {}", e), start_time)),
2120 }
2121 }
2122}
2123
2124async fn get_snapshot(
2125 storage: web::Data<Arc<StorageManager>>,
2126 path: web::Path<(String, String)>,
2127) -> ActixResult<HttpResponse> {
2128 let start_time = Instant::now();
2129 let (collection_name, snapshot_name) = path.into_inner();
2130
2131 if let Some(snapshot_path) = storage.get_snapshot_path(&collection_name, &snapshot_name) {
2132 match std::fs::read(&snapshot_path) {
2134 Ok(data) => {
2135 Ok(HttpResponse::Ok()
2136 .content_type("application/octet-stream")
2137 .insert_header(("Content-Disposition", format!("attachment; filename=\"{}\"", snapshot_name)))
2138 .body(data))
2139 }
2140 Err(e) => Ok(qdrant_error(&format!("Failed to read snapshot file: {}", e), start_time)),
2141 }
2142 } else {
2143 Ok(qdrant_not_found(&format!("Snapshot '{}' not found in collection '{}'", snapshot_name, collection_name), start_time))
2144 }
2145}
2146
2147async fn delete_snapshot(
2148 storage: web::Data<Arc<StorageManager>>,
2149 path: web::Path<(String, String)>,
2150) -> ActixResult<HttpResponse> {
2151 let start_time = Instant::now();
2152 let (collection_name, snapshot_name) = path.into_inner();
2153
2154 match storage.delete_collection_snapshot(&collection_name, &snapshot_name) {
2155 Ok(true) => Ok(qdrant_response(true, start_time)),
2156 Ok(false) => Ok(qdrant_not_found(&format!("Snapshot '{}' not found in collection '{}'", snapshot_name, collection_name), start_time)),
2157 Err(e) => Ok(qdrant_error(&e.to_string(), start_time)),
2158 }
2159}
2160
2161async fn upload_snapshot(
2162 storage: web::Data<Arc<StorageManager>>,
2163 path: web::Path<String>,
2164 mut payload: Multipart,
2165) -> ActixResult<HttpResponse> {
2166 let start_time = Instant::now();
2167 let collection_name = path.into_inner();
2168
2169 let mut snapshot_data: Option<Vec<u8>> = None;
2170 let mut filename: Option<String> = None;
2171
2172 while let Some(item) = payload.next().await {
2174 let mut field = match item {
2175 Ok(f) => f,
2176 Err(e) => {
2177 return Ok(qdrant_error(&format!("Failed to parse multipart: {}", e), start_time));
2178 }
2179 };
2180
2181 let content_disposition = match field.content_disposition() {
2182 Some(cd) => cd,
2183 None => continue,
2184 };
2185 let field_name = content_disposition.get_name().unwrap_or("");
2186
2187 if field_name == "snapshot" {
2188 filename = content_disposition.get_filename().map(|s: &str| s.to_string());
2190
2191 let mut data = Vec::new();
2193 while let Some(chunk) = field.next().await {
2194 match chunk {
2195 Ok(bytes) => data.extend_from_slice(&bytes),
2196 Err(e) => {
2197 return Ok(qdrant_error(&format!("Failed to read file data: {}", e), start_time));
2198 }
2199 }
2200 }
2201 snapshot_data = Some(data);
2202 }
2203 }
2204
2205 let data = match snapshot_data {
2207 Some(d) => d,
2208 None => {
2209 return Ok(qdrant_error("No snapshot file provided in multipart form", start_time));
2210 }
2211 };
2212
2213 match storage.upload_and_restore_snapshot(&collection_name, &data, filename.as_deref()) {
2215 Ok(collection) => Ok(qdrant_response(serde_json::json!({
2216 "collection": collection_name,
2217 "points_count": collection.count()
2218 }), start_time)),
2219 Err(e) => Ok(qdrant_error(&format!("Failed to restore snapshot: {}", e), start_time)),
2220 }
2221}
2222
/// JSON body accepted by `update_aliases`.
#[derive(Deserialize)]
struct UpdateAliasesRequest {
    /// Alias operations, applied in order. Each entry is an object that may
    /// contain `create_alias`, `delete_alias` and/or `rename_alias`.
    actions: Vec<serde_json::Value>,
}
2231
2232async fn update_aliases(
2233 storage: web::Data<Arc<StorageManager>>,
2234 req: web::Json<UpdateAliasesRequest>,
2235) -> ActixResult<HttpResponse> {
2236 let start_time = Instant::now();
2237
2238 for action in &req.actions {
2239 if let Some(obj) = action.as_object() {
2240 if let Some(create) = obj.get("create_alias").and_then(|c| c.as_object()) {
2242 if let (Some(alias), Some(collection)) = (
2243 create.get("alias_name").and_then(|v| v.as_str()),
2244 create.get("collection_name").and_then(|v| v.as_str())
2245 ) {
2246 let _ = storage.create_alias(alias, collection);
2247 }
2248 }
2249
2250 if let Some(delete) = obj.get("delete_alias").and_then(|d| d.as_object()) {
2252 if let Some(alias) = delete.get("alias_name").and_then(|v| v.as_str()) {
2253 let _ = storage.delete_alias(alias);
2254 }
2255 }
2256
2257 if let Some(rename) = obj.get("rename_alias").and_then(|r| r.as_object()) {
2259 if let (Some(old_alias), Some(new_alias)) = (
2260 rename.get("old_alias_name").and_then(|v| v.as_str()),
2261 rename.get("new_alias_name").and_then(|v| v.as_str())
2262 ) {
2263 let _ = storage.rename_alias(old_alias, new_alias);
2264 }
2265 }
2266 }
2267 }
2268
2269 Ok(qdrant_response(true, start_time))
2270}
2271
2272async fn list_collection_aliases(
2274 storage: web::Data<Arc<StorageManager>>,
2275 path: web::Path<String>,
2276) -> ActixResult<HttpResponse> {
2277 let start_time = Instant::now();
2278 let collection_name = path.into_inner();
2279 let aliases: Vec<serde_json::Value> = storage.list_collection_aliases(&collection_name)
2280 .into_iter()
2281 .map(|alias| serde_json::json!({
2282 "alias_name": alias,
2283 "collection_name": collection_name
2284 }))
2285 .collect();
2286 Ok(qdrant_response(serde_json::json!({
2287 "aliases": aliases
2288 }), start_time))
2289}
2290
2291async fn collection_cluster_info(
2293 path: web::Path<String>,
2294) -> ActixResult<HttpResponse> {
2295 let start_time = Instant::now();
2296 let collection_name = path.into_inner();
2297 Ok(qdrant_response(serde_json::json!({
2298 "peer_id": 0,
2299 "shard_count": 1,
2300 "local_shards": [{
2301 "shard_id": 0,
2302 "points_count": 0,
2303 "state": "Active"
2304 }],
2305 "remote_shards": [],
2306 "shard_transfers": [],
2307 "collection_name": collection_name
2308 }), start_time))
2309}
2310
/// Request body accepted by `get_points_by_ids`.
#[derive(Deserialize)]
struct GetPointsRequest {
    /// Point ids to fetch; the handler accepts JSON strings and numbers and skips other values.
    ids: Vec<serde_json::Value>,
    /// Whether to include each point's payload; the handler treats a missing value as `true`.
    #[serde(default)]
    with_payload: Option<bool>,
    /// Whether to include each point's vector; the handler treats a missing value as `false`.
    #[serde(default)]
    with_vector: Option<bool>,
}
2320
2321async fn get_points_by_ids(
2322 storage: web::Data<Arc<StorageManager>>,
2323 path: web::Path<String>,
2324 req: web::Json<GetPointsRequest>,
2325) -> ActixResult<HttpResponse> {
2326 let start_time = Instant::now();
2327 let name = path.into_inner();
2328
2329 let collection = match storage.get_collection(&name) {
2330 Some(c) => c,
2331 None => {
2332 return Ok(qdrant_not_found("Collection not found", start_time));
2333 }
2334 };
2335
2336 let with_payload = req.with_payload.unwrap_or(true);
2337 let with_vector = req.with_vector.unwrap_or(false);
2338
2339 let mut points = Vec::new();
2340 for id_value in &req.ids {
2341 let id_str = match id_value {
2342 serde_json::Value::String(s) => s.clone(),
2343 serde_json::Value::Number(n) => n.to_string(),
2344 _ => continue,
2345 };
2346
2347 if let Some(point) = collection.get(&id_str) {
2348 let mut result = serde_json::json!({
2349 "id": id_value,
2350 "version": point.version
2351 });
2352 if with_payload {
2353 result["payload"] = point.payload.clone().unwrap_or(serde_json::Value::Null);
2354 }
2355 if with_vector {
2356 result["vector"] = serde_json::json!(point.vector.as_slice());
2357 }
2358 points.push(result);
2359 }
2360 }
2361
2362 Ok(qdrant_response(points, start_time))
2363}
2364
/// Request body accepted by `count_points`.
///
/// NOTE(review): `count_points` binds the body as `_req` and reads neither
/// field, so both are parsed but currently have no effect there.
#[derive(Deserialize)]
struct CountRequest {
    /// Optional filter, kept as raw JSON.
    #[serde(default)]
    filter: Option<serde_json::Value>,
    /// Optional exact-count flag.
    #[serde(default)]
    exact: Option<bool>,
}
2373
2374async fn count_points(
2375 storage: web::Data<Arc<StorageManager>>,
2376 path: web::Path<String>,
2377 _req: web::Json<CountRequest>,
2378) -> ActixResult<HttpResponse> {
2379 let start_time = Instant::now();
2380 let name = path.into_inner();
2381
2382 let collection = match storage.get_collection(&name) {
2383 Some(c) => c,
2384 None => {
2385 return Ok(qdrant_not_found("Collection not found", start_time));
2386 }
2387 };
2388
2389 Ok(qdrant_response(serde_json::json!({
2390 "count": collection.count()
2391 }), start_time))
2392}
2393
/// Request body shared by the `set_payload` and `overwrite_payload` handlers.
#[derive(Deserialize)]
struct SetPayloadRequest {
    /// Payload value passed to the collection for each targeted point.
    payload: serde_json::Value,
    /// Ids of the points to update; when absent the handlers update every point.
    #[serde(default)]
    points: Option<Vec<serde_json::Value>>,
    /// Optional filter, kept as raw JSON.
    /// NOTE(review): neither handler in this file reads this field.
    #[serde(default)]
    filter: Option<serde_json::Value>,
}
2403
2404async fn set_payload(
2405 storage: web::Data<Arc<StorageManager>>,
2406 path: web::Path<String>,
2407 req: web::Json<SetPayloadRequest>,
2408) -> ActixResult<HttpResponse> {
2409 let start_time = Instant::now();
2410 let name = path.into_inner();
2411
2412 let collection = match storage.get_collection(&name) {
2413 Some(c) => c,
2414 None => return Ok(qdrant_not_found("Collection not found", start_time)),
2415 };
2416
2417 let mut updated_count = 0;
2418
2419 if let Some(point_ids) = &req.points {
2421 for id_value in point_ids {
2422 let id_str = match id_value {
2423 serde_json::Value::String(s) => s.clone(),
2424 serde_json::Value::Number(n) => n.to_string(),
2425 _ => continue,
2426 };
2427 if collection.set_payload(&id_str, req.payload.clone()).unwrap_or(false) {
2428 updated_count += 1;
2429 }
2430 }
2431 } else {
2432 let all_points = collection.get_all_points();
2434 for point in all_points {
2435 let id_str = point.id.to_string();
2436 if collection.set_payload(&id_str, req.payload.clone()).unwrap_or(false) {
2437 updated_count += 1;
2438 }
2439 }
2440 }
2441
2442 Ok(qdrant_response(serde_json::json!({
2443 "operation_id": updated_count,
2444 "status": "acknowledged"
2445 }), start_time))
2446}
2447
2448async fn overwrite_payload(
2450 storage: web::Data<Arc<StorageManager>>,
2451 path: web::Path<String>,
2452 req: web::Json<SetPayloadRequest>,
2453) -> ActixResult<HttpResponse> {
2454 let start_time = Instant::now();
2455 let name = path.into_inner();
2456
2457 let collection = match storage.get_collection(&name) {
2458 Some(c) => c,
2459 None => return Ok(qdrant_not_found("Collection not found", start_time)),
2460 };
2461
2462 let mut updated_count = 0;
2463
2464 if let Some(point_ids) = &req.points {
2465 for id_value in point_ids {
2466 let id_str = match id_value {
2467 serde_json::Value::String(s) => s.clone(),
2468 serde_json::Value::Number(n) => n.to_string(),
2469 _ => continue,
2470 };
2471 if collection.overwrite_payload(&id_str, req.payload.clone()).unwrap_or(false) {
2472 updated_count += 1;
2473 }
2474 }
2475 } else {
2476 let all_points = collection.get_all_points();
2477 for point in all_points {
2478 let id_str = point.id.to_string();
2479 if collection.overwrite_payload(&id_str, req.payload.clone()).unwrap_or(false) {
2480 updated_count += 1;
2481 }
2482 }
2483 }
2484
2485 Ok(qdrant_response(serde_json::json!({
2486 "operation_id": updated_count,
2487 "status": "acknowledged"
2488 }), start_time))
2489}
2490
/// Request body accepted by `delete_payload`.
#[derive(Deserialize)]
struct DeletePayloadRequest {
    /// Payload keys to remove from each targeted point.
    keys: Vec<String>,
    /// Ids of the points to update; when absent the handler processes every point.
    #[serde(default)]
    points: Option<Vec<serde_json::Value>>,
    /// Optional filter, kept as raw JSON.
    /// NOTE(review): `delete_payload` does not read this field.
    #[serde(default)]
    filter: Option<serde_json::Value>,
}
2500
2501async fn delete_payload(
2502 storage: web::Data<Arc<StorageManager>>,
2503 path: web::Path<String>,
2504 req: web::Json<DeletePayloadRequest>,
2505) -> ActixResult<HttpResponse> {
2506 let start_time = Instant::now();
2507 let name = path.into_inner();
2508
2509 let collection = match storage.get_collection(&name) {
2510 Some(c) => c,
2511 None => return Ok(qdrant_not_found("Collection not found", start_time)),
2512 };
2513
2514 let mut updated_count = 0;
2515
2516 if let Some(point_ids) = &req.points {
2517 for id_value in point_ids {
2518 let id_str = match id_value {
2519 serde_json::Value::String(s) => s.clone(),
2520 serde_json::Value::Number(n) => n.to_string(),
2521 _ => continue,
2522 };
2523 if collection.delete_payload_keys(&id_str, &req.keys).unwrap_or(false) {
2524 updated_count += 1;
2525 }
2526 }
2527 } else {
2528 let all_points = collection.get_all_points();
2529 for point in all_points {
2530 let id_str = point.id.to_string();
2531 if collection.delete_payload_keys(&id_str, &req.keys).unwrap_or(false) {
2532 updated_count += 1;
2533 }
2534 }
2535 }
2536
2537 Ok(qdrant_response(serde_json::json!({
2538 "operation_id": updated_count,
2539 "status": "acknowledged"
2540 }), start_time))
2541}
2542
/// Request body accepted by `clear_payload`.
#[derive(Deserialize)]
struct ClearPayloadRequest {
    /// Ids of the points to clear; when absent the handler clears every point.
    #[serde(default)]
    points: Option<Vec<serde_json::Value>>,
    /// Optional filter, kept as raw JSON.
    /// NOTE(review): `clear_payload` does not read this field.
    #[serde(default)]
    filter: Option<serde_json::Value>,
}
2551
2552async fn clear_payload(
2553 storage: web::Data<Arc<StorageManager>>,
2554 path: web::Path<String>,
2555 req: web::Json<ClearPayloadRequest>,
2556) -> ActixResult<HttpResponse> {
2557 let start_time = Instant::now();
2558 let name = path.into_inner();
2559
2560 let collection = match storage.get_collection(&name) {
2561 Some(c) => c,
2562 None => return Ok(qdrant_not_found("Collection not found", start_time)),
2563 };
2564
2565 let mut updated_count = 0;
2566
2567 if let Some(point_ids) = &req.points {
2568 for id_value in point_ids {
2569 let id_str = match id_value {
2570 serde_json::Value::String(s) => s.clone(),
2571 serde_json::Value::Number(n) => n.to_string(),
2572 _ => continue,
2573 };
2574 if collection.clear_payload(&id_str).unwrap_or(false) {
2575 updated_count += 1;
2576 }
2577 }
2578 } else {
2579 let all_points = collection.get_all_points();
2580 for point in all_points {
2581 let id_str = point.id.to_string();
2582 if collection.clear_payload(&id_str).unwrap_or(false) {
2583 updated_count += 1;
2584 }
2585 }
2586 }
2587
2588 Ok(qdrant_response(serde_json::json!({
2589 "operation_id": updated_count,
2590 "status": "acknowledged"
2591 }), start_time))
2592}
2593
/// Request body accepted by `update_vectors`.
#[derive(Deserialize)]
struct UpdateVectorsRequest {
    /// Per-point vector replacements.
    points: Vec<UpdateVectorPoint>,
}
2600
/// One entry of an `UpdateVectorsRequest`.
#[derive(Deserialize)]
struct UpdateVectorPoint {
    /// Point id; `update_vectors` accepts a JSON string or number.
    id: serde_json::Value,
    /// New vector; `update_vectors` accepts an array of numbers or an object
    /// whose first entry is such an array.
    vector: serde_json::Value,
}
2606
2607async fn update_vectors(
2608 storage: web::Data<Arc<StorageManager>>,
2609 path: web::Path<String>,
2610 req: web::Json<UpdateVectorsRequest>,
2611) -> ActixResult<HttpResponse> {
2612 let start_time = Instant::now();
2613 let name = path.into_inner();
2614
2615 let collection = match storage.get_collection(&name) {
2616 Some(c) => c,
2617 None => return Ok(qdrant_not_found("Collection not found", start_time)),
2618 };
2619
2620 let mut updated_count = 0;
2621
2622 for point_update in &req.points {
2623 let id_str = match &point_update.id {
2624 serde_json::Value::String(s) => s.clone(),
2625 serde_json::Value::Number(n) => n.to_string(),
2626 _ => continue,
2627 };
2628
2629 let vector_data = match &point_update.vector {
2631 serde_json::Value::Array(arr) => {
2632 let vec: Result<Vec<f32>, _> = arr.iter()
2633 .map(|v| v.as_f64().map(|f| f as f32).ok_or("expected f32"))
2634 .collect();
2635 vec.ok()
2636 }
2637 serde_json::Value::Object(obj) => {
2638 if let Some((_, vec_val)) = obj.iter().next() {
2640 if let Some(arr) = vec_val.as_array() {
2641 let vec: Result<Vec<f32>, _> = arr.iter()
2642 .map(|v| v.as_f64().map(|f| f as f32).ok_or("expected f32"))
2643 .collect();
2644 vec.ok()
2645 } else {
2646 None
2647 }
2648 } else {
2649 None
2650 }
2651 }
2652 _ => None,
2653 };
2654
2655 if let Some(vec) = vector_data {
2656 let vector = Vector::new(vec);
2657 if collection.update_vector(&id_str, vector).unwrap_or(false) {
2658 updated_count += 1;
2659 }
2660 }
2661 }
2662
2663 Ok(qdrant_response(serde_json::json!({
2664 "operation_id": updated_count,
2665 "status": "acknowledged"
2666 }), start_time))
2667}
2668
/// Request body accepted by `delete_vectors`.
#[derive(Deserialize)]
struct DeleteVectorsRequest {
    /// Ids of the points to modify; `delete_vectors` does nothing when this is absent.
    #[serde(default)]
    points: Option<Vec<serde_json::Value>>,
    /// Names of the vectors to delete; the handler only acts on `"multivector"` or an empty name.
    #[serde(default)]
    vectors: Vec<String>,
    /// Optional filter, kept as raw JSON.
    /// NOTE(review): `delete_vectors` does not read this field.
    #[serde(default)]
    filter: Option<serde_json::Value>,
}
2679
2680async fn delete_vectors(
2681 storage: web::Data<Arc<StorageManager>>,
2682 path: web::Path<String>,
2683 req: web::Json<DeleteVectorsRequest>,
2684) -> ActixResult<HttpResponse> {
2685 let start_time = Instant::now();
2686 let name = path.into_inner();
2687
2688 let collection = match storage.get_collection(&name) {
2689 Some(c) => c,
2690 None => return Ok(qdrant_not_found("Collection not found", start_time)),
2691 };
2692
2693 let mut deleted_count = 0;
2694
2695 if let Some(point_ids) = &req.points {
2698 for id_value in point_ids {
2699 let id_str = match id_value {
2700 serde_json::Value::String(s) => s.clone(),
2701 serde_json::Value::Number(n) => n.to_string(),
2702 _ => continue,
2703 };
2704 if req.vectors.iter().any(|v| v == "multivector" || v.is_empty()) {
2706 if collection.update_multivector(&id_str, None).unwrap_or(false) {
2707 deleted_count += 1;
2708 }
2709 }
2710 }
2711 }
2712
2713 Ok(qdrant_response(serde_json::json!({
2714 "operation_id": deleted_count,
2715 "status": "acknowledged"
2716 }), start_time))
2717}
2718
/// Request body accepted by `batch_update`.
#[derive(Deserialize)]
struct BatchUpdateRequest {
    /// Operations as raw JSON; each one is handed to `process_batch_operation`.
    operations: Vec<serde_json::Value>,
}
2724
2725async fn batch_update(
2726 storage: web::Data<Arc<StorageManager>>,
2727 path: web::Path<String>,
2728 req: web::Json<BatchUpdateRequest>,
2729) -> ActixResult<HttpResponse> {
2730 let start_time = Instant::now();
2731 let name = path.into_inner();
2732
2733 let collection = match storage.get_collection(&name) {
2734 Some(c) => c,
2735 None => return Ok(qdrant_not_found("Collection not found", start_time)),
2736 };
2737
2738 let mut results = Vec::new();
2739
2740 for (idx, operation) in req.operations.iter().enumerate() {
2741 let op_result = process_batch_operation(&collection, operation);
2742 results.push(serde_json::json!({
2743 "operation_id": idx,
2744 "status": if op_result { "acknowledged" } else { "failed" }
2745 }));
2746 }
2747
2748 Ok(qdrant_response(results, start_time))
2749}
2750
2751fn process_batch_operation(collection: &std::sync::Arc<vectx_core::Collection>, operation: &serde_json::Value) -> bool {
2753 let obj = match operation.as_object() {
2754 Some(o) => o,
2755 None => return false,
2756 };
2757
2758 if let Some(upsert) = obj.get("upsert") {
2760 if let Some(points) = upsert.get("points").and_then(|p| p.as_array()) {
2761 for point_json in points {
2762 if let Some(point) = parse_point_from_json(point_json) {
2763 let _ = collection.upsert(point);
2764 }
2765 }
2766 return true;
2767 }
2768 }
2769
2770 if let Some(delete) = obj.get("delete") {
2772 if let Some(points) = delete.get("points").and_then(|p| p.as_array()) {
2773 for id_val in points {
2774 let id_str = match id_val {
2775 serde_json::Value::String(s) => s.clone(),
2776 serde_json::Value::Number(n) => n.to_string(),
2777 _ => continue,
2778 };
2779 let _ = collection.delete(&id_str);
2780 }
2781 return true;
2782 }
2783 }
2784
2785 if let Some(set_payload) = obj.get("set_payload") {
2787 if let Some(payload) = set_payload.get("payload") {
2788 if let Some(points) = set_payload.get("points").and_then(|p| p.as_array()) {
2789 for id_val in points {
2790 let id_str = match id_val {
2791 serde_json::Value::String(s) => s.clone(),
2792 serde_json::Value::Number(n) => n.to_string(),
2793 _ => continue,
2794 };
2795 let _ = collection.set_payload(&id_str, payload.clone());
2796 }
2797 return true;
2798 }
2799 }
2800 }
2801
2802 if let Some(overwrite_payload) = obj.get("overwrite_payload") {
2804 if let Some(payload) = overwrite_payload.get("payload") {
2805 if let Some(points) = overwrite_payload.get("points").and_then(|p| p.as_array()) {
2806 for id_val in points {
2807 let id_str = match id_val {
2808 serde_json::Value::String(s) => s.clone(),
2809 serde_json::Value::Number(n) => n.to_string(),
2810 _ => continue,
2811 };
2812 let _ = collection.overwrite_payload(&id_str, payload.clone());
2813 }
2814 return true;
2815 }
2816 }
2817 }
2818
2819 if let Some(delete_payload) = obj.get("delete_payload") {
2821 if let Some(keys) = delete_payload.get("keys").and_then(|k| k.as_array()) {
2822 let key_strings: Vec<String> = keys.iter()
2823 .filter_map(|k| k.as_str().map(String::from))
2824 .collect();
2825 if let Some(points) = delete_payload.get("points").and_then(|p| p.as_array()) {
2826 for id_val in points {
2827 let id_str = match id_val {
2828 serde_json::Value::String(s) => s.clone(),
2829 serde_json::Value::Number(n) => n.to_string(),
2830 _ => continue,
2831 };
2832 let _ = collection.delete_payload_keys(&id_str, &key_strings);
2833 }
2834 return true;
2835 }
2836 }
2837 }
2838
2839 if let Some(clear_payload) = obj.get("clear_payload") {
2841 if let Some(points) = clear_payload.get("points").and_then(|p| p.as_array()) {
2842 for id_val in points {
2843 let id_str = match id_val {
2844 serde_json::Value::String(s) => s.clone(),
2845 serde_json::Value::Number(n) => n.to_string(),
2846 _ => continue,
2847 };
2848 let _ = collection.clear_payload(&id_str);
2849 }
2850 return true;
2851 }
2852 }
2853
2854 false
2855}
2856
2857fn parse_point_from_json(json: &serde_json::Value) -> Option<Point> {
2859 let obj = json.as_object()?;
2860
2861 let id = match obj.get("id")? {
2862 serde_json::Value::String(s) => vectx_core::PointId::String(s.clone()),
2863 serde_json::Value::Number(n) => {
2864 vectx_core::PointId::Integer(n.as_u64().unwrap_or(0))
2865 }
2866 _ => return None,
2867 };
2868
2869 let vector_data = obj.get("vector")?;
2870 let vector = match vector_data {
2871 serde_json::Value::Array(arr) => {
2872 let vec: Result<Vec<f32>, _> = arr.iter()
2873 .map(|v| v.as_f64().map(|f| f as f32).ok_or("expected f32"))
2874 .collect();
2875 Vector::new(vec.ok()?)
2876 }
2877 _ => return None,
2878 };
2879
2880 let payload = obj.get("payload").cloned();
2881
2882 Some(Point::new(id, vector, payload))
2883}
2884
/// Request body accepted by `batch_search`.
#[derive(Deserialize)]
struct BatchSearchRequest {
    /// Individual search requests as raw JSON.
    /// NOTE(review): `batch_search` binds the body as `_req` and does not read this field.
    searches: Vec<serde_json::Value>,
}
2890
2891async fn batch_search(
2892 storage: web::Data<Arc<StorageManager>>,
2893 path: web::Path<String>,
2894 _req: web::Json<BatchSearchRequest>,
2895) -> ActixResult<HttpResponse> {
2896 let start_time = Instant::now();
2897 let name = path.into_inner();
2898
2899 if storage.get_collection(&name).is_none() {
2900 return Ok(qdrant_not_found("Collection not found", start_time));
2901 }
2902
2903 Ok(qdrant_response(Vec::<serde_json::Value>::new(), start_time))
2904}
2905
/// Request body accepted by `search_groups`.
#[derive(Deserialize)]
struct SearchGroupsRequest {
    /// Query vector.
    vector: Vec<f32>,
    /// Payload field whose value determines a hit's group.
    group_by: String,
    /// Maximum number of groups; the handler treats a missing value as 5.
    #[serde(default)]
    limit: Option<usize>,
    /// Maximum hits per group; the handler treats a missing value as 3.
    #[serde(default)]
    group_size: Option<usize>,
    /// Whether hits include their payload; the handler treats a missing value as `true`.
    #[serde(default)]
    with_payload: Option<bool>,
    /// Whether hits include their vector; the handler treats a missing value as `false`.
    #[serde(default)]
    with_vector: Option<bool>,
    /// Optional filter, kept as raw JSON.
    /// NOTE(review): `search_groups` does not read this field.
    #[serde(default)]
    filter: Option<serde_json::Value>,
}
2922
2923async fn search_groups(
2924 storage: web::Data<Arc<StorageManager>>,
2925 path: web::Path<String>,
2926 req: web::Json<SearchGroupsRequest>,
2927) -> ActixResult<HttpResponse> {
2928 let start_time = Instant::now();
2929 let name = path.into_inner();
2930
2931 let collection = match storage.get_collection(&name) {
2932 Some(c) => c,
2933 None => {
2934 return Ok(qdrant_not_found("Collection not found", start_time));
2935 }
2936 };
2937
2938 let limit = req.limit.unwrap_or(5);
2939 let group_size = req.group_size.unwrap_or(3);
2940 let with_payload = req.with_payload.unwrap_or(true);
2941 let with_vector = req.with_vector.unwrap_or(false);
2942 let group_by = &req.group_by;
2943
2944 let query_vector = Vector::new(req.vector.clone());
2945 let search_results = collection.search(&query_vector, limit * group_size * 2, None);
2946
2947 let mut groups: std::collections::HashMap<String, Vec<serde_json::Value>> = std::collections::HashMap::new();
2949
2950 for (point, score) in search_results {
2951 let group_key = point.payload
2952 .as_ref()
2953 .and_then(|p| p.get(group_by))
2954 .and_then(|v| match v {
2955 serde_json::Value::String(s) => Some(s.clone()),
2956 serde_json::Value::Number(n) => Some(n.to_string()),
2957 _ => None,
2958 })
2959 .unwrap_or_else(|| "unknown".to_string());
2960
2961 let group = groups.entry(group_key).or_default();
2962
2963 if group.len() < group_size {
2964 let mut hit = serde_json::json!({
2965 "id": point_id_to_json(&point.id),
2966 "score": score
2967 });
2968
2969 if with_payload {
2970 hit["payload"] = point.payload.clone().unwrap_or(serde_json::Value::Null);
2971 }
2972 if with_vector {
2973 hit["vector"] = serde_json::json!(point.vector.as_slice());
2974 }
2975
2976 group.push(hit);
2977 }
2978
2979 if groups.len() >= limit && groups.values().all(|g| g.len() >= group_size) {
2980 break;
2981 }
2982 }
2983
2984 let group_results: Vec<serde_json::Value> = groups
2985 .into_iter()
2986 .take(limit)
2987 .map(|(key, hits)| serde_json::json!({ "id": key, "hits": hits }))
2988 .collect();
2989
2990 Ok(qdrant_response(serde_json::json!({
2991 "groups": group_results
2992 }), start_time))
2993}
2994
/// Request body accepted by `discover_points`.
#[derive(Deserialize)]
struct DiscoverRequest {
    /// Search target: an array of numbers (a vector) or a string/numeric point id.
    #[serde(default)]
    target: Option<serde_json::Value>,
    /// Optional positive/negative context pairs.
    /// NOTE(review): `discover_points` does not read this field.
    #[serde(default)]
    context: Option<Vec<ContextPair>>,
    /// Maximum number of results; the handler treats a missing value as 10.
    #[serde(default)]
    limit: Option<usize>,
    /// Whether results include their payload; the handler treats a missing value as `true`.
    #[serde(default)]
    with_payload: Option<bool>,
    /// Whether results should include their vector.
    #[serde(default)]
    with_vector: Option<bool>,
    /// Optional filter, kept as raw JSON.
    /// NOTE(review): `discover_points` does not read this field.
    #[serde(default)]
    filter: Option<serde_json::Value>,
}
3011
/// One positive/negative example pair of a `DiscoverRequest` context.
#[derive(Deserialize)]
struct ContextPair {
    /// Positive example, kept as raw JSON.
    positive: serde_json::Value,
    /// Negative example, kept as raw JSON.
    negative: serde_json::Value,
}
3017
3018async fn discover_points(
3019 storage: web::Data<Arc<StorageManager>>,
3020 path: web::Path<String>,
3021 req: web::Json<DiscoverRequest>,
3022) -> ActixResult<HttpResponse> {
3023 let start_time = Instant::now();
3024 let name = path.into_inner();
3025
3026 let collection = match storage.get_collection(&name) {
3027 Some(c) => c,
3028 None => {
3029 return Ok(qdrant_not_found("Collection not found", start_time));
3030 }
3031 };
3032
3033 let limit = req.limit.unwrap_or(10);
3034 let with_payload = req.with_payload.unwrap_or(true);
3035 let _with_vector = req.with_vector.unwrap_or(false);
3036
3037 let target_vector = if let Some(target) = &req.target {
3039 match target {
3040 serde_json::Value::Array(arr) => {
3041 let vec: Result<Vec<f32>, _> = arr.iter()
3042 .map(|v| v.as_f64().map(|f| f as f32).ok_or("expected f32"))
3043 .collect();
3044 vec.ok().map(Vector::new)
3045 }
3046 serde_json::Value::Number(n) => {
3047 let id = n.to_string();
3048 collection.get(&id).map(|p| p.vector.clone())
3049 }
3050 serde_json::Value::String(s) => {
3051 collection.get(s).map(|p| p.vector.clone())
3052 }
3053 _ => None,
3054 }
3055 } else {
3056 None
3057 };
3058
3059 let query = match target_vector {
3060 Some(v) => v,
3061 None => {
3062 return Ok(qdrant_error("Target vector or point ID required", start_time));
3063 }
3064 };
3065
3066 let results = collection.search(&query, limit, None);
3067
3068 let scored_points: Vec<serde_json::Value> = results.into_iter().map(|(point, score)| {
3069 let mut result = serde_json::json!({
3070 "id": point_id_to_json(&point.id),
3071 "version": point.version,
3072 "score": score,
3073 });
3074 if with_payload {
3075 result["payload"] = point.payload.clone().unwrap_or(serde_json::Value::Null);
3076 }
3077 result
3078 }).collect();
3079
3080 Ok(qdrant_response(scored_points, start_time))
3081}
3082
/// Request body accepted by `discover_batch`.
#[derive(Deserialize)]
struct DiscoverBatchRequest {
    /// Individual discover requests as raw JSON.
    /// NOTE(review): `discover_batch` binds the body as `_req` and does not read this field.
    searches: Vec<serde_json::Value>,
}
3088
3089async fn discover_batch(
3090 storage: web::Data<Arc<StorageManager>>,
3091 path: web::Path<String>,
3092 _req: web::Json<DiscoverBatchRequest>,
3093) -> ActixResult<HttpResponse> {
3094 let start_time = Instant::now();
3095 let name = path.into_inner();
3096
3097 if storage.get_collection(&name).is_none() {
3098 return Ok(qdrant_not_found("Collection not found", start_time));
3099 }
3100
3101 Ok(qdrant_response(Vec::<Vec<serde_json::Value>>::new(), start_time))
3102}
3103
/// Request body accepted by `facet_counts`.
#[derive(Deserialize)]
struct FacetRequest {
    /// Payload field whose distinct values are counted.
    key: String,
    /// Maximum number of values to return; the handler treats a missing value as 10.
    #[serde(default)]
    limit: Option<usize>,
    /// Optional filter, kept as raw JSON.
    /// NOTE(review): `facet_counts` does not read this field.
    #[serde(default)]
    filter: Option<serde_json::Value>,
    /// Optional exact-count flag.
    /// NOTE(review): `facet_counts` does not read this field.
    #[serde(default)]
    exact: Option<bool>,
}
3115
3116async fn facet_counts(
3117 storage: web::Data<Arc<StorageManager>>,
3118 path: web::Path<String>,
3119 req: web::Json<FacetRequest>,
3120) -> ActixResult<HttpResponse> {
3121 let start_time = Instant::now();
3122 let name = path.into_inner();
3123
3124 let collection = match storage.get_collection(&name) {
3125 Some(c) => c,
3126 None => {
3127 return Ok(qdrant_not_found("Collection not found", start_time));
3128 }
3129 };
3130
3131 let limit = req.limit.unwrap_or(10);
3132 let key = &req.key;
3133
3134 let all_points = collection.get_all_points();
3136 let mut value_counts: std::collections::HashMap<String, u64> = std::collections::HashMap::new();
3137
3138 for point in all_points {
3139 if let Some(payload) = &point.payload {
3140 if let Some(value) = payload.get(key) {
3141 let value_str = match value {
3142 serde_json::Value::String(s) => s.clone(),
3143 serde_json::Value::Number(n) => n.to_string(),
3144 serde_json::Value::Bool(b) => b.to_string(),
3145 _ => continue,
3146 };
3147 *value_counts.entry(value_str).or_insert(0) += 1;
3148 }
3149 }
3150 }
3151
3152 let mut counts: Vec<_> = value_counts.into_iter().collect();
3154 counts.sort_by(|a, b| b.1.cmp(&a.1));
3155
3156 let hits: Vec<serde_json::Value> = counts.into_iter()
3157 .take(limit)
3158 .map(|(value, count)| serde_json::json!({
3159 "value": value,
3160 "count": count
3161 }))
3162 .collect();
3163
3164 Ok(qdrant_response(serde_json::json!({
3165 "hits": hits
3166 }), start_time))
3167}
3168
/// Request body accepted by `batch_query`.
#[derive(Deserialize)]
struct BatchQueryRequest {
    /// Individual query requests as raw JSON.
    /// NOTE(review): `batch_query` binds the body as `_req` and does not read this field.
    searches: Vec<serde_json::Value>,
}
3174
3175async fn batch_query(
3176 storage: web::Data<Arc<StorageManager>>,
3177 path: web::Path<String>,
3178 _req: web::Json<BatchQueryRequest>,
3179) -> ActixResult<HttpResponse> {
3180 let start_time = Instant::now();
3181 let name = path.into_inner();
3182
3183 if storage.get_collection(&name).is_none() {
3184 return Ok(qdrant_not_found("Collection not found", start_time));
3185 }
3186
3187 Ok(qdrant_response(Vec::<serde_json::Value>::new(), start_time))
3188}
3189
/// Request body accepted by `query_groups`.
#[derive(Deserialize)]
struct QueryGroupsRequest {
    /// Query as raw JSON; `query_groups` only accepts an array of numbers.
    query: serde_json::Value,
    /// Payload field whose value determines a hit's group.
    group_by: String,
    /// Maximum number of groups; the handler treats a missing value as 5.
    #[serde(default)]
    limit: Option<usize>,
    /// Maximum hits per group; the handler treats a missing value as 3.
    #[serde(default)]
    group_size: Option<usize>,
    /// Whether hits include their payload; the handler treats a missing value as `true`.
    #[serde(default)]
    with_payload: Option<bool>,
    /// Whether hits include their vector; the handler treats a missing value as `false`.
    #[serde(default)]
    with_vector: Option<bool>,
    /// Optional filter, kept as raw JSON.
    /// NOTE(review): `query_groups` does not read this field.
    #[serde(default)]
    filter: Option<serde_json::Value>,
}
3206
3207async fn query_groups(
3208 storage: web::Data<Arc<StorageManager>>,
3209 path: web::Path<String>,
3210 req: web::Json<QueryGroupsRequest>,
3211) -> ActixResult<HttpResponse> {
3212 let start_time = Instant::now();
3213 let name = path.into_inner();
3214
3215 let collection = match storage.get_collection(&name) {
3216 Some(c) => c,
3217 None => {
3218 return Ok(qdrant_not_found("Collection not found", start_time));
3219 }
3220 };
3221
3222 let limit = req.limit.unwrap_or(5);
3223 let group_size = req.group_size.unwrap_or(3);
3224 let with_payload = req.with_payload.unwrap_or(true);
3225 let with_vector = req.with_vector.unwrap_or(false);
3226 let group_by = &req.group_by;
3227
3228 let query_vector = match &req.query {
3230 serde_json::Value::Array(arr) => {
3231 let vec: Result<Vec<f32>, _> = arr.iter()
3232 .map(|v| v.as_f64().map(|f| f as f32).ok_or("expected f32"))
3233 .collect();
3234 match vec {
3235 Ok(v) => Vector::new(v),
3236 Err(_) => {
3237 return Ok(qdrant_error("Invalid query vector", start_time));
3238 }
3239 }
3240 }
3241 _ => {
3242 return Ok(qdrant_error("Query must be a vector array", start_time));
3243 }
3244 };
3245
3246 let search_results = collection.search(&query_vector, limit * group_size * 2, None);
3248
3249 let mut groups: std::collections::HashMap<String, Vec<serde_json::Value>> = std::collections::HashMap::new();
3251
3252 for (point, score) in search_results {
3253 let group_key = point.payload
3255 .as_ref()
3256 .and_then(|p| p.get(group_by))
3257 .and_then(|v| match v {
3258 serde_json::Value::String(s) => Some(s.clone()),
3259 serde_json::Value::Number(n) => Some(n.to_string()),
3260 _ => None,
3261 })
3262 .unwrap_or_else(|| "unknown".to_string());
3263
3264 let group = groups.entry(group_key).or_default();
3265
3266 if group.len() < group_size {
3268 let mut hit = serde_json::json!({
3269 "id": point_id_to_json(&point.id),
3270 "score": score
3271 });
3272
3273 if with_payload {
3274 hit["payload"] = point.payload.clone().unwrap_or(serde_json::Value::Null);
3275 }
3276 if with_vector {
3277 hit["vector"] = serde_json::json!(point.vector.as_slice());
3278 }
3279
3280 group.push(hit);
3281 }
3282
3283 if groups.len() >= limit && groups.values().all(|g| g.len() >= group_size) {
3285 break;
3286 }
3287 }
3288
3289 let group_results: Vec<serde_json::Value> = groups
3291 .into_iter()
3292 .take(limit)
3293 .map(|(key, hits)| {
3294 serde_json::json!({
3295 "id": key,
3296 "hits": hits
3297 })
3298 })
3299 .collect();
3300
3301 Ok(qdrant_response(serde_json::json!({
3302 "groups": group_results
3303 }), start_time))
3304}
3305
/// Request body accepted by `create_field_index`.
#[derive(Deserialize)]
struct CreateIndexRequest {
    /// Name of the payload field to index.
    field_name: String,
    /// Index schema: either a type name string or an object with a `type` entry.
    /// The handler falls back to a keyword index when this is absent or unrecognized.
    #[serde(default)]
    field_schema: Option<serde_json::Value>,
}
3313
3314async fn create_field_index(
3315 storage: web::Data<Arc<StorageManager>>,
3316 path: web::Path<String>,
3317 req: web::Json<CreateIndexRequest>,
3318) -> ActixResult<HttpResponse> {
3319 let start_time = Instant::now();
3320 let name = path.into_inner();
3321
3322 let collection = match storage.get_collection(&name) {
3323 Some(c) => c,
3324 None => return Ok(qdrant_not_found("Collection not found", start_time)),
3325 };
3326
3327 let index_type = if let Some(schema) = &req.field_schema {
3329 match schema {
3330 serde_json::Value::String(s) => match s.as_str() {
3331 "keyword" => vectx_core::PayloadIndexType::Keyword,
3332 "integer" => vectx_core::PayloadIndexType::Integer,
3333 "float" => vectx_core::PayloadIndexType::Float,
3334 "bool" => vectx_core::PayloadIndexType::Bool,
3335 "geo" => vectx_core::PayloadIndexType::Geo,
3336 "text" => vectx_core::PayloadIndexType::Text,
3337 _ => vectx_core::PayloadIndexType::Keyword,
3338 }
3339 serde_json::Value::Object(obj) => {
3340 if let Some(type_val) = obj.get("type").and_then(|v| v.as_str()) {
3341 match type_val {
3342 "keyword" => vectx_core::PayloadIndexType::Keyword,
3343 "integer" => vectx_core::PayloadIndexType::Integer,
3344 "float" => vectx_core::PayloadIndexType::Float,
3345 "bool" => vectx_core::PayloadIndexType::Bool,
3346 "geo" => vectx_core::PayloadIndexType::Geo,
3347 "text" => vectx_core::PayloadIndexType::Text,
3348 _ => vectx_core::PayloadIndexType::Keyword,
3349 }
3350 } else {
3351 vectx_core::PayloadIndexType::Keyword
3352 }
3353 }
3354 _ => vectx_core::PayloadIndexType::Keyword,
3355 }
3356 } else {
3357 vectx_core::PayloadIndexType::Keyword
3358 };
3359
3360 match collection.create_payload_index(&req.field_name, index_type) {
3361 Ok(_) => {
3362 let operation_id = collection.next_operation_id();
3363 Ok(qdrant_response(serde_json::json!({
3364 "operation_id": operation_id,
3365 "status": "acknowledged"
3366 }), start_time))
3367 }
3368 Err(e) => Ok(qdrant_error(&e.to_string(), start_time)),
3369 }
3370}
3371
3372async fn delete_field_index(
3374 storage: web::Data<Arc<StorageManager>>,
3375 path: web::Path<(String, String)>,
3376) -> ActixResult<HttpResponse> {
3377 let start_time = Instant::now();
3378 let (name, field_name) = path.into_inner();
3379
3380 let collection = match storage.get_collection(&name) {
3381 Some(c) => c,
3382 None => return Ok(qdrant_not_found("Collection not found", start_time)),
3383 };
3384
3385 match collection.delete_payload_index(&field_name) {
3386 Ok(_) => {
3387 let operation_id = collection.next_operation_id();
3388 Ok(qdrant_response(serde_json::json!({
3389 "operation_id": operation_id,
3390 "status": "acknowledged"
3391 }), start_time))
3392 }
3393 Err(e) => Ok(qdrant_error(&e.to_string(), start_time)),
3394 }
3395}
3396
3397#[derive(Deserialize)]
3402struct RecommendRequest {
3403 #[serde(default)]
3404 positive: Vec<serde_json::Value>,
3405 #[serde(default)]
3406 negative: Vec<serde_json::Value>,
3407 #[serde(default)]
3408 limit: Option<usize>,
3409 #[serde(default)]
3410 with_payload: Option<bool>,
3411 #[serde(default)]
3412 with_vector: Option<bool>,
3413 #[serde(default)]
3414 score_threshold: Option<f32>,
3415}
3416
3417async fn recommend_points(
3418 storage: web::Data<Arc<StorageManager>>,
3419 path: web::Path<String>,
3420 req: web::Json<RecommendRequest>,
3421) -> ActixResult<HttpResponse> {
3422 let start_time = Instant::now();
3423 let name = path.into_inner();
3424
3425 let collection = match storage.get_collection(&name) {
3426 Some(c) => c,
3427 None => {
3428 return Ok(qdrant_not_found("Collection not found", start_time));
3429 }
3430 };
3431
3432 let limit = req.limit.unwrap_or(10);
3433 let with_payload = req.with_payload.unwrap_or(true);
3434 let with_vector = req.with_vector.unwrap_or(false);
3435 let score_threshold = req.score_threshold;
3436
3437 let mut exclude_ids: std::collections::HashSet<String> = std::collections::HashSet::new();
3439
3440 let parse_id = |id: &serde_json::Value| -> Option<String> {
3442 match id {
3443 serde_json::Value::String(s) => Some(s.clone()),
3444 serde_json::Value::Number(n) => Some(n.to_string()),
3445 _ => None,
3446 }
3447 };
3448
3449 let mut positive_vectors: Vec<Vec<f32>> = Vec::new();
3451 for pos_id in &req.positive {
3452 if let Some(id_str) = parse_id(pos_id) {
3453 exclude_ids.insert(id_str.clone());
3454 if let Some(point) = collection.get(&id_str) {
3455 positive_vectors.push(point.vector.as_slice().to_vec());
3456 }
3457 }
3458 }
3459
3460 if positive_vectors.is_empty() {
3461 return Ok(qdrant_error("At least one valid positive example is required", start_time));
3462 }
3463
3464 let mut negative_vectors: Vec<Vec<f32>> = Vec::new();
3466 for neg_id in &req.negative {
3467 if let Some(id_str) = parse_id(neg_id) {
3468 exclude_ids.insert(id_str.clone());
3469 if let Some(point) = collection.get(&id_str) {
3470 negative_vectors.push(point.vector.as_slice().to_vec());
3471 }
3472 }
3473 }
3474
3475 let dim = positive_vectors[0].len();
3477 let mut avg_positive = vec![0.0f32; dim];
3478 for vec in &positive_vectors {
3479 for (i, &val) in vec.iter().enumerate() {
3480 if i < dim {
3481 avg_positive[i] += val;
3482 }
3483 }
3484 }
3485 let pos_count = positive_vectors.len() as f32;
3486 for val in &mut avg_positive {
3487 *val /= pos_count;
3488 }
3489
3490 let query_vector = if !negative_vectors.is_empty() {
3493 let mut avg_negative = vec![0.0f32; dim];
3494 for vec in &negative_vectors {
3495 for (i, &val) in vec.iter().enumerate() {
3496 if i < dim {
3497 avg_negative[i] += val;
3498 }
3499 }
3500 }
3501 let neg_count = negative_vectors.len() as f32;
3502 for val in &mut avg_negative {
3503 *val /= neg_count;
3504 }
3505
3506 avg_positive.iter()
3508 .zip(avg_negative.iter())
3509 .map(|(&pos, &neg)| 2.0 * pos - neg)
3510 .collect::<Vec<f32>>()
3511 } else {
3512 avg_positive
3513 };
3514
3515 let query = Vector::new(query_vector);
3517
3518 let search_limit = limit + exclude_ids.len();
3520 let search_results = collection.search(&query, search_limit, None);
3521
3522 let mut results = Vec::with_capacity(limit);
3524 for (point, score) in search_results {
3525 let point_id_str = point.id.to_string();
3527 if exclude_ids.contains(&point_id_str) {
3528 continue;
3529 }
3530
3531 if let Some(threshold) = score_threshold {
3533 if score < threshold {
3534 continue;
3535 }
3536 }
3537
3538 let mut result = serde_json::json!({
3539 "id": point_id_to_json(&point.id),
3540 "version": point.version,
3541 "score": score
3542 });
3543
3544 if with_payload {
3545 result["payload"] = point.payload.clone().unwrap_or(serde_json::Value::Null);
3546 }
3547 if with_vector {
3548 result["vector"] = serde_json::json!(point.vector.as_slice());
3549 }
3550
3551 results.push(result);
3552
3553 if results.len() >= limit {
3554 break;
3555 }
3556 }
3557
3558 Ok(qdrant_response(results, start_time))
3559}