velesdb_core/database/mod.rs
//! Database facade and orchestration layer for collection lifecycle and query routing.
//!
//! This module is split into focused submodules:
//!
//! - [`collection_ops`] — Collection CRUD dispatcher (create, delete, list, get)
//! - [`vector_ops`] — Vector collection create/get
//! - [`graph_ops`] — Graph collection create/get
//! - [`metadata_ops`] — Metadata-only collection create/get
//! - [`query_engine`] — `VelesQL` query execution, plan caching, DML
//! - [`persistence`] — Loading collections from disk at startup
//! - [`training`] — `TRAIN QUANTIZER` statement execution
//! - [`stats`] — Collection statistics (analyze, cache)
//! - [`database_helpers`] — DML value conversion and JOIN column store helpers
//!   (compiled only with the `persistence` feature)
14
15use crate::collection::{GraphCollection, MetadataCollection, VectorCollection};
16use crate::observer::DatabaseObserver;
17use crate::simd_dispatch;
18#[allow(deprecated)]
19use crate::{Collection, ColumnStore, Error, Result};
20
21mod collection_ops;
22mod graph_ops;
23mod metadata_ops;
24mod persistence;
25mod query_engine;
26mod stats;
27mod training;
28mod vector_ops;
29
30#[cfg(feature = "persistence")]
31mod database_helpers;
32
33#[cfg(all(test, feature = "persistence"))]
34mod collection_ops_tests;
35#[cfg(all(test, feature = "persistence"))]
36mod database_tests;
37#[cfg(all(test, feature = "persistence"))]
38mod graph_ops_tests;
39#[cfg(all(test, feature = "persistence"))]
40mod query_engine_tests;
41#[cfg(all(test, feature = "persistence"))]
42mod stats_tests;
43
/// Database instance managing collections and storage.
///
/// # Lifecycle
///
/// `Database::open()` automatically loads all previously created collections from disk.
/// There is no need to call `load_collections()` separately.
///
/// The data directory is guarded by an exclusive lock on `velesdb.lock`. The lock file
/// handle is deliberately the **last** field of this struct: Rust drops fields in
/// declaration order, so every collection registry and cache is dropped (running any
/// teardown logic it has) *before* the lock is released to other processes.
///
/// # Extension (Premium)
///
/// Use [`Database::open_with_observer`] to inject a [`DatabaseObserver`] implementation
/// from `velesdb-premium` without modifying this crate.
#[cfg(feature = "persistence")]
pub struct Database {
    /// Path to the data directory.
    data_dir: std::path::PathBuf,
    /// Legacy registry (`Collection` god-object) — kept for backward compatibility during migration.
    // `Collection` is deprecated in favour of the typed registries below; the allow keeps
    // this legacy registry compiling warning-free until the migration is finished.
    #[allow(deprecated)]
    collections: parking_lot::RwLock<std::collections::HashMap<String, Collection>>,
    /// New registry: vector collections.
    vector_colls: parking_lot::RwLock<std::collections::HashMap<String, VectorCollection>>,
    /// New registry: graph collections.
    graph_colls: parking_lot::RwLock<std::collections::HashMap<String, GraphCollection>>,
    /// New registry: metadata-only collections.
    metadata_colls: parking_lot::RwLock<std::collections::HashMap<String, MetadataCollection>>,
    /// Cached collection statistics for CBO planning.
    collection_stats: parking_lot::RwLock<
        std::collections::HashMap<String, crate::collection::stats::CollectionStats>,
    >,
    /// Optional lifecycle observer (used by velesdb-premium for RBAC, audit, multi-tenant).
    observer: Option<std::sync::Arc<dyn DatabaseObserver>>,
    /// Monotonic DDL schema version counter (CACHE-01).
    ///
    /// Incremented on every create/drop collection operation.
    /// Used by `CompiledPlanCache` to invalidate cached query plans.
    schema_version: std::sync::atomic::AtomicU64,
    /// Compiled query plan cache (CACHE-02).
    ///
    /// Stores recently compiled `QueryPlan` instances keyed by `PlanKey`.
    /// Default sizing: L1 = 1K hot entries, L2 = 10K LRU entries.
    compiled_plan_cache: crate::cache::CompiledPlanCache,
    /// Exclusive file lock preventing multi-process corruption.
    ///
    /// The lock is held for the lifetime of the `Database` and released when this handle
    /// is closed on `Drop`. This field MUST stay the last one declared: struct fields are
    /// dropped in declaration order, so keeping it last guarantees the lock outlives every
    /// other field's teardown. The `_` prefix signals it is kept only for its RAII effect.
    _lock_file: std::fs::File,
}
90
#[cfg(feature = "persistence")]
impl Database {
    /// Opens or creates a database, **automatically loading all existing collections**.
    ///
    /// This replaces the previous `open()` + `load_collections()` two-step pattern.
    /// The new `open()` is a strict auto-load: all `config.json` directories under
    /// `path` are loaded on startup.
    ///
    /// # Errors
    ///
    /// - [`Error::DatabaseLocked`] if the exclusive lock on `path` is already held
    ///   (e.g. by another process).
    /// - An I/O error if the directory or its lock file cannot be created, opened, or
    ///   locked for any reason other than contention.
    /// - Any error returned while loading the existing collections.
    pub fn open<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
        Self::open_impl(path, None)
    }

    /// Opens a database with a [`DatabaseObserver`] (used by velesdb-premium).
    ///
    /// The observer receives lifecycle hooks for every collection operation,
    /// enabling RBAC, audit logging, multi-tenant routing, etc.
    ///
    /// # Errors
    ///
    /// Same as [`Database::open`]: [`Error::DatabaseLocked`] on lock contention, an I/O
    /// error if the directory or lock file cannot be created or accessed, or any error
    /// raised while loading existing collections.
    pub fn open_with_observer<P: AsRef<std::path::Path>>(
        path: P,
        observer: std::sync::Arc<dyn DatabaseObserver>,
    ) -> Result<Self> {
        Self::open_impl(path, Some(observer))
    }

    /// Shared implementation of [`Database::open`] and [`Database::open_with_observer`].
    fn open_impl<P: AsRef<std::path::Path>>(
        path: P,
        observer: Option<std::sync::Arc<dyn DatabaseObserver>>,
    ) -> Result<Self> {
        let data_dir = path.as_ref().to_path_buf();
        std::fs::create_dir_all(&data_dir)?;

        // Acquire the exclusive file lock before touching any collection data so a second
        // process cannot load or mutate the same directory concurrently.
        let lock_file = Self::acquire_data_dir_lock(&data_dir)?;

        // Log SIMD features detected at startup
        let features = simd_dispatch::simd_features_info();
        tracing::info!(
            avx512 = features.avx512f,
            avx2 = features.avx2,
            "SIMD features detected - direct dispatch enabled"
        );

        let db = Self {
            data_dir,
            _lock_file: lock_file,
            collections: parking_lot::RwLock::new(std::collections::HashMap::new()),
            vector_colls: parking_lot::RwLock::new(std::collections::HashMap::new()),
            graph_colls: parking_lot::RwLock::new(std::collections::HashMap::new()),
            metadata_colls: parking_lot::RwLock::new(std::collections::HashMap::new()),
            collection_stats: parking_lot::RwLock::new(std::collections::HashMap::new()),
            observer,
            schema_version: std::sync::atomic::AtomicU64::new(0),
            // L1 = 1K hot entries, L2 = 10K LRU entries (see the field documentation).
            compiled_plan_cache: crate::cache::CompiledPlanCache::new(1_000, 10_000),
        };

        // Auto-load all existing collections from disk (replaces manual load_collections()).
        // On failure `db` is dropped here, which also releases the lock.
        db.load_collections()?;

        Ok(db)
    }

    /// Opens (creating it if missing) `<data_dir>/velesdb.lock` and takes an exclusive,
    /// non-blocking lock on it.
    ///
    /// The file is opened *without* truncation: it carries no data, and another process
    /// may currently hold the lock on it, so there is no reason to rewrite it.
    ///
    /// Only genuine lock contention (the error `fs2` reports via
    /// `fs2::lock_contended_error`) is mapped to [`Error::DatabaseLocked`]; any other
    /// failure (e.g. locking unsupported on the filesystem) is propagated as an I/O error
    /// instead of being misreported as "database in use".
    fn acquire_data_dir_lock(data_dir: &std::path::Path) -> Result<std::fs::File> {
        let lock_path = data_dir.join("velesdb.lock");
        let lock_file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(false)
            .open(&lock_path)?;

        match fs2::FileExt::try_lock_exclusive(&lock_file) {
            Ok(()) => Ok(lock_file),
            Err(err) if err.raw_os_error() == fs2::lock_contended_error().raw_os_error() => {
                Err(Error::DatabaseLocked(data_dir.display().to_string()))
            }
            Err(err) => Err(err.into()),
        }
    }

    /// Returns the path to the data directory.
    #[must_use]
    pub fn data_dir(&self) -> &std::path::Path {
        &self.data_dir
    }

    /// Returns the current DDL schema version counter.
    #[must_use]
    pub fn schema_version(&self) -> u64 {
        self.schema_version
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Returns a reference to the compiled query plan cache.
    #[must_use]
    pub fn plan_cache(&self) -> &crate::cache::CompiledPlanCache {
        &self.compiled_plan_cache
    }

    // =========================================================================
    // Observer notification helpers (called by server handlers after operations)
    // =========================================================================

    /// Notifies the observer that points were upserted into a collection.
    ///
    /// **Caller contract**: this method is NOT called automatically by
    /// [`Database`] internals. HTTP handlers and SDK bindings are responsible
    /// for calling it after a successful upsert, passing the number of points
    /// written. Forgetting to call it means the observer receives no upsert
    /// events for that operation.
    ///
    /// No-op when no observer is registered.
    pub fn notify_upsert(&self, collection: &str, point_count: usize) {
        if let Some(ref obs) = self.observer {
            obs.on_upsert(collection, point_count);
        }
    }

    /// Notifies the observer that a query was executed, with its duration.
    ///
    /// **Caller contract**: this method is NOT called automatically by
    /// [`Database::execute_query`]. Callers must measure the wall-clock
    /// duration themselves (e.g. `std::time::Instant::now()` before the call)
    /// and invoke this method afterwards with the elapsed microseconds.
    ///
    /// No-op when no observer is registered.
    pub fn notify_query(&self, collection: &str, duration_us: u64) {
        if let Some(ref obs) = self.observer {
            obs.on_query(collection, duration_us);
        }
    }
}