Skip to main content

zeph_memory/
qdrant_ops.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Low-level Qdrant operations shared across crates.
5
6use std::collections::HashMap;
7
8use crate::vector_store::BoxFuture;
9use qdrant_client::Qdrant;
10use qdrant_client::qdrant::{
11    CreateCollectionBuilder, DeletePointsBuilder, Distance, Filter, PointId, PointStruct,
12    PointsIdsList, ScoredPoint, ScrollPointsBuilder, SearchPointsBuilder, UpsertPointsBuilder,
13    VectorParamsBuilder, value::Kind,
14};
15
/// Result alias used by the inherent [`QdrantOps`] methods.
///
/// The error is a boxed [`qdrant_client::QdrantError`] (presumably boxed to keep the `Err`
/// variant small — confirm against the crate's lint configuration).
type QdrantResult<T> = Result<T, Box<qdrant_client::QdrantError>>;
17
/// Thin wrapper over the [`Qdrant`] client encapsulating common collection operations.
///
/// The wrapper holds nothing but the client itself; cloning a `QdrantOps` clones that client.
#[derive(Clone)]
pub struct QdrantOps {
    /// Underlying Qdrant client used for every operation.
    client: Qdrant,
}
23
24impl std::fmt::Debug for QdrantOps {
25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26        f.debug_struct("QdrantOps").finish_non_exhaustive()
27    }
28}
29
30impl QdrantOps {
31    /// Create a new `QdrantOps` connected to the given URL.
32    ///
33    /// # Errors
34    ///
35    /// Returns an error if the Qdrant client cannot be created.
36    // TODO(#2389): add optional `api_key: Option<String>` parameter and wire it via
37    // `.with_api_key()` on the builder once the Qdrant config struct exposes the field.
38    pub fn new(url: &str) -> QdrantResult<Self> {
39        let client = Qdrant::from_url(url).build().map_err(Box::new)?;
40        Ok(Self { client })
41    }
42
43    /// Access the underlying Qdrant client for advanced operations.
44    #[must_use]
45    pub fn client(&self) -> &Qdrant {
46        &self.client
47    }
48
49    /// Ensure a collection exists with cosine distance vectors.
50    ///
51    /// If the collection already exists but has a different vector dimension than `vector_size`,
52    /// the collection is deleted and recreated. All existing data in the collection is lost.
53    ///
54    /// # Errors
55    ///
56    /// Returns an error if Qdrant cannot be reached or collection creation fails.
57    pub async fn ensure_collection(&self, collection: &str, vector_size: u64) -> QdrantResult<()> {
58        if self
59            .client
60            .collection_exists(collection)
61            .await
62            .map_err(Box::new)?
63        {
64            let existing_size = self.get_collection_vector_size(collection).await?;
65            if existing_size == Some(vector_size) {
66                return Ok(());
67            }
68            tracing::warn!(
69                collection,
70                existing = ?existing_size,
71                required = vector_size,
72                "vector dimension mismatch — recreating collection (existing data will be lost)"
73            );
74            self.client
75                .delete_collection(collection)
76                .await
77                .map_err(Box::new)?;
78        }
79        self.client
80            .create_collection(
81                CreateCollectionBuilder::new(collection)
82                    .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine)),
83            )
84            .await
85            .map_err(Box::new)?;
86        Ok(())
87    }
88
89    /// Returns the configured vector size of an existing collection, or `None` if it cannot be
90    /// determined (e.g. named-vector collections, or `collection_info` fails gracefully).
91    ///
92    /// # Errors
93    ///
94    /// Returns an error only on hard Qdrant communication failures.
95    async fn get_collection_vector_size(&self, collection: &str) -> QdrantResult<Option<u64>> {
96        let info = self
97            .client
98            .collection_info(collection)
99            .await
100            .map_err(Box::new)?;
101        let size = info
102            .result
103            .and_then(|r| r.config)
104            .and_then(|cfg| cfg.params)
105            .and_then(|params| params.vectors_config)
106            .and_then(|vc| vc.config)
107            .and_then(|cfg| match cfg {
108                qdrant_client::qdrant::vectors_config::Config::Params(vp) => Some(vp.size),
109                // Named-vector collections are not supported here; treat as unknown.
110                qdrant_client::qdrant::vectors_config::Config::ParamsMap(_) => None,
111            });
112        Ok(size)
113    }
114
115    /// Check whether a collection exists.
116    ///
117    /// # Errors
118    ///
119    /// Returns an error if Qdrant cannot be reached.
120    pub async fn collection_exists(&self, collection: &str) -> QdrantResult<bool> {
121        self.client
122            .collection_exists(collection)
123            .await
124            .map_err(Box::new)
125    }
126
127    /// Delete a collection.
128    ///
129    /// # Errors
130    ///
131    /// Returns an error if the collection cannot be deleted.
132    pub async fn delete_collection(&self, collection: &str) -> QdrantResult<()> {
133        self.client
134            .delete_collection(collection)
135            .await
136            .map_err(Box::new)?;
137        Ok(())
138    }
139
140    /// Upsert points into a collection.
141    ///
142    /// # Errors
143    ///
144    /// Returns an error if the upsert fails.
145    pub async fn upsert(&self, collection: &str, points: Vec<PointStruct>) -> QdrantResult<()> {
146        // wait(false): fire-and-forget on the hot path — saves 3-15ms per embedding store call.
147        // Qdrant guarantees eventual consistency; read-your-writes is not required here.
148        self.client
149            .upsert_points(UpsertPointsBuilder::new(collection, points).wait(false))
150            .await
151            .map_err(Box::new)?;
152        Ok(())
153    }
154
155    /// Search for similar vectors, returning scored points with payloads.
156    ///
157    /// # Errors
158    ///
159    /// Returns an error if the search fails.
160    pub async fn search(
161        &self,
162        collection: &str,
163        vector: Vec<f32>,
164        limit: u64,
165        filter: Option<Filter>,
166    ) -> QdrantResult<Vec<ScoredPoint>> {
167        let mut builder = SearchPointsBuilder::new(collection, vector, limit).with_payload(true);
168        if let Some(f) = filter {
169            builder = builder.filter(f);
170        }
171        let results = self.client.search_points(builder).await.map_err(Box::new)?;
172        Ok(results.result)
173    }
174
175    /// Delete points by their IDs.
176    ///
177    /// # Errors
178    ///
179    /// Returns an error if the deletion fails.
180    pub async fn delete_by_ids(&self, collection: &str, ids: Vec<PointId>) -> QdrantResult<()> {
181        if ids.is_empty() {
182            return Ok(());
183        }
184        self.client
185            .delete_points(
186                DeletePointsBuilder::new(collection)
187                    .points(PointsIdsList { ids })
188                    .wait(true),
189            )
190            .await
191            .map_err(Box::new)?;
192        Ok(())
193    }
194
195    /// Scroll all points in a collection, extracting string payload fields.
196    ///
197    /// Returns a map of `key_field` value -> { `field_name` -> `field_value` }.
198    ///
199    /// # Errors
200    ///
201    /// Returns an error if the scroll operation fails.
202    pub async fn scroll_all(
203        &self,
204        collection: &str,
205        key_field: &str,
206    ) -> QdrantResult<HashMap<String, HashMap<String, String>>> {
207        let mut result = HashMap::new();
208        let mut offset: Option<PointId> = None;
209
210        loop {
211            let mut builder = ScrollPointsBuilder::new(collection)
212                .with_payload(true)
213                .with_vectors(false)
214                .limit(100);
215
216            if let Some(ref off) = offset {
217                builder = builder.offset(off.clone());
218            }
219
220            let response = self.client.scroll(builder).await.map_err(Box::new)?;
221
222            for point in &response.result {
223                let Some(key_val) = point.payload.get(key_field) else {
224                    continue;
225                };
226                let Some(Kind::StringValue(key)) = &key_val.kind else {
227                    continue;
228                };
229
230                let mut fields = HashMap::new();
231                for (k, val) in &point.payload {
232                    if let Some(Kind::StringValue(s)) = &val.kind {
233                        fields.insert(k.clone(), s.clone());
234                    }
235                }
236                result.insert(key.clone(), fields);
237            }
238
239            match response.next_page_offset {
240                Some(next) => offset = Some(next),
241                None => break,
242            }
243        }
244
245        Ok(result)
246    }
247
248    /// Create a collection with scalar INT8 quantization if it does not exist,
249    /// then create keyword indexes for the given fields.
250    ///
251    /// If the collection already exists but has a different vector dimension than `vector_size`,
252    /// the collection is deleted and recreated. All existing data in the collection is lost.
253    ///
254    /// # Errors
255    ///
256    /// Returns an error if any Qdrant operation fails.
257    pub async fn ensure_collection_with_quantization(
258        &self,
259        collection: &str,
260        vector_size: u64,
261        keyword_fields: &[&str],
262    ) -> Result<(), crate::VectorStoreError> {
263        use qdrant_client::qdrant::{
264            CreateFieldIndexCollectionBuilder, FieldType, ScalarQuantizationBuilder,
265        };
266        if self
267            .client
268            .collection_exists(collection)
269            .await
270            .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?
271        {
272            let existing_size = self
273                .get_collection_vector_size(collection)
274                .await
275                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
276            if existing_size == Some(vector_size) {
277                return Ok(());
278            }
279            tracing::warn!(
280                collection,
281                existing = ?existing_size,
282                required = vector_size,
283                "vector dimension mismatch — recreating collection (existing data will be lost)"
284            );
285            self.client
286                .delete_collection(collection)
287                .await
288                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
289        }
290        self.client
291            .create_collection(
292                CreateCollectionBuilder::new(collection)
293                    .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine))
294                    .quantization_config(ScalarQuantizationBuilder::default()),
295            )
296            .await
297            .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
298
299        for field in keyword_fields {
300            self.client
301                .create_field_index(CreateFieldIndexCollectionBuilder::new(
302                    collection,
303                    *field,
304                    FieldType::Keyword,
305                ))
306                .await
307                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
308        }
309        Ok(())
310    }
311
312    /// Convert a JSON value to a Qdrant payload map.
313    ///
314    /// # Errors
315    ///
316    /// Returns a JSON error if deserialization fails.
317    pub fn json_to_payload(
318        value: serde_json::Value,
319    ) -> Result<HashMap<String, qdrant_client::qdrant::Value>, serde_json::Error> {
320        serde_json::from_value(value)
321    }
322}
323
324impl crate::vector_store::VectorStore for QdrantOps {
325    fn ensure_collection(
326        &self,
327        collection: &str,
328        vector_size: u64,
329    ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
330        let collection = collection.to_owned();
331        Box::pin(async move {
332            self.ensure_collection(&collection, vector_size)
333                .await
334                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
335        })
336    }
337
338    fn collection_exists(
339        &self,
340        collection: &str,
341    ) -> BoxFuture<'_, Result<bool, crate::VectorStoreError>> {
342        let collection = collection.to_owned();
343        Box::pin(async move {
344            self.collection_exists(&collection)
345                .await
346                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
347        })
348    }
349
350    fn delete_collection(
351        &self,
352        collection: &str,
353    ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
354        let collection = collection.to_owned();
355        Box::pin(async move {
356            self.delete_collection(&collection)
357                .await
358                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
359        })
360    }
361
362    fn upsert(
363        &self,
364        collection: &str,
365        points: Vec<crate::VectorPoint>,
366    ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
367        let collection = collection.to_owned();
368        Box::pin(async move {
369            let qdrant_points: Vec<PointStruct> = points
370                .into_iter()
371                .map(|p| {
372                    let payload: HashMap<String, qdrant_client::qdrant::Value> =
373                        serde_json::from_value(serde_json::Value::Object(
374                            p.payload.into_iter().collect(),
375                        ))
376                        .unwrap_or_default();
377                    PointStruct::new(p.id, p.vector, payload)
378                })
379                .collect();
380            self.upsert(&collection, qdrant_points)
381                .await
382                .map_err(|e| crate::VectorStoreError::Upsert(e.to_string()))
383        })
384    }
385
386    fn search(
387        &self,
388        collection: &str,
389        vector: Vec<f32>,
390        limit: u64,
391        filter: Option<crate::VectorFilter>,
392    ) -> BoxFuture<'_, Result<Vec<crate::ScoredVectorPoint>, crate::VectorStoreError>> {
393        let collection = collection.to_owned();
394        Box::pin(async move {
395            let qdrant_filter = filter.map(vector_filter_to_qdrant);
396            let results = self
397                .search(&collection, vector, limit, qdrant_filter)
398                .await
399                .map_err(|e| crate::VectorStoreError::Search(e.to_string()))?;
400            Ok(results.into_iter().map(scored_point_to_vector).collect())
401        })
402    }
403
404    fn delete_by_ids(
405        &self,
406        collection: &str,
407        ids: Vec<String>,
408    ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
409        let collection = collection.to_owned();
410        Box::pin(async move {
411            let point_ids: Vec<PointId> = ids.into_iter().map(PointId::from).collect();
412            self.delete_by_ids(&collection, point_ids)
413                .await
414                .map_err(|e| crate::VectorStoreError::Delete(e.to_string()))
415        })
416    }
417
418    fn scroll_all(
419        &self,
420        collection: &str,
421        key_field: &str,
422    ) -> BoxFuture<'_, Result<HashMap<String, HashMap<String, String>>, crate::VectorStoreError>>
423    {
424        let collection = collection.to_owned();
425        let key_field = key_field.to_owned();
426        Box::pin(async move {
427            self.scroll_all(&collection, &key_field)
428                .await
429                .map_err(|e| crate::VectorStoreError::Scroll(e.to_string()))
430        })
431    }
432
433    fn health_check(&self) -> BoxFuture<'_, Result<bool, crate::VectorStoreError>> {
434        Box::pin(async move {
435            self.client
436                .health_check()
437                .await
438                .map(|_| true)
439                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
440        })
441    }
442}
443
444fn vector_filter_to_qdrant(filter: crate::VectorFilter) -> Filter {
445    let must: Vec<_> = filter
446        .must
447        .into_iter()
448        .map(field_condition_to_qdrant)
449        .collect();
450    let must_not: Vec<_> = filter
451        .must_not
452        .into_iter()
453        .map(field_condition_to_qdrant)
454        .collect();
455
456    let mut f = Filter::default();
457    if !must.is_empty() {
458        f.must = must;
459    }
460    if !must_not.is_empty() {
461        f.must_not = must_not;
462    }
463    f
464}
465
466fn field_condition_to_qdrant(cond: crate::FieldCondition) -> qdrant_client::qdrant::Condition {
467    match cond.value {
468        crate::FieldValue::Integer(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
469        crate::FieldValue::Text(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
470    }
471}
472
473fn scored_point_to_vector(point: ScoredPoint) -> crate::ScoredVectorPoint {
474    let payload: HashMap<String, serde_json::Value> = point
475        .payload
476        .into_iter()
477        .filter_map(|(k, v)| {
478            let json_val = match v.kind? {
479                Kind::StringValue(s) => serde_json::Value::String(s),
480                Kind::IntegerValue(i) => serde_json::Value::Number(i.into()),
481                Kind::DoubleValue(d) => {
482                    serde_json::Number::from_f64(d).map(serde_json::Value::Number)?
483                }
484                Kind::BoolValue(b) => serde_json::Value::Bool(b),
485                _ => return None,
486            };
487            Some((k, json_val))
488        })
489        .collect();
490
491    let id = match point.id.and_then(|pid| pid.point_id_options) {
492        Some(qdrant_client::qdrant::point_id::PointIdOptions::Uuid(u)) => u,
493        Some(qdrant_client::qdrant::point_id::PointIdOptions::Num(n)) => n.to_string(),
494        None => String::new(),
495    };
496
497    crate::ScoredVectorPoint {
498        id,
499        score: point.score,
500        payload,
501    }
502}
503
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn new_valid_url() {
        let ops = QdrantOps::new("http://localhost:6334");
        assert!(ops.is_ok());
    }

    #[test]
    fn new_invalid_url() {
        let ops = QdrantOps::new("not a valid url");
        assert!(ops.is_err());
    }

    #[test]
    fn debug_format() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let dbg = format!("{ops:?}");
        assert!(dbg.contains("QdrantOps"));
    }

    #[test]
    fn json_to_payload_valid() {
        let value = serde_json::json!({"key": "value", "num": 42});
        let result = QdrantOps::json_to_payload(value);
        assert!(result.is_ok());
    }

    #[test]
    fn json_to_payload_empty() {
        let result = QdrantOps::json_to_payload(serde_json::json!({}));
        assert!(result.is_ok());
        assert!(result.unwrap().is_empty());
    }

    #[test]
    fn delete_by_ids_empty_is_ok_sync() {
        // Only checks that constructing QdrantOps with a valid URL succeeds without a live
        // server. The empty-list early return of `delete_by_ids` itself is exercised by
        // `delete_by_ids_empty_no_network_call` below.
        let ops = QdrantOps::new("http://localhost:6334");
        assert!(ops.is_ok());
    }

    /// Pins the payload/id conversion without needing a server.
    #[test]
    fn scored_point_to_vector_keeps_scalar_payload_and_numeric_id() {
        let value = |kind| qdrant_client::qdrant::Value { kind: Some(kind) };
        let payload = HashMap::from([
            (
                "name".to_owned(),
                value(Kind::StringValue("zeph".to_owned())),
            ),
            ("count".to_owned(), value(Kind::IntegerValue(3))),
            ("active".to_owned(), value(Kind::BoolValue(true))),
        ]);
        let point = ScoredPoint {
            id: Some(PointId::from(7_u64)),
            payload,
            score: 0.5,
            ..Default::default()
        };

        let converted = scored_point_to_vector(point);

        assert_eq!(converted.id, "7");
        assert!((converted.score - 0.5).abs() < f32::EPSILON);
        assert_eq!(converted.payload.len(), 3);
        assert_eq!(
            converted.payload.get("name"),
            Some(&serde_json::json!("zeph"))
        );
        assert_eq!(converted.payload.get("count"), Some(&serde_json::json!(3)));
        assert_eq!(
            converted.payload.get("active"),
            Some(&serde_json::json!(true))
        );
    }

    /// Requires a live Qdrant instance at localhost:6334.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_idempotent() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_quant_idempotent";

        // Clean up from any prior run
        let _ = ops.delete_collection(collection).await;

        // First call — creates collection
        ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
            .await
            .unwrap();

        assert!(ops.collection_exists(collection).await.unwrap());

        // Second call — idempotent, must not error
        ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
            .await
            .unwrap();

        // Cleanup
        ops.delete_collection(collection).await.unwrap();
    }

    /// Runs without a live Qdrant instance: an empty ID list short-circuits before any
    /// request is made, so this test is not ignored.
    #[tokio::test]
    async fn delete_by_ids_empty_no_network_call() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        // Empty ID list must short-circuit and return Ok without hitting Qdrant.
        let result = ops.delete_by_ids("nonexistent_collection", vec![]).await;
        assert!(result.is_ok());
    }

    /// Requires a live Qdrant instance at localhost:6334.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_idempotent_same_size() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_ensure_idempotent";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection(collection, 128).await.unwrap();
        assert!(ops.collection_exists(collection).await.unwrap());

        // Second call with same size must be a no-op.
        ops.ensure_collection(collection, 128).await.unwrap();
        assert!(ops.collection_exists(collection).await.unwrap());

        ops.delete_collection(collection).await.unwrap();
    }

    /// Requires a live Qdrant instance at localhost:6334.
    ///
    /// Verifies that `ensure_collection` detects a vector dimension mismatch and
    /// recreates the collection instead of silently reusing the wrong-dimension one.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_dim_mismatch";

        let _ = ops.delete_collection(collection).await;

        // Create with 128 dims.
        ops.ensure_collection(collection, 128).await.unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(128)
        );

        // Call again with a different size — must recreate.
        ops.ensure_collection(collection, 256).await.unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(256),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(collection).await.unwrap();
    }

    /// Requires a live Qdrant instance at localhost:6334.
    ///
    /// Verifies that `ensure_collection_with_quantization` also detects dimension mismatch.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_quant_dim_mismatch";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection_with_quantization(collection, 128, &["language"])
            .await
            .unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(128)
        );

        // Call again with a different size — must recreate.
        ops.ensure_collection_with_quantization(collection, 384, &["language"])
            .await
            .unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(384),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(collection).await.unwrap();
    }
}