Skip to main content

zeph_memory/
qdrant_ops.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Low-level Qdrant operations shared across crates.
5
6use std::collections::HashMap;
7
8use qdrant_client::Qdrant;
9use qdrant_client::qdrant::{
10    CreateCollectionBuilder, DeletePointsBuilder, Distance, Filter, PointId, PointStruct,
11    PointsIdsList, ScoredPoint, ScrollPointsBuilder, SearchPointsBuilder, UpsertPointsBuilder,
12    VectorParamsBuilder, value::Kind,
13};
14
/// Result alias used by the inherent [`QdrantOps`] methods: the error is a boxed
/// [`qdrant_client::QdrantError`].
// NOTE(review): boxing presumably keeps the `Err` variant (and thus every `Result`) small —
// confirm against the size of `QdrantError`.
type QdrantResult<T> = Result<T, Box<qdrant_client::QdrantError>>;
16
/// Thin wrapper over the [`Qdrant`] client encapsulating common collection operations.
///
/// Cloning produces another wrapper around a clone of the underlying client.
#[derive(Clone)]
pub struct QdrantOps {
    /// Underlying Qdrant client; exposed read-only through [`QdrantOps::client`].
    client: Qdrant,
}
22
23impl std::fmt::Debug for QdrantOps {
24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25        f.debug_struct("QdrantOps").finish_non_exhaustive()
26    }
27}
28
29impl QdrantOps {
30    /// Create a new `QdrantOps` connected to the given URL.
31    ///
32    /// # Errors
33    ///
34    /// Returns an error if the Qdrant client cannot be created.
35    pub fn new(url: &str) -> QdrantResult<Self> {
36        let client = Qdrant::from_url(url).build().map_err(Box::new)?;
37        Ok(Self { client })
38    }
39
40    /// Access the underlying Qdrant client for advanced operations.
41    #[must_use]
42    pub fn client(&self) -> &Qdrant {
43        &self.client
44    }
45
46    /// Ensure a collection exists with cosine distance vectors.
47    ///
48    /// Idempotent: no-op if the collection already exists.
49    ///
50    /// # Errors
51    ///
52    /// Returns an error if Qdrant cannot be reached or collection creation fails.
53    pub async fn ensure_collection(&self, collection: &str, vector_size: u64) -> QdrantResult<()> {
54        if self
55            .client
56            .collection_exists(collection)
57            .await
58            .map_err(Box::new)?
59        {
60            return Ok(());
61        }
62        self.client
63            .create_collection(
64                CreateCollectionBuilder::new(collection)
65                    .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine)),
66            )
67            .await
68            .map_err(Box::new)?;
69        Ok(())
70    }
71
72    /// Check whether a collection exists.
73    ///
74    /// # Errors
75    ///
76    /// Returns an error if Qdrant cannot be reached.
77    pub async fn collection_exists(&self, collection: &str) -> QdrantResult<bool> {
78        self.client
79            .collection_exists(collection)
80            .await
81            .map_err(Box::new)
82    }
83
84    /// Delete a collection.
85    ///
86    /// # Errors
87    ///
88    /// Returns an error if the collection cannot be deleted.
89    pub async fn delete_collection(&self, collection: &str) -> QdrantResult<()> {
90        self.client
91            .delete_collection(collection)
92            .await
93            .map_err(Box::new)?;
94        Ok(())
95    }
96
97    /// Upsert points into a collection.
98    ///
99    /// # Errors
100    ///
101    /// Returns an error if the upsert fails.
102    pub async fn upsert(&self, collection: &str, points: Vec<PointStruct>) -> QdrantResult<()> {
103        self.client
104            .upsert_points(UpsertPointsBuilder::new(collection, points).wait(true))
105            .await
106            .map_err(Box::new)?;
107        Ok(())
108    }
109
110    /// Search for similar vectors, returning scored points with payloads.
111    ///
112    /// # Errors
113    ///
114    /// Returns an error if the search fails.
115    pub async fn search(
116        &self,
117        collection: &str,
118        vector: Vec<f32>,
119        limit: u64,
120        filter: Option<Filter>,
121    ) -> QdrantResult<Vec<ScoredPoint>> {
122        let mut builder = SearchPointsBuilder::new(collection, vector, limit).with_payload(true);
123        if let Some(f) = filter {
124            builder = builder.filter(f);
125        }
126        let results = self.client.search_points(builder).await.map_err(Box::new)?;
127        Ok(results.result)
128    }
129
130    /// Delete points by their IDs.
131    ///
132    /// # Errors
133    ///
134    /// Returns an error if the deletion fails.
135    pub async fn delete_by_ids(&self, collection: &str, ids: Vec<PointId>) -> QdrantResult<()> {
136        if ids.is_empty() {
137            return Ok(());
138        }
139        self.client
140            .delete_points(
141                DeletePointsBuilder::new(collection)
142                    .points(PointsIdsList { ids })
143                    .wait(true),
144            )
145            .await
146            .map_err(Box::new)?;
147        Ok(())
148    }
149
150    /// Scroll all points in a collection, extracting string payload fields.
151    ///
152    /// Returns a map of `key_field` value -> { `field_name` -> `field_value` }.
153    ///
154    /// # Errors
155    ///
156    /// Returns an error if the scroll operation fails.
157    pub async fn scroll_all(
158        &self,
159        collection: &str,
160        key_field: &str,
161    ) -> QdrantResult<HashMap<String, HashMap<String, String>>> {
162        let mut result = HashMap::new();
163        let mut offset: Option<PointId> = None;
164
165        loop {
166            let mut builder = ScrollPointsBuilder::new(collection)
167                .with_payload(true)
168                .with_vectors(false)
169                .limit(100);
170
171            if let Some(ref off) = offset {
172                builder = builder.offset(off.clone());
173            }
174
175            let response = self.client.scroll(builder).await.map_err(Box::new)?;
176
177            for point in &response.result {
178                let Some(key_val) = point.payload.get(key_field) else {
179                    continue;
180                };
181                let Some(Kind::StringValue(key)) = &key_val.kind else {
182                    continue;
183                };
184
185                let mut fields = HashMap::new();
186                for (k, val) in &point.payload {
187                    if let Some(Kind::StringValue(s)) = &val.kind {
188                        fields.insert(k.clone(), s.clone());
189                    }
190                }
191                result.insert(key.clone(), fields);
192            }
193
194            match response.next_page_offset {
195                Some(next) => offset = Some(next),
196                None => break,
197            }
198        }
199
200        Ok(result)
201    }
202
203    /// Create a collection with scalar INT8 quantization if it does not exist,
204    /// then create keyword indexes for the given fields.
205    ///
206    /// Idempotent: no-op if the collection already exists.
207    ///
208    /// # Errors
209    ///
210    /// Returns an error if any Qdrant operation fails.
211    pub async fn ensure_collection_with_quantization(
212        &self,
213        collection: &str,
214        vector_size: u64,
215        keyword_fields: &[&str],
216    ) -> Result<(), crate::VectorStoreError> {
217        use qdrant_client::qdrant::{
218            CreateFieldIndexCollectionBuilder, FieldType, ScalarQuantizationBuilder,
219        };
220        if self
221            .client
222            .collection_exists(collection)
223            .await
224            .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?
225        {
226            return Ok(());
227        }
228        self.client
229            .create_collection(
230                CreateCollectionBuilder::new(collection)
231                    .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine))
232                    .quantization_config(ScalarQuantizationBuilder::default()),
233            )
234            .await
235            .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
236
237        for field in keyword_fields {
238            self.client
239                .create_field_index(CreateFieldIndexCollectionBuilder::new(
240                    collection,
241                    *field,
242                    FieldType::Keyword,
243                ))
244                .await
245                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))?;
246        }
247        Ok(())
248    }
249
250    /// Convert a JSON value to a Qdrant payload map.
251    ///
252    /// # Errors
253    ///
254    /// Returns a JSON error if deserialization fails.
255    pub fn json_to_payload(
256        value: serde_json::Value,
257    ) -> Result<HashMap<String, qdrant_client::qdrant::Value>, serde_json::Error> {
258        serde_json::from_value(value)
259    }
260}
261
262impl crate::vector_store::VectorStore for QdrantOps {
263    fn ensure_collection(
264        &self,
265        collection: &str,
266        vector_size: u64,
267    ) -> std::pin::Pin<
268        Box<dyn std::future::Future<Output = Result<(), crate::VectorStoreError>> + Send + '_>,
269    > {
270        let collection = collection.to_owned();
271        Box::pin(async move {
272            self.ensure_collection(&collection, vector_size)
273                .await
274                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
275        })
276    }
277
278    fn collection_exists(
279        &self,
280        collection: &str,
281    ) -> std::pin::Pin<
282        Box<dyn std::future::Future<Output = Result<bool, crate::VectorStoreError>> + Send + '_>,
283    > {
284        let collection = collection.to_owned();
285        Box::pin(async move {
286            self.collection_exists(&collection)
287                .await
288                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
289        })
290    }
291
292    fn delete_collection(
293        &self,
294        collection: &str,
295    ) -> std::pin::Pin<
296        Box<dyn std::future::Future<Output = Result<(), crate::VectorStoreError>> + Send + '_>,
297    > {
298        let collection = collection.to_owned();
299        Box::pin(async move {
300            self.delete_collection(&collection)
301                .await
302                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
303        })
304    }
305
306    fn upsert(
307        &self,
308        collection: &str,
309        points: Vec<crate::VectorPoint>,
310    ) -> std::pin::Pin<
311        Box<dyn std::future::Future<Output = Result<(), crate::VectorStoreError>> + Send + '_>,
312    > {
313        let collection = collection.to_owned();
314        Box::pin(async move {
315            let qdrant_points: Vec<PointStruct> = points
316                .into_iter()
317                .map(|p| {
318                    let payload: HashMap<String, qdrant_client::qdrant::Value> =
319                        serde_json::from_value(serde_json::Value::Object(
320                            p.payload.into_iter().collect(),
321                        ))
322                        .unwrap_or_default();
323                    PointStruct::new(p.id, p.vector, payload)
324                })
325                .collect();
326            self.upsert(&collection, qdrant_points)
327                .await
328                .map_err(|e| crate::VectorStoreError::Upsert(e.to_string()))
329        })
330    }
331
332    fn search(
333        &self,
334        collection: &str,
335        vector: Vec<f32>,
336        limit: u64,
337        filter: Option<crate::VectorFilter>,
338    ) -> std::pin::Pin<
339        Box<
340            dyn std::future::Future<
341                    Output = Result<Vec<crate::ScoredVectorPoint>, crate::VectorStoreError>,
342                > + Send
343                + '_,
344        >,
345    > {
346        let collection = collection.to_owned();
347        Box::pin(async move {
348            let qdrant_filter = filter.map(vector_filter_to_qdrant);
349            let results = self
350                .search(&collection, vector, limit, qdrant_filter)
351                .await
352                .map_err(|e| crate::VectorStoreError::Search(e.to_string()))?;
353            Ok(results.into_iter().map(scored_point_to_vector).collect())
354        })
355    }
356
357    fn delete_by_ids(
358        &self,
359        collection: &str,
360        ids: Vec<String>,
361    ) -> std::pin::Pin<
362        Box<dyn std::future::Future<Output = Result<(), crate::VectorStoreError>> + Send + '_>,
363    > {
364        let collection = collection.to_owned();
365        Box::pin(async move {
366            let point_ids: Vec<PointId> = ids.into_iter().map(PointId::from).collect();
367            self.delete_by_ids(&collection, point_ids)
368                .await
369                .map_err(|e| crate::VectorStoreError::Delete(e.to_string()))
370        })
371    }
372
373    fn scroll_all(
374        &self,
375        collection: &str,
376        key_field: &str,
377    ) -> std::pin::Pin<
378        Box<
379            dyn std::future::Future<
380                    Output = Result<
381                        HashMap<String, HashMap<String, String>>,
382                        crate::VectorStoreError,
383                    >,
384                > + Send
385                + '_,
386        >,
387    > {
388        let collection = collection.to_owned();
389        let key_field = key_field.to_owned();
390        Box::pin(async move {
391            self.scroll_all(&collection, &key_field)
392                .await
393                .map_err(|e| crate::VectorStoreError::Scroll(e.to_string()))
394        })
395    }
396
397    fn health_check(
398        &self,
399    ) -> std::pin::Pin<
400        Box<dyn std::future::Future<Output = Result<bool, crate::VectorStoreError>> + Send + '_>,
401    > {
402        Box::pin(async move {
403            self.client
404                .health_check()
405                .await
406                .map(|_| true)
407                .map_err(|e| crate::VectorStoreError::Collection(e.to_string()))
408        })
409    }
410}
411
412fn vector_filter_to_qdrant(filter: crate::VectorFilter) -> Filter {
413    let must: Vec<_> = filter
414        .must
415        .into_iter()
416        .map(field_condition_to_qdrant)
417        .collect();
418    let must_not: Vec<_> = filter
419        .must_not
420        .into_iter()
421        .map(field_condition_to_qdrant)
422        .collect();
423
424    let mut f = Filter::default();
425    if !must.is_empty() {
426        f.must = must;
427    }
428    if !must_not.is_empty() {
429        f.must_not = must_not;
430    }
431    f
432}
433
434fn field_condition_to_qdrant(cond: crate::FieldCondition) -> qdrant_client::qdrant::Condition {
435    match cond.value {
436        crate::FieldValue::Integer(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
437        crate::FieldValue::Text(v) => qdrant_client::qdrant::Condition::matches(cond.field, v),
438    }
439}
440
441fn scored_point_to_vector(point: ScoredPoint) -> crate::ScoredVectorPoint {
442    let payload: HashMap<String, serde_json::Value> = point
443        .payload
444        .into_iter()
445        .filter_map(|(k, v)| {
446            let json_val = match v.kind? {
447                Kind::StringValue(s) => serde_json::Value::String(s),
448                Kind::IntegerValue(i) => serde_json::Value::Number(i.into()),
449                Kind::DoubleValue(d) => {
450                    serde_json::Number::from_f64(d).map(serde_json::Value::Number)?
451                }
452                Kind::BoolValue(b) => serde_json::Value::Bool(b),
453                _ => return None,
454            };
455            Some((k, json_val))
456        })
457        .collect();
458
459    let id = match point.id.and_then(|pid| pid.point_id_options) {
460        Some(qdrant_client::qdrant::point_id::PointIdOptions::Uuid(u)) => u,
461        Some(qdrant_client::qdrant::point_id::PointIdOptions::Num(n)) => n.to_string(),
462        None => String::new(),
463    };
464
465    crate::ScoredVectorPoint {
466        id,
467        score: point.score,
468        payload,
469    }
470}
471
#[cfg(test)]
mod tests {
    use super::*;

    /// Building the wrapper from a well-formed URL succeeds without a live server.
    #[test]
    fn new_valid_url() {
        let ops = QdrantOps::new("http://localhost:6334");
        assert!(ops.is_ok());
    }

    /// A malformed URL is rejected at construction time.
    #[test]
    fn new_invalid_url() {
        let ops = QdrantOps::new("not a valid url");
        assert!(ops.is_err());
    }

    /// The `Debug` output names the type.
    #[test]
    fn debug_format() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let dbg = format!("{ops:?}");
        assert!(dbg.contains("QdrantOps"));
    }

    /// A JSON object converts to a payload map that keeps every key.
    #[test]
    fn json_to_payload_valid() {
        let value = serde_json::json!({"key": "value", "num": 42});
        let payload = QdrantOps::json_to_payload(value).unwrap();
        assert_eq!(payload.len(), 2);
        assert!(payload.contains_key("key"));
        assert!(payload.contains_key("num"));
    }

    /// An empty JSON object converts to an empty payload map.
    #[test]
    fn json_to_payload_empty() {
        let result = QdrantOps::json_to_payload(serde_json::json!({}));
        assert!(result.is_ok());
        assert!(result.unwrap().is_empty());
    }

    #[test]
    fn delete_by_ids_empty_is_ok_sync() {
        // Constructing QdrantOps with a valid URL succeeds even without a live server.
        // The early-return logic of delete_by_ids itself is exercised by the async
        // test `delete_by_ids_empty_no_network_call` below.
        let ops = QdrantOps::new("http://localhost:6334");
        assert!(ops.is_ok());
    }

    /// Requires a live Qdrant instance at localhost:6334.
    #[tokio::test]
    #[ignore = "requires a live Qdrant instance at localhost:6334"]
    async fn ensure_collection_with_quantization_idempotent() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let collection = "test_quant_idempotent";

        // Clean up from any prior run
        let _ = ops.delete_collection(collection).await;

        // First call — creates collection
        ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
            .await
            .unwrap();

        assert!(ops.collection_exists(collection).await.unwrap());

        // Second call — idempotent, must not error
        ops.ensure_collection_with_quantization(collection, 128, &["language", "file_path"])
            .await
            .unwrap();

        // Cleanup
        ops.delete_collection(collection).await.unwrap();
    }

    /// Does not need a live Qdrant instance: `delete_by_ids` returns before issuing
    /// any delete request when the id list is empty, so this runs in every test pass.
    #[tokio::test]
    async fn delete_by_ids_empty_no_network_call() {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        // Empty ID list must short-circuit and return Ok without sending a delete request.
        let result = ops.delete_by_ids("nonexistent_collection", vec![]).await;
        assert!(result.is_ok());
    }
}