1mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21#[cfg(feature = "rdf")]
22mod rdf_ops;
23mod search;
24#[cfg(feature = "wal")]
25pub(crate) mod wal_store;
26
27use grafeo_common::grafeo_error;
28#[cfg(feature = "wal")]
29use std::path::Path;
30use std::sync::Arc;
31use std::sync::atomic::AtomicUsize;
32
33use parking_lot::RwLock;
34
35#[cfg(feature = "grafeo-file")]
36use grafeo_adapters::storage::file::GrafeoFileManager;
37#[cfg(feature = "wal")]
38use grafeo_adapters::storage::wal::{
39 DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
40};
41use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
42use grafeo_common::utils::error::Result;
43use grafeo_core::graph::GraphStoreMut;
44use grafeo_core::graph::lpg::LpgStore;
45#[cfg(feature = "rdf")]
46use grafeo_core::graph::rdf::RdfStore;
47
48use crate::catalog::Catalog;
49use crate::config::Config;
50use crate::query::cache::QueryCache;
51use crate::session::Session;
52use crate::transaction::TransactionManager;
53
54pub struct GrafeoDB {
77 pub(super) config: Config,
79 pub(super) store: Arc<LpgStore>,
81 pub(super) catalog: Arc<Catalog>,
83 #[cfg(feature = "rdf")]
85 pub(super) rdf_store: Arc<RdfStore>,
86 pub(super) transaction_manager: Arc<TransactionManager>,
88 pub(super) buffer_manager: Arc<BufferManager>,
90 #[cfg(feature = "wal")]
92 pub(super) wal: Option<Arc<LpgWal>>,
93 #[cfg(feature = "wal")]
97 pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
98 pub(super) query_cache: Arc<QueryCache>,
100 pub(super) commit_counter: Arc<AtomicUsize>,
102 pub(super) is_open: RwLock<bool>,
104 #[cfg(feature = "cdc")]
106 pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
107 #[cfg(feature = "embed")]
109 pub(super) embedding_models:
110 RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
111 #[cfg(feature = "grafeo-file")]
113 pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
114 pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
117 #[cfg(feature = "metrics")]
119 pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
120 current_graph: RwLock<Option<String>>,
124 current_schema: RwLock<Option<String>>,
128 read_only: bool,
131}
132
133impl GrafeoDB {
134 #[must_use]
155 pub fn new_in_memory() -> Self {
156 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
157 }
158
159 #[cfg(feature = "wal")]
178 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
179 Self::with_config(Config::persistent(path.as_ref()))
180 }
181
182 #[cfg(feature = "grafeo-file")]
207 pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
208 Self::with_config(Config::read_only(path.as_ref()))
209 }
210
211 pub fn with_config(config: Config) -> Result<Self> {
235 config
237 .validate()
238 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
239
240 let store = Arc::new(LpgStore::new()?);
241 #[cfg(feature = "rdf")]
242 let rdf_store = Arc::new(RdfStore::new());
243 let transaction_manager = Arc::new(TransactionManager::new());
244
245 let buffer_config = BufferManagerConfig {
247 budget: config.memory_limit.unwrap_or_else(|| {
248 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
249 }),
250 spill_path: config
251 .spill_path
252 .clone()
253 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
254 ..BufferManagerConfig::default()
255 };
256 let buffer_manager = BufferManager::new(buffer_config);
257
258 let catalog = Arc::new(Catalog::new());
260
261 let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;
262
263 #[cfg(feature = "grafeo-file")]
265 let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
266 if let Some(ref db_path) = config.path {
268 if db_path.exists() && db_path.is_file() {
269 let fm = GrafeoFileManager::open_read_only(db_path)?;
270 let snapshot_data = fm.read_snapshot()?;
271 if !snapshot_data.is_empty() {
272 Self::apply_snapshot_data(
273 &store,
274 &catalog,
275 #[cfg(feature = "rdf")]
276 &rdf_store,
277 &snapshot_data,
278 )?;
279 }
280 Some(Arc::new(fm))
281 } else {
282 return Err(grafeo_common::utils::error::Error::Internal(format!(
283 "read-only open requires an existing .grafeo file: {}",
284 db_path.display()
285 )));
286 }
287 } else {
288 return Err(grafeo_common::utils::error::Error::Internal(
289 "read-only mode requires a database path".to_string(),
290 ));
291 }
292 } else if let Some(ref db_path) = config.path {
293 if Self::should_use_single_file(db_path, config.storage_format) {
298 let fm = if db_path.exists() && db_path.is_file() {
299 GrafeoFileManager::open(db_path)?
300 } else if !db_path.exists() {
301 GrafeoFileManager::create(db_path)?
302 } else {
303 return Err(grafeo_common::utils::error::Error::Internal(format!(
305 "path exists but is not a file: {}",
306 db_path.display()
307 )));
308 };
309
310 let snapshot_data = fm.read_snapshot()?;
312 if !snapshot_data.is_empty() {
313 Self::apply_snapshot_data(
314 &store,
315 &catalog,
316 #[cfg(feature = "rdf")]
317 &rdf_store,
318 &snapshot_data,
319 )?;
320 }
321
322 #[cfg(feature = "wal")]
324 if config.wal_enabled && fm.has_sidecar_wal() {
325 let recovery = WalRecovery::new(fm.sidecar_wal_path());
326 let records = recovery.recover()?;
327 Self::apply_wal_records(
328 &store,
329 &catalog,
330 #[cfg(feature = "rdf")]
331 &rdf_store,
332 &records,
333 )?;
334 }
335
336 Some(Arc::new(fm))
337 } else {
338 None
339 }
340 } else {
341 None
342 };
343
344 #[cfg(feature = "wal")]
347 let wal = if is_read_only {
348 None
349 } else if config.wal_enabled {
350 if let Some(ref db_path) = config.path {
351 #[cfg(feature = "grafeo-file")]
353 let wal_path = if let Some(ref fm) = file_manager {
354 let p = fm.sidecar_wal_path();
355 std::fs::create_dir_all(&p)?;
356 p
357 } else {
358 std::fs::create_dir_all(db_path)?;
360 db_path.join("wal")
361 };
362
363 #[cfg(not(feature = "grafeo-file"))]
364 let wal_path = {
365 std::fs::create_dir_all(db_path)?;
366 db_path.join("wal")
367 };
368
369 #[cfg(feature = "grafeo-file")]
371 let is_single_file = file_manager.is_some();
372 #[cfg(not(feature = "grafeo-file"))]
373 let is_single_file = false;
374
375 if !is_single_file && wal_path.exists() {
376 let recovery = WalRecovery::new(&wal_path);
377 let records = recovery.recover()?;
378 Self::apply_wal_records(
379 &store,
380 &catalog,
381 #[cfg(feature = "rdf")]
382 &rdf_store,
383 &records,
384 )?;
385 }
386
387 let wal_durability = match config.wal_durability {
389 crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
390 crate::config::DurabilityMode::Batch {
391 max_delay_ms,
392 max_records,
393 } => WalDurabilityMode::Batch {
394 max_delay_ms,
395 max_records,
396 },
397 crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
398 WalDurabilityMode::Adaptive { target_interval_ms }
399 }
400 crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
401 };
402 let wal_config = WalConfig {
403 durability: wal_durability,
404 ..WalConfig::default()
405 };
406 let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
407 Some(Arc::new(wal_manager))
408 } else {
409 None
410 }
411 } else {
412 None
413 };
414
415 let query_cache = Arc::new(QueryCache::default());
417
418 #[cfg(feature = "temporal")]
421 transaction_manager.sync_epoch(store.current_epoch());
422
423 Ok(Self {
424 config,
425 store,
426 catalog,
427 #[cfg(feature = "rdf")]
428 rdf_store,
429 transaction_manager,
430 buffer_manager,
431 #[cfg(feature = "wal")]
432 wal,
433 #[cfg(feature = "wal")]
434 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
435 query_cache,
436 commit_counter: Arc::new(AtomicUsize::new(0)),
437 is_open: RwLock::new(true),
438 #[cfg(feature = "cdc")]
439 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
440 #[cfg(feature = "embed")]
441 embedding_models: RwLock::new(hashbrown::HashMap::new()),
442 #[cfg(feature = "grafeo-file")]
443 file_manager,
444 external_store: None,
445 #[cfg(feature = "metrics")]
446 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
447 current_graph: RwLock::new(None),
448 current_schema: RwLock::new(None),
449 read_only: is_read_only,
450 })
451 }
452
453 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
478 config
479 .validate()
480 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
481
482 let dummy_store = Arc::new(LpgStore::new()?);
483 let transaction_manager = Arc::new(TransactionManager::new());
484
485 let buffer_config = BufferManagerConfig {
486 budget: config.memory_limit.unwrap_or_else(|| {
487 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
488 }),
489 spill_path: None,
490 ..BufferManagerConfig::default()
491 };
492 let buffer_manager = BufferManager::new(buffer_config);
493
494 let query_cache = Arc::new(QueryCache::default());
495
496 Ok(Self {
497 config,
498 store: dummy_store,
499 catalog: Arc::new(Catalog::new()),
500 #[cfg(feature = "rdf")]
501 rdf_store: Arc::new(RdfStore::new()),
502 transaction_manager,
503 buffer_manager,
504 #[cfg(feature = "wal")]
505 wal: None,
506 #[cfg(feature = "wal")]
507 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
508 query_cache,
509 commit_counter: Arc::new(AtomicUsize::new(0)),
510 is_open: RwLock::new(true),
511 #[cfg(feature = "cdc")]
512 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
513 #[cfg(feature = "embed")]
514 embedding_models: RwLock::new(hashbrown::HashMap::new()),
515 #[cfg(feature = "grafeo-file")]
516 file_manager: None,
517 external_store: Some(store),
518 #[cfg(feature = "metrics")]
519 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
520 current_graph: RwLock::new(None),
521 current_schema: RwLock::new(None),
522 read_only: false,
523 })
524 }
525
526 #[cfg(feature = "wal")]
532 fn apply_wal_records(
533 store: &Arc<LpgStore>,
534 catalog: &Catalog,
535 #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
536 records: &[WalRecord],
537 ) -> Result<()> {
538 use crate::catalog::{
539 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
540 };
541 use grafeo_common::utils::error::Error;
542
543 let mut current_graph: Option<String> = None;
546 let mut target_store: Arc<LpgStore> = Arc::clone(store);
547
548 for record in records {
549 match record {
550 WalRecord::CreateNamedGraph { name } => {
552 let _ = store.create_graph(name);
553 }
554 WalRecord::DropNamedGraph { name } => {
555 store.drop_graph(name);
556 if current_graph.as_deref() == Some(name.as_str()) {
558 current_graph = None;
559 target_store = Arc::clone(store);
560 }
561 }
562 WalRecord::SwitchGraph { name } => {
563 current_graph.clone_from(name);
564 target_store = match ¤t_graph {
565 None => Arc::clone(store),
566 Some(graph_name) => store
567 .graph_or_create(graph_name)
568 .map_err(|e| Error::Internal(e.to_string()))?,
569 };
570 }
571
572 WalRecord::CreateNode { id, labels } => {
574 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
575 target_store.create_node_with_id(*id, &label_refs)?;
576 }
577 WalRecord::DeleteNode { id } => {
578 target_store.delete_node(*id);
579 }
580 WalRecord::CreateEdge {
581 id,
582 src,
583 dst,
584 edge_type,
585 } => {
586 target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
587 }
588 WalRecord::DeleteEdge { id } => {
589 target_store.delete_edge(*id);
590 }
591 WalRecord::SetNodeProperty { id, key, value } => {
592 target_store.set_node_property(*id, key, value.clone());
593 }
594 WalRecord::SetEdgeProperty { id, key, value } => {
595 target_store.set_edge_property(*id, key, value.clone());
596 }
597 WalRecord::AddNodeLabel { id, label } => {
598 target_store.add_label(*id, label);
599 }
600 WalRecord::RemoveNodeLabel { id, label } => {
601 target_store.remove_label(*id, label);
602 }
603 WalRecord::RemoveNodeProperty { id, key } => {
604 target_store.remove_node_property(*id, key);
605 }
606 WalRecord::RemoveEdgeProperty { id, key } => {
607 target_store.remove_edge_property(*id, key);
608 }
609
610 WalRecord::CreateNodeType {
612 name,
613 properties,
614 constraints,
615 } => {
616 let def = NodeTypeDefinition {
617 name: name.clone(),
618 properties: properties
619 .iter()
620 .map(|(n, t, nullable)| TypedProperty {
621 name: n.clone(),
622 data_type: PropertyDataType::from_type_name(t),
623 nullable: *nullable,
624 default_value: None,
625 })
626 .collect(),
627 constraints: constraints
628 .iter()
629 .map(|(kind, props)| match kind.as_str() {
630 "unique" => TypeConstraint::Unique(props.clone()),
631 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
632 "not_null" if !props.is_empty() => {
633 TypeConstraint::NotNull(props[0].clone())
634 }
635 _ => TypeConstraint::Unique(props.clone()),
636 })
637 .collect(),
638 parent_types: Vec::new(),
639 };
640 let _ = catalog.register_node_type(def);
641 }
642 WalRecord::DropNodeType { name } => {
643 let _ = catalog.drop_node_type(name);
644 }
645 WalRecord::CreateEdgeType {
646 name,
647 properties,
648 constraints,
649 } => {
650 let def = EdgeTypeDefinition {
651 name: name.clone(),
652 properties: properties
653 .iter()
654 .map(|(n, t, nullable)| TypedProperty {
655 name: n.clone(),
656 data_type: PropertyDataType::from_type_name(t),
657 nullable: *nullable,
658 default_value: None,
659 })
660 .collect(),
661 constraints: constraints
662 .iter()
663 .map(|(kind, props)| match kind.as_str() {
664 "unique" => TypeConstraint::Unique(props.clone()),
665 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
666 "not_null" if !props.is_empty() => {
667 TypeConstraint::NotNull(props[0].clone())
668 }
669 _ => TypeConstraint::Unique(props.clone()),
670 })
671 .collect(),
672 source_node_types: Vec::new(),
673 target_node_types: Vec::new(),
674 };
675 let _ = catalog.register_edge_type_def(def);
676 }
677 WalRecord::DropEdgeType { name } => {
678 let _ = catalog.drop_edge_type_def(name);
679 }
680 WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
681 }
684 WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
685 }
688 WalRecord::CreateGraphType {
689 name,
690 node_types,
691 edge_types,
692 open,
693 } => {
694 use crate::catalog::GraphTypeDefinition;
695 let def = GraphTypeDefinition {
696 name: name.clone(),
697 allowed_node_types: node_types.clone(),
698 allowed_edge_types: edge_types.clone(),
699 open: *open,
700 };
701 let _ = catalog.register_graph_type(def);
702 }
703 WalRecord::DropGraphType { name } => {
704 let _ = catalog.drop_graph_type(name);
705 }
706 WalRecord::CreateSchema { name } => {
707 let _ = catalog.register_schema_namespace(name.clone());
708 }
709 WalRecord::DropSchema { name } => {
710 let _ = catalog.drop_schema_namespace(name);
711 }
712
713 WalRecord::AlterNodeType { name, alterations } => {
714 for (action, prop_name, type_name, nullable) in alterations {
715 match action.as_str() {
716 "add" => {
717 let prop = TypedProperty {
718 name: prop_name.clone(),
719 data_type: PropertyDataType::from_type_name(type_name),
720 nullable: *nullable,
721 default_value: None,
722 };
723 let _ = catalog.alter_node_type_add_property(name, prop);
724 }
725 "drop" => {
726 let _ = catalog.alter_node_type_drop_property(name, prop_name);
727 }
728 _ => {}
729 }
730 }
731 }
732 WalRecord::AlterEdgeType { name, alterations } => {
733 for (action, prop_name, type_name, nullable) in alterations {
734 match action.as_str() {
735 "add" => {
736 let prop = TypedProperty {
737 name: prop_name.clone(),
738 data_type: PropertyDataType::from_type_name(type_name),
739 nullable: *nullable,
740 default_value: None,
741 };
742 let _ = catalog.alter_edge_type_add_property(name, prop);
743 }
744 "drop" => {
745 let _ = catalog.alter_edge_type_drop_property(name, prop_name);
746 }
747 _ => {}
748 }
749 }
750 }
751 WalRecord::AlterGraphType { name, alterations } => {
752 for (action, type_name) in alterations {
753 match action.as_str() {
754 "add_node" => {
755 let _ =
756 catalog.alter_graph_type_add_node_type(name, type_name.clone());
757 }
758 "drop_node" => {
759 let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
760 }
761 "add_edge" => {
762 let _ =
763 catalog.alter_graph_type_add_edge_type(name, type_name.clone());
764 }
765 "drop_edge" => {
766 let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
767 }
768 _ => {}
769 }
770 }
771 }
772
773 WalRecord::CreateProcedure {
774 name,
775 params,
776 returns,
777 body,
778 } => {
779 use crate::catalog::ProcedureDefinition;
780 let def = ProcedureDefinition {
781 name: name.clone(),
782 params: params.clone(),
783 returns: returns.clone(),
784 body: body.clone(),
785 };
786 let _ = catalog.register_procedure(def);
787 }
788 WalRecord::DropProcedure { name } => {
789 let _ = catalog.drop_procedure(name);
790 }
791
792 #[cfg(feature = "rdf")]
794 WalRecord::InsertRdfTriple { .. }
795 | WalRecord::DeleteRdfTriple { .. }
796 | WalRecord::ClearRdfGraph { .. }
797 | WalRecord::CreateRdfGraph { .. }
798 | WalRecord::DropRdfGraph { .. } => {
799 rdf_ops::replay_rdf_wal_record(rdf_store, record);
800 }
801 #[cfg(not(feature = "rdf"))]
802 WalRecord::InsertRdfTriple { .. }
803 | WalRecord::DeleteRdfTriple { .. }
804 | WalRecord::ClearRdfGraph { .. }
805 | WalRecord::CreateRdfGraph { .. }
806 | WalRecord::DropRdfGraph { .. } => {}
807
808 WalRecord::TransactionCommit { .. } => {
809 #[cfg(feature = "temporal")]
813 {
814 target_store.new_epoch();
815 }
816 }
817 WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
818 }
821 }
822 }
823 Ok(())
824 }
825
826 #[cfg(feature = "grafeo-file")]
832 fn should_use_single_file(
833 path: &std::path::Path,
834 configured: crate::config::StorageFormat,
835 ) -> bool {
836 use crate::config::StorageFormat;
837 match configured {
838 StorageFormat::SingleFile => true,
839 StorageFormat::WalDirectory => false,
840 StorageFormat::Auto => {
841 if path.is_file() {
843 if let Ok(mut f) = std::fs::File::open(path) {
844 use std::io::Read;
845 let mut magic = [0u8; 4];
846 if f.read_exact(&mut magic).is_ok()
847 && magic == grafeo_adapters::storage::file::MAGIC
848 {
849 return true;
850 }
851 }
852 return false;
853 }
854 if path.is_dir() {
856 return false;
857 }
858 path.extension().is_some_and(|ext| ext == "grafeo")
860 }
861 }
862 }
863
864 #[cfg(feature = "grafeo-file")]
866 fn apply_snapshot_data(
867 store: &Arc<LpgStore>,
868 catalog: &Arc<crate::catalog::Catalog>,
869 #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
870 data: &[u8],
871 ) -> Result<()> {
872 persistence::load_snapshot_into_store(
873 store,
874 catalog,
875 #[cfg(feature = "rdf")]
876 rdf_store,
877 data,
878 )
879 }
880
881 #[must_use]
909 pub fn session(&self) -> Session {
910 let session_cfg = || crate::session::SessionConfig {
911 transaction_manager: Arc::clone(&self.transaction_manager),
912 query_cache: Arc::clone(&self.query_cache),
913 catalog: Arc::clone(&self.catalog),
914 adaptive_config: self.config.adaptive.clone(),
915 factorized_execution: self.config.factorized_execution,
916 graph_model: self.config.graph_model,
917 query_timeout: self.config.query_timeout,
918 commit_counter: Arc::clone(&self.commit_counter),
919 gc_interval: self.config.gc_interval,
920 read_only: self.read_only,
921 };
922
923 if let Some(ref ext_store) = self.external_store {
924 return Session::with_external_store(Arc::clone(ext_store), session_cfg())
925 .expect("arena allocation for external store session");
926 }
927
928 #[cfg(feature = "rdf")]
929 let mut session = Session::with_rdf_store_and_adaptive(
930 Arc::clone(&self.store),
931 Arc::clone(&self.rdf_store),
932 session_cfg(),
933 );
934 #[cfg(not(feature = "rdf"))]
935 let mut session = Session::with_adaptive(Arc::clone(&self.store), session_cfg());
936
937 #[cfg(feature = "wal")]
938 if let Some(ref wal) = self.wal {
939 session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
940 }
941
942 #[cfg(feature = "cdc")]
943 session.set_cdc_log(Arc::clone(&self.cdc_log));
944
945 #[cfg(feature = "metrics")]
946 {
947 if let Some(ref m) = self.metrics {
948 session.set_metrics(Arc::clone(m));
949 m.session_created
950 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
951 m.session_active
952 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
953 }
954 }
955
956 if let Some(ref graph) = *self.current_graph.read() {
958 session.use_graph(graph);
959 }
960
961 if let Some(ref schema) = *self.current_schema.read() {
963 session.set_schema(schema);
964 }
965
966 let _ = &mut session;
968
969 session
970 }
971
972 #[must_use]
978 pub fn current_graph(&self) -> Option<String> {
979 self.current_graph.read().clone()
980 }
981
982 pub fn set_current_graph(&self, name: Option<&str>) {
987 *self.current_graph.write() = name.map(ToString::to_string);
988 }
989
990 #[must_use]
995 pub fn current_schema(&self) -> Option<String> {
996 self.current_schema.read().clone()
997 }
998
999 pub fn set_current_schema(&self, name: Option<&str>) {
1004 *self.current_schema.write() = name.map(ToString::to_string);
1005 }
1006
1007 #[must_use]
1009 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
1010 &self.config.adaptive
1011 }
1012
1013 #[must_use]
1015 pub fn is_read_only(&self) -> bool {
1016 self.read_only
1017 }
1018
1019 #[must_use]
1021 pub fn config(&self) -> &Config {
1022 &self.config
1023 }
1024
1025 #[must_use]
1027 pub fn graph_model(&self) -> crate::config::GraphModel {
1028 self.config.graph_model
1029 }
1030
1031 #[must_use]
1033 pub fn memory_limit(&self) -> Option<usize> {
1034 self.config.memory_limit
1035 }
1036
1037 #[cfg(feature = "metrics")]
1042 #[must_use]
1043 pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1044 let mut snapshot = self
1045 .metrics
1046 .as_ref()
1047 .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1048
1049 let cache_stats = self.query_cache.stats();
1051 snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1052 snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1053 snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1054 snapshot.cache_invalidations = cache_stats.invalidations;
1055
1056 snapshot
1057 }
1058
1059 #[cfg(feature = "metrics")]
1063 #[must_use]
1064 pub fn metrics_prometheus(&self) -> String {
1065 self.metrics
1066 .as_ref()
1067 .map_or_else(String::new, |m| m.to_prometheus())
1068 }
1069
1070 #[cfg(feature = "metrics")]
1072 pub fn reset_metrics(&self) {
1073 if let Some(ref m) = self.metrics {
1074 m.reset();
1075 }
1076 self.query_cache.reset_stats();
1077 }
1078
1079 #[must_use]
1087 pub fn store(&self) -> &Arc<LpgStore> {
1088 &self.store
1089 }
1090
1091 #[allow(dead_code)] fn active_store(&self) -> Arc<LpgStore> {
1098 let graph_name = self.current_graph.read().clone();
1099 match graph_name {
1100 None => Arc::clone(&self.store),
1101 Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
1102 Some(ref name) => self
1103 .store
1104 .graph(name)
1105 .unwrap_or_else(|| Arc::clone(&self.store)),
1106 }
1107 }
1108
1109 pub fn create_graph(&self, name: &str) -> Result<bool> {
1117 Ok(self.store.create_graph(name)?)
1118 }
1119
1120 pub fn drop_graph(&self, name: &str) -> bool {
1122 self.store.drop_graph(name)
1123 }
1124
1125 #[must_use]
1127 pub fn list_graphs(&self) -> Vec<String> {
1128 self.store.graph_names()
1129 }
1130
1131 #[must_use]
1139 pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
1140 if let Some(ref ext_store) = self.external_store {
1141 Arc::clone(ext_store)
1142 } else {
1143 Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
1144 }
1145 }
1146
1147 pub fn gc(&self) {
1153 let min_epoch = self.transaction_manager.min_active_epoch();
1154 self.store.gc_versions(min_epoch);
1155 self.transaction_manager.gc();
1156 }
1157
1158 #[must_use]
1160 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
1161 &self.buffer_manager
1162 }
1163
1164 #[must_use]
1166 pub fn query_cache(&self) -> &Arc<QueryCache> {
1167 &self.query_cache
1168 }
1169
1170 pub fn clear_plan_cache(&self) {
1176 self.query_cache.clear();
1177 }
1178
1179 pub fn close(&self) -> Result<()> {
1193 let mut is_open = self.is_open.write();
1194 if !*is_open {
1195 return Ok(());
1196 }
1197
1198 if self.read_only {
1200 #[cfg(feature = "grafeo-file")]
1201 if let Some(ref fm) = self.file_manager {
1202 fm.close()?;
1203 }
1204 *is_open = false;
1205 return Ok(());
1206 }
1207
1208 #[cfg(feature = "grafeo-file")]
1212 let is_single_file = self.file_manager.is_some();
1213 #[cfg(not(feature = "grafeo-file"))]
1214 let is_single_file = false;
1215
1216 #[cfg(feature = "grafeo-file")]
1217 if let Some(ref fm) = self.file_manager {
1218 #[cfg(feature = "wal")]
1220 if let Some(ref wal) = self.wal {
1221 wal.sync()?;
1222 }
1223 self.checkpoint_to_file(fm)?;
1224
1225 #[cfg(feature = "wal")]
1228 if let Some(ref wal) = self.wal {
1229 wal.close_active_log();
1230 }
1231
1232 {
1233 use grafeo_core::testing::crash::maybe_crash;
1234 maybe_crash("close:before_remove_sidecar_wal");
1235 }
1236 fm.remove_sidecar_wal()?;
1237 fm.close()?;
1238 }
1239
1240 #[cfg(feature = "wal")]
1246 if !is_single_file && let Some(ref wal) = self.wal {
1247 let commit_tx = self
1249 .transaction_manager
1250 .last_assigned_transaction_id()
1251 .unwrap_or_else(|| self.transaction_manager.begin());
1252
1253 wal.log(&WalRecord::TransactionCommit {
1255 transaction_id: commit_tx,
1256 })?;
1257
1258 wal.sync()?;
1259 }
1260
1261 *is_open = false;
1262 Ok(())
1263 }
1264
1265 #[cfg(feature = "wal")]
1267 #[must_use]
1268 pub fn wal(&self) -> Option<&Arc<LpgWal>> {
1269 self.wal.as_ref()
1270 }
1271
1272 #[cfg(feature = "wal")]
1274 pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1275 if let Some(ref wal) = self.wal {
1276 wal.log(record)?;
1277 }
1278 Ok(())
1279 }
1280
1281 #[cfg(feature = "grafeo-file")]
1287 fn checkpoint_to_file(&self, fm: &GrafeoFileManager) -> Result<()> {
1288 use grafeo_core::testing::crash::maybe_crash;
1289
1290 maybe_crash("checkpoint_to_file:before_export");
1291 let snapshot_data = self.export_snapshot()?;
1292 maybe_crash("checkpoint_to_file:after_export");
1293
1294 let epoch = self.store.current_epoch();
1295 let transaction_id = self
1296 .transaction_manager
1297 .last_assigned_transaction_id()
1298 .map_or(0, |t| t.0);
1299 let node_count = self.store.node_count() as u64;
1300 let edge_count = self.store.edge_count() as u64;
1301
1302 fm.write_snapshot(
1303 &snapshot_data,
1304 epoch.0,
1305 transaction_id,
1306 node_count,
1307 edge_count,
1308 )?;
1309
1310 maybe_crash("checkpoint_to_file:after_write_snapshot");
1311 Ok(())
1312 }
1313
1314 #[cfg(feature = "grafeo-file")]
1316 #[must_use]
1317 pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
1318 self.file_manager.as_ref()
1319 }
1320}
1321
1322impl Drop for GrafeoDB {
1323 fn drop(&mut self) {
1324 if let Err(e) = self.close() {
1325 grafeo_error!("Error closing database: {}", e);
1326 }
1327 }
1328}
1329
1330impl crate::admin::AdminService for GrafeoDB {
1331 fn info(&self) -> crate::admin::DatabaseInfo {
1332 self.info()
1333 }
1334
1335 fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1336 self.detailed_stats()
1337 }
1338
1339 fn schema(&self) -> crate::admin::SchemaInfo {
1340 self.schema()
1341 }
1342
1343 fn validate(&self) -> crate::admin::ValidationResult {
1344 self.validate()
1345 }
1346
1347 fn wal_status(&self) -> crate::admin::WalStatus {
1348 self.wal_status()
1349 }
1350
1351 fn wal_checkpoint(&self) -> Result<()> {
1352 self.wal_checkpoint()
1353 }
1354}
1355
1356#[derive(Debug)]
1386pub struct QueryResult {
1387 pub columns: Vec<String>,
1389 pub column_types: Vec<grafeo_common::types::LogicalType>,
1391 pub rows: Vec<Vec<grafeo_common::types::Value>>,
1393 pub execution_time_ms: Option<f64>,
1395 pub rows_scanned: Option<u64>,
1397 pub status_message: Option<String>,
1399 pub gql_status: grafeo_common::utils::GqlStatus,
1401}
1402
1403impl QueryResult {
1404 #[must_use]
1406 pub fn empty() -> Self {
1407 Self {
1408 columns: Vec::new(),
1409 column_types: Vec::new(),
1410 rows: Vec::new(),
1411 execution_time_ms: None,
1412 rows_scanned: None,
1413 status_message: None,
1414 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1415 }
1416 }
1417
1418 #[must_use]
1420 pub fn status(msg: impl Into<String>) -> Self {
1421 Self {
1422 columns: Vec::new(),
1423 column_types: Vec::new(),
1424 rows: Vec::new(),
1425 execution_time_ms: None,
1426 rows_scanned: None,
1427 status_message: Some(msg.into()),
1428 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1429 }
1430 }
1431
1432 #[must_use]
1434 pub fn new(columns: Vec<String>) -> Self {
1435 let len = columns.len();
1436 Self {
1437 columns,
1438 column_types: vec![grafeo_common::types::LogicalType::Any; len],
1439 rows: Vec::new(),
1440 execution_time_ms: None,
1441 rows_scanned: None,
1442 status_message: None,
1443 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1444 }
1445 }
1446
1447 #[must_use]
1449 pub fn with_types(
1450 columns: Vec<String>,
1451 column_types: Vec<grafeo_common::types::LogicalType>,
1452 ) -> Self {
1453 Self {
1454 columns,
1455 column_types,
1456 rows: Vec::new(),
1457 execution_time_ms: None,
1458 rows_scanned: None,
1459 status_message: None,
1460 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1461 }
1462 }
1463
1464 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1466 self.execution_time_ms = Some(execution_time_ms);
1467 self.rows_scanned = Some(rows_scanned);
1468 self
1469 }
1470
1471 #[must_use]
1473 pub fn execution_time_ms(&self) -> Option<f64> {
1474 self.execution_time_ms
1475 }
1476
1477 #[must_use]
1479 pub fn rows_scanned(&self) -> Option<u64> {
1480 self.rows_scanned
1481 }
1482
1483 #[must_use]
1485 pub fn row_count(&self) -> usize {
1486 self.rows.len()
1487 }
1488
1489 #[must_use]
1491 pub fn column_count(&self) -> usize {
1492 self.columns.len()
1493 }
1494
1495 #[must_use]
1497 pub fn is_empty(&self) -> bool {
1498 self.rows.is_empty()
1499 }
1500
1501 pub fn scalar<T: FromValue>(&self) -> Result<T> {
1510 if self.rows.len() != 1 || self.columns.len() != 1 {
1511 return Err(grafeo_common::utils::error::Error::InvalidValue(
1512 "Expected single value".to_string(),
1513 ));
1514 }
1515 T::from_value(&self.rows[0][0])
1516 }
1517
1518 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1520 self.rows.iter()
1521 }
1522}
1523
1524impl std::fmt::Display for QueryResult {
1525 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1526 let table = grafeo_common::fmt::format_result_table(
1527 &self.columns,
1528 &self.rows,
1529 self.execution_time_ms,
1530 self.status_message.as_deref(),
1531 );
1532 f.write_str(&table)
1533 }
1534}
1535
1536pub trait FromValue: Sized {
1541 fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
1543}
1544
1545impl FromValue for i64 {
1546 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1547 value
1548 .as_int64()
1549 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1550 expected: "INT64".to_string(),
1551 found: value.type_name().to_string(),
1552 })
1553 }
1554}
1555
1556impl FromValue for f64 {
1557 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1558 value
1559 .as_float64()
1560 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1561 expected: "FLOAT64".to_string(),
1562 found: value.type_name().to_string(),
1563 })
1564 }
1565}
1566
1567impl FromValue for String {
1568 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1569 value.as_str().map(String::from).ok_or_else(|| {
1570 grafeo_common::utils::error::Error::TypeMismatch {
1571 expected: "STRING".to_string(),
1572 found: value.type_name().to_string(),
1573 }
1574 })
1575 }
1576}
1577
1578impl FromValue for bool {
1579 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1580 value
1581 .as_bool()
1582 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1583 expected: "BOOL".to_string(),
1584 found: value.type_name().to_string(),
1585 })
1586 }
1587}
1588
1589#[cfg(test)]
1590mod tests {
1591 use super::*;
1592
1593 #[test]
1594 fn test_create_in_memory_database() {
1595 let db = GrafeoDB::new_in_memory();
1596 assert_eq!(db.node_count(), 0);
1597 assert_eq!(db.edge_count(), 0);
1598 }
1599
1600 #[test]
1601 fn test_database_config() {
1602 let config = Config::in_memory().with_threads(4).with_query_logging();
1603
1604 let db = GrafeoDB::with_config(config).unwrap();
1605 assert_eq!(db.config().threads, 4);
1606 assert!(db.config().query_logging);
1607 }
1608
1609 #[test]
1610 fn test_database_session() {
1611 let db = GrafeoDB::new_in_memory();
1612 let _session = db.session();
1613 }
1615
1616 #[cfg(feature = "wal")]
1617 #[test]
1618 fn test_persistent_database_recovery() {
1619 use grafeo_common::types::Value;
1620 use tempfile::tempdir;
1621
1622 let dir = tempdir().unwrap();
1623 let db_path = dir.path().join("test_db");
1624
1625 {
1627 let db = GrafeoDB::open(&db_path).unwrap();
1628
1629 let alix = db.create_node(&["Person"]);
1630 db.set_node_property(alix, "name", Value::from("Alix"));
1631
1632 let gus = db.create_node(&["Person"]);
1633 db.set_node_property(gus, "name", Value::from("Gus"));
1634
1635 let _edge = db.create_edge(alix, gus, "KNOWS");
1636
1637 db.close().unwrap();
1639 }
1640
1641 {
1643 let db = GrafeoDB::open(&db_path).unwrap();
1644
1645 assert_eq!(db.node_count(), 2);
1646 assert_eq!(db.edge_count(), 1);
1647
1648 let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
1650 assert!(node0.is_some());
1651
1652 let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
1653 assert!(node1.is_some());
1654 }
1655 }
1656
1657 #[cfg(feature = "wal")]
1658 #[test]
1659 fn test_wal_logging() {
1660 use tempfile::tempdir;
1661
1662 let dir = tempdir().unwrap();
1663 let db_path = dir.path().join("wal_test_db");
1664
1665 let db = GrafeoDB::open(&db_path).unwrap();
1666
1667 let node = db.create_node(&["Test"]);
1669 db.delete_node(node);
1670
1671 if let Some(wal) = db.wal() {
1673 assert!(wal.record_count() > 0);
1674 }
1675
1676 db.close().unwrap();
1677 }
1678
1679 #[cfg(feature = "wal")]
1680 #[test]
1681 fn test_wal_recovery_multiple_sessions() {
1682 use grafeo_common::types::Value;
1684 use tempfile::tempdir;
1685
1686 let dir = tempdir().unwrap();
1687 let db_path = dir.path().join("multi_session_db");
1688
1689 {
1691 let db = GrafeoDB::open(&db_path).unwrap();
1692 let alix = db.create_node(&["Person"]);
1693 db.set_node_property(alix, "name", Value::from("Alix"));
1694 db.close().unwrap();
1695 }
1696
1697 {
1699 let db = GrafeoDB::open(&db_path).unwrap();
1700 assert_eq!(db.node_count(), 1); let gus = db.create_node(&["Person"]);
1702 db.set_node_property(gus, "name", Value::from("Gus"));
1703 db.close().unwrap();
1704 }
1705
1706 {
1708 let db = GrafeoDB::open(&db_path).unwrap();
1709 assert_eq!(db.node_count(), 2);
1710
1711 let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
1713 assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
1714
1715 let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
1716 assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
1717 }
1718 }
1719
1720 #[cfg(feature = "wal")]
1721 #[test]
1722 fn test_database_consistency_after_mutations() {
1723 use grafeo_common::types::Value;
1725 use tempfile::tempdir;
1726
1727 let dir = tempdir().unwrap();
1728 let db_path = dir.path().join("consistency_db");
1729
1730 {
1731 let db = GrafeoDB::open(&db_path).unwrap();
1732
1733 let a = db.create_node(&["Node"]);
1735 let b = db.create_node(&["Node"]);
1736 let c = db.create_node(&["Node"]);
1737
1738 let e1 = db.create_edge(a, b, "LINKS");
1740 let _e2 = db.create_edge(b, c, "LINKS");
1741
1742 db.delete_edge(e1);
1744 db.delete_node(b);
1745
1746 db.set_node_property(a, "value", Value::Int64(1));
1748 db.set_node_property(c, "value", Value::Int64(3));
1749
1750 db.close().unwrap();
1751 }
1752
1753 {
1755 let db = GrafeoDB::open(&db_path).unwrap();
1756
1757 let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
1761 assert!(node_a.is_some());
1762
1763 let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
1764 assert!(node_c.is_some());
1765
1766 let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
1768 assert!(node_b.is_none());
1769 }
1770 }
1771
1772 #[cfg(feature = "wal")]
1773 #[test]
1774 fn test_close_is_idempotent() {
1775 use tempfile::tempdir;
1777
1778 let dir = tempdir().unwrap();
1779 let db_path = dir.path().join("close_test_db");
1780
1781 let db = GrafeoDB::open(&db_path).unwrap();
1782 db.create_node(&["Test"]);
1783
1784 assert!(db.close().is_ok());
1786
1787 assert!(db.close().is_ok());
1789 }
1790
1791 #[test]
1792 fn test_with_store_external_backend() {
1793 use grafeo_core::graph::lpg::LpgStore;
1794
1795 let external = Arc::new(LpgStore::new().unwrap());
1796
1797 let n1 = external.create_node(&["Person"]);
1799 external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
1800
1801 let db = GrafeoDB::with_store(
1802 Arc::clone(&external) as Arc<dyn GraphStoreMut>,
1803 Config::in_memory(),
1804 )
1805 .unwrap();
1806
1807 let session = db.session();
1808
1809 #[cfg(feature = "gql")]
1811 {
1812 let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
1813 assert_eq!(result.rows.len(), 1);
1814 }
1815 }
1816
1817 #[test]
1818 fn test_with_config_custom_memory_limit() {
1819 let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); let db = GrafeoDB::with_config(config).unwrap();
1822 assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
1823 assert_eq!(db.node_count(), 0);
1824 }
1825
1826 #[cfg(feature = "metrics")]
1827 #[test]
1828 fn test_database_metrics_registry() {
1829 let db = GrafeoDB::new_in_memory();
1830
1831 db.create_node(&["Person"]);
1833 db.create_node(&["Person"]);
1834
1835 let snap = db.metrics();
1837 assert_eq!(snap.query_count, 0); }
1840
1841 #[test]
1842 fn test_query_result_has_metrics() {
1843 let db = GrafeoDB::new_in_memory();
1845 db.create_node(&["Person"]);
1846 db.create_node(&["Person"]);
1847
1848 #[cfg(feature = "gql")]
1849 {
1850 let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
1851
1852 assert!(result.execution_time_ms.is_some());
1854 assert!(result.rows_scanned.is_some());
1855 assert!(result.execution_time_ms.unwrap() >= 0.0);
1856 assert_eq!(result.rows_scanned.unwrap(), 2);
1857 }
1858 }
1859
1860 #[test]
1861 fn test_empty_query_result_metrics() {
1862 let db = GrafeoDB::new_in_memory();
1864 db.create_node(&["Person"]);
1865
1866 #[cfg(feature = "gql")]
1867 {
1868 let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
1870
1871 assert!(result.execution_time_ms.is_some());
1872 assert!(result.rows_scanned.is_some());
1873 assert_eq!(result.rows_scanned.unwrap(), 0);
1874 }
1875 }
1876
1877 #[cfg(feature = "cdc")]
1878 mod cdc_integration {
1879 use super::*;
1880
1881 #[test]
1882 fn test_node_lifecycle_history() {
1883 let db = GrafeoDB::new_in_memory();
1884
1885 let id = db.create_node(&["Person"]);
1887 db.set_node_property(id, "name", "Alix".into());
1889 db.set_node_property(id, "name", "Gus".into());
1890 db.delete_node(id);
1892
1893 let history = db.history(id).unwrap();
1894 assert_eq!(history.len(), 4); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1896 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1897 assert!(history[1].before.is_none()); assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
1899 assert!(history[2].before.is_some()); assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
1901 }
1902
1903 #[test]
1904 fn test_edge_lifecycle_history() {
1905 let db = GrafeoDB::new_in_memory();
1906
1907 let alix = db.create_node(&["Person"]);
1908 let gus = db.create_node(&["Person"]);
1909 let edge = db.create_edge(alix, gus, "KNOWS");
1910 db.set_edge_property(edge, "since", 2024i64.into());
1911 db.delete_edge(edge);
1912
1913 let history = db.history(edge).unwrap();
1914 assert_eq!(history.len(), 3); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1916 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1917 assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
1918 }
1919
1920 #[test]
1921 fn test_create_node_with_props_cdc() {
1922 let db = GrafeoDB::new_in_memory();
1923
1924 let id = db.create_node_with_props(
1925 &["Person"],
1926 vec![
1927 ("name", grafeo_common::types::Value::from("Alix")),
1928 ("age", grafeo_common::types::Value::from(30i64)),
1929 ],
1930 );
1931
1932 let history = db.history(id).unwrap();
1933 assert_eq!(history.len(), 1);
1934 assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1935 let after = history[0].after.as_ref().unwrap();
1937 assert_eq!(after.len(), 2);
1938 }
1939
1940 #[test]
1941 fn test_changes_between() {
1942 let db = GrafeoDB::new_in_memory();
1943
1944 let id1 = db.create_node(&["A"]);
1945 let _id2 = db.create_node(&["B"]);
1946 db.set_node_property(id1, "x", 1i64.into());
1947
1948 let changes = db
1950 .changes_between(
1951 grafeo_common::types::EpochId(0),
1952 grafeo_common::types::EpochId(u64::MAX),
1953 )
1954 .unwrap();
1955 assert_eq!(changes.len(), 3); }
1957 }
1958
1959 #[test]
1960 fn test_with_store_basic() {
1961 use grafeo_core::graph::lpg::LpgStore;
1962
1963 let store = Arc::new(LpgStore::new().unwrap());
1964 let n1 = store.create_node(&["Person"]);
1965 store.set_node_property(n1, "name", "Alix".into());
1966
1967 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1968 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1969
1970 let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
1971 assert_eq!(result.rows.len(), 1);
1972 }
1973
1974 #[test]
1975 fn test_with_store_session() {
1976 use grafeo_core::graph::lpg::LpgStore;
1977
1978 let store = Arc::new(LpgStore::new().unwrap());
1979 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1980 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1981
1982 let session = db.session();
1983 let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
1984 assert_eq!(result.rows.len(), 1);
1985 }
1986
1987 #[test]
1988 fn test_with_store_mutations() {
1989 use grafeo_core::graph::lpg::LpgStore;
1990
1991 let store = Arc::new(LpgStore::new().unwrap());
1992 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1993 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1994
1995 let mut session = db.session();
1996
1997 session.begin_transaction().unwrap();
2001 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
2002
2003 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
2004 assert_eq!(result.rows.len(), 1);
2005
2006 session.commit().unwrap();
2007 }
2008}