1use std::path::Path;
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9
10use grafeo_adapters::storage::wal::{WalConfig, WalManager, WalRecord, WalRecovery};
11use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::lpg::LpgStore;
14#[cfg(feature = "rdf")]
15use grafeo_core::graph::rdf::RdfStore;
16
17use crate::config::Config;
18use crate::session::Session;
19use crate::transaction::TransactionManager;
20
21pub struct GrafeoDB {
44 config: Config,
46 store: Arc<LpgStore>,
48 #[cfg(feature = "rdf")]
50 rdf_store: Arc<RdfStore>,
51 tx_manager: Arc<TransactionManager>,
53 buffer_manager: Arc<BufferManager>,
55 wal: Option<Arc<WalManager>>,
57 is_open: RwLock<bool>,
59}
60
61impl GrafeoDB {
62 #[must_use]
78 pub fn new_in_memory() -> Self {
79 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
80 }
81
82 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
101 Self::with_config(Config::persistent(path.as_ref()))
102 }
103
104 pub fn with_config(config: Config) -> Result<Self> {
128 let store = Arc::new(LpgStore::new());
129 #[cfg(feature = "rdf")]
130 let rdf_store = Arc::new(RdfStore::new());
131 let tx_manager = Arc::new(TransactionManager::new());
132
133 let buffer_config = BufferManagerConfig {
135 budget: config.memory_limit.unwrap_or_else(|| {
136 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
137 }),
138 spill_path: config
139 .spill_path
140 .clone()
141 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
142 ..BufferManagerConfig::default()
143 };
144 let buffer_manager = BufferManager::new(buffer_config);
145
146 let wal = if config.wal_enabled {
148 if let Some(ref db_path) = config.path {
149 std::fs::create_dir_all(db_path)?;
151
152 let wal_path = db_path.join("wal");
153
154 if wal_path.exists() {
156 let recovery = WalRecovery::new(&wal_path);
157 let records = recovery.recover()?;
158 Self::apply_wal_records(&store, &records)?;
159 }
160
161 let wal_config = WalConfig::default();
163 let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
164 Some(Arc::new(wal_manager))
165 } else {
166 None
167 }
168 } else {
169 None
170 };
171
172 Ok(Self {
173 config,
174 store,
175 #[cfg(feature = "rdf")]
176 rdf_store,
177 tx_manager,
178 buffer_manager,
179 wal,
180 is_open: RwLock::new(true),
181 })
182 }
183
184 fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
186 for record in records {
187 match record {
188 WalRecord::CreateNode { id, labels } => {
189 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
190 store.create_node_with_id(*id, &label_refs);
191 }
192 WalRecord::DeleteNode { id } => {
193 store.delete_node(*id);
194 }
195 WalRecord::CreateEdge {
196 id,
197 src,
198 dst,
199 edge_type,
200 } => {
201 store.create_edge_with_id(*id, *src, *dst, edge_type);
202 }
203 WalRecord::DeleteEdge { id } => {
204 store.delete_edge(*id);
205 }
206 WalRecord::SetNodeProperty { id, key, value } => {
207 store.set_node_property(*id, key, value.clone());
208 }
209 WalRecord::SetEdgeProperty { id, key, value } => {
210 store.set_edge_property(*id, key, value.clone());
211 }
212 WalRecord::AddNodeLabel { id, label } => {
213 store.add_label(*id, label);
214 }
215 WalRecord::RemoveNodeLabel { id, label } => {
216 store.remove_label(*id, label);
217 }
218 WalRecord::TxCommit { .. }
219 | WalRecord::TxAbort { .. }
220 | WalRecord::Checkpoint { .. } => {
221 }
224 }
225 }
226 Ok(())
227 }
228
229 #[must_use]
248 pub fn session(&self) -> Session {
249 #[cfg(feature = "rdf")]
250 {
251 Session::with_rdf_store_and_adaptive(
252 Arc::clone(&self.store),
253 Arc::clone(&self.rdf_store),
254 Arc::clone(&self.tx_manager),
255 self.config.adaptive.clone(),
256 )
257 }
258 #[cfg(not(feature = "rdf"))]
259 {
260 Session::with_adaptive(
261 Arc::clone(&self.store),
262 Arc::clone(&self.tx_manager),
263 self.config.adaptive.clone(),
264 )
265 }
266 }
267
268 #[must_use]
270 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
271 &self.config.adaptive
272 }
273
274 pub fn execute(&self, query: &str) -> Result<QueryResult> {
284 let session = self.session();
285 session.execute(query)
286 }
287
288 pub fn execute_with_params(
294 &self,
295 query: &str,
296 params: std::collections::HashMap<String, grafeo_common::types::Value>,
297 ) -> Result<QueryResult> {
298 let session = self.session();
299 session.execute_with_params(query, params)
300 }
301
302 #[cfg(feature = "cypher")]
308 pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
309 let session = self.session();
310 session.execute_cypher(query)
311 }
312
313 #[cfg(feature = "cypher")]
319 pub fn execute_cypher_with_params(
320 &self,
321 query: &str,
322 params: std::collections::HashMap<String, grafeo_common::types::Value>,
323 ) -> Result<QueryResult> {
324 use crate::query::processor::{QueryLanguage, QueryProcessor};
325
326 let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
328 processor.process(query, QueryLanguage::Cypher, Some(¶ms))
329 }
330
331 #[cfg(feature = "gremlin")]
337 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
338 let session = self.session();
339 session.execute_gremlin(query)
340 }
341
342 #[cfg(feature = "gremlin")]
348 pub fn execute_gremlin_with_params(
349 &self,
350 query: &str,
351 params: std::collections::HashMap<String, grafeo_common::types::Value>,
352 ) -> Result<QueryResult> {
353 let session = self.session();
354 session.execute_gremlin_with_params(query, params)
355 }
356
357 #[cfg(feature = "graphql")]
363 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
364 let session = self.session();
365 session.execute_graphql(query)
366 }
367
368 #[cfg(feature = "graphql")]
374 pub fn execute_graphql_with_params(
375 &self,
376 query: &str,
377 params: std::collections::HashMap<String, grafeo_common::types::Value>,
378 ) -> Result<QueryResult> {
379 let session = self.session();
380 session.execute_graphql_with_params(query, params)
381 }
382
383 #[cfg(all(feature = "sparql", feature = "rdf"))]
400 pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
401 use crate::query::{
402 Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
403 };
404
405 let logical_plan = sparql_translator::translate(query)?;
407
408 let optimizer = Optimizer::new();
410 let optimized_plan = optimizer.optimize(logical_plan)?;
411
412 let planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
414 let mut physical_plan = planner.plan(&optimized_plan)?;
415
416 let executor = Executor::with_columns(physical_plan.columns.clone());
418 executor.execute(physical_plan.operator.as_mut())
419 }
420
421 #[cfg(feature = "rdf")]
425 #[must_use]
426 pub fn rdf_store(&self) -> &Arc<RdfStore> {
427 &self.rdf_store
428 }
429
430 pub fn query_scalar<T: FromValue>(&self, query: &str) -> Result<T> {
436 let result = self.execute(query)?;
437 result.scalar()
438 }
439
440 #[must_use]
442 pub fn config(&self) -> &Config {
443 &self.config
444 }
445
446 #[must_use]
450 pub fn store(&self) -> &Arc<LpgStore> {
451 &self.store
452 }
453
454 #[must_use]
456 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
457 &self.buffer_manager
458 }
459
460 pub fn close(&self) -> Result<()> {
470 let mut is_open = self.is_open.write();
471 if !*is_open {
472 return Ok(());
473 }
474
475 if let Some(ref wal) = self.wal {
477 let epoch = self.store.current_epoch();
478
479 let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
481 self.tx_manager.begin()
483 });
484
485 wal.log(&WalRecord::TxCommit {
487 tx_id: checkpoint_tx,
488 })?;
489
490 wal.checkpoint(checkpoint_tx, epoch)?;
492 wal.sync()?;
493 }
494
495 *is_open = false;
496 Ok(())
497 }
498
499 #[must_use]
501 pub fn wal(&self) -> Option<&Arc<WalManager>> {
502 self.wal.as_ref()
503 }
504
505 fn log_wal(&self, record: &WalRecord) -> Result<()> {
507 if let Some(ref wal) = self.wal {
508 wal.log(record)?;
509 }
510 Ok(())
511 }
512
513 #[must_use]
515 pub fn node_count(&self) -> usize {
516 self.store.node_count()
517 }
518
519 #[must_use]
521 pub fn edge_count(&self) -> usize {
522 self.store.edge_count()
523 }
524
525 #[must_use]
527 pub fn label_count(&self) -> usize {
528 self.store.label_count()
529 }
530
531 #[must_use]
533 pub fn property_key_count(&self) -> usize {
534 self.store.property_key_count()
535 }
536
537 #[must_use]
539 pub fn edge_type_count(&self) -> usize {
540 self.store.edge_type_count()
541 }
542
543 pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
560 let id = self.store.create_node(labels);
561
562 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
564 id,
565 labels: labels.iter().map(|s| s.to_string()).collect(),
566 }) {
567 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
568 }
569
570 id
571 }
572
573 pub fn create_node_with_props(
577 &self,
578 labels: &[&str],
579 properties: impl IntoIterator<
580 Item = (
581 impl Into<grafeo_common::types::PropertyKey>,
582 impl Into<grafeo_common::types::Value>,
583 ),
584 >,
585 ) -> grafeo_common::types::NodeId {
586 let props: Vec<(
588 grafeo_common::types::PropertyKey,
589 grafeo_common::types::Value,
590 )> = properties
591 .into_iter()
592 .map(|(k, v)| (k.into(), v.into()))
593 .collect();
594
595 let id = self
596 .store
597 .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
598
599 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
601 id,
602 labels: labels.iter().map(|s| s.to_string()).collect(),
603 }) {
604 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
605 }
606
607 for (key, value) in props {
609 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
610 id,
611 key: key.to_string(),
612 value,
613 }) {
614 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
615 }
616 }
617
618 id
619 }
620
621 #[must_use]
623 pub fn get_node(
624 &self,
625 id: grafeo_common::types::NodeId,
626 ) -> Option<grafeo_core::graph::lpg::Node> {
627 self.store.get_node(id)
628 }
629
630 pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
634 let result = self.store.delete_node(id);
635
636 if result {
637 if let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
638 tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
639 }
640 }
641
642 result
643 }
644
645 pub fn set_node_property(
649 &self,
650 id: grafeo_common::types::NodeId,
651 key: &str,
652 value: grafeo_common::types::Value,
653 ) {
654 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
656 id,
657 key: key.to_string(),
658 value: value.clone(),
659 }) {
660 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
661 }
662
663 self.store.set_node_property(id, key, value);
664 }
665
666 pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
684 let result = self.store.add_label(id, label);
685
686 if result {
687 if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
689 id,
690 label: label.to_string(),
691 }) {
692 tracing::warn!("Failed to log AddNodeLabel to WAL: {}", e);
693 }
694 }
695
696 result
697 }
698
699 pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
717 let result = self.store.remove_label(id, label);
718
719 if result {
720 if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
722 id,
723 label: label.to_string(),
724 }) {
725 tracing::warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
726 }
727 }
728
729 result
730 }
731
732 #[must_use]
749 pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
750 self.store
751 .get_node(id)
752 .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
753 }
754
755 pub fn create_edge(
775 &self,
776 src: grafeo_common::types::NodeId,
777 dst: grafeo_common::types::NodeId,
778 edge_type: &str,
779 ) -> grafeo_common::types::EdgeId {
780 let id = self.store.create_edge(src, dst, edge_type);
781
782 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
784 id,
785 src,
786 dst,
787 edge_type: edge_type.to_string(),
788 }) {
789 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
790 }
791
792 id
793 }
794
795 pub fn create_edge_with_props(
799 &self,
800 src: grafeo_common::types::NodeId,
801 dst: grafeo_common::types::NodeId,
802 edge_type: &str,
803 properties: impl IntoIterator<
804 Item = (
805 impl Into<grafeo_common::types::PropertyKey>,
806 impl Into<grafeo_common::types::Value>,
807 ),
808 >,
809 ) -> grafeo_common::types::EdgeId {
810 let props: Vec<(
812 grafeo_common::types::PropertyKey,
813 grafeo_common::types::Value,
814 )> = properties
815 .into_iter()
816 .map(|(k, v)| (k.into(), v.into()))
817 .collect();
818
819 let id = self.store.create_edge_with_props(
820 src,
821 dst,
822 edge_type,
823 props.iter().map(|(k, v)| (k.clone(), v.clone())),
824 );
825
826 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
828 id,
829 src,
830 dst,
831 edge_type: edge_type.to_string(),
832 }) {
833 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
834 }
835
836 for (key, value) in props {
838 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
839 id,
840 key: key.to_string(),
841 value,
842 }) {
843 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
844 }
845 }
846
847 id
848 }
849
850 #[must_use]
852 pub fn get_edge(
853 &self,
854 id: grafeo_common::types::EdgeId,
855 ) -> Option<grafeo_core::graph::lpg::Edge> {
856 self.store.get_edge(id)
857 }
858
859 pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
863 let result = self.store.delete_edge(id);
864
865 if result {
866 if let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
867 tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
868 }
869 }
870
871 result
872 }
873
874 pub fn set_edge_property(
878 &self,
879 id: grafeo_common::types::EdgeId,
880 key: &str,
881 value: grafeo_common::types::Value,
882 ) {
883 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
885 id,
886 key: key.to_string(),
887 value: value.clone(),
888 }) {
889 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
890 }
891 self.store.set_edge_property(id, key, value);
892 }
893
894 pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
898 self.store.remove_node_property(id, key).is_some()
900 }
901
902 pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
906 self.store.remove_edge_property(id, key).is_some()
908 }
909
910 #[must_use]
918 pub fn is_persistent(&self) -> bool {
919 self.config.path.is_some()
920 }
921
922 #[must_use]
926 pub fn path(&self) -> Option<&Path> {
927 self.config.path.as_deref()
928 }
929
930 #[must_use]
934 pub fn info(&self) -> crate::admin::DatabaseInfo {
935 crate::admin::DatabaseInfo {
936 mode: crate::admin::DatabaseMode::Lpg,
937 node_count: self.store.node_count(),
938 edge_count: self.store.edge_count(),
939 is_persistent: self.is_persistent(),
940 path: self.config.path.clone(),
941 wal_enabled: self.config.wal_enabled,
942 version: env!("CARGO_PKG_VERSION").to_string(),
943 }
944 }
945
946 #[must_use]
950 pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
951 let disk_bytes = self.config.path.as_ref().and_then(|p| {
952 if p.exists() {
953 Self::calculate_disk_usage(p).ok()
954 } else {
955 None
956 }
957 });
958
959 crate::admin::DatabaseStats {
960 node_count: self.store.node_count(),
961 edge_count: self.store.edge_count(),
962 label_count: self.store.label_count(),
963 edge_type_count: self.store.edge_type_count(),
964 property_key_count: self.store.property_key_count(),
965 index_count: 0, memory_bytes: self.buffer_manager.allocated(),
967 disk_bytes,
968 }
969 }
970
971 fn calculate_disk_usage(path: &Path) -> Result<usize> {
973 let mut total = 0usize;
974 if path.is_dir() {
975 for entry in std::fs::read_dir(path)? {
976 let entry = entry?;
977 let metadata = entry.metadata()?;
978 if metadata.is_file() {
979 total += metadata.len() as usize;
980 } else if metadata.is_dir() {
981 total += Self::calculate_disk_usage(&entry.path())?;
982 }
983 }
984 }
985 Ok(total)
986 }
987
988 #[must_use]
993 pub fn schema(&self) -> crate::admin::SchemaInfo {
994 let labels = self
995 .store
996 .all_labels()
997 .into_iter()
998 .map(|name| crate::admin::LabelInfo {
999 name: name.clone(),
1000 count: self.store.nodes_with_label(&name).count(),
1001 })
1002 .collect();
1003
1004 let edge_types = self
1005 .store
1006 .all_edge_types()
1007 .into_iter()
1008 .map(|name| crate::admin::EdgeTypeInfo {
1009 name: name.clone(),
1010 count: self.store.edges_with_type(&name).count(),
1011 })
1012 .collect();
1013
1014 let property_keys = self.store.all_property_keys();
1015
1016 crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
1017 labels,
1018 edge_types,
1019 property_keys,
1020 })
1021 }
1022
1023 #[cfg(feature = "rdf")]
1027 #[must_use]
1028 pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
1029 let stats = self.rdf_store.stats();
1030
1031 let predicates = self
1032 .rdf_store
1033 .predicates()
1034 .into_iter()
1035 .map(|predicate| {
1036 let count = self.rdf_store.triples_with_predicate(&predicate).len();
1037 crate::admin::PredicateInfo {
1038 iri: predicate.to_string(),
1039 count,
1040 }
1041 })
1042 .collect();
1043
1044 crate::admin::SchemaInfo::Rdf(crate::admin::RdfSchemaInfo {
1045 predicates,
1046 named_graphs: Vec::new(), subject_count: stats.subject_count,
1048 object_count: stats.object_count,
1049 })
1050 }
1051
1052 #[must_use]
1060 pub fn validate(&self) -> crate::admin::ValidationResult {
1061 let mut result = crate::admin::ValidationResult::default();
1062
1063 for edge in self.store.all_edges() {
1065 if self.store.get_node(edge.src).is_none() {
1066 result.errors.push(crate::admin::ValidationError {
1067 code: "DANGLING_SRC".to_string(),
1068 message: format!(
1069 "Edge {} references non-existent source node {}",
1070 edge.id.0, edge.src.0
1071 ),
1072 context: Some(format!("edge:{}", edge.id.0)),
1073 });
1074 }
1075 if self.store.get_node(edge.dst).is_none() {
1076 result.errors.push(crate::admin::ValidationError {
1077 code: "DANGLING_DST".to_string(),
1078 message: format!(
1079 "Edge {} references non-existent destination node {}",
1080 edge.id.0, edge.dst.0
1081 ),
1082 context: Some(format!("edge:{}", edge.id.0)),
1083 });
1084 }
1085 }
1086
1087 if self.store.node_count() > 0 && self.store.edge_count() == 0 {
1089 result.warnings.push(crate::admin::ValidationWarning {
1090 code: "NO_EDGES".to_string(),
1091 message: "Database has nodes but no edges".to_string(),
1092 context: None,
1093 });
1094 }
1095
1096 result
1097 }
1098
1099 #[must_use]
1103 pub fn wal_status(&self) -> crate::admin::WalStatus {
1104 if let Some(ref wal) = self.wal {
1105 crate::admin::WalStatus {
1106 enabled: true,
1107 path: self.config.path.as_ref().map(|p| p.join("wal")),
1108 size_bytes: wal.size_bytes(),
1109 record_count: wal.record_count() as usize,
1110 last_checkpoint: wal.last_checkpoint_timestamp(),
1111 current_epoch: self.store.current_epoch().as_u64(),
1112 }
1113 } else {
1114 crate::admin::WalStatus {
1115 enabled: false,
1116 path: None,
1117 size_bytes: 0,
1118 record_count: 0,
1119 last_checkpoint: None,
1120 current_epoch: self.store.current_epoch().as_u64(),
1121 }
1122 }
1123 }
1124
1125 pub fn wal_checkpoint(&self) -> Result<()> {
1133 if let Some(ref wal) = self.wal {
1134 let epoch = self.store.current_epoch();
1135 let tx_id = self
1136 .tx_manager
1137 .last_assigned_tx_id()
1138 .unwrap_or_else(|| self.tx_manager.begin());
1139 wal.checkpoint(tx_id, epoch)?;
1140 wal.sync()?;
1141 }
1142 Ok(())
1143 }
1144
1145 pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
1160 let path = path.as_ref();
1161
1162 let target_config = Config::persistent(path);
1164 let target = Self::with_config(target_config)?;
1165
1166 for node in self.store.all_nodes() {
1168 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1169 target.store.create_node_with_id(node.id, &label_refs);
1170
1171 target.log_wal(&WalRecord::CreateNode {
1173 id: node.id,
1174 labels: node.labels.iter().map(|s| s.to_string()).collect(),
1175 })?;
1176
1177 for (key, value) in node.properties {
1179 target
1180 .store
1181 .set_node_property(node.id, key.as_str(), value.clone());
1182 target.log_wal(&WalRecord::SetNodeProperty {
1183 id: node.id,
1184 key: key.to_string(),
1185 value,
1186 })?;
1187 }
1188 }
1189
1190 for edge in self.store.all_edges() {
1192 target
1193 .store
1194 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1195
1196 target.log_wal(&WalRecord::CreateEdge {
1198 id: edge.id,
1199 src: edge.src,
1200 dst: edge.dst,
1201 edge_type: edge.edge_type.to_string(),
1202 })?;
1203
1204 for (key, value) in edge.properties {
1206 target
1207 .store
1208 .set_edge_property(edge.id, key.as_str(), value.clone());
1209 target.log_wal(&WalRecord::SetEdgeProperty {
1210 id: edge.id,
1211 key: key.to_string(),
1212 value,
1213 })?;
1214 }
1215 }
1216
1217 target.close()?;
1219
1220 Ok(())
1221 }
1222
1223 pub fn to_memory(&self) -> Result<Self> {
1234 let config = Config::in_memory();
1235 let target = Self::with_config(config)?;
1236
1237 for node in self.store.all_nodes() {
1239 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1240 target.store.create_node_with_id(node.id, &label_refs);
1241
1242 for (key, value) in node.properties {
1244 target.store.set_node_property(node.id, key.as_str(), value);
1245 }
1246 }
1247
1248 for edge in self.store.all_edges() {
1250 target
1251 .store
1252 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1253
1254 for (key, value) in edge.properties {
1256 target.store.set_edge_property(edge.id, key.as_str(), value);
1257 }
1258 }
1259
1260 Ok(target)
1261 }
1262
1263 pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
1272 let source = Self::open(path)?;
1274
1275 let target = source.to_memory()?;
1277
1278 source.close()?;
1280
1281 Ok(target)
1282 }
1283
1284 pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
1292 self.store.all_nodes()
1293 }
1294
1295 pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
1299 self.store.all_edges()
1300 }
1301}
1302
1303impl Drop for GrafeoDB {
1304 fn drop(&mut self) {
1305 if let Err(e) = self.close() {
1306 tracing::error!("Error closing database: {}", e);
1307 }
1308 }
1309}
1310
1311#[derive(Debug)]
1337pub struct QueryResult {
1338 pub columns: Vec<String>,
1340 pub column_types: Vec<grafeo_common::types::LogicalType>,
1342 pub rows: Vec<Vec<grafeo_common::types::Value>>,
1344}
1345
1346impl QueryResult {
1347 #[must_use]
1349 pub fn new(columns: Vec<String>) -> Self {
1350 let len = columns.len();
1351 Self {
1352 columns,
1353 column_types: vec![grafeo_common::types::LogicalType::Any; len],
1354 rows: Vec::new(),
1355 }
1356 }
1357
1358 #[must_use]
1360 pub fn with_types(
1361 columns: Vec<String>,
1362 column_types: Vec<grafeo_common::types::LogicalType>,
1363 ) -> Self {
1364 Self {
1365 columns,
1366 column_types,
1367 rows: Vec::new(),
1368 }
1369 }
1370
1371 #[must_use]
1373 pub fn row_count(&self) -> usize {
1374 self.rows.len()
1375 }
1376
1377 #[must_use]
1379 pub fn column_count(&self) -> usize {
1380 self.columns.len()
1381 }
1382
1383 #[must_use]
1385 pub fn is_empty(&self) -> bool {
1386 self.rows.is_empty()
1387 }
1388
1389 pub fn scalar<T: FromValue>(&self) -> Result<T> {
1398 if self.rows.len() != 1 || self.columns.len() != 1 {
1399 return Err(grafeo_common::utils::error::Error::InvalidValue(
1400 "Expected single value".to_string(),
1401 ));
1402 }
1403 T::from_value(&self.rows[0][0])
1404 }
1405
1406 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1408 self.rows.iter()
1409 }
1410}
1411
1412pub trait FromValue: Sized {
1417 fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
1419}
1420
1421impl FromValue for i64 {
1422 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1423 value
1424 .as_int64()
1425 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1426 expected: "INT64".to_string(),
1427 found: value.type_name().to_string(),
1428 })
1429 }
1430}
1431
1432impl FromValue for f64 {
1433 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1434 value
1435 .as_float64()
1436 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1437 expected: "FLOAT64".to_string(),
1438 found: value.type_name().to_string(),
1439 })
1440 }
1441}
1442
1443impl FromValue for String {
1444 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1445 value.as_str().map(String::from).ok_or_else(|| {
1446 grafeo_common::utils::error::Error::TypeMismatch {
1447 expected: "STRING".to_string(),
1448 found: value.type_name().to_string(),
1449 }
1450 })
1451 }
1452}
1453
1454impl FromValue for bool {
1455 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1456 value
1457 .as_bool()
1458 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1459 expected: "BOOL".to_string(),
1460 found: value.type_name().to_string(),
1461 })
1462 }
1463}
1464
1465#[cfg(test)]
1466mod tests {
1467 use super::*;
1468
1469 #[test]
1470 fn test_create_in_memory_database() {
1471 let db = GrafeoDB::new_in_memory();
1472 assert_eq!(db.node_count(), 0);
1473 assert_eq!(db.edge_count(), 0);
1474 }
1475
1476 #[test]
1477 fn test_database_config() {
1478 let config = Config::in_memory().with_threads(4).with_query_logging();
1479
1480 let db = GrafeoDB::with_config(config).unwrap();
1481 assert_eq!(db.config().threads, 4);
1482 assert!(db.config().query_logging);
1483 }
1484
1485 #[test]
1486 fn test_database_session() {
1487 let db = GrafeoDB::new_in_memory();
1488 let _session = db.session();
1489 }
1491
1492 #[test]
1493 fn test_persistent_database_recovery() {
1494 use grafeo_common::types::Value;
1495 use tempfile::tempdir;
1496
1497 let dir = tempdir().unwrap();
1498 let db_path = dir.path().join("test_db");
1499
1500 {
1502 let db = GrafeoDB::open(&db_path).unwrap();
1503
1504 let alice = db.create_node(&["Person"]);
1505 db.set_node_property(alice, "name", Value::from("Alice"));
1506
1507 let bob = db.create_node(&["Person"]);
1508 db.set_node_property(bob, "name", Value::from("Bob"));
1509
1510 let _edge = db.create_edge(alice, bob, "KNOWS");
1511
1512 db.close().unwrap();
1514 }
1515
1516 {
1518 let db = GrafeoDB::open(&db_path).unwrap();
1519
1520 assert_eq!(db.node_count(), 2);
1521 assert_eq!(db.edge_count(), 1);
1522
1523 let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
1525 assert!(node0.is_some());
1526
1527 let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
1528 assert!(node1.is_some());
1529 }
1530 }
1531
1532 #[test]
1533 fn test_wal_logging() {
1534 use tempfile::tempdir;
1535
1536 let dir = tempdir().unwrap();
1537 let db_path = dir.path().join("wal_test_db");
1538
1539 let db = GrafeoDB::open(&db_path).unwrap();
1540
1541 let node = db.create_node(&["Test"]);
1543 db.delete_node(node);
1544
1545 if let Some(wal) = db.wal() {
1547 assert!(wal.record_count() > 0);
1548 }
1549
1550 db.close().unwrap();
1551 }
1552}