vectx_api/
rest.rs

1use actix_web::{web, App, HttpServer, HttpResponse, Result as ActixResult};
2use actix_cors::Cors;
3use actix_files::Files;
4use actix_multipart::Multipart;
5use chrono::Utc;
6use vectx_core::{CollectionConfig, Collection, Distance, Point, PointId, Vector, PayloadFilter, FilterCondition, Filter, MultiVector};
7use vectx_storage::StorageManager;
8use serde::{Deserialize, Deserializer, Serialize};
9use std::sync::Arc;
10use std::path::Path;
11use std::collections::HashMap;
12use std::time::Instant;
13use futures_util::StreamExt;
14
15/// Create Qdrant-compatible JSON response with status and time
16fn qdrant_response<T: Serialize>(result: T, start_time: Instant) -> HttpResponse {
17    let elapsed = start_time.elapsed().as_secs_f64();
18    HttpResponse::Ok().json(serde_json::json!({
19        "result": result,
20        "status": "ok",
21        "time": elapsed
22    }))
23}
24
25/// Create Qdrant-compatible error response
26fn qdrant_error(error: &str, start_time: Instant) -> HttpResponse {
27    let elapsed = start_time.elapsed().as_secs_f64();
28    HttpResponse::BadRequest().json(serde_json::json!({
29        "status": {
30            "error": error
31        },
32        "time": elapsed
33    }))
34}
35
36/// Create Qdrant-compatible not found response
37fn qdrant_not_found(error: &str, start_time: Instant) -> HttpResponse {
38    let elapsed = start_time.elapsed().as_secs_f64();
39    HttpResponse::NotFound().json(serde_json::json!({
40        "status": {
41            "error": error
42        },
43        "time": elapsed
44    }))
45}
46
// Dashboard configuration
/// Directory that `RestApi::start` passes to `start_with_static_dir` as the dashboard root.
const STATIC_DIR: &str = "./static";
/// URL path under which the static dashboard files are mounted when the directory exists.
const DASHBOARD_PATH: &str = "/dashboard";
50
/// JSON body accepted by `create_collection` (`PUT /collections/{name}`).
#[derive(Deserialize)]
struct CreateCollectionRequest {
    /// Dense vectors configuration (optional - can be omitted for sparse-only collections).
    /// Parsed by `deserialize_vectors_optional`, which accepts both the simple
    /// and the named-vectors layout.
    #[serde(default, deserialize_with = "deserialize_vectors_optional")]
    vectors: Option<VectorConfig>,
    /// Forwarded to `CollectionConfig::use_hnsw`; `false` when omitted.
    #[serde(default)]
    use_hnsw: bool,
    /// Requests BM25 for the collection; `false` when omitted. `create_collection`
    /// also turns BM25 on whenever `sparse_vectors` is present.
    #[serde(default)]
    enable_bm25: bool,
    // Qdrant compatibility - sparse vectors (stored but not fully implemented).
    // `create_collection` only checks whether this field is present; its
    // contents are not inspected there.
    #[serde(default)]
    sparse_vectors: Option<serde_json::Value>,
}
64
/// Dense vector parameters of a collection, as sent in the `vectors` field of
/// a create-collection request.
#[derive(Deserialize, Clone)]
struct VectorConfig {
    /// Vector dimensionality; becomes `CollectionConfig::vector_dim`.
    size: usize,
    /// Name of the distance metric; interpreted by `create_collection`.
    distance: Option<String>,
    // Qdrant compatibility - ignored fields.
    // They are accepted so Qdrant-shaped requests deserialize; nothing in the
    // visible code reads them.
    #[serde(default)]
    on_disk: Option<bool>,
    #[serde(default)]
    hnsw_config: Option<serde_json::Value>,
    #[serde(default)]
    quantization_config: Option<serde_json::Value>,
    #[serde(default)]
    multivector_config: Option<serde_json::Value>,
    #[serde(default)]
    datatype: Option<String>,
}
81
82// Custom deserializer to handle both simple and named vector formats
83fn deserialize_vectors_optional<'de, D>(deserializer: D) -> Result<Option<VectorConfig>, D::Error>
84where
85    D: Deserializer<'de>,
86{
87    let value = Option::<serde_json::Value>::deserialize(deserializer)?;
88    
89    let Some(value) = value else {
90        return Ok(None);
91    };
92    
93    // Try simple format first: {"size": 1536, "distance": "Cosine"}
94    if let Ok(config) = serde_json::from_value::<VectorConfig>(value.clone()) {
95        return Ok(Some(config));
96    }
97    
98    // Try named vectors format: {"": {"size": 1536, ...}} or {"vector_name": {"size": 1536, ...}}
99    if let Ok(named) = serde_json::from_value::<HashMap<String, VectorConfig>>(value.clone()) {
100        // Use the first vector config (default or named)
101        if let Some(config) = named.into_values().next() {
102            return Ok(Some(config));
103        }
104    }
105    
106    Err(serde::de::Error::custom("Invalid vectors configuration: expected either {\"size\": N, \"distance\": \"...\"} or {\"name\": {\"size\": N, ...}}"))
107}
108
// Note: Using serde_json::json! for flexible responses instead of these structs
/// Typed summary of a collection (name, dense vector parameters, point count).
/// Marked `dead_code`: per the note above, handlers build responses with
/// `serde_json::json!` instead of this struct.
#[allow(dead_code)]
#[derive(Serialize)]
struct CollectionInfo {
    name: String,
    vectors: VectorConfigResponse,
    points_count: usize,
}
117
/// Typed form of the dense vector parameters (`size`, `distance`), used as the
/// `vectors` field of `CollectionInfo`. Marked `dead_code` like that struct.
#[allow(dead_code)]
#[derive(Serialize)]
struct VectorConfigResponse {
    size: usize,
    distance: String,
}
124
/// JSON body accepted by `upsert_points` (`PUT /collections/{name}/points`).
#[derive(Deserialize)]
struct UpsertPointsRequest {
    /// Points to insert or update, in request order.
    points: Vec<PointRequest>,
}
129
/// Parsed sparse vector data: one named `{"indices": [...], "values": [...]}`
/// entry taken from a point's `vector` object.
struct ParsedSparseVector {
    /// Name of the sparse vector (e.g., "keywords"), i.e. its key in the `vector` object
    name: String,
    /// Indices of non-zero elements
    indices: Vec<u32>,
    /// Values at those indices
    values: Vec<f32>,
}
139
/// Parsed vector data - can be single, multi, or sparse.
/// Produced by the custom `vector` field deserializers in this file.
struct ParsedVector {
    /// First/primary vector (for backwards compatibility).
    /// For multivector input this is a copy of the first row; for object input
    /// without any dense entry the deserializers leave the placeholder `[0.0]`.
    primary: Vec<f32>,
    /// Full multivector data if this was a multivector input (array of arrays)
    multivector: Option<Vec<Vec<f32>>>,
    /// Sparse vectors (named); empty when the input carried none
    sparse_vectors: Vec<ParsedSparseVector>,
}
149
/// One point inside an `UpsertPointsRequest`.
#[derive(Deserialize)]
struct PointRequest {
    /// Point identifier as raw JSON; `upsert_points` accepts a string or an
    /// unsigned integer and rejects anything else as "Invalid point ID".
    id: serde_json::Value,
    /// Vector is optional when using similarity schema (auto-embedding mode).
    /// Parsed by `deserialize_vector_optional`.
    #[serde(default, deserialize_with = "deserialize_vector_optional")]
    vector: Option<ParsedVector>,
    /// Arbitrary JSON payload stored alongside the point, if any.
    payload: Option<serde_json::Value>,
}
158
159// Custom deserializer to handle multiple vector formats (Qdrant compatibility)
160// Simple: [0.1, 0.2, 0.3]
161// Multivector: [[0.1, 0.2], [0.3, 0.4]] -> stores full multivector for MaxSim search
162// Named vectors: {"vector_name": [0.1, 0.2]} -> extracts the vector
163fn deserialize_vector<'de, D>(deserializer: D) -> Result<ParsedVector, D::Error>
164where
165    D: Deserializer<'de>,
166{
167    let value = serde_json::Value::deserialize(deserializer)?;
168    
169    fn parse_simple_vector(arr: &[serde_json::Value]) -> Result<Vec<f32>, String> {
170        arr.iter()
171            .map(|v| v.as_f64().map(|f| f as f32).ok_or_else(|| "expected f32".to_string()))
172            .collect()
173    }
174    
175    fn parse_multivector(arr: &[serde_json::Value]) -> Result<Vec<Vec<f32>>, String> {
176        arr.iter()
177            .map(|sub| {
178                if let serde_json::Value::Array(sub_arr) = sub {
179                    parse_simple_vector(sub_arr)
180                } else {
181                    Err("expected array of arrays for multivector".to_string())
182                }
183            })
184            .collect()
185    }
186    
187    match &value {
188        // Array: could be simple vector or multivector
189        serde_json::Value::Array(arr) if !arr.is_empty() => {
190            match arr.first() {
191                // Simple vector: [0.1, 0.2, 0.3]
192                Some(serde_json::Value::Number(_)) => {
193                    let primary = parse_simple_vector(arr).map_err(serde::de::Error::custom)?;
194                    Ok(ParsedVector { primary, multivector: None, sparse_vectors: Vec::new() })
195                }
196                // Multivector: [[0.1, 0.2], [0.3, 0.4]] - store full multivector
197                Some(serde_json::Value::Array(_)) => {
198                    let multivec = parse_multivector(arr).map_err(serde::de::Error::custom)?;
199                    let primary = multivec.first().cloned().unwrap_or_default();
200                    Ok(ParsedVector { primary, multivector: Some(multivec), sparse_vectors: Vec::new() })
201                }
202                _ => Err(serde::de::Error::custom("invalid vector format: expected number or array"))
203            }
204        }
205        // Empty array
206        serde_json::Value::Array(_) => {
207            Err(serde::de::Error::custom("vector cannot be empty"))
208        }
209        // Named vector: {"vector_name": [0.1, 0.2]} or {"": [...]}
210        // Or sparse vector: {"keywords": {"indices": [...], "values": [...]}}
211        serde_json::Value::Object(obj) => {
212            let mut sparse_vectors = Vec::new();
213            let mut primary = vec![0.0];
214            let mut multivector = None;
215            
216            for (name, vec_value) in obj.iter() {
217                match vec_value {
218                    // Dense named vector: [0.1, 0.2, 0.3]
219                    serde_json::Value::Array(arr) if !arr.is_empty() => {
220                        match arr.first() {
221                            Some(serde_json::Value::Number(_)) => {
222                                primary = parse_simple_vector(arr).map_err(serde::de::Error::custom)?;
223                            }
224                            Some(serde_json::Value::Array(_)) => {
225                                // Named multivector
226                                let multivec = parse_multivector(arr).map_err(serde::de::Error::custom)?;
227                                primary = multivec.first().cloned().unwrap_or_default();
228                                multivector = Some(multivec);
229                            }
230                            _ => {}
231                        }
232                    }
233                    // Sparse vector: {"indices": [...], "values": [...]}
234                    serde_json::Value::Object(sparse_obj) => {
235                        if let (Some(indices_arr), Some(values_arr)) = (
236                            sparse_obj.get("indices").and_then(|i| i.as_array()),
237                            sparse_obj.get("values").and_then(|v| v.as_array())
238                        ) {
239                            let indices: Vec<u32> = indices_arr.iter()
240                                .filter_map(|i| i.as_u64().map(|n| n as u32))
241                                .collect();
242                            let values: Vec<f32> = values_arr.iter()
243                                .filter_map(|v| v.as_f64().map(|f| f as f32))
244                                .collect();
245                            
246                            if !indices.is_empty() && !values.is_empty() {
247                                sparse_vectors.push(ParsedSparseVector {
248                                    name: name.clone(),
249                                    indices,
250                                    values,
251                                });
252                            }
253                        }
254                    }
255                    // Empty array - ignore
256                    serde_json::Value::Array(_) => {}
257                    _ => {}
258                }
259            }
260            
261            Ok(ParsedVector { primary, multivector, sparse_vectors })
262        }
263        _ => Err(serde::de::Error::custom("vector must be an array or object")),
264    }
265}
266
267// Custom deserializer for optional vector (for similarity schema auto-embedding)
268fn deserialize_vector_optional<'de, D>(deserializer: D) -> Result<Option<ParsedVector>, D::Error>
269where
270    D: Deserializer<'de>,
271{
272    let value = Option::<serde_json::Value>::deserialize(deserializer)?;
273    
274    let Some(value) = value else {
275        return Ok(None);
276    };
277    
278    // Reuse the same parsing logic
279    fn parse_simple_vector(arr: &[serde_json::Value]) -> Result<Vec<f32>, String> {
280        arr.iter()
281            .map(|v| v.as_f64().map(|f| f as f32).ok_or_else(|| "expected f32".to_string()))
282            .collect()
283    }
284    
285    fn parse_multivector(arr: &[serde_json::Value]) -> Result<Vec<Vec<f32>>, String> {
286        arr.iter()
287            .map(|sub| {
288                if let serde_json::Value::Array(sub_arr) = sub {
289                    parse_simple_vector(sub_arr)
290                } else {
291                    Err("expected array of arrays for multivector".to_string())
292                }
293            })
294            .collect()
295    }
296    
297    match &value {
298        serde_json::Value::Array(arr) if !arr.is_empty() => {
299            match arr.first() {
300                Some(serde_json::Value::Number(_)) => {
301                    let primary = parse_simple_vector(arr).map_err(serde::de::Error::custom)?;
302                    Ok(Some(ParsedVector { primary, multivector: None, sparse_vectors: Vec::new() }))
303                }
304                Some(serde_json::Value::Array(_)) => {
305                    let multivec = parse_multivector(arr).map_err(serde::de::Error::custom)?;
306                    let primary = multivec.first().cloned().unwrap_or_default();
307                    Ok(Some(ParsedVector { primary, multivector: Some(multivec), sparse_vectors: Vec::new() }))
308                }
309                _ => Err(serde::de::Error::custom("invalid vector format"))
310            }
311        }
312        serde_json::Value::Array(_) => Ok(None), // Empty array treated as no vector
313        serde_json::Value::Object(obj) => {
314            let mut sparse_vectors = Vec::new();
315            let mut primary = vec![0.0];
316            let mut multivector = None;
317            
318            for (name, vec_value) in obj.iter() {
319                match vec_value {
320                    serde_json::Value::Array(arr) if !arr.is_empty() => {
321                        match arr.first() {
322                            Some(serde_json::Value::Number(_)) => {
323                                primary = parse_simple_vector(arr).map_err(serde::de::Error::custom)?;
324                            }
325                            Some(serde_json::Value::Array(_)) => {
326                                let multivec = parse_multivector(arr).map_err(serde::de::Error::custom)?;
327                                primary = multivec.first().cloned().unwrap_or_default();
328                                multivector = Some(multivec);
329                            }
330                            _ => {}
331                        }
332                    }
333                    serde_json::Value::Object(sparse_obj) => {
334                        if let (Some(indices_arr), Some(values_arr)) = (
335                            sparse_obj.get("indices").and_then(|i| i.as_array()),
336                            sparse_obj.get("values").and_then(|v| v.as_array())
337                        ) {
338                            let indices: Vec<u32> = indices_arr.iter()
339                                .filter_map(|i| i.as_u64().map(|n| n as u32))
340                                .collect();
341                            let values: Vec<f32> = values_arr.iter()
342                                .filter_map(|v| v.as_f64().map(|f| f as f32))
343                                .collect();
344                            
345                            if !indices.is_empty() && !values.is_empty() {
346                                sparse_vectors.push(ParsedSparseVector {
347                                    name: name.clone(),
348                                    indices,
349                                    values,
350                                });
351                            }
352                        }
353                    }
354                    _ => {}
355                }
356            }
357            
358            Ok(Some(ParsedVector { primary, multivector, sparse_vectors }))
359        }
360        serde_json::Value::Null => Ok(None),
361        _ => Err(serde::de::Error::custom("vector must be an array, object, or null")),
362    }
363}
364
/// JSON body of a vector/text search request.
// NOTE(review): the handler that consumes this struct is outside the visible
// part of the file; the field notes below describe only the wire format.
#[derive(Deserialize)]
struct SearchRequest {
    /// Dense query vector, if the search is vector-based.
    vector: Option<Vec<f32>>,
    /// Query text, if the search is text-based.
    text: Option<String>,
    /// Maximum number of results; also accepted under the key `top`.
    #[serde(alias = "top")]
    limit: Option<usize>,
    /// Filter expression kept as raw JSON.
    filter: Option<serde_json::Value>,
    #[serde(default)]
    with_payload: Option<bool>,
    #[serde(default)]
    with_vector: Option<bool>,
    #[serde(default)]
    score_threshold: Option<f32>,
    #[serde(default)]
    offset: Option<usize>,
}
381
/// Typed form of a single search hit (`id`, `score`, optional `payload`).
/// Marked `dead_code`, so it may be unused in this file.
#[allow(dead_code)]
#[derive(Serialize)]
struct SearchResult {
    id: serde_json::Value,
    score: f32,
    payload: Option<serde_json::Value>,
}
389
/// Entry point for the HTTP (REST) API server. Carries no state; the storage
/// handle is passed to the start functions.
pub struct RestApi;

