1mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21#[cfg(feature = "rdf")]
22mod rdf_ops;
23mod search;
24#[cfg(feature = "wal")]
25pub(crate) mod wal_store;
26
27use grafeo_common::grafeo_error;
28#[cfg(feature = "wal")]
29use std::path::Path;
30use std::sync::Arc;
31use std::sync::atomic::AtomicUsize;
32
33use parking_lot::RwLock;
34
35#[cfg(feature = "grafeo-file")]
36use grafeo_adapters::storage::file::GrafeoFileManager;
37#[cfg(feature = "wal")]
38use grafeo_adapters::storage::wal::{
39 DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
40};
41use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
42use grafeo_common::utils::error::Result;
43use grafeo_core::graph::GraphStoreMut;
44use grafeo_core::graph::lpg::LpgStore;
45#[cfg(feature = "rdf")]
46use grafeo_core::graph::rdf::RdfStore;
47
48use crate::catalog::Catalog;
49use crate::config::Config;
50use crate::query::cache::QueryCache;
51use crate::session::Session;
52use crate::transaction::TransactionManager;
53
/// Top-level database handle.
///
/// Bundles the graph store, catalog, transaction manager, buffer manager and
/// the optional, feature-gated persistence / CDC / embedding / metrics
/// components. Instances are assembled by `GrafeoDB::with_config` and
/// `GrafeoDB::with_store`.
pub struct GrafeoDB {
    /// Configuration the database was constructed with.
    pub(super) config: Config,
    /// Built-in LPG store. Named graphs are created and dropped through it.
    pub(super) store: Arc<LpgStore>,
    /// Schema catalog (node/edge/graph types, procedures, schema namespaces).
    pub(super) catalog: Arc<Catalog>,
    /// RDF store, only present with the `rdf` feature.
    #[cfg(feature = "rdf")]
    pub(super) rdf_store: Arc<RdfStore>,
    /// Transaction manager shared with every session.
    pub(super) transaction_manager: Arc<TransactionManager>,
    /// Buffer manager sized from `config.memory_limit` (or a fraction of
    /// detected system memory when no limit is configured).
    pub(super) buffer_manager: Arc<BufferManager>,
    /// Write-ahead log; `None` for read-only, in-memory or WAL-disabled
    /// databases.
    #[cfg(feature = "wal")]
    pub(super) wal: Option<Arc<LpgWal>>,
    /// Shared slot handed to sessions together with the WAL.
    /// NOTE(review): presumably tracks the named graph the WAL is currently
    /// writing for — confirm against `Session::set_wal`.
    #[cfg(feature = "wal")]
    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
    /// Query cache shared with every session.
    pub(super) query_cache: Arc<QueryCache>,
    /// Counter shared with sessions; starts at zero.
    /// NOTE(review): presumably counts commits to schedule GC — confirm.
    pub(super) commit_counter: Arc<AtomicUsize>,
    /// `true` from construction until `close` completes.
    pub(super) is_open: RwLock<bool>,
    /// Change-data-capture log, only present with the `cdc` feature.
    #[cfg(feature = "cdc")]
    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
    /// Registered embedding models keyed by name (`embed` feature).
    #[cfg(feature = "embed")]
    pub(super) embedding_models:
        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
    /// Single-file storage manager; `None` when the database does not use a
    /// `.grafeo` file.
    #[cfg(feature = "grafeo-file")]
    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
    /// Externally supplied store. Set only by `with_store`; when present,
    /// `session` and `graph_store` use it instead of `store`.
    pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
    /// Metrics registry, only present with the `metrics` feature.
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Graph name applied to newly created sessions (`None` = no selection).
    current_graph: RwLock<Option<String>>,
    /// Schema name applied to newly created sessions (`None` = no selection).
    current_schema: RwLock<Option<String>>,
    /// `true` when the database was opened with `AccessMode::ReadOnly`.
    read_only: bool,
}
132
133impl GrafeoDB {
134 #[must_use]
155 pub fn new_in_memory() -> Self {
156 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
157 }
158
159 #[cfg(feature = "wal")]
178 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
179 Self::with_config(Config::persistent(path.as_ref()))
180 }
181
182 #[cfg(feature = "grafeo-file")]
207 pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
208 Self::with_config(Config::read_only(path.as_ref()))
209 }
210
    /// Builds a database from `config`.
    ///
    /// Construction proceeds in this order:
    /// 1. `config.validate()` is run; a failure is reported as
    ///    `Error::Internal`.
    /// 2. Fresh stores, the transaction manager, the buffer manager and the
    ///    catalog are created.
    /// 3. With the `grafeo-file` feature, a single-file database is opened
    ///    (read-only or read-write), its snapshot is loaded, and for
    ///    read-write opens any sidecar WAL is replayed.
    /// 4. With the `wal` feature, a WAL is opened for writable, WAL-enabled,
    ///    path-backed databases; for directory-format databases an existing
    ///    WAL is replayed first.
    ///
    /// # Errors
    ///
    /// Returns an error when validation fails, when read-only mode is
    /// requested without an existing database file, when the path exists but
    /// is not a file in single-file mode, or when any storage, snapshot or WAL
    /// operation fails.
    pub fn with_config(config: Config) -> Result<Self> {
        // Reject invalid configurations before touching any storage.
        config
            .validate()
            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;

        let store = Arc::new(LpgStore::new()?);
        #[cfg(feature = "rdf")]
        let rdf_store = Arc::new(RdfStore::new());
        let transaction_manager = Arc::new(TransactionManager::new());

        let buffer_config = BufferManagerConfig {
            // Without an explicit limit, budget 75% of detected system memory.
            budget: config.memory_limit.unwrap_or_else(|| {
                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
            }),
            // Explicit spill path wins; otherwise spill under `<path>/spill`
            // when the database has a path at all.
            spill_path: config
                .spill_path
                .clone()
                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
            ..BufferManagerConfig::default()
        };
        let buffer_manager = BufferManager::new(buffer_config);

        let catalog = Arc::new(Catalog::new());

        let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;

        // Single-file storage: open the file manager and restore state from
        // its snapshot (plus sidecar WAL for read-write opens).
        #[cfg(feature = "grafeo-file")]
        let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
            if let Some(ref db_path) = config.path {
                // Read-only opens never create anything: the file must exist.
                if db_path.exists() && db_path.is_file() {
                    let fm = GrafeoFileManager::open_read_only(db_path)?;
                    let snapshot_data = fm.read_snapshot()?;
                    if !snapshot_data.is_empty() {
                        Self::apply_snapshot_data(
                            &store,
                            &catalog,
                            #[cfg(feature = "rdf")]
                            &rdf_store,
                            &snapshot_data,
                        )?;
                    }
                    Some(Arc::new(fm))
                } else {
                    return Err(grafeo_common::utils::error::Error::Internal(format!(
                        "read-only open requires an existing .grafeo file: {}",
                        db_path.display()
                    )));
                }
            } else {
                return Err(grafeo_common::utils::error::Error::Internal(
                    "read-only mode requires a database path".to_string(),
                ));
            }
        } else if config.wal_enabled {
            if let Some(ref db_path) = config.path {
                if Self::should_use_single_file(db_path, config.storage_format) {
                    // Open an existing file, create a missing one, and refuse
                    // anything else (e.g. a directory) at that path.
                    let fm = if db_path.exists() && db_path.is_file() {
                        GrafeoFileManager::open(db_path)?
                    } else if !db_path.exists() {
                        GrafeoFileManager::create(db_path)?
                    } else {
                        return Err(grafeo_common::utils::error::Error::Internal(format!(
                            "path exists but is not a file: {}",
                            db_path.display()
                        )));
                    };

                    // Restore the last snapshot first ...
                    let snapshot_data = fm.read_snapshot()?;
                    if !snapshot_data.is_empty() {
                        Self::apply_snapshot_data(
                            &store,
                            &catalog,
                            #[cfg(feature = "rdf")]
                            &rdf_store,
                            &snapshot_data,
                        )?;
                    }

                    // ... then replay a sidecar WAL, if one is present, on top
                    // of the snapshot.
                    if fm.has_sidecar_wal() {
                        let recovery = WalRecovery::new(fm.sidecar_wal_path());
                        let records = recovery.recover()?;
                        Self::apply_wal_records(
                            &store,
                            &catalog,
                            #[cfg(feature = "rdf")]
                            &rdf_store,
                            &records,
                        )?;
                    }

                    Some(Arc::new(fm))
                } else {
                    None
                }
            } else {
                None
            }
        } else {
            None
        };

        // Write-ahead log: only for writable, WAL-enabled databases that have
        // a path.
        #[cfg(feature = "wal")]
        let wal = if is_read_only {
            None
        } else if config.wal_enabled {
            if let Some(ref db_path) = config.path {
                // Single-file databases keep their WAL at the file manager's
                // sidecar path; directory databases keep it in `<path>/wal`.
                #[cfg(feature = "grafeo-file")]
                let wal_path = if let Some(ref fm) = file_manager {
                    let p = fm.sidecar_wal_path();
                    std::fs::create_dir_all(&p)?;
                    p
                } else {
                    std::fs::create_dir_all(db_path)?;
                    db_path.join("wal")
                };

                #[cfg(not(feature = "grafeo-file"))]
                let wal_path = {
                    std::fs::create_dir_all(db_path)?;
                    db_path.join("wal")
                };

                #[cfg(feature = "grafeo-file")]
                let is_single_file = file_manager.is_some();
                #[cfg(not(feature = "grafeo-file"))]
                let is_single_file = false;

                // For single-file databases the sidecar WAL was already
                // replayed above, so only directory-format WALs are replayed
                // here.
                if !is_single_file && wal_path.exists() {
                    let recovery = WalRecovery::new(&wal_path);
                    let records = recovery.recover()?;
                    Self::apply_wal_records(
                        &store,
                        &catalog,
                        #[cfg(feature = "rdf")]
                        &rdf_store,
                        &records,
                    )?;
                }

                // Translate the engine-level durability setting into the WAL
                // adapter's equivalent variant.
                let wal_durability = match config.wal_durability {
                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
                    crate::config::DurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    } => WalDurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    },
                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
                        WalDurabilityMode::Adaptive { target_interval_ms }
                    }
                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
                };
                let wal_config = WalConfig {
                    durability: wal_durability,
                    ..WalConfig::default()
                };
                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
                Some(Arc::new(wal_manager))
            } else {
                None
            }
        } else {
            None
        };

        let query_cache = Arc::new(QueryCache::default());

        // Recovery may have advanced the store's epoch; bring the transaction
        // manager in line with it.
        #[cfg(feature = "temporal")]
        transaction_manager.sync_epoch(store.current_epoch());

        Ok(Self {
            config,
            store,
            catalog,
            #[cfg(feature = "rdf")]
            rdf_store,
            transaction_manager,
            buffer_manager,
            #[cfg(feature = "wal")]
            wal,
            #[cfg(feature = "wal")]
            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
            query_cache,
            commit_counter: Arc::new(AtomicUsize::new(0)),
            is_open: RwLock::new(true),
            #[cfg(feature = "cdc")]
            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
            #[cfg(feature = "embed")]
            embedding_models: RwLock::new(hashbrown::HashMap::new()),
            #[cfg(feature = "grafeo-file")]
            file_manager,
            external_store: None,
            #[cfg(feature = "metrics")]
            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
            current_graph: RwLock::new(None),
            current_schema: RwLock::new(None),
            read_only: is_read_only,
        })
    }
