1#[cfg(feature = "lpg")]
15mod admin;
16#[cfg(feature = "arrow-export")]
17pub mod arrow;
18#[cfg(all(feature = "async-storage", feature = "lpg"))]
19mod async_ops;
20#[cfg(all(feature = "async-storage", feature = "lpg"))]
21pub(crate) mod async_wal_store;
22#[cfg(all(feature = "wal", feature = "grafeo-file"))]
23pub mod backup;
24#[cfg(feature = "lpg")]
25pub(crate) mod catalog_section;
26#[cfg(feature = "cdc")]
27pub(crate) mod cdc_store;
28#[cfg(all(feature = "grafeo-file", feature = "lpg"))]
29mod checkpoint_timer;
30#[cfg(feature = "lpg")]
31mod crud;
32#[cfg(feature = "embed")]
33mod embed;
34#[cfg(feature = "grafeo-file")]
35pub(crate) mod flush;
36#[cfg(feature = "lpg")]
37mod import;
38#[cfg(feature = "lpg")]
39mod index;
40#[cfg(feature = "lpg")]
41mod persistence;
42mod query;
43#[cfg(feature = "triple-store")]
44mod rdf_ops;
45#[cfg(feature = "lpg")]
46mod search;
47pub(crate) mod section_consumer;
48#[cfg(all(feature = "wal", feature = "lpg"))]
49pub(crate) mod wal_store;
50
51use grafeo_common::{grafeo_error, grafeo_warn};
52#[cfg(feature = "wal")]
53use std::path::Path;
54use std::sync::Arc;
55use std::sync::atomic::AtomicUsize;
56
57use parking_lot::RwLock;
58
59use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
60use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind, Result};
61#[cfg(feature = "lpg")]
62use grafeo_core::graph::lpg::LpgStore;
63#[cfg(feature = "triple-store")]
64use grafeo_core::graph::rdf::RdfStore;
65use grafeo_core::graph::{GraphStore, GraphStoreMut};
66#[cfg(feature = "grafeo-file")]
67use grafeo_storage::file::GrafeoFileManager;
68#[cfg(all(feature = "wal", feature = "lpg"))]
69use grafeo_storage::wal::WalRecovery;
70#[cfg(feature = "wal")]
71use grafeo_storage::wal::{DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord};
72
73use crate::catalog::Catalog;
74use crate::config::Config;
75use crate::query::cache::QueryCache;
76use crate::session::Session;
77use crate::transaction::TransactionManager;
78
/// Top-level database handle.
///
/// Owns the stores, catalog, transaction manager and the optional persistence
/// components, and hands out [`Session`]s that share them.
pub struct GrafeoDB {
    /// Configuration this database was created with.
    pub(super) config: Config,
    /// Built-in LPG store. `None` when the database wraps an external store
    /// (`with_store` / `with_read_store`) or after `compact()`.
    #[cfg(feature = "lpg")]
    pub(super) store: Option<Arc<LpgStore>>,
    /// Schema catalog (node/edge/graph types, procedures, schema namespaces).
    pub(super) catalog: Arc<Catalog>,
    /// RDF store.
    #[cfg(feature = "triple-store")]
    pub(super) rdf_store: Arc<RdfStore>,
    /// Transaction manager shared with every session.
    pub(super) transaction_manager: Arc<TransactionManager>,
    /// Buffer manager; its budget comes from `config.memory_limit` or defaults
    /// to 75% of detected system memory.
    pub(super) buffer_manager: Arc<BufferManager>,
    /// Write-ahead log. `None` for read-only databases, when WAL is disabled,
    /// when no path is configured, or when an external store is used.
    #[cfg(feature = "wal")]
    pub(super) wal: Option<Arc<LpgWal>>,
    /// Shared with sessions together with the WAL handle.
    /// NOTE(review): presumably tracks the named graph most recently written
    /// to the WAL - confirm against the session code.
    #[cfg(feature = "wal")]
    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
    /// Query cache shared with sessions; replaced by a fresh cache on `compact()`.
    pub(super) query_cache: Arc<QueryCache>,
    /// Counter shared with sessions; starts at zero.
    /// NOTE(review): presumably counts commits to drive GC (`gc_interval`) - confirm.
    pub(super) commit_counter: Arc<AtomicUsize>,
    /// Open/closed flag; initialised to `true` by every constructor.
    pub(super) is_open: RwLock<bool>,
    /// Change-data-capture log handed to sessions that have CDC enabled.
    #[cfg(feature = "cdc")]
    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
    /// Database-wide CDC default, read via `cdc_active()`.
    #[cfg(feature = "cdc")]
    cdc_enabled: std::sync::atomic::AtomicBool,
    /// Embedding models keyed by a `String` (presumably the model name).
    #[cfg(feature = "embed")]
    pub(super) embedding_models:
        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
    /// File manager for the single-file (`.grafeo`) storage format, when in use.
    #[cfg(feature = "grafeo-file")]
    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
    /// Periodic checkpoint timer; started by `with_config` when a checkpoint
    /// interval is configured and the database is writable.
    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
    checkpoint_timer: parking_lot::Mutex<Option<checkpoint_timer::CheckpointTimer>>,
    /// Memory-mapped vector storages keyed by a `String`; initialised to `None`
    /// by every constructor in this file.
    #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
    vector_spill_storages: Option<
        Arc<
            parking_lot::RwLock<
                std::collections::HashMap<String, Arc<grafeo_core::index::vector::MmapStorage>>,
            >,
        >,
    >,
    /// External read-only view of the graph; when set, sessions are created
    /// on top of it instead of the built-in store.
    pub(super) external_read_store: Option<Arc<dyn GraphStore>>,
    /// External writable store; `None` for read-only external stores.
    pub(super) external_write_store: Option<Arc<dyn GraphStoreMut>>,
    /// Metrics registry attached to new sessions.
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Database-level current graph, applied to newly created sessions.
    current_graph: RwLock<Option<String>>,
    /// Database-level current schema, applied to newly created sessions.
    current_schema: RwLock<Option<String>>,
    /// Whether the database as a whole is read-only.
    read_only: bool,
}
178
179impl GrafeoDB {
/// Returns the built-in LPG store.
///
/// # Panics
///
/// Panics when this database has no built-in store, i.e. `self.store` is
/// `None` (external-store databases).
#[cfg(feature = "lpg")]
fn lpg_store(&self) -> &Arc<LpgStore> {
    let Some(builtin) = self.store.as_ref() else {
        panic!(
            "no built-in LpgStore: this GrafeoDB was created with an external store \
             (with_store / with_read_store). Use session() or graph_store() instead."
        );
    };
    builtin
}
194
/// Reports whether CDC is currently enabled at the database level.
///
/// Reads the `cdc_enabled` flag with relaxed ordering.
#[cfg(feature = "cdc")]
#[inline]
pub(super) fn cdc_active(&self) -> bool {
    use std::sync::atomic::Ordering;

    let flag = &self.cdc_enabled;
    flag.load(Ordering::Relaxed)
}
201
202 #[must_use]
223 pub fn new_in_memory() -> Self {
224 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
225 }
226
/// Opens (or creates) a persistent database at `path`.
///
/// Builds `Config::persistent(path)` and delegates to `with_config`.
///
/// # Errors
///
/// Returns whatever error `with_config` reports.
#[cfg(feature = "wal")]
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
    let db_path: &Path = path.as_ref();
    let persistent = Config::persistent(db_path);
    Self::with_config(persistent)
}
249
/// Opens an existing database file at `path` in read-only mode.
///
/// Builds `Config::read_only(path)` and delegates to `with_config`.
///
/// # Errors
///
/// Returns whatever error `with_config` reports (for example when the file
/// does not exist).
#[cfg(feature = "grafeo-file")]
pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
    let db_path: &std::path::Path = path.as_ref();
    let read_only = Config::read_only(db_path);
    Self::with_config(read_only)
}
278
279 pub fn with_config(config: Config) -> Result<Self> {
303 config
305 .validate()
306 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
307
308 #[cfg(feature = "lpg")]
309 let store = Arc::new(LpgStore::new()?);
310 #[cfg(feature = "triple-store")]
311 let rdf_store = Arc::new(RdfStore::new());
312 let transaction_manager = Arc::new(TransactionManager::new());
313
314 let buffer_config = BufferManagerConfig {
316 budget: config.memory_limit.unwrap_or_else(|| {
317 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
318 }),
319 spill_path: config.spill_path.clone().or_else(|| {
320 config.path.as_ref().and_then(|p| {
321 let parent = p.parent()?;
322 let name = p.file_name()?.to_str()?;
323 Some(parent.join(format!("{name}.spill")))
324 })
325 }),
326 ..BufferManagerConfig::default()
327 };
328 let buffer_manager = BufferManager::new(buffer_config);
329
330 let catalog = Arc::new(Catalog::new());
332
333 let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;
334
335 #[cfg(feature = "grafeo-file")]
337 let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
338 if let Some(ref db_path) = config.path {
340 if db_path.exists() && db_path.is_file() {
341 let fm = GrafeoFileManager::open_read_only(db_path)?;
342 #[cfg(feature = "lpg")]
344 if fm.read_section_directory()?.is_some() {
345 Self::load_from_sections(
346 &fm,
347 &store,
348 &catalog,
349 #[cfg(feature = "triple-store")]
350 &rdf_store,
351 )?;
352 } else {
353 let snapshot_data = fm.read_snapshot()?;
355 if !snapshot_data.is_empty() {
356 Self::apply_snapshot_data(
357 &store,
358 &catalog,
359 #[cfg(feature = "triple-store")]
360 &rdf_store,
361 &snapshot_data,
362 )?;
363 }
364 }
365 Some(Arc::new(fm))
366 } else {
367 return Err(grafeo_common::utils::error::Error::Internal(format!(
368 "read-only open requires an existing .grafeo file: {}",
369 db_path.display()
370 )));
371 }
372 } else {
373 return Err(grafeo_common::utils::error::Error::Internal(
374 "read-only mode requires a database path".to_string(),
375 ));
376 }
377 } else if let Some(ref db_path) = config.path {
378 if Self::should_use_single_file(db_path, config.storage_format) {
383 let fm = if db_path.exists() && db_path.is_file() {
384 GrafeoFileManager::open(db_path)?
385 } else if !db_path.exists() {
386 GrafeoFileManager::create(db_path)?
387 } else {
388 return Err(grafeo_common::utils::error::Error::Internal(format!(
390 "path exists but is not a file: {}",
391 db_path.display()
392 )));
393 };
394
395 #[cfg(feature = "lpg")]
397 if fm.read_section_directory()?.is_some() {
398 Self::load_from_sections(
399 &fm,
400 &store,
401 &catalog,
402 #[cfg(feature = "triple-store")]
403 &rdf_store,
404 )?;
405 } else {
406 let snapshot_data = fm.read_snapshot()?;
407 if !snapshot_data.is_empty() {
408 Self::apply_snapshot_data(
409 &store,
410 &catalog,
411 #[cfg(feature = "triple-store")]
412 &rdf_store,
413 &snapshot_data,
414 )?;
415 }
416 }
417
418 #[cfg(all(feature = "wal", feature = "lpg"))]
420 if config.wal_enabled && fm.has_sidecar_wal() {
421 let recovery = WalRecovery::new(fm.sidecar_wal_path());
422 let records = recovery.recover()?;
423 Self::apply_wal_records(
424 &store,
425 &catalog,
426 #[cfg(feature = "triple-store")]
427 &rdf_store,
428 &records,
429 )?;
430 }
431
432 Some(Arc::new(fm))
433 } else {
434 None
435 }
436 } else {
437 None
438 };
439
440 #[cfg(feature = "wal")]
443 let wal = if is_read_only {
444 None
445 } else if config.wal_enabled {
446 if let Some(ref db_path) = config.path {
447 #[cfg(feature = "grafeo-file")]
449 let wal_path = if let Some(ref fm) = file_manager {
450 let p = fm.sidecar_wal_path();
451 std::fs::create_dir_all(&p)?;
452 p
453 } else {
454 std::fs::create_dir_all(db_path)?;
456 db_path.join("wal")
457 };
458
459 #[cfg(not(feature = "grafeo-file"))]
460 let wal_path = {
461 std::fs::create_dir_all(db_path)?;
462 db_path.join("wal")
463 };
464
465 #[cfg(all(feature = "lpg", feature = "grafeo-file"))]
467 let is_single_file = file_manager.is_some();
468 #[cfg(all(feature = "lpg", not(feature = "grafeo-file")))]
469 let is_single_file = false;
470
471 #[cfg(feature = "lpg")]
472 if !is_single_file && wal_path.exists() {
473 let recovery = WalRecovery::new(&wal_path);
474 let records = recovery.recover()?;
475 Self::apply_wal_records(
476 &store,
477 &catalog,
478 #[cfg(feature = "triple-store")]
479 &rdf_store,
480 &records,
481 )?;
482 }
483
484 let wal_durability = match config.wal_durability {
486 crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
487 crate::config::DurabilityMode::Batch {
488 max_delay_ms,
489 max_records,
490 } => WalDurabilityMode::Batch {
491 max_delay_ms,
492 max_records,
493 },
494 crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
495 WalDurabilityMode::Adaptive { target_interval_ms }
496 }
497 crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
498 };
499 let wal_config = WalConfig {
500 durability: wal_durability,
501 ..WalConfig::default()
502 };
503 let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
504 Some(Arc::new(wal_manager))
505 } else {
506 None
507 }
508 } else {
509 None
510 };
511
512 let query_cache = Arc::new(QueryCache::default());
514
515 #[cfg(all(feature = "temporal", feature = "lpg"))]
518 transaction_manager.sync_epoch(store.current_epoch());
519
520 #[cfg(feature = "cdc")]
521 let cdc_enabled_val = config.cdc_enabled;
522 #[cfg(feature = "cdc")]
523 let cdc_retention = config.cdc_retention.clone();
524
525 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
528 let checkpoint_interval = config.checkpoint_interval;
529 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
530 let timer_store = Arc::clone(&store);
531 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
532 let timer_catalog = Arc::clone(&catalog);
533 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
534 let timer_tm = Arc::clone(&transaction_manager);
535 #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "triple-store"))]
536 let timer_rdf = Arc::clone(&rdf_store);
537 #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "wal"))]
538 let timer_wal = wal.clone();
539
540 let mut db = Self {
541 config,
542 #[cfg(feature = "lpg")]
543 store: Some(store),
544 catalog,
545 #[cfg(feature = "triple-store")]
546 rdf_store,
547 transaction_manager,
548 buffer_manager,
549 #[cfg(feature = "wal")]
550 wal,
551 #[cfg(feature = "wal")]
552 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
553 query_cache,
554 commit_counter: Arc::new(AtomicUsize::new(0)),
555 is_open: RwLock::new(true),
556 #[cfg(feature = "cdc")]
557 cdc_log: Arc::new(crate::cdc::CdcLog::with_retention(cdc_retention)),
558 #[cfg(feature = "cdc")]
559 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
560 #[cfg(feature = "embed")]
561 embedding_models: RwLock::new(hashbrown::HashMap::new()),
562 #[cfg(feature = "grafeo-file")]
563 file_manager,
564 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
565 checkpoint_timer: parking_lot::Mutex::new(None),
566 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
567 vector_spill_storages: None,
568 external_read_store: None,
569 external_write_store: None,
570 #[cfg(feature = "metrics")]
571 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
572 current_graph: RwLock::new(None),
573 current_schema: RwLock::new(None),
574 read_only: is_read_only,
575 };
576
577 db.register_section_consumers();
579
580 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
582 if let (Some(interval), Some(fm)) = (checkpoint_interval, &db.file_manager)
583 && !is_read_only
584 {
585 *db.checkpoint_timer.lock() = Some(checkpoint_timer::CheckpointTimer::start(
586 interval,
587 Arc::clone(fm),
588 timer_store,
589 timer_catalog,
590 timer_tm,
591 #[cfg(feature = "triple-store")]
592 timer_rdf,
593 #[cfg(feature = "wal")]
594 timer_wal,
595 ));
596 }
597
598 #[cfg(all(
602 feature = "lpg",
603 feature = "vector-index",
604 feature = "mmap",
605 not(feature = "temporal")
606 ))]
607 db.restore_spill_files();
608
609 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
612 if db
613 .config
614 .section_configs
615 .get(&grafeo_common::storage::SectionType::VectorStore)
616 .is_some_and(|c| c.tier == grafeo_common::storage::TierOverride::ForceDisk)
617 {
618 db.buffer_manager.spill_all();
619 }
620
621 Ok(db)
622 }
623
624 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
653 config
654 .validate()
655 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
656
657 let transaction_manager = Arc::new(TransactionManager::new());
658
659 let buffer_config = BufferManagerConfig {
660 budget: config.memory_limit.unwrap_or_else(|| {
661 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
662 }),
663 spill_path: None,
664 ..BufferManagerConfig::default()
665 };
666 let buffer_manager = BufferManager::new(buffer_config);
667
668 let query_cache = Arc::new(QueryCache::default());
669
670 #[cfg(feature = "cdc")]
671 let cdc_enabled_val = config.cdc_enabled;
672
673 Ok(Self {
674 config,
675 #[cfg(feature = "lpg")]
676 store: None,
677 catalog: Arc::new(Catalog::new()),
678 #[cfg(feature = "triple-store")]
679 rdf_store: Arc::new(RdfStore::new()),
680 transaction_manager,
681 buffer_manager,
682 #[cfg(feature = "wal")]
683 wal: None,
684 #[cfg(feature = "wal")]
685 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
686 query_cache,
687 commit_counter: Arc::new(AtomicUsize::new(0)),
688 is_open: RwLock::new(true),
689 #[cfg(feature = "cdc")]
690 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
691 #[cfg(feature = "cdc")]
692 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
693 #[cfg(feature = "embed")]
694 embedding_models: RwLock::new(hashbrown::HashMap::new()),
695 #[cfg(feature = "grafeo-file")]
696 file_manager: None,
697 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
698 checkpoint_timer: parking_lot::Mutex::new(None),
699 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
700 vector_spill_storages: None,
701 external_read_store: Some(Arc::clone(&store) as Arc<dyn GraphStore>),
702 external_write_store: Some(store),
703 #[cfg(feature = "metrics")]
704 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
705 current_graph: RwLock::new(None),
706 current_schema: RwLock::new(None),
707 read_only: false,
708 })
709 }
710
711 pub fn with_read_store(store: Arc<dyn GraphStore>, config: Config) -> Result<Self> {
736 config
737 .validate()
738 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
739
740 let transaction_manager = Arc::new(TransactionManager::new());
741
742 let buffer_config = BufferManagerConfig {
743 budget: config.memory_limit.unwrap_or_else(|| {
744 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
745 }),
746 spill_path: None,
747 ..BufferManagerConfig::default()
748 };
749 let buffer_manager = BufferManager::new(buffer_config);
750
751 let query_cache = Arc::new(QueryCache::default());
752
753 #[cfg(feature = "cdc")]
754 let cdc_enabled_val = config.cdc_enabled;
755
756 Ok(Self {
757 config,
758 #[cfg(feature = "lpg")]
759 store: None,
760 catalog: Arc::new(Catalog::new()),
761 #[cfg(feature = "triple-store")]
762 rdf_store: Arc::new(RdfStore::new()),
763 transaction_manager,
764 buffer_manager,
765 #[cfg(feature = "wal")]
766 wal: None,
767 #[cfg(feature = "wal")]
768 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
769 query_cache,
770 commit_counter: Arc::new(AtomicUsize::new(0)),
771 is_open: RwLock::new(true),
772 #[cfg(feature = "cdc")]
773 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
774 #[cfg(feature = "cdc")]
775 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
776 #[cfg(feature = "embed")]
777 embedding_models: RwLock::new(hashbrown::HashMap::new()),
778 #[cfg(feature = "grafeo-file")]
779 file_manager: None,
780 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
781 checkpoint_timer: parking_lot::Mutex::new(None),
782 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
783 vector_spill_storages: None,
784 external_read_store: Some(store),
785 external_write_store: None,
786 #[cfg(feature = "metrics")]
787 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
788 current_graph: RwLock::new(None),
789 current_schema: RwLock::new(None),
790 read_only: true,
791 })
792 }
793
/// Replaces the current graph store with a compact, read-only copy of it.
///
/// The compact store is built from `self.graph_store()` and installed as the
/// external read store. The write store and the built-in LPG store are
/// cleared, the database is marked read-only and the query cache is reset.
///
/// # Errors
///
/// Returns `Error::Internal` when the conversion to the compact store fails;
/// in that case the database is left unchanged.
#[cfg(feature = "compact-store")]
pub fn compact(&mut self) -> Result<()> {
    use grafeo_core::graph::compact::from_graph_store;

    let source = self.graph_store();
    let frozen: Arc<dyn GraphStore> = match from_graph_store(source.as_ref()) {
        Ok(compact_store) => Arc::new(compact_store),
        Err(e) => return Err(Error::Internal(e.to_string())),
    };

    // Swap in the compact store and drop every writable path.
    self.external_read_store = Some(frozen);
    self.external_write_store = None;
    #[cfg(feature = "lpg")]
    {
        self.store = None;
    }
    self.read_only = true;
    self.query_cache = Arc::new(QueryCache::default());

    Ok(())
}
830
/// Replays recovered WAL `records` into the in-memory stores and catalog.
///
/// Data records (nodes, edges, properties, labels) are applied to the
/// *target* store: the default `store`, or the named graph selected by the
/// most recent `SwitchGraph` record. Schema records are applied to `catalog`;
/// catalog errors are deliberately ignored (`let _ =`). RDF records are
/// forwarded to the RDF store when the `triple-store` feature is enabled and
/// skipped otherwise.
///
/// # Errors
///
/// Returns an error when a node or edge cannot be created with its recorded
/// id, or when a named graph cannot be resolved or created.
#[cfg(all(feature = "wal", feature = "lpg"))]
fn apply_wal_records(
    store: &Arc<LpgStore>,
    catalog: &Catalog,
    #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
    records: &[WalRecord],
) -> Result<()> {
    use crate::catalog::{
        EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
    };
    use grafeo_common::utils::error::Error;

    // Named graph currently selected by the log (`None` = default graph) and
    // the store that data records are applied to.
    let mut current_graph: Option<String> = None;
    let mut target_store: Arc<LpgStore> = Arc::clone(store);

    for record in records {
        match record {
            // --- Named graph lifecycle -----------------------------------
            WalRecord::CreateNamedGraph { name } => {
                // Result ignored.
                let _ = store.create_graph(name);
            }
            WalRecord::DropNamedGraph { name } => {
                store.drop_graph(name);
                // Fall back to the default graph if the active one was dropped.
                if current_graph.as_deref() == Some(name.as_str()) {
                    current_graph = None;
                    target_store = Arc::clone(store);
                }
            }
            WalRecord::SwitchGraph { name } => {
                current_graph.clone_from(name);
                target_store = match &current_graph {
                    None => Arc::clone(store),
                    Some(graph_name) => store
                        .graph_or_create(graph_name)
                        .map_err(|e| Error::Internal(e.to_string()))?,
                };
            }

            // --- Data mutations ------------------------------------------
            WalRecord::CreateNode { id, labels } => {
                let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
                target_store.create_node_with_id(*id, &label_refs)?;
            }
            WalRecord::DeleteNode { id } => {
                target_store.delete_node(*id);
            }
            WalRecord::CreateEdge {
                id,
                src,
                dst,
                edge_type,
            } => {
                target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
            }
            WalRecord::DeleteEdge { id } => {
                target_store.delete_edge(*id);
            }
            WalRecord::SetNodeProperty { id, key, value } => {
                target_store.set_node_property(*id, key, value.clone());
            }
            WalRecord::SetEdgeProperty { id, key, value } => {
                target_store.set_edge_property(*id, key, value.clone());
            }
            WalRecord::AddNodeLabel { id, label } => {
                target_store.add_label(*id, label);
            }
            WalRecord::RemoveNodeLabel { id, label } => {
                target_store.remove_label(*id, label);
            }
            WalRecord::RemoveNodeProperty { id, key } => {
                target_store.remove_node_property(*id, key);
            }
            WalRecord::RemoveEdgeProperty { id, key } => {
                target_store.remove_edge_property(*id, key);
            }

            // --- Schema: node / edge types -------------------------------
            WalRecord::CreateNodeType {
                name,
                properties,
                constraints,
            } => {
                let def = NodeTypeDefinition {
                    name: name.clone(),
                    properties: properties
                        .iter()
                        .map(|(n, t, nullable)| TypedProperty {
                            name: n.clone(),
                            data_type: PropertyDataType::from_type_name(t),
                            nullable: *nullable,
                            default_value: None,
                        })
                        .collect(),
                    constraints: constraints
                        .iter()
                        .map(|(kind, props)| match kind.as_str() {
                            "unique" => TypeConstraint::Unique(props.clone()),
                            "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
                            "not_null" if !props.is_empty() => {
                                TypeConstraint::NotNull(props[0].clone())
                            }
                            // Unknown kinds (and empty `not_null`) become `Unique`.
                            _ => TypeConstraint::Unique(props.clone()),
                        })
                        .collect(),
                    parent_types: Vec::new(),
                };
                let _ = catalog.register_node_type(def);
            }
            WalRecord::DropNodeType { name } => {
                let _ = catalog.drop_node_type(name);
            }
            WalRecord::CreateEdgeType {
                name,
                properties,
                constraints,
            } => {
                let def = EdgeTypeDefinition {
                    name: name.clone(),
                    properties: properties
                        .iter()
                        .map(|(n, t, nullable)| TypedProperty {
                            name: n.clone(),
                            data_type: PropertyDataType::from_type_name(t),
                            nullable: *nullable,
                            default_value: None,
                        })
                        .collect(),
                    constraints: constraints
                        .iter()
                        .map(|(kind, props)| match kind.as_str() {
                            "unique" => TypeConstraint::Unique(props.clone()),
                            "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
                            "not_null" if !props.is_empty() => {
                                TypeConstraint::NotNull(props[0].clone())
                            }
                            // Unknown kinds (and empty `not_null`) become `Unique`.
                            _ => TypeConstraint::Unique(props.clone()),
                        })
                        .collect(),
                    source_node_types: Vec::new(),
                    target_node_types: Vec::new(),
                };
                let _ = catalog.register_edge_type_def(def);
            }
            WalRecord::DropEdgeType { name } => {
                let _ = catalog.drop_edge_type_def(name);
            }
            // Index and constraint records are not replayed here.
            WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {}
            WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {}

            // --- Schema: graph types and schema namespaces ---------------
            WalRecord::CreateGraphType {
                name,
                node_types,
                edge_types,
                open,
            } => {
                use crate::catalog::GraphTypeDefinition;
                let def = GraphTypeDefinition {
                    name: name.clone(),
                    allowed_node_types: node_types.clone(),
                    allowed_edge_types: edge_types.clone(),
                    open: *open,
                };
                let _ = catalog.register_graph_type(def);
            }
            WalRecord::DropGraphType { name } => {
                let _ = catalog.drop_graph_type(name);
            }
            WalRecord::CreateSchema { name } => {
                let _ = catalog.register_schema_namespace(name.clone());
            }
            WalRecord::DropSchema { name } => {
                let _ = catalog.drop_schema_namespace(name);
            }

            // --- Schema alterations (unknown actions are skipped) --------
            WalRecord::AlterNodeType { name, alterations } => {
                for (action, prop_name, type_name, nullable) in alterations {
                    match action.as_str() {
                        "add" => {
                            let prop = TypedProperty {
                                name: prop_name.clone(),
                                data_type: PropertyDataType::from_type_name(type_name),
                                nullable: *nullable,
                                default_value: None,
                            };
                            let _ = catalog.alter_node_type_add_property(name, prop);
                        }
                        "drop" => {
                            let _ = catalog.alter_node_type_drop_property(name, prop_name);
                        }
                        _ => {}
                    }
                }
            }
            WalRecord::AlterEdgeType { name, alterations } => {
                for (action, prop_name, type_name, nullable) in alterations {
                    match action.as_str() {
                        "add" => {
                            let prop = TypedProperty {
                                name: prop_name.clone(),
                                data_type: PropertyDataType::from_type_name(type_name),
                                nullable: *nullable,
                                default_value: None,
                            };
                            let _ = catalog.alter_edge_type_add_property(name, prop);
                        }
                        "drop" => {
                            let _ = catalog.alter_edge_type_drop_property(name, prop_name);
                        }
                        _ => {}
                    }
                }
            }
            WalRecord::AlterGraphType { name, alterations } => {
                for (action, type_name) in alterations {
                    match action.as_str() {
                        "add_node" => {
                            let _ =
                                catalog.alter_graph_type_add_node_type(name, type_name.clone());
                        }
                        "drop_node" => {
                            let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
                        }
                        "add_edge" => {
                            let _ =
                                catalog.alter_graph_type_add_edge_type(name, type_name.clone());
                        }
                        "drop_edge" => {
                            let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
                        }
                        _ => {}
                    }
                }
            }

            // --- Procedures ----------------------------------------------
            WalRecord::CreateProcedure {
                name,
                params,
                returns,
                body,
            } => {
                use crate::catalog::ProcedureDefinition;
                let def = ProcedureDefinition {
                    name: name.clone(),
                    params: params.clone(),
                    returns: returns.clone(),
                    body: body.clone(),
                };
                let _ = catalog.register_procedure(def);
            }
            WalRecord::DropProcedure { name } => {
                let _ = catalog.drop_procedure(name);
            }

            // --- RDF records ---------------------------------------------
            #[cfg(feature = "triple-store")]
            WalRecord::InsertRdfTriple { .. }
            | WalRecord::DeleteRdfTriple { .. }
            | WalRecord::ClearRdfGraph { .. }
            | WalRecord::CreateRdfGraph { .. }
            | WalRecord::DropRdfGraph { .. } => {
                rdf_ops::replay_rdf_wal_record(rdf_store, record);
            }
            // Without the triple store there is nothing to apply them to.
            #[cfg(not(feature = "triple-store"))]
            WalRecord::InsertRdfTriple { .. }
            | WalRecord::DeleteRdfTriple { .. }
            | WalRecord::ClearRdfGraph { .. }
            | WalRecord::CreateRdfGraph { .. }
            | WalRecord::DropRdfGraph { .. } => {}

            // --- Transaction markers -------------------------------------
            WalRecord::TransactionCommit { .. } => {
                // Temporal builds start a new epoch on the target store for
                // every committed transaction.
                #[cfg(feature = "temporal")]
                {
                    target_store.new_epoch();
                }
            }
            // No replay action for these records.
            WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {}
            WalRecord::EpochAdvance { .. } => {}
        }
    }
    Ok(())
}
1134
/// Decides whether `path` should be handled as a single-file database.
///
/// * `SingleFile` / `WalDirectory` force the answer.
/// * `Auto`: an existing file qualifies only if it can be opened and its first
///   four bytes equal `grafeo_storage::file::MAGIC`; an existing directory
///   never qualifies; a path that does not exist yet qualifies when its
///   extension is `grafeo`.
#[cfg(feature = "grafeo-file")]
fn should_use_single_file(
    path: &std::path::Path,
    configured: crate::config::StorageFormat,
) -> bool {
    use crate::config::StorageFormat;
    use std::io::Read;

    match configured {
        StorageFormat::SingleFile => true,
        StorageFormat::WalDirectory => false,
        StorageFormat::Auto if path.is_file() => {
            // Sniff the header; any I/O failure counts as "not a grafeo file".
            let mut header = [0u8; 4];
            std::fs::File::open(path).is_ok_and(|mut file| {
                file.read_exact(&mut header).is_ok() && header == grafeo_storage::file::MAGIC
            })
        }
        StorageFormat::Auto if path.is_dir() => false,
        StorageFormat::Auto => path.extension().is_some_and(|ext| ext == "grafeo"),
    }
}
1171
/// Loads serialized snapshot bytes into the given stores and catalog.
///
/// Thin wrapper that forwards every argument to
/// `persistence::load_snapshot_into_store` and returns its result.
#[cfg(all(feature = "grafeo-file", feature = "lpg"))]
fn apply_snapshot_data(
    store: &Arc<LpgStore>,
    catalog: &Arc<Catalog>,
    #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
    data: &[u8],
) -> Result<()> {
    let loaded = persistence::load_snapshot_into_store(
        store,
        catalog,
        #[cfg(feature = "triple-store")]
        rdf_store,
        data,
    );
    loaded
}
1191
/// Loads database state from the section directory of a single-file database.
///
/// Sections are read in a fixed order: catalog, LPG store, RDF store
/// (`triple-store`), vector store (`vector-index`) and text index
/// (`text-index`). A section missing from the directory is skipped. Vector and
/// text sections are only deserialized when the store reports at least one
/// matching index entry.
///
/// # Errors
///
/// Returns `Error::Internal` when the file has no section directory, and
/// propagates any read or deserialization error.
#[cfg(all(feature = "grafeo-file", feature = "lpg"))]
fn load_from_sections(
    fm: &GrafeoFileManager,
    store: &Arc<LpgStore>,
    catalog: &Arc<Catalog>,
    #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
) -> Result<()> {
    use grafeo_common::storage::{Section, SectionType};

    let Some(directory) = fm.read_section_directory()? else {
        return Err(Error::Internal(
            "expected v2 section directory but found none".to_string(),
        ));
    };

    if let Some(catalog_entry) = directory.find(SectionType::Catalog) {
        let bytes = fm.read_section_data(catalog_entry)?;
        // The catalog section needs an epoch source; a fresh transaction
        // manager is created here just to provide one.
        let epoch_source = Arc::new(TransactionManager::new());
        let mut section = catalog_section::CatalogSection::new(
            Arc::clone(catalog),
            Arc::clone(store),
            move || epoch_source.current_epoch().as_u64(),
        );
        section.deserialize(&bytes)?;
    }

    if let Some(lpg_entry) = directory.find(SectionType::LpgStore) {
        let bytes = fm.read_section_data(lpg_entry)?;
        let mut section = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
        section.deserialize(&bytes)?;
    }

    #[cfg(feature = "triple-store")]
    if let Some(rdf_entry) = directory.find(SectionType::RdfStore) {
        let bytes = fm.read_section_data(rdf_entry)?;
        let mut section = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(rdf_store));
        section.deserialize(&bytes)?;
    }

    #[cfg(feature = "vector-index")]
    if let Some(vector_entry) = directory.find(SectionType::VectorStore) {
        let bytes = fm.read_section_data(vector_entry)?;
        let vector_indexes = store.vector_index_entries();
        if !vector_indexes.is_empty() {
            let mut section =
                grafeo_core::index::vector::VectorStoreSection::new(vector_indexes);
            section.deserialize(&bytes)?;
        }
    }

    #[cfg(feature = "text-index")]
    if let Some(text_entry) = directory.find(SectionType::TextIndex) {
        let bytes = fm.read_section_data(text_entry)?;
        let text_indexes = store.text_index_entries();
        if !text_indexes.is_empty() {
            let mut section = grafeo_core::index::text::TextIndexSection::new(text_indexes);
            section.deserialize(&bytes)?;
        }
    }

    Ok(())
}
1261
1262 #[must_use]
1290 pub fn session(&self) -> Session {
1291 self.create_session_inner(None)
1292 }
1293
/// Creates a new session with CDC explicitly switched on or off,
/// overriding the database-level default.
#[cfg(feature = "cdc")]
#[must_use]
pub fn session_with_cdc(&self, cdc_enabled: bool) -> Session {
    let cdc_override = Some(cdc_enabled);
    self.create_session_inner(cdc_override)
}
1317
1318 #[must_use]
1325 pub fn session_read_only(&self) -> Session {
1326 self.create_session_inner_opts(None, true)
1327 }
1328
1329 #[allow(unused_variables)] fn create_session_inner(&self, cdc_override: Option<bool>) -> Session {
1335 self.create_session_inner_opts(cdc_override, false)
1336 }
1337
1338 #[allow(unused_variables)]
    /// Builds a [`Session`] wired to this database's shared state.
    ///
    /// * `cdc_override` - `Some(true)` / `Some(false)` forces CDC logging on or
    ///   off for the new session; `None` defers to `self.cdc_active()`. Only
    ///   consulted when the `cdc` feature is compiled in.
    /// * `force_read_only` - makes the session read-only even when the
    ///   database itself is writable.
    ///
    /// # Panics
    ///
    /// Panics if `Session::with_external_store` returns an error.
    fn create_session_inner_opts(
        &self,
        cdc_override: Option<bool>,
        force_read_only: bool,
    ) -> Session {
        // A closure rather than a value: exactly one of the constructors below
        // consumes a config, and which one depends on runtime state and cfg flags.
        let session_cfg = || crate::session::SessionConfig {
            transaction_manager: Arc::clone(&self.transaction_manager),
            query_cache: Arc::clone(&self.query_cache),
            catalog: Arc::clone(&self.catalog),
            adaptive_config: self.config.adaptive.clone(),
            factorized_execution: self.config.factorized_execution,
            graph_model: self.config.graph_model,
            query_timeout: self.config.query_timeout,
            commit_counter: Arc::clone(&self.commit_counter),
            gc_interval: self.config.gc_interval,
            // Read-only if either the database or the caller demands it.
            read_only: self.read_only || force_read_only,
        };

        // External-store databases return early: none of the WAL, CDC, metrics,
        // or current graph/schema wiring below is applied to such sessions.
        if let Some(ref ext_read) = self.external_read_store {
            return Session::with_external_store(
                Arc::clone(ext_read),
                self.external_write_store.as_ref().map(Arc::clone),
                session_cfg(),
            )
            .expect("arena allocation for external store session");
        }

        // Exactly one of the three `let mut session` bindings is compiled,
        // selected by the `lpg` / `triple-store` feature combination.
        #[cfg(all(feature = "lpg", feature = "triple-store"))]
        let mut session = Session::with_rdf_store_and_adaptive(
            Arc::clone(self.lpg_store()),
            Arc::clone(&self.rdf_store),
            session_cfg(),
        );
        #[cfg(all(feature = "lpg", not(feature = "triple-store")))]
        let mut session = Session::with_adaptive(Arc::clone(self.lpg_store()), session_cfg());
        #[cfg(not(feature = "lpg"))]
        let mut session =
            Session::with_external_store(self.graph_store(), self.graph_store_mut(), session_cfg())
                .expect("session creation for non-lpg build");

        // Hand the session the database's WAL, if one is configured.
        #[cfg(all(feature = "wal", feature = "lpg"))]
        if let Some(ref wal) = self.wal {
            session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
        }

        // The explicit override wins; otherwise follow the database-level state.
        #[cfg(feature = "cdc")]
        {
            let should_enable = cdc_override.unwrap_or_else(|| self.cdc_active());
            if should_enable {
                session.set_cdc_log(Arc::clone(&self.cdc_log));
            }
        }

        // Attach the metrics registry and count the new session as created and active.
        #[cfg(feature = "metrics")]
        {
            if let Some(ref m) = self.metrics {
                session.set_metrics(Arc::clone(m));
                m.session_created
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                m.session_active
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            }
        }

        // Propagate the database-level current graph and schema, when set.
        if let Some(ref graph) = *self.current_graph.read() {
            session.use_graph(graph);
        }

        if let Some(ref schema) = *self.current_schema.read() {
            session.set_schema(schema);
        }

        // NOTE(review): presumably keeps the `mut` binding "used" in feature
        // combinations where no mutating call above is compiled — confirm.
        let _ = &mut session;

        session
    }
