1mod admin;
15#[cfg(feature = "async-storage")]
16mod async_ops;
17#[cfg(feature = "async-storage")]
18pub(crate) mod async_wal_store;
19mod crud;
20#[cfg(feature = "embed")]
21mod embed;
22mod index;
23mod persistence;
24mod query;
25#[cfg(feature = "rdf")]
26mod rdf_ops;
27mod search;
28#[cfg(feature = "wal")]
29pub(crate) mod wal_store;
30
31use grafeo_common::grafeo_error;
32#[cfg(feature = "wal")]
33use std::path::Path;
34use std::sync::Arc;
35use std::sync::atomic::AtomicUsize;
36
37use parking_lot::RwLock;
38
39#[cfg(feature = "grafeo-file")]
40use grafeo_adapters::storage::file::GrafeoFileManager;
41#[cfg(feature = "wal")]
42use grafeo_adapters::storage::wal::{
43 DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
44};
45use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
46use grafeo_common::utils::error::Result;
47use grafeo_core::graph::lpg::LpgStore;
48#[cfg(feature = "rdf")]
49use grafeo_core::graph::rdf::RdfStore;
50use grafeo_core::graph::{GraphStore, GraphStoreMut};
51
52use crate::catalog::Catalog;
53use crate::config::Config;
54use crate::query::cache::QueryCache;
55use crate::session::Session;
56use crate::transaction::TransactionManager;
57
58pub struct GrafeoDB {
81 pub(super) config: Config,
83 pub(super) store: Option<Arc<LpgStore>>,
85 pub(super) catalog: Arc<Catalog>,
87 #[cfg(feature = "rdf")]
89 pub(super) rdf_store: Arc<RdfStore>,
90 pub(super) transaction_manager: Arc<TransactionManager>,
92 pub(super) buffer_manager: Arc<BufferManager>,
94 #[cfg(feature = "wal")]
96 pub(super) wal: Option<Arc<LpgWal>>,
97 #[cfg(feature = "wal")]
101 pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
102 pub(super) query_cache: Arc<QueryCache>,
104 pub(super) commit_counter: Arc<AtomicUsize>,
106 pub(super) is_open: RwLock<bool>,
108 #[cfg(feature = "cdc")]
110 pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
111 #[cfg(feature = "embed")]
113 pub(super) embedding_models:
114 RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
115 #[cfg(feature = "grafeo-file")]
117 pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
118 pub(super) external_read_store: Option<Arc<dyn GraphStore>>,
121 pub(super) external_write_store: Option<Arc<dyn GraphStoreMut>>,
124 #[cfg(feature = "metrics")]
126 pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
127 current_graph: RwLock<Option<String>>,
131 current_schema: RwLock<Option<String>>,
135 read_only: bool,
138}
139
140impl GrafeoDB {
141 fn lpg_store(&self) -> &Arc<LpgStore> {
149 self.store.as_ref().expect(
150 "no built-in LpgStore: this GrafeoDB was created with an external store \
151 (with_store / with_read_store). Use session() or graph_store() instead.",
152 )
153 }
154
155 #[must_use]
176 pub fn new_in_memory() -> Self {
177 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
178 }
179
180 #[cfg(feature = "wal")]
199 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
200 Self::with_config(Config::persistent(path.as_ref()))
201 }
202
203 #[cfg(feature = "grafeo-file")]
228 pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
229 Self::with_config(Config::read_only(path.as_ref()))
230 }
231
232 pub fn with_config(config: Config) -> Result<Self> {
256 config
258 .validate()
259 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
260
261 let store = Arc::new(LpgStore::new()?);
262 #[cfg(feature = "rdf")]
263 let rdf_store = Arc::new(RdfStore::new());
264 let transaction_manager = Arc::new(TransactionManager::new());
265
266 let buffer_config = BufferManagerConfig {
268 budget: config.memory_limit.unwrap_or_else(|| {
269 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
270 }),
271 spill_path: config
272 .spill_path
273 .clone()
274 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
275 ..BufferManagerConfig::default()
276 };
277 let buffer_manager = BufferManager::new(buffer_config);
278
279 let catalog = Arc::new(Catalog::new());
281
282 let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;
283
284 #[cfg(feature = "grafeo-file")]
286 let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
287 if let Some(ref db_path) = config.path {
289 if db_path.exists() && db_path.is_file() {
290 let fm = GrafeoFileManager::open_read_only(db_path)?;
291 let snapshot_data = fm.read_snapshot()?;
292 if !snapshot_data.is_empty() {
293 Self::apply_snapshot_data(
294 &store,
295 &catalog,
296 #[cfg(feature = "rdf")]
297 &rdf_store,
298 &snapshot_data,
299 )?;
300 }
301 Some(Arc::new(fm))
302 } else {
303 return Err(grafeo_common::utils::error::Error::Internal(format!(
304 "read-only open requires an existing .grafeo file: {}",
305 db_path.display()
306 )));
307 }
308 } else {
309 return Err(grafeo_common::utils::error::Error::Internal(
310 "read-only mode requires a database path".to_string(),
311 ));
312 }
313 } else if let Some(ref db_path) = config.path {
314 if Self::should_use_single_file(db_path, config.storage_format) {
319 let fm = if db_path.exists() && db_path.is_file() {
320 GrafeoFileManager::open(db_path)?
321 } else if !db_path.exists() {
322 GrafeoFileManager::create(db_path)?
323 } else {
324 return Err(grafeo_common::utils::error::Error::Internal(format!(
326 "path exists but is not a file: {}",
327 db_path.display()
328 )));
329 };
330
331 let snapshot_data = fm.read_snapshot()?;
333 if !snapshot_data.is_empty() {
334 Self::apply_snapshot_data(
335 &store,
336 &catalog,
337 #[cfg(feature = "rdf")]
338 &rdf_store,
339 &snapshot_data,
340 )?;
341 }
342
343 #[cfg(feature = "wal")]
345 if config.wal_enabled && fm.has_sidecar_wal() {
346 let recovery = WalRecovery::new(fm.sidecar_wal_path());
347 let records = recovery.recover()?;
348 Self::apply_wal_records(
349 &store,
350 &catalog,
351 #[cfg(feature = "rdf")]
352 &rdf_store,
353 &records,
354 )?;
355 }
356
357 Some(Arc::new(fm))
358 } else {
359 None
360 }
361 } else {
362 None
363 };
364
365 #[cfg(feature = "wal")]
368 let wal = if is_read_only {
369 None
370 } else if config.wal_enabled {
371 if let Some(ref db_path) = config.path {
372 #[cfg(feature = "grafeo-file")]
374 let wal_path = if let Some(ref fm) = file_manager {
375 let p = fm.sidecar_wal_path();
376 std::fs::create_dir_all(&p)?;
377 p
378 } else {
379 std::fs::create_dir_all(db_path)?;
381 db_path.join("wal")
382 };
383
384 #[cfg(not(feature = "grafeo-file"))]
385 let wal_path = {
386 std::fs::create_dir_all(db_path)?;
387 db_path.join("wal")
388 };
389
390 #[cfg(feature = "grafeo-file")]
392 let is_single_file = file_manager.is_some();
393 #[cfg(not(feature = "grafeo-file"))]
394 let is_single_file = false;
395
396 if !is_single_file && wal_path.exists() {
397 let recovery = WalRecovery::new(&wal_path);
398 let records = recovery.recover()?;
399 Self::apply_wal_records(
400 &store,
401 &catalog,
402 #[cfg(feature = "rdf")]
403 &rdf_store,
404 &records,
405 )?;
406 }
407
408 let wal_durability = match config.wal_durability {
410 crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
411 crate::config::DurabilityMode::Batch {
412 max_delay_ms,
413 max_records,
414 } => WalDurabilityMode::Batch {
415 max_delay_ms,
416 max_records,
417 },
418 crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
419 WalDurabilityMode::Adaptive { target_interval_ms }
420 }
421 crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
422 };
423 let wal_config = WalConfig {
424 durability: wal_durability,
425 ..WalConfig::default()
426 };
427 let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
428 Some(Arc::new(wal_manager))
429 } else {
430 None
431 }
432 } else {
433 None
434 };
435
436 let query_cache = Arc::new(QueryCache::default());
438
439 #[cfg(feature = "temporal")]
442 transaction_manager.sync_epoch(store.current_epoch());
443
444 Ok(Self {
445 config,
446 store: Some(store),
447 catalog,
448 #[cfg(feature = "rdf")]
449 rdf_store,
450 transaction_manager,
451 buffer_manager,
452 #[cfg(feature = "wal")]
453 wal,
454 #[cfg(feature = "wal")]
455 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
456 query_cache,
457 commit_counter: Arc::new(AtomicUsize::new(0)),
458 is_open: RwLock::new(true),
459 #[cfg(feature = "cdc")]
460 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
461 #[cfg(feature = "embed")]
462 embedding_models: RwLock::new(hashbrown::HashMap::new()),
463 #[cfg(feature = "grafeo-file")]
464 file_manager,
465 external_read_store: None,
466 external_write_store: None,
467 #[cfg(feature = "metrics")]
468 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
469 current_graph: RwLock::new(None),
470 current_schema: RwLock::new(None),
471 read_only: is_read_only,
472 })
473 }
474
475 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
500 config
501 .validate()
502 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
503
504 let transaction_manager = Arc::new(TransactionManager::new());
505
506 let buffer_config = BufferManagerConfig {
507 budget: config.memory_limit.unwrap_or_else(|| {
508 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
509 }),
510 spill_path: None,
511 ..BufferManagerConfig::default()
512 };
513 let buffer_manager = BufferManager::new(buffer_config);
514
515 let query_cache = Arc::new(QueryCache::default());
516
517 Ok(Self {
518 config,
519 store: None,
520 catalog: Arc::new(Catalog::new()),
521 #[cfg(feature = "rdf")]
522 rdf_store: Arc::new(RdfStore::new()),
523 transaction_manager,
524 buffer_manager,
525 #[cfg(feature = "wal")]
526 wal: None,
527 #[cfg(feature = "wal")]
528 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
529 query_cache,
530 commit_counter: Arc::new(AtomicUsize::new(0)),
531 is_open: RwLock::new(true),
532 #[cfg(feature = "cdc")]
533 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
534 #[cfg(feature = "embed")]
535 embedding_models: RwLock::new(hashbrown::HashMap::new()),
536 #[cfg(feature = "grafeo-file")]
537 file_manager: None,
538 external_read_store: Some(Arc::clone(&store) as Arc<dyn GraphStore>),
539 external_write_store: Some(store),
540 #[cfg(feature = "metrics")]
541 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
542 current_graph: RwLock::new(None),
543 current_schema: RwLock::new(None),
544 read_only: false,
545 })
546 }
547
548 pub fn with_read_store(store: Arc<dyn GraphStore>, config: Config) -> Result<Self> {
569 config
570 .validate()
571 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
572
573 let transaction_manager = Arc::new(TransactionManager::new());
574
575 let buffer_config = BufferManagerConfig {
576 budget: config.memory_limit.unwrap_or_else(|| {
577 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
578 }),
579 spill_path: None,
580 ..BufferManagerConfig::default()
581 };
582 let buffer_manager = BufferManager::new(buffer_config);
583
584 let query_cache = Arc::new(QueryCache::default());
585
586 Ok(Self {
587 config,
588 store: None,
589 catalog: Arc::new(Catalog::new()),
590 #[cfg(feature = "rdf")]
591 rdf_store: Arc::new(RdfStore::new()),
592 transaction_manager,
593 buffer_manager,
594 #[cfg(feature = "wal")]
595 wal: None,
596 #[cfg(feature = "wal")]
597 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
598 query_cache,
599 commit_counter: Arc::new(AtomicUsize::new(0)),
600 is_open: RwLock::new(true),
601 #[cfg(feature = "cdc")]
602 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
603 #[cfg(feature = "embed")]
604 embedding_models: RwLock::new(hashbrown::HashMap::new()),
605 #[cfg(feature = "grafeo-file")]
606 file_manager: None,
607 external_read_store: Some(store),
608 external_write_store: None,
609 #[cfg(feature = "metrics")]
610 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
611 current_graph: RwLock::new(None),
612 current_schema: RwLock::new(None),
613 read_only: true,
614 })
615 }
616
617 #[cfg(feature = "wal")]
623 fn apply_wal_records(
624 store: &Arc<LpgStore>,
625 catalog: &Catalog,
626 #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
627 records: &[WalRecord],
628 ) -> Result<()> {
629 use crate::catalog::{
630 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
631 };
632 use grafeo_common::utils::error::Error;
633
634 let mut current_graph: Option<String> = None;
637 let mut target_store: Arc<LpgStore> = Arc::clone(store);
638
639 for record in records {
640 match record {
641 WalRecord::CreateNamedGraph { name } => {
643 let _ = store.create_graph(name);
644 }
645 WalRecord::DropNamedGraph { name } => {
646 store.drop_graph(name);
647 if current_graph.as_deref() == Some(name.as_str()) {
649 current_graph = None;
650 target_store = Arc::clone(store);
651 }
652 }
653 WalRecord::SwitchGraph { name } => {
654 current_graph.clone_from(name);
655 target_store = match ¤t_graph {
656 None => Arc::clone(store),
657 Some(graph_name) => store
658 .graph_or_create(graph_name)
659 .map_err(|e| Error::Internal(e.to_string()))?,
660 };
661 }
662
663 WalRecord::CreateNode { id, labels } => {
665 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
666 target_store.create_node_with_id(*id, &label_refs)?;
667 }
668 WalRecord::DeleteNode { id } => {
669 target_store.delete_node(*id);
670 }
671 WalRecord::CreateEdge {
672 id,
673 src,
674 dst,
675 edge_type,
676 } => {
677 target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
678 }
679 WalRecord::DeleteEdge { id } => {
680 target_store.delete_edge(*id);
681 }
682 WalRecord::SetNodeProperty { id, key, value } => {
683 target_store.set_node_property(*id, key, value.clone());
684 }
685 WalRecord::SetEdgeProperty { id, key, value } => {
686 target_store.set_edge_property(*id, key, value.clone());
687 }
688 WalRecord::AddNodeLabel { id, label } => {
689 target_store.add_label(*id, label);
690 }
691 WalRecord::RemoveNodeLabel { id, label } => {
692 target_store.remove_label(*id, label);
693 }
694 WalRecord::RemoveNodeProperty { id, key } => {
695 target_store.remove_node_property(*id, key);
696 }
697 WalRecord::RemoveEdgeProperty { id, key } => {
698 target_store.remove_edge_property(*id, key);
699 }
700
701 WalRecord::CreateNodeType {
703 name,
704 properties,
705 constraints,
706 } => {
707 let def = NodeTypeDefinition {
708 name: name.clone(),
709 properties: properties
710 .iter()
711 .map(|(n, t, nullable)| TypedProperty {
712 name: n.clone(),
713 data_type: PropertyDataType::from_type_name(t),
714 nullable: *nullable,
715 default_value: None,
716 })
717 .collect(),
718 constraints: constraints
719 .iter()
720 .map(|(kind, props)| match kind.as_str() {
721 "unique" => TypeConstraint::Unique(props.clone()),
722 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
723 "not_null" if !props.is_empty() => {
724 TypeConstraint::NotNull(props[0].clone())
725 }
726 _ => TypeConstraint::Unique(props.clone()),
727 })
728 .collect(),
729 parent_types: Vec::new(),
730 };
731 let _ = catalog.register_node_type(def);
732 }
733 WalRecord::DropNodeType { name } => {
734 let _ = catalog.drop_node_type(name);
735 }
736 WalRecord::CreateEdgeType {
737 name,
738 properties,
739 constraints,
740 } => {
741 let def = EdgeTypeDefinition {
742 name: name.clone(),
743 properties: properties
744 .iter()
745 .map(|(n, t, nullable)| TypedProperty {
746 name: n.clone(),
747 data_type: PropertyDataType::from_type_name(t),
748 nullable: *nullable,
749 default_value: None,
750 })
751 .collect(),
752 constraints: constraints
753 .iter()
754 .map(|(kind, props)| match kind.as_str() {
755 "unique" => TypeConstraint::Unique(props.clone()),
756 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
757 "not_null" if !props.is_empty() => {
758 TypeConstraint::NotNull(props[0].clone())
759 }
760 _ => TypeConstraint::Unique(props.clone()),
761 })
762 .collect(),
763 source_node_types: Vec::new(),
764 target_node_types: Vec::new(),
765 };
766 let _ = catalog.register_edge_type_def(def);
767 }
768 WalRecord::DropEdgeType { name } => {
769 let _ = catalog.drop_edge_type_def(name);
770 }
771 WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
772 }
775 WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
776 }
779 WalRecord::CreateGraphType {
780 name,
781 node_types,
782 edge_types,
783 open,
784 } => {
785 use crate::catalog::GraphTypeDefinition;
786 let def = GraphTypeDefinition {
787 name: name.clone(),
788 allowed_node_types: node_types.clone(),
789 allowed_edge_types: edge_types.clone(),
790 open: *open,
791 };
792 let _ = catalog.register_graph_type(def);
793 }
794 WalRecord::DropGraphType { name } => {
795 let _ = catalog.drop_graph_type(name);
796 }
797 WalRecord::CreateSchema { name } => {
798 let _ = catalog.register_schema_namespace(name.clone());
799 }
800 WalRecord::DropSchema { name } => {
801 let _ = catalog.drop_schema_namespace(name);
802 }
803
804 WalRecord::AlterNodeType { name, alterations } => {
805 for (action, prop_name, type_name, nullable) in alterations {
806 match action.as_str() {
807 "add" => {
808 let prop = TypedProperty {
809 name: prop_name.clone(),
810 data_type: PropertyDataType::from_type_name(type_name),
811 nullable: *nullable,
812 default_value: None,
813 };
814 let _ = catalog.alter_node_type_add_property(name, prop);
815 }
816 "drop" => {
817 let _ = catalog.alter_node_type_drop_property(name, prop_name);
818 }
819 _ => {}
820 }
821 }
822 }
823 WalRecord::AlterEdgeType { name, alterations } => {
824 for (action, prop_name, type_name, nullable) in alterations {
825 match action.as_str() {
826 "add" => {
827 let prop = TypedProperty {
828 name: prop_name.clone(),
829 data_type: PropertyDataType::from_type_name(type_name),
830 nullable: *nullable,
831 default_value: None,
832 };
833 let _ = catalog.alter_edge_type_add_property(name, prop);
834 }
835 "drop" => {
836 let _ = catalog.alter_edge_type_drop_property(name, prop_name);
837 }
838 _ => {}
839 }
840 }
841 }
842 WalRecord::AlterGraphType { name, alterations } => {
843 for (action, type_name) in alterations {
844 match action.as_str() {
845 "add_node" => {
846 let _ =
847 catalog.alter_graph_type_add_node_type(name, type_name.clone());
848 }
849 "drop_node" => {
850 let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
851 }
852 "add_edge" => {
853 let _ =
854 catalog.alter_graph_type_add_edge_type(name, type_name.clone());
855 }
856 "drop_edge" => {
857 let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
858 }
859 _ => {}
860 }
861 }
862 }
863
864 WalRecord::CreateProcedure {
865 name,
866 params,
867 returns,
868 body,
869 } => {
870 use crate::catalog::ProcedureDefinition;
871 let def = ProcedureDefinition {
872 name: name.clone(),
873 params: params.clone(),
874 returns: returns.clone(),
875 body: body.clone(),
876 };
877 let _ = catalog.register_procedure(def);
878 }
879 WalRecord::DropProcedure { name } => {
880 let _ = catalog.drop_procedure(name);
881 }
882
883 #[cfg(feature = "rdf")]
885 WalRecord::InsertRdfTriple { .. }
886 | WalRecord::DeleteRdfTriple { .. }
887 | WalRecord::ClearRdfGraph { .. }
888 | WalRecord::CreateRdfGraph { .. }
889 | WalRecord::DropRdfGraph { .. } => {
890 rdf_ops::replay_rdf_wal_record(rdf_store, record);
891 }
892 #[cfg(not(feature = "rdf"))]
893 WalRecord::InsertRdfTriple { .. }
894 | WalRecord::DeleteRdfTriple { .. }
895 | WalRecord::ClearRdfGraph { .. }
896 | WalRecord::CreateRdfGraph { .. }
897 | WalRecord::DropRdfGraph { .. } => {}
898
899 WalRecord::TransactionCommit { .. } => {
900 #[cfg(feature = "temporal")]
904 {
905 target_store.new_epoch();
906 }
907 }
908 WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
909 }
912 }
913 }
914 Ok(())
915 }
916
917 #[cfg(feature = "grafeo-file")]
923 fn should_use_single_file(
924 path: &std::path::Path,
925 configured: crate::config::StorageFormat,
926 ) -> bool {
927 use crate::config::StorageFormat;
928 match configured {
929 StorageFormat::SingleFile => true,
930 StorageFormat::WalDirectory => false,
931 StorageFormat::Auto => {
932 if path.is_file() {
934 if let Ok(mut f) = std::fs::File::open(path) {
935 use std::io::Read;
936 let mut magic = [0u8; 4];
937 if f.read_exact(&mut magic).is_ok()
938 && magic == grafeo_adapters::storage::file::MAGIC
939 {
940 return true;
941 }
942 }
943 return false;
944 }
945 if path.is_dir() {
947 return false;
948 }
949 path.extension().is_some_and(|ext| ext == "grafeo")
951 }
952 }
953 }
954
955 #[cfg(feature = "grafeo-file")]
957 fn apply_snapshot_data(
958 store: &Arc<LpgStore>,
959 catalog: &Arc<crate::catalog::Catalog>,
960 #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
961 data: &[u8],
962 ) -> Result<()> {
963 persistence::load_snapshot_into_store(
964 store,
965 catalog,
966 #[cfg(feature = "rdf")]
967 rdf_store,
968 data,
969 )
970 }
971
972 #[must_use]
1000 pub fn session(&self) -> Session {
1001 let session_cfg = || crate::session::SessionConfig {
1002 transaction_manager: Arc::clone(&self.transaction_manager),
1003 query_cache: Arc::clone(&self.query_cache),
1004 catalog: Arc::clone(&self.catalog),
1005 adaptive_config: self.config.adaptive.clone(),
1006 factorized_execution: self.config.factorized_execution,
1007 graph_model: self.config.graph_model,
1008 query_timeout: self.config.query_timeout,
1009 commit_counter: Arc::clone(&self.commit_counter),
1010 gc_interval: self.config.gc_interval,
1011 read_only: self.read_only,
1012 };
1013
1014 if let Some(ref ext_read) = self.external_read_store {
1015 return Session::with_external_store(
1016 Arc::clone(ext_read),
1017 self.external_write_store.as_ref().map(Arc::clone),
1018 session_cfg(),
1019 )
1020 .expect("arena allocation for external store session");
1021 }
1022
1023 #[cfg(feature = "rdf")]
1024 let mut session = Session::with_rdf_store_and_adaptive(
1025 Arc::clone(self.lpg_store()),
1026 Arc::clone(&self.rdf_store),
1027 session_cfg(),
1028 );
1029 #[cfg(not(feature = "rdf"))]
1030 let mut session = Session::with_adaptive(Arc::clone(self.lpg_store()), session_cfg());
1031
1032 #[cfg(feature = "wal")]
1033 if let Some(ref wal) = self.wal {
1034 session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
1035 }
1036
1037 #[cfg(feature = "cdc")]
1038 session.set_cdc_log(Arc::clone(&self.cdc_log));
1039
1040 #[cfg(feature = "metrics")]
1041 {
1042 if let Some(ref m) = self.metrics {
1043 session.set_metrics(Arc::clone(m));
1044 m.session_created
1045 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1046 m.session_active
1047 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1048 }
1049 }
1050
1051 if let Some(ref graph) = *self.current_graph.read() {
1053 session.use_graph(graph);
1054 }
1055
1056 if let Some(ref schema) = *self.current_schema.read() {
1058 session.set_schema(schema);
1059 }
1060
1061 let _ = &mut session;
1063
1064 session
1065 }
1066
1067 #[must_use]
1073 pub fn current_graph(&self) -> Option<String> {
1074 self.current_graph.read().clone()
1075 }
1076
1077 pub fn set_current_graph(&self, name: Option<&str>) {
1082 *self.current_graph.write() = name.map(ToString::to_string);
1083 }
1084
1085 #[must_use]
1090 pub fn current_schema(&self) -> Option<String> {
1091 self.current_schema.read().clone()
1092 }
1093
1094 pub fn set_current_schema(&self, name: Option<&str>) {
1099 *self.current_schema.write() = name.map(ToString::to_string);
1100 }
1101
1102 #[must_use]
1104 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
1105 &self.config.adaptive
1106 }
1107
1108 #[must_use]
1110 pub fn is_read_only(&self) -> bool {
1111 self.read_only
1112 }
1113
1114 #[must_use]
1116 pub fn config(&self) -> &Config {
1117 &self.config
1118 }
1119
1120 #[must_use]
1122 pub fn graph_model(&self) -> crate::config::GraphModel {
1123 self.config.graph_model
1124 }
1125
1126 #[must_use]
1128 pub fn memory_limit(&self) -> Option<usize> {
1129 self.config.memory_limit
1130 }
1131
1132 #[cfg(feature = "metrics")]
1137 #[must_use]
1138 pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1139 let mut snapshot = self
1140 .metrics
1141 .as_ref()
1142 .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1143
1144 let cache_stats = self.query_cache.stats();
1146 snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1147 snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1148 snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1149 snapshot.cache_invalidations = cache_stats.invalidations;
1150
1151 snapshot
1152 }
1153
1154 #[cfg(feature = "metrics")]
1158 #[must_use]
1159 pub fn metrics_prometheus(&self) -> String {
1160 self.metrics
1161 .as_ref()
1162 .map_or_else(String::new, |m| m.to_prometheus())
1163 }
1164
1165 #[cfg(feature = "metrics")]
1167 pub fn reset_metrics(&self) {
1168 if let Some(ref m) = self.metrics {
1169 m.reset();
1170 }
1171 self.query_cache.reset_stats();
1172 }
1173
1174 #[must_use]
1182 pub fn store(&self) -> &Arc<LpgStore> {
1183 self.lpg_store()
1184 }
1185
1186 #[allow(dead_code)] fn active_store(&self) -> Arc<LpgStore> {
1193 let store = self.lpg_store();
1194 let graph_name = self.current_graph.read().clone();
1195 match graph_name {
1196 None => Arc::clone(store),
1197 Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(store),
1198 Some(ref name) => store.graph(name).unwrap_or_else(|| Arc::clone(store)),
1199 }
1200 }
1201
1202 pub fn create_graph(&self, name: &str) -> Result<bool> {
1210 Ok(self.lpg_store().create_graph(name)?)
1211 }
1212
1213 pub fn drop_graph(&self, name: &str) -> bool {
1215 self.lpg_store().drop_graph(name)
1216 }
1217
1218 #[must_use]
1220 pub fn list_graphs(&self) -> Vec<String> {
1221 self.lpg_store().graph_names()
1222 }
1223
1224 #[must_use]
1233 pub fn graph_store(&self) -> Arc<dyn GraphStore> {
1234 if let Some(ref ext_read) = self.external_read_store {
1235 Arc::clone(ext_read)
1236 } else {
1237 Arc::clone(self.lpg_store()) as Arc<dyn GraphStore>
1238 }
1239 }
1240
1241 #[must_use]
1246 pub fn graph_store_mut(&self) -> Option<Arc<dyn GraphStoreMut>> {
1247 if self.external_read_store.is_some() {
1248 self.external_write_store.as_ref().map(Arc::clone)
1249 } else {
1250 Some(Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreMut>)
1251 }
1252 }
1253
1254 pub fn gc(&self) {
1260 let min_epoch = self.transaction_manager.min_active_epoch();
1261 self.lpg_store().gc_versions(min_epoch);
1262 self.transaction_manager.gc();
1263 }
1264
1265 #[must_use]
1267 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
1268 &self.buffer_manager
1269 }
1270
1271 #[must_use]
1273 pub fn query_cache(&self) -> &Arc<QueryCache> {
1274 &self.query_cache
1275 }
1276
1277 pub fn clear_plan_cache(&self) {
1283 self.query_cache.clear();
1284 }
1285
1286 pub fn close(&self) -> Result<()> {
1300 let mut is_open = self.is_open.write();
1301 if !*is_open {
1302 return Ok(());
1303 }
1304
1305 if self.read_only {
1307 #[cfg(feature = "grafeo-file")]
1308 if let Some(ref fm) = self.file_manager {
1309 fm.close()?;
1310 }
1311 *is_open = false;
1312 return Ok(());
1313 }
1314
1315 #[cfg(feature = "grafeo-file")]
1319 let is_single_file = self.file_manager.is_some();
1320 #[cfg(not(feature = "grafeo-file"))]
1321 let is_single_file = false;
1322
1323 #[cfg(feature = "grafeo-file")]
1324 if let Some(ref fm) = self.file_manager {
1325 #[cfg(feature = "wal")]
1327 if let Some(ref wal) = self.wal {
1328 wal.sync()?;
1329 }
1330 self.checkpoint_to_file(fm)?;
1331
1332 #[cfg(feature = "wal")]
1335 if let Some(ref wal) = self.wal {
1336 wal.close_active_log();
1337 }
1338
1339 {
1340 use grafeo_core::testing::crash::maybe_crash;
1341 maybe_crash("close:before_remove_sidecar_wal");
1342 }
1343 fm.remove_sidecar_wal()?;
1344 fm.close()?;
1345 }
1346
1347 #[cfg(feature = "wal")]
1353 if !is_single_file && let Some(ref wal) = self.wal {
1354 let commit_tx = self
1356 .transaction_manager
1357 .last_assigned_transaction_id()
1358 .unwrap_or_else(|| self.transaction_manager.begin());
1359
1360 wal.log(&WalRecord::TransactionCommit {
1362 transaction_id: commit_tx,
1363 })?;
1364
1365 wal.sync()?;
1366 }
1367
1368 *is_open = false;
1369 Ok(())
1370 }
1371
1372 #[cfg(feature = "wal")]
1374 #[must_use]
1375 pub fn wal(&self) -> Option<&Arc<LpgWal>> {
1376 self.wal.as_ref()
1377 }
1378
1379 #[cfg(feature = "wal")]
1381 pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1382 if let Some(ref wal) = self.wal {
1383 wal.log(record)?;
1384 }
1385 Ok(())
1386 }
1387
1388 #[cfg(feature = "grafeo-file")]
1394 fn checkpoint_to_file(&self, fm: &GrafeoFileManager) -> Result<()> {
1395 use grafeo_core::testing::crash::maybe_crash;
1396
1397 maybe_crash("checkpoint_to_file:before_export");
1398 let snapshot_data = self.export_snapshot()?;
1399 maybe_crash("checkpoint_to_file:after_export");
1400
1401 let epoch = self.lpg_store().current_epoch();
1402 let transaction_id = self
1403 .transaction_manager
1404 .last_assigned_transaction_id()
1405 .map_or(0, |t| t.0);
1406 let node_count = self.lpg_store().node_count() as u64;
1407 let edge_count = self.lpg_store().edge_count() as u64;
1408
1409 fm.write_snapshot(
1410 &snapshot_data,
1411 epoch.0,
1412 transaction_id,
1413 node_count,
1414 edge_count,
1415 )?;
1416
1417 maybe_crash("checkpoint_to_file:after_write_snapshot");
1418 Ok(())
1419 }
1420
1421 #[cfg(feature = "grafeo-file")]
1423 #[must_use]
1424 pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
1425 self.file_manager.as_ref()
1426 }
1427}
1428
1429impl Drop for GrafeoDB {
1430 fn drop(&mut self) {
1431 if let Err(e) = self.close() {
1432 grafeo_error!("Error closing database: {}", e);
1433 }
1434 }
1435}
1436
1437impl crate::admin::AdminService for GrafeoDB {
1438 fn info(&self) -> crate::admin::DatabaseInfo {
1439 self.info()
1440 }
1441
1442 fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1443 self.detailed_stats()
1444 }
1445
1446 fn schema(&self) -> crate::admin::SchemaInfo {
1447 self.schema()
1448 }
1449
1450 fn validate(&self) -> crate::admin::ValidationResult {
1451 self.validate()
1452 }
1453
1454 fn wal_status(&self) -> crate::admin::WalStatus {
1455 self.wal_status()
1456 }
1457
1458 fn wal_checkpoint(&self) -> Result<()> {
1459 self.wal_checkpoint()
1460 }
1461}
1462
1463#[derive(Debug)]
1493pub struct QueryResult {
1494 pub columns: Vec<String>,
1496 pub column_types: Vec<grafeo_common::types::LogicalType>,
1498 pub rows: Vec<Vec<grafeo_common::types::Value>>,
1500 pub execution_time_ms: Option<f64>,
1502 pub rows_scanned: Option<u64>,
1504 pub status_message: Option<String>,
1506 pub gql_status: grafeo_common::utils::GqlStatus,
1508}
1509
1510impl QueryResult {
1511 #[must_use]
1513 pub fn empty() -> Self {
1514 Self {
1515 columns: Vec::new(),
1516 column_types: Vec::new(),
1517 rows: Vec::new(),
1518 execution_time_ms: None,
1519 rows_scanned: None,
1520 status_message: None,
1521 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1522 }
1523 }
1524
1525 #[must_use]
1527 pub fn status(msg: impl Into<String>) -> Self {
1528 Self {
1529 columns: Vec::new(),
1530 column_types: Vec::new(),
1531 rows: Vec::new(),
1532 execution_time_ms: None,
1533 rows_scanned: None,
1534 status_message: Some(msg.into()),
1535 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1536 }
1537 }
1538
1539 #[must_use]
1541 pub fn new(columns: Vec<String>) -> Self {
1542 let len = columns.len();
1543 Self {
1544 columns,
1545 column_types: vec![grafeo_common::types::LogicalType::Any; len],
1546 rows: Vec::new(),
1547 execution_time_ms: None,
1548 rows_scanned: None,
1549 status_message: None,
1550 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1551 }
1552 }
1553
1554 #[must_use]
1556 pub fn with_types(
1557 columns: Vec<String>,
1558 column_types: Vec<grafeo_common::types::LogicalType>,
1559 ) -> Self {
1560 Self {
1561 columns,
1562 column_types,
1563 rows: Vec::new(),
1564 execution_time_ms: None,
1565 rows_scanned: None,
1566 status_message: None,
1567 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1568 }
1569 }
1570
1571 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1573 self.execution_time_ms = Some(execution_time_ms);
1574 self.rows_scanned = Some(rows_scanned);
1575 self
1576 }
1577
1578 #[must_use]
1580 pub fn execution_time_ms(&self) -> Option<f64> {
1581 self.execution_time_ms
1582 }
1583
1584 #[must_use]
1586 pub fn rows_scanned(&self) -> Option<u64> {
1587 self.rows_scanned
1588 }
1589
1590 #[must_use]
1592 pub fn row_count(&self) -> usize {
1593 self.rows.len()
1594 }
1595
1596 #[must_use]
1598 pub fn column_count(&self) -> usize {
1599 self.columns.len()
1600 }
1601
1602 #[must_use]
1604 pub fn is_empty(&self) -> bool {
1605 self.rows.is_empty()
1606 }
1607
1608 pub fn scalar<T: FromValue>(&self) -> Result<T> {
1617 if self.rows.len() != 1 || self.columns.len() != 1 {
1618 return Err(grafeo_common::utils::error::Error::InvalidValue(
1619 "Expected single value".to_string(),
1620 ));
1621 }
1622 T::from_value(&self.rows[0][0])
1623 }
1624
1625 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1627 self.rows.iter()
1628 }
1629}
1630
1631impl std::fmt::Display for QueryResult {
1632 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1633 let table = grafeo_common::fmt::format_result_table(
1634 &self.columns,
1635 &self.rows,
1636 self.execution_time_ms,
1637 self.status_message.as_deref(),
1638 );
1639 f.write_str(&table)
1640 }
1641}
1642
1643pub trait FromValue: Sized {
1648 fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
1650}
1651
1652impl FromValue for i64 {
1653 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1654 value
1655 .as_int64()
1656 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1657 expected: "INT64".to_string(),
1658 found: value.type_name().to_string(),
1659 })
1660 }
1661}
1662
1663impl FromValue for f64 {
1664 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1665 value
1666 .as_float64()
1667 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1668 expected: "FLOAT64".to_string(),
1669 found: value.type_name().to_string(),
1670 })
1671 }
1672}
1673
1674impl FromValue for String {
1675 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1676 value.as_str().map(String::from).ok_or_else(|| {
1677 grafeo_common::utils::error::Error::TypeMismatch {
1678 expected: "STRING".to_string(),
1679 found: value.type_name().to_string(),
1680 }
1681 })
1682 }
1683}
1684
1685impl FromValue for bool {
1686 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1687 value
1688 .as_bool()
1689 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1690 expected: "BOOL".to_string(),
1691 found: value.type_name().to_string(),
1692 })
1693 }
1694}
1695
1696#[cfg(test)]
1697mod tests {
1698 use super::*;
1699
1700 #[test]
1701 fn test_create_in_memory_database() {
1702 let db = GrafeoDB::new_in_memory();
1703 assert_eq!(db.node_count(), 0);
1704 assert_eq!(db.edge_count(), 0);
1705 }
1706
1707 #[test]
1708 fn test_database_config() {
1709 let config = Config::in_memory().with_threads(4).with_query_logging();
1710
1711 let db = GrafeoDB::with_config(config).unwrap();
1712 assert_eq!(db.config().threads, 4);
1713 assert!(db.config().query_logging);
1714 }
1715
1716 #[test]
1717 fn test_database_session() {
1718 let db = GrafeoDB::new_in_memory();
1719 let _session = db.session();
1720 }
1722
1723 #[cfg(feature = "wal")]
1724 #[test]
1725 fn test_persistent_database_recovery() {
1726 use grafeo_common::types::Value;
1727 use tempfile::tempdir;
1728
1729 let dir = tempdir().unwrap();
1730 let db_path = dir.path().join("test_db");
1731
1732 {
1734 let db = GrafeoDB::open(&db_path).unwrap();
1735
1736 let alix = db.create_node(&["Person"]);
1737 db.set_node_property(alix, "name", Value::from("Alix"));
1738
1739 let gus = db.create_node(&["Person"]);
1740 db.set_node_property(gus, "name", Value::from("Gus"));
1741
1742 let _edge = db.create_edge(alix, gus, "KNOWS");
1743
1744 db.close().unwrap();
1746 }
1747
1748 {
1750 let db = GrafeoDB::open(&db_path).unwrap();
1751
1752 assert_eq!(db.node_count(), 2);
1753 assert_eq!(db.edge_count(), 1);
1754
1755 let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
1757 assert!(node0.is_some());
1758
1759 let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
1760 assert!(node1.is_some());
1761 }
1762 }
1763
1764 #[cfg(feature = "wal")]
1765 #[test]
1766 fn test_wal_logging() {
1767 use tempfile::tempdir;
1768
1769 let dir = tempdir().unwrap();
1770 let db_path = dir.path().join("wal_test_db");
1771
1772 let db = GrafeoDB::open(&db_path).unwrap();
1773
1774 let node = db.create_node(&["Test"]);
1776 db.delete_node(node);
1777
1778 if let Some(wal) = db.wal() {
1780 assert!(wal.record_count() > 0);
1781 }
1782
1783 db.close().unwrap();
1784 }
1785
1786 #[cfg(feature = "wal")]
1787 #[test]
1788 fn test_wal_recovery_multiple_sessions() {
1789 use grafeo_common::types::Value;
1791 use tempfile::tempdir;
1792
1793 let dir = tempdir().unwrap();
1794 let db_path = dir.path().join("multi_session_db");
1795
1796 {
1798 let db = GrafeoDB::open(&db_path).unwrap();
1799 let alix = db.create_node(&["Person"]);
1800 db.set_node_property(alix, "name", Value::from("Alix"));
1801 db.close().unwrap();
1802 }
1803
1804 {
1806 let db = GrafeoDB::open(&db_path).unwrap();
1807 assert_eq!(db.node_count(), 1); let gus = db.create_node(&["Person"]);
1809 db.set_node_property(gus, "name", Value::from("Gus"));
1810 db.close().unwrap();
1811 }
1812
1813 {
1815 let db = GrafeoDB::open(&db_path).unwrap();
1816 assert_eq!(db.node_count(), 2);
1817
1818 let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
1820 assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
1821
1822 let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
1823 assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
1824 }
1825 }
1826
1827 #[cfg(feature = "wal")]
1828 #[test]
1829 fn test_database_consistency_after_mutations() {
1830 use grafeo_common::types::Value;
1832 use tempfile::tempdir;
1833
1834 let dir = tempdir().unwrap();
1835 let db_path = dir.path().join("consistency_db");
1836
1837 {
1838 let db = GrafeoDB::open(&db_path).unwrap();
1839
1840 let a = db.create_node(&["Node"]);
1842 let b = db.create_node(&["Node"]);
1843 let c = db.create_node(&["Node"]);
1844
1845 let e1 = db.create_edge(a, b, "LINKS");
1847 let _e2 = db.create_edge(b, c, "LINKS");
1848
1849 db.delete_edge(e1);
1851 db.delete_node(b);
1852
1853 db.set_node_property(a, "value", Value::Int64(1));
1855 db.set_node_property(c, "value", Value::Int64(3));
1856
1857 db.close().unwrap();
1858 }
1859
1860 {
1862 let db = GrafeoDB::open(&db_path).unwrap();
1863
1864 let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
1868 assert!(node_a.is_some());
1869
1870 let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
1871 assert!(node_c.is_some());
1872
1873 let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
1875 assert!(node_b.is_none());
1876 }
1877 }
1878
1879 #[cfg(feature = "wal")]
1880 #[test]
1881 fn test_close_is_idempotent() {
1882 use tempfile::tempdir;
1884
1885 let dir = tempdir().unwrap();
1886 let db_path = dir.path().join("close_test_db");
1887
1888 let db = GrafeoDB::open(&db_path).unwrap();
1889 db.create_node(&["Test"]);
1890
1891 assert!(db.close().is_ok());
1893
1894 assert!(db.close().is_ok());
1896 }
1897
1898 #[test]
1899 fn test_with_store_external_backend() {
1900 use grafeo_core::graph::lpg::LpgStore;
1901
1902 let external = Arc::new(LpgStore::new().unwrap());
1903
1904 let n1 = external.create_node(&["Person"]);
1906 external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
1907
1908 let db = GrafeoDB::with_store(
1909 Arc::clone(&external) as Arc<dyn GraphStoreMut>,
1910 Config::in_memory(),
1911 )
1912 .unwrap();
1913
1914 let session = db.session();
1915
1916 #[cfg(feature = "gql")]
1918 {
1919 let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
1920 assert_eq!(result.rows.len(), 1);
1921 }
1922 }
1923
1924 #[test]
1925 fn test_with_config_custom_memory_limit() {
1926 let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); let db = GrafeoDB::with_config(config).unwrap();
1929 assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
1930 assert_eq!(db.node_count(), 0);
1931 }
1932
1933 #[cfg(feature = "metrics")]
1934 #[test]
1935 fn test_database_metrics_registry() {
1936 let db = GrafeoDB::new_in_memory();
1937
1938 db.create_node(&["Person"]);
1940 db.create_node(&["Person"]);
1941
1942 let snap = db.metrics();
1944 assert_eq!(snap.query_count, 0); }
1947
1948 #[test]
1949 fn test_query_result_has_metrics() {
1950 let db = GrafeoDB::new_in_memory();
1952 db.create_node(&["Person"]);
1953 db.create_node(&["Person"]);
1954
1955 #[cfg(feature = "gql")]
1956 {
1957 let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
1958
1959 assert!(result.execution_time_ms.is_some());
1961 assert!(result.rows_scanned.is_some());
1962 assert!(result.execution_time_ms.unwrap() >= 0.0);
1963 assert_eq!(result.rows_scanned.unwrap(), 2);
1964 }
1965 }
1966
1967 #[test]
1968 fn test_empty_query_result_metrics() {
1969 let db = GrafeoDB::new_in_memory();
1971 db.create_node(&["Person"]);
1972
1973 #[cfg(feature = "gql")]
1974 {
1975 let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
1977
1978 assert!(result.execution_time_ms.is_some());
1979 assert!(result.rows_scanned.is_some());
1980 assert_eq!(result.rows_scanned.unwrap(), 0);
1981 }
1982 }
1983
1984 #[cfg(feature = "cdc")]
1985 mod cdc_integration {
1986 use super::*;
1987
1988 #[test]
1989 fn test_node_lifecycle_history() {
1990 let db = GrafeoDB::new_in_memory();
1991
1992 let id = db.create_node(&["Person"]);
1994 db.set_node_property(id, "name", "Alix".into());
1996 db.set_node_property(id, "name", "Gus".into());
1997 db.delete_node(id);
1999
2000 let history = db.history(id).unwrap();
2001 assert_eq!(history.len(), 4); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
2003 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
2004 assert!(history[1].before.is_none()); assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
2006 assert!(history[2].before.is_some()); assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
2008 }
2009
2010 #[test]
2011 fn test_edge_lifecycle_history() {
2012 let db = GrafeoDB::new_in_memory();
2013
2014 let alix = db.create_node(&["Person"]);
2015 let gus = db.create_node(&["Person"]);
2016 let edge = db.create_edge(alix, gus, "KNOWS");
2017 db.set_edge_property(edge, "since", 2024i64.into());
2018 db.delete_edge(edge);
2019
2020 let history = db.history(edge).unwrap();
2021 assert_eq!(history.len(), 3); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
2023 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
2024 assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
2025 }
2026
2027 #[test]
2028 fn test_create_node_with_props_cdc() {
2029 let db = GrafeoDB::new_in_memory();
2030
2031 let id = db.create_node_with_props(
2032 &["Person"],
2033 vec![
2034 ("name", grafeo_common::types::Value::from("Alix")),
2035 ("age", grafeo_common::types::Value::from(30i64)),
2036 ],
2037 );
2038
2039 let history = db.history(id).unwrap();
2040 assert_eq!(history.len(), 1);
2041 assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
2042 let after = history[0].after.as_ref().unwrap();
2044 assert_eq!(after.len(), 2);
2045 }
2046
2047 #[test]
2048 fn test_changes_between() {
2049 let db = GrafeoDB::new_in_memory();
2050
2051 let id1 = db.create_node(&["A"]);
2052 let _id2 = db.create_node(&["B"]);
2053 db.set_node_property(id1, "x", 1i64.into());
2054
2055 let changes = db
2057 .changes_between(
2058 grafeo_common::types::EpochId(0),
2059 grafeo_common::types::EpochId(u64::MAX),
2060 )
2061 .unwrap();
2062 assert_eq!(changes.len(), 3); }
2064 }
2065
2066 #[test]
2067 fn test_with_store_basic() {
2068 use grafeo_core::graph::lpg::LpgStore;
2069
2070 let store = Arc::new(LpgStore::new().unwrap());
2071 let n1 = store.create_node(&["Person"]);
2072 store.set_node_property(n1, "name", "Alix".into());
2073
2074 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
2075 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
2076
2077 let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
2078 assert_eq!(result.rows.len(), 1);
2079 }
2080
2081 #[test]
2082 fn test_with_store_session() {
2083 use grafeo_core::graph::lpg::LpgStore;
2084
2085 let store = Arc::new(LpgStore::new().unwrap());
2086 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
2087 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
2088
2089 let session = db.session();
2090 let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
2091 assert_eq!(result.rows.len(), 1);
2092 }
2093
2094 #[test]
2095 fn test_with_store_mutations() {
2096 use grafeo_core::graph::lpg::LpgStore;
2097
2098 let store = Arc::new(LpgStore::new().unwrap());
2099 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
2100 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
2101
2102 let mut session = db.session();
2103
2104 session.begin_transaction().unwrap();
2108 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
2109
2110 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
2111 assert_eq!(result.rows.len(), 1);
2112
2113 session.commit().unwrap();
2114 }
2115}