1mod admin;
15#[cfg(feature = "async-storage")]
16mod async_ops;
17#[cfg(feature = "async-storage")]
18pub(crate) mod async_wal_store;
19#[cfg(feature = "cdc")]
20pub(crate) mod cdc_store;
21mod crud;
22#[cfg(feature = "embed")]
23mod embed;
24mod import;
25mod index;
26mod persistence;
27mod query;
28#[cfg(feature = "rdf")]
29mod rdf_ops;
30mod search;
31#[cfg(feature = "wal")]
32pub(crate) mod wal_store;
33
34use grafeo_common::grafeo_error;
35#[cfg(feature = "wal")]
36use std::path::Path;
37use std::sync::Arc;
38use std::sync::atomic::AtomicUsize;
39
40use parking_lot::RwLock;
41
42#[cfg(feature = "grafeo-file")]
43use grafeo_adapters::storage::file::GrafeoFileManager;
44#[cfg(feature = "wal")]
45use grafeo_adapters::storage::wal::{
46 DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
47};
48use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
49use grafeo_common::utils::error::Result;
50use grafeo_core::graph::lpg::LpgStore;
51#[cfg(feature = "rdf")]
52use grafeo_core::graph::rdf::RdfStore;
53use grafeo_core::graph::{GraphStore, GraphStoreMut};
54
55use crate::catalog::Catalog;
56use crate::config::Config;
57use crate::query::cache::QueryCache;
58use crate::session::Session;
59use crate::transaction::TransactionManager;
60
/// Top-level database handle.
///
/// Bundles the graph storage (either the built-in [`LpgStore`] or an
/// externally supplied store), the schema catalog, the transaction manager
/// and the optional persistence backends. Sessions created from this handle
/// share these components through `Arc` clones.
pub struct GrafeoDB {
    /// Configuration this database was created with.
    pub(super) config: Config,
    /// Built-in LPG store. `None` when the database wraps an external store
    /// (`with_store` / `with_read_store`) or after `compact()` replaced it.
    pub(super) store: Option<Arc<LpgStore>>,
    /// Schema catalog shared with every session.
    pub(super) catalog: Arc<Catalog>,
    /// RDF store, only present with the `rdf` feature.
    #[cfg(feature = "rdf")]
    pub(super) rdf_store: Arc<RdfStore>,
    /// Transaction manager shared with every session.
    pub(super) transaction_manager: Arc<TransactionManager>,
    /// Buffer manager; its budget comes from `config.memory_limit` or, when
    /// unset, 75% of the detected system memory.
    pub(super) buffer_manager: Arc<BufferManager>,
    /// Write-ahead log. `None` for read-only databases, when the WAL is
    /// disabled, when no path is configured, or for external stores.
    #[cfg(feature = "wal")]
    pub(super) wal: Option<Arc<LpgWal>>,
    /// Graph context handed to each session together with the WAL.
    /// NOTE(review): presumably tracks the named graph the WAL last switched
    /// to — confirm against `Session::set_wal`.
    #[cfg(feature = "wal")]
    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
    /// Query cache shared with every session.
    pub(super) query_cache: Arc<QueryCache>,
    /// Counter shared with every session alongside `config.gc_interval`.
    /// NOTE(review): presumably counts commits to schedule periodic GC — confirm.
    pub(super) commit_counter: Arc<AtomicUsize>,
    /// `true` until `close()` completes successfully.
    pub(super) is_open: RwLock<bool>,
    /// Change-data-capture log attached to sessions that have CDC enabled.
    #[cfg(feature = "cdc")]
    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
    /// Database-wide CDC default, initialised from `config.cdc_enabled`.
    #[cfg(feature = "cdc")]
    cdc_enabled: std::sync::atomic::AtomicBool,
    /// Embedding models keyed by a string (presumably the model name — confirm).
    #[cfg(feature = "embed")]
    pub(super) embedding_models:
        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
    /// Single-file storage manager, present when the database is backed by a
    /// single `.grafeo` file.
    #[cfg(feature = "grafeo-file")]
    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
    /// Externally supplied read view of the graph; when set, sessions use it
    /// instead of the built-in store.
    pub(super) external_read_store: Option<Arc<dyn GraphStore>>,
    /// Externally supplied writable view of the graph; `None` for read-only
    /// external stores.
    pub(super) external_write_store: Option<Arc<dyn GraphStoreMut>>,
    /// Metrics registry, only present with the `metrics` feature.
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Graph name applied to newly created sessions, if any.
    current_graph: RwLock<Option<String>>,
    /// Schema name applied to newly created sessions, if any.
    current_schema: RwLock<Option<String>>,
    /// Whether the database rejects writes; propagated to every session.
    read_only: bool,
}
145
146impl GrafeoDB {
147 fn lpg_store(&self) -> &Arc<LpgStore> {
155 self.store.as_ref().expect(
156 "no built-in LpgStore: this GrafeoDB was created with an external store \
157 (with_store / with_read_store). Use session() or graph_store() instead.",
158 )
159 }
160
161 #[cfg(feature = "cdc")]
163 #[inline]
164 pub(super) fn cdc_active(&self) -> bool {
165 self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed)
166 }
167
168 #[must_use]
189 pub fn new_in_memory() -> Self {
190 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
191 }
192
193 #[cfg(feature = "wal")]
212 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
213 Self::with_config(Config::persistent(path.as_ref()))
214 }
215
216 #[cfg(feature = "grafeo-file")]
241 pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
242 Self::with_config(Config::read_only(path.as_ref()))
243 }
244
    /// Creates a database from an explicit [`Config`].
    ///
    /// The steps run in this order:
    /// 1. validate the configuration;
    /// 2. create empty in-memory stores, the buffer manager and the catalog;
    /// 3. (`grafeo-file`) open or create the single-file container, load its
    ///    snapshot, and — for writable databases with the WAL enabled —
    ///    replay the sidecar WAL on top of it;
    /// 4. (`wal`) for directory-based storage replay the existing WAL, then
    ///    open the WAL for appending.
    ///
    /// # Errors
    ///
    /// Returns an error when validation fails, when read-only mode is
    /// requested without an existing database file, when the configured path
    /// exists but is not a file in single-file mode, or when any storage,
    /// snapshot or WAL operation fails.
    pub fn with_config(config: Config) -> Result<Self> {
        // Validation failures are surfaced as internal errors carrying the
        // validator's message.
        config
            .validate()
            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;

        let store = Arc::new(LpgStore::new()?);
        #[cfg(feature = "rdf")]
        let rdf_store = Arc::new(RdfStore::new());
        let transaction_manager = Arc::new(TransactionManager::new());

        let buffer_config = BufferManagerConfig {
            // Without an explicit limit, budget 75% of detected system memory.
            budget: config.memory_limit.unwrap_or_else(|| {
                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
            }),
            // Explicit spill path wins; otherwise spill next to the database
            // under `<path>/spill` (or nowhere for in-memory databases).
            spill_path: config
                .spill_path
                .clone()
                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
            ..BufferManagerConfig::default()
        };
        let buffer_manager = BufferManager::new(buffer_config);

        let catalog = Arc::new(Catalog::new());

        let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;

        #[cfg(feature = "grafeo-file")]
        let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
            // Read-only: the file must already exist; only its snapshot is
            // loaded (no WAL replay happens on this path).
            if let Some(ref db_path) = config.path {
                if db_path.exists() && db_path.is_file() {
                    let fm = GrafeoFileManager::open_read_only(db_path)?;
                    let snapshot_data = fm.read_snapshot()?;
                    if !snapshot_data.is_empty() {
                        Self::apply_snapshot_data(
                            &store,
                            &catalog,
                            #[cfg(feature = "rdf")]
                            &rdf_store,
                            &snapshot_data,
                        )?;
                    }
                    Some(Arc::new(fm))
                } else {
                    return Err(grafeo_common::utils::error::Error::Internal(format!(
                        "read-only open requires an existing .grafeo file: {}",
                        db_path.display()
                    )));
                }
            } else {
                return Err(grafeo_common::utils::error::Error::Internal(
                    "read-only mode requires a database path".to_string(),
                ));
            }
        } else if let Some(ref db_path) = config.path {
            if Self::should_use_single_file(db_path, config.storage_format) {
                // Open an existing file, create a missing one, and reject
                // anything else that already occupies the path.
                let fm = if db_path.exists() && db_path.is_file() {
                    GrafeoFileManager::open(db_path)?
                } else if !db_path.exists() {
                    GrafeoFileManager::create(db_path)?
                } else {
                    return Err(grafeo_common::utils::error::Error::Internal(format!(
                        "path exists but is not a file: {}",
                        db_path.display()
                    )));
                };

                // Load the snapshot first so WAL records replay on top of it.
                let snapshot_data = fm.read_snapshot()?;
                if !snapshot_data.is_empty() {
                    Self::apply_snapshot_data(
                        &store,
                        &catalog,
                        #[cfg(feature = "rdf")]
                        &rdf_store,
                        &snapshot_data,
                    )?;
                }

                // Replay the sidecar WAL if one exists.
                #[cfg(feature = "wal")]
                if config.wal_enabled && fm.has_sidecar_wal() {
                    let recovery = WalRecovery::new(fm.sidecar_wal_path());
                    let records = recovery.recover()?;
                    Self::apply_wal_records(
                        &store,
                        &catalog,
                        #[cfg(feature = "rdf")]
                        &rdf_store,
                        &records,
                    )?;
                }

                Some(Arc::new(fm))
            } else {
                None
            }
        } else {
            None
        };

        #[cfg(feature = "wal")]
        let wal = if is_read_only {
            // Read-only databases never open a WAL for writing.
            None
        } else if config.wal_enabled {
            if let Some(ref db_path) = config.path {
                // Single-file databases keep their WAL in a sidecar directory;
                // directory-based databases keep it under `<path>/wal`.
                #[cfg(feature = "grafeo-file")]
                let wal_path = if let Some(ref fm) = file_manager {
                    let p = fm.sidecar_wal_path();
                    std::fs::create_dir_all(&p)?;
                    p
                } else {
                    std::fs::create_dir_all(db_path)?;
                    db_path.join("wal")
                };

                #[cfg(not(feature = "grafeo-file"))]
                let wal_path = {
                    std::fs::create_dir_all(db_path)?;
                    db_path.join("wal")
                };

                #[cfg(feature = "grafeo-file")]
                let is_single_file = file_manager.is_some();
                #[cfg(not(feature = "grafeo-file"))]
                let is_single_file = false;

                // The single-file path already replayed its sidecar WAL above,
                // so only directory-based storage is recovered here.
                if !is_single_file && wal_path.exists() {
                    let recovery = WalRecovery::new(&wal_path);
                    let records = recovery.recover()?;
                    Self::apply_wal_records(
                        &store,
                        &catalog,
                        #[cfg(feature = "rdf")]
                        &rdf_store,
                        &records,
                    )?;
                }

                // Translate the public durability setting into the WAL
                // adapter's equivalent.
                let wal_durability = match config.wal_durability {
                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
                    crate::config::DurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    } => WalDurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    },
                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
                        WalDurabilityMode::Adaptive { target_interval_ms }
                    }
                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
                };
                let wal_config = WalConfig {
                    durability: wal_durability,
                    ..WalConfig::default()
                };
                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
                Some(Arc::new(wal_manager))
            } else {
                None
            }
        } else {
            None
        };

        let query_cache = Arc::new(QueryCache::default());

        // Align the transaction manager with the epoch the store reached
        // after snapshot load / WAL replay.
        #[cfg(feature = "temporal")]
        transaction_manager.sync_epoch(store.current_epoch());

        // Read before `config` is moved into the struct below.
        #[cfg(feature = "cdc")]
        let cdc_enabled_val = config.cdc_enabled;

        Ok(Self {
            config,
            store: Some(store),
            catalog,
            #[cfg(feature = "rdf")]
            rdf_store,
            transaction_manager,
            buffer_manager,
            #[cfg(feature = "wal")]
            wal,
            #[cfg(feature = "wal")]
            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
            query_cache,
            commit_counter: Arc::new(AtomicUsize::new(0)),
            is_open: RwLock::new(true),
            #[cfg(feature = "cdc")]
            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
            #[cfg(feature = "cdc")]
            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
            #[cfg(feature = "embed")]
            embedding_models: RwLock::new(hashbrown::HashMap::new()),
            #[cfg(feature = "grafeo-file")]
            file_manager,
            external_read_store: None,
            external_write_store: None,
            #[cfg(feature = "metrics")]
            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
            current_graph: RwLock::new(None),
            current_schema: RwLock::new(None),
            read_only: is_read_only,
        })
    }
