1mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21#[cfg(feature = "rdf")]
22mod rdf_ops;
23mod search;
24#[cfg(feature = "wal")]
25pub(crate) mod wal_store;
26
27use grafeo_common::grafeo_error;
28#[cfg(feature = "wal")]
29use std::path::Path;
30use std::sync::Arc;
31use std::sync::atomic::AtomicUsize;
32
33use parking_lot::RwLock;
34
35#[cfg(feature = "grafeo-file")]
36use grafeo_adapters::storage::file::GrafeoFileManager;
37#[cfg(feature = "wal")]
38use grafeo_adapters::storage::wal::{
39 DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
40};
41use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
42use grafeo_common::utils::error::Result;
43use grafeo_core::graph::GraphStoreMut;
44use grafeo_core::graph::lpg::LpgStore;
45#[cfg(feature = "rdf")]
46use grafeo_core::graph::rdf::RdfStore;
47
48use crate::catalog::Catalog;
49use crate::config::Config;
50use crate::query::cache::QueryCache;
51use crate::session::Session;
52use crate::transaction::TransactionManager;
53
/// Top-level database handle.
///
/// Bundles the graph store, catalog, transaction manager, buffer manager and
/// query cache together with the optional, feature-gated persistence,
/// CDC, embedding and metrics components.
pub struct GrafeoDB {
    /// Configuration this instance was constructed with.
    pub(super) config: Config,
    /// Built-in LPG store. When the database is created through `with_store`
    /// this is only a freshly created placeholder and `external_store` is used
    /// for sessions instead.
    pub(super) store: Arc<LpgStore>,
    /// Schema catalog shared with every session.
    pub(super) catalog: Arc<Catalog>,
    /// RDF store (only with the `rdf` feature).
    #[cfg(feature = "rdf")]
    pub(super) rdf_store: Arc<RdfStore>,
    /// Transaction manager shared with every session.
    pub(super) transaction_manager: Arc<TransactionManager>,
    /// Buffer manager created from the configured memory budget.
    pub(super) buffer_manager: Arc<BufferManager>,
    /// Write-ahead log; `None` when the database is read-only, has no path,
    /// or has the WAL disabled in its configuration.
    #[cfg(feature = "wal")]
    pub(super) wal: Option<Arc<LpgWal>>,
    /// Shared slot handed to each session together with the WAL.
    /// NOTE(review): presumably tracks the named graph the WAL is currently
    /// writing for — confirm in `Session::set_wal`.
    #[cfg(feature = "wal")]
    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
    /// Query cache shared with every session.
    pub(super) query_cache: Arc<QueryCache>,
    /// Counter shared with every session (passed alongside `gc_interval`).
    pub(super) commit_counter: Arc<AtomicUsize>,
    /// `true` from construction until `close` completes successfully.
    pub(super) is_open: RwLock<bool>,
    /// Change-data-capture log (only with the `cdc` feature).
    #[cfg(feature = "cdc")]
    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
    /// Registered embedding models keyed by name (only with the `embed` feature).
    #[cfg(feature = "embed")]
    pub(super) embedding_models:
        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
    /// Manager of the single-file storage format, when that format is in use.
    #[cfg(feature = "grafeo-file")]
    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
    /// Caller-supplied store set by `with_store`; `None` otherwise.
    pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
    /// Metrics registry (only with the `metrics` feature).
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Graph name applied to every newly created session, if any.
    current_graph: RwLock<Option<String>>,
    /// Schema name applied to every newly created session, if any.
    current_schema: RwLock<Option<String>>,
    /// Whether the database was opened with `AccessMode::ReadOnly`.
    read_only: bool,
}
132
133impl GrafeoDB {
    /// Creates a database from `Config::in_memory()`.
    ///
    /// # Panics
    ///
    /// Panics if `with_config` returns an error for the in-memory configuration.
    #[must_use]
    pub fn new_in_memory() -> Self {
        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
    }
158
    /// Opens (or creates) a database at `path` using `Config::persistent`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `with_config`.
    #[cfg(feature = "wal")]
    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
        Self::with_config(Config::persistent(path.as_ref()))
    }
181
    /// Opens the database at `path` using `Config::read_only`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `with_config`; in read-only mode that
    /// includes the case where `path` is not an existing file.
    #[cfg(feature = "grafeo-file")]
    pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
        Self::with_config(Config::read_only(path.as_ref()))
    }
