1use std::collections::HashMap;
7
8use crate::vector_store::BoxFuture;
9use qdrant_client::Qdrant;
10use qdrant_client::qdrant::{
11 CreateCollectionBuilder, DeletePointsBuilder, Distance, Filter, PointId, PointStruct,
12 PointsIdsList, ScoredPoint, ScrollPointsBuilder, SearchPointsBuilder, UpsertPointsBuilder,
13 VectorParamsBuilder, value::Kind,
14};
15
/// Result type returned by the inherent [`QdrantOps`] methods.
///
/// The error is boxed — presumably to keep the `Result` small (clippy's
/// `result_large_err`); NOTE(review): confirm that is the intent.
type QdrantResult<T> = Result<T, Box<qdrant_client::QdrantError>>;
17
/// Thin wrapper around a [`Qdrant`] client.
///
/// It exposes the collection, point, search and scroll operations used by this crate and
/// implements [`VectorStore`](crate::vector_store::VectorStore).
///
/// `Clone` simply clones the inner client. NOTE(review): whether clones share the
/// underlying connection depends on `qdrant_client` — confirm before relying on it.
#[derive(Clone)]
pub struct QdrantOps {
    client: Qdrant,
}
23
24impl std::fmt::Debug for QdrantOps {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 f.debug_struct("QdrantOps").finish_non_exhaustive()
27 }
28}
29
30impl QdrantOps {
31 pub fn new(url: &str) -> QdrantResult<Self> {
39 let client = Qdrant::from_url(url).build().map_err(Box::new)?;
40 Ok(Self { client })
41 }
42
43 #[must_use]
45 pub fn client(&self) -> &Qdrant {
46 &self.client
47 }
48
49 pub async fn ensure_collection(&self, collection: &str, vector_size: u64) -> QdrantResult<()> {
58 if self
59 .client
60 .collection_exists(collection)
61 .await
62 .map_err(Box::new)?
63 {
64 let existing_size = self.get_collection_vector_size(collection).await?;
65 if existing_size == Some(vector_size) {
66 return Ok(());
67 }
68 tracing::warn!(
69 collection,
70 existing = ?existing_size,
71 required = vector_size,
72 "vector dimension mismatch — recreating collection (existing data will be lost)"
73 );
74 self.client
75 .delete_collection(collection)
76 .await
77 .map_err(Box::new)?;
78 }
79 self.client
80 .create_collection(
81 CreateCollectionBuilder::new(collection)
82 .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine)),
83 )
84 .await
85 .map_err(Box::new)?;
86 Ok(())
87 }
88
89 async fn get_collection_vector_size(&self, collection: &str) -> QdrantResult<Option<u64>> {
96 let info = self
97 .client
98 .collection_info(collection)
99 .await
100 .map_err(Box::new)?;
101 let size = info
102 .result
103 .and_then(|r| r.config)
104 .and_then(|cfg| cfg.params)
105 .and_then(|params| params.vectors_config)
106 .and_then(|vc| vc.config)
107 .and_then(|cfg| match cfg {
108 qdrant_client::qdrant::vectors_config::Config::Params(vp) => Some(vp.size),
109 qdrant_client::qdrant::vectors_config::Config::ParamsMap(_) => None,
111 });
112 Ok(size)
113 }
114
115 pub async fn collection_exists(&self, collection: &str) -> QdrantResult<bool> {
121 self.client
122 .collection_exists(collection)
123 .await
124 .map_err(Box::new)
125 }
126
127 pub async fn delete_collection(&self, collection: &str) -> QdrantResult<()> {
133 self.client
134 .delete_collection(collection)
135 .await
136 .map_err(Box::new)?;
137 Ok(())
138 }
139
140 pub async fn upsert(&self, collection: &str, points: Vec<PointStruct>) -> QdrantResult<()> {
146 self.client
149 .upsert_points(UpsertPointsBuilder::new(collection, points).wait(false))
150 .await
151 .map_err(Box::new)?;
152 Ok(())
153 }
154
155 pub async fn search(
161 &self,
162 collection: &str,
163 vector: Vec<f32>,
164 limit: u64,
165 filter: Option<Filter>,
166 ) -> QdrantResult<Vec<ScoredPoint>> {
167 let mut builder = SearchPointsBuilder::new(collection, vector, limit).with_payload(true);
168 if let Some(f) = filter {
169 builder = builder.filter(f);
170 }
171 let results = self.client.search_points(builder).await.map_err(Box::new)?;
172 Ok(results.result)
173 }
174
175 pub async fn delete_by_ids(&self, collection: &str, ids: Vec<PointId>) -> QdrantResult<()> {
181 if ids.is_empty() {
182 return Ok(());
183 }
184 self.client
185 .delete_points(
186 DeletePointsBuilder::new(collection)
187 .points(PointsIdsList { ids })
188 .wait(true),
189 )
190 .await
191 .map_err(Box::new)?;
192 Ok(())
193 }
194
195 pub async fn scroll_all(
203 &self,
204 collection: &str,
205 key_field: &str,
206 ) -> QdrantResult<HashMap<String, HashMap<String, String>>> {
207 let mut result = HashMap::new();
208 let mut offset: Option<PointId> = None;
209
210 loop {
211 let mut builder = ScrollPointsBuilder::new(collection)
212 .with_payload(true)
213 .with_vectors(false)
214 .limit(100);
215
216 if let Some(ref off) = offset {
217 builder = builder.offset(off.clone());
218 }
219
220 let response = self.client.scroll(builder).await.map_err(Box::new)?;
221
222 for point in &response.result {
223 let Some(key_val) = point.payload.get(key_field) else {
224 continue;
225 };
226 let Some(Kind::StringValue(key)) = &key_val.kind else {
227 continue;
228 };
229
230 let mut fields = HashMap::new();
231 for (k, val) in &point.payload {
232 if let Some(Kind::StringValue(s)) = &val.kind {
233 fields.insert(k.clone(), s.clone());
234 }
235 }
236 result.insert(key.clone(), fields);
237 }
238
239 match response.next_page_offset {
240 Some(next) => offset = Some(next),
241 None => break,
242 }
243 }
244
245 Ok(result)
246 }
247
248 pub async fn ensure_collection_with_quantization(
258 &self,
259 collection: &str,
260 vector_size: u64,
261 keyword_fields: &[&str],
262 ) -> Result<(), crate::VectorStoreError> {
263 use qdrant_client::qdrant::{
264 CreateFieldIndexCollectionBuilder, FieldType, ScalarQuantizationBuilder,
265 };
266 if self
267 .client
268 .collection_exists(collection)
269 .await
270 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?
271 {
272 let existing_size = self
273 .get_collection_vector_size(collection)
274 .await
275 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
276 if existing_size == Some(vector_size) {
277 return Ok(());
278 }
279 tracing::warn!(
280 collection,
281 existing = ?existing_size,
282 required = vector_size,
283 "vector dimension mismatch — recreating collection (existing data will be lost)"
284 );
285 self.client
286 .delete_collection(collection)
287 .await
288 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
289 }
290 self.client
291 .create_collection(
292 CreateCollectionBuilder::new(collection)
293 .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine))
294 .quantization_config(ScalarQuantizationBuilder::default()),
295 )
296 .await
297 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
298
299 for field in keyword_fields {
300 self.client
301 .create_field_index(CreateFieldIndexCollectionBuilder::new(
302 collection,
303 *field,
304 FieldType::Keyword,
305 ))
306 .await
307 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
308 }
309 Ok(())
310 }
311
312 pub fn json_to_payload(
318 value: serde_json::Value,
319 ) -> Result<HashMap<String, qdrant_client::qdrant::Value>, serde_json::Error> {
320 serde_json::from_value(value)
321 }
322}
323
324impl crate::vector_store::VectorStore for QdrantOps {
325 fn ensure_collection(
326 &self,
327 collection: &str,
328 vector_size: u64,
329 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
330 let collection = collection.to_owned();
331 Box::pin(async move {
332 self.ensure_collection(&collection, vector_size)
333 .await
334 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
335 })
336 }
337
338 fn collection_exists(
339 &self,
340 collection: &str,
341 ) -> BoxFuture<'_, Result<bool, crate::VectorStoreError>> {
342 let collection = collection.to_owned();
343 Box::pin(async move {
344 self.collection_exists(&collection)
345 .await
346 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
347 })
348 }
349
350 fn delete_collection(
351 &self,
352 collection: &str,
353 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
354 let collection = collection.to_owned();
355 Box::pin(async move {
356 self.delete_collection(&collection)
357 .await
358 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
359 })
360 }
361
362 fn upsert(
363 &self,
364 collection: &str,
365 points: Vec<crate::VectorPoint>,
366 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
367 let collection = collection.to_owned();
368 Box::pin(async move {
369 let qdrant_points: Vec<PointStruct> = points
370 .into_iter()
371 .map(|p| {
372 let payload: HashMap<String, qdrant_client::qdrant::Value> =
373 serde_json::from_value(serde_json::Value::Object(
374 p.payload.into_iter().collect(),
375 ))
376 .unwrap_or_default();
377 PointStruct::new(p.id, p.vector, payload)
378 })
379 .collect();
380 self.upsert(&collection, qdrant_points)
381 .await
382 .map_err(|e| crate::VectorStoreError::Upsert(e.to_string()))
383 })
384 }
385
386 fn search(
387 &self,
388 collection: &str,
389 vector: Vec<f32>,
390 limit: u64,
391 filter: Option<crate::VectorFilter>,
392 ) -> BoxFuture<'_, Result<Vec<crate::ScoredVectorPoint>, crate::VectorStoreError>> {
393 let collection = collection.to_owned();
394 Box::pin(async move {
395 let qdrant_filter = filter.map(vector_filter_to_qdrant);
396 let results = self
397 .search(&collection, vector, limit, qdrant_filter)
398 .await
399 .map_err(|e| crate::VectorStoreError::Search(e.to_string()))?;
400 Ok(results.into_iter().map(scored_point_to_vector).collect())
401 })
402 }
403
404 fn delete_by_ids(
405 &self,
406 collection: &str,
407 ids: Vec<String>,
408 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
409 let collection = collection.to_owned();
410 Box::pin(async move {
411 let point_ids: Vec<PointId> = ids.into_iter().map(PointId::from).collect();
412 self.delete_by_ids(&collection, point_ids)
413 .await
414 .map_err(|e| crate::VectorStoreError::Delete(e.to_string()))
415 })
416 }
417
418 fn scroll_all(
419 &self,
420 collection: &str,
421 key_field: &str,
422 ) -> BoxFuture<'_, Result<HashMap<String, HashMap<String, String>>, crate::VectorStoreError>>
423 {
424 let collection = collection.to_owned();
425 let key_field = key_field.to_owned();
426 Box::pin(async move {
427 self.scroll_all(&collection, &key_field)
428 .await
429 .map_err(|e| crate::VectorStoreError::Scroll(e.to_string()))
430 })
431 }
432
433 fn health_check(&self) -> BoxFuture<'_, Result<bool, crate::VectorStoreError>> {
434 Box::pin(async move {
435 self.client
436 .health_check()
437 .await
438 .map(|_| true)
439 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
440 })
441 }
442}
443
444fn vector_filter_to_qdrant(filter: crate::VectorFilter) -> Filter {
445 let must: Vec<_> = filter
446 .must
447 .into_iter()
448 .map(field_condition_to_qdrant)
449 .collect();
450 let must_not: Vec<_> = filter
451 .must_not
452 .into_iter()
453 .map(field_condition_to_qdrant)
454 .collect();
455
456 let mut f = Filter::default();
457 if !must.is_empty() {
458 f.must = must;
459 }
460 if !must_not.is_empty() {
461 f.must_not = must_not;
462 }
463 f
464}
465
466fn field_condition_to_qdrant(cond: crate::FieldCondition) -> qdrant_client::qdrant::Condition {
467 match cond.value {
468 crate::FieldValue::Integer(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
469 crate::FieldValue::Text(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
470 }
471}
472
473fn scored_point_to_vector(point: ScoredPoint) -> crate::ScoredVectorPoint {
474 let payload: HashMap<String, serde_json::Value> = point
475 .payload
476 .into_iter()
477 .filter_map(|(k, v)| {
478 let json_val = match v.kind? {
479 Kind::StringValue(s) => serde_json::Value::String(s),
480 Kind::IntegerValue(i) => serde_json::Value::Number(i.into()),
481 Kind::DoubleValue(d) => {
482 serde_json::Number::from_f64(d).map(serde_json::Value::Number)?
483 }
484 Kind::BoolValue(b) => serde_json::Value::Bool(b),
485 _ => return None,
486 };
487 Some((k, json_val))
488 })
489 .collect();
490
491 let id = match point.id.and_then(|pid| pid.point_id_options) {
492 Some(qdrant_client::qdrant::point_id::PointIdOptions::Uuid(u)) => u,
493 Some(qdrant_client::qdrant::point_id::PointIdOptions::Num(n)) => n.to_string(),
494 None => String::new(),
495 };
496
497 crate::ScoredVectorPoint {
498 id,
499 score: point.score,
500 payload,
501 }
502}
503
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn new_valid_url() {
        let ops = QdrantOps::new("http://localhost:6334");
        assert!(ops.is_ok());
    }

    #[test]
    fn new_invalid_url() {
        let ops = QdrantOps::new("not a valid url");
        assert!(ops.is_err());
    }

    #[test]
    fn debug_format() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let dbg = format!("{ops:?}");
        assert!(dbg.contains("QdrantOps"));
    }

    #[test]
    fn json_to_payload_valid() {
        let value = serde_json::json!({"key": "value", "num": 42});
        let result = QdrantOps::json_to_payload(value);
        assert!(result.is_ok());
    }

    #[test]
    fn json_to_payload_empty() {
        let result = QdrantOps::json_to_payload(serde_json::json!({}));
        assert!(result.is_ok());
        assert!(result.unwrap().is_empty());
    }

    // Only checks that a client can be built. The empty-ids short-circuit itself is
    // exercised by `delete_by_ids_empty_no_network_call`.
    #[test]
    fn delete_by_ids_empty_is_ok_sync() {
        let ops = QdrantOps::new("http://localhost:6334");
        assert!(ops.is_ok());
    }

    // `delete_by_ids` returns before issuing any request when `ids` is empty, so this
    // needs no live Qdrant instance and runs by default.
    #[tokio::test]
    async fn delete_by_ids_empty_no_network_call() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let result = ops.delete_by_ids("nonexistent_collection", vec![]).await;
        assert!(result.is_ok());
    }

    #[test]
    fn scored_point_to_vector_converts_numeric_id_and_scalars() {
        let mut payload = HashMap::new();
        payload.insert(
            "lang".to_owned(),
            qdrant_client::qdrant::Value {
                kind: Some(Kind::StringValue("rust".to_owned())),
            },
        );
        payload.insert(
            "line".to_owned(),
            qdrant_client::qdrant::Value {
                kind: Some(Kind::IntegerValue(7)),
            },
        );
        let point = ScoredPoint {
            id: Some(PointId::from(42u64)),
            score: 0.5,
            payload,
            ..Default::default()
        };

        let converted = scored_point_to_vector(point);
        assert_eq!(converted.id, "42");
        assert!((converted.score - 0.5).abs() < f32::EPSILON);
        assert_eq!(
            converted.payload.get("lang"),
            Some(&serde_json::json!("rust"))
        );
        assert_eq!(converted.payload.get("line"), Some(&serde_json::json!(7)));
    }

    #[test]
    fn scored_point_to_vector_uuid_and_missing_id() {
        let uuid = "550e8400-e29b-41d4-a716-446655440000";
        let with_uuid = scored_point_to_vector(ScoredPoint {
            id: Some(PointId::from(uuid.to_owned())),
            ..Default::default()
        });
        assert_eq!(with_uuid.id, uuid);

        let without_id = scored_point_to_vector(ScoredPoint::default());
        assert!(without_id.id.is_empty());
        assert!(without_id.payload.is_empty());
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_idempotent() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_quant_idempotent";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
            .await
            .unwrap();

        assert!(ops.collection_exists(collection).await.unwrap());

        ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
            .await
            .unwrap();

        ops.delete_collection(collection).await.unwrap();
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_idempotent_same_size() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_ensure_idempotent";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection(collection, 128).await.unwrap();
        assert!(ops.collection_exists(collection).await.unwrap());

        ops.ensure_collection(collection, 128).await.unwrap();
        assert!(ops.collection_exists(collection).await.unwrap());

        ops.delete_collection(collection).await.unwrap();
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_dim_mismatch";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection(collection, 128).await.unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(128)
        );

        ops.ensure_collection(collection, 256).await.unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(256),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(collection).await.unwrap();
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_quant_dim_mismatch";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection_with_quantization(collection, 128, &["language"])
            .await
            .unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(128)
        );

        ops.ensure_collection_with_quantization(collection, 384, &["language"])
            .await
            .unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(384),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(collection).await.unwrap();
    }
}