492
493 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
518 config
519 .validate()
520 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
521
522 let transaction_manager = Arc::new(TransactionManager::new());
523
524 let buffer_config = BufferManagerConfig {
525 budget: config.memory_limit.unwrap_or_else(|| {
526 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
527 }),
528 spill_path: None,
529 ..BufferManagerConfig::default()
530 };
531 let buffer_manager = BufferManager::new(buffer_config);
532
533 let query_cache = Arc::new(QueryCache::default());
534
535 #[cfg(feature = "cdc")]
536 let cdc_enabled_val = config.cdc_enabled;
537
538 Ok(Self {
539 config,
540 store: None,
541 catalog: Arc::new(Catalog::new()),
542 #[cfg(feature = "rdf")]
543 rdf_store: Arc::new(RdfStore::new()),
544 transaction_manager,
545 buffer_manager,
546 #[cfg(feature = "wal")]
547 wal: None,
548 #[cfg(feature = "wal")]
549 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
550 query_cache,
551 commit_counter: Arc::new(AtomicUsize::new(0)),
552 is_open: RwLock::new(true),
553 #[cfg(feature = "cdc")]
554 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
555 #[cfg(feature = "cdc")]
556 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
557 #[cfg(feature = "embed")]
558 embedding_models: RwLock::new(hashbrown::HashMap::new()),
559 #[cfg(feature = "grafeo-file")]
560 file_manager: None,
561 external_read_store: Some(Arc::clone(&store) as Arc<dyn GraphStore>),
562 external_write_store: Some(store),
563 #[cfg(feature = "metrics")]
564 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
565 current_graph: RwLock::new(None),
566 current_schema: RwLock::new(None),
567 read_only: false,
568 })
569 }
570
571 pub fn with_read_store(store: Arc<dyn GraphStore>, config: Config) -> Result<Self> {
592 config
593 .validate()
594 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
595
596 let transaction_manager = Arc::new(TransactionManager::new());
597
598 let buffer_config = BufferManagerConfig {
599 budget: config.memory_limit.unwrap_or_else(|| {
600 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
601 }),
602 spill_path: None,
603 ..BufferManagerConfig::default()
604 };
605 let buffer_manager = BufferManager::new(buffer_config);
606
607 let query_cache = Arc::new(QueryCache::default());
608
609 #[cfg(feature = "cdc")]
610 let cdc_enabled_val = config.cdc_enabled;
611
612 Ok(Self {
613 config,
614 store: None,
615 catalog: Arc::new(Catalog::new()),
616 #[cfg(feature = "rdf")]
617 rdf_store: Arc::new(RdfStore::new()),
618 transaction_manager,
619 buffer_manager,
620 #[cfg(feature = "wal")]
621 wal: None,
622 #[cfg(feature = "wal")]
623 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
624 query_cache,
625 commit_counter: Arc::new(AtomicUsize::new(0)),
626 is_open: RwLock::new(true),
627 #[cfg(feature = "cdc")]
628 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
629 #[cfg(feature = "cdc")]
630 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
631 #[cfg(feature = "embed")]
632 embedding_models: RwLock::new(hashbrown::HashMap::new()),
633 #[cfg(feature = "grafeo-file")]
634 file_manager: None,
635 external_read_store: Some(store),
636 external_write_store: None,
637 #[cfg(feature = "metrics")]
638 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
639 current_graph: RwLock::new(None),
640 current_schema: RwLock::new(None),
641 read_only: true,
642 })
643 }
644
645 #[cfg(feature = "compact-store")]
663 pub fn compact(&mut self) -> Result<()> {
664 use grafeo_core::graph::compact::from_graph_store;
665
666 let current_store = self.graph_store();
667 let compact = from_graph_store(current_store.as_ref())
668 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
669
670 self.external_read_store = Some(Arc::new(compact) as Arc<dyn GraphStore>);
671 self.external_write_store = None;
672 self.store = None;
673 self.read_only = true;
674 self.query_cache = Arc::new(QueryCache::default());
675
676 Ok(())
677 }
678
679 #[cfg(feature = "wal")]
685 fn apply_wal_records(
686 store: &Arc<LpgStore>,
687 catalog: &Catalog,
688 #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
689 records: &[WalRecord],
690 ) -> Result<()> {
691 use crate::catalog::{
692 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
693 };
694 use grafeo_common::utils::error::Error;
695
696 let mut current_graph: Option<String> = None;
699 let mut target_store: Arc<LpgStore> = Arc::clone(store);
700
701 for record in records {
702 match record {
703 WalRecord::CreateNamedGraph { name } => {
705 let _ = store.create_graph(name);
706 }
707 WalRecord::DropNamedGraph { name } => {
708 store.drop_graph(name);
709 if current_graph.as_deref() == Some(name.as_str()) {
711 current_graph = None;
712 target_store = Arc::clone(store);
713 }
714 }
715 WalRecord::SwitchGraph { name } => {
716 current_graph.clone_from(name);
717 target_store = match ¤t_graph {
718 None => Arc::clone(store),
719 Some(graph_name) => store
720 .graph_or_create(graph_name)
721 .map_err(|e| Error::Internal(e.to_string()))?,
722 };
723 }
724
725 WalRecord::CreateNode { id, labels } => {
727 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
728 target_store.create_node_with_id(*id, &label_refs)?;
729 }
730 WalRecord::DeleteNode { id } => {
731 target_store.delete_node(*id);
732 }
733 WalRecord::CreateEdge {
734 id,
735 src,
736 dst,
737 edge_type,
738 } => {
739 target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
740 }
741 WalRecord::DeleteEdge { id } => {
742 target_store.delete_edge(*id);
743 }
744 WalRecord::SetNodeProperty { id, key, value } => {
745 target_store.set_node_property(*id, key, value.clone());
746 }
747 WalRecord::SetEdgeProperty { id, key, value } => {
748 target_store.set_edge_property(*id, key, value.clone());
749 }
750 WalRecord::AddNodeLabel { id, label } => {
751 target_store.add_label(*id, label);
752 }
753 WalRecord::RemoveNodeLabel { id, label } => {
754 target_store.remove_label(*id, label);
755 }
756 WalRecord::RemoveNodeProperty { id, key } => {
757 target_store.remove_node_property(*id, key);
758 }
759 WalRecord::RemoveEdgeProperty { id, key } => {
760 target_store.remove_edge_property(*id, key);
761 }
762
763 WalRecord::CreateNodeType {
765 name,
766 properties,
767 constraints,
768 } => {
769 let def = NodeTypeDefinition {
770 name: name.clone(),
771 properties: properties
772 .iter()
773 .map(|(n, t, nullable)| TypedProperty {
774 name: n.clone(),
775 data_type: PropertyDataType::from_type_name(t),
776 nullable: *nullable,
777 default_value: None,
778 })
779 .collect(),
780 constraints: constraints
781 .iter()
782 .map(|(kind, props)| match kind.as_str() {
783 "unique" => TypeConstraint::Unique(props.clone()),
784 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
785 "not_null" if !props.is_empty() => {
786 TypeConstraint::NotNull(props[0].clone())
787 }
788 _ => TypeConstraint::Unique(props.clone()),
789 })
790 .collect(),
791 parent_types: Vec::new(),
792 };
793 let _ = catalog.register_node_type(def);
794 }
795 WalRecord::DropNodeType { name } => {
796 let _ = catalog.drop_node_type(name);
797 }
798 WalRecord::CreateEdgeType {
799 name,
800 properties,
801 constraints,
802 } => {
803 let def = EdgeTypeDefinition {
804 name: name.clone(),
805 properties: properties
806 .iter()
807 .map(|(n, t, nullable)| TypedProperty {
808 name: n.clone(),
809 data_type: PropertyDataType::from_type_name(t),
810 nullable: *nullable,
811 default_value: None,
812 })
813 .collect(),
814 constraints: constraints
815 .iter()
816 .map(|(kind, props)| match kind.as_str() {
817 "unique" => TypeConstraint::Unique(props.clone()),
818 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
819 "not_null" if !props.is_empty() => {
820 TypeConstraint::NotNull(props[0].clone())
821 }
822 _ => TypeConstraint::Unique(props.clone()),
823 })
824 .collect(),
825 source_node_types: Vec::new(),
826 target_node_types: Vec::new(),
827 };
828 let _ = catalog.register_edge_type_def(def);
829 }
830 WalRecord::DropEdgeType { name } => {
831 let _ = catalog.drop_edge_type_def(name);
832 }
833 WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
834 }
837 WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
838 }
841 WalRecord::CreateGraphType {
842 name,
843 node_types,
844 edge_types,
845 open,
846 } => {
847 use crate::catalog::GraphTypeDefinition;
848 let def = GraphTypeDefinition {
849 name: name.clone(),
850 allowed_node_types: node_types.clone(),
851 allowed_edge_types: edge_types.clone(),
852 open: *open,
853 };
854 let _ = catalog.register_graph_type(def);
855 }
856 WalRecord::DropGraphType { name } => {
857 let _ = catalog.drop_graph_type(name);
858 }
859 WalRecord::CreateSchema { name } => {
860 let _ = catalog.register_schema_namespace(name.clone());
861 }
862 WalRecord::DropSchema { name } => {
863 let _ = catalog.drop_schema_namespace(name);
864 }
865
866 WalRecord::AlterNodeType { name, alterations } => {
867 for (action, prop_name, type_name, nullable) in alterations {
868 match action.as_str() {
869 "add" => {
870 let prop = TypedProperty {
871 name: prop_name.clone(),
872 data_type: PropertyDataType::from_type_name(type_name),
873 nullable: *nullable,
874 default_value: None,
875 };
876 let _ = catalog.alter_node_type_add_property(name, prop);
877 }
878 "drop" => {
879 let _ = catalog.alter_node_type_drop_property(name, prop_name);
880 }
881 _ => {}
882 }
883 }
884 }
885 WalRecord::AlterEdgeType { name, alterations } => {
886 for (action, prop_name, type_name, nullable) in alterations {
887 match action.as_str() {
888 "add" => {
889 let prop = TypedProperty {
890 name: prop_name.clone(),
891 data_type: PropertyDataType::from_type_name(type_name),
892 nullable: *nullable,
893 default_value: None,
894 };
895 let _ = catalog.alter_edge_type_add_property(name, prop);
896 }
897 "drop" => {
898 let _ = catalog.alter_edge_type_drop_property(name, prop_name);
899 }
900 _ => {}
901 }
902 }
903 }
904 WalRecord::AlterGraphType { name, alterations } => {
905 for (action, type_name) in alterations {
906 match action.as_str() {
907 "add_node" => {
908 let _ =
909 catalog.alter_graph_type_add_node_type(name, type_name.clone());
910 }
911 "drop_node" => {
912 let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
913 }
914 "add_edge" => {
915 let _ =
916 catalog.alter_graph_type_add_edge_type(name, type_name.clone());
917 }
918 "drop_edge" => {
919 let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
920 }
921 _ => {}
922 }
923 }
924 }
925
926 WalRecord::CreateProcedure {
927 name,
928 params,
929 returns,
930 body,
931 } => {
932 use crate::catalog::ProcedureDefinition;
933 let def = ProcedureDefinition {
934 name: name.clone(),
935 params: params.clone(),
936 returns: returns.clone(),
937 body: body.clone(),
938 };
939 let _ = catalog.register_procedure(def);
940 }
941 WalRecord::DropProcedure { name } => {
942 let _ = catalog.drop_procedure(name);
943 }
944
945 #[cfg(feature = "rdf")]
947 WalRecord::InsertRdfTriple { .. }
948 | WalRecord::DeleteRdfTriple { .. }
949 | WalRecord::ClearRdfGraph { .. }
950 | WalRecord::CreateRdfGraph { .. }
951 | WalRecord::DropRdfGraph { .. } => {
952 rdf_ops::replay_rdf_wal_record(rdf_store, record);
953 }
954 #[cfg(not(feature = "rdf"))]
955 WalRecord::InsertRdfTriple { .. }
956 | WalRecord::DeleteRdfTriple { .. }
957 | WalRecord::ClearRdfGraph { .. }
958 | WalRecord::CreateRdfGraph { .. }
959 | WalRecord::DropRdfGraph { .. } => {}
960
961 WalRecord::TransactionCommit { .. } => {
962 #[cfg(feature = "temporal")]
966 {
967 target_store.new_epoch();
968 }
969 }
970 WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
971 }
974 }
975 }
976 Ok(())
977 }
978
979 #[cfg(feature = "grafeo-file")]
985 fn should_use_single_file(
986 path: &std::path::Path,
987 configured: crate::config::StorageFormat,
988 ) -> bool {
989 use crate::config::StorageFormat;
990 match configured {
991 StorageFormat::SingleFile => true,
992 StorageFormat::WalDirectory => false,
993 StorageFormat::Auto => {
994 if path.is_file() {
996 if let Ok(mut f) = std::fs::File::open(path) {
997 use std::io::Read;
998 let mut magic = [0u8; 4];
999 if f.read_exact(&mut magic).is_ok()
1000 && magic == grafeo_adapters::storage::file::MAGIC
1001 {
1002 return true;
1003 }
1004 }
1005 return false;
1006 }
1007 if path.is_dir() {
1009 return false;
1010 }
1011 path.extension().is_some_and(|ext| ext == "grafeo")
1013 }
1014 }
1015 }
1016
1017 #[cfg(feature = "grafeo-file")]
1019 fn apply_snapshot_data(
1020 store: &Arc<LpgStore>,
1021 catalog: &Arc<crate::catalog::Catalog>,
1022 #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
1023 data: &[u8],
1024 ) -> Result<()> {
1025 persistence::load_snapshot_into_store(
1026 store,
1027 catalog,
1028 #[cfg(feature = "rdf")]
1029 rdf_store,
1030 data,
1031 )
1032 }
1033
1034 #[must_use]
1062 pub fn session(&self) -> Session {
1063 self.create_session_inner(None)
1064 }
1065
1066 #[cfg(feature = "cdc")]
1085 #[must_use]
1086 pub fn session_with_cdc(&self, cdc_enabled: bool) -> Session {
1087 self.create_session_inner(Some(cdc_enabled))
1088 }
1089
1090 #[must_use]
1097 pub fn session_read_only(&self) -> Session {
1098 self.create_session_inner_opts(None, true)
1099 }
1100
1101 #[allow(unused_variables)] fn create_session_inner(&self, cdc_override: Option<bool>) -> Session {
1107 self.create_session_inner_opts(cdc_override, false)
1108 }
1109
1110 #[allow(unused_variables)]
1112 fn create_session_inner_opts(
1113 &self,
1114 cdc_override: Option<bool>,
1115 force_read_only: bool,
1116 ) -> Session {
1117 let session_cfg = || crate::session::SessionConfig {
1118 transaction_manager: Arc::clone(&self.transaction_manager),
1119 query_cache: Arc::clone(&self.query_cache),
1120 catalog: Arc::clone(&self.catalog),
1121 adaptive_config: self.config.adaptive.clone(),
1122 factorized_execution: self.config.factorized_execution,
1123 graph_model: self.config.graph_model,
1124 query_timeout: self.config.query_timeout,
1125 commit_counter: Arc::clone(&self.commit_counter),
1126 gc_interval: self.config.gc_interval,
1127 read_only: self.read_only || force_read_only,
1128 };
1129
1130 if let Some(ref ext_read) = self.external_read_store {
1131 return Session::with_external_store(
1132 Arc::clone(ext_read),
1133 self.external_write_store.as_ref().map(Arc::clone),
1134 session_cfg(),
1135 )
1136 .expect("arena allocation for external store session");
1137 }
1138
1139 #[cfg(feature = "rdf")]
1140 let mut session = Session::with_rdf_store_and_adaptive(
1141 Arc::clone(self.lpg_store()),
1142 Arc::clone(&self.rdf_store),
1143 session_cfg(),
1144 );
1145 #[cfg(not(feature = "rdf"))]
1146 let mut session = Session::with_adaptive(Arc::clone(self.lpg_store()), session_cfg());
1147
1148 #[cfg(feature = "wal")]
1149 if let Some(ref wal) = self.wal {
1150 session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
1151 }
1152
1153 #[cfg(feature = "cdc")]
1154 {
1155 let should_enable = cdc_override.unwrap_or_else(|| self.cdc_active());
1156 if should_enable {
1157 session.set_cdc_log(Arc::clone(&self.cdc_log));
1158 }
1159 }
1160
1161 #[cfg(feature = "metrics")]
1162 {
1163 if let Some(ref m) = self.metrics {
1164 session.set_metrics(Arc::clone(m));
1165 m.session_created
1166 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1167 m.session_active
1168 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1169 }
1170 }
1171
1172 if let Some(ref graph) = *self.current_graph.read() {
1174 session.use_graph(graph);
1175 }
1176
1177 if let Some(ref schema) = *self.current_schema.read() {
1179 session.set_schema(schema);
1180 }
1181
1182 let _ = &mut session;
1184
1185 session
1186 }
1187
1188 #[must_use]
1194 pub fn current_graph(&self) -> Option<String> {
1195 self.current_graph.read().clone()
1196 }
1197
1198 pub fn set_current_graph(&self, name: Option<&str>) {
1203 *self.current_graph.write() = name.map(ToString::to_string);
1204 }
1205
1206 #[must_use]
1211 pub fn current_schema(&self) -> Option<String> {
1212 self.current_schema.read().clone()
1213 }
1214
1215 pub fn set_current_schema(&self, name: Option<&str>) {
1220 *self.current_schema.write() = name.map(ToString::to_string);
1221 }
1222
1223 #[must_use]
1225 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
1226 &self.config.adaptive
1227 }
1228
1229 #[must_use]
1231 pub fn is_read_only(&self) -> bool {
1232 self.read_only
1233 }
1234
1235 #[must_use]
1237 pub fn config(&self) -> &Config {
1238 &self.config
1239 }
1240
1241 #[must_use]
1243 pub fn graph_model(&self) -> crate::config::GraphModel {
1244 self.config.graph_model
1245 }
1246
1247 #[must_use]
1249 pub fn memory_limit(&self) -> Option<usize> {
1250 self.config.memory_limit
1251 }
1252
1253 #[cfg(feature = "metrics")]
1258 #[must_use]
1259 pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1260 let mut snapshot = self
1261 .metrics
1262 .as_ref()
1263 .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1264
1265 let cache_stats = self.query_cache.stats();
1267 snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1268 snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1269 snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1270 snapshot.cache_invalidations = cache_stats.invalidations;
1271
1272 snapshot
1273 }
1274
1275 #[cfg(feature = "metrics")]
1279 #[must_use]
1280 pub fn metrics_prometheus(&self) -> String {
1281 self.metrics
1282 .as_ref()
1283 .map_or_else(String::new, |m| m.to_prometheus())
1284 }
1285
1286 #[cfg(feature = "metrics")]
1288 pub fn reset_metrics(&self) {
1289 if let Some(ref m) = self.metrics {
1290 m.reset();
1291 }
1292 self.query_cache.reset_stats();
1293 }
1294
1295 #[must_use]
1303 pub fn store(&self) -> &Arc<LpgStore> {
1304 self.lpg_store()
1305 }
1306
1307 #[allow(dead_code)] fn active_store(&self) -> Arc<LpgStore> {
1314 let store = self.lpg_store();
1315 let graph_name = self.current_graph.read().clone();
1316 match graph_name {
1317 None => Arc::clone(store),
1318 Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(store),
1319 Some(ref name) => store.graph(name).unwrap_or_else(|| Arc::clone(store)),
1320 }
1321 }
1322
1323 pub fn create_graph(&self, name: &str) -> Result<bool> {
1331 Ok(self.lpg_store().create_graph(name)?)
1332 }
1333
1334 pub fn drop_graph(&self, name: &str) -> bool {
1336 self.lpg_store().drop_graph(name)
1337 }
1338
1339 #[must_use]
1341 pub fn list_graphs(&self) -> Vec<String> {
1342 self.lpg_store().graph_names()
1343 }
1344
1345 #[must_use]
1354 pub fn graph_store(&self) -> Arc<dyn GraphStore> {
1355 if let Some(ref ext_read) = self.external_read_store {
1356 Arc::clone(ext_read)
1357 } else {
1358 Arc::clone(self.lpg_store()) as Arc<dyn GraphStore>
1359 }
1360 }
1361
1362 #[must_use]
1367 pub fn graph_store_mut(&self) -> Option<Arc<dyn GraphStoreMut>> {
1368 if self.external_read_store.is_some() {
1369 self.external_write_store.as_ref().map(Arc::clone)
1370 } else {
1371 Some(Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreMut>)
1372 }
1373 }
1374
1375 pub fn gc(&self) {
1381 let min_epoch = self.transaction_manager.min_active_epoch();
1382 self.lpg_store().gc_versions(min_epoch);
1383 self.transaction_manager.gc();
1384 }
1385
1386 #[must_use]
1388 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
1389 &self.buffer_manager
1390 }
1391
1392 #[must_use]
1394 pub fn query_cache(&self) -> &Arc<QueryCache> {
1395 &self.query_cache
1396 }
1397
1398 pub fn clear_plan_cache(&self) {
1404 self.query_cache.clear();
1405 }
1406
    /// Closes the database, flushing persistent state.
    ///
    /// Calling `close` on an already-closed database is a no-op that returns
    /// `Ok(())`. The open flag is only cleared once every step has succeeded,
    /// so a failed close can be retried.
    ///
    /// * Read-only databases just close the file manager (if any).
    /// * Single-file databases sync the WAL, write a snapshot checkpoint into
    ///   the file, close the active WAL log, remove the sidecar WAL and close
    ///   the file manager — in that order.
    /// * Directory-based databases append a `TransactionCommit` record to the
    ///   WAL and sync it.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by the WAL, the checkpoint or the
    /// file manager.
    pub fn close(&self) -> Result<()> {
        // Hold the write lock for the whole shutdown sequence.
        let mut is_open = self.is_open.write();
        if !*is_open {
            return Ok(());
        }

        if self.read_only {
            // Nothing to flush: only release the file manager.
            #[cfg(feature = "grafeo-file")]
            if let Some(ref fm) = self.file_manager {
                fm.close()?;
            }
            *is_open = false;
            return Ok(());
        }

        #[cfg(feature = "grafeo-file")]
        let is_single_file = self.file_manager.is_some();
        #[cfg(not(feature = "grafeo-file"))]
        let is_single_file = false;

        #[cfg(feature = "grafeo-file")]
        if let Some(ref fm) = self.file_manager {
            // Sync the WAL before the snapshot is written.
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.sync()?;
            }
            self.checkpoint_to_file(fm)?;

            // Close the active WAL log before the sidecar is removed below.
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.close_active_log();
            }

            {
                // Crash-injection point used by crash-recovery tests.
                use grafeo_core::testing::crash::maybe_crash;
                maybe_crash("close:before_remove_sidecar_wal");
            }
            // Runs only after the checkpoint above succeeded.
            fm.remove_sidecar_wal()?;
            fm.close()?;
        }

        #[cfg(feature = "wal")]
        if !is_single_file && let Some(ref wal) = self.wal {
            // Reuse the last assigned transaction id, or begin a new
            // transaction when none was ever assigned.
            let commit_tx = self
                .transaction_manager
                .last_assigned_transaction_id()
                .unwrap_or_else(|| self.transaction_manager.begin());

            wal.log(&WalRecord::TransactionCommit {
                transaction_id: commit_tx,
            })?;

            wal.sync()?;
        }

        *is_open = false;
        Ok(())
    }
