1#[cfg(feature = "lpg")]
15mod admin;
16#[cfg(feature = "arrow-export")]
17pub mod arrow;
18#[cfg(all(feature = "async-storage", feature = "lpg"))]
19mod async_ops;
20#[cfg(all(feature = "async-storage", feature = "lpg"))]
21pub(crate) mod async_wal_store;
22#[cfg(all(feature = "wal", feature = "grafeo-file"))]
23pub mod backup;
24#[cfg(feature = "lpg")]
25pub(crate) mod catalog_section;
26#[cfg(feature = "cdc")]
27pub(crate) mod cdc_store;
28#[cfg(all(feature = "grafeo-file", feature = "lpg"))]
29mod checkpoint_timer;
30#[cfg(all(feature = "compact-store", feature = "mmap"))]
31pub mod compact_tiered;
32#[cfg(feature = "lpg")]
33mod crud;
34#[cfg(feature = "embed")]
35mod embed;
36#[cfg(feature = "grafeo-file")]
37pub(crate) mod flush;
38#[cfg(feature = "lpg")]
39mod import;
40#[cfg(feature = "lpg")]
41mod index;
42#[cfg(feature = "lpg")]
43mod persistence;
44mod query;
45#[cfg(feature = "triple-store")]
46mod rdf_ops;
47#[cfg(feature = "lpg")]
48mod search;
49pub(crate) mod section_consumer;
50#[cfg(all(feature = "wal", feature = "lpg"))]
51pub(crate) mod wal_store;
52
53use grafeo_common::{grafeo_error, grafeo_warn};
54#[cfg(feature = "wal")]
55use std::path::Path;
56use std::sync::Arc;
57use std::sync::atomic::AtomicUsize;
58
59use parking_lot::RwLock;
60
61use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
62use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind, Result};
63#[cfg(feature = "lpg")]
64use grafeo_core::graph::lpg::LpgStore;
65#[cfg(feature = "triple-store")]
66use grafeo_core::graph::rdf::RdfStore;
67use grafeo_core::graph::{GraphStoreMut, GraphStoreSearch};
68#[cfg(feature = "grafeo-file")]
69use grafeo_storage::file::GrafeoFileManager;
70#[cfg(all(feature = "wal", feature = "lpg"))]
71use grafeo_storage::wal::WalRecovery;
72#[cfg(feature = "wal")]
73use grafeo_storage::wal::{DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord};
74
75use crate::catalog::Catalog;
76use crate::config::Config;
77use crate::query::cache::QueryCache;
78use crate::session::Session;
79use crate::transaction::TransactionManager;
80
/// Top-level database handle.
///
/// Bundles the graph store(s), catalog, transaction manager, buffer manager
/// and the optional persistence components (WAL, single-file manager,
/// checkpoint timer). Many fields only exist when the matching Cargo feature
/// is enabled.
pub struct GrafeoDB {
    /// Configuration this database was constructed with.
    pub(super) config: Config,
    /// Built-in LPG store. `None` when the database was created over an
    /// external store (`with_store` / `with_read_store`).
    #[cfg(feature = "lpg")]
    pub(super) store: Option<Arc<LpgStore>>,
    /// Schema catalog (node/edge/graph types, procedures, schema namespaces).
    pub(super) catalog: Arc<Catalog>,
    /// RDF triple store.
    #[cfg(feature = "triple-store")]
    pub(super) rdf_store: Arc<RdfStore>,
    /// Transaction manager; also the source of the current epoch.
    pub(super) transaction_manager: Arc<TransactionManager>,
    /// Memory budget manager that memory consumers register with.
    pub(super) buffer_manager: Arc<BufferManager>,
    /// Write-ahead log. `None` for read-only databases, when the WAL is
    /// disabled, when no path is configured, or with an external store.
    #[cfg(feature = "wal")]
    pub(super) wal: Option<Arc<LpgWal>>,
    /// Optional graph name associated with WAL writes.
    // NOTE(review): presumably the named graph most recently recorded in the
    // WAL; the writers are outside this view, so confirm.
    #[cfg(feature = "wal")]
    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
    /// Query cache; replaced with a fresh cache by `compact()` / `recompact()`.
    pub(super) query_cache: Arc<QueryCache>,
    /// Shared counter, initialised to zero by every constructor.
    // NOTE(review): presumably counts commits; the increment site is not visible here.
    pub(super) commit_counter: Arc<AtomicUsize>,
    /// Open flag; every constructor initialises it to `true`.
    pub(super) is_open: RwLock<bool>,
    /// Change-data-capture log.
    #[cfg(feature = "cdc")]
    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
    /// Runtime CDC switch, seeded from `Config::cdc_enabled` and read by
    /// `cdc_active()`.
    #[cfg(feature = "cdc")]
    cdc_enabled: std::sync::atomic::AtomicBool,
    /// Embedding models keyed by a string (presumably the model name).
    #[cfg(feature = "embed")]
    pub(super) embedding_models:
        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
    /// Manager for the single-file storage format; `None` when that format is
    /// not in use.
    #[cfg(feature = "grafeo-file")]
    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
    /// Checkpoint timer; populated by `with_config` only when a checkpoint
    /// interval is configured, a file manager exists and the database is
    /// writable.
    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
    checkpoint_timer: parking_lot::Mutex<Option<checkpoint_timer::CheckpointTimer>>,
    /// Memory-mapped vector storages keyed by a string.
    // NOTE(review): key semantics (index name?) are not visible here; confirm.
    #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
    vector_spill_storages: Option<
        Arc<
            parking_lot::RwLock<
                std::collections::HashMap<String, Arc<grafeo_core::index::vector::MmapStorage>>,
            >,
        >,
    >,
    /// Store used for reads when set (external store or the layered store
    /// installed by `compact()`).
    pub(super) external_read_store: Option<Arc<dyn GraphStoreSearch>>,
    /// Store used for writes when set; `None` for `with_read_store` databases.
    pub(super) external_write_store: Option<Arc<dyn GraphStoreMut>>,
    /// Metrics registry; every constructor in this file creates one.
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Currently selected graph name, if any (starts as `None`).
    current_graph: RwLock<Option<String>>,
    /// Currently selected schema name, if any (starts as `None`).
    current_schema: RwLock<Option<String>>,
    /// Whether the database rejects writes.
    read_only: bool,
    /// Graph projections keyed by a string; cleared by `compact()`.
    projections:
        Arc<RwLock<std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>>>,
    /// Layered (compact base + overlay) store, set once `compact()` has run or
    /// a compact base was loaded from file.
    #[cfg(all(feature = "compact-store", feature = "lpg"))]
    layered_store: Option<Arc<grafeo_core::graph::compact::layered::LayeredStore>>,
    /// Tiered wrapper around the compact base store, set by `compact()` /
    /// `recompact()`.
    #[cfg(all(feature = "compact-store", feature = "mmap", feature = "lpg"))]
    compact_tiered: Option<Arc<compact_tiered::CompactStoreTiered>>,
}
194
195impl GrafeoDB {
/// Returns the built-in LPG store.
///
/// # Panics
///
/// Panics when `self.store` is `None`, i.e. the database was built over an
/// external store.
#[cfg(feature = "lpg")]
fn lpg_store(&self) -> &Arc<LpgStore> {
    let Some(builtin) = self.store.as_ref() else {
        panic!(
            "no built-in LpgStore: this GrafeoDB was created with an external store \
             (with_store / with_read_store). Use session() or graph_store() instead."
        );
    };
    builtin
}
210
/// Borrows the store that read-side features should query.
///
/// The external read store wins when present; otherwise the built-in LPG
/// store is used (only available with the `lpg` feature).
#[cfg(any(
    feature = "vector-index",
    feature = "text-index",
    feature = "hybrid-search",
    feature = "embed",
))]
fn graph_store_ref(&self) -> &dyn grafeo_core::graph::GraphStore {
    // Guard clause: an external read store always takes precedence.
    if let Some(external) = self.external_read_store.as_ref() {
        return external.as_ref();
    }

    #[cfg(feature = "lpg")]
    {
        &**self.lpg_store()
    }
    #[cfg(not(feature = "lpg"))]
    unreachable!("no graph store available: enable the `lpg` feature or use with_store()")
}
238
/// Reports whether change-data-capture is currently switched on.
///
/// Reads the `cdc_enabled` flag with relaxed ordering.
#[cfg(feature = "cdc")]
#[inline]
pub(super) fn cdc_active(&self) -> bool {
    use std::sync::atomic::Ordering;

    let flag = &self.cdc_enabled;
    flag.load(Ordering::Relaxed)
}
245
246 #[must_use]
267 pub fn new_in_memory() -> Self {
268 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
269 }
270
/// Opens (or creates) a persistent database at `path`.
///
/// # Errors
///
/// Propagates any error returned by `with_config`.
#[cfg(feature = "wal")]
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
    let location: &Path = path.as_ref();
    let persistent = Config::persistent(location);
    Self::with_config(persistent)
}
293
/// Opens the database at `path` using the read-only configuration.
///
/// # Errors
///
/// Propagates any error returned by `with_config`; that includes the case
/// where `path` does not name an existing file.
#[cfg(feature = "grafeo-file")]
pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
    let location: &std::path::Path = path.as_ref();
    let read_only = Config::read_only(location);
    Self::with_config(read_only)
}
322
/// Builds a database from `config`, loading persisted state when a path is
/// configured.
///
/// Order of operations:
/// 1. Validate `config`.
/// 2. Create empty stores, the transaction manager, buffer manager and catalog.
/// 3. (`grafeo-file`) Open or create the single-file database and load its
///    sections (or, without a section directory, its snapshot data); in
///    read-write mode also replay the sidecar WAL.
/// 4. (`wal`) For directory-format databases replay the WAL directory, then
///    open the WAL for writing.
/// 5. Assemble the struct, register section consumers, wire the layered
///    store if a compact base was loaded, start the checkpoint timer, restore
///    spill files and apply force-disk overrides.
///
/// # Errors
///
/// Returns an error when validation fails, when read-only mode is requested
/// without an existing database file, when the path exists but is not a
/// file in single-file mode, or when any load / recovery / I/O step fails.
pub fn with_config(config: Config) -> Result<Self> {
    config
        .validate()
        .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;

    #[cfg(feature = "lpg")]
    let store = Arc::new(LpgStore::new()?);
    #[cfg(feature = "triple-store")]
    let rdf_store = Arc::new(RdfStore::new());
    let transaction_manager = Arc::new(TransactionManager::new());

    let buffer_config = BufferManagerConfig {
        // Without an explicit limit, budget 75% of the detected system memory.
        budget: config.memory_limit.unwrap_or_else(|| {
            #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
            // The `allow` must sit on a statement, hence the temporary binding.
            let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
            b
        }),
        // Default spill location: "<file name>.spill" next to the database path.
        spill_path: config.spill_path.clone().or_else(|| {
            config.path.as_ref().and_then(|p| {
                let parent = p.parent()?;
                let name = p.file_name()?.to_str()?;
                Some(parent.join(format!("{name}.spill")))
            })
        }),
        ..BufferManagerConfig::default()
    };
    let buffer_manager = BufferManager::new(buffer_config);

    let catalog = Arc::new(Catalog::new());

    let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;

    // Compact base extracted from the file, wired in once `db` exists.
    #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "compact-store"))]
    let mut loaded_compact_base: Option<
        Arc<grafeo_core::graph::compact::CompactStore>,
    > = None;

    // Overlay deletions (node ids, edge ids) extracted alongside the compact base.
    #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "compact-store"))]
    let mut loaded_overlay_deletions: Option<(
        Vec<grafeo_common::types::NodeId>,
        Vec<grafeo_common::types::EdgeId>,
    )> = None;

    #[cfg(feature = "grafeo-file")]
    let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
        // Read-only: the file must already exist; nothing is created and no
        // WAL is replayed in this branch.
        if let Some(ref db_path) = config.path {
            if db_path.exists() && db_path.is_file() {
                let fm = GrafeoFileManager::open_read_only(db_path)?;
                // Prefer the section directory; otherwise fall back to snapshot data.
                #[cfg(feature = "lpg")]
                if fm.read_section_directory()?.is_some() {
                    Self::load_from_sections(
                        &fm,
                        &store,
                        &catalog,
                        #[cfg(feature = "triple-store")]
                        &rdf_store,
                    )?;
                    #[cfg(feature = "compact-store")]
                    {
                        loaded_compact_base = Self::extract_compact_base(&fm)?;
                        loaded_overlay_deletions = Self::extract_overlay_deletions(&fm)?;
                    }
                } else {
                    let snapshot_data = fm.read_snapshot()?;
                    if !snapshot_data.is_empty() {
                        Self::apply_snapshot_data(
                            &store,
                            &catalog,
                            #[cfg(feature = "triple-store")]
                            &rdf_store,
                            &snapshot_data,
                        )?;
                    }
                }
                Some(Arc::new(fm))
            } else {
                return Err(grafeo_common::utils::error::Error::Internal(format!(
                    "read-only open requires an existing .grafeo file: {}",
                    db_path.display()
                )));
            }
        } else {
            return Err(grafeo_common::utils::error::Error::Internal(
                "read-only mode requires a database path".to_string(),
            ));
        }
    } else if let Some(ref db_path) = config.path {
        if Self::should_use_single_file(db_path, config.storage_format) {
            // Open an existing file, create a missing one, reject anything else
            // (e.g. a directory at that path).
            let fm = if db_path.exists() && db_path.is_file() {
                GrafeoFileManager::open(db_path)?
            } else if !db_path.exists() {
                GrafeoFileManager::create(db_path)?
            } else {
                return Err(grafeo_common::utils::error::Error::Internal(format!(
                    "path exists but is not a file: {}",
                    db_path.display()
                )));
            };

            // Same load strategy as the read-only branch above.
            #[cfg(feature = "lpg")]
            if fm.read_section_directory()?.is_some() {
                Self::load_from_sections(
                    &fm,
                    &store,
                    &catalog,
                    #[cfg(feature = "triple-store")]
                    &rdf_store,
                )?;
                #[cfg(feature = "compact-store")]
                {
                    loaded_compact_base = Self::extract_compact_base(&fm)?;
                    loaded_overlay_deletions = Self::extract_overlay_deletions(&fm)?;
                }
            } else {
                let snapshot_data = fm.read_snapshot()?;
                if !snapshot_data.is_empty() {
                    Self::apply_snapshot_data(
                        &store,
                        &catalog,
                        #[cfg(feature = "triple-store")]
                        &rdf_store,
                        &snapshot_data,
                    )?;
                }
            }

            // Replay the sidecar WAL on top of the state loaded from the file.
            #[cfg(all(feature = "wal", feature = "lpg"))]
            if config.wal_enabled && fm.has_sidecar_wal() {
                let recovery = WalRecovery::new(fm.sidecar_wal_path());
                let records = recovery.recover()?;
                Self::apply_wal_records(
                    &store,
                    &catalog,
                    #[cfg(feature = "triple-store")]
                    &rdf_store,
                    &records,
                )?;
            }

            Some(Arc::new(fm))
        } else {
            None
        }
    } else {
        None
    };

    // Read-only databases never open a WAL for writing.
    #[cfg(feature = "wal")]
    let wal = if is_read_only {
        None
    } else if config.wal_enabled {
        if let Some(ref db_path) = config.path {
            // Single-file databases keep the WAL in a sidecar location;
            // directory-format databases use "<db_path>/wal".
            #[cfg(feature = "grafeo-file")]
            let wal_path = if let Some(ref fm) = file_manager {
                let p = fm.sidecar_wal_path();
                std::fs::create_dir_all(&p)?;
                p
            } else {
                std::fs::create_dir_all(db_path)?;
                db_path.join("wal")
            };

            #[cfg(not(feature = "grafeo-file"))]
            let wal_path = {
                std::fs::create_dir_all(db_path)?;
                db_path.join("wal")
            };

            #[cfg(all(feature = "lpg", feature = "grafeo-file"))]
            let is_single_file = file_manager.is_some();
            #[cfg(all(feature = "lpg", not(feature = "grafeo-file")))]
            let is_single_file = false;

            // The single-file path already replayed its sidecar WAL above, so
            // recovery here only runs for the directory format.
            #[cfg(feature = "lpg")]
            if !is_single_file && wal_path.exists() {
                let recovery = WalRecovery::new(&wal_path);
                let records = recovery.recover()?;
                Self::apply_wal_records(
                    &store,
                    &catalog,
                    #[cfg(feature = "triple-store")]
                    &rdf_store,
                    &records,
                )?;
            }

            // Translate the public durability setting into the storage-layer enum.
            let wal_durability = match config.wal_durability {
                crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
                crate::config::DurabilityMode::Batch {
                    max_delay_ms,
                    max_records,
                } => WalDurabilityMode::Batch {
                    max_delay_ms,
                    max_records,
                },
                crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
                    WalDurabilityMode::Adaptive { target_interval_ms }
                }
                crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
            };
            let wal_config = WalConfig {
                durability: wal_durability,
                ..WalConfig::default()
            };
            let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
            Some(Arc::new(wal_manager))
        } else {
            None
        }
    } else {
        None
    };

    let query_cache = Arc::new(QueryCache::default());

    // Align the transaction manager with the epoch of the loaded store.
    #[cfg(all(feature = "temporal", feature = "lpg"))]
    transaction_manager.sync_epoch(store.current_epoch());

    // Copy out the config values needed after `config` is moved into `db`.
    #[cfg(feature = "cdc")]
    let cdc_enabled_val = config.cdc_enabled;
    #[cfg(feature = "cdc")]
    let cdc_retention = config.cdc_retention.clone();

    // Extra handles for the checkpoint timer, cloned before the originals are
    // moved into `db`.
    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
    let checkpoint_interval = config.checkpoint_interval;
    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
    let timer_store = Arc::clone(&store);
    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
    let timer_catalog = Arc::clone(&catalog);
    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
    let timer_tm = Arc::clone(&transaction_manager);
    #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "triple-store"))]
    let timer_rdf = Arc::clone(&rdf_store);
    #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "wal"))]
    let timer_wal = wal.clone();

    let mut db = Self {
        config,
        #[cfg(feature = "lpg")]
        store: Some(store),
        catalog,
        #[cfg(feature = "triple-store")]
        rdf_store,
        transaction_manager,
        buffer_manager,
        #[cfg(feature = "wal")]
        wal,
        #[cfg(feature = "wal")]
        wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
        query_cache,
        commit_counter: Arc::new(AtomicUsize::new(0)),
        is_open: RwLock::new(true),
        #[cfg(feature = "cdc")]
        cdc_log: Arc::new(crate::cdc::CdcLog::with_retention(cdc_retention)),
        #[cfg(feature = "cdc")]
        cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
        #[cfg(feature = "embed")]
        embedding_models: RwLock::new(hashbrown::HashMap::new()),
        #[cfg(feature = "grafeo-file")]
        file_manager,
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        checkpoint_timer: parking_lot::Mutex::new(None),
        #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
        vector_spill_storages: None,
        external_read_store: None,
        external_write_store: None,
        #[cfg(feature = "metrics")]
        metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
        current_graph: RwLock::new(None),
        current_schema: RwLock::new(None),
        read_only: is_read_only,
        projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
        #[cfg(all(feature = "compact-store", feature = "lpg"))]
        layered_store: None,
        #[cfg(all(feature = "compact-store", feature = "mmap", feature = "lpg"))]
        compact_tiered: None,
    };

    db.register_section_consumers();

    // A compact base found in the file means the layered store must be rebuilt.
    #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "compact-store"))]
    if let Some(compact_base) = loaded_compact_base {
        db.wire_layered_after_load(compact_base, loaded_overlay_deletions)?;
    }

    // Periodic checkpoints need an interval, a file manager and write access.
    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
    if let (Some(interval), Some(fm)) = (checkpoint_interval, &db.file_manager)
        && !is_read_only
    {
        *db.checkpoint_timer.lock() = Some(checkpoint_timer::CheckpointTimer::start(
            interval,
            Arc::clone(fm),
            timer_store,
            timer_catalog,
            timer_tm,
            #[cfg(feature = "triple-store")]
            timer_rdf,
            #[cfg(feature = "wal")]
            timer_wal,
        ));
    }

    #[cfg(all(
        feature = "lpg",
        feature = "vector-index",
        feature = "mmap",
        not(feature = "temporal")
    ))]
    db.restore_spill_files();

    db.apply_force_disk_overrides();

    Ok(db)
}
708
709 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
738 config
739 .validate()
740 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
741
742 let transaction_manager = Arc::new(TransactionManager::new());
743
744 let buffer_config = BufferManagerConfig {
745 budget: config.memory_limit.unwrap_or_else(|| {
746 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
748 let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
749 b
750 }),
751 spill_path: None,
752 ..BufferManagerConfig::default()
753 };
754 let buffer_manager = BufferManager::new(buffer_config);
755
756 let query_cache = Arc::new(QueryCache::default());
757
758 #[cfg(feature = "cdc")]
759 let cdc_enabled_val = config.cdc_enabled;
760
761 Ok(Self {
762 config,
763 #[cfg(feature = "lpg")]
764 store: None,
765 catalog: Arc::new(Catalog::new()),
766 #[cfg(feature = "triple-store")]
767 rdf_store: Arc::new(RdfStore::new()),
768 transaction_manager,
769 buffer_manager,
770 #[cfg(feature = "wal")]
771 wal: None,
772 #[cfg(feature = "wal")]
773 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
774 query_cache,
775 commit_counter: Arc::new(AtomicUsize::new(0)),
776 is_open: RwLock::new(true),
777 #[cfg(feature = "cdc")]
778 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
779 #[cfg(feature = "cdc")]
780 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
781 #[cfg(feature = "embed")]
782 embedding_models: RwLock::new(hashbrown::HashMap::new()),
783 #[cfg(feature = "grafeo-file")]
784 file_manager: None,
785 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
786 checkpoint_timer: parking_lot::Mutex::new(None),
787 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
788 vector_spill_storages: None,
789 external_read_store: Some(Arc::clone(&store) as Arc<dyn GraphStoreSearch>),
790 external_write_store: Some(store),
791 #[cfg(feature = "metrics")]
792 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
793 current_graph: RwLock::new(None),
794 current_schema: RwLock::new(None),
795 read_only: false,
796 projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
797 #[cfg(all(feature = "compact-store", feature = "lpg"))]
798 layered_store: None,
799 #[cfg(all(feature = "compact-store", feature = "mmap", feature = "lpg"))]
800 compact_tiered: None,
801 })
802 }
803
804 pub fn with_read_store(store: Arc<dyn GraphStoreSearch>, config: Config) -> Result<Self> {
829 config
830 .validate()
831 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
832
833 let transaction_manager = Arc::new(TransactionManager::new());
834
835 let buffer_config = BufferManagerConfig {
836 budget: config.memory_limit.unwrap_or_else(|| {
837 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
839 let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
840 b
841 }),
842 spill_path: None,
843 ..BufferManagerConfig::default()
844 };
845 let buffer_manager = BufferManager::new(buffer_config);
846
847 let query_cache = Arc::new(QueryCache::default());
848
849 #[cfg(feature = "cdc")]
850 let cdc_enabled_val = config.cdc_enabled;
851
852 Ok(Self {
853 config,
854 #[cfg(feature = "lpg")]
855 store: None,
856 catalog: Arc::new(Catalog::new()),
857 #[cfg(feature = "triple-store")]
858 rdf_store: Arc::new(RdfStore::new()),
859 transaction_manager,
860 buffer_manager,
861 #[cfg(feature = "wal")]
862 wal: None,
863 #[cfg(feature = "wal")]
864 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
865 query_cache,
866 commit_counter: Arc::new(AtomicUsize::new(0)),
867 is_open: RwLock::new(true),
868 #[cfg(feature = "cdc")]
869 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
870 #[cfg(feature = "cdc")]
871 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
872 #[cfg(feature = "embed")]
873 embedding_models: RwLock::new(hashbrown::HashMap::new()),
874 #[cfg(feature = "grafeo-file")]
875 file_manager: None,
876 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
877 checkpoint_timer: parking_lot::Mutex::new(None),
878 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
879 vector_spill_storages: None,
880 external_read_store: Some(store),
881 external_write_store: None,
882 #[cfg(feature = "metrics")]
883 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
884 current_graph: RwLock::new(None),
885 current_schema: RwLock::new(None),
886 read_only: true,
887 projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
888 #[cfg(all(feature = "compact-store", feature = "lpg"))]
889 layered_store: None,
890 #[cfg(all(feature = "compact-store", feature = "mmap", feature = "lpg"))]
891 compact_tiered: None,
892 })
893 }
894
/// Converts the current graph into a compact base plus a writable overlay.
///
/// Builds a compact store from the current graph store (preserving ids),
/// wraps it in a `LayeredStore`, and installs that as the read store, the
/// write store and (via its overlay) the built-in LPG store. Named graphs of
/// the previous built-in store are moved into the overlay. Afterwards the
/// database is writable, and the query cache and projections are reset.
///
/// # Errors
///
/// Returns `Error::Internal` when building the compact store or the layered
/// store fails.
#[cfg(all(feature = "compact-store", feature = "lpg"))]
pub fn compact(&mut self) -> Result<()> {
    use grafeo_core::graph::compact::from_graph_store_preserving_ids;
    use grafeo_core::graph::compact::layered::LayeredStore;

    let source = self.graph_store();

    // Highest ids in use: taken from the built-in store's counters when
    // available, otherwise derived by scanning the source store.
    let (max_node_id, max_edge_id) = match self.store.as_ref() {
        Some(builtin) => (
            builtin.next_node_id().saturating_sub(1),
            builtin.next_edge_id().saturating_sub(1),
        ),
        None => {
            let highest_node = source.node_ids().last().map_or(0, |id| id.as_u64());
            let mut highest_edge = 0u64;
            for node in source.node_ids() {
                for (_, edge) in source.edges_from(node, grafeo_core::graph::Direction::Outgoing)
                {
                    highest_edge = highest_edge.max(edge.as_u64());
                }
            }
            (highest_node, highest_edge)
        }
    };

    let compact_base = from_graph_store_preserving_ids(source.as_ref())
        .map_err(|e| Error::Internal(e.to_string()))?;
    let layered = LayeredStore::new(compact_base, max_node_id, max_edge_id)
        .map(Arc::new)
        .map_err(|e| Error::Internal(e.to_string()))?;

    // Bring the overlay up to the current epoch and hand it the named graphs
    // of the store it replaces.
    let overlay = layered.overlay_store();
    overlay.sync_epoch(self.transaction_manager.current_epoch());
    if let Some(previous) = self.store.as_ref() {
        overlay.install_named_graphs(previous.take_named_graphs());
    }

    self.external_read_store = Some(Arc::clone(&layered) as Arc<dyn GraphStoreSearch>);
    self.external_write_store = Some(Arc::clone(&layered) as Arc<dyn GraphStoreMut>);
    self.store = Some(overlay);

    #[cfg(feature = "mmap")]
    {
        // Register the compact base with the buffer manager.
        let tiered = Arc::new(compact_tiered::CompactStoreTiered::new_in_memory(
            layered.base_store_arc(),
        ));
        let spill_target = self.buffer_manager.config().spill_path.clone();
        let base_consumer = Arc::new(section_consumer::CompactStoreConsumer::new(
            &tiered,
            &layered,
            spill_target,
        ));
        self.buffer_manager.register_consumer(base_consumer);
        self.compact_tiered = Some(tiered);
    }

    let overlay_consumer = Arc::new(section_consumer::OverlayConsumer::new(&layered));
    self.buffer_manager.register_consumer(overlay_consumer);

    self.layered_store = Some(layered);
    self.read_only = false;
    // Cached plans and projections may refer to the replaced store.
    self.query_cache = Arc::new(QueryCache::default());
    self.projections.write().clear();

    Ok(())
}
994
/// Rebuilds the compact base from the current layered store.
///
/// The combined (base + overlay) view is compacted again with ids preserved,
/// a fresh `LayeredStore` replaces the old one as read store, write store and
/// built-in store, the previous buffer-manager consumers are swapped for new
/// ones, and the query cache and projections are reset.
///
/// # Errors
///
/// Returns `Error::Internal` when `compact()` has not been run before, or
/// when building the compact store or the layered store fails.
#[cfg(all(feature = "compact-store", feature = "lpg"))]
pub fn recompact(&mut self) -> Result<()> {
    use grafeo_core::graph::compact::from_graph_store_preserving_ids;
    use grafeo_core::graph::compact::layered::LayeredStore;

    let layered = self
        .layered_store
        .as_ref()
        .ok_or_else(|| Error::Internal("recompact() requires a prior compact()".into()))?;

    // Read through the layered store so overlay changes are included.
    let combined: Arc<dyn GraphStoreSearch> = Arc::clone(layered) as Arc<dyn GraphStoreSearch>;

    // The overlay's id counters give the highest ids in use.
    let max_node_id = layered.overlay_store().next_node_id().saturating_sub(1);
    let max_edge_id = layered.overlay_store().next_edge_id().saturating_sub(1);

    let fresh_compact = from_graph_store_preserving_ids(combined.as_ref())
        .map_err(|e| Error::Internal(e.to_string()))?;

    let new_layered = Arc::new(
        LayeredStore::new(fresh_compact, max_node_id, max_edge_id)
            .map_err(|e| Error::Internal(e.to_string()))?,
    );

    let current_epoch = self.transaction_manager.current_epoch();
    new_layered.overlay_store().sync_epoch(current_epoch);

    // Move named graphs from the old overlay into the new one.
    new_layered
        .overlay_store()
        .install_named_graphs(layered.overlay_store().take_named_graphs());

    self.external_read_store = Some(Arc::clone(&new_layered) as Arc<dyn GraphStoreSearch>);
    self.external_write_store = Some(Arc::clone(&new_layered) as Arc<dyn GraphStoreMut>);
    self.store = Some(new_layered.overlay_store());

    #[cfg(feature = "mmap")]
    {
        // Replace the consumer that tracked the previous compact base.
        self.buffer_manager
            .unregister_consumer("section:CompactStore");
        let tiered = Arc::new(compact_tiered::CompactStoreTiered::new_in_memory(
            new_layered.base_store_arc(),
        ));
        let spill_path = self.buffer_manager.config().spill_path.clone();
        let consumer = Arc::new(section_consumer::CompactStoreConsumer::new(
            &tiered,
            &new_layered,
            spill_path,
        ));
        self.buffer_manager.register_consumer(consumer);
        self.compact_tiered = Some(tiered);
    }

    // Likewise replace the consumer that tracked the previous overlay.
    self.buffer_manager.unregister_consumer("overlay:LpgStore");
    let overlay_consumer = Arc::new(section_consumer::OverlayConsumer::new(&new_layered));
    self.buffer_manager.register_consumer(overlay_consumer);

    self.layered_store = Some(new_layered);
    self.query_cache = Arc::new(QueryCache::default());
    // The backing store was swapped, exactly as in `compact()`, so drop cached
    // projections too rather than leave them pointing at the replaced store.
    self.projections.write().clear();

    Ok(())
}
1073
/// Replays recovered WAL records into the given stores and catalog.
///
/// Data records (nodes, edges, properties, labels) are applied to the
/// "target" store, which is the default store until a `SwitchGraph` record
/// selects a named graph. Schema records are applied to `catalog`, and RDF
/// records are forwarded to `rdf_store` when the `triple-store` feature is
/// enabled.
///
/// Replay is best-effort for catalog operations and graph creation: their
/// results are deliberately ignored. Node and edge creation errors are
/// propagated.
///
/// # Errors
///
/// Returns an error when creating a node or edge with a recorded id fails,
/// or when a named graph selected by `SwitchGraph` cannot be obtained.
#[cfg(all(feature = "wal", feature = "lpg"))]
fn apply_wal_records(
    store: &Arc<LpgStore>,
    catalog: &Catalog,
    #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
    records: &[WalRecord],
) -> Result<()> {
    use crate::catalog::{
        EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
    };
    use grafeo_common::utils::error::Error;

    // Name of the graph data records currently apply to (`None` = default graph),
    // and the store that corresponds to it.
    let mut current_graph: Option<String> = None;
    let mut target_store: Arc<LpgStore> = Arc::clone(store);

    for record in records {
        match record {
            // --- Named-graph lifecycle -------------------------------------
            WalRecord::CreateNamedGraph { name } => {
                // Result ignored: replay is best-effort here.
                let _ = store.create_graph(name);
            }
            WalRecord::DropNamedGraph { name } => {
                store.drop_graph(name);
                // Fall back to the default graph if the active one was dropped.
                if current_graph.as_deref() == Some(name.as_str()) {
                    current_graph = None;
                    target_store = Arc::clone(store);
                }
            }
            WalRecord::SwitchGraph { name } => {
                current_graph.clone_from(name);
                target_store = match &current_graph {
                    None => Arc::clone(store),
                    Some(graph_name) => store
                        .graph_or_create(graph_name)
                        .map_err(|e| Error::Internal(e.to_string()))?,
                };
            }

            // --- Data mutations (applied to the active graph) ---------------
            WalRecord::CreateNode { id, labels } => {
                let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
                target_store.create_node_with_id(*id, &label_refs)?;
            }
            WalRecord::DeleteNode { id } => {
                target_store.delete_node(*id);
            }
            WalRecord::CreateEdge {
                id,
                src,
                dst,
                edge_type,
            } => {
                target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
            }
            WalRecord::DeleteEdge { id } => {
                target_store.delete_edge(*id);
            }
            WalRecord::SetNodeProperty { id, key, value } => {
                target_store.set_node_property(*id, key, value.clone());
            }
            WalRecord::SetEdgeProperty { id, key, value } => {
                target_store.set_edge_property(*id, key, value.clone());
            }
            WalRecord::AddNodeLabel { id, label } => {
                target_store.add_label(*id, label);
            }
            WalRecord::RemoveNodeLabel { id, label } => {
                target_store.remove_label(*id, label);
            }
            WalRecord::RemoveNodeProperty { id, key } => {
                target_store.remove_node_property(*id, key);
            }
            WalRecord::RemoveEdgeProperty { id, key } => {
                target_store.remove_edge_property(*id, key);
            }

            // --- Schema (catalog) records ----------------------------------
            WalRecord::CreateNodeType {
                name,
                properties,
                constraints,
            } => {
                let def = NodeTypeDefinition {
                    name: name.clone(),
                    properties: properties
                        .iter()
                        .map(|(n, t, nullable)| TypedProperty {
                            name: n.clone(),
                            data_type: PropertyDataType::from_type_name(t),
                            nullable: *nullable,
                            default_value: None,
                        })
                        .collect(),
                    constraints: constraints
                        .iter()
                        // Unrecognised kinds (and `not_null` without a property)
                        // fall through to `Unique`.
                        .map(|(kind, props)| match kind.as_str() {
                            "unique" => TypeConstraint::Unique(props.clone()),
                            "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
                            "not_null" if !props.is_empty() => {
                                TypeConstraint::NotNull(props[0].clone())
                            }
                            _ => TypeConstraint::Unique(props.clone()),
                        })
                        .collect(),
                    parent_types: Vec::new(),
                };
                let _ = catalog.register_node_type(def);
            }
            WalRecord::DropNodeType { name } => {
                let _ = catalog.drop_node_type(name);
            }
            WalRecord::CreateEdgeType {
                name,
                properties,
                constraints,
            } => {
                let def = EdgeTypeDefinition {
                    name: name.clone(),
                    properties: properties
                        .iter()
                        .map(|(n, t, nullable)| TypedProperty {
                            name: n.clone(),
                            data_type: PropertyDataType::from_type_name(t),
                            nullable: *nullable,
                            default_value: None,
                        })
                        .collect(),
                    constraints: constraints
                        .iter()
                        // Same fallback rule as for node types.
                        .map(|(kind, props)| match kind.as_str() {
                            "unique" => TypeConstraint::Unique(props.clone()),
                            "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
                            "not_null" if !props.is_empty() => {
                                TypeConstraint::NotNull(props[0].clone())
                            }
                            _ => TypeConstraint::Unique(props.clone()),
                        })
                        .collect(),
                    source_node_types: Vec::new(),
                    target_node_types: Vec::new(),
                };
                let _ = catalog.register_edge_type_def(def);
            }
            WalRecord::DropEdgeType { name } => {
                let _ = catalog.drop_edge_type_def(name);
            }
            WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
                // Index records are not replayed by this function.
            }
            WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
                // Constraint records are not replayed by this function.
            }
            WalRecord::CreateGraphType {
                name,
                node_types,
                edge_types,
                open,
            } => {
                use crate::catalog::GraphTypeDefinition;
                let def = GraphTypeDefinition {
                    name: name.clone(),
                    allowed_node_types: node_types.clone(),
                    allowed_edge_types: edge_types.clone(),
                    open: *open,
                };
                let _ = catalog.register_graph_type(def);
            }
            WalRecord::DropGraphType { name } => {
                let _ = catalog.drop_graph_type(name);
            }
            WalRecord::CreateSchema { name } => {
                let _ = catalog.register_schema_namespace(name.clone());
            }
            WalRecord::DropSchema { name } => {
                let _ = catalog.drop_schema_namespace(name);
            }

            // --- Schema alterations (unknown actions are skipped) -----------
            WalRecord::AlterNodeType { name, alterations } => {
                for (action, prop_name, type_name, nullable) in alterations {
                    match action.as_str() {
                        "add" => {
                            let prop = TypedProperty {
                                name: prop_name.clone(),
                                data_type: PropertyDataType::from_type_name(type_name),
                                nullable: *nullable,
                                default_value: None,
                            };
                            let _ = catalog.alter_node_type_add_property(name, prop);
                        }
                        "drop" => {
                            let _ = catalog.alter_node_type_drop_property(name, prop_name);
                        }
                        _ => {}
                    }
                }
            }
            WalRecord::AlterEdgeType { name, alterations } => {
                for (action, prop_name, type_name, nullable) in alterations {
                    match action.as_str() {
                        "add" => {
                            let prop = TypedProperty {
                                name: prop_name.clone(),
                                data_type: PropertyDataType::from_type_name(type_name),
                                nullable: *nullable,
                                default_value: None,
                            };
                            let _ = catalog.alter_edge_type_add_property(name, prop);
                        }
                        "drop" => {
                            let _ = catalog.alter_edge_type_drop_property(name, prop_name);
                        }
                        _ => {}
                    }
                }
            }
            WalRecord::AlterGraphType { name, alterations } => {
                for (action, type_name) in alterations {
                    match action.as_str() {
                        "add_node" => {
                            let _ =
                                catalog.alter_graph_type_add_node_type(name, type_name.clone());
                        }
                        "drop_node" => {
                            let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
                        }
                        "add_edge" => {
                            let _ =
                                catalog.alter_graph_type_add_edge_type(name, type_name.clone());
                        }
                        "drop_edge" => {
                            let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
                        }
                        _ => {}
                    }
                }
            }

            // --- Procedures -------------------------------------------------
            WalRecord::CreateProcedure {
                name,
                params,
                returns,
                body,
            } => {
                use crate::catalog::ProcedureDefinition;
                let def = ProcedureDefinition {
                    name: name.clone(),
                    params: params.clone(),
                    returns: returns.clone(),
                    body: body.clone(),
                };
                let _ = catalog.register_procedure(def);
            }
            WalRecord::DropProcedure { name } => {
                let _ = catalog.drop_procedure(name);
            }

            // --- RDF records: forwarded, or skipped without `triple-store` --
            #[cfg(feature = "triple-store")]
            WalRecord::InsertRdfTriple { .. }
            | WalRecord::DeleteRdfTriple { .. }
            | WalRecord::ClearRdfGraph { .. }
            | WalRecord::CreateRdfGraph { .. }
            | WalRecord::DropRdfGraph { .. } => {
                rdf_ops::replay_rdf_wal_record(rdf_store, record);
            }
            #[cfg(not(feature = "triple-store"))]
            WalRecord::InsertRdfTriple { .. }
            | WalRecord::DeleteRdfTriple { .. }
            | WalRecord::ClearRdfGraph { .. }
            | WalRecord::CreateRdfGraph { .. }
            | WalRecord::DropRdfGraph { .. } => {}

            // --- Transaction markers ---------------------------------------
            WalRecord::TransactionCommit { .. } => {
                // With `temporal`, each replayed commit starts a new epoch on
                // the active store.
                #[cfg(feature = "temporal")]
                {
                    target_store.new_epoch();
                }
            }
            WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
                // No state change during replay.
            }
            WalRecord::EpochAdvance { .. } => {
                // No state change during replay.
            }
        }
    }
    Ok(())
}
1377
/// Decides whether the database at `path` should be opened in single-file mode.
///
/// An explicit `SingleFile` or `WalDirectory` setting is honoured as-is. With
/// `Auto`:
/// - an existing regular file qualifies only when its first four bytes equal
///   `grafeo_storage::file::MAGIC` (open/read failures count as "no"),
/// - an existing directory never qualifies,
/// - any other path (for example one that does not exist yet) qualifies when
///   its extension is `grafeo`.
#[cfg(feature = "grafeo-file")]
fn should_use_single_file(
    path: &std::path::Path,
    configured: crate::config::StorageFormat,
) -> bool {
    use crate::config::StorageFormat;
    use std::io::Read;

    match configured {
        StorageFormat::SingleFile => true,
        StorageFormat::WalDirectory => false,
        StorageFormat::Auto if path.is_file() => {
            // Sniff the header; a file we cannot open or read is not ours.
            let mut header = [0u8; 4];
            let header_read = std::fs::File::open(path)
                .and_then(|mut file| file.read_exact(&mut header))
                .is_ok();
            header_read && header == grafeo_storage::file::MAGIC
        }
        StorageFormat::Auto if path.is_dir() => false,
        StorageFormat::Auto => path.extension().is_some_and(|ext| ext == "grafeo"),
    }
}
1414
/// Loads a serialized snapshot into the given stores.
///
/// Thin wrapper: every argument is forwarded unchanged to
/// `persistence::load_snapshot_into_store`, whose result is returned as-is.
/// The `rdf_store` parameter only exists when the `triple-store` feature is on.
#[cfg(all(feature = "grafeo-file", feature = "lpg"))]
fn apply_snapshot_data(
    store: &Arc<LpgStore>,
    catalog: &Arc<crate::catalog::Catalog>,
    #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
    data: &[u8],
) -> Result<()> {
    persistence::load_snapshot_into_store(
        store,
        catalog,
        #[cfg(feature = "triple-store")]
        rdf_store,
        data,
    )
}
1434
/// Wraps `compact_base` and the current `LpgStore` in a `LayeredStore` and
/// routes this database's external read/write stores through it.
///
/// Steps, in order:
/// 1. build the layered store with the existing `LpgStore` as overlay,
/// 2. seed it with `deletion_log` (deleted node and edge ids) when provided,
/// 3. sync the overlay's epoch to the transaction manager's current epoch,
/// 4. install the layered store as external read and write store,
/// 5. register the memory consumers (a tiered compact-store consumer first
///    when `mmap` is enabled, then the overlay consumer),
/// 6. remember the layered store on `self`.
///
/// # Errors
/// Returns `Error::Internal` when `self.store` is `None`.
#[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "compact-store"))]
fn wire_layered_after_load(
    &mut self,
    compact_base: Arc<grafeo_core::graph::compact::CompactStore>,
    deletion_log: Option<(
        Vec<grafeo_common::types::NodeId>,
        Vec<grafeo_common::types::EdgeId>,
    )>,
) -> Result<()> {
    use grafeo_core::graph::compact::layered::LayeredStore;

    let Some(overlay) = self.store.as_ref() else {
        return Err(Error::Internal(
            "wire_layered_after_load: no LpgStore".into(),
        ));
    };

    let base = Arc::clone(&compact_base);
    let layered = Arc::new(LayeredStore::with_overlay(base, Arc::clone(overlay)));

    if let Some((deleted_nodes, deleted_edges)) = deletion_log {
        layered.seed_deleted_from_base(deleted_nodes, deleted_edges);
    }

    let epoch = self.transaction_manager.current_epoch();
    layered.overlay_store().sync_epoch(epoch);

    let reader = Arc::clone(&layered) as Arc<dyn GraphStoreSearch>;
    let writer = Arc::clone(&layered) as Arc<dyn GraphStoreMut>;
    self.external_read_store = Some(reader);
    self.external_write_store = Some(writer);

    #[cfg(feature = "mmap")]
    {
        let base_store = layered.base_store_arc();
        let tiered = Arc::new(compact_tiered::CompactStoreTiered::new_in_memory(base_store));
        let spill_path = self.buffer_manager.config().spill_path.clone();
        let compact_consumer =
            section_consumer::CompactStoreConsumer::new(&tiered, &layered, spill_path);
        self.buffer_manager
            .register_consumer(Arc::new(compact_consumer));
        self.compact_tiered = Some(tiered);
    }

    let overlay_consumer = section_consumer::OverlayConsumer::new(&layered);
    self.buffer_manager
        .register_consumer(Arc::new(overlay_consumer));

    self.layered_store = Some(layered);

    Ok(())
}
1505
/// Reads the `CompactStore` section from `fm`, if the file has one.
///
/// Returns `Ok(None)` when the file has no section directory or the directory
/// has no `SectionType::CompactStore` entry. Otherwise the section bytes are
/// deserialized into a fresh `CompactStoreSection` and its `store()` value is
/// returned.
///
/// # Errors
/// Propagates failures from reading the directory, reading the section data,
/// or deserializing it.
#[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "compact-store"))]
fn extract_compact_base(
    fm: &GrafeoFileManager,
) -> Result<Option<Arc<grafeo_core::graph::compact::CompactStore>>> {
    use grafeo_common::storage::{Section, SectionType};

    let directory = match fm.read_section_directory()? {
        Some(directory) => directory,
        None => return Ok(None),
    };
    let entry = match directory.find(SectionType::CompactStore) {
        Some(entry) => entry,
        None => return Ok(None),
    };

    let bytes = fm.read_section_data(entry)?;
    let mut compact = grafeo_core::graph::compact::section::CompactStoreSection::empty();
    compact.deserialize(&bytes)?;
    Ok(compact.store())
}
1526
/// Reads the overlay deletion log (deleted node ids, deleted edge ids) from
/// `fm`, if the file has one.
///
/// Returns `Ok(None)` when the file has no section directory or the directory
/// has no `SectionType::OverlayDeletions` entry. Otherwise the section bytes
/// are deserialized into a fresh `OverlayDeletionsSection` and the result of
/// its `take()` is returned.
///
/// # Errors
/// Propagates failures from reading the directory, reading the section data,
/// or deserializing it.
#[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "compact-store"))]
fn extract_overlay_deletions(
    fm: &GrafeoFileManager,
) -> Result<
    Option<(
        Vec<grafeo_common::types::NodeId>,
        Vec<grafeo_common::types::EdgeId>,
    )>,