210
    /// Builds a database from `config`.
    ///
    /// Steps, in order:
    /// 1. validate the configuration;
    /// 2. create the in-memory stores, transaction manager, buffer manager and catalog;
    /// 3. with the `grafeo-file` feature: open or create the single-file store,
    ///    load its snapshot and replay a sidecar WAL if one exists;
    /// 4. with the `wal` feature: replay a directory-format WAL (if any) and
    ///    open the WAL for writing (skipped entirely in read-only mode);
    /// 5. assemble the `GrafeoDB` value.
    ///
    /// # Errors
    ///
    /// Returns `Error::Internal` when validation fails, when read-only mode is
    /// requested without a path or with a path that is not an existing file,
    /// or when a single-file path exists but is not a file. Errors from store
    /// creation, snapshot loading, WAL recovery, directory creation and WAL
    /// opening are propagated.
    pub fn with_config(config: Config) -> Result<Self> {
        // Reject invalid configurations before touching any storage.
        config
            .validate()
            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;

        let store = Arc::new(LpgStore::new()?);
        #[cfg(feature = "rdf")]
        let rdf_store = Arc::new(RdfStore::new());
        let transaction_manager = Arc::new(TransactionManager::new());

        let buffer_config = BufferManagerConfig {
            // Without an explicit limit, budget 75% of the detected system memory.
            budget: config.memory_limit.unwrap_or_else(|| {
                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
            }),
            // Explicit spill path wins; otherwise fall back to `<path>/spill`.
            spill_path: config
                .spill_path
                .clone()
                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
            ..BufferManagerConfig::default()
        };
        let buffer_manager = BufferManager::new(buffer_config);

        // Created before recovery so snapshot/WAL replay can populate it.
        let catalog = Arc::new(Catalog::new());

        let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;

        #[cfg(feature = "grafeo-file")]
        // Single-file storage: decide whether a file manager is needed and load its data.
        let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
            // Read-only: the file must already exist; only the snapshot is loaded
            // (no sidecar WAL replay in this branch).
            if let Some(ref db_path) = config.path {
                if db_path.exists() && db_path.is_file() {
                    let fm = GrafeoFileManager::open_read_only(db_path)?;
                    let snapshot_data = fm.read_snapshot()?;
                    if !snapshot_data.is_empty() {
                        Self::apply_snapshot_data(
                            &store,
                            &catalog,
                            #[cfg(feature = "rdf")]
                            &rdf_store,
                            &snapshot_data,
                        )?;
                    }
                    Some(Arc::new(fm))
                } else {
                    return Err(grafeo_common::utils::error::Error::Internal(format!(
                        "read-only open requires an existing .grafeo file: {}",
                        db_path.display()
                    )));
                }
            } else {
                return Err(grafeo_common::utils::error::Error::Internal(
                    "read-only mode requires a database path".to_string(),
                ));
            }
        } else if config.wal_enabled {
            if let Some(ref db_path) = config.path {
                if Self::should_use_single_file(db_path, config.storage_format) {
                    // Open an existing file, create a missing one, reject anything else
                    // (e.g. a directory at that path).
                    let fm = if db_path.exists() && db_path.is_file() {
                        GrafeoFileManager::open(db_path)?
                    } else if !db_path.exists() {
                        GrafeoFileManager::create(db_path)?
                    } else {
                        return Err(grafeo_common::utils::error::Error::Internal(format!(
                            "path exists but is not a file: {}",
                            db_path.display()
                        )));
                    };

                    // Load the snapshot first ...
                    let snapshot_data = fm.read_snapshot()?;
                    if !snapshot_data.is_empty() {
                        Self::apply_snapshot_data(
                            &store,
                            &catalog,
                            #[cfg(feature = "rdf")]
                            &rdf_store,
                            &snapshot_data,
                        )?;
                    }

                    // ... then replay the sidecar WAL on top of it, if one exists.
                    if fm.has_sidecar_wal() {
                        let recovery = WalRecovery::new(fm.sidecar_wal_path());
                        let records = recovery.recover()?;
                        Self::apply_wal_records(
                            &store,
                            &catalog,
                            #[cfg(feature = "rdf")]
                            &rdf_store,
                            &records,
                        )?;
                    }

                    Some(Arc::new(fm))
                } else {
                    None
                }
            } else {
                None
            }
        } else {
            None
        };

        #[cfg(feature = "wal")]
        // Write-ahead log: never opened in read-only mode or without a path.
        let wal = if is_read_only {
            None
        } else if config.wal_enabled {
            if let Some(ref db_path) = config.path {
                #[cfg(feature = "grafeo-file")]
                // Single-file mode uses the sidecar WAL directory; otherwise `<path>/wal`.
                let wal_path = if let Some(ref fm) = file_manager {
                    let p = fm.sidecar_wal_path();
                    std::fs::create_dir_all(&p)?;
                    p
                } else {
                    std::fs::create_dir_all(db_path)?;
                    db_path.join("wal")
                };

                #[cfg(not(feature = "grafeo-file"))]
                let wal_path = {
                    std::fs::create_dir_all(db_path)?;
                    db_path.join("wal")
                };

                #[cfg(feature = "grafeo-file")]
                let is_single_file = file_manager.is_some();
                #[cfg(not(feature = "grafeo-file"))]
                let is_single_file = false;

                // Directory-format recovery only; in single-file mode the sidecar WAL
                // was already replayed when the file manager was set up above.
                if !is_single_file && wal_path.exists() {
                    let recovery = WalRecovery::new(&wal_path);
                    let records = recovery.recover()?;
                    Self::apply_wal_records(
                        &store,
                        &catalog,
                        #[cfg(feature = "rdf")]
                        &rdf_store,
                        &records,
                    )?;
                }

                // One-to-one mapping from the config durability mode to the WAL's own enum.
                let wal_durability = match config.wal_durability {
                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
                    crate::config::DurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    } => WalDurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    },
                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
                        WalDurabilityMode::Adaptive { target_interval_ms }
                    }
                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
                };
                let wal_config = WalConfig {
                    durability: wal_durability,
                    ..WalConfig::default()
                };
                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
                Some(Arc::new(wal_manager))
            } else {
                None
            }
        } else {
            None
        };

        let query_cache = Arc::new(QueryCache::default());

        #[cfg(feature = "temporal")]
        // Align the transaction manager with the epoch the store reached during recovery.
        transaction_manager.sync_epoch(store.current_epoch());

        Ok(Self {
            config,
            store,
            catalog,
            #[cfg(feature = "rdf")]
            rdf_store,
            transaction_manager,
            buffer_manager,
            #[cfg(feature = "wal")]
            wal,
            #[cfg(feature = "wal")]
            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
            query_cache,
            commit_counter: Arc::new(AtomicUsize::new(0)),
            is_open: RwLock::new(true),
            #[cfg(feature = "cdc")]
            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
            #[cfg(feature = "embed")]
            embedding_models: RwLock::new(hashbrown::HashMap::new()),
            #[cfg(feature = "grafeo-file")]
            file_manager,
            external_store: None,
            #[cfg(feature = "metrics")]
            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
            current_graph: RwLock::new(None),
            current_schema: RwLock::new(None),
            read_only: is_read_only,
        })
    }
