manifoldb_vector/store/
point_store.rs

1//! Point store implementation for Qdrant-style vector collections.
2//!
3//! This module provides storage for points with multiple named vectors (dense,
4//! sparse, or multi-vector) and JSON payloads.
5//!
6//! # Storage Tables
7//!
8//! - `point_collections`: Collection metadata (schema)
9//! - `point_payloads`: Point payloads (JSON data)
10//! - `point_dense_vectors`: Dense vectors
11//! - `point_sparse_vectors`: Sparse vectors
12//! - `point_multi_vectors`: Multi-vectors
13//!
14//! # Example
15//!
16//! ```ignore
//! use std::collections::HashMap;
//!
//! use manifoldb_vector::store::PointStore;
18//! use manifoldb_vector::types::{CollectionName, CollectionSchema, VectorConfig, Payload, NamedVector};
19//! use manifoldb_core::PointId;
20//!
21//! let store = PointStore::new(engine);
22//!
23//! // Create a collection
24//! let name = CollectionName::new("documents")?;
25//! let schema = CollectionSchema::new()
26//!     .with_vector("dense", VectorConfig::dense(384))
27//!     .with_vector("sparse", VectorConfig::sparse(30522));
28//! store.create_collection(&name, schema)?;
29//!
30//! // Insert a point
31//! let mut payload = Payload::new();
32//! payload.insert("title", "Hello World".into());
33//!
34//! let mut vectors = HashMap::new();
35//! vectors.insert("dense".to_string(), NamedVector::Dense(vec![0.1; 384]));
36//! vectors.insert("sparse".to_string(), NamedVector::Sparse(vec![(100, 0.5)]));
37//!
38//! store.upsert_point(&name, PointId::new(1), payload, vectors)?;
39//! ```
40
41use std::collections::HashMap;
42use std::ops::Bound;
43
44use manifoldb_core::PointId;
45use manifoldb_storage::{Cursor, StorageEngine, Transaction};
46
47use crate::encoding::{
48    decode_point_payload_point_id, encode_collection_key, encode_collection_prefix,
49    encode_dense_vector_collection_prefix, encode_dense_vector_key,
50    encode_dense_vector_point_prefix, encode_multi_vector_collection_prefix,
51    encode_multi_vector_key, encode_multi_vector_point_prefix, encode_point_payload_key,
52    encode_point_payload_prefix, encode_sparse_vector_collection_prefix, encode_sparse_vector_key,
53    encode_sparse_vector_point_prefix,
54};
55use crate::error::VectorError;
56use crate::types::{
57    Collection, CollectionName, CollectionSchema, NamedVector, Payload, VectorConfig, VectorType,
58};
59
/// Table name for collection metadata.
///
/// Entries are keyed by `encode_collection_key` and hold the bytes produced by
/// `Collection::to_bytes`.
const TABLE_COLLECTIONS: &str = "point_collections";

/// Table name for point payloads.
///
/// Entries are keyed by `encode_point_payload_key`. The presence of a payload
/// entry is what `PointStore::point_exists` treats as "the point exists".
const TABLE_PAYLOADS: &str = "point_payloads";

/// Table name for dense vectors.
///
/// Entries are keyed by `encode_dense_vector_key`; values use the layout
/// written by `encode_dense_vector`.
const TABLE_DENSE_VECTORS: &str = "point_dense_vectors";

/// Table name for sparse vectors.
///
/// Entries are keyed by `encode_sparse_vector_key`; values use the layout
/// written by `encode_sparse_vector`.
const TABLE_SPARSE_VECTORS: &str = "point_sparse_vectors";

