1mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21mod search;
22#[cfg(feature = "wal")]
23pub(crate) mod wal_store;
24
25#[cfg(feature = "wal")]
26use std::path::Path;
27use std::sync::Arc;
28use std::sync::atomic::AtomicUsize;
29
30use parking_lot::RwLock;
31
32#[cfg(feature = "grafeo-file")]
33use grafeo_adapters::storage::file::GrafeoFileManager;
34#[cfg(feature = "wal")]
35use grafeo_adapters::storage::wal::{
36 DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
37};
38use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
39use grafeo_common::utils::error::Result;
40use grafeo_core::graph::GraphStoreMut;
41use grafeo_core::graph::lpg::LpgStore;
42#[cfg(feature = "rdf")]
43use grafeo_core::graph::rdf::RdfStore;
44
45use crate::catalog::Catalog;
46use crate::config::Config;
47use crate::query::cache::QueryCache;
48use crate::session::Session;
49use crate::transaction::TransactionManager;
50
51pub struct GrafeoDB {
74 pub(super) config: Config,
76 pub(super) store: Arc<LpgStore>,
78 pub(super) catalog: Arc<Catalog>,
80 #[cfg(feature = "rdf")]
82 pub(super) rdf_store: Arc<RdfStore>,
83 pub(super) transaction_manager: Arc<TransactionManager>,
85 pub(super) buffer_manager: Arc<BufferManager>,
87 #[cfg(feature = "wal")]
89 pub(super) wal: Option<Arc<LpgWal>>,
90 #[cfg(feature = "wal")]
94 pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
95 pub(super) query_cache: Arc<QueryCache>,
97 pub(super) commit_counter: Arc<AtomicUsize>,
99 pub(super) is_open: RwLock<bool>,
101 #[cfg(feature = "cdc")]
103 pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
104 #[cfg(feature = "embed")]
106 pub(super) embedding_models:
107 RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
108 #[cfg(feature = "grafeo-file")]
110 pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
111 pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
114 current_graph: RwLock<Option<String>>,
118}
119
120impl GrafeoDB {
121 #[must_use]
142 pub fn new_in_memory() -> Self {
143 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
144 }
145
146 #[cfg(feature = "wal")]
165 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
166 Self::with_config(Config::persistent(path.as_ref()))
167 }
168
169 pub fn with_config(config: Config) -> Result<Self> {
193 config
195 .validate()
196 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
197
198 let store = Arc::new(LpgStore::new()?);
199 #[cfg(feature = "rdf")]
200 let rdf_store = Arc::new(RdfStore::new());
201 let transaction_manager = Arc::new(TransactionManager::new());
202
203 let buffer_config = BufferManagerConfig {
205 budget: config.memory_limit.unwrap_or_else(|| {
206 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
207 }),
208 spill_path: config
209 .spill_path
210 .clone()
211 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
212 ..BufferManagerConfig::default()
213 };
214 let buffer_manager = BufferManager::new(buffer_config);
215
216 let catalog = Arc::new(Catalog::new());
218
219 #[cfg(feature = "grafeo-file")]
221 let file_manager: Option<Arc<GrafeoFileManager>> = if config.wal_enabled {
222 if let Some(ref db_path) = config.path {
223 if Self::should_use_single_file(db_path, config.storage_format) {
224 let fm = if db_path.exists() && db_path.is_file() {
225 GrafeoFileManager::open(db_path)?
226 } else if !db_path.exists() {
227 GrafeoFileManager::create(db_path)?
228 } else {
229 return Err(grafeo_common::utils::error::Error::Internal(format!(
231 "path exists but is not a file: {}",
232 db_path.display()
233 )));
234 };
235
236 let snapshot_data = fm.read_snapshot()?;
238 if !snapshot_data.is_empty() {
239 Self::apply_snapshot_data(
240 &store,
241 &catalog,
242 #[cfg(feature = "rdf")]
243 &rdf_store,
244 &snapshot_data,
245 )?;
246 }
247
248 if fm.has_sidecar_wal() {
250 let recovery = WalRecovery::new(fm.sidecar_wal_path());
251 let records = recovery.recover()?;
252 Self::apply_wal_records(
253 &store,
254 &catalog,
255 #[cfg(feature = "rdf")]
256 &rdf_store,
257 &records,
258 )?;
259 }
260
261 Some(Arc::new(fm))
262 } else {
263 None
264 }
265 } else {
266 None
267 }
268 } else {
269 None
270 };
271
272 #[cfg(feature = "wal")]
274 let wal = if config.wal_enabled {
275 if let Some(ref db_path) = config.path {
276 #[cfg(feature = "grafeo-file")]
278 let wal_path = if let Some(ref fm) = file_manager {
279 let p = fm.sidecar_wal_path();
280 std::fs::create_dir_all(&p)?;
281 p
282 } else {
283 std::fs::create_dir_all(db_path)?;
285 db_path.join("wal")
286 };
287
288 #[cfg(not(feature = "grafeo-file"))]
289 let wal_path = {
290 std::fs::create_dir_all(db_path)?;
291 db_path.join("wal")
292 };
293
294 #[cfg(feature = "grafeo-file")]
296 let is_single_file = file_manager.is_some();
297 #[cfg(not(feature = "grafeo-file"))]
298 let is_single_file = false;
299
300 if !is_single_file && wal_path.exists() {
301 let recovery = WalRecovery::new(&wal_path);
302 let records = recovery.recover()?;
303 Self::apply_wal_records(
304 &store,
305 &catalog,
306 #[cfg(feature = "rdf")]
307 &rdf_store,
308 &records,
309 )?;
310 }
311
312 let wal_durability = match config.wal_durability {
314 crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
315 crate::config::DurabilityMode::Batch {
316 max_delay_ms,
317 max_records,
318 } => WalDurabilityMode::Batch {
319 max_delay_ms,
320 max_records,
321 },
322 crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
323 WalDurabilityMode::Adaptive { target_interval_ms }
324 }
325 crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
326 };
327 let wal_config = WalConfig {
328 durability: wal_durability,
329 ..WalConfig::default()
330 };
331 let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
332 Some(Arc::new(wal_manager))
333 } else {
334 None
335 }
336 } else {
337 None
338 };
339
340 let query_cache = Arc::new(QueryCache::default());
342
343 Ok(Self {
344 config,
345 store,
346 catalog,
347 #[cfg(feature = "rdf")]
348 rdf_store,
349 transaction_manager,
350 buffer_manager,
351 #[cfg(feature = "wal")]
352 wal,
353 #[cfg(feature = "wal")]
354 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
355 query_cache,
356 commit_counter: Arc::new(AtomicUsize::new(0)),
357 is_open: RwLock::new(true),
358 #[cfg(feature = "cdc")]
359 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
360 #[cfg(feature = "embed")]
361 embedding_models: RwLock::new(hashbrown::HashMap::new()),
362 #[cfg(feature = "grafeo-file")]
363 file_manager,
364 external_store: None,
365 current_graph: RwLock::new(None),
366 })
367 }
368
369 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
394 config
395 .validate()
396 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
397
398 let dummy_store = Arc::new(LpgStore::new()?);
399 let transaction_manager = Arc::new(TransactionManager::new());
400
401 let buffer_config = BufferManagerConfig {
402 budget: config.memory_limit.unwrap_or_else(|| {
403 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
404 }),
405 spill_path: None,
406 ..BufferManagerConfig::default()
407 };
408 let buffer_manager = BufferManager::new(buffer_config);
409
410 let query_cache = Arc::new(QueryCache::default());
411
412 Ok(Self {
413 config,
414 store: dummy_store,
415 catalog: Arc::new(Catalog::new()),
416 #[cfg(feature = "rdf")]
417 rdf_store: Arc::new(RdfStore::new()),
418 transaction_manager,
419 buffer_manager,
420 #[cfg(feature = "wal")]
421 wal: None,
422 #[cfg(feature = "wal")]
423 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
424 query_cache,
425 commit_counter: Arc::new(AtomicUsize::new(0)),
426 is_open: RwLock::new(true),
427 #[cfg(feature = "cdc")]
428 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
429 #[cfg(feature = "embed")]
430 embedding_models: RwLock::new(hashbrown::HashMap::new()),
431 #[cfg(feature = "grafeo-file")]
432 file_manager: None,
433 external_store: Some(store),
434 current_graph: RwLock::new(None),
435 })
436 }
437
438 #[cfg(feature = "wal")]
444 fn apply_wal_records(
445 store: &Arc<LpgStore>,
446 catalog: &Catalog,
447 #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
448 records: &[WalRecord],
449 ) -> Result<()> {
450 use crate::catalog::{
451 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
452 };
453 use grafeo_common::utils::error::Error;
454
455 let mut current_graph: Option<String> = None;
458 let mut target_store: Arc<LpgStore> = Arc::clone(store);
459
460 for record in records {
461 match record {
462 WalRecord::CreateNamedGraph { name } => {
464 let _ = store.create_graph(name);
465 }
466 WalRecord::DropNamedGraph { name } => {
467 store.drop_graph(name);
468 if current_graph.as_deref() == Some(name.as_str()) {
470 current_graph = None;
471 target_store = Arc::clone(store);
472 }
473 }
474 WalRecord::SwitchGraph { name } => {
475 current_graph.clone_from(name);
476 target_store = match ¤t_graph {
477 None => Arc::clone(store),
478 Some(graph_name) => store
479 .graph_or_create(graph_name)
480 .map_err(|e| Error::Internal(e.to_string()))?,
481 };
482 }
483
484 WalRecord::CreateNode { id, labels } => {
486 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
487 target_store.create_node_with_id(*id, &label_refs)?;
488 }
489 WalRecord::DeleteNode { id } => {
490 target_store.delete_node(*id);
491 }
492 WalRecord::CreateEdge {
493 id,
494 src,
495 dst,
496 edge_type,
497 } => {
498 target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
499 }
500 WalRecord::DeleteEdge { id } => {
501 target_store.delete_edge(*id);
502 }
503 WalRecord::SetNodeProperty { id, key, value } => {
504 target_store.set_node_property(*id, key, value.clone());
505 }
506 WalRecord::SetEdgeProperty { id, key, value } => {
507 target_store.set_edge_property(*id, key, value.clone());
508 }
509 WalRecord::AddNodeLabel { id, label } => {
510 target_store.add_label(*id, label);
511 }
512 WalRecord::RemoveNodeLabel { id, label } => {
513 target_store.remove_label(*id, label);
514 }
515 WalRecord::RemoveNodeProperty { id, key } => {
516 target_store.remove_node_property(*id, key);
517 }
518 WalRecord::RemoveEdgeProperty { id, key } => {
519 target_store.remove_edge_property(*id, key);
520 }
521
522 WalRecord::CreateNodeType {
524 name,
525 properties,
526 constraints,
527 } => {
528 let def = NodeTypeDefinition {
529 name: name.clone(),
530 properties: properties
531 .iter()
532 .map(|(n, t, nullable)| TypedProperty {
533 name: n.clone(),
534 data_type: PropertyDataType::from_type_name(t),
535 nullable: *nullable,
536 default_value: None,
537 })
538 .collect(),
539 constraints: constraints
540 .iter()
541 .map(|(kind, props)| match kind.as_str() {
542 "unique" => TypeConstraint::Unique(props.clone()),
543 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
544 "not_null" if !props.is_empty() => {
545 TypeConstraint::NotNull(props[0].clone())
546 }
547 _ => TypeConstraint::Unique(props.clone()),
548 })
549 .collect(),
550 parent_types: Vec::new(),
551 };
552 let _ = catalog.register_node_type(def);
553 }
554 WalRecord::DropNodeType { name } => {
555 let _ = catalog.drop_node_type(name);
556 }
557 WalRecord::CreateEdgeType {
558 name,
559 properties,
560 constraints,
561 } => {
562 let def = EdgeTypeDefinition {
563 name: name.clone(),
564 properties: properties
565 .iter()
566 .map(|(n, t, nullable)| TypedProperty {
567 name: n.clone(),
568 data_type: PropertyDataType::from_type_name(t),
569 nullable: *nullable,
570 default_value: None,
571 })
572 .collect(),
573 constraints: constraints
574 .iter()
575 .map(|(kind, props)| match kind.as_str() {
576 "unique" => TypeConstraint::Unique(props.clone()),
577 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
578 "not_null" if !props.is_empty() => {
579 TypeConstraint::NotNull(props[0].clone())
580 }
581 _ => TypeConstraint::Unique(props.clone()),
582 })
583 .collect(),
584 source_node_types: Vec::new(),
585 target_node_types: Vec::new(),
586 };
587 let _ = catalog.register_edge_type_def(def);
588 }
589 WalRecord::DropEdgeType { name } => {
590 let _ = catalog.drop_edge_type_def(name);
591 }
592 WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
593 }
596 WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
597 }
600 WalRecord::CreateGraphType {
601 name,
602 node_types,
603 edge_types,
604 open,
605 } => {
606 use crate::catalog::GraphTypeDefinition;
607 let def = GraphTypeDefinition {
608 name: name.clone(),
609 allowed_node_types: node_types.clone(),
610 allowed_edge_types: edge_types.clone(),
611 open: *open,
612 };
613 let _ = catalog.register_graph_type(def);
614 }
615 WalRecord::DropGraphType { name } => {
616 let _ = catalog.drop_graph_type(name);
617 }
618 WalRecord::CreateSchema { name } => {
619 let _ = catalog.register_schema_namespace(name.clone());
620 }
621 WalRecord::DropSchema { name } => {
622 let _ = catalog.drop_schema_namespace(name);
623 }
624
625 WalRecord::AlterNodeType { name, alterations } => {
626 for (action, prop_name, type_name, nullable) in alterations {
627 match action.as_str() {
628 "add" => {
629 let prop = TypedProperty {
630 name: prop_name.clone(),
631 data_type: PropertyDataType::from_type_name(type_name),
632 nullable: *nullable,
633 default_value: None,
634 };
635 let _ = catalog.alter_node_type_add_property(name, prop);
636 }
637 "drop" => {
638 let _ = catalog.alter_node_type_drop_property(name, prop_name);
639 }
640 _ => {}
641 }
642 }
643 }
644 WalRecord::AlterEdgeType { name, alterations } => {
645 for (action, prop_name, type_name, nullable) in alterations {
646 match action.as_str() {
647 "add" => {
648 let prop = TypedProperty {
649 name: prop_name.clone(),
650 data_type: PropertyDataType::from_type_name(type_name),
651 nullable: *nullable,
652 default_value: None,
653 };
654 let _ = catalog.alter_edge_type_add_property(name, prop);
655 }
656 "drop" => {
657 let _ = catalog.alter_edge_type_drop_property(name, prop_name);
658 }
659 _ => {}
660 }
661 }
662 }
663 WalRecord::AlterGraphType { name, alterations } => {
664 for (action, type_name) in alterations {
665 match action.as_str() {
666 "add_node" => {
667 let _ =
668 catalog.alter_graph_type_add_node_type(name, type_name.clone());
669 }
670 "drop_node" => {
671 let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
672 }
673 "add_edge" => {
674 let _ =
675 catalog.alter_graph_type_add_edge_type(name, type_name.clone());
676 }
677 "drop_edge" => {
678 let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
679 }
680 _ => {}
681 }
682 }
683 }
684
685 WalRecord::CreateProcedure {
686 name,
687 params,
688 returns,
689 body,
690 } => {
691 use crate::catalog::ProcedureDefinition;
692 let def = ProcedureDefinition {
693 name: name.clone(),
694 params: params.clone(),
695 returns: returns.clone(),
696 body: body.clone(),
697 };
698 let _ = catalog.register_procedure(def);
699 }
700 WalRecord::DropProcedure { name } => {
701 let _ = catalog.drop_procedure(name);
702 }
703
704 #[cfg(feature = "rdf")]
706 WalRecord::InsertRdfTriple {
707 subject,
708 predicate,
709 object,
710 graph,
711 } => {
712 use grafeo_core::graph::rdf::Term;
713 if let (Some(s), Some(p), Some(o)) = (
714 Term::from_ntriples(subject),
715 Term::from_ntriples(predicate),
716 Term::from_ntriples(object),
717 ) {
718 let triple = grafeo_core::graph::rdf::Triple::new(s, p, o);
719 let target = match graph {
720 Some(name) => rdf_store.graph_or_create(name),
721 None => Arc::clone(rdf_store),
722 };
723 target.insert(triple);
724 }
725 }
726 #[cfg(feature = "rdf")]
727 WalRecord::DeleteRdfTriple {
728 subject,
729 predicate,
730 object,
731 graph,
732 } => {
733 use grafeo_core::graph::rdf::Term;
734 if let (Some(s), Some(p), Some(o)) = (
735 Term::from_ntriples(subject),
736 Term::from_ntriples(predicate),
737 Term::from_ntriples(object),
738 ) {
739 let triple = grafeo_core::graph::rdf::Triple::new(s, p, o);
740 let target = match graph {
741 Some(name) => rdf_store.graph_or_create(name),
742 None => Arc::clone(rdf_store),
743 };
744 target.remove(&triple);
745 }
746 }
747 #[cfg(feature = "rdf")]
748 WalRecord::ClearRdfGraph { graph } => {
749 rdf_store.clear_graph(graph.as_deref());
750 }
751 #[cfg(feature = "rdf")]
752 WalRecord::CreateRdfGraph { name } => {
753 let _ = rdf_store.create_graph(name);
754 }
755 #[cfg(feature = "rdf")]
756 WalRecord::DropRdfGraph { name } => match name {
757 None => rdf_store.clear(),
758 Some(graph_name) => {
759 rdf_store.drop_graph(graph_name);
760 }
761 },
762 #[cfg(not(feature = "rdf"))]
764 WalRecord::InsertRdfTriple { .. }
765 | WalRecord::DeleteRdfTriple { .. }
766 | WalRecord::ClearRdfGraph { .. }
767 | WalRecord::CreateRdfGraph { .. }
768 | WalRecord::DropRdfGraph { .. } => {}
769
770 WalRecord::TransactionCommit { .. }
771 | WalRecord::TransactionAbort { .. }
772 | WalRecord::Checkpoint { .. } => {
773 }
776 }
777 }
778 Ok(())
779 }
780
781 #[cfg(feature = "grafeo-file")]
787 fn should_use_single_file(
788 path: &std::path::Path,
789 configured: crate::config::StorageFormat,
790 ) -> bool {
791 use crate::config::StorageFormat;
792 match configured {
793 StorageFormat::SingleFile => true,
794 StorageFormat::WalDirectory => false,
795 StorageFormat::Auto => {
796 if path.is_file() {
798 if let Ok(mut f) = std::fs::File::open(path) {
799 use std::io::Read;
800 let mut magic = [0u8; 4];
801 if f.read_exact(&mut magic).is_ok()
802 && magic == grafeo_adapters::storage::file::MAGIC
803 {
804 return true;
805 }
806 }
807 return false;
808 }
809 if path.is_dir() {
811 return false;
812 }
813 path.extension().is_some_and(|ext| ext == "grafeo")
815 }
816 }
817 }
818
819 #[cfg(feature = "grafeo-file")]
821 fn apply_snapshot_data(
822 store: &Arc<LpgStore>,
823 catalog: &Arc<crate::catalog::Catalog>,
824 #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
825 data: &[u8],
826 ) -> Result<()> {
827 persistence::load_snapshot_into_store(
828 store,
829 catalog,
830 #[cfg(feature = "rdf")]
831 rdf_store,
832 data,
833 )
834 }
835
836 #[must_use]
864 pub fn session(&self) -> Session {
865 let session_cfg = || crate::session::SessionConfig {
866 transaction_manager: Arc::clone(&self.transaction_manager),
867 query_cache: Arc::clone(&self.query_cache),
868 catalog: Arc::clone(&self.catalog),
869 adaptive_config: self.config.adaptive.clone(),
870 factorized_execution: self.config.factorized_execution,
871 graph_model: self.config.graph_model,
872 query_timeout: self.config.query_timeout,
873 commit_counter: Arc::clone(&self.commit_counter),
874 gc_interval: self.config.gc_interval,
875 };
876
877 if let Some(ref ext_store) = self.external_store {
878 return Session::with_external_store(Arc::clone(ext_store), session_cfg())
879 .expect("arena allocation for external store session");
880 }
881
882 #[cfg(feature = "rdf")]
883 let mut session = Session::with_rdf_store_and_adaptive(
884 Arc::clone(&self.store),
885 Arc::clone(&self.rdf_store),
886 session_cfg(),
887 );
888 #[cfg(not(feature = "rdf"))]
889 let mut session = Session::with_adaptive(Arc::clone(&self.store), session_cfg());
890
891 #[cfg(feature = "wal")]
892 if let Some(ref wal) = self.wal {
893 session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
894 }
895
896 #[cfg(feature = "cdc")]
897 session.set_cdc_log(Arc::clone(&self.cdc_log));
898
899 if let Some(ref graph) = *self.current_graph.read() {
901 session.use_graph(graph);
902 }
903
904 let _ = &mut session;
906
907 session
908 }
909
910 #[must_use]
916 pub fn current_graph(&self) -> Option<String> {
917 self.current_graph.read().clone()
918 }
919
920 pub fn set_current_graph(&self, name: Option<&str>) {
925 *self.current_graph.write() = name.map(ToString::to_string);
926 }
927
928 #[must_use]
930 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
931 &self.config.adaptive
932 }
933
934 #[must_use]
936 pub fn config(&self) -> &Config {
937 &self.config
938 }
939
940 #[must_use]
942 pub fn graph_model(&self) -> crate::config::GraphModel {
943 self.config.graph_model
944 }
945
946 #[must_use]
948 pub fn memory_limit(&self) -> Option<usize> {
949 self.config.memory_limit
950 }
951
952 #[must_use]
960 pub fn store(&self) -> &Arc<LpgStore> {
961 &self.store
962 }
963
964 #[allow(dead_code)] fn active_store(&self) -> Arc<LpgStore> {
971 let graph_name = self.current_graph.read().clone();
972 match graph_name {
973 None => Arc::clone(&self.store),
974 Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
975 Some(ref name) => self
976 .store
977 .graph(name)
978 .unwrap_or_else(|| Arc::clone(&self.store)),
979 }
980 }
981
982 pub fn create_graph(&self, name: &str) -> Result<bool> {
990 Ok(self.store.create_graph(name)?)
991 }
992
993 pub fn drop_graph(&self, name: &str) -> bool {
995 self.store.drop_graph(name)
996 }
997
998 #[must_use]
1000 pub fn list_graphs(&self) -> Vec<String> {
1001 self.store.graph_names()
1002 }
1003
1004 #[must_use]
1012 pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
1013 if let Some(ref ext_store) = self.external_store {
1014 Arc::clone(ext_store)
1015 } else {
1016 Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
1017 }
1018 }
1019
1020 pub fn gc(&self) {
1026 let min_epoch = self.transaction_manager.min_active_epoch();
1027 self.store.gc_versions(min_epoch);
1028 self.transaction_manager.gc();
1029 }
1030
1031 #[must_use]
1033 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
1034 &self.buffer_manager
1035 }
1036
1037 #[must_use]
1039 pub fn query_cache(&self) -> &Arc<QueryCache> {
1040 &self.query_cache
1041 }
1042
1043 pub fn clear_plan_cache(&self) {
1049 self.query_cache.clear();
1050 }
1051
1052 pub fn close(&self) -> Result<()> {
1066 let mut is_open = self.is_open.write();
1067 if !*is_open {
1068 return Ok(());
1069 }
1070
1071 #[cfg(feature = "grafeo-file")]
1075 let is_single_file = self.file_manager.is_some();
1076 #[cfg(not(feature = "grafeo-file"))]
1077 let is_single_file = false;
1078
1079 #[cfg(feature = "grafeo-file")]
1080 if let Some(ref fm) = self.file_manager {
1081 #[cfg(feature = "wal")]
1083 if let Some(ref wal) = self.wal {
1084 wal.sync()?;
1085 }
1086 self.checkpoint_to_file(fm)?;
1087
1088 #[cfg(feature = "wal")]
1091 if let Some(ref wal) = self.wal {
1092 wal.close_active_log();
1093 }
1094
1095 fm.remove_sidecar_wal()?;
1096 fm.close()?;
1097 }
1098
1099 #[cfg(feature = "wal")]
1101 if !is_single_file && let Some(ref wal) = self.wal {
1102 let epoch = self.store.current_epoch();
1103
1104 let checkpoint_tx = self
1106 .transaction_manager
1107 .last_assigned_transaction_id()
1108 .unwrap_or_else(|| {
1109 self.transaction_manager.begin()
1111 });
1112
1113 wal.log(&WalRecord::TransactionCommit {
1115 transaction_id: checkpoint_tx,
1116 })?;
1117
1118 wal.checkpoint(checkpoint_tx, epoch)?;
1120 wal.sync()?;
1121 }
1122
1123 *is_open = false;
1124 Ok(())
1125 }
1126
1127 #[cfg(feature = "wal")]
1129 #[must_use]
1130 pub fn wal(&self) -> Option<&Arc<LpgWal>> {
1131 self.wal.as_ref()
1132 }
1133
1134 #[cfg(feature = "wal")]
1136 pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1137 if let Some(ref wal) = self.wal {
1138 wal.log(record)?;
1139 }
1140 Ok(())
1141 }
1142
1143 #[cfg(feature = "grafeo-file")]
1149 fn checkpoint_to_file(&self, fm: &GrafeoFileManager) -> Result<()> {
1150 use grafeo_core::testing::crash::maybe_crash;
1151
1152 maybe_crash("checkpoint_to_file:before_export");
1153 let snapshot_data = self.export_snapshot()?;
1154 maybe_crash("checkpoint_to_file:after_export");
1155
1156 let epoch = self.store.current_epoch();
1157 let transaction_id = self
1158 .transaction_manager
1159 .last_assigned_transaction_id()
1160 .map_or(0, |t| t.0);
1161 let node_count = self.store.node_count() as u64;
1162 let edge_count = self.store.edge_count() as u64;
1163
1164 fm.write_snapshot(
1165 &snapshot_data,
1166 epoch.0,
1167 transaction_id,
1168 node_count,
1169 edge_count,
1170 )?;
1171
1172 maybe_crash("checkpoint_to_file:after_write_snapshot");
1173 Ok(())
1174 }
1175
1176 #[cfg(feature = "grafeo-file")]
1178 #[must_use]
1179 pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
1180 self.file_manager.as_ref()
1181 }
1182}
1183
1184impl Drop for GrafeoDB {
1185 fn drop(&mut self) {
1186 if let Err(e) = self.close() {
1187 tracing::error!("Error closing database: {}", e);
1188 }
1189 }
1190}
1191
1192impl crate::admin::AdminService for GrafeoDB {
1193 fn info(&self) -> crate::admin::DatabaseInfo {
1194 self.info()
1195 }
1196
1197 fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1198 self.detailed_stats()
1199 }
1200
1201 fn schema(&self) -> crate::admin::SchemaInfo {
1202 self.schema()
1203 }
1204
1205 fn validate(&self) -> crate::admin::ValidationResult {
1206 self.validate()
1207 }
1208
1209 fn wal_status(&self) -> crate::admin::WalStatus {
1210 self.wal_status()
1211 }
1212
1213 fn wal_checkpoint(&self) -> Result<()> {
1214 self.wal_checkpoint()
1215 }
1216}
1217
1218#[derive(Debug)]
1248pub struct QueryResult {
1249 pub columns: Vec<String>,
1251 pub column_types: Vec<grafeo_common::types::LogicalType>,
1253 pub rows: Vec<Vec<grafeo_common::types::Value>>,
1255 pub execution_time_ms: Option<f64>,
1257 pub rows_scanned: Option<u64>,
1259 pub status_message: Option<String>,
1261 pub gql_status: grafeo_common::utils::GqlStatus,
1263}
1264
1265impl QueryResult {
1266 #[must_use]
1268 pub fn empty() -> Self {
1269 Self {
1270 columns: Vec::new(),
1271 column_types: Vec::new(),
1272 rows: Vec::new(),
1273 execution_time_ms: None,
1274 rows_scanned: None,
1275 status_message: None,
1276 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1277 }
1278 }
1279
1280 #[must_use]
1282 pub fn status(msg: impl Into<String>) -> Self {
1283 Self {
1284 columns: Vec::new(),
1285 column_types: Vec::new(),
1286 rows: Vec::new(),
1287 execution_time_ms: None,
1288 rows_scanned: None,
1289 status_message: Some(msg.into()),
1290 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1291 }
1292 }
1293
1294 #[must_use]
1296 pub fn new(columns: Vec<String>) -> Self {
1297 let len = columns.len();
1298 Self {
1299 columns,
1300 column_types: vec![grafeo_common::types::LogicalType::Any; len],
1301 rows: Vec::new(),
1302 execution_time_ms: None,
1303 rows_scanned: None,
1304 status_message: None,
1305 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1306 }
1307 }
1308
1309 #[must_use]
1311 pub fn with_types(
1312 columns: Vec<String>,
1313 column_types: Vec<grafeo_common::types::LogicalType>,
1314 ) -> Self {
1315 Self {
1316 columns,
1317 column_types,
1318 rows: Vec::new(),
1319 execution_time_ms: None,
1320 rows_scanned: None,
1321 status_message: None,
1322 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1323 }
1324 }
1325
1326 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1328 self.execution_time_ms = Some(execution_time_ms);
1329 self.rows_scanned = Some(rows_scanned);
1330 self
1331 }
1332
1333 #[must_use]
1335 pub fn execution_time_ms(&self) -> Option<f64> {
1336 self.execution_time_ms
1337 }
1338
1339 #[must_use]
1341 pub fn rows_scanned(&self) -> Option<u64> {
1342 self.rows_scanned
1343 }
1344
1345 #[must_use]
1347 pub fn row_count(&self) -> usize {
1348 self.rows.len()
1349 }
1350
1351 #[must_use]
1353 pub fn column_count(&self) -> usize {
1354 self.columns.len()
1355 }
1356
1357 #[must_use]
1359 pub fn is_empty(&self) -> bool {
1360 self.rows.is_empty()
1361 }
1362
1363 pub fn scalar<T: FromValue>(&self) -> Result<T> {
1372 if self.rows.len() != 1 || self.columns.len() != 1 {
1373 return Err(grafeo_common::utils::error::Error::InvalidValue(
1374 "Expected single value".to_string(),
1375 ));
1376 }
1377 T::from_value(&self.rows[0][0])
1378 }
1379
1380 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1382 self.rows.iter()
1383 }
1384}
1385
1386pub trait FromValue: Sized {
1391 fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
1393}
1394
1395impl FromValue for i64 {
1396 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1397 value
1398 .as_int64()
1399 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1400 expected: "INT64".to_string(),
1401 found: value.type_name().to_string(),
1402 })
1403 }
1404}
1405
1406impl FromValue for f64 {
1407 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1408 value
1409 .as_float64()
1410 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1411 expected: "FLOAT64".to_string(),
1412 found: value.type_name().to_string(),
1413 })
1414 }
1415}
1416
1417impl FromValue for String {
1418 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1419 value.as_str().map(String::from).ok_or_else(|| {
1420 grafeo_common::utils::error::Error::TypeMismatch {
1421 expected: "STRING".to_string(),
1422 found: value.type_name().to_string(),
1423 }
1424 })
1425 }
1426}
1427
1428impl FromValue for bool {
1429 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1430 value
1431 .as_bool()
1432 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1433 expected: "BOOL".to_string(),
1434 found: value.type_name().to_string(),
1435 })
1436 }
1437}
1438
1439#[cfg(test)]
1440mod tests {
1441 use super::*;
1442
1443 #[test]
1444 fn test_create_in_memory_database() {
1445 let db = GrafeoDB::new_in_memory();
1446 assert_eq!(db.node_count(), 0);
1447 assert_eq!(db.edge_count(), 0);
1448 }
1449
1450 #[test]
1451 fn test_database_config() {
1452 let config = Config::in_memory().with_threads(4).with_query_logging();
1453
1454 let db = GrafeoDB::with_config(config).unwrap();
1455 assert_eq!(db.config().threads, 4);
1456 assert!(db.config().query_logging);
1457 }
1458
1459 #[test]
1460 fn test_database_session() {
1461 let db = GrafeoDB::new_in_memory();
1462 let _session = db.session();
1463 }
1465
1466 #[cfg(feature = "wal")]
1467 #[test]
1468 fn test_persistent_database_recovery() {
1469 use grafeo_common::types::Value;
1470 use tempfile::tempdir;
1471
1472 let dir = tempdir().unwrap();
1473 let db_path = dir.path().join("test_db");
1474
1475 {
1477 let db = GrafeoDB::open(&db_path).unwrap();
1478
1479 let alix = db.create_node(&["Person"]);
1480 db.set_node_property(alix, "name", Value::from("Alix"));
1481
1482 let gus = db.create_node(&["Person"]);
1483 db.set_node_property(gus, "name", Value::from("Gus"));
1484
1485 let _edge = db.create_edge(alix, gus, "KNOWS");
1486
1487 db.close().unwrap();
1489 }
1490
1491 {
1493 let db = GrafeoDB::open(&db_path).unwrap();
1494
1495 assert_eq!(db.node_count(), 2);
1496 assert_eq!(db.edge_count(), 1);
1497
1498 let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
1500 assert!(node0.is_some());
1501
1502 let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
1503 assert!(node1.is_some());
1504 }
1505 }
1506
1507 #[cfg(feature = "wal")]
1508 #[test]
1509 fn test_wal_logging() {
1510 use tempfile::tempdir;
1511
1512 let dir = tempdir().unwrap();
1513 let db_path = dir.path().join("wal_test_db");
1514
1515 let db = GrafeoDB::open(&db_path).unwrap();
1516
1517 let node = db.create_node(&["Test"]);
1519 db.delete_node(node);
1520
1521 if let Some(wal) = db.wal() {
1523 assert!(wal.record_count() > 0);
1524 }
1525
1526 db.close().unwrap();
1527 }
1528
1529 #[cfg(feature = "wal")]
1530 #[test]
1531 fn test_wal_recovery_multiple_sessions() {
1532 use grafeo_common::types::Value;
1534 use tempfile::tempdir;
1535
1536 let dir = tempdir().unwrap();
1537 let db_path = dir.path().join("multi_session_db");
1538
1539 {
1541 let db = GrafeoDB::open(&db_path).unwrap();
1542 let alix = db.create_node(&["Person"]);
1543 db.set_node_property(alix, "name", Value::from("Alix"));
1544 db.close().unwrap();
1545 }
1546
1547 {
1549 let db = GrafeoDB::open(&db_path).unwrap();
1550 assert_eq!(db.node_count(), 1); let gus = db.create_node(&["Person"]);
1552 db.set_node_property(gus, "name", Value::from("Gus"));
1553 db.close().unwrap();
1554 }
1555
1556 {
1558 let db = GrafeoDB::open(&db_path).unwrap();
1559 assert_eq!(db.node_count(), 2);
1560
1561 let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
1563 assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
1564
1565 let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
1566 assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
1567 }
1568 }
1569
1570 #[cfg(feature = "wal")]
1571 #[test]
1572 fn test_database_consistency_after_mutations() {
1573 use grafeo_common::types::Value;
1575 use tempfile::tempdir;
1576
1577 let dir = tempdir().unwrap();
1578 let db_path = dir.path().join("consistency_db");
1579
1580 {
1581 let db = GrafeoDB::open(&db_path).unwrap();
1582
1583 let a = db.create_node(&["Node"]);
1585 let b = db.create_node(&["Node"]);
1586 let c = db.create_node(&["Node"]);
1587
1588 let e1 = db.create_edge(a, b, "LINKS");
1590 let _e2 = db.create_edge(b, c, "LINKS");
1591
1592 db.delete_edge(e1);
1594 db.delete_node(b);
1595
1596 db.set_node_property(a, "value", Value::Int64(1));
1598 db.set_node_property(c, "value", Value::Int64(3));
1599
1600 db.close().unwrap();
1601 }
1602
1603 {
1605 let db = GrafeoDB::open(&db_path).unwrap();
1606
1607 let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
1611 assert!(node_a.is_some());
1612
1613 let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
1614 assert!(node_c.is_some());
1615
1616 let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
1618 assert!(node_b.is_none());
1619 }
1620 }
1621
1622 #[cfg(feature = "wal")]
1623 #[test]
1624 fn test_close_is_idempotent() {
1625 use tempfile::tempdir;
1627
1628 let dir = tempdir().unwrap();
1629 let db_path = dir.path().join("close_test_db");
1630
1631 let db = GrafeoDB::open(&db_path).unwrap();
1632 db.create_node(&["Test"]);
1633
1634 assert!(db.close().is_ok());
1636
1637 assert!(db.close().is_ok());
1639 }
1640
1641 #[test]
1642 fn test_query_result_has_metrics() {
1643 let db = GrafeoDB::new_in_memory();
1645 db.create_node(&["Person"]);
1646 db.create_node(&["Person"]);
1647
1648 #[cfg(feature = "gql")]
1649 {
1650 let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
1651
1652 assert!(result.execution_time_ms.is_some());
1654 assert!(result.rows_scanned.is_some());
1655 assert!(result.execution_time_ms.unwrap() >= 0.0);
1656 assert_eq!(result.rows_scanned.unwrap(), 2);
1657 }
1658 }
1659
1660 #[test]
1661 fn test_empty_query_result_metrics() {
1662 let db = GrafeoDB::new_in_memory();
1664 db.create_node(&["Person"]);
1665
1666 #[cfg(feature = "gql")]
1667 {
1668 let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
1670
1671 assert!(result.execution_time_ms.is_some());
1672 assert!(result.rows_scanned.is_some());
1673 assert_eq!(result.rows_scanned.unwrap(), 0);
1674 }
1675 }
1676
1677 #[cfg(feature = "cdc")]
1678 mod cdc_integration {
1679 use super::*;
1680
1681 #[test]
1682 fn test_node_lifecycle_history() {
1683 let db = GrafeoDB::new_in_memory();
1684
1685 let id = db.create_node(&["Person"]);
1687 db.set_node_property(id, "name", "Alix".into());
1689 db.set_node_property(id, "name", "Gus".into());
1690 db.delete_node(id);
1692
1693 let history = db.history(id).unwrap();
1694 assert_eq!(history.len(), 4); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1696 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1697 assert!(history[1].before.is_none()); assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
1699 assert!(history[2].before.is_some()); assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
1701 }
1702
1703 #[test]
1704 fn test_edge_lifecycle_history() {
1705 let db = GrafeoDB::new_in_memory();
1706
1707 let alix = db.create_node(&["Person"]);
1708 let gus = db.create_node(&["Person"]);
1709 let edge = db.create_edge(alix, gus, "KNOWS");
1710 db.set_edge_property(edge, "since", 2024i64.into());
1711 db.delete_edge(edge);
1712
1713 let history = db.history(edge).unwrap();
1714 assert_eq!(history.len(), 3); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1716 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1717 assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
1718 }
1719
1720 #[test]
1721 fn test_create_node_with_props_cdc() {
1722 let db = GrafeoDB::new_in_memory();
1723
1724 let id = db.create_node_with_props(
1725 &["Person"],
1726 vec![
1727 ("name", grafeo_common::types::Value::from("Alix")),
1728 ("age", grafeo_common::types::Value::from(30i64)),
1729 ],
1730 );
1731
1732 let history = db.history(id).unwrap();
1733 assert_eq!(history.len(), 1);
1734 assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1735 let after = history[0].after.as_ref().unwrap();
1737 assert_eq!(after.len(), 2);
1738 }
1739
1740 #[test]
1741 fn test_changes_between() {
1742 let db = GrafeoDB::new_in_memory();
1743
1744 let id1 = db.create_node(&["A"]);
1745 let _id2 = db.create_node(&["B"]);
1746 db.set_node_property(id1, "x", 1i64.into());
1747
1748 let changes = db
1750 .changes_between(
1751 grafeo_common::types::EpochId(0),
1752 grafeo_common::types::EpochId(u64::MAX),
1753 )
1754 .unwrap();
1755 assert_eq!(changes.len(), 3); }
1757 }
1758
1759 #[test]
1760 fn test_with_store_basic() {
1761 use grafeo_core::graph::lpg::LpgStore;
1762
1763 let store = Arc::new(LpgStore::new().unwrap());
1764 let n1 = store.create_node(&["Person"]);
1765 store.set_node_property(n1, "name", "Alix".into());
1766
1767 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1768 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1769
1770 let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
1771 assert_eq!(result.rows.len(), 1);
1772 }
1773
1774 #[test]
1775 fn test_with_store_session() {
1776 use grafeo_core::graph::lpg::LpgStore;
1777
1778 let store = Arc::new(LpgStore::new().unwrap());
1779 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1780 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1781
1782 let session = db.session();
1783 let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
1784 assert_eq!(result.rows.len(), 1);
1785 }
1786
1787 #[test]
1788 fn test_with_store_mutations() {
1789 use grafeo_core::graph::lpg::LpgStore;
1790
1791 let store = Arc::new(LpgStore::new().unwrap());
1792 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1793 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1794
1795 let mut session = db.session();
1796
1797 session.begin_transaction().unwrap();
1801 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1802
1803 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
1804 assert_eq!(result.rows.len(), 1);
1805
1806 session.commit().unwrap();
1807 }
1808}