1mod admin;
15#[cfg(feature = "async-storage")]
16mod async_ops;
17#[cfg(feature = "async-storage")]
18pub(crate) mod async_wal_store;
19mod crud;
20#[cfg(feature = "embed")]
21mod embed;
22mod index;
23mod persistence;
24mod query;
25#[cfg(feature = "rdf")]
26mod rdf_ops;
27mod search;
28#[cfg(feature = "wal")]
29pub(crate) mod wal_store;
30
31use grafeo_common::grafeo_error;
32#[cfg(feature = "wal")]
33use std::path::Path;
34use std::sync::Arc;
35use std::sync::atomic::AtomicUsize;
36
37use parking_lot::RwLock;
38
39#[cfg(feature = "grafeo-file")]
40use grafeo_adapters::storage::file::GrafeoFileManager;
41#[cfg(feature = "wal")]
42use grafeo_adapters::storage::wal::{
43 DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
44};
45use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
46use grafeo_common::utils::error::Result;
47use grafeo_core::graph::GraphStoreMut;
48use grafeo_core::graph::lpg::LpgStore;
49#[cfg(feature = "rdf")]
50use grafeo_core::graph::rdf::RdfStore;
51
52use crate::catalog::Catalog;
53use crate::config::Config;
54use crate::query::cache::QueryCache;
55use crate::session::Session;
56use crate::transaction::TransactionManager;
57
/// Database handle that bundles the graph store, catalog, transaction manager,
/// buffer manager, query cache and the optional persistence back ends.
///
/// Several fields only exist when the corresponding Cargo feature is enabled.
pub struct GrafeoDB {
    /// Configuration this instance was constructed with.
    pub(super) config: Config,
    /// Built-in LPG store. `with_store` fills this with a fresh, unused store.
    pub(super) store: Arc<LpgStore>,
    /// Catalog shared with every session.
    pub(super) catalog: Arc<Catalog>,
    /// RDF store handed to sessions (feature `rdf`).
    #[cfg(feature = "rdf")]
    pub(super) rdf_store: Arc<RdfStore>,
    /// Transaction manager shared with every session.
    pub(super) transaction_manager: Arc<TransactionManager>,
    /// Buffer manager created from the configured memory budget.
    pub(super) buffer_manager: Arc<BufferManager>,
    /// Write-ahead log; `None` for read-only, WAL-disabled or path-less databases.
    #[cfg(feature = "wal")]
    pub(super) wal: Option<Arc<LpgWal>>,
    /// Shared with sessions together with the WAL.
    /// NOTE(review): presumably the graph name last written to the WAL - confirm.
    #[cfg(feature = "wal")]
    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
    /// Query cache shared with every session.
    pub(super) query_cache: Arc<QueryCache>,
    /// Counter shared with sessions through their `SessionConfig`.
    pub(super) commit_counter: Arc<AtomicUsize>,
    /// `true` until `close` has completed.
    pub(super) is_open: RwLock<bool>,
    /// Change-data-capture log attached to sessions (feature `cdc`).
    #[cfg(feature = "cdc")]
    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
    /// Embedding models keyed by a string (presumably the model name - confirm).
    #[cfg(feature = "embed")]
    pub(super) embedding_models:
        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
    /// Manager for the single-file storage format, when that format is in use.
    #[cfg(feature = "grafeo-file")]
    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
    /// Caller-supplied store; when set, sessions and `graph_store` use it.
    pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
    /// Metrics registry attached to sessions (feature `metrics`).
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Graph applied to newly created sessions, if any.
    current_graph: RwLock<Option<String>>,
    /// Schema applied to newly created sessions, if any.
    current_schema: RwLock<Option<String>>,
    /// Whether the database was opened with `AccessMode::ReadOnly`.
    read_only: bool,
}
136
137impl GrafeoDB {
138 #[must_use]
159 pub fn new_in_memory() -> Self {
160 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
161 }
162
163 #[cfg(feature = "wal")]
182 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
183 Self::with_config(Config::persistent(path.as_ref()))
184 }
185
186 #[cfg(feature = "grafeo-file")]
211 pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
212 Self::with_config(Config::read_only(path.as_ref()))
213 }
214
    /// Builds a database from `config`, restoring persisted state when a path is set.
    ///
    /// Depending on the enabled features this loads a single-file snapshot,
    /// replays WAL records into the fresh store and catalog, and opens a WAL
    /// for subsequent writes.
    ///
    /// # Errors
    ///
    /// Returns an error when the configuration fails validation, when read-only
    /// mode is requested without an existing database file, when the path exists
    /// but is not a file in single-file mode, or when any store, file or WAL
    /// operation fails.
    pub fn with_config(config: Config) -> Result<Self> {
        // Reject invalid configurations before touching any storage.
        config
            .validate()
            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;

        let store = Arc::new(LpgStore::new()?);
        #[cfg(feature = "rdf")]
        let rdf_store = Arc::new(RdfStore::new());
        let transaction_manager = Arc::new(TransactionManager::new());

        let buffer_config = BufferManagerConfig {
            // Use the configured limit, otherwise 75% of the detected system memory.
            budget: config.memory_limit.unwrap_or_else(|| {
                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
            }),
            // Explicit spill path wins; otherwise fall back to `<path>/spill`.
            spill_path: config
                .spill_path
                .clone()
                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
            ..BufferManagerConfig::default()
        };
        let buffer_manager = BufferManager::new(buffer_config);

        let catalog = Arc::new(Catalog::new());

        let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;

        // Single-file storage: open the file manager and restore its snapshot.
        #[cfg(feature = "grafeo-file")]
        let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
            if let Some(ref db_path) = config.path {
                // Read-only mode never creates files: the path must already be a file.
                if db_path.exists() && db_path.is_file() {
                    let fm = GrafeoFileManager::open_read_only(db_path)?;
                    let snapshot_data = fm.read_snapshot()?;
                    if !snapshot_data.is_empty() {
                        Self::apply_snapshot_data(
                            &store,
                            &catalog,
                            #[cfg(feature = "rdf")]
                            &rdf_store,
                            &snapshot_data,
                        )?;
                    }
                    Some(Arc::new(fm))
                } else {
                    return Err(grafeo_common::utils::error::Error::Internal(format!(
                        "read-only open requires an existing .grafeo file: {}",
                        db_path.display()
                    )));
                }
            } else {
                return Err(grafeo_common::utils::error::Error::Internal(
                    "read-only mode requires a database path".to_string(),
                ));
            }
        } else if let Some(ref db_path) = config.path {
            if Self::should_use_single_file(db_path, config.storage_format) {
                // Open an existing file, create a missing one, and refuse anything
                // else that already occupies the path (e.g. a directory).
                let fm = if db_path.exists() && db_path.is_file() {
                    GrafeoFileManager::open(db_path)?
                } else if !db_path.exists() {
                    GrafeoFileManager::create(db_path)?
                } else {
                    return Err(grafeo_common::utils::error::Error::Internal(format!(
                        "path exists but is not a file: {}",
                        db_path.display()
                    )));
                };

                // Restore the snapshot first ...
                let snapshot_data = fm.read_snapshot()?;
                if !snapshot_data.is_empty() {
                    Self::apply_snapshot_data(
                        &store,
                        &catalog,
                        #[cfg(feature = "rdf")]
                        &rdf_store,
                        &snapshot_data,
                    )?;
                }

                // ... then replay the sidecar WAL on top of it, if one exists.
                #[cfg(feature = "wal")]
                if config.wal_enabled && fm.has_sidecar_wal() {
                    let recovery = WalRecovery::new(fm.sidecar_wal_path());
                    let records = recovery.recover()?;
                    Self::apply_wal_records(
                        &store,
                        &catalog,
                        #[cfg(feature = "rdf")]
                        &rdf_store,
                        &records,
                    )?;
                }

                Some(Arc::new(fm))
            } else {
                None
            }
        } else {
            None
        };

        // WAL for new writes: never in read-only mode, and only with a path.
        #[cfg(feature = "wal")]
        let wal = if is_read_only {
            None
        } else if config.wal_enabled {
            if let Some(ref db_path) = config.path {
                // Single-file databases keep the WAL in the file manager's sidecar
                // location; otherwise it lives in `<path>/wal`.
                #[cfg(feature = "grafeo-file")]
                let wal_path = if let Some(ref fm) = file_manager {
                    let p = fm.sidecar_wal_path();
                    std::fs::create_dir_all(&p)?;
                    p
                } else {
                    std::fs::create_dir_all(db_path)?;
                    db_path.join("wal")
                };

                #[cfg(not(feature = "grafeo-file"))]
                let wal_path = {
                    std::fs::create_dir_all(db_path)?;
                    db_path.join("wal")
                };

                #[cfg(feature = "grafeo-file")]
                let is_single_file = file_manager.is_some();
                #[cfg(not(feature = "grafeo-file"))]
                let is_single_file = false;

                // The sidecar WAL was already replayed above, so only the
                // directory format needs recovery here.
                if !is_single_file && wal_path.exists() {
                    let recovery = WalRecovery::new(&wal_path);
                    let records = recovery.recover()?;
                    Self::apply_wal_records(
                        &store,
                        &catalog,
                        #[cfg(feature = "rdf")]
                        &rdf_store,
                        &records,
                    )?;
                }

                // Translate the engine-level durability mode into the WAL adapter's enum.
                let wal_durability = match config.wal_durability {
                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
                    crate::config::DurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    } => WalDurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    },
                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
                        WalDurabilityMode::Adaptive { target_interval_ms }
                    }
                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
                };
                let wal_config = WalConfig {
                    durability: wal_durability,
                    ..WalConfig::default()
                };
                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
                Some(Arc::new(wal_manager))
            } else {
                None
            }
        } else {
            None
        };

        let query_cache = Arc::new(QueryCache::default());

        // Bring the transaction manager in line with the store's epoch after recovery.
        #[cfg(feature = "temporal")]
        transaction_manager.sync_epoch(store.current_epoch());

        Ok(Self {
            config,
            store,
            catalog,
            #[cfg(feature = "rdf")]
            rdf_store,
            transaction_manager,
            buffer_manager,
            #[cfg(feature = "wal")]
            wal,
            #[cfg(feature = "wal")]
            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
            query_cache,
            commit_counter: Arc::new(AtomicUsize::new(0)),
            is_open: RwLock::new(true),
            #[cfg(feature = "cdc")]
            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
            #[cfg(feature = "embed")]
            embedding_models: RwLock::new(hashbrown::HashMap::new()),
            #[cfg(feature = "grafeo-file")]
            file_manager,
            external_store: None,
            #[cfg(feature = "metrics")]
            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
            current_graph: RwLock::new(None),
            current_schema: RwLock::new(None),
            read_only: is_read_only,
        })
    }
