1use std::path::Path;
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9
10#[cfg(feature = "wal")]
11use grafeo_adapters::storage::wal::{
12 DurabilityMode as WalDurabilityMode, WalConfig, WalManager, WalRecord, WalRecovery,
13};
14use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
15use grafeo_common::types::{EdgeId, NodeId, Value};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::graph::lpg::LpgStore;
18#[cfg(feature = "rdf")]
19use grafeo_core::graph::rdf::RdfStore;
20
21use crate::config::Config;
22use crate::query::cache::QueryCache;
23use crate::session::Session;
24use crate::transaction::TransactionManager;
25
26pub struct GrafeoDB {
49 config: Config,
51 store: Arc<LpgStore>,
53 #[cfg(feature = "rdf")]
55 rdf_store: Arc<RdfStore>,
56 tx_manager: Arc<TransactionManager>,
58 buffer_manager: Arc<BufferManager>,
60 #[cfg(feature = "wal")]
62 wal: Option<Arc<WalManager>>,
63 query_cache: Arc<QueryCache>,
65 is_open: RwLock<bool>,
67}
68
69impl GrafeoDB {
70 #[must_use]
86 pub fn new_in_memory() -> Self {
87 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
88 }
89
90 #[cfg(feature = "wal")]
109 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
110 Self::with_config(Config::persistent(path.as_ref()))
111 }
112
113 pub fn with_config(config: Config) -> Result<Self> {
137 config
139 .validate()
140 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
141
142 let store = Arc::new(LpgStore::new());
143 #[cfg(feature = "rdf")]
144 let rdf_store = Arc::new(RdfStore::new());
145 let tx_manager = Arc::new(TransactionManager::new());
146
147 let buffer_config = BufferManagerConfig {
149 budget: config.memory_limit.unwrap_or_else(|| {
150 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
151 }),
152 spill_path: config
153 .spill_path
154 .clone()
155 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
156 ..BufferManagerConfig::default()
157 };
158 let buffer_manager = BufferManager::new(buffer_config);
159
160 #[cfg(feature = "wal")]
162 let wal = if config.wal_enabled {
163 if let Some(ref db_path) = config.path {
164 std::fs::create_dir_all(db_path)?;
166
167 let wal_path = db_path.join("wal");
168
169 if wal_path.exists() {
171 let recovery = WalRecovery::new(&wal_path);
172 let records = recovery.recover()?;
173 Self::apply_wal_records(&store, &records)?;
174 }
175
176 let wal_durability = match config.wal_durability {
178 crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
179 crate::config::DurabilityMode::Batch {
180 max_delay_ms,
181 max_records,
182 } => WalDurabilityMode::Batch {
183 max_delay_ms,
184 max_records,
185 },
186 crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
187 WalDurabilityMode::Adaptive { target_interval_ms }
188 }
189 crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
190 };
191 let wal_config = WalConfig {
192 durability: wal_durability,
193 ..WalConfig::default()
194 };
195 let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
196 Some(Arc::new(wal_manager))
197 } else {
198 None
199 }
200 } else {
201 None
202 };
203
204 let query_cache = Arc::new(QueryCache::default());
206
207 Ok(Self {
208 config,
209 store,
210 #[cfg(feature = "rdf")]
211 rdf_store,
212 tx_manager,
213 buffer_manager,
214 #[cfg(feature = "wal")]
215 wal,
216 query_cache,
217 is_open: RwLock::new(true),
218 })
219 }
220
221 #[cfg(feature = "wal")]
223 fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
224 for record in records {
225 match record {
226 WalRecord::CreateNode { id, labels } => {
227 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
228 store.create_node_with_id(*id, &label_refs);
229 }
230 WalRecord::DeleteNode { id } => {
231 store.delete_node(*id);
232 }
233 WalRecord::CreateEdge {
234 id,
235 src,
236 dst,
237 edge_type,
238 } => {
239 store.create_edge_with_id(*id, *src, *dst, edge_type);
240 }
241 WalRecord::DeleteEdge { id } => {
242 store.delete_edge(*id);
243 }
244 WalRecord::SetNodeProperty { id, key, value } => {
245 store.set_node_property(*id, key, value.clone());
246 }
247 WalRecord::SetEdgeProperty { id, key, value } => {
248 store.set_edge_property(*id, key, value.clone());
249 }
250 WalRecord::AddNodeLabel { id, label } => {
251 store.add_label(*id, label);
252 }
253 WalRecord::RemoveNodeLabel { id, label } => {
254 store.remove_label(*id, label);
255 }
256 WalRecord::TxCommit { .. }
257 | WalRecord::TxAbort { .. }
258 | WalRecord::Checkpoint { .. } => {
259 }
262 }
263 }
264 Ok(())
265 }
266
267 #[must_use]
286 pub fn session(&self) -> Session {
287 #[cfg(feature = "rdf")]
288 {
289 Session::with_rdf_store_and_adaptive(
290 Arc::clone(&self.store),
291 Arc::clone(&self.rdf_store),
292 Arc::clone(&self.tx_manager),
293 Arc::clone(&self.query_cache),
294 self.config.adaptive.clone(),
295 self.config.factorized_execution,
296 self.config.graph_model,
297 )
298 }
299 #[cfg(not(feature = "rdf"))]
300 {
301 Session::with_adaptive(
302 Arc::clone(&self.store),
303 Arc::clone(&self.tx_manager),
304 Arc::clone(&self.query_cache),
305 self.config.adaptive.clone(),
306 self.config.factorized_execution,
307 self.config.graph_model,
308 )
309 }
310 }
311
312 #[must_use]
314 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
315 &self.config.adaptive
316 }
317
318 pub fn execute(&self, query: &str) -> Result<QueryResult> {
328 let session = self.session();
329 session.execute(query)
330 }
331
332 pub fn execute_with_params(
338 &self,
339 query: &str,
340 params: std::collections::HashMap<String, grafeo_common::types::Value>,
341 ) -> Result<QueryResult> {
342 let session = self.session();
343 session.execute_with_params(query, params)
344 }
345
346 #[cfg(feature = "cypher")]
352 pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
353 let session = self.session();
354 session.execute_cypher(query)
355 }
356
357 #[cfg(feature = "cypher")]
363 pub fn execute_cypher_with_params(
364 &self,
365 query: &str,
366 params: std::collections::HashMap<String, grafeo_common::types::Value>,
367 ) -> Result<QueryResult> {
368 use crate::query::processor::{QueryLanguage, QueryProcessor};
369
370 let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
372 processor.process(query, QueryLanguage::Cypher, Some(¶ms))
373 }
374
375 #[cfg(feature = "gremlin")]
381 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
382 let session = self.session();
383 session.execute_gremlin(query)
384 }
385
386 #[cfg(feature = "gremlin")]
392 pub fn execute_gremlin_with_params(
393 &self,
394 query: &str,
395 params: std::collections::HashMap<String, grafeo_common::types::Value>,
396 ) -> Result<QueryResult> {
397 let session = self.session();
398 session.execute_gremlin_with_params(query, params)
399 }
400
401 #[cfg(feature = "graphql")]
407 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
408 let session = self.session();
409 session.execute_graphql(query)
410 }
411
412 #[cfg(feature = "graphql")]
418 pub fn execute_graphql_with_params(
419 &self,
420 query: &str,
421 params: std::collections::HashMap<String, grafeo_common::types::Value>,
422 ) -> Result<QueryResult> {
423 let session = self.session();
424 session.execute_graphql_with_params(query, params)
425 }
426
427 #[cfg(feature = "sql-pgq")]
433 pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
434 let session = self.session();
435 session.execute_sql(query)
436 }
437
438 #[cfg(feature = "sql-pgq")]
444 pub fn execute_sql_with_params(
445 &self,
446 query: &str,
447 params: std::collections::HashMap<String, grafeo_common::types::Value>,
448 ) -> Result<QueryResult> {
449 use crate::query::processor::{QueryLanguage, QueryProcessor};
450
451 let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
453 processor.process(query, QueryLanguage::SqlPgq, Some(¶ms))
454 }
455
456 #[cfg(all(feature = "sparql", feature = "rdf"))]
473 pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
474 use crate::query::{
475 Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
476 };
477
478 let logical_plan = sparql_translator::translate(query)?;
480
481 let optimizer = Optimizer::from_store(&self.store);
483 let optimized_plan = optimizer.optimize(logical_plan)?;
484
485 let planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
487 let mut physical_plan = planner.plan(&optimized_plan)?;
488
489 let executor = Executor::with_columns(physical_plan.columns.clone());
491 executor.execute(physical_plan.operator.as_mut())
492 }
493
494 #[cfg(feature = "rdf")]
498 #[must_use]
499 pub fn rdf_store(&self) -> &Arc<RdfStore> {
500 &self.rdf_store
501 }
502
503 pub fn query_scalar<T: FromValue>(&self, query: &str) -> Result<T> {
509 let result = self.execute(query)?;
510 result.scalar()
511 }
512
513 #[must_use]
515 pub fn config(&self) -> &Config {
516 &self.config
517 }
518
519 #[must_use]
521 pub fn graph_model(&self) -> crate::config::GraphModel {
522 self.config.graph_model
523 }
524
525 #[must_use]
527 pub fn memory_limit(&self) -> Option<usize> {
528 self.config.memory_limit
529 }
530
531 #[must_use]
535 pub fn store(&self) -> &Arc<LpgStore> {
536 &self.store
537 }
538
539 #[must_use]
541 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
542 &self.buffer_manager
543 }
544
545 pub fn close(&self) -> Result<()> {
555 let mut is_open = self.is_open.write();
556 if !*is_open {
557 return Ok(());
558 }
559
560 #[cfg(feature = "wal")]
562 if let Some(ref wal) = self.wal {
563 let epoch = self.store.current_epoch();
564
565 let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
567 self.tx_manager.begin()
569 });
570
571 wal.log(&WalRecord::TxCommit {
573 tx_id: checkpoint_tx,
574 })?;
575
576 wal.checkpoint(checkpoint_tx, epoch)?;
578 wal.sync()?;
579 }
580
581 *is_open = false;
582 Ok(())
583 }
584
585 #[cfg(feature = "wal")]
587 #[must_use]
588 pub fn wal(&self) -> Option<&Arc<WalManager>> {
589 self.wal.as_ref()
590 }
591
592 #[cfg(feature = "wal")]
594 fn log_wal(&self, record: &WalRecord) -> Result<()> {
595 if let Some(ref wal) = self.wal {
596 wal.log(record)?;
597 }
598 Ok(())
599 }
600
601 #[must_use]
603 pub fn node_count(&self) -> usize {
604 self.store.node_count()
605 }
606
607 #[must_use]
609 pub fn edge_count(&self) -> usize {
610 self.store.edge_count()
611 }
612
613 #[must_use]
615 pub fn label_count(&self) -> usize {
616 self.store.label_count()
617 }
618
619 #[must_use]
621 pub fn property_key_count(&self) -> usize {
622 self.store.property_key_count()
623 }
624
625 #[must_use]
627 pub fn edge_type_count(&self) -> usize {
628 self.store.edge_type_count()
629 }
630
631 pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
648 let id = self.store.create_node(labels);
649
650 #[cfg(feature = "wal")]
652 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
653 id,
654 labels: labels.iter().map(|s| (*s).to_string()).collect(),
655 }) {
656 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
657 }
658
659 id
660 }
661
662 pub fn create_node_with_props(
666 &self,
667 labels: &[&str],
668 properties: impl IntoIterator<
669 Item = (
670 impl Into<grafeo_common::types::PropertyKey>,
671 impl Into<grafeo_common::types::Value>,
672 ),
673 >,
674 ) -> grafeo_common::types::NodeId {
675 let props: Vec<(
677 grafeo_common::types::PropertyKey,
678 grafeo_common::types::Value,
679 )> = properties
680 .into_iter()
681 .map(|(k, v)| (k.into(), v.into()))
682 .collect();
683
684 let id = self
685 .store
686 .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
687
688 #[cfg(feature = "wal")]
690 {
691 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
692 id,
693 labels: labels.iter().map(|s| (*s).to_string()).collect(),
694 }) {
695 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
696 }
697
698 for (key, value) in props {
700 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
701 id,
702 key: key.to_string(),
703 value,
704 }) {
705 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
706 }
707 }
708 }
709
710 id
711 }
712
713 #[must_use]
715 pub fn get_node(
716 &self,
717 id: grafeo_common::types::NodeId,
718 ) -> Option<grafeo_core::graph::lpg::Node> {
719 self.store.get_node(id)
720 }
721
722 pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
726 #[cfg(feature = "vector-index")]
728 let indexes_to_clean: Vec<std::sync::Arc<grafeo_core::index::vector::HnswIndex>> = self
729 .store
730 .get_node(id)
731 .map(|node| {
732 let mut indexes = Vec::new();
733 for label in &node.labels {
734 let prefix = format!("{}:", label.as_str());
735 for (key, index) in self.store.vector_index_entries() {
736 if key.starts_with(&prefix) {
737 indexes.push(index);
738 }
739 }
740 }
741 indexes
742 })
743 .unwrap_or_default();
744
745 let result = self.store.delete_node(id);
746
747 #[cfg(feature = "vector-index")]
749 if result {
750 for index in indexes_to_clean {
751 index.remove(id);
752 }
753 }
754
755 #[cfg(feature = "wal")]
756 if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
757 tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
758 }
759
760 result
761 }
762
763 pub fn set_node_property(
767 &self,
768 id: grafeo_common::types::NodeId,
769 key: &str,
770 value: grafeo_common::types::Value,
771 ) {
772 #[cfg(feature = "vector-index")]
774 let vector_data = match &value {
775 grafeo_common::types::Value::Vector(v) => Some(v.clone()),
776 _ => None,
777 };
778
779 #[cfg(feature = "wal")]
781 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
782 id,
783 key: key.to_string(),
784 value: value.clone(),
785 }) {
786 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
787 }
788
789 self.store.set_node_property(id, key, value);
790
791 #[cfg(feature = "vector-index")]
793 if let Some(vec) = vector_data
794 && let Some(node) = self.store.get_node(id)
795 {
796 for label in &node.labels {
797 if let Some(index) = self.store.get_vector_index(label.as_str(), key) {
798 index.insert(id, &vec);
799 }
800 }
801 }
802 }
803
804 pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
822 let result = self.store.add_label(id, label);
823
824 #[cfg(feature = "wal")]
825 if result {
826 if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
828 id,
829 label: label.to_string(),
830 }) {
831 tracing::warn!("Failed to log AddNodeLabel to WAL: {}", e);
832 }
833 }
834
835 #[cfg(feature = "vector-index")]
837 if result {
838 let prefix = format!("{label}:");
839 for (key, index) in self.store.vector_index_entries() {
840 if let Some(property) = key.strip_prefix(&prefix)
841 && let Some(node) = self.store.get_node(id)
842 {
843 let prop_key = grafeo_common::types::PropertyKey::new(property);
844 if let Some(grafeo_common::types::Value::Vector(v)) =
845 node.properties.get(&prop_key)
846 {
847 index.insert(id, v);
848 }
849 }
850 }
851 }
852
853 result
854 }
855
856 pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
874 let result = self.store.remove_label(id, label);
875
876 #[cfg(feature = "wal")]
877 if result {
878 if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
880 id,
881 label: label.to_string(),
882 }) {
883 tracing::warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
884 }
885 }
886
887 result
888 }
889
890 #[must_use]
907 pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
908 self.store
909 .get_node(id)
910 .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
911 }
912
913 pub fn create_edge(
933 &self,
934 src: grafeo_common::types::NodeId,
935 dst: grafeo_common::types::NodeId,
936 edge_type: &str,
937 ) -> grafeo_common::types::EdgeId {
938 let id = self.store.create_edge(src, dst, edge_type);
939
940 #[cfg(feature = "wal")]
942 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
943 id,
944 src,
945 dst,
946 edge_type: edge_type.to_string(),
947 }) {
948 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
949 }
950
951 id
952 }
953
954 pub fn create_edge_with_props(
958 &self,
959 src: grafeo_common::types::NodeId,
960 dst: grafeo_common::types::NodeId,
961 edge_type: &str,
962 properties: impl IntoIterator<
963 Item = (
964 impl Into<grafeo_common::types::PropertyKey>,
965 impl Into<grafeo_common::types::Value>,
966 ),
967 >,
968 ) -> grafeo_common::types::EdgeId {
969 let props: Vec<(
971 grafeo_common::types::PropertyKey,
972 grafeo_common::types::Value,
973 )> = properties
974 .into_iter()
975 .map(|(k, v)| (k.into(), v.into()))
976 .collect();
977
978 let id = self.store.create_edge_with_props(
979 src,
980 dst,
981 edge_type,
982 props.iter().map(|(k, v)| (k.clone(), v.clone())),
983 );
984
985 #[cfg(feature = "wal")]
987 {
988 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
989 id,
990 src,
991 dst,
992 edge_type: edge_type.to_string(),
993 }) {
994 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
995 }
996
997 for (key, value) in props {
999 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
1000 id,
1001 key: key.to_string(),
1002 value,
1003 }) {
1004 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
1005 }
1006 }
1007 }
1008
1009 id
1010 }
1011
1012 #[must_use]
1014 pub fn get_edge(
1015 &self,
1016 id: grafeo_common::types::EdgeId,
1017 ) -> Option<grafeo_core::graph::lpg::Edge> {
1018 self.store.get_edge(id)
1019 }
1020
1021 pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
1025 let result = self.store.delete_edge(id);
1026
1027 #[cfg(feature = "wal")]
1028 if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
1029 tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
1030 }
1031
1032 result
1033 }
1034
1035 pub fn set_edge_property(
1039 &self,
1040 id: grafeo_common::types::EdgeId,
1041 key: &str,
1042 value: grafeo_common::types::Value,
1043 ) {
1044 #[cfg(feature = "wal")]
1046 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
1047 id,
1048 key: key.to_string(),
1049 value: value.clone(),
1050 }) {
1051 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
1052 }
1053 self.store.set_edge_property(id, key, value);
1054 }
1055
1056 pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
1060 self.store.remove_node_property(id, key).is_some()
1062 }
1063
1064 pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
1068 self.store.remove_edge_property(id, key).is_some()
1070 }
1071
1072 pub fn create_property_index(&self, property: &str) {
1092 self.store.create_property_index(property);
1093 }
1094
1095 pub fn create_vector_index(
1115 &self,
1116 label: &str,
1117 property: &str,
1118 dimensions: Option<usize>,
1119 metric: Option<&str>,
1120 m: Option<usize>,
1121 ef_construction: Option<usize>,
1122 ) -> Result<()> {
1123 use grafeo_common::types::{PropertyKey, Value};
1124 use grafeo_core::index::vector::DistanceMetric;
1125
1126 let metric = match metric {
1127 Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1128 grafeo_common::utils::error::Error::Internal(format!(
1129 "Unknown distance metric '{}'. Use: cosine, euclidean, dot_product, manhattan",
1130 m
1131 ))
1132 })?,
1133 None => DistanceMetric::Cosine,
1134 };
1135
1136 let prop_key = PropertyKey::new(property);
1138 let mut found_dims: Option<usize> = dimensions;
1139 let mut vector_count = 0usize;
1140
1141 #[cfg(feature = "vector-index")]
1142 let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1143
1144 for node in self.store.nodes_with_label(label) {
1145 if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1146 if let Some(expected) = found_dims {
1147 if v.len() != expected {
1148 return Err(grafeo_common::utils::error::Error::Internal(format!(
1149 "Vector dimension mismatch: expected {}, found {} on node {}",
1150 expected,
1151 v.len(),
1152 node.id.0
1153 )));
1154 }
1155 } else {
1156 found_dims = Some(v.len());
1157 }
1158 vector_count += 1;
1159 #[cfg(feature = "vector-index")]
1160 vectors.push((node.id, v.to_vec()));
1161 }
1162 }
1163
1164 if vector_count == 0 {
1165 return Err(grafeo_common::utils::error::Error::Internal(format!(
1166 "No vector properties found on :{label}({property})"
1167 )));
1168 }
1169
1170 let dims = found_dims.unwrap_or(0);
1171
1172 #[cfg(feature = "vector-index")]
1174 {
1175 use grafeo_core::index::vector::{HnswConfig, HnswIndex};
1176
1177 let mut config = HnswConfig::new(dims, metric);
1178 if let Some(m_val) = m {
1179 config = config.with_m(m_val);
1180 }
1181 if let Some(ef_c) = ef_construction {
1182 config = config.with_ef_construction(ef_c);
1183 }
1184
1185 let index = HnswIndex::with_capacity(config, vectors.len());
1186 for (node_id, vec) in &vectors {
1187 index.insert(*node_id, vec);
1188 }
1189
1190 self.store
1191 .add_vector_index(label, property, Arc::new(index));
1192 }
1193
1194 let _ = (m, ef_construction);
1196
1197 tracing::info!(
1198 "Vector index created: :{label}({property}) - {vector_count} vectors, {dims} dimensions, metric={metric_name}",
1199 metric_name = metric.name()
1200 );
1201
1202 Ok(())
1203 }
1204
1205 #[cfg(feature = "vector-index")]
1213 pub fn drop_vector_index(&self, label: &str, property: &str) -> bool {
1214 let removed = self.store.remove_vector_index(label, property);
1215 if removed {
1216 tracing::info!("Vector index dropped: :{label}({property})");
1217 }
1218 removed
1219 }
1220
1221 #[cfg(feature = "vector-index")]
1232 pub fn rebuild_vector_index(&self, label: &str, property: &str) -> Result<()> {
1233 let config = self
1234 .store
1235 .get_vector_index(label, property)
1236 .map(|idx| idx.config().clone())
1237 .ok_or_else(|| {
1238 grafeo_common::utils::error::Error::Internal(format!(
1239 "No vector index found for :{label}({property}). Cannot rebuild."
1240 ))
1241 })?;
1242
1243 self.store.remove_vector_index(label, property);
1244
1245 self.create_vector_index(
1246 label,
1247 property,
1248 Some(config.dimensions),
1249 Some(config.metric.name()),
1250 Some(config.m),
1251 Some(config.ef_construction),
1252 )
1253 }
1254
1255 #[cfg(feature = "vector-index")]
1261 fn compute_filter_allowlist(
1262 &self,
1263 label: &str,
1264 filters: Option<&std::collections::HashMap<String, Value>>,
1265 ) -> Option<std::collections::HashSet<NodeId>> {
1266 let filters = filters.filter(|f| !f.is_empty())?;
1267
1268 let label_nodes: std::collections::HashSet<NodeId> =
1270 self.store.nodes_by_label(label).into_iter().collect();
1271
1272 let mut allowlist = label_nodes;
1273
1274 for (key, value) in filters {
1275 let matching: std::collections::HashSet<NodeId> = self
1276 .store
1277 .find_nodes_by_property(key, value)
1278 .into_iter()
1279 .collect();
1280 allowlist = allowlist.intersection(&matching).copied().collect();
1281
1282 if allowlist.is_empty() {
1284 return Some(allowlist);
1285 }
1286 }
1287
1288 Some(allowlist)
1289 }
1290
1291 #[cfg(feature = "vector-index")]
1309 pub fn vector_search(
1310 &self,
1311 label: &str,
1312 property: &str,
1313 query: &[f32],
1314 k: usize,
1315 ef: Option<usize>,
1316 filters: Option<&std::collections::HashMap<String, Value>>,
1317 ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
1318 let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1319 grafeo_common::utils::error::Error::Internal(format!(
1320 "No vector index found for :{label}({property}). Call create_vector_index() first."
1321 ))
1322 })?;
1323
1324 let results = match self.compute_filter_allowlist(label, filters) {
1325 Some(allowlist) => match ef {
1326 Some(ef_val) => index.search_with_ef_and_filter(query, k, ef_val, &allowlist),
1327 None => index.search_with_filter(query, k, &allowlist),
1328 },
1329 None => match ef {
1330 Some(ef_val) => index.search_with_ef(query, k, ef_val),
1331 None => index.search(query, k),
1332 },
1333 };
1334
1335 Ok(results)
1336 }
1337
1338 pub fn batch_create_nodes(
1354 &self,
1355 label: &str,
1356 property: &str,
1357 vectors: Vec<Vec<f32>>,
1358 ) -> Vec<grafeo_common::types::NodeId> {
1359 use grafeo_common::types::{PropertyKey, Value};
1360
1361 let prop_key = PropertyKey::new(property);
1362 let labels: &[&str] = &[label];
1363
1364 let ids: Vec<grafeo_common::types::NodeId> = vectors
1365 .into_iter()
1366 .map(|vec| {
1367 let value = Value::Vector(vec.into());
1368 let id = self.store.create_node_with_props(
1369 labels,
1370 std::iter::once((prop_key.clone(), value.clone())),
1371 );
1372
1373 #[cfg(feature = "wal")]
1375 {
1376 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
1377 id,
1378 labels: labels.iter().map(|s| (*s).to_string()).collect(),
1379 }) {
1380 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
1381 }
1382 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
1383 id,
1384 key: property.to_string(),
1385 value,
1386 }) {
1387 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
1388 }
1389 }
1390
1391 id
1392 })
1393 .collect();
1394
1395 #[cfg(feature = "vector-index")]
1397 if let Some(index) = self.store.get_vector_index(label, property) {
1398 for &id in &ids {
1399 if let Some(node) = self.store.get_node(id) {
1400 let pk = grafeo_common::types::PropertyKey::new(property);
1401 if let Some(grafeo_common::types::Value::Vector(v)) = node.properties.get(&pk) {
1402 index.insert(id, v);
1403 }
1404 }
1405 }
1406 }
1407
1408 ids
1409 }
1410
1411 #[cfg(feature = "vector-index")]
1424 pub fn batch_vector_search(
1425 &self,
1426 label: &str,
1427 property: &str,
1428 queries: &[Vec<f32>],
1429 k: usize,
1430 ef: Option<usize>,
1431 filters: Option<&std::collections::HashMap<String, Value>>,
1432 ) -> Result<Vec<Vec<(grafeo_common::types::NodeId, f32)>>> {
1433 let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1434 grafeo_common::utils::error::Error::Internal(format!(
1435 "No vector index found for :{label}({property}). Call create_vector_index() first."
1436 ))
1437 })?;
1438
1439 let results = match self.compute_filter_allowlist(label, filters) {
1440 Some(allowlist) => match ef {
1441 Some(ef_val) => {
1442 index.batch_search_with_ef_and_filter(queries, k, ef_val, &allowlist)
1443 }
1444 None => index.batch_search_with_filter(queries, k, &allowlist),
1445 },
1446 None => match ef {
1447 Some(ef_val) => index.batch_search_with_ef(queries, k, ef_val),
1448 None => index.batch_search(queries, k),
1449 },
1450 };
1451
1452 Ok(results)
1453 }
1454
1455 #[cfg(feature = "vector-index")]
1478 #[allow(clippy::too_many_arguments)]
1479 pub fn mmr_search(
1480 &self,
1481 label: &str,
1482 property: &str,
1483 query: &[f32],
1484 k: usize,
1485 fetch_k: Option<usize>,
1486 lambda: Option<f32>,
1487 ef: Option<usize>,
1488 filters: Option<&std::collections::HashMap<String, Value>>,
1489 ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
1490 use grafeo_core::index::vector::mmr_select;
1491
1492 let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1493 grafeo_common::utils::error::Error::Internal(format!(
1494 "No vector index found for :{label}({property}). Call create_vector_index() first."
1495 ))
1496 })?;
1497
1498 let fetch_k = fetch_k.unwrap_or(k.saturating_mul(4).max(k));
1499 let lambda = lambda.unwrap_or(0.5);
1500
1501 let initial_results = match self.compute_filter_allowlist(label, filters) {
1503 Some(allowlist) => match ef {
1504 Some(ef_val) => index.search_with_ef_and_filter(query, fetch_k, ef_val, &allowlist),
1505 None => index.search_with_filter(query, fetch_k, &allowlist),
1506 },
1507 None => match ef {
1508 Some(ef_val) => index.search_with_ef(query, fetch_k, ef_val),
1509 None => index.search(query, fetch_k),
1510 },
1511 };
1512
1513 if initial_results.is_empty() {
1514 return Ok(Vec::new());
1515 }
1516
1517 let candidates: Vec<(grafeo_common::types::NodeId, f32, std::sync::Arc<[f32]>)> =
1519 initial_results
1520 .into_iter()
1521 .filter_map(|(id, dist)| index.get(id).map(|vec| (id, dist, vec)))
1522 .collect();
1523
1524 let candidate_refs: Vec<(grafeo_common::types::NodeId, f32, &[f32])> = candidates
1526 .iter()
1527 .map(|(id, dist, vec)| (*id, *dist, vec.as_ref()))
1528 .collect();
1529
1530 let metric = index.config().metric;
1532 Ok(mmr_select(query, &candidate_refs, k, lambda, metric))
1533 }
1534
1535 pub fn drop_property_index(&self, property: &str) -> bool {
1539 self.store.drop_property_index(property)
1540 }
1541
1542 #[must_use]
1544 pub fn has_property_index(&self, property: &str) -> bool {
1545 self.store.has_property_index(property)
1546 }
1547
1548 #[must_use]
1563 pub fn find_nodes_by_property(
1564 &self,
1565 property: &str,
1566 value: &grafeo_common::types::Value,
1567 ) -> Vec<grafeo_common::types::NodeId> {
1568 self.store.find_nodes_by_property(property, value)
1569 }
1570
1571 #[must_use]
1579 pub fn is_persistent(&self) -> bool {
1580 self.config.path.is_some()
1581 }
1582
1583 #[must_use]
1587 pub fn path(&self) -> Option<&Path> {
1588 self.config.path.as_deref()
1589 }
1590
1591 #[must_use]
1595 pub fn info(&self) -> crate::admin::DatabaseInfo {
1596 crate::admin::DatabaseInfo {
1597 mode: crate::admin::DatabaseMode::Lpg,
1598 node_count: self.store.node_count(),
1599 edge_count: self.store.edge_count(),
1600 is_persistent: self.is_persistent(),
1601 path: self.config.path.clone(),
1602 wal_enabled: self.config.wal_enabled,
1603 version: env!("CARGO_PKG_VERSION").to_string(),
1604 }
1605 }
1606
1607 #[must_use]
1611 pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1612 #[cfg(feature = "wal")]
1613 let disk_bytes = self.config.path.as_ref().and_then(|p| {
1614 if p.exists() {
1615 Self::calculate_disk_usage(p).ok()
1616 } else {
1617 None
1618 }
1619 });
1620 #[cfg(not(feature = "wal"))]
1621 let disk_bytes: Option<usize> = None;
1622
1623 crate::admin::DatabaseStats {
1624 node_count: self.store.node_count(),
1625 edge_count: self.store.edge_count(),
1626 label_count: self.store.label_count(),
1627 edge_type_count: self.store.edge_type_count(),
1628 property_key_count: self.store.property_key_count(),
1629 index_count: 0, memory_bytes: self.buffer_manager.allocated(),
1631 disk_bytes,
1632 }
1633 }
1634
1635 #[cfg(feature = "wal")]
1637 fn calculate_disk_usage(path: &Path) -> Result<usize> {
1638 let mut total = 0usize;
1639 if path.is_dir() {
1640 for entry in std::fs::read_dir(path)? {
1641 let entry = entry?;
1642 let metadata = entry.metadata()?;
1643 if metadata.is_file() {
1644 total += metadata.len() as usize;
1645 } else if metadata.is_dir() {
1646 total += Self::calculate_disk_usage(&entry.path())?;
1647 }
1648 }
1649 }
1650 Ok(total)
1651 }
1652
1653 #[must_use]
1658 pub fn schema(&self) -> crate::admin::SchemaInfo {
1659 let labels = self
1660 .store
1661 .all_labels()
1662 .into_iter()
1663 .map(|name| crate::admin::LabelInfo {
1664 name: name.clone(),
1665 count: self.store.nodes_with_label(&name).count(),
1666 })
1667 .collect();
1668
1669 let edge_types = self
1670 .store
1671 .all_edge_types()
1672 .into_iter()
1673 .map(|name| crate::admin::EdgeTypeInfo {
1674 name: name.clone(),
1675 count: self.store.edges_with_type(&name).count(),
1676 })
1677 .collect();
1678
1679 let property_keys = self.store.all_property_keys();
1680
1681 crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
1682 labels,
1683 edge_types,
1684 property_keys,
1685 })
1686 }
1687
1688 #[cfg(feature = "rdf")]
1692 #[must_use]
1693 pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
1694 let stats = self.rdf_store.stats();
1695
1696 let predicates = self
1697 .rdf_store
1698 .predicates()
1699 .into_iter()
1700 .map(|predicate| {
1701 let count = self.rdf_store.triples_with_predicate(&predicate).len();
1702 crate::admin::PredicateInfo {
1703 iri: predicate.to_string(),
1704 count,
1705 }
1706 })
1707 .collect();
1708
1709 crate::admin::SchemaInfo::Rdf(crate::admin::RdfSchemaInfo {
1710 predicates,
1711 named_graphs: Vec::new(), subject_count: stats.subject_count,
1713 object_count: stats.object_count,
1714 })
1715 }
1716
1717 #[must_use]
1725 pub fn validate(&self) -> crate::admin::ValidationResult {
1726 let mut result = crate::admin::ValidationResult::default();
1727
1728 for edge in self.store.all_edges() {
1730 if self.store.get_node(edge.src).is_none() {
1731 result.errors.push(crate::admin::ValidationError {
1732 code: "DANGLING_SRC".to_string(),
1733 message: format!(
1734 "Edge {} references non-existent source node {}",
1735 edge.id.0, edge.src.0
1736 ),
1737 context: Some(format!("edge:{}", edge.id.0)),
1738 });
1739 }
1740 if self.store.get_node(edge.dst).is_none() {
1741 result.errors.push(crate::admin::ValidationError {
1742 code: "DANGLING_DST".to_string(),
1743 message: format!(
1744 "Edge {} references non-existent destination node {}",
1745 edge.id.0, edge.dst.0
1746 ),
1747 context: Some(format!("edge:{}", edge.id.0)),
1748 });
1749 }
1750 }
1751
1752 if self.store.node_count() > 0 && self.store.edge_count() == 0 {
1754 result.warnings.push(crate::admin::ValidationWarning {
1755 code: "NO_EDGES".to_string(),
1756 message: "Database has nodes but no edges".to_string(),
1757 context: None,
1758 });
1759 }
1760
1761 result
1762 }
1763
1764 #[must_use]
1768 pub fn wal_status(&self) -> crate::admin::WalStatus {
1769 #[cfg(feature = "wal")]
1770 if let Some(ref wal) = self.wal {
1771 return crate::admin::WalStatus {
1772 enabled: true,
1773 path: self.config.path.as_ref().map(|p| p.join("wal")),
1774 size_bytes: wal.size_bytes(),
1775 record_count: wal.record_count() as usize,
1776 last_checkpoint: wal.last_checkpoint_timestamp(),
1777 current_epoch: self.store.current_epoch().as_u64(),
1778 };
1779 }
1780
1781 crate::admin::WalStatus {
1782 enabled: false,
1783 path: None,
1784 size_bytes: 0,
1785 record_count: 0,
1786 last_checkpoint: None,
1787 current_epoch: self.store.current_epoch().as_u64(),
1788 }
1789 }
1790
1791 pub fn wal_checkpoint(&self) -> Result<()> {
1799 #[cfg(feature = "wal")]
1800 if let Some(ref wal) = self.wal {
1801 let epoch = self.store.current_epoch();
1802 let tx_id = self
1803 .tx_manager
1804 .last_assigned_tx_id()
1805 .unwrap_or_else(|| self.tx_manager.begin());
1806 wal.checkpoint(tx_id, epoch)?;
1807 wal.sync()?;
1808 }
1809 Ok(())
1810 }
1811
1812 #[cfg(feature = "wal")]
1829 pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
1830 let path = path.as_ref();
1831
1832 let target_config = Config::persistent(path);
1834 let target = Self::with_config(target_config)?;
1835
1836 for node in self.store.all_nodes() {
1838 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1839 target.store.create_node_with_id(node.id, &label_refs);
1840
1841 target.log_wal(&WalRecord::CreateNode {
1843 id: node.id,
1844 labels: node.labels.iter().map(|s| s.to_string()).collect(),
1845 })?;
1846
1847 for (key, value) in node.properties {
1849 target
1850 .store
1851 .set_node_property(node.id, key.as_str(), value.clone());
1852 target.log_wal(&WalRecord::SetNodeProperty {
1853 id: node.id,
1854 key: key.to_string(),
1855 value,
1856 })?;
1857 }
1858 }
1859
1860 for edge in self.store.all_edges() {
1862 target
1863 .store
1864 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1865
1866 target.log_wal(&WalRecord::CreateEdge {
1868 id: edge.id,
1869 src: edge.src,
1870 dst: edge.dst,
1871 edge_type: edge.edge_type.to_string(),
1872 })?;
1873
1874 for (key, value) in edge.properties {
1876 target
1877 .store
1878 .set_edge_property(edge.id, key.as_str(), value.clone());
1879 target.log_wal(&WalRecord::SetEdgeProperty {
1880 id: edge.id,
1881 key: key.to_string(),
1882 value,
1883 })?;
1884 }
1885 }
1886
1887 target.close()?;
1889
1890 Ok(())
1891 }
1892
1893 pub fn to_memory(&self) -> Result<Self> {
1904 let config = Config::in_memory();
1905 let target = Self::with_config(config)?;
1906
1907 for node in self.store.all_nodes() {
1909 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1910 target.store.create_node_with_id(node.id, &label_refs);
1911
1912 for (key, value) in node.properties {
1914 target.store.set_node_property(node.id, key.as_str(), value);
1915 }
1916 }
1917
1918 for edge in self.store.all_edges() {
1920 target
1921 .store
1922 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1923
1924 for (key, value) in edge.properties {
1926 target.store.set_edge_property(edge.id, key.as_str(), value);
1927 }
1928 }
1929
1930 Ok(target)
1931 }
1932
1933 #[cfg(feature = "wal")]
1942 pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
1943 let source = Self::open(path)?;
1945
1946 let target = source.to_memory()?;
1948
1949 source.close()?;
1951
1952 Ok(target)
1953 }
1954
1955 pub fn export_snapshot(&self) -> Result<Vec<u8>> {
1968 let nodes: Vec<SnapshotNode> = self
1969 .store
1970 .all_nodes()
1971 .map(|n| SnapshotNode {
1972 id: n.id,
1973 labels: n.labels.iter().map(|l| l.to_string()).collect(),
1974 properties: n
1975 .properties
1976 .into_iter()
1977 .map(|(k, v)| (k.to_string(), v))
1978 .collect(),
1979 })
1980 .collect();
1981
1982 let edges: Vec<SnapshotEdge> = self
1983 .store
1984 .all_edges()
1985 .map(|e| SnapshotEdge {
1986 id: e.id,
1987 src: e.src,
1988 dst: e.dst,
1989 edge_type: e.edge_type.to_string(),
1990 properties: e
1991 .properties
1992 .into_iter()
1993 .map(|(k, v)| (k.to_string(), v))
1994 .collect(),
1995 })
1996 .collect();
1997
1998 let snapshot = Snapshot {
1999 version: 1,
2000 nodes,
2001 edges,
2002 };
2003
2004 let config = bincode::config::standard();
2005 bincode::serde::encode_to_vec(&snapshot, config)
2006 .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
2007 }
2008
2009 pub fn import_snapshot(data: &[u8]) -> Result<Self> {
2017 let config = bincode::config::standard();
2018 let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
2019 .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
2020
2021 if snapshot.version != 1 {
2022 return Err(Error::Internal(format!(
2023 "unsupported snapshot version: {}",
2024 snapshot.version
2025 )));
2026 }
2027
2028 let db = Self::new_in_memory();
2029
2030 for node in snapshot.nodes {
2031 let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
2032 db.store.create_node_with_id(node.id, &label_refs);
2033 for (key, value) in node.properties {
2034 db.store.set_node_property(node.id, &key, value);
2035 }
2036 }
2037
2038 for edge in snapshot.edges {
2039 db.store
2040 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
2041 for (key, value) in edge.properties {
2042 db.store.set_edge_property(edge.id, &key, value);
2043 }
2044 }
2045
2046 Ok(db)
2047 }
2048
2049 pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
2057 self.store.all_nodes()
2058 }
2059
2060 pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
2064 self.store.all_edges()
2065 }
2066}
2067
2068#[derive(serde::Serialize, serde::Deserialize)]
2070struct Snapshot {
2071 version: u8,
2072 nodes: Vec<SnapshotNode>,
2073 edges: Vec<SnapshotEdge>,
2074}
2075
2076#[derive(serde::Serialize, serde::Deserialize)]
2077struct SnapshotNode {
2078 id: NodeId,
2079 labels: Vec<String>,
2080 properties: Vec<(String, Value)>,
2081}
2082
2083#[derive(serde::Serialize, serde::Deserialize)]
2084struct SnapshotEdge {
2085 id: EdgeId,
2086 src: NodeId,
2087 dst: NodeId,
2088 edge_type: String,
2089 properties: Vec<(String, Value)>,
2090}
2091
2092impl Drop for GrafeoDB {
2093 fn drop(&mut self) {
2094 if let Err(e) = self.close() {
2095 tracing::error!("Error closing database: {}", e);
2096 }
2097 }
2098}
2099
2100impl crate::admin::AdminService for GrafeoDB {
2101 fn info(&self) -> crate::admin::DatabaseInfo {
2102 self.info()
2103 }
2104
2105 fn detailed_stats(&self) -> crate::admin::DatabaseStats {
2106 self.detailed_stats()
2107 }
2108
2109 fn schema(&self) -> crate::admin::SchemaInfo {
2110 self.schema()
2111 }
2112
2113 fn validate(&self) -> crate::admin::ValidationResult {
2114 self.validate()
2115 }
2116
2117 fn wal_status(&self) -> crate::admin::WalStatus {
2118 self.wal_status()
2119 }
2120
2121 fn wal_checkpoint(&self) -> Result<()> {
2122 self.wal_checkpoint()
2123 }
2124}
2125
2126#[derive(Debug)]
2152pub struct QueryResult {
2153 pub columns: Vec<String>,
2155 pub column_types: Vec<grafeo_common::types::LogicalType>,
2157 pub rows: Vec<Vec<grafeo_common::types::Value>>,
2159 pub execution_time_ms: Option<f64>,
2161 pub rows_scanned: Option<u64>,
2163}
2164
2165impl QueryResult {
2166 #[must_use]
2168 pub fn new(columns: Vec<String>) -> Self {
2169 let len = columns.len();
2170 Self {
2171 columns,
2172 column_types: vec![grafeo_common::types::LogicalType::Any; len],
2173 rows: Vec::new(),
2174 execution_time_ms: None,
2175 rows_scanned: None,
2176 }
2177 }
2178
2179 #[must_use]
2181 pub fn with_types(
2182 columns: Vec<String>,
2183 column_types: Vec<grafeo_common::types::LogicalType>,
2184 ) -> Self {
2185 Self {
2186 columns,
2187 column_types,
2188 rows: Vec::new(),
2189 execution_time_ms: None,
2190 rows_scanned: None,
2191 }
2192 }
2193
2194 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
2196 self.execution_time_ms = Some(execution_time_ms);
2197 self.rows_scanned = Some(rows_scanned);
2198 self
2199 }
2200
2201 #[must_use]
2203 pub fn execution_time_ms(&self) -> Option<f64> {
2204 self.execution_time_ms
2205 }
2206
2207 #[must_use]
2209 pub fn rows_scanned(&self) -> Option<u64> {
2210 self.rows_scanned
2211 }
2212
2213 #[must_use]
2215 pub fn row_count(&self) -> usize {
2216 self.rows.len()
2217 }
2218
2219 #[must_use]
2221 pub fn column_count(&self) -> usize {
2222 self.columns.len()
2223 }
2224
2225 #[must_use]
2227 pub fn is_empty(&self) -> bool {
2228 self.rows.is_empty()
2229 }
2230
2231 pub fn scalar<T: FromValue>(&self) -> Result<T> {
2240 if self.rows.len() != 1 || self.columns.len() != 1 {
2241 return Err(grafeo_common::utils::error::Error::InvalidValue(
2242 "Expected single value".to_string(),
2243 ));
2244 }
2245 T::from_value(&self.rows[0][0])
2246 }
2247
2248 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
2250 self.rows.iter()
2251 }
2252}
2253
2254pub trait FromValue: Sized {
2259 fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
2261}
2262
2263impl FromValue for i64 {
2264 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2265 value
2266 .as_int64()
2267 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2268 expected: "INT64".to_string(),
2269 found: value.type_name().to_string(),
2270 })
2271 }
2272}
2273
2274impl FromValue for f64 {
2275 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2276 value
2277 .as_float64()
2278 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2279 expected: "FLOAT64".to_string(),
2280 found: value.type_name().to_string(),
2281 })
2282 }
2283}
2284
2285impl FromValue for String {
2286 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2287 value.as_str().map(String::from).ok_or_else(|| {
2288 grafeo_common::utils::error::Error::TypeMismatch {
2289 expected: "STRING".to_string(),
2290 found: value.type_name().to_string(),
2291 }
2292 })
2293 }
2294}
2295
2296impl FromValue for bool {
2297 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2298 value
2299 .as_bool()
2300 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2301 expected: "BOOL".to_string(),
2302 found: value.type_name().to_string(),
2303 })
2304 }
2305}
2306
2307#[cfg(test)]
2308mod tests {
2309 use super::*;
2310
2311 #[test]
2312 fn test_create_in_memory_database() {
2313 let db = GrafeoDB::new_in_memory();
2314 assert_eq!(db.node_count(), 0);
2315 assert_eq!(db.edge_count(), 0);
2316 }
2317
2318 #[test]
2319 fn test_database_config() {
2320 let config = Config::in_memory().with_threads(4).with_query_logging();
2321
2322 let db = GrafeoDB::with_config(config).unwrap();
2323 assert_eq!(db.config().threads, 4);
2324 assert!(db.config().query_logging);
2325 }
2326
2327 #[test]
2328 fn test_database_session() {
2329 let db = GrafeoDB::new_in_memory();
2330 let _session = db.session();
2331 }
2333
2334 #[cfg(feature = "wal")]
2335 #[test]
2336 fn test_persistent_database_recovery() {
2337 use grafeo_common::types::Value;
2338 use tempfile::tempdir;
2339
2340 let dir = tempdir().unwrap();
2341 let db_path = dir.path().join("test_db");
2342
2343 {
2345 let db = GrafeoDB::open(&db_path).unwrap();
2346
2347 let alice = db.create_node(&["Person"]);
2348 db.set_node_property(alice, "name", Value::from("Alice"));
2349
2350 let bob = db.create_node(&["Person"]);
2351 db.set_node_property(bob, "name", Value::from("Bob"));
2352
2353 let _edge = db.create_edge(alice, bob, "KNOWS");
2354
2355 db.close().unwrap();
2357 }
2358
2359 {
2361 let db = GrafeoDB::open(&db_path).unwrap();
2362
2363 assert_eq!(db.node_count(), 2);
2364 assert_eq!(db.edge_count(), 1);
2365
2366 let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
2368 assert!(node0.is_some());
2369
2370 let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
2371 assert!(node1.is_some());
2372 }
2373 }
2374
2375 #[cfg(feature = "wal")]
2376 #[test]
2377 fn test_wal_logging() {
2378 use tempfile::tempdir;
2379
2380 let dir = tempdir().unwrap();
2381 let db_path = dir.path().join("wal_test_db");
2382
2383 let db = GrafeoDB::open(&db_path).unwrap();
2384
2385 let node = db.create_node(&["Test"]);
2387 db.delete_node(node);
2388
2389 if let Some(wal) = db.wal() {
2391 assert!(wal.record_count() > 0);
2392 }
2393
2394 db.close().unwrap();
2395 }
2396
2397 #[cfg(feature = "wal")]
2398 #[test]
2399 fn test_wal_recovery_multiple_sessions() {
2400 use grafeo_common::types::Value;
2402 use tempfile::tempdir;
2403
2404 let dir = tempdir().unwrap();
2405 let db_path = dir.path().join("multi_session_db");
2406
2407 {
2409 let db = GrafeoDB::open(&db_path).unwrap();
2410 let alice = db.create_node(&["Person"]);
2411 db.set_node_property(alice, "name", Value::from("Alice"));
2412 db.close().unwrap();
2413 }
2414
2415 {
2417 let db = GrafeoDB::open(&db_path).unwrap();
2418 assert_eq!(db.node_count(), 1); let bob = db.create_node(&["Person"]);
2420 db.set_node_property(bob, "name", Value::from("Bob"));
2421 db.close().unwrap();
2422 }
2423
2424 {
2426 let db = GrafeoDB::open(&db_path).unwrap();
2427 assert_eq!(db.node_count(), 2);
2428
2429 let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
2431 assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
2432
2433 let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
2434 assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
2435 }
2436 }
2437
2438 #[cfg(feature = "wal")]
2439 #[test]
2440 fn test_database_consistency_after_mutations() {
2441 use grafeo_common::types::Value;
2443 use tempfile::tempdir;
2444
2445 let dir = tempdir().unwrap();
2446 let db_path = dir.path().join("consistency_db");
2447
2448 {
2449 let db = GrafeoDB::open(&db_path).unwrap();
2450
2451 let a = db.create_node(&["Node"]);
2453 let b = db.create_node(&["Node"]);
2454 let c = db.create_node(&["Node"]);
2455
2456 let e1 = db.create_edge(a, b, "LINKS");
2458 let _e2 = db.create_edge(b, c, "LINKS");
2459
2460 db.delete_edge(e1);
2462 db.delete_node(b);
2463
2464 db.set_node_property(a, "value", Value::Int64(1));
2466 db.set_node_property(c, "value", Value::Int64(3));
2467
2468 db.close().unwrap();
2469 }
2470
2471 {
2473 let db = GrafeoDB::open(&db_path).unwrap();
2474
2475 let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
2479 assert!(node_a.is_some());
2480
2481 let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
2482 assert!(node_c.is_some());
2483
2484 let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
2486 assert!(node_b.is_none());
2487 }
2488 }
2489
2490 #[cfg(feature = "wal")]
2491 #[test]
2492 fn test_close_is_idempotent() {
2493 use tempfile::tempdir;
2495
2496 let dir = tempdir().unwrap();
2497 let db_path = dir.path().join("close_test_db");
2498
2499 let db = GrafeoDB::open(&db_path).unwrap();
2500 db.create_node(&["Test"]);
2501
2502 assert!(db.close().is_ok());
2504
2505 assert!(db.close().is_ok());
2507 }
2508
2509 #[test]
2510 fn test_query_result_has_metrics() {
2511 let db = GrafeoDB::new_in_memory();
2513 db.create_node(&["Person"]);
2514 db.create_node(&["Person"]);
2515
2516 #[cfg(feature = "gql")]
2517 {
2518 let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
2519
2520 assert!(result.execution_time_ms.is_some());
2522 assert!(result.rows_scanned.is_some());
2523 assert!(result.execution_time_ms.unwrap() >= 0.0);
2524 assert_eq!(result.rows_scanned.unwrap(), 2);
2525 }
2526 }
2527
2528 #[test]
2529 fn test_empty_query_result_metrics() {
2530 let db = GrafeoDB::new_in_memory();
2532 db.create_node(&["Person"]);
2533
2534 #[cfg(feature = "gql")]
2535 {
2536 let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
2538
2539 assert!(result.execution_time_ms.is_some());
2540 assert!(result.rows_scanned.is_some());
2541 assert_eq!(result.rows_scanned.unwrap(), 0);
2542 }
2543 }
2544}