451
452 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
477 config
478 .validate()
479 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
480
481 let dummy_store = Arc::new(LpgStore::new()?);
482 let transaction_manager = Arc::new(TransactionManager::new());
483
484 let buffer_config = BufferManagerConfig {
485 budget: config.memory_limit.unwrap_or_else(|| {
486 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
487 }),
488 spill_path: None,
489 ..BufferManagerConfig::default()
490 };
491 let buffer_manager = BufferManager::new(buffer_config);
492
493 let query_cache = Arc::new(QueryCache::default());
494
495 Ok(Self {
496 config,
497 store: dummy_store,
498 catalog: Arc::new(Catalog::new()),
499 #[cfg(feature = "rdf")]
500 rdf_store: Arc::new(RdfStore::new()),
501 transaction_manager,
502 buffer_manager,
503 #[cfg(feature = "wal")]
504 wal: None,
505 #[cfg(feature = "wal")]
506 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
507 query_cache,
508 commit_counter: Arc::new(AtomicUsize::new(0)),
509 is_open: RwLock::new(true),
510 #[cfg(feature = "cdc")]
511 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
512 #[cfg(feature = "embed")]
513 embedding_models: RwLock::new(hashbrown::HashMap::new()),
514 #[cfg(feature = "grafeo-file")]
515 file_manager: None,
516 external_store: Some(store),
517 #[cfg(feature = "metrics")]
518 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
519 current_graph: RwLock::new(None),
520 current_schema: RwLock::new(None),
521 read_only: false,
522 })
523 }
524
525 #[cfg(feature = "wal")]
531 fn apply_wal_records(
532 store: &Arc<LpgStore>,
533 catalog: &Catalog,
534 #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
535 records: &[WalRecord],
536 ) -> Result<()> {
537 use crate::catalog::{
538 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
539 };
540 use grafeo_common::utils::error::Error;
541
542 let mut current_graph: Option<String> = None;
545 let mut target_store: Arc<LpgStore> = Arc::clone(store);
546
547 for record in records {
548 match record {
549 WalRecord::CreateNamedGraph { name } => {
551 let _ = store.create_graph(name);
552 }
553 WalRecord::DropNamedGraph { name } => {
554 store.drop_graph(name);
555 if current_graph.as_deref() == Some(name.as_str()) {
557 current_graph = None;
558 target_store = Arc::clone(store);
559 }
560 }
561 WalRecord::SwitchGraph { name } => {
562 current_graph.clone_from(name);
563 target_store = match ¤t_graph {
564 None => Arc::clone(store),
565 Some(graph_name) => store
566 .graph_or_create(graph_name)
567 .map_err(|e| Error::Internal(e.to_string()))?,
568 };
569 }
570
571 WalRecord::CreateNode { id, labels } => {
573 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
574 target_store.create_node_with_id(*id, &label_refs)?;
575 }
576 WalRecord::DeleteNode { id } => {
577 target_store.delete_node(*id);
578 }
579 WalRecord::CreateEdge {
580 id,
581 src,
582 dst,
583 edge_type,
584 } => {
585 target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
586 }
587 WalRecord::DeleteEdge { id } => {
588 target_store.delete_edge(*id);
589 }
590 WalRecord::SetNodeProperty { id, key, value } => {
591 target_store.set_node_property(*id, key, value.clone());
592 }
593 WalRecord::SetEdgeProperty { id, key, value } => {
594 target_store.set_edge_property(*id, key, value.clone());
595 }
596 WalRecord::AddNodeLabel { id, label } => {
597 target_store.add_label(*id, label);
598 }
599 WalRecord::RemoveNodeLabel { id, label } => {
600 target_store.remove_label(*id, label);
601 }
602 WalRecord::RemoveNodeProperty { id, key } => {
603 target_store.remove_node_property(*id, key);
604 }
605 WalRecord::RemoveEdgeProperty { id, key } => {
606 target_store.remove_edge_property(*id, key);
607 }
608
609 WalRecord::CreateNodeType {
611 name,
612 properties,
613 constraints,
614 } => {
615 let def = NodeTypeDefinition {
616 name: name.clone(),
617 properties: properties
618 .iter()
619 .map(|(n, t, nullable)| TypedProperty {
620 name: n.clone(),
621 data_type: PropertyDataType::from_type_name(t),
622 nullable: *nullable,
623 default_value: None,
624 })
625 .collect(),
626 constraints: constraints
627 .iter()
628 .map(|(kind, props)| match kind.as_str() {
629 "unique" => TypeConstraint::Unique(props.clone()),
630 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
631 "not_null" if !props.is_empty() => {
632 TypeConstraint::NotNull(props[0].clone())
633 }
634 _ => TypeConstraint::Unique(props.clone()),
635 })
636 .collect(),
637 parent_types: Vec::new(),
638 };
639 let _ = catalog.register_node_type(def);
640 }
641 WalRecord::DropNodeType { name } => {
642 let _ = catalog.drop_node_type(name);
643 }
644 WalRecord::CreateEdgeType {
645 name,
646 properties,
647 constraints,
648 } => {
649 let def = EdgeTypeDefinition {
650 name: name.clone(),
651 properties: properties
652 .iter()
653 .map(|(n, t, nullable)| TypedProperty {
654 name: n.clone(),
655 data_type: PropertyDataType::from_type_name(t),
656 nullable: *nullable,
657 default_value: None,
658 })
659 .collect(),
660 constraints: constraints
661 .iter()
662 .map(|(kind, props)| match kind.as_str() {
663 "unique" => TypeConstraint::Unique(props.clone()),
664 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
665 "not_null" if !props.is_empty() => {
666 TypeConstraint::NotNull(props[0].clone())
667 }
668 _ => TypeConstraint::Unique(props.clone()),
669 })
670 .collect(),
671 source_node_types: Vec::new(),
672 target_node_types: Vec::new(),
673 };
674 let _ = catalog.register_edge_type_def(def);
675 }
676 WalRecord::DropEdgeType { name } => {
677 let _ = catalog.drop_edge_type_def(name);
678 }
679 WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
680 }
683 WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
684 }
687 WalRecord::CreateGraphType {
688 name,
689 node_types,
690 edge_types,
691 open,
692 } => {
693 use crate::catalog::GraphTypeDefinition;
694 let def = GraphTypeDefinition {
695 name: name.clone(),
696 allowed_node_types: node_types.clone(),
697 allowed_edge_types: edge_types.clone(),
698 open: *open,
699 };
700 let _ = catalog.register_graph_type(def);
701 }
702 WalRecord::DropGraphType { name } => {
703 let _ = catalog.drop_graph_type(name);
704 }
705 WalRecord::CreateSchema { name } => {
706 let _ = catalog.register_schema_namespace(name.clone());
707 }
708 WalRecord::DropSchema { name } => {
709 let _ = catalog.drop_schema_namespace(name);
710 }
711
712 WalRecord::AlterNodeType { name, alterations } => {
713 for (action, prop_name, type_name, nullable) in alterations {
714 match action.as_str() {
715 "add" => {
716 let prop = TypedProperty {
717 name: prop_name.clone(),
718 data_type: PropertyDataType::from_type_name(type_name),
719 nullable: *nullable,
720 default_value: None,
721 };
722 let _ = catalog.alter_node_type_add_property(name, prop);
723 }
724 "drop" => {
725 let _ = catalog.alter_node_type_drop_property(name, prop_name);
726 }
727 _ => {}
728 }
729 }
730 }
731 WalRecord::AlterEdgeType { name, alterations } => {
732 for (action, prop_name, type_name, nullable) in alterations {
733 match action.as_str() {
734 "add" => {
735 let prop = TypedProperty {
736 name: prop_name.clone(),
737 data_type: PropertyDataType::from_type_name(type_name),
738 nullable: *nullable,
739 default_value: None,
740 };
741 let _ = catalog.alter_edge_type_add_property(name, prop);
742 }
743 "drop" => {
744 let _ = catalog.alter_edge_type_drop_property(name, prop_name);
745 }
746 _ => {}
747 }
748 }
749 }
750 WalRecord::AlterGraphType { name, alterations } => {
751 for (action, type_name) in alterations {
752 match action.as_str() {
753 "add_node" => {
754 let _ =
755 catalog.alter_graph_type_add_node_type(name, type_name.clone());
756 }
757 "drop_node" => {
758 let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
759 }
760 "add_edge" => {
761 let _ =
762 catalog.alter_graph_type_add_edge_type(name, type_name.clone());
763 }
764 "drop_edge" => {
765 let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
766 }
767 _ => {}
768 }
769 }
770 }
771
772 WalRecord::CreateProcedure {
773 name,
774 params,
775 returns,
776 body,
777 } => {
778 use crate::catalog::ProcedureDefinition;
779 let def = ProcedureDefinition {
780 name: name.clone(),
781 params: params.clone(),
782 returns: returns.clone(),
783 body: body.clone(),
784 };
785 let _ = catalog.register_procedure(def);
786 }
787 WalRecord::DropProcedure { name } => {
788 let _ = catalog.drop_procedure(name);
789 }
790
791 #[cfg(feature = "rdf")]
793 WalRecord::InsertRdfTriple { .. }
794 | WalRecord::DeleteRdfTriple { .. }
795 | WalRecord::ClearRdfGraph { .. }
796 | WalRecord::CreateRdfGraph { .. }
797 | WalRecord::DropRdfGraph { .. } => {
798 rdf_ops::replay_rdf_wal_record(rdf_store, record);
799 }
800 #[cfg(not(feature = "rdf"))]
801 WalRecord::InsertRdfTriple { .. }
802 | WalRecord::DeleteRdfTriple { .. }
803 | WalRecord::ClearRdfGraph { .. }
804 | WalRecord::CreateRdfGraph { .. }
805 | WalRecord::DropRdfGraph { .. } => {}
806
807 WalRecord::TransactionCommit { .. } => {
808 #[cfg(feature = "temporal")]
812 {
813 target_store.new_epoch();
814 }
815 }
816 WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
817 }
820 }
821 }
822 Ok(())
823 }
824
825 #[cfg(feature = "grafeo-file")]
831 fn should_use_single_file(
832 path: &std::path::Path,
833 configured: crate::config::StorageFormat,
834 ) -> bool {
835 use crate::config::StorageFormat;
836 match configured {
837 StorageFormat::SingleFile => true,
838 StorageFormat::WalDirectory => false,
839 StorageFormat::Auto => {
840 if path.is_file() {
842 if let Ok(mut f) = std::fs::File::open(path) {
843 use std::io::Read;
844 let mut magic = [0u8; 4];
845 if f.read_exact(&mut magic).is_ok()
846 && magic == grafeo_adapters::storage::file::MAGIC
847 {
848 return true;
849 }
850 }
851 return false;
852 }
853 if path.is_dir() {
855 return false;
856 }
857 path.extension().is_some_and(|ext| ext == "grafeo")
859 }
860 }
861 }
862
    /// Loads serialized snapshot bytes into the given stores and catalog.
    ///
    /// Forwards directly to `persistence::load_snapshot_into_store`, passing
    /// `rdf_store` only when the `rdf` feature is enabled.
    ///
    /// # Errors
    ///
    /// Returns whatever error `load_snapshot_into_store` reports.
    #[cfg(feature = "grafeo-file")]
    fn apply_snapshot_data(
        store: &Arc<LpgStore>,
        catalog: &Arc<crate::catalog::Catalog>,
        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
        data: &[u8],
    ) -> Result<()> {
        persistence::load_snapshot_into_store(
            store,
            catalog,
            #[cfg(feature = "rdf")]
            rdf_store,
            data,
        )
    }
