1#[cfg(feature = "lpg")]
15mod admin;
16#[cfg(feature = "arrow-export")]
17pub mod arrow;
18#[cfg(all(feature = "async-storage", feature = "lpg"))]
19mod async_ops;
20#[cfg(all(feature = "async-storage", feature = "lpg"))]
21pub(crate) mod async_wal_store;
22#[cfg(all(feature = "wal", feature = "grafeo-file"))]
23pub mod backup;
24#[cfg(feature = "lpg")]
25pub(crate) mod catalog_section;
26#[cfg(feature = "cdc")]
27pub(crate) mod cdc_store;
28#[cfg(all(feature = "grafeo-file", feature = "lpg"))]
29mod checkpoint_timer;
30#[cfg(feature = "lpg")]
31mod crud;
32#[cfg(feature = "embed")]
33mod embed;
34#[cfg(feature = "grafeo-file")]
35pub(crate) mod flush;
36#[cfg(feature = "lpg")]
37mod import;
38#[cfg(feature = "lpg")]
39mod index;
40#[cfg(feature = "lpg")]
41mod persistence;
42mod query;
43#[cfg(feature = "triple-store")]
44mod rdf_ops;
45#[cfg(feature = "lpg")]
46mod search;
47pub(crate) mod section_consumer;
48#[cfg(all(feature = "wal", feature = "lpg"))]
49pub(crate) mod wal_store;
50
51use grafeo_common::{grafeo_error, grafeo_warn};
52#[cfg(feature = "wal")]
53use std::path::Path;
54use std::sync::Arc;
55use std::sync::atomic::AtomicUsize;
56
57use parking_lot::RwLock;
58
59use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
60use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind, Result};
61#[cfg(feature = "lpg")]
62use grafeo_core::graph::lpg::LpgStore;
63#[cfg(feature = "triple-store")]
64use grafeo_core::graph::rdf::RdfStore;
65use grafeo_core::graph::{GraphStoreMut, GraphStoreSearch};
66#[cfg(feature = "grafeo-file")]
67use grafeo_storage::file::GrafeoFileManager;
68#[cfg(all(feature = "wal", feature = "lpg"))]
69use grafeo_storage::wal::WalRecovery;
70#[cfg(feature = "wal")]
71use grafeo_storage::wal::{DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord};
72
73use crate::catalog::Catalog;
74use crate::config::Config;
75use crate::query::cache::QueryCache;
76use crate::session::Session;
77use crate::transaction::TransactionManager;
78
79pub struct GrafeoDB {
102 pub(super) config: Config,
104 #[cfg(feature = "lpg")]
106 pub(super) store: Option<Arc<LpgStore>>,
107 pub(super) catalog: Arc<Catalog>,
109 #[cfg(feature = "triple-store")]
111 pub(super) rdf_store: Arc<RdfStore>,
112 pub(super) transaction_manager: Arc<TransactionManager>,
114 pub(super) buffer_manager: Arc<BufferManager>,
116 #[cfg(feature = "wal")]
118 pub(super) wal: Option<Arc<LpgWal>>,
119 #[cfg(feature = "wal")]
123 pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
124 pub(super) query_cache: Arc<QueryCache>,
126 pub(super) commit_counter: Arc<AtomicUsize>,
128 pub(super) is_open: RwLock<bool>,
130 #[cfg(feature = "cdc")]
132 pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
133 #[cfg(feature = "cdc")]
135 cdc_enabled: std::sync::atomic::AtomicBool,
136 #[cfg(feature = "embed")]
138 pub(super) embedding_models:
139 RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
140 #[cfg(feature = "grafeo-file")]
142 pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
143 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
146 checkpoint_timer: parking_lot::Mutex<Option<checkpoint_timer::CheckpointTimer>>,
147 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
150 vector_spill_storages: Option<
151 Arc<
152 parking_lot::RwLock<
153 std::collections::HashMap<String, Arc<grafeo_core::index::vector::MmapStorage>>,
154 >,
155 >,
156 >,
157 pub(super) external_read_store: Option<Arc<dyn GraphStoreSearch>>,
160 pub(super) external_write_store: Option<Arc<dyn GraphStoreMut>>,
163 #[cfg(feature = "metrics")]
165 pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
166 current_graph: RwLock<Option<String>>,
170 current_schema: RwLock<Option<String>>,
174 read_only: bool,
177 projections:
179 Arc<RwLock<std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>>>,
180 #[cfg(all(feature = "compact-store", feature = "lpg"))]
182 layered_store: Option<Arc<grafeo_core::graph::compact::layered::LayeredStore>>,
183}
184
185impl GrafeoDB {
186 #[cfg(feature = "lpg")]
194 fn lpg_store(&self) -> &Arc<LpgStore> {
195 self.store.as_ref().expect(
196 "no built-in LpgStore: this GrafeoDB was created with an external store \
197 (with_store / with_read_store). Use session() or graph_store() instead.",
198 )
199 }
200
201 #[cfg(any(
211 feature = "vector-index",
212 feature = "text-index",
213 feature = "hybrid-search",
214 feature = "embed",
215 ))]
216 fn graph_store_ref(&self) -> &dyn grafeo_core::graph::GraphStore {
217 if let Some(ref ext_read) = self.external_read_store {
218 ext_read.as_ref()
219 } else {
220 #[cfg(feature = "lpg")]
221 {
222 &**self.lpg_store()
223 }
224 #[cfg(not(feature = "lpg"))]
225 unreachable!("no graph store available: enable the `lpg` feature or use with_store()")
226 }
227 }
228
229 #[cfg(feature = "cdc")]
231 #[inline]
232 pub(super) fn cdc_active(&self) -> bool {
233 self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed)
234 }
235
236 #[must_use]
257 pub fn new_in_memory() -> Self {
258 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
259 }
260
261 #[cfg(feature = "wal")]
280 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
281 Self::with_config(Config::persistent(path.as_ref()))
282 }
283
284 #[cfg(feature = "grafeo-file")]
309 pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
310 Self::with_config(Config::read_only(path.as_ref()))
311 }
312
313 pub fn with_config(config: Config) -> Result<Self> {
337 config
339 .validate()
340 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
341
342 #[cfg(feature = "lpg")]
343 let store = Arc::new(LpgStore::new()?);
344 #[cfg(feature = "triple-store")]
345 let rdf_store = Arc::new(RdfStore::new());
346 let transaction_manager = Arc::new(TransactionManager::new());
347
348 let buffer_config = BufferManagerConfig {
350 budget: config.memory_limit.unwrap_or_else(|| {
351 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
353 let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
354 b
355 }),
356 spill_path: config.spill_path.clone().or_else(|| {
357 config.path.as_ref().and_then(|p| {
358 let parent = p.parent()?;
359 let name = p.file_name()?.to_str()?;
360 Some(parent.join(format!("{name}.spill")))
361 })
362 }),
363 ..BufferManagerConfig::default()
364 };
365 let buffer_manager = BufferManager::new(buffer_config);
366
367 let catalog = Arc::new(Catalog::new());
369
370 let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;
371
372 #[cfg(feature = "grafeo-file")]
374 let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
375 if let Some(ref db_path) = config.path {
377 if db_path.exists() && db_path.is_file() {
378 let fm = GrafeoFileManager::open_read_only(db_path)?;
379 #[cfg(feature = "lpg")]
381 if fm.read_section_directory()?.is_some() {
382 Self::load_from_sections(
383 &fm,
384 &store,
385 &catalog,
386 #[cfg(feature = "triple-store")]
387 &rdf_store,
388 )?;
389 } else {
390 let snapshot_data = fm.read_snapshot()?;
392 if !snapshot_data.is_empty() {
393 Self::apply_snapshot_data(
394 &store,
395 &catalog,
396 #[cfg(feature = "triple-store")]
397 &rdf_store,
398 &snapshot_data,
399 )?;
400 }
401 }
402 Some(Arc::new(fm))
403 } else {
404 return Err(grafeo_common::utils::error::Error::Internal(format!(
405 "read-only open requires an existing .grafeo file: {}",
406 db_path.display()
407 )));
408 }
409 } else {
410 return Err(grafeo_common::utils::error::Error::Internal(
411 "read-only mode requires a database path".to_string(),
412 ));
413 }
414 } else if let Some(ref db_path) = config.path {
415 if Self::should_use_single_file(db_path, config.storage_format) {
420 let fm = if db_path.exists() && db_path.is_file() {
421 GrafeoFileManager::open(db_path)?
422 } else if !db_path.exists() {
423 GrafeoFileManager::create(db_path)?
424 } else {
425 return Err(grafeo_common::utils::error::Error::Internal(format!(
427 "path exists but is not a file: {}",
428 db_path.display()
429 )));
430 };
431
432 #[cfg(feature = "lpg")]
434 if fm.read_section_directory()?.is_some() {
435 Self::load_from_sections(
436 &fm,
437 &store,
438 &catalog,
439 #[cfg(feature = "triple-store")]
440 &rdf_store,
441 )?;
442 } else {
443 let snapshot_data = fm.read_snapshot()?;
444 if !snapshot_data.is_empty() {
445 Self::apply_snapshot_data(
446 &store,
447 &catalog,
448 #[cfg(feature = "triple-store")]
449 &rdf_store,
450 &snapshot_data,
451 )?;
452 }
453 }
454
455 #[cfg(all(feature = "wal", feature = "lpg"))]
457 if config.wal_enabled && fm.has_sidecar_wal() {
458 let recovery = WalRecovery::new(fm.sidecar_wal_path());
459 let records = recovery.recover()?;
460 Self::apply_wal_records(
461 &store,
462 &catalog,
463 #[cfg(feature = "triple-store")]
464 &rdf_store,
465 &records,
466 )?;
467 }
468
469 Some(Arc::new(fm))
470 } else {
471 None
472 }
473 } else {
474 None
475 };
476
477 #[cfg(feature = "wal")]
480 let wal = if is_read_only {
481 None
482 } else if config.wal_enabled {
483 if let Some(ref db_path) = config.path {
484 #[cfg(feature = "grafeo-file")]
486 let wal_path = if let Some(ref fm) = file_manager {
487 let p = fm.sidecar_wal_path();
488 std::fs::create_dir_all(&p)?;
489 p
490 } else {
491 std::fs::create_dir_all(db_path)?;
493 db_path.join("wal")
494 };
495
496 #[cfg(not(feature = "grafeo-file"))]
497 let wal_path = {
498 std::fs::create_dir_all(db_path)?;
499 db_path.join("wal")
500 };
501
502 #[cfg(all(feature = "lpg", feature = "grafeo-file"))]
504 let is_single_file = file_manager.is_some();
505 #[cfg(all(feature = "lpg", not(feature = "grafeo-file")))]
506 let is_single_file = false;
507
508 #[cfg(feature = "lpg")]
509 if !is_single_file && wal_path.exists() {
510 let recovery = WalRecovery::new(&wal_path);
511 let records = recovery.recover()?;
512 Self::apply_wal_records(
513 &store,
514 &catalog,
515 #[cfg(feature = "triple-store")]
516 &rdf_store,
517 &records,
518 )?;
519 }
520
521 let wal_durability = match config.wal_durability {
523 crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
524 crate::config::DurabilityMode::Batch {
525 max_delay_ms,
526 max_records,
527 } => WalDurabilityMode::Batch {
528 max_delay_ms,
529 max_records,
530 },
531 crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
532 WalDurabilityMode::Adaptive { target_interval_ms }
533 }
534 crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
535 };
536 let wal_config = WalConfig {
537 durability: wal_durability,
538 ..WalConfig::default()
539 };
540 let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
541 Some(Arc::new(wal_manager))
542 } else {
543 None
544 }
545 } else {
546 None
547 };
548
549 let query_cache = Arc::new(QueryCache::default());
551
552 #[cfg(all(feature = "temporal", feature = "lpg"))]
555 transaction_manager.sync_epoch(store.current_epoch());
556
557 #[cfg(feature = "cdc")]
558 let cdc_enabled_val = config.cdc_enabled;
559 #[cfg(feature = "cdc")]
560 let cdc_retention = config.cdc_retention.clone();
561
562 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
565 let checkpoint_interval = config.checkpoint_interval;
566 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
567 let timer_store = Arc::clone(&store);
568 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
569 let timer_catalog = Arc::clone(&catalog);
570 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
571 let timer_tm = Arc::clone(&transaction_manager);
572 #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "triple-store"))]
573 let timer_rdf = Arc::clone(&rdf_store);
574 #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "wal"))]
575 let timer_wal = wal.clone();
576
577 let mut db = Self {
578 config,
579 #[cfg(feature = "lpg")]
580 store: Some(store),
581 catalog,
582 #[cfg(feature = "triple-store")]
583 rdf_store,
584 transaction_manager,
585 buffer_manager,
586 #[cfg(feature = "wal")]
587 wal,
588 #[cfg(feature = "wal")]
589 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
590 query_cache,
591 commit_counter: Arc::new(AtomicUsize::new(0)),
592 is_open: RwLock::new(true),
593 #[cfg(feature = "cdc")]
594 cdc_log: Arc::new(crate::cdc::CdcLog::with_retention(cdc_retention)),
595 #[cfg(feature = "cdc")]
596 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
597 #[cfg(feature = "embed")]
598 embedding_models: RwLock::new(hashbrown::HashMap::new()),
599 #[cfg(feature = "grafeo-file")]
600 file_manager,
601 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
602 checkpoint_timer: parking_lot::Mutex::new(None),
603 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
604 vector_spill_storages: None,
605 external_read_store: None,
606 external_write_store: None,
607 #[cfg(feature = "metrics")]
608 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
609 current_graph: RwLock::new(None),
610 current_schema: RwLock::new(None),
611 read_only: is_read_only,
612 projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
613 #[cfg(all(feature = "compact-store", feature = "lpg"))]
614 layered_store: None,
615 };
616
617 db.register_section_consumers();
619
620 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
622 if let (Some(interval), Some(fm)) = (checkpoint_interval, &db.file_manager)
623 && !is_read_only
624 {
625 *db.checkpoint_timer.lock() = Some(checkpoint_timer::CheckpointTimer::start(
626 interval,
627 Arc::clone(fm),
628 timer_store,
629 timer_catalog,
630 timer_tm,
631 #[cfg(feature = "triple-store")]
632 timer_rdf,
633 #[cfg(feature = "wal")]
634 timer_wal,
635 ));
636 }
637
638 #[cfg(all(
642 feature = "lpg",
643 feature = "vector-index",
644 feature = "mmap",
645 not(feature = "temporal")
646 ))]
647 db.restore_spill_files();
648
649 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
652 if db
653 .config
654 .section_configs
655 .get(&grafeo_common::storage::SectionType::VectorStore)
656 .is_some_and(|c| c.tier == grafeo_common::storage::TierOverride::ForceDisk)
657 {
658 db.buffer_manager.spill_all();
659 }
660
661 Ok(db)
662 }
663
664 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
693 config
694 .validate()
695 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
696
697 let transaction_manager = Arc::new(TransactionManager::new());
698
699 let buffer_config = BufferManagerConfig {
700 budget: config.memory_limit.unwrap_or_else(|| {
701 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
703 let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
704 b
705 }),
706 spill_path: None,
707 ..BufferManagerConfig::default()
708 };
709 let buffer_manager = BufferManager::new(buffer_config);
710
711 let query_cache = Arc::new(QueryCache::default());
712
713 #[cfg(feature = "cdc")]
714 let cdc_enabled_val = config.cdc_enabled;
715
716 Ok(Self {
717 config,
718 #[cfg(feature = "lpg")]
719 store: None,
720 catalog: Arc::new(Catalog::new()),
721 #[cfg(feature = "triple-store")]
722 rdf_store: Arc::new(RdfStore::new()),
723 transaction_manager,
724 buffer_manager,
725 #[cfg(feature = "wal")]
726 wal: None,
727 #[cfg(feature = "wal")]
728 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
729 query_cache,
730 commit_counter: Arc::new(AtomicUsize::new(0)),
731 is_open: RwLock::new(true),
732 #[cfg(feature = "cdc")]
733 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
734 #[cfg(feature = "cdc")]
735 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
736 #[cfg(feature = "embed")]
737 embedding_models: RwLock::new(hashbrown::HashMap::new()),
738 #[cfg(feature = "grafeo-file")]
739 file_manager: None,
740 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
741 checkpoint_timer: parking_lot::Mutex::new(None),
742 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
743 vector_spill_storages: None,
744 external_read_store: Some(Arc::clone(&store) as Arc<dyn GraphStoreSearch>),
745 external_write_store: Some(store),
746 #[cfg(feature = "metrics")]
747 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
748 current_graph: RwLock::new(None),
749 current_schema: RwLock::new(None),
750 read_only: false,
751 projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
752 #[cfg(all(feature = "compact-store", feature = "lpg"))]
753 layered_store: None,
754 })
755 }
756
757 pub fn with_read_store(store: Arc<dyn GraphStoreSearch>, config: Config) -> Result<Self> {
782 config
783 .validate()
784 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
785
786 let transaction_manager = Arc::new(TransactionManager::new());
787
788 let buffer_config = BufferManagerConfig {
789 budget: config.memory_limit.unwrap_or_else(|| {
790 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
792 let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
793 b
794 }),
795 spill_path: None,
796 ..BufferManagerConfig::default()
797 };
798 let buffer_manager = BufferManager::new(buffer_config);
799
800 let query_cache = Arc::new(QueryCache::default());
801
802 #[cfg(feature = "cdc")]
803 let cdc_enabled_val = config.cdc_enabled;
804
805 Ok(Self {
806 config,
807 #[cfg(feature = "lpg")]
808 store: None,
809 catalog: Arc::new(Catalog::new()),
810 #[cfg(feature = "triple-store")]
811 rdf_store: Arc::new(RdfStore::new()),
812 transaction_manager,
813 buffer_manager,
814 #[cfg(feature = "wal")]
815 wal: None,
816 #[cfg(feature = "wal")]
817 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
818 query_cache,
819 commit_counter: Arc::new(AtomicUsize::new(0)),
820 is_open: RwLock::new(true),
821 #[cfg(feature = "cdc")]
822 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
823 #[cfg(feature = "cdc")]
824 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
825 #[cfg(feature = "embed")]
826 embedding_models: RwLock::new(hashbrown::HashMap::new()),
827 #[cfg(feature = "grafeo-file")]
828 file_manager: None,
829 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
830 checkpoint_timer: parking_lot::Mutex::new(None),
831 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
832 vector_spill_storages: None,
833 external_read_store: Some(store),
834 external_write_store: None,
835 #[cfg(feature = "metrics")]
836 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
837 current_graph: RwLock::new(None),
838 current_schema: RwLock::new(None),
839 read_only: true,
840 projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
841 #[cfg(all(feature = "compact-store", feature = "lpg"))]
842 layered_store: None,
843 })
844 }
845
846 #[cfg(all(feature = "compact-store", feature = "lpg"))]
864 pub fn compact(&mut self) -> Result<()> {
865 use grafeo_core::graph::compact::from_graph_store_preserving_ids;
866 use grafeo_core::graph::compact::layered::LayeredStore;
867
868 let current_store = self.graph_store();
869
870 let max_node_id = if let Some(ref store) = self.store {
872 store.next_node_id().saturating_sub(1)
873 } else {
874 current_store.node_ids().last().map_or(0, |id| id.as_u64())
875 };
876 let max_edge_id = if let Some(ref store) = self.store {
877 store.next_edge_id().saturating_sub(1)
878 } else {
879 let mut max_eid = 0u64;
881 for nid in current_store.node_ids() {
882 for (_, eid) in
883 current_store.edges_from(nid, grafeo_core::graph::Direction::Outgoing)
884 {
885 max_eid = max_eid.max(eid.as_u64());
886 }
887 }
888 max_eid
889 };
890
891 let compact = from_graph_store_preserving_ids(current_store.as_ref())
892 .map_err(|e| Error::Internal(e.to_string()))?;
893
894 let layered = Arc::new(
895 LayeredStore::new(compact, max_node_id, max_edge_id)
896 .map_err(|e| Error::Internal(e.to_string()))?,
897 );
898
899 let current_epoch = self.transaction_manager.current_epoch();
902 layered.overlay_store().sync_epoch(current_epoch);
903
904 if let Some(ref old) = self.store {
908 layered
909 .overlay_store()
910 .install_named_graphs(old.take_named_graphs());
911 }
912
913 self.external_read_store = Some(Arc::clone(&layered) as Arc<dyn GraphStoreSearch>);
914 self.external_write_store = Some(Arc::clone(&layered) as Arc<dyn GraphStoreMut>);
915 self.store = Some(Arc::clone(layered.overlay_store()));
916 self.layered_store = Some(layered);
917 self.read_only = false;
918 self.query_cache = Arc::new(QueryCache::default());
919 self.projections.write().clear();
920
921 Ok(())
922 }
923
924 #[cfg(all(feature = "compact-store", feature = "lpg"))]
935 pub fn recompact(&mut self) -> Result<()> {
936 use grafeo_core::graph::compact::from_graph_store_preserving_ids;
937 use grafeo_core::graph::compact::layered::LayeredStore;
938
939 let layered = self
940 .layered_store
941 .as_ref()
942 .ok_or_else(|| Error::Internal("recompact() requires a prior compact()".into()))?;
943
944 let combined: Arc<dyn GraphStoreSearch> = Arc::clone(layered) as Arc<dyn GraphStoreSearch>;
946
947 let max_node_id = layered.overlay_store().next_node_id().saturating_sub(1);
949 let max_edge_id = layered.overlay_store().next_edge_id().saturating_sub(1);
950
951 let fresh_compact = from_graph_store_preserving_ids(combined.as_ref())
952 .map_err(|e| Error::Internal(e.to_string()))?;
953
954 let new_layered = Arc::new(
955 LayeredStore::new(fresh_compact, max_node_id, max_edge_id)
956 .map_err(|e| Error::Internal(e.to_string()))?,
957 );
958
959 let current_epoch = self.transaction_manager.current_epoch();
961 new_layered.overlay_store().sync_epoch(current_epoch);
962
963 new_layered
965 .overlay_store()
966 .install_named_graphs(layered.overlay_store().take_named_graphs());
967
968 self.external_read_store = Some(Arc::clone(&new_layered) as Arc<dyn GraphStoreSearch>);
969 self.external_write_store = Some(Arc::clone(&new_layered) as Arc<dyn GraphStoreMut>);
970 self.store = Some(Arc::clone(new_layered.overlay_store()));
971 self.layered_store = Some(new_layered);
972 self.query_cache = Arc::new(QueryCache::default());
973
974 Ok(())
975 }
976
977 #[cfg(all(feature = "wal", feature = "lpg"))]
983 fn apply_wal_records(
984 store: &Arc<LpgStore>,
985 catalog: &Catalog,
986 #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
987 records: &[WalRecord],
988 ) -> Result<()> {
989 use crate::catalog::{
990 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
991 };
992 use grafeo_common::utils::error::Error;
993
994 let mut current_graph: Option<String> = None;
997 let mut target_store: Arc<LpgStore> = Arc::clone(store);
998
999 for record in records {
1000 match record {
1001 WalRecord::CreateNamedGraph { name } => {
1003 let _ = store.create_graph(name);
1004 }
1005 WalRecord::DropNamedGraph { name } => {
1006 store.drop_graph(name);
1007 if current_graph.as_deref() == Some(name.as_str()) {
1009 current_graph = None;
1010 target_store = Arc::clone(store);
1011 }
1012 }
1013 WalRecord::SwitchGraph { name } => {
1014 current_graph.clone_from(name);
1015 target_store = match ¤t_graph {
1016 None => Arc::clone(store),
1017 Some(graph_name) => store
1018 .graph_or_create(graph_name)
1019 .map_err(|e| Error::Internal(e.to_string()))?,
1020 };
1021 }
1022
1023 WalRecord::CreateNode { id, labels } => {
1025 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
1026 target_store.create_node_with_id(*id, &label_refs)?;
1027 }
1028 WalRecord::DeleteNode { id } => {
1029 target_store.delete_node(*id);
1030 }
1031 WalRecord::CreateEdge {
1032 id,
1033 src,
1034 dst,
1035 edge_type,
1036 } => {
1037 target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
1038 }
1039 WalRecord::DeleteEdge { id } => {
1040 target_store.delete_edge(*id);
1041 }
1042 WalRecord::SetNodeProperty { id, key, value } => {
1043 target_store.set_node_property(*id, key, value.clone());
1044 }
1045 WalRecord::SetEdgeProperty { id, key, value } => {
1046 target_store.set_edge_property(*id, key, value.clone());
1047 }
1048 WalRecord::AddNodeLabel { id, label } => {
1049 target_store.add_label(*id, label);
1050 }
1051 WalRecord::RemoveNodeLabel { id, label } => {
1052 target_store.remove_label(*id, label);
1053 }
1054 WalRecord::RemoveNodeProperty { id, key } => {
1055 target_store.remove_node_property(*id, key);
1056 }
1057 WalRecord::RemoveEdgeProperty { id, key } => {
1058 target_store.remove_edge_property(*id, key);
1059 }
1060
1061 WalRecord::CreateNodeType {
1063 name,
1064 properties,
1065 constraints,
1066 } => {
1067 let def = NodeTypeDefinition {
1068 name: name.clone(),
1069 properties: properties
1070 .iter()
1071 .map(|(n, t, nullable)| TypedProperty {
1072 name: n.clone(),
1073 data_type: PropertyDataType::from_type_name(t),
1074 nullable: *nullable,
1075 default_value: None,
1076 })
1077 .collect(),
1078 constraints: constraints
1079 .iter()
1080 .map(|(kind, props)| match kind.as_str() {
1081 "unique" => TypeConstraint::Unique(props.clone()),
1082 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
1083 "not_null" if !props.is_empty() => {
1084 TypeConstraint::NotNull(props[0].clone())
1085 }
1086 _ => TypeConstraint::Unique(props.clone()),
1087 })
1088 .collect(),
1089 parent_types: Vec::new(),
1090 };
1091 let _ = catalog.register_node_type(def);
1092 }
1093 WalRecord::DropNodeType { name } => {
1094 let _ = catalog.drop_node_type(name);
1095 }
1096 WalRecord::CreateEdgeType {
1097 name,
1098 properties,
1099 constraints,
1100 } => {
1101 let def = EdgeTypeDefinition {
1102 name: name.clone(),
1103 properties: properties
1104 .iter()
1105 .map(|(n, t, nullable)| TypedProperty {
1106 name: n.clone(),
1107 data_type: PropertyDataType::from_type_name(t),
1108 nullable: *nullable,
1109 default_value: None,
1110 })
1111 .collect(),
1112 constraints: constraints
1113 .iter()
1114 .map(|(kind, props)| match kind.as_str() {
1115 "unique" => TypeConstraint::Unique(props.clone()),
1116 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
1117 "not_null" if !props.is_empty() => {
1118 TypeConstraint::NotNull(props[0].clone())
1119 }
1120 _ => TypeConstraint::Unique(props.clone()),
1121 })
1122 .collect(),
1123 source_node_types: Vec::new(),
1124 target_node_types: Vec::new(),
1125 };
1126 let _ = catalog.register_edge_type_def(def);
1127 }
1128 WalRecord::DropEdgeType { name } => {
1129 let _ = catalog.drop_edge_type_def(name);
1130 }
1131 WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
1132 }
1135 WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
1136 }
1139 WalRecord::CreateGraphType {
1140 name,
1141 node_types,
1142 edge_types,
1143 open,
1144 } => {
1145 use crate::catalog::GraphTypeDefinition;
1146 let def = GraphTypeDefinition {
1147 name: name.clone(),
1148 allowed_node_types: node_types.clone(),
1149 allowed_edge_types: edge_types.clone(),
1150 open: *open,
1151 };
1152 let _ = catalog.register_graph_type(def);
1153 }
1154 WalRecord::DropGraphType { name } => {
1155 let _ = catalog.drop_graph_type(name);
1156 }
1157 WalRecord::CreateSchema { name } => {
1158 let _ = catalog.register_schema_namespace(name.clone());
1159 }
1160 WalRecord::DropSchema { name } => {
1161 let _ = catalog.drop_schema_namespace(name);
1162 }
1163
1164 WalRecord::AlterNodeType { name, alterations } => {
1165 for (action, prop_name, type_name, nullable) in alterations {
1166 match action.as_str() {
1167 "add" => {
1168 let prop = TypedProperty {
1169 name: prop_name.clone(),
1170 data_type: PropertyDataType::from_type_name(type_name),
1171 nullable: *nullable,
1172 default_value: None,
1173 };
1174 let _ = catalog.alter_node_type_add_property(name, prop);
1175 }
1176 "drop" => {
1177 let _ = catalog.alter_node_type_drop_property(name, prop_name);
1178 }
1179 _ => {}
1180 }
1181 }
1182 }
1183 WalRecord::AlterEdgeType { name, alterations } => {
1184 for (action, prop_name, type_name, nullable) in alterations {
1185 match action.as_str() {
1186 "add" => {
1187 let prop = TypedProperty {
1188 name: prop_name.clone(),
1189 data_type: PropertyDataType::from_type_name(type_name),
1190 nullable: *nullable,
1191 default_value: None,
1192 };
1193 let _ = catalog.alter_edge_type_add_property(name, prop);
1194 }
1195 "drop" => {
1196 let _ = catalog.alter_edge_type_drop_property(name, prop_name);
1197 }
1198 _ => {}
1199 }
1200 }
1201 }
1202 WalRecord::AlterGraphType { name, alterations } => {
1203 for (action, type_name) in alterations {
1204 match action.as_str() {
1205 "add_node" => {
1206 let _ =
1207 catalog.alter_graph_type_add_node_type(name, type_name.clone());
1208 }
1209 "drop_node" => {
1210 let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
1211 }
1212 "add_edge" => {
1213 let _ =
1214 catalog.alter_graph_type_add_edge_type(name, type_name.clone());
1215 }
1216 "drop_edge" => {
1217 let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
1218 }
1219 _ => {}
1220 }
1221 }
1222 }
1223
1224 WalRecord::CreateProcedure {
1225 name,
1226 params,
1227 returns,
1228 body,
1229 } => {
1230 use crate::catalog::ProcedureDefinition;
1231 let def = ProcedureDefinition {
1232 name: name.clone(),
1233 params: params.clone(),
1234 returns: returns.clone(),
1235 body: body.clone(),
1236 };
1237 let _ = catalog.register_procedure(def);
1238 }
1239 WalRecord::DropProcedure { name } => {
1240 let _ = catalog.drop_procedure(name);
1241 }
1242
1243 #[cfg(feature = "triple-store")]
1245 WalRecord::InsertRdfTriple { .. }
1246 | WalRecord::DeleteRdfTriple { .. }
1247 | WalRecord::ClearRdfGraph { .. }
1248 | WalRecord::CreateRdfGraph { .. }
1249 | WalRecord::DropRdfGraph { .. } => {
1250 rdf_ops::replay_rdf_wal_record(rdf_store, record);
1251 }
1252 #[cfg(not(feature = "triple-store"))]
1253 WalRecord::InsertRdfTriple { .. }
1254 | WalRecord::DeleteRdfTriple { .. }
1255 | WalRecord::ClearRdfGraph { .. }
1256 | WalRecord::CreateRdfGraph { .. }
1257 | WalRecord::DropRdfGraph { .. } => {}
1258
1259 WalRecord::TransactionCommit { .. } => {
1260 #[cfg(feature = "temporal")]
1264 {
1265 target_store.new_epoch();
1266 }
1267 }
1268 WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
1269 }
1272 WalRecord::EpochAdvance { .. } => {
1273 }
1276 }
1277 }
1278 Ok(())
1279 }
1280
1281 #[cfg(feature = "grafeo-file")]
1287 fn should_use_single_file(
1288 path: &std::path::Path,
1289 configured: crate::config::StorageFormat,
1290 ) -> bool {
1291 use crate::config::StorageFormat;
1292 match configured {
1293 StorageFormat::SingleFile => true,
1294 StorageFormat::WalDirectory => false,
1295 StorageFormat::Auto => {
1296 if path.is_file() {
1298 if let Ok(mut f) = std::fs::File::open(path) {
1299 use std::io::Read;
1300 let mut magic = [0u8; 4];
1301 if f.read_exact(&mut magic).is_ok() && magic == grafeo_storage::file::MAGIC
1302 {
1303 return true;
1304 }
1305 }
1306 return false;
1307 }
1308 if path.is_dir() {
1310 return false;
1311 }
1312 path.extension().is_some_and(|ext| ext == "grafeo")
1314 }
1315 }
1316 }
1317
1318 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1322 fn apply_snapshot_data(
1323 store: &Arc<LpgStore>,
1324 catalog: &Arc<crate::catalog::Catalog>,
1325 #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
1326 data: &[u8],
1327 ) -> Result<()> {
1328 persistence::load_snapshot_into_store(
1330 store,
1331 catalog,
1332 #[cfg(feature = "triple-store")]
1333 rdf_store,
1334 data,
1335 )
1336 }
1337
1338 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1342 fn load_from_sections(
1343 fm: &GrafeoFileManager,
1344 store: &Arc<LpgStore>,
1345 catalog: &Arc<crate::catalog::Catalog>,
1346 #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
1347 ) -> Result<()> {
1348 use grafeo_common::storage::{Section, SectionType};
1349
1350 let dir = fm.read_section_directory()?.ok_or_else(|| {
1351 grafeo_common::utils::error::Error::Internal(
1352 "expected v2 section directory but found none".to_string(),
1353 )
1354 })?;
1355
1356 if let Some(entry) = dir.find(SectionType::Catalog) {
1358 let data = fm.read_section_data(entry)?;
1359 let tm = Arc::new(crate::transaction::TransactionManager::new());
1360 let mut section = catalog_section::CatalogSection::new(
1361 Arc::clone(catalog),
1362 Arc::clone(store),
1363 move || tm.current_epoch().as_u64(),
1364 );
1365 section.deserialize(&data)?;
1366 }
1367
1368 if let Some(entry) = dir.find(SectionType::LpgStore) {
1370 let data = fm.read_section_data(entry)?;
1371 let mut section = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
1372 section.deserialize(&data)?;
1373 }
1374
1375 #[cfg(feature = "triple-store")]
1377 if let Some(entry) = dir.find(SectionType::RdfStore) {
1378 let data = fm.read_section_data(entry)?;
1379 let mut section = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(rdf_store));
1380 section.deserialize(&data)?;
1381 }
1382
1383 #[cfg(feature = "ring-index")]
1385 if let Some(entry) = dir.find(SectionType::RdfRing) {
1386 let data = fm.read_section_data(entry)?;
1387 let mut section = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(rdf_store));
1388 section.deserialize(&data)?;
1389 }
1390
1391 #[cfg(feature = "vector-index")]
1393 if let Some(entry) = dir.find(SectionType::VectorStore) {
1394 let data = fm.read_section_data(entry)?;
1395 let indexes = store.vector_index_entries();
1396 if !indexes.is_empty() {
1397 let mut section = grafeo_core::index::vector::VectorStoreSection::new(indexes);
1398 section.deserialize(&data)?;
1399 }
1400 }
1401
1402 #[cfg(feature = "text-index")]
1404 if let Some(entry) = dir.find(SectionType::TextIndex) {
1405 let data = fm.read_section_data(entry)?;
1406 let indexes = store.text_index_entries();
1407 if !indexes.is_empty() {
1408 let mut section = grafeo_core::index::text::TextIndexSection::new(indexes);
1409 section.deserialize(&data)?;
1410 }
1411 }
1412
1413 Ok(())
1414 }
1415
1416 #[must_use]
1444 pub fn session(&self) -> Session {
1445 self.create_session_inner(None)
1446 }
1447
1448 #[must_use]
1466 pub fn session_with_identity(&self, identity: crate::auth::Identity) -> Session {
1467 let force_read_only = !identity.can_write();
1468 self.create_session_inner_full(None, force_read_only, identity)
1469 }
1470
1471 #[must_use]
1485 pub fn session_with_role(&self, role: crate::auth::Role) -> Session {
1486 self.session_with_identity(crate::auth::Identity::new("anonymous", [role]))
1487 }
1488
1489 #[cfg(feature = "cdc")]
1508 #[must_use]
1509 pub fn session_with_cdc(&self, cdc_enabled: bool) -> Session {
1510 self.create_session_inner(Some(cdc_enabled))
1511 }
1512
1513 #[deprecated(
1522 since = "0.5.36",
1523 note = "use session_with_role(Role::ReadOnly) instead"
1524 )]
1525 #[must_use]
1526 pub fn session_read_only(&self) -> Session {
1527 self.session_with_role(crate::auth::Role::ReadOnly)
1528 }
1529
1530 #[allow(unused_variables)] fn create_session_inner(&self, cdc_override: Option<bool>) -> Session {
1536 self.create_session_inner_full(cdc_override, false, crate::auth::Identity::anonymous())
1537 }
1538
1539 #[allow(unused_variables)]
1541 fn create_session_inner_full(
1542 &self,
1543 cdc_override: Option<bool>,
1544 force_read_only: bool,
1545 identity: crate::auth::Identity,
1546 ) -> Session {
1547 let session_cfg = || crate::session::SessionConfig {
1548 transaction_manager: Arc::clone(&self.transaction_manager),
1549 query_cache: Arc::clone(&self.query_cache),
1550 catalog: Arc::clone(&self.catalog),
1551 adaptive_config: self.config.adaptive.clone(),
1552 factorized_execution: self.config.factorized_execution,
1553 graph_model: self.config.graph_model,
1554 query_timeout: self.config.query_timeout,
1555 max_property_size: self.config.max_property_size,
1556 buffer_manager: Some(Arc::clone(&self.buffer_manager)),
1557 commit_counter: Arc::clone(&self.commit_counter),
1558 gc_interval: self.config.gc_interval,
1559 read_only: self.read_only || force_read_only,
1560 identity: identity.clone(),
1561 #[cfg(feature = "lpg")]
1562 projections: Arc::clone(&self.projections),
1563 };
1564
1565 #[cfg(all(feature = "compact-store", feature = "lpg"))]
1568 if let Some(ref layered) = self.layered_store {
1569 let overlay = Arc::clone(layered.overlay_store());
1570 let layered_arc = Arc::clone(layered);
1571 let mut session = Session::with_adaptive(overlay, session_cfg());
1572 session.override_stores(
1575 Arc::clone(&layered_arc) as Arc<dyn GraphStoreSearch>,
1576 Some(layered_arc as Arc<dyn GraphStoreMut>),
1577 );
1578 return session;
1579 }
1580
1581 if let Some(ref ext_read) = self.external_read_store {
1582 return Session::with_external_store(
1583 Arc::clone(ext_read),
1584 self.external_write_store.as_ref().map(Arc::clone),
1585 session_cfg(),
1586 )
1587 .expect("arena allocation for external store session");
1588 }
1589
1590 #[cfg(all(feature = "lpg", feature = "triple-store"))]
1591 let mut session = Session::with_rdf_store_and_adaptive(
1592 Arc::clone(self.lpg_store()),
1593 Arc::clone(&self.rdf_store),
1594 session_cfg(),
1595 );
1596 #[cfg(all(feature = "lpg", not(feature = "triple-store")))]
1597 let mut session = Session::with_adaptive(Arc::clone(self.lpg_store()), session_cfg());
1598 #[cfg(not(feature = "lpg"))]
1599 let mut session =
1600 Session::with_external_store(self.graph_store(), self.graph_store_mut(), session_cfg())
1601 .expect("session creation for non-lpg build");
1602
1603 #[cfg(all(feature = "wal", feature = "lpg"))]
1604 if let Some(ref wal) = self.wal {
1605 session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
1606 }
1607
1608 #[cfg(feature = "cdc")]
1609 {
1610 let should_enable = cdc_override.unwrap_or_else(|| self.cdc_active());
1611 if should_enable {
1612 session.set_cdc_log(Arc::clone(&self.cdc_log));
1613 }
1614 }
1615
1616 #[cfg(feature = "metrics")]
1617 {
1618 if let Some(ref m) = self.metrics {
1619 session.set_metrics(Arc::clone(m));
1620 m.session_created
1621 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1622 m.session_active
1623 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1624 }
1625 }
1626
1627 if let Some(ref graph) = *self.current_graph.read() {
1629 session.use_graph(graph);
1630 }
1631
1632 if let Some(ref schema) = *self.current_schema.read() {
1634 session.set_schema(schema);
1635 }
1636
1637 let _ = &mut session;
1639
1640 session
1641 }
1642
1643 #[must_use]
1649 pub fn current_graph(&self) -> Option<String> {
1650 self.current_graph.read().clone()
1651 }
1652
1653 pub fn set_current_graph(&self, name: Option<&str>) -> Result<()> {
1662 #[cfg(feature = "lpg")]
1663 if let Some(name) = name
1664 && !name.eq_ignore_ascii_case("default")
1665 && let Some(store) = &self.store
1666 && store.graph(name).is_none()
1667 {
1668 return Err(Error::Query(QueryError::new(
1669 QueryErrorKind::Semantic,
1670 format!("Graph '{name}' does not exist"),
1671 )));
1672 }
1673 *self.current_graph.write() = name.map(ToString::to_string);
1674 Ok(())
1675 }
1676
1677 #[must_use]
1682 pub fn current_schema(&self) -> Option<String> {
1683 self.current_schema.read().clone()
1684 }
1685
1686 pub fn set_current_schema(&self, name: Option<&str>) -> Result<()> {
1695 if let Some(name) = name
1696 && !self.catalog.schema_exists(name)
1697 {
1698 return Err(Error::Query(QueryError::new(
1699 QueryErrorKind::Semantic,
1700 format!("Schema '{name}' does not exist"),
1701 )));
1702 }
1703 *self.current_schema.write() = name.map(ToString::to_string);
1704 Ok(())
1705 }
1706
1707 #[must_use]
1709 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
1710 &self.config.adaptive
1711 }
1712
1713 #[must_use]
1715 pub fn is_read_only(&self) -> bool {
1716 self.read_only
1717 }
1718
1719 #[must_use]
1721 pub fn config(&self) -> &Config {
1722 &self.config
1723 }
1724
1725 #[must_use]
1727 pub fn graph_model(&self) -> crate::config::GraphModel {
1728 self.config.graph_model
1729 }
1730
1731 #[must_use]
1733 pub fn memory_limit(&self) -> Option<usize> {
1734 self.config.memory_limit
1735 }
1736
1737 #[cfg(feature = "metrics")]
1742 #[must_use]
1743 pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1744 let mut snapshot = self
1745 .metrics
1746 .as_ref()
1747 .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1748
1749 let cache_stats = self.query_cache.stats();
1751 snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1752 snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1753 snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1754 snapshot.cache_invalidations = cache_stats.invalidations;
1755
1756 snapshot
1757 }
1758
1759 #[cfg(feature = "metrics")]
1763 #[must_use]
1764 pub fn metrics_prometheus(&self) -> String {
1765 self.metrics
1766 .as_ref()
1767 .map_or_else(String::new, |m| m.to_prometheus())
1768 }
1769
1770 #[cfg(feature = "metrics")]
1772 pub fn reset_metrics(&self) {
1773 if let Some(ref m) = self.metrics {
1774 m.reset();
1775 }
1776 self.query_cache.reset_stats();
1777 }
1778
1779 #[cfg(feature = "lpg")]
1787 #[must_use]
1788 pub fn store(&self) -> &Arc<LpgStore> {
1789 self.lpg_store()
1790 }
1791
1792 #[cfg(feature = "lpg")]
1800 pub fn create_graph(&self, name: &str) -> Result<bool> {
1801 Ok(self.lpg_store().create_graph(name)?)
1802 }
1803
1804 #[cfg(feature = "lpg")]
1809 pub fn drop_graph(&self, name: &str) -> bool {
1810 let Some(store) = &self.store else {
1811 return false;
1812 };
1813 let dropped = store.drop_graph(name);
1814 if dropped {
1815 let mut current = self.current_graph.write();
1816 if current
1817 .as_deref()
1818 .is_some_and(|g| g.eq_ignore_ascii_case(name))
1819 {
1820 *current = None;
1821 }
1822 }
1823 dropped
1824 }
1825
1826 #[cfg(feature = "lpg")]
1828 #[must_use]
1829 pub fn list_graphs(&self) -> Vec<String> {
1830 self.lpg_store().graph_names()
1831 }
1832
1833 pub fn create_projection(
1854 &self,
1855 name: impl Into<String>,
1856 spec: grafeo_core::graph::ProjectionSpec,
1857 ) -> bool {
1858 use grafeo_core::graph::GraphProjection;
1859 use std::collections::hash_map::Entry;
1860
1861 let store = self.graph_store();
1862 let projection = Arc::new(GraphProjection::new(store, spec));
1863 let mut projections = self.projections.write();
1864 match projections.entry(name.into()) {
1865 Entry::Occupied(_) => false,
1866 Entry::Vacant(e) => {
1867 e.insert(projection);
1868 true
1869 }
1870 }
1871 }
1872
1873 pub fn drop_projection(&self, name: &str) -> bool {
1875 self.projections.write().remove(name).is_some()
1876 }
1877
1878 #[must_use]
1880 pub fn list_projections(&self) -> Vec<String> {
1881 self.projections.read().keys().cloned().collect()
1882 }
1883
1884 #[must_use]
1886 pub fn projection(&self, name: &str) -> Option<Arc<dyn GraphStoreSearch>> {
1887 self.projections
1888 .read()
1889 .get(name)
1890 .map(|p| Arc::clone(p) as Arc<dyn GraphStoreSearch>)
1891 }
1892
1893 #[must_use]
1901 pub fn graph_store(&self) -> Arc<dyn GraphStoreSearch> {
1902 if let Some(ref ext_read) = self.external_read_store {
1903 Arc::clone(ext_read)
1904 } else {
1905 #[cfg(feature = "lpg")]
1906 {
1907 Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreSearch>
1908 }
1909 #[cfg(not(feature = "lpg"))]
1910 unreachable!("no graph store available: enable the `lpg` feature or use with_store()")
1911 }
1912 }
1913
1914 #[must_use]
1919 pub fn graph_store_mut(&self) -> Option<Arc<dyn GraphStoreMut>> {
1920 if self.external_read_store.is_some() {
1921 self.external_write_store.as_ref().map(Arc::clone)
1922 } else {
1923 #[cfg(feature = "lpg")]
1924 {
1925 Some(Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreMut>)
1926 }
1927 #[cfg(not(feature = "lpg"))]
1928 {
1929 None
1930 }
1931 }
1932 }
1933
1934 pub fn gc(&self) {
1941 #[cfg(feature = "lpg")]
1942 {
1943 let min_epoch = self.transaction_manager.min_active_epoch();
1944 self.lpg_store().gc_versions(min_epoch);
1945 }
1946 #[cfg(all(feature = "lpg", feature = "cdc"))]
1947 let current_epoch = self.transaction_manager.current_epoch();
1948 self.transaction_manager.gc();
1949
1950 #[cfg(feature = "cdc")]
1952 if self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed) {
1953 #[cfg(feature = "lpg")]
1954 self.cdc_log.apply_retention(current_epoch);
1955 }
1956 }
1957
1958 #[must_use]
1960 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
1961 &self.buffer_manager
1962 }
1963
1964 #[must_use]
1966 pub fn query_cache(&self) -> &Arc<QueryCache> {
1967 &self.query_cache
1968 }
1969
1970 pub fn clear_plan_cache(&self) {
1976 self.query_cache.clear();
1977 }
1978
1979 pub fn close(&self) -> Result<()> {
1993 let mut is_open = self.is_open.write();
1994 if !*is_open {
1995 return Ok(());
1996 }
1997
1998 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
2003 if let Some(mut timer) = self.checkpoint_timer.lock().take() {
2004 timer.stop();
2005 }
2006
2007 if self.read_only {
2009 #[cfg(feature = "grafeo-file")]
2010 if let Some(ref fm) = self.file_manager {
2011 fm.close()?;
2012 }
2013 *is_open = false;
2014 return Ok(());
2015 }
2016
2017 #[cfg(feature = "grafeo-file")]
2021 let is_single_file = self.file_manager.is_some();
2022 #[cfg(not(feature = "grafeo-file"))]
2023 let is_single_file = false;
2024
2025 #[cfg(feature = "grafeo-file")]
2026 if let Some(ref fm) = self.file_manager {
2027 #[cfg(feature = "wal")]
2029 if let Some(ref wal) = self.wal {
2030 wal.sync()?;
2031 }
2032 let flush_result = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;
2033
2034 #[cfg(feature = "wal")]
2040 let flush_result = if flush_result.sections_written == 0 {
2041 if let Some(ref wal) = self.wal {
2042 if wal.record_count() > 0 {
2043 grafeo_warn!(
2044 "WAL has {} records but checkpoint wrote 0 sections; retrying with forced flush",
2045 wal.record_count()
2046 );
2047 self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?
2048 } else {
2049 flush_result
2050 }
2051 } else {
2052 flush_result
2053 }
2054 } else {
2055 flush_result
2056 };
2057
2058 #[cfg(feature = "wal")]
2061 if let Some(ref wal) = self.wal {
2062 wal.close_active_log();
2063 }
2064
2065 #[cfg(feature = "wal")]
2069 let has_wal_records = self.wal.as_ref().is_some_and(|wal| wal.record_count() > 0);
2070 #[cfg(not(feature = "wal"))]
2071 let has_wal_records = false;
2072
2073 if flush_result.sections_written > 0 || !has_wal_records {
2074 {
2075 use grafeo_common::testing::crash::maybe_crash;
2076 maybe_crash("close:before_remove_sidecar_wal");
2077 }
2078 fm.remove_sidecar_wal()?;
2079 } else {
2080 grafeo_warn!(
2081 "keeping sidecar WAL for recovery: checkpoint wrote 0 sections but WAL has records"
2082 );
2083 }
2084 fm.close()?;
2085 }
2086
2087 #[cfg(feature = "wal")]
2093 if !is_single_file && let Some(ref wal) = self.wal {
2094 let commit_tx = self
2096 .transaction_manager
2097 .last_assigned_transaction_id()
2098 .unwrap_or_else(|| self.transaction_manager.begin());
2099
2100 wal.log(&WalRecord::TransactionCommit {
2102 transaction_id: commit_tx,
2103 })?;
2104
2105 wal.sync()?;
2106 }
2107
2108 *is_open = false;
2109 Ok(())
2110 }
2111
2112 #[cfg(feature = "wal")]
2114 #[must_use]
2115 pub fn wal(&self) -> Option<&Arc<LpgWal>> {
2116 self.wal.as_ref()
2117 }
2118
2119 #[cfg(feature = "wal")]
2121 pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
2122 if let Some(ref wal) = self.wal {
2123 wal.log(record)?;
2124 }
2125 Ok(())
2126 }
2127
2128 fn register_section_consumers(&mut self) {
2133 #[cfg(feature = "lpg")]
2135 let store_ref = self.store.as_ref();
2136 #[cfg(feature = "lpg")]
2137 if let Some(store) = store_ref {
2138 let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
2139 self.buffer_manager.register_consumer(Arc::new(
2140 section_consumer::SectionConsumer::new(Arc::new(lpg)),
2141 ));
2142 }
2143
2144 #[cfg(feature = "triple-store")]
2146 if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
2147 let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
2148 self.buffer_manager.register_consumer(Arc::new(
2149 section_consumer::SectionConsumer::new(Arc::new(rdf)),
2150 ));
2151 }
2152
2153 #[cfg(feature = "ring-index")]
2155 if self.rdf_store.ring().is_some() {
2156 let ring = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(&self.rdf_store));
2157 self.buffer_manager.register_consumer(Arc::new(
2158 section_consumer::SectionConsumer::new(Arc::new(ring)),
2159 ));
2160 }
2161
2162 #[cfg(all(
2165 feature = "lpg",
2166 feature = "vector-index",
2167 feature = "mmap",
2168 not(feature = "temporal")
2169 ))]
2170 if let Some(store) = store_ref {
2171 let spill_path = self.buffer_manager.config().spill_path.clone();
2172 let consumer = Arc::new(section_consumer::VectorIndexConsumer::new(
2173 store, spill_path,
2174 ));
2175 self.vector_spill_storages = Some(Arc::clone(consumer.spilled_storages()));
2177 self.buffer_manager.register_consumer(consumer);
2178 }
2179
2180 #[cfg(all(feature = "lpg", feature = "text-index"))]
2182 if let Some(store) = store_ref {
2183 self.buffer_manager
2184 .register_consumer(Arc::new(section_consumer::TextIndexConsumer::new(store)));
2185 }
2186
2187 #[cfg(feature = "cdc")]
2190 self.buffer_manager.register_consumer(
2191 Arc::clone(&self.cdc_log) as Arc<dyn grafeo_common::memory::MemoryConsumer>
2192 );
2193 }
2194
2195 #[cfg(all(
2202 feature = "lpg",
2203 feature = "vector-index",
2204 feature = "mmap",
2205 not(feature = "temporal")
2206 ))]
2207 fn restore_spill_files(&mut self) {
2208 use grafeo_core::index::vector::MmapStorage;
2209
2210 let spill_dir = match self.buffer_manager.config().spill_path {
2211 Some(ref path) => path.clone(),
2212 None => return,
2213 };
2214
2215 if !spill_dir.exists() {
2216 return;
2217 }
2218
2219 let spill_map = match self.vector_spill_storages {
2220 Some(ref map) => Arc::clone(map),
2221 None => return,
2222 };
2223
2224 let Ok(entries) = std::fs::read_dir(&spill_dir) else {
2225 return;
2226 };
2227
2228 let Some(ref store) = self.store else {
2229 return;
2230 };
2231
2232 for entry in entries.flatten() {
2233 let path = entry.path();
2234 let file_name = match path.file_name().and_then(|n| n.to_str()) {
2235 Some(name) => name.to_string(),
2236 None => continue,
2237 };
2238
2239 if !file_name.starts_with("vectors_")
2241 || !std::path::Path::new(&file_name)
2242 .extension()
2243 .is_some_and(|ext| ext.eq_ignore_ascii_case("bin"))
2244 {
2245 continue;
2246 }
2247
2248 let key_part = &file_name["vectors_".len()..file_name.len() - ".bin".len()];
2250
2251 let key = key_part.replace("%3A", ":").replace("%25", "%");
2253
2254 if !key.contains(':') {
2256 continue;
2258 }
2259
2260 if store.get_vector_index_by_key(&key).is_none() {
2262 let _ = std::fs::remove_file(&path);
2264 continue;
2265 }
2266
2267 match MmapStorage::open(&path) {
2269 Ok(mmap_storage) => {
2270 let property = key.split(':').nth(1).unwrap_or("");
2272 let prop_key = grafeo_common::types::PropertyKey::new(property);
2273 store.node_properties_mark_spilled(&prop_key);
2274
2275 spill_map.write().insert(key, Arc::new(mmap_storage));
2276 }
2277 Err(e) => {
2278 eprintln!("failed to restore spill file {}: {e}", path.display());
2279 let _ = std::fs::remove_file(&path);
2281 }
2282 }
2283 }
2284 }
2285
2286 #[cfg(feature = "grafeo-file")]
2288 fn build_sections(&self) -> Vec<Box<dyn grafeo_common::storage::Section>> {
2289 let mut sections: Vec<Box<dyn grafeo_common::storage::Section>> = Vec::new();
2290
2291 #[cfg(all(feature = "compact-store", feature = "lpg"))]
2293 if let Some(ref layered) = self.layered_store {
2294 let compact_section = grafeo_core::graph::compact::section::CompactStoreSection::new(
2296 layered.base_store_arc(),
2297 );
2298 sections.push(Box::new(compact_section));
2299
2300 let overlay = layered.overlay_store();
2302 let overlay_section =
2303 grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(overlay));
2304 sections.push(Box::new(overlay_section));
2305
2306 return sections;
2307 }
2308
2309 #[cfg(feature = "lpg")]
2311 if let Some(store) = self.store.as_ref() {
2312 let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
2313
2314 let catalog = catalog_section::CatalogSection::new(
2315 Arc::clone(&self.catalog),
2316 Arc::clone(store),
2317 {
2318 let tm = Arc::clone(&self.transaction_manager);
2319 move || tm.current_epoch().as_u64()
2320 },
2321 );
2322
2323 sections.push(Box::new(catalog));
2324 sections.push(Box::new(lpg));
2325
2326 #[cfg(feature = "vector-index")]
2328 {
2329 let indexes = store.vector_index_entries();
2330 if !indexes.is_empty() {
2331 let vector = grafeo_core::index::vector::VectorStoreSection::new(indexes);
2332 sections.push(Box::new(vector));
2333 }
2334 }
2335
2336 #[cfg(feature = "text-index")]
2338 {
2339 let indexes = store.text_index_entries();
2340 if !indexes.is_empty() {
2341 let text = grafeo_core::index::text::TextIndexSection::new(indexes);
2342 sections.push(Box::new(text));
2343 }
2344 }
2345 }
2346
2347 #[cfg(feature = "triple-store")]
2348 if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
2349 let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
2350 sections.push(Box::new(rdf));
2351 }
2352
2353 #[cfg(feature = "ring-index")]
2354 if self.rdf_store.ring().is_some() {
2355 let ring = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(&self.rdf_store));
2356 sections.push(Box::new(ring));
2357 }
2358
2359 sections
2360 }
2361
2362 #[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
2376 pub fn backup_full(&self, backup_dir: &std::path::Path) -> Result<backup::BackupSegment> {
2377 let fm = self
2378 .file_manager
2379 .as_ref()
2380 .ok_or_else(|| Error::Internal("backup requires a persistent database".to_string()))?;
2381
2382 if !self.read_only {
2386 let _ = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;
2387 }
2388
2389 let current_epoch = self.transaction_manager.current_epoch();
2390 backup::do_backup_full(backup_dir, fm, self.wal.as_deref(), current_epoch)
2391 }
2392
2393 #[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
2401 pub fn backup_incremental(
2402 &self,
2403 backup_dir: &std::path::Path,
2404 ) -> Result<backup::BackupSegment> {
2405 let wal = self
2406 .wal
2407 .as_ref()
2408 .ok_or_else(|| Error::Internal("incremental backup requires WAL".to_string()))?;
2409
2410 let current_epoch = self.transaction_manager.current_epoch();
2411 backup::do_backup_incremental(backup_dir, wal, current_epoch)
2412 }
2413
2414 #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2420 pub fn read_backup_manifest(
2421 backup_dir: &std::path::Path,
2422 ) -> Result<Option<backup::BackupManifest>> {
2423 backup::read_manifest(backup_dir)
2424 }
2425
2426 #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2428 #[must_use]
2429 pub fn backup_cursor(&self) -> Option<backup::BackupCursor> {
2430 self.wal
2431 .as_ref()
2432 .and_then(|wal| backup::read_backup_cursor(wal.dir()).ok().flatten())
2433 }
2434
2435 #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2446 pub fn restore_to_epoch(
2447 backup_dir: &std::path::Path,
2448 target_epoch: grafeo_common::types::EpochId,
2449 output_path: &std::path::Path,
2450 ) -> Result<()> {
2451 backup::do_restore_to_epoch(backup_dir, target_epoch, output_path)
2452 }
2453
2454 #[cfg(feature = "grafeo-file")]
2460 fn checkpoint_to_file(
2461 &self,
2462 fm: &GrafeoFileManager,
2463 reason: flush::FlushReason,
2464 ) -> Result<flush::FlushResult> {
2465 let sections = self.build_sections();
2466 let section_refs: Vec<&dyn grafeo_common::storage::Section> =
2467 sections.iter().map(|s| s.as_ref()).collect();
2468 #[cfg(feature = "lpg")]
2469 let context = flush::build_context(self.lpg_store(), &self.transaction_manager);
2470 #[cfg(not(feature = "lpg"))]
2471 let context = flush::build_context_minimal(&self.transaction_manager);
2472
2473 flush::flush(
2474 fm,
2475 §ion_refs,
2476 &context,
2477 reason,
2478 #[cfg(feature = "wal")]
2479 self.wal.as_deref(),
2480 )
2481 }
2482
2483 #[cfg(feature = "grafeo-file")]
2485 #[must_use]
2486 pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
2487 self.file_manager.as_ref()
2488 }
2489}
2490
2491impl Drop for GrafeoDB {
2492 fn drop(&mut self) {
2493 if let Err(e) = self.close() {
2494 grafeo_error!("Error closing database: {}", e);
2495 }
2496 }
2497}
2498
2499#[cfg(feature = "lpg")]
2500impl crate::admin::AdminService for GrafeoDB {
2501 fn info(&self) -> crate::admin::DatabaseInfo {
2502 self.info()
2503 }
2504
2505 fn detailed_stats(&self) -> crate::admin::DatabaseStats {
2506 self.detailed_stats()
2507 }
2508
2509 fn schema(&self) -> crate::admin::SchemaInfo {
2510 self.schema()
2511 }
2512
2513 fn validate(&self) -> crate::admin::ValidationResult {
2514 self.validate()
2515 }
2516
2517 fn wal_status(&self) -> crate::admin::WalStatus {
2518 self.wal_status()
2519 }
2520
2521 fn wal_checkpoint(&self) -> Result<()> {
2522 self.wal_checkpoint()
2523 }
2524}
2525
2526#[derive(Debug)]
2556pub struct QueryResult {
2557 pub columns: Vec<String>,
2559 pub column_types: Vec<grafeo_common::types::LogicalType>,
2561 pub(crate) rows: Vec<Vec<grafeo_common::types::Value>>,
2566 pub execution_time_ms: Option<f64>,
2568 pub rows_scanned: Option<u64>,
2570 pub status_message: Option<String>,
2572 pub gql_status: grafeo_common::utils::GqlStatus,
2574}
2575
2576impl QueryResult {
2577 #[must_use]
2579 pub fn empty() -> Self {
2580 Self {
2581 columns: Vec::new(),
2582 column_types: Vec::new(),
2583 rows: Vec::new(),
2584 execution_time_ms: None,
2585 rows_scanned: None,
2586 status_message: None,
2587 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2588 }
2589 }
2590
2591 #[must_use]
2593 pub fn status(msg: impl Into<String>) -> Self {
2594 Self {
2595 columns: Vec::new(),
2596 column_types: Vec::new(),
2597 rows: Vec::new(),
2598 execution_time_ms: None,
2599 rows_scanned: None,
2600 status_message: Some(msg.into()),
2601 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2602 }
2603 }
2604
2605 #[must_use]
2607 pub fn new(columns: Vec<String>) -> Self {
2608 let len = columns.len();
2609 Self {
2610 columns,
2611 column_types: vec![grafeo_common::types::LogicalType::Any; len],
2612 rows: Vec::new(),
2613 execution_time_ms: None,
2614 rows_scanned: None,
2615 status_message: None,
2616 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2617 }
2618 }
2619
2620 #[must_use]
2622 pub fn with_types(
2623 columns: Vec<String>,
2624 column_types: Vec<grafeo_common::types::LogicalType>,
2625 ) -> Self {
2626 Self {
2627 columns,
2628 column_types,
2629 rows: Vec::new(),
2630 execution_time_ms: None,
2631 rows_scanned: None,
2632 status_message: None,
2633 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2634 }
2635 }
2636
2637 #[must_use]
2639 pub fn from_rows(columns: Vec<String>, rows: Vec<Vec<grafeo_common::types::Value>>) -> Self {
2640 let len = columns.len();
2641 Self {
2642 columns,
2643 column_types: vec![grafeo_common::types::LogicalType::Any; len],
2644 rows,
2645 execution_time_ms: None,
2646 rows_scanned: None,
2647 status_message: None,
2648 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2649 }
2650 }
2651
2652 pub fn push_row(&mut self, row: Vec<grafeo_common::types::Value>) {
2654 self.rows.push(row);
2655 }
2656
2657 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
2659 self.execution_time_ms = Some(execution_time_ms);
2660 self.rows_scanned = Some(rows_scanned);
2661 self
2662 }
2663
2664 #[must_use]
2666 pub fn execution_time_ms(&self) -> Option<f64> {
2667 self.execution_time_ms
2668 }
2669
2670 #[must_use]
2672 pub fn rows_scanned(&self) -> Option<u64> {
2673 self.rows_scanned
2674 }
2675
2676 #[must_use]
2678 pub fn row_count(&self) -> usize {
2679 self.rows.len()
2680 }
2681
2682 #[must_use]
2684 pub fn column_count(&self) -> usize {
2685 self.columns.len()
2686 }
2687
2688 #[must_use]
2690 pub fn is_empty(&self) -> bool {
2691 self.rows.is_empty()
2692 }
2693
2694 pub fn scalar<T: FromValue>(&self) -> Result<T> {
2703 if self.rows.len() != 1 || self.columns.len() != 1 {
2704 return Err(grafeo_common::utils::error::Error::InvalidValue(
2705 "Expected single value".to_string(),
2706 ));
2707 }
2708 T::from_value(&self.rows[0][0])
2709 }
2710
2711 #[must_use]
2713 pub fn rows(&self) -> &[Vec<grafeo_common::types::Value>] {
2714 &self.rows
2715 }
2716
2717 #[must_use]
2719 pub fn into_rows(self) -> Vec<Vec<grafeo_common::types::Value>> {
2720 self.rows
2721 }
2722
2723 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
2725 self.rows.iter()
2726 }
2727
2728 #[cfg(feature = "arrow-export")]
2743 pub fn to_record_batch(
2744 &self,
2745 ) -> std::result::Result<arrow_array::RecordBatch, arrow::ArrowExportError> {
2746 arrow::query_result_to_record_batch(&self.columns, &self.column_types, &self.rows)
2747 }
2748
2749 #[cfg(feature = "arrow-export")]
2760 pub fn to_arrow_ipc(&self) -> std::result::Result<Vec<u8>, arrow::ArrowExportError> {
2761 let batch = self.to_record_batch()?;
2762 arrow::record_batch_to_ipc_stream(&batch)
2763 }
2764}
2765
2766impl std::fmt::Display for QueryResult {
2767 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2768 let table = grafeo_common::fmt::format_result_table(
2769 &self.columns,
2770 &self.rows,
2771 self.execution_time_ms,
2772 self.status_message.as_deref(),
2773 );
2774 f.write_str(&table)
2775 }
2776}
2777
2778pub trait FromValue: Sized {
2783 fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
2789}
2790
2791impl FromValue for i64 {
2792 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2793 value
2794 .as_int64()
2795 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2796 expected: "INT64".to_string(),
2797 found: value.type_name().to_string(),
2798 })
2799 }
2800}
2801
2802impl FromValue for f64 {
2803 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2804 value
2805 .as_float64()
2806 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2807 expected: "FLOAT64".to_string(),
2808 found: value.type_name().to_string(),
2809 })
2810 }
2811}
2812
2813impl FromValue for String {
2814 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2815 value.as_str().map(String::from).ok_or_else(|| {
2816 grafeo_common::utils::error::Error::TypeMismatch {
2817 expected: "STRING".to_string(),
2818 found: value.type_name().to_string(),
2819 }
2820 })
2821 }
2822}
2823
2824impl FromValue for bool {
2825 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2826 value
2827 .as_bool()
2828 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2829 expected: "BOOL".to_string(),
2830 found: value.type_name().to_string(),
2831 })
2832 }
2833}
2834
2835#[cfg(test)]
2836mod tests {
2837 use super::*;
2838
2839 #[test]
2840 fn test_create_in_memory_database() {
2841 let db = GrafeoDB::new_in_memory();
2842 assert_eq!(db.node_count(), 0);
2843 assert_eq!(db.edge_count(), 0);
2844 }
2845
2846 #[test]
2847 fn test_database_config() {
2848 let config = Config::in_memory().with_threads(4).with_query_logging();
2849
2850 let db = GrafeoDB::with_config(config).unwrap();
2851 assert_eq!(db.config().threads, 4);
2852 assert!(db.config().query_logging);
2853 }
2854
2855 #[test]
2856 fn test_database_session() {
2857 let db = GrafeoDB::new_in_memory();
2858 let _session = db.session();
2859 }
2861
2862 #[cfg(feature = "wal")]
2863 #[test]
2864 fn test_persistent_database_recovery() {
2865 use grafeo_common::types::Value;
2866 use tempfile::tempdir;
2867
2868 let dir = tempdir().unwrap();
2869 let db_path = dir.path().join("test_db");
2870
2871 {
2873 let db = GrafeoDB::open(&db_path).unwrap();
2874
2875 let alix = db.create_node(&["Person"]);
2876 db.set_node_property(alix, "name", Value::from("Alix"));
2877
2878 let gus = db.create_node(&["Person"]);
2879 db.set_node_property(gus, "name", Value::from("Gus"));
2880
2881 let _edge = db.create_edge(alix, gus, "KNOWS");
2882
2883 db.close().unwrap();
2885 }
2886
2887 {
2889 let db = GrafeoDB::open(&db_path).unwrap();
2890
2891 assert_eq!(db.node_count(), 2);
2892 assert_eq!(db.edge_count(), 1);
2893
2894 let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
2896 assert!(node0.is_some());
2897
2898 let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
2899 assert!(node1.is_some());
2900 }
2901 }
2902
2903 #[cfg(feature = "wal")]
2904 #[test]
2905 fn test_wal_logging() {
2906 use tempfile::tempdir;
2907
2908 let dir = tempdir().unwrap();
2909 let db_path = dir.path().join("wal_test_db");
2910
2911 let db = GrafeoDB::open(&db_path).unwrap();
2912
2913 let node = db.create_node(&["Test"]);
2915 db.delete_node(node);
2916
2917 if let Some(wal) = db.wal() {
2919 assert!(wal.record_count() > 0);
2920 }
2921
2922 db.close().unwrap();
2923 }
2924
2925 #[cfg(feature = "wal")]
2926 #[test]
2927 fn test_wal_recovery_multiple_sessions() {
2928 use grafeo_common::types::Value;
2930 use tempfile::tempdir;
2931
2932 let dir = tempdir().unwrap();
2933 let db_path = dir.path().join("multi_session_db");
2934
2935 {
2937 let db = GrafeoDB::open(&db_path).unwrap();
2938 let alix = db.create_node(&["Person"]);
2939 db.set_node_property(alix, "name", Value::from("Alix"));
2940 db.close().unwrap();
2941 }
2942
2943 {
2945 let db = GrafeoDB::open(&db_path).unwrap();
2946 assert_eq!(db.node_count(), 1); let gus = db.create_node(&["Person"]);
2948 db.set_node_property(gus, "name", Value::from("Gus"));
2949 db.close().unwrap();
2950 }
2951
2952 {
2954 let db = GrafeoDB::open(&db_path).unwrap();
2955 assert_eq!(db.node_count(), 2);
2956
2957 let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
2959 assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
2960
2961 let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
2962 assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
2963 }
2964 }
2965
2966 #[cfg(feature = "wal")]
2967 #[test]
2968 fn test_database_consistency_after_mutations() {
2969 use grafeo_common::types::Value;
2971 use tempfile::tempdir;
2972
2973 let dir = tempdir().unwrap();
2974 let db_path = dir.path().join("consistency_db");
2975
2976 {
2977 let db = GrafeoDB::open(&db_path).unwrap();
2978
2979 let a = db.create_node(&["Node"]);
2981 let b = db.create_node(&["Node"]);
2982 let c = db.create_node(&["Node"]);
2983
2984 let e1 = db.create_edge(a, b, "LINKS");
2986 let _e2 = db.create_edge(b, c, "LINKS");
2987
2988 db.delete_edge(e1);
2990 db.delete_node(b);
2991
2992 db.set_node_property(a, "value", Value::Int64(1));
2994 db.set_node_property(c, "value", Value::Int64(3));
2995
2996 db.close().unwrap();
2997 }
2998
2999 {
3001 let db = GrafeoDB::open(&db_path).unwrap();
3002
3003 let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
3007 assert!(node_a.is_some());
3008
3009 let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
3010 assert!(node_c.is_some());
3011
3012 let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
3014 assert!(node_b.is_none());
3015 }
3016 }
3017
3018 #[cfg(feature = "wal")]
3019 #[test]
3020 fn test_close_is_idempotent() {
3021 use tempfile::tempdir;
3023
3024 let dir = tempdir().unwrap();
3025 let db_path = dir.path().join("close_test_db");
3026
3027 let db = GrafeoDB::open(&db_path).unwrap();
3028 db.create_node(&["Test"]);
3029
3030 assert!(db.close().is_ok());
3032
3033 assert!(db.close().is_ok());
3035 }
3036
3037 #[test]
3038 fn test_with_store_external_backend() {
3039 use grafeo_core::graph::lpg::LpgStore;
3040
3041 let external = Arc::new(LpgStore::new().unwrap());
3042
3043 let n1 = external.create_node(&["Person"]);
3045 external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
3046
3047 let db = GrafeoDB::with_store(
3048 Arc::clone(&external) as Arc<dyn GraphStoreMut>,
3049 Config::in_memory(),
3050 )
3051 .unwrap();
3052
3053 let session = db.session();
3054
3055 #[cfg(feature = "gql")]
3057 {
3058 let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
3059 assert_eq!(result.rows.len(), 1);
3060 }
3061 }
3062
3063 #[test]
3064 fn test_with_config_custom_memory_limit() {
3065 let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); let db = GrafeoDB::with_config(config).unwrap();
3068 assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
3069 assert_eq!(db.node_count(), 0);
3070 }
3071
3072 #[cfg(feature = "metrics")]
3073 #[test]
3074 fn test_database_metrics_registry() {
3075 let db = GrafeoDB::new_in_memory();
3076
3077 db.create_node(&["Person"]);
3079 db.create_node(&["Person"]);
3080
3081 let snap = db.metrics();
3083 assert_eq!(snap.query_count, 0); }
3086
3087 #[test]
3088 fn test_query_result_has_metrics() {
3089 let db = GrafeoDB::new_in_memory();
3091 db.create_node(&["Person"]);
3092 db.create_node(&["Person"]);
3093
3094 #[cfg(feature = "gql")]
3095 {
3096 let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
3097
3098 assert!(result.execution_time_ms.is_some());
3100 assert!(result.rows_scanned.is_some());
3101 assert!(result.execution_time_ms.unwrap() >= 0.0);
3102 assert_eq!(result.rows_scanned.unwrap(), 2);
3103 }
3104 }
3105
3106 #[test]
3107 fn test_empty_query_result_metrics() {
3108 let db = GrafeoDB::new_in_memory();
3110 db.create_node(&["Person"]);
3111
3112 #[cfg(feature = "gql")]
3113 {
3114 let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
3116
3117 assert!(result.execution_time_ms.is_some());
3118 assert!(result.rows_scanned.is_some());
3119 assert_eq!(result.rows_scanned.unwrap(), 0);
3120 }
3121 }
3122
3123 #[cfg(feature = "cdc")]
3124 mod cdc_integration {
3125 use super::*;
3126
3127 fn cdc_db() -> GrafeoDB {
3129 GrafeoDB::with_config(Config::in_memory().with_cdc()).unwrap()
3130 }
3131
3132 #[test]
3133 fn test_node_lifecycle_history() {
3134 let db = cdc_db();
3135
3136 let id = db.create_node(&["Person"]);
3138 db.set_node_property(id, "name", "Alix".into());
3140 db.set_node_property(id, "name", "Gus".into());
3141 db.delete_node(id);
3143
3144 let history = db.history(id).unwrap();
3145 assert_eq!(history.len(), 4); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3147 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
3148 assert!(history[1].before.is_none()); assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
3150 assert!(history[2].before.is_some()); assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
3152 }
3153
3154 #[test]
3155 fn test_edge_lifecycle_history() {
3156 let db = cdc_db();
3157
3158 let alix = db.create_node(&["Person"]);
3159 let gus = db.create_node(&["Person"]);
3160 let edge = db.create_edge(alix, gus, "KNOWS");
3161 db.set_edge_property(edge, "since", 2024i64.into());
3162 db.delete_edge(edge);
3163
3164 let history = db.history(edge).unwrap();
3165 assert_eq!(history.len(), 3); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3167 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
3168 assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
3169 }
3170
3171 #[test]
3172 fn test_create_node_with_props_cdc() {
3173 let db = cdc_db();
3174
3175 let id = db.create_node_with_props(
3176 &["Person"],
3177 vec![
3178 ("name", grafeo_common::types::Value::from("Alix")),
3179 ("age", grafeo_common::types::Value::from(30i64)),
3180 ],
3181 );
3182
3183 let history = db.history(id).unwrap();
3184 assert_eq!(history.len(), 1);
3185 assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3186 let after = history[0].after.as_ref().unwrap();
3188 assert_eq!(after.len(), 2);
3189 }
3190
3191 #[test]
3192 fn test_changes_between() {
3193 let db = cdc_db();
3194
3195 let id1 = db.create_node(&["A"]);
3196 let _id2 = db.create_node(&["B"]);
3197 db.set_node_property(id1, "x", 1i64.into());
3198
3199 let changes = db
3201 .changes_between(
3202 grafeo_common::types::EpochId(0),
3203 grafeo_common::types::EpochId(u64::MAX),
3204 )
3205 .unwrap();
3206 assert_eq!(changes.len(), 3); }
3208
3209 #[test]
3210 fn test_cdc_disabled_by_default() {
3211 let db = GrafeoDB::new_in_memory();
3212 assert!(!db.is_cdc_enabled());
3213
3214 let id = db.create_node(&["Person"]);
3215 db.set_node_property(id, "name", "Alix".into());
3216
3217 let history = db.history(id).unwrap();
3218 assert!(history.is_empty(), "CDC off by default: no events recorded");
3219 }
3220
3221 #[test]
3222 fn test_session_with_cdc_override_on() {
3223 let db = GrafeoDB::new_in_memory();
3225 let session = db.session_with_cdc(true);
3226 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3227 let changes = db
3229 .changes_between(
3230 grafeo_common::types::EpochId(0),
3231 grafeo_common::types::EpochId(u64::MAX),
3232 )
3233 .unwrap();
3234 assert!(
3235 !changes.is_empty(),
3236 "session_with_cdc(true) should record events"
3237 );
3238 }
3239
3240 #[test]
3241 fn test_session_with_cdc_override_off() {
3242 let db = cdc_db();
3244 let session = db.session_with_cdc(false);
3245 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3246 let changes = db
3247 .changes_between(
3248 grafeo_common::types::EpochId(0),
3249 grafeo_common::types::EpochId(u64::MAX),
3250 )
3251 .unwrap();
3252 assert!(
3253 changes.is_empty(),
3254 "session_with_cdc(false) should not record events"
3255 );
3256 }
3257
3258 #[test]
3259 fn test_set_cdc_enabled_runtime() {
3260 let db = GrafeoDB::new_in_memory();
3261 assert!(!db.is_cdc_enabled());
3262
3263 db.set_cdc_enabled(true);
3265 assert!(db.is_cdc_enabled());
3266
3267 let id = db.create_node(&["Person"]);
3268 let history = db.history(id).unwrap();
3269 assert_eq!(history.len(), 1, "CDC enabled at runtime records events");
3270
3271 db.set_cdc_enabled(false);
3273 let id2 = db.create_node(&["Person"]);
3274 let history2 = db.history(id2).unwrap();
3275 assert!(
3276 history2.is_empty(),
3277 "CDC disabled at runtime stops recording"
3278 );
3279 }
3280 }
3281
3282 #[test]
3283 fn test_with_store_basic() {
3284 use grafeo_core::graph::lpg::LpgStore;
3285
3286 let store = Arc::new(LpgStore::new().unwrap());
3287 let n1 = store.create_node(&["Person"]);
3288 store.set_node_property(n1, "name", "Alix".into());
3289
3290 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3291 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3292
3293 let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
3294 assert_eq!(result.rows.len(), 1);
3295 }
3296
3297 #[test]
3298 fn test_with_store_session() {
3299 use grafeo_core::graph::lpg::LpgStore;
3300
3301 let store = Arc::new(LpgStore::new().unwrap());
3302 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3303 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3304
3305 let session = db.session();
3306 let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
3307 assert_eq!(result.rows.len(), 1);
3308 }
3309
3310 #[test]
3311 fn test_with_store_mutations() {
3312 use grafeo_core::graph::lpg::LpgStore;
3313
3314 let store = Arc::new(LpgStore::new().unwrap());
3315 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3316 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3317
3318 let mut session = db.session();
3319
3320 session.begin_transaction().unwrap();
3324 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3325
3326 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
3327 assert_eq!(result.rows.len(), 1);
3328
3329 session.commit().unwrap();
3330 }
3331
3332 #[test]
3337 fn test_query_result_empty() {
3338 let result = QueryResult::empty();
3339 assert!(result.is_empty());
3340 assert_eq!(result.row_count(), 0);
3341 assert_eq!(result.column_count(), 0);
3342 assert!(result.execution_time_ms().is_none());
3343 assert!(result.rows_scanned().is_none());
3344 assert!(result.status_message.is_none());
3345 }
3346
3347 #[test]
3348 fn test_query_result_status() {
3349 let result = QueryResult::status("Created node type 'Person'");
3350 assert!(result.is_empty());
3351 assert_eq!(result.column_count(), 0);
3352 assert_eq!(
3353 result.status_message.as_deref(),
3354 Some("Created node type 'Person'")
3355 );
3356 }
3357
3358 #[test]
3359 fn test_query_result_new_with_columns() {
3360 let result = QueryResult::new(vec!["name".into(), "age".into()]);
3361 assert_eq!(result.column_count(), 2);
3362 assert_eq!(result.row_count(), 0);
3363 assert!(result.is_empty());
3364 assert_eq!(
3366 result.column_types,
3367 vec![
3368 grafeo_common::types::LogicalType::Any,
3369 grafeo_common::types::LogicalType::Any
3370 ]
3371 );
3372 }
3373
3374 #[test]
3375 fn test_query_result_with_types() {
3376 use grafeo_common::types::LogicalType;
3377 let result = QueryResult::with_types(
3378 vec!["name".into(), "age".into()],
3379 vec![LogicalType::String, LogicalType::Int64],
3380 );
3381 assert_eq!(result.column_count(), 2);
3382 assert_eq!(result.column_types[0], LogicalType::String);
3383 assert_eq!(result.column_types[1], LogicalType::Int64);
3384 }
3385
3386 #[test]
3387 fn test_query_result_with_metrics() {
3388 let result = QueryResult::new(vec!["x".into()]).with_metrics(42.5, 100);
3389 assert_eq!(result.execution_time_ms(), Some(42.5));
3390 assert_eq!(result.rows_scanned(), Some(100));
3391 }
3392
3393 #[test]
3394 fn test_query_result_scalar_success() {
3395 use grafeo_common::types::Value;
3396 let mut result = QueryResult::new(vec!["count".into()]);
3397 result.rows.push(vec![Value::Int64(42)]);
3398
3399 let val: i64 = result.scalar().unwrap();
3400 assert_eq!(val, 42);
3401 }
3402
3403 #[test]
3404 fn test_query_result_scalar_wrong_shape() {
3405 use grafeo_common::types::Value;
3406 let mut result = QueryResult::new(vec!["x".into()]);
3408 result.rows.push(vec![Value::Int64(1)]);
3409 result.rows.push(vec![Value::Int64(2)]);
3410 assert!(result.scalar::<i64>().is_err());
3411
3412 let mut result2 = QueryResult::new(vec!["a".into(), "b".into()]);
3414 result2.rows.push(vec![Value::Int64(1), Value::Int64(2)]);
3415 assert!(result2.scalar::<i64>().is_err());
3416
3417 let result3 = QueryResult::new(vec!["x".into()]);
3419 assert!(result3.scalar::<i64>().is_err());
3420 }
3421
3422 #[test]
3423 fn test_query_result_iter() {
3424 use grafeo_common::types::Value;
3425 let mut result = QueryResult::new(vec!["x".into()]);
3426 result.rows.push(vec![Value::Int64(1)]);
3427 result.rows.push(vec![Value::Int64(2)]);
3428
3429 let collected: Vec<_> = result.iter().collect();
3430 assert_eq!(collected.len(), 2);
3431 }
3432
3433 #[test]
3434 fn test_query_result_display() {
3435 use grafeo_common::types::Value;
3436 let mut result = QueryResult::new(vec!["name".into()]);
3437 result.rows.push(vec![Value::from("Alix")]);
3438 let display = result.to_string();
3439 assert!(display.contains("name"));
3440 assert!(display.contains("Alix"));
3441 }
3442
3443 #[test]
3448 fn test_from_value_i64_type_mismatch() {
3449 use grafeo_common::types::Value;
3450 let val = Value::from("not a number");
3451 assert!(i64::from_value(&val).is_err());
3452 }
3453
3454 #[test]
3455 fn test_from_value_f64_type_mismatch() {
3456 use grafeo_common::types::Value;
3457 let val = Value::from("not a float");
3458 assert!(f64::from_value(&val).is_err());
3459 }
3460
3461 #[test]
3462 fn test_from_value_string_type_mismatch() {
3463 use grafeo_common::types::Value;
3464 let val = Value::Int64(42);
3465 assert!(String::from_value(&val).is_err());
3466 }
3467
3468 #[test]
3469 fn test_from_value_bool_type_mismatch() {
3470 use grafeo_common::types::Value;
3471 let val = Value::Int64(1);
3472 assert!(bool::from_value(&val).is_err());
3473 }
3474
3475 #[test]
3476 fn test_from_value_all_success() {
3477 use grafeo_common::types::Value;
3478 assert_eq!(i64::from_value(&Value::Int64(99)).unwrap(), 99);
3479 assert!((f64::from_value(&Value::Float64(2.72)).unwrap() - 2.72).abs() < f64::EPSILON);
3480 assert_eq!(String::from_value(&Value::from("hello")).unwrap(), "hello");
3481 assert!(bool::from_value(&Value::Bool(true)).unwrap());
3482 }
3483
3484 #[test]
3489 fn test_database_is_read_only_false_by_default() {
3490 let db = GrafeoDB::new_in_memory();
3491 assert!(!db.is_read_only());
3492 }
3493
3494 #[test]
3495 fn test_database_graph_model() {
3496 let db = GrafeoDB::new_in_memory();
3497 assert_eq!(db.graph_model(), crate::config::GraphModel::Lpg);
3498 }
3499
3500 #[test]
3501 fn test_database_memory_limit_none_by_default() {
3502 let db = GrafeoDB::new_in_memory();
3503 assert!(db.memory_limit().is_none());
3504 }
3505
3506 #[test]
3507 fn test_database_memory_limit_custom() {
3508 let config = Config::in_memory().with_memory_limit(128 * 1024 * 1024);
3509 let db = GrafeoDB::with_config(config).unwrap();
3510 assert_eq!(db.memory_limit(), Some(128 * 1024 * 1024));
3511 }
3512
3513 #[test]
3514 fn test_database_adaptive_config() {
3515 let db = GrafeoDB::new_in_memory();
3516 let adaptive = db.adaptive_config();
3517 assert!(adaptive.enabled);
3518 assert!((adaptive.threshold - 3.0).abs() < f64::EPSILON);
3519 }
3520
3521 #[test]
3522 fn test_database_buffer_manager() {
3523 let db = GrafeoDB::new_in_memory();
3524 let _bm = db.buffer_manager();
3525 }
3527
3528 #[test]
3529 fn test_database_query_cache() {
3530 let db = GrafeoDB::new_in_memory();
3531 let _qc = db.query_cache();
3532 }
3533
3534 #[test]
3535 fn test_database_clear_plan_cache() {
3536 let db = GrafeoDB::new_in_memory();
3537 #[cfg(feature = "gql")]
3539 {
3540 let _ = db.execute("MATCH (n) RETURN count(n)");
3541 }
3542 db.clear_plan_cache();
3543 }
3545
3546 #[test]
3547 fn test_database_gc() {
3548 let db = GrafeoDB::new_in_memory();
3549 db.create_node(&["Person"]);
3550 db.gc();
3551 assert_eq!(db.node_count(), 1);
3553 }
3554
3555 #[test]
3560 fn test_create_and_list_graphs() {
3561 let db = GrafeoDB::new_in_memory();
3562 let created = db.create_graph("social").unwrap();
3563 assert!(created);
3564
3565 let created_again = db.create_graph("social").unwrap();
3567 assert!(!created_again);
3568
3569 let names = db.list_graphs();
3570 assert!(names.contains(&"social".to_string()));
3571 }
3572
3573 #[test]
3574 fn test_drop_graph() {
3575 let db = GrafeoDB::new_in_memory();
3576 db.create_graph("temp").unwrap();
3577 assert!(db.drop_graph("temp"));
3578 assert!(!db.drop_graph("temp")); }
3580
3581 #[test]
3582 fn test_drop_graph_resets_current_graph() {
3583 let db = GrafeoDB::new_in_memory();
3584 db.create_graph("active").unwrap();
3585 db.set_current_graph(Some("active")).unwrap();
3586 assert_eq!(db.current_graph(), Some("active".to_string()));
3587
3588 db.drop_graph("active");
3589 assert_eq!(db.current_graph(), None);
3590 }
3591
3592 #[test]
3597 fn test_current_graph_default_none() {
3598 let db = GrafeoDB::new_in_memory();
3599 assert_eq!(db.current_graph(), None);
3600 }
3601
3602 #[test]
3603 fn test_set_current_graph_valid() {
3604 let db = GrafeoDB::new_in_memory();
3605 db.create_graph("social").unwrap();
3606 db.set_current_graph(Some("social")).unwrap();
3607 assert_eq!(db.current_graph(), Some("social".to_string()));
3608 }
3609
3610 #[test]
3611 fn test_set_current_graph_nonexistent() {
3612 let db = GrafeoDB::new_in_memory();
3613 let result = db.set_current_graph(Some("nonexistent"));
3614 assert!(result.is_err());
3615 }
3616
3617 #[test]
3618 fn test_set_current_graph_none_resets() {
3619 let db = GrafeoDB::new_in_memory();
3620 db.create_graph("social").unwrap();
3621 db.set_current_graph(Some("social")).unwrap();
3622 db.set_current_graph(None).unwrap();
3623 assert_eq!(db.current_graph(), None);
3624 }
3625
3626 #[test]
3627 fn test_set_current_graph_default_keyword() {
3628 let db = GrafeoDB::new_in_memory();
3629 db.set_current_graph(Some("default")).unwrap();
3631 assert_eq!(db.current_graph(), Some("default".to_string()));
3632 }
3633
3634 #[test]
3635 fn test_current_schema_default_none() {
3636 let db = GrafeoDB::new_in_memory();
3637 assert_eq!(db.current_schema(), None);
3638 }
3639
3640 #[test]
3641 fn test_set_current_schema_nonexistent() {
3642 let db = GrafeoDB::new_in_memory();
3643 let result = db.set_current_schema(Some("nonexistent"));
3644 assert!(result.is_err());
3645 }
3646
3647 #[test]
3648 fn test_set_current_schema_none_resets() {
3649 let db = GrafeoDB::new_in_memory();
3650 db.set_current_schema(None).unwrap();
3651 assert_eq!(db.current_schema(), None);
3652 }
3653
3654 #[test]
3659 fn test_graph_store_returns_lpg_by_default() {
3660 let db = GrafeoDB::new_in_memory();
3661 db.create_node(&["Person"]);
3662 let store = db.graph_store();
3663 assert_eq!(store.node_count(), 1);
3664 }
3665
3666 #[test]
3667 fn test_graph_store_mut_returns_some_by_default() {
3668 let db = GrafeoDB::new_in_memory();
3669 assert!(db.graph_store_mut().is_some());
3670 }
3671
3672 #[test]
3673 fn test_with_read_store() {
3674 use grafeo_core::graph::lpg::LpgStore;
3675
3676 let store = Arc::new(LpgStore::new().unwrap());
3677 store.create_node(&["Person"]);
3678
3679 let read_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
3680 let db = GrafeoDB::with_read_store(read_store, Config::in_memory()).unwrap();
3681
3682 assert!(db.is_read_only());
3683 assert!(db.graph_store_mut().is_none());
3684
3685 let gs = db.graph_store();
3687 assert_eq!(gs.node_count(), 1);
3688 }
3689
3690 #[test]
3691 fn test_with_store_graph_store_methods() {
3692 use grafeo_core::graph::lpg::LpgStore;
3693
3694 let store = Arc::new(LpgStore::new().unwrap());
3695 store.create_node(&["Person"]);
3696
3697 let db = GrafeoDB::with_store(
3698 Arc::clone(&store) as Arc<dyn GraphStoreMut>,
3699 Config::in_memory(),
3700 )
3701 .unwrap();
3702
3703 assert!(!db.is_read_only());
3704 assert!(db.graph_store_mut().is_some());
3705 assert_eq!(db.graph_store().node_count(), 1);
3706 }
3707
3708 #[test]
3713 #[allow(deprecated)]
3714 fn test_session_read_only() {
3715 let db = GrafeoDB::new_in_memory();
3716 db.create_node(&["Person"]);
3717
3718 let session = db.session_read_only();
3719 #[cfg(feature = "gql")]
3721 {
3722 let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
3723 assert_eq!(result.rows.len(), 1);
3724 }
3725 }
3726
3727 #[test]
3732 fn test_close_in_memory_database() {
3733 let db = GrafeoDB::new_in_memory();
3734 db.create_node(&["Person"]);
3735 assert!(db.close().is_ok());
3736 assert!(db.close().is_ok());
3738 }
3739
3740 #[test]
3745 fn test_with_config_invalid_config_zero_threads() {
3746 let config = Config::in_memory().with_threads(0);
3747 let result = GrafeoDB::with_config(config);
3748 assert!(result.is_err());
3749 }
3750
3751 #[test]
3752 fn test_with_config_invalid_config_zero_memory_limit() {
3753 let config = Config::in_memory().with_memory_limit(0);
3754 let result = GrafeoDB::with_config(config);
3755 assert!(result.is_err());
3756 }
3757
3758 #[test]
3763 fn test_storage_format_display() {
3764 use crate::config::StorageFormat;
3765 assert_eq!(StorageFormat::Auto.to_string(), "auto");
3766 assert_eq!(StorageFormat::WalDirectory.to_string(), "wal-directory");
3767 assert_eq!(StorageFormat::SingleFile.to_string(), "single-file");
3768 }
3769
3770 #[test]
3771 fn test_storage_format_default() {
3772 use crate::config::StorageFormat;
3773 assert_eq!(StorageFormat::default(), StorageFormat::Auto);
3774 }
3775
3776 #[test]
3777 fn test_config_with_storage_format() {
3778 use crate::config::StorageFormat;
3779 let config = Config::in_memory().with_storage_format(StorageFormat::SingleFile);
3780 assert_eq!(config.storage_format, StorageFormat::SingleFile);
3781 }
3782
3783 #[test]
3788 fn test_config_with_cdc() {
3789 let config = Config::in_memory().with_cdc();
3790 assert!(config.cdc_enabled);
3791 }
3792
3793 #[test]
3794 fn test_config_cdc_default_false() {
3795 let config = Config::in_memory();
3796 assert!(!config.cdc_enabled);
3797 }
3798
3799 #[test]
3804 fn test_config_error_is_error_trait() {
3805 use crate::config::ConfigError;
3806 let err: Box<dyn std::error::Error> = Box::new(ConfigError::ZeroMemoryLimit);
3807 assert!(err.source().is_none());
3808 }
3809
3810 #[cfg(feature = "metrics")]
3815 #[test]
3816 fn test_metrics_prometheus_output() {
3817 let db = GrafeoDB::new_in_memory();
3818 let prom = db.metrics_prometheus();
3819 assert!(!prom.is_empty());
3821 }
3822
3823 #[cfg(feature = "metrics")]
3824 #[test]
3825 fn test_reset_metrics() {
3826 let db = GrafeoDB::new_in_memory();
3827 let _session = db.session();
3829 db.reset_metrics();
3830 let snap = db.metrics();
3831 assert_eq!(snap.query_count, 0);
3832 }
3833
3834 #[test]
3839 fn test_drop_graph_on_external_store() {
3840 use grafeo_core::graph::lpg::LpgStore;
3841
3842 let store = Arc::new(LpgStore::new().unwrap());
3843 let read_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
3844 let db = GrafeoDB::with_read_store(read_store, Config::in_memory()).unwrap();
3845
3846 assert!(!db.drop_graph("anything"));
3848 }
3849}