1use std::path::Path;
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9
10use grafeo_adapters::storage::wal::{WalConfig, WalManager, WalRecord, WalRecovery};
11use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::lpg::LpgStore;
14#[cfg(feature = "rdf")]
15use grafeo_core::graph::rdf::RdfStore;
16
17use crate::config::Config;
18use crate::query::cache::QueryCache;
19use crate::session::Session;
20use crate::transaction::TransactionManager;
21
/// An embedded Grafeo database handle.
///
/// Bundles the labelled-property-graph store (plus an RDF store when the `rdf` feature is
/// enabled), the transaction manager, the buffer manager, an optional write-ahead log and a
/// query cache. The store, transaction manager and query cache are shared (via `Arc`) with
/// every [`Session`] created by [`GrafeoDB::session`].
pub struct GrafeoDB {
    /// Configuration this database was created with.
    config: Config,
    /// Primary labelled property graph store.
    store: Arc<LpgStore>,
    /// RDF triple store, only present with the `rdf` feature.
    #[cfg(feature = "rdf")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager shared with sessions.
    tx_manager: Arc<TransactionManager>,
    /// Buffer manager configured with the memory budget / spill path from `config`.
    buffer_manager: Arc<BufferManager>,
    /// Write-ahead log; `None` when the WAL is disabled or the database has no path.
    wal: Option<Arc<WalManager>>,
    /// Query cache shared with sessions.
    query_cache: Arc<QueryCache>,
    /// `true` until `close` has completed successfully; makes `close` idempotent.
    is_open: RwLock<bool>,
}
63
64impl GrafeoDB {
65 #[must_use]
81 pub fn new_in_memory() -> Self {
82 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
83 }
84
85 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
104 Self::with_config(Config::persistent(path.as_ref()))
105 }
106
107 pub fn with_config(config: Config) -> Result<Self> {
131 let store = Arc::new(LpgStore::new());
132 #[cfg(feature = "rdf")]
133 let rdf_store = Arc::new(RdfStore::new());
134 let tx_manager = Arc::new(TransactionManager::new());
135
136 let buffer_config = BufferManagerConfig {
138 budget: config.memory_limit.unwrap_or_else(|| {
139 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
140 }),
141 spill_path: config
142 .spill_path
143 .clone()
144 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
145 ..BufferManagerConfig::default()
146 };
147 let buffer_manager = BufferManager::new(buffer_config);
148
149 let wal = if config.wal_enabled {
151 if let Some(ref db_path) = config.path {
152 std::fs::create_dir_all(db_path)?;
154
155 let wal_path = db_path.join("wal");
156
157 if wal_path.exists() {
159 let recovery = WalRecovery::new(&wal_path);
160 let records = recovery.recover()?;
161 Self::apply_wal_records(&store, &records)?;
162 }
163
164 let wal_config = WalConfig::default();
166 let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
167 Some(Arc::new(wal_manager))
168 } else {
169 None
170 }
171 } else {
172 None
173 };
174
175 let query_cache = Arc::new(QueryCache::default());
177
178 Ok(Self {
179 config,
180 store,
181 #[cfg(feature = "rdf")]
182 rdf_store,
183 tx_manager,
184 buffer_manager,
185 wal,
186 query_cache,
187 is_open: RwLock::new(true),
188 })
189 }
190
191 fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
193 for record in records {
194 match record {
195 WalRecord::CreateNode { id, labels } => {
196 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
197 store.create_node_with_id(*id, &label_refs);
198 }
199 WalRecord::DeleteNode { id } => {
200 store.delete_node(*id);
201 }
202 WalRecord::CreateEdge {
203 id,
204 src,
205 dst,
206 edge_type,
207 } => {
208 store.create_edge_with_id(*id, *src, *dst, edge_type);
209 }
210 WalRecord::DeleteEdge { id } => {
211 store.delete_edge(*id);
212 }
213 WalRecord::SetNodeProperty { id, key, value } => {
214 store.set_node_property(*id, key, value.clone());
215 }
216 WalRecord::SetEdgeProperty { id, key, value } => {
217 store.set_edge_property(*id, key, value.clone());
218 }
219 WalRecord::AddNodeLabel { id, label } => {
220 store.add_label(*id, label);
221 }
222 WalRecord::RemoveNodeLabel { id, label } => {
223 store.remove_label(*id, label);
224 }
225 WalRecord::TxCommit { .. }
226 | WalRecord::TxAbort { .. }
227 | WalRecord::Checkpoint { .. } => {
228 }
231 }
232 }
233 Ok(())
234 }
235
236 #[must_use]
255 pub fn session(&self) -> Session {
256 #[cfg(feature = "rdf")]
257 {
258 Session::with_rdf_store_and_adaptive(
259 Arc::clone(&self.store),
260 Arc::clone(&self.rdf_store),
261 Arc::clone(&self.tx_manager),
262 Arc::clone(&self.query_cache),
263 self.config.adaptive.clone(),
264 self.config.factorized_execution,
265 )
266 }
267 #[cfg(not(feature = "rdf"))]
268 {
269 Session::with_adaptive(
270 Arc::clone(&self.store),
271 Arc::clone(&self.tx_manager),
272 Arc::clone(&self.query_cache),
273 self.config.adaptive.clone(),
274 self.config.factorized_execution,
275 )
276 }
277 }
278
279 #[must_use]
281 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
282 &self.config.adaptive
283 }
284
285 pub fn execute(&self, query: &str) -> Result<QueryResult> {
295 let session = self.session();
296 session.execute(query)
297 }
298
299 pub fn execute_with_params(
305 &self,
306 query: &str,
307 params: std::collections::HashMap<String, grafeo_common::types::Value>,
308 ) -> Result<QueryResult> {
309 let session = self.session();
310 session.execute_with_params(query, params)
311 }
312
313 #[cfg(feature = "cypher")]
319 pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
320 let session = self.session();
321 session.execute_cypher(query)
322 }
323
324 #[cfg(feature = "cypher")]
330 pub fn execute_cypher_with_params(
331 &self,
332 query: &str,
333 params: std::collections::HashMap<String, grafeo_common::types::Value>,
334 ) -> Result<QueryResult> {
335 use crate::query::processor::{QueryLanguage, QueryProcessor};
336
337 let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
339 processor.process(query, QueryLanguage::Cypher, Some(¶ms))
340 }
341
342 #[cfg(feature = "gremlin")]
348 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
349 let session = self.session();
350 session.execute_gremlin(query)
351 }
352
353 #[cfg(feature = "gremlin")]
359 pub fn execute_gremlin_with_params(
360 &self,
361 query: &str,
362 params: std::collections::HashMap<String, grafeo_common::types::Value>,
363 ) -> Result<QueryResult> {
364 let session = self.session();
365 session.execute_gremlin_with_params(query, params)
366 }
367
368 #[cfg(feature = "graphql")]
374 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
375 let session = self.session();
376 session.execute_graphql(query)
377 }
378
379 #[cfg(feature = "graphql")]
385 pub fn execute_graphql_with_params(
386 &self,
387 query: &str,
388 params: std::collections::HashMap<String, grafeo_common::types::Value>,
389 ) -> Result<QueryResult> {
390 let session = self.session();
391 session.execute_graphql_with_params(query, params)
392 }
393
394 #[cfg(all(feature = "sparql", feature = "rdf"))]
411 pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
412 use crate::query::{
413 Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
414 };
415
416 let logical_plan = sparql_translator::translate(query)?;
418
419 let optimizer = Optimizer::from_store(&self.store);
421 let optimized_plan = optimizer.optimize(logical_plan)?;
422
423 let planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
425 let mut physical_plan = planner.plan(&optimized_plan)?;
426
427 let executor = Executor::with_columns(physical_plan.columns.clone());
429 executor.execute(physical_plan.operator.as_mut())
430 }
431
432 #[cfg(feature = "rdf")]
436 #[must_use]
437 pub fn rdf_store(&self) -> &Arc<RdfStore> {
438 &self.rdf_store
439 }
440
441 pub fn query_scalar<T: FromValue>(&self, query: &str) -> Result<T> {
447 let result = self.execute(query)?;
448 result.scalar()
449 }
450
451 #[must_use]
453 pub fn config(&self) -> &Config {
454 &self.config
455 }
456
457 #[must_use]
461 pub fn store(&self) -> &Arc<LpgStore> {
462 &self.store
463 }
464
465 #[must_use]
467 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
468 &self.buffer_manager
469 }
470
471 pub fn close(&self) -> Result<()> {
481 let mut is_open = self.is_open.write();
482 if !*is_open {
483 return Ok(());
484 }
485
486 if let Some(ref wal) = self.wal {
488 let epoch = self.store.current_epoch();
489
490 let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
492 self.tx_manager.begin()
494 });
495
496 wal.log(&WalRecord::TxCommit {
498 tx_id: checkpoint_tx,
499 })?;
500
501 wal.checkpoint(checkpoint_tx, epoch)?;
503 wal.sync()?;
504 }
505
506 *is_open = false;
507 Ok(())
508 }
509
510 #[must_use]
512 pub fn wal(&self) -> Option<&Arc<WalManager>> {
513 self.wal.as_ref()
514 }
515
516 fn log_wal(&self, record: &WalRecord) -> Result<()> {
518 if let Some(ref wal) = self.wal {
519 wal.log(record)?;
520 }
521 Ok(())
522 }
523
524 #[must_use]
526 pub fn node_count(&self) -> usize {
527 self.store.node_count()
528 }
529
530 #[must_use]
532 pub fn edge_count(&self) -> usize {
533 self.store.edge_count()
534 }
535
536 #[must_use]
538 pub fn label_count(&self) -> usize {
539 self.store.label_count()
540 }
541
542 #[must_use]
544 pub fn property_key_count(&self) -> usize {
545 self.store.property_key_count()
546 }
547
548 #[must_use]
550 pub fn edge_type_count(&self) -> usize {
551 self.store.edge_type_count()
552 }
553
554 pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
571 let id = self.store.create_node(labels);
572
573 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
575 id,
576 labels: labels.iter().map(|s| s.to_string()).collect(),
577 }) {
578 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
579 }
580
581 id
582 }
583
584 pub fn create_node_with_props(
588 &self,
589 labels: &[&str],
590 properties: impl IntoIterator<
591 Item = (
592 impl Into<grafeo_common::types::PropertyKey>,
593 impl Into<grafeo_common::types::Value>,
594 ),
595 >,
596 ) -> grafeo_common::types::NodeId {
597 let props: Vec<(
599 grafeo_common::types::PropertyKey,
600 grafeo_common::types::Value,
601 )> = properties
602 .into_iter()
603 .map(|(k, v)| (k.into(), v.into()))
604 .collect();
605
606 let id = self
607 .store
608 .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
609
610 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
612 id,
613 labels: labels.iter().map(|s| s.to_string()).collect(),
614 }) {
615 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
616 }
617
618 for (key, value) in props {
620 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
621 id,
622 key: key.to_string(),
623 value,
624 }) {
625 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
626 }
627 }
628
629 id
630 }
631
632 #[must_use]
634 pub fn get_node(
635 &self,
636 id: grafeo_common::types::NodeId,
637 ) -> Option<grafeo_core::graph::lpg::Node> {
638 self.store.get_node(id)
639 }
640
641 pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
645 let result = self.store.delete_node(id);
646
647 if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
648 tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
649 }
650
651 result
652 }
653
654 pub fn set_node_property(
658 &self,
659 id: grafeo_common::types::NodeId,
660 key: &str,
661 value: grafeo_common::types::Value,
662 ) {
663 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
665 id,
666 key: key.to_string(),
667 value: value.clone(),
668 }) {
669 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
670 }
671
672 self.store.set_node_property(id, key, value);
673 }
674
675 pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
693 let result = self.store.add_label(id, label);
694
695 if result {
696 if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
698 id,
699 label: label.to_string(),
700 }) {
701 tracing::warn!("Failed to log AddNodeLabel to WAL: {}", e);
702 }
703 }
704
705 result
706 }
707
708 pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
726 let result = self.store.remove_label(id, label);
727
728 if result {
729 if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
731 id,
732 label: label.to_string(),
733 }) {
734 tracing::warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
735 }
736 }
737
738 result
739 }
740
741 #[must_use]
758 pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
759 self.store
760 .get_node(id)
761 .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
762 }
763
764 pub fn create_edge(
784 &self,
785 src: grafeo_common::types::NodeId,
786 dst: grafeo_common::types::NodeId,
787 edge_type: &str,
788 ) -> grafeo_common::types::EdgeId {
789 let id = self.store.create_edge(src, dst, edge_type);
790
791 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
793 id,
794 src,
795 dst,
796 edge_type: edge_type.to_string(),
797 }) {
798 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
799 }
800
801 id
802 }
803
804 pub fn create_edge_with_props(
808 &self,
809 src: grafeo_common::types::NodeId,
810 dst: grafeo_common::types::NodeId,
811 edge_type: &str,
812 properties: impl IntoIterator<
813 Item = (
814 impl Into<grafeo_common::types::PropertyKey>,
815 impl Into<grafeo_common::types::Value>,
816 ),
817 >,
818 ) -> grafeo_common::types::EdgeId {
819 let props: Vec<(
821 grafeo_common::types::PropertyKey,
822 grafeo_common::types::Value,
823 )> = properties
824 .into_iter()
825 .map(|(k, v)| (k.into(), v.into()))
826 .collect();
827
828 let id = self.store.create_edge_with_props(
829 src,
830 dst,
831 edge_type,
832 props.iter().map(|(k, v)| (k.clone(), v.clone())),
833 );
834
835 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
837 id,
838 src,
839 dst,
840 edge_type: edge_type.to_string(),
841 }) {
842 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
843 }
844
845 for (key, value) in props {
847 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
848 id,
849 key: key.to_string(),
850 value,
851 }) {
852 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
853 }
854 }
855
856 id
857 }
858
859 #[must_use]
861 pub fn get_edge(
862 &self,
863 id: grafeo_common::types::EdgeId,
864 ) -> Option<grafeo_core::graph::lpg::Edge> {
865 self.store.get_edge(id)
866 }
867
868 pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
872 let result = self.store.delete_edge(id);
873
874 if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
875 tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
876 }
877
878 result
879 }
880
881 pub fn set_edge_property(
885 &self,
886 id: grafeo_common::types::EdgeId,
887 key: &str,
888 value: grafeo_common::types::Value,
889 ) {
890 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
892 id,
893 key: key.to_string(),
894 value: value.clone(),
895 }) {
896 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
897 }
898 self.store.set_edge_property(id, key, value);
899 }
900
901 pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
905 self.store.remove_node_property(id, key).is_some()
907 }
908
909 pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
913 self.store.remove_edge_property(id, key).is_some()
915 }
916
917 pub fn create_property_index(&self, property: &str) {
937 self.store.create_property_index(property);
938 }
939
    /// Builds a vector similarity index over `property` on nodes labelled `label`.
    ///
    /// Only nodes whose `property` holds a `Value::Vector` are indexed. Others are skipped.
    /// All indexed vectors must have the same length. If `dimensions` is given it is the
    /// expected length; otherwise the first vector found sets it.
    ///
    /// Parameters:
    /// - `metric`: name parsed by `DistanceMetric::from_str`. The error message lists cosine,
    ///   euclidean, dot_product and manhattan. Defaults to cosine.
    /// - `m`, `ef_construction`: optional HNSW tuning values, only used when the
    ///   `vector-index` feature is enabled.
    ///
    /// Without the `vector-index` feature, the nodes are still validated and the "created"
    /// message is still logged, but no index is built or registered.
    ///
    /// # Errors
    ///
    /// Returns `Error::Internal` in any of these cases:
    /// - the metric name is unknown;
    /// - a vector's length differs from the expected dimension;
    /// - no vector-valued properties are found.
    pub fn create_vector_index(
        &self,
        label: &str,
        property: &str,
        dimensions: Option<usize>,
        metric: Option<&str>,
        m: Option<usize>,
        ef_construction: Option<usize>,
    ) -> Result<()> {
        use grafeo_common::types::{PropertyKey, Value};
        use grafeo_core::index::vector::DistanceMetric;

        // Resolve the metric name first so a typo fails before any scanning happens.
        let metric = match metric {
            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
                grafeo_common::utils::error::Error::Internal(format!(
                    "Unknown distance metric '{}'. Use: cosine, euclidean, dot_product, manhattan",
                    m
                ))
            })?,
            None => DistanceMetric::Cosine,
        };

        let prop_key = PropertyKey::new(property);
        // Caller-supplied dimension, or the length of the first vector once one is seen.
        let mut found_dims: Option<usize> = dimensions;
        let mut vector_count = 0usize;

        // Vectors are only collected when an index will actually be built.
        #[cfg(feature = "vector-index")]
        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();

        for node in self.store.nodes_with_label(label) {
            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
                if let Some(expected) = found_dims {
                    if v.len() != expected {
                        return Err(grafeo_common::utils::error::Error::Internal(format!(
                            "Vector dimension mismatch: expected {}, found {} on node {}",
                            expected,
                            v.len(),
                            node.id.0
                        )));
                    }
                } else {
                    found_dims = Some(v.len());
                }
                vector_count += 1;
                #[cfg(feature = "vector-index")]
                vectors.push((node.id, v.to_vec()));
            }
        }

        if vector_count == 0 {
            return Err(grafeo_common::utils::error::Error::Internal(format!(
                "No vector properties found on :{label}({property})"
            )));
        }

        // At least one vector was seen, so `found_dims` is `Some` here; the fallback is unused.
        let dims = found_dims.unwrap_or(0);

        #[cfg(feature = "vector-index")]
        {
            use grafeo_core::index::vector::{HnswConfig, HnswIndex};

            let mut config = HnswConfig::new(dims, metric);
            if let Some(m_val) = m {
                config = config.with_m(m_val);
            }
            if let Some(ef_c) = ef_construction {
                config = config.with_ef_construction(ef_c);
            }

            let index = HnswIndex::with_capacity(config, vectors.len());
            for (node_id, vec) in &vectors {
                index.insert(*node_id, vec);
            }

            self.store
                .add_vector_index(label, property, Arc::new(index));
        }

        // Keeps the HNSW tuning parameters "used" when the `vector-index` feature is off.
        let _ = (m, ef_construction);

        tracing::info!(
            "Vector index created: :{label}({property}) - {vector_count} vectors, {dims} dimensions, metric={metric_name}",
            metric_name = metric.name()
        );

        Ok(())
    }