1419
1420 #[must_use]
1426 pub fn current_graph(&self) -> Option<String> {
1427 self.current_graph.read().clone()
1428 }
1429
1430 pub fn set_current_graph(&self, name: Option<&str>) -> Result<()> {
1439 #[cfg(feature = "lpg")]
1440 if let Some(name) = name
1441 && !name.eq_ignore_ascii_case("default")
1442 && let Some(store) = &self.store
1443 && store.graph(name).is_none()
1444 {
1445 return Err(Error::Query(QueryError::new(
1446 QueryErrorKind::Semantic,
1447 format!("Graph '{name}' does not exist"),
1448 )));
1449 }
1450 *self.current_graph.write() = name.map(ToString::to_string);
1451 Ok(())
1452 }
1453
1454 #[must_use]
1459 pub fn current_schema(&self) -> Option<String> {
1460 self.current_schema.read().clone()
1461 }
1462
1463 pub fn set_current_schema(&self, name: Option<&str>) -> Result<()> {
1472 if let Some(name) = name
1473 && !self.catalog.schema_exists(name)
1474 {
1475 return Err(Error::Query(QueryError::new(
1476 QueryErrorKind::Semantic,
1477 format!("Schema '{name}' does not exist"),
1478 )));
1479 }
1480 *self.current_schema.write() = name.map(ToString::to_string);
1481 Ok(())
1482 }
1483
    /// Returns the adaptive-execution settings stored in this database's configuration.
    #[must_use]
    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
        &self.config.adaptive
    }