/// Table name for multi-vectors.
///
/// Entries are keyed by `encode_multi_vector_key`; values use the layout
/// written by `encode_multi_vector`.
const TABLE_MULTI_VECTORS: &str = "point_multi_vectors";
74
/// A store for points with multiple named vectors.
///
/// `PointStore` provides CRUD operations for points organized into named
/// collections. Each point can have multiple named vectors (dense, sparse,
/// or multi-vector) and a JSON payload.
///
/// The store keeps no state of its own besides the engine: every operation
/// opens a read or write transaction on it and works against the
/// `point_*` tables.
pub struct PointStore<E: StorageEngine> {
    /// Storage engine on which every operation opens its transactions.
    engine: E,
}
83
84impl<E: StorageEngine> PointStore<E> {
85    /// Create a new point store with the given storage engine.
86    #[must_use]
87    pub const fn new(engine: E) -> Self {
88        Self { engine }
89    }
90
91    /// Get a reference to the storage engine.
92    #[must_use]
93    pub fn engine(&self) -> &E {
94        &self.engine
95    }
96
97    // ========================================================================
98    // Collection operations
99    // ========================================================================
100
101    /// Create a new collection.
102    ///
103    /// # Errors
104    ///
105    /// Returns an error if the collection already exists or if the storage
106    /// operation fails.
107    pub fn create_collection(
108        &self,
109        name: &CollectionName,
110        schema: CollectionSchema,
111    ) -> Result<(), VectorError> {
112        let mut tx = self.engine.begin_write()?;
113
114        let key = encode_collection_key(name.as_str());
115
116        // Check if collection already exists
117        if tx.get(TABLE_COLLECTIONS, &key)?.is_some() {
118            return Err(VectorError::InvalidName(format!("collection '{}' already exists", name)));
119        }
120
121        // Store the collection metadata
122        let collection = Collection::new(name.clone(), schema);
123        tx.put(TABLE_COLLECTIONS, &key, &collection.to_bytes()?)?;
124        tx.commit()?;
125
126        Ok(())
127    }
128
129    /// Get a collection by name.
130    ///
131    /// # Errors
132    ///
133    /// Returns an error if the collection doesn't exist or if the storage
134    /// operation fails.
135    pub fn get_collection(&self, name: &CollectionName) -> Result<Collection, VectorError> {
136        let tx = self.engine.begin_read()?;
137        let key = encode_collection_key(name.as_str());
138
139        let bytes = tx
140            .get(TABLE_COLLECTIONS, &key)?
141            .ok_or_else(|| VectorError::SpaceNotFound(format!("collection '{}'", name)))?;
142
143        Collection::from_bytes(&bytes)
144    }
145
146    /// Delete a collection and all its points.
147    ///
148    /// # Errors
149    ///
150    /// Returns an error if the collection doesn't exist or if the storage
151    /// operation fails.
152    pub fn delete_collection(&self, name: &CollectionName) -> Result<(), VectorError> {
153        let mut tx = self.engine.begin_write()?;
154
155        let collection_key = encode_collection_key(name.as_str());
156
157        // Check if collection exists
158        if tx.get(TABLE_COLLECTIONS, &collection_key)?.is_none() {
159            return Err(VectorError::SpaceNotFound(format!("collection '{}'", name)));
160        }
161
162        // Delete all payloads
163        delete_by_prefix(&mut tx, TABLE_PAYLOADS, &encode_point_payload_prefix(name.as_str()))?;
164
165        // Delete all dense vectors
166        delete_by_prefix(
167            &mut tx,
168            TABLE_DENSE_VECTORS,
169            &encode_dense_vector_collection_prefix(name.as_str()),
170        )?;
171
172        // Delete all sparse vectors
173        delete_by_prefix(
174            &mut tx,
175            TABLE_SPARSE_VECTORS,
176            &encode_sparse_vector_collection_prefix(name.as_str()),
177        )?;
178
179        // Delete all multi-vectors
180        delete_by_prefix(
181            &mut tx,
182            TABLE_MULTI_VECTORS,
183            &encode_multi_vector_collection_prefix(name.as_str()),
184        )?;
185
186        // Delete the collection metadata
187        tx.delete(TABLE_COLLECTIONS, &collection_key)?;
188
189        tx.commit()?;
190        Ok(())
191    }
192
193    /// List all collections.
194    ///
195    /// # Errors
196    ///
197    /// Returns an error if the storage operation fails.
198    pub fn list_collections(&self) -> Result<Vec<Collection>, VectorError> {
199        let tx = self.engine.begin_read()?;
200
201        let prefix = encode_collection_prefix();
202        let prefix_end = next_prefix(&prefix);
203
204        let mut cursor = tx.range(
205            TABLE_COLLECTIONS,
206            Bound::Included(prefix.as_slice()),
207            Bound::Excluded(prefix_end.as_slice()),
208        )?;
209
210        let mut collections = Vec::new();
211        while let Some((_, value)) = cursor.next()? {
212            collections.push(Collection::from_bytes(&value)?);
213        }
214
215        Ok(collections)
216    }
217
218    /// Check if a collection exists.
219    ///
220    /// # Errors
221    ///
222    /// Returns an error if the storage operation fails.
223    pub fn collection_exists(&self, name: &CollectionName) -> Result<bool, VectorError> {
224        let tx = self.engine.begin_read()?;
225        let key = encode_collection_key(name.as_str());
226        Ok(tx.get(TABLE_COLLECTIONS, &key)?.is_some())
227    }
228
229    // ========================================================================
230    // Point operations
231    // ========================================================================
232
233    /// Upsert a point (insert or update).
234    ///
235    /// This operation will:
236    /// - Insert the point if it doesn't exist
237    /// - Update the point if it exists, replacing payload and specified vectors
238    /// - Vectors not specified in this call are left unchanged
239    ///
240    /// # Errors
241    ///
242    /// Returns an error if:
243    /// - The collection doesn't exist
244    /// - A vector type doesn't match the schema
245    /// - A vector dimension doesn't match the schema
246    /// - The storage operation fails
247    pub fn upsert_point(
248        &self,
249        collection_name: &CollectionName,
250        point_id: PointId,
251        payload: Payload,
252        vectors: HashMap<String, NamedVector>,
253    ) -> Result<(), VectorError> {
254        // Get collection to validate schema
255        let collection = self.get_collection(collection_name)?;
256        let schema = collection.schema();
257
258        // Validate vectors against schema
259        for (vector_name, vector) in &vectors {
260            if let Some(config) = schema.get_vector(vector_name) {
261                validate_vector(vector, config)?;
262            }
263            // Allow vectors not in schema (flexible schema)
264        }
265
266        let mut tx = self.engine.begin_write()?;
267        let collection_str = collection_name.as_str();
268
269        // Store payload
270        let payload_key = encode_point_payload_key(collection_str, point_id);
271        tx.put(TABLE_PAYLOADS, &payload_key, &payload.to_bytes()?)?;
272
273        // Store each vector
274        for (vector_name, vector) in vectors {
275            match vector {
276                NamedVector::Dense(data) => {
277                    let key = encode_dense_vector_key(collection_str, point_id, &vector_name);
278                    tx.put(TABLE_DENSE_VECTORS, &key, &encode_dense_vector(&data))?;
279                }
280                NamedVector::Sparse(data) => {
281                    let key = encode_sparse_vector_key(collection_str, point_id, &vector_name);
282                    tx.put(TABLE_SPARSE_VECTORS, &key, &encode_sparse_vector(&data))?;
283                }
284                NamedVector::Multi(data) => {
285                    let key = encode_multi_vector_key(collection_str, point_id, &vector_name);
286                    tx.put(TABLE_MULTI_VECTORS, &key, &encode_multi_vector(&data))?;
287                }
288            }
289        }
290
291        tx.commit()?;
292        Ok(())
293    }
294
295    /// Insert a point. Fails if the point already exists.
296    ///
297    /// # Errors
298    ///
299    /// Returns an error if the point already exists, the collection doesn't exist,
300    /// or the storage operation fails.
301    pub fn insert_point(
302        &self,
303        collection_name: &CollectionName,
304        point_id: PointId,
305        payload: Payload,
306        vectors: HashMap<String, NamedVector>,
307    ) -> Result<(), VectorError> {
308        // Check if point exists
309        if self.point_exists(collection_name, point_id)? {
310            return Err(VectorError::Encoding(format!(
311                "point {} already exists in collection '{}'",
312                point_id, collection_name
313            )));
314        }
315
316        self.upsert_point(collection_name, point_id, payload, vectors)
317    }
318
319    /// Get a point's payload.
320    ///
321    /// # Errors
322    ///
323    /// Returns an error if the point doesn't exist or the storage operation fails.
324    pub fn get_payload(
325        &self,
326        collection_name: &CollectionName,
327        point_id: PointId,
328    ) -> Result<Payload, VectorError> {
329        let tx = self.engine.begin_read()?;
330        let key = encode_point_payload_key(collection_name.as_str(), point_id);
331
332        let bytes =
333            tx.get(TABLE_PAYLOADS, &key)?.ok_or_else(|| VectorError::EmbeddingNotFound {
334                entity_id: point_id.as_u64(),
335                space: format!("collection '{}'", collection_name),
336            })?;
337
338        Payload::from_bytes(&bytes)
339    }
340
341    /// Get a specific vector from a point.
342    ///
343    /// # Errors
344    ///
345    /// Returns an error if the vector doesn't exist or the storage operation fails.
346    pub fn get_vector(
347        &self,
348        collection_name: &CollectionName,
349        point_id: PointId,
350        vector_name: &str,
351    ) -> Result<NamedVector, VectorError> {
352        let tx = self.engine.begin_read()?;
353        let collection_str = collection_name.as_str();
354
355        // Try dense first
356        let dense_key = encode_dense_vector_key(collection_str, point_id, vector_name);
357        if let Some(bytes) = tx.get(TABLE_DENSE_VECTORS, &dense_key)? {
358            return Ok(NamedVector::Dense(decode_dense_vector(&bytes)?));
359        }
360
361        // Try sparse
362        let sparse_key = encode_sparse_vector_key(collection_str, point_id, vector_name);
363        if let Some(bytes) = tx.get(TABLE_SPARSE_VECTORS, &sparse_key)? {
364            return Ok(NamedVector::Sparse(decode_sparse_vector(&bytes)?));
365        }
366
367        // Try multi
368        let multi_key = encode_multi_vector_key(collection_str, point_id, vector_name);
369        if let Some(bytes) = tx.get(TABLE_MULTI_VECTORS, &multi_key)? {
370            return Ok(NamedVector::Multi(decode_multi_vector(&bytes)?));
371        }
372
373        Err(VectorError::EmbeddingNotFound {
374            entity_id: point_id.as_u64(),
375            space: format!("vector '{}' in collection '{}'", vector_name, collection_name),
376        })
377    }
378
379    /// Get all vectors for a point.
380    ///
381    /// # Errors
382    ///
383    /// Returns an error if the storage operation fails.
384    pub fn get_all_vectors(
385        &self,
386        collection_name: &CollectionName,
387        point_id: PointId,
388    ) -> Result<HashMap<String, NamedVector>, VectorError> {
389        let tx = self.engine.begin_read()?;
390        let collection_str = collection_name.as_str();
391        let mut vectors = HashMap::new();
392
393        // Get all dense vectors for this point
394        let dense_prefix = encode_dense_vector_point_prefix(collection_str, point_id);
395        let dense_prefix_end = next_prefix(&dense_prefix);
396        let mut cursor = tx.range(
397            TABLE_DENSE_VECTORS,
398            Bound::Included(dense_prefix.as_slice()),
399            Bound::Excluded(dense_prefix_end.as_slice()),
400        )?;
401        while let Some((key, value)) = cursor.next()? {
402            if let Some(name) = extract_vector_name_from_key(&key, collection_str, point_id) {
403                vectors.insert(name, NamedVector::Dense(decode_dense_vector(&value)?));
404            }
405        }
406        drop(cursor);
407
408        // Get all sparse vectors for this point
409        let sparse_prefix = encode_sparse_vector_point_prefix(collection_str, point_id);
410        let sparse_prefix_end = next_prefix(&sparse_prefix);
411        let mut cursor = tx.range(
412            TABLE_SPARSE_VECTORS,
413            Bound::Included(sparse_prefix.as_slice()),
414            Bound::Excluded(sparse_prefix_end.as_slice()),
415        )?;
416        while let Some((key, value)) = cursor.next()? {
417            if let Some(name) = extract_vector_name_from_key(&key, collection_str, point_id) {
418                vectors.insert(name, NamedVector::Sparse(decode_sparse_vector(&value)?));
419            }
420        }
421        drop(cursor);
422
423        // Get all multi-vectors for this point
424        let multi_prefix = encode_multi_vector_point_prefix(collection_str, point_id);
425        let multi_prefix_end = next_prefix(&multi_prefix);
426        let mut cursor = tx.range(
427            TABLE_MULTI_VECTORS,
428            Bound::Included(multi_prefix.as_slice()),
429            Bound::Excluded(multi_prefix_end.as_slice()),
430        )?;
431        while let Some((key, value)) = cursor.next()? {
432            if let Some(name) = extract_vector_name_from_key(&key, collection_str, point_id) {
433                vectors.insert(name, NamedVector::Multi(decode_multi_vector(&value)?));
434            }
435        }
436
437        Ok(vectors)
438    }
439
440    /// Update a point's payload without touching vectors.
441    ///
442    /// # Errors
443    ///
444    /// Returns an error if the point doesn't exist or the storage operation fails.
445    pub fn update_payload(
446        &self,
447        collection_name: &CollectionName,
448        point_id: PointId,
449        payload: Payload,
450    ) -> Result<(), VectorError> {
451        // Check point exists
452        if !self.point_exists(collection_name, point_id)? {
453            return Err(VectorError::EmbeddingNotFound {
454                entity_id: point_id.as_u64(),
455                space: format!("collection '{}'", collection_name),
456            });
457        }
458
459        let mut tx = self.engine.begin_write()?;
460        let key = encode_point_payload_key(collection_name.as_str(), point_id);
461        tx.put(TABLE_PAYLOADS, &key, &payload.to_bytes()?)?;
462        tx.commit()?;
463
464        Ok(())
465    }
466
467    /// Update a specific vector without touching the payload or other vectors.
468    ///
469    /// # Errors
470    ///
471    /// Returns an error if the collection doesn't exist or the storage operation fails.
472    pub fn update_vector(
473        &self,
474        collection_name: &CollectionName,
475        point_id: PointId,
476        vector_name: &str,
477        vector: NamedVector,
478    ) -> Result<(), VectorError> {
479        // Validate against schema if defined
480        let collection = self.get_collection(collection_name)?;
481        if let Some(config) = collection.schema().get_vector(vector_name) {
482            validate_vector(&vector, config)?;
483        }
484
485        let mut tx = self.engine.begin_write()?;
486        let collection_str = collection_name.as_str();
487
488        match vector {
489            NamedVector::Dense(data) => {
490                let key = encode_dense_vector_key(collection_str, point_id, vector_name);
491                tx.put(TABLE_DENSE_VECTORS, &key, &encode_dense_vector(&data))?;
492            }
493            NamedVector::Sparse(data) => {
494                let key = encode_sparse_vector_key(collection_str, point_id, vector_name);
495                tx.put(TABLE_SPARSE_VECTORS, &key, &encode_sparse_vector(&data))?;
496            }
497            NamedVector::Multi(data) => {
498                let key = encode_multi_vector_key(collection_str, point_id, vector_name);
499                tx.put(TABLE_MULTI_VECTORS, &key, &encode_multi_vector(&data))?;
500            }
501        }
502
503        tx.commit()?;
504        Ok(())
505    }
506
507    /// Delete a point and all its vectors.
508    ///
509    /// # Returns
510    ///
511    /// Returns `Ok(true)` if the point was deleted, `Ok(false)` if it didn't exist.
512    ///
513    /// # Errors
514    ///
515    /// Returns an error if the storage operation fails.
516    pub fn delete_point(
517        &self,
518        collection_name: &CollectionName,
519        point_id: PointId,
520    ) -> Result<bool, VectorError> {
521        let mut tx = self.engine.begin_write()?;
522        let collection_str = collection_name.as_str();
523
524        // Delete payload
525        let payload_key = encode_point_payload_key(collection_str, point_id);
526        let existed = tx.delete(TABLE_PAYLOADS, &payload_key)?;
527
528        // Delete all dense vectors for this point
529        delete_by_prefix(
530            &mut tx,
531            TABLE_DENSE_VECTORS,
532            &encode_dense_vector_point_prefix(collection_str, point_id),
533        )?;
534
535        // Delete all sparse vectors for this point
536        delete_by_prefix(
537            &mut tx,
538            TABLE_SPARSE_VECTORS,
539            &encode_sparse_vector_point_prefix(collection_str, point_id),
540        )?;
541
542        // Delete all multi-vectors for this point
543        delete_by_prefix(
544            &mut tx,
545            TABLE_MULTI_VECTORS,
546            &encode_multi_vector_point_prefix(collection_str, point_id),
547        )?;
548
549        tx.commit()?;
550        Ok(existed)
551    }
552
553    /// Delete a specific vector from a point.
554    ///
555    /// # Returns
556    ///
557    /// Returns `Ok(true)` if the vector was deleted, `Ok(false)` if it didn't exist.
558    ///
559    /// # Errors
560    ///
561    /// Returns an error if the storage operation fails.
562    pub fn delete_vector(
563        &self,
564        collection_name: &CollectionName,
565        point_id: PointId,
566        vector_name: &str,
567    ) -> Result<bool, VectorError> {
568        let mut tx = self.engine.begin_write()?;
569        let collection_str = collection_name.as_str();
570
571        // Try to delete from each vector table
572        let dense_key = encode_dense_vector_key(collection_str, point_id, vector_name);
573        if tx.delete(TABLE_DENSE_VECTORS, &dense_key)? {
574            tx.commit()?;
575            return Ok(true);
576        }
577
578        let sparse_key = encode_sparse_vector_key(collection_str, point_id, vector_name);
579        if tx.delete(TABLE_SPARSE_VECTORS, &sparse_key)? {
580            tx.commit()?;
581            return Ok(true);
582        }
583
584        let multi_key = encode_multi_vector_key(collection_str, point_id, vector_name);
585        if tx.delete(TABLE_MULTI_VECTORS, &multi_key)? {
586            tx.commit()?;
587            return Ok(true);
588        }
589
590        tx.commit()?;
591        Ok(false)
592    }
593
594    /// Check if a point exists.
595    ///
596    /// # Errors
597    ///
598    /// Returns an error if the storage operation fails.
599    pub fn point_exists(
600        &self,
601        collection_name: &CollectionName,
602        point_id: PointId,
603    ) -> Result<bool, VectorError> {
604        let tx = self.engine.begin_read()?;
605        let key = encode_point_payload_key(collection_name.as_str(), point_id);
606        Ok(tx.get(TABLE_PAYLOADS, &key)?.is_some())
607    }
608
609    /// List all point IDs in a collection.
610    ///
611    /// # Errors
612    ///
613    /// Returns an error if the storage operation fails.
614    pub fn list_points(
615        &self,
616        collection_name: &CollectionName,
617    ) -> Result<Vec<PointId>, VectorError> {
618        let tx = self.engine.begin_read()?;
619
620        let prefix = encode_point_payload_prefix(collection_name.as_str());
621        let prefix_end = next_prefix(&prefix);
622
623        let mut cursor = tx.range(
624            TABLE_PAYLOADS,
625            Bound::Included(prefix.as_slice()),
626            Bound::Excluded(prefix_end.as_slice()),
627        )?;
628
629        let mut points = Vec::new();
630        while let Some((key, _)) = cursor.next()? {
631            if let Some(point_id) = decode_point_payload_point_id(&key) {
632                points.push(point_id);
633            }
634        }
635
636        Ok(points)
637    }
638
639    /// Count the number of points in a collection.
640    ///
641    /// # Errors
642    ///
643    /// Returns an error if the storage operation fails.
644    pub fn count_points(&self, collection_name: &CollectionName) -> Result<usize, VectorError> {
645        let tx = self.engine.begin_read()?;
646
647        let prefix = encode_point_payload_prefix(collection_name.as_str());
648        let prefix_end = next_prefix(&prefix);
649
650        let mut cursor = tx.range(
651            TABLE_PAYLOADS,
652            Bound::Included(prefix.as_slice()),
653            Bound::Excluded(prefix_end.as_slice()),
654        )?;
655
656        let mut count = 0;
657        while cursor.next()?.is_some() {
658            count += 1;
659        }
660
661        Ok(count)
662    }
663
664    /// Get multiple points at once.
665    ///
666    /// # Errors
667    ///
668    /// Returns an error if the storage operation fails.
669    pub fn get_points(
670        &self,
671        collection_name: &CollectionName,
672        point_ids: &[PointId],
673    ) -> Result<Vec<(PointId, Option<Payload>)>, VectorError> {
674        let tx = self.engine.begin_read()?;
675
676        let mut results = Vec::with_capacity(point_ids.len());
677
678        for &point_id in point_ids {
679            let key = encode_point_payload_key(collection_name.as_str(), point_id);
680            let payload = tx
681                .get(TABLE_PAYLOADS, &key)?
682                .map(|bytes| Payload::from_bytes(&bytes))
683                .transpose()?;
684
685            results.push((point_id, payload));
686        }
687
688        Ok(results)
689    }
690}
691
692// ============================================================================
693// Helper functions
694// ============================================================================
695
/// Calculate the exclusive upper bound for scanning all keys that start with
/// `prefix`.
///
/// The bound is the prefix truncated after its last byte that is not `0xFF`,
/// with that byte incremented. Trailing `0xFF` bytes must be dropped: keeping
/// them (e.g. turning `[0x01, 0xFF]` into `[0x02, 0xFF]`) would make the range
/// cover keys such as `[0x02, 0x00]` that do not share the prefix.
///
/// If the prefix is empty or consists only of `0xFF` bytes, no finite bound
/// exists. In that case the prefix with an extra `0xFF` appended is returned
/// as a best-effort bound, which does not cover keys that continue with
/// `0xFF` after the prefix.
fn next_prefix(prefix: &[u8]) -> Vec<u8> {
    match prefix.iter().rposition(|&byte| byte < 0xFF) {
        Some(last) => {
            // Everything after `last` is 0xFF and is cut off by the slice.
            let mut bound = prefix[..=last].to_vec();
            bound[last] += 1;
            bound
        }
        None => {
            let mut bound = prefix.to_vec();
            bound.push(0xFF);
            bound
        }
    }
}
710
711/// Delete all keys matching a prefix.
712fn delete_by_prefix<T: Transaction>(
713    tx: &mut T,
714    table: &str,
715    prefix: &[u8],
716) -> Result<(), VectorError> {
717    let prefix_end = next_prefix(prefix);
718
719    let mut keys_to_delete = Vec::new();
720    {
721        let mut cursor =
722            tx.range(table, Bound::Included(prefix), Bound::Excluded(prefix_end.as_slice()))?;
723
724        while let Some((key, _)) = cursor.next()? {
725            keys_to_delete.push(key);
726        }
727    }
728
729    for key in keys_to_delete {
730        tx.delete(table, &key)?;
731    }
732
733    Ok(())
734}
735
736/// Validate a vector against its configuration.
737fn validate_vector(vector: &NamedVector, config: &VectorConfig) -> Result<(), VectorError> {
738    match (vector, config.vector_type) {
739        (NamedVector::Dense(data), VectorType::Dense) => {
740            if data.len() != config.dimension as usize {
741                return Err(VectorError::DimensionMismatch {
742                    expected: config.dimension as usize,
743                    actual: data.len(),
744                });
745            }
746        }
747        (NamedVector::Sparse(data), VectorType::Sparse) => {
748            // Check all indices are within bounds
749            for &(idx, _) in data {
750                if idx >= config.dimension {
751                    return Err(VectorError::Encoding(format!(
752                        "sparse vector index {} exceeds max dimension {}",
753                        idx, config.dimension
754                    )));
755                }
756            }
757        }
758        (NamedVector::Multi(data), VectorType::Multi) => {
759            // Check all inner vectors have the correct dimension
760            for (i, inner) in data.iter().enumerate() {
761                if inner.len() != config.dimension as usize {
762                    return Err(VectorError::Encoding(format!(
763                        "multi-vector inner vector {} has dimension {} but expected {}",
764                        i,
765                        inner.len(),
766                        config.dimension
767                    )));
768                }
769            }
770        }
771        (actual, expected) => {
772            return Err(VectorError::Encoding(format!(
773                "vector type mismatch: expected {:?}, got {:?}",
774                expected,
775                actual.vector_type()
776            )));
777        }
778    }
779
780    Ok(())
781}
782
783/// Extract the vector name from a vector key.
784///
785/// We need to reverse the hash to get the name. Since we can't do that,
786/// we store the name in the collection schema and look it up by hash.
787/// For now, we return None since we need the schema to do the reverse lookup.
788fn extract_vector_name_from_key(
789    _key: &[u8],
790    _collection: &str,
791    _point_id: PointId,
792) -> Option<String> {
793    // Vector keys are: [prefix][collection_hash][point_id][vector_name_hash]
794    // We can't reverse a hash, so we need a different approach.
795    // For now, this function isn't used in production - vectors are looked up by name.
796    None
797}
798
799// ============================================================================
800// Vector encoding/decoding
801// ============================================================================
802
/// Serialize a dense vector.
///
/// Layout: element count as big-endian `u32`, followed by each `f32` in
/// little-endian byte order. The mixed endianness is part of the stored
/// format and is what `decode_dense_vector` expects.
fn encode_dense_vector(data: &[f32]) -> Vec<u8> {
    let header = (data.len() as u32).to_be_bytes();

    let mut out = Vec::with_capacity(header.len() + data.len() * 4);
    out.extend_from_slice(&header);
    out.extend(data.iter().flat_map(|value| value.to_le_bytes()));
    out
}
812
813/// Decode a dense vector from bytes.
814fn decode_dense_vector(bytes: &[u8]) -> Result<Vec<f32>, VectorError> {
815    if bytes.len() < 4 {
816        return Err(VectorError::Encoding("truncated dense vector".to_string()));
817    }
818
819    let count = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize;
820    let expected_len = 4 + count * 4;
821
822    if bytes.len() != expected_len {
823        return Err(VectorError::Encoding(format!(
824            "dense vector length mismatch: expected {}, got {}",
825            expected_len,
826            bytes.len()
827        )));
828    }
829
830    let mut data = Vec::with_capacity(count);
831    for i in 0..count {
832        let offset = 4 + i * 4;
833        let value = f32::from_le_bytes([
834            bytes[offset],
835            bytes[offset + 1],
836            bytes[offset + 2],
837            bytes[offset + 3],
838        ]);
839        data.push(value);
840    }
841
842    Ok(data)
843}
844
/// Serialize a sparse vector.
///
/// Layout: entry count as big-endian `u32`, followed by one 8-byte record per
/// entry: the index as big-endian `u32`, then the value as little-endian
/// `f32`. Entries are written in the order given.
fn encode_sparse_vector(data: &[(u32, f32)]) -> Vec<u8> {
    let header = (data.len() as u32).to_be_bytes();

    let mut out = Vec::with_capacity(header.len() + data.len() * 8);
    out.extend_from_slice(&header);
    data.iter().for_each(|&(index, weight)| {
        out.extend_from_slice(&index.to_be_bytes());
        out.extend_from_slice(&weight.to_le_bytes());
    });
    out
}
855
856/// Decode a sparse vector from bytes.
857fn decode_sparse_vector(bytes: &[u8]) -> Result<Vec<(u32, f32)>, VectorError> {
858    if bytes.len() < 4 {
859        return Err(VectorError::Encoding("truncated sparse vector".to_string()));
860    }
861
862    let count = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize;
863    let expected_len = 4 + count * 8;
864
865    if bytes.len() != expected_len {
866        return Err(VectorError::Encoding(format!(
867            "sparse vector length mismatch: expected {}, got {}",
868            expected_len,
869            bytes.len()
870        )));
871    }
872
873    let mut data = Vec::with_capacity(count);
874    for i in 0..count {
875        let offset = 4 + i * 8;
876        let idx = u32::from_be_bytes([
877            bytes[offset],
878            bytes[offset + 1],
879            bytes[offset + 2],
880            bytes[offset + 3],
881        ]);
882        let value = f32::from_le_bytes([
883            bytes[offset + 4],
884            bytes[offset + 5],
885            bytes[offset + 6],
886            bytes[offset + 7],
887        ]);
888        data.push((idx, value));
889    }
890
891    Ok(data)
892}
893
/// Serialize a multi-vector (a list of equally sized rows) into bytes.
///
/// Layout: big-endian `u32` row count, big-endian `u32` dimension, then every
/// component of every row as a little-endian `f32`, rows laid out back to back.
/// An empty input is written as eight zero bytes.
///
/// The dimension is taken from the first row only; the rows are not checked
/// for equal length here.
fn encode_multi_vector(data: &[Vec<f32>]) -> Vec<u8> {
    let row_len = match data.first() {
        Some(first_row) => first_row.len(),
        // No rows at all: count = 0 and dimension = 0.
        None => return vec![0; 8],
    };
    let rows = data.len();

    let mut out = Vec::with_capacity(8 + rows * row_len * 4);
    out.extend_from_slice(&(rows as u32).to_be_bytes());
    out.extend_from_slice(&(row_len as u32).to_be_bytes());

    // Flatten all rows into one contiguous run of little-endian floats.
    out.extend(data.iter().flatten().flat_map(|component| component.to_le_bytes()));

    out
}
916
917/// Decode a multi-vector from bytes.
918fn decode_multi_vector(bytes: &[u8]) -> Result<Vec<Vec<f32>>, VectorError> {
919    if bytes.len() < 8 {
920        return Err(VectorError::Encoding("truncated multi-vector".to_string()));
921    }
922
923    let count = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize;
924    let dimension = u32::from_be_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]) as usize;
925
926    if count == 0 {
927        return Ok(Vec::new());
928    }
929
930    let expected_len = 8 + count * dimension * 4;
931    if bytes.len() != expected_len {
932        return Err(VectorError::Encoding(format!(
933            "multi-vector length mismatch: expected {}, got {}",
934            expected_len,
935            bytes.len()
936        )));
937    }
938
939    let mut data = Vec::with_capacity(count);
940    for i in 0..count {
941        let mut inner = Vec::with_capacity(dimension);
942        for j in 0..dimension {
943            let offset = 8 + (i * dimension + j) * 4;
944            let value = f32::from_le_bytes([
945                bytes[offset],
946                bytes[offset + 1],
947                bytes[offset + 2],
948                bytes[offset + 3],
949            ]);
950            inner.push(value);
951        }
952        data.push(inner);
953    }
954
955    Ok(data)
956}
957
#[cfg(test)]
mod tests {
    use super::*;
    use manifoldb_storage::backends::RedbEngine;
    use serde_json::json;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Process-wide counter used to hand every test its own collection name.
    static TEST_COUNTER: AtomicUsize = AtomicUsize::new(0);