879
880 #[must_use]
908 pub fn session(&self) -> Session {
909 let session_cfg = || crate::session::SessionConfig {
910 transaction_manager: Arc::clone(&self.transaction_manager),
911 query_cache: Arc::clone(&self.query_cache),
912 catalog: Arc::clone(&self.catalog),
913 adaptive_config: self.config.adaptive.clone(),
914 factorized_execution: self.config.factorized_execution,
915 graph_model: self.config.graph_model,
916 query_timeout: self.config.query_timeout,
917 commit_counter: Arc::clone(&self.commit_counter),
918 gc_interval: self.config.gc_interval,
919 read_only: self.read_only,
920 };
921
922 if let Some(ref ext_store) = self.external_store {
923 return Session::with_external_store(Arc::clone(ext_store), session_cfg())
924 .expect("arena allocation for external store session");
925 }
926
927 #[cfg(feature = "rdf")]
928 let mut session = Session::with_rdf_store_and_adaptive(
929 Arc::clone(&self.store),
930 Arc::clone(&self.rdf_store),
931 session_cfg(),
932 );
933 #[cfg(not(feature = "rdf"))]
934 let mut session = Session::with_adaptive(Arc::clone(&self.store), session_cfg());
935
936 #[cfg(feature = "wal")]
937 if let Some(ref wal) = self.wal {
938 session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
939 }
940
941 #[cfg(feature = "cdc")]
942 session.set_cdc_log(Arc::clone(&self.cdc_log));
943
944 #[cfg(feature = "metrics")]
945 {
946 if let Some(ref m) = self.metrics {
947 session.set_metrics(Arc::clone(m));
948 m.session_created
949 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
950 m.session_active
951 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
952 }
953 }
954
955 if let Some(ref graph) = *self.current_graph.read() {
957 session.use_graph(graph);
958 }
959
960 if let Some(ref schema) = *self.current_schema.read() {
962 session.set_schema(schema);
963 }
964
965 let _ = &mut session;
967
968 session
969 }
970
971 #[must_use]
977 pub fn current_graph(&self) -> Option<String> {
978 self.current_graph.read().clone()
979 }
980
981 pub fn set_current_graph(&self, name: Option<&str>) {
986 *self.current_graph.write() = name.map(ToString::to_string);
987 }
988
989 #[must_use]
994 pub fn current_schema(&self) -> Option<String> {
995 self.current_schema.read().clone()
996 }
997
998 pub fn set_current_schema(&self, name: Option<&str>) {
1003 *self.current_schema.write() = name.map(ToString::to_string);
1004 }
1005
    /// Returns the adaptive-execution settings from the database
    /// configuration.
    #[must_use]
    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
        &self.config.adaptive
    }