1489
    /// Returns the database's `read_only` flag.
    #[must_use]
    pub fn is_read_only(&self) -> bool {
        self.read_only
    }
1495
    /// Returns the configuration held by this database.
    #[must_use]
    pub fn config(&self) -> &Config {
        &self.config
    }
1501
    /// Returns the graph model selected in the configuration.
    #[must_use]
    pub fn graph_model(&self) -> crate::config::GraphModel {
        self.config.graph_model
    }
1507
    /// Returns the configured memory limit, or `None` when the configuration sets none.
    #[must_use]
    pub fn memory_limit(&self) -> Option<usize> {
        self.config.memory_limit
    }
1513
1514 #[cfg(feature = "metrics")]
1519 #[must_use]
1520 pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1521 let mut snapshot = self
1522 .metrics
1523 .as_ref()
1524 .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1525
1526 let cache_stats = self.query_cache.stats();
1528 snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1529 snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1530 snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1531 snapshot.cache_invalidations = cache_stats.invalidations;
1532
1533 snapshot
1534 }
1535
1536 #[cfg(feature = "metrics")]
1540 #[must_use]
1541 pub fn metrics_prometheus(&self) -> String {
1542 self.metrics
1543 .as_ref()
1544 .map_or_else(String::new, |m| m.to_prometheus())
1545 }
1546
1547 #[cfg(feature = "metrics")]
1549 pub fn reset_metrics(&self) {
1550 if let Some(ref m) = self.metrics {
1551 m.reset();
1552 }
1553 self.query_cache.reset_stats();
1554 }
1555
    /// Returns the LPG store handle; delegates to `self.lpg_store()`.
    #[cfg(feature = "lpg")]
    #[must_use]
    pub fn store(&self) -> &Arc<LpgStore> {
        self.lpg_store()
    }