1492
1493 #[cfg(feature = "wal")]
1495 #[must_use]
1496 pub fn wal(&self) -> Option<&Arc<LpgWal>> {
1497 self.wal.as_ref()
1498 }
1499
1500 #[cfg(feature = "wal")]
1502 pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1503 if let Some(ref wal) = self.wal {
1504 wal.log(record)?;
1505 }
1506 Ok(())
1507 }
1508
1509 #[cfg(feature = "grafeo-file")]
1515 fn checkpoint_to_file(&self, fm: &GrafeoFileManager) -> Result<()> {
1516 use grafeo_core::testing::crash::maybe_crash;
1517
1518 maybe_crash("checkpoint_to_file:before_export");
1519 let snapshot_data = self.export_snapshot()?;
1520 maybe_crash("checkpoint_to_file:after_export");
1521
1522 let epoch = self.lpg_store().current_epoch();
1523 let transaction_id = self
1524 .transaction_manager
1525 .last_assigned_transaction_id()
1526 .map_or(0, |t| t.0);
1527 let node_count = self.lpg_store().node_count() as u64;
1528 let edge_count = self.lpg_store().edge_count() as u64;
1529
1530 fm.write_snapshot(
1531 &snapshot_data,
1532 epoch.0,
1533 transaction_id,
1534 node_count,
1535 edge_count,
1536 )?;
1537
1538 maybe_crash("checkpoint_to_file:after_write_snapshot");
1539 Ok(())
1540 }
1541
1542 #[cfg(feature = "grafeo-file")]
1544 #[must_use]
1545 pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
1546 self.file_manager.as_ref()
1547 }
1548}
1549
1550impl Drop for GrafeoDB {
1551 fn drop(&mut self) {
1552 if let Err(e) = self.close() {
1553 grafeo_error!("Error closing database: {}", e);
1554 }
1555 }
1556}
1557
1558impl crate::admin::AdminService for GrafeoDB {
1559 fn info(&self) -> crate::admin::DatabaseInfo {
1560 self.info()
1561 }
1562
1563 fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1564 self.detailed_stats()
1565 }
1566
1567 fn schema(&self) -> crate::admin::SchemaInfo {
1568 self.schema()
1569 }
1570
1571 fn validate(&self) -> crate::admin::ValidationResult {
1572 self.validate()
1573 }
1574
1575 fn wal_status(&self) -> crate::admin::WalStatus {
1576 self.wal_status()
1577 }
1578
1579 fn wal_checkpoint(&self) -> Result<()> {
1580 self.wal_checkpoint()
1581 }
1582}
1583
/// Tabular outcome of a query: column metadata, row values and optional
/// execution details.
#[derive(Debug)]
pub struct QueryResult {
    /// Column labels of the result table.
    pub columns: Vec<String>,
    /// Logical type recorded for the columns; `QueryResult::new` fills in one
    /// `LogicalType::Any` entry per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// Row data; each inner vector holds the values of one row.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Execution time in milliseconds, when it has been recorded
    /// (see `QueryResult::with_metrics`).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned, when it has been recorded
    /// (see `QueryResult::with_metrics`).
    pub rows_scanned: Option<u64>,
    /// Optional status text; set by `QueryResult::status`.
    pub status_message: Option<String>,
    /// GQL status of the result; the constructors on `QueryResult` initialize
    /// it to `GqlStatus::SUCCESS`.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
1630
1631impl QueryResult {
1632 #[must_use]
1634 pub fn empty() -> Self {
1635 Self {
1636 columns: Vec::new(),
1637 column_types: Vec::new(),
1638 rows: Vec::new(),
1639 execution_time_ms: None,
1640 rows_scanned: None,
1641 status_message: None,
1642 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1643 }
1644 }
1645
1646 #[must_use]
1648 pub fn status(msg: impl Into<String>) -> Self {
1649 Self {
1650 columns: Vec::new(),
1651 column_types: Vec::new(),
1652 rows: Vec::new(),
1653 execution_time_ms: None,
1654 rows_scanned: None,
1655 status_message: Some(msg.into()),
1656 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1657 }
1658 }
1659
1660 #[must_use]
1662 pub fn new(columns: Vec<String>) -> Self {
1663 let len = columns.len();
1664 Self {
1665 columns,
1666 column_types: vec![grafeo_common::types::LogicalType::Any; len],
1667 rows: Vec::new(),
1668 execution_time_ms: None,
1669 rows_scanned: None,
1670 status_message: None,
1671 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1672 }
1673 }
1674
1675 #[must_use]
1677 pub fn with_types(
1678 columns: Vec<String>,
1679 column_types: Vec<grafeo_common::types::LogicalType>,
1680 ) -> Self {
1681 Self {
1682 columns,
1683 column_types,
1684 rows: Vec::new(),
1685 execution_time_ms: None,
1686 rows_scanned: None,
1687 status_message: None,
1688 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1689 }
1690 }
1691
1692 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1694 self.execution_time_ms = Some(execution_time_ms);
1695 self.rows_scanned = Some(rows_scanned);
1696 self
1697 }
1698
1699 #[must_use]
1701 pub fn execution_time_ms(&self) -> Option<f64> {
1702 self.execution_time_ms
1703 }
1704
1705 #[must_use]
1707 pub fn rows_scanned(&self) -> Option<u64> {
1708 self.rows_scanned
1709 }
1710
1711 #[must_use]
1713 pub fn row_count(&self) -> usize {
1714 self.rows.len()
1715 }
1716
1717 #[must_use]
1719 pub fn column_count(&self) -> usize {
1720 self.columns.len()
1721 }
1722
1723 #[must_use]
1725 pub fn is_empty(&self) -> bool {
1726 self.rows.is_empty()
1727 }
1728
1729 pub fn scalar<T: FromValue>(&self) -> Result<T> {
1738 if self.rows.len() != 1 || self.columns.len() != 1 {
1739 return Err(grafeo_common::utils::error::Error::InvalidValue(
1740 "Expected single value".to_string(),
1741 ));
1742 }
1743 T::from_value(&self.rows[0][0])
1744 }
1745
1746 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1748 self.rows.iter()
1749 }
1750}
1751
1752impl std::fmt::Display for QueryResult {
1753 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1754 let table = grafeo_common::fmt::format_result_table(
1755 &self.columns,
1756 &self.rows,
1757 self.execution_time_ms,
1758 self.status_message.as_deref(),
1759 );
1760 f.write_str(&table)
1761 }
1762}
1763
/// Conversion from a borrowed `grafeo_common::types::Value` into a concrete
/// Rust type.
///
/// `QueryResult::scalar` uses this trait to turn the single value of a result
/// into the caller's requested type.
pub trait FromValue: Sized {
    /// Attempts to convert `value` into `Self`.
    ///
    /// # Errors
    ///
    /// The implementations in this file return `Error::TypeMismatch` when the
    /// value's accessor for the target type yields `None`.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
1772
1773impl FromValue for i64 {
1774 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1775 value
1776 .as_int64()
1777 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1778 expected: "INT64".to_string(),
1779 found: value.type_name().to_string(),
1780 })
1781 }
1782}
1783
1784impl FromValue for f64 {
1785 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1786 value
1787 .as_float64()
1788 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1789 expected: "FLOAT64".to_string(),
1790 found: value.type_name().to_string(),
1791 })
1792 }
1793}
1794
1795impl FromValue for String {
1796 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1797 value.as_str().map(String::from).ok_or_else(|| {
1798 grafeo_common::utils::error::Error::TypeMismatch {
1799 expected: "STRING".to_string(),
1800 found: value.type_name().to_string(),
1801 }
1802 })
1803 }
1804}
1805
1806impl FromValue for bool {
1807 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1808 value
1809 .as_bool()
1810 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1811 expected: "BOOL".to_string(),
1812 found: value.type_name().to_string(),
1813 })
1814 }
1815}
1816
// Unit tests for the database facade. Tests that need a feature are gated on
// it: `wal` for on-disk persistence, `cdc` for change capture, `metrics` for
// the metrics snapshot, and `gql` for anything that runs query text through
// `execute`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_create_in_memory_database() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
    }

    #[test]
    fn test_database_config() {
        let config = Config::in_memory().with_threads(4).with_query_logging();

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().threads, 4);
        assert!(db.config().query_logging);
    }

    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        // Creating a session must succeed without panicking.
        let _session = db.session();
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_db");

        // First run: write two nodes and one edge, then close.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let alix = db.create_node(&["Person"]);
            db.set_node_property(alix, "name", Value::from("Alix"));

            let gus = db.create_node(&["Person"]);
            db.set_node_property(gus, "name", Value::from("Gus"));

            let _edge = db.create_edge(alix, gus, "KNOWS");

            db.close().unwrap();
        }

        // Second run: everything written above must be visible again.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(db.node_count(), 2);
            assert_eq!(db.edge_count(), 1);

            // The test relies on node ids having been assigned as 0 and 1.
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node0.is_some());

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node1.is_some());
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // When a WAL is attached, the mutations above must have produced records.
        if let Some(wal) = db.wal() {
            assert!(wal.record_count() > 0);
        }

        db.close().unwrap();
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_recovery_multiple_sessions() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("multi_session_db");

        // Run 1: create the first node.
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            let alix = db.create_node(&["Person"]);
            db.set_node_property(alix, "name", Value::from("Alix"));
            db.close().unwrap();
        }

        // Run 2: the first node is recovered; add a second one.
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 1);

            let gus = db.create_node(&["Person"]);
            db.set_node_property(gus, "name", Value::from("Gus"));
            db.close().unwrap();
        }

        // Run 3: both nodes are recovered with their labels.
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 2);

            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_database_consistency_after_mutations() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("consistency_db");

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let a = db.create_node(&["Node"]);
            let b = db.create_node(&["Node"]);
            let c = db.create_node(&["Node"]);

            let e1 = db.create_edge(a, b, "LINKS");
            let _e2 = db.create_edge(b, c, "LINKS");

            // Remove one edge and the middle node before closing.
            db.delete_edge(e1);
            db.delete_node(b);

            db.set_node_property(a, "value", Value::Int64(1));
            db.set_node_property(c, "value", Value::Int64(3));

            db.close().unwrap();
        }

        // After reopening, the surviving nodes exist and the deleted one does not.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node_a.is_some());

            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
            assert!(node_c.is_some());

            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node_b.is_none());
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_close_is_idempotent() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("close_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();
        db.create_node(&["Test"]);

        // First close succeeds.
        assert!(db.close().is_ok());

        // A second close must also succeed.
        assert!(db.close().is_ok());
    }

    #[test]
    fn test_with_store_external_backend() {
        use grafeo_core::graph::lpg::LpgStore;

        let external = Arc::new(LpgStore::new().unwrap());

        // Seed the store before handing it to the database.
        let n1 = external.create_node(&["Person"]);
        external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));

        let db = GrafeoDB::with_store(
            Arc::clone(&external) as Arc<dyn GraphStoreMut>,
            Config::in_memory(),
        )
        .unwrap();

        let session = db.session();

        // The pre-seeded node must be visible through a query.
        #[cfg(feature = "gql")]
        {
            let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
            assert_eq!(result.rows.len(), 1);
        }
    }

    #[test]
    fn test_with_config_custom_memory_limit() {
        let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024);

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
        assert_eq!(db.node_count(), 0);
    }

    #[cfg(feature = "metrics")]
    #[test]
    fn test_database_metrics_registry() {
        let db = GrafeoDB::new_in_memory();

        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        // Direct node creation is not expected to bump the query counter.
        let snap = db.metrics();
        assert_eq!(snap.query_count, 0);
    }

    #[test]
    fn test_query_result_has_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert!(result.execution_time_ms.unwrap() >= 0.0);
            assert_eq!(result.rows_scanned.unwrap(), 2);
        }
    }

    #[test]
    fn test_empty_query_result_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            // A label that matches nothing still reports metrics.
            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert_eq!(result.rows_scanned.unwrap(), 0);
        }
    }

    #[cfg(feature = "cdc")]
    mod cdc_integration {
        use super::*;

        /// Builds an in-memory database with CDC switched on in its config.
        fn cdc_db() -> GrafeoDB {
            GrafeoDB::with_config(Config::in_memory().with_cdc()).unwrap()
        }

        #[test]
        fn test_node_lifecycle_history() {
            let db = cdc_db();

            let id = db.create_node(&["Person"]);
            db.set_node_property(id, "name", "Alix".into());
            db.set_node_property(id, "name", "Gus".into());
            db.delete_node(id);

            // Expected events: create, two updates, delete.
            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 4);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            // The first write of the property has no previous value.
            assert!(history[1].before.is_none());
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
            // The second write replaces an existing value.
            assert!(history[2].before.is_some());
            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
        }

        #[test]
        fn test_edge_lifecycle_history() {
            let db = cdc_db();

            let alix = db.create_node(&["Person"]);
            let gus = db.create_node(&["Person"]);
            let edge = db.create_edge(alix, gus, "KNOWS");
            db.set_edge_property(edge, "since", 2024i64.into());
            db.delete_edge(edge);

            // Expected events: create, update, delete.
            let history = db.history(edge).unwrap();
            assert_eq!(history.len(), 3);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
        }

        #[test]
        fn test_create_node_with_props_cdc() {
            let db = cdc_db();

            let id = db.create_node_with_props(
                &["Person"],
                vec![
                    ("name", grafeo_common::types::Value::from("Alix")),
                    ("age", grafeo_common::types::Value::from(30i64)),
                ],
            );

            // A single create event carries both initial properties.
            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 1);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            let after = history[0].after.as_ref().unwrap();
            assert_eq!(after.len(), 2);
        }

        #[test]
        fn test_changes_between() {
            let db = cdc_db();

            let id1 = db.create_node(&["A"]);
            let _id2 = db.create_node(&["B"]);
            db.set_node_property(id1, "x", 1i64.into());

            // The widest possible epoch range must return every event:
            // two creates and one update.
            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            assert_eq!(changes.len(), 3);
        }

        #[test]
        fn test_cdc_disabled_by_default() {
            let db = GrafeoDB::new_in_memory();
            assert!(!db.is_cdc_enabled());

            let id = db.create_node(&["Person"]);
            db.set_node_property(id, "name", "Alix".into());

            let history = db.history(id).unwrap();
            assert!(history.is_empty(), "CDC off by default: no events recorded");
        }

        // Runs query text, so it needs the `gql` feature like the other
        // `execute`-based tests in this module.
        #[cfg(feature = "gql")]
        #[test]
        fn test_session_with_cdc_override_on() {
            let db = GrafeoDB::new_in_memory();
            // The database default is off; the session opts in.
            let session = db.session_with_cdc(true);
            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            assert!(
                !changes.is_empty(),
                "session_with_cdc(true) should record events"
            );
        }

        // Runs query text, so it needs the `gql` feature like the other
        // `execute`-based tests in this module.
        #[cfg(feature = "gql")]
        #[test]
        fn test_session_with_cdc_override_off() {
            let db = cdc_db();
            // The database default is on; the session opts out.
            let session = db.session_with_cdc(false);
            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            assert!(
                changes.is_empty(),
                "session_with_cdc(false) should not record events"
            );
        }

        #[test]
        fn test_set_cdc_enabled_runtime() {
            let db = GrafeoDB::new_in_memory();
            assert!(!db.is_cdc_enabled());

            // Switch CDC on at runtime.
            db.set_cdc_enabled(true);
            assert!(db.is_cdc_enabled());

            let id = db.create_node(&["Person"]);
            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 1, "CDC enabled at runtime records events");

            // Switch it back off; later mutations must leave no history.
            db.set_cdc_enabled(false);
            let id2 = db.create_node(&["Person"]);
            let history2 = db.history(id2).unwrap();
            assert!(
                history2.is_empty(),
                "CDC disabled at runtime stops recording"
            );
        }
    }

    // Runs query text, so it is gated on `gql` like the other
    // `execute`-based tests above.
    #[cfg(feature = "gql")]
    #[test]
    fn test_with_store_basic() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let n1 = store.create_node(&["Person"]);
        store.set_node_property(n1, "name", "Alix".into());

        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 1);
    }

    // Runs query text, so it is gated on `gql` like the other
    // `execute`-based tests above.
    #[cfg(feature = "gql")]
    #[test]
    fn test_with_store_session() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let session = db.session();
        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
        assert_eq!(result.rows.len(), 1);
    }

    // Runs query text, so it is gated on `gql` like the other
    // `execute`-based tests above.
    #[cfg(feature = "gql")]
    #[test]
    fn test_with_store_mutations() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let mut session = db.session();

        // Insert inside an explicit transaction and read it back before commit.
        session.begin_transaction().unwrap();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 1);

        session.commit().unwrap();
    }
}