    /// Build a `PointStore` on top of a fresh in-memory `RedbEngine`.
    fn create_test_store() -> PointStore<RedbEngine> {
        PointStore::new(RedbEngine::in_memory().unwrap())
    }

    /// Produce a collection name that no earlier call in this process returned.
    fn unique_collection_name() -> CollectionName {
        let suffix = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
        CollectionName::new(format!("test_collection_{}", suffix)).unwrap()
    }

    /// A created collection can be read back with its name and schema size.
    #[test]
    fn create_and_get_collection() {
        let store = create_test_store();
        let name = unique_collection_name();

        store
            .create_collection(
                &name,
                CollectionSchema::new()
                    .with_vector("dense", VectorConfig::dense(384))
                    .with_vector("sparse", VectorConfig::sparse(30522)),
            )
            .unwrap();

        let fetched = store.get_collection(&name).unwrap();
        assert_eq!(fetched.name().as_str(), name.as_str());
        assert_eq!(fetched.schema().len(), 2);
    }

    /// Creating the same collection twice is rejected the second time.
    #[test]
    fn create_duplicate_collection_fails() {
        let store = create_test_store();
        let name = unique_collection_name();

        store.create_collection(&name, CollectionSchema::new()).unwrap();

        let second_attempt = store.create_collection(&name, CollectionSchema::new());
        assert!(second_attempt.is_err());
    }