1568
1569 #[cfg(feature = "lpg")]
1577 pub fn create_graph(&self, name: &str) -> Result<bool> {
1578 Ok(self.lpg_store().create_graph(name)?)
1579 }
1580
1581 #[cfg(feature = "lpg")]
1586 pub fn drop_graph(&self, name: &str) -> bool {
1587 let Some(store) = &self.store else {
1588 return false;
1589 };
1590 let dropped = store.drop_graph(name);
1591 if dropped {
1592 let mut current = self.current_graph.write();
1593 if current
1594 .as_deref()
1595 .is_some_and(|g| g.eq_ignore_ascii_case(name))
1596 {
1597 *current = None;
1598 }
1599 }
1600 dropped
1601 }
1602
    /// Returns the graph names reported by the LPG store's `graph_names()`.
    #[cfg(feature = "lpg")]
    #[must_use]
    pub fn list_graphs(&self) -> Vec<String> {
        self.lpg_store().graph_names()
    }
1609
1610 #[must_use]
1619 pub fn graph_store(&self) -> Arc<dyn GraphStore> {
1620 if let Some(ref ext_read) = self.external_read_store {
1621 Arc::clone(ext_read)
1622 } else {
1623 #[cfg(feature = "lpg")]
1624 {
1625 Arc::clone(self.lpg_store()) as Arc<dyn GraphStore>
1626 }
1627 #[cfg(not(feature = "lpg"))]
1628 unreachable!("no graph store available: enable the `lpg` feature or use with_store()")
1629 }
1630 }
1631
1632 #[must_use]
1637 pub fn graph_store_mut(&self) -> Option<Arc<dyn GraphStoreMut>> {
1638 if self.external_read_store.is_some() {
1639 self.external_write_store.as_ref().map(Arc::clone)
1640 } else {
1641 #[cfg(feature = "lpg")]
1642 {
1643 Some(Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreMut>)
1644 }
1645 #[cfg(not(feature = "lpg"))]
1646 {
1647 None
1648 }
1649 }
1650 }
1651
1652 pub fn gc(&self) {
1659 #[cfg(feature = "lpg")]
1660 let current_epoch = {
1661 let min_epoch = self.transaction_manager.min_active_epoch();
1662 self.lpg_store().gc_versions(min_epoch);
1663 self.transaction_manager.current_epoch()
1664 };
1665 self.transaction_manager.gc();
1666
1667 #[cfg(feature = "cdc")]
1669 if self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed) {
1670 #[cfg(feature = "lpg")]
1671 self.cdc_log.apply_retention(current_epoch);
1672 }
1673 }
1674
    /// Returns the shared buffer manager handle.
    #[must_use]
    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
        &self.buffer_manager
    }
1680
    /// Returns the shared query cache handle.
    #[must_use]
    pub fn query_cache(&self) -> &Arc<QueryCache> {
        &self.query_cache
    }
1686
    /// Clears the query cache by calling its `clear()` method.
    pub fn clear_plan_cache(&self) {
        self.query_cache.clear();
    }
1695
    /// Closes the database, flushing state according to the storage mode.
    ///
    /// Calling `close` on an already-closed database is a no-op that returns
    /// `Ok(())`. The `is_open` write lock is held for the whole call.
    ///
    /// * Read-only databases only close the file manager (if any).
    /// * Single-file databases (a file manager is present) sync the WAL,
    ///   checkpoint to the file, and remove the sidecar WAL unless the
    ///   checkpoint wrote nothing while the WAL still holds records.
    /// * Otherwise, with a WAL, a final `TransactionCommit` record is logged
    ///   and the WAL is synced.
    ///
    /// # Errors
    ///
    /// Propagates errors from WAL sync/logging, checkpointing, sidecar WAL
    /// removal, and closing the file manager. On error the database stays
    /// marked open, because `is_open` is only cleared on the success paths.
    pub fn close(&self) -> Result<()> {
        let mut is_open = self.is_open.write();
        if !*is_open {
            return Ok(());
        }

        // Take the checkpoint timer out of its slot and stop it before any
        // of the shutdown work below.
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        if let Some(mut timer) = self.checkpoint_timer.lock().take() {
            timer.stop();
        }

        // Read-only: no checkpoint and no WAL work, just release the file.
        if self.read_only {
            #[cfg(feature = "grafeo-file")]
            if let Some(ref fm) = self.file_manager {
                fm.close()?;
            }
            *is_open = false;
            return Ok(());
        }

        // "Single file" means a file manager exists; it selects between the
        // checkpoint path and the WAL-commit path below.
        #[cfg(feature = "grafeo-file")]
        let is_single_file = self.file_manager.is_some();
        #[cfg(not(feature = "grafeo-file"))]
        let is_single_file = false;

        #[cfg(feature = "grafeo-file")]
        if let Some(ref fm) = self.file_manager {
            // Sync the WAL before checkpointing.
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.sync()?;
            }
            let flush_result = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;

            // If nothing was written although the WAL has records, run the
            // checkpoint a second time.
            // NOTE(review): the retry passes the same `FlushReason::Explicit`
            // as the first attempt; confirm this is what "forced flush" in
            // the warning message is meant to be.
            #[cfg(feature = "wal")]
            let flush_result = if flush_result.sections_written == 0 {
                if let Some(ref wal) = self.wal {
                    if wal.record_count() > 0 {
                        grafeo_warn!(
                            "WAL has {} records but checkpoint wrote 0 sections; retrying with forced flush",
                            wal.record_count()
                        );
                        self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?
                    } else {
                        flush_result
                    }
                } else {
                    flush_result
                }
            } else {
                flush_result
            };

            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.close_active_log();
            }

            #[cfg(feature = "wal")]
            let has_wal_records = self.wal.as_ref().is_some_and(|wal| wal.record_count() > 0);
            #[cfg(not(feature = "wal"))]
            let has_wal_records = false;

            // Remove the sidecar WAL only when the checkpoint wrote sections
            // or the WAL holds no records; otherwise keep it for recovery.
            if flush_result.sections_written > 0 || !has_wal_records {
                {
                    // Crash-injection hook from `grafeo_common::testing::crash`.
                    use grafeo_common::testing::crash::maybe_crash;
                    maybe_crash("close:before_remove_sidecar_wal");
                }
                fm.remove_sidecar_wal()?;
            } else {
                grafeo_warn!(
                    "keeping sidecar WAL for recovery: checkpoint wrote 0 sections but WAL has records"
                );
            }
            fm.close()?;
        }

        // No file manager: log a final commit record and sync the WAL.
        #[cfg(feature = "wal")]
        if !is_single_file && let Some(ref wal) = self.wal {
            // Reuse the last assigned transaction id; if there is none,
            // begin a transaction to obtain one.
            let commit_tx = self
                .transaction_manager
                .last_assigned_transaction_id()
                .unwrap_or_else(|| self.transaction_manager.begin());

            wal.log(&WalRecord::TransactionCommit {
                transaction_id: commit_tx,
            })?;

            wal.sync()?;
        }

        *is_open = false;
        Ok(())
    }
1828
    /// Returns the WAL handle, or `None` when the database has no WAL.
    #[cfg(feature = "wal")]
    #[must_use]
    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
        self.wal.as_ref()
    }
