1use std::collections::HashMap;
7
8use qdrant_client::Qdrant;
9use qdrant_client::qdrant::{
10 CreateCollectionBuilder, DeletePointsBuilder, Distance, Filter, PointId, PointStruct,
11 PointsIdsList, ScoredPoint, ScrollPointsBuilder, SearchPointsBuilder, UpsertPointsBuilder,
12 VectorParamsBuilder, value::Kind,
13};
14
15type QdrantResult<T> = Result<T, Box<qdrant_client::QdrantError>>;
16
17#[derive(Clone)]
19pub struct QdrantOps {
20 client: Qdrant,
21}
22
23impl std::fmt::Debug for QdrantOps {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 f.debug_struct("QdrantOps").finish_non_exhaustive()
26 }
27}
28
29impl QdrantOps {
30 pub fn new(url: &str) -> QdrantResult<Self> {
36 let client = Qdrant::from_url(url).build().map_err(Box::new)?;
37 Ok(Self { client })
38 }
39
40 #[must_use]
42 pub fn client(&self) -> &Qdrant {
43 &self.client
44 }
45
46 pub async fn ensure_collection(&self, collection: &str, vector_size: u64) -> QdrantResult<()> {
54 if self
55 .client
56 .collection_exists(collection)
57 .await
58 .map_err(Box::new)?
59 {
60 return Ok(());
61 }
62 self.client
63 .create_collection(
64 CreateCollectionBuilder::new(collection)
65 .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine)),
66 )
67 .await
68 .map_err(Box::new)?;
69 Ok(())
70 }
71
72 pub async fn collection_exists(&self, collection: &str) -> QdrantResult<bool> {
78 self.client
79 .collection_exists(collection)
80 .await
81 .map_err(Box::new)
82 }
83
84 pub async fn delete_collection(&self, collection: &str) -> QdrantResult<()> {
90 self.client
91 .delete_collection(collection)
92 .await
93 .map_err(Box::new)?;
94 Ok(())
95 }
96
97 pub async fn upsert(&self, collection: &str, points: Vec<PointStruct>) -> QdrantResult<()> {
103 self.client
104 .upsert_points(UpsertPointsBuilder::new(collection, points).wait(true))
105 .await
106 .map_err(Box::new)?;
107 Ok(())
108 }
109
110 pub async fn search(
116 &self,
117 collection: &str,
118 vector: Vec<f32>,
119 limit: u64,
120 filter: Option<Filter>,
121 ) -> QdrantResult<Vec<ScoredPoint>> {
122 let mut builder = SearchPointsBuilder::new(collection, vector, limit).with_payload(true);
123 if let Some(f) = filter {
124 builder = builder.filter(f);
125 }
126 let results = self.client.search_points(builder).await.map_err(Box::new)?;
127 Ok(results.result)
128 }
129
130 pub async fn delete_by_ids(&self, collection: &str, ids: Vec<PointId>) -> QdrantResult<()> {
136 if ids.is_empty() {
137 return Ok(());
138 }
139 self.client
140 .delete_points(
141 DeletePointsBuilder::new(collection)
142 .points(PointsIdsList { ids })
143 .wait(true),
144 )
145 .await
146 .map_err(Box::new)?;
147 Ok(())
148 }
149
150 pub async fn scroll_all(
158 &self,
159 collection: &str,
160 key_field: &str,
161 ) -> QdrantResult<HashMap<String, HashMap<String, String>>> {
162 let mut result = HashMap::new();
163 let mut offset: Option<PointId> = None;
164
165 loop {
166 let mut builder = ScrollPointsBuilder::new(collection)
167 .with_payload(true)
168 .with_vectors(false)
169 .limit(100);
170
171 if let Some(ref off) = offset {
172 builder = builder.offset(off.clone());
173 }
174
175 let response = self.client.scroll(builder).await.map_err(Box::new)?;
176
177 for point in &response.result {
178 let Some(key_val) = point.payload.get(key_field) else {
179 continue;
180 };
181 let Some(Kind::StringValue(key)) = &key_val.kind else {
182 continue;
183 };
184
185 let mut fields = HashMap::new();
186 for (k, val) in &point.payload {
187 if let Some(Kind::StringValue(s)) = &val.kind {
188 fields.insert(k.clone(), s.clone());
189 }
190 }
191 result.insert(key.clone(), fields);
192 }
193
194 match response.next_page_offset {
195 Some(next) => offset = Some(next),
196 None => break,
197 }
198 }
199
200 Ok(result)
201 }
202
203 pub async fn ensure_collection_with_quantization(
212 &self,
213 collection: &str,
214 vector_size: u64,
215 keyword_fields: &[&str],
216 ) -> Result<(), crate::VectorStoreError> {
217 use qdrant_client::qdrant::{
218 CreateFieldIndexCollectionBuilder, FieldType, ScalarQuantizationBuilder,
219 };
220 if self
221 .client
222 .collection_exists(collection)
223 .await
224 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?
225 {
226 return Ok(());
227 }
228 self.client
229 .create_collection(
230 CreateCollectionBuilder::new(collection)
231 .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine))
232 .quantization_config(ScalarQuantizationBuilder::default()),
233 )
234 .await
235 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
236
237 for field in keyword_fields {
238 self.client
239 .create_field_index(CreateFieldIndexCollectionBuilder::new(
240 collection,
241 *field,
242 FieldType::Keyword,
243 ))
244 .await
245 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
246 }
247 Ok(())
248 }
249
250 pub fn json_to_payload(
256 value: serde_json::Value,
257 ) -> Result<HashMap<String, qdrant_client::qdrant::Value>, serde_json::Error> {
258 serde_json::from_value(value)
259 }
260}
261
262impl crate::vector_store::VectorStore for QdrantOps {
263 fn ensure_collection(
264 &self,
265 collection: &str,
266 vector_size: u64,
267 ) -> std::pin::Pin<
268 Box<dyn std::future::Future<Output = Result<(), crate::VectorStoreError>> + Send + '_>,
269 > {
270 let collection = collection.to_owned();
271 Box::pin(async move {
272 self.ensure_collection(&collection, vector_size)
273 .await
274 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
275 })
276 }
277
278 fn collection_exists(
279 &self,
280 collection: &str,
281 ) -> std::pin::Pin<
282 Box<dyn std::future::Future<Output = Result<bool, crate::VectorStoreError>> + Send + '_>,
283 > {
284 let collection = collection.to_owned();
285 Box::pin(async move {
286 self.collection_exists(&collection)
287 .await
288 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
289 })
290 }
291
292 fn delete_collection(
293 &self,
294 collection: &str,
295 ) -> std::pin::Pin<
296 Box<dyn std::future::Future<Output = Result<(), crate::VectorStoreError>> + Send + '_>,
297 > {
298 let collection = collection.to_owned();
299 Box::pin(async move {
300 self.delete_collection(&collection)
301 .await
302 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
303 })
304 }
305
306 fn upsert(
307 &self,
308 collection: &str,
309 points: Vec<crate::VectorPoint>,
310 ) -> std::pin::Pin<
311 Box<dyn std::future::Future<Output = Result<(), crate::VectorStoreError>> + Send + '_>,
312 > {
313 let collection = collection.to_owned();
314 Box::pin(async move {
315 let qdrant_points: Vec<PointStruct> = points
316 .into_iter()
317 .map(|p| {
318 let payload: HashMap<String, qdrant_client::qdrant::Value> =
319 serde_json::from_value(serde_json::Value::Object(
320 p.payload.into_iter().collect(),
321 ))
322 .unwrap_or_default();
323 PointStruct::new(p.id, p.vector, payload)
324 })
325 .collect();
326 self.upsert(&collection, qdrant_points)
327 .await
328 .map_err(|e| crate::VectorStoreError::Upsert(e.to_string()))
329 })
330 }
331
332 fn search(
333 &self,
334 collection: &str,
335 vector: Vec<f32>,
336 limit: u64,
337 filter: Option<crate::VectorFilter>,
338 ) -> std::pin::Pin<
339 Box<
340 dyn std::future::Future<
341 Output = Result<Vec<crate::ScoredVectorPoint>, crate::VectorStoreError>,
342 > + Send
343 + '_,
344 >,
345 > {
346 let collection = collection.to_owned();
347 Box::pin(async move {
348 let qdrant_filter = filter.map(vector_filter_to_qdrant);
349 let results = self
350 .search(&collection, vector, limit, qdrant_filter)
351 .await
352 .map_err(|e| crate::VectorStoreError::Search(e.to_string()))?;
353 Ok(results.into_iter().map(scored_point_to_vector).collect())
354 })
355 }
356
357 fn delete_by_ids(
358 &self,
359 collection: &str,
360 ids: Vec<String>,
361 ) -> std::pin::Pin<
362 Box<dyn std::future::Future<Output = Result<(), crate::VectorStoreError>> + Send + '_>,
363 > {
364 let collection = collection.to_owned();
365 Box::pin(async move {
366 let point_ids: Vec<PointId> = ids.into_iter().map(PointId::from).collect();
367 self.delete_by_ids(&collection, point_ids)
368 .await
369 .map_err(|e| crate::VectorStoreError::Delete(e.to_string()))
370 })
371 }
372
373 fn scroll_all(
374 &self,
375 collection: &str,
376 key_field: &str,
377 ) -> std::pin::Pin<
378 Box<
379 dyn std::future::Future<
380 Output = Result<
381 HashMap<String, HashMap<String, String>>,
382 crate::VectorStoreError,
383 >,
384 > + Send
385 + '_,
386 >,
387 > {
388 let collection = collection.to_owned();
389 let key_field = key_field.to_owned();
390 Box::pin(async move {
391 self.scroll_all(&collection, &key_field)
392 .await
393 .map_err(|e| crate::VectorStoreError::Scroll(e.to_string()))
394 })
395 }
396
397 fn health_check(
398 &self,
399 ) -> std::pin::Pin<
400 Box<dyn std::future::Future<Output = Result<bool, crate::VectorStoreError>> + Send + '_>,
401 > {
402 Box::pin(async move {
403 self.client
404 .health_check()
405 .await
406 .map(|_| true)
407 .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
408 })
409 }
410}
411
412fn vector_filter_to_qdrant(filter: crate::VectorFilter) -> Filter {
413 let must: Vec<_> = filter
414 .must
415 .into_iter()
416 .map(field_condition_to_qdrant)
417 .collect();
418 let must_not: Vec<_> = filter
419 .must_not
420 .into_iter()
421 .map(field_condition_to_qdrant)
422 .collect();
423
424 let mut f = Filter::default();
425 if !must.is_empty() {
426 f.must = must;
427 }
428 if !must_not.is_empty() {
429 f.must_not = must_not;
430 }
431 f
432}
433
434fn field_condition_to_qdrant(cond: crate::FieldCondition) -> qdrant_client::qdrant::Condition {
435 match cond.value {
436 crate::FieldValue::Integer(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
437 crate::FieldValue::Text(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
438 }
439}
440
441fn scored_point_to_vector(point: ScoredPoint) -> crate::ScoredVectorPoint {
442 let payload: HashMap<String, serde_json::Value> = point
443 .payload
444 .into_iter()
445 .filter_map(|(k, v)| {
446 let json_val = match v.kind? {
447 Kind::StringValue(s) => serde_json::Value::String(s),
448 Kind::IntegerValue(i) => serde_json::Value::Number(i.into()),
449 Kind::DoubleValue(d) => {
450 serde_json::Number::from_f64(d).map(serde_json::Value::Number)?
451 }
452 Kind::BoolValue(b) => serde_json::Value::Bool(b),
453 _ => return None,
454 };
455 Some((k, json_val))
456 })
457 .collect();
458
459 let id = match point.id.and_then(|pid| pid.point_id_options) {
460 Some(qdrant_client::qdrant::point_id::PointIdOptions::Uuid(u)) => u,
461 Some(qdrant_client::qdrant::point_id::PointIdOptions::Num(n)) => n.to_string(),
462 None => String::new(),
463 };
464
465 crate::ScoredVectorPoint {
466 id,
467 score: point.score,
468 payload,
469 }
470}
471
472#[cfg(test)]
473mod tests {
474 use super::*;
475
476 #[test]
477 fn new_valid_url() {
478 let ops = QdrantOps::new("http://localhost:6334");
479 assert!(ops.is_ok());
480 }
481
482 #[test]
483 fn new_invalid_url() {
484 let ops = QdrantOps::new("not a valid url");
485 assert!(ops.is_err());
486 }
487
488 #[test]
489 fn debug_format() {
490 let ops = QdrantOps::new("http://localhost:6334").unwrap();
491 let dbg = format!("{ops:?}");
492 assert!(dbg.contains("QdrantOps"));
493 }
494
495 #[test]
496 fn json_to_payload_valid() {
497 let value = serde_json::json!({"key": "value", "num": 42});
498 let result = QdrantOps::json_to_payload(value);
499 assert!(result.is_ok());
500 }
501
502 #[test]
503 fn json_to_payload_empty() {
504 let result = QdrantOps::json_to_payload(serde_json::json!({}));
505 assert!(result.is_ok());
506 assert!(result.unwrap().is_empty());
507 }
508
509 #[test]
510 fn delete_by_ids_empty_is_ok_sync() {
511 let ops = QdrantOps::new("http://localhost:6334");
515 assert!(ops.is_ok());
516 }
517
518 #[tokio::test]
520 #[ignore]
521 async fn ensure_collection_with_quantization_idempotent() {
522 let ops = QdrantOps::new("http://localhost:6334").unwrap();
523 let collection = "test_quant_idempotent";
524
525 let _ = ops.delete_collection(collection).await;
527
528 ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
530 .await
531 .unwrap();
532
533 assert!(ops.collection_exists(collection).await.unwrap());
534
535 ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
537 .await
538 .unwrap();
539
540 ops.delete_collection(collection).await.unwrap();
542 }
543
544 #[tokio::test]
546 #[ignore]
547 async fn delete_by_ids_empty_no_network_call() {
548 let ops = QdrantOps::new("http://localhost:6334").unwrap();
549 let result = ops.delete_by_ids("nonexistent_collection", vec![]).await;
551 assert!(result.is_ok());
552 }
553}