1use std::path::Path;
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9
10use grafeo_adapters::storage::wal::{WalConfig, WalManager, WalRecord, WalRecovery};
11use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::lpg::LpgStore;
14#[cfg(feature = "rdf")]
15use grafeo_core::graph::rdf::RdfStore;
16
17use crate::config::Config;
18use crate::query::cache::QueryCache;
19use crate::session::Session;
20use crate::transaction::TransactionManager;
21
/// Top-level database handle.
///
/// Bundles the labeled-property-graph store with the transaction manager,
/// buffer manager, optional write-ahead log and query cache that sessions and
/// the direct CRUD methods operate on.
pub struct GrafeoDB {
    /// Configuration the database was created with.
    config: Config,
    /// Labeled property graph store, shared with every session.
    store: Arc<LpgStore>,
    /// RDF store; only present when the `rdf` feature is enabled.
    #[cfg(feature = "rdf")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager, shared with every session.
    tx_manager: Arc<TransactionManager>,
    /// Buffer manager built from the memory limit and spill path in `config`.
    buffer_manager: Arc<BufferManager>,
    /// Write-ahead log; `None` when WAL is disabled or no path is configured.
    wal: Option<Arc<WalManager>>,
    /// Query cache, shared with every session.
    query_cache: Arc<QueryCache>,
    /// `true` until `close` has completed; makes repeated closes a no-op.
    is_open: RwLock<bool>,
}
63
64impl GrafeoDB {
65 #[must_use]
81 pub fn new_in_memory() -> Self {
82 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
83 }
84
85 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
104 Self::with_config(Config::persistent(path.as_ref()))
105 }
106
107 pub fn with_config(config: Config) -> Result<Self> {
131 let store = Arc::new(LpgStore::new());
132 #[cfg(feature = "rdf")]
133 let rdf_store = Arc::new(RdfStore::new());
134 let tx_manager = Arc::new(TransactionManager::new());
135
136 let buffer_config = BufferManagerConfig {
138 budget: config.memory_limit.unwrap_or_else(|| {
139 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
140 }),
141 spill_path: config
142 .spill_path
143 .clone()
144 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
145 ..BufferManagerConfig::default()
146 };
147 let buffer_manager = BufferManager::new(buffer_config);
148
149 let wal = if config.wal_enabled {
151 if let Some(ref db_path) = config.path {
152 std::fs::create_dir_all(db_path)?;
154
155 let wal_path = db_path.join("wal");
156
157 if wal_path.exists() {
159 let recovery = WalRecovery::new(&wal_path);
160 let records = recovery.recover()?;
161 Self::apply_wal_records(&store, &records)?;
162 }
163
164 let wal_config = WalConfig::default();
166 let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
167 Some(Arc::new(wal_manager))
168 } else {
169 None
170 }
171 } else {
172 None
173 };
174
175 let query_cache = Arc::new(QueryCache::default());
177
178 Ok(Self {
179 config,
180 store,
181 #[cfg(feature = "rdf")]
182 rdf_store,
183 tx_manager,
184 buffer_manager,
185 wal,
186 query_cache,
187 is_open: RwLock::new(true),
188 })
189 }
190
191 fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
193 for record in records {
194 match record {
195 WalRecord::CreateNode { id, labels } => {
196 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
197 store.create_node_with_id(*id, &label_refs);
198 }
199 WalRecord::DeleteNode { id } => {
200 store.delete_node(*id);
201 }
202 WalRecord::CreateEdge {
203 id,
204 src,
205 dst,
206 edge_type,
207 } => {
208 store.create_edge_with_id(*id, *src, *dst, edge_type);
209 }
210 WalRecord::DeleteEdge { id } => {
211 store.delete_edge(*id);
212 }
213 WalRecord::SetNodeProperty { id, key, value } => {
214 store.set_node_property(*id, key, value.clone());
215 }
216 WalRecord::SetEdgeProperty { id, key, value } => {
217 store.set_edge_property(*id, key, value.clone());
218 }
219 WalRecord::AddNodeLabel { id, label } => {
220 store.add_label(*id, label);
221 }
222 WalRecord::RemoveNodeLabel { id, label } => {
223 store.remove_label(*id, label);
224 }
225 WalRecord::TxCommit { .. }
226 | WalRecord::TxAbort { .. }
227 | WalRecord::Checkpoint { .. } => {
228 }
231 }
232 }
233 Ok(())
234 }
235
236 #[must_use]
255 pub fn session(&self) -> Session {
256 #[cfg(feature = "rdf")]
257 {
258 Session::with_rdf_store_and_adaptive(
259 Arc::clone(&self.store),
260 Arc::clone(&self.rdf_store),
261 Arc::clone(&self.tx_manager),
262 Arc::clone(&self.query_cache),
263 self.config.adaptive.clone(),
264 self.config.factorized_execution,
265 )
266 }
267 #[cfg(not(feature = "rdf"))]
268 {
269 Session::with_adaptive(
270 Arc::clone(&self.store),
271 Arc::clone(&self.tx_manager),
272 Arc::clone(&self.query_cache),
273 self.config.adaptive.clone(),
274 self.config.factorized_execution,
275 )
276 }
277 }
278
279 #[must_use]
281 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
282 &self.config.adaptive
283 }
284
285 pub fn execute(&self, query: &str) -> Result<QueryResult> {
295 let session = self.session();
296 session.execute(query)
297 }
298
299 pub fn execute_with_params(
305 &self,
306 query: &str,
307 params: std::collections::HashMap<String, grafeo_common::types::Value>,
308 ) -> Result<QueryResult> {
309 let session = self.session();
310 session.execute_with_params(query, params)
311 }
312
/// Runs a Cypher query on a freshly created session.
///
/// Only available with the `cypher` feature.
///
/// # Errors
///
/// Returns whatever error the session's `execute_cypher` reports.
#[cfg(feature = "cypher")]
pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
    self.session().execute_cypher(query)
}
323
/// Runs a Cypher query with the given parameter map.
///
/// Unlike the other `execute_*` helpers this does not go through a
/// session: it builds a `QueryProcessor` directly over the LPG store and
/// passes the parameters by reference.
///
/// Only available with the `cypher` feature.
///
/// # Errors
///
/// Returns whatever error the query processor reports.
#[cfg(feature = "cypher")]
pub fn execute_cypher_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, grafeo_common::types::Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
    // The processor borrows the parameter map; it is not consumed.
    processor.process(query, QueryLanguage::Cypher, Some(&params))
}
341
/// Runs a Gremlin query on a freshly created session.
///
/// Only available with the `gremlin` feature.
///
/// # Errors
///
/// Returns whatever error the session's `execute_gremlin` reports.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
    self.session().execute_gremlin(query)
}
352
/// Runs a Gremlin query with the given parameter map on a freshly created
/// session.
///
/// Only available with the `gremlin` feature.
///
/// # Errors
///
/// Returns whatever error the session's `execute_gremlin_with_params`
/// reports.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, grafeo_common::types::Value>,
) -> Result<QueryResult> {
    self.session().execute_gremlin_with_params(query, params)
}
367
/// Runs a GraphQL query on a freshly created session.
///
/// Only available with the `graphql` feature.
///
/// # Errors
///
/// Returns whatever error the session's `execute_graphql` reports.
#[cfg(feature = "graphql")]
pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
    self.session().execute_graphql(query)
}
378
/// Runs a GraphQL query with the given parameter map on a freshly created
/// session.
///
/// Only available with the `graphql` feature.
///
/// # Errors
///
/// Returns whatever error the session's `execute_graphql_with_params`
/// reports.
#[cfg(feature = "graphql")]
pub fn execute_graphql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, grafeo_common::types::Value>,
) -> Result<QueryResult> {
    self.session().execute_graphql_with_params(query, params)
}
393
/// Runs a SPARQL query against the RDF store.
///
/// The query is translated to a logical plan, optimized, planned against
/// the RDF store and then executed with the plan's column list.
///
/// Only available when both the `sparql` and `rdf` features are enabled.
///
/// # Errors
///
/// Propagates errors from translation, optimization, planning and
/// execution.
#[cfg(all(feature = "sparql", feature = "rdf"))]
pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::Executor;
    use crate::query::optimizer::Optimizer;
    use crate::query::planner_rdf::RdfPlanner;
    use crate::query::sparql_translator;

    // SPARQL text -> logical plan.
    let translated = sparql_translator::translate(query)?;

    // Logical plan -> optimized logical plan.
    let plan_optimizer = Optimizer::new();
    let optimized = plan_optimizer.optimize(translated)?;

    // Optimized plan -> physical plan bound to the RDF store.
    let rdf_planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
    let mut physical = rdf_planner.plan(&optimized)?;

    let runner = Executor::with_columns(physical.columns.clone());
    runner.execute(physical.operator.as_mut())
}
431
/// Returns the shared handle to the RDF store.
///
/// Only available with the `rdf` feature.
#[cfg(feature = "rdf")]
#[must_use]
pub fn rdf_store(&self) -> &Arc<RdfStore> {
    let handle = &self.rdf_store;
    handle
}
440
441 pub fn query_scalar<T: FromValue>(&self, query: &str) -> Result<T> {
447 let result = self.execute(query)?;
448 result.scalar()
449 }
450
451 #[must_use]
453 pub fn config(&self) -> &Config {
454 &self.config
455 }
456
457 #[must_use]
461 pub fn store(&self) -> &Arc<LpgStore> {
462 &self.store
463 }
464
465 #[must_use]
467 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
468 &self.buffer_manager
469 }
470
471 pub fn close(&self) -> Result<()> {
481 let mut is_open = self.is_open.write();
482 if !*is_open {
483 return Ok(());
484 }
485
486 if let Some(ref wal) = self.wal {
488 let epoch = self.store.current_epoch();
489
490 let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
492 self.tx_manager.begin()
494 });
495
496 wal.log(&WalRecord::TxCommit {
498 tx_id: checkpoint_tx,
499 })?;
500
501 wal.checkpoint(checkpoint_tx, epoch)?;
503 wal.sync()?;
504 }
505
506 *is_open = false;
507 Ok(())
508 }
509
510 #[must_use]
512 pub fn wal(&self) -> Option<&Arc<WalManager>> {
513 self.wal.as_ref()
514 }
515
516 fn log_wal(&self, record: &WalRecord) -> Result<()> {
518 if let Some(ref wal) = self.wal {
519 wal.log(record)?;
520 }
521 Ok(())
522 }
523
524 #[must_use]
526 pub fn node_count(&self) -> usize {
527 self.store.node_count()
528 }
529
530 #[must_use]
532 pub fn edge_count(&self) -> usize {
533 self.store.edge_count()
534 }
535
536 #[must_use]
538 pub fn label_count(&self) -> usize {
539 self.store.label_count()
540 }
541
542 #[must_use]
544 pub fn property_key_count(&self) -> usize {
545 self.store.property_key_count()
546 }
547
548 #[must_use]
550 pub fn edge_type_count(&self) -> usize {
551 self.store.edge_type_count()
552 }
553
554 pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
571 let id = self.store.create_node(labels);
572
573 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
575 id,
576 labels: labels.iter().map(|s| s.to_string()).collect(),
577 }) {
578 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
579 }
580
581 id
582 }
583
584 pub fn create_node_with_props(
588 &self,
589 labels: &[&str],
590 properties: impl IntoIterator<
591 Item = (
592 impl Into<grafeo_common::types::PropertyKey>,
593 impl Into<grafeo_common::types::Value>,
594 ),
595 >,
596 ) -> grafeo_common::types::NodeId {
597 let props: Vec<(
599 grafeo_common::types::PropertyKey,
600 grafeo_common::types::Value,
601 )> = properties
602 .into_iter()
603 .map(|(k, v)| (k.into(), v.into()))
604 .collect();
605
606 let id = self
607 .store
608 .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
609
610 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
612 id,
613 labels: labels.iter().map(|s| s.to_string()).collect(),
614 }) {
615 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
616 }
617
618 for (key, value) in props {
620 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
621 id,
622 key: key.to_string(),
623 value,
624 }) {
625 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
626 }
627 }
628
629 id
630 }
631
632 #[must_use]
634 pub fn get_node(
635 &self,
636 id: grafeo_common::types::NodeId,
637 ) -> Option<grafeo_core::graph::lpg::Node> {
638 self.store.get_node(id)
639 }
640
641 pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
645 let result = self.store.delete_node(id);
646
647 if result {
648 if let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
649 tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
650 }
651 }
652
653 result
654 }
655
656 pub fn set_node_property(
660 &self,
661 id: grafeo_common::types::NodeId,
662 key: &str,
663 value: grafeo_common::types::Value,
664 ) {
665 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
667 id,
668 key: key.to_string(),
669 value: value.clone(),
670 }) {
671 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
672 }
673
674 self.store.set_node_property(id, key, value);
675 }
676
677 pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
695 let result = self.store.add_label(id, label);
696
697 if result {
698 if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
700 id,
701 label: label.to_string(),
702 }) {
703 tracing::warn!("Failed to log AddNodeLabel to WAL: {}", e);
704 }
705 }
706
707 result
708 }
709
710 pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
728 let result = self.store.remove_label(id, label);
729
730 if result {
731 if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
733 id,
734 label: label.to_string(),
735 }) {
736 tracing::warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
737 }
738 }
739
740 result
741 }
742
743 #[must_use]
760 pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
761 self.store
762 .get_node(id)
763 .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
764 }
765
766 pub fn create_edge(
786 &self,
787 src: grafeo_common::types::NodeId,
788 dst: grafeo_common::types::NodeId,
789 edge_type: &str,
790 ) -> grafeo_common::types::EdgeId {
791 let id = self.store.create_edge(src, dst, edge_type);
792
793 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
795 id,
796 src,
797 dst,
798 edge_type: edge_type.to_string(),
799 }) {
800 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
801 }
802
803 id
804 }
805
806 pub fn create_edge_with_props(
810 &self,
811 src: grafeo_common::types::NodeId,
812 dst: grafeo_common::types::NodeId,
813 edge_type: &str,
814 properties: impl IntoIterator<
815 Item = (
816 impl Into<grafeo_common::types::PropertyKey>,
817 impl Into<grafeo_common::types::Value>,
818 ),
819 >,
820 ) -> grafeo_common::types::EdgeId {
821 let props: Vec<(
823 grafeo_common::types::PropertyKey,
824 grafeo_common::types::Value,
825 )> = properties
826 .into_iter()
827 .map(|(k, v)| (k.into(), v.into()))
828 .collect();
829
830 let id = self.store.create_edge_with_props(
831 src,
832 dst,
833 edge_type,
834 props.iter().map(|(k, v)| (k.clone(), v.clone())),
835 );
836
837 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
839 id,
840 src,
841 dst,
842 edge_type: edge_type.to_string(),
843 }) {
844 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
845 }
846
847 for (key, value) in props {
849 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
850 id,
851 key: key.to_string(),
852 value,
853 }) {
854 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
855 }
856 }
857
858 id
859 }
860
861 #[must_use]
863 pub fn get_edge(
864 &self,
865 id: grafeo_common::types::EdgeId,
866 ) -> Option<grafeo_core::graph::lpg::Edge> {
867 self.store.get_edge(id)
868 }
869
870 pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
874 let result = self.store.delete_edge(id);
875
876 if result {
877 if let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
878 tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
879 }
880 }
881
882 result
883 }
884
885 pub fn set_edge_property(
889 &self,
890 id: grafeo_common::types::EdgeId,
891 key: &str,
892 value: grafeo_common::types::Value,
893 ) {
894 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
896 id,
897 key: key.to_string(),
898 value: value.clone(),
899 }) {
900 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
901 }
902 self.store.set_edge_property(id, key, value);
903 }
904
905 pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
909 self.store.remove_node_property(id, key).is_some()
911 }
912
913 pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
917 self.store.remove_edge_property(id, key).is_some()
919 }
920
921 #[must_use]
929 pub fn is_persistent(&self) -> bool {
930 self.config.path.is_some()
931 }
932
933 #[must_use]
937 pub fn path(&self) -> Option<&Path> {
938 self.config.path.as_deref()
939 }
940
941 #[must_use]
945 pub fn info(&self) -> crate::admin::DatabaseInfo {
946 crate::admin::DatabaseInfo {
947 mode: crate::admin::DatabaseMode::Lpg,
948 node_count: self.store.node_count(),
949 edge_count: self.store.edge_count(),
950 is_persistent: self.is_persistent(),
951 path: self.config.path.clone(),
952 wal_enabled: self.config.wal_enabled,
953 version: env!("CARGO_PKG_VERSION").to_string(),
954 }
955 }
956
957 #[must_use]
961 pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
962 let disk_bytes = self.config.path.as_ref().and_then(|p| {
963 if p.exists() {
964 Self::calculate_disk_usage(p).ok()
965 } else {
966 None
967 }
968 });
969
970 crate::admin::DatabaseStats {
971 node_count: self.store.node_count(),
972 edge_count: self.store.edge_count(),
973 label_count: self.store.label_count(),
974 edge_type_count: self.store.edge_type_count(),
975 property_key_count: self.store.property_key_count(),
976 index_count: 0, memory_bytes: self.buffer_manager.allocated(),
978 disk_bytes,
979 }
980 }
981
982 fn calculate_disk_usage(path: &Path) -> Result<usize> {
984 let mut total = 0usize;
985 if path.is_dir() {
986 for entry in std::fs::read_dir(path)? {
987 let entry = entry?;
988 let metadata = entry.metadata()?;
989 if metadata.is_file() {
990 total += metadata.len() as usize;
991 } else if metadata.is_dir() {
992 total += Self::calculate_disk_usage(&entry.path())?;
993 }
994 }
995 }
996 Ok(total)
997 }
998
999 #[must_use]
1004 pub fn schema(&self) -> crate::admin::SchemaInfo {
1005 let labels = self
1006 .store
1007 .all_labels()
1008 .into_iter()
1009 .map(|name| crate::admin::LabelInfo {
1010 name: name.clone(),
1011 count: self.store.nodes_with_label(&name).count(),
1012 })
1013 .collect();
1014
1015 let edge_types = self
1016 .store
1017 .all_edge_types()
1018 .into_iter()
1019 .map(|name| crate::admin::EdgeTypeInfo {
1020 name: name.clone(),
1021 count: self.store.edges_with_type(&name).count(),
1022 })
1023 .collect();
1024
1025 let property_keys = self.store.all_property_keys();
1026
1027 crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
1028 labels,
1029 edge_types,
1030 property_keys,
1031 })
1032 }
1033
/// Describes the RDF schema: every predicate with its triple count, plus
/// the subject and object counts from the RDF store's stats.
///
/// `named_graphs` is always reported as an empty list.
///
/// Only available with the `rdf` feature.
#[cfg(feature = "rdf")]
#[must_use]
pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
    use crate::admin::{PredicateInfo, RdfSchemaInfo, SchemaInfo};

    let rdf = &self.rdf_store;
    let stats = rdf.stats();

    let predicates = rdf
        .predicates()
        .into_iter()
        .map(|predicate| {
            let count = rdf.triples_with_predicate(&predicate).len();
            let iri = predicate.to_string();
            PredicateInfo { iri, count }
        })
        .collect();

    SchemaInfo::Rdf(RdfSchemaInfo {
        predicates,
        named_graphs: Vec::new(),
        subject_count: stats.subject_count,
        object_count: stats.object_count,
    })
}
1062
1063 #[must_use]
1071 pub fn validate(&self) -> crate::admin::ValidationResult {
1072 let mut result = crate::admin::ValidationResult::default();
1073
1074 for edge in self.store.all_edges() {
1076 if self.store.get_node(edge.src).is_none() {
1077 result.errors.push(crate::admin::ValidationError {
1078 code: "DANGLING_SRC".to_string(),
1079 message: format!(
1080 "Edge {} references non-existent source node {}",
1081 edge.id.0, edge.src.0
1082 ),
1083 context: Some(format!("edge:{}", edge.id.0)),
1084 });
1085 }
1086 if self.store.get_node(edge.dst).is_none() {
1087 result.errors.push(crate::admin::ValidationError {
1088 code: "DANGLING_DST".to_string(),
1089 message: format!(
1090 "Edge {} references non-existent destination node {}",
1091 edge.id.0, edge.dst.0
1092 ),
1093 context: Some(format!("edge:{}", edge.id.0)),
1094 });
1095 }
1096 }
1097
1098 if self.store.node_count() > 0 && self.store.edge_count() == 0 {
1100 result.warnings.push(crate::admin::ValidationWarning {
1101 code: "NO_EDGES".to_string(),
1102 message: "Database has nodes but no edges".to_string(),
1103 context: None,
1104 });
1105 }
1106
1107 result
1108 }
1109
1110 #[must_use]
1114 pub fn wal_status(&self) -> crate::admin::WalStatus {
1115 if let Some(ref wal) = self.wal {
1116 crate::admin::WalStatus {
1117 enabled: true,
1118 path: self.config.path.as_ref().map(|p| p.join("wal")),
1119 size_bytes: wal.size_bytes(),
1120 record_count: wal.record_count() as usize,
1121 last_checkpoint: wal.last_checkpoint_timestamp(),
1122 current_epoch: self.store.current_epoch().as_u64(),
1123 }
1124 } else {
1125 crate::admin::WalStatus {
1126 enabled: false,
1127 path: None,
1128 size_bytes: 0,
1129 record_count: 0,
1130 last_checkpoint: None,
1131 current_epoch: self.store.current_epoch().as_u64(),
1132 }
1133 }
1134 }
1135
1136 pub fn wal_checkpoint(&self) -> Result<()> {
1144 if let Some(ref wal) = self.wal {
1145 let epoch = self.store.current_epoch();
1146 let tx_id = self
1147 .tx_manager
1148 .last_assigned_tx_id()
1149 .unwrap_or_else(|| self.tx_manager.begin());
1150 wal.checkpoint(tx_id, epoch)?;
1151 wal.sync()?;
1152 }
1153 Ok(())
1154 }
1155
1156 pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
1171 let path = path.as_ref();
1172
1173 let target_config = Config::persistent(path);
1175 let target = Self::with_config(target_config)?;
1176
1177 for node in self.store.all_nodes() {
1179 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1180 target.store.create_node_with_id(node.id, &label_refs);
1181
1182 target.log_wal(&WalRecord::CreateNode {
1184 id: node.id,
1185 labels: node.labels.iter().map(|s| s.to_string()).collect(),
1186 })?;
1187
1188 for (key, value) in node.properties {
1190 target
1191 .store
1192 .set_node_property(node.id, key.as_str(), value.clone());
1193 target.log_wal(&WalRecord::SetNodeProperty {
1194 id: node.id,
1195 key: key.to_string(),
1196 value,
1197 })?;
1198 }
1199 }
1200
1201 for edge in self.store.all_edges() {
1203 target
1204 .store
1205 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1206
1207 target.log_wal(&WalRecord::CreateEdge {
1209 id: edge.id,
1210 src: edge.src,
1211 dst: edge.dst,
1212 edge_type: edge.edge_type.to_string(),
1213 })?;
1214
1215 for (key, value) in edge.properties {
1217 target
1218 .store
1219 .set_edge_property(edge.id, key.as_str(), value.clone());
1220 target.log_wal(&WalRecord::SetEdgeProperty {
1221 id: edge.id,
1222 key: key.to_string(),
1223 value,
1224 })?;
1225 }
1226 }
1227
1228 target.close()?;
1230
1231 Ok(())
1232 }
1233
1234 pub fn to_memory(&self) -> Result<Self> {
1245 let config = Config::in_memory();
1246 let target = Self::with_config(config)?;
1247
1248 for node in self.store.all_nodes() {
1250 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1251 target.store.create_node_with_id(node.id, &label_refs);
1252
1253 for (key, value) in node.properties {
1255 target.store.set_node_property(node.id, key.as_str(), value);
1256 }
1257 }
1258
1259 for edge in self.store.all_edges() {
1261 target
1262 .store
1263 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1264
1265 for (key, value) in edge.properties {
1267 target.store.set_edge_property(edge.id, key.as_str(), value);
1268 }
1269 }
1270
1271 Ok(target)
1272 }
1273
1274 pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
1283 let source = Self::open(path)?;
1285
1286 let target = source.to_memory()?;
1288
1289 source.close()?;
1291
1292 Ok(target)
1293 }
1294
1295 pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
1303 self.store.all_nodes()
1304 }
1305
1306 pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
1310 self.store.all_edges()
1311 }
1312}
1313
1314impl Drop for GrafeoDB {
1315 fn drop(&mut self) {
1316 if let Err(e) = self.close() {
1317 tracing::error!("Error closing database: {}", e);
1318 }
1319 }
1320}
1321
/// Tabular result of a query: column names, column types and row values.
#[derive(Debug)]
pub struct QueryResult {
    /// Column names.
    pub columns: Vec<String>,
    /// Logical type of each column; `QueryResult::new` fills this with
    /// `LogicalType::Any`, one entry per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// Result rows, each a list of values.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
}
1356
1357impl QueryResult {
1358 #[must_use]
1360 pub fn new(columns: Vec<String>) -> Self {
1361 let len = columns.len();
1362 Self {
1363 columns,
1364 column_types: vec![grafeo_common::types::LogicalType::Any; len],
1365 rows: Vec::new(),
1366 }
1367 }
1368
1369 #[must_use]
1371 pub fn with_types(
1372 columns: Vec<String>,
1373 column_types: Vec<grafeo_common::types::LogicalType>,
1374 ) -> Self {
1375 Self {
1376 columns,
1377 column_types,
1378 rows: Vec::new(),
1379 }
1380 }
1381
1382 #[must_use]
1384 pub fn row_count(&self) -> usize {
1385 self.rows.len()
1386 }
1387
1388 #[must_use]
1390 pub fn column_count(&self) -> usize {
1391 self.columns.len()
1392 }
1393
1394 #[must_use]
1396 pub fn is_empty(&self) -> bool {
1397 self.rows.is_empty()
1398 }
1399
1400 pub fn scalar<T: FromValue>(&self) -> Result<T> {
1409 if self.rows.len() != 1 || self.columns.len() != 1 {
1410 return Err(grafeo_common::utils::error::Error::InvalidValue(
1411 "Expected single value".to_string(),
1412 ));
1413 }
1414 T::from_value(&self.rows[0][0])
1415 }
1416
1417 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1419 self.rows.iter()
1420 }
1421}
1422
/// Conversion from a borrowed database `Value` into a Rust type.
///
/// Used by [`QueryResult::scalar`] to turn a single result value into `T`.
pub trait FromValue: Sized {
    /// Attempts to build `Self` from `value`, returning an error when the
    /// value cannot be converted.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
1431
1432impl FromValue for i64 {
1433 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1434 value
1435 .as_int64()
1436 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1437 expected: "INT64".to_string(),
1438 found: value.type_name().to_string(),
1439 })
1440 }
1441}
1442
1443impl FromValue for f64 {
1444 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1445 value
1446 .as_float64()
1447 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1448 expected: "FLOAT64".to_string(),
1449 found: value.type_name().to_string(),
1450 })
1451 }
1452}
1453
1454impl FromValue for String {
1455 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1456 value.as_str().map(String::from).ok_or_else(|| {
1457 grafeo_common::utils::error::Error::TypeMismatch {
1458 expected: "STRING".to_string(),
1459 found: value.type_name().to_string(),
1460 }
1461 })
1462 }
1463}
1464
1465impl FromValue for bool {
1466 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1467 value
1468 .as_bool()
1469 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1470 expected: "BOOL".to_string(),
1471 found: value.type_name().to_string(),
1472 })
1473 }
1474}
1475
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh in-memory database starts with no nodes and no edges.
    #[test]
    fn test_create_in_memory_database() {
        let database = GrafeoDB::new_in_memory();
        assert_eq!(database.node_count(), 0);
        assert_eq!(database.edge_count(), 0);
    }

    /// Settings passed through `with_config` are readable via `config()`.
    #[test]
    fn test_database_config() {
        let custom = Config::in_memory().with_threads(4).with_query_logging();

        let database = GrafeoDB::with_config(custom).unwrap();
        let stored = database.config();
        assert_eq!(stored.threads, 4);
        assert!(stored.query_logging);
    }

    /// Creating a session on an in-memory database does not panic.
    #[test]
    fn test_database_session() {
        let database = GrafeoDB::new_in_memory();
        let _session = database.session();
    }

    /// Data written to a persistent database is present after it is closed
    /// and opened again.
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::{NodeId, Value};
        use tempfile::tempdir;

        let scratch = tempdir().unwrap();
        let db_path = scratch.path().join("test_db");

        // First run: write two nodes and one edge, then close.
        {
            let writer = GrafeoDB::open(&db_path).unwrap();

            let alice = writer.create_node(&["Person"]);
            writer.set_node_property(alice, "name", Value::from("Alice"));

            let bob = writer.create_node(&["Person"]);
            writer.set_node_property(bob, "name", Value::from("Bob"));

            let _edge = writer.create_edge(alice, bob, "KNOWS");

            writer.close().unwrap();
        }

        // Second run: reopen and check that the data is back.
        {
            let reader = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(reader.node_count(), 2);
            assert_eq!(reader.edge_count(), 1);

            let first = reader.get_node(NodeId::new(0));
            assert!(first.is_some());

            let second = reader.get_node(NodeId::new(1));
            assert!(second.is_some());
        }
    }

    /// Mutations on a persistent database leave records in the WAL.
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let scratch = tempdir().unwrap();
        let db_path = scratch.path().join("wal_test_db");

        let database = GrafeoDB::open(&db_path).unwrap();

        let node = database.create_node(&["Test"]);
        database.delete_node(node);

        if let Some(wal) = database.wal() {
            assert!(wal.record_count() > 0);
        }

        database.close().unwrap();
    }
}