1835
1836 #[cfg(feature = "wal")]
1838 pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1839 if let Some(ref wal) = self.wal {
1840 wal.log(record)?;
1841 }
1842 Ok(())
1843 }
1844
1845 fn register_section_consumers(&mut self) {
1850 #[cfg(feature = "lpg")]
1851 let store_ref = self.store.as_ref();
1852 #[cfg(not(feature = "lpg"))]
1853 #[cfg(feature = "lpg")]
1855 if let Some(store) = store_ref {
1856 let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
1857 self.buffer_manager.register_consumer(Arc::new(
1858 section_consumer::SectionConsumer::new(Arc::new(lpg)),
1859 ));
1860 }
1861
1862 #[cfg(feature = "triple-store")]
1864 if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
1865 let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
1866 self.buffer_manager.register_consumer(Arc::new(
1867 section_consumer::SectionConsumer::new(Arc::new(rdf)),
1868 ));
1869 }
1870
1871 #[cfg(all(
1874 feature = "lpg",
1875 feature = "vector-index",
1876 feature = "mmap",
1877 not(feature = "temporal")
1878 ))]
1879 if let Some(store) = store_ref {
1880 let spill_path = self.buffer_manager.config().spill_path.clone();
1881 let consumer = Arc::new(section_consumer::VectorIndexConsumer::new(
1882 store, spill_path,
1883 ));
1884 self.vector_spill_storages = Some(Arc::clone(consumer.spilled_storages()));
1886 self.buffer_manager.register_consumer(consumer);
1887 }
1888
1889 #[cfg(all(feature = "lpg", feature = "text-index"))]
1891 if let Some(store) = store_ref {
1892 self.buffer_manager
1893 .register_consumer(Arc::new(section_consumer::TextIndexConsumer::new(store)));
1894 }
1895
1896 #[cfg(feature = "cdc")]
1899 self.buffer_manager.register_consumer(
1900 Arc::clone(&self.cdc_log) as Arc<dyn grafeo_common::memory::MemoryConsumer>
1901 );
1902 }
1903
1904 #[cfg(all(
1911 feature = "lpg",
1912 feature = "vector-index",
1913 feature = "mmap",
1914 not(feature = "temporal")
1915 ))]
1916 fn restore_spill_files(&mut self) {
1917 use grafeo_core::index::vector::MmapStorage;
1918
1919 let spill_dir = match self.buffer_manager.config().spill_path {
1920 Some(ref path) => path.clone(),
1921 None => return,
1922 };
1923
1924 if !spill_dir.exists() {
1925 return;
1926 }
1927
1928 let spill_map = match self.vector_spill_storages {
1929 Some(ref map) => Arc::clone(map),
1930 None => return,
1931 };
1932
1933 let Ok(entries) = std::fs::read_dir(&spill_dir) else {
1934 return;
1935 };
1936
1937 let Some(ref store) = self.store else {
1938 return;
1939 };
1940
1941 for entry in entries.flatten() {
1942 let path = entry.path();
1943 let file_name = match path.file_name().and_then(|n| n.to_str()) {
1944 Some(name) => name.to_string(),
1945 None => continue,
1946 };
1947
1948 if !file_name.starts_with("vectors_")
1950 || !std::path::Path::new(&file_name)
1951 .extension()
1952 .is_some_and(|ext| ext.eq_ignore_ascii_case("bin"))
1953 {
1954 continue;
1955 }
1956
1957 let key_part = &file_name["vectors_".len()..file_name.len() - ".bin".len()];
1959
1960 let key = key_part.replace("%3A", ":").replace("%25", "%");
1962
1963 if !key.contains(':') {
1965 continue;
1967 }
1968
1969 if store.get_vector_index_by_key(&key).is_none() {
1971 let _ = std::fs::remove_file(&path);
1973 continue;
1974 }
1975
1976 match MmapStorage::open(&path) {
1978 Ok(mmap_storage) => {
1979 let property = key.split(':').nth(1).unwrap_or("");
1981 let prop_key = grafeo_common::types::PropertyKey::new(property);
1982 store.node_properties_mark_spilled(&prop_key);
1983
1984 spill_map.write().insert(key, Arc::new(mmap_storage));
1985 }
1986 Err(e) => {
1987 eprintln!("failed to restore spill file {}: {e}", path.display());
1988 let _ = std::fs::remove_file(&path);
1990 }
1991 }
1992 }
1993 }
1994
1995 #[cfg(feature = "grafeo-file")]
1997 fn build_sections(&self) -> Vec<Box<dyn grafeo_common::storage::Section>> {
1998 let mut sections: Vec<Box<dyn grafeo_common::storage::Section>> = Vec::new();
1999
2000 #[cfg(feature = "lpg")]
2002 if let Some(store) = self.store.as_ref() {
2003 let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
2004
2005 let catalog = catalog_section::CatalogSection::new(
2006 Arc::clone(&self.catalog),
2007 Arc::clone(store),
2008 {
2009 let tm = Arc::clone(&self.transaction_manager);
2010 move || tm.current_epoch().as_u64()
2011 },
2012 );
2013
2014 sections.push(Box::new(catalog));
2015 sections.push(Box::new(lpg));
2016
2017 #[cfg(feature = "vector-index")]
2019 {
2020 let indexes = store.vector_index_entries();
2021 if !indexes.is_empty() {
2022 let vector = grafeo_core::index::vector::VectorStoreSection::new(indexes);
2023 sections.push(Box::new(vector));
2024 }
2025 }
2026
2027 #[cfg(feature = "text-index")]
2029 {
2030 let indexes = store.text_index_entries();
2031 if !indexes.is_empty() {
2032 let text = grafeo_core::index::text::TextIndexSection::new(indexes);
2033 sections.push(Box::new(text));
2034 }
2035 }
2036 }
2037
2038 #[cfg(feature = "triple-store")]
2039 if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
2040 let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
2041 sections.push(Box::new(rdf));
2042 }
2043
2044 sections
2045 }
2046
2047 #[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
2061 pub fn backup_full(&self, backup_dir: &std::path::Path) -> Result<backup::BackupSegment> {
2062 let fm = self
2063 .file_manager
2064 .as_ref()
2065 .ok_or_else(|| Error::Internal("backup requires a persistent database".to_string()))?;
2066
2067 let _ = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;
2069
2070 let current_epoch = self.transaction_manager.current_epoch();
2071 backup::do_backup_full(backup_dir, fm.path(), self.wal.as_deref(), current_epoch)
2072 }
2073
2074 #[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
2082 pub fn backup_incremental(
2083 &self,
2084 backup_dir: &std::path::Path,
2085 ) -> Result<backup::BackupSegment> {
2086 let wal = self
2087 .wal
2088 .as_ref()
2089 .ok_or_else(|| Error::Internal("incremental backup requires WAL".to_string()))?;
2090
2091 let current_epoch = self.transaction_manager.current_epoch();
2092 backup::do_backup_incremental(backup_dir, wal, current_epoch)
2093 }
2094
    /// Reads the backup manifest from `backup_dir`.
    ///
    /// Delegates to `backup::read_manifest`; the `Option` in the return type
    /// and any error come straight from that function.
    #[cfg(all(feature = "wal", feature = "grafeo-file"))]
    pub fn read_backup_manifest(
        backup_dir: &std::path::Path,
    ) -> Result<Option<backup::BackupManifest>> {
        backup::read_manifest(backup_dir)
    }
2106
2107 #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2109 #[must_use]
2110 pub fn backup_cursor(&self) -> Option<backup::BackupCursor> {
2111 self.wal
2112 .as_ref()
2113 .and_then(|wal| backup::read_backup_cursor(wal.dir()).ok().flatten())
2114 }
2115
    /// Restores a backup from `backup_dir` up to `target_epoch`, writing the
    /// result to `output_path`.
    ///
    /// Delegates to `backup::do_restore_to_epoch` and returns its result.
    #[cfg(all(feature = "wal", feature = "grafeo-file"))]
    pub fn restore_to_epoch(
        backup_dir: &std::path::Path,
        target_epoch: grafeo_common::types::EpochId,
        output_path: &std::path::Path,
    ) -> Result<()> {
        backup::do_restore_to_epoch(backup_dir, target_epoch, output_path)
    }
2134
2135 #[cfg(feature = "grafeo-file")]
2141 fn checkpoint_to_file(
2142 &self,
2143 fm: &GrafeoFileManager,
2144 reason: flush::FlushReason,
2145 ) -> Result<flush::FlushResult> {
2146 let sections = self.build_sections();
2147 let section_refs: Vec<&dyn grafeo_common::storage::Section> =
2148 sections.iter().map(|s| s.as_ref()).collect();
2149 #[cfg(feature = "lpg")]
2150 let context = flush::build_context(self.lpg_store(), &self.transaction_manager);
2151 #[cfg(not(feature = "lpg"))]
2152 let context = flush::build_context_minimal(&self.transaction_manager);
2153
2154 flush::flush(
2155 fm,
2156 §ion_refs,
2157 &context,
2158 reason,
2159 #[cfg(feature = "wal")]
2160 self.wal.as_deref(),
2161 )
2162 }
2163
    /// Returns the file manager handle, or `None` when the database has none.
    #[cfg(feature = "grafeo-file")]
    #[must_use]
    pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
        self.file_manager.as_ref()
    }
2170}
2171
2172impl Drop for GrafeoDB {
2173 fn drop(&mut self) {
2174 if let Err(e) = self.close() {
2175 grafeo_error!("Error closing database: {}", e);
2176 }
2177 }
2178}
2179
/// Exposes the database's inherent admin methods through the
/// `AdminService` trait. Every method forwards to the inherent method of
/// the same name on [`GrafeoDB`].
#[cfg(feature = "lpg")]
impl crate::admin::AdminService for GrafeoDB {
    /// Forwards to the inherent `GrafeoDB::info`.
    fn info(&self) -> crate::admin::DatabaseInfo {
        GrafeoDB::info(self)
    }

