Skip to main content

velesdb_core/database/
mod.rs

1//! Database facade and orchestration layer for collection lifecycle and query routing.
2//!
3//! This module is split into focused submodules:
4//!
5//! - [`collection_ops`] — Collection CRUD dispatcher (create, delete, list, get)
6//! - [`vector_ops`] — Vector collection create/get
7//! - [`graph_ops`] — Graph collection create/get
8//! - [`metadata_ops`] — Metadata-only collection create/get
9//! - [`query_engine`] — `VelesQL` query execution, plan caching, DML
10//! - [`persistence`] — Loading collections from disk at startup
11//! - [`training`] — `TRAIN QUANTIZER` statement execution
12//! - [`stats`] — Collection statistics (analyze, cache)
13//! - [`database_helpers`] — DML value conversion and JOIN column store helpers
14
15use crate::collection::{GraphCollection, MetadataCollection, VectorCollection};
16use crate::observer::DatabaseObserver;
17use crate::simd_dispatch;
18#[allow(deprecated)]
19use crate::{Collection, ColumnStore, Error, Result};
20
21mod collection_ops;
22mod graph_ops;
23mod metadata_ops;
24mod persistence;
25mod query_engine;
26mod stats;
27mod training;
28mod vector_ops;
29
30#[cfg(feature = "persistence")]
31mod database_helpers;
32
33#[cfg(all(test, feature = "persistence"))]
34mod collection_ops_tests;
35#[cfg(all(test, feature = "persistence"))]
36mod database_tests;
37#[cfg(all(test, feature = "persistence"))]
38mod graph_ops_tests;
39#[cfg(all(test, feature = "persistence"))]
40mod query_engine_tests;
41#[cfg(all(test, feature = "persistence"))]
42mod stats_tests;
43
/// Database instance managing collections and storage.
///
/// # Lifecycle
///
/// `Database::open()` automatically loads all previously created collections from disk.
/// There is no need to call `load_collections()` separately.
///
/// # Extension (Premium)
///
/// Use [`Database::open_with_observer`] to inject a [`DatabaseObserver`] implementation
/// from `velesdb-premium` without modifying this crate.
///
/// # Drop order
///
/// Rust drops struct fields in declaration order. `_lock_file` is therefore
/// declared **last**, so the exclusive lock is released only after every
/// registry and cache declared above it has been dropped. Keep it last when
/// adding new fields.
#[cfg(feature = "persistence")]
pub struct Database {
    /// Path to the data directory
    data_dir: std::path::PathBuf,
    /// Legacy registry (Collection god-object) — kept for backward compatibility during migration.
    #[allow(deprecated)]
    collections: parking_lot::RwLock<std::collections::HashMap<String, Collection>>,
    /// New registry: vector collections.
    vector_colls: parking_lot::RwLock<std::collections::HashMap<String, VectorCollection>>,
    /// New registry: graph collections.
    graph_colls: parking_lot::RwLock<std::collections::HashMap<String, GraphCollection>>,
    /// New registry: metadata-only collections.
    metadata_colls: parking_lot::RwLock<std::collections::HashMap<String, MetadataCollection>>,
    /// Cached collection statistics for CBO planning.
    collection_stats: parking_lot::RwLock<
        std::collections::HashMap<String, crate::collection::stats::CollectionStats>,
    >,
    /// Optional lifecycle observer (used by velesdb-premium for RBAC, audit, multi-tenant).
    observer: Option<std::sync::Arc<dyn DatabaseObserver>>,
    /// Monotonic DDL schema version counter (CACHE-01).
    ///
    /// Incremented on every create/drop collection operation.
    /// Used by `CompiledPlanCache` to invalidate cached query plans.
    schema_version: std::sync::atomic::AtomicU64,
    /// Compiled query plan cache (CACHE-02).
    ///
    /// Stores recently compiled `QueryPlan` instances keyed by `PlanKey`.
    /// Default sizing: L1 = 1K hot entries, L2 = 10K LRU entries.
    compiled_plan_cache: crate::cache::CompiledPlanCache,
    /// Exclusive file lock preventing multi-process corruption.
    ///
    /// The lock is held for the lifetime of the `Database` and released on `Drop`.
    /// The `_` prefix signals this field is kept for its RAII side effect.
    ///
    /// Declared last on purpose: fields drop in declaration order, so the lock
    /// outlives the collection registries during teardown.
    // NOTE(review): whether collection drops perform disk writes is not visible
    // from this file; holding the lock until they finish is the safe ordering
    // either way.
    _lock_file: std::fs::File,
}
90
#[cfg(feature = "persistence")]
impl Database {
    /// Opens or creates a database, **automatically loading all existing collections**.
    ///
    /// This replaces the previous `open()` + `load_collections()` two-step pattern.
    /// The new `open()` is a strict auto-load: all `config.json` directories under
    /// `path` are loaded on startup.
    ///
    /// Equivalent to opening without a [`DatabaseObserver`].
    ///
    /// # Errors
    ///
    /// Returns an error if:
    ///
    /// - the data directory or the `velesdb.lock` file cannot be created,
    /// - the exclusive lock cannot be acquired ([`Error::DatabaseLocked`]),
    /// - loading the existing collections fails.
    pub fn open<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
        Self::open_impl(path, None)
    }

    /// Opens a database with a [`DatabaseObserver`] (used by velesdb-premium).
    ///
    /// The observer receives lifecycle hooks for every collection operation,
    /// enabling RBAC, audit logging, multi-tenant routing, etc.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    ///
    /// - the data directory or the `velesdb.lock` file cannot be created,
    /// - the exclusive lock cannot be acquired ([`Error::DatabaseLocked`]),
    /// - loading the existing collections fails.
    pub fn open_with_observer<P: AsRef<std::path::Path>>(
        path: P,
        observer: std::sync::Arc<dyn DatabaseObserver>,
    ) -> Result<Self> {
        Self::open_impl(path, Some(observer))
    }

    /// Shared implementation behind [`Database::open`] and
    /// [`Database::open_with_observer`].
    ///
    /// Steps, in order: create the data directory, take the exclusive
    /// `velesdb.lock`, log the detected SIMD features, build the empty
    /// registries, then load the collections already present on disk.
    fn open_impl<P: AsRef<std::path::Path>>(
        path: P,
        observer: Option<std::sync::Arc<dyn DatabaseObserver>>,
    ) -> Result<Self> {
        let data_dir = path.as_ref().to_path_buf();
        std::fs::create_dir_all(&data_dir)?;

        // Acquire exclusive file lock to prevent multi-process corruption
        let lock_path = data_dir.join("velesdb.lock");
        let lock_file = std::fs::File::create(&lock_path)?;
        fs2::FileExt::try_lock_exclusive(&lock_file).map_err(|err| {
            // The caller still receives `DatabaseLocked`, but the underlying
            // I/O error is logged so that a failure other than plain contention
            // (e.g. a filesystem without lock support) remains diagnosable.
            tracing::warn!(
                path = %lock_path.display(),
                error = %err,
                "failed to acquire exclusive database lock"
            );
            Error::DatabaseLocked(data_dir.display().to_string())
        })?;

        // Log SIMD features detected at startup
        let features = simd_dispatch::simd_features_info();
        tracing::info!(
            avx512 = features.avx512f,
            avx2 = features.avx2,
            "SIMD features detected - direct dispatch enabled"
        );

        let db = Self {
            data_dir,
            _lock_file: lock_file,
            collections: parking_lot::RwLock::new(std::collections::HashMap::new()),
            vector_colls: parking_lot::RwLock::new(std::collections::HashMap::new()),
            graph_colls: parking_lot::RwLock::new(std::collections::HashMap::new()),
            metadata_colls: parking_lot::RwLock::new(std::collections::HashMap::new()),
            collection_stats: parking_lot::RwLock::new(std::collections::HashMap::new()),
            observer,
            // A freshly opened database always starts at schema version 0.
            schema_version: std::sync::atomic::AtomicU64::new(0),
            // Sizes match the documented defaults: 1K L1 entries, 10K L2 entries.
            compiled_plan_cache: crate::cache::CompiledPlanCache::new(1_000, 10_000),
        };

        // Auto-load all existing collections from disk (replaces manual load_collections()).
        db.load_collections()?;

        Ok(db)
    }

    /// Returns the path to the data directory.
    #[must_use]
    pub fn data_dir(&self) -> &std::path::Path {
        &self.data_dir
    }

    /// Returns the current DDL schema version counter.
    ///
    /// The value is read with `Ordering::Relaxed`.
    #[must_use]
    pub fn schema_version(&self) -> u64 {
        self.schema_version
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Returns a reference to the compiled query plan cache.
    #[must_use]
    pub fn plan_cache(&self) -> &crate::cache::CompiledPlanCache {
        &self.compiled_plan_cache
    }

    // =========================================================================
    // Observer notification helpers (called by server handlers after operations)
    // =========================================================================

    /// Notifies the observer that points were upserted into a collection.
    ///
    /// **Caller contract**: this method is NOT called automatically by
    /// [`Database`] internals. HTTP handlers and SDK bindings are responsible
    /// for calling it after a successful upsert, passing the number of points
    /// written. Forgetting to call it means the observer receives no upsert
    /// events for that operation.
    ///
    /// No-op when no observer is registered.
    pub fn notify_upsert(&self, collection: &str, point_count: usize) {
        if let Some(ref obs) = self.observer {
            obs.on_upsert(collection, point_count);
        }
    }

    /// Notifies the observer that a query was executed, with its duration.
    ///
    /// **Caller contract**: this method is NOT called automatically by
    /// [`Database::execute_query`]. Callers must measure the wall-clock
    /// duration themselves (e.g. `std::time::Instant::now()` before the call)
    /// and invoke this method afterwards with the elapsed microseconds.
    ///
    /// No-op when no observer is registered.
    pub fn notify_query(&self, collection: &str, duration_us: u64) {
        if let Some(ref obs) = self.observer {
            obs.on_query(collection, duration_us);
        }
    }
}