456
457 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
482 config
483 .validate()
484 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
485
486 let dummy_store = Arc::new(LpgStore::new()?);
487 let transaction_manager = Arc::new(TransactionManager::new());
488
489 let buffer_config = BufferManagerConfig {
490 budget: config.memory_limit.unwrap_or_else(|| {
491 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
492 }),
493 spill_path: None,
494 ..BufferManagerConfig::default()
495 };
496 let buffer_manager = BufferManager::new(buffer_config);
497
498 let query_cache = Arc::new(QueryCache::default());
499
500 Ok(Self {
501 config,
502 store: dummy_store,
503 catalog: Arc::new(Catalog::new()),
504 #[cfg(feature = "rdf")]
505 rdf_store: Arc::new(RdfStore::new()),
506 transaction_manager,
507 buffer_manager,
508 #[cfg(feature = "wal")]
509 wal: None,
510 #[cfg(feature = "wal")]
511 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
512 query_cache,
513 commit_counter: Arc::new(AtomicUsize::new(0)),
514 is_open: RwLock::new(true),
515 #[cfg(feature = "cdc")]
516 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
517 #[cfg(feature = "embed")]
518 embedding_models: RwLock::new(hashbrown::HashMap::new()),
519 #[cfg(feature = "grafeo-file")]
520 file_manager: None,
521 external_store: Some(store),
522 #[cfg(feature = "metrics")]
523 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
524 current_graph: RwLock::new(None),
525 current_schema: RwLock::new(None),
526 read_only: false,
527 })
528 }
529
530 #[cfg(feature = "wal")]
536 fn apply_wal_records(
537 store: &Arc<LpgStore>,
538 catalog: &Catalog,
539 #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
540 records: &[WalRecord],
541 ) -> Result<()> {
542 use crate::catalog::{
543 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
544 };
545 use grafeo_common::utils::error::Error;
546
547 let mut current_graph: Option<String> = None;
550 let mut target_store: Arc<LpgStore> = Arc::clone(store);
551
552 for record in records {
553 match record {
554 WalRecord::CreateNamedGraph { name } => {
556 let _ = store.create_graph(name);
557 }
558 WalRecord::DropNamedGraph { name } => {
559 store.drop_graph(name);
560 if current_graph.as_deref() == Some(name.as_str()) {
562 current_graph = None;
563 target_store = Arc::clone(store);
564 }
565 }
566 WalRecord::SwitchGraph { name } => {
567 current_graph.clone_from(name);
568 target_store = match ¤t_graph {
569 None => Arc::clone(store),
570 Some(graph_name) => store
571 .graph_or_create(graph_name)
572 .map_err(|e| Error::Internal(e.to_string()))?,
573 };
574 }
575
576 WalRecord::CreateNode { id, labels } => {
578 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
579 target_store.create_node_with_id(*id, &label_refs)?;
580 }
581 WalRecord::DeleteNode { id } => {
582 target_store.delete_node(*id);
583 }
584 WalRecord::CreateEdge {
585 id,
586 src,
587 dst,
588 edge_type,
589 } => {
590 target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
591 }
592 WalRecord::DeleteEdge { id } => {
593 target_store.delete_edge(*id);
594 }
595 WalRecord::SetNodeProperty { id, key, value } => {
596 target_store.set_node_property(*id, key, value.clone());
597 }
598 WalRecord::SetEdgeProperty { id, key, value } => {
599 target_store.set_edge_property(*id, key, value.clone());
600 }
601 WalRecord::AddNodeLabel { id, label } => {
602 target_store.add_label(*id, label);
603 }
604 WalRecord::RemoveNodeLabel { id, label } => {
605 target_store.remove_label(*id, label);
606 }
607 WalRecord::RemoveNodeProperty { id, key } => {
608 target_store.remove_node_property(*id, key);
609 }
610 WalRecord::RemoveEdgeProperty { id, key } => {
611 target_store.remove_edge_property(*id, key);
612 }
613
614 WalRecord::CreateNodeType {
616 name,
617 properties,
618 constraints,
619 } => {
620 let def = NodeTypeDefinition {
621 name: name.clone(),
622 properties: properties
623 .iter()
624 .map(|(n, t, nullable)| TypedProperty {
625 name: n.clone(),
626 data_type: PropertyDataType::from_type_name(t),
627 nullable: *nullable,
628 default_value: None,
629 })
630 .collect(),
631 constraints: constraints
632 .iter()
633 .map(|(kind, props)| match kind.as_str() {
634 "unique" => TypeConstraint::Unique(props.clone()),
635 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
636 "not_null" if !props.is_empty() => {
637 TypeConstraint::NotNull(props[0].clone())
638 }
639 _ => TypeConstraint::Unique(props.clone()),
640 })
641 .collect(),
642 parent_types: Vec::new(),
643 };
644 let _ = catalog.register_node_type(def);
645 }
646 WalRecord::DropNodeType { name } => {
647 let _ = catalog.drop_node_type(name);
648 }
649 WalRecord::CreateEdgeType {
650 name,
651 properties,
652 constraints,
653 } => {
654 let def = EdgeTypeDefinition {
655 name: name.clone(),
656 properties: properties
657 .iter()
658 .map(|(n, t, nullable)| TypedProperty {
659 name: n.clone(),
660 data_type: PropertyDataType::from_type_name(t),
661 nullable: *nullable,
662 default_value: None,
663 })
664 .collect(),
665 constraints: constraints
666 .iter()
667 .map(|(kind, props)| match kind.as_str() {
668 "unique" => TypeConstraint::Unique(props.clone()),
669 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
670 "not_null" if !props.is_empty() => {
671 TypeConstraint::NotNull(props[0].clone())
672 }
673 _ => TypeConstraint::Unique(props.clone()),
674 })
675 .collect(),
676 source_node_types: Vec::new(),
677 target_node_types: Vec::new(),
678 };
679 let _ = catalog.register_edge_type_def(def);
680 }
681 WalRecord::DropEdgeType { name } => {
682 let _ = catalog.drop_edge_type_def(name);
683 }
684 WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
685 }
688 WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
689 }
692 WalRecord::CreateGraphType {
693 name,
694 node_types,
695 edge_types,
696 open,
697 } => {
698 use crate::catalog::GraphTypeDefinition;
699 let def = GraphTypeDefinition {
700 name: name.clone(),
701 allowed_node_types: node_types.clone(),
702 allowed_edge_types: edge_types.clone(),
703 open: *open,
704 };
705 let _ = catalog.register_graph_type(def);
706 }
707 WalRecord::DropGraphType { name } => {
708 let _ = catalog.drop_graph_type(name);
709 }
710 WalRecord::CreateSchema { name } => {
711 let _ = catalog.register_schema_namespace(name.clone());
712 }
713 WalRecord::DropSchema { name } => {
714 let _ = catalog.drop_schema_namespace(name);
715 }
716
717 WalRecord::AlterNodeType { name, alterations } => {
718 for (action, prop_name, type_name, nullable) in alterations {
719 match action.as_str() {
720 "add" => {
721 let prop = TypedProperty {
722 name: prop_name.clone(),
723 data_type: PropertyDataType::from_type_name(type_name),
724 nullable: *nullable,
725 default_value: None,
726 };
727 let _ = catalog.alter_node_type_add_property(name, prop);
728 }
729 "drop" => {
730 let _ = catalog.alter_node_type_drop_property(name, prop_name);
731 }
732 _ => {}
733 }
734 }
735 }
736 WalRecord::AlterEdgeType { name, alterations } => {
737 for (action, prop_name, type_name, nullable) in alterations {
738 match action.as_str() {
739 "add" => {
740 let prop = TypedProperty {
741 name: prop_name.clone(),
742 data_type: PropertyDataType::from_type_name(type_name),
743 nullable: *nullable,
744 default_value: None,
745 };
746 let _ = catalog.alter_edge_type_add_property(name, prop);
747 }
748 "drop" => {
749 let _ = catalog.alter_edge_type_drop_property(name, prop_name);
750 }
751 _ => {}
752 }
753 }
754 }
755 WalRecord::AlterGraphType { name, alterations } => {
756 for (action, type_name) in alterations {
757 match action.as_str() {
758 "add_node" => {
759 let _ =
760 catalog.alter_graph_type_add_node_type(name, type_name.clone());
761 }
762 "drop_node" => {
763 let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
764 }
765 "add_edge" => {
766 let _ =
767 catalog.alter_graph_type_add_edge_type(name, type_name.clone());
768 }
769 "drop_edge" => {
770 let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
771 }
772 _ => {}
773 }
774 }
775 }
776
777 WalRecord::CreateProcedure {
778 name,
779 params,
780 returns,
781 body,
782 } => {
783 use crate::catalog::ProcedureDefinition;
784 let def = ProcedureDefinition {
785 name: name.clone(),
786 params: params.clone(),
787 returns: returns.clone(),
788 body: body.clone(),
789 };
790 let _ = catalog.register_procedure(def);
791 }
792 WalRecord::DropProcedure { name } => {
793 let _ = catalog.drop_procedure(name);
794 }
795
796 #[cfg(feature = "rdf")]
798 WalRecord::InsertRdfTriple { .. }
799 | WalRecord::DeleteRdfTriple { .. }
800 | WalRecord::ClearRdfGraph { .. }
801 | WalRecord::CreateRdfGraph { .. }
802 | WalRecord::DropRdfGraph { .. } => {
803 rdf_ops::replay_rdf_wal_record(rdf_store, record);
804 }
805 #[cfg(not(feature = "rdf"))]
806 WalRecord::InsertRdfTriple { .. }
807 | WalRecord::DeleteRdfTriple { .. }
808 | WalRecord::ClearRdfGraph { .. }
809 | WalRecord::CreateRdfGraph { .. }
810 | WalRecord::DropRdfGraph { .. } => {}
811
812 WalRecord::TransactionCommit { .. } => {
813 #[cfg(feature = "temporal")]
817 {
818 target_store.new_epoch();
819 }
820 }
821 WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
822 }
825 }
826 }
827 Ok(())
828 }
829
830 #[cfg(feature = "grafeo-file")]
836 fn should_use_single_file(
837 path: &std::path::Path,
838 configured: crate::config::StorageFormat,
839 ) -> bool {
840 use crate::config::StorageFormat;
841 match configured {
842 StorageFormat::SingleFile => true,
843 StorageFormat::WalDirectory => false,
844 StorageFormat::Auto => {
845 if path.is_file() {
847 if let Ok(mut f) = std::fs::File::open(path) {
848 use std::io::Read;
849 let mut magic = [0u8; 4];
850 if f.read_exact(&mut magic).is_ok()
851 && magic == grafeo_adapters::storage::file::MAGIC
852 {
853 return true;
854 }
855 }
856 return false;
857 }
858 if path.is_dir() {
860 return false;
861 }
862 path.extension().is_some_and(|ext| ext == "grafeo")
864 }
865 }
866 }
867
    /// Loads the snapshot bytes in `data` into `store`, `catalog` and (with the
    /// `rdf` feature) `rdf_store` by delegating to
    /// `persistence::load_snapshot_into_store`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the persistence loader reports.
    #[cfg(feature = "grafeo-file")]
    fn apply_snapshot_data(
        store: &Arc<LpgStore>,
        catalog: &Arc<crate::catalog::Catalog>,
        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
        data: &[u8],
    ) -> Result<()> {
        persistence::load_snapshot_into_store(
            store,
            catalog,
            #[cfg(feature = "rdf")]
            rdf_store,
            data,
        )
    }
