velesdb_core/collection/core/lifecycle.rs
1//! Collection lifecycle methods (create, open, flush).
2
3use crate::collection::graph::{EdgeStore, PropertyIndex, RangeIndex};
4use crate::collection::types::{Collection, CollectionConfig, CollectionType};
5use crate::distance::DistanceMetric;
6use crate::error::{Error, Result};
7use crate::guardrails::GuardRails;
8use crate::index::{Bm25Index, HnswIndex};
9use crate::quantization::StorageMode;
10use crate::sparse_index::DEFAULT_SPARSE_INDEX_NAME;
11use crate::storage::{LogPayloadStorage, MmapStorage, PayloadStorage, VectorStorage};
12use crate::validation::validate_dimension;
13use crate::velesql::{QueryCache, QueryPlanner};
14
15use crate::index::sparse::SparseInvertedIndex;
16
17use std::collections::{BTreeMap, HashMap, VecDeque};
18
19use parking_lot::{Mutex, RwLock};
20use std::path::PathBuf;
21use std::sync::Arc;
22
23/// Pre-built components needed to assemble a [`Collection`].
24///
25/// Used by [`Collection::assemble`] as the single point of truth for the
26/// struct literal, eliminating duplication across the five public constructors.
27struct CollectionParts {
28 path: PathBuf,
29 config: CollectionConfig,
30 vector_storage: Arc<RwLock<MmapStorage>>,
31 payload_storage: Arc<RwLock<LogPayloadStorage>>,
32 index: Arc<HnswIndex>,
33 text_index: Arc<Bm25Index>,
34 property_index: PropertyIndex,
35 range_index: RangeIndex,
36 edge_store: EdgeStore,
37 sparse_indexes: BTreeMap<String, SparseInvertedIndex>,
38}
39
40impl CollectionParts {
41 /// Returns a new `CollectionParts` with empty graph and sparse indexes.
42 ///
43 /// The six storage/index fields must be supplied by the caller; only the
44 /// four optional index fields (`property_index`, `range_index`,
45 /// `edge_store`, `sparse_indexes`) default to empty.
46 fn new_with_empty_indexes(
47 path: PathBuf,
48 config: CollectionConfig,
49 vector_storage: Arc<RwLock<MmapStorage>>,
50 payload_storage: Arc<RwLock<LogPayloadStorage>>,
51 index: Arc<HnswIndex>,
52 text_index: Arc<Bm25Index>,
53 ) -> Self {
54 Self {
55 path,
56 config,
57 vector_storage,
58 payload_storage,
59 index,
60 text_index,
61 property_index: PropertyIndex::new(),
62 range_index: RangeIndex::new(),
63 edge_store: EdgeStore::new(),
64 sparse_indexes: BTreeMap::new(),
65 }
66 }
67}
68
69impl Collection {
70 /// Assembles a `Collection` from pre-built components and default caches.
71 ///
72 /// This is the single point of truth for the `Self { .. }` struct literal,
73 /// eliminating duplication across the five public constructors.
74 fn assemble(parts: CollectionParts) -> Self {
75 #[cfg(feature = "persistence")]
76 let deferred_indexer = Self::build_deferred_indexer(&parts.config);
77
78 Self {
79 path: parts.path,
80 config: Arc::new(RwLock::new(parts.config)),
81 vector_storage: parts.vector_storage,
82 payload_storage: parts.payload_storage,
83 index: parts.index,
84 text_index: parts.text_index,
85 sq8_cache: Arc::new(RwLock::new(HashMap::new())),
86 binary_cache: Arc::new(RwLock::new(HashMap::new())),
87 pq_cache: Arc::new(RwLock::new(HashMap::new())),
88 pq_quantizer: Arc::new(RwLock::new(None)),
89 pq_training_buffer: Arc::new(RwLock::new(VecDeque::new())),
90 property_index: Arc::new(RwLock::new(parts.property_index)),
91 range_index: Arc::new(RwLock::new(parts.range_index)),
92 edge_store: Arc::new(RwLock::new(parts.edge_store)),
93 sparse_indexes: Arc::new(RwLock::new(parts.sparse_indexes)),
94 secondary_indexes: Arc::new(RwLock::new(HashMap::new())),
95 guard_rails: Arc::new(GuardRails::default()),
96 query_planner: Arc::new(QueryPlanner::new()),
97 query_cache: Arc::new(QueryCache::new(256)),
98 cached_stats: Arc::new(Mutex::new(None)),
99 write_generation: Arc::new(std::sync::atomic::AtomicU64::new(0)),
100 #[cfg(feature = "persistence")]
101 stream_ingester: Arc::new(RwLock::new(None)),
102 #[cfg(feature = "persistence")]
103 delta_buffer: Arc::new(crate::collection::streaming::delta::DeltaBuffer::new()),
104 #[cfg(feature = "persistence")]
105 deferred_indexer,
106 }
107 }
108
109 /// Builds the optional `DeferredIndexer` from config.
110 ///
111 /// Returns `Some(Arc<DeferredIndexer>)` when `deferred_indexing` is
112 /// configured and enabled; `None` otherwise.
113 #[cfg(feature = "persistence")]
114 fn build_deferred_indexer(
115 config: &CollectionConfig,
116 ) -> Option<Arc<crate::collection::streaming::DeferredIndexer>> {
117 config
118 .deferred_indexing
119 .as_ref()
120 .filter(|cfg| cfg.enabled)
121 .map(|cfg| {
122 Arc::new(crate::collection::streaming::DeferredIndexer::new(
123 cfg.clone(),
124 ))
125 })
126 }
127
128 /// Initialises persistent storages and indexes for a new collection.
129 ///
130 /// Returns a complete `CollectionParts` with empty graph/sparse indexes,
131 /// ready to be passed to [`Self::assemble`].
132 fn init_collection_parts(
133 path: PathBuf,
134 config: CollectionConfig,
135 hnsw_params: Option<crate::index::hnsw::HnswParams>,
136 ) -> Result<CollectionParts> {
137 let vector_storage = Arc::new(RwLock::new(MmapStorage::new(&path, config.dimension)?));
138 let payload_storage = Arc::new(RwLock::new(LogPayloadStorage::new(&path)?));
139 let index = if let Some(params) = hnsw_params {
140 Arc::new(HnswIndex::with_params(
141 config.dimension,
142 config.metric,
143 params,
144 )?)
145 } else {
146 Arc::new(HnswIndex::new(config.dimension, config.metric)?)
147 };
148 let text_index = Arc::new(Bm25Index::new());
149 Ok(CollectionParts::new_with_empty_indexes(
150 path,
151 config,
152 vector_storage,
153 payload_storage,
154 index,
155 text_index,
156 ))
157 }
158
159 /// Rebuilds the BM25 full-text index from persisted payloads.
160 fn rebuild_bm25_index(
161 payload_storage: &Arc<RwLock<LogPayloadStorage>>,
162 text_index: &Arc<Bm25Index>,
163 ) {
164 let storage = payload_storage.read();
165 let ids = storage.ids();
166 for id in ids {
167 if let Ok(Some(payload)) = storage.retrieve(id) {
168 let text = Self::extract_text_from_payload(&payload);
169 if !text.is_empty() {
170 text_index.add_document(id, &text);
171 }
172 }
173 }
174 }
175
176 /// Creates a new collection at the specified path.
177 ///
178 /// # Errors
179 ///
180 /// Returns an error if the directory cannot be created or the config cannot be saved.
181 pub fn create(path: PathBuf, dimension: usize, metric: DistanceMetric) -> Result<Self> {
182 Self::create_with_options(path, dimension, metric, StorageMode::default())
183 }
184
185 /// Derives the collection name from the directory path.
186 fn name_from_path(path: &std::path::Path) -> String {
187 path.file_name()
188 .and_then(|n| n.to_str())
189 .unwrap_or("unknown")
190 .to_string()
191 }
192
193 /// Shared init-and-persist pipeline for all `create_*` constructors.
194 ///
195 /// Validates dimensions (when non-zero), creates the directory, assembles
196 /// the collection from the supplied config, and persists `config.json`.
197 fn create_from_config(
198 path: PathBuf,
199 config: CollectionConfig,
200 hnsw_params: Option<crate::index::hnsw::HnswParams>,
201 ) -> Result<Self> {
202 let skip_dimension_check = config.metadata_only
203 || (config.graph_schema.is_some() && config.embedding_dimension.is_none());
204 if skip_dimension_check {
205 // dimension=0 is valid for metadata-only and graph-without-embedding
206 } else {
207 validate_dimension(config.dimension)?;
208 }
209 std::fs::create_dir_all(&path)?;
210
211 let collection = Self::assemble(Self::init_collection_parts(path, config, hnsw_params)?);
212 collection.save_config()?;
213 Ok(collection)
214 }
215
216 /// Creates a new collection with custom storage options.
217 ///
218 /// # Arguments
219 ///
220 /// * `path` - Path to the collection directory
221 /// * `dimension` - Vector dimension
222 /// * `metric` - Distance metric
223 /// * `storage_mode` - Vector storage mode (Full, SQ8, Binary)
224 ///
225 /// # Errors
226 ///
227 /// Returns an error if the directory cannot be created or the config cannot be saved.
228 pub fn create_with_options(
229 path: PathBuf,
230 dimension: usize,
231 metric: DistanceMetric,
232 storage_mode: StorageMode,
233 ) -> Result<Self> {
234 let config = CollectionConfig {
235 name: Self::name_from_path(&path),
236 dimension,
237 metric,
238 point_count: 0,
239 storage_mode,
240 metadata_only: false,
241 graph_schema: None,
242 embedding_dimension: None,
243 pq_rescore_oversampling: Some(4),
244 hnsw_params: None,
245 #[cfg(feature = "persistence")]
246 deferred_indexing: None,
247 };
248 Self::create_from_config(path, config, None)
249 }
250
251 /// Creates a new collection with custom HNSW parameters.
252 ///
253 /// This is the lowest-level vector collection constructor, giving full
254 /// control over the HNSW graph topology (M, `ef_construction`) while
255 /// retaining the standard storage pipeline.
256 ///
257 /// # Arguments
258 ///
259 /// * `path` - Path to the collection directory
260 /// * `dimension` - Vector dimension
261 /// * `metric` - Distance metric
262 /// * `storage_mode` - Vector storage mode (Full, SQ8, Binary)
263 /// * `hnsw_params` - Custom HNSW index parameters
264 ///
265 /// # Errors
266 ///
267 /// Returns an error if the directory cannot be created or the config cannot be saved.
268 pub fn create_with_hnsw_params(
269 path: PathBuf,
270 dimension: usize,
271 metric: DistanceMetric,
272 storage_mode: StorageMode,
273 hnsw_params: crate::index::hnsw::HnswParams,
274 ) -> Result<Self> {
275 let config = CollectionConfig {
276 name: Self::name_from_path(&path),
277 dimension,
278 metric,
279 point_count: 0,
280 storage_mode,
281 metadata_only: false,
282 graph_schema: None,
283 embedding_dimension: None,
284 pq_rescore_oversampling: Some(4),
285 hnsw_params: Some(hnsw_params),
286 #[cfg(feature = "persistence")]
287 deferred_indexing: None,
288 };
289 Self::create_from_config(path, config, Some(hnsw_params))
290 }
291
292 /// Creates a new collection with a specific type (Vector or `MetadataOnly`).
293 ///
294 /// # Arguments
295 ///
296 /// * `path` - Path to the collection directory
297 /// * `name` - Name of the collection
298 /// * `collection_type` - Type of collection to create
299 ///
300 /// # Errors
301 ///
302 /// Returns an error if the directory cannot be created or the config cannot be saved.
303 pub fn create_typed(
304 path: PathBuf,
305 name: &str,
306 collection_type: &CollectionType,
307 ) -> Result<Self> {
308 match collection_type {
309 CollectionType::Vector {
310 dimension,
311 metric,
312 storage_mode,
313 } => Self::create_with_options(path, *dimension, *metric, *storage_mode),
314 CollectionType::MetadataOnly => Self::create_metadata_only(path, name),
315 CollectionType::Graph { .. } => {
316 // Graph collections will be implemented in EPIC-004
317 // For now, return an error indicating this is not yet supported
318 Err(crate::Error::GraphNotSupported(
319 "Graph collection creation not yet implemented".to_string(),
320 ))
321 }
322 }
323 }
324
325 /// Creates a new metadata-only collection (no vectors, no HNSW index).
326 ///
327 /// Metadata-only collections are optimized for storing reference data,
328 /// catalogs, and other non-vector data. They support CRUD operations
329 /// and `VelesQL` queries on payload, but NOT vector search.
330 ///
331 /// # Errors
332 ///
333 /// Returns an error if the directory cannot be created or the config cannot be saved.
334 pub fn create_metadata_only(path: PathBuf, name: &str) -> Result<Self> {
335 let config = CollectionConfig {
336 name: name.to_string(),
337 dimension: 0, // No vector dimension
338 metric: DistanceMetric::Cosine, // Default, not used
339 point_count: 0,
340 storage_mode: StorageMode::Full, // Default, not used
341 metadata_only: true,
342 graph_schema: None,
343 embedding_dimension: None,
344 pq_rescore_oversampling: Some(4),
345 hnsw_params: None,
346 #[cfg(feature = "persistence")]
347 deferred_indexing: None,
348 };
349 Self::create_from_config(path, config, None)
350 }
351
352 /// Returns true if this is a metadata-only collection.
353 #[must_use]
354 pub fn is_metadata_only(&self) -> bool {
355 self.config.read().metadata_only
356 }
357
358 /// Opens an existing collection from the specified path.
359 ///
360 /// # Errors
361 ///
362 /// Returns an error if the config file cannot be read or parsed.
363 ///
364 /// # INVARIANT(CACHE-01): write_generation starts at 0 on open
365 ///
366 /// Every call to `Collection::open` initialises `write_generation` to 0.
367 /// This is **safe** for cache correctness because:
368 ///
369 /// 1. The plan cache is **not persisted** across process restarts — it is
370 /// always empty when the database opens. There are therefore no stale
371 /// cached plans that could be incorrectly served.
372 ///
373 /// 2. `Database::load_collections` bumps `schema_version` after loading
374 /// at least one collection (C-3). Any plan key built before the load
375 /// would carry the pre-load `schema_version` and would miss the cache
376 /// even if the `write_generation` happened to match.
377 ///
378 /// 3. Within a single process lifetime the `write_generation` is only ever
379 /// incremented (never reset), so a cache key built with generation N
380 /// will never be reused once the generation advances past N.
381 pub fn open(path: PathBuf) -> Result<Self> {
382 let config_path = path.join("config.json");
383 let config_data = std::fs::read_to_string(&config_path)?;
384 let config: CollectionConfig =
385 serde_json::from_str(&config_data).map_err(|e| Error::Serialization(e.to_string()))?;
386
387 // Open persistent storages
388 let vector_storage = Arc::new(RwLock::new(MmapStorage::new(&path, config.dimension)?));
389 let payload_storage = Arc::new(RwLock::new(LogPayloadStorage::new(&path)?));
390
391 // Load HNSW index if it exists, otherwise create new (empty).
392 // When hnsw.bin is absent and config.hnsw_params is set, honour the
393 // persisted custom params so they survive collection reopen.
394 let index = Self::load_or_create_hnsw(&path, &config)?;
395 let text_index = Arc::new(Bm25Index::new());
396
397 // Rebuild BM25 index from persisted payloads
398 Self::rebuild_bm25_index(&payload_storage, &text_index);
399
400 // Load persisted graph/sparse indexes (EPIC-009, EPIC-062)
401 let property_index = Self::load_property_index(&path);
402 let range_index = Self::load_range_index(&path);
403 let edge_store = Self::load_edge_store(&path);
404 let sparse_indexes = Self::load_named_sparse_indexes(&path);
405
406 // Reconcile point_count from storage (config.json may be stale if
407 // the previous process exited without calling save_config).
408 let actual_count = if config.metadata_only {
409 payload_storage.read().ids().len()
410 } else {
411 vector_storage.read().len()
412 };
413 let mut config = config;
414 config.point_count = actual_count;
415
416 // TODO #370: implement crash recovery gap detection for deferred indexer.
417 // After loading HNSW and vector storage, detect vectors in storage but
418 // not in HNSW (gap vectors from a crash during deferred merge) and
419 // re-index them. This requires comparing storage IDs against HNSW IDs
420 // which may be expensive for large collections.
421
422 Ok(Self::assemble(CollectionParts {
423 path,
424 config,
425 vector_storage,
426 payload_storage,
427 index,
428 text_index,
429 property_index,
430 range_index,
431 edge_store,
432 sparse_indexes,
433 }))
434 }
435
436 /// Creates a new graph collection (with optional node embeddings).
437 ///
438 /// Persists `graph_schema` and `embedding_dimension` in `config.json`.
439 ///
440 /// # Errors
441 ///
442 /// Returns an error if the directory cannot be created or the config cannot be saved.
443 pub fn create_graph_collection(
444 path: PathBuf,
445 name: &str,
446 schema: crate::collection::graph::GraphSchema,
447 embedding_dim: Option<usize>,
448 metric: DistanceMetric,
449 ) -> Result<Self> {
450 let config = CollectionConfig {
451 name: name.to_string(),
452 dimension: embedding_dim.unwrap_or(0),
453 metric,
454 point_count: 0,
455 storage_mode: StorageMode::Full,
456 metadata_only: false,
457 graph_schema: Some(schema),
458 embedding_dimension: embedding_dim,
459 pq_rescore_oversampling: Some(4),
460 hnsw_params: None,
461 #[cfg(feature = "persistence")]
462 deferred_indexing: None,
463 };
464 // NOTE: create_from_config validates dimension only when > 0,
465 // so embedding_dim=None (dimension=0) skips validation correctly.
466 Self::create_from_config(path, config, None)
467 }
468
469 /// Loads the HNSW index from `hnsw.bin` or creates an empty one.
470 ///
471 /// When `hnsw.bin` is absent and `config.hnsw_params` is set, the
472 /// persisted custom params are honoured so they survive collection reopen.
473 fn load_or_create_hnsw(
474 path: &std::path::Path,
475 config: &CollectionConfig,
476 ) -> Result<Arc<HnswIndex>> {
477 if path.join("hnsw.bin").exists() {
478 let idx = HnswIndex::load(path, config.dimension, config.metric)?;
479 Ok(Arc::new(idx))
480 } else if let Some(params) = config.hnsw_params {
481 Ok(Arc::new(HnswIndex::with_params(
482 config.dimension,
483 config.metric,
484 params,
485 )?))
486 } else {
487 Ok(Arc::new(HnswIndex::new(config.dimension, config.metric)?))
488 }
489 }
490
491 /// Loads all named sparse indexes from disk.
492 ///
493 /// Scans for `sparse.meta` (default name `""`) and `sparse-{name}.meta` files.
494 /// Returns a `BTreeMap` keyed by sparse vector name.
495 ///
496 /// # Concurrency safety of `read_dir`
497 ///
498 /// The `read_dir` scan below is safe from race conditions for two reasons:
499 ///
500 /// 1. **Single-threaded open**: `Collection::open` (and therefore this
501 /// function) is always called from `Database::open`, which runs
502 /// single-threaded during startup. No concurrent writers exist at this
503 /// point.
504 ///
505 /// 2. **Atomic rename in compaction**: `compact_with_prefix` writes new
506 /// data to `{prefix}.*.tmp` staging files and only promotes them to
507 /// their final names via an atomic `rename(2)`. A `read_dir` scan
508 /// therefore never observes a partially-written `sparse-*.meta` file;
509 /// it either sees the complete previous version or the complete new
510 /// version — never a torn write.
511 fn load_named_sparse_indexes(
512 path: &std::path::Path,
513 ) -> BTreeMap<String, crate::index::sparse::SparseInvertedIndex> {
514 let mut indexes = BTreeMap::new();
515
516 // Load default (unprefixed) sparse index: sparse.meta / sparse.wal
517 match crate::index::sparse::persistence::load_from_disk(path) {
518 Ok(Some(idx)) => {
519 indexes.insert(DEFAULT_SPARSE_INDEX_NAME.to_string(), idx);
520 }
521 Ok(None) => {}
522 Err(e) => {
523 tracing::warn!(
524 "Failed to load default sparse index from {:?}: {}. Skipping.",
525 path,
526 e
527 );
528 }
529 }
530
531 // Scan for named sparse indexes: sparse-{name}.meta files.
532 // The `.meta` suffix is the sentinel for a fully compacted (committed)
533 // index file; stale `.tmp` artefacts from interrupted compactions are
534 // ignored because they do not match the `strip_suffix(".meta")` filter.
535 if let Ok(entries) = std::fs::read_dir(path) {
536 for entry in entries.flatten() {
537 let file_name = entry.file_name();
538 let name_str = file_name.to_string_lossy();
539 if let Some(sparse_name) = name_str
540 .strip_prefix("sparse-")
541 .and_then(|s| s.strip_suffix(".meta"))
542 {
543 let sparse_name = sparse_name.to_string();
544 match crate::index::sparse::persistence::load_named_from_disk(
545 path,
546 &sparse_name,
547 ) {
548 Ok(Some(idx)) => {
549 indexes.insert(sparse_name, idx);
550 }
551 Ok(None) => {}
552 Err(e) => {
553 tracing::warn!(
554 "Failed to load sparse index '{}' from {:?}: {}. Skipping.",
555 sparse_name,
556 path,
557 e
558 );
559 }
560 }
561 }
562 }
563 }
564
565 indexes
566 }
567
568 /// Loads a persisted index from disk, falling back to a default on missing
569 /// file or deserialization error.
570 ///
571 /// This is the single implementation for the load-or-default pattern shared
572 /// by `PropertyIndex`, `RangeIndex`, and `EdgeStore`.
573 fn load_or_default<T>(
574 path: &std::path::Path,
575 file_name: &str,
576 load_fn: impl FnOnce(&std::path::Path) -> std::io::Result<T>,
577 default: impl FnOnce() -> T,
578 ) -> T {
579 let full_path = path.join(file_name);
580 if full_path.exists() {
581 match load_fn(&full_path) {
582 Ok(val) => return val,
583 Err(e) => tracing::warn!(
584 "Failed to load {} from {:?}: {}. Starting with empty default.",
585 file_name,
586 full_path,
587 e
588 ),
589 }
590 }
591 default()
592 }
593
594 fn load_edge_store(path: &std::path::Path) -> EdgeStore {
595 Self::load_or_default(
596 path,
597 "edge_store.bin",
598 EdgeStore::load_from_file,
599 EdgeStore::new,
600 )
601 }
602
603 fn load_property_index(path: &std::path::Path) -> PropertyIndex {
604 Self::load_or_default(
605 path,
606 "property_index.bin",
607 PropertyIndex::load_from_file,
608 PropertyIndex::new,
609 )
610 }
611
612 fn load_range_index(path: &std::path::Path) -> RangeIndex {
613 Self::load_or_default(
614 path,
615 "range_index.bin",
616 RangeIndex::load_from_file,
617 RangeIndex::new,
618 )
619 }
620
621 /// Returns a reference to the collection's guard rails.
622 #[must_use]
623 pub fn guard_rails(&self) -> &std::sync::Arc<crate::guardrails::GuardRails> {
624 &self.guard_rails
625 }
626
627 /// Returns the collection configuration.
628 #[must_use]
629 pub fn config(&self) -> CollectionConfig {
630 self.config.read().clone()
631 }
632
633 /// Saves the collection configuration and index to disk.
634 ///
635 /// If the delta buffer is active (HNSW rebuild in progress), drains
636 /// buffered vectors into the HNSW index before persisting it. This
637 /// ensures graceful shutdown does not lose vectors that were accepted
638 /// during the rebuild window.
639 ///
640 /// # Errors
641 ///
642 /// Returns an error if storage operations fail.
643 pub fn flush(&self) -> Result<()> {
644 self.save_config()?;
645 self.vector_storage.write().flush()?;
646 self.payload_storage.write().flush()?;
647 // Drain delta buffer into HNSW before persisting the index.
648 // Lock order: delta_buffer(10) is acquired after vector_storage(2)
649 // and payload_storage(3) — both already released above.
650 self.drain_delta_into_index();
651 // Drain deferred indexer into HNSW (position 11, after delta at 10).
652 self.drain_deferred_into_index();
653 self.index.save(&self.path)?;
654 self.flush_secondary_indexes()?;
655 self.flush_sparse_indexes()
656 }
657
658 /// Drains the delta buffer into the HNSW index (if active).
659 ///
660 /// No-op when the delta buffer is inactive (no rebuild in progress).
661 /// After draining, the buffer is empty and inactive.
662 ///
663 /// Filters out IDs that have been deleted from vector storage since they
664 /// were buffered, preventing ghost vectors from being re-inserted into
665 /// HNSW after a concurrent delete.
666 ///
667 /// Uses `insert_batch_parallel` for consistent batch insert performance
668 /// (same strategy as `merge_deferred_batch` in crud.rs).
669 ///
670 /// # Lock ordering
671 ///
672 /// Acquires `vector_storage` (position 2) briefly for the validity
673 /// check, releases it, then inserts into the index (no lock).
674 /// `delta_buffer` (position 10) is acquired first via `deactivate_and_drain`.
675 /// The caller must NOT hold any lower-numbered lock when calling this method.
676 #[cfg(feature = "persistence")]
677 fn drain_delta_into_index(&self) {
678 let drained = self.delta_buffer.deactivate_and_drain();
679 if drained.is_empty() {
680 return;
681 }
682 // Filter out vectors deleted from storage during the buffer's
683 // lifetime to prevent ghost re-insertion into HNSW.
684 let storage = self.vector_storage.read();
685 let valid: Vec<(u64, &[f32])> = drained
686 .iter()
687 .filter(|(id, _)| storage.retrieve(*id).ok().flatten().is_some())
688 .map(|(id, v)| (*id, v.as_slice()))
689 .collect();
690 drop(storage); // Release read lock before batch insert
691 if !valid.is_empty() {
692 self.index.insert_batch_parallel(valid);
693 }
694 }
695
696 /// No-op stub when persistence is disabled.
697 #[cfg(not(feature = "persistence"))]
698 fn drain_delta_into_index(&self) {}
699
700 /// Drains the deferred indexer into the HNSW index (if configured).
701 ///
702 /// No-op when deferred indexing is not configured or disabled.
703 /// After draining, both buffers are empty and inactive.
704 ///
705 /// Filters out IDs that have been deleted from vector storage since they
706 /// were buffered, preventing ghost vectors from being re-inserted into
707 /// HNSW after a concurrent delete.
708 ///
709 /// Uses `insert_batch_parallel` for consistent batch insert performance
710 /// (same strategy as `merge_deferred_batch` in crud.rs).
711 ///
712 /// # Lock ordering
713 ///
714 /// Acquires `vector_storage` (position 2) briefly for the validity
715 /// check, releases it, then inserts into the index (no lock).
716 /// `deferred_indexer` (position 11) is acquired first via `drain_all`.
717 /// The caller must NOT hold any lower-numbered lock.
718 #[cfg(feature = "persistence")]
719 fn drain_deferred_into_index(&self) {
720 if let Some(ref di) = self.deferred_indexer {
721 let drained = di.drain_all();
722 if drained.is_empty() {
723 return;
724 }
725 // Filter out vectors deleted from storage during the buffer's
726 // lifetime to prevent ghost re-insertion into HNSW.
727 let storage = self.vector_storage.read();
728 let valid: Vec<(u64, &[f32])> = drained
729 .iter()
730 .filter(|(id, _)| storage.retrieve(*id).ok().flatten().is_some())
731 .map(|(id, v)| (*id, v.as_slice()))
732 .collect();
733 drop(storage); // Release read lock before batch insert
734 if !valid.is_empty() {
735 self.index.insert_batch_parallel(valid);
736 }
737 }
738 }
739
740 /// No-op stub when persistence is disabled.
741 #[cfg(not(feature = "persistence"))]
742 fn drain_deferred_into_index(&self) {}
743
744 /// Persists property index, range index, and edge store (EPIC-009 US-005).
745 fn flush_secondary_indexes(&self) -> Result<()> {
746 let property_index_path = self.path.join("property_index.bin");
747 self.property_index
748 .read()
749 .save_to_file(&property_index_path)?;
750
751 let range_index_path = self.path.join("range_index.bin");
752 self.range_index.read().save_to_file(&range_index_path)?;
753
754 // Save EdgeStore for graph collections (BUG-1: was never persisted)
755 if self.config.read().graph_schema.is_some() {
756 let edge_store_path = self.path.join("edge_store.bin");
757 self.edge_store.read().save_to_file(&edge_store_path)?;
758 }
759
760 Ok(())
761 }
762
763 /// Compacts all named sparse indexes to disk (EPIC-062 / SPARSE-04).
764 fn flush_sparse_indexes(&self) -> Result<()> {
765 let indexes = self.sparse_indexes.read();
766 for (name, idx) in indexes.iter() {
767 crate::index::sparse::persistence::compact_named(&self.path, name, idx)?;
768 }
769 Ok(())
770 }
771
772 /// Returns a reference to the collection's data path.
773 #[must_use]
774 pub(crate) fn data_path(&self) -> &std::path::Path {
775 &self.path
776 }
777
778 /// Returns a write guard on the collection config for mutation.
779 pub(crate) fn config_write(
780 &self,
781 ) -> parking_lot::RwLockWriteGuard<'_, crate::collection::types::CollectionConfig> {
782 self.config.write()
783 }
784
785 /// Returns a write guard on the PQ quantizer slot.
786 pub(crate) fn pq_quantizer_write(
787 &self,
788 ) -> parking_lot::RwLockWriteGuard<'_, Option<crate::quantization::ProductQuantizer>> {
789 self.pq_quantizer.write()
790 }
791
792 /// Returns a read guard on the PQ quantizer slot.
793 pub(crate) fn pq_quantizer_read(
794 &self,
795 ) -> parking_lot::RwLockReadGuard<'_, Option<crate::quantization::ProductQuantizer>> {
796 self.pq_quantizer.read()
797 }
798
799 /// Saves the collection configuration to disk.
800 ///
801 /// Uses atomic write-tmp-fsync-rename to prevent torn writes on crash.
802 pub(crate) fn save_config(&self) -> Result<()> {
803 use std::io::Write;
804
805 let config = self.config.read();
806 let config_path = self.path.join("config.json");
807 let tmp_path = self.path.join("config.json.tmp");
808 let config_data = serde_json::to_string_pretty(&*config)
809 .map_err(|e| Error::Serialization(e.to_string()))?;
810
811 let file = std::fs::File::create(&tmp_path)?;
812 let mut writer = std::io::BufWriter::new(file);
813 writer.write_all(config_data.as_bytes())?;
814 writer.flush()?;
815 writer.get_ref().sync_all()?;
816 std::fs::rename(&tmp_path, &config_path)?;
817 Ok(())
818 }
819}