1use crate::collection::graph::{EdgeStore, PropertyIndex, RangeIndex};
4use crate::collection::types::{Collection, CollectionConfig, CollectionType};
5use crate::distance::DistanceMetric;
6use crate::error::{Error, Result};
7use crate::guardrails::GuardRails;
8use crate::index::{Bm25Index, HnswIndex};
9use crate::quantization::StorageMode;
10use crate::sparse_index::DEFAULT_SPARSE_INDEX_NAME;
11use crate::storage::{LogPayloadStorage, MmapStorage, PayloadStorage, VectorStorage};
12use crate::velesql::{QueryCache, QueryPlanner};
13
14use std::collections::{BTreeMap, HashMap, VecDeque};
15
16use parking_lot::{Mutex, RwLock};
17use std::path::PathBuf;
18use std::sync::Arc;
19
20impl Collection {
21 pub fn create(path: PathBuf, dimension: usize, metric: DistanceMetric) -> Result<Self> {
27 Self::create_with_options(path, dimension, metric, StorageMode::default())
28 }
29
30 pub fn create_with_options(
43 path: PathBuf,
44 dimension: usize,
45 metric: DistanceMetric,
46 storage_mode: StorageMode,
47 ) -> Result<Self> {
48 std::fs::create_dir_all(&path)?;
49
50 let name = path
51 .file_name()
52 .and_then(|n| n.to_str())
53 .unwrap_or("unknown")
54 .to_string();
55
56 let config = CollectionConfig {
57 name,
58 dimension,
59 metric,
60 point_count: 0,
61 storage_mode,
62 metadata_only: false,
63 graph_schema: None,
64 embedding_dimension: None,
65 pq_rescore_oversampling: Some(4),
66 };
67
68 let vector_storage = Arc::new(RwLock::new(
70 MmapStorage::new(&path, dimension).map_err(Error::Io)?,
71 ));
72
73 let payload_storage = Arc::new(RwLock::new(
74 LogPayloadStorage::new(&path).map_err(Error::Io)?,
75 ));
76
77 let index = Arc::new(HnswIndex::new(dimension, metric));
79
80 let text_index = Arc::new(Bm25Index::new());
82
83 let collection = Self {
84 path,
85 config: Arc::new(RwLock::new(config)),
86 vector_storage,
87 payload_storage,
88 index,
89 text_index,
90 sq8_cache: Arc::new(RwLock::new(HashMap::new())),
91 binary_cache: Arc::new(RwLock::new(HashMap::new())),
92 pq_cache: Arc::new(RwLock::new(HashMap::new())),
93 pq_quantizer: Arc::new(RwLock::new(None)),
94 pq_training_buffer: Arc::new(RwLock::new(VecDeque::new())),
95 property_index: Arc::new(RwLock::new(PropertyIndex::new())),
96 range_index: Arc::new(RwLock::new(RangeIndex::new())),
97 edge_store: Arc::new(RwLock::new(EdgeStore::new())),
98 sparse_indexes: Arc::new(RwLock::new(BTreeMap::new())),
99 secondary_indexes: Arc::new(RwLock::new(HashMap::new())),
100 guard_rails: Arc::new(GuardRails::default()),
101 query_planner: Arc::new(QueryPlanner::new()),
102 query_cache: Arc::new(QueryCache::new(256)),
103 cached_stats: Arc::new(Mutex::new(None)),
104 write_generation: Arc::new(std::sync::atomic::AtomicU64::new(0)),
105 #[cfg(feature = "persistence")]
106 stream_ingester: Arc::new(RwLock::new(None)),
107 #[cfg(feature = "persistence")]
108 delta_buffer: Arc::new(crate::collection::streaming::delta::DeltaBuffer::new()),
109 };
110
111 collection.save_config()?;
112
113 Ok(collection)
114 }
115
116 pub fn create_typed(
128 path: PathBuf,
129 name: &str,
130 collection_type: &CollectionType,
131 ) -> Result<Self> {
132 match collection_type {
133 CollectionType::Vector {
134 dimension,
135 metric,
136 storage_mode,
137 } => Self::create_with_options(path, *dimension, *metric, *storage_mode),
138 CollectionType::MetadataOnly => Self::create_metadata_only(path, name),
139 CollectionType::Graph { .. } => {
140 Err(crate::Error::GraphNotSupported(
143 "Graph collection creation not yet implemented".to_string(),
144 ))
145 }
146 }
147 }
148
149 pub fn create_metadata_only(path: PathBuf, name: &str) -> Result<Self> {
159 std::fs::create_dir_all(&path)?;
160
161 let config = CollectionConfig {
162 name: name.to_string(),
163 dimension: 0, metric: DistanceMetric::Cosine, point_count: 0,
166 storage_mode: StorageMode::Full, metadata_only: true,
168 graph_schema: None,
169 embedding_dimension: None,
170 pq_rescore_oversampling: Some(4),
171 };
172
173 let vector_storage = Arc::new(RwLock::new(MmapStorage::new(&path, 0).map_err(Error::Io)?));
176
177 let payload_storage = Arc::new(RwLock::new(
178 LogPayloadStorage::new(&path).map_err(Error::Io)?,
179 ));
180
181 let index = Arc::new(HnswIndex::new(0, DistanceMetric::Cosine));
183
184 let text_index = Arc::new(Bm25Index::new());
186
187 let collection = Self {
188 path,
189 config: Arc::new(RwLock::new(config)),
190 vector_storage,
191 payload_storage,
192 index,
193 text_index,
194 sq8_cache: Arc::new(RwLock::new(HashMap::new())),
195 binary_cache: Arc::new(RwLock::new(HashMap::new())),
196 pq_cache: Arc::new(RwLock::new(HashMap::new())),
197 pq_quantizer: Arc::new(RwLock::new(None)),
198 pq_training_buffer: Arc::new(RwLock::new(VecDeque::new())),
199 property_index: Arc::new(RwLock::new(PropertyIndex::new())),
200 range_index: Arc::new(RwLock::new(RangeIndex::new())),
201 edge_store: Arc::new(RwLock::new(EdgeStore::new())),
202 sparse_indexes: Arc::new(RwLock::new(BTreeMap::new())),
203 secondary_indexes: Arc::new(RwLock::new(HashMap::new())),
204 guard_rails: Arc::new(GuardRails::default()),
205 query_planner: Arc::new(QueryPlanner::new()),
206 query_cache: Arc::new(QueryCache::new(256)),
207 cached_stats: Arc::new(Mutex::new(None)),
208 write_generation: Arc::new(std::sync::atomic::AtomicU64::new(0)),
209 #[cfg(feature = "persistence")]
210 stream_ingester: Arc::new(RwLock::new(None)),
211 #[cfg(feature = "persistence")]
212 delta_buffer: Arc::new(crate::collection::streaming::delta::DeltaBuffer::new()),
213 };
214
215 collection.save_config()?;
216
217 Ok(collection)
218 }
219
220 #[must_use]
222 pub fn is_metadata_only(&self) -> bool {
223 self.config.read().metadata_only
224 }
225
226 pub fn open(path: PathBuf) -> Result<Self> {
250 let config_path = path.join("config.json");
251 let config_data = std::fs::read_to_string(&config_path)?;
252 let config: CollectionConfig =
253 serde_json::from_str(&config_data).map_err(|e| Error::Serialization(e.to_string()))?;
254
255 let vector_storage = Arc::new(RwLock::new(
257 MmapStorage::new(&path, config.dimension).map_err(Error::Io)?,
258 ));
259
260 let payload_storage = Arc::new(RwLock::new(
261 LogPayloadStorage::new(&path).map_err(Error::Io)?,
262 ));
263
264 let index = if path.join("hnsw.bin").exists() {
266 Arc::new(HnswIndex::load(&path, config.dimension, config.metric).map_err(Error::Io)?)
267 } else {
268 Arc::new(HnswIndex::new(config.dimension, config.metric))
269 };
270
271 let text_index = Arc::new(Bm25Index::new());
273
274 {
276 let storage = payload_storage.read();
277 let ids = storage.ids();
278 for id in ids {
279 if let Ok(Some(payload)) = storage.retrieve(id) {
280 let text = Self::extract_text_from_payload(&payload);
281 if !text.is_empty() {
282 text_index.add_document(id, &text);
283 }
284 }
285 }
286 }
287
288 let property_index = Self::load_property_index(&path);
290 let range_index = Self::load_range_index(&path);
291 let edge_store = Self::load_edge_store(&path);
293 let sparse_indexes = Self::load_named_sparse_indexes(&path);
295
296 Ok(Self {
297 path,
298 config: Arc::new(RwLock::new(config)),
299 vector_storage,
300 payload_storage,
301 index,
302 text_index,
303 sq8_cache: Arc::new(RwLock::new(HashMap::new())),
304 binary_cache: Arc::new(RwLock::new(HashMap::new())),
305 pq_cache: Arc::new(RwLock::new(HashMap::new())),
306 pq_quantizer: Arc::new(RwLock::new(None)),
307 pq_training_buffer: Arc::new(RwLock::new(VecDeque::new())),
308 property_index: Arc::new(RwLock::new(property_index)),
309 range_index: Arc::new(RwLock::new(range_index)),
310 edge_store: Arc::new(RwLock::new(edge_store)),
311 sparse_indexes: Arc::new(RwLock::new(sparse_indexes)),
312 secondary_indexes: Arc::new(RwLock::new(HashMap::new())),
313 guard_rails: Arc::new(GuardRails::default()),
314 query_planner: Arc::new(QueryPlanner::new()),
315 query_cache: Arc::new(QueryCache::new(256)),
316 cached_stats: Arc::new(Mutex::new(None)),
317 write_generation: Arc::new(std::sync::atomic::AtomicU64::new(0)),
318 #[cfg(feature = "persistence")]
319 stream_ingester: Arc::new(RwLock::new(None)),
320 #[cfg(feature = "persistence")]
321 delta_buffer: Arc::new(crate::collection::streaming::delta::DeltaBuffer::new()),
322 })
323 }
324
325 pub fn create_graph_collection(
333 path: PathBuf,
334 name: &str,
335 schema: crate::collection::graph::GraphSchema,
336 embedding_dim: Option<usize>,
337 metric: DistanceMetric,
338 ) -> Result<Self> {
339 let dim = embedding_dim.unwrap_or(0);
340 std::fs::create_dir_all(&path)?;
341
342 let config = CollectionConfig {
343 name: name.to_string(),
344 dimension: dim,
345 metric,
346 point_count: 0,
347 storage_mode: StorageMode::Full,
348 metadata_only: false,
349 graph_schema: Some(schema),
350 embedding_dimension: embedding_dim,
351 pq_rescore_oversampling: Some(4),
352 };
353
354 let vector_storage = Arc::new(RwLock::new(
355 MmapStorage::new(&path, dim).map_err(Error::Io)?,
356 ));
357 let payload_storage = Arc::new(RwLock::new(
358 LogPayloadStorage::new(&path).map_err(Error::Io)?,
359 ));
360 let index = Arc::new(HnswIndex::new(dim, metric));
361 let text_index = Arc::new(Bm25Index::new());
362
363 let collection = Self {
364 path,
365 config: Arc::new(RwLock::new(config)),
366 vector_storage,
367 payload_storage,
368 index,
369 text_index,
370 sq8_cache: Arc::new(RwLock::new(HashMap::new())),
371 binary_cache: Arc::new(RwLock::new(HashMap::new())),
372 pq_cache: Arc::new(RwLock::new(HashMap::new())),
373 pq_quantizer: Arc::new(RwLock::new(None)),
374 pq_training_buffer: Arc::new(RwLock::new(VecDeque::new())),
375 property_index: Arc::new(RwLock::new(PropertyIndex::new())),
376 range_index: Arc::new(RwLock::new(RangeIndex::new())),
377 edge_store: Arc::new(RwLock::new(EdgeStore::new())),
378 sparse_indexes: Arc::new(RwLock::new(BTreeMap::new())),
379 secondary_indexes: Arc::new(RwLock::new(HashMap::new())),
380 guard_rails: Arc::new(GuardRails::default()),
381 query_planner: Arc::new(QueryPlanner::new()),
382 query_cache: Arc::new(QueryCache::new(256)),
383 cached_stats: Arc::new(Mutex::new(None)),
384 write_generation: Arc::new(std::sync::atomic::AtomicU64::new(0)),
385 #[cfg(feature = "persistence")]
386 stream_ingester: Arc::new(RwLock::new(None)),
387 #[cfg(feature = "persistence")]
388 delta_buffer: Arc::new(crate::collection::streaming::delta::DeltaBuffer::new()),
389 };
390
391 collection.save_config()?;
392 Ok(collection)
393 }
394
395 fn load_named_sparse_indexes(
416 path: &std::path::Path,
417 ) -> BTreeMap<String, crate::index::sparse::SparseInvertedIndex> {
418 let mut indexes = BTreeMap::new();
419
420 match crate::index::sparse::persistence::load_from_disk(path) {
422 Ok(Some(idx)) => {
423 indexes.insert(DEFAULT_SPARSE_INDEX_NAME.to_string(), idx);
424 }
425 Ok(None) => {}
426 Err(e) => {
427 tracing::warn!(
428 "Failed to load default sparse index from {:?}: {}. Skipping.",
429 path,
430 e
431 );
432 }
433 }
434
435 if let Ok(entries) = std::fs::read_dir(path) {
440 for entry in entries.flatten() {
441 let file_name = entry.file_name();
442 let name_str = file_name.to_string_lossy();
443 if let Some(sparse_name) = name_str
444 .strip_prefix("sparse-")
445 .and_then(|s| s.strip_suffix(".meta"))
446 {
447 let sparse_name = sparse_name.to_string();
448 match crate::index::sparse::persistence::load_named_from_disk(
449 path,
450 &sparse_name,
451 ) {
452 Ok(Some(idx)) => {
453 indexes.insert(sparse_name, idx);
454 }
455 Ok(None) => {}
456 Err(e) => {
457 tracing::warn!(
458 "Failed to load sparse index '{}' from {:?}: {}. Skipping.",
459 sparse_name,
460 path,
461 e
462 );
463 }
464 }
465 }
466 }
467 }
468
469 indexes
470 }
471
472 fn load_edge_store(path: &std::path::Path) -> EdgeStore {
473 let edge_path = path.join("edge_store.bin");
474 if edge_path.exists() {
475 match EdgeStore::load_from_file(&edge_path) {
476 Ok(store) => return store,
477 Err(e) => tracing::warn!(
478 "Failed to load EdgeStore from {:?}: {}. Starting empty.",
479 edge_path,
480 e
481 ),
482 }
483 }
484 EdgeStore::new()
485 }
486
487 fn load_property_index(path: &std::path::Path) -> PropertyIndex {
488 let index_path = path.join("property_index.bin");
489 if index_path.exists() {
490 match PropertyIndex::load_from_file(&index_path) {
491 Ok(idx) => return idx,
492 Err(e) => tracing::warn!(
493 "Failed to load PropertyIndex from {:?}: {}. Starting with empty index.",
494 index_path,
495 e
496 ),
497 }
498 }
499 PropertyIndex::new()
500 }
501
502 fn load_range_index(path: &std::path::Path) -> RangeIndex {
503 let index_path = path.join("range_index.bin");
504 if index_path.exists() {
505 match RangeIndex::load_from_file(&index_path) {
506 Ok(idx) => return idx,
507 Err(e) => tracing::warn!(
508 "Failed to load RangeIndex from {:?}: {}. Starting with empty index.",
509 index_path,
510 e
511 ),
512 }
513 }
514 RangeIndex::new()
515 }
516
517 #[must_use]
519 pub fn config(&self) -> CollectionConfig {
520 self.config.read().clone()
521 }
522
523 pub fn flush(&self) -> Result<()> {
529 self.save_config()?;
530 self.vector_storage.write().flush().map_err(Error::Io)?;
531 self.payload_storage.write().flush().map_err(Error::Io)?;
532 self.index.save(&self.path).map_err(Error::Io)?;
533
534 let property_index_path = self.path.join("property_index.bin");
536 self.property_index
537 .read()
538 .save_to_file(&property_index_path)
539 .map_err(Error::Io)?;
540
541 let range_index_path = self.path.join("range_index.bin");
543 self.range_index
544 .read()
545 .save_to_file(&range_index_path)
546 .map_err(Error::Io)?;
547
548 if self.config.read().graph_schema.is_some() {
550 let edge_store_path = self.path.join("edge_store.bin");
551 self.edge_store
552 .read()
553 .save_to_file(&edge_store_path)
554 .map_err(Error::Io)?;
555 }
556
557 {
559 let indexes = self.sparse_indexes.read();
560 for (name, idx) in indexes.iter() {
561 crate::index::sparse::persistence::compact_named(&self.path, name, idx)?;
562 }
563 }
564
565 Ok(())
566 }
567
568 #[must_use]
570 pub(crate) fn data_path(&self) -> &std::path::Path {
571 &self.path
572 }
573
574 pub(crate) fn config_write(
576 &self,
577 ) -> parking_lot::RwLockWriteGuard<'_, crate::collection::types::CollectionConfig> {
578 self.config.write()
579 }
580
581 pub(crate) fn pq_quantizer_write(
583 &self,
584 ) -> parking_lot::RwLockWriteGuard<'_, Option<crate::quantization::ProductQuantizer>> {
585 self.pq_quantizer.write()
586 }
587
588 pub(crate) fn pq_quantizer_read(
590 &self,
591 ) -> parking_lot::RwLockReadGuard<'_, Option<crate::quantization::ProductQuantizer>> {
592 self.pq_quantizer.read()
593 }
594
595 pub(crate) fn save_config(&self) -> Result<()> {
597 let config = self.config.read();
598 let config_path = self.path.join("config.json");
599 let config_data = serde_json::to_string_pretty(&*config)
600 .map_err(|e| Error::Serialization(e.to_string()))?;
601 std::fs::write(config_path, config_data)?;
602 Ok(())
603 }
604}