884
885 #[must_use]
913 pub fn session(&self) -> Session {
914 let session_cfg = || crate::session::SessionConfig {
915 transaction_manager: Arc::clone(&self.transaction_manager),
916 query_cache: Arc::clone(&self.query_cache),
917 catalog: Arc::clone(&self.catalog),
918 adaptive_config: self.config.adaptive.clone(),
919 factorized_execution: self.config.factorized_execution,
920 graph_model: self.config.graph_model,
921 query_timeout: self.config.query_timeout,
922 commit_counter: Arc::clone(&self.commit_counter),
923 gc_interval: self.config.gc_interval,
924 read_only: self.read_only,
925 };
926
927 if let Some(ref ext_store) = self.external_store {
928 return Session::with_external_store(Arc::clone(ext_store), session_cfg())
929 .expect("arena allocation for external store session");
930 }
931
932 #[cfg(feature = "rdf")]
933 let mut session = Session::with_rdf_store_and_adaptive(
934 Arc::clone(&self.store),
935 Arc::clone(&self.rdf_store),
936 session_cfg(),
937 );
938 #[cfg(not(feature = "rdf"))]
939 let mut session = Session::with_adaptive(Arc::clone(&self.store), session_cfg());
940
941 #[cfg(feature = "wal")]
942 if let Some(ref wal) = self.wal {
943 session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
944 }
945
946 #[cfg(feature = "cdc")]
947 session.set_cdc_log(Arc::clone(&self.cdc_log));
948
949 #[cfg(feature = "metrics")]
950 {
951 if let Some(ref m) = self.metrics {
952 session.set_metrics(Arc::clone(m));
953 m.session_created
954 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
955 m.session_active
956 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
957 }
958 }
959
960 if let Some(ref graph) = *self.current_graph.read() {
962 session.use_graph(graph);
963 }
964
965 if let Some(ref schema) = *self.current_schema.read() {
967 session.set_schema(schema);
968 }
969
970 let _ = &mut session;
972
973 session
974 }
975
976 #[must_use]
982 pub fn current_graph(&self) -> Option<String> {
983 self.current_graph.read().clone()
984 }
985
986 pub fn set_current_graph(&self, name: Option<&str>) {
991 *self.current_graph.write() = name.map(ToString::to_string);
992 }
993
994 #[must_use]
999 pub fn current_schema(&self) -> Option<String> {
1000 self.current_schema.read().clone()
1001 }
1002
1003 pub fn set_current_schema(&self, name: Option<&str>) {
1008 *self.current_schema.write() = name.map(ToString::to_string);
1009 }
1010
    /// Returns the `adaptive` section of this database's configuration.
    #[must_use]
    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
        &self.config.adaptive
    }
