1use crate::collection::{GraphCollection, MetadataCollection, VectorCollection};
4use crate::observer::DatabaseObserver;
5use crate::simd_dispatch;
6use crate::{
7 Collection, CollectionType, ColumnStore, DistanceMetric, Error, Result, SearchResult,
8 StorageMode,
9};
10
#[cfg(feature = "persistence")]
/// Embedded multi-model database rooted at a data directory.
///
/// Each collection lives in its own subdirectory of `data_dir`. Collections
/// are tracked in a generic registry (`collections`) plus one typed registry
/// per collection kind; all registries are guarded by `parking_lot` locks.
pub struct Database {
    // Root directory; each collection gets `data_dir/<name>/`.
    data_dir: std::path::PathBuf,
    // Generic registry: every open collection, regardless of kind.
    collections: parking_lot::RwLock<std::collections::HashMap<String, Collection>>,
    // Typed registries, keyed by collection name.
    vector_colls: parking_lot::RwLock<std::collections::HashMap<String, VectorCollection>>,
    graph_colls: parking_lot::RwLock<std::collections::HashMap<String, GraphCollection>>,
    metadata_colls: parking_lot::RwLock<std::collections::HashMap<String, MetadataCollection>>,
    // In-memory cache of per-collection statistics (also persisted to
    // `collection.stats.json` by `analyze_collection`).
    collection_stats: parking_lot::RwLock<
        std::collections::HashMap<String, crate::collection::stats::CollectionStats>,
    >,
    // Optional hook notified on collection lifecycle and data events.
    observer: Option<std::sync::Arc<dyn DatabaseObserver>>,
    // Bumped on any schema change (create/delete/load); feeds plan-cache keys.
    schema_version: std::sync::atomic::AtomicU64,
    // Cache of compiled query plans keyed by query hash + schema/generation state.
    compiled_plan_cache: crate::cache::CompiledPlanCache,
}
51
52#[cfg(feature = "persistence")]
53impl Database {
54 fn ensure_collection_name_available(&self, name: &str) -> Result<()> {
59 let exists_in_registry = self.collections.read().contains_key(name)
60 || self.vector_colls.read().contains_key(name)
61 || self.graph_colls.read().contains_key(name)
62 || self.metadata_colls.read().contains_key(name);
63 if exists_in_registry {
64 return Err(Error::CollectionExists(name.to_string()));
65 }
66
67 let collection_path = self.data_dir.join(name);
68 if collection_path.exists() {
69 return Err(Error::CollectionExists(name.to_string()));
70 }
71
72 Ok(())
73 }
74
75 pub fn open<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
85 Self::open_impl(path, None)
86 }
87
88 pub fn open_with_observer<P: AsRef<std::path::Path>>(
97 path: P,
98 observer: std::sync::Arc<dyn DatabaseObserver>,
99 ) -> Result<Self> {
100 Self::open_impl(path, Some(observer))
101 }
102
103 fn open_impl<P: AsRef<std::path::Path>>(
104 path: P,
105 observer: Option<std::sync::Arc<dyn DatabaseObserver>>,
106 ) -> Result<Self> {
107 let data_dir = path.as_ref().to_path_buf();
108 std::fs::create_dir_all(&data_dir)?;
109
110 let features = simd_dispatch::simd_features_info();
112 tracing::info!(
113 avx512 = features.avx512f,
114 avx2 = features.avx2,
115 "SIMD features detected - direct dispatch enabled"
116 );
117
118 let db = Self {
119 data_dir,
120 collections: parking_lot::RwLock::new(std::collections::HashMap::new()),
121 vector_colls: parking_lot::RwLock::new(std::collections::HashMap::new()),
122 graph_colls: parking_lot::RwLock::new(std::collections::HashMap::new()),
123 metadata_colls: parking_lot::RwLock::new(std::collections::HashMap::new()),
124 collection_stats: parking_lot::RwLock::new(std::collections::HashMap::new()),
125 observer,
126 schema_version: std::sync::atomic::AtomicU64::new(0),
127 compiled_plan_cache: crate::cache::CompiledPlanCache::new(1_000, 10_000),
128 };
129
130 db.load_collections()?;
132
133 Ok(db)
134 }
135
136 pub fn create_collection(
148 &self,
149 name: &str,
150 dimension: usize,
151 metric: DistanceMetric,
152 ) -> Result<()> {
153 self.create_collection_with_options(name, dimension, metric, StorageMode::default())
154 }
155
156 pub fn create_collection_with_options(
169 &self,
170 name: &str,
171 dimension: usize,
172 metric: DistanceMetric,
173 storage_mode: StorageMode,
174 ) -> Result<()> {
175 self.ensure_collection_name_available(name)?;
176
177 let collection_path = self.data_dir.join(name);
178 let coll =
179 VectorCollection::create(collection_path, name, dimension, metric, storage_mode)?;
180 self.collections
182 .write()
183 .insert(name.to_string(), coll.inner.clone());
184 self.vector_colls.write().insert(name.to_string(), coll);
185
186 self.schema_version
188 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
189
190 Ok(())
191 }
192
193 #[must_use]
195 pub fn data_dir(&self) -> &std::path::Path {
196 &self.data_dir
197 }
198
199 #[must_use]
201 pub fn schema_version(&self) -> u64 {
202 self.schema_version
203 .load(std::sync::atomic::Ordering::Relaxed)
204 }
205
206 #[must_use]
208 pub fn plan_cache(&self) -> &crate::cache::CompiledPlanCache {
209 &self.compiled_plan_cache
210 }
211
212 #[must_use]
216 pub fn collection_write_generation(&self, name: &str) -> Option<u64> {
217 self.collections
218 .read()
219 .get(name)
220 .map(crate::Collection::write_generation)
221 }
222
223 pub fn get_collection(&self, name: &str) -> Option<Collection> {
233 self.collections.read().get(name).cloned()
234 }
235
236 pub fn analyze_collection(
243 &self,
244 name: &str,
245 ) -> Result<crate::collection::stats::CollectionStats> {
246 let collection = self
247 .get_collection(name)
248 .ok_or_else(|| Error::CollectionNotFound(name.to_string()))?;
249 let stats = collection.analyze()?;
250
251 self.collection_stats
252 .write()
253 .insert(name.to_string(), stats.clone());
254
255 let stats_path = self.data_dir.join(name).join("collection.stats.json");
256 let serialized = serde_json::to_vec_pretty(&stats)
257 .map_err(|e| Error::Serialization(format!("failed to serialize stats: {e}")))?;
258 std::fs::write(&stats_path, serialized)?;
259
260 Ok(stats)
261 }
262
263 pub fn get_collection_stats(
270 &self,
271 name: &str,
272 ) -> Result<Option<crate::collection::stats::CollectionStats>> {
273 if let Some(stats) = self.collection_stats.read().get(name).cloned() {
274 return Ok(Some(stats));
275 }
276
277 let stats_path = self.data_dir.join(name).join("collection.stats.json");
278 if !stats_path.exists() {
279 return Ok(None);
280 }
281
282 let bytes = std::fs::read(stats_path)?;
283 let stats: crate::collection::stats::CollectionStats = serde_json::from_slice(&bytes)
284 .map_err(|e| Error::Serialization(format!("failed to parse stats: {e}")))?;
285 self.collection_stats
286 .write()
287 .insert(name.to_string(), stats.clone());
288 Ok(Some(stats))
289 }
290
291 fn canonical_json(value: serde_json::Value) -> serde_json::Value {
301 match value {
302 serde_json::Value::Object(map) => {
303 let sorted: serde_json::Map<String, serde_json::Value> = map
310 .into_iter()
311 .map(|(k, v)| (k, Self::canonical_json(v)))
312 .collect::<std::collections::BTreeMap<_, _>>()
313 .into_iter()
314 .collect();
315 serde_json::Value::Object(sorted)
316 }
317 serde_json::Value::Array(arr) => {
318 serde_json::Value::Array(arr.into_iter().map(Self::canonical_json).collect())
319 }
320 other => other,
321 }
322 }
323
324 #[must_use]
340 pub fn build_plan_key(&self, query: &crate::velesql::Query) -> crate::cache::PlanKey {
341 use std::hash::{BuildHasher, Hasher};
342
343 let query_text = serde_json::to_value(query)
347 .map(Self::canonical_json)
348 .and_then(|v| serde_json::to_string(&v))
349 .unwrap_or_else(|_| format!("{query:?}"));
350
351 let mut hasher = rustc_hash::FxBuildHasher.build_hasher();
352 hasher.write(query_text.as_bytes());
353 let query_hash = hasher.finish();
354
355 let schema_version = self.schema_version();
356
357 let mut collection_names = vec![query.select.from.clone()];
359 for join in &query.select.joins {
360 collection_names.push(join.table.clone());
361 }
362 collection_names.sort();
363 collection_names.dedup();
364
365 let collection_generations: smallvec::SmallVec<[u64; 4]> = collection_names
367 .iter()
368 .map(|name| self.collection_write_generation(name).unwrap_or(0))
369 .collect();
370
371 crate::cache::PlanKey {
372 query_hash,
373 schema_version,
374 collection_generations,
375 }
376 }
377
378 pub fn explain_query(
397 &self,
398 query: &crate::velesql::Query,
399 ) -> Result<crate::velesql::QueryPlan> {
400 crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;
401
402 let plan_key = self.build_plan_key(query);
403
404 if let Some(cached) = self.compiled_plan_cache.get(&plan_key) {
405 let mut plan = cached.plan.clone();
406 plan.cache_hit = Some(true);
407 plan.plan_reuse_count = Some(
408 cached
409 .reuse_count
410 .load(std::sync::atomic::Ordering::Relaxed),
411 );
412 return Ok(plan);
413 }
414
415 let mut plan = crate::velesql::QueryPlan::from_select(&query.select);
416 plan.cache_hit = Some(false);
417 plan.plan_reuse_count = Some(0);
418 Ok(plan)
419 }
420
    /// Executes a VelesQL query at the database level.
    ///
    /// Routing: TRAIN statements go to `execute_train`, DML (INSERT/UPDATE)
    /// to `execute_dml`; everything else is a SELECT over `query.select.from`
    /// optionally followed by client-side joins applied in declaration order.
    ///
    /// # Errors
    /// Returns `Error::Query` for invalid or top-level MATCH queries, and
    /// `Error::CollectionNotFound` when a referenced collection is missing.
    pub fn execute_query(
        &self,
        query: &crate::velesql::Query,
        params: &std::collections::HashMap<String, serde_json::Value>,
    ) -> Result<Vec<SearchResult>> {
        crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;

        if let Some(train) = query.train.as_ref() {
            return self.execute_train(train);
        }

        if let Some(dml) = query.dml.as_ref() {
            return self.execute_dml(dml, params);
        }

        // MATCH needs a collection context, which this entry point lacks.
        if query.is_match_query() {
            return Err(Error::Query(
                "Database::execute_query does not support top-level MATCH queries. Use Collection::execute_query or pass the collection name."
                    .to_string(),
            ));
        }

        // Snapshot cache membership BEFORE executing. The key is rebuilt
        // again after execution below — presumably because execution can
        // advance collection write generations, changing the key; confirm.
        let pre_exec_key = self.build_plan_key(query);
        let is_cached = self.compiled_plan_cache.contains(&pre_exec_key);

        // The base collection may be lazily opened via the vector registry.
        let base_name = query.select.from.clone();
        let base_collection = self
            .get_collection(&base_name)
            .or_else(|| self.get_vector_collection(&base_name).map(|vc| vc.inner))
            .ok_or_else(|| Error::CollectionNotFound(base_name.clone()))?;

        let results = if query.select.joins.is_empty() {
            base_collection.execute_query(query, params)?
        } else {
            // Run the base SELECT with joins stripped, then fold each join
            // over the running result set in declaration order.
            let mut base_query = query.clone();
            base_query.select.joins.clear();

            let mut results = base_collection.execute_query(&base_query, params)?;
            for join in &query.select.joins {
                let join_collection = self
                    .get_collection(&join.table)
                    .or_else(|| self.get_vector_collection(&join.table).map(|vc| vc.inner))
                    .ok_or_else(|| Error::CollectionNotFound(join.table.clone()))?;
                let column_store = Self::build_join_column_store(&join_collection)?;
                let joined = crate::collection::search::query::join::execute_join(
                    &results,
                    join,
                    &column_store,
                )?;
                results = crate::collection::search::query::join::joined_to_search_results(joined);
            }
            results
        };

        if !is_cached {
            // First execution of this plan: compile and cache it under a key
            // built AFTER execution, so stored generations match the
            // post-query state.
            let mut collection_names = vec![query.select.from.clone()];
            for join in &query.select.joins {
                collection_names.push(join.table.clone());
            }
            collection_names.sort();
            collection_names.dedup();

            let compiled = std::sync::Arc::new(crate::cache::CompiledPlan {
                plan: crate::velesql::QueryPlan::from_select(&query.select),
                referenced_collections: collection_names,
                compiled_at: std::time::Instant::now(),
                reuse_count: std::sync::atomic::AtomicU64::new(0),
            });
            let post_exec_key = self.build_plan_key(query);
            self.compiled_plan_cache.insert(post_exec_key, compiled);
        }

        Ok(results)
    }