> {
    use grafeo_common::storage::{Section, SectionType};
    use grafeo_core::graph::compact::deletions_section::OverlayDeletionsSection;

    let directory = match fm.read_section_directory()? {
        Some(directory) => directory,
        None => return Ok(None),
    };
    let entry = match directory.find(SectionType::OverlayDeletions) {
        Some(entry) => entry,
        None => return Ok(None),
    };

    let bytes = fm.read_section_data(entry)?;
    let mut deletions = OverlayDeletionsSection::empty();
    deletions.deserialize(&bytes)?;
    Ok(Some(deletions.take()))
}
1554
/// Populates the stores from the section directory of `fm`.
///
/// Sections are processed in a fixed order, each only if present in the
/// directory: Catalog, LpgStore, then (feature-gated) RdfStore, RdfRing,
/// VectorStore and TextIndex. The vector and text sections are only
/// deserialized when the store reports at least one matching index entry.
///
/// # Errors
/// Returns `Error::Internal` when the file has no section directory, and
/// propagates any read or deserialize failure.
#[cfg(all(feature = "grafeo-file", feature = "lpg"))]
fn load_from_sections(
    fm: &GrafeoFileManager,
    store: &Arc<LpgStore>,
    catalog: &Arc<crate::catalog::Catalog>,
    #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
) -> Result<()> {
    use grafeo_common::storage::{Section, SectionType};

    let Some(dir) = fm.read_section_directory()? else {
        return Err(Error::Internal(
            "expected v2 section directory but found none".to_string(),
        ));
    };

    // Looks a section up and reads its bytes; `Ok(None)` when it is absent.
    let read_section = |kind: SectionType| {
        dir.find(kind)
            .map(|entry| fm.read_section_data(entry))
            .transpose()
    };

    if let Some(bytes) = read_section(SectionType::Catalog)? {
        // The catalog section wants an epoch source; a throwaway transaction
        // manager supplies it for this load.
        let epoch_source = Arc::new(crate::transaction::TransactionManager::new());
        let mut catalog_part = catalog_section::CatalogSection::new(
            Arc::clone(catalog),
            Arc::clone(store),
            move || epoch_source.current_epoch().as_u64(),
        );
        catalog_part.deserialize(&bytes)?;
    }

    if let Some(bytes) = read_section(SectionType::LpgStore)? {
        let mut lpg_part = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
        lpg_part.deserialize(&bytes)?;
    }

    #[cfg(feature = "triple-store")]
    if let Some(bytes) = read_section(SectionType::RdfStore)? {
        let mut rdf_part = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(rdf_store));
        rdf_part.deserialize(&bytes)?;
    }

    #[cfg(feature = "ring-index")]
    if let Some(bytes) = read_section(SectionType::RdfRing)? {
        let mut ring_part = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(rdf_store));
        ring_part.deserialize(&bytes)?;
    }

    #[cfg(feature = "vector-index")]
    if let Some(bytes) = read_section(SectionType::VectorStore)? {
        let vector_indexes = store.vector_index_entries();
        if !vector_indexes.is_empty() {
            let mut vector_part =
                grafeo_core::index::vector::VectorStoreSection::new(vector_indexes);
            vector_part.deserialize(&bytes)?;
        }
    }

    #[cfg(feature = "text-index")]
    if let Some(bytes) = read_section(SectionType::TextIndex)? {
        let text_indexes = store.text_index_entries();
        if !text_indexes.is_empty() {
            let mut text_part = grafeo_core::index::text::TextIndexSection::new(text_indexes);
            text_part.deserialize(&bytes)?;
        }
    }

    Ok(())
}
1634
1635 #[must_use]
1663 pub fn session(&self) -> Session {
1664 self.create_session_inner(None)
1665 }
1666
1667 #[must_use]
1685 pub fn session_with_identity(&self, identity: crate::auth::Identity) -> Session {
1686 let force_read_only = !identity.can_write();
1687 self.create_session_inner_full(None, force_read_only, identity)
1688 }
1689
1690 #[must_use]
1704 pub fn session_with_role(&self, role: crate::auth::Role) -> Session {
1705 self.session_with_identity(crate::auth::Identity::new("anonymous", [role]))
1706 }
1707
/// Creates an anonymous session whose CDC setting is explicitly overridden
/// with `cdc_enabled` instead of following the database-wide setting.
#[cfg(feature = "cdc")]
#[must_use]
pub fn session_with_cdc(&self, cdc_enabled: bool) -> Session {
    let cdc_override = Some(cdc_enabled);
    self.create_session_inner(cdc_override)
}
1731
1732 #[deprecated(
1741 since = "0.5.36",
1742 note = "use session_with_role(Role::ReadOnly) instead"
1743 )]
1744 #[must_use]
1745 pub fn session_read_only(&self) -> Session {
1746 self.session_with_role(crate::auth::Role::ReadOnly)
1747 }
1748
1749 #[allow(unused_variables)] fn create_session_inner(&self, cdc_override: Option<bool>) -> Session {
1755 self.create_session_inner_full(cdc_override, false, crate::auth::Identity::anonymous())
1756 }
1757
1758 #[allow(unused_variables)]
1760 fn create_session_inner_full(
1761 &self,
1762 cdc_override: Option<bool>,
1763 force_read_only: bool,
1764 identity: crate::auth::Identity,
1765 ) -> Session {
1766 let session_cfg = || crate::session::SessionConfig {
1767 transaction_manager: Arc::clone(&self.transaction_manager),
1768 query_cache: Arc::clone(&self.query_cache),
1769 catalog: Arc::clone(&self.catalog),
1770 adaptive_config: self.config.adaptive.clone(),
1771 factorized_execution: self.config.factorized_execution,
1772 graph_model: self.config.graph_model,
1773 query_timeout: self.config.query_timeout,
1774 max_property_size: self.config.max_property_size,
1775 buffer_manager: Some(Arc::clone(&self.buffer_manager)),
1776 commit_counter: Arc::clone(&self.commit_counter),
1777 gc_interval: self.config.gc_interval,
1778 read_only: self.read_only || force_read_only,
1779 identity: identity.clone(),
1780 #[cfg(feature = "lpg")]
1781 projections: Arc::clone(&self.projections),
1782 };
1783
1784 #[cfg(all(feature = "compact-store", feature = "lpg"))]
1787 if let Some(ref layered) = self.layered_store {
1788 let overlay = layered.overlay_store();
1789 let layered_arc = Arc::clone(layered);
1790 let mut session = Session::with_adaptive(overlay, session_cfg());
1791 session.override_stores(
1794 Arc::clone(&layered_arc) as Arc<dyn GraphStoreSearch>,
1795 Some(layered_arc as Arc<dyn GraphStoreMut>),
1796 );
1797 return session;
1798 }
1799
1800 if let Some(ref ext_read) = self.external_read_store {
1801 return Session::with_external_store(
1802 Arc::clone(ext_read),
1803 self.external_write_store.as_ref().map(Arc::clone),
1804 session_cfg(),
1805 )
1806 .expect("arena allocation for external store session");
1807 }
1808
1809 #[cfg(all(feature = "lpg", feature = "triple-store"))]
1810 let mut session = Session::with_rdf_store_and_adaptive(
1811 Arc::clone(self.lpg_store()),
1812 Arc::clone(&self.rdf_store),
1813 session_cfg(),
1814 );
1815 #[cfg(all(feature = "lpg", not(feature = "triple-store")))]
1816 let mut session = Session::with_adaptive(Arc::clone(self.lpg_store()), session_cfg());
1817 #[cfg(not(feature = "lpg"))]
1818 let mut session =
1819 Session::with_external_store(self.graph_store(), self.graph_store_mut(), session_cfg())
1820 .expect("session creation for non-lpg build");
1821
1822 #[cfg(all(feature = "wal", feature = "lpg"))]
1823 if let Some(ref wal) = self.wal {
1824 session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
1825 }
1826
1827 #[cfg(feature = "cdc")]
1828 {
1829 let should_enable = cdc_override.unwrap_or_else(|| self.cdc_active());
1830 if should_enable {
1831 session.set_cdc_log(Arc::clone(&self.cdc_log));
1832 }
1833 }
1834
1835 #[cfg(feature = "metrics")]
1836 {
1837 if let Some(ref m) = self.metrics {
1838 session.set_metrics(Arc::clone(m));
1839 m.session_created
1840 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1841 m.session_active
1842 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1843 }
1844 }
1845
1846 if let Some(ref graph) = *self.current_graph.read() {
1848 session.use_graph(graph);
1849 }
1850
1851 if let Some(ref schema) = *self.current_schema.read() {
1853 session.set_schema(schema);
1854 }
1855
1856 let _ = &mut session;
1858
1859 session
1860 }
1861
1862 #[must_use]
1868 pub fn current_graph(&self) -> Option<String> {
1869 self.current_graph.read().clone()
1870 }
1871
1872 pub fn set_current_graph(&self, name: Option<&str>) -> Result<()> {
1881 #[cfg(feature = "lpg")]
1882 if let Some(name) = name
1883 && !name.eq_ignore_ascii_case("default")
1884 && let Some(store) = &self.store
1885 && store.graph(name).is_none()
1886 {
1887 return Err(Error::Query(QueryError::new(
1888 QueryErrorKind::Semantic,
1889 format!("Graph '{name}' does not exist"),
1890 )));
1891 }
1892 *self.current_graph.write() = name.map(ToString::to_string);
1893 Ok(())
1894 }
1895
1896 #[must_use]
1901 pub fn current_schema(&self) -> Option<String> {
1902 self.current_schema.read().clone()
1903 }
1904
1905 pub fn set_current_schema(&self, name: Option<&str>) -> Result<()> {
1914 if let Some(name) = name
1915 && !self.catalog.schema_exists(name)
1916 {
1917 return Err(Error::Query(QueryError::new(
1918 QueryErrorKind::Semantic,
1919 format!("Schema '{name}' does not exist"),
1920 )));
1921 }
1922 *self.current_schema.write() = name.map(ToString::to_string);
1923 Ok(())
1924 }
1925
1926 #[must_use]
1928 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
1929 &self.config.adaptive
1930 }
1931
1932 #[must_use]
1934 pub fn is_read_only(&self) -> bool {
1935 self.read_only
1936 }
1937
1938 #[must_use]
1940 pub fn config(&self) -> &Config {
1941 &self.config
1942 }
1943
1944 #[must_use]
1946 pub fn graph_model(&self) -> crate::config::GraphModel {
1947 self.config.graph_model
1948 }
1949
1950 #[must_use]
1952 pub fn memory_limit(&self) -> Option<usize> {
1953 self.config.memory_limit
1954 }
1955
/// Returns a point-in-time metrics snapshot.
///
/// Starts from the metrics registry's snapshot (or a default snapshot when no
/// registry is installed) and then overwrites the cache fields with live
/// query-cache statistics, summing the parsed and optimized counters.
#[cfg(feature = "metrics")]
#[must_use]
pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
    let mut snapshot = match self.metrics.as_ref() {
        Some(registry) => registry.snapshot(),
        None => crate::metrics::MetricsSnapshot::default(),
    };

    let cache = self.query_cache.stats();
    snapshot.cache_hits = cache.parsed_hits + cache.optimized_hits;
    snapshot.cache_misses = cache.parsed_misses + cache.optimized_misses;
    snapshot.cache_size = cache.parsed_size + cache.optimized_size;
    snapshot.cache_invalidations = cache.invalidations;

    snapshot
}
1977
/// Renders the metrics registry via its `to_prometheus()` method.
///
/// Returns an empty string when no registry is installed.
#[cfg(feature = "metrics")]
#[must_use]
pub fn metrics_prometheus(&self) -> String {
    match self.metrics.as_ref() {
        Some(registry) => registry.to_prometheus(),
        None => String::new(),
    }
}
1988
/// Resets the metrics registry (when one is installed) and then the
/// query-cache statistics.
#[cfg(feature = "metrics")]
pub fn reset_metrics(&self) {
    if let Some(registry) = self.metrics.as_ref() {
        registry.reset();
    }
    self.query_cache.reset_stats();
}
1997
/// Returns the underlying `LpgStore` handle.
///
/// Public alias for `lpg_store()`.
#[cfg(feature = "lpg")]
#[must_use]
pub fn store(&self) -> &Arc<LpgStore> {
    let lpg = self.lpg_store();
    lpg
}
2010
/// Creates a named graph in the `LpgStore`.
///
/// Returns the boolean reported by the store.
// NOTE(review): presumably `true` means the graph was newly created — confirm
// against `LpgStore::create_graph`.
///
/// # Errors
/// Propagates (and converts) any error from the store.
#[cfg(feature = "lpg")]
pub fn create_graph(&self, name: &str) -> Result<bool> {
    let created = self.lpg_store().create_graph(name)?;
    Ok(created)
}
2022
/// Drops the named graph from the `LpgStore`.
///
/// Returns `false` when there is no `LpgStore` or the store reports that
/// nothing was dropped. When the dropped graph was the database's current
/// graph (ASCII case-insensitive match), the current graph is cleared.
#[cfg(feature = "lpg")]
pub fn drop_graph(&self, name: &str) -> bool {
    let Some(store) = self.store.as_ref() else {
        return false;
    };
    if !store.drop_graph(name) {
        return false;
    }

    // Do not leave the current-graph selection pointing at a removed graph.
    let mut selected = self.current_graph.write();
    let was_selected = matches!(selected.as_deref(), Some(g) if g.eq_ignore_ascii_case(name));
    if was_selected {
        *selected = None;
    }
    true
}
2044
/// Returns the graph names reported by the `LpgStore`.
#[cfg(feature = "lpg")]
#[must_use]
pub fn list_graphs(&self) -> Vec<String> {
    let lpg = self.lpg_store();
    lpg.graph_names()
}
2051
2052 pub fn create_projection(
2073 &self,
2074 name: impl Into<String>,
2075 spec: grafeo_core::graph::ProjectionSpec,
2076 ) -> bool {
2077 use grafeo_core::graph::GraphProjection;
2078 use std::collections::hash_map::Entry;
2079
2080 let store = self.graph_store();
2081 let projection = Arc::new(GraphProjection::new(store, spec));
2082 let mut projections = self.projections.write();
2083 match projections.entry(name.into()) {
2084 Entry::Occupied(_) => false,
2085 Entry::Vacant(e) => {
2086 e.insert(projection);
2087 true
2088 }
2089 }
2090 }
2091
2092 pub fn drop_projection(&self, name: &str) -> bool {
2094 self.projections.write().remove(name).is_some()
2095 }
2096
2097 #[must_use]
2099 pub fn list_projections(&self) -> Vec<String> {
2100 self.projections.read().keys().cloned().collect()
2101 }
2102
2103 #[must_use]
2105 pub fn projection(&self, name: &str) -> Option<Arc<dyn GraphStoreSearch>> {
2106 self.projections
2107 .read()
2108 .get(name)
2109 .map(|p| Arc::clone(p) as Arc<dyn GraphStoreSearch>)
2110 }
2111
2112 #[must_use]
2120 pub fn graph_store(&self) -> Arc<dyn GraphStoreSearch> {
2121 if let Some(ref ext_read) = self.external_read_store {
2122 Arc::clone(ext_read)
2123 } else {
2124 #[cfg(feature = "lpg")]
2125 {
2126 Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreSearch>
2127 }
2128 #[cfg(not(feature = "lpg"))]
2129 unreachable!("no graph store available: enable the `lpg` feature or use with_store()")
2130 }
2131 }
2132
2133 #[must_use]
2138 pub fn graph_store_mut(&self) -> Option<Arc<dyn GraphStoreMut>> {
2139 if self.external_read_store.is_some() {
2140 self.external_write_store.as_ref().map(Arc::clone)
2141 } else {
2142 #[cfg(feature = "lpg")]
2143 {
2144 Some(Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreMut>)
2145 }
2146 #[cfg(not(feature = "lpg"))]
2147 {
2148 None
2149 }
2150 }
2151 }
2152
2153 pub fn gc(&self) {
2160 #[cfg(feature = "lpg")]
2161 {
2162 let min_epoch = self.transaction_manager.min_active_epoch();
2163 self.lpg_store().gc_versions(min_epoch);
2164 }
2165 #[cfg(all(feature = "lpg", feature = "cdc"))]
2166 let current_epoch = self.transaction_manager.current_epoch();
2167 self.transaction_manager.gc();
2168
2169 #[cfg(feature = "cdc")]
2171 if self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed) {
2172 #[cfg(feature = "lpg")]
2173 self.cdc_log.apply_retention(current_epoch);
2174 }
2175 }
2176
2177 #[must_use]
2179 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
2180 &self.buffer_manager
2181 }
2182
/// Returns the layered store, or `None` when this database has none
/// installed.
#[cfg(all(feature = "compact-store", feature = "lpg"))]
#[must_use]
pub fn layered_store(
    &self,
) -> Option<&Arc<grafeo_core::graph::compact::layered::LayeredStore>> {
    match &self.layered_store {
        Some(layered) => Some(layered),
        None => None,
    }
}
2192
/// Returns the tiered compact store, or `None` when this database has none
/// installed.
#[cfg(all(feature = "compact-store", feature = "mmap", feature = "lpg"))]
#[must_use]
pub fn compact_tiered(&self) -> Option<&Arc<compact_tiered::CompactStoreTiered>> {
    match &self.compact_tiered {
        Some(tiered) => Some(tiered),
        None => None,
    }
}
2200
2201 #[must_use]
2203 pub fn query_cache(&self) -> &Arc<QueryCache> {
2204 &self.query_cache
2205 }
2206
2207 pub fn clear_plan_cache(&self) {
2213 self.query_cache.clear();
2214 }
2215
2216 pub fn close(&self) -> Result<()> {
2230 let mut is_open = self.is_open.write();
2231 if !*is_open {
2232 return Ok(());
2233 }
2234
2235 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
2240 if let Some(mut timer) = self.checkpoint_timer.lock().take() {
2241 timer.stop();
2242 }
2243
2244 if self.read_only {
2246 #[cfg(feature = "grafeo-file")]
2247 if let Some(ref fm) = self.file_manager {
2248 fm.close()?;
2249 }
2250 *is_open = false;
2251 return Ok(());
2252 }
2253
2254 #[cfg(feature = "grafeo-file")]
2258 let is_single_file = self.file_manager.is_some();
2259 #[cfg(not(feature = "grafeo-file"))]
2260 let is_single_file = false;
2261
2262 #[cfg(feature = "grafeo-file")]
2263 if let Some(ref fm) = self.file_manager {
2264 #[cfg(feature = "wal")]
2266 if let Some(ref wal) = self.wal {
2267 wal.sync()?;
2268 }
2269 let flush_result = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;
2270
2271 #[cfg(feature = "wal")]
2277 let flush_result = if flush_result.sections_written == 0 {
2278 if let Some(ref wal) = self.wal {
2279 if wal.record_count() > 0 {
2280 grafeo_warn!(
2281 "WAL has {} records but checkpoint wrote 0 sections; retrying with forced flush",
2282 wal.record_count()
2283 );
2284 self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?
2285 } else {
2286 flush_result
2287 }
2288 } else {
2289 flush_result
2290 }
2291 } else {
2292 flush_result
2293 };
2294
2295 #[cfg(feature = "wal")]
2298 if let Some(ref wal) = self.wal {
2299 wal.close_active_log();
2300 }
2301
2302 #[cfg(feature = "wal")]
2306 let has_wal_records = self.wal.as_ref().is_some_and(|wal| wal.record_count() > 0);
2307 #[cfg(not(feature = "wal"))]
2308 let has_wal_records = false;
2309
2310 if flush_result.sections_written > 0 || !has_wal_records {
2311 {
2312 use grafeo_common::testing::crash::maybe_crash;
2313 maybe_crash("close:before_remove_sidecar_wal");
2314 }
2315 fm.remove_sidecar_wal()?;
2316 } else {
2317 grafeo_warn!(
2318 "keeping sidecar WAL for recovery: checkpoint wrote 0 sections but WAL has records"
2319 );
2320 }
2321 fm.close()?;
2322 }
2323
2324 #[cfg(feature = "wal")]
2330 if !is_single_file && let Some(ref wal) = self.wal {
2331 let commit_tx = self
2333 .transaction_manager
2334 .last_assigned_transaction_id()
2335 .unwrap_or_else(|| self.transaction_manager.begin());
2336
2337 wal.log(&WalRecord::TransactionCommit {
2339 transaction_id: commit_tx,
2340 })?;
2341
2342 wal.sync()?;
2343 }
2344
2345 *is_open = false;
2346 Ok(())
2347 }
2348
/// Returns the write-ahead log handle, or `None` when this database has no
/// WAL attached.
#[cfg(feature = "wal")]
#[must_use]
pub fn wal(&self) -> Option<&Arc<LpgWal>> {
    match &self.wal {
        Some(log) => Some(log),
        None => None,
    }
}
2355
/// Appends `record` to the WAL when one is attached; a no-op otherwise.
///
/// # Errors
/// Propagates (and converts) any error from `wal.log`.
#[cfg(feature = "wal")]
pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
    let Some(log) = self.wal.as_ref() else {
        return Ok(());
    };
    log.log(record)?;
    Ok(())
}
2364
2365 fn register_section_consumers(&mut self) {
2370 #[cfg(feature = "lpg")]
2372 let store_ref = self.store.as_ref();
2373 #[cfg(feature = "lpg")]
2374 if let Some(store) = store_ref {
2375 let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
2376 self.buffer_manager.register_consumer(Arc::new(
2377 section_consumer::SectionConsumer::new(Arc::new(lpg)),
2378 ));
2379 }
2380
2381 #[cfg(feature = "triple-store")]
2383 if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
2384 let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
2385 self.buffer_manager.register_consumer(Arc::new(
2386 section_consumer::SectionConsumer::new(Arc::new(rdf)),
2387 ));
2388 }
2389
2390 #[cfg(feature = "ring-index")]
2392 if self.rdf_store.ring().is_some() {
2393 let ring = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(&self.rdf_store));
2394 let consumer = match self.buffer_manager.config().spill_path.clone() {
2400 Some(path) => section_consumer::SectionConsumer::with_spill(Arc::new(ring), path),
2401 None => section_consumer::SectionConsumer::new(Arc::new(ring)),
2402 };
2403 self.buffer_manager.register_consumer(Arc::new(consumer));
2404 }
2405
2406 #[cfg(all(
2409 feature = "lpg",
2410 feature = "vector-index",
2411 feature = "mmap",
2412 not(feature = "temporal")
2413 ))]
2414 if let Some(store) = store_ref {
2415 let spill_path = self.buffer_manager.config().spill_path.clone();
2416 let consumer = Arc::new(section_consumer::VectorIndexConsumer::new(
2417 store, spill_path,
2418 ));
2419 self.vector_spill_storages = Some(Arc::clone(consumer.spilled_storages()));
2421 self.buffer_manager.register_consumer(consumer);
2422 }
2423
2424 #[cfg(all(feature = "lpg", feature = "text-index"))]
2426 if let Some(store) = store_ref {
2427 self.buffer_manager
2428 .register_consumer(Arc::new(section_consumer::TextIndexConsumer::new(store)));
2429 }
2430
2431 #[cfg(feature = "cdc")]
2434 self.buffer_manager.register_consumer(
2435 Arc::clone(&self.cdc_log) as Arc<dyn grafeo_common::memory::MemoryConsumer>
2436 );
2437 }
2438
2439 fn apply_force_disk_overrides(&self) {
2453 use grafeo_common::storage::TierOverride;
2454
2455 for (section_type, mem_config) in &self.config.section_configs {
2456 let consumer_name = format!("section:{section_type:?}");
2457 match mem_config.tier {
2458 TierOverride::ForceDisk => {
2459 #[cfg(feature = "tracing")]
2460 tracing::info!(
2461 target: "grafeo::tier",
2462 section = ?section_type,
2463 tier = "ForceDisk",
2464 "applying tier override at db open"
2465 );
2466 self.buffer_manager.spill_consumer_by_name(&consumer_name);
2467 }
2468 TierOverride::ForceRam => {
2469 #[cfg(feature = "tracing")]
2470 tracing::info!(
2471 target: "grafeo::tier",
2472 section = ?section_type,
2473 tier = "ForceRam",
2474 "pinning consumer to RAM"
2475 );
2476 self.buffer_manager.mark_force_ram(&consumer_name);
2477 }
2478 TierOverride::Auto => {}
2479 _ => {}
2480 }
2481 }
2482 }
2483
2484 pub fn reload_eligible(&self, target_fraction: f64) -> usize {
2500 self.buffer_manager.reload_eligible(target_fraction)
2501 }
2502
2503 #[must_use]
2516 pub fn storage_tiers(
2517 &self,
2518 ) -> hashbrown::HashMap<
2519 grafeo_common::storage::SectionType,
2520 grafeo_common::memory::buffer::StorageTier,
2521 > {
2522 use grafeo_common::storage::SectionType;
2523 let snapshot = self.buffer_manager.snapshot_consumer_tiers();
2524 let mut out = hashbrown::HashMap::new();
2525 for (name, tier) in snapshot {
2526 let Some(suffix) = name.strip_prefix("section:") else {
2527 continue;
2528 };
2529 let section_type = match suffix {
2530 "LpgStore" => SectionType::LpgStore,
2531 "RdfStore" => SectionType::RdfStore,
2532 "CompactStore" => SectionType::CompactStore,
2533 "VectorStore" => SectionType::VectorStore,
2534 "TextIndex" => SectionType::TextIndex,
2535 "RdfRing" => SectionType::RdfRing,
2536 "PropertyIndex" => SectionType::PropertyIndex,
2537 "Catalog" => SectionType::Catalog,
2538 _ => continue,
2539 };
2540 out.insert(section_type, tier);
2541 }
2542 out
2543 }
2544
/// Re-attaches vector spill files left in the spill directory.
///
/// Does nothing when no spill path is configured, the directory does not
/// exist or cannot be listed, no spill map has been set up, or there is no
/// `LpgStore`.
///
/// For every directory entry named `vectors_<encoded key>.bin` (extension
/// compared ASCII case-insensitively):
/// - the key is decoded (`%3A` to `:`, then `%25` to `%`); keys without a `:`
///   are skipped,
/// - if the store has no vector index for the key, the file is deleted,
/// - otherwise the file is opened as `MmapStorage`, the property named by the
///   second `:`-separated key component is marked spilled, and the storage is
///   inserted into the spill map under the key,
/// - if opening fails, a warning is logged and the file is deleted.
///
/// File deletions are best-effort; their errors are ignored.
#[cfg(all(
    feature = "lpg",
    feature = "vector-index",
    feature = "mmap",
    not(feature = "temporal")
))]
fn restore_spill_files(&mut self) {
    use grafeo_core::index::vector::MmapStorage;

    const PREFIX: &str = "vectors_";
    const SUFFIX: &str = ".bin";

    let Some(spill_dir) = self.buffer_manager.config().spill_path.clone() else {
        return;
    };

    if !spill_dir.exists() {
        return;
    }

    let Some(spill_map) = self.vector_spill_storages.as_ref().map(Arc::clone) else {
        return;
    };

    let Ok(entries) = std::fs::read_dir(&spill_dir) else {
        return;
    };

    let Some(ref store) = self.store else {
        return;
    };

    for entry in entries.flatten() {
        let path = entry.path();
        // Borrow the name straight from `path`; no per-entry allocation.
        let Some(file_name) = path.file_name().and_then(|n| n.to_str()) else {
            continue;
        };

        if !file_name.starts_with(PREFIX)
            || !std::path::Path::new(file_name)
                .extension()
                .is_some_and(|ext| ext.eq_ignore_ascii_case("bin"))
        {
            continue;
        }

        // Safe to slice: the name starts with the ASCII prefix and, having a
        // `bin` extension, ends with a four-byte ASCII `.bin` in some casing.
        let key_part = &file_name[PREFIX.len()..file_name.len() - SUFFIX.len()];

        // `%3A` must be decoded before `%25` so an escaped percent sign is
        // not mistaken for the start of an escaped colon.
        let key = key_part.replace("%3A", ":").replace("%25", "%");

        if !key.contains(':') {
            continue;
        }

        if store.get_vector_index_by_key(&key).is_none() {
            // Orphaned spill file: its index no longer exists.
            let _ = std::fs::remove_file(&path);
            continue;
        }

        match MmapStorage::open(&path) {
            Ok(mmap_storage) => {
                let property = key.split(':').nth(1).unwrap_or("");
                let prop_key = grafeo_common::types::PropertyKey::new(property);
                store.node_properties_mark_spilled(&prop_key);

                spill_map.write().insert(key, Arc::new(mmap_storage));
            }
            Err(e) => {
                // Report through the crate's logging macro, like the rest of
                // this file, instead of writing to stderr directly.
                grafeo_warn!("failed to restore spill file {}: {}", path.display(), e);
                let _ = std::fs::remove_file(&path);
            }
        }
    }
}
2635
/// Assembles the sections to write at checkpoint time.
///
/// With a layered store the result is: the compact base section, the overlay
/// `LpgStore` section and — only when non-empty — the overlay deletions
/// section (when it is empty the layered store's deletions are marked clean
/// instead). Nothing else is added in that case.
///
/// Otherwise the result is, in order and subject to features: catalog, LPG
/// store, vector indexes (if any), text indexes (if any), RDF store (if
/// non-empty or it has graphs) and RDF ring (if present).
#[cfg(feature = "grafeo-file")]
fn build_sections(&self) -> Vec<Box<dyn grafeo_common::storage::Section>> {
    let mut out: Vec<Box<dyn grafeo_common::storage::Section>> = Vec::new();

    #[cfg(all(feature = "compact-store", feature = "lpg"))]
    if let Some(layered) = self.layered_store.as_ref() {
        use grafeo_core::graph::compact::deletions_section::OverlayDeletionsSection;
        use grafeo_core::graph::compact::section::CompactStoreSection;

        let base_part = CompactStoreSection::new(layered.base_store_arc());
        out.push(Box::new(base_part));

        let overlay_part =
            grafeo_core::graph::lpg::LpgStoreSection::new(layered.overlay_store());
        out.push(Box::new(overlay_part));

        let deletions_part = OverlayDeletionsSection::from_layered(Arc::clone(layered));
        if deletions_part.is_empty() {
            layered.mark_deletions_clean();
        } else {
            out.push(Box::new(deletions_part));
        }

        return out;
    }

    #[cfg(feature = "lpg")]
    if let Some(store) = self.store.as_ref() {
        let lpg_part = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));

        let epoch_source = Arc::clone(&self.transaction_manager);
        let catalog_part = catalog_section::CatalogSection::new(
            Arc::clone(&self.catalog),
            Arc::clone(store),
            move || epoch_source.current_epoch().as_u64(),
        );

        // Catalog goes in ahead of the LPG store section.
        out.push(Box::new(catalog_part));
        out.push(Box::new(lpg_part));

        #[cfg(feature = "vector-index")]
        {
            let vector_indexes = store.vector_index_entries();
            if !vector_indexes.is_empty() {
                out.push(Box::new(
                    grafeo_core::index::vector::VectorStoreSection::new(vector_indexes),
                ));
            }
        }

        #[cfg(feature = "text-index")]
        {
            let text_indexes = store.text_index_entries();
            if !text_indexes.is_empty() {
                out.push(Box::new(grafeo_core::index::text::TextIndexSection::new(
                    text_indexes,
                )));
            }
        }
    }

    #[cfg(feature = "triple-store")]
    if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
        let rdf_part =
            grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
        out.push(Box::new(rdf_part));
    }

    #[cfg(feature = "ring-index")]
    if self.rdf_store.ring().is_some() {
        let ring_part =
            grafeo_core::index::ring::RdfRingSection::new(Arc::clone(&self.rdf_store));
        out.push(Box::new(ring_part));
    }

    out
}
2728
/// Writes a full backup of the database into `backup_dir`.
///
/// A writable database is checkpointed to its file first; a read-only one is
/// backed up as it is. The actual work is done by `backup::do_backup_full`,
/// which receives the file manager, the optional WAL and the current epoch.
///
/// # Errors
/// Returns `Error::Internal` when the database has no file manager, and
/// propagates checkpoint and backup failures.
#[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
pub fn backup_full(&self, backup_dir: &std::path::Path) -> Result<backup::BackupSegment> {
    let Some(fm) = self.file_manager.as_ref() else {
        return Err(Error::Internal(
            "backup requires a persistent database".to_string(),
        ));
    };

    if !self.read_only {
        // Only the side effect matters; the flush result itself is unused.
        let _ = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;
    }

    let epoch = self.transaction_manager.current_epoch();
    let wal = self.wal.as_deref();
    backup::do_backup_full(backup_dir, fm, wal, epoch)
}
2759
/// Writes an incremental backup into `backup_dir`.
///
/// Delegates to `backup::do_backup_incremental` with the WAL and the current
/// epoch.
///
/// # Errors
/// Returns `Error::Internal` when the database has no WAL, and propagates
/// backup failures.
#[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
pub fn backup_incremental(
    &self,
    backup_dir: &std::path::Path,
) -> Result<backup::BackupSegment> {
    let Some(wal) = self.wal.as_ref() else {
        return Err(Error::Internal(
            "incremental backup requires WAL".to_string(),
        ));
    };

    let epoch = self.transaction_manager.current_epoch();
    backup::do_backup_incremental(backup_dir, wal, epoch)
}
2780
/// Reads the backup manifest from `backup_dir`.
///
/// Thin wrapper around `backup::read_manifest`; its result, including the
/// `None` case, is returned unchanged.
#[cfg(all(feature = "wal", feature = "grafeo-file"))]
pub fn read_backup_manifest(
    backup_dir: &std::path::Path,
) -> Result<Option<backup::BackupManifest>> {
    let manifest = backup::read_manifest(backup_dir)?;
    Ok(manifest)
}
2792
/// Returns the backup cursor stored in the WAL directory.
///
/// Yields `None` when there is no WAL, when reading the cursor fails, or when
/// no cursor is present; read errors are deliberately not surfaced.
#[cfg(all(feature = "wal", feature = "grafeo-file"))]
#[must_use]
pub fn backup_cursor(&self) -> Option<backup::BackupCursor> {
    let wal = self.wal.as_ref()?;
    match backup::read_backup_cursor(wal.dir()) {
        Ok(cursor) => cursor,
        Err(_) => None,
    }
}
2801
/// Restores a database from `backup_dir` up to `target_epoch`, writing the
/// result to `output_path`.
///
/// Thin wrapper around `backup::do_restore_to_epoch`.
///
/// # Errors
/// Propagates any failure reported by the restore.
#[cfg(all(feature = "wal", feature = "grafeo-file"))]
pub fn restore_to_epoch(
    backup_dir: &std::path::Path,
    target_epoch: grafeo_common::types::EpochId,
    output_path: &std::path::Path,
) -> Result<()> {
    backup::do_restore_to_epoch(backup_dir, target_epoch, output_path)?;
    Ok(())
}
2820
2821 #[cfg(feature = "grafeo-file")]
2827 fn checkpoint_to_file(
2828 &self,
2829 fm: &GrafeoFileManager,
2830 reason: flush::FlushReason,
2831 ) -> Result<flush::FlushResult> {
2832 let sections = self.build_sections();
2833 let section_refs: Vec<&dyn grafeo_common::storage::Section> =
2834 sections.iter().map(|s| s.as_ref()).collect();
2835 #[cfg(feature = "lpg")]
2836 let context = flush::build_context(self.lpg_store(), &self.transaction_manager);
2837 #[cfg(not(feature = "lpg"))]
2838 let context = flush::build_context_minimal(&self.transaction_manager);
2839
2840 flush::flush(
2841 fm,
2842 §ion_refs,
2843 &context,
2844 reason,
2845 #[cfg(feature = "wal")]
2846 self.wal.as_deref(),
2847 )
2848 }
2849
    /// Returns the shared file manager, or `None` when the database has none.
    #[cfg(feature = "grafeo-file")]
    #[must_use]
    pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
        self.file_manager.as_ref()
    }