1016
    /// Returns `true` when the database was opened in read-only access mode.
    #[must_use]
    pub fn is_read_only(&self) -> bool {
        self.read_only
    }
1022
    /// Returns the configuration this database was constructed with.
    #[must_use]
    pub fn config(&self) -> &Config {
        &self.config
    }
1028
    /// Returns the graph model selected in the configuration.
    #[must_use]
    pub fn graph_model(&self) -> crate::config::GraphModel {
        self.config.graph_model
    }
1034
    /// Returns the configured memory limit, or `None` when no limit was set.
    #[must_use]
    pub fn memory_limit(&self) -> Option<usize> {
        self.config.memory_limit
    }
1040
1041 #[cfg(feature = "metrics")]
1046 #[must_use]
1047 pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1048 let mut snapshot = self
1049 .metrics
1050 .as_ref()
1051 .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1052
1053 let cache_stats = self.query_cache.stats();
1055 snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1056 snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1057 snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1058 snapshot.cache_invalidations = cache_stats.invalidations;
1059
1060 snapshot
1061 }
1062
1063 #[cfg(feature = "metrics")]
1067 #[must_use]
1068 pub fn metrics_prometheus(&self) -> String {
1069 self.metrics
1070 .as_ref()
1071 .map_or_else(String::new, |m| m.to_prometheus())
1072 }
1073
1074 #[cfg(feature = "metrics")]
1076 pub fn reset_metrics(&self) {
1077 if let Some(ref m) = self.metrics {
1078 m.reset();
1079 }
1080 self.query_cache.reset_stats();
1081 }
1082
    /// Returns the built-in LPG store.
    ///
    /// This is always the internal store, even when an external store was
    /// supplied; see `graph_store` for the store sessions actually use.
    #[must_use]
    pub fn store(&self) -> &Arc<LpgStore> {
        &self.store
    }