1011
    /// Returns `true` if the database was opened in read-only mode.
    #[must_use]
    pub fn is_read_only(&self) -> bool {
        self.read_only
    }
1017
    /// Returns the configuration the database was constructed with.
    #[must_use]
    pub fn config(&self) -> &Config {
        &self.config
    }
1023
    /// Returns the graph model selected in the configuration.
    #[must_use]
    pub fn graph_model(&self) -> crate::config::GraphModel {
        self.config.graph_model
    }
1029
    /// Returns the configured memory limit, or `None` when no explicit limit
    /// was set.
    #[must_use]
    pub fn memory_limit(&self) -> Option<usize> {
        self.config.memory_limit
    }
1035
1036 #[cfg(feature = "metrics")]
1041 #[must_use]
1042 pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1043 let mut snapshot = self
1044 .metrics
1045 .as_ref()
1046 .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1047
1048 let cache_stats = self.query_cache.stats();
1050 snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1051 snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1052 snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1053 snapshot.cache_invalidations = cache_stats.invalidations;
1054
1055 snapshot
1056 }
1057
1058 #[cfg(feature = "metrics")]
1062 #[must_use]
1063 pub fn metrics_prometheus(&self) -> String {
1064 self.metrics
1065 .as_ref()
1066 .map_or_else(String::new, |m| m.to_prometheus())
1067 }
1068
1069 #[cfg(feature = "metrics")]
1071 pub fn reset_metrics(&self) {
1072 if let Some(ref m) = self.metrics {
1073 m.reset();
1074 }
1075 self.query_cache.reset_stats();
1076 }
1077
    /// Returns the built-in LPG store.
    ///
    /// This is always the internal store, even when the database was created
    /// with an external store; see `graph_store` for the store that is
    /// actually served.
    #[must_use]
    pub fn store(&self) -> &Arc<LpgStore> {
        &self.store
    }
