Skip to main content

claw_vector/
collections.rs

1// collections.rs — collection management and persistence orchestration.
2use std::{collections::HashMap, path::PathBuf, sync::Arc};
3
4use chrono::Utc;
5use tokio::sync::RwLock;
6use tracing::{instrument, warn};
7use uuid::Uuid;
8
9use crate::{
10    config::VectorConfig,
11    error::{VectorError, VectorResult},
12    index::selector::IndexSelector,
13    store::{mmap::MmapVectorFile, sqlite::VectorStore},
14    types::{Collection, DistanceMetric, IndexType, VectorRecord},
15};
16
/// Coordinates collection metadata, vector files, and in-memory indexes.
///
/// Both in-memory maps are keyed by the composite string built by
/// `collection_key`, i.e. `"{workspace_id}::{name}"`, rather than by the bare
/// collection name.
pub struct CollectionManager {
    /// Runtime configuration shared by all collections.
    pub config: VectorConfig,
    /// Persistent metadata store.
    pub store: Arc<VectorStore>,
    /// In-memory index implementations keyed by `"{workspace_id}::{name}"`.
    pub indexes: Arc<RwLock<HashMap<String, IndexSelector>>>,
    /// Memory-mapped vector files keyed by `"{workspace_id}::{name}"`.
    pub mmap_files: Arc<RwLock<HashMap<String, MmapVectorFile>>>,
}
28
29impl CollectionManager {
30    /// Build a manager and restore all persisted collections.
31    #[instrument(skip(store))]
32    pub async fn new(config: VectorConfig, store: Arc<VectorStore>) -> VectorResult<Self> {
33        std::fs::create_dir_all(&config.index_dir)?;
34
35        let manager = CollectionManager {
36            config,
37            store,
38            indexes: Arc::new(RwLock::new(HashMap::new())),
39            mmap_files: Arc::new(RwLock::new(HashMap::new())),
40        };
41
42        let collections = manager.store.list_all_collections().await?;
43        for collection in collections {
44            manager.restore_collection(&collection).await?;
45        }
46
47        Ok(manager)
48    }
49
50    /// Create a new collection and initialize its index and vector file.
51    #[instrument(skip(self))]
52    pub async fn create_collection(
53        &self,
54        workspace_id: &str,
55        name: &str,
56        dimensions: usize,
57        distance: DistanceMetric,
58    ) -> VectorResult<Collection> {
59        if name.trim().is_empty() {
60            return Err(VectorError::Collection {
61                name: name.to_string(),
62                reason: "name must not be empty".into(),
63            });
64        }
65
66        let collection = Collection {
67            workspace_id: workspace_id.to_string(),
68            name: name.to_string(),
69            dimensions,
70            distance,
71            index_type: IndexType::Flat,
72            created_at: Utc::now(),
73            vector_count: 0,
74            metadata: serde_json::json!({}),
75            ef_construction: self.config.ef_construction,
76            m_connections: self.config.m_connections,
77        };
78
79        let dir = self.collection_dir(workspace_id, name);
80        std::fs::create_dir_all(&dir)?;
81        let mmap = MmapVectorFile::create(
82            &self.vector_file_path(workspace_id, name),
83            dimensions,
84            self.config.max_elements.max(1),
85        )?;
86        let index = IndexSelector::new(dimensions, distance, &self.config);
87
88        self.store
89            .create_collection(workspace_id, &collection)
90            .await?;
91
92        let key = self.collection_key(workspace_id, name);
93        self.indexes.write().await.insert(key.clone(), index);
94        self.mmap_files.write().await.insert(key, mmap);
95
96        Ok(collection)
97    }
98
99    /// Fetch a collection definition by name.
100    #[instrument(skip(self))]
101    pub async fn get_collection(&self, workspace_id: &str, name: &str) -> VectorResult<Collection> {
102        self.store.get_collection(workspace_id, name).await
103    }
104
105    /// Delete a collection and all of its persisted state.
106    #[instrument(skip(self))]
107    pub async fn delete_collection(&self, workspace_id: &str, name: &str) -> VectorResult<()> {
108        let key = self.collection_key(workspace_id, name);
109        let removed_index = self.indexes.write().await.remove(&key);
110        let removed_mmap = self.mmap_files.write().await.remove(&key);
111
112        if removed_index.is_none() || removed_mmap.is_none() {
113            let exists = self.store.get_collection(workspace_id, name).await.is_ok();
114            if !exists {
115                return Err(VectorError::NotFound {
116                    entity: "collection".into(),
117                    id: format!("{workspace_id}/{name}"),
118                });
119            }
120        }
121
122        if let Err(err) = self.store.delete_collection(workspace_id, name).await {
123            if let Some(index) = removed_index {
124                self.indexes.write().await.insert(key.clone(), index);
125            }
126            if let Some(mmap) = removed_mmap {
127                self.mmap_files.write().await.insert(key.clone(), mmap);
128            }
129            return Err(err);
130        }
131
132        let collection_dir = self.collection_dir(workspace_id, name);
133        if collection_dir.exists() {
134            std::fs::remove_dir_all(collection_dir)?;
135        }
136
137        Ok(())
138    }
139
140    /// List all persisted collections.
141    #[instrument(skip(self))]
142    pub async fn list_collections(&self, workspace_id: &str) -> VectorResult<Vec<Collection>> {
143        self.store.list_collections(workspace_id).await
144    }
145
146    /// Insert a single vector record into its collection.
147    #[instrument(skip(self, record))]
148    pub async fn insert_vector(
149        &self,
150        workspace_id: &str,
151        record: VectorRecord,
152    ) -> VectorResult<Uuid> {
153        let collection = self
154            .store
155            .get_collection(workspace_id, &record.collection)
156            .await?;
157        if record.vector.len() != collection.dimensions {
158            return Err(VectorError::DimensionMismatch {
159                expected: collection.dimensions,
160                got: record.vector.len(),
161            });
162        }
163
164        let internal_id = self
165            .store
166            .next_internal_id(workspace_id, &record.collection)
167            .await?;
168        let record_id = record.id;
169        self.apply_in_memory_insert(workspace_id, &record, internal_id)
170            .await?;
171
172        if let Err(err) = self
173            .store
174            .insert_record(workspace_id, &record, internal_id)
175            .await
176        {
177            self.rollback_in_memory_insert(workspace_id, &record.collection, internal_id)
178                .await;
179            return Err(err);
180        }
181
182        if let Err(err) = self
183            .store
184            .increment_vector_count(workspace_id, &record.collection, 1)
185            .await
186        {
187            let _ = self.store.delete_record(workspace_id, record.id).await;
188            self.rollback_in_memory_insert(workspace_id, &record.collection, internal_id)
189                .await;
190            return Err(err);
191        }
192
193        self.sync_collection_index_type(workspace_id, &record.collection)
194            .await?;
195
196        Ok(record_id)
197    }
198
199    /// Insert multiple vector records atomically.
200    #[instrument(skip(self, records))]
201    pub async fn insert_batch(
202        &self,
203        workspace_id: &str,
204        records: Vec<VectorRecord>,
205    ) -> VectorResult<Vec<Uuid>> {
206        if records.is_empty() {
207            return Ok(Vec::new());
208        }
209
210        let mut next_ids = HashMap::<String, usize>::new();
211        let mut deltas = HashMap::<String, i64>::new();
212        let mut staged = Vec::with_capacity(records.len());
213        let mut ids = Vec::with_capacity(records.len());
214
215        for record in records {
216            let collection = self
217                .store
218                .get_collection(workspace_id, &record.collection)
219                .await?;
220            if record.vector.len() != collection.dimensions {
221                return Err(VectorError::DimensionMismatch {
222                    expected: collection.dimensions,
223                    got: record.vector.len(),
224                });
225            }
226
227            let next_id = if let Some(next_id) = next_ids.get_mut(&record.collection) {
228                let current = *next_id;
229                *next_id += 1;
230                current
231            } else {
232                let current = self
233                    .store
234                    .next_internal_id(workspace_id, &record.collection)
235                    .await?;
236                next_ids.insert(record.collection.clone(), current + 1);
237                current
238            };
239
240            *deltas.entry(record.collection.clone()).or_insert(0) += 1;
241            ids.push(record.id);
242            staged.push((record, next_id));
243        }
244
245        for (record, internal_id) in &staged {
246            if let Err(err) = self
247                .apply_in_memory_insert(workspace_id, record, *internal_id)
248                .await
249            {
250                self.rollback_batch_in_memory(workspace_id, &staged).await;
251                return Err(err);
252            }
253        }
254
255        if let Err(err) = self.store.batch_insert_records(workspace_id, &staged).await {
256            self.rollback_batch_in_memory(workspace_id, &staged).await;
257            return Err(err);
258        }
259
260        for (collection, delta) in deltas {
261            if let Err(err) = self
262                .store
263                .increment_vector_count(workspace_id, &collection, delta)
264                .await
265            {
266                for (record, _) in &staged {
267                    let _ = self.store.delete_record(workspace_id, record.id).await;
268                }
269                self.rollback_batch_in_memory(workspace_id, &staged).await;
270                return Err(err);
271            }
272            self.sync_collection_index_type(workspace_id, &collection)
273                .await?;
274        }
275
276        Ok(ids)
277    }
278
279    /// Delete a vector from a collection by UUID.
280    #[instrument(skip(self))]
281    pub async fn delete_vector(
282        &self,
283        workspace_id: &str,
284        collection: &str,
285        id: Uuid,
286    ) -> VectorResult<bool> {
287        let (record, internal_id) = match self.store.get_record(workspace_id, id).await {
288            Ok(value) => value,
289            Err(VectorError::NotFound { .. }) => return Ok(false),
290            Err(err) => return Err(err),
291        };
292
293        if record.collection != collection {
294            return Ok(false);
295        }
296
297        {
298            let mut indexes = self.indexes.write().await;
299            let key = self.collection_key(workspace_id, collection);
300            let index = indexes.get_mut(&key).ok_or_else(|| VectorError::NotFound {
301                entity: "collection".into(),
302                id: format!("{workspace_id}/{collection}"),
303            })?;
304            index.delete(internal_id)?;
305        }
306
307        {
308            let mut mmap_files = self.mmap_files.write().await;
309            let key = self.collection_key(workspace_id, collection);
310            let mmap = mmap_files
311                .get_mut(&key)
312                .ok_or_else(|| VectorError::NotFound {
313                    entity: "collection".into(),
314                    id: format!("{workspace_id}/{collection}"),
315                })?;
316            mmap.delete_vector(internal_id)?;
317            mmap.flush()?;
318        }
319
320        self.store.delete_record(workspace_id, id).await?;
321        self.store
322            .increment_vector_count(workspace_id, collection, -1)
323            .await?;
324        Ok(true)
325    }
326
327    /// Load a full vector record, including its raw vector from the mmap file.
328    #[instrument(skip(self))]
329    pub async fn get_vector(
330        &self,
331        workspace_id: &str,
332        collection: &str,
333        id: Uuid,
334    ) -> VectorResult<VectorRecord> {
335        let (mut record, internal_id) = self.store.get_record(workspace_id, id).await?;
336        if record.collection != collection {
337            return Err(VectorError::NotFound {
338                entity: "record".into(),
339                id: id.to_string(),
340            });
341        }
342
343        let mmap_files = self.mmap_files.read().await;
344        let key = self.collection_key(workspace_id, collection);
345        let mmap = mmap_files.get(&key).ok_or_else(|| VectorError::NotFound {
346            entity: "collection".into(),
347            id: format!("{workspace_id}/{collection}"),
348        })?;
349        record.vector = mmap.read_vector(internal_id)?;
350        Ok(record)
351    }
352
353    /// Persist all loaded indexes to disk.
354    #[instrument(skip(self))]
355    pub async fn persist_indexes(&self) -> VectorResult<()> {
356        let indexes = self.indexes.read().await;
357        for (key, index) in indexes.iter() {
358            if let Some((workspace_id, name)) = key.split_once("::") {
359                index.save(&self.config.index_dir, workspace_id, name)?;
360            }
361        }
362        Ok(())
363    }
364
365    /// Read a raw vector by collection and internal id.
366    pub async fn read_vector_by_internal_id(
367        &self,
368        workspace_id: &str,
369        collection: &str,
370        internal_id: usize,
371    ) -> VectorResult<Vec<f32>> {
372        let mmap_files = self.mmap_files.read().await;
373        let key = self.collection_key(workspace_id, collection);
374        let mmap = mmap_files.get(&key).ok_or_else(|| VectorError::NotFound {
375            entity: "collection".into(),
376            id: format!("{workspace_id}/{collection}"),
377        })?;
378        mmap.read_vector(internal_id)
379    }
380
381    /// Return the number of loaded indexes.
382    pub async fn loaded_index_count(&self) -> usize {
383        self.indexes.read().await.len()
384    }
385
386    /// Return the number of loaded mmap vector files.
387    pub async fn loaded_mmap_count(&self) -> usize {
388        self.mmap_files.read().await.len()
389    }
390
391    async fn restore_collection(&self, collection: &Collection) -> VectorResult<()> {
392        let dir = self.collection_dir(&collection.workspace_id, &collection.name);
393        std::fs::create_dir_all(&dir)?;
394
395        let mmap_path = self.vector_file_path(&collection.workspace_id, &collection.name);
396        let mmap = if mmap_path.exists() {
397            MmapVectorFile::open(&mmap_path)?
398        } else {
399            MmapVectorFile::create(
400                &mmap_path,
401                collection.dimensions,
402                self.config
403                    .max_elements
404                    .max(collection.vector_count as usize + 1),
405            )?
406        };
407
408        let index = match IndexSelector::load(
409            &self.config.index_dir,
410            &collection.workspace_id,
411            &collection.name,
412            &self.config,
413            collection.distance,
414            collection.dimensions,
415        ) {
416            Ok(index) => index,
417            Err(err) => {
418                warn!(
419                    collection = %collection.name,
420                    error = %err,
421                    "failed to load persisted index, rebuilding from mmap"
422                );
423                self.rebuild_index(collection, &mmap).await?
424            }
425        };
426
427        let key = self.collection_key(&collection.workspace_id, &collection.name);
428        self.indexes.write().await.insert(key.clone(), index);
429        self.mmap_files.write().await.insert(key, mmap);
430        self.sync_collection_index_type(&collection.workspace_id, &collection.name)
431            .await?;
432        Ok(())
433    }
434
435    async fn rebuild_index(
436        &self,
437        collection: &Collection,
438        mmap: &MmapVectorFile,
439    ) -> VectorResult<IndexSelector> {
440        let mut index =
441            IndexSelector::new(collection.dimensions, collection.distance, &self.config);
442        let records = self
443            .store
444            .list_records_for_collection(&collection.workspace_id, &collection.name)
445            .await?;
446        let mut items = Vec::with_capacity(records.len());
447        for (_, internal_id) in records {
448            let vector = mmap.read_vector(internal_id)?;
449            items.push((internal_id, vector));
450        }
451        index.insert_batch(items, &self.config)?;
452        Ok(index)
453    }
454
455    async fn apply_in_memory_insert(
456        &self,
457        workspace_id: &str,
458        record: &VectorRecord,
459        internal_id: usize,
460    ) -> VectorResult<()> {
461        {
462            let mut mmap_files = self.mmap_files.write().await;
463            let key = self.collection_key(workspace_id, &record.collection);
464            let mmap = mmap_files
465                .get_mut(&key)
466                .ok_or_else(|| VectorError::NotFound {
467                    entity: "collection".into(),
468                    id: format!("{workspace_id}/{}", record.collection),
469                })?;
470            mmap.write_vector(internal_id, &record.vector)?;
471            mmap.flush()?;
472        }
473
474        let mut indexes = self.indexes.write().await;
475        let key = self.collection_key(workspace_id, &record.collection);
476        let index = indexes.get_mut(&key).ok_or_else(|| VectorError::NotFound {
477            entity: "collection".into(),
478            id: format!("{workspace_id}/{}", record.collection),
479        })?;
480        index.insert(internal_id, record.vector.clone(), &self.config)?;
481        Ok(())
482    }
483
484    async fn rollback_in_memory_insert(
485        &self,
486        workspace_id: &str,
487        collection: &str,
488        internal_id: usize,
489    ) {
490        let key = self.collection_key(workspace_id, collection);
491        if let Some(index) = self.indexes.write().await.get_mut(&key) {
492            let _ = index.delete(internal_id);
493        }
494        if let Some(mmap) = self.mmap_files.write().await.get_mut(&key) {
495            let _ = mmap.delete_vector(internal_id);
496            let _ = mmap.flush();
497        }
498    }
499
500    async fn rollback_batch_in_memory(&self, workspace_id: &str, staged: &[(VectorRecord, usize)]) {
501        for (record, internal_id) in staged.iter().rev() {
502            self.rollback_in_memory_insert(workspace_id, &record.collection, *internal_id)
503                .await;
504        }
505    }
506
507    fn collection_dir(&self, workspace_id: &str, name: &str) -> PathBuf {
508        self.config.index_dir.join(workspace_id).join(name)
509    }
510
511    fn vector_file_path(&self, workspace_id: &str, name: &str) -> PathBuf {
512        self.collection_dir(workspace_id, name).join("vectors.bin")
513    }
514
515    async fn sync_collection_index_type(
516        &self,
517        workspace_id: &str,
518        collection: &str,
519    ) -> VectorResult<()> {
520        let current_type = {
521            let indexes = self.indexes.read().await;
522            let key = self.collection_key(workspace_id, collection);
523            let index = indexes.get(&key).ok_or_else(|| VectorError::NotFound {
524                entity: "collection".into(),
525                id: format!("{workspace_id}/{collection}"),
526            })?;
527            if index.is_hnsw() {
528                IndexType::HNSW
529            } else {
530                IndexType::Flat
531            }
532        };
533        self.store
534            .update_collection_index_type(workspace_id, collection, current_type)
535            .await
536    }
537
538    fn collection_key(&self, workspace_id: &str, name: &str) -> String {
539        format!("{workspace_id}::{name}")
540    }
541}