    /// Listing returns at least the collections created by this test.
    #[test]
    fn list_collections() {
        let store = create_test_store();

        for name in [unique_collection_name(), unique_collection_name()] {
            store.create_collection(&name, CollectionSchema::new()).unwrap();
        }

        let listed = store.list_collections().unwrap();
        assert!(listed.len() >= 2);
    }

    /// Deleting a collection that holds a point makes the collection disappear.
    #[test]
    fn delete_collection() {
        let store = create_test_store();
        let name = unique_collection_name();
        store.create_collection(&name, CollectionSchema::new()).unwrap();

        // Populate the collection with a single point first.
        let vectors = HashMap::from([("v".to_string(), NamedVector::Dense(vec![0.1, 0.2]))]);
        store.upsert_point(&name, PointId::new(1), Payload::new(), vectors).unwrap();

        store.delete_collection(&name).unwrap();

        let still_exists = store.collection_exists(&name).unwrap();
        assert!(!still_exists);
    }

    /// An upserted point exposes both its payload and its named vector.
    #[test]
    fn upsert_and_get_point() {
        let store = create_test_store();
        let name = unique_collection_name();
        store
            .create_collection(
                &name,
                CollectionSchema::new().with_vector("dense", VectorConfig::dense(3)),
            )
            .unwrap();

        let mut payload = Payload::new();
        payload.insert("title", json!("Test Document"));
        payload.insert("count", json!(42));

        let vectors =
            HashMap::from([("dense".to_string(), NamedVector::Dense(vec![0.1, 0.2, 0.3]))]);

        store.upsert_point(&name, PointId::new(1), payload, vectors).unwrap();

        let stored_payload = store.get_payload(&name, PointId::new(1)).unwrap();
        assert_eq!(stored_payload.get("title"), Some(&json!("Test Document")));

        let stored_vector = store.get_vector(&name, PointId::new(1), "dense").unwrap();
        assert_eq!(stored_vector.as_dense(), Some(&[0.1, 0.2, 0.3][..]));
    }