1094
1095 #[allow(dead_code)] fn active_store(&self) -> Arc<LpgStore> {
1102 let graph_name = self.current_graph.read().clone();
1103 match graph_name {
1104 None => Arc::clone(&self.store),
1105 Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
1106 Some(ref name) => self
1107 .store
1108 .graph(name)
1109 .unwrap_or_else(|| Arc::clone(&self.store)),
1110 }
1111 }
1112
1113 pub fn create_graph(&self, name: &str) -> Result<bool> {
1121 Ok(self.store.create_graph(name)?)
1122 }
1123
    /// Drops the named graph `name` from the built-in store and returns the
    /// store's boolean result (presumably whether the graph existed - confirm).
    pub fn drop_graph(&self, name: &str) -> bool {
        self.store.drop_graph(name)
    }
1128
    /// Returns the graph names reported by the built-in store.
    #[must_use]
    pub fn list_graphs(&self) -> Vec<String> {
        self.store.graph_names()
    }
1134
1135 #[must_use]
1143 pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
1144 if let Some(ref ext_store) = self.external_store {
1145 Arc::clone(ext_store)
1146 } else {
1147 Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
1148 }
1149 }
1150
1151 pub fn gc(&self) {
1157 let min_epoch = self.transaction_manager.min_active_epoch();
1158 self.store.gc_versions(min_epoch);
1159 self.transaction_manager.gc();
1160 }
1161
    /// Returns the buffer manager owned by this database.
    #[must_use]
    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
        &self.buffer_manager
    }
1167
    /// Returns the query cache shared with this database's sessions.
    #[must_use]
    pub fn query_cache(&self) -> &Arc<QueryCache> {
        &self.query_cache
    }
1173
    /// Clears the query cache.
    pub fn clear_plan_cache(&self) {
        self.query_cache.clear();
    }
1182
    /// Closes the database, persisting state as required by the storage format.
    ///
    /// Calling this on an already closed database is a no-op. The `is_open`
    /// write lock is held for the whole call.
    ///
    /// # Errors
    ///
    /// Returns an error when syncing the WAL, writing the snapshot, removing
    /// the sidecar WAL or closing the file manager fails. In that case the
    /// database stays marked as open.
    pub fn close(&self) -> Result<()> {
        let mut is_open = self.is_open.write();
        if !*is_open {
            return Ok(());
        }

        // Read-only databases have nothing to flush: just release the file.
        if self.read_only {
            #[cfg(feature = "grafeo-file")]
            if let Some(ref fm) = self.file_manager {
                fm.close()?;
            }
            *is_open = false;
            return Ok(());
        }

        #[cfg(feature = "grafeo-file")]
        let is_single_file = self.file_manager.is_some();
        #[cfg(not(feature = "grafeo-file"))]
        let is_single_file = false;

        // Single-file format: sync the WAL, checkpoint into the file, and only
        // then remove the sidecar WAL.
        #[cfg(feature = "grafeo-file")]
        if let Some(ref fm) = self.file_manager {
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.sync()?;
            }
            self.checkpoint_to_file(fm)?;

            // Close the active log before its directory is removed below.
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.close_active_log();
            }

            {
                // Crash-injection hook used by tests.
                use grafeo_core::testing::crash::maybe_crash;
                maybe_crash("close:before_remove_sidecar_wal");
            }
            fm.remove_sidecar_wal()?;
            fm.close()?;
        }

        // Directory format: write a final commit record and sync the WAL.
        #[cfg(feature = "wal")]
        if !is_single_file && let Some(ref wal) = self.wal {
            // Reuse the last assigned transaction id, or begin a transaction
            // when none has been assigned yet.
            let commit_tx = self
                .transaction_manager
                .last_assigned_transaction_id()
                .unwrap_or_else(|| self.transaction_manager.begin());

            wal.log(&WalRecord::TransactionCommit {
                transaction_id: commit_tx,
            })?;

            wal.sync()?;
        }

        *is_open = false;
        Ok(())
    }
1268
    /// Returns the write-ahead log, or `None` when this database has none.
    #[cfg(feature = "wal")]
    #[must_use]
    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
        self.wal.as_ref()
    }
1275
1276 #[cfg(feature = "wal")]
1278 pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1279 if let Some(ref wal) = self.wal {
1280 wal.log(record)?;
1281 }
1282 Ok(())
1283 }
1284
    /// Exports a snapshot of the database and writes it through `fm`, together
    /// with the current epoch, last assigned transaction id and node/edge counts.
    ///
    /// The `maybe_crash` calls are crash-injection points; their position
    /// relative to the export and write steps is significant.
    ///
    /// # Errors
    ///
    /// Returns an error when exporting or writing the snapshot fails.
    #[cfg(feature = "grafeo-file")]
    fn checkpoint_to_file(&self, fm: &GrafeoFileManager) -> Result<()> {
        use grafeo_core::testing::crash::maybe_crash;

        maybe_crash("checkpoint_to_file:before_export");
        let snapshot_data = self.export_snapshot()?;
        maybe_crash("checkpoint_to_file:after_export");

        // Metadata stored alongside the snapshot.
        let epoch = self.store.current_epoch();
        // 0 is recorded when no transaction id has been assigned yet.
        let transaction_id = self
            .transaction_manager
            .last_assigned_transaction_id()
            .map_or(0, |t| t.0);
        let node_count = self.store.node_count() as u64;
        let edge_count = self.store.edge_count() as u64;

        fm.write_snapshot(
            &snapshot_data,
            epoch.0,
            transaction_id,
            node_count,
            edge_count,
        )?;

        maybe_crash("checkpoint_to_file:after_write_snapshot");
        Ok(())
    }
1317
    /// Returns the single-file manager, or `None` when this database does not
    /// use the single-file format.
    #[cfg(feature = "grafeo-file")]
    #[must_use]
    pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
        self.file_manager.as_ref()
    }
