Skip to main content

zeph_memory/
qdrant_ops.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Low-level Qdrant operations shared across crates.
5
6use std::collections::HashMap;
7
8use crate::vector_store::BoxFuture;
9use qdrant_client::Qdrant;
10use qdrant_client::qdrant::{
11    CreateCollectionBuilder, DeletePointsBuilder, Distance, Filter, PointId, PointStruct,
12    PointsIdsList, ScoredPoint, ScrollPointsBuilder, SearchPointsBuilder, UpsertPointsBuilder,
13    VectorParamsBuilder, value::Kind,
14};
15
/// Result alias returned by the inherent [`QdrantOps`] methods; the error is a boxed
/// [`qdrant_client::QdrantError`].
type QdrantResult<T> = Result<T, Box<qdrant_client::QdrantError>>;
17
/// Thin wrapper over [`Qdrant`] client encapsulating common collection operations.
///
/// The wrapper is `Clone`; its `Debug` output is written by hand and does not print the client.
#[derive(Clone)]
pub struct QdrantOps {
    // The wrapped client through which the operations in this file are issued.
    client: Qdrant,
}
23
24impl std::fmt::Debug for QdrantOps {
25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26        f.debug_struct("QdrantOps").finish_non_exhaustive()
27    }
28}
29
30impl QdrantOps {
31    /// Create a new `QdrantOps` connected to the given URL.
32    ///
33    /// # Errors
34    ///
35    /// Returns an error if the Qdrant client cannot be created.
36    // TODO(#2389): add optional `api_key: Option<String>` parameter and wire it via
37    // `.with_api_key()` on the builder once the Qdrant config struct exposes the field.
38    pub fn new(url: &str) -> QdrantResult<Self> {
39        let client = Qdrant::from_url(url).build().map_err(Box::new)?;
40        Ok(Self { client })
41    }
42
    /// Access the underlying Qdrant client for advanced operations.
    ///
    /// The client is returned by shared reference borrowed from `self`; nothing is cloned.
    #[must_use]
    pub fn client(&self) -> &Qdrant {
        &self.client
    }
48
49    /// Ensure a collection exists with cosine distance vectors.
50    ///
51    /// If the collection already exists but has a different vector dimension than `vector_size`,
52    /// the collection is deleted and recreated. All existing data in the collection is lost.
53    ///
54    /// # Errors
55    ///
56    /// Returns an error if Qdrant cannot be reached or collection creation fails.
57    pub async fn ensure_collection(&self, collection: &str, vector_size: u64) -> QdrantResult<()> {
58        if self
59            .client
60            .collection_exists(collection)
61            .await
62            .map_err(Box::new)?
63        {
64            let existing_size = self.get_collection_vector_size(collection).await?;
65            if existing_size == Some(vector_size) {
66                return Ok(());
67            }
68            tracing::warn!(
69                collection,
70                existing = ?existing_size,
71                required = vector_size,
72                "vector dimension mismatch — recreating collection (existing data will be lost)"
73            );
74            self.client
75                .delete_collection(collection)
76                .await
77                .map_err(Box::new)?;
78        }
79        self.client
80            .create_collection(
81                CreateCollectionBuilder::new(collection)
82                    .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine)),
83            )
84            .await
85            .map_err(Box::new)?;
86        Ok(())
87    }
88
89    /// Returns the configured vector size of an existing collection, or `None` if it cannot be
90    /// determined (e.g. named-vector collections, or `collection_info` fails gracefully).
91    ///
92    /// # Errors
93    ///
94    /// Returns an error only on hard Qdrant communication failures.
95    async fn get_collection_vector_size(&self, collection: &str) -> QdrantResult<Option<u64>> {
96        let info = self
97            .client
98            .collection_info(collection)
99            .await
100            .map_err(Box::new)?;
101        let size = info
102            .result
103            .and_then(|r| r.config)
104            .and_then(|cfg| cfg.params)
105            .and_then(|params| params.vectors_config)
106            .and_then(|vc| vc.config)
107            .and_then(|cfg| match cfg {
108                qdrant_client::qdrant::vectors_config::Config::Params(vp) => Some(vp.size),
109                // Named-vector collections are not supported here; treat as unknown.
110                qdrant_client::qdrant::vectors_config::Config::ParamsMap(_) => None,
111            });
112        Ok(size)
113    }
114
115    /// Check whether a collection exists.
116    ///
117    /// # Errors
118    ///
119    /// Returns an error if Qdrant cannot be reached.
120    pub async fn collection_exists(&self, collection: &str) -> QdrantResult<bool> {
121        self.client
122            .collection_exists(collection)
123            .await
124            .map_err(Box::new)
125    }
126
127    /// Delete a collection.
128    ///
129    /// # Errors
130    ///
131    /// Returns an error if the collection cannot be deleted.
132    pub async fn delete_collection(&self, collection: &str) -> QdrantResult<()> {
133        self.client
134            .delete_collection(collection)
135            .await
136            .map_err(Box::new)?;
137        Ok(())
138    }
139
140    /// Upsert points into a collection.
141    ///
142    /// # Errors
143    ///
144    /// Returns an error if the upsert fails.
145    pub async fn upsert(&self, collection: &str, points: Vec<PointStruct>) -> QdrantResult<()> {
146        self.client
147            .upsert_points(UpsertPointsBuilder::new(collection, points).wait(true))
148            .await
149            .map_err(Box::new)?;
150        Ok(())
151    }
152
153    /// Search for similar vectors, returning scored points with payloads.
154    ///
155    /// # Errors
156    ///
157    /// Returns an error if the search fails.
158    pub async fn search(
159        &self,
160        collection: &str,
161        vector: Vec<f32>,
162        limit: u64,
163        filter: Option<Filter>,
164    ) -> QdrantResult<Vec<ScoredPoint>> {
165        let mut builder = SearchPointsBuilder::new(collection, vector, limit).with_payload(true);
166        if let Some(f) = filter {
167            builder = builder.filter(f);
168        }
169        let results = self.client.search_points(builder).await.map_err(Box::new)?;
170        Ok(results.result)
171    }
172
173    /// Delete points by their IDs.
174    ///
175    /// # Errors
176    ///
177    /// Returns an error if the deletion fails.
178    pub async fn delete_by_ids(&self, collection: &str, ids: Vec<PointId>) -> QdrantResult<()> {
179        if ids.is_empty() {
180            return Ok(());
181        }
182        self.client
183            .delete_points(
184                DeletePointsBuilder::new(collection)
185                    .points(PointsIdsList { ids })
186                    .wait(true),
187            )
188            .await
189            .map_err(Box::new)?;
190        Ok(())
191    }
192
193    /// Scroll all points in a collection, extracting string payload fields.
194    ///
195    /// Returns a map of `key_field` value -> { `field_name` -> `field_value` }.
196    ///
197    /// # Errors
198    ///
199    /// Returns an error if the scroll operation fails.
200    pub async fn scroll_all(
201        &self,
202        collection: &str,
203        key_field: &str,
204    ) -> QdrantResult<HashMap<String, HashMap<String, String>>> {
205        let mut result = HashMap::new();
206        let mut offset: Option<PointId> = None;
207
208        loop {
209            let mut builder = ScrollPointsBuilder::new(collection)
210                .with_payload(true)
211                .with_vectors(false)
212                .limit(100);
213
214            if let Some(ref off) = offset {
215                builder = builder.offset(off.clone());
216            }
217
218            let response = self.client.scroll(builder).await.map_err(Box::new)?;
219
220            for point in &response.result {
221                let Some(key_val) = point.payload.get(key_field) else {
222                    continue;
223                };
224                let Some(Kind::StringValue(key)) = &key_val.kind else {
225                    continue;
226                };
227
228                let mut fields = HashMap::new();
229                for (k, val) in &point.payload {
230                    if let Some(Kind::StringValue(s)) = &val.kind {
231                        fields.insert(k.clone(), s.clone());
232                    }
233                }
234                result.insert(key.clone(), fields);
235            }
236
237            match response.next_page_offset {
238                Some(next) => offset = Some(next),
239                None => break,
240            }
241        }
242
243        Ok(result)
244    }
245
246    /// Create a collection with scalar INT8 quantization if it does not exist,
247    /// then create keyword indexes for the given fields.
248    ///
249    /// If the collection already exists but has a different vector dimension than `vector_size`,
250    /// the collection is deleted and recreated. All existing data in the collection is lost.
251    ///
252    /// # Errors
253    ///
254    /// Returns an error if any Qdrant operation fails.
255    pub async fn ensure_collection_with_quantization(
256        &self,
257        collection: &str,
258        vector_size: u64,
259        keyword_fields: &[&str],
260    ) -> Result<(), crate::VectorStoreError> {
261        use qdrant_client::qdrant::{
262            CreateFieldIndexCollectionBuilder, FieldType, ScalarQuantizationBuilder,
263        };
264        if self
265            .client
266            .collection_exists(collection)
267            .await
268            .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?
269        {
270            let existing_size = self
271                .get_collection_vector_size(collection)
272                .await
273                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
274            if existing_size == Some(vector_size) {
275                return Ok(());
276            }
277            tracing::warn!(
278                collection,
279                existing = ?existing_size,
280                required = vector_size,
281                "vector dimension mismatch — recreating collection (existing data will be lost)"
282            );
283            self.client
284                .delete_collection(collection)
285                .await
286                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
287        }
288        self.client
289            .create_collection(
290                CreateCollectionBuilder::new(collection)
291                    .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine))
292                    .quantization_config(ScalarQuantizationBuilder::default()),
293            )
294            .await
295            .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
296
297        for field in keyword_fields {
298            self.client
299                .create_field_index(CreateFieldIndexCollectionBuilder::new(
300                    collection,
301                    *field,
302                    FieldType::Keyword,
303                ))
304                .await
305                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
306        }
307        Ok(())
308    }
309
    /// Convert a JSON value to a Qdrant payload map.
    ///
    /// The conversion is delegated entirely to `serde_json::from_value`, targeting a map from
    /// field name to Qdrant value.
    /// NOTE(review): a `value` that is not a JSON object presumably fails to deserialize into
    /// the map — confirm against the Qdrant value's `Deserialize` impl.
    ///
    /// # Errors
    ///
    /// Returns a JSON error if deserialization fails.
    pub fn json_to_payload(
        value: serde_json::Value,
    ) -> Result<HashMap<String, qdrant_client::qdrant::Value>, serde_json::Error> {
        serde_json::from_value(value)
    }
