// vectx_api/grpc.rs

1use std::sync::Arc;
2use std::time::Instant;
3use tonic::{Request, Response, Status};
4use vectx_storage::StorageManager;
5use vectx_core::{Point, PointId, Vector, Distance as CoreDistance};
6
/// Types and service traits generated from the `vectx` protobuf package.
///
/// The contents are produced by `tonic::include_proto!`; nothing in this module
/// is written by hand in this file.
pub mod vectx {
    tonic::include_proto!("vectx");
}
10
11use vectx::*;
12
13// ============================================================================
14// Qdrant Service (Health Check)
15// ============================================================================
16
17pub struct QdrantService;
18
19#[tonic::async_trait]
20impl vectx::qdrant_server::Qdrant for QdrantService {
21    async fn health_check(
22        &self,
23        _request: Request<HealthCheckRequest>,
24    ) -> Result<Response<HealthCheckReply>, Status> {
25        Ok(Response::new(HealthCheckReply {
26            title: "vectx".to_string(),
27            version: env!("CARGO_PKG_VERSION").to_string(),
28            commit: None,
29        }))
30    }
31}
32
33// ============================================================================
34// Collections Service
35// ============================================================================
36
37pub struct CollectionsService {
38    storage: Arc<StorageManager>,
39}
40
41impl CollectionsService {
42    pub fn new(storage: Arc<StorageManager>) -> Self {
43        Self { storage }
44    }
45}
46
47#[tonic::async_trait]
48impl vectx::collections_server::Collections for CollectionsService {
49    async fn get(
50        &self,
51        request: Request<GetCollectionInfoRequest>,
52    ) -> Result<Response<GetCollectionInfoResponse>, Status> {
53        let start_time = Instant::now();
54        let req = request.into_inner();
55        
56        let collection = self.storage.get_collection(&req.collection_name)
57            .ok_or_else(|| Status::not_found("Collection not found"))?;
58        
59        let points_count = collection.count() as u64;
60        let vector_dim = collection.vector_dim() as u64;
61        let distance = match collection.distance() {
62            CoreDistance::Cosine => Distance::Cosine,
63            CoreDistance::Euclidean => Distance::Euclid,
64            CoreDistance::Dot => Distance::Dot,
65        };
66
67        let result = CollectionInfo {
68            status: CollectionStatus::Green as i32,
69            optimizer_status: Some(OptimizerStatus {
70                ok: true,
71                error: String::new(),
72            }),
73            vectors_count: points_count,
74            indexed_vectors_count: points_count,
75            points_count,
76            segments_count: 1,
77            config: Some(CollectionConfig {
78                vectors: Some(VectorsConfig {
79                    config: Some(vectors_config::Config::Params(VectorParams {
80                        size: vector_dim,
81                        distance: distance as i32,
82                        on_disk: Some(false),
83                    })),
84                }),
85                shard_number: 1,
86                replication_factor: 1,
87                hnsw_config: Some(HnswConfig {
88                    m: 16,
89                    ef_construct: 100,
90                    full_scan_threshold: 10000,
91                    max_indexing_threads: None,
92                    on_disk: Some(false),
93                }),
94                wal_config: Some(WalConfig {
95                    wal_capacity_mb: 32,
96                    wal_segments_ahead: 0,
97                }),
98                optimizer_config: Some(OptimizerConfig {
99                    deleted_threshold: 0.2,
100                    vacuum_min_vector_number: 1000,
101                    default_segment_number: 0,
102                    max_segment_size: None,
103                    memmap_threshold: None,
104                    indexing_threshold: 20000,
105                    flush_interval_sec: 5,
106                }),
107            }),
108            payload_schema: Default::default(),
109        };
110
111        Ok(Response::new(GetCollectionInfoResponse {
112            result: Some(result),
113            time: start_time.elapsed().as_secs_f64(),
114        }))
115    }
116
117    async fn list(
118        &self,
119        _request: Request<ListCollectionsRequest>,
120    ) -> Result<Response<ListCollectionsResponse>, Status> {
121        let start_time = Instant::now();
122        let collections = self.storage.list_collections();
123        
124        let descriptions: Vec<CollectionDescription> = collections
125            .into_iter()
126            .map(|name| CollectionDescription { name })
127            .collect();
128
129        Ok(Response::new(ListCollectionsResponse {
130            collections: descriptions,
131            time: start_time.elapsed().as_secs_f64(),
132        }))
133    }
134
135    async fn create(
136        &self,
137        request: Request<CreateCollection>,
138    ) -> Result<Response<CollectionOperationResponse>, Status> {
139        let start_time = Instant::now();
140        let req = request.into_inner();
141        
142        let (vector_dim, distance) = if let Some(vectors_config) = req.vectors_config {
143            match vectors_config.config {
144                Some(vectors_config::Config::Params(params)) => {
145                    let dist = match Distance::try_from(params.distance) {
146                        Ok(Distance::Cosine) => CoreDistance::Cosine,
147                        Ok(Distance::Euclid) => CoreDistance::Euclidean,
148                        Ok(Distance::Dot) => CoreDistance::Dot,
149                        _ => CoreDistance::Cosine,
150                    };
151                    (params.size as usize, dist)
152                }
153                Some(vectors_config::Config::ParamsMap(map)) => {
154                    // Take first vector config
155                    if let Some((_, params)) = map.map.into_iter().next() {
156                        let dist = match Distance::try_from(params.distance) {
157                            Ok(Distance::Cosine) => CoreDistance::Cosine,
158                            Ok(Distance::Euclid) => CoreDistance::Euclidean,
159                            Ok(Distance::Dot) => CoreDistance::Dot,
160                            _ => CoreDistance::Cosine,
161                        };
162                        (params.size as usize, dist)
163                    } else {
164                        return Err(Status::invalid_argument("Vector configuration required"));
165                    }
166                }
167                None => return Err(Status::invalid_argument("Vector configuration required")),
168            }
169        } else {
170            return Err(Status::invalid_argument("Vector configuration required"));
171        };
172
173        let config = vectx_core::CollectionConfig {
174            name: req.collection_name,
175            vector_dim,
176            distance,
177            use_hnsw: true,
178            enable_bm25: false,
179        };
180
181        self.storage.create_collection(config)
182            .map_err(|e| Status::internal(e.to_string()))?;
183
184        Ok(Response::new(CollectionOperationResponse {
185            result: true,
186            time: start_time.elapsed().as_secs_f64(),
187        }))
188    }
189
190    async fn update(
191        &self,
192        request: Request<UpdateCollection>,
193    ) -> Result<Response<CollectionOperationResponse>, Status> {
194        let start_time = Instant::now();
195        let req = request.into_inner();
196        
197        // Verify collection exists
198        if self.storage.get_collection(&req.collection_name).is_none() {
199            return Err(Status::not_found("Collection not found"));
200        }
201
202        // Update not fully implemented - just return success
203        Ok(Response::new(CollectionOperationResponse {
204            result: true,
205            time: start_time.elapsed().as_secs_f64(),
206        }))
207    }
208
209    async fn delete(
210        &self,
211        request: Request<DeleteCollection>,
212    ) -> Result<Response<CollectionOperationResponse>, Status> {
213        let start_time = Instant::now();
214        let req = request.into_inner();
215        
216        match self.storage.delete_collection(&req.collection_name) {
217            Ok(true) => Ok(Response::new(CollectionOperationResponse {
218                result: true,
219                time: start_time.elapsed().as_secs_f64(),
220            })),
221            Ok(false) => Err(Status::not_found("Collection not found")),
222            Err(e) => Err(Status::internal(e.to_string())),
223        }
224    }
225
226    async fn collection_exists(
227        &self,
228        request: Request<CollectionExistsRequest>,
229    ) -> Result<Response<CollectionExistsResponse>, Status> {
230        let start_time = Instant::now();
231        let req = request.into_inner();
232        
233        let exists = self.storage.collection_exists(&req.collection_name);
234
235        Ok(Response::new(CollectionExistsResponse {
236            result: Some(CollectionExistsResult { exists }),
237            time: start_time.elapsed().as_secs_f64(),
238        }))
239    }
240}
241
242// ============================================================================
243// Points Service
244// ============================================================================
245
246pub struct PointsService {
247    storage: Arc<StorageManager>,
248}
249
250impl PointsService {
251    pub fn new(storage: Arc<StorageManager>) -> Self {
252        Self { storage }
253    }
254
255    fn parse_point_id(id: &vectx::PointId) -> Option<String> {
256        match &id.point_id_options {
257            Some(point_id::PointIdOptions::Num(n)) => Some(n.to_string()),
258            Some(point_id::PointIdOptions::Uuid(s)) => Some(s.clone()),
259            None => None,
260        }
261    }
262
263    fn to_proto_point_id(id: &PointId) -> vectx::PointId {
264        match id {
265            PointId::String(s) => vectx::PointId {
266                point_id_options: Some(point_id::PointIdOptions::Uuid(s.clone())),
267            },
268            PointId::Integer(i) => vectx::PointId {
269                point_id_options: Some(point_id::PointIdOptions::Num(*i)),
270            },
271            PointId::Uuid(u) => vectx::PointId {
272                point_id_options: Some(point_id::PointIdOptions::Uuid(u.to_string())),
273            },
274        }
275    }
276
277    fn proto_value_to_json(value: &vectx::Value) -> serde_json::Value {
278        match &value.kind {
279            Some(value::Kind::DoubleValue(v)) => serde_json::json!(*v),
280            Some(value::Kind::IntegerValue(v)) => serde_json::json!(*v),
281            Some(value::Kind::StringValue(v)) => serde_json::json!(v),
282            Some(value::Kind::BoolValue(v)) => serde_json::json!(*v),
283            Some(value::Kind::ListValue(list)) => {
284                let values: Vec<serde_json::Value> = list.values.iter()
285                    .map(Self::proto_value_to_json)
286                    .collect();
287                serde_json::json!(values)
288            }
289            Some(value::Kind::StructValue(s)) => {
290                let map: serde_json::Map<String, serde_json::Value> = s.fields.iter()
291                    .map(|(k, v)| (k.clone(), Self::proto_value_to_json(v)))
292                    .collect();
293                serde_json::Value::Object(map)
294            }
295            Some(value::Kind::NullValue(_)) | None => serde_json::Value::Null,
296        }
297    }
298
299    fn json_to_proto_value(value: &serde_json::Value) -> vectx::Value {
300        let kind = match value {
301            serde_json::Value::Null => Some(value::Kind::NullValue(0)),
302            serde_json::Value::Bool(b) => Some(value::Kind::BoolValue(*b)),
303            serde_json::Value::Number(n) => {
304                if let Some(i) = n.as_i64() {
305                    Some(value::Kind::IntegerValue(i))
306                } else if let Some(f) = n.as_f64() {
307                    Some(value::Kind::DoubleValue(f))
308                } else {
309                    None
310                }
311            }
312            serde_json::Value::String(s) => Some(value::Kind::StringValue(s.clone())),
313            serde_json::Value::Array(arr) => {
314                let values = arr.iter().map(Self::json_to_proto_value).collect();
315                Some(value::Kind::ListValue(ListValue { values }))
316            }
317            serde_json::Value::Object(map) => {
318                let fields = map.iter()
319                    .map(|(k, v)| (k.clone(), Self::json_to_proto_value(v)))
320                    .collect();
321                Some(value::Kind::StructValue(Struct { fields }))
322            }
323        };
324        vectx::Value { kind }
325    }
326}
327
328#[tonic::async_trait]
329impl vectx::points_server::Points for PointsService {
330    async fn upsert(
331        &self,
332        request: Request<UpsertPoints>,
333    ) -> Result<Response<PointsOperationResponse>, Status> {
334        let start_time = Instant::now();
335        let req = request.into_inner();
336        
337        let collection = self.storage.get_collection(&req.collection_name)
338            .ok_or_else(|| Status::not_found("Collection not found"))?;
339
340        let points: Result<Vec<Point>, Status> = req.points.into_iter().map(|p| {
341            let id = p.id.as_ref()
342                .and_then(Self::parse_point_id)
343                .ok_or_else(|| Status::invalid_argument("Point ID required"))?;
344            
345            let point_id = if let Ok(num) = id.parse::<u64>() {
346                PointId::Integer(num)
347            } else {
348                PointId::String(id)
349            };
350            
351            let vector_data = p.vectors.as_ref()
352                .and_then(|vi| match &vi.variant {
353                    Some(vector_input::Variant::Dense(v)) => Some(v.data.clone()),
354                    Some(vector_input::Variant::Named(nv)) => {
355                        nv.vectors.values().next().map(|v| v.data.clone())
356                    }
357                    None => None,
358                })
359                .ok_or_else(|| Status::invalid_argument("Vector required"))?;
360            
361            let payload = if p.payload.is_empty() {
362                None
363            } else {
364                let json_map: serde_json::Map<String, serde_json::Value> = p.payload.iter()
365                    .map(|(k, v)| (k.clone(), Self::proto_value_to_json(v)))
366                    .collect();
367                Some(serde_json::Value::Object(json_map))
368            };
369            
370            let vector = Vector::new(vector_data);
371            Ok(Point::new(point_id, vector, payload))
372        }).collect();
373
374        let points = points?;
375        let count = points.len();
376        
377        if count > 1 {
378            collection.batch_upsert(points)
379                .map_err(|e| Status::internal(e.to_string()))?;
380        } else if let Some(point) = points.into_iter().next() {
381            collection.upsert(point)
382                .map_err(|e| Status::internal(e.to_string()))?;
383        }
384
385        Ok(Response::new(PointsOperationResponse {
386            result: Some(UpdateResult {
387                operation_id: 0,
388                status: UpdateStatus::Acknowledged as i32,
389            }),
390            time: start_time.elapsed().as_secs_f64(),
391        }))
392    }
393
394    async fn delete(
395        &self,
396        request: Request<DeletePoints>,
397    ) -> Result<Response<PointsOperationResponse>, Status> {
398        let start_time = Instant::now();
399        let req = request.into_inner();
400        
401        let collection = self.storage.get_collection(&req.collection_name)
402            .ok_or_else(|| Status::not_found("Collection not found"))?;
403
404        if let Some(points_selector) = req.points {
405            if let Some(points_selector::PointsSelectorOneOf::Points(list)) = points_selector.points_selector_one_of {
406                for point_id in list.ids {
407                    if let Some(id_str) = Self::parse_point_id(&point_id) {
408                        let _ = collection.delete(&id_str);
409                    }
410                }
411            }
412        }
413
414        Ok(Response::new(PointsOperationResponse {
415            result: Some(UpdateResult {
416                operation_id: 0,
417                status: UpdateStatus::Acknowledged as i32,
418            }),
419            time: start_time.elapsed().as_secs_f64(),
420        }))
421    }
422
423    async fn get(
424        &self,
425        request: Request<GetPoints>,
426    ) -> Result<Response<GetResponse>, Status> {
427        let start_time = Instant::now();
428        let req = request.into_inner();
429        
430        let collection = self.storage.get_collection(&req.collection_name)
431            .ok_or_else(|| Status::not_found("Collection not found"))?;
432
433        let mut results = Vec::new();
434        for point_id in req.ids {
435            if let Some(id_str) = Self::parse_point_id(&point_id) {
436                if let Some(point) = collection.get(&id_str) {
437                    let payload: std::collections::HashMap<String, vectx::Value> = point.payload
438                        .as_ref()
439                        .and_then(|p| p.as_object())
440                        .map(|obj| {
441                            obj.iter()
442                                .map(|(k, v)| (k.clone(), Self::json_to_proto_value(v)))
443                                .collect()
444                        })
445                        .unwrap_or_default();
446
447                    results.push(RetrievedPoint {
448                        id: Some(Self::to_proto_point_id(&point.id)),
449                        payload,
450                        vectors: Some(VectorInput {
451                            variant: Some(vector_input::Variant::Dense(vectx::Vector {
452                                data: point.vector.as_slice().to_vec(),
453                            })),
454                        }),
455                    });
456                }
457            }
458        }
459
460        Ok(Response::new(GetResponse {
461            result: results,
462            time: start_time.elapsed().as_secs_f64(),
463        }))
464    }
465
466    async fn set_payload(
467        &self,
468        request: Request<SetPayloadPoints>,
469    ) -> Result<Response<PointsOperationResponse>, Status> {
470        let start_time = Instant::now();
471        let req = request.into_inner();
472        
473        if self.storage.get_collection(&req.collection_name).is_none() {
474            return Err(Status::not_found("Collection not found"));
475        }
476
477        // Payload update stub - not fully implemented
478        Ok(Response::new(PointsOperationResponse {
479            result: Some(UpdateResult {
480                operation_id: 0,
481                status: UpdateStatus::Acknowledged as i32,
482            }),
483            time: start_time.elapsed().as_secs_f64(),
484        }))
485    }
486
487    async fn delete_payload(
488        &self,
489        request: Request<DeletePayloadPoints>,
490    ) -> Result<Response<PointsOperationResponse>, Status> {
491        let start_time = Instant::now();
492        let req = request.into_inner();
493        
494        if self.storage.get_collection(&req.collection_name).is_none() {
495            return Err(Status::not_found("Collection not found"));
496        }
497
498        Ok(Response::new(PointsOperationResponse {
499            result: Some(UpdateResult {
500                operation_id: 0,
501                status: UpdateStatus::Acknowledged as i32,
502            }),
503            time: start_time.elapsed().as_secs_f64(),
504        }))
505    }
506
507    async fn clear_payload(
508        &self,
509        request: Request<ClearPayloadPoints>,
510    ) -> Result<Response<PointsOperationResponse>, Status> {
511        let start_time = Instant::now();
512        let req = request.into_inner();
513        
514        if self.storage.get_collection(&req.collection_name).is_none() {
515            return Err(Status::not_found("Collection not found"));
516        }
517
518        Ok(Response::new(PointsOperationResponse {
519            result: Some(UpdateResult {
520                operation_id: 0,
521                status: UpdateStatus::Acknowledged as i32,
522            }),
523            time: start_time.elapsed().as_secs_f64(),
524        }))
525    }
526
527    async fn create_field_index(
528        &self,
529        request: Request<CreateFieldIndexCollection>,
530    ) -> Result<Response<PointsOperationResponse>, Status> {
531        let start_time = Instant::now();
532        let req = request.into_inner();
533        
534        if self.storage.get_collection(&req.collection_name).is_none() {
535            return Err(Status::not_found("Collection not found"));
536        }
537
538        Ok(Response::new(PointsOperationResponse {
539            result: Some(UpdateResult {
540                operation_id: 0,
541                status: UpdateStatus::Acknowledged as i32,
542            }),
543            time: start_time.elapsed().as_secs_f64(),
544        }))
545    }
546
547    async fn delete_field_index(
548        &self,
549        request: Request<DeleteFieldIndexCollection>,
550    ) -> Result<Response<PointsOperationResponse>, Status> {
551        let start_time = Instant::now();
552        let req = request.into_inner();
553        
554        if self.storage.get_collection(&req.collection_name).is_none() {
555            return Err(Status::not_found("Collection not found"));
556        }
557
558        Ok(Response::new(PointsOperationResponse {
559            result: Some(UpdateResult {
560                operation_id: 0,
561                status: UpdateStatus::Acknowledged as i32,
562            }),
563            time: start_time.elapsed().as_secs_f64(),
564        }))
565    }
566
567    async fn search(
568        &self,
569        request: Request<SearchPoints>,
570    ) -> Result<Response<SearchResponse>, Status> {
571        let start_time = Instant::now();
572        let req = request.into_inner();
573        
574        let collection = self.storage.get_collection(&req.collection_name)
575            .ok_or_else(|| Status::not_found("Collection not found"))?;
576
577        let query = Vector::new(req.vector);
578        let limit = req.limit as usize;
579        
580        let results = collection.search(&query, limit, None);
581        
582        let scored_points: Vec<ScoredPoint> = results.into_iter().map(|(point, score)| {
583            let payload: std::collections::HashMap<String, vectx::Value> = point.payload
584                .as_ref()
585                .and_then(|p| p.as_object())
586                .map(|obj| {
587                    obj.iter()
588                        .map(|(k, v)| (k.clone(), Self::json_to_proto_value(v)))
589                        .collect()
590                })
591                .unwrap_or_default();
592
593            ScoredPoint {
594                id: Some(Self::to_proto_point_id(&point.id)),
595                payload,
596                score,
597                vectors: None,
598                version: Some(0),
599            }
600        }).collect();
601
602        Ok(Response::new(SearchResponse {
603            result: scored_points,
604            time: start_time.elapsed().as_secs_f64(),
605        }))
606    }
607
608    async fn scroll(
609        &self,
610        request: Request<ScrollPoints>,
611    ) -> Result<Response<ScrollResponse>, Status> {
612        let start_time = Instant::now();
613        let req = request.into_inner();
614        
615        let collection = self.storage.get_collection(&req.collection_name)
616            .ok_or_else(|| Status::not_found("Collection not found"))?;
617
618        let limit = req.limit.unwrap_or(10) as usize;
619        let all_points = collection.get_all_points();
620        
621        // Get offset
622        let offset_id: Option<String> = req.offset.as_ref()
623            .and_then(Self::parse_point_id);
624        
625        let mut points_iter = all_points.iter();
626        
627        // Skip to offset
628        if let Some(ref offset) = offset_id {
629            while let Some(p) = points_iter.next() {
630                if p.id.to_string() == *offset {
631                    break;
632                }
633            }
634        }
635        
636        let mut results = Vec::new();
637        let mut last_id = None;
638        
639        for point in points_iter.take(limit) {
640            last_id = Some(Self::to_proto_point_id(&point.id));
641            
642            let payload: std::collections::HashMap<String, vectx::Value> = point.payload
643                .as_ref()
644                .and_then(|p| p.as_object())
645                .map(|obj| {
646                    obj.iter()
647                        .map(|(k, v)| (k.clone(), Self::json_to_proto_value(v)))
648                        .collect()
649                })
650                .unwrap_or_default();
651
652            results.push(RetrievedPoint {
653                id: Some(Self::to_proto_point_id(&point.id)),
654                payload,
655                vectors: Some(VectorInput {
656                    variant: Some(vector_input::Variant::Dense(vectx::Vector {
657                        data: point.vector.as_slice().to_vec(),
658                    })),
659                }),
660            });
661        }
662
663        // Determine next page offset
664        let next_offset = if results.len() == limit {
665            last_id
666        } else {
667            None
668        };
669
670        Ok(Response::new(ScrollResponse {
671            next_page_offset: next_offset,
672            result: results,
673            time: start_time.elapsed().as_secs_f64(),
674        }))
675    }
676
677    async fn recommend(
678        &self,
679        request: Request<RecommendPoints>,
680    ) -> Result<Response<RecommendResponse>, Status> {
681        let start_time = Instant::now();
682        let req = request.into_inner();
683        
684        let collection = self.storage.get_collection(&req.collection_name)
685            .ok_or_else(|| Status::not_found("Collection not found"))?;
686
687        let limit = req.limit as usize;
688        
689        // Collect positive vectors
690        let mut positive_vectors: Vec<Vec<f32>> = Vec::new();
691        let mut exclude_ids: std::collections::HashSet<String> = std::collections::HashSet::new();
692        
693        for pos_id in &req.positive {
694            if let Some(id_str) = Self::parse_point_id(pos_id) {
695                exclude_ids.insert(id_str.clone());
696                if let Some(point) = collection.get(&id_str) {
697                    positive_vectors.push(point.vector.as_slice().to_vec());
698                }
699            }
700        }
701        
702        if positive_vectors.is_empty() {
703            return Err(Status::invalid_argument("At least one valid positive example required"));
704        }
705        
706        // Collect negative vectors
707        let mut negative_vectors: Vec<Vec<f32>> = Vec::new();
708        for neg_id in &req.negative {
709            if let Some(id_str) = Self::parse_point_id(neg_id) {
710                exclude_ids.insert(id_str.clone());
711                if let Some(point) = collection.get(&id_str) {
712                    negative_vectors.push(point.vector.as_slice().to_vec());
713                }
714            }
715        }
716        
717        // Compute average positive
718        let dim = positive_vectors[0].len();
719        let mut avg_positive = vec![0.0f32; dim];
720        for vec in &positive_vectors {
721            for (i, &val) in vec.iter().enumerate() {
722                if i < dim { avg_positive[i] += val; }
723            }
724        }
725        let pos_count = positive_vectors.len() as f32;
726        for val in &mut avg_positive { *val /= pos_count; }
727        
728        // Create query vector
729        let query_data = if !negative_vectors.is_empty() {
730            let mut avg_negative = vec![0.0f32; dim];
731            for vec in &negative_vectors {
732                for (i, &val) in vec.iter().enumerate() {
733                    if i < dim { avg_negative[i] += val; }
734                }
735            }
736            let neg_count = negative_vectors.len() as f32;
737            for val in &mut avg_negative { *val /= neg_count; }
738            
739            avg_positive.iter()
740                .zip(avg_negative.iter())
741                .map(|(p, n)| 2.0 * p - n)
742                .collect()
743        } else {
744            avg_positive
745        };
746        
747        let query = Vector::new(query_data);
748        let search_results = collection.search(&query, limit + exclude_ids.len(), None);
749        
750        let scored_points: Vec<ScoredPoint> = search_results
751            .into_iter()
752            .filter(|(point, _)| !exclude_ids.contains(&point.id.to_string()))
753            .take(limit)
754            .map(|(point, score)| {
755                let payload: std::collections::HashMap<String, vectx::Value> = point.payload
756                    .as_ref()
757                    .and_then(|p| p.as_object())
758                    .map(|obj| {
759                        obj.iter()
760                            .map(|(k, v)| (k.clone(), Self::json_to_proto_value(v)))
761                            .collect()
762                    })
763                    .unwrap_or_default();
764
765                ScoredPoint {
766                    id: Some(Self::to_proto_point_id(&point.id)),
767                    payload,
768                    score,
769                    vectors: None,
770                    version: Some(0),
771                }
772            })
773            .collect();
774
775        Ok(Response::new(RecommendResponse {
776            result: scored_points,
777            time: start_time.elapsed().as_secs_f64(),
778        }))
779    }
780
781    async fn count(
782        &self,
783        request: Request<CountPoints>,
784    ) -> Result<Response<CountResponse>, Status> {
785        let start_time = Instant::now();
786        let req = request.into_inner();
787        
788        let collection = self.storage.get_collection(&req.collection_name)
789            .ok_or_else(|| Status::not_found("Collection not found"))?;
790
791        Ok(Response::new(CountResponse {
792            result: Some(CountResult {
793                count: collection.count() as u64,
794            }),
795            time: start_time.elapsed().as_secs_f64(),
796        }))
797    }
798
799    async fn query(
800        &self,
801        request: Request<QueryPoints>,
802    ) -> Result<Response<QueryResponse>, Status> {
803        let start_time = Instant::now();
804        let req = request.into_inner();
805        
806        let collection = self.storage.get_collection(&req.collection_name)
807            .ok_or_else(|| Status::not_found("Collection not found"))?;
808
809        let limit = req.limit as usize;
810        
811        let query_data = req.query
812            .and_then(|vi| match vi.variant {
813                Some(vector_input::Variant::Dense(v)) => Some(v.data),
814                Some(vector_input::Variant::Named(nv)) => {
815                    nv.vectors.values().next().map(|v| v.data.clone())
816                }
817                None => None,
818            })
819            .ok_or_else(|| Status::invalid_argument("Query vector required"))?;
820        
821        let query = Vector::new(query_data);
822        let results = collection.search(&query, limit, None);
823        
824        let scored_points: Vec<ScoredPoint> = results.into_iter().map(|(point, score)| {
825            let payload: std::collections::HashMap<String, vectx::Value> = point.payload
826                .as_ref()
827                .and_then(|p| p.as_object())
828                .map(|obj| {
829                    obj.iter()
830                        .map(|(k, v)| (k.clone(), Self::json_to_proto_value(v)))
831                        .collect()
832                })
833                .unwrap_or_default();
834
835            ScoredPoint {
836                id: Some(Self::to_proto_point_id(&point.id)),
837                payload,
838                score,
839                vectors: None,
840                version: Some(0),
841            }
842        }).collect();
843
844        Ok(Response::new(QueryResponse {
845            result: scored_points,
846            time: start_time.elapsed().as_secs_f64(),
847        }))
848    }
849}
850
851// ============================================================================
852// Snapshots Service
853// ============================================================================
854
855pub struct SnapshotsService {
856    storage: Arc<StorageManager>,
857}
858
859impl SnapshotsService {
860    pub fn new(storage: Arc<StorageManager>) -> Self {
861        Self { storage }
862    }
863}
864
865#[tonic::async_trait]
866impl vectx::snapshots_server::Snapshots for SnapshotsService {
867    async fn create(
868        &self,
869        request: Request<CreateSnapshotRequest>,
870    ) -> Result<Response<CreateSnapshotResponse>, Status> {
871        let start_time = Instant::now();
872        let req = request.into_inner();
873        
874        match self.storage.create_collection_snapshot(&req.collection_name) {
875            Ok(snapshot) => Ok(Response::new(CreateSnapshotResponse {
876                result: Some(SnapshotDescription {
877                    name: snapshot.name,
878                    creation_time: snapshot.creation_time.unwrap_or_default(),
879                    size: snapshot.size as i64,
880                    checksum: snapshot.checksum,
881                }),
882                time: start_time.elapsed().as_secs_f64(),
883            })),
884            Err(e) => Err(Status::internal(e.to_string())),
885        }
886    }
887
888    async fn list(
889        &self,
890        request: Request<ListSnapshotsRequest>,
891    ) -> Result<Response<ListSnapshotsResponse>, Status> {
892        let start_time = Instant::now();
893        let req = request.into_inner();
894        
895        match self.storage.list_collection_snapshots(&req.collection_name) {
896            Ok(snapshots) => {
897                let descriptions: Vec<SnapshotDescription> = snapshots
898                    .into_iter()
899                    .map(|s| SnapshotDescription {
900                        name: s.name,
901                        creation_time: s.creation_time.unwrap_or_default(),
902                        size: s.size as i64,
903                        checksum: s.checksum,
904                    })
905                    .collect();
906                
907                Ok(Response::new(ListSnapshotsResponse {
908                    snapshots: descriptions,
909                    time: start_time.elapsed().as_secs_f64(),
910                }))
911            }
912            Err(e) => Err(Status::internal(e.to_string())),
913        }
914    }
915
916    async fn delete(
917        &self,
918        request: Request<DeleteSnapshotRequest>,
919    ) -> Result<Response<DeleteSnapshotResponse>, Status> {
920        let start_time = Instant::now();
921        let req = request.into_inner();
922        
923        match self.storage.delete_collection_snapshot(&req.collection_name, &req.snapshot_name) {
924            Ok(true) => Ok(Response::new(DeleteSnapshotResponse {
925                result: true,
926                time: start_time.elapsed().as_secs_f64(),
927            })),
928            Ok(false) => Err(Status::not_found("Snapshot not found")),
929            Err(e) => Err(Status::internal(e.to_string())),
930        }
931    }
932
933    async fn recover(
934        &self,
935        request: Request<RecoverSnapshotRequest>,
936    ) -> Result<Response<RecoverSnapshotResponse>, Status> {
937        let start_time = Instant::now();
938        let req = request.into_inner();
939        
940        match self.storage.recover_from_snapshot(&req.collection_name, &req.location) {
941            Ok(_) => Ok(Response::new(RecoverSnapshotResponse {
942                result: true,
943                time: start_time.elapsed().as_secs_f64(),
944            })),
945            Err(e) => Err(Status::internal(e.to_string())),
946        }
947    }
948}
949
950// ============================================================================
951// gRPC Server Startup
952// ============================================================================
953
954pub struct GrpcApi;
955
956impl GrpcApi {
957    pub async fn start(
958        storage: Arc<StorageManager>,
959        port: u16,
960    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
961        let addr = format!("0.0.0.0:{}", port).parse()?;
962        
963        let qdrant_service = vectx::qdrant_server::QdrantServer::new(QdrantService);
964        let collections_service = vectx::collections_server::CollectionsServer::new(
965            CollectionsService::new(storage.clone())
966        );
967        let points_service = vectx::points_server::PointsServer::new(
968            PointsService::new(storage.clone())
969        );
970        let snapshots_service = vectx::snapshots_server::SnapshotsServer::new(
971            SnapshotsService::new(storage)
972        );
973        
974        println!("gRPC server listening on {}", addr);
975        
976        tonic::transport::Server::builder()
977            .add_service(qdrant_service)
978            .add_service(collections_service)
979            .add_service(points_service)
980            .add_service(snapshots_service)
981            .serve(addr)
982            .await?;
983        
984        Ok(())
985    }
986}