1#[cfg(feature = "lpg")]
15mod admin;
16#[cfg(feature = "arrow-export")]
17pub mod arrow;
18#[cfg(all(feature = "async-storage", feature = "lpg"))]
19mod async_ops;
20#[cfg(all(feature = "async-storage", feature = "lpg"))]
21pub(crate) mod async_wal_store;
22#[cfg(all(feature = "wal", feature = "grafeo-file"))]
23pub mod backup;
24#[cfg(feature = "lpg")]
25pub(crate) mod catalog_section;
26#[cfg(feature = "cdc")]
27pub(crate) mod cdc_store;
28#[cfg(all(feature = "grafeo-file", feature = "lpg"))]
29mod checkpoint_timer;
30#[cfg(feature = "lpg")]
31mod crud;
32#[cfg(feature = "embed")]
33mod embed;
34#[cfg(feature = "grafeo-file")]
35pub(crate) mod flush;
36#[cfg(feature = "lpg")]
37mod import;
38#[cfg(feature = "lpg")]
39mod index;
40#[cfg(feature = "lpg")]
41mod persistence;
42mod query;
43#[cfg(feature = "triple-store")]
44mod rdf_ops;
45#[cfg(feature = "lpg")]
46mod search;
47pub(crate) mod section_consumer;
48#[cfg(all(feature = "wal", feature = "lpg"))]
49pub(crate) mod wal_store;
50
51use grafeo_common::{grafeo_error, grafeo_warn};
52#[cfg(feature = "wal")]
53use std::path::Path;
54use std::sync::Arc;
55use std::sync::atomic::AtomicUsize;
56
57use parking_lot::RwLock;
58
59use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
60use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind, Result};
61#[cfg(feature = "lpg")]
62use grafeo_core::graph::lpg::LpgStore;
63#[cfg(feature = "triple-store")]
64use grafeo_core::graph::rdf::RdfStore;
65use grafeo_core::graph::{GraphStore, GraphStoreMut};
66#[cfg(feature = "grafeo-file")]
67use grafeo_storage::file::GrafeoFileManager;
68#[cfg(all(feature = "wal", feature = "lpg"))]
69use grafeo_storage::wal::WalRecovery;
70#[cfg(feature = "wal")]
71use grafeo_storage::wal::{DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord};
72
73use crate::catalog::Catalog;
74use crate::config::Config;
75use crate::query::cache::QueryCache;
76use crate::session::Session;
77use crate::transaction::TransactionManager;
78
/// Top-level database handle.
///
/// Bundles the graph store(s), catalog, transaction manager, buffer manager
/// and the optional, feature-gated subsystems (WAL, CDC, embeddings,
/// single-file storage, metrics). Sessions created from this handle share
/// these components through `Arc` clones.
pub struct GrafeoDB {
    /// Configuration this database was constructed with.
    pub(super) config: Config,
    /// Built-in LPG store. `None` when the database wraps an external store
    /// (`with_store` / `with_read_store`) or after `compact()` replaced it.
    #[cfg(feature = "lpg")]
    pub(super) store: Option<Arc<LpgStore>>,
    /// Schema catalog, shared with every session.
    pub(super) catalog: Arc<Catalog>,
    /// RDF triple store.
    #[cfg(feature = "triple-store")]
    pub(super) rdf_store: Arc<RdfStore>,
    /// Transaction manager, shared with every session.
    pub(super) transaction_manager: Arc<TransactionManager>,
    /// Buffer manager, built with a budget taken from `config.memory_limit`
    /// or, when unset, 75% of the detected system memory.
    pub(super) buffer_manager: Arc<BufferManager>,
    /// Write-ahead log. `None` for read-only databases, when the WAL is
    /// disabled, when no path is configured, or for external stores.
    #[cfg(feature = "wal")]
    pub(super) wal: Option<Arc<LpgWal>>,
    /// Graph context handed to sessions together with the WAL.
    // NOTE(review): presumably the named graph that WAL records currently
    // target — confirm against the session's WAL writer.
    #[cfg(feature = "wal")]
    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
    /// Query cache, shared with every session.
    pub(super) query_cache: Arc<QueryCache>,
    /// Commit counter shared with every session; starts at zero.
    pub(super) commit_counter: Arc<AtomicUsize>,
    /// Open flag; initialised to `true` by every constructor.
    pub(super) is_open: RwLock<bool>,
    /// Change-data-capture log attached to sessions when CDC is active.
    #[cfg(feature = "cdc")]
    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
    /// Database-wide CDC default, initialised from `config.cdc_enabled`.
    #[cfg(feature = "cdc")]
    cdc_enabled: std::sync::atomic::AtomicBool,
    /// Registered embedding models.
    // NOTE(review): presumably keyed by model name — confirm.
    #[cfg(feature = "embed")]
    pub(super) embedding_models:
        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
    /// Manager for the single-file (`.grafeo`) storage format; `None` when
    /// that format is not in use.
    #[cfg(feature = "grafeo-file")]
    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
    /// Periodic checkpoint timer; populated by `with_config` when a
    /// checkpoint interval is configured, a file manager exists and the
    /// database is not read-only.
    #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
    checkpoint_timer: parking_lot::Mutex<Option<checkpoint_timer::CheckpointTimer>>,
    /// Memory-mapped storages backing spilled vector data; the constructors
    /// in this file initialise it to `None`.
    // NOTE(review): presumably keyed by vector index name — confirm.
    #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
    vector_spill_storages: Option<
        Arc<
            parking_lot::RwLock<
                std::collections::HashMap<String, Arc<grafeo_core::index::vector::MmapStorage>>,
            >,
        >,
    >,
    /// External read-only view of the graph; when set, sessions are created
    /// on top of it instead of the built-in store.
    pub(super) external_read_store: Option<Arc<dyn GraphStore>>,
    /// External writable store; `None` for read-only external stores.
    pub(super) external_write_store: Option<Arc<dyn GraphStoreMut>>,
    /// Metrics registry, attached to sessions created on the built-in store.
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Database-level current graph, applied to newly created sessions.
    current_graph: RwLock<Option<String>>,
    /// Database-level current schema, applied to newly created sessions.
    current_schema: RwLock<Option<String>>,
    /// Whether the database rejects writes; every session inherits this.
    read_only: bool,
    /// Graph projections by name, shared with sessions.
    projections:
        Arc<RwLock<std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>>>,
}
181
182impl GrafeoDB {
183 #[cfg(feature = "lpg")]
191 fn lpg_store(&self) -> &Arc<LpgStore> {
192 self.store.as_ref().expect(
193 "no built-in LpgStore: this GrafeoDB was created with an external store \
194 (with_store / with_read_store). Use session() or graph_store() instead.",
195 )
196 }
197
198 #[cfg(feature = "cdc")]
200 #[inline]
201 pub(super) fn cdc_active(&self) -> bool {
202 self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed)
203 }
204
205 #[must_use]
226 pub fn new_in_memory() -> Self {
227 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
228 }
229
230 #[cfg(feature = "wal")]
249 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
250 Self::with_config(Config::persistent(path.as_ref()))
251 }
252
253 #[cfg(feature = "grafeo-file")]
278 pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
279 Self::with_config(Config::read_only(path.as_ref()))
280 }
281
    /// Creates a database from an explicit [`Config`].
    ///
    /// Steps, in order:
    /// 1. Validate the configuration.
    /// 2. Create the in-memory stores, transaction manager, buffer manager
    ///    and catalog.
    /// 3. With `grafeo-file`: open (or create) the single-file database,
    ///    load its section directory or legacy snapshot, and — for writable
    ///    databases — replay a sidecar WAL if one exists.
    /// 4. With `wal`: for directory-based storage replay the WAL found at
    ///    `<path>/wal`, then open the WAL for writing.
    /// 5. Assemble the struct, register section consumers, optionally start
    ///    the checkpoint timer and restore vector spill files.
    ///
    /// # Errors
    ///
    /// Returns `Error::Internal` when validation fails, when read-only mode
    /// is requested without a path or without an existing file, or when the
    /// single-file path exists but is not a file. Errors from store
    /// creation, file access, snapshot/section loading and WAL recovery are
    /// propagated with `?`.
    pub fn with_config(config: Config) -> Result<Self> {
        // Reject invalid configurations before touching the filesystem.
        config
            .validate()
            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;

        #[cfg(feature = "lpg")]
        let store = Arc::new(LpgStore::new()?);
        #[cfg(feature = "triple-store")]
        let rdf_store = Arc::new(RdfStore::new());
        let transaction_manager = Arc::new(TransactionManager::new());

        let buffer_config = BufferManagerConfig {
            // Explicit limit if configured, otherwise 75% of detected system memory.
            budget: config.memory_limit.unwrap_or_else(|| {
                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
            }),
            // Explicit spill path if configured, otherwise a sibling of the
            // database path named `<file name>.spill`.
            spill_path: config.spill_path.clone().or_else(|| {
                config.path.as_ref().and_then(|p| {
                    let parent = p.parent()?;
                    let name = p.file_name()?.to_str()?;
                    Some(parent.join(format!("{name}.spill")))
                })
            }),
            ..BufferManagerConfig::default()
        };
        let buffer_manager = BufferManager::new(buffer_config);

        let catalog = Arc::new(Catalog::new());

        let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;

        #[cfg(feature = "grafeo-file")]
        // Read-only mode requires an existing database file; nothing is created.
        let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
            if let Some(ref db_path) = config.path {
                if db_path.exists() && db_path.is_file() {
                    let fm = GrafeoFileManager::open_read_only(db_path)?;
                    #[cfg(feature = "lpg")]
                    // Prefer the section directory; fall back to the snapshot
                    // when the file has none.
                    if fm.read_section_directory()?.is_some() {
                        Self::load_from_sections(
                            &fm,
                            &store,
                            &catalog,
                            #[cfg(feature = "triple-store")]
                            &rdf_store,
                        )?;
                    } else {
                        let snapshot_data = fm.read_snapshot()?;
                        // An empty snapshot means there is nothing to load.
                        if !snapshot_data.is_empty() {
                            Self::apply_snapshot_data(
                                &store,
                                &catalog,
                                #[cfg(feature = "triple-store")]
                                &rdf_store,
                                &snapshot_data,
                            )?;
                        }
                    }
                    Some(Arc::new(fm))
                } else {
                    return Err(grafeo_common::utils::error::Error::Internal(format!(
                        "read-only open requires an existing .grafeo file: {}",
                        db_path.display()
                    )));
                }
            } else {
                return Err(grafeo_common::utils::error::Error::Internal(
                    "read-only mode requires a database path".to_string(),
                ));
            }
        } else if let Some(ref db_path) = config.path {
            if Self::should_use_single_file(db_path, config.storage_format) {
                // Open an existing file, create a missing one, and refuse
                // anything else that already occupies the path.
                let fm = if db_path.exists() && db_path.is_file() {
                    GrafeoFileManager::open(db_path)?
                } else if !db_path.exists() {
                    GrafeoFileManager::create(db_path)?
                } else {
                    return Err(grafeo_common::utils::error::Error::Internal(format!(
                        "path exists but is not a file: {}",
                        db_path.display()
                    )));
                };

                #[cfg(feature = "lpg")]
                // Same load strategy as the read-only branch above.
                if fm.read_section_directory()?.is_some() {
                    Self::load_from_sections(
                        &fm,
                        &store,
                        &catalog,
                        #[cfg(feature = "triple-store")]
                        &rdf_store,
                    )?;
                } else {
                    let snapshot_data = fm.read_snapshot()?;
                    if !snapshot_data.is_empty() {
                        Self::apply_snapshot_data(
                            &store,
                            &catalog,
                            #[cfg(feature = "triple-store")]
                            &rdf_store,
                            &snapshot_data,
                        )?;
                    }
                }

                #[cfg(all(feature = "wal", feature = "lpg"))]
                // Replay the sidecar WAL on top of the loaded file contents.
                if config.wal_enabled && fm.has_sidecar_wal() {
                    let recovery = WalRecovery::new(fm.sidecar_wal_path());
                    let records = recovery.recover()?;
                    Self::apply_wal_records(
                        &store,
                        &catalog,
                        #[cfg(feature = "triple-store")]
                        &rdf_store,
                        &records,
                    )?;
                }

                Some(Arc::new(fm))
            } else {
                None
            }
        } else {
            None
        };

        #[cfg(feature = "wal")]
        // A WAL is opened only for writable, path-backed databases with the
        // WAL enabled.
        let wal = if is_read_only {
            None
        } else if config.wal_enabled {
            if let Some(ref db_path) = config.path {
                #[cfg(feature = "grafeo-file")]
                // Single-file databases keep the WAL in a sidecar directory;
                // otherwise it lives at `<db_path>/wal`.
                let wal_path = if let Some(ref fm) = file_manager {
                    let p = fm.sidecar_wal_path();
                    std::fs::create_dir_all(&p)?;
                    p
                } else {
                    std::fs::create_dir_all(db_path)?;
                    db_path.join("wal")
                };

                #[cfg(not(feature = "grafeo-file"))]
                let wal_path = {
                    std::fs::create_dir_all(db_path)?;
                    db_path.join("wal")
                };

                #[cfg(all(feature = "lpg", feature = "grafeo-file"))]
                let is_single_file = file_manager.is_some();
                #[cfg(all(feature = "lpg", not(feature = "grafeo-file")))]
                let is_single_file = false;

                #[cfg(feature = "lpg")]
                // The sidecar WAL of a single-file database was already
                // replayed above, so only directory storage is recovered here.
                if !is_single_file && wal_path.exists() {
                    let recovery = WalRecovery::new(&wal_path);
                    let records = recovery.recover()?;
                    Self::apply_wal_records(
                        &store,
                        &catalog,
                        #[cfg(feature = "triple-store")]
                        &rdf_store,
                        &records,
                    )?;
                }

                // Translate the config-level durability mode into the WAL's
                // own enum, variant for variant.
                let wal_durability = match config.wal_durability {
                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
                    crate::config::DurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    } => WalDurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    },
                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
                        WalDurabilityMode::Adaptive { target_interval_ms }
                    }
                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
                };
                let wal_config = WalConfig {
                    durability: wal_durability,
                    ..WalConfig::default()
                };
                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
                Some(Arc::new(wal_manager))
            } else {
                None
            }
        } else {
            None
        };

        let query_cache = Arc::new(QueryCache::default());

        #[cfg(all(feature = "temporal", feature = "lpg"))]
        // Align the transaction manager with the epoch of the recovered store.
        transaction_manager.sync_epoch(store.current_epoch());

        // Copy values out of `config` before it is moved into the struct.
        #[cfg(feature = "cdc")]
        let cdc_enabled_val = config.cdc_enabled;
        #[cfg(feature = "cdc")]
        let cdc_retention = config.cdc_retention.clone();

        // Handles for the checkpoint timer, cloned before the originals are
        // moved into the struct.
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        let checkpoint_interval = config.checkpoint_interval;
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        let timer_store = Arc::clone(&store);
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        let timer_catalog = Arc::clone(&catalog);
        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        let timer_tm = Arc::clone(&transaction_manager);
        #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "triple-store"))]
        let timer_rdf = Arc::clone(&rdf_store);
        #[cfg(all(feature = "grafeo-file", feature = "lpg", feature = "wal"))]
        let timer_wal = wal.clone();

        let mut db = Self {
            config,
            #[cfg(feature = "lpg")]
            store: Some(store),
            catalog,
            #[cfg(feature = "triple-store")]
            rdf_store,
            transaction_manager,
            buffer_manager,
            #[cfg(feature = "wal")]
            wal,
            #[cfg(feature = "wal")]
            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
            query_cache,
            commit_counter: Arc::new(AtomicUsize::new(0)),
            is_open: RwLock::new(true),
            #[cfg(feature = "cdc")]
            cdc_log: Arc::new(crate::cdc::CdcLog::with_retention(cdc_retention)),
            #[cfg(feature = "cdc")]
            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
            #[cfg(feature = "embed")]
            embedding_models: RwLock::new(hashbrown::HashMap::new()),
            #[cfg(feature = "grafeo-file")]
            file_manager,
            #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
            checkpoint_timer: parking_lot::Mutex::new(None),
            #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
            vector_spill_storages: None,
            external_read_store: None,
            external_write_store: None,
            #[cfg(feature = "metrics")]
            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
            current_graph: RwLock::new(None),
            current_schema: RwLock::new(None),
            read_only: is_read_only,
            projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
        };

        db.register_section_consumers();

        #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
        // Start periodic checkpoints only for writable single-file databases
        // with a configured interval.
        if let (Some(interval), Some(fm)) = (checkpoint_interval, &db.file_manager)
            && !is_read_only
        {
            *db.checkpoint_timer.lock() = Some(checkpoint_timer::CheckpointTimer::start(
                interval,
                Arc::clone(fm),
                timer_store,
                timer_catalog,
                timer_tm,
                #[cfg(feature = "triple-store")]
                timer_rdf,
                #[cfg(feature = "wal")]
                timer_wal,
            ));
        }

        #[cfg(all(
            feature = "lpg",
            feature = "vector-index",
            feature = "mmap",
            not(feature = "temporal")
        ))]
        db.restore_spill_files();

        #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
        // A `ForceDisk` tier override on the vector store section triggers an
        // immediate spill through the buffer manager.
        if db
            .config
            .section_configs
            .get(&grafeo_common::storage::SectionType::VectorStore)
            .is_some_and(|c| c.tier == grafeo_common::storage::TierOverride::ForceDisk)
        {
            db.buffer_manager.spill_all();
        }

        Ok(db)
    }
