1use std::path::Path;
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9
10use grafeo_adapters::storage::wal::{WalConfig, WalManager, WalRecord, WalRecovery};
11use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::lpg::LpgStore;
14#[cfg(feature = "rdf")]
15use grafeo_core::graph::rdf::RdfStore;
16
17use crate::config::Config;
18use crate::session::Session;
19use crate::transaction::TransactionManager;
20
21pub struct GrafeoDB {
44 config: Config,
46 store: Arc<LpgStore>,
48 #[cfg(feature = "rdf")]
50 rdf_store: Arc<RdfStore>,
51 tx_manager: Arc<TransactionManager>,
53 buffer_manager: Arc<BufferManager>,
55 wal: Option<Arc<WalManager>>,
57 is_open: RwLock<bool>,
59}
60
61impl GrafeoDB {
62 #[must_use]
78 pub fn new_in_memory() -> Self {
79 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
80 }
81
82 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
101 Self::with_config(Config::persistent(path.as_ref()))
102 }
103
104 pub fn with_config(config: Config) -> Result<Self> {
128 let store = Arc::new(LpgStore::new());
129 #[cfg(feature = "rdf")]
130 let rdf_store = Arc::new(RdfStore::new());
131 let tx_manager = Arc::new(TransactionManager::new());
132
133 let buffer_config = BufferManagerConfig {
135 budget: config.memory_limit.unwrap_or_else(|| {
136 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
137 }),
138 spill_path: config
139 .spill_path
140 .clone()
141 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
142 ..BufferManagerConfig::default()
143 };
144 let buffer_manager = BufferManager::new(buffer_config);
145
146 let wal = if config.wal_enabled {
148 if let Some(ref db_path) = config.path {
149 std::fs::create_dir_all(db_path)?;
151
152 let wal_path = db_path.join("wal");
153
154 if wal_path.exists() {
156 let recovery = WalRecovery::new(&wal_path);
157 let records = recovery.recover()?;
158 Self::apply_wal_records(&store, &records)?;
159 }
160
161 let wal_config = WalConfig::default();
163 let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
164 Some(Arc::new(wal_manager))
165 } else {
166 None
167 }
168 } else {
169 None
170 };
171
172 Ok(Self {
173 config,
174 store,
175 #[cfg(feature = "rdf")]
176 rdf_store,
177 tx_manager,
178 buffer_manager,
179 wal,
180 is_open: RwLock::new(true),
181 })
182 }
183
184 fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
186 for record in records {
187 match record {
188 WalRecord::CreateNode { id, labels } => {
189 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
190 store.create_node_with_id(*id, &label_refs);
191 }
192 WalRecord::DeleteNode { id } => {
193 store.delete_node(*id);
194 }
195 WalRecord::CreateEdge {
196 id,
197 src,
198 dst,
199 edge_type,
200 } => {
201 store.create_edge_with_id(*id, *src, *dst, edge_type);
202 }
203 WalRecord::DeleteEdge { id } => {
204 store.delete_edge(*id);
205 }
206 WalRecord::SetNodeProperty { id, key, value } => {
207 store.set_node_property(*id, key, value.clone());
208 }
209 WalRecord::SetEdgeProperty { id, key, value } => {
210 store.set_edge_property(*id, key, value.clone());
211 }
212 WalRecord::TxCommit { .. }
213 | WalRecord::TxAbort { .. }
214 | WalRecord::Checkpoint { .. } => {
215 }
218 }
219 }
220 Ok(())
221 }
222
223 #[must_use]
242 pub fn session(&self) -> Session {
243 #[cfg(feature = "rdf")]
244 {
245 Session::with_rdf_store_and_adaptive(
246 Arc::clone(&self.store),
247 Arc::clone(&self.rdf_store),
248 Arc::clone(&self.tx_manager),
249 self.config.adaptive.clone(),
250 )
251 }
252 #[cfg(not(feature = "rdf"))]
253 {
254 Session::with_adaptive(
255 Arc::clone(&self.store),
256 Arc::clone(&self.tx_manager),
257 self.config.adaptive.clone(),
258 )
259 }
260 }
261
262 #[must_use]
264 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
265 &self.config.adaptive
266 }
267
268 pub fn execute(&self, query: &str) -> Result<QueryResult> {
278 let session = self.session();
279 session.execute(query)
280 }
281
282 pub fn execute_with_params(
288 &self,
289 query: &str,
290 params: std::collections::HashMap<String, grafeo_common::types::Value>,
291 ) -> Result<QueryResult> {
292 let session = self.session();
293 session.execute_with_params(query, params)
294 }
295
296 #[cfg(feature = "cypher")]
302 pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
303 let session = self.session();
304 session.execute_cypher(query)
305 }
306
307 #[cfg(feature = "cypher")]
313 pub fn execute_cypher_with_params(
314 &self,
315 query: &str,
316 params: std::collections::HashMap<String, grafeo_common::types::Value>,
317 ) -> Result<QueryResult> {
318 use crate::query::processor::{QueryLanguage, QueryProcessor};
319
320 let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
322 processor.process(query, QueryLanguage::Cypher, Some(¶ms))
323 }
324
325 #[cfg(feature = "gremlin")]
331 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
332 let session = self.session();
333 session.execute_gremlin(query)
334 }
335
336 #[cfg(feature = "gremlin")]
342 pub fn execute_gremlin_with_params(
343 &self,
344 query: &str,
345 params: std::collections::HashMap<String, grafeo_common::types::Value>,
346 ) -> Result<QueryResult> {
347 let session = self.session();
348 session.execute_gremlin_with_params(query, params)
349 }
350
351 #[cfg(feature = "graphql")]
357 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
358 let session = self.session();
359 session.execute_graphql(query)
360 }
361
362 #[cfg(feature = "graphql")]
368 pub fn execute_graphql_with_params(
369 &self,
370 query: &str,
371 params: std::collections::HashMap<String, grafeo_common::types::Value>,
372 ) -> Result<QueryResult> {
373 let session = self.session();
374 session.execute_graphql_with_params(query, params)
375 }
376
377 #[cfg(all(feature = "sparql", feature = "rdf"))]
394 pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
395 use crate::query::{
396 Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
397 };
398
399 let logical_plan = sparql_translator::translate(query)?;
401
402 let optimizer = Optimizer::new();
404 let optimized_plan = optimizer.optimize(logical_plan)?;
405
406 let planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
408 let mut physical_plan = planner.plan(&optimized_plan)?;
409
410 let executor = Executor::with_columns(physical_plan.columns.clone());
412 executor.execute(physical_plan.operator.as_mut())
413 }
414
415 #[cfg(feature = "rdf")]
419 #[must_use]
420 pub fn rdf_store(&self) -> &Arc<RdfStore> {
421 &self.rdf_store
422 }
423
424 pub fn query_scalar<T: FromValue>(&self, query: &str) -> Result<T> {
430 let result = self.execute(query)?;
431 result.scalar()
432 }
433
434 #[must_use]
436 pub fn config(&self) -> &Config {
437 &self.config
438 }
439
440 #[must_use]
444 pub fn store(&self) -> &Arc<LpgStore> {
445 &self.store
446 }
447
448 #[must_use]
450 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
451 &self.buffer_manager
452 }
453
454 pub fn close(&self) -> Result<()> {
464 let mut is_open = self.is_open.write();
465 if !*is_open {
466 return Ok(());
467 }
468
469 if let Some(ref wal) = self.wal {
471 let epoch = self.store.current_epoch();
472
473 let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
475 self.tx_manager.begin()
477 });
478
479 wal.log(&WalRecord::TxCommit {
481 tx_id: checkpoint_tx,
482 })?;
483
484 wal.checkpoint(checkpoint_tx, epoch)?;
486 wal.sync()?;
487 }
488
489 *is_open = false;
490 Ok(())
491 }
492
493 #[must_use]
495 pub fn wal(&self) -> Option<&Arc<WalManager>> {
496 self.wal.as_ref()
497 }
498
499 fn log_wal(&self, record: &WalRecord) -> Result<()> {
501 if let Some(ref wal) = self.wal {
502 wal.log(record)?;
503 }
504 Ok(())
505 }
506
507 #[must_use]
509 pub fn node_count(&self) -> usize {
510 self.store.node_count()
511 }
512
513 #[must_use]
515 pub fn edge_count(&self) -> usize {
516 self.store.edge_count()
517 }
518
519 #[must_use]
521 pub fn label_count(&self) -> usize {
522 self.store.label_count()
523 }
524
525 #[must_use]
527 pub fn property_key_count(&self) -> usize {
528 self.store.property_key_count()
529 }
530
531 #[must_use]
533 pub fn edge_type_count(&self) -> usize {
534 self.store.edge_type_count()
535 }
536
537 pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
554 let id = self.store.create_node(labels);
555
556 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
558 id,
559 labels: labels.iter().map(|s| s.to_string()).collect(),
560 }) {
561 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
562 }
563
564 id
565 }
566
567 pub fn create_node_with_props(
571 &self,
572 labels: &[&str],
573 properties: impl IntoIterator<
574 Item = (
575 impl Into<grafeo_common::types::PropertyKey>,
576 impl Into<grafeo_common::types::Value>,
577 ),
578 >,
579 ) -> grafeo_common::types::NodeId {
580 let props: Vec<(
582 grafeo_common::types::PropertyKey,
583 grafeo_common::types::Value,
584 )> = properties
585 .into_iter()
586 .map(|(k, v)| (k.into(), v.into()))
587 .collect();
588
589 let id = self
590 .store
591 .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
592
593 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
595 id,
596 labels: labels.iter().map(|s| s.to_string()).collect(),
597 }) {
598 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
599 }
600
601 for (key, value) in props {
603 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
604 id,
605 key: key.to_string(),
606 value,
607 }) {
608 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
609 }
610 }
611
612 id
613 }
614
615 #[must_use]
617 pub fn get_node(
618 &self,
619 id: grafeo_common::types::NodeId,
620 ) -> Option<grafeo_core::graph::lpg::Node> {
621 self.store.get_node(id)
622 }
623
624 pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
628 let result = self.store.delete_node(id);
629
630 if result {
631 if let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
632 tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
633 }
634 }
635
636 result
637 }
638
639 pub fn set_node_property(
643 &self,
644 id: grafeo_common::types::NodeId,
645 key: &str,
646 value: grafeo_common::types::Value,
647 ) {
648 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
650 id,
651 key: key.to_string(),
652 value: value.clone(),
653 }) {
654 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
655 }
656
657 self.store.set_node_property(id, key, value);
658 }
659
660 pub fn create_edge(
680 &self,
681 src: grafeo_common::types::NodeId,
682 dst: grafeo_common::types::NodeId,
683 edge_type: &str,
684 ) -> grafeo_common::types::EdgeId {
685 let id = self.store.create_edge(src, dst, edge_type);
686
687 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
689 id,
690 src,
691 dst,
692 edge_type: edge_type.to_string(),
693 }) {
694 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
695 }
696
697 id
698 }
699
700 pub fn create_edge_with_props(
704 &self,
705 src: grafeo_common::types::NodeId,
706 dst: grafeo_common::types::NodeId,
707 edge_type: &str,
708 properties: impl IntoIterator<
709 Item = (
710 impl Into<grafeo_common::types::PropertyKey>,
711 impl Into<grafeo_common::types::Value>,
712 ),
713 >,
714 ) -> grafeo_common::types::EdgeId {
715 let props: Vec<(
717 grafeo_common::types::PropertyKey,
718 grafeo_common::types::Value,
719 )> = properties
720 .into_iter()
721 .map(|(k, v)| (k.into(), v.into()))
722 .collect();
723
724 let id = self.store.create_edge_with_props(
725 src,
726 dst,
727 edge_type,
728 props.iter().map(|(k, v)| (k.clone(), v.clone())),
729 );
730
731 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
733 id,
734 src,
735 dst,
736 edge_type: edge_type.to_string(),
737 }) {
738 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
739 }
740
741 for (key, value) in props {
743 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
744 id,
745 key: key.to_string(),
746 value,
747 }) {
748 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
749 }
750 }
751
752 id
753 }
754
755 #[must_use]
757 pub fn get_edge(
758 &self,
759 id: grafeo_common::types::EdgeId,
760 ) -> Option<grafeo_core::graph::lpg::Edge> {
761 self.store.get_edge(id)
762 }
763
764 pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
768 let result = self.store.delete_edge(id);
769
770 if result {
771 if let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
772 tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
773 }
774 }
775
776 result
777 }
778
779 pub fn set_edge_property(
783 &self,
784 id: grafeo_common::types::EdgeId,
785 key: &str,
786 value: grafeo_common::types::Value,
787 ) {
788 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
790 id,
791 key: key.to_string(),
792 value: value.clone(),
793 }) {
794 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
795 }
796 self.store.set_edge_property(id, key, value);
797 }
798
799 pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
803 self.store.remove_node_property(id, key).is_some()
805 }
806
807 pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
811 self.store.remove_edge_property(id, key).is_some()
813 }
814
815 #[must_use]
823 pub fn is_persistent(&self) -> bool {
824 self.config.path.is_some()
825 }
826
827 #[must_use]
831 pub fn path(&self) -> Option<&Path> {
832 self.config.path.as_deref()
833 }
834
835 #[must_use]
839 pub fn info(&self) -> crate::admin::DatabaseInfo {
840 crate::admin::DatabaseInfo {
841 mode: crate::admin::DatabaseMode::Lpg,
842 node_count: self.store.node_count(),
843 edge_count: self.store.edge_count(),
844 is_persistent: self.is_persistent(),
845 path: self.config.path.clone(),
846 wal_enabled: self.config.wal_enabled,
847 version: env!("CARGO_PKG_VERSION").to_string(),
848 }
849 }
850
851 #[must_use]
855 pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
856 let disk_bytes = self.config.path.as_ref().and_then(|p| {
857 if p.exists() {
858 Self::calculate_disk_usage(p).ok()
859 } else {
860 None
861 }
862 });
863
864 crate::admin::DatabaseStats {
865 node_count: self.store.node_count(),
866 edge_count: self.store.edge_count(),
867 label_count: self.store.label_count(),
868 edge_type_count: self.store.edge_type_count(),
869 property_key_count: self.store.property_key_count(),
870 index_count: 0, memory_bytes: self.buffer_manager.allocated(),
872 disk_bytes,
873 }
874 }
875
876 fn calculate_disk_usage(path: &Path) -> Result<usize> {
878 let mut total = 0usize;
879 if path.is_dir() {
880 for entry in std::fs::read_dir(path)? {
881 let entry = entry?;
882 let metadata = entry.metadata()?;
883 if metadata.is_file() {
884 total += metadata.len() as usize;
885 } else if metadata.is_dir() {
886 total += Self::calculate_disk_usage(&entry.path())?;
887 }
888 }
889 }
890 Ok(total)
891 }
892
893 #[must_use]
898 pub fn schema(&self) -> crate::admin::SchemaInfo {
899 let labels = self
900 .store
901 .all_labels()
902 .into_iter()
903 .map(|name| crate::admin::LabelInfo {
904 name: name.clone(),
905 count: self.store.nodes_with_label(&name).count(),
906 })
907 .collect();
908
909 let edge_types = self
910 .store
911 .all_edge_types()
912 .into_iter()
913 .map(|name| crate::admin::EdgeTypeInfo {
914 name: name.clone(),
915 count: self.store.edges_with_type(&name).count(),
916 })
917 .collect();
918
919 let property_keys = self.store.all_property_keys();
920
921 crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
922 labels,
923 edge_types,
924 property_keys,
925 })
926 }
927
928 #[cfg(feature = "rdf")]
932 #[must_use]
933 pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
934 let stats = self.rdf_store.stats();
935
936 let predicates = self
937 .rdf_store
938 .predicates()
939 .into_iter()
940 .map(|predicate| {
941 let count = self.rdf_store.triples_with_predicate(&predicate).len();
942 crate::admin::PredicateInfo {
943 iri: predicate.to_string(),
944 count,
945 }
946 })
947 .collect();
948
949 crate::admin::SchemaInfo::Rdf(crate::admin::RdfSchemaInfo {
950 predicates,
951 named_graphs: Vec::new(), subject_count: stats.subject_count,
953 object_count: stats.object_count,
954 })
955 }
956
957 #[must_use]
965 pub fn validate(&self) -> crate::admin::ValidationResult {
966 let mut result = crate::admin::ValidationResult::default();
967
968 for edge in self.store.all_edges() {
970 if self.store.get_node(edge.src).is_none() {
971 result.errors.push(crate::admin::ValidationError {
972 code: "DANGLING_SRC".to_string(),
973 message: format!(
974 "Edge {} references non-existent source node {}",
975 edge.id.0, edge.src.0
976 ),
977 context: Some(format!("edge:{}", edge.id.0)),
978 });
979 }
980 if self.store.get_node(edge.dst).is_none() {
981 result.errors.push(crate::admin::ValidationError {
982 code: "DANGLING_DST".to_string(),
983 message: format!(
984 "Edge {} references non-existent destination node {}",
985 edge.id.0, edge.dst.0
986 ),
987 context: Some(format!("edge:{}", edge.id.0)),
988 });
989 }
990 }
991
992 if self.store.node_count() > 0 && self.store.edge_count() == 0 {
994 result.warnings.push(crate::admin::ValidationWarning {
995 code: "NO_EDGES".to_string(),
996 message: "Database has nodes but no edges".to_string(),
997 context: None,
998 });
999 }
1000
1001 result
1002 }
1003
1004 #[must_use]
1008 pub fn wal_status(&self) -> crate::admin::WalStatus {
1009 if let Some(ref wal) = self.wal {
1010 crate::admin::WalStatus {
1011 enabled: true,
1012 path: self.config.path.as_ref().map(|p| p.join("wal")),
1013 size_bytes: wal.size_bytes(),
1014 record_count: wal.record_count() as usize,
1015 last_checkpoint: wal.last_checkpoint_timestamp(),
1016 current_epoch: self.store.current_epoch().as_u64(),
1017 }
1018 } else {
1019 crate::admin::WalStatus {
1020 enabled: false,
1021 path: None,
1022 size_bytes: 0,
1023 record_count: 0,
1024 last_checkpoint: None,
1025 current_epoch: self.store.current_epoch().as_u64(),
1026 }
1027 }
1028 }
1029
1030 pub fn wal_checkpoint(&self) -> Result<()> {
1038 if let Some(ref wal) = self.wal {
1039 let epoch = self.store.current_epoch();
1040 let tx_id = self
1041 .tx_manager
1042 .last_assigned_tx_id()
1043 .unwrap_or_else(|| self.tx_manager.begin());
1044 wal.checkpoint(tx_id, epoch)?;
1045 wal.sync()?;
1046 }
1047 Ok(())
1048 }
1049
1050 pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
1065 let path = path.as_ref();
1066
1067 let target_config = Config::persistent(path);
1069 let target = Self::with_config(target_config)?;
1070
1071 for node in self.store.all_nodes() {
1073 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1074 target.store.create_node_with_id(node.id, &label_refs);
1075
1076 target.log_wal(&WalRecord::CreateNode {
1078 id: node.id,
1079 labels: node.labels.iter().map(|s| s.to_string()).collect(),
1080 })?;
1081
1082 for (key, value) in node.properties {
1084 target
1085 .store
1086 .set_node_property(node.id, key.as_str(), value.clone());
1087 target.log_wal(&WalRecord::SetNodeProperty {
1088 id: node.id,
1089 key: key.to_string(),
1090 value,
1091 })?;
1092 }
1093 }
1094
1095 for edge in self.store.all_edges() {
1097 target
1098 .store
1099 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1100
1101 target.log_wal(&WalRecord::CreateEdge {
1103 id: edge.id,
1104 src: edge.src,
1105 dst: edge.dst,
1106 edge_type: edge.edge_type.to_string(),
1107 })?;
1108
1109 for (key, value) in edge.properties {
1111 target
1112 .store
1113 .set_edge_property(edge.id, key.as_str(), value.clone());
1114 target.log_wal(&WalRecord::SetEdgeProperty {
1115 id: edge.id,
1116 key: key.to_string(),
1117 value,
1118 })?;
1119 }
1120 }
1121
1122 target.close()?;
1124
1125 Ok(())
1126 }
1127
1128 pub fn to_memory(&self) -> Result<Self> {
1139 let config = Config::in_memory();
1140 let target = Self::with_config(config)?;
1141
1142 for node in self.store.all_nodes() {
1144 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1145 target.store.create_node_with_id(node.id, &label_refs);
1146
1147 for (key, value) in node.properties {
1149 target.store.set_node_property(node.id, key.as_str(), value);
1150 }
1151 }
1152
1153 for edge in self.store.all_edges() {
1155 target
1156 .store
1157 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1158
1159 for (key, value) in edge.properties {
1161 target.store.set_edge_property(edge.id, key.as_str(), value);
1162 }
1163 }
1164
1165 Ok(target)
1166 }
1167
1168 pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
1177 let source = Self::open(path)?;
1179
1180 let target = source.to_memory()?;
1182
1183 source.close()?;
1185
1186 Ok(target)
1187 }
1188
1189 pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
1197 self.store.all_nodes()
1198 }
1199
1200 pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
1204 self.store.all_edges()
1205 }
1206}
1207
1208impl Drop for GrafeoDB {
1209 fn drop(&mut self) {
1210 if let Err(e) = self.close() {
1211 tracing::error!("Error closing database: {}", e);
1212 }
1213 }
1214}
1215
1216#[derive(Debug)]
1242pub struct QueryResult {
1243 pub columns: Vec<String>,
1245 pub column_types: Vec<grafeo_common::types::LogicalType>,
1247 pub rows: Vec<Vec<grafeo_common::types::Value>>,
1249}
1250
1251impl QueryResult {
1252 #[must_use]
1254 pub fn new(columns: Vec<String>) -> Self {
1255 let len = columns.len();
1256 Self {
1257 columns,
1258 column_types: vec![grafeo_common::types::LogicalType::Any; len],
1259 rows: Vec::new(),
1260 }
1261 }
1262
1263 #[must_use]
1265 pub fn with_types(
1266 columns: Vec<String>,
1267 column_types: Vec<grafeo_common::types::LogicalType>,
1268 ) -> Self {
1269 Self {
1270 columns,
1271 column_types,
1272 rows: Vec::new(),
1273 }
1274 }
1275
1276 #[must_use]
1278 pub fn row_count(&self) -> usize {
1279 self.rows.len()
1280 }
1281
1282 #[must_use]
1284 pub fn column_count(&self) -> usize {
1285 self.columns.len()
1286 }
1287
1288 #[must_use]
1290 pub fn is_empty(&self) -> bool {
1291 self.rows.is_empty()
1292 }
1293
1294 pub fn scalar<T: FromValue>(&self) -> Result<T> {
1303 if self.rows.len() != 1 || self.columns.len() != 1 {
1304 return Err(grafeo_common::utils::error::Error::InvalidValue(
1305 "Expected single value".to_string(),
1306 ));
1307 }
1308 T::from_value(&self.rows[0][0])
1309 }
1310
1311 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1313 self.rows.iter()
1314 }
1315}
1316
1317pub trait FromValue: Sized {
1322 fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
1324}
1325
1326impl FromValue for i64 {
1327 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1328 value
1329 .as_int64()
1330 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1331 expected: "INT64".to_string(),
1332 found: value.type_name().to_string(),
1333 })
1334 }
1335}
1336
1337impl FromValue for f64 {
1338 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1339 value
1340 .as_float64()
1341 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1342 expected: "FLOAT64".to_string(),
1343 found: value.type_name().to_string(),
1344 })
1345 }
1346}
1347
1348impl FromValue for String {
1349 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1350 value.as_str().map(String::from).ok_or_else(|| {
1351 grafeo_common::utils::error::Error::TypeMismatch {
1352 expected: "STRING".to_string(),
1353 found: value.type_name().to_string(),
1354 }
1355 })
1356 }
1357}
1358
1359impl FromValue for bool {
1360 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1361 value
1362 .as_bool()
1363 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1364 expected: "BOOL".to_string(),
1365 found: value.type_name().to_string(),
1366 })
1367 }
1368}
1369
1370#[cfg(test)]
1371mod tests {
1372 use super::*;
1373
1374 #[test]
1375 fn test_create_in_memory_database() {
1376 let db = GrafeoDB::new_in_memory();
1377 assert_eq!(db.node_count(), 0);
1378 assert_eq!(db.edge_count(), 0);
1379 }
1380
1381 #[test]
1382 fn test_database_config() {
1383 let config = Config::in_memory().with_threads(4).with_query_logging();
1384
1385 let db = GrafeoDB::with_config(config).unwrap();
1386 assert_eq!(db.config().threads, 4);
1387 assert!(db.config().query_logging);
1388 }
1389
1390 #[test]
1391 fn test_database_session() {
1392 let db = GrafeoDB::new_in_memory();
1393 let _session = db.session();
1394 }
1396
1397 #[test]
1398 fn test_persistent_database_recovery() {
1399 use grafeo_common::types::Value;
1400 use tempfile::tempdir;
1401
1402 let dir = tempdir().unwrap();
1403 let db_path = dir.path().join("test_db");
1404
1405 {
1407 let db = GrafeoDB::open(&db_path).unwrap();
1408
1409 let alice = db.create_node(&["Person"]);
1410 db.set_node_property(alice, "name", Value::from("Alice"));
1411
1412 let bob = db.create_node(&["Person"]);
1413 db.set_node_property(bob, "name", Value::from("Bob"));
1414
1415 let _edge = db.create_edge(alice, bob, "KNOWS");
1416
1417 db.close().unwrap();
1419 }
1420
1421 {
1423 let db = GrafeoDB::open(&db_path).unwrap();
1424
1425 assert_eq!(db.node_count(), 2);
1426 assert_eq!(db.edge_count(), 1);
1427
1428 let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
1430 assert!(node0.is_some());
1431
1432 let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
1433 assert!(node1.is_some());
1434 }
1435 }
1436
1437 #[test]
1438 fn test_wal_logging() {
1439 use tempfile::tempdir;
1440
1441 let dir = tempdir().unwrap();
1442 let db_path = dir.path().join("wal_test_db");
1443
1444 let db = GrafeoDB::open(&db_path).unwrap();
1445
1446 let node = db.create_node(&["Test"]);
1448 db.delete_node(node);
1449
1450 if let Some(wal) = db.wal() {
1452 assert!(wal.record_count() > 0);
1453 }
1454
1455 db.close().unwrap();
1456 }
1457}