1use std::path::Path;
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9
10#[cfg(feature = "wal")]
11use grafeo_adapters::storage::wal::{WalConfig, WalManager, WalRecord, WalRecovery};
12use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
13use grafeo_common::utils::error::Result;
14use grafeo_core::graph::lpg::LpgStore;
15#[cfg(feature = "rdf")]
16use grafeo_core::graph::rdf::RdfStore;
17
18use crate::config::Config;
19use crate::query::cache::QueryCache;
20use crate::session::Session;
21use crate::transaction::TransactionManager;
22
/// Top-level database handle.
///
/// Owns the labeled-property-graph store (plus an RDF store with the `rdf`
/// feature), the transaction manager, the buffer manager, the optional
/// write-ahead log and a query cache. Sessions obtained from
/// [`GrafeoDB::session`] share these components through `Arc` clones.
pub struct GrafeoDB {
    /// Configuration the database was created with.
    config: Config,
    /// Labeled property graph storage.
    store: Arc<LpgStore>,
    /// RDF triple storage (only compiled with the `rdf` feature).
    #[cfg(feature = "rdf")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager shared with every session.
    tx_manager: Arc<TransactionManager>,
    /// Memory-budget / spill manager built from the configuration.
    buffer_manager: Arc<BufferManager>,
    /// Write-ahead log; `None` when the WAL is disabled or the database has no path.
    #[cfg(feature = "wal")]
    wal: Option<Arc<WalManager>>,
    /// Query cache shared with every session.
    query_cache: Arc<QueryCache>,
    /// `true` until `close()` has completed; makes repeated closes a no-op.
    is_open: RwLock<bool>,
}
65
66impl GrafeoDB {
67 #[must_use]
83 pub fn new_in_memory() -> Self {
84 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
85 }
86
87 #[cfg(feature = "wal")]
106 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
107 Self::with_config(Config::persistent(path.as_ref()))
108 }
109
110 pub fn with_config(config: Config) -> Result<Self> {
134 let store = Arc::new(LpgStore::new());
135 #[cfg(feature = "rdf")]
136 let rdf_store = Arc::new(RdfStore::new());
137 let tx_manager = Arc::new(TransactionManager::new());
138
139 let buffer_config = BufferManagerConfig {
141 budget: config.memory_limit.unwrap_or_else(|| {
142 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
143 }),
144 spill_path: config
145 .spill_path
146 .clone()
147 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
148 ..BufferManagerConfig::default()
149 };
150 let buffer_manager = BufferManager::new(buffer_config);
151
152 #[cfg(feature = "wal")]
154 let wal = if config.wal_enabled {
155 if let Some(ref db_path) = config.path {
156 std::fs::create_dir_all(db_path)?;
158
159 let wal_path = db_path.join("wal");
160
161 if wal_path.exists() {
163 let recovery = WalRecovery::new(&wal_path);
164 let records = recovery.recover()?;
165 Self::apply_wal_records(&store, &records)?;
166 }
167
168 let wal_config = WalConfig::default();
170 let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
171 Some(Arc::new(wal_manager))
172 } else {
173 None
174 }
175 } else {
176 None
177 };
178
179 let query_cache = Arc::new(QueryCache::default());
181
182 Ok(Self {
183 config,
184 store,
185 #[cfg(feature = "rdf")]
186 rdf_store,
187 tx_manager,
188 buffer_manager,
189 #[cfg(feature = "wal")]
190 wal,
191 query_cache,
192 is_open: RwLock::new(true),
193 })
194 }
195
/// Replays recovered WAL `records` into `store`, in order.
///
/// Data records (node/edge creation and deletion, property sets, label
/// changes) are applied directly. `TxCommit`, `TxAbort` and `Checkpoint`
/// markers are skipped.
///
/// NOTE(review): because transaction markers are ignored, data records that
/// belonged to an aborted transaction are replayed as well — confirm this is
/// intended.
///
/// # Errors
///
/// Never fails at present; the `Result` keeps the signature open for
/// validation during replay.
#[cfg(feature = "wal")]
fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
    records.iter().for_each(|record| match record {
        WalRecord::CreateNode { id, labels } => {
            let labels: Vec<&str> = labels.iter().map(String::as_str).collect();
            store.create_node_with_id(*id, &labels);
        }
        WalRecord::DeleteNode { id } => {
            store.delete_node(*id);
        }
        WalRecord::CreateEdge {
            id,
            src,
            dst,
            edge_type,
        } => {
            store.create_edge_with_id(*id, *src, *dst, edge_type);
        }
        WalRecord::DeleteEdge { id } => {
            store.delete_edge(*id);
        }
        WalRecord::SetNodeProperty { id, key, value } => {
            store.set_node_property(*id, key, value.clone());
        }
        WalRecord::SetEdgeProperty { id, key, value } => {
            store.set_edge_property(*id, key, value.clone());
        }
        WalRecord::AddNodeLabel { id, label } => {
            store.add_label(*id, label);
        }
        WalRecord::RemoveNodeLabel { id, label } => {
            store.remove_label(*id, label);
        }
        // Transaction/checkpoint markers carry no graph mutation.
        WalRecord::TxCommit { .. } | WalRecord::TxAbort { .. } | WalRecord::Checkpoint { .. } => {}
    });
    Ok(())
}
241
242 #[must_use]
261 pub fn session(&self) -> Session {
262 #[cfg(feature = "rdf")]
263 {
264 Session::with_rdf_store_and_adaptive(
265 Arc::clone(&self.store),
266 Arc::clone(&self.rdf_store),
267 Arc::clone(&self.tx_manager),
268 Arc::clone(&self.query_cache),
269 self.config.adaptive.clone(),
270 self.config.factorized_execution,
271 )
272 }
273 #[cfg(not(feature = "rdf"))]
274 {
275 Session::with_adaptive(
276 Arc::clone(&self.store),
277 Arc::clone(&self.tx_manager),
278 Arc::clone(&self.query_cache),
279 self.config.adaptive.clone(),
280 self.config.factorized_execution,
281 )
282 }
283 }
284
285 #[must_use]
287 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
288 &self.config.adaptive
289 }
290
291 pub fn execute(&self, query: &str) -> Result<QueryResult> {
301 let session = self.session();
302 session.execute(query)
303 }
304
305 pub fn execute_with_params(
311 &self,
312 query: &str,
313 params: std::collections::HashMap<String, grafeo_common::types::Value>,
314 ) -> Result<QueryResult> {
315 let session = self.session();
316 session.execute_with_params(query, params)
317 }
318
319 #[cfg(feature = "cypher")]
325 pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
326 let session = self.session();
327 session.execute_cypher(query)
328 }
329
330 #[cfg(feature = "cypher")]
336 pub fn execute_cypher_with_params(
337 &self,
338 query: &str,
339 params: std::collections::HashMap<String, grafeo_common::types::Value>,
340 ) -> Result<QueryResult> {
341 use crate::query::processor::{QueryLanguage, QueryProcessor};
342
343 let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
345 processor.process(query, QueryLanguage::Cypher, Some(¶ms))
346 }
347
348 #[cfg(feature = "gremlin")]
354 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
355 let session = self.session();
356 session.execute_gremlin(query)
357 }
358
359 #[cfg(feature = "gremlin")]
365 pub fn execute_gremlin_with_params(
366 &self,
367 query: &str,
368 params: std::collections::HashMap<String, grafeo_common::types::Value>,
369 ) -> Result<QueryResult> {
370 let session = self.session();
371 session.execute_gremlin_with_params(query, params)
372 }
373
374 #[cfg(feature = "graphql")]
380 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
381 let session = self.session();
382 session.execute_graphql(query)
383 }
384
385 #[cfg(feature = "graphql")]
391 pub fn execute_graphql_with_params(
392 &self,
393 query: &str,
394 params: std::collections::HashMap<String, grafeo_common::types::Value>,
395 ) -> Result<QueryResult> {
396 let session = self.session();
397 session.execute_graphql_with_params(query, params)
398 }
399
/// Executes a SPARQL `query` against the RDF store.
///
/// Pipeline:
/// 1. The query is translated to a logical plan.
/// 2. The plan is optimized by an optimizer built from the LPG store.
/// 3. The RDF planner lowers it to a physical plan.
/// 4. The physical plan is executed.
///
/// # Errors
///
/// Returns an error if any of the translation, optimization, planning or
/// execution steps fails.
#[cfg(all(feature = "sparql", feature = "rdf"))]
pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
    };

    let plan = sparql_translator::translate(query)?;
    let plan = Optimizer::from_store(&self.store).optimize(plan)?;

    let planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
    let mut physical = planner.plan(&plan)?;

    let executor = Executor::with_columns(physical.columns.clone());
    executor.execute(physical.operator.as_mut())
}

