1use std::collections::HashMap;
7
8use crate::vector_store::BoxFuture;
9use qdrant_client::Qdrant;
10use qdrant_client::qdrant::{
11 CreateCollectionBuilder, DeletePointsBuilder, Distance, Filter, PointId, PointStruct,
12 PointsIdsList, ScoredPoint, ScrollPointsBuilder, SearchPointsBuilder, UpsertPointsBuilder,
13 VectorParamsBuilder, value::Kind,
14};
15
16type QdrantResult<T> = Result<T, Box<qdrant_client::QdrantError>>;
17
18#[derive(Clone)]
20pub struct QdrantOps {
21 client: Qdrant,
22}
23
24impl std::fmt::Debug for QdrantOps {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 f.debug_struct("QdrantOps").finish_non_exhaustive()
27 }
28}
29
30impl QdrantOps {
31 pub fn new(url: &str) -> QdrantResult<Self> {
37 let client = Qdrant::from_url(url).build().map_err(Box::new)?;
38 Ok(Self { client })
39 }
40
41 #[must_use]
43 pub fn client(&self) -> &Qdrant {
44 &self.client
45 }
46
47 pub async fn ensure_collection(&self, collection: &str, vector_size: u64) -> QdrantResult<()> {
56 if self
57 .client
58 .collection_exists(collection)
59 .await
60 .map_err(Box::new)?
61 {
62 let existing_size = self.get_collection_vector_size(collection).await?;
63 if existing_size == Some(vector_size) {
64 return Ok(());
65 }
66 tracing::warn!(
67 collection,
68 existing = ?existing_size,
69 required = vector_size,
70 "vector dimension mismatch — recreating collection (existing data will be lost)"
71 );
72 self.client
73 .delete_collection(collection)
74 .await
75 .map_err(Box::new)?;
76 }
77 self.client
78 .create_collection(
79 CreateCollectionBuilder::new(collection)
80 .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine)),
81 )
82 .await
83 .map_err(Box::new)?;
84 Ok(())
85 }
86
87 async fn get_collection_vector_size(&self, collection: &str) -> QdrantResult<Option<u64>> {
94 let info = self
95 .client
96 .collection_info(collection)
97 .await
98 .map_err(Box::new)?;
99 let size = info
100 .result
101 .and_then(|r| r.config)
102 .and_then(|cfg| cfg.params)
103 .and_then(|params| params.vectors_config)
104 .and_then(|vc| vc.config)
105 .and_then(|cfg| match cfg {
106 qdrant_client::qdrant::vectors_config::Config::Params(vp) => Some(vp.size),
107 qdrant_client::qdrant::vectors_config::Config::ParamsMap(_) => None,
109 });
110 Ok(size)
111 }
112
113 pub async fn collection_exists(&self, collection: &str) -> QdrantResult<bool> {
119 self.client
120 .collection_exists(collection)
121 .await
122 .map_err(Box::new)
123 }
124
125 pub async fn delete_collection(&self, collection: &str) -> QdrantResult<()> {
131 self.client
132 .delete_collection(collection)
133 .await
134 .map_err(Box::new)?;
135 Ok(())
136 }
137
138 pub async fn upsert(&self, collection: &str, points: Vec<PointStruct>) -> QdrantResult<()> {
144 self.client
145 .upsert_points(UpsertPointsBuilder::new(collection, points).wait(true))
146 .await
147 .map_err(Box::new)?;
148 Ok(())
149 }
150
151 pub async fn search(
157 &self,
158 collection: &str,
159 vector: Vec<f32>,
160 limit: u64,
161 filter: Option<Filter>,
162 ) -> QdrantResult<Vec<ScoredPoint>> {
163 let mut builder = SearchPointsBuilder::new(collection, vector, limit).with_payload(true);
164 if let Some(f) = filter {
165 builder = builder.filter(f);
166 }
167 let results = self.client.search_points(builder).await.map_err(Box::new)?;
168 Ok(results.result)
169 }
170
171 pub async fn delete_by_ids(&self, collection: &str, ids: Vec<PointId>) -> QdrantResult<()> {
177 if ids.is_empty() {
178 return Ok(());
179 }
180 self.client
181 .delete_points(
182 DeletePointsBuilder::new(collection)
183 .points(PointsIdsList { ids })
184 .wait(true),
185 )
186 .await
187 .map_err(Box::new)?;
188 Ok(())
189 }
190
191 pub async fn scroll_all(
199 &self,
200 collection: &str,
201 key_field: &str,
202 ) -> QdrantResult<HashMap<String, HashMap<String, String>>> {
203 let mut result = HashMap::new();
204 let mut offset: Option<PointId> = None;
205
206 loop {
207 let mut builder = ScrollPointsBuilder::new(collection)
208 .with_payload(true)
209 .with_vectors(false)
210 .limit(100);
211
212 if let Some(ref off) = offset {
213 builder = builder.offset(off.clone());
214 }
215
216 let response = self.client.scroll(builder).await.map_err(Box::new)?;
217
218 for point in &response.result {
219 let Some(key_val) = point.payload.get(key_field) else {
220 continue;
221 };
222 let Some(Kind::StringValue(key)) = &key_val.kind else {
223 continue;
224 };
225
226 let mut fields = HashMap::new();
227 for (k, val) in &point.payload {
228 if let Some(Kind::StringValue(s)) = &val.kind {
229 fields.insert(k.clone(), s.clone());
230 }
231 }
232 result.insert(key.clone(), fields);
233 }
234
235 match response.next_page_offset {
236 Some(next) => offset = Some(next),
237 None => break,
238 }
239 }
240
241 Ok(result)
242 }
243
244 pub async fn ensure_collection_with_quantization(
254 &self,
255 collection: &str,
256 vector_size: u64,
257 keyword_fields: &[&str],
258 ) -> Result<(), crate::VectorStoreError> {
259 use qdrant_client::qdrant::{
260 CreateFieldIndexCollectionBuilder, FieldType, ScalarQuantizationBuilder,
261 };
262 if self
263 .client
264 .collection_exists(collection)
265 .await
266 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?
267 {
268 let existing_size = self
269 .get_collection_vector_size(collection)
270 .await
271 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
272 if existing_size == Some(vector_size) {
273 return Ok(());
274 }
275 tracing::warn!(
276 collection,
277 existing = ?existing_size,
278 required = vector_size,
279 "vector dimension mismatch — recreating collection (existing data will be lost)"
280 );
281 self.client
282 .delete_collection(collection)
283 .await
284 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
285 }
286 self.client
287 .create_collection(
288 CreateCollectionBuilder::new(collection)
289 .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine))
290 .quantization_config(ScalarQuantizationBuilder::default()),
291 )
292 .await
293 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
294
295 for field in keyword_fields {
296 self.client
297 .create_field_index(CreateFieldIndexCollectionBuilder::new(
298 collection,
299 *field,
300 FieldType::Keyword,
301 ))
302 .await
303 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
304 }
305 Ok(())
306 }
307
308 pub fn json_to_payload(
314 value: serde_json::Value,
315 ) -> Result<HashMap<String, qdrant_client::qdrant::Value>, serde_json::Error> {
316 serde_json::from_value(value)
317 }
318}
319
320impl crate::vector_store::VectorStore for QdrantOps {
321 fn ensure_collection(
322 &self,
323 collection: &str,
324 vector_size: u64,
325 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
326 let collection = collection.to_owned();
327 Box::pin(async move {
328 self.ensure_collection(&collection, vector_size)
329 .await
330 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
331 })
332 }
333
334 fn collection_exists(
335 &self,
336 collection: &str,
337 ) -> BoxFuture<'_, Result<bool, crate::VectorStoreError>> {
338 let collection = collection.to_owned();
339 Box::pin(async move {
340 self.collection_exists(&collection)
341 .await
342 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
343 })
344 }
345
346 fn delete_collection(
347 &self,
348 collection: &str,
349 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
350 let collection = collection.to_owned();
351 Box::pin(async move {
352 self.delete_collection(&collection)
353 .await
354 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
355 })
356 }
357
358 fn upsert(
359 &self,
360 collection: &str,
361 points: Vec<crate::VectorPoint>,
362 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
363 let collection = collection.to_owned();
364 Box::pin(async move {
365 let qdrant_points: Vec<PointStruct> = points
366 .into_iter()
367 .map(|p| {
368 let payload: HashMap<String, qdrant_client::qdrant::Value> =
369 serde_json::from_value(serde_json::Value::Object(
370 p.payload.into_iter().collect(),
371 ))
372 .unwrap_or_default();
373 PointStruct::new(p.id, p.vector, payload)
374 })
375 .collect();
376 self.upsert(&collection, qdrant_points)
377 .await
378 .map_err(|e| crate::VectorStoreError::Upsert(e.to_string()))
379 })
380 }
381
382 fn search(
383 &self,
384 collection: &str,
385 vector: Vec<f32>,
386 limit: u64,
387 filter: Option<crate::VectorFilter>,
388 ) -> BoxFuture<'_, Result<Vec<crate::ScoredVectorPoint>, crate::VectorStoreError>> {
389 let collection = collection.to_owned();
390 Box::pin(async move {
391 let qdrant_filter = filter.map(vector_filter_to_qdrant);
392 let results = self
393 .search(&collection, vector, limit, qdrant_filter)
394 .await
395 .map_err(|e| crate::VectorStoreError::Search(e.to_string()))?;
396 Ok(results.into_iter().map(scored_point_to_vector).collect())
397 })
398 }
399
400 fn delete_by_ids(
401 &self,
402 collection: &str,
403 ids: Vec<String>,
404 ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
405 let collection = collection.to_owned();
406 Box::pin(async move {
407 let point_ids: Vec<PointId> = ids.into_iter().map(PointId::from).collect();
408 self.delete_by_ids(&collection, point_ids)
409 .await
410 .map_err(|e| crate::VectorStoreError::Delete(e.to_string()))
411 })
412 }
413
414 fn scroll_all(
415 &self,
416 collection: &str,
417 key_field: &str,
418 ) -> BoxFuture<'_, Result<HashMap<String, HashMap<String, String>>, crate::VectorStoreError>>
419 {
420 let collection = collection.to_owned();
421 let key_field = key_field.to_owned();
422 Box::pin(async move {
423 self.scroll_all(&collection, &key_field)
424 .await
425 .map_err(|e| crate::VectorStoreError::Scroll(e.to_string()))
426 })
427 }
428
429 fn health_check(&self) -> BoxFuture<'_, Result<bool, crate::VectorStoreError>> {
430 Box::pin(async move {
431 self.client
432 .health_check()
433 .await
434 .map(|_| true)
435 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
436 })
437 }
438}
439
440fn vector_filter_to_qdrant(filter: crate::VectorFilter) -> Filter {
441 let must: Vec<_> = filter
442 .must
443 .into_iter()
444 .map(field_condition_to_qdrant)
445 .collect();
446 let must_not: Vec<_> = filter
447 .must_not
448 .into_iter()
449 .map(field_condition_to_qdrant)
450 .collect();
451
452 let mut f = Filter::default();
453 if !must.is_empty() {
454 f.must = must;
455 }
456 if !must_not.is_empty() {
457 f.must_not = must_not;
458 }
459 f
460}
461
462fn field_condition_to_qdrant(cond: crate::FieldCondition) -> qdrant_client::qdrant::Condition {
463 match cond.value {
464 crate::FieldValue::Integer(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
465 crate::FieldValue::Text(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
466 }
467}
468
469fn scored_point_to_vector(point: ScoredPoint) -> crate::ScoredVectorPoint {
470 let payload: HashMap<String, serde_json::Value> = point
471 .payload
472 .into_iter()
473 .filter_map(|(k, v)| {
474 let json_val = match v.kind? {
475 Kind::StringValue(s) => serde_json::Value::String(s),
476 Kind::IntegerValue(i) => serde_json::Value::Number(i.into()),
477 Kind::DoubleValue(d) => {
478 serde_json::Number::from_f64(d).map(serde_json::Value::Number)?
479 }
480 Kind::BoolValue(b) => serde_json::Value::Bool(b),
481 _ => return None,
482 };
483 Some((k, json_val))
484 })
485 .collect();
486
487 let id = match point.id.and_then(|pid| pid.point_id_options) {
488 Some(qdrant_client::qdrant::point_id::PointIdOptions::Uuid(u)) => u,
489 Some(qdrant_client::qdrant::point_id::PointIdOptions::Num(n)) => n.to_string(),
490 None => String::new(),
491 };
492
493 crate::ScoredVectorPoint {
494 id,
495 score: point.score,
496 payload,
497 }
498}
499
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn new_valid_url() {
        let ops = QdrantOps::new("http://localhost:6334");
        assert!(ops.is_ok());
    }

    #[test]
    fn new_invalid_url() {
        let ops = QdrantOps::new("not a valid url");
        assert!(ops.is_err());
    }

    #[test]
    fn debug_format() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let dbg = format!("{ops:?}");
        assert!(dbg.contains("QdrantOps"));
    }

    #[test]
    fn json_to_payload_valid() {
        let value = serde_json::json!({"key": "value", "num": 42});
        let result = QdrantOps::json_to_payload(value);
        assert!(result.is_ok());
    }

    #[test]
    fn json_to_payload_empty() {
        let result = QdrantOps::json_to_payload(serde_json::json!({}));
        assert!(result.is_ok());
        assert!(result.unwrap().is_empty());
    }

    /// `delete_by_ids` with no ids must return before sending any request.
    ///
    /// The client points at an address where no Qdrant server is expected, so a
    /// successful result shows the early return was taken. Building the client
    /// itself does not connect (see `new_valid_url`, which needs no server).
    #[tokio::test]
    async fn delete_by_ids_empty_is_ok_without_server() {
        let ops = QdrantOps::new("http://127.0.0.1:1").unwrap();
        let result = ops.delete_by_ids("nonexistent_collection", vec![]).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_idempotent() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_quant_idempotent";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
            .await
            .unwrap();

        assert!(ops.collection_exists(collection).await.unwrap());

        ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
            .await
            .unwrap();

        ops.delete_collection(collection).await.unwrap();
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn delete_by_ids_empty_no_network_call() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let result = ops.delete_by_ids("nonexistent_collection", vec![]).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_idempotent_same_size() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_ensure_idempotent";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection(collection, 128).await.unwrap();
        assert!(ops.collection_exists(collection).await.unwrap());

        ops.ensure_collection(collection, 128).await.unwrap();
        assert!(ops.collection_exists(collection).await.unwrap());

        ops.delete_collection(collection).await.unwrap();
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_dim_mismatch";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection(collection, 128).await.unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(128)
        );

        ops.ensure_collection(collection, 256).await.unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(256),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(collection).await.unwrap();
    }

    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_quant_dim_mismatch";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection_with_quantization(collection, 128, &["language"])
            .await
            .unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(128)
        );

        ops.ensure_collection_with_quantization(collection, 384, &["language"])
            .await
            .unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(384),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(collection).await.unwrap();
    }
}