velesdb_core/database/collection_ops.rs
1//! Collection CRUD dispatcher: create, delete, list, get, and diagnostics.
2//!
3//! Type-specific operations are in sibling modules:
4//! - [`vector_ops`] — vector collection create/get
5//! - [`graph_ops`] — graph collection create/get
6//! - [`metadata_ops`] — metadata-only collection create/get
7
8use crate::collection::AnyCollection;
9use crate::{CollectionType, DistanceMetric, Error, Result, StorageMode};
10
11use super::Database;
12
13impl Database {
14 /// Ensures a collection name is valid, free in memory, and free on disk.
15 ///
16 /// Validates the name against path traversal and forbidden characters
17 /// **before** any filesystem operation, then checks that no collection
18 /// with the same name already exists in any registry or on disk, and
19 /// finally enforces the `LimitsConfig::max_collections` cap so that
20 /// callers are refused cleanly instead of filling the registry past
21 /// the configured ceiling.
22 pub(super) fn ensure_collection_name_available(&self, name: &str) -> Result<()> {
23 crate::validation::validate_collection_name(name)?;
24
25 if self.collection_exists_in_registry(name) {
26 return Err(Error::CollectionExists(name.to_string()));
27 }
28
29 let collection_path = self.data_dir.join(name);
30 if collection_path.exists() {
31 return Err(Error::CollectionExists(name.to_string()));
32 }
33
34 // Wave 3 Commit 7 — enforce `LimitsConfig::max_collections`.
35 //
36 // Counted across every typed registry (vector + graph + metadata)
37 // because the limit is tenant-wide, not per-type. Evaluated after
38 // the name validation and duplicate checks so the typed error
39 // precedence stays unchanged: invalid name and duplicate still
40 // win over the cap — callers that want to detect "too many
41 // collections" specifically rely on the `GuardRail` variant.
42 let total_collections = self.vector_colls.read().len()
43 + self.graph_colls.read().len()
44 + self.metadata_colls.read().len();
45 let cap = self.config.limits.max_collections;
46 if total_collections >= cap {
47 return Err(Error::GuardRail(format!(
48 "max_collections limit reached ({total_collections} / {cap}); \
49 raise `limits.max_collections` in VelesConfig to create more"
50 )));
51 }
52
53 Ok(())
54 }
55
56 /// Pushes the live [`LimitsConfig`](crate::config::LimitsConfig) ingest/
57 /// search caps into a collection (parity item E).
58 ///
59 /// Single helper reused by the vector / graph / metadata registration and
60 /// disk-open paths so all three thread the same runtime limits into the
61 /// `Collection`. The limits are **not** persisted to `config.json`: they
62 /// are re-pushed on every open from the live `VelesConfig`.
63 pub(super) fn push_runtime_limits(&self, coll: &crate::collection::Collection) {
64 coll.set_runtime_limits(crate::collection::RuntimeLimits::from_config(
65 &self.config.limits,
66 ));
67 }
68
69 /// Checks whether a collection name exists in any of the typed registries.
70 fn collection_exists_in_registry(&self, name: &str) -> bool {
71 self.vector_colls.read().contains_key(name)
72 || self.graph_colls.read().contains_key(name)
73 || self.metadata_colls.read().contains_key(name)
74 }
75
76 /// Enforces `LimitsConfig::max_dimensions` on a prospective vector
77 /// collection creation.
78 ///
79 /// Complements [`crate::validation::validate_dimension`] (the static
80 /// `65_536` hard ceiling): the config-driven limit is typically tighter
81 /// — 4096 by default — and is consulted here so the guard-rail can
82 /// be relaxed per tenant via [`Database::open_with_config`] without
83 /// touching the static constant.
84 ///
85 /// Dimension `0` is accepted because it is the sentinel used by
86 /// metadata-only and graph-without-embeddings collections. Callers
87 /// that need to reject zero should do so upstream via
88 /// [`crate::validation::validate_dimension`].
89 pub(super) fn enforce_vector_dimension_limit(&self, dimension: usize) -> Result<()> {
90 if dimension == 0 {
91 return Ok(());
92 }
93 let cap = self.config.limits.max_dimensions;
94 if dimension > cap {
95 return Err(Error::GuardRail(format!(
96 "vector dimension {dimension} exceeds configured max_dimensions cap of {cap}; \
97 raise `limits.max_dimensions` in VelesConfig to allow larger vectors"
98 )));
99 }
100 Ok(())
101 }
102
103 /// Creates a new collection with the specified parameters.
104 ///
105 /// # Arguments
106 ///
107 /// * `name` - Unique name for the collection
108 /// * `dimension` - Vector dimension (e.g., 768 for many embedding models)
109 /// * `metric` - Distance metric to use for similarity calculations
110 ///
111 /// # Errors
112 ///
113 /// - Returns `Error::CollectionExists` if a collection with the same name already exists.
114 /// - Returns an error if the directory cannot be created or storage initialization fails.
115 ///
116 /// # Examples
117 ///
118 /// ```rust,no_run
119 /// # use velesdb_core::{Database, DistanceMetric};
120 /// let db = Database::open("./data")?;
121 /// db.create_collection("documents", 768, DistanceMetric::Cosine)?;
122 /// # Ok::<(), velesdb_core::Error>(())
123 /// ```
124 pub fn create_collection(
125 &self,
126 name: &str,
127 dimension: usize,
128 metric: DistanceMetric,
129 ) -> Result<()> {
130 self.create_collection_with_options(name, dimension, metric, StorageMode::default())
131 }
132
133 /// Creates a new collection with custom storage options.
134 ///
135 /// # Errors
136 ///
137 /// Returns an error if a collection with the same name already exists.
138 pub fn create_collection_with_options(
139 &self,
140 name: &str,
141 dimension: usize,
142 metric: DistanceMetric,
143 storage_mode: StorageMode,
144 ) -> Result<()> {
145 self.create_vector_collection_with_options(name, dimension, metric, storage_mode)
146 }
147
148 /// Returns a type-erased collection handle by name.
149 ///
150 /// Checks vector → graph → metadata registries in order.
151 /// Returns `None` if no collection with the given name exists.
152 #[must_use]
153 pub fn get_any_collection(&self, name: &str) -> Option<AnyCollection> {
154 if let Some(c) = self.get_vector_collection(name) {
155 return Some(AnyCollection::Vector(c));
156 }
157 if let Some(c) = self.get_graph_collection(name) {
158 return Some(AnyCollection::Graph(c));
159 }
160 if let Some(c) = self.get_metadata_collection(name) {
161 return Some(AnyCollection::Metadata(c));
162 }
163 None
164 }
165
166 /// Returns the write generation for a named collection, if it exists.
167 #[must_use]
168 pub fn collection_write_generation(&self, name: &str) -> Option<u64> {
169 if let Some(vc) = self.vector_colls.read().get(name) {
170 return Some(vc.inner.write_generation());
171 }
172 if let Some(gc) = self.graph_colls.read().get(name) {
173 return Some(gc.inner.write_generation());
174 }
175 if let Some(mc) = self.metadata_colls.read().get(name) {
176 return Some(mc.inner.write_generation());
177 }
178 None
179 }
180
181 /// Returns the set of payload field names covered by a secondary index
182 /// for the named collection (issue #607). Empty set when the collection
183 /// has no indexes or does not exist.
184 ///
185 /// Used by `Database::build_plan_with_stats` to thread the real
186 /// indexed-field set into `QueryPlan::from_query_with_stats` so that
187 /// `IndexLookup` plan nodes are generated in the EXPLAIN tree when a
188 /// WHERE clause targets an indexed column.
189 #[must_use]
190 pub fn indexed_fields_for(&self, name: &str) -> std::collections::HashSet<String> {
191 if let Some(vc) = self.vector_colls.read().get(name) {
192 return vc.inner.indexed_field_names();
193 }
194 if let Some(gc) = self.graph_colls.read().get(name) {
195 return gc.inner.indexed_field_names();
196 }
197 if let Some(mc) = self.metadata_colls.read().get(name) {
198 return mc.inner.indexed_field_names();
199 }
200 std::collections::HashSet::new()
201 }
202
203 /// Returns the analyze generation for a named collection, if it exists
204 /// (issue #608).
205 ///
206 /// Parallel to [`Self::collection_write_generation`], but tracks `ANALYZE`
207 /// invocations instead of data mutations. Threaded into the compiled plan
208 /// cache key so that an `ANALYZE` run alone invalidates cached plans whose
209 /// cost estimates pre-date the fresh calibrated statistics.
210 #[must_use]
211 pub fn collection_analyze_generation(&self, name: &str) -> Option<u64> {
212 if let Some(vc) = self.vector_colls.read().get(name) {
213 return Some(vc.inner.analyze_generation());
214 }
215 if let Some(gc) = self.graph_colls.read().get(name) {
216 return Some(gc.inner.analyze_generation());
217 }
218 if let Some(mc) = self.metadata_colls.read().get(name) {
219 return Some(mc.inner.analyze_generation());
220 }
221 None
222 }
223
224 /// Lists all collection names in the database.
225 ///
226 /// Includes collections created via any typed API (vector, graph, metadata).
227 pub fn list_collections(&self) -> Vec<String> {
228 let vector_colls = self.vector_colls.read();
229 let graph_colls = self.graph_colls.read();
230 let metadata_colls = self.metadata_colls.read();
231
232 let mut names: std::collections::HashSet<String> = vector_colls.keys().cloned().collect();
233 for k in graph_colls.keys() {
234 names.insert(k.clone());
235 }
236 for k in metadata_colls.keys() {
237 names.insert(k.clone());
238 }
239 let mut result: Vec<String> = names.into_iter().collect();
240 result.sort();
241 result
242 }
243
244 /// Deletes a collection by name.
245 ///
246 /// # Errors
247 ///
248 /// Returns an error if the name is invalid or the collection does not
249 /// exist in any registry.
250 pub fn delete_collection(&self, name: &str) -> Result<()> {
251 crate::validation::validate_collection_name(name)?;
252
253 if !self.collection_exists_in_registry(name) {
254 return Err(Error::CollectionNotFound(name.to_string()));
255 }
256
257 let collection_path = self.data_dir.join(name);
258 if collection_path.exists() {
259 std::fs::remove_dir_all(&collection_path)?;
260 }
261
262 self.remove_from_all_registries(name);
263
264 if let Some(ref obs) = self.observer {
265 obs.on_collection_deleted(name);
266 }
267
268 self.schema_version
269 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
270
271 Ok(())
272 }
273
274 /// Removes a collection from all registries and stats cache.
275 fn remove_from_all_registries(&self, name: &str) {
276 self.vector_colls.write().remove(name);
277 self.graph_colls.write().remove(name);
278 self.metadata_colls.write().remove(name);
279 self.collection_stats.write().remove(name);
280 }
281
282 /// Creates a new collection with a specific type (Vector, Graph, or `MetadataOnly`).
283 ///
284 /// # Errors
285 ///
286 /// Returns an error if a collection with the same name already exists.
287 pub fn create_collection_typed(
288 &self,
289 name: &str,
290 collection_type: &CollectionType,
291 ) -> Result<()> {
292 match collection_type {
293 CollectionType::Vector {
294 dimension,
295 metric,
296 storage_mode,
297 } => {
298 self.create_vector_collection_with_options(name, *dimension, *metric, *storage_mode)
299 }
300 CollectionType::MetadataOnly => self.create_metadata_collection(name),
301 CollectionType::Graph {
302 dimension,
303 metric,
304 schema,
305 } => self.create_graph_collection_from_type(name, *dimension, *metric, schema),
306 }
307 }
308
309 /// Reads and parses `config.json` from a collection directory.
310 ///
311 /// Returns `None` if the name is invalid, the config file does not exist,
312 /// or the config cannot be parsed.
313 pub(super) fn read_collection_config(
314 &self,
315 name: &str,
316 ) -> Option<crate::collection::CollectionConfig> {
317 if crate::validation::validate_collection_name(name).is_err() {
318 return None;
319 }
320 let path = self.data_dir.join(name);
321 let config_path = path.join("config.json");
322 if !config_path.exists() {
323 return None;
324 }
325 let data = std::fs::read_to_string(&config_path).ok()?;
326 serde_json::from_str(&data).ok()
327 }
328
329 /// Propagates updated query limits to all active collections.
330 pub fn update_guardrails(&self, limits: &crate::guardrails::QueryLimits) {
331 for vc in self.vector_colls.read().values() {
332 vc.guard_rails().update_limits(limits);
333 }
334 for gc in self.graph_colls.read().values() {
335 gc.inner.guard_rails().update_limits(limits);
336 }
337 for mc in self.metadata_colls.read().values() {
338 mc.inner.guard_rails().update_limits(limits);
339 }
340 }
341
342 /// Returns diagnostics for a named collection.
343 ///
344 /// # Errors
345 ///
346 /// Returns `Error::CollectionNotFound` if the collection does not exist.
347 pub fn collection_diagnostics(
348 &self,
349 name: &str,
350 ) -> Result<crate::collection::CollectionDiagnostics> {
351 if let Some(c) = self.get_vector_collection(name) {
352 return Ok(c.diagnostics());
353 }
354 if let Some(c) = self.get_graph_collection(name) {
355 return Ok(c.diagnostics());
356 }
357 if let Some(c) = self.get_metadata_collection(name) {
358 return Ok(c.diagnostics());
359 }
360 Err(Error::CollectionNotFound(name.to_string()))
361 }
362}