impl RestApi {
    /// Starts the REST server on `port`, serving the dashboard from the default
    /// `STATIC_DIR`. See [`RestApi::start_with_static_dir`].
    pub async fn start(
        storage: Arc<StorageManager>,
        port: u16,
    ) -> std::io::Result<()> {
        Self::start_with_static_dir(storage, port, STATIC_DIR).await
    }
    
    /// Builds the actix-web application, binds it to `0.0.0.0:port` and runs it
    /// until the server future completes.
    ///
    /// * `storage` - shared storage manager, registered as application data for
    ///   the handlers.
    /// * `port` - TCP port to listen on (all interfaces).
    /// * `static_dir` - directory served under `DASHBOARD_PATH` when it exists
    ///   and is a directory; otherwise no dashboard service is registered.
    ///
    /// # Errors
    /// Returns the I/O error from binding the socket or from running the server.
    pub async fn start_with_static_dir(
        storage: Arc<StorageManager>,
        port: u16,
        static_dir: &str,
    ) -> std::io::Result<()> {
        // Owned copy so the `move` app-factory closure below does not borrow `static_dir`.
        let static_folder = static_dir.to_string();
        
        HttpServer::new(move || {
            // CORS is fully permissive: any origin, method and header; max-age 3600.
            let cors = Cors::default()
                .allow_any_origin()
                .allow_any_method()
                .allow_any_header()
                .max_age(3600);

            let mut app = App::new()
                .wrap(cors)
                .app_data(web::Data::new(storage.clone()))
                // Service endpoints (Qdrant-compatible)
                .route("/", web::get().to(root_info))
                .route("/healthz", web::get().to(health_check))
                .route("/livez", web::get().to(livez_check))
                .route("/readyz", web::get().to(readyz_check))
                .route("/metrics", web::get().to(metrics_endpoint))
                // Collection endpoints
                .route("/collections", web::get().to(list_collections))
                .route("/collections/{name}", web::get().to(get_collection))
                .route("/collections/{name}", web::put().to(create_collection))
                .route("/collections/{name}", web::delete().to(delete_collection))
                .route("/collections/{name}/points", web::put().to(upsert_points))
                .route("/collections/{name}/points/scroll", web::post().to(scroll_points))
                .route("/collections/{name}/points/delete", web::post().to(delete_points_by_filter))
                .route("/collections/{name}/points/search", web::post().to(search_points))
                .route("/collections/{name}/points/query", web::post().to(query_points))
                .route("/collections/{name}/points/{id}", web::get().to(get_point))
                .route("/collections/{name}/points/{id}", web::delete().to(delete_point))
                .route("/collections/{name}/exists", web::get().to(collection_exists))
                // Qdrant compatibility - additional endpoints
                .route("/aliases", web::get().to(list_aliases))
                .route("/collections/aliases", web::post().to(update_aliases))
                .route("/collections/{name}/aliases", web::get().to(list_collection_aliases))
                .route("/cluster", web::get().to(cluster_info))
                .route("/collections/{name}/cluster", web::get().to(collection_cluster_info))
                .route("/telemetry", web::get().to(telemetry_info))
                // Points batch operations
                .route("/collections/{name}/points", web::post().to(get_points_by_ids))
                .route("/collections/{name}/points/count", web::post().to(count_points))
                .route("/collections/{name}/points/payload", web::post().to(set_payload))
                .route("/collections/{name}/points/payload", web::put().to(overwrite_payload))
                .route("/collections/{name}/points/payload/delete", web::post().to(delete_payload))
                .route("/collections/{name}/points/payload/clear", web::post().to(clear_payload))
                .route("/collections/{name}/points/vectors", web::put().to(update_vectors))
                .route("/collections/{name}/points/vectors/delete", web::post().to(delete_vectors))
                .route("/collections/{name}/points/batch", web::post().to(batch_update))
                .route("/collections/{name}/points/search/batch", web::post().to(batch_search))
                .route("/collections/{name}/points/search/groups", web::post().to(search_groups))
                .route("/collections/{name}/points/query/batch", web::post().to(batch_query))
                .route("/collections/{name}/points/query/groups", web::post().to(query_groups))
                .route("/collections/{name}/points/discover", web::post().to(discover_points))
                .route("/collections/{name}/points/discover/batch", web::post().to(discover_batch))
                .route("/collections/{name}/facet", web::post().to(facet_counts))
                // Index endpoints
                .route("/collections/{name}/index", web::put().to(create_field_index))
                .route("/collections/{name}/index/{field_name}", web::delete().to(delete_field_index))
                // Recommend endpoint
                .route("/collections/{name}/points/recommend", web::post().to(recommend_points))
                // Snapshot endpoints (stubs for UI compatibility)
                .route("/collections/{name}/snapshots", web::get().to(list_snapshots))
                .route("/collections/{name}/snapshots", web::post().to(create_snapshot))
                .route("/collections/{name}/snapshots/upload", web::post().to(upload_snapshot))
                .route("/collections/{name}/snapshots/recover", web::put().to(recover_snapshot))
                .route("/collections/{name}/snapshots/{snapshot_name}", web::get().to(get_snapshot))
                .route("/collections/{name}/snapshots/{snapshot_name}", web::delete().to(delete_snapshot))
                // Full storage snapshots
                .route("/snapshots", web::get().to(list_all_snapshots))
                .route("/snapshots", web::post().to(create_full_snapshot))
                .route("/snapshots/{snapshot_name}", web::get().to(get_full_snapshot))
                .route("/snapshots/{snapshot_name}", web::delete().to(delete_full_snapshot))
                // Collection update endpoint
                .route("/collections/{name}", web::patch().to(update_collection))
                // Issues endpoints
                .route("/issues", web::get().to(get_issues))
                .route("/issues", web::delete().to(clear_issues));
            
            // Serve web UI dashboard if static folder exists.
            // The check runs inside the factory closure, i.e. each time an App is built.
            let static_path = Path::new(&static_folder);
            if static_path.exists() && static_path.is_dir() {
                app = app.service(
                    Files::new(DASHBOARD_PATH, static_folder.clone())
                        .index_file("index.html")
                        .use_last_modified(true)
                );
            }
            
            app
        })
        // Listen on all interfaces.
        .bind(("0.0.0.0", port))?
        .run()
        .await
    }
}
500
501async fn root_info() -> ActixResult<HttpResponse> {
502    Ok(HttpResponse::Ok().json(serde_json::json!({
503        "title": "vectx - vector search engine",
504        "version": "0.2.1",
505        "commit": ""
506    })))
507}
508
509async fn health_check() -> ActixResult<HttpResponse> {
510    Ok(HttpResponse::Ok().json(serde_json::json!({
511        "title": "vectx",
512        "version": "0.2.1"
513    })))
514}
515
516/// Kubernetes liveness probe
517async fn livez_check() -> ActixResult<HttpResponse> {
518    Ok(HttpResponse::Ok()
519        .content_type("text/plain")
520        .body("healthz check passed"))
521}
522
523/// Kubernetes readiness probe  
524async fn readyz_check() -> ActixResult<HttpResponse> {
525    Ok(HttpResponse::Ok()
526        .content_type("text/plain")
527        .body("healthz check passed"))
528}
529
530/// Prometheus metrics endpoint
531async fn metrics_endpoint(
532    storage: web::Data<Arc<StorageManager>>,
533) -> ActixResult<HttpResponse> {
534    let collections = storage.list_collections();
535    let collections_count = collections.len();
536    
537    // Count total points across all collections
538    let mut total_points = 0u64;
539    for name in &collections {
540        if let Some(collection) = storage.get_collection(name) {
541            total_points += collection.count() as u64;
542        }
543    }
544    
545    let metrics = format!(
546        "# HELP app_info information about vectx server\n\
547         # TYPE app_info gauge\n\
548         app_info{{name=\"vectx\",version=\"{}\"}} 1\n\
549         # HELP cluster_enabled is cluster support enabled\n\
550         # TYPE cluster_enabled gauge\n\
551         cluster_enabled 0\n\
552         # HELP collections_total number of collections\n\
553         # TYPE collections_total gauge\n\
554         collections_total {}\n\
555         # HELP points_total total number of points across all collections\n\
556         # TYPE points_total gauge\n\
557         points_total {}\n",
558        env!("CARGO_PKG_VERSION"),
559        collections_count,
560        total_points
561    );
562    
563    Ok(HttpResponse::Ok()
564        .content_type("text/plain")
565        .body(metrics))
566}
567
568async fn list_collections(
569    storage: web::Data<Arc<StorageManager>>,
570) -> ActixResult<HttpResponse> {
571    let start_time = Instant::now();
572    let collection_names = storage.list_collections();
573    
574    // Format to match Qdrant's response structure (only name, no config)
575    let collections: Vec<serde_json::Value> = collection_names.into_iter()
576        .map(|name| serde_json::json!({ "name": name }))
577        .collect();
578    
579    Ok(qdrant_response(serde_json::json!({
580        "collections": collections
581    }), start_time))
582}
583
584async fn get_collection(
585    storage: web::Data<Arc<StorageManager>>,
586    path: web::Path<String>,
587) -> ActixResult<HttpResponse> {
588    let start_time = Instant::now();
589    let name = path.into_inner();
590    
591    if let Some(collection) = storage.get_collection(&name) {
592        let distance_str = format!("{:?}", collection.distance());
593        let vector_dim = collection.vector_dim();
594        let points_count = collection.count();
595        
596        // Format to match Qdrant's full response structure
597        Ok(qdrant_response(serde_json::json!({
598            "status": "green",
599            "optimizer_status": "ok",
600            "vectors_count": points_count,
601            "indexed_vectors_count": points_count,
602            "points_count": points_count,
603            "segments_count": 1,
604            "config": {
605                "params": {
606                    "vectors": {
607                        "size": vector_dim,
608                        "distance": distance_str
609                    },
610                    "shard_number": 1,
611                    "replication_factor": 1,
612                    "write_consistency_factor": 1,
613                    "on_disk_payload": true
614                },
615                "hnsw_config": {
616                    "m": 16,
617                    "ef_construct": 100,
618                    "full_scan_threshold": 10000,
619                    "max_indexing_threads": 0,
620                    "on_disk": false
621                },
622                "optimizer_config": {
623                    "deleted_threshold": 0.2,
624                    "vacuum_min_vector_number": 1000,
625                    "default_segment_number": 0,
626                    "indexing_threshold": 10000,
627                    "flush_interval_sec": 5,
628                    "max_segment_size": null,
629                    "memmap_threshold": null,
630                    "max_optimization_threads": null
631                },
632                "wal_config": {
633                    "wal_capacity_mb": 32,
634                    "wal_segments_ahead": 0,
635                    "wal_retain_closed": 1
636                },
637                "quantization_config": null
638            },
639            "payload_schema": {}
640        }), start_time))
641    } else {
642        Ok(qdrant_not_found("Collection not found", start_time))
643    }
644}
645
646async fn create_collection(
647    storage: web::Data<Arc<StorageManager>>,
648    path: web::Path<String>,
649    req: web::Json<CreateCollectionRequest>,
650) -> ActixResult<HttpResponse> {
651    let start_time = Instant::now();
652    let name = path.into_inner();
653    
654    // Handle sparse-only collections (Qdrant compatibility)
655    // For sparse-only collections, we create with a default vector dimension
656    let (vector_dim, distance) = if let Some(ref vectors) = req.vectors {
657        let dist = match vectors.distance.as_deref() {
658            Some("Cosine") | Some("cosine") => Distance::Cosine,
659            Some("Euclidean") | Some("euclidean") => Distance::Euclidean,
660            Some("Dot") | Some("dot") => Distance::Dot,
661            _ => Distance::Cosine,
662        };
663        (vectors.size, dist)
664    } else if req.sparse_vectors.is_some() {
665        // Sparse-only collection - use BM25 with default text dimension
666        (0, Distance::Cosine)
667    } else {
668        return Ok(qdrant_error("'vectors' configuration is required. Clients must provide embedding vectors.", start_time));
669    };
670
671    let config = CollectionConfig {
672        name: name.clone(),
673        vector_dim,
674        distance,
675        use_hnsw: req.use_hnsw,
676        // Enable BM25 for sparse collections
677        enable_bm25: req.enable_bm25 || req.sparse_vectors.is_some(),
678    };
679
680    match storage.create_collection(config) {
681        Ok(_) => Ok(qdrant_response(true, start_time)),
682        Err(e) => Ok(qdrant_error(&e.to_string(), start_time)),
683    }
684}
685
686async fn delete_collection(
687    storage: web::Data<Arc<StorageManager>>,
688    path: web::Path<String>,
689) -> ActixResult<HttpResponse> {
690    let start_time = Instant::now();
691    let name = path.into_inner();
692    
693    match storage.delete_collection(&name) {
694        Ok(true) => Ok(qdrant_response(true, start_time)),
695        Ok(false) => Ok(qdrant_not_found("Collection not found", start_time)),
696        Err(e) => Ok(qdrant_error(&e.to_string(), start_time)),
697    }
698}
699
700async fn upsert_points(
701    storage: web::Data<Arc<StorageManager>>,
702    path: web::Path<String>,
703    req: web::Json<UpsertPointsRequest>,
704) -> ActixResult<HttpResponse> {
705    let start_time = Instant::now();
706    let name = path.into_inner();
707    
708    let collection = match storage.get_collection(&name) {
709        Some(c) => c,
710        None => {
711            return Ok(qdrant_not_found("Collection not found", start_time));
712        }
713    };
714    
715    let points: Result<Vec<Point>, &str> = req.points.iter().map(|point_req| {
716        let id = match &point_req.id {
717            serde_json::Value::String(s) => PointId::String(s.clone()),
718            serde_json::Value::Number(n) => {
719                if let Some(u) = n.as_u64() {
720                    PointId::Integer(u)
721                } else {
722                    return Err("Invalid point ID");
723                }
724            }
725            _ => return Err("Invalid point ID"),
726        };
727
728        // Vector is required - clients must provide embeddings
729        let mut point = match &point_req.vector {
730            Some(parsed_vector) => {
731                if let Some(ref multivec_data) = parsed_vector.multivector {
732                    // Create MultiVector and Point with multivector
733                    match MultiVector::new(multivec_data.clone()) {
734                        Ok(mv) => Point::new_multi(id, mv, point_req.payload.clone()),
735                        Err(_) => {
736                            let vector = Vector::new(parsed_vector.primary.clone());
737                            Point::new(id, vector, point_req.payload.clone())
738                        }
739                    }
740                } else {
741                    // Simple dense vector
742                    let vector = Vector::new(parsed_vector.primary.clone());
743                    Point::new(id, vector, point_req.payload.clone())
744                }
745            }
746            None => {
747                // No vector provided - use empty vector (for payload-only points)
748                let vector = Vector::new(vec![]);
749                Point::new(id, vector, point_req.payload.clone())
750            }
751        };
752        
753        // Add sparse vectors if present
754        if let Some(ref parsed_vector) = point_req.vector {
755            for sparse in &parsed_vector.sparse_vectors {
756                let sparse_vec = vectx_core::SparseVector::new(
757                    sparse.indices.clone(),
758                    sparse.values.clone()
759                );
760                point.add_sparse_vector(sparse.name.clone(), sparse_vec);
761            }
762        }
763        
764        Ok(point)
765    }).collect();
766
767    match points {
768        Ok(points_vec) => {
769            if points_vec.len() > 1 {
770                const PREWARM_THRESHOLD: usize = 1000;
771                let should_prewarm = points_vec.len() >= PREWARM_THRESHOLD;
772                
773                let result = if should_prewarm {
774                    collection.batch_upsert_with_prewarm(points_vec, true)
775                } else {
776                    collection.batch_upsert(points_vec)
777                };
778                
779                if let Err(e) = result {
780                    return Ok(qdrant_error(&e.to_string(), start_time));
781                }
782            } else if let Some(point) = points_vec.first() {
783                if let Err(e) = collection.upsert(point.clone()) {
784                    return Ok(qdrant_error(&e.to_string(), start_time));
785                }
786            }
787        }
788        Err(e) => {
789            return Ok(qdrant_error(e, start_time));
790        }
791    }
792
793    let operation_id = collection.next_operation_id();
794    Ok(qdrant_response(serde_json::json!({
795        "operation_id": operation_id,
796        "status": "acknowledged"
797    }), start_time))
798}
799
800async fn search_points(
801    storage: web::Data<Arc<StorageManager>>,
802    path: web::Path<String>,
803    req: web::Json<SearchRequest>,
804) -> ActixResult<HttpResponse> {
805    let start_time = Instant::now();
806    let name = path.into_inner();
807    
808    let collection = match storage.get_collection(&name) {
809        Some(c) => c,
810        None => {
811            return Ok(qdrant_not_found("Collection not found", start_time));
812        }
813    };
814
815    let limit = req.limit.unwrap_or(10);
816    let with_payload = req.with_payload.unwrap_or(true);
817    let with_vector = req.with_vector.unwrap_or(false);
818    let score_threshold = req.score_threshold;
819    let offset = req.offset.unwrap_or(0);
820
821    if let Some(text) = &req.text {
822        let results = collection.search_text(text, limit + offset);
823        let search_results: Vec<serde_json::Value> = results
824            .into_iter()
825            .skip(offset)
826            .filter(|(_, score)| score_threshold.map(|t| *score >= t).unwrap_or(true))
827            .filter_map(|(doc_id, score)| {
828                collection.get(&doc_id).map(|point| {
829                    let mut result = serde_json::json!({
830                        "id": point_id_to_json(&point.id),
831                        "version": point.version,
832                        "score": score,
833                    });
834                    if with_payload {
835                        result["payload"] = point.payload.clone().unwrap_or(serde_json::Value::Null);
836                    }
837                    if with_vector {
838                        result["vector"] = serde_json::json!(point.vector.as_slice());
839                    }
840                    result
841                })
842            })
843            .collect();
844
845        return Ok(qdrant_response(search_results, start_time));
846    }
847
848    if let Some(vector_data) = &req.vector {
849        let query_vector = Vector::new(vector_data.clone());
850        
851        let filter: Option<Box<dyn Filter>> = req.filter.as_ref().and_then(|f| {
852            parse_filter(f).map(|cond| Box::new(PayloadFilter::new(cond)) as Box<dyn Filter>)
853        });
854
855        let results = if let Some(f) = filter.as_deref() {
856            collection.search(&query_vector, limit + offset, Some(f))
857        } else {
858            collection.search(&query_vector, limit + offset, None)
859        };
860
861        let search_results: Vec<serde_json::Value> = results
862            .into_iter()
863            .skip(offset)
864            .filter(|(_, score)| score_threshold.map(|t| *score >= t).unwrap_or(true))
865            .map(|(point, score)| {
866                let mut result = serde_json::json!({
867                    "id": point_id_to_json(&point.id),
868                    "version": point.version,
869                    "score": score,
870                });
871                if with_payload {
872                    result["payload"] = point.payload.clone().unwrap_or(serde_json::Value::Null);
873                }
874                if with_vector {
875                    result["vector"] = serde_json::json!(point.vector.as_slice());
876                }
877                result
878            })
879            .collect();
880
881        return Ok(qdrant_response(search_results, start_time));
882    }
883
884    Ok(qdrant_error("Either 'vector' or 'text' must be provided", start_time))
885}
886
887/// Convert PointId to JSON value
888fn point_id_to_json(id: &vectx_core::PointId) -> serde_json::Value {
889    match id {
890        vectx_core::PointId::String(s) => serde_json::Value::String(s.clone()),
891        vectx_core::PointId::Integer(i) => serde_json::Value::Number((*i).into()),
892        vectx_core::PointId::Uuid(u) => serde_json::Value::String(u.to_string()),
893    }
894}
895
896/// Prefetch query for hybrid search
897#[derive(Deserialize, Clone)]
898struct PrefetchQuery {
899    /// Query vector or sparse vector
900    query: serde_json::Value,
901    #[serde(default)]
902    using: Option<String>,
903    #[serde(default)]
904    limit: Option<usize>,
905    #[serde(default)]
906    filter: Option<serde_json::Value>,
907}
908
909/// Query request for Qdrant's universal query API
910/// Supports both single vectors and multivectors (ColBERT-style MaxSim)
911#[derive(Deserialize)]
912struct QueryRequest {
913    /// Query vector - can be single [f32], multi [[f32]], or fusion object {"fusion": "rrf"}
914    query: serde_json::Value,
915    #[serde(default)]
916    limit: Option<usize>,
917    #[serde(default)]
918    with_payload: Option<bool>,
919    #[serde(default)]
920    with_vector: Option<bool>,
921    #[serde(default)]
922    filter: Option<serde_json::Value>,
923    /// Prefetch queries for hybrid search
924    #[serde(default)]
925    prefetch: Option<Vec<PrefetchQuery>>,
926    /// Which named vector to use
927    #[serde(default)]
928    using: Option<String>,
929}
930
931/// Query points using Qdrant's universal query API
932/// Supports multivector queries with MaxSim scoring, prefetch, and fusion
933async fn query_points(
934    storage: web::Data<Arc<StorageManager>>,
935    path: web::Path<String>,
936    req: web::Json<QueryRequest>,
937) -> ActixResult<HttpResponse> {
938    let start_time = Instant::now();
939    let name = path.into_inner();
940    
941    let collection = match storage.get_collection(&name) {
942        Some(c) => c,
943        None => {
944            return Ok(qdrant_not_found("Collection not found", start_time));
945        }
946    };
947
948    let limit = req.limit.unwrap_or(10);
949    let with_payload = req.with_payload.unwrap_or(true);
950    let with_vector = req.with_vector.unwrap_or(false);
951    
952    // Check if this is a fusion query with prefetch
953    let is_fusion = req.query.as_object()
954        .and_then(|o| o.get("fusion"))
955        .is_some();
956    
957    let results = if is_fusion && req.prefetch.is_some() {
958        // Handle hybrid search with prefetch and fusion
959        match execute_fusion_query(&collection, &req, limit) {
960            Ok(r) => r,
961            Err(e) => return Ok(qdrant_error(&e, start_time)),
962        }
963    } else {
964        // Parse filter if provided
965        let filter: Option<Box<dyn Filter>> = req.filter.as_ref().and_then(|f| {
966            parse_filter(f).map(|cond| Box::new(PayloadFilter::new(cond)) as Box<dyn Filter>)
967        });
968        
969        // Get the "using" parameter for named/sparse vector queries
970        let using = req.using.as_deref();
971        
972        // Determine query type: point ID, single vector, sparse, or multivector
973        match execute_simple_query(&collection, &req.query, limit, filter.as_deref(), using) {
974            Ok(r) => r,
975            Err(e) => return Ok(qdrant_error(&e, start_time)),
976        }
977    };
978    
979    // Format results
980    let search_results: Vec<serde_json::Value> = results
981        .into_iter()
982        .map(|(point, score)| {
983            let mut result = serde_json::json!({
984                "id": point_id_to_json(&point.id),
985                "version": point.version,
986                "score": score,
987            });
988            
989            if with_payload {
990                result["payload"] = point.payload.clone().unwrap_or(serde_json::Value::Null);
991            }
992            
993            if with_vector {
994                result["vector"] = serde_json::json!(point.vector.as_slice());
995                if let Some(mv) = &point.multivector {
996                    result["multivector"] = serde_json::json!(mv.vectors());
997                }
998            }
999            
1000            result
1001        })
1002        .collect();
1003
1004    Ok(qdrant_response(serde_json::json!({
1005        "points": search_results
1006    }), start_time))
1007}
1008
1009/// Execute a fusion query with prefetch (RRF - Reciprocal Rank Fusion)
1010fn execute_fusion_query(
1011    collection: &Arc<Collection>,
1012    req: &QueryRequest,
1013    limit: usize,
1014) -> Result<Vec<(Point, f32)>, String> {
1015    use std::collections::HashMap;
1016    
1017    let prefetch = req.prefetch.as_ref().ok_or("Fusion requires prefetch")?;
1018    
1019    // Execute each prefetch query and collect ranked results
1020    let mut all_results: Vec<Vec<(Point, f32)>> = Vec::new();
1021    
1022    for pf in prefetch {
1023        let pf_limit = pf.limit.unwrap_or(20);
1024        let filter: Option<Box<dyn Filter>> = pf.filter.as_ref().and_then(|f| {
1025            parse_filter(f).map(|cond| Box::new(PayloadFilter::new(cond)) as Box<dyn Filter>)
1026        });
1027        
1028        // Parse the prefetch query, using the "using" parameter for named/sparse vectors
1029        let using = pf.using.as_deref();
1030        let pf_results = parse_and_search(collection, &pf.query, pf_limit, filter.as_deref(), using)?;
1031        all_results.push(pf_results);
1032    }
1033    
1034    // Apply RRF (Reciprocal Rank Fusion)
1035    // RRF score = sum(1 / (k + rank_i)) for each result set
1036    // Qdrant uses k=1 for consistent scoring
1037    const K: f32 = 1.0;
1038    
1039    let mut rrf_scores: HashMap<String, (Point, f32)> = HashMap::new();
1040    
1041    for result_set in &all_results {
1042        for (rank, (point, _original_score)) in result_set.iter().enumerate() {
1043            let rrf_contribution = 1.0 / (K + rank as f32 + 1.0);
1044            let point_id = point.id.to_string();
1045            
1046            rrf_scores
1047                .entry(point_id)
1048                .and_modify(|(_, score)| *score += rrf_contribution)
1049                .or_insert_with(|| (point.clone(), rrf_contribution));
1050        }
1051    }
1052    
1053    // Sort by RRF score descending
1054    let mut fused: Vec<(Point, f32)> = rrf_scores.into_values().collect();
1055    fused.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1056    fused.truncate(limit);
1057    
1058    Ok(fused)
1059}
1060
1061/// Parse a query value and execute search
1062fn parse_and_search(
1063    collection: &Arc<Collection>,
1064    query: &serde_json::Value,
1065    limit: usize,
1066    filter: Option<&dyn Filter>,
1067    using: Option<&str>,
1068) -> Result<Vec<(Point, f32)>, String> {
1069    match query {
1070        // Sparse vector format: {"indices": [...], "values": [...]}
1071        serde_json::Value::Object(obj) if obj.contains_key("indices") && obj.contains_key("values") => {
1072            let indices = obj.get("indices")
1073                .and_then(|i| i.as_array())
1074                .ok_or("Invalid sparse vector: missing indices")?;
1075            let values = obj.get("values")
1076                .and_then(|v| v.as_array())
1077                .ok_or("Invalid sparse vector: missing values")?;
1078            
1079            let indices_vec: Vec<u32> = indices.iter()
1080                .filter_map(|i| i.as_u64().map(|n| n as u32))
1081                .collect();
1082            let values_vec: Vec<f32> = values.iter()
1083                .filter_map(|v| v.as_f64().map(|f| f as f32))
1084                .collect();
1085            
1086            if indices_vec.is_empty() || values_vec.is_empty() {
1087                return Ok(Vec::new());
1088            }
1089            
1090            // Use the "using" parameter as the sparse vector name, default to "default"
1091            let vector_name = using.unwrap_or("default");
1092            let query_sparse = vectx_core::SparseVector::new(indices_vec, values_vec);
1093            
1094            // Perform sparse dot product search
1095            Ok(collection.search_sparse(&query_sparse, vector_name, limit, filter))
1096        }
1097        // Array: single vector or multivector
1098        serde_json::Value::Array(arr) if !arr.is_empty() => {
1099            match arr.first() {
1100                // Multivector: [[0.1, 0.2], [0.3, 0.4]]
1101                Some(serde_json::Value::Array(_)) => {
1102                    let multivec_data: Result<Vec<Vec<f32>>, _> = arr.iter()
1103                        .map(|sub| {
1104                            if let serde_json::Value::Array(sub_arr) = sub {
1105                                sub_arr.iter()
1106                                    .map(|v| v.as_f64().map(|f| f as f32).ok_or("expected f32"))
1107                                    .collect::<Result<Vec<f32>, _>>()
1108                            } else {
1109                                Err("expected array")
1110                            }
1111                        })
1112                        .collect();
1113                    
1114                    let data = multivec_data.map_err(|e| format!("Invalid multivector: {}", e))?;
1115                    let query_mv = MultiVector::new(data).map_err(|e| format!("Invalid multivector: {}", e))?;
1116                    Ok(collection.search_multivector(&query_mv, limit, filter))
1117                }
1118                // Single vector: [0.1, 0.2, 0.3]
1119                Some(serde_json::Value::Number(_)) => {
1120                    let vector_data: Result<Vec<f32>, _> = arr.iter()
1121                        .map(|v| v.as_f64().map(|f| f as f32).ok_or("expected f32"))
1122                        .collect();
1123                    
1124                    let data = vector_data.map_err(|e| format!("Invalid vector: {}", e))?;
1125                    let query_vector = Vector::new(data);
1126                    Ok(collection.search(&query_vector, limit, filter))
1127                }
1128                _ => Err("Invalid query format".to_string())
1129            }
1130        }
1131        _ => Err("Invalid query format".to_string())
1132    }
1133}
1134
1135/// Execute a simple (non-fusion) query
1136fn execute_simple_query(
1137    collection: &Arc<Collection>,
1138    query: &serde_json::Value,
1139    limit: usize,
1140    filter: Option<&dyn Filter>,
1141    using: Option<&str>,
1142) -> Result<Vec<(Point, f32)>, String> {
1143    match query {
1144        // Query by point ID (nearest to existing point)
1145        serde_json::Value::Number(n) => {
1146            let point_id_str = if let Some(id) = n.as_u64() {
1147                id.to_string()
1148            } else if let Some(id) = n.as_i64() {
1149                id.to_string()
1150            } else {
1151                return Err("Invalid point ID format".to_string());
1152            };
1153            
1154            // Get the point by ID and use its vector for search
1155            if let Some(source_point) = collection.get(&point_id_str) {
1156                let query_vector = source_point.vector.clone();
1157                let mut search_results = collection.search(&query_vector, limit + 1, filter);
1158                // Remove the source point from results
1159                search_results.retain(|(p, _)| p.id.to_string() != point_id_str);
1160                search_results.truncate(limit);
1161                Ok(search_results)
1162            } else {
1163                Err(format!("Point with ID '{}' not found", point_id_str))
1164            }
1165        }
1166        // Query by string point ID
1167        serde_json::Value::String(s) => {
1168            if let Some(source_point) = collection.get(s) {
1169                let query_vector = source_point.vector.clone();
1170                let mut search_results = collection.search(&query_vector, limit + 1, filter);
1171                // Remove the source point from results
1172                search_results.retain(|(p, _)| p.id.to_string() != *s);
1173                search_results.truncate(limit);
1174                Ok(search_results)
1175            } else {
1176                Err(format!("Point with ID '{}' not found", s))
1177            }
1178        }
1179        // Arrays and sparse vectors
1180        _ => parse_and_search(collection, query, limit, filter, using)
1181    }
1182}
1183
1184/// Parse Qdrant-style filter format (must/should/must_not)
1185fn parse_filter(filter_json: &serde_json::Value) -> Option<FilterCondition> {
1186    if let Some(obj) = filter_json.as_object() {
1187        let mut all_conditions: Vec<FilterCondition> = Vec::new();
1188        
1189        // Qdrant-style filter with must (AND conditions)
1190        if let Some(must) = obj.get("must") {
1191            if let Some(arr) = must.as_array() {
1192                let must_conditions: Vec<FilterCondition> = arr.iter()
1193                    .filter_map(parse_field_condition)
1194                    .collect();
1195                if !must_conditions.is_empty() {
1196                    // Multiple must conditions are AND'd together
1197                    if must_conditions.len() == 1 {
1198                        all_conditions.push(must_conditions.into_iter().next().unwrap());
1199                    } else {
1200                        all_conditions.push(FilterCondition::And(must_conditions));
1201                    }
1202                }
1203            }
1204        }
1205        
1206        // Qdrant-style filter with should (OR conditions)
1207        if let Some(should) = obj.get("should") {
1208            if let Some(arr) = should.as_array() {
1209                let should_conditions: Vec<FilterCondition> = arr.iter()
1210                    .filter_map(parse_field_condition)
1211                    .collect();
1212                if !should_conditions.is_empty() {
1213                    // Multiple should conditions are OR'd together
1214                    if should_conditions.len() == 1 {
1215                        all_conditions.push(should_conditions.into_iter().next().unwrap());
1216                    } else {
1217                        all_conditions.push(FilterCondition::Or(should_conditions));
1218                    }
1219                }
1220            }
1221        }
1222        
1223        // Qdrant-style filter with must_not (negated conditions)
1224        if let Some(must_not) = obj.get("must_not") {
1225            if let Some(arr) = must_not.as_array() {
1226                for cond in arr {
1227                    if let Some(fc) = parse_field_condition(cond) {
1228                        all_conditions.push(FilterCondition::Not(Box::new(fc)));
1229                    }
1230                }
1231            }
1232        }
1233        
1234        // If we collected any conditions, combine them
1235        if !all_conditions.is_empty() {
1236            return if all_conditions.len() == 1 {
1237                Some(all_conditions.into_iter().next().unwrap())
1238            } else {
1239                Some(FilterCondition::And(all_conditions))
1240            };
1241        }
1242        
1243        // Legacy simple format: { field, value, operator }
1244        if let Some(field) = obj.get("field").and_then(|v| v.as_str()) {
1245            if let Some(value) = obj.get("value") {
1246                if let Some(op) = obj.get("operator").and_then(|v| v.as_str()) {
1247                    match op {
1248                        "eq" => return Some(FilterCondition::Equals { field: field.to_string(), value: value.clone() }),
1249                        "ne" => return Some(FilterCondition::NotEquals { field: field.to_string(), value: value.clone() }),
1250                        "gt" => return value.as_f64().map(|v| FilterCondition::GreaterThan { field: field.to_string(), value: v }),
1251                        "lt" => return value.as_f64().map(|v| FilterCondition::LessThan { field: field.to_string(), value: v }),
1252                        "gte" => return value.as_f64().map(|v| FilterCondition::GreaterEqual { field: field.to_string(), value: v }),
1253                        "lte" => return value.as_f64().map(|v| FilterCondition::LessEqual { field: field.to_string(), value: v }),
1254                        _ => {}
1255                    }
1256                }
1257            }
1258        }
1259        
1260        // Direct field condition (Qdrant format): { "key": "field", "match": { "value": x } }
1261        if let Some(fc) = parse_field_condition(filter_json) {
1262            return Some(fc);
1263        }
1264    }
1265    None
1266}
1267
1268/// Parse a single Qdrant field condition: { "key": "field", "match": { "value": x } }
1269fn parse_field_condition(cond: &serde_json::Value) -> Option<FilterCondition> {
1270    let obj = cond.as_object()?;
1271    let key = obj.get("key")?.as_str()?;
1272    
1273    // Match condition: { "match": { "value": x } }
1274    if let Some(match_obj) = obj.get("match").and_then(|m| m.as_object()) {
1275        if let Some(value) = match_obj.get("value") {
1276            return Some(FilterCondition::Equals { 
1277                field: key.to_string(), 
1278                value: value.clone() 
1279            });
1280        }
1281        // Match any: { "match": { "any": [x, y, z] } }
1282        if let Some(any_arr) = match_obj.get("any").and_then(|a| a.as_array()) {
1283            if let Some(first) = any_arr.first() {
1284                return Some(FilterCondition::Equals { 
1285                    field: key.to_string(), 
1286                    value: first.clone() 
1287                });
1288            }
1289        }
1290        // Match text: { "match": { "text": "value" } }
1291        if let Some(text) = match_obj.get("text") {
1292            return Some(FilterCondition::Equals { 
1293                field: key.to_string(), 
1294                value: text.clone() 
1295            });
1296        }
1297    }
1298    
1299    // Range condition: { "range": { "gt": x, "lt": y } }
1300    if let Some(range_obj) = obj.get("range").and_then(|r| r.as_object()) {
1301        if let Some(gt) = range_obj.get("gt").and_then(|v| v.as_f64()) {
1302            return Some(FilterCondition::GreaterThan { field: key.to_string(), value: gt });
1303        }
1304        if let Some(gte) = range_obj.get("gte").and_then(|v| v.as_f64()) {
1305            return Some(FilterCondition::GreaterEqual { field: key.to_string(), value: gte });
1306        }
1307        if let Some(lt) = range_obj.get("lt").and_then(|v| v.as_f64()) {
1308            return Some(FilterCondition::LessThan { field: key.to_string(), value: lt });
1309        }
1310        if let Some(lte) = range_obj.get("lte").and_then(|v| v.as_f64()) {
1311            return Some(FilterCondition::LessEqual { field: key.to_string(), value: lte });
1312        }
1313    }
1314    
1315    None
1316}
1317
1318/// Check if a point matches a Qdrant-style filter
1319fn matches_filter(point: &Point, filter: &serde_json::Value) -> bool {
1320    let obj = match filter.as_object() {
1321        Some(o) => o,
1322        None => return true, // No valid filter, match all
1323    };
1324    
1325    // Handle "must" conditions (AND logic)
1326    if let Some(must) = obj.get("must").and_then(|m| m.as_array()) {
1327        for cond in must {
1328            if !matches_condition(point, cond) {
1329                return false; // All must conditions must match
1330            }
1331        }
1332    }
1333    
1334    // Handle "should" conditions (OR logic)
1335    if let Some(should) = obj.get("should").and_then(|s| s.as_array()) {
1336        if !should.is_empty() {
1337            let any_match = should.iter().any(|cond| matches_condition(point, cond));
1338            if !any_match {
1339                return false; // At least one should condition must match
1340            }
1341        }
1342    }
1343    
1344    // Handle "must_not" conditions (NOT logic)
1345    if let Some(must_not) = obj.get("must_not").and_then(|m| m.as_array()) {
1346        for cond in must_not {
1347            if matches_condition(point, cond) {
1348                return false; // No must_not condition should match
1349            }
1350        }
1351    }
1352    
1353    true
1354}
1355
1356/// Check if a point matches a single condition
1357fn matches_condition(point: &Point, cond: &serde_json::Value) -> bool {
1358    let obj = match cond.as_object() {
1359        Some(o) => o,
1360        None => return false,
1361    };
1362    
1363    // Handle "has_id" condition - filter by point IDs
1364    if let Some(ids) = obj.get("has_id").and_then(|h| h.as_array()) {
1365        let point_id_str = point.id.to_string();
1366        return ids.iter().any(|id| {
1367            match id {
1368                serde_json::Value::Number(n) => n.to_string() == point_id_str,
1369                serde_json::Value::String(s) => s == &point_id_str,
1370                _ => false,
1371            }
1372        });
1373    }
1374    
1375    // Handle "nested" filter - match conditions within same array element
1376    if let Some(nested_obj) = obj.get("nested").and_then(|n| n.as_object()) {
1377        let nested_key = match nested_obj.get("key").and_then(|k| k.as_str()) {
1378            Some(k) => k,
1379            None => return false,
1380        };
1381        let nested_filter = match nested_obj.get("filter") {
1382            Some(f) => f,
1383            None => return false,
1384        };
1385        
1386        // Get the array field from payload
1387        let array_value = match &point.payload {
1388            Some(payload) => payload.get(nested_key).and_then(|v| v.as_array()),
1389            None => None,
1390        };
1391        
1392        // Check if any array element matches ALL conditions in the nested filter
1393        if let Some(arr) = array_value {
1394            return arr.iter().any(|element| {
1395                matches_nested_element(element, nested_filter)
1396            });
1397        }
1398        return false;
1399    }
1400    
1401    // Get the field key
1402    let key = match obj.get("key").and_then(|k| k.as_str()) {
1403        Some(k) => k,
1404        None => {
1405            // Handle nested filter (recursive)
1406            if obj.contains_key("must") || obj.contains_key("should") || obj.contains_key("must_not") {
1407                return matches_filter(point, cond);
1408            }
1409            return false;
1410        }
1411    };
1412    
1413    // Get the payload value for this key (supports nested paths like "diet[].food")
1414    let payload_value = get_nested_value(&point.payload, key);
1415    
1416    // Handle "match" condition
1417    if let Some(match_obj) = obj.get("match").and_then(|m| m.as_object()) {
1418        // Match exact value
1419        if let Some(expected) = match_obj.get("value") {
1420            return match &payload_value {
1421                Some(actual) => values_equal(actual, expected),
1422                None => false,
1423            };
1424        }
1425        
1426        // Match any of values
1427        if let Some(any_arr) = match_obj.get("any").and_then(|a| a.as_array()) {
1428            return match &payload_value {
1429                Some(actual) => any_arr.iter().any(|expected| values_equal(actual, expected)),
1430                None => false,
1431            };
1432        }
1433        
1434        // Match text (words OR - any word in query matches)
1435        if let Some(text) = match_obj.get("text").and_then(|t| t.as_str()) {
1436            return match &payload_value {
1437                Some(serde_json::Value::String(s)) => {
1438                    let s_lower = s.to_lowercase();
1439                    // Split query into words and check if any word matches
1440                    text.split_whitespace()
1441                        .any(|word| s_lower.contains(&word.to_lowercase()))
1442                }
1443                _ => false,
1444            };
1445        }
1446    }
1447    
1448    // Handle "range" condition
1449    if let Some(range_obj) = obj.get("range").and_then(|r| r.as_object()) {
1450        let actual_num = match &payload_value {
1451            Some(serde_json::Value::Number(n)) => n.as_f64(),
1452            _ => None,
1453        };
1454        
1455        if let Some(actual) = actual_num {
1456            if let Some(gt) = range_obj.get("gt").and_then(|v| v.as_f64()) {
1457                if actual <= gt { return false; }
1458            }
1459            if let Some(gte) = range_obj.get("gte").and_then(|v| v.as_f64()) {
1460                if actual < gte { return false; }
1461            }
1462            if let Some(lt) = range_obj.get("lt").and_then(|v| v.as_f64()) {
1463                if actual >= lt { return false; }
1464            }
1465            if let Some(lte) = range_obj.get("lte").and_then(|v| v.as_f64()) {
1466                if actual > lte { return false; }
1467            }
1468            return true;
1469        }
1470        return false;
1471    }
1472    
1473    false
1474}
1475
1476/// Get a value from a nested path like "diet[].food" or "nested.field"
1477fn get_nested_value(payload: &Option<serde_json::Value>, path: &str) -> Option<serde_json::Value> {
1478    let payload = payload.as_ref()?;
1479    
1480    // Handle array notation like "diet[].food"
1481    if path.contains("[]") {
1482        let parts: Vec<&str> = path.split("[]").collect();
1483        if parts.len() >= 2 {
1484            let array_key = parts[0];
1485            let field_path = parts[1].trim_start_matches('.');
1486            
1487            if let Some(arr) = payload.get(array_key).and_then(|v| v.as_array()) {
1488                // Collect all matching values from array elements
1489                let values: Vec<serde_json::Value> = arr.iter()
1490                    .filter_map(|element| {
1491                        if field_path.is_empty() {
1492                            Some(element.clone())
1493                        } else {
1494                            element.get(field_path).cloned()
1495                        }
1496                    })
1497                    .collect();
1498                
1499                if !values.is_empty() {
1500                    return Some(serde_json::Value::Array(values));
1501                }
1502            }
1503        }
1504        return None;
1505    }
1506    
1507    // Handle dot notation like "nested.field"
1508    if path.contains('.') {
1509        let mut current = payload;
1510        for part in path.split('.') {
1511            current = current.get(part)?;
1512        }
1513        return Some(current.clone());
1514    }
1515    
1516    // Simple key lookup
1517    payload.get(path).cloned()
1518}
1519
1520/// Check if an array element matches a nested filter
1521fn matches_nested_element(element: &serde_json::Value, filter: &serde_json::Value) -> bool {
1522    let filter_obj = match filter.as_object() {
1523        Some(o) => o,
1524        None => return false,
1525    };
1526    
1527    // Handle "must" conditions within the element
1528    if let Some(must) = filter_obj.get("must").and_then(|m| m.as_array()) {
1529        for cond in must {
1530            if !matches_element_condition(element, cond) {
1531                return false;
1532            }
1533        }
1534    }
1535    
1536    // Handle "should" conditions
1537    if let Some(should) = filter_obj.get("should").and_then(|s| s.as_array()) {
1538        if !should.is_empty() {
1539            let any_match = should.iter().any(|cond| matches_element_condition(element, cond));
1540            if !any_match {
1541                return false;
1542            }
1543        }
1544    }
1545    
1546    // Handle "must_not" conditions
1547    if let Some(must_not) = filter_obj.get("must_not").and_then(|m| m.as_array()) {
1548        for cond in must_not {
1549            if matches_element_condition(element, cond) {
1550                return false;
1551            }
1552        }
1553    }
1554    
1555    true
1556}
1557
1558/// Check if an array element matches a single condition
1559fn matches_element_condition(element: &serde_json::Value, cond: &serde_json::Value) -> bool {
1560    let obj = match cond.as_object() {
1561        Some(o) => o,
1562        None => return false,
1563    };
1564    
1565    let key = match obj.get("key").and_then(|k| k.as_str()) {
1566        Some(k) => k,
1567        None => return false,
1568    };
1569    
1570    let element_value = element.get(key);
1571    
1572    if let Some(match_obj) = obj.get("match").and_then(|m| m.as_object()) {
1573        if let Some(expected) = match_obj.get("value") {
1574            return match element_value {
1575                Some(actual) => values_equal(actual, expected),
1576                None => false,
1577            };
1578        }
1579    }
1580    
1581    false
1582}
1583
1584/// Compare two JSON values for equality
1585fn values_equal(a: &serde_json::Value, b: &serde_json::Value) -> bool {
1586    match (a, b) {
1587        (serde_json::Value::String(s1), serde_json::Value::String(s2)) => s1 == s2,
1588        (serde_json::Value::Number(n1), serde_json::Value::Number(n2)) => {
1589            n1.as_f64() == n2.as_f64()
1590        }
1591        (serde_json::Value::Bool(b1), serde_json::Value::Bool(b2)) => b1 == b2,
1592        (serde_json::Value::Array(arr), val) | (val, serde_json::Value::Array(arr)) => {
1593            // Check if val is in array
1594            arr.iter().any(|item| values_equal(item, val))
1595        }
1596        _ => a == b,
1597    }
1598}
1599
1600#[derive(Deserialize)]
1601struct ScrollRequest {
1602    limit: Option<usize>,
1603    offset: Option<serde_json::Value>,
1604    with_payload: Option<bool>,
1605    with_vector: Option<bool>,
1606    #[serde(default)]
1607    filter: Option<serde_json::Value>,
1608}
1609
1610async fn scroll_points(
1611    storage: web::Data<Arc<StorageManager>>,
1612    path: web::Path<String>,
1613    req: web::Json<ScrollRequest>,
1614) -> ActixResult<HttpResponse> {
1615    let start_time = Instant::now();
1616    let collection_name = path.into_inner();
1617    
1618    let collection = match storage.get_collection(&collection_name) {
1619        Some(c) => c,
1620        None => {
1621            return Ok(qdrant_not_found("Collection not found", start_time));
1622        }
1623    };
1624    
1625    let limit = req.limit.unwrap_or(10);
1626    let with_payload = req.with_payload.unwrap_or(true);
1627    let with_vector = req.with_vector.unwrap_or(false);
1628    
1629    // Get offset as integer if provided
1630    let offset_id: Option<i64> = req.offset.as_ref().and_then(|v| {
1631        match v {
1632            serde_json::Value::Number(n) => n.as_i64(),
1633            serde_json::Value::String(s) => s.parse().ok(),
1634            _ => None,
1635        }
1636    });
1637    
1638    // Get all points and sort by ID for consistent pagination
1639    let all_points = collection.get_all_points();
1640    
1641    // Apply filter if provided
1642    let filtered_points: Vec<_> = if let Some(filter_json) = &req.filter {
1643        all_points.iter()
1644            .filter(|p| matches_filter(p, filter_json))
1645            .collect()
1646    } else {
1647        all_points.iter().collect()
1648    };
1649    
1650    let mut points_with_ids: Vec<_> = filtered_points.iter()
1651        .map(|p| {
1652            let id_num: i64 = match &p.id {
1653                vectx_core::PointId::Integer(i) => *i as i64,
1654                vectx_core::PointId::String(s) => s.parse::<i64>().unwrap_or(0),
1655                vectx_core::PointId::Uuid(_) => 0,
1656            };
1657            (id_num, *p)
1658        })
1659        .collect();
1660    
1661    points_with_ids.sort_by_key(|(id, _)| *id);
1662    
1663    // Apply offset
1664    let start_idx = if let Some(offset) = offset_id {
1665        points_with_ids.iter().position(|(id, _)| *id > offset).unwrap_or(points_with_ids.len())
1666    } else {
1667        0
1668    };
1669    
1670    // Get page of results
1671    let page: Vec<_> = points_with_ids.iter()
1672        .skip(start_idx)
1673        .take(limit)
1674        .collect();
1675    
1676    // Determine next offset
1677    let next_offset = if page.len() == limit && start_idx + limit < points_with_ids.len() {
1678        page.last().map(|(id, _)| serde_json::json!(*id))
1679    } else {
1680        None
1681    };
1682    
1683    // Format results
1684    let results: Vec<serde_json::Value> = page.iter().map(|(_, point)| {
1685        let mut obj = serde_json::json!({
1686            "id": point_id_to_json(&point.id),
1687            "version": point.version,
1688        });
1689        
1690        if with_payload {
1691            obj["payload"] = point.payload.clone().unwrap_or(serde_json::json!({}));
1692        }
1693        if with_vector {
1694            obj["vector"] = serde_json::json!(point.vector.as_slice());
1695        }
1696        
1697        obj
1698    }).collect();
1699    
1700    Ok(qdrant_response(serde_json::json!({
1701        "points": results,
1702        "next_page_offset": next_offset
1703    }), start_time))
1704}
1705
1706async fn get_point(
1707    storage: web::Data<Arc<StorageManager>>,
1708    path: web::Path<(String, String)>,
1709) -> ActixResult<HttpResponse> {
1710    let start_time = Instant::now();
1711    let (collection_name, point_id) = path.into_inner();
1712    
1713    let collection = match storage.get_collection(&collection_name) {
1714        Some(c) => c,
1715        None => {
1716            return Ok(qdrant_not_found("Collection not found", start_time));
1717        }
1718    };
1719
1720    match collection.get(&point_id) {
1721        Some(point) => {
1722            // Build response with optional multivector
1723            let mut result = serde_json::json!({
1724                "id": point_id_to_json(&point.id),
1725                "version": point.version,
1726                "vector": point.vector.as_slice(),
1727                "payload": point.payload.clone().unwrap_or(serde_json::Value::Null),
1728            });
1729            
1730            // Add multivector if present
1731            if let Some(mv) = &point.multivector {
1732                result["multivector"] = serde_json::json!(mv.vectors());
1733            }
1734            
1735            Ok(qdrant_response(result, start_time))
1736        }
1737        None => Ok(qdrant_not_found("Point not found", start_time)),
1738    }
1739}
1740
1741async fn delete_point(
1742    storage: web::Data<Arc<StorageManager>>,
1743    path: web::Path<(String, String)>,
1744) -> ActixResult<HttpResponse> {
1745    let start_time = Instant::now();
1746    let (collection_name, point_id) = path.into_inner();
1747    
1748    let collection = match storage.get_collection(&collection_name) {
1749        Some(c) => c,
1750        None => {
1751            return Ok(qdrant_not_found("Collection not found", start_time));
1752        }
1753    };
1754
1755    match collection.delete(&point_id) {
1756        Ok(true) => {
1757            let operation_id = collection.next_operation_id();
1758            Ok(qdrant_response(serde_json::json!({
1759                "operation_id": operation_id,
1760                "status": "acknowledged"
1761            }), start_time))
1762        }
1763        Ok(false) => Ok(qdrant_not_found("Point not found", start_time)),
1764        Err(e) => Ok(qdrant_error(&e.to_string(), start_time)),
1765    }
1766}
1767
1768#[derive(Deserialize)]
1769struct DeletePointsRequest {
1770    filter: Option<DeleteFilter>,
1771    points: Option<Vec<serde_json::Value>>,
1772}
1773
1774#[derive(Deserialize)]
1775struct DeleteFilter {
1776    must: Option<Vec<FilterMust>>,
1777}
1778
1779#[derive(Deserialize)]
1780struct FilterMust {
1781    key: String,
1782    #[serde(rename = "match")]
1783    match_value: MatchValue,
1784}
1785
1786#[derive(Deserialize)]
1787struct MatchValue {
1788    value: serde_json::Value,
1789}
1790
1791async fn delete_points_by_filter(
1792    storage: web::Data<Arc<StorageManager>>,
1793    path: web::Path<String>,
1794    req: web::Json<DeletePointsRequest>,
1795) -> ActixResult<HttpResponse> {
1796    let collection_name = path.into_inner();
1797    
1798    let start_time = Instant::now();
1799    let collection = match storage.get_collection(&collection_name) {
1800        Some(c) => c,
1801        None => {
1802            return Ok(qdrant_not_found("Collection not found", start_time));
1803        }
1804    };
1805    
1806    let mut _deleted_count = 0;
1807    
1808    // Handle filter-based deletion
1809    if let Some(filter) = &req.filter {
1810        if let Some(must_conditions) = &filter.must {
1811            // Get the field and value to match
1812            if let Some(condition) = must_conditions.first() {
1813                let field_key = &condition.key;
1814                let match_value = &condition.match_value.value;
1815                
1816                // Get all points and filter by payload
1817                let all_points = collection.get_all_points();
1818                let mut points_to_delete = Vec::new();
1819                
1820                for point in all_points {
1821                    if let Some(payload) = &point.payload {
1822                        if let Some(field_value) = payload.get(field_key) {
1823                            if field_value == match_value {
1824                                points_to_delete.push(point.id.clone());
1825                            }
1826                        }
1827                    }
1828                }
1829                
1830                // Delete matching points
1831                for point_id in points_to_delete {
1832                    let id_str = match &point_id {
1833                        vectx_core::PointId::String(s) => s.clone(),
1834                        vectx_core::PointId::Integer(i) => i.to_string(),
1835                        vectx_core::PointId::Uuid(u) => u.to_string(),
1836                    };
1837                    if collection.delete(&id_str).is_ok() {
1838                        _deleted_count += 1;
1839                    }
1840                }
1841            }
1842        }
1843    }
1844    
1845    // Handle point ID-based deletion
1846    if let Some(point_ids) = &req.points {
1847        for point_id in point_ids {
1848            let id_str = match point_id {
1849                serde_json::Value::String(s) => s.clone(),
1850                serde_json::Value::Number(n) => n.to_string(),
1851                _ => continue,
1852            };
1853            if collection.delete(&id_str).is_ok() {
1854                _deleted_count += 1;
1855            }
1856        }
1857    }
1858    
1859    let operation_id = collection.next_operation_id();
1860    Ok(qdrant_response(serde_json::json!({
1861        "operation_id": operation_id,
1862        "status": "acknowledged"
1863    }), start_time))
1864}
1865
1866async fn collection_exists(
1867    storage: web::Data<Arc<StorageManager>>,
1868    path: web::Path<String>,
1869) -> ActixResult<HttpResponse> {
1870    let start_time = Instant::now();
1871    let name = path.into_inner();
1872    let exists = storage.collection_exists(&name);
1873    
1874    Ok(qdrant_response(serde_json::json!({
1875        "exists": exists
1876    }), start_time))
1877}
1878
1879// Qdrant compatibility endpoints
1880
1881async fn list_aliases(
1882    storage: web::Data<Arc<StorageManager>>,
1883) -> ActixResult<HttpResponse> {
1884    let start_time = Instant::now();
1885    let aliases: Vec<serde_json::Value> = storage.list_aliases()
1886        .into_iter()
1887        .map(|(alias, collection)| serde_json::json!({
1888            "alias_name": alias,
1889            "collection_name": collection
1890        }))
1891        .collect();
1892    Ok(qdrant_response(serde_json::json!({
1893        "aliases": aliases
1894    }), start_time))
1895}
1896
1897async fn cluster_info() -> ActixResult<HttpResponse> {
1898    let start_time = Instant::now();
1899    // vectX runs as single node - return disabled status like Qdrant single-node
1900    Ok(qdrant_response(serde_json::json!({
1901        "status": "disabled"
1902    }), start_time))
1903}
1904
1905async fn telemetry_info() -> ActixResult<HttpResponse> {
1906    let start_time = Instant::now();
1907    Ok(qdrant_response(serde_json::json!({
1908        "id": "vectx-single-node",
1909        "app": {
1910            "name": "vectx",
1911            "version": "0.2.1"
1912        }
1913    }), start_time))
1914}
1915
1916// Snapshot endpoints
1917
1918async fn list_snapshots(
1919    storage: web::Data<Arc<StorageManager>>,
1920    path: web::Path<String>,
1921) -> ActixResult<HttpResponse> {
1922    let start_time = Instant::now();
1923    let collection_name = path.into_inner();
1924    
1925    match storage.list_collection_snapshots(&collection_name) {
1926        Ok(snapshots) => Ok(qdrant_response(snapshots, start_time)),
1927        Err(e) => Ok(qdrant_error(&e.to_string(), start_time)),
1928    }
1929}
1930
1931async fn list_all_snapshots(
1932    storage: web::Data<Arc<StorageManager>>,
1933) -> ActixResult<HttpResponse> {
1934    let start_time = Instant::now();
1935    match storage.list_all_snapshots() {
1936        Ok(snapshots) => Ok(qdrant_response(snapshots, start_time)),
1937        Err(e) => Ok(qdrant_error(&e.to_string(), start_time)),
1938    }
1939}
1940
1941/// Create full storage snapshot
1942async fn create_full_snapshot(
1943    storage: web::Data<Arc<StorageManager>>,
1944) -> ActixResult<HttpResponse> {
1945    let start_time = Instant::now();
1946    
1947    // Create snapshots for all collections
1948    let collections = storage.list_collections();
1949    let mut created_snapshots = Vec::new();
1950    
1951    for collection_name in collections {
1952        match storage.create_collection_snapshot(&collection_name) {
1953            Ok(snapshot) => created_snapshots.push(snapshot),
1954            Err(e) => {
1955                return Ok(qdrant_error(&format!("Failed to snapshot {}: {}", collection_name, e), start_time));
1956            }
1957        }
1958    }
1959    
1960    // Return metadata about full snapshot
1961    let snapshot_name = format!("full-snapshot-{}.snapshot", Utc::now().format("%Y-%m-%d-%H-%M-%S"));
1962    
1963    Ok(qdrant_response(serde_json::json!({
1964        "name": snapshot_name,
1965        "creation_time": Utc::now().to_rfc3339(),
1966        "size": 0,
1967        "collections": created_snapshots.len()
1968    }), start_time))
1969}
1970
1971/// Get (download) full snapshot
1972async fn get_full_snapshot(
1973    _path: web::Path<String>,
1974) -> ActixResult<HttpResponse> {
1975    let start_time = Instant::now();
1976    Ok(qdrant_error("Full storage snapshot download not yet implemented", start_time))
1977}
1978
1979/// Delete full snapshot
1980async fn delete_full_snapshot(
1981    _path: web::Path<String>,
1982) -> ActixResult<HttpResponse> {
1983    let start_time = Instant::now();
1984    // For now, just acknowledge
1985    Ok(qdrant_response(true, start_time))
1986}
1987
1988/// Update collection parameters
1989#[derive(Deserialize)]
1990struct UpdateCollectionRequest {
1991    #[serde(default)]
1992    optimizers_config: Option<serde_json::Value>,
1993    #[serde(default)]
1994    params: Option<serde_json::Value>,
1995    #[serde(default)]
1996    hnsw_config: Option<serde_json::Value>,
1997    #[serde(default)]
1998    vectors: Option<serde_json::Value>,
1999    #[serde(default)]
2000    quantization_config: Option<serde_json::Value>,
2001}
2002
2003async fn update_collection(
2004    storage: web::Data<Arc<StorageManager>>,
2005    path: web::Path<String>,
2006    _req: web::Json<UpdateCollectionRequest>,
2007) -> ActixResult<HttpResponse> {
2008    let start_time = Instant::now();
2009    let name = path.into_inner();
2010    
2011    if !storage.collection_exists(&name) {
2012        return Ok(qdrant_not_found("Collection not found", start_time));
2013    }
2014    
2015    // Collection update acknowledged (parameters update not yet fully implemented)
2016    Ok(qdrant_response(true, start_time))
2017}
2018
2019/// Get issues/performance suggestions
2020async fn get_issues() -> ActixResult<HttpResponse> {
2021    let start_time = Instant::now();
2022    Ok(qdrant_response(serde_json::json!({
2023        "issues": []
2024    }), start_time))
2025}
2026
2027/// Clear all reported issues
2028async fn clear_issues() -> ActixResult<HttpResponse> {
2029    let start_time = Instant::now();
2030    Ok(qdrant_response(true, start_time))
2031}
2032
2033async fn create_snapshot(
2034    storage: web::Data<Arc<StorageManager>>,
2035    path: web::Path<String>,
2036) -> ActixResult<HttpResponse> {
2037    let start_time = Instant::now();
2038    let collection_name = path.into_inner();
2039    
2040    // Check if collection exists
2041    if !storage.collection_exists(&collection_name) {
2042        return Ok(qdrant_not_found(&format!("Collection '{}' not found", collection_name), start_time));
2043    }
2044    
2045    match storage.create_collection_snapshot(&collection_name) {
2046        Ok(snapshot) => Ok(qdrant_response(snapshot, start_time)),
2047        Err(e) => Ok(qdrant_error(&e.to_string(), start_time)),
2048    }
2049}
2050
2051#[derive(Deserialize)]
2052struct RecoverSnapshotRequest {
2053    location: String,
2054    #[serde(default)]
2055    priority: Option<String>,
2056    #[serde(default)]
2057    checksum: Option<String>,
2058}
2059
2060async fn recover_snapshot(
2061    storage: web::Data<Arc<StorageManager>>,
2062    path: web::Path<String>,
2063    req: web::Json<RecoverSnapshotRequest>,
2064) -> ActixResult<HttpResponse> {
2065    let start_time = Instant::now();
2066    let collection_name = path.into_inner();
2067    let location = &req.location;
2068    
2069    // Helper to build response with collection info
2070    fn build_recovery_result(collection: &vectx_core::Collection) -> serde_json::Value {
2071        let points_count = collection.count();
2072        let vector_dim = collection.vector_dim();
2073        
2074        if points_count == 0 {
2075            serde_json::json!({
2076                "recovered": true,
2077                "collection": collection.name(),
2078                "vector_dim": vector_dim,
2079                "points_count": 0,
2080                "note": "Collection created with config only. If this was a Qdrant snapshot, points must be migrated separately using the scroll API."
2081            })
2082        } else {
2083            serde_json::json!({
2084                "recovered": true,
2085                "collection": collection.name(),
2086                "vector_dim": vector_dim,
2087                "points_count": points_count
2088            })
2089        }
2090    }
2091    
2092    // Check if it's a URL or local file reference
2093    if location.starts_with("http://") || location.starts_with("https://") {
2094        // Remote URL recovery
2095        match storage.recover_from_url(
2096            &collection_name,
2097            location,
2098            req.checksum.as_deref(),
2099        ).await {
2100            Ok(collection) => Ok(qdrant_response(build_recovery_result(&collection), start_time)),
2101            Err(e) => Ok(qdrant_error(&format!("Failed to recover from URL: {}", e), start_time)),
2102        }
2103    } else if location.starts_with("file://") {
2104        // Local file recovery - extract snapshot name from path
2105        let snapshot_name = location
2106            .trim_start_matches("file://")
2107            .rsplit('/')
2108            .next()
2109            .unwrap_or(location);
2110        
2111        match storage.recover_from_snapshot(&collection_name, snapshot_name) {
2112            Ok(collection) => Ok(qdrant_response(build_recovery_result(&collection), start_time)),
2113            Err(e) => Ok(qdrant_error(&format!("Failed to recover from snapshot: {}", e), start_time)),
2114        }
2115    } else {
2116        // Assume it's a snapshot name directly
2117        match storage.recover_from_snapshot(&collection_name, location) {
2118            Ok(collection) => Ok(qdrant_response(build_recovery_result(&collection), start_time)),
2119            Err(e) => Ok(qdrant_error(&format!("Failed to recover from snapshot: {}", e), start_time)),
2120        }
2121    }
2122}
2123
2124async fn get_snapshot(
2125    storage: web::Data<Arc<StorageManager>>,
2126    path: web::Path<(String, String)>,
2127) -> ActixResult<HttpResponse> {
2128    let start_time = Instant::now();
2129    let (collection_name, snapshot_name) = path.into_inner();
2130    
2131    if let Some(snapshot_path) = storage.get_snapshot_path(&collection_name, &snapshot_name) {
2132        // Return the snapshot file for download
2133        match std::fs::read(&snapshot_path) {
2134            Ok(data) => {
2135                Ok(HttpResponse::Ok()
2136                    .content_type("application/octet-stream")
2137                    .insert_header(("Content-Disposition", format!("attachment; filename=\"{}\"", snapshot_name)))
2138                    .body(data))
2139            }
2140            Err(e) => Ok(qdrant_error(&format!("Failed to read snapshot file: {}", e), start_time)),
2141        }
2142    } else {
2143        Ok(qdrant_not_found(&format!("Snapshot '{}' not found in collection '{}'", snapshot_name, collection_name), start_time))
2144    }
2145}
2146
2147async fn delete_snapshot(
2148    storage: web::Data<Arc<StorageManager>>,
2149    path: web::Path<(String, String)>,
2150) -> ActixResult<HttpResponse> {
2151    let start_time = Instant::now();
2152    let (collection_name, snapshot_name) = path.into_inner();
2153    
2154    match storage.delete_collection_snapshot(&collection_name, &snapshot_name) {
2155        Ok(true) => Ok(qdrant_response(true, start_time)),
2156        Ok(false) => Ok(qdrant_not_found(&format!("Snapshot '{}' not found in collection '{}'", snapshot_name, collection_name), start_time)),
2157        Err(e) => Ok(qdrant_error(&e.to_string(), start_time)),
2158    }
2159}
2160
2161async fn upload_snapshot(
2162    storage: web::Data<Arc<StorageManager>>,
2163    path: web::Path<String>,
2164    mut payload: Multipart,
2165) -> ActixResult<HttpResponse> {
2166    let start_time = Instant::now();
2167    let collection_name = path.into_inner();
2168    
2169    let mut snapshot_data: Option<Vec<u8>> = None;
2170    let mut filename: Option<String> = None;
2171    
2172    // Process multipart form data
2173    while let Some(item) = payload.next().await {
2174        let mut field = match item {
2175            Ok(f) => f,
2176            Err(e) => {
2177                return Ok(qdrant_error(&format!("Failed to parse multipart: {}", e), start_time));
2178            }
2179        };
2180        
2181        let content_disposition = match field.content_disposition() {
2182            Some(cd) => cd,
2183            None => continue,
2184        };
2185        let field_name = content_disposition.get_name().unwrap_or("");
2186        
2187        if field_name == "snapshot" {
2188            // Get filename
2189            filename = content_disposition.get_filename().map(|s: &str| s.to_string());
2190            
2191            // Read file data
2192            let mut data = Vec::new();
2193            while let Some(chunk) = field.next().await {
2194                match chunk {
2195                    Ok(bytes) => data.extend_from_slice(&bytes),
2196                    Err(e) => {
2197                        return Ok(qdrant_error(&format!("Failed to read file data: {}", e), start_time));
2198                    }
2199                }
2200            }
2201            snapshot_data = Some(data);
2202        }
2203    }
2204    
2205    // Validate we got the snapshot file
2206    let data = match snapshot_data {
2207        Some(d) => d,
2208        None => {
2209            return Ok(qdrant_error("No snapshot file provided in multipart form", start_time));
2210        }
2211    };
2212    
2213    // Save and restore the snapshot
2214    match storage.upload_and_restore_snapshot(&collection_name, &data, filename.as_deref()) {
2215        Ok(collection) => Ok(qdrant_response(serde_json::json!({
2216            "collection": collection_name,
2217            "points_count": collection.count()
2218        }), start_time)),
2219        Err(e) => Ok(qdrant_error(&format!("Failed to restore snapshot: {}", e), start_time)),
2220    }
2221}
2222
2223// ============ Additional Qdrant-compatible endpoints ============
2224
2225/// Update aliases (stub - aliases not yet implemented)
2226/// Update aliases request
2227#[derive(Deserialize)]
2228struct UpdateAliasesRequest {
2229    actions: Vec<serde_json::Value>,
2230}
2231
2232async fn update_aliases(
2233    storage: web::Data<Arc<StorageManager>>,
2234    req: web::Json<UpdateAliasesRequest>,
2235) -> ActixResult<HttpResponse> {
2236    let start_time = Instant::now();
2237    
2238    for action in &req.actions {
2239        if let Some(obj) = action.as_object() {
2240            // Create alias: { "create_alias": { "alias_name": "x", "collection_name": "y" } }
2241            if let Some(create) = obj.get("create_alias").and_then(|c| c.as_object()) {
2242                if let (Some(alias), Some(collection)) = (
2243                    create.get("alias_name").and_then(|v| v.as_str()),
2244                    create.get("collection_name").and_then(|v| v.as_str())
2245                ) {
2246                    let _ = storage.create_alias(alias, collection);
2247                }
2248            }
2249            
2250            // Delete alias: { "delete_alias": { "alias_name": "x" } }
2251            if let Some(delete) = obj.get("delete_alias").and_then(|d| d.as_object()) {
2252                if let Some(alias) = delete.get("alias_name").and_then(|v| v.as_str()) {
2253                    let _ = storage.delete_alias(alias);
2254                }
2255            }
2256            
2257            // Rename alias: { "rename_alias": { "old_alias_name": "x", "new_alias_name": "y" } }
2258            if let Some(rename) = obj.get("rename_alias").and_then(|r| r.as_object()) {
2259                if let (Some(old_alias), Some(new_alias)) = (
2260                    rename.get("old_alias_name").and_then(|v| v.as_str()),
2261                    rename.get("new_alias_name").and_then(|v| v.as_str())
2262                ) {
2263                    let _ = storage.rename_alias(old_alias, new_alias);
2264                }
2265            }
2266        }
2267    }
2268    
2269    Ok(qdrant_response(true, start_time))
2270}
2271
2272/// List collection aliases
2273async fn list_collection_aliases(
2274    storage: web::Data<Arc<StorageManager>>,
2275    path: web::Path<String>,
2276) -> ActixResult<HttpResponse> {
2277    let start_time = Instant::now();
2278    let collection_name = path.into_inner();
2279    let aliases: Vec<serde_json::Value> = storage.list_collection_aliases(&collection_name)
2280        .into_iter()
2281        .map(|alias| serde_json::json!({
2282            "alias_name": alias,
2283            "collection_name": collection_name
2284        }))
2285        .collect();
2286    Ok(qdrant_response(serde_json::json!({
2287        "aliases": aliases
2288    }), start_time))
2289}
2290
2291/// Collection cluster info
2292async fn collection_cluster_info(
2293    path: web::Path<String>,
2294) -> ActixResult<HttpResponse> {
2295    let start_time = Instant::now();
2296    let collection_name = path.into_inner();
2297    Ok(qdrant_response(serde_json::json!({
2298        "peer_id": 0,
2299        "shard_count": 1,
2300        "local_shards": [{
2301            "shard_id": 0,
2302            "points_count": 0,
2303            "state": "Active"
2304        }],
2305        "remote_shards": [],
2306        "shard_transfers": [],
2307        "collection_name": collection_name
2308    }), start_time))
2309}
2310
2311/// Get multiple points by IDs
2312#[derive(Deserialize)]
2313struct GetPointsRequest {
2314    ids: Vec<serde_json::Value>,
2315    #[serde(default)]
2316    with_payload: Option<bool>,
2317    #[serde(default)]
2318    with_vector: Option<bool>,
2319}
2320
2321async fn get_points_by_ids(
2322    storage: web::Data<Arc<StorageManager>>,
2323    path: web::Path<String>,
2324    req: web::Json<GetPointsRequest>,
2325) -> ActixResult<HttpResponse> {
2326    let start_time = Instant::now();
2327    let name = path.into_inner();
2328    
2329    let collection = match storage.get_collection(&name) {
2330        Some(c) => c,
2331        None => {
2332            return Ok(qdrant_not_found("Collection not found", start_time));
2333        }
2334    };
2335
2336    let with_payload = req.with_payload.unwrap_or(true);
2337    let with_vector = req.with_vector.unwrap_or(false);
2338    
2339    let mut points = Vec::new();
2340    for id_value in &req.ids {
2341        let id_str = match id_value {
2342            serde_json::Value::String(s) => s.clone(),
2343            serde_json::Value::Number(n) => n.to_string(),
2344            _ => continue,
2345        };
2346        
2347        if let Some(point) = collection.get(&id_str) {
2348            let mut result = serde_json::json!({
2349                "id": id_value,
2350                "version": point.version
2351            });
2352            if with_payload {
2353                result["payload"] = point.payload.clone().unwrap_or(serde_json::Value::Null);
2354            }
2355            if with_vector {
2356                result["vector"] = serde_json::json!(point.vector.as_slice());
2357            }
2358            points.push(result);
2359        }
2360    }
2361
2362    Ok(qdrant_response(points, start_time))
2363}
2364
2365/// Count points in collection
2366#[derive(Deserialize)]
2367struct CountRequest {
2368    #[serde(default)]
2369    filter: Option<serde_json::Value>,
2370    #[serde(default)]
2371    exact: Option<bool>,
2372}
2373
2374async fn count_points(
2375    storage: web::Data<Arc<StorageManager>>,
2376    path: web::Path<String>,
2377    _req: web::Json<CountRequest>,
2378) -> ActixResult<HttpResponse> {
2379    let start_time = Instant::now();
2380    let name = path.into_inner();
2381    
2382    let collection = match storage.get_collection(&name) {
2383        Some(c) => c,
2384        None => {
2385            return Ok(qdrant_not_found("Collection not found", start_time));
2386        }
2387    };
2388
2389    Ok(qdrant_response(serde_json::json!({
2390        "count": collection.count()
2391    }), start_time))
2392}
2393
2394/// Set payload on points
2395#[derive(Deserialize)]
2396struct SetPayloadRequest {
2397    payload: serde_json::Value,
2398    #[serde(default)]
2399    points: Option<Vec<serde_json::Value>>,
2400    #[serde(default)]
2401    filter: Option<serde_json::Value>,
2402}
2403
2404async fn set_payload(
2405    storage: web::Data<Arc<StorageManager>>,
2406    path: web::Path<String>,
2407    req: web::Json<SetPayloadRequest>,
2408) -> ActixResult<HttpResponse> {
2409    let start_time = Instant::now();
2410    let name = path.into_inner();
2411    
2412    let collection = match storage.get_collection(&name) {
2413        Some(c) => c,
2414        None => return Ok(qdrant_not_found("Collection not found", start_time)),
2415    };
2416
2417    let mut updated_count = 0;
2418
2419    // If specific points are provided, update only those
2420    if let Some(point_ids) = &req.points {
2421        for id_value in point_ids {
2422            let id_str = match id_value {
2423                serde_json::Value::String(s) => s.clone(),
2424                serde_json::Value::Number(n) => n.to_string(),
2425                _ => continue,
2426            };
2427            if collection.set_payload(&id_str, req.payload.clone()).unwrap_or(false) {
2428                updated_count += 1;
2429            }
2430        }
2431    } else {
2432        // Update all points (or filtered points)
2433        let all_points = collection.get_all_points();
2434        for point in all_points {
2435            let id_str = point.id.to_string();
2436            if collection.set_payload(&id_str, req.payload.clone()).unwrap_or(false) {
2437                updated_count += 1;
2438            }
2439        }
2440    }
2441
2442    Ok(qdrant_response(serde_json::json!({
2443        "operation_id": updated_count,
2444        "status": "acknowledged"
2445    }), start_time))
2446}
2447
2448/// Overwrite payload on points (replace entire payload)
2449async fn overwrite_payload(
2450    storage: web::Data<Arc<StorageManager>>,
2451    path: web::Path<String>,
2452    req: web::Json<SetPayloadRequest>,
2453) -> ActixResult<HttpResponse> {
2454    let start_time = Instant::now();
2455    let name = path.into_inner();
2456    
2457    let collection = match storage.get_collection(&name) {
2458        Some(c) => c,
2459        None => return Ok(qdrant_not_found("Collection not found", start_time)),
2460    };
2461
2462    let mut updated_count = 0;
2463
2464    if let Some(point_ids) = &req.points {
2465        for id_value in point_ids {
2466            let id_str = match id_value {
2467                serde_json::Value::String(s) => s.clone(),
2468                serde_json::Value::Number(n) => n.to_string(),
2469                _ => continue,
2470            };
2471            if collection.overwrite_payload(&id_str, req.payload.clone()).unwrap_or(false) {
2472                updated_count += 1;
2473            }
2474        }
2475    } else {
2476        let all_points = collection.get_all_points();
2477        for point in all_points {
2478            let id_str = point.id.to_string();
2479            if collection.overwrite_payload(&id_str, req.payload.clone()).unwrap_or(false) {
2480                updated_count += 1;
2481            }
2482        }
2483    }
2484
2485    Ok(qdrant_response(serde_json::json!({
2486        "operation_id": updated_count,
2487        "status": "acknowledged"
2488    }), start_time))
2489}
2490
2491/// Delete payload fields from points
2492#[derive(Deserialize)]
2493struct DeletePayloadRequest {
2494    keys: Vec<String>,
2495    #[serde(default)]
2496    points: Option<Vec<serde_json::Value>>,
2497    #[serde(default)]
2498    filter: Option<serde_json::Value>,
2499}
2500
2501async fn delete_payload(
2502    storage: web::Data<Arc<StorageManager>>,
2503    path: web::Path<String>,
2504    req: web::Json<DeletePayloadRequest>,
2505) -> ActixResult<HttpResponse> {
2506    let start_time = Instant::now();
2507    let name = path.into_inner();
2508    
2509    let collection = match storage.get_collection(&name) {
2510        Some(c) => c,
2511        None => return Ok(qdrant_not_found("Collection not found", start_time)),
2512    };
2513
2514    let mut updated_count = 0;
2515
2516    if let Some(point_ids) = &req.points {
2517        for id_value in point_ids {
2518            let id_str = match id_value {
2519                serde_json::Value::String(s) => s.clone(),
2520                serde_json::Value::Number(n) => n.to_string(),
2521                _ => continue,
2522            };
2523            if collection.delete_payload_keys(&id_str, &req.keys).unwrap_or(false) {
2524                updated_count += 1;
2525            }
2526        }
2527    } else {
2528        let all_points = collection.get_all_points();
2529        for point in all_points {
2530            let id_str = point.id.to_string();
2531            if collection.delete_payload_keys(&id_str, &req.keys).unwrap_or(false) {
2532                updated_count += 1;
2533            }
2534        }
2535    }
2536
2537    Ok(qdrant_response(serde_json::json!({
2538        "operation_id": updated_count,
2539        "status": "acknowledged"
2540    }), start_time))
2541}
2542
2543/// Clear all payload from points
2544#[derive(Deserialize)]
2545struct ClearPayloadRequest {
2546    #[serde(default)]
2547    points: Option<Vec<serde_json::Value>>,
2548    #[serde(default)]
2549    filter: Option<serde_json::Value>,
2550}
2551
2552async fn clear_payload(
2553    storage: web::Data<Arc<StorageManager>>,
2554    path: web::Path<String>,
2555    req: web::Json<ClearPayloadRequest>,
2556) -> ActixResult<HttpResponse> {
2557    let start_time = Instant::now();
2558    let name = path.into_inner();
2559    
2560    let collection = match storage.get_collection(&name) {
2561        Some(c) => c,
2562        None => return Ok(qdrant_not_found("Collection not found", start_time)),
2563    };
2564
2565    let mut updated_count = 0;
2566
2567    if let Some(point_ids) = &req.points {
2568        for id_value in point_ids {
2569            let id_str = match id_value {
2570                serde_json::Value::String(s) => s.clone(),
2571                serde_json::Value::Number(n) => n.to_string(),
2572                _ => continue,
2573            };
2574            if collection.clear_payload(&id_str).unwrap_or(false) {
2575                updated_count += 1;
2576            }
2577        }
2578    } else {
2579        let all_points = collection.get_all_points();
2580        for point in all_points {
2581            let id_str = point.id.to_string();
2582            if collection.clear_payload(&id_str).unwrap_or(false) {
2583                updated_count += 1;
2584            }
2585        }
2586    }
2587
2588    Ok(qdrant_response(serde_json::json!({
2589        "operation_id": updated_count,
2590        "status": "acknowledged"
2591    }), start_time))
2592}
2593
2594/// Update vectors on existing points
2595#[derive(Deserialize)]
2596struct UpdateVectorsRequest {
2597    /// List of point updates with id and vector
2598    points: Vec<UpdateVectorPoint>,
2599}
2600
2601#[derive(Deserialize)]
2602struct UpdateVectorPoint {
2603    id: serde_json::Value,
2604    vector: serde_json::Value,
2605}
2606
2607async fn update_vectors(
2608    storage: web::Data<Arc<StorageManager>>,
2609    path: web::Path<String>,
2610    req: web::Json<UpdateVectorsRequest>,
2611) -> ActixResult<HttpResponse> {
2612    let start_time = Instant::now();
2613    let name = path.into_inner();
2614    
2615    let collection = match storage.get_collection(&name) {
2616        Some(c) => c,
2617        None => return Ok(qdrant_not_found("Collection not found", start_time)),
2618    };
2619
2620    let mut updated_count = 0;
2621
2622    for point_update in &req.points {
2623        let id_str = match &point_update.id {
2624            serde_json::Value::String(s) => s.clone(),
2625            serde_json::Value::Number(n) => n.to_string(),
2626            _ => continue,
2627        };
2628
2629        // Parse vector - can be array or named vectors object
2630        let vector_data = match &point_update.vector {
2631            serde_json::Value::Array(arr) => {
2632                let vec: Result<Vec<f32>, _> = arr.iter()
2633                    .map(|v| v.as_f64().map(|f| f as f32).ok_or("expected f32"))
2634                    .collect();
2635                vec.ok()
2636            }
2637            serde_json::Value::Object(obj) => {
2638                // Named vectors - get first one
2639                if let Some((_, vec_val)) = obj.iter().next() {
2640                    if let Some(arr) = vec_val.as_array() {
2641                        let vec: Result<Vec<f32>, _> = arr.iter()
2642                            .map(|v| v.as_f64().map(|f| f as f32).ok_or("expected f32"))
2643                            .collect();
2644                        vec.ok()
2645                    } else {
2646                        None
2647                    }
2648                } else {
2649                    None
2650                }
2651            }
2652            _ => None,
2653        };
2654
2655        if let Some(vec) = vector_data {
2656            let vector = Vector::new(vec);
2657            if collection.update_vector(&id_str, vector).unwrap_or(false) {
2658                updated_count += 1;
2659            }
2660        }
2661    }
2662
2663    Ok(qdrant_response(serde_json::json!({
2664        "operation_id": updated_count,
2665        "status": "acknowledged"
2666    }), start_time))
2667}
2668
2669/// Delete vectors from points
2670#[derive(Deserialize)]
2671struct DeleteVectorsRequest {
2672    #[serde(default)]
2673    points: Option<Vec<serde_json::Value>>,
2674    #[serde(default)]
2675    vectors: Vec<String>,
2676    #[serde(default)]
2677    filter: Option<serde_json::Value>,
2678}
2679
2680async fn delete_vectors(
2681    storage: web::Data<Arc<StorageManager>>,
2682    path: web::Path<String>,
2683    req: web::Json<DeleteVectorsRequest>,
2684) -> ActixResult<HttpResponse> {
2685    let start_time = Instant::now();
2686    let name = path.into_inner();
2687    
2688    let collection = match storage.get_collection(&name) {
2689        Some(c) => c,
2690        None => return Ok(qdrant_not_found("Collection not found", start_time)),
2691    };
2692
2693    let mut deleted_count = 0;
2694
2695    // Note: In a full named-vectors implementation, this would delete specific named vectors
2696    // For now, if points are specified, we clear their vectors (effectively delete the point)
2697    if let Some(point_ids) = &req.points {
2698        for id_value in point_ids {
2699            let id_str = match id_value {
2700                serde_json::Value::String(s) => s.clone(),
2701                serde_json::Value::Number(n) => n.to_string(),
2702                _ => continue,
2703            };
2704            // Delete multivector if it was the target
2705            if req.vectors.iter().any(|v| v == "multivector" || v.is_empty()) {
2706                if collection.update_multivector(&id_str, None).unwrap_or(false) {
2707                    deleted_count += 1;
2708                }
2709            }
2710        }
2711    }
2712
2713    Ok(qdrant_response(serde_json::json!({
2714        "operation_id": deleted_count,
2715        "status": "acknowledged"
2716    }), start_time))
2717}
2718
2719/// Batch update operations
2720#[derive(Deserialize)]
2721struct BatchUpdateRequest {
2722    operations: Vec<serde_json::Value>,
2723}
2724
2725async fn batch_update(
2726    storage: web::Data<Arc<StorageManager>>,
2727    path: web::Path<String>,
2728    req: web::Json<BatchUpdateRequest>,
2729) -> ActixResult<HttpResponse> {
2730    let start_time = Instant::now();
2731    let name = path.into_inner();
2732    
2733    let collection = match storage.get_collection(&name) {
2734        Some(c) => c,
2735        None => return Ok(qdrant_not_found("Collection not found", start_time)),
2736    };
2737
2738    let mut results = Vec::new();
2739
2740    for (idx, operation) in req.operations.iter().enumerate() {
2741        let op_result = process_batch_operation(&collection, operation);
2742        results.push(serde_json::json!({
2743            "operation_id": idx,
2744            "status": if op_result { "acknowledged" } else { "failed" }
2745        }));
2746    }
2747
2748    Ok(qdrant_response(results, start_time))
2749}
2750
2751/// Process a single batch operation
2752fn process_batch_operation(collection: &std::sync::Arc<vectx_core::Collection>, operation: &serde_json::Value) -> bool {
2753    let obj = match operation.as_object() {
2754        Some(o) => o,
2755        None => return false,
2756    };
2757
2758    // Upsert operation
2759    if let Some(upsert) = obj.get("upsert") {
2760        if let Some(points) = upsert.get("points").and_then(|p| p.as_array()) {
2761            for point_json in points {
2762                if let Some(point) = parse_point_from_json(point_json) {
2763                    let _ = collection.upsert(point);
2764                }
2765            }
2766            return true;
2767        }
2768    }
2769
2770    // Delete operation
2771    if let Some(delete) = obj.get("delete") {
2772        if let Some(points) = delete.get("points").and_then(|p| p.as_array()) {
2773            for id_val in points {
2774                let id_str = match id_val {
2775                    serde_json::Value::String(s) => s.clone(),
2776                    serde_json::Value::Number(n) => n.to_string(),
2777                    _ => continue,
2778                };
2779                let _ = collection.delete(&id_str);
2780            }
2781            return true;
2782        }
2783    }
2784
2785    // Set payload operation
2786    if let Some(set_payload) = obj.get("set_payload") {
2787        if let Some(payload) = set_payload.get("payload") {
2788            if let Some(points) = set_payload.get("points").and_then(|p| p.as_array()) {
2789                for id_val in points {
2790                    let id_str = match id_val {
2791                        serde_json::Value::String(s) => s.clone(),
2792                        serde_json::Value::Number(n) => n.to_string(),
2793                        _ => continue,
2794                    };
2795                    let _ = collection.set_payload(&id_str, payload.clone());
2796                }
2797                return true;
2798            }
2799        }
2800    }
2801
2802    // Overwrite payload operation
2803    if let Some(overwrite_payload) = obj.get("overwrite_payload") {
2804        if let Some(payload) = overwrite_payload.get("payload") {
2805            if let Some(points) = overwrite_payload.get("points").and_then(|p| p.as_array()) {
2806                for id_val in points {
2807                    let id_str = match id_val {
2808                        serde_json::Value::String(s) => s.clone(),
2809                        serde_json::Value::Number(n) => n.to_string(),
2810                        _ => continue,
2811                    };
2812                    let _ = collection.overwrite_payload(&id_str, payload.clone());
2813                }
2814                return true;
2815            }
2816        }
2817    }
2818
2819    // Delete payload operation
2820    if let Some(delete_payload) = obj.get("delete_payload") {
2821        if let Some(keys) = delete_payload.get("keys").and_then(|k| k.as_array()) {
2822            let key_strings: Vec<String> = keys.iter()
2823                .filter_map(|k| k.as_str().map(String::from))
2824                .collect();
2825            if let Some(points) = delete_payload.get("points").and_then(|p| p.as_array()) {
2826                for id_val in points {
2827                    let id_str = match id_val {
2828                        serde_json::Value::String(s) => s.clone(),
2829                        serde_json::Value::Number(n) => n.to_string(),
2830                        _ => continue,
2831                    };
2832                    let _ = collection.delete_payload_keys(&id_str, &key_strings);
2833                }
2834                return true;
2835            }
2836        }
2837    }
2838
2839    // Clear payload operation
2840    if let Some(clear_payload) = obj.get("clear_payload") {
2841        if let Some(points) = clear_payload.get("points").and_then(|p| p.as_array()) {
2842            for id_val in points {
2843                let id_str = match id_val {
2844                    serde_json::Value::String(s) => s.clone(),
2845                    serde_json::Value::Number(n) => n.to_string(),
2846                    _ => continue,
2847                };
2848                let _ = collection.clear_payload(&id_str);
2849            }
2850            return true;
2851        }
2852    }
2853
2854    false
2855}
2856
2857/// Parse a point from JSON
2858fn parse_point_from_json(json: &serde_json::Value) -> Option<Point> {
2859    let obj = json.as_object()?;
2860    
2861    let id = match obj.get("id")? {
2862        serde_json::Value::String(s) => vectx_core::PointId::String(s.clone()),
2863        serde_json::Value::Number(n) => {
2864            vectx_core::PointId::Integer(n.as_u64().unwrap_or(0))
2865        }
2866        _ => return None,
2867    };
2868
2869    let vector_data = obj.get("vector")?;
2870    let vector = match vector_data {
2871        serde_json::Value::Array(arr) => {
2872            let vec: Result<Vec<f32>, _> = arr.iter()
2873                .map(|v| v.as_f64().map(|f| f as f32).ok_or("expected f32"))
2874                .collect();
2875            Vector::new(vec.ok()?)
2876        }
2877        _ => return None,
2878    };
2879
2880    let payload = obj.get("payload").cloned();
2881
2882    Some(Point::new(id, vector, payload))
2883}
2884
2885/// Batch search
2886#[derive(Deserialize)]
2887struct BatchSearchRequest {
2888    searches: Vec<serde_json::Value>,
2889}
2890
2891async fn batch_search(
2892    storage: web::Data<Arc<StorageManager>>,
2893    path: web::Path<String>,
2894    _req: web::Json<BatchSearchRequest>,
2895) -> ActixResult<HttpResponse> {
2896    let start_time = Instant::now();
2897    let name = path.into_inner();
2898    
2899    if storage.get_collection(&name).is_none() {
2900        return Ok(qdrant_not_found("Collection not found", start_time));
2901    }
2902
2903    Ok(qdrant_response(Vec::<serde_json::Value>::new(), start_time))
2904}
2905
2906/// Search points grouped by a payload field
2907#[derive(Deserialize)]
2908struct SearchGroupsRequest {
2909    vector: Vec<f32>,
2910    group_by: String,
2911    #[serde(default)]
2912    limit: Option<usize>,
2913    #[serde(default)]
2914    group_size: Option<usize>,
2915    #[serde(default)]
2916    with_payload: Option<bool>,
2917    #[serde(default)]
2918    with_vector: Option<bool>,
2919    #[serde(default)]
2920    filter: Option<serde_json::Value>,
2921}
2922
2923async fn search_groups(
2924    storage: web::Data<Arc<StorageManager>>,
2925    path: web::Path<String>,
2926    req: web::Json<SearchGroupsRequest>,
2927) -> ActixResult<HttpResponse> {
2928    let start_time = Instant::now();
2929    let name = path.into_inner();
2930    
2931    let collection = match storage.get_collection(&name) {
2932        Some(c) => c,
2933        None => {
2934            return Ok(qdrant_not_found("Collection not found", start_time));
2935        }
2936    };
2937
2938    let limit = req.limit.unwrap_or(5);
2939    let group_size = req.group_size.unwrap_or(3);
2940    let with_payload = req.with_payload.unwrap_or(true);
2941    let with_vector = req.with_vector.unwrap_or(false);
2942    let group_by = &req.group_by;
2943    
2944    let query_vector = Vector::new(req.vector.clone());
2945    let search_results = collection.search(&query_vector, limit * group_size * 2, None);
2946    
2947    // Group results by the group_by field
2948    let mut groups: std::collections::HashMap<String, Vec<serde_json::Value>> = std::collections::HashMap::new();
2949    
2950    for (point, score) in search_results {
2951        let group_key = point.payload
2952            .as_ref()
2953            .and_then(|p| p.get(group_by))
2954            .and_then(|v| match v {
2955                serde_json::Value::String(s) => Some(s.clone()),
2956                serde_json::Value::Number(n) => Some(n.to_string()),
2957                _ => None,
2958            })
2959            .unwrap_or_else(|| "unknown".to_string());
2960        
2961        let group = groups.entry(group_key).or_default();
2962        
2963        if group.len() < group_size {
2964            let mut hit = serde_json::json!({
2965                "id": point_id_to_json(&point.id),
2966                "score": score
2967            });
2968            
2969            if with_payload {
2970                hit["payload"] = point.payload.clone().unwrap_or(serde_json::Value::Null);
2971            }
2972            if with_vector {
2973                hit["vector"] = serde_json::json!(point.vector.as_slice());
2974            }
2975            
2976            group.push(hit);
2977        }
2978        
2979        if groups.len() >= limit && groups.values().all(|g| g.len() >= group_size) {
2980            break;
2981        }
2982    }
2983    
2984    let group_results: Vec<serde_json::Value> = groups
2985        .into_iter()
2986        .take(limit)
2987        .map(|(key, hits)| serde_json::json!({ "id": key, "hits": hits }))
2988        .collect();
2989
2990    Ok(qdrant_response(serde_json::json!({
2991        "groups": group_results
2992    }), start_time))
2993}
2994
2995/// Discover points using context pairs
2996#[derive(Deserialize)]
2997struct DiscoverRequest {
2998    #[serde(default)]
2999    target: Option<serde_json::Value>,
3000    #[serde(default)]
3001    context: Option<Vec<ContextPair>>,
3002    #[serde(default)]
3003    limit: Option<usize>,
3004    #[serde(default)]
3005    with_payload: Option<bool>,
3006    #[serde(default)]
3007    with_vector: Option<bool>,
3008    #[serde(default)]
3009    filter: Option<serde_json::Value>,
3010}
3011
3012#[derive(Deserialize)]
3013struct ContextPair {
3014    positive: serde_json::Value,
3015    negative: serde_json::Value,
3016}
3017
3018async fn discover_points(
3019    storage: web::Data<Arc<StorageManager>>,
3020    path: web::Path<String>,
3021    req: web::Json<DiscoverRequest>,
3022) -> ActixResult<HttpResponse> {
3023    let start_time = Instant::now();
3024    let name = path.into_inner();
3025    
3026    let collection = match storage.get_collection(&name) {
3027        Some(c) => c,
3028        None => {
3029            return Ok(qdrant_not_found("Collection not found", start_time));
3030        }
3031    };
3032
3033    let limit = req.limit.unwrap_or(10);
3034    let with_payload = req.with_payload.unwrap_or(true);
3035    let _with_vector = req.with_vector.unwrap_or(false);
3036    
3037    // Parse target vector or point ID
3038    let target_vector = if let Some(target) = &req.target {
3039        match target {
3040            serde_json::Value::Array(arr) => {
3041                let vec: Result<Vec<f32>, _> = arr.iter()
3042                    .map(|v| v.as_f64().map(|f| f as f32).ok_or("expected f32"))
3043                    .collect();
3044                vec.ok().map(Vector::new)
3045            }
3046            serde_json::Value::Number(n) => {
3047                let id = n.to_string();
3048                collection.get(&id).map(|p| p.vector.clone())
3049            }
3050            serde_json::Value::String(s) => {
3051                collection.get(s).map(|p| p.vector.clone())
3052            }
3053            _ => None,
3054        }
3055    } else {
3056        None
3057    };
3058
3059    let query = match target_vector {
3060        Some(v) => v,
3061        None => {
3062            return Ok(qdrant_error("Target vector or point ID required", start_time));
3063        }
3064    };
3065    
3066    let results = collection.search(&query, limit, None);
3067    
3068    let scored_points: Vec<serde_json::Value> = results.into_iter().map(|(point, score)| {
3069        let mut result = serde_json::json!({
3070            "id": point_id_to_json(&point.id),
3071            "version": point.version,
3072            "score": score,
3073        });
3074        if with_payload {
3075            result["payload"] = point.payload.clone().unwrap_or(serde_json::Value::Null);
3076        }
3077        result
3078    }).collect();
3079
3080    Ok(qdrant_response(scored_points, start_time))
3081}
3082
3083/// Batch discover points
3084#[derive(Deserialize)]
3085struct DiscoverBatchRequest {
3086    searches: Vec<serde_json::Value>,
3087}
3088
3089async fn discover_batch(
3090    storage: web::Data<Arc<StorageManager>>,
3091    path: web::Path<String>,
3092    _req: web::Json<DiscoverBatchRequest>,
3093) -> ActixResult<HttpResponse> {
3094    let start_time = Instant::now();
3095    let name = path.into_inner();
3096    
3097    if storage.get_collection(&name).is_none() {
3098        return Ok(qdrant_not_found("Collection not found", start_time));
3099    }
3100
3101    Ok(qdrant_response(Vec::<Vec<serde_json::Value>>::new(), start_time))
3102}
3103
3104/// Facet counts - count points by unique payload values
3105#[derive(Deserialize)]
3106struct FacetRequest {
3107    key: String,
3108    #[serde(default)]
3109    limit: Option<usize>,
3110    #[serde(default)]
3111    filter: Option<serde_json::Value>,
3112    #[serde(default)]
3113    exact: Option<bool>,
3114}
3115
3116async fn facet_counts(
3117    storage: web::Data<Arc<StorageManager>>,
3118    path: web::Path<String>,
3119    req: web::Json<FacetRequest>,
3120) -> ActixResult<HttpResponse> {
3121    let start_time = Instant::now();
3122    let name = path.into_inner();
3123    
3124    let collection = match storage.get_collection(&name) {
3125        Some(c) => c,
3126        None => {
3127            return Ok(qdrant_not_found("Collection not found", start_time));
3128        }
3129    };
3130
3131    let limit = req.limit.unwrap_or(10);
3132    let key = &req.key;
3133    
3134    // Count occurrences of each value for the given key
3135    let all_points = collection.get_all_points();
3136    let mut value_counts: std::collections::HashMap<String, u64> = std::collections::HashMap::new();
3137    
3138    for point in all_points {
3139        if let Some(payload) = &point.payload {
3140            if let Some(value) = payload.get(key) {
3141                let value_str = match value {
3142                    serde_json::Value::String(s) => s.clone(),
3143                    serde_json::Value::Number(n) => n.to_string(),
3144                    serde_json::Value::Bool(b) => b.to_string(),
3145                    _ => continue,
3146                };
3147                *value_counts.entry(value_str).or_insert(0) += 1;
3148            }
3149        }
3150    }
3151    
3152    // Sort by count and take top limit
3153    let mut counts: Vec<_> = value_counts.into_iter().collect();
3154    counts.sort_by(|a, b| b.1.cmp(&a.1));
3155    
3156    let hits: Vec<serde_json::Value> = counts.into_iter()
3157        .take(limit)
3158        .map(|(value, count)| serde_json::json!({
3159            "value": value,
3160            "count": count
3161        }))
3162        .collect();
3163
3164    Ok(qdrant_response(serde_json::json!({
3165        "hits": hits
3166    }), start_time))
3167}
3168
3169/// Batch query
3170#[derive(Deserialize)]
3171struct BatchQueryRequest {
3172    searches: Vec<serde_json::Value>,
3173}
3174
3175async fn batch_query(
3176    storage: web::Data<Arc<StorageManager>>,
3177    path: web::Path<String>,
3178    _req: web::Json<BatchQueryRequest>,
3179) -> ActixResult<HttpResponse> {
3180    let start_time = Instant::now();
3181    let name = path.into_inner();
3182    
3183    if storage.get_collection(&name).is_none() {
3184        return Ok(qdrant_not_found("Collection not found", start_time));
3185    }
3186
3187    Ok(qdrant_response(Vec::<serde_json::Value>::new(), start_time))
3188}
3189
3190/// Query points with grouping
3191#[derive(Deserialize)]
3192struct QueryGroupsRequest {
3193    query: serde_json::Value,
3194    group_by: String,
3195    #[serde(default)]
3196    limit: Option<usize>,
3197    #[serde(default)]
3198    group_size: Option<usize>,
3199    #[serde(default)]
3200    with_payload: Option<bool>,
3201    #[serde(default)]
3202    with_vector: Option<bool>,
3203    #[serde(default)]
3204    filter: Option<serde_json::Value>,
3205}
3206
3207async fn query_groups(
3208    storage: web::Data<Arc<StorageManager>>,
3209    path: web::Path<String>,
3210    req: web::Json<QueryGroupsRequest>,
3211) -> ActixResult<HttpResponse> {
3212    let start_time = Instant::now();
3213    let name = path.into_inner();
3214    
3215    let collection = match storage.get_collection(&name) {
3216        Some(c) => c,
3217        None => {
3218            return Ok(qdrant_not_found("Collection not found", start_time));
3219        }
3220    };
3221
3222    let limit = req.limit.unwrap_or(5);
3223    let group_size = req.group_size.unwrap_or(3);
3224    let with_payload = req.with_payload.unwrap_or(true);
3225    let with_vector = req.with_vector.unwrap_or(false);
3226    let group_by = &req.group_by;
3227    
3228    // Parse query vector
3229    let query_vector = match &req.query {
3230        serde_json::Value::Array(arr) => {
3231            let vec: Result<Vec<f32>, _> = arr.iter()
3232                .map(|v| v.as_f64().map(|f| f as f32).ok_or("expected f32"))
3233                .collect();
3234            match vec {
3235                Ok(v) => Vector::new(v),
3236                Err(_) => {
3237                    return Ok(qdrant_error("Invalid query vector", start_time));
3238                }
3239            }
3240        }
3241        _ => {
3242            return Ok(qdrant_error("Query must be a vector array", start_time));
3243        }
3244    };
3245    
3246    // Search for points
3247    let search_results = collection.search(&query_vector, limit * group_size * 2, None);
3248    
3249    // Group results by the group_by field
3250    let mut groups: std::collections::HashMap<String, Vec<serde_json::Value>> = std::collections::HashMap::new();
3251    
3252    for (point, score) in search_results {
3253        // Get group key from payload
3254        let group_key = point.payload
3255            .as_ref()
3256            .and_then(|p| p.get(group_by))
3257            .and_then(|v| match v {
3258                serde_json::Value::String(s) => Some(s.clone()),
3259                serde_json::Value::Number(n) => Some(n.to_string()),
3260                _ => None,
3261            })
3262            .unwrap_or_else(|| "unknown".to_string());
3263        
3264        let group = groups.entry(group_key).or_default();
3265        
3266        // Only add if group hasn't reached group_size
3267        if group.len() < group_size {
3268            let mut hit = serde_json::json!({
3269                "id": point_id_to_json(&point.id),
3270                "score": score
3271            });
3272            
3273            if with_payload {
3274                hit["payload"] = point.payload.clone().unwrap_or(serde_json::Value::Null);
3275            }
3276            if with_vector {
3277                hit["vector"] = serde_json::json!(point.vector.as_slice());
3278            }
3279            
3280            group.push(hit);
3281        }
3282        
3283        // Stop if we have enough groups
3284        if groups.len() >= limit && groups.values().all(|g| g.len() >= group_size) {
3285            break;
3286        }
3287    }
3288    
3289    // Format response
3290    let group_results: Vec<serde_json::Value> = groups
3291        .into_iter()
3292        .take(limit)
3293        .map(|(key, hits)| {
3294            serde_json::json!({
3295                "id": key,
3296                "hits": hits
3297            })
3298        })
3299        .collect();
3300
3301    Ok(qdrant_response(serde_json::json!({
3302        "groups": group_results
3303    }), start_time))
3304}
3305
/// Request body for the "create field index" endpoint.
#[derive(Deserialize)]
struct CreateIndexRequest {
    /// Name of the payload field to index.
    field_name: String,
    /// Optional index schema. The handler accepts either a bare type name
    /// (e.g. `"keyword"`) or an object with a `"type"` key; when the schema is
    /// absent or not understood, a keyword index is created.
    #[serde(default)]
    field_schema: Option<serde_json::Value>,
}
3313
3314async fn create_field_index(
3315    storage: web::Data<Arc<StorageManager>>,
3316    path: web::Path<String>,
3317    req: web::Json<CreateIndexRequest>,
3318) -> ActixResult<HttpResponse> {
3319    let start_time = Instant::now();
3320    let name = path.into_inner();
3321    
3322    let collection = match storage.get_collection(&name) {
3323        Some(c) => c,
3324        None => return Ok(qdrant_not_found("Collection not found", start_time)),
3325    };
3326
3327    // Parse field schema to determine index type
3328    let index_type = if let Some(schema) = &req.field_schema {
3329        match schema {
3330            serde_json::Value::String(s) => match s.as_str() {
3331                "keyword" => vectx_core::PayloadIndexType::Keyword,
3332                "integer" => vectx_core::PayloadIndexType::Integer,
3333                "float" => vectx_core::PayloadIndexType::Float,
3334                "bool" => vectx_core::PayloadIndexType::Bool,
3335                "geo" => vectx_core::PayloadIndexType::Geo,
3336                "text" => vectx_core::PayloadIndexType::Text,
3337                _ => vectx_core::PayloadIndexType::Keyword,
3338            }
3339            serde_json::Value::Object(obj) => {
3340                if let Some(type_val) = obj.get("type").and_then(|v| v.as_str()) {
3341                    match type_val {
3342                        "keyword" => vectx_core::PayloadIndexType::Keyword,
3343                        "integer" => vectx_core::PayloadIndexType::Integer,
3344                        "float" => vectx_core::PayloadIndexType::Float,
3345                        "bool" => vectx_core::PayloadIndexType::Bool,
3346                        "geo" => vectx_core::PayloadIndexType::Geo,
3347                        "text" => vectx_core::PayloadIndexType::Text,
3348                        _ => vectx_core::PayloadIndexType::Keyword,
3349                    }
3350                } else {
3351                    vectx_core::PayloadIndexType::Keyword
3352                }
3353            }
3354            _ => vectx_core::PayloadIndexType::Keyword,
3355        }
3356    } else {
3357        vectx_core::PayloadIndexType::Keyword
3358    };
3359
3360    match collection.create_payload_index(&req.field_name, index_type) {
3361        Ok(_) => {
3362            let operation_id = collection.next_operation_id();
3363            Ok(qdrant_response(serde_json::json!({
3364                "operation_id": operation_id,
3365                "status": "acknowledged"
3366            }), start_time))
3367        }
3368        Err(e) => Ok(qdrant_error(&e.to_string(), start_time)),
3369    }
3370}
3371
3372/// Delete field index
3373async fn delete_field_index(
3374    storage: web::Data<Arc<StorageManager>>,
3375    path: web::Path<(String, String)>,
3376) -> ActixResult<HttpResponse> {
3377    let start_time = Instant::now();
3378    let (name, field_name) = path.into_inner();
3379    
3380    let collection = match storage.get_collection(&name) {
3381        Some(c) => c,
3382        None => return Ok(qdrant_not_found("Collection not found", start_time)),
3383    };
3384
3385    match collection.delete_payload_index(&field_name) {
3386        Ok(_) => {
3387            let operation_id = collection.next_operation_id();
3388            Ok(qdrant_response(serde_json::json!({
3389                "operation_id": operation_id,
3390                "status": "acknowledged"
3391            }), start_time))
3392        }
3393        Err(e) => Ok(qdrant_error(&e.to_string(), start_time)),
3394    }
3395}
3396
/// Request body for the "recommend points" endpoint.
///
/// The handler uses an average-vector strategy: query = 2*avg(positive) - avg(negative).
/// This creates a single query vector that moves toward positive examples
/// and away from negative examples, then performs one efficient search.
#[derive(Deserialize)]
struct RecommendRequest {
    /// Ids of points to move toward; the handler reads JSON strings and numbers
    /// as ids and ignores other value types. Defaults to an empty list.
    #[serde(default)]
    positive: Vec<serde_json::Value>,
    /// Ids of points to move away from, interpreted like `positive`.
    /// Defaults to an empty list.
    #[serde(default)]
    negative: Vec<serde_json::Value>,
    /// Maximum number of hits to return; the handler uses 10 when omitted.
    #[serde(default)]
    limit: Option<usize>,
    /// Whether each hit includes its payload; the handler uses `true` when omitted.
    #[serde(default)]
    with_payload: Option<bool>,
    /// Whether each hit includes its vector; the handler uses `false` when omitted.
    #[serde(default)]
    with_vector: Option<bool>,
    /// When set, hits whose score is below this value are dropped.
    #[serde(default)]
    score_threshold: Option<f32>,
}
3416
3417async fn recommend_points(
3418    storage: web::Data<Arc<StorageManager>>,
3419    path: web::Path<String>,
3420    req: web::Json<RecommendRequest>,
3421) -> ActixResult<HttpResponse> {
3422    let start_time = Instant::now();
3423    let name = path.into_inner();
3424    
3425    let collection = match storage.get_collection(&name) {
3426        Some(c) => c,
3427        None => {
3428            return Ok(qdrant_not_found("Collection not found", start_time));
3429        }
3430    };
3431
3432    let limit = req.limit.unwrap_or(10);
3433    let with_payload = req.with_payload.unwrap_or(true);
3434    let with_vector = req.with_vector.unwrap_or(false);
3435    let score_threshold = req.score_threshold;
3436    
3437    // Collect point IDs to exclude from results
3438    let mut exclude_ids: std::collections::HashSet<String> = std::collections::HashSet::new();
3439    
3440    // Helper to parse point ID from JSON
3441    let parse_id = |id: &serde_json::Value| -> Option<String> {
3442        match id {
3443            serde_json::Value::String(s) => Some(s.clone()),
3444            serde_json::Value::Number(n) => Some(n.to_string()),
3445            _ => None,
3446        }
3447    };
3448    
3449    // Collect positive vectors and compute average
3450    let mut positive_vectors: Vec<Vec<f32>> = Vec::new();
3451    for pos_id in &req.positive {
3452        if let Some(id_str) = parse_id(pos_id) {
3453            exclude_ids.insert(id_str.clone());
3454            if let Some(point) = collection.get(&id_str) {
3455                positive_vectors.push(point.vector.as_slice().to_vec());
3456            }
3457        }
3458    }
3459    
3460    if positive_vectors.is_empty() {
3461        return Ok(qdrant_error("At least one valid positive example is required", start_time));
3462    }
3463    
3464    // Collect negative vectors and compute average
3465    let mut negative_vectors: Vec<Vec<f32>> = Vec::new();
3466    for neg_id in &req.negative {
3467        if let Some(id_str) = parse_id(neg_id) {
3468            exclude_ids.insert(id_str.clone());
3469            if let Some(point) = collection.get(&id_str) {
3470                negative_vectors.push(point.vector.as_slice().to_vec());
3471            }
3472        }
3473    }
3474    
3475    // Compute average of positive vectors
3476    let dim = positive_vectors[0].len();
3477    let mut avg_positive = vec![0.0f32; dim];
3478    for vec in &positive_vectors {
3479        for (i, &val) in vec.iter().enumerate() {
3480            if i < dim {
3481                avg_positive[i] += val;
3482            }
3483        }
3484    }
3485    let pos_count = positive_vectors.len() as f32;
3486    for val in &mut avg_positive {
3487        *val /= pos_count;
3488    }
3489    
3490    // Create query vector: if negatives exist, use formula 2*avg_pos - avg_neg
3491    // This moves the query toward positives and away from negatives
3492    let query_vector = if !negative_vectors.is_empty() {
3493        let mut avg_negative = vec![0.0f32; dim];
3494        for vec in &negative_vectors {
3495            for (i, &val) in vec.iter().enumerate() {
3496                if i < dim {
3497                    avg_negative[i] += val;
3498                }
3499            }
3500        }
3501        let neg_count = negative_vectors.len() as f32;
3502        for val in &mut avg_negative {
3503            *val /= neg_count;
3504        }
3505        
3506        // Combined query: 2*avg_pos - avg_neg
3507        avg_positive.iter()
3508            .zip(avg_negative.iter())
3509            .map(|(&pos, &neg)| 2.0 * pos - neg)
3510            .collect::<Vec<f32>>()
3511    } else {
3512        avg_positive
3513    };
3514    
3515    // Perform single search with combined query vector
3516    let query = Vector::new(query_vector);
3517    
3518    // Request more results to account for excluded IDs
3519    let search_limit = limit + exclude_ids.len();
3520    let search_results = collection.search(&query, search_limit, None);
3521    
3522    // Build results, excluding input point IDs
3523    let mut results = Vec::with_capacity(limit);
3524    for (point, score) in search_results {
3525        // Skip excluded points (the positive/negative examples)
3526        let point_id_str = point.id.to_string();
3527        if exclude_ids.contains(&point_id_str) {
3528            continue;
3529        }
3530        
3531        // Apply score threshold if provided
3532        if let Some(threshold) = score_threshold {
3533            if score < threshold {
3534                continue;
3535            }
3536        }
3537        
3538        let mut result = serde_json::json!({
3539            "id": point_id_to_json(&point.id),
3540            "version": point.version,
3541            "score": score
3542        });
3543        
3544        if with_payload {
3545            result["payload"] = point.payload.clone().unwrap_or(serde_json::Value::Null);
3546        }
3547        if with_vector {
3548            result["vector"] = serde_json::json!(point.vector.as_slice());
3549        }
3550        
3551        results.push(result);
3552        
3553        if results.len() >= limit {
3554            break;
3555        }
3556    }
3557
3558    Ok(qdrant_response(results, start_time))
3559}