1use crate::collection::graph::{EdgeStore, PropertyIndex, RangeIndex};
4use crate::collection::types::{Collection, CollectionConfig, CollectionType};
5use crate::distance::DistanceMetric;
6use crate::error::{Error, Result};
7use crate::guardrails::GuardRails;
8use crate::index::{Bm25Index, HnswIndex};
9use crate::quantization::StorageMode;
10use crate::sparse_index::DEFAULT_SPARSE_INDEX_NAME;
11use crate::storage::{LogPayloadStorage, MmapStorage, PayloadStorage, VectorStorage};
12use crate::validation::validate_dimension;
13use crate::velesql::{QueryCache, QueryPlanner};
14
15use crate::index::sparse::SparseInvertedIndex;
16
17use std::collections::{BTreeMap, HashMap, VecDeque};
18
19use parking_lot::{Mutex, RwLock};
20use std::path::PathBuf;
21use std::sync::Arc;
22
/// Bundle of the components needed to build a `Collection`.
///
/// Passed by value to `Collection::assemble`, which wraps the config and the
/// graph/sparse indexes in `Arc<RwLock<..>>` and fills in the remaining
/// `Collection` state (caches, planner, counters).
struct CollectionParts {
    /// Collection directory; files such as `config.json` are resolved against it.
    path: PathBuf,
    /// Collection configuration.
    config: CollectionConfig,
    /// Shared handle to the vector storage.
    vector_storage: Arc<RwLock<MmapStorage>>,
    /// Shared handle to the payload storage.
    payload_storage: Arc<RwLock<LogPayloadStorage>>,
    /// Shared HNSW index.
    index: Arc<HnswIndex>,
    /// Shared BM25 text index.
    text_index: Arc<Bm25Index>,
    /// Property index (empty for new collections, loaded from disk on open).
    property_index: PropertyIndex,
    /// Range index (empty for new collections, loaded from disk on open).
    range_index: RangeIndex,
    /// Edge store (empty for new collections, loaded from disk on open).
    edge_store: EdgeStore,
    /// Sparse inverted indexes keyed by index name.
    sparse_indexes: BTreeMap<String, SparseInvertedIndex>,
}
39
40impl CollectionParts {
41 fn new_with_empty_indexes(
47 path: PathBuf,
48 config: CollectionConfig,
49 vector_storage: Arc<RwLock<MmapStorage>>,
50 payload_storage: Arc<RwLock<LogPayloadStorage>>,
51 index: Arc<HnswIndex>,
52 text_index: Arc<Bm25Index>,
53 ) -> Self {
54 Self {
55 path,
56 config,
57 vector_storage,
58 payload_storage,
59 index,
60 text_index,
61 property_index: PropertyIndex::new(),
62 range_index: RangeIndex::new(),
63 edge_store: EdgeStore::new(),
64 sparse_indexes: BTreeMap::new(),
65 }
66 }
67}
68
69impl Collection {
70 fn assemble(parts: CollectionParts) -> Self {
75 #[cfg(feature = "persistence")]
76 let deferred_indexer = Self::build_deferred_indexer(&parts.config);
77
78 Self {
79 path: parts.path,
80 config: Arc::new(RwLock::new(parts.config)),
81 vector_storage: parts.vector_storage,
82 payload_storage: parts.payload_storage,
83 index: parts.index,
84 text_index: parts.text_index,
85 sq8_cache: Arc::new(RwLock::new(HashMap::new())),
86 binary_cache: Arc::new(RwLock::new(HashMap::new())),
87 pq_cache: Arc::new(RwLock::new(HashMap::new())),
88 pq_quantizer: Arc::new(RwLock::new(None)),
89 pq_training_buffer: Arc::new(RwLock::new(VecDeque::new())),
90 property_index: Arc::new(RwLock::new(parts.property_index)),
91 range_index: Arc::new(RwLock::new(parts.range_index)),
92 edge_store: Arc::new(RwLock::new(parts.edge_store)),
93 sparse_indexes: Arc::new(RwLock::new(parts.sparse_indexes)),
94 secondary_indexes: Arc::new(RwLock::new(HashMap::new())),
95 guard_rails: Arc::new(GuardRails::default()),
96 query_planner: Arc::new(QueryPlanner::new()),
97 query_cache: Arc::new(QueryCache::new(256)),
98 cached_stats: Arc::new(Mutex::new(None)),
99 write_generation: Arc::new(std::sync::atomic::AtomicU64::new(0)),
100 inserts_since_last_hnsw_save: Arc::new(std::sync::atomic::AtomicU64::new(0)),
101 #[cfg(feature = "persistence")]
102 stream_ingester: Arc::new(RwLock::new(None)),
103 #[cfg(feature = "persistence")]
104 delta_buffer: Arc::new(crate::collection::streaming::delta::DeltaBuffer::new()),
105 #[cfg(feature = "persistence")]
106 deferred_indexer,
107 }
108 }
109
/// Creates the deferred indexer described by `config`, if any.
///
/// Returns `Some` only when `config.deferred_indexing` is present and its
/// `enabled` flag is set; the indexer receives a clone of that settings value.
#[cfg(feature = "persistence")]
fn build_deferred_indexer(
    config: &CollectionConfig,
) -> Option<Arc<crate::collection::streaming::DeferredIndexer>> {
    use crate::collection::streaming::DeferredIndexer;

    match config.deferred_indexing.as_ref() {
        Some(settings) if settings.enabled => {
            Some(Arc::new(DeferredIndexer::new(settings.clone())))
        }
        _ => None,
    }
}
128
129 fn init_collection_parts(
134 path: PathBuf,
135 config: CollectionConfig,
136 hnsw_params: Option<crate::index::hnsw::HnswParams>,
137 ) -> Result<CollectionParts> {
138 let vector_storage = Arc::new(RwLock::new(MmapStorage::new(&path, config.dimension)?));
139 let payload_storage = Arc::new(RwLock::new(LogPayloadStorage::new(&path)?));
140 let index = if let Some(params) = hnsw_params {
141 Arc::new(HnswIndex::with_params(
142 config.dimension,
143 config.metric,
144 params,
145 )?)
146 } else {
147 Arc::new(HnswIndex::new(config.dimension, config.metric)?)
148 };
149 let text_index = Arc::new(Bm25Index::new());
150 Ok(CollectionParts::new_with_empty_indexes(
151 path,
152 config,
153 vector_storage,
154 payload_storage,
155 index,
156 text_index,
157 ))
158 }
159
160 fn rebuild_bm25_index(
162 payload_storage: &Arc<RwLock<LogPayloadStorage>>,
163 text_index: &Arc<Bm25Index>,
164 ) {
165 let storage = payload_storage.read();
166 let ids = storage.ids();
167 for id in ids {
168 if let Ok(Some(payload)) = storage.retrieve(id) {
169 let text = Self::extract_text_from_payload(&payload);
170 if !text.is_empty() {
171 text_index.add_document(id, &text);
172 }
173 }
174 }
175 }
176
/// Creates a new vector collection at `path` using the default storage mode.
///
/// Delegates to `create_with_options` with `StorageMode::default()`.
///
/// # Errors
///
/// Returns whatever error `create_with_options` reports.
pub fn create(path: PathBuf, dimension: usize, metric: DistanceMetric) -> Result<Self> {
    Self::create_with_options(path, dimension, metric, StorageMode::default())
}
185
/// Derives a collection name from the final component of `path`.
///
/// Falls back to `"unknown"` when the path has no final component (for
/// example `/` or `..`) or when that component is not valid UTF-8.
fn name_from_path(path: &std::path::Path) -> String {
    match path.file_name().and_then(std::ffi::OsStr::to_str) {
        Some(name) => name.to_owned(),
        None => String::from("unknown"),
    }
}
193
194 fn create_from_config(
199 path: PathBuf,
200 config: CollectionConfig,
201 hnsw_params: Option<crate::index::hnsw::HnswParams>,
202 ) -> Result<Self> {
203 let skip_dimension_check = config.metadata_only
205 || (config.graph_schema.is_some() && config.embedding_dimension.is_none());
206 if !skip_dimension_check {
207 validate_dimension(config.dimension)?;
208 }
209 std::fs::create_dir_all(&path)?;
210
211 let collection = Self::assemble(Self::init_collection_parts(path, config, hnsw_params)?);
212 collection.save_config()?;
213 Ok(collection)
214 }
215
216 pub fn create_with_options(
229 path: PathBuf,
230 dimension: usize,
231 metric: DistanceMetric,
232 storage_mode: StorageMode,
233 ) -> Result<Self> {
234 let config = CollectionConfig {
235 name: Self::name_from_path(&path),
236 dimension,
237 metric,
238 point_count: 0,
239 storage_mode,
240 metadata_only: false,
241 graph_schema: None,
242 embedding_dimension: None,
243 pq_rescore_oversampling: Some(4),
244 hnsw_params: None,
245 #[cfg(feature = "persistence")]
246 deferred_indexing: None,
247 };
248 Self::create_from_config(path, config, None)
249 }
250
251 pub fn create_with_hnsw_params(
269 path: PathBuf,
270 dimension: usize,
271 metric: DistanceMetric,
272 storage_mode: StorageMode,
273 hnsw_params: crate::index::hnsw::HnswParams,
274 ) -> Result<Self> {
275 let config = CollectionConfig {
276 name: Self::name_from_path(&path),
277 dimension,
278 metric,
279 point_count: 0,
280 storage_mode,
281 metadata_only: false,
282 graph_schema: None,
283 embedding_dimension: None,
284 pq_rescore_oversampling: Some(4),
285 hnsw_params: Some(hnsw_params),
286 #[cfg(feature = "persistence")]
287 deferred_indexing: None,
288 };
289 Self::create_from_config(path, config, Some(hnsw_params))
290 }
291
292 pub fn create_typed(
304 path: PathBuf,
305 name: &str,
306 collection_type: &CollectionType,
307 ) -> Result<Self> {
308 match collection_type {
309 CollectionType::Vector {
310 dimension,
311 metric,
312 storage_mode,
313 } => Self::create_with_options(path, *dimension, *metric, *storage_mode),
314 CollectionType::MetadataOnly => Self::create_metadata_only(path, name),
315 CollectionType::Graph { .. } => {
316 Err(crate::Error::GraphNotSupported(
319 "Graph collection creation not yet implemented".to_string(),
320 ))
321 }
322 }
323 }
324
325 pub fn create_metadata_only(path: PathBuf, name: &str) -> Result<Self> {
335 let config = CollectionConfig {
336 name: name.to_string(),
337 dimension: 0, metric: DistanceMetric::Cosine, point_count: 0,
340 storage_mode: StorageMode::Full, metadata_only: true,
342 graph_schema: None,
343 embedding_dimension: None,
344 pq_rescore_oversampling: Some(4),
345 hnsw_params: None,
346 #[cfg(feature = "persistence")]
347 deferred_indexing: None,
348 };
349 Self::create_from_config(path, config, None)
350 }
351
/// Returns the `metadata_only` flag from the current configuration.
#[must_use]
pub fn is_metadata_only(&self) -> bool {
    self.config.read().metadata_only
}
357
358 pub fn open(path: PathBuf) -> Result<Self> {
382 let mut config = Self::load_config(&path)?;
383
384 let vector_storage = Arc::new(RwLock::new(MmapStorage::new(&path, config.dimension)?));
385 let payload_storage = Arc::new(RwLock::new(LogPayloadStorage::new(&path)?));
386 let index = Self::load_or_create_hnsw(&path, &config)?;
387 let text_index = Arc::new(Bm25Index::new());
388
389 Self::rebuild_bm25_index(&payload_storage, &text_index);
390
391 let property_index = Self::load_property_index(&path);
392 let range_index = Self::load_range_index(&path);
393 let edge_store = Self::load_edge_store(&path);
394 let sparse_indexes = Self::load_named_sparse_indexes(&path);
395
396 config.point_count =
397 Self::reconcile_point_count(&config, &vector_storage, &payload_storage);
398
399 Self::run_crash_recovery(&config, &vector_storage, &index)?;
400
401 Ok(Self::assemble(CollectionParts {
402 path,
403 config,
404 vector_storage,
405 payload_storage,
406 index,
407 text_index,
408 property_index,
409 range_index,
410 edge_store,
411 sparse_indexes,
412 }))
413 }
414
415 fn load_config(path: &std::path::Path) -> Result<CollectionConfig> {
417 let config_path = path.join("config.json");
418 let config_data = std::fs::read_to_string(&config_path)?;
419 serde_json::from_str(&config_data).map_err(|e| Error::Serialization(e.to_string()))
420 }
421
422 fn reconcile_point_count(
425 config: &CollectionConfig,
426 vector_storage: &Arc<RwLock<MmapStorage>>,
427 payload_storage: &Arc<RwLock<LogPayloadStorage>>,
428 ) -> usize {
429 if config.metadata_only {
430 payload_storage.read().ids().len()
431 } else {
432 vector_storage.read().len()
433 }
434 }
435
/// Runs HNSW gap recovery for collections that hold vectors.
///
/// Metadata-only collections and collections with dimension `0` are skipped.
/// When recovery reports a non-zero count, an info-level log entry is
/// emitted.
///
/// # Errors
///
/// Propagates any error from `recover_hnsw_gap`.
#[cfg(feature = "persistence")]
fn run_crash_recovery(
    config: &CollectionConfig,
    vector_storage: &Arc<RwLock<MmapStorage>>,
    index: &Arc<HnswIndex>,
) -> Result<()> {
    let holds_vectors = !config.metadata_only && config.dimension != 0;
    if !holds_vectors {
        return Ok(());
    }

    let recovered = super::recovery::recover_hnsw_gap(vector_storage, index, config.dimension)?;
    if recovered > 0 {
        tracing::info!(
            collection = %config.name,
            recovered,
            "Collection gap recovery completed on open"
        );
    }
    Ok(())
}
457
/// No-op variant compiled when the `persistence` feature is disabled.
///
/// Always returns `Ok(())`; all arguments are ignored.
#[cfg(not(feature = "persistence"))]
fn run_crash_recovery(
    _config: &CollectionConfig,
    _vector_storage: &Arc<RwLock<MmapStorage>>,
    _index: &Arc<HnswIndex>,
) -> Result<()> {
    Ok(())
}
467
468 pub fn create_graph_collection(
476 path: PathBuf,
477 name: &str,
478 schema: crate::collection::graph::GraphSchema,
479 embedding_dim: Option<usize>,
480 metric: DistanceMetric,
481 ) -> Result<Self> {
482 let config = CollectionConfig {
483 name: name.to_string(),
484 dimension: embedding_dim.unwrap_or(0),
485 metric,
486 point_count: 0,
487 storage_mode: StorageMode::Full,
488 metadata_only: false,
489 graph_schema: Some(schema),
490 embedding_dimension: embedding_dim,
491 pq_rescore_oversampling: Some(4),
492 hnsw_params: None,
493 #[cfg(feature = "persistence")]
494 deferred_indexing: None,
495 };
496 Self::create_from_config(path, config, None)
499 }
500
501 fn load_or_create_hnsw(
506 path: &std::path::Path,
507 config: &CollectionConfig,
508 ) -> Result<Arc<HnswIndex>> {
509 if path.join("hnsw.bin").exists() {
510 let idx = HnswIndex::load(path, config.dimension, config.metric)?;
511 Ok(Arc::new(idx))
512 } else if let Some(params) = config.hnsw_params {
513 Ok(Arc::new(HnswIndex::with_params(
514 config.dimension,
515 config.metric,
516 params,
517 )?))
518 } else {
519 Ok(Arc::new(HnswIndex::new(config.dimension, config.metric)?))
520 }
521 }
522
523 fn load_named_sparse_indexes(
544 path: &std::path::Path,
545 ) -> BTreeMap<String, crate::index::sparse::SparseInvertedIndex> {
546 let mut indexes = BTreeMap::new();
547
548 match crate::index::sparse::persistence::load_from_disk(path) {
550 Ok(Some(idx)) => {
551 indexes.insert(DEFAULT_SPARSE_INDEX_NAME.to_string(), idx);
552 }
553 Ok(None) => {}
554 Err(e) => {
555 tracing::warn!(
556 "Failed to load default sparse index from {:?}: {}. Skipping.",
557 path,
558 e
559 );
560 }
561 }
562
563 if let Ok(entries) = std::fs::read_dir(path) {
568 for entry in entries.flatten() {
569 let file_name = entry.file_name();
570 let name_str = file_name.to_string_lossy();
571 if let Some(sparse_name) = name_str
572 .strip_prefix("sparse-")
573 .and_then(|s| s.strip_suffix(".meta"))
574 {
575 let sparse_name = sparse_name.to_string();
576 match crate::index::sparse::persistence::load_named_from_disk(
577 path,
578 &sparse_name,
579 ) {
580 Ok(Some(idx)) => {
581 indexes.insert(sparse_name, idx);
582 }
583 Ok(None) => {}
584 Err(e) => {
585 tracing::warn!(
586 "Failed to load sparse index '{}' from {:?}: {}. Skipping.",
587 sparse_name,
588 path,
589 e
590 );
591 }
592 }
593 }
594 }
595 }
596
597 indexes
598 }
599
600 fn load_or_default<T>(
606 path: &std::path::Path,
607 file_name: &str,
608 load_fn: impl FnOnce(&std::path::Path) -> std::io::Result<T>,
609 default: impl FnOnce() -> T,
610 ) -> T {
611 let full_path = path.join(file_name);
612 if full_path.exists() {
613 match load_fn(&full_path) {
614 Ok(val) => return val,
615 Err(e) => tracing::warn!(
616 "Failed to load {} from {:?}: {}. Starting with empty default.",
617 file_name,
618 full_path,
619 e
620 ),
621 }
622 }
623 default()
624 }
625
/// Loads the edge store from `edge_store.bin` in the collection directory.
///
/// Falls back to `EdgeStore::new()` when the file is missing or fails to
/// load (see `load_or_default`).
fn load_edge_store(path: &std::path::Path) -> EdgeStore {
    Self::load_or_default(
        path,
        "edge_store.bin",
        EdgeStore::load_from_file,
        EdgeStore::new,
    )
}
634
/// Loads the property index from `property_index.bin` in the collection
/// directory.
///
/// Falls back to `PropertyIndex::new()` when the file is missing or fails to
/// load (see `load_or_default`).
fn load_property_index(path: &std::path::Path) -> PropertyIndex {
    Self::load_or_default(
        path,
        "property_index.bin",
        PropertyIndex::load_from_file,
        PropertyIndex::new,
    )
}
643
/// Loads the range index from `range_index.bin` in the collection directory.
///
/// Falls back to `RangeIndex::new()` when the file is missing or fails to
/// load (see `load_or_default`).
fn load_range_index(path: &std::path::Path) -> RangeIndex {
    Self::load_or_default(
        path,
        "range_index.bin",
        RangeIndex::load_from_file,
        RangeIndex::new,
    )
}
652
/// Returns a reference to the collection's shared guard rails.
#[must_use]
pub fn guard_rails(&self) -> &std::sync::Arc<crate::guardrails::GuardRails> {
    &self.guard_rails
}
658
/// Returns a clone of the current configuration, taken under the config
/// read lock.
#[must_use]
pub fn config(&self) -> CollectionConfig {
    self.config.read().clone()
}
664
/// Value that `inserts_since_last_hnsw_save` must exceed before
/// `save_hnsw_if_threshold_exceeded` writes the HNSW index to disk.
const HNSW_SAVE_THRESHOLD: u64 = 10_000;
670
671 pub fn flush(&self) -> Result<()> {
683 self.save_config()?;
684 self.vector_storage.write().flush()?;
688 self.payload_storage.write().flush()?;
689 self.drain_delta_into_index();
693 self.drain_deferred_into_index();
695 self.save_hnsw_if_threshold_exceeded()?;
698 self.flush_secondary_indexes()?;
699 self.flush_sparse_indexes()
700 }
701
702 pub fn flush_full(&self) -> Result<()> {
713 self.save_config()?;
714 self.vector_storage.write().flush()?;
715 self.payload_storage.write().flush()?;
716 self.drain_delta_into_index();
717 self.drain_deferred_into_index();
718 self.index.save(&self.path)?;
720 self.inserts_since_last_hnsw_save
721 .store(0, std::sync::atomic::Ordering::Relaxed);
722 self.flush_secondary_indexes()?;
723 self.flush_sparse_indexes()?;
724 self.vector_storage.read().flush_index()?;
726 Ok(())
727 }
728
729 fn save_hnsw_if_threshold_exceeded(&self) -> Result<()> {
734 let count = self
735 .inserts_since_last_hnsw_save
736 .load(std::sync::atomic::Ordering::Relaxed);
737 if count > Self::HNSW_SAVE_THRESHOLD {
738 self.index.save(&self.path)?;
739 self.inserts_since_last_hnsw_save
740 .store(0, std::sync::atomic::Ordering::Relaxed);
741 }
742 Ok(())
743 }
744
/// Deactivates the delta buffer and moves its contents into the HNSW index.
///
/// Only drained entries whose id still resolves in vector storage are
/// inserted. The storage read lock is released before the batch insert.
#[cfg(feature = "persistence")]
fn drain_delta_into_index(&self) {
    let drained = self.delta_buffer.deactivate_and_drain();
    if drained.is_empty() {
        return;
    }

    let valid: Vec<(u64, &[f32])> = {
        let storage = self.vector_storage.read();
        let kept = drained
            .iter()
            .filter_map(|(id, v)| {
                // Skip ids that storage no longer returns a vector for.
                storage
                    .retrieve(*id)
                    .ok()
                    .flatten()
                    .map(|_| (*id, v.as_slice()))
            })
            .collect();
        kept
    };

    if valid.is_empty() {
        return;
    }
    self.index.insert_batch_parallel(valid);
}
782
/// No-op variant compiled when the `persistence` feature is disabled.
#[cfg(not(feature = "persistence"))]
fn drain_delta_into_index(&self) {}
786
/// Drains the deferred indexer, if one is configured, into the HNSW index.
///
/// Only drained entries whose id still resolves in vector storage are
/// inserted. The storage read lock is released before the batch insert.
#[cfg(feature = "persistence")]
fn drain_deferred_into_index(&self) {
    let di = match self.deferred_indexer.as_ref() {
        Some(di) => di,
        None => return,
    };

    let drained = di.drain_all();
    if drained.is_empty() {
        return;
    }

    let valid: Vec<(u64, &[f32])> = {
        let storage = self.vector_storage.read();
        let kept = drained
            .iter()
            .filter_map(|(id, v)| {
                // Skip ids that storage no longer returns a vector for.
                storage
                    .retrieve(*id)
                    .ok()
                    .flatten()
                    .map(|_| (*id, v.as_slice()))
            })
            .collect();
        kept
    };

    if valid.is_empty() {
        return;
    }
    self.index.insert_batch_parallel(valid);
}
826
/// No-op variant compiled when the `persistence` feature is disabled.
#[cfg(not(feature = "persistence"))]
fn drain_deferred_into_index(&self) {}
830
831 fn flush_secondary_indexes(&self) -> Result<()> {
833 let property_index_path = self.path.join("property_index.bin");
834 self.property_index
835 .read()
836 .save_to_file(&property_index_path)?;
837
838 let range_index_path = self.path.join("range_index.bin");
839 self.range_index.read().save_to_file(&range_index_path)?;
840
841 if self.config.read().graph_schema.is_some() {
843 let edge_store_path = self.path.join("edge_store.bin");
844 self.edge_store.read().save_to_file(&edge_store_path)?;
845 }
846
847 Ok(())
848 }
849
850 fn flush_sparse_indexes(&self) -> Result<()> {
852 let indexes = self.sparse_indexes.read();
853 for (name, idx) in indexes.iter() {
854 crate::index::sparse::persistence::compact_named(&self.path, name, idx)?;
855 }
856 Ok(())
857 }
858
/// Returns the collection's directory path.
#[must_use]
pub(crate) fn data_path(&self) -> &std::path::Path {
    &self.path
}
864
/// Acquires the config write lock and returns its guard.
///
/// The lock is held until the returned guard is dropped.
pub(crate) fn config_write(
    &self,
) -> parking_lot::RwLockWriteGuard<'_, crate::collection::types::CollectionConfig> {
    self.config.write()
}
871
/// Acquires the write lock on the optional product quantizer and returns its
/// guard.
///
/// The lock is held until the returned guard is dropped.
pub(crate) fn pq_quantizer_write(
    &self,
) -> parking_lot::RwLockWriteGuard<'_, Option<crate::quantization::ProductQuantizer>> {
    self.pq_quantizer.write()
}
878
/// Acquires the read lock on the optional product quantizer and returns its
/// guard.
///
/// The lock is held until the returned guard is dropped.
pub(crate) fn pq_quantizer_read(
    &self,
) -> parking_lot::RwLockReadGuard<'_, Option<crate::quantization::ProductQuantizer>> {
    self.pq_quantizer.read()
}
885
886 pub(crate) fn save_config(&self) -> Result<()> {
890 use std::io::Write;
891
892 let config = self.config.read();
893 let config_path = self.path.join("config.json");
894 let tmp_path = self.path.join("config.json.tmp");
895 let config_data = serde_json::to_string_pretty(&*config)
896 .map_err(|e| Error::Serialization(e.to_string()))?;
897
898 let file = std::fs::File::create(&tmp_path)?;
899 let mut writer = std::io::BufWriter::new(file);
900 writer.write_all(config_data.as_bytes())?;
901 writer.flush()?;
902 writer.get_ref().sync_all()?;
903 std::fs::rename(&tmp_path, &config_path)?;
904 Ok(())
905 }
906}