1049
1050 #[cfg(feature = "vector-index")]
1066 pub fn vector_search(
1067 &self,
1068 label: &str,
1069 property: &str,
1070 query: &[f32],
1071 k: usize,
1072 ef: Option<usize>,
1073 ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
1074 let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1075 grafeo_common::utils::error::Error::Internal(format!(
1076 "No vector index found for :{label}({property}). Call create_vector_index() first."
1077 ))
1078 })?;
1079
1080 let results = match ef {
1081 Some(ef_val) => index.search_with_ef(query, k, ef_val),
1082 None => index.search(query, k),
1083 };
1084
1085 Ok(results)
1086 }
1087
1088 pub fn batch_create_nodes(
1104 &self,
1105 label: &str,
1106 property: &str,
1107 vectors: Vec<Vec<f32>>,
1108 ) -> Vec<grafeo_common::types::NodeId> {
1109 use grafeo_common::types::{PropertyKey, Value};
1110
1111 let prop_key = PropertyKey::new(property);
1112 let labels: &[&str] = &[label];
1113
1114 vectors
1115 .into_iter()
1116 .map(|vec| {
1117 let value = Value::Vector(vec.into());
1118 let id = self.store.create_node_with_props(
1119 labels,
1120 std::iter::once((prop_key.clone(), value.clone())),
1121 );
1122
1123 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
1125 id,
1126 labels: labels.iter().map(|s| s.to_string()).collect(),
1127 }) {
1128 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
1129 }
1130 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
1131 id,
1132 key: property.to_string(),
1133 value,
1134 }) {
1135 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
1136 }
1137
1138 id
1139 })
1140 .collect()
1141 }
1142
1143 #[cfg(feature = "vector-index")]
1155 pub fn batch_vector_search(
1156 &self,
1157 label: &str,
1158 property: &str,
1159 queries: &[Vec<f32>],
1160 k: usize,
1161 ef: Option<usize>,
1162 ) -> Result<Vec<Vec<(grafeo_common::types::NodeId, f32)>>> {
1163 let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1164 grafeo_common::utils::error::Error::Internal(format!(
1165 "No vector index found for :{label}({property}). Call create_vector_index() first."
1166 ))
1167 })?;
1168
1169 let results = match ef {
1170 Some(ef_val) => index.batch_search_with_ef(queries, k, ef_val),
1171 None => index.batch_search(queries, k),
1172 };
1173
1174 Ok(results)
1175 }
1176
1177 pub fn drop_property_index(&self, property: &str) -> bool {
1181 self.store.drop_property_index(property)
1182 }
1183
1184 #[must_use]
1186 pub fn has_property_index(&self, property: &str) -> bool {
1187 self.store.has_property_index(property)
1188 }
1189
1190 #[must_use]
1205 pub fn find_nodes_by_property(
1206 &self,
1207 property: &str,
1208 value: &grafeo_common::types::Value,
1209 ) -> Vec<grafeo_common::types::NodeId> {
1210 self.store.find_nodes_by_property(property, value)
1211 }
1212
1213 #[must_use]
1221 pub fn is_persistent(&self) -> bool {
1222 self.config.path.is_some()
1223 }
1224
1225 #[must_use]
1229 pub fn path(&self) -> Option<&Path> {
1230 self.config.path.as_deref()
1231 }
1232
1233 #[must_use]
1237 pub fn info(&self) -> crate::admin::DatabaseInfo {
1238 crate::admin::DatabaseInfo {
1239 mode: crate::admin::DatabaseMode::Lpg,
1240 node_count: self.store.node_count(),
1241 edge_count: self.store.edge_count(),
1242 is_persistent: self.is_persistent(),
1243 path: self.config.path.clone(),
1244 wal_enabled: self.config.wal_enabled,
1245 version: env!("CARGO_PKG_VERSION").to_string(),
1246 }
1247 }
1248
1249 #[must_use]
1253 pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1254 let disk_bytes = self.config.path.as_ref().and_then(|p| {
1255 if p.exists() {
1256 Self::calculate_disk_usage(p).ok()
1257 } else {
1258 None
1259 }
1260 });
1261
1262 crate::admin::DatabaseStats {
1263 node_count: self.store.node_count(),
1264 edge_count: self.store.edge_count(),
1265 label_count: self.store.label_count(),
1266 edge_type_count: self.store.edge_type_count(),
1267 property_key_count: self.store.property_key_count(),
1268 index_count: 0, memory_bytes: self.buffer_manager.allocated(),
1270 disk_bytes,
1271 }
1272 }
1273
1274 fn calculate_disk_usage(path: &Path) -> Result<usize> {
1276 let mut total = 0usize;
1277 if path.is_dir() {
1278 for entry in std::fs::read_dir(path)? {
1279 let entry = entry?;
1280 let metadata = entry.metadata()?;
1281 if metadata.is_file() {
1282 total += metadata.len() as usize;
1283 } else if metadata.is_dir() {
1284 total += Self::calculate_disk_usage(&entry.path())?;
1285 }
1286 }
1287 }
1288 Ok(total)
1289 }
1290
1291 #[must_use]
1296 pub fn schema(&self) -> crate::admin::SchemaInfo {
1297 let labels = self
1298 .store
1299 .all_labels()
1300 .into_iter()
1301 .map(|name| crate::admin::LabelInfo {
1302 name: name.clone(),
1303 count: self.store.nodes_with_label(&name).count(),
1304 })
1305 .collect();
1306
1307 let edge_types = self
1308 .store
1309 .all_edge_types()
1310 .into_iter()
1311 .map(|name| crate::admin::EdgeTypeInfo {
1312 name: name.clone(),
1313 count: self.store.edges_with_type(&name).count(),
1314 })
1315 .collect();
1316
1317 let property_keys = self.store.all_property_keys();
1318
1319 crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
1320 labels,
1321 edge_types,
1322 property_keys,
1323 })
1324 }
1325
1326 #[cfg(feature = "rdf")]
1330 #[must_use]
1331 pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
1332 let stats = self.rdf_store.stats();
1333
1334 let predicates = self
1335 .rdf_store
1336 .predicates()
1337 .into_iter()
1338 .map(|predicate| {
1339 let count = self.rdf_store.triples_with_predicate(&predicate).len();
1340 crate::admin::PredicateInfo {
1341 iri: predicate.to_string(),
1342 count,
1343 }
1344 })
1345 .collect();
1346
1347 crate::admin::SchemaInfo::Rdf(crate::admin::RdfSchemaInfo {
1348 predicates,
1349 named_graphs: Vec::new(), subject_count: stats.subject_count,
1351 object_count: stats.object_count,
1352 })
1353 }
1354
1355 #[must_use]
1363 pub fn validate(&self) -> crate::admin::ValidationResult {
1364 let mut result = crate::admin::ValidationResult::default();
1365
1366 for edge in self.store.all_edges() {
1368 if self.store.get_node(edge.src).is_none() {
1369 result.errors.push(crate::admin::ValidationError {
1370 code: "DANGLING_SRC".to_string(),
1371 message: format!(
1372 "Edge {} references non-existent source node {}",
1373 edge.id.0, edge.src.0
1374 ),
1375 context: Some(format!("edge:{}", edge.id.0)),
1376 });
1377 }
1378 if self.store.get_node(edge.dst).is_none() {
1379 result.errors.push(crate::admin::ValidationError {
1380 code: "DANGLING_DST".to_string(),
1381 message: format!(
1382 "Edge {} references non-existent destination node {}",
1383 edge.id.0, edge.dst.0
1384 ),
1385 context: Some(format!("edge:{}", edge.id.0)),
1386 });
1387 }
1388 }
1389
1390 if self.store.node_count() > 0 && self.store.edge_count() == 0 {
1392 result.warnings.push(crate::admin::ValidationWarning {
1393 code: "NO_EDGES".to_string(),
1394 message: "Database has nodes but no edges".to_string(),
1395 context: None,
1396 });
1397 }
1398
1399 result
1400 }
1401
1402 #[must_use]
1406 pub fn wal_status(&self) -> crate::admin::WalStatus {
1407 if let Some(ref wal) = self.wal {
1408 crate::admin::WalStatus {
1409 enabled: true,
1410 path: self.config.path.as_ref().map(|p| p.join("wal")),
1411 size_bytes: wal.size_bytes(),
1412 record_count: wal.record_count() as usize,
1413 last_checkpoint: wal.last_checkpoint_timestamp(),
1414 current_epoch: self.store.current_epoch().as_u64(),
1415 }
1416 } else {
1417 crate::admin::WalStatus {
1418 enabled: false,
1419 path: None,
1420 size_bytes: 0,
1421 record_count: 0,
1422 last_checkpoint: None,
1423 current_epoch: self.store.current_epoch().as_u64(),
1424 }
1425 }
1426 }
1427
1428 pub fn wal_checkpoint(&self) -> Result<()> {
1436 if let Some(ref wal) = self.wal {
1437 let epoch = self.store.current_epoch();
1438 let tx_id = self
1439 .tx_manager
1440 .last_assigned_tx_id()
1441 .unwrap_or_else(|| self.tx_manager.begin());
1442 wal.checkpoint(tx_id, epoch)?;
1443 wal.sync()?;
1444 }
1445 Ok(())
1446 }
1447
1448 pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
1463 let path = path.as_ref();
1464
1465 let target_config = Config::persistent(path);
1467 let target = Self::with_config(target_config)?;
1468
1469 for node in self.store.all_nodes() {
1471 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1472 target.store.create_node_with_id(node.id, &label_refs);
1473
1474 target.log_wal(&WalRecord::CreateNode {
1476 id: node.id,
1477 labels: node.labels.iter().map(|s| s.to_string()).collect(),
1478 })?;
1479
1480 for (key, value) in node.properties {
1482 target
1483 .store
1484 .set_node_property(node.id, key.as_str(), value.clone());
1485 target.log_wal(&WalRecord::SetNodeProperty {
1486 id: node.id,
1487 key: key.to_string(),
1488 value,
1489 })?;
1490 }
1491 }
1492
1493 for edge in self.store.all_edges() {
1495 target
1496 .store
1497 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1498
1499 target.log_wal(&WalRecord::CreateEdge {
1501 id: edge.id,
1502 src: edge.src,
1503 dst: edge.dst,
1504 edge_type: edge.edge_type.to_string(),
1505 })?;
1506
1507 for (key, value) in edge.properties {
1509 target
1510 .store
1511 .set_edge_property(edge.id, key.as_str(), value.clone());
1512 target.log_wal(&WalRecord::SetEdgeProperty {
1513 id: edge.id,
1514 key: key.to_string(),
1515 value,
1516 })?;
1517 }
1518 }
1519
1520 target.close()?;
1522
1523 Ok(())
1524 }
1525
1526 pub fn to_memory(&self) -> Result<Self> {
1537 let config = Config::in_memory();
1538 let target = Self::with_config(config)?;
1539
1540 for node in self.store.all_nodes() {
1542 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1543 target.store.create_node_with_id(node.id, &label_refs);
1544
1545 for (key, value) in node.properties {
1547 target.store.set_node_property(node.id, key.as_str(), value);
1548 }
1549 }
1550
1551 for edge in self.store.all_edges() {
1553 target
1554 .store
1555 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1556
1557 for (key, value) in edge.properties {
1559 target.store.set_edge_property(edge.id, key.as_str(), value);
1560 }
1561 }
1562
1563 Ok(target)
1564 }
1565
1566 pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
1575 let source = Self::open(path)?;
1577
1578 let target = source.to_memory()?;
1580
1581 source.close()?;
1583
1584 Ok(target)
1585 }
1586
1587 pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
1595 self.store.all_nodes()
1596 }
1597
1598 pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
1602 self.store.all_edges()
1603 }
1604}
1605
1606impl Drop for GrafeoDB {
1607 fn drop(&mut self) {
1608 if let Err(e) = self.close() {
1609 tracing::error!("Error closing database: {}", e);
1610 }
1611 }
1612}
1613
/// Tabular result of a query: named, typed columns and row-major values.
#[derive(Debug)]
pub struct QueryResult {
    /// Column names, in output order.
    pub columns: Vec<String>,
    /// Logical type of each column. [`QueryResult::new`] fills this with `Any`.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// Result rows. Each row is expected to hold one value per column.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Execution time in milliseconds, if recorded via [`QueryResult::with_metrics`].
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned, if recorded via [`QueryResult::with_metrics`].
    pub rows_scanned: Option<u64>,
}
1652
1653impl QueryResult {
1654 #[must_use]
1656 pub fn new(columns: Vec<String>) -> Self {
1657 let len = columns.len();
1658 Self {
1659 columns,
1660 column_types: vec![grafeo_common::types::LogicalType::Any; len],
1661 rows: Vec::new(),
1662 execution_time_ms: None,
1663 rows_scanned: None,
1664 }
1665 }
1666
1667 #[must_use]
1669 pub fn with_types(
1670 columns: Vec<String>,
1671 column_types: Vec<grafeo_common::types::LogicalType>,
1672 ) -> Self {
1673 Self {
1674 columns,
1675 column_types,
1676 rows: Vec::new(),
1677 execution_time_ms: None,
1678 rows_scanned: None,
1679 }
1680 }
1681
1682 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1684 self.execution_time_ms = Some(execution_time_ms);
1685 self.rows_scanned = Some(rows_scanned);
1686 self
1687 }
1688
1689 #[must_use]
1691 pub fn execution_time_ms(&self) -> Option<f64> {
1692 self.execution_time_ms
1693 }
1694
1695 #[must_use]
1697 pub fn rows_scanned(&self) -> Option<u64> {
1698 self.rows_scanned
1699 }
1700
1701 #[must_use]
1703 pub fn row_count(&self) -> usize {
1704 self.rows.len()
1705 }
1706
1707 #[must_use]
1709 pub fn column_count(&self) -> usize {
1710 self.columns.len()
1711 }
1712
1713 #[must_use]
1715 pub fn is_empty(&self) -> bool {
1716 self.rows.is_empty()
1717 }
1718
1719 pub fn scalar<T: FromValue>(&self) -> Result<T> {
1728 if self.rows.len() != 1 || self.columns.len() != 1 {
1729 return Err(grafeo_common::utils::error::Error::InvalidValue(
1730 "Expected single value".to_string(),
1731 ));
1732 }
1733 T::from_value(&self.rows[0][0])
1734 }
1735
1736 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1738 self.rows.iter()
1739 }
1740}
1741
/// Conversion from a dynamically typed `grafeo_common::types::Value` into a concrete Rust type.
///
/// Used by [`QueryResult::scalar`] and [`GrafeoDB::query_scalar`].
pub trait FromValue: Sized {
    /// Converts `value` into `Self`.
    ///
    /// The implementations in this module return `Error::TypeMismatch` when `value` holds a
    /// different type.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
1750
1751impl FromValue for i64 {
1752 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1753 value
1754 .as_int64()
1755 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1756 expected: "INT64".to_string(),
1757 found: value.type_name().to_string(),
1758 })
1759 }
1760}
1761
1762impl FromValue for f64 {
1763 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1764 value
1765 .as_float64()
1766 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1767 expected: "FLOAT64".to_string(),
1768 found: value.type_name().to_string(),
1769 })
1770 }
1771}
1772
1773impl FromValue for String {
1774 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1775 value.as_str().map(String::from).ok_or_else(|| {
1776 grafeo_common::utils::error::Error::TypeMismatch {
1777 expected: "STRING".to_string(),
1778 found: value.type_name().to_string(),
1779 }
1780 })
1781 }
1782}
1783
1784impl FromValue for bool {
1785 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1786 value
1787 .as_bool()
1788 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1789 expected: "BOOL".to_string(),
1790 found: value.type_name().to_string(),
1791 })
1792 }
1793}
1794
#[cfg(test)]
mod tests {
    use super::*;

