1mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21#[cfg(feature = "rdf")]
22mod rdf_ops;
23mod search;
24#[cfg(feature = "wal")]
25pub(crate) mod wal_store;
26
27#[cfg(feature = "wal")]
28use std::path::Path;
29use std::sync::Arc;
30use std::sync::atomic::AtomicUsize;
31
32use parking_lot::RwLock;
33
34#[cfg(feature = "grafeo-file")]
35use grafeo_adapters::storage::file::GrafeoFileManager;
36#[cfg(feature = "wal")]
37use grafeo_adapters::storage::wal::{
38 DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
39};
40use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
41use grafeo_common::utils::error::Result;
42use grafeo_core::graph::GraphStoreMut;
43use grafeo_core::graph::lpg::LpgStore;
44#[cfg(feature = "rdf")]
45use grafeo_core::graph::rdf::RdfStore;
46
47use crate::catalog::Catalog;
48use crate::config::Config;
49use crate::query::cache::QueryCache;
50use crate::session::Session;
51use crate::transaction::TransactionManager;
52
53pub struct GrafeoDB {
76 pub(super) config: Config,
78 pub(super) store: Arc<LpgStore>,
80 pub(super) catalog: Arc<Catalog>,
82 #[cfg(feature = "rdf")]
84 pub(super) rdf_store: Arc<RdfStore>,
85 pub(super) transaction_manager: Arc<TransactionManager>,
87 pub(super) buffer_manager: Arc<BufferManager>,
89 #[cfg(feature = "wal")]
91 pub(super) wal: Option<Arc<LpgWal>>,
92 #[cfg(feature = "wal")]
96 pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
97 pub(super) query_cache: Arc<QueryCache>,
99 pub(super) commit_counter: Arc<AtomicUsize>,
101 pub(super) is_open: RwLock<bool>,
103 #[cfg(feature = "cdc")]
105 pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
106 #[cfg(feature = "embed")]
108 pub(super) embedding_models:
109 RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
110 #[cfg(feature = "grafeo-file")]
112 pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
113 pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
116 #[cfg(feature = "metrics")]
118 pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
119 current_graph: RwLock<Option<String>>,
123}
124
125impl GrafeoDB {
126 #[must_use]
147 pub fn new_in_memory() -> Self {
148 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
149 }
150
151 #[cfg(feature = "wal")]
170 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
171 Self::with_config(Config::persistent(path.as_ref()))
172 }
173
174 pub fn with_config(config: Config) -> Result<Self> {
198 config
200 .validate()
201 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
202
203 let store = Arc::new(LpgStore::new()?);
204 #[cfg(feature = "rdf")]
205 let rdf_store = Arc::new(RdfStore::new());
206 let transaction_manager = Arc::new(TransactionManager::new());
207
208 let buffer_config = BufferManagerConfig {
210 budget: config.memory_limit.unwrap_or_else(|| {
211 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
212 }),
213 spill_path: config
214 .spill_path
215 .clone()
216 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
217 ..BufferManagerConfig::default()
218 };
219 let buffer_manager = BufferManager::new(buffer_config);
220
221 let catalog = Arc::new(Catalog::new());
223
224 #[cfg(feature = "grafeo-file")]
226 let file_manager: Option<Arc<GrafeoFileManager>> = if config.wal_enabled {
227 if let Some(ref db_path) = config.path {
228 if Self::should_use_single_file(db_path, config.storage_format) {
229 let fm = if db_path.exists() && db_path.is_file() {
230 GrafeoFileManager::open(db_path)?
231 } else if !db_path.exists() {
232 GrafeoFileManager::create(db_path)?
233 } else {
234 return Err(grafeo_common::utils::error::Error::Internal(format!(
236 "path exists but is not a file: {}",
237 db_path.display()
238 )));
239 };
240
241 let snapshot_data = fm.read_snapshot()?;
243 if !snapshot_data.is_empty() {
244 Self::apply_snapshot_data(
245 &store,
246 &catalog,
247 #[cfg(feature = "rdf")]
248 &rdf_store,
249 &snapshot_data,
250 )?;
251 }
252
253 if fm.has_sidecar_wal() {
255 let recovery = WalRecovery::new(fm.sidecar_wal_path());
256 let records = recovery.recover()?;
257 Self::apply_wal_records(
258 &store,
259 &catalog,
260 #[cfg(feature = "rdf")]
261 &rdf_store,
262 &records,
263 )?;
264 }
265
266 Some(Arc::new(fm))
267 } else {
268 None
269 }
270 } else {
271 None
272 }
273 } else {
274 None
275 };
276
277 #[cfg(feature = "wal")]
279 let wal = if config.wal_enabled {
280 if let Some(ref db_path) = config.path {
281 #[cfg(feature = "grafeo-file")]
283 let wal_path = if let Some(ref fm) = file_manager {
284 let p = fm.sidecar_wal_path();
285 std::fs::create_dir_all(&p)?;
286 p
287 } else {
288 std::fs::create_dir_all(db_path)?;
290 db_path.join("wal")
291 };
292
293 #[cfg(not(feature = "grafeo-file"))]
294 let wal_path = {
295 std::fs::create_dir_all(db_path)?;
296 db_path.join("wal")
297 };
298
299 #[cfg(feature = "grafeo-file")]
301 let is_single_file = file_manager.is_some();
302 #[cfg(not(feature = "grafeo-file"))]
303 let is_single_file = false;
304
305 if !is_single_file && wal_path.exists() {
306 let recovery = WalRecovery::new(&wal_path);
307 let records = recovery.recover()?;
308 Self::apply_wal_records(
309 &store,
310 &catalog,
311 #[cfg(feature = "rdf")]
312 &rdf_store,
313 &records,
314 )?;
315 }
316
317 let wal_durability = match config.wal_durability {
319 crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
320 crate::config::DurabilityMode::Batch {
321 max_delay_ms,
322 max_records,
323 } => WalDurabilityMode::Batch {
324 max_delay_ms,
325 max_records,
326 },
327 crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
328 WalDurabilityMode::Adaptive { target_interval_ms }
329 }
330 crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
331 };
332 let wal_config = WalConfig {
333 durability: wal_durability,
334 ..WalConfig::default()
335 };
336 let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
337 Some(Arc::new(wal_manager))
338 } else {
339 None
340 }
341 } else {
342 None
343 };
344
345 let query_cache = Arc::new(QueryCache::default());
347
348 Ok(Self {
349 config,
350 store,
351 catalog,
352 #[cfg(feature = "rdf")]
353 rdf_store,
354 transaction_manager,
355 buffer_manager,
356 #[cfg(feature = "wal")]
357 wal,
358 #[cfg(feature = "wal")]
359 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
360 query_cache,
361 commit_counter: Arc::new(AtomicUsize::new(0)),
362 is_open: RwLock::new(true),
363 #[cfg(feature = "cdc")]
364 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
365 #[cfg(feature = "embed")]
366 embedding_models: RwLock::new(hashbrown::HashMap::new()),
367 #[cfg(feature = "grafeo-file")]
368 file_manager,
369 external_store: None,
370 #[cfg(feature = "metrics")]
371 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
372 current_graph: RwLock::new(None),
373 })
374 }
375
376 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
401 config
402 .validate()
403 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
404
405 let dummy_store = Arc::new(LpgStore::new()?);
406 let transaction_manager = Arc::new(TransactionManager::new());
407
408 let buffer_config = BufferManagerConfig {
409 budget: config.memory_limit.unwrap_or_else(|| {
410 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
411 }),
412 spill_path: None,
413 ..BufferManagerConfig::default()
414 };
415 let buffer_manager = BufferManager::new(buffer_config);
416
417 let query_cache = Arc::new(QueryCache::default());
418
419 Ok(Self {
420 config,
421 store: dummy_store,
422 catalog: Arc::new(Catalog::new()),
423 #[cfg(feature = "rdf")]
424 rdf_store: Arc::new(RdfStore::new()),
425 transaction_manager,
426 buffer_manager,
427 #[cfg(feature = "wal")]
428 wal: None,
429 #[cfg(feature = "wal")]
430 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
431 query_cache,
432 commit_counter: Arc::new(AtomicUsize::new(0)),
433 is_open: RwLock::new(true),
434 #[cfg(feature = "cdc")]
435 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
436 #[cfg(feature = "embed")]
437 embedding_models: RwLock::new(hashbrown::HashMap::new()),
438 #[cfg(feature = "grafeo-file")]
439 file_manager: None,
440 external_store: Some(store),
441 #[cfg(feature = "metrics")]
442 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
443 current_graph: RwLock::new(None),
444 })
445 }
446
447 #[cfg(feature = "wal")]
453 fn apply_wal_records(
454 store: &Arc<LpgStore>,
455 catalog: &Catalog,
456 #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
457 records: &[WalRecord],
458 ) -> Result<()> {
459 use crate::catalog::{
460 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
461 };
462 use grafeo_common::utils::error::Error;
463
464 let mut current_graph: Option<String> = None;
467 let mut target_store: Arc<LpgStore> = Arc::clone(store);
468
469 for record in records {
470 match record {
471 WalRecord::CreateNamedGraph { name } => {
473 let _ = store.create_graph(name);
474 }
475 WalRecord::DropNamedGraph { name } => {
476 store.drop_graph(name);
477 if current_graph.as_deref() == Some(name.as_str()) {
479 current_graph = None;
480 target_store = Arc::clone(store);
481 }
482 }
483 WalRecord::SwitchGraph { name } => {
484 current_graph.clone_from(name);
485 target_store = match ¤t_graph {
486 None => Arc::clone(store),
487 Some(graph_name) => store
488 .graph_or_create(graph_name)
489 .map_err(|e| Error::Internal(e.to_string()))?,
490 };
491 }
492
493 WalRecord::CreateNode { id, labels } => {
495 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
496 target_store.create_node_with_id(*id, &label_refs)?;
497 }
498 WalRecord::DeleteNode { id } => {
499 target_store.delete_node(*id);
500 }
501 WalRecord::CreateEdge {
502 id,
503 src,
504 dst,
505 edge_type,
506 } => {
507 target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
508 }
509 WalRecord::DeleteEdge { id } => {
510 target_store.delete_edge(*id);
511 }
512 WalRecord::SetNodeProperty { id, key, value } => {
513 target_store.set_node_property(*id, key, value.clone());
514 }
515 WalRecord::SetEdgeProperty { id, key, value } => {
516 target_store.set_edge_property(*id, key, value.clone());
517 }
518 WalRecord::AddNodeLabel { id, label } => {
519 target_store.add_label(*id, label);
520 }
521 WalRecord::RemoveNodeLabel { id, label } => {
522 target_store.remove_label(*id, label);
523 }
524 WalRecord::RemoveNodeProperty { id, key } => {
525 target_store.remove_node_property(*id, key);
526 }
527 WalRecord::RemoveEdgeProperty { id, key } => {
528 target_store.remove_edge_property(*id, key);
529 }
530
531 WalRecord::CreateNodeType {
533 name,
534 properties,
535 constraints,
536 } => {
537 let def = NodeTypeDefinition {
538 name: name.clone(),
539 properties: properties
540 .iter()
541 .map(|(n, t, nullable)| TypedProperty {
542 name: n.clone(),
543 data_type: PropertyDataType::from_type_name(t),
544 nullable: *nullable,
545 default_value: None,
546 })
547 .collect(),
548 constraints: constraints
549 .iter()
550 .map(|(kind, props)| match kind.as_str() {
551 "unique" => TypeConstraint::Unique(props.clone()),
552 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
553 "not_null" if !props.is_empty() => {
554 TypeConstraint::NotNull(props[0].clone())
555 }
556 _ => TypeConstraint::Unique(props.clone()),
557 })
558 .collect(),
559 parent_types: Vec::new(),
560 };
561 let _ = catalog.register_node_type(def);
562 }
563 WalRecord::DropNodeType { name } => {
564 let _ = catalog.drop_node_type(name);
565 }
566 WalRecord::CreateEdgeType {
567 name,
568 properties,
569 constraints,
570 } => {
571 let def = EdgeTypeDefinition {
572 name: name.clone(),
573 properties: properties
574 .iter()
575 .map(|(n, t, nullable)| TypedProperty {
576 name: n.clone(),
577 data_type: PropertyDataType::from_type_name(t),
578 nullable: *nullable,
579 default_value: None,
580 })
581 .collect(),
582 constraints: constraints
583 .iter()
584 .map(|(kind, props)| match kind.as_str() {
585 "unique" => TypeConstraint::Unique(props.clone()),
586 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
587 "not_null" if !props.is_empty() => {
588 TypeConstraint::NotNull(props[0].clone())
589 }
590 _ => TypeConstraint::Unique(props.clone()),
591 })
592 .collect(),
593 source_node_types: Vec::new(),
594 target_node_types: Vec::new(),
595 };
596 let _ = catalog.register_edge_type_def(def);
597 }
598 WalRecord::DropEdgeType { name } => {
599 let _ = catalog.drop_edge_type_def(name);
600 }
601 WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
602 }
605 WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
606 }
609 WalRecord::CreateGraphType {
610 name,
611 node_types,
612 edge_types,
613 open,
614 } => {
615 use crate::catalog::GraphTypeDefinition;
616 let def = GraphTypeDefinition {
617 name: name.clone(),
618 allowed_node_types: node_types.clone(),
619 allowed_edge_types: edge_types.clone(),
620 open: *open,
621 };
622 let _ = catalog.register_graph_type(def);
623 }
624 WalRecord::DropGraphType { name } => {
625 let _ = catalog.drop_graph_type(name);
626 }
627 WalRecord::CreateSchema { name } => {
628 let _ = catalog.register_schema_namespace(name.clone());
629 }
630 WalRecord::DropSchema { name } => {
631 let _ = catalog.drop_schema_namespace(name);
632 }
633
634 WalRecord::AlterNodeType { name, alterations } => {
635 for (action, prop_name, type_name, nullable) in alterations {
636 match action.as_str() {
637 "add" => {
638 let prop = TypedProperty {
639 name: prop_name.clone(),
640 data_type: PropertyDataType::from_type_name(type_name),
641 nullable: *nullable,
642 default_value: None,
643 };
644 let _ = catalog.alter_node_type_add_property(name, prop);
645 }
646 "drop" => {
647 let _ = catalog.alter_node_type_drop_property(name, prop_name);
648 }
649 _ => {}
650 }
651 }
652 }
653 WalRecord::AlterEdgeType { name, alterations } => {
654 for (action, prop_name, type_name, nullable) in alterations {
655 match action.as_str() {
656 "add" => {
657 let prop = TypedProperty {
658 name: prop_name.clone(),
659 data_type: PropertyDataType::from_type_name(type_name),
660 nullable: *nullable,
661 default_value: None,
662 };
663 let _ = catalog.alter_edge_type_add_property(name, prop);
664 }
665 "drop" => {
666 let _ = catalog.alter_edge_type_drop_property(name, prop_name);
667 }
668 _ => {}
669 }
670 }
671 }
672 WalRecord::AlterGraphType { name, alterations } => {
673 for (action, type_name) in alterations {
674 match action.as_str() {
675 "add_node" => {
676 let _ =
677 catalog.alter_graph_type_add_node_type(name, type_name.clone());
678 }
679 "drop_node" => {
680 let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
681 }
682 "add_edge" => {
683 let _ =
684 catalog.alter_graph_type_add_edge_type(name, type_name.clone());
685 }
686 "drop_edge" => {
687 let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
688 }
689 _ => {}
690 }
691 }
692 }
693
694 WalRecord::CreateProcedure {
695 name,
696 params,
697 returns,
698 body,
699 } => {
700 use crate::catalog::ProcedureDefinition;
701 let def = ProcedureDefinition {
702 name: name.clone(),
703 params: params.clone(),
704 returns: returns.clone(),
705 body: body.clone(),
706 };
707 let _ = catalog.register_procedure(def);
708 }
709 WalRecord::DropProcedure { name } => {
710 let _ = catalog.drop_procedure(name);
711 }
712
713 #[cfg(feature = "rdf")]
715 WalRecord::InsertRdfTriple { .. }
716 | WalRecord::DeleteRdfTriple { .. }
717 | WalRecord::ClearRdfGraph { .. }
718 | WalRecord::CreateRdfGraph { .. }
719 | WalRecord::DropRdfGraph { .. } => {
720 rdf_ops::replay_rdf_wal_record(rdf_store, record);
721 }
722 #[cfg(not(feature = "rdf"))]
723 WalRecord::InsertRdfTriple { .. }
724 | WalRecord::DeleteRdfTriple { .. }
725 | WalRecord::ClearRdfGraph { .. }
726 | WalRecord::CreateRdfGraph { .. }
727 | WalRecord::DropRdfGraph { .. } => {}
728
729 WalRecord::TransactionCommit { .. }
730 | WalRecord::TransactionAbort { .. }
731 | WalRecord::Checkpoint { .. } => {
732 }
735 }
736 }
737 Ok(())
738 }
739
740 #[cfg(feature = "grafeo-file")]
746 fn should_use_single_file(
747 path: &std::path::Path,
748 configured: crate::config::StorageFormat,
749 ) -> bool {
750 use crate::config::StorageFormat;
751 match configured {
752 StorageFormat::SingleFile => true,
753 StorageFormat::WalDirectory => false,
754 StorageFormat::Auto => {
755 if path.is_file() {
757 if let Ok(mut f) = std::fs::File::open(path) {
758 use std::io::Read;
759 let mut magic = [0u8; 4];
760 if f.read_exact(&mut magic).is_ok()
761 && magic == grafeo_adapters::storage::file::MAGIC
762 {
763 return true;
764 }
765 }
766 return false;
767 }
768 if path.is_dir() {
770 return false;
771 }
772 path.extension().is_some_and(|ext| ext == "grafeo")
774 }
775 }
776 }
777
778 #[cfg(feature = "grafeo-file")]
780 fn apply_snapshot_data(
781 store: &Arc<LpgStore>,
782 catalog: &Arc<crate::catalog::Catalog>,
783 #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
784 data: &[u8],
785 ) -> Result<()> {
786 persistence::load_snapshot_into_store(
787 store,
788 catalog,
789 #[cfg(feature = "rdf")]
790 rdf_store,
791 data,
792 )
793 }
794
795 #[must_use]
823 pub fn session(&self) -> Session {
824 let session_cfg = || crate::session::SessionConfig {
825 transaction_manager: Arc::clone(&self.transaction_manager),
826 query_cache: Arc::clone(&self.query_cache),
827 catalog: Arc::clone(&self.catalog),
828 adaptive_config: self.config.adaptive.clone(),
829 factorized_execution: self.config.factorized_execution,
830 graph_model: self.config.graph_model,
831 query_timeout: self.config.query_timeout,
832 commit_counter: Arc::clone(&self.commit_counter),
833 gc_interval: self.config.gc_interval,
834 };
835
836 if let Some(ref ext_store) = self.external_store {
837 return Session::with_external_store(Arc::clone(ext_store), session_cfg())
838 .expect("arena allocation for external store session");
839 }
840
841 #[cfg(feature = "rdf")]
842 let mut session = Session::with_rdf_store_and_adaptive(
843 Arc::clone(&self.store),
844 Arc::clone(&self.rdf_store),
845 session_cfg(),
846 );
847 #[cfg(not(feature = "rdf"))]
848 let mut session = Session::with_adaptive(Arc::clone(&self.store), session_cfg());
849
850 #[cfg(feature = "wal")]
851 if let Some(ref wal) = self.wal {
852 session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
853 }
854
855 #[cfg(feature = "cdc")]
856 session.set_cdc_log(Arc::clone(&self.cdc_log));
857
858 #[cfg(feature = "metrics")]
859 {
860 if let Some(ref m) = self.metrics {
861 session.set_metrics(Arc::clone(m));
862 m.session_created
863 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
864 m.session_active
865 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
866 }
867 }
868
869 if let Some(ref graph) = *self.current_graph.read() {
871 session.use_graph(graph);
872 }
873
874 let _ = &mut session;
876
877 session
878 }
879
880 #[must_use]
886 pub fn current_graph(&self) -> Option<String> {
887 self.current_graph.read().clone()
888 }
889
890 pub fn set_current_graph(&self, name: Option<&str>) {
895 *self.current_graph.write() = name.map(ToString::to_string);
896 }
897
898 #[must_use]
900 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
901 &self.config.adaptive
902 }
903
904 #[must_use]
906 pub fn config(&self) -> &Config {
907 &self.config
908 }
909
910 #[must_use]
912 pub fn graph_model(&self) -> crate::config::GraphModel {
913 self.config.graph_model
914 }
915
916 #[must_use]
918 pub fn memory_limit(&self) -> Option<usize> {
919 self.config.memory_limit
920 }
921
922 #[cfg(feature = "metrics")]
927 #[must_use]
928 pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
929 let mut snapshot = self
930 .metrics
931 .as_ref()
932 .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
933
934 let cache_stats = self.query_cache.stats();
936 snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
937 snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
938 snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
939 snapshot.cache_invalidations = cache_stats.invalidations;
940
941 snapshot
942 }
943
944 #[cfg(feature = "metrics")]
946 pub fn reset_metrics(&self) {
947 if let Some(ref m) = self.metrics {
948 m.reset();
949 }
950 self.query_cache.reset_stats();
951 }
952
953 #[must_use]
961 pub fn store(&self) -> &Arc<LpgStore> {
962 &self.store
963 }
964
965 #[allow(dead_code)] fn active_store(&self) -> Arc<LpgStore> {
972 let graph_name = self.current_graph.read().clone();
973 match graph_name {
974 None => Arc::clone(&self.store),
975 Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
976 Some(ref name) => self
977 .store
978 .graph(name)
979 .unwrap_or_else(|| Arc::clone(&self.store)),
980 }
981 }
982
983 pub fn create_graph(&self, name: &str) -> Result<bool> {
991 Ok(self.store.create_graph(name)?)
992 }
993
994 pub fn drop_graph(&self, name: &str) -> bool {
996 self.store.drop_graph(name)
997 }
998
999 #[must_use]
1001 pub fn list_graphs(&self) -> Vec<String> {
1002 self.store.graph_names()
1003 }
1004
1005 #[must_use]
1013 pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
1014 if let Some(ref ext_store) = self.external_store {
1015 Arc::clone(ext_store)
1016 } else {
1017 Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
1018 }
1019 }
1020
1021 pub fn gc(&self) {
1027 let min_epoch = self.transaction_manager.min_active_epoch();
1028 self.store.gc_versions(min_epoch);
1029 self.transaction_manager.gc();
1030 }
1031
1032 #[must_use]
1034 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
1035 &self.buffer_manager
1036 }
1037
1038 #[must_use]
1040 pub fn query_cache(&self) -> &Arc<QueryCache> {
1041 &self.query_cache
1042 }
1043
1044 pub fn clear_plan_cache(&self) {
1050 self.query_cache.clear();
1051 }
1052
1053 pub fn close(&self) -> Result<()> {
1067 let mut is_open = self.is_open.write();
1068 if !*is_open {
1069 return Ok(());
1070 }
1071
1072 #[cfg(feature = "grafeo-file")]
1076 let is_single_file = self.file_manager.is_some();
1077 #[cfg(not(feature = "grafeo-file"))]
1078 let is_single_file = false;
1079
1080 #[cfg(feature = "grafeo-file")]
1081 if let Some(ref fm) = self.file_manager {
1082 #[cfg(feature = "wal")]
1084 if let Some(ref wal) = self.wal {
1085 wal.sync()?;
1086 }
1087 self.checkpoint_to_file(fm)?;
1088
1089 #[cfg(feature = "wal")]
1092 if let Some(ref wal) = self.wal {
1093 wal.close_active_log();
1094 }
1095
1096 fm.remove_sidecar_wal()?;
1097 fm.close()?;
1098 }
1099
1100 #[cfg(feature = "wal")]
1102 if !is_single_file && let Some(ref wal) = self.wal {
1103 let epoch = self.store.current_epoch();
1104
1105 let checkpoint_tx = self
1107 .transaction_manager
1108 .last_assigned_transaction_id()
1109 .unwrap_or_else(|| {
1110 self.transaction_manager.begin()
1112 });
1113
1114 wal.log(&WalRecord::TransactionCommit {
1116 transaction_id: checkpoint_tx,
1117 })?;
1118
1119 wal.checkpoint(checkpoint_tx, epoch)?;
1121 wal.sync()?;
1122 }
1123
1124 *is_open = false;
1125 Ok(())
1126 }
1127
1128 #[cfg(feature = "wal")]
1130 #[must_use]
1131 pub fn wal(&self) -> Option<&Arc<LpgWal>> {
1132 self.wal.as_ref()
1133 }
1134
1135 #[cfg(feature = "wal")]
1137 pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1138 if let Some(ref wal) = self.wal {
1139 wal.log(record)?;
1140 }
1141 Ok(())
1142 }
1143
1144 #[cfg(feature = "grafeo-file")]
1150 fn checkpoint_to_file(&self, fm: &GrafeoFileManager) -> Result<()> {
1151 use grafeo_core::testing::crash::maybe_crash;
1152
1153 maybe_crash("checkpoint_to_file:before_export");
1154 let snapshot_data = self.export_snapshot()?;
1155 maybe_crash("checkpoint_to_file:after_export");
1156
1157 let epoch = self.store.current_epoch();
1158 let transaction_id = self
1159 .transaction_manager
1160 .last_assigned_transaction_id()
1161 .map_or(0, |t| t.0);
1162 let node_count = self.store.node_count() as u64;
1163 let edge_count = self.store.edge_count() as u64;
1164
1165 fm.write_snapshot(
1166 &snapshot_data,
1167 epoch.0,
1168 transaction_id,
1169 node_count,
1170 edge_count,
1171 )?;
1172
1173 maybe_crash("checkpoint_to_file:after_write_snapshot");
1174 Ok(())
1175 }
1176
1177 #[cfg(feature = "grafeo-file")]
1179 #[must_use]
1180 pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
1181 self.file_manager.as_ref()
1182 }
1183}
1184
1185impl Drop for GrafeoDB {
1186 fn drop(&mut self) {
1187 if let Err(e) = self.close() {
1188 tracing::error!("Error closing database: {}", e);
1189 }
1190 }
1191}
1192
1193impl crate::admin::AdminService for GrafeoDB {
1194 fn info(&self) -> crate::admin::DatabaseInfo {
1195 self.info()
1196 }
1197
1198 fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1199 self.detailed_stats()
1200 }
1201
1202 fn schema(&self) -> crate::admin::SchemaInfo {
1203 self.schema()
1204 }
1205
1206 fn validate(&self) -> crate::admin::ValidationResult {
1207 self.validate()
1208 }
1209
1210 fn wal_status(&self) -> crate::admin::WalStatus {
1211 self.wal_status()
1212 }
1213
1214 fn wal_checkpoint(&self) -> Result<()> {
1215 self.wal_checkpoint()
1216 }
1217}
1218
1219#[derive(Debug)]
1249pub struct QueryResult {
1250 pub columns: Vec<String>,
1252 pub column_types: Vec<grafeo_common::types::LogicalType>,
1254 pub rows: Vec<Vec<grafeo_common::types::Value>>,
1256 pub execution_time_ms: Option<f64>,
1258 pub rows_scanned: Option<u64>,
1260 pub status_message: Option<String>,
1262 pub gql_status: grafeo_common::utils::GqlStatus,
1264}
1265
1266impl QueryResult {
1267 #[must_use]
1269 pub fn empty() -> Self {
1270 Self {
1271 columns: Vec::new(),
1272 column_types: Vec::new(),
1273 rows: Vec::new(),
1274 execution_time_ms: None,
1275 rows_scanned: None,
1276 status_message: None,
1277 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1278 }
1279 }
1280
1281 #[must_use]
1283 pub fn status(msg: impl Into<String>) -> Self {
1284 Self {
1285 columns: Vec::new(),
1286 column_types: Vec::new(),
1287 rows: Vec::new(),
1288 execution_time_ms: None,
1289 rows_scanned: None,
1290 status_message: Some(msg.into()),
1291 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1292 }
1293 }
1294
1295 #[must_use]
1297 pub fn new(columns: Vec<String>) -> Self {
1298 let len = columns.len();
1299 Self {
1300 columns,
1301 column_types: vec![grafeo_common::types::LogicalType::Any; len],
1302 rows: Vec::new(),
1303 execution_time_ms: None,
1304 rows_scanned: None,
1305 status_message: None,
1306 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1307 }
1308 }
1309
1310 #[must_use]
1312 pub fn with_types(
1313 columns: Vec<String>,
1314 column_types: Vec<grafeo_common::types::LogicalType>,
1315 ) -> Self {
1316 Self {
1317 columns,
1318 column_types,
1319 rows: Vec::new(),
1320 execution_time_ms: None,
1321 rows_scanned: None,
1322 status_message: None,
1323 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1324 }
1325 }
1326
1327 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1329 self.execution_time_ms = Some(execution_time_ms);
1330 self.rows_scanned = Some(rows_scanned);
1331 self
1332 }
1333
1334 #[must_use]
1336 pub fn execution_time_ms(&self) -> Option<f64> {
1337 self.execution_time_ms
1338 }
1339
1340 #[must_use]
1342 pub fn rows_scanned(&self) -> Option<u64> {
1343 self.rows_scanned
1344 }
1345
1346 #[must_use]
1348 pub fn row_count(&self) -> usize {
1349 self.rows.len()
1350 }
1351
1352 #[must_use]
1354 pub fn column_count(&self) -> usize {
1355 self.columns.len()
1356 }
1357
1358 #[must_use]
1360 pub fn is_empty(&self) -> bool {
1361 self.rows.is_empty()
1362 }
1363
1364 pub fn scalar<T: FromValue>(&self) -> Result<T> {
1373 if self.rows.len() != 1 || self.columns.len() != 1 {
1374 return Err(grafeo_common::utils::error::Error::InvalidValue(
1375 "Expected single value".to_string(),
1376 ));
1377 }
1378 T::from_value(&self.rows[0][0])
1379 }
1380
1381 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1383 self.rows.iter()
1384 }
1385}
1386
1387impl std::fmt::Display for QueryResult {
1388 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1389 let table = grafeo_common::fmt::format_result_table(
1390 &self.columns,
1391 &self.rows,
1392 self.execution_time_ms,
1393 self.status_message.as_deref(),
1394 );
1395 f.write_str(&table)
1396 }
1397}
1398
1399pub trait FromValue: Sized {
1404 fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
1406}
1407
1408impl FromValue for i64 {
1409 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1410 value
1411 .as_int64()
1412 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1413 expected: "INT64".to_string(),
1414 found: value.type_name().to_string(),
1415 })
1416 }
1417}
1418
1419impl FromValue for f64 {
1420 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1421 value
1422 .as_float64()
1423 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1424 expected: "FLOAT64".to_string(),
1425 found: value.type_name().to_string(),
1426 })
1427 }
1428}
1429
1430impl FromValue for String {
1431 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1432 value.as_str().map(String::from).ok_or_else(|| {
1433 grafeo_common::utils::error::Error::TypeMismatch {
1434 expected: "STRING".to_string(),
1435 found: value.type_name().to_string(),
1436 }
1437 })
1438 }
1439}
1440
1441impl FromValue for bool {
1442 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1443 value
1444 .as_bool()
1445 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1446 expected: "BOOL".to_string(),
1447 found: value.type_name().to_string(),
1448 })
1449 }
1450}
1451
1452#[cfg(test)]
1453mod tests {
1454 use super::*;
1455
1456 #[test]
1457 fn test_create_in_memory_database() {
1458 let db = GrafeoDB::new_in_memory();
1459 assert_eq!(db.node_count(), 0);
1460 assert_eq!(db.edge_count(), 0);
1461 }
1462
1463 #[test]
1464 fn test_database_config() {
1465 let config = Config::in_memory().with_threads(4).with_query_logging();
1466
1467 let db = GrafeoDB::with_config(config).unwrap();
1468 assert_eq!(db.config().threads, 4);
1469 assert!(db.config().query_logging);
1470 }
1471
1472 #[test]
1473 fn test_database_session() {
1474 let db = GrafeoDB::new_in_memory();
1475 let _session = db.session();
1476 }
1478
1479 #[cfg(feature = "wal")]
1480 #[test]
1481 fn test_persistent_database_recovery() {
1482 use grafeo_common::types::Value;
1483 use tempfile::tempdir;
1484
1485 let dir = tempdir().unwrap();
1486 let db_path = dir.path().join("test_db");
1487
1488 {
1490 let db = GrafeoDB::open(&db_path).unwrap();
1491
1492 let alix = db.create_node(&["Person"]);
1493 db.set_node_property(alix, "name", Value::from("Alix"));
1494
1495 let gus = db.create_node(&["Person"]);
1496 db.set_node_property(gus, "name", Value::from("Gus"));
1497
1498 let _edge = db.create_edge(alix, gus, "KNOWS");
1499
1500 db.close().unwrap();
1502 }
1503
1504 {
1506 let db = GrafeoDB::open(&db_path).unwrap();
1507
1508 assert_eq!(db.node_count(), 2);
1509 assert_eq!(db.edge_count(), 1);
1510
1511 let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
1513 assert!(node0.is_some());
1514
1515 let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
1516 assert!(node1.is_some());
1517 }
1518 }
1519
1520 #[cfg(feature = "wal")]
1521 #[test]
1522 fn test_wal_logging() {
1523 use tempfile::tempdir;
1524
1525 let dir = tempdir().unwrap();
1526 let db_path = dir.path().join("wal_test_db");
1527
1528 let db = GrafeoDB::open(&db_path).unwrap();
1529
1530 let node = db.create_node(&["Test"]);
1532 db.delete_node(node);
1533
1534 if let Some(wal) = db.wal() {
1536 assert!(wal.record_count() > 0);
1537 }
1538
1539 db.close().unwrap();
1540 }
1541
1542 #[cfg(feature = "wal")]
1543 #[test]
1544 fn test_wal_recovery_multiple_sessions() {
1545 use grafeo_common::types::Value;
1547 use tempfile::tempdir;
1548
1549 let dir = tempdir().unwrap();
1550 let db_path = dir.path().join("multi_session_db");
1551
1552 {
1554 let db = GrafeoDB::open(&db_path).unwrap();
1555 let alix = db.create_node(&["Person"]);
1556 db.set_node_property(alix, "name", Value::from("Alix"));
1557 db.close().unwrap();
1558 }
1559
1560 {
1562 let db = GrafeoDB::open(&db_path).unwrap();
1563 assert_eq!(db.node_count(), 1); let gus = db.create_node(&["Person"]);
1565 db.set_node_property(gus, "name", Value::from("Gus"));
1566 db.close().unwrap();
1567 }
1568
1569 {
1571 let db = GrafeoDB::open(&db_path).unwrap();
1572 assert_eq!(db.node_count(), 2);
1573
1574 let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
1576 assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
1577
1578 let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
1579 assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
1580 }
1581 }
1582
1583 #[cfg(feature = "wal")]
1584 #[test]
1585 fn test_database_consistency_after_mutations() {
1586 use grafeo_common::types::Value;
1588 use tempfile::tempdir;
1589
1590 let dir = tempdir().unwrap();
1591 let db_path = dir.path().join("consistency_db");
1592
1593 {
1594 let db = GrafeoDB::open(&db_path).unwrap();
1595
1596 let a = db.create_node(&["Node"]);
1598 let b = db.create_node(&["Node"]);
1599 let c = db.create_node(&["Node"]);
1600
1601 let e1 = db.create_edge(a, b, "LINKS");
1603 let _e2 = db.create_edge(b, c, "LINKS");
1604
1605 db.delete_edge(e1);
1607 db.delete_node(b);
1608
1609 db.set_node_property(a, "value", Value::Int64(1));
1611 db.set_node_property(c, "value", Value::Int64(3));
1612
1613 db.close().unwrap();
1614 }
1615
1616 {
1618 let db = GrafeoDB::open(&db_path).unwrap();
1619
1620 let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
1624 assert!(node_a.is_some());
1625
1626 let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
1627 assert!(node_c.is_some());
1628
1629 let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
1631 assert!(node_b.is_none());
1632 }
1633 }
1634
1635 #[cfg(feature = "wal")]
1636 #[test]
1637 fn test_close_is_idempotent() {
1638 use tempfile::tempdir;
1640
1641 let dir = tempdir().unwrap();
1642 let db_path = dir.path().join("close_test_db");
1643
1644 let db = GrafeoDB::open(&db_path).unwrap();
1645 db.create_node(&["Test"]);
1646
1647 assert!(db.close().is_ok());
1649
1650 assert!(db.close().is_ok());
1652 }
1653
1654 #[test]
1655 fn test_with_store_external_backend() {
1656 use grafeo_core::graph::lpg::LpgStore;
1657
1658 let external = Arc::new(LpgStore::new().unwrap());
1659
1660 let n1 = external.create_node(&["Person"]);
1662 external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
1663
1664 let db = GrafeoDB::with_store(
1665 Arc::clone(&external) as Arc<dyn GraphStoreMut>,
1666 Config::in_memory(),
1667 )
1668 .unwrap();
1669
1670 let session = db.session();
1671
1672 #[cfg(feature = "gql")]
1674 {
1675 let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
1676 assert_eq!(result.rows.len(), 1);
1677 }
1678 }
1679
1680 #[test]
1681 fn test_with_config_custom_memory_limit() {
1682 let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); let db = GrafeoDB::with_config(config).unwrap();
1685 assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
1686 assert_eq!(db.node_count(), 0);
1687 }
1688
1689 #[cfg(feature = "metrics")]
1690 #[test]
1691 fn test_database_metrics_registry() {
1692 let db = GrafeoDB::new_in_memory();
1693
1694 db.create_node(&["Person"]);
1696 db.create_node(&["Person"]);
1697
1698 let snap = db.metrics();
1700 assert_eq!(snap.query_count, 0); }
1703
1704 #[test]
1705 fn test_query_result_has_metrics() {
1706 let db = GrafeoDB::new_in_memory();
1708 db.create_node(&["Person"]);
1709 db.create_node(&["Person"]);
1710
1711 #[cfg(feature = "gql")]
1712 {
1713 let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
1714
1715 assert!(result.execution_time_ms.is_some());
1717 assert!(result.rows_scanned.is_some());
1718 assert!(result.execution_time_ms.unwrap() >= 0.0);
1719 assert_eq!(result.rows_scanned.unwrap(), 2);
1720 }
1721 }
1722
1723 #[test]
1724 fn test_empty_query_result_metrics() {
1725 let db = GrafeoDB::new_in_memory();
1727 db.create_node(&["Person"]);
1728
1729 #[cfg(feature = "gql")]
1730 {
1731 let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
1733
1734 assert!(result.execution_time_ms.is_some());
1735 assert!(result.rows_scanned.is_some());
1736 assert_eq!(result.rows_scanned.unwrap(), 0);
1737 }
1738 }
1739
1740 #[cfg(feature = "cdc")]
1741 mod cdc_integration {
1742 use super::*;
1743
1744 #[test]
1745 fn test_node_lifecycle_history() {
1746 let db = GrafeoDB::new_in_memory();
1747
1748 let id = db.create_node(&["Person"]);
1750 db.set_node_property(id, "name", "Alix".into());
1752 db.set_node_property(id, "name", "Gus".into());
1753 db.delete_node(id);
1755
1756 let history = db.history(id).unwrap();
1757 assert_eq!(history.len(), 4); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1759 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1760 assert!(history[1].before.is_none()); assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
1762 assert!(history[2].before.is_some()); assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
1764 }
1765
1766 #[test]
1767 fn test_edge_lifecycle_history() {
1768 let db = GrafeoDB::new_in_memory();
1769
1770 let alix = db.create_node(&["Person"]);
1771 let gus = db.create_node(&["Person"]);
1772 let edge = db.create_edge(alix, gus, "KNOWS");
1773 db.set_edge_property(edge, "since", 2024i64.into());
1774 db.delete_edge(edge);
1775
1776 let history = db.history(edge).unwrap();
1777 assert_eq!(history.len(), 3); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1779 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1780 assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
1781 }
1782
1783 #[test]
1784 fn test_create_node_with_props_cdc() {
1785 let db = GrafeoDB::new_in_memory();
1786
1787 let id = db.create_node_with_props(
1788 &["Person"],
1789 vec![
1790 ("name", grafeo_common::types::Value::from("Alix")),
1791 ("age", grafeo_common::types::Value::from(30i64)),
1792 ],
1793 );
1794
1795 let history = db.history(id).unwrap();
1796 assert_eq!(history.len(), 1);
1797 assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1798 let after = history[0].after.as_ref().unwrap();
1800 assert_eq!(after.len(), 2);
1801 }
1802
1803 #[test]
1804 fn test_changes_between() {
1805 let db = GrafeoDB::new_in_memory();
1806
1807 let id1 = db.create_node(&["A"]);
1808 let _id2 = db.create_node(&["B"]);
1809 db.set_node_property(id1, "x", 1i64.into());
1810
1811 let changes = db
1813 .changes_between(
1814 grafeo_common::types::EpochId(0),
1815 grafeo_common::types::EpochId(u64::MAX),
1816 )
1817 .unwrap();
1818 assert_eq!(changes.len(), 3); }
1820 }
1821
1822 #[test]
1823 fn test_with_store_basic() {
1824 use grafeo_core::graph::lpg::LpgStore;
1825
1826 let store = Arc::new(LpgStore::new().unwrap());
1827 let n1 = store.create_node(&["Person"]);
1828 store.set_node_property(n1, "name", "Alix".into());
1829
1830 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1831 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1832
1833 let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
1834 assert_eq!(result.rows.len(), 1);
1835 }
1836
1837 #[test]
1838 fn test_with_store_session() {
1839 use grafeo_core::graph::lpg::LpgStore;
1840
1841 let store = Arc::new(LpgStore::new().unwrap());
1842 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1843 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1844
1845 let session = db.session();
1846 let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
1847 assert_eq!(result.rows.len(), 1);
1848 }
1849
1850 #[test]
1851 fn test_with_store_mutations() {
1852 use grafeo_core::graph::lpg::LpgStore;
1853
1854 let store = Arc::new(LpgStore::new().unwrap());
1855 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1856 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1857
1858 let mut session = db.session();
1859
1860 session.begin_transaction().unwrap();
1864 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1865
1866 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
1867 assert_eq!(result.rows.len(), 1);
1868
1869 session.commit().unwrap();
1870 }
1871}