1use crate::simd_dispatch;
4use crate::{
5 Collection, CollectionType, ColumnStore, DistanceMetric, Error, Result, SearchResult,
6 StorageMode,
7};
8
/// Database handle over a root directory that holds one subdirectory per
/// collection (`data_dir/<collection name>`).
///
/// Open collections and cached collection statistics are kept in memory
/// behind `parking_lot` read/write locks.
#[cfg(feature = "persistence")]
pub struct Database {
    /// Root directory; every collection is stored under `data_dir.join(name)`.
    data_dir: std::path::PathBuf,
    /// Registry of open collections, keyed by collection name.
    collections: parking_lot::RwLock<std::collections::HashMap<String, Collection>>,
    /// Cache of analysis results, keyed by collection name. Filled by
    /// `analyze_collection` and lazily from each collection's
    /// `collection.stats.json` by `get_collection_stats`.
    collection_stats: parking_lot::RwLock<
        std::collections::HashMap<String, crate::collection::stats::CollectionStats>,
    >,
}
21
#[cfg(feature = "persistence")]
impl Database {
    /// Opens (or initializes) a database rooted at `path`.
    ///
    /// The directory is created if it does not exist, and the detected SIMD
    /// features are logged. The returned handle starts with no collections
    /// registered; call [`Database::load_collections`] to register the
    /// collections already stored under `path`.
    ///
    /// # Errors
    ///
    /// Returns an error if the data directory cannot be created.
    pub fn open<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
        let data_dir = path.as_ref().to_path_buf();
        std::fs::create_dir_all(&data_dir)?;

        let features = simd_dispatch::simd_features_info();
        tracing::info!(
            avx512 = features.avx512f,
            avx2 = features.avx2,
            "SIMD features detected - direct dispatch enabled"
        );

