1use std::path::Path;
6use std::sync::Arc;
7use std::sync::atomic::AtomicUsize;
8
9use parking_lot::RwLock;
10
11#[cfg(feature = "wal")]
12use grafeo_adapters::storage::wal::{
13 DurabilityMode as WalDurabilityMode, WalConfig, WalManager, WalRecord, WalRecovery,
14};
15use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
16use grafeo_common::types::{EdgeId, NodeId, Value};
17use grafeo_common::utils::error::{Error, Result};
18use grafeo_core::graph::lpg::LpgStore;
19#[cfg(feature = "rdf")]
20use grafeo_core::graph::rdf::RdfStore;
21
22use crate::config::Config;
23use crate::query::cache::QueryCache;
24use crate::session::Session;
25use crate::transaction::TransactionManager;
26
27pub struct GrafeoDB {
50 config: Config,
52 store: Arc<LpgStore>,
54 #[cfg(feature = "rdf")]
56 rdf_store: Arc<RdfStore>,
57 tx_manager: Arc<TransactionManager>,
59 buffer_manager: Arc<BufferManager>,
61 #[cfg(feature = "wal")]
63 wal: Option<Arc<WalManager>>,
64 query_cache: Arc<QueryCache>,
66 commit_counter: Arc<AtomicUsize>,
68 is_open: RwLock<bool>,
70}
71
72impl GrafeoDB {
73 #[must_use]
89 pub fn new_in_memory() -> Self {
90 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
91 }
92
93 #[cfg(feature = "wal")]
112 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
113 Self::with_config(Config::persistent(path.as_ref()))
114 }
115
116 pub fn with_config(config: Config) -> Result<Self> {
140 config
142 .validate()
143 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
144
145 let store = Arc::new(LpgStore::new());
146 #[cfg(feature = "rdf")]
147 let rdf_store = Arc::new(RdfStore::new());
148 let tx_manager = Arc::new(TransactionManager::new());
149
150 let buffer_config = BufferManagerConfig {
152 budget: config.memory_limit.unwrap_or_else(|| {
153 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
154 }),
155 spill_path: config
156 .spill_path
157 .clone()
158 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
159 ..BufferManagerConfig::default()
160 };
161 let buffer_manager = BufferManager::new(buffer_config);
162
163 #[cfg(feature = "wal")]
165 let wal = if config.wal_enabled {
166 if let Some(ref db_path) = config.path {
167 std::fs::create_dir_all(db_path)?;
169
170 let wal_path = db_path.join("wal");
171
172 if wal_path.exists() {
174 let recovery = WalRecovery::new(&wal_path);
175 let records = recovery.recover()?;
176 Self::apply_wal_records(&store, &records)?;
177 }
178
179 let wal_durability = match config.wal_durability {
181 crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
182 crate::config::DurabilityMode::Batch {
183 max_delay_ms,
184 max_records,
185 } => WalDurabilityMode::Batch {
186 max_delay_ms,
187 max_records,
188 },
189 crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
190 WalDurabilityMode::Adaptive { target_interval_ms }
191 }
192 crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
193 };
194 let wal_config = WalConfig {
195 durability: wal_durability,
196 ..WalConfig::default()
197 };
198 let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
199 Some(Arc::new(wal_manager))
200 } else {
201 None
202 }
203 } else {
204 None
205 };
206
207 let query_cache = Arc::new(QueryCache::default());
209
210 Ok(Self {
211 config,
212 store,
213 #[cfg(feature = "rdf")]
214 rdf_store,
215 tx_manager,
216 buffer_manager,
217 #[cfg(feature = "wal")]
218 wal,
219 query_cache,
220 commit_counter: Arc::new(AtomicUsize::new(0)),
221 is_open: RwLock::new(true),
222 })
223 }
224
225 #[cfg(feature = "wal")]
227 fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
228 for record in records {
229 match record {
230 WalRecord::CreateNode { id, labels } => {
231 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
232 store.create_node_with_id(*id, &label_refs);
233 }
234 WalRecord::DeleteNode { id } => {
235 store.delete_node(*id);
236 }
237 WalRecord::CreateEdge {
238 id,
239 src,
240 dst,
241 edge_type,
242 } => {
243 store.create_edge_with_id(*id, *src, *dst, edge_type);
244 }
245 WalRecord::DeleteEdge { id } => {
246 store.delete_edge(*id);
247 }
248 WalRecord::SetNodeProperty { id, key, value } => {
249 store.set_node_property(*id, key, value.clone());
250 }
251 WalRecord::SetEdgeProperty { id, key, value } => {
252 store.set_edge_property(*id, key, value.clone());
253 }
254 WalRecord::AddNodeLabel { id, label } => {
255 store.add_label(*id, label);
256 }
257 WalRecord::RemoveNodeLabel { id, label } => {
258 store.remove_label(*id, label);
259 }
260 WalRecord::TxCommit { .. }
261 | WalRecord::TxAbort { .. }
262 | WalRecord::Checkpoint { .. } => {
263 }
266 }
267 }
268 Ok(())
269 }
270
271 #[must_use]
290 pub fn session(&self) -> Session {
291 #[cfg(feature = "rdf")]
292 {
293 Session::with_rdf_store_and_adaptive(
294 Arc::clone(&self.store),
295 Arc::clone(&self.rdf_store),
296 Arc::clone(&self.tx_manager),
297 Arc::clone(&self.query_cache),
298 self.config.adaptive.clone(),
299 self.config.factorized_execution,
300 self.config.graph_model,
301 self.config.query_timeout,
302 Arc::clone(&self.commit_counter),
303 self.config.gc_interval,
304 )
305 }
306 #[cfg(not(feature = "rdf"))]
307 {
308 Session::with_adaptive(
309 Arc::clone(&self.store),
310 Arc::clone(&self.tx_manager),
311 Arc::clone(&self.query_cache),
312 self.config.adaptive.clone(),
313 self.config.factorized_execution,
314 self.config.graph_model,
315 self.config.query_timeout,
316 Arc::clone(&self.commit_counter),
317 self.config.gc_interval,
318 )
319 }
320 }
321
322 #[must_use]
324 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
325 &self.config.adaptive
326 }
327
328 pub fn execute(&self, query: &str) -> Result<QueryResult> {
338 let session = self.session();
339 session.execute(query)
340 }
341
342 pub fn execute_with_params(
348 &self,
349 query: &str,
350 params: std::collections::HashMap<String, grafeo_common::types::Value>,
351 ) -> Result<QueryResult> {
352 let session = self.session();
353 session.execute_with_params(query, params)
354 }
355
356 #[cfg(feature = "cypher")]
362 pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
363 let session = self.session();
364 session.execute_cypher(query)
365 }
366
367 #[cfg(feature = "cypher")]
373 pub fn execute_cypher_with_params(
374 &self,
375 query: &str,
376 params: std::collections::HashMap<String, grafeo_common::types::Value>,
377 ) -> Result<QueryResult> {
378 use crate::query::processor::{QueryLanguage, QueryProcessor};
379
380 let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
382 processor.process(query, QueryLanguage::Cypher, Some(¶ms))
383 }
384
385 #[cfg(feature = "gremlin")]
391 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
392 let session = self.session();
393 session.execute_gremlin(query)
394 }
395
396 #[cfg(feature = "gremlin")]
402 pub fn execute_gremlin_with_params(
403 &self,
404 query: &str,
405 params: std::collections::HashMap<String, grafeo_common::types::Value>,
406 ) -> Result<QueryResult> {
407 let session = self.session();
408 session.execute_gremlin_with_params(query, params)
409 }
410
411 #[cfg(feature = "graphql")]
417 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
418 let session = self.session();
419 session.execute_graphql(query)
420 }
421
422 #[cfg(feature = "graphql")]
428 pub fn execute_graphql_with_params(
429 &self,
430 query: &str,
431 params: std::collections::HashMap<String, grafeo_common::types::Value>,
432 ) -> Result<QueryResult> {
433 let session = self.session();
434 session.execute_graphql_with_params(query, params)
435 }
436
437 #[cfg(feature = "sql-pgq")]
443 pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
444 let session = self.session();
445 session.execute_sql(query)
446 }
447
448 #[cfg(feature = "sql-pgq")]
454 pub fn execute_sql_with_params(
455 &self,
456 query: &str,
457 params: std::collections::HashMap<String, grafeo_common::types::Value>,
458 ) -> Result<QueryResult> {
459 use crate::query::processor::{QueryLanguage, QueryProcessor};
460
461 let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
463 processor.process(query, QueryLanguage::SqlPgq, Some(¶ms))
464 }
465
466 #[cfg(all(feature = "sparql", feature = "rdf"))]
483 pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
484 use crate::query::{
485 Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
486 };
487
488 let logical_plan = sparql_translator::translate(query)?;
490
491 let optimizer = Optimizer::from_store(&self.store);
493 let optimized_plan = optimizer.optimize(logical_plan)?;
494
495 let planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
497 let mut physical_plan = planner.plan(&optimized_plan)?;
498
499 let executor = Executor::with_columns(physical_plan.columns.clone());
501 executor.execute(physical_plan.operator.as_mut())
502 }
503
504 #[cfg(feature = "rdf")]
508 #[must_use]
509 pub fn rdf_store(&self) -> &Arc<RdfStore> {
510 &self.rdf_store
511 }
512
513 pub fn query_scalar<T: FromValue>(&self, query: &str) -> Result<T> {
519 let result = self.execute(query)?;
520 result.scalar()
521 }
522
523 #[must_use]
525 pub fn config(&self) -> &Config {
526 &self.config
527 }
528
529 #[must_use]
531 pub fn graph_model(&self) -> crate::config::GraphModel {
532 self.config.graph_model
533 }
534
535 #[must_use]
537 pub fn memory_limit(&self) -> Option<usize> {
538 self.config.memory_limit
539 }
540
541 #[must_use]
545 pub fn store(&self) -> &Arc<LpgStore> {
546 &self.store
547 }
548
549 pub fn gc(&self) {
555 let min_epoch = self.tx_manager.min_active_epoch();
556 self.store.gc_versions(min_epoch);
557 self.tx_manager.gc();
558 }
559
560 #[must_use]
562 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
563 &self.buffer_manager
564 }
565
566 pub fn close(&self) -> Result<()> {
576 let mut is_open = self.is_open.write();
577 if !*is_open {
578 return Ok(());
579 }
580
581 #[cfg(feature = "wal")]
583 if let Some(ref wal) = self.wal {
584 let epoch = self.store.current_epoch();
585
586 let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
588 self.tx_manager.begin()
590 });
591
592 wal.log(&WalRecord::TxCommit {
594 tx_id: checkpoint_tx,
595 })?;
596
597 wal.checkpoint(checkpoint_tx, epoch)?;
599 wal.sync()?;
600 }
601
602 *is_open = false;
603 Ok(())
604 }
605
606 #[cfg(feature = "wal")]
608 #[must_use]
609 pub fn wal(&self) -> Option<&Arc<WalManager>> {
610 self.wal.as_ref()
611 }
612
613 #[cfg(feature = "wal")]
615 fn log_wal(&self, record: &WalRecord) -> Result<()> {
616 if let Some(ref wal) = self.wal {
617 wal.log(record)?;
618 }
619 Ok(())
620 }
621
622 #[must_use]
624 pub fn node_count(&self) -> usize {
625 self.store.node_count()
626 }
627
628 #[must_use]
630 pub fn edge_count(&self) -> usize {
631 self.store.edge_count()
632 }
633
634 #[must_use]
636 pub fn label_count(&self) -> usize {
637 self.store.label_count()
638 }
639
640 #[must_use]
642 pub fn property_key_count(&self) -> usize {
643 self.store.property_key_count()
644 }
645
646 #[must_use]
648 pub fn edge_type_count(&self) -> usize {
649 self.store.edge_type_count()
650 }
651
652 pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
669 let id = self.store.create_node(labels);
670
671 #[cfg(feature = "wal")]
673 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
674 id,
675 labels: labels.iter().map(|s| (*s).to_string()).collect(),
676 }) {
677 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
678 }
679
680 id
681 }
682
683 pub fn create_node_with_props(
687 &self,
688 labels: &[&str],
689 properties: impl IntoIterator<
690 Item = (
691 impl Into<grafeo_common::types::PropertyKey>,
692 impl Into<grafeo_common::types::Value>,
693 ),
694 >,
695 ) -> grafeo_common::types::NodeId {
696 let props: Vec<(
698 grafeo_common::types::PropertyKey,
699 grafeo_common::types::Value,
700 )> = properties
701 .into_iter()
702 .map(|(k, v)| (k.into(), v.into()))
703 .collect();
704
705 let id = self
706 .store
707 .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
708
709 #[cfg(feature = "wal")]
711 {
712 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
713 id,
714 labels: labels.iter().map(|s| (*s).to_string()).collect(),
715 }) {
716 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
717 }
718
719 for (key, value) in props {
721 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
722 id,
723 key: key.to_string(),
724 value,
725 }) {
726 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
727 }
728 }
729 }
730
731 id
732 }
733
734 #[must_use]
736 pub fn get_node(
737 &self,
738 id: grafeo_common::types::NodeId,
739 ) -> Option<grafeo_core::graph::lpg::Node> {
740 self.store.get_node(id)
741 }
742
743 pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
747 #[cfg(feature = "vector-index")]
749 let indexes_to_clean: Vec<std::sync::Arc<grafeo_core::index::vector::HnswIndex>> = self
750 .store
751 .get_node(id)
752 .map(|node| {
753 let mut indexes = Vec::new();
754 for label in &node.labels {
755 let prefix = format!("{}:", label.as_str());
756 for (key, index) in self.store.vector_index_entries() {
757 if key.starts_with(&prefix) {
758 indexes.push(index);
759 }
760 }
761 }
762 indexes
763 })
764 .unwrap_or_default();
765
766 let result = self.store.delete_node(id);
767
768 #[cfg(feature = "vector-index")]
770 if result {
771 for index in indexes_to_clean {
772 index.remove(id);
773 }
774 }
775
776 #[cfg(feature = "wal")]
777 if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
778 tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
779 }
780
781 result
782 }
783
784 pub fn set_node_property(
788 &self,
789 id: grafeo_common::types::NodeId,
790 key: &str,
791 value: grafeo_common::types::Value,
792 ) {
793 #[cfg(feature = "vector-index")]
795 let vector_data = match &value {
796 grafeo_common::types::Value::Vector(v) => Some(v.clone()),
797 _ => None,
798 };
799
800 #[cfg(feature = "wal")]
802 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
803 id,
804 key: key.to_string(),
805 value: value.clone(),
806 }) {
807 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
808 }
809
810 self.store.set_node_property(id, key, value);
811
812 #[cfg(feature = "vector-index")]
814 if let Some(vec) = vector_data
815 && let Some(node) = self.store.get_node(id)
816 {
817 for label in &node.labels {
818 if let Some(index) = self.store.get_vector_index(label.as_str(), key) {
819 let accessor =
820 grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, key);
821 index.insert(id, &vec, &accessor);
822 }
823 }
824 }
825 }
826
827 pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
845 let result = self.store.add_label(id, label);
846
847 #[cfg(feature = "wal")]
848 if result {
849 if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
851 id,
852 label: label.to_string(),
853 }) {
854 tracing::warn!("Failed to log AddNodeLabel to WAL: {}", e);
855 }
856 }
857
858 #[cfg(feature = "vector-index")]
860 if result {
861 let prefix = format!("{label}:");
862 for (key, index) in self.store.vector_index_entries() {
863 if let Some(property) = key.strip_prefix(&prefix)
864 && let Some(node) = self.store.get_node(id)
865 {
866 let prop_key = grafeo_common::types::PropertyKey::new(property);
867 if let Some(grafeo_common::types::Value::Vector(v)) =
868 node.properties.get(&prop_key)
869 {
870 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
871 &self.store,
872 property,
873 );
874 index.insert(id, v, &accessor);
875 }
876 }
877 }
878 }
879
880 result
881 }
882
883 pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
901 let result = self.store.remove_label(id, label);
902
903 #[cfg(feature = "wal")]
904 if result {
905 if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
907 id,
908 label: label.to_string(),
909 }) {
910 tracing::warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
911 }
912 }
913
914 result
915 }
916
917 #[must_use]
934 pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
935 self.store
936 .get_node(id)
937 .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
938 }
939
940 pub fn create_edge(
960 &self,
961 src: grafeo_common::types::NodeId,
962 dst: grafeo_common::types::NodeId,
963 edge_type: &str,
964 ) -> grafeo_common::types::EdgeId {
965 let id = self.store.create_edge(src, dst, edge_type);
966
967 #[cfg(feature = "wal")]
969 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
970 id,
971 src,
972 dst,
973 edge_type: edge_type.to_string(),
974 }) {
975 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
976 }
977
978 id
979 }
980
981 pub fn create_edge_with_props(
985 &self,
986 src: grafeo_common::types::NodeId,
987 dst: grafeo_common::types::NodeId,
988 edge_type: &str,
989 properties: impl IntoIterator<
990 Item = (
991 impl Into<grafeo_common::types::PropertyKey>,
992 impl Into<grafeo_common::types::Value>,
993 ),
994 >,
995 ) -> grafeo_common::types::EdgeId {
996 let props: Vec<(
998 grafeo_common::types::PropertyKey,
999 grafeo_common::types::Value,
1000 )> = properties
1001 .into_iter()
1002 .map(|(k, v)| (k.into(), v.into()))
1003 .collect();
1004
1005 let id = self.store.create_edge_with_props(
1006 src,
1007 dst,
1008 edge_type,
1009 props.iter().map(|(k, v)| (k.clone(), v.clone())),
1010 );
1011
1012 #[cfg(feature = "wal")]
1014 {
1015 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
1016 id,
1017 src,
1018 dst,
1019 edge_type: edge_type.to_string(),
1020 }) {
1021 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
1022 }
1023
1024 for (key, value) in props {
1026 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
1027 id,
1028 key: key.to_string(),
1029 value,
1030 }) {
1031 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
1032 }
1033 }
1034 }
1035
1036 id
1037 }
1038
1039 #[must_use]
1041 pub fn get_edge(
1042 &self,
1043 id: grafeo_common::types::EdgeId,
1044 ) -> Option<grafeo_core::graph::lpg::Edge> {
1045 self.store.get_edge(id)
1046 }
1047
1048 pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
1052 let result = self.store.delete_edge(id);
1053
1054 #[cfg(feature = "wal")]
1055 if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
1056 tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
1057 }
1058
1059 result
1060 }
1061
1062 pub fn set_edge_property(
1066 &self,
1067 id: grafeo_common::types::EdgeId,
1068 key: &str,
1069 value: grafeo_common::types::Value,
1070 ) {
1071 #[cfg(feature = "wal")]
1073 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
1074 id,
1075 key: key.to_string(),
1076 value: value.clone(),
1077 }) {
1078 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
1079 }
1080 self.store.set_edge_property(id, key, value);
1081 }
1082
1083 pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
1087 self.store.remove_node_property(id, key).is_some()
1089 }
1090
1091 pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
1095 self.store.remove_edge_property(id, key).is_some()
1097 }
1098
1099 pub fn create_property_index(&self, property: &str) {
1119 self.store.create_property_index(property);
1120 }
1121
1122 pub fn create_vector_index(
1142 &self,
1143 label: &str,
1144 property: &str,
1145 dimensions: Option<usize>,
1146 metric: Option<&str>,
1147 m: Option<usize>,
1148 ef_construction: Option<usize>,
1149 ) -> Result<()> {
1150 use grafeo_common::types::{PropertyKey, Value};
1151 use grafeo_core::index::vector::DistanceMetric;
1152
1153 let metric = match metric {
1154 Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1155 grafeo_common::utils::error::Error::Internal(format!(
1156 "Unknown distance metric '{}'. Use: cosine, euclidean, dot_product, manhattan",
1157 m
1158 ))
1159 })?,
1160 None => DistanceMetric::Cosine,
1161 };
1162
1163 let prop_key = PropertyKey::new(property);
1165 let mut found_dims: Option<usize> = dimensions;
1166 let mut vector_count = 0usize;
1167
1168 #[cfg(feature = "vector-index")]
1169 let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1170
1171 for node in self.store.nodes_with_label(label) {
1172 if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1173 if let Some(expected) = found_dims {
1174 if v.len() != expected {
1175 return Err(grafeo_common::utils::error::Error::Internal(format!(
1176 "Vector dimension mismatch: expected {}, found {} on node {}",
1177 expected,
1178 v.len(),
1179 node.id.0
1180 )));
1181 }
1182 } else {
1183 found_dims = Some(v.len());
1184 }
1185 vector_count += 1;
1186 #[cfg(feature = "vector-index")]
1187 vectors.push((node.id, v.to_vec()));
1188 }
1189 }
1190
1191 if vector_count == 0 {
1192 return Err(grafeo_common::utils::error::Error::Internal(format!(
1193 "No vector properties found on :{label}({property})"
1194 )));
1195 }
1196
1197 let dims = found_dims.unwrap_or(0);
1198
1199 #[cfg(feature = "vector-index")]
1201 {
1202 use grafeo_core::index::vector::{HnswConfig, HnswIndex};
1203
1204 let mut config = HnswConfig::new(dims, metric);
1205 if let Some(m_val) = m {
1206 config = config.with_m(m_val);
1207 }
1208 if let Some(ef_c) = ef_construction {
1209 config = config.with_ef_construction(ef_c);
1210 }
1211
1212 let index = HnswIndex::with_capacity(config, vectors.len());
1213 let accessor =
1214 grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1215 for (node_id, vec) in &vectors {
1216 index.insert(*node_id, vec, &accessor);
1217 }
1218
1219 self.store
1220 .add_vector_index(label, property, Arc::new(index));
1221 }
1222
1223 let _ = (m, ef_construction);
1225
1226 tracing::info!(
1227 "Vector index created: :{label}({property}) - {vector_count} vectors, {dims} dimensions, metric={metric_name}",
1228 metric_name = metric.name()
1229 );
1230
1231 Ok(())
1232 }
1233
1234 #[cfg(feature = "vector-index")]
1242 pub fn drop_vector_index(&self, label: &str, property: &str) -> bool {
1243 let removed = self.store.remove_vector_index(label, property);
1244 if removed {
1245 tracing::info!("Vector index dropped: :{label}({property})");
1246 }
1247 removed
1248 }
1249
1250 #[cfg(feature = "vector-index")]
1261 pub fn rebuild_vector_index(&self, label: &str, property: &str) -> Result<()> {
1262 let config = self
1263 .store
1264 .get_vector_index(label, property)
1265 .map(|idx| idx.config().clone())
1266 .ok_or_else(|| {
1267 grafeo_common::utils::error::Error::Internal(format!(
1268 "No vector index found for :{label}({property}). Cannot rebuild."
1269 ))
1270 })?;
1271
1272 self.store.remove_vector_index(label, property);
1273
1274 self.create_vector_index(
1275 label,
1276 property,
1277 Some(config.dimensions),
1278 Some(config.metric.name()),
1279 Some(config.m),
1280 Some(config.ef_construction),
1281 )
1282 }
1283
1284 #[cfg(feature = "vector-index")]
1290 fn compute_filter_allowlist(
1291 &self,
1292 label: &str,
1293 filters: Option<&std::collections::HashMap<String, Value>>,
1294 ) -> Option<std::collections::HashSet<NodeId>> {
1295 let filters = filters.filter(|f| !f.is_empty())?;
1296
1297 let label_nodes: std::collections::HashSet<NodeId> =
1299 self.store.nodes_by_label(label).into_iter().collect();
1300
1301 let mut allowlist = label_nodes;
1302
1303 for (key, value) in filters {
1304 let matching: std::collections::HashSet<NodeId> = self
1305 .store
1306 .find_nodes_by_property(key, value)
1307 .into_iter()
1308 .collect();
1309 allowlist = allowlist.intersection(&matching).copied().collect();
1310
1311 if allowlist.is_empty() {
1313 return Some(allowlist);
1314 }
1315 }
1316
1317 Some(allowlist)
1318 }
1319
1320 #[cfg(feature = "vector-index")]
1338 pub fn vector_search(
1339 &self,
1340 label: &str,
1341 property: &str,
1342 query: &[f32],
1343 k: usize,
1344 ef: Option<usize>,
1345 filters: Option<&std::collections::HashMap<String, Value>>,
1346 ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
1347 let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1348 grafeo_common::utils::error::Error::Internal(format!(
1349 "No vector index found for :{label}({property}). Call create_vector_index() first."
1350 ))
1351 })?;
1352
1353 let accessor =
1354 grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1355
1356 let results = match self.compute_filter_allowlist(label, filters) {
1357 Some(allowlist) => match ef {
1358 Some(ef_val) => {
1359 index.search_with_ef_and_filter(query, k, ef_val, &allowlist, &accessor)
1360 }
1361 None => index.search_with_filter(query, k, &allowlist, &accessor),
1362 },
1363 None => match ef {
1364 Some(ef_val) => index.search_with_ef(query, k, ef_val, &accessor),
1365 None => index.search(query, k, &accessor),
1366 },
1367 };
1368
1369 Ok(results)
1370 }
1371
1372 pub fn batch_create_nodes(
1388 &self,
1389 label: &str,
1390 property: &str,
1391 vectors: Vec<Vec<f32>>,
1392 ) -> Vec<grafeo_common::types::NodeId> {
1393 use grafeo_common::types::{PropertyKey, Value};
1394
1395 let prop_key = PropertyKey::new(property);
1396 let labels: &[&str] = &[label];
1397
1398 let ids: Vec<grafeo_common::types::NodeId> = vectors
1399 .into_iter()
1400 .map(|vec| {
1401 let value = Value::Vector(vec.into());
1402 let id = self.store.create_node_with_props(
1403 labels,
1404 std::iter::once((prop_key.clone(), value.clone())),
1405 );
1406
1407 #[cfg(feature = "wal")]
1409 {
1410 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
1411 id,
1412 labels: labels.iter().map(|s| (*s).to_string()).collect(),
1413 }) {
1414 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
1415 }
1416 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
1417 id,
1418 key: property.to_string(),
1419 value,
1420 }) {
1421 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
1422 }
1423 }
1424
1425 id
1426 })
1427 .collect();
1428
1429 #[cfg(feature = "vector-index")]
1431 if let Some(index) = self.store.get_vector_index(label, property) {
1432 let accessor =
1433 grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1434 for &id in &ids {
1435 if let Some(node) = self.store.get_node(id) {
1436 let pk = grafeo_common::types::PropertyKey::new(property);
1437 if let Some(grafeo_common::types::Value::Vector(v)) = node.properties.get(&pk) {
1438 index.insert(id, v, &accessor);
1439 }
1440 }
1441 }
1442 }
1443
1444 ids
1445 }
1446
1447 #[cfg(feature = "vector-index")]
1460 pub fn batch_vector_search(
1461 &self,
1462 label: &str,
1463 property: &str,
1464 queries: &[Vec<f32>],
1465 k: usize,
1466 ef: Option<usize>,
1467 filters: Option<&std::collections::HashMap<String, Value>>,
1468 ) -> Result<Vec<Vec<(grafeo_common::types::NodeId, f32)>>> {
1469 let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1470 grafeo_common::utils::error::Error::Internal(format!(
1471 "No vector index found for :{label}({property}). Call create_vector_index() first."
1472 ))
1473 })?;
1474
1475 let accessor =
1476 grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1477
1478 let results = match self.compute_filter_allowlist(label, filters) {
1479 Some(allowlist) => match ef {
1480 Some(ef_val) => {
1481 index.batch_search_with_ef_and_filter(queries, k, ef_val, &allowlist, &accessor)
1482 }
1483 None => index.batch_search_with_filter(queries, k, &allowlist, &accessor),
1484 },
1485 None => match ef {
1486 Some(ef_val) => index.batch_search_with_ef(queries, k, ef_val, &accessor),
1487 None => index.batch_search(queries, k, &accessor),
1488 },
1489 };
1490
1491 Ok(results)
1492 }
1493
1494 #[cfg(feature = "vector-index")]
1517 #[allow(clippy::too_many_arguments)]
1518 pub fn mmr_search(
1519 &self,
1520 label: &str,
1521 property: &str,
1522 query: &[f32],
1523 k: usize,
1524 fetch_k: Option<usize>,
1525 lambda: Option<f32>,
1526 ef: Option<usize>,
1527 filters: Option<&std::collections::HashMap<String, Value>>,
1528 ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
1529 use grafeo_core::index::vector::mmr_select;
1530
1531 let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1532 grafeo_common::utils::error::Error::Internal(format!(
1533 "No vector index found for :{label}({property}). Call create_vector_index() first."
1534 ))
1535 })?;
1536
1537 let accessor =
1538 grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1539
1540 let fetch_k = fetch_k.unwrap_or(k.saturating_mul(4).max(k));
1541 let lambda = lambda.unwrap_or(0.5);
1542
1543 let initial_results = match self.compute_filter_allowlist(label, filters) {
1545 Some(allowlist) => match ef {
1546 Some(ef_val) => {
1547 index.search_with_ef_and_filter(query, fetch_k, ef_val, &allowlist, &accessor)
1548 }
1549 None => index.search_with_filter(query, fetch_k, &allowlist, &accessor),
1550 },
1551 None => match ef {
1552 Some(ef_val) => index.search_with_ef(query, fetch_k, ef_val, &accessor),
1553 None => index.search(query, fetch_k, &accessor),
1554 },
1555 };
1556
1557 if initial_results.is_empty() {
1558 return Ok(Vec::new());
1559 }
1560
1561 use grafeo_core::index::vector::VectorAccessor;
1563 let candidates: Vec<(grafeo_common::types::NodeId, f32, std::sync::Arc<[f32]>)> =
1564 initial_results
1565 .into_iter()
1566 .filter_map(|(id, dist)| accessor.get_vector(id).map(|vec| (id, dist, vec)))
1567 .collect();
1568
1569 let candidate_refs: Vec<(grafeo_common::types::NodeId, f32, &[f32])> = candidates
1571 .iter()
1572 .map(|(id, dist, vec)| (*id, *dist, vec.as_ref()))
1573 .collect();
1574
1575 let metric = index.config().metric;
1577 Ok(mmr_select(query, &candidate_refs, k, lambda, metric))
1578 }
1579
1580 pub fn drop_property_index(&self, property: &str) -> bool {
1584 self.store.drop_property_index(property)
1585 }
1586
1587 #[must_use]
1589 pub fn has_property_index(&self, property: &str) -> bool {
1590 self.store.has_property_index(property)
1591 }
1592
1593 #[must_use]
1608 pub fn find_nodes_by_property(
1609 &self,
1610 property: &str,
1611 value: &grafeo_common::types::Value,
1612 ) -> Vec<grafeo_common::types::NodeId> {
1613 self.store.find_nodes_by_property(property, value)
1614 }
1615
1616 #[must_use]
1624 pub fn is_persistent(&self) -> bool {
1625 self.config.path.is_some()
1626 }
1627
1628 #[must_use]
1632 pub fn path(&self) -> Option<&Path> {
1633 self.config.path.as_deref()
1634 }
1635
1636 #[must_use]
1640 pub fn info(&self) -> crate::admin::DatabaseInfo {
1641 crate::admin::DatabaseInfo {
1642 mode: crate::admin::DatabaseMode::Lpg,
1643 node_count: self.store.node_count(),
1644 edge_count: self.store.edge_count(),
1645 is_persistent: self.is_persistent(),
1646 path: self.config.path.clone(),
1647 wal_enabled: self.config.wal_enabled,
1648 version: env!("CARGO_PKG_VERSION").to_string(),
1649 }
1650 }
1651
1652 #[must_use]
1656 pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1657 #[cfg(feature = "wal")]
1658 let disk_bytes = self.config.path.as_ref().and_then(|p| {
1659 if p.exists() {
1660 Self::calculate_disk_usage(p).ok()
1661 } else {
1662 None
1663 }
1664 });
1665 #[cfg(not(feature = "wal"))]
1666 let disk_bytes: Option<usize> = None;
1667
1668 crate::admin::DatabaseStats {
1669 node_count: self.store.node_count(),
1670 edge_count: self.store.edge_count(),
1671 label_count: self.store.label_count(),
1672 edge_type_count: self.store.edge_type_count(),
1673 property_key_count: self.store.property_key_count(),
1674 index_count: 0, memory_bytes: self.buffer_manager.allocated(),
1676 disk_bytes,
1677 }
1678 }
1679
1680 #[cfg(feature = "wal")]
1682 fn calculate_disk_usage(path: &Path) -> Result<usize> {
1683 let mut total = 0usize;
1684 if path.is_dir() {
1685 for entry in std::fs::read_dir(path)? {
1686 let entry = entry?;
1687 let metadata = entry.metadata()?;
1688 if metadata.is_file() {
1689 total += metadata.len() as usize;
1690 } else if metadata.is_dir() {
1691 total += Self::calculate_disk_usage(&entry.path())?;
1692 }
1693 }
1694 }
1695 Ok(total)
1696 }
1697
1698 #[must_use]
1703 pub fn schema(&self) -> crate::admin::SchemaInfo {
1704 let labels = self
1705 .store
1706 .all_labels()
1707 .into_iter()
1708 .map(|name| crate::admin::LabelInfo {
1709 name: name.clone(),
1710 count: self.store.nodes_with_label(&name).count(),
1711 })
1712 .collect();
1713
1714 let edge_types = self
1715 .store
1716 .all_edge_types()
1717 .into_iter()
1718 .map(|name| crate::admin::EdgeTypeInfo {
1719 name: name.clone(),
1720 count: self.store.edges_with_type(&name).count(),
1721 })
1722 .collect();
1723
1724 let property_keys = self.store.all_property_keys();
1725
1726 crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
1727 labels,
1728 edge_types,
1729 property_keys,
1730 })
1731 }
1732
1733 #[cfg(feature = "rdf")]
1737 #[must_use]
1738 pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
1739 let stats = self.rdf_store.stats();
1740
1741 let predicates = self
1742 .rdf_store
1743 .predicates()
1744 .into_iter()
1745 .map(|predicate| {
1746 let count = self.rdf_store.triples_with_predicate(&predicate).len();
1747 crate::admin::PredicateInfo {
1748 iri: predicate.to_string(),
1749 count,
1750 }
1751 })
1752 .collect();
1753
1754 crate::admin::SchemaInfo::Rdf(crate::admin::RdfSchemaInfo {
1755 predicates,
1756 named_graphs: Vec::new(), subject_count: stats.subject_count,
1758 object_count: stats.object_count,
1759 })
1760 }
1761
1762 #[must_use]
1770 pub fn validate(&self) -> crate::admin::ValidationResult {
1771 let mut result = crate::admin::ValidationResult::default();
1772
1773 for edge in self.store.all_edges() {
1775 if self.store.get_node(edge.src).is_none() {
1776 result.errors.push(crate::admin::ValidationError {
1777 code: "DANGLING_SRC".to_string(),
1778 message: format!(
1779 "Edge {} references non-existent source node {}",
1780 edge.id.0, edge.src.0
1781 ),
1782 context: Some(format!("edge:{}", edge.id.0)),
1783 });
1784 }
1785 if self.store.get_node(edge.dst).is_none() {
1786 result.errors.push(crate::admin::ValidationError {
1787 code: "DANGLING_DST".to_string(),
1788 message: format!(
1789 "Edge {} references non-existent destination node {}",
1790 edge.id.0, edge.dst.0
1791 ),
1792 context: Some(format!("edge:{}", edge.id.0)),
1793 });
1794 }
1795 }
1796
1797 if self.store.node_count() > 0 && self.store.edge_count() == 0 {
1799 result.warnings.push(crate::admin::ValidationWarning {
1800 code: "NO_EDGES".to_string(),
1801 message: "Database has nodes but no edges".to_string(),
1802 context: None,
1803 });
1804 }
1805
1806 result
1807 }
1808
1809 #[must_use]
1813 pub fn wal_status(&self) -> crate::admin::WalStatus {
1814 #[cfg(feature = "wal")]
1815 if let Some(ref wal) = self.wal {
1816 return crate::admin::WalStatus {
1817 enabled: true,
1818 path: self.config.path.as_ref().map(|p| p.join("wal")),
1819 size_bytes: wal.size_bytes(),
1820 record_count: wal.record_count() as usize,
1821 last_checkpoint: wal.last_checkpoint_timestamp(),
1822 current_epoch: self.store.current_epoch().as_u64(),
1823 };
1824 }
1825
1826 crate::admin::WalStatus {
1827 enabled: false,
1828 path: None,
1829 size_bytes: 0,
1830 record_count: 0,
1831 last_checkpoint: None,
1832 current_epoch: self.store.current_epoch().as_u64(),
1833 }
1834 }
1835
1836 pub fn wal_checkpoint(&self) -> Result<()> {
1844 #[cfg(feature = "wal")]
1845 if let Some(ref wal) = self.wal {
1846 let epoch = self.store.current_epoch();
1847 let tx_id = self
1848 .tx_manager
1849 .last_assigned_tx_id()
1850 .unwrap_or_else(|| self.tx_manager.begin());
1851 wal.checkpoint(tx_id, epoch)?;
1852 wal.sync()?;
1853 }
1854 Ok(())
1855 }
1856
1857 #[cfg(feature = "wal")]
1874 pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
1875 let path = path.as_ref();
1876
1877 let target_config = Config::persistent(path);
1879 let target = Self::with_config(target_config)?;
1880
1881 for node in self.store.all_nodes() {
1883 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1884 target.store.create_node_with_id(node.id, &label_refs);
1885
1886 target.log_wal(&WalRecord::CreateNode {
1888 id: node.id,
1889 labels: node.labels.iter().map(|s| s.to_string()).collect(),
1890 })?;
1891
1892 for (key, value) in node.properties {
1894 target
1895 .store
1896 .set_node_property(node.id, key.as_str(), value.clone());
1897 target.log_wal(&WalRecord::SetNodeProperty {
1898 id: node.id,
1899 key: key.to_string(),
1900 value,
1901 })?;
1902 }
1903 }
1904
1905 for edge in self.store.all_edges() {
1907 target
1908 .store
1909 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1910
1911 target.log_wal(&WalRecord::CreateEdge {
1913 id: edge.id,
1914 src: edge.src,
1915 dst: edge.dst,
1916 edge_type: edge.edge_type.to_string(),
1917 })?;
1918
1919 for (key, value) in edge.properties {
1921 target
1922 .store
1923 .set_edge_property(edge.id, key.as_str(), value.clone());
1924 target.log_wal(&WalRecord::SetEdgeProperty {
1925 id: edge.id,
1926 key: key.to_string(),
1927 value,
1928 })?;
1929 }
1930 }
1931
1932 target.close()?;
1934
1935 Ok(())
1936 }
1937
1938 pub fn to_memory(&self) -> Result<Self> {
1949 let config = Config::in_memory();
1950 let target = Self::with_config(config)?;
1951
1952 for node in self.store.all_nodes() {
1954 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1955 target.store.create_node_with_id(node.id, &label_refs);
1956
1957 for (key, value) in node.properties {
1959 target.store.set_node_property(node.id, key.as_str(), value);
1960 }
1961 }
1962
1963 for edge in self.store.all_edges() {
1965 target
1966 .store
1967 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1968
1969 for (key, value) in edge.properties {
1971 target.store.set_edge_property(edge.id, key.as_str(), value);
1972 }
1973 }
1974
1975 Ok(target)
1976 }
1977
1978 #[cfg(feature = "wal")]
1987 pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
1988 let source = Self::open(path)?;
1990
1991 let target = source.to_memory()?;
1993
1994 source.close()?;
1996
1997 Ok(target)
1998 }
1999
2000 pub fn export_snapshot(&self) -> Result<Vec<u8>> {
2013 let nodes: Vec<SnapshotNode> = self
2014 .store
2015 .all_nodes()
2016 .map(|n| SnapshotNode {
2017 id: n.id,
2018 labels: n.labels.iter().map(|l| l.to_string()).collect(),
2019 properties: n
2020 .properties
2021 .into_iter()
2022 .map(|(k, v)| (k.to_string(), v))
2023 .collect(),
2024 })
2025 .collect();
2026
2027 let edges: Vec<SnapshotEdge> = self
2028 .store
2029 .all_edges()
2030 .map(|e| SnapshotEdge {
2031 id: e.id,
2032 src: e.src,
2033 dst: e.dst,
2034 edge_type: e.edge_type.to_string(),
2035 properties: e
2036 .properties
2037 .into_iter()
2038 .map(|(k, v)| (k.to_string(), v))
2039 .collect(),
2040 })
2041 .collect();
2042
2043 let snapshot = Snapshot {
2044 version: 1,
2045 nodes,
2046 edges,
2047 };
2048
2049 let config = bincode::config::standard();
2050 bincode::serde::encode_to_vec(&snapshot, config)
2051 .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
2052 }
2053
2054 pub fn import_snapshot(data: &[u8]) -> Result<Self> {
2062 let config = bincode::config::standard();
2063 let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
2064 .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
2065
2066 if snapshot.version != 1 {
2067 return Err(Error::Internal(format!(
2068 "unsupported snapshot version: {}",
2069 snapshot.version
2070 )));
2071 }
2072
2073 let db = Self::new_in_memory();
2074
2075 for node in snapshot.nodes {
2076 let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
2077 db.store.create_node_with_id(node.id, &label_refs);
2078 for (key, value) in node.properties {
2079 db.store.set_node_property(node.id, &key, value);
2080 }
2081 }
2082
2083 for edge in snapshot.edges {
2084 db.store
2085 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
2086 for (key, value) in edge.properties {
2087 db.store.set_edge_property(edge.id, &key, value);
2088 }
2089 }
2090
2091 Ok(db)
2092 }
2093
2094 pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
2102 self.store.all_nodes()
2103 }
2104
2105 pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
2109 self.store.all_edges()
2110 }
2111}
2112
2113#[derive(serde::Serialize, serde::Deserialize)]
2115struct Snapshot {
2116 version: u8,
2117 nodes: Vec<SnapshotNode>,
2118 edges: Vec<SnapshotEdge>,
2119}
2120
2121#[derive(serde::Serialize, serde::Deserialize)]
2122struct SnapshotNode {
2123 id: NodeId,
2124 labels: Vec<String>,
2125 properties: Vec<(String, Value)>,
2126}
2127
2128#[derive(serde::Serialize, serde::Deserialize)]
2129struct SnapshotEdge {
2130 id: EdgeId,
2131 src: NodeId,
2132 dst: NodeId,
2133 edge_type: String,
2134 properties: Vec<(String, Value)>,
2135}
2136
2137impl Drop for GrafeoDB {
2138 fn drop(&mut self) {
2139 if let Err(e) = self.close() {
2140 tracing::error!("Error closing database: {}", e);
2141 }
2142 }
2143}
2144
2145impl crate::admin::AdminService for GrafeoDB {
2146 fn info(&self) -> crate::admin::DatabaseInfo {
2147 self.info()
2148 }
2149
2150 fn detailed_stats(&self) -> crate::admin::DatabaseStats {
2151 self.detailed_stats()
2152 }
2153
2154 fn schema(&self) -> crate::admin::SchemaInfo {
2155 self.schema()
2156 }
2157
2158 fn validate(&self) -> crate::admin::ValidationResult {
2159 self.validate()
2160 }
2161
2162 fn wal_status(&self) -> crate::admin::WalStatus {
2163 self.wal_status()
2164 }
2165
2166 fn wal_checkpoint(&self) -> Result<()> {
2167 self.wal_checkpoint()
2168 }
2169}
2170
2171#[derive(Debug)]
2197pub struct QueryResult {
2198 pub columns: Vec<String>,
2200 pub column_types: Vec<grafeo_common::types::LogicalType>,
2202 pub rows: Vec<Vec<grafeo_common::types::Value>>,
2204 pub execution_time_ms: Option<f64>,
2206 pub rows_scanned: Option<u64>,
2208}
2209
2210impl QueryResult {
2211 #[must_use]
2213 pub fn new(columns: Vec<String>) -> Self {
2214 let len = columns.len();
2215 Self {
2216 columns,
2217 column_types: vec![grafeo_common::types::LogicalType::Any; len],
2218 rows: Vec::new(),
2219 execution_time_ms: None,
2220 rows_scanned: None,
2221 }
2222 }
2223
2224 #[must_use]
2226 pub fn with_types(
2227 columns: Vec<String>,
2228 column_types: Vec<grafeo_common::types::LogicalType>,
2229 ) -> Self {
2230 Self {
2231 columns,
2232 column_types,
2233 rows: Vec::new(),
2234 execution_time_ms: None,
2235 rows_scanned: None,
2236 }
2237 }
2238
2239 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
2241 self.execution_time_ms = Some(execution_time_ms);
2242 self.rows_scanned = Some(rows_scanned);
2243 self
2244 }
2245
2246 #[must_use]
2248 pub fn execution_time_ms(&self) -> Option<f64> {
2249 self.execution_time_ms
2250 }
2251
2252 #[must_use]
2254 pub fn rows_scanned(&self) -> Option<u64> {
2255 self.rows_scanned
2256 }
2257
2258 #[must_use]
2260 pub fn row_count(&self) -> usize {
2261 self.rows.len()
2262 }
2263
2264 #[must_use]
2266 pub fn column_count(&self) -> usize {
2267 self.columns.len()
2268 }
2269
2270 #[must_use]
2272 pub fn is_empty(&self) -> bool {
2273 self.rows.is_empty()
2274 }
2275
2276 pub fn scalar<T: FromValue>(&self) -> Result<T> {
2285 if self.rows.len() != 1 || self.columns.len() != 1 {
2286 return Err(grafeo_common::utils::error::Error::InvalidValue(
2287 "Expected single value".to_string(),
2288 ));
2289 }
2290 T::from_value(&self.rows[0][0])
2291 }
2292
2293 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
2295 self.rows.iter()
2296 }
2297}
2298
2299pub trait FromValue: Sized {
2304 fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
2306}
2307
2308impl FromValue for i64 {
2309 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2310 value
2311 .as_int64()
2312 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2313 expected: "INT64".to_string(),
2314 found: value.type_name().to_string(),
2315 })
2316 }
2317}
2318
2319impl FromValue for f64 {
2320 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2321 value
2322 .as_float64()
2323 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2324 expected: "FLOAT64".to_string(),
2325 found: value.type_name().to_string(),
2326 })
2327 }
2328}
2329
2330impl FromValue for String {
2331 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2332 value.as_str().map(String::from).ok_or_else(|| {
2333 grafeo_common::utils::error::Error::TypeMismatch {
2334 expected: "STRING".to_string(),
2335 found: value.type_name().to_string(),
2336 }
2337 })
2338 }
2339}
2340
2341impl FromValue for bool {
2342 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2343 value
2344 .as_bool()
2345 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2346 expected: "BOOL".to_string(),
2347 found: value.type_name().to_string(),
2348 })
2349 }
2350}
2351
2352#[cfg(test)]
2353mod tests {
2354 use super::*;
2355
2356 #[test]
2357 fn test_create_in_memory_database() {
2358 let db = GrafeoDB::new_in_memory();
2359 assert_eq!(db.node_count(), 0);
2360 assert_eq!(db.edge_count(), 0);
2361 }
2362
2363 #[test]
2364 fn test_database_config() {
2365 let config = Config::in_memory().with_threads(4).with_query_logging();
2366
2367 let db = GrafeoDB::with_config(config).unwrap();
2368 assert_eq!(db.config().threads, 4);
2369 assert!(db.config().query_logging);
2370 }
2371
2372 #[test]
2373 fn test_database_session() {
2374 let db = GrafeoDB::new_in_memory();
2375 let _session = db.session();
2376 }
2378
2379 #[cfg(feature = "wal")]
2380 #[test]
2381 fn test_persistent_database_recovery() {
2382 use grafeo_common::types::Value;
2383 use tempfile::tempdir;
2384
2385 let dir = tempdir().unwrap();
2386 let db_path = dir.path().join("test_db");
2387
2388 {
2390 let db = GrafeoDB::open(&db_path).unwrap();
2391
2392 let alice = db.create_node(&["Person"]);
2393 db.set_node_property(alice, "name", Value::from("Alice"));
2394
2395 let bob = db.create_node(&["Person"]);
2396 db.set_node_property(bob, "name", Value::from("Bob"));
2397
2398 let _edge = db.create_edge(alice, bob, "KNOWS");
2399
2400 db.close().unwrap();
2402 }
2403
2404 {
2406 let db = GrafeoDB::open(&db_path).unwrap();
2407
2408 assert_eq!(db.node_count(), 2);
2409 assert_eq!(db.edge_count(), 1);
2410
2411 let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
2413 assert!(node0.is_some());
2414
2415 let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
2416 assert!(node1.is_some());
2417 }
2418 }
2419
2420 #[cfg(feature = "wal")]
2421 #[test]
2422 fn test_wal_logging() {
2423 use tempfile::tempdir;
2424
2425 let dir = tempdir().unwrap();
2426 let db_path = dir.path().join("wal_test_db");
2427
2428 let db = GrafeoDB::open(&db_path).unwrap();
2429
2430 let node = db.create_node(&["Test"]);
2432 db.delete_node(node);
2433
2434 if let Some(wal) = db.wal() {
2436 assert!(wal.record_count() > 0);
2437 }
2438
2439 db.close().unwrap();
2440 }
2441
2442 #[cfg(feature = "wal")]
2443 #[test]
2444 fn test_wal_recovery_multiple_sessions() {
2445 use grafeo_common::types::Value;
2447 use tempfile::tempdir;
2448
2449 let dir = tempdir().unwrap();
2450 let db_path = dir.path().join("multi_session_db");
2451
2452 {
2454 let db = GrafeoDB::open(&db_path).unwrap();
2455 let alice = db.create_node(&["Person"]);
2456 db.set_node_property(alice, "name", Value::from("Alice"));
2457 db.close().unwrap();
2458 }
2459
2460 {
2462 let db = GrafeoDB::open(&db_path).unwrap();
2463 assert_eq!(db.node_count(), 1); let bob = db.create_node(&["Person"]);
2465 db.set_node_property(bob, "name", Value::from("Bob"));
2466 db.close().unwrap();
2467 }
2468
2469 {
2471 let db = GrafeoDB::open(&db_path).unwrap();
2472 assert_eq!(db.node_count(), 2);
2473
2474 let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
2476 assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
2477
2478 let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
2479 assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
2480 }
2481 }
2482
2483 #[cfg(feature = "wal")]
2484 #[test]
2485 fn test_database_consistency_after_mutations() {
2486 use grafeo_common::types::Value;
2488 use tempfile::tempdir;
2489
2490 let dir = tempdir().unwrap();
2491 let db_path = dir.path().join("consistency_db");
2492
2493 {
2494 let db = GrafeoDB::open(&db_path).unwrap();
2495
2496 let a = db.create_node(&["Node"]);
2498 let b = db.create_node(&["Node"]);
2499 let c = db.create_node(&["Node"]);
2500
2501 let e1 = db.create_edge(a, b, "LINKS");
2503 let _e2 = db.create_edge(b, c, "LINKS");
2504
2505 db.delete_edge(e1);
2507 db.delete_node(b);
2508
2509 db.set_node_property(a, "value", Value::Int64(1));
2511 db.set_node_property(c, "value", Value::Int64(3));
2512
2513 db.close().unwrap();
2514 }
2515
2516 {
2518 let db = GrafeoDB::open(&db_path).unwrap();
2519
2520 let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
2524 assert!(node_a.is_some());
2525
2526 let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
2527 assert!(node_c.is_some());
2528
2529 let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
2531 assert!(node_b.is_none());
2532 }
2533 }
2534
2535 #[cfg(feature = "wal")]
2536 #[test]
2537 fn test_close_is_idempotent() {
2538 use tempfile::tempdir;
2540
2541 let dir = tempdir().unwrap();
2542 let db_path = dir.path().join("close_test_db");
2543
2544 let db = GrafeoDB::open(&db_path).unwrap();
2545 db.create_node(&["Test"]);
2546
2547 assert!(db.close().is_ok());
2549
2550 assert!(db.close().is_ok());
2552 }
2553
2554 #[test]
2555 fn test_query_result_has_metrics() {
2556 let db = GrafeoDB::new_in_memory();
2558 db.create_node(&["Person"]);
2559 db.create_node(&["Person"]);
2560
2561 #[cfg(feature = "gql")]
2562 {
2563 let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
2564
2565 assert!(result.execution_time_ms.is_some());
2567 assert!(result.rows_scanned.is_some());
2568 assert!(result.execution_time_ms.unwrap() >= 0.0);
2569 assert_eq!(result.rows_scanned.unwrap(), 2);
2570 }
2571 }
2572
2573 #[test]
2574 fn test_empty_query_result_metrics() {
2575 let db = GrafeoDB::new_in_memory();
2577 db.create_node(&["Person"]);
2578
2579 #[cfg(feature = "gql")]
2580 {
2581 let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
2583
2584 assert!(result.execution_time_ms.is_some());
2585 assert!(result.rows_scanned.is_some());
2586 assert_eq!(result.rows_scanned.unwrap(), 0);
2587 }
2588 }
2589}