1089
1090 #[allow(dead_code)] fn active_store(&self) -> Arc<LpgStore> {
1097 let graph_name = self.current_graph.read().clone();
1098 match graph_name {
1099 None => Arc::clone(&self.store),
1100 Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
1101 Some(ref name) => self
1102 .store
1103 .graph(name)
1104 .unwrap_or_else(|| Arc::clone(&self.store)),
1105 }
1106 }
1107
1108 pub fn create_graph(&self, name: &str) -> Result<bool> {
1116 Ok(self.store.create_graph(name)?)
1117 }
1118
    /// Drops the named graph from the built-in store and returns the store's
    /// boolean result.
    /// NOTE(review): presumably `true` when a graph was actually removed —
    /// confirm against `LpgStore::drop_graph`.
    pub fn drop_graph(&self, name: &str) -> bool {
        self.store.drop_graph(name)
    }
1123
    /// Returns the graph names reported by the built-in store.
    #[must_use]
    pub fn list_graphs(&self) -> Vec<String> {
        self.store.graph_names()
    }
1129
1130 #[must_use]
1138 pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
1139 if let Some(ref ext_store) = self.external_store {
1140 Arc::clone(ext_store)
1141 } else {
1142 Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
1143 }
1144 }
1145
1146 pub fn gc(&self) {
1152 let min_epoch = self.transaction_manager.min_active_epoch();
1153 self.store.gc_versions(min_epoch);
1154 self.transaction_manager.gc();
1155 }
1156
    /// Returns the database's buffer manager.
    #[must_use]
    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
        &self.buffer_manager
    }
1162
    /// Returns the query cache shared by all sessions of this database.
    #[must_use]
    pub fn query_cache(&self) -> &Arc<QueryCache> {
        &self.query_cache
    }
1168
    /// Clears the query cache.
    pub fn clear_plan_cache(&self) {
        self.query_cache.clear();
    }
1177
    /// Closes the database, persisting outstanding state.
    ///
    /// Calling `close` on an already closed database is a no-op.
    ///
    /// * Read-only databases only close their file manager (if any).
    /// * Single-file databases sync the WAL, write a snapshot into the file,
    ///   close the active WAL log, remove the sidecar WAL and close the file
    ///   manager.
    /// * Directory-format databases append a `TransactionCommit` record to the
    ///   WAL and sync it.
    ///
    /// # Errors
    ///
    /// Returns the first storage or WAL error encountered. In that case
    /// `is_open` is left unchanged, so the database is still considered open.
    pub fn close(&self) -> Result<()> {
        // The write guard is held for the whole shutdown.
        let mut is_open = self.is_open.write();
        if !*is_open {
            return Ok(());
        }

        if self.read_only {
            // Nothing to persist: just release the file.
            #[cfg(feature = "grafeo-file")]
            if let Some(ref fm) = self.file_manager {
                fm.close()?;
            }
            *is_open = false;
            return Ok(());
        }

        #[cfg(feature = "grafeo-file")]
        let is_single_file = self.file_manager.is_some();
        #[cfg(not(feature = "grafeo-file"))]
        let is_single_file = false;

        #[cfg(feature = "grafeo-file")]
        if let Some(ref fm) = self.file_manager {
            // Make the WAL durable before the snapshot is written.
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.sync()?;
            }
            self.checkpoint_to_file(fm)?;

            // Close the active log before the sidecar WAL is removed below.
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.close_active_log();
            }

            // The snapshot now holds the full state, so the sidecar WAL is
            // removed.
            fm.remove_sidecar_wal()?;
            fm.close()?;
        }

        #[cfg(feature = "wal")]
        if !is_single_file && let Some(ref wal) = self.wal {
            // Use the last assigned transaction id, or begin a new
            // transaction to obtain one when none has been assigned.
            let commit_tx = self
                .transaction_manager
                .last_assigned_transaction_id()
                .unwrap_or_else(|| self.transaction_manager.begin());

            // Write a final commit marker to the WAL.
            wal.log(&WalRecord::TransactionCommit {
                transaction_id: commit_tx,
            })?;

            wal.sync()?;
        }

        *is_open = false;
        Ok(())
    }
1259
    /// Returns the write-ahead log, or `None` when the database has no WAL.
    #[cfg(feature = "wal")]
    #[must_use]
    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
        self.wal.as_ref()
    }