523
524 pub fn list_collections(&self) -> Vec<String> {
528 let collections = self.collections.read();
530 let vector_colls = self.vector_colls.read();
531 let graph_colls = self.graph_colls.read();
532 let metadata_colls = self.metadata_colls.read();
533
534 let mut names: std::collections::HashSet<String> = collections.keys().cloned().collect();
535 for k in vector_colls.keys() {
536 names.insert(k.clone());
537 }
538 for k in graph_colls.keys() {
539 names.insert(k.clone());
540 }
541 for k in metadata_colls.keys() {
542 names.insert(k.clone());
543 }
544 let mut result: Vec<String> = names.into_iter().collect();
545 result.sort();
546 result
547 }
548
549 pub fn delete_collection(&self, name: &str) -> Result<()> {
567 let exists = self.collections.read().contains_key(name)
569 || self.vector_colls.read().contains_key(name)
570 || self.graph_colls.read().contains_key(name)
571 || self.metadata_colls.read().contains_key(name);
572
573 if !exists {
574 return Err(Error::CollectionNotFound(name.to_string()));
575 }
576
577 let collection_path = self.data_dir.join(name);
581 if collection_path.exists() {
582 std::fs::remove_dir_all(&collection_path)?;
583 }
584
585 self.collections.write().remove(name);
587 self.vector_colls.write().remove(name);
588 self.graph_colls.write().remove(name);
589 self.metadata_colls.write().remove(name);
590 self.collection_stats.write().remove(name);
591
592 if let Some(ref obs) = self.observer {
593 obs.on_collection_deleted(name);
594 }
595
596 self.schema_version
598 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
599
600 Ok(())
601 }
602
603 pub fn create_vector_collection(
613 &self,
614 name: &str,
615 dimension: usize,
616 metric: DistanceMetric,
617 ) -> Result<()> {
618 self.create_vector_collection_with_options(name, dimension, metric, StorageMode::default())
619 }
620
621 pub fn create_vector_collection_with_options(
627 &self,
628 name: &str,
629 dimension: usize,
630 metric: DistanceMetric,
631 storage_mode: StorageMode,
632 ) -> Result<()> {
633 self.ensure_collection_name_available(name)?;
634 let path = self.data_dir.join(name);
635 let coll = VectorCollection::create(path, name, dimension, metric, storage_mode)?;
636 self.collections
640 .write()
641 .insert(name.to_string(), coll.inner.clone());
642 self.vector_colls.write().insert(name.to_string(), coll);
643
644 if let Some(ref obs) = self.observer {
645 let kind = CollectionType::Vector {
646 dimension,
647 metric,
648 storage_mode,
649 };
650 obs.on_collection_created(name, &kind);
651 }
652
653 self.schema_version
655 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
656
657 Ok(())
658 }
659
660 pub fn create_graph_collection(
666 &self,
667 name: &str,
668 schema: crate::collection::GraphSchema,
669 ) -> Result<()> {
670 self.ensure_collection_name_available(name)?;
671 let path = self.data_dir.join(name);
672 let coll =
673 GraphCollection::create(path, name, None, DistanceMetric::Cosine, schema.clone())?;
674 self.collections
676 .write()
677 .insert(name.to_string(), coll.inner.clone());
678 self.graph_colls.write().insert(name.to_string(), coll);
679
680 if let Some(ref obs) = self.observer {
681 let kind = CollectionType::Graph {
682 dimension: None,
683 metric: DistanceMetric::Cosine,
684 schema,
685 };
686 obs.on_collection_created(name, &kind);
687 }
688
689 self.schema_version
691 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
692
693 Ok(())
694 }
695
696 pub fn create_metadata_collection(&self, name: &str) -> Result<()> {
702 self.ensure_collection_name_available(name)?;
703 let path = self.data_dir.join(name);
704 let coll = MetadataCollection::create(path, name)?;
705 self.collections
708 .write()
709 .insert(name.to_string(), coll.inner.clone());
710 self.metadata_colls.write().insert(name.to_string(), coll);
711
712 if let Some(ref obs) = self.observer {
713 obs.on_collection_created(name, &CollectionType::MetadataOnly);
714 }
715
716 self.schema_version
718 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
719
720 Ok(())
721 }
722
723 pub fn notify_upsert(&self, collection: &str, point_count: usize) {
737 if let Some(ref obs) = self.observer {
738 obs.on_upsert(collection, point_count);
739 }
740 }
741
742 pub fn notify_query(&self, collection: &str, duration_us: u64) {
751 if let Some(ref obs) = self.observer {
752 obs.on_query(collection, duration_us);
753 }
754 }
755
756 #[must_use]
766 pub fn get_vector_collection(&self, name: &str) -> Option<VectorCollection> {
767 if let Some(c) = self.vector_colls.read().get(name).cloned() {
768 return Some(c);
769 }
770 let path = self.data_dir.join(name);
774 let config_path = path.join("config.json");
775 if config_path.exists() {
776 let data = std::fs::read_to_string(&config_path).ok()?;
778 let cfg = serde_json::from_str::<crate::collection::CollectionConfig>(&data).ok()?;
779 if cfg.graph_schema.is_some() || cfg.metadata_only {
780 return None;
781 }
782 if let Ok(coll) = VectorCollection::open(path) {
783 self.vector_colls
784 .write()
785 .insert(name.to_string(), coll.clone());
786 return Some(coll);
787 }
788 }
789 None
790 }
791
792 #[must_use]
801 pub fn get_graph_collection(&self, name: &str) -> Option<GraphCollection> {
802 if let Some(c) = self.graph_colls.read().get(name).cloned() {
803 return Some(c);
804 }
805 let path = self.data_dir.join(name);
808 let config_path = path.join("config.json");
809 if config_path.exists() {
810 let data = std::fs::read_to_string(&config_path).ok()?;
811 let cfg = serde_json::from_str::<crate::collection::CollectionConfig>(&data).ok()?;
812 cfg.graph_schema.as_ref()?;
813 if let Ok(coll) = GraphCollection::open(path) {
814 self.graph_colls
815 .write()
816 .insert(name.to_string(), coll.clone());
817 return Some(coll);
818 }
819 }
820 None
821 }
822
823 #[must_use]
831 pub fn get_metadata_collection(&self, name: &str) -> Option<MetadataCollection> {
832 if let Some(c) = self.metadata_colls.read().get(name).cloned() {
833 return Some(c);
834 }
835 let path = self.data_dir.join(name);
838 let config_path = path.join("config.json");
839 if config_path.exists() {
840 let data = std::fs::read_to_string(&config_path).ok()?;
841 let cfg = serde_json::from_str::<crate::collection::CollectionConfig>(&data).ok()?;
842 if !cfg.metadata_only {
843 return None;
844 }
845 if let Ok(coll) = MetadataCollection::open(path) {
846 self.metadata_colls
847 .write()
848 .insert(name.to_string(), coll.clone());
849 return Some(coll);
850 }
851 }
852 None
853 }
854
855 pub fn create_collection_typed(
888 &self,
889 name: &str,
890 collection_type: &CollectionType,
891 ) -> Result<()> {
892 match collection_type {
894 CollectionType::Vector {
895 dimension,
896 metric,
897 storage_mode,
898 } => {
899 self.create_vector_collection_with_options(name, *dimension, *metric, *storage_mode)
900 }
901 CollectionType::MetadataOnly => self.create_metadata_collection(name),
902 CollectionType::Graph {
903 dimension,
904 metric,
905 schema,
906 } => {
907 self.ensure_collection_name_available(name)?;
909 let path = self.data_dir.join(name);
910 let coll =
911 GraphCollection::create(path, name, *dimension, *metric, schema.clone())?;
912 self.collections
913 .write()
914 .insert(name.to_string(), coll.inner.clone());
915 self.graph_colls.write().insert(name.to_string(), coll);
916 if let Some(ref obs) = self.observer {
917 obs.on_collection_created(name, collection_type);
918 }
919 self.schema_version
921 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
922 Ok(())
923 }
924 }
925 }
926
    /// Scans `data_dir` and opens every subdirectory containing a
    /// `config.json`, registering each collection under the registry matching
    /// its config (graph schema present → graph, `metadata_only` → metadata,
    /// otherwise vector).
    ///
    /// Unreadable/unparsable configs and collections that fail to open are
    /// logged and skipped rather than aborting the scan. The schema version
    /// is bumped once if anything was loaded.
    ///
    /// # Errors
    /// Fails only if the data directory itself cannot be read.
    pub fn load_collections(&self) -> Result<()> {
        let mut loaded_count: usize = 0;

        for entry in std::fs::read_dir(&self.data_dir)? {
            let entry = entry?;
            let path = entry.path();

            // Only directories holding a config file are candidates.
            if !path.is_dir() {
                continue;
            }
            let config_path = path.join("config.json");
            if !config_path.exists() {
                continue;
            }

            let name = path
                .file_name()
                .and_then(|n| n.to_str())
                .unwrap_or("unknown")
                .to_string();

            // Skip collections already registered (e.g. created earlier in
            // this process).
            if self.collections.read().contains_key(&name) {
                continue;
            }

            let cfg_data = match std::fs::read_to_string(&config_path) {
                Ok(d) => d,
                Err(e) => {
                    tracing::warn!(error = %e, name, "Cannot read config.json — skipping");
                    continue;
                }
            };
            let cfg = match serde_json::from_str::<crate::collection::CollectionConfig>(&cfg_data) {
                Ok(c) => c,
                Err(e) => {
                    tracing::warn!(error = %e, name, "Cannot parse config.json — skipping");
                    continue;
                }
            };

            // The config's shape decides which concrete type to open.
            if cfg.graph_schema.is_some() {
                match GraphCollection::open(path.clone()) {
                    Ok(coll) => {
                        self.collections
                            .write()
                            .insert(name.clone(), coll.inner.clone());
                        self.graph_colls.write().insert(name, coll);
                        loaded_count += 1;
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, name = %path.display(), "Failed to load graph collection");
                    }
                }
            } else if cfg.metadata_only {
                match MetadataCollection::open(path.clone()) {
                    Ok(coll) => {
                        self.collections
                            .write()
                            .insert(name.clone(), coll.inner.clone());
                        self.metadata_colls.write().insert(name, coll);
                        loaded_count += 1;
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, name = %path.display(), "Failed to load metadata collection");
                    }
                }
            } else {
                match VectorCollection::open(path.clone()) {
                    Ok(coll) => {
                        self.collections
                            .write()
                            .insert(name.clone(), coll.inner.clone());
                        self.vector_colls.write().insert(name, coll);
                        loaded_count += 1;
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, name = %path.display(), "Failed to load vector collection");
                    }
                }
            }
        }

        // A single bump covers the whole batch of newly loaded collections.
        if loaded_count > 0 {
            self.schema_version
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        }

        Ok(())
    }
