1#[cfg(feature = "lpg")]
15mod admin;
16#[cfg(feature = "arrow-export")]
17pub mod arrow;
18#[cfg(all(feature = "async-storage", feature = "lpg"))]
19mod async_ops;
20#[cfg(all(feature = "async-storage", feature = "lpg"))]
21pub(crate) mod async_wal_store;
22#[cfg(all(feature = "wal", feature = "grafeo-file"))]
23pub mod backup;
24#[cfg(feature = "lpg")]
25pub(crate) mod catalog_section;
26#[cfg(feature = "cdc")]
27pub(crate) mod cdc_store;
28#[cfg(all(feature = "grafeo-file", feature = "lpg"))]
29mod checkpoint_timer;
30#[cfg(feature = "lpg")]
31mod crud;
32#[cfg(feature = "embed")]
33mod embed;
34#[cfg(feature = "grafeo-file")]
35pub(crate) mod flush;
36#[cfg(feature = "lpg")]
37mod import;
38#[cfg(feature = "lpg")]
39mod index;
40#[cfg(feature = "lpg")]
41mod persistence;
42mod query;
43#[cfg(feature = "triple-store")]
44mod rdf_ops;
45#[cfg(feature = "lpg")]
46mod search;
47pub(crate) mod section_consumer;
48#[cfg(all(feature = "wal", feature = "lpg"))]
49pub(crate) mod wal_store;
50
51use grafeo_common::{grafeo_error, grafeo_warn};
52#[cfg(feature = "wal")]
53use std::path::Path;
54use std::sync::Arc;
55use std::sync::atomic::AtomicUsize;
56
57use parking_lot::RwLock;
58
59use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
60use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind, Result};
61#[cfg(feature = "lpg")]
62use grafeo_core::graph::lpg::LpgStore;
63#[cfg(feature = "triple-store")]
64use grafeo_core::graph::rdf::RdfStore;
65use grafeo_core::graph::{GraphStore, GraphStoreMut};
66#[cfg(feature = "grafeo-file")]
67use grafeo_storage::file::GrafeoFileManager;
68#[cfg(all(feature = "wal", feature = "lpg"))]
69use grafeo_storage::wal::WalRecovery;
70#[cfg(feature = "wal")]
71use grafeo_storage::wal::{DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord};
72
73use crate::catalog::Catalog;
74use crate::config::Config;
75use crate::query::cache::QueryCache;
76use crate::session::Session;
77use crate::transaction::TransactionManager;
78
/// Top-level database handle.
///
/// Bundles the graph store(s), the catalog, the transaction manager, the
/// buffer manager and the optional, feature-gated components (WAL, CDC,
/// embedding models, single-file persistence, metrics). Which fields exist
/// depends on the Cargo features the crate is compiled with.
pub struct GrafeoDB {
    /// Configuration the database was constructed with.
    pub(super) config: Config,
    /// Built-in LPG store. `None` when the database runs on an external store
    /// (`with_store` / `with_read_store`) or after `compact()` replaced it.
    #[cfg(feature = "lpg")]
    pub(super) store: Option<Arc<LpgStore>>,
    /// Schema catalog shared with every session.
    pub(super) catalog: Arc<Catalog>,
    /// RDF triple store (only with the `triple-store` feature).
    #[cfg(feature = "triple-store")]
    pub(super) rdf_store: Arc<RdfStore>,
    /// Transaction manager shared with every session.
    pub(super) transaction_manager: Arc<TransactionManager>,
    /// Buffer manager created from the configured (or detected) memory budget.
    pub(super) buffer_manager: Arc<BufferManager>,
    /// Write-ahead log. `None` for read-only databases, when the WAL is
    /// disabled, when no path is configured, or for external-store databases.
    #[cfg(feature = "wal")]
    pub(super) wal: Option<Arc<LpgWal>>,
    /// Shared slot handed to sessions together with the WAL.
    /// NOTE(review): presumably the named graph the WAL is currently
    /// positioned on — confirm against the session code.
    #[cfg(feature = "wal")]
    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
    /// Query cache shared with every session.
    pub(super) query_cache: Arc<QueryCache>,
    /// Counter shared with every session; starts at zero.
    /// NOTE(review): presumably counts commits to drive GC — confirm.
    pub(super) commit_counter: Arc<AtomicUsize>,
    /// Open/closed flag; initialised to `true` by every constructor.
    pub(super) is_open: RwLock<bool>,
    /// Change-data-capture log attached to sessions that have CDC enabled.
    #[cfg(feature = "cdc")]
    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
    /// Database-wide CDC default, read through `cdc_active()`.
    #[cfg(feature = "cdc")]
    cdc_enabled: std::sync::atomic::AtomicBool,
    /// Embedding models keyed by a `String`.
    /// NOTE(review): presumably the registered model name — confirm.
    #[cfg(feature = "embed")]
    pub(super) embedding_models:
        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
    /// Manager for the single-file storage format; `None` when that format
    /// is not in use.
    #[cfg(feature = "grafeo-file")]
    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
    /// Periodic checkpoint timer; populated by `with_config` only when a
    /// checkpoint interval is configured, a file manager exists and the
    /// database is writable.
    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
    checkpoint_timer: parking_lot::Mutex<Option<checkpoint_timer::CheckpointTimer>>,
    /// Memory-mapped vector storages keyed by a `String`; every constructor
    /// in this file initialises it to `None`.
    /// NOTE(review): presumably keyed by index name — confirm.
    #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
    vector_spill_storages: Option<
        Arc<
            parking_lot::RwLock<
                std::collections::HashMap<String, Arc<grafeo_core::index::vector::MmapStorage>>,
            >,
        >,
    >,
    /// External read-only store; when set, sessions are created on top of it
    /// instead of the built-in stores.
    pub(super) external_read_store: Option<Arc<dyn GraphStore>>,
    /// External writable store, passed to sessions alongside
    /// `external_read_store`; `None` for read-only external stores.
    pub(super) external_write_store: Option<Arc<dyn GraphStoreMut>>,
    /// Metrics registry attached to new sessions.
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Graph name applied to newly created sessions, if any.
    current_graph: RwLock<Option<String>>,
    /// Schema name applied to newly created sessions, if any.
    current_schema: RwLock<Option<String>>,
    /// When `true`, every session is created read-only.
    read_only: bool,
    /// Named graph projections shared with sessions.
    projections:
        Arc<RwLock<std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>>>,
}
181
182impl GrafeoDB {
/// Returns the built-in LPG store.
///
/// # Panics
///
/// Panics when the database has no built-in store, i.e. `self.store` is
/// `None` (external-store databases).
#[cfg(feature = "lpg")]
fn lpg_store(&self) -> &Arc<LpgStore> {
    const NO_BUILTIN_STORE: &str =
        "no built-in LpgStore: this GrafeoDB was created with an external store \
         (with_store / with_read_store). Use session() or graph_store() instead.";

    let builtin: Option<&Arc<LpgStore>> = self.store.as_ref();
    builtin.expect(NO_BUILTIN_STORE)
}
197
/// Returns the database-wide CDC flag.
///
/// Loads `cdc_enabled` with relaxed ordering; sessions use this value as
/// their default when no explicit CDC override is supplied.
#[cfg(feature = "cdc")]
#[inline]
pub(super) fn cdc_active(&self) -> bool {
    self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed)
}
204
205 #[must_use]
226 pub fn new_in_memory() -> Self {
227 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
228 }
229
/// Opens a persistent database at `path` using `Config::persistent`.
///
/// # Errors
///
/// Returns whatever error `with_config` produces for that configuration.
#[cfg(feature = "wal")]
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
    let location = path.as_ref();
    let persistent = Config::persistent(location);
    Self::with_config(persistent)
}
252
/// Opens the database file at `path` in read-only mode using
/// `Config::read_only`.
///
/// # Errors
///
/// Returns whatever error `with_config` produces; for read-only access that
/// includes a missing path or a path that is not an existing file.
#[cfg(feature = "grafeo-file")]
pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
    let location = path.as_ref();
    let read_only = Config::read_only(location);
    Self::with_config(read_only)
}
281
/// Creates a database from an explicit [`Config`].
///
/// The steps run in this order:
/// 1. Validate `config`.
/// 2. Create empty stores, the transaction manager, the buffer manager and
///    the catalog.
/// 3. With `grafeo-file`: open (or create) the single-file container, load
///    its sections or legacy snapshot, and replay a sidecar WAL if present.
/// 4. With `wal`: replay a directory-format WAL and open the WAL for writing.
/// 5. Assemble the handle, register section consumers, optionally start the
///    checkpoint timer, and restore/spill vector storage.
///
/// # Errors
///
/// Returns `Error::Internal` when validation fails, when read-only access is
/// requested without a path or without an existing file, or when the
/// single-file path exists but is not a file. Errors from store creation,
/// the file manager, WAL recovery and directory creation are propagated.
pub fn with_config(config: Config) -> Result<Self> {
    // Reject invalid configurations before touching any storage.
    config
        .validate()
        .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;

    #[cfg(feature = "lpg")]
    let store = Arc::new(LpgStore::new()?);
    #[cfg(feature = "triple-store")]
    let rdf_store = Arc::new(RdfStore::new());
    let transaction_manager = Arc::new(TransactionManager::new());

    let buffer_config = BufferManagerConfig {
        // Without an explicit limit, budget 75% of the detected system memory.
        budget: config.memory_limit.unwrap_or_else(|| {
            (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
        }),
        // Default spill location: `<file name>.spill` next to the database
        // path; stays `None` when there is no path or the name is not UTF-8.
        spill_path: config.spill_path.clone().or_else(|| {
            config.path.as_ref().and_then(|p| {
                let parent = p.parent()?;
                let name = p.file_name()?.to_str()?;
                Some(parent.join(format!("{name}.spill")))
            })
        }),
        ..BufferManagerConfig::default()
    };
    let buffer_manager = BufferManager::new(buffer_config);

    let catalog = Arc::new(Catalog::new());

    let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;

    // Single-file storage: decide whether a file manager is needed and load
    // whatever state the file already holds.
    #[cfg(feature = "grafeo-file")]
    let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
        if let Some(ref db_path) = config.path {
            // Read-only access never creates files; the path must already be a file.
            if db_path.exists() && db_path.is_file() {
                let fm = GrafeoFileManager::open_read_only(db_path)?;
                // Prefer the section directory; otherwise fall back to the
                // snapshot blob (skipped when empty).
                #[cfg(feature = "lpg")]
                if fm.read_section_directory()?.is_some() {
                    Self::load_from_sections(
                        &fm,
                        &store,
                        &catalog,
                        #[cfg(feature = "triple-store")]
                        &rdf_store,
                    )?;
                } else {
                    let snapshot_data = fm.read_snapshot()?;
                    if !snapshot_data.is_empty() {
                        Self::apply_snapshot_data(
                            &store,
                            &catalog,
                            #[cfg(feature = "triple-store")]
                            &rdf_store,
                            &snapshot_data,
                        )?;
                    }
                }
                Some(Arc::new(fm))
            } else {
                return Err(grafeo_common::utils::error::Error::Internal(format!(
                    "read-only open requires an existing .grafeo file: {}",
                    db_path.display()
                )));
            }
        } else {
            return Err(grafeo_common::utils::error::Error::Internal(
                "read-only mode requires a database path".to_string(),
            ));
        }
    } else if let Some(ref db_path) = config.path {
        if Self::should_use_single_file(db_path, config.storage_format) {
            // Open an existing file, create a missing one, and refuse
            // anything else that already occupies the path (e.g. a directory).
            let fm = if db_path.exists() && db_path.is_file() {
                GrafeoFileManager::open(db_path)?
            } else if !db_path.exists() {
                GrafeoFileManager::create(db_path)?
            } else {
                return Err(grafeo_common::utils::error::Error::Internal(format!(
                    "path exists but is not a file: {}",
                    db_path.display()
                )));
            };

            // Same loading strategy as the read-only branch above.
            #[cfg(feature = "lpg")]
            if fm.read_section_directory()?.is_some() {
                Self::load_from_sections(
                    &fm,
                    &store,
                    &catalog,
                    #[cfg(feature = "triple-store")]
                    &rdf_store,
                )?;
            } else {
                let snapshot_data = fm.read_snapshot()?;
                if !snapshot_data.is_empty() {
                    Self::apply_snapshot_data(
                        &store,
                        &catalog,
                        #[cfg(feature = "triple-store")]
                        &rdf_store,
                        &snapshot_data,
                    )?;
                }
            }

            // Replay the sidecar WAL on top of the state loaded from the file.
            #[cfg(all(feature = "wal", feature = "lpg"))]
            if config.wal_enabled && fm.has_sidecar_wal() {
                let recovery = WalRecovery::new(fm.sidecar_wal_path());
                let records = recovery.recover()?;
                Self::apply_wal_records(
                    &store,
                    &catalog,
                    #[cfg(feature = "triple-store")]
                    &rdf_store,
                    &records,
                )?;
            }

            Some(Arc::new(fm))
        } else {
            None
        }
    } else {
        None
    };

    // WAL for new writes: only for writable databases with the WAL enabled
    // and a configured path.
    #[cfg(feature = "wal")]
    let wal = if is_read_only {
        None
    } else if config.wal_enabled {
        if let Some(ref db_path) = config.path {
            // Single-file databases keep the WAL in the file manager's sidecar
            // location; otherwise it lives in `<db_path>/wal`.
            #[cfg(feature = "grafeo-file")]
            let wal_path = if let Some(ref fm) = file_manager {
                let p = fm.sidecar_wal_path();
                std::fs::create_dir_all(&p)?;
                p
            } else {
                std::fs::create_dir_all(db_path)?;
                db_path.join("wal")
            };

            #[cfg(not(feature = "grafeo-file"))]
            let wal_path = {
                std::fs::create_dir_all(db_path)?;
                db_path.join("wal")
            };

            #[cfg(all(feature = "lpg", feature = "grafeo-file"))]
            let is_single_file = file_manager.is_some();
            #[cfg(all(feature = "lpg", not(feature = "grafeo-file")))]
            let is_single_file = false;

            // Directory-format recovery. The single-file case was already
            // replayed from the sidecar WAL above, so it is skipped here.
            #[cfg(feature = "lpg")]
            if !is_single_file && wal_path.exists() {
                let recovery = WalRecovery::new(&wal_path);
                let records = recovery.recover()?;
                Self::apply_wal_records(
                    &store,
                    &catalog,
                    #[cfg(feature = "triple-store")]
                    &rdf_store,
                    &records,
                )?;
            }

            // Translate the engine-level durability setting into the
            // storage crate's equivalent, variant by variant.
            let wal_durability = match config.wal_durability {
                crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
                crate::config::DurabilityMode::Batch {
                    max_delay_ms,
                    max_records,
                } => WalDurabilityMode::Batch {
                    max_delay_ms,
                    max_records,
                },
                crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
                    WalDurabilityMode::Adaptive { target_interval_ms }
                }
                crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
            };
            let wal_config = WalConfig {
                durability: wal_durability,
                ..WalConfig::default()
            };
            let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
            Some(Arc::new(wal_manager))
        } else {
            None
        }
    } else {
        None
    };

    let query_cache = Arc::new(QueryCache::default());

    // Align the transaction manager's epoch with the store's epoch after
    // loading and replay.
    #[cfg(all(feature = "temporal", feature = "lpg"))]
    transaction_manager.sync_epoch(store.current_epoch());

    // Copy out the values needed after `config` is moved into the struct.
    #[cfg(feature = "cdc")]
    let cdc_enabled_val = config.cdc_enabled;
    #[cfg(feature = "cdc")]
    let cdc_retention = config.cdc_retention.clone();

    // Handles for the checkpoint timer, cloned before the originals are
    // moved into the struct.
    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
    let checkpoint_interval = config.checkpoint_interval;
    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
    let timer_store = Arc::clone(&store);
    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
    let timer_catalog = Arc::clone(&catalog);
    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
    let timer_tm = Arc::clone(&transaction_manager);
    #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "triple-store"))]
    let timer_rdf = Arc::clone(&rdf_store);
    #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "wal"))]
    let timer_wal = wal.clone();

    let mut db = Self {
        config,
        #[cfg(feature = "lpg")]
        store: Some(store),
        catalog,
        #[cfg(feature = "triple-store")]
        rdf_store,
        transaction_manager,
        buffer_manager,
        #[cfg(feature = "wal")]
        wal,
        #[cfg(feature = "wal")]
        wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
        query_cache,
        commit_counter: Arc::new(AtomicUsize::new(0)),
        is_open: RwLock::new(true),
        #[cfg(feature = "cdc")]
        cdc_log: Arc::new(crate::cdc::CdcLog::with_retention(cdc_retention)),
        #[cfg(feature = "cdc")]
        cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
        #[cfg(feature = "embed")]
        embedding_models: RwLock::new(hashbrown::HashMap::new()),
        #[cfg(feature = "grafeo-file")]
        file_manager,
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        checkpoint_timer: parking_lot::Mutex::new(None),
        #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
        vector_spill_storages: None,
        external_read_store: None,
        external_write_store: None,
        #[cfg(feature = "metrics")]
        metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
        current_graph: RwLock::new(None),
        current_schema: RwLock::new(None),
        read_only: is_read_only,
        projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
    };

    // Defined elsewhere in this impl.
    db.register_section_consumers();

    // Start periodic checkpoints only for writable, file-backed databases
    // with a configured interval.
    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
    if let (Some(interval), Some(fm)) = (checkpoint_interval, &db.file_manager)
        && !is_read_only
    {
        *db.checkpoint_timer.lock() = Some(checkpoint_timer::CheckpointTimer::start(
            interval,
            Arc::clone(fm),
            timer_store,
            timer_catalog,
            timer_tm,
            #[cfg(feature = "triple-store")]
            timer_rdf,
            #[cfg(feature = "wal")]
            timer_wal,
        ));
    }

    // Defined elsewhere in this impl.
    #[cfg(all(
        feature = "lpg",
        feature = "vector-index",
        feature = "mmap",
        not(feature = "temporal")
    ))]
    db.restore_spill_files();

    // When the vector-store section is configured as `ForceDisk`, spill
    // everything through the buffer manager right away.
    #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
    if db
        .config
        .section_configs
        .get(&grafeo_common::storage::SectionType::VectorStore)
        .is_some_and(|c| c.tier == grafeo_common::storage::TierOverride::ForceDisk)
    {
        db.buffer_manager.spill_all();
    }

    Ok(db)
}
627
628 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
657 config
658 .validate()
659 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
660
661 let transaction_manager = Arc::new(TransactionManager::new());
662
663 let buffer_config = BufferManagerConfig {
664 budget: config.memory_limit.unwrap_or_else(|| {
665 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
666 }),
667 spill_path: None,
668 ..BufferManagerConfig::default()
669 };
670 let buffer_manager = BufferManager::new(buffer_config);
671
672 let query_cache = Arc::new(QueryCache::default());
673
674 #[cfg(feature = "cdc")]
675 let cdc_enabled_val = config.cdc_enabled;
676
677 Ok(Self {
678 config,
679 #[cfg(feature = "lpg")]
680 store: None,
681 catalog: Arc::new(Catalog::new()),
682 #[cfg(feature = "triple-store")]
683 rdf_store: Arc::new(RdfStore::new()),
684 transaction_manager,
685 buffer_manager,
686 #[cfg(feature = "wal")]
687 wal: None,
688 #[cfg(feature = "wal")]
689 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
690 query_cache,
691 commit_counter: Arc::new(AtomicUsize::new(0)),
692 is_open: RwLock::new(true),
693 #[cfg(feature = "cdc")]
694 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
695 #[cfg(feature = "cdc")]
696 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
697 #[cfg(feature = "embed")]
698 embedding_models: RwLock::new(hashbrown::HashMap::new()),
699 #[cfg(feature = "grafeo-file")]
700 file_manager: None,
701 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
702 checkpoint_timer: parking_lot::Mutex::new(None),
703 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
704 vector_spill_storages: None,
705 external_read_store: Some(Arc::clone(&store) as Arc<dyn GraphStore>),
706 external_write_store: Some(store),
707 #[cfg(feature = "metrics")]
708 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
709 current_graph: RwLock::new(None),
710 current_schema: RwLock::new(None),
711 read_only: false,
712 projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
713 })
714 }
715
716 pub fn with_read_store(store: Arc<dyn GraphStore>, config: Config) -> Result<Self> {
741 config
742 .validate()
743 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
744
745 let transaction_manager = Arc::new(TransactionManager::new());
746
747 let buffer_config = BufferManagerConfig {
748 budget: config.memory_limit.unwrap_or_else(|| {
749 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
750 }),
751 spill_path: None,
752 ..BufferManagerConfig::default()
753 };
754 let buffer_manager = BufferManager::new(buffer_config);
755
756 let query_cache = Arc::new(QueryCache::default());
757
758 #[cfg(feature = "cdc")]
759 let cdc_enabled_val = config.cdc_enabled;
760
761 Ok(Self {
762 config,
763 #[cfg(feature = "lpg")]
764 store: None,
765 catalog: Arc::new(Catalog::new()),
766 #[cfg(feature = "triple-store")]
767 rdf_store: Arc::new(RdfStore::new()),
768 transaction_manager,
769 buffer_manager,
770 #[cfg(feature = "wal")]
771 wal: None,
772 #[cfg(feature = "wal")]
773 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
774 query_cache,
775 commit_counter: Arc::new(AtomicUsize::new(0)),
776 is_open: RwLock::new(true),
777 #[cfg(feature = "cdc")]
778 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
779 #[cfg(feature = "cdc")]
780 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
781 #[cfg(feature = "embed")]
782 embedding_models: RwLock::new(hashbrown::HashMap::new()),
783 #[cfg(feature = "grafeo-file")]
784 file_manager: None,
785 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
786 checkpoint_timer: parking_lot::Mutex::new(None),
787 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
788 vector_spill_storages: None,
789 external_read_store: Some(store),
790 external_write_store: None,
791 #[cfg(feature = "metrics")]
792 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
793 current_graph: RwLock::new(None),
794 current_schema: RwLock::new(None),
795 read_only: true,
796 projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
797 })
798 }
799
/// Converts the current graph store into a compact store and switches the
/// database over to it.
///
/// Afterwards the compact store is the external read store, there is no
/// write store or built-in LPG store, the database is read-only, the query
/// cache is replaced with a fresh one and all projections are cleared.
///
/// # Errors
///
/// Returns `Error::Internal` when `from_graph_store` fails; in that case
/// nothing on `self` has been modified.
#[cfg(feature = "compact-store")]
pub fn compact(&mut self) -> Result<()> {
    use grafeo_core::graph::compact::from_graph_store;

    // Kept alive for the whole call, like the source of the conversion.
    let source = self.graph_store();
    let frozen: Arc<dyn GraphStore> = Arc::new(
        from_graph_store(source.as_ref())
            .map_err(|err| grafeo_common::utils::error::Error::Internal(err.to_string()))?,
    );

    self.external_read_store = Some(frozen);
    self.external_write_store = None;
    #[cfg(feature = "lpg")]
    {
        self.store = None;
    }
    self.read_only = true;
    // Cached plans and projections refer to the replaced store.
    self.query_cache = Arc::new(QueryCache::default());
    self.projections.write().clear();

    Ok(())
}
839
840 #[cfg(all(feature = "wal", feature = "lpg"))]
846 fn apply_wal_records(
847 store: &Arc<LpgStore>,
848 catalog: &Catalog,
849 #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
850 records: &[WalRecord],
851 ) -> Result<()> {
852 use crate::catalog::{
853 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
854 };
855 use grafeo_common::utils::error::Error;
856
857 let mut current_graph: Option<String> = None;
860 let mut target_store: Arc<LpgStore> = Arc::clone(store);
861
862 for record in records {
863 match record {
864 WalRecord::CreateNamedGraph { name } => {
866 let _ = store.create_graph(name);
867 }
868 WalRecord::DropNamedGraph { name } => {
869 store.drop_graph(name);
870 if current_graph.as_deref() == Some(name.as_str()) {
872 current_graph = None;
873 target_store = Arc::clone(store);
874 }
875 }
876 WalRecord::SwitchGraph { name } => {
877 current_graph.clone_from(name);
878 target_store = match ¤t_graph {
879 None => Arc::clone(store),
880 Some(graph_name) => store
881 .graph_or_create(graph_name)
882 .map_err(|e| Error::Internal(e.to_string()))?,
883 };
884 }
885
886 WalRecord::CreateNode { id, labels } => {
888 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
889 target_store.create_node_with_id(*id, &label_refs)?;
890 }
891 WalRecord::DeleteNode { id } => {
892 target_store.delete_node(*id);
893 }
894 WalRecord::CreateEdge {
895 id,
896 src,
897 dst,
898 edge_type,
899 } => {
900 target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
901 }
902 WalRecord::DeleteEdge { id } => {
903 target_store.delete_edge(*id);
904 }
905 WalRecord::SetNodeProperty { id, key, value } => {
906 target_store.set_node_property(*id, key, value.clone());
907 }
908 WalRecord::SetEdgeProperty { id, key, value } => {
909 target_store.set_edge_property(*id, key, value.clone());
910 }
911 WalRecord::AddNodeLabel { id, label } => {
912 target_store.add_label(*id, label);
913 }
914 WalRecord::RemoveNodeLabel { id, label } => {
915 target_store.remove_label(*id, label);
916 }
917 WalRecord::RemoveNodeProperty { id, key } => {
918 target_store.remove_node_property(*id, key);
919 }
920 WalRecord::RemoveEdgeProperty { id, key } => {
921 target_store.remove_edge_property(*id, key);
922 }
923
924 WalRecord::CreateNodeType {
926 name,
927 properties,
928 constraints,
929 } => {
930 let def = NodeTypeDefinition {
931 name: name.clone(),
932 properties: properties
933 .iter()
934 .map(|(n, t, nullable)| TypedProperty {
935 name: n.clone(),
936 data_type: PropertyDataType::from_type_name(t),
937 nullable: *nullable,
938 default_value: None,
939 })
940 .collect(),
941 constraints: constraints
942 .iter()
943 .map(|(kind, props)| match kind.as_str() {
944 "unique" => TypeConstraint::Unique(props.clone()),
945 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
946 "not_null" if !props.is_empty() => {
947 TypeConstraint::NotNull(props[0].clone())
948 }
949 _ => TypeConstraint::Unique(props.clone()),
950 })
951 .collect(),
952 parent_types: Vec::new(),
953 };
954 let _ = catalog.register_node_type(def);
955 }
956 WalRecord::DropNodeType { name } => {
957 let _ = catalog.drop_node_type(name);
958 }
959 WalRecord::CreateEdgeType {
960 name,
961 properties,
962 constraints,
963 } => {
964 let def = EdgeTypeDefinition {
965 name: name.clone(),
966 properties: properties
967 .iter()
968 .map(|(n, t, nullable)| TypedProperty {
969 name: n.clone(),
970 data_type: PropertyDataType::from_type_name(t),
971 nullable: *nullable,
972 default_value: None,
973 })
974 .collect(),
975 constraints: constraints
976 .iter()
977 .map(|(kind, props)| match kind.as_str() {
978 "unique" => TypeConstraint::Unique(props.clone()),
979 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
980 "not_null" if !props.is_empty() => {
981 TypeConstraint::NotNull(props[0].clone())
982 }
983 _ => TypeConstraint::Unique(props.clone()),
984 })
985 .collect(),
986 source_node_types: Vec::new(),
987 target_node_types: Vec::new(),
988 };
989 let _ = catalog.register_edge_type_def(def);
990 }
991 WalRecord::DropEdgeType { name } => {
992 let _ = catalog.drop_edge_type_def(name);
993 }
994 WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
995 }
998 WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
999 }
1002 WalRecord::CreateGraphType {
1003 name,
1004 node_types,
1005 edge_types,
1006 open,
1007 } => {
1008 use crate::catalog::GraphTypeDefinition;
1009 let def = GraphTypeDefinition {
1010 name: name.clone(),
1011 allowed_node_types: node_types.clone(),
1012 allowed_edge_types: edge_types.clone(),
1013 open: *open,
1014 };
1015 let _ = catalog.register_graph_type(def);
1016 }
1017 WalRecord::DropGraphType { name } => {
1018 let _ = catalog.drop_graph_type(name);
1019 }
1020 WalRecord::CreateSchema { name } => {
1021 let _ = catalog.register_schema_namespace(name.clone());
1022 }
1023 WalRecord::DropSchema { name } => {
1024 let _ = catalog.drop_schema_namespace(name);
1025 }
1026
1027 WalRecord::AlterNodeType { name, alterations } => {
1028 for (action, prop_name, type_name, nullable) in alterations {
1029 match action.as_str() {
1030 "add" => {
1031 let prop = TypedProperty {
1032 name: prop_name.clone(),
1033 data_type: PropertyDataType::from_type_name(type_name),
1034 nullable: *nullable,
1035 default_value: None,
1036 };
1037 let _ = catalog.alter_node_type_add_property(name, prop);
1038 }
1039 "drop" => {
1040 let _ = catalog.alter_node_type_drop_property(name, prop_name);
1041 }
1042 _ => {}
1043 }
1044 }
1045 }
1046 WalRecord::AlterEdgeType { name, alterations } => {
1047 for (action, prop_name, type_name, nullable) in alterations {
1048 match action.as_str() {
1049 "add" => {
1050 let prop = TypedProperty {
1051 name: prop_name.clone(),
1052 data_type: PropertyDataType::from_type_name(type_name),
1053 nullable: *nullable,
1054 default_value: None,
1055 };
1056 let _ = catalog.alter_edge_type_add_property(name, prop);
1057 }
1058 "drop" => {
1059 let _ = catalog.alter_edge_type_drop_property(name, prop_name);
1060 }
1061 _ => {}
1062 }
1063 }
1064 }
1065 WalRecord::AlterGraphType { name, alterations } => {
1066 for (action, type_name) in alterations {
1067 match action.as_str() {
1068 "add_node" => {
1069 let _ =
1070 catalog.alter_graph_type_add_node_type(name, type_name.clone());
1071 }
1072 "drop_node" => {
1073 let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
1074 }
1075 "add_edge" => {
1076 let _ =
1077 catalog.alter_graph_type_add_edge_type(name, type_name.clone());
1078 }
1079 "drop_edge" => {
1080 let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
1081 }
1082 _ => {}
1083 }
1084 }
1085 }
1086
1087 WalRecord::CreateProcedure {
1088 name,
1089 params,
1090 returns,
1091 body,
1092 } => {
1093 use crate::catalog::ProcedureDefinition;
1094 let def = ProcedureDefinition {
1095 name: name.clone(),
1096 params: params.clone(),
1097 returns: returns.clone(),
1098 body: body.clone(),
1099 };
1100 let _ = catalog.register_procedure(def);
1101 }
1102 WalRecord::DropProcedure { name } => {
1103 let _ = catalog.drop_procedure(name);
1104 }
1105
1106 #[cfg(feature = "triple-store")]
1108 WalRecord::InsertRdfTriple { .. }
1109 | WalRecord::DeleteRdfTriple { .. }
1110 | WalRecord::ClearRdfGraph { .. }
1111 | WalRecord::CreateRdfGraph { .. }
1112 | WalRecord::DropRdfGraph { .. } => {
1113 rdf_ops::replay_rdf_wal_record(rdf_store, record);
1114 }
1115 #[cfg(not(feature = "triple-store"))]
1116 WalRecord::InsertRdfTriple { .. }
1117 | WalRecord::DeleteRdfTriple { .. }
1118 | WalRecord::ClearRdfGraph { .. }
1119 | WalRecord::CreateRdfGraph { .. }
1120 | WalRecord::DropRdfGraph { .. } => {}
1121
1122 WalRecord::TransactionCommit { .. } => {
1123 #[cfg(feature = "temporal")]
1127 {
1128 target_store.new_epoch();
1129 }
1130 }
1131 WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
1132 }
1135 WalRecord::EpochAdvance { .. } => {
1136 }
1139 }
1140 }
1141 Ok(())
1142 }
1143
/// Decides whether `path` should be handled with the single-file format.
///
/// `SingleFile` and `WalDirectory` answer directly. For `Auto`:
/// an existing file qualifies only if it can be opened and its first four
/// bytes equal `grafeo_storage::file::MAGIC`; an existing directory never
/// qualifies; a path that does not exist yet qualifies when its extension
/// is `grafeo`.
#[cfg(feature = "grafeo-file")]
fn should_use_single_file(
    path: &std::path::Path,
    configured: crate::config::StorageFormat,
) -> bool {
    use crate::config::StorageFormat;
    use std::io::Read;

    match configured {
        StorageFormat::SingleFile => true,
        StorageFormat::WalDirectory => false,
        StorageFormat::Auto if path.is_file() => {
            // Any open/read failure counts as "not a single-file database".
            let mut header = [0u8; 4];
            std::fs::File::open(path)
                .and_then(|mut file| file.read_exact(&mut header))
                .is_ok()
                && header == grafeo_storage::file::MAGIC
        }
        StorageFormat::Auto if path.is_dir() => false,
        StorageFormat::Auto => path.extension().is_some_and(|ext| ext == "grafeo"),
    }
}
1180
/// Loads a serialized snapshot into the given stores and catalog.
///
/// Thin wrapper that forwards all arguments to
/// `persistence::load_snapshot_into_store` and returns its result unchanged.
#[cfg(all(feature = "grafeo-file", feature = "lpg"))]
fn apply_snapshot_data(
    store: &Arc<LpgStore>,
    catalog: &Arc<crate::catalog::Catalog>,
    #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
    data: &[u8],
) -> Result<()> {
    persistence::load_snapshot_into_store(
        store,
        catalog,
        #[cfg(feature = "triple-store")]
        rdf_store,
        data,
    )
}
1200
/// Restores database state from the section directory of a single-file
/// database.
///
/// Sections are processed in a fixed order — catalog, LPG store, RDF store,
/// RDF ring, vector store, text index — and a section that is absent from
/// the directory is skipped. Vector and text sections are deserialized only
/// when the store reports at least one matching index entry.
///
/// # Errors
///
/// Returns `Error::Internal` when the file has no section directory, and
/// propagates read or deserialization failures.
#[cfg(all(feature = "grafeo-file", feature = "lpg"))]
fn load_from_sections(
    fm: &GrafeoFileManager,
    store: &Arc<LpgStore>,
    catalog: &Arc<crate::catalog::Catalog>,
    #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
) -> Result<()> {
    use grafeo_common::storage::{Section, SectionType};

    let Some(directory) = fm.read_section_directory()? else {
        return Err(grafeo_common::utils::error::Error::Internal(
            "expected v2 section directory but found none".to_string(),
        ));
    };

    if let Some(catalog_entry) = directory.find(SectionType::Catalog) {
        let bytes = fm.read_section_data(catalog_entry)?;
        // The catalog section needs an epoch source; a fresh transaction
        // manager supplies it for the duration of the load.
        let epoch_source = Arc::new(crate::transaction::TransactionManager::new());
        let mut restorer = catalog_section::CatalogSection::new(
            Arc::clone(catalog),
            Arc::clone(store),
            move || epoch_source.current_epoch().as_u64(),
        );
        restorer.deserialize(&bytes)?;
    }

    if let Some(lpg_entry) = directory.find(SectionType::LpgStore) {
        let bytes = fm.read_section_data(lpg_entry)?;
        let mut restorer = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
        restorer.deserialize(&bytes)?;
    }

    #[cfg(feature = "triple-store")]
    if let Some(rdf_entry) = directory.find(SectionType::RdfStore) {
        let bytes = fm.read_section_data(rdf_entry)?;
        let mut restorer = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(rdf_store));
        restorer.deserialize(&bytes)?;
    }

    #[cfg(feature = "ring-index")]
    if let Some(ring_entry) = directory.find(SectionType::RdfRing) {
        let bytes = fm.read_section_data(ring_entry)?;
        let mut restorer = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(rdf_store));
        restorer.deserialize(&bytes)?;
    }

    #[cfg(feature = "vector-index")]
    if let Some(vector_entry) = directory.find(SectionType::VectorStore) {
        let bytes = fm.read_section_data(vector_entry)?;
        let vector_indexes = store.vector_index_entries();
        if !vector_indexes.is_empty() {
            let mut restorer =
                grafeo_core::index::vector::VectorStoreSection::new(vector_indexes);
            restorer.deserialize(&bytes)?;
        }
    }

    #[cfg(feature = "text-index")]
    if let Some(text_entry) = directory.find(SectionType::TextIndex) {
        let bytes = fm.read_section_data(text_entry)?;
        let text_indexes = store.text_index_entries();
        if !text_indexes.is_empty() {
            let mut restorer = grafeo_core::index::text::TextIndexSection::new(text_indexes);
            restorer.deserialize(&bytes)?;
        }
    }

    Ok(())
}
1278
/// Creates a new session with default settings: no CDC override, which
/// `create_session_inner` pairs with an anonymous identity and no forced
/// read-only mode.
#[must_use]
pub fn session(&self) -> Session {
    self.create_session_inner(None)
}
1310
/// Creates a new session bound to `identity`.
///
/// The session is forced read-only when `identity.can_write()` is `false`;
/// no CDC override is applied.
#[must_use]
pub fn session_with_identity(&self, identity: crate::auth::Identity) -> Session {
    let force_read_only = !identity.can_write();
    self.create_session_inner_full(None, force_read_only, identity)
}
1333
/// Creates a new session for an identity named `"anonymous"` that holds
/// only `role`; delegates to `session_with_identity`.
#[must_use]
pub fn session_with_role(&self, role: crate::auth::Role) -> Session {
    self.session_with_identity(crate::auth::Identity::new("anonymous", [role]))
}
1351
/// Creates a new session whose CDC setting is given explicitly by
/// `cdc_enabled`, overriding the database-wide default for that session.
#[cfg(feature = "cdc")]
#[must_use]
pub fn session_with_cdc(&self, cdc_enabled: bool) -> Session {
    self.create_session_inner(Some(cdc_enabled))
}
1375
/// Creates a new session with the `ReadOnly` role.
///
/// Deprecated shorthand for `session_with_role(Role::ReadOnly)`.
#[deprecated(
    since = "0.5.36",
    note = "use session_with_role(Role::ReadOnly) instead"
)]
#[must_use]
pub fn session_read_only(&self) -> Session {
    self.session_with_role(crate::auth::Role::ReadOnly)
}
1392
/// Creates a session with an optional CDC override, an anonymous identity
/// and no forced read-only mode; delegates to `create_session_inner_full`.
#[allow(unused_variables)]
fn create_session_inner(&self, cdc_override: Option<bool>) -> Session {
    self.create_session_inner_full(cdc_override, false, crate::auth::Identity::anonymous())
}
1401
1402 #[allow(unused_variables)]
1404 fn create_session_inner_full(
1405 &self,
1406 cdc_override: Option<bool>,
1407 force_read_only: bool,
1408 identity: crate::auth::Identity,
1409 ) -> Session {
1410 let session_cfg = || crate::session::SessionConfig {
1411 transaction_manager: Arc::clone(&self.transaction_manager),
1412 query_cache: Arc::clone(&self.query_cache),
1413 catalog: Arc::clone(&self.catalog),
1414 adaptive_config: self.config.adaptive.clone(),
1415 factorized_execution: self.config.factorized_execution,
1416 graph_model: self.config.graph_model,
1417 query_timeout: self.config.query_timeout,
1418 commit_counter: Arc::clone(&self.commit_counter),
1419 gc_interval: self.config.gc_interval,
1420 read_only: self.read_only || force_read_only,
1421 identity: identity.clone(),
1422 #[cfg(feature = "lpg")]
1423 projections: Arc::clone(&self.projections),
1424 };
1425
1426 if let Some(ref ext_read) = self.external_read_store {
1427 return Session::with_external_store(
1428 Arc::clone(ext_read),
1429 self.external_write_store.as_ref().map(Arc::clone),
1430 session_cfg(),
1431 )
1432 .expect("arena allocation for external store session");
1433 }
1434
1435 #[cfg(all(feature = "lpg", feature = "triple-store"))]
1436 let mut session = Session::with_rdf_store_and_adaptive(
1437 Arc::clone(self.lpg_store()),
1438 Arc::clone(&self.rdf_store),
1439 session_cfg(),
1440 );
1441 #[cfg(all(feature = "lpg", not(feature = "triple-store")))]
1442 let mut session = Session::with_adaptive(Arc::clone(self.lpg_store()), session_cfg());
1443 #[cfg(not(feature = "lpg"))]
1444 let mut session =
1445 Session::with_external_store(self.graph_store(), self.graph_store_mut(), session_cfg())
1446 .expect("session creation for non-lpg build");
1447
1448 #[cfg(all(feature = "wal", feature = "lpg"))]
1449 if let Some(ref wal) = self.wal {
1450 session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
1451 }
1452
1453 #[cfg(feature = "cdc")]
1454 {
1455 let should_enable = cdc_override.unwrap_or_else(|| self.cdc_active());
1456 if should_enable {
1457 session.set_cdc_log(Arc::clone(&self.cdc_log));
1458 }
1459 }
1460
1461 #[cfg(feature = "metrics")]
1462 {
1463 if let Some(ref m) = self.metrics {
1464 session.set_metrics(Arc::clone(m));
1465 m.session_created
1466 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1467 m.session_active
1468 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1469 }
1470 }
1471
1472 if let Some(ref graph) = *self.current_graph.read() {
1474 session.use_graph(graph);
1475 }
1476
1477 if let Some(ref schema) = *self.current_schema.read() {
1479 session.set_schema(schema);
1480 }
1481
1482 let _ = &mut session;
1484
1485 session
1486 }
1487
1488 #[must_use]
1494 pub fn current_graph(&self) -> Option<String> {
1495 self.current_graph.read().clone()
1496 }
1497
1498 pub fn set_current_graph(&self, name: Option<&str>) -> Result<()> {
1507 #[cfg(feature = "lpg")]
1508 if let Some(name) = name
1509 && !name.eq_ignore_ascii_case("default")
1510 && let Some(store) = &self.store
1511 && store.graph(name).is_none()
1512 {
1513 return Err(Error::Query(QueryError::new(
1514 QueryErrorKind::Semantic,
1515 format!("Graph '{name}' does not exist"),
1516 )));
1517 }
1518 *self.current_graph.write() = name.map(ToString::to_string);
1519 Ok(())
1520 }
1521
1522 #[must_use]
1527 pub fn current_schema(&self) -> Option<String> {
1528 self.current_schema.read().clone()
1529 }
1530
1531 pub fn set_current_schema(&self, name: Option<&str>) -> Result<()> {
1540 if let Some(name) = name
1541 && !self.catalog.schema_exists(name)
1542 {
1543 return Err(Error::Query(QueryError::new(
1544 QueryErrorKind::Semantic,
1545 format!("Schema '{name}' does not exist"),
1546 )));
1547 }
1548 *self.current_schema.write() = name.map(ToString::to_string);
1549 Ok(())
1550 }
1551
1552 #[must_use]
1554 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
1555 &self.config.adaptive
1556 }
1557
1558 #[must_use]
1560 pub fn is_read_only(&self) -> bool {
1561 self.read_only
1562 }
1563
1564 #[must_use]
1566 pub fn config(&self) -> &Config {
1567 &self.config
1568 }
1569
1570 #[must_use]
1572 pub fn graph_model(&self) -> crate::config::GraphModel {
1573 self.config.graph_model
1574 }
1575
1576 #[must_use]
1578 pub fn memory_limit(&self) -> Option<usize> {
1579 self.config.memory_limit
1580 }
1581
1582 #[cfg(feature = "metrics")]
1587 #[must_use]
1588 pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1589 let mut snapshot = self
1590 .metrics
1591 .as_ref()
1592 .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1593
1594 let cache_stats = self.query_cache.stats();
1596 snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1597 snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1598 snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1599 snapshot.cache_invalidations = cache_stats.invalidations;
1600
1601 snapshot
1602 }
1603
1604 #[cfg(feature = "metrics")]
1608 #[must_use]
1609 pub fn metrics_prometheus(&self) -> String {
1610 self.metrics
1611 .as_ref()
1612 .map_or_else(String::new, |m| m.to_prometheus())
1613 }
1614
1615 #[cfg(feature = "metrics")]
1617 pub fn reset_metrics(&self) {
1618 if let Some(ref m) = self.metrics {
1619 m.reset();
1620 }
1621 self.query_cache.reset_stats();
1622 }
1623
1624 #[cfg(feature = "lpg")]
1632 #[must_use]
1633 pub fn store(&self) -> &Arc<LpgStore> {
1634 self.lpg_store()
1635 }
1636
1637 #[cfg(feature = "lpg")]
1645 pub fn create_graph(&self, name: &str) -> Result<bool> {
1646 Ok(self.lpg_store().create_graph(name)?)
1647 }
1648
1649 #[cfg(feature = "lpg")]
1654 pub fn drop_graph(&self, name: &str) -> bool {
1655 let Some(store) = &self.store else {
1656 return false;
1657 };
1658 let dropped = store.drop_graph(name);
1659 if dropped {
1660 let mut current = self.current_graph.write();
1661 if current
1662 .as_deref()
1663 .is_some_and(|g| g.eq_ignore_ascii_case(name))
1664 {
1665 *current = None;
1666 }
1667 }
1668 dropped
1669 }
1670
1671 #[cfg(feature = "lpg")]
1673 #[must_use]
1674 pub fn list_graphs(&self) -> Vec<String> {
1675 self.lpg_store().graph_names()
1676 }
1677
1678 pub fn create_projection(
1699 &self,
1700 name: impl Into<String>,
1701 spec: grafeo_core::graph::ProjectionSpec,
1702 ) -> bool {
1703 use grafeo_core::graph::GraphProjection;
1704 use std::collections::hash_map::Entry;
1705
1706 let store = self.graph_store();
1707 let projection = Arc::new(GraphProjection::new(store, spec));
1708 let mut projections = self.projections.write();
1709 match projections.entry(name.into()) {
1710 Entry::Occupied(_) => false,
1711 Entry::Vacant(e) => {
1712 e.insert(projection);
1713 true
1714 }
1715 }
1716 }
1717
1718 pub fn drop_projection(&self, name: &str) -> bool {
1720 self.projections.write().remove(name).is_some()
1721 }
1722
1723 #[must_use]
1725 pub fn list_projections(&self) -> Vec<String> {
1726 self.projections.read().keys().cloned().collect()
1727 }
1728
1729 #[must_use]
1731 pub fn projection(&self, name: &str) -> Option<Arc<dyn GraphStore>> {
1732 self.projections
1733 .read()
1734 .get(name)
1735 .map(|p| Arc::clone(p) as Arc<dyn GraphStore>)
1736 }
1737
1738 #[must_use]
1747 pub fn graph_store(&self) -> Arc<dyn GraphStore> {
1748 if let Some(ref ext_read) = self.external_read_store {
1749 Arc::clone(ext_read)
1750 } else {
1751 #[cfg(feature = "lpg")]
1752 {
1753 Arc::clone(self.lpg_store()) as Arc<dyn GraphStore>
1754 }
1755 #[cfg(not(feature = "lpg"))]
1756 unreachable!("no graph store available: enable the `lpg` feature or use with_store()")
1757 }
1758 }
1759
1760 #[must_use]
1765 pub fn graph_store_mut(&self) -> Option<Arc<dyn GraphStoreMut>> {
1766 if self.external_read_store.is_some() {
1767 self.external_write_store.as_ref().map(Arc::clone)
1768 } else {
1769 #[cfg(feature = "lpg")]
1770 {
1771 Some(Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreMut>)
1772 }
1773 #[cfg(not(feature = "lpg"))]
1774 {
1775 None
1776 }
1777 }
1778 }
1779
1780 pub fn gc(&self) {
1787 #[cfg(feature = "lpg")]
1788 let current_epoch = {
1789 let min_epoch = self.transaction_manager.min_active_epoch();
1790 self.lpg_store().gc_versions(min_epoch);
1791 self.transaction_manager.current_epoch()
1792 };
1793 self.transaction_manager.gc();
1794
1795 #[cfg(feature = "cdc")]
1797 if self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed) {
1798 #[cfg(feature = "lpg")]
1799 self.cdc_log.apply_retention(current_epoch);
1800 }
1801 }
1802
1803 #[must_use]
1805 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
1806 &self.buffer_manager
1807 }
1808
1809 #[must_use]
1811 pub fn query_cache(&self) -> &Arc<QueryCache> {
1812 &self.query_cache
1813 }
1814
1815 pub fn clear_plan_cache(&self) {
1821 self.query_cache.clear();
1822 }
1823
1824 pub fn close(&self) -> Result<()> {
1838 let mut is_open = self.is_open.write();
1839 if !*is_open {
1840 return Ok(());
1841 }
1842
1843 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1848 if let Some(mut timer) = self.checkpoint_timer.lock().take() {
1849 timer.stop();
1850 }
1851
1852 if self.read_only {
1854 #[cfg(feature = "grafeo-file")]
1855 if let Some(ref fm) = self.file_manager {
1856 fm.close()?;
1857 }
1858 *is_open = false;
1859 return Ok(());
1860 }
1861
1862 #[cfg(feature = "grafeo-file")]
1866 let is_single_file = self.file_manager.is_some();
1867 #[cfg(not(feature = "grafeo-file"))]
1868 let is_single_file = false;
1869
1870 #[cfg(feature = "grafeo-file")]
1871 if let Some(ref fm) = self.file_manager {
1872 #[cfg(feature = "wal")]
1874 if let Some(ref wal) = self.wal {
1875 wal.sync()?;
1876 }
1877 let flush_result = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;
1878
1879 #[cfg(feature = "wal")]
1885 let flush_result = if flush_result.sections_written == 0 {
1886 if let Some(ref wal) = self.wal {
1887 if wal.record_count() > 0 {
1888 grafeo_warn!(
1889 "WAL has {} records but checkpoint wrote 0 sections; retrying with forced flush",
1890 wal.record_count()
1891 );
1892 self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?
1893 } else {
1894 flush_result
1895 }
1896 } else {
1897 flush_result
1898 }
1899 } else {
1900 flush_result
1901 };
1902
1903 #[cfg(feature = "wal")]
1906 if let Some(ref wal) = self.wal {
1907 wal.close_active_log();
1908 }
1909
1910 #[cfg(feature = "wal")]
1914 let has_wal_records = self.wal.as_ref().is_some_and(|wal| wal.record_count() > 0);
1915 #[cfg(not(feature = "wal"))]
1916 let has_wal_records = false;
1917
1918 if flush_result.sections_written > 0 || !has_wal_records {
1919 {
1920 use grafeo_common::testing::crash::maybe_crash;
1921 maybe_crash("close:before_remove_sidecar_wal");
1922 }
1923 fm.remove_sidecar_wal()?;
1924 } else {
1925 grafeo_warn!(
1926 "keeping sidecar WAL for recovery: checkpoint wrote 0 sections but WAL has records"
1927 );
1928 }
1929 fm.close()?;
1930 }
1931
1932 #[cfg(feature = "wal")]
1938 if !is_single_file && let Some(ref wal) = self.wal {
1939 let commit_tx = self
1941 .transaction_manager
1942 .last_assigned_transaction_id()
1943 .unwrap_or_else(|| self.transaction_manager.begin());
1944
1945 wal.log(&WalRecord::TransactionCommit {
1947 transaction_id: commit_tx,
1948 })?;
1949
1950 wal.sync()?;
1951 }
1952
1953 *is_open = false;
1954 Ok(())
1955 }
1956
1957 #[cfg(feature = "wal")]
1959 #[must_use]
1960 pub fn wal(&self) -> Option<&Arc<LpgWal>> {
1961 self.wal.as_ref()
1962 }
1963
1964 #[cfg(feature = "wal")]
1966 pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1967 if let Some(ref wal) = self.wal {
1968 wal.log(record)?;
1969 }
1970 Ok(())
1971 }
1972
1973 fn register_section_consumers(&mut self) {
1978 #[cfg(feature = "lpg")]
1979 let store_ref = self.store.as_ref();
1980 #[cfg(not(feature = "lpg"))]
1981 #[cfg(feature = "lpg")]
1983 if let Some(store) = store_ref {
1984 let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
1985 self.buffer_manager.register_consumer(Arc::new(
1986 section_consumer::SectionConsumer::new(Arc::new(lpg)),
1987 ));
1988 }
1989
1990 #[cfg(feature = "triple-store")]
1992 if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
1993 let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
1994 self.buffer_manager.register_consumer(Arc::new(
1995 section_consumer::SectionConsumer::new(Arc::new(rdf)),
1996 ));
1997 }
1998
1999 #[cfg(feature = "ring-index")]
2001 if self.rdf_store.ring().is_some() {
2002 let ring = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(&self.rdf_store));
2003 self.buffer_manager.register_consumer(Arc::new(
2004 section_consumer::SectionConsumer::new(Arc::new(ring)),
2005 ));
2006 }
2007
2008 #[cfg(all(
2011 feature = "lpg",
2012 feature = "vector-index",
2013 feature = "mmap",
2014 not(feature = "temporal")
2015 ))]
2016 if let Some(store) = store_ref {
2017 let spill_path = self.buffer_manager.config().spill_path.clone();
2018 let consumer = Arc::new(section_consumer::VectorIndexConsumer::new(
2019 store, spill_path,
2020 ));
2021 self.vector_spill_storages = Some(Arc::clone(consumer.spilled_storages()));
2023 self.buffer_manager.register_consumer(consumer);
2024 }
2025
2026 #[cfg(all(feature = "lpg", feature = "text-index"))]
2028 if let Some(store) = store_ref {
2029 self.buffer_manager
2030 .register_consumer(Arc::new(section_consumer::TextIndexConsumer::new(store)));
2031 }
2032
2033 #[cfg(feature = "cdc")]
2036 self.buffer_manager.register_consumer(
2037 Arc::clone(&self.cdc_log) as Arc<dyn grafeo_common::memory::MemoryConsumer>
2038 );
2039 }
2040
2041 #[cfg(all(
2048 feature = "lpg",
2049 feature = "vector-index",
2050 feature = "mmap",
2051 not(feature = "temporal")
2052 ))]
2053 fn restore_spill_files(&mut self) {
2054 use grafeo_core::index::vector::MmapStorage;
2055
2056 let spill_dir = match self.buffer_manager.config().spill_path {
2057 Some(ref path) => path.clone(),
2058 None => return,
2059 };
2060
2061 if !spill_dir.exists() {
2062 return;
2063 }
2064
2065 let spill_map = match self.vector_spill_storages {
2066 Some(ref map) => Arc::clone(map),
2067 None => return,
2068 };
2069
2070 let Ok(entries) = std::fs::read_dir(&spill_dir) else {
2071 return;
2072 };
2073
2074 let Some(ref store) = self.store else {
2075 return;
2076 };
2077
2078 for entry in entries.flatten() {
2079 let path = entry.path();
2080 let file_name = match path.file_name().and_then(|n| n.to_str()) {
2081 Some(name) => name.to_string(),
2082 None => continue,
2083 };
2084
2085 if !file_name.starts_with("vectors_")
2087 || !std::path::Path::new(&file_name)
2088 .extension()
2089 .is_some_and(|ext| ext.eq_ignore_ascii_case("bin"))
2090 {
2091 continue;
2092 }
2093
2094 let key_part = &file_name["vectors_".len()..file_name.len() - ".bin".len()];
2096
2097 let key = key_part.replace("%3A", ":").replace("%25", "%");
2099
2100 if !key.contains(':') {
2102 continue;
2104 }
2105
2106 if store.get_vector_index_by_key(&key).is_none() {
2108 let _ = std::fs::remove_file(&path);
2110 continue;
2111 }
2112
2113 match MmapStorage::open(&path) {
2115 Ok(mmap_storage) => {
2116 let property = key.split(':').nth(1).unwrap_or("");
2118 let prop_key = grafeo_common::types::PropertyKey::new(property);
2119 store.node_properties_mark_spilled(&prop_key);
2120
2121 spill_map.write().insert(key, Arc::new(mmap_storage));
2122 }
2123 Err(e) => {
2124 eprintln!("failed to restore spill file {}: {e}", path.display());
2125 let _ = std::fs::remove_file(&path);
2127 }
2128 }
2129 }
2130 }
2131
2132 #[cfg(feature = "grafeo-file")]
2134 fn build_sections(&self) -> Vec<Box<dyn grafeo_common::storage::Section>> {
2135 let mut sections: Vec<Box<dyn grafeo_common::storage::Section>> = Vec::new();
2136
2137 #[cfg(feature = "lpg")]
2139 if let Some(store) = self.store.as_ref() {
2140 let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
2141
2142 let catalog = catalog_section::CatalogSection::new(
2143 Arc::clone(&self.catalog),
2144 Arc::clone(store),
2145 {
2146 let tm = Arc::clone(&self.transaction_manager);
2147 move || tm.current_epoch().as_u64()
2148 },
2149 );
2150
2151 sections.push(Box::new(catalog));
2152 sections.push(Box::new(lpg));
2153
2154 #[cfg(feature = "vector-index")]
2156 {
2157 let indexes = store.vector_index_entries();
2158 if !indexes.is_empty() {
2159 let vector = grafeo_core::index::vector::VectorStoreSection::new(indexes);
2160 sections.push(Box::new(vector));
2161 }
2162 }
2163
2164 #[cfg(feature = "text-index")]
2166 {
2167 let indexes = store.text_index_entries();
2168 if !indexes.is_empty() {
2169 let text = grafeo_core::index::text::TextIndexSection::new(indexes);
2170 sections.push(Box::new(text));
2171 }
2172 }
2173 }
2174
2175 #[cfg(feature = "triple-store")]
2176 if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
2177 let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
2178 sections.push(Box::new(rdf));
2179 }
2180
2181 #[cfg(feature = "ring-index")]
2182 if self.rdf_store.ring().is_some() {
2183 let ring = grafeo_core::index::ring::RdfRingSection::new(Arc::clone(&self.rdf_store));
2184 sections.push(Box::new(ring));
2185 }
2186
2187 sections
2188 }
2189
2190 #[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
2204 pub fn backup_full(&self, backup_dir: &std::path::Path) -> Result<backup::BackupSegment> {
2205 let fm = self
2206 .file_manager
2207 .as_ref()
2208 .ok_or_else(|| Error::Internal("backup requires a persistent database".to_string()))?;
2209
2210 if !self.read_only {
2214 let _ = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;
2215 }
2216
2217 let current_epoch = self.transaction_manager.current_epoch();
2218 backup::do_backup_full(backup_dir, fm, self.wal.as_deref(), current_epoch)
2219 }
2220
2221 #[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
2229 pub fn backup_incremental(
2230 &self,
2231 backup_dir: &std::path::Path,
2232 ) -> Result<backup::BackupSegment> {
2233 let wal = self
2234 .wal
2235 .as_ref()
2236 .ok_or_else(|| Error::Internal("incremental backup requires WAL".to_string()))?;
2237
2238 let current_epoch = self.transaction_manager.current_epoch();
2239 backup::do_backup_incremental(backup_dir, wal, current_epoch)
2240 }
2241
2242 #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2248 pub fn read_backup_manifest(
2249 backup_dir: &std::path::Path,
2250 ) -> Result<Option<backup::BackupManifest>> {
2251 backup::read_manifest(backup_dir)
2252 }
2253
2254 #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2256 #[must_use]
2257 pub fn backup_cursor(&self) -> Option<backup::BackupCursor> {
2258 self.wal
2259 .as_ref()
2260 .and_then(|wal| backup::read_backup_cursor(wal.dir()).ok().flatten())
2261 }
2262
2263 #[cfg(all(feature = "wal", feature = "grafeo-file"))]
2274 pub fn restore_to_epoch(
2275 backup_dir: &std::path::Path,
2276 target_epoch: grafeo_common::types::EpochId,
2277 output_path: &std::path::Path,
2278 ) -> Result<()> {
2279 backup::do_restore_to_epoch(backup_dir, target_epoch, output_path)
2280 }
2281
2282 #[cfg(feature = "grafeo-file")]
2288 fn checkpoint_to_file(
2289 &self,
2290 fm: &GrafeoFileManager,
2291 reason: flush::FlushReason,
2292 ) -> Result<flush::FlushResult> {
2293 let sections = self.build_sections();
2294 let section_refs: Vec<&dyn grafeo_common::storage::Section> =
2295 sections.iter().map(|s| s.as_ref()).collect();
2296 #[cfg(feature = "lpg")]
2297 let context = flush::build_context(self.lpg_store(), &self.transaction_manager);
2298 #[cfg(not(feature = "lpg"))]
2299 let context = flush::build_context_minimal(&self.transaction_manager);
2300
2301 flush::flush(
2302 fm,
2303 §ion_refs,
2304 &context,
2305 reason,
2306 #[cfg(feature = "wal")]
2307 self.wal.as_deref(),
2308 )
2309 }
2310
2311 #[cfg(feature = "grafeo-file")]
2313 #[must_use]
2314 pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
2315 self.file_manager.as_ref()
2316 }
2317}
2318
2319impl Drop for GrafeoDB {
2320 fn drop(&mut self) {
2321 if let Err(e) = self.close() {
2322 grafeo_error!("Error closing database: {}", e);
2323 }
2324 }
2325}
2326
/// Exposes the database's admin operations through the `AdminService` trait.
///
/// Every method forwards to the `GrafeoDB` method of the same name, written as
/// an explicit `GrafeoDB::name(self)` path call.
#[cfg(feature = "lpg")]
impl crate::admin::AdminService for GrafeoDB {
    fn info(&self) -> crate::admin::DatabaseInfo {
        GrafeoDB::info(self)
    }

    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
        GrafeoDB::detailed_stats(self)
    }

    fn schema(&self) -> crate::admin::SchemaInfo {
        GrafeoDB::schema(self)
    }

    fn validate(&self) -> crate::admin::ValidationResult {
        GrafeoDB::validate(self)
    }

    fn wal_status(&self) -> crate::admin::WalStatus {
        GrafeoDB::wal_status(self)
    }

    fn wal_checkpoint(&self) -> Result<()> {
        GrafeoDB::wal_checkpoint(self)
    }
}
2353
/// Tabular result of a query, plus optional execution metadata.
#[derive(Debug)]
pub struct QueryResult {
    /// Column names; `column_count()` reports the length of this list.
    pub columns: Vec<String>,
    /// Logical type per column. The constructors that take no explicit types
    /// fill this with `LogicalType::Any`, one entry per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// Result rows. Crate-private; read through `rows()`, `iter()` or
    /// `into_rows()`.
    pub(crate) rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Execution time, when recorded via `with_metrics` (milliseconds, going
    /// by the field name).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned, when recorded via `with_metrics`.
    pub rows_scanned: Option<u64>,
    /// Optional status text; set by `QueryResult::status`.
    pub status_message: Option<String>,
    /// GQL status attached to the result; the constructors set `SUCCESS`.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
2403
2404impl QueryResult {
2405 #[must_use]
2407 pub fn empty() -> Self {
2408 Self {
2409 columns: Vec::new(),
2410 column_types: Vec::new(),
2411 rows: Vec::new(),
2412 execution_time_ms: None,
2413 rows_scanned: None,
2414 status_message: None,
2415 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2416 }
2417 }
2418
2419 #[must_use]
2421 pub fn status(msg: impl Into<String>) -> Self {
2422 Self {
2423 columns: Vec::new(),
2424 column_types: Vec::new(),
2425 rows: Vec::new(),
2426 execution_time_ms: None,
2427 rows_scanned: None,
2428 status_message: Some(msg.into()),
2429 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2430 }
2431 }
2432
2433 #[must_use]
2435 pub fn new(columns: Vec<String>) -> Self {
2436 let len = columns.len();
2437 Self {
2438 columns,
2439 column_types: vec![grafeo_common::types::LogicalType::Any; len],
2440 rows: Vec::new(),
2441 execution_time_ms: None,
2442 rows_scanned: None,
2443 status_message: None,
2444 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2445 }
2446 }
2447
2448 #[must_use]
2450 pub fn with_types(
2451 columns: Vec<String>,
2452 column_types: Vec<grafeo_common::types::LogicalType>,
2453 ) -> Self {
2454 Self {
2455 columns,
2456 column_types,
2457 rows: Vec::new(),
2458 execution_time_ms: None,
2459 rows_scanned: None,
2460 status_message: None,
2461 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2462 }
2463 }
2464
2465 #[must_use]
2467 pub fn from_rows(columns: Vec<String>, rows: Vec<Vec<grafeo_common::types::Value>>) -> Self {
2468 let len = columns.len();
2469 Self {
2470 columns,
2471 column_types: vec![grafeo_common::types::LogicalType::Any; len],
2472 rows,
2473 execution_time_ms: None,
2474 rows_scanned: None,
2475 status_message: None,
2476 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2477 }
2478 }
2479
2480 pub fn push_row(&mut self, row: Vec<grafeo_common::types::Value>) {
2482 self.rows.push(row);
2483 }
2484
2485 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
2487 self.execution_time_ms = Some(execution_time_ms);
2488 self.rows_scanned = Some(rows_scanned);
2489 self
2490 }
2491
2492 #[must_use]
2494 pub fn execution_time_ms(&self) -> Option<f64> {
2495 self.execution_time_ms
2496 }
2497
2498 #[must_use]
2500 pub fn rows_scanned(&self) -> Option<u64> {
2501 self.rows_scanned
2502 }
2503
2504 #[must_use]
2506 pub fn row_count(&self) -> usize {
2507 self.rows.len()
2508 }
2509
2510 #[must_use]
2512 pub fn column_count(&self) -> usize {
2513 self.columns.len()
2514 }
2515
2516 #[must_use]
2518 pub fn is_empty(&self) -> bool {
2519 self.rows.is_empty()
2520 }
2521
2522 pub fn scalar<T: FromValue>(&self) -> Result<T> {
2531 if self.rows.len() != 1 || self.columns.len() != 1 {
2532 return Err(grafeo_common::utils::error::Error::InvalidValue(
2533 "Expected single value".to_string(),
2534 ));
2535 }
2536 T::from_value(&self.rows[0][0])
2537 }
2538
2539 #[must_use]
2541 pub fn rows(&self) -> &[Vec<grafeo_common::types::Value>] {
2542 &self.rows
2543 }
2544
2545 #[must_use]
2547 pub fn into_rows(self) -> Vec<Vec<grafeo_common::types::Value>> {
2548 self.rows
2549 }
2550
2551 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
2553 self.rows.iter()
2554 }
2555
2556 #[cfg(feature = "arrow-export")]
2571 pub fn to_record_batch(
2572 &self,
2573 ) -> std::result::Result<arrow_array::RecordBatch, arrow::ArrowExportError> {
2574 arrow::query_result_to_record_batch(&self.columns, &self.column_types, &self.rows)
2575 }
2576
2577 #[cfg(feature = "arrow-export")]
2588 pub fn to_arrow_ipc(&self) -> std::result::Result<Vec<u8>, arrow::ArrowExportError> {
2589 let batch = self.to_record_batch()?;
2590 arrow::record_batch_to_ipc_stream(&batch)
2591 }
2592}
2593
2594impl std::fmt::Display for QueryResult {
2595 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2596 let table = grafeo_common::fmt::format_result_table(
2597 &self.columns,
2598 &self.rows,
2599 self.execution_time_ms,
2600 self.status_message.as_deref(),
2601 );
2602 f.write_str(&table)
2603 }
2604}
2605
/// Conversion from a borrowed [`grafeo_common::types::Value`] into a Rust
/// type; used by `QueryResult::scalar` to extract a typed value.
pub trait FromValue: Sized {
    /// Attempts to convert `value` into `Self`.
    ///
    /// # Errors
    ///
    /// The implementations in this file return `Error::TypeMismatch` when
    /// `value` does not hold the requested type.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
2618
2619impl FromValue for i64 {
2620 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2621 value
2622 .as_int64()
2623 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2624 expected: "INT64".to_string(),
2625 found: value.type_name().to_string(),
2626 })
2627 }
2628}
2629
2630impl FromValue for f64 {
2631 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2632 value
2633 .as_float64()
2634 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2635 expected: "FLOAT64".to_string(),
2636 found: value.type_name().to_string(),
2637 })
2638 }
2639}
2640
2641impl FromValue for String {
2642 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2643 value.as_str().map(String::from).ok_or_else(|| {
2644 grafeo_common::utils::error::Error::TypeMismatch {
2645 expected: "STRING".to_string(),
2646 found: value.type_name().to_string(),
2647 }
2648 })
2649 }
2650}
2651
2652impl FromValue for bool {
2653 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2654 value
2655 .as_bool()
2656 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2657 expected: "BOOL".to_string(),
2658 found: value.type_name().to_string(),
2659 })
2660 }
2661}
2662
2663#[cfg(test)]
2664mod tests {
2665 use super::*;
2666
2667 #[test]
2668 fn test_create_in_memory_database() {
2669 let db = GrafeoDB::new_in_memory();
2670 assert_eq!(db.node_count(), 0);
2671 assert_eq!(db.edge_count(), 0);
2672 }
2673
2674 #[test]
2675 fn test_database_config() {
2676 let config = Config::in_memory().with_threads(4).with_query_logging();
2677
2678 let db = GrafeoDB::with_config(config).unwrap();
2679 assert_eq!(db.config().threads, 4);
2680 assert!(db.config().query_logging);
2681 }
2682
2683 #[test]
2684 fn test_database_session() {
2685 let db = GrafeoDB::new_in_memory();
2686 let _session = db.session();
2687 }
2689
2690 #[cfg(feature = "wal")]
2691 #[test]
2692 fn test_persistent_database_recovery() {
2693 use grafeo_common::types::Value;
2694 use tempfile::tempdir;
2695
2696 let dir = tempdir().unwrap();
2697 let db_path = dir.path().join("test_db");
2698
2699 {
2701 let db = GrafeoDB::open(&db_path).unwrap();
2702
2703 let alix = db.create_node(&["Person"]);
2704 db.set_node_property(alix, "name", Value::from("Alix"));
2705
2706 let gus = db.create_node(&["Person"]);
2707 db.set_node_property(gus, "name", Value::from("Gus"));
2708
2709 let _edge = db.create_edge(alix, gus, "KNOWS");
2710
2711 db.close().unwrap();
2713 }
2714
2715 {
2717 let db = GrafeoDB::open(&db_path).unwrap();
2718
2719 assert_eq!(db.node_count(), 2);
2720 assert_eq!(db.edge_count(), 1);
2721
2722 let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
2724 assert!(node0.is_some());
2725
2726 let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
2727 assert!(node1.is_some());
2728 }
2729 }
2730
2731 #[cfg(feature = "wal")]
2732 #[test]
2733 fn test_wal_logging() {
2734 use tempfile::tempdir;
2735
2736 let dir = tempdir().unwrap();
2737 let db_path = dir.path().join("wal_test_db");
2738
2739 let db = GrafeoDB::open(&db_path).unwrap();
2740
2741 let node = db.create_node(&["Test"]);
2743 db.delete_node(node);
2744
2745 if let Some(wal) = db.wal() {
2747 assert!(wal.record_count() > 0);
2748 }
2749
2750 db.close().unwrap();
2751 }
2752
2753 #[cfg(feature = "wal")]
2754 #[test]
2755 fn test_wal_recovery_multiple_sessions() {
2756 use grafeo_common::types::Value;
2758 use tempfile::tempdir;
2759
2760 let dir = tempdir().unwrap();
2761 let db_path = dir.path().join("multi_session_db");
2762
2763 {
2765 let db = GrafeoDB::open(&db_path).unwrap();
2766 let alix = db.create_node(&["Person"]);
2767 db.set_node_property(alix, "name", Value::from("Alix"));
2768 db.close().unwrap();
2769 }
2770
2771 {
2773 let db = GrafeoDB::open(&db_path).unwrap();
2774 assert_eq!(db.node_count(), 1); let gus = db.create_node(&["Person"]);
2776 db.set_node_property(gus, "name", Value::from("Gus"));
2777 db.close().unwrap();
2778 }
2779
2780 {
2782 let db = GrafeoDB::open(&db_path).unwrap();
2783 assert_eq!(db.node_count(), 2);
2784
2785 let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
2787 assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
2788
2789 let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
2790 assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
2791 }
2792 }
2793
2794 #[cfg(feature = "wal")]
2795 #[test]
2796 fn test_database_consistency_after_mutations() {
2797 use grafeo_common::types::Value;
2799 use tempfile::tempdir;
2800
2801 let dir = tempdir().unwrap();
2802 let db_path = dir.path().join("consistency_db");
2803
2804 {
2805 let db = GrafeoDB::open(&db_path).unwrap();
2806
2807 let a = db.create_node(&["Node"]);
2809 let b = db.create_node(&["Node"]);
2810 let c = db.create_node(&["Node"]);
2811
2812 let e1 = db.create_edge(a, b, "LINKS");
2814 let _e2 = db.create_edge(b, c, "LINKS");
2815
2816 db.delete_edge(e1);
2818 db.delete_node(b);
2819
2820 db.set_node_property(a, "value", Value::Int64(1));
2822 db.set_node_property(c, "value", Value::Int64(3));
2823
2824 db.close().unwrap();
2825 }
2826
2827 {
2829 let db = GrafeoDB::open(&db_path).unwrap();
2830
2831 let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
2835 assert!(node_a.is_some());
2836
2837 let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
2838 assert!(node_c.is_some());
2839
2840 let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
2842 assert!(node_b.is_none());
2843 }
2844 }
2845
2846 #[cfg(feature = "wal")]
2847 #[test]
2848 fn test_close_is_idempotent() {
2849 use tempfile::tempdir;
2851
2852 let dir = tempdir().unwrap();
2853 let db_path = dir.path().join("close_test_db");
2854
2855 let db = GrafeoDB::open(&db_path).unwrap();
2856 db.create_node(&["Test"]);
2857
2858 assert!(db.close().is_ok());
2860
2861 assert!(db.close().is_ok());
2863 }
2864
2865 #[test]
2866 fn test_with_store_external_backend() {
2867 use grafeo_core::graph::lpg::LpgStore;
2868
2869 let external = Arc::new(LpgStore::new().unwrap());
2870
2871 let n1 = external.create_node(&["Person"]);
2873 external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
2874
2875 let db = GrafeoDB::with_store(
2876 Arc::clone(&external) as Arc<dyn GraphStoreMut>,
2877 Config::in_memory(),
2878 )
2879 .unwrap();
2880
2881 let session = db.session();
2882
2883 #[cfg(feature = "gql")]
2885 {
2886 let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
2887 assert_eq!(result.rows.len(), 1);
2888 }
2889 }
2890
2891 #[test]
2892 fn test_with_config_custom_memory_limit() {
2893 let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); let db = GrafeoDB::with_config(config).unwrap();
2896 assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
2897 assert_eq!(db.node_count(), 0);
2898 }
2899
2900 #[cfg(feature = "metrics")]
2901 #[test]
2902 fn test_database_metrics_registry() {
2903 let db = GrafeoDB::new_in_memory();
2904
2905 db.create_node(&["Person"]);
2907 db.create_node(&["Person"]);
2908
2909 let snap = db.metrics();
2911 assert_eq!(snap.query_count, 0); }
2914
/// A query over two `Person` nodes reports an execution time and a
/// `rows_scanned` value of 2.
#[test]
fn test_query_result_has_metrics() {
    let db = GrafeoDB::new_in_memory();
    for _ in 0..2 {
        db.create_node(&["Person"]);
    }

    #[cfg(feature = "gql")]
    {
        let outcome = db.execute("MATCH (n:Person) RETURN n").unwrap();
        let elapsed_ms = outcome.execution_time_ms;
        let scanned = outcome.rows_scanned;

        // Both metrics must be populated before their values are inspected.
        assert!(elapsed_ms.is_some());
        assert!(scanned.is_some());
        assert!(elapsed_ms.unwrap() >= 0.0);
        assert_eq!(scanned.unwrap(), 2);
    }
}
2933
/// A query whose label matches nothing still carries metrics, with
/// `rows_scanned` equal to 0.
#[test]
fn test_empty_query_result_metrics() {
    let db = GrafeoDB::new_in_memory();
    db.create_node(&["Person"]);

    #[cfg(feature = "gql")]
    {
        let outcome = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
        let elapsed_ms = outcome.execution_time_ms;
        let scanned = outcome.rows_scanned;

        assert!(elapsed_ms.is_some());
        assert!(scanned.is_some());
        assert_eq!(scanned.unwrap(), 0);
    }
}
2950
/// Change-data-capture tests; compiled only when the `cdc` feature is enabled.
#[cfg(feature = "cdc")]
mod cdc_integration {
    use super::*;
    use crate::cdc::ChangeKind;
    use grafeo_common::types::{EpochId, Value};

