1use std::{collections::HashMap, path::PathBuf, sync::Arc};
3
4use chrono::Utc;
5use tokio::sync::RwLock;
6use tracing::{instrument, warn};
7use uuid::Uuid;
8
9use crate::{
10 config::VectorConfig,
11 error::{VectorError, VectorResult},
12 index::selector::IndexSelector,
13 store::{mmap::MmapVectorFile, sqlite::VectorStore},
14 types::{Collection, DistanceMetric, IndexType, VectorRecord},
15};
16
/// Owns all per-collection runtime state: the durable record store, the
/// in-memory ANN indexes, and the memory-mapped vector files.
pub struct CollectionManager {
    /// Service-wide configuration (index directory, capacity, HNSW params).
    pub config: VectorConfig,
    /// Durable metadata/record store (SQLite-backed, per the `store::sqlite` path).
    pub store: Arc<VectorStore>,
    /// One ANN index per loaded collection, keyed by `"{workspace_id}::{name}"`.
    pub indexes: Arc<RwLock<HashMap<String, IndexSelector>>>,
    /// One open mmap vector file per loaded collection, same key scheme.
    pub mmap_files: Arc<RwLock<HashMap<String, MmapVectorFile>>>,
}
28
29impl CollectionManager {
30 #[instrument(skip(store))]
32 pub async fn new(config: VectorConfig, store: Arc<VectorStore>) -> VectorResult<Self> {
33 std::fs::create_dir_all(&config.index_dir)?;
34
35 let manager = CollectionManager {
36 config,
37 store,
38 indexes: Arc::new(RwLock::new(HashMap::new())),
39 mmap_files: Arc::new(RwLock::new(HashMap::new())),
40 };
41
42 let collections = manager.store.list_all_collections().await?;
43 for collection in collections {
44 manager.restore_collection(&collection).await?;
45 }
46
47 Ok(manager)
48 }
49
50 #[instrument(skip(self))]
52 pub async fn create_collection(
53 &self,
54 workspace_id: &str,
55 name: &str,
56 dimensions: usize,
57 distance: DistanceMetric,
58 ) -> VectorResult<Collection> {
59 if name.trim().is_empty() {
60 return Err(VectorError::Collection {
61 name: name.to_string(),
62 reason: "name must not be empty".into(),
63 });
64 }
65
66 let collection = Collection {
67 workspace_id: workspace_id.to_string(),
68 name: name.to_string(),
69 dimensions,
70 distance,
71 index_type: IndexType::Flat,
72 created_at: Utc::now(),
73 vector_count: 0,
74 metadata: serde_json::json!({}),
75 ef_construction: self.config.ef_construction,
76 m_connections: self.config.m_connections,
77 };
78
79 let dir = self.collection_dir(workspace_id, name);
80 std::fs::create_dir_all(&dir)?;
81 let mmap = MmapVectorFile::create(
82 &self.vector_file_path(workspace_id, name),
83 dimensions,
84 self.config.max_elements.max(1),
85 )?;
86 let index = IndexSelector::new(dimensions, distance, &self.config);
87
88 self.store
89 .create_collection(workspace_id, &collection)
90 .await?;
91
92 let key = self.collection_key(workspace_id, name);
93 self.indexes.write().await.insert(key.clone(), index);
94 self.mmap_files.write().await.insert(key, mmap);
95
96 Ok(collection)
97 }
98
    /// Fetches a collection's metadata straight from the store (no caching).
    #[instrument(skip(self))]
    pub async fn get_collection(&self, workspace_id: &str, name: &str) -> VectorResult<Collection> {
        self.store.get_collection(workspace_id, name).await
    }
104
    /// Deletes a collection: removes its in-memory index and mmap entries,
    /// deletes it from the store, then removes its on-disk directory.
    ///
    /// If the store delete fails, any in-memory entries that were removed are
    /// re-inserted so the collection stays usable.
    ///
    /// NOTE(review): the index map and mmap map are locked one after the
    /// other, so a concurrent reader could briefly observe one removed and
    /// the other still present — confirm callers tolerate that window.
    #[instrument(skip(self))]
    pub async fn delete_collection(&self, workspace_id: &str, name: &str) -> VectorResult<()> {
        let key = self.collection_key(workspace_id, name);
        let removed_index = self.indexes.write().await.remove(&key);
        let removed_mmap = self.mmap_files.write().await.remove(&key);

        // Nothing (fully) loaded in memory: only error when the collection is
        // also unknown to the store, i.e. it truly does not exist.
        if removed_index.is_none() || removed_mmap.is_none() {
            let exists = self.store.get_collection(workspace_id, name).await.is_ok();
            if !exists {
                return Err(VectorError::NotFound {
                    entity: "collection".into(),
                    id: format!("{workspace_id}/{name}"),
                });
            }
        }

        // Store delete failed: put back whatever we removed so in-memory
        // state still matches the (unchanged) persistent state.
        if let Err(err) = self.store.delete_collection(workspace_id, name).await {
            if let Some(index) = removed_index {
                self.indexes.write().await.insert(key.clone(), index);
            }
            if let Some(mmap) = removed_mmap {
                self.mmap_files.write().await.insert(key.clone(), mmap);
            }
            return Err(err);
        }

        // Store row is gone; clean up the collection's on-disk directory.
        let collection_dir = self.collection_dir(workspace_id, name);
        if collection_dir.exists() {
            std::fs::remove_dir_all(collection_dir)?;
        }

        Ok(())
    }
139
    /// Lists every collection recorded in the store for one workspace.
    #[instrument(skip(self))]
    pub async fn list_collections(&self, workspace_id: &str) -> VectorResult<Vec<Collection>> {
        self.store.list_collections(workspace_id).await
    }
145
    /// Inserts one vector: validates dimensions, writes it into the mmap file
    /// and ANN index, persists the record, then bumps the stored count.
    ///
    /// Each step that can fail after a prior step succeeded triggers a
    /// best-effort compensation (delete the record / undo the in-memory
    /// write) so persistent and in-memory state stay aligned.
    ///
    /// # Errors
    /// `DimensionMismatch` when the vector length differs from the
    /// collection's dimensions, plus any store/mmap/index error.
    #[instrument(skip(self, record))]
    pub async fn insert_vector(
        &self,
        workspace_id: &str,
        record: VectorRecord,
    ) -> VectorResult<Uuid> {
        let collection = self
            .store
            .get_collection(workspace_id, &record.collection)
            .await?;
        if record.vector.len() != collection.dimensions {
            return Err(VectorError::DimensionMismatch {
                expected: collection.dimensions,
                got: record.vector.len(),
            });
        }

        // Reserve the next slot in the collection's internal id space.
        let internal_id = self
            .store
            .next_internal_id(workspace_id, &record.collection)
            .await?;
        let record_id = record.id;
        // In-memory write first (mmap + index).
        self.apply_in_memory_insert(workspace_id, &record, internal_id)
            .await?;

        // Persist the record; on failure, undo the in-memory write.
        if let Err(err) = self
            .store
            .insert_record(workspace_id, &record, internal_id)
            .await
        {
            self.rollback_in_memory_insert(workspace_id, &record.collection, internal_id)
                .await;
            return Err(err);
        }

        // Bump the stored vector count; on failure, undo both prior steps.
        if let Err(err) = self
            .store
            .increment_vector_count(workspace_id, &record.collection, 1)
            .await
        {
            let _ = self.store.delete_record(workspace_id, record.id).await;
            self.rollback_in_memory_insert(workspace_id, &record.collection, internal_id)
                .await;
            return Err(err);
        }

        // Keep the stored index type (Flat/HNSW) in sync after the insert.
        self.sync_collection_index_type(workspace_id, &record.collection)
            .await?;

        Ok(record_id)
    }
198
199 #[instrument(skip(self, records))]
201 pub async fn insert_batch(
202 &self,
203 workspace_id: &str,
204 records: Vec<VectorRecord>,
205 ) -> VectorResult<Vec<Uuid>> {
206 if records.is_empty() {
207 return Ok(Vec::new());
208 }
209
210 let mut next_ids = HashMap::<String, usize>::new();
211 let mut deltas = HashMap::<String, i64>::new();
212 let mut staged = Vec::with_capacity(records.len());
213 let mut ids = Vec::with_capacity(records.len());
214
215 for record in records {
216 let collection = self
217 .store
218 .get_collection(workspace_id, &record.collection)
219 .await?;
220 if record.vector.len() != collection.dimensions {
221 return Err(VectorError::DimensionMismatch {
222 expected: collection.dimensions,
223 got: record.vector.len(),
224 });
225 }
226
227 let next_id = if let Some(next_id) = next_ids.get_mut(&record.collection) {
228 let current = *next_id;
229 *next_id += 1;
230 current
231 } else {
232 let current = self
233 .store
234 .next_internal_id(workspace_id, &record.collection)
235 .await?;
236 next_ids.insert(record.collection.clone(), current + 1);
237 current
238 };
239
240 *deltas.entry(record.collection.clone()).or_insert(0) += 1;
241 ids.push(record.id);
242 staged.push((record, next_id));
243 }
244
245 for (record, internal_id) in &staged {
246 if let Err(err) = self
247 .apply_in_memory_insert(workspace_id, record, *internal_id)
248 .await
249 {
250 self.rollback_batch_in_memory(workspace_id, &staged).await;
251 return Err(err);
252 }
253 }
254
255 if let Err(err) = self.store.batch_insert_records(workspace_id, &staged).await {
256 self.rollback_batch_in_memory(workspace_id, &staged).await;
257 return Err(err);
258 }
259
260 for (collection, delta) in deltas {
261 if let Err(err) = self
262 .store
263 .increment_vector_count(workspace_id, &collection, delta)
264 .await
265 {
266 for (record, _) in &staged {
267 let _ = self.store.delete_record(workspace_id, record.id).await;
268 }
269 self.rollback_batch_in_memory(workspace_id, &staged).await;
270 return Err(err);
271 }
272 self.sync_collection_index_type(workspace_id, &collection)
273 .await?;
274 }
275
276 Ok(ids)
277 }
278
    /// Deletes a vector by id.
    ///
    /// Returns `Ok(false)` when the id is unknown or belongs to a different
    /// collection than the one requested; `Ok(true)` on success.
    ///
    /// NOTE(review): the index delete and the mmap delete are separate
    /// fallible steps with no rollback between them — if the mmap delete
    /// fails, the index entry is already gone (and the raw vector needed to
    /// re-insert it is not available here). Confirm search tolerates that
    /// asymmetry, or consider surfacing it as a repair task.
    #[instrument(skip(self))]
    pub async fn delete_vector(
        &self,
        workspace_id: &str,
        collection: &str,
        id: Uuid,
    ) -> VectorResult<bool> {
        // Missing record is not an error for delete — report "nothing done".
        let (record, internal_id) = match self.store.get_record(workspace_id, id).await {
            Ok(value) => value,
            Err(VectorError::NotFound { .. }) => return Ok(false),
            Err(err) => return Err(err),
        };

        if record.collection != collection {
            return Ok(false);
        }

        // Remove from the ANN index (lock scoped to this block).
        {
            let mut indexes = self.indexes.write().await;
            let key = self.collection_key(workspace_id, collection);
            let index = indexes.get_mut(&key).ok_or_else(|| VectorError::NotFound {
                entity: "collection".into(),
                id: format!("{workspace_id}/{collection}"),
            })?;
            index.delete(internal_id)?;
        }

        // Remove from the mmap file and flush (lock scoped to this block).
        {
            let mut mmap_files = self.mmap_files.write().await;
            let key = self.collection_key(workspace_id, collection);
            let mmap = mmap_files
                .get_mut(&key)
                .ok_or_else(|| VectorError::NotFound {
                    entity: "collection".into(),
                    id: format!("{workspace_id}/{collection}"),
                })?;
            mmap.delete_vector(internal_id)?;
            mmap.flush()?;
        }

        // Finally remove the persistent record and decrement the count.
        self.store.delete_record(workspace_id, id).await?;
        self.store
            .increment_vector_count(workspace_id, collection, -1)
            .await?;
        Ok(true)
    }
326
327 #[instrument(skip(self))]
329 pub async fn get_vector(
330 &self,
331 workspace_id: &str,
332 collection: &str,
333 id: Uuid,
334 ) -> VectorResult<VectorRecord> {
335 let (mut record, internal_id) = self.store.get_record(workspace_id, id).await?;
336 if record.collection != collection {
337 return Err(VectorError::NotFound {
338 entity: "record".into(),
339 id: id.to_string(),
340 });
341 }
342
343 let mmap_files = self.mmap_files.read().await;
344 let key = self.collection_key(workspace_id, collection);
345 let mmap = mmap_files.get(&key).ok_or_else(|| VectorError::NotFound {
346 entity: "collection".into(),
347 id: format!("{workspace_id}/{collection}"),
348 })?;
349 record.vector = mmap.read_vector(internal_id)?;
350 Ok(record)
351 }
352
353 #[instrument(skip(self))]
355 pub async fn persist_indexes(&self) -> VectorResult<()> {
356 let indexes = self.indexes.read().await;
357 for (key, index) in indexes.iter() {
358 if let Some((workspace_id, name)) = key.split_once("::") {
359 index.save(&self.config.index_dir, workspace_id, name)?;
360 }
361 }
362 Ok(())
363 }
364
365 pub async fn read_vector_by_internal_id(
367 &self,
368 workspace_id: &str,
369 collection: &str,
370 internal_id: usize,
371 ) -> VectorResult<Vec<f32>> {
372 let mmap_files = self.mmap_files.read().await;
373 let key = self.collection_key(workspace_id, collection);
374 let mmap = mmap_files.get(&key).ok_or_else(|| VectorError::NotFound {
375 entity: "collection".into(),
376 id: format!("{workspace_id}/{collection}"),
377 })?;
378 mmap.read_vector(internal_id)
379 }
380
    /// Number of collections whose ANN index is currently loaded in memory.
    pub async fn loaded_index_count(&self) -> usize {
        self.indexes.read().await.len()
    }

    /// Number of collections whose mmap vector file is currently open.
    pub async fn loaded_mmap_count(&self) -> usize {
        self.mmap_files.read().await.len()
    }
390
    /// Loads one persisted collection into memory at startup: opens (or
    /// recreates) its mmap vector file and loads its saved index, rebuilding
    /// the index from the mmap contents when loading fails.
    async fn restore_collection(&self, collection: &Collection) -> VectorResult<()> {
        let dir = self.collection_dir(&collection.workspace_id, &collection.name);
        std::fs::create_dir_all(&dir)?;

        let mmap_path = self.vector_file_path(&collection.workspace_id, &collection.name);
        let mmap = if mmap_path.exists() {
            MmapVectorFile::open(&mmap_path)?
        } else {
            // NOTE(review): a missing vector file is recreated empty even if
            // the store says vector_count > 0 — that vector data is not
            // recoverable here; confirm this degradation is acceptable.
            MmapVectorFile::create(
                &mmap_path,
                collection.dimensions,
                // Capacity covers at least the known vectors plus one slot.
                self.config
                    .max_elements
                    .max(collection.vector_count as usize + 1),
            )?
        };

        let index = match IndexSelector::load(
            &self.config.index_dir,
            &collection.workspace_id,
            &collection.name,
            &self.config,
            collection.distance,
            collection.dimensions,
        ) {
            Ok(index) => index,
            Err(err) => {
                // Persisted index unusable: warn and rebuild from the mmap
                // file plus the store's record listing.
                warn!(
                    collection = %collection.name,
                    error = %err,
                    "failed to load persisted index, rebuilding from mmap"
                );
                self.rebuild_index(collection, &mmap).await?
            }
        };

        let key = self.collection_key(&collection.workspace_id, &collection.name);
        self.indexes.write().await.insert(key.clone(), index);
        self.mmap_files.write().await.insert(key, mmap);
        // Ensure the stored index_type matches what was actually loaded/rebuilt.
        self.sync_collection_index_type(&collection.workspace_id, &collection.name)
            .await?;
        Ok(())
    }
434
435 async fn rebuild_index(
436 &self,
437 collection: &Collection,
438 mmap: &MmapVectorFile,
439 ) -> VectorResult<IndexSelector> {
440 let mut index =
441 IndexSelector::new(collection.dimensions, collection.distance, &self.config);
442 let records = self
443 .store
444 .list_records_for_collection(&collection.workspace_id, &collection.name)
445 .await?;
446 let mut items = Vec::with_capacity(records.len());
447 for (_, internal_id) in records {
448 let vector = mmap.read_vector(internal_id)?;
449 items.push((internal_id, vector));
450 }
451 index.insert_batch(items, &self.config)?;
452 Ok(index)
453 }
454
    /// Writes a vector into the collection's mmap file, flushes it, then
    /// inserts it into the ANN index.
    ///
    /// The mmap lock is taken and released in its own scope before the index
    /// lock is acquired, so the two write locks are never held at once.
    /// NOTE(review): if the index insert fails, the mmap write is NOT undone
    /// here — callers are expected to invoke `rollback_in_memory_insert`.
    async fn apply_in_memory_insert(
        &self,
        workspace_id: &str,
        record: &VectorRecord,
        internal_id: usize,
    ) -> VectorResult<()> {
        {
            let mut mmap_files = self.mmap_files.write().await;
            let key = self.collection_key(workspace_id, &record.collection);
            let mmap = mmap_files
                .get_mut(&key)
                .ok_or_else(|| VectorError::NotFound {
                    entity: "collection".into(),
                    id: format!("{workspace_id}/{}", record.collection),
                })?;
            mmap.write_vector(internal_id, &record.vector)?;
            mmap.flush()?;
        }

        let mut indexes = self.indexes.write().await;
        let key = self.collection_key(workspace_id, &record.collection);
        let index = indexes.get_mut(&key).ok_or_else(|| VectorError::NotFound {
            entity: "collection".into(),
            id: format!("{workspace_id}/{}", record.collection),
        })?;
        index.insert(internal_id, record.vector.clone(), &self.config)?;
        Ok(())
    }
483
484 async fn rollback_in_memory_insert(
485 &self,
486 workspace_id: &str,
487 collection: &str,
488 internal_id: usize,
489 ) {
490 let key = self.collection_key(workspace_id, collection);
491 if let Some(index) = self.indexes.write().await.get_mut(&key) {
492 let _ = index.delete(internal_id);
493 }
494 if let Some(mmap) = self.mmap_files.write().await.get_mut(&key) {
495 let _ = mmap.delete_vector(internal_id);
496 let _ = mmap.flush();
497 }
498 }
499
    /// Best-effort undo of a batch of in-memory inserts, applied in reverse
    /// order of the original staging.
    async fn rollback_batch_in_memory(&self, workspace_id: &str, staged: &[(VectorRecord, usize)]) {
        for (record, internal_id) in staged.iter().rev() {
            self.rollback_in_memory_insert(workspace_id, &record.collection, *internal_id)
                .await;
        }
    }
506
    /// Directory holding all on-disk artifacts for one collection:
    /// `{index_dir}/{workspace_id}/{name}`.
    fn collection_dir(&self, workspace_id: &str, name: &str) -> PathBuf {
        self.config.index_dir.join(workspace_id).join(name)
    }

    /// Path of the collection's mmap vector file (`…/vectors.bin`).
    fn vector_file_path(&self, workspace_id: &str, name: &str) -> PathBuf {
        self.collection_dir(workspace_id, name).join("vectors.bin")
    }
514
515 async fn sync_collection_index_type(
516 &self,
517 workspace_id: &str,
518 collection: &str,
519 ) -> VectorResult<()> {
520 let current_type = {
521 let indexes = self.indexes.read().await;
522 let key = self.collection_key(workspace_id, collection);
523 let index = indexes.get(&key).ok_or_else(|| VectorError::NotFound {
524 entity: "collection".into(),
525 id: format!("{workspace_id}/{collection}"),
526 })?;
527 if index.is_hnsw() {
528 IndexType::HNSW
529 } else {
530 IndexType::Flat
531 }
532 };
533 self.store
534 .update_collection_index_type(workspace_id, collection, current_type)
535 .await
536 }
537
    /// Map key used by `indexes` and `mmap_files`: `"{workspace_id}::{name}"`.
    fn collection_key(&self, workspace_id: &str, name: &str) -> String {
        format!("{workspace_id}::{name}")
    }
541}