1use std::sync::Arc;
2use std::time::Instant;
3use tonic::{Request, Response, Status};
4use vectx_storage::StorageManager;
5use vectx_core::{Point, PointId, Vector, Distance as CoreDistance};
6
/// Generated gRPC message and service types for the `vectx` protobuf package.
pub mod vectx {
    // Includes the code that tonic generated for the "vectx" package.
    tonic::include_proto!("vectx");
}
10
11use vectx::*;
12
/// Stateless handler for the top-level `Qdrant` gRPC service.
pub struct QdrantService;
18
19#[tonic::async_trait]
20impl vectx::qdrant_server::Qdrant for QdrantService {
21 async fn health_check(
22 &self,
23 _request: Request<HealthCheckRequest>,
24 ) -> Result<Response<HealthCheckReply>, Status> {
25 Ok(Response::new(HealthCheckReply {
26 title: "vectx".to_string(),
27 version: env!("CARGO_PKG_VERSION").to_string(),
28 commit: None,
29 }))
30 }
31}
32
/// gRPC handler for collection-level operations, backed by a shared storage manager.
pub struct CollectionsService {
    /// Shared handle to the storage manager that owns the collections.
    storage: Arc<StorageManager>,
}
40
41impl CollectionsService {
42 pub fn new(storage: Arc<StorageManager>) -> Self {
43 Self { storage }
44 }
45}
46
47#[tonic::async_trait]
48impl vectx::collections_server::Collections for CollectionsService {
49 async fn get(
50 &self,
51 request: Request<GetCollectionInfoRequest>,
52 ) -> Result<Response<GetCollectionInfoResponse>, Status> {
53 let start_time = Instant::now();
54 let req = request.into_inner();
55
56 let collection = self.storage.get_collection(&req.collection_name)
57 .ok_or_else(|| Status::not_found("Collection not found"))?;
58
59 let points_count = collection.count() as u64;
60 let vector_dim = collection.vector_dim() as u64;
61 let distance = match collection.distance() {
62 CoreDistance::Cosine => Distance::Cosine,
63 CoreDistance::Euclidean => Distance::Euclid,
64 CoreDistance::Dot => Distance::Dot,
65 };
66
67 let result = CollectionInfo {
68 status: CollectionStatus::Green as i32,
69 optimizer_status: Some(OptimizerStatus {
70 ok: true,
71 error: String::new(),
72 }),
73 vectors_count: points_count,
74 indexed_vectors_count: points_count,
75 points_count,
76 segments_count: 1,
77 config: Some(CollectionConfig {
78 vectors: Some(VectorsConfig {
79 config: Some(vectors_config::Config::Params(VectorParams {
80 size: vector_dim,
81 distance: distance as i32,
82 on_disk: Some(false),
83 })),
84 }),
85 shard_number: 1,
86 replication_factor: 1,
87 hnsw_config: Some(HnswConfig {
88 m: 16,
89 ef_construct: 100,
90 full_scan_threshold: 10000,
91 max_indexing_threads: None,
92 on_disk: Some(false),
93 }),
94 wal_config: Some(WalConfig {
95 wal_capacity_mb: 32,
96 wal_segments_ahead: 0,
97 }),
98 optimizer_config: Some(OptimizerConfig {
99 deleted_threshold: 0.2,
100 vacuum_min_vector_number: 1000,
101 default_segment_number: 0,
102 max_segment_size: None,
103 memmap_threshold: None,
104 indexing_threshold: 20000,
105 flush_interval_sec: 5,
106 }),
107 }),
108 payload_schema: Default::default(),
109 };
110
111 Ok(Response::new(GetCollectionInfoResponse {
112 result: Some(result),
113 time: start_time.elapsed().as_secs_f64(),
114 }))
115 }
116
117 async fn list(
118 &self,
119 _request: Request<ListCollectionsRequest>,
120 ) -> Result<Response<ListCollectionsResponse>, Status> {
121 let start_time = Instant::now();
122 let collections = self.storage.list_collections();
123
124 let descriptions: Vec<CollectionDescription> = collections
125 .into_iter()
126 .map(|name| CollectionDescription { name })
127 .collect();
128
129 Ok(Response::new(ListCollectionsResponse {
130 collections: descriptions,
131 time: start_time.elapsed().as_secs_f64(),
132 }))
133 }
134
135 async fn create(
136 &self,
137 request: Request<CreateCollection>,
138 ) -> Result<Response<CollectionOperationResponse>, Status> {
139 let start_time = Instant::now();
140 let req = request.into_inner();
141
142 let (vector_dim, distance) = if let Some(vectors_config) = req.vectors_config {
143 match vectors_config.config {
144 Some(vectors_config::Config::Params(params)) => {
145 let dist = match Distance::try_from(params.distance) {
146 Ok(Distance::Cosine) => CoreDistance::Cosine,
147 Ok(Distance::Euclid) => CoreDistance::Euclidean,
148 Ok(Distance::Dot) => CoreDistance::Dot,
149 _ => CoreDistance::Cosine,
150 };
151 (params.size as usize, dist)
152 }
153 Some(vectors_config::Config::ParamsMap(map)) => {
154 if let Some((_, params)) = map.map.into_iter().next() {
156 let dist = match Distance::try_from(params.distance) {
157 Ok(Distance::Cosine) => CoreDistance::Cosine,
158 Ok(Distance::Euclid) => CoreDistance::Euclidean,
159 Ok(Distance::Dot) => CoreDistance::Dot,
160 _ => CoreDistance::Cosine,
161 };
162 (params.size as usize, dist)
163 } else {
164 return Err(Status::invalid_argument("Vector configuration required"));
165 }
166 }
167 None => return Err(Status::invalid_argument("Vector configuration required")),
168 }
169 } else {
170 return Err(Status::invalid_argument("Vector configuration required"));
171 };
172
173 let config = vectx_core::CollectionConfig {
174 name: req.collection_name,
175 vector_dim,
176 distance,
177 use_hnsw: true,
178 enable_bm25: false,
179 };
180
181 self.storage.create_collection(config)
182 .map_err(|e| Status::internal(e.to_string()))?;
183
184 Ok(Response::new(CollectionOperationResponse {
185 result: true,
186 time: start_time.elapsed().as_secs_f64(),
187 }))
188 }
189
190 async fn update(
191 &self,
192 request: Request<UpdateCollection>,
193 ) -> Result<Response<CollectionOperationResponse>, Status> {
194 let start_time = Instant::now();
195 let req = request.into_inner();
196
197 if self.storage.get_collection(&req.collection_name).is_none() {
199 return Err(Status::not_found("Collection not found"));
200 }
201
202 Ok(Response::new(CollectionOperationResponse {
204 result: true,
205 time: start_time.elapsed().as_secs_f64(),
206 }))
207 }
208
209 async fn delete(
210 &self,
211 request: Request<DeleteCollection>,
212 ) -> Result<Response<CollectionOperationResponse>, Status> {
213 let start_time = Instant::now();
214 let req = request.into_inner();
215
216 match self.storage.delete_collection(&req.collection_name) {
217 Ok(true) => Ok(Response::new(CollectionOperationResponse {
218 result: true,
219 time: start_time.elapsed().as_secs_f64(),
220 })),
221 Ok(false) => Err(Status::not_found("Collection not found")),
222 Err(e) => Err(Status::internal(e.to_string())),
223 }
224 }
225
226 async fn collection_exists(
227 &self,
228 request: Request<CollectionExistsRequest>,
229 ) -> Result<Response<CollectionExistsResponse>, Status> {
230 let start_time = Instant::now();
231 let req = request.into_inner();
232
233 let exists = self.storage.collection_exists(&req.collection_name);
234
235 Ok(Response::new(CollectionExistsResponse {
236 result: Some(CollectionExistsResult { exists }),
237 time: start_time.elapsed().as_secs_f64(),
238 }))
239 }
240}
241
/// gRPC handler for point-level operations, backed by a shared storage manager.
pub struct PointsService {
    /// Shared handle to the storage manager used to look up collections.
    storage: Arc<StorageManager>,
}
249
250impl PointsService {
251 pub fn new(storage: Arc<StorageManager>) -> Self {
252 Self { storage }
253 }
254
255 fn parse_point_id(id: &vectx::PointId) -> Option<String> {
256 match &id.point_id_options {
257 Some(point_id::PointIdOptions::Num(n)) => Some(n.to_string()),
258 Some(point_id::PointIdOptions::Uuid(s)) => Some(s.clone()),
259 None => None,
260 }
261 }
262
263 fn to_proto_point_id(id: &PointId) -> vectx::PointId {
264 match id {
265 PointId::String(s) => vectx::PointId {
266 point_id_options: Some(point_id::PointIdOptions::Uuid(s.clone())),
267 },
268 PointId::Integer(i) => vectx::PointId {
269 point_id_options: Some(point_id::PointIdOptions::Num(*i)),
270 },
271 PointId::Uuid(u) => vectx::PointId {
272 point_id_options: Some(point_id::PointIdOptions::Uuid(u.to_string())),
273 },
274 }
275 }
276
277 fn proto_value_to_json(value: &vectx::Value) -> serde_json::Value {
278 match &value.kind {
279 Some(value::Kind::DoubleValue(v)) => serde_json::json!(*v),
280 Some(value::Kind::IntegerValue(v)) => serde_json::json!(*v),
281 Some(value::Kind::StringValue(v)) => serde_json::json!(v),
282 Some(value::Kind::BoolValue(v)) => serde_json::json!(*v),
283 Some(value::Kind::ListValue(list)) => {
284 let values: Vec<serde_json::Value> = list.values.iter()
285 .map(Self::proto_value_to_json)
286 .collect();
287 serde_json::json!(values)
288 }
289 Some(value::Kind::StructValue(s)) => {
290 let map: serde_json::Map<String, serde_json::Value> = s.fields.iter()
291 .map(|(k, v)| (k.clone(), Self::proto_value_to_json(v)))
292 .collect();
293 serde_json::Value::Object(map)
294 }
295 Some(value::Kind::NullValue(_)) | None => serde_json::Value::Null,
296 }
297 }
298
299 fn json_to_proto_value(value: &serde_json::Value) -> vectx::Value {
300 let kind = match value {
301 serde_json::Value::Null => Some(value::Kind::NullValue(0)),
302 serde_json::Value::Bool(b) => Some(value::Kind::BoolValue(*b)),
303 serde_json::Value::Number(n) => {
304 if let Some(i) = n.as_i64() {
305 Some(value::Kind::IntegerValue(i))
306 } else if let Some(f) = n.as_f64() {
307 Some(value::Kind::DoubleValue(f))
308 } else {
309 None
310 }
311 }
312 serde_json::Value::String(s) => Some(value::Kind::StringValue(s.clone())),
313 serde_json::Value::Array(arr) => {
314 let values = arr.iter().map(Self::json_to_proto_value).collect();
315 Some(value::Kind::ListValue(ListValue { values }))
316 }
317 serde_json::Value::Object(map) => {
318 let fields = map.iter()
319 .map(|(k, v)| (k.clone(), Self::json_to_proto_value(v)))
320 .collect();
321 Some(value::Kind::StructValue(Struct { fields }))
322 }
323 };
324 vectx::Value { kind }
325 }
326}
327
328#[tonic::async_trait]
329impl vectx::points_server::Points for PointsService {
330 async fn upsert(
331 &self,
332 request: Request<UpsertPoints>,
333 ) -> Result<Response<PointsOperationResponse>, Status> {
334 let start_time = Instant::now();
335 let req = request.into_inner();
336
337 let collection = self.storage.get_collection(&req.collection_name)
338 .ok_or_else(|| Status::not_found("Collection not found"))?;
339
340 let points: Result<Vec<Point>, Status> = req.points.into_iter().map(|p| {
341 let id = p.id.as_ref()
342 .and_then(Self::parse_point_id)
343 .ok_or_else(|| Status::invalid_argument("Point ID required"))?;
344
345 let point_id = if let Ok(num) = id.parse::<u64>() {
346 PointId::Integer(num)
347 } else {
348 PointId::String(id)
349 };
350
351 let vector_data = p.vectors.as_ref()
352 .and_then(|vi| match &vi.variant {
353 Some(vector_input::Variant::Dense(v)) => Some(v.data.clone()),
354 Some(vector_input::Variant::Named(nv)) => {
355 nv.vectors.values().next().map(|v| v.data.clone())
356 }
357 None => None,
358 })
359 .ok_or_else(|| Status::invalid_argument("Vector required"))?;
360
361 let payload = if p.payload.is_empty() {
362 None
363 } else {
364 let json_map: serde_json::Map<String, serde_json::Value> = p.payload.iter()
365 .map(|(k, v)| (k.clone(), Self::proto_value_to_json(v)))
366 .collect();
367 Some(serde_json::Value::Object(json_map))
368 };
369
370 let vector = Vector::new(vector_data);
371 Ok(Point::new(point_id, vector, payload))
372 }).collect();
373
374 let points = points?;
375 let count = points.len();
376
377 if count > 1 {
378 collection.batch_upsert(points)
379 .map_err(|e| Status::internal(e.to_string()))?;
380 } else if let Some(point) = points.into_iter().next() {
381 collection.upsert(point)
382 .map_err(|e| Status::internal(e.to_string()))?;
383 }
384
385 Ok(Response::new(PointsOperationResponse {
386 result: Some(UpdateResult {
387 operation_id: 0,
388 status: UpdateStatus::Acknowledged as i32,
389 }),
390 time: start_time.elapsed().as_secs_f64(),
391 }))
392 }
393
394 async fn delete(
395 &self,
396 request: Request<DeletePoints>,
397 ) -> Result<Response<PointsOperationResponse>, Status> {
398 let start_time = Instant::now();
399 let req = request.into_inner();
400
401 let collection = self.storage.get_collection(&req.collection_name)
402 .ok_or_else(|| Status::not_found("Collection not found"))?;
403
404 if let Some(points_selector) = req.points {
405 if let Some(points_selector::PointsSelectorOneOf::Points(list)) = points_selector.points_selector_one_of {
406 for point_id in list.ids {
407 if let Some(id_str) = Self::parse_point_id(&point_id) {
408 let _ = collection.delete(&id_str);
409 }
410 }
411 }
412 }
413
414 Ok(Response::new(PointsOperationResponse {
415 result: Some(UpdateResult {
416 operation_id: 0,
417 status: UpdateStatus::Acknowledged as i32,
418 }),
419 time: start_time.elapsed().as_secs_f64(),
420 }))
421 }
422
423 async fn get(
424 &self,
425 request: Request<GetPoints>,
426 ) -> Result<Response<GetResponse>, Status> {
427 let start_time = Instant::now();
428 let req = request.into_inner();
429
430 let collection = self.storage.get_collection(&req.collection_name)
431 .ok_or_else(|| Status::not_found("Collection not found"))?;
432
433 let mut results = Vec::new();
434 for point_id in req.ids {
435 if let Some(id_str) = Self::parse_point_id(&point_id) {
436 if let Some(point) = collection.get(&id_str) {
437 let payload: std::collections::HashMap<String, vectx::Value> = point.payload
438 .as_ref()
439 .and_then(|p| p.as_object())
440 .map(|obj| {
441 obj.iter()
442 .map(|(k, v)| (k.clone(), Self::json_to_proto_value(v)))
443 .collect()
444 })
445 .unwrap_or_default();
446
447 results.push(RetrievedPoint {
448 id: Some(Self::to_proto_point_id(&point.id)),
449 payload,
450 vectors: Some(VectorInput {
451 variant: Some(vector_input::Variant::Dense(vectx::Vector {
452 data: point.vector.as_slice().to_vec(),
453 })),
454 }),
455 });
456 }
457 }
458 }
459
460 Ok(Response::new(GetResponse {
461 result: results,
462 time: start_time.elapsed().as_secs_f64(),
463 }))
464 }
465
466 async fn set_payload(
467 &self,
468 request: Request<SetPayloadPoints>,
469 ) -> Result<Response<PointsOperationResponse>, Status> {
470 let start_time = Instant::now();
471 let req = request.into_inner();
472
473 if self.storage.get_collection(&req.collection_name).is_none() {
474 return Err(Status::not_found("Collection not found"));
475 }
476
477 Ok(Response::new(PointsOperationResponse {
479 result: Some(UpdateResult {
480 operation_id: 0,
481 status: UpdateStatus::Acknowledged as i32,
482 }),
483 time: start_time.elapsed().as_secs_f64(),
484 }))
485 }
486
487 async fn delete_payload(
488 &self,
489 request: Request<DeletePayloadPoints>,
490 ) -> Result<Response<PointsOperationResponse>, Status> {
491 let start_time = Instant::now();
492 let req = request.into_inner();
493
494 if self.storage.get_collection(&req.collection_name).is_none() {
495 return Err(Status::not_found("Collection not found"));
496 }
497
498 Ok(Response::new(PointsOperationResponse {
499 result: Some(UpdateResult {
500 operation_id: 0,
501 status: UpdateStatus::Acknowledged as i32,
502 }),
503 time: start_time.elapsed().as_secs_f64(),
504 }))
505 }
506
507 async fn clear_payload(
508 &self,
509 request: Request<ClearPayloadPoints>,
510 ) -> Result<Response<PointsOperationResponse>, Status> {
511 let start_time = Instant::now();
512 let req = request.into_inner();
513
514 if self.storage.get_collection(&req.collection_name).is_none() {
515 return Err(Status::not_found("Collection not found"));
516 }
517
518 Ok(Response::new(PointsOperationResponse {
519 result: Some(UpdateResult {
520 operation_id: 0,
521 status: UpdateStatus::Acknowledged as i32,
522 }),
523 time: start_time.elapsed().as_secs_f64(),
524 }))
525 }
526
527 async fn create_field_index(
528 &self,
529 request: Request<CreateFieldIndexCollection>,
530 ) -> Result<Response<PointsOperationResponse>, Status> {
531 let start_time = Instant::now();
532 let req = request.into_inner();
533
534 if self.storage.get_collection(&req.collection_name).is_none() {
535 return Err(Status::not_found("Collection not found"));
536 }
537
538 Ok(Response::new(PointsOperationResponse {
539 result: Some(UpdateResult {
540 operation_id: 0,
541 status: UpdateStatus::Acknowledged as i32,
542 }),
543 time: start_time.elapsed().as_secs_f64(),
544 }))
545 }
546
547 async fn delete_field_index(
548 &self,
549 request: Request<DeleteFieldIndexCollection>,
550 ) -> Result<Response<PointsOperationResponse>, Status> {
551 let start_time = Instant::now();
552 let req = request.into_inner();
553
554 if self.storage.get_collection(&req.collection_name).is_none() {
555 return Err(Status::not_found("Collection not found"));
556 }
557
558 Ok(Response::new(PointsOperationResponse {
559 result: Some(UpdateResult {
560 operation_id: 0,
561 status: UpdateStatus::Acknowledged as i32,
562 }),
563 time: start_time.elapsed().as_secs_f64(),
564 }))
565 }
566
567 async fn search(
568 &self,
569 request: Request<SearchPoints>,
570 ) -> Result<Response<SearchResponse>, Status> {
571 let start_time = Instant::now();
572 let req = request.into_inner();
573
574 let collection = self.storage.get_collection(&req.collection_name)
575 .ok_or_else(|| Status::not_found("Collection not found"))?;
576
577 let query = Vector::new(req.vector);
578 let limit = req.limit as usize;
579
580 let results = collection.search(&query, limit, None);
581
582 let scored_points: Vec<ScoredPoint> = results.into_iter().map(|(point, score)| {
583 let payload: std::collections::HashMap<String, vectx::Value> = point.payload
584 .as_ref()
585 .and_then(|p| p.as_object())
586 .map(|obj| {
587 obj.iter()
588 .map(|(k, v)| (k.clone(), Self::json_to_proto_value(v)))
589 .collect()
590 })
591 .unwrap_or_default();
592
593 ScoredPoint {
594 id: Some(Self::to_proto_point_id(&point.id)),
595 payload,
596 score,
597 vectors: None,
598 version: Some(0),
599 }
600 }).collect();
601
602 Ok(Response::new(SearchResponse {
603 result: scored_points,
604 time: start_time.elapsed().as_secs_f64(),
605 }))
606 }
607
608 async fn scroll(
609 &self,
610 request: Request<ScrollPoints>,
611 ) -> Result<Response<ScrollResponse>, Status> {
612 let start_time = Instant::now();
613 let req = request.into_inner();
614
615 let collection = self.storage.get_collection(&req.collection_name)
616 .ok_or_else(|| Status::not_found("Collection not found"))?;
617
618 let limit = req.limit.unwrap_or(10) as usize;
619 let all_points = collection.get_all_points();
620
621 let offset_id: Option<String> = req.offset.as_ref()
623 .and_then(Self::parse_point_id);
624
625 let mut points_iter = all_points.iter();
626
627 if let Some(ref offset) = offset_id {
629 while let Some(p) = points_iter.next() {
630 if p.id.to_string() == *offset {
631 break;
632 }
633 }
634 }
635
636 let mut results = Vec::new();
637 let mut last_id = None;
638
639 for point in points_iter.take(limit) {
640 last_id = Some(Self::to_proto_point_id(&point.id));
641
642 let payload: std::collections::HashMap<String, vectx::Value> = point.payload
643 .as_ref()
644 .and_then(|p| p.as_object())
645 .map(|obj| {
646 obj.iter()
647 .map(|(k, v)| (k.clone(), Self::json_to_proto_value(v)))
648 .collect()
649 })
650 .unwrap_or_default();
651
652 results.push(RetrievedPoint {
653 id: Some(Self::to_proto_point_id(&point.id)),
654 payload,
655 vectors: Some(VectorInput {
656 variant: Some(vector_input::Variant::Dense(vectx::Vector {
657 data: point.vector.as_slice().to_vec(),
658 })),
659 }),
660 });
661 }
662
663 let next_offset = if results.len() == limit {
665 last_id
666 } else {
667 None
668 };
669
670 Ok(Response::new(ScrollResponse {
671 next_page_offset: next_offset,
672 result: results,
673 time: start_time.elapsed().as_secs_f64(),
674 }))
675 }
676
677 async fn recommend(
678 &self,
679 request: Request<RecommendPoints>,
680 ) -> Result<Response<RecommendResponse>, Status> {
681 let start_time = Instant::now();
682 let req = request.into_inner();
683
684 let collection = self.storage.get_collection(&req.collection_name)
685 .ok_or_else(|| Status::not_found("Collection not found"))?;
686
687 let limit = req.limit as usize;
688
689 let mut positive_vectors: Vec<Vec<f32>> = Vec::new();
691 let mut exclude_ids: std::collections::HashSet<String> = std::collections::HashSet::new();
692
693 for pos_id in &req.positive {
694 if let Some(id_str) = Self::parse_point_id(pos_id) {
695 exclude_ids.insert(id_str.clone());
696 if let Some(point) = collection.get(&id_str) {
697 positive_vectors.push(point.vector.as_slice().to_vec());
698 }
699 }
700 }
701
702 if positive_vectors.is_empty() {
703 return Err(Status::invalid_argument("At least one valid positive example required"));
704 }
705
706 let mut negative_vectors: Vec<Vec<f32>> = Vec::new();
708 for neg_id in &req.negative {
709 if let Some(id_str) = Self::parse_point_id(neg_id) {
710 exclude_ids.insert(id_str.clone());
711 if let Some(point) = collection.get(&id_str) {
712 negative_vectors.push(point.vector.as_slice().to_vec());
713 }
714 }
715 }
716
717 let dim = positive_vectors[0].len();
719 let mut avg_positive = vec![0.0f32; dim];
720 for vec in &positive_vectors {
721 for (i, &val) in vec.iter().enumerate() {
722 if i < dim { avg_positive[i] += val; }
723 }
724 }
725 let pos_count = positive_vectors.len() as f32;
726 for val in &mut avg_positive { *val /= pos_count; }
727
728 let query_data = if !negative_vectors.is_empty() {
730 let mut avg_negative = vec![0.0f32; dim];
731 for vec in &negative_vectors {
732 for (i, &val) in vec.iter().enumerate() {
733 if i < dim { avg_negative[i] += val; }
734 }
735 }
736 let neg_count = negative_vectors.len() as f32;
737 for val in &mut avg_negative { *val /= neg_count; }
738
739 avg_positive.iter()
740 .zip(avg_negative.iter())
741 .map(|(p, n)| 2.0 * p - n)
742 .collect()
743 } else {
744 avg_positive
745 };
746
747 let query = Vector::new(query_data);
748 let search_results = collection.search(&query, limit + exclude_ids.len(), None);
749
750 let scored_points: Vec<ScoredPoint> = search_results
751 .into_iter()
752 .filter(|(point, _)| !exclude_ids.contains(&point.id.to_string()))
753 .take(limit)
754 .map(|(point, score)| {
755 let payload: std::collections::HashMap<String, vectx::Value> = point.payload
756 .as_ref()
757 .and_then(|p| p.as_object())
758 .map(|obj| {
759 obj.iter()
760 .map(|(k, v)| (k.clone(), Self::json_to_proto_value(v)))
761 .collect()
762 })
763 .unwrap_or_default();
764
765 ScoredPoint {
766 id: Some(Self::to_proto_point_id(&point.id)),
767 payload,
768 score,
769 vectors: None,
770 version: Some(0),
771 }
772 })
773 .collect();
774
775 Ok(Response::new(RecommendResponse {
776 result: scored_points,
777 time: start_time.elapsed().as_secs_f64(),
778 }))
779 }
780
781 async fn count(
782 &self,
783 request: Request<CountPoints>,
784 ) -> Result<Response<CountResponse>, Status> {
785 let start_time = Instant::now();
786 let req = request.into_inner();
787
788 let collection = self.storage.get_collection(&req.collection_name)
789 .ok_or_else(|| Status::not_found("Collection not found"))?;
790
791 Ok(Response::new(CountResponse {
792 result: Some(CountResult {
793 count: collection.count() as u64,
794 }),
795 time: start_time.elapsed().as_secs_f64(),
796 }))
797 }
798
799 async fn query(
800 &self,
801 request: Request<QueryPoints>,
802 ) -> Result<Response<QueryResponse>, Status> {
803 let start_time = Instant::now();
804 let req = request.into_inner();
805
806 let collection = self.storage.get_collection(&req.collection_name)
807 .ok_or_else(|| Status::not_found("Collection not found"))?;
808
809 let limit = req.limit as usize;
810
811 let query_data = req.query
812 .and_then(|vi| match vi.variant {
813 Some(vector_input::Variant::Dense(v)) => Some(v.data),
814 Some(vector_input::Variant::Named(nv)) => {
815 nv.vectors.values().next().map(|v| v.data.clone())
816 }
817 None => None,
818 })
819 .ok_or_else(|| Status::invalid_argument("Query vector required"))?;
820
821 let query = Vector::new(query_data);
822 let results = collection.search(&query, limit, None);
823
824 let scored_points: Vec<ScoredPoint> = results.into_iter().map(|(point, score)| {
825 let payload: std::collections::HashMap<String, vectx::Value> = point.payload
826 .as_ref()
827 .and_then(|p| p.as_object())
828 .map(|obj| {
829 obj.iter()
830 .map(|(k, v)| (k.clone(), Self::json_to_proto_value(v)))
831 .collect()
832 })
833 .unwrap_or_default();
834
835 ScoredPoint {
836 id: Some(Self::to_proto_point_id(&point.id)),
837 payload,
838 score,
839 vectors: None,
840 version: Some(0),
841 }
842 }).collect();
843
844 Ok(Response::new(QueryResponse {
845 result: scored_points,
846 time: start_time.elapsed().as_secs_f64(),
847 }))
848 }
849}
850
/// gRPC handler for collection snapshot operations, backed by a shared storage manager.
pub struct SnapshotsService {
    /// Shared handle to the storage manager that performs snapshot operations.
    storage: Arc<StorageManager>,
}
858
859impl SnapshotsService {
860 pub fn new(storage: Arc<StorageManager>) -> Self {
861 Self { storage }
862 }
863}
864
865#[tonic::async_trait]
866impl vectx::snapshots_server::Snapshots for SnapshotsService {
867 async fn create(
868 &self,
869 request: Request<CreateSnapshotRequest>,
870 ) -> Result<Response<CreateSnapshotResponse>, Status> {
871 let start_time = Instant::now();
872 let req = request.into_inner();
873
874 match self.storage.create_collection_snapshot(&req.collection_name) {
875 Ok(snapshot) => Ok(Response::new(CreateSnapshotResponse {
876 result: Some(SnapshotDescription {
877 name: snapshot.name,
878 creation_time: snapshot.creation_time.unwrap_or_default(),
879 size: snapshot.size as i64,
880 checksum: snapshot.checksum,
881 }),
882 time: start_time.elapsed().as_secs_f64(),
883 })),
884 Err(e) => Err(Status::internal(e.to_string())),
885 }
886 }
887
888 async fn list(
889 &self,
890 request: Request<ListSnapshotsRequest>,
891 ) -> Result<Response<ListSnapshotsResponse>, Status> {
892 let start_time = Instant::now();
893 let req = request.into_inner();
894
895 match self.storage.list_collection_snapshots(&req.collection_name) {
896 Ok(snapshots) => {
897 let descriptions: Vec<SnapshotDescription> = snapshots
898 .into_iter()
899 .map(|s| SnapshotDescription {
900 name: s.name,
901 creation_time: s.creation_time.unwrap_or_default(),
902 size: s.size as i64,
903 checksum: s.checksum,
904 })
905 .collect();
906
907 Ok(Response::new(ListSnapshotsResponse {
908 snapshots: descriptions,
909 time: start_time.elapsed().as_secs_f64(),
910 }))
911 }
912 Err(e) => Err(Status::internal(e.to_string())),
913 }
914 }
915
916 async fn delete(
917 &self,
918 request: Request<DeleteSnapshotRequest>,
919 ) -> Result<Response<DeleteSnapshotResponse>, Status> {
920 let start_time = Instant::now();
921 let req = request.into_inner();
922
923 match self.storage.delete_collection_snapshot(&req.collection_name, &req.snapshot_name) {
924 Ok(true) => Ok(Response::new(DeleteSnapshotResponse {
925 result: true,
926 time: start_time.elapsed().as_secs_f64(),
927 })),
928 Ok(false) => Err(Status::not_found("Snapshot not found")),
929 Err(e) => Err(Status::internal(e.to_string())),
930 }
931 }
932
933 async fn recover(
934 &self,
935 request: Request<RecoverSnapshotRequest>,
936 ) -> Result<Response<RecoverSnapshotResponse>, Status> {
937 let start_time = Instant::now();
938 let req = request.into_inner();
939
940 match self.storage.recover_from_snapshot(&req.collection_name, &req.location) {
941 Ok(_) => Ok(Response::new(RecoverSnapshotResponse {
942 result: true,
943 time: start_time.elapsed().as_secs_f64(),
944 })),
945 Err(e) => Err(Status::internal(e.to_string())),
946 }
947 }
948}
949
/// Entry point that wires the gRPC services together and runs the server.
pub struct GrpcApi;
955
956impl GrpcApi {
957 pub async fn start(
958 storage: Arc<StorageManager>,
959 port: u16,
960 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
961 let addr = format!("0.0.0.0:{}", port).parse()?;
962
963 let qdrant_service = vectx::qdrant_server::QdrantServer::new(QdrantService);
964 let collections_service = vectx::collections_server::CollectionsServer::new(
965 CollectionsService::new(storage.clone())
966 );
967 let points_service = vectx::points_server::PointsServer::new(
968 PointsService::new(storage.clone())
969 );
970 let snapshots_service = vectx::snapshots_server::SnapshotsServer::new(
971 SnapshotsService::new(storage)
972 );
973
974 println!("gRPC server listening on {}", addr);
975
976 tonic::transport::Server::builder()
977 .add_service(qdrant_service)
978 .add_service(collections_service)
979 .add_service(points_service)
980 .add_service(snapshots_service)
981 .serve(addr)
982 .await?;
983
984 Ok(())
985 }
986}