2856}
2857
2858impl Drop for GrafeoDB {
2859 fn drop(&mut self) {
2860 if let Err(e) = self.close() {
2861 grafeo_error!("Error closing database: {}", e);
2862 }
2863 }
2864}
2865
/// Exposes the database's inherent admin methods through the `AdminService` trait.
///
/// Every trait method forwards to the inherent method of the same name.
/// Inherent associated functions take priority over trait methods during
/// path resolution, so `Self::name(self)` reaches the inherent
/// implementation and does not recurse into this impl.
#[cfg(feature = "lpg")]
impl crate::admin::AdminService for GrafeoDB {
    fn info(&self) -> crate::admin::DatabaseInfo {
        Self::info(self)
    }

    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
        Self::detailed_stats(self)
    }

    fn schema(&self) -> crate::admin::SchemaInfo {
        Self::schema(self)
    }

    fn validate(&self) -> crate::admin::ValidationResult {
        Self::validate(self)
    }

    fn wal_status(&self) -> crate::admin::WalStatus {
        Self::wal_status(self)
    }

    fn wal_checkpoint(&self) -> Result<()> {
        Self::wal_checkpoint(self)
    }
}
2892
/// The outcome of a query: column metadata, rows, optional metrics and status.
#[derive(Debug)]
pub struct QueryResult {
    /// Column names.
    pub columns: Vec<String>,
    /// Logical type of each column. Constructors that are not given types
    /// fill in `LogicalType::Any` once per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// Result rows, each a vector of values. Crate-visible only; outside the
    /// crate use `rows()`, `into_rows()` or `iter()`.
    pub(crate) rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Execution time in milliseconds; `None` until set (see `with_metrics`).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned; `None` until set (see `with_metrics`).
    pub rows_scanned: Option<u64>,
    /// Optional status text (see `QueryResult::status`).
    pub status_message: Option<String>,
    /// GQL status of the operation; the constructors in this file set `SUCCESS`.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
2942
2943impl QueryResult {
2944 #[must_use]
2946 pub fn empty() -> Self {
2947 Self {
2948 columns: Vec::new(),
2949 column_types: Vec::new(),
2950 rows: Vec::new(),
2951 execution_time_ms: None,
2952 rows_scanned: None,
2953 status_message: None,
2954 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2955 }
2956 }
2957
2958 #[must_use]
2960 pub fn status(msg: impl Into<String>) -> Self {
2961 Self {
2962 columns: Vec::new(),
2963 column_types: Vec::new(),
2964 rows: Vec::new(),
2965 execution_time_ms: None,
2966 rows_scanned: None,
2967 status_message: Some(msg.into()),
2968 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2969 }
2970 }
2971
2972 #[must_use]
2974 pub fn new(columns: Vec<String>) -> Self {
2975 let len = columns.len();
2976 Self {
2977 columns,
2978 column_types: vec![grafeo_common::types::LogicalType::Any; len],
2979 rows: Vec::new(),
2980 execution_time_ms: None,
2981 rows_scanned: None,
2982 status_message: None,
2983 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2984 }
2985 }
2986
2987 #[must_use]
2989 pub fn with_types(
2990 columns: Vec<String>,
2991 column_types: Vec<grafeo_common::types::LogicalType>,
2992 ) -> Self {
2993 Self {
2994 columns,
2995 column_types,
2996 rows: Vec::new(),
2997 execution_time_ms: None,
2998 rows_scanned: None,
2999 status_message: None,
3000 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
3001 }
3002 }
3003
3004 #[must_use]
3006 pub fn from_rows(columns: Vec<String>, rows: Vec<Vec<grafeo_common::types::Value>>) -> Self {
3007 let len = columns.len();
3008 Self {
3009 columns,
3010 column_types: vec![grafeo_common::types::LogicalType::Any; len],
3011 rows,
3012 execution_time_ms: None,
3013 rows_scanned: None,
3014 status_message: None,
3015 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
3016 }
3017 }
3018
3019 pub fn push_row(&mut self, row: Vec<grafeo_common::types::Value>) {
3021 self.rows.push(row);
3022 }
3023
3024 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
3026 self.execution_time_ms = Some(execution_time_ms);
3027 self.rows_scanned = Some(rows_scanned);
3028 self
3029 }
3030
3031 #[must_use]
3033 pub fn execution_time_ms(&self) -> Option<f64> {
3034 self.execution_time_ms
3035 }
3036
3037 #[must_use]
3039 pub fn rows_scanned(&self) -> Option<u64> {
3040 self.rows_scanned
3041 }
3042
3043 #[must_use]
3045 pub fn row_count(&self) -> usize {
3046 self.rows.len()
3047 }
3048
3049 #[must_use]
3051 pub fn column_count(&self) -> usize {
3052 self.columns.len()
3053 }
3054
3055 #[must_use]
3057 pub fn is_empty(&self) -> bool {
3058 self.rows.is_empty()
3059 }
3060
3061 pub fn scalar<T: FromValue>(&self) -> Result<T> {
3070 if self.rows.len() != 1 || self.columns.len() != 1 {
3071 return Err(grafeo_common::utils::error::Error::InvalidValue(
3072 "Expected single value".to_string(),
3073 ));
3074 }
3075 T::from_value(&self.rows[0][0])
3076 }
3077
3078 #[must_use]
3080 pub fn rows(&self) -> &[Vec<grafeo_common::types::Value>] {
3081 &self.rows
3082 }
3083
3084 #[must_use]
3086 pub fn into_rows(self) -> Vec<Vec<grafeo_common::types::Value>> {
3087 self.rows
3088 }
3089
3090 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
3092 self.rows.iter()
3093 }
3094
3095 #[cfg(feature = "arrow-export")]
3110 pub fn to_record_batch(
3111 &self,
3112 ) -> std::result::Result<arrow_array::RecordBatch, arrow::ArrowExportError> {
3113 arrow::query_result_to_record_batch(&self.columns, &self.column_types, &self.rows)
3114 }
3115
3116 #[cfg(feature = "arrow-export")]
3127 pub fn to_arrow_ipc(&self) -> std::result::Result<Vec<u8>, arrow::ArrowExportError> {
3128 let batch = self.to_record_batch()?;
3129 arrow::record_batch_to_ipc_stream(&batch)
3130 }
3131}
3132
3133impl std::fmt::Display for QueryResult {
3134 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3135 let table = grafeo_common::fmt::format_result_table(
3136 &self.columns,
3137 &self.rows,
3138 self.execution_time_ms,
3139 self.status_message.as_deref(),
3140 );
3141 f.write_str(&table)
3142 }
3143}
3144
/// Conversion from a borrowed database `Value` into a concrete Rust type.
///
/// Used as the bound of `QueryResult::scalar`.
pub trait FromValue: Sized {
    /// Attempts to convert `value` into `Self`.
    ///
    /// # Errors
    ///
    /// The implementations in this file return `Error::TypeMismatch` when
    /// `value` does not hold the expected type.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
3157
3158impl FromValue for i64 {
3159 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
3160 value
3161 .as_int64()
3162 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
3163 expected: "INT64".to_string(),
3164 found: value.type_name().to_string(),
3165 })
3166 }
3167}
3168
3169impl FromValue for f64 {
3170 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
3171 value
3172 .as_float64()
3173 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
3174 expected: "FLOAT64".to_string(),
3175 found: value.type_name().to_string(),
3176 })
3177 }
3178}
3179
3180impl FromValue for String {
3181 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
3182 value.as_str().map(String::from).ok_or_else(|| {
3183 grafeo_common::utils::error::Error::TypeMismatch {
3184 expected: "STRING".to_string(),
3185 found: value.type_name().to_string(),
3186 }
3187 })
3188 }
3189}
3190
3191impl FromValue for bool {
3192 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
3193 value
3194 .as_bool()
3195 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
3196 expected: "BOOL".to_string(),
3197 found: value.type_name().to_string(),
3198 })
3199 }
3200}
3201
3202#[cfg(test)]
3203mod tests {
3204 use super::*;
3205
3206 #[test]
3207 fn test_create_in_memory_database() {
3208 let db = GrafeoDB::new_in_memory();
3209 assert_eq!(db.node_count(), 0);
3210 assert_eq!(db.edge_count(), 0);
3211 }
3212
3213 #[test]
3214 fn test_database_config() {
3215 let config = Config::in_memory().with_threads(4).with_query_logging();
3216
3217 let db = GrafeoDB::with_config(config).unwrap();
3218 assert_eq!(db.config().threads, 4);
3219 assert!(db.config().query_logging);
3220 }
3221
3222 #[test]
3223 fn test_database_session() {
3224 let db = GrafeoDB::new_in_memory();
3225 let _session = db.session();
3226 }
3228
3229 #[cfg(feature = "wal")]
3230 #[test]
3231 fn test_persistent_database_recovery() {
3232 use grafeo_common::types::Value;
3233 use tempfile::tempdir;
3234
3235 let dir = tempdir().unwrap();
3236 let db_path = dir.path().join("test_db");
3237
3238 {
3240 let db = GrafeoDB::open(&db_path).unwrap();
3241
3242 let alix = db.create_node(&["Person"]);
3243 db.set_node_property(alix, "name", Value::from("Alix"));
3244
3245 let gus = db.create_node(&["Person"]);
3246 db.set_node_property(gus, "name", Value::from("Gus"));
3247
3248 let _edge = db.create_edge(alix, gus, "KNOWS");
3249
3250 db.close().unwrap();
3252 }
3253
3254 {
3256 let db = GrafeoDB::open(&db_path).unwrap();
3257
3258 assert_eq!(db.node_count(), 2);
3259 assert_eq!(db.edge_count(), 1);
3260
3261 let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
3263 assert!(node0.is_some());
3264
3265 let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
3266 assert!(node1.is_some());
3267 }
3268 }
3269
3270 #[cfg(feature = "wal")]
3271 #[test]
3272 fn test_wal_logging() {
3273 use tempfile::tempdir;
3274
3275 let dir = tempdir().unwrap();
3276 let db_path = dir.path().join("wal_test_db");
3277
3278 let db = GrafeoDB::open(&db_path).unwrap();
3279
3280 let node = db.create_node(&["Test"]);
3282 db.delete_node(node);
3283
3284 if let Some(wal) = db.wal() {
3286 assert!(wal.record_count() > 0);
3287 }
3288
3289 db.close().unwrap();
3290 }
3291
3292 #[cfg(feature = "wal")]
3293 #[test]
3294 fn test_wal_recovery_multiple_sessions() {
3295 use grafeo_common::types::Value;
3297 use tempfile::tempdir;
3298
3299 let dir = tempdir().unwrap();
3300 let db_path = dir.path().join("multi_session_db");
3301
3302 {
3304 let db = GrafeoDB::open(&db_path).unwrap();
3305 let alix = db.create_node(&["Person"]);
3306 db.set_node_property(alix, "name", Value::from("Alix"));
3307 db.close().unwrap();
3308 }
3309
3310 {
3312 let db = GrafeoDB::open(&db_path).unwrap();
3313 assert_eq!(db.node_count(), 1); let gus = db.create_node(&["Person"]);
3315 db.set_node_property(gus, "name", Value::from("Gus"));
3316 db.close().unwrap();
3317 }
3318
3319 {
3321 let db = GrafeoDB::open(&db_path).unwrap();
3322 assert_eq!(db.node_count(), 2);
3323
3324 let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
3326 assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
3327
3328 let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
3329 assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
3330 }
3331 }
3332
3333 #[cfg(feature = "wal")]
3334 #[test]
3335 fn test_database_consistency_after_mutations() {
3336 use grafeo_common::types::Value;
3338 use tempfile::tempdir;
3339
3340 let dir = tempdir().unwrap();
3341 let db_path = dir.path().join("consistency_db");
3342
3343 {
3344 let db = GrafeoDB::open(&db_path).unwrap();
3345
3346 let a = db.create_node(&["Node"]);
3348 let b = db.create_node(&["Node"]);
3349 let c = db.create_node(&["Node"]);
3350
3351 let e1 = db.create_edge(a, b, "LINKS");
3353 let _e2 = db.create_edge(b, c, "LINKS");
3354
3355 db.delete_edge(e1);
3357 db.delete_node(b);
3358
3359 db.set_node_property(a, "value", Value::Int64(1));
3361 db.set_node_property(c, "value", Value::Int64(3));
3362
3363 db.close().unwrap();
3364 }
3365
3366 {
3368 let db = GrafeoDB::open(&db_path).unwrap();
3369
3370 let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
3374 assert!(node_a.is_some());
3375
3376 let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
3377 assert!(node_c.is_some());
3378
3379 let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
3381 assert!(node_b.is_none());
3382 }
3383 }
3384
3385 #[cfg(feature = "wal")]
3386 #[test]
3387 fn test_close_is_idempotent() {
3388 use tempfile::tempdir;
3390
3391 let dir = tempdir().unwrap();
3392 let db_path = dir.path().join("close_test_db");
3393
3394 let db = GrafeoDB::open(&db_path).unwrap();
3395 db.create_node(&["Test"]);
3396
3397 assert!(db.close().is_ok());
3399
3400 assert!(db.close().is_ok());
3402 }
3403
3404 #[test]
3405 fn test_with_store_external_backend() {
3406 use grafeo_core::graph::lpg::LpgStore;
3407
3408 let external = Arc::new(LpgStore::new().unwrap());
3409
3410 let n1 = external.create_node(&["Person"]);
3412 external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
3413
3414 let db = GrafeoDB::with_store(
3415 Arc::clone(&external) as Arc<dyn GraphStoreMut>,
3416 Config::in_memory(),
3417 )
3418 .unwrap();
3419
3420 let session = db.session();
3421
3422 #[cfg(feature = "gql")]
3424 {
3425 let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
3426 assert_eq!(result.rows.len(), 1);
3427 }
3428 }
3429
3430 #[test]
3431 fn test_with_config_custom_memory_limit() {
3432 let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); let db = GrafeoDB::with_config(config).unwrap();
3435 assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
3436 assert_eq!(db.node_count(), 0);
3437 }
3438
3439 #[cfg(feature = "metrics")]
3440 #[test]
3441 fn test_database_metrics_registry() {
3442 let db = GrafeoDB::new_in_memory();
3443
3444 db.create_node(&["Person"]);
3446 db.create_node(&["Person"]);
3447
3448 let snap = db.metrics();
3450 assert_eq!(snap.query_count, 0); }
3453
3454 #[test]
3455 fn test_query_result_has_metrics() {
3456 let db = GrafeoDB::new_in_memory();
3458 db.create_node(&["Person"]);
3459 db.create_node(&["Person"]);
3460
3461 #[cfg(feature = "gql")]
3462 {
3463 let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
3464
3465 assert!(result.execution_time_ms.is_some());
3467 assert!(result.rows_scanned.is_some());
3468 assert!(result.execution_time_ms.unwrap() >= 0.0);
3469 assert_eq!(result.rows_scanned.unwrap(), 2);
3470 }
3471 }
3472
3473 #[test]
3474 fn test_empty_query_result_metrics() {
3475 let db = GrafeoDB::new_in_memory();
3477 db.create_node(&["Person"]);
3478
3479 #[cfg(feature = "gql")]
3480 {
3481 let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
3483
3484 assert!(result.execution_time_ms.is_some());
3485 assert!(result.rows_scanned.is_some());
3486 assert_eq!(result.rows_scanned.unwrap(), 0);
3487 }
3488 }
3489
3490 #[cfg(feature = "cdc")]
3491 mod cdc_integration {
3492 use super::*;
3493
3494 fn cdc_db() -> GrafeoDB {
3496 GrafeoDB::with_config(Config::in_memory().with_cdc()).unwrap()
3497 }
3498
3499 #[test]
3500 fn test_node_lifecycle_history() {
3501 let db = cdc_db();
3502
3503 let id = db.create_node(&["Person"]);
3505 db.set_node_property(id, "name", "Alix".into());
3507 db.set_node_property(id, "name", "Gus".into());
3508 db.delete_node(id);
3510
3511 let history = db.history(id).unwrap();
3512 assert_eq!(history.len(), 4); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3514 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
3515 assert!(history[1].before.is_none()); assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
3517 assert!(history[2].before.is_some()); assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
3519 }
3520
3521 #[test]
3522 fn test_edge_lifecycle_history() {
3523 let db = cdc_db();
3524
3525 let alix = db.create_node(&["Person"]);
3526 let gus = db.create_node(&["Person"]);
3527 let edge = db.create_edge(alix, gus, "KNOWS");
3528 db.set_edge_property(edge, "since", 2024i64.into());
3529 db.delete_edge(edge);
3530
3531 let history = db.history(edge).unwrap();
3532 assert_eq!(history.len(), 3); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3534 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
3535 assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
3536 }
3537
3538 #[test]
3539 fn test_create_node_with_props_cdc() {
3540 let db = cdc_db();
3541
3542 let id = db.create_node_with_props(
3543 &["Person"],
3544 vec![
3545 ("name", grafeo_common::types::Value::from("Alix")),
3546 ("age", grafeo_common::types::Value::from(30i64)),
3547 ],
3548 );
3549
3550 let history = db.history(id).unwrap();
3551 assert_eq!(history.len(), 1);
3552 assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3553 let after = history[0].after.as_ref().unwrap();
3555 assert_eq!(after.len(), 2);
3556 }
3557
3558 #[test]
3559 fn test_changes_between() {
3560 let db = cdc_db();
3561
3562 let id1 = db.create_node(&["A"]);
3563 let _id2 = db.create_node(&["B"]);
3564 db.set_node_property(id1, "x", 1i64.into());
3565
3566 let changes = db
3568 .changes_between(
3569 grafeo_common::types::EpochId(0),
3570 grafeo_common::types::EpochId(u64::MAX),
3571 )
3572 .unwrap();
3573 assert_eq!(changes.len(), 3); }
3575
3576 #[test]
3577 fn test_cdc_disabled_by_default() {
3578 let db = GrafeoDB::new_in_memory();
3579 assert!(!db.is_cdc_enabled());
3580
3581 let id = db.create_node(&["Person"]);
3582 db.set_node_property(id, "name", "Alix".into());
3583
3584 let history = db.history(id).unwrap();
3585 assert!(history.is_empty(), "CDC off by default: no events recorded");
3586 }
3587
3588 #[test]
3589 fn test_session_with_cdc_override_on() {
3590 let db = GrafeoDB::new_in_memory();
3592 let session = db.session_with_cdc(true);
3593 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3594 let changes = db
3596 .changes_between(
3597 grafeo_common::types::EpochId(0),
3598 grafeo_common::types::EpochId(u64::MAX),
3599 )
3600 .unwrap();
3601 assert!(
3602 !changes.is_empty(),
3603 "session_with_cdc(true) should record events"
3604 );
3605 }
3606
3607 #[test]
3608 fn test_session_with_cdc_override_off() {
3609 let db = cdc_db();
3611 let session = db.session_with_cdc(false);
3612 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3613 let changes = db
3614 .changes_between(
3615 grafeo_common::types::EpochId(0),
3616 grafeo_common::types::EpochId(u64::MAX),
3617 )
3618 .unwrap();
3619 assert!(
3620 changes.is_empty(),
3621 "session_with_cdc(false) should not record events"
3622 );
3623 }
3624
3625 #[test]
3626 fn test_set_cdc_enabled_runtime() {
3627 let db = GrafeoDB::new_in_memory();
3628 assert!(!db.is_cdc_enabled());
3629
3630 db.set_cdc_enabled(true);
3632 assert!(db.is_cdc_enabled());
3633
3634 let id = db.create_node(&["Person"]);
3635 let history = db.history(id).unwrap();
3636 assert_eq!(history.len(), 1, "CDC enabled at runtime records events");
3637
3638 db.set_cdc_enabled(false);
3640 let id2 = db.create_node(&["Person"]);
3641 let history2 = db.history(id2).unwrap();
3642 assert!(
3643 history2.is_empty(),
3644 "CDC disabled at runtime stops recording"
3645 );
3646 }
3647 }
3648
3649 #[test]
3650 fn test_with_store_basic() {
3651 use grafeo_core::graph::lpg::LpgStore;
3652
3653 let store = Arc::new(LpgStore::new().unwrap());
3654 let n1 = store.create_node(&["Person"]);
3655 store.set_node_property(n1, "name", "Alix".into());
3656
3657 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3658 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3659
3660 let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
3661 assert_eq!(result.rows.len(), 1);
3662 }
3663
3664 #[test]
3665 fn test_with_store_session() {
3666 use grafeo_core::graph::lpg::LpgStore;
3667
3668 let store = Arc::new(LpgStore::new().unwrap());
3669 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3670 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3671
3672 let session = db.session();
3673 let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
3674 assert_eq!(result.rows.len(), 1);
3675 }
3676
3677 #[test]
3678 fn test_with_store_mutations() {
3679 use grafeo_core::graph::lpg::LpgStore;
3680
3681 let store = Arc::new(LpgStore::new().unwrap());
3682 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3683 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3684
3685 let mut session = db.session();
3686
3687 session.begin_transaction().unwrap();
3691 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3692
3693 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
3694 assert_eq!(result.rows.len(), 1);
3695
3696 session.commit().unwrap();
3697 }
3698
3699 #[test]
3704 fn test_query_result_empty() {
3705 let result = QueryResult::empty();
3706 assert!(result.is_empty());
3707 assert_eq!(result.row_count(), 0);
3708 assert_eq!(result.column_count(), 0);
3709 assert!(result.execution_time_ms().is_none());
3710 assert!(result.rows_scanned().is_none());
3711 assert!(result.status_message.is_none());
3712 }
3713
3714 #[test]
3715 fn test_query_result_status() {
3716 let result = QueryResult::status("Created node type 'Person'");
3717 assert!(result.is_empty());
3718 assert_eq!(result.column_count(), 0);
3719 assert_eq!(
3720 result.status_message.as_deref(),
3721 Some("Created node type 'Person'")
3722 );
3723 }
3724
3725 #[test]
3726 fn test_query_result_new_with_columns() {
3727 let result = QueryResult::new(vec!["name".into(), "age".into()]);
3728 assert_eq!(result.column_count(), 2);
3729 assert_eq!(result.row_count(), 0);
3730 assert!(result.is_empty());
3731 assert_eq!(
3733 result.column_types,
3734 vec![
3735 grafeo_common::types::LogicalType::Any,
3736 grafeo_common::types::LogicalType::Any
3737 ]
3738 );
3739 }
3740
3741 #[test]
3742 fn test_query_result_with_types() {
3743 use grafeo_common::types::LogicalType;
3744 let result = QueryResult::with_types(
3745 vec!["name".into(), "age".into()],
3746 vec![LogicalType::String, LogicalType::Int64],
3747 );
3748 assert_eq!(result.column_count(), 2);
3749 assert_eq!(result.column_types[0], LogicalType::String);
3750 assert_eq!(result.column_types[1], LogicalType::Int64);
3751 }
3752
3753 #[test]
3754 fn test_query_result_with_metrics() {
3755 let result = QueryResult::new(vec!["x".into()]).with_metrics(42.5, 100);
3756 assert_eq!(result.execution_time_ms(), Some(42.5));
3757 assert_eq!(result.rows_scanned(), Some(100));
3758 }
3759
3760 #[test]
3761 fn test_query_result_scalar_success() {
3762 use grafeo_common::types::Value;
3763 let mut result = QueryResult::new(vec!["count".into()]);
3764 result.rows.push(vec![Value::Int64(42)]);
3765
3766 let val: i64 = result.scalar().unwrap();
3767 assert_eq!(val, 42);
3768 }
3769
3770 #[test]
3771 fn test_query_result_scalar_wrong_shape() {
3772 use grafeo_common::types::Value;
3773 let mut result = QueryResult::new(vec!["x".into()]);
3775 result.rows.push(vec![Value::Int64(1)]);
3776 result.rows.push(vec![Value::Int64(2)]);
3777 assert!(result.scalar::<i64>().is_err());
3778
3779 let mut result2 = QueryResult::new(vec!["a".into(), "b".into()]);
3781 result2.rows.push(vec![Value::Int64(1), Value::Int64(2)]);
3782 assert!(result2.scalar::<i64>().is_err());
3783
3784 let result3 = QueryResult::new(vec!["x".into()]);
3786 assert!(result3.scalar::<i64>().is_err());
3787 }
3788
3789 #[test]
3790 fn test_query_result_iter() {
3791 use grafeo_common::types::Value;
3792 let mut result = QueryResult::new(vec!["x".into()]);
3793 result.rows.push(vec![Value::Int64(1)]);
3794 result.rows.push(vec![Value::Int64(2)]);
3795
3796 let collected: Vec<_> = result.iter().collect();
3797 assert_eq!(collected.len(), 2);
3798 }
3799
3800 #[test]
3801 fn test_query_result_display() {
3802 use grafeo_common::types::Value;
3803 let mut result = QueryResult::new(vec!["name".into()]);
3804 result.rows.push(vec![Value::from("Alix")]);
3805 let display = result.to_string();
3806 assert!(display.contains("name"));
3807 assert!(display.contains("Alix"));
3808 }
3809
3810 #[test]
3815 fn test_from_value_i64_type_mismatch() {
3816 use grafeo_common::types::Value;
3817 let val = Value::from("not a number");
3818 assert!(i64::from_value(&val).is_err());
3819 }
3820
3821 #[test]
3822 fn test_from_value_f64_type_mismatch() {
3823 use grafeo_common::types::Value;
3824 let val = Value::from("not a float");
3825 assert!(f64::from_value(&val).is_err());
3826 }
3827
3828 #[test]
3829 fn test_from_value_string_type_mismatch() {
3830 use grafeo_common::types::Value;
3831 let val = Value::Int64(42);
3832 assert!(String::from_value(&val).is_err());
3833 }
3834
3835 #[test]
3836 fn test_from_value_bool_type_mismatch() {
3837 use grafeo_common::types::Value;
3838 let val = Value::Int64(1);
3839 assert!(bool::from_value(&val).is_err());
3840 }
3841
3842 #[test]
3843 fn test_from_value_all_success() {
3844 use grafeo_common::types::Value;
3845 assert_eq!(i64::from_value(&Value::Int64(99)).unwrap(), 99);
3846 assert!((f64::from_value(&Value::Float64(2.72)).unwrap() - 2.72).abs() < f64::EPSILON);
3847 assert_eq!(String::from_value(&Value::from("hello")).unwrap(), "hello");
3848 assert!(bool::from_value(&Value::Bool(true)).unwrap());
3849 }
3850
3851 #[test]
3856 fn test_database_is_read_only_false_by_default() {
3857 let db = GrafeoDB::new_in_memory();
3858 assert!(!db.is_read_only());
3859 }
3860
3861 #[test]
3862 fn test_database_graph_model() {
3863 let db = GrafeoDB::new_in_memory();
3864 assert_eq!(db.graph_model(), crate::config::GraphModel::Lpg);
3865 }
3866
3867 #[test]
3868 fn test_database_memory_limit_none_by_default() {
3869 let db = GrafeoDB::new_in_memory();
3870 assert!(db.memory_limit().is_none());
3871 }
3872
3873 #[test]
3874 fn test_database_memory_limit_custom() {
3875 let config = Config::in_memory().with_memory_limit(128 * 1024 * 1024);
3876 let db = GrafeoDB::with_config(config).unwrap();
3877 assert_eq!(db.memory_limit(), Some(128 * 1024 * 1024));
3878 }
3879
3880 #[test]
3881 fn test_database_adaptive_config() {
3882 let db = GrafeoDB::new_in_memory();
3883 let adaptive = db.adaptive_config();
3884 assert!(adaptive.enabled);
3885 assert!((adaptive.threshold - 3.0).abs() < f64::EPSILON);
3886 }
3887
3888 #[test]
3889 fn test_database_buffer_manager() {
3890 let db = GrafeoDB::new_in_memory();
3891 let _bm = db.buffer_manager();
3892 }
3894
3895 #[test]
3896 fn test_database_query_cache() {
3897 let db = GrafeoDB::new_in_memory();
3898 let _qc = db.query_cache();
3899 }
3900
3901 #[test]
3902 fn test_database_clear_plan_cache() {
3903 let db = GrafeoDB::new_in_memory();
3904 #[cfg(feature = "gql")]
3906 {
3907 let _ = db.execute("MATCH (n) RETURN count(n)");
3908 }
3909 db.clear_plan_cache();
3910 }
3912
3913 #[test]
3914 fn test_database_gc() {
3915 let db = GrafeoDB::new_in_memory();
3916 db.create_node(&["Person"]);
3917 db.gc();
3918 assert_eq!(db.node_count(), 1);
3920 }
3921
3922 #[test]
3927 fn test_create_and_list_graphs() {
3928 let db = GrafeoDB::new_in_memory();
3929 let created = db.create_graph("social").unwrap();
3930 assert!(created);
3931
3932 let created_again = db.create_graph("social").unwrap();
3934 assert!(!created_again);
3935
3936 let names = db.list_graphs();
3937 assert!(names.contains(&"social".to_string()));
3938 }
3939
3940 #[test]
3941 fn test_drop_graph() {
3942 let db = GrafeoDB::new_in_memory();
3943 db.create_graph("temp").unwrap();
3944 assert!(db.drop_graph("temp"));
3945 assert!(!db.drop_graph("temp")); }
3947
3948 #[test]
3949 fn test_drop_graph_resets_current_graph() {
3950 let db = GrafeoDB::new_in_memory();
3951 db.create_graph("active").unwrap();
3952 db.set_current_graph(Some("active")).unwrap();
3953 assert_eq!(db.current_graph(), Some("active".to_string()));
3954
3955 db.drop_graph("active");
3956 assert_eq!(db.current_graph(), None);
3957 }
3958
3959 #[test]
3964 fn test_current_graph_default_none() {
3965 let db = GrafeoDB::new_in_memory();
3966 assert_eq!(db.current_graph(), None);
3967 }
3968
3969 #[test]
3970 fn test_set_current_graph_valid() {
3971 let db = GrafeoDB::new_in_memory();
3972 db.create_graph("social").unwrap();
3973 db.set_current_graph(Some("social")).unwrap();
3974 assert_eq!(db.current_graph(), Some("social".to_string()));
3975 }
3976
3977 #[test]
3978 fn test_set_current_graph_nonexistent() {
3979 let db = GrafeoDB::new_in_memory();
3980 let result = db.set_current_graph(Some("nonexistent"));
3981 assert!(result.is_err());
3982 }
3983
3984 #[test]
3985 fn test_set_current_graph_none_resets() {
3986 let db = GrafeoDB::new_in_memory();
3987 db.create_graph("social").unwrap();
3988 db.set_current_graph(Some("social")).unwrap();
3989 db.set_current_graph(None).unwrap();
3990 assert_eq!(db.current_graph(), None);
3991 }
3992
3993 #[test]
3994 fn test_set_current_graph_default_keyword() {
3995 let db = GrafeoDB::new_in_memory();
3996 db.set_current_graph(Some("default")).unwrap();
3998 assert_eq!(db.current_graph(), Some("default".to_string()));
3999 }
4000
4001 #[test]
4002 fn test_current_schema_default_none() {
4003 let db = GrafeoDB::new_in_memory();
4004 assert_eq!(db.current_schema(), None);
4005 }
4006
4007 #[test]
4008 fn test_set_current_schema_nonexistent() {
4009 let db = GrafeoDB::new_in_memory();
4010 let result = db.set_current_schema(Some("nonexistent"));
4011 assert!(result.is_err());
4012 }
4013
4014 #[test]
4015 fn test_set_current_schema_none_resets() {
4016 let db = GrafeoDB::new_in_memory();
4017 db.set_current_schema(None).unwrap();
4018 assert_eq!(db.current_schema(), None);
4019 }
4020
4021 #[test]
4026 fn test_graph_store_returns_lpg_by_default() {
4027 let db = GrafeoDB::new_in_memory();
4028 db.create_node(&["Person"]);
4029 let store = db.graph_store();
4030 assert_eq!(store.node_count(), 1);
4031 }
4032
4033 #[test]
4034 fn test_graph_store_mut_returns_some_by_default() {
4035 let db = GrafeoDB::new_in_memory();
4036 assert!(db.graph_store_mut().is_some());
4037 }
4038
4039 #[test]
4040 fn test_with_read_store() {
4041 use grafeo_core::graph::lpg::LpgStore;
4042
4043 let store = Arc::new(LpgStore::new().unwrap());
4044 store.create_node(&["Person"]);
4045
4046 let read_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
4047 let db = GrafeoDB::with_read_store(read_store, Config::in_memory()).unwrap();
4048
4049 assert!(db.is_read_only());
4050 assert!(db.graph_store_mut().is_none());
4051
4052 let gs = db.graph_store();
4054 assert_eq!(gs.node_count(), 1);
4055 }
4056
4057 #[test]
4058 fn test_with_store_graph_store_methods() {
4059 use grafeo_core::graph::lpg::LpgStore;
4060
4061 let store = Arc::new(LpgStore::new().unwrap());
4062 store.create_node(&["Person"]);
4063
4064 let db = GrafeoDB::with_store(
4065 Arc::clone(&store) as Arc<dyn GraphStoreMut>,
4066 Config::in_memory(),
4067 )
4068 .unwrap();
4069
4070 assert!(!db.is_read_only());
4071 assert!(db.graph_store_mut().is_some());
4072 assert_eq!(db.graph_store().node_count(), 1);
4073 }
4074
4075 #[test]
4080 #[allow(deprecated)]
4081 fn test_session_read_only() {
4082 let db = GrafeoDB::new_in_memory();
4083 db.create_node(&["Person"]);
4084
4085 let session = db.session_read_only();
4086 #[cfg(feature = "gql")]
4088 {
4089 let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
4090 assert_eq!(result.rows.len(), 1);
4091 }
4092 }
4093
4094 #[test]
4099 fn test_close_in_memory_database() {
4100 let db = GrafeoDB::new_in_memory();
4101 db.create_node(&["Person"]);
4102 assert!(db.close().is_ok());
4103 assert!(db.close().is_ok());
4105 }
4106
4107 #[test]
4112 fn test_with_config_invalid_config_zero_threads() {
4113 let config = Config::in_memory().with_threads(0);
4114 let result = GrafeoDB::with_config(config);
4115 assert!(result.is_err());
4116 }
4117
4118 #[test]
4119 fn test_with_config_invalid_config_zero_memory_limit() {
4120 let config = Config::in_memory().with_memory_limit(0);
4121 let result = GrafeoDB::with_config(config);
4122 assert!(result.is_err());
4123 }
4124
4125 #[test]
4130 fn test_storage_format_display() {
4131 use crate::config::StorageFormat;
4132 assert_eq!(StorageFormat::Auto.to_string(), "auto");
4133 assert_eq!(StorageFormat::WalDirectory.to_string(), "wal-directory");
4134 assert_eq!(StorageFormat::SingleFile.to_string(), "single-file");
4135 }
4136
4137 #[test]
4138 fn test_storage_format_default() {
4139 use crate::config::StorageFormat;
4140 assert_eq!(StorageFormat::default(), StorageFormat::Auto);
4141 }
4142
4143 #[test]
4144 fn test_config_with_storage_format() {
4145 use crate::config::StorageFormat;
4146 let config = Config::in_memory().with_storage_format(StorageFormat::SingleFile);
4147 assert_eq!(config.storage_format, StorageFormat::SingleFile);
4148 }
4149
4150 #[test]
4155 fn test_config_with_cdc() {
4156 let config = Config::in_memory().with_cdc();
4157 assert!(config.cdc_enabled);
4158 }
4159
4160 #[test]
4161 fn test_config_cdc_default_false() {
4162 let config = Config::in_memory();
4163 assert!(!config.cdc_enabled);
4164 }
4165
4166 #[test]
4171 fn test_config_error_is_error_trait() {
4172 use crate::config::ConfigError;
4173 let err: Box<dyn std::error::Error> = Box::new(ConfigError::ZeroMemoryLimit);
4174 assert!(err.source().is_none());
4175 }
4176
4177 #[cfg(feature = "metrics")]
4182 #[test]
4183 fn test_metrics_prometheus_output() {
4184 let db = GrafeoDB::new_in_memory();
4185 let prom = db.metrics_prometheus();
4186 assert!(!prom.is_empty());
4188 }
4189
4190 #[cfg(feature = "metrics")]
4191 #[test]
4192 fn test_reset_metrics() {
4193 let db = GrafeoDB::new_in_memory();
4194 let _session = db.session();
4196 db.reset_metrics();
4197 let snap = db.metrics();
4198 assert_eq!(snap.query_count, 0);
4199 }
4200
4201 #[test]
4206 fn test_drop_graph_on_external_store() {
4207 use grafeo_core::graph::lpg::LpgStore;
4208
4209 let store = Arc::new(LpgStore::new().unwrap());
4210 let read_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
4211 let db = GrafeoDB::with_read_store(read_store, Config::in_memory()).unwrap();
4212
4213 assert!(!db.drop_graph("anything"));
4215 }
4216}