1use std::collections::HashMap;
11
12use crate::vector_store::BoxFuture;
13use qdrant_client::Qdrant;
14use qdrant_client::qdrant::vector_output::Vector as VectorVariant;
15use qdrant_client::qdrant::{
16 CreateCollectionBuilder, DeletePointsBuilder, Distance, Filter, GetPointsBuilder, PointId,
17 PointStruct, PointsIdsList, QueryPointsBuilder, ScoredPoint, ScrollPointsBuilder,
18 UpsertPointsBuilder, VectorParamsBuilder, value::Kind,
19};
20
/// Result alias used by the inherent [`QdrantOps`] methods in this module.
///
/// The error side is a boxed [`qdrant_client::QdrantError`].
type QdrantResult<T> = Result<T, Box<qdrant_client::QdrantError>>;
22
/// Wrapper around a [`Qdrant`] client that exposes the collection and point
/// operations implemented in this module.
#[derive(Clone)]
pub struct QdrantOps {
    /// Underlying Qdrant client every operation is sent through.
    client: Qdrant,
}
28
29impl std::fmt::Debug for QdrantOps {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 f.debug_struct("QdrantOps").finish_non_exhaustive()
32 }
33}
34
35impl QdrantOps {
36 pub fn new(url: &str, api_key: Option<&str>) -> QdrantResult<Self> {
49 let mut builder = Qdrant::from_url(url);
50 if let Some(key) = api_key.filter(|k| !k.trim().is_empty()) {
51 builder = builder.api_key(key.trim());
52 }
53 let client = builder.build().map_err(Box::new)?;
54 Ok(Self { client })
55 }
56
    /// Returns a shared reference to the underlying [`Qdrant`] client.
    #[must_use]
    pub fn client(&self) -> &Qdrant {
        &self.client
    }
62
63 pub async fn ensure_collection(&self, collection: &str, vector_size: u64) -> QdrantResult<()> {
72 if self
73 .client
74 .collection_exists(collection)
75 .await
76 .map_err(Box::new)?
77 {
78 let existing_size = self.get_collection_vector_size(collection).await?;
79 if existing_size == Some(vector_size) {
80 return Ok(());
81 }
82 tracing::warn!(
83 collection,
84 existing = ?existing_size,
85 required = vector_size,
86 "vector dimension mismatch — recreating collection (existing data will be lost)"
87 );
88 self.client
89 .delete_collection(collection)
90 .await
91 .map_err(Box::new)?;
92 }
93 self.client
94 .create_collection(
95 CreateCollectionBuilder::new(collection)
96 .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine)),
97 )
98 .await
99 .map_err(Box::new)?;
100 Ok(())
101 }
102
103 pub(crate) async fn get_collection_vector_size(
114 &self,
115 collection: &str,
116 ) -> QdrantResult<Option<u64>> {
117 let info = self
118 .client
119 .collection_info(collection)
120 .await
121 .map_err(Box::new)?;
122 let size = info
123 .result
124 .and_then(|r| r.config)
125 .and_then(|cfg| cfg.params)
126 .and_then(|params| params.vectors_config)
127 .and_then(|vc| vc.config)
128 .and_then(|cfg| match cfg {
129 qdrant_client::qdrant::vectors_config::Config::Params(vp) => Some(vp.size),
130 qdrant_client::qdrant::vectors_config::Config::ParamsMap(_) => None,
132 });
133 Ok(size)
134 }
135
136 pub async fn collection_exists(&self, collection: &str) -> QdrantResult<bool> {
142 self.client
143 .collection_exists(collection)
144 .await
145 .map_err(Box::new)
146 }
147
148 pub async fn delete_collection(&self, collection: &str) -> QdrantResult<()> {
154 self.client
155 .delete_collection(collection)
156 .await
157 .map_err(Box::new)?;
158 Ok(())
159 }
160
161 pub async fn upsert(&self, collection: &str, points: Vec<PointStruct>) -> QdrantResult<()> {
167 self.client
168 .upsert_points(UpsertPointsBuilder::new(collection, points).wait(true))
169 .await
170 .map_err(Box::new)?;
171 Ok(())
172 }
173
174 pub async fn search(
183 &self,
184 collection: &str,
185 vector: Vec<f32>,
186 limit: u64,
187 filter: Option<Filter>,
188 ) -> QdrantResult<Vec<ScoredPoint>> {
189 let mut builder = QueryPointsBuilder::new(collection)
190 .query(vector)
191 .limit(limit)
192 .with_payload(true);
193 if let Some(f) = filter {
194 builder = builder.filter(f);
195 }
196 let results = self.client.query(builder).await.map_err(Box::new)?;
197 Ok(results.result)
198 }
199
200 pub async fn delete_by_ids(&self, collection: &str, ids: Vec<PointId>) -> QdrantResult<()> {
206 if ids.is_empty() {
207 return Ok(());
208 }
209 self.client
210 .delete_points(
211 DeletePointsBuilder::new(collection)
212 .points(PointsIdsList { ids })
213 .wait(true),
214 )
215 .await
216 .map_err(Box::new)?;
217 Ok(())
218 }
219
220 pub async fn scroll_all(
228 &self,
229 collection: &str,
230 key_field: &str,
231 ) -> QdrantResult<HashMap<String, HashMap<String, String>>> {
232 let mut result = HashMap::new();
233 let mut offset: Option<PointId> = None;
234
235 loop {
236 let mut builder = ScrollPointsBuilder::new(collection)
237 .with_payload(true)
238 .with_vectors(false)
239 .limit(100);
240
241 if let Some(ref off) = offset {
242 builder = builder.offset(off.clone());
243 }
244
245 let response = self.client.scroll(builder).await.map_err(Box::new)?;
246
247 for point in &response.result {
248 let Some(key_val) = point.payload.get(key_field) else {
249 continue;
250 };
251 let Some(Kind::StringValue(key)) = &key_val.kind else {
252 continue;
253 };
254
255 let mut fields = HashMap::new();
256 for (k, val) in &point.payload {
257 if let Some(Kind::StringValue(s)) = &val.kind {
258 fields.insert(k.clone(), s.clone());
259 }
260 }
261 result.insert(key.clone(), fields);
262 }
263
264 match response.next_page_offset {
265 Some(next) => offset = Some(next),
266 None => break,
267 }
268 }
269
270 Ok(result)
271 }
272
273 pub async fn scroll_all_with_point_ids(
282 &self,
283 collection: &str,
284 key_field: &str,
285 ) -> QdrantResult<Vec<(String, HashMap<String, String>)>> {
286 let mut result = Vec::new();
287 let mut offset: Option<PointId> = None;
288
289 loop {
290 let mut builder = ScrollPointsBuilder::new(collection)
291 .with_payload(true)
292 .with_vectors(false)
293 .limit(100);
294
295 if let Some(ref off) = offset {
296 builder = builder.offset(off.clone());
297 }
298
299 let response = self.client.scroll(builder).await.map_err(Box::new)?;
300
301 for point in &response.result {
302 let Some(key_val) = point.payload.get(key_field) else {
303 continue;
304 };
305 let Some(Kind::StringValue(_)) = &key_val.kind else {
306 continue;
307 };
308 let Some(point_id_str) = point_id_to_string(point.id.clone()) else {
309 continue;
310 };
311
312 let mut fields = HashMap::new();
313 for (k, val) in &point.payload {
314 if let Some(Kind::StringValue(s)) = &val.kind {
315 fields.insert(k.clone(), s.clone());
316 }
317 }
318 result.push((point_id_str, fields));
319 }
320
321 match response.next_page_offset {
322 Some(next) => offset = Some(next),
323 None => break,
324 }
325 }
326
327 Ok(result)
328 }
329
330 pub async fn ensure_collection_with_quantization(
340 &self,
341 collection: &str,
342 vector_size: u64,
343 keyword_fields: &[&str],
344 ) -> Result<(), crate::VectorStoreError> {
345 use qdrant_client::qdrant::{
346 CreateFieldIndexCollectionBuilder, FieldType, ScalarQuantizationBuilder,
347 };
348 if self
349 .client
350 .collection_exists(collection)
351 .await
352 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?
353 {
354 let existing_size = self
355 .get_collection_vector_size(collection)
356 .await
357 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
358 if existing_size == Some(vector_size) {
359 return Ok(());
360 }
361 tracing::warn!(
362 collection,
363 existing = ?existing_size,
364 required = vector_size,
365 "vector dimension mismatch — recreating collection (existing data will be lost)"
366 );
367 self.client
368 .delete_collection(collection)
369 .await
370 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
371 }
372 self.client
373 .create_collection(
374 CreateCollectionBuilder::new(collection)
375 .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine))
376 .quantization_config(ScalarQuantizationBuilder::default()),
377 )
378 .await
379 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
380
381 for field in keyword_fields {
382 self.client
383 .create_field_index(CreateFieldIndexCollectionBuilder::new(
384 collection,
385 *field,
386 FieldType::Keyword,
387 ))
388 .await
389 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
390 }
391 Ok(())
392 }
393
    /// Converts a JSON value into a Qdrant payload map by deserializing it
    /// with `serde_json::from_value`.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error when `value` cannot be deserialized into
    /// a map of Qdrant values.
    pub fn json_to_payload(
        value: serde_json::Value,
    ) -> Result<HashMap<String, qdrant_client::qdrant::Value>, serde_json::Error> {
        serde_json::from_value(value)
    }