1266
1267 #[cfg(feature = "wal")]
1269 pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1270 if let Some(ref wal) = self.wal {
1271 wal.log(record)?;
1272 }
1273 Ok(())
1274 }
1275
1276 #[cfg(feature = "grafeo-file")]
1282 fn checkpoint_to_file(&self, fm: &GrafeoFileManager) -> Result<()> {
1283 use grafeo_core::testing::crash::maybe_crash;
1284
1285 maybe_crash("checkpoint_to_file:before_export");
1286 let snapshot_data = self.export_snapshot()?;
1287 maybe_crash("checkpoint_to_file:after_export");
1288
1289 let epoch = self.store.current_epoch();
1290 let transaction_id = self
1291 .transaction_manager
1292 .last_assigned_transaction_id()
1293 .map_or(0, |t| t.0);
1294 let node_count = self.store.node_count() as u64;
1295 let edge_count = self.store.edge_count() as u64;
1296
1297 fm.write_snapshot(
1298 &snapshot_data,
1299 epoch.0,
1300 transaction_id,
1301 node_count,
1302 edge_count,
1303 )?;
1304
1305 maybe_crash("checkpoint_to_file:after_write_snapshot");
1306 Ok(())
1307 }
1308
    /// Returns the single-file storage manager, or `None` when the database
    /// is not backed by a single file.
    #[cfg(feature = "grafeo-file")]
    #[must_use]
    pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
        self.file_manager.as_ref()
    }
1315}
1316
1317impl Drop for GrafeoDB {
1318 fn drop(&mut self) {
1319 if let Err(e) = self.close() {
1320 grafeo_error!("Error closing database: {}", e);
1321 }
1322 }
1323}
1324
/// Exposes the database's administrative operations through the
/// `AdminService` trait.
///
/// Every method forwards to a method of the same name called on `self`.
/// NOTE(review): these calls presumably resolve to inherent `GrafeoDB`
/// methods defined in a submodule (e.g. `admin`); if an inherent method were
/// missing, the call would resolve to this trait method and recurse — confirm.
impl crate::admin::AdminService for GrafeoDB {
    /// Forwards to `self.info()`.
    fn info(&self) -> crate::admin::DatabaseInfo {
        self.info()
    }