451
452 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
477 config
478 .validate()
479 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
480
481 let dummy_store = Arc::new(LpgStore::new()?);
482 let transaction_manager = Arc::new(TransactionManager::new());
483
484 let buffer_config = BufferManagerConfig {
485 budget: config.memory_limit.unwrap_or_else(|| {
486 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
487 }),
488 spill_path: None,
489 ..BufferManagerConfig::default()
490 };
491 let buffer_manager = BufferManager::new(buffer_config);
492
493 let query_cache = Arc::new(QueryCache::default());
494
495 Ok(Self {
496 config,
497 store: dummy_store,
498 catalog: Arc::new(Catalog::new()),
499 #[cfg(feature = "rdf")]
500 rdf_store: Arc::new(RdfStore::new()),
501 transaction_manager,
502 buffer_manager,
503 #[cfg(feature = "wal")]
504 wal: None,
505 #[cfg(feature = "wal")]
506 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
507 query_cache,
508 commit_counter: Arc::new(AtomicUsize::new(0)),
509 is_open: RwLock::new(true),
510 #[cfg(feature = "cdc")]
511 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
512 #[cfg(feature = "embed")]
513 embedding_models: RwLock::new(hashbrown::HashMap::new()),
514 #[cfg(feature = "grafeo-file")]
515 file_manager: None,
516 external_store: Some(store),
517 #[cfg(feature = "metrics")]
518 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
519 current_graph: RwLock::new(None),
520 current_schema: RwLock::new(None),
521 read_only: false,
522 })
523 }
524
525 #[cfg(feature = "wal")]
531 fn apply_wal_records(
532 store: &Arc<LpgStore>,
533 catalog: &Catalog,
534 #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
535 records: &[WalRecord],
536 ) -> Result<()> {
537 use crate::catalog::{
538 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
539 };
540 use grafeo_common::utils::error::Error;
541
542 let mut current_graph: Option<String> = None;
545 let mut target_store: Arc<LpgStore> = Arc::clone(store);
546
547 for record in records {
548 match record {
549 WalRecord::CreateNamedGraph { name } => {
551 let _ = store.create_graph(name);
552 }
553 WalRecord::DropNamedGraph { name } => {
554 store.drop_graph(name);
555 if current_graph.as_deref() == Some(name.as_str()) {
557 current_graph = None;
558 target_store = Arc::clone(store);
559 }
560 }
561 WalRecord::SwitchGraph { name } => {
562 current_graph.clone_from(name);
563 target_store = match ¤t_graph {
564 None => Arc::clone(store),
565 Some(graph_name) => store
566 .graph_or_create(graph_name)
567 .map_err(|e| Error::Internal(e.to_string()))?,
568 };
569 }
570
571 WalRecord::CreateNode { id, labels } => {
573 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
574 target_store.create_node_with_id(*id, &label_refs)?;
575 }
576 WalRecord::DeleteNode { id } => {
577 target_store.delete_node(*id);
578 }
579 WalRecord::CreateEdge {
580 id,
581 src,
582 dst,
583 edge_type,
584 } => {
585 target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
586 }
587 WalRecord::DeleteEdge { id } => {
588 target_store.delete_edge(*id);
589 }
590 WalRecord::SetNodeProperty { id, key, value } => {
591 target_store.set_node_property(*id, key, value.clone());
592 }
593 WalRecord::SetEdgeProperty { id, key, value } => {
594 target_store.set_edge_property(*id, key, value.clone());
595 }
596 WalRecord::AddNodeLabel { id, label } => {
597 target_store.add_label(*id, label);
598 }
599 WalRecord::RemoveNodeLabel { id, label } => {
600 target_store.remove_label(*id, label);
601 }
602 WalRecord::RemoveNodeProperty { id, key } => {
603 target_store.remove_node_property(*id, key);
604 }
605 WalRecord::RemoveEdgeProperty { id, key } => {
606 target_store.remove_edge_property(*id, key);
607 }
608
609 WalRecord::CreateNodeType {
611 name,
612 properties,
613 constraints,
614 } => {
615 let def = NodeTypeDefinition {
616 name: name.clone(),
617 properties: properties
618 .iter()
619 .map(|(n, t, nullable)| TypedProperty {
620 name: n.clone(),
621 data_type: PropertyDataType::from_type_name(t),
622 nullable: *nullable,
623 default_value: None,
624 })
625 .collect(),
626 constraints: constraints
627 .iter()
628 .map(|(kind, props)| match kind.as_str() {
629 "unique" => TypeConstraint::Unique(props.clone()),
630 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
631 "not_null" if !props.is_empty() => {
632 TypeConstraint::NotNull(props[0].clone())
633 }
634 _ => TypeConstraint::Unique(props.clone()),
635 })
636 .collect(),
637 parent_types: Vec::new(),
638 };
639 let _ = catalog.register_node_type(def);
640 }
641 WalRecord::DropNodeType { name } => {
642 let _ = catalog.drop_node_type(name);
643 }
644 WalRecord::CreateEdgeType {
645 name,
646 properties,
647 constraints,
648 } => {
649 let def = EdgeTypeDefinition {
650 name: name.clone(),
651 properties: properties
652 .iter()
653 .map(|(n, t, nullable)| TypedProperty {
654 name: n.clone(),
655 data_type: PropertyDataType::from_type_name(t),
656 nullable: *nullable,
657 default_value: None,
658 })
659 .collect(),
660 constraints: constraints
661 .iter()
662 .map(|(kind, props)| match kind.as_str() {
663 "unique" => TypeConstraint::Unique(props.clone()),
664 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
665 "not_null" if !props.is_empty() => {
666 TypeConstraint::NotNull(props[0].clone())
667 }
668 _ => TypeConstraint::Unique(props.clone()),
669 })
670 .collect(),
671 source_node_types: Vec::new(),
672 target_node_types: Vec::new(),
673 };
674 let _ = catalog.register_edge_type_def(def);
675 }
676 WalRecord::DropEdgeType { name } => {
677 let _ = catalog.drop_edge_type_def(name);
678 }
679 WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
680 }
683 WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
684 }
687 WalRecord::CreateGraphType {
688 name,
689 node_types,
690 edge_types,
691 open,
692 } => {
693 use crate::catalog::GraphTypeDefinition;
694 let def = GraphTypeDefinition {
695 name: name.clone(),
696 allowed_node_types: node_types.clone(),
697 allowed_edge_types: edge_types.clone(),
698 open: *open,
699 };
700 let _ = catalog.register_graph_type(def);
701 }
702 WalRecord::DropGraphType { name } => {
703 let _ = catalog.drop_graph_type(name);
704 }
705 WalRecord::CreateSchema { name } => {
706 let _ = catalog.register_schema_namespace(name.clone());
707 }
708 WalRecord::DropSchema { name } => {
709 let _ = catalog.drop_schema_namespace(name);
710 }
711
712 WalRecord::AlterNodeType { name, alterations } => {
713 for (action, prop_name, type_name, nullable) in alterations {
714 match action.as_str() {
715 "add" => {
716 let prop = TypedProperty {
717 name: prop_name.clone(),
718 data_type: PropertyDataType::from_type_name(type_name),
719 nullable: *nullable,
720 default_value: None,
721 };
722 let _ = catalog.alter_node_type_add_property(name, prop);
723 }
724 "drop" => {
725 let _ = catalog.alter_node_type_drop_property(name, prop_name);
726 }
727 _ => {}
728 }
729 }
730 }
731 WalRecord::AlterEdgeType { name, alterations } => {
732 for (action, prop_name, type_name, nullable) in alterations {
733 match action.as_str() {
734 "add" => {
735 let prop = TypedProperty {
736 name: prop_name.clone(),
737 data_type: PropertyDataType::from_type_name(type_name),
738 nullable: *nullable,
739 default_value: None,
740 };
741 let _ = catalog.alter_edge_type_add_property(name, prop);
742 }
743 "drop" => {
744 let _ = catalog.alter_edge_type_drop_property(name, prop_name);
745 }
746 _ => {}
747 }
748 }
749 }
750 WalRecord::AlterGraphType { name, alterations } => {
751 for (action, type_name) in alterations {
752 match action.as_str() {
753 "add_node" => {
754 let _ =
755 catalog.alter_graph_type_add_node_type(name, type_name.clone());
756 }
757 "drop_node" => {
758 let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
759 }
760 "add_edge" => {
761 let _ =
762 catalog.alter_graph_type_add_edge_type(name, type_name.clone());
763 }
764 "drop_edge" => {
765 let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
766 }
767 _ => {}
768 }
769 }
770 }
771
772 WalRecord::CreateProcedure {
773 name,
774 params,
775 returns,
776 body,
777 } => {
778 use crate::catalog::ProcedureDefinition;
779 let def = ProcedureDefinition {
780 name: name.clone(),
781 params: params.clone(),
782 returns: returns.clone(),
783 body: body.clone(),
784 };
785 let _ = catalog.register_procedure(def);
786 }
787 WalRecord::DropProcedure { name } => {
788 let _ = catalog.drop_procedure(name);
789 }
790
791 #[cfg(feature = "rdf")]
793 WalRecord::InsertRdfTriple { .. }
794 | WalRecord::DeleteRdfTriple { .. }
795 | WalRecord::ClearRdfGraph { .. }
796 | WalRecord::CreateRdfGraph { .. }
797 | WalRecord::DropRdfGraph { .. } => {
798 rdf_ops::replay_rdf_wal_record(rdf_store, record);
799 }
800 #[cfg(not(feature = "rdf"))]
801 WalRecord::InsertRdfTriple { .. }
802 | WalRecord::DeleteRdfTriple { .. }
803 | WalRecord::ClearRdfGraph { .. }
804 | WalRecord::CreateRdfGraph { .. }
805 | WalRecord::DropRdfGraph { .. } => {}
806
807 WalRecord::TransactionCommit { .. } => {
808 #[cfg(feature = "temporal")]
812 {
813 target_store.new_epoch();
814 }
815 }
816 WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
817 }
820 }
821 }
822 Ok(())
823 }
824
825 #[cfg(feature = "grafeo-file")]
831 fn should_use_single_file(
832 path: &std::path::Path,
833 configured: crate::config::StorageFormat,
834 ) -> bool {
835 use crate::config::StorageFormat;
836 match configured {
837 StorageFormat::SingleFile => true,
838 StorageFormat::WalDirectory => false,
839 StorageFormat::Auto => {
840 if path.is_file() {
842 if let Ok(mut f) = std::fs::File::open(path) {
843 use std::io::Read;
844 let mut magic = [0u8; 4];
845 if f.read_exact(&mut magic).is_ok()
846 && magic == grafeo_adapters::storage::file::MAGIC
847 {
848 return true;
849 }
850 }
851 return false;
852 }
853 if path.is_dir() {
855 return false;
856 }
857 path.extension().is_some_and(|ext| ext == "grafeo")
859 }
860 }
861 }
862
    /// Loads the serialized snapshot `data` into `store`, `catalog` and (with
    /// the `rdf` feature) `rdf_store` by delegating to
    /// `persistence::load_snapshot_into_store`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the delegate.
    #[cfg(feature = "grafeo-file")]
    fn apply_snapshot_data(
        store: &Arc<LpgStore>,
        catalog: &Arc<crate::catalog::Catalog>,
        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
        data: &[u8],
    ) -> Result<()> {
        persistence::load_snapshot_into_store(
            store,
            catalog,
            #[cfg(feature = "rdf")]
            rdf_store,
            data,
        )
    }
