1use std::collections::HashMap;
7
8use qdrant_client::Qdrant;
9use qdrant_client::qdrant::{
10 CreateCollectionBuilder, DeletePointsBuilder, Distance, Filter, PointId, PointStruct,
11 PointsIdsList, ScoredPoint, ScrollPointsBuilder, SearchPointsBuilder, UpsertPointsBuilder,
12 VectorParamsBuilder, value::Kind,
13};
14
15type QdrantResult<T> = Result<T, Box<qdrant_client::QdrantError>>;
16
17#[derive(Clone)]
19pub struct QdrantOps {
20 client: Qdrant,
21}
22
23impl std::fmt::Debug for QdrantOps {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 f.debug_struct("QdrantOps").finish_non_exhaustive()
26 }
27}
28
29impl QdrantOps {
30 pub fn new(url: &str) -> QdrantResult<Self> {
36 let client = Qdrant::from_url(url).build().map_err(Box::new)?;
37 Ok(Self { client })
38 }
39
40 #[must_use]
42 pub fn client(&self) -> &Qdrant {
43 &self.client
44 }
45
46 pub async fn ensure_collection(&self, collection: &str, vector_size: u64) -> QdrantResult<()> {
55 if self
56 .client
57 .collection_exists(collection)
58 .await
59 .map_err(Box::new)?
60 {
61 let existing_size = self.get_collection_vector_size(collection).await?;
62 if existing_size == Some(vector_size) {
63 return Ok(());
64 }
65 tracing::warn!(
66 collection,
67 existing = ?existing_size,
68 required = vector_size,
69 "vector dimension mismatch — recreating collection (existing data will be lost)"
70 );
71 self.client
72 .delete_collection(collection)
73 .await
74 .map_err(Box::new)?;
75 }
76 self.client
77 .create_collection(
78 CreateCollectionBuilder::new(collection)
79 .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine)),
80 )
81 .await
82 .map_err(Box::new)?;
83 Ok(())
84 }
85
86 async fn get_collection_vector_size(&self, collection: &str) -> QdrantResult<Option<u64>> {
93 let info = self
94 .client
95 .collection_info(collection)
96 .await
97 .map_err(Box::new)?;
98 let size = info
99 .result
100 .and_then(|r| r.config)
101 .and_then(|cfg| cfg.params)
102 .and_then(|params| params.vectors_config)
103 .and_then(|vc| vc.config)
104 .and_then(|cfg| match cfg {
105 qdrant_client::qdrant::vectors_config::Config::Params(vp) => Some(vp.size),
106 qdrant_client::qdrant::vectors_config::Config::ParamsMap(_) => None,
108 });
109 Ok(size)
110 }
111
112 pub async fn collection_exists(&self, collection: &str) -> QdrantResult<bool> {
118 self.client
119 .collection_exists(collection)
120 .await
121 .map_err(Box::new)
122 }
123
124 pub async fn delete_collection(&self, collection: &str) -> QdrantResult<()> {
130 self.client
131 .delete_collection(collection)
132 .await
133 .map_err(Box::new)?;
134 Ok(())
135 }
136
137 pub async fn upsert(&self, collection: &str, points: Vec<PointStruct>) -> QdrantResult<()> {
143 self.client
144 .upsert_points(UpsertPointsBuilder::new(collection, points).wait(true))
145 .await
146 .map_err(Box::new)?;
147 Ok(())
148 }
149
150 pub async fn search(
156 &self,
157 collection: &str,
158 vector: Vec<f32>,
159 limit: u64,
160 filter: Option<Filter>,
161 ) -> QdrantResult<Vec<ScoredPoint>> {
162 let mut builder = SearchPointsBuilder::new(collection, vector, limit).with_payload(true);
163 if let Some(f) = filter {
164 builder = builder.filter(f);
165 }
166 let results = self.client.search_points(builder).await.map_err(Box::new)?;
167 Ok(results.result)
168 }
169
170 pub async fn delete_by_ids(&self, collection: &str, ids: Vec<PointId>) -> QdrantResult<()> {
176 if ids.is_empty() {
177 return Ok(());
178 }
179 self.client
180 .delete_points(
181 DeletePointsBuilder::new(collection)
182 .points(PointsIdsList { ids })
183 .wait(true),
184 )
185 .await
186 .map_err(Box::new)?;
187 Ok(())
188 }
189
190 pub async fn scroll_all(
198 &self,
199 collection: &str,
200 key_field: &str,
201 ) -> QdrantResult<HashMap<String, HashMap<String, String>>> {
202 let mut result = HashMap::new();
203 let mut offset: Option<PointId> = None;
204
205 loop {
206 let mut builder = ScrollPointsBuilder::new(collection)
207 .with_payload(true)
208 .with_vectors(false)
209 .limit(100);
210
211 if let Some(ref off) = offset {
212 builder = builder.offset(off.clone());
213 }
214
215 let response = self.client.scroll(builder).await.map_err(Box::new)?;
216
217 for point in &response.result {
218 let Some(key_val) = point.payload.get(key_field) else {
219 continue;
220 };
221 let Some(Kind::StringValue(key)) = &key_val.kind else {
222 continue;
223 };
224
225 let mut fields = HashMap::new();
226 for (k, val) in &point.payload {
227 if let Some(Kind::StringValue(s)) = &val.kind {
228 fields.insert(k.clone(), s.clone());
229 }
230 }
231 result.insert(key.clone(), fields);
232 }
233
234 match response.next_page_offset {
235 Some(next) => offset = Some(next),
236 None => break,
237 }
238 }
239
240 Ok(result)
241 }
242
243 pub async fn ensure_collection_with_quantization(
253 &self,
254 collection: &str,
255 vector_size: u64,
256 keyword_fields: &[&str],
257 ) -> Result<(), crate::VectorStoreError> {
258 use qdrant_client::qdrant::{
259 CreateFieldIndexCollectionBuilder, FieldType, ScalarQuantizationBuilder,
260 };
261 if self
262 .client
263 .collection_exists(collection)
264 .await
265 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?
266 {
267 let existing_size = self
268 .get_collection_vector_size(collection)
269 .await
270 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
271 if existing_size == Some(vector_size) {
272 return Ok(());
273 }
274 tracing::warn!(
275 collection,
276 existing = ?existing_size,
277 required = vector_size,
278 "vector dimension mismatch — recreating collection (existing data will be lost)"
279 );
280 self.client
281 .delete_collection(collection)
282 .await
283 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
284 }
285 self.client
286 .create_collection(
287 CreateCollectionBuilder::new(collection)
288 .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine))
289 .quantization_config(ScalarQuantizationBuilder::default()),
290 )
291 .await
292 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
293
294 for field in keyword_fields {
295 self.client
296 .create_field_index(CreateFieldIndexCollectionBuilder::new(
297 collection,
298 *field,
299 FieldType::Keyword,
300 ))
301 .await
302 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
303 }
304 Ok(())
305 }
306
307 pub fn json_to_payload(
313 value: serde_json::Value,
314 ) -> Result<HashMap<String, qdrant_client::qdrant::Value>, serde_json::Error> {
315 serde_json::from_value(value)
316 }
317}
318
319impl crate::vector_store::VectorStore for QdrantOps {
320 fn ensure_collection(
321 &self,
322 collection: &str,
323 vector_size: u64,
324 ) -> std::pin::Pin<
325 Box<dyn std::future::Future<Output = Result<(), crate::VectorStoreError>> + Send + '_>,
326 > {
327 let collection = collection.to_owned();
328 Box::pin(async move {
329 self.ensure_collection(&collection, vector_size)
330 .await
331 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
332 })
333 }
334
335 fn collection_exists(
336 &self,
337 collection: &str,
338 ) -> std::pin::Pin<
339 Box<dyn std::future::Future<Output = Result<bool, crate::VectorStoreError>> + Send + '_>,
340 > {
341 let collection = collection.to_owned();
342 Box::pin(async move {
343 self.collection_exists(&collection)
344 .await
345 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
346 })
347 }
348
349 fn delete_collection(
350 &self,
351 collection: &str,
352 ) -> std::pin::Pin<
353 Box<dyn std::future::Future<Output = Result<(), crate::VectorStoreError>> + Send + '_>,
354 > {
355 let collection = collection.to_owned();
356 Box::pin(async move {
357 self.delete_collection(&collection)
358 .await
359 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
360 })
361 }
362
363 fn upsert(
364 &self,
365 collection: &str,
366 points: Vec<crate::VectorPoint>,
367 ) -> std::pin::Pin<
368 Box<dyn std::future::Future<Output = Result<(), crate::VectorStoreError>> + Send + '_>,
369 > {
370 let collection = collection.to_owned();
371 Box::pin(async move {
372 let qdrant_points: Vec<PointStruct> = points
373 .into_iter()
374 .map(|p| {
375 let payload: HashMap<String, qdrant_client::qdrant::Value> =
376 serde_json::from_value(serde_json::Value::Object(
377 p.payload.into_iter().collect(),
378 ))
379 .unwrap_or_default();
380 PointStruct::new(p.id, p.vector, payload)
381 })
382 .collect();
383 self.upsert(&collection, qdrant_points)
384 .await
385 .map_err(|e| crate::VectorStoreError::Upsert(e.to_string()))
386 })
387 }
388
389 fn search(
390 &self,
391 collection: &str,
392 vector: Vec<f32>,
393 limit: u64,
394 filter: Option<crate::VectorFilter>,
395 ) -> std::pin::Pin<
396 Box<
397 dyn std::future::Future<
398 Output = Result<Vec<crate::ScoredVectorPoint>, crate::VectorStoreError>,
399 > + Send
400 + '_,
401 >,
402 > {
403 let collection = collection.to_owned();
404 Box::pin(async move {
405 let qdrant_filter = filter.map(vector_filter_to_qdrant);
406 let results = self
407 .search(&collection, vector, limit, qdrant_filter)
408 .await
409 .map_err(|e| crate::VectorStoreError::Search(e.to_string()))?;
410 Ok(results.into_iter().map(scored_point_to_vector).collect())
411 })
412 }
413
414 fn delete_by_ids(
415 &self,
416 collection: &str,
417 ids: Vec<String>,
418 ) -> std::pin::Pin<
419 Box<dyn std::future::Future<Output = Result<(), crate::VectorStoreError>> + Send + '_>,
420 > {
421 let collection = collection.to_owned();
422 Box::pin(async move {
423 let point_ids: Vec<PointId> = ids.into_iter().map(PointId::from).collect();
424 self.delete_by_ids(&collection, point_ids)
425 .await
426 .map_err(|e| crate::VectorStoreError::Delete(e.to_string()))
427 })
428 }
429
430 fn scroll_all(
431 &self,
432 collection: &str,
433 key_field: &str,
434 ) -> std::pin::Pin<
435 Box<
436 dyn std::future::Future<
437 Output = Result<
438 HashMap<String, HashMap<String, String>>,
439 crate::VectorStoreError,
440 >,
441 > + Send
442 + '_,
443 >,
444 > {
445 let collection = collection.to_owned();
446 let key_field = key_field.to_owned();
447 Box::pin(async move {
448 self.scroll_all(&collection, &key_field)
449 .await
450 .map_err(|e| crate::VectorStoreError::Scroll(e.to_string()))
451 })
452 }
453
454 fn health_check(
455 &self,
456 ) -> std::pin::Pin<
457 Box<dyn std::future::Future<Output = Result<bool, crate::VectorStoreError>> + Send + '_>,
458 > {
459 Box::pin(async move {
460 self.client
461 .health_check()
462 .await
463 .map(|_| true)
464 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
465 })
466 }
467}
468
469fn vector_filter_to_qdrant(filter: crate::VectorFilter) -> Filter {
470 let must: Vec<_> = filter
471 .must
472 .into_iter()
473 .map(field_condition_to_qdrant)
474 .collect();
475 let must_not: Vec<_> = filter
476 .must_not
477 .into_iter()
478 .map(field_condition_to_qdrant)
479 .collect();
480
481 let mut f = Filter::default();
482 if !must.is_empty() {
483 f.must = must;
484 }
485 if !must_not.is_empty() {
486 f.must_not = must_not;
487 }
488 f
489}
490
491fn field_condition_to_qdrant(cond: crate::FieldCondition) -> qdrant_client::qdrant::Condition {
492 match cond.value {
493 crate::FieldValue::Integer(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
494 crate::FieldValue::Text(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
495 }
496}
497
498fn scored_point_to_vector(point: ScoredPoint) -> crate::ScoredVectorPoint {
499 let payload: HashMap<String, serde_json::Value> = point
500 .payload
501 .into_iter()
502 .filter_map(|(k, v)| {
503 let json_val = match v.kind? {
504 Kind::StringValue(s) => serde_json::Value::String(s),
505 Kind::IntegerValue(i) => serde_json::Value::Number(i.into()),
506 Kind::DoubleValue(d) => {
507 serde_json::Number::from_f64(d).map(serde_json::Value::Number)?
508 }
509 Kind::BoolValue(b) => serde_json::Value::Bool(b),
510 _ => return None,
511 };
512 Some((k, json_val))
513 })
514 .collect();
515
516 let id = match point.id.and_then(|pid| pid.point_id_options) {
517 Some(qdrant_client::qdrant::point_id::PointIdOptions::Uuid(u)) => u,
518 Some(qdrant_client::qdrant::point_id::PointIdOptions::Num(n)) => n.to_string(),
519 None => String::new(),
520 };
521
522 crate::ScoredVectorPoint {
523 id,
524 score: point.score,
525 payload,
526 }
527}
528
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn new_valid_url() {
        let ops = QdrantOps::new("http://localhost:6334");
        assert!(ops.is_ok());
    }

    #[test]
    fn new_invalid_url() {
        let ops = QdrantOps::new("not a valid url");
        assert!(ops.is_err());
    }

    #[test]
    fn debug_format() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let dbg = format!("{ops:?}");
        assert!(dbg.contains("QdrantOps"));
    }

    #[test]
    fn json_to_payload_valid() {
        let value = serde_json::json!({"key": "value", "num": 42});
        let result = QdrantOps::json_to_payload(value);
        assert!(result.is_ok());
    }

    #[test]
    fn json_to_payload_empty() {
        let result = QdrantOps::json_to_payload(serde_json::json!({}));
        assert!(result.is_ok());
        assert!(result.unwrap().is_empty());
    }

    /// Only checks client construction. The empty-ids short-circuit itself
    /// is exercised by `delete_by_ids_empty_no_network_call`.
    #[test]
    fn delete_by_ids_empty_is_ok_sync() {
        let ops = QdrantOps::new("http://localhost:6334");
        assert!(ops.is_ok());
    }

    /// Scalar payload values and numeric ids convert without a server.
    #[test]
    fn scored_point_to_vector_converts_scalars_and_numeric_id() {
        let mut payload = HashMap::new();
        payload.insert(
            "lang".to_owned(),
            qdrant_client::qdrant::Value {
                kind: Some(Kind::StringValue("rust".to_owned())),
            },
        );
        payload.insert(
            "line".to_owned(),
            qdrant_client::qdrant::Value {
                kind: Some(Kind::IntegerValue(42)),
            },
        );
        let point = ScoredPoint {
            id: Some(PointId::from(7_u64)),
            payload,
            score: 0.5,
            ..Default::default()
        };

        let converted = scored_point_to_vector(point);
        assert_eq!(converted.id, "7");
        assert!((converted.score - 0.5).abs() < f32::EPSILON);
        assert_eq!(converted.payload["lang"], serde_json::json!("rust"));
        assert_eq!(converted.payload["line"], serde_json::json!(42));
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_idempotent() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_quant_idempotent";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
            .await
            .unwrap();

        assert!(ops.collection_exists(collection).await.unwrap());

        ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
            .await
            .unwrap();

        ops.delete_collection(collection).await.unwrap();
    }

    /// `delete_by_ids` returns before issuing any request when `ids` is
    /// empty. Building the client needs no server either (see
    /// `new_valid_url`), so this runs without a live Qdrant.
    #[tokio::test]
    async fn delete_by_ids_empty_no_network_call() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let result = ops.delete_by_ids("nonexistent_collection", vec![]).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_idempotent_same_size() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_ensure_idempotent";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection(collection, 128).await.unwrap();
        assert!(ops.collection_exists(collection).await.unwrap());

        ops.ensure_collection(collection, 128).await.unwrap();
        assert!(ops.collection_exists(collection).await.unwrap());

        ops.delete_collection(collection).await.unwrap();
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_dim_mismatch";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection(collection, 128).await.unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(128)
        );

        ops.ensure_collection(collection, 256).await.unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(256),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(collection).await.unwrap();
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_quant_dim_mismatch";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection_with_quantization(collection, 128, &["language"])
            .await
            .unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(128)
        );

        ops.ensure_collection_with_quantization(collection, 384, &["language"])
            .await
            .unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(384),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(collection).await.unwrap();
    }
}