    /// A second upsert with the same id replaces payload and vector.
    #[test]
    fn upsert_updates_existing_point() {
        let store = create_test_store();
        let name = unique_collection_name();
        store.create_collection(&name, CollectionSchema::new()).unwrap();

        // Initial version of the point.
        let mut original_payload = Payload::new();
        original_payload.insert("version", json!(1));
        let original_vectors = HashMap::from([("v".to_string(), NamedVector::Dense(vec![1.0]))]);
        store.upsert_point(&name, PointId::new(1), original_payload, original_vectors).unwrap();

        // Replacement written under the same point id.
        let mut replacement_payload = Payload::new();
        replacement_payload.insert("version", json!(2));
        let replacement_vectors =
            HashMap::from([("v".to_string(), NamedVector::Dense(vec![2.0]))]);
        store
            .upsert_point(&name, PointId::new(1), replacement_payload, replacement_vectors)
            .unwrap();

        let stored_payload = store.get_payload(&name, PointId::new(1)).unwrap();
        assert_eq!(stored_payload.get("version"), Some(&json!(2)));

        let stored_vector = store.get_vector(&name, PointId::new(1), "v").unwrap();
        assert_eq!(stored_vector.as_dense(), Some(&[2.0][..]));
    }

    /// `insert_point` refuses to overwrite an existing point.
    #[test]
    fn insert_duplicate_fails() {
        let store = create_test_store();
        let name = unique_collection_name();
        store.create_collection(&name, CollectionSchema::new()).unwrap();

        let insert_empty_point =
            || store.insert_point(&name, PointId::new(1), Payload::new(), HashMap::new());

        insert_empty_point().unwrap();
        assert!(insert_empty_point().is_err());
    }

