1mod admin;
15#[cfg(feature = "async-storage")]
16mod async_ops;
17#[cfg(feature = "async-storage")]
18pub(crate) mod async_wal_store;
19#[cfg(feature = "cdc")]
20pub(crate) mod cdc_store;
21mod crud;
22#[cfg(feature = "embed")]
23mod embed;
24mod index;
25mod persistence;
26mod query;
27#[cfg(feature = "rdf")]
28mod rdf_ops;
29mod search;
30#[cfg(feature = "wal")]
31pub(crate) mod wal_store;
32
33use grafeo_common::grafeo_error;
34#[cfg(feature = "wal")]
35use std::path::Path;
36use std::sync::Arc;
37use std::sync::atomic::AtomicUsize;
38
39use parking_lot::RwLock;
40
41#[cfg(feature = "grafeo-file")]
42use grafeo_adapters::storage::file::GrafeoFileManager;
43#[cfg(feature = "wal")]
44use grafeo_adapters::storage::wal::{
45 DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
46};
47use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
48use grafeo_common::utils::error::Result;
49use grafeo_core::graph::lpg::LpgStore;
50#[cfg(feature = "rdf")]
51use grafeo_core::graph::rdf::RdfStore;
52use grafeo_core::graph::{GraphStore, GraphStoreMut};
53
54use crate::catalog::Catalog;
55use crate::config::Config;
56use crate::query::cache::QueryCache;
57use crate::session::Session;
58use crate::transaction::TransactionManager;
59
/// Top-level database handle.
///
/// Owns the storage backend(s), catalog, transaction manager and caches, and
/// hands out [`Session`]s that share them.
pub struct GrafeoDB {
    /// Configuration the database was constructed with.
    pub(super) config: Config,
    /// Built-in LPG store. `None` when the database wraps an external store
    /// (`with_store` / `with_read_store`) or after `compact()`.
    pub(super) store: Option<Arc<LpgStore>>,
    /// Schema catalog, shared with every session.
    pub(super) catalog: Arc<Catalog>,
    /// RDF triple store, shared with sessions on the built-in store path.
    #[cfg(feature = "rdf")]
    pub(super) rdf_store: Arc<RdfStore>,
    /// Transaction manager, shared with every session.
    pub(super) transaction_manager: Arc<TransactionManager>,
    /// Buffer manager sized from `Config::memory_limit` (or 75% of detected
    /// system memory when no limit is configured).
    pub(super) buffer_manager: Arc<BufferManager>,
    /// Write-ahead log. `Some` only for writable databases opened with a path
    /// and WAL enabled.
    #[cfg(feature = "wal")]
    pub(super) wal: Option<Arc<LpgWal>>,
    /// Graph context handed to sessions together with the WAL.
    /// NOTE(review): presumably tracks which named graph WAL records target;
    /// confirm against `Session::set_wal`.
    #[cfg(feature = "wal")]
    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
    /// Query cache shared with sessions; replaced with a fresh cache by `compact()`.
    pub(super) query_cache: Arc<QueryCache>,
    /// Counter shared with sessions; starts at zero.
    /// NOTE(review): presumably counts commits to drive `gc_interval`; confirm.
    pub(super) commit_counter: Arc<AtomicUsize>,
    /// `true` until `close()` completes successfully.
    pub(super) is_open: RwLock<bool>,
    /// Change-data-capture log, attached to sessions when CDC is active.
    #[cfg(feature = "cdc")]
    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
    /// Database-wide CDC default, initialised from `Config::cdc_enabled`.
    #[cfg(feature = "cdc")]
    cdc_enabled: std::sync::atomic::AtomicBool,
    /// Registered embedding models, keyed by a string identifier.
    #[cfg(feature = "embed")]
    pub(super) embedding_models:
        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
    /// Single-file storage manager; `Some` when the database is backed by a
    /// `.grafeo` file.
    #[cfg(feature = "grafeo-file")]
    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
    /// External read-capable store; when set, sessions use it instead of `store`.
    pub(super) external_read_store: Option<Arc<dyn GraphStore>>,
    /// External write-capable store; `None` for read-only external stores.
    pub(super) external_write_store: Option<Arc<dyn GraphStoreMut>>,
    /// Metrics registry attached to sessions created on the built-in store path.
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Graph name applied to newly created sessions, if any.
    current_graph: RwLock<Option<String>>,
    /// Schema name applied to newly created sessions, if any.
    current_schema: RwLock<Option<String>>,
    /// Whether the database rejects writes; propagated to every session.
    read_only: bool,
}
144
145impl GrafeoDB {
146 fn lpg_store(&self) -> &Arc<LpgStore> {
154 self.store.as_ref().expect(
155 "no built-in LpgStore: this GrafeoDB was created with an external store \
156 (with_store / with_read_store). Use session() or graph_store() instead.",
157 )
158 }
159
/// Reports whether change-data-capture is currently enabled database-wide.
#[cfg(feature = "cdc")]
#[inline]
pub(super) fn cdc_active(&self) -> bool {
    use std::sync::atomic::Ordering;

    // A relaxed load: the flag is read on its own, not to order other memory.
    self.cdc_enabled.load(Ordering::Relaxed)
}
166
167 #[must_use]
188 pub fn new_in_memory() -> Self {
189 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
190 }
191
/// Opens (or creates) a persistent database at `path` using
/// `Config::persistent`.
///
/// # Errors
///
/// Propagates any error returned by [`GrafeoDB::with_config`].
#[cfg(feature = "wal")]
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
    let persistent = Config::persistent(path.as_ref());
    Self::with_config(persistent)
}
214
/// Opens the database file at `path` in read-only mode using
/// `Config::read_only`.
///
/// # Errors
///
/// Propagates any error returned by [`GrafeoDB::with_config`], including the
/// case where `path` is not an existing file.
#[cfg(feature = "grafeo-file")]
pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
    let read_only = Config::read_only(path.as_ref());
    Self::with_config(read_only)
}
243
244 pub fn with_config(config: Config) -> Result<Self> {
268 config
270 .validate()
271 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
272
273 let store = Arc::new(LpgStore::new()?);
274 #[cfg(feature = "rdf")]
275 let rdf_store = Arc::new(RdfStore::new());
276 let transaction_manager = Arc::new(TransactionManager::new());
277
278 let buffer_config = BufferManagerConfig {
280 budget: config.memory_limit.unwrap_or_else(|| {
281 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
282 }),
283 spill_path: config
284 .spill_path
285 .clone()
286 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
287 ..BufferManagerConfig::default()
288 };
289 let buffer_manager = BufferManager::new(buffer_config);
290
291 let catalog = Arc::new(Catalog::new());
293
294 let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;
295
296 #[cfg(feature = "grafeo-file")]
298 let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
299 if let Some(ref db_path) = config.path {
301 if db_path.exists() && db_path.is_file() {
302 let fm = GrafeoFileManager::open_read_only(db_path)?;
303 let snapshot_data = fm.read_snapshot()?;
304 if !snapshot_data.is_empty() {
305 Self::apply_snapshot_data(
306 &store,
307 &catalog,
308 #[cfg(feature = "rdf")]
309 &rdf_store,
310 &snapshot_data,
311 )?;
312 }
313 Some(Arc::new(fm))
314 } else {
315 return Err(grafeo_common::utils::error::Error::Internal(format!(
316 "read-only open requires an existing .grafeo file: {}",
317 db_path.display()
318 )));
319 }
320 } else {
321 return Err(grafeo_common::utils::error::Error::Internal(
322 "read-only mode requires a database path".to_string(),
323 ));
324 }
325 } else if let Some(ref db_path) = config.path {
326 if Self::should_use_single_file(db_path, config.storage_format) {
331 let fm = if db_path.exists() && db_path.is_file() {
332 GrafeoFileManager::open(db_path)?
333 } else if !db_path.exists() {
334 GrafeoFileManager::create(db_path)?
335 } else {
336 return Err(grafeo_common::utils::error::Error::Internal(format!(
338 "path exists but is not a file: {}",
339 db_path.display()
340 )));
341 };
342
343 let snapshot_data = fm.read_snapshot()?;
345 if !snapshot_data.is_empty() {
346 Self::apply_snapshot_data(
347 &store,
348 &catalog,
349 #[cfg(feature = "rdf")]
350 &rdf_store,
351 &snapshot_data,
352 )?;
353 }
354
355 #[cfg(feature = "wal")]
357 if config.wal_enabled && fm.has_sidecar_wal() {
358 let recovery = WalRecovery::new(fm.sidecar_wal_path());
359 let records = recovery.recover()?;
360 Self::apply_wal_records(
361 &store,
362 &catalog,
363 #[cfg(feature = "rdf")]
364 &rdf_store,
365 &records,
366 )?;
367 }
368
369 Some(Arc::new(fm))
370 } else {
371 None
372 }
373 } else {
374 None
375 };
376
377 #[cfg(feature = "wal")]
380 let wal = if is_read_only {
381 None
382 } else if config.wal_enabled {
383 if let Some(ref db_path) = config.path {
384 #[cfg(feature = "grafeo-file")]
386 let wal_path = if let Some(ref fm) = file_manager {
387 let p = fm.sidecar_wal_path();
388 std::fs::create_dir_all(&p)?;
389 p
390 } else {
391 std::fs::create_dir_all(db_path)?;
393 db_path.join("wal")
394 };
395
396 #[cfg(not(feature = "grafeo-file"))]
397 let wal_path = {
398 std::fs::create_dir_all(db_path)?;
399 db_path.join("wal")
400 };
401
402 #[cfg(feature = "grafeo-file")]
404 let is_single_file = file_manager.is_some();
405 #[cfg(not(feature = "grafeo-file"))]
406 let is_single_file = false;
407
408 if !is_single_file && wal_path.exists() {
409 let recovery = WalRecovery::new(&wal_path);
410 let records = recovery.recover()?;
411 Self::apply_wal_records(
412 &store,
413 &catalog,
414 #[cfg(feature = "rdf")]
415 &rdf_store,
416 &records,
417 )?;
418 }
419
420 let wal_durability = match config.wal_durability {
422 crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
423 crate::config::DurabilityMode::Batch {
424 max_delay_ms,
425 max_records,
426 } => WalDurabilityMode::Batch {
427 max_delay_ms,
428 max_records,
429 },
430 crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
431 WalDurabilityMode::Adaptive { target_interval_ms }
432 }
433 crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
434 };
435 let wal_config = WalConfig {
436 durability: wal_durability,
437 ..WalConfig::default()
438 };
439 let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
440 Some(Arc::new(wal_manager))
441 } else {
442 None
443 }
444 } else {
445 None
446 };
447
448 let query_cache = Arc::new(QueryCache::default());
450
451 #[cfg(feature = "temporal")]
454 transaction_manager.sync_epoch(store.current_epoch());
455
456 #[cfg(feature = "cdc")]
457 let cdc_enabled_val = config.cdc_enabled;
458
459 Ok(Self {
460 config,
461 store: Some(store),
462 catalog,
463 #[cfg(feature = "rdf")]
464 rdf_store,
465 transaction_manager,
466 buffer_manager,
467 #[cfg(feature = "wal")]
468 wal,
469 #[cfg(feature = "wal")]
470 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
471 query_cache,
472 commit_counter: Arc::new(AtomicUsize::new(0)),
473 is_open: RwLock::new(true),
474 #[cfg(feature = "cdc")]
475 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
476 #[cfg(feature = "cdc")]
477 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
478 #[cfg(feature = "embed")]
479 embedding_models: RwLock::new(hashbrown::HashMap::new()),
480 #[cfg(feature = "grafeo-file")]
481 file_manager,
482 external_read_store: None,
483 external_write_store: None,
484 #[cfg(feature = "metrics")]
485 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
486 current_graph: RwLock::new(None),
487 current_schema: RwLock::new(None),
488 read_only: is_read_only,
489 })
490 }
491
492 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
517 config
518 .validate()
519 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
520
521 let transaction_manager = Arc::new(TransactionManager::new());
522
523 let buffer_config = BufferManagerConfig {
524 budget: config.memory_limit.unwrap_or_else(|| {
525 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
526 }),
527 spill_path: None,
528 ..BufferManagerConfig::default()
529 };
530 let buffer_manager = BufferManager::new(buffer_config);
531
532 let query_cache = Arc::new(QueryCache::default());
533
534 #[cfg(feature = "cdc")]
535 let cdc_enabled_val = config.cdc_enabled;
536
537 Ok(Self {
538 config,
539 store: None,
540 catalog: Arc::new(Catalog::new()),
541 #[cfg(feature = "rdf")]
542 rdf_store: Arc::new(RdfStore::new()),
543 transaction_manager,
544 buffer_manager,
545 #[cfg(feature = "wal")]
546 wal: None,
547 #[cfg(feature = "wal")]
548 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
549 query_cache,
550 commit_counter: Arc::new(AtomicUsize::new(0)),
551 is_open: RwLock::new(true),
552 #[cfg(feature = "cdc")]
553 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
554 #[cfg(feature = "cdc")]
555 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
556 #[cfg(feature = "embed")]
557 embedding_models: RwLock::new(hashbrown::HashMap::new()),
558 #[cfg(feature = "grafeo-file")]
559 file_manager: None,
560 external_read_store: Some(Arc::clone(&store) as Arc<dyn GraphStore>),
561 external_write_store: Some(store),
562 #[cfg(feature = "metrics")]
563 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
564 current_graph: RwLock::new(None),
565 current_schema: RwLock::new(None),
566 read_only: false,
567 })
568 }
569
570 pub fn with_read_store(store: Arc<dyn GraphStore>, config: Config) -> Result<Self> {
591 config
592 .validate()
593 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
594
595 let transaction_manager = Arc::new(TransactionManager::new());
596
597 let buffer_config = BufferManagerConfig {
598 budget: config.memory_limit.unwrap_or_else(|| {
599 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
600 }),
601 spill_path: None,
602 ..BufferManagerConfig::default()
603 };
604 let buffer_manager = BufferManager::new(buffer_config);
605
606 let query_cache = Arc::new(QueryCache::default());
607
608 #[cfg(feature = "cdc")]
609 let cdc_enabled_val = config.cdc_enabled;
610
611 Ok(Self {
612 config,
613 store: None,
614 catalog: Arc::new(Catalog::new()),
615 #[cfg(feature = "rdf")]
616 rdf_store: Arc::new(RdfStore::new()),
617 transaction_manager,
618 buffer_manager,
619 #[cfg(feature = "wal")]
620 wal: None,
621 #[cfg(feature = "wal")]
622 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
623 query_cache,
624 commit_counter: Arc::new(AtomicUsize::new(0)),
625 is_open: RwLock::new(true),
626 #[cfg(feature = "cdc")]
627 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
628 #[cfg(feature = "cdc")]
629 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
630 #[cfg(feature = "embed")]
631 embedding_models: RwLock::new(hashbrown::HashMap::new()),
632 #[cfg(feature = "grafeo-file")]
633 file_manager: None,
634 external_read_store: Some(store),
635 external_write_store: None,
636 #[cfg(feature = "metrics")]
637 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
638 current_graph: RwLock::new(None),
639 current_schema: RwLock::new(None),
640 read_only: true,
641 })
642 }
643
/// Replaces the current store with a compact copy built by
/// `from_graph_store`.
///
/// Afterwards the database is read-only: the built-in store and any external
/// write store are dropped, and the query cache is replaced with a fresh one.
///
/// # Errors
///
/// Returns `Error::Internal` when building the compact store fails; the
/// database is left unchanged in that case.
#[cfg(feature = "compact-store")]
pub fn compact(&mut self) -> Result<()> {
    use grafeo_common::utils::error::Error;
    use grafeo_core::graph::compact::from_graph_store;

    let source = self.graph_store();
    let compacted = match from_graph_store(source.as_ref()) {
        Ok(built) => built,
        Err(err) => return Err(Error::Internal(err.to_string())),
    };

    self.external_read_store = Some(Arc::new(compacted) as Arc<dyn GraphStore>);
    self.external_write_store = None;
    self.store = None;
    self.read_only = true;
    // Cached plans were built against the old store.
    self.query_cache = Arc::new(QueryCache::default());

    Ok(())
}
677
/// Replays recovered WAL records into the given stores and catalog.
///
/// Records are applied in order. Data records (nodes, edges, properties,
/// labels) go to the *target* store, which starts as `store` and is
/// re-pointed by `SwitchGraph` records. Named-graph management always goes
/// to the root `store`, and schema records go to `catalog`.
///
/// Catalog operations and named-graph creation are best-effort: their
/// results are deliberately ignored so that replaying an already-applied
/// definition does not abort recovery.
///
/// # Errors
///
/// Returns an error when a node or edge cannot be re-created with its
/// recorded id, or when a graph named by `SwitchGraph` cannot be obtained.
#[cfg(feature = "wal")]
fn apply_wal_records(
    store: &Arc<LpgStore>,
    catalog: &Catalog,
    #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
    records: &[WalRecord],
) -> Result<()> {
    use crate::catalog::{
        EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
    };
    use grafeo_common::utils::error::Error;

    // Name of the graph selected by the latest `SwitchGraph` (None = default).
    let mut current_graph: Option<String> = None;
    // Store that data records are currently applied to.
    let mut target_store: Arc<LpgStore> = Arc::clone(store);

    for record in records {
        match record {
            WalRecord::CreateNamedGraph { name } => {
                let _ = store.create_graph(name);
            }
            WalRecord::DropNamedGraph { name } => {
                store.drop_graph(name);
                // Dropping the active graph falls back to the default store.
                if current_graph.as_deref() == Some(name.as_str()) {
                    current_graph = None;
                    target_store = Arc::clone(store);
                }
            }
            WalRecord::SwitchGraph { name } => {
                current_graph.clone_from(name);
                target_store = match &current_graph {
                    None => Arc::clone(store),
                    Some(graph_name) => store
                        .graph_or_create(graph_name)
                        .map_err(|e| Error::Internal(e.to_string()))?,
                };
            }

            WalRecord::CreateNode { id, labels } => {
                let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
                target_store.create_node_with_id(*id, &label_refs)?;
            }
            WalRecord::DeleteNode { id } => {
                target_store.delete_node(*id);
            }
            WalRecord::CreateEdge {
                id,
                src,
                dst,
                edge_type,
            } => {
                target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
            }
            WalRecord::DeleteEdge { id } => {
                target_store.delete_edge(*id);
            }
            WalRecord::SetNodeProperty { id, key, value } => {
                target_store.set_node_property(*id, key, value.clone());
            }
            WalRecord::SetEdgeProperty { id, key, value } => {
                target_store.set_edge_property(*id, key, value.clone());
            }
            WalRecord::AddNodeLabel { id, label } => {
                target_store.add_label(*id, label);
            }
            WalRecord::RemoveNodeLabel { id, label } => {
                target_store.remove_label(*id, label);
            }
            WalRecord::RemoveNodeProperty { id, key } => {
                target_store.remove_node_property(*id, key);
            }
            WalRecord::RemoveEdgeProperty { id, key } => {
                target_store.remove_edge_property(*id, key);
            }

            WalRecord::CreateNodeType {
                name,
                properties,
                constraints,
            } => {
                let def = NodeTypeDefinition {
                    name: name.clone(),
                    properties: properties
                        .iter()
                        .map(|(n, t, nullable)| TypedProperty {
                            name: n.clone(),
                            data_type: PropertyDataType::from_type_name(t),
                            nullable: *nullable,
                            default_value: None,
                        })
                        .collect(),
                    constraints: constraints
                        .iter()
                        .map(|(kind, props)| match kind.as_str() {
                            "unique" => TypeConstraint::Unique(props.clone()),
                            "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
                            "not_null" if !props.is_empty() => {
                                TypeConstraint::NotNull(props[0].clone())
                            }
                            // Unknown kinds (and an empty `not_null`) are
                            // treated as a unique constraint.
                            _ => TypeConstraint::Unique(props.clone()),
                        })
                        .collect(),
                    parent_types: Vec::new(),
                };
                let _ = catalog.register_node_type(def);
            }
            WalRecord::DropNodeType { name } => {
                let _ = catalog.drop_node_type(name);
            }
            WalRecord::CreateEdgeType {
                name,
                properties,
                constraints,
            } => {
                let def = EdgeTypeDefinition {
                    name: name.clone(),
                    properties: properties
                        .iter()
                        .map(|(n, t, nullable)| TypedProperty {
                            name: n.clone(),
                            data_type: PropertyDataType::from_type_name(t),
                            nullable: *nullable,
                            default_value: None,
                        })
                        .collect(),
                    constraints: constraints
                        .iter()
                        .map(|(kind, props)| match kind.as_str() {
                            "unique" => TypeConstraint::Unique(props.clone()),
                            "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
                            "not_null" if !props.is_empty() => {
                                TypeConstraint::NotNull(props[0].clone())
                            }
                            // Same fallback as for node types.
                            _ => TypeConstraint::Unique(props.clone()),
                        })
                        .collect(),
                    source_node_types: Vec::new(),
                    target_node_types: Vec::new(),
                };
                let _ = catalog.register_edge_type_def(def);
            }
            WalRecord::DropEdgeType { name } => {
                let _ = catalog.drop_edge_type_def(name);
            }
            // Index and constraint records are not replayed here.
            WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {}
            WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {}
            WalRecord::CreateGraphType {
                name,
                node_types,
                edge_types,
                open,
            } => {
                use crate::catalog::GraphTypeDefinition;
                let def = GraphTypeDefinition {
                    name: name.clone(),
                    allowed_node_types: node_types.clone(),
                    allowed_edge_types: edge_types.clone(),
                    open: *open,
                };
                let _ = catalog.register_graph_type(def);
            }
            WalRecord::DropGraphType { name } => {
                let _ = catalog.drop_graph_type(name);
            }
            WalRecord::CreateSchema { name } => {
                let _ = catalog.register_schema_namespace(name.clone());
            }
            WalRecord::DropSchema { name } => {
                let _ = catalog.drop_schema_namespace(name);
            }

            WalRecord::AlterNodeType { name, alterations } => {
                for (action, prop_name, type_name, nullable) in alterations {
                    match action.as_str() {
                        "add" => {
                            let prop = TypedProperty {
                                name: prop_name.clone(),
                                data_type: PropertyDataType::from_type_name(type_name),
                                nullable: *nullable,
                                default_value: None,
                            };
                            let _ = catalog.alter_node_type_add_property(name, prop);
                        }
                        "drop" => {
                            let _ = catalog.alter_node_type_drop_property(name, prop_name);
                        }
                        // Unrecognised actions are skipped.
                        _ => {}
                    }
                }
            }
            WalRecord::AlterEdgeType { name, alterations } => {
                for (action, prop_name, type_name, nullable) in alterations {
                    match action.as_str() {
                        "add" => {
                            let prop = TypedProperty {
                                name: prop_name.clone(),
                                data_type: PropertyDataType::from_type_name(type_name),
                                nullable: *nullable,
                                default_value: None,
                            };
                            let _ = catalog.alter_edge_type_add_property(name, prop);
                        }
                        "drop" => {
                            let _ = catalog.alter_edge_type_drop_property(name, prop_name);
                        }
                        _ => {}
                    }
                }
            }
            WalRecord::AlterGraphType { name, alterations } => {
                for (action, type_name) in alterations {
                    match action.as_str() {
                        "add_node" => {
                            let _ =
                                catalog.alter_graph_type_add_node_type(name, type_name.clone());
                        }
                        "drop_node" => {
                            let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
                        }
                        "add_edge" => {
                            let _ =
                                catalog.alter_graph_type_add_edge_type(name, type_name.clone());
                        }
                        "drop_edge" => {
                            let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
                        }
                        _ => {}
                    }
                }
            }

            WalRecord::CreateProcedure {
                name,
                params,
                returns,
                body,
            } => {
                use crate::catalog::ProcedureDefinition;
                let def = ProcedureDefinition {
                    name: name.clone(),
                    params: params.clone(),
                    returns: returns.clone(),
                    body: body.clone(),
                };
                let _ = catalog.register_procedure(def);
            }
            WalRecord::DropProcedure { name } => {
                let _ = catalog.drop_procedure(name);
            }

            // RDF records are delegated to the RDF replay helper when the
            // feature is enabled and ignored otherwise.
            #[cfg(feature = "rdf")]
            WalRecord::InsertRdfTriple { .. }
            | WalRecord::DeleteRdfTriple { .. }
            | WalRecord::ClearRdfGraph { .. }
            | WalRecord::CreateRdfGraph { .. }
            | WalRecord::DropRdfGraph { .. } => {
                rdf_ops::replay_rdf_wal_record(rdf_store, record);
            }
            #[cfg(not(feature = "rdf"))]
            WalRecord::InsertRdfTriple { .. }
            | WalRecord::DeleteRdfTriple { .. }
            | WalRecord::ClearRdfGraph { .. }
            | WalRecord::CreateRdfGraph { .. }
            | WalRecord::DropRdfGraph { .. } => {}

            WalRecord::TransactionCommit { .. } => {
                // With temporal support, each replayed commit advances the
                // target store's epoch.
                #[cfg(feature = "temporal")]
                {
                    target_store.new_epoch();
                }
            }
            WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {}
        }
    }
    Ok(())
}
977
/// Decides whether `path` should be handled by the single-file backend.
///
/// Explicit formats are honoured as-is. For `StorageFormat::Auto`:
/// * an existing file qualifies only if its first four bytes equal the
///   single-file `MAGIC`;
/// * an existing directory never qualifies;
/// * a path that does not exist qualifies when its extension is `grafeo`.
#[cfg(feature = "grafeo-file")]
fn should_use_single_file(
    path: &std::path::Path,
    configured: crate::config::StorageFormat,
) -> bool {
    use crate::config::StorageFormat;

    match configured {
        StorageFormat::SingleFile => true,
        StorageFormat::WalDirectory => false,
        StorageFormat::Auto if path.is_file() => {
            use std::io::Read;

            // Unreadable or too-short files are not treated as single-file.
            let Ok(mut file) = std::fs::File::open(path) else {
                return false;
            };
            let mut header = [0u8; 4];
            file.read_exact(&mut header).is_ok()
                && header == grafeo_adapters::storage::file::MAGIC
        }
        StorageFormat::Auto if path.is_dir() => false,
        StorageFormat::Auto => path.extension().is_some_and(|ext| ext == "grafeo"),
    }
}
1015
/// Loads serialized snapshot bytes into the given stores and catalog.
///
/// Thin wrapper around `persistence::load_snapshot_into_store`.
///
/// # Errors
///
/// Propagates whatever error the loader returns.
#[cfg(feature = "grafeo-file")]
fn apply_snapshot_data(
    store: &Arc<LpgStore>,
    catalog: &Arc<crate::catalog::Catalog>,
    #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
    data: &[u8],
) -> Result<()> {
    let outcome = persistence::load_snapshot_into_store(
        store,
        catalog,
        #[cfg(feature = "rdf")]
        rdf_store,
        data,
    );
    outcome
}
1032
1033 #[must_use]
1061 pub fn session(&self) -> Session {
1062 self.create_session_inner(None)
1063 }
1064
/// Creates a new session whose CDC setting is `cdc_enabled`, regardless of
/// the database-wide default.
#[cfg(feature = "cdc")]
#[must_use]
pub fn session_with_cdc(&self, cdc_enabled: bool) -> Session {
    let cdc_override = Some(cdc_enabled);
    self.create_session_inner(cdc_override)
}
1088
1089 #[must_use]
1096 pub fn session_read_only(&self) -> Session {
1097 self.create_session_inner_opts(None, true)
1098 }
1099
1100 #[allow(unused_variables)] fn create_session_inner(&self, cdc_override: Option<bool>) -> Session {
1106 self.create_session_inner_opts(cdc_override, false)
1107 }
1108
1109 #[allow(unused_variables)]
1111 fn create_session_inner_opts(
1112 &self,
1113 cdc_override: Option<bool>,
1114 force_read_only: bool,
1115 ) -> Session {
1116 let session_cfg = || crate::session::SessionConfig {
1117 transaction_manager: Arc::clone(&self.transaction_manager),
1118 query_cache: Arc::clone(&self.query_cache),
1119 catalog: Arc::clone(&self.catalog),
1120 adaptive_config: self.config.adaptive.clone(),
1121 factorized_execution: self.config.factorized_execution,
1122 graph_model: self.config.graph_model,
1123 query_timeout: self.config.query_timeout,
1124 commit_counter: Arc::clone(&self.commit_counter),
1125 gc_interval: self.config.gc_interval,
1126 read_only: self.read_only || force_read_only,
1127 };
1128
1129 if let Some(ref ext_read) = self.external_read_store {
1130 return Session::with_external_store(
1131 Arc::clone(ext_read),
1132 self.external_write_store.as_ref().map(Arc::clone),
1133 session_cfg(),
1134 )
1135 .expect("arena allocation for external store session");
1136 }
1137
1138 #[cfg(feature = "rdf")]
1139 let mut session = Session::with_rdf_store_and_adaptive(
1140 Arc::clone(self.lpg_store()),
1141 Arc::clone(&self.rdf_store),
1142 session_cfg(),
1143 );
1144 #[cfg(not(feature = "rdf"))]
1145 let mut session = Session::with_adaptive(Arc::clone(self.lpg_store()), session_cfg());
1146
1147 #[cfg(feature = "wal")]
1148 if let Some(ref wal) = self.wal {
1149 session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
1150 }
1151
1152 #[cfg(feature = "cdc")]
1153 {
1154 let should_enable = cdc_override.unwrap_or_else(|| self.cdc_active());
1155 if should_enable {
1156 session.set_cdc_log(Arc::clone(&self.cdc_log));
1157 }
1158 }
1159
1160 #[cfg(feature = "metrics")]
1161 {
1162 if let Some(ref m) = self.metrics {
1163 session.set_metrics(Arc::clone(m));
1164 m.session_created
1165 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1166 m.session_active
1167 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1168 }
1169 }
1170
1171 if let Some(ref graph) = *self.current_graph.read() {
1173 session.use_graph(graph);
1174 }
1175
1176 if let Some(ref schema) = *self.current_schema.read() {
1178 session.set_schema(schema);
1179 }
1180
1181 let _ = &mut session;
1183
1184 session
1185 }
1186
1187 #[must_use]
1193 pub fn current_graph(&self) -> Option<String> {
1194 self.current_graph.read().clone()
1195 }
1196
1197 pub fn set_current_graph(&self, name: Option<&str>) {
1202 *self.current_graph.write() = name.map(ToString::to_string);
1203 }
1204
1205 #[must_use]
1210 pub fn current_schema(&self) -> Option<String> {
1211 self.current_schema.read().clone()
1212 }
1213
1214 pub fn set_current_schema(&self, name: Option<&str>) {
1219 *self.current_schema.write() = name.map(ToString::to_string);
1220 }
1221
1222 #[must_use]
1224 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
1225 &self.config.adaptive
1226 }
1227
1228 #[must_use]
1230 pub fn is_read_only(&self) -> bool {
1231 self.read_only
1232 }
1233
1234 #[must_use]
1236 pub fn config(&self) -> &Config {
1237 &self.config
1238 }
1239
1240 #[must_use]
1242 pub fn graph_model(&self) -> crate::config::GraphModel {
1243 self.config.graph_model
1244 }
1245
1246 #[must_use]
1248 pub fn memory_limit(&self) -> Option<usize> {
1249 self.config.memory_limit
1250 }
1251
/// Returns a metrics snapshot, with the cache counters taken from the
/// query cache.
///
/// When no registry is attached, the snapshot starts from
/// `MetricsSnapshot::default()`; the cache fields are filled in either way.
#[cfg(feature = "metrics")]
#[must_use]
pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
    let mut snapshot = match self.metrics.as_ref() {
        Some(registry) => registry.snapshot(),
        None => crate::metrics::MetricsSnapshot::default(),
    };

    // Hits, misses and size are summed across the parsed and optimized caches.
    let stats = self.query_cache.stats();
    snapshot.cache_hits = stats.parsed_hits + stats.optimized_hits;
    snapshot.cache_misses = stats.parsed_misses + stats.optimized_misses;
    snapshot.cache_size = stats.parsed_size + stats.optimized_size;
    snapshot.cache_invalidations = stats.invalidations;

    snapshot
}
1273
/// Renders the metrics registry via `to_prometheus`, or returns an empty
/// string when no registry is attached.
#[cfg(feature = "metrics")]
#[must_use]
pub fn metrics_prometheus(&self) -> String {
    match self.metrics.as_ref() {
        Some(registry) => registry.to_prometheus(),
        None => String::new(),
    }
}
1284
/// Resets the metrics registry (when attached) and the query-cache statistics.
#[cfg(feature = "metrics")]
pub fn reset_metrics(&self) {
    if let Some(registry) = self.metrics.as_ref() {
        registry.reset();
    }
    let cache = &self.query_cache;
    cache.reset_stats();
}
1293
1294 #[must_use]
1302 pub fn store(&self) -> &Arc<LpgStore> {
1303 self.lpg_store()
1304 }
1305
1306 #[allow(dead_code)] fn active_store(&self) -> Arc<LpgStore> {
1313 let store = self.lpg_store();
1314 let graph_name = self.current_graph.read().clone();
1315 match graph_name {
1316 None => Arc::clone(store),
1317 Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(store),
1318 Some(ref name) => store.graph(name).unwrap_or_else(|| Arc::clone(store)),
1319 }
1320 }
1321
1322 pub fn create_graph(&self, name: &str) -> Result<bool> {
1330 Ok(self.lpg_store().create_graph(name)?)
1331 }
1332
1333 pub fn drop_graph(&self, name: &str) -> bool {
1335 self.lpg_store().drop_graph(name)
1336 }
1337
1338 #[must_use]
1340 pub fn list_graphs(&self) -> Vec<String> {
1341 self.lpg_store().graph_names()
1342 }
1343
1344 #[must_use]
1353 pub fn graph_store(&self) -> Arc<dyn GraphStore> {
1354 if let Some(ref ext_read) = self.external_read_store {
1355 Arc::clone(ext_read)
1356 } else {
1357 Arc::clone(self.lpg_store()) as Arc<dyn GraphStore>
1358 }
1359 }
1360
1361 #[must_use]
1366 pub fn graph_store_mut(&self) -> Option<Arc<dyn GraphStoreMut>> {
1367 if self.external_read_store.is_some() {
1368 self.external_write_store.as_ref().map(Arc::clone)
1369 } else {
1370 Some(Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreMut>)
1371 }
1372 }
1373
1374 pub fn gc(&self) {
1380 let min_epoch = self.transaction_manager.min_active_epoch();
1381 self.lpg_store().gc_versions(min_epoch);
1382 self.transaction_manager.gc();
1383 }
1384
1385 #[must_use]
1387 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
1388 &self.buffer_manager
1389 }
1390
1391 #[must_use]
1393 pub fn query_cache(&self) -> &Arc<QueryCache> {
1394 &self.query_cache
1395 }
1396
1397 pub fn clear_plan_cache(&self) {
1403 self.query_cache.clear();
1404 }
1405
1406 pub fn close(&self) -> Result<()> {
1420 let mut is_open = self.is_open.write();
1421 if !*is_open {
1422 return Ok(());
1423 }
1424
1425 if self.read_only {
1427 #[cfg(feature = "grafeo-file")]
1428 if let Some(ref fm) = self.file_manager {
1429 fm.close()?;
1430 }
1431 *is_open = false;
1432 return Ok(());
1433 }
1434
1435 #[cfg(feature = "grafeo-file")]
1439 let is_single_file = self.file_manager.is_some();
1440 #[cfg(not(feature = "grafeo-file"))]
1441 let is_single_file = false;
1442
1443 #[cfg(feature = "grafeo-file")]
1444 if let Some(ref fm) = self.file_manager {
1445 #[cfg(feature = "wal")]
1447 if let Some(ref wal) = self.wal {
1448 wal.sync()?;
1449 }
1450 self.checkpoint_to_file(fm)?;
1451
1452 #[cfg(feature = "wal")]
1455 if let Some(ref wal) = self.wal {
1456 wal.close_active_log();
1457 }
1458
1459 {
1460 use grafeo_core::testing::crash::maybe_crash;
1461 maybe_crash("close:before_remove_sidecar_wal");
1462 }
1463 fm.remove_sidecar_wal()?;
1464 fm.close()?;
1465 }
1466
1467 #[cfg(feature = "wal")]
1473 if !is_single_file && let Some(ref wal) = self.wal {
1474 let commit_tx = self
1476 .transaction_manager
1477 .last_assigned_transaction_id()
1478 .unwrap_or_else(|| self.transaction_manager.begin());
1479
1480 wal.log(&WalRecord::TransactionCommit {
1482 transaction_id: commit_tx,
1483 })?;
1484
1485 wal.sync()?;
1486 }
1487
1488 *is_open = false;
1489 Ok(())
1490 }
1491
/// Returns the write-ahead log, or `None` when the database has none.
#[cfg(feature = "wal")]
#[must_use]
pub fn wal(&self) -> Option<&Arc<LpgWal>> {
    let Self { wal, .. } = self;
    wal.as_ref()
}
1498
/// Appends `record` to the WAL; a no-op when the database has no WAL.
///
/// # Errors
///
/// Propagates any error from the WAL's `log` call.
#[cfg(feature = "wal")]
pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
    let Some(wal) = self.wal.as_ref() else {
        return Ok(());
    };
    wal.log(record)?;
    Ok(())
}
1507
/// Exports a snapshot of the database and writes it to the single-file
/// backend, together with the current epoch, the last assigned transaction
/// id (0 when none) and the node / edge counts.
///
/// `maybe_crash` hooks mark the points before export, after export and
/// after the snapshot write.
///
/// # Errors
///
/// Propagates errors from snapshot export and from the file manager write.
///
/// # Panics
///
/// Panics when the database has no built-in store (see `lpg_store`).
#[cfg(feature = "grafeo-file")]
fn checkpoint_to_file(&self, fm: &GrafeoFileManager) -> Result<()> {
    use grafeo_core::testing::crash::maybe_crash;

    maybe_crash("checkpoint_to_file:before_export");
    let snapshot = self.export_snapshot()?;
    maybe_crash("checkpoint_to_file:after_export");

    let lpg = self.lpg_store();
    let epoch = lpg.current_epoch();
    let last_tx = match self.transaction_manager.last_assigned_transaction_id() {
        Some(id) => id.0,
        None => 0,
    };
    let nodes = lpg.node_count() as u64;
    let edges = lpg.edge_count() as u64;

    fm.write_snapshot(&snapshot, epoch.0, last_tx, nodes, edges)?;

    maybe_crash("checkpoint_to_file:after_write_snapshot");
    Ok(())
}
1540
/// Returns the single-file storage manager, or `None` when the database is
/// not file-backed.
#[cfg(feature = "grafeo-file")]
#[must_use]
pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
    let Self { file_manager, .. } = self;
    file_manager.as_ref()
}
1547}
1548
1549impl Drop for GrafeoDB {
1550 fn drop(&mut self) {
1551 if let Err(e) = self.close() {
1552 grafeo_error!("Error closing database: {}", e);
1553 }
1554 }
1555}
1556
1557impl crate::admin::AdminService for GrafeoDB {
1558 fn info(&self) -> crate::admin::DatabaseInfo {
1559 self.info()
1560 }
1561
1562 fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1563 self.detailed_stats()
1564 }
1565
1566 fn schema(&self) -> crate::admin::SchemaInfo {
1567 self.schema()
1568 }
1569
1570 fn validate(&self) -> crate::admin::ValidationResult {
1571 self.validate()
1572 }
1573
1574 fn wal_status(&self) -> crate::admin::WalStatus {
1575 self.wal_status()
1576 }
1577
1578 fn wal_checkpoint(&self) -> Result<()> {
1579 self.wal_checkpoint()
1580 }
1581}
1582
#[derive(Debug)]
/// Tabular result of a query: column metadata, rows of values, optional
/// execution metrics and a status.
///
/// All fields are public, so a value can be assembled or modified by hand;
/// nothing in this type enforces that every row has one value per column.
pub struct QueryResult {
    /// Column labels; `column_count()` reports the length of this vector.
    pub columns: Vec<String>,
    /// Logical type per column. `QueryResult::new` fills this with
    /// `LogicalType::Any`, one entry per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// Result rows; each row is a vector of values.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Execution time, set by `with_metrics` (presumably milliseconds, per
    /// the field name); `None` until then.
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned, set by `with_metrics`; `None` until then.
    pub rows_scanned: Option<u64>,
    /// Optional status text, set by `QueryResult::status` and handed to the
    /// table formatter by the `Display` impl.
    pub status_message: Option<String>,
    /// GQL status of the result; the constructors on this type set
    /// `GqlStatus::SUCCESS`.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
1629
1630impl QueryResult {
1631 #[must_use]
1633 pub fn empty() -> Self {
1634 Self {
1635 columns: Vec::new(),
1636 column_types: Vec::new(),
1637 rows: Vec::new(),
1638 execution_time_ms: None,
1639 rows_scanned: None,
1640 status_message: None,
1641 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1642 }
1643 }
1644
1645 #[must_use]
1647 pub fn status(msg: impl Into<String>) -> Self {
1648 Self {
1649 columns: Vec::new(),
1650 column_types: Vec::new(),
1651 rows: Vec::new(),
1652 execution_time_ms: None,
1653 rows_scanned: None,
1654 status_message: Some(msg.into()),
1655 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1656 }
1657 }
1658
1659 #[must_use]
1661 pub fn new(columns: Vec<String>) -> Self {
1662 let len = columns.len();
1663 Self {
1664 columns,
1665 column_types: vec![grafeo_common::types::LogicalType::Any; len],
1666 rows: Vec::new(),
1667 execution_time_ms: None,
1668 rows_scanned: None,
1669 status_message: None,
1670 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1671 }
1672 }
1673
1674 #[must_use]
1676 pub fn with_types(
1677 columns: Vec<String>,
1678 column_types: Vec<grafeo_common::types::LogicalType>,
1679 ) -> Self {
1680 Self {
1681 columns,
1682 column_types,
1683 rows: Vec::new(),
1684 execution_time_ms: None,
1685 rows_scanned: None,
1686 status_message: None,
1687 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1688 }
1689 }
1690
1691 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1693 self.execution_time_ms = Some(execution_time_ms);
1694 self.rows_scanned = Some(rows_scanned);
1695 self
1696 }
1697
1698 #[must_use]
1700 pub fn execution_time_ms(&self) -> Option<f64> {
1701 self.execution_time_ms
1702 }
1703
1704 #[must_use]
1706 pub fn rows_scanned(&self) -> Option<u64> {
1707 self.rows_scanned
1708 }
1709
1710 #[must_use]
1712 pub fn row_count(&self) -> usize {
1713 self.rows.len()
1714 }
1715
1716 #[must_use]
1718 pub fn column_count(&self) -> usize {
1719 self.columns.len()
1720 }
1721
1722 #[must_use]
1724 pub fn is_empty(&self) -> bool {
1725 self.rows.is_empty()
1726 }
1727
1728 pub fn scalar<T: FromValue>(&self) -> Result<T> {
1737 if self.rows.len() != 1 || self.columns.len() != 1 {
1738 return Err(grafeo_common::utils::error::Error::InvalidValue(
1739 "Expected single value".to_string(),
1740 ));
1741 }
1742 T::from_value(&self.rows[0][0])
1743 }
1744
1745 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1747 self.rows.iter()
1748 }
1749}
1750
1751impl std::fmt::Display for QueryResult {
1752 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1753 let table = grafeo_common::fmt::format_result_table(
1754 &self.columns,
1755 &self.rows,
1756 self.execution_time_ms,
1757 self.status_message.as_deref(),
1758 );
1759 f.write_str(&table)
1760 }
1761}
1762
/// Conversion from a borrowed [`grafeo_common::types::Value`] into a
/// concrete Rust type.
///
/// `QueryResult::scalar` uses this to type its single value. This file
/// implements it for `i64`, `f64`, `String` and `bool`.
pub trait FromValue: Sized {
    /// Attempts to read `value` as `Self`.
    ///
    /// # Errors
    ///
    /// The implementations in this file return `Error::TypeMismatch` when
    /// the value does not hold the requested type.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
1771
1772impl FromValue for i64 {
1773 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1774 value
1775 .as_int64()
1776 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1777 expected: "INT64".to_string(),
1778 found: value.type_name().to_string(),
1779 })
1780 }
1781}
1782
1783impl FromValue for f64 {
1784 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1785 value
1786 .as_float64()
1787 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1788 expected: "FLOAT64".to_string(),
1789 found: value.type_name().to_string(),
1790 })
1791 }
1792}
1793
1794impl FromValue for String {
1795 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1796 value.as_str().map(String::from).ok_or_else(|| {
1797 grafeo_common::utils::error::Error::TypeMismatch {
1798 expected: "STRING".to_string(),
1799 found: value.type_name().to_string(),
1800 }
1801 })
1802 }
1803}
1804
1805impl FromValue for bool {
1806 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1807 value
1808 .as_bool()
1809 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1810 expected: "BOOL".to_string(),
1811 found: value.type_name().to_string(),
1812 })
1813 }
1814}
1815
// Unit tests for `GrafeoDB`: construction, WAL-backed persistence and
// recovery, externally supplied stores, query metrics and CDC.
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh in-memory database starts with no nodes and no edges.
    #[test]
    fn test_create_in_memory_database() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
    }

    /// Builder options set on `Config` are visible through `db.config()`.
    #[test]
    fn test_database_config() {
        let config = Config::in_memory().with_threads(4).with_query_logging();

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().threads, 4);
        assert!(db.config().query_logging);
    }

    /// Creating a session on an in-memory database must not panic.
    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        let _session = db.session();
    }

    /// Data written before `close()` is visible after reopening the same path.
    #[cfg(feature = "wal")]
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_db");

        {
            // First session: create two nodes and one edge, then close.
            let db = GrafeoDB::open(&db_path).unwrap();

            let alix = db.create_node(&["Person"]);
            db.set_node_property(alix, "name", Value::from("Alix"));

            let gus = db.create_node(&["Person"]);
            db.set_node_property(gus, "name", Value::from("Gus"));

            let _edge = db.create_edge(alix, gus, "KNOWS");

            db.close().unwrap();
        }

        {
            // Second session: reopen and verify the recovered state.
            let db = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(db.node_count(), 2);
            assert_eq!(db.edge_count(), 1);

            // The test expects the two nodes to be reachable under ids 0 and 1.
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node0.is_some());

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node1.is_some());
        }
    }

    /// Mutations on a persistent database produce WAL records.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // Only checked when a WAL is attached; the assertion is skipped otherwise.
        if let Some(wal) = db.wal() {
            assert!(wal.record_count() > 0);
        }

        db.close().unwrap();
    }

    /// State accumulates correctly across several open/close cycles.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_recovery_multiple_sessions() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("multi_session_db");

        {
            // Session 1: create the first node.
            let db = GrafeoDB::open(&db_path).unwrap();
            let alix = db.create_node(&["Person"]);
            db.set_node_property(alix, "name", Value::from("Alix"));
            db.close().unwrap();
        }

        {
            // Session 2: the first node was recovered; add a second one.
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 1);
            let gus = db.create_node(&["Person"]);
            db.set_node_property(gus, "name", Value::from("Gus"));
            db.close().unwrap();
        }

        {
            // Session 3: both nodes are present and keep their label.
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 2);

            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
        }
    }

    /// Creates, deletes and property updates all survive a reopen.
    #[cfg(feature = "wal")]
    #[test]
    fn test_database_consistency_after_mutations() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("consistency_db");

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Three nodes; the checks below expect them under ids 0, 1 and 2.
            let a = db.create_node(&["Node"]);
            let b = db.create_node(&["Node"]);
            let c = db.create_node(&["Node"]);

            // Two edges: a -> b and b -> c.
            let e1 = db.create_edge(a, b, "LINKS");
            let _e2 = db.create_edge(b, c, "LINKS");

            // Delete the first edge, then the middle node.
            db.delete_edge(e1);
            db.delete_node(b);

            // Properties on the two nodes that remain.
            db.set_node_property(a, "value", Value::Int64(1));
            db.set_node_property(c, "value", Value::Int64(3));

            db.close().unwrap();
        }

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Nodes a (id 0) and c (id 2) must still exist.
            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node_a.is_some());

            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
            assert!(node_c.is_some());

            // Node b (id 1) was deleted and must stay deleted after recovery.
            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node_b.is_none());
        }
    }

    /// Calling `close()` twice succeeds both times.
    #[cfg(feature = "wal")]
    #[test]
    fn test_close_is_idempotent() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("close_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();
        db.create_node(&["Test"]);

        // First close.
        assert!(db.close().is_ok());

        // Second close must also report success.
        assert!(db.close().is_ok());
    }

    /// A database built on an externally created store sees that store's data.
    #[test]
    fn test_with_store_external_backend() {
        use grafeo_core::graph::lpg::LpgStore;

        let external = Arc::new(LpgStore::new().unwrap());

        // Seed the store directly, before the database wraps it.
        let n1 = external.create_node(&["Person"]);
        external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));

        let db = GrafeoDB::with_store(
            Arc::clone(&external) as Arc<dyn GraphStoreMut>,
            Config::in_memory(),
        )
        .unwrap();

        let session = db.session();

        // The query check only runs when the `gql` feature is enabled.
        #[cfg(feature = "gql")]
        {
            let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
            assert_eq!(result.rows.len(), 1);
        }
    }

    /// A custom memory limit is stored in the config and the database starts empty.
    #[test]
    fn test_with_config_custom_memory_limit() {
        // 64 MiB
        let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024);
        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
        assert_eq!(db.node_count(), 0);
    }

    /// Direct node creation does not increment the query counter in the metrics snapshot.
    #[cfg(feature = "metrics")]
    #[test]
    fn test_database_metrics_registry() {
        let db = GrafeoDB::new_in_memory();

        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        let snap = db.metrics();
        // No query was executed, only direct CRUD calls.
        assert_eq!(snap.query_count, 0);
    }

    /// A query result carries execution time and scanned-row metrics.
    #[test]
    fn test_query_result_has_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert!(result.execution_time_ms.unwrap() >= 0.0);
            // Both `Person` nodes are expected to be counted as scanned.
            assert_eq!(result.rows_scanned.unwrap(), 2);
        }
    }

    /// Metrics are present even when the query matches nothing.
    #[test]
    fn test_empty_query_result_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            // No node carries the `NonExistent` label.
            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert_eq!(result.rows_scanned.unwrap(), 0);
        }
    }

    // Change-data-capture tests; compiled only with the `cdc` feature.
    #[cfg(feature = "cdc")]
    mod cdc_integration {
        use super::*;

        /// In-memory database with CDC switched on through the config.
        fn cdc_db() -> GrafeoDB {
            GrafeoDB::with_config(Config::in_memory().with_cdc()).unwrap()
        }

        /// Create, two property sets and a delete yield four history events.
        #[test]
        fn test_node_lifecycle_history() {
            let db = cdc_db();

            let id = db.create_node(&["Person"]);
            db.set_node_property(id, "name", "Alix".into());
            db.set_node_property(id, "name", "Gus".into());
            db.delete_node(id);

            let history = db.history(id).unwrap();
            // create + 2 updates + delete
            assert_eq!(history.len(), 4);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            // First set: there was no previous value.
            assert!(history[1].before.is_none());
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
            // Second set: the previous value is recorded.
            assert!(history[2].before.is_some());
            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
        }

        /// Create, one property set and a delete yield three edge history events.
        #[test]
        fn test_edge_lifecycle_history() {
            let db = cdc_db();

            let alix = db.create_node(&["Person"]);
            let gus = db.create_node(&["Person"]);
            let edge = db.create_edge(alix, gus, "KNOWS");
            db.set_edge_property(edge, "since", 2024i64.into());
            db.delete_edge(edge);

            let history = db.history(edge).unwrap();
            // create + update + delete
            assert_eq!(history.len(), 3);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
        }

        /// Creating a node with properties records a single create event
        /// whose `after` image holds both properties.
        #[test]
        fn test_create_node_with_props_cdc() {
            let db = cdc_db();

            let id = db.create_node_with_props(
                &["Person"],
                vec![
                    ("name", grafeo_common::types::Value::from("Alix")),
                    ("age", grafeo_common::types::Value::from(30i64)),
                ],
            );

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 1);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            let after = history[0].after.as_ref().unwrap();
            // Both properties passed at creation.
            assert_eq!(after.len(), 2);
        }

        /// `changes_between` over the full epoch range returns every event.
        #[test]
        fn test_changes_between() {
            let db = cdc_db();

            let id1 = db.create_node(&["A"]);
            let _id2 = db.create_node(&["B"]);
            db.set_node_property(id1, "x", 1i64.into());

            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            // 2 creates + 1 update
            assert_eq!(changes.len(), 3);
        }

        /// Without opting in, CDC is off and no history is recorded.
        #[test]
        fn test_cdc_disabled_by_default() {
            let db = GrafeoDB::new_in_memory();
            assert!(!db.is_cdc_enabled());

            let id = db.create_node(&["Person"]);
            db.set_node_property(id, "name", "Alix".into());

            let history = db.history(id).unwrap();
            assert!(history.is_empty(), "CDC off by default: no events recorded");
        }

        /// A session can turn CDC on even though the database default is off.
        // NOTE(review): calls `session.execute` without a `gql` feature gate,
        // unlike the gated tests in the parent module — confirm this builds
        // when `cdc` is enabled without `gql`.
        #[test]
        fn test_session_with_cdc_override_on() {
            let db = GrafeoDB::new_in_memory();
            let session = db.session_with_cdc(true);
            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            assert!(
                !changes.is_empty(),
                "session_with_cdc(true) should record events"
            );
        }

        /// A session can turn CDC off even though the database default is on.
        #[test]
        fn test_session_with_cdc_override_off() {
            let db = cdc_db();
            let session = db.session_with_cdc(false);
            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            assert!(
                changes.is_empty(),
                "session_with_cdc(false) should not record events"
            );
        }

        /// CDC can be switched on and off at runtime with `set_cdc_enabled`.
        #[test]
        fn test_set_cdc_enabled_runtime() {
            let db = GrafeoDB::new_in_memory();
            assert!(!db.is_cdc_enabled());

            // Enable: subsequent mutations are recorded.
            db.set_cdc_enabled(true);
            assert!(db.is_cdc_enabled());

            let id = db.create_node(&["Person"]);
            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 1, "CDC enabled at runtime records events");

            // Disable again: new mutations leave no history.
            db.set_cdc_enabled(false);
            let id2 = db.create_node(&["Person"]);
            let history2 = db.history(id2).unwrap();
            assert!(
                history2.is_empty(),
                "CDC disabled at runtime stops recording"
            );
        }
    }

    /// Queries through the database see data seeded into a wrapped store.
    // NOTE(review): `db.execute` is called without the `gql` feature gate
    // used by `test_with_store_external_backend` — confirm whether this test
    // should be gated as well.
    #[test]
    fn test_with_store_basic() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let n1 = store.create_node(&["Person"]);
        store.set_node_property(n1, "name", "Alix".into());

        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 1);
    }

    /// A session on a store-backed database can run an aggregate query.
    // NOTE(review): not gated on the `gql` feature — see the note on
    // `test_with_store_basic`.
    #[test]
    fn test_with_store_session() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let session = db.session();
        // `count(n)` is expected to yield exactly one row, even on an empty store.
        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
        assert_eq!(result.rows.len(), 1);
    }

    /// An insert inside an explicit transaction is visible to a later query
    /// in the same session, and the transaction commits cleanly.
    // NOTE(review): not gated on the `gql` feature — see the note on
    // `test_with_store_basic`.
    #[test]
    fn test_with_store_mutations() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let mut session = db.session();

        // The insert and the read-back both run inside one explicit transaction.
        session.begin_transaction().unwrap();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 1);

        session.commit().unwrap();
    }
}