Skip to main content

zeph_memory/
qdrant_ops.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Low-level Qdrant operations shared across crates.
5//!
6//! [`QdrantOps`] is the single point of contact with the `qdrant-client` crate.
7//! All higher-level stores ([`crate::embedding_store::EmbeddingStore`],
8//! [`crate::embedding_registry::EmbeddingRegistry`]) route through this type.
9
10use std::collections::HashMap;
11
12use crate::vector_store::BoxFuture;
13use qdrant_client::Qdrant;
14use qdrant_client::qdrant::{
15    CreateCollectionBuilder, DeletePointsBuilder, Distance, Filter, PointId, PointStruct,
16    PointsIdsList, ScoredPoint, ScrollPointsBuilder, SearchPointsBuilder, UpsertPointsBuilder,
17    VectorParamsBuilder, value::Kind,
18};
19
/// Result alias used by the inherent [`QdrantOps`] methods; the error is a boxed
/// [`qdrant_client::QdrantError`].
type QdrantResult<T> = Result<T, Box<qdrant_client::QdrantError>>;
21
/// Thin wrapper over [`Qdrant`] client encapsulating common collection operations.
///
/// Cloning the wrapper clones the inner client handle.
#[derive(Clone)]
pub struct QdrantOps {
    /// Underlying `qdrant-client` handle that every operation is issued through.
    client: Qdrant,
}
27
28impl std::fmt::Debug for QdrantOps {
29    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30        f.debug_struct("QdrantOps").finish_non_exhaustive()
31    }
32}
33
34impl QdrantOps {
35    /// Create a new `QdrantOps` connected to the given URL.
36    ///
37    /// # Errors
38    ///
39    /// Returns an error if the Qdrant client cannot be created.
40    // TODO(#2389): add optional `api_key: Option<String>` parameter and wire it via
41    // `.with_api_key()` on the builder once the Qdrant config struct exposes the field.
42    pub fn new(url: &str) -> QdrantResult<Self> {
43        let client = Qdrant::from_url(url).build().map_err(Box::new)?;
44        Ok(Self { client })
45    }
46
    /// Access the underlying Qdrant client for advanced operations.
    ///
    /// Returns a shared reference to the wrapped client; nothing is cloned.
    #[must_use]
    pub fn client(&self) -> &Qdrant {
        &self.client
    }
52
53    /// Ensure a collection exists with cosine distance vectors.
54    ///
55    /// If the collection already exists but has a different vector dimension than `vector_size`,
56    /// the collection is deleted and recreated. All existing data in the collection is lost.
57    ///
58    /// # Errors
59    ///
60    /// Returns an error if Qdrant cannot be reached or collection creation fails.
61    pub async fn ensure_collection(&self, collection: &str, vector_size: u64) -> QdrantResult<()> {
62        if self
63            .client
64            .collection_exists(collection)
65            .await
66            .map_err(Box::new)?
67        {
68            let existing_size = self.get_collection_vector_size(collection).await?;
69            if existing_size == Some(vector_size) {
70                return Ok(());
71            }
72            tracing::warn!(
73                collection,
74                existing = ?existing_size,
75                required = vector_size,
76                "vector dimension mismatch — recreating collection (existing data will be lost)"
77            );
78            self.client
79                .delete_collection(collection)
80                .await
81                .map_err(Box::new)?;
82        }
83        self.client
84            .create_collection(
85                CreateCollectionBuilder::new(collection)
86                    .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine)),
87            )
88            .await
89            .map_err(Box::new)?;
90        Ok(())
91    }
92
93    /// Returns the configured vector size of an existing collection, or `None` if it cannot be
94    /// determined (e.g. named-vector collections, or `collection_info` fails gracefully).
95    ///
96    /// # Errors
97    ///
98    /// Returns an error only on hard Qdrant communication failures.
99    async fn get_collection_vector_size(&self, collection: &str) -> QdrantResult<Option<u64>> {
100        let info = self
101            .client
102            .collection_info(collection)
103            .await
104            .map_err(Box::new)?;
105        let size = info
106            .result
107            .and_then(|r| r.config)
108            .and_then(|cfg| cfg.params)
109            .and_then(|params| params.vectors_config)
110            .and_then(|vc| vc.config)
111            .and_then(|cfg| match cfg {
112                qdrant_client::qdrant::vectors_config::Config::Params(vp) => Some(vp.size),
113                // Named-vector collections are not supported here; treat as unknown.
114                qdrant_client::qdrant::vectors_config::Config::ParamsMap(_) => None,
115            });
116        Ok(size)
117    }
118
119    /// Check whether a collection exists.
120    ///
121    /// # Errors
122    ///
123    /// Returns an error if Qdrant cannot be reached.
124    pub async fn collection_exists(&self, collection: &str) -> QdrantResult<bool> {
125        self.client
126            .collection_exists(collection)
127            .await
128            .map_err(Box::new)
129    }
130
131    /// Delete a collection.
132    ///
133    /// # Errors
134    ///
135    /// Returns an error if the collection cannot be deleted.
136    pub async fn delete_collection(&self, collection: &str) -> QdrantResult<()> {
137        self.client
138            .delete_collection(collection)
139            .await
140            .map_err(Box::new)?;
141        Ok(())
142    }
143
144    /// Upsert points into a collection.
145    ///
146    /// # Errors
147    ///
148    /// Returns an error if the upsert fails.
149    pub async fn upsert(&self, collection: &str, points: Vec<PointStruct>) -> QdrantResult<()> {
150        self.client
151            .upsert_points(UpsertPointsBuilder::new(collection, points).wait(true))
152            .await
153            .map_err(Box::new)?;
154        Ok(())
155    }
156
157    /// Search for similar vectors, returning scored points with payloads.
158    ///
159    /// # Errors
160    ///
161    /// Returns an error if the search fails.
162    pub async fn search(
163        &self,
164        collection: &str,
165        vector: Vec<f32>,
166        limit: u64,
167        filter: Option<Filter>,
168    ) -> QdrantResult<Vec<ScoredPoint>> {
169        let mut builder = SearchPointsBuilder::new(collection, vector, limit).with_payload(true);
170        if let Some(f) = filter {
171            builder = builder.filter(f);
172        }
173        let results = self.client.search_points(builder).await.map_err(Box::new)?;
174        Ok(results.result)
175    }
176
177    /// Delete points by their IDs.
178    ///
179    /// # Errors
180    ///
181    /// Returns an error if the deletion fails.
182    pub async fn delete_by_ids(&self, collection: &str, ids: Vec<PointId>) -> QdrantResult<()> {
183        if ids.is_empty() {
184            return Ok(());
185        }
186        self.client
187            .delete_points(
188                DeletePointsBuilder::new(collection)
189                    .points(PointsIdsList { ids })
190                    .wait(true),
191            )
192            .await
193            .map_err(Box::new)?;
194        Ok(())
195    }
196
197    /// Scroll all points in a collection, extracting string payload fields.
198    ///
199    /// Returns a map of `key_field` value -> { `field_name` -> `field_value` }.
200    ///
201    /// # Errors
202    ///
203    /// Returns an error if the scroll operation fails.
204    pub async fn scroll_all(
205        &self,
206        collection: &str,
207        key_field: &str,
208    ) -> QdrantResult<HashMap<String, HashMap<String, String>>> {
209        let mut result = HashMap::new();
210        let mut offset: Option<PointId> = None;
211
212        loop {
213            let mut builder = ScrollPointsBuilder::new(collection)
214                .with_payload(true)
215                .with_vectors(false)
216                .limit(100);
217
218            if let Some(ref off) = offset {
219                builder = builder.offset(off.clone());
220            }
221
222            let response = self.client.scroll(builder).await.map_err(Box::new)?;
223
224            for point in &response.result {
225                let Some(key_val) = point.payload.get(key_field) else {
226                    continue;
227                };
228                let Some(Kind::StringValue(key)) = &key_val.kind else {
229                    continue;
230                };
231
232                let mut fields = HashMap::new();
233                for (k, val) in &point.payload {
234                    if let Some(Kind::StringValue(s)) = &val.kind {
235                        fields.insert(k.clone(), s.clone());
236                    }
237                }
238                result.insert(key.clone(), fields);
239            }
240
241            match response.next_page_offset {
242                Some(next) => offset = Some(next),
243                None => break,
244            }
245        }
246
247        Ok(result)
248    }
249
250    /// Create a collection with scalar INT8 quantization if it does not exist,
251    /// then create keyword indexes for the given fields.
252    ///
253    /// If the collection already exists but has a different vector dimension than `vector_size`,
254    /// the collection is deleted and recreated. All existing data in the collection is lost.
255    ///
256    /// # Errors
257    ///
258    /// Returns an error if any Qdrant operation fails.
259    pub async fn ensure_collection_with_quantization(
260        &self,
261        collection: &str,
262        vector_size: u64,
263        keyword_fields: &[&str],
264    ) -> Result<(), crate::VectorStoreError> {
265        use qdrant_client::qdrant::{
266            CreateFieldIndexCollectionBuilder, FieldType, ScalarQuantizationBuilder,
267        };
268        if self
269            .client
270            .collection_exists(collection)
271            .await
272            .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?
273        {
274            let existing_size = self
275                .get_collection_vector_size(collection)
276                .await
277                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
278            if existing_size == Some(vector_size) {
279                return Ok(());
280            }
281            tracing::warn!(
282                collection,
283                existing = ?existing_size,
284                required = vector_size,
285                "vector dimension mismatch — recreating collection (existing data will be lost)"
286            );
287            self.client
288                .delete_collection(collection)
289                .await
290                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
291        }
292        self.client
293            .create_collection(
294                CreateCollectionBuilder::new(collection)
295                    .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine))
296                    .quantization_config(ScalarQuantizationBuilder::default()),
297            )
298            .await
299            .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
300
301        for field in keyword_fields {
302            self.client
303                .create_field_index(CreateFieldIndexCollectionBuilder::new(
304                    collection,
305                    *field,
306                    FieldType::Keyword,
307                ))
308                .await
309                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
310        }
311        Ok(())
312    }
313
    /// Convert a JSON value to a Qdrant payload map.
    ///
    /// The conversion is a plain `serde_json::from_value` into
    /// `HashMap<String, qdrant_client::qdrant::Value>`.
    ///
    /// # Errors
    ///
    /// Returns a JSON error if deserialization fails.
    pub fn json_to_payload(
        value: serde_json::Value,
    ) -> Result<HashMap<String, qdrant_client::qdrant::Value>, serde_json::Error> {
        serde_json::from_value(value)
    }