    /// Forwards to the inherent `GrafeoDB::detailed_stats`.
    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
        GrafeoDB::detailed_stats(self)
    }

    /// Forwards to the inherent `GrafeoDB::schema`.
    fn schema(&self) -> crate::admin::SchemaInfo {
        GrafeoDB::schema(self)
    }

    /// Forwards to the inherent `GrafeoDB::validate`.
    fn validate(&self) -> crate::admin::ValidationResult {
        GrafeoDB::validate(self)
    }

    /// Forwards to the inherent `GrafeoDB::wal_status`.
    fn wal_status(&self) -> crate::admin::WalStatus {
        GrafeoDB::wal_status(self)
    }

    /// Forwards to the inherent `GrafeoDB::wal_checkpoint`.
    fn wal_checkpoint(&self) -> Result<()> {
        GrafeoDB::wal_checkpoint(self)
    }
}
2206
/// The outcome of a query: column metadata, result rows, and optional
/// execution details.
#[derive(Debug)]
pub struct QueryResult {
    /// Column names, in output order.
    pub columns: Vec<String>,
    /// Logical type of each column; constructors that take no types fill
    /// this with `LogicalType::Any`, one entry per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// Result rows; each row is a vector of values.
    pub(crate) rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Execution time in milliseconds, when recorded.
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned, when recorded.
    pub rows_scanned: Option<u64>,
    /// Optional status message attached to the result.
    pub status_message: Option<String>,
    /// GQL status of the result; the constructors set `GqlStatus::SUCCESS`.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
2256
2257impl QueryResult {
2258 #[must_use]
2260 pub fn empty() -> Self {
2261 Self {
2262 columns: Vec::new(),
2263 column_types: Vec::new(),
2264 rows: Vec::new(),
2265 execution_time_ms: None,
2266 rows_scanned: None,
2267 status_message: None,
2268 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2269 }
2270 }
2271
2272 #[must_use]
2274 pub fn status(msg: impl Into<String>) -> Self {
2275 Self {
2276 columns: Vec::new(),
2277 column_types: Vec::new(),
2278 rows: Vec::new(),
2279 execution_time_ms: None,
2280 rows_scanned: None,
2281 status_message: Some(msg.into()),
2282 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2283 }
2284 }
2285
2286 #[must_use]
2288 pub fn new(columns: Vec<String>) -> Self {
2289 let len = columns.len();
2290 Self {
2291 columns,
2292 column_types: vec![grafeo_common::types::LogicalType::Any; len],
2293 rows: Vec::new(),
2294 execution_time_ms: None,
2295 rows_scanned: None,
2296 status_message: None,
2297 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2298 }
2299 }
2300
2301 #[must_use]
2303 pub fn with_types(
2304 columns: Vec<String>,
2305 column_types: Vec<grafeo_common::types::LogicalType>,
2306 ) -> Self {
2307 Self {
2308 columns,
2309 column_types,
2310 rows: Vec::new(),
2311 execution_time_ms: None,
2312 rows_scanned: None,
2313 status_message: None,
2314 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2315 }
2316 }
2317
2318 #[must_use]
2320 pub fn from_rows(columns: Vec<String>, rows: Vec<Vec<grafeo_common::types::Value>>) -> Self {
2321 let len = columns.len();
2322 Self {
2323 columns,
2324 column_types: vec![grafeo_common::types::LogicalType::Any; len],
2325 rows,
2326 execution_time_ms: None,
2327 rows_scanned: None,
2328 status_message: None,
2329 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2330 }
2331 }
2332
2333 pub fn push_row(&mut self, row: Vec<grafeo_common::types::Value>) {
2335 self.rows.push(row);
2336 }
2337
2338 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
2340 self.execution_time_ms = Some(execution_time_ms);
2341 self.rows_scanned = Some(rows_scanned);
2342 self
2343 }
2344
2345 #[must_use]
2347 pub fn execution_time_ms(&self) -> Option<f64> {
2348 self.execution_time_ms
2349 }
2350
2351 #[must_use]
2353 pub fn rows_scanned(&self) -> Option<u64> {
2354 self.rows_scanned
2355 }
2356
2357 #[must_use]
2359 pub fn row_count(&self) -> usize {
2360 self.rows.len()
2361 }
2362
2363 #[must_use]
2365 pub fn column_count(&self) -> usize {
2366 self.columns.len()
2367 }
2368
2369 #[must_use]
2371 pub fn is_empty(&self) -> bool {
2372 self.rows.is_empty()
2373 }
2374
2375 pub fn scalar<T: FromValue>(&self) -> Result<T> {
2384 if self.rows.len() != 1 || self.columns.len() != 1 {
2385 return Err(grafeo_common::utils::error::Error::InvalidValue(
2386 "Expected single value".to_string(),
2387 ));
2388 }
2389 T::from_value(&self.rows[0][0])
2390 }
2391
2392 #[must_use]
2394 pub fn rows(&self) -> &[Vec<grafeo_common::types::Value>] {
2395 &self.rows
2396 }
2397
2398 #[must_use]
2400 pub fn into_rows(self) -> Vec<Vec<grafeo_common::types::Value>> {
2401 self.rows
2402 }
2403
2404 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
2406 self.rows.iter()
2407 }
2408
2409 #[cfg(feature = "arrow-export")]
2424 pub fn to_record_batch(
2425 &self,
2426 ) -> std::result::Result<arrow_array::RecordBatch, arrow::ArrowExportError> {
2427 arrow::query_result_to_record_batch(&self.columns, &self.column_types, &self.rows)
2428 }
2429
2430 #[cfg(feature = "arrow-export")]
2441 pub fn to_arrow_ipc(&self) -> std::result::Result<Vec<u8>, arrow::ArrowExportError> {
2442 let batch = self.to_record_batch()?;
2443 arrow::record_batch_to_ipc_stream(&batch)
2444 }
2445}
2446
2447impl std::fmt::Display for QueryResult {
2448 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2449 let table = grafeo_common::fmt::format_result_table(
2450 &self.columns,
2451 &self.rows,
2452 self.execution_time_ms,
2453 self.status_message.as_deref(),
2454 );
2455 f.write_str(&table)
2456 }
2457}
2458
/// Conversion from a borrowed database value into a Rust type.
///
/// Used by [`QueryResult::scalar`] to extract a typed value.
pub trait FromValue: Sized {
    /// Converts `value` into `Self`, or returns an error when it cannot be
    /// converted.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
2471
2472impl FromValue for i64 {
2473 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2474 value
2475 .as_int64()
2476 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2477 expected: "INT64".to_string(),
2478 found: value.type_name().to_string(),
2479 })
2480 }
2481}
2482
2483impl FromValue for f64 {
2484 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2485 value
2486 .as_float64()
2487 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2488 expected: "FLOAT64".to_string(),
2489 found: value.type_name().to_string(),
2490 })
2491 }
2492}
2493
2494impl FromValue for String {
2495 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2496 value.as_str().map(String::from).ok_or_else(|| {
2497 grafeo_common::utils::error::Error::TypeMismatch {
2498 expected: "STRING".to_string(),
2499 found: value.type_name().to_string(),
2500 }
2501 })
2502 }
2503}
2504
2505impl FromValue for bool {
2506 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2507 value
2508 .as_bool()
2509 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2510 expected: "BOOL".to_string(),
2511 found: value.type_name().to_string(),
2512 })
2513 }
2514}
2515
2516#[cfg(test)]
2517mod tests {
2518 use super::*;
2519
2520 #[test]
2521 fn test_create_in_memory_database() {
2522 let db = GrafeoDB::new_in_memory();
2523 assert_eq!(db.node_count(), 0);
2524 assert_eq!(db.edge_count(), 0);
2525 }
2526
2527 #[test]
2528 fn test_database_config() {
2529 let config = Config::in_memory().with_threads(4).with_query_logging();
2530
2531 let db = GrafeoDB::with_config(config).unwrap();
2532 assert_eq!(db.config().threads, 4);
2533 assert!(db.config().query_logging);
2534 }
2535
2536 #[test]
2537 fn test_database_session() {
2538 let db = GrafeoDB::new_in_memory();
2539 let _session = db.session();
2540 }
2542
2543 #[cfg(feature = "wal")]
2544 #[test]
2545 fn test_persistent_database_recovery() {
2546 use grafeo_common::types::Value;
2547 use tempfile::tempdir;
2548
2549 let dir = tempdir().unwrap();
2550 let db_path = dir.path().join("test_db");
2551
2552 {
2554 let db = GrafeoDB::open(&db_path).unwrap();
2555
2556 let alix = db.create_node(&["Person"]);
2557 db.set_node_property(alix, "name", Value::from("Alix"));
2558
2559 let gus = db.create_node(&["Person"]);
2560 db.set_node_property(gus, "name", Value::from("Gus"));
2561
2562 let _edge = db.create_edge(alix, gus, "KNOWS");
2563
2564 db.close().unwrap();
2566 }
2567
2568 {
2570 let db = GrafeoDB::open(&db_path).unwrap();
2571
2572 assert_eq!(db.node_count(), 2);
2573 assert_eq!(db.edge_count(), 1);
2574
2575 let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
2577 assert!(node0.is_some());
2578
2579 let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
2580 assert!(node1.is_some());
2581 }
2582 }
2583
2584 #[cfg(feature = "wal")]
2585 #[test]
2586 fn test_wal_logging() {
2587 use tempfile::tempdir;
2588
2589 let dir = tempdir().unwrap();
2590 let db_path = dir.path().join("wal_test_db");
2591
2592 let db = GrafeoDB::open(&db_path).unwrap();
2593
2594 let node = db.create_node(&["Test"]);
2596 db.delete_node(node);
2597
2598 if let Some(wal) = db.wal() {
2600 assert!(wal.record_count() > 0);
2601 }
2602
2603 db.close().unwrap();
2604 }
2605
2606 #[cfg(feature = "wal")]
2607 #[test]
2608 fn test_wal_recovery_multiple_sessions() {
2609 use grafeo_common::types::Value;
2611 use tempfile::tempdir;
2612
2613 let dir = tempdir().unwrap();
2614 let db_path = dir.path().join("multi_session_db");
2615
2616 {
2618 let db = GrafeoDB::open(&db_path).unwrap();
2619 let alix = db.create_node(&["Person"]);
2620 db.set_node_property(alix, "name", Value::from("Alix"));
2621 db.close().unwrap();
2622 }
2623
2624 {
2626 let db = GrafeoDB::open(&db_path).unwrap();
2627 assert_eq!(db.node_count(), 1); let gus = db.create_node(&["Person"]);
2629 db.set_node_property(gus, "name", Value::from("Gus"));
2630 db.close().unwrap();
2631 }
2632
2633 {
2635 let db = GrafeoDB::open(&db_path).unwrap();
2636 assert_eq!(db.node_count(), 2);
2637
2638 let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
2640 assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
2641
2642 let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
2643 assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
2644 }
2645 }
2646
2647 #[cfg(feature = "wal")]
2648 #[test]
2649 fn test_database_consistency_after_mutations() {
2650 use grafeo_common::types::Value;
2652 use tempfile::tempdir;
2653
2654 let dir = tempdir().unwrap();
2655 let db_path = dir.path().join("consistency_db");
2656
2657 {
2658 let db = GrafeoDB::open(&db_path).unwrap();
2659
2660 let a = db.create_node(&["Node"]);
2662 let b = db.create_node(&["Node"]);
2663 let c = db.create_node(&["Node"]);
2664
2665 let e1 = db.create_edge(a, b, "LINKS");
2667 let _e2 = db.create_edge(b, c, "LINKS");
2668
2669 db.delete_edge(e1);
2671 db.delete_node(b);
2672
2673 db.set_node_property(a, "value", Value::Int64(1));
2675 db.set_node_property(c, "value", Value::Int64(3));
2676
2677 db.close().unwrap();
2678 }
2679
2680 {
2682 let db = GrafeoDB::open(&db_path).unwrap();
2683
2684 let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
2688 assert!(node_a.is_some());
2689
2690 let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
2691 assert!(node_c.is_some());
2692
2693 let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
2695 assert!(node_b.is_none());
2696 }
2697 }
2698
2699 #[cfg(feature = "wal")]
2700 #[test]
2701 fn test_close_is_idempotent() {
2702 use tempfile::tempdir;
2704
2705 let dir = tempdir().unwrap();
2706 let db_path = dir.path().join("close_test_db");
2707
2708 let db = GrafeoDB::open(&db_path).unwrap();
2709 db.create_node(&["Test"]);
2710
2711 assert!(db.close().is_ok());
2713
2714 assert!(db.close().is_ok());
2716 }
2717
2718 #[test]
2719 fn test_with_store_external_backend() {
2720 use grafeo_core::graph::lpg::LpgStore;
2721
2722 let external = Arc::new(LpgStore::new().unwrap());
2723
2724 let n1 = external.create_node(&["Person"]);
2726 external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
2727
2728 let db = GrafeoDB::with_store(
2729 Arc::clone(&external) as Arc<dyn GraphStoreMut>,
2730 Config::in_memory(),
2731 )
2732 .unwrap();
2733
2734 let session = db.session();
2735
2736 #[cfg(feature = "gql")]
2738 {
2739 let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
2740 assert_eq!(result.rows.len(), 1);
2741 }
2742 }
2743
2744 #[test]
2745 fn test_with_config_custom_memory_limit() {
2746 let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); let db = GrafeoDB::with_config(config).unwrap();
2749 assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
2750 assert_eq!(db.node_count(), 0);
2751 }
2752
2753 #[cfg(feature = "metrics")]
2754 #[test]
2755 fn test_database_metrics_registry() {
2756 let db = GrafeoDB::new_in_memory();
2757
2758 db.create_node(&["Person"]);
2760 db.create_node(&["Person"]);
2761
2762 let snap = db.metrics();
2764 assert_eq!(snap.query_count, 0); }
2767
2768 #[test]
2769 fn test_query_result_has_metrics() {
2770 let db = GrafeoDB::new_in_memory();
2772 db.create_node(&["Person"]);
2773 db.create_node(&["Person"]);
2774
2775 #[cfg(feature = "gql")]
2776 {
2777 let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
2778
2779 assert!(result.execution_time_ms.is_some());
2781 assert!(result.rows_scanned.is_some());
2782 assert!(result.execution_time_ms.unwrap() >= 0.0);
2783 assert_eq!(result.rows_scanned.unwrap(), 2);
2784 }
2785 }
2786
2787 #[test]
2788 fn test_empty_query_result_metrics() {
2789 let db = GrafeoDB::new_in_memory();
2791 db.create_node(&["Person"]);
2792
2793 #[cfg(feature = "gql")]
2794 {
2795 let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
2797
2798 assert!(result.execution_time_ms.is_some());
2799 assert!(result.rows_scanned.is_some());
2800 assert_eq!(result.rows_scanned.unwrap(), 0);
2801 }
2802 }
2803
2804 #[cfg(feature = "cdc")]
2805 mod cdc_integration {
2806 use super::*;
2807
2808 fn cdc_db() -> GrafeoDB {
2810 GrafeoDB::with_config(Config::in_memory().with_cdc()).unwrap()
2811 }
2812
2813 #[test]
2814 fn test_node_lifecycle_history() {
2815 let db = cdc_db();
2816
2817 let id = db.create_node(&["Person"]);
2819 db.set_node_property(id, "name", "Alix".into());
2821 db.set_node_property(id, "name", "Gus".into());
2822 db.delete_node(id);
2824
2825 let history = db.history(id).unwrap();
2826 assert_eq!(history.len(), 4); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
2828 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
2829 assert!(history[1].before.is_none()); assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
2831 assert!(history[2].before.is_some()); assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
2833 }
2834
2835 #[test]
2836 fn test_edge_lifecycle_history() {
2837 let db = cdc_db();
2838
2839 let alix = db.create_node(&["Person"]);
2840 let gus = db.create_node(&["Person"]);
2841 let edge = db.create_edge(alix, gus, "KNOWS");
2842 db.set_edge_property(edge, "since", 2024i64.into());
2843 db.delete_edge(edge);
2844
2845 let history = db.history(edge).unwrap();
2846 assert_eq!(history.len(), 3); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
2848 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
2849 assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
2850 }
2851
2852 #[test]
2853 fn test_create_node_with_props_cdc() {
2854 let db = cdc_db();
2855
2856 let id = db.create_node_with_props(
2857 &["Person"],
2858 vec![
2859 ("name", grafeo_common::types::Value::from("Alix")),
2860 ("age", grafeo_common::types::Value::from(30i64)),
2861 ],
2862 );
2863
2864 let history = db.history(id).unwrap();
2865 assert_eq!(history.len(), 1);
2866 assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
2867 let after = history[0].after.as_ref().unwrap();
2869 assert_eq!(after.len(), 2);
2870 }
2871
/// Querying the full epoch range must return every recorded change:
/// two node creations plus one property update.
#[test]
fn test_changes_between() {
    use grafeo_common::types::EpochId;

    let db = cdc_db();

    let first = db.create_node(&["A"]);
    let _second = db.create_node(&["B"]);
    db.set_node_property(first, "x", 1i64.into());

    // Widest possible window: epoch 0 up to the maximum representable epoch.
    let recorded = db
        .changes_between(EpochId(0), EpochId(u64::MAX))
        .unwrap();
    assert_eq!(recorded.len(), 3);
}
2889
/// A plain in-memory database reports CDC as disabled and records no history
/// for mutations performed on it.
#[test]
fn test_cdc_disabled_by_default() {
    let db = GrafeoDB::new_in_memory();
    assert!(!db.is_cdc_enabled());

    let node = db.create_node(&["Person"]);
    db.set_node_property(node, "name", "Alix".into());

    let events = db.history(node).unwrap();
    assert!(events.is_empty(), "CDC off by default: no events recorded");
}
2901
/// A session opened with `session_with_cdc(true)` on a default in-memory
/// database must still produce change events.
#[test]
fn test_session_with_cdc_override_on() {
    use grafeo_common::types::EpochId;

    let db = GrafeoDB::new_in_memory();

    let cdc_session = db.session_with_cdc(true);
    cdc_session
        .execute("INSERT (:Person {name: 'Alix'})")
        .unwrap();

    let recorded = db
        .changes_between(EpochId(0), EpochId(u64::MAX))
        .unwrap();
    assert!(
        !recorded.is_empty(),
        "session_with_cdc(true) should record events"
    );
}
2920
/// A session opened with `session_with_cdc(false)` on the `cdc_db()` fixture
/// must not produce any change events.
#[test]
fn test_session_with_cdc_override_off() {
    use grafeo_common::types::EpochId;

    let db = cdc_db();

    let quiet_session = db.session_with_cdc(false);
    quiet_session
        .execute("INSERT (:Person {name: 'Alix'})")
        .unwrap();

    let recorded = db
        .changes_between(EpochId(0), EpochId(u64::MAX))
        .unwrap();
    assert!(
        recorded.is_empty(),
        "session_with_cdc(false) should not record events"
    );
}
2938
/// Toggling CDC at runtime: nodes created while it is on get a history entry,
/// nodes created after it is switched off again get none.
#[test]
fn test_set_cdc_enabled_runtime() {
    let db = GrafeoDB::new_in_memory();
    assert!(!db.is_cdc_enabled());

    // Phase 1: switch CDC on and observe that a creation is recorded.
    db.set_cdc_enabled(true);
    assert!(db.is_cdc_enabled());

    let tracked = db.create_node(&["Person"]);
    let tracked_events = db.history(tracked).unwrap();
    assert_eq!(
        tracked_events.len(),
        1,
        "CDC enabled at runtime records events"
    );

    // Phase 2: switch CDC off and observe that nothing more is recorded.
    db.set_cdc_enabled(false);
    let untracked = db.create_node(&["Person"]);
    let untracked_events = db.history(untracked).unwrap();
    assert!(
        untracked_events.is_empty(),
        "CDC disabled at runtime stops recording"
    );
}
2961 }
2962
/// Data written directly into an external `LpgStore` is visible to queries run
/// through a `GrafeoDB` wrapped around that same store.
#[test]
fn test_with_store_basic() {
    use grafeo_core::graph::lpg::LpgStore;

    // Populate the store before handing it to the database.
    let backing = Arc::new(LpgStore::new().unwrap());
    let person = backing.create_node(&["Person"]);
    backing.set_node_property(person, "name", "Alix".into());

    let db = GrafeoDB::with_store(
        Arc::clone(&backing) as Arc<dyn GraphStoreMut>,
        Config::in_memory(),
    )
    .unwrap();

    let outcome = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(outcome.rows.len(), 1);
}
2977
/// A session opened on a database backed by an external store can run a
/// counting query and gets exactly one result row.
#[test]
fn test_with_store_session() {
    use grafeo_core::graph::lpg::LpgStore;

    let backing = Arc::new(LpgStore::new().unwrap());
    let db = GrafeoDB::with_store(
        Arc::clone(&backing) as Arc<dyn GraphStoreMut>,
        Config::in_memory(),
    )
    .unwrap();

    let outcome = db
        .session()
        .execute("MATCH (n) RETURN count(n)")
        .unwrap();
    assert_eq!(outcome.rows.len(), 1);
}
2990
/// Inside an explicit transaction on an externally backed database, an insert
/// is visible to a following match, and the transaction commits cleanly.
#[test]
fn test_with_store_mutations() {
    use grafeo_core::graph::lpg::LpgStore;

    let backing = Arc::new(LpgStore::new().unwrap());
    let db = GrafeoDB::with_store(
        Arc::clone(&backing) as Arc<dyn GraphStoreMut>,
        Config::in_memory(),
    )
    .unwrap();

    let mut session = db.session();
    session.begin_transaction().unwrap();

    session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

    // The write must be readable from within the same open transaction.
    let matched = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(matched.rows.len(), 1);

    session.commit().unwrap();
}
3012
/// `QueryResult::empty()` has no rows, no columns, no metrics and no status
/// message.
#[test]
fn test_query_result_empty() {
    let blank = QueryResult::empty();

    // Shape
    assert!(blank.is_empty());
    assert_eq!(blank.row_count(), 0);
    assert_eq!(blank.column_count(), 0);

    // Optional metadata
    assert!(blank.execution_time_ms().is_none());
    assert!(blank.rows_scanned().is_none());
    assert!(blank.status_message.is_none());
}
3027
/// `QueryResult::status` yields an empty, column-less result that carries the
/// given message verbatim.
#[test]
fn test_query_result_status() {
    let status_only = QueryResult::status("Created node type 'Person'");

    assert!(status_only.is_empty());
    assert_eq!(status_only.column_count(), 0);

    let message = status_only.status_message.as_deref();
    assert_eq!(message, Some("Created node type 'Person'"));
}
3038
/// `QueryResult::new` with two column names produces an empty two-column
/// result whose column types are both `LogicalType::Any`.
#[test]
fn test_query_result_new_with_columns() {
    use grafeo_common::types::LogicalType;

    let fresh = QueryResult::new(vec!["name".into(), "age".into()]);

    assert_eq!(fresh.column_count(), 2);
    assert_eq!(fresh.row_count(), 0);
    assert!(fresh.is_empty());
    assert_eq!(
        fresh.column_types,
        vec![LogicalType::Any, LogicalType::Any]
    );
}
3054
/// `QueryResult::with_types` keeps the supplied column types in order.
#[test]
fn test_query_result_with_types() {
    use grafeo_common::types::LogicalType;

    let columns = vec!["name".into(), "age".into()];
    let types = vec![LogicalType::String, LogicalType::Int64];
    let typed = QueryResult::with_types(columns, types);

    assert_eq!(typed.column_count(), 2);
    assert_eq!(typed.column_types[0], LogicalType::String);
    assert_eq!(typed.column_types[1], LogicalType::Int64);
}
3066
/// Metrics attached via `with_metrics` are readable through the accessors.
#[test]
fn test_query_result_with_metrics() {
    let base = QueryResult::new(vec!["x".into()]);
    let measured = base.with_metrics(42.5, 100);

    assert_eq!(measured.execution_time_ms(), Some(42.5));
    assert_eq!(measured.rows_scanned(), Some(100));
}
3073
/// `scalar()` extracts the single value from a one-row, one-column result.
#[test]
fn test_query_result_scalar_success() {
    use grafeo_common::types::Value;

    let mut single = QueryResult::new(vec!["count".into()]);
    single.rows.push(vec![Value::Int64(42)]);

    let extracted = single.scalar::<i64>().unwrap();
    assert_eq!(extracted, 42);
}
3083
/// `scalar()` must fail for every shape other than exactly one row with one
/// column: two rows, two columns, and no rows at all.
#[test]
fn test_query_result_scalar_wrong_shape() {
    use grafeo_common::types::Value;

    // Case 1: one column, two rows.
    let mut two_rows = QueryResult::new(vec!["x".into()]);
    two_rows.rows.push(vec![Value::Int64(1)]);
    two_rows.rows.push(vec![Value::Int64(2)]);
    assert!(two_rows.scalar::<i64>().is_err());

    // Case 2: one row, two columns.
    let mut two_columns = QueryResult::new(vec!["a".into(), "b".into()]);
    two_columns
        .rows
        .push(vec![Value::Int64(1), Value::Int64(2)]);
    assert!(two_columns.scalar::<i64>().is_err());

    // Case 3: one column, zero rows.
    let no_rows = QueryResult::new(vec!["x".into()]);
    assert!(no_rows.scalar::<i64>().is_err());
}
3102
/// `iter()` yields one item per row pushed into the result.
#[test]
fn test_query_result_iter() {
    use grafeo_common::types::Value;

    let mut table = QueryResult::new(vec!["x".into()]);
    for n in [1, 2] {
        table.rows.push(vec![Value::Int64(n)]);
    }

    let seen = table.iter().collect::<Vec<_>>();
    assert_eq!(seen.len(), 2);
}
3113
/// The string rendering of a result contains both the column name and the
/// cell value.
#[test]
fn test_query_result_display() {
    use grafeo_common::types::Value;

    let mut table = QueryResult::new(vec!["name".into()]);
    table.rows.push(vec![Value::from("Alix")]);

    let rendered = table.to_string();
    assert!(rendered.contains("name"));
    assert!(rendered.contains("Alix"));
}
3123
/// Converting a string `Value` into `i64` must fail.
#[test]
fn test_from_value_i64_type_mismatch() {
    use grafeo_common::types::Value;

    let text = Value::from("not a number");
    let converted = i64::from_value(&text);
    assert!(converted.is_err());
}
3134
/// Converting a string `Value` into `f64` must fail.
#[test]
fn test_from_value_f64_type_mismatch() {
    use grafeo_common::types::Value;

    let text = Value::from("not a float");
    let converted = f64::from_value(&text);
    assert!(converted.is_err());
}
3141
/// Converting an integer `Value` into `String` must fail.
#[test]
fn test_from_value_string_type_mismatch() {
    use grafeo_common::types::Value;

    let number = Value::Int64(42);
    let converted = String::from_value(&number);
    assert!(converted.is_err());
}
3148
/// Converting an integer `Value` into `bool` must fail.
#[test]
fn test_from_value_bool_type_mismatch() {
    use grafeo_common::types::Value;

    let number = Value::Int64(1);
    let converted = bool::from_value(&number);
    assert!(converted.is_err());
}
3155
/// Each supported `from_value` conversion succeeds when the `Value` variant
/// matches the target type.
#[test]
fn test_from_value_all_success() {
    use grafeo_common::types::Value;

    let int = i64::from_value(&Value::Int64(99)).unwrap();
    assert_eq!(int, 99);

    // Floats are compared with a tolerance rather than for exact equality.
    let float = f64::from_value(&Value::Float64(2.72)).unwrap();
    assert!((float - 2.72).abs() < f64::EPSILON);

    let text = String::from_value(&Value::from("hello")).unwrap();
    assert_eq!(text, "hello");

    let flag = bool::from_value(&Value::Bool(true)).unwrap();
    assert!(flag);
}
3164
/// A fresh in-memory database is not read-only.
#[test]
fn test_database_is_read_only_false_by_default() {
    let read_only = GrafeoDB::new_in_memory().is_read_only();
    assert!(!read_only);
}
3174
/// A fresh in-memory database reports the LPG graph model.
#[test]
fn test_database_graph_model() {
    use crate::config::GraphModel;

    let db = GrafeoDB::new_in_memory();
    assert_eq!(db.graph_model(), GraphModel::Lpg);
}
3180
/// A fresh in-memory database has no memory limit configured.
#[test]
fn test_database_memory_limit_none_by_default() {
    let limit = GrafeoDB::new_in_memory().memory_limit();
    assert!(limit.is_none());
}
3186
/// A memory limit set on the config (128 MiB here) is reported back by the
/// database built from it.
#[test]
fn test_database_memory_limit_custom() {
    let db = GrafeoDB::with_config(Config::in_memory().with_memory_limit(128 * 1024 * 1024))
        .unwrap();

    assert_eq!(db.memory_limit(), Some(128 * 1024 * 1024));
}
3193
/// The adaptive configuration of a fresh in-memory database is enabled and
/// uses a threshold of 3.0.
#[test]
fn test_database_adaptive_config() {
    let db = GrafeoDB::new_in_memory();
    let settings = db.adaptive_config();

    assert!(settings.enabled);

    // Float comparison with tolerance.
    let deviation = (settings.threshold - 3.0).abs();
    assert!(deviation < f64::EPSILON);
}
3201
/// Smoke test: `buffer_manager()` can be called on an in-memory database.
/// The returned value is not inspected.
#[test]
fn test_database_buffer_manager() {
    let db = GrafeoDB::new_in_memory();
    let _manager = db.buffer_manager();
}
3208
/// Smoke test: `query_cache()` can be called on an in-memory database.
/// The returned value is not inspected.
#[test]
fn test_database_query_cache() {
    let db = GrafeoDB::new_in_memory();
    let _cache = db.query_cache();
}
3214
/// Smoke test: `clear_plan_cache()` can be called, after first running one
/// query when the `gql` feature is available. The query outcome is ignored.
#[test]
fn test_database_clear_plan_cache() {
    let db = GrafeoDB::new_in_memory();

    // Only possible to issue a query when the GQL front end is compiled in.
    #[cfg(feature = "gql")]
    {
        let _ = db.execute("MATCH (n) RETURN count(n)");
    }

    db.clear_plan_cache();
}
3226
/// Running `gc()` leaves a live node in place: the node count stays at one.
#[test]
fn test_database_gc() {
    let db = GrafeoDB::new_in_memory();
    db.create_node(&["Person"]);

    db.gc();

    let remaining = db.node_count();
    assert_eq!(remaining, 1);
}
3235
/// `create_graph` returns `true` the first time and `false` for a repeated
/// name, and the created graph shows up in `list_graphs`.
#[test]
fn test_create_and_list_graphs() {
    let db = GrafeoDB::new_in_memory();

    let first_attempt = db.create_graph("social").unwrap();
    assert!(first_attempt);

    // Same name again: succeeds as a call but reports nothing was created.
    let second_attempt = db.create_graph("social").unwrap();
    assert!(!second_attempt);

    let listed = db.list_graphs();
    let wanted = "social".to_string();
    assert!(listed.contains(&wanted));
}
3253
/// Dropping an existing graph returns `true`; dropping it a second time
/// returns `false`.
#[test]
fn test_drop_graph() {
    let db = GrafeoDB::new_in_memory();
    db.create_graph("temp").unwrap();

    let dropped = db.drop_graph("temp");
    assert!(dropped);

    let dropped_again = db.drop_graph("temp");
    assert!(!dropped_again);
}
3261
/// Dropping the graph that is currently selected clears the current-graph
/// selection back to `None`.
#[test]
fn test_drop_graph_resets_current_graph() {
    let db = GrafeoDB::new_in_memory();
    db.create_graph("active").unwrap();
    db.set_current_graph(Some("active")).unwrap();
    assert_eq!(db.current_graph().as_deref(), Some("active"));

    db.drop_graph("active");
    assert!(db.current_graph().is_none());
}
3272
/// A fresh in-memory database has no current graph selected.
#[test]
fn test_current_graph_default_none() {
    let selected = GrafeoDB::new_in_memory().current_graph();
    assert_eq!(selected, None);
}
3282
/// Selecting an existing graph makes it the current graph.
#[test]
fn test_set_current_graph_valid() {
    let db = GrafeoDB::new_in_memory();
    db.create_graph("social").unwrap();

    db.set_current_graph(Some("social")).unwrap();

    assert_eq!(db.current_graph().as_deref(), Some("social"));
}
3290
/// Selecting a graph that was never created is an error.
#[test]
fn test_set_current_graph_nonexistent() {
    let db = GrafeoDB::new_in_memory();
    assert!(db.set_current_graph(Some("nonexistent")).is_err());
}
3297
/// Passing `None` to `set_current_graph` clears a previous selection.
#[test]
fn test_set_current_graph_none_resets() {
    let db = GrafeoDB::new_in_memory();
    db.create_graph("social").unwrap();

    // Select, then immediately deselect.
    db.set_current_graph(Some("social")).unwrap();
    db.set_current_graph(None).unwrap();

    assert!(db.current_graph().is_none());
}
3306
/// The name "default" is accepted by `set_current_graph` without a prior
/// `create_graph` call and is then reported as the current graph.
#[test]
fn test_set_current_graph_default_keyword() {
    let db = GrafeoDB::new_in_memory();

    db.set_current_graph(Some("default")).unwrap();

    assert_eq!(db.current_graph().as_deref(), Some("default"));
}
3314
/// A fresh in-memory database has no current schema selected.
#[test]
fn test_current_schema_default_none() {
    let selected = GrafeoDB::new_in_memory().current_schema();
    assert_eq!(selected, None);
}
3320
/// Selecting a schema that does not exist is an error.
#[test]
fn test_set_current_schema_nonexistent() {
    let db = GrafeoDB::new_in_memory();
    assert!(db.set_current_schema(Some("nonexistent")).is_err());
}
3327
/// Passing `None` to `set_current_schema` succeeds and leaves no schema
/// selected.
#[test]
fn test_set_current_schema_none_resets() {
    let db = GrafeoDB::new_in_memory();

    db.set_current_schema(None).unwrap();

    let selected = db.current_schema();
    assert_eq!(selected, None);
}
3334
/// The store returned by `graph_store()` reflects a node created through the
/// database API.
#[test]
fn test_graph_store_returns_lpg_by_default() {
    let db = GrafeoDB::new_in_memory();
    db.create_node(&["Person"]);

    let backing = db.graph_store();
    assert_eq!(backing.node_count(), 1);
}
3346
/// A fresh in-memory database exposes a mutable graph store.
#[test]
fn test_graph_store_mut_returns_some_by_default() {
    let writable = GrafeoDB::new_in_memory().graph_store_mut();
    assert!(writable.is_some());
}
3352
/// A database built with `with_read_store` is read-only, offers no mutable
/// store, and still exposes the pre-existing data through `graph_store()`.
#[test]
fn test_with_read_store() {
    use grafeo_core::graph::lpg::LpgStore;

    // Seed the store with one node before wrapping it.
    let backing = Arc::new(LpgStore::new().unwrap());
    backing.create_node(&["Person"]);

    let db = GrafeoDB::with_read_store(
        Arc::clone(&backing) as Arc<dyn GraphStore>,
        Config::in_memory(),
    )
    .unwrap();

    assert!(db.is_read_only());
    assert!(db.graph_store_mut().is_none());

    let view = db.graph_store();
    assert_eq!(view.node_count(), 1);
}
3370
/// A database built with `with_store` is writable: it is not read-only, offers
/// a mutable store, and sees the node seeded into the external store.
#[test]
fn test_with_store_graph_store_methods() {
    use grafeo_core::graph::lpg::LpgStore;

    let backing = Arc::new(LpgStore::new().unwrap());
    backing.create_node(&["Person"]);

    let shared = Arc::clone(&backing) as Arc<dyn GraphStoreMut>;
    let db = GrafeoDB::with_store(shared, Config::in_memory()).unwrap();

    assert!(!db.is_read_only());
    assert!(db.graph_store_mut().is_some());

    let visible_nodes = db.graph_store().node_count();
    assert_eq!(visible_nodes, 1);
}
3388
/// A read-only session can be opened on a populated database and, when the
/// `gql` feature is available, can run a counting query.
#[test]
fn test_session_read_only() {
    let db = GrafeoDB::new_in_memory();
    db.create_node(&["Person"]);

    let session = db.session_read_only();

    // Querying requires the GQL front end to be compiled in.
    #[cfg(feature = "gql")]
    {
        let counted = session.execute("MATCH (n) RETURN count(n)").unwrap();
        assert_eq!(counted.rows.len(), 1);
    }
}
3406
/// Closing an in-memory database succeeds, and closing it a second time
/// succeeds as well.
#[test]
fn test_close_in_memory_database() {
    let db = GrafeoDB::new_in_memory();
    db.create_node(&["Person"]);

    // Two consecutive closes; both must report success.
    for _ in 0..2 {
        assert!(db.close().is_ok());
    }
}
3419
/// A config with zero threads is rejected by `GrafeoDB::with_config`.
#[test]
fn test_with_config_invalid_config_zero_threads() {
    let invalid = Config::in_memory().with_threads(0);
    assert!(GrafeoDB::with_config(invalid).is_err());
}
3430
/// A config with a zero memory limit is rejected by `GrafeoDB::with_config`.
#[test]
fn test_with_config_invalid_config_zero_memory_limit() {
    let invalid = Config::in_memory().with_memory_limit(0);
    assert!(GrafeoDB::with_config(invalid).is_err());
}
3437
/// Each `StorageFormat` variant renders to its expected kebab-case name.
#[test]
fn test_storage_format_display() {
    use crate::config::StorageFormat;

    let expectations = [
        (StorageFormat::Auto, "auto"),
        (StorageFormat::WalDirectory, "wal-directory"),
        (StorageFormat::SingleFile, "single-file"),
    ];

    for (format, rendered) in expectations {
        assert_eq!(format.to_string(), rendered);
    }
}
3449
/// The default `StorageFormat` is `Auto`.
#[test]
fn test_storage_format_default() {
    use crate::config::StorageFormat;

    let fallback = StorageFormat::default();
    assert_eq!(fallback, StorageFormat::Auto);
}
3455
/// `with_storage_format` stores the chosen format on the config.
#[test]
fn test_config_with_storage_format() {
    use crate::config::StorageFormat;

    let base = Config::in_memory();
    let configured = base.with_storage_format(StorageFormat::SingleFile);

    assert_eq!(configured.storage_format, StorageFormat::SingleFile);
}
3462
/// `with_cdc` turns the `cdc_enabled` flag on.
#[test]
fn test_config_with_cdc() {
    let cdc_flag = Config::in_memory().with_cdc().cdc_enabled;
    assert!(cdc_flag);
}
3472
/// The in-memory config leaves `cdc_enabled` off unless asked otherwise.
#[test]
fn test_config_cdc_default_false() {
    let cdc_flag = Config::in_memory().cdc_enabled;
    assert!(!cdc_flag);
}
3478
/// `ConfigError` can be used as a boxed `std::error::Error`, and
/// `ZeroMemoryLimit` has no underlying source error.
#[test]
fn test_config_error_is_error_trait() {
    use crate::config::ConfigError;

    // The coercion to a trait object is itself the compile-time check.
    let boxed: Box<dyn std::error::Error> = Box::new(ConfigError::ZeroMemoryLimit);

    let cause = boxed.source();
    assert!(cause.is_none());
}
3489
/// The Prometheus export of a fresh database is a non-empty string.
#[cfg(feature = "metrics")]
#[test]
fn test_metrics_prometheus_output() {
    let db = GrafeoDB::new_in_memory();

    let exported = db.metrics_prometheus();

    assert!(!exported.is_empty());
}
3502
/// After `reset_metrics`, the metrics snapshot reports a query count of zero.
#[cfg(feature = "metrics")]
#[test]
fn test_reset_metrics() {
    let db = GrafeoDB::new_in_memory();

    // Open a session first so the reset runs on a database that has been used.
    let _open_session = db.session();

    db.reset_metrics();

    let snapshot = db.metrics();
    assert_eq!(snapshot.query_count, 0);
}
3513
/// On a database wrapped around an external read-only store, `drop_graph`
/// returns `false` for a name that was never created.
#[test]
fn test_drop_graph_on_external_store() {
    use grafeo_core::graph::lpg::LpgStore;

    let backing = Arc::new(LpgStore::new().unwrap());
    let db = GrafeoDB::with_read_store(
        Arc::clone(&backing) as Arc<dyn GraphStore>,
        Config::in_memory(),
    )
    .unwrap();

    let dropped = db.drop_graph("anything");
    assert!(!dropped);
}
3529}