1mod admin;
15#[cfg(feature = "async-storage")]
16mod async_ops;
17#[cfg(feature = "async-storage")]
18pub(crate) mod async_wal_store;
19#[cfg(feature = "cdc")]
20pub(crate) mod cdc_store;
21mod crud;
22#[cfg(feature = "embed")]
23mod embed;
24mod import;
25mod index;
26mod persistence;
27mod query;
28#[cfg(feature = "rdf")]
29mod rdf_ops;
30mod search;
31#[cfg(feature = "wal")]
32pub(crate) mod wal_store;
33
34use grafeo_common::grafeo_error;
35#[cfg(feature = "wal")]
36use std::path::Path;
37use std::sync::Arc;
38use std::sync::atomic::AtomicUsize;
39
40use parking_lot::RwLock;
41
42#[cfg(feature = "grafeo-file")]
43use grafeo_adapters::storage::file::GrafeoFileManager;
44#[cfg(feature = "wal")]
45use grafeo_adapters::storage::wal::{
46 DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
47};
48use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
49use grafeo_common::utils::error::Result;
50use grafeo_core::graph::lpg::LpgStore;
51#[cfg(feature = "rdf")]
52use grafeo_core::graph::rdf::RdfStore;
53use grafeo_core::graph::{GraphStore, GraphStoreMut};
54
55use crate::catalog::Catalog;
56use crate::config::Config;
57use crate::query::cache::QueryCache;
58use crate::session::Session;
59use crate::transaction::TransactionManager;
60
61pub struct GrafeoDB {
84 pub(super) config: Config,
86 pub(super) store: Option<Arc<LpgStore>>,
88 pub(super) catalog: Arc<Catalog>,
90 #[cfg(feature = "rdf")]
92 pub(super) rdf_store: Arc<RdfStore>,
93 pub(super) transaction_manager: Arc<TransactionManager>,
95 pub(super) buffer_manager: Arc<BufferManager>,
97 #[cfg(feature = "wal")]
99 pub(super) wal: Option<Arc<LpgWal>>,
100 #[cfg(feature = "wal")]
104 pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
105 pub(super) query_cache: Arc<QueryCache>,
107 pub(super) commit_counter: Arc<AtomicUsize>,
109 pub(super) is_open: RwLock<bool>,
111 #[cfg(feature = "cdc")]
113 pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
114 #[cfg(feature = "cdc")]
116 cdc_enabled: std::sync::atomic::AtomicBool,
117 #[cfg(feature = "embed")]
119 pub(super) embedding_models:
120 RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
121 #[cfg(feature = "grafeo-file")]
123 pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
124 pub(super) external_read_store: Option<Arc<dyn GraphStore>>,
127 pub(super) external_write_store: Option<Arc<dyn GraphStoreMut>>,
130 #[cfg(feature = "metrics")]
132 pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
133 current_graph: RwLock<Option<String>>,
137 current_schema: RwLock<Option<String>>,
141 read_only: bool,
144}
145
146impl GrafeoDB {
147 fn lpg_store(&self) -> &Arc<LpgStore> {
155 self.store.as_ref().expect(
156 "no built-in LpgStore: this GrafeoDB was created with an external store \
157 (with_store / with_read_store). Use session() or graph_store() instead.",
158 )
159 }
160
161 #[cfg(feature = "cdc")]
163 #[inline]
164 pub(super) fn cdc_active(&self) -> bool {
165 self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed)
166 }
167
168 #[must_use]
189 pub fn new_in_memory() -> Self {
190 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
191 }
192
193 #[cfg(feature = "wal")]
212 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
213 Self::with_config(Config::persistent(path.as_ref()))
214 }
215
216 #[cfg(feature = "grafeo-file")]
241 pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
242 Self::with_config(Config::read_only(path.as_ref()))
243 }
244
245 pub fn with_config(config: Config) -> Result<Self> {
269 config
271 .validate()
272 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
273
274 let store = Arc::new(LpgStore::new()?);
275 #[cfg(feature = "rdf")]
276 let rdf_store = Arc::new(RdfStore::new());
277 let transaction_manager = Arc::new(TransactionManager::new());
278
279 let buffer_config = BufferManagerConfig {
281 budget: config.memory_limit.unwrap_or_else(|| {
282 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
283 }),
284 spill_path: config
285 .spill_path
286 .clone()
287 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
288 ..BufferManagerConfig::default()
289 };
290 let buffer_manager = BufferManager::new(buffer_config);
291
292 let catalog = Arc::new(Catalog::new());
294
295 let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;
296
297 #[cfg(feature = "grafeo-file")]
299 let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
300 if let Some(ref db_path) = config.path {
302 if db_path.exists() && db_path.is_file() {
303 let fm = GrafeoFileManager::open_read_only(db_path)?;
304 let snapshot_data = fm.read_snapshot()?;
305 if !snapshot_data.is_empty() {
306 Self::apply_snapshot_data(
307 &store,
308 &catalog,
309 #[cfg(feature = "rdf")]
310 &rdf_store,
311 &snapshot_data,
312 )?;
313 }
314 Some(Arc::new(fm))
315 } else {
316 return Err(grafeo_common::utils::error::Error::Internal(format!(
317 "read-only open requires an existing .grafeo file: {}",
318 db_path.display()
319 )));
320 }
321 } else {
322 return Err(grafeo_common::utils::error::Error::Internal(
323 "read-only mode requires a database path".to_string(),
324 ));
325 }
326 } else if let Some(ref db_path) = config.path {
327 if Self::should_use_single_file(db_path, config.storage_format) {
332 let fm = if db_path.exists() && db_path.is_file() {
333 GrafeoFileManager::open(db_path)?
334 } else if !db_path.exists() {
335 GrafeoFileManager::create(db_path)?
336 } else {
337 return Err(grafeo_common::utils::error::Error::Internal(format!(
339 "path exists but is not a file: {}",
340 db_path.display()
341 )));
342 };
343
344 let snapshot_data = fm.read_snapshot()?;
346 if !snapshot_data.is_empty() {
347 Self::apply_snapshot_data(
348 &store,
349 &catalog,
350 #[cfg(feature = "rdf")]
351 &rdf_store,
352 &snapshot_data,
353 )?;
354 }
355
356 #[cfg(feature = "wal")]
358 if config.wal_enabled && fm.has_sidecar_wal() {
359 let recovery = WalRecovery::new(fm.sidecar_wal_path());
360 let records = recovery.recover()?;
361 Self::apply_wal_records(
362 &store,
363 &catalog,
364 #[cfg(feature = "rdf")]
365 &rdf_store,
366 &records,
367 )?;
368 }
369
370 Some(Arc::new(fm))
371 } else {
372 None
373 }
374 } else {
375 None
376 };
377
378 #[cfg(feature = "wal")]
381 let wal = if is_read_only {
382 None
383 } else if config.wal_enabled {
384 if let Some(ref db_path) = config.path {
385 #[cfg(feature = "grafeo-file")]
387 let wal_path = if let Some(ref fm) = file_manager {
388 let p = fm.sidecar_wal_path();
389 std::fs::create_dir_all(&p)?;
390 p
391 } else {
392 std::fs::create_dir_all(db_path)?;
394 db_path.join("wal")
395 };
396
397 #[cfg(not(feature = "grafeo-file"))]
398 let wal_path = {
399 std::fs::create_dir_all(db_path)?;
400 db_path.join("wal")
401 };
402
403 #[cfg(feature = "grafeo-file")]
405 let is_single_file = file_manager.is_some();
406 #[cfg(not(feature = "grafeo-file"))]
407 let is_single_file = false;
408
409 if !is_single_file && wal_path.exists() {
410 let recovery = WalRecovery::new(&wal_path);
411 let records = recovery.recover()?;
412 Self::apply_wal_records(
413 &store,
414 &catalog,
415 #[cfg(feature = "rdf")]
416 &rdf_store,
417 &records,
418 )?;
419 }
420
421 let wal_durability = match config.wal_durability {
423 crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
424 crate::config::DurabilityMode::Batch {
425 max_delay_ms,
426 max_records,
427 } => WalDurabilityMode::Batch {
428 max_delay_ms,
429 max_records,
430 },
431 crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
432 WalDurabilityMode::Adaptive { target_interval_ms }
433 }
434 crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
435 };
436 let wal_config = WalConfig {
437 durability: wal_durability,
438 ..WalConfig::default()
439 };
440 let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
441 Some(Arc::new(wal_manager))
442 } else {
443 None
444 }
445 } else {
446 None
447 };
448
449 let query_cache = Arc::new(QueryCache::default());
451
452 #[cfg(feature = "temporal")]
455 transaction_manager.sync_epoch(store.current_epoch());
456
457 #[cfg(feature = "cdc")]
458 let cdc_enabled_val = config.cdc_enabled;
459
460 Ok(Self {
461 config,
462 store: Some(store),
463 catalog,
464 #[cfg(feature = "rdf")]
465 rdf_store,
466 transaction_manager,
467 buffer_manager,
468 #[cfg(feature = "wal")]
469 wal,
470 #[cfg(feature = "wal")]
471 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
472 query_cache,
473 commit_counter: Arc::new(AtomicUsize::new(0)),
474 is_open: RwLock::new(true),
475 #[cfg(feature = "cdc")]
476 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
477 #[cfg(feature = "cdc")]
478 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
479 #[cfg(feature = "embed")]
480 embedding_models: RwLock::new(hashbrown::HashMap::new()),
481 #[cfg(feature = "grafeo-file")]
482 file_manager,
483 external_read_store: None,
484 external_write_store: None,
485 #[cfg(feature = "metrics")]
486 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
487 current_graph: RwLock::new(None),
488 current_schema: RwLock::new(None),
489 read_only: is_read_only,
490 })
491 }
492
493 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
522 config
523 .validate()
524 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
525
526 let transaction_manager = Arc::new(TransactionManager::new());
527
528 let buffer_config = BufferManagerConfig {
529 budget: config.memory_limit.unwrap_or_else(|| {
530 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
531 }),
532 spill_path: None,
533 ..BufferManagerConfig::default()
534 };
535 let buffer_manager = BufferManager::new(buffer_config);
536
537 let query_cache = Arc::new(QueryCache::default());
538
539 #[cfg(feature = "cdc")]
540 let cdc_enabled_val = config.cdc_enabled;
541
542 Ok(Self {
543 config,
544 store: None,
545 catalog: Arc::new(Catalog::new()),
546 #[cfg(feature = "rdf")]
547 rdf_store: Arc::new(RdfStore::new()),
548 transaction_manager,
549 buffer_manager,
550 #[cfg(feature = "wal")]
551 wal: None,
552 #[cfg(feature = "wal")]
553 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
554 query_cache,
555 commit_counter: Arc::new(AtomicUsize::new(0)),
556 is_open: RwLock::new(true),
557 #[cfg(feature = "cdc")]
558 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
559 #[cfg(feature = "cdc")]
560 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
561 #[cfg(feature = "embed")]
562 embedding_models: RwLock::new(hashbrown::HashMap::new()),
563 #[cfg(feature = "grafeo-file")]
564 file_manager: None,
565 external_read_store: Some(Arc::clone(&store) as Arc<dyn GraphStore>),
566 external_write_store: Some(store),
567 #[cfg(feature = "metrics")]
568 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
569 current_graph: RwLock::new(None),
570 current_schema: RwLock::new(None),
571 read_only: false,
572 })
573 }
574
575 pub fn with_read_store(store: Arc<dyn GraphStore>, config: Config) -> Result<Self> {
600 config
601 .validate()
602 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
603
604 let transaction_manager = Arc::new(TransactionManager::new());
605
606 let buffer_config = BufferManagerConfig {
607 budget: config.memory_limit.unwrap_or_else(|| {
608 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
609 }),
610 spill_path: None,
611 ..BufferManagerConfig::default()
612 };
613 let buffer_manager = BufferManager::new(buffer_config);
614
615 let query_cache = Arc::new(QueryCache::default());
616
617 #[cfg(feature = "cdc")]
618 let cdc_enabled_val = config.cdc_enabled;
619
620 Ok(Self {
621 config,
622 store: None,
623 catalog: Arc::new(Catalog::new()),
624 #[cfg(feature = "rdf")]
625 rdf_store: Arc::new(RdfStore::new()),
626 transaction_manager,
627 buffer_manager,
628 #[cfg(feature = "wal")]
629 wal: None,
630 #[cfg(feature = "wal")]
631 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
632 query_cache,
633 commit_counter: Arc::new(AtomicUsize::new(0)),
634 is_open: RwLock::new(true),
635 #[cfg(feature = "cdc")]
636 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
637 #[cfg(feature = "cdc")]
638 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
639 #[cfg(feature = "embed")]
640 embedding_models: RwLock::new(hashbrown::HashMap::new()),
641 #[cfg(feature = "grafeo-file")]
642 file_manager: None,
643 external_read_store: Some(store),
644 external_write_store: None,
645 #[cfg(feature = "metrics")]
646 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
647 current_graph: RwLock::new(None),
648 current_schema: RwLock::new(None),
649 read_only: true,
650 })
651 }
652
653 #[cfg(feature = "compact-store")]
671 pub fn compact(&mut self) -> Result<()> {
672 use grafeo_core::graph::compact::from_graph_store;
673
674 let current_store = self.graph_store();
675 let compact = from_graph_store(current_store.as_ref())
676 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
677
678 self.external_read_store = Some(Arc::new(compact) as Arc<dyn GraphStore>);
679 self.external_write_store = None;
680 self.store = None;
681 self.read_only = true;
682 self.query_cache = Arc::new(QueryCache::default());
683
684 Ok(())
685 }
686
687 #[cfg(feature = "wal")]
693 fn apply_wal_records(
694 store: &Arc<LpgStore>,
695 catalog: &Catalog,
696 #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
697 records: &[WalRecord],
698 ) -> Result<()> {
699 use crate::catalog::{
700 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
701 };
702 use grafeo_common::utils::error::Error;
703
704 let mut current_graph: Option<String> = None;
707 let mut target_store: Arc<LpgStore> = Arc::clone(store);
708
709 for record in records {
710 match record {
711 WalRecord::CreateNamedGraph { name } => {
713 let _ = store.create_graph(name);
714 }
715 WalRecord::DropNamedGraph { name } => {
716 store.drop_graph(name);
717 if current_graph.as_deref() == Some(name.as_str()) {
719 current_graph = None;
720 target_store = Arc::clone(store);
721 }
722 }
723 WalRecord::SwitchGraph { name } => {
724 current_graph.clone_from(name);
725 target_store = match ¤t_graph {
726 None => Arc::clone(store),
727 Some(graph_name) => store
728 .graph_or_create(graph_name)
729 .map_err(|e| Error::Internal(e.to_string()))?,
730 };
731 }
732
733 WalRecord::CreateNode { id, labels } => {
735 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
736 target_store.create_node_with_id(*id, &label_refs)?;
737 }
738 WalRecord::DeleteNode { id } => {
739 target_store.delete_node(*id);
740 }
741 WalRecord::CreateEdge {
742 id,
743 src,
744 dst,
745 edge_type,
746 } => {
747 target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
748 }
749 WalRecord::DeleteEdge { id } => {
750 target_store.delete_edge(*id);
751 }
752 WalRecord::SetNodeProperty { id, key, value } => {
753 target_store.set_node_property(*id, key, value.clone());
754 }
755 WalRecord::SetEdgeProperty { id, key, value } => {
756 target_store.set_edge_property(*id, key, value.clone());
757 }
758 WalRecord::AddNodeLabel { id, label } => {
759 target_store.add_label(*id, label);
760 }
761 WalRecord::RemoveNodeLabel { id, label } => {
762 target_store.remove_label(*id, label);
763 }
764 WalRecord::RemoveNodeProperty { id, key } => {
765 target_store.remove_node_property(*id, key);
766 }
767 WalRecord::RemoveEdgeProperty { id, key } => {
768 target_store.remove_edge_property(*id, key);
769 }
770
771 WalRecord::CreateNodeType {
773 name,
774 properties,
775 constraints,
776 } => {
777 let def = NodeTypeDefinition {
778 name: name.clone(),
779 properties: properties
780 .iter()
781 .map(|(n, t, nullable)| TypedProperty {
782 name: n.clone(),
783 data_type: PropertyDataType::from_type_name(t),
784 nullable: *nullable,
785 default_value: None,
786 })
787 .collect(),
788 constraints: constraints
789 .iter()
790 .map(|(kind, props)| match kind.as_str() {
791 "unique" => TypeConstraint::Unique(props.clone()),
792 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
793 "not_null" if !props.is_empty() => {
794 TypeConstraint::NotNull(props[0].clone())
795 }
796 _ => TypeConstraint::Unique(props.clone()),
797 })
798 .collect(),
799 parent_types: Vec::new(),
800 };
801 let _ = catalog.register_node_type(def);
802 }
803 WalRecord::DropNodeType { name } => {
804 let _ = catalog.drop_node_type(name);
805 }
806 WalRecord::CreateEdgeType {
807 name,
808 properties,
809 constraints,
810 } => {
811 let def = EdgeTypeDefinition {
812 name: name.clone(),
813 properties: properties
814 .iter()
815 .map(|(n, t, nullable)| TypedProperty {
816 name: n.clone(),
817 data_type: PropertyDataType::from_type_name(t),
818 nullable: *nullable,
819 default_value: None,
820 })
821 .collect(),
822 constraints: constraints
823 .iter()
824 .map(|(kind, props)| match kind.as_str() {
825 "unique" => TypeConstraint::Unique(props.clone()),
826 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
827 "not_null" if !props.is_empty() => {
828 TypeConstraint::NotNull(props[0].clone())
829 }
830 _ => TypeConstraint::Unique(props.clone()),
831 })
832 .collect(),
833 source_node_types: Vec::new(),
834 target_node_types: Vec::new(),
835 };
836 let _ = catalog.register_edge_type_def(def);
837 }
838 WalRecord::DropEdgeType { name } => {
839 let _ = catalog.drop_edge_type_def(name);
840 }
841 WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
842 }
845 WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
846 }
849 WalRecord::CreateGraphType {
850 name,
851 node_types,
852 edge_types,
853 open,
854 } => {
855 use crate::catalog::GraphTypeDefinition;
856 let def = GraphTypeDefinition {
857 name: name.clone(),
858 allowed_node_types: node_types.clone(),
859 allowed_edge_types: edge_types.clone(),
860 open: *open,
861 };
862 let _ = catalog.register_graph_type(def);
863 }
864 WalRecord::DropGraphType { name } => {
865 let _ = catalog.drop_graph_type(name);
866 }
867 WalRecord::CreateSchema { name } => {
868 let _ = catalog.register_schema_namespace(name.clone());
869 }
870 WalRecord::DropSchema { name } => {
871 let _ = catalog.drop_schema_namespace(name);
872 }
873
874 WalRecord::AlterNodeType { name, alterations } => {
875 for (action, prop_name, type_name, nullable) in alterations {
876 match action.as_str() {
877 "add" => {
878 let prop = TypedProperty {
879 name: prop_name.clone(),
880 data_type: PropertyDataType::from_type_name(type_name),
881 nullable: *nullable,
882 default_value: None,
883 };
884 let _ = catalog.alter_node_type_add_property(name, prop);
885 }
886 "drop" => {
887 let _ = catalog.alter_node_type_drop_property(name, prop_name);
888 }
889 _ => {}
890 }
891 }
892 }
893 WalRecord::AlterEdgeType { name, alterations } => {
894 for (action, prop_name, type_name, nullable) in alterations {
895 match action.as_str() {
896 "add" => {
897 let prop = TypedProperty {
898 name: prop_name.clone(),
899 data_type: PropertyDataType::from_type_name(type_name),
900 nullable: *nullable,
901 default_value: None,
902 };
903 let _ = catalog.alter_edge_type_add_property(name, prop);
904 }
905 "drop" => {
906 let _ = catalog.alter_edge_type_drop_property(name, prop_name);
907 }
908 _ => {}
909 }
910 }
911 }
912 WalRecord::AlterGraphType { name, alterations } => {
913 for (action, type_name) in alterations {
914 match action.as_str() {
915 "add_node" => {
916 let _ =
917 catalog.alter_graph_type_add_node_type(name, type_name.clone());
918 }
919 "drop_node" => {
920 let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
921 }
922 "add_edge" => {
923 let _ =
924 catalog.alter_graph_type_add_edge_type(name, type_name.clone());
925 }
926 "drop_edge" => {
927 let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
928 }
929 _ => {}
930 }
931 }
932 }
933
934 WalRecord::CreateProcedure {
935 name,
936 params,
937 returns,
938 body,
939 } => {
940 use crate::catalog::ProcedureDefinition;
941 let def = ProcedureDefinition {
942 name: name.clone(),
943 params: params.clone(),
944 returns: returns.clone(),
945 body: body.clone(),
946 };
947 let _ = catalog.register_procedure(def);
948 }
949 WalRecord::DropProcedure { name } => {
950 let _ = catalog.drop_procedure(name);
951 }
952
953 #[cfg(feature = "rdf")]
955 WalRecord::InsertRdfTriple { .. }
956 | WalRecord::DeleteRdfTriple { .. }
957 | WalRecord::ClearRdfGraph { .. }
958 | WalRecord::CreateRdfGraph { .. }
959 | WalRecord::DropRdfGraph { .. } => {
960 rdf_ops::replay_rdf_wal_record(rdf_store, record);
961 }
962 #[cfg(not(feature = "rdf"))]
963 WalRecord::InsertRdfTriple { .. }
964 | WalRecord::DeleteRdfTriple { .. }
965 | WalRecord::ClearRdfGraph { .. }
966 | WalRecord::CreateRdfGraph { .. }
967 | WalRecord::DropRdfGraph { .. } => {}
968
969 WalRecord::TransactionCommit { .. } => {
970 #[cfg(feature = "temporal")]
974 {
975 target_store.new_epoch();
976 }
977 }
978 WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
979 }
982 }
983 }
984 Ok(())
985 }
986
987 #[cfg(feature = "grafeo-file")]
993 fn should_use_single_file(
994 path: &std::path::Path,
995 configured: crate::config::StorageFormat,
996 ) -> bool {
997 use crate::config::StorageFormat;
998 match configured {
999 StorageFormat::SingleFile => true,
1000 StorageFormat::WalDirectory => false,
1001 StorageFormat::Auto => {
1002 if path.is_file() {
1004 if let Ok(mut f) = std::fs::File::open(path) {
1005 use std::io::Read;
1006 let mut magic = [0u8; 4];
1007 if f.read_exact(&mut magic).is_ok()
1008 && magic == grafeo_adapters::storage::file::MAGIC
1009 {
1010 return true;
1011 }
1012 }
1013 return false;
1014 }
1015 if path.is_dir() {
1017 return false;
1018 }
1019 path.extension().is_some_and(|ext| ext == "grafeo")
1021 }
1022 }
1023 }
1024
1025 #[cfg(feature = "grafeo-file")]
1027 fn apply_snapshot_data(
1028 store: &Arc<LpgStore>,
1029 catalog: &Arc<crate::catalog::Catalog>,
1030 #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
1031 data: &[u8],
1032 ) -> Result<()> {
1033 persistence::load_snapshot_into_store(
1034 store,
1035 catalog,
1036 #[cfg(feature = "rdf")]
1037 rdf_store,
1038 data,
1039 )
1040 }
1041
1042 #[must_use]
1070 pub fn session(&self) -> Session {
1071 self.create_session_inner(None)
1072 }
1073
1074 #[cfg(feature = "cdc")]
1093 #[must_use]
1094 pub fn session_with_cdc(&self, cdc_enabled: bool) -> Session {
1095 self.create_session_inner(Some(cdc_enabled))
1096 }
1097
1098 #[must_use]
1105 pub fn session_read_only(&self) -> Session {
1106 self.create_session_inner_opts(None, true)
1107 }
1108
1109 #[allow(unused_variables)] fn create_session_inner(&self, cdc_override: Option<bool>) -> Session {
1115 self.create_session_inner_opts(cdc_override, false)
1116 }
1117
1118 #[allow(unused_variables)]
1120 fn create_session_inner_opts(
1121 &self,
1122 cdc_override: Option<bool>,
1123 force_read_only: bool,
1124 ) -> Session {
1125 let session_cfg = || crate::session::SessionConfig {
1126 transaction_manager: Arc::clone(&self.transaction_manager),
1127 query_cache: Arc::clone(&self.query_cache),
1128 catalog: Arc::clone(&self.catalog),
1129 adaptive_config: self.config.adaptive.clone(),
1130 factorized_execution: self.config.factorized_execution,
1131 graph_model: self.config.graph_model,
1132 query_timeout: self.config.query_timeout,
1133 commit_counter: Arc::clone(&self.commit_counter),
1134 gc_interval: self.config.gc_interval,
1135 read_only: self.read_only || force_read_only,
1136 };
1137
1138 if let Some(ref ext_read) = self.external_read_store {
1139 return Session::with_external_store(
1140 Arc::clone(ext_read),
1141 self.external_write_store.as_ref().map(Arc::clone),
1142 session_cfg(),
1143 )
1144 .expect("arena allocation for external store session");
1145 }
1146
1147 #[cfg(feature = "rdf")]
1148 let mut session = Session::with_rdf_store_and_adaptive(
1149 Arc::clone(self.lpg_store()),
1150 Arc::clone(&self.rdf_store),
1151 session_cfg(),
1152 );
1153 #[cfg(not(feature = "rdf"))]
1154 let mut session = Session::with_adaptive(Arc::clone(self.lpg_store()), session_cfg());
1155
1156 #[cfg(feature = "wal")]
1157 if let Some(ref wal) = self.wal {
1158 session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
1159 }
1160
1161 #[cfg(feature = "cdc")]
1162 {
1163 let should_enable = cdc_override.unwrap_or_else(|| self.cdc_active());
1164 if should_enable {
1165 session.set_cdc_log(Arc::clone(&self.cdc_log));
1166 }
1167 }
1168
1169 #[cfg(feature = "metrics")]
1170 {
1171 if let Some(ref m) = self.metrics {
1172 session.set_metrics(Arc::clone(m));
1173 m.session_created
1174 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1175 m.session_active
1176 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1177 }
1178 }
1179
1180 if let Some(ref graph) = *self.current_graph.read() {
1182 session.use_graph(graph);
1183 }
1184
1185 if let Some(ref schema) = *self.current_schema.read() {
1187 session.set_schema(schema);
1188 }
1189
1190 let _ = &mut session;
1192
1193 session
1194 }
1195
1196 #[must_use]
1202 pub fn current_graph(&self) -> Option<String> {
1203 self.current_graph.read().clone()
1204 }
1205
1206 pub fn set_current_graph(&self, name: Option<&str>) {
1211 *self.current_graph.write() = name.map(ToString::to_string);
1212 }
1213
1214 #[must_use]
1219 pub fn current_schema(&self) -> Option<String> {
1220 self.current_schema.read().clone()
1221 }
1222
1223 pub fn set_current_schema(&self, name: Option<&str>) {
1228 *self.current_schema.write() = name.map(ToString::to_string);
1229 }
1230
1231 #[must_use]
1233 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
1234 &self.config.adaptive
1235 }
1236
1237 #[must_use]
1239 pub fn is_read_only(&self) -> bool {
1240 self.read_only
1241 }
1242
1243 #[must_use]
1245 pub fn config(&self) -> &Config {
1246 &self.config
1247 }
1248
1249 #[must_use]
1251 pub fn graph_model(&self) -> crate::config::GraphModel {
1252 self.config.graph_model
1253 }
1254
1255 #[must_use]
1257 pub fn memory_limit(&self) -> Option<usize> {
1258 self.config.memory_limit
1259 }
1260
1261 #[cfg(feature = "metrics")]
1266 #[must_use]
1267 pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1268 let mut snapshot = self
1269 .metrics
1270 .as_ref()
1271 .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1272
1273 let cache_stats = self.query_cache.stats();
1275 snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1276 snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1277 snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1278 snapshot.cache_invalidations = cache_stats.invalidations;
1279
1280 snapshot
1281 }
1282
1283 #[cfg(feature = "metrics")]
1287 #[must_use]
1288 pub fn metrics_prometheus(&self) -> String {
1289 self.metrics
1290 .as_ref()
1291 .map_or_else(String::new, |m| m.to_prometheus())
1292 }
1293
1294 #[cfg(feature = "metrics")]
1296 pub fn reset_metrics(&self) {
1297 if let Some(ref m) = self.metrics {
1298 m.reset();
1299 }
1300 self.query_cache.reset_stats();
1301 }
1302
1303 #[must_use]
1311 pub fn store(&self) -> &Arc<LpgStore> {
1312 self.lpg_store()
1313 }
1314
1315 #[allow(dead_code)] fn active_store(&self) -> Arc<LpgStore> {
1322 let store = self.lpg_store();
1323 let graph_name = self.current_graph.read().clone();
1324 match graph_name {
1325 None => Arc::clone(store),
1326 Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(store),
1327 Some(ref name) => store.graph(name).unwrap_or_else(|| Arc::clone(store)),
1328 }
1329 }
1330
1331 pub fn create_graph(&self, name: &str) -> Result<bool> {
1339 Ok(self.lpg_store().create_graph(name)?)
1340 }
1341
1342 pub fn drop_graph(&self, name: &str) -> bool {
1344 self.lpg_store().drop_graph(name)
1345 }
1346
1347 #[must_use]
1349 pub fn list_graphs(&self) -> Vec<String> {
1350 self.lpg_store().graph_names()
1351 }
1352
1353 #[must_use]
1362 pub fn graph_store(&self) -> Arc<dyn GraphStore> {
1363 if let Some(ref ext_read) = self.external_read_store {
1364 Arc::clone(ext_read)
1365 } else {
1366 Arc::clone(self.lpg_store()) as Arc<dyn GraphStore>
1367 }
1368 }
1369
1370 #[must_use]
1375 pub fn graph_store_mut(&self) -> Option<Arc<dyn GraphStoreMut>> {
1376 if self.external_read_store.is_some() {
1377 self.external_write_store.as_ref().map(Arc::clone)
1378 } else {
1379 Some(Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreMut>)
1380 }
1381 }
1382
1383 pub fn gc(&self) {
1389 let min_epoch = self.transaction_manager.min_active_epoch();
1390 self.lpg_store().gc_versions(min_epoch);
1391 self.transaction_manager.gc();
1392 }
1393
1394 #[must_use]
1396 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
1397 &self.buffer_manager
1398 }
1399
1400 #[must_use]
1402 pub fn query_cache(&self) -> &Arc<QueryCache> {
1403 &self.query_cache
1404 }
1405
1406 pub fn clear_plan_cache(&self) {
1412 self.query_cache.clear();
1413 }
1414
1415 pub fn close(&self) -> Result<()> {
1429 let mut is_open = self.is_open.write();
1430 if !*is_open {
1431 return Ok(());
1432 }
1433
1434 if self.read_only {
1436 #[cfg(feature = "grafeo-file")]
1437 if let Some(ref fm) = self.file_manager {
1438 fm.close()?;
1439 }
1440 *is_open = false;
1441 return Ok(());
1442 }
1443
1444 #[cfg(feature = "grafeo-file")]
1448 let is_single_file = self.file_manager.is_some();
1449 #[cfg(not(feature = "grafeo-file"))]
1450 let is_single_file = false;
1451
1452 #[cfg(feature = "grafeo-file")]
1453 if let Some(ref fm) = self.file_manager {
1454 #[cfg(feature = "wal")]
1456 if let Some(ref wal) = self.wal {
1457 wal.sync()?;
1458 }
1459 self.checkpoint_to_file(fm)?;
1460
1461 #[cfg(feature = "wal")]
1464 if let Some(ref wal) = self.wal {
1465 wal.close_active_log();
1466 }
1467
1468 {
1469 use grafeo_core::testing::crash::maybe_crash;
1470 maybe_crash("close:before_remove_sidecar_wal");
1471 }
1472 fm.remove_sidecar_wal()?;
1473 fm.close()?;
1474 }
1475
1476 #[cfg(feature = "wal")]
1482 if !is_single_file && let Some(ref wal) = self.wal {
1483 let commit_tx = self
1485 .transaction_manager
1486 .last_assigned_transaction_id()
1487 .unwrap_or_else(|| self.transaction_manager.begin());
1488
1489 wal.log(&WalRecord::TransactionCommit {
1491 transaction_id: commit_tx,
1492 })?;
1493
1494 wal.sync()?;
1495 }
1496
1497 *is_open = false;
1498 Ok(())
1499 }
1500
1501 #[cfg(feature = "wal")]
1503 #[must_use]
1504 pub fn wal(&self) -> Option<&Arc<LpgWal>> {
1505 self.wal.as_ref()
1506 }
1507
1508 #[cfg(feature = "wal")]
1510 pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1511 if let Some(ref wal) = self.wal {
1512 wal.log(record)?;
1513 }
1514 Ok(())
1515 }
1516
1517 #[cfg(feature = "grafeo-file")]
1523 fn checkpoint_to_file(&self, fm: &GrafeoFileManager) -> Result<()> {
1524 use grafeo_core::testing::crash::maybe_crash;
1525
1526 maybe_crash("checkpoint_to_file:before_export");
1527 let snapshot_data = self.export_snapshot()?;
1528 maybe_crash("checkpoint_to_file:after_export");
1529
1530 let epoch = self.lpg_store().current_epoch();
1531 let transaction_id = self
1532 .transaction_manager
1533 .last_assigned_transaction_id()
1534 .map_or(0, |t| t.0);
1535 let node_count = self.lpg_store().node_count() as u64;
1536 let edge_count = self.lpg_store().edge_count() as u64;
1537
1538 fm.write_snapshot(
1539 &snapshot_data,
1540 epoch.0,
1541 transaction_id,
1542 node_count,
1543 edge_count,
1544 )?;
1545
1546 maybe_crash("checkpoint_to_file:after_write_snapshot");
1547 Ok(())
1548 }
1549
1550 #[cfg(feature = "grafeo-file")]
1552 #[must_use]
1553 pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
1554 self.file_manager.as_ref()
1555 }
1556}
1557
1558impl Drop for GrafeoDB {
1559 fn drop(&mut self) {
1560 if let Err(e) = self.close() {
1561 grafeo_error!("Error closing database: {}", e);
1562 }
1563 }
1564}
1565
1566impl crate::admin::AdminService for GrafeoDB {
1567 fn info(&self) -> crate::admin::DatabaseInfo {
1568 self.info()
1569 }
1570
1571 fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1572 self.detailed_stats()
1573 }
1574
1575 fn schema(&self) -> crate::admin::SchemaInfo {
1576 self.schema()
1577 }
1578
1579 fn validate(&self) -> crate::admin::ValidationResult {
1580 self.validate()
1581 }
1582
1583 fn wal_status(&self) -> crate::admin::WalStatus {
1584 self.wal_status()
1585 }
1586
1587 fn wal_checkpoint(&self) -> Result<()> {
1588 self.wal_checkpoint()
1589 }
1590}
1591
1592#[derive(Debug)]
1622pub struct QueryResult {
1623 pub columns: Vec<String>,
1625 pub column_types: Vec<grafeo_common::types::LogicalType>,
1627 pub rows: Vec<Vec<grafeo_common::types::Value>>,
1629 pub execution_time_ms: Option<f64>,
1631 pub rows_scanned: Option<u64>,
1633 pub status_message: Option<String>,
1635 pub gql_status: grafeo_common::utils::GqlStatus,
1637}
1638
1639impl QueryResult {
1640 #[must_use]
1642 pub fn empty() -> Self {
1643 Self {
1644 columns: Vec::new(),
1645 column_types: Vec::new(),
1646 rows: Vec::new(),
1647 execution_time_ms: None,
1648 rows_scanned: None,
1649 status_message: None,
1650 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1651 }
1652 }
1653
1654 #[must_use]
1656 pub fn status(msg: impl Into<String>) -> Self {
1657 Self {
1658 columns: Vec::new(),
1659 column_types: Vec::new(),
1660 rows: Vec::new(),
1661 execution_time_ms: None,
1662 rows_scanned: None,
1663 status_message: Some(msg.into()),
1664 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1665 }
1666 }
1667
1668 #[must_use]
1670 pub fn new(columns: Vec<String>) -> Self {
1671 let len = columns.len();
1672 Self {
1673 columns,
1674 column_types: vec![grafeo_common::types::LogicalType::Any; len],
1675 rows: Vec::new(),
1676 execution_time_ms: None,
1677 rows_scanned: None,
1678 status_message: None,
1679 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1680 }
1681 }
1682
1683 #[must_use]
1685 pub fn with_types(
1686 columns: Vec<String>,
1687 column_types: Vec<grafeo_common::types::LogicalType>,
1688 ) -> Self {
1689 Self {
1690 columns,
1691 column_types,
1692 rows: Vec::new(),
1693 execution_time_ms: None,
1694 rows_scanned: None,
1695 status_message: None,
1696 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1697 }
1698 }
1699
1700 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1702 self.execution_time_ms = Some(execution_time_ms);
1703 self.rows_scanned = Some(rows_scanned);
1704 self
1705 }
1706
1707 #[must_use]
1709 pub fn execution_time_ms(&self) -> Option<f64> {
1710 self.execution_time_ms
1711 }
1712
1713 #[must_use]
1715 pub fn rows_scanned(&self) -> Option<u64> {
1716 self.rows_scanned
1717 }
1718
1719 #[must_use]
1721 pub fn row_count(&self) -> usize {
1722 self.rows.len()
1723 }
1724
1725 #[must_use]
1727 pub fn column_count(&self) -> usize {
1728 self.columns.len()
1729 }
1730
1731 #[must_use]
1733 pub fn is_empty(&self) -> bool {
1734 self.rows.is_empty()
1735 }
1736
1737 pub fn scalar<T: FromValue>(&self) -> Result<T> {
1746 if self.rows.len() != 1 || self.columns.len() != 1 {
1747 return Err(grafeo_common::utils::error::Error::InvalidValue(
1748 "Expected single value".to_string(),
1749 ));
1750 }
1751 T::from_value(&self.rows[0][0])
1752 }
1753
1754 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1756 self.rows.iter()
1757 }
1758}
1759
1760impl std::fmt::Display for QueryResult {
1761 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1762 let table = grafeo_common::fmt::format_result_table(
1763 &self.columns,
1764 &self.rows,
1765 self.execution_time_ms,
1766 self.status_message.as_deref(),
1767 );
1768 f.write_str(&table)
1769 }
1770}
1771
1772pub trait FromValue: Sized {
1777 fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
1783}
1784
1785impl FromValue for i64 {
1786 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1787 value
1788 .as_int64()
1789 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1790 expected: "INT64".to_string(),
1791 found: value.type_name().to_string(),
1792 })
1793 }
1794}
1795
1796impl FromValue for f64 {
1797 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1798 value
1799 .as_float64()
1800 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1801 expected: "FLOAT64".to_string(),
1802 found: value.type_name().to_string(),
1803 })
1804 }
1805}
1806
1807impl FromValue for String {
1808 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1809 value.as_str().map(String::from).ok_or_else(|| {
1810 grafeo_common::utils::error::Error::TypeMismatch {
1811 expected: "STRING".to_string(),
1812 found: value.type_name().to_string(),
1813 }
1814 })
1815 }
1816}
1817
1818impl FromValue for bool {
1819 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1820 value
1821 .as_bool()
1822 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1823 expected: "BOOL".to_string(),
1824 found: value.type_name().to_string(),
1825 })
1826 }
1827}
1828
1829#[cfg(test)]
1830mod tests {
1831 use super::*;
1832
1833 #[test]
1834 fn test_create_in_memory_database() {
1835 let db = GrafeoDB::new_in_memory();
1836 assert_eq!(db.node_count(), 0);
1837 assert_eq!(db.edge_count(), 0);
1838 }
1839
1840 #[test]
1841 fn test_database_config() {
1842 let config = Config::in_memory().with_threads(4).with_query_logging();
1843
1844 let db = GrafeoDB::with_config(config).unwrap();
1845 assert_eq!(db.config().threads, 4);
1846 assert!(db.config().query_logging);
1847 }
1848
1849 #[test]
1850 fn test_database_session() {
1851 let db = GrafeoDB::new_in_memory();
1852 let _session = db.session();
1853 }
1855
1856 #[cfg(feature = "wal")]
1857 #[test]
1858 fn test_persistent_database_recovery() {
1859 use grafeo_common::types::Value;
1860 use tempfile::tempdir;
1861
1862 let dir = tempdir().unwrap();
1863 let db_path = dir.path().join("test_db");
1864
1865 {
1867 let db = GrafeoDB::open(&db_path).unwrap();
1868
1869 let alix = db.create_node(&["Person"]);
1870 db.set_node_property(alix, "name", Value::from("Alix"));
1871
1872 let gus = db.create_node(&["Person"]);
1873 db.set_node_property(gus, "name", Value::from("Gus"));
1874
1875 let _edge = db.create_edge(alix, gus, "KNOWS");
1876
1877 db.close().unwrap();
1879 }
1880
1881 {
1883 let db = GrafeoDB::open(&db_path).unwrap();
1884
1885 assert_eq!(db.node_count(), 2);
1886 assert_eq!(db.edge_count(), 1);
1887
1888 let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
1890 assert!(node0.is_some());
1891
1892 let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
1893 assert!(node1.is_some());
1894 }
1895 }
1896
1897 #[cfg(feature = "wal")]
1898 #[test]
1899 fn test_wal_logging() {
1900 use tempfile::tempdir;
1901
1902 let dir = tempdir().unwrap();
1903 let db_path = dir.path().join("wal_test_db");
1904
1905 let db = GrafeoDB::open(&db_path).unwrap();
1906
1907 let node = db.create_node(&["Test"]);
1909 db.delete_node(node);
1910
1911 if let Some(wal) = db.wal() {
1913 assert!(wal.record_count() > 0);
1914 }
1915
1916 db.close().unwrap();
1917 }
1918
1919 #[cfg(feature = "wal")]
1920 #[test]
1921 fn test_wal_recovery_multiple_sessions() {
1922 use grafeo_common::types::Value;
1924 use tempfile::tempdir;
1925
1926 let dir = tempdir().unwrap();
1927 let db_path = dir.path().join("multi_session_db");
1928
1929 {
1931 let db = GrafeoDB::open(&db_path).unwrap();
1932 let alix = db.create_node(&["Person"]);
1933 db.set_node_property(alix, "name", Value::from("Alix"));
1934 db.close().unwrap();
1935 }
1936
1937 {
1939 let db = GrafeoDB::open(&db_path).unwrap();
1940 assert_eq!(db.node_count(), 1); let gus = db.create_node(&["Person"]);
1942 db.set_node_property(gus, "name", Value::from("Gus"));
1943 db.close().unwrap();
1944 }
1945
1946 {
1948 let db = GrafeoDB::open(&db_path).unwrap();
1949 assert_eq!(db.node_count(), 2);
1950
1951 let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
1953 assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
1954
1955 let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
1956 assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
1957 }
1958 }
1959
1960 #[cfg(feature = "wal")]
1961 #[test]
1962 fn test_database_consistency_after_mutations() {
1963 use grafeo_common::types::Value;
1965 use tempfile::tempdir;
1966
1967 let dir = tempdir().unwrap();
1968 let db_path = dir.path().join("consistency_db");
1969
1970 {
1971 let db = GrafeoDB::open(&db_path).unwrap();
1972
1973 let a = db.create_node(&["Node"]);
1975 let b = db.create_node(&["Node"]);
1976 let c = db.create_node(&["Node"]);
1977
1978 let e1 = db.create_edge(a, b, "LINKS");
1980 let _e2 = db.create_edge(b, c, "LINKS");
1981
1982 db.delete_edge(e1);
1984 db.delete_node(b);
1985
1986 db.set_node_property(a, "value", Value::Int64(1));
1988 db.set_node_property(c, "value", Value::Int64(3));
1989
1990 db.close().unwrap();
1991 }
1992
1993 {
1995 let db = GrafeoDB::open(&db_path).unwrap();
1996
1997 let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
2001 assert!(node_a.is_some());
2002
2003 let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
2004 assert!(node_c.is_some());
2005
2006 let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
2008 assert!(node_b.is_none());
2009 }
2010 }
2011
2012 #[cfg(feature = "wal")]
2013 #[test]
2014 fn test_close_is_idempotent() {
2015 use tempfile::tempdir;
2017
2018 let dir = tempdir().unwrap();
2019 let db_path = dir.path().join("close_test_db");
2020
2021 let db = GrafeoDB::open(&db_path).unwrap();
2022 db.create_node(&["Test"]);
2023
2024 assert!(db.close().is_ok());
2026
2027 assert!(db.close().is_ok());
2029 }
2030
2031 #[test]
2032 fn test_with_store_external_backend() {
2033 use grafeo_core::graph::lpg::LpgStore;
2034
2035 let external = Arc::new(LpgStore::new().unwrap());
2036
2037 let n1 = external.create_node(&["Person"]);
2039 external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
2040
2041 let db = GrafeoDB::with_store(
2042 Arc::clone(&external) as Arc<dyn GraphStoreMut>,
2043 Config::in_memory(),
2044 )
2045 .unwrap();
2046
2047 let session = db.session();
2048
2049 #[cfg(feature = "gql")]
2051 {
2052 let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
2053 assert_eq!(result.rows.len(), 1);
2054 }
2055 }
2056
2057 #[test]
2058 fn test_with_config_custom_memory_limit() {
2059 let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); let db = GrafeoDB::with_config(config).unwrap();
2062 assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
2063 assert_eq!(db.node_count(), 0);
2064 }
2065
2066 #[cfg(feature = "metrics")]
2067 #[test]
2068 fn test_database_metrics_registry() {
2069 let db = GrafeoDB::new_in_memory();
2070
2071 db.create_node(&["Person"]);
2073 db.create_node(&["Person"]);
2074
2075 let snap = db.metrics();
2077 assert_eq!(snap.query_count, 0); }
2080
2081 #[test]
2082 fn test_query_result_has_metrics() {
2083 let db = GrafeoDB::new_in_memory();
2085 db.create_node(&["Person"]);
2086 db.create_node(&["Person"]);
2087
2088 #[cfg(feature = "gql")]
2089 {
2090 let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
2091
2092 assert!(result.execution_time_ms.is_some());
2094 assert!(result.rows_scanned.is_some());
2095 assert!(result.execution_time_ms.unwrap() >= 0.0);
2096 assert_eq!(result.rows_scanned.unwrap(), 2);
2097 }
2098 }
2099
2100 #[test]
2101 fn test_empty_query_result_metrics() {
2102 let db = GrafeoDB::new_in_memory();
2104 db.create_node(&["Person"]);
2105
2106 #[cfg(feature = "gql")]
2107 {
2108 let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
2110
2111 assert!(result.execution_time_ms.is_some());
2112 assert!(result.rows_scanned.is_some());
2113 assert_eq!(result.rows_scanned.unwrap(), 0);
2114 }
2115 }
2116
2117 #[cfg(feature = "cdc")]
2118 mod cdc_integration {
2119 use super::*;
2120
2121 fn cdc_db() -> GrafeoDB {
2123 GrafeoDB::with_config(Config::in_memory().with_cdc()).unwrap()
2124 }
2125
2126 #[test]
2127 fn test_node_lifecycle_history() {
2128 let db = cdc_db();
2129
2130 let id = db.create_node(&["Person"]);
2132 db.set_node_property(id, "name", "Alix".into());
2134 db.set_node_property(id, "name", "Gus".into());
2135 db.delete_node(id);
2137
2138 let history = db.history(id).unwrap();
2139 assert_eq!(history.len(), 4); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
2141 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
2142 assert!(history[1].before.is_none()); assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
2144 assert!(history[2].before.is_some()); assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
2146 }
2147
2148 #[test]
2149 fn test_edge_lifecycle_history() {
2150 let db = cdc_db();
2151
2152 let alix = db.create_node(&["Person"]);
2153 let gus = db.create_node(&["Person"]);
2154 let edge = db.create_edge(alix, gus, "KNOWS");
2155 db.set_edge_property(edge, "since", 2024i64.into());
2156 db.delete_edge(edge);
2157
2158 let history = db.history(edge).unwrap();
2159 assert_eq!(history.len(), 3); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
2161 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
2162 assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
2163 }
2164
2165 #[test]
2166 fn test_create_node_with_props_cdc() {
2167 let db = cdc_db();
2168
2169 let id = db.create_node_with_props(
2170 &["Person"],
2171 vec![
2172 ("name", grafeo_common::types::Value::from("Alix")),
2173 ("age", grafeo_common::types::Value::from(30i64)),
2174 ],
2175 );
2176
2177 let history = db.history(id).unwrap();
2178 assert_eq!(history.len(), 1);
2179 assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
2180 let after = history[0].after.as_ref().unwrap();
2182 assert_eq!(after.len(), 2);
2183 }
2184
2185 #[test]
2186 fn test_changes_between() {
2187 let db = cdc_db();
2188
2189 let id1 = db.create_node(&["A"]);
2190 let _id2 = db.create_node(&["B"]);
2191 db.set_node_property(id1, "x", 1i64.into());
2192
2193 let changes = db
2195 .changes_between(
2196 grafeo_common::types::EpochId(0),
2197 grafeo_common::types::EpochId(u64::MAX),
2198 )
2199 .unwrap();
2200 assert_eq!(changes.len(), 3); }
2202
2203 #[test]
2204 fn test_cdc_disabled_by_default() {
2205 let db = GrafeoDB::new_in_memory();
2206 assert!(!db.is_cdc_enabled());
2207
2208 let id = db.create_node(&["Person"]);
2209 db.set_node_property(id, "name", "Alix".into());
2210
2211 let history = db.history(id).unwrap();
2212 assert!(history.is_empty(), "CDC off by default: no events recorded");
2213 }
2214
2215 #[test]
2216 fn test_session_with_cdc_override_on() {
2217 let db = GrafeoDB::new_in_memory();
2219 let session = db.session_with_cdc(true);
2220 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
2221 let changes = db
2223 .changes_between(
2224 grafeo_common::types::EpochId(0),
2225 grafeo_common::types::EpochId(u64::MAX),
2226 )
2227 .unwrap();
2228 assert!(
2229 !changes.is_empty(),
2230 "session_with_cdc(true) should record events"
2231 );
2232 }
2233
2234 #[test]
2235 fn test_session_with_cdc_override_off() {
2236 let db = cdc_db();
2238 let session = db.session_with_cdc(false);
2239 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
2240 let changes = db
2241 .changes_between(
2242 grafeo_common::types::EpochId(0),
2243 grafeo_common::types::EpochId(u64::MAX),
2244 )
2245 .unwrap();
2246 assert!(
2247 changes.is_empty(),
2248 "session_with_cdc(false) should not record events"
2249 );
2250 }
2251
2252 #[test]
2253 fn test_set_cdc_enabled_runtime() {
2254 let db = GrafeoDB::new_in_memory();
2255 assert!(!db.is_cdc_enabled());
2256
2257 db.set_cdc_enabled(true);
2259 assert!(db.is_cdc_enabled());
2260
2261 let id = db.create_node(&["Person"]);
2262 let history = db.history(id).unwrap();
2263 assert_eq!(history.len(), 1, "CDC enabled at runtime records events");
2264
2265 db.set_cdc_enabled(false);
2267 let id2 = db.create_node(&["Person"]);
2268 let history2 = db.history(id2).unwrap();
2269 assert!(
2270 history2.is_empty(),
2271 "CDC disabled at runtime stops recording"
2272 );
2273 }
2274 }
2275
2276 #[test]
2277 fn test_with_store_basic() {
2278 use grafeo_core::graph::lpg::LpgStore;
2279
2280 let store = Arc::new(LpgStore::new().unwrap());
2281 let n1 = store.create_node(&["Person"]);
2282 store.set_node_property(n1, "name", "Alix".into());
2283
2284 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
2285 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
2286
2287 let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
2288 assert_eq!(result.rows.len(), 1);
2289 }
2290
2291 #[test]
2292 fn test_with_store_session() {
2293 use grafeo_core::graph::lpg::LpgStore;
2294
2295 let store = Arc::new(LpgStore::new().unwrap());
2296 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
2297 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
2298
2299 let session = db.session();
2300 let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
2301 assert_eq!(result.rows.len(), 1);
2302 }
2303
2304 #[test]
2305 fn test_with_store_mutations() {
2306 use grafeo_core::graph::lpg::LpgStore;
2307
2308 let store = Arc::new(LpgStore::new().unwrap());
2309 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
2310 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
2311
2312 let mut session = db.session();
2313
2314 session.begin_transaction().unwrap();
2318 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
2319
2320 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
2321 assert_eq!(result.rows.len(), 1);
2322
2323 session.commit().unwrap();
2324 }
2325}