// velesdb_core/database.rs
1//! Database facade and orchestration layer for collection lifecycle and query routing.
2
3use crate::collection::{GraphCollection, MetadataCollection, VectorCollection};
4use crate::observer::DatabaseObserver;
5use crate::simd_dispatch;
6use crate::{
7    Collection, CollectionType, ColumnStore, DistanceMetric, Error, Result, SearchResult,
8    StorageMode,
9};
10
11/// Database instance managing collections and storage.
12///
13/// # Lifecycle
14///
15/// `Database::open()` automatically loads all previously created collections from disk.
16/// There is no need to call `load_collections()` separately.
17///
18/// # Extension (Premium)
19///
20/// Use [`Database::open_with_observer`] to inject a [`DatabaseObserver`] implementation
21/// from `velesdb-premium` without modifying this crate.
#[cfg(feature = "persistence")]
pub struct Database {
    /// Path to the data directory; each collection lives in a subdirectory named after it.
    data_dir: std::path::PathBuf,
    /// Legacy registry (Collection god-object) — kept for backward compatibility during migration.
    /// Typed registries below share the same inner `Arc`-backed instances (zero copy).
    collections: parking_lot::RwLock<std::collections::HashMap<String, Collection>>,
    /// New registry: vector collections.
    vector_colls: parking_lot::RwLock<std::collections::HashMap<String, VectorCollection>>,
    /// New registry: graph collections.
    graph_colls: parking_lot::RwLock<std::collections::HashMap<String, GraphCollection>>,
    /// New registry: metadata-only collections.
    metadata_colls: parking_lot::RwLock<std::collections::HashMap<String, MetadataCollection>>,
    /// Cached collection statistics for CBO planning.
    /// Populated by `analyze_collection` and lazily from disk by `get_collection_stats`.
    collection_stats: parking_lot::RwLock<
        std::collections::HashMap<String, crate::collection::stats::CollectionStats>,
    >,
    /// Optional lifecycle observer (used by velesdb-premium for RBAC, audit, multi-tenant).
    /// `None` in the open-core configuration; all hooks are then no-ops.
    observer: Option<std::sync::Arc<dyn DatabaseObserver>>,
    /// Monotonic DDL schema version counter (CACHE-01).
    ///
    /// Incremented on every create/drop collection operation.
    /// Used by `CompiledPlanCache` to invalidate cached query plans.
    schema_version: std::sync::atomic::AtomicU64,
    /// Compiled query plan cache (CACHE-02).
    ///
    /// Stores recently compiled `QueryPlan` instances keyed by `PlanKey`.
    /// Default sizing: L1 = 1K hot entries, L2 = 10K LRU entries.
    compiled_plan_cache: crate::cache::CompiledPlanCache,
}
51
52#[cfg(feature = "persistence")]
53impl Database {
54    /// Ensures a collection name is free in memory and on disk.
55    ///
56    /// This prevents re-creating over a skipped/corrupted on-disk collection
57    /// that was not loaded into registries.
58    fn ensure_collection_name_available(&self, name: &str) -> Result<()> {
59        let exists_in_registry = self.collections.read().contains_key(name)
60            || self.vector_colls.read().contains_key(name)
61            || self.graph_colls.read().contains_key(name)
62            || self.metadata_colls.read().contains_key(name);
63        if exists_in_registry {
64            return Err(Error::CollectionExists(name.to_string()));
65        }
66
67        let collection_path = self.data_dir.join(name);
68        if collection_path.exists() {
69            return Err(Error::CollectionExists(name.to_string()));
70        }
71
72        Ok(())
73    }
74
75    /// Opens or creates a database, **automatically loading all existing collections**.
76    ///
77    /// This replaces the previous `open()` + `load_collections()` two-step pattern.
78    /// The new `open()` is a strict auto-load: all `config.json` directories under
79    /// `path` are loaded on startup.
80    ///
81    /// # Errors
82    ///
83    /// Returns an error if the directory cannot be created or accessed.
84    pub fn open<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
85        Self::open_impl(path, None)
86    }
87
88    /// Opens a database with a [`DatabaseObserver`] (used by velesdb-premium).
89    ///
90    /// The observer receives lifecycle hooks for every collection operation,
91    /// enabling RBAC, audit logging, multi-tenant routing, etc.
92    ///
93    /// # Errors
94    ///
95    /// Returns an error if the directory cannot be created or accessed.
96    pub fn open_with_observer<P: AsRef<std::path::Path>>(
97        path: P,
98        observer: std::sync::Arc<dyn DatabaseObserver>,
99    ) -> Result<Self> {
100        Self::open_impl(path, Some(observer))
101    }
102
103    fn open_impl<P: AsRef<std::path::Path>>(
104        path: P,
105        observer: Option<std::sync::Arc<dyn DatabaseObserver>>,
106    ) -> Result<Self> {
107        let data_dir = path.as_ref().to_path_buf();
108        std::fs::create_dir_all(&data_dir)?;
109
110        // Log SIMD features detected at startup
111        let features = simd_dispatch::simd_features_info();
112        tracing::info!(
113            avx512 = features.avx512f,
114            avx2 = features.avx2,
115            "SIMD features detected - direct dispatch enabled"
116        );
117
118        let db = Self {
119            data_dir,
120            collections: parking_lot::RwLock::new(std::collections::HashMap::new()),
121            vector_colls: parking_lot::RwLock::new(std::collections::HashMap::new()),
122            graph_colls: parking_lot::RwLock::new(std::collections::HashMap::new()),
123            metadata_colls: parking_lot::RwLock::new(std::collections::HashMap::new()),
124            collection_stats: parking_lot::RwLock::new(std::collections::HashMap::new()),
125            observer,
126            schema_version: std::sync::atomic::AtomicU64::new(0),
127            compiled_plan_cache: crate::cache::CompiledPlanCache::new(1_000, 10_000),
128        };
129
130        // Auto-load all existing collections from disk (replaces manual load_collections()).
131        db.load_collections()?;
132
133        Ok(db)
134    }
135
136    /// Creates a new collection with the specified parameters.
137    ///
138    /// # Arguments
139    ///
140    /// * `name` - Unique name for the collection
141    /// * `dimension` - Vector dimension (e.g., 768 for many embedding models)
142    /// * `metric` - Distance metric to use for similarity calculations
143    ///
144    /// # Errors
145    ///
146    /// Returns an error if a collection with the same name already exists.
147    pub fn create_collection(
148        &self,
149        name: &str,
150        dimension: usize,
151        metric: DistanceMetric,
152    ) -> Result<()> {
153        self.create_collection_with_options(name, dimension, metric, StorageMode::default())
154    }
155
156    /// Creates a new collection with custom storage options.
157    ///
158    /// # Arguments
159    ///
160    /// * `name` - Unique name for the collection
161    /// * `dimension` - Vector dimension
162    /// * `metric` - Distance metric
163    /// * `storage_mode` - Vector storage mode (Full, SQ8, Binary)
164    ///
165    /// # Errors
166    ///
167    /// Returns an error if a collection with the same name already exists.
168    pub fn create_collection_with_options(
169        &self,
170        name: &str,
171        dimension: usize,
172        metric: DistanceMetric,
173        storage_mode: StorageMode,
174    ) -> Result<()> {
175        self.ensure_collection_name_available(name)?;
176
177        let collection_path = self.data_dir.join(name);
178        let coll =
179            VectorCollection::create(collection_path, name, dimension, metric, storage_mode)?;
180        // Keep legacy and typed registries in sync (same Arc<> — zero copy).
181        self.collections
182            .write()
183            .insert(name.to_string(), coll.inner.clone());
184        self.vector_colls.write().insert(name.to_string(), coll);
185
186        // Bump schema version (CACHE-01 DDL invalidation).
187        self.schema_version
188            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
189
190        Ok(())
191    }
192
193    /// Returns the path to the data directory.
194    #[must_use]
195    pub fn data_dir(&self) -> &std::path::Path {
196        &self.data_dir
197    }
198
199    /// Returns the current DDL schema version counter.
200    #[must_use]
201    pub fn schema_version(&self) -> u64 {
202        self.schema_version
203            .load(std::sync::atomic::Ordering::Relaxed)
204    }
205
206    /// Returns a reference to the compiled query plan cache.
207    #[must_use]
208    pub fn plan_cache(&self) -> &crate::cache::CompiledPlanCache {
209        &self.compiled_plan_cache
210    }
211
212    /// Returns the write generation for a named collection, if it exists.
213    ///
214    /// Checks the legacy registry first (covers all collection types).
215    #[must_use]
216    pub fn collection_write_generation(&self, name: &str) -> Option<u64> {
217        self.collections
218            .read()
219            .get(name)
220            .map(crate::Collection::write_generation)
221    }
222
223    /// Gets a reference to a collection by name.
224    ///
225    /// # Arguments
226    ///
227    /// * `name` - Name of the collection
228    ///
229    /// # Returns
230    ///
231    /// Returns `None` if the collection does not exist.
232    pub fn get_collection(&self, name: &str) -> Option<Collection> {
233        self.collections.read().get(name).cloned()
234    }
235
236    /// Analyzes a collection, caches stats, and persists them to disk.
237    ///
238    /// # Errors
239    ///
240    /// Returns an error if the collection does not exist, analysis fails, or
241    /// stats cannot be serialized and written to disk.
242    pub fn analyze_collection(
243        &self,
244        name: &str,
245    ) -> Result<crate::collection::stats::CollectionStats> {
246        let collection = self
247            .get_collection(name)
248            .ok_or_else(|| Error::CollectionNotFound(name.to_string()))?;
249        let stats = collection.analyze()?;
250
251        self.collection_stats
252            .write()
253            .insert(name.to_string(), stats.clone());
254
255        let stats_path = self.data_dir.join(name).join("collection.stats.json");
256        let serialized = serde_json::to_vec_pretty(&stats)
257            .map_err(|e| Error::Serialization(format!("failed to serialize stats: {e}")))?;
258        std::fs::write(&stats_path, serialized)?;
259
260        Ok(stats)
261    }
262
263    /// Returns cached statistics when available, loading from disk if present.
264    ///
265    /// # Errors
266    ///
267    /// Returns an error if the on-disk stats file exists but cannot be read or
268    /// deserialized.
269    pub fn get_collection_stats(
270        &self,
271        name: &str,
272    ) -> Result<Option<crate::collection::stats::CollectionStats>> {
273        if let Some(stats) = self.collection_stats.read().get(name).cloned() {
274            return Ok(Some(stats));
275        }
276
277        let stats_path = self.data_dir.join(name).join("collection.stats.json");
278        if !stats_path.exists() {
279            return Ok(None);
280        }
281
282        let bytes = std::fs::read(stats_path)?;
283        let stats: crate::collection::stats::CollectionStats = serde_json::from_slice(&bytes)
284            .map_err(|e| Error::Serialization(format!("failed to parse stats: {e}")))?;
285        self.collection_stats
286            .write()
287            .insert(name.to_string(), stats.clone());
288        Ok(Some(stats))
289    }
290
291    /// Produces a canonical JSON string for a `serde_json::Value`.
292    ///
293    /// Recursively sorts the keys of every JSON object so that two values
294    /// representing the same logical structure always produce identical bytes,
295    /// regardless of the `HashMap` iteration order used during serialization.
296    ///
297    /// This is required because `FusionConfig::params` and
298    /// `TrainStatement::params` are `HashMap`-backed; `serde_json` serialises
299    /// them in hash-order, which is non-deterministic across invocations.
300    fn canonical_json(value: serde_json::Value) -> serde_json::Value {
301        match value {
302            serde_json::Value::Object(map) => {
303                // Without the `preserve_order` feature flag, `serde_json::Map` is already
304                // backed by `BTreeMap` and therefore already sorted. This explicit sort
305                // step is kept as defense-in-depth: if `preserve_order` is ever enabled
306                // in `Cargo.toml` (which switches the backing store to `IndexMap` and
307                // preserves insertion order), the canonical key ordering is still upheld
308                // without any change to this function.
309                let sorted: serde_json::Map<String, serde_json::Value> = map
310                    .into_iter()
311                    .map(|(k, v)| (k, Self::canonical_json(v)))
312                    .collect::<std::collections::BTreeMap<_, _>>()
313                    .into_iter()
314                    .collect();
315                serde_json::Value::Object(sorted)
316            }
317            serde_json::Value::Array(arr) => {
318                serde_json::Value::Array(arr.into_iter().map(Self::canonical_json).collect())
319            }
320            other => other,
321        }
322    }
323
324    /// Builds a deterministic cache key for a query (CACHE-02).
325    ///
326    /// Serialises the query to canonical JSON (object keys sorted recursively),
327    /// reads the current `schema_version`, and gathers per-collection
328    /// `write_generation` counters (sorted by collection name) to form a
329    /// `PlanKey`.
330    ///
331    /// # Why canonical JSON instead of `Debug`
332    ///
333    /// `format!("{query:?}")` is non-deterministic when the `Query` AST
334    /// contains `HashMap`-backed fields (`FusionConfig::params`,
335    /// `TrainStatement::params`) because `HashMap` iteration order is not
336    /// guaranteed across invocations. Canonical JSON with sorted object keys
337    /// is stable and produces the same byte sequence for logically identical
338    /// queries.
339    #[must_use]
340    pub fn build_plan_key(&self, query: &crate::velesql::Query) -> crate::cache::PlanKey {
341        use std::hash::{BuildHasher, Hasher};
342
343        // Serialise via serde_json, then canonicalise (sort object keys) before hashing.
344        // Fallback to Debug representation if serialization fails (should never happen in
345        // practice since all Query fields are Serialize, but erring on the side of liveness).
346        let query_text = serde_json::to_value(query)
347            .map(Self::canonical_json)
348            .and_then(|v| serde_json::to_string(&v))
349            .unwrap_or_else(|_| format!("{query:?}"));
350
351        let mut hasher = rustc_hash::FxBuildHasher.build_hasher();
352        hasher.write(query_text.as_bytes());
353        let query_hash = hasher.finish();
354
355        let schema_version = self.schema_version();
356
357        // Gather referenced collection names (base + join targets), sort them.
358        let mut collection_names = vec![query.select.from.clone()];
359        for join in &query.select.joins {
360            collection_names.push(join.table.clone());
361        }
362        collection_names.sort();
363        collection_names.dedup();
364
365        // Build generations vector in sorted collection order.
366        let collection_generations: smallvec::SmallVec<[u64; 4]> = collection_names
367            .iter()
368            .map(|name| self.collection_write_generation(name).unwrap_or(0))
369            .collect();
370
371        crate::cache::PlanKey {
372            query_hash,
373            schema_version,
374            collection_generations,
375        }
376    }
377
378    /// Returns the query plan for a query, with cache status populated (CACHE-02).
379    ///
380    /// If the plan is cached, returns it with `cache_hit: Some(true)` and
381    /// `plan_reuse_count` set. Otherwise generates a fresh plan with
382    /// `cache_hit: Some(false)`.
383    ///
384    /// # Design decision: `explain_query` does not populate the cache
385    ///
386    /// `explain_query` intentionally does **not** insert a new plan into the
387    /// compiled plan cache. EXPLAIN is a diagnostic operation; allowing it to
388    /// influence cache state would make cache metrics (hit/miss ratios,
389    /// `plan_reuse_count`) unreliable because EXPLAIN calls would be
390    /// indistinguishable from real execution hits. Only `execute_query` is
391    /// authorised to write to the cache.
392    ///
393    /// # Errors
394    ///
395    /// Returns an error if the query is invalid.
396    pub fn explain_query(
397        &self,
398        query: &crate::velesql::Query,
399    ) -> Result<crate::velesql::QueryPlan> {
400        crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;
401
402        let plan_key = self.build_plan_key(query);
403
404        if let Some(cached) = self.compiled_plan_cache.get(&plan_key) {
405            let mut plan = cached.plan.clone();
406            plan.cache_hit = Some(true);
407            plan.plan_reuse_count = Some(
408                cached
409                    .reuse_count
410                    .load(std::sync::atomic::Ordering::Relaxed),
411            );
412            return Ok(plan);
413        }
414
415        let mut plan = crate::velesql::QueryPlan::from_select(&query.select);
416        plan.cache_hit = Some(false);
417        plan.plan_reuse_count = Some(0);
418        Ok(plan)
419    }
420
    /// Executes a `VelesQL` query with database-level JOIN resolution.
    ///
    /// This method resolves JOIN target collections from the database registry
    /// and executes JOIN runtime in sequence. Query plans are cached and
    /// reused for identical queries against unchanged collections (CACHE-02).
    ///
    /// TRAIN and DML statements bypass plan caching entirely and are routed to
    /// dedicated executors; top-level MATCH queries are rejected here.
    ///
    /// # Errors
    ///
    /// Returns an error if the base collection or any JOIN collection is missing.
    pub fn execute_query(
        &self,
        query: &crate::velesql::Query,
        params: &std::collections::HashMap<String, serde_json::Value>,
    ) -> Result<Vec<SearchResult>> {
        crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;

        // Statement-type dispatch: TRAIN and DML short-circuit before any cache work.
        if let Some(train) = query.train.as_ref() {
            return self.execute_train(train);
        }

        if let Some(dml) = query.dml.as_ref() {
            return self.execute_dml(dml, params);
        }

        if query.is_match_query() {
            return Err(Error::Query(
                "Database::execute_query does not support top-level MATCH queries. Use Collection::execute_query or pass the collection name."
                    .to_string(),
            ));
        }

        // Build plan key and check cache WITHOUT recording hit/miss metrics (CACHE-02).
        //
        // `contains()` is used instead of `get().is_some()` so that this
        // existence check does not increment the hit/miss counters or
        // `reuse_count`. Only `explain_query` (which surfaces these values to
        // callers) should call `get()`.
        let pre_exec_key = self.build_plan_key(query);
        let is_cached = self.compiled_plan_cache.contains(&pre_exec_key);

        let base_name = query.select.from.clone();
        // Priority: collections registry first (contains live instances for both legacy
        // create_collection and new create_vector_collection via shared inner Arc<>).
        // Fallback to opening from disk via get_vector_collection for any missed case.
        let base_collection = self
            .get_collection(&base_name)
            .or_else(|| self.get_vector_collection(&base_name).map(|vc| vc.inner))
            .ok_or_else(|| Error::CollectionNotFound(base_name.clone()))?;

        let results = if query.select.joins.is_empty() {
            base_collection.execute_query(query, params)?
        } else {
            // JOIN path: run the base SELECT with joins stripped, then fold each
            // JOIN over the running result set in declaration order.
            let mut base_query = query.clone();
            base_query.select.joins.clear();

            let mut results = base_collection.execute_query(&base_query, params)?;
            for join in &query.select.joins {
                // Resolve each JOIN target with the same registry-then-disk priority
                // as the base collection above.
                let join_collection = self
                    .get_collection(&join.table)
                    .or_else(|| self.get_vector_collection(&join.table).map(|vc| vc.inner))
                    .ok_or_else(|| Error::CollectionNotFound(join.table.clone()))?;
                let column_store = Self::build_join_column_store(&join_collection)?;
                let joined = crate::collection::search::query::join::execute_join(
                    &results,
                    join,
                    &column_store,
                )?;
                results = crate::collection::search::query::join::joined_to_search_results(joined);
            }
            results
        };

        // Populate cache on miss (CACHE-02).
        //
        // C-1 TOCTOU fix: rebuild the plan key AFTER execution. Between the
        // pre-execution `contains()` check and here, a concurrent writer may
        // have bumped a collection's `write_generation` (e.g. via `upsert` on
        // another thread). Rebuilding the key captures the post-execution
        // state, so the cached plan is associated with the generation that was
        // live when the plan was actually compiled — not a potentially stale
        // pre-execution snapshot.
        if !is_cached {
            // Same sort+dedup ordering as build_plan_key, so the cached entry's
            // referenced_collections align with the key's generation vector.
            let mut collection_names = vec![query.select.from.clone()];
            for join in &query.select.joins {
                collection_names.push(join.table.clone());
            }
            collection_names.sort();
            collection_names.dedup();

            let compiled = std::sync::Arc::new(crate::cache::CompiledPlan {
                plan: crate::velesql::QueryPlan::from_select(&query.select),
                referenced_collections: collection_names,
                compiled_at: std::time::Instant::now(),
                reuse_count: std::sync::atomic::AtomicU64::new(0),
            });
            // Rebuild key after execution to reflect current write_generation (C-1).
            let post_exec_key = self.build_plan_key(query);
            self.compiled_plan_cache.insert(post_exec_key, compiled);
        }

        Ok(results)
    }