1038
1039 fn execute_dml(
1040 &self,
1041 dml: &crate::velesql::DmlStatement,
1042 params: &std::collections::HashMap<String, serde_json::Value>,
1043 ) -> Result<Vec<SearchResult>> {
1044 match dml {
1045 crate::velesql::DmlStatement::Insert(stmt) => self.execute_insert(stmt, params),
1046 crate::velesql::DmlStatement::Update(stmt) => self.execute_update(stmt, params),
1047 }
1048 }
1049
1050 fn execute_insert(
1051 &self,
1052 stmt: &crate::velesql::InsertStatement,
1053 params: &std::collections::HashMap<String, serde_json::Value>,
1054 ) -> Result<Vec<SearchResult>> {
1055 let collection = self
1056 .get_collection(&stmt.table)
1057 .or_else(|| self.get_vector_collection(&stmt.table).map(|vc| vc.inner))
1058 .ok_or_else(|| Error::CollectionNotFound(stmt.table.clone()))?;
1059
1060 let mut id: Option<u64> = None;
1061 let mut payload = serde_json::Map::new();
1062 let mut vector: Option<Vec<f32>> = None;
1063
1064 for (column, value_expr) in stmt.columns.iter().zip(&stmt.values) {
1065 let resolved = Self::resolve_dml_value(value_expr, params)?;
1066 if column == "id" {
1067 id = Some(Self::json_to_u64_id(&resolved)?);
1068 continue;
1069 }
1070 if column == "vector" {
1071 vector = Some(Self::json_to_vector(&resolved)?);
1072 continue;
1073 }
1074 payload.insert(column.clone(), resolved);
1075 }
1076
1077 let point_id =
1078 id.ok_or_else(|| Error::Query("INSERT requires integer 'id' column".to_string()))?;
1079 let point = if collection.is_metadata_only() {
1080 if vector.is_some() {
1081 return Err(Error::Query(
1082 "INSERT on metadata-only collection cannot set 'vector'".to_string(),
1083 ));
1084 }
1085 crate::Point::metadata_only(point_id, serde_json::Value::Object(payload))
1086 } else {
1087 let vec_value = vector.ok_or_else(|| {
1088 Error::Query("INSERT on vector collection requires 'vector' column".to_string())
1089 })?;
1090 crate::Point::new(
1091 point_id,
1092 vec_value,
1093 Some(serde_json::Value::Object(payload)),
1094 )
1095 };
1096
1097 let result = SearchResult::new(point.clone(), 0.0);
1098 collection.upsert(vec![point])?;
1099 Ok(vec![result])
1100 }
1101
    /// Handles `UPDATE <table> SET ... [WHERE ...]`.
    ///
    /// Performs a full scan: every point in the collection is fetched, points
    /// passing the (optional) WHERE filter get their payload/vector fields
    /// rewritten, and all modified points are upserted back in one batch.
    /// Returns the updated points as zero-score results.
    ///
    /// # Errors
    /// Rejects assignments to `id`, `vector` assignments on metadata-only
    /// collections, and propagates lookup/filter/upsert failures.
    fn execute_update(
        &self,
        stmt: &crate::velesql::UpdateStatement,
        params: &std::collections::HashMap<String, serde_json::Value>,
    ) -> Result<Vec<SearchResult>> {
        let collection = self
            .get_collection(&stmt.table)
            .or_else(|| self.get_vector_collection(&stmt.table).map(|vc| vc.inner))
            .ok_or_else(|| Error::CollectionNotFound(stmt.table.clone()))?;

        // Resolve every SET assignment (substituting bound params) up front.
        let assignments = stmt
            .assignments
            .iter()
            .map(|a| Ok((a.column.clone(), Self::resolve_dml_value(&a.value, params)?)))
            .collect::<Result<Vec<_>>>()?;

        // The primary key is immutable.
        if assignments.iter().any(|(name, _)| name == "id") {
            return Err(Error::Query(
                "UPDATE cannot modify primary key column 'id'".to_string(),
            ));
        }

        // Full scan: fetch everything, filter per point.
        let all_ids = collection.all_ids();
        let rows = collection.get(&all_ids);
        let filter = Self::build_update_filter(stmt.where_clause.as_ref())?;

        let mut updated_points = Vec::new();
        for point in rows.into_iter().flatten() {
            if !Self::matches_update_filter(&point, filter.as_ref()) {
                continue;
            }

            // Start from the existing payload (empty map when absent).
            let mut payload_map = point
                .payload
                .as_ref()
                .and_then(serde_json::Value::as_object)
                .cloned()
                .unwrap_or_default();

            let mut updated_vector = point.vector.clone();

            for (field, value) in &assignments {
                if field == "vector" {
                    if collection.is_metadata_only() {
                        return Err(Error::Query(
                            "UPDATE on metadata-only collection cannot set 'vector'".to_string(),
                        ));
                    }
                    updated_vector = Self::json_to_vector(value)?;
                } else {
                    payload_map.insert(field.clone(), value.clone());
                }
            }

            let updated = if collection.is_metadata_only() {
                crate::Point::metadata_only(point.id, serde_json::Value::Object(payload_map))
            } else {
                crate::Point::new(
                    point.id,
                    updated_vector,
                    Some(serde_json::Value::Object(payload_map)),
                )
            };
            updated_points.push(updated);
        }

        if updated_points.is_empty() {
            return Ok(Vec::new());
        }

        // Echo the updated points back, then persist them in one upsert.
        let results = updated_points
            .iter()
            .map(|p| SearchResult::new(p.clone(), 0.0))
            .collect();
        collection.upsert(updated_points)?;
        Ok(results)
    }