/// Returns the RDF triple store.
#[cfg(feature = "rdf")]
#[must_use]
pub fn rdf_store(&self) -> &Arc<RdfStore> {
    let rdf = &self.rdf_store;
    rdf
}
446
447 pub fn query_scalar<T: FromValue>(&self, query: &str) -> Result<T> {
453 let result = self.execute(query)?;
454 result.scalar()
455 }
456
457 #[must_use]
459 pub fn config(&self) -> &Config {
460 &self.config
461 }
462
463 #[must_use]
467 pub fn store(&self) -> &Arc<LpgStore> {
468 &self.store
469 }
470
471 #[must_use]
473 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
474 &self.buffer_manager
475 }
476
477 pub fn close(&self) -> Result<()> {
487 let mut is_open = self.is_open.write();
488 if !*is_open {
489 return Ok(());
490 }
491
492 #[cfg(feature = "wal")]
494 if let Some(ref wal) = self.wal {
495 let epoch = self.store.current_epoch();
496
497 let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
499 self.tx_manager.begin()
501 });
502
503 wal.log(&WalRecord::TxCommit {
505 tx_id: checkpoint_tx,
506 })?;
507
508 wal.checkpoint(checkpoint_tx, epoch)?;
510 wal.sync()?;
511 }
512
513 *is_open = false;
514 Ok(())
515 }
516
/// Returns the attached write-ahead log manager, if any.
#[cfg(feature = "wal")]
#[must_use]
pub fn wal(&self) -> Option<&Arc<WalManager>> {
    self.wal.as_ref()
}

