1use std::collections::HashMap;
11
12use crate::vector_store::BoxFuture;
13use qdrant_client::Qdrant;
14use qdrant_client::qdrant::vector_output::Vector as VectorVariant;
15use qdrant_client::qdrant::{
16 CreateCollectionBuilder, DeletePointsBuilder, Distance, Filter, GetPointsBuilder, PointId,
17 PointStruct, PointsIdsList, ScoredPoint, ScrollPointsBuilder, SearchPointsBuilder,
18 UpsertPointsBuilder, VectorParamsBuilder, value::Kind,
19};
20
21type QdrantResult<T> = Result<T, Box<qdrant_client::QdrantError>>;
22
23#[derive(Clone)]
25pub struct QdrantOps {
26 client: Qdrant,
27}
28
29impl std::fmt::Debug for QdrantOps {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 f.debug_struct("QdrantOps").finish_non_exhaustive()
32 }
33}
34
35impl QdrantOps {
36 pub fn new(url: &str) -> QdrantResult<Self> {
44 let client = Qdrant::from_url(url).build().map_err(Box::new)?;
45 Ok(Self { client })
46 }
47
48 #[must_use]
50 pub fn client(&self) -> &Qdrant {
51 &self.client
52 }
53
54 pub async fn ensure_collection(&self, collection: &str, vector_size: u64) -> QdrantResult<()> {
63 if self
64 .client
65 .collection_exists(collection)
66 .await
67 .map_err(Box::new)?
68 {
69 let existing_size = self.get_collection_vector_size(collection).await?;
70 if existing_size == Some(vector_size) {
71 return Ok(());
72 }
73 tracing::warn!(
74 collection,
75 existing = ?existing_size,
76 required = vector_size,
77 "vector dimension mismatch — recreating collection (existing data will be lost)"
78 );
79 self.client
80 .delete_collection(collection)
81 .await
82 .map_err(Box::new)?;
83 }
84 self.client
85 .create_collection(
86 CreateCollectionBuilder::new(collection)
87 .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine)),
88 )
89 .await
90 .map_err(Box::new)?;
91 Ok(())
92 }
93
94 async fn get_collection_vector_size(&self, collection: &str) -> QdrantResult<Option<u64>> {
101 let info = self
102 .client
103 .collection_info(collection)
104 .await
105 .map_err(Box::new)?;
106 let size = info
107 .result
108 .and_then(|r| r.config)
109 .and_then(|cfg| cfg.params)
110 .and_then(|params| params.vectors_config)
111 .and_then(|vc| vc.config)
112 .and_then(|cfg| match cfg {
113 qdrant_client::qdrant::vectors_config::Config::Params(vp) => Some(vp.size),
114 qdrant_client::qdrant::vectors_config::Config::ParamsMap(_) => None,
116 });
117 Ok(size)
118 }
119
120 pub async fn collection_exists(&self, collection: &str) -> QdrantResult<bool> {
126 self.client
127 .collection_exists(collection)
128 .await
129 .map_err(Box::new)
130 }
131
132 pub async fn delete_collection(&self, collection: &str) -> QdrantResult<()> {
138 self.client
139 .delete_collection(collection)
140 .await
141 .map_err(Box::new)?;
142 Ok(())
143 }
144
145 pub async fn upsert(&self, collection: &str, points: Vec<PointStruct>) -> QdrantResult<()> {
151 self.client
152 .upsert_points(UpsertPointsBuilder::new(collection, points).wait(true))
153 .await
154 .map_err(Box::new)?;
155 Ok(())
156 }
157
158 pub async fn search(
164 &self,
165 collection: &str,
166 vector: Vec<f32>,
167 limit: u64,
168 filter: Option<Filter>,
169 ) -> QdrantResult<Vec<ScoredPoint>> {
170 let mut builder = SearchPointsBuilder::new(collection, vector, limit).with_payload(true);
171 if let Some(f) = filter {
172 builder = builder.filter(f);
173 }
174 let results = self.client.search_points(builder).await.map_err(Box::new)?;
175 Ok(results.result)
176 }
177
178 pub async fn delete_by_ids(&self, collection: &str, ids: Vec<PointId>) -> QdrantResult<()> {
184 if ids.is_empty() {
185 return Ok(());
186 }
187 self.client
188 .delete_points(
189 DeletePointsBuilder::new(collection)
190 .points(PointsIdsList { ids })
191 .wait(true),
192 )
193 .await
194 .map_err(Box::new)?;
195 Ok(())
196 }
197
198 pub async fn scroll_all(
206 &self,
207 collection: &str,
208 key_field: &str,
209 ) -> QdrantResult<HashMap<String, HashMap<String, String>>> {
210 let mut result = HashMap::new();
211 let mut offset: Option<PointId> = None;
212
213 loop {
214 let mut builder = ScrollPointsBuilder::new(collection)
215 .with_payload(true)
216 .with_vectors(false)
217 .limit(100);
218
219 if let Some(ref off) = offset {
220 builder = builder.offset(off.clone());
221 }
222
223 let response = self.client.scroll(builder).await.map_err(Box::new)?;
224
225 for point in &response.result {
226 let Some(key_val) = point.payload.get(key_field) else {
227 continue;
228 };
229 let Some(Kind::StringValue(key)) = &key_val.kind else {
230 continue;
231 };
232
233 let mut fields = HashMap::new();
234 for (k, val) in &point.payload {
235 if let Some(Kind::StringValue(s)) = &val.kind {
236 fields.insert(k.clone(), s.clone());
237 }
238 }
239 result.insert(key.clone(), fields);
240 }
241
242 match response.next_page_offset {
243 Some(next) => offset = Some(next),
244 None => break,
245 }
246 }
247
248 Ok(result)
249 }
250
251 pub async fn ensure_collection_with_quantization(
261 &self,
262 collection: &str,
263 vector_size: u64,
264 keyword_fields: &[&str],
265 ) -> Result<(), crate::VectorStoreError> {
266 use qdrant_client::qdrant::{
267 CreateFieldIndexCollectionBuilder, FieldType, ScalarQuantizationBuilder,
268 };
269 if self
270 .client
271 .collection_exists(collection)
272 .await
273 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?
274 {
275 let existing_size = self
276 .get_collection_vector_size(collection)
277 .await
278 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
279 if existing_size == Some(vector_size) {
280 return Ok(());
281 }
282 tracing::warn!(
283 collection,
284 existing = ?existing_size,
285 required = vector_size,
286 "vector dimension mismatch — recreating collection (existing data will be lost)"
287 );
288 self.client
289 .delete_collection(collection)
290 .await
291 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
292 }
293 self.client
294 .create_collection(
295 CreateCollectionBuilder::new(collection)
296 .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine))
297 .quantization_config(ScalarQuantizationBuilder::default()),
298 )
299 .await
300 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
301
302 for field in keyword_fields {
303 self.client
304 .create_field_index(CreateFieldIndexCollectionBuilder::new(
305 collection,
306 *field,
307 FieldType::Keyword,
308 ))
309 .await
310 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
311 }
312 Ok(())
313 }
314
315 pub fn json_to_payload(
321 value: serde_json::Value,
322 ) -> Result<HashMap<String, qdrant_client::qdrant::Value>, serde_json::Error> {
323 serde_json::from_value(value)
324 }
325}
326
327impl crate::vector_store::VectorStore for QdrantOps {
328 fn ensure_collection(
329 &self,
330 collection: &str,
331 vector_size: u64,
332 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
333 let collection = collection.to_owned();
334 Box::pin(async move {
335 self.ensure_collection(&collection, vector_size)
336 .await
337 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
338 })
339 }
340
341 fn collection_exists(
342 &self,
343 collection: &str,
344 ) -> BoxFuture<'_, Result<bool, crate::VectorStoreError>> {
345 let collection = collection.to_owned();
346 Box::pin(async move {
347 self.collection_exists(&collection)
348 .await
349 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
350 })
351 }
352
353 fn delete_collection(
354 &self,
355 collection: &str,
356 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
357 let collection = collection.to_owned();
358 Box::pin(async move {
359 self.delete_collection(&collection)
360 .await
361 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
362 })
363 }
364
365 fn upsert(
366 &self,
367 collection: &str,
368 points: Vec<crate::VectorPoint>,
369 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
370 let collection = collection.to_owned();
371 Box::pin(async move {
372 let qdrant_points: Vec<PointStruct> = points
373 .into_iter()
374 .map(|p| {
375 let payload: HashMap<String, qdrant_client::qdrant::Value> =
376 serde_json::from_value(serde_json::Value::Object(
377 p.payload.into_iter().collect(),
378 ))
379 .unwrap_or_default();
380 PointStruct::new(p.id, p.vector, payload)
381 })
382 .collect();
383 self.upsert(&collection, qdrant_points)
384 .await
385 .map_err(|e| crate::VectorStoreError::Upsert(e.to_string()))
386 })
387 }
388
389 fn search(
390 &self,
391 collection: &str,
392 vector: Vec<f32>,
393 limit: u64,
394 filter: Option<crate::VectorFilter>,
395 ) -> BoxFuture<'_, Result<Vec<crate::ScoredVectorPoint>, crate::VectorStoreError>> {
396 let collection = collection.to_owned();
397 Box::pin(async move {
398 let qdrant_filter = filter.map(vector_filter_to_qdrant);
399 let results = self
400 .search(&collection, vector, limit, qdrant_filter)
401 .await
402 .map_err(|e| crate::VectorStoreError::Search(e.to_string()))?;
403 Ok(results.into_iter().map(scored_point_to_vector).collect())
404 })
405 }
406
407 fn delete_by_ids(
408 &self,
409 collection: &str,
410 ids: Vec<String>,
411 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
412 let collection = collection.to_owned();
413 Box::pin(async move {
414 let point_ids: Vec<PointId> = ids.into_iter().map(PointId::from).collect();
415 self.delete_by_ids(&collection, point_ids)
416 .await
417 .map_err(|e| crate::VectorStoreError::Delete(e.to_string()))
418 })
419 }
420
421 fn scroll_all(
422 &self,
423 collection: &str,
424 key_field: &str,
425 ) -> BoxFuture<'_, Result<HashMap<String, HashMap<String, String>>, crate::VectorStoreError>>
426 {
427 let collection = collection.to_owned();
428 let key_field = key_field.to_owned();
429 Box::pin(async move {
430 self.scroll_all(&collection, &key_field)
431 .await
432 .map_err(|e| crate::VectorStoreError::Scroll(e.to_string()))
433 })
434 }
435
436 fn health_check(&self) -> BoxFuture<'_, Result<bool, crate::VectorStoreError>> {
437 Box::pin(async move {
438 self.client
439 .health_check()
440 .await
441 .map(|_| true)
442 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
443 })
444 }
445
446 fn create_keyword_indexes(
447 &self,
448 collection: &str,
449 fields: &[&str],
450 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
451 use qdrant_client::qdrant::{CreateFieldIndexCollectionBuilder, FieldType};
452 let collection = collection.to_owned();
453 let fields: Vec<String> = fields.iter().map(|f| (*f).to_owned()).collect();
454 Box::pin(async move {
455 for field in &fields {
456 self.client
457 .create_field_index(CreateFieldIndexCollectionBuilder::new(
458 &collection,
459 field.as_str(),
460 FieldType::Keyword,
461 ))
462 .await
463 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
464 }
465 Ok(())
466 })
467 }
468
469 fn get_points(
470 &self,
471 collection: &str,
472 ids: Vec<String>,
473 ) -> BoxFuture<'_, Result<Vec<crate::VectorPoint>, crate::VectorStoreError>> {
474 let collection = collection.to_owned();
475 Box::pin(async move {
476 if ids.is_empty() {
477 return Ok(Vec::new());
478 }
479 let point_ids: Vec<PointId> = ids.into_iter().map(PointId::from).collect();
480 let response = self
481 .client
482 .get_points(
483 GetPointsBuilder::new(&collection, point_ids)
484 .with_vectors(true)
485 .with_payload(true),
486 )
487 .await
488 .map_err(|e| crate::VectorStoreError::Search(e.to_string()))?;
489
490 let mut result = Vec::with_capacity(response.result.len());
491 for point in response.result {
492 let Some(id_str) = point_id_to_string(point.id) else {
493 continue;
494 };
495 let vector = match point.vectors.and_then(|v| v.get_vector()) {
497 Some(VectorVariant::Dense(dv)) => dv.data,
498 _ => continue,
499 };
500 let payload: HashMap<String, serde_json::Value> = point
501 .payload
502 .into_iter()
503 .filter_map(|(k, v)| {
504 let json = qdrant_value_to_json(v.kind?)?;
505 Some((k, json))
506 })
507 .collect();
508 result.push(crate::VectorPoint {
509 id: id_str,
510 vector,
511 payload,
512 });
513 }
514 Ok(result)
515 })
516 }
517}
518
519fn vector_filter_to_qdrant(filter: crate::VectorFilter) -> Filter {
520 let must: Vec<_> = filter
521 .must
522 .into_iter()
523 .map(field_condition_to_qdrant)
524 .collect();
525 let must_not: Vec<_> = filter
526 .must_not
527 .into_iter()
528 .map(field_condition_to_qdrant)
529 .collect();
530
531 let mut f = Filter::default();
532 if !must.is_empty() {
533 f.must = must;
534 }
535 if !must_not.is_empty() {
536 f.must_not = must_not;
537 }
538 f
539}
540
541fn field_condition_to_qdrant(cond: crate::FieldCondition) -> qdrant_client::qdrant::Condition {
542 match cond.value {
543 crate::FieldValue::Integer(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
544 crate::FieldValue::Text(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
545 }
546}
547
548fn point_id_to_string(pid: Option<qdrant_client::qdrant::PointId>) -> Option<String> {
552 match pid?.point_id_options? {
553 qdrant_client::qdrant::point_id::PointIdOptions::Uuid(u) => Some(u),
554 qdrant_client::qdrant::point_id::PointIdOptions::Num(n) => Some(n.to_string()),
555 }
556}
557
558fn qdrant_value_to_json(kind: Kind) -> Option<serde_json::Value> {
562 match kind {
563 Kind::StringValue(s) => Some(serde_json::Value::String(s)),
564 Kind::IntegerValue(i) => Some(serde_json::Value::Number(i.into())),
565 Kind::DoubleValue(d) => serde_json::Number::from_f64(d).map(serde_json::Value::Number),
566 Kind::BoolValue(b) => Some(serde_json::Value::Bool(b)),
567 _ => None,
568 }
569}
570
571fn scored_point_to_vector(point: ScoredPoint) -> crate::ScoredVectorPoint {
572 let payload: HashMap<String, serde_json::Value> = point
573 .payload
574 .into_iter()
575 .filter_map(|(k, v)| Some((k, qdrant_value_to_json(v.kind?)?)))
576 .collect();
577
578 let id = point_id_to_string(point.id).unwrap_or_default();
579
580 crate::ScoredVectorPoint {
581 id,
582 score: point.score,
583 payload,
584 }
585}
586
#[cfg(test)]
mod tests {
    use super::*;

    /// Address of the local Qdrant gRPC endpoint used throughout these tests.
    const LOCAL_URL: &str = "http://localhost:6334";

    #[test]
    fn new_valid_url() {
        assert!(QdrantOps::new(LOCAL_URL).is_ok());
    }

    #[test]
    fn new_invalid_url() {
        assert!(QdrantOps::new("not a valid url").is_err());
    }

    #[test]
    fn debug_format() {
        let ops = QdrantOps::new(LOCAL_URL).unwrap();
        assert!(format!("{ops:?}").contains("QdrantOps"));
    }

    #[test]
    fn json_to_payload_valid() {
        let payload = QdrantOps::json_to_payload(serde_json::json!({"key": "value", "num": 42}));
        assert!(payload.is_ok());
    }

    #[test]
    fn json_to_payload_empty() {
        let payload = QdrantOps::json_to_payload(serde_json::json!({}));
        assert!(payload.is_ok());
        assert!(payload.unwrap().is_empty());
    }

    /// Only verifies that a client can be built. The empty-ids short-circuit
    /// of `delete_by_ids` is exercised by `delete_by_ids_empty_no_network_call`.
    #[test]
    fn delete_by_ids_empty_is_ok_sync() {
        assert!(QdrantOps::new(LOCAL_URL).is_ok());
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_idempotent() {
        let ops = QdrantOps::new(LOCAL_URL).unwrap();
        let collection = "test_quant_idempotent";
        let fields = ["language", "file_path"];

        // Start from a clean slate; the collection may not exist yet.
        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection_with_quantization(collection, 128, &fields)
            .await
            .unwrap();
        assert!(ops.collection_exists(collection).await.unwrap());

        // A second call with identical arguments must succeed without error.
        ops.ensure_collection_with_quantization(collection, 128, &fields)
            .await
            .unwrap();

        ops.delete_collection(collection).await.unwrap();
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn delete_by_ids_empty_no_network_call() {
        let ops = QdrantOps::new(LOCAL_URL).unwrap();
        assert!(ops
            .delete_by_ids("nonexistent_collection", vec![])
            .await
            .is_ok());
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_idempotent_same_size() {
        let ops = QdrantOps::new(LOCAL_URL).unwrap();
        let collection = "test_ensure_idempotent";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection(collection, 128).await.unwrap();
        assert!(ops.collection_exists(collection).await.unwrap());

        ops.ensure_collection(collection, 128).await.unwrap();
        assert!(ops.collection_exists(collection).await.unwrap());

        ops.delete_collection(collection).await.unwrap();
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new(LOCAL_URL).unwrap();
        let collection = "test_dim_mismatch";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection(collection, 128).await.unwrap();
        let initial = ops.get_collection_vector_size(collection).await.unwrap();
        assert_eq!(initial, Some(128));

        ops.ensure_collection(collection, 256).await.unwrap();
        let recreated = ops.get_collection_vector_size(collection).await.unwrap();
        assert_eq!(
            recreated,
            Some(256),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(collection).await.unwrap();
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new(LOCAL_URL).unwrap();
        let collection = "test_quant_dim_mismatch";
        let fields = ["language"];

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection_with_quantization(collection, 128, &fields)
            .await
            .unwrap();
        let initial = ops.get_collection_vector_size(collection).await.unwrap();
        assert_eq!(initial, Some(128));

        ops.ensure_collection_with_quantization(collection, 384, &fields)
            .await
            .unwrap();
        let recreated = ops.get_collection_vector_size(collection).await.unwrap();
        assert_eq!(
            recreated,
            Some(384),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(collection).await.unwrap();
    }
}