Skip to main content

velesdb_core/
database.rs

1//! Database facade and orchestration layer for collection lifecycle and query routing.
2
3use crate::simd_dispatch;
4use crate::{
5    Collection, CollectionType, ColumnStore, DistanceMetric, Error, Result, SearchResult,
6    StorageMode,
7};
8
/// Top-level handle that owns every collection stored under a single data directory.
///
/// Only compiled when the `persistence` feature is enabled.
#[cfg(feature = "persistence")]
pub struct Database {
    /// Root directory on disk; each collection is stored in a sub-directory named after it.
    data_dir: std::path::PathBuf,
    /// Registry of the collections known to this instance, keyed by collection name.
    collections: parking_lot::RwLock<std::collections::HashMap<String, Collection>>,
    /// Per-collection statistics cache (keyed by collection name) used for CBO planning.
    collection_stats: parking_lot::RwLock<
        std::collections::HashMap<String, crate::collection::stats::CollectionStats>,
    >,
}
21
22#[cfg(feature = "persistence")]
23impl Database {
24    /// Opens or creates a database at the specified path.
25    ///
26    /// # Arguments
27    ///
28    /// * `path` - Path to the data directory
29    ///
30    /// # Errors
31    ///
32    /// Returns an error if the directory cannot be created or accessed.
33    pub fn open<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
34        let data_dir = path.as_ref().to_path_buf();
35        std::fs::create_dir_all(&data_dir)?;
36
37        // Log SIMD features detected at startup
38        let features = simd_dispatch::simd_features_info();
39        tracing::info!(
40            avx512 = features.avx512f,
41            avx2 = features.avx2,
42            "SIMD features detected - direct dispatch enabled"
43        );
44
45        Ok(Self {
46            data_dir,
47            collections: parking_lot::RwLock::new(std::collections::HashMap::new()),
48            collection_stats: parking_lot::RwLock::new(std::collections::HashMap::new()),
49        })
50    }
51
52    /// Creates a new collection with the specified parameters.
53    ///
54    /// # Arguments
55    ///
56    /// * `name` - Unique name for the collection
57    /// * `dimension` - Vector dimension (e.g., 768 for many embedding models)
58    /// * `metric` - Distance metric to use for similarity calculations
59    ///
60    /// # Errors
61    ///
62    /// Returns an error if a collection with the same name already exists.
63    pub fn create_collection(
64        &self,
65        name: &str,
66        dimension: usize,
67        metric: DistanceMetric,
68    ) -> Result<()> {
69        self.create_collection_with_options(name, dimension, metric, StorageMode::default())
70    }
71
72    /// Creates a new collection with custom storage options.
73    ///
74    /// # Arguments
75    ///
76    /// * `name` - Unique name for the collection
77    /// * `dimension` - Vector dimension
78    /// * `metric` - Distance metric
79    /// * `storage_mode` - Vector storage mode (Full, SQ8, Binary)
80    ///
81    /// # Errors
82    ///
83    /// Returns an error if a collection with the same name already exists.
84    pub fn create_collection_with_options(
85        &self,
86        name: &str,
87        dimension: usize,
88        metric: DistanceMetric,
89        storage_mode: StorageMode,
90    ) -> Result<()> {
91        let mut collections = self.collections.write();
92
93        if collections.contains_key(name) {
94            return Err(Error::CollectionExists(name.to_string()));
95        }
96
97        let collection_path = self.data_dir.join(name);
98        let collection =
99            Collection::create_with_options(collection_path, dimension, metric, storage_mode)?;
100        collections.insert(name.to_string(), collection);
101
102        Ok(())
103    }
104
105    /// Gets a reference to a collection by name.
106    ///
107    /// # Arguments
108    ///
109    /// * `name` - Name of the collection
110    ///
111    /// # Returns
112    ///
113    /// Returns `None` if the collection does not exist.
114    pub fn get_collection(&self, name: &str) -> Option<Collection> {
115        self.collections.read().get(name).cloned()
116    }
117
118    /// Analyzes a collection, caches stats, and persists them to disk.
119    pub fn analyze_collection(
120        &self,
121        name: &str,
122    ) -> Result<crate::collection::stats::CollectionStats> {
123        let collection = self
124            .get_collection(name)
125            .ok_or_else(|| Error::CollectionNotFound(name.to_string()))?;
126        let stats = collection.analyze()?;
127
128        self.collection_stats
129            .write()
130            .insert(name.to_string(), stats.clone());
131
132        let stats_path = self.data_dir.join(name).join("collection.stats.json");
133        let serialized = serde_json::to_vec_pretty(&stats)
134            .map_err(|e| Error::Serialization(format!("failed to serialize stats: {e}")))?;
135        std::fs::write(&stats_path, serialized)?;
136
137        Ok(stats)
138    }
139
140    /// Returns cached statistics when available, loading from disk if present.
141    pub fn get_collection_stats(
142        &self,
143        name: &str,
144    ) -> Result<Option<crate::collection::stats::CollectionStats>> {
145        if let Some(stats) = self.collection_stats.read().get(name).cloned() {
146            return Ok(Some(stats));
147        }
148
149        let stats_path = self.data_dir.join(name).join("collection.stats.json");
150        if !stats_path.exists() {
151            return Ok(None);
152        }
153
154        let bytes = std::fs::read(stats_path)?;
155        let stats: crate::collection::stats::CollectionStats = serde_json::from_slice(&bytes)
156            .map_err(|e| Error::Serialization(format!("failed to parse stats: {e}")))?;
157        self.collection_stats
158            .write()
159            .insert(name.to_string(), stats.clone());
160        Ok(Some(stats))
161    }
162
163    /// Executes a `VelesQL` query with database-level JOIN resolution.
164    ///
165    /// This method resolves JOIN target collections from the database registry
166    /// and executes JOIN runtime in sequence.
167    ///
168    /// # Errors
169    ///
170    /// Returns an error if the base collection or any JOIN collection is missing.
171    pub fn execute_query(
172        &self,
173        query: &crate::velesql::Query,
174        params: &std::collections::HashMap<String, serde_json::Value>,
175    ) -> Result<Vec<SearchResult>> {
176        crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;
177
178        if let Some(dml) = query.dml.as_ref() {
179            return self.execute_dml(dml, params);
180        }
181
182        if query.is_match_query() {
183            return Err(Error::Query(
184                "Database::execute_query does not support top-level MATCH queries. Use Collection::execute_query or pass the collection name."
185                    .to_string(),
186            ));
187        }
188
189        let base_name = query.select.from.clone();
190        let base_collection = self
191            .get_collection(&base_name)
192            .ok_or_else(|| Error::CollectionNotFound(base_name.clone()))?;
193
194        if query.select.joins.is_empty() {
195            return base_collection.execute_query(query, params);
196        }
197
198        let mut base_query = query.clone();
199        base_query.select.joins.clear();
200
201        let mut results = base_collection.execute_query(&base_query, params)?;
202        for join in &query.select.joins {
203            let join_collection = self
204                .get_collection(&join.table)
205                .ok_or_else(|| Error::CollectionNotFound(join.table.clone()))?;
206            let column_store = Self::build_join_column_store(&join_collection)?;
207            let joined = crate::collection::search::query::join::execute_join(
208                &results,
209                join,
210                &column_store,
211            )?;
212            results = crate::collection::search::query::join::joined_to_search_results(joined);
213        }
214
215        Ok(results)
216    }
217
218    /// Lists all collection names in the database.
219    pub fn list_collections(&self) -> Vec<String> {
220        self.collections.read().keys().cloned().collect()
221    }
222
223    /// Deletes a collection by name.
224    ///
225    /// # Arguments
226    ///
227    /// * `name` - Name of the collection to delete
228    ///
229    /// # Errors
230    ///
231    /// Returns an error if the collection does not exist.
232    pub fn delete_collection(&self, name: &str) -> Result<()> {
233        let mut collections = self.collections.write();
234
235        if collections.remove(name).is_none() {
236            return Err(Error::CollectionNotFound(name.to_string()));
237        }
238
239        let collection_path = self.data_dir.join(name);
240        if collection_path.exists() {
241            std::fs::remove_dir_all(collection_path)?;
242        }
243
244        Ok(())
245    }
246
247    /// Creates a new collection with a specific type (Vector or `MetadataOnly`).
248    ///
249    /// # Arguments
250    ///
251    /// * `name` - Unique name for the collection
252    /// * `collection_type` - Type of collection to create
253    ///
254    /// # Errors
255    ///
256    /// Returns an error if a collection with the same name already exists.
257    ///
258    /// # Examples
259    ///
260    /// ```rust,ignore
261    /// use velesdb_core::{Database, CollectionType, DistanceMetric, StorageMode};
262    ///
263    /// let db = Database::open("./data")?;
264    ///
265    /// // Create a metadata-only collection
266    /// db.create_collection_typed("products", CollectionType::MetadataOnly)?;
267    ///
268    /// // Create a vector collection
269    /// db.create_collection_typed("embeddings", CollectionType::Vector {
270    ///     dimension: 768,
271    ///     metric: DistanceMetric::Cosine,
272    ///     storage_mode: StorageMode::Full,
273    /// })?;
274    /// ```
275    pub fn create_collection_typed(
276        &self,
277        name: &str,
278        collection_type: &CollectionType,
279    ) -> Result<()> {
280        let mut collections = self.collections.write();
281
282        if collections.contains_key(name) {
283            return Err(Error::CollectionExists(name.to_string()));
284        }
285
286        let collection_path = self.data_dir.join(name);
287        let collection = Collection::create_typed(collection_path, name, collection_type)?;
288        collections.insert(name.to_string(), collection);
289
290        Ok(())
291    }
292
293    /// Loads existing collections from disk.
294    ///
295    /// Call this after opening a database to load previously created collections.
296    ///
297    /// # Errors
298    ///
299    /// Returns an error if collection directories cannot be read.
300    pub fn load_collections(&self) -> Result<()> {
301        let mut collections = self.collections.write();
302
303        for entry in std::fs::read_dir(&self.data_dir)? {
304            let entry = entry?;
305            let path = entry.path();
306
307            if path.is_dir() {
308                let config_path = path.join("config.json");
309                if config_path.exists() {
310                    let name = path
311                        .file_name()
312                        .and_then(|n| n.to_str())
313                        .unwrap_or("unknown")
314                        .to_string();
315
316                    if let std::collections::hash_map::Entry::Vacant(entry) =
317                        collections.entry(name)
318                    {
319                        match Collection::open(path) {
320                            Ok(collection) => {
321                                entry.insert(collection);
322                            }
323                            Err(err) => {
324                                tracing::warn!(error = %err, "Failed to load collection");
325                            }
326                        }
327                    }
328                }
329            }
330        }
331
332        Ok(())
333    }
334
335    fn execute_dml(
336        &self,
337        dml: &crate::velesql::DmlStatement,
338        params: &std::collections::HashMap<String, serde_json::Value>,
339    ) -> Result<Vec<SearchResult>> {
340        match dml {
341            crate::velesql::DmlStatement::Insert(stmt) => self.execute_insert(stmt, params),
342            crate::velesql::DmlStatement::Update(stmt) => self.execute_update(stmt, params),
343        }
344    }
345
346    fn execute_insert(
347        &self,
348        stmt: &crate::velesql::InsertStatement,
349        params: &std::collections::HashMap<String, serde_json::Value>,
350    ) -> Result<Vec<SearchResult>> {
351        let collection = self
352            .get_collection(&stmt.table)
353            .ok_or_else(|| Error::CollectionNotFound(stmt.table.clone()))?;
354
355        let mut id: Option<u64> = None;
356        let mut payload = serde_json::Map::new();
357        let mut vector: Option<Vec<f32>> = None;
358
359        for (column, value_expr) in stmt.columns.iter().zip(&stmt.values) {
360            let resolved = Self::resolve_dml_value(value_expr, params)?;
361            if column == "id" {
362                id = Some(Self::json_to_u64_id(&resolved)?);
363                continue;
364            }
365            if column == "vector" {
366                vector = Some(Self::json_to_vector(&resolved)?);
367                continue;
368            }
369            payload.insert(column.clone(), resolved);
370        }
371
372        let point_id =
373            id.ok_or_else(|| Error::Query("INSERT requires integer 'id' column".to_string()))?;
374        let point = if collection.is_metadata_only() {
375            if vector.is_some() {
376                return Err(Error::Query(
377                    "INSERT on metadata-only collection cannot set 'vector'".to_string(),
378                ));
379            }
380            crate::Point::metadata_only(point_id, serde_json::Value::Object(payload))
381        } else {
382            let vec_value = vector.ok_or_else(|| {
383                Error::Query("INSERT on vector collection requires 'vector' column".to_string())
384            })?;
385            crate::Point::new(
386                point_id,
387                vec_value,
388                Some(serde_json::Value::Object(payload)),
389            )
390        };
391
392        collection.upsert(vec![point.clone()])?;
393        Ok(vec![SearchResult::new(point, 0.0)])
394    }
395
396    fn execute_update(
397        &self,
398        stmt: &crate::velesql::UpdateStatement,
399        params: &std::collections::HashMap<String, serde_json::Value>,
400    ) -> Result<Vec<SearchResult>> {
401        let collection = self
402            .get_collection(&stmt.table)
403            .ok_or_else(|| Error::CollectionNotFound(stmt.table.clone()))?;
404
405        let assignments = stmt
406            .assignments
407            .iter()
408            .map(|a| Ok((a.column.clone(), Self::resolve_dml_value(&a.value, params)?)))
409            .collect::<Result<Vec<_>>>()?;
410
411        if assignments.iter().any(|(name, _)| name == "id") {
412            return Err(Error::Query(
413                "UPDATE cannot modify primary key column 'id'".to_string(),
414            ));
415        }
416
417        let all_ids = collection.all_ids();
418        let rows = collection.get(&all_ids);
419        let filter = Self::build_update_filter(stmt.where_clause.as_ref())?;
420
421        let mut updated_points = Vec::new();
422        for point in rows.into_iter().flatten() {
423            if !Self::matches_update_filter(&point, filter.as_ref()) {
424                continue;
425            }
426
427            let mut payload_map = point
428                .payload
429                .as_ref()
430                .and_then(serde_json::Value::as_object)
431                .cloned()
432                .unwrap_or_default();
433
434            let mut updated_vector = point.vector.clone();
435
436            for (field, value) in &assignments {
437                if field == "vector" {
438                    if collection.is_metadata_only() {
439                        return Err(Error::Query(
440                            "UPDATE on metadata-only collection cannot set 'vector'".to_string(),
441                        ));
442                    }
443                    updated_vector = Self::json_to_vector(value)?;
444                } else {
445                    payload_map.insert(field.clone(), value.clone());
446                }
447            }
448
449            let updated = if collection.is_metadata_only() {
450                crate::Point::metadata_only(point.id, serde_json::Value::Object(payload_map))
451            } else {
452                crate::Point::new(
453                    point.id,
454                    updated_vector,
455                    Some(serde_json::Value::Object(payload_map)),
456                )
457            };
458            updated_points.push(updated);
459        }
460
461        if updated_points.is_empty() {
462            return Ok(Vec::new());
463        }
464
465        collection.upsert(updated_points.clone())?;
466        Ok(updated_points
467            .into_iter()
468            .map(|p| SearchResult::new(p, 0.0))
469            .collect())
470    }
471}
472
473#[cfg(feature = "persistence")]
474mod database_helpers;
475
476#[cfg(all(test, feature = "persistence"))]
477mod database_tests;