1324}
1325
1326impl Drop for GrafeoDB {
1327 fn drop(&mut self) {
1328 if let Err(e) = self.close() {
1329 grafeo_error!("Error closing database: {}", e);
1330 }
1331 }
1332}
1333
1334impl crate::admin::AdminService for GrafeoDB {
1335 fn info(&self) -> crate::admin::DatabaseInfo {
1336 self.info()
1337 }
1338
1339 fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1340 self.detailed_stats()
1341 }
1342
1343 fn schema(&self) -> crate::admin::SchemaInfo {
1344 self.schema()
1345 }
1346
1347 fn validate(&self) -> crate::admin::ValidationResult {
1348 self.validate()
1349 }
1350
1351 fn wal_status(&self) -> crate::admin::WalStatus {
1352 self.wal_status()
1353 }
1354
1355 fn wal_checkpoint(&self) -> Result<()> {
1356 self.wal_checkpoint()
1357 }
1358}
1359
/// Result of executing a query: column metadata, rows and optional execution
/// details.
#[derive(Debug)]
pub struct QueryResult {
    /// Column names.
    pub columns: Vec<String>,
    /// Logical type of each column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// Result rows; each row is a vector of values.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Execution time in milliseconds, when recorded.
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned, when recorded.
    pub rows_scanned: Option<u64>,
    /// Optional status message shown alongside the result.
    pub status_message: Option<String>,
    /// GQL status attached to the result; the constructors set `SUCCESS`.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
1406
1407impl QueryResult {
1408 #[must_use]
1410 pub fn empty() -> Self {
1411 Self {
1412 columns: Vec::new(),
1413 column_types: Vec::new(),
1414 rows: Vec::new(),
1415 execution_time_ms: None,
1416 rows_scanned: None,
1417 status_message: None,
1418 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1419 }
1420 }
1421
1422 #[must_use]
1424 pub fn status(msg: impl Into<String>) -> Self {
1425 Self {
1426 columns: Vec::new(),
1427 column_types: Vec::new(),
1428 rows: Vec::new(),
1429 execution_time_ms: None,
1430 rows_scanned: None,
1431 status_message: Some(msg.into()),
1432 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1433 }
1434 }
1435
1436 #[must_use]
1438 pub fn new(columns: Vec<String>) -> Self {
1439 let len = columns.len();
1440 Self {
1441 columns,
1442 column_types: vec![grafeo_common::types::LogicalType::Any; len],
1443 rows: Vec::new(),
1444 execution_time_ms: None,
1445 rows_scanned: None,
1446 status_message: None,
1447 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1448 }
1449 }
1450
1451 #[must_use]
1453 pub fn with_types(
1454 columns: Vec<String>,
1455 column_types: Vec<grafeo_common::types::LogicalType>,
1456 ) -> Self {
1457 Self {
1458 columns,
1459 column_types,
1460 rows: Vec::new(),
1461 execution_time_ms: None,
1462 rows_scanned: None,
1463 status_message: None,
1464 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1465 }
1466 }
1467
1468 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1470 self.execution_time_ms = Some(execution_time_ms);
1471 self.rows_scanned = Some(rows_scanned);
1472 self
1473 }
1474
1475 #[must_use]
1477 pub fn execution_time_ms(&self) -> Option<f64> {
1478 self.execution_time_ms
1479 }
1480
1481 #[must_use]
1483 pub fn rows_scanned(&self) -> Option<u64> {
1484 self.rows_scanned
1485 }
1486
1487 #[must_use]
1489 pub fn row_count(&self) -> usize {
1490 self.rows.len()
1491 }
1492
1493 #[must_use]
1495 pub fn column_count(&self) -> usize {
1496 self.columns.len()
1497 }
1498
1499 #[must_use]
1501 pub fn is_empty(&self) -> bool {
1502 self.rows.is_empty()
1503 }
1504
1505 pub fn scalar<T: FromValue>(&self) -> Result<T> {
1514 if self.rows.len() != 1 || self.columns.len() != 1 {
1515 return Err(grafeo_common::utils::error::Error::InvalidValue(
1516 "Expected single value".to_string(),
1517 ));
1518 }
1519 T::from_value(&self.rows[0][0])
1520 }
1521
1522 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1524 self.rows.iter()
1525 }
1526}
1527
1528impl std::fmt::Display for QueryResult {
1529 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1530 let table = grafeo_common::fmt::format_result_table(
1531 &self.columns,
1532 &self.rows,
1533 self.execution_time_ms,
1534 self.status_message.as_deref(),
1535 );
1536 f.write_str(&table)
1537 }
1538}
1539
/// Conversion from a database `Value` into a concrete Rust type, used by
/// `QueryResult::scalar`.
pub trait FromValue: Sized {
    /// Converts `value` into `Self`.
    ///
    /// # Errors
    ///
    /// Implementations return an error when `value` does not hold the
    /// expected type.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
1548
1549impl FromValue for i64 {
1550 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1551 value
1552 .as_int64()
1553 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1554 expected: "INT64".to_string(),
1555 found: value.type_name().to_string(),
1556 })
1557 }
1558}
1559
1560impl FromValue for f64 {
1561 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1562 value
1563 .as_float64()
1564 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1565 expected: "FLOAT64".to_string(),
1566 found: value.type_name().to_string(),
1567 })
1568 }
1569}
1570
1571impl FromValue for String {
1572 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1573 value.as_str().map(String::from).ok_or_else(|| {
1574 grafeo_common::utils::error::Error::TypeMismatch {
1575 expected: "STRING".to_string(),
1576 found: value.type_name().to_string(),
1577 }
1578 })
1579 }
1580}
1581
1582impl FromValue for bool {
1583 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1584 value
1585 .as_bool()
1586 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1587 expected: "BOOL".to_string(),
1588 found: value.type_name().to_string(),
1589 })
1590 }
1591}
1592
1593#[cfg(test)]
1594mod tests {
1595 use super::*;
1596
/// A freshly created in-memory database reports zero nodes and zero edges.
#[test]
fn test_create_in_memory_database() {
    let fresh = GrafeoDB::new_in_memory();

    let nodes = fresh.node_count();
    assert_eq!(nodes, 0);

    let edges = fresh.edge_count();
    assert_eq!(edges, 0);
}
1603
/// Settings passed through `with_config` are visible again via `config()`.
#[test]
fn test_database_config() {
    let requested = Config::in_memory().with_threads(4).with_query_logging();
    let db = GrafeoDB::with_config(requested).unwrap();

    let applied = db.config();
    assert_eq!(applied.threads, 4);
    assert!(applied.query_logging);
}
1612
/// Smoke test: a session can be obtained from an in-memory database.
#[test]
fn test_database_session() {
    let database = GrafeoDB::new_in_memory();
    // Only construction is exercised; the session itself is not used.
    let _session = database.session();
}
1619
/// Data written to a path-backed database is still there after close and reopen.
#[cfg(feature = "wal")]
#[test]
fn test_persistent_database_recovery() {
    use grafeo_common::types::{NodeId, Value};
    use tempfile::tempdir;

    let scratch = tempdir().unwrap();
    let location = scratch.path().join("test_db");

    // First lifetime: write two named nodes joined by one edge, then close.
    {
        let writer = GrafeoDB::open(&location).unwrap();

        let alix = writer.create_node(&["Person"]);
        writer.set_node_property(alix, "name", Value::from("Alix"));

        let gus = writer.create_node(&["Person"]);
        writer.set_node_property(gus, "name", Value::from("Gus"));

        let _edge = writer.create_edge(alix, gus, "KNOWS");

        writer.close().unwrap();
    }

    // Second lifetime: reopen the same path and check what came back.
    {
        let reader = GrafeoDB::open(&location).unwrap();

        assert_eq!(reader.node_count(), 2);
        assert_eq!(reader.edge_count(), 1);

        // Both nodes must be reachable under ids 0 and 1.
        for raw_id in 0..2 {
            let recovered = reader.get_node(NodeId::new(raw_id));
            assert!(recovered.is_some());
        }
    }
}
1660
/// Mutations on a path-backed database leave records in the WAL, when one is exposed.
#[cfg(feature = "wal")]
#[test]
fn test_wal_logging() {
    use tempfile::tempdir;

    let scratch = tempdir().unwrap();
    let location = scratch.path().join("wal_test_db");
    let db = GrafeoDB::open(&location).unwrap();

    // One create followed by one delete is enough to produce log traffic.
    let doomed = db.create_node(&["Test"]);
    db.delete_node(doomed);

    // The record count is only checked if the database hands out a WAL.
    if let Some(wal) = db.wal() {
        let logged = wal.record_count();
        assert!(logged > 0);
    }

    db.close().unwrap();
}
1682
/// Data accumulates correctly across several open/close cycles on the same path.
#[cfg(feature = "wal")]
#[test]
fn test_wal_recovery_multiple_sessions() {
    use grafeo_common::types::{NodeId, Value};
    use tempfile::tempdir;

    let scratch = tempdir().unwrap();
    let location = scratch.path().join("multi_session_db");

    // Cycle 1: add the first person.
    {
        let db = GrafeoDB::open(&location).unwrap();
        let first = db.create_node(&["Person"]);
        db.set_node_property(first, "name", Value::from("Alix"));
        db.close().unwrap();
    }

    // Cycle 2: the first person is back; add a second one.
    {
        let db = GrafeoDB::open(&location).unwrap();
        assert_eq!(db.node_count(), 1);

        let second = db.create_node(&["Person"]);
        db.set_node_property(second, "name", Value::from("Gus"));
        db.close().unwrap();
    }

    // Cycle 3: both people are present and still carry their label.
    {
        let db = GrafeoDB::open(&location).unwrap();
        assert_eq!(db.node_count(), 2);

        for raw_id in 0..2 {
            let person = db.get_node(NodeId::new(raw_id)).unwrap();
            assert!(person.labels.iter().any(|label| label.as_str() == "Person"));
        }
    }
}
1723
/// Creates, deletes and property writes replay to the same state after reopening.
#[cfg(feature = "wal")]
#[test]
fn test_database_consistency_after_mutations() {
    use grafeo_common::types::{NodeId, Value};
    use tempfile::tempdir;

    let scratch = tempdir().unwrap();
    let location = scratch.path().join("consistency_db");

    {
        let db = GrafeoDB::open(&location).unwrap();

        // Build a three-node chain: a -> b -> c.
        let a = db.create_node(&["Node"]);
        let b = db.create_node(&["Node"]);
        let c = db.create_node(&["Node"]);

        let first_link = db.create_edge(a, b, "LINKS");
        let _second_link = db.create_edge(b, c, "LINKS");

        // Remove the first edge, then the middle node.
        db.delete_edge(first_link);
        db.delete_node(b);

        // Touch the two remaining nodes.
        db.set_node_property(a, "value", Value::Int64(1));
        db.set_node_property(c, "value", Value::Int64(3));

        db.close().unwrap();
    }

    {
        let db = GrafeoDB::open(&location).unwrap();

        let survivor_a = db.get_node(NodeId::new(0));
        assert!(survivor_a.is_some());

        let survivor_c = db.get_node(NodeId::new(2));
        assert!(survivor_c.is_some());

        // The deleted middle node must stay deleted after reopening.
        let removed_b = db.get_node(NodeId::new(1));
        assert!(removed_b.is_none());
    }
}
1775
/// Calling `close` a second time on the same database still succeeds.
#[cfg(feature = "wal")]
#[test]
fn test_close_is_idempotent() {
    use tempfile::tempdir;

    let scratch = tempdir().unwrap();
    let location = scratch.path().join("close_test_db");

    let db = GrafeoDB::open(&location).unwrap();
    db.create_node(&["Test"]);

    // First pass closes the database; the second pass closes it again.
    for _attempt in 0..2 {
        assert!(db.close().is_ok());
    }
}
1794
/// A database built on an externally created store can see data put in that store beforehand.
#[test]
fn test_with_store_external_backend() {
    use grafeo_common::types::Value;
    use grafeo_core::graph::lpg::LpgStore;

    // Populate the store directly, before any database wraps it.
    let external = Arc::new(LpgStore::new().unwrap());
    let person = external.create_node(&["Person"]);
    external.set_node_property(person, "name", Value::from("Alix"));

    let backend = Arc::clone(&external) as Arc<dyn GraphStoreMut>;
    let db = GrafeoDB::with_store(backend, Config::in_memory()).unwrap();

    let session = db.session();

    // The query part only exists when the GQL front end is compiled in.
    #[cfg(feature = "gql")]
    {
        let found = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
        assert_eq!(found.rows.len(), 1);
    }
}
1820
/// A custom memory limit survives the round trip through `with_config`.
#[test]
fn test_with_config_custom_memory_limit() {
    // 64 * 1024 * 1024 bytes, i.e. 64 MiB.
    let requested = Config::in_memory().with_memory_limit(64 * 1024 * 1024);
    let db = GrafeoDB::with_config(requested).unwrap();

    let configured_limit = db.config().memory_limit;
    assert_eq!(configured_limit, Some(64 * 1024 * 1024));
    assert_eq!(db.node_count(), 0);
}
1829
/// Creating nodes directly does not change the query counter in the metrics snapshot.
#[cfg(feature = "metrics")]
#[test]
fn test_database_metrics_registry() {
    let db = GrafeoDB::new_in_memory();

    for _ in 0..2 {
        db.create_node(&["Person"]);
    }

    // No query has been executed, so the counter is expected to read zero.
    let snapshot = db.metrics();
    assert_eq!(snapshot.query_count, 0);
}
1844
/// A query over existing nodes returns timing and scan metrics.
#[test]
fn test_query_result_has_metrics() {
    let db = GrafeoDB::new_in_memory();
    for _ in 0..2 {
        db.create_node(&["Person"]);
    }

    #[cfg(feature = "gql")]
    {
        let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
        let (elapsed_ms, scanned) = (result.execution_time_ms, result.rows_scanned);

        // Both metrics must be present before their values are inspected.
        assert!(elapsed_ms.is_some());
        assert!(scanned.is_some());
        assert!(elapsed_ms.unwrap() >= 0.0);
        assert_eq!(scanned.unwrap(), 2);
    }
}
1863
/// A query that matches nothing still reports metrics, with a scan count of zero.
#[test]
fn test_empty_query_result_metrics() {
    let db = GrafeoDB::new_in_memory();
    db.create_node(&["Person"]);

    #[cfg(feature = "gql")]
    {
        // The label below is not used by any node in this database.
        let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
        let (elapsed_ms, scanned) = (result.execution_time_ms, result.rows_scanned);

        assert!(elapsed_ms.is_some());
        assert!(scanned.is_some());
        assert_eq!(scanned.unwrap(), 0);
    }
}
1880
1881 #[cfg(feature = "cdc")]
1882 mod cdc_integration {
1883 use super::*;
1884
/// A node's history records its creation, both property writes and its deletion, in order.
#[test]
fn test_node_lifecycle_history() {
    use crate::cdc::ChangeKind;

    let db = GrafeoDB::new_in_memory();

    let node = db.create_node(&["Person"]);
    db.set_node_property(node, "name", "Alix".into());
    db.set_node_property(node, "name", "Gus".into());
    db.delete_node(node);

    let events = db.history(node).unwrap();
    assert_eq!(events.len(), 4);

    assert_eq!(events[0].kind, ChangeKind::Create);

    // First write of "name": there is no earlier value to report.
    assert_eq!(events[1].kind, ChangeKind::Update);
    assert!(events[1].before.is_none());

    // Second write of "name": the earlier value is reported.
    assert_eq!(events[2].kind, ChangeKind::Update);
    assert!(events[2].before.is_some());

    assert_eq!(events[3].kind, ChangeKind::Delete);
}
1906
/// An edge's history records its creation, one property write and its deletion, in order.
#[test]
fn test_edge_lifecycle_history() {
    use crate::cdc::ChangeKind;

    let db = GrafeoDB::new_in_memory();

    let alix = db.create_node(&["Person"]);
    let gus = db.create_node(&["Person"]);

    let knows = db.create_edge(alix, gus, "KNOWS");
    db.set_edge_property(knows, "since", 2024i64.into());
    db.delete_edge(knows);

    let events = db.history(knows).unwrap();
    assert_eq!(events.len(), 3);
    assert_eq!(events[0].kind, ChangeKind::Create);
    assert_eq!(events[1].kind, ChangeKind::Update);
    assert_eq!(events[2].kind, ChangeKind::Delete);
}
1923
/// Creating a node together with its properties yields one create event carrying both properties.
#[test]
fn test_create_node_with_props_cdc() {
    use grafeo_common::types::Value;

    let db = GrafeoDB::new_in_memory();

    let initial_props = vec![
        ("name", Value::from("Alix")),
        ("age", Value::from(30i64)),
    ];
    let node = db.create_node_with_props(&["Person"], initial_props);

    let events = db.history(node).unwrap();
    assert_eq!(events.len(), 1);

    let creation = &events[0];
    assert_eq!(creation.kind, crate::cdc::ChangeKind::Create);

    // The create event's `after` payload holds the two initial properties.
    let after = creation.after.as_ref().unwrap();
    assert_eq!(after.len(), 2);
}
1943
/// Querying the full epoch range returns every change made so far.
#[test]
fn test_changes_between() {
    use grafeo_common::types::EpochId;

    let db = GrafeoDB::new_in_memory();

    // Two node creations plus one property write: three changes in total.
    let first = db.create_node(&["A"]);
    let _second = db.create_node(&["B"]);
    db.set_node_property(first, "x", 1i64.into());

    let (range_start, range_end) = (EpochId(0), EpochId(u64::MAX));
    let changes = db.changes_between(range_start, range_end).unwrap();
    assert_eq!(changes.len(), 3);
}
1961 }
1962
/// A database wrapping a pre-populated store can query the data already in that store.
///
/// Gated on the `gql` feature like the other query-executing tests in this
/// module, because the test depends on `execute` understanding a GQL query.
#[cfg(feature = "gql")]
#[test]
fn test_with_store_basic() {
    use grafeo_core::graph::lpg::LpgStore;

    // Populate the store directly, before any database wraps it.
    let store = Arc::new(LpgStore::new().unwrap());
    let n1 = store.create_node(&["Person"]);
    store.set_node_property(n1, "name", "Alix".into());

    let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
    let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

    let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(result.rows.len(), 1);
}
1977
/// A session obtained from a store-backed database can run an aggregate query.
///
/// Gated on the `gql` feature like the other query-executing tests in this
/// module, because the test depends on `execute` understanding a GQL query.
#[cfg(feature = "gql")]
#[test]
fn test_with_store_session() {
    use grafeo_core::graph::lpg::LpgStore;

    let store = Arc::new(LpgStore::new().unwrap());
    let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
    let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

    let session = db.session();

    // `count(n)` yields a single row even though the store holds no nodes.
    let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
    assert_eq!(result.rows.len(), 1);
}
1990
/// Inside a transaction on a store-backed database, an insert is visible to a later read.
///
/// Gated on the `gql` feature like the other query-executing tests in this
/// module, because the test depends on `execute` understanding GQL statements.
#[cfg(feature = "gql")]
#[test]
fn test_with_store_mutations() {
    use grafeo_core::graph::lpg::LpgStore;

    let store = Arc::new(LpgStore::new().unwrap());
    let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
    let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

    let mut session = db.session();

    // Insert and read back within one explicit transaction, then commit it.
    session.begin_transaction().unwrap();
    session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

    let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(result.rows.len(), 1);

    session.commit().unwrap();
}
2012}