523
524    /// Lists all collection names in the database.
525    ///
526    /// Includes collections created via any typed API (vector, graph, metadata).
527    pub fn list_collections(&self) -> Vec<String> {
528        // BUG-7: acquire all locks together for a consistent point-in-time snapshot.
529        let collections = self.collections.read();
530        let vector_colls = self.vector_colls.read();
531        let graph_colls = self.graph_colls.read();
532        let metadata_colls = self.metadata_colls.read();
533
534        let mut names: std::collections::HashSet<String> = collections.keys().cloned().collect();
535        for k in vector_colls.keys() {
536            names.insert(k.clone());
537        }
538        for k in graph_colls.keys() {
539            names.insert(k.clone());
540        }
541        for k in metadata_colls.keys() {
542            names.insert(k.clone());
543        }
544        let mut result: Vec<String> = names.into_iter().collect();
545        result.sort();
546        result
547    }
548
549    /// Deletes a collection by name.
550    ///
551    /// # Arguments
552    ///
553    /// * `name` - Name of the collection to delete
554    ///
555    /// # Errors
556    ///
557    /// Returns an error if the collection does not exist in any registry.
558    ///
559    /// # Concurrency
560    ///
561    /// The existence check and the registry removals are not a single atomic
562    /// operation. Under concurrent deletion of the same collection, at most one
563    /// caller receives `CollectionNotFound`; subsequent callers perform no-op
564    /// `remove()` calls which are safe and idempotent. `remove_dir_all` is
565    /// guarded by an `exists()` check and is therefore also safe.
566    pub fn delete_collection(&self, name: &str) -> Result<()> {
567        // Check existence across all registries before taking any write lock.
568        let exists = self.collections.read().contains_key(name)
569            || self.vector_colls.read().contains_key(name)
570            || self.graph_colls.read().contains_key(name)
571            || self.metadata_colls.read().contains_key(name);
572
573        if !exists {
574            return Err(Error::CollectionNotFound(name.to_string()));
575        }
576
577        // Remove from all registries and delete directory.
578        // Remove directory BEFORE purging registries so that, on failure, the
579        // in-memory state is still consistent (collection remains accessible).
580        let collection_path = self.data_dir.join(name);
581        if collection_path.exists() {
582            std::fs::remove_dir_all(&collection_path)?;
583        }
584
585        // Directory is gone (or never existed) — now purge registries (BUG-6).
586        self.collections.write().remove(name);
587        self.vector_colls.write().remove(name);
588        self.graph_colls.write().remove(name);
589        self.metadata_colls.write().remove(name);
590        self.collection_stats.write().remove(name);
591
592        if let Some(ref obs) = self.observer {
593            obs.on_collection_deleted(name);
594        }
595
596        // Bump schema version (CACHE-01 DDL invalidation).
597        self.schema_version
598            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
599
600        Ok(())
601    }
602
603    // =========================================================================
604    // New typed API (WP-4) — preferred over the legacy `create_collection` methods
605    // =========================================================================
606
607    /// Creates a new vector collection.
608    ///
609    /// # Errors
610    ///
611    /// Returns an error if a collection with the same name already exists.
612    pub fn create_vector_collection(
613        &self,
614        name: &str,
615        dimension: usize,
616        metric: DistanceMetric,
617    ) -> Result<()> {
618        self.create_vector_collection_with_options(name, dimension, metric, StorageMode::default())
619    }
620
621    /// Creates a new vector collection with custom storage options.
622    ///
623    /// # Errors
624    ///
625    /// Returns an error if a collection with the same name already exists.
626    pub fn create_vector_collection_with_options(
627        &self,
628        name: &str,
629        dimension: usize,
630        metric: DistanceMetric,
631        storage_mode: StorageMode,
632    ) -> Result<()> {
633        self.ensure_collection_name_available(name)?;
634        let path = self.data_dir.join(name);
635        let coll = VectorCollection::create(path, name, dimension, metric, storage_mode)?;
636        // Register the inner Collection in the legacy registry so that both
637        // get_collection() and get_vector_collection() use the same live instance.
638        // Collection is Clone and all heavy fields are Arc<> so this is zero-copy.
639        self.collections
640            .write()
641            .insert(name.to_string(), coll.inner.clone());
642        self.vector_colls.write().insert(name.to_string(), coll);
643
644        if let Some(ref obs) = self.observer {
645            let kind = CollectionType::Vector {
646                dimension,
647                metric,
648                storage_mode,
649            };
650            obs.on_collection_created(name, &kind);
651        }
652
653        // Bump schema version (CACHE-01 DDL invalidation).
654        self.schema_version
655            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
656
657        Ok(())
658    }
659
660    /// Creates a new graph collection.
661    ///
662    /// # Errors
663    ///
664    /// Returns an error if a collection with the same name already exists.
665    pub fn create_graph_collection(
666        &self,
667        name: &str,
668        schema: crate::collection::GraphSchema,
669    ) -> Result<()> {
670        self.ensure_collection_name_available(name)?;
671        let path = self.data_dir.join(name);
672        let coll =
673            GraphCollection::create(path, name, None, DistanceMetric::Cosine, schema.clone())?;
674        // Register in legacy registry so get_collection() and execute_query() work (BUG-2).
675        self.collections
676            .write()
677            .insert(name.to_string(), coll.inner.clone());
678        self.graph_colls.write().insert(name.to_string(), coll);
679
680        if let Some(ref obs) = self.observer {
681            let kind = CollectionType::Graph {
682                dimension: None,
683                metric: DistanceMetric::Cosine,
684                schema,
685            };
686            obs.on_collection_created(name, &kind);
687        }
688
689        // Bump schema version (CACHE-01 DDL invalidation).
690        self.schema_version
691            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
692
693        Ok(())
694    }
695
696    /// Creates a new metadata-only collection.
697    ///
698    /// # Errors
699    ///
700    /// Returns an error if a collection with the same name already exists.
701    pub fn create_metadata_collection(&self, name: &str) -> Result<()> {
702        self.ensure_collection_name_available(name)?;
703        let path = self.data_dir.join(name);
704        let coll = MetadataCollection::create(path, name)?;
705        // Share the same inner Collection instance in the legacy registry so that
706        // get_collection() and execute_query() see the same live data.
707        self.collections
708            .write()
709            .insert(name.to_string(), coll.inner.clone());
710        self.metadata_colls.write().insert(name.to_string(), coll);
711
712        if let Some(ref obs) = self.observer {
713            obs.on_collection_created(name, &CollectionType::MetadataOnly);
714        }
715
716        // Bump schema version (CACHE-01 DDL invalidation).
717        self.schema_version
718            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
719
720        Ok(())
721    }
722
723    // =========================================================================
724    // Observer notification helpers (called by server handlers after operations)
725    // =========================================================================
726
727    /// Notifies the observer that points were upserted into a collection.
728    ///
729    /// **Caller contract**: this method is NOT called automatically by
730    /// [`Database`] internals. HTTP handlers and SDK bindings are responsible
731    /// for calling it after a successful upsert, passing the number of points
732    /// written. Forgetting to call it means the observer receives no upsert
733    /// events for that operation.
734    ///
735    /// No-op when no observer is registered.
736    pub fn notify_upsert(&self, collection: &str, point_count: usize) {
737        if let Some(ref obs) = self.observer {
738            obs.on_upsert(collection, point_count);
739        }
740    }
741
742    /// Notifies the observer that a query was executed, with its duration.
743    ///
744    /// **Caller contract**: this method is NOT called automatically by
745    /// [`Database::execute_query`]. Callers must measure the wall-clock
746    /// duration themselves (e.g. `std::time::Instant::now()` before the call)
747    /// and invoke this method afterwards with the elapsed microseconds.
748    ///
749    /// No-op when no observer is registered.
750    pub fn notify_query(&self, collection: &str, duration_us: u64) {
751        if let Some(ref obs) = self.observer {
752            obs.on_query(collection, duration_us);
753        }
754    }
755
756    /// Returns a `VectorCollection` by name.
757    ///
758    /// Checks the typed registry first.  If not found there, falls back to
759    /// opening the collection directory from disk (e.g. for collections created
760    /// via the legacy `create_collection` API that were not registered in the
761    /// typed registry).  The opened instance is cached back into the registry
762    /// so subsequent calls avoid the disk round-trip.
763    ///
764    /// Returns `None` if the collection does not exist on disk.
765    #[must_use]
766    pub fn get_vector_collection(&self, name: &str) -> Option<VectorCollection> {
767        if let Some(c) = self.vector_colls.read().get(name).cloned() {
768            return Some(c);
769        }
770        // Fallback: open from disk (e.g. collections created via legacy API).
771        // Intentional: supports mixed-mode databases that were not fully migrated.
772        // Type is verified via config.json before caching (graph_schema / metadata_only checks).
773        let path = self.data_dir.join(name);
774        let config_path = path.join("config.json");
775        if config_path.exists() {
776            // Read config to confirm this is a vector collection.
777            let data = std::fs::read_to_string(&config_path).ok()?;
778            let cfg = serde_json::from_str::<crate::collection::CollectionConfig>(&data).ok()?;
779            if cfg.graph_schema.is_some() || cfg.metadata_only {
780                return None;
781            }
782            if let Ok(coll) = VectorCollection::open(path) {
783                self.vector_colls
784                    .write()
785                    .insert(name.to_string(), coll.clone());
786                return Some(coll);
787            }
788        }
789        None
790    }
791
792    /// Returns a `GraphCollection` by name.
793    ///
794    /// Checks the typed registry first.  Falls back to opening from disk if the
795    /// collection was not registered in-memory (e.g. after a restart or when
796    /// the collection was auto-created by a graph handler).  The instance is
797    /// cached into the registry so subsequent calls are free.
798    ///
799    /// Returns `None` if the collection does not exist on disk.
800    #[must_use]
801    pub fn get_graph_collection(&self, name: &str) -> Option<GraphCollection> {
802        if let Some(c) = self.graph_colls.read().get(name).cloned() {
803            return Some(c);
804        }
805        // Fallback: open from disk and cache to avoid repeated I/O.
806        // Type is verified via config.json: graph_schema must be Some (cfg.graph_schema.as_ref()?).
807        let path = self.data_dir.join(name);
808        let config_path = path.join("config.json");
809        if config_path.exists() {
810            let data = std::fs::read_to_string(&config_path).ok()?;
811            let cfg = serde_json::from_str::<crate::collection::CollectionConfig>(&data).ok()?;
812            cfg.graph_schema.as_ref()?;
813            if let Ok(coll) = GraphCollection::open(path) {
814                self.graph_colls
815                    .write()
816                    .insert(name.to_string(), coll.clone());
817                return Some(coll);
818            }
819        }
820        None
821    }
822
823    /// Returns a `MetadataCollection` by name.
824    ///
825    /// Checks the typed registry first.  Falls back to opening from disk for
826    /// collections created before the typed API existed or after a restart.
827    /// The instance is cached to avoid repeated disk reads.
828    ///
829    /// Returns `None` if the collection does not exist on disk.
830    #[must_use]
831    pub fn get_metadata_collection(&self, name: &str) -> Option<MetadataCollection> {
832        if let Some(c) = self.metadata_colls.read().get(name).cloned() {
833            return Some(c);
834        }
835        // Fallback: open from disk and cache.
836        // Type is verified via config.json: metadata_only must be true before caching.
837        let path = self.data_dir.join(name);
838        let config_path = path.join("config.json");
839        if config_path.exists() {
840            let data = std::fs::read_to_string(&config_path).ok()?;
841            let cfg = serde_json::from_str::<crate::collection::CollectionConfig>(&data).ok()?;
842            if !cfg.metadata_only {
843                return None;
844            }
845            if let Ok(coll) = MetadataCollection::open(path) {
846                self.metadata_colls
847                    .write()
848                    .insert(name.to_string(), coll.clone());
849                return Some(coll);
850            }
851        }
852        None
853    }
854
855    // =========================================================================
856    // Legacy typed creation (kept for backward compatibility)
857    // =========================================================================
858
859    /// Creates a new collection with a specific type (Vector or `MetadataOnly`).
860    ///
861    /// # Arguments
862    ///
863    /// * `name` - Unique name for the collection
864    /// * `collection_type` - Type of collection to create
865    ///
866    /// # Errors
867    ///
868    /// Returns an error if a collection with the same name already exists.
869    ///
870    /// # Examples
871    ///
872    /// ```rust,ignore
873    /// use velesdb_core::{Database, CollectionType, DistanceMetric, StorageMode};
874    ///
875    /// let db = Database::open("./data")?;
876    ///
877    /// // Create a metadata-only collection
878    /// db.create_collection_typed("products", CollectionType::MetadataOnly)?;
879    ///
880    /// // Create a vector collection
881    /// db.create_collection_typed("embeddings", CollectionType::Vector {
882    ///     dimension: 768,
883    ///     metric: DistanceMetric::Cosine,
884    ///     storage_mode: StorageMode::Full,
885    /// })?;
886    /// ```
887    pub fn create_collection_typed(
888        &self,
889        name: &str,
890        collection_type: &CollectionType,
891    ) -> Result<()> {
892        // Delegate to the typed APIs so all registries stay in sync.
893        match collection_type {
894            CollectionType::Vector {
895                dimension,
896                metric,
897                storage_mode,
898            } => {
899                self.create_vector_collection_with_options(name, *dimension, *metric, *storage_mode)
900            }
901            CollectionType::MetadataOnly => self.create_metadata_collection(name),
902            CollectionType::Graph {
903                dimension,
904                metric,
905                schema,
906            } => {
907                // Graph with optional embeddings: delegate to the graph API if schema given.
908                self.ensure_collection_name_available(name)?;
909                let path = self.data_dir.join(name);
910                let coll =
911                    GraphCollection::create(path, name, *dimension, *metric, schema.clone())?;
912                self.collections
913                    .write()
914                    .insert(name.to_string(), coll.inner.clone());
915                self.graph_colls.write().insert(name.to_string(), coll);
916                if let Some(ref obs) = self.observer {
917                    obs.on_collection_created(name, collection_type);
918                }
919                // Bump schema version (CACHE-01 DDL invalidation).
920                self.schema_version
921                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
922                Ok(())
923            }
924        }
925    }
926
    /// Loads existing collections from disk.
    ///
    /// Scans every subdirectory of `data_dir` that contains a `config.json`,
    /// determines the collection's concrete type from that config, opens it,
    /// and registers it in both the legacy registry and the matching typed
    /// registry. Unreadable/unparsable configs and failed opens are logged as
    /// warnings and skipped — they never abort the whole load.
    ///
    /// # Deprecation note
    ///
    /// **This method is called automatically by [`Database::open`].**
    /// There is no need to call it manually. It is kept public only for
    /// backward compatibility with code that relied on the old two-step pattern.
    ///
    /// # Errors
    ///
    /// Returns an error if collection directories cannot be read.
    pub fn load_collections(&self) -> Result<()> {
        // Counts successful loads so the schema version is bumped at most once.
        let mut loaded_count: usize = 0;

        for entry in std::fs::read_dir(&self.data_dir)? {
            let entry = entry?;
            let path = entry.path();

            // Only directories with a config.json are collection candidates.
            if !path.is_dir() {
                continue;
            }
            let config_path = path.join("config.json");
            if !config_path.exists() {
                continue;
            }

            // Collection name == directory name.
            // NOTE(review): a non-UTF-8 directory name collapses to "unknown";
            // two such directories would collide in the registry — TODO confirm
            // this cannot happen in practice.
            let name = path
                .file_name()
                .and_then(|n| n.to_str())
                .unwrap_or("unknown")
                .to_string();

            // Skip names already present in the legacy registry.
            if self.collections.read().contains_key(&name) {
                continue;
            }

            // Read config to determine the concrete type before opening.
            let cfg_data = match std::fs::read_to_string(&config_path) {
                Ok(d) => d,
                Err(e) => {
                    tracing::warn!(error = %e, name, "Cannot read config.json — skipping");
                    continue;
                }
            };
            let cfg = match serde_json::from_str::<crate::collection::CollectionConfig>(&cfg_data) {
                Ok(c) => c,
                Err(e) => {
                    tracing::warn!(error = %e, name, "Cannot parse config.json — skipping");
                    continue;
                }
            };

            // Dispatch on config flags: graph_schema wins over metadata_only;
            // everything else is treated as a vector collection. In each arm the
            // shared inner Collection is inserted into the legacy registry first,
            // then the typed wrapper into its own registry.
            if cfg.graph_schema.is_some() {
                // Graph collection
                match GraphCollection::open(path.clone()) {
                    Ok(coll) => {
                        self.collections
                            .write()
                            .insert(name.clone(), coll.inner.clone());
                        self.graph_colls.write().insert(name, coll);
                        loaded_count += 1;
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, name = %path.display(), "Failed to load graph collection");
                    }
                }
            } else if cfg.metadata_only {
                // Metadata-only collection
                match MetadataCollection::open(path.clone()) {
                    Ok(coll) => {
                        self.collections
                            .write()
                            .insert(name.clone(), coll.inner.clone());
                        self.metadata_colls.write().insert(name, coll);
                        loaded_count += 1;
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, name = %path.display(), "Failed to load metadata collection");
                    }
                }
            } else {
                // Vector collection
                match VectorCollection::open(path.clone()) {
                    Ok(coll) => {
                        self.collections
                            .write()
                            .insert(name.clone(), coll.inner.clone());
                        self.vector_colls.write().insert(name, coll);
                        loaded_count += 1;
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, name = %path.display(), "Failed to load vector collection");
                    }
                }
            }
        }

        // Bump schema_version if at least one collection was loaded from disk (C-3).
        //
        // This ensures that any plan key built before load_collections() ran
        // (schema_version = 0) will never match a key built after it
        // (schema_version ≥ 1), preventing the plan cache from serving a stale
        // plan for a collection that was not yet visible in the registry.
        if loaded_count > 0 {
            self.schema_version
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        }

        Ok(())
    }
