1use std::path::Path;
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9
10use grafeo_adapters::storage::wal::{WalConfig, WalManager, WalRecord, WalRecovery};
11use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::lpg::LpgStore;
14#[cfg(feature = "rdf")]
15use grafeo_core::graph::rdf::RdfStore;
16
17use crate::config::Config;
18use crate::session::Session;
19use crate::transaction::TransactionManager;
20
21pub struct GrafeoDB {
44 config: Config,
46 store: Arc<LpgStore>,
48 #[cfg(feature = "rdf")]
50 rdf_store: Arc<RdfStore>,
51 tx_manager: Arc<TransactionManager>,
53 buffer_manager: Arc<BufferManager>,
55 wal: Option<Arc<WalManager>>,
57 is_open: RwLock<bool>,
59}
60
61impl GrafeoDB {
62 #[must_use]
78 pub fn new_in_memory() -> Self {
79 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
80 }
81
82 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
101 Self::with_config(Config::persistent(path.as_ref()))
102 }
103
104 pub fn with_config(config: Config) -> Result<Self> {
128 let store = Arc::new(LpgStore::new());
129 #[cfg(feature = "rdf")]
130 let rdf_store = Arc::new(RdfStore::new());
131 let tx_manager = Arc::new(TransactionManager::new());
132
133 let buffer_config = BufferManagerConfig {
135 budget: config.memory_limit.unwrap_or_else(|| {
136 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
137 }),
138 spill_path: config
139 .spill_path
140 .clone()
141 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
142 ..BufferManagerConfig::default()
143 };
144 let buffer_manager = BufferManager::new(buffer_config);
145
146 let wal = if config.wal_enabled {
148 if let Some(ref db_path) = config.path {
149 std::fs::create_dir_all(db_path)?;
151
152 let wal_path = db_path.join("wal");
153
154 if wal_path.exists() {
156 let recovery = WalRecovery::new(&wal_path);
157 let records = recovery.recover()?;
158 Self::apply_wal_records(&store, &records)?;
159 }
160
161 let wal_config = WalConfig::default();
163 let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
164 Some(Arc::new(wal_manager))
165 } else {
166 None
167 }
168 } else {
169 None
170 };
171
172 Ok(Self {
173 config,
174 store,
175 #[cfg(feature = "rdf")]
176 rdf_store,
177 tx_manager,
178 buffer_manager,
179 wal,
180 is_open: RwLock::new(true),
181 })
182 }
183
184 fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
186 for record in records {
187 match record {
188 WalRecord::CreateNode { id, labels } => {
189 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
190 store.create_node_with_id(*id, &label_refs);
191 }
192 WalRecord::DeleteNode { id } => {
193 store.delete_node(*id);
194 }
195 WalRecord::CreateEdge {
196 id,
197 src,
198 dst,
199 edge_type,
200 } => {
201 store.create_edge_with_id(*id, *src, *dst, edge_type);
202 }
203 WalRecord::DeleteEdge { id } => {
204 store.delete_edge(*id);
205 }
206 WalRecord::SetNodeProperty { id, key, value } => {
207 store.set_node_property(*id, key, value.clone());
208 }
209 WalRecord::SetEdgeProperty { id, key, value } => {
210 store.set_edge_property(*id, key, value.clone());
211 }
212 WalRecord::AddNodeLabel { id, label } => {
213 store.add_label(*id, label);
214 }
215 WalRecord::RemoveNodeLabel { id, label } => {
216 store.remove_label(*id, label);
217 }
218 WalRecord::TxCommit { .. }
219 | WalRecord::TxAbort { .. }
220 | WalRecord::Checkpoint { .. } => {
221 }
224 }
225 }
226 Ok(())
227 }
228
229 #[must_use]
248 pub fn session(&self) -> Session {
249 #[cfg(feature = "rdf")]
250 {
251 Session::with_rdf_store_and_adaptive(
252 Arc::clone(&self.store),
253 Arc::clone(&self.rdf_store),
254 Arc::clone(&self.tx_manager),
255 self.config.adaptive.clone(),
256 self.config.factorized_execution,
257 )
258 }
259 #[cfg(not(feature = "rdf"))]
260 {
261 Session::with_adaptive(
262 Arc::clone(&self.store),
263 Arc::clone(&self.tx_manager),
264 self.config.adaptive.clone(),
265 self.config.factorized_execution,
266 )
267 }
268 }
269
270 #[must_use]
272 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
273 &self.config.adaptive
274 }
275
276 pub fn execute(&self, query: &str) -> Result<QueryResult> {
286 let session = self.session();
287 session.execute(query)
288 }
289
290 pub fn execute_with_params(
296 &self,
297 query: &str,
298 params: std::collections::HashMap<String, grafeo_common::types::Value>,
299 ) -> Result<QueryResult> {
300 let session = self.session();
301 session.execute_with_params(query, params)
302 }
303
304 #[cfg(feature = "cypher")]
310 pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
311 let session = self.session();
312 session.execute_cypher(query)
313 }
314
315 #[cfg(feature = "cypher")]
321 pub fn execute_cypher_with_params(
322 &self,
323 query: &str,
324 params: std::collections::HashMap<String, grafeo_common::types::Value>,
325 ) -> Result<QueryResult> {
326 use crate::query::processor::{QueryLanguage, QueryProcessor};
327
328 let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
330 processor.process(query, QueryLanguage::Cypher, Some(¶ms))
331 }
332
333 #[cfg(feature = "gremlin")]
339 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
340 let session = self.session();
341 session.execute_gremlin(query)
342 }
343
344 #[cfg(feature = "gremlin")]
350 pub fn execute_gremlin_with_params(
351 &self,
352 query: &str,
353 params: std::collections::HashMap<String, grafeo_common::types::Value>,
354 ) -> Result<QueryResult> {
355 let session = self.session();
356 session.execute_gremlin_with_params(query, params)
357 }
358
359 #[cfg(feature = "graphql")]
365 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
366 let session = self.session();
367 session.execute_graphql(query)
368 }
369
370 #[cfg(feature = "graphql")]
376 pub fn execute_graphql_with_params(
377 &self,
378 query: &str,
379 params: std::collections::HashMap<String, grafeo_common::types::Value>,
380 ) -> Result<QueryResult> {
381 let session = self.session();
382 session.execute_graphql_with_params(query, params)
383 }
384
385 #[cfg(all(feature = "sparql", feature = "rdf"))]
402 pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
403 use crate::query::{
404 Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
405 };
406
407 let logical_plan = sparql_translator::translate(query)?;
409
410 let optimizer = Optimizer::new();
412 let optimized_plan = optimizer.optimize(logical_plan)?;
413
414 let planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
416 let mut physical_plan = planner.plan(&optimized_plan)?;
417
418 let executor = Executor::with_columns(physical_plan.columns.clone());
420 executor.execute(physical_plan.operator.as_mut())
421 }
422
423 #[cfg(feature = "rdf")]
427 #[must_use]
428 pub fn rdf_store(&self) -> &Arc<RdfStore> {
429 &self.rdf_store
430 }
431
432 pub fn query_scalar<T: FromValue>(&self, query: &str) -> Result<T> {
438 let result = self.execute(query)?;
439 result.scalar()
440 }
441
442 #[must_use]
444 pub fn config(&self) -> &Config {
445 &self.config
446 }
447
448 #[must_use]
452 pub fn store(&self) -> &Arc<LpgStore> {
453 &self.store
454 }
455
456 #[must_use]
458 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
459 &self.buffer_manager
460 }
461
462 pub fn close(&self) -> Result<()> {
472 let mut is_open = self.is_open.write();
473 if !*is_open {
474 return Ok(());
475 }
476
477 if let Some(ref wal) = self.wal {
479 let epoch = self.store.current_epoch();
480
481 let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
483 self.tx_manager.begin()
485 });
486
487 wal.log(&WalRecord::TxCommit {
489 tx_id: checkpoint_tx,
490 })?;
491
492 wal.checkpoint(checkpoint_tx, epoch)?;
494 wal.sync()?;
495 }
496
497 *is_open = false;
498 Ok(())
499 }
500
501 #[must_use]
503 pub fn wal(&self) -> Option<&Arc<WalManager>> {
504 self.wal.as_ref()
505 }
506
507 fn log_wal(&self, record: &WalRecord) -> Result<()> {
509 if let Some(ref wal) = self.wal {
510 wal.log(record)?;
511 }
512 Ok(())
513 }
514
515 #[must_use]
517 pub fn node_count(&self) -> usize {
518 self.store.node_count()
519 }
520
521 #[must_use]
523 pub fn edge_count(&self) -> usize {
524 self.store.edge_count()
525 }
526
527 #[must_use]
529 pub fn label_count(&self) -> usize {
530 self.store.label_count()
531 }
532
533 #[must_use]
535 pub fn property_key_count(&self) -> usize {
536 self.store.property_key_count()
537 }
538
539 #[must_use]
541 pub fn edge_type_count(&self) -> usize {
542 self.store.edge_type_count()
543 }
544
545 pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
562 let id = self.store.create_node(labels);
563
564 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
566 id,
567 labels: labels.iter().map(|s| s.to_string()).collect(),
568 }) {
569 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
570 }
571
572 id
573 }
574
575 pub fn create_node_with_props(
579 &self,
580 labels: &[&str],
581 properties: impl IntoIterator<
582 Item = (
583 impl Into<grafeo_common::types::PropertyKey>,
584 impl Into<grafeo_common::types::Value>,
585 ),
586 >,
587 ) -> grafeo_common::types::NodeId {
588 let props: Vec<(
590 grafeo_common::types::PropertyKey,
591 grafeo_common::types::Value,
592 )> = properties
593 .into_iter()
594 .map(|(k, v)| (k.into(), v.into()))
595 .collect();
596
597 let id = self
598 .store
599 .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
600
601 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
603 id,
604 labels: labels.iter().map(|s| s.to_string()).collect(),
605 }) {
606 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
607 }
608
609 for (key, value) in props {
611 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
612 id,
613 key: key.to_string(),
614 value,
615 }) {
616 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
617 }
618 }
619
620 id
621 }
622
623 #[must_use]
625 pub fn get_node(
626 &self,
627 id: grafeo_common::types::NodeId,
628 ) -> Option<grafeo_core::graph::lpg::Node> {
629 self.store.get_node(id)
630 }
631
632 pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
636 let result = self.store.delete_node(id);
637
638 if result {
639 if let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
640 tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
641 }
642 }
643
644 result
645 }
646
647 pub fn set_node_property(
651 &self,
652 id: grafeo_common::types::NodeId,
653 key: &str,
654 value: grafeo_common::types::Value,
655 ) {
656 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
658 id,
659 key: key.to_string(),
660 value: value.clone(),
661 }) {
662 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
663 }
664
665 self.store.set_node_property(id, key, value);
666 }
667
668 pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
686 let result = self.store.add_label(id, label);
687
688 if result {
689 if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
691 id,
692 label: label.to_string(),
693 }) {
694 tracing::warn!("Failed to log AddNodeLabel to WAL: {}", e);
695 }
696 }
697
698 result
699 }
700
701 pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
719 let result = self.store.remove_label(id, label);
720
721 if result {
722 if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
724 id,
725 label: label.to_string(),
726 }) {
727 tracing::warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
728 }
729 }
730
731 result
732 }
733
734 #[must_use]
751 pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
752 self.store
753 .get_node(id)
754 .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
755 }
756
757 pub fn create_edge(
777 &self,
778 src: grafeo_common::types::NodeId,
779 dst: grafeo_common::types::NodeId,
780 edge_type: &str,
781 ) -> grafeo_common::types::EdgeId {
782 let id = self.store.create_edge(src, dst, edge_type);
783
784 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
786 id,
787 src,
788 dst,
789 edge_type: edge_type.to_string(),
790 }) {
791 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
792 }
793
794 id
795 }
796
797 pub fn create_edge_with_props(
801 &self,
802 src: grafeo_common::types::NodeId,
803 dst: grafeo_common::types::NodeId,
804 edge_type: &str,
805 properties: impl IntoIterator<
806 Item = (
807 impl Into<grafeo_common::types::PropertyKey>,
808 impl Into<grafeo_common::types::Value>,
809 ),
810 >,
811 ) -> grafeo_common::types::EdgeId {
812 let props: Vec<(
814 grafeo_common::types::PropertyKey,
815 grafeo_common::types::Value,
816 )> = properties
817 .into_iter()
818 .map(|(k, v)| (k.into(), v.into()))
819 .collect();
820
821 let id = self.store.create_edge_with_props(
822 src,
823 dst,
824 edge_type,
825 props.iter().map(|(k, v)| (k.clone(), v.clone())),
826 );
827
828 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
830 id,
831 src,
832 dst,
833 edge_type: edge_type.to_string(),
834 }) {
835 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
836 }
837
838 for (key, value) in props {
840 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
841 id,
842 key: key.to_string(),
843 value,
844 }) {
845 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
846 }
847 }
848
849 id
850 }
851
852 #[must_use]
854 pub fn get_edge(
855 &self,
856 id: grafeo_common::types::EdgeId,
857 ) -> Option<grafeo_core::graph::lpg::Edge> {
858 self.store.get_edge(id)
859 }
860
861 pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
865 let result = self.store.delete_edge(id);
866
867 if result {
868 if let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
869 tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
870 }
871 }
872
873 result
874 }
875
876 pub fn set_edge_property(
880 &self,
881 id: grafeo_common::types::EdgeId,
882 key: &str,
883 value: grafeo_common::types::Value,
884 ) {
885 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
887 id,
888 key: key.to_string(),
889 value: value.clone(),
890 }) {
891 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
892 }
893 self.store.set_edge_property(id, key, value);
894 }
895
896 pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
900 self.store.remove_node_property(id, key).is_some()
902 }
903
904 pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
908 self.store.remove_edge_property(id, key).is_some()
910 }
911
912 #[must_use]
920 pub fn is_persistent(&self) -> bool {
921 self.config.path.is_some()
922 }
923
924 #[must_use]
928 pub fn path(&self) -> Option<&Path> {
929 self.config.path.as_deref()
930 }
931
932 #[must_use]
936 pub fn info(&self) -> crate::admin::DatabaseInfo {
937 crate::admin::DatabaseInfo {
938 mode: crate::admin::DatabaseMode::Lpg,
939 node_count: self.store.node_count(),
940 edge_count: self.store.edge_count(),
941 is_persistent: self.is_persistent(),
942 path: self.config.path.clone(),
943 wal_enabled: self.config.wal_enabled,
944 version: env!("CARGO_PKG_VERSION").to_string(),
945 }
946 }
947
948 #[must_use]
952 pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
953 let disk_bytes = self.config.path.as_ref().and_then(|p| {
954 if p.exists() {
955 Self::calculate_disk_usage(p).ok()
956 } else {
957 None
958 }
959 });
960
961 crate::admin::DatabaseStats {
962 node_count: self.store.node_count(),
963 edge_count: self.store.edge_count(),
964 label_count: self.store.label_count(),
965 edge_type_count: self.store.edge_type_count(),
966 property_key_count: self.store.property_key_count(),
967 index_count: 0, memory_bytes: self.buffer_manager.allocated(),
969 disk_bytes,
970 }
971 }
972
973 fn calculate_disk_usage(path: &Path) -> Result<usize> {
975 let mut total = 0usize;
976 if path.is_dir() {
977 for entry in std::fs::read_dir(path)? {
978 let entry = entry?;
979 let metadata = entry.metadata()?;
980 if metadata.is_file() {
981 total += metadata.len() as usize;
982 } else if metadata.is_dir() {
983 total += Self::calculate_disk_usage(&entry.path())?;
984 }
985 }
986 }
987 Ok(total)
988 }
989
990 #[must_use]
995 pub fn schema(&self) -> crate::admin::SchemaInfo {
996 let labels = self
997 .store
998 .all_labels()
999 .into_iter()
1000 .map(|name| crate::admin::LabelInfo {
1001 name: name.clone(),
1002 count: self.store.nodes_with_label(&name).count(),
1003 })
1004 .collect();
1005
1006 let edge_types = self
1007 .store
1008 .all_edge_types()
1009 .into_iter()
1010 .map(|name| crate::admin::EdgeTypeInfo {
1011 name: name.clone(),
1012 count: self.store.edges_with_type(&name).count(),
1013 })
1014 .collect();
1015
1016 let property_keys = self.store.all_property_keys();
1017
1018 crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
1019 labels,
1020 edge_types,
1021 property_keys,
1022 })
1023 }
1024
1025 #[cfg(feature = "rdf")]
1029 #[must_use]
1030 pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
1031 let stats = self.rdf_store.stats();
1032
1033 let predicates = self
1034 .rdf_store
1035 .predicates()
1036 .into_iter()
1037 .map(|predicate| {
1038 let count = self.rdf_store.triples_with_predicate(&predicate).len();
1039 crate::admin::PredicateInfo {
1040 iri: predicate.to_string(),
1041 count,
1042 }
1043 })
1044 .collect();
1045
1046 crate::admin::SchemaInfo::Rdf(crate::admin::RdfSchemaInfo {
1047 predicates,
1048 named_graphs: Vec::new(), subject_count: stats.subject_count,
1050 object_count: stats.object_count,
1051 })
1052 }
1053
1054 #[must_use]
1062 pub fn validate(&self) -> crate::admin::ValidationResult {
1063 let mut result = crate::admin::ValidationResult::default();
1064
1065 for edge in self.store.all_edges() {
1067 if self.store.get_node(edge.src).is_none() {
1068 result.errors.push(crate::admin::ValidationError {
1069 code: "DANGLING_SRC".to_string(),
1070 message: format!(
1071 "Edge {} references non-existent source node {}",
1072 edge.id.0, edge.src.0
1073 ),
1074 context: Some(format!("edge:{}", edge.id.0)),
1075 });
1076 }
1077 if self.store.get_node(edge.dst).is_none() {
1078 result.errors.push(crate::admin::ValidationError {
1079 code: "DANGLING_DST".to_string(),
1080 message: format!(
1081 "Edge {} references non-existent destination node {}",
1082 edge.id.0, edge.dst.0
1083 ),
1084 context: Some(format!("edge:{}", edge.id.0)),
1085 });
1086 }
1087 }
1088
1089 if self.store.node_count() > 0 && self.store.edge_count() == 0 {
1091 result.warnings.push(crate::admin::ValidationWarning {
1092 code: "NO_EDGES".to_string(),
1093 message: "Database has nodes but no edges".to_string(),
1094 context: None,
1095 });
1096 }
1097
1098 result
1099 }
1100
1101 #[must_use]
1105 pub fn wal_status(&self) -> crate::admin::WalStatus {
1106 if let Some(ref wal) = self.wal {
1107 crate::admin::WalStatus {
1108 enabled: true,
1109 path: self.config.path.as_ref().map(|p| p.join("wal")),
1110 size_bytes: wal.size_bytes(),
1111 record_count: wal.record_count() as usize,
1112 last_checkpoint: wal.last_checkpoint_timestamp(),
1113 current_epoch: self.store.current_epoch().as_u64(),
1114 }
1115 } else {
1116 crate::admin::WalStatus {
1117 enabled: false,
1118 path: None,
1119 size_bytes: 0,
1120 record_count: 0,
1121 last_checkpoint: None,
1122 current_epoch: self.store.current_epoch().as_u64(),
1123 }
1124 }
1125 }
1126
1127 pub fn wal_checkpoint(&self) -> Result<()> {
1135 if let Some(ref wal) = self.wal {
1136 let epoch = self.store.current_epoch();
1137 let tx_id = self
1138 .tx_manager
1139 .last_assigned_tx_id()
1140 .unwrap_or_else(|| self.tx_manager.begin());
1141 wal.checkpoint(tx_id, epoch)?;
1142 wal.sync()?;
1143 }
1144 Ok(())
1145 }
1146
1147 pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
1162 let path = path.as_ref();
1163
1164 let target_config = Config::persistent(path);
1166 let target = Self::with_config(target_config)?;
1167
1168 for node in self.store.all_nodes() {
1170 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1171 target.store.create_node_with_id(node.id, &label_refs);
1172
1173 target.log_wal(&WalRecord::CreateNode {
1175 id: node.id,
1176 labels: node.labels.iter().map(|s| s.to_string()).collect(),
1177 })?;
1178
1179 for (key, value) in node.properties {
1181 target
1182 .store
1183 .set_node_property(node.id, key.as_str(), value.clone());
1184 target.log_wal(&WalRecord::SetNodeProperty {
1185 id: node.id,
1186 key: key.to_string(),
1187 value,
1188 })?;
1189 }
1190 }
1191
1192 for edge in self.store.all_edges() {
1194 target
1195 .store
1196 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1197
1198 target.log_wal(&WalRecord::CreateEdge {
1200 id: edge.id,
1201 src: edge.src,
1202 dst: edge.dst,
1203 edge_type: edge.edge_type.to_string(),
1204 })?;
1205
1206 for (key, value) in edge.properties {
1208 target
1209 .store
1210 .set_edge_property(edge.id, key.as_str(), value.clone());
1211 target.log_wal(&WalRecord::SetEdgeProperty {
1212 id: edge.id,
1213 key: key.to_string(),
1214 value,
1215 })?;
1216 }
1217 }
1218
1219 target.close()?;
1221
1222 Ok(())
1223 }
1224
1225 pub fn to_memory(&self) -> Result<Self> {
1236 let config = Config::in_memory();
1237 let target = Self::with_config(config)?;
1238
1239 for node in self.store.all_nodes() {
1241 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1242 target.store.create_node_with_id(node.id, &label_refs);
1243
1244 for (key, value) in node.properties {
1246 target.store.set_node_property(node.id, key.as_str(), value);
1247 }
1248 }
1249
1250 for edge in self.store.all_edges() {
1252 target
1253 .store
1254 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1255
1256 for (key, value) in edge.properties {
1258 target.store.set_edge_property(edge.id, key.as_str(), value);
1259 }
1260 }
1261
1262 Ok(target)
1263 }
1264
1265 pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
1274 let source = Self::open(path)?;
1276
1277 let target = source.to_memory()?;
1279
1280 source.close()?;
1282
1283 Ok(target)
1284 }
1285
1286 pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
1294 self.store.all_nodes()
1295 }
1296
1297 pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
1301 self.store.all_edges()
1302 }
1303}
1304
1305impl Drop for GrafeoDB {
1306 fn drop(&mut self) {
1307 if let Err(e) = self.close() {
1308 tracing::error!("Error closing database: {}", e);
1309 }
1310 }
1311}
1312
1313#[derive(Debug)]
1339pub struct QueryResult {
1340 pub columns: Vec<String>,
1342 pub column_types: Vec<grafeo_common::types::LogicalType>,
1344 pub rows: Vec<Vec<grafeo_common::types::Value>>,
1346}
1347
1348impl QueryResult {
1349 #[must_use]
1351 pub fn new(columns: Vec<String>) -> Self {
1352 let len = columns.len();
1353 Self {
1354 columns,
1355 column_types: vec![grafeo_common::types::LogicalType::Any; len],
1356 rows: Vec::new(),
1357 }
1358 }
1359
1360 #[must_use]
1362 pub fn with_types(
1363 columns: Vec<String>,
1364 column_types: Vec<grafeo_common::types::LogicalType>,
1365 ) -> Self {
1366 Self {
1367 columns,
1368 column_types,
1369 rows: Vec::new(),
1370 }
1371 }
1372
1373 #[must_use]
1375 pub fn row_count(&self) -> usize {
1376 self.rows.len()
1377 }
1378
1379 #[must_use]
1381 pub fn column_count(&self) -> usize {
1382 self.columns.len()
1383 }
1384
1385 #[must_use]
1387 pub fn is_empty(&self) -> bool {
1388 self.rows.is_empty()
1389 }
1390
1391 pub fn scalar<T: FromValue>(&self) -> Result<T> {
1400 if self.rows.len() != 1 || self.columns.len() != 1 {
1401 return Err(grafeo_common::utils::error::Error::InvalidValue(
1402 "Expected single value".to_string(),
1403 ));
1404 }
1405 T::from_value(&self.rows[0][0])
1406 }
1407
1408 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1410 self.rows.iter()
1411 }
1412}
1413
1414pub trait FromValue: Sized {
1419 fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
1421}
1422
1423impl FromValue for i64 {
1424 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1425 value
1426 .as_int64()
1427 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1428 expected: "INT64".to_string(),
1429 found: value.type_name().to_string(),
1430 })
1431 }
1432}
1433
1434impl FromValue for f64 {
1435 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1436 value
1437 .as_float64()
1438 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1439 expected: "FLOAT64".to_string(),
1440 found: value.type_name().to_string(),
1441 })
1442 }
1443}
1444
1445impl FromValue for String {
1446 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1447 value.as_str().map(String::from).ok_or_else(|| {
1448 grafeo_common::utils::error::Error::TypeMismatch {
1449 expected: "STRING".to_string(),
1450 found: value.type_name().to_string(),
1451 }
1452 })
1453 }
1454}
1455
1456impl FromValue for bool {
1457 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1458 value
1459 .as_bool()
1460 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1461 expected: "BOOL".to_string(),
1462 found: value.type_name().to_string(),
1463 })
1464 }
1465}
1466
1467#[cfg(test)]
1468mod tests {
1469 use super::*;
1470
1471 #[test]
1472 fn test_create_in_memory_database() {
1473 let db = GrafeoDB::new_in_memory();
1474 assert_eq!(db.node_count(), 0);
1475 assert_eq!(db.edge_count(), 0);
1476 }
1477
1478 #[test]
1479 fn test_database_config() {
1480 let config = Config::in_memory().with_threads(4).with_query_logging();
1481
1482 let db = GrafeoDB::with_config(config).unwrap();
1483 assert_eq!(db.config().threads, 4);
1484 assert!(db.config().query_logging);
1485 }
1486
1487 #[test]
1488 fn test_database_session() {
1489 let db = GrafeoDB::new_in_memory();
1490 let _session = db.session();
1491 }
1493
1494 #[test]
1495 fn test_persistent_database_recovery() {
1496 use grafeo_common::types::Value;
1497 use tempfile::tempdir;
1498
1499 let dir = tempdir().unwrap();
1500 let db_path = dir.path().join("test_db");
1501
1502 {
1504 let db = GrafeoDB::open(&db_path).unwrap();
1505
1506 let alice = db.create_node(&["Person"]);
1507 db.set_node_property(alice, "name", Value::from("Alice"));
1508
1509 let bob = db.create_node(&["Person"]);
1510 db.set_node_property(bob, "name", Value::from("Bob"));
1511
1512 let _edge = db.create_edge(alice, bob, "KNOWS");
1513
1514 db.close().unwrap();
1516 }
1517
1518 {
1520 let db = GrafeoDB::open(&db_path).unwrap();
1521
1522 assert_eq!(db.node_count(), 2);
1523 assert_eq!(db.edge_count(), 1);
1524
1525 let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
1527 assert!(node0.is_some());
1528
1529 let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
1530 assert!(node1.is_some());
1531 }
1532 }
1533
1534 #[test]
1535 fn test_wal_logging() {
1536 use tempfile::tempdir;
1537
1538 let dir = tempdir().unwrap();
1539 let db_path = dir.path().join("wal_test_db");
1540
1541 let db = GrafeoDB::open(&db_path).unwrap();
1542
1543 let node = db.create_node(&["Test"]);
1545 db.delete_node(node);
1546
1547 if let Some(wal) = db.wal() {
1549 assert!(wal.record_count() > 0);
1550 }
1551
1552 db.close().unwrap();
1553 }
1554}