    /// Builds an in-memory database whose config has CDC switched on.
    fn cdc_db() -> GrafeoDB {
        let config = Config::in_memory().with_cdc();
        GrafeoDB::with_config(config).unwrap()
    }

    /// Create, two property updates and a delete yield four history events;
    /// only the second update carries a `before` value.
    #[test]
    fn test_node_lifecycle_history() {
        let db = cdc_db();

        let node = db.create_node(&["Person"]);
        db.set_node_property(node, "name", "Alix".into());
        db.set_node_property(node, "name", "Gus".into());
        db.delete_node(node);

        let events = db.history(node).unwrap();
        assert_eq!(events.len(), 4);
        assert_eq!(events[0].kind, ChangeKind::Create);
        assert_eq!(events[1].kind, ChangeKind::Update);
        // First write of "name": there was no previous value.
        assert!(events[1].before.is_none());
        assert_eq!(events[2].kind, ChangeKind::Update);
        // Second write of "name": the previous value is recorded.
        assert!(events[2].before.is_some());
        assert_eq!(events[3].kind, ChangeKind::Delete);
    }

    /// Create, one property update and a delete on an edge yield three
    /// history events in that order.
    #[test]
    fn test_edge_lifecycle_history() {
        let db = cdc_db();

        let alix = db.create_node(&["Person"]);
        let gus = db.create_node(&["Person"]);
        let knows = db.create_edge(alix, gus, "KNOWS");
        db.set_edge_property(knows, "since", 2024i64.into());
        db.delete_edge(knows);

        let events = db.history(knows).unwrap();
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].kind, ChangeKind::Create);
        assert_eq!(events[1].kind, ChangeKind::Update);
        assert_eq!(events[2].kind, ChangeKind::Delete);
    }

    /// Creating a node together with two properties records a single
    /// `Create` event whose `after` payload has two entries.
    #[test]
    fn test_create_node_with_props_cdc() {
        let db = cdc_db();

        let props = vec![
            ("name", Value::from("Alix")),
            ("age", Value::from(30i64)),
        ];
        let node = db.create_node_with_props(&["Person"], props);

        let events = db.history(node).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].kind, ChangeKind::Create);
        let after = events[0].after.as_ref().unwrap();
        assert_eq!(after.len(), 2);
    }

    /// Two node creations plus one property update show up as three changes
    /// over the epoch range `0..=u64::MAX`.
    #[test]
    fn test_changes_between() {
        let db = cdc_db();

        let first = db.create_node(&["A"]);
        let _second = db.create_node(&["B"]);
        db.set_node_property(first, "x", 1i64.into());

        let changes = db
            .changes_between(EpochId(0), EpochId(u64::MAX))
            .unwrap();
        assert_eq!(changes.len(), 3);
    }

    /// A plain in-memory database reports CDC as disabled and records no
    /// history for mutations.
    #[test]
    fn test_cdc_disabled_by_default() {
        let db = GrafeoDB::new_in_memory();
        assert!(!db.is_cdc_enabled());

        let node = db.create_node(&["Person"]);
        db.set_node_property(node, "name", "Alix".into());

        let events = db.history(node).unwrap();
        assert!(events.is_empty(), "CDC off by default: no events recorded");
    }

    /// A session opened with `session_with_cdc(true)` records events even
    /// though the database was created without CDC in its config.
    #[test]
    fn test_session_with_cdc_override_on() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session_with_cdc(true);
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        let changes = db
            .changes_between(EpochId(0), EpochId(u64::MAX))
            .unwrap();
        assert!(
            !changes.is_empty(),
            "session_with_cdc(true) should record events"
        );
    }

    /// A session opened with `session_with_cdc(false)` records nothing even
    /// though the database config has CDC enabled.
    #[test]
    fn test_session_with_cdc_override_off() {
        let db = cdc_db();
        let session = db.session_with_cdc(false);
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        let changes = db
            .changes_between(EpochId(0), EpochId(u64::MAX))
            .unwrap();
        assert!(
            changes.is_empty(),
            "session_with_cdc(false) should not record events"
        );
    }

    /// `set_cdc_enabled` toggles recording at runtime: a node created while
    /// enabled has one history event, one created after disabling has none.
    #[test]
    fn test_set_cdc_enabled_runtime() {
        let db = GrafeoDB::new_in_memory();
        assert!(!db.is_cdc_enabled());

        db.set_cdc_enabled(true);
        assert!(db.is_cdc_enabled());

        let recorded_node = db.create_node(&["Person"]);
        let recorded = db.history(recorded_node).unwrap();
        assert_eq!(recorded.len(), 1, "CDC enabled at runtime records events");

        db.set_cdc_enabled(false);
        let silent_node = db.create_node(&["Person"]);
        let silent = db.history(silent_node).unwrap();
        assert!(
            silent.is_empty(),
            "CDC disabled at runtime stops recording"
        );
    }
}
3109
/// A node written to an `LpgStore` before it is handed to
/// `GrafeoDB::with_store` is returned by a query on the database.
#[test]
fn test_with_store_basic() {
    use grafeo_common::types::Value;
    use grafeo_core::graph::lpg::LpgStore;

    let backend = Arc::new(LpgStore::new().unwrap());
    let person = backend.create_node(&["Person"]);
    backend.set_node_property(person, "name", Value::from("Alix"));

    let db = GrafeoDB::with_store(
        Arc::clone(&backend) as Arc<dyn GraphStoreMut>,
        Config::in_memory(),
    )
    .unwrap();

    let matched = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(matched.rows.len(), 1);
}
3124
/// A session on a store-backed database can run a `count` query over an
/// empty store and gets exactly one result row.
#[test]
fn test_with_store_session() {
    use grafeo_core::graph::lpg::LpgStore;

    let backend = Arc::new(LpgStore::new().unwrap());
    let db = GrafeoDB::with_store(
        Arc::clone(&backend) as Arc<dyn GraphStoreMut>,
        Config::in_memory(),
    )
    .unwrap();

    let counted = db.session().execute("MATCH (n) RETURN count(n)").unwrap();
    assert_eq!(counted.rows.len(), 1);
}
3137
/// Inside an explicit transaction on a store-backed database, an inserted
/// node is visible to a following query, and the transaction commits.
#[test]
fn test_with_store_mutations() {
    use grafeo_core::graph::lpg::LpgStore;

    let backend = Arc::new(LpgStore::new().unwrap());
    let db = GrafeoDB::with_store(
        Arc::clone(&backend) as Arc<dyn GraphStoreMut>,
        Config::in_memory(),
    )
    .unwrap();

    let mut txn_session = db.session();
    txn_session.begin_transaction().unwrap();

    txn_session
        .execute("INSERT (:Person {name: 'Alix'})")
        .unwrap();

    // Read back within the same, still-open transaction.
    let visible = txn_session
        .execute("MATCH (n:Person) RETURN n.name")
        .unwrap();
    assert_eq!(visible.rows.len(), 1);

    txn_session.commit().unwrap();
}
3159
/// `QueryResult::empty()` has no rows, no columns, no metrics and no
/// status message.
#[test]
fn test_query_result_empty() {
    let blank = QueryResult::empty();

    assert!(blank.is_empty());
    assert_eq!(blank.row_count(), 0);
    assert_eq!(blank.column_count(), 0);
    assert!(blank.execution_time_ms().is_none());
    assert!(blank.rows_scanned().is_none());
    assert!(blank.status_message.is_none());
}
3174
/// `QueryResult::status` yields an empty, column-less result that carries
/// the given message verbatim.
#[test]
fn test_query_result_status() {
    let outcome = QueryResult::status("Created node type 'Person'");

    assert!(outcome.is_empty());
    assert_eq!(outcome.column_count(), 0);

    let message = outcome.status_message.as_deref();
    assert_eq!(message, Some("Created node type 'Person'"));
}
3185
/// `QueryResult::new` with two column names produces two columns, zero
/// rows, and `LogicalType::Any` for every column type.
#[test]
fn test_query_result_new_with_columns() {
    use grafeo_common::types::LogicalType;

    let table = QueryResult::new(vec!["name".into(), "age".into()]);

    assert_eq!(table.column_count(), 2);
    assert_eq!(table.row_count(), 0);
    assert!(table.is_empty());
    assert_eq!(
        table.column_types,
        vec![LogicalType::Any, LogicalType::Any]
    );
}
3201
/// `QueryResult::with_types` stores the supplied column types in order.
#[test]
fn test_query_result_with_types() {
    use grafeo_common::types::LogicalType;

    let columns = vec!["name".into(), "age".into()];
    let types = vec![LogicalType::String, LogicalType::Int64];
    let table = QueryResult::with_types(columns, types);

    assert_eq!(table.column_count(), 2);
    assert_eq!(table.column_types[0], LogicalType::String);
    assert_eq!(table.column_types[1], LogicalType::Int64);
}
3213
/// `with_metrics` attaches the execution time and scanned-row count, both
/// readable through the accessor methods.
#[test]
fn test_query_result_with_metrics() {
    let base = QueryResult::new(vec!["x".into()]);
    let measured = base.with_metrics(42.5, 100);

    assert_eq!(measured.execution_time_ms(), Some(42.5));
    assert_eq!(measured.rows_scanned(), Some(100));
}
3220
/// `scalar()` extracts the single value of a one-row, one-column result.
#[test]
fn test_query_result_scalar_success() {
    use grafeo_common::types::Value;

    let mut single = QueryResult::new(vec!["count".into()]);
    single.rows.push(vec![Value::Int64(42)]);

    let extracted = single.scalar::<i64>().unwrap();
    assert_eq!(extracted, 42);
}
3230
/// `scalar()` fails for any shape other than exactly one row by one column:
/// two rows, two columns, or no rows at all.
#[test]
fn test_query_result_scalar_wrong_shape() {
    use grafeo_common::types::Value;

    // Two rows, one column.
    let mut two_rows = QueryResult::new(vec!["x".into()]);
    for n in [1, 2] {
        two_rows.rows.push(vec![Value::Int64(n)]);
    }
    assert!(two_rows.scalar::<i64>().is_err());

    // One row, two columns.
    let mut two_columns = QueryResult::new(vec!["a".into(), "b".into()]);
    two_columns
        .rows
        .push(vec![Value::Int64(1), Value::Int64(2)]);
    assert!(two_columns.scalar::<i64>().is_err());

    // One column, zero rows.
    let no_rows = QueryResult::new(vec!["x".into()]);
    assert!(no_rows.scalar::<i64>().is_err());
}
3249
/// `iter()` visits every row that was pushed into the result.
#[test]
fn test_query_result_iter() {
    use grafeo_common::types::Value;

    let mut table = QueryResult::new(vec!["x".into()]);
    for n in [1, 2] {
        table.rows.push(vec![Value::Int64(n)]);
    }

    let visited = table.iter().count();
    assert_eq!(visited, 2);
}
3260
/// The string rendering of a result contains both the column name and the
/// cell value.
#[test]
fn test_query_result_display() {
    use grafeo_common::types::Value;

    let mut table = QueryResult::new(vec!["name".into()]);
    table.rows.push(vec![Value::from("Alix")]);

    let rendered = table.to_string();
    for needle in ["name", "Alix"] {
        assert!(rendered.contains(needle));
    }
}
3270
/// Converting a string `Value` to `i64` is an error.
#[test]
fn test_from_value_i64_type_mismatch() {
    use grafeo_common::types::Value;

    let text = Value::from("not a number");
    let converted = i64::from_value(&text);
    assert!(converted.is_err());
}
3281
/// Converting a string `Value` to `f64` is an error.
#[test]
fn test_from_value_f64_type_mismatch() {
    use grafeo_common::types::Value;

    let text = Value::from("not a float");
    let converted = f64::from_value(&text);
    assert!(converted.is_err());
}
3288
/// Converting an integer `Value` to `String` is an error.
#[test]
fn test_from_value_string_type_mismatch() {
    use grafeo_common::types::Value;

    let number = Value::Int64(42);
    let converted = String::from_value(&number);
    assert!(converted.is_err());
}
3295
/// Converting an integer `Value` to `bool` is an error (no implicit
/// truthiness for `Int64(1)`).
#[test]
fn test_from_value_bool_type_mismatch() {
    use grafeo_common::types::Value;

    let number = Value::Int64(1);
    let converted = bool::from_value(&number);
    assert!(converted.is_err());
}
3302
/// Each supported target type converts successfully from its matching
/// `Value` variant.
#[test]
fn test_from_value_all_success() {
    use grafeo_common::types::Value;

    let integer = i64::from_value(&Value::Int64(99)).unwrap();
    assert_eq!(integer, 99);

    // Compare floats by distance rather than exact equality.
    let float = f64::from_value(&Value::Float64(2.72)).unwrap();
    assert!((float - 2.72).abs() < f64::EPSILON);

    let text = String::from_value(&Value::from("hello")).unwrap();
    assert_eq!(text, "hello");

    let flag = bool::from_value(&Value::Bool(true)).unwrap();
    assert!(flag);
}
3311
/// A fresh in-memory database is writable.
#[test]
fn test_database_is_read_only_false_by_default() {
    let read_only = GrafeoDB::new_in_memory().is_read_only();
    assert!(!read_only);
}
3321
/// A fresh in-memory database reports the LPG graph model.
#[test]
fn test_database_graph_model() {
    use crate::config::GraphModel;

    let model = GrafeoDB::new_in_memory().graph_model();
    assert_eq!(model, GraphModel::Lpg);
}
3327
/// Without explicit configuration, no memory limit is reported.
#[test]
fn test_database_memory_limit_none_by_default() {
    let limit = GrafeoDB::new_in_memory().memory_limit();
    assert!(limit.is_none());
}
3333
/// A memory limit set on the config is reported back by `memory_limit()`.
#[test]
fn test_database_memory_limit_custom() {
    let db =
        GrafeoDB::with_config(Config::in_memory().with_memory_limit(128 * 1024 * 1024)).unwrap();

    let reported = db.memory_limit();
    assert_eq!(reported, Some(128 * 1024 * 1024));
}
3340
/// The adaptive config of a fresh database is enabled with a threshold
/// of 3.0.
#[test]
fn test_database_adaptive_config() {
    let db = GrafeoDB::new_in_memory();
    let settings = db.adaptive_config();

    assert!(settings.enabled);

    // Compare floats by distance rather than exact equality.
    let deviation = (settings.threshold - 3.0).abs();
    assert!(deviation < f64::EPSILON);
}
3348
/// Smoke test: `buffer_manager()` can be called on a fresh database.
#[test]
fn test_database_buffer_manager() {
    let db = GrafeoDB::new_in_memory();
    // Only checks that the accessor is callable; the value is not inspected.
    let _manager = db.buffer_manager();
}
3355
/// Smoke test: `query_cache()` can be called on a fresh database.
#[test]
fn test_database_query_cache() {
    let db = GrafeoDB::new_in_memory();
    // Only checks that the accessor is callable; the value is not inspected.
    let _cache = db.query_cache();
}
3361
/// Smoke test: `clear_plan_cache()` can be called, after running a query
/// first when GQL is available.
#[test]
fn test_database_clear_plan_cache() {
    let db = GrafeoDB::new_in_memory();

    #[cfg(feature = "gql")]
    {
        // The query outcome is irrelevant here, so it is deliberately ignored.
        let _ = db.execute("MATCH (n) RETURN count(n)");
    }

    db.clear_plan_cache();
}
3373
/// Running `gc()` does not remove a live node.
#[test]
fn test_database_gc() {
    let db = GrafeoDB::new_in_memory();
    db.create_node(&["Person"]);

    db.gc();

    let remaining = db.node_count();
    assert_eq!(remaining, 1);
}
3382
/// `create_graph` returns `true` the first time and `false` for a repeat of
/// the same name; the graph then appears in `list_graphs()`.
#[test]
fn test_create_and_list_graphs() {
    let db = GrafeoDB::new_in_memory();

    let first_attempt = db.create_graph("social").unwrap();
    assert!(first_attempt);

    // Same name again: not an error, but nothing new is created.
    let second_attempt = db.create_graph("social").unwrap();
    assert!(!second_attempt);

    let listed = db.list_graphs();
    assert!(listed.iter().any(|name| name == "social"));
}
3400
/// `drop_graph` returns `true` when the graph existed and `false` once it
/// is already gone.
#[test]
fn test_drop_graph() {
    let db = GrafeoDB::new_in_memory();
    db.create_graph("temp").unwrap();

    let dropped = db.drop_graph("temp");
    assert!(dropped);

    let dropped_again = db.drop_graph("temp");
    assert!(!dropped_again);
}
3408
/// Dropping the graph that is currently selected clears the selection.
#[test]
fn test_drop_graph_resets_current_graph() {
    let db = GrafeoDB::new_in_memory();
    db.create_graph("active").unwrap();
    db.set_current_graph(Some("active")).unwrap();
    assert_eq!(db.current_graph().as_deref(), Some("active"));

    db.drop_graph("active");
    assert!(db.current_graph().is_none());
}
3419
/// A fresh database has no current graph selected.
#[test]
fn test_current_graph_default_none() {
    let selected = GrafeoDB::new_in_memory().current_graph();
    assert!(selected.is_none());
}
3429
/// Selecting an existing graph makes it the current graph.
#[test]
fn test_set_current_graph_valid() {
    let db = GrafeoDB::new_in_memory();
    db.create_graph("social").unwrap();

    db.set_current_graph(Some("social")).unwrap();

    let selected = db.current_graph();
    assert_eq!(selected.as_deref(), Some("social"));
}
3437
/// Selecting a graph that was never created is an error.
#[test]
fn test_set_current_graph_nonexistent() {
    let db = GrafeoDB::new_in_memory();
    assert!(db.set_current_graph(Some("nonexistent")).is_err());
}
3444
/// Passing `None` to `set_current_graph` clears a previous selection.
#[test]
fn test_set_current_graph_none_resets() {
    let db = GrafeoDB::new_in_memory();
    db.create_graph("social").unwrap();

    // Select, then immediately deselect.
    db.set_current_graph(Some("social")).unwrap();
    db.set_current_graph(None).unwrap();

    assert!(db.current_graph().is_none());
}
3453
/// The name "default" can be selected without a prior `create_graph` call
/// and is then reported as the current graph.
#[test]
fn test_set_current_graph_default_keyword() {
    let db = GrafeoDB::new_in_memory();

    db.set_current_graph(Some("default")).unwrap();

    let selected = db.current_graph();
    assert_eq!(selected.as_deref(), Some("default"));
}
3461
/// A fresh database has no current schema selected.
#[test]
fn test_current_schema_default_none() {
    let selected = GrafeoDB::new_in_memory().current_schema();
    assert!(selected.is_none());
}
3467
/// Selecting a schema that does not exist is an error.
#[test]
fn test_set_current_schema_nonexistent() {
    let db = GrafeoDB::new_in_memory();
    assert!(db.set_current_schema(Some("nonexistent")).is_err());
}
3474
/// Passing `None` to `set_current_schema` succeeds and leaves no schema
/// selected.
#[test]
fn test_set_current_schema_none_resets() {
    let db = GrafeoDB::new_in_memory();

    db.set_current_schema(None).unwrap();

    assert!(db.current_schema().is_none());
}
3481
/// The store returned by `graph_store()` reflects nodes created through the
/// database API.
#[test]
fn test_graph_store_returns_lpg_by_default() {
    let db = GrafeoDB::new_in_memory();
    db.create_node(&["Person"]);

    let visible_nodes = db.graph_store().node_count();
    assert_eq!(visible_nodes, 1);
}
3493
/// A fresh in-memory database exposes a mutable graph store.
#[test]
fn test_graph_store_mut_returns_some_by_default() {
    let writable = GrafeoDB::new_in_memory().graph_store_mut();
    assert!(writable.is_some());
}
3499
/// A database built with `with_read_store` is read-only, exposes no mutable
/// store, and still sees the data already present in the wrapped store.
#[test]
fn test_with_read_store() {
    use grafeo_core::graph::lpg::LpgStore;

    let backend = Arc::new(LpgStore::new().unwrap());
    backend.create_node(&["Person"]);

    let db = GrafeoDB::with_read_store(
        Arc::clone(&backend) as Arc<dyn GraphStore>,
        Config::in_memory(),
    )
    .unwrap();

    assert!(db.is_read_only());
    assert!(db.graph_store_mut().is_none());

    let readable = db.graph_store();
    assert_eq!(readable.node_count(), 1);
}
3517
/// A database built with `with_store` is writable, exposes a mutable store,
/// and sees the node already present in the wrapped store.
#[test]
fn test_with_store_graph_store_methods() {
    use grafeo_core::graph::lpg::LpgStore;

    let backend = Arc::new(LpgStore::new().unwrap());
    backend.create_node(&["Person"]);

    let shared = Arc::clone(&backend) as Arc<dyn GraphStoreMut>;
    let db = GrafeoDB::with_store(shared, Config::in_memory()).unwrap();

    assert!(!db.is_read_only());
    assert!(db.graph_store_mut().is_some());

    let visible_nodes = db.graph_store().node_count();
    assert_eq!(visible_nodes, 1);
}
3535
/// A session from `session_read_only()` can run a read query.
#[test]
// `session_read_only` triggers a deprecation warning; it is exercised here on purpose.
#[allow(deprecated)]
fn test_session_read_only() {
    let db = GrafeoDB::new_in_memory();
    db.create_node(&["Person"]);

    let reader = db.session_read_only();

    #[cfg(feature = "gql")]
    {
        let counted = reader.execute("MATCH (n) RETURN count(n)").unwrap();
        assert_eq!(counted.rows.len(), 1);
    }
}
3554
/// Closing an in-memory database succeeds, and closing it a second time
/// succeeds as well.
///
/// `expect` is used instead of `assert!(.. .is_ok())` so that a failing
/// close reports the underlying error rather than a bare `false`.
#[test]
fn test_close_in_memory_database() {
    let db = GrafeoDB::new_in_memory();
    db.create_node(&["Person"]);

    db.close().expect("first close() should succeed");
    db.close()
        .expect("second close() should also succeed (idempotent)");
}
3567
/// A config with zero threads is rejected by `GrafeoDB::with_config`.
#[test]
fn test_with_config_invalid_config_zero_threads() {
    let invalid = Config::in_memory().with_threads(0);
    assert!(GrafeoDB::with_config(invalid).is_err());
}
3578
/// A config with a zero memory limit is rejected by
/// `GrafeoDB::with_config`.
#[test]
fn test_with_config_invalid_config_zero_memory_limit() {
    let invalid = Config::in_memory().with_memory_limit(0);
    assert!(GrafeoDB::with_config(invalid).is_err());
}
3585
/// Each `StorageFormat` variant renders to its expected kebab-case name.
#[test]
fn test_storage_format_display() {
    use crate::config::StorageFormat;

    let expectations = [
        (StorageFormat::Auto, "auto"),
        (StorageFormat::WalDirectory, "wal-directory"),
        (StorageFormat::SingleFile, "single-file"),
    ];

    for (format, rendered) in expectations {
        assert_eq!(format.to_string(), rendered);
    }
}
3597
/// The default `StorageFormat` is `Auto`.
#[test]
fn test_storage_format_default() {
    use crate::config::StorageFormat;

    let fallback = StorageFormat::default();
    assert_eq!(fallback, StorageFormat::Auto);
}
3603
/// `with_storage_format` stores the chosen format on the config.
#[test]
fn test_config_with_storage_format() {
    use crate::config::StorageFormat;

    let chosen = Config::in_memory()
        .with_storage_format(StorageFormat::SingleFile)
        .storage_format;
    assert_eq!(chosen, StorageFormat::SingleFile);
}
3610
/// `with_cdc` sets `cdc_enabled` on the config.
#[test]
fn test_config_with_cdc() {
    let enabled = Config::in_memory().with_cdc().cdc_enabled;
    assert!(enabled);
}
3620
/// An in-memory config has `cdc_enabled` unset unless requested.
#[test]
fn test_config_cdc_default_false() {
    let enabled = Config::in_memory().cdc_enabled;
    assert!(!enabled);
}
3626
/// `ConfigError` can be used as a `dyn std::error::Error`, and
/// `ZeroMemoryLimit` has no underlying source error.
#[test]
fn test_config_error_is_error_trait() {
    use crate::config::ConfigError;

    // Boxing as a trait object proves the trait is implemented.
    let boxed: Box<dyn std::error::Error> = Box::new(ConfigError::ZeroMemoryLimit);
    let cause = boxed.source();
    assert!(cause.is_none());
}
3637
/// The Prometheus rendering of a fresh database's metrics is non-empty.
#[cfg(feature = "metrics")]
#[test]
fn test_metrics_prometheus_output() {
    let exposition = GrafeoDB::new_in_memory().metrics_prometheus();
    assert!(!exposition.is_empty());
}
3650
/// After `reset_metrics()`, the snapshot's `query_count` is zero.
#[cfg(feature = "metrics")]
#[test]
fn test_reset_metrics() {
    let db = GrafeoDB::new_in_memory();
    // Open a session before resetting; it stays alive until the test ends.
    let _open_session = db.session();

    db.reset_metrics();

    let snapshot = db.metrics();
    assert_eq!(snapshot.query_count, 0);
}
3661
/// On a database built with `with_read_store`, `drop_graph` returns `false`.
#[test]
fn test_drop_graph_on_external_store() {
    use grafeo_core::graph::lpg::LpgStore;

    let backend = Arc::new(LpgStore::new().unwrap());
    let db = GrafeoDB::with_read_store(
        Arc::clone(&backend) as Arc<dyn GraphStore>,
        Config::in_memory(),
    )
    .unwrap();

    let dropped = db.drop_graph("anything");
    assert!(!dropped);
}
3677}