1use std::collections::HashMap;
7
8use crate::vector_store::BoxFuture;
9use qdrant_client::Qdrant;
10use qdrant_client::qdrant::{
11 CreateCollectionBuilder, DeletePointsBuilder, Distance, Filter, PointId, PointStruct,
12 PointsIdsList, ScoredPoint, ScrollPointsBuilder, SearchPointsBuilder, UpsertPointsBuilder,
13 VectorParamsBuilder, value::Kind,
14};
15
16type QdrantResult<T> = Result<T, Box<qdrant_client::QdrantError>>;
17
18#[derive(Clone)]
20pub struct QdrantOps {
21 client: Qdrant,
22}
23
24impl std::fmt::Debug for QdrantOps {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 f.debug_struct("QdrantOps").finish_non_exhaustive()
27 }
28}
29
30impl QdrantOps {
31 pub fn new(url: &str) -> QdrantResult<Self> {
39 let client = Qdrant::from_url(url).build().map_err(Box::new)?;
40 Ok(Self { client })
41 }
42
43 #[must_use]
45 pub fn client(&self) -> &Qdrant {
46 &self.client
47 }
48
49 pub async fn ensure_collection(&self, collection: &str, vector_size: u64) -> QdrantResult<()> {
58 if self
59 .client
60 .collection_exists(collection)
61 .await
62 .map_err(Box::new)?
63 {
64 let existing_size = self.get_collection_vector_size(collection).await?;
65 if existing_size == Some(vector_size) {
66 return Ok(());
67 }
68 tracing::warn!(
69 collection,
70 existing = ?existing_size,
71 required = vector_size,
72 "vector dimension mismatch — recreating collection (existing data will be lost)"
73 );
74 self.client
75 .delete_collection(collection)
76 .await
77 .map_err(Box::new)?;
78 }
79 self.client
80 .create_collection(
81 CreateCollectionBuilder::new(collection)
82 .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine)),
83 )
84 .await
85 .map_err(Box::new)?;
86 Ok(())
87 }
88
89 async fn get_collection_vector_size(&self, collection: &str) -> QdrantResult<Option<u64>> {
96 let info = self
97 .client
98 .collection_info(collection)
99 .await
100 .map_err(Box::new)?;
101 let size = info
102 .result
103 .and_then(|r| r.config)
104 .and_then(|cfg| cfg.params)
105 .and_then(|params| params.vectors_config)
106 .and_then(|vc| vc.config)
107 .and_then(|cfg| match cfg {
108 qdrant_client::qdrant::vectors_config::Config::Params(vp) => Some(vp.size),
109 qdrant_client::qdrant::vectors_config::Config::ParamsMap(_) => None,
111 });
112 Ok(size)
113 }
114
115 pub async fn collection_exists(&self, collection: &str) -> QdrantResult<bool> {
121 self.client
122 .collection_exists(collection)
123 .await
124 .map_err(Box::new)
125 }
126
127 pub async fn delete_collection(&self, collection: &str) -> QdrantResult<()> {
133 self.client
134 .delete_collection(collection)
135 .await
136 .map_err(Box::new)?;
137 Ok(())
138 }
139
140 pub async fn upsert(&self, collection: &str, points: Vec<PointStruct>) -> QdrantResult<()> {
146 self.client
147 .upsert_points(UpsertPointsBuilder::new(collection, points).wait(true))
148 .await
149 .map_err(Box::new)?;
150 Ok(())
151 }
152
153 pub async fn search(
159 &self,
160 collection: &str,
161 vector: Vec<f32>,
162 limit: u64,
163 filter: Option<Filter>,
164 ) -> QdrantResult<Vec<ScoredPoint>> {
165 let mut builder = SearchPointsBuilder::new(collection, vector, limit).with_payload(true);
166 if let Some(f) = filter {
167 builder = builder.filter(f);
168 }
169 let results = self.client.search_points(builder).await.map_err(Box::new)?;
170 Ok(results.result)
171 }
172
173 pub async fn delete_by_ids(&self, collection: &str, ids: Vec<PointId>) -> QdrantResult<()> {
179 if ids.is_empty() {
180 return Ok(());
181 }
182 self.client
183 .delete_points(
184 DeletePointsBuilder::new(collection)
185 .points(PointsIdsList { ids })
186 .wait(true),
187 )
188 .await
189 .map_err(Box::new)?;
190 Ok(())
191 }
192
193 pub async fn scroll_all(
201 &self,
202 collection: &str,
203 key_field: &str,
204 ) -> QdrantResult<HashMap<String, HashMap<String, String>>> {
205 let mut result = HashMap::new();
206 let mut offset: Option<PointId> = None;
207
208 loop {
209 let mut builder = ScrollPointsBuilder::new(collection)
210 .with_payload(true)
211 .with_vectors(false)
212 .limit(100);
213
214 if let Some(ref off) = offset {
215 builder = builder.offset(off.clone());
216 }
217
218 let response = self.client.scroll(builder).await.map_err(Box::new)?;
219
220 for point in &response.result {
221 let Some(key_val) = point.payload.get(key_field) else {
222 continue;
223 };
224 let Some(Kind::StringValue(key)) = &key_val.kind else {
225 continue;
226 };
227
228 let mut fields = HashMap::new();
229 for (k, val) in &point.payload {
230 if let Some(Kind::StringValue(s)) = &val.kind {
231 fields.insert(k.clone(), s.clone());
232 }
233 }
234 result.insert(key.clone(), fields);
235 }
236
237 match response.next_page_offset {
238 Some(next) => offset = Some(next),
239 None => break,
240 }
241 }
242
243 Ok(result)
244 }
245
246 pub async fn ensure_collection_with_quantization(
256 &self,
257 collection: &str,
258 vector_size: u64,
259 keyword_fields: &[&str],
260 ) -> Result<(), crate::VectorStoreError> {
261 use qdrant_client::qdrant::{
262 CreateFieldIndexCollectionBuilder, FieldType, ScalarQuantizationBuilder,
263 };
264 if self
265 .client
266 .collection_exists(collection)
267 .await
268 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?
269 {
270 let existing_size = self
271 .get_collection_vector_size(collection)
272 .await
273 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
274 if existing_size == Some(vector_size) {
275 return Ok(());
276 }
277 tracing::warn!(
278 collection,
279 existing = ?existing_size,
280 required = vector_size,
281 "vector dimension mismatch — recreating collection (existing data will be lost)"
282 );
283 self.client
284 .delete_collection(collection)
285 .await
286 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
287 }
288 self.client
289 .create_collection(
290 CreateCollectionBuilder::new(collection)
291 .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine))
292 .quantization_config(ScalarQuantizationBuilder::default()),
293 )
294 .await
295 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
296
297 for field in keyword_fields {
298 self.client
299 .create_field_index(CreateFieldIndexCollectionBuilder::new(
300 collection,
301 *field,
302 FieldType::Keyword,
303 ))
304 .await
305 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
306 }
307 Ok(())
308 }
309
310 pub fn json_to_payload(
316 value: serde_json::Value,
317 ) -> Result<HashMap<String, qdrant_client::qdrant::Value>, serde_json::Error> {
318 serde_json::from_value(value)
319 }
320}
321
322impl crate::vector_store::VectorStore for QdrantOps {
323 fn ensure_collection(
324 &self,
325 collection: &str,
326 vector_size: u64,
327 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
328 let collection = collection.to_owned();
329 Box::pin(async move {
330 self.ensure_collection(&collection, vector_size)
331 .await
332 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
333 })
334 }
335
336 fn collection_exists(
337 &self,
338 collection: &str,
339 ) -> BoxFuture<'_, Result<bool, crate::VectorStoreError>> {
340 let collection = collection.to_owned();
341 Box::pin(async move {
342 self.collection_exists(&collection)
343 .await
344 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
345 })
346 }
347
348 fn delete_collection(
349 &self,
350 collection: &str,
351 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
352 let collection = collection.to_owned();
353 Box::pin(async move {
354 self.delete_collection(&collection)
355 .await
356 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
357 })
358 }
359
360 fn upsert(
361 &self,
362 collection: &str,
363 points: Vec<crate::VectorPoint>,
364 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
365 let collection = collection.to_owned();
366 Box::pin(async move {
367 let qdrant_points: Vec<PointStruct> = points
368 .into_iter()
369 .map(|p| {
370 let payload: HashMap<String, qdrant_client::qdrant::Value> =
371 serde_json::from_value(serde_json::Value::Object(
372 p.payload.into_iter().collect(),
373 ))
374 .unwrap_or_default();
375 PointStruct::new(p.id, p.vector, payload)
376 })
377 .collect();
378 self.upsert(&collection, qdrant_points)
379 .await
380 .map_err(|e| crate::VectorStoreError::Upsert(e.to_string()))
381 })
382 }
383
384 fn search(
385 &self,
386 collection: &str,
387 vector: Vec<f32>,
388 limit: u64,
389 filter: Option<crate::VectorFilter>,
390 ) -> BoxFuture<'_, Result<Vec<crate::ScoredVectorPoint>, crate::VectorStoreError>> {
391 let collection = collection.to_owned();
392 Box::pin(async move {
393 let qdrant_filter = filter.map(vector_filter_to_qdrant);
394 let results = self
395 .search(&collection, vector, limit, qdrant_filter)
396 .await
397 .map_err(|e| crate::VectorStoreError::Search(e.to_string()))?;
398 Ok(results.into_iter().map(scored_point_to_vector).collect())
399 })
400 }
401
402 fn delete_by_ids(
403 &self,
404 collection: &str,
405 ids: Vec<String>,
406 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
407 let collection = collection.to_owned();
408 Box::pin(async move {
409 let point_ids: Vec<PointId> = ids.into_iter().map(PointId::from).collect();
410 self.delete_by_ids(&collection, point_ids)
411 .await
412 .map_err(|e| crate::VectorStoreError::Delete(e.to_string()))
413 })
414 }
415
416 fn scroll_all(
417 &self,
418 collection: &str,
419 key_field: &str,
420 ) -> BoxFuture<'_, Result<HashMap<String, HashMap<String, String>>, crate::VectorStoreError>>
421 {
422 let collection = collection.to_owned();
423 let key_field = key_field.to_owned();
424 Box::pin(async move {
425 self.scroll_all(&collection, &key_field)
426 .await
427 .map_err(|e| crate::VectorStoreError::Scroll(e.to_string()))
428 })
429 }
430
431 fn health_check(&self) -> BoxFuture<'_, Result<bool, crate::VectorStoreError>> {
432 Box::pin(async move {
433 self.client
434 .health_check()
435 .await
436 .map(|_| true)
437 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
438 })
439 }
440
441 fn create_keyword_indexes(
442 &self,
443 collection: &str,
444 fields: &[&str],
445 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
446 use qdrant_client::qdrant::{CreateFieldIndexCollectionBuilder, FieldType};
447 let collection = collection.to_owned();
448 let fields: Vec<String> = fields.iter().map(|f| (*f).to_owned()).collect();
449 Box::pin(async move {
450 for field in &fields {
451 self.client
452 .create_field_index(CreateFieldIndexCollectionBuilder::new(
453 &collection,
454 field.as_str(),
455 FieldType::Keyword,
456 ))
457 .await
458 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
459 }
460 Ok(())
461 })
462 }
463}
464
465fn vector_filter_to_qdrant(filter: crate::VectorFilter) -> Filter {
466 let must: Vec<_> = filter
467 .must
468 .into_iter()
469 .map(field_condition_to_qdrant)
470 .collect();
471 let must_not: Vec<_> = filter
472 .must_not
473 .into_iter()
474 .map(field_condition_to_qdrant)
475 .collect();
476
477 let mut f = Filter::default();
478 if !must.is_empty() {
479 f.must = must;
480 }
481 if !must_not.is_empty() {
482 f.must_not = must_not;
483 }
484 f
485}
486
487fn field_condition_to_qdrant(cond: crate::FieldCondition) -> qdrant_client::qdrant::Condition {
488 match cond.value {
489 crate::FieldValue::Integer(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
490 crate::FieldValue::Text(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
491 }
492}
493
494fn scored_point_to_vector(point: ScoredPoint) -> crate::ScoredVectorPoint {
495 let payload: HashMap<String, serde_json::Value> = point
496 .payload
497 .into_iter()
498 .filter_map(|(k, v)| {
499 let json_val = match v.kind? {
500 Kind::StringValue(s) => serde_json::Value::String(s),
501 Kind::IntegerValue(i) => serde_json::Value::Number(i.into()),
502 Kind::DoubleValue(d) => {
503 serde_json::Number::from_f64(d).map(serde_json::Value::Number)?
504 }
505 Kind::BoolValue(b) => serde_json::Value::Bool(b),
506 _ => return None,
507 };
508 Some((k, json_val))
509 })
510 .collect();
511
512 let id = match point.id.and_then(|pid| pid.point_id_options) {
513 Some(qdrant_client::qdrant::point_id::PointIdOptions::Uuid(u)) => u,
514 Some(qdrant_client::qdrant::point_id::PointIdOptions::Num(n)) => n.to_string(),
515 None => String::new(),
516 };
517
518 crate::ScoredVectorPoint {
519 id,
520 score: point.score,
521 payload,
522 }
523}
524
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn new_valid_url() {
        let ops = QdrantOps::new("http://localhost:6334");
        assert!(ops.is_ok());
    }

    #[test]
    fn new_invalid_url() {
        let ops = QdrantOps::new("not a valid url");
        assert!(ops.is_err());
    }

    #[test]
    fn debug_format() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let dbg = format!("{ops:?}");
        assert!(dbg.contains("QdrantOps"));
    }

    #[test]
    fn json_to_payload_valid() {
        let value = serde_json::json!({"key": "value", "num": 42});
        let payload = QdrantOps::json_to_payload(value).expect("a JSON object must convert");
        assert_eq!(payload.len(), 2);
        assert!(payload.contains_key("num"));
        assert!(matches!(
            &payload["key"].kind,
            Some(Kind::StringValue(s)) if s == "value"
        ));
    }

    #[test]
    fn json_to_payload_empty() {
        let result = QdrantOps::json_to_payload(serde_json::json!({}));
        assert!(result.is_ok());
        assert!(result.unwrap().is_empty());
    }

    #[test]
    fn json_to_payload_rejects_non_object() {
        // A payload is a map, so any non-object JSON document must be rejected.
        assert!(QdrantOps::json_to_payload(serde_json::json!([1, 2, 3])).is_err());
        assert!(QdrantOps::json_to_payload(serde_json::json!("plain string")).is_err());
    }

    #[test]
    fn delete_by_ids_empty_is_ok_sync() {
        // Only checks client construction synchronously; the async empty-ids path is
        // exercised by `delete_by_ids_empty_no_network_call`.
        let ops = QdrantOps::new("http://localhost:6334");
        assert!(ops.is_ok());
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_idempotent() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_quant_idempotent";

        // Start from a clean slate; the collection may not exist yet.
        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
            .await
            .unwrap();

        assert!(ops.collection_exists(collection).await.unwrap());

        // A second call with the same dimension must succeed without recreating.
        ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
            .await
            .unwrap();

        ops.delete_collection(collection).await.unwrap();
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn delete_by_ids_empty_no_network_call() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        // An empty id list returns early, so even a missing collection is fine.
        let result = ops.delete_by_ids("nonexistent_collection", vec![]).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_idempotent_same_size() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_ensure_idempotent";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection(collection, 128).await.unwrap();
        assert!(ops.collection_exists(collection).await.unwrap());

        ops.ensure_collection(collection, 128).await.unwrap();
        assert!(ops.collection_exists(collection).await.unwrap());

        ops.delete_collection(collection).await.unwrap();
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_dim_mismatch";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection(collection, 128).await.unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(128)
        );

        ops.ensure_collection(collection, 256).await.unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(256),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(collection).await.unwrap();
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_quant_dim_mismatch";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection_with_quantization(collection, 128, &["language"])
            .await
            .unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(128)
        );

        ops.ensure_collection_with_quantization(collection, 384, &["language"])
            .await
            .unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(384),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(collection).await.unwrap();
    }
}