879
    /// Creates a new session bound to this database.
    ///
    /// When an external store is configured the session is created directly on
    /// it and returned immediately; none of the WAL, CDC, metrics, current-graph
    /// or current-schema setup below is applied on that path. Otherwise the
    /// session is created on the built-in store(s) and then wired up with the
    /// WAL, CDC log and metrics (each feature-gated), followed by the
    /// database-level current graph and schema, if set.
    ///
    /// # Panics
    ///
    /// Panics if `Session::with_external_store` returns an error.
    #[must_use]
    pub fn session(&self) -> Session {
        // Closure so the config is only built on the path that actually needs it.
        let session_cfg = || crate::session::SessionConfig {
            transaction_manager: Arc::clone(&self.transaction_manager),
            query_cache: Arc::clone(&self.query_cache),
            catalog: Arc::clone(&self.catalog),
            adaptive_config: self.config.adaptive.clone(),
            factorized_execution: self.config.factorized_execution,
            graph_model: self.config.graph_model,
            query_timeout: self.config.query_timeout,
            commit_counter: Arc::clone(&self.commit_counter),
            gc_interval: self.config.gc_interval,
            read_only: self.read_only,
        };

        // External store: early return, bypassing all setup further down.
        if let Some(ref ext_store) = self.external_store {
            return Session::with_external_store(Arc::clone(ext_store), session_cfg())
                .expect("arena allocation for external store session");
        }

        #[cfg(feature = "rdf")]
        let mut session = Session::with_rdf_store_and_adaptive(
            Arc::clone(&self.store),
            Arc::clone(&self.rdf_store),
            session_cfg(),
        );
        #[cfg(not(feature = "rdf"))]
        let mut session = Session::with_adaptive(Arc::clone(&self.store), session_cfg());

        #[cfg(feature = "wal")]
        if let Some(ref wal) = self.wal {
            session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
        }

        #[cfg(feature = "cdc")]
        session.set_cdc_log(Arc::clone(&self.cdc_log));

        #[cfg(feature = "metrics")]
        {
            if let Some(ref m) = self.metrics {
                session.set_metrics(Arc::clone(m));
                // Count the session as both created and currently active.
                m.session_created
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                m.session_active
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            }
        }

        // Propagate the database-level graph selection to the new session.
        if let Some(ref graph) = *self.current_graph.read() {
            session.use_graph(graph);
        }

        // Propagate the database-level schema selection to the new session.
        if let Some(ref schema) = *self.current_schema.read() {
            session.set_schema(schema);
        }

        // NOTE(review): presumably here to keep the `mut` binding "used" (no
        // `unused_mut` warning) when none of the feature-gated setters above is
        // compiled in — confirm.
        let _ = &mut session;

        session
    }
970
    /// Returns a copy of the database-level current graph name, if one is set.
    #[must_use]
    pub fn current_graph(&self) -> Option<String> {
        self.current_graph.read().clone()
    }
980
981 pub fn set_current_graph(&self, name: Option<&str>) {
986 *self.current_graph.write() = name.map(ToString::to_string);
987 }
988
    /// Returns a copy of the database-level current schema name, if one is set.
    #[must_use]
    pub fn current_schema(&self) -> Option<String> {
        self.current_schema.read().clone()
    }
