1#[cfg(feature = "lpg")]
15mod admin;
16#[cfg(feature = "arrow-export")]
17pub mod arrow;
18#[cfg(all(feature = "async-storage", feature = "lpg"))]
19mod async_ops;
20#[cfg(all(feature = "async-storage", feature = "lpg"))]
21pub(crate) mod async_wal_store;
22#[cfg(all(feature = "wal", feature = "grafeo-file"))]
23pub mod backup;
24#[cfg(feature = "lpg")]
25pub(crate) mod catalog_section;
26#[cfg(feature = "cdc")]
27pub(crate) mod cdc_store;
28#[cfg(all(feature = "grafeo-file", feature = "lpg"))]
29mod checkpoint_timer;
30#[cfg(all(feature = "compact-store", feature = "mmap"))]
31pub mod compact_tiered;
32#[cfg(feature = "lpg")]
33mod crud;
34#[cfg(feature = "embed")]
35mod embed;
36#[cfg(feature = "grafeo-file")]
37pub(crate) mod flush;
38#[cfg(feature = "lpg")]
39mod import;
40#[cfg(feature = "lpg")]
41mod index;
42#[cfg(feature = "lpg")]
43mod persistence;
44mod query;
45#[cfg(feature = "triple-store")]
46mod rdf_ops;
47#[cfg(feature = "lpg")]
48mod search;
49pub(crate) mod section_consumer;
50#[cfg(all(feature = "wal", feature = "lpg"))]
51pub(crate) mod wal_store;
52
53use grafeo_common::{grafeo_error, grafeo_warn};
54#[cfg(feature = "wal")]
55use std::path::Path;
56use std::sync::Arc;
57use std::sync::atomic::AtomicUsize;
58
59use parking_lot::RwLock;
60
61use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
62use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind, Result};
63#[cfg(feature = "lpg")]
64use grafeo_core::graph::lpg::LpgStore;
65#[cfg(feature = "triple-store")]
66use grafeo_core::graph::rdf::RdfStore;
67use grafeo_core::graph::{GraphStoreMut, GraphStoreSearch};
68#[cfg(feature = "grafeo-file")]
69use grafeo_storage::file::GrafeoFileManager;
70#[cfg(all(feature = "wal", feature = "lpg"))]
71use grafeo_storage::wal::WalRecovery;
72#[cfg(feature = "wal")]
73use grafeo_storage::wal::{DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord};
74
75use crate::catalog::Catalog;
76use crate::config::Config;
77use crate::query::cache::QueryCache;
78use crate::session::Session;
79use crate::transaction::TransactionManager;
80
81pub struct GrafeoDB {
104 pub(super) config: Config,
106 #[cfg(feature = "lpg")]
108 pub(super) store: Option<Arc<LpgStore>>,
109 pub(super) catalog: Arc<Catalog>,
111 #[cfg(feature = "triple-store")]
113 pub(super) rdf_store: Arc<RdfStore>,
114 pub(super) transaction_manager: Arc<TransactionManager>,
116 pub(super) buffer_manager: Arc<BufferManager>,
118 #[cfg(feature = "wal")]
120 pub(super) wal: Option<Arc<LpgWal>>,
121 #[cfg(feature = "wal")]
125 pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
126 pub(super) query_cache: Arc<QueryCache>,
128 pub(super) commit_counter: Arc<AtomicUsize>,
130 pub(super) is_open: RwLock<bool>,
132 #[cfg(feature = "cdc")]
134 pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
135 #[cfg(feature = "cdc")]
137 cdc_enabled: std::sync::atomic::AtomicBool,
138 #[cfg(feature = "embed")]
140 pub(super) embedding_models:
141 RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
142 #[cfg(feature = "grafeo-file")]
144 pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
145 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
148 checkpoint_timer: parking_lot::Mutex<Option<checkpoint_timer::CheckpointTimer>>,
149 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
152 vector_spill_storages: Option<
153 Arc<
154 parking_lot::RwLock<
155 std::collections::HashMap<String, Arc<grafeo_core::index::vector::MmapStorage>>,
156 >,
157 >,
158 >,
159 pub(super) external_read_store: Option<Arc<dyn GraphStoreSearch>>,
162 pub(super) external_write_store: Option<Arc<dyn GraphStoreMut>>,
165 #[cfg(feature = "metrics")]
167 pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
168 current_graph: RwLock<Option<String>>,
172 current_schema: RwLock<Option<String>>,
176 read_only: bool,
179 projections:
181 Arc<RwLock<std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>>>,
182 #[cfg(all(feature = "compact-store", feature = "lpg"))]
184 layered_store: Option<Arc<grafeo_core::graph::compact::layered::LayeredStore>>,
185 #[cfg(all(feature = "compact-store", feature = "mmap", feature = "lpg"))]
192 compact_tiered: Option<Arc<compact_tiered::CompactStoreTiered>>,
193}
194
195impl GrafeoDB {
196 #[cfg(feature = "lpg")]
204 fn lpg_store(&self) -> &Arc<LpgStore> {
205 self.store.as_ref().expect(
206 "no built-in LpgStore: this GrafeoDB was created with an external store \
207 (with_store / with_read_store). Use session() or graph_store() instead.",
208 )
209 }
210
211 #[cfg(any(
221 feature = "vector-index",
222 feature = "text-index",
223 feature = "hybrid-search",
224 feature = "embed",
225 ))]
226 fn graph_store_ref(&self) -> &dyn grafeo_core::graph::GraphStore {
227 if let Some(ref ext_read) = self.external_read_store {
228 ext_read.as_ref()
229 } else {
230 #[cfg(feature = "lpg")]
231 {
232 &**self.lpg_store()
233 }
234 #[cfg(not(feature = "lpg"))]
235 unreachable!("no graph store available: enable the `lpg` feature or use with_store()")
236 }
237 }
238
239 #[cfg(feature = "cdc")]
241 #[inline]
242 pub(super) fn cdc_active(&self) -> bool {
243 self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed)
244 }
245
246 #[must_use]
267 pub fn new_in_memory() -> Self {
268 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
269 }
270
271 #[cfg(feature = "wal")]
290 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
291 Self::with_config(Config::persistent(path.as_ref()))
292 }
293
294 #[cfg(feature = "grafeo-file")]
319 pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
320 Self::with_config(Config::read_only(path.as_ref()))
321 }
322
323 pub fn with_config(config: Config) -> Result<Self> {
347 config
349 .validate()
350 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
351
352 #[cfg(feature = "lpg")]
353 let store = Arc::new(LpgStore::new()?);
354 #[cfg(feature = "triple-store")]
355 let rdf_store = Arc::new(RdfStore::new());
356 let transaction_manager = Arc::new(TransactionManager::new());
357
358 let buffer_config = BufferManagerConfig {
360 budget: config.memory_limit.unwrap_or_else(|| {
361 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
363 let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
364 b
365 }),
366 spill_path: config.spill_path.clone().or_else(|| {
367 config.path.as_ref().and_then(|p| {
368 let parent = p.parent()?;
369 let name = p.file_name()?.to_str()?;
370 Some(parent.join(format!("{name}.spill")))
371 })
372 }),
373 ..BufferManagerConfig::default()
374 };
375 let buffer_manager = BufferManager::new(buffer_config);
376
377 let catalog = Arc::new(Catalog::new());
379
380 let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;
381
382 #[cfg(feature = "grafeo-file")]
384 let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
385 if let Some(ref db_path) = config.path {
387 if db_path.exists() && db_path.is_file() {
388 let fm = GrafeoFileManager::open_read_only(db_path)?;
389 #[cfg(feature = "lpg")]
391 if fm.read_section_directory()?.is_some() {
392 Self::load_from_sections(
393 &fm,
394 &store,
395 &catalog,
396 #[cfg(feature = "triple-store")]
397 &rdf_store,
398 )?;
399 } else {
400 let snapshot_data = fm.read_snapshot()?;
402 if !snapshot_data.is_empty() {
403 Self::apply_snapshot_data(
404 &store,
405 &catalog,
406 #[cfg(feature = "triple-store")]
407 &rdf_store,
408 &snapshot_data,
409 )?;
410 }
411 }
412 Some(Arc::new(fm))
413 } else {
414 return Err(grafeo_common::utils::error::Error::Internal(format!(
415 "read-only open requires an existing .grafeo file: {}",
416 db_path.display()
417 )));
418 }
419 } else {
420 return Err(grafeo_common::utils::error::Error::Internal(
421 "read-only mode requires a database path".to_string(),
422 ));
423 }
424 } else if let Some(ref db_path) = config.path {
425 if Self::should_use_single_file(db_path, config.storage_format) {
430 let fm = if db_path.exists() && db_path.is_file() {
431 GrafeoFileManager::open(db_path)?
432 } else if !db_path.exists() {
433 GrafeoFileManager::create(db_path)?
434 } else {
435 return Err(grafeo_common::utils::error::Error::Internal(format!(
437 "path exists but is not a file: {}",
438 db_path.display()
439 )));
440 };
441
442 #[cfg(feature = "lpg")]
444 if fm.read_section_directory()?.is_some() {
445 Self::load_from_sections(
446 &fm,
447 &store,
448 &catalog,
449 #[cfg(feature = "triple-store")]
450 &rdf_store,
451 )?;
452 } else {
453 let snapshot_data = fm.read_snapshot()?;
454 if !snapshot_data.is_empty() {
455 Self::apply_snapshot_data(
456 &store,
457 &catalog,
458 #[cfg(feature = "triple-store")]
459 &rdf_store,
460 &snapshot_data,
461 )?;
462 }
463 }
464
465 #[cfg(all(feature = "wal", feature = "lpg"))]
467 if config.wal_enabled && fm.has_sidecar_wal() {
468 let recovery = WalRecovery::new(fm.sidecar_wal_path());
469 let records = recovery.recover()?;
470 Self::apply_wal_records(
471 &store,
472 &catalog,
473 #[cfg(feature = "triple-store")]
474 &rdf_store,
475 &records,
476 )?;
477 }
478
479 Some(Arc::new(fm))
480 } else {
481 None
482 }
483 } else {
484 None
485 };
486
487 #[cfg(feature = "wal")]
490 let wal = if is_read_only {
491 None
492 } else if config.wal_enabled {
493 if let Some(ref db_path) = config.path {
494 #[cfg(feature = "grafeo-file")]
496 let wal_path = if let Some(ref fm) = file_manager {
497 let p = fm.sidecar_wal_path();
498 std::fs::create_dir_all(&p)?;
499 p
500 } else {
501 std::fs::create_dir_all(db_path)?;
503 db_path.join("wal")
504 };
505
506 #[cfg(not(feature = "grafeo-file"))]
507 let wal_path = {
508 std::fs::create_dir_all(db_path)?;
509 db_path.join("wal")
510 };
511
512 #[cfg(all(feature = "lpg", feature = "grafeo-file"))]
514 let is_single_file = file_manager.is_some();
515 #[cfg(all(feature = "lpg", not(feature = "grafeo-file")))]
516 let is_single_file = false;
517
518 #[cfg(feature = "lpg")]
519 if !is_single_file && wal_path.exists() {
520 let recovery = WalRecovery::new(&wal_path);
521 let records = recovery.recover()?;
522 Self::apply_wal_records(
523 &store,
524 &catalog,
525 #[cfg(feature = "triple-store")]
526 &rdf_store,
527 &records,
528 )?;
529 }
530
531 let wal_durability = match config.wal_durability {
533 crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
534 crate::config::DurabilityMode::Batch {
535 max_delay_ms,
536 max_records,
537 } => WalDurabilityMode::Batch {
538 max_delay_ms,
539 max_records,
540 },
541 crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
542 WalDurabilityMode::Adaptive { target_interval_ms }
543 }
544 crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
545 };
546 let wal_config = WalConfig {
547 durability: wal_durability,
548 ..WalConfig::default()
549 };
550 let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
551 Some(Arc::new(wal_manager))
552 } else {
553 None
554 }
555 } else {
556 None
557 };
558
559 let query_cache = Arc::new(QueryCache::default());
561
562 #[cfg(all(feature = "temporal", feature = "lpg"))]
565 transaction_manager.sync_epoch(store.current_epoch());
566
567 #[cfg(feature = "cdc")]
568 let cdc_enabled_val = config.cdc_enabled;
569 #[cfg(feature = "cdc")]
570 let cdc_retention = config.cdc_retention.clone();
571
572 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
575 let checkpoint_interval = config.checkpoint_interval;
576 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
577 let timer_store = Arc::clone(&store);
578 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
579 let timer_catalog = Arc::clone(&catalog);
580 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
581 let timer_tm = Arc::clone(&transaction_manager);
582 #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "triple-store"))]
583 let timer_rdf = Arc::clone(&rdf_store);
584 #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "wal"))]
585 let timer_wal = wal.clone();
586
587 let mut db = Self {
588 config,
589 #[cfg(feature = "lpg")]
590 store: Some(store),
591 catalog,
592 #[cfg(feature = "triple-store")]
593 rdf_store,
594 transaction_manager,
595 buffer_manager,
596 #[cfg(feature = "wal")]
597 wal,
598 #[cfg(feature = "wal")]
599 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
600 query_cache,
601 commit_counter: Arc::new(AtomicUsize::new(0)),
602 is_open: RwLock::new(true),
603 #[cfg(feature = "cdc")]
604 cdc_log: Arc::new(crate::cdc::CdcLog::with_retention(cdc_retention)),
605 #[cfg(feature = "cdc")]
606 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
607 #[cfg(feature = "embed")]
608 embedding_models: RwLock::new(hashbrown::HashMap::new()),
609 #[cfg(feature = "grafeo-file")]
610 file_manager,
611 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
612 checkpoint_timer: parking_lot::Mutex::new(None),
613 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
614 vector_spill_storages: None,
615 external_read_store: None,
616 external_write_store: None,
617 #[cfg(feature = "metrics")]
618 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
619 current_graph: RwLock::new(None),
620 current_schema: RwLock::new(None),
621 read_only: is_read_only,
622 projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
623 #[cfg(all(feature = "compact-store", feature = "lpg"))]
624 layered_store: None,
625 #[cfg(all(feature = "compact-store", feature = "mmap", feature = "lpg"))]
626 compact_tiered: None,
627 };
628
629 db.register_section_consumers();
631
632 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
634 if let (Some(interval), Some(fm)) = (checkpoint_interval, &db.file_manager)
635 && !is_read_only
636 {
637 *db.checkpoint_timer.lock() = Some(checkpoint_timer::CheckpointTimer::start(
638 interval,
639 Arc::clone(fm),
640 timer_store,
641 timer_catalog,
642 timer_tm,
643 #[cfg(feature = "triple-store")]
644 timer_rdf,
645 #[cfg(feature = "wal")]
646 timer_wal,
647 ));
648 }
649
650 #[cfg(all(
654 feature = "lpg",
655 feature = "vector-index",
656 feature = "mmap",
657 not(feature = "temporal")
658 ))]
659 db.restore_spill_files();
660
661 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
664 if db
665 .config
666 .section_configs
667 .get(&grafeo_common::storage::SectionType::VectorStore)
668 .is_some_and(|c| c.tier == grafeo_common::storage::TierOverride::ForceDisk)
669 {
670 db.buffer_manager.spill_all();
671 }
672
673 Ok(db)
674 }
675
676 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
705 config
706 .validate()
707 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
708
709 let transaction_manager = Arc::new(TransactionManager::new());
710
711 let buffer_config = BufferManagerConfig {
712 budget: config.memory_limit.unwrap_or_else(|| {
713 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
715 let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
716 b
717 }),
718 spill_path: None,
719 ..BufferManagerConfig::default()
720 };
721 let buffer_manager = BufferManager::new(buffer_config);
722
723 let query_cache = Arc::new(QueryCache::default());
724
725 #[cfg(feature = "cdc")]
726 let cdc_enabled_val = config.cdc_enabled;
727
728 Ok(Self {
729 config,
730 #[cfg(feature = "lpg")]
731 store: None,
732 catalog: Arc::new(Catalog::new()),
733 #[cfg(feature = "triple-store")]
734 rdf_store: Arc::new(RdfStore::new()),
735 transaction_manager,
736 buffer_manager,
737 #[cfg(feature = "wal")]
738 wal: None,
739 #[cfg(feature = "wal")]
740 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
741 query_cache,
742 commit_counter: Arc::new(AtomicUsize::new(0)),
743 is_open: RwLock::new(true),
744 #[cfg(feature = "cdc")]
745 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
746 #[cfg(feature = "cdc")]
747 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
748 #[cfg(feature = "embed")]
749 embedding_models: RwLock::new(hashbrown::HashMap::new()),
750 #[cfg(feature = "grafeo-file")]
751 file_manager: None,
752 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
753 checkpoint_timer: parking_lot::Mutex::new(None),
754 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
755 vector_spill_storages: None,
756 external_read_store: Some(Arc::clone(&store) as Arc<dyn GraphStoreSearch>),
757 external_write_store: Some(store),
758 #[cfg(feature = "metrics")]
759 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
760 current_graph: RwLock::new(None),
761 current_schema: RwLock::new(None),
762 read_only: false,
763 projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
764 #[cfg(all(feature = "compact-store", feature = "lpg"))]
765 layered_store: None,
766 #[cfg(all(feature = "compact-store", feature = "mmap", feature = "lpg"))]
767 compact_tiered: None,
768 })
769 }
770
771 pub fn with_read_store(store: Arc<dyn GraphStoreSearch>, config: Config) -> Result<Self> {
796 config
797 .validate()
798 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
799
800 let transaction_manager = Arc::new(TransactionManager::new());
801
802 let buffer_config = BufferManagerConfig {
803 budget: config.memory_limit.unwrap_or_else(|| {
804 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
806 let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
807 b
808 }),
809 spill_path: None,
810 ..BufferManagerConfig::default()
811 };
812 let buffer_manager = BufferManager::new(buffer_config);
813
814 let query_cache = Arc::new(QueryCache::default());
815
816 #[cfg(feature = "cdc")]
817 let cdc_enabled_val = config.cdc_enabled;
818
819 Ok(Self {
820 config,
821 #[cfg(feature = "lpg")]
822 store: None,
823 catalog: Arc::new(Catalog::new()),
824 #[cfg(feature = "triple-store")]
825 rdf_store: Arc::new(RdfStore::new()),
826 transaction_manager,
827 buffer_manager,
828 #[cfg(feature = "wal")]
829 wal: None,
830 #[cfg(feature = "wal")]
831 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
832 query_cache,
833 commit_counter: Arc::new(AtomicUsize::new(0)),
834 is_open: RwLock::new(true),
835 #[cfg(feature = "cdc")]
836 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
837 #[cfg(feature = "cdc")]
838 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
839 #[cfg(feature = "embed")]
840 embedding_models: RwLock::new(hashbrown::HashMap::new()),
841 #[cfg(feature = "grafeo-file")]
842 file_manager: None,
843 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
844 checkpoint_timer: parking_lot::Mutex::new(None),
845 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
846 vector_spill_storages: None,
847 external_read_store: Some(store),
848 external_write_store: None,
849 #[cfg(feature = "metrics")]
850 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
851 current_graph: RwLock::new(None),
852 current_schema: RwLock::new(None),
853 read_only: true,
854 projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
855 #[cfg(all(feature = "compact-store", feature = "lpg"))]
856 layered_store: None,
857 #[cfg(all(feature = "compact-store", feature = "mmap", feature = "lpg"))]
858 compact_tiered: None,
859 })
860 }
861
862 #[cfg(all(feature = "compact-store", feature = "lpg"))]
880 pub fn compact(&mut self) -> Result<()> {
881 use grafeo_core::graph::compact::from_graph_store_preserving_ids;
882 use grafeo_core::graph::compact::layered::LayeredStore;
883
884 let current_store = self.graph_store();
885
886 let max_node_id = if let Some(ref store) = self.store {
888 store.next_node_id().saturating_sub(1)
889 } else {
890 current_store.node_ids().last().map_or(0, |id| id.as_u64())
891 };
892 let max_edge_id = if let Some(ref store) = self.store {
893 store.next_edge_id().saturating_sub(1)
894 } else {
895 let mut max_eid = 0u64;
897 for nid in current_store.node_ids() {
898 for (_, eid) in
899 current_store.edges_from(nid, grafeo_core::graph::Direction::Outgoing)
900 {
901 max_eid = max_eid.max(eid.as_u64());
902 }
903 }
904 max_eid
905 };
906
907 let compact = from_graph_store_preserving_ids(current_store.as_ref())
908 .map_err(|e| Error::Internal(e.to_string()))?;
909
910 let layered = Arc::new(
911 LayeredStore::new(compact, max_node_id, max_edge_id)
912 .map_err(|e| Error::Internal(e.to_string()))?,
913 );
914
915 let current_epoch = self.transaction_manager.current_epoch();
918 layered.overlay_store().sync_epoch(current_epoch);
919
920 if let Some(ref old) = self.store {
924 layered
925 .overlay_store()
926 .install_named_graphs(old.take_named_graphs());
927 }
928
929 self.external_read_store = Some(Arc::clone(&layered) as Arc<dyn GraphStoreSearch>);
930 self.external_write_store = Some(Arc::clone(&layered) as Arc<dyn GraphStoreMut>);
931 self.store = Some(Arc::clone(layered.overlay_store()));
932
933 #[cfg(feature = "mmap")]
937 {
938 let tiered = Arc::new(compact_tiered::CompactStoreTiered::new_in_memory(
939 layered.base_store_arc(),
940 ));
941 let spill_path = self.buffer_manager.config().spill_path.clone();
942 let consumer = Arc::new(section_consumer::CompactStoreConsumer::new(
943 &tiered, &layered, spill_path,
944 ));
945 self.buffer_manager.register_consumer(consumer);
946 self.compact_tiered = Some(tiered);
947 }
948
949 self.layered_store = Some(layered);
950 self.read_only = false;
951 self.query_cache = Arc::new(QueryCache::default());
952 self.projections.write().clear();
953
954 Ok(())
955 }
956
957 #[cfg(all(feature = "compact-store", feature = "lpg"))]
968 pub fn recompact(&mut self) -> Result<()> {
969 use grafeo_core::graph::compact::from_graph_store_preserving_ids;
970 use grafeo_core::graph::compact::layered::LayeredStore;
971
972 let layered = self
973 .layered_store
974 .as_ref()
975 .ok_or_else(|| Error::Internal("recompact() requires a prior compact()".into()))?;
976
977 let combined: Arc<dyn GraphStoreSearch> = Arc::clone(layered) as Arc<dyn GraphStoreSearch>;
979
980 let max_node_id = layered.overlay_store().next_node_id().saturating_sub(1);
982 let max_edge_id = layered.overlay_store().next_edge_id().saturating_sub(1);
983
984 let fresh_compact = from_graph_store_preserving_ids(combined.as_ref())
985 .map_err(|e| Error::Internal(e.to_string()))?;
986
987 let new_layered = Arc::new(
988 LayeredStore::new(fresh_compact, max_node_id, max_edge_id)
989 .map_err(|e| Error::Internal(e.to_string()))?,
990 );
991
992 let current_epoch = self.transaction_manager.current_epoch();
994 new_layered.overlay_store().sync_epoch(current_epoch);
995
996 new_layered
998 .overlay_store()
999 .install_named_graphs(layered.overlay_store().take_named_graphs());
1000
1001 self.external_read_store = Some(Arc::clone(&new_layered) as Arc<dyn GraphStoreSearch>);
1002 self.external_write_store = Some(Arc::clone(&new_layered) as Arc<dyn GraphStoreMut>);
1003 self.store = Some(Arc::clone(new_layered.overlay_store()));
1004
1005 #[cfg(feature = "mmap")]
1009 {
1010 self.buffer_manager
1011 .unregister_consumer("section:CompactStore");
1012 let tiered = Arc::new(compact_tiered::CompactStoreTiered::new_in_memory(
1013 new_layered.base_store_arc(),
1014 ));
1015 let spill_path = self.buffer_manager.config().spill_path.clone();
1016 let consumer = Arc::new(section_consumer::CompactStoreConsumer::new(
1017 &tiered,
1018 &new_layered,
1019 spill_path,
1020 ));
1021 self.buffer_manager.register_consumer(consumer);
1022 self.compact_tiered = Some(tiered);
1023 }
1024
1025 self.layered_store = Some(new_layered);
1026 self.query_cache = Arc::new(QueryCache::default());
1027
1028 Ok(())
1029 }
1030
1031 #[cfg(all(feature = "wal", feature = "lpg"))]
1037 fn apply_wal_records(
1038 store: &Arc<LpgStore>,
1039 catalog: &Catalog,
1040 #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
1041 records: &[WalRecord],
1042 ) -> Result<()> {
1043 use crate::catalog::{
1044 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
1045 };
1046 use grafeo_common::utils::error::Error;
1047
1048 let mut current_graph: Option<String> = None;
1051 let mut target_store: Arc<LpgStore> = Arc::clone(store);
1052
1053 for record in records {
1054 match record {
1055 WalRecord::CreateNamedGraph { name } => {
1057 let _ = store.create_graph(name);
1058 }
1059 WalRecord::DropNamedGraph { name } => {
1060 store.drop_graph(name);
1061 if current_graph.as_deref() == Some(name.as_str()) {
1063 current_graph = None;
1064 target_store = Arc::clone(store);
1065 }
1066 }
1067 WalRecord::SwitchGraph { name } => {
1068 current_graph.clone_from(name);
1069 target_store = match ¤t_graph {
1070 None => Arc::clone(store),
1071 Some(graph_name) => store
1072 .graph_or_create(graph_name)
1073 .map_err(|e| Error::Internal(e.to_string()))?,
1074 };
1075 }
1076
1077 WalRecord::CreateNode { id, labels } => {
1079 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
1080 target_store.create_node_with_id(*id, &label_refs)?;
1081 }
1082 WalRecord::DeleteNode { id } => {
1083 target_store.delete_node(*id);
1084 }
1085 WalRecord::CreateEdge {
1086 id,
1087 src,
1088 dst,
1089 edge_type,
1090 } => {
1091 target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
1092 }
1093 WalRecord::DeleteEdge { id } => {
1094 target_store.delete_edge(*id);
1095 }
1096 WalRecord::SetNodeProperty { id, key, value } => {
1097 target_store.set_node_property(*id, key, value.clone());
1098 }
1099 WalRecord::SetEdgeProperty { id, key, value } => {
1100 target_store.set_edge_property(*id, key, value.clone());
1101 }
1102 WalRecord::AddNodeLabel { id, label } => {
1103 target_store.add_label(*id, label);
1104 }
1105 WalRecord::RemoveNodeLabel { id, label } => {
1106 target_store.remove_label(*id, label);
1107 }
1108 WalRecord::RemoveNodeProperty { id, key } => {
1109 target_store.remove_node_property(*id, key);
1110 }
1111 WalRecord::RemoveEdgeProperty { id, key } => {
1112 target_store.remove_edge_property(*id, key);
1113 }
1114
1115 WalRecord::CreateNodeType {
1117 name,
1118 properties,
1119 constraints,
1120 } => {
1121 let def = NodeTypeDefinition {
1122 name: name.clone(),
1123 properties: properties
1124 .iter()
1125 .map(|(n, t, nullable)| TypedProperty {
1126 name: n.clone(),
1127 data_type: PropertyDataType::from_type_name(t),
1128 nullable: *nullable,
1129 default_value: None,
1130 })
1131 .collect(),
1132 constraints: constraints
1133 .iter()
1134 .map(|(kind, props)| match kind.as_str() {
1135 "unique" => TypeConstraint::Unique(props.clone()),
1136 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
1137 "not_null" if !props.is_empty() => {
1138 TypeConstraint::NotNull(props[0].clone())
1139 }
1140 _ => TypeConstraint::Unique(props.clone()),
1141 })
1142 .collect(),
1143 parent_types: Vec::new(),
1144 };
1145 let _ = catalog.register_node_type(def);
1146 }
1147 WalRecord::DropNodeType { name } => {
1148 let _ = catalog.drop_node_type(name);
1149 }
1150 WalRecord::CreateEdgeType {
1151 name,
1152 properties,
1153 constraints,
1154 } => {
1155 let def = EdgeTypeDefinition {
1156 name: name.clone(),
1157 properties: properties
1158 .iter()
1159 .map(|(n, t, nullable)| TypedProperty {
1160 name: n.clone(),
1161 data_type: PropertyDataType::from_type_name(t),
1162 nullable: *nullable,
1163 default_value: None,
1164 })
1165 .collect(),
1166 constraints: constraints
1167 .iter()
1168 .map(|(kind, props)| match kind.as_str() {
1169 "unique" => TypeConstraint::Unique(props.clone()),
1170 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
1171 "not_null" if !props.is_empty() => {
1172 TypeConstraint::NotNull(props[0].clone())
1173 }
1174 _ => TypeConstraint::Unique(props.clone()),
1175 })
1176 .collect(),
1177 source_node_types: Vec::new(),
1178 target_node_types: Vec::new(),
1179 };
1180 let _ = catalog.register_edge_type_def(def);
1181 }
1182 WalRecord::DropEdgeType { name } => {
1183 let _ = catalog.drop_edge_type_def(name);
1184 }
1185 WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
1186 }
1189 WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
1190 }
1193 WalRecord::CreateGraphType {
1194 name,
1195 node_types,
1196 edge_types,
1197 open,
1198 } => {
1199 use crate::catalog::GraphTypeDefinition;
1200 let def = GraphTypeDefinition {
1201 name: name.clone(),
1202 allowed_node_types: node_types.clone(),
1203 allowed_edge_types: edge_types.clone(),
1204 open: *open,
1205 };
1206 let _ = catalog.register_graph_type(def);
1207 }
1208 WalRecord::DropGraphType { name } => {
1209 let _ = catalog.drop_graph_type(name);
1210 }
1211 WalRecord::CreateSchema { name } => {
1212 let _ = catalog.register_schema_namespace(name.clone());
1213 }
1214 WalRecord::DropSchema { name } => {
1215 let _ = catalog.drop_schema_namespace(name);
1216 }
1217
1218 WalRecord::AlterNodeType { name, alterations } => {
1219 for (action, prop_name, type_name, nullable) in alterations {
1220 match action.as_str() {
1221 "add" => {
1222 let prop = TypedProperty {
1223 name: prop_name.clone(),
1224 data_type: PropertyDataType::from_type_name(type_name),
1225 nullable: *nullable,
1226 default_value: None,
1227 };
1228 let _ = catalog.alter_node_type_add_property(name, prop);
1229 }
1230 "drop" => {
1231 let _ = catalog.alter_node_type_drop_property(name, prop_name);
1232 }
1233 _ => {}
1234 }
1235 }
1236 }
1237 WalRecord::AlterEdgeType { name, alterations } => {
1238 for (action, prop_name, type_name, nullable) in alterations {
1239 match action.as_str() {
1240 "add" => {
1241 let prop = TypedProperty {
1242 name: prop_name.clone(),
1243 data_type: PropertyDataType::from_type_name(type_name),
1244 nullable: *nullable,
1245 default_value: None,
1246 };
1247 let _ = catalog.alter_edge_type_add_property(name, prop);
1248 }
1249 "drop" => {
1250 let _ = catalog.alter_edge_type_drop_property(name, prop_name);
1251 }
1252 _ => {}
1253 }
1254 }
1255 }
1256 WalRecord::AlterGraphType { name, alterations } => {
1257 for (action, type_name) in alterations {
1258 match action.as_str() {
1259 "add_node" => {
1260 let _ =
1261 catalog.alter_graph_type_add_node_type(name, type_name.clone());
1262 }
1263 "drop_node" => {
1264 let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
1265 }
1266 "add_edge" => {
1267 let _ =
1268 catalog.alter_graph_type_add_edge_type(name, type_name.clone());
1269 }
1270 "drop_edge" => {
1271 let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
1272 }
1273 _ => {}
1274 }
1275 }
1276 }
1277
1278 WalRecord::CreateProcedure {
1279 name,
1280 params,
1281 returns,
1282 body,
1283 } => {
1284 use crate::catalog::ProcedureDefinition;
1285 let def = ProcedureDefinition {
1286 name: name.clone(),
1287 params: params.clone(),
1288 returns: returns.clone(),
1289 body: body.clone(),
1290 };
1291 let _ = catalog.register_procedure(def);
1292 }
1293 WalRecord::DropProcedure { name } => {
1294 let _ = catalog.drop_procedure(name);
1295 }
1296
1297 #[cfg(feature = "triple-store")]
1299 WalRecord::InsertRdfTriple { .. }
1300 | WalRecord::DeleteRdfTriple { .. }
1301 | WalRecord::ClearRdfGraph { .. }
1302 | WalRecord::CreateRdfGraph { .. }
1303 | WalRecord::DropRdfGraph { .. } => {
1304 rdf_ops::replay_rdf_wal_record(rdf_store, record);
1305 }
1306 #[cfg(not(feature = "triple-store"))]
1307 WalRecord::InsertRdfTriple { .. }
1308 | WalRecord::DeleteRdfTriple { .. }
1309 | WalRecord::ClearRdfGraph { .. }
1310 | WalRecord::CreateRdfGraph { .. }
1311 | WalRecord::DropRdfGraph { .. } => {}
1312
1313 WalRecord::TransactionCommit { .. } => {
1314 #[cfg(feature = "temporal")]
1318 {
1319 target_store.new_epoch();
1320 }
1321 }
1322 WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
1323 }
1326 WalRecord::EpochAdvance { .. } => {
1327 }
1330 }
1331 }
1332 Ok(())
1333 }
1334
1335 #[cfg(feature = "grafeo-file")]
1341 fn should_use_single_file(
1342 path: &std::path::Path,
1343 configured: crate::config::StorageFormat,
1344 ) -> bool {
1345 use crate::config::StorageFormat;
1346 match configured {
1347 StorageFormat::SingleFile => true,
1348 StorageFormat::WalDirectory => false,
1349 StorageFormat::Auto => {
1350 if path.is_file() {
1352 if let Ok(mut f) = std::fs::File::open(path) {
1353 use std::io::Read;
1354 let mut magic = [0u8; 4];
1355 if f.read_exact(&mut magic).is_ok() && magic == grafeo_storage::file::MAGIC
1356 {
1357 return true;
1358 }
1359 }
1360 return false;
1361 }
1362 if path.is_dir() {
1364 return false;
1365 }
1366 path.extension().is_some_and(|ext| ext == "grafeo")
1368 }
1369 }
1370 }
1371
1372 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1376 fn apply_snapshot_data(
1377 store: &Arc<LpgStore>,
1378 catalog: &Arc<crate::catalog::Catalog>,
1379 #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
1380 data: &[u8],
1381 ) -> Result<()> {
1382 persistence::load_snapshot_into_store(
1384 store,
1385 catalog,
1386 #[cfg(feature = "triple-store")]
1387 rdf_store,
1388 data,
1389 )
1390 }
1391
1392 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1396 fn load_from_sections(
1397 fm: &GrafeoFileManager,
1398 store: &Arc<LpgStore>,
1399 catalog: &Arc<crate::catalog::Catalog>,
1400 #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
1401 ) -> Result<()> {
1402 use grafeo_common::storage::{Section, SectionType};
1403
1404 let dir = fm.read_section_directory()?.ok_or_else(|| {
1405 grafeo_common::utils::error::Error::Internal(
1406 "expected v2 section directory but found none".to_string(),
1407 )
1408 })?;
1409
1410 if let Some(entry) = dir.find(SectionType::Catalog) {
1412 let data = fm.read_section_data(entry)?;
1413 let tm = Arc::new(crate::transaction::TransactionManager::new());
1414 let mut section = catalog_section::CatalogSection::new(
1415 Arc::clone(catalog),
1416 Arc::clone(store),
1417 move || tm.current_epoch().as_u64(),
1418 );
1419 section.deserialize(&data)?;
1420 }
1421
1422 if let Some(entry) = dir.find(SectionType::LpgStore) {
1424 let data = fm.read_section_data(entry)?;
1425 let mut section = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
1426 section.deserialize(&data)?;
1427 }
1428
1429 #[cfg(feature = "triple-store")]
1431 if let Some(entry) = dir.find(SectionType::RdfStore) {
1432 let data = fm.read_section_data(entry)?;
1433 let mut section = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(rdf_store));
1434 section.deserialize(&data)?;
1435 }
1436
1437 #[cfg(feature = "ring-index")]
1439 if let Some(entry) = dir.find(SectionType::RdfRing) {
1440 let data = fm.read_section_data(entry)?;
1441 let mut section = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(rdf_store));
1442 section.deserialize(&data)?;
1443 }
1444
1445 #[cfg(feature = "vector-index")]
1447 if let Some(entry) = dir.find(SectionType::VectorStore) {
1448 let data = fm.read_section_data(entry)?;
1449 let indexes = store.vector_index_entries();
1450 if !indexes.is_empty() {
1451 let mut section = grafeo_core::index::vector::VectorStoreSection::new(indexes);
1452 section.deserialize(&data)?;
1453 }
1454 }
1455
1456 #[cfg(feature = "text-index")]
1458 if let Some(entry) = dir.find(SectionType::TextIndex) {
1459 let data = fm.read_section_data(entry)?;
1460 let indexes = store.text_index_entries();
1461 if !indexes.is_empty() {
1462 let mut section = grafeo_core::index::text::TextIndexSection::new(indexes);
1463 section.deserialize(&data)?;
1464 }
1465 }
1466
1467 Ok(())
1468 }
1469
1470 #[must_use]
1498 pub fn session(&self) -> Session {
1499 self.create_session_inner(None)
1500 }
1501
1502 #[must_use]
1520 pub fn session_with_identity(&self, identity: crate::auth::Identity) -> Session {
1521 let force_read_only = !identity.can_write();
1522 self.create_session_inner_full(None, force_read_only, identity)
1523 }
1524
1525 #[must_use]
1539 pub fn session_with_role(&self, role: crate::auth::Role) -> Session {
1540 self.session_with_identity(crate::auth::Identity::new("anonymous", [role]))
1541 }
1542
1543 #[cfg(feature = "cdc")]
1562 #[must_use]
1563 pub fn session_with_cdc(&self, cdc_enabled: bool) -> Session {
1564 self.create_session_inner(Some(cdc_enabled))
1565 }
1566
1567 #[deprecated(
1576 since = "0.5.36",
1577 note = "use session_with_role(Role::ReadOnly) instead"
1578 )]
1579 #[must_use]
1580 pub fn session_read_only(&self) -> Session {
1581 self.session_with_role(crate::auth::Role::ReadOnly)
1582 }
1583
1584 #[allow(unused_variables)] fn create_session_inner(&self, cdc_override: Option<bool>) -> Session {
1590 self.create_session_inner_full(cdc_override, false, crate::auth::Identity::anonymous())
1591 }
1592
1593 #[allow(unused_variables)]
1595 fn create_session_inner_full(
1596 &self,
1597 cdc_override: Option<bool>,
1598 force_read_only: bool,
1599 identity: crate::auth::Identity,
1600 ) -> Session {
1601 let session_cfg = || crate::session::SessionConfig {
1602 transaction_manager: Arc::clone(&self.transaction_manager),
1603 query_cache: Arc::clone(&self.query_cache),
1604 catalog: Arc::clone(&self.catalog),
1605 adaptive_config: self.config.adaptive.clone(),
1606 factorized_execution: self.config.factorized_execution,
1607 graph_model: self.config.graph_model,
1608 query_timeout: self.config.query_timeout,
1609 max_property_size: self.config.max_property_size,
1610 buffer_manager: Some(Arc::clone(&self.buffer_manager)),
1611 commit_counter: Arc::clone(&self.commit_counter),
1612 gc_interval: self.config.gc_interval,
1613 read_only: self.read_only || force_read_only,
1614 identity: identity.clone(),
1615 #[cfg(feature = "lpg")]
1616 projections: Arc::clone(&self.projections),
1617 };
1618
1619 #[cfg(all(feature = "compact-store", feature = "lpg"))]
1622 if let Some(ref layered) = self.layered_store {
1623 let overlay = Arc::clone(layered.overlay_store());
1624 let layered_arc = Arc::clone(layered);
1625 let mut session = Session::with_adaptive(overlay, session_cfg());
1626 session.override_stores(
1629 Arc::clone(&layered_arc) as Arc<dyn GraphStoreSearch>,
1630 Some(layered_arc as Arc<dyn GraphStoreMut>),
1631 );
1632 return session;
1633 }
1634
1635 if let Some(ref ext_read) = self.external_read_store {
1636 return Session::with_external_store(
1637 Arc::clone(ext_read),
1638 self.external_write_store.as_ref().map(Arc::clone),
1639 session_cfg(),
1640 )
1641 .expect("arena allocation for external store session");
1642 }
1643
1644 #[cfg(all(feature = "lpg", feature = "triple-store"))]
1645 let mut session = Session::with_rdf_store_and_adaptive(
1646 Arc::clone(self.lpg_store()),
1647 Arc::clone(&self.rdf_store),
1648 session_cfg(),
1649 );
1650 #[cfg(all(feature = "lpg", not(feature = "triple-store")))]
1651 let mut session = Session::with_adaptive(Arc::clone(self.lpg_store()), session_cfg());
1652 #[cfg(not(feature = "lpg"))]
1653 let mut session =
1654 Session::with_external_store(self.graph_store(), self.graph_store_mut(), session_cfg())
1655 .expect("session creation for non-lpg build");
1656
1657 #[cfg(all(feature = "wal", feature = "lpg"))]
1658 if let Some(ref wal) = self.wal {
1659 session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
1660 }
1661
1662 #[cfg(feature = "cdc")]
1663 {
1664 let should_enable = cdc_override.unwrap_or_else(|| self.cdc_active());
1665 if should_enable {
1666 session.set_cdc_log(Arc::clone(&self.cdc_log));
1667 }
1668 }
1669
1670 #[cfg(feature = "metrics")]
1671 {
1672 if let Some(ref m) = self.metrics {
1673 session.set_metrics(Arc::clone(m));
1674 m.session_created
1675 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1676 m.session_active
1677 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1678 }
1679 }
1680
1681 if let Some(ref graph) = *self.current_graph.read() {
1683 session.use_graph(graph);
1684 }
1685
1686 if let Some(ref schema) = *self.current_schema.read() {
1688 session.set_schema(schema);
1689 }
1690
1691 let _ = &mut session;
1693
1694 session
1695 }
1696
1697 #[must_use]
1703 pub fn current_graph(&self) -> Option<String> {
1704 self.current_graph.read().clone()
1705 }
1706
1707 pub fn set_current_graph(&self, name: Option<&str>) -> Result<()> {
1716 #[cfg(feature = "lpg")]
1717 if let Some(name) = name
1718 && !name.eq_ignore_ascii_case("default")
1719 && let Some(store) = &self.store
1720 && store.graph(name).is_none()
1721 {
1722 return Err(Error::Query(QueryError::new(
1723 QueryErrorKind::Semantic,
1724 format!("Graph '{name}' does not exist"),
1725 )));
1726 }
1727 *self.current_graph.write() = name.map(ToString::to_string);
1728 Ok(())
1729 }
1730
1731 #[must_use]
1736 pub fn current_schema(&self) -> Option<String> {
1737 self.current_schema.read().clone()
1738 }
1739
1740 pub fn set_current_schema(&self, name: Option<&str>) -> Result<()> {
1749 if let Some(name) = name
1750 && !self.catalog.schema_exists(name)
1751 {
1752 return Err(Error::Query(QueryError::new(
1753 QueryErrorKind::Semantic,
1754 format!("Schema '{name}' does not exist"),
1755 )));
1756 }
1757 *self.current_schema.write() = name.map(ToString::to_string);
1758 Ok(())
1759 }
1760
1761 #[must_use]
1763 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
1764 &self.config.adaptive
1765 }
1766
1767 #[must_use]
1769 pub fn is_read_only(&self) -> bool {
1770 self.read_only
1771 }
1772
1773 #[must_use]
1775 pub fn config(&self) -> &Config {
1776 &self.config
1777 }
1778
1779 #[must_use]
1781 pub fn graph_model(&self) -> crate::config::GraphModel {
1782 self.config.graph_model
1783 }
1784
1785 #[must_use]
1787 pub fn memory_limit(&self) -> Option<usize> {
1788 self.config.memory_limit
1789 }
1790
1791 #[cfg(feature = "metrics")]
1796 #[must_use]
1797 pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1798 let mut snapshot = self
1799 .metrics
1800 .as_ref()
1801 .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1802
1803 let cache_stats = self.query_cache.stats();
1805 snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1806 snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1807 snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1808 snapshot.cache_invalidations = cache_stats.invalidations;
1809
1810 snapshot
1811 }
1812
1813 #[cfg(feature = "metrics")]
1817 #[must_use]
1818 pub fn metrics_prometheus(&self) -> String {
1819 self.metrics
1820 .as_ref()
1821 .map_or_else(String::new, |m| m.to_prometheus())
1822 }
1823
1824 #[cfg(feature = "metrics")]
1826 pub fn reset_metrics(&self) {
1827 if let Some(ref m) = self.metrics {
1828 m.reset();
1829 }
1830 self.query_cache.reset_stats();
1831 }
1832
1833 #[cfg(feature = "lpg")]
1841 #[must_use]
1842 pub fn store(&self) -> &Arc<LpgStore> {
1843 self.lpg_store()
1844 }
1845
1846 #[cfg(feature = "lpg")]
1854 pub fn create_graph(&self, name: &str) -> Result<bool> {
1855 Ok(self.lpg_store().create_graph(name)?)
1856 }
1857
1858 #[cfg(feature = "lpg")]
1863 pub fn drop_graph(&self, name: &str) -> bool {
1864 let Some(store) = &self.store else {
1865 return false;
1866 };
1867 let dropped = store.drop_graph(name);
1868 if dropped {
1869 let mut current = self.current_graph.write();
1870 if current
1871 .as_deref()
1872 .is_some_and(|g| g.eq_ignore_ascii_case(name))
1873 {
1874 *current = None;
1875 }
1876 }
1877 dropped
1878 }
1879
1880 #[cfg(feature = "lpg")]
1882 #[must_use]
1883 pub fn list_graphs(&self) -> Vec<String> {
1884 self.lpg_store().graph_names()
1885 }
1886
1887 pub fn create_projection(
1908 &self,
1909 name: impl Into<String>,
1910 spec: grafeo_core::graph::ProjectionSpec,
1911 ) -> bool {
1912 use grafeo_core::graph::GraphProjection;
1913 use std::collections::hash_map::Entry;
1914
1915 let store = self.graph_store();
1916 let projection = Arc::new(GraphProjection::new(store, spec));
1917 let mut projections = self.projections.write();
1918 match projections.entry(name.into()) {
1919 Entry::Occupied(_) => false,
1920 Entry::Vacant(e) => {
1921 e.insert(projection);
1922 true
1923 }
1924 }
1925 }
1926
1927 pub fn drop_projection(&self, name: &str) -> bool {
1929 self.projections.write().remove(name).is_some()
1930 }
1931
1932 #[must_use]
1934 pub fn list_projections(&self) -> Vec<String> {
1935 self.projections.read().keys().cloned().collect()
1936 }
1937
1938 #[must_use]
1940 pub fn projection(&self, name: &str) -> Option<Arc<dyn GraphStoreSearch>> {
1941 self.projections
1942 .read()
1943 .get(name)
1944 .map(|p| Arc::clone(p) as Arc<dyn GraphStoreSearch>)
1945 }
1946
1947 #[must_use]
1955 pub fn graph_store(&self) -> Arc<dyn GraphStoreSearch> {
1956 if let Some(ref ext_read) = self.external_read_store {
1957 Arc::clone(ext_read)
1958 } else {
1959 #[cfg(feature = "lpg")]
1960 {
1961 Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreSearch>
1962 }
1963 #[cfg(not(feature = "lpg"))]
1964 unreachable!("no graph store available: enable the `lpg` feature or use with_store()")
1965 }
1966 }
1967
1968 #[must_use]
1973 pub fn graph_store_mut(&self) -> Option<Arc<dyn GraphStoreMut>> {
1974 if self.external_read_store.is_some() {
1975 self.external_write_store.as_ref().map(Arc::clone)
1976 } else {
1977 #[cfg(feature = "lpg")]
1978 {
1979 Some(Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreMut>)
1980 }
1981 #[cfg(not(feature = "lpg"))]
1982 {
1983 None
1984 }
1985 }
1986 }
1987
1988 pub fn gc(&self) {
1995 #[cfg(feature = "lpg")]
1996 {
1997 let min_epoch = self.transaction_manager.min_active_epoch();
1998 self.lpg_store().gc_versions(min_epoch);
1999 }
2000 #[cfg(all(feature = "lpg", feature = "cdc"))]
2001 let current_epoch = self.transaction_manager.current_epoch();
2002 self.transaction_manager.gc();
2003
2004 #[cfg(feature = "cdc")]
2006 if self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed) {
2007 #[cfg(feature = "lpg")]
2008 self.cdc_log.apply_retention(current_epoch);
2009 }
2010 }
2011
2012 #[must_use]
2014 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
2015 &self.buffer_manager
2016 }
2017
2018 #[cfg(all(feature = "compact-store", feature = "lpg"))]
2021 #[must_use]
2022 pub fn layered_store(
2023 &self,
2024 ) -> Option<&Arc<grafeo_core::graph::compact::layered::LayeredStore>> {
2025 self.layered_store.as_ref()
2026 }
2027
2028 #[cfg(all(feature = "compact-store", feature = "mmap", feature = "lpg"))]
2031 #[must_use]
2032 pub fn compact_tiered(&self) -> Option<&Arc<compact_tiered::CompactStoreTiered>> {
2033 self.compact_tiered.as_ref()
2034 }
2035
2036 #[must_use]
2038 pub fn query_cache(&self) -> &Arc<QueryCache> {
2039 &self.query_cache
2040 }
2041
2042 pub fn clear_plan_cache(&self) {
2048 self.query_cache.clear();
2049 }
2050
2051 pub fn close(&self) -> Result<()> {
2065 let mut is_open = self.is_open.write();
2066 if !*is_open {
2067 return Ok(());
2068 }
2069
2070 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
2075 if let Some(mut timer) = self.checkpoint_timer.lock().take() {
2076 timer.stop();
2077 }
2078
2079 if self.read_only {
2081 #[cfg(feature = "grafeo-file")]
2082 if let Some(ref fm) = self.file_manager {
2083 fm.close()?;
2084 }
2085 *is_open = false;
2086 return Ok(());
2087 }
2088
2089 #[cfg(feature = "grafeo-file")]
2093 let is_single_file = self.file_manager.is_some();
2094 #[cfg(not(feature = "grafeo-file"))]
2095 let is_single_file = false;
2096
2097 #[cfg(feature = "grafeo-file")]
2098 if let Some(ref fm) = self.file_manager {
2099 #[cfg(feature = "wal")]
2101 if let Some(ref wal) = self.wal {
2102 wal.sync()?;
2103 }
2104 let flush_result = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;
2105
2106 #[cfg(feature = "wal")]
2112 let flush_result = if flush_result.sections_written == 0 {
2113 if let Some(ref wal) = self.wal {
2114 if wal.record_count() > 0 {
2115 grafeo_warn!(
2116 "WAL has {} records but checkpoint wrote 0 sections; retrying with forced flush",
2117 wal.record_count()
2118 );
2119 self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?
2120 } else {
2121 flush_result
2122 }
2123 } else {
2124 flush_result
2125 }
2126 } else {
2127 flush_result
2128 };
2129
2130 #[cfg(feature = "wal")]
2133 if let Some(ref wal) = self.wal {
2134 wal.close_active_log();
2135 }
2136
2137 #[cfg(feature = "wal")]
2141 let has_wal_records = self.wal.as_ref().is_some_and(|wal| wal.record_count() > 0);
2142 #[cfg(not(feature = "wal"))]
2143 let has_wal_records = false;
2144
2145 if flush_result.sections_written > 0 || !has_wal_records {
2146 {
2147 use grafeo_common::testing::crash::maybe_crash;
2148 maybe_crash("close:before_remove_sidecar_wal");
2149 }
2150 fm.remove_sidecar_wal()?;
2151 } else {
2152 grafeo_warn!(
2153 "keeping sidecar WAL for recovery: checkpoint wrote 0 sections but WAL has records"
2154 );
2155 }
2156 fm.close()?;
2157 }
2158
2159 #[cfg(feature = "wal")]
2165 if !is_single_file && let Some(ref wal) = self.wal {
2166 let commit_tx = self
2168 .transaction_manager
2169 .last_assigned_transaction_id()
2170 .unwrap_or_else(|| self.transaction_manager.begin());
2171
2172 wal.log(&WalRecord::TransactionCommit {
2174 transaction_id: commit_tx,
2175 })?;
2176
2177 wal.sync()?;
2178 }
2179
2180 *is_open = false;
2181 Ok(())
2182 }
2183
2184 #[cfg(feature = "wal")]
2186 #[must_use]
2187 pub fn wal(&self) -> Option<&Arc<LpgWal>> {
2188 self.wal.as_ref()
2189 }
2190
2191 #[cfg(feature = "wal")]
2193 pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
2194 if let Some(ref wal) = self.wal {
2195 wal.log(record)?;
2196 }
2197 Ok(())
2198 }
2199
2200 fn register_section_consumers(&mut self) {
2205 #[cfg(feature = "lpg")]
2207 let store_ref = self.store.as_ref();
2208 #[cfg(feature = "lpg")]
2209 if let Some(store) = store_ref {
2210 let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
2211 self.buffer_manager.register_consumer(Arc::new(
2212 section_consumer::SectionConsumer::new(Arc::new(lpg)),
2213 ));
2214 }
2215
2216 #[cfg(feature = "triple-store")]
2218 if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
2219 let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
2220 self.buffer_manager.register_consumer(Arc::new(
2221 section_consumer::SectionConsumer::new(Arc::new(rdf)),
2222 ));
2223 }
2224
2225 #[cfg(feature = "ring-index")]
2227 if self.rdf_store.ring().is_some() {
2228 let ring = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(&self.rdf_store));
2229 self.buffer_manager.register_consumer(Arc::new(
2230 section_consumer::SectionConsumer::new(Arc::new(ring)),
2231 ));
2232 }
2233
2234 #[cfg(all(
2237 feature = "lpg",
2238 feature = "vector-index",
2239 feature = "mmap",
2240 not(feature = "temporal")
2241 ))]
2242 if let Some(store) = store_ref {
2243 let spill_path = self.buffer_manager.config().spill_path.clone();
2244 let consumer = Arc::new(section_consumer::VectorIndexConsumer::new(
2245 store, spill_path,
2246 ));
2247 self.vector_spill_storages = Some(Arc::clone(consumer.spilled_storages()));
2249 self.buffer_manager.register_consumer(consumer);
2250 }
2251
2252 #[cfg(all(feature = "lpg", feature = "text-index"))]
2254 if let Some(store) = store_ref {
2255 self.buffer_manager
2256 .register_consumer(Arc::new(section_consumer::TextIndexConsumer::new(store)));
2257 }
2258
2259 #[cfg(feature = "cdc")]
2262 self.buffer_manager.register_consumer(
2263 Arc::clone(&self.cdc_log) as Arc<dyn grafeo_common::memory::MemoryConsumer>
2264 );
2265 }
2266
2267 #[cfg(all(
2274 feature = "lpg",
2275 feature = "vector-index",
2276 feature = "mmap",
2277 not(feature = "temporal")
2278 ))]
2279 fn restore_spill_files(&mut self) {
2280 use grafeo_core::index::vector::MmapStorage;
2281
2282 let spill_dir = match self.buffer_manager.config().spill_path {
2283 Some(ref path) => path.clone(),
2284 None => return,
2285 };
2286
2287 if !spill_dir.exists() {
2288 return;
2289 }
2290
2291 let spill_map = match self.vector_spill_storages {
2292 Some(ref map) => Arc::clone(map),
2293 None => return,
2294 };
2295
2296 let Ok(entries) = std::fs::read_dir(&spill_dir) else {
2297 return;
2298 };
2299
2300 let Some(ref store) = self.store else {
2301 return;
2302 };
2303
2304 for entry in entries.flatten() {
2305 let path = entry.path();
2306 let file_name = match path.file_name().and_then(|n| n.to_str()) {
2307 Some(name) => name.to_string(),
2308 None => continue,
2309 };
2310
2311 if !file_name.starts_with("vectors_")
2313 || !std::path::Path::new(&file_name)
2314 .extension()
2315 .is_some_and(|ext| ext.eq_ignore_ascii_case("bin"))
2316 {
2317 continue;
2318 }
2319
2320 let key_part = &file_name["vectors_".len()..file_name.len() - ".bin".len()];
2322
2323 let key = key_part.replace("%3A", ":").replace("%25", "%");
2325
2326 if !key.contains(':') {
2328 continue;
2330 }
2331
2332 if store.get_vector_index_by_key(&key).is_none() {
2334 let _ = std::fs::remove_file(&path);
2336 continue;
2337 }
2338
2339 match MmapStorage::open(&path) {
2341 Ok(mmap_storage) => {
2342 let property = key.split(':').nth(1).unwrap_or("");
2344 let prop_key = grafeo_common::types::PropertyKey::new(property);
2345 store.node_properties_mark_spilled(&prop_key);
2346
2347 spill_map.write().insert(key, Arc::new(mmap_storage));
2348 }
2349 Err(e) => {
2350 eprintln!("failed to restore spill file {}: {e}", path.display());
2351 let _ = std::fs::remove_file(&path);
2353 }
2354 }
2355 }
2356 }
2357
2358 #[cfg(feature = "grafeo-file")]
2360 fn build_sections(&self) -> Vec<Box<dyn grafeo_common::storage::Section>> {
2361 let mut sections: Vec<Box<dyn grafeo_common::storage::Section>> = Vec::new();
2362
2363 #[cfg(all(feature = "compact-store", feature = "lpg"))]
2365 if let Some(ref layered) = self.layered_store {
2366 let compact_section = grafeo_core::graph::compact::section::CompactStoreSection::new(
2368 layered.base_store_arc(),
2369 );
2370 sections.push(Box::new(compact_section));
2371
2372 let overlay = layered.overlay_store();
2374 let overlay_section =
2375 grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(overlay));
2376 sections.push(Box::new(overlay_section));
2377
2378 return sections;
2379 }
2380
2381 #[cfg(feature = "lpg")]
2383 if let Some(store) = self.store.as_ref() {
2384 let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
2385
2386 let catalog = catalog_section::CatalogSection::new(
2387 Arc::clone(&self.catalog),
2388 Arc::clone(store),
2389 {
2390 let tm = Arc::clone(&self.transaction_manager);
2391 move || tm.current_epoch().as_u64()
2392 },
2393 );
2394
2395 sections.push(Box::new(catalog));
2396 sections.push(Box::new(lpg));
2397
2398 #[cfg(feature = "vector-index")]
2400 {
2401 let indexes = store.vector_index_entries();
2402 if !indexes.is_empty() {
2403 let vector = grafeo_core::index::vector::VectorStoreSection::new(indexes);
2404 sections.push(Box::new(vector));
2405 }
2406 }
2407
2408 #[cfg(feature = "text-index")]
2410 {
2411 let indexes = store.text_index_entries();
2412 if !indexes.is_empty() {
2413 let text = grafeo_core::index::text::TextIndexSection::new(indexes);
2414 sections.push(Box::new(text));
2415 }
2416 }
2417 }
2418
2419 #[cfg(feature = "triple-store")]
2420 if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
2421 let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
2422 sections.push(Box::new(rdf));
2423 }
2424
2425 #[cfg(feature = "ring-index")]
2426 if self.rdf_store.ring().is_some() {
2427 let ring = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(&self.rdf_store));
2428 sections.push(Box::new(ring));
2429 }
2430
2431 sections
2432 }
2433
2434 #[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
2448 pub fn backup_full(&self, backup_dir: &std::path::Path) -> Result<backup::BackupSegment> {
2449 let fm = self
2450 .file_manager
2451 .as_ref()
2452 .ok_or_else(|| Error::Internal("backup requires a persistent database".to_string()))?;
2453
2454 if !self.read_only {
2458 let _ = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;
2459 }
2460
2461 let current_epoch = self.transaction_manager.current_epoch();
2462 backup::do_backup_full(backup_dir, fm, self.wal.as_deref(), current_epoch)
2463 }
2464
2465 #[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
2473 pub fn backup_incremental(
2474 &self,
2475 backup_dir: &std::path::Path,
2476 ) -> Result<backup::BackupSegment> {
2477 let wal = self
2478 .wal
2479 .as_ref()
2480 .ok_or_else(|| Error::Internal("incremental backup requires WAL".to_string()))?;
2481
2482 let current_epoch = self.transaction_manager.current_epoch();
2483 backup::do_backup_incremental(backup_dir, wal, current_epoch)
2484 }
2485
2486 #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2492 pub fn read_backup_manifest(
2493 backup_dir: &std::path::Path,
2494 ) -> Result<Option<backup::BackupManifest>> {
2495 backup::read_manifest(backup_dir)
2496 }
2497
2498 #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2500 #[must_use]
2501 pub fn backup_cursor(&self) -> Option<backup::BackupCursor> {
2502 self.wal
2503 .as_ref()
2504 .and_then(|wal| backup::read_backup_cursor(wal.dir()).ok().flatten())
2505 }
2506
2507 #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2518 pub fn restore_to_epoch(
2519 backup_dir: &std::path::Path,
2520 target_epoch: grafeo_common::types::EpochId,
2521 output_path: &std::path::Path,
2522 ) -> Result<()> {
2523 backup::do_restore_to_epoch(backup_dir, target_epoch, output_path)
2524 }
2525
2526 #[cfg(feature = "grafeo-file")]
2532 fn checkpoint_to_file(
2533 &self,
2534 fm: &GrafeoFileManager,
2535 reason: flush::FlushReason,
2536 ) -> Result<flush::FlushResult> {
2537 let sections = self.build_sections();
2538 let section_refs: Vec<&dyn grafeo_common::storage::Section> =
2539 sections.iter().map(|s| s.as_ref()).collect();
2540 #[cfg(feature = "lpg")]
2541 let context = flush::build_context(self.lpg_store(), &self.transaction_manager);
2542 #[cfg(not(feature = "lpg"))]
2543 let context = flush::build_context_minimal(&self.transaction_manager);
2544
2545 flush::flush(
2546 fm,
2547 §ion_refs,
2548 &context,
2549 reason,
2550 #[cfg(feature = "wal")]
2551 self.wal.as_deref(),
2552 )
2553 }
2554
2555 #[cfg(feature = "grafeo-file")]
2557 #[must_use]
2558 pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
2559 self.file_manager.as_ref()
2560 }
2561}
2562
2563impl Drop for GrafeoDB {
2564 fn drop(&mut self) {
2565 if let Err(e) = self.close() {
2566 grafeo_error!("Error closing database: {}", e);
2567 }
2568 }
2569}
2570
2571#[cfg(feature = "lpg")]
2572impl crate::admin::AdminService for GrafeoDB {
2573 fn info(&self) -> crate::admin::DatabaseInfo {
2574 self.info()
2575 }
2576
2577 fn detailed_stats(&self) -> crate::admin::DatabaseStats {
2578 self.detailed_stats()
2579 }
2580
2581 fn schema(&self) -> crate::admin::SchemaInfo {
2582 self.schema()
2583 }
2584
2585 fn validate(&self) -> crate::admin::ValidationResult {
2586 self.validate()
2587 }
2588
2589 fn wal_status(&self) -> crate::admin::WalStatus {
2590 self.wal_status()
2591 }
2592
2593 fn wal_checkpoint(&self) -> Result<()> {
2594 self.wal_checkpoint()
2595 }
2596}
2597
2598#[derive(Debug)]
2628pub struct QueryResult {
2629 pub columns: Vec<String>,
2631 pub column_types: Vec<grafeo_common::types::LogicalType>,
2633 pub(crate) rows: Vec<Vec<grafeo_common::types::Value>>,
2638 pub execution_time_ms: Option<f64>,
2640 pub rows_scanned: Option<u64>,
2642 pub status_message: Option<String>,
2644 pub gql_status: grafeo_common::utils::GqlStatus,
2646}
2647
2648impl QueryResult {
2649 #[must_use]
2651 pub fn empty() -> Self {
2652 Self {
2653 columns: Vec::new(),
2654 column_types: Vec::new(),
2655 rows: Vec::new(),
2656 execution_time_ms: None,
2657 rows_scanned: None,
2658 status_message: None,
2659 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2660 }
2661 }
2662
2663 #[must_use]
2665 pub fn status(msg: impl Into<String>) -> Self {
2666 Self {
2667 columns: Vec::new(),
2668 column_types: Vec::new(),
2669 rows: Vec::new(),
2670 execution_time_ms: None,
2671 rows_scanned: None,
2672 status_message: Some(msg.into()),
2673 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2674 }
2675 }
2676
2677 #[must_use]
2679 pub fn new(columns: Vec<String>) -> Self {
2680 let len = columns.len();
2681 Self {
2682 columns,
2683 column_types: vec![grafeo_common::types::LogicalType::Any; len],
2684 rows: Vec::new(),
2685 execution_time_ms: None,
2686 rows_scanned: None,
2687 status_message: None,
2688 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2689 }
2690 }
2691
2692 #[must_use]
2694 pub fn with_types(
2695 columns: Vec<String>,
2696 column_types: Vec<grafeo_common::types::LogicalType>,
2697 ) -> Self {
2698 Self {
2699 columns,
2700 column_types,
2701 rows: Vec::new(),
2702 execution_time_ms: None,
2703 rows_scanned: None,
2704 status_message: None,
2705 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2706 }
2707 }
2708
2709 #[must_use]
2711 pub fn from_rows(columns: Vec<String>, rows: Vec<Vec<grafeo_common::types::Value>>) -> Self {
2712 let len = columns.len();
2713 Self {
2714 columns,
2715 column_types: vec![grafeo_common::types::LogicalType::Any; len],
2716 rows,
2717 execution_time_ms: None,
2718 rows_scanned: None,
2719 status_message: None,
2720 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2721 }
2722 }
2723
2724 pub fn push_row(&mut self, row: Vec<grafeo_common::types::Value>) {
2726 self.rows.push(row);
2727 }
2728
2729 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
2731 self.execution_time_ms = Some(execution_time_ms);
2732 self.rows_scanned = Some(rows_scanned);
2733 self
2734 }
2735
2736 #[must_use]
2738 pub fn execution_time_ms(&self) -> Option<f64> {
2739 self.execution_time_ms
2740 }
2741
2742 #[must_use]
2744 pub fn rows_scanned(&self) -> Option<u64> {
2745 self.rows_scanned
2746 }
2747
2748 #[must_use]
2750 pub fn row_count(&self) -> usize {
2751 self.rows.len()
2752 }
2753
2754 #[must_use]
2756 pub fn column_count(&self) -> usize {
2757 self.columns.len()
2758 }
2759
2760 #[must_use]
2762 pub fn is_empty(&self) -> bool {
2763 self.rows.is_empty()
2764 }
2765
2766 pub fn scalar<T: FromValue>(&self) -> Result<T> {
2775 if self.rows.len() != 1 || self.columns.len() != 1 {
2776 return Err(grafeo_common::utils::error::Error::InvalidValue(
2777 "Expected single value".to_string(),
2778 ));
2779 }
2780 T::from_value(&self.rows[0][0])
2781 }
2782
2783 #[must_use]
2785 pub fn rows(&self) -> &[Vec<grafeo_common::types::Value>] {
2786 &self.rows
2787 }
2788
2789 #[must_use]
2791 pub fn into_rows(self) -> Vec<Vec<grafeo_common::types::Value>> {
2792 self.rows
2793 }
2794
2795 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
2797 self.rows.iter()
2798 }
2799
2800 #[cfg(feature = "arrow-export")]
2815 pub fn to_record_batch(
2816 &self,
2817 ) -> std::result::Result<arrow_array::RecordBatch, arrow::ArrowExportError> {
2818 arrow::query_result_to_record_batch(&self.columns, &self.column_types, &self.rows)
2819 }
2820
2821 #[cfg(feature = "arrow-export")]
2832 pub fn to_arrow_ipc(&self) -> std::result::Result<Vec<u8>, arrow::ArrowExportError> {
2833 let batch = self.to_record_batch()?;
2834 arrow::record_batch_to_ipc_stream(&batch)
2835 }
2836}
2837
2838impl std::fmt::Display for QueryResult {
2839 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2840 let table = grafeo_common::fmt::format_result_table(
2841 &self.columns,
2842 &self.rows,
2843 self.execution_time_ms,
2844 self.status_message.as_deref(),
2845 );
2846 f.write_str(&table)
2847 }
2848}
2849
2850pub trait FromValue: Sized {
2855 fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
2861}
2862
2863impl FromValue for i64 {
2864 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2865 value
2866 .as_int64()
2867 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2868 expected: "INT64".to_string(),
2869 found: value.type_name().to_string(),
2870 })
2871 }
2872}
2873
2874impl FromValue for f64 {
2875 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2876 value
2877 .as_float64()
2878 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2879 expected: "FLOAT64".to_string(),
2880 found: value.type_name().to_string(),
2881 })
2882 }
2883}
2884
2885impl FromValue for String {
2886 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2887 value.as_str().map(String::from).ok_or_else(|| {
2888 grafeo_common::utils::error::Error::TypeMismatch {
2889 expected: "STRING".to_string(),
2890 found: value.type_name().to_string(),
2891 }
2892 })
2893 }
2894}
2895
2896impl FromValue for bool {
2897 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2898 value
2899 .as_bool()
2900 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2901 expected: "BOOL".to_string(),
2902 found: value.type_name().to_string(),
2903 })
2904 }
2905}
2906
2907#[cfg(test)]
2908mod tests {
2909 use super::*;
2910
2911 #[test]
2912 fn test_create_in_memory_database() {
2913 let db = GrafeoDB::new_in_memory();
2914 assert_eq!(db.node_count(), 0);
2915 assert_eq!(db.edge_count(), 0);
2916 }
2917
2918 #[test]
2919 fn test_database_config() {
2920 let config = Config::in_memory().with_threads(4).with_query_logging();
2921
2922 let db = GrafeoDB::with_config(config).unwrap();
2923 assert_eq!(db.config().threads, 4);
2924 assert!(db.config().query_logging);
2925 }
2926
2927 #[test]
2928 fn test_database_session() {
2929 let db = GrafeoDB::new_in_memory();
2930 let _session = db.session();
2931 }
2933
2934 #[cfg(feature = "wal")]
2935 #[test]
2936 fn test_persistent_database_recovery() {
2937 use grafeo_common::types::Value;
2938 use tempfile::tempdir;
2939
2940 let dir = tempdir().unwrap();
2941 let db_path = dir.path().join("test_db");
2942
2943 {
2945 let db = GrafeoDB::open(&db_path).unwrap();
2946
2947 let alix = db.create_node(&["Person"]);
2948 db.set_node_property(alix, "name", Value::from("Alix"));
2949
2950 let gus = db.create_node(&["Person"]);
2951 db.set_node_property(gus, "name", Value::from("Gus"));
2952
2953 let _edge = db.create_edge(alix, gus, "KNOWS");
2954
2955 db.close().unwrap();
2957 }
2958
2959 {
2961 let db = GrafeoDB::open(&db_path).unwrap();
2962
2963 assert_eq!(db.node_count(), 2);
2964 assert_eq!(db.edge_count(), 1);
2965
2966 let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
2968 assert!(node0.is_some());
2969
2970 let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
2971 assert!(node1.is_some());
2972 }
2973 }
2974
2975 #[cfg(feature = "wal")]
2976 #[test]
2977 fn test_wal_logging() {
2978 use tempfile::tempdir;
2979
2980 let dir = tempdir().unwrap();
2981 let db_path = dir.path().join("wal_test_db");
2982
2983 let db = GrafeoDB::open(&db_path).unwrap();
2984
2985 let node = db.create_node(&["Test"]);
2987 db.delete_node(node);
2988
2989 if let Some(wal) = db.wal() {
2991 assert!(wal.record_count() > 0);
2992 }
2993
2994 db.close().unwrap();
2995 }
2996
2997 #[cfg(feature = "wal")]
2998 #[test]
2999 fn test_wal_recovery_multiple_sessions() {
3000 use grafeo_common::types::Value;
3002 use tempfile::tempdir;
3003
3004 let dir = tempdir().unwrap();
3005 let db_path = dir.path().join("multi_session_db");
3006
3007 {
3009 let db = GrafeoDB::open(&db_path).unwrap();
3010 let alix = db.create_node(&["Person"]);
3011 db.set_node_property(alix, "name", Value::from("Alix"));
3012 db.close().unwrap();
3013 }
3014
3015 {
3017 let db = GrafeoDB::open(&db_path).unwrap();
3018 assert_eq!(db.node_count(), 1); let gus = db.create_node(&["Person"]);
3020 db.set_node_property(gus, "name", Value::from("Gus"));
3021 db.close().unwrap();
3022 }
3023
3024 {
3026 let db = GrafeoDB::open(&db_path).unwrap();
3027 assert_eq!(db.node_count(), 2);
3028
3029 let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
3031 assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
3032
3033 let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
3034 assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
3035 }
3036 }
3037
3038 #[cfg(feature = "wal")]
3039 #[test]
3040 fn test_database_consistency_after_mutations() {
3041 use grafeo_common::types::Value;
3043 use tempfile::tempdir;
3044
3045 let dir = tempdir().unwrap();
3046 let db_path = dir.path().join("consistency_db");
3047
3048 {
3049 let db = GrafeoDB::open(&db_path).unwrap();
3050
3051 let a = db.create_node(&["Node"]);
3053 let b = db.create_node(&["Node"]);
3054 let c = db.create_node(&["Node"]);
3055
3056 let e1 = db.create_edge(a, b, "LINKS");
3058 let _e2 = db.create_edge(b, c, "LINKS");
3059
3060 db.delete_edge(e1);
3062 db.delete_node(b);
3063
3064 db.set_node_property(a, "value", Value::Int64(1));
3066 db.set_node_property(c, "value", Value::Int64(3));
3067
3068 db.close().unwrap();
3069 }
3070
3071 {
3073 let db = GrafeoDB::open(&db_path).unwrap();
3074
3075 let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
3079 assert!(node_a.is_some());
3080
3081 let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
3082 assert!(node_c.is_some());
3083
3084 let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
3086 assert!(node_b.is_none());
3087 }
3088 }
3089
3090 #[cfg(feature = "wal")]
3091 #[test]
3092 fn test_close_is_idempotent() {
3093 use tempfile::tempdir;
3095
3096 let dir = tempdir().unwrap();
3097 let db_path = dir.path().join("close_test_db");
3098
3099 let db = GrafeoDB::open(&db_path).unwrap();
3100 db.create_node(&["Test"]);
3101
3102 assert!(db.close().is_ok());
3104
3105 assert!(db.close().is_ok());
3107 }
3108
3109 #[test]
3110 fn test_with_store_external_backend() {
3111 use grafeo_core::graph::lpg::LpgStore;
3112
3113 let external = Arc::new(LpgStore::new().unwrap());
3114
3115 let n1 = external.create_node(&["Person"]);
3117 external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
3118
3119 let db = GrafeoDB::with_store(
3120 Arc::clone(&external) as Arc<dyn GraphStoreMut>,
3121 Config::in_memory(),
3122 )
3123 .unwrap();
3124
3125 let session = db.session();
3126
3127 #[cfg(feature = "gql")]
3129 {
3130 let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
3131 assert_eq!(result.rows.len(), 1);
3132 }
3133 }
3134
3135 #[test]
3136 fn test_with_config_custom_memory_limit() {
3137 let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); let db = GrafeoDB::with_config(config).unwrap();
3140 assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
3141 assert_eq!(db.node_count(), 0);
3142 }
3143
3144 #[cfg(feature = "metrics")]
3145 #[test]
3146 fn test_database_metrics_registry() {
3147 let db = GrafeoDB::new_in_memory();
3148
3149 db.create_node(&["Person"]);
3151 db.create_node(&["Person"]);
3152
3153 let snap = db.metrics();
3155 assert_eq!(snap.query_count, 0); }
3158
3159 #[test]
3160 fn test_query_result_has_metrics() {
3161 let db = GrafeoDB::new_in_memory();
3163 db.create_node(&["Person"]);
3164 db.create_node(&["Person"]);
3165
3166 #[cfg(feature = "gql")]
3167 {
3168 let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
3169
3170 assert!(result.execution_time_ms.is_some());
3172 assert!(result.rows_scanned.is_some());
3173 assert!(result.execution_time_ms.unwrap() >= 0.0);
3174 assert_eq!(result.rows_scanned.unwrap(), 2);
3175 }
3176 }
3177
3178 #[test]
3179 fn test_empty_query_result_metrics() {
3180 let db = GrafeoDB::new_in_memory();
3182 db.create_node(&["Person"]);
3183
3184 #[cfg(feature = "gql")]
3185 {
3186 let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
3188
3189 assert!(result.execution_time_ms.is_some());
3190 assert!(result.rows_scanned.is_some());
3191 assert_eq!(result.rows_scanned.unwrap(), 0);
3192 }
3193 }
3194
3195 #[cfg(feature = "cdc")]
3196 mod cdc_integration {
3197 use super::*;
3198
3199 fn cdc_db() -> GrafeoDB {
3201 GrafeoDB::with_config(Config::in_memory().with_cdc()).unwrap()
3202 }
3203
3204 #[test]
3205 fn test_node_lifecycle_history() {
3206 let db = cdc_db();
3207
3208 let id = db.create_node(&["Person"]);
3210 db.set_node_property(id, "name", "Alix".into());
3212 db.set_node_property(id, "name", "Gus".into());
3213 db.delete_node(id);
3215
3216 let history = db.history(id).unwrap();
3217 assert_eq!(history.len(), 4); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3219 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
3220 assert!(history[1].before.is_none()); assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
3222 assert!(history[2].before.is_some()); assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
3224 }
3225
3226 #[test]
3227 fn test_edge_lifecycle_history() {
3228 let db = cdc_db();
3229
3230 let alix = db.create_node(&["Person"]);
3231 let gus = db.create_node(&["Person"]);
3232 let edge = db.create_edge(alix, gus, "KNOWS");
3233 db.set_edge_property(edge, "since", 2024i64.into());
3234 db.delete_edge(edge);
3235
3236 let history = db.history(edge).unwrap();
3237 assert_eq!(history.len(), 3); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3239 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
3240 assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
3241 }
3242
3243 #[test]
3244 fn test_create_node_with_props_cdc() {
3245 let db = cdc_db();
3246
3247 let id = db.create_node_with_props(
3248 &["Person"],
3249 vec![
3250 ("name", grafeo_common::types::Value::from("Alix")),
3251 ("age", grafeo_common::types::Value::from(30i64)),
3252 ],
3253 );
3254
3255 let history = db.history(id).unwrap();
3256 assert_eq!(history.len(), 1);
3257 assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3258 let after = history[0].after.as_ref().unwrap();
3260 assert_eq!(after.len(), 2);
3261 }
3262
3263 #[test]
3264 fn test_changes_between() {
3265 let db = cdc_db();
3266
3267 let id1 = db.create_node(&["A"]);
3268 let _id2 = db.create_node(&["B"]);
3269 db.set_node_property(id1, "x", 1i64.into());
3270
3271 let changes = db
3273 .changes_between(
3274 grafeo_common::types::EpochId(0),
3275 grafeo_common::types::EpochId(u64::MAX),
3276 )
3277 .unwrap();
3278 assert_eq!(changes.len(), 3); }
3280
3281 #[test]
3282 fn test_cdc_disabled_by_default() {
3283 let db = GrafeoDB::new_in_memory();
3284 assert!(!db.is_cdc_enabled());
3285
3286 let id = db.create_node(&["Person"]);
3287 db.set_node_property(id, "name", "Alix".into());
3288
3289 let history = db.history(id).unwrap();
3290 assert!(history.is_empty(), "CDC off by default: no events recorded");
3291 }
3292
3293 #[test]
3294 fn test_session_with_cdc_override_on() {
3295 let db = GrafeoDB::new_in_memory();
3297 let session = db.session_with_cdc(true);
3298 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3299 let changes = db
3301 .changes_between(
3302 grafeo_common::types::EpochId(0),
3303 grafeo_common::types::EpochId(u64::MAX),
3304 )
3305 .unwrap();
3306 assert!(
3307 !changes.is_empty(),
3308 "session_with_cdc(true) should record events"
3309 );
3310 }
3311
3312 #[test]
3313 fn test_session_with_cdc_override_off() {
3314 let db = cdc_db();
3316 let session = db.session_with_cdc(false);
3317 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3318 let changes = db
3319 .changes_between(
3320 grafeo_common::types::EpochId(0),
3321 grafeo_common::types::EpochId(u64::MAX),
3322 )
3323 .unwrap();
3324 assert!(
3325 changes.is_empty(),
3326 "session_with_cdc(false) should not record events"
3327 );
3328 }
3329
3330 #[test]
3331 fn test_set_cdc_enabled_runtime() {
3332 let db = GrafeoDB::new_in_memory();
3333 assert!(!db.is_cdc_enabled());
3334
3335 db.set_cdc_enabled(true);
3337 assert!(db.is_cdc_enabled());
3338
3339 let id = db.create_node(&["Person"]);
3340 let history = db.history(id).unwrap();
3341 assert_eq!(history.len(), 1, "CDC enabled at runtime records events");
3342
3343 db.set_cdc_enabled(false);
3345 let id2 = db.create_node(&["Person"]);
3346 let history2 = db.history(id2).unwrap();
3347 assert!(
3348 history2.is_empty(),
3349 "CDC disabled at runtime stops recording"
3350 );
3351 }
3352 }
3353
3354 #[test]
3355 fn test_with_store_basic() {
3356 use grafeo_core::graph::lpg::LpgStore;
3357
3358 let store = Arc::new(LpgStore::new().unwrap());
3359 let n1 = store.create_node(&["Person"]);
3360 store.set_node_property(n1, "name", "Alix".into());
3361
3362 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3363 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3364
3365 let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
3366 assert_eq!(result.rows.len(), 1);
3367 }
3368
3369 #[test]
3370 fn test_with_store_session() {
3371 use grafeo_core::graph::lpg::LpgStore;
3372
3373 let store = Arc::new(LpgStore::new().unwrap());
3374 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3375 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3376
3377 let session = db.session();
3378 let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
3379 assert_eq!(result.rows.len(), 1);
3380 }
3381
3382 #[test]
3383 fn test_with_store_mutations() {
3384 use grafeo_core::graph::lpg::LpgStore;
3385
3386 let store = Arc::new(LpgStore::new().unwrap());
3387 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3388 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3389
3390 let mut session = db.session();
3391
3392 session.begin_transaction().unwrap();
3396 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3397
3398 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
3399 assert_eq!(result.rows.len(), 1);
3400
3401 session.commit().unwrap();
3402 }
3403
3404 #[test]
3409 fn test_query_result_empty() {
3410 let result = QueryResult::empty();
3411 assert!(result.is_empty());
3412 assert_eq!(result.row_count(), 0);
3413 assert_eq!(result.column_count(), 0);
3414 assert!(result.execution_time_ms().is_none());
3415 assert!(result.rows_scanned().is_none());
3416 assert!(result.status_message.is_none());
3417 }
3418
3419 #[test]
3420 fn test_query_result_status() {
3421 let result = QueryResult::status("Created node type 'Person'");
3422 assert!(result.is_empty());
3423 assert_eq!(result.column_count(), 0);
3424 assert_eq!(
3425 result.status_message.as_deref(),
3426 Some("Created node type 'Person'")
3427 );
3428 }
3429
3430 #[test]
3431 fn test_query_result_new_with_columns() {
3432 let result = QueryResult::new(vec!["name".into(), "age".into()]);
3433 assert_eq!(result.column_count(), 2);
3434 assert_eq!(result.row_count(), 0);
3435 assert!(result.is_empty());
3436 assert_eq!(
3438 result.column_types,
3439 vec![
3440 grafeo_common::types::LogicalType::Any,
3441 grafeo_common::types::LogicalType::Any
3442 ]
3443 );
3444 }
3445
3446 #[test]
3447 fn test_query_result_with_types() {
3448 use grafeo_common::types::LogicalType;
3449 let result = QueryResult::with_types(
3450 vec!["name".into(), "age".into()],
3451 vec![LogicalType::String, LogicalType::Int64],
3452 );
3453 assert_eq!(result.column_count(), 2);
3454 assert_eq!(result.column_types[0], LogicalType::String);
3455 assert_eq!(result.column_types[1], LogicalType::Int64);
3456 }
3457
3458 #[test]
3459 fn test_query_result_with_metrics() {
3460 let result = QueryResult::new(vec!["x".into()]).with_metrics(42.5, 100);
3461 assert_eq!(result.execution_time_ms(), Some(42.5));
3462 assert_eq!(result.rows_scanned(), Some(100));
3463 }
3464
3465 #[test]
3466 fn test_query_result_scalar_success() {
3467 use grafeo_common::types::Value;
3468 let mut result = QueryResult::new(vec!["count".into()]);
3469 result.rows.push(vec![Value::Int64(42)]);
3470
3471 let val: i64 = result.scalar().unwrap();
3472 assert_eq!(val, 42);
3473 }
3474
3475 #[test]
3476 fn test_query_result_scalar_wrong_shape() {
3477 use grafeo_common::types::Value;
3478 let mut result = QueryResult::new(vec!["x".into()]);
3480 result.rows.push(vec![Value::Int64(1)]);
3481 result.rows.push(vec![Value::Int64(2)]);
3482 assert!(result.scalar::<i64>().is_err());
3483
3484 let mut result2 = QueryResult::new(vec!["a".into(), "b".into()]);
3486 result2.rows.push(vec![Value::Int64(1), Value::Int64(2)]);
3487 assert!(result2.scalar::<i64>().is_err());
3488
3489 let result3 = QueryResult::new(vec!["x".into()]);
3491 assert!(result3.scalar::<i64>().is_err());
3492 }
3493
3494 #[test]
3495 fn test_query_result_iter() {
3496 use grafeo_common::types::Value;
3497 let mut result = QueryResult::new(vec!["x".into()]);
3498 result.rows.push(vec![Value::Int64(1)]);
3499 result.rows.push(vec![Value::Int64(2)]);
3500
3501 let collected: Vec<_> = result.iter().collect();
3502 assert_eq!(collected.len(), 2);
3503 }
3504
3505 #[test]
3506 fn test_query_result_display() {
3507 use grafeo_common::types::Value;
3508 let mut result = QueryResult::new(vec!["name".into()]);
3509 result.rows.push(vec![Value::from("Alix")]);
3510 let display = result.to_string();
3511 assert!(display.contains("name"));
3512 assert!(display.contains("Alix"));
3513 }
3514
3515 #[test]
3520 fn test_from_value_i64_type_mismatch() {
3521 use grafeo_common::types::Value;
3522 let val = Value::from("not a number");
3523 assert!(i64::from_value(&val).is_err());
3524 }
3525
3526 #[test]
3527 fn test_from_value_f64_type_mismatch() {
3528 use grafeo_common::types::Value;
3529 let val = Value::from("not a float");
3530 assert!(f64::from_value(&val).is_err());
3531 }
3532
3533 #[test]
3534 fn test_from_value_string_type_mismatch() {
3535 use grafeo_common::types::Value;
3536 let val = Value::Int64(42);
3537 assert!(String::from_value(&val).is_err());
3538 }
3539
3540 #[test]
3541 fn test_from_value_bool_type_mismatch() {
3542 use grafeo_common::types::Value;
3543 let val = Value::Int64(1);
3544 assert!(bool::from_value(&val).is_err());
3545 }
3546
3547 #[test]
3548 fn test_from_value_all_success() {
3549 use grafeo_common::types::Value;
3550 assert_eq!(i64::from_value(&Value::Int64(99)).unwrap(), 99);
3551 assert!((f64::from_value(&Value::Float64(2.72)).unwrap() - 2.72).abs() < f64::EPSILON);
3552 assert_eq!(String::from_value(&Value::from("hello")).unwrap(), "hello");
3553 assert!(bool::from_value(&Value::Bool(true)).unwrap());
3554 }
3555
3556 #[test]
3561 fn test_database_is_read_only_false_by_default() {
3562 let db = GrafeoDB::new_in_memory();
3563 assert!(!db.is_read_only());
3564 }
3565
3566 #[test]
3567 fn test_database_graph_model() {
3568 let db = GrafeoDB::new_in_memory();
3569 assert_eq!(db.graph_model(), crate::config::GraphModel::Lpg);
3570 }
3571
3572 #[test]
3573 fn test_database_memory_limit_none_by_default() {
3574 let db = GrafeoDB::new_in_memory();
3575 assert!(db.memory_limit().is_none());
3576 }
3577
3578 #[test]
3579 fn test_database_memory_limit_custom() {
3580 let config = Config::in_memory().with_memory_limit(128 * 1024 * 1024);
3581 let db = GrafeoDB::with_config(config).unwrap();
3582 assert_eq!(db.memory_limit(), Some(128 * 1024 * 1024));
3583 }
3584
3585 #[test]
3586 fn test_database_adaptive_config() {
3587 let db = GrafeoDB::new_in_memory();
3588 let adaptive = db.adaptive_config();
3589 assert!(adaptive.enabled);
3590 assert!((adaptive.threshold - 3.0).abs() < f64::EPSILON);
3591 }
3592
3593 #[test]
3594 fn test_database_buffer_manager() {
3595 let db = GrafeoDB::new_in_memory();
3596 let _bm = db.buffer_manager();
3597 }
3599
3600 #[test]
3601 fn test_database_query_cache() {
3602 let db = GrafeoDB::new_in_memory();
3603 let _qc = db.query_cache();
3604 }
3605
3606 #[test]
3607 fn test_database_clear_plan_cache() {
3608 let db = GrafeoDB::new_in_memory();
3609 #[cfg(feature = "gql")]
3611 {
3612 let _ = db.execute("MATCH (n) RETURN count(n)");
3613 }
3614 db.clear_plan_cache();
3615 }
3617
3618 #[test]
3619 fn test_database_gc() {
3620 let db = GrafeoDB::new_in_memory();
3621 db.create_node(&["Person"]);
3622 db.gc();
3623 assert_eq!(db.node_count(), 1);
3625 }
3626
3627 #[test]
3632 fn test_create_and_list_graphs() {
3633 let db = GrafeoDB::new_in_memory();
3634 let created = db.create_graph("social").unwrap();
3635 assert!(created);
3636
3637 let created_again = db.create_graph("social").unwrap();
3639 assert!(!created_again);
3640
3641 let names = db.list_graphs();
3642 assert!(names.contains(&"social".to_string()));
3643 }
3644
3645 #[test]
3646 fn test_drop_graph() {
3647 let db = GrafeoDB::new_in_memory();
3648 db.create_graph("temp").unwrap();
3649 assert!(db.drop_graph("temp"));
3650 assert!(!db.drop_graph("temp")); }
3652
3653 #[test]
3654 fn test_drop_graph_resets_current_graph() {
3655 let db = GrafeoDB::new_in_memory();
3656 db.create_graph("active").unwrap();
3657 db.set_current_graph(Some("active")).unwrap();
3658 assert_eq!(db.current_graph(), Some("active".to_string()));
3659
3660 db.drop_graph("active");
3661 assert_eq!(db.current_graph(), None);
3662 }
3663
3664 #[test]
3669 fn test_current_graph_default_none() {
3670 let db = GrafeoDB::new_in_memory();
3671 assert_eq!(db.current_graph(), None);
3672 }
3673
3674 #[test]
3675 fn test_set_current_graph_valid() {
3676 let db = GrafeoDB::new_in_memory();
3677 db.create_graph("social").unwrap();
3678 db.set_current_graph(Some("social")).unwrap();
3679 assert_eq!(db.current_graph(), Some("social".to_string()));
3680 }
3681
3682 #[test]
3683 fn test_set_current_graph_nonexistent() {
3684 let db = GrafeoDB::new_in_memory();
3685 let result = db.set_current_graph(Some("nonexistent"));
3686 assert!(result.is_err());
3687 }
3688
3689 #[test]
3690 fn test_set_current_graph_none_resets() {
3691 let db = GrafeoDB::new_in_memory();
3692 db.create_graph("social").unwrap();
3693 db.set_current_graph(Some("social")).unwrap();
3694 db.set_current_graph(None).unwrap();
3695 assert_eq!(db.current_graph(), None);
3696 }
3697
3698 #[test]
3699 fn test_set_current_graph_default_keyword() {
3700 let db = GrafeoDB::new_in_memory();
3701 db.set_current_graph(Some("default")).unwrap();
3703 assert_eq!(db.current_graph(), Some("default".to_string()));
3704 }
3705
3706 #[test]
3707 fn test_current_schema_default_none() {
3708 let db = GrafeoDB::new_in_memory();
3709 assert_eq!(db.current_schema(), None);
3710 }
3711
3712 #[test]
3713 fn test_set_current_schema_nonexistent() {
3714 let db = GrafeoDB::new_in_memory();
3715 let result = db.set_current_schema(Some("nonexistent"));
3716 assert!(result.is_err());
3717 }
3718
3719 #[test]
3720 fn test_set_current_schema_none_resets() {
3721 let db = GrafeoDB::new_in_memory();
3722 db.set_current_schema(None).unwrap();
3723 assert_eq!(db.current_schema(), None);
3724 }
3725
3726 #[test]
3731 fn test_graph_store_returns_lpg_by_default() {
3732 let db = GrafeoDB::new_in_memory();
3733 db.create_node(&["Person"]);
3734 let store = db.graph_store();
3735 assert_eq!(store.node_count(), 1);
3736 }
3737
3738 #[test]
3739 fn test_graph_store_mut_returns_some_by_default() {
3740 let db = GrafeoDB::new_in_memory();
3741 assert!(db.graph_store_mut().is_some());
3742 }
3743
3744 #[test]
3745 fn test_with_read_store() {
3746 use grafeo_core::graph::lpg::LpgStore;
3747
3748 let store = Arc::new(LpgStore::new().unwrap());
3749 store.create_node(&["Person"]);
3750
3751 let read_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
3752 let db = GrafeoDB::with_read_store(read_store, Config::in_memory()).unwrap();
3753
3754 assert!(db.is_read_only());
3755 assert!(db.graph_store_mut().is_none());
3756
3757 let gs = db.graph_store();
3759 assert_eq!(gs.node_count(), 1);
3760 }
3761
3762 #[test]
3763 fn test_with_store_graph_store_methods() {
3764 use grafeo_core::graph::lpg::LpgStore;
3765
3766 let store = Arc::new(LpgStore::new().unwrap());
3767 store.create_node(&["Person"]);
3768
3769 let db = GrafeoDB::with_store(
3770 Arc::clone(&store) as Arc<dyn GraphStoreMut>,
3771 Config::in_memory(),
3772 )
3773 .unwrap();
3774
3775 assert!(!db.is_read_only());
3776 assert!(db.graph_store_mut().is_some());
3777 assert_eq!(db.graph_store().node_count(), 1);
3778 }
3779
3780 #[test]
3785 #[allow(deprecated)]
3786 fn test_session_read_only() {
3787 let db = GrafeoDB::new_in_memory();
3788 db.create_node(&["Person"]);
3789
3790 let session = db.session_read_only();
3791 #[cfg(feature = "gql")]
3793 {
3794 let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
3795 assert_eq!(result.rows.len(), 1);
3796 }
3797 }
3798
3799 #[test]
3804 fn test_close_in_memory_database() {
3805 let db = GrafeoDB::new_in_memory();
3806 db.create_node(&["Person"]);
3807 assert!(db.close().is_ok());
3808 assert!(db.close().is_ok());
3810 }
3811
3812 #[test]
3817 fn test_with_config_invalid_config_zero_threads() {
3818 let config = Config::in_memory().with_threads(0);
3819 let result = GrafeoDB::with_config(config);
3820 assert!(result.is_err());
3821 }
3822
3823 #[test]
3824 fn test_with_config_invalid_config_zero_memory_limit() {
3825 let config = Config::in_memory().with_memory_limit(0);
3826 let result = GrafeoDB::with_config(config);
3827 assert!(result.is_err());
3828 }
3829
3830 #[test]
3835 fn test_storage_format_display() {
3836 use crate::config::StorageFormat;
3837 assert_eq!(StorageFormat::Auto.to_string(), "auto");
3838 assert_eq!(StorageFormat::WalDirectory.to_string(), "wal-directory");
3839 assert_eq!(StorageFormat::SingleFile.to_string(), "single-file");
3840 }
3841
3842 #[test]
3843 fn test_storage_format_default() {
3844 use crate::config::StorageFormat;
3845 assert_eq!(StorageFormat::default(), StorageFormat::Auto);
3846 }
3847
3848 #[test]
3849 fn test_config_with_storage_format() {
3850 use crate::config::StorageFormat;
3851 let config = Config::in_memory().with_storage_format(StorageFormat::SingleFile);
3852 assert_eq!(config.storage_format, StorageFormat::SingleFile);
3853 }
3854
3855 #[test]
3860 fn test_config_with_cdc() {
3861 let config = Config::in_memory().with_cdc();
3862 assert!(config.cdc_enabled);
3863 }
3864
3865 #[test]
3866 fn test_config_cdc_default_false() {
3867 let config = Config::in_memory();
3868 assert!(!config.cdc_enabled);
3869 }
3870
3871 #[test]
3876 fn test_config_error_is_error_trait() {
3877 use crate::config::ConfigError;
3878 let err: Box<dyn std::error::Error> = Box::new(ConfigError::ZeroMemoryLimit);
3879 assert!(err.source().is_none());
3880 }
3881
3882 #[cfg(feature = "metrics")]
3887 #[test]
3888 fn test_metrics_prometheus_output() {
3889 let db = GrafeoDB::new_in_memory();
3890 let prom = db.metrics_prometheus();
3891 assert!(!prom.is_empty());
3893 }
3894
3895 #[cfg(feature = "metrics")]
3896 #[test]
3897 fn test_reset_metrics() {
3898 let db = GrafeoDB::new_in_memory();
3899 let _session = db.session();
3901 db.reset_metrics();
3902 let snap = db.metrics();
3903 assert_eq!(snap.query_count, 0);
3904 }
3905
3906 #[test]
3911 fn test_drop_graph_on_external_store() {
3912 use grafeo_core::graph::lpg::LpgStore;
3913
3914 let store = Arc::new(LpgStore::new().unwrap());
3915 let read_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
3916 let db = GrafeoDB::with_read_store(read_store, Config::in_memory()).unwrap();
3917
3918 assert!(!db.drop_graph("anything"));
3920 }
3921}