1mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21#[cfg(feature = "rdf")]
22mod rdf_ops;
23mod search;
24#[cfg(feature = "wal")]
25pub(crate) mod wal_store;
26
27#[cfg(feature = "wal")]
28use std::path::Path;
29use std::sync::Arc;
30use std::sync::atomic::AtomicUsize;
31
32use parking_lot::RwLock;
33
34#[cfg(feature = "grafeo-file")]
35use grafeo_adapters::storage::file::GrafeoFileManager;
36#[cfg(feature = "wal")]
37use grafeo_adapters::storage::wal::{
38 DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
39};
40use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
41use grafeo_common::utils::error::Result;
42use grafeo_core::graph::GraphStoreMut;
43use grafeo_core::graph::lpg::LpgStore;
44#[cfg(feature = "rdf")]
45use grafeo_core::graph::rdf::RdfStore;
46
47use crate::catalog::Catalog;
48use crate::config::Config;
49use crate::query::cache::QueryCache;
50use crate::session::Session;
51use crate::transaction::TransactionManager;
52
/// An embedded graph database instance.
///
/// Built by [`GrafeoDB::new_in_memory`], [`GrafeoDB::open`] (with the `wal`
/// feature), [`GrafeoDB::with_config`] or [`GrafeoDB::with_store`]. Sessions
/// are handed out by [`GrafeoDB::session`]. The database is closed by
/// [`GrafeoDB::close`], which is also invoked on drop.
pub struct GrafeoDB {
    /// Configuration the database was constructed with.
    pub(super) config: Config,
    /// Built-in LPG store; also the entry point for named-graph operations.
    pub(super) store: Arc<LpgStore>,
    /// Schema catalog shared with every session.
    pub(super) catalog: Arc<Catalog>,
    /// RDF store handed to sessions when the `rdf` feature is enabled.
    #[cfg(feature = "rdf")]
    pub(super) rdf_store: Arc<RdfStore>,
    /// Transaction manager shared with every session.
    pub(super) transaction_manager: Arc<TransactionManager>,
    /// Buffer manager sized from `config.memory_limit` (or a share of system memory).
    pub(super) buffer_manager: Arc<BufferManager>,
    /// Write-ahead log; `None` when no WAL was opened for this database.
    #[cfg(feature = "wal")]
    pub(super) wal: Option<Arc<LpgWal>>,
    /// Shared slot passed to sessions together with the WAL.
    // NOTE(review): presumably tracks which named graph the WAL is currently
    // writing for — confirm against `Session::set_wal`.
    #[cfg(feature = "wal")]
    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
    /// Query cache shared with every session.
    pub(super) query_cache: Arc<QueryCache>,
    /// Counter shared with sessions through their `SessionConfig`.
    pub(super) commit_counter: Arc<AtomicUsize>,
    /// `true` from construction until `close` completes successfully.
    pub(super) is_open: RwLock<bool>,
    /// Change-data-capture log attached to sessions (`cdc` feature).
    #[cfg(feature = "cdc")]
    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
    /// Embedding models keyed by a string name (`embed` feature).
    #[cfg(feature = "embed")]
    pub(super) embedding_models:
        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
    /// Single-file storage manager; `None` unless single-file storage is in use.
    #[cfg(feature = "grafeo-file")]
    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
    /// Caller-supplied store set by `with_store`; when present, sessions and
    /// `graph_store` use it instead of `store`.
    pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
    /// Metrics registry attached to sessions (`metrics` feature).
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Graph name applied to newly created sessions; `None` means no graph is selected.
    current_graph: RwLock<Option<String>>,
}
124
125impl GrafeoDB {
126 #[must_use]
147 pub fn new_in_memory() -> Self {
148 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
149 }
150
/// Opens (or creates) a database at `path` using [`Config::persistent`].
///
/// # Errors
///
/// Returns any error produced by [`GrafeoDB::with_config`].
#[cfg(feature = "wal")]
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
    let db_path = path.as_ref();
    let persistent = Config::persistent(db_path);
    Self::with_config(persistent)
}
173
174 pub fn with_config(config: Config) -> Result<Self> {
198 config
200 .validate()
201 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
202
203 let store = Arc::new(LpgStore::new()?);
204 #[cfg(feature = "rdf")]
205 let rdf_store = Arc::new(RdfStore::new());
206 let transaction_manager = Arc::new(TransactionManager::new());
207
208 let buffer_config = BufferManagerConfig {
210 budget: config.memory_limit.unwrap_or_else(|| {
211 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
212 }),
213 spill_path: config
214 .spill_path
215 .clone()
216 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
217 ..BufferManagerConfig::default()
218 };
219 let buffer_manager = BufferManager::new(buffer_config);
220
221 let catalog = Arc::new(Catalog::new());
223
224 #[cfg(feature = "grafeo-file")]
226 let file_manager: Option<Arc<GrafeoFileManager>> = if config.wal_enabled {
227 if let Some(ref db_path) = config.path {
228 if Self::should_use_single_file(db_path, config.storage_format) {
229 let fm = if db_path.exists() && db_path.is_file() {
230 GrafeoFileManager::open(db_path)?
231 } else if !db_path.exists() {
232 GrafeoFileManager::create(db_path)?
233 } else {
234 return Err(grafeo_common::utils::error::Error::Internal(format!(
236 "path exists but is not a file: {}",
237 db_path.display()
238 )));
239 };
240
241 let snapshot_data = fm.read_snapshot()?;
243 if !snapshot_data.is_empty() {
244 Self::apply_snapshot_data(
245 &store,
246 &catalog,
247 #[cfg(feature = "rdf")]
248 &rdf_store,
249 &snapshot_data,
250 )?;
251 }
252
253 if fm.has_sidecar_wal() {
255 let recovery = WalRecovery::new(fm.sidecar_wal_path());
256 let records = recovery.recover()?;
257 Self::apply_wal_records(
258 &store,
259 &catalog,
260 #[cfg(feature = "rdf")]
261 &rdf_store,
262 &records,
263 )?;
264 }
265
266 Some(Arc::new(fm))
267 } else {
268 None
269 }
270 } else {
271 None
272 }
273 } else {
274 None
275 };
276
277 #[cfg(feature = "wal")]
279 let wal = if config.wal_enabled {
280 if let Some(ref db_path) = config.path {
281 #[cfg(feature = "grafeo-file")]
283 let wal_path = if let Some(ref fm) = file_manager {
284 let p = fm.sidecar_wal_path();
285 std::fs::create_dir_all(&p)?;
286 p
287 } else {
288 std::fs::create_dir_all(db_path)?;
290 db_path.join("wal")
291 };
292
293 #[cfg(not(feature = "grafeo-file"))]
294 let wal_path = {
295 std::fs::create_dir_all(db_path)?;
296 db_path.join("wal")
297 };
298
299 #[cfg(feature = "grafeo-file")]
301 let is_single_file = file_manager.is_some();
302 #[cfg(not(feature = "grafeo-file"))]
303 let is_single_file = false;
304
305 if !is_single_file && wal_path.exists() {
306 let recovery = WalRecovery::new(&wal_path);
307 let records = recovery.recover()?;
308 Self::apply_wal_records(
309 &store,
310 &catalog,
311 #[cfg(feature = "rdf")]
312 &rdf_store,
313 &records,
314 )?;
315 }
316
317 let wal_durability = match config.wal_durability {
319 crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
320 crate::config::DurabilityMode::Batch {
321 max_delay_ms,
322 max_records,
323 } => WalDurabilityMode::Batch {
324 max_delay_ms,
325 max_records,
326 },
327 crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
328 WalDurabilityMode::Adaptive { target_interval_ms }
329 }
330 crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
331 };
332 let wal_config = WalConfig {
333 durability: wal_durability,
334 ..WalConfig::default()
335 };
336 let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
337 Some(Arc::new(wal_manager))
338 } else {
339 None
340 }
341 } else {
342 None
343 };
344
345 let query_cache = Arc::new(QueryCache::default());
347
348 Ok(Self {
349 config,
350 store,
351 catalog,
352 #[cfg(feature = "rdf")]
353 rdf_store,
354 transaction_manager,
355 buffer_manager,
356 #[cfg(feature = "wal")]
357 wal,
358 #[cfg(feature = "wal")]
359 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
360 query_cache,
361 commit_counter: Arc::new(AtomicUsize::new(0)),
362 is_open: RwLock::new(true),
363 #[cfg(feature = "cdc")]
364 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
365 #[cfg(feature = "embed")]
366 embedding_models: RwLock::new(hashbrown::HashMap::new()),
367 #[cfg(feature = "grafeo-file")]
368 file_manager,
369 external_store: None,
370 #[cfg(feature = "metrics")]
371 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
372 current_graph: RwLock::new(None),
373 })
374 }
375
376 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
401 config
402 .validate()
403 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
404
405 let dummy_store = Arc::new(LpgStore::new()?);
406 let transaction_manager = Arc::new(TransactionManager::new());
407
408 let buffer_config = BufferManagerConfig {
409 budget: config.memory_limit.unwrap_or_else(|| {
410 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
411 }),
412 spill_path: None,
413 ..BufferManagerConfig::default()
414 };
415 let buffer_manager = BufferManager::new(buffer_config);
416
417 let query_cache = Arc::new(QueryCache::default());
418
419 Ok(Self {
420 config,
421 store: dummy_store,
422 catalog: Arc::new(Catalog::new()),
423 #[cfg(feature = "rdf")]
424 rdf_store: Arc::new(RdfStore::new()),
425 transaction_manager,
426 buffer_manager,
427 #[cfg(feature = "wal")]
428 wal: None,
429 #[cfg(feature = "wal")]
430 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
431 query_cache,
432 commit_counter: Arc::new(AtomicUsize::new(0)),
433 is_open: RwLock::new(true),
434 #[cfg(feature = "cdc")]
435 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
436 #[cfg(feature = "embed")]
437 embedding_models: RwLock::new(hashbrown::HashMap::new()),
438 #[cfg(feature = "grafeo-file")]
439 file_manager: None,
440 external_store: Some(store),
441 #[cfg(feature = "metrics")]
442 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
443 current_graph: RwLock::new(None),
444 })
445 }
446
/// Replays recovered WAL records into the store, the catalog and (with the
/// `rdf` feature) the RDF store.
///
/// Data records are applied to the graph most recently selected by a
/// `SwitchGraph` record; replay starts on the default store. Catalog
/// operations are best-effort: their results are deliberately ignored.
/// Index, constraint, transaction-marker and checkpoint records change no
/// state here.
///
/// # Errors
///
/// Returns an error when a node or edge cannot be created with its recorded
/// id, or when the graph named by a `SwitchGraph` record cannot be obtained.
#[cfg(feature = "wal")]
fn apply_wal_records(
    store: &Arc<LpgStore>,
    catalog: &Catalog,
    #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
    records: &[WalRecord],
) -> Result<()> {
    use crate::catalog::{
        EdgeTypeDefinition, GraphTypeDefinition, NodeTypeDefinition, ProcedureDefinition,
        PropertyDataType, TypeConstraint, TypedProperty,
    };
    use grafeo_common::utils::error::Error;

    // Name of the graph data records currently target (`None` = default graph).
    let mut active_graph: Option<String> = None;
    // Store backing `active_graph`; kept in sync with it at every switch/drop.
    let mut replay_target: Arc<LpgStore> = Arc::clone(store);

    for record in records {
        match record {
            // ---- named-graph management -------------------------------
            WalRecord::CreateNamedGraph { name } => {
                let _ = store.create_graph(name);
            }
            WalRecord::DropNamedGraph { name } => {
                store.drop_graph(name);
                // Never keep replaying into a graph that was just dropped.
                if active_graph.as_deref() == Some(name.as_str()) {
                    active_graph = None;
                    replay_target = Arc::clone(store);
                }
            }
            WalRecord::SwitchGraph { name } => {
                active_graph.clone_from(name);
                replay_target = match &active_graph {
                    Some(graph_name) => store
                        .graph_or_create(graph_name)
                        .map_err(|e| Error::Internal(e.to_string()))?,
                    None => Arc::clone(store),
                };
            }

            // ---- node / edge data -------------------------------------
            WalRecord::CreateNode { id, labels } => {
                let labels_as_str: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
                replay_target.create_node_with_id(*id, &labels_as_str)?;
            }
            WalRecord::DeleteNode { id } => {
                replay_target.delete_node(*id);
            }
            WalRecord::CreateEdge {
                id,
                src,
                dst,
                edge_type,
            } => {
                replay_target.create_edge_with_id(*id, *src, *dst, edge_type)?;
            }
            WalRecord::DeleteEdge { id } => {
                replay_target.delete_edge(*id);
            }
            WalRecord::SetNodeProperty { id, key, value } => {
                replay_target.set_node_property(*id, key, value.clone());
            }
            WalRecord::SetEdgeProperty { id, key, value } => {
                replay_target.set_edge_property(*id, key, value.clone());
            }
            WalRecord::AddNodeLabel { id, label } => {
                replay_target.add_label(*id, label);
            }
            WalRecord::RemoveNodeLabel { id, label } => {
                replay_target.remove_label(*id, label);
            }
            WalRecord::RemoveNodeProperty { id, key } => {
                replay_target.remove_node_property(*id, key);
            }
            WalRecord::RemoveEdgeProperty { id, key } => {
                replay_target.remove_edge_property(*id, key);
            }

            // ---- catalog: node / edge types ---------------------------
            WalRecord::CreateNodeType {
                name,
                properties,
                constraints,
            } => {
                let def = NodeTypeDefinition {
                    name: name.clone(),
                    properties: properties
                        .iter()
                        .map(|(prop, ty, nullable)| TypedProperty {
                            name: prop.clone(),
                            data_type: PropertyDataType::from_type_name(ty),
                            nullable: *nullable,
                            default_value: None,
                        })
                        .collect(),
                    constraints: constraints
                        .iter()
                        .map(|(kind, props)| match kind.as_str() {
                            "unique" => TypeConstraint::Unique(props.clone()),
                            "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
                            "not_null" if !props.is_empty() => {
                                TypeConstraint::NotNull(props[0].clone())
                            }
                            // Unknown kinds (and an empty `not_null`) fall back to `Unique`.
                            _ => TypeConstraint::Unique(props.clone()),
                        })
                        .collect(),
                    parent_types: Vec::new(),
                };
                let _ = catalog.register_node_type(def);
            }
            WalRecord::DropNodeType { name } => {
                let _ = catalog.drop_node_type(name);
            }
            WalRecord::CreateEdgeType {
                name,
                properties,
                constraints,
            } => {
                let def = EdgeTypeDefinition {
                    name: name.clone(),
                    properties: properties
                        .iter()
                        .map(|(prop, ty, nullable)| TypedProperty {
                            name: prop.clone(),
                            data_type: PropertyDataType::from_type_name(ty),
                            nullable: *nullable,
                            default_value: None,
                        })
                        .collect(),
                    constraints: constraints
                        .iter()
                        .map(|(kind, props)| match kind.as_str() {
                            "unique" => TypeConstraint::Unique(props.clone()),
                            "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
                            "not_null" if !props.is_empty() => {
                                TypeConstraint::NotNull(props[0].clone())
                            }
                            // Unknown kinds (and an empty `not_null`) fall back to `Unique`.
                            _ => TypeConstraint::Unique(props.clone()),
                        })
                        .collect(),
                    source_node_types: Vec::new(),
                    target_node_types: Vec::new(),
                };
                let _ = catalog.register_edge_type_def(def);
            }
            WalRecord::DropEdgeType { name } => {
                let _ = catalog.drop_edge_type_def(name);
            }

            // ---- catalog: graph types and schemas ---------------------
            WalRecord::CreateGraphType {
                name,
                node_types,
                edge_types,
                open,
            } => {
                let def = GraphTypeDefinition {
                    name: name.clone(),
                    allowed_node_types: node_types.clone(),
                    allowed_edge_types: edge_types.clone(),
                    open: *open,
                };
                let _ = catalog.register_graph_type(def);
            }
            WalRecord::DropGraphType { name } => {
                let _ = catalog.drop_graph_type(name);
            }
            WalRecord::CreateSchema { name } => {
                let _ = catalog.register_schema_namespace(name.clone());
            }
            WalRecord::DropSchema { name } => {
                let _ = catalog.drop_schema_namespace(name);
            }

            // ---- catalog: alterations (unknown actions are skipped) ---
            WalRecord::AlterNodeType { name, alterations } => {
                for (action, prop_name, type_name, nullable) in alterations {
                    match action.as_str() {
                        "add" => {
                            let prop = TypedProperty {
                                name: prop_name.clone(),
                                data_type: PropertyDataType::from_type_name(type_name),
                                nullable: *nullable,
                                default_value: None,
                            };
                            let _ = catalog.alter_node_type_add_property(name, prop);
                        }
                        "drop" => {
                            let _ = catalog.alter_node_type_drop_property(name, prop_name);
                        }
                        _ => {}
                    }
                }
            }
            WalRecord::AlterEdgeType { name, alterations } => {
                for (action, prop_name, type_name, nullable) in alterations {
                    match action.as_str() {
                        "add" => {
                            let prop = TypedProperty {
                                name: prop_name.clone(),
                                data_type: PropertyDataType::from_type_name(type_name),
                                nullable: *nullable,
                                default_value: None,
                            };
                            let _ = catalog.alter_edge_type_add_property(name, prop);
                        }
                        "drop" => {
                            let _ = catalog.alter_edge_type_drop_property(name, prop_name);
                        }
                        _ => {}
                    }
                }
            }
            WalRecord::AlterGraphType { name, alterations } => {
                for (action, type_name) in alterations {
                    match action.as_str() {
                        "add_node" => {
                            let _ =
                                catalog.alter_graph_type_add_node_type(name, type_name.clone());
                        }
                        "drop_node" => {
                            let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
                        }
                        "add_edge" => {
                            let _ =
                                catalog.alter_graph_type_add_edge_type(name, type_name.clone());
                        }
                        "drop_edge" => {
                            let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
                        }
                        _ => {}
                    }
                }
            }

            // ---- catalog: procedures ----------------------------------
            WalRecord::CreateProcedure {
                name,
                params,
                returns,
                body,
            } => {
                let def = ProcedureDefinition {
                    name: name.clone(),
                    params: params.clone(),
                    returns: returns.clone(),
                    body: body.clone(),
                };
                let _ = catalog.register_procedure(def);
            }
            WalRecord::DropProcedure { name } => {
                let _ = catalog.drop_procedure(name);
            }

            // ---- RDF records ------------------------------------------
            #[cfg(feature = "rdf")]
            WalRecord::InsertRdfTriple { .. }
            | WalRecord::DeleteRdfTriple { .. }
            | WalRecord::ClearRdfGraph { .. }
            | WalRecord::CreateRdfGraph { .. }
            | WalRecord::DropRdfGraph { .. } => {
                rdf_ops::replay_rdf_wal_record(rdf_store, record);
            }
            // Without RDF support these records are skipped.
            #[cfg(not(feature = "rdf"))]
            WalRecord::InsertRdfTriple { .. }
            | WalRecord::DeleteRdfTriple { .. }
            | WalRecord::ClearRdfGraph { .. }
            | WalRecord::CreateRdfGraph { .. }
            | WalRecord::DropRdfGraph { .. } => {}

            // ---- records that change no state during replay -----------
            WalRecord::CreateIndex { .. }
            | WalRecord::DropIndex { .. }
            | WalRecord::CreateConstraint { .. }
            | WalRecord::DropConstraint { .. }
            | WalRecord::TransactionCommit { .. }
            | WalRecord::TransactionAbort { .. }
            | WalRecord::Checkpoint { .. } => {}
        }
    }
    Ok(())
}
739
/// Decides whether `path` should be handled as a single-file database.
///
/// An explicit `SingleFile` / `WalDirectory` setting is returned as-is. With
/// `Auto`:
/// - an existing file counts only if its first four bytes equal the file
///   format's `MAGIC`;
/// - an existing directory never counts;
/// - a path that does not exist yet counts when its extension is `grafeo`.
#[cfg(feature = "grafeo-file")]
fn should_use_single_file(
    path: &std::path::Path,
    configured: crate::config::StorageFormat,
) -> bool {
    use crate::config::StorageFormat;
    use std::io::Read;

    match configured {
        StorageFormat::SingleFile => true,
        StorageFormat::WalDirectory => false,
        StorageFormat::Auto if path.is_file() => {
            let mut header = [0u8; 4];
            // Unreadable or too-short files are treated as "not single-file".
            let header_read = std::fs::File::open(path)
                .and_then(|mut file| file.read_exact(&mut header))
                .is_ok();
            header_read && header == grafeo_adapters::storage::file::MAGIC
        }
        StorageFormat::Auto if path.is_dir() => false,
        StorageFormat::Auto => path.extension().is_some_and(|ext| ext == "grafeo"),
    }
}
777
/// Loads serialized snapshot bytes into the given stores and catalog.
///
/// Thin forwarder to `persistence::load_snapshot_into_store`; its result is
/// returned unchanged.
#[cfg(feature = "grafeo-file")]
fn apply_snapshot_data(
    store: &Arc<LpgStore>,
    catalog: &Arc<crate::catalog::Catalog>,
    #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
    data: &[u8],
) -> Result<()> {
    let outcome = persistence::load_snapshot_into_store(
        store,
        catalog,
        #[cfg(feature = "rdf")]
        rdf_store,
        data,
    );
    outcome
}
794
795 #[must_use]
823 pub fn session(&self) -> Session {
824 let session_cfg = || crate::session::SessionConfig {
825 transaction_manager: Arc::clone(&self.transaction_manager),
826 query_cache: Arc::clone(&self.query_cache),
827 catalog: Arc::clone(&self.catalog),
828 adaptive_config: self.config.adaptive.clone(),
829 factorized_execution: self.config.factorized_execution,
830 graph_model: self.config.graph_model,
831 query_timeout: self.config.query_timeout,
832 commit_counter: Arc::clone(&self.commit_counter),
833 gc_interval: self.config.gc_interval,
834 };
835
836 if let Some(ref ext_store) = self.external_store {
837 return Session::with_external_store(Arc::clone(ext_store), session_cfg())
838 .expect("arena allocation for external store session");
839 }
840
841 #[cfg(feature = "rdf")]
842 let mut session = Session::with_rdf_store_and_adaptive(
843 Arc::clone(&self.store),
844 Arc::clone(&self.rdf_store),
845 session_cfg(),
846 );
847 #[cfg(not(feature = "rdf"))]
848 let mut session = Session::with_adaptive(Arc::clone(&self.store), session_cfg());
849
850 #[cfg(feature = "wal")]
851 if let Some(ref wal) = self.wal {
852 session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
853 }
854
855 #[cfg(feature = "cdc")]
856 session.set_cdc_log(Arc::clone(&self.cdc_log));
857
858 #[cfg(feature = "metrics")]
859 {
860 if let Some(ref m) = self.metrics {
861 session.set_metrics(Arc::clone(m));
862 m.session_created
863 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
864 m.session_active
865 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
866 }
867 }
868
869 if let Some(ref graph) = *self.current_graph.read() {
871 session.use_graph(graph);
872 }
873
874 let _ = &mut session;
876
877 session
878 }
879
880 #[must_use]
886 pub fn current_graph(&self) -> Option<String> {
887 self.current_graph.read().clone()
888 }
889
890 pub fn set_current_graph(&self, name: Option<&str>) {
895 *self.current_graph.write() = name.map(ToString::to_string);
896 }
897
898 #[must_use]
900 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
901 &self.config.adaptive
902 }
903
904 #[must_use]
906 pub fn config(&self) -> &Config {
907 &self.config
908 }
909
910 #[must_use]
912 pub fn graph_model(&self) -> crate::config::GraphModel {
913 self.config.graph_model
914 }
915
916 #[must_use]
918 pub fn memory_limit(&self) -> Option<usize> {
919 self.config.memory_limit
920 }
921
/// Returns a snapshot of the collected metrics.
///
/// Starts from the registry's snapshot (or a default snapshot when there is
/// no registry) and overwrites the cache fields with the query cache's
/// statistics, summing its parsed and optimized counters.
#[cfg(feature = "metrics")]
#[must_use]
pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
    let mut snapshot = match self.metrics.as_ref() {
        Some(registry) => registry.snapshot(),
        None => crate::metrics::MetricsSnapshot::default(),
    };

    let cache = self.query_cache.stats();
    snapshot.cache_hits = cache.parsed_hits + cache.optimized_hits;
    snapshot.cache_misses = cache.parsed_misses + cache.optimized_misses;
    snapshot.cache_size = cache.parsed_size + cache.optimized_size;
    snapshot.cache_invalidations = cache.invalidations;

    snapshot
}
943
/// Renders the metrics registry via its `to_prometheus` method; returns an
/// empty string when there is no registry.
#[cfg(feature = "metrics")]
#[must_use]
pub fn metrics_prometheus(&self) -> String {
    match self.metrics.as_ref() {
        Some(registry) => registry.to_prometheus(),
        None => String::new(),
    }
}
954
/// Resets the metrics registry (when present) and then the query cache
/// statistics.
#[cfg(feature = "metrics")]
pub fn reset_metrics(&self) {
    if let Some(registry) = self.metrics.as_ref() {
        registry.reset();
    }
    self.query_cache.reset_stats();
}
963
964 #[must_use]
972 pub fn store(&self) -> &Arc<LpgStore> {
973 &self.store
974 }
975
976 #[allow(dead_code)] fn active_store(&self) -> Arc<LpgStore> {
983 let graph_name = self.current_graph.read().clone();
984 match graph_name {
985 None => Arc::clone(&self.store),
986 Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
987 Some(ref name) => self
988 .store
989 .graph(name)
990 .unwrap_or_else(|| Arc::clone(&self.store)),
991 }
992 }
993
994 pub fn create_graph(&self, name: &str) -> Result<bool> {
1002 Ok(self.store.create_graph(name)?)
1003 }
1004
1005 pub fn drop_graph(&self, name: &str) -> bool {
1007 self.store.drop_graph(name)
1008 }
1009
1010 #[must_use]
1012 pub fn list_graphs(&self) -> Vec<String> {
1013 self.store.graph_names()
1014 }
1015
1016 #[must_use]
1024 pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
1025 if let Some(ref ext_store) = self.external_store {
1026 Arc::clone(ext_store)
1027 } else {
1028 Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
1029 }
1030 }
1031
1032 pub fn gc(&self) {
1038 let min_epoch = self.transaction_manager.min_active_epoch();
1039 self.store.gc_versions(min_epoch);
1040 self.transaction_manager.gc();
1041 }
1042
1043 #[must_use]
1045 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
1046 &self.buffer_manager
1047 }
1048
1049 #[must_use]
1051 pub fn query_cache(&self) -> &Arc<QueryCache> {
1052 &self.query_cache
1053 }
1054
1055 pub fn clear_plan_cache(&self) {
1061 self.query_cache.clear();
1062 }
1063
1064 pub fn close(&self) -> Result<()> {
1078 let mut is_open = self.is_open.write();
1079 if !*is_open {
1080 return Ok(());
1081 }
1082
1083 #[cfg(feature = "grafeo-file")]
1087 let is_single_file = self.file_manager.is_some();
1088 #[cfg(not(feature = "grafeo-file"))]
1089 let is_single_file = false;
1090
1091 #[cfg(feature = "grafeo-file")]
1092 if let Some(ref fm) = self.file_manager {
1093 #[cfg(feature = "wal")]
1095 if let Some(ref wal) = self.wal {
1096 wal.sync()?;
1097 }
1098 self.checkpoint_to_file(fm)?;
1099
1100 #[cfg(feature = "wal")]
1103 if let Some(ref wal) = self.wal {
1104 wal.close_active_log();
1105 }
1106
1107 fm.remove_sidecar_wal()?;
1108 fm.close()?;
1109 }
1110
1111 #[cfg(feature = "wal")]
1113 if !is_single_file && let Some(ref wal) = self.wal {
1114 let epoch = self.store.current_epoch();
1115
1116 let checkpoint_tx = self
1118 .transaction_manager
1119 .last_assigned_transaction_id()
1120 .unwrap_or_else(|| {
1121 self.transaction_manager.begin()
1123 });
1124
1125 wal.log(&WalRecord::TransactionCommit {
1127 transaction_id: checkpoint_tx,
1128 })?;
1129
1130 wal.checkpoint(checkpoint_tx, epoch)?;
1132 wal.sync()?;
1133 }
1134
1135 *is_open = false;
1136 Ok(())
1137 }
1138
/// Returns the write-ahead log, or `None` when this database has none.
#[cfg(feature = "wal")]
#[must_use]
pub fn wal(&self) -> Option<&Arc<LpgWal>> {
    Option::as_ref(&self.wal)
}
1145
/// Appends `record` to the WAL; does nothing when there is no WAL.
///
/// # Errors
///
/// Propagates the WAL's logging error.
#[cfg(feature = "wal")]
pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
    match self.wal.as_ref() {
        Some(wal) => {
            wal.log(record)?;
            Ok(())
        }
        None => Ok(()),
    }
}
1154
/// Exports a snapshot and writes it into the single-file database.
///
/// The snapshot is written together with the store's current epoch, the
/// last assigned transaction id (`0` when none was assigned) and the current
/// node and edge counts. `maybe_crash` hooks bracket each phase.
///
/// # Errors
///
/// Returns an error when the snapshot export or the file write fails.
#[cfg(feature = "grafeo-file")]
fn checkpoint_to_file(&self, fm: &GrafeoFileManager) -> Result<()> {
    use grafeo_core::testing::crash::maybe_crash;

    maybe_crash("checkpoint_to_file:before_export");
    let snapshot_bytes = self.export_snapshot()?;
    maybe_crash("checkpoint_to_file:after_export");

    let epoch = self.store.current_epoch();
    let last_tx = match self.transaction_manager.last_assigned_transaction_id() {
        Some(tx) => tx.0,
        None => 0,
    };
    let nodes = self.store.node_count() as u64;
    let edges = self.store.edge_count() as u64;

    fm.write_snapshot(&snapshot_bytes, epoch.0, last_tx, nodes, edges)?;

    maybe_crash("checkpoint_to_file:after_write_snapshot");
    Ok(())
}
1187
/// Returns the single-file storage manager, or `None` when single-file
/// storage is not in use.
#[cfg(feature = "grafeo-file")]
#[must_use]
pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
    Option::as_ref(&self.file_manager)
}
1194}
1195
1196impl Drop for GrafeoDB {
1197 fn drop(&mut self) {
1198 if let Err(e) = self.close() {
1199 tracing::error!("Error closing database: {}", e);
1200 }
1201 }
1202}
1203
1204impl crate::admin::AdminService for GrafeoDB {
1205 fn info(&self) -> crate::admin::DatabaseInfo {
1206 self.info()
1207 }
1208
1209 fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1210 self.detailed_stats()
1211 }
1212
1213 fn schema(&self) -> crate::admin::SchemaInfo {
1214 self.schema()
1215 }
1216
1217 fn validate(&self) -> crate::admin::ValidationResult {
1218 self.validate()
1219 }
1220
1221 fn wal_status(&self) -> crate::admin::WalStatus {
1222 self.wal_status()
1223 }
1224
1225 fn wal_checkpoint(&self) -> Result<()> {
1226 self.wal_checkpoint()
1227 }
1228}
1229
/// The outcome of a query: column metadata, result rows and optional
/// execution details.
#[derive(Debug)]
pub struct QueryResult {
    /// Column names, in output order.
    pub columns: Vec<String>,
    /// Logical type of each column.
    // NOTE(review): presumably parallel to `columns` (`new` builds one entry
    // per column) — `with_types` does not enforce equal lengths.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// Result rows; each row is a list of values.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Execution time in milliseconds, when recorded via `with_metrics`.
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned, when recorded via `with_metrics`.
    pub rows_scanned: Option<u64>,
    /// Optional status message, set by `QueryResult::status`.
    pub status_message: Option<String>,
    /// GQL status of the result; the constructors in this file set `SUCCESS`.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
1276
1277impl QueryResult {
1278 #[must_use]
1280 pub fn empty() -> Self {
1281 Self {
1282 columns: Vec::new(),
1283 column_types: Vec::new(),
1284 rows: Vec::new(),
1285 execution_time_ms: None,
1286 rows_scanned: None,
1287 status_message: None,
1288 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1289 }
1290 }
1291
1292 #[must_use]
1294 pub fn status(msg: impl Into<String>) -> Self {
1295 Self {
1296 columns: Vec::new(),
1297 column_types: Vec::new(),
1298 rows: Vec::new(),
1299 execution_time_ms: None,
1300 rows_scanned: None,
1301 status_message: Some(msg.into()),
1302 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1303 }
1304 }
1305
1306 #[must_use]
1308 pub fn new(columns: Vec<String>) -> Self {
1309 let len = columns.len();
1310 Self {
1311 columns,
1312 column_types: vec![grafeo_common::types::LogicalType::Any; len],
1313 rows: Vec::new(),
1314 execution_time_ms: None,
1315 rows_scanned: None,
1316 status_message: None,
1317 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1318 }
1319 }
1320
1321 #[must_use]
1323 pub fn with_types(
1324 columns: Vec<String>,
1325 column_types: Vec<grafeo_common::types::LogicalType>,
1326 ) -> Self {
1327 Self {
1328 columns,
1329 column_types,
1330 rows: Vec::new(),
1331 execution_time_ms: None,
1332 rows_scanned: None,
1333 status_message: None,
1334 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1335 }
1336 }
1337
1338 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1340 self.execution_time_ms = Some(execution_time_ms);
1341 self.rows_scanned = Some(rows_scanned);
1342 self
1343 }
1344
1345 #[must_use]
1347 pub fn execution_time_ms(&self) -> Option<f64> {
1348 self.execution_time_ms
1349 }
1350
1351 #[must_use]
1353 pub fn rows_scanned(&self) -> Option<u64> {
1354 self.rows_scanned
1355 }
1356
1357 #[must_use]
1359 pub fn row_count(&self) -> usize {
1360 self.rows.len()
1361 }
1362
1363 #[must_use]
1365 pub fn column_count(&self) -> usize {
1366 self.columns.len()
1367 }
1368
1369 #[must_use]
1371 pub fn is_empty(&self) -> bool {
1372 self.rows.is_empty()
1373 }
1374
1375 pub fn scalar<T: FromValue>(&self) -> Result<T> {
1384 if self.rows.len() != 1 || self.columns.len() != 1 {
1385 return Err(grafeo_common::utils::error::Error::InvalidValue(
1386 "Expected single value".to_string(),
1387 ));
1388 }
1389 T::from_value(&self.rows[0][0])
1390 }
1391
1392 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1394 self.rows.iter()
1395 }
1396}
1397
1398impl std::fmt::Display for QueryResult {
1399 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1400 let table = grafeo_common::fmt::format_result_table(
1401 &self.columns,
1402 &self.rows,
1403 self.execution_time_ms,
1404 self.status_message.as_deref(),
1405 );
1406 f.write_str(&table)
1407 }
1408}
1409
/// Conversion from a database [`Value`](grafeo_common::types::Value) into a
/// Rust type; used by [`QueryResult::scalar`].
pub trait FromValue: Sized {
    /// Converts `value` into `Self`.
    ///
    /// # Errors
    ///
    /// Implementations in this file return `Error::TypeMismatch` when the
    /// value does not hold the expected type.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
1418
1419impl FromValue for i64 {
1420 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1421 value
1422 .as_int64()
1423 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1424 expected: "INT64".to_string(),
1425 found: value.type_name().to_string(),
1426 })
1427 }
1428}
1429
1430impl FromValue for f64 {
1431 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1432 value
1433 .as_float64()
1434 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1435 expected: "FLOAT64".to_string(),
1436 found: value.type_name().to_string(),
1437 })
1438 }
1439}
1440
1441impl FromValue for String {
1442 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1443 value.as_str().map(String::from).ok_or_else(|| {
1444 grafeo_common::utils::error::Error::TypeMismatch {
1445 expected: "STRING".to_string(),
1446 found: value.type_name().to_string(),
1447 }
1448 })
1449 }
1450}
1451
1452impl FromValue for bool {
1453 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1454 value
1455 .as_bool()
1456 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1457 expected: "BOOL".to_string(),
1458 found: value.type_name().to_string(),
1459 })
1460 }
1461}
1462
1463#[cfg(test)]
1464mod tests {
1465 use super::*;
1466
/// A fresh in-memory database holds no nodes and no edges.
#[test]
fn test_create_in_memory_database() {
    let database = GrafeoDB::new_in_memory();
    let nodes = database.node_count();
    let edges = database.edge_count();
    assert_eq!(nodes, 0);
    assert_eq!(edges, 0);
}
1473
/// Settings passed to `with_config` are visible through `config()`.
#[test]
fn test_database_config() {
    let custom = Config::in_memory().with_threads(4).with_query_logging();
    let database = GrafeoDB::with_config(custom).unwrap();

    let effective = database.config();
    assert_eq!(effective.threads, 4);
    assert!(effective.query_logging);
}
1482
/// Creating a session on an in-memory database must not panic.
#[test]
fn test_database_session() {
    let database = GrafeoDB::new_in_memory();
    let _session = database.session();
}
1489
/// Nodes and edges written before `close` are present after reopening.
#[cfg(feature = "wal")]
#[test]
fn test_persistent_database_recovery() {
    use grafeo_common::types::{NodeId, Value};
    use tempfile::tempdir;

    let scratch = tempdir().unwrap();
    let db_path = scratch.path().join("test_db");

    // First lifetime: write two people and one relationship, then close.
    {
        let writer = GrafeoDB::open(&db_path).unwrap();

        let alix = writer.create_node(&["Person"]);
        writer.set_node_property(alix, "name", Value::from("Alix"));

        let gus = writer.create_node(&["Person"]);
        writer.set_node_property(gus, "name", Value::from("Gus"));

        let _edge = writer.create_edge(alix, gus, "KNOWS");

        writer.close().unwrap();
    }

    // Second lifetime: everything must have been recovered.
    {
        let reader = GrafeoDB::open(&db_path).unwrap();

        assert_eq!(reader.node_count(), 2);
        assert_eq!(reader.edge_count(), 1);

        let first = reader.get_node(NodeId::new(0));
        assert!(first.is_some());

        let second = reader.get_node(NodeId::new(1));
        assert!(second.is_some());
    }
}
1530
/// Mutations on a persistent database produce WAL records.
#[cfg(feature = "wal")]
#[test]
fn test_wal_logging() {
    use tempfile::tempdir;

    let scratch = tempdir().unwrap();
    let db_path = scratch.path().join("wal_test_db");
    let database = GrafeoDB::open(&db_path).unwrap();

    let created = database.create_node(&["Test"]);
    database.delete_node(created);

    // Only checked when a WAL is actually attached.
    if let Some(log) = database.wal() {
        assert!(log.record_count() > 0);
    }

    database.close().unwrap();
}
1552
1553 #[cfg(feature = "wal")]
1554 #[test]
1555 fn test_wal_recovery_multiple_sessions() {
1556 use grafeo_common::types::Value;
1558 use tempfile::tempdir;
1559
1560 let dir = tempdir().unwrap();
1561 let db_path = dir.path().join("multi_session_db");
1562
1563 {
1565 let db = GrafeoDB::open(&db_path).unwrap();
1566 let alix = db.create_node(&["Person"]);
1567 db.set_node_property(alix, "name", Value::from("Alix"));
1568 db.close().unwrap();
1569 }
1570
1571 {
1573 let db = GrafeoDB::open(&db_path).unwrap();
1574 assert_eq!(db.node_count(), 1); let gus = db.create_node(&["Person"]);
1576 db.set_node_property(gus, "name", Value::from("Gus"));
1577 db.close().unwrap();
1578 }
1579
1580 {
1582 let db = GrafeoDB::open(&db_path).unwrap();
1583 assert_eq!(db.node_count(), 2);
1584
1585 let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
1587 assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
1588
1589 let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
1590 assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
1591 }
1592 }
1593
1594 #[cfg(feature = "wal")]
1595 #[test]
1596 fn test_database_consistency_after_mutations() {
1597 use grafeo_common::types::Value;
1599 use tempfile::tempdir;
1600
1601 let dir = tempdir().unwrap();
1602 let db_path = dir.path().join("consistency_db");
1603
1604 {
1605 let db = GrafeoDB::open(&db_path).unwrap();
1606
1607 let a = db.create_node(&["Node"]);
1609 let b = db.create_node(&["Node"]);
1610 let c = db.create_node(&["Node"]);
1611
1612 let e1 = db.create_edge(a, b, "LINKS");
1614 let _e2 = db.create_edge(b, c, "LINKS");
1615
1616 db.delete_edge(e1);
1618 db.delete_node(b);
1619
1620 db.set_node_property(a, "value", Value::Int64(1));
1622 db.set_node_property(c, "value", Value::Int64(3));
1623
1624 db.close().unwrap();
1625 }
1626
1627 {
1629 let db = GrafeoDB::open(&db_path).unwrap();
1630
1631 let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
1635 assert!(node_a.is_some());
1636
1637 let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
1638 assert!(node_c.is_some());
1639
1640 let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
1642 assert!(node_b.is_none());
1643 }
1644 }
1645
1646 #[cfg(feature = "wal")]
1647 #[test]
1648 fn test_close_is_idempotent() {
1649 use tempfile::tempdir;
1651
1652 let dir = tempdir().unwrap();
1653 let db_path = dir.path().join("close_test_db");
1654
1655 let db = GrafeoDB::open(&db_path).unwrap();
1656 db.create_node(&["Test"]);
1657
1658 assert!(db.close().is_ok());
1660
1661 assert!(db.close().is_ok());
1663 }
1664
1665 #[test]
1666 fn test_with_store_external_backend() {
1667 use grafeo_core::graph::lpg::LpgStore;
1668
1669 let external = Arc::new(LpgStore::new().unwrap());
1670
1671 let n1 = external.create_node(&["Person"]);
1673 external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
1674
1675 let db = GrafeoDB::with_store(
1676 Arc::clone(&external) as Arc<dyn GraphStoreMut>,
1677 Config::in_memory(),
1678 )
1679 .unwrap();
1680
1681 let session = db.session();
1682
1683 #[cfg(feature = "gql")]
1685 {
1686 let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
1687 assert_eq!(result.rows.len(), 1);
1688 }
1689 }
1690
1691 #[test]
1692 fn test_with_config_custom_memory_limit() {
1693 let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); let db = GrafeoDB::with_config(config).unwrap();
1696 assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
1697 assert_eq!(db.node_count(), 0);
1698 }
1699
1700 #[cfg(feature = "metrics")]
1701 #[test]
1702 fn test_database_metrics_registry() {
1703 let db = GrafeoDB::new_in_memory();
1704
1705 db.create_node(&["Person"]);
1707 db.create_node(&["Person"]);
1708
1709 let snap = db.metrics();
1711 assert_eq!(snap.query_count, 0); }
1714
1715 #[test]
1716 fn test_query_result_has_metrics() {
1717 let db = GrafeoDB::new_in_memory();
1719 db.create_node(&["Person"]);
1720 db.create_node(&["Person"]);
1721
1722 #[cfg(feature = "gql")]
1723 {
1724 let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
1725
1726 assert!(result.execution_time_ms.is_some());
1728 assert!(result.rows_scanned.is_some());
1729 assert!(result.execution_time_ms.unwrap() >= 0.0);
1730 assert_eq!(result.rows_scanned.unwrap(), 2);
1731 }
1732 }
1733
1734 #[test]
1735 fn test_empty_query_result_metrics() {
1736 let db = GrafeoDB::new_in_memory();
1738 db.create_node(&["Person"]);
1739
1740 #[cfg(feature = "gql")]
1741 {
1742 let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
1744
1745 assert!(result.execution_time_ms.is_some());
1746 assert!(result.rows_scanned.is_some());
1747 assert_eq!(result.rows_scanned.unwrap(), 0);
1748 }
1749 }
1750
1751 #[cfg(feature = "cdc")]
1752 mod cdc_integration {
1753 use super::*;
1754
1755 #[test]
1756 fn test_node_lifecycle_history() {
1757 let db = GrafeoDB::new_in_memory();
1758
1759 let id = db.create_node(&["Person"]);
1761 db.set_node_property(id, "name", "Alix".into());
1763 db.set_node_property(id, "name", "Gus".into());
1764 db.delete_node(id);
1766
1767 let history = db.history(id).unwrap();
1768 assert_eq!(history.len(), 4); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1770 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1771 assert!(history[1].before.is_none()); assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
1773 assert!(history[2].before.is_some()); assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
1775 }
1776
1777 #[test]
1778 fn test_edge_lifecycle_history() {
1779 let db = GrafeoDB::new_in_memory();
1780
1781 let alix = db.create_node(&["Person"]);
1782 let gus = db.create_node(&["Person"]);
1783 let edge = db.create_edge(alix, gus, "KNOWS");
1784 db.set_edge_property(edge, "since", 2024i64.into());
1785 db.delete_edge(edge);
1786
1787 let history = db.history(edge).unwrap();
1788 assert_eq!(history.len(), 3); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1790 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1791 assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
1792 }
1793
1794 #[test]
1795 fn test_create_node_with_props_cdc() {
1796 let db = GrafeoDB::new_in_memory();
1797
1798 let id = db.create_node_with_props(
1799 &["Person"],
1800 vec![
1801 ("name", grafeo_common::types::Value::from("Alix")),
1802 ("age", grafeo_common::types::Value::from(30i64)),
1803 ],
1804 );
1805
1806 let history = db.history(id).unwrap();
1807 assert_eq!(history.len(), 1);
1808 assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1809 let after = history[0].after.as_ref().unwrap();
1811 assert_eq!(after.len(), 2);
1812 }
1813
1814 #[test]
1815 fn test_changes_between() {
1816 let db = GrafeoDB::new_in_memory();
1817
1818 let id1 = db.create_node(&["A"]);
1819 let _id2 = db.create_node(&["B"]);
1820 db.set_node_property(id1, "x", 1i64.into());
1821
1822 let changes = db
1824 .changes_between(
1825 grafeo_common::types::EpochId(0),
1826 grafeo_common::types::EpochId(u64::MAX),
1827 )
1828 .unwrap();
1829 assert_eq!(changes.len(), 3); }
1831 }
1832
1833 #[test]
1834 fn test_with_store_basic() {
1835 use grafeo_core::graph::lpg::LpgStore;
1836
1837 let store = Arc::new(LpgStore::new().unwrap());
1838 let n1 = store.create_node(&["Person"]);
1839 store.set_node_property(n1, "name", "Alix".into());
1840
1841 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1842 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1843
1844 let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
1845 assert_eq!(result.rows.len(), 1);
1846 }
1847
1848 #[test]
1849 fn test_with_store_session() {
1850 use grafeo_core::graph::lpg::LpgStore;
1851
1852 let store = Arc::new(LpgStore::new().unwrap());
1853 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1854 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1855
1856 let session = db.session();
1857 let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
1858 assert_eq!(result.rows.len(), 1);
1859 }
1860
1861 #[test]
1862 fn test_with_store_mutations() {
1863 use grafeo_core::graph::lpg::LpgStore;
1864
1865 let store = Arc::new(LpgStore::new().unwrap());
1866 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1867 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1868
1869 let mut session = db.session();
1870
1871 session.begin_transaction().unwrap();
1875 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1876
1877 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
1878 assert_eq!(result.rows.len(), 1);
1879
1880 session.commit().unwrap();
1881 }
1882}