997
998 pub fn set_current_schema(&self, name: Option<&str>) {
1003 *self.current_schema.write() = name.map(ToString::to_string);
1004 }
1005
    /// Returns the adaptive-execution section of the configuration.
    #[must_use]
    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
        &self.config.adaptive
    }
1011
    /// Returns `true` if the database was opened in read-only access mode.
    #[must_use]
    pub fn is_read_only(&self) -> bool {
        self.read_only
    }
1017
    /// Returns the configuration this database was created with.
    #[must_use]
    pub fn config(&self) -> &Config {
        &self.config
    }
1023
    /// Returns the graph model selected in the configuration.
    #[must_use]
    pub fn graph_model(&self) -> crate::config::GraphModel {
        self.config.graph_model
    }
1029
    /// Returns the configured memory limit, or `None` if none was set.
    #[must_use]
    pub fn memory_limit(&self) -> Option<usize> {
        self.config.memory_limit
    }
1035
1036 #[cfg(feature = "metrics")]
1041 #[must_use]
1042 pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1043 let mut snapshot = self
1044 .metrics
1045 .as_ref()
1046 .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1047
1048 let cache_stats = self.query_cache.stats();
1050 snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1051 snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1052 snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1053 snapshot.cache_invalidations = cache_stats.invalidations;
1054
1055 snapshot
1056 }
1057
1058 #[cfg(feature = "metrics")]
1062 #[must_use]
1063 pub fn metrics_prometheus(&self) -> String {
1064 self.metrics
1065 .as_ref()
1066 .map_or_else(String::new, |m| m.to_prometheus())
1067 }
1068
1069 #[cfg(feature = "metrics")]
1071 pub fn reset_metrics(&self) {
1072 if let Some(ref m) = self.metrics {
1073 m.reset();
1074 }
1075 self.query_cache.reset_stats();
1076 }
1077
    /// Returns the built-in LPG store.
    ///
    /// Note that for databases created with `with_store` this is only the
    /// placeholder store; use `graph_store` to get the store sessions use.
    #[must_use]
    pub fn store(&self) -> &Arc<LpgStore> {
        &self.store
    }
1089
1090 #[allow(dead_code)] fn active_store(&self) -> Arc<LpgStore> {
1097 let graph_name = self.current_graph.read().clone();
1098 match graph_name {
1099 None => Arc::clone(&self.store),
1100 Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
1101 Some(ref name) => self
1102 .store
1103 .graph(name)
1104 .unwrap_or_else(|| Arc::clone(&self.store)),
1105 }
1106 }
1107
    /// Creates a named graph in the built-in store.
    ///
    /// Returns the boolean reported by the store's `create_graph`.
    /// NOTE(review): presumably `false` means the graph already existed — confirm.
    ///
    /// # Errors
    ///
    /// Propagates (after conversion) any error from the store.
    pub fn create_graph(&self, name: &str) -> Result<bool> {
        Ok(self.store.create_graph(name)?)
    }
1118
    /// Drops a named graph from the built-in store and returns the store's result.
    /// NOTE(review): presumably `true` means a graph was actually removed — confirm.
    pub fn drop_graph(&self, name: &str) -> bool {
        self.store.drop_graph(name)
    }
1123
    /// Returns the names of the graphs known to the built-in store.
    #[must_use]
    pub fn list_graphs(&self) -> Vec<String> {
        self.store.graph_names()
    }
1129
1130 #[must_use]
1138 pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
1139 if let Some(ref ext_store) = self.external_store {
1140 Arc::clone(ext_store)
1141 } else {
1142 Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
1143 }
1144 }
1145
    /// Runs garbage collection.
    ///
    /// Passes the transaction manager's minimum active epoch to the built-in
    /// store's `gc_versions`, then calls the transaction manager's own `gc`.
    pub fn gc(&self) {
        let min_epoch = self.transaction_manager.min_active_epoch();
        self.store.gc_versions(min_epoch);
        self.transaction_manager.gc();
    }
1156
    /// Returns the buffer manager.
    #[must_use]
    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
        &self.buffer_manager
    }
1162
    /// Returns the query cache shared by all sessions.
    #[must_use]
    pub fn query_cache(&self) -> &Arc<QueryCache> {
        &self.query_cache
    }
1168
    /// Clears the query cache.
    pub fn clear_plan_cache(&self) {
        self.query_cache.clear();
    }
1177
    /// Closes the database, persisting state as required by the storage mode.
    ///
    /// Calling `close` on an already closed database is a no-op. The write
    /// lock on `is_open` is held for the whole call.
    ///
    /// - Read-only: only the file manager (if any) is closed.
    /// - Single-file: the WAL is synced, a snapshot is written to the file,
    ///   the sidecar WAL is removed and the file manager is closed.
    /// - WAL directory: a commit record is logged, then the WAL is
    ///   checkpointed and synced.
    ///
    /// # Errors
    ///
    /// Propagates errors from the WAL and the file manager. On error the
    /// database stays marked as open.
    pub fn close(&self) -> Result<()> {
        let mut is_open = self.is_open.write();
        if !*is_open {
            return Ok(());
        }

        if self.read_only {
            // Nothing to flush in read-only mode; just release the file.
            #[cfg(feature = "grafeo-file")]
            if let Some(ref fm) = self.file_manager {
                fm.close()?;
            }
            *is_open = false;
            return Ok(());
        }

        #[cfg(feature = "grafeo-file")]
        let is_single_file = self.file_manager.is_some();
        #[cfg(not(feature = "grafeo-file"))]
        let is_single_file = false;

        #[cfg(feature = "grafeo-file")]
        if let Some(ref fm) = self.file_manager {
            // Make the WAL durable before the snapshot is written.
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.sync()?;
            }
            self.checkpoint_to_file(fm)?;

            // NOTE(review): presumably releases the WAL's open log file so the
            // sidecar directory can be removed below — confirm.
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.close_active_log();
            }

            fm.remove_sidecar_wal()?;
            fm.close()?;
        }

        #[cfg(feature = "wal")]
        // Directory-format WAL: write a final commit + checkpoint.
        if !is_single_file && let Some(ref wal) = self.wal {
            let epoch = self.store.current_epoch();

            // Reuse the last assigned transaction id; if none was ever assigned,
            // begin a transaction to obtain one.
            let checkpoint_tx = self
                .transaction_manager
                .last_assigned_transaction_id()
                .unwrap_or_else(|| {
                    self.transaction_manager.begin()
                });

            wal.log(&WalRecord::TransactionCommit {
                transaction_id: checkpoint_tx,
            })?;

            wal.checkpoint(checkpoint_tx, epoch)?;
            wal.sync()?;
        }

        *is_open = false;
        Ok(())
    }
