Skip to main content

zeph_memory/
qdrant_ops.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Low-level Qdrant operations shared across crates.
5
6use std::collections::HashMap;
7
8use crate::vector_store::BoxFuture;
9use qdrant_client::Qdrant;
10use qdrant_client::qdrant::{
11    CreateCollectionBuilder, DeletePointsBuilder, Distance, Filter, PointId, PointStruct,
12    PointsIdsList, ScoredPoint, ScrollPointsBuilder, SearchPointsBuilder, UpsertPointsBuilder,
13    VectorParamsBuilder, value::Kind,
14};
15
/// Result alias returned by the inherent [`QdrantOps`] methods.
///
/// The error is boxed, so the `Err` variant is a single pointer regardless of how large
/// `QdrantError` is.
type QdrantResult<T> = Result<T, Box<qdrant_client::QdrantError>>;
17
/// Thin wrapper over the [`Qdrant`] client encapsulating common collection operations.
///
/// The wrapped client is private; [`QdrantOps::client`] hands out a shared reference for
/// calls this type does not expose. `Clone` is derived, so cloning clones the wrapped client.
#[derive(Clone)]
pub struct QdrantOps {
    /// Underlying Qdrant client that the methods of this type issue their requests through.
    client: Qdrant,
}
23
24impl std::fmt::Debug for QdrantOps {
25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26        f.debug_struct("QdrantOps").finish_non_exhaustive()
27    }
28}
29
30impl QdrantOps {
31    /// Create a new `QdrantOps` connected to the given URL.
32    ///
33    /// # Errors
34    ///
35    /// Returns an error if the Qdrant client cannot be created.
36    pub fn new(url: &str) -> QdrantResult<Self> {
37        let client = Qdrant::from_url(url).build().map_err(Box::new)?;
38        Ok(Self { client })
39    }
40
41    /// Access the underlying Qdrant client for advanced operations.
42    #[must_use]
43    pub fn client(&self) -> &Qdrant {
44        &self.client
45    }
46
47    /// Ensure a collection exists with cosine distance vectors.
48    ///
49    /// If the collection already exists but has a different vector dimension than `vector_size`,
50    /// the collection is deleted and recreated. All existing data in the collection is lost.
51    ///
52    /// # Errors
53    ///
54    /// Returns an error if Qdrant cannot be reached or collection creation fails.
55    pub async fn ensure_collection(&self, collection: &str, vector_size: u64) -> QdrantResult<()> {
56        if self
57            .client
58            .collection_exists(collection)
59            .await
60            .map_err(Box::new)?
61        {
62            let existing_size = self.get_collection_vector_size(collection).await?;
63            if existing_size == Some(vector_size) {
64                return Ok(());
65            }
66            tracing::warn!(
67                collection,
68                existing = ?existing_size,
69                required = vector_size,
70                "vector dimension mismatch — recreating collection (existing data will be lost)"
71            );
72            self.client
73                .delete_collection(collection)
74                .await
75                .map_err(Box::new)?;
76        }
77        self.client
78            .create_collection(
79                CreateCollectionBuilder::new(collection)
80                    .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine)),
81            )
82            .await
83            .map_err(Box::new)?;
84        Ok(())
85    }
86
87    /// Returns the configured vector size of an existing collection, or `None` if it cannot be
88    /// determined (e.g. named-vector collections, or `collection_info` fails gracefully).
89    ///
90    /// # Errors
91    ///
92    /// Returns an error only on hard Qdrant communication failures.
93    async fn get_collection_vector_size(&self, collection: &str) -> QdrantResult<Option<u64>> {
94        let info = self
95            .client
96            .collection_info(collection)
97            .await
98            .map_err(Box::new)?;
99        let size = info
100            .result
101            .and_then(|r| r.config)
102            .and_then(|cfg| cfg.params)
103            .and_then(|params| params.vectors_config)
104            .and_then(|vc| vc.config)
105            .and_then(|cfg| match cfg {
106                qdrant_client::qdrant::vectors_config::Config::Params(vp) => Some(vp.size),
107                // Named-vector collections are not supported here; treat as unknown.
108                qdrant_client::qdrant::vectors_config::Config::ParamsMap(_) => None,
109            });
110        Ok(size)
111    }
112
113    /// Check whether a collection exists.
114    ///
115    /// # Errors
116    ///
117    /// Returns an error if Qdrant cannot be reached.
118    pub async fn collection_exists(&self, collection: &str) -> QdrantResult<bool> {
119        self.client
120            .collection_exists(collection)
121            .await
122            .map_err(Box::new)
123    }
124
125    /// Delete a collection.
126    ///
127    /// # Errors
128    ///
129    /// Returns an error if the collection cannot be deleted.
130    pub async fn delete_collection(&self, collection: &str) -> QdrantResult<()> {
131        self.client
132            .delete_collection(collection)
133            .await
134            .map_err(Box::new)?;
135        Ok(())
136    }
137
138    /// Upsert points into a collection.
139    ///
140    /// # Errors
141    ///
142    /// Returns an error if the upsert fails.
143    pub async fn upsert(&self, collection: &str, points: Vec<PointStruct>) -> QdrantResult<()> {
144        self.client
145            .upsert_points(UpsertPointsBuilder::new(collection, points).wait(true))
146            .await
147            .map_err(Box::new)?;
148        Ok(())
149    }
150
151    /// Search for similar vectors, returning scored points with payloads.
152    ///
153    /// # Errors
154    ///
155    /// Returns an error if the search fails.
156    pub async fn search(
157        &self,
158        collection: &str,
159        vector: Vec<f32>,
160        limit: u64,
161        filter: Option<Filter>,
162    ) -> QdrantResult<Vec<ScoredPoint>> {
163        let mut builder = SearchPointsBuilder::new(collection, vector, limit).with_payload(true);
164        if let Some(f) = filter {
165            builder = builder.filter(f);
166        }
167        let results = self.client.search_points(builder).await.map_err(Box::new)?;
168        Ok(results.result)
169    }
170
171    /// Delete points by their IDs.
172    ///
173    /// # Errors
174    ///
175    /// Returns an error if the deletion fails.
176    pub async fn delete_by_ids(&self, collection: &str, ids: Vec<PointId>) -> QdrantResult<()> {
177        if ids.is_empty() {
178            return Ok(());
179        }
180        self.client
181            .delete_points(
182                DeletePointsBuilder::new(collection)
183                    .points(PointsIdsList { ids })
184                    .wait(true),
185            )
186            .await
187            .map_err(Box::new)?;
188        Ok(())
189    }
190
191    /// Scroll all points in a collection, extracting string payload fields.
192    ///
193    /// Returns a map of `key_field` value -> { `field_name` -> `field_value` }.
194    ///
195    /// # Errors
196    ///
197    /// Returns an error if the scroll operation fails.
198    pub async fn scroll_all(
199        &self,
200        collection: &str,
201        key_field: &str,
202    ) -> QdrantResult<HashMap<String, HashMap<String, String>>> {
203        let mut result = HashMap::new();
204        let mut offset: Option<PointId> = None;
205
206        loop {
207            let mut builder = ScrollPointsBuilder::new(collection)
208                .with_payload(true)
209                .with_vectors(false)
210                .limit(100);
211
212            if let Some(ref off) = offset {
213                builder = builder.offset(off.clone());
214            }
215
216            let response = self.client.scroll(builder).await.map_err(Box::new)?;
217
218            for point in &response.result {
219                let Some(key_val) = point.payload.get(key_field) else {
220                    continue;
221                };
222                let Some(Kind::StringValue(key)) = &key_val.kind else {
223                    continue;
224                };
225
226                let mut fields = HashMap::new();
227                for (k, val) in &point.payload {
228                    if let Some(Kind::StringValue(s)) = &val.kind {
229                        fields.insert(k.clone(), s.clone());
230                    }
231                }
232                result.insert(key.clone(), fields);
233            }
234
235            match response.next_page_offset {
236                Some(next) => offset = Some(next),
237                None => break,
238            }
239        }
240
241        Ok(result)
242    }
243
244    /// Create a collection with scalar INT8 quantization if it does not exist,
245    /// then create keyword indexes for the given fields.
246    ///
247    /// If the collection already exists but has a different vector dimension than `vector_size`,
248    /// the collection is deleted and recreated. All existing data in the collection is lost.
249    ///
250    /// # Errors
251    ///
252    /// Returns an error if any Qdrant operation fails.
253    pub async fn ensure_collection_with_quantization(
254        &self,
255        collection: &str,
256        vector_size: u64,
257        keyword_fields: &[&str],
258    ) -> Result<(), crate::VectorStoreError> {
259        use qdrant_client::qdrant::{
260            CreateFieldIndexCollectionBuilder, FieldType, ScalarQuantizationBuilder,
261        };
262        if self
263            .client
264            .collection_exists(collection)
265            .await
266            .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?
267        {
268            let existing_size = self
269                .get_collection_vector_size(collection)
270                .await
271                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
272            if existing_size == Some(vector_size) {
273                return Ok(());
274            }
275            tracing::warn!(
276                collection,
277                existing = ?existing_size,
278                required = vector_size,
279                "vector dimension mismatch — recreating collection (existing data will be lost)"
280            );
281            self.client
282                .delete_collection(collection)
283                .await
284                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
285        }
286        self.client
287            .create_collection(
288                CreateCollectionBuilder::new(collection)
289                    .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine))
290                    .quantization_config(ScalarQuantizationBuilder::default()),
291            )
292            .await
293            .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
294
295        for field in keyword_fields {
296            self.client
297                .create_field_index(CreateFieldIndexCollectionBuilder::new(
298                    collection,
299                    *field,
300                    FieldType::Keyword,
301                ))
302                .await
303                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
304        }
305        Ok(())
306    }
307
308    /// Convert a JSON value to a Qdrant payload map.
309    ///
310    /// # Errors
311    ///
312    /// Returns a JSON error if deserialization fails.
313    pub fn json_to_payload(
314        value: serde_json::Value,
315    ) -> Result<HashMap<String, qdrant_client::qdrant::Value>, serde_json::Error> {
316        serde_json::from_value(value)
317    }
318}
319
320impl crate::vector_store::VectorStore for QdrantOps {
321    fn ensure_collection(
322        &self,
323        collection: &str,
324        vector_size: u64,
325    ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
326        let collection = collection.to_owned();
327        Box::pin(async move {
328            self.ensure_collection(&collection, vector_size)
329                .await
330                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
331        })
332    }
333
334    fn collection_exists(
335        &self,
336        collection: &str,
337    ) -> BoxFuture<'_, Result<bool, crate::VectorStoreError>> {
338        let collection = collection.to_owned();
339        Box::pin(async move {
340            self.collection_exists(&collection)
341                .await
342                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
343        })
344    }
345
346    fn delete_collection(
347        &self,
348        collection: &str,
349    ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
350        let collection = collection.to_owned();
351        Box::pin(async move {
352            self.delete_collection(&collection)
353                .await
354                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
355        })
356    }
357
358    fn upsert(
359        &self,
360        collection: &str,
361        points: Vec<crate::VectorPoint>,
362    ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
363        let collection = collection.to_owned();
364        Box::pin(async move {
365            let qdrant_points: Vec<PointStruct> = points
366                .into_iter()
367                .map(|p| {
368                    let payload: HashMap<String, qdrant_client::qdrant::Value> =
369                        serde_json::from_value(serde_json::Value::Object(
370                            p.payload.into_iter().collect(),
371                        ))
372                        .unwrap_or_default();
373                    PointStruct::new(p.id, p.vector, payload)
374                })
375                .collect();
376            self.upsert(&collection, qdrant_points)
377                .await
378                .map_err(|e| crate::VectorStoreError::Upsert(e.to_string()))
379        })
380    }
381
382    fn search(
383        &self,
384        collection: &str,
385        vector: Vec<f32>,
386        limit: u64,
387        filter: Option<crate::VectorFilter>,
388    ) -> BoxFuture<'_, Result<Vec<crate::ScoredVectorPoint>, crate::VectorStoreError>> {
389        let collection = collection.to_owned();
390        Box::pin(async move {
391            let qdrant_filter = filter.map(vector_filter_to_qdrant);
392            let results = self
393                .search(&collection, vector, limit, qdrant_filter)
394                .await
395                .map_err(|e| crate::VectorStoreError::Search(e.to_string()))?;
396            Ok(results.into_iter().map(scored_point_to_vector).collect())
397        })
398    }
399
400    fn delete_by_ids(
401        &self,
402        collection: &str,
403        ids: Vec<String>,
404    ) -> BoxFuture<'_, Result<(), crate::VectorStoreError>> {
405        let collection = collection.to_owned();
406        Box::pin(async move {
407            let point_ids: Vec<PointId> = ids.into_iter().map(PointId::from).collect();
408            self.delete_by_ids(&collection, point_ids)
409                .await
410                .map_err(|e| crate::VectorStoreError::Delete(e.to_string()))
411        })
412    }
413
414    fn scroll_all(
415        &self,
416        collection: &str,
417        key_field: &str,
418    ) -> BoxFuture<'_, Result<HashMap<String, HashMap<String, String>>, crate::VectorStoreError>>
419    {
420        let collection = collection.to_owned();
421        let key_field = key_field.to_owned();
422        Box::pin(async move {
423            self.scroll_all(&collection, &key_field)
424                .await
425                .map_err(|e| crate::VectorStoreError::Scroll(e.to_string()))
426        })
427    }
428
429    fn health_check(&self) -> BoxFuture<'_, Result<bool, crate::VectorStoreError>> {
430        Box::pin(async move {
431            self.client
432                .health_check()
433                .await
434                .map(|_| true)
435                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
436        })
437    }
438}
439
440fn vector_filter_to_qdrant(filter: crate::VectorFilter) -> Filter {
441    let must: Vec<_> = filter
442        .must
443        .into_iter()
444        .map(field_condition_to_qdrant)
445        .collect();
446    let must_not: Vec<_> = filter
447        .must_not
448        .into_iter()
449        .map(field_condition_to_qdrant)
450        .collect();
451
452    let mut f = Filter::default();
453    if !must.is_empty() {
454        f.must = must;
455    }
456    if !must_not.is_empty() {
457        f.must_not = must_not;
458    }
459    f
460}
461
462fn field_condition_to_qdrant(cond: crate::FieldCondition) -> qdrant_client::qdrant::Condition {
463    match cond.value {
464        crate::FieldValue::Integer(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
465        crate::FieldValue::Text(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
466    }
467}
468
469fn scored_point_to_vector(point: ScoredPoint) -> crate::ScoredVectorPoint {
470    let payload: HashMap<String, serde_json::Value> = point
471        .payload
472        .into_iter()
473        .filter_map(|(k, v)| {
474            let json_val = match v.kind? {
475                Kind::StringValue(s) => serde_json::Value::String(s),
476                Kind::IntegerValue(i) => serde_json::Value::Number(i.into()),
477                Kind::DoubleValue(d) => {
478                    serde_json::Number::from_f64(d).map(serde_json::Value::Number)?
479                }
480                Kind::BoolValue(b) => serde_json::Value::Bool(b),
481                _ => return None,
482            };
483            Some((k, json_val))
484        })
485        .collect();
486
487    let id = match point.id.and_then(|pid| pid.point_id_options) {
488        Some(qdrant_client::qdrant::point_id::PointIdOptions::Uuid(u)) => u,
489        Some(qdrant_client::qdrant::point_id::PointIdOptions::Num(n)) => n.to_string(),
490        None => String::new(),
491    };
492
493    crate::ScoredVectorPoint {
494        id,
495        score: point.score,
496        payload,
497    }
498}
499
#[cfg(test)]
mod tests {
    use super::*;

    /// Building the client from a well-formed URL succeeds without a live server.
    #[test]
    fn new_valid_url() {
        let ops = QdrantOps::new("http://localhost:6334");
        assert!(ops.is_ok());
    }

    /// A string that is not a URL is rejected at construction time.
    #[test]
    fn new_invalid_url() {
        let ops = QdrantOps::new("not a valid url");
        assert!(ops.is_err());
    }

    /// The `Debug` output names the type.
    #[test]
    fn debug_format() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let dbg = format!("{ops:?}");
        assert!(dbg.contains("QdrantOps"));
    }

    #[test]
    fn json_to_payload_valid() {
        let value = serde_json::json!({"key": "value", "num": 42});
        let result = QdrantOps::json_to_payload(value);
        assert!(result.is_ok());
    }

    #[test]
    fn json_to_payload_empty() {
        let result = QdrantOps::json_to_payload(serde_json::json!({}));
        assert!(result.is_ok());
        assert!(result.unwrap().is_empty());
    }

    #[test]
    fn delete_by_ids_empty_is_ok_sync() {
        // Only checks that construction succeeds without a live server, which is the
        // precondition for `delete_by_ids_empty_no_network_call` below. The early-return
        // logic itself is exercised by that async test.
        let ops = QdrantOps::new("http://localhost:6334");
        assert!(ops.is_ok());
    }

    /// Requires a live Qdrant instance at localhost:6334.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_idempotent() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_quant_idempotent";

        // Clean up from any prior run
        let _ = ops.delete_collection(collection).await;

        // First call — creates collection
        ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
            .await
            .unwrap();

        assert!(ops.collection_exists(collection).await.unwrap());

        // Second call — idempotent, must not error
        ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
            .await
            .unwrap();

        // Cleanup
        ops.delete_collection(collection).await.unwrap();
    }

    /// Runs without a live Qdrant instance: an empty ID list returns before any request
    /// is issued, so the nonexistent collection is never looked up.
    #[tokio::test]
    async fn delete_by_ids_empty_no_network_call() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        // Empty ID list must short-circuit and return Ok without hitting Qdrant.
        let result = ops.delete_by_ids("nonexistent_collection", vec![]).await;
        assert!(result.is_ok());
    }

    /// Requires a live Qdrant instance at localhost:6334.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_idempotent_same_size() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_ensure_idempotent";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection(collection, 128).await.unwrap();
        assert!(ops.collection_exists(collection).await.unwrap());

        // Second call with same size must be a no-op.
        ops.ensure_collection(collection, 128).await.unwrap();
        assert!(ops.collection_exists(collection).await.unwrap());

        ops.delete_collection(collection).await.unwrap();
    }

    /// Requires a live Qdrant instance at localhost:6334.
    ///
    /// Verifies that `ensure_collection` detects a vector dimension mismatch and
    /// recreates the collection instead of silently reusing the wrong-dimension one.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_dim_mismatch";

        let _ = ops.delete_collection(collection).await;

        // Create with 128 dims.
        ops.ensure_collection(collection, 128).await.unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(128)
        );

        // Call again with a different size — must recreate.
        ops.ensure_collection(collection, 256).await.unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(256),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(collection).await.unwrap();
    }

    /// Requires a live Qdrant instance at localhost:6334.
    ///
    /// Verifies that `ensure_collection_with_quantization` also detects dimension mismatch.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_recreates_on_dimension_mismatch() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_quant_dim_mismatch";

        let _ = ops.delete_collection(collection).await;

        ops.ensure_collection_with_quantization(collection, 128, &["language"])
            .await
            .unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(128)
        );

        // Call again with a different size — must recreate.
        ops.ensure_collection_with_quantization(collection, 384, &["language"])
            .await
            .unwrap();
        assert_eq!(
            ops.get_collection_vector_size(collection).await.unwrap(),
            Some(384),
            "collection must have been recreated with the new dimension"
        );

        ops.delete_collection(collection).await.unwrap();
    }
}
662}