    use grafeo_common::types::{NodeId, Value};
    use tempfile::tempdir;

    #[test]
    fn test_create_in_memory_database() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
    }

    #[test]
    fn test_database_config() {
        let config = Config::in_memory().with_threads(4).with_query_logging();

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().threads, 4);
        assert!(db.config().query_logging);
    }

    #[test]
    fn test_database_session() {
        // Opening a session on a fresh in-memory database must not panic.
        let db = GrafeoDB::new_in_memory();
        let _session = db.session();
    }

    #[test]
    fn test_persistent_database_recovery() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_db");

        // First session: build a small graph and close cleanly.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));

            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));

            let _edge = db.create_edge(alice, bob, "KNOWS");

            db.close().unwrap();
        }

        // Second session: reopening the same path must restore what was written.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(db.node_count(), 2);
            assert_eq!(db.edge_count(), 1);

            // These lookups assume node IDs are allocated from 0 in creation order.
            assert!(db.get_node(NodeId::new(0)).is_some());
            assert!(db.get_node(NodeId::new(1)).is_some());
        }
    }

    #[test]
    fn test_wal_logging() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // A WAL only exists when the config enables it, so this check is skipped
        // for configurations without one.
        if let Some(wal) = db.wal() {
            assert!(wal.record_count() > 0);
        }

        db.close().unwrap();
    }

    #[test]
    fn test_wal_recovery_multiple_sessions() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("multi_session_db");

        // Session 1: create Alice.
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));
            db.close().unwrap();
        }

        // Session 2: Alice must be recovered before Bob is added on top of her.
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 1);

            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));
            db.close().unwrap();
        }

        // Session 3: both sessions' writes must be visible with their labels.
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 2);

            let node0 = db.get_node(NodeId::new(0)).unwrap();
            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));

            let node1 = db.get_node(NodeId::new(1)).unwrap();
            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
        }
    }

    #[test]
    fn test_database_consistency_after_mutations() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("consistency_db");

        // Build a -> b -> c, then delete an edge and the middle node before
        // setting properties, so recovery has to apply deletes in order.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let a = db.create_node(&["Node"]);
            let b = db.create_node(&["Node"]);
            let c = db.create_node(&["Node"]);

            let e1 = db.create_edge(a, b, "LINKS");
            let _e2 = db.create_edge(b, c, "LINKS");

            db.delete_edge(e1);
            db.delete_node(b);

            db.set_node_property(a, "value", Value::Int64(1));
            db.set_node_property(c, "value", Value::Int64(3));

            db.close().unwrap();
        }

        // After reopening, a (id 0) and c (id 2) survive and b (id 1) stays deleted.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            assert!(db.get_node(NodeId::new(0)).is_some());
            assert!(db.get_node(NodeId::new(2)).is_some());
            assert!(db.get_node(NodeId::new(1)).is_none());
        }
    }

    #[test]
    fn test_close_is_idempotent() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("close_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();
        db.create_node(&["Test"]);

        // Closing twice must succeed both times.
        assert!(db.close().is_ok());
        assert!(db.close().is_ok());
    }

    #[test]
    fn test_query_result_has_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();

            let elapsed_ms = result
                .execution_time_ms
                .expect("query result should report execution time");
            assert!(elapsed_ms >= 0.0);
            assert_eq!(result.rows_scanned, Some(2));
        }
    }

    #[test]
    fn test_empty_query_result_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            // Metrics must still be populated when the query matches nothing.
            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert_eq!(result.rows_scanned, Some(0));
        }
    }
}