    /// A point can carry dense, sparse and multi vectors side by side.
    #[test]
    fn multi_vector_point() {
        let store = create_test_store();
        let name = unique_collection_name();
        store
            .create_collection(
                &name,
                CollectionSchema::new()
                    .with_vector("dense", VectorConfig::dense(3))
                    .with_vector("sparse", VectorConfig::sparse(1000))
                    .with_vector("multi", VectorConfig::multi(2)),
            )
            .unwrap();

        let vectors = HashMap::from([
            ("dense".to_string(), NamedVector::Dense(vec![0.1, 0.2, 0.3])),
            ("sparse".to_string(), NamedVector::Sparse(vec![(10, 0.5), (50, 0.3)])),
            ("multi".to_string(), NamedVector::Multi(vec![vec![0.1, 0.2], vec![0.3, 0.4]])),
        ]);

        store.upsert_point(&name, PointId::new(1), Payload::new(), vectors).unwrap();

        // Each vector comes back as the variant it was stored as.
        let dense = store.get_vector(&name, PointId::new(1), "dense").unwrap();
        assert!(dense.as_dense().is_some());

        let sparse = store.get_vector(&name, PointId::new(1), "sparse").unwrap();
        assert!(sparse.as_sparse().is_some());

        let multi = store.get_vector(&name, PointId::new(1), "multi").unwrap();
        assert!(multi.as_multi().is_some());
    }