1262
    /// Returns the write-ahead log, or `None` when no WAL is open.
    #[cfg(feature = "wal")]
    #[must_use]
    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
        self.wal.as_ref()
    }
1269
1270 #[cfg(feature = "wal")]
1272 pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1273 if let Some(ref wal) = self.wal {
1274 wal.log(record)?;
1275 }
1276 Ok(())
1277 }
1278
    /// Exports a snapshot of the database and writes it through `fm`.
    ///
    /// Alongside the snapshot bytes, the current epoch, the last assigned
    /// transaction id (0 if none) and the node/edge counts of the built-in
    /// store are passed to `write_snapshot`.
    ///
    /// The `maybe_crash` calls mark crash-injection points for testing; their
    /// position relative to the export and write steps is significant.
    ///
    /// # Errors
    ///
    /// Propagates errors from `export_snapshot` and `write_snapshot`.
    #[cfg(feature = "grafeo-file")]
    fn checkpoint_to_file(&self, fm: &GrafeoFileManager) -> Result<()> {
        use grafeo_core::testing::crash::maybe_crash;

        maybe_crash("checkpoint_to_file:before_export");
        let snapshot_data = self.export_snapshot()?;
        maybe_crash("checkpoint_to_file:after_export");

        let epoch = self.store.current_epoch();
        let transaction_id = self
            .transaction_manager
            .last_assigned_transaction_id()
            .map_or(0, |t| t.0);
        let node_count = self.store.node_count() as u64;
        let edge_count = self.store.edge_count() as u64;

        fm.write_snapshot(
            &snapshot_data,
            epoch.0,
            transaction_id,
            node_count,
            edge_count,
        )?;

        maybe_crash("checkpoint_to_file:after_write_snapshot");
        Ok(())
    }
1311
    /// Returns the single-file storage manager, or `None` when the database
    /// does not use the single-file format.
    #[cfg(feature = "grafeo-file")]
    #[must_use]
    pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
        self.file_manager.as_ref()
    }
1318}
1319
/// Closes the database when the handle is dropped.
impl Drop for GrafeoDB {
    fn drop(&mut self) {
        // Errors cannot be returned from `drop`, so they are logged instead.
        // `close` is a no-op if the database was already closed explicitly.
        if let Err(e) = self.close() {
            grafeo_error!("Error closing database: {}", e);
        }
    }
}
1327
/// Exposes the administrative operations of `GrafeoDB` through the
/// `AdminService` trait.
///
/// Every method forwards to the same-named method on `self`.
/// NOTE(review): these calls are expected to resolve to inherent `GrafeoDB`
/// methods defined outside this view (e.g. in the `admin` submodule) — confirm,
/// since they would otherwise recurse into this trait impl.
impl crate::admin::AdminService for GrafeoDB {
    /// Forwards to `GrafeoDB::info`.
    fn info(&self) -> crate::admin::DatabaseInfo {
        self.info()
    }