1038
1039    fn execute_dml(
1040        &self,
1041        dml: &crate::velesql::DmlStatement,
1042        params: &std::collections::HashMap<String, serde_json::Value>,
1043    ) -> Result<Vec<SearchResult>> {
1044        match dml {
1045            crate::velesql::DmlStatement::Insert(stmt) => self.execute_insert(stmt, params),
1046            crate::velesql::DmlStatement::Update(stmt) => self.execute_update(stmt, params),
1047        }
1048    }
1049
1050    fn execute_insert(
1051        &self,
1052        stmt: &crate::velesql::InsertStatement,
1053        params: &std::collections::HashMap<String, serde_json::Value>,
1054    ) -> Result<Vec<SearchResult>> {
1055        let collection = self
1056            .get_collection(&stmt.table)
1057            .or_else(|| self.get_vector_collection(&stmt.table).map(|vc| vc.inner))
1058            .ok_or_else(|| Error::CollectionNotFound(stmt.table.clone()))?;
1059
1060        let mut id: Option<u64> = None;
1061        let mut payload = serde_json::Map::new();
1062        let mut vector: Option<Vec<f32>> = None;
1063
1064        for (column, value_expr) in stmt.columns.iter().zip(&stmt.values) {
1065            let resolved = Self::resolve_dml_value(value_expr, params)?;
1066            if column == "id" {
1067                id = Some(Self::json_to_u64_id(&resolved)?);
1068                continue;
1069            }
1070            if column == "vector" {
1071                vector = Some(Self::json_to_vector(&resolved)?);
1072                continue;
1073            }
1074            payload.insert(column.clone(), resolved);
1075        }
1076
1077        let point_id =
1078            id.ok_or_else(|| Error::Query("INSERT requires integer 'id' column".to_string()))?;
1079        let point = if collection.is_metadata_only() {
1080            if vector.is_some() {
1081                return Err(Error::Query(
1082                    "INSERT on metadata-only collection cannot set 'vector'".to_string(),
1083                ));
1084            }
1085            crate::Point::metadata_only(point_id, serde_json::Value::Object(payload))
1086        } else {
1087            let vec_value = vector.ok_or_else(|| {
1088                Error::Query("INSERT on vector collection requires 'vector' column".to_string())
1089            })?;
1090            crate::Point::new(
1091                point_id,
1092                vec_value,
1093                Some(serde_json::Value::Object(payload)),
1094            )
1095        };
1096
1097        let result = SearchResult::new(point.clone(), 0.0);
1098        collection.upsert(vec![point])?;
1099        Ok(vec![result])
1100    }
1101
    /// Executes an `UPDATE` statement by rewriting matching points in place.
    ///
    /// Semantics visible in this body:
    /// - `id` cannot be assigned (primary key is immutable).
    /// - `vector` may be assigned only on non-metadata-only collections.
    /// - All other assignments are merged into the point's JSON payload.
    /// - Matching is a full scan over `all_ids()` filtered in memory — no
    ///   index pushdown happens here.
    ///
    /// Returns one `SearchResult` (score 0.0) per updated point, or an empty
    /// vec when nothing matched.
    fn execute_update(
        &self,
        stmt: &crate::velesql::UpdateStatement,
        params: &std::collections::HashMap<String, serde_json::Value>,
    ) -> Result<Vec<SearchResult>> {
        // Resolve via the legacy registry first, then the typed vector registry.
        let collection = self
            .get_collection(&stmt.table)
            .or_else(|| self.get_vector_collection(&stmt.table).map(|vc| vc.inner))
            .ok_or_else(|| Error::CollectionNotFound(stmt.table.clone()))?;

        // Resolve all assignment values (with parameter substitution) once,
        // before scanning any points.
        let assignments = stmt
            .assignments
            .iter()
            .map(|a| Ok((a.column.clone(), Self::resolve_dml_value(&a.value, params)?)))
            .collect::<Result<Vec<_>>>()?;

        // The primary key is immutable.
        if assignments.iter().any(|(name, _)| name == "id") {
            return Err(Error::Query(
                "UPDATE cannot modify primary key column 'id'".to_string(),
            ));
        }

        // Full scan: fetch every point, then apply the WHERE filter in memory.
        let all_ids = collection.all_ids();
        let rows = collection.get(&all_ids);
        let filter = Self::build_update_filter(stmt.where_clause.as_ref())?;

        let mut updated_points = Vec::new();
        for point in rows.into_iter().flatten() {
            if !Self::matches_update_filter(&point, filter.as_ref()) {
                continue;
            }

            // Start from the existing payload (or an empty map) and merge into it.
            let mut payload_map = point
                .payload
                .as_ref()
                .and_then(serde_json::Value::as_object)
                .cloned()
                .unwrap_or_default();

            let mut updated_vector = point.vector.clone();

            for (field, value) in &assignments {
                if field == "vector" {
                    // NOTE(review): this check only fires when at least one point
                    // matches the WHERE clause; a no-match UPDATE with a `vector`
                    // assignment on a metadata-only collection returns Ok(empty)
                    // instead of an error — confirm this is intended.
                    if collection.is_metadata_only() {
                        return Err(Error::Query(
                            "UPDATE on metadata-only collection cannot set 'vector'".to_string(),
                        ));
                    }
                    updated_vector = Self::json_to_vector(value)?;
                } else {
                    payload_map.insert(field.clone(), value.clone());
                }
            }

            // Rebuild the point with the merged payload (and possibly new vector).
            let updated = if collection.is_metadata_only() {
                crate::Point::metadata_only(point.id, serde_json::Value::Object(payload_map))
            } else {
                crate::Point::new(
                    point.id,
                    updated_vector,
                    Some(serde_json::Value::Object(payload_map)),
                )
            };
            updated_points.push(updated);
        }

        if updated_points.is_empty() {
            return Ok(Vec::new());
        }

        // Results are built before the upsert so they reflect exactly what is
        // written; score 0.0 mirrors execute_insert.
        let results = updated_points
            .iter()
            .map(|p| SearchResult::new(p.clone(), 0.0))
            .collect();
        collection.upsert(updated_points)?;
        Ok(results)
    }
