1use std::{collections::HashMap, path::PathBuf, sync::Arc};
3
4use chrono::Utc;
5use tokio::sync::RwLock;
6use tracing::{instrument, warn};
7use uuid::Uuid;
8
9use crate::{
10 config::VectorConfig,
11 error::{VectorError, VectorResult},
12 index::selector::IndexSelector,
13 store::{mmap::MmapVectorFile, sqlite::VectorStore},
14 types::{Collection, DistanceMetric, IndexType, VectorRecord},
15};
16
17pub struct CollectionManager {
19 pub config: VectorConfig,
21 pub store: Arc<VectorStore>,
23 pub indexes: Arc<RwLock<HashMap<String, IndexSelector>>>,
25 pub mmap_files: Arc<RwLock<HashMap<String, MmapVectorFile>>>,
27}
28
29impl CollectionManager {
30 #[instrument(skip(store))]
32 pub async fn new(config: VectorConfig, store: Arc<VectorStore>) -> VectorResult<Self> {
33 std::fs::create_dir_all(&config.index_dir)?;
34
35 let manager = CollectionManager {
36 config,
37 store,
38 indexes: Arc::new(RwLock::new(HashMap::new())),
39 mmap_files: Arc::new(RwLock::new(HashMap::new())),
40 };
41
42 let collections = manager.store.list_all_collections().await?;
43 for collection in collections {
44 manager.restore_collection(&collection).await?;
45 }
46
47 Ok(manager)
48 }
49
50 #[instrument(skip(self))]
52 pub async fn create_collection(
53 &self,
54 workspace_id: &str,
55 name: &str,
56 dimensions: usize,
57 distance: DistanceMetric,
58 ) -> VectorResult<Collection> {
59 if name.trim().is_empty() {
60 return Err(VectorError::Collection {
61 name: name.to_string(),
62 reason: "name must not be empty".into(),
63 });
64 }
65
66 let collection = Collection {
67 workspace_id: workspace_id.to_string(),
68 name: name.to_string(),
69 dimensions,
70 distance,
71 index_type: IndexType::Flat,
72 created_at: Utc::now(),
73 vector_count: 0,
74 metadata: serde_json::json!({}),
75 ef_construction: self.config.ef_construction,
76 m_connections: self.config.m_connections,
77 };
78
79 let dir = self.collection_dir(workspace_id, name);
80 std::fs::create_dir_all(&dir)?;
81 let mmap = MmapVectorFile::create(
82 &self.vector_file_path(workspace_id, name),
83 dimensions,
84 self.config.max_elements.max(1),
85 )?;
86 let index = IndexSelector::new(dimensions, distance, &self.config);
87
88 self.store.create_collection(workspace_id, &collection).await?;
89
90 let key = self.collection_key(workspace_id, name);
91 self.indexes.write().await.insert(key.clone(), index);
92 self.mmap_files.write().await.insert(key, mmap);
93
94 Ok(collection)
95 }
96
97 #[instrument(skip(self))]
99 pub async fn get_collection(&self, workspace_id: &str, name: &str) -> VectorResult<Collection> {
100 self.store.get_collection(workspace_id, name).await
101 }
102
103 #[instrument(skip(self))]
105 pub async fn delete_collection(&self, workspace_id: &str, name: &str) -> VectorResult<()> {
106 let key = self.collection_key(workspace_id, name);
107 let removed_index = self.indexes.write().await.remove(&key);
108 let removed_mmap = self.mmap_files.write().await.remove(&key);
109
110 if removed_index.is_none() || removed_mmap.is_none() {
111 let exists = self.store.get_collection(workspace_id, name).await.is_ok();
112 if !exists {
113 return Err(VectorError::NotFound {
114 entity: "collection".into(),
115 id: format!("{workspace_id}/{name}"),
116 });
117 }
118 }
119
120 if let Err(err) = self.store.delete_collection(workspace_id, name).await {
121 if let Some(index) = removed_index {
122 self.indexes.write().await.insert(key.clone(), index);
123 }
124 if let Some(mmap) = removed_mmap {
125 self.mmap_files.write().await.insert(key.clone(), mmap);
126 }
127 return Err(err);
128 }
129
130 let collection_dir = self.collection_dir(workspace_id, name);
131 if collection_dir.exists() {
132 std::fs::remove_dir_all(collection_dir)?;
133 }
134
135 Ok(())
136 }
137
138 #[instrument(skip(self))]
140 pub async fn list_collections(&self, workspace_id: &str) -> VectorResult<Vec<Collection>> {
141 self.store.list_collections(workspace_id).await
142 }
143
144 #[instrument(skip(self, record))]
146 pub async fn insert_vector(&self, workspace_id: &str, record: VectorRecord) -> VectorResult<Uuid> {
147 let collection = self
148 .store
149 .get_collection(workspace_id, &record.collection)
150 .await?;
151 if record.vector.len() != collection.dimensions {
152 return Err(VectorError::DimensionMismatch {
153 expected: collection.dimensions,
154 got: record.vector.len(),
155 });
156 }
157
158 let internal_id = self
159 .store
160 .next_internal_id(workspace_id, &record.collection)
161 .await?;
162 let record_id = record.id;
163 self.apply_in_memory_insert(workspace_id, &record, internal_id)
164 .await?;
165
166 if let Err(err) = self
167 .store
168 .insert_record(workspace_id, &record, internal_id)
169 .await
170 {
171 self.rollback_in_memory_insert(workspace_id, &record.collection, internal_id)
172 .await;
173 return Err(err);
174 }
175
176 if let Err(err) = self
177 .store
178 .increment_vector_count(workspace_id, &record.collection, 1)
179 .await
180 {
181 let _ = self.store.delete_record(workspace_id, record.id).await;
182 self.rollback_in_memory_insert(workspace_id, &record.collection, internal_id)
183 .await;
184 return Err(err);
185 }
186
187 self.sync_collection_index_type(workspace_id, &record.collection)
188 .await?;
189
190 Ok(record_id)
191 }
192
193 #[instrument(skip(self, records))]
195 pub async fn insert_batch(
196 &self,
197 workspace_id: &str,
198 records: Vec<VectorRecord>,
199 ) -> VectorResult<Vec<Uuid>> {
200 if records.is_empty() {
201 return Ok(Vec::new());
202 }
203
204 let mut next_ids = HashMap::<String, usize>::new();
205 let mut deltas = HashMap::<String, i64>::new();
206 let mut staged = Vec::with_capacity(records.len());
207 let mut ids = Vec::with_capacity(records.len());
208
209 for record in records {
210 let collection = self
211 .store
212 .get_collection(workspace_id, &record.collection)
213 .await?;
214 if record.vector.len() != collection.dimensions {
215 return Err(VectorError::DimensionMismatch {
216 expected: collection.dimensions,
217 got: record.vector.len(),
218 });
219 }
220
221 let next_id = if let Some(next_id) = next_ids.get_mut(&record.collection) {
222 let current = *next_id;
223 *next_id += 1;
224 current
225 } else {
226 let current = self
227 .store
228 .next_internal_id(workspace_id, &record.collection)
229 .await?;
230 next_ids.insert(record.collection.clone(), current + 1);
231 current
232 };
233
234 *deltas.entry(record.collection.clone()).or_insert(0) += 1;
235 ids.push(record.id);
236 staged.push((record, next_id));
237 }
238
239 for (record, internal_id) in &staged {
240 if let Err(err) = self
241 .apply_in_memory_insert(workspace_id, record, *internal_id)
242 .await
243 {
244 self.rollback_batch_in_memory(workspace_id, &staged).await;
245 return Err(err);
246 }
247 }
248
249 if let Err(err) = self.store.batch_insert_records(workspace_id, &staged).await {
250 self.rollback_batch_in_memory(workspace_id, &staged).await;
251 return Err(err);
252 }
253
254 for (collection, delta) in deltas {
255 if let Err(err) = self
256 .store
257 .increment_vector_count(workspace_id, &collection, delta)
258 .await
259 {
260 for (record, _) in &staged {
261 let _ = self.store.delete_record(workspace_id, record.id).await;
262 }
263 self.rollback_batch_in_memory(workspace_id, &staged).await;
264 return Err(err);
265 }
266 self.sync_collection_index_type(workspace_id, &collection)
267 .await?;
268 }
269
270 Ok(ids)
271 }
272
273 #[instrument(skip(self))]
275 pub async fn delete_vector(
276 &self,
277 workspace_id: &str,
278 collection: &str,
279 id: Uuid,
280 ) -> VectorResult<bool> {
281 let (record, internal_id) = match self.store.get_record(workspace_id, id).await {
282 Ok(value) => value,
283 Err(VectorError::NotFound { .. }) => return Ok(false),
284 Err(err) => return Err(err),
285 };
286
287 if record.collection != collection {
288 return Ok(false);
289 }
290
291 {
292 let mut indexes = self.indexes.write().await;
293 let key = self.collection_key(workspace_id, collection);
294 let index = indexes
295 .get_mut(&key)
296 .ok_or_else(|| VectorError::NotFound {
297 entity: "collection".into(),
298 id: format!("{workspace_id}/{collection}"),
299 })?;
300 index.delete(internal_id)?;
301 }
302
303 {
304 let mut mmap_files = self.mmap_files.write().await;
305 let key = self.collection_key(workspace_id, collection);
306 let mmap = mmap_files
307 .get_mut(&key)
308 .ok_or_else(|| VectorError::NotFound {
309 entity: "collection".into(),
310 id: format!("{workspace_id}/{collection}"),
311 })?;
312 mmap.delete_vector(internal_id)?;
313 mmap.flush()?;
314 }
315
316 self.store.delete_record(workspace_id, id).await?;
317 self.store
318 .increment_vector_count(workspace_id, collection, -1)
319 .await?;
320 Ok(true)
321 }
322
323 #[instrument(skip(self))]
325 pub async fn get_vector(
326 &self,
327 workspace_id: &str,
328 collection: &str,
329 id: Uuid,
330 ) -> VectorResult<VectorRecord> {
331 let (mut record, internal_id) = self.store.get_record(workspace_id, id).await?;
332 if record.collection != collection {
333 return Err(VectorError::NotFound {
334 entity: "record".into(),
335 id: id.to_string(),
336 });
337 }
338
339 let mmap_files = self.mmap_files.read().await;
340 let key = self.collection_key(workspace_id, collection);
341 let mmap = mmap_files
342 .get(&key)
343 .ok_or_else(|| VectorError::NotFound {
344 entity: "collection".into(),
345 id: format!("{workspace_id}/{collection}"),
346 })?;
347 record.vector = mmap.read_vector(internal_id)?;
348 Ok(record)
349 }
350
351 #[instrument(skip(self))]
353 pub async fn persist_indexes(&self) -> VectorResult<()> {
354 let indexes = self.indexes.read().await;
355 for (key, index) in indexes.iter() {
356 if let Some((workspace_id, name)) = key.split_once("::") {
357 index.save(&self.config.index_dir, workspace_id, name)?;
358 }
359 }
360 Ok(())
361 }
362
363 pub async fn read_vector_by_internal_id(
365 &self,
366 workspace_id: &str,
367 collection: &str,
368 internal_id: usize,
369 ) -> VectorResult<Vec<f32>> {
370 let mmap_files = self.mmap_files.read().await;
371 let key = self.collection_key(workspace_id, collection);
372 let mmap = mmap_files
373 .get(&key)
374 .ok_or_else(|| VectorError::NotFound {
375 entity: "collection".into(),
376 id: format!("{workspace_id}/{collection}"),
377 })?;
378 mmap.read_vector(internal_id)
379 }
380
381 pub async fn loaded_index_count(&self) -> usize {
383 self.indexes.read().await.len()
384 }
385
386 pub async fn loaded_mmap_count(&self) -> usize {
388 self.mmap_files.read().await.len()
389 }
390
391 async fn restore_collection(&self, collection: &Collection) -> VectorResult<()> {
392 let dir = self.collection_dir(&collection.workspace_id, &collection.name);
393 std::fs::create_dir_all(&dir)?;
394
395 let mmap_path = self.vector_file_path(&collection.workspace_id, &collection.name);
396 let mmap = if mmap_path.exists() {
397 MmapVectorFile::open(&mmap_path)?
398 } else {
399 MmapVectorFile::create(
400 &mmap_path,
401 collection.dimensions,
402 self.config
403 .max_elements
404 .max(collection.vector_count as usize + 1),
405 )?
406 };
407
408 let index = match IndexSelector::load(
409 &self.config.index_dir,
410 &collection.workspace_id,
411 &collection.name,
412 &self.config,
413 collection.distance,
414 collection.dimensions,
415 ) {
416 Ok(index) => index,
417 Err(err) => {
418 warn!(
419 collection = %collection.name,
420 error = %err,
421 "failed to load persisted index, rebuilding from mmap"
422 );
423 self.rebuild_index(collection, &mmap).await?
424 }
425 };
426
427 let key = self.collection_key(&collection.workspace_id, &collection.name);
428 self.indexes
429 .write()
430 .await
431 .insert(key.clone(), index);
432 self.mmap_files
433 .write()
434 .await
435 .insert(key, mmap);
436 self.sync_collection_index_type(&collection.workspace_id, &collection.name)
437 .await?;
438 Ok(())
439 }
440
441 async fn rebuild_index(
442 &self,
443 collection: &Collection,
444 mmap: &MmapVectorFile,
445 ) -> VectorResult<IndexSelector> {
446 let mut index =
447 IndexSelector::new(collection.dimensions, collection.distance, &self.config);
448 let records = self
449 .store
450 .list_records_for_collection(&collection.workspace_id, &collection.name)
451 .await?;
452 let mut items = Vec::with_capacity(records.len());
453 for (_, internal_id) in records {
454 let vector = mmap.read_vector(internal_id)?;
455 items.push((internal_id, vector));
456 }
457 index.insert_batch(items, &self.config)?;
458 Ok(index)
459 }
460
461 async fn apply_in_memory_insert(
462 &self,
463 workspace_id: &str,
464 record: &VectorRecord,
465 internal_id: usize,
466 ) -> VectorResult<()> {
467 {
468 let mut mmap_files = self.mmap_files.write().await;
469 let key = self.collection_key(workspace_id, &record.collection);
470 let mmap =
471 mmap_files
472 .get_mut(&key)
473 .ok_or_else(|| VectorError::NotFound {
474 entity: "collection".into(),
475 id: format!("{workspace_id}/{}", record.collection),
476 })?;
477 mmap.write_vector(internal_id, &record.vector)?;
478 mmap.flush()?;
479 }
480
481 let mut indexes = self.indexes.write().await;
482 let key = self.collection_key(workspace_id, &record.collection);
483 let index = indexes
484 .get_mut(&key)
485 .ok_or_else(|| VectorError::NotFound {
486 entity: "collection".into(),
487 id: format!("{workspace_id}/{}", record.collection),
488 })?;
489 index.insert(internal_id, record.vector.clone(), &self.config)?;
490 Ok(())
491 }
492
493 async fn rollback_in_memory_insert(
494 &self,
495 workspace_id: &str,
496 collection: &str,
497 internal_id: usize,
498 ) {
499 let key = self.collection_key(workspace_id, collection);
500 if let Some(index) = self.indexes.write().await.get_mut(&key) {
501 let _ = index.delete(internal_id);
502 }
503 if let Some(mmap) = self.mmap_files.write().await.get_mut(&key) {
504 let _ = mmap.delete_vector(internal_id);
505 let _ = mmap.flush();
506 }
507 }
508
509 async fn rollback_batch_in_memory(&self, workspace_id: &str, staged: &[(VectorRecord, usize)]) {
510 for (record, internal_id) in staged.iter().rev() {
511 self.rollback_in_memory_insert(workspace_id, &record.collection, *internal_id)
512 .await;
513 }
514 }
515
516 fn collection_dir(&self, workspace_id: &str, name: &str) -> PathBuf {
517 self.config.index_dir.join(workspace_id).join(name)
518 }
519
520 fn vector_file_path(&self, workspace_id: &str, name: &str) -> PathBuf {
521 self.collection_dir(workspace_id, name).join("vectors.bin")
522 }
523
524 async fn sync_collection_index_type(&self, workspace_id: &str, collection: &str) -> VectorResult<()> {
525 let current_type = {
526 let indexes = self.indexes.read().await;
527 let key = self.collection_key(workspace_id, collection);
528 let index = indexes
529 .get(&key)
530 .ok_or_else(|| VectorError::NotFound {
531 entity: "collection".into(),
532 id: format!("{workspace_id}/{collection}"),
533 })?;
534 if index.is_hnsw() {
535 IndexType::HNSW
536 } else {
537 IndexType::Flat
538 }
539 };
540 self.store
541 .update_collection_index_type(workspace_id, collection, current_type)
542 .await
543 }
544
545 fn collection_key(&self, workspace_id: &str, name: &str) -> String {
546 format!("{workspace_id}::{name}")
547 }
548}