1use std::collections::HashMap;
11
12use crate::vector_store::BoxFuture;
13use qdrant_client::Qdrant;
14use qdrant_client::qdrant::{
15 CreateCollectionBuilder, DeletePointsBuilder, Distance, Filter, PointId, PointStruct,
16 PointsIdsList, ScoredPoint, ScrollPointsBuilder, SearchPointsBuilder, UpsertPointsBuilder,
17 VectorParamsBuilder, value::Kind,
18};
19
20type QdrantResult<T> = Result<T, Box<qdrant_client::QdrantError>>;
21
22#[derive(Clone)]
24pub struct QdrantOps {
25 client: Qdrant,
26}
27
28impl std::fmt::Debug for QdrantOps {
29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30 f.debug_struct("QdrantOps").finish_non_exhaustive()
31 }
32}
33
34impl QdrantOps {
35 pub fn new(url: &str) -> QdrantResult<Self> {
43 let client = Qdrant::from_url(url).build().map_err(Box::new)?;
44 Ok(Self { client })
45 }
46
47 #[must_use]
49 pub fn client(&self) -> &Qdrant {
50 &self.client
51 }
52
53 pub async fn ensure_collection(&self, collection: &str, vector_size: u64) -> QdrantResult<()> {
62 if self
63 .client
64 .collection_exists(collection)
65 .await
66 .map_err(Box::new)?
67 {
68 let existing_size = self.get_collection_vector_size(collection).await?;
69 if existing_size == Some(vector_size) {
70 return Ok(());
71 }
72 tracing::warn!(
73 collection,
74 existing = ?existing_size,
75 required = vector_size,
76 "vector dimension mismatch — recreating collection (existing data will be lost)"
77 );
78 self.client
79 .delete_collection(collection)
80 .await
81 .map_err(Box::new)?;
82 }
83 self.client
84 .create_collection(
85 CreateCollectionBuilder::new(collection)
86 .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine)),
87 )
88 .await
89 .map_err(Box::new)?;
90 Ok(())
91 }
92
93 async fn get_collection_vector_size(&self, collection: &str) -> QdrantResult<Option<u64>> {
100 let info = self
101 .client
102 .collection_info(collection)
103 .await
104 .map_err(Box::new)?;
105 let size = info
106 .result
107 .and_then(|r| r.config)
108 .and_then(|cfg| cfg.params)
109 .and_then(|params| params.vectors_config)
110 .and_then(|vc| vc.config)
111 .and_then(|cfg| match cfg {
112 qdrant_client::qdrant::vectors_config::Config::Params(vp) => Some(vp.size),
113 qdrant_client::qdrant::vectors_config::Config::ParamsMap(_) => None,
115 });
116 Ok(size)
117 }
118
119 pub async fn collection_exists(&self, collection: &str) -> QdrantResult<bool> {
125 self.client
126 .collection_exists(collection)
127 .await
128 .map_err(Box::new)
129 }
130
131 pub async fn delete_collection(&self, collection: &str) -> QdrantResult<()> {
137 self.client
138 .delete_collection(collection)
139 .await
140 .map_err(Box::new)?;
141 Ok(())
142 }
143
144 pub async fn upsert(&self, collection: &str, points: Vec<PointStruct>) -> QdrantResult<()> {
150 self.client
151 .upsert_points(UpsertPointsBuilder::new(collection, points).wait(true))
152 .await
153 .map_err(Box::new)?;
154 Ok(())
155 }
156
157 pub async fn search(
163 &self,
164 collection: &str,
165 vector: Vec<f32>,
166 limit: u64,
167 filter: Option<Filter>,
168 ) -> QdrantResult<Vec<ScoredPoint>> {
169 let mut builder = SearchPointsBuilder::new(collection, vector, limit).with_payload(true);
170 if let Some(f) = filter {
171 builder = builder.filter(f);
172 }
173 let results = self.client.search_points(builder).await.map_err(Box::new)?;
174 Ok(results.result)
175 }
176
177 pub async fn delete_by_ids(&self, collection: &str, ids: Vec<PointId>) -> QdrantResult<()> {
183 if ids.is_empty() {
184 return Ok(());
185 }
186 self.client
187 .delete_points(
188 DeletePointsBuilder::new(collection)
189 .points(PointsIdsList { ids })
190 .wait(true),
191 )
192 .await
193 .map_err(Box::new)?;
194 Ok(())
195 }
196
197 pub async fn scroll_all(
205 &self,
206 collection: &str,
207 key_field: &str,
208 ) -> QdrantResult<HashMap<String, HashMap<String, String>>> {
209 let mut result = HashMap::new();
210 let mut offset: Option<PointId> = None;
211
212 loop {
213 let mut builder = ScrollPointsBuilder::new(collection)
214 .with_payload(true)
215 .with_vectors(false)
216 .limit(100);
217
218 if let Some(ref off) = offset {
219 builder = builder.offset(off.clone());
220 }
221
222 let response = self.client.scroll(builder).await.map_err(Box::new)?;
223
224 for point in &response.result {
225 let Some(key_val) = point.payload.get(key_field) else {
226 continue;
227 };
228 let Some(Kind::StringValue(key)) = &key_val.kind else {
229 continue;
230 };
231
232 let mut fields = HashMap::new();
233 for (k, val) in &point.payload {
234 if let Some(Kind::StringValue(s)) = &val.kind {
235 fields.insert(k.clone(), s.clone());
236 }
237 }
238 result.insert(key.clone(), fields);
239 }
240
241 match response.next_page_offset {
242 Some(next) => offset = Some(next),
243 None => break,
244 }
245 }
246
247 Ok(result)
248 }
249
250 pub async fn ensure_collection_with_quantization(
260 &self,
261 collection: &str,
262 vector_size: u64,
263 keyword_fields: &[&str],
264 ) -> Result<(), crate::VectorStoreError> {
265 use qdrant_client::qdrant::{
266 CreateFieldIndexCollectionBuilder, FieldType, ScalarQuantizationBuilder,
267 };
268 if self
269 .client
270 .collection_exists(collection)
271 .await
272 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?
273 {
274 let existing_size = self
275 .get_collection_vector_size(collection)
276 .await
277 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
278 if existing_size == Some(vector_size) {
279 return Ok(());
280 }
281 tracing::warn!(
282 collection,
283 existing = ?existing_size,
284 required = vector_size,
285 "vector dimension mismatch — recreating collection (existing data will be lost)"
286 );
287 self.client
288 .delete_collection(collection)
289 .await
290 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
291 }
292 self.client
293 .create_collection(
294 CreateCollectionBuilder::new(collection)
295 .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine))
296 .quantization_config(ScalarQuantizationBuilder::default()),
297 )
298 .await
299 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
300
301 for field in keyword_fields {
302 self.client
303 .create_field_index(CreateFieldIndexCollectionBuilder::new(
304 collection,
305 *field,
306 FieldType::Keyword,
307 ))
308 .await
309 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
310 }
311 Ok(())
312 }
313
314 pub fn json_to_payload(
320 value: serde_json::Value,
321 ) -> Result<HashMap<String, qdrant_client::qdrant::Value>, serde_json::Error> {
322 serde_json::from_value(value)
323 }
324}
325
326impl crate::vector_store::VectorStore for QdrantOps {
327 fn ensure_collection(
328 &self,
329 collection: &str,
330 vector_size: u64,
331 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
332 let collection = collection.to_owned();
333 Box::pin(async move {
334 self.ensure_collection(&collection, vector_size)
335 .await
336 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
337 })
338 }
339
340 fn collection_exists(
341 &self,
342 collection: &str,
343 ) -> BoxFuture<'_, Result<bool, crate::VectorStoreError>> {
344 let collection = collection.to_owned();
345 Box::pin(async move {
346 self.collection_exists(&collection)
347 .await
348 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
349 })
350 }
351
352 fn delete_collection(
353 &self,
354 collection: &str,
355 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
356 let collection = collection.to_owned();
357 Box::pin(async move {
358 self.delete_collection(&collection)
359 .await
360 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
361 })
362 }
363
364 fn upsert(
365 &self,
366 collection: &str,
367 points: Vec<crate::VectorPoint>,
368 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
369 let collection = collection.to_owned();
370 Box::pin(async move {
371 let qdrant_points: Vec<PointStruct> = points
372 .into_iter()
373 .map(|p| {
374 let payload: HashMap<String, qdrant_client::qdrant::Value> =
375 serde_json::from_value(serde_json::Value::Object(
376 p.payload.into_iter().collect(),
377 ))
378 .unwrap_or_default();
379 PointStruct::new(p.id, p.vector, payload)
380 })
381 .collect();
382 self.upsert(&collection, qdrant_points)
383 .await
384 .map_err(|e| crate::VectorStoreError::Upsert(e.to_string()))
385 })
386 }
387
388 fn search(
389 &self,
390 collection: &str,
391 vector: Vec<f32>,
392 limit: u64,
393 filter: Option<crate::VectorFilter>,
394 ) -> BoxFuture<'_, Result<Vec<crate::ScoredVectorPoint>, crate::VectorStoreError>> {
395 let collection = collection.to_owned();
396 Box::pin(async move {
397 let qdrant_filter = filter.map(vector_filter_to_qdrant);
398 let results = self
399 .search(&collection, vector, limit, qdrant_filter)
400 .await
401 .map_err(|e| crate::VectorStoreError::Search(e.to_string()))?;
402 Ok(results.into_iter().map(scored_point_to_vector).collect())
403 })
404 }
405
406 fn delete_by_ids(
407 &self,
408 collection: &str,
409 ids: Vec<String>,
410 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
411 let collection = collection.to_owned();
412 Box::pin(async move {
413 let point_ids: Vec<PointId> = ids.into_iter().map(PointId::from).collect();
414 self.delete_by_ids(&collection, point_ids)
415 .await
416 .map_err(|e| crate::VectorStoreError::Delete(e.to_string()))
417 })
418 }
419
420 fn scroll_all(
421 &self,
422 collection: &str,
423 key_field: &str,
424 ) -> BoxFuture<'_, Result<HashMap<String, HashMap<String, String>>, crate::VectorStoreError>>
425 {
426 let collection = collection.to_owned();
427 let key_field = key_field.to_owned();
428 Box::pin(async move {
429 self.scroll_all(&collection, &key_field)
430 .await
431 .map_err(|e| crate::VectorStoreError::Scroll(e.to_string()))
432 })
433 }
434
435 fn health_check(&self) -> BoxFuture<'_, Result<bool, crate::VectorStoreError>> {
436 Box::pin(async move {
437 self.client
438 .health_check()
439 .await
440 .map(|_| true)
441 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
442 })
443 }
444
445 fn create_keyword_indexes(
446 &self,
447 collection: &str,
448 fields: &[&str],
449 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
450 use qdrant_client::qdrant::{CreateFieldIndexCollectionBuilder, FieldType};
451 let collection = collection.to_owned();
452 let fields: Vec<String> = fields.iter().map(|f| (*f).to_owned()).collect();
453 Box::pin(async move {
454 for field in &fields {
455 self.client
456 .create_field_index(CreateFieldIndexCollectionBuilder::new(
457 &collection,
458 field.as_str(),
459 FieldType::Keyword,
460 ))
461 .await
462 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
463 }
464 Ok(())
465 })
466 }
467}
468
469fn vector_filter_to_qdrant(filter: crate::VectorFilter) -> Filter {
470 let must: Vec<_> = filter
471 .must
472 .into_iter()
473 .map(field_condition_to_qdrant)
474 .collect();
475 let must_not: Vec<_> = filter
476 .must_not
477 .into_iter()
478 .map(field_condition_to_qdrant)
479 .collect();
480
481 let mut f = Filter::default();
482 if !must.is_empty() {
483 f.must = must;
484 }
485 if !must_not.is_empty() {
486 f.must_not = must_not;
487 }
488 f
489}
490
491fn field_condition_to_qdrant(cond: crate::FieldCondition) -> qdrant_client::qdrant::Condition {
492 match cond.value {
493 crate::FieldValue::Integer(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
494 crate::FieldValue::Text(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
495 }
496}
497
498fn scored_point_to_vector(point: ScoredPoint) -> crate::ScoredVectorPoint {
499 let payload: HashMap<String, serde_json::Value> = point
500 .payload
501 .into_iter()
502 .filter_map(|(k, v)| {
503 let json_val = match v.kind? {
504 Kind::StringValue(s) => serde_json::Value::String(s),
505 Kind::IntegerValue(i) => serde_json::Value::Number(i.into()),
506 Kind::DoubleValue(d) => {
507 serde_json::Number::from_f64(d).map(serde_json::Value::Number)?
508 }
509 Kind::BoolValue(b) => serde_json::Value::Bool(b),
510 _ => return None,
511 };
512 Some((k, json_val))
513 })
514 .collect();
515
516 let id = match point.id.and_then(|pid| pid.point_id_options) {
517 Some(qdrant_client::qdrant::point_id::PointIdOptions::Uuid(u)) => u,
518 Some(qdrant_client::qdrant::point_id::PointIdOptions::Num(n)) => n.to_string(),
519 None => String::new(),
520 };
521
522 crate::ScoredVectorPoint {
523 id,
524 score: point.score,
525 payload,
526 }
527}
528
#[cfg(test)]
mod tests {
    use super::*;

    /// Address used by every test; only the `#[ignore]`d tests contact it.
    const LOCAL_URL: &str = "http://localhost:6334";

    #[test]
    fn new_valid_url() {
        assert!(QdrantOps::new(LOCAL_URL).is_ok());
    }

    #[test]
    fn new_invalid_url() {
        assert!(QdrantOps::new("not a valid url").is_err());
    }

    #[test]
    fn debug_format() {
        let ops = QdrantOps::new(LOCAL_URL).unwrap();
        let rendered = format!("{ops:?}");
        assert!(rendered.contains("QdrantOps"));
    }

    #[test]
    fn json_to_payload_valid() {
        let input = serde_json::json!({"key": "value", "num": 42});
        assert!(QdrantOps::json_to_payload(input).is_ok());
    }

    #[test]
    fn json_to_payload_empty() {
        let payload = QdrantOps::json_to_payload(serde_json::json!({}));
        assert!(payload.is_ok());
        assert!(payload.unwrap().is_empty());
    }

    // Only checks that the wrapper can be constructed; the empty-id path
    // itself is exercised by the ignored async test further down.
    #[test]
    fn delete_by_ids_empty_is_ok_sync() {
        let built = QdrantOps::new(LOCAL_URL);
        assert!(built.is_ok());
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_idempotent() {
        let ops = QdrantOps::new(LOCAL_URL).unwrap();
        let name = "test_quant_idempotent";
        let indexed = ["language", "file_path"];

        // Best-effort cleanup of leftovers from an earlier run.
        let _ = ops.delete_collection(name).await;

        ops.ensure_collection_with_quantization(name, 128, &indexed)
            .await
            .unwrap();
        assert!(ops.collection_exists(name).await.unwrap());

        // The second call with the same size must also succeed.
        ops.ensure_collection_with_quantization(name, 128, &indexed)
            .await
            .unwrap();

        ops.delete_collection(name).await.unwrap();
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn delete_by_ids_empty_no_network_call() {
        let ops = QdrantOps::new(LOCAL_URL).unwrap();
        let outcome = ops.delete_by_ids("nonexistent_collection", vec![]).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_idempotent_same_size() {
        let ops = QdrantOps::new(LOCAL_URL).unwrap();
        let name = "test_ensure_idempotent";

        // Best-effort cleanup of leftovers from an earlier run.
        let _ = ops.delete_collection(name).await;

        ops.ensure_collection(name, 128).await.unwrap();
        assert!(ops.collection_exists(name).await.unwrap());

        // Same size again: the call succeeds and the collection is still there.
        ops.ensure_collection(name, 128).await.unwrap();
        assert!(ops.collection_exists(name).await.unwrap());

        ops.delete_collection(name).await.unwrap();
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new(LOCAL_URL).unwrap();
        let name = "test_dim_mismatch";

        // Best-effort cleanup of leftovers from an earlier run.
        let _ = ops.delete_collection(name).await;

        ops.ensure_collection(name, 128).await.unwrap();
        let initial = ops.get_collection_vector_size(name).await.unwrap();
        assert_eq!(initial, Some(128));

        // A different size forces the collection to be recreated.
        ops.ensure_collection(name, 256).await.unwrap();
        let recreated = ops.get_collection_vector_size(name).await.unwrap();
        assert_eq!(
            recreated,
            Some(256),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(name).await.unwrap();
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new(LOCAL_URL).unwrap();
        let name = "test_quant_dim_mismatch";

        // Best-effort cleanup of leftovers from an earlier run.
        let _ = ops.delete_collection(name).await;

        ops.ensure_collection_with_quantization(name, 128, &["language"])
            .await
            .unwrap();
        let initial = ops.get_collection_vector_size(name).await.unwrap();
        assert_eq!(initial, Some(128));

        // A different size forces the collection to be recreated.
        ops.ensure_collection_with_quantization(name, 384, &["language"])
            .await
            .unwrap();
        let recreated = ops.get_collection_vector_size(name).await.unwrap();
        assert_eq!(
            recreated,
            Some(384),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(name).await.unwrap();
    }
}