/// Appends `record` to the WAL. Does nothing when no WAL is attached.
#[cfg(feature = "wal")]
fn log_wal(&self, record: &WalRecord) -> Result<()> {
    let Some(wal) = self.wal.as_ref() else {
        return Ok(());
    };
    wal.log(record)?;
    Ok(())
}
532
533 #[must_use]
535 pub fn node_count(&self) -> usize {
536 self.store.node_count()
537 }
538
539 #[must_use]
541 pub fn edge_count(&self) -> usize {
542 self.store.edge_count()
543 }
544
545 #[must_use]
547 pub fn label_count(&self) -> usize {
548 self.store.label_count()
549 }
550
551 #[must_use]
553 pub fn property_key_count(&self) -> usize {
554 self.store.property_key_count()
555 }
556
557 #[must_use]
559 pub fn edge_type_count(&self) -> usize {
560 self.store.edge_type_count()
561 }
562
563 pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
580 let id = self.store.create_node(labels);
581
582 #[cfg(feature = "wal")]
584 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
585 id,
586 labels: labels.iter().map(|s| s.to_string()).collect(),
587 }) {
588 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
589 }
590
591 id
592 }
593
594 pub fn create_node_with_props(
598 &self,
599 labels: &[&str],
600 properties: impl IntoIterator<
601 Item = (
602 impl Into<grafeo_common::types::PropertyKey>,
603 impl Into<grafeo_common::types::Value>,
604 ),
605 >,
606 ) -> grafeo_common::types::NodeId {
607 let props: Vec<(
609 grafeo_common::types::PropertyKey,
610 grafeo_common::types::Value,
611 )> = properties
612 .into_iter()
613 .map(|(k, v)| (k.into(), v.into()))
614 .collect();
615
616 let id = self
617 .store
618 .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
619
620 #[cfg(feature = "wal")]
622 {
623 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
624 id,
625 labels: labels.iter().map(|s| s.to_string()).collect(),
626 }) {
627 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
628 }
629
630 for (key, value) in props {
632 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
633 id,
634 key: key.to_string(),
635 value,
636 }) {
637 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
638 }
639 }
640 }
641
642 id
643 }
644
645 #[must_use]
647 pub fn get_node(
648 &self,
649 id: grafeo_common::types::NodeId,
650 ) -> Option<grafeo_core::graph::lpg::Node> {
651 self.store.get_node(id)
652 }
653
654 pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
658 let result = self.store.delete_node(id);
659
660 #[cfg(feature = "wal")]
661 if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
662 tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
663 }
664
665 result
666 }
667
668 pub fn set_node_property(
672 &self,
673 id: grafeo_common::types::NodeId,
674 key: &str,
675 value: grafeo_common::types::Value,
676 ) {
677 #[cfg(feature = "wal")]
679 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
680 id,
681 key: key.to_string(),
682 value: value.clone(),
683 }) {
684 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
685 }
686
687 self.store.set_node_property(id, key, value);
688 }
689
690 pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
708 let result = self.store.add_label(id, label);
709
710 #[cfg(feature = "wal")]
711 if result {
712 if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
714 id,
715 label: label.to_string(),
716 }) {
717 tracing::warn!("Failed to log AddNodeLabel to WAL: {}", e);
718 }
719 }
720
721 result
722 }
723
724 pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
742 let result = self.store.remove_label(id, label);
743
744 #[cfg(feature = "wal")]
745 if result {
746 if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
748 id,
749 label: label.to_string(),
750 }) {
751 tracing::warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
752 }
753 }
754
755 result
756 }
757
758 #[must_use]
775 pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
776 self.store
777 .get_node(id)
778 .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
779 }
780
781 pub fn create_edge(
801 &self,
802 src: grafeo_common::types::NodeId,
803 dst: grafeo_common::types::NodeId,
804 edge_type: &str,
805 ) -> grafeo_common::types::EdgeId {
806 let id = self.store.create_edge(src, dst, edge_type);
807
808 #[cfg(feature = "wal")]
810 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
811 id,
812 src,
813 dst,
814 edge_type: edge_type.to_string(),
815 }) {
816 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
817 }
818
819 id
820 }
821
822 pub fn create_edge_with_props(
826 &self,
827 src: grafeo_common::types::NodeId,
828 dst: grafeo_common::types::NodeId,
829 edge_type: &str,
830 properties: impl IntoIterator<
831 Item = (
832 impl Into<grafeo_common::types::PropertyKey>,
833 impl Into<grafeo_common::types::Value>,
834 ),
835 >,
836 ) -> grafeo_common::types::EdgeId {
837 let props: Vec<(
839 grafeo_common::types::PropertyKey,
840 grafeo_common::types::Value,
841 )> = properties
842 .into_iter()
843 .map(|(k, v)| (k.into(), v.into()))
844 .collect();
845
846 let id = self.store.create_edge_with_props(
847 src,
848 dst,
849 edge_type,
850 props.iter().map(|(k, v)| (k.clone(), v.clone())),
851 );
852
853 #[cfg(feature = "wal")]
855 {
856 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
857 id,
858 src,
859 dst,
860 edge_type: edge_type.to_string(),
861 }) {
862 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
863 }
864
865 for (key, value) in props {
867 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
868 id,
869 key: key.to_string(),
870 value,
871 }) {
872 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
873 }
874 }
875 }
876
877 id
878 }
879
880 #[must_use]
882 pub fn get_edge(
883 &self,
884 id: grafeo_common::types::EdgeId,
885 ) -> Option<grafeo_core::graph::lpg::Edge> {
886 self.store.get_edge(id)
887 }
888
889 pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
893 let result = self.store.delete_edge(id);
894
895 #[cfg(feature = "wal")]
896 if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
897 tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
898 }
899
900 result
901 }
902
903 pub fn set_edge_property(
907 &self,
908 id: grafeo_common::types::EdgeId,
909 key: &str,
910 value: grafeo_common::types::Value,
911 ) {
912 #[cfg(feature = "wal")]
914 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
915 id,
916 key: key.to_string(),
917 value: value.clone(),
918 }) {
919 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
920 }
921 self.store.set_edge_property(id, key, value);
922 }
923
924 pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
928 self.store.remove_node_property(id, key).is_some()
930 }
931
932 pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
936 self.store.remove_edge_property(id, key).is_some()
938 }
939
940 pub fn create_property_index(&self, property: &str) {
960 self.store.create_property_index(property);
961 }
962
963 pub fn create_vector_index(
983 &self,
984 label: &str,
985 property: &str,
986 dimensions: Option<usize>,
987 metric: Option<&str>,
988 m: Option<usize>,
989 ef_construction: Option<usize>,
990 ) -> Result<()> {
991 use grafeo_common::types::{PropertyKey, Value};
992 use grafeo_core::index::vector::DistanceMetric;
993
994 let metric = match metric {
995 Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
996 grafeo_common::utils::error::Error::Internal(format!(
997 "Unknown distance metric '{}'. Use: cosine, euclidean, dot_product, manhattan",
998 m
999 ))
1000 })?,
1001 None => DistanceMetric::Cosine,
1002 };
1003
1004 let prop_key = PropertyKey::new(property);
1006 let mut found_dims: Option<usize> = dimensions;
1007 let mut vector_count = 0usize;
1008
1009 #[cfg(feature = "vector-index")]
1010 let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1011
1012 for node in self.store.nodes_with_label(label) {
1013 if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1014 if let Some(expected) = found_dims {
1015 if v.len() != expected {
1016 return Err(grafeo_common::utils::error::Error::Internal(format!(
1017 "Vector dimension mismatch: expected {}, found {} on node {}",
1018 expected,
1019 v.len(),
1020 node.id.0
1021 )));
1022 }
1023 } else {
1024 found_dims = Some(v.len());
1025 }
1026 vector_count += 1;
1027 #[cfg(feature = "vector-index")]
1028 vectors.push((node.id, v.to_vec()));
1029 }
1030 }
1031
1032 if vector_count == 0 {
1033 return Err(grafeo_common::utils::error::Error::Internal(format!(
1034 "No vector properties found on :{label}({property})"
1035 )));
1036 }
1037
1038 let dims = found_dims.unwrap_or(0);
1039
1040 #[cfg(feature = "vector-index")]
1042 {
1043 use grafeo_core::index::vector::{HnswConfig, HnswIndex};
1044
1045 let mut config = HnswConfig::new(dims, metric);
1046 if let Some(m_val) = m {
1047 config = config.with_m(m_val);
1048 }
1049 if let Some(ef_c) = ef_construction {
1050 config = config.with_ef_construction(ef_c);
1051 }
1052
1053 let index = HnswIndex::with_capacity(config, vectors.len());
1054 for (node_id, vec) in &vectors {
1055 index.insert(*node_id, vec);
1056 }
1057
1058 self.store
1059 .add_vector_index(label, property, Arc::new(index));
1060 }
1061
1062 let _ = (m, ef_construction);
1064
1065 tracing::info!(
1066 "Vector index created: :{label}({property}) - {vector_count} vectors, {dims} dimensions, metric={metric_name}",
1067 metric_name = metric.name()
1068 );
1069
1070 Ok(())
1071 }
1072
1073 #[cfg(feature = "vector-index")]
1089 pub fn vector_search(
1090 &self,
1091 label: &str,
1092 property: &str,
1093 query: &[f32],
1094 k: usize,
1095 ef: Option<usize>,
1096 ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
1097 let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1098 grafeo_common::utils::error::Error::Internal(format!(
1099 "No vector index found for :{label}({property}). Call create_vector_index() first."
1100 ))
1101 })?;
1102
1103 let results = match ef {
1104 Some(ef_val) => index.search_with_ef(query, k, ef_val),
1105 None => index.search(query, k),
1106 };
1107
1108 Ok(results)
1109 }
1110
1111 pub fn batch_create_nodes(
1127 &self,
1128 label: &str,
1129 property: &str,
1130 vectors: Vec<Vec<f32>>,
1131 ) -> Vec<grafeo_common::types::NodeId> {
1132 use grafeo_common::types::{PropertyKey, Value};
1133
1134 let prop_key = PropertyKey::new(property);
1135 let labels: &[&str] = &[label];
1136
1137 vectors
1138 .into_iter()
1139 .map(|vec| {
1140 let value = Value::Vector(vec.into());
1141 let id = self.store.create_node_with_props(
1142 labels,
1143 std::iter::once((prop_key.clone(), value.clone())),
1144 );
1145
1146 #[cfg(feature = "wal")]
1148 {
1149 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
1150 id,
1151 labels: labels.iter().map(|s| s.to_string()).collect(),
1152 }) {
1153 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
1154 }
1155 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
1156 id,
1157 key: property.to_string(),
1158 value,
1159 }) {
1160 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
1161 }
1162 }
1163
1164 id
1165 })
1166 .collect()
1167 }
1168
1169 #[cfg(feature = "vector-index")]
1181 pub fn batch_vector_search(
1182 &self,
1183 label: &str,
1184 property: &str,
1185 queries: &[Vec<f32>],
1186 k: usize,
1187 ef: Option<usize>,
1188 ) -> Result<Vec<Vec<(grafeo_common::types::NodeId, f32)>>> {
1189 let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1190 grafeo_common::utils::error::Error::Internal(format!(
1191 "No vector index found for :{label}({property}). Call create_vector_index() first."
1192 ))
1193 })?;
1194
1195 let results = match ef {
1196 Some(ef_val) => index.batch_search_with_ef(queries, k, ef_val),
1197 None => index.batch_search(queries, k),
1198 };
1199
1200 Ok(results)
1201 }
1202
1203 pub fn drop_property_index(&self, property: &str) -> bool {
1207 self.store.drop_property_index(property)
1208 }
1209
1210 #[must_use]
1212 pub fn has_property_index(&self, property: &str) -> bool {
1213 self.store.has_property_index(property)
1214 }
1215
1216 #[must_use]
1231 pub fn find_nodes_by_property(
1232 &self,
1233 property: &str,
1234 value: &grafeo_common::types::Value,
1235 ) -> Vec<grafeo_common::types::NodeId> {
1236 self.store.find_nodes_by_property(property, value)
1237 }
1238
1239 #[must_use]
1247 pub fn is_persistent(&self) -> bool {
1248 self.config.path.is_some()
1249 }
1250
1251 #[must_use]
1255 pub fn path(&self) -> Option<&Path> {
1256 self.config.path.as_deref()
1257 }
1258
1259 #[must_use]
1263 pub fn info(&self) -> crate::admin::DatabaseInfo {
1264 crate::admin::DatabaseInfo {
1265 mode: crate::admin::DatabaseMode::Lpg,
1266 node_count: self.store.node_count(),
1267 edge_count: self.store.edge_count(),
1268 is_persistent: self.is_persistent(),
1269 path: self.config.path.clone(),
1270 wal_enabled: self.config.wal_enabled,
1271 version: env!("CARGO_PKG_VERSION").to_string(),
1272 }
1273 }
1274
1275 #[must_use]
1279 pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1280 #[cfg(feature = "wal")]
1281 let disk_bytes = self.config.path.as_ref().and_then(|p| {
1282 if p.exists() {
1283 Self::calculate_disk_usage(p).ok()
1284 } else {
1285 None
1286 }
1287 });
1288 #[cfg(not(feature = "wal"))]
1289 let disk_bytes: Option<usize> = None;
1290
1291 crate::admin::DatabaseStats {
1292 node_count: self.store.node_count(),
1293 edge_count: self.store.edge_count(),
1294 label_count: self.store.label_count(),
1295 edge_type_count: self.store.edge_type_count(),
1296 property_key_count: self.store.property_key_count(),
1297 index_count: 0, memory_bytes: self.buffer_manager.allocated(),
1299 disk_bytes,
1300 }
1301 }
1302
1303 #[cfg(feature = "wal")]
1305 fn calculate_disk_usage(path: &Path) -> Result<usize> {
1306 let mut total = 0usize;
1307 if path.is_dir() {
1308 for entry in std::fs::read_dir(path)? {
1309 let entry = entry?;
1310 let metadata = entry.metadata()?;
1311 if metadata.is_file() {
1312 total += metadata.len() as usize;
1313 } else if metadata.is_dir() {
1314 total += Self::calculate_disk_usage(&entry.path())?;
1315 }
1316 }
1317 }
1318 Ok(total)
1319 }
1320
1321 #[must_use]
1326 pub fn schema(&self) -> crate::admin::SchemaInfo {
1327 let labels = self
1328 .store
1329 .all_labels()
1330 .into_iter()
1331 .map(|name| crate::admin::LabelInfo {
1332 name: name.clone(),
1333 count: self.store.nodes_with_label(&name).count(),
1334 })
1335 .collect();
1336
1337 let edge_types = self
1338 .store
1339 .all_edge_types()
1340 .into_iter()
1341 .map(|name| crate::admin::EdgeTypeInfo {
1342 name: name.clone(),
1343 count: self.store.edges_with_type(&name).count(),
1344 })
1345 .collect();
1346
1347 let property_keys = self.store.all_property_keys();
1348
1349 crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
1350 labels,
1351 edge_types,
1352 property_keys,
1353 })
1354 }
1355
1356 #[cfg(feature = "rdf")]
1360 #[must_use]
1361 pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
1362 let stats = self.rdf_store.stats();
1363
1364 let predicates = self
1365 .rdf_store
1366 .predicates()
1367 .into_iter()
1368 .map(|predicate| {
1369 let count = self.rdf_store.triples_with_predicate(&predicate).len();
1370 crate::admin::PredicateInfo {
1371 iri: predicate.to_string(),
1372 count,
1373 }
1374 })
1375 .collect();
1376
1377 crate::admin::SchemaInfo::Rdf(crate::admin::RdfSchemaInfo {
1378 predicates,
1379 named_graphs: Vec::new(), subject_count: stats.subject_count,
1381 object_count: stats.object_count,
1382 })
1383 }
1384
1385 #[must_use]
1393 pub fn validate(&self) -> crate::admin::ValidationResult {
1394 let mut result = crate::admin::ValidationResult::default();
1395
1396 for edge in self.store.all_edges() {
1398 if self.store.get_node(edge.src).is_none() {
1399 result.errors.push(crate::admin::ValidationError {
1400 code: "DANGLING_SRC".to_string(),
1401 message: format!(
1402 "Edge {} references non-existent source node {}",
1403 edge.id.0, edge.src.0
1404 ),
1405 context: Some(format!("edge:{}", edge.id.0)),
1406 });
1407 }
1408 if self.store.get_node(edge.dst).is_none() {
1409 result.errors.push(crate::admin::ValidationError {
1410 code: "DANGLING_DST".to_string(),
1411 message: format!(
1412 "Edge {} references non-existent destination node {}",
1413 edge.id.0, edge.dst.0
1414 ),
1415 context: Some(format!("edge:{}", edge.id.0)),
1416 });
1417 }
1418 }
1419
1420 if self.store.node_count() > 0 && self.store.edge_count() == 0 {
1422 result.warnings.push(crate::admin::ValidationWarning {
1423 code: "NO_EDGES".to_string(),
1424 message: "Database has nodes but no edges".to_string(),
1425 context: None,
1426 });
1427 }
1428
1429 result
1430 }
1431
1432 #[must_use]
1436 pub fn wal_status(&self) -> crate::admin::WalStatus {
1437 #[cfg(feature = "wal")]
1438 if let Some(ref wal) = self.wal {
1439 return crate::admin::WalStatus {
1440 enabled: true,
1441 path: self.config.path.as_ref().map(|p| p.join("wal")),
1442 size_bytes: wal.size_bytes(),
1443 record_count: wal.record_count() as usize,
1444 last_checkpoint: wal.last_checkpoint_timestamp(),
1445 current_epoch: self.store.current_epoch().as_u64(),
1446 };
1447 }
1448
1449 crate::admin::WalStatus {
1450 enabled: false,
1451 path: None,
1452 size_bytes: 0,
1453 record_count: 0,
1454 last_checkpoint: None,
1455 current_epoch: self.store.current_epoch().as_u64(),
1456 }
1457 }
1458
1459 pub fn wal_checkpoint(&self) -> Result<()> {
1467 #[cfg(feature = "wal")]
1468 if let Some(ref wal) = self.wal {
1469 let epoch = self.store.current_epoch();
1470 let tx_id = self
1471 .tx_manager
1472 .last_assigned_tx_id()
1473 .unwrap_or_else(|| self.tx_manager.begin());
1474 wal.checkpoint(tx_id, epoch)?;
1475 wal.sync()?;
1476 }
1477 Ok(())
1478 }
1479
1480 #[cfg(feature = "wal")]
1497 pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
1498 let path = path.as_ref();
1499
1500 let target_config = Config::persistent(path);
1502 let target = Self::with_config(target_config)?;
1503
1504 for node in self.store.all_nodes() {
1506 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1507 target.store.create_node_with_id(node.id, &label_refs);
1508
1509 target.log_wal(&WalRecord::CreateNode {
1511 id: node.id,
1512 labels: node.labels.iter().map(|s| s.to_string()).collect(),
1513 })?;
1514
1515 for (key, value) in node.properties {
1517 target
1518 .store
1519 .set_node_property(node.id, key.as_str(), value.clone());
1520 target.log_wal(&WalRecord::SetNodeProperty {
1521 id: node.id,
1522 key: key.to_string(),
1523 value,
1524 })?;
1525 }
1526 }
1527
1528 for edge in self.store.all_edges() {
1530 target
1531 .store
1532 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1533
1534 target.log_wal(&WalRecord::CreateEdge {
1536 id: edge.id,
1537 src: edge.src,
1538 dst: edge.dst,
1539 edge_type: edge.edge_type.to_string(),
1540 })?;
1541
1542 for (key, value) in edge.properties {
1544 target
1545 .store
1546 .set_edge_property(edge.id, key.as_str(), value.clone());
1547 target.log_wal(&WalRecord::SetEdgeProperty {
1548 id: edge.id,
1549 key: key.to_string(),
1550 value,
1551 })?;
1552 }
1553 }
1554
1555 target.close()?;
1557
1558 Ok(())
1559 }
1560
1561 pub fn to_memory(&self) -> Result<Self> {
1572 let config = Config::in_memory();
1573 let target = Self::with_config(config)?;
1574
1575 for node in self.store.all_nodes() {
1577 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1578 target.store.create_node_with_id(node.id, &label_refs);
1579
1580 for (key, value) in node.properties {
1582 target.store.set_node_property(node.id, key.as_str(), value);
1583 }
1584 }
1585
1586 for edge in self.store.all_edges() {
1588 target
1589 .store
1590 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1591
1592 for (key, value) in edge.properties {
1594 target.store.set_edge_property(edge.id, key.as_str(), value);
1595 }
1596 }
1597
1598 Ok(target)
1599 }
1600
1601 #[cfg(feature = "wal")]
1610 pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
1611 let source = Self::open(path)?;
1613
1614 let target = source.to_memory()?;
1616
1617 source.close()?;
1619
1620 Ok(target)
1621 }
1622
1623 pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
1631 self.store.all_nodes()
1632 }
1633
1634 pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
1638 self.store.all_edges()
1639 }
1640}
1641
1642impl Drop for GrafeoDB {
1643 fn drop(&mut self) {
1644 if let Err(e) = self.close() {
1645 tracing::error!("Error closing database: {}", e);
1646 }
1647 }
1648}
1649
/// Materialized query output: column metadata plus all result rows.
#[derive(Debug)]
pub struct QueryResult {
    /// Column names, in output order.
    pub columns: Vec<String>,
    /// Logical type per column (all `LogicalType::Any` when built with `QueryResult::new`).
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// Result rows; presumably one value per column in each row — confirm with producers.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Execution time in milliseconds, if recorded (see `with_metrics`).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during execution, if recorded (see `with_metrics`).
    pub rows_scanned: Option<u64>,
}
1688
1689impl QueryResult {
1690 #[must_use]
1692 pub fn new(columns: Vec<String>) -> Self {
1693 let len = columns.len();
1694 Self {
1695 columns,
1696 column_types: vec![grafeo_common::types::LogicalType::Any; len],
1697 rows: Vec::new(),
1698 execution_time_ms: None,
1699 rows_scanned: None,
1700 }
1701 }
1702
1703 #[must_use]
1705 pub fn with_types(
1706 columns: Vec<String>,
1707 column_types: Vec<grafeo_common::types::LogicalType>,
1708 ) -> Self {
1709 Self {
1710 columns,
1711 column_types,
1712 rows: Vec::new(),
1713 execution_time_ms: None,
1714 rows_scanned: None,
1715 }
1716 }
1717
1718 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1720 self.execution_time_ms = Some(execution_time_ms);
1721 self.rows_scanned = Some(rows_scanned);
1722 self
1723 }
1724
1725 #[must_use]
1727 pub fn execution_time_ms(&self) -> Option<f64> {
1728 self.execution_time_ms
1729 }
1730
1731 #[must_use]
1733 pub fn rows_scanned(&self) -> Option<u64> {
1734 self.rows_scanned
1735 }
1736
1737 #[must_use]
1739 pub fn row_count(&self) -> usize {
1740 self.rows.len()
1741 }
1742
1743 #[must_use]
1745 pub fn column_count(&self) -> usize {
1746 self.columns.len()
1747 }
1748
1749 #[must_use]
1751 pub fn is_empty(&self) -> bool {
1752 self.rows.is_empty()
1753 }
1754
1755 pub fn scalar<T: FromValue>(&self) -> Result<T> {
1764 if self.rows.len() != 1 || self.columns.len() != 1 {
1765 return Err(grafeo_common::utils::error::Error::InvalidValue(
1766 "Expected single value".to_string(),
1767 ));
1768 }
1769 T::from_value(&self.rows[0][0])
1770 }
1771
1772 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1774 self.rows.iter()
1775 }
1776}
1777
1778pub trait FromValue: Sized {
1783 fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
1785}
1786
1787impl FromValue for i64 {
1788 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1789 value
1790 .as_int64()
1791 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1792 expected: "INT64".to_string(),
1793 found: value.type_name().to_string(),
1794 })
1795 }
1796}
1797
1798impl FromValue for f64 {
1799 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1800 value
1801 .as_float64()
1802 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1803 expected: "FLOAT64".to_string(),
1804 found: value.type_name().to_string(),
1805 })
1806 }
1807}
1808
1809impl FromValue for String {
1810 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1811 value.as_str().map(String::from).ok_or_else(|| {
1812 grafeo_common::utils::error::Error::TypeMismatch {
1813 expected: "STRING".to_string(),
1814 found: value.type_name().to_string(),
1815 }
1816 })
1817 }
1818}
1819
1820impl FromValue for bool {
1821 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1822 value
1823 .as_bool()
1824 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1825 expected: "BOOL".to_string(),
1826 found: value.type_name().to_string(),
1827 })
1828 }
1829}
1830
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_create_in_memory_database() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
    }

    #[test]
    fn test_database_config() {
        let config = Config::in_memory().with_threads(4).with_query_logging();

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().threads, 4);
        assert!(db.config().query_logging);
    }

    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        let _session = db.session();
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_db");

        // First session: create two nodes and an edge, then close.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));

            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));

            let _edge = db.create_edge(alice, bob, "KNOWS");

            db.close().unwrap();
        }

        // Second session: reopening the same path must restore the graph.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(db.node_count(), 2);
            assert_eq!(db.edge_count(), 1);

            // These lookups rely on nodes being numbered from 0 in creation order.
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node0.is_some());

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node1.is_some());
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // NOTE(review): if `db.wal()` returns `None`, this check is skipped silently.
        // Confirm whether a WAL is always present for databases opened via `GrafeoDB::open`,
        // and if so, turn this into a hard assertion.
        if let Some(wal) = db.wal() {
            assert!(wal.record_count() > 0);
        }

        db.close().unwrap();
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_recovery_multiple_sessions() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("multi_session_db");

        // Session 1: create Alice.
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));
            db.close().unwrap();
        }

        // Session 2: Alice must have been recovered; then add Bob.
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 1);

            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));
            db.close().unwrap();
        }

        // Session 3: both nodes, written in different sessions, must be present.
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 2);

            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_database_consistency_after_mutations() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("consistency_db");

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Build a chain a -> b -> c.
            let a = db.create_node(&["Node"]);
            let b = db.create_node(&["Node"]);
            let c = db.create_node(&["Node"]);

            let e1 = db.create_edge(a, b, "LINKS");
            let _e2 = db.create_edge(b, c, "LINKS");

            // Interleave deletions with property updates before closing.
            db.delete_edge(e1);
            db.delete_node(b);

            db.set_node_property(a, "value", Value::Int64(1));
            db.set_node_property(c, "value", Value::Int64(3));

            db.close().unwrap();
        }

        // After reopening, a (id 0) and c (id 2) survive and the deleted b (id 1) stays gone.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node_a.is_some());

            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
            assert!(node_c.is_some());

            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node_b.is_none());
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_close_is_idempotent() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("close_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();
        db.create_node(&["Test"]);

        // First close.
        assert!(db.close().is_ok());

        // Closing an already-closed database must also succeed.
        assert!(db.close().is_ok());
    }

    #[test]
    fn test_query_result_has_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();

            // Executed queries carry metrics; both Person nodes count as scanned.
            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert!(result.execution_time_ms.unwrap() >= 0.0);
            assert_eq!(result.rows_scanned.unwrap(), 2);
        }
    }

    #[test]
    fn test_empty_query_result_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            // No node carries this label, so the scan count must be zero.
            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert_eq!(result.rows_scanned.unwrap(), 0);
        }
    }
}