320}
321
322impl crate::vector_store::VectorStore for QdrantOps {
323    fn ensure_collection(
324        &self,
325        collection: &str,
326        vector_size: u64,
327    ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
328        let collection = collection.to_owned();
329        Box::pin(async move {
330            self.ensure_collection(&collection, vector_size)
331                .await
332                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
333        })
334    }
335
336    fn collection_exists(
337        &self,
338        collection: &str,
339    ) -> BoxFuture<'_, Result<bool, crate::VectorStoreError>> {
340        let collection = collection.to_owned();
341        Box::pin(async move {
342            self.collection_exists(&collection)
343                .await
344                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
345        })
346    }
347
348    fn delete_collection(
349        &self,
350        collection: &str,
351    ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
352        let collection = collection.to_owned();
353        Box::pin(async move {
354            self.delete_collection(&collection)
355                .await
356                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
357        })
358    }
359
360    fn upsert(
361        &self,
362        collection: &str,
363        points: Vec<crate::VectorPoint>,
364    ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
365        let collection = collection.to_owned();
366        Box::pin(async move {
367            let qdrant_points: Vec<PointStruct> = points
368                .into_iter()
369                .map(|p| {
370                    let payload: HashMap<String, qdrant_client::qdrant::Value> =
371                        serde_json::from_value(serde_json::Value::Object(
372                            p.payload.into_iter().collect(),
373                        ))
374                        .unwrap_or_default();
375                    PointStruct::new(p.id, p.vector, payload)
376                })
377                .collect();
378            self.upsert(&collection, qdrant_points)
379                .await
380                .map_err(|e| crate::VectorStoreError::Upsert(e.to_string()))
381        })
382    }
383
384    fn search(
385        &self,
386        collection: &str,
387        vector: Vec<f32>,
388        limit: u64,
389        filter: Option<crate::VectorFilter>,
390    ) -> BoxFuture<'_, Result<Vec<crate::ScoredVectorPoint>, crate::VectorStoreError>> {
391        let collection = collection.to_owned();
392        Box::pin(async move {
393            let qdrant_filter = filter.map(vector_filter_to_qdrant);
394            let results = self
395                .search(&collection, vector, limit, qdrant_filter)
396                .await
397                .map_err(|e| crate::VectorStoreError::Search(e.to_string()))?;
398            Ok(results.into_iter().map(scored_point_to_vector).collect())
399        })
400    }
401
402    fn delete_by_ids(
403        &self,
404        collection: &str,
405        ids: Vec<String>,
406    ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
407        let collection = collection.to_owned();
408        Box::pin(async move {
409            let point_ids: Vec<PointId> = ids.into_iter().map(PointId::from).collect();
410            self.delete_by_ids(&collection, point_ids)
411                .await
412                .map_err(|e| crate::VectorStoreError::Delete(e.to_string()))
413        })
414    }
415
416    fn scroll_all(
417        &self,
418        collection: &str,
419        key_field: &str,
420    ) -> BoxFuture<'_, Result<HashMap<String, HashMap<String, String>>, crate::VectorStoreError>>
421    {
422        let collection = collection.to_owned();
423        let key_field = key_field.to_owned();
424        Box::pin(async move {
425            self.scroll_all(&collection, &key_field)
426                .await
427                .map_err(|e| crate::VectorStoreError::Scroll(e.to_string()))
428        })
429    }
430
431    fn health_check(&self) -> BoxFuture<'_, Result<bool, crate::VectorStoreError>> {
432        Box::pin(async move {
433            self.client
434                .health_check()
435                .await
436                .map(|_| true)
437                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
438        })
439    }
440
441    fn create_keyword_indexes(
442        &self,
443        collection: &str,
444        fields: &[&str],
445    ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
446        use qdrant_client::qdrant::{CreateFieldIndexCollectionBuilder, FieldType};
447        let collection = collection.to_owned();
448        let fields: Vec<String> = fields.iter().map(|f| (*f).to_owned()).collect();
449        Box::pin(async move {
450            for field in &fields {
451                self.client
452                    .create_field_index(CreateFieldIndexCollectionBuilder::new(
453                        &collection,
454                        field.as_str(),
455                        FieldType::Keyword,
456                    ))
457                    .await
458                    .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
459            }
460            Ok(())
461        })
462    }
463}
464
465fn vector_filter_to_qdrant(filter: crate::VectorFilter) -> Filter {
466    let must: Vec<_> = filter
467        .must
468        .into_iter()
469        .map(field_condition_to_qdrant)
470        .collect();
471    let must_not: Vec<_> = filter
472        .must_not
473        .into_iter()
474        .map(field_condition_to_qdrant)
475        .collect();
476
477    let mut f = Filter::default();
478    if !must.is_empty() {
479        f.must = must;
480    }
481    if !must_not.is_empty() {
482        f.must_not = must_not;
483    }
484    f
485}
486
487fn field_condition_to_qdrant(cond: crate::FieldCondition) -> qdrant_client::qdrant::Condition {
488    match cond.value {
489        crate::FieldValue::Integer(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
490        crate::FieldValue::Text(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
491    }
492}
493
494fn scored_point_to_vector(point: ScoredPoint) -> crate::ScoredVectorPoint {
495    let payload: HashMap<String, serde_json::Value> = point
496        .payload
497        .into_iter()
498        .filter_map(|(k, v)| {
499            let json_val = match v.kind? {
500                Kind::StringValue(s) => serde_json::Value::String(s),
501                Kind::IntegerValue(i) => serde_json::Value::Number(i.into()),
502                Kind::DoubleValue(d) => {
503                    serde_json::Number::from_f64(d).map(serde_json::Value::Number)?
504                }
505                Kind::BoolValue(b) => serde_json::Value::Bool(b),
506                _ => return None,
507            };
508            Some((k, json_val))
509        })
510        .collect();
511
512    let id = match point.id.and_then(|pid| pid.point_id_options) {
513        Some(qdrant_client::qdrant::point_id::PointIdOptions::Uuid(u)) => u,
514        Some(qdrant_client::qdrant::point_id::PointIdOptions::Num(n)) => n.to_string(),
515        None => String::new(),
516    };
517
518    crate::ScoredVectorPoint {
519        id,
520        score: point.score,
521        payload,
522    }
523}
524
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed URL is accepted; no request is made while constructing the wrapper.
    #[test]
    fn new_valid_url() {
        let ops = QdrantOps::new("http://localhost:6334");
        assert!(ops.is_ok());
    }

    /// A string that is not a URL is rejected at construction time.
    #[test]
    fn new_invalid_url() {
        let ops = QdrantOps::new("not a valid url");
        assert!(ops.is_err());
    }

    /// The hand-written `Debug` impl names the type.
    #[test]
    fn debug_format() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let dbg = format!("{ops:?}");
        assert!(dbg.contains("QdrantOps"));
    }

    /// A JSON object with string and numeric members converts to a payload.
    #[test]
    fn json_to_payload_valid() {
        let value = serde_json::json!({"key": "value", "num": 42});
        let result = QdrantOps::json_to_payload(value);
        assert!(result.is_ok());
    }

    /// An empty JSON object converts to an empty payload.
    #[test]
    fn json_to_payload_empty() {
        let result = QdrantOps::json_to_payload(serde_json::json!({}));
        assert!(result.is_ok());
        assert!(result.unwrap().is_empty());
    }

    #[test]
    fn delete_by_ids_empty_is_ok_sync() {
        // Constructing QdrantOps with a valid URL succeeds even without a live server.
        // The early-return logic of `delete_by_ids` itself is exercised by
        // `delete_by_ids_empty_no_network_call`, which needs an async runtime but no server.
        let ops = QdrantOps::new("http://localhost:6334");
        assert!(ops.is_ok());
    }

    /// Requires a live Qdrant instance at localhost:6334.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_idempotent() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_quant_idempotent";

        // Clean up from any prior run
        let _ = ops.delete_collection(collection).await;

        // First call — creates collection
        ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
            .await
            .unwrap();

        assert!(ops.collection_exists(collection).await.unwrap());

        // Second call — idempotent, must not error
        ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
            .await
            .unwrap();

        // Cleanup
        ops.delete_collection(collection).await.unwrap();
    }

    /// Runs without a Qdrant server: an empty ID list returns before any request is built,
    /// so this test is not `#[ignore]`d and guards the early return in every test run.
    #[tokio::test]
    async fn delete_by_ids_empty_no_network_call() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        // Empty ID list must short-circuit and return Ok without hitting Qdrant.
        let result = ops.delete_by_ids("nonexistent_collection", vec![]).await;
        assert!(result.is_ok());
    }

    /// Requires a live Qdrant instance at localhost:6334.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_idempotent_same_size() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_ensure_idempotent";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection(collection, 128).await.unwrap();
        assert!(ops.collection_exists(collection).await.unwrap());

        // Second call with same size must be a no-op.
        ops.ensure_collection(collection, 128).await.unwrap();
        assert!(ops.collection_exists(collection).await.unwrap());

        ops.delete_collection(collection).await.unwrap();
    }

    /// Requires a live Qdrant instance at localhost:6334.
    ///
    /// Verifies that `ensure_collection` detects a vector dimension mismatch and
    /// recreates the collection instead of silently reusing the wrong-dimension one.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_dim_mismatch";

        let _ = ops.delete_collection(collection).await;

        // Create with 128 dims.
        ops.ensure_collection(collection, 128).await.unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(128)
        );

        // Call again with a different size — must recreate.
        ops.ensure_collection(collection, 256).await.unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(256),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(collection).await.unwrap();
    }

    /// Requires a live Qdrant instance at localhost:6334.
    ///
    /// Verifies that `ensure_collection_with_quantization` also detects dimension mismatch.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_quant_dim_mismatch";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection_with_quantization(collection, 128, &["language"])
            .await
            .unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(128)
        );

        // Call again with a different size — must recreate.
        ops.ensure_collection_with_quantization(collection, 384, &["language"])
            .await
            .unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(384),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(collection).await.unwrap();
    }
}