Skip to main content

claw_vector/
collections.rs

1// collections.rs — collection management and persistence orchestration.
2use std::{collections::HashMap, path::PathBuf, sync::Arc};
3
4use chrono::Utc;
5use tokio::sync::RwLock;
6use tracing::{instrument, warn};
7use uuid::Uuid;
8
9use crate::{
10    config::VectorConfig,
11    error::{VectorError, VectorResult},
12    index::selector::IndexSelector,
13    store::{mmap::MmapVectorFile, sqlite::VectorStore},
14    types::{Collection, DistanceMetric, IndexType, VectorRecord},
15};
16
/// Coordinates collection metadata, vector files, and in-memory indexes.
///
/// Both in-memory maps are keyed by the string built by `collection_key`,
/// i.e. `"{workspace_id}::{name}"`, not by the bare collection name.
pub struct CollectionManager {
    /// Runtime configuration shared by all collections.
    pub config: VectorConfig,
    /// Persistent metadata store.
    pub store: Arc<VectorStore>,
    /// In-memory index implementations keyed by `"{workspace_id}::{name}"`.
    pub indexes: Arc<RwLock<HashMap<String, IndexSelector>>>,
    /// Memory-mapped vector files keyed by `"{workspace_id}::{name}"`.
    pub mmap_files: Arc<RwLock<HashMap<String, MmapVectorFile>>>,
}
28
29impl CollectionManager {
30    /// Build a manager and restore all persisted collections.
31    #[instrument(skip(store))]
32    pub async fn new(config: VectorConfig, store: Arc<VectorStore>) -> VectorResult<Self> {
33        std::fs::create_dir_all(&config.index_dir)?;
34
35        let manager = CollectionManager {
36            config,
37            store,
38            indexes: Arc::new(RwLock::new(HashMap::new())),
39            mmap_files: Arc::new(RwLock::new(HashMap::new())),
40        };
41
42        let collections = manager.store.list_all_collections().await?;
43        for collection in collections {
44            manager.restore_collection(&collection).await?;
45        }
46
47        Ok(manager)
48    }
49
50    /// Create a new collection and initialize its index and vector file.
51    #[instrument(skip(self))]
52    pub async fn create_collection(
53        &self,
54        workspace_id: &str,
55        name: &str,
56        dimensions: usize,
57        distance: DistanceMetric,
58    ) -> VectorResult<Collection> {
59        if name.trim().is_empty() {
60            return Err(VectorError::Collection {
61                name: name.to_string(),
62                reason: "name must not be empty".into(),
63            });
64        }
65
66        let collection = Collection {
67            workspace_id: workspace_id.to_string(),
68            name: name.to_string(),
69            dimensions,
70            distance,
71            index_type: IndexType::Flat,
72            created_at: Utc::now(),
73            vector_count: 0,
74            metadata: serde_json::json!({}),
75            ef_construction: self.config.ef_construction,
76            m_connections: self.config.m_connections,
77        };
78
79        let dir = self.collection_dir(workspace_id, name);
80        std::fs::create_dir_all(&dir)?;
81        let mmap = MmapVectorFile::create(
82            &self.vector_file_path(workspace_id, name),
83            dimensions,
84            self.config.max_elements.max(1),
85        )?;
86        let index = IndexSelector::new(dimensions, distance, &self.config);
87
88        self.store.create_collection(workspace_id, &collection).await?;
89
90        let key = self.collection_key(workspace_id, name);
91        self.indexes.write().await.insert(key.clone(), index);
92        self.mmap_files.write().await.insert(key, mmap);
93
94        Ok(collection)
95    }
96
97    /// Fetch a collection definition by name.
98    #[instrument(skip(self))]
99    pub async fn get_collection(&self, workspace_id: &str, name: &str) -> VectorResult<Collection> {
100        self.store.get_collection(workspace_id, name).await
101    }
102
103    /// Delete a collection and all of its persisted state.
104    #[instrument(skip(self))]
105    pub async fn delete_collection(&self, workspace_id: &str, name: &str) -> VectorResult<()> {
106        let key = self.collection_key(workspace_id, name);
107        let removed_index = self.indexes.write().await.remove(&key);
108        let removed_mmap = self.mmap_files.write().await.remove(&key);
109
110        if removed_index.is_none() || removed_mmap.is_none() {
111            let exists = self.store.get_collection(workspace_id, name).await.is_ok();
112            if !exists {
113                return Err(VectorError::NotFound {
114                    entity: "collection".into(),
115                    id: format!("{workspace_id}/{name}"),
116                });
117            }
118        }
119
120        if let Err(err) = self.store.delete_collection(workspace_id, name).await {
121            if let Some(index) = removed_index {
122                self.indexes.write().await.insert(key.clone(), index);
123            }
124            if let Some(mmap) = removed_mmap {
125                self.mmap_files.write().await.insert(key.clone(), mmap);
126            }
127            return Err(err);
128        }
129
130        let collection_dir = self.collection_dir(workspace_id, name);
131        if collection_dir.exists() {
132            std::fs::remove_dir_all(collection_dir)?;
133        }
134
135        Ok(())
136    }
137
138    /// List all persisted collections.
139    #[instrument(skip(self))]
140    pub async fn list_collections(&self, workspace_id: &str) -> VectorResult<Vec<Collection>> {
141        self.store.list_collections(workspace_id).await
142    }
143
144    /// Insert a single vector record into its collection.
145    #[instrument(skip(self, record))]
146    pub async fn insert_vector(&self, workspace_id: &str, record: VectorRecord) -> VectorResult<Uuid> {
147        let collection = self
148            .store
149            .get_collection(workspace_id, &record.collection)
150            .await?;
151        if record.vector.len() != collection.dimensions {
152            return Err(VectorError::DimensionMismatch {
153                expected: collection.dimensions,
154                got: record.vector.len(),
155            });
156        }
157
158        let internal_id = self
159            .store
160            .next_internal_id(workspace_id, &record.collection)
161            .await?;
162        let record_id = record.id;
163        self.apply_in_memory_insert(workspace_id, &record, internal_id)
164            .await?;
165
166        if let Err(err) = self
167            .store
168            .insert_record(workspace_id, &record, internal_id)
169            .await
170        {
171            self.rollback_in_memory_insert(workspace_id, &record.collection, internal_id)
172                .await;
173            return Err(err);
174        }
175
176        if let Err(err) = self
177            .store
178            .increment_vector_count(workspace_id, &record.collection, 1)
179            .await
180        {
181            let _ = self.store.delete_record(workspace_id, record.id).await;
182            self.rollback_in_memory_insert(workspace_id, &record.collection, internal_id)
183                .await;
184            return Err(err);
185        }
186
187        self.sync_collection_index_type(workspace_id, &record.collection)
188            .await?;
189
190        Ok(record_id)
191    }
192
193    /// Insert multiple vector records atomically.
194    #[instrument(skip(self, records))]
195    pub async fn insert_batch(
196        &self,
197        workspace_id: &str,
198        records: Vec<VectorRecord>,
199    ) -> VectorResult<Vec<Uuid>> {
200        if records.is_empty() {
201            return Ok(Vec::new());
202        }
203
204        let mut next_ids = HashMap::<String, usize>::new();
205        let mut deltas = HashMap::<String, i64>::new();
206        let mut staged = Vec::with_capacity(records.len());
207        let mut ids = Vec::with_capacity(records.len());
208
209        for record in records {
210            let collection = self
211                .store
212                .get_collection(workspace_id, &record.collection)
213                .await?;
214            if record.vector.len() != collection.dimensions {
215                return Err(VectorError::DimensionMismatch {
216                    expected: collection.dimensions,
217                    got: record.vector.len(),
218                });
219            }
220
221            let next_id = if let Some(next_id) = next_ids.get_mut(&record.collection) {
222                let current = *next_id;
223                *next_id += 1;
224                current
225            } else {
226                let current = self
227                    .store
228                    .next_internal_id(workspace_id, &record.collection)
229                    .await?;
230                next_ids.insert(record.collection.clone(), current + 1);
231                current
232            };
233
234            *deltas.entry(record.collection.clone()).or_insert(0) += 1;
235            ids.push(record.id);
236            staged.push((record, next_id));
237        }
238
239        for (record, internal_id) in &staged {
240            if let Err(err) = self
241                .apply_in_memory_insert(workspace_id, record, *internal_id)
242                .await
243            {
244                self.rollback_batch_in_memory(workspace_id, &staged).await;
245                return Err(err);
246            }
247        }
248
249        if let Err(err) = self.store.batch_insert_records(workspace_id, &staged).await {
250            self.rollback_batch_in_memory(workspace_id, &staged).await;
251            return Err(err);
252        }
253
254        for (collection, delta) in deltas {
255            if let Err(err) = self
256                .store
257                .increment_vector_count(workspace_id, &collection, delta)
258                .await
259            {
260                for (record, _) in &staged {
261                    let _ = self.store.delete_record(workspace_id, record.id).await;
262                }
263                self.rollback_batch_in_memory(workspace_id, &staged).await;
264                return Err(err);
265            }
266            self.sync_collection_index_type(workspace_id, &collection)
267                .await?;
268        }
269
270        Ok(ids)
271    }
272
273    /// Delete a vector from a collection by UUID.
274    #[instrument(skip(self))]
275    pub async fn delete_vector(
276        &self,
277        workspace_id: &str,
278        collection: &str,
279        id: Uuid,
280    ) -> VectorResult<bool> {
281        let (record, internal_id) = match self.store.get_record(workspace_id, id).await {
282            Ok(value) => value,
283            Err(VectorError::NotFound { .. }) => return Ok(false),
284            Err(err) => return Err(err),
285        };
286
287        if record.collection != collection {
288            return Ok(false);
289        }
290
291        {
292            let mut indexes = self.indexes.write().await;
293            let key = self.collection_key(workspace_id, collection);
294            let index = indexes
295                .get_mut(&key)
296                .ok_or_else(|| VectorError::NotFound {
297                    entity: "collection".into(),
298                    id: format!("{workspace_id}/{collection}"),
299                })?;
300            index.delete(internal_id)?;
301        }
302
303        {
304            let mut mmap_files = self.mmap_files.write().await;
305            let key = self.collection_key(workspace_id, collection);
306            let mmap = mmap_files
307                .get_mut(&key)
308                .ok_or_else(|| VectorError::NotFound {
309                    entity: "collection".into(),
310                    id: format!("{workspace_id}/{collection}"),
311                })?;
312            mmap.delete_vector(internal_id)?;
313            mmap.flush()?;
314        }
315
316        self.store.delete_record(workspace_id, id).await?;
317        self.store
318            .increment_vector_count(workspace_id, collection, -1)
319            .await?;
320        Ok(true)
321    }
322
323    /// Load a full vector record, including its raw vector from the mmap file.
324    #[instrument(skip(self))]
325    pub async fn get_vector(
326        &self,
327        workspace_id: &str,
328        collection: &str,
329        id: Uuid,
330    ) -> VectorResult<VectorRecord> {
331        let (mut record, internal_id) = self.store.get_record(workspace_id, id).await?;
332        if record.collection != collection {
333            return Err(VectorError::NotFound {
334                entity: "record".into(),
335                id: id.to_string(),
336            });
337        }
338
339        let mmap_files = self.mmap_files.read().await;
340        let key = self.collection_key(workspace_id, collection);
341        let mmap = mmap_files
342            .get(&key)
343            .ok_or_else(|| VectorError::NotFound {
344                entity: "collection".into(),
345                id: format!("{workspace_id}/{collection}"),
346            })?;
347        record.vector = mmap.read_vector(internal_id)?;
348        Ok(record)
349    }
350
351    /// Persist all loaded indexes to disk.
352    #[instrument(skip(self))]
353    pub async fn persist_indexes(&self) -> VectorResult<()> {
354        let indexes = self.indexes.read().await;
355        for (key, index) in indexes.iter() {
356            if let Some((workspace_id, name)) = key.split_once("::") {
357                index.save(&self.config.index_dir, workspace_id, name)?;
358            }
359        }
360        Ok(())
361    }
362
363    /// Read a raw vector by collection and internal id.
364    pub async fn read_vector_by_internal_id(
365        &self,
366        workspace_id: &str,
367        collection: &str,
368        internal_id: usize,
369    ) -> VectorResult<Vec<f32>> {
370        let mmap_files = self.mmap_files.read().await;
371        let key = self.collection_key(workspace_id, collection);
372        let mmap = mmap_files
373            .get(&key)
374            .ok_or_else(|| VectorError::NotFound {
375                entity: "collection".into(),
376                id: format!("{workspace_id}/{collection}"),
377            })?;
378        mmap.read_vector(internal_id)
379    }
380
381    /// Return the number of loaded indexes.
382    pub async fn loaded_index_count(&self) -> usize {
383        self.indexes.read().await.len()
384    }
385
386    /// Return the number of loaded mmap vector files.
387    pub async fn loaded_mmap_count(&self) -> usize {
388        self.mmap_files.read().await.len()
389    }
390
391    async fn restore_collection(&self, collection: &Collection) -> VectorResult<()> {
392        let dir = self.collection_dir(&collection.workspace_id, &collection.name);
393        std::fs::create_dir_all(&dir)?;
394
395        let mmap_path = self.vector_file_path(&collection.workspace_id, &collection.name);
396        let mmap = if mmap_path.exists() {
397            MmapVectorFile::open(&mmap_path)?
398        } else {
399            MmapVectorFile::create(
400                &mmap_path,
401                collection.dimensions,
402                self.config
403                    .max_elements
404                    .max(collection.vector_count as usize + 1),
405            )?
406        };
407
408        let index = match IndexSelector::load(
409            &self.config.index_dir,
410            &collection.workspace_id,
411            &collection.name,
412            &self.config,
413            collection.distance,
414            collection.dimensions,
415        ) {
416            Ok(index) => index,
417            Err(err) => {
418                warn!(
419                    collection = %collection.name,
420                    error = %err,
421                    "failed to load persisted index, rebuilding from mmap"
422                );
423                self.rebuild_index(collection, &mmap).await?
424            }
425        };
426
427        let key = self.collection_key(&collection.workspace_id, &collection.name);
428        self.indexes
429            .write()
430            .await
431            .insert(key.clone(), index);
432        self.mmap_files
433            .write()
434            .await
435            .insert(key, mmap);
436        self.sync_collection_index_type(&collection.workspace_id, &collection.name)
437            .await?;
438        Ok(())
439    }
440
441    async fn rebuild_index(
442        &self,
443        collection: &Collection,
444        mmap: &MmapVectorFile,
445    ) -> VectorResult<IndexSelector> {
446        let mut index =
447            IndexSelector::new(collection.dimensions, collection.distance, &self.config);
448        let records = self
449            .store
450            .list_records_for_collection(&collection.workspace_id, &collection.name)
451            .await?;
452        let mut items = Vec::with_capacity(records.len());
453        for (_, internal_id) in records {
454            let vector = mmap.read_vector(internal_id)?;
455            items.push((internal_id, vector));
456        }
457        index.insert_batch(items, &self.config)?;
458        Ok(index)
459    }
460
461    async fn apply_in_memory_insert(
462        &self,
463        workspace_id: &str,
464        record: &VectorRecord,
465        internal_id: usize,
466    ) -> VectorResult<()> {
467        {
468            let mut mmap_files = self.mmap_files.write().await;
469            let key = self.collection_key(workspace_id, &record.collection);
470            let mmap =
471                mmap_files
472                    .get_mut(&key)
473                    .ok_or_else(|| VectorError::NotFound {
474                        entity: "collection".into(),
475                        id: format!("{workspace_id}/{}", record.collection),
476                    })?;
477            mmap.write_vector(internal_id, &record.vector)?;
478            mmap.flush()?;
479        }
480
481        let mut indexes = self.indexes.write().await;
482        let key = self.collection_key(workspace_id, &record.collection);
483        let index = indexes
484            .get_mut(&key)
485            .ok_or_else(|| VectorError::NotFound {
486                entity: "collection".into(),
487                id: format!("{workspace_id}/{}", record.collection),
488            })?;
489        index.insert(internal_id, record.vector.clone(), &self.config)?;
490        Ok(())
491    }
492
493    async fn rollback_in_memory_insert(
494        &self,
495        workspace_id: &str,
496        collection: &str,
497        internal_id: usize,
498    ) {
499        let key = self.collection_key(workspace_id, collection);
500        if let Some(index) = self.indexes.write().await.get_mut(&key) {
501            let _ = index.delete(internal_id);
502        }
503        if let Some(mmap) = self.mmap_files.write().await.get_mut(&key) {
504            let _ = mmap.delete_vector(internal_id);
505            let _ = mmap.flush();
506        }
507    }
508
509    async fn rollback_batch_in_memory(&self, workspace_id: &str, staged: &[(VectorRecord, usize)]) {
510        for (record, internal_id) in staged.iter().rev() {
511            self.rollback_in_memory_insert(workspace_id, &record.collection, *internal_id)
512                .await;
513        }
514    }
515
516    fn collection_dir(&self, workspace_id: &str, name: &str) -> PathBuf {
517        self.config.index_dir.join(workspace_id).join(name)
518    }
519
520    fn vector_file_path(&self, workspace_id: &str, name: &str) -> PathBuf {
521        self.collection_dir(workspace_id, name).join("vectors.bin")
522    }
523
524    async fn sync_collection_index_type(&self, workspace_id: &str, collection: &str) -> VectorResult<()> {
525        let current_type = {
526            let indexes = self.indexes.read().await;
527            let key = self.collection_key(workspace_id, collection);
528            let index = indexes
529                .get(&key)
530                .ok_or_else(|| VectorError::NotFound {
531                    entity: "collection".into(),
532                    id: format!("{workspace_id}/{collection}"),
533                })?;
534            if index.is_hnsw() {
535                IndexType::HNSW
536            } else {
537                IndexType::Flat
538            }
539        };
540        self.store
541            .update_collection_index_type(workspace_id, collection, current_type)
542            .await
543    }
544
545    fn collection_key(&self, workspace_id: &str, name: &str) -> String {
546        format!("{workspace_id}::{name}")
547    }
548}