velesdb_core/collection/core/lifecycle.rs
1//! Collection lifecycle methods (create, open, flush).
2
3use crate::collection::graph::{EdgeStore, PropertyIndex, RangeIndex};
4use crate::collection::types::{Collection, CollectionConfig, CollectionType};
5use crate::distance::DistanceMetric;
6use crate::error::{Error, Result};
7use crate::guardrails::GuardRails;
8use crate::index::{Bm25Index, HnswIndex};
9use crate::quantization::StorageMode;
10use crate::sparse_index::DEFAULT_SPARSE_INDEX_NAME;
11use crate::storage::{LogPayloadStorage, MmapStorage, PayloadStorage, VectorStorage};
12use crate::validation::validate_dimension;
13use crate::velesql::{QueryCache, QueryPlanner};
14
15use crate::index::sparse::SparseInvertedIndex;
16
17use std::collections::{BTreeMap, HashMap, VecDeque};
18
19use parking_lot::{Mutex, RwLock};
20use std::path::PathBuf;
21use std::sync::Arc;
22
23/// Pre-built components needed to assemble a [`Collection`].
24///
25/// Used by [`Collection::assemble`] as the single point of truth for the
26/// struct literal, eliminating duplication across the five public constructors.
27struct CollectionParts {
28 path: PathBuf,
29 config: CollectionConfig,
30 vector_storage: Arc<RwLock<MmapStorage>>,
31 payload_storage: Arc<RwLock<LogPayloadStorage>>,
32 index: Arc<HnswIndex>,
33 text_index: Arc<Bm25Index>,
34 property_index: PropertyIndex,
35 range_index: RangeIndex,
36 edge_store: EdgeStore,
37 sparse_indexes: BTreeMap<String, SparseInvertedIndex>,
38}
39
40impl CollectionParts {
41 /// Returns a new `CollectionParts` with empty graph and sparse indexes.
42 ///
43 /// The six storage/index fields must be supplied by the caller; only the
44 /// four optional index fields (`property_index`, `range_index`,
45 /// `edge_store`, `sparse_indexes`) default to empty.
46 fn new_with_empty_indexes(
47 path: PathBuf,
48 config: CollectionConfig,
49 vector_storage: Arc<RwLock<MmapStorage>>,
50 payload_storage: Arc<RwLock<LogPayloadStorage>>,
51 index: Arc<HnswIndex>,
52 text_index: Arc<Bm25Index>,
53 ) -> Self {
54 Self {
55 path,
56 config,
57 vector_storage,
58 payload_storage,
59 index,
60 text_index,
61 property_index: PropertyIndex::new(),
62 range_index: RangeIndex::new(),
63 edge_store: EdgeStore::new(),
64 sparse_indexes: BTreeMap::new(),
65 }
66 }
67}
68
69impl Collection {
70 /// Assembles a `Collection` from pre-built components and default caches.
71 ///
72 /// This is the single point of truth for the `Self { .. }` struct literal,
73 /// eliminating duplication across the five public constructors.
74 fn assemble(parts: CollectionParts) -> Self {
75 #[cfg(feature = "persistence")]
76 let deferred_indexer = Self::build_deferred_indexer(&parts.config);
77
78 Self {
79 path: parts.path,
80 config: Arc::new(RwLock::new(parts.config)),
81 vector_storage: parts.vector_storage,
82 payload_storage: parts.payload_storage,
83 index: parts.index,
84 text_index: parts.text_index,
85 sq8_cache: Arc::new(RwLock::new(HashMap::new())),
86 binary_cache: Arc::new(RwLock::new(HashMap::new())),
87 pq_cache: Arc::new(RwLock::new(HashMap::new())),
88 pq_quantizer: Arc::new(RwLock::new(None)),
89 pq_training_buffer: Arc::new(RwLock::new(VecDeque::new())),
90 property_index: Arc::new(RwLock::new(parts.property_index)),
91 range_index: Arc::new(RwLock::new(parts.range_index)),
92 edge_store: Arc::new(RwLock::new(parts.edge_store)),
93 sparse_indexes: Arc::new(RwLock::new(parts.sparse_indexes)),
94 secondary_indexes: Arc::new(RwLock::new(HashMap::new())),
95 guard_rails: Arc::new(GuardRails::default()),
96 query_planner: Arc::new(QueryPlanner::new()),
97 query_cache: Arc::new(QueryCache::new(256)),
98 cached_stats: Arc::new(Mutex::new(None)),
99 write_generation: Arc::new(std::sync::atomic::AtomicU64::new(0)),
100 #[cfg(feature = "persistence")]
101 stream_ingester: Arc::new(RwLock::new(None)),
102 #[cfg(feature = "persistence")]
103 delta_buffer: Arc::new(crate::collection::streaming::delta::DeltaBuffer::new()),
104 #[cfg(feature = "persistence")]
105 deferred_indexer,
106 }
107 }
108
109 /// Builds the optional `DeferredIndexer` from config.
110 ///
111 /// Returns `Some(Arc<DeferredIndexer>)` when `deferred_indexing` is
112 /// configured and enabled; `None` otherwise.
113 #[cfg(feature = "persistence")]
114 fn build_deferred_indexer(
115 config: &CollectionConfig,
116 ) -> Option<Arc<crate::collection::streaming::DeferredIndexer>> {
117 config
118 .deferred_indexing
119 .as_ref()
120 .filter(|cfg| cfg.enabled)
121 .map(|cfg| {
122 Arc::new(crate::collection::streaming::DeferredIndexer::new(
123 cfg.clone(),
124 ))
125 })
126 }
127
128 /// Initialises persistent storages and indexes for a new collection.
129 ///
130 /// Returns a complete `CollectionParts` with empty graph/sparse indexes,
131 /// ready to be passed to [`Self::assemble`].
132 fn init_collection_parts(
133 path: PathBuf,
134 config: CollectionConfig,
135 hnsw_params: Option<crate::index::hnsw::HnswParams>,
136 ) -> Result<CollectionParts> {
137 let vector_storage = Arc::new(RwLock::new(MmapStorage::new(&path, config.dimension)?));
138 let payload_storage = Arc::new(RwLock::new(LogPayloadStorage::new(&path)?));
139 let index = if let Some(params) = hnsw_params {
140 Arc::new(HnswIndex::with_params(
141 config.dimension,
142 config.metric,
143 params,
144 )?)
145 } else {
146 Arc::new(HnswIndex::new(config.dimension, config.metric)?)
147 };
148 let text_index = Arc::new(Bm25Index::new());
149 Ok(CollectionParts::new_with_empty_indexes(
150 path,
151 config,
152 vector_storage,
153 payload_storage,
154 index,
155 text_index,
156 ))
157 }
158
159 /// Rebuilds the BM25 full-text index from persisted payloads.
160 fn rebuild_bm25_index(
161 payload_storage: &Arc<RwLock<LogPayloadStorage>>,
162 text_index: &Arc<Bm25Index>,
163 ) {
164 let storage = payload_storage.read();
165 let ids = storage.ids();
166 for id in ids {
167 if let Ok(Some(payload)) = storage.retrieve(id) {
168 let text = Self::extract_text_from_payload(&payload);
169 if !text.is_empty() {
170 text_index.add_document(id, &text);
171 }
172 }
173 }
174 }
175
176 /// Creates a new collection at the specified path.
177 ///
178 /// # Errors
179 ///
180 /// Returns an error if the directory cannot be created or the config cannot be saved.
181 pub fn create(path: PathBuf, dimension: usize, metric: DistanceMetric) -> Result<Self> {
182 Self::create_with_options(path, dimension, metric, StorageMode::default())
183 }
184
185 /// Derives the collection name from the directory path.
186 fn name_from_path(path: &std::path::Path) -> String {
187 path.file_name()
188 .and_then(|n| n.to_str())
189 .unwrap_or("unknown")
190 .to_string()
191 }
192
193 /// Shared init-and-persist pipeline for all `create_*` constructors.
194 ///
195 /// Validates dimensions (when non-zero), creates the directory, assembles
196 /// the collection from the supplied config, and persists `config.json`.
197 fn create_from_config(
198 path: PathBuf,
199 config: CollectionConfig,
200 hnsw_params: Option<crate::index::hnsw::HnswParams>,
201 ) -> Result<Self> {
202 let skip_dimension_check = config.metadata_only
203 || (config.graph_schema.is_some() && config.embedding_dimension.is_none());
204 if skip_dimension_check {
205 // dimension=0 is valid for metadata-only and graph-without-embedding
206 } else {
207 validate_dimension(config.dimension)?;
208 }
209 std::fs::create_dir_all(&path)?;
210
211 let collection = Self::assemble(Self::init_collection_parts(path, config, hnsw_params)?);
212 collection.save_config()?;
213 Ok(collection)
214 }
215
216 /// Creates a new collection with custom storage options.
217 ///
218 /// # Arguments
219 ///
220 /// * `path` - Path to the collection directory
221 /// * `dimension` - Vector dimension
222 /// * `metric` - Distance metric
223 /// * `storage_mode` - Vector storage mode (Full, SQ8, Binary)
224 ///
225 /// # Errors
226 ///
227 /// Returns an error if the directory cannot be created or the config cannot be saved.
228 pub fn create_with_options(
229 path: PathBuf,
230 dimension: usize,
231 metric: DistanceMetric,
232 storage_mode: StorageMode,
233 ) -> Result<Self> {
234 let config = CollectionConfig {
235 name: Self::name_from_path(&path),
236 dimension,
237 metric,
238 point_count: 0,
239 storage_mode,
240 metadata_only: false,
241 graph_schema: None,
242 embedding_dimension: None,
243 pq_rescore_oversampling: Some(4),
244 hnsw_params: None,
245 #[cfg(feature = "persistence")]
246 deferred_indexing: None,
247 };
248 Self::create_from_config(path, config, None)
249 }
250
251 /// Creates a new collection with custom HNSW parameters.
252 ///
253 /// This is the lowest-level vector collection constructor, giving full
254 /// control over the HNSW graph topology (M, `ef_construction`) while
255 /// retaining the standard storage pipeline.
256 ///
257 /// # Arguments
258 ///
259 /// * `path` - Path to the collection directory
260 /// * `dimension` - Vector dimension
261 /// * `metric` - Distance metric
262 /// * `storage_mode` - Vector storage mode (Full, SQ8, Binary)
263 /// * `hnsw_params` - Custom HNSW index parameters
264 ///
265 /// # Errors
266 ///
267 /// Returns an error if the directory cannot be created or the config cannot be saved.
268 pub fn create_with_hnsw_params(
269 path: PathBuf,
270 dimension: usize,
271 metric: DistanceMetric,
272 storage_mode: StorageMode,
273 hnsw_params: crate::index::hnsw::HnswParams,
274 ) -> Result<Self> {
275 let config = CollectionConfig {
276 name: Self::name_from_path(&path),
277 dimension,
278 metric,
279 point_count: 0,
280 storage_mode,
281 metadata_only: false,
282 graph_schema: None,
283 embedding_dimension: None,
284 pq_rescore_oversampling: Some(4),
285 hnsw_params: Some(hnsw_params),
286 #[cfg(feature = "persistence")]
287 deferred_indexing: None,
288 };
289 Self::create_from_config(path, config, Some(hnsw_params))
290 }
291
292 /// Creates a new collection with a specific type (Vector or `MetadataOnly`).
293 ///
294 /// # Arguments
295 ///
296 /// * `path` - Path to the collection directory
297 /// * `name` - Name of the collection
298 /// * `collection_type` - Type of collection to create
299 ///
300 /// # Errors
301 ///
302 /// Returns an error if the directory cannot be created or the config cannot be saved.
303 pub fn create_typed(
304 path: PathBuf,
305 name: &str,
306 collection_type: &CollectionType,
307 ) -> Result<Self> {
308 match collection_type {
309 CollectionType::Vector {
310 dimension,
311 metric,
312 storage_mode,
313 } => Self::create_with_options(path, *dimension, *metric, *storage_mode),
314 CollectionType::MetadataOnly => Self::create_metadata_only(path, name),
315 CollectionType::Graph { .. } => {
316 // Graph collections will be implemented in EPIC-004
317 // For now, return an error indicating this is not yet supported
318 Err(crate::Error::GraphNotSupported(
319 "Graph collection creation not yet implemented".to_string(),
320 ))
321 }
322 }
323 }
324
325 /// Creates a new metadata-only collection (no vectors, no HNSW index).
326 ///
327 /// Metadata-only collections are optimized for storing reference data,
328 /// catalogs, and other non-vector data. They support CRUD operations
329 /// and `VelesQL` queries on payload, but NOT vector search.
330 ///
331 /// # Errors
332 ///
333 /// Returns an error if the directory cannot be created or the config cannot be saved.
334 pub fn create_metadata_only(path: PathBuf, name: &str) -> Result<Self> {
335 let config = CollectionConfig {
336 name: name.to_string(),
337 dimension: 0, // No vector dimension
338 metric: DistanceMetric::Cosine, // Default, not used
339 point_count: 0,
340 storage_mode: StorageMode::Full, // Default, not used
341 metadata_only: true,
342 graph_schema: None,
343 embedding_dimension: None,
344 pq_rescore_oversampling: Some(4),
345 hnsw_params: None,
346 #[cfg(feature = "persistence")]
347 deferred_indexing: None,
348 };
349 Self::create_from_config(path, config, None)
350 }
351
352 /// Returns true if this is a metadata-only collection.
353 #[must_use]
354 pub fn is_metadata_only(&self) -> bool {
355 self.config.read().metadata_only
356 }
357
358 /// Opens an existing collection from the specified path.
359 ///
360 /// # Errors
361 ///
362 /// Returns an error if the config file cannot be read or parsed.
363 ///
364 /// # INVARIANT(CACHE-01): write_generation starts at 0 on open
365 ///
366 /// Every call to `Collection::open` initialises `write_generation` to 0.
367 /// This is **safe** for cache correctness because:
368 ///
369 /// 1. The plan cache is **not persisted** across process restarts — it is
370 /// always empty when the database opens. There are therefore no stale
371 /// cached plans that could be incorrectly served.
372 ///
373 /// 2. `Database::load_collections` bumps `schema_version` after loading
374 /// at least one collection (C-3). Any plan key built before the load
375 /// would carry the pre-load `schema_version` and would miss the cache
376 /// even if the `write_generation` happened to match.
377 ///
378 /// 3. Within a single process lifetime the `write_generation` is only ever
379 /// incremented (never reset), so a cache key built with generation N
380 /// will never be reused once the generation advances past N.
381 pub fn open(path: PathBuf) -> Result<Self> {
382 let config_path = path.join("config.json");
383 let config_data = std::fs::read_to_string(&config_path)?;
384 let config: CollectionConfig =
385 serde_json::from_str(&config_data).map_err(|e| Error::Serialization(e.to_string()))?;
386
387 // Open persistent storages
388 let vector_storage = Arc::new(RwLock::new(MmapStorage::new(&path, config.dimension)?));
389 let payload_storage = Arc::new(RwLock::new(LogPayloadStorage::new(&path)?));
390
391 // Load HNSW index if it exists, otherwise create new (empty).
392 // When hnsw.bin is absent and config.hnsw_params is set, honour the
393 // persisted custom params so they survive collection reopen.
394 let index = Self::load_or_create_hnsw(&path, &config)?;
395 let text_index = Arc::new(Bm25Index::new());
396
397 // Rebuild BM25 index from persisted payloads
398 Self::rebuild_bm25_index(&payload_storage, &text_index);
399
400 // Load persisted graph/sparse indexes (EPIC-009, EPIC-062)
401 let property_index = Self::load_property_index(&path);
402 let range_index = Self::load_range_index(&path);
403 let edge_store = Self::load_edge_store(&path);
404 let sparse_indexes = Self::load_named_sparse_indexes(&path);
405
406 // Reconcile point_count from storage (config.json may be stale if
407 // the previous process exited without calling save_config).
408 let actual_count = if config.metadata_only {
409 payload_storage.read().ids().len()
410 } else {
411 vector_storage.read().len()
412 };
413 let mut config = config;
414 config.point_count = actual_count;
415
416 // Crash recovery: detect vectors in storage but not in HNSW (gap from
417 // crash during deferred merge, delta drain, or normal insert).
418 #[cfg(feature = "persistence")]
419 if !config.metadata_only && config.dimension > 0 {
420 let recovered =
421 super::recovery::recover_hnsw_gap(&vector_storage, &index, config.dimension)?;
422 if recovered > 0 {
423 tracing::info!(
424 collection = %config.name,
425 recovered,
426 "Collection gap recovery completed on open"
427 );
428 }
429 }
430
431 Ok(Self::assemble(CollectionParts {
432 path,
433 config,
434 vector_storage,
435 payload_storage,
436 index,
437 text_index,
438 property_index,
439 range_index,
440 edge_store,
441 sparse_indexes,
442 }))
443 }
444
445 /// Creates a new graph collection (with optional node embeddings).
446 ///
447 /// Persists `graph_schema` and `embedding_dimension` in `config.json`.
448 ///
449 /// # Errors
450 ///
451 /// Returns an error if the directory cannot be created or the config cannot be saved.
452 pub fn create_graph_collection(
453 path: PathBuf,
454 name: &str,
455 schema: crate::collection::graph::GraphSchema,
456 embedding_dim: Option<usize>,
457 metric: DistanceMetric,
458 ) -> Result<Self> {
459 let config = CollectionConfig {
460 name: name.to_string(),
461 dimension: embedding_dim.unwrap_or(0),
462 metric,
463 point_count: 0,
464 storage_mode: StorageMode::Full,
465 metadata_only: false,
466 graph_schema: Some(schema),
467 embedding_dimension: embedding_dim,
468 pq_rescore_oversampling: Some(4),
469 hnsw_params: None,
470 #[cfg(feature = "persistence")]
471 deferred_indexing: None,
472 };
473 // NOTE: create_from_config validates dimension only when > 0,
474 // so embedding_dim=None (dimension=0) skips validation correctly.
475 Self::create_from_config(path, config, None)
476 }
477
478 /// Loads the HNSW index from `hnsw.bin` or creates an empty one.
479 ///
480 /// When `hnsw.bin` is absent and `config.hnsw_params` is set, the
481 /// persisted custom params are honoured so they survive collection reopen.
482 fn load_or_create_hnsw(
483 path: &std::path::Path,
484 config: &CollectionConfig,
485 ) -> Result<Arc<HnswIndex>> {
486 if path.join("hnsw.bin").exists() {
487 let idx = HnswIndex::load(path, config.dimension, config.metric)?;
488 Ok(Arc::new(idx))
489 } else if let Some(params) = config.hnsw_params {
490 Ok(Arc::new(HnswIndex::with_params(
491 config.dimension,
492 config.metric,
493 params,
494 )?))
495 } else {
496 Ok(Arc::new(HnswIndex::new(config.dimension, config.metric)?))
497 }
498 }
499
500 /// Loads all named sparse indexes from disk.
501 ///
502 /// Scans for `sparse.meta` (default name `""`) and `sparse-{name}.meta` files.
503 /// Returns a `BTreeMap` keyed by sparse vector name.
504 ///
505 /// # Concurrency safety of `read_dir`
506 ///
507 /// The `read_dir` scan below is safe from race conditions for two reasons:
508 ///
509 /// 1. **Single-threaded open**: `Collection::open` (and therefore this
510 /// function) is always called from `Database::open`, which runs
511 /// single-threaded during startup. No concurrent writers exist at this
512 /// point.
513 ///
514 /// 2. **Atomic rename in compaction**: `compact_with_prefix` writes new
515 /// data to `{prefix}.*.tmp` staging files and only promotes them to
516 /// their final names via an atomic `rename(2)`. A `read_dir` scan
517 /// therefore never observes a partially-written `sparse-*.meta` file;
518 /// it either sees the complete previous version or the complete new
519 /// version — never a torn write.
520 fn load_named_sparse_indexes(
521 path: &std::path::Path,
522 ) -> BTreeMap<String, crate::index::sparse::SparseInvertedIndex> {
523 let mut indexes = BTreeMap::new();
524
525 // Load default (unprefixed) sparse index: sparse.meta / sparse.wal
526 match crate::index::sparse::persistence::load_from_disk(path) {
527 Ok(Some(idx)) => {
528 indexes.insert(DEFAULT_SPARSE_INDEX_NAME.to_string(), idx);
529 }
530 Ok(None) => {}
531 Err(e) => {
532 tracing::warn!(
533 "Failed to load default sparse index from {:?}: {}. Skipping.",
534 path,
535 e
536 );
537 }
538 }
539
540 // Scan for named sparse indexes: sparse-{name}.meta files.
541 // The `.meta` suffix is the sentinel for a fully compacted (committed)
542 // index file; stale `.tmp` artefacts from interrupted compactions are
543 // ignored because they do not match the `strip_suffix(".meta")` filter.
544 if let Ok(entries) = std::fs::read_dir(path) {
545 for entry in entries.flatten() {
546 let file_name = entry.file_name();
547 let name_str = file_name.to_string_lossy();
548 if let Some(sparse_name) = name_str
549 .strip_prefix("sparse-")
550 .and_then(|s| s.strip_suffix(".meta"))
551 {
552 let sparse_name = sparse_name.to_string();
553 match crate::index::sparse::persistence::load_named_from_disk(
554 path,
555 &sparse_name,
556 ) {
557 Ok(Some(idx)) => {
558 indexes.insert(sparse_name, idx);
559 }
560 Ok(None) => {}
561 Err(e) => {
562 tracing::warn!(
563 "Failed to load sparse index '{}' from {:?}: {}. Skipping.",
564 sparse_name,
565 path,
566 e
567 );
568 }
569 }
570 }
571 }
572 }
573
574 indexes
575 }
576
577 /// Loads a persisted index from disk, falling back to a default on missing
578 /// file or deserialization error.
579 ///
580 /// This is the single implementation for the load-or-default pattern shared
581 /// by `PropertyIndex`, `RangeIndex`, and `EdgeStore`.
582 fn load_or_default<T>(
583 path: &std::path::Path,
584 file_name: &str,
585 load_fn: impl FnOnce(&std::path::Path) -> std::io::Result<T>,
586 default: impl FnOnce() -> T,
587 ) -> T {
588 let full_path = path.join(file_name);
589 if full_path.exists() {
590 match load_fn(&full_path) {
591 Ok(val) => return val,
592 Err(e) => tracing::warn!(
593 "Failed to load {} from {:?}: {}. Starting with empty default.",
594 file_name,
595 full_path,
596 e
597 ),
598 }
599 }
600 default()
601 }
602
603 fn load_edge_store(path: &std::path::Path) -> EdgeStore {
604 Self::load_or_default(
605 path,
606 "edge_store.bin",
607 EdgeStore::load_from_file,
608 EdgeStore::new,
609 )
610 }
611
612 fn load_property_index(path: &std::path::Path) -> PropertyIndex {
613 Self::load_or_default(
614 path,
615 "property_index.bin",
616 PropertyIndex::load_from_file,
617 PropertyIndex::new,
618 )
619 }
620
621 fn load_range_index(path: &std::path::Path) -> RangeIndex {
622 Self::load_or_default(
623 path,
624 "range_index.bin",
625 RangeIndex::load_from_file,
626 RangeIndex::new,
627 )
628 }
629
630 /// Returns a reference to the collection's guard rails.
631 #[must_use]
632 pub fn guard_rails(&self) -> &std::sync::Arc<crate::guardrails::GuardRails> {
633 &self.guard_rails
634 }
635
636 /// Returns the collection configuration.
637 #[must_use]
638 pub fn config(&self) -> CollectionConfig {
639 self.config.read().clone()
640 }
641
642 /// Saves the collection configuration and index to disk.
643 ///
644 /// If the delta buffer is active (HNSW rebuild in progress), drains
645 /// buffered vectors into the HNSW index before persisting it. This
646 /// ensures graceful shutdown does not lose vectors that were accepted
647 /// during the rebuild window.
648 ///
649 /// # Errors
650 ///
651 /// Returns an error if storage operations fail.
652 pub fn flush(&self) -> Result<()> {
653 self.save_config()?;
654 self.vector_storage.write().flush()?;
655 self.payload_storage.write().flush()?;
656 // Drain delta buffer into HNSW before persisting the index.
657 // Lock order: delta_buffer(10) is acquired after vector_storage(2)
658 // and payload_storage(3) — both already released above.
659 self.drain_delta_into_index();
660 // Drain deferred indexer into HNSW (position 11, after delta at 10).
661 self.drain_deferred_into_index();
662 self.index.save(&self.path)?;
663 self.flush_secondary_indexes()?;
664 self.flush_sparse_indexes()
665 }
666
667 /// Drains the delta buffer into the HNSW index (if active).
668 ///
669 /// No-op when the delta buffer is inactive (no rebuild in progress).
670 /// After draining, the buffer is empty and inactive.
671 ///
672 /// Filters out IDs that have been deleted from vector storage since they
673 /// were buffered, preventing ghost vectors from being re-inserted into
674 /// HNSW after a concurrent delete.
675 ///
676 /// Uses `insert_batch_parallel` for consistent batch insert performance
677 /// (same strategy as `merge_deferred_batch` in crud.rs).
678 ///
679 /// # Lock ordering
680 ///
681 /// Acquires `vector_storage` (position 2) briefly for the validity
682 /// check, releases it, then inserts into the index (no lock).
683 /// `delta_buffer` (position 10) is acquired first via `deactivate_and_drain`.
684 /// The caller must NOT hold any lower-numbered lock when calling this method.
685 #[cfg(feature = "persistence")]
686 fn drain_delta_into_index(&self) {
687 let drained = self.delta_buffer.deactivate_and_drain();
688 if drained.is_empty() {
689 return;
690 }
691 // Filter out vectors deleted from storage during the buffer's
692 // lifetime to prevent ghost re-insertion into HNSW.
693 let storage = self.vector_storage.read();
694 let valid: Vec<(u64, &[f32])> = drained
695 .iter()
696 .filter(|(id, _)| storage.retrieve(*id).ok().flatten().is_some())
697 .map(|(id, v)| (*id, v.as_slice()))
698 .collect();
699 drop(storage); // Release read lock before batch insert
700 if !valid.is_empty() {
701 self.index.insert_batch_parallel(valid);
702 }
703 }
704
705 /// No-op stub when persistence is disabled.
706 #[cfg(not(feature = "persistence"))]
707 fn drain_delta_into_index(&self) {}
708
709 /// Drains the deferred indexer into the HNSW index (if configured).
710 ///
711 /// No-op when deferred indexing is not configured or disabled.
712 /// After draining, both buffers are empty and inactive.
713 ///
714 /// Filters out IDs that have been deleted from vector storage since they
715 /// were buffered, preventing ghost vectors from being re-inserted into
716 /// HNSW after a concurrent delete.
717 ///
718 /// Uses `insert_batch_parallel` for consistent batch insert performance
719 /// (same strategy as `merge_deferred_batch` in crud.rs).
720 ///
721 /// # Lock ordering
722 ///
723 /// Acquires `vector_storage` (position 2) briefly for the validity
724 /// check, releases it, then inserts into the index (no lock).
725 /// `deferred_indexer` (position 11) is acquired first via `drain_all`.
726 /// The caller must NOT hold any lower-numbered lock.
727 #[cfg(feature = "persistence")]
728 fn drain_deferred_into_index(&self) {
729 if let Some(ref di) = self.deferred_indexer {
730 let drained = di.drain_all();
731 if drained.is_empty() {
732 return;
733 }
734 // Filter out vectors deleted from storage during the buffer's
735 // lifetime to prevent ghost re-insertion into HNSW.
736 let storage = self.vector_storage.read();
737 let valid: Vec<(u64, &[f32])> = drained
738 .iter()
739 .filter(|(id, _)| storage.retrieve(*id).ok().flatten().is_some())
740 .map(|(id, v)| (*id, v.as_slice()))
741 .collect();
742 drop(storage); // Release read lock before batch insert
743 if !valid.is_empty() {
744 self.index.insert_batch_parallel(valid);
745 }
746 }
747 }
748
749 /// No-op stub when persistence is disabled.
750 #[cfg(not(feature = "persistence"))]
751 fn drain_deferred_into_index(&self) {}
752
753 /// Persists property index, range index, and edge store (EPIC-009 US-005).
754 fn flush_secondary_indexes(&self) -> Result<()> {
755 let property_index_path = self.path.join("property_index.bin");
756 self.property_index
757 .read()
758 .save_to_file(&property_index_path)?;
759
760 let range_index_path = self.path.join("range_index.bin");
761 self.range_index.read().save_to_file(&range_index_path)?;
762
763 // Save EdgeStore for graph collections (BUG-1: was never persisted)
764 if self.config.read().graph_schema.is_some() {
765 let edge_store_path = self.path.join("edge_store.bin");
766 self.edge_store.read().save_to_file(&edge_store_path)?;
767 }
768
769 Ok(())
770 }
771
772 /// Compacts all named sparse indexes to disk (EPIC-062 / SPARSE-04).
773 fn flush_sparse_indexes(&self) -> Result<()> {
774 let indexes = self.sparse_indexes.read();
775 for (name, idx) in indexes.iter() {
776 crate::index::sparse::persistence::compact_named(&self.path, name, idx)?;
777 }
778 Ok(())
779 }
780
781 /// Returns a reference to the collection's data path.
782 #[must_use]
783 pub(crate) fn data_path(&self) -> &std::path::Path {
784 &self.path
785 }
786
787 /// Returns a write guard on the collection config for mutation.
788 pub(crate) fn config_write(
789 &self,
790 ) -> parking_lot::RwLockWriteGuard<'_, crate::collection::types::CollectionConfig> {
791 self.config.write()
792 }
793
794 /// Returns a write guard on the PQ quantizer slot.
795 pub(crate) fn pq_quantizer_write(
796 &self,
797 ) -> parking_lot::RwLockWriteGuard<'_, Option<crate::quantization::ProductQuantizer>> {
798 self.pq_quantizer.write()
799 }
800
801 /// Returns a read guard on the PQ quantizer slot.
802 pub(crate) fn pq_quantizer_read(
803 &self,
804 ) -> parking_lot::RwLockReadGuard<'_, Option<crate::quantization::ProductQuantizer>> {
805 self.pq_quantizer.read()
806 }
807
808 /// Saves the collection configuration to disk.
809 ///
810 /// Uses atomic write-tmp-fsync-rename to prevent torn writes on crash.
811 pub(crate) fn save_config(&self) -> Result<()> {
812 use std::io::Write;
813
814 let config = self.config.read();
815 let config_path = self.path.join("config.json");
816 let tmp_path = self.path.join("config.json.tmp");
817 let config_data = serde_json::to_string_pretty(&*config)
818 .map_err(|e| Error::Serialization(e.to_string()))?;
819
820 let file = std::fs::File::create(&tmp_path)?;
821 let mut writer = std::io::BufWriter::new(file);
822 writer.write_all(config_data.as_bytes())?;
823 writer.flush()?;
824 writer.get_ref().sync_all()?;
825 std::fs::rename(&tmp_path, &config_path)?;
826 Ok(())
827 }
828}