    /// `update_vector` replaces a single named vector of an existing point.
    #[test]
    fn update_individual_vector() {
        let store = create_test_store();
        let name = unique_collection_name();
        store.create_collection(&name, CollectionSchema::new()).unwrap();

        let initial = HashMap::from([("v1".to_string(), NamedVector::Dense(vec![1.0, 2.0]))]);
        store.upsert_point(&name, PointId::new(1), Payload::new(), initial).unwrap();

        store
            .update_vector(&name, PointId::new(1), "v1", NamedVector::Dense(vec![3.0, 4.0]))
            .unwrap();

        let updated = store.get_vector(&name, PointId::new(1), "v1").unwrap();
        assert_eq!(updated.as_dense(), Some(&[3.0, 4.0][..]));
    }

    /// Deleting a point reports `true` once and `false` on the repeat.
    #[test]
    fn delete_point() {
        let store = create_test_store();
        let name = unique_collection_name();
        store.create_collection(&name, CollectionSchema::new()).unwrap();

        let vectors = HashMap::from([("v".to_string(), NamedVector::Dense(vec![0.1]))]);
        store.upsert_point(&name, PointId::new(1), Payload::new(), vectors).unwrap();

        let exists_before = store.point_exists(&name, PointId::new(1)).unwrap();
        assert!(exists_before);

        let first_delete = store.delete_point(&name, PointId::new(1)).unwrap();
        assert!(first_delete);

        let exists_after = store.point_exists(&name, PointId::new(1)).unwrap();
        assert!(!exists_after);

        // Nothing left to remove the second time around.
        let second_delete = store.delete_point(&name, PointId::new(1)).unwrap();
        assert!(!second_delete);
    }

