1use super::types::{Collection, CollectionConfig, CollectionType};
4use crate::distance::DistanceMetric;
5use crate::error::{Error, Result};
6use crate::index::{Bm25Index, HnswIndex, VectorIndex};
7use crate::point::Point;
8use crate::quantization::{BinaryQuantizedVector, QuantizedVector, StorageMode};
9use crate::storage::{LogPayloadStorage, MmapStorage, PayloadStorage, VectorStorage};
10
11use std::collections::HashMap;
12
13use parking_lot::RwLock;
14use std::path::PathBuf;
15use std::sync::Arc;
16
17impl Collection {
18 pub fn create(path: PathBuf, dimension: usize, metric: DistanceMetric) -> Result<Self> {
24 Self::create_with_options(path, dimension, metric, StorageMode::default())
25 }
26
27 pub fn create_with_options(
40 path: PathBuf,
41 dimension: usize,
42 metric: DistanceMetric,
43 storage_mode: StorageMode,
44 ) -> Result<Self> {
45 std::fs::create_dir_all(&path)?;
46
47 let name = path
48 .file_name()
49 .and_then(|n| n.to_str())
50 .unwrap_or("unknown")
51 .to_string();
52
53 let config = CollectionConfig {
54 name,
55 dimension,
56 metric,
57 point_count: 0,
58 storage_mode,
59 metadata_only: false,
60 };
61
62 let vector_storage = Arc::new(RwLock::new(
64 MmapStorage::new(&path, dimension).map_err(Error::Io)?,
65 ));
66
67 let payload_storage = Arc::new(RwLock::new(
68 LogPayloadStorage::new(&path).map_err(Error::Io)?,
69 ));
70
71 let index = Arc::new(HnswIndex::new(dimension, metric));
73
74 let text_index = Arc::new(Bm25Index::new());
76
77 let collection = Self {
78 path,
79 config: Arc::new(RwLock::new(config)),
80 vector_storage,
81 payload_storage,
82 index,
83 text_index,
84 sq8_cache: Arc::new(RwLock::new(HashMap::new())),
85 binary_cache: Arc::new(RwLock::new(HashMap::new())),
86 };
87
88 collection.save_config()?;
89
90 Ok(collection)
91 }
92
93 pub fn create_typed(
105 path: PathBuf,
106 name: &str,
107 collection_type: &CollectionType,
108 ) -> Result<Self> {
109 match collection_type {
110 CollectionType::Vector {
111 dimension,
112 metric,
113 storage_mode,
114 } => Self::create_with_options(path, *dimension, *metric, *storage_mode),
115 CollectionType::MetadataOnly => Self::create_metadata_only(path, name),
116 }
117 }
118
119 pub fn create_metadata_only(path: PathBuf, name: &str) -> Result<Self> {
129 std::fs::create_dir_all(&path)?;
130
131 let config = CollectionConfig {
132 name: name.to_string(),
133 dimension: 0, metric: DistanceMetric::Cosine, point_count: 0,
136 storage_mode: StorageMode::Full, metadata_only: true,
138 };
139
140 let vector_storage = Arc::new(RwLock::new(MmapStorage::new(&path, 0).map_err(Error::Io)?));
143
144 let payload_storage = Arc::new(RwLock::new(
145 LogPayloadStorage::new(&path).map_err(Error::Io)?,
146 ));
147
148 let index = Arc::new(HnswIndex::new(0, DistanceMetric::Cosine));
150
151 let text_index = Arc::new(Bm25Index::new());
153
154 let collection = Self {
155 path,
156 config: Arc::new(RwLock::new(config)),
157 vector_storage,
158 payload_storage,
159 index,
160 text_index,
161 sq8_cache: Arc::new(RwLock::new(HashMap::new())),
162 binary_cache: Arc::new(RwLock::new(HashMap::new())),
163 };
164
165 collection.save_config()?;
166
167 Ok(collection)
168 }
169
170 #[must_use]
172 pub fn is_metadata_only(&self) -> bool {
173 self.config.read().metadata_only
174 }
175
176 pub fn open(path: PathBuf) -> Result<Self> {
182 let config_path = path.join("config.json");
183 let config_data = std::fs::read_to_string(&config_path)?;
184 let config: CollectionConfig =
185 serde_json::from_str(&config_data).map_err(|e| Error::Serialization(e.to_string()))?;
186
187 let vector_storage = Arc::new(RwLock::new(
189 MmapStorage::new(&path, config.dimension).map_err(Error::Io)?,
190 ));
191
192 let payload_storage = Arc::new(RwLock::new(
193 LogPayloadStorage::new(&path).map_err(Error::Io)?,
194 ));
195
196 let index = if path.join("hnsw.bin").exists() {
198 Arc::new(HnswIndex::load(&path, config.dimension, config.metric).map_err(Error::Io)?)
199 } else {
200 Arc::new(HnswIndex::new(config.dimension, config.metric))
201 };
202
203 let text_index = Arc::new(Bm25Index::new());
205
206 {
208 let storage = payload_storage.read();
209 let ids = storage.ids();
210 for id in ids {
211 if let Ok(Some(payload)) = storage.retrieve(id) {
212 let text = Self::extract_text_from_payload(&payload);
213 if !text.is_empty() {
214 text_index.add_document(id, &text);
215 }
216 }
217 }
218 }
219
220 Ok(Self {
221 path,
222 config: Arc::new(RwLock::new(config)),
223 vector_storage,
224 payload_storage,
225 index,
226 text_index,
227 sq8_cache: Arc::new(RwLock::new(HashMap::new())),
228 binary_cache: Arc::new(RwLock::new(HashMap::new())),
229 })
230 }
231
232 #[must_use]
234 pub fn config(&self) -> CollectionConfig {
235 self.config.read().clone()
236 }
237
238 pub fn upsert(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
247 let points: Vec<Point> = points.into_iter().collect();
248 let config = self.config.read();
249 let dimension = config.dimension;
250 let storage_mode = config.storage_mode;
251 let metadata_only = config.metadata_only;
252 let name = config.name.clone();
253 drop(config);
254
255 if metadata_only {
257 for point in &points {
258 if !point.vector.is_empty() {
259 return Err(Error::VectorNotAllowed(name));
260 }
261 }
262 return self.upsert_metadata(points);
264 }
265
266 for point in &points {
268 if point.dimension() != dimension {
269 return Err(Error::DimensionMismatch {
270 expected: dimension,
271 actual: point.dimension(),
272 });
273 }
274 }
275
276 let mut vector_storage = self.vector_storage.write();
277 let mut payload_storage = self.payload_storage.write();
278
279 let mut sq8_cache = match storage_mode {
281 StorageMode::SQ8 => Some(self.sq8_cache.write()),
282 _ => None,
283 };
284 let mut binary_cache = match storage_mode {
285 StorageMode::Binary => Some(self.binary_cache.write()),
286 _ => None,
287 };
288
289 for point in points {
290 vector_storage
292 .store(point.id, &point.vector)
293 .map_err(Error::Io)?;
294
295 match storage_mode {
297 StorageMode::SQ8 => {
298 if let Some(ref mut cache) = sq8_cache {
299 let quantized = QuantizedVector::from_f32(&point.vector);
300 cache.insert(point.id, quantized);
301 }
302 }
303 StorageMode::Binary => {
304 if let Some(ref mut cache) = binary_cache {
305 let quantized = BinaryQuantizedVector::from_f32(&point.vector);
306 cache.insert(point.id, quantized);
307 }
308 }
309 StorageMode::Full => {}
310 }
311
312 if let Some(payload) = &point.payload {
314 payload_storage
315 .store(point.id, payload)
316 .map_err(Error::Io)?;
317 } else {
318 let _ = payload_storage.delete(point.id);
319 }
320
321 self.index.insert(point.id, &point.vector);
323
324 if let Some(payload) = &point.payload {
326 let text = Self::extract_text_from_payload(payload);
327 if !text.is_empty() {
328 self.text_index.add_document(point.id, &text);
329 }
330 } else {
331 self.text_index.remove_document(point.id);
332 }
333 }
334
335 let mut config = self.config.write();
337 config.point_count = vector_storage.len();
338
339 vector_storage.flush().map_err(Error::Io)?;
341 payload_storage.flush().map_err(Error::Io)?;
342 self.index.save(&self.path).map_err(Error::Io)?;
343
344 Ok(())
345 }
346
347 pub fn upsert_metadata(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
356 let points: Vec<Point> = points.into_iter().collect();
357
358 let mut payload_storage = self.payload_storage.write();
359
360 for point in &points {
361 if let Some(payload) = &point.payload {
363 payload_storage
364 .store(point.id, payload)
365 .map_err(Error::Io)?;
366
367 let text = Self::extract_text_from_payload(payload);
369 if !text.is_empty() {
370 self.text_index.add_document(point.id, &text);
371 }
372 } else {
373 let _ = payload_storage.delete(point.id);
374 self.text_index.remove_document(point.id);
375 }
376 }
377
378 let mut config = self.config.write();
380 config.point_count = payload_storage.ids().len();
381
382 payload_storage.flush().map_err(Error::Io)?;
384
385 Ok(())
386 }
387
388 pub fn upsert_bulk(&self, points: &[Point]) -> Result<usize> {
403 if points.is_empty() {
404 return Ok(0);
405 }
406
407 let config = self.config.read();
408 let dimension = config.dimension;
409 drop(config);
410
411 for point in points {
413 if point.dimension() != dimension {
414 return Err(Error::DimensionMismatch {
415 expected: dimension,
416 actual: point.dimension(),
417 });
418 }
419 }
420
421 let vectors_for_hnsw: Vec<(u64, Vec<f32>)> =
423 points.iter().map(|p| (p.id, p.vector.clone())).collect();
424
425 let vectors_for_storage: Vec<(u64, &[f32])> = vectors_for_hnsw
428 .iter()
429 .map(|(id, v)| (*id, v.as_slice()))
430 .collect();
431
432 let mut vector_storage = self.vector_storage.write();
433 vector_storage
434 .store_batch(&vectors_for_storage)
435 .map_err(Error::Io)?;
436 drop(vector_storage);
437
438 let mut payload_storage = self.payload_storage.write();
440 for point in points {
441 if let Some(payload) = &point.payload {
442 payload_storage
443 .store(point.id, payload)
444 .map_err(Error::Io)?;
445
446 let text = Self::extract_text_from_payload(payload);
448 if !text.is_empty() {
449 self.text_index.add_document(point.id, &text);
450 }
451 }
452 }
453 drop(payload_storage);
454
455 let inserted = self.index.insert_batch_parallel(vectors_for_hnsw);
457 self.index.set_searching_mode();
458
459 let mut config = self.config.write();
461 config.point_count = self.vector_storage.read().len();
462 drop(config);
463
464 self.vector_storage.write().flush().map_err(Error::Io)?;
468 self.payload_storage.write().flush().map_err(Error::Io)?;
469 Ok(inserted)
473 }
474
475 #[must_use]
477 pub fn get(&self, ids: &[u64]) -> Vec<Option<Point>> {
478 let config = self.config.read();
479 let is_metadata_only = config.metadata_only;
480 drop(config);
481
482 let payload_storage = self.payload_storage.read();
483
484 if is_metadata_only {
485 ids.iter()
487 .map(|&id| {
488 let payload = payload_storage.retrieve(id).ok().flatten()?;
489 Some(Point {
490 id,
491 vector: Vec::new(),
492 payload: Some(payload),
493 })
494 })
495 .collect()
496 } else {
497 let vector_storage = self.vector_storage.read();
499 ids.iter()
500 .map(|&id| {
501 let vector = vector_storage.retrieve(id).ok().flatten()?;
502 let payload = payload_storage.retrieve(id).ok().flatten();
503 Some(Point {
504 id,
505 vector,
506 payload,
507 })
508 })
509 .collect()
510 }
511 }
512
513 pub fn delete(&self, ids: &[u64]) -> Result<()> {
519 let config = self.config.read();
520 let is_metadata_only = config.metadata_only;
521 drop(config);
522
523 let mut payload_storage = self.payload_storage.write();
524
525 if is_metadata_only {
526 for &id in ids {
528 payload_storage.delete(id).map_err(Error::Io)?;
529 self.text_index.remove_document(id);
530 }
531
532 let mut config = self.config.write();
533 config.point_count = payload_storage.ids().len();
534 } else {
535 let mut vector_storage = self.vector_storage.write();
537
538 for &id in ids {
539 vector_storage.delete(id).map_err(Error::Io)?;
540 payload_storage.delete(id).map_err(Error::Io)?;
541 self.index.remove(id);
542 self.text_index.remove_document(id);
543 }
544
545 let mut config = self.config.write();
546 config.point_count = vector_storage.len();
547 }
548
549 Ok(())
550 }
551
552 #[must_use]
555 pub fn len(&self) -> usize {
556 self.config.read().point_count
557 }
558
559 #[must_use]
562 pub fn is_empty(&self) -> bool {
563 self.config.read().point_count == 0
564 }
565
566 pub fn flush(&self) -> Result<()> {
572 self.save_config()?;
573 self.vector_storage.write().flush().map_err(Error::Io)?;
574 self.payload_storage.write().flush().map_err(Error::Io)?;
575 self.index.save(&self.path).map_err(Error::Io)?;
576 Ok(())
577 }
578
579 fn save_config(&self) -> Result<()> {
581 let config = self.config.read();
582 let config_path = self.path.join("config.json");
583 let config_data = serde_json::to_string_pretty(&*config)
584 .map_err(|e| Error::Serialization(e.to_string()))?;
585 std::fs::write(config_path, config_data)?;
586 Ok(())
587 }
588}