627
628 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
657 config
658 .validate()
659 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
660
661 let transaction_manager = Arc::new(TransactionManager::new());
662
663 let buffer_config = BufferManagerConfig {
664 budget: config.memory_limit.unwrap_or_else(|| {
665 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
666 }),
667 spill_path: None,
668 ..BufferManagerConfig::default()
669 };
670 let buffer_manager = BufferManager::new(buffer_config);
671
672 let query_cache = Arc::new(QueryCache::default());
673
674 #[cfg(feature = "cdc")]
675 let cdc_enabled_val = config.cdc_enabled;
676
677 Ok(Self {
678 config,
679 #[cfg(feature = "lpg")]
680 store: None,
681 catalog: Arc::new(Catalog::new()),
682 #[cfg(feature = "triple-store")]
683 rdf_store: Arc::new(RdfStore::new()),
684 transaction_manager,
685 buffer_manager,
686 #[cfg(feature = "wal")]
687 wal: None,
688 #[cfg(feature = "wal")]
689 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
690 query_cache,
691 commit_counter: Arc::new(AtomicUsize::new(0)),
692 is_open: RwLock::new(true),
693 #[cfg(feature = "cdc")]
694 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
695 #[cfg(feature = "cdc")]
696 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
697 #[cfg(feature = "embed")]
698 embedding_models: RwLock::new(hashbrown::HashMap::new()),
699 #[cfg(feature = "grafeo-file")]
700 file_manager: None,
701 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
702 checkpoint_timer: parking_lot::Mutex::new(None),
703 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
704 vector_spill_storages: None,
705 external_read_store: Some(Arc::clone(&store) as Arc<dyn GraphStore>),
706 external_write_store: Some(store),
707 #[cfg(feature = "metrics")]
708 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
709 current_graph: RwLock::new(None),
710 current_schema: RwLock::new(None),
711 read_only: false,
712 projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
713 })
714 }
715
716 pub fn with_read_store(store: Arc<dyn GraphStore>, config: Config) -> Result<Self> {
741 config
742 .validate()
743 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
744
745 let transaction_manager = Arc::new(TransactionManager::new());
746
747 let buffer_config = BufferManagerConfig {
748 budget: config.memory_limit.unwrap_or_else(|| {
749 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
750 }),
751 spill_path: None,
752 ..BufferManagerConfig::default()
753 };
754 let buffer_manager = BufferManager::new(buffer_config);
755
756 let query_cache = Arc::new(QueryCache::default());
757
758 #[cfg(feature = "cdc")]
759 let cdc_enabled_val = config.cdc_enabled;
760
761 Ok(Self {
762 config,
763 #[cfg(feature = "lpg")]
764 store: None,
765 catalog: Arc::new(Catalog::new()),
766 #[cfg(feature = "triple-store")]
767 rdf_store: Arc::new(RdfStore::new()),
768 transaction_manager,
769 buffer_manager,
770 #[cfg(feature = "wal")]
771 wal: None,
772 #[cfg(feature = "wal")]
773 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
774 query_cache,
775 commit_counter: Arc::new(AtomicUsize::new(0)),
776 is_open: RwLock::new(true),
777 #[cfg(feature = "cdc")]
778 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
779 #[cfg(feature = "cdc")]
780 cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
781 #[cfg(feature = "embed")]
782 embedding_models: RwLock::new(hashbrown::HashMap::new()),
783 #[cfg(feature = "grafeo-file")]
784 file_manager: None,
785 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
786 checkpoint_timer: parking_lot::Mutex::new(None),
787 #[cfg(all(feature = "vector-index", feature = "mmap", not(feature = "temporal")))]
788 vector_spill_storages: None,
789 external_read_store: Some(store),
790 external_write_store: None,
791 #[cfg(feature = "metrics")]
792 metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
793 current_graph: RwLock::new(None),
794 current_schema: RwLock::new(None),
795 read_only: true,
796 projections: Arc::new(RwLock::new(std::collections::HashMap::new())),
797 })
798 }
799
800 #[cfg(feature = "compact-store")]
818 pub fn compact(&mut self) -> Result<()> {
819 use grafeo_core::graph::compact::from_graph_store;
820
821 let current_store = self.graph_store();
822 let compact = from_graph_store(current_store.as_ref())
823 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
824
825 self.external_read_store = Some(Arc::new(compact) as Arc<dyn GraphStore>);
826 self.external_write_store = None;
827 #[cfg(feature = "lpg")]
828 {
829 self.store = None;
830 }
831 self.read_only = true;
832 self.query_cache = Arc::new(QueryCache::default());
833 self.projections.write().clear();
836
837 Ok(())
838 }
839
840 #[cfg(all(feature = "wal", feature = "lpg"))]
846 fn apply_wal_records(
847 store: &Arc<LpgStore>,
848 catalog: &Catalog,
849 #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
850 records: &[WalRecord],
851 ) -> Result<()> {
852 use crate::catalog::{
853 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
854 };
855 use grafeo_common::utils::error::Error;
856
857 let mut current_graph: Option<String> = None;
860 let mut target_store: Arc<LpgStore> = Arc::clone(store);
861
862 for record in records {
863 match record {
864 WalRecord::CreateNamedGraph { name } => {
866 let _ = store.create_graph(name);
867 }
868 WalRecord::DropNamedGraph { name } => {
869 store.drop_graph(name);
870 if current_graph.as_deref() == Some(name.as_str()) {
872 current_graph = None;
873 target_store = Arc::clone(store);
874 }
875 }
876 WalRecord::SwitchGraph { name } => {
877 current_graph.clone_from(name);
878 target_store = match ¤t_graph {
879 None => Arc::clone(store),
880 Some(graph_name) => store
881 .graph_or_create(graph_name)
882 .map_err(|e| Error::Internal(e.to_string()))?,
883 };
884 }
885
886 WalRecord::CreateNode { id, labels } => {
888 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
889 target_store.create_node_with_id(*id, &label_refs)?;
890 }
891 WalRecord::DeleteNode { id } => {
892 target_store.delete_node(*id);
893 }
894 WalRecord::CreateEdge {
895 id,
896 src,
897 dst,
898 edge_type,
899 } => {
900 target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
901 }
902 WalRecord::DeleteEdge { id } => {
903 target_store.delete_edge(*id);
904 }
905 WalRecord::SetNodeProperty { id, key, value } => {
906 target_store.set_node_property(*id, key, value.clone());
907 }
908 WalRecord::SetEdgeProperty { id, key, value } => {
909 target_store.set_edge_property(*id, key, value.clone());
910 }
911 WalRecord::AddNodeLabel { id, label } => {
912 target_store.add_label(*id, label);
913 }
914 WalRecord::RemoveNodeLabel { id, label } => {
915 target_store.remove_label(*id, label);
916 }
917 WalRecord::RemoveNodeProperty { id, key } => {
918 target_store.remove_node_property(*id, key);
919 }
920 WalRecord::RemoveEdgeProperty { id, key } => {
921 target_store.remove_edge_property(*id, key);
922 }
923
924 WalRecord::CreateNodeType {
926 name,
927 properties,
928 constraints,
929 } => {
930 let def = NodeTypeDefinition {
931 name: name.clone(),
932 properties: properties
933 .iter()
934 .map(|(n, t, nullable)| TypedProperty {
935 name: n.clone(),
936 data_type: PropertyDataType::from_type_name(t),
937 nullable: *nullable,
938 default_value: None,
939 })
940 .collect(),
941 constraints: constraints
942 .iter()
943 .map(|(kind, props)| match kind.as_str() {
944 "unique" => TypeConstraint::Unique(props.clone()),
945 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
946 "not_null" if !props.is_empty() => {
947 TypeConstraint::NotNull(props[0].clone())
948 }
949 _ => TypeConstraint::Unique(props.clone()),
950 })
951 .collect(),
952 parent_types: Vec::new(),
953 };
954 let _ = catalog.register_node_type(def);
955 }
956 WalRecord::DropNodeType { name } => {
957 let _ = catalog.drop_node_type(name);
958 }
959 WalRecord::CreateEdgeType {
960 name,
961 properties,
962 constraints,
963 } => {
964 let def = EdgeTypeDefinition {
965 name: name.clone(),
966 properties: properties
967 .iter()
968 .map(|(n, t, nullable)| TypedProperty {
969 name: n.clone(),
970 data_type: PropertyDataType::from_type_name(t),
971 nullable: *nullable,
972 default_value: None,
973 })
974 .collect(),
975 constraints: constraints
976 .iter()
977 .map(|(kind, props)| match kind.as_str() {
978 "unique" => TypeConstraint::Unique(props.clone()),
979 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
980 "not_null" if !props.is_empty() => {
981 TypeConstraint::NotNull(props[0].clone())
982 }
983 _ => TypeConstraint::Unique(props.clone()),
984 })
985 .collect(),
986 source_node_types: Vec::new(),
987 target_node_types: Vec::new(),
988 };
989 let _ = catalog.register_edge_type_def(def);
990 }
991 WalRecord::DropEdgeType { name } => {
992 let _ = catalog.drop_edge_type_def(name);
993 }
994 WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
995 }
998 WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
999 }
1002 WalRecord::CreateGraphType {
1003 name,
1004 node_types,
1005 edge_types,
1006 open,
1007 } => {
1008 use crate::catalog::GraphTypeDefinition;
1009 let def = GraphTypeDefinition {
1010 name: name.clone(),
1011 allowed_node_types: node_types.clone(),
1012 allowed_edge_types: edge_types.clone(),
1013 open: *open,
1014 };
1015 let _ = catalog.register_graph_type(def);
1016 }
1017 WalRecord::DropGraphType { name } => {
1018 let _ = catalog.drop_graph_type(name);
1019 }
1020 WalRecord::CreateSchema { name } => {
1021 let _ = catalog.register_schema_namespace(name.clone());
1022 }
1023 WalRecord::DropSchema { name } => {
1024 let _ = catalog.drop_schema_namespace(name);
1025 }
1026
1027 WalRecord::AlterNodeType { name, alterations } => {
1028 for (action, prop_name, type_name, nullable) in alterations {
1029 match action.as_str() {
1030 "add" => {
1031 let prop = TypedProperty {
1032 name: prop_name.clone(),
1033 data_type: PropertyDataType::from_type_name(type_name),
1034 nullable: *nullable,
1035 default_value: None,
1036 };
1037 let _ = catalog.alter_node_type_add_property(name, prop);
1038 }
1039 "drop" => {
1040 let _ = catalog.alter_node_type_drop_property(name, prop_name);
1041 }
1042 _ => {}
1043 }
1044 }
1045 }
1046 WalRecord::AlterEdgeType { name, alterations } => {
1047 for (action, prop_name, type_name, nullable) in alterations {
1048 match action.as_str() {
1049 "add" => {
1050 let prop = TypedProperty {
1051 name: prop_name.clone(),
1052 data_type: PropertyDataType::from_type_name(type_name),
1053 nullable: *nullable,
1054 default_value: None,
1055 };
1056 let _ = catalog.alter_edge_type_add_property(name, prop);
1057 }
1058 "drop" => {
1059 let _ = catalog.alter_edge_type_drop_property(name, prop_name);
1060 }
1061 _ => {}
1062 }
1063 }
1064 }
1065 WalRecord::AlterGraphType { name, alterations } => {
1066 for (action, type_name) in alterations {
1067 match action.as_str() {
1068 "add_node" => {
1069 let _ =
1070 catalog.alter_graph_type_add_node_type(name, type_name.clone());
1071 }
1072 "drop_node" => {
1073 let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
1074 }
1075 "add_edge" => {
1076 let _ =
1077 catalog.alter_graph_type_add_edge_type(name, type_name.clone());
1078 }
1079 "drop_edge" => {
1080 let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
1081 }
1082 _ => {}
1083 }
1084 }
1085 }
1086
1087 WalRecord::CreateProcedure {
1088 name,
1089 params,
1090 returns,
1091 body,
1092 } => {
1093 use crate::catalog::ProcedureDefinition;
1094 let def = ProcedureDefinition {
1095 name: name.clone(),
1096 params: params.clone(),
1097 returns: returns.clone(),
1098 body: body.clone(),
1099 };
1100 let _ = catalog.register_procedure(def);
1101 }
1102 WalRecord::DropProcedure { name } => {
1103 let _ = catalog.drop_procedure(name);
1104 }
1105
1106 #[cfg(feature = "triple-store")]
1108 WalRecord::InsertRdfTriple { .. }
1109 | WalRecord::DeleteRdfTriple { .. }
1110 | WalRecord::ClearRdfGraph { .. }
1111 | WalRecord::CreateRdfGraph { .. }
1112 | WalRecord::DropRdfGraph { .. } => {
1113 rdf_ops::replay_rdf_wal_record(rdf_store, record);
1114 }
1115 #[cfg(not(feature = "triple-store"))]
1116 WalRecord::InsertRdfTriple { .. }
1117 | WalRecord::DeleteRdfTriple { .. }
1118 | WalRecord::ClearRdfGraph { .. }
1119 | WalRecord::CreateRdfGraph { .. }
1120 | WalRecord::DropRdfGraph { .. } => {}
1121
1122 WalRecord::TransactionCommit { .. } => {
1123 #[cfg(feature = "temporal")]
1127 {
1128 target_store.new_epoch();
1129 }
1130 }
1131 WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
1132 }
1135 WalRecord::EpochAdvance { .. } => {
1136 }
1139 }
1140 }
1141 Ok(())
1142 }
1143
1144 #[cfg(feature = "grafeo-file")]
1150 fn should_use_single_file(
1151 path: &std::path::Path,
1152 configured: crate::config::StorageFormat,
1153 ) -> bool {
1154 use crate::config::StorageFormat;
1155 match configured {
1156 StorageFormat::SingleFile => true,
1157 StorageFormat::WalDirectory => false,
1158 StorageFormat::Auto => {
1159 if path.is_file() {
1161 if let Ok(mut f) = std::fs::File::open(path) {
1162 use std::io::Read;
1163 let mut magic = [0u8; 4];
1164 if f.read_exact(&mut magic).is_ok() && magic == grafeo_storage::file::MAGIC
1165 {
1166 return true;
1167 }
1168 }
1169 return false;
1170 }
1171 if path.is_dir() {
1173 return false;
1174 }
1175 path.extension().is_some_and(|ext| ext == "grafeo")
1177 }
1178 }
1179 }
1180
1181 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1185 fn apply_snapshot_data(
1186 store: &Arc<LpgStore>,
1187 catalog: &Arc<crate::catalog::Catalog>,
1188 #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
1189 data: &[u8],
1190 ) -> Result<()> {
1191 persistence::load_snapshot_into_store(
1193 store,
1194 catalog,
1195 #[cfg(feature = "triple-store")]
1196 rdf_store,
1197 data,
1198 )
1199 }
1200
1201 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1205 fn load_from_sections(
1206 fm: &GrafeoFileManager,
1207 store: &Arc<LpgStore>,
1208 catalog: &Arc<crate::catalog::Catalog>,
1209 #[cfg(feature = "triple-store")] rdf_store: &Arc<RdfStore>,
1210 ) -> Result<()> {
1211 use grafeo_common::storage::{Section, SectionType};
1212
1213 let dir = fm.read_section_directory()?.ok_or_else(|| {
1214 grafeo_common::utils::error::Error::Internal(
1215 "expected v2 section directory but found none".to_string(),
1216 )
1217 })?;
1218
1219 if let Some(entry) = dir.find(SectionType::Catalog) {
1221 let data = fm.read_section_data(entry)?;
1222 let tm = Arc::new(crate::transaction::TransactionManager::new());
1223 let mut section = catalog_section::CatalogSection::new(
1224 Arc::clone(catalog),
1225 Arc::clone(store),
1226 move || tm.current_epoch().as_u64(),
1227 );
1228 section.deserialize(&data)?;
1229 }
1230
1231 if let Some(entry) = dir.find(SectionType::LpgStore) {
1233 let data = fm.read_section_data(entry)?;
1234 let mut section = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
1235 section.deserialize(&data)?;
1236 }
1237
1238 #[cfg(feature = "triple-store")]
1240 if let Some(entry) = dir.find(SectionType::RdfStore) {
1241 let data = fm.read_section_data(entry)?;
1242 let mut section = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(rdf_store));
1243 section.deserialize(&data)?;
1244 }
1245
1246 #[cfg(feature = "vector-index")]
1248 if let Some(entry) = dir.find(SectionType::VectorStore) {
1249 let data = fm.read_section_data(entry)?;
1250 let indexes = store.vector_index_entries();
1251 if !indexes.is_empty() {
1252 let mut section = grafeo_core::index::vector::VectorStoreSection::new(indexes);
1253 section.deserialize(&data)?;
1254 }
1255 }
1256
1257 #[cfg(feature = "text-index")]
1259 if let Some(entry) = dir.find(SectionType::TextIndex) {
1260 let data = fm.read_section_data(entry)?;
1261 let indexes = store.text_index_entries();
1262 if !indexes.is_empty() {
1263 let mut section = grafeo_core::index::text::TextIndexSection::new(indexes);
1264 section.deserialize(&data)?;
1265 }
1266 }
1267
1268 Ok(())
1269 }
1270
1271 #[must_use]
1299 pub fn session(&self) -> Session {
1300 self.create_session_inner(None)
1301 }
1302
1303 #[must_use]
1321 pub fn session_with_identity(&self, identity: crate::auth::Identity) -> Session {
1322 let force_read_only = !identity.can_write();
1323 self.create_session_inner_full(None, force_read_only, identity)
1324 }
1325
1326 #[must_use]
1340 pub fn session_with_role(&self, role: crate::auth::Role) -> Session {
1341 self.session_with_identity(crate::auth::Identity::new("anonymous", [role]))
1342 }
1343
1344 #[cfg(feature = "cdc")]
1363 #[must_use]
1364 pub fn session_with_cdc(&self, cdc_enabled: bool) -> Session {
1365 self.create_session_inner(Some(cdc_enabled))
1366 }
1367
1368 #[deprecated(
1377 since = "0.5.36",
1378 note = "use session_with_role(Role::ReadOnly) instead"
1379 )]
1380 #[must_use]
1381 pub fn session_read_only(&self) -> Session {
1382 self.session_with_role(crate::auth::Role::ReadOnly)
1383 }
1384
1385 #[allow(unused_variables)] fn create_session_inner(&self, cdc_override: Option<bool>) -> Session {
1391 self.create_session_inner_full(cdc_override, false, crate::auth::Identity::anonymous())
1392 }
1393
1394 #[allow(unused_variables)]
1396 fn create_session_inner_full(
1397 &self,
1398 cdc_override: Option<bool>,
1399 force_read_only: bool,
1400 identity: crate::auth::Identity,
1401 ) -> Session {
1402 let session_cfg = || crate::session::SessionConfig {
1403 transaction_manager: Arc::clone(&self.transaction_manager),
1404 query_cache: Arc::clone(&self.query_cache),
1405 catalog: Arc::clone(&self.catalog),
1406 adaptive_config: self.config.adaptive.clone(),
1407 factorized_execution: self.config.factorized_execution,
1408 graph_model: self.config.graph_model,
1409 query_timeout: self.config.query_timeout,
1410 commit_counter: Arc::clone(&self.commit_counter),
1411 gc_interval: self.config.gc_interval,
1412 read_only: self.read_only || force_read_only,
1413 identity: identity.clone(),
1414 #[cfg(feature = "lpg")]
1415 projections: Arc::clone(&self.projections),
1416 };
1417
1418 if let Some(ref ext_read) = self.external_read_store {
1419 return Session::with_external_store(
1420 Arc::clone(ext_read),
1421 self.external_write_store.as_ref().map(Arc::clone),
1422 session_cfg(),
1423 )
1424 .expect("arena allocation for external store session");
1425 }
1426
1427 #[cfg(all(feature = "lpg", feature = "triple-store"))]
1428 let mut session = Session::with_rdf_store_and_adaptive(
1429 Arc::clone(self.lpg_store()),
1430 Arc::clone(&self.rdf_store),
1431 session_cfg(),
1432 );
1433 #[cfg(all(feature = "lpg", not(feature = "triple-store")))]
1434 let mut session = Session::with_adaptive(Arc::clone(self.lpg_store()), session_cfg());
1435 #[cfg(not(feature = "lpg"))]
1436 let mut session =
1437 Session::with_external_store(self.graph_store(), self.graph_store_mut(), session_cfg())
1438 .expect("session creation for non-lpg build");
1439
1440 #[cfg(all(feature = "wal", feature = "lpg"))]
1441 if let Some(ref wal) = self.wal {
1442 session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
1443 }
1444
1445 #[cfg(feature = "cdc")]
1446 {
1447 let should_enable = cdc_override.unwrap_or_else(|| self.cdc_active());
1448 if should_enable {
1449 session.set_cdc_log(Arc::clone(&self.cdc_log));
1450 }
1451 }
1452
1453 #[cfg(feature = "metrics")]
1454 {
1455 if let Some(ref m) = self.metrics {
1456 session.set_metrics(Arc::clone(m));
1457 m.session_created
1458 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1459 m.session_active
1460 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1461 }
1462 }
1463
1464 if let Some(ref graph) = *self.current_graph.read() {
1466 session.use_graph(graph);
1467 }
1468
1469 if let Some(ref schema) = *self.current_schema.read() {
1471 session.set_schema(schema);
1472 }
1473
1474 let _ = &mut session;
1476
1477 session
1478 }
1479
1480 #[must_use]
1486 pub fn current_graph(&self) -> Option<String> {
1487 self.current_graph.read().clone()
1488 }
1489
1490 pub fn set_current_graph(&self, name: Option<&str>) -> Result<()> {
1499 #[cfg(feature = "lpg")]
1500 if let Some(name) = name
1501 && !name.eq_ignore_ascii_case("default")
1502 && let Some(store) = &self.store
1503 && store.graph(name).is_none()
1504 {
1505 return Err(Error::Query(QueryError::new(
1506 QueryErrorKind::Semantic,
1507 format!("Graph '{name}' does not exist"),
1508 )));
1509 }
1510 *self.current_graph.write() = name.map(ToString::to_string);
1511 Ok(())
1512 }
1513
1514 #[must_use]
1519 pub fn current_schema(&self) -> Option<String> {
1520 self.current_schema.read().clone()
1521 }
1522
1523 pub fn set_current_schema(&self, name: Option<&str>) -> Result<()> {
1532 if let Some(name) = name
1533 && !self.catalog.schema_exists(name)
1534 {
1535 return Err(Error::Query(QueryError::new(
1536 QueryErrorKind::Semantic,
1537 format!("Schema '{name}' does not exist"),
1538 )));
1539 }
1540 *self.current_schema.write() = name.map(ToString::to_string);
1541 Ok(())
1542 }
1543
/// Returns the `adaptive` section of this database's configuration.
#[must_use]
pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
    &self.config.adaptive
}
1549
/// Returns `true` if this database instance is read-only.
///
/// This reflects the database-level flag only; individual sessions can be
/// read-only even when this returns `false`.
#[must_use]
pub fn is_read_only(&self) -> bool {
    self.read_only
}
1555
/// Returns the configuration this database holds.
#[must_use]
pub fn config(&self) -> &Config {
    &self.config
}
1561
/// Returns the graph model recorded in this database's configuration.
#[must_use]
pub fn graph_model(&self) -> crate::config::GraphModel {
    self.config.graph_model
}
1567
/// Returns the configured memory limit, or `None` if no limit is set.
///
/// NOTE(review): the unit is presumably bytes - confirm against `Config`.
#[must_use]
pub fn memory_limit(&self) -> Option<usize> {
    self.config.memory_limit
}
1573
/// Returns a snapshot of the database metrics.
///
/// Starts from the metrics registry's snapshot (or a default snapshot when no
/// registry is attached) and then overwrites the cache counters with the
/// query cache's own statistics, summing the parsed and optimized figures.
#[cfg(feature = "metrics")]
#[must_use]
pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
    let mut snapshot = match self.metrics.as_ref() {
        Some(registry) => registry.snapshot(),
        None => crate::metrics::MetricsSnapshot::default(),
    };

    // The query cache is the source for the cache figures.
    let cache = self.query_cache.stats();
    snapshot.cache_hits = cache.parsed_hits + cache.optimized_hits;
    snapshot.cache_misses = cache.parsed_misses + cache.optimized_misses;
    snapshot.cache_size = cache.parsed_size + cache.optimized_size;
    snapshot.cache_invalidations = cache.invalidations;

    snapshot
}
1595
/// Renders the metrics registry through its `to_prometheus` method.
///
/// Returns an empty string when no metrics registry is attached.
#[cfg(feature = "metrics")]
#[must_use]
pub fn metrics_prometheus(&self) -> String {
    match self.metrics.as_ref() {
        Some(registry) => registry.to_prometheus(),
        None => String::new(),
    }
}
1606
/// Resets the metrics registry (when present) and the query cache statistics.
///
/// The cache statistics are reset even if no registry is attached.
#[cfg(feature = "metrics")]
pub fn reset_metrics(&self) {
    if let Some(registry) = self.metrics.as_ref() {
        registry.reset();
    }
    self.query_cache.reset_stats();
}
1615
/// Returns the built-in LPG store, as provided by `lpg_store()`.
#[cfg(feature = "lpg")]
#[must_use]
pub fn store(&self) -> &Arc<LpgStore> {
    self.lpg_store()
}
1628
/// Creates a named graph in the built-in LPG store.
///
/// Returns the boolean reported by the store's `create_graph`.
///
/// # Errors
///
/// Propagates any error returned by the store, converted into this crate's
/// [`Error`] type.
#[cfg(feature = "lpg")]
pub fn create_graph(&self, name: &str) -> Result<bool> {
    let created = self.lpg_store().create_graph(name)?;
    Ok(created)
}
1640
/// Drops a named graph from the built-in LPG store.
///
/// Returns `false` when there is no built-in store or the store reports that
/// nothing was dropped. When the dropped graph was the database-level current
/// graph (compared ASCII case-insensitively), that selection is cleared.
#[cfg(feature = "lpg")]
pub fn drop_graph(&self, name: &str) -> bool {
    let Some(store) = self.store.as_ref() else {
        return false;
    };
    if !store.drop_graph(name) {
        return false;
    }

    // Do not leave the database pointing at a graph that no longer exists.
    let mut active = self.current_graph.write();
    let points_at_dropped = active
        .as_deref()
        .is_some_and(|graph| graph.eq_ignore_ascii_case(name));
    if points_at_dropped {
        *active = None;
    }
    true
}
1662
/// Returns the graph names reported by the built-in LPG store.
#[cfg(feature = "lpg")]
#[must_use]
pub fn list_graphs(&self) -> Vec<String> {
    self.lpg_store().graph_names()
}
1669
1670 pub fn create_projection(
1691 &self,
1692 name: impl Into<String>,
1693 spec: grafeo_core::graph::ProjectionSpec,
1694 ) -> bool {
1695 use grafeo_core::graph::GraphProjection;
1696 use std::collections::hash_map::Entry;
1697
1698 let store = self.graph_store();
1699 let projection = Arc::new(GraphProjection::new(store, spec));
1700 let mut projections = self.projections.write();
1701 match projections.entry(name.into()) {
1702 Entry::Occupied(_) => false,
1703 Entry::Vacant(e) => {
1704 e.insert(projection);
1705 true
1706 }
1707 }
1708 }
1709
1710 pub fn drop_projection(&self, name: &str) -> bool {
1712 self.projections.write().remove(name).is_some()
1713 }
1714
1715 #[must_use]
1717 pub fn list_projections(&self) -> Vec<String> {
1718 self.projections.read().keys().cloned().collect()
1719 }
1720
1721 #[must_use]
1723 pub fn projection(&self, name: &str) -> Option<Arc<dyn GraphStore>> {
1724 self.projections
1725 .read()
1726 .get(name)
1727 .map(|p| Arc::clone(p) as Arc<dyn GraphStore>)
1728 }
1729
1730 #[must_use]
1739 pub fn graph_store(&self) -> Arc<dyn GraphStore> {
1740 if let Some(ref ext_read) = self.external_read_store {
1741 Arc::clone(ext_read)
1742 } else {
1743 #[cfg(feature = "lpg")]
1744 {
1745 Arc::clone(self.lpg_store()) as Arc<dyn GraphStore>
1746 }
1747 #[cfg(not(feature = "lpg"))]
1748 unreachable!("no graph store available: enable the `lpg` feature or use with_store()")
1749 }
1750 }
1751
1752 #[must_use]
1757 pub fn graph_store_mut(&self) -> Option<Arc<dyn GraphStoreMut>> {
1758 if self.external_read_store.is_some() {
1759 self.external_write_store.as_ref().map(Arc::clone)
1760 } else {
1761 #[cfg(feature = "lpg")]
1762 {
1763 Some(Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreMut>)
1764 }
1765 #[cfg(not(feature = "lpg"))]
1766 {
1767 None
1768 }
1769 }
1770 }
1771
1772 pub fn gc(&self) {
1779 #[cfg(feature = "lpg")]
1780 let current_epoch = {
1781 let min_epoch = self.transaction_manager.min_active_epoch();
1782 self.lpg_store().gc_versions(min_epoch);
1783 self.transaction_manager.current_epoch()
1784 };
1785 self.transaction_manager.gc();
1786
1787 #[cfg(feature = "cdc")]
1789 if self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed) {
1790 #[cfg(feature = "lpg")]
1791 self.cdc_log.apply_retention(current_epoch);
1792 }
1793 }
1794
/// Returns the buffer manager owned by this database.
#[must_use]
pub fn buffer_manager(&self) -> &Arc<BufferManager> {
    &self.buffer_manager
}
1800
/// Returns the query cache shared by this database's sessions.
#[must_use]
pub fn query_cache(&self) -> &Arc<QueryCache> {
    &self.query_cache
}
1806
/// Clears the query cache by calling its `clear` method.
pub fn clear_plan_cache(&self) {
    self.query_cache.clear();
}
1815
1816 pub fn close(&self) -> Result<()> {
1830 let mut is_open = self.is_open.write();
1831 if !*is_open {
1832 return Ok(());
1833 }
1834
1835 #[cfg(all(feature = "grafeo-file", feature = "lpg"))]
1840 if let Some(mut timer) = self.checkpoint_timer.lock().take() {
1841 timer.stop();
1842 }
1843
1844 if self.read_only {
1846 #[cfg(feature = "grafeo-file")]
1847 if let Some(ref fm) = self.file_manager {
1848 fm.close()?;
1849 }
1850 *is_open = false;
1851 return Ok(());
1852 }
1853
1854 #[cfg(feature = "grafeo-file")]
1858 let is_single_file = self.file_manager.is_some();
1859 #[cfg(not(feature = "grafeo-file"))]
1860 let is_single_file = false;
1861
1862 #[cfg(feature = "grafeo-file")]
1863 if let Some(ref fm) = self.file_manager {
1864 #[cfg(feature = "wal")]
1866 if let Some(ref wal) = self.wal {
1867 wal.sync()?;
1868 }
1869 let flush_result = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;
1870
1871 #[cfg(feature = "wal")]
1877 let flush_result = if flush_result.sections_written == 0 {
1878 if let Some(ref wal) = self.wal {
1879 if wal.record_count() > 0 {
1880 grafeo_warn!(
1881 "WAL has {} records but checkpoint wrote 0 sections; retrying with forced flush",
1882 wal.record_count()
1883 );
1884 self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?
1885 } else {
1886 flush_result
1887 }
1888 } else {
1889 flush_result
1890 }
1891 } else {
1892 flush_result
1893 };
1894
1895 #[cfg(feature = "wal")]
1898 if let Some(ref wal) = self.wal {
1899 wal.close_active_log();
1900 }
1901
1902 #[cfg(feature = "wal")]
1906 let has_wal_records = self.wal.as_ref().is_some_and(|wal| wal.record_count() > 0);
1907 #[cfg(not(feature = "wal"))]
1908 let has_wal_records = false;
1909
1910 if flush_result.sections_written > 0 || !has_wal_records {
1911 {
1912 use grafeo_common::testing::crash::maybe_crash;
1913 maybe_crash("close:before_remove_sidecar_wal");
1914 }
1915 fm.remove_sidecar_wal()?;
1916 } else {
1917 grafeo_warn!(
1918 "keeping sidecar WAL for recovery: checkpoint wrote 0 sections but WAL has records"
1919 );
1920 }
1921 fm.close()?;
1922 }
1923
1924 #[cfg(feature = "wal")]
1930 if !is_single_file && let Some(ref wal) = self.wal {
1931 let commit_tx = self
1933 .transaction_manager
1934 .last_assigned_transaction_id()
1935 .unwrap_or_else(|| self.transaction_manager.begin());
1936
1937 wal.log(&WalRecord::TransactionCommit {
1939 transaction_id: commit_tx,
1940 })?;
1941
1942 wal.sync()?;
1943 }
1944
1945 *is_open = false;
1946 Ok(())
1947 }
1948
/// Returns the write-ahead log handle, or `None` if this database has none.
#[cfg(feature = "wal")]
#[must_use]
pub fn wal(&self) -> Option<&Arc<LpgWal>> {
    self.wal.as_ref()
}
1955
/// Appends `record` to the WAL when one is attached; otherwise does nothing.
///
/// # Errors
///
/// Propagates the error returned by the WAL's `log` call.
#[cfg(feature = "wal")]
pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
    match self.wal.as_ref() {
        Some(wal) => {
            wal.log(record)?;
            Ok(())
        }
        None => Ok(()),
    }
}
1964
1965 fn register_section_consumers(&mut self) {
1970 #[cfg(feature = "lpg")]
1971 let store_ref = self.store.as_ref();
1972 #[cfg(not(feature = "lpg"))]
1973 #[cfg(feature = "lpg")]
1975 if let Some(store) = store_ref {
1976 let lpg = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));
1977 self.buffer_manager.register_consumer(Arc::new(
1978 section_consumer::SectionConsumer::new(Arc::new(lpg)),
1979 ));
1980 }
1981
1982 #[cfg(feature = "triple-store")]
1984 if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
1985 let rdf = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
1986 self.buffer_manager.register_consumer(Arc::new(
1987 section_consumer::SectionConsumer::new(Arc::new(rdf)),
1988 ));
1989 }
1990
1991 #[cfg(all(
1994 feature = "lpg",
1995 feature = "vector-index",
1996 feature = "mmap",
1997 not(feature = "temporal")
1998 ))]
1999 if let Some(store) = store_ref {
2000 let spill_path = self.buffer_manager.config().spill_path.clone();
2001 let consumer = Arc::new(section_consumer::VectorIndexConsumer::new(
2002 store, spill_path,
2003 ));
2004 self.vector_spill_storages = Some(Arc::clone(consumer.spilled_storages()));
2006 self.buffer_manager.register_consumer(consumer);
2007 }
2008
2009 #[cfg(all(feature = "lpg", feature = "text-index"))]
2011 if let Some(store) = store_ref {
2012 self.buffer_manager
2013 .register_consumer(Arc::new(section_consumer::TextIndexConsumer::new(store)));
2014 }
2015
2016 #[cfg(feature = "cdc")]
2019 self.buffer_manager.register_consumer(
2020 Arc::clone(&self.cdc_log) as Arc<dyn grafeo_common::memory::MemoryConsumer>
2021 );
2022 }
2023
/// Re-attaches vector spill files left in the buffer manager's spill
/// directory.
///
/// Files named `vectors_<key>.bin` are considered, where `<key>` is decoded
/// by turning `%3A` into `:` and then `%25` into `%`. For each such file:
///
/// * keys without a `:` are skipped;
/// * if the store has no vector index for the key, the file is deleted;
/// * otherwise the file is opened as [`MmapStorage`], the property named by
///   the second `:`-separated key component is marked as spilled on the
///   store, and the storage is recorded in the spill map.
///
/// Files that fail to open are logged with `grafeo_warn!` and deleted.
/// The function returns silently when there is no spill path, the directory
/// does not exist or cannot be read, no spill map is attached, or there is no
/// built-in store. File deletion is best-effort; its errors are ignored.
#[cfg(all(
    feature = "lpg",
    feature = "vector-index",
    feature = "mmap",
    not(feature = "temporal")
))]
fn restore_spill_files(&mut self) {
    use grafeo_core::index::vector::MmapStorage;

    let Some(spill_dir) = self.buffer_manager.config().spill_path.clone() else {
        return;
    };

    if !spill_dir.exists() {
        return;
    }

    let Some(spill_map) = self.vector_spill_storages.as_ref().map(Arc::clone) else {
        return;
    };

    let Ok(entries) = std::fs::read_dir(&spill_dir) else {
        return;
    };

    let Some(ref store) = self.store else {
        return;
    };

    for entry in entries.flatten() {
        let path = entry.path();
        // Borrow the name from `path`; non-UTF-8 names cannot be spill files.
        let Some(file_name) = path.file_name().and_then(|n| n.to_str()) else {
            continue;
        };

        if !file_name.starts_with("vectors_")
            || !std::path::Path::new(file_name)
                .extension()
                .is_some_and(|ext| ext.eq_ignore_ascii_case("bin"))
        {
            continue;
        }

        // Safe to slice: the ASCII prefix and the 4-byte ".bin" suffix were
        // both verified above.
        let key_part = &file_name["vectors_".len()..file_name.len() - ".bin".len()];

        // Decode `%3A` before `%25` so an escaped percent sign is not
        // mistaken for the start of an escaped colon.
        let key = key_part.replace("%3A", ":").replace("%25", "%");

        if !key.contains(':') {
            continue;
        }

        if store.get_vector_index_by_key(&key).is_none() {
            // No matching index: delete the file (best-effort).
            let _ = std::fs::remove_file(&path);
            continue;
        }

        match MmapStorage::open(&path) {
            Ok(mmap_storage) => {
                let property = key.split(':').nth(1).unwrap_or("");
                let prop_key = grafeo_common::types::PropertyKey::new(property);
                store.node_properties_mark_spilled(&prop_key);

                spill_map.write().insert(key, Arc::new(mmap_storage));
            }
            Err(e) => {
                // Use the crate's logging macro, like the other warnings in
                // this file, instead of writing straight to stderr.
                grafeo_warn!("failed to restore spill file {}: {}", path.display(), e);
                let _ = std::fs::remove_file(&path);
            }
        }
    }
}
2114
/// Collects the storage sections that make up a checkpoint.
///
/// With the `lpg` feature and a built-in store, the catalog section is pushed
/// first, then the LPG store section, followed by the vector and text index
/// sections when the corresponding feature is enabled and the store has such
/// indexes. With `triple-store`, the RDF section is appended when the RDF
/// store is non-empty or has at least one graph.
#[cfg(feature = "grafeo-file")]
fn build_sections(&self) -> Vec<Box<dyn grafeo_common::storage::Section>> {
    let mut out: Vec<Box<dyn grafeo_common::storage::Section>> = Vec::new();

    #[cfg(feature = "lpg")]
    if let Some(store) = self.store.as_ref() {
        let lpg_part = grafeo_core::graph::lpg::LpgStoreSection::new(Arc::clone(store));

        // The catalog section reads the current epoch through this closure.
        let epoch_source = {
            let tm = Arc::clone(&self.transaction_manager);
            move || tm.current_epoch().as_u64()
        };
        let catalog_part = catalog_section::CatalogSection::new(
            Arc::clone(&self.catalog),
            Arc::clone(store),
            epoch_source,
        );

        out.push(Box::new(catalog_part));
        out.push(Box::new(lpg_part));

        #[cfg(feature = "vector-index")]
        {
            let vector_indexes = store.vector_index_entries();
            if !vector_indexes.is_empty() {
                out.push(Box::new(
                    grafeo_core::index::vector::VectorStoreSection::new(vector_indexes),
                ));
            }
        }

        #[cfg(feature = "text-index")]
        {
            let text_indexes = store.text_index_entries();
            if !text_indexes.is_empty() {
                out.push(Box::new(
                    grafeo_core::index::text::TextIndexSection::new(text_indexes),
                ));
            }
        }
    }

    #[cfg(feature = "triple-store")]
    if !self.rdf_store.is_empty() || self.rdf_store.graph_count() > 0 {
        let rdf_part = grafeo_core::graph::rdf::RdfStoreSection::new(Arc::clone(&self.rdf_store));
        out.push(Box::new(rdf_part));
    }

    out
}
2166
/// Creates a full backup of this database in `backup_dir`.
///
/// The database is checkpointed to its file first; the backup is then taken
/// from that file together with the WAL (if any) and the current epoch.
///
/// # Errors
///
/// Returns `Error::Internal` when the database has no file manager, and
/// propagates checkpoint and backup errors.
#[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
pub fn backup_full(&self, backup_dir: &std::path::Path) -> Result<backup::BackupSegment> {
    let Some(fm) = self.file_manager.as_ref() else {
        return Err(Error::Internal(
            "backup requires a persistent database".to_string(),
        ));
    };

    // The flush result itself is not needed; only failure matters here.
    let _ = self.checkpoint_to_file(fm, flush::FlushReason::Explicit)?;

    let epoch = self.transaction_manager.current_epoch();
    let wal = self.wal.as_deref();
    backup::do_backup_full(backup_dir, fm.path(), wal, epoch)
}
2193
/// Creates an incremental backup in `backup_dir` from the WAL.
///
/// # Errors
///
/// Returns `Error::Internal` when the database has no WAL, and propagates
/// errors from `backup::do_backup_incremental`.
#[cfg(all(feature = "wal", feature = "grafeo-file", feature = "lpg"))]
pub fn backup_incremental(
    &self,
    backup_dir: &std::path::Path,
) -> Result<backup::BackupSegment> {
    let Some(wal) = self.wal.as_ref() else {
        return Err(Error::Internal(
            "incremental backup requires WAL".to_string(),
        ));
    };

    let epoch = self.transaction_manager.current_epoch();
    backup::do_backup_incremental(backup_dir, wal, epoch)
}
2214
/// Reads the backup manifest stored in `backup_dir`.
///
/// Thin wrapper around `backup::read_manifest`; the `Option` and any error
/// are returned exactly as that function produces them.
#[cfg(all(feature = "wal", feature = "grafeo-file"))]
pub fn read_backup_manifest(
    backup_dir: &std::path::Path,
) -> Result<Option<backup::BackupManifest>> {
    backup::read_manifest(backup_dir)
}
2226
/// Returns the backup cursor stored in the WAL directory, if any.
///
/// Yields `None` when there is no WAL, when reading the cursor fails, or when
/// no cursor has been recorded. Read errors are deliberately collapsed into
/// `None`.
#[cfg(all(feature = "wal", feature = "grafeo-file"))]
#[must_use]
pub fn backup_cursor(&self) -> Option<backup::BackupCursor> {
    let wal = self.wal.as_ref()?;
    backup::read_backup_cursor(wal.dir()).ok().flatten()
}
2235
/// Restores a database from `backup_dir` up to `target_epoch`, writing the
/// result to `output_path`.
///
/// Thin wrapper around `backup::do_restore_to_epoch`; its result is returned
/// unchanged.
#[cfg(all(feature = "wal", feature = "grafeo-file"))]
pub fn restore_to_epoch(
    backup_dir: &std::path::Path,
    target_epoch: grafeo_common::types::EpochId,
    output_path: &std::path::Path,
) -> Result<()> {
    backup::do_restore_to_epoch(backup_dir, target_epoch, output_path)
}
2254
/// Writes the current database state to `fm` as a checkpoint.
///
/// Builds the section list with `build_sections`, derives the flush context
/// (from the LPG store and transaction manager with `lpg`, from the
/// transaction manager alone otherwise) and hands everything to
/// `flush::flush`. With the `wal` feature the WAL, if any, is passed along.
///
/// # Errors
///
/// Propagates any error returned by `flush::flush`.
#[cfg(feature = "grafeo-file")]
fn checkpoint_to_file(
    &self,
    fm: &GrafeoFileManager,
    reason: flush::FlushReason,
) -> Result<flush::FlushResult> {
    let sections = self.build_sections();
    // `flush` takes borrowed trait objects, so keep `sections` alive and
    // pass a vector of references into it.
    let section_refs: Vec<&dyn grafeo_common::storage::Section> =
        sections.iter().map(|s| s.as_ref()).collect();
    #[cfg(feature = "lpg")]
    let context = flush::build_context(self.lpg_store(), &self.transaction_manager);
    #[cfg(not(feature = "lpg"))]
    let context = flush::build_context_minimal(&self.transaction_manager);

    flush::flush(
        fm,
        &section_refs,
        &context,
        reason,
        #[cfg(feature = "wal")]
        self.wal.as_deref(),
    )
}
2283
/// Returns the file manager, or `None` if this database has none.
#[cfg(feature = "grafeo-file")]
#[must_use]
pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
    self.file_manager.as_ref()
}
2290}
2291
2292impl Drop for GrafeoDB {
2293 fn drop(&mut self) {
2294 if let Err(e) = self.close() {
2295 grafeo_error!("Error closing database: {}", e);
2296 }
2297 }
2298}
2299
/// Exposes the database's administrative operations through the
/// [`crate::admin::AdminService`] trait.
///
/// Every method forwards to the method of the same name on `GrafeoDB`. The
/// calls are written as explicit `GrafeoDB::method(self)` paths so they do
/// not read as self-recursion.
/// NOTE(review): this relies on inherent methods with these names existing
/// elsewhere on `GrafeoDB` - confirm.
#[cfg(feature = "lpg")]
impl crate::admin::AdminService for GrafeoDB {
    fn info(&self) -> crate::admin::DatabaseInfo {
        GrafeoDB::info(self)
    }

    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
        GrafeoDB::detailed_stats(self)
    }

    fn schema(&self) -> crate::admin::SchemaInfo {
        GrafeoDB::schema(self)
    }

    fn validate(&self) -> crate::admin::ValidationResult {
        GrafeoDB::validate(self)
    }

    fn wal_status(&self) -> crate::admin::WalStatus {
        GrafeoDB::wal_status(self)
    }

    fn wal_checkpoint(&self) -> Result<()> {
        GrafeoDB::wal_checkpoint(self)
    }
}
2326
/// The outcome of a query: column metadata, rows, and optional execution
/// details.
#[derive(Debug)]
pub struct QueryResult {
    /// Column names, in result order.
    pub columns: Vec<String>,
    /// Logical type of each column. Constructors that take no explicit types
    /// fill this with `LogicalType::Any`, one entry per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// Result rows. Crate-private; read through `rows()`, `iter()` or
    /// `into_rows()`.
    pub(crate) rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Execution time in milliseconds, when recorded (see `with_metrics`).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned, when recorded (see `with_metrics`).
    pub rows_scanned: Option<u64>,
    /// Optional status message; set by the `status` constructor.
    pub status_message: Option<String>,
    /// GQL status of the result; the constructors in this file set it to
    /// `GqlStatus::SUCCESS`.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
2376
2377impl QueryResult {
2378 #[must_use]
2380 pub fn empty() -> Self {
2381 Self {
2382 columns: Vec::new(),
2383 column_types: Vec::new(),
2384 rows: Vec::new(),
2385 execution_time_ms: None,
2386 rows_scanned: None,
2387 status_message: None,
2388 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2389 }
2390 }
2391
2392 #[must_use]
2394 pub fn status(msg: impl Into<String>) -> Self {
2395 Self {
2396 columns: Vec::new(),
2397 column_types: Vec::new(),
2398 rows: Vec::new(),
2399 execution_time_ms: None,
2400 rows_scanned: None,
2401 status_message: Some(msg.into()),
2402 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2403 }
2404 }
2405
2406 #[must_use]
2408 pub fn new(columns: Vec<String>) -> Self {
2409 let len = columns.len();
2410 Self {
2411 columns,
2412 column_types: vec![grafeo_common::types::LogicalType::Any; len],
2413 rows: Vec::new(),
2414 execution_time_ms: None,
2415 rows_scanned: None,
2416 status_message: None,
2417 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2418 }
2419 }
2420
2421 #[must_use]
2423 pub fn with_types(
2424 columns: Vec<String>,
2425 column_types: Vec<grafeo_common::types::LogicalType>,
2426 ) -> Self {
2427 Self {
2428 columns,
2429 column_types,
2430 rows: Vec::new(),
2431 execution_time_ms: None,
2432 rows_scanned: None,
2433 status_message: None,
2434 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2435 }
2436 }
2437
2438 #[must_use]
2440 pub fn from_rows(columns: Vec<String>, rows: Vec<Vec<grafeo_common::types::Value>>) -> Self {
2441 let len = columns.len();
2442 Self {
2443 columns,
2444 column_types: vec![grafeo_common::types::LogicalType::Any; len],
2445 rows,
2446 execution_time_ms: None,
2447 rows_scanned: None,
2448 status_message: None,
2449 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2450 }
2451 }
2452
2453 pub fn push_row(&mut self, row: Vec<grafeo_common::types::Value>) {
2455 self.rows.push(row);
2456 }
2457
2458 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
2460 self.execution_time_ms = Some(execution_time_ms);
2461 self.rows_scanned = Some(rows_scanned);
2462 self
2463 }
2464
2465 #[must_use]
2467 pub fn execution_time_ms(&self) -> Option<f64> {
2468 self.execution_time_ms
2469 }
2470
2471 #[must_use]
2473 pub fn rows_scanned(&self) -> Option<u64> {
2474 self.rows_scanned
2475 }
2476
2477 #[must_use]
2479 pub fn row_count(&self) -> usize {
2480 self.rows.len()
2481 }
2482
2483 #[must_use]
2485 pub fn column_count(&self) -> usize {
2486 self.columns.len()
2487 }
2488
2489 #[must_use]
2491 pub fn is_empty(&self) -> bool {
2492 self.rows.is_empty()
2493 }
2494
2495 pub fn scalar<T: FromValue>(&self) -> Result<T> {
2504 if self.rows.len() != 1 || self.columns.len() != 1 {
2505 return Err(grafeo_common::utils::error::Error::InvalidValue(
2506 "Expected single value".to_string(),
2507 ));
2508 }
2509 T::from_value(&self.rows[0][0])
2510 }
2511
2512 #[must_use]
2514 pub fn rows(&self) -> &[Vec<grafeo_common::types::Value>] {
2515 &self.rows
2516 }
2517
2518 #[must_use]
2520 pub fn into_rows(self) -> Vec<Vec<grafeo_common::types::Value>> {
2521 self.rows
2522 }
2523
2524 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
2526 self.rows.iter()
2527 }
2528
2529 #[cfg(feature = "arrow-export")]
2544 pub fn to_record_batch(
2545 &self,
2546 ) -> std::result::Result<arrow_array::RecordBatch, arrow::ArrowExportError> {
2547 arrow::query_result_to_record_batch(&self.columns, &self.column_types, &self.rows)
2548 }
2549
2550 #[cfg(feature = "arrow-export")]
2561 pub fn to_arrow_ipc(&self) -> std::result::Result<Vec<u8>, arrow::ArrowExportError> {
2562 let batch = self.to_record_batch()?;
2563 arrow::record_batch_to_ipc_stream(&batch)
2564 }
2565}
2566
2567impl std::fmt::Display for QueryResult {
2568 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2569 let table = grafeo_common::fmt::format_result_table(
2570 &self.columns,
2571 &self.rows,
2572 self.execution_time_ms,
2573 self.status_message.as_deref(),
2574 );
2575 f.write_str(&table)
2576 }
2577}
2578
/// Conversion from a database [`grafeo_common::types::Value`] into a Rust
/// type.
///
/// Used by `QueryResult::scalar` to extract a typed value.
pub trait FromValue: Sized {
    /// Converts `value` into `Self`.
    ///
    /// # Errors
    ///
    /// Implementations return an error when `value` cannot be converted; the
    /// implementations in this file use `Error::TypeMismatch`.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
2591
2592impl FromValue for i64 {
2593 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2594 value
2595 .as_int64()
2596 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2597 expected: "INT64".to_string(),
2598 found: value.type_name().to_string(),
2599 })
2600 }
2601}
2602
2603impl FromValue for f64 {
2604 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2605 value
2606 .as_float64()
2607 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2608 expected: "FLOAT64".to_string(),
2609 found: value.type_name().to_string(),
2610 })
2611 }
2612}
2613
2614impl FromValue for String {
2615 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2616 value.as_str().map(String::from).ok_or_else(|| {
2617 grafeo_common::utils::error::Error::TypeMismatch {
2618 expected: "STRING".to_string(),
2619 found: value.type_name().to_string(),
2620 }
2621 })
2622 }
2623}
2624
2625impl FromValue for bool {
2626 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2627 value
2628 .as_bool()
2629 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2630 expected: "BOOL".to_string(),
2631 found: value.type_name().to_string(),
2632 })
2633 }
2634}
2635
2636#[cfg(test)]
2637mod tests {
2638 use super::*;
2639
2640 #[test]
2641 fn test_create_in_memory_database() {
2642 let db = GrafeoDB::new_in_memory();
2643 assert_eq!(db.node_count(), 0);
2644 assert_eq!(db.edge_count(), 0);
2645 }
2646
2647 #[test]
2648 fn test_database_config() {
2649 let config = Config::in_memory().with_threads(4).with_query_logging();
2650
2651 let db = GrafeoDB::with_config(config).unwrap();
2652 assert_eq!(db.config().threads, 4);
2653 assert!(db.config().query_logging);
2654 }
2655
2656 #[test]
2657 fn test_database_session() {
2658 let db = GrafeoDB::new_in_memory();
2659 let _session = db.session();
2660 }
2662
2663 #[cfg(feature = "wal")]
2664 #[test]
2665 fn test_persistent_database_recovery() {
2666 use grafeo_common::types::Value;
2667 use tempfile::tempdir;
2668
2669 let dir = tempdir().unwrap();
2670 let db_path = dir.path().join("test_db");
2671
2672 {
2674 let db = GrafeoDB::open(&db_path).unwrap();
2675
2676 let alix = db.create_node(&["Person"]);
2677 db.set_node_property(alix, "name", Value::from("Alix"));
2678
2679 let gus = db.create_node(&["Person"]);
2680 db.set_node_property(gus, "name", Value::from("Gus"));
2681
2682 let _edge = db.create_edge(alix, gus, "KNOWS");
2683
2684 db.close().unwrap();
2686 }
2687
2688 {
2690 let db = GrafeoDB::open(&db_path).unwrap();
2691
2692 assert_eq!(db.node_count(), 2);
2693 assert_eq!(db.edge_count(), 1);
2694
2695 let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
2697 assert!(node0.is_some());
2698
2699 let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
2700 assert!(node1.is_some());
2701 }
2702 }
2703
2704 #[cfg(feature = "wal")]
2705 #[test]
2706 fn test_wal_logging() {
2707 use tempfile::tempdir;
2708
2709 let dir = tempdir().unwrap();
2710 let db_path = dir.path().join("wal_test_db");
2711
2712 let db = GrafeoDB::open(&db_path).unwrap();
2713
2714 let node = db.create_node(&["Test"]);
2716 db.delete_node(node);
2717
2718 if let Some(wal) = db.wal() {
2720 assert!(wal.record_count() > 0);
2721 }
2722
2723 db.close().unwrap();
2724 }
2725
2726 #[cfg(feature = "wal")]
2727 #[test]
2728 fn test_wal_recovery_multiple_sessions() {
2729 use grafeo_common::types::Value;
2731 use tempfile::tempdir;
2732
2733 let dir = tempdir().unwrap();
2734 let db_path = dir.path().join("multi_session_db");
2735
2736 {
2738 let db = GrafeoDB::open(&db_path).unwrap();
2739 let alix = db.create_node(&["Person"]);
2740 db.set_node_property(alix, "name", Value::from("Alix"));
2741 db.close().unwrap();
2742 }
2743
2744 {
2746 let db = GrafeoDB::open(&db_path).unwrap();
2747 assert_eq!(db.node_count(), 1); let gus = db.create_node(&["Person"]);
2749 db.set_node_property(gus, "name", Value::from("Gus"));
2750 db.close().unwrap();
2751 }
2752
2753 {
2755 let db = GrafeoDB::open(&db_path).unwrap();
2756 assert_eq!(db.node_count(), 2);
2757
2758 let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
2760 assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
2761
2762 let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
2763 assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
2764 }
2765 }
2766
2767 #[cfg(feature = "wal")]
2768 #[test]
2769 fn test_database_consistency_after_mutations() {
2770 use grafeo_common::types::Value;
2772 use tempfile::tempdir;
2773
2774 let dir = tempdir().unwrap();
2775 let db_path = dir.path().join("consistency_db");
2776
2777 {
2778 let db = GrafeoDB::open(&db_path).unwrap();
2779
2780 let a = db.create_node(&["Node"]);
2782 let b = db.create_node(&["Node"]);
2783 let c = db.create_node(&["Node"]);
2784
2785 let e1 = db.create_edge(a, b, "LINKS");
2787 let _e2 = db.create_edge(b, c, "LINKS");
2788
2789 db.delete_edge(e1);
2791 db.delete_node(b);
2792
2793 db.set_node_property(a, "value", Value::Int64(1));
2795 db.set_node_property(c, "value", Value::Int64(3));
2796
2797 db.close().unwrap();
2798 }
2799
2800 {
2802 let db = GrafeoDB::open(&db_path).unwrap();
2803
2804 let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
2808 assert!(node_a.is_some());
2809
2810 let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
2811 assert!(node_c.is_some());
2812
2813 let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
2815 assert!(node_b.is_none());
2816 }
2817 }
2818
2819 #[cfg(feature = "wal")]
2820 #[test]
2821 fn test_close_is_idempotent() {
2822 use tempfile::tempdir;
2824
2825 let dir = tempdir().unwrap();
2826 let db_path = dir.path().join("close_test_db");
2827
2828 let db = GrafeoDB::open(&db_path).unwrap();
2829 db.create_node(&["Test"]);
2830
2831 assert!(db.close().is_ok());
2833
2834 assert!(db.close().is_ok());
2836 }
2837
2838 #[test]
2839 fn test_with_store_external_backend() {
2840 use grafeo_core::graph::lpg::LpgStore;
2841
2842 let external = Arc::new(LpgStore::new().unwrap());
2843
2844 let n1 = external.create_node(&["Person"]);
2846 external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
2847
2848 let db = GrafeoDB::with_store(
2849 Arc::clone(&external) as Arc<dyn GraphStoreMut>,
2850 Config::in_memory(),
2851 )
2852 .unwrap();
2853
2854 let session = db.session();
2855
2856 #[cfg(feature = "gql")]
2858 {
2859 let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
2860 assert_eq!(result.rows.len(), 1);
2861 }
2862 }
2863
2864 #[test]
2865 fn test_with_config_custom_memory_limit() {
2866 let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); let db = GrafeoDB::with_config(config).unwrap();
2869 assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
2870 assert_eq!(db.node_count(), 0);
2871 }
2872
2873 #[cfg(feature = "metrics")]
2874 #[test]
2875 fn test_database_metrics_registry() {
2876 let db = GrafeoDB::new_in_memory();
2877
2878 db.create_node(&["Person"]);
2880 db.create_node(&["Person"]);
2881
2882 let snap = db.metrics();
2884 assert_eq!(snap.query_count, 0); }
2887
2888 #[test]
2889 fn test_query_result_has_metrics() {
2890 let db = GrafeoDB::new_in_memory();
2892 db.create_node(&["Person"]);
2893 db.create_node(&["Person"]);
2894
2895 #[cfg(feature = "gql")]
2896 {
2897 let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
2898
2899 assert!(result.execution_time_ms.is_some());
2901 assert!(result.rows_scanned.is_some());
2902 assert!(result.execution_time_ms.unwrap() >= 0.0);
2903 assert_eq!(result.rows_scanned.unwrap(), 2);
2904 }
2905 }
2906
2907 #[test]
2908 fn test_empty_query_result_metrics() {
2909 let db = GrafeoDB::new_in_memory();
2911 db.create_node(&["Person"]);
2912
2913 #[cfg(feature = "gql")]
2914 {
2915 let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
2917
2918 assert!(result.execution_time_ms.is_some());
2919 assert!(result.rows_scanned.is_some());
2920 assert_eq!(result.rows_scanned.unwrap(), 0);
2921 }
2922 }
2923
/// CDC (change data capture) integration tests: per-entity history,
/// epoch-range queries, and the config / session / runtime switches that turn
/// recording on and off.
#[cfg(feature = "cdc")]
mod cdc_integration {
    use super::*;
    use crate::cdc::ChangeKind;
    use grafeo_common::types::{EpochId, Value};