1179
    /// Handles a `TRAIN` statement: trains a quantizer (`pq`, `opq`, or
    /// `rabitq`) on the collection's vectors, persists its artifacts, and
    /// switches the collection's storage mode to the quantized form.
    ///
    /// WITH-parameters and their defaults: `m` = 0 (subspaces, must be > 0),
    /// `k` = 256 (centroids, must be > 0), `type` = "pq", `oversampling` = 4,
    /// `sample` = unlimited, `force` = false.
    ///
    /// # Errors
    /// `CollectionNotFound`; `InvalidQuantizerConfig` for bad `m`/`k`, a
    /// non-divisible dimension (pq/opq), an already-trained quantizer without
    /// `force`, or an unknown type; `TrainingFailed` when training/saving
    /// fails or no vectors are available.
    #[allow(
        clippy::too_many_lines,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss
    )]
    fn execute_train(&self, stmt: &crate::velesql::TrainStatement) -> Result<Vec<SearchResult>> {
        use crate::velesql::WithValue;

        let collection = self
            .get_collection(&stmt.collection)
            .or_else(|| {
                self.get_vector_collection(&stmt.collection)
                    .map(|vc| vc.inner)
            })
            .ok_or_else(|| Error::CollectionNotFound(stmt.collection.clone()))?;

        // Extract WITH-parameters, clamping negative integers to zero.
        let m = stmt
            .params
            .get("m")
            .and_then(WithValue::as_integer)
            .map_or(0_usize, |v| v.max(0) as usize);
        let k = stmt
            .params
            .get("k")
            .and_then(WithValue::as_integer)
            .map_or(256_usize, |v| v.max(0) as usize);
        let train_type = stmt
            .params
            .get("type")
            .and_then(WithValue::as_str)
            .unwrap_or("pq")
            .to_lowercase();
        let oversampling = stmt
            .params
            .get("oversampling")
            .and_then(WithValue::as_integer)
            .map_or(4_u32, |v| v.max(0) as u32);
        let sample_limit = stmt
            .params
            .get("sample")
            .and_then(WithValue::as_integer)
            .map(|v| v.max(0) as usize);
        let force = stmt
            .params
            .get("force")
            .and_then(WithValue::as_bool)
            .unwrap_or(false);

        if m == 0 {
            return Err(Error::InvalidQuantizerConfig(
                "m (num_subspaces) must be > 0".into(),
            ));
        }
        if k == 0 {
            return Err(Error::InvalidQuantizerConfig(
                "k (num_centroids) must be > 0".into(),
            ));
        }

        let config = collection.config();
        let dim = config.dimension;

        // PQ/OPQ split vectors into m equal subspaces; RaBitQ does not, so
        // it is exempt from the divisibility requirement.
        if train_type != "rabitq" && dim % m != 0 {
            return Err(Error::InvalidQuantizerConfig(format!(
                "dimension {dim} is not divisible by m={m}"
            )));
        }

        // Refuse to clobber an existing quantizer unless force=true.
        {
            let quantizer = collection.pq_quantizer_read();
            if quantizer.is_some() && !force {
                return Err(Error::InvalidQuantizerConfig(
                    "Quantizer already trained. Use force=true to retrain.".into(),
                ));
            }
        }

        // Gather the training set: every non-empty vector, optionally
        // truncated to the first `sample` entries.
        let all_ids = collection.all_ids();
        let points = collection.get(&all_ids);
        let mut vectors: Vec<Vec<f32>> = points
            .into_iter()
            .flatten()
            .filter(|p| !p.vector.is_empty())
            .map(|p| p.vector)
            .collect();

        if let Some(limit) = sample_limit {
            if vectors.len() > limit {
                vectors.truncate(limit);
            }
        }

        if vectors.is_empty() {
            return Err(Error::TrainingFailed(
                "no vectors available for training".into(),
            ));
        }

        match train_type.as_str() {
            "pq" => {
                let pq = crate::quantization::ProductQuantizer::train(&vectors, m, k)
                    .map_err(|e| Error::TrainingFailed(e.to_string()))?;

                let codebook_size = pq.codebook.num_subspaces * pq.codebook.num_centroids;
                let n_train = vectors.len();

                // Persist the codebook before installing the quantizer.
                pq.save_codebook(collection.data_path())
                    .map_err(|e| Error::TrainingFailed(e.to_string()))?;

                *collection.pq_quantizer_write() = Some(pq);

                // Flip the storage mode and record the rescore oversampling.
                {
                    let mut cfg = collection.config_write();
                    cfg.storage_mode = StorageMode::ProductQuantization;
                    cfg.pq_rescore_oversampling = Some(oversampling);
                }
                collection.save_config()?;

                // Report the training summary as a synthetic result point.
                Ok(vec![SearchResult::new(
                    crate::Point::metadata_only(
                        0,
                        serde_json::json!({
                            "status": "trained",
                            "type": "pq",
                            "m": m,
                            "k": k,
                            "codebook_size": codebook_size,
                            "training_vectors": n_train
                        }),
                    ),
                    0.0,
                )])
            }
            "opq" => {
                let pq = crate::quantization::train_opq(&vectors, m, k, true, 10)
                    .map_err(|e| Error::TrainingFailed(e.to_string()))?;

                let codebook_size = pq.codebook.num_subspaces * pq.codebook.num_centroids;
                let n_train = vectors.len();

                // OPQ additionally learns a rotation; persist both artifacts.
                pq.save_codebook(collection.data_path())
                    .map_err(|e| Error::TrainingFailed(e.to_string()))?;
                pq.save_rotation(collection.data_path())
                    .map_err(|e| Error::TrainingFailed(e.to_string()))?;

                *collection.pq_quantizer_write() = Some(pq);

                {
                    let mut cfg = collection.config_write();
                    cfg.storage_mode = StorageMode::ProductQuantization;
                    cfg.pq_rescore_oversampling = Some(oversampling);
                }
                collection.save_config()?;

                Ok(vec![SearchResult::new(
                    crate::Point::metadata_only(
                        0,
                        serde_json::json!({
                            "status": "trained",
                            "type": "opq",
                            "m": m,
                            "k": k,
                            "codebook_size": codebook_size,
                            "training_vectors": n_train
                        }),
                    ),
                    0.0,
                )])
            }
            "rabitq" => {
                // NOTE(review): 42 is presumably a fixed RNG seed for
                // deterministic training — confirm against RaBitQIndex::train.
                let rbq = crate::quantization::RaBitQIndex::train(&vectors, 42)
                    .map_err(|e| Error::TrainingFailed(e.to_string()))?;

                rbq.save(collection.data_path())
                    .map_err(|e| Error::TrainingFailed(e.to_string()))?;

                {
                    let mut cfg = collection.config_write();
                    cfg.storage_mode = StorageMode::RaBitQ;
                    cfg.pq_rescore_oversampling = Some(oversampling);
                }
                collection.save_config()?;

                Ok(vec![SearchResult::new(
                    crate::Point::metadata_only(
                        0,
                        serde_json::json!({
                            "status": "trained",
                            "type": "rabitq",
                            "dimension": dim,
                            "training_vectors": vectors.len()
                        }),
                    ),
                    0.0,
                )])
            }
            other => Err(Error::InvalidQuantizerConfig(format!(
                "unknown quantizer type: '{other}'. Supported: pq, opq, rabitq"
            ))),
        }
    }
1411}
1412
1413#[cfg(feature = "persistence")]
1414mod database_helpers;
1415
1416#[cfg(all(test, feature = "persistence"))]
1417mod database_tests;