1#[cfg(feature = "lpg")]
15mod admin;
16#[cfg(feature = "arrow-export")]
17pub mod arrow;
18#[cfg(all(feature = "async-storage", feature = "lpg"))]
19mod async_ops;
20#[cfg(all(feature = "async-storage", feature = "lpg"))]
21pub(crate) mod async_wal_store;
22#[cfg(all(feature = "wal", feature = "grafeo-file"))]
23pub mod backup;
24#[cfg(feature = "lpg")]
25pub(crate) mod catalog_section;
26#[cfg(feature = "cdc")]
27pub(crate) mod cdc_store;
28#[cfg(all(feature = "grafeo-file", feature = "lpg"))]
29mod checkpoint_timer;
30#[cfg(feature = "lpg")]
31mod crud;
32#[cfg(feature = "embed")]
33mod embed;
34#[cfg(feature = "grafeo-file")]
35pub(crate) mod flush;
36#[cfg(feature = "lpg")]
37mod import;
38#[cfg(feature = "lpg")]
39mod index;
40#[cfg(feature = "lpg")]
41mod persistence;
42mod query;
43#[cfg(feature = "triple-store")]
44mod rdf_ops;
45#[cfg(feature = "lpg")]
46mod search;
47pub(crate) mod section_consumer;
48#[cfg(all(feature = "wal", feature = "lpg"))]
49pub(crate) mod wal_store;
50
51use grafeo_common::{grafeo_error, grafeo_warn};
52#[cfg(feature = "wal")]
53use std::path::Path;
54use std::sync::Arc;
55use std::sync::atomic::AtomicUsize;
56
57use parking_lot::RwLock;
58
59use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
60use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind, Result};
61#[cfg(feature = "lpg")]
62use grafeo_core::graph::lpg::LpgStore;
63#[cfg(feature = "triple-store")]
64use grafeo_core::graph::rdf::RdfStore;
65use grafeo_core::graph::{GraphStore, GraphStoreMut};
66#[cfg(feature = "grafeo-file")]
67use grafeo_storage::file::GrafeoFileManager;
68#[cfg(all(feature = "wal", feature = "lpg"))]
69use grafeo_storage::wal::WalRecovery;
70#[cfg(feature = "wal")]
71use grafeo_storage::wal::{DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord};
72
73use crate::catalog::Catalog;
74use crate::config::Config;
75use crate::query::cache::QueryCache;
76use crate::session::Session;
77use crate::transaction::TransactionManager;
78
79pub struct GrafeoDB {
102 pub(super) config: Config,
104 #[cfg(feature = "lpg")]
106 pub(super) store: Option<Arc<LpgStore>>,
107 pub(super) catalog: Arc<Catalog>,
109 #[cfg(feature = "triple-store")]
111 pub(super) rdf_store: Arc<RdfStore>,
112 pub(super) transaction_manager: Arc<TransactionManager>,
114 pub(super) buffer_manager: Arc<BufferManager>,
116 #[cfg(feature = "wal")]
118 pub(super) wal: Option<Arc<LpgWal>>,
119 #[cfg(feature = "wal")]
123 pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
124 pub(super) query_cache: Arc<QueryCache>,
126 pub(super) commit_counter: Arc<AtomicUsize>,
128 pub(super) is_open: RwLock<bool>,
130 #[cfg(feature = "cdc")]
132 pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
133 #[cfg(feature = "cdc")]
135 cdc_enabled: std::sync::atomic::AtomicBool,
136 #[cfg(feature = "embed")]
138 pub(super) embedding_models:
139 RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
140 #[cfg(feature = "grafeo-file")]
142 pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
143 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
146 checkpoint_timer: parking_lot::Mutex<Option<checkpoint_timer::CheckpointTimer>>,
147 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
150 vector_spill_storages: Option<
151 Arc<
152 parking_lot::RwLock<
153 std::collections::HashMap<String, Arc<grafeo_core::index::vector::MmapStorage>>,
154 >,
155 >,
156 >,
157 pub(super) external_read_store: Option<Arc<dyn GraphStore>>,
160 pub(super) external_write_store: Option<Arc<dyn GraphStoreMut>>,
163 #[cfg(feature = "metrics")]
165 pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
166 current_graph: RwLock<Option<String>>,
170 current_schema: RwLock<Option<String>>,
174 read_only: bool,
177 projections:
179 Arc<RwLock<std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>>>,
180 #[cfg(all(feature = "compact-store", feature = "lpg"))]
182 layered_store: Option<Arc<grafeo_core::graph::compact::layered::LayeredStore>>,
183}
184
185impl GrafeoDB {
186 #[cfg(feature = "lpg")]
194 fn lpg_store(&self) -> &Arc<LpgStore> {
195 self.store.as_ref().expect(
196 "no built-in LpgStore: this GrafeoDB was created with an external store \
197 (with_store / with_read_store). Use session() or graph_store() instead.",
198 )
199 }
200
201 #[cfg(feature = "cdc")]
203 #[inline]
204 pub(super) fn cdc_active(&self) -> bool {
205 self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed)
206 }
207
208 #[must_use]
229 pub fn new_in_memory() -> Self {
230 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
231 }
232
233 #[cfg(feature = "wal")]
252 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
253 Self::with_config(Config::persistent(path.as_ref()))
254 }
255
256 #[cfg(feature = "grafeo-file")]
281 pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
282 Self::with_config(Config::read_only(path.as_ref()))
283 }
284
285 pub fn with_config(config: Config) -> Result<Self> {
309 config
311 .validate()
312 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
313
314 #[cfg(feature = "lpg")]
315 let store = Arc::new(LpgStore::new()?);
316 #[cfg(feature = "triple-store")]
317 let rdf_store = Arc::new(RdfStore::new());
318 let transaction_manager = Arc::new(TransactionManager::new());
319
320 let buffer_config = BufferManagerConfig {
322 budget: config.memory_limit.unwrap_or_else(|| {
323 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
325 let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
326 b
327 }),
328 spill_path: config.spill_path.clone().or_else(|| {
329 config.path.as_ref().and_then(|p| {
330 let parent = p.parent()?;
331 let name = p.file_name()?.to_str()?;
332 Some(parent.join(format!("{name}.spill")))
333 })
334 }),
335 ..BufferManagerConfig::default()
336 };
337 let buffer_manager = BufferManager::new(buffer_config);
338
339 let catalog = Arc::new(Catalog::new());
341
342 let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;
343
344 #[cfg(feature = "grafeo-file")]
346 let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
347 if let Some(ref db_path) = config.path {
349 if db_path.exists() && db_path.is_file() {
350 let fm = GrafeoFileManager::open_read_only(db_path)?;
351 #[cfg(feature = "lpg")]
353 if fm.read_section_directory()?.is_some() {
354 Self::load_from_sections(
355 &fm,
356 &store,
357 &catalog,
358 #[cfg(feature = "triple-store")]
359 &rdf_store,
360 )?;
361 } else {
362 let snapshot_data = fm.read_snapshot()?;
364 if !snapshot_data.is_empty() {
365 Self::apply_snapshot_data(
366 &store,
367 &catalog,
368 #[cfg(feature = "triple-store")]
369 &rdf_store,
370 &snapshot_data,
371 )?;
372 }
373 }
374 Some(Arc::new(fm))
375 } else {
376 return Err(grafeo_common::utils::error::Error::Internal(format!(
377 "read-only open requires an existing .grafeo file: {}",
378 db_path.display()
379 )));
380 }
381 } else {
382 return Err(grafeo_common::utils::error::Error::Internal(
383 "read-only mode requires a database path".to_string(),
384 ));
385 }
386 } else if let Some(ref db_path) = config.path {
387 if Self::should_use_single_file(db_path, config.storage_format) {
392 let fm = if db_path.exists() && db_path.is_file() {
393 GrafeoFileManager::open(db_path)?
394 } else if !db_path.exists() {
395 GrafeoFileManager::create(db_path)?
396 } else {
397 return Err(grafeo_common::utils::error::Error::Internal(format!(
399 "path exists but is not a file: {}",
400 db_path.display()
401 )));
402 };
403
404 #[cfg(feature = "lpg")]
406 if fm.read_section_directory()?.is_some() {
407 Self::load_from_sections(
408 &fm,
409 &store,
410 &catalog,
411 #[cfg(feature = "triple-store")]
412 &rdf_store,
413 )?;
414 } else {
415 let snapshot_data = fm.read_snapshot()?;
416 if !snapshot_data.is_empty() {
417 Self::apply_snapshot_data(
418 &store,
419 &catalog,
420 #[cfg(feature = "triple-store")]
421 &rdf_store,
422 &snapshot_data,
423 )?;
424 }
425 }
426
427 #[cfg(all(feature = "wal", feature = "lpg"))]
429 if config.wal_enabled && fm.has_sidecar_wal() {
430 let recovery = WalRecovery::new(fm.sidecar_wal_path());
431 let records = recovery.recover()?;
432 Self::apply_wal_records(
433 &store,
434 &catalog,
435 #[cfg(feature = "triple-store")]
436 &rdf_store,
437 &records,
438 )?;
439 }
440
441 Some(Arc::new(fm))
442 } else {
443 None
444 }
445 } else {
446 None
447 };
448
449 #[cfg(feature = "wal")]
452 let wal = if is_read_only {
453 None
454 } else if config.wal_enabled {
455 if let Some(ref db_path) = config.path {
456 #[cfg(feature = "grafeo-file")]
458 let wal_path = if let Some(ref fm) = file_manager {
459 let p = fm.sidecar_wal_path();
460 std::fs::create_dir_all(&p)?;
461 p
462 } else {
463 std::fs::create_dir_all(db_path)?;
465 db_path.join("wal")
466 };
467
468 #[cfg(not(feature = "grafeo-file"))]
469 let wal_path = {
470 std::fs::create_dir_all(db_path)?;
471 db_path.join("wal")
472 };
473
474 #[cfg(all(feature = "lpg", feature = "grafeo-file"))]
476 let is_single_file = file_manager.is_some();
477 #[cfg(all(feature = "lpg", not(feature = "grafeo-file")))]
478 let is_single_file = false;
479
480 #[cfg(feature = "lpg")]
481 if !is_single_file && wal_path.exists() {
482 let recovery = WalRecovery::new(&wal_path);
483 let records = recovery.recover()?;
484 Self::apply_wal_records(
485 &store,
486 &catalog,
487 #[cfg(feature = "triple-store")]
488 &rdf_store,
489 &records,
490 )?;
491 }
492
493 let wal_durability = match config.wal_durability {
495 crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
496 crate::config::DurabilityMode::Batch {
497 max_delay_ms,
498 max_records,
499 } => WalDurabilityMode::Batch {
500 max_delay_ms,
501 max_records,
502 },
503 crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
504 WalDurabilityMode::Adaptive { target_interval_ms }
505 }
506 crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
507 };
508 let wal_config = WalConfig {
509 durability: wal_durability,
510 ..WalConfig::default()
511 };
512 let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
513 Some(Arc::new(wal_manager))
514 } else {
515 None
516 }
517 } else {
518 None
519 };
520
521 let query_cache = Arc::new(QueryCache::default());
523
524 #[cfg(all(feature = "temporal", feature = "lpg"))]
527 transaction_manager.sync_epoch(store.current_epoch());
528
529 #[cfg(feature = "cdc")]
530 let cdc_enabled_val = config.cdc_enabled;
531 #[cfg(feature = "cdc")]
532 let cdc_retention = config.cdc_retention.clone();
533
534 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
537 let checkpoint_interval = config.checkpoint_interval;
538 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
539 let timer_store = Arc::clone(&store);
540 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
541 let timer_catalog = Arc::clone(&catalog);
542 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
543 let timer_tm = Arc::clone(&transaction_manager);
544 #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "triple-store"))]
545 let timer_rdf = Arc::clone(&rdf_store);
546 #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "wal"))]
547 let timer_wal = wal.clone();
548
549 let mut db = Self {
550 config,
551 #[cfg(feature = "lpg")]
552 store: Some(store),
553 catalog,
554 #[cfg(feature = "triple-store")]
555 rdf_store,
556 transaction_manager,
557 buffer_manager,
558 #[cfg(feature = "wal")]
559 wal,
560 #[cfg(feature = "wal")]
561 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
562 query_cache,
563 commit_counter: Arc::new(AtomicUsize::new(0)),
564 is_open: RwLock::new(true),
565 #[cfg(feature = "cdc")]
566 cdc_log: Arc::new(crate::cdc::CdcLog::with_retention(cdc_retention)),
567 #[cfg(feature = "cdc")]
568 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
569 #[cfg(feature = "embed")]
570 embedding_models: RwLock::new(hashbrown::HashMap::new()),
571 #[cfg(feature = "grafeo-file")]
572 file_manager,
573 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
574 checkpoint_timer: parking_lot::Mutex::new(None),
575 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
576 vector_spill_storages: None,
577 external_read_store: None,
578 external_write_store: None,
579 #[cfg(feature = "metrics")]
580 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
581 current_graph: RwLock::new(None),
582 current_schema: RwLock::new(None),
583 read_only: is_read_only,
584 projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
585 #[cfg(all(feature = "compact-store", feature = "lpg"))]
586 layered_store: None,
587 };
588
589 db.register_section_consumers();
591
592 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
594 if let (Some(interval), Some(fm)) = (checkpoint_interval, &db.file_manager)
595 && !is_read_only
596 {
597 *db.checkpoint_timer.lock() = Some(checkpoint_timer::CheckpointTimer::start(
598 interval,
599 Arc::clone(fm),
600 timer_store,
601 timer_catalog,
602 timer_tm,
603 #[cfg(feature = "triple-store")]
604 timer_rdf,
605 #[cfg(feature = "wal")]
606 timer_wal,
607 ));
608 }
609
610 #[cfg(all(
614 feature = "lpg",
615 feature = "vector-index",
616 feature = "mmap",
617 not(feature = "temporal")
618 ))]
619 db.restore_spill_files();
620
621 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
624 if db
625 .config
626 .section_configs
627 .get(&grafeo_common::storage::SectionType::VectorStore)
628 .is_some_and(|c| c.tier == grafeo_common::storage::TierOverride::ForceDisk)
629 {
630 db.buffer_manager.spill_all();
631 }
632
633 Ok(db)
634 }
635
636 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
665 config
666 .validate()
667 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
668
669 let transaction_manager = Arc::new(TransactionManager::new());
670
671 let buffer_config = BufferManagerConfig {
672 budget: config.memory_limit.unwrap_or_else(|| {
673 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
675 let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
676 b
677 }),
678 spill_path: None,
679 ..BufferManagerConfig::default()
680 };
681 let buffer_manager = BufferManager::new(buffer_config);
682
683 let query_cache = Arc::new(QueryCache::default());
684
685 #[cfg(feature = "cdc")]
686 let cdc_enabled_val = config.cdc_enabled;
687
688 Ok(Self {
689 config,
690 #[cfg(feature = "lpg")]
691 store: None,
692 catalog: Arc::new(Catalog::new()),
693 #[cfg(feature = "triple-store")]
694 rdf_store: Arc::new(RdfStore::new()),
695 transaction_manager,
696 buffer_manager,
697 #[cfg(feature = "wal")]
698 wal: None,
699 #[cfg(feature = "wal")]
700 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
701 query_cache,
702 commit_counter: Arc::new(AtomicUsize::new(0)),
703 is_open: RwLock::new(true),
704 #[cfg(feature = "cdc")]
705 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
706 #[cfg(feature = "cdc")]
707 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
708 #[cfg(feature = "embed")]
709 embedding_models: RwLock::new(hashbrown::HashMap::new()),
710 #[cfg(feature = "grafeo-file")]
711 file_manager: None,
712 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
713 checkpoint_timer: parking_lot::Mutex::new(None),
714 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
715 vector_spill_storages: None,
716 external_read_store: Some(Arc::clone(&store) as Arc<dyn GraphStore>),
717 external_write_store: Some(store),
718 #[cfg(feature = "metrics")]
719 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
720 current_graph: RwLock::new(None),
721 current_schema: RwLock::new(None),
722 read_only: false,
723 projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
724 #[cfg(all(feature = "compact-store", feature = "lpg"))]
725 layered_store: None,
726 })
727 }
728
729 pub fn with_read_store(store: Arc<dyn GraphStore>, config: Config) -> Result<Self> {
754 config
755 .validate()
756 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
757
758 let transaction_manager = Arc::new(TransactionManager::new());
759
760 let buffer_config = BufferManagerConfig {
761 budget: config.memory_limit.unwrap_or_else(|| {
762 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
764 let b = (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize;
765 b
766 }),
767 spill_path: None,
768 ..BufferManagerConfig::default()
769 };
770 let buffer_manager = BufferManager::new(buffer_config);
771
772 let query_cache = Arc::new(QueryCache::default());
773
774 #[cfg(feature = "cdc")]
775 let cdc_enabled_val = config.cdc_enabled;
776
777 Ok(Self {
778 config,
779 #[cfg(feature = "lpg")]
780 store: None,
781 catalog: Arc::new(Catalog::new()),
782 #[cfg(feature = "triple-store")]
783 rdf_store: Arc::new(RdfStore::new()),
784 transaction_manager,
785 buffer_manager,
786 #[cfg(feature = "wal")]
787 wal: None,
788 #[cfg(feature = "wal")]
789 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
790 query_cache,
791 commit_counter: Arc::new(AtomicUsize::new(0)),
792 is_open: RwLock::new(true),
793 #[cfg(feature = "cdc")]
794 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
795 #[cfg(feature = "cdc")]
796 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
797 #[cfg(feature = "embed")]
798 embedding_models: RwLock::new(hashbrown::HashMap::new()),
799 #[cfg(feature = "grafeo-file")]
800 file_manager: None,
801 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
802 checkpoint_timer: parking_lot::Mutex::new(None),
803 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
804 vector_spill_storages: None,
805 external_read_store: Some(store),
806 external_write_store: None,
807 #[cfg(feature = "metrics")]
808 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
809 current_graph: RwLock::new(None),
810 current_schema: RwLock::new(None),
811 read_only: true,
812 projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
813 #[cfg(all(feature = "compact-store", feature = "lpg"))]
814 layered_store: None,
815 })
816 }
817
818 #[cfg(all(feature = "compact-store", feature = "lpg"))]
836 pub fn compact(&mut self) -> Result<()> {
837 use grafeo_core::graph::compact::from_graph_store_preserving_ids;
838 use grafeo_core::graph::compact::layered::LayeredStore;
839
840 let current_store = self.graph_store();
841
842 let max_node_id = if let Some(ref store) = self.store {
844 store.next_node_id().saturating_sub(1)
845 } else {
846 current_store.node_ids().last().map_or(0, |id| id.as_u64())
847 };
848 let max_edge_id = if let Some(ref store) = self.store {
849 store.next_edge_id().saturating_sub(1)
850 } else {
851 let mut max_eid = 0u64;
853 for nid in current_store.node_ids() {
854 for (_, eid) in
855 current_store.edges_from(nid, grafeo_core::graph::Direction::Outgoing)
856 {
857 max_eid = max_eid.max(eid.as_u64());
858 }
859 }
860 max_eid
861 };
862
863 let compact = from_graph_store_preserving_ids(current_store.as_ref())
864 .map_err(|e| Error::Internal(e.to_string()))?;
865
866 let layered = Arc::new(
867 LayeredStore::new(compact, max_node_id, max_edge_id)
868 .map_err(|e| Error::Internal(e.to_string()))?,
869 );
870
871 let current_epoch = self.transaction_manager.current_epoch();
874 layered.overlay_store().sync_epoch(current_epoch);
875
876 self.external_read_store = Some(Arc::clone(&layered) as Arc<dyn GraphStore>);
877 self.external_write_store = Some(Arc::clone(&layered) as Arc<dyn GraphStoreMut>);
878 self.layered_store = Some(layered);
879 self.store = None;
880 self.read_only = false;
881 self.query_cache = Arc::new(QueryCache::default());
882 self.projections.write().clear();
883
884 Ok(())
885 }
886
887 #[cfg(all(feature = "compact-store", feature = "lpg"))]
898 pub fn recompact(&mut self) -> Result<()> {
899 use grafeo_core::graph::compact::from_graph_store_preserving_ids;
900 use grafeo_core::graph::compact::layered::LayeredStore;
901
902 let layered = self
903 .layered_store
904 .as_ref()
905 .ok_or_else(|| Error::Internal("recompact() requires a prior compact()".into()))?;
906
907 let combined: Arc<dyn GraphStore> = Arc::clone(layered) as Arc<dyn GraphStore>;
909
910 let max_node_id = layered.overlay_store().next_node_id().saturating_sub(1);
912 let max_edge_id = layered.overlay_store().next_edge_id().saturating_sub(1);
913
914 let fresh_compact = from_graph_store_preserving_ids(combined.as_ref())
915 .map_err(|e| Error::Internal(e.to_string()))?;
916
917 let new_layered = Arc::new(
918 LayeredStore::new(fresh_compact, max_node_id, max_edge_id)
919 .map_err(|e| Error::Internal(e.to_string()))?,
920 );
921
922 let current_epoch = self.transaction_manager.current_epoch();
924 new_layered.overlay_store().sync_epoch(current_epoch);
925
926 self.external_read_store = Some(Arc::clone(&new_layered) as Arc<dyn GraphStore>);
927 self.external_write_store = Some(Arc::clone(&new_layered) as Arc<dyn GraphStoreMut>);
928 self.layered_store = Some(new_layered);
929 self.query_cache = Arc::new(QueryCache::default());
930
931 Ok(())
932 }
933
934 #[cfg(all(feature = "wal", feature = "lpg"))]
940 fn apply_wal_records(
941 store: &Arc<LpgStore>,
942 catalog: &Catalog,
943 #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
944 records: &[WalRecord],
945 ) -> Result<()> {
946 use crate::catalog::{
947 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
948 };
949 use grafeo_common::utils::error::Error;
950
951 let mut current_graph: Option<String> = None;
954 let mut target_store: Arc<LpgStore> = Arc::clone(store);
955
956 for record in records {
957 match record {
958 WalRecord::CreateNamedGraph { name } => {
960 let _ = store.create_graph(name);
961 }
962 WalRecord::DropNamedGraph { name } => {
963 store.drop_graph(name);
964 if current_graph.as_deref() == Some(name.as_str()) {
966 current_graph = None;
967 target_store = Arc::clone(store);
968 }
969 }
970 WalRecord::SwitchGraph { name } => {
971 current_graph.clone_from(name);
972 target_store = match ¤t_graph {
973 None => Arc::clone(store),
974 Some(graph_name) => store
975 .graph_or_create(graph_name)
976 .map_err(|e| Error::Internal(e.to_string()))?,
977 };
978 }
979
980 WalRecord::CreateNode { id, labels } => {
982 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
983 target_store.create_node_with_id(*id, &label_refs)?;
984 }
985 WalRecord::DeleteNode { id } => {
986 target_store.delete_node(*id);
987 }
988 WalRecord::CreateEdge {
989 id,
990 src,
991 dst,
992 edge_type,
993 } => {
994 target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
995 }
996 WalRecord::DeleteEdge { id } => {
997 target_store.delete_edge(*id);
998 }
999 WalRecord::SetNodeProperty { id, key, value } => {
1000 target_store.set_node_property(*id, key, value.clone());
1001 }
1002 WalRecord::SetEdgeProperty { id, key, value } => {
1003 target_store.set_edge_property(*id, key, value.clone());
1004 }
1005 WalRecord::AddNodeLabel { id, label } => {
1006 target_store.add_label(*id, label);
1007 }
1008 WalRecord::RemoveNodeLabel { id, label } => {
1009 target_store.remove_label(*id, label);
1010 }
1011 WalRecord::RemoveNodeProperty { id, key } => {
1012 target_store.remove_node_property(*id, key);
1013 }
1014 WalRecord::RemoveEdgeProperty { id, key } => {
1015 target_store.remove_edge_property(*id, key);
1016 }
1017
1018 WalRecord::CreateNodeType {
1020 name,
1021 properties,
1022 constraints,
1023 } => {
1024 let def = NodeTypeDefinition {
1025 name: name.clone(),
1026 properties: properties
1027 .iter()
1028 .map(|(n, t, nullable)| TypedProperty {
1029 name: n.clone(),
1030 data_type: PropertyDataType::from_type_name(t),
1031 nullable: *nullable,
1032 default_value: None,
1033 })
1034 .collect(),
1035 constraints: constraints
1036 .iter()
1037 .map(|(kind, props)| match kind.as_str() {
1038 "unique" => TypeConstraint::Unique(props.clone()),
1039 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
1040 "not_null" if !props.is_empty() => {
1041 TypeConstraint::NotNull(props[0].clone())
1042 }
1043 _ => TypeConstraint::Unique(props.clone()),
1044 })
1045 .collect(),
1046 parent_types: Vec::new(),
1047 };
1048 let _ = catalog.register_node_type(def);
1049 }
1050 WalRecord::DropNodeType { name } => {
1051 let _ = catalog.drop_node_type(name);
1052 }
1053 WalRecord::CreateEdgeType {
1054 name,
1055 properties,
1056 constraints,
1057 } => {
1058 let def = EdgeTypeDefinition {
1059 name: name.clone(),
1060 properties: properties
1061 .iter()
1062 .map(|(n, t, nullable)| TypedProperty {
1063 name: n.clone(),
1064 data_type: PropertyDataType::from_type_name(t),
1065 nullable: *nullable,
1066 default_value: None,
1067 })
1068 .collect(),
1069 constraints: constraints
1070 .iter()
1071 .map(|(kind, props)| match kind.as_str() {
1072 "unique" => TypeConstraint::Unique(props.clone()),
1073 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
1074 "not_null" if !props.is_empty() => {
1075 TypeConstraint::NotNull(props[0].clone())
1076 }
1077 _ => TypeConstraint::Unique(props.clone()),
1078 })
1079 .collect(),
1080 source_node_types: Vec::new(),
1081 target_node_types: Vec::new(),
1082 };
1083 let _ = catalog.register_edge_type_def(def);
1084 }
1085 WalRecord::DropEdgeType { name } => {
1086 let _ = catalog.drop_edge_type_def(name);
1087 }
1088 WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
1089 }
1092 WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
1093 }
1096 WalRecord::CreateGraphType {
1097 name,
1098 node_types,
1099 edge_types,
1100 open,
1101 } => {
1102 use crate::catalog::GraphTypeDefinition;
1103 let def = GraphTypeDefinition {
1104 name: name.clone(),
1105 allowed_node_types: node_types.clone(),
1106 allowed_edge_types: edge_types.clone(),
1107 open: *open,
1108 };
1109 let _ = catalog.register_graph_type(def);
1110 }
1111 WalRecord::DropGraphType { name } => {
1112 let _ = catalog.drop_graph_type(name);
1113 }
1114 WalRecord::CreateSchema { name } => {
1115 let _ = catalog.register_schema_namespace(name.clone());
1116 }
1117 WalRecord::DropSchema { name } => {
1118 let _ = catalog.drop_schema_namespace(name);
1119 }
1120
1121 WalRecord::AlterNodeType { name, alterations } => {
1122 for (action, prop_name, type_name, nullable) in alterations {
1123 match action.as_str() {
1124 "add" => {
1125 let prop = TypedProperty {
1126 name: prop_name.clone(),
1127 data_type: PropertyDataType::from_type_name(type_name),
1128 nullable: *nullable,
1129 default_value: None,
1130 };
1131 let _ = catalog.alter_node_type_add_property(name, prop);
1132 }
1133 "drop" => {
1134 let _ = catalog.alter_node_type_drop_property(name, prop_name);
1135 }
1136 _ => {}
1137 }
1138 }
1139 }
1140 WalRecord::AlterEdgeType { name, alterations } => {
1141 for (action, prop_name, type_name, nullable) in alterations {
1142 match action.as_str() {
1143 "add" => {
1144 let prop = TypedProperty {
1145 name: prop_name.clone(),
1146 data_type: PropertyDataType::from_type_name(type_name),
1147 nullable: *nullable,
1148 default_value: None,
1149 };
1150 let _ = catalog.alter_edge_type_add_property(name, prop);
1151 }
1152 "drop" => {
1153 let _ = catalog.alter_edge_type_drop_property(name, prop_name);
1154 }
1155 _ => {}
1156 }
1157 }
1158 }
1159 WalRecord::AlterGraphType { name, alterations } => {
1160 for (action, type_name) in alterations {
1161 match action.as_str() {
1162 "add_node" => {
1163 let _ =
1164 catalog.alter_graph_type_add_node_type(name, type_name.clone());
1165 }
1166 "drop_node" => {
1167 let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
1168 }
1169 "add_edge" => {
1170 let _ =
1171 catalog.alter_graph_type_add_edge_type(name, type_name.clone());
1172 }
1173 "drop_edge" => {
1174 let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
1175 }
1176 _ => {}
1177 }
1178 }
1179 }
1180
1181 WalRecord::CreateProcedure {
1182 name,
1183 params,
1184 returns,
1185 body,
1186 } => {
1187 use crate::catalog::ProcedureDefinition;
1188 let def = ProcedureDefinition {
1189 name: name.clone(),
1190 params: params.clone(),
1191 returns: returns.clone(),
1192 body: body.clone(),
1193 };
1194 let _ = catalog.register_procedure(def);
1195 }
1196 WalRecord::DropProcedure { name } => {
1197 let _ = catalog.drop_procedure(name);
1198 }
1199
1200 #[cfg(feature = "triple-store")]
1202 WalRecord::InsertRdfTriple { .. }
1203 | WalRecord::DeleteRdfTriple { .. }
1204 | WalRecord::ClearRdfGraph { .. }
1205 | WalRecord::CreateRdfGraph { .. }
1206 | WalRecord::DropRdfGraph { .. } => {
1207 rdf_ops::replay_rdf_wal_record(rdf_store, record);
1208 }
1209 #[cfg(not(feature = "triple-store"))]
1210 WalRecord::InsertRdfTriple { .. }
1211 | WalRecord::DeleteRdfTriple { .. }
1212 | WalRecord::ClearRdfGraph { .. }
1213 | WalRecord::CreateRdfGraph { .. }
1214 | WalRecord::DropRdfGraph { .. } => {}
1215
1216 WalRecord::TransactionCommit { .. } => {
1217 #[cfg(feature = "temporal")]
1221 {
1222 target_store.new_epoch();
1223 }
1224 }
1225 WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
1226 }
1229 WalRecord::EpochAdvance { .. } => {
1230 }
1233 }
1234 }
1235 Ok(())
1236 }
1237
1238 #[cfg(feature = "grafeo-file")]
1244 fn should_use_single_file(
1245 path: &std::path::Path,
1246 configured: crate::config::StorageFormat,
1247 ) -> bool {
1248 use crate::config::StorageFormat;
1249 match configured {
1250 StorageFormat::SingleFile => true,
1251 StorageFormat::WalDirectory => false,
1252 StorageFormat::Auto => {
1253 if path.is_file() {
1255 if let Ok(mut f) = std::fs::File::open(path) {
1256 use std::io::Read;
1257 let mut magic = [0u8; 4];
1258 if f.read_exact(&mut magic).is_ok() && magic == grafeo_storage::file::MAGIC
1259 {
1260 return true;
1261 }
1262 }
1263 return false;
1264 }
1265 if path.is_dir() {
1267 return false;
1268 }
1269 path.extension().is_some_and(|ext| ext == "grafeo")
1271 }
1272 }
1273 }
1274
1275 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1279 fn apply_snapshot_data(
1280 store: &Arc<LpgStore>,
1281 catalog: &Arc<crate::catalog::Catalog>,
1282 #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
1283 data: &[u8],
1284 ) -> Result<()> {
1285 persistence::load_snapshot_into_store(
1287 store,
1288 catalog,
1289 #[cfg(feature = "triple-store")]
1290 rdf_store,
1291 data,
1292 )
1293 }
1294
1295 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1299 fn load_from_sections(
1300 fm: &GrafeoFileManager,
1301 store: &Arc<LpgStore>,
1302 catalog: &Arc<crate::catalog::Catalog>,
1303 #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
1304 ) -> Result<()> {
1305 use grafeo_common::storage::{Section, SectionType};
1306
1307 let dir = fm.read_section_directory()?.ok_or_else(|| {
1308 grafeo_common::utils::error::Error::Internal(
1309 "expected v2 section directory but found none".to_string(),
1310 )
1311 })?;
1312
1313 if let Some(entry) = dir.find(SectionType::Catalog) {
1315 let data = fm.read_section_data(entry)?;
1316 let tm = Arc::new(crate::transaction::TransactionManager::new());
1317 let mut section = catalog_section::CatalogSection::new(
1318 Arc::clone(catalog),
1319 Arc::clone(store),
1320 move || tm.current_epoch().as_u64(),
1321 );
1322 section.deserialize(&data)?;
1323 }
1324
1325 if let Some(entry) = dir.find(SectionType::LpgStore) {
1327 let data = fm.read_section_data(entry)?;
1328 let mut section = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
1329 section.deserialize(&data)?;
1330 }
1331
1332 #[cfg(feature = "triple-store")]
1334 if let Some(entry) = dir.find(SectionType::RdfStore) {
1335 let data = fm.read_section_data(entry)?;
1336 let mut section = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(rdf_store));
1337 section.deserialize(&data)?;
1338 }
1339
1340 #[cfg(feature = "ring-index")]
1342 if let Some(entry) = dir.find(SectionType::RdfRing) {
1343 let data = fm.read_section_data(entry)?;
1344 let mut section = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(rdf_store));
1345 section.deserialize(&data)?;
1346 }
1347
1348 #[cfg(feature = "vector-index")]
1350 if let Some(entry) = dir.find(SectionType::VectorStore) {
1351 let data = fm.read_section_data(entry)?;
1352 let indexes = store.vector_index_entries();
1353 if !indexes.is_empty() {
1354 let mut section = grafeo_core::index::vector::VectorStoreSection::new(indexes);
1355 section.deserialize(&data)?;
1356 }
1357 }
1358
1359 #[cfg(feature = "text-index")]
1361 if let Some(entry) = dir.find(SectionType::TextIndex) {
1362 let data = fm.read_section_data(entry)?;
1363 let indexes = store.text_index_entries();
1364 if !indexes.is_empty() {
1365 let mut section = grafeo_core::index::text::TextIndexSection::new(indexes);
1366 section.deserialize(&data)?;
1367 }
1368 }
1369
1370 Ok(())
1371 }
1372
1373 #[must_use]
1401 pub fn session(&self) -> Session {
1402 self.create_session_inner(None)
1403 }
1404
1405 #[must_use]
1423 pub fn session_with_identity(&self, identity: crate::auth::Identity) -> Session {
1424 let force_read_only = !identity.can_write();
1425 self.create_session_inner_full(None, force_read_only, identity)
1426 }
1427
1428 #[must_use]
1442 pub fn session_with_role(&self, role: crate::auth::Role) -> Session {
1443 self.session_with_identity(crate::auth::Identity::new("anonymous", [role]))
1444 }
1445
1446 #[cfg(feature = "cdc")]
1465 #[must_use]
1466 pub fn session_with_cdc(&self, cdc_enabled: bool) -> Session {
1467 self.create_session_inner(Some(cdc_enabled))
1468 }
1469
1470 #[deprecated(
1479 since = "0.5.36",
1480 note = "use session_with_role(Role::ReadOnly) instead"
1481 )]
1482 #[must_use]
1483 pub fn session_read_only(&self) -> Session {
1484 self.session_with_role(crate::auth::Role::ReadOnly)
1485 }
1486
1487 #[allow(unused_variables)] fn create_session_inner(&self, cdc_override: Option<bool>) -> Session {
1493 self.create_session_inner_full(cdc_override, false, crate::auth::Identity::anonymous())
1494 }
1495
1496 #[allow(unused_variables)]
1498 fn create_session_inner_full(
1499 &self,
1500 cdc_override: Option<bool>,
1501 force_read_only: bool,
1502 identity: crate::auth::Identity,
1503 ) -> Session {
1504 let session_cfg = || crate::session::SessionConfig {
1505 transaction_manager: Arc::clone(&self.transaction_manager),
1506 query_cache: Arc::clone(&self.query_cache),
1507 catalog: Arc::clone(&self.catalog),
1508 adaptive_config: self.config.adaptive.clone(),
1509 factorized_execution: self.config.factorized_execution,
1510 graph_model: self.config.graph_model,
1511 query_timeout: self.config.query_timeout,
1512 max_property_size: self.config.max_property_size,
1513 buffer_manager: Some(Arc::clone(&self.buffer_manager)),
1514 commit_counter: Arc::clone(&self.commit_counter),
1515 gc_interval: self.config.gc_interval,
1516 read_only: self.read_only || force_read_only,
1517 identity: identity.clone(),
1518 #[cfg(feature = "lpg")]
1519 projections: Arc::clone(&self.projections),
1520 };
1521
1522 #[cfg(all(feature = "compact-store", feature = "lpg"))]
1525 if let Some(ref layered) = self.layered_store {
1526 let overlay = Arc::clone(layered.overlay_store());
1527 let layered_arc = Arc::clone(layered);
1528 let mut session = Session::with_adaptive(overlay, session_cfg());
1529 session.override_stores(
1532 Arc::clone(&layered_arc) as Arc<dyn GraphStore>,
1533 Some(layered_arc as Arc<dyn GraphStoreMut>),
1534 );
1535 return session;
1536 }
1537
1538 if let Some(ref ext_read) = self.external_read_store {
1539 return Session::with_external_store(
1540 Arc::clone(ext_read),
1541 self.external_write_store.as_ref().map(Arc::clone),
1542 session_cfg(),
1543 )
1544 .expect("arena allocation for external store session");
1545 }
1546
1547 #[cfg(all(feature = "lpg", feature = "triple-store"))]
1548 let mut session = Session::with_rdf_store_and_adaptive(
1549 Arc::clone(self.lpg_store()),
1550 Arc::clone(&self.rdf_store),
1551 session_cfg(),
1552 );
1553 #[cfg(all(feature = "lpg", not(feature = "triple-store")))]
1554 let mut session = Session::with_adaptive(Arc::clone(self.lpg_store()), session_cfg());
1555 #[cfg(not(feature = "lpg"))]
1556 let mut session =
1557 Session::with_external_store(self.graph_store(), self.graph_store_mut(), session_cfg())
1558 .expect("session creation for non-lpg build");
1559
1560 #[cfg(all(feature = "wal", feature = "lpg"))]
1561 if let Some(ref wal) = self.wal {
1562 session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
1563 }
1564
1565 #[cfg(feature = "cdc")]
1566 {
1567 let should_enable = cdc_override.unwrap_or_else(|| self.cdc_active());
1568 if should_enable {
1569 session.set_cdc_log(Arc::clone(&self.cdc_log));
1570 }
1571 }
1572
1573 #[cfg(feature = "metrics")]
1574 {
1575 if let Some(ref m) = self.metrics {
1576 session.set_metrics(Arc::clone(m));
1577 m.session_created
1578 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1579 m.session_active
1580 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1581 }
1582 }
1583
1584 if let Some(ref graph) = *self.current_graph.read() {
1586 session.use_graph(graph);
1587 }
1588
1589 if let Some(ref schema) = *self.current_schema.read() {
1591 session.set_schema(schema);
1592 }
1593
1594 let _ = &mut session;
1596
1597 session
1598 }
1599
1600 #[must_use]
1606 pub fn current_graph(&self) -> Option<String> {
1607 self.current_graph.read().clone()
1608 }
1609
1610 pub fn set_current_graph(&self, name: Option<&str>) -> Result<()> {
1619 #[cfg(feature = "lpg")]
1620 if let Some(name) = name
1621 && !name.eq_ignore_ascii_case("default")
1622 && let Some(store) = &self.store
1623 && store.graph(name).is_none()
1624 {
1625 return Err(Error::Query(QueryError::new(
1626 QueryErrorKind::Semantic,
1627 format!("Graph '{name}' does not exist"),
1628 )));
1629 }
1630 *self.current_graph.write() = name.map(ToString::to_string);
1631 Ok(())
1632 }
1633
1634 #[must_use]
1639 pub fn current_schema(&self) -> Option<String> {
1640 self.current_schema.read().clone()
1641 }
1642
1643 pub fn set_current_schema(&self, name: Option<&str>) -> Result<()> {
1652 if let Some(name) = name
1653 && !self.catalog.schema_exists(name)
1654 {
1655 return Err(Error::Query(QueryError::new(
1656 QueryErrorKind::Semantic,
1657 format!("Schema '{name}' does not exist"),
1658 )));
1659 }
1660 *self.current_schema.write() = name.map(ToString::to_string);
1661 Ok(())
1662 }
1663
1664 #[must_use]
1666 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
1667 &self.config.adaptive
1668 }
1669
1670 #[must_use]
1672 pub fn is_read_only(&self) -> bool {
1673 self.read_only
1674 }
1675
1676 #[must_use]
1678 pub fn config(&self) -> &Config {
1679 &self.config
1680 }
1681
1682 #[must_use]
1684 pub fn graph_model(&self) -> crate::config::GraphModel {
1685 self.config.graph_model
1686 }
1687
1688 #[must_use]
1690 pub fn memory_limit(&self) -> Option<usize> {
1691 self.config.memory_limit
1692 }
1693
1694 #[cfg(feature = "metrics")]
1699 #[must_use]
1700 pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1701 let mut snapshot = self
1702 .metrics
1703 .as_ref()
1704 .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1705
1706 let cache_stats = self.query_cache.stats();
1708 snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1709 snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1710 snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1711 snapshot.cache_invalidations = cache_stats.invalidations;
1712
1713 snapshot
1714 }
1715
1716 #[cfg(feature = "metrics")]
1720 #[must_use]
1721 pub fn metrics_prometheus(&self) -> String {
1722 self.metrics
1723 .as_ref()
1724 .map_or_else(String::new, |m| m.to_prometheus())
1725 }
1726
1727 #[cfg(feature = "metrics")]
1729 pub fn reset_metrics(&self) {
1730 if let Some(ref m) = self.metrics {
1731 m.reset();
1732 }
1733 self.query_cache.reset_stats();
1734 }
1735
1736 #[cfg(feature = "lpg")]
1744 #[must_use]
1745 pub fn store(&self) -> &Arc<LpgStore> {
1746 self.lpg_store()
1747 }
1748
1749 #[cfg(feature = "lpg")]
1757 pub fn create_graph(&self, name: &str) -> Result<bool> {
1758 Ok(self.lpg_store().create_graph(name)?)
1759 }
1760
1761 #[cfg(feature = "lpg")]
1766 pub fn drop_graph(&self, name: &str) -> bool {
1767 let Some(store) = &self.store else {
1768 return false;
1769 };
1770 let dropped = store.drop_graph(name);
1771 if dropped {
1772 let mut current = self.current_graph.write();
1773 if current
1774 .as_deref()
1775 .is_some_and(|g| g.eq_ignore_ascii_case(name))
1776 {
1777 *current = None;
1778 }
1779 }
1780 dropped
1781 }
1782
1783 #[cfg(feature = "lpg")]
1785 #[must_use]
1786 pub fn list_graphs(&self) -> Vec<String> {
1787 self.lpg_store().graph_names()
1788 }
1789
1790 pub fn create_projection(
1811 &self,
1812 name: impl Into<String>,
1813 spec: grafeo_core::graph::ProjectionSpec,
1814 ) -> bool {
1815 use grafeo_core::graph::GraphProjection;
1816 use std::collections::hash_map::Entry;
1817
1818 let store = self.graph_store();
1819 let projection = Arc::new(GraphProjection::new(store, spec));
1820 let mut projections = self.projections.write();
1821 match projections.entry(name.into()) {
1822 Entry::Occupied(_) => false,
1823 Entry::Vacant(e) => {
1824 e.insert(projection);
1825 true
1826 }
1827 }
1828 }
1829
1830 pub fn drop_projection(&self, name: &str) -> bool {
1832 self.projections.write().remove(name).is_some()
1833 }
1834
1835 #[must_use]
1837 pub fn list_projections(&self) -> Vec<String> {
1838 self.projections.read().keys().cloned().collect()
1839 }
1840
1841 #[must_use]
1843 pub fn projection(&self, name: &str) -> Option<Arc<dyn GraphStore>> {
1844 self.projections
1845 .read()
1846 .get(name)
1847 .map(|p| Arc::clone(p) as Arc<dyn GraphStore>)
1848 }
1849
1850 #[must_use]
1859 pub fn graph_store(&self) -> Arc<dyn GraphStore> {
1860 if let Some(ref ext_read) = self.external_read_store {
1861 Arc::clone(ext_read)
1862 } else {
1863 #[cfg(feature = "lpg")]
1864 {
1865 Arc::clone(self.lpg_store()) as Arc<dyn GraphStore>
1866 }
1867 #[cfg(not(feature = "lpg"))]
1868 unreachable!("no graph store available: enable the `lpg` feature or use with_store()")
1869 }
1870 }
1871
1872 #[must_use]
1877 pub fn graph_store_mut(&self) -> Option<Arc<dyn GraphStoreMut>> {
1878 if self.external_read_store.is_some() {
1879 self.external_write_store.as_ref().map(Arc::clone)
1880 } else {
1881 #[cfg(feature = "lpg")]
1882 {
1883 Some(Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreMut>)
1884 }
1885 #[cfg(not(feature = "lpg"))]
1886 {
1887 None
1888 }
1889 }
1890 }
1891
1892 pub fn gc(&self) {
1899 #[cfg(feature = "lpg")]
1900 let current_epoch = {
1901 let min_epoch = self.transaction_manager.min_active_epoch();
1902 self.lpg_store().gc_versions(min_epoch);
1903 self.transaction_manager.current_epoch()
1904 };
1905 self.transaction_manager.gc();
1906
1907 #[cfg(feature = "cdc")]
1909 if self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed) {
1910 #[cfg(feature = "lpg")]
1911 self.cdc_log.apply_retention(current_epoch);
1912 }
1913 }
1914
1915 #[must_use]
1917 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
1918 &self.buffer_manager
1919 }
1920
1921 #[must_use]
1923 pub fn query_cache(&self) -> &Arc<QueryCache> {
1924 &self.query_cache
1925 }
1926
1927 pub fn clear_plan_cache(&self) {
1933 self.query_cache.clear();
1934 }
1935
1936 pub fn close(&self) -> Result<()> {
1950 let mut is_open = self.is_open.write();
1951 if !*is_open {
1952 return Ok(());
1953 }
1954
1955 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1960 if let Some(mut timer) = self.checkpoint_timer.lock().take() {
1961 timer.stop();
1962 }
1963
1964 if self.read_only {
1966 #[cfg(feature = "grafeo-file")]
1967 if let Some(ref fm) = self.file_manager {
1968 fm.close()?;
1969 }
1970 *is_open = false;
1971 return Ok(());
1972 }
1973
1974 #[cfg(feature = "grafeo-file")]
1978 let is_single_file = self.file_manager.is_some();
1979 #[cfg(not(feature = "grafeo-file"))]
1980 let is_single_file = false;
1981
1982 #[cfg(feature = "grafeo-file")]
1983 if let Some(ref fm) = self.file_manager {
1984 #[cfg(feature = "wal")]
1986 if let Some(ref wal) = self.wal {
1987 wal.sync()?;
1988 }
1989 let flush_result = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;
1990
1991 #[cfg(feature = "wal")]
1997 let flush_result = if flush_result.sections_written == 0 {
1998 if let Some(ref wal) = self.wal {
1999 if wal.record_count() > 0 {
2000 grafeo_warn!(
2001 "WAL has {} records but checkpoint wrote 0 sections; retrying with forced flush",
2002 wal.record_count()
2003 );
2004 self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?
2005 } else {
2006 flush_result
2007 }
2008 } else {
2009 flush_result
2010 }
2011 } else {
2012 flush_result
2013 };
2014
2015 #[cfg(feature = "wal")]
2018 if let Some(ref wal) = self.wal {
2019 wal.close_active_log();
2020 }
2021
2022 #[cfg(feature = "wal")]
2026 let has_wal_records = self.wal.as_ref().is_some_and(|wal| wal.record_count() > 0);
2027 #[cfg(not(feature = "wal"))]
2028 let has_wal_records = false;
2029
2030 if flush_result.sections_written > 0 || !has_wal_records {
2031 {
2032 use grafeo_common::testing::crash::maybe_crash;
2033 maybe_crash("close:before_remove_sidecar_wal");
2034 }
2035 fm.remove_sidecar_wal()?;
2036 } else {
2037 grafeo_warn!(
2038 "keeping sidecar WAL for recovery: checkpoint wrote 0 sections but WAL has records"
2039 );
2040 }
2041 fm.close()?;
2042 }
2043
2044 #[cfg(feature = "wal")]
2050 if !is_single_file && let Some(ref wal) = self.wal {
2051 let commit_tx = self
2053 .transaction_manager
2054 .last_assigned_transaction_id()
2055 .unwrap_or_else(|| self.transaction_manager.begin());
2056
2057 wal.log(&WalRecord::TransactionCommit {
2059 transaction_id: commit_tx,
2060 })?;
2061
2062 wal.sync()?;
2063 }
2064
2065 *is_open = false;
2066 Ok(())
2067 }
2068
2069 #[cfg(feature = "wal")]
2071 #[must_use]
2072 pub fn wal(&self) -> Option<&Arc<LpgWal>> {
2073 self.wal.as_ref()
2074 }
2075
2076 #[cfg(feature = "wal")]
2078 pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
2079 if let Some(ref wal) = self.wal {
2080 wal.log(record)?;
2081 }
2082 Ok(())
2083 }
2084
2085 fn register_section_consumers(&mut self) {
2090 #[cfg(feature = "lpg")]
2091 let store_ref = self.store.as_ref();
2092 #[cfg(not(feature = "lpg"))]
2093 #[cfg(feature = "lpg")]
2095 if let Some(store) = store_ref {
2096 let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
2097 self.buffer_manager.register_consumer(Arc::new(
2098 section_consumer::SectionConsumer::new(Arc::new(lpg)),
2099 ));
2100 }
2101
2102 #[cfg(feature = "triple-store")]
2104 if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
2105 let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
2106 self.buffer_manager.register_consumer(Arc::new(
2107 section_consumer::SectionConsumer::new(Arc::new(rdf)),
2108 ));
2109 }
2110
2111 #[cfg(feature = "ring-index")]
2113 if self.rdf_store.ring().is_some() {
2114 let ring = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(&self.rdf_store));
2115 self.buffer_manager.register_consumer(Arc::new(
2116 section_consumer::SectionConsumer::new(Arc::new(ring)),
2117 ));
2118 }
2119
2120 #[cfg(all(
2123 feature = "lpg",
2124 feature = "vector-index",
2125 feature = "mmap",
2126 not(feature = "temporal")
2127 ))]
2128 if let Some(store) = store_ref {
2129 let spill_path = self.buffer_manager.config().spill_path.clone();
2130 let consumer = Arc::new(section_consumer::VectorIndexConsumer::new(
2131 store, spill_path,
2132 ));
2133 self.vector_spill_storages = Some(Arc::clone(consumer.spilled_storages()));
2135 self.buffer_manager.register_consumer(consumer);
2136 }
2137
2138 #[cfg(all(feature = "lpg", feature = "text-index"))]
2140 if let Some(store) = store_ref {
2141 self.buffer_manager
2142 .register_consumer(Arc::new(section_consumer::TextIndexConsumer::new(store)));
2143 }
2144
2145 #[cfg(feature = "cdc")]
2148 self.buffer_manager.register_consumer(
2149 Arc::clone(&self.cdc_log) as Arc<dyn grafeo_common::memory::MemoryConsumer>
2150 );
2151 }
2152
2153 #[cfg(all(
2160 feature = "lpg",
2161 feature = "vector-index",
2162 feature = "mmap",
2163 not(feature = "temporal")
2164 ))]
2165 fn restore_spill_files(&mut self) {
2166 use grafeo_core::index::vector::MmapStorage;
2167
2168 let spill_dir = match self.buffer_manager.config().spill_path {
2169 Some(ref path) => path.clone(),
2170 None => return,
2171 };
2172
2173 if !spill_dir.exists() {
2174 return;
2175 }
2176
2177 let spill_map = match self.vector_spill_storages {
2178 Some(ref map) => Arc::clone(map),
2179 None => return,
2180 };
2181
2182 let Ok(entries) = std::fs::read_dir(&spill_dir) else {
2183 return;
2184 };
2185
2186 let Some(ref store) = self.store else {
2187 return;
2188 };
2189
2190 for entry in entries.flatten() {
2191 let path = entry.path();
2192 let file_name = match path.file_name().and_then(|n| n.to_str()) {
2193 Some(name) => name.to_string(),
2194 None => continue,
2195 };
2196
2197 if !file_name.starts_with("vectors_")
2199 || !std::path::Path::new(&file_name)
2200 .extension()
2201 .is_some_and(|ext| ext.eq_ignore_ascii_case("bin"))
2202 {
2203 continue;
2204 }
2205
2206 let key_part = &file_name["vectors_".len()..file_name.len() - ".bin".len()];
2208
2209 let key = key_part.replace("%3A", ":").replace("%25", "%");
2211
2212 if !key.contains(':') {
2214 continue;
2216 }
2217
2218 if store.get_vector_index_by_key(&key).is_none() {
2220 let _ = std::fs::remove_file(&path);
2222 continue;
2223 }
2224
2225 match MmapStorage::open(&path) {
2227 Ok(mmap_storage) => {
2228 let property = key.split(':').nth(1).unwrap_or("");
2230 let prop_key = grafeo_common::types::PropertyKey::new(property);
2231 store.node_properties_mark_spilled(&prop_key);
2232
2233 spill_map.write().insert(key, Arc::new(mmap_storage));
2234 }
2235 Err(e) => {
2236 eprintln!("failed to restore spill file {}: {e}", path.display());
2237 let _ = std::fs::remove_file(&path);
2239 }
2240 }
2241 }
2242 }
2243
2244 #[cfg(feature = "grafeo-file")]
2246 fn build_sections(&self) -> Vec<Box<dyn grafeo_common::storage::Section>> {
2247 let mut sections: Vec<Box<dyn grafeo_common::storage::Section>> = Vec::new();
2248
2249 #[cfg(all(feature = "compact-store", feature = "lpg"))]
2251 if let Some(ref layered) = self.layered_store {
2252 let compact_section = grafeo_core::graph::compact::section::CompactStoreSection::new(
2254 layered.base_store_arc(),
2255 );
2256 sections.push(Box::new(compact_section));
2257
2258 let overlay = layered.overlay_store();
2260 let overlay_section =
2261 grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(overlay));
2262 sections.push(Box::new(overlay_section));
2263
2264 return sections;
2265 }
2266
2267 #[cfg(feature = "lpg")]
2269 if let Some(store) = self.store.as_ref() {
2270 let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
2271
2272 let catalog = catalog_section::CatalogSection::new(
2273 Arc::clone(&self.catalog),
2274 Arc::clone(store),
2275 {
2276 let tm = Arc::clone(&self.transaction_manager);
2277 move || tm.current_epoch().as_u64()
2278 },
2279 );
2280
2281 sections.push(Box::new(catalog));
2282 sections.push(Box::new(lpg));
2283
2284 #[cfg(feature = "vector-index")]
2286 {
2287 let indexes = store.vector_index_entries();
2288 if !indexes.is_empty() {
2289 let vector = grafeo_core::index::vector::VectorStoreSection::new(indexes);
2290 sections.push(Box::new(vector));
2291 }
2292 }
2293
2294 #[cfg(feature = "text-index")]
2296 {
2297 let indexes = store.text_index_entries();
2298 if !indexes.is_empty() {
2299 let text = grafeo_core::index::text::TextIndexSection::new(indexes);
2300 sections.push(Box::new(text));
2301 }
2302 }
2303 }
2304
2305 #[cfg(feature = "triple-store")]
2306 if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
2307 let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
2308 sections.push(Box::new(rdf));
2309 }
2310
2311 #[cfg(feature = "ring-index")]
2312 if self.rdf_store.ring().is_some() {
2313 let ring = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(&self.rdf_store));
2314 sections.push(Box::new(ring));
2315 }
2316
2317 sections
2318 }
2319
2320 #[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
2334 pub fn backup_full(&self, backup_dir: &std::path::Path) -> Result<backup::BackupSegment> {
2335 let fm = self
2336 .file_manager
2337 .as_ref()
2338 .ok_or_else(|| Error::Internal("backup requires a persistent database".to_string()))?;
2339
2340 if !self.read_only {
2344 let _ = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;
2345 }
2346
2347 let current_epoch = self.transaction_manager.current_epoch();
2348 backup::do_backup_full(backup_dir, fm, self.wal.as_deref(), current_epoch)
2349 }
2350
2351 #[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
2359 pub fn backup_incremental(
2360 &self,
2361 backup_dir: &std::path::Path,
2362 ) -> Result<backup::BackupSegment> {
2363 let wal = self
2364 .wal
2365 .as_ref()
2366 .ok_or_else(|| Error::Internal("incremental backup requires WAL".to_string()))?;
2367
2368 let current_epoch = self.transaction_manager.current_epoch();
2369 backup::do_backup_incremental(backup_dir, wal, current_epoch)
2370 }
2371
2372 #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2378 pub fn read_backup_manifest(
2379 backup_dir: &std::path::Path,
2380 ) -> Result<Option<backup::BackupManifest>> {
2381 backup::read_manifest(backup_dir)
2382 }
2383
2384 #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2386 #[must_use]
2387 pub fn backup_cursor(&self) -> Option<backup::BackupCursor> {
2388 self.wal
2389 .as_ref()
2390 .and_then(|wal| backup::read_backup_cursor(wal.dir()).ok().flatten())
2391 }
2392
2393 #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2404 pub fn restore_to_epoch(
2405 backup_dir: &std::path::Path,
2406 target_epoch: grafeo_common::types::EpochId,
2407 output_path: &std::path::Path,
2408 ) -> Result<()> {
2409 backup::do_restore_to_epoch(backup_dir, target_epoch, output_path)
2410 }
2411
2412 #[cfg(feature = "grafeo-file")]
2418 fn checkpoint_to_file(
2419 &self,
2420 fm: &GrafeoFileManager,
2421 reason: flush::FlushReason,
2422 ) -> Result<flush::FlushResult> {
2423 let sections = self.build_sections();
2424 let section_refs: Vec<&dyn grafeo_common::storage::Section> =
2425 sections.iter().map(|s| s.as_ref()).collect();
2426 #[cfg(feature = "lpg")]
2427 let context = flush::build_context(self.lpg_store(), &self.transaction_manager);
2428 #[cfg(not(feature = "lpg"))]
2429 let context = flush::build_context_minimal(&self.transaction_manager);
2430
2431 flush::flush(
2432 fm,
2433 §ion_refs,
2434 &context,
2435 reason,
2436 #[cfg(feature = "wal")]
2437 self.wal.as_deref(),
2438 )
2439 }
2440
2441 #[cfg(feature = "grafeo-file")]
2443 #[must_use]
2444 pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
2445 self.file_manager.as_ref()
2446 }
2447}
2448
2449impl Drop for GrafeoDB {
2450 fn drop(&mut self) {
2451 if let Err(e) = self.close() {
2452 grafeo_error!("Error closing database: {}", e);
2453 }
2454 }
2455}
2456
2457#[cfg(feature = "lpg")]
2458impl crate::admin::AdminService for GrafeoDB {
2459 fn info(&self) -> crate::admin::DatabaseInfo {
2460 self.info()
2461 }
2462
2463 fn detailed_stats(&self) -> crate::admin::DatabaseStats {
2464 self.detailed_stats()
2465 }
2466
2467 fn schema(&self) -> crate::admin::SchemaInfo {
2468 self.schema()
2469 }
2470
2471 fn validate(&self) -> crate::admin::ValidationResult {
2472 self.validate()
2473 }
2474
2475 fn wal_status(&self) -> crate::admin::WalStatus {
2476 self.wal_status()
2477 }
2478
2479 fn wal_checkpoint(&self) -> Result<()> {
2480 self.wal_checkpoint()
2481 }
2482}
2483
2484#[derive(Debug)]
2514pub struct QueryResult {
2515 pub columns: Vec<String>,
2517 pub column_types: Vec<grafeo_common::types::LogicalType>,
2519 pub(crate) rows: Vec<Vec<grafeo_common::types::Value>>,
2524 pub execution_time_ms: Option<f64>,
2526 pub rows_scanned: Option<u64>,
2528 pub status_message: Option<String>,
2530 pub gql_status: grafeo_common::utils::GqlStatus,
2532}
2533
2534impl QueryResult {
2535 #[must_use]
2537 pub fn empty() -> Self {
2538 Self {
2539 columns: Vec::new(),
2540 column_types: Vec::new(),
2541 rows: Vec::new(),
2542 execution_time_ms: None,
2543 rows_scanned: None,
2544 status_message: None,
2545 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2546 }
2547 }
2548
2549 #[must_use]
2551 pub fn status(msg: impl Into<String>) -> Self {
2552 Self {
2553 columns: Vec::new(),
2554 column_types: Vec::new(),
2555 rows: Vec::new(),
2556 execution_time_ms: None,
2557 rows_scanned: None,
2558 status_message: Some(msg.into()),
2559 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2560 }
2561 }
2562
2563 #[must_use]
2565 pub fn new(columns: Vec<String>) -> Self {
2566 let len = columns.len();
2567 Self {
2568 columns,
2569 column_types: vec![grafeo_common::types::LogicalType::Any; len],
2570 rows: Vec::new(),
2571 execution_time_ms: None,
2572 rows_scanned: None,
2573 status_message: None,
2574 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2575 }
2576 }
2577
2578 #[must_use]
2580 pub fn with_types(
2581 columns: Vec<String>,
2582 column_types: Vec<grafeo_common::types::LogicalType>,
2583 ) -> Self {
2584 Self {
2585 columns,
2586 column_types,
2587 rows: Vec::new(),
2588 execution_time_ms: None,
2589 rows_scanned: None,
2590 status_message: None,
2591 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2592 }
2593 }
2594
2595 #[must_use]
2597 pub fn from_rows(columns: Vec<String>, rows: Vec<Vec<grafeo_common::types::Value>>) -> Self {
2598 let len = columns.len();
2599 Self {
2600 columns,
2601 column_types: vec![grafeo_common::types::LogicalType::Any; len],
2602 rows,
2603 execution_time_ms: None,
2604 rows_scanned: None,
2605 status_message: None,
2606 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2607 }
2608 }
2609
2610 pub fn push_row(&mut self, row: Vec<grafeo_common::types::Value>) {
2612 self.rows.push(row);
2613 }
2614
2615 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
2617 self.execution_time_ms = Some(execution_time_ms);
2618 self.rows_scanned = Some(rows_scanned);
2619 self
2620 }
2621
2622 #[must_use]
2624 pub fn execution_time_ms(&self) -> Option<f64> {
2625 self.execution_time_ms
2626 }
2627
2628 #[must_use]
2630 pub fn rows_scanned(&self) -> Option<u64> {
2631 self.rows_scanned
2632 }
2633
2634 #[must_use]
2636 pub fn row_count(&self) -> usize {
2637 self.rows.len()
2638 }
2639
2640 #[must_use]
2642 pub fn column_count(&self) -> usize {
2643 self.columns.len()
2644 }
2645
2646 #[must_use]
2648 pub fn is_empty(&self) -> bool {
2649 self.rows.is_empty()
2650 }
2651
2652 pub fn scalar<T: FromValue>(&self) -> Result<T> {
2661 if self.rows.len() != 1 || self.columns.len() != 1 {
2662 return Err(grafeo_common::utils::error::Error::InvalidValue(
2663 "Expected single value".to_string(),
2664 ));
2665 }
2666 T::from_value(&self.rows[0][0])
2667 }
2668
2669 #[must_use]
2671 pub fn rows(&self) -> &[Vec<grafeo_common::types::Value>] {
2672 &self.rows
2673 }
2674
2675 #[must_use]
2677 pub fn into_rows(self) -> Vec<Vec<grafeo_common::types::Value>> {
2678 self.rows
2679 }
2680
2681 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
2683 self.rows.iter()
2684 }
2685
2686 #[cfg(feature = "arrow-export")]
2701 pub fn to_record_batch(
2702 &self,
2703 ) -> std::result::Result<arrow_array::RecordBatch, arrow::ArrowExportError> {
2704 arrow::query_result_to_record_batch(&self.columns, &self.column_types, &self.rows)
2705 }
2706
2707 #[cfg(feature = "arrow-export")]
2718 pub fn to_arrow_ipc(&self) -> std::result::Result<Vec<u8>, arrow::ArrowExportError> {
2719 let batch = self.to_record_batch()?;
2720 arrow::record_batch_to_ipc_stream(&batch)
2721 }
2722}
2723
2724impl std::fmt::Display for QueryResult {
2725 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2726 let table = grafeo_common::fmt::format_result_table(
2727 &self.columns,
2728 &self.rows,
2729 self.execution_time_ms,
2730 self.status_message.as_deref(),
2731 );
2732 f.write_str(&table)
2733 }
2734}
2735
2736pub trait FromValue: Sized {
2741 fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
2747}
2748
2749impl FromValue for i64 {
2750 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2751 value
2752 .as_int64()
2753 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2754 expected: "INT64".to_string(),
2755 found: value.type_name().to_string(),
2756 })
2757 }
2758}
2759
2760impl FromValue for f64 {
2761 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2762 value
2763 .as_float64()
2764 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2765 expected: "FLOAT64".to_string(),
2766 found: value.type_name().to_string(),
2767 })
2768 }
2769}
2770
2771impl FromValue for String {
2772 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2773 value.as_str().map(String::from).ok_or_else(|| {
2774 grafeo_common::utils::error::Error::TypeMismatch {
2775 expected: "STRING".to_string(),
2776 found: value.type_name().to_string(),
2777 }
2778 })
2779 }
2780}
2781
2782impl FromValue for bool {
2783 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2784 value
2785 .as_bool()
2786 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2787 expected: "BOOL".to_string(),
2788 found: value.type_name().to_string(),
2789 })
2790 }
2791}
2792
2793#[cfg(test)]
2794mod tests {
2795 use super::*;
2796
2797 #[test]
2798 fn test_create_in_memory_database() {
2799 let db = GrafeoDB::new_in_memory();
2800 assert_eq!(db.node_count(), 0);
2801 assert_eq!(db.edge_count(), 0);
2802 }
2803
2804 #[test]
2805 fn test_database_config() {
2806 let config = Config::in_memory().with_threads(4).with_query_logging();
2807
2808 let db = GrafeoDB::with_config(config).unwrap();
2809 assert_eq!(db.config().threads, 4);
2810 assert!(db.config().query_logging);
2811 }
2812
2813 #[test]
2814 fn test_database_session() {
2815 let db = GrafeoDB::new_in_memory();
2816 let _session = db.session();
2817 }
2819
2820 #[cfg(feature = "wal")]
2821 #[test]
2822 fn test_persistent_database_recovery() {
2823 use grafeo_common::types::Value;
2824 use tempfile::tempdir;
2825
2826 let dir = tempdir().unwrap();
2827 let db_path = dir.path().join("test_db");
2828
2829 {
2831 let db = GrafeoDB::open(&db_path).unwrap();
2832
2833 let alix = db.create_node(&["Person"]);
2834 db.set_node_property(alix, "name", Value::from("Alix"));
2835
2836 let gus = db.create_node(&["Person"]);
2837 db.set_node_property(gus, "name", Value::from("Gus"));
2838
2839 let _edge = db.create_edge(alix, gus, "KNOWS");
2840
2841 db.close().unwrap();
2843 }
2844
2845 {
2847 let db = GrafeoDB::open(&db_path).unwrap();
2848
2849 assert_eq!(db.node_count(), 2);
2850 assert_eq!(db.edge_count(), 1);
2851
2852 let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
2854 assert!(node0.is_some());
2855
2856 let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
2857 assert!(node1.is_some());
2858 }
2859 }
2860
2861 #[cfg(feature = "wal")]
2862 #[test]
2863 fn test_wal_logging() {
2864 use tempfile::tempdir;
2865
2866 let dir = tempdir().unwrap();
2867 let db_path = dir.path().join("wal_test_db");
2868
2869 let db = GrafeoDB::open(&db_path).unwrap();
2870
2871 let node = db.create_node(&["Test"]);
2873 db.delete_node(node);
2874
2875 if let Some(wal) = db.wal() {
2877 assert!(wal.record_count() > 0);
2878 }
2879
2880 db.close().unwrap();
2881 }
2882
2883 #[cfg(feature = "wal")]
2884 #[test]
2885 fn test_wal_recovery_multiple_sessions() {
2886 use grafeo_common::types::Value;
2888 use tempfile::tempdir;
2889
2890 let dir = tempdir().unwrap();
2891 let db_path = dir.path().join("multi_session_db");
2892
2893 {
2895 let db = GrafeoDB::open(&db_path).unwrap();
2896 let alix = db.create_node(&["Person"]);
2897 db.set_node_property(alix, "name", Value::from("Alix"));
2898 db.close().unwrap();
2899 }
2900
2901 {
2903 let db = GrafeoDB::open(&db_path).unwrap();
2904 assert_eq!(db.node_count(), 1); let gus = db.create_node(&["Person"]);
2906 db.set_node_property(gus, "name", Value::from("Gus"));
2907 db.close().unwrap();
2908 }
2909
2910 {
2912 let db = GrafeoDB::open(&db_path).unwrap();
2913 assert_eq!(db.node_count(), 2);
2914
2915 let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
2917 assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
2918
2919 let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
2920 assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
2921 }
2922 }
2923
2924 #[cfg(feature = "wal")]
2925 #[test]
2926 fn test_database_consistency_after_mutations() {
2927 use grafeo_common::types::Value;
2929 use tempfile::tempdir;
2930
2931 let dir = tempdir().unwrap();
2932 let db_path = dir.path().join("consistency_db");
2933
2934 {
2935 let db = GrafeoDB::open(&db_path).unwrap();
2936
2937 let a = db.create_node(&["Node"]);
2939 let b = db.create_node(&["Node"]);
2940 let c = db.create_node(&["Node"]);
2941
2942 let e1 = db.create_edge(a, b, "LINKS");
2944 let _e2 = db.create_edge(b, c, "LINKS");
2945
2946 db.delete_edge(e1);
2948 db.delete_node(b);
2949
2950 db.set_node_property(a, "value", Value::Int64(1));
2952 db.set_node_property(c, "value", Value::Int64(3));
2953
2954 db.close().unwrap();
2955 }
2956
2957 {
2959 let db = GrafeoDB::open(&db_path).unwrap();
2960
2961 let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
2965 assert!(node_a.is_some());
2966
2967 let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
2968 assert!(node_c.is_some());
2969
2970 let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
2972 assert!(node_b.is_none());
2973 }
2974 }
2975
2976 #[cfg(feature = "wal")]
2977 #[test]
2978 fn test_close_is_idempotent() {
2979 use tempfile::tempdir;
2981
2982 let dir = tempdir().unwrap();
2983 let db_path = dir.path().join("close_test_db");
2984
2985 let db = GrafeoDB::open(&db_path).unwrap();
2986 db.create_node(&["Test"]);
2987
2988 assert!(db.close().is_ok());
2990
2991 assert!(db.close().is_ok());
2993 }
2994
2995 #[test]
2996 fn test_with_store_external_backend() {
2997 use grafeo_core::graph::lpg::LpgStore;
2998
2999 let external = Arc::new(LpgStore::new().unwrap());
3000
3001 let n1 = external.create_node(&["Person"]);
3003 external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
3004
3005 let db = GrafeoDB::with_store(
3006 Arc::clone(&external) as Arc<dyn GraphStoreMut>,
3007 Config::in_memory(),
3008 )
3009 .unwrap();
3010
3011 let session = db.session();
3012
3013 #[cfg(feature = "gql")]
3015 {
3016 let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
3017 assert_eq!(result.rows.len(), 1);
3018 }
3019 }
3020
3021 #[test]
3022 fn test_with_config_custom_memory_limit() {
3023 let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); let db = GrafeoDB::with_config(config).unwrap();
3026 assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
3027 assert_eq!(db.node_count(), 0);
3028 }
3029
3030 #[cfg(feature = "metrics")]
3031 #[test]
3032 fn test_database_metrics_registry() {
3033 let db = GrafeoDB::new_in_memory();
3034
3035 db.create_node(&["Person"]);
3037 db.create_node(&["Person"]);
3038
3039 let snap = db.metrics();
3041 assert_eq!(snap.query_count, 0); }
3044
3045 #[test]
3046 fn test_query_result_has_metrics() {
3047 let db = GrafeoDB::new_in_memory();
3049 db.create_node(&["Person"]);
3050 db.create_node(&["Person"]);
3051
3052 #[cfg(feature = "gql")]
3053 {
3054 let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
3055
3056 assert!(result.execution_time_ms.is_some());
3058 assert!(result.rows_scanned.is_some());
3059 assert!(result.execution_time_ms.unwrap() >= 0.0);
3060 assert_eq!(result.rows_scanned.unwrap(), 2);
3061 }
3062 }
3063
3064 #[test]
3065 fn test_empty_query_result_metrics() {
3066 let db = GrafeoDB::new_in_memory();
3068 db.create_node(&["Person"]);
3069
3070 #[cfg(feature = "gql")]
3071 {
3072 let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
3074
3075 assert!(result.execution_time_ms.is_some());
3076 assert!(result.rows_scanned.is_some());
3077 assert_eq!(result.rows_scanned.unwrap(), 0);
3078 }
3079 }
3080
3081 #[cfg(feature = "cdc")]
3082 mod cdc_integration {
3083 use super::*;
3084
3085 fn cdc_db() -> GrafeoDB {
3087 GrafeoDB::with_config(Config::in_memory().with_cdc()).unwrap()
3088 }
3089
3090 #[test]
3091 fn test_node_lifecycle_history() {
3092 let db = cdc_db();
3093
3094 let id = db.create_node(&["Person"]);
3096 db.set_node_property(id, "name", "Alix".into());
3098 db.set_node_property(id, "name", "Gus".into());
3099 db.delete_node(id);
3101
3102 let history = db.history(id).unwrap();
3103 assert_eq!(history.len(), 4); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3105 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
3106 assert!(history[1].before.is_none()); assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
3108 assert!(history[2].before.is_some()); assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
3110 }
3111
3112 #[test]
3113 fn test_edge_lifecycle_history() {
3114 let db = cdc_db();
3115
3116 let alix = db.create_node(&["Person"]);
3117 let gus = db.create_node(&["Person"]);
3118 let edge = db.create_edge(alix, gus, "KNOWS");
3119 db.set_edge_property(edge, "since", 2024i64.into());
3120 db.delete_edge(edge);
3121
3122 let history = db.history(edge).unwrap();
3123 assert_eq!(history.len(), 3); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3125 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
3126 assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
3127 }
3128
3129 #[test]
3130 fn test_create_node_with_props_cdc() {
3131 let db = cdc_db();
3132
3133 let id = db.create_node_with_props(
3134 &["Person"],
3135 vec![
3136 ("name", grafeo_common::types::Value::from("Alix")),
3137 ("age", grafeo_common::types::Value::from(30i64)),
3138 ],
3139 );
3140
3141 let history = db.history(id).unwrap();
3142 assert_eq!(history.len(), 1);
3143 assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3144 let after = history[0].after.as_ref().unwrap();
3146 assert_eq!(after.len(), 2);
3147 }
3148
3149 #[test]
3150 fn test_changes_between() {
3151 let db = cdc_db();
3152
3153 let id1 = db.create_node(&["A"]);
3154 let _id2 = db.create_node(&["B"]);
3155 db.set_node_property(id1, "x", 1i64.into());
3156
3157 let changes = db
3159 .changes_between(
3160 grafeo_common::types::EpochId(0),
3161 grafeo_common::types::EpochId(u64::MAX),
3162 )
3163 .unwrap();
3164 assert_eq!(changes.len(), 3); }
3166
3167 #[test]
3168 fn test_cdc_disabled_by_default() {
3169 let db = GrafeoDB::new_in_memory();
3170 assert!(!db.is_cdc_enabled());
3171
3172 let id = db.create_node(&["Person"]);
3173 db.set_node_property(id, "name", "Alix".into());
3174
3175 let history = db.history(id).unwrap();
3176 assert!(history.is_empty(), "CDC off by default: no events recorded");
3177 }
3178
3179 #[test]
3180 fn test_session_with_cdc_override_on() {
3181 let db = GrafeoDB::new_in_memory();
3183 let session = db.session_with_cdc(true);
3184 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3185 let changes = db
3187 .changes_between(
3188 grafeo_common::types::EpochId(0),
3189 grafeo_common::types::EpochId(u64::MAX),
3190 )
3191 .unwrap();
3192 assert!(
3193 !changes.is_empty(),
3194 "session_with_cdc(true) should record events"
3195 );
3196 }
3197
3198 #[test]
3199 fn test_session_with_cdc_override_off() {
3200 let db = cdc_db();
3202 let session = db.session_with_cdc(false);
3203 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3204 let changes = db
3205 .changes_between(
3206 grafeo_common::types::EpochId(0),
3207 grafeo_common::types::EpochId(u64::MAX),
3208 )
3209 .unwrap();
3210 assert!(
3211 changes.is_empty(),
3212 "session_with_cdc(false) should not record events"
3213 );
3214 }
3215
3216 #[test]
3217 fn test_set_cdc_enabled_runtime() {
3218 let db = GrafeoDB::new_in_memory();
3219 assert!(!db.is_cdc_enabled());
3220
3221 db.set_cdc_enabled(true);
3223 assert!(db.is_cdc_enabled());
3224
3225 let id = db.create_node(&["Person"]);
3226 let history = db.history(id).unwrap();
3227 assert_eq!(history.len(), 1, "CDC enabled at runtime records events");
3228
3229 db.set_cdc_enabled(false);
3231 let id2 = db.create_node(&["Person"]);
3232 let history2 = db.history(id2).unwrap();
3233 assert!(
3234 history2.is_empty(),
3235 "CDC disabled at runtime stops recording"
3236 );
3237 }
3238 }
3239
3240 #[test]
3241 fn test_with_store_basic() {
3242 use grafeo_core::graph::lpg::LpgStore;
3243
3244 let store = Arc::new(LpgStore::new().unwrap());
3245 let n1 = store.create_node(&["Person"]);
3246 store.set_node_property(n1, "name", "Alix".into());
3247
3248 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3249 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3250
3251 let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
3252 assert_eq!(result.rows.len(), 1);
3253 }
3254
3255 #[test]
3256 fn test_with_store_session() {
3257 use grafeo_core::graph::lpg::LpgStore;
3258
3259 let store = Arc::new(LpgStore::new().unwrap());
3260 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3261 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3262
3263 let session = db.session();
3264 let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
3265 assert_eq!(result.rows.len(), 1);
3266 }
3267
3268 #[test]
3269 fn test_with_store_mutations() {
3270 use grafeo_core::graph::lpg::LpgStore;
3271
3272 let store = Arc::new(LpgStore::new().unwrap());
3273 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
3274 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
3275
3276 let mut session = db.session();
3277
3278 session.begin_transaction().unwrap();
3282 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3283
3284 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
3285 assert_eq!(result.rows.len(), 1);
3286
3287 session.commit().unwrap();
3288 }
3289
3290 #[test]
3295 fn test_query_result_empty() {
3296 let result = QueryResult::empty();
3297 assert!(result.is_empty());
3298 assert_eq!(result.row_count(), 0);
3299 assert_eq!(result.column_count(), 0);
3300 assert!(result.execution_time_ms().is_none());
3301 assert!(result.rows_scanned().is_none());
3302 assert!(result.status_message.is_none());
3303 }
3304
3305 #[test]
3306 fn test_query_result_status() {
3307 let result = QueryResult::status("Created node type 'Person'");
3308 assert!(result.is_empty());
3309 assert_eq!(result.column_count(), 0);
3310 assert_eq!(
3311 result.status_message.as_deref(),
3312 Some("Created node type 'Person'")
3313 );
3314 }
3315
3316 #[test]
3317 fn test_query_result_new_with_columns() {
3318 let result = QueryResult::new(vec!["name".into(), "age".into()]);
3319 assert_eq!(result.column_count(), 2);
3320 assert_eq!(result.row_count(), 0);
3321 assert!(result.is_empty());
3322 assert_eq!(
3324 result.column_types,
3325 vec![
3326 grafeo_common::types::LogicalType::Any,
3327 grafeo_common::types::LogicalType::Any
3328 ]
3329 );
3330 }
3331
3332 #[test]
3333 fn test_query_result_with_types() {
3334 use grafeo_common::types::LogicalType;
3335 let result = QueryResult::with_types(
3336 vec!["name".into(), "age".into()],
3337 vec![LogicalType::String, LogicalType::Int64],
3338 );
3339 assert_eq!(result.column_count(), 2);
3340 assert_eq!(result.column_types[0], LogicalType::String);
3341 assert_eq!(result.column_types[1], LogicalType::Int64);
3342 }
3343
3344 #[test]
3345 fn test_query_result_with_metrics() {
3346 let result = QueryResult::new(vec!["x".into()]).with_metrics(42.5, 100);
3347 assert_eq!(result.execution_time_ms(), Some(42.5));
3348 assert_eq!(result.rows_scanned(), Some(100));
3349 }
3350
3351 #[test]
3352 fn test_query_result_scalar_success() {
3353 use grafeo_common::types::Value;
3354 let mut result = QueryResult::new(vec!["count".into()]);
3355 result.rows.push(vec![Value::Int64(42)]);
3356
3357 let val: i64 = result.scalar().unwrap();
3358 assert_eq!(val, 42);
3359 }
3360
3361 #[test]
3362 fn test_query_result_scalar_wrong_shape() {
3363 use grafeo_common::types::Value;
3364 let mut result = QueryResult::new(vec!["x".into()]);
3366 result.rows.push(vec![Value::Int64(1)]);
3367 result.rows.push(vec![Value::Int64(2)]);
3368 assert!(result.scalar::<i64>().is_err());
3369
3370 let mut result2 = QueryResult::new(vec!["a".into(), "b".into()]);
3372 result2.rows.push(vec![Value::Int64(1), Value::Int64(2)]);
3373 assert!(result2.scalar::<i64>().is_err());
3374
3375 let result3 = QueryResult::new(vec!["x".into()]);
3377 assert!(result3.scalar::<i64>().is_err());
3378 }
3379
3380 #[test]
3381 fn test_query_result_iter() {
3382 use grafeo_common::types::Value;
3383 let mut result = QueryResult::new(vec!["x".into()]);
3384 result.rows.push(vec![Value::Int64(1)]);
3385 result.rows.push(vec![Value::Int64(2)]);
3386
3387 let collected: Vec<_> = result.iter().collect();
3388 assert_eq!(collected.len(), 2);
3389 }
3390
3391 #[test]
3392 fn test_query_result_display() {
3393 use grafeo_common::types::Value;
3394 let mut result = QueryResult::new(vec!["name".into()]);
3395 result.rows.push(vec![Value::from("Alix")]);
3396 let display = result.to_string();
3397 assert!(display.contains("name"));
3398 assert!(display.contains("Alix"));
3399 }
3400
3401 #[test]
3406 fn test_from_value_i64_type_mismatch() {
3407 use grafeo_common::types::Value;
3408 let val = Value::from("not a number");
3409 assert!(i64::from_value(&val).is_err());
3410 }
3411
3412 #[test]
3413 fn test_from_value_f64_type_mismatch() {
3414 use grafeo_common::types::Value;
3415 let val = Value::from("not a float");
3416 assert!(f64::from_value(&val).is_err());
3417 }
3418
3419 #[test]
3420 fn test_from_value_string_type_mismatch() {
3421 use grafeo_common::types::Value;
3422 let val = Value::Int64(42);
3423 assert!(String::from_value(&val).is_err());
3424 }
3425
3426 #[test]
3427 fn test_from_value_bool_type_mismatch() {
3428 use grafeo_common::types::Value;
3429 let val = Value::Int64(1);
3430 assert!(bool::from_value(&val).is_err());
3431 }
3432
3433 #[test]
3434 fn test_from_value_all_success() {
3435 use grafeo_common::types::Value;
3436 assert_eq!(i64::from_value(&Value::Int64(99)).unwrap(), 99);
3437 assert!((f64::from_value(&Value::Float64(2.72)).unwrap() - 2.72).abs() < f64::EPSILON);
3438 assert_eq!(String::from_value(&Value::from("hello")).unwrap(), "hello");
3439 assert!(bool::from_value(&Value::Bool(true)).unwrap());
3440 }
3441
3442 #[test]
3447 fn test_database_is_read_only_false_by_default() {
3448 let db = GrafeoDB::new_in_memory();
3449 assert!(!db.is_read_only());
3450 }
3451
3452 #[test]
3453 fn test_database_graph_model() {
3454 let db = GrafeoDB::new_in_memory();
3455 assert_eq!(db.graph_model(), crate::config::GraphModel::Lpg);
3456 }
3457
3458 #[test]
3459 fn test_database_memory_limit_none_by_default() {
3460 let db = GrafeoDB::new_in_memory();
3461 assert!(db.memory_limit().is_none());
3462 }
3463
3464 #[test]
3465 fn test_database_memory_limit_custom() {
3466 let config = Config::in_memory().with_memory_limit(128 * 1024 * 1024);
3467 let db = GrafeoDB::with_config(config).unwrap();
3468 assert_eq!(db.memory_limit(), Some(128 * 1024 * 1024));
3469 }
3470
3471 #[test]
3472 fn test_database_adaptive_config() {
3473 let db = GrafeoDB::new_in_memory();
3474 let adaptive = db.adaptive_config();
3475 assert!(adaptive.enabled);
3476 assert!((adaptive.threshold - 3.0).abs() < f64::EPSILON);
3477 }
3478
3479 #[test]
3480 fn test_database_buffer_manager() {
3481 let db = GrafeoDB::new_in_memory();
3482 let _bm = db.buffer_manager();
3483 }
3485
3486 #[test]
3487 fn test_database_query_cache() {
3488 let db = GrafeoDB::new_in_memory();
3489 let _qc = db.query_cache();
3490 }
3491
3492 #[test]
3493 fn test_database_clear_plan_cache() {
3494 let db = GrafeoDB::new_in_memory();
3495 #[cfg(feature = "gql")]
3497 {
3498 let _ = db.execute("MATCH (n) RETURN count(n)");
3499 }
3500 db.clear_plan_cache();
3501 }
3503
3504 #[test]
3505 fn test_database_gc() {
3506 let db = GrafeoDB::new_in_memory();
3507 db.create_node(&["Person"]);
3508 db.gc();
3509 assert_eq!(db.node_count(), 1);
3511 }
3512
3513 #[test]
3518 fn test_create_and_list_graphs() {
3519 let db = GrafeoDB::new_in_memory();
3520 let created = db.create_graph("social").unwrap();
3521 assert!(created);
3522
3523 let created_again = db.create_graph("social").unwrap();
3525 assert!(!created_again);
3526
3527 let names = db.list_graphs();
3528 assert!(names.contains(&"social".to_string()));
3529 }
3530
3531 #[test]
3532 fn test_drop_graph() {
3533 let db = GrafeoDB::new_in_memory();
3534 db.create_graph("temp").unwrap();
3535 assert!(db.drop_graph("temp"));
3536 assert!(!db.drop_graph("temp")); }
3538
3539 #[test]
3540 fn test_drop_graph_resets_current_graph() {
3541 let db = GrafeoDB::new_in_memory();
3542 db.create_graph("active").unwrap();
3543 db.set_current_graph(Some("active")).unwrap();
3544 assert_eq!(db.current_graph(), Some("active".to_string()));
3545
3546 db.drop_graph("active");
3547 assert_eq!(db.current_graph(), None);
3548 }
3549
3550 #[test]
3555 fn test_current_graph_default_none() {
3556 let db = GrafeoDB::new_in_memory();
3557 assert_eq!(db.current_graph(), None);
3558 }
3559
3560 #[test]
3561 fn test_set_current_graph_valid() {
3562 let db = GrafeoDB::new_in_memory();
3563 db.create_graph("social").unwrap();
3564 db.set_current_graph(Some("social")).unwrap();
3565 assert_eq!(db.current_graph(), Some("social".to_string()));
3566 }
3567
3568 #[test]
3569 fn test_set_current_graph_nonexistent() {
3570 let db = GrafeoDB::new_in_memory();
3571 let result = db.set_current_graph(Some("nonexistent"));
3572 assert!(result.is_err());
3573 }
3574
3575 #[test]
3576 fn test_set_current_graph_none_resets() {
3577 let db = GrafeoDB::new_in_memory();
3578 db.create_graph("social").unwrap();
3579 db.set_current_graph(Some("social")).unwrap();
3580 db.set_current_graph(None).unwrap();
3581 assert_eq!(db.current_graph(), None);
3582 }
3583
3584 #[test]
3585 fn test_set_current_graph_default_keyword() {
3586 let db = GrafeoDB::new_in_memory();
3587 db.set_current_graph(Some("default")).unwrap();
3589 assert_eq!(db.current_graph(), Some("default".to_string()));
3590 }
3591
3592 #[test]
3593 fn test_current_schema_default_none() {
3594 let db = GrafeoDB::new_in_memory();
3595 assert_eq!(db.current_schema(), None);
3596 }
3597
3598 #[test]
3599 fn test_set_current_schema_nonexistent() {
3600 let db = GrafeoDB::new_in_memory();
3601 let result = db.set_current_schema(Some("nonexistent"));
3602 assert!(result.is_err());
3603 }
3604
3605 #[test]
3606 fn test_set_current_schema_none_resets() {
3607 let db = GrafeoDB::new_in_memory();
3608 db.set_current_schema(None).unwrap();
3609 assert_eq!(db.current_schema(), None);
3610 }
3611
3612 #[test]
3617 fn test_graph_store_returns_lpg_by_default() {
3618 let db = GrafeoDB::new_in_memory();
3619 db.create_node(&["Person"]);
3620 let store = db.graph_store();
3621 assert_eq!(store.node_count(), 1);
3622 }
3623
3624 #[test]
3625 fn test_graph_store_mut_returns_some_by_default() {
3626 let db = GrafeoDB::new_in_memory();
3627 assert!(db.graph_store_mut().is_some());
3628 }
3629
3630 #[test]
3631 fn test_with_read_store() {
3632 use grafeo_core::graph::lpg::LpgStore;
3633
3634 let store = Arc::new(LpgStore::new().unwrap());
3635 store.create_node(&["Person"]);
3636
3637 let read_store = Arc::clone(&store) as Arc<dyn GraphStore>;
3638 let db = GrafeoDB::with_read_store(read_store, Config::in_memory()).unwrap();
3639
3640 assert!(db.is_read_only());
3641 assert!(db.graph_store_mut().is_none());
3642
3643 let gs = db.graph_store();
3645 assert_eq!(gs.node_count(), 1);
3646 }
3647
3648 #[test]
3649 fn test_with_store_graph_store_methods() {
3650 use grafeo_core::graph::lpg::LpgStore;
3651
3652 let store = Arc::new(LpgStore::new().unwrap());
3653 store.create_node(&["Person"]);
3654
3655 let db = GrafeoDB::with_store(
3656 Arc::clone(&store) as Arc<dyn GraphStoreMut>,
3657 Config::in_memory(),
3658 )
3659 .unwrap();
3660
3661 assert!(!db.is_read_only());
3662 assert!(db.graph_store_mut().is_some());
3663 assert_eq!(db.graph_store().node_count(), 1);
3664 }
3665
3666 #[test]
3671 #[allow(deprecated)]
3672 fn test_session_read_only() {
3673 let db = GrafeoDB::new_in_memory();
3674 db.create_node(&["Person"]);
3675
3676 let session = db.session_read_only();
3677 #[cfg(feature = "gql")]
3679 {
3680 let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
3681 assert_eq!(result.rows.len(), 1);
3682 }
3683 }
3684
3685 #[test]
3690 fn test_close_in_memory_database() {
3691 let db = GrafeoDB::new_in_memory();
3692 db.create_node(&["Person"]);
3693 assert!(db.close().is_ok());
3694 assert!(db.close().is_ok());
3696 }
3697
3698 #[test]
3703 fn test_with_config_invalid_config_zero_threads() {
3704 let config = Config::in_memory().with_threads(0);
3705 let result = GrafeoDB::with_config(config);
3706 assert!(result.is_err());
3707 }
3708
3709 #[test]
3710 fn test_with_config_invalid_config_zero_memory_limit() {
3711 let config = Config::in_memory().with_memory_limit(0);
3712 let result = GrafeoDB::with_config(config);
3713 assert!(result.is_err());
3714 }
3715
3716 #[test]
3721 fn test_storage_format_display() {
3722 use crate::config::StorageFormat;
3723 assert_eq!(StorageFormat::Auto.to_string(), "auto");
3724 assert_eq!(StorageFormat::WalDirectory.to_string(), "wal-directory");
3725 assert_eq!(StorageFormat::SingleFile.to_string(), "single-file");
3726 }
3727
3728 #[test]
3729 fn test_storage_format_default() {
3730 use crate::config::StorageFormat;
3731 assert_eq!(StorageFormat::default(), StorageFormat::Auto);
3732 }
3733
3734 #[test]
3735 fn test_config_with_storage_format() {
3736 use crate::config::StorageFormat;
3737 let config = Config::in_memory().with_storage_format(StorageFormat::SingleFile);
3738 assert_eq!(config.storage_format, StorageFormat::SingleFile);
3739 }
3740
3741 #[test]
3746 fn test_config_with_cdc() {
3747 let config = Config::in_memory().with_cdc();
3748 assert!(config.cdc_enabled);
3749 }
3750
3751 #[test]
3752 fn test_config_cdc_default_false() {
3753 let config = Config::in_memory();
3754 assert!(!config.cdc_enabled);
3755 }
3756
3757 #[test]
3762 fn test_config_error_is_error_trait() {
3763 use crate::config::ConfigError;
3764 let err: Box<dyn std::error::Error> = Box::new(ConfigError::ZeroMemoryLimit);
3765 assert!(err.source().is_none());
3766 }
3767
3768 #[cfg(feature = "metrics")]
3773 #[test]
3774 fn test_metrics_prometheus_output() {
3775 let db = GrafeoDB::new_in_memory();
3776 let prom = db.metrics_prometheus();
3777 assert!(!prom.is_empty());
3779 }
3780
3781 #[cfg(feature = "metrics")]
3782 #[test]
3783 fn test_reset_metrics() {
3784 let db = GrafeoDB::new_in_memory();
3785 let _session = db.session();
3787 db.reset_metrics();
3788 let snap = db.metrics();
3789 assert_eq!(snap.query_count, 0);
3790 }
3791
3792 #[test]
3797 fn test_drop_graph_on_external_store() {
3798 use grafeo_core::graph::lpg::LpgStore;
3799
3800 let store = Arc::new(LpgStore::new().unwrap());
3801 let read_store = Arc::clone(&store) as Arc<dyn GraphStore>;
3802 let db = GrafeoDB::with_read_store(read_store, Config::in_memory()).unwrap();
3803
3804 assert!(!db.drop_graph("anything"));
3806 }
3807}