1mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21mod search;
22
23use std::path::Path;
24use std::sync::Arc;
25use std::sync::atomic::AtomicUsize;
26
27use parking_lot::RwLock;
28
29#[cfg(feature = "wal")]
30use grafeo_adapters::storage::wal::{
31 DurabilityMode as WalDurabilityMode, WalConfig, WalManager, WalRecord, WalRecovery,
32};
33use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
34use grafeo_common::utils::error::Result;
35use grafeo_core::graph::lpg::LpgStore;
36#[cfg(feature = "rdf")]
37use grafeo_core::graph::rdf::RdfStore;
38
39use crate::config::Config;
40use crate::query::cache::QueryCache;
41use crate::session::Session;
42use crate::transaction::TransactionManager;
43
/// Top-level database handle.
///
/// Owns the graph store(s), transaction manager, buffer manager, query cache
/// and, when enabled, the write-ahead log. Sessions created from the database
/// share these components through `Arc` clones.
pub struct GrafeoDB {
    /// Configuration the database was created with.
    pub(super) config: Config,
    /// Labeled property graph store.
    pub(super) store: Arc<LpgStore>,
    /// RDF store; only compiled in with the `rdf` feature.
    #[cfg(feature = "rdf")]
    pub(super) rdf_store: Arc<RdfStore>,
    /// Transaction manager, shared with every session.
    pub(super) tx_manager: Arc<TransactionManager>,
    /// Buffer manager built from the configured memory budget and spill path.
    pub(super) buffer_manager: Arc<BufferManager>,
    /// Write-ahead log manager. `None` when WAL is disabled in the
    /// configuration or the database has no on-disk path.
    #[cfg(feature = "wal")]
    pub(super) wal: Option<Arc<WalManager>>,
    /// Query cache, shared with every session.
    pub(super) query_cache: Arc<QueryCache>,
    /// Counter handed to every session together with `gc_interval`.
    // NOTE(review): presumably counts commits to trigger periodic GC — confirm in `Session`.
    pub(super) commit_counter: Arc<AtomicUsize>,
    /// `true` from construction until `close` completes successfully.
    pub(super) is_open: RwLock<bool>,
    /// Change-data-capture log; only compiled in with the `cdc` feature.
    #[cfg(feature = "cdc")]
    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
    /// Registered embedding models; only compiled in with the `embed` feature.
    // NOTE(review): the `String` key is presumably the model name — confirm in the `embed` module.
    #[cfg(feature = "embed")]
    pub(super) embedding_models:
        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
}
95
96impl GrafeoDB {
97 #[must_use]
113 pub fn new_in_memory() -> Self {
114 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
115 }
116
/// Opens a database using `Config::persistent` for `path`.
///
/// # Errors
///
/// Propagates any error returned by `with_config`.
#[cfg(feature = "wal")]
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
    let config = Config::persistent(path.as_ref());
    Self::with_config(config)
}
139
140 pub fn with_config(config: Config) -> Result<Self> {
164 config
166 .validate()
167 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
168
169 let store = Arc::new(LpgStore::new());
170 #[cfg(feature = "rdf")]
171 let rdf_store = Arc::new(RdfStore::new());
172 let tx_manager = Arc::new(TransactionManager::new());
173
174 let buffer_config = BufferManagerConfig {
176 budget: config.memory_limit.unwrap_or_else(|| {
177 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
178 }),
179 spill_path: config
180 .spill_path
181 .clone()
182 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
183 ..BufferManagerConfig::default()
184 };
185 let buffer_manager = BufferManager::new(buffer_config);
186
187 #[cfg(feature = "wal")]
189 let wal = if config.wal_enabled {
190 if let Some(ref db_path) = config.path {
191 std::fs::create_dir_all(db_path)?;
193
194 let wal_path = db_path.join("wal");
195
196 if wal_path.exists() {
198 let recovery = WalRecovery::new(&wal_path);
199 let records = recovery.recover()?;
200 Self::apply_wal_records(&store, &records)?;
201 }
202
203 let wal_durability = match config.wal_durability {
205 crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
206 crate::config::DurabilityMode::Batch {
207 max_delay_ms,
208 max_records,
209 } => WalDurabilityMode::Batch {
210 max_delay_ms,
211 max_records,
212 },
213 crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
214 WalDurabilityMode::Adaptive { target_interval_ms }
215 }
216 crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
217 };
218 let wal_config = WalConfig {
219 durability: wal_durability,
220 ..WalConfig::default()
221 };
222 let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
223 Some(Arc::new(wal_manager))
224 } else {
225 None
226 }
227 } else {
228 None
229 };
230
231 let query_cache = Arc::new(QueryCache::default());
233
234 Ok(Self {
235 config,
236 store,
237 #[cfg(feature = "rdf")]
238 rdf_store,
239 tx_manager,
240 buffer_manager,
241 #[cfg(feature = "wal")]
242 wal,
243 query_cache,
244 commit_counter: Arc::new(AtomicUsize::new(0)),
245 is_open: RwLock::new(true),
246 #[cfg(feature = "cdc")]
247 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
248 #[cfg(feature = "embed")]
249 embedding_models: RwLock::new(hashbrown::HashMap::new()),
250 })
251 }
252
/// Replays recovered WAL records into `store`, in the order given.
///
/// Node, edge, property and label records are applied through the matching
/// store call. `TxCommit`, `TxAbort` and `Checkpoint` records are skipped.
/// Whatever the store calls return is discarded, and the function always
/// returns `Ok(())`.
#[cfg(feature = "wal")]
fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
    for entry in records {
        match entry {
            WalRecord::CreateNode {
                id: node_id,
                labels,
            } => {
                // The store takes borrowed label names.
                let names: Vec<&str> = labels.iter().map(|label| label.as_str()).collect();
                store.create_node_with_id(*node_id, &names);
            }
            WalRecord::DeleteNode { id: node_id } => {
                store.delete_node(*node_id);
            }
            WalRecord::CreateEdge {
                id: edge_id,
                src,
                dst,
                edge_type,
            } => {
                store.create_edge_with_id(*edge_id, *src, *dst, edge_type);
            }
            WalRecord::DeleteEdge { id: edge_id } => {
                store.delete_edge(*edge_id);
            }
            WalRecord::SetNodeProperty {
                id: node_id,
                key,
                value,
            } => {
                store.set_node_property(*node_id, key, value.clone());
            }
            WalRecord::SetEdgeProperty {
                id: edge_id,
                key,
                value,
            } => {
                store.set_edge_property(*edge_id, key, value.clone());
            }
            WalRecord::AddNodeLabel { id: node_id, label } => {
                store.add_label(*node_id, label);
            }
            WalRecord::RemoveNodeLabel { id: node_id, label } => {
                store.remove_label(*node_id, label);
            }
            // Transaction and checkpoint markers carry no graph mutation.
            WalRecord::TxCommit { .. }
            | WalRecord::TxAbort { .. }
            | WalRecord::Checkpoint { .. } => {}
        }
    }
    Ok(())
}
298
299 #[must_use]
322 pub fn session(&self) -> Session {
323 #[cfg(feature = "rdf")]
324 let mut session = Session::with_rdf_store_and_adaptive(
325 Arc::clone(&self.store),
326 Arc::clone(&self.rdf_store),
327 Arc::clone(&self.tx_manager),
328 Arc::clone(&self.query_cache),
329 self.config.adaptive.clone(),
330 self.config.factorized_execution,
331 self.config.graph_model,
332 self.config.query_timeout,
333 Arc::clone(&self.commit_counter),
334 self.config.gc_interval,
335 );
336 #[cfg(not(feature = "rdf"))]
337 let mut session = Session::with_adaptive(
338 Arc::clone(&self.store),
339 Arc::clone(&self.tx_manager),
340 Arc::clone(&self.query_cache),
341 self.config.adaptive.clone(),
342 self.config.factorized_execution,
343 self.config.graph_model,
344 self.config.query_timeout,
345 Arc::clone(&self.commit_counter),
346 self.config.gc_interval,
347 );
348
349 #[cfg(feature = "cdc")]
350 session.set_cdc_log(Arc::clone(&self.cdc_log));
351
352 let _ = &mut session;
354
355 session
356 }
357
358 #[must_use]
360 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
361 &self.config.adaptive
362 }
363
364 #[must_use]
366 pub fn config(&self) -> &Config {
367 &self.config
368 }
369
370 #[must_use]
372 pub fn graph_model(&self) -> crate::config::GraphModel {
373 self.config.graph_model
374 }
375
376 #[must_use]
378 pub fn memory_limit(&self) -> Option<usize> {
379 self.config.memory_limit
380 }
381
382 #[must_use]
386 pub fn store(&self) -> &Arc<LpgStore> {
387 &self.store
388 }
389
390 pub fn gc(&self) {
396 let min_epoch = self.tx_manager.min_active_epoch();
397 self.store.gc_versions(min_epoch);
398 self.tx_manager.gc();
399 }
400
401 #[must_use]
403 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
404 &self.buffer_manager
405 }
406
407 #[must_use]
409 pub fn query_cache(&self) -> &Arc<QueryCache> {
410 &self.query_cache
411 }
412
413 pub fn close(&self) -> Result<()> {
427 let mut is_open = self.is_open.write();
428 if !*is_open {
429 return Ok(());
430 }
431
432 #[cfg(feature = "wal")]
434 if let Some(ref wal) = self.wal {
435 let epoch = self.store.current_epoch();
436
437 let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
439 self.tx_manager.begin()
441 });
442
443 wal.log(&WalRecord::TxCommit {
445 tx_id: checkpoint_tx,
446 })?;
447
448 wal.checkpoint(checkpoint_tx, epoch)?;
450 wal.sync()?;
451 }
452
453 *is_open = false;
454 Ok(())
455 }
456
/// Returns the WAL manager, or `None` when the database has no WAL.
#[cfg(feature = "wal")]
#[must_use]
pub fn wal(&self) -> Option<&Arc<WalManager>> {
    Option::as_ref(&self.wal)
}
463
/// Appends `record` to the WAL if one is open; otherwise does nothing.
///
/// # Errors
///
/// Propagates the error from the WAL's `log` call.
#[cfg(feature = "wal")]
pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
    match &self.wal {
        None => Ok(()),
        Some(wal) => {
            wal.log(record)?;
            Ok(())
        }
    }
}
472}
473
474impl Drop for GrafeoDB {
475 fn drop(&mut self) {
476 if let Err(e) = self.close() {
477 tracing::error!("Error closing database: {}", e);
478 }
479 }
480}
481
482impl crate::admin::AdminService for GrafeoDB {
483 fn info(&self) -> crate::admin::DatabaseInfo {
484 self.info()
485 }
486
487 fn detailed_stats(&self) -> crate::admin::DatabaseStats {
488 self.detailed_stats()
489 }
490
491 fn schema(&self) -> crate::admin::SchemaInfo {
492 self.schema()
493 }
494
495 fn validate(&self) -> crate::admin::ValidationResult {
496 self.validate()
497 }
498
499 fn wal_status(&self) -> crate::admin::WalStatus {
500 self.wal_status()
501 }
502
503 fn wal_checkpoint(&self) -> Result<()> {
504 self.wal_checkpoint()
505 }
506}
507
/// Tabular result of a query: column metadata, rows, and optional metrics.
#[derive(Debug)]
pub struct QueryResult {
    /// Column names, in output order.
    pub columns: Vec<String>,
    /// Logical type of each column. `QueryResult::new` fills this with one
    /// `LogicalType::Any` per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// Result rows; each row is a vector of values.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Execution time in milliseconds; `None` until metrics are attached.
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned; `None` until metrics are attached.
    pub rows_scanned: Option<u64>,
}
550
551impl QueryResult {
552 #[must_use]
554 pub fn new(columns: Vec<String>) -> Self {
555 let len = columns.len();
556 Self {
557 columns,
558 column_types: vec![grafeo_common::types::LogicalType::Any; len],
559 rows: Vec::new(),
560 execution_time_ms: None,
561 rows_scanned: None,
562 }
563 }
564
565 #[must_use]
567 pub fn with_types(
568 columns: Vec<String>,
569 column_types: Vec<grafeo_common::types::LogicalType>,
570 ) -> Self {
571 Self {
572 columns,
573 column_types,
574 rows: Vec::new(),
575 execution_time_ms: None,
576 rows_scanned: None,
577 }
578 }
579
580 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
582 self.execution_time_ms = Some(execution_time_ms);
583 self.rows_scanned = Some(rows_scanned);
584 self
585 }
586
587 #[must_use]
589 pub fn execution_time_ms(&self) -> Option<f64> {
590 self.execution_time_ms
591 }
592
593 #[must_use]
595 pub fn rows_scanned(&self) -> Option<u64> {
596 self.rows_scanned
597 }
598
599 #[must_use]
601 pub fn row_count(&self) -> usize {
602 self.rows.len()
603 }
604
605 #[must_use]
607 pub fn column_count(&self) -> usize {
608 self.columns.len()
609 }
610
611 #[must_use]
613 pub fn is_empty(&self) -> bool {
614 self.rows.is_empty()
615 }
616
617 pub fn scalar<T: FromValue>(&self) -> Result<T> {
626 if self.rows.len() != 1 || self.columns.len() != 1 {
627 return Err(grafeo_common::utils::error::Error::InvalidValue(
628 "Expected single value".to_string(),
629 ));
630 }
631 T::from_value(&self.rows[0][0])
632 }
633
634 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
636 self.rows.iter()
637 }
638}
639
/// Conversion from a borrowed database `Value` into a concrete Rust type.
///
/// Used as the bound on `QueryResult::scalar`.
pub trait FromValue: Sized {
    /// Converts `value` into `Self`, or returns an error when the conversion
    /// is not possible.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
648
649impl FromValue for i64 {
650 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
651 value
652 .as_int64()
653 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
654 expected: "INT64".to_string(),
655 found: value.type_name().to_string(),
656 })
657 }
658}
659
660impl FromValue for f64 {
661 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
662 value
663 .as_float64()
664 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
665 expected: "FLOAT64".to_string(),
666 found: value.type_name().to_string(),
667 })
668 }
669}
670
671impl FromValue for String {
672 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
673 value.as_str().map(String::from).ok_or_else(|| {
674 grafeo_common::utils::error::Error::TypeMismatch {
675 expected: "STRING".to_string(),
676 found: value.type_name().to_string(),
677 }
678 })
679 }
680}
681
682impl FromValue for bool {
683 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
684 value
685 .as_bool()
686 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
687 expected: "BOOL".to_string(),
688 found: value.type_name().to_string(),
689 })
690 }
691}
692
// Unit tests for database construction, WAL-backed persistence, query
// metrics and (with the `cdc` feature) change history.
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh in-memory database contains no nodes and no edges.
    #[test]
    fn test_create_in_memory_database() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
    }

    // Settings applied through the config builder are visible via `config()`.
    #[test]
    fn test_database_config() {
        let config = Config::in_memory().with_threads(4).with_query_logging();

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().threads, 4);
        assert!(db.config().query_logging);
    }

    // Creating a session on an in-memory database must not panic.
    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        let _session = db.session();
    }

    // Data written before `close` is visible after reopening the same path.
    #[cfg(feature = "wal")]
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_db");

        {
            // First open: create two nodes and one edge, then close.
            let db = GrafeoDB::open(&db_path).unwrap();

            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));

            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));

            let _edge = db.create_edge(alice, bob, "KNOWS");

            db.close().unwrap();
        }

        {
            // Second open: the data must have been recovered.
            let db = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(db.node_count(), 2);
            assert_eq!(db.edge_count(), 1);

            // The test relies on node ids having been assigned as 0 and 1.
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node0.is_some());

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node1.is_some());
        }
    }

    // Mutations on a persistent database produce WAL records.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // Only checked when a WAL is actually present.
        if let Some(wal) = db.wal() {
            assert!(wal.record_count() > 0);
        }

        db.close().unwrap();
    }

    // Data accumulates across several open/close cycles of the same path.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_recovery_multiple_sessions() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("multi_session_db");

        {
            // First open: create Alice.
            let db = GrafeoDB::open(&db_path).unwrap();
            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));
            db.close().unwrap();
        }

        {
            // Second open: Alice was recovered; add Bob.
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 1);
            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));
            db.close().unwrap();
        }

        {
            // Third open: both nodes exist and kept their label.
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 2);

            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
        }
    }

    // Creates, deletes and property writes replay to the same final state.
    #[cfg(feature = "wal")]
    #[test]
    fn test_database_consistency_after_mutations() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("consistency_db");

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Three nodes connected a -> b -> c.
            let a = db.create_node(&["Node"]);
            let b = db.create_node(&["Node"]);
            let c = db.create_node(&["Node"]);

            let e1 = db.create_edge(a, b, "LINKS");
            let _e2 = db.create_edge(b, c, "LINKS");

            // Remove one edge and the middle node.
            db.delete_edge(e1);
            db.delete_node(b);

            // Set properties on the two surviving nodes.
            db.set_node_property(a, "value", Value::Int64(1));
            db.set_node_property(c, "value", Value::Int64(3));

            db.close().unwrap();
        }

        {
            // After reopening: ids 0 and 2 exist, id 1 (the deleted node) does not.
            let db = GrafeoDB::open(&db_path).unwrap();

            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node_a.is_some());

            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
            assert!(node_c.is_some());

            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node_b.is_none());
        }
    }

    // Calling `close` twice succeeds both times.
    #[cfg(feature = "wal")]
    #[test]
    fn test_close_is_idempotent() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("close_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();
        db.create_node(&["Test"]);

        // First close performs the shutdown.
        assert!(db.close().is_ok());

        // Second close is a no-op.
        assert!(db.close().is_ok());
    }

    // A query over existing data reports execution time and rows scanned.
    // The assertions only run when the `gql` feature is enabled.
    #[test]
    fn test_query_result_has_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert!(result.execution_time_ms.unwrap() >= 0.0);
            assert_eq!(result.rows_scanned.unwrap(), 2);
        }
    }

    // A query matching nothing still reports metrics, with zero rows scanned.
    // The assertions only run when the `gql` feature is enabled.
    #[test]
    fn test_empty_query_result_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert_eq!(result.rows_scanned.unwrap(), 0);
        }
    }

    // Change-data-capture tests; compiled only with the `cdc` feature.
    #[cfg(feature = "cdc")]
    mod cdc_integration {
        use super::*;

        // Create, two property writes and delete yield four history entries.
        #[test]
        fn test_node_lifecycle_history() {
            let db = GrafeoDB::new_in_memory();

            let id = db.create_node(&["Person"]);
            db.set_node_property(id, "name", "Alice".into());
            db.set_node_property(id, "name", "Bob".into());
            db.delete_node(id);

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 4);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            // The first write has no previous value.
            assert!(history[1].before.is_none());
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
            // The second write records the value it replaced.
            assert!(history[2].before.is_some());
            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
        }

        // Create, one property write and delete yield three history entries.
        #[test]
        fn test_edge_lifecycle_history() {
            let db = GrafeoDB::new_in_memory();

            let alice = db.create_node(&["Person"]);
            let bob = db.create_node(&["Person"]);
            let edge = db.create_edge(alice, bob, "KNOWS");
            db.set_edge_property(edge, "since", 2024i64.into());
            db.delete_edge(edge);

            let history = db.history(edge).unwrap();
            assert_eq!(history.len(), 3);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
        }

        // Creating a node with initial properties is one Create event that
        // carries both properties in `after`.
        #[test]
        fn test_create_node_with_props_cdc() {
            let db = GrafeoDB::new_in_memory();

            let id = db.create_node_with_props(
                &["Person"],
                vec![
                    ("name", grafeo_common::types::Value::from("Alice")),
                    ("age", grafeo_common::types::Value::from(30i64)),
                ],
            );

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 1);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            let after = history[0].after.as_ref().unwrap();
            assert_eq!(after.len(), 2);
        }

        // Querying the full epoch range returns every change: two creates
        // and one property write.
        #[test]
        fn test_changes_between() {
            let db = GrafeoDB::new_in_memory();

            let id1 = db.create_node(&["A"]);
            let _id2 = db.create_node(&["B"]);
            db.set_node_property(id1, "x", 1i64.into());

            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            assert_eq!(changes.len(), 3);
        }
    }
}