404}
405
406impl crate::vector_store::VectorStore for QdrantOps {
407 fn ensure_collection(
408 &self,
409 collection: &str,
410 vector_size: u64,
411 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
412 let collection = collection.to_owned();
413 Box::pin(async move {
414 self.ensure_collection(&collection, vector_size)
415 .await
416 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
417 })
418 }
419
420 fn collection_exists(
421 &self,
422 collection: &str,
423 ) -> BoxFuture<'_, Result<bool, crate::VectorStoreError>> {
424 let collection = collection.to_owned();
425 Box::pin(async move {
426 self.collection_exists(&collection)
427 .await
428 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
429 })
430 }
431
432 fn delete_collection(
433 &self,
434 collection: &str,
435 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
436 let collection = collection.to_owned();
437 Box::pin(async move {
438 self.delete_collection(&collection)
439 .await
440 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
441 })
442 }
443
444 fn upsert(
445 &self,
446 collection: &str,
447 points: Vec<crate::VectorPoint>,
448 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
449 let collection = collection.to_owned();
450 Box::pin(async move {
451 let qdrant_points: Vec<PointStruct> = points
452 .into_iter()
453 .map(|p| {
454 let payload: HashMap<String, qdrant_client::qdrant::Value> =
455 serde_json::from_value(serde_json::Value::Object(
456 p.payload.into_iter().collect(),
457 ))
458 .unwrap_or_default();
459 PointStruct::new(p.id, p.vector, payload)
460 })
461 .collect();
462 self.upsert(&collection, qdrant_points)
463 .await
464 .map_err(|e| crate::VectorStoreError::Upsert(e.to_string()))
465 })
466 }
467
468 fn search(
469 &self,
470 collection: &str,
471 vector: Vec<f32>,
472 limit: u64,
473 filter: Option<crate::VectorFilter>,
474 ) -> BoxFuture<'_, Result<Vec<crate::ScoredVectorPoint>, crate::VectorStoreError>> {
475 let collection = collection.to_owned();
476 Box::pin(async move {
477 let qdrant_filter = filter.map(vector_filter_to_qdrant);
478 let results = self
479 .search(&collection, vector, limit, qdrant_filter)
480 .await
481 .map_err(|e| crate::VectorStoreError::Search(e.to_string()))?;
482 Ok(results.into_iter().map(scored_point_to_vector).collect())
483 })
484 }
485
486 fn delete_by_ids(
487 &self,
488 collection: &str,
489 ids: Vec<String>,
490 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
491 let collection = collection.to_owned();
492 Box::pin(async move {
493 let point_ids: Vec<PointId> = ids.into_iter().map(PointId::from).collect();
494 self.delete_by_ids(&collection, point_ids)
495 .await
496 .map_err(|e| crate::VectorStoreError::Delete(e.to_string()))
497 })
498 }
499
500 fn scroll_all(
501 &self,
502 collection: &str,
503 key_field: &str,
504 ) -> BoxFuture<'_, Result<HashMap<String, HashMap<String, String>>, crate::VectorStoreError>>
505 {
506 let collection = collection.to_owned();
507 let key_field = key_field.to_owned();
508 Box::pin(async move {
509 self.scroll_all(&collection, &key_field)
510 .await
511 .map_err(|e| crate::VectorStoreError::Scroll(e.to_string()))
512 })
513 }
514
515 fn scroll_all_with_point_ids(
516 &self,
517 collection: &str,
518 key_field: &str,
519 ) -> BoxFuture<'_, Result<crate::vector_store::ScrollWithIdsResult, crate::VectorStoreError>>
520 {
521 let collection = collection.to_owned();
522 let key_field = key_field.to_owned();
523 Box::pin(async move {
524 self.scroll_all_with_point_ids(&collection, &key_field)
525 .await
526 .map_err(|e| crate::VectorStoreError::Scroll(e.to_string()))
527 })
528 }
529
530 fn health_check(&self) -> BoxFuture<'_, Result<bool, crate::VectorStoreError>> {
531 Box::pin(async move {
532 self.client
533 .health_check()
534 .await
535 .map(|_| true)
536 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
537 })
538 }
539
540 fn create_keyword_indexes(
541 &self,
542 collection: &str,
543 fields: &[&str],
544 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
545 use qdrant_client::qdrant::{CreateFieldIndexCollectionBuilder, FieldType};
546 let collection = collection.to_owned();
547 let fields: Vec<String> = fields.iter().map(|f| (*f).to_owned()).collect();
548 Box::pin(async move {
549 for field in &fields {
550 self.client
551 .create_field_index(CreateFieldIndexCollectionBuilder::new(
552 &collection,
553 field.as_str(),
554 FieldType::Keyword,
555 ))
556 .await
557 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
558 }
559 Ok(())
560 })
561 }
562
563 fn get_points(
564 &self,
565 collection: &str,
566 ids: Vec<String>,
567 ) -> BoxFuture<'_, Result<Vec<crate::VectorPoint>, crate::VectorStoreError>> {
568 let collection = collection.to_owned();
569 Box::pin(async move {
570 if ids.is_empty() {
571 return Ok(Vec::new());
572 }
573 let point_ids: Vec<PointId> = ids.into_iter().map(PointId::from).collect();
574 let response = self
575 .client
576 .get_points(
577 GetPointsBuilder::new(&collection, point_ids)
578 .with_vectors(true)
579 .with_payload(true),
580 )
581 .await
582 .map_err(|e| crate::VectorStoreError::Search(e.to_string()))?;
583
584 let mut result = Vec::with_capacity(response.result.len());
585 for point in response.result {
586 let Some(id_str) = point_id_to_string(point.id) else {
587 continue;
588 };
589 let vector = match point.vectors.and_then(|v| v.get_vector()) {
591 Some(VectorVariant::Dense(dv)) => dv.data,
592 _ => continue,
593 };
594 let payload: HashMap<String, serde_json::Value> = point
595 .payload
596 .into_iter()
597 .filter_map(|(k, v)| {
598 let json = qdrant_value_to_json(v.kind?)?;
599 Some((k, json))
600 })
601 .collect();
602 result.push(crate::VectorPoint {
603 id: id_str,
604 vector,
605 payload,
606 });
607 }
608 Ok(result)
609 })
610 }
611}
612
613fn vector_filter_to_qdrant(filter: crate::VectorFilter) -> Filter {
614 let must: Vec<_> = filter
615 .must
616 .into_iter()
617 .map(field_condition_to_qdrant)
618 .collect();
619 let must_not: Vec<_> = filter
620 .must_not
621 .into_iter()
622 .map(field_condition_to_qdrant)
623 .collect();
624
625 let mut f = Filter::default();
626 if !must.is_empty() {
627 f.must = must;
628 }
629 if !must_not.is_empty() {
630 f.must_not = must_not;
631 }
632 f
633}
634
635fn field_condition_to_qdrant(cond: crate::FieldCondition) -> qdrant_client::qdrant::Condition {
636 match cond.value {
637 crate::FieldValue::Integer(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
638 crate::FieldValue::Text(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
639 }
640}
641
642fn point_id_to_string(pid: Option<qdrant_client::qdrant::PointId>) -> Option<String> {
646 match pid?.point_id_options? {
647 qdrant_client::qdrant::point_id::PointIdOptions::Uuid(u) => Some(u),
648 qdrant_client::qdrant::point_id::PointIdOptions::Num(n) => Some(n.to_string()),
649 }
650}
651
652fn qdrant_value_to_json(kind: Kind) -> Option<serde_json::Value> {
656 match kind {
657 Kind::StringValue(s) => Some(serde_json::Value::String(s)),
658 Kind::IntegerValue(i) => Some(serde_json::Value::Number(i.into())),
659 Kind::DoubleValue(d) => serde_json::Number::from_f64(d).map(serde_json::Value::Number),
660 Kind::BoolValue(b) => Some(serde_json::Value::Bool(b)),
661 _ => None,
662 }
663}
664
665fn scored_point_to_vector(point: ScoredPoint) -> crate::ScoredVectorPoint {
666 let payload: HashMap<String, serde_json::Value> = point
667 .payload
668 .into_iter()
669 .filter_map(|(k, v)| Some((k, qdrant_value_to_json(v.kind?)?)))
670 .collect();
671
672 let id = point_id_to_string(point.id).unwrap_or_default();
673
674 crate::ScoredVectorPoint {
675 id,
676 score: point.score,
677 payload,
678 }
679}
680
// Unit tests for `QdrantOps`. Tests marked `#[ignore]` talk to a Qdrant
// instance at `localhost:6334`; the others only construct values locally.
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed URL builds successfully.
    #[test]
    fn new_valid_url() {
        let ops = QdrantOps::new("http://localhost:6334", None);
        assert!(ops.is_ok());
    }

    /// A malformed URL is rejected at construction time.
    #[test]
    fn new_invalid_url() {
        let ops = QdrantOps::new("not a valid url", None);
        assert!(ops.is_err());
    }

    /// `new` filters out an empty API key before passing it to the builder.
    #[test]
    fn new_empty_api_key_is_treated_as_none() {
        let result = QdrantOps::new("http://127.0.0.1:9999", Some(""));
        assert!(result.is_ok(), "empty key must not cause a build error");
    }

    /// `new` trims the key, so a whitespace-only key is filtered out as well.
    #[test]
    fn new_whitespace_api_key_is_treated_as_none() {
        let result = QdrantOps::new("http://127.0.0.1:9999", Some(" "));
        assert!(
            result.is_ok(),
            "whitespace-only key must not cause a build error"
        );
    }

    /// A non-empty API key is accepted by the builder.
    #[test]
    fn new_with_api_key_constructs_successfully() {
        let result = QdrantOps::new("http://127.0.0.1:9999", Some("valid-key"));
        assert!(result.is_ok(), "valid key must not cause a build error");
    }

    /// The manual `Debug` impl prints the type name.
    #[test]
    fn debug_format() {
        let ops = QdrantOps::new("http://localhost:6334", None).unwrap();
        let dbg = format!("{ops:?}");
        assert!(dbg.contains("QdrantOps"));
    }

    /// A JSON object with string and number members converts without error.
    #[test]
    fn json_to_payload_valid() {
        let value = serde_json::json!({"key": "value", "num": 42});
        let result = QdrantOps::json_to_payload(value);
        assert!(result.is_ok());
    }

    /// An empty JSON object converts to an empty payload map.
    #[test]
    fn json_to_payload_empty() {
        let result = QdrantOps::json_to_payload(serde_json::json!({}));
        assert!(result.is_ok());
        assert!(result.unwrap().is_empty());
    }

    /// Only checks that construction succeeds; `delete_by_ids` itself is not
    /// invoked in this synchronous test.
    #[test]
    fn delete_by_ids_empty_is_ok_sync() {
        let ops = QdrantOps::new("http://localhost:6334", None);
        assert!(ops.is_ok());
    }

    /// Calling `ensure_collection_with_quantization` twice with the same size
    /// succeeds both times.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_idempotent() {
        let ops = QdrantOps::new("http://localhost:6334", None).unwrap();
        let collection = "test_quant_idempotent";

        // Best-effort cleanup of leftovers from an earlier run; errors are ignored.
        let _ = ops.delete_collection(collection).await;

        // First call creates the collection.
        ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
            .await
            .unwrap();

        assert!(ops.collection_exists(collection).await.unwrap());

        // Second call with the same size must also succeed.
        ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
            .await
            .unwrap();

        // Cleanup.
        ops.delete_collection(collection).await.unwrap();
    }

    /// `delete_by_ids` returns early for an empty id list, so the collection
    /// does not need to exist.
    // NOTE(review): given that early return, this test may not actually need a
    // live instance — confirm before removing `#[ignore]`.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn delete_by_ids_empty_no_network_call() {
        let ops = QdrantOps::new("http://localhost:6334", None).unwrap();
        let result = ops.delete_by_ids("nonexistent_collection", vec![]).await;
        assert!(result.is_ok());
    }

    /// Calling `ensure_collection` twice with the same size leaves the
    /// collection in place.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_idempotent_same_size() {
        let ops = QdrantOps::new("http://localhost:6334", None).unwrap();
        let collection = "test_ensure_idempotent";

        // Best-effort cleanup of leftovers from an earlier run; errors are ignored.
        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection(collection, 128).await.unwrap();
        assert!(ops.collection_exists(collection).await.unwrap());

        // Same size again: the call must succeed and the collection must remain.
        ops.ensure_collection(collection, 128).await.unwrap();
        assert!(ops.collection_exists(collection).await.unwrap());

        ops.delete_collection(collection).await.unwrap();
    }

    /// `ensure_collection` with a different size recreates the collection with
    /// the new vector size.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new("http://localhost:6334", None).unwrap();
        let collection = "test_dim_mismatch";

        // Best-effort cleanup of leftovers from an earlier run; errors are ignored.
        let _ = ops.delete_collection(collection).await;

        // Create with 128 dimensions and verify the stored size.
        ops.ensure_collection(collection, 128).await.unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(128)
        );

        // Asking for 256 dimensions must replace the collection.
        ops.ensure_collection(collection, 256).await.unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(256),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(collection).await.unwrap();
    }

    /// Same as the plain mismatch test, but through
    /// `ensure_collection_with_quantization`.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new("http://localhost:6334", None).unwrap();
        let collection = "test_quant_dim_mismatch";

        // Best-effort cleanup of leftovers from an earlier run; errors are ignored.
        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection_with_quantization(collection, 128, &["language"])
            .await
            .unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(128)
        );

        // Asking for 384 dimensions must replace the collection.
        ops.ensure_collection_with_quantization(collection, 384, &["language"])
            .await
            .unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(384),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(collection).await.unwrap();
    }
}