    /// Deleting one named vector leaves the point's other vectors intact.
    #[test]
    fn delete_vector() {
        let store = create_test_store();
        let name = unique_collection_name();
        store.create_collection(&name, CollectionSchema::new()).unwrap();

        let vectors = HashMap::from([
            ("v1".to_string(), NamedVector::Dense(vec![1.0])),
            ("v2".to_string(), NamedVector::Dense(vec![2.0])),
        ]);
        store.upsert_point(&name, PointId::new(1), Payload::new(), vectors).unwrap();

        let removed = store.delete_vector(&name, PointId::new(1), "v1").unwrap();
        assert!(removed);

        // The deleted vector can no longer be fetched...
        assert!(store.get_vector(&name, PointId::new(1), "v1").is_err());

        // ...while its sibling is untouched.
        assert!(store.get_vector(&name, PointId::new(1), "v2").is_ok());
    }

    /// Listing and counting agree on the number of inserted points.
    #[test]
    fn list_and_count_points() {
        let store = create_test_store();
        let name = unique_collection_name();
        store.create_collection(&name, CollectionSchema::new()).unwrap();

        (1..=5).for_each(|id| {
            store.insert_point(&name, PointId::new(id), Payload::new(), HashMap::new()).unwrap();
        });

        let listed = store.list_points(&name).unwrap();
        assert_eq!(listed.len(), 5);

        let total = store.count_points(&name).unwrap();
        assert_eq!(total, 5);
    }

    /// A batch lookup yields one slot per requested id, empty when missing.
    #[test]
    fn get_multiple_points() {
        let store = create_test_store();
        let name = unique_collection_name();
        store.create_collection(&name, CollectionSchema::new()).unwrap();

        // Only ids 1 and 3 are stored; id 2 is deliberately absent.
        for id in [1, 3] {
            store.insert_point(&name, PointId::new(id), Payload::new(), HashMap::new()).unwrap();
        }

        let requested = [PointId::new(1), PointId::new(2), PointId::new(3)];
        let results = store.get_points(&name, &requested).unwrap();

        assert_eq!(results.len(), 3);
        let found: Vec<bool> = results.iter().map(|entry| entry.1.is_some()).collect();
        assert_eq!(found, [true, false, true]);
    }

    /// A vector whose length disagrees with the schema is rejected.
    #[test]
    fn dimension_mismatch_fails() {
        let store = create_test_store();
        let name = unique_collection_name();
        store
            .create_collection(
                &name,
                CollectionSchema::new().with_vector("dense", VectorConfig::dense(3)),
            )
            .unwrap();

        // Two components against a schema that declares three.
        let vectors = HashMap::from([("dense".to_string(), NamedVector::Dense(vec![0.1, 0.2]))]);

        let outcome = store.upsert_point(&name, PointId::new(1), Payload::new(), vectors);
        assert!(outcome.is_err());
    }

    /// Each vector kind survives an encode/decode round trip unchanged.
    #[test]
    fn vector_encoding_roundtrip() {
        // Dense
        let dense = vec![0.1, 0.2, 0.3, 0.4];
        let dense_back = decode_dense_vector(&encode_dense_vector(&dense)).unwrap();
        assert_eq!(dense, dense_back);

        // Sparse
        let sparse = vec![(10, 0.5), (50, 0.3), (100, 0.2)];
        let sparse_back = decode_sparse_vector(&encode_sparse_vector(&sparse)).unwrap();
        assert_eq!(sparse, sparse_back);

        // Multi
        let multi = vec![vec![0.1, 0.2], vec![0.3, 0.4], vec![0.5, 0.6]];
        let multi_back = decode_multi_vector(&encode_multi_vector(&multi)).unwrap();
        assert_eq!(multi, multi_back);
    }
}