1use std::path::Path;
4use std::sync::Arc;
5
6use parking_lot::RwLock;
7
8use grafeo_adapters::storage::wal::{WalConfig, WalManager, WalRecord, WalRecovery};
9use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
10use grafeo_common::utils::error::Result;
11use grafeo_core::graph::lpg::LpgStore;
12#[cfg(feature = "rdf")]
13use grafeo_core::graph::rdf::RdfStore;
14
15use crate::config::Config;
16use crate::session::Session;
17use crate::transaction::TransactionManager;
18
19pub struct GrafeoDB {
21 config: Config,
23 store: Arc<LpgStore>,
25 #[cfg(feature = "rdf")]
27 rdf_store: Arc<RdfStore>,
28 tx_manager: Arc<TransactionManager>,
30 buffer_manager: Arc<BufferManager>,
32 wal: Option<Arc<WalManager>>,
34 is_open: RwLock<bool>,
36}
37
38impl GrafeoDB {
39 #[must_use]
50 pub fn new_in_memory() -> Self {
51 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
52 }
53
54 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
71 Self::with_config(Config::persistent(path.as_ref()))
72 }
73
74 pub fn with_config(config: Config) -> Result<Self> {
94 let store = Arc::new(LpgStore::new());
95 #[cfg(feature = "rdf")]
96 let rdf_store = Arc::new(RdfStore::new());
97 let tx_manager = Arc::new(TransactionManager::new());
98
99 let buffer_config = BufferManagerConfig {
101 budget: config.memory_limit.unwrap_or_else(|| {
102 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
103 }),
104 spill_path: config
105 .spill_path
106 .clone()
107 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
108 ..BufferManagerConfig::default()
109 };
110 let buffer_manager = BufferManager::new(buffer_config);
111
112 let wal = if config.wal_enabled {
114 if let Some(ref db_path) = config.path {
115 std::fs::create_dir_all(db_path)?;
117
118 let wal_path = db_path.join("wal");
119
120 if wal_path.exists() {
122 let recovery = WalRecovery::new(&wal_path);
123 let records = recovery.recover()?;
124 Self::apply_wal_records(&store, &records)?;
125 }
126
127 let wal_config = WalConfig::default();
129 let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
130 Some(Arc::new(wal_manager))
131 } else {
132 None
133 }
134 } else {
135 None
136 };
137
138 Ok(Self {
139 config,
140 store,
141 #[cfg(feature = "rdf")]
142 rdf_store,
143 tx_manager,
144 buffer_manager,
145 wal,
146 is_open: RwLock::new(true),
147 })
148 }
149
150 fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
152 for record in records {
153 match record {
154 WalRecord::CreateNode { id, labels } => {
155 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
156 store.create_node_with_id(*id, &label_refs);
157 }
158 WalRecord::DeleteNode { id } => {
159 store.delete_node(*id);
160 }
161 WalRecord::CreateEdge {
162 id,
163 src,
164 dst,
165 edge_type,
166 } => {
167 store.create_edge_with_id(*id, *src, *dst, edge_type);
168 }
169 WalRecord::DeleteEdge { id } => {
170 store.delete_edge(*id);
171 }
172 WalRecord::SetNodeProperty { id, key, value } => {
173 store.set_node_property(*id, key, value.clone());
174 }
175 WalRecord::SetEdgeProperty { id, key, value } => {
176 store.set_edge_property(*id, key, value.clone());
177 }
178 WalRecord::TxCommit { .. }
179 | WalRecord::TxAbort { .. }
180 | WalRecord::Checkpoint { .. } => {
181 }
184 }
185 }
186 Ok(())
187 }
188
189 #[must_use]
201 pub fn session(&self) -> Session {
202 Session::new(Arc::clone(&self.store), Arc::clone(&self.tx_manager))
203 }
204
205 pub fn execute(&self, query: &str) -> Result<QueryResult> {
214 let session = self.session();
215 session.execute(query)
216 }
217
218 pub fn execute_with_params(
224 &self,
225 query: &str,
226 params: std::collections::HashMap<String, grafeo_common::types::Value>,
227 ) -> Result<QueryResult> {
228 let session = self.session();
229 session.execute_with_params(query, params)
230 }
231
232 #[cfg(feature = "gremlin")]
238 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
239 let session = self.session();
240 session.execute_gremlin(query)
241 }
242
243 #[cfg(feature = "gremlin")]
249 pub fn execute_gremlin_with_params(
250 &self,
251 query: &str,
252 params: std::collections::HashMap<String, grafeo_common::types::Value>,
253 ) -> Result<QueryResult> {
254 let session = self.session();
255 session.execute_gremlin_with_params(query, params)
256 }
257
258 #[cfg(feature = "graphql")]
264 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
265 let session = self.session();
266 session.execute_graphql(query)
267 }
268
269 #[cfg(feature = "graphql")]
275 pub fn execute_graphql_with_params(
276 &self,
277 query: &str,
278 params: std::collections::HashMap<String, grafeo_common::types::Value>,
279 ) -> Result<QueryResult> {
280 let session = self.session();
281 session.execute_graphql_with_params(query, params)
282 }
283
284 #[cfg(all(feature = "sparql", feature = "rdf"))]
301 pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
302 use crate::query::{
303 Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
304 };
305
306 let logical_plan = sparql_translator::translate(query)?;
308
309 let optimizer = Optimizer::new();
311 let optimized_plan = optimizer.optimize(logical_plan)?;
312
313 let planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
315 let mut physical_plan = planner.plan(&optimized_plan)?;
316
317 let executor = Executor::with_columns(physical_plan.columns.clone());
319 executor.execute(physical_plan.operator.as_mut())
320 }
321
322 #[cfg(feature = "rdf")]
326 #[must_use]
327 pub fn rdf_store(&self) -> &Arc<RdfStore> {
328 &self.rdf_store
329 }
330
331 pub fn query_scalar<T: FromValue>(&self, query: &str) -> Result<T> {
337 let result = self.execute(query)?;
338 result.scalar()
339 }
340
341 #[must_use]
343 pub fn config(&self) -> &Config {
344 &self.config
345 }
346
347 #[must_use]
351 pub fn store(&self) -> &Arc<LpgStore> {
352 &self.store
353 }
354
355 #[must_use]
357 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
358 &self.buffer_manager
359 }
360
361 pub fn close(&self) -> Result<()> {
372 let mut is_open = self.is_open.write();
373 if !*is_open {
374 return Ok(());
375 }
376
377 if let Some(ref wal) = self.wal {
379 let epoch = self.store.current_epoch();
380
381 let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
383 self.tx_manager.begin()
385 });
386
387 wal.log(&WalRecord::TxCommit {
389 tx_id: checkpoint_tx,
390 })?;
391
392 wal.checkpoint(checkpoint_tx, epoch)?;
394 wal.sync()?;
395 }
396
397 *is_open = false;
398 Ok(())
399 }
400
401 #[must_use]
403 pub fn wal(&self) -> Option<&Arc<WalManager>> {
404 self.wal.as_ref()
405 }
406
407 fn log_wal(&self, record: &WalRecord) -> Result<()> {
409 if let Some(ref wal) = self.wal {
410 wal.log(record)?;
411 }
412 Ok(())
413 }
414
415 #[must_use]
417 pub fn node_count(&self) -> usize {
418 self.store.node_count()
419 }
420
421 #[must_use]
423 pub fn edge_count(&self) -> usize {
424 self.store.edge_count()
425 }
426
427 #[must_use]
429 pub fn label_count(&self) -> usize {
430 self.store.label_count()
431 }
432
433 #[must_use]
435 pub fn property_key_count(&self) -> usize {
436 self.store.property_key_count()
437 }
438
439 #[must_use]
441 pub fn edge_type_count(&self) -> usize {
442 self.store.edge_type_count()
443 }
444
445 pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
451 let id = self.store.create_node(labels);
452
453 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
455 id,
456 labels: labels.iter().map(|s| s.to_string()).collect(),
457 }) {
458 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
459 }
460
461 id
462 }
463
464 pub fn create_node_with_props(
468 &self,
469 labels: &[&str],
470 properties: impl IntoIterator<
471 Item = (
472 impl Into<grafeo_common::types::PropertyKey>,
473 impl Into<grafeo_common::types::Value>,
474 ),
475 >,
476 ) -> grafeo_common::types::NodeId {
477 let props: Vec<(
479 grafeo_common::types::PropertyKey,
480 grafeo_common::types::Value,
481 )> = properties
482 .into_iter()
483 .map(|(k, v)| (k.into(), v.into()))
484 .collect();
485
486 let id = self
487 .store
488 .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
489
490 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
492 id,
493 labels: labels.iter().map(|s| s.to_string()).collect(),
494 }) {
495 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
496 }
497
498 for (key, value) in props {
500 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
501 id,
502 key: key.to_string(),
503 value,
504 }) {
505 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
506 }
507 }
508
509 id
510 }
511
512 #[must_use]
514 pub fn get_node(
515 &self,
516 id: grafeo_common::types::NodeId,
517 ) -> Option<grafeo_core::graph::lpg::Node> {
518 self.store.get_node(id)
519 }
520
521 pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
525 let result = self.store.delete_node(id);
526
527 if result {
528 if let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
529 tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
530 }
531 }
532
533 result
534 }
535
536 pub fn set_node_property(
540 &self,
541 id: grafeo_common::types::NodeId,
542 key: &str,
543 value: grafeo_common::types::Value,
544 ) {
545 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
547 id,
548 key: key.to_string(),
549 value: value.clone(),
550 }) {
551 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
552 }
553
554 self.store.set_node_property(id, key, value);
555 }
556
557 pub fn create_edge(
563 &self,
564 src: grafeo_common::types::NodeId,
565 dst: grafeo_common::types::NodeId,
566 edge_type: &str,
567 ) -> grafeo_common::types::EdgeId {
568 let id = self.store.create_edge(src, dst, edge_type);
569
570 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
572 id,
573 src,
574 dst,
575 edge_type: edge_type.to_string(),
576 }) {
577 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
578 }
579
580 id
581 }
582
583 pub fn create_edge_with_props(
587 &self,
588 src: grafeo_common::types::NodeId,
589 dst: grafeo_common::types::NodeId,
590 edge_type: &str,
591 properties: impl IntoIterator<
592 Item = (
593 impl Into<grafeo_common::types::PropertyKey>,
594 impl Into<grafeo_common::types::Value>,
595 ),
596 >,
597 ) -> grafeo_common::types::EdgeId {
598 let props: Vec<(
600 grafeo_common::types::PropertyKey,
601 grafeo_common::types::Value,
602 )> = properties
603 .into_iter()
604 .map(|(k, v)| (k.into(), v.into()))
605 .collect();
606
607 let id = self.store.create_edge_with_props(
608 src,
609 dst,
610 edge_type,
611 props.iter().map(|(k, v)| (k.clone(), v.clone())),
612 );
613
614 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
616 id,
617 src,
618 dst,
619 edge_type: edge_type.to_string(),
620 }) {
621 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
622 }
623
624 for (key, value) in props {
626 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
627 id,
628 key: key.to_string(),
629 value,
630 }) {
631 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
632 }
633 }
634
635 id
636 }
637
638 #[must_use]
640 pub fn get_edge(
641 &self,
642 id: grafeo_common::types::EdgeId,
643 ) -> Option<grafeo_core::graph::lpg::Edge> {
644 self.store.get_edge(id)
645 }
646
647 pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
651 let result = self.store.delete_edge(id);
652
653 if result {
654 if let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
655 tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
656 }
657 }
658
659 result
660 }
661
662 pub fn set_edge_property(
666 &self,
667 id: grafeo_common::types::EdgeId,
668 key: &str,
669 value: grafeo_common::types::Value,
670 ) {
671 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
673 id,
674 key: key.to_string(),
675 value: value.clone(),
676 }) {
677 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
678 }
679 self.store.set_edge_property(id, key, value);
680 }
681
682 pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
686 self.store.remove_node_property(id, key).is_some()
688 }
689
690 pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
694 self.store.remove_edge_property(id, key).is_some()
696 }
697}
698
699impl Drop for GrafeoDB {
700 fn drop(&mut self) {
701 if let Err(e) = self.close() {
702 tracing::error!("Error closing database: {}", e);
703 }
704 }
705}
706
707#[derive(Debug)]
709pub struct QueryResult {
710 pub columns: Vec<String>,
712 pub column_types: Vec<grafeo_common::types::LogicalType>,
714 pub rows: Vec<Vec<grafeo_common::types::Value>>,
716}
717
718impl QueryResult {
719 #[must_use]
721 pub fn new(columns: Vec<String>) -> Self {
722 let len = columns.len();
723 Self {
724 columns,
725 column_types: vec![grafeo_common::types::LogicalType::Any; len],
726 rows: Vec::new(),
727 }
728 }
729
730 #[must_use]
732 pub fn with_types(
733 columns: Vec<String>,
734 column_types: Vec<grafeo_common::types::LogicalType>,
735 ) -> Self {
736 Self {
737 columns,
738 column_types,
739 rows: Vec::new(),
740 }
741 }
742
743 #[must_use]
745 pub fn row_count(&self) -> usize {
746 self.rows.len()
747 }
748
749 #[must_use]
751 pub fn column_count(&self) -> usize {
752 self.columns.len()
753 }
754
755 #[must_use]
757 pub fn is_empty(&self) -> bool {
758 self.rows.is_empty()
759 }
760
761 pub fn scalar<T: FromValue>(&self) -> Result<T> {
767 if self.rows.len() != 1 || self.columns.len() != 1 {
768 return Err(grafeo_common::utils::error::Error::InvalidValue(
769 "Expected single value".to_string(),
770 ));
771 }
772 T::from_value(&self.rows[0][0])
773 }
774
775 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
777 self.rows.iter()
778 }
779}
780
781pub trait FromValue: Sized {
783 fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
789}
790
791impl FromValue for i64 {
792 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
793 value
794 .as_int64()
795 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
796 expected: "INT64".to_string(),
797 found: value.type_name().to_string(),
798 })
799 }
800}
801
802impl FromValue for f64 {
803 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
804 value
805 .as_float64()
806 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
807 expected: "FLOAT64".to_string(),
808 found: value.type_name().to_string(),
809 })
810 }
811}
812
813impl FromValue for String {
814 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
815 value.as_str().map(String::from).ok_or_else(|| {
816 grafeo_common::utils::error::Error::TypeMismatch {
817 expected: "STRING".to_string(),
818 found: value.type_name().to_string(),
819 }
820 })
821 }
822}
823
824impl FromValue for bool {
825 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
826 value
827 .as_bool()
828 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
829 expected: "BOOL".to_string(),
830 found: value.type_name().to_string(),
831 })
832 }
833}
834
835#[cfg(test)]
836mod tests {
837 use super::*;
838
839 #[test]
840 fn test_create_in_memory_database() {
841 let db = GrafeoDB::new_in_memory();
842 assert_eq!(db.node_count(), 0);
843 assert_eq!(db.edge_count(), 0);
844 }
845
846 #[test]
847 fn test_database_config() {
848 let config = Config::in_memory().with_threads(4).with_query_logging();
849
850 let db = GrafeoDB::with_config(config).unwrap();
851 assert_eq!(db.config().threads, 4);
852 assert!(db.config().query_logging);
853 }
854
855 #[test]
856 fn test_database_session() {
857 let db = GrafeoDB::new_in_memory();
858 let _session = db.session();
859 }
861
862 #[test]
863 fn test_persistent_database_recovery() {
864 use grafeo_common::types::Value;
865 use tempfile::tempdir;
866
867 let dir = tempdir().unwrap();
868 let db_path = dir.path().join("test_db");
869
870 {
872 let db = GrafeoDB::open(&db_path).unwrap();
873
874 let alice = db.create_node(&["Person"]);
875 db.set_node_property(alice, "name", Value::from("Alice"));
876
877 let bob = db.create_node(&["Person"]);
878 db.set_node_property(bob, "name", Value::from("Bob"));
879
880 let _edge = db.create_edge(alice, bob, "KNOWS");
881
882 db.close().unwrap();
884 }
885
886 {
888 let db = GrafeoDB::open(&db_path).unwrap();
889
890 assert_eq!(db.node_count(), 2);
891 assert_eq!(db.edge_count(), 1);
892
893 let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
895 assert!(node0.is_some());
896
897 let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
898 assert!(node1.is_some());
899 }
900 }
901
902 #[test]
903 fn test_wal_logging() {
904 use tempfile::tempdir;
905
906 let dir = tempdir().unwrap();
907 let db_path = dir.path().join("wal_test_db");
908
909 let db = GrafeoDB::open(&db_path).unwrap();
910
911 let node = db.create_node(&["Test"]);
913 db.delete_node(node);
914
915 if let Some(wal) = db.wal() {
917 assert!(wal.record_count() > 0);
918 }
919
920 db.close().unwrap();
921 }
922}