    /// Opens an in-memory database with CDC enabled through its config.
    fn cdc_db() -> GrafeoDB {
        GrafeoDB::with_config(Config::in_memory().with_cdc()).unwrap()
    }

    /// Create, two property writes and a delete yield four history events.
    /// The first write has no `before` value, the second one does.
    #[test]
    fn test_node_lifecycle_history() {
        let db = cdc_db();

        let id = db.create_node(&["Person"]);
        db.set_node_property(id, "name", "Alix".into());
        db.set_node_property(id, "name", "Gus".into());
        db.delete_node(id);

        let history = db.history(id).unwrap();
        assert_eq!(history.len(), 4);
        assert_eq!(history[0].kind, ChangeKind::Create);
        assert_eq!(history[1].kind, ChangeKind::Update);
        assert!(history[1].before.is_none());
        assert_eq!(history[2].kind, ChangeKind::Update);
        assert!(history[2].before.is_some());
        assert_eq!(history[3].kind, ChangeKind::Delete);
    }

    /// Create, one property write and a delete yield three edge events.
    #[test]
    fn test_edge_lifecycle_history() {
        let db = cdc_db();

        let alix = db.create_node(&["Person"]);
        let gus = db.create_node(&["Person"]);
        let edge = db.create_edge(alix, gus, "KNOWS");
        db.set_edge_property(edge, "since", 2024i64.into());
        db.delete_edge(edge);

        let history = db.history(edge).unwrap();
        assert_eq!(history.len(), 3);
        assert_eq!(history[0].kind, ChangeKind::Create);
        assert_eq!(history[1].kind, ChangeKind::Update);
        assert_eq!(history[2].kind, ChangeKind::Delete);
    }

