1mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21#[cfg(feature = "rdf")]
22mod rdf_ops;
23mod search;
24#[cfg(feature = "wal")]
25pub(crate) mod wal_store;
26
27#[cfg(feature = "wal")]
28use std::path::Path;
29use std::sync::Arc;
30use std::sync::atomic::AtomicUsize;
31
32use parking_lot::RwLock;
33
34#[cfg(feature = "grafeo-file")]
35use grafeo_adapters::storage::file::GrafeoFileManager;
36#[cfg(feature = "wal")]
37use grafeo_adapters::storage::wal::{
38 DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
39};
40use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
41use grafeo_common::utils::error::Result;
42use grafeo_core::graph::GraphStoreMut;
43use grafeo_core::graph::lpg::LpgStore;
44#[cfg(feature = "rdf")]
45use grafeo_core::graph::rdf::RdfStore;
46
47use crate::catalog::Catalog;
48use crate::config::Config;
49use crate::query::cache::QueryCache;
50use crate::session::Session;
51use crate::transaction::TransactionManager;
52
53pub struct GrafeoDB {
76 pub(super) config: Config,
78 pub(super) store: Arc<LpgStore>,
80 pub(super) catalog: Arc<Catalog>,
82 #[cfg(feature = "rdf")]
84 pub(super) rdf_store: Arc<RdfStore>,
85 pub(super) transaction_manager: Arc<TransactionManager>,
87 pub(super) buffer_manager: Arc<BufferManager>,
89 #[cfg(feature = "wal")]
91 pub(super) wal: Option<Arc<LpgWal>>,
92 #[cfg(feature = "wal")]
96 pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
97 pub(super) query_cache: Arc<QueryCache>,
99 pub(super) commit_counter: Arc<AtomicUsize>,
101 pub(super) is_open: RwLock<bool>,
103 #[cfg(feature = "cdc")]
105 pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
106 #[cfg(feature = "embed")]
108 pub(super) embedding_models:
109 RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
110 #[cfg(feature = "grafeo-file")]
112 pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
113 pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
116 #[cfg(feature = "metrics")]
118 pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
119 current_graph: RwLock<Option<String>>,
123 read_only: bool,
126}
127
128impl GrafeoDB {
129 #[must_use]
150 pub fn new_in_memory() -> Self {
151 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
152 }
153
154 #[cfg(feature = "wal")]
173 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
174 Self::with_config(Config::persistent(path.as_ref()))
175 }
176
177 #[cfg(feature = "grafeo-file")]
202 pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
203 Self::with_config(Config::read_only(path.as_ref()))
204 }
205
206 pub fn with_config(config: Config) -> Result<Self> {
230 config
232 .validate()
233 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
234
235 let store = Arc::new(LpgStore::new()?);
236 #[cfg(feature = "rdf")]
237 let rdf_store = Arc::new(RdfStore::new());
238 let transaction_manager = Arc::new(TransactionManager::new());
239
240 let buffer_config = BufferManagerConfig {
242 budget: config.memory_limit.unwrap_or_else(|| {
243 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
244 }),
245 spill_path: config
246 .spill_path
247 .clone()
248 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
249 ..BufferManagerConfig::default()
250 };
251 let buffer_manager = BufferManager::new(buffer_config);
252
253 let catalog = Arc::new(Catalog::new());
255
256 let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;
257
258 #[cfg(feature = "grafeo-file")]
260 let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
261 if let Some(ref db_path) = config.path {
263 if db_path.exists() && db_path.is_file() {
264 let fm = GrafeoFileManager::open_read_only(db_path)?;
265 let snapshot_data = fm.read_snapshot()?;
266 if !snapshot_data.is_empty() {
267 Self::apply_snapshot_data(
268 &store,
269 &catalog,
270 #[cfg(feature = "rdf")]
271 &rdf_store,
272 &snapshot_data,
273 )?;
274 }
275 Some(Arc::new(fm))
276 } else {
277 return Err(grafeo_common::utils::error::Error::Internal(format!(
278 "read-only open requires an existing .grafeo file: {}",
279 db_path.display()
280 )));
281 }
282 } else {
283 return Err(grafeo_common::utils::error::Error::Internal(
284 "read-only mode requires a database path".to_string(),
285 ));
286 }
287 } else if config.wal_enabled {
288 if let Some(ref db_path) = config.path {
289 if Self::should_use_single_file(db_path, config.storage_format) {
290 let fm = if db_path.exists() && db_path.is_file() {
291 GrafeoFileManager::open(db_path)?
292 } else if !db_path.exists() {
293 GrafeoFileManager::create(db_path)?
294 } else {
295 return Err(grafeo_common::utils::error::Error::Internal(format!(
297 "path exists but is not a file: {}",
298 db_path.display()
299 )));
300 };
301
302 let snapshot_data = fm.read_snapshot()?;
304 if !snapshot_data.is_empty() {
305 Self::apply_snapshot_data(
306 &store,
307 &catalog,
308 #[cfg(feature = "rdf")]
309 &rdf_store,
310 &snapshot_data,
311 )?;
312 }
313
314 if fm.has_sidecar_wal() {
316 let recovery = WalRecovery::new(fm.sidecar_wal_path());
317 let records = recovery.recover()?;
318 Self::apply_wal_records(
319 &store,
320 &catalog,
321 #[cfg(feature = "rdf")]
322 &rdf_store,
323 &records,
324 )?;
325 }
326
327 Some(Arc::new(fm))
328 } else {
329 None
330 }
331 } else {
332 None
333 }
334 } else {
335 None
336 };
337
338 #[cfg(feature = "wal")]
341 let wal = if is_read_only {
342 None
343 } else if config.wal_enabled {
344 if let Some(ref db_path) = config.path {
345 #[cfg(feature = "grafeo-file")]
347 let wal_path = if let Some(ref fm) = file_manager {
348 let p = fm.sidecar_wal_path();
349 std::fs::create_dir_all(&p)?;
350 p
351 } else {
352 std::fs::create_dir_all(db_path)?;
354 db_path.join("wal")
355 };
356
357 #[cfg(not(feature = "grafeo-file"))]
358 let wal_path = {
359 std::fs::create_dir_all(db_path)?;
360 db_path.join("wal")
361 };
362
363 #[cfg(feature = "grafeo-file")]
365 let is_single_file = file_manager.is_some();
366 #[cfg(not(feature = "grafeo-file"))]
367 let is_single_file = false;
368
369 if !is_single_file && wal_path.exists() {
370 let recovery = WalRecovery::new(&wal_path);
371 let records = recovery.recover()?;
372 Self::apply_wal_records(
373 &store,
374 &catalog,
375 #[cfg(feature = "rdf")]
376 &rdf_store,
377 &records,
378 )?;
379 }
380
381 let wal_durability = match config.wal_durability {
383 crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
384 crate::config::DurabilityMode::Batch {
385 max_delay_ms,
386 max_records,
387 } => WalDurabilityMode::Batch {
388 max_delay_ms,
389 max_records,
390 },
391 crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
392 WalDurabilityMode::Adaptive { target_interval_ms }
393 }
394 crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
395 };
396 let wal_config = WalConfig {
397 durability: wal_durability,
398 ..WalConfig::default()
399 };
400 let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
401 Some(Arc::new(wal_manager))
402 } else {
403 None
404 }
405 } else {
406 None
407 };
408
409 let query_cache = Arc::new(QueryCache::default());
411
412 #[cfg(feature = "temporal")]
415 transaction_manager.sync_epoch(store.current_epoch());
416
417 Ok(Self {
418 config,
419 store,
420 catalog,
421 #[cfg(feature = "rdf")]
422 rdf_store,
423 transaction_manager,
424 buffer_manager,
425 #[cfg(feature = "wal")]
426 wal,
427 #[cfg(feature = "wal")]
428 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
429 query_cache,
430 commit_counter: Arc::new(AtomicUsize::new(0)),
431 is_open: RwLock::new(true),
432 #[cfg(feature = "cdc")]
433 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
434 #[cfg(feature = "embed")]
435 embedding_models: RwLock::new(hashbrown::HashMap::new()),
436 #[cfg(feature = "grafeo-file")]
437 file_manager,
438 external_store: None,
439 #[cfg(feature = "metrics")]
440 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
441 current_graph: RwLock::new(None),
442 read_only: is_read_only,
443 })
444 }
445
446 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
471 config
472 .validate()
473 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
474
475 let dummy_store = Arc::new(LpgStore::new()?);
476 let transaction_manager = Arc::new(TransactionManager::new());
477
478 let buffer_config = BufferManagerConfig {
479 budget: config.memory_limit.unwrap_or_else(|| {
480 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
481 }),
482 spill_path: None,
483 ..BufferManagerConfig::default()
484 };
485 let buffer_manager = BufferManager::new(buffer_config);
486
487 let query_cache = Arc::new(QueryCache::default());
488
489 Ok(Self {
490 config,
491 store: dummy_store,
492 catalog: Arc::new(Catalog::new()),
493 #[cfg(feature = "rdf")]
494 rdf_store: Arc::new(RdfStore::new()),
495 transaction_manager,
496 buffer_manager,
497 #[cfg(feature = "wal")]
498 wal: None,
499 #[cfg(feature = "wal")]
500 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
501 query_cache,
502 commit_counter: Arc::new(AtomicUsize::new(0)),
503 is_open: RwLock::new(true),
504 #[cfg(feature = "cdc")]
505 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
506 #[cfg(feature = "embed")]
507 embedding_models: RwLock::new(hashbrown::HashMap::new()),
508 #[cfg(feature = "grafeo-file")]
509 file_manager: None,
510 external_store: Some(store),
511 #[cfg(feature = "metrics")]
512 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
513 current_graph: RwLock::new(None),
514 read_only: false,
515 })
516 }
517
518 #[cfg(feature = "wal")]
524 fn apply_wal_records(
525 store: &Arc<LpgStore>,
526 catalog: &Catalog,
527 #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
528 records: &[WalRecord],
529 ) -> Result<()> {
530 use crate::catalog::{
531 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
532 };
533 use grafeo_common::utils::error::Error;
534
535 let mut current_graph: Option<String> = None;
538 let mut target_store: Arc<LpgStore> = Arc::clone(store);
539
540 for record in records {
541 match record {
542 WalRecord::CreateNamedGraph { name } => {
544 let _ = store.create_graph(name);
545 }
546 WalRecord::DropNamedGraph { name } => {
547 store.drop_graph(name);
548 if current_graph.as_deref() == Some(name.as_str()) {
550 current_graph = None;
551 target_store = Arc::clone(store);
552 }
553 }
554 WalRecord::SwitchGraph { name } => {
555 current_graph.clone_from(name);
556 target_store = match ¤t_graph {
557 None => Arc::clone(store),
558 Some(graph_name) => store
559 .graph_or_create(graph_name)
560 .map_err(|e| Error::Internal(e.to_string()))?,
561 };
562 }
563
564 WalRecord::CreateNode { id, labels } => {
566 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
567 target_store.create_node_with_id(*id, &label_refs)?;
568 }
569 WalRecord::DeleteNode { id } => {
570 target_store.delete_node(*id);
571 }
572 WalRecord::CreateEdge {
573 id,
574 src,
575 dst,
576 edge_type,
577 } => {
578 target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
579 }
580 WalRecord::DeleteEdge { id } => {
581 target_store.delete_edge(*id);
582 }
583 WalRecord::SetNodeProperty { id, key, value } => {
584 target_store.set_node_property(*id, key, value.clone());
585 }
586 WalRecord::SetEdgeProperty { id, key, value } => {
587 target_store.set_edge_property(*id, key, value.clone());
588 }
589 WalRecord::AddNodeLabel { id, label } => {
590 target_store.add_label(*id, label);
591 }
592 WalRecord::RemoveNodeLabel { id, label } => {
593 target_store.remove_label(*id, label);
594 }
595 WalRecord::RemoveNodeProperty { id, key } => {
596 target_store.remove_node_property(*id, key);
597 }
598 WalRecord::RemoveEdgeProperty { id, key } => {
599 target_store.remove_edge_property(*id, key);
600 }
601
602 WalRecord::CreateNodeType {
604 name,
605 properties,
606 constraints,
607 } => {
608 let def = NodeTypeDefinition {
609 name: name.clone(),
610 properties: properties
611 .iter()
612 .map(|(n, t, nullable)| TypedProperty {
613 name: n.clone(),
614 data_type: PropertyDataType::from_type_name(t),
615 nullable: *nullable,
616 default_value: None,
617 })
618 .collect(),
619 constraints: constraints
620 .iter()
621 .map(|(kind, props)| match kind.as_str() {
622 "unique" => TypeConstraint::Unique(props.clone()),
623 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
624 "not_null" if !props.is_empty() => {
625 TypeConstraint::NotNull(props[0].clone())
626 }
627 _ => TypeConstraint::Unique(props.clone()),
628 })
629 .collect(),
630 parent_types: Vec::new(),
631 };
632 let _ = catalog.register_node_type(def);
633 }
634 WalRecord::DropNodeType { name } => {
635 let _ = catalog.drop_node_type(name);
636 }
637 WalRecord::CreateEdgeType {
638 name,
639 properties,
640 constraints,
641 } => {
642 let def = EdgeTypeDefinition {
643 name: name.clone(),
644 properties: properties
645 .iter()
646 .map(|(n, t, nullable)| TypedProperty {
647 name: n.clone(),
648 data_type: PropertyDataType::from_type_name(t),
649 nullable: *nullable,
650 default_value: None,
651 })
652 .collect(),
653 constraints: constraints
654 .iter()
655 .map(|(kind, props)| match kind.as_str() {
656 "unique" => TypeConstraint::Unique(props.clone()),
657 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
658 "not_null" if !props.is_empty() => {
659 TypeConstraint::NotNull(props[0].clone())
660 }
661 _ => TypeConstraint::Unique(props.clone()),
662 })
663 .collect(),
664 source_node_types: Vec::new(),
665 target_node_types: Vec::new(),
666 };
667 let _ = catalog.register_edge_type_def(def);
668 }
669 WalRecord::DropEdgeType { name } => {
670 let _ = catalog.drop_edge_type_def(name);
671 }
672 WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
673 }
676 WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
677 }
680 WalRecord::CreateGraphType {
681 name,
682 node_types,
683 edge_types,
684 open,
685 } => {
686 use crate::catalog::GraphTypeDefinition;
687 let def = GraphTypeDefinition {
688 name: name.clone(),
689 allowed_node_types: node_types.clone(),
690 allowed_edge_types: edge_types.clone(),
691 open: *open,
692 };
693 let _ = catalog.register_graph_type(def);
694 }
695 WalRecord::DropGraphType { name } => {
696 let _ = catalog.drop_graph_type(name);
697 }
698 WalRecord::CreateSchema { name } => {
699 let _ = catalog.register_schema_namespace(name.clone());
700 }
701 WalRecord::DropSchema { name } => {
702 let _ = catalog.drop_schema_namespace(name);
703 }
704
705 WalRecord::AlterNodeType { name, alterations } => {
706 for (action, prop_name, type_name, nullable) in alterations {
707 match action.as_str() {
708 "add" => {
709 let prop = TypedProperty {
710 name: prop_name.clone(),
711 data_type: PropertyDataType::from_type_name(type_name),
712 nullable: *nullable,
713 default_value: None,
714 };
715 let _ = catalog.alter_node_type_add_property(name, prop);
716 }
717 "drop" => {
718 let _ = catalog.alter_node_type_drop_property(name, prop_name);
719 }
720 _ => {}
721 }
722 }
723 }
724 WalRecord::AlterEdgeType { name, alterations } => {
725 for (action, prop_name, type_name, nullable) in alterations {
726 match action.as_str() {
727 "add" => {
728 let prop = TypedProperty {
729 name: prop_name.clone(),
730 data_type: PropertyDataType::from_type_name(type_name),
731 nullable: *nullable,
732 default_value: None,
733 };
734 let _ = catalog.alter_edge_type_add_property(name, prop);
735 }
736 "drop" => {
737 let _ = catalog.alter_edge_type_drop_property(name, prop_name);
738 }
739 _ => {}
740 }
741 }
742 }
743 WalRecord::AlterGraphType { name, alterations } => {
744 for (action, type_name) in alterations {
745 match action.as_str() {
746 "add_node" => {
747 let _ =
748 catalog.alter_graph_type_add_node_type(name, type_name.clone());
749 }
750 "drop_node" => {
751 let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
752 }
753 "add_edge" => {
754 let _ =
755 catalog.alter_graph_type_add_edge_type(name, type_name.clone());
756 }
757 "drop_edge" => {
758 let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
759 }
760 _ => {}
761 }
762 }
763 }
764
765 WalRecord::CreateProcedure {
766 name,
767 params,
768 returns,
769 body,
770 } => {
771 use crate::catalog::ProcedureDefinition;
772 let def = ProcedureDefinition {
773 name: name.clone(),
774 params: params.clone(),
775 returns: returns.clone(),
776 body: body.clone(),
777 };
778 let _ = catalog.register_procedure(def);
779 }
780 WalRecord::DropProcedure { name } => {
781 let _ = catalog.drop_procedure(name);
782 }
783
784 #[cfg(feature = "rdf")]
786 WalRecord::InsertRdfTriple { .. }
787 | WalRecord::DeleteRdfTriple { .. }
788 | WalRecord::ClearRdfGraph { .. }
789 | WalRecord::CreateRdfGraph { .. }
790 | WalRecord::DropRdfGraph { .. } => {
791 rdf_ops::replay_rdf_wal_record(rdf_store, record);
792 }
793 #[cfg(not(feature = "rdf"))]
794 WalRecord::InsertRdfTriple { .. }
795 | WalRecord::DeleteRdfTriple { .. }
796 | WalRecord::ClearRdfGraph { .. }
797 | WalRecord::CreateRdfGraph { .. }
798 | WalRecord::DropRdfGraph { .. } => {}
799
800 WalRecord::TransactionCommit { .. } => {
801 #[cfg(feature = "temporal")]
805 {
806 target_store.new_epoch();
807 }
808 }
809 WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
810 }
813 }
814 }
815 Ok(())
816 }
817
818 #[cfg(feature = "grafeo-file")]
824 fn should_use_single_file(
825 path: &std::path::Path,
826 configured: crate::config::StorageFormat,
827 ) -> bool {
828 use crate::config::StorageFormat;
829 match configured {
830 StorageFormat::SingleFile => true,
831 StorageFormat::WalDirectory => false,
832 StorageFormat::Auto => {
833 if path.is_file() {
835 if let Ok(mut f) = std::fs::File::open(path) {
836 use std::io::Read;
837 let mut magic = [0u8; 4];
838 if f.read_exact(&mut magic).is_ok()
839 && magic == grafeo_adapters::storage::file::MAGIC
840 {
841 return true;
842 }
843 }
844 return false;
845 }
846 if path.is_dir() {
848 return false;
849 }
850 path.extension().is_some_and(|ext| ext == "grafeo")
852 }
853 }
854 }
855
856 #[cfg(feature = "grafeo-file")]
858 fn apply_snapshot_data(
859 store: &Arc<LpgStore>,
860 catalog: &Arc<crate::catalog::Catalog>,
861 #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
862 data: &[u8],
863 ) -> Result<()> {
864 persistence::load_snapshot_into_store(
865 store,
866 catalog,
867 #[cfg(feature = "rdf")]
868 rdf_store,
869 data,
870 )
871 }
872
873 #[must_use]
901 pub fn session(&self) -> Session {
902 let session_cfg = || crate::session::SessionConfig {
903 transaction_manager: Arc::clone(&self.transaction_manager),
904 query_cache: Arc::clone(&self.query_cache),
905 catalog: Arc::clone(&self.catalog),
906 adaptive_config: self.config.adaptive.clone(),
907 factorized_execution: self.config.factorized_execution,
908 graph_model: self.config.graph_model,
909 query_timeout: self.config.query_timeout,
910 commit_counter: Arc::clone(&self.commit_counter),
911 gc_interval: self.config.gc_interval,
912 read_only: self.read_only,
913 };
914
915 if let Some(ref ext_store) = self.external_store {
916 return Session::with_external_store(Arc::clone(ext_store), session_cfg())
917 .expect("arena allocation for external store session");
918 }
919
920 #[cfg(feature = "rdf")]
921 let mut session = Session::with_rdf_store_and_adaptive(
922 Arc::clone(&self.store),
923 Arc::clone(&self.rdf_store),
924 session_cfg(),
925 );
926 #[cfg(not(feature = "rdf"))]
927 let mut session = Session::with_adaptive(Arc::clone(&self.store), session_cfg());
928
929 #[cfg(feature = "wal")]
930 if let Some(ref wal) = self.wal {
931 session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
932 }
933
934 #[cfg(feature = "cdc")]
935 session.set_cdc_log(Arc::clone(&self.cdc_log));
936
937 #[cfg(feature = "metrics")]
938 {
939 if let Some(ref m) = self.metrics {
940 session.set_metrics(Arc::clone(m));
941 m.session_created
942 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
943 m.session_active
944 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
945 }
946 }
947
948 if let Some(ref graph) = *self.current_graph.read() {
950 session.use_graph(graph);
951 }
952
953 let _ = &mut session;
955
956 session
957 }
958
959 #[must_use]
965 pub fn current_graph(&self) -> Option<String> {
966 self.current_graph.read().clone()
967 }
968
969 pub fn set_current_graph(&self, name: Option<&str>) {
974 *self.current_graph.write() = name.map(ToString::to_string);
975 }
976
977 #[must_use]
979 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
980 &self.config.adaptive
981 }
982
983 #[must_use]
985 pub fn is_read_only(&self) -> bool {
986 self.read_only
987 }
988
989 #[must_use]
991 pub fn config(&self) -> &Config {
992 &self.config
993 }
994
995 #[must_use]
997 pub fn graph_model(&self) -> crate::config::GraphModel {
998 self.config.graph_model
999 }
1000
1001 #[must_use]
1003 pub fn memory_limit(&self) -> Option<usize> {
1004 self.config.memory_limit
1005 }
1006
1007 #[cfg(feature = "metrics")]
1012 #[must_use]
1013 pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1014 let mut snapshot = self
1015 .metrics
1016 .as_ref()
1017 .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1018
1019 let cache_stats = self.query_cache.stats();
1021 snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1022 snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1023 snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1024 snapshot.cache_invalidations = cache_stats.invalidations;
1025
1026 snapshot
1027 }
1028
1029 #[cfg(feature = "metrics")]
1033 #[must_use]
1034 pub fn metrics_prometheus(&self) -> String {
1035 self.metrics
1036 .as_ref()
1037 .map_or_else(String::new, |m| m.to_prometheus())
1038 }
1039
1040 #[cfg(feature = "metrics")]
1042 pub fn reset_metrics(&self) {
1043 if let Some(ref m) = self.metrics {
1044 m.reset();
1045 }
1046 self.query_cache.reset_stats();
1047 }
1048
1049 #[must_use]
1057 pub fn store(&self) -> &Arc<LpgStore> {
1058 &self.store
1059 }
1060
1061 #[allow(dead_code)] fn active_store(&self) -> Arc<LpgStore> {
1068 let graph_name = self.current_graph.read().clone();
1069 match graph_name {
1070 None => Arc::clone(&self.store),
1071 Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
1072 Some(ref name) => self
1073 .store
1074 .graph(name)
1075 .unwrap_or_else(|| Arc::clone(&self.store)),
1076 }
1077 }
1078
1079 pub fn create_graph(&self, name: &str) -> Result<bool> {
1087 Ok(self.store.create_graph(name)?)
1088 }
1089
1090 pub fn drop_graph(&self, name: &str) -> bool {
1092 self.store.drop_graph(name)
1093 }
1094
1095 #[must_use]
1097 pub fn list_graphs(&self) -> Vec<String> {
1098 self.store.graph_names()
1099 }
1100
1101 #[must_use]
1109 pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
1110 if let Some(ref ext_store) = self.external_store {
1111 Arc::clone(ext_store)
1112 } else {
1113 Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
1114 }
1115 }
1116
1117 pub fn gc(&self) {
1123 let min_epoch = self.transaction_manager.min_active_epoch();
1124 self.store.gc_versions(min_epoch);
1125 self.transaction_manager.gc();
1126 }
1127
1128 #[must_use]
1130 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
1131 &self.buffer_manager
1132 }
1133
1134 #[must_use]
1136 pub fn query_cache(&self) -> &Arc<QueryCache> {
1137 &self.query_cache
1138 }
1139
1140 pub fn clear_plan_cache(&self) {
1146 self.query_cache.clear();
1147 }
1148
1149 pub fn close(&self) -> Result<()> {
1163 let mut is_open = self.is_open.write();
1164 if !*is_open {
1165 return Ok(());
1166 }
1167
1168 if self.read_only {
1170 #[cfg(feature = "grafeo-file")]
1171 if let Some(ref fm) = self.file_manager {
1172 fm.close()?;
1173 }
1174 *is_open = false;
1175 return Ok(());
1176 }
1177
1178 #[cfg(feature = "grafeo-file")]
1182 let is_single_file = self.file_manager.is_some();
1183 #[cfg(not(feature = "grafeo-file"))]
1184 let is_single_file = false;
1185
1186 #[cfg(feature = "grafeo-file")]
1187 if let Some(ref fm) = self.file_manager {
1188 #[cfg(feature = "wal")]
1190 if let Some(ref wal) = self.wal {
1191 wal.sync()?;
1192 }
1193 self.checkpoint_to_file(fm)?;
1194
1195 #[cfg(feature = "wal")]
1198 if let Some(ref wal) = self.wal {
1199 wal.close_active_log();
1200 }
1201
1202 fm.remove_sidecar_wal()?;
1203 fm.close()?;
1204 }
1205
1206 #[cfg(feature = "wal")]
1208 if !is_single_file && let Some(ref wal) = self.wal {
1209 let epoch = self.store.current_epoch();
1210
1211 let checkpoint_tx = self
1213 .transaction_manager
1214 .last_assigned_transaction_id()
1215 .unwrap_or_else(|| {
1216 self.transaction_manager.begin()
1218 });
1219
1220 wal.log(&WalRecord::TransactionCommit {
1222 transaction_id: checkpoint_tx,
1223 })?;
1224
1225 wal.checkpoint(checkpoint_tx, epoch)?;
1227 wal.sync()?;
1228 }
1229
1230 *is_open = false;
1231 Ok(())
1232 }
1233
1234 #[cfg(feature = "wal")]
1236 #[must_use]
1237 pub fn wal(&self) -> Option<&Arc<LpgWal>> {
1238 self.wal.as_ref()
1239 }
1240
1241 #[cfg(feature = "wal")]
1243 pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1244 if let Some(ref wal) = self.wal {
1245 wal.log(record)?;
1246 }
1247 Ok(())
1248 }
1249
1250 #[cfg(feature = "grafeo-file")]
1256 fn checkpoint_to_file(&self, fm: &GrafeoFileManager) -> Result<()> {
1257 use grafeo_core::testing::crash::maybe_crash;
1258
1259 maybe_crash("checkpoint_to_file:before_export");
1260 let snapshot_data = self.export_snapshot()?;
1261 maybe_crash("checkpoint_to_file:after_export");
1262
1263 let epoch = self.store.current_epoch();
1264 let transaction_id = self
1265 .transaction_manager
1266 .last_assigned_transaction_id()
1267 .map_or(0, |t| t.0);
1268 let node_count = self.store.node_count() as u64;
1269 let edge_count = self.store.edge_count() as u64;
1270
1271 fm.write_snapshot(
1272 &snapshot_data,
1273 epoch.0,
1274 transaction_id,
1275 node_count,
1276 edge_count,
1277 )?;
1278
1279 maybe_crash("checkpoint_to_file:after_write_snapshot");
1280 Ok(())
1281 }
1282
1283 #[cfg(feature = "grafeo-file")]
1285 #[must_use]
1286 pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
1287 self.file_manager.as_ref()
1288 }
1289}
1290
1291impl Drop for GrafeoDB {
1292 fn drop(&mut self) {
1293 if let Err(e) = self.close() {
1294 tracing::error!("Error closing database: {}", e);
1295 }
1296 }
1297}
1298
1299impl crate::admin::AdminService for GrafeoDB {
1300 fn info(&self) -> crate::admin::DatabaseInfo {
1301 self.info()
1302 }
1303
1304 fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1305 self.detailed_stats()
1306 }
1307
1308 fn schema(&self) -> crate::admin::SchemaInfo {
1309 self.schema()
1310 }
1311
1312 fn validate(&self) -> crate::admin::ValidationResult {
1313 self.validate()
1314 }
1315
1316 fn wal_status(&self) -> crate::admin::WalStatus {
1317 self.wal_status()
1318 }
1319
1320 fn wal_checkpoint(&self) -> Result<()> {
1321 self.wal_checkpoint()
1322 }
1323}
1324
1325#[derive(Debug)]
1355pub struct QueryResult {
1356 pub columns: Vec<String>,
1358 pub column_types: Vec<grafeo_common::types::LogicalType>,
1360 pub rows: Vec<Vec<grafeo_common::types::Value>>,
1362 pub execution_time_ms: Option<f64>,
1364 pub rows_scanned: Option<u64>,
1366 pub status_message: Option<String>,
1368 pub gql_status: grafeo_common::utils::GqlStatus,
1370}
1371
1372impl QueryResult {
1373 #[must_use]
1375 pub fn empty() -> Self {
1376 Self {
1377 columns: Vec::new(),
1378 column_types: Vec::new(),
1379 rows: Vec::new(),
1380 execution_time_ms: None,
1381 rows_scanned: None,
1382 status_message: None,
1383 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1384 }
1385 }
1386
1387 #[must_use]
1389 pub fn status(msg: impl Into<String>) -> Self {
1390 Self {
1391 columns: Vec::new(),
1392 column_types: Vec::new(),
1393 rows: Vec::new(),
1394 execution_time_ms: None,
1395 rows_scanned: None,
1396 status_message: Some(msg.into()),
1397 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1398 }
1399 }
1400
1401 #[must_use]
1403 pub fn new(columns: Vec<String>) -> Self {
1404 let len = columns.len();
1405 Self {
1406 columns,
1407 column_types: vec![grafeo_common::types::LogicalType::Any; len],
1408 rows: Vec::new(),
1409 execution_time_ms: None,
1410 rows_scanned: None,
1411 status_message: None,
1412 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1413 }
1414 }
1415
1416 #[must_use]
1418 pub fn with_types(
1419 columns: Vec<String>,
1420 column_types: Vec<grafeo_common::types::LogicalType>,
1421 ) -> Self {
1422 Self {
1423 columns,
1424 column_types,
1425 rows: Vec::new(),
1426 execution_time_ms: None,
1427 rows_scanned: None,
1428 status_message: None,
1429 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1430 }
1431 }
1432
1433 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1435 self.execution_time_ms = Some(execution_time_ms);
1436 self.rows_scanned = Some(rows_scanned);
1437 self
1438 }
1439
1440 #[must_use]
1442 pub fn execution_time_ms(&self) -> Option<f64> {
1443 self.execution_time_ms
1444 }
1445
1446 #[must_use]
1448 pub fn rows_scanned(&self) -> Option<u64> {
1449 self.rows_scanned
1450 }
1451
1452 #[must_use]
1454 pub fn row_count(&self) -> usize {
1455 self.rows.len()
1456 }
1457
1458 #[must_use]
1460 pub fn column_count(&self) -> usize {
1461 self.columns.len()
1462 }
1463
1464 #[must_use]
1466 pub fn is_empty(&self) -> bool {
1467 self.rows.is_empty()
1468 }
1469
1470 pub fn scalar<T: FromValue>(&self) -> Result<T> {
1479 if self.rows.len() != 1 || self.columns.len() != 1 {
1480 return Err(grafeo_common::utils::error::Error::InvalidValue(
1481 "Expected single value".to_string(),
1482 ));
1483 }
1484 T::from_value(&self.rows[0][0])
1485 }
1486
1487 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1489 self.rows.iter()
1490 }
1491}
1492
1493impl std::fmt::Display for QueryResult {
1494 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1495 let table = grafeo_common::fmt::format_result_table(
1496 &self.columns,
1497 &self.rows,
1498 self.execution_time_ms,
1499 self.status_message.as_deref(),
1500 );
1501 f.write_str(&table)
1502 }
1503}
1504
1505pub trait FromValue: Sized {
1510 fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
1512}
1513
1514impl FromValue for i64 {
1515 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1516 value
1517 .as_int64()
1518 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1519 expected: "INT64".to_string(),
1520 found: value.type_name().to_string(),
1521 })
1522 }
1523}
1524
1525impl FromValue for f64 {
1526 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1527 value
1528 .as_float64()
1529 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1530 expected: "FLOAT64".to_string(),
1531 found: value.type_name().to_string(),
1532 })
1533 }
1534}
1535
1536impl FromValue for String {
1537 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1538 value.as_str().map(String::from).ok_or_else(|| {
1539 grafeo_common::utils::error::Error::TypeMismatch {
1540 expected: "STRING".to_string(),
1541 found: value.type_name().to_string(),
1542 }
1543 })
1544 }
1545}
1546
1547impl FromValue for bool {
1548 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1549 value
1550 .as_bool()
1551 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1552 expected: "BOOL".to_string(),
1553 found: value.type_name().to_string(),
1554 })
1555 }
1556}
1557
1558#[cfg(test)]
1559mod tests {
1560 use super::*;
1561
1562 #[test]
1563 fn test_create_in_memory_database() {
1564 let db = GrafeoDB::new_in_memory();
1565 assert_eq!(db.node_count(), 0);
1566 assert_eq!(db.edge_count(), 0);
1567 }
1568
1569 #[test]
1570 fn test_database_config() {
1571 let config = Config::in_memory().with_threads(4).with_query_logging();
1572
1573 let db = GrafeoDB::with_config(config).unwrap();
1574 assert_eq!(db.config().threads, 4);
1575 assert!(db.config().query_logging);
1576 }
1577
1578 #[test]
1579 fn test_database_session() {
1580 let db = GrafeoDB::new_in_memory();
1581 let _session = db.session();
1582 }
1584
1585 #[cfg(feature = "wal")]
1586 #[test]
1587 fn test_persistent_database_recovery() {
1588 use grafeo_common::types::Value;
1589 use tempfile::tempdir;
1590
1591 let dir = tempdir().unwrap();
1592 let db_path = dir.path().join("test_db");
1593
1594 {
1596 let db = GrafeoDB::open(&db_path).unwrap();
1597
1598 let alix = db.create_node(&["Person"]);
1599 db.set_node_property(alix, "name", Value::from("Alix"));
1600
1601 let gus = db.create_node(&["Person"]);
1602 db.set_node_property(gus, "name", Value::from("Gus"));
1603
1604 let _edge = db.create_edge(alix, gus, "KNOWS");
1605
1606 db.close().unwrap();
1608 }
1609
1610 {
1612 let db = GrafeoDB::open(&db_path).unwrap();
1613
1614 assert_eq!(db.node_count(), 2);
1615 assert_eq!(db.edge_count(), 1);
1616
1617 let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
1619 assert!(node0.is_some());
1620
1621 let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
1622 assert!(node1.is_some());
1623 }
1624 }
1625
1626 #[cfg(feature = "wal")]
1627 #[test]
1628 fn test_wal_logging() {
1629 use tempfile::tempdir;
1630
1631 let dir = tempdir().unwrap();
1632 let db_path = dir.path().join("wal_test_db");
1633
1634 let db = GrafeoDB::open(&db_path).unwrap();
1635
1636 let node = db.create_node(&["Test"]);
1638 db.delete_node(node);
1639
1640 if let Some(wal) = db.wal() {
1642 assert!(wal.record_count() > 0);
1643 }
1644
1645 db.close().unwrap();
1646 }
1647
1648 #[cfg(feature = "wal")]
1649 #[test]
1650 fn test_wal_recovery_multiple_sessions() {
1651 use grafeo_common::types::Value;
1653 use tempfile::tempdir;
1654
1655 let dir = tempdir().unwrap();
1656 let db_path = dir.path().join("multi_session_db");
1657
1658 {
1660 let db = GrafeoDB::open(&db_path).unwrap();
1661 let alix = db.create_node(&["Person"]);
1662 db.set_node_property(alix, "name", Value::from("Alix"));
1663 db.close().unwrap();
1664 }
1665
1666 {
1668 let db = GrafeoDB::open(&db_path).unwrap();
1669 assert_eq!(db.node_count(), 1); let gus = db.create_node(&["Person"]);
1671 db.set_node_property(gus, "name", Value::from("Gus"));
1672 db.close().unwrap();
1673 }
1674
1675 {
1677 let db = GrafeoDB::open(&db_path).unwrap();
1678 assert_eq!(db.node_count(), 2);
1679
1680 let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
1682 assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
1683
1684 let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
1685 assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
1686 }
1687 }
1688
1689 #[cfg(feature = "wal")]
1690 #[test]
1691 fn test_database_consistency_after_mutations() {
1692 use grafeo_common::types::Value;
1694 use tempfile::tempdir;
1695
1696 let dir = tempdir().unwrap();
1697 let db_path = dir.path().join("consistency_db");
1698
1699 {
1700 let db = GrafeoDB::open(&db_path).unwrap();
1701
1702 let a = db.create_node(&["Node"]);
1704 let b = db.create_node(&["Node"]);
1705 let c = db.create_node(&["Node"]);
1706
1707 let e1 = db.create_edge(a, b, "LINKS");
1709 let _e2 = db.create_edge(b, c, "LINKS");
1710
1711 db.delete_edge(e1);
1713 db.delete_node(b);
1714
1715 db.set_node_property(a, "value", Value::Int64(1));
1717 db.set_node_property(c, "value", Value::Int64(3));
1718
1719 db.close().unwrap();
1720 }
1721
1722 {
1724 let db = GrafeoDB::open(&db_path).unwrap();
1725
1726 let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
1730 assert!(node_a.is_some());
1731
1732 let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
1733 assert!(node_c.is_some());
1734
1735 let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
1737 assert!(node_b.is_none());
1738 }
1739 }
1740
1741 #[cfg(feature = "wal")]
1742 #[test]
1743 fn test_close_is_idempotent() {
1744 use tempfile::tempdir;
1746
1747 let dir = tempdir().unwrap();
1748 let db_path = dir.path().join("close_test_db");
1749
1750 let db = GrafeoDB::open(&db_path).unwrap();
1751 db.create_node(&["Test"]);
1752
1753 assert!(db.close().is_ok());
1755
1756 assert!(db.close().is_ok());
1758 }
1759
1760 #[test]
1761 fn test_with_store_external_backend() {
1762 use grafeo_core::graph::lpg::LpgStore;
1763
1764 let external = Arc::new(LpgStore::new().unwrap());
1765
1766 let n1 = external.create_node(&["Person"]);
1768 external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
1769
1770 let db = GrafeoDB::with_store(
1771 Arc::clone(&external) as Arc<dyn GraphStoreMut>,
1772 Config::in_memory(),
1773 )
1774 .unwrap();
1775
1776 let session = db.session();
1777
1778 #[cfg(feature = "gql")]
1780 {
1781 let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
1782 assert_eq!(result.rows.len(), 1);
1783 }
1784 }
1785
1786 #[test]
1787 fn test_with_config_custom_memory_limit() {
1788 let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); let db = GrafeoDB::with_config(config).unwrap();
1791 assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
1792 assert_eq!(db.node_count(), 0);
1793 }
1794
1795 #[cfg(feature = "metrics")]
1796 #[test]
1797 fn test_database_metrics_registry() {
1798 let db = GrafeoDB::new_in_memory();
1799
1800 db.create_node(&["Person"]);
1802 db.create_node(&["Person"]);
1803
1804 let snap = db.metrics();
1806 assert_eq!(snap.query_count, 0); }
1809
1810 #[test]
1811 fn test_query_result_has_metrics() {
1812 let db = GrafeoDB::new_in_memory();
1814 db.create_node(&["Person"]);
1815 db.create_node(&["Person"]);
1816
1817 #[cfg(feature = "gql")]
1818 {
1819 let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
1820
1821 assert!(result.execution_time_ms.is_some());
1823 assert!(result.rows_scanned.is_some());
1824 assert!(result.execution_time_ms.unwrap() >= 0.0);
1825 assert_eq!(result.rows_scanned.unwrap(), 2);
1826 }
1827 }
1828
1829 #[test]
1830 fn test_empty_query_result_metrics() {
1831 let db = GrafeoDB::new_in_memory();
1833 db.create_node(&["Person"]);
1834
1835 #[cfg(feature = "gql")]
1836 {
1837 let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
1839
1840 assert!(result.execution_time_ms.is_some());
1841 assert!(result.rows_scanned.is_some());
1842 assert_eq!(result.rows_scanned.unwrap(), 0);
1843 }
1844 }
1845
1846 #[cfg(feature = "cdc")]
1847 mod cdc_integration {
1848 use super::*;
1849
1850 #[test]
1851 fn test_node_lifecycle_history() {
1852 let db = GrafeoDB::new_in_memory();
1853
1854 let id = db.create_node(&["Person"]);
1856 db.set_node_property(id, "name", "Alix".into());
1858 db.set_node_property(id, "name", "Gus".into());
1859 db.delete_node(id);
1861
1862 let history = db.history(id).unwrap();
1863 assert_eq!(history.len(), 4); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1865 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1866 assert!(history[1].before.is_none()); assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
1868 assert!(history[2].before.is_some()); assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
1870 }
1871
1872 #[test]
1873 fn test_edge_lifecycle_history() {
1874 let db = GrafeoDB::new_in_memory();
1875
1876 let alix = db.create_node(&["Person"]);
1877 let gus = db.create_node(&["Person"]);
1878 let edge = db.create_edge(alix, gus, "KNOWS");
1879 db.set_edge_property(edge, "since", 2024i64.into());
1880 db.delete_edge(edge);
1881
1882 let history = db.history(edge).unwrap();
1883 assert_eq!(history.len(), 3); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1885 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1886 assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
1887 }
1888
1889 #[test]
1890 fn test_create_node_with_props_cdc() {
1891 let db = GrafeoDB::new_in_memory();
1892
1893 let id = db.create_node_with_props(
1894 &["Person"],
1895 vec![
1896 ("name", grafeo_common::types::Value::from("Alix")),
1897 ("age", grafeo_common::types::Value::from(30i64)),
1898 ],
1899 );
1900
1901 let history = db.history(id).unwrap();
1902 assert_eq!(history.len(), 1);
1903 assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1904 let after = history[0].after.as_ref().unwrap();
1906 assert_eq!(after.len(), 2);
1907 }
1908
1909 #[test]
1910 fn test_changes_between() {
1911 let db = GrafeoDB::new_in_memory();
1912
1913 let id1 = db.create_node(&["A"]);
1914 let _id2 = db.create_node(&["B"]);
1915 db.set_node_property(id1, "x", 1i64.into());
1916
1917 let changes = db
1919 .changes_between(
1920 grafeo_common::types::EpochId(0),
1921 grafeo_common::types::EpochId(u64::MAX),
1922 )
1923 .unwrap();
1924 assert_eq!(changes.len(), 3); }
1926 }
1927
1928 #[test]
1929 fn test_with_store_basic() {
1930 use grafeo_core::graph::lpg::LpgStore;
1931
1932 let store = Arc::new(LpgStore::new().unwrap());
1933 let n1 = store.create_node(&["Person"]);
1934 store.set_node_property(n1, "name", "Alix".into());
1935
1936 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1937 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1938
1939 let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
1940 assert_eq!(result.rows.len(), 1);
1941 }
1942
1943 #[test]
1944 fn test_with_store_session() {
1945 use grafeo_core::graph::lpg::LpgStore;
1946
1947 let store = Arc::new(LpgStore::new().unwrap());
1948 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1949 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1950
1951 let session = db.session();
1952 let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
1953 assert_eq!(result.rows.len(), 1);
1954 }
1955
1956 #[test]
1957 fn test_with_store_mutations() {
1958 use grafeo_core::graph::lpg::LpgStore;
1959
1960 let store = Arc::new(LpgStore::new().unwrap());
1961 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1962 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1963
1964 let mut session = db.session();
1965
1966 session.begin_transaction().unwrap();
1970 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1971
1972 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
1973 assert_eq!(result.rows.len(), 1);
1974
1975 session.commit().unwrap();
1976 }
1977}