1179
    /// Executes a `TRAIN QUANTIZER` statement.
    ///
    /// Trains a PQ/OPQ/RaBitQ codebook on the collection's vectors, stores the
    /// resulting quantizer, updates storage mode, and persists the codebook to disk.
    ///
    /// # Lock Ordering
    ///
    /// Vectors are extracted under `vector_storage` read lock, which is released
    /// before acquiring `pq_quantizer` write lock (respects canonical lock order).
    ///
    /// # Errors
    ///
    /// - `Error::CollectionNotFound` if the target collection does not exist.
    /// - `Error::InvalidQuantizerConfig` for invalid params (m=0, dim%m!=0, already trained).
    /// - `Error::TrainingFailed` if the underlying training algorithm errors.
    #[allow(
        clippy::too_many_lines,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss
    )]
    fn execute_train(&self, stmt: &crate::velesql::TrainStatement) -> Result<Vec<SearchResult>> {
        use crate::velesql::WithValue;

        // 1. Look up collection (legacy registry first, then typed vector registry)
        let collection = self
            .get_collection(&stmt.collection)
            .or_else(|| {
                self.get_vector_collection(&stmt.collection)
                    .map(|vc| vc.inner)
            })
            .ok_or_else(|| Error::CollectionNotFound(stmt.collection.clone()))?;

        // 2. Extract parameters from stmt.params
        //    Negative integers are clamped to 0 via `v.max(0)` before casting
        //    (the clippy allows above cover the narrowing casts).
        let m = stmt
            .params
            .get("m")
            .and_then(WithValue::as_integer)
            .map_or(0_usize, |v| v.max(0) as usize);
        let k = stmt
            .params
            .get("k")
            .and_then(WithValue::as_integer)
            .map_or(256_usize, |v| v.max(0) as usize);
        let train_type = stmt
            .params
            .get("type")
            .and_then(WithValue::as_str)
            .unwrap_or("pq")
            .to_lowercase();
        let oversampling = stmt
            .params
            .get("oversampling")
            .and_then(WithValue::as_integer)
            .map_or(4_u32, |v| v.max(0) as u32);
        let sample_limit = stmt
            .params
            .get("sample")
            .and_then(WithValue::as_integer)
            .map(|v| v.max(0) as usize);
        let force = stmt
            .params
            .get("force")
            .and_then(WithValue::as_bool)
            .unwrap_or(false);

        // 3. Validate m and k (basic checks before dimension check)
        // NOTE(review): m > 0 is enforced even for "rabitq", which never uses
        // subspaces — confirm whether rabitq should skip this check.
        if m == 0 {
            return Err(Error::InvalidQuantizerConfig(
                "m (num_subspaces) must be > 0".into(),
            ));
        }
        if k == 0 {
            return Err(Error::InvalidQuantizerConfig(
                "k (num_centroids) must be > 0".into(),
            ));
        }

        // 4. Read dimension from config (no heavy lock)
        let config = collection.config();
        let dim = config.dimension;

        // RaBitQ doesn't need dimension % m check
        if train_type != "rabitq" && dim % m != 0 {
            return Err(Error::InvalidQuantizerConfig(format!(
                "dimension {dim} is not divisible by m={m}"
            )));
        }

        // 5. Check if already trained (unless force)
        //    Scoped block so the read guard drops before training/writing.
        {
            let quantizer = collection.pq_quantizer_read();
            if quantizer.is_some() && !force {
                return Err(Error::InvalidQuantizerConfig(
                    "Quantizer already trained. Use force=true to retrain.".into(),
                ));
            }
        }

        // 6. Extract vectors (read lock on storage, released before pq_quantizer write)
        //    Points without a vector (empty) are excluded from training.
        let all_ids = collection.all_ids();
        let points = collection.get(&all_ids);
        let mut vectors: Vec<Vec<f32>> = points
            .into_iter()
            .flatten()
            .filter(|p| !p.vector.is_empty())
            .map(|p| p.vector)
            .collect();

        // Optional sampling
        // NOTE(review): this takes the *first* `limit` vectors rather than a
        // random sample, which can bias the codebook toward older points —
        // confirm whether that is intended.
        if let Some(limit) = sample_limit {
            if vectors.len() > limit {
                vectors.truncate(limit);
            }
        }

        if vectors.is_empty() {
            return Err(Error::TrainingFailed(
                "no vectors available for training".into(),
            ));
        }

        // 7. Train based on type
        match train_type.as_str() {
            "pq" => {
                let pq = crate::quantization::ProductQuantizer::train(&vectors, m, k)
                    .map_err(|e| Error::TrainingFailed(e.to_string()))?;

                let codebook_size = pq.codebook.num_subspaces * pq.codebook.num_centroids;
                let n_train = vectors.len();

                // Persist codebook
                pq.save_codebook(collection.data_path())
                    .map_err(|e| Error::TrainingFailed(e.to_string()))?;

                // Store quantizer (write lock)
                *collection.pq_quantizer_write() = Some(pq);

                // Update config (guard scoped so it drops before save_config)
                {
                    let mut cfg = collection.config_write();
                    cfg.storage_mode = StorageMode::ProductQuantization;
                    cfg.pq_rescore_oversampling = Some(oversampling);
                }
                collection.save_config()?;

                // Report the training summary as a single metadata-only result.
                Ok(vec![SearchResult::new(
                    crate::Point::metadata_only(
                        0,
                        serde_json::json!({
                            "status": "trained",
                            "type": "pq",
                            "m": m,
                            "k": k,
                            "codebook_size": codebook_size,
                            "training_vectors": n_train
                        }),
                    ),
                    0.0,
                )])
            }
            "opq" => {
                let pq = crate::quantization::train_opq(&vectors, m, k, true, 10)
                    .map_err(|e| Error::TrainingFailed(e.to_string()))?;

                let codebook_size = pq.codebook.num_subspaces * pq.codebook.num_centroids;
                let n_train = vectors.len();

                // Persist codebook + rotation (OPQ additionally learns a rotation matrix)
                pq.save_codebook(collection.data_path())
                    .map_err(|e| Error::TrainingFailed(e.to_string()))?;
                pq.save_rotation(collection.data_path())
                    .map_err(|e| Error::TrainingFailed(e.to_string()))?;

                *collection.pq_quantizer_write() = Some(pq);

                {
                    let mut cfg = collection.config_write();
                    cfg.storage_mode = StorageMode::ProductQuantization;
                    cfg.pq_rescore_oversampling = Some(oversampling);
                }
                collection.save_config()?;

                Ok(vec![SearchResult::new(
                    crate::Point::metadata_only(
                        0,
                        serde_json::json!({
                            "status": "trained",
                            "type": "opq",
                            "m": m,
                            "k": k,
                            "codebook_size": codebook_size,
                            "training_vectors": n_train
                        }),
                    ),
                    0.0,
                )])
            }
            "rabitq" => {
                // Seed 42 makes RaBitQ training deterministic across runs.
                let rbq = crate::quantization::RaBitQIndex::train(&vectors, 42)
                    .map_err(|e| Error::TrainingFailed(e.to_string()))?;

                // Persist
                rbq.save(collection.data_path())
                    .map_err(|e| Error::TrainingFailed(e.to_string()))?;

                // RaBitQ doesn't use pq_quantizer Arc, but update storage_mode
                {
                    let mut cfg = collection.config_write();
                    cfg.storage_mode = StorageMode::RaBitQ;
                    cfg.pq_rescore_oversampling = Some(oversampling);
                }
                collection.save_config()?;

                Ok(vec![SearchResult::new(
                    crate::Point::metadata_only(
                        0,
                        serde_json::json!({
                            "status": "trained",
                            "type": "rabitq",
                            "dimension": dim,
                            "training_vectors": vectors.len()
                        }),
                    ),
                    0.0,
                )])
            }
            other => Err(Error::InvalidQuantizerConfig(format!(
                "unknown quantizer type: '{other}'. Supported: pq, opq, rabitq"
            ))),
        }
    }
1411}
1412
// Private helpers used by the `Database` impl; gated on the same feature as
// the struct itself.
#[cfg(feature = "persistence")]
mod database_helpers;

// Unit tests for the database facade; compiled only under `cargo test` with
// the `persistence` feature enabled.
#[cfg(all(test, feature = "persistence"))]
mod database_tests;