    /// Forwards to `self.detailed_stats()`.
    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
        self.detailed_stats()
    }

    /// Forwards to `self.schema()`.
    fn schema(&self) -> crate::admin::SchemaInfo {
        self.schema()
    }

    /// Forwards to `self.validate()`.
    fn validate(&self) -> crate::admin::ValidationResult {
        self.validate()
    }

    /// Forwards to `self.wal_status()`.
    fn wal_status(&self) -> crate::admin::WalStatus {
        self.wal_status()
    }

    /// Forwards to `self.wal_checkpoint()`.
    fn wal_checkpoint(&self) -> Result<()> {
        self.wal_checkpoint()
    }
}
1350
/// Result of executing a query: column metadata, rows and optional
/// execution details.
#[derive(Debug)]
pub struct QueryResult {
    /// Column names.
    pub columns: Vec<String>,
    /// Logical type of each column; `QueryResult::new` fills this with one
    /// `LogicalType::Any` per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// Result rows, each a list of values.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Execution time in milliseconds, when recorded via `with_metrics`.
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned, when recorded via `with_metrics`.
    pub rows_scanned: Option<u64>,
    /// Optional status message (set by `QueryResult::status`).
    pub status_message: Option<String>,
    /// GQL status; the constructors in this file initialise it to
    /// `GqlStatus::SUCCESS`.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
1397
1398impl QueryResult {
1399 #[must_use]
1401 pub fn empty() -> Self {
1402 Self {
1403 columns: Vec::new(),
1404 column_types: Vec::new(),
1405 rows: Vec::new(),
1406 execution_time_ms: None,
1407 rows_scanned: None,
1408 status_message: None,
1409 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1410 }
1411 }
1412
1413 #[must_use]
1415 pub fn status(msg: impl Into<String>) -> Self {
1416 Self {
1417 columns: Vec::new(),
1418 column_types: Vec::new(),
1419 rows: Vec::new(),
1420 execution_time_ms: None,
1421 rows_scanned: None,
1422 status_message: Some(msg.into()),
1423 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1424 }
1425 }
1426
1427 #[must_use]
1429 pub fn new(columns: Vec<String>) -> Self {
1430 let len = columns.len();
1431 Self {
1432 columns,
1433 column_types: vec![grafeo_common::types::LogicalType::Any; len],
1434 rows: Vec::new(),
1435 execution_time_ms: None,
1436 rows_scanned: None,
1437 status_message: None,
1438 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1439 }
1440 }
1441
1442 #[must_use]
1444 pub fn with_types(
1445 columns: Vec<String>,
1446 column_types: Vec<grafeo_common::types::LogicalType>,
1447 ) -> Self {
1448 Self {
1449 columns,
1450 column_types,
1451 rows: Vec::new(),
1452 execution_time_ms: None,
1453 rows_scanned: None,
1454 status_message: None,
1455 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1456 }
1457 }
1458
1459 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1461 self.execution_time_ms = Some(execution_time_ms);
1462 self.rows_scanned = Some(rows_scanned);
1463 self
1464 }
1465
1466 #[must_use]
1468 pub fn execution_time_ms(&self) -> Option<f64> {
1469 self.execution_time_ms
1470 }
1471
1472 #[must_use]
1474 pub fn rows_scanned(&self) -> Option<u64> {
1475 self.rows_scanned
1476 }
1477
1478 #[must_use]
1480 pub fn row_count(&self) -> usize {
1481 self.rows.len()
1482 }
1483
1484 #[must_use]
1486 pub fn column_count(&self) -> usize {
1487 self.columns.len()
1488 }
1489
1490 #[must_use]
1492 pub fn is_empty(&self) -> bool {
1493 self.rows.is_empty()
1494 }
1495
1496 pub fn scalar<T: FromValue>(&self) -> Result<T> {
1505 if self.rows.len() != 1 || self.columns.len() != 1 {
1506 return Err(grafeo_common::utils::error::Error::InvalidValue(
1507 "Expected single value".to_string(),
1508 ));
1509 }
1510 T::from_value(&self.rows[0][0])
1511 }
1512
1513 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1515 self.rows.iter()
1516 }
1517}
1518
1519impl std::fmt::Display for QueryResult {
1520 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1521 let table = grafeo_common::fmt::format_result_table(
1522 &self.columns,
1523 &self.rows,
1524 self.execution_time_ms,
1525 self.status_message.as_deref(),
1526 );
1527 f.write_str(&table)
1528 }
1529}
1530
/// Conversion from a query [`grafeo_common::types::Value`] into a Rust type.
///
/// Used by `QueryResult::scalar` to extract a typed value.
pub trait FromValue: Sized {
    /// Converts `value` into `Self`.
    ///
    /// # Errors
    ///
    /// Implementations in this file return `Error::TypeMismatch` when `value`
    /// does not hold the expected type.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
1539
1540impl FromValue for i64 {
1541 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1542 value
1543 .as_int64()
1544 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1545 expected: "INT64".to_string(),
1546 found: value.type_name().to_string(),
1547 })
1548 }
1549}
1550
1551impl FromValue for f64 {
1552 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1553 value
1554 .as_float64()
1555 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1556 expected: "FLOAT64".to_string(),
1557 found: value.type_name().to_string(),
1558 })
1559 }
1560}
1561
1562impl FromValue for String {
1563 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1564 value.as_str().map(String::from).ok_or_else(|| {
1565 grafeo_common::utils::error::Error::TypeMismatch {
1566 expected: "STRING".to_string(),
1567 found: value.type_name().to_string(),
1568 }
1569 })
1570 }
1571}
1572
1573impl FromValue for bool {
1574 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1575 value
1576 .as_bool()
1577 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1578 expected: "BOOL".to_string(),
1579 found: value.type_name().to_string(),
1580 })
1581 }
1582}
1583
1584#[cfg(test)]
1585mod tests {
1586 use super::*;
1587
1588 #[test]
1589 fn test_create_in_memory_database() {
1590 let db = GrafeoDB::new_in_memory();
1591 assert_eq!(db.node_count(), 0);
1592 assert_eq!(db.edge_count(), 0);
1593 }
1594
1595 #[test]
1596 fn test_database_config() {
1597 let config = Config::in_memory().with_threads(4).with_query_logging();
1598
1599 let db = GrafeoDB::with_config(config).unwrap();
1600 assert_eq!(db.config().threads, 4);
1601 assert!(db.config().query_logging);
1602 }
1603
1604 #[test]
1605 fn test_database_session() {
1606 let db = GrafeoDB::new_in_memory();
1607 let _session = db.session();
1608 }
1610
1611 #[cfg(feature = "wal")]
1612 #[test]
1613 fn test_persistent_database_recovery() {
1614 use grafeo_common::types::Value;
1615 use tempfile::tempdir;
1616
1617 let dir = tempdir().unwrap();
1618 let db_path = dir.path().join("test_db");
1619
1620 {
1622 let db = GrafeoDB::open(&db_path).unwrap();
1623
1624 let alix = db.create_node(&["Person"]);
1625 db.set_node_property(alix, "name", Value::from("Alix"));
1626
1627 let gus = db.create_node(&["Person"]);
1628 db.set_node_property(gus, "name", Value::from("Gus"));
1629
1630 let _edge = db.create_edge(alix, gus, "KNOWS");
1631
1632 db.close().unwrap();
1634 }
1635
1636 {
1638 let db = GrafeoDB::open(&db_path).unwrap();
1639
1640 assert_eq!(db.node_count(), 2);
1641 assert_eq!(db.edge_count(), 1);
1642
1643 let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
1645 assert!(node0.is_some());
1646
1647 let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
1648 assert!(node1.is_some());
1649 }
1650 }
1651
1652 #[cfg(feature = "wal")]
1653 #[test]
1654 fn test_wal_logging() {
1655 use tempfile::tempdir;
1656
1657 let dir = tempdir().unwrap();
1658 let db_path = dir.path().join("wal_test_db");
1659
1660 let db = GrafeoDB::open(&db_path).unwrap();
1661
1662 let node = db.create_node(&["Test"]);
1664 db.delete_node(node);
1665
1666 if let Some(wal) = db.wal() {
1668 assert!(wal.record_count() > 0);
1669 }
1670
1671 db.close().unwrap();
1672 }
1673
1674 #[cfg(feature = "wal")]
1675 #[test]
1676 fn test_wal_recovery_multiple_sessions() {
1677 use grafeo_common::types::Value;
1679 use tempfile::tempdir;
1680
1681 let dir = tempdir().unwrap();
1682 let db_path = dir.path().join("multi_session_db");
1683
1684 {
1686 let db = GrafeoDB::open(&db_path).unwrap();
1687 let alix = db.create_node(&["Person"]);
1688 db.set_node_property(alix, "name", Value::from("Alix"));
1689 db.close().unwrap();
1690 }
1691
1692 {
1694 let db = GrafeoDB::open(&db_path).unwrap();
1695 assert_eq!(db.node_count(), 1); let gus = db.create_node(&["Person"]);
1697 db.set_node_property(gus, "name", Value::from("Gus"));
1698 db.close().unwrap();
1699 }
1700
1701 {
1703 let db = GrafeoDB::open(&db_path).unwrap();
1704 assert_eq!(db.node_count(), 2);
1705
1706 let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
1708 assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
1709
1710 let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
1711 assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
1712 }
1713 }
1714
1715 #[cfg(feature = "wal")]
1716 #[test]
1717 fn test_database_consistency_after_mutations() {
1718 use grafeo_common::types::Value;
1720 use tempfile::tempdir;
1721
1722 let dir = tempdir().unwrap();
1723 let db_path = dir.path().join("consistency_db");
1724
1725 {
1726 let db = GrafeoDB::open(&db_path).unwrap();
1727
1728 let a = db.create_node(&["Node"]);
1730 let b = db.create_node(&["Node"]);
1731 let c = db.create_node(&["Node"]);
1732
1733 let e1 = db.create_edge(a, b, "LINKS");
1735 let _e2 = db.create_edge(b, c, "LINKS");
1736
1737 db.delete_edge(e1);
1739 db.delete_node(b);
1740
1741 db.set_node_property(a, "value", Value::Int64(1));
1743 db.set_node_property(c, "value", Value::Int64(3));
1744
1745 db.close().unwrap();
1746 }
1747
1748 {
1750 let db = GrafeoDB::open(&db_path).unwrap();
1751
1752 let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
1756 assert!(node_a.is_some());
1757
1758 let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
1759 assert!(node_c.is_some());
1760
1761 let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
1763 assert!(node_b.is_none());
1764 }
1765 }
1766
1767 #[cfg(feature = "wal")]
1768 #[test]
1769 fn test_close_is_idempotent() {
1770 use tempfile::tempdir;
1772
1773 let dir = tempdir().unwrap();
1774 let db_path = dir.path().join("close_test_db");
1775
1776 let db = GrafeoDB::open(&db_path).unwrap();
1777 db.create_node(&["Test"]);
1778
1779 assert!(db.close().is_ok());
1781
1782 assert!(db.close().is_ok());
1784 }
1785
1786 #[test]
1787 fn test_with_store_external_backend() {
1788 use grafeo_core::graph::lpg::LpgStore;
1789
1790 let external = Arc::new(LpgStore::new().unwrap());
1791
1792 let n1 = external.create_node(&["Person"]);
1794 external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
1795
1796 let db = GrafeoDB::with_store(
1797 Arc::clone(&external) as Arc<dyn GraphStoreMut>,
1798 Config::in_memory(),
1799 )
1800 .unwrap();
1801
1802 let session = db.session();
1803
1804 #[cfg(feature = "gql")]
1806 {
1807 let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
1808 assert_eq!(result.rows.len(), 1);
1809 }
1810 }
1811
1812 #[test]
1813 fn test_with_config_custom_memory_limit() {
1814 let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); let db = GrafeoDB::with_config(config).unwrap();
1817 assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
1818 assert_eq!(db.node_count(), 0);
1819 }
1820
1821 #[cfg(feature = "metrics")]
1822 #[test]
1823 fn test_database_metrics_registry() {
1824 let db = GrafeoDB::new_in_memory();
1825
1826 db.create_node(&["Person"]);
1828 db.create_node(&["Person"]);
1829
1830 let snap = db.metrics();
1832 assert_eq!(snap.query_count, 0); }
1835
1836 #[test]
1837 fn test_query_result_has_metrics() {
1838 let db = GrafeoDB::new_in_memory();
1840 db.create_node(&["Person"]);
1841 db.create_node(&["Person"]);
1842
1843 #[cfg(feature = "gql")]
1844 {
1845 let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
1846
1847 assert!(result.execution_time_ms.is_some());
1849 assert!(result.rows_scanned.is_some());
1850 assert!(result.execution_time_ms.unwrap() >= 0.0);
1851 assert_eq!(result.rows_scanned.unwrap(), 2);
1852 }
1853 }
1854
1855 #[test]
1856 fn test_empty_query_result_metrics() {
1857 let db = GrafeoDB::new_in_memory();
1859 db.create_node(&["Person"]);
1860
1861 #[cfg(feature = "gql")]
1862 {
1863 let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
1865
1866 assert!(result.execution_time_ms.is_some());
1867 assert!(result.rows_scanned.is_some());
1868 assert_eq!(result.rows_scanned.unwrap(), 0);
1869 }
1870 }
1871
1872 #[cfg(feature = "cdc")]
1873 mod cdc_integration {
1874 use super::*;
1875
1876 #[test]
1877 fn test_node_lifecycle_history() {
1878 let db = GrafeoDB::new_in_memory();
1879
1880 let id = db.create_node(&["Person"]);
1882 db.set_node_property(id, "name", "Alix".into());
1884 db.set_node_property(id, "name", "Gus".into());
1885 db.delete_node(id);
1887
1888 let history = db.history(id).unwrap();
1889 assert_eq!(history.len(), 4); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1891 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1892 assert!(history[1].before.is_none()); assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
1894 assert!(history[2].before.is_some()); assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
1896 }
1897
1898 #[test]
1899 fn test_edge_lifecycle_history() {
1900 let db = GrafeoDB::new_in_memory();
1901
1902 let alix = db.create_node(&["Person"]);
1903 let gus = db.create_node(&["Person"]);
1904 let edge = db.create_edge(alix, gus, "KNOWS");
1905 db.set_edge_property(edge, "since", 2024i64.into());
1906 db.delete_edge(edge);
1907
1908 let history = db.history(edge).unwrap();
1909 assert_eq!(history.len(), 3); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1911 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1912 assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
1913 }
1914
1915 #[test]
1916 fn test_create_node_with_props_cdc() {
1917 let db = GrafeoDB::new_in_memory();
1918
1919 let id = db.create_node_with_props(
1920 &["Person"],
1921 vec![
1922 ("name", grafeo_common::types::Value::from("Alix")),
1923 ("age", grafeo_common::types::Value::from(30i64)),
1924 ],
1925 );
1926
1927 let history = db.history(id).unwrap();
1928 assert_eq!(history.len(), 1);
1929 assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1930 let after = history[0].after.as_ref().unwrap();
1932 assert_eq!(after.len(), 2);
1933 }
1934
1935 #[test]
1936 fn test_changes_between() {
1937 let db = GrafeoDB::new_in_memory();
1938
1939 let id1 = db.create_node(&["A"]);
1940 let _id2 = db.create_node(&["B"]);
1941 db.set_node_property(id1, "x", 1i64.into());
1942
1943 let changes = db
1945 .changes_between(
1946 grafeo_common::types::EpochId(0),
1947 grafeo_common::types::EpochId(u64::MAX),
1948 )
1949 .unwrap();
1950 assert_eq!(changes.len(), 3); }
1952 }
1953
1954 #[test]
1955 fn test_with_store_basic() {
1956 use grafeo_core::graph::lpg::LpgStore;
1957
1958 let store = Arc::new(LpgStore::new().unwrap());
1959 let n1 = store.create_node(&["Person"]);
1960 store.set_node_property(n1, "name", "Alix".into());
1961
1962 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1963 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1964
1965 let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
1966 assert_eq!(result.rows.len(), 1);
1967 }
1968
1969 #[test]
1970 fn test_with_store_session() {
1971 use grafeo_core::graph::lpg::LpgStore;
1972
1973 let store = Arc::new(LpgStore::new().unwrap());
1974 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1975 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1976
1977 let session = db.session();
1978 let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
1979 assert_eq!(result.rows.len(), 1);
1980 }
1981
1982 #[test]
1983 fn test_with_store_mutations() {
1984 use grafeo_core::graph::lpg::LpgStore;
1985
1986 let store = Arc::new(LpgStore::new().unwrap());
1987 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1988 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1989
1990 let mut session = db.session();
1991
1992 session.begin_transaction().unwrap();
1996 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1997
1998 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
1999 assert_eq!(result.rows.len(), 1);
2000
2001 session.commit().unwrap();
2002 }
2003}