        Ok(Self {
            data_dir,
            collections: parking_lot::RwLock::new(std::collections::HashMap::new()),
            collection_stats: parking_lot::RwLock::new(std::collections::HashMap::new()),
        })
    }

    /// Creates a vector collection named `name` using the default
    /// [`StorageMode`].
    ///
    /// # Errors
    ///
    /// Same as [`Database::create_collection_with_options`].
    pub fn create_collection(
        &self,
        name: &str,
        dimension: usize,
        metric: DistanceMetric,
    ) -> Result<()> {
        self.create_collection_with_options(name, dimension, metric, StorageMode::default())
    }

    /// Creates a vector collection named `name` stored in `data_dir/<name>`.
    ///
    /// # Errors
    ///
    /// - An I/O `InvalidInput` error if `name` is not a single plain path
    ///   component (empty, `.`, `..`, or containing `/` or `\`).
    /// - [`Error::CollectionExists`] if a collection with that name is already
    ///   registered.
    /// - Any error from `Collection::create_with_options`.
    pub fn create_collection_with_options(
        &self,
        name: &str,
        dimension: usize,
        metric: DistanceMetric,
        storage_mode: StorageMode,
    ) -> Result<()> {
        Self::validate_collection_dir_name(name)?;

        // Hold the write lock across creation so two concurrent creates of the
        // same name cannot both pass the existence check.
        let mut collections = self.collections.write();

        if collections.contains_key(name) {
            return Err(Error::CollectionExists(name.to_string()));
        }

        let collection_path = self.data_dir.join(name);
        let collection =
            Collection::create_with_options(collection_path, dimension, metric, storage_mode)?;
        collections.insert(name.to_string(), collection);

        Ok(())
    }

    /// Returns a clone of the registered collection named `name`, or `None`.
    ///
    /// NOTE(review): whether the clone shares state with the registered
    /// instance depends on `Collection`'s `Clone` impl, which is defined
    /// elsewhere.
    pub fn get_collection(&self, name: &str) -> Option<Collection> {
        self.collections.read().get(name).cloned()
    }

    /// Runs `Collection::analyze` on `name` and persists the result.
    ///
    /// The stats are written to `data_dir/<name>/collection.stats.json` and
    /// then cached in memory.
    ///
    /// # Errors
    ///
    /// [`Error::CollectionNotFound`] for an unknown collection, analysis
    /// errors, [`Error::Serialization`] if the stats cannot be encoded, and I/O
    /// errors from writing the stats file.
    pub fn analyze_collection(
        &self,
        name: &str,
    ) -> Result<crate::collection::stats::CollectionStats> {
        let collection = self
            .get_collection(name)
            .ok_or_else(|| Error::CollectionNotFound(name.to_string()))?;
        let stats = collection.analyze()?;

        // Persist before caching so a failed write never leaves the in-memory
        // cache advertising stats that are not on disk.
        let stats_path = self.data_dir.join(name).join("collection.stats.json");
        let serialized = serde_json::to_vec_pretty(&stats)
            .map_err(|e| Error::Serialization(format!("failed to serialize stats: {e}")))?;
        std::fs::write(&stats_path, serialized)?;

        self.collection_stats
            .write()
            .insert(name.to_string(), stats.clone());

        Ok(stats)
    }

    /// Returns the most recent analysis results for `name`, if any.
    ///
    /// The in-memory cache is consulted first. On a miss, the function falls
    /// back to `data_dir/<name>/collection.stats.json` and caches what it
    /// reads. Returns `Ok(None)` when no stats exist or `name` cannot be a
    /// collection name.
    ///
    /// # Errors
    ///
    /// I/O errors other than "not found" while reading the stats file, and
    /// [`Error::Serialization`] if the file cannot be parsed.
    pub fn get_collection_stats(
        &self,
        name: &str,
    ) -> Result<Option<crate::collection::stats::CollectionStats>> {
        if let Some(stats) = self.collection_stats.read().get(name).cloned() {
            return Ok(Some(stats));
        }

        // A name that cannot be a collection directory has no stats; do not
        // let it steer the file read outside `data_dir`.
        if Self::validate_collection_dir_name(name).is_err() {
            return Ok(None);
        }

        let stats_path = self.data_dir.join(name).join("collection.stats.json");
        // Read directly and map NotFound to `None` instead of an
        // `exists()`-then-read check, which races with a concurrent delete.
        let bytes = match std::fs::read(&stats_path) {
            Ok(bytes) => bytes,
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
            Err(err) => return Err(err.into()),
        };
        let stats: crate::collection::stats::CollectionStats = serde_json::from_slice(&bytes)
            .map_err(|e| Error::Serialization(format!("failed to parse stats: {e}")))?;

        // If another thread cached stats while the file was being read (e.g.
        // `analyze_collection`), keep that value rather than overwrite it.
        let mut cache = self.collection_stats.write();
        Ok(Some(cache.entry(name.to_string()).or_insert(stats).clone()))
    }

    /// Executes a VelesQL query against the collections of this database.
    ///
    /// Behavior by query shape:
    /// - DML statements (`INSERT` / `UPDATE`) are dispatched to the DML
    ///   executor.
    /// - Top-level `MATCH` queries are rejected.
    /// - A `SELECT` without joins is delegated to the `FROM` collection.
    /// - A `SELECT` with joins first runs the base query with its joins
    ///   removed, then applies each join, in declaration order, to the
    ///   accumulated results.
    ///
    /// # Errors
    ///
    /// [`Error::Query`] for validation failures and `MATCH` queries,
    /// [`Error::CollectionNotFound`] for a missing base or join collection,
    /// plus any error from collection execution or join evaluation.
    pub fn execute_query(
        &self,
        query: &crate::velesql::Query,
        params: &std::collections::HashMap<String, serde_json::Value>,
    ) -> Result<Vec<SearchResult>> {
        crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;

        if let Some(dml) = query.dml.as_ref() {
            return self.execute_dml(dml, params);
        }

        if query.is_match_query() {
            return Err(Error::Query(
                "Database::execute_query does not support top-level MATCH queries. Use Collection::execute_query or pass the collection name."
                    .to_string(),
            ));
        }

        let base_name = &query.select.from;
        let base_collection = self
            .get_collection(base_name)
            .ok_or_else(|| Error::CollectionNotFound(base_name.clone()))?;

        if query.select.joins.is_empty() {
            return base_collection.execute_query(query, params);
        }

        // The base collection cannot resolve joins itself, so strip them and
        // apply them here one at a time.
        let mut base_query = query.clone();
        base_query.select.joins.clear();

        let mut results = base_collection.execute_query(&base_query, params)?;
        for join in &query.select.joins {
            let join_collection = self
                .get_collection(&join.table)
                .ok_or_else(|| Error::CollectionNotFound(join.table.clone()))?;
            let column_store = Self::build_join_column_store(&join_collection)?;
            let joined = crate::collection::search::query::join::execute_join(
                &results,
                join,
                &column_store,
            )?;
            results = crate::collection::search::query::join::joined_to_search_results(joined);
        }

        Ok(results)
    }

    /// Returns the names of all registered collections, in arbitrary
    /// (hash-map) order.
    pub fn list_collections(&self) -> Vec<String> {
        self.collections.read().keys().cloned().collect()
    }

    /// Unregisters collection `name`, drops its cached stats, and removes
    /// `data_dir/<name>` from disk.
    ///
    /// # Errors
    ///
    /// [`Error::CollectionNotFound`] if no such collection is registered, or
    /// an I/O error from removing the directory. In the I/O case the
    /// collection has already been unregistered.
    pub fn delete_collection(&self, name: &str) -> Result<()> {
        let mut collections = self.collections.write();

        if collections.remove(name).is_none() {
            return Err(Error::CollectionNotFound(name.to_string()));
        }

        // Without this, get_collection_stats would keep serving the deleted
        // collection's stats, including to a later collection of the same name.
        self.collection_stats.write().remove(name);

        let collection_path = self.data_dir.join(name);
        if collection_path.exists() {
            std::fs::remove_dir_all(collection_path)?;
        }

        Ok(())
    }

    /// Creates a collection of the given [`CollectionType`] in
    /// `data_dir/<name>`.
    ///
    /// # Errors
    ///
    /// - An I/O `InvalidInput` error if `name` is not a single plain path
    ///   component.
    /// - [`Error::CollectionExists`] if the name is already registered.
    /// - Any error from `Collection::create_typed`.
    pub fn create_collection_typed(
        &self,
        name: &str,
        collection_type: &CollectionType,
    ) -> Result<()> {
        Self::validate_collection_dir_name(name)?;

        let mut collections = self.collections.write();

        if collections.contains_key(name) {
            return Err(Error::CollectionExists(name.to_string()));
        }

        let collection_path = self.data_dir.join(name);
        let collection = Collection::create_typed(collection_path, name, collection_type)?;
        collections.insert(name.to_string(), collection);

        Ok(())
    }

    /// Registers every collection found on disk under `data_dir`.
    ///
    /// A subdirectory counts as a collection when it contains `config.json`.
    /// The collection is registered under the directory's name. Names that
    /// are already registered are left untouched. Best-effort behavior:
    /// collections that fail to open, and directories whose names are not
    /// valid UTF-8, are logged and skipped.
    ///
    /// # Errors
    ///
    /// I/O errors from listing `data_dir`.
    pub fn load_collections(&self) -> Result<()> {
        let mut collections = self.collections.write();

        for dir_entry in std::fs::read_dir(&self.data_dir)? {
            let path = dir_entry?.path();

            if !path.is_dir() || !path.join("config.json").exists() {
                continue;
            }

            // Collections are addressed by `&str` names that are joined back
            // onto `data_dir`. A non-UTF-8 directory name has no such name, and
            // a placeholder would point at a different directory.
            let name = match path.file_name().and_then(|n| n.to_str()) {
                Some(name) => name.to_string(),
                None => {
                    tracing::warn!(
                        path = %path.display(),
                        "Skipping collection directory with non-UTF-8 name"
                    );
                    continue;
                }
            };

            if let std::collections::hash_map::Entry::Vacant(slot) = collections.entry(name) {
                match Collection::open(path) {
                    Ok(collection) => {
                        slot.insert(collection);
                    }
                    Err(err) => {
                        tracing::warn!(
                            collection = %slot.key(),
                            error = %err,
                            "Failed to load collection"
                        );
                    }
                }
            }
        }

        Ok(())
    }

    /// Dispatches a DML statement to its executor.
    fn execute_dml(
        &self,
        dml: &crate::velesql::DmlStatement,
        params: &std::collections::HashMap<String, serde_json::Value>,
    ) -> Result<Vec<SearchResult>> {
        match dml {
            crate::velesql::DmlStatement::Insert(stmt) => self.execute_insert(stmt, params),
            crate::velesql::DmlStatement::Update(stmt) => self.execute_update(stmt, params),
        }
    }

    /// Executes a single-row `INSERT` as an upsert.
    ///
    /// The `id` column becomes the point id, `vector` becomes the embedding,
    /// and every other column goes into the JSON payload. Returns the written
    /// point with score `0.0`.
    ///
    /// # Errors
    ///
    /// [`Error::Query`] when the column and value counts differ, `id` is
    /// missing, `vector` is set on a metadata-only collection, or `vector` is
    /// missing on a vector collection. Also propagates value-resolution and
    /// upsert errors.
    fn execute_insert(
        &self,
        stmt: &crate::velesql::InsertStatement,
        params: &std::collections::HashMap<String, serde_json::Value>,
    ) -> Result<Vec<SearchResult>> {
        let collection = self
            .get_collection(&stmt.table)
            .ok_or_else(|| Error::CollectionNotFound(stmt.table.clone()))?;

        // `zip` below would silently drop the surplus side of a mismatch.
        if stmt.columns.len() != stmt.values.len() {
            return Err(Error::Query(format!(
                "INSERT column count ({}) does not match value count ({})",
                stmt.columns.len(),
                stmt.values.len()
            )));
        }

        let mut id: Option<u64> = None;
        let mut payload = serde_json::Map::new();
        let mut vector: Option<Vec<f32>> = None;

        for (column, value_expr) in stmt.columns.iter().zip(&stmt.values) {
            let resolved = Self::resolve_dml_value(value_expr, params)?;
            if column == "id" {
                id = Some(Self::json_to_u64_id(&resolved)?);
                continue;
            }
            if column == "vector" {
                vector = Some(Self::json_to_vector(&resolved)?);
                continue;
            }
            payload.insert(column.clone(), resolved);
        }

        let point_id =
            id.ok_or_else(|| Error::Query("INSERT requires integer 'id' column".to_string()))?;
        let point = if collection.is_metadata_only() {
            if vector.is_some() {
                return Err(Error::Query(
                    "INSERT on metadata-only collection cannot set 'vector'".to_string(),
                ));
            }
            crate::Point::metadata_only(point_id, serde_json::Value::Object(payload))
        } else {
            let vec_value = vector.ok_or_else(|| {
                Error::Query("INSERT on vector collection requires 'vector' column".to_string())
            })?;
            crate::Point::new(
                point_id,
                vec_value,
                Some(serde_json::Value::Object(payload)),
            )
        };

        collection.upsert(vec![point.clone()])?;
        Ok(vec![SearchResult::new(point, 0.0)])
    }

    /// Executes an `UPDATE` over every point matching the WHERE filter.
    ///
    /// A `vector` assignment replaces the embedding; if several are given, the
    /// last one wins. All other assignments are merged into the existing
    /// payload object. Every point in the collection is loaded and filtered
    /// in memory. Returns the rewritten points with score `0.0`.
    ///
    /// # Errors
    ///
    /// [`Error::Query`] when assigning `id`, or `vector` on a metadata-only
    /// collection. These are rejected even if no row matches. Also propagates
    /// value-resolution, filter-construction, vector-parsing and upsert errors.
    fn execute_update(
        &self,
        stmt: &crate::velesql::UpdateStatement,
        params: &std::collections::HashMap<String, serde_json::Value>,
    ) -> Result<Vec<SearchResult>> {
        let collection = self
            .get_collection(&stmt.table)
            .ok_or_else(|| Error::CollectionNotFound(stmt.table.clone()))?;

        let assignments = stmt
            .assignments
            .iter()
            .map(|a| Ok((a.column.clone(), Self::resolve_dml_value(&a.value, params)?)))
            .collect::<Result<Vec<_>>>()?;

        if assignments.iter().any(|(name, _)| name == "id") {
            return Err(Error::Query(
                "UPDATE cannot modify primary key column 'id'".to_string(),
            ));
        }

        // Build the filter before loading every row, so a bad WHERE clause
        // fails fast.
        let filter = Self::build_update_filter(stmt.where_clause.as_ref())?;
        let metadata_only = collection.is_metadata_only();

        // Validate and parse the SET clause once instead of once per matching
        // row.
        let mut new_vector: Option<Vec<f32>> = None;
        let mut payload_updates = Vec::with_capacity(assignments.len());
        for (field, value) in assignments {
            if field == "vector" {
                if metadata_only {
                    return Err(Error::Query(
                        "UPDATE on metadata-only collection cannot set 'vector'".to_string(),
                    ));
                }
                new_vector = Some(Self::json_to_vector(&value)?);
            } else {
                payload_updates.push((field, value));
            }
        }

        let all_ids = collection.all_ids();
        let rows = collection.get(&all_ids);

        let mut updated_points = Vec::new();
        for mut point in rows.into_iter().flatten() {
            if !Self::matches_update_filter(&point, filter.as_ref()) {
                continue;
            }

            let mut payload_map = point
                .payload
                .as_ref()
                .and_then(serde_json::Value::as_object)
                .cloned()
                .unwrap_or_default();
            for (field, value) in &payload_updates {
                payload_map.insert(field.clone(), value.clone());
            }
            let payload = serde_json::Value::Object(payload_map);

            let updated = if metadata_only {
                crate::Point::metadata_only(point.id, payload)
            } else {
                // `point` is owned and discarded after this iteration, so its
                // stored vector can be moved instead of cloned.
                let vector = match &new_vector {
                    Some(replacement) => replacement.clone(),
                    None => std::mem::take(&mut point.vector),
                };
                crate::Point::new(point.id, vector, Some(payload))
            };
            updated_points.push(updated);
        }

        if updated_points.is_empty() {
            return Ok(Vec::new());
        }

        collection.upsert(updated_points.clone())?;
        Ok(updated_points
            .into_iter()
            .map(|p| SearchResult::new(p, 0.0))
            .collect())
    }

    /// Checks that `name` maps to exactly one directory directly under
    /// `data_dir`.
    ///
    /// Rejects the empty string, `.`, `..`, drive/root prefixes, and any name
    /// containing `/` or `\`. Without this check, `""` would point a
    /// collection at `data_dir` itself, and `delete_collection("")` would then
    /// call `remove_dir_all` on the whole data directory.
    fn validate_collection_dir_name(name: &str) -> Result<()> {
        let mut components = std::path::Path::new(name).components();
        let single_normal_component = matches!(
            (components.next(), components.next()),
            (Some(std::path::Component::Normal(_)), None)
        );
        let has_separator = name.contains(|c: char| matches!(c, '/' | '\\'));

        if single_normal_component && !has_separator {
            return Ok(());
        }
        Err(Error::from(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!("invalid collection name {name:?}: must be a single path component"),
        )))
    }
}
472
473#[cfg(feature = "persistence")]
474mod database_helpers;
475
476#[cfg(all(test, feature = "persistence"))]
477mod database_tests;