    /// Creating a node together with its properties records a single `Create`
    /// event whose `after` payload holds both properties.
    #[test]
    fn test_create_node_with_props_cdc() {
        let db = cdc_db();

        let id = db.create_node_with_props(
            &["Person"],
            vec![
                ("name", Value::from("Alix")),
                ("age", Value::from(30i64)),
            ],
        );

        let history = db.history(id).unwrap();
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].kind, ChangeKind::Create);
        let after = history[0].after.as_ref().unwrap();
        assert_eq!(after.len(), 2);
    }

    /// Querying the widest possible epoch range returns every recorded change
    /// (two creates and one property write).
    #[test]
    fn test_changes_between() {
        let db = cdc_db();

        let id1 = db.create_node(&["A"]);
        let _id2 = db.create_node(&["B"]);
        db.set_node_property(id1, "x", 1i64.into());

        let changes = db
            .changes_between(EpochId(0), EpochId(u64::MAX))
            .unwrap();
        assert_eq!(changes.len(), 3);
    }

    /// A default in-memory database has CDC off and records no history.
    #[test]
    fn test_cdc_disabled_by_default() {
        let db = GrafeoDB::new_in_memory();
        assert!(!db.is_cdc_enabled());

        let id = db.create_node(&["Person"]);
        db.set_node_property(id, "name", "Alix".into());

        let history = db.history(id).unwrap();
        assert!(history.is_empty(), "CDC off by default: no events recorded");
    }

    /// A session opened with `session_with_cdc(true)` records events even
    /// though the database itself was created without CDC.
    ///
    /// Gated on `gql` because the data is inserted through the session's
    /// `execute`, like the other query-based tests in this file.
    #[cfg(feature = "gql")]
    #[test]
    fn test_session_with_cdc_override_on() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session_with_cdc(true);
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
        let changes = db
            .changes_between(EpochId(0), EpochId(u64::MAX))
            .unwrap();
        assert!(
            !changes.is_empty(),
            "session_with_cdc(true) should record events"
        );
    }

    /// A session opened with `session_with_cdc(false)` records nothing even
    /// though the database was created with CDC enabled.
    ///
    /// Gated on `gql` because the data is inserted through the session's
    /// `execute`, like the other query-based tests in this file.
    #[cfg(feature = "gql")]
    #[test]
    fn test_session_with_cdc_override_off() {
        let db = cdc_db();
        let session = db.session_with_cdc(false);
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
        let changes = db
            .changes_between(EpochId(0), EpochId(u64::MAX))
            .unwrap();
        assert!(
            changes.is_empty(),
            "session_with_cdc(false) should not record events"
        );
    }

    /// Toggling CDC at runtime starts and stops event recording for nodes
    /// created afterwards.
    #[test]
    fn test_set_cdc_enabled_runtime() {
        let db = GrafeoDB::new_in_memory();
        assert!(!db.is_cdc_enabled());

        db.set_cdc_enabled(true);
        assert!(db.is_cdc_enabled());

        let id = db.create_node(&["Person"]);
        let history = db.history(id).unwrap();
        assert_eq!(history.len(), 1, "CDC enabled at runtime records events");

        db.set_cdc_enabled(false);
        let id2 = db.create_node(&["Person"]);
        let history2 = db.history(id2).unwrap();
        assert!(
            history2.is_empty(),
            "CDC disabled at runtime stops recording"
        );
    }
}
3082
/// A database wrapped around an external store can query data that was
/// written to that store before the database was created.
///
/// Gated on `gql` because the only assertion goes through `execute`, matching
/// the gating used by the other query-based tests in this file.
#[cfg(feature = "gql")]
#[test]
fn test_with_store_basic() {
    use grafeo_core::graph::lpg::LpgStore;

    let store = Arc::new(LpgStore::new().unwrap());
    let alix = store.create_node(&["Person"]);
    store.set_node_property(alix, "name", "Alix".into());

    let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
    let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

    let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(result.rows.len(), 1);
}
3097
/// A session opened on a database backed by an external store can run a
/// counting query, which produces exactly one result row.
///
/// Gated on `gql` because the only assertion goes through `execute`, matching
/// the gating used by the other query-based tests in this file.
#[cfg(feature = "gql")]
#[test]
fn test_with_store_session() {
    use grafeo_core::graph::lpg::LpgStore;

    let store = Arc::new(LpgStore::new().unwrap());
    let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
    let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

    let session = db.session();
    let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
    assert_eq!(result.rows.len(), 1);
}
3110
/// Inside an explicit transaction on an externally backed database, an
/// inserted node is visible to a later query in the same session, and the
/// transaction commits cleanly.
///
/// Gated on `gql` because both the insert and the read go through `execute`,
/// matching the gating used by the other query-based tests in this file.
#[cfg(feature = "gql")]
#[test]
fn test_with_store_mutations() {
    use grafeo_core::graph::lpg::LpgStore;

    let store = Arc::new(LpgStore::new().unwrap());
    let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
    let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

    let mut session = db.session();

    session.begin_transaction().unwrap();
    session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

    // Read back inside the still-open transaction.
    let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(result.rows.len(), 1);

    session.commit().unwrap();
}
3132
/// `QueryResult::empty` has no rows, no columns, no metrics and no status
/// message.
#[test]
fn test_query_result_empty() {
    let empty = QueryResult::empty();

    assert_eq!(empty.row_count(), 0);
    assert_eq!(empty.column_count(), 0);
    assert!(empty.is_empty());
    assert!(empty.rows_scanned().is_none());
    assert!(empty.execution_time_ms().is_none());
    assert!(empty.status_message.is_none());
}
3147
/// `QueryResult::status` stores the message and carries no rows or columns.
#[test]
fn test_query_result_status() {
    let message = "Created node type 'Person'";
    let outcome = QueryResult::status(message);

    assert_eq!(outcome.status_message.as_deref(), Some(message));
    assert_eq!(outcome.column_count(), 0);
    assert!(outcome.is_empty());
}
3158
/// `QueryResult::new` keeps the column names, starts without rows and types
/// every column as `LogicalType::Any`.
#[test]
fn test_query_result_new_with_columns() {
    use grafeo_common::types::LogicalType;

    let fresh = QueryResult::new(vec!["name".into(), "age".into()]);

    assert!(fresh.is_empty());
    assert_eq!(fresh.row_count(), 0);
    assert_eq!(fresh.column_count(), 2);
    assert_eq!(fresh.column_types, vec![LogicalType::Any, LogicalType::Any]);
}
3174
/// `QueryResult::with_types` stores the supplied column types in order.
#[test]
fn test_query_result_with_types() {
    use grafeo_common::types::LogicalType;

    let typed = QueryResult::with_types(
        vec!["name".into(), "age".into()],
        vec![LogicalType::String, LogicalType::Int64],
    );
    assert_eq!(typed.column_count(), 2);

    let expected = [LogicalType::String, LogicalType::Int64];
    for (position, logical_type) in expected.iter().enumerate() {
        assert_eq!(&typed.column_types[position], logical_type);
    }
}
3186
/// `with_metrics` makes both metrics readable through their accessors.
#[test]
fn test_query_result_with_metrics() {
    let base = QueryResult::new(vec!["x".into()]);
    let measured = base.with_metrics(42.5, 100);

    assert_eq!(measured.rows_scanned(), Some(100));
    assert_eq!(measured.execution_time_ms(), Some(42.5));
}
3193
/// `scalar` extracts the value of a one-row, one-column result.
#[test]
fn test_query_result_scalar_success() {
    use grafeo_common::types::Value;

    let mut single = QueryResult::new(vec!["count".into()]);
    single.rows.push(vec![Value::Int64(42)]);

    assert_eq!(single.scalar::<i64>().unwrap(), 42);
}
3203
/// `scalar` fails for anything other than exactly one row with one column.
#[test]
fn test_query_result_scalar_wrong_shape() {
    use grafeo_common::types::Value;

    // Two rows in a single column.
    let mut two_rows = QueryResult::new(vec!["x".into()]);
    two_rows
        .rows
        .extend([vec![Value::Int64(1)], vec![Value::Int64(2)]]);
    assert!(two_rows.scalar::<i64>().is_err());

    // One row spanning two columns.
    let mut two_columns = QueryResult::new(vec!["a".into(), "b".into()]);
    two_columns
        .rows
        .push(vec![Value::Int64(1), Value::Int64(2)]);
    assert!(two_columns.scalar::<i64>().is_err());

    // A column but no rows at all.
    let no_rows = QueryResult::new(vec!["x".into()]);
    assert!(no_rows.scalar::<i64>().is_err());
}
3222
/// `iter` yields one item per stored row.
#[test]
fn test_query_result_iter() {
    use grafeo_common::types::Value;

    let mut table = QueryResult::new(vec!["x".into()]);
    for number in [1, 2] {
        table.rows.push(vec![Value::Int64(number)]);
    }

    assert_eq!(table.iter().count(), 2);
}
3233
/// The `Display` output mentions both the column name and the cell value.
#[test]
fn test_query_result_display() {
    use grafeo_common::types::Value;

    let mut table = QueryResult::new(vec!["name".into()]);
    table.rows.push(vec![Value::from("Alix")]);

    let rendered = table.to_string();
    for needle in ["name", "Alix"] {
        assert!(rendered.contains(needle));
    }
}
3243
/// Converting a string value to `i64` is rejected.
#[test]
fn test_from_value_i64_type_mismatch() {
    use grafeo_common::types::Value;

    let text = Value::from("not a number");
    let converted = i64::from_value(&text);
    assert!(converted.is_err());
}
3254
/// Converting a string value to `f64` is rejected.
#[test]
fn test_from_value_f64_type_mismatch() {
    use grafeo_common::types::Value;

    let text = Value::from("not a float");
    let converted = f64::from_value(&text);
    assert!(converted.is_err());
}
3261
/// Converting an integer value to `String` is rejected.
#[test]
fn test_from_value_string_type_mismatch() {
    use grafeo_common::types::Value;

    let number = Value::Int64(42);
    let converted = String::from_value(&number);
    assert!(converted.is_err());
}
3268
/// Converting an integer value to `bool` is rejected.
#[test]
fn test_from_value_bool_type_mismatch() {
    use grafeo_common::types::Value;

    let number = Value::Int64(1);
    let converted = bool::from_value(&number);
    assert!(converted.is_err());
}
3275
/// Each supported primitive converts from its matching `Value` variant.
#[test]
fn test_from_value_all_success() {
    use grafeo_common::types::Value;

    let integer = i64::from_value(&Value::Int64(99)).unwrap();
    assert_eq!(integer, 99);

    let float = f64::from_value(&Value::Float64(2.72)).unwrap();
    assert!((float - 2.72).abs() < f64::EPSILON);

    let text = String::from_value(&Value::from("hello")).unwrap();
    assert_eq!(text, "hello");

    let flag = bool::from_value(&Value::Bool(true)).unwrap();
    assert!(flag);
}
3284
/// A fresh in-memory database is writable.
#[test]
fn test_database_is_read_only_false_by_default() {
    let database = GrafeoDB::new_in_memory();
    let read_only = database.is_read_only();
    assert!(!read_only);
}
3294
/// A fresh in-memory database reports the LPG graph model.
#[test]
fn test_database_graph_model() {
    use crate::config::GraphModel;

    let database = GrafeoDB::new_in_memory();
    let model = database.graph_model();
    assert_eq!(model, GraphModel::Lpg);
}
3300
/// A fresh in-memory database has no memory limit configured.
#[test]
fn test_database_memory_limit_none_by_default() {
    let database = GrafeoDB::new_in_memory();
    let limit = database.memory_limit();
    assert!(limit.is_none());
}
3306
/// `memory_limit` returns the limit that was set through the config.
#[test]
fn test_database_memory_limit_custom() {
    let database =
        GrafeoDB::with_config(Config::in_memory().with_memory_limit(128 * 1024 * 1024)).unwrap();

    assert_eq!(database.memory_limit(), Some(128 * 1024 * 1024));
}
3313
/// By default adaptive execution is enabled with a threshold of 3.0.
#[test]
fn test_database_adaptive_config() {
    let database = GrafeoDB::new_in_memory();
    let settings = database.adaptive_config();

    assert!(settings.enabled);
    let drift = (settings.threshold - 3.0).abs();
    assert!(drift < f64::EPSILON);
}
3321
/// Smoke test: the buffer manager accessor can be called on a fresh database.
#[test]
fn test_database_buffer_manager() {
    let database = GrafeoDB::new_in_memory();
    // There is nothing to assert on; the call itself is the test.
    let _manager = database.buffer_manager();
}
3328
/// Smoke test: the query cache accessor can be called on a fresh database.
#[test]
fn test_database_query_cache() {
    let database = GrafeoDB::new_in_memory();
    // There is nothing to assert on; the call itself is the test.
    let _cache = database.query_cache();
}
3334
/// Smoke test: `clear_plan_cache` can be called, after a query has been run
/// when the `gql` feature is available.
#[test]
fn test_database_clear_plan_cache() {
    let database = GrafeoDB::new_in_memory();

    // The outcome of the query is deliberately ignored.
    #[cfg(feature = "gql")]
    let _ = database.execute("MATCH (n) RETURN count(n)");

    database.clear_plan_cache();
}
3346
/// Running garbage collection leaves a live node in place.
#[test]
fn test_database_gc() {
    let database = GrafeoDB::new_in_memory();
    database.create_node(&["Person"]);

    database.gc();

    let remaining = database.node_count();
    assert_eq!(remaining, 1);
}
3355
/// `create_graph` returns `true` only the first time a name is used, and the
/// created graph shows up in `list_graphs`.
#[test]
fn test_create_and_list_graphs() {
    let db = GrafeoDB::new_in_memory();

    assert!(db.create_graph("social").unwrap());
    // Creating the same name again succeeds but reports that nothing was created.
    assert!(!db.create_graph("social").unwrap());

    let listed = db.list_graphs();
    assert!(listed.iter().any(|name| name.as_str() == "social"));
}
3373
/// `drop_graph` returns `true` for an existing graph and `false` once that
/// graph is gone.
#[test]
fn test_drop_graph() {
    let db = GrafeoDB::new_in_memory();
    db.create_graph("temp").unwrap();

    let first_drop = db.drop_graph("temp");
    let second_drop = db.drop_graph("temp");

    assert!(first_drop);
    assert!(!second_drop);
}
3381
/// Dropping the graph that is currently selected clears the selection.
#[test]
fn test_drop_graph_resets_current_graph() {
    let db = GrafeoDB::new_in_memory();
    db.create_graph("active").unwrap();
    db.set_current_graph(Some("active")).unwrap();
    assert_eq!(db.current_graph().as_deref(), Some("active"));

    db.drop_graph("active");
    assert!(db.current_graph().is_none());
}
3392
/// No graph is selected on a fresh database.
#[test]
fn test_current_graph_default_none() {
    let database = GrafeoDB::new_in_memory();
    assert!(database.current_graph().is_none());
}
3402
/// Selecting an existing graph makes it the current graph.
#[test]
fn test_set_current_graph_valid() {
    let database = GrafeoDB::new_in_memory();
    database.create_graph("social").unwrap();

    database.set_current_graph(Some("social")).unwrap();

    assert_eq!(database.current_graph().as_deref(), Some("social"));
}
3410
/// Selecting a graph that was never created is an error.
#[test]
fn test_set_current_graph_nonexistent() {
    let database = GrafeoDB::new_in_memory();
    assert!(database.set_current_graph(Some("nonexistent")).is_err());
}
3417
/// Passing `None` to `set_current_graph` clears a previous selection.
#[test]
fn test_set_current_graph_none_resets() {
    let database = GrafeoDB::new_in_memory();
    database.create_graph("social").unwrap();

    for selection in [Some("social"), None] {
        database.set_current_graph(selection).unwrap();
    }

    assert!(database.current_graph().is_none());
}
3426
/// The name `"default"` can be selected without creating a graph first and is
/// then reported as the current graph.
#[test]
fn test_set_current_graph_default_keyword() {
    let database = GrafeoDB::new_in_memory();

    database.set_current_graph(Some("default")).unwrap();

    assert_eq!(database.current_graph().as_deref(), Some("default"));
}
3434
/// No schema is selected on a fresh database.
#[test]
fn test_current_schema_default_none() {
    let database = GrafeoDB::new_in_memory();
    assert!(database.current_schema().is_none());
}
3440
/// Selecting a schema that does not exist is an error.
#[test]
fn test_set_current_schema_nonexistent() {
    let database = GrafeoDB::new_in_memory();
    assert!(database.set_current_schema(Some("nonexistent")).is_err());
}
3447
/// Passing `None` to `set_current_schema` succeeds and leaves no schema
/// selected.
#[test]
fn test_set_current_schema_none_resets() {
    let database = GrafeoDB::new_in_memory();

    database.set_current_schema(None).unwrap();

    assert!(database.current_schema().is_none());
}
3454
/// The store handed out by `graph_store` sees nodes created through the
/// database.
#[test]
fn test_graph_store_returns_lpg_by_default() {
    let database = GrafeoDB::new_in_memory();
    database.create_node(&["Person"]);

    let visible_nodes = database.graph_store().node_count();
    assert_eq!(visible_nodes, 1);
}
3466
/// A fresh in-memory database exposes a mutable graph store.
#[test]
fn test_graph_store_mut_returns_some_by_default() {
    let database = GrafeoDB::new_in_memory();
    let writable = database.graph_store_mut();
    assert!(writable.is_some());
}
3472
/// A database built with `with_read_store` is read-only, exposes no mutable
/// store, and still sees the data already present in the wrapped store.
#[test]
fn test_with_read_store() {
    use grafeo_core::graph::lpg::LpgStore;

    let backing = Arc::new(LpgStore::new().unwrap());
    backing.create_node(&["Person"]);

    let db = GrafeoDB::with_read_store(
        Arc::clone(&backing) as Arc<dyn GraphStore>,
        Config::in_memory(),
    )
    .unwrap();

    assert!(db.is_read_only());
    assert!(db.graph_store_mut().is_none());
    assert_eq!(db.graph_store().node_count(), 1);
}
3490
/// A database built with `with_store` is writable, exposes a mutable store,
/// and sees the data already present in the wrapped store.
#[test]
fn test_with_store_graph_store_methods() {
    use grafeo_core::graph::lpg::LpgStore;

    let backing = Arc::new(LpgStore::new().unwrap());
    backing.create_node(&["Person"]);

    let writable_store = Arc::clone(&backing) as Arc<dyn GraphStoreMut>;
    let db = GrafeoDB::with_store(writable_store, Config::in_memory()).unwrap();

    assert!(!db.is_read_only());
    assert!(db.graph_store_mut().is_some());
    let visible_nodes = db.graph_store().node_count();
    assert_eq!(visible_nodes, 1);
}
3508
/// A session from `session_read_only` can run a read query.
// NOTE(review): `allow(deprecated)` presumably covers `session_read_only` - confirm.
#[test]
#[allow(deprecated)]
fn test_session_read_only() {
    let database = GrafeoDB::new_in_memory();
    database.create_node(&["Person"]);

    let reader = database.session_read_only();

    #[cfg(feature = "gql")]
    {
        let counted = reader.execute("MATCH (n) RETURN count(n)").unwrap();
        assert_eq!(counted.rows.len(), 1);
    }
}
3527
/// Closing an in-memory database succeeds, and so does closing it again.
#[test]
fn test_close_in_memory_database() {
    let database = GrafeoDB::new_in_memory();
    database.create_node(&["Person"]);

    for _ in 0..2 {
        assert!(database.close().is_ok());
    }
}
3540
/// A config asking for zero threads is rejected when opening the database.
#[test]
fn test_with_config_invalid_config_zero_threads() {
    let opened = GrafeoDB::with_config(Config::in_memory().with_threads(0));
    assert!(opened.is_err());
}
3551
/// A config with a zero memory limit is rejected when opening the database.
#[test]
fn test_with_config_invalid_config_zero_memory_limit() {
    let opened = GrafeoDB::with_config(Config::in_memory().with_memory_limit(0));
    assert!(opened.is_err());
}
3558
/// Each `StorageFormat` variant renders as its kebab-case name.
#[test]
fn test_storage_format_display() {
    use crate::config::StorageFormat;

    let expectations = [
        (StorageFormat::Auto, "auto"),
        (StorageFormat::WalDirectory, "wal-directory"),
        (StorageFormat::SingleFile, "single-file"),
    ];
    for (format, rendered) in expectations {
        assert_eq!(format.to_string(), rendered);
    }
}
3570
/// `StorageFormat` defaults to `Auto`.
#[test]
fn test_storage_format_default() {
    use crate::config::StorageFormat;

    let fallback = StorageFormat::default();
    assert_eq!(fallback, StorageFormat::Auto);
}
3576
/// `with_storage_format` stores the chosen format on the config.
#[test]
fn test_config_with_storage_format() {
    use crate::config::StorageFormat;

    let chosen = Config::in_memory()
        .with_storage_format(StorageFormat::SingleFile)
        .storage_format;
    assert_eq!(chosen, StorageFormat::SingleFile);
}
3583
/// `with_cdc` switches the `cdc_enabled` flag on.
#[test]
fn test_config_with_cdc() {
    let enabled = Config::in_memory().with_cdc().cdc_enabled;
    assert!(enabled);
}
3593
/// The in-memory config has CDC switched off unless asked otherwise.
#[test]
fn test_config_cdc_default_false() {
    let enabled = Config::in_memory().cdc_enabled;
    assert!(!enabled);
}
3599
/// `ConfigError` can be boxed as a `std::error::Error` and reports no
/// underlying source.
#[test]
fn test_config_error_is_error_trait() {
    use crate::config::ConfigError;

    let boxed: Box<dyn std::error::Error> = Box::new(ConfigError::ZeroMemoryLimit);
    let cause = boxed.source();
    assert!(cause.is_none());
}
3610
/// The Prometheus rendering of a fresh database's metrics is not empty.
#[cfg(feature = "metrics")]
#[test]
fn test_metrics_prometheus_output() {
    let database = GrafeoDB::new_in_memory();
    let exposition = database.metrics_prometheus();
    assert!(!exposition.is_empty());
}
3623
/// After `reset_metrics` the snapshot's query counter reads zero.
#[cfg(feature = "metrics")]
#[test]
fn test_reset_metrics() {
    let database = GrafeoDB::new_in_memory();
    // Keep a session open across the reset.
    let _open_session = database.session();

    database.reset_metrics();

    assert_eq!(database.metrics().query_count, 0);
}
3634
/// `drop_graph` reports `false` on a database that wraps an external
/// read-only store.
#[test]
fn test_drop_graph_on_external_store() {
    use grafeo_core::graph::lpg::LpgStore;

    let backing = Arc::new(LpgStore::new().unwrap());
    let db = GrafeoDB::with_read_store(
        Arc::clone(&backing) as Arc<dyn GraphStore>,
        Config::in_memory(),
    )
    .unwrap();

    let dropped = db.drop_graph("anything");
    assert!(!dropped);
}
3650}