    /// Forwards to `GrafeoDB::detailed_stats`.
    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
        self.detailed_stats()
    }

    /// Forwards to `GrafeoDB::schema`.
    fn schema(&self) -> crate::admin::SchemaInfo {
        self.schema()
    }

    /// Forwards to `GrafeoDB::validate`.
    fn validate(&self) -> crate::admin::ValidationResult {
        self.validate()
    }

    /// Forwards to `GrafeoDB::wal_status`.
    fn wal_status(&self) -> crate::admin::WalStatus {
        self.wal_status()
    }

    /// Forwards to `GrafeoDB::wal_checkpoint`.
    fn wal_checkpoint(&self) -> Result<()> {
        self.wal_checkpoint()
    }
}
1353
/// The result of executing a query.
#[derive(Debug)]
pub struct QueryResult {
    /// Column names, in output order.
    pub columns: Vec<String>,
    /// Logical type of each column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// Result rows; each row is a vector of values.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Execution time in milliseconds, when it was recorded.
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned, when it was recorded.
    pub rows_scanned: Option<u64>,
    /// Optional status message attached to the result.
    pub status_message: Option<String>,
    /// GQL status of the result; the constructors in this file set `SUCCESS`.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
1400
1401impl QueryResult {
1402 #[must_use]
1404 pub fn empty() -> Self {
1405 Self {
1406 columns: Vec::new(),
1407 column_types: Vec::new(),
1408 rows: Vec::new(),
1409 execution_time_ms: None,
1410 rows_scanned: None,
1411 status_message: None,
1412 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1413 }
1414 }
1415
1416 #[must_use]
1418 pub fn status(msg: impl Into<String>) -> Self {
1419 Self {
1420 columns: Vec::new(),
1421 column_types: Vec::new(),
1422 rows: Vec::new(),
1423 execution_time_ms: None,
1424 rows_scanned: None,
1425 status_message: Some(msg.into()),
1426 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1427 }
1428 }
1429
1430 #[must_use]
1432 pub fn new(columns: Vec<String>) -> Self {
1433 let len = columns.len();
1434 Self {
1435 columns,
1436 column_types: vec![grafeo_common::types::LogicalType::Any; len],
1437 rows: Vec::new(),
1438 execution_time_ms: None,
1439 rows_scanned: None,
1440 status_message: None,
1441 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1442 }
1443 }
1444
1445 #[must_use]
1447 pub fn with_types(
1448 columns: Vec<String>,
1449 column_types: Vec<grafeo_common::types::LogicalType>,
1450 ) -> Self {
1451 Self {
1452 columns,
1453 column_types,
1454 rows: Vec::new(),
1455 execution_time_ms: None,
1456 rows_scanned: None,
1457 status_message: None,
1458 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1459 }
1460 }
1461
1462 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1464 self.execution_time_ms = Some(execution_time_ms);
1465 self.rows_scanned = Some(rows_scanned);
1466 self
1467 }
1468
1469 #[must_use]
1471 pub fn execution_time_ms(&self) -> Option<f64> {
1472 self.execution_time_ms
1473 }
1474
1475 #[must_use]
1477 pub fn rows_scanned(&self) -> Option<u64> {
1478 self.rows_scanned
1479 }
1480
1481 #[must_use]
1483 pub fn row_count(&self) -> usize {
1484 self.rows.len()
1485 }
1486
1487 #[must_use]
1489 pub fn column_count(&self) -> usize {
1490 self.columns.len()
1491 }
1492
1493 #[must_use]
1495 pub fn is_empty(&self) -> bool {
1496 self.rows.is_empty()
1497 }
1498
1499 pub fn scalar<T: FromValue>(&self) -> Result<T> {
1508 if self.rows.len() != 1 || self.columns.len() != 1 {
1509 return Err(grafeo_common::utils::error::Error::InvalidValue(
1510 "Expected single value".to_string(),
1511 ));
1512 }
1513 T::from_value(&self.rows[0][0])
1514 }
1515
1516 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1518 self.rows.iter()
1519 }
1520}
1521
1522impl std::fmt::Display for QueryResult {
1523 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1524 let table = grafeo_common::fmt::format_result_table(
1525 &self.columns,
1526 &self.rows,
1527 self.execution_time_ms,
1528 self.status_message.as_deref(),
1529 );
1530 f.write_str(&table)
1531 }
1532}
1533
/// Conversion from a query result `Value` into a Rust type.
///
/// Used by `QueryResult::scalar` to extract typed values.
pub trait FromValue: Sized {
    /// Converts `value` into `Self`.
    ///
    /// # Errors
    ///
    /// Implementations return an error when `value` does not hold the
    /// expected type.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
1542
1543impl FromValue for i64 {
1544 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1545 value
1546 .as_int64()
1547 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1548 expected: "INT64".to_string(),
1549 found: value.type_name().to_string(),
1550 })
1551 }
1552}
1553
1554impl FromValue for f64 {
1555 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1556 value
1557 .as_float64()
1558 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1559 expected: "FLOAT64".to_string(),
1560 found: value.type_name().to_string(),
1561 })
1562 }
1563}
1564
1565impl FromValue for String {
1566 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1567 value.as_str().map(String::from).ok_or_else(|| {
1568 grafeo_common::utils::error::Error::TypeMismatch {
1569 expected: "STRING".to_string(),
1570 found: value.type_name().to_string(),
1571 }
1572 })
1573 }
1574}
1575
1576impl FromValue for bool {
1577 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1578 value
1579 .as_bool()
1580 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1581 expected: "BOOL".to_string(),
1582 found: value.type_name().to_string(),
1583 })
1584 }
1585}
1586
1587#[cfg(test)]
1588mod tests {
1589 use super::*;
1590
1591 #[test]
1592 fn test_create_in_memory_database() {
1593 let db = GrafeoDB::new_in_memory();
1594 assert_eq!(db.node_count(), 0);
1595 assert_eq!(db.edge_count(), 0);
1596 }
1597
1598 #[test]
1599 fn test_database_config() {
1600 let config = Config::in_memory().with_threads(4).with_query_logging();
1601
1602 let db = GrafeoDB::with_config(config).unwrap();
1603 assert_eq!(db.config().threads, 4);
1604 assert!(db.config().query_logging);
1605 }
1606
1607 #[test]
1608 fn test_database_session() {
1609 let db = GrafeoDB::new_in_memory();
1610 let _session = db.session();
1611 }
1613
1614 #[cfg(feature = "wal")]
1615 #[test]
1616 fn test_persistent_database_recovery() {
1617 use grafeo_common::types::Value;
1618 use tempfile::tempdir;
1619
1620 let dir = tempdir().unwrap();
1621 let db_path = dir.path().join("test_db");
1622
1623 {
1625 let db = GrafeoDB::open(&db_path).unwrap();
1626
1627 let alix = db.create_node(&["Person"]);
1628 db.set_node_property(alix, "name", Value::from("Alix"));
1629
1630 let gus = db.create_node(&["Person"]);
1631 db.set_node_property(gus, "name", Value::from("Gus"));
1632
1633 let _edge = db.create_edge(alix, gus, "KNOWS");
1634
1635 db.close().unwrap();
1637 }
1638
1639 {
1641 let db = GrafeoDB::open(&db_path).unwrap();
1642
1643 assert_eq!(db.node_count(), 2);
1644 assert_eq!(db.edge_count(), 1);
1645
1646 let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
1648 assert!(node0.is_some());
1649
1650 let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
1651 assert!(node1.is_some());
1652 }
1653 }
1654
1655 #[cfg(feature = "wal")]
1656 #[test]
1657 fn test_wal_logging() {
1658 use tempfile::tempdir;
1659
1660 let dir = tempdir().unwrap();
1661 let db_path = dir.path().join("wal_test_db");
1662
1663 let db = GrafeoDB::open(&db_path).unwrap();
1664
1665 let node = db.create_node(&["Test"]);
1667 db.delete_node(node);
1668
1669 if let Some(wal) = db.wal() {
1671 assert!(wal.record_count() > 0);
1672 }
1673
1674 db.close().unwrap();
1675 }
1676
1677 #[cfg(feature = "wal")]
1678 #[test]
1679 fn test_wal_recovery_multiple_sessions() {
1680 use grafeo_common::types::Value;
1682 use tempfile::tempdir;
1683
1684 let dir = tempdir().unwrap();
1685 let db_path = dir.path().join("multi_session_db");
1686
1687 {
1689 let db = GrafeoDB::open(&db_path).unwrap();
1690 let alix = db.create_node(&["Person"]);
1691 db.set_node_property(alix, "name", Value::from("Alix"));
1692 db.close().unwrap();
1693 }
1694
1695 {
1697 let db = GrafeoDB::open(&db_path).unwrap();
1698 assert_eq!(db.node_count(), 1); let gus = db.create_node(&["Person"]);
1700 db.set_node_property(gus, "name", Value::from("Gus"));
1701 db.close().unwrap();
1702 }
1703
1704 {
1706 let db = GrafeoDB::open(&db_path).unwrap();
1707 assert_eq!(db.node_count(), 2);
1708
1709 let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
1711 assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
1712
1713 let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
1714 assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
1715 }
1716 }
1717
1718 #[cfg(feature = "wal")]
1719 #[test]
1720 fn test_database_consistency_after_mutations() {
1721 use grafeo_common::types::Value;
1723 use tempfile::tempdir;
1724
1725 let dir = tempdir().unwrap();
1726 let db_path = dir.path().join("consistency_db");
1727
1728 {
1729 let db = GrafeoDB::open(&db_path).unwrap();
1730
1731 let a = db.create_node(&["Node"]);
1733 let b = db.create_node(&["Node"]);
1734 let c = db.create_node(&["Node"]);
1735
1736 let e1 = db.create_edge(a, b, "LINKS");
1738 let _e2 = db.create_edge(b, c, "LINKS");
1739
1740 db.delete_edge(e1);
1742 db.delete_node(b);
1743
1744 db.set_node_property(a, "value", Value::Int64(1));
1746 db.set_node_property(c, "value", Value::Int64(3));
1747
1748 db.close().unwrap();
1749 }
1750
1751 {
1753 let db = GrafeoDB::open(&db_path).unwrap();
1754
1755 let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
1759 assert!(node_a.is_some());
1760
1761 let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
1762 assert!(node_c.is_some());
1763
1764 let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
1766 assert!(node_b.is_none());
1767 }
1768 }
1769
1770 #[cfg(feature = "wal")]
1771 #[test]
1772 fn test_close_is_idempotent() {
1773 use tempfile::tempdir;
1775
1776 let dir = tempdir().unwrap();
1777 let db_path = dir.path().join("close_test_db");
1778
1779 let db = GrafeoDB::open(&db_path).unwrap();
1780 db.create_node(&["Test"]);
1781
1782 assert!(db.close().is_ok());
1784
1785 assert!(db.close().is_ok());
1787 }
1788
1789 #[test]
1790 fn test_with_store_external_backend() {
1791 use grafeo_core::graph::lpg::LpgStore;
1792
1793 let external = Arc::new(LpgStore::new().unwrap());
1794
1795 let n1 = external.create_node(&["Person"]);
1797 external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
1798
1799 let db = GrafeoDB::with_store(
1800 Arc::clone(&external) as Arc<dyn GraphStoreMut>,
1801 Config::in_memory(),
1802 )
1803 .unwrap();
1804
1805 let session = db.session();
1806
1807 #[cfg(feature = "gql")]
1809 {
1810 let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
1811 assert_eq!(result.rows.len(), 1);
1812 }
1813 }
1814
1815 #[test]
1816 fn test_with_config_custom_memory_limit() {
1817 let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); let db = GrafeoDB::with_config(config).unwrap();
1820 assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
1821 assert_eq!(db.node_count(), 0);
1822 }
1823
1824 #[cfg(feature = "metrics")]
1825 #[test]
1826 fn test_database_metrics_registry() {
1827 let db = GrafeoDB::new_in_memory();
1828
1829 db.create_node(&["Person"]);
1831 db.create_node(&["Person"]);
1832
1833 let snap = db.metrics();
1835 assert_eq!(snap.query_count, 0); }
1838
1839 #[test]
1840 fn test_query_result_has_metrics() {
1841 let db = GrafeoDB::new_in_memory();
1843 db.create_node(&["Person"]);
1844 db.create_node(&["Person"]);
1845
1846 #[cfg(feature = "gql")]
1847 {
1848 let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
1849
1850 assert!(result.execution_time_ms.is_some());
1852 assert!(result.rows_scanned.is_some());
1853 assert!(result.execution_time_ms.unwrap() >= 0.0);
1854 assert_eq!(result.rows_scanned.unwrap(), 2);
1855 }
1856 }
1857
1858 #[test]
1859 fn test_empty_query_result_metrics() {
1860 let db = GrafeoDB::new_in_memory();
1862 db.create_node(&["Person"]);
1863
1864 #[cfg(feature = "gql")]
1865 {
1866 let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
1868
1869 assert!(result.execution_time_ms.is_some());
1870 assert!(result.rows_scanned.is_some());
1871 assert_eq!(result.rows_scanned.unwrap(), 0);
1872 }
1873 }
1874
1875 #[cfg(feature = "cdc")]
1876 mod cdc_integration {
1877 use super::*;
1878
1879 #[test]
1880 fn test_node_lifecycle_history() {
1881 let db = GrafeoDB::new_in_memory();
1882
1883 let id = db.create_node(&["Person"]);
1885 db.set_node_property(id, "name", "Alix".into());
1887 db.set_node_property(id, "name", "Gus".into());
1888 db.delete_node(id);
1890
1891 let history = db.history(id).unwrap();
1892 assert_eq!(history.len(), 4); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1894 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1895 assert!(history[1].before.is_none()); assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
1897 assert!(history[2].before.is_some()); assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
1899 }
1900
1901 #[test]
1902 fn test_edge_lifecycle_history() {
1903 let db = GrafeoDB::new_in_memory();
1904
1905 let alix = db.create_node(&["Person"]);
1906 let gus = db.create_node(&["Person"]);
1907 let edge = db.create_edge(alix, gus, "KNOWS");
1908 db.set_edge_property(edge, "since", 2024i64.into());
1909 db.delete_edge(edge);
1910
1911 let history = db.history(edge).unwrap();
1912 assert_eq!(history.len(), 3); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1914 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1915 assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
1916 }
1917
1918 #[test]
1919 fn test_create_node_with_props_cdc() {
1920 let db = GrafeoDB::new_in_memory();
1921
1922 let id = db.create_node_with_props(
1923 &["Person"],
1924 vec![
1925 ("name", grafeo_common::types::Value::from("Alix")),
1926 ("age", grafeo_common::types::Value::from(30i64)),
1927 ],
1928 );
1929
1930 let history = db.history(id).unwrap();
1931 assert_eq!(history.len(), 1);
1932 assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1933 let after = history[0].after.as_ref().unwrap();
1935 assert_eq!(after.len(), 2);
1936 }
1937
1938 #[test]
1939 fn test_changes_between() {
1940 let db = GrafeoDB::new_in_memory();
1941
1942 let id1 = db.create_node(&["A"]);
1943 let _id2 = db.create_node(&["B"]);
1944 db.set_node_property(id1, "x", 1i64.into());
1945
1946 let changes = db
1948 .changes_between(
1949 grafeo_common::types::EpochId(0),
1950 grafeo_common::types::EpochId(u64::MAX),
1951 )
1952 .unwrap();
1953 assert_eq!(changes.len(), 3); }
1955 }
1956
1957 #[test]
1958 fn test_with_store_basic() {
1959 use grafeo_core::graph::lpg::LpgStore;
1960
1961 let store = Arc::new(LpgStore::new().unwrap());
1962 let n1 = store.create_node(&["Person"]);
1963 store.set_node_property(n1, "name", "Alix".into());
1964
1965 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1966 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1967
1968 let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
1969 assert_eq!(result.rows.len(), 1);
1970 }
1971
1972 #[test]
1973 fn test_with_store_session() {
1974 use grafeo_core::graph::lpg::LpgStore;
1975
1976 let store = Arc::new(LpgStore::new().unwrap());
1977 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1978 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1979
1980 let session = db.session();
1981 let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
1982 assert_eq!(result.rows.len(), 1);
1983 }
1984
1985 #[test]
1986 fn test_with_store_mutations() {
1987 use grafeo_core::graph::lpg::LpgStore;
1988
1989 let store = Arc::new(LpgStore::new().unwrap());
1990 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1991 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1992
1993 let mut session = db.session();
1994
1995 session.begin_transaction().unwrap();
1999 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
2000
2001 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
2002 assert_eq!(result.rows.len(), 1);
2003
2004 session.commit().unwrap();
2005 }
2006}