Skip to main content

zeph_memory/
qdrant_ops.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Low-level Qdrant operations shared across crates.
5
6use std::collections::HashMap;
7
8use qdrant_client::Qdrant;
9use qdrant_client::qdrant::{
10    CreateCollectionBuilder, DeletePointsBuilder, Distance, Filter, PointId, PointStruct,
11    PointsIdsList, ScoredPoint, ScrollPointsBuilder, SearchPointsBuilder, UpsertPointsBuilder,
12    VectorParamsBuilder, value::Kind,
13};
14
15type QdrantResult<T> = Result<T, Box<qdrant_client::QdrantError>>;
16
17/// Thin wrapper over [`Qdrant`] client encapsulating common collection operations.
18#[derive(Clone)]
19pub struct QdrantOps {
20    client: Qdrant,
21}
22
23impl std::fmt::Debug for QdrantOps {
24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25        f.debug_struct("QdrantOps").finish_non_exhaustive()
26    }
27}
28
29impl QdrantOps {
30    /// Create a new `QdrantOps` connected to the given URL.
31    ///
32    /// # Errors
33    ///
34    /// Returns an error if the Qdrant client cannot be created.
35    pub fn new(url: &str) -> QdrantResult<Self> {
36        let client = Qdrant::from_url(url).build().map_err(Box::new)?;
37        Ok(Self { client })
38    }
39
40    /// Access the underlying Qdrant client for advanced operations.
41    #[must_use]
42    pub fn client(&self) -> &Qdrant {
43        &self.client
44    }
45
46    /// Ensure a collection exists with cosine distance vectors.
47    ///
48    /// If the collection already exists but has a different vector dimension than `vector_size`,
49    /// the collection is deleted and recreated. All existing data in the collection is lost.
50    ///
51    /// # Errors
52    ///
53    /// Returns an error if Qdrant cannot be reached or collection creation fails.
54    pub async fn ensure_collection(&self, collection: &str, vector_size: u64) -> QdrantResult<()> {
55        if self
56            .client
57            .collection_exists(collection)
58            .await
59            .map_err(Box::new)?
60        {
61            let existing_size = self.get_collection_vector_size(collection).await?;
62            if existing_size == Some(vector_size) {
63                return Ok(());
64            }
65            tracing::warn!(
66                collection,
67                existing = ?existing_size,
68                required = vector_size,
69                "vector dimension mismatch — recreating collection (existing data will be lost)"
70            );
71            self.client
72                .delete_collection(collection)
73                .await
74                .map_err(Box::new)?;
75        }
76        self.client
77            .create_collection(
78                CreateCollectionBuilder::new(collection)
79                    .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine)),
80            )
81            .await
82            .map_err(Box::new)?;
83        Ok(())
84    }
85
86    /// Returns the configured vector size of an existing collection, or `None` if it cannot be
87    /// determined (e.g. named-vector collections, or `collection_info` fails gracefully).
88    ///
89    /// # Errors
90    ///
91    /// Returns an error only on hard Qdrant communication failures.
92    async fn get_collection_vector_size(&self, collection: &str) -> QdrantResult<Option<u64>> {
93        let info = self
94            .client
95            .collection_info(collection)
96            .await
97            .map_err(Box::new)?;
98        let size = info
99            .result
100            .and_then(|r| r.config)
101            .and_then(|cfg| cfg.params)
102            .and_then(|params| params.vectors_config)
103            .and_then(|vc| vc.config)
104            .and_then(|cfg| match cfg {
105                qdrant_client::qdrant::vectors_config::Config::Params(vp) => Some(vp.size),
106                // Named-vector collections are not supported here; treat as unknown.
107                qdrant_client::qdrant::vectors_config::Config::ParamsMap(_) => None,
108            });
109        Ok(size)
110    }
111
112    /// Check whether a collection exists.
113    ///
114    /// # Errors
115    ///
116    /// Returns an error if Qdrant cannot be reached.
117    pub async fn collection_exists(&self, collection: &str) -> QdrantResult<bool> {
118        self.client
119            .collection_exists(collection)
120            .await
121            .map_err(Box::new)
122    }
123
124    /// Delete a collection.
125    ///
126    /// # Errors
127    ///
128    /// Returns an error if the collection cannot be deleted.
129    pub async fn delete_collection(&self, collection: &str) -> QdrantResult<()> {
130        self.client
131            .delete_collection(collection)
132            .await
133            .map_err(Box::new)?;
134        Ok(())
135    }
136
137    /// Upsert points into a collection.
138    ///
139    /// # Errors
140    ///
141    /// Returns an error if the upsert fails.
142    pub async fn upsert(&self, collection: &str, points: Vec<PointStruct>) -> QdrantResult<()> {
143        self.client
144            .upsert_points(UpsertPointsBuilder::new(collection, points).wait(true))
145            .await
146            .map_err(Box::new)?;
147        Ok(())
148    }
149
150    /// Search for similar vectors, returning scored points with payloads.
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if the search fails.
155    pub async fn search(
156        &self,
157        collection: &str,
158        vector: Vec<f32>,
159        limit: u64,
160        filter: Option<Filter>,
161    ) -> QdrantResult<Vec<ScoredPoint>> {
162        let mut builder = SearchPointsBuilder::new(collection, vector, limit).with_payload(true);
163        if let Some(f) = filter {
164            builder = builder.filter(f);
165        }
166        let results = self.client.search_points(builder).await.map_err(Box::new)?;
167        Ok(results.result)
168    }
169
170    /// Delete points by their IDs.
171    ///
172    /// # Errors
173    ///
174    /// Returns an error if the deletion fails.
175    pub async fn delete_by_ids(&self, collection: &str, ids: Vec<PointId>) -> QdrantResult<()> {
176        if ids.is_empty() {
177            return Ok(());
178        }
179        self.client
180            .delete_points(
181                DeletePointsBuilder::new(collection)
182                    .points(PointsIdsList { ids })
183                    .wait(true),
184            )
185            .await
186            .map_err(Box::new)?;
187        Ok(())
188    }
189
190    /// Scroll all points in a collection, extracting string payload fields.
191    ///
192    /// Returns a map of `key_field` value -> { `field_name` -> `field_value` }.
193    ///
194    /// # Errors
195    ///
196    /// Returns an error if the scroll operation fails.
197    pub async fn scroll_all(
198        &self,
199        collection: &str,
200        key_field: &str,
201    ) -> QdrantResult<HashMap<String, HashMap<String, String>>> {
202        let mut result = HashMap::new();
203        let mut offset: Option<PointId> = None;
204
205        loop {
206            let mut builder = ScrollPointsBuilder::new(collection)
207                .with_payload(true)
208                .with_vectors(false)
209                .limit(100);
210
211            if let Some(ref off) = offset {
212                builder = builder.offset(off.clone());
213            }
214
215            let response = self.client.scroll(builder).await.map_err(Box::new)?;
216
217            for point in &response.result {
218                let Some(key_val) = point.payload.get(key_field) else {
219                    continue;
220                };
221                let Some(Kind::StringValue(key)) = &key_val.kind else {
222                    continue;
223                };
224
225                let mut fields = HashMap::new();
226                for (k, val) in &point.payload {
227                    if let Some(Kind::StringValue(s)) = &val.kind {
228                        fields.insert(k.clone(), s.clone());
229                    }
230                }
231                result.insert(key.clone(), fields);
232            }
233
234            match response.next_page_offset {
235                Some(next) => offset = Some(next),
236                None => break,
237            }
238        }
239
240        Ok(result)
241    }
242
243    /// Create a collection with scalar INT8 quantization if it does not exist,
244    /// then create keyword indexes for the given fields.
245    ///
246    /// If the collection already exists but has a different vector dimension than `vector_size`,
247    /// the collection is deleted and recreated. All existing data in the collection is lost.
248    ///
249    /// # Errors
250    ///
251    /// Returns an error if any Qdrant operation fails.
252    pub async fn ensure_collection_with_quantization(
253        &self,
254        collection: &str,
255        vector_size: u64,
256        keyword_fields: &[&str],
257    ) -> Result<(), crate::VectorStoreError> {
258        use qdrant_client::qdrant::{
259            CreateFieldIndexCollectionBuilder, FieldType, ScalarQuantizationBuilder,
260        };
261        if self
262            .client
263            .collection_exists(collection)
264            .await
265            .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?
266        {
267            let existing_size = self
268                .get_collection_vector_size(collection)
269                .await
270                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
271            if existing_size == Some(vector_size) {
272                return Ok(());
273            }
274            tracing::warn!(
275                collection,
276                existing = ?existing_size,
277                required = vector_size,
278                "vector dimension mismatch — recreating collection (existing data will be lost)"
279            );
280            self.client
281                .delete_collection(collection)
282                .await
283                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
284        }
285        self.client
286            .create_collection(
287                CreateCollectionBuilder::new(collection)
288                    .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine))
289                    .quantization_config(ScalarQuantizationBuilder::default()),
290            )
291            .await
292            .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
293
294        for field in keyword_fields {
295            self.client
296                .create_field_index(CreateFieldIndexCollectionBuilder::new(
297                    collection,
298                    *field,
299                    FieldType::Keyword,
300                ))
301                .await
302                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
303        }
304        Ok(())
305    }
306
307    /// Convert a JSON value to a Qdrant payload map.
308    ///
309    /// # Errors
310    ///
311    /// Returns a JSON error if deserialization fails.
312    pub fn json_to_payload(
313        value: serde_json::Value,
314    ) -> Result<HashMap<String, qdrant_client::qdrant::Value>, serde_json::Error> {
315        serde_json::from_value(value)
316    }
317}
318
319impl crate::vector_store::VectorStore for QdrantOps {
320    fn ensure_collection(
321        &self,
322        collection: &str,
323        vector_size: u64,
324    ) -> std::pin::Pin<
325        Box<dyn std::future::Future<Output = Result<(), crate::VectorStoreError>> + Send + '_>,
326    > {
327        let collection = collection.to_owned();
328        Box::pin(async move {
329            self.ensure_collection(&collection, vector_size)
330                .await
331                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
332        })
333    }
334
335    fn collection_exists(
336        &self,
337        collection: &str,
338    ) -> std::pin::Pin<
339        Box<dyn std::future::Future<Output = Result<bool, crate::VectorStoreError>> + Send + '_>,
340    > {
341        let collection = collection.to_owned();
342        Box::pin(async move {
343            self.collection_exists(&collection)
344                .await
345                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
346        })
347    }
348
349    fn delete_collection(
350        &self,
351        collection: &str,
352    ) -> std::pin::Pin<
353        Box<dyn std::future::Future<Output = Result<(), crate::VectorStoreError>> + Send + '_>,
354    > {
355        let collection = collection.to_owned();
356        Box::pin(async move {
357            self.delete_collection(&collection)
358                .await
359                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
360        })
361    }
362
363    fn upsert(
364        &self,
365        collection: &str,
366        points: Vec<crate::VectorPoint>,
367    ) -> std::pin::Pin<
368        Box<dyn std::future::Future<Output = Result<(), crate::VectorStoreError>> + Send + '_>,
369    > {
370        let collection = collection.to_owned();
371        Box::pin(async move {
372            let qdrant_points: Vec<PointStruct> = points
373                .into_iter()
374                .map(|p| {
375                    let payload: HashMap<String, qdrant_client::qdrant::Value> =
376                        serde_json::from_value(serde_json::Value::Object(
377                            p.payload.into_iter().collect(),
378                        ))
379                        .unwrap_or_default();
380                    PointStruct::new(p.id, p.vector, payload)
381                })
382                .collect();
383            self.upsert(&collection, qdrant_points)
384                .await
385                .map_err(|e| crate::VectorStoreError::Upsert(e.to_string()))
386        })
387    }
388
389    fn search(
390        &self,
391        collection: &str,
392        vector: Vec<f32>,
393        limit: u64,
394        filter: Option<crate::VectorFilter>,
395    ) -> std::pin::Pin<
396        Box<
397            dyn std::future::Future<
398                    Output = Result<Vec<crate::ScoredVectorPoint>, crate::VectorStoreError>,
399                > + Send
400                + '_,
401        >,
402    > {
403        let collection = collection.to_owned();
404        Box::pin(async move {
405            let qdrant_filter = filter.map(vector_filter_to_qdrant);
406            let results = self
407                .search(&collection, vector, limit, qdrant_filter)
408                .await
409                .map_err(|e| crate::VectorStoreError::Search(e.to_string()))?;
410            Ok(results.into_iter().map(scored_point_to_vector).collect())
411        })
412    }
413
414    fn delete_by_ids(
415        &self,
416        collection: &str,
417        ids: Vec<String>,
418    ) -> std::pin::Pin<
419        Box<dyn std::future::Future<Output = Result<(), crate::VectorStoreError>> + Send + '_>,
420    > {
421        let collection = collection.to_owned();
422        Box::pin(async move {
423            let point_ids: Vec<PointId> = ids.into_iter().map(PointId::from).collect();
424            self.delete_by_ids(&collection, point_ids)
425                .await
426                .map_err(|e| crate::VectorStoreError::Delete(e.to_string()))
427        })
428    }
429
430    fn scroll_all(
431        &self,
432        collection: &str,
433        key_field: &str,
434    ) -> std::pin::Pin<
435        Box<
436            dyn std::future::Future<
437                    Output = Result<
438                        HashMap<String, HashMap<String, String>>,
439                        crate::VectorStoreError,
440                    >,
441                > + Send
442                + '_,
443        >,
444    > {
445        let collection = collection.to_owned();
446        let key_field = key_field.to_owned();
447        Box::pin(async move {
448            self.scroll_all(&collection, &key_field)
449                .await
450                .map_err(|e| crate::VectorStoreError::Scroll(e.to_string()))
451        })
452    }
453
454    fn health_check(
455        &self,
456    ) -> std::pin::Pin<
457        Box<dyn std::future::Future<Output = Result<bool, crate::VectorStoreError>> + Send + '_>,
458    > {
459        Box::pin(async move {
460            self.client
461                .health_check()
462                .await
463                .map(|_| true)
464                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
465        })
466    }
467}
468
469fn vector_filter_to_qdrant(filter: crate::VectorFilter) -> Filter {
470    let must: Vec<_> = filter
471        .must
472        .into_iter()
473        .map(field_condition_to_qdrant)
474        .collect();
475    let must_not: Vec<_> = filter
476        .must_not
477        .into_iter()
478        .map(field_condition_to_qdrant)
479        .collect();
480
481    let mut f = Filter::default();
482    if !must.is_empty() {
483        f.must = must;
484    }
485    if !must_not.is_empty() {
486        f.must_not = must_not;
487    }
488    f
489}
490
491fn field_condition_to_qdrant(cond: crate::FieldCondition) -> qdrant_client::qdrant::Condition {
492    match cond.value {
493        crate::FieldValue::Integer(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
494        crate::FieldValue::Text(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
495    }
496}
497
498fn scored_point_to_vector(point: ScoredPoint) -> crate::ScoredVectorPoint {
499    let payload: HashMap<String, serde_json::Value> = point
500        .payload
501        .into_iter()
502        .filter_map(|(k, v)| {
503            let json_val = match v.kind? {
504                Kind::StringValue(s) => serde_json::Value::String(s),
505                Kind::IntegerValue(i) => serde_json::Value::Number(i.into()),
506                Kind::DoubleValue(d) => {
507                    serde_json::Number::from_f64(d).map(serde_json::Value::Number)?
508                }
509                Kind::BoolValue(b) => serde_json::Value::Bool(b),
510                _ => return None,
511            };
512            Some((k, json_val))
513        })
514        .collect();
515
516    let id = match point.id.and_then(|pid| pid.point_id_options) {
517        Some(qdrant_client::qdrant::point_id::PointIdOptions::Uuid(u)) => u,
518        Some(qdrant_client::qdrant::point_id::PointIdOptions::Num(n)) => n.to_string(),
519        None => String::new(),
520    };
521
522    crate::ScoredVectorPoint {
523        id,
524        score: point.score,
525        payload,
526    }
527}
528
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn new_valid_url() {
        let ops = QdrantOps::new("http://localhost:6334");
        assert!(ops.is_ok());
    }

    #[test]
    fn new_invalid_url() {
        let ops = QdrantOps::new("not a valid url");
        assert!(ops.is_err());
    }

    #[test]
    fn debug_format() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let dbg = format!("{ops:?}");
        assert!(dbg.contains("QdrantOps"));
        // The manual `Debug` impl must not leak client configuration such as the URL.
        assert!(!dbg.contains("localhost"));
    }

    #[test]
    fn json_to_payload_valid() {
        let value = serde_json::json!({"key": "value", "num": 42});
        let payload = QdrantOps::json_to_payload(value).unwrap();
        assert_eq!(payload.len(), 2);
        assert!(payload.contains_key("num"));
        assert!(matches!(
            payload.get("key").and_then(|v| v.kind.as_ref()),
            Some(Kind::StringValue(s)) if s == "value"
        ));
    }

    #[test]
    fn json_to_payload_empty() {
        let result = QdrantOps::json_to_payload(serde_json::json!({}));
        assert!(result.is_ok());
        assert!(result.unwrap().is_empty());
    }

    #[test]
    fn json_to_payload_rejects_non_object() {
        // A payload must be a JSON object; arrays and scalars cannot become a field map.
        assert!(QdrantOps::json_to_payload(serde_json::json!([1, 2, 3])).is_err());
        assert!(QdrantOps::json_to_payload(serde_json::json!("text")).is_err());
    }

    #[test]
    fn scored_point_to_vector_converts_ids_and_scalar_payload() {
        let payload =
            QdrantOps::json_to_payload(serde_json::json!({"name": "x", "flag": true})).unwrap();
        let point = ScoredPoint {
            id: Some(PointId::from(7u64)),
            payload,
            score: 0.5,
            ..Default::default()
        };
        let converted = scored_point_to_vector(point);
        assert_eq!(converted.id, "7");
        assert!((converted.score - 0.5).abs() < f32::EPSILON);
        assert_eq!(converted.payload.get("name"), Some(&serde_json::json!("x")));
        assert_eq!(converted.payload.get("flag"), Some(&serde_json::json!(true)));

        let uuid_point = ScoredPoint {
            id: Some(PointId::from("abc".to_string())),
            ..Default::default()
        };
        assert_eq!(scored_point_to_vector(uuid_point).id, "abc");
    }

    #[test]
    fn scored_point_to_vector_missing_id_is_empty_string() {
        let converted = scored_point_to_vector(ScoredPoint::default());
        assert_eq!(converted.id, "");
        assert!(converted.payload.is_empty());
    }

    /// No live Qdrant needed: client construction succeeds without a server (see
    /// `new_valid_url`), and an empty ID list returns before any request is made.
    #[tokio::test]
    async fn delete_by_ids_empty_no_network_call() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let result = ops.delete_by_ids("nonexistent_collection", vec![]).await;
        assert!(result.is_ok());
    }

    /// Requires a live Qdrant instance at localhost:6334.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_idempotent() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_quant_idempotent";

        // Clean up from any prior run
        let _ = ops.delete_collection(collection).await;

        // First call — creates collection
        ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
            .await
            .unwrap();

        assert!(ops.collection_exists(collection).await.unwrap());

        // Second call — idempotent, must not error
        ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
            .await
            .unwrap();

        // Cleanup
        ops.delete_collection(collection).await.unwrap();
    }

    /// Requires a live Qdrant instance at localhost:6334.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_idempotent_same_size() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_ensure_idempotent";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection(collection, 128).await.unwrap();
        assert!(ops.collection_exists(collection).await.unwrap());

        // Second call with same size must be a no-op.
        ops.ensure_collection(collection, 128).await.unwrap();
        assert!(ops.collection_exists(collection).await.unwrap());

        ops.delete_collection(collection).await.unwrap();
    }

    /// Requires a live Qdrant instance at localhost:6334.
    ///
    /// Verifies that `ensure_collection` detects a vector dimension mismatch and
    /// recreates the collection instead of silently reusing the wrong-dimension one.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_dim_mismatch";

        let _ = ops.delete_collection(collection).await;

        // Create with 128 dims.
        ops.ensure_collection(collection, 128).await.unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(128)
        );

        // Call again with a different size — must recreate.
        ops.ensure_collection(collection, 256).await.unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(256),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(collection).await.unwrap();
    }

    /// Requires a live Qdrant instance at localhost:6334.
    ///
    /// Verifies that `ensure_collection_with_quantization` also detects dimension mismatch.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_quant_dim_mismatch";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection_with_quantization(collection, 128, &["language"])
            .await
            .unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(128)
        );

        // Call again with a different size — must recreate.
        ops.ensure_collection_with_quantization(collection, 384, &["language"])
            .await
            .unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(384),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(collection).await.unwrap();
    }
}