324}
325
326impl crate::vector_store::VectorStore for QdrantOps {
327    fn ensure_collection(
328        &self,
329        collection: &str,
330        vector_size: u64,
331    ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
332        let collection = collection.to_owned();
333        Box::pin(async move {
334            self.ensure_collection(&collection, vector_size)
335                .await
336                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
337        })
338    }
339
340    fn collection_exists(
341        &self,
342        collection: &str,
343    ) -> BoxFuture<'_, Result<bool, crate::VectorStoreError>> {
344        let collection = collection.to_owned();
345        Box::pin(async move {
346            self.collection_exists(&collection)
347                .await
348                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
349        })
350    }
351
352    fn delete_collection(
353        &self,
354        collection: &str,
355    ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
356        let collection = collection.to_owned();
357        Box::pin(async move {
358            self.delete_collection(&collection)
359                .await
360                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
361        })
362    }
363
364    fn upsert(
365        &self,
366        collection: &str,
367        points: Vec<crate::VectorPoint>,
368    ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
369        let collection = collection.to_owned();
370        Box::pin(async move {
371            let qdrant_points: Vec<PointStruct> = points
372                .into_iter()
373                .map(|p| {
374                    let payload: HashMap<String, qdrant_client::qdrant::Value> =
375                        serde_json::from_value(serde_json::Value::Object(
376                            p.payload.into_iter().collect(),
377                        ))
378                        .unwrap_or_default();
379                    PointStruct::new(p.id, p.vector, payload)
380                })
381                .collect();
382            self.upsert(&collection, qdrant_points)
383                .await
384                .map_err(|e| crate::VectorStoreError::Upsert(e.to_string()))
385        })
386    }
387
388    fn search(
389        &self,
390        collection: &str,
391        vector: Vec<f32>,
392        limit: u64,
393        filter: Option<crate::VectorFilter>,
394    ) -> BoxFuture<'_, Result<Vec<crate::ScoredVectorPoint>, crate::VectorStoreError>> {
395        let collection = collection.to_owned();
396        Box::pin(async move {
397            let qdrant_filter = filter.map(vector_filter_to_qdrant);
398            let results = self
399                .search(&collection, vector, limit, qdrant_filter)
400                .await
401                .map_err(|e| crate::VectorStoreError::Search(e.to_string()))?;
402            Ok(results.into_iter().map(scored_point_to_vector).collect())
403        })
404    }
405
406    fn delete_by_ids(
407        &self,
408        collection: &str,
409        ids: Vec<String>,
410    ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
411        let collection = collection.to_owned();
412        Box::pin(async move {
413            let point_ids: Vec<PointId> = ids.into_iter().map(PointId::from).collect();
414            self.delete_by_ids(&collection, point_ids)
415                .await
416                .map_err(|e| crate::VectorStoreError::Delete(e.to_string()))
417        })
418    }
419
420    fn scroll_all(
421        &self,
422        collection: &str,
423        key_field: &str,
424    ) -> BoxFuture<'_, Result<HashMap<String, HashMap<String, String>>, crate::VectorStoreError>>
425    {
426        let collection = collection.to_owned();
427        let key_field = key_field.to_owned();
428        Box::pin(async move {
429            self.scroll_all(&collection, &key_field)
430                .await
431                .map_err(|e| crate::VectorStoreError::Scroll(e.to_string()))
432        })
433    }
434
435    fn health_check(&self) -> BoxFuture<'_, Result<bool, crate::VectorStoreError>> {
436        Box::pin(async move {
437            self.client
438                .health_check()
439                .await
440                .map(|_| true)
441                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
442        })
443    }
444
445    fn create_keyword_indexes(
446        &self,
447        collection: &str,
448        fields: &[&str],
449    ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
450        use qdrant_client::qdrant::{CreateFieldIndexCollectionBuilder, FieldType};
451        let collection = collection.to_owned();
452        let fields: Vec<String> = fields.iter().map(|f| (*f).to_owned()).collect();
453        Box::pin(async move {
454            for field in &fields {
455                self.client
456                    .create_field_index(CreateFieldIndexCollectionBuilder::new(
457                        &collection,
458                        field.as_str(),
459                        FieldType::Keyword,
460                    ))
461                    .await
462                    .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
463            }
464            Ok(())
465        })
466    }
467}
468
469fn vector_filter_to_qdrant(filter: crate::VectorFilter) -> Filter {
470    let must: Vec<_> = filter
471        .must
472        .into_iter()
473        .map(field_condition_to_qdrant)
474        .collect();
475    let must_not: Vec<_> = filter
476        .must_not
477        .into_iter()
478        .map(field_condition_to_qdrant)
479        .collect();
480
481    let mut f = Filter::default();
482    if !must.is_empty() {
483        f.must = must;
484    }
485    if !must_not.is_empty() {
486        f.must_not = must_not;
487    }
488    f
489}
490
491fn field_condition_to_qdrant(cond: crate::FieldCondition) -> qdrant_client::qdrant::Condition {
492    match cond.value {
493        crate::FieldValue::Integer(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
494        crate::FieldValue::Text(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
495    }
496}
497
498fn scored_point_to_vector(point: ScoredPoint) -> crate::ScoredVectorPoint {
499    let payload: HashMap<String, serde_json::Value> = point
500        .payload
501        .into_iter()
502        .filter_map(|(k, v)| {
503            let json_val = match v.kind? {
504                Kind::StringValue(s) => serde_json::Value::String(s),
505                Kind::IntegerValue(i) => serde_json::Value::Number(i.into()),
506                Kind::DoubleValue(d) => {
507                    serde_json::Number::from_f64(d).map(serde_json::Value::Number)?
508                }
509                Kind::BoolValue(b) => serde_json::Value::Bool(b),
510                _ => return None,
511            };
512            Some((k, json_val))
513        })
514        .collect();
515
516    let id = match point.id.and_then(|pid| pid.point_id_options) {
517        Some(qdrant_client::qdrant::point_id::PointIdOptions::Uuid(u)) => u,
518        Some(qdrant_client::qdrant::point_id::PointIdOptions::Num(n)) => n.to_string(),
519        None => String::new(),
520    };
521
522    crate::ScoredVectorPoint {
523        id,
524        score: point.score,
525        payload,
526    }
527}
528
#[cfg(test)]
mod tests {
    use super::*;

    // Tests that talk to a server are `#[ignore]`d and expect Qdrant at localhost:6334.
    // Everything else runs offline: building a client does not need a reachable server.

    #[test]
    fn new_valid_url() {
        let ops = QdrantOps::new("http://localhost:6334");
        assert!(ops.is_ok());
    }

    #[test]
    fn new_invalid_url() {
        let ops = QdrantOps::new("not a valid url");
        assert!(ops.is_err());
    }

    #[test]
    fn debug_format() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let dbg = format!("{ops:?}");
        assert!(dbg.contains("QdrantOps"));
    }

    #[test]
    fn json_to_payload_valid() {
        let value = serde_json::json!({"key": "value", "num": 42});
        let payload = QdrantOps::json_to_payload(value).unwrap();
        // Both top-level entries must survive the conversion.
        assert_eq!(payload.len(), 2);
        assert!(payload.contains_key("key"));
        assert!(payload.contains_key("num"));
    }

    #[test]
    fn json_to_payload_empty() {
        let result = QdrantOps::json_to_payload(serde_json::json!({}));
        assert!(result.is_ok());
        assert!(result.unwrap().is_empty());
    }

    #[test]
    fn delete_by_ids_empty_is_ok_sync() {
        // Constructing QdrantOps with a valid URL succeeds even without a live server.
        // The early return of `delete_by_ids` itself is exercised by the async test
        // `delete_by_ids_empty_no_network_call`.
        let ops = QdrantOps::new("http://localhost:6334");
        assert!(ops.is_ok());
    }

    /// Requires a live Qdrant instance at localhost:6334.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_idempotent() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_quant_idempotent";

        // Clean up from any prior run
        let _ = ops.delete_collection(collection).await;

        // First call — creates collection
        ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
            .await
            .unwrap();

        assert!(ops.collection_exists(collection).await.unwrap());

        // Second call — idempotent, must not error
        ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
            .await
            .unwrap();

        // Cleanup
        ops.delete_collection(collection).await.unwrap();
    }

    /// Runs without a Qdrant server: an empty id list returns before any request is built,
    /// so this test is not ignored and guards the short-circuit in every test run.
    #[tokio::test]
    async fn delete_by_ids_empty_no_network_call() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        // Empty ID list must short-circuit and return Ok without hitting Qdrant.
        let result = ops.delete_by_ids("nonexistent_collection", vec![]).await;
        assert!(result.is_ok());
    }

    /// Requires a live Qdrant instance at localhost:6334.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_idempotent_same_size() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_ensure_idempotent";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection(collection, 128).await.unwrap();
        assert!(ops.collection_exists(collection).await.unwrap());

        // Second call with same size must be a no-op.
        ops.ensure_collection(collection, 128).await.unwrap();
        assert!(ops.collection_exists(collection).await.unwrap());

        ops.delete_collection(collection).await.unwrap();
    }

    /// Requires a live Qdrant instance at localhost:6334.
    ///
    /// Verifies that `ensure_collection` detects a vector dimension mismatch and
    /// recreates the collection instead of silently reusing the wrong-dimension one.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_dim_mismatch";

        let _ = ops.delete_collection(collection).await;

        // Create with 128 dims.
        ops.ensure_collection(collection, 128).await.unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(128)
        );

        // Call again with a different size — must recreate.
        ops.ensure_collection(collection, 256).await.unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(256),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(collection).await.unwrap();
    }

    /// Requires a live Qdrant instance at localhost:6334.
    ///
    /// Verifies that `ensure_collection_with_quantization` also detects dimension mismatch.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_quant_dim_mismatch";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection_with_quantization(collection, 128, &["language"])
            .await
            .unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(128)
        );

        // Call again with a different size — must recreate.
        ops.ensure_collection_with_quantization(collection, 384, &["language"])
            .await
            .unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(384),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(collection).await.unwrap();
    }
}