Skip to main content

velesdb_core/database/
collection_ops.rs

1//! Collection CRUD dispatcher: create, delete, list, get, and diagnostics.
2//!
3//! Type-specific operations are in sibling modules:
4//! - [`vector_ops`] — vector collection create/get
5//! - [`graph_ops`] — graph collection create/get
6//! - [`metadata_ops`] — metadata-only collection create/get
7
8use crate::collection::AnyCollection;
9use crate::{CollectionType, DistanceMetric, Error, Result, StorageMode};
10
11use super::Database;
12
13impl Database {
14    /// Ensures a collection name is valid, free in memory, and free on disk.
15    ///
16    /// Validates the name against path traversal and forbidden characters
17    /// **before** any filesystem operation, then checks that no collection
18    /// with the same name already exists in any registry or on disk, and
19    /// finally enforces the `LimitsConfig::max_collections` cap so that
20    /// callers are refused cleanly instead of filling the registry past
21    /// the configured ceiling.
22    pub(super) fn ensure_collection_name_available(&self, name: &str) -> Result<()> {
23        crate::validation::validate_collection_name(name)?;
24
25        if self.collection_exists_in_registry(name) {
26            return Err(Error::CollectionExists(name.to_string()));
27        }
28
29        let collection_path = self.data_dir.join(name);
30        if collection_path.exists() {
31            return Err(Error::CollectionExists(name.to_string()));
32        }
33
34        // Wave 3 Commit 7 — enforce `LimitsConfig::max_collections`.
35        //
36        // Counted across every typed registry (vector + graph + metadata)
37        // because the limit is tenant-wide, not per-type. Evaluated after
38        // the name validation and duplicate checks so the typed error
39        // precedence stays unchanged: invalid name and duplicate still
40        // win over the cap — callers that want to detect "too many
41        // collections" specifically rely on the `GuardRail` variant.
42        let total_collections = self.vector_colls.read().len()
43            + self.graph_colls.read().len()
44            + self.metadata_colls.read().len();
45        let cap = self.config.limits.max_collections;
46        if total_collections >= cap {
47            return Err(Error::GuardRail(format!(
48                "max_collections limit reached ({total_collections} / {cap}); \
49                 raise `limits.max_collections` in VelesConfig to create more"
50            )));
51        }
52
53        Ok(())
54    }
55
56    /// Checks whether a collection name exists in any of the typed registries.
57    fn collection_exists_in_registry(&self, name: &str) -> bool {
58        self.vector_colls.read().contains_key(name)
59            || self.graph_colls.read().contains_key(name)
60            || self.metadata_colls.read().contains_key(name)
61    }
62
63    /// Enforces `LimitsConfig::max_dimensions` on a prospective vector
64    /// collection creation.
65    ///
66    /// Complements [`crate::validation::validate_dimension`] (the static
67    /// `65_536` hard ceiling): the config-driven limit is typically tighter
68    /// — 4096 by default — and is consulted here so the guard-rail can
69    /// be relaxed per tenant via [`Database::open_with_config`] without
70    /// touching the static constant.
71    ///
72    /// Dimension `0` is accepted because it is the sentinel used by
73    /// metadata-only and graph-without-embeddings collections. Callers
74    /// that need to reject zero should do so upstream via
75    /// [`crate::validation::validate_dimension`].
76    pub(super) fn enforce_vector_dimension_limit(&self, dimension: usize) -> Result<()> {
77        if dimension == 0 {
78            return Ok(());
79        }
80        let cap = self.config.limits.max_dimensions;
81        if dimension > cap {
82            return Err(Error::GuardRail(format!(
83                "vector dimension {dimension} exceeds configured max_dimensions cap of {cap}; \
84                 raise `limits.max_dimensions` in VelesConfig to allow larger vectors"
85            )));
86        }
87        Ok(())
88    }
89
90    /// Creates a new collection with the specified parameters.
91    ///
92    /// # Arguments
93    ///
94    /// * `name` - Unique name for the collection
95    /// * `dimension` - Vector dimension (e.g., 768 for many embedding models)
96    /// * `metric` - Distance metric to use for similarity calculations
97    ///
98    /// # Errors
99    ///
100    /// - Returns `Error::CollectionExists` if a collection with the same name already exists.
101    /// - Returns an error if the directory cannot be created or storage initialization fails.
102    ///
103    /// # Examples
104    ///
105    /// ```rust,no_run
106    /// # use velesdb_core::{Database, DistanceMetric};
107    /// let db = Database::open("./data")?;
108    /// db.create_collection("documents", 768, DistanceMetric::Cosine)?;
109    /// # Ok::<(), velesdb_core::Error>(())
110    /// ```
111    pub fn create_collection(
112        &self,
113        name: &str,
114        dimension: usize,
115        metric: DistanceMetric,
116    ) -> Result<()> {
117        self.create_collection_with_options(name, dimension, metric, StorageMode::default())
118    }
119
120    /// Creates a new collection with custom storage options.
121    ///
122    /// # Errors
123    ///
124    /// Returns an error if a collection with the same name already exists.
125    pub fn create_collection_with_options(
126        &self,
127        name: &str,
128        dimension: usize,
129        metric: DistanceMetric,
130        storage_mode: StorageMode,
131    ) -> Result<()> {
132        self.create_vector_collection_with_options(name, dimension, metric, storage_mode)
133    }
134
135    /// Returns a type-erased collection handle by name.
136    ///
137    /// Checks vector → graph → metadata registries in order.
138    /// Returns `None` if no collection with the given name exists.
139    #[must_use]
140    pub fn get_any_collection(&self, name: &str) -> Option<AnyCollection> {
141        if let Some(c) = self.get_vector_collection(name) {
142            return Some(AnyCollection::Vector(c));
143        }
144        if let Some(c) = self.get_graph_collection(name) {
145            return Some(AnyCollection::Graph(c));
146        }
147        if let Some(c) = self.get_metadata_collection(name) {
148            return Some(AnyCollection::Metadata(c));
149        }
150        None
151    }
152
153    /// Returns the write generation for a named collection, if it exists.
154    #[must_use]
155    pub fn collection_write_generation(&self, name: &str) -> Option<u64> {
156        if let Some(vc) = self.vector_colls.read().get(name) {
157            return Some(vc.inner.write_generation());
158        }
159        if let Some(gc) = self.graph_colls.read().get(name) {
160            return Some(gc.inner.write_generation());
161        }
162        if let Some(mc) = self.metadata_colls.read().get(name) {
163            return Some(mc.inner.write_generation());
164        }
165        None
166    }
167
168    /// Returns the set of payload field names covered by a secondary index
169    /// for the named collection (issue #607). Empty set when the collection
170    /// has no indexes or does not exist.
171    ///
172    /// Used by `Database::build_plan_with_stats` to thread the real
173    /// indexed-field set into `QueryPlan::from_query_with_stats` so that
174    /// `IndexLookup` plan nodes are generated in the EXPLAIN tree when a
175    /// WHERE clause targets an indexed column.
176    #[must_use]
177    pub fn indexed_fields_for(&self, name: &str) -> std::collections::HashSet<String> {
178        if let Some(vc) = self.vector_colls.read().get(name) {
179            return vc.inner.indexed_field_names();
180        }
181        if let Some(gc) = self.graph_colls.read().get(name) {
182            return gc.inner.indexed_field_names();
183        }
184        if let Some(mc) = self.metadata_colls.read().get(name) {
185            return mc.inner.indexed_field_names();
186        }
187        std::collections::HashSet::new()
188    }
189
190    /// Returns the analyze generation for a named collection, if it exists
191    /// (issue #608).
192    ///
193    /// Parallel to [`Self::collection_write_generation`], but tracks `ANALYZE`
194    /// invocations instead of data mutations. Threaded into the compiled plan
195    /// cache key so that an `ANALYZE` run alone invalidates cached plans whose
196    /// cost estimates pre-date the fresh calibrated statistics.
197    #[must_use]
198    pub fn collection_analyze_generation(&self, name: &str) -> Option<u64> {
199        if let Some(vc) = self.vector_colls.read().get(name) {
200            return Some(vc.inner.analyze_generation());
201        }
202        if let Some(gc) = self.graph_colls.read().get(name) {
203            return Some(gc.inner.analyze_generation());
204        }
205        if let Some(mc) = self.metadata_colls.read().get(name) {
206            return Some(mc.inner.analyze_generation());
207        }
208        None
209    }
210
211    /// Lists all collection names in the database.
212    ///
213    /// Includes collections created via any typed API (vector, graph, metadata).
214    pub fn list_collections(&self) -> Vec<String> {
215        let vector_colls = self.vector_colls.read();
216        let graph_colls = self.graph_colls.read();
217        let metadata_colls = self.metadata_colls.read();
218
219        let mut names: std::collections::HashSet<String> = vector_colls.keys().cloned().collect();
220        for k in graph_colls.keys() {
221            names.insert(k.clone());
222        }
223        for k in metadata_colls.keys() {
224            names.insert(k.clone());
225        }
226        let mut result: Vec<String> = names.into_iter().collect();
227        result.sort();
228        result
229    }
230
231    /// Deletes a collection by name.
232    ///
233    /// # Errors
234    ///
235    /// Returns an error if the name is invalid or the collection does not
236    /// exist in any registry.
237    pub fn delete_collection(&self, name: &str) -> Result<()> {
238        crate::validation::validate_collection_name(name)?;
239
240        if !self.collection_exists_in_registry(name) {
241            return Err(Error::CollectionNotFound(name.to_string()));
242        }
243
244        let collection_path = self.data_dir.join(name);
245        if collection_path.exists() {
246            std::fs::remove_dir_all(&collection_path)?;
247        }
248
249        self.remove_from_all_registries(name);
250
251        if let Some(ref obs) = self.observer {
252            obs.on_collection_deleted(name);
253        }
254
255        self.schema_version
256            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
257
258        Ok(())
259    }
260
261    /// Removes a collection from all registries and stats cache.
262    fn remove_from_all_registries(&self, name: &str) {
263        self.vector_colls.write().remove(name);
264        self.graph_colls.write().remove(name);
265        self.metadata_colls.write().remove(name);
266        self.collection_stats.write().remove(name);
267    }
268
269    /// Creates a new collection with a specific type (Vector, Graph, or `MetadataOnly`).
270    ///
271    /// # Errors
272    ///
273    /// Returns an error if a collection with the same name already exists.
274    pub fn create_collection_typed(
275        &self,
276        name: &str,
277        collection_type: &CollectionType,
278    ) -> Result<()> {
279        match collection_type {
280            CollectionType::Vector {
281                dimension,
282                metric,
283                storage_mode,
284            } => {
285                self.create_vector_collection_with_options(name, *dimension, *metric, *storage_mode)
286            }
287            CollectionType::MetadataOnly => self.create_metadata_collection(name),
288            CollectionType::Graph {
289                dimension,
290                metric,
291                schema,
292            } => self.create_graph_collection_from_type(name, *dimension, *metric, schema),
293        }
294    }
295
296    /// Reads and parses `config.json` from a collection directory.
297    ///
298    /// Returns `None` if the name is invalid, the config file does not exist,
299    /// or the config cannot be parsed.
300    pub(super) fn read_collection_config(
301        &self,
302        name: &str,
303    ) -> Option<crate::collection::CollectionConfig> {
304        if crate::validation::validate_collection_name(name).is_err() {
305            return None;
306        }
307        let path = self.data_dir.join(name);
308        let config_path = path.join("config.json");
309        if !config_path.exists() {
310            return None;
311        }
312        let data = std::fs::read_to_string(&config_path).ok()?;
313        serde_json::from_str(&data).ok()
314    }
315
316    /// Propagates updated query limits to all active collections.
317    pub fn update_guardrails(&self, limits: &crate::guardrails::QueryLimits) {
318        for vc in self.vector_colls.read().values() {
319            vc.guard_rails().update_limits(limits);
320        }
321        for gc in self.graph_colls.read().values() {
322            gc.inner.guard_rails().update_limits(limits);
323        }
324        for mc in self.metadata_colls.read().values() {
325            mc.inner.guard_rails().update_limits(limits);
326        }
327    }
328
329    /// Returns diagnostics for a named collection.
330    ///
331    /// # Errors
332    ///
333    /// Returns `Error::CollectionNotFound` if the collection does not exist.
334    pub fn collection_diagnostics(
335        &self,
336        name: &str,
337    ) -> Result<crate::collection::CollectionDiagnostics> {
338        if let Some(c) = self.get_vector_collection(name) {
339            return Ok(c.diagnostics());
340        }
341        if let Some(c) = self.get_graph_collection(name) {
342            return Ok(c.diagnostics());
343        }
344        if let Some(c) = self.get_metadata_collection(name) {
345            return Ok(c.diagnostics());
346        }
347        Err(Error::CollectionNotFound(name.to_string()))
348    }
349}