1mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21mod search;
22#[cfg(feature = "wal")]
23pub(crate) mod wal_store;
24
25#[cfg(feature = "wal")]
26use std::path::Path;
27use std::sync::Arc;
28use std::sync::atomic::AtomicUsize;
29
30use parking_lot::RwLock;
31
32#[cfg(feature = "wal")]
33use grafeo_adapters::storage::wal::{
34 DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
35};
36use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
37use grafeo_common::utils::error::Result;
38use grafeo_core::graph::GraphStoreMut;
39use grafeo_core::graph::lpg::LpgStore;
40#[cfg(feature = "rdf")]
41use grafeo_core::graph::rdf::RdfStore;
42
43use crate::catalog::Catalog;
44use crate::config::Config;
45use crate::query::cache::QueryCache;
46use crate::session::Session;
47use crate::transaction::TransactionManager;
48
49pub struct GrafeoDB {
72 pub(super) config: Config,
74 pub(super) store: Arc<LpgStore>,
76 pub(super) catalog: Arc<Catalog>,
78 #[cfg(feature = "rdf")]
80 pub(super) rdf_store: Arc<RdfStore>,
81 pub(super) tx_manager: Arc<TransactionManager>,
83 pub(super) buffer_manager: Arc<BufferManager>,
85 #[cfg(feature = "wal")]
87 pub(super) wal: Option<Arc<LpgWal>>,
88 pub(super) query_cache: Arc<QueryCache>,
90 pub(super) commit_counter: Arc<AtomicUsize>,
92 pub(super) is_open: RwLock<bool>,
94 #[cfg(feature = "cdc")]
96 pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
97 #[cfg(feature = "embed")]
99 pub(super) embedding_models:
100 RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
101 pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
104}
105
106impl GrafeoDB {
107 #[must_use]
123 pub fn new_in_memory() -> Self {
124 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
125 }
126
127 #[cfg(feature = "wal")]
146 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
147 Self::with_config(Config::persistent(path.as_ref()))
148 }
149
150 pub fn with_config(config: Config) -> Result<Self> {
174 config
176 .validate()
177 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
178
179 let store = Arc::new(LpgStore::new()?);
180 #[cfg(feature = "rdf")]
181 let rdf_store = Arc::new(RdfStore::new());
182 let tx_manager = Arc::new(TransactionManager::new());
183
184 let buffer_config = BufferManagerConfig {
186 budget: config.memory_limit.unwrap_or_else(|| {
187 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
188 }),
189 spill_path: config
190 .spill_path
191 .clone()
192 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
193 ..BufferManagerConfig::default()
194 };
195 let buffer_manager = BufferManager::new(buffer_config);
196
197 let catalog = Arc::new(Catalog::new());
199
200 #[cfg(feature = "wal")]
202 let wal = if config.wal_enabled {
203 if let Some(ref db_path) = config.path {
204 std::fs::create_dir_all(db_path)?;
206
207 let wal_path = db_path.join("wal");
208
209 if wal_path.exists() {
211 let recovery = WalRecovery::new(&wal_path);
212 let records = recovery.recover()?;
213 Self::apply_wal_records(&store, &catalog, &records)?;
214 }
215
216 let wal_durability = match config.wal_durability {
218 crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
219 crate::config::DurabilityMode::Batch {
220 max_delay_ms,
221 max_records,
222 } => WalDurabilityMode::Batch {
223 max_delay_ms,
224 max_records,
225 },
226 crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
227 WalDurabilityMode::Adaptive { target_interval_ms }
228 }
229 crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
230 };
231 let wal_config = WalConfig {
232 durability: wal_durability,
233 ..WalConfig::default()
234 };
235 let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
236 Some(Arc::new(wal_manager))
237 } else {
238 None
239 }
240 } else {
241 None
242 };
243
244 let query_cache = Arc::new(QueryCache::default());
246
247 Ok(Self {
248 config,
249 store,
250 catalog,
251 #[cfg(feature = "rdf")]
252 rdf_store,
253 tx_manager,
254 buffer_manager,
255 #[cfg(feature = "wal")]
256 wal,
257 query_cache,
258 commit_counter: Arc::new(AtomicUsize::new(0)),
259 is_open: RwLock::new(true),
260 #[cfg(feature = "cdc")]
261 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
262 #[cfg(feature = "embed")]
263 embedding_models: RwLock::new(hashbrown::HashMap::new()),
264 external_store: None,
265 })
266 }
267
268 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
293 config
294 .validate()
295 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
296
297 let dummy_store = Arc::new(LpgStore::new()?);
298 let tx_manager = Arc::new(TransactionManager::new());
299
300 let buffer_config = BufferManagerConfig {
301 budget: config.memory_limit.unwrap_or_else(|| {
302 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
303 }),
304 spill_path: None,
305 ..BufferManagerConfig::default()
306 };
307 let buffer_manager = BufferManager::new(buffer_config);
308
309 let query_cache = Arc::new(QueryCache::default());
310
311 Ok(Self {
312 config,
313 store: dummy_store,
314 catalog: Arc::new(Catalog::new()),
315 #[cfg(feature = "rdf")]
316 rdf_store: Arc::new(RdfStore::new()),
317 tx_manager,
318 buffer_manager,
319 #[cfg(feature = "wal")]
320 wal: None,
321 query_cache,
322 commit_counter: Arc::new(AtomicUsize::new(0)),
323 is_open: RwLock::new(true),
324 #[cfg(feature = "cdc")]
325 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
326 #[cfg(feature = "embed")]
327 embedding_models: RwLock::new(hashbrown::HashMap::new()),
328 external_store: Some(store),
329 })
330 }
331
332 #[cfg(feature = "wal")]
334 fn apply_wal_records(store: &LpgStore, catalog: &Catalog, records: &[WalRecord]) -> Result<()> {
335 use crate::catalog::{
336 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
337 };
338
339 for record in records {
340 match record {
341 WalRecord::CreateNode { id, labels } => {
342 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
343 store.create_node_with_id(*id, &label_refs);
344 }
345 WalRecord::DeleteNode { id } => {
346 store.delete_node(*id);
347 }
348 WalRecord::CreateEdge {
349 id,
350 src,
351 dst,
352 edge_type,
353 } => {
354 store.create_edge_with_id(*id, *src, *dst, edge_type);
355 }
356 WalRecord::DeleteEdge { id } => {
357 store.delete_edge(*id);
358 }
359 WalRecord::SetNodeProperty { id, key, value } => {
360 store.set_node_property(*id, key, value.clone());
361 }
362 WalRecord::SetEdgeProperty { id, key, value } => {
363 store.set_edge_property(*id, key, value.clone());
364 }
365 WalRecord::AddNodeLabel { id, label } => {
366 store.add_label(*id, label);
367 }
368 WalRecord::RemoveNodeLabel { id, label } => {
369 store.remove_label(*id, label);
370 }
371 WalRecord::RemoveNodeProperty { id, key } => {
372 store.remove_node_property(*id, key);
373 }
374 WalRecord::RemoveEdgeProperty { id, key } => {
375 store.remove_edge_property(*id, key);
376 }
377
378 WalRecord::CreateNodeType {
380 name,
381 properties,
382 constraints,
383 } => {
384 let def = NodeTypeDefinition {
385 name: name.clone(),
386 properties: properties
387 .iter()
388 .map(|(n, t, nullable)| TypedProperty {
389 name: n.clone(),
390 data_type: PropertyDataType::from_type_name(t),
391 nullable: *nullable,
392 default_value: None,
393 })
394 .collect(),
395 constraints: constraints
396 .iter()
397 .map(|(kind, props)| match kind.as_str() {
398 "unique" => TypeConstraint::Unique(props.clone()),
399 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
400 "not_null" if !props.is_empty() => {
401 TypeConstraint::NotNull(props[0].clone())
402 }
403 _ => TypeConstraint::Unique(props.clone()),
404 })
405 .collect(),
406 };
407 let _ = catalog.register_node_type(def);
408 }
409 WalRecord::DropNodeType { name } => {
410 let _ = catalog.drop_node_type(name);
411 }
412 WalRecord::CreateEdgeType {
413 name,
414 properties,
415 constraints,
416 } => {
417 let def = EdgeTypeDefinition {
418 name: name.clone(),
419 properties: properties
420 .iter()
421 .map(|(n, t, nullable)| TypedProperty {
422 name: n.clone(),
423 data_type: PropertyDataType::from_type_name(t),
424 nullable: *nullable,
425 default_value: None,
426 })
427 .collect(),
428 constraints: constraints
429 .iter()
430 .map(|(kind, props)| match kind.as_str() {
431 "unique" => TypeConstraint::Unique(props.clone()),
432 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
433 "not_null" if !props.is_empty() => {
434 TypeConstraint::NotNull(props[0].clone())
435 }
436 _ => TypeConstraint::Unique(props.clone()),
437 })
438 .collect(),
439 };
440 let _ = catalog.register_edge_type_def(def);
441 }
442 WalRecord::DropEdgeType { name } => {
443 let _ = catalog.drop_edge_type_def(name);
444 }
445 WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
446 }
449 WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
450 }
453 WalRecord::CreateGraphType {
454 name,
455 node_types,
456 edge_types,
457 open,
458 } => {
459 use crate::catalog::GraphTypeDefinition;
460 let def = GraphTypeDefinition {
461 name: name.clone(),
462 allowed_node_types: node_types.clone(),
463 allowed_edge_types: edge_types.clone(),
464 open: *open,
465 };
466 let _ = catalog.register_graph_type(def);
467 }
468 WalRecord::DropGraphType { name } => {
469 let _ = catalog.drop_graph_type(name);
470 }
471 WalRecord::CreateSchema { name } => {
472 let _ = catalog.register_schema_namespace(name.clone());
473 }
474 WalRecord::DropSchema { name } => {
475 let _ = catalog.drop_schema_namespace(name);
476 }
477
478 WalRecord::AlterNodeType { name, alterations } => {
479 for (action, prop_name, type_name, nullable) in alterations {
480 match action.as_str() {
481 "add" => {
482 let prop = TypedProperty {
483 name: prop_name.clone(),
484 data_type: PropertyDataType::from_type_name(type_name),
485 nullable: *nullable,
486 default_value: None,
487 };
488 let _ = catalog.alter_node_type_add_property(name, prop);
489 }
490 "drop" => {
491 let _ = catalog.alter_node_type_drop_property(name, prop_name);
492 }
493 _ => {}
494 }
495 }
496 }
497 WalRecord::AlterEdgeType { name, alterations } => {
498 for (action, prop_name, type_name, nullable) in alterations {
499 match action.as_str() {
500 "add" => {
501 let prop = TypedProperty {
502 name: prop_name.clone(),
503 data_type: PropertyDataType::from_type_name(type_name),
504 nullable: *nullable,
505 default_value: None,
506 };
507 let _ = catalog.alter_edge_type_add_property(name, prop);
508 }
509 "drop" => {
510 let _ = catalog.alter_edge_type_drop_property(name, prop_name);
511 }
512 _ => {}
513 }
514 }
515 }
516 WalRecord::AlterGraphType { name, alterations } => {
517 for (action, type_name) in alterations {
518 match action.as_str() {
519 "add_node" => {
520 let _ =
521 catalog.alter_graph_type_add_node_type(name, type_name.clone());
522 }
523 "drop_node" => {
524 let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
525 }
526 "add_edge" => {
527 let _ =
528 catalog.alter_graph_type_add_edge_type(name, type_name.clone());
529 }
530 "drop_edge" => {
531 let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
532 }
533 _ => {}
534 }
535 }
536 }
537
538 WalRecord::CreateProcedure {
539 name,
540 params,
541 returns,
542 body,
543 } => {
544 use crate::catalog::ProcedureDefinition;
545 let def = ProcedureDefinition {
546 name: name.clone(),
547 params: params.clone(),
548 returns: returns.clone(),
549 body: body.clone(),
550 };
551 let _ = catalog.register_procedure(def);
552 }
553 WalRecord::DropProcedure { name } => {
554 let _ = catalog.drop_procedure(name);
555 }
556
557 WalRecord::TxCommit { .. }
558 | WalRecord::TxAbort { .. }
559 | WalRecord::Checkpoint { .. } => {
560 }
563 }
564 }
565 Ok(())
566 }
567
568 #[must_use]
591 pub fn session(&self) -> Session {
592 if let Some(ref ext_store) = self.external_store {
593 return Session::with_external_store(
594 Arc::clone(ext_store),
595 Arc::clone(&self.tx_manager),
596 Arc::clone(&self.query_cache),
597 Arc::clone(&self.catalog),
598 self.config.adaptive.clone(),
599 self.config.factorized_execution,
600 self.config.graph_model,
601 self.config.query_timeout,
602 Arc::clone(&self.commit_counter),
603 self.config.gc_interval,
604 );
605 }
606
607 #[cfg(feature = "rdf")]
608 let mut session = Session::with_rdf_store_and_adaptive(
609 Arc::clone(&self.store),
610 Arc::clone(&self.rdf_store),
611 Arc::clone(&self.tx_manager),
612 Arc::clone(&self.query_cache),
613 Arc::clone(&self.catalog),
614 self.config.adaptive.clone(),
615 self.config.factorized_execution,
616 self.config.graph_model,
617 self.config.query_timeout,
618 Arc::clone(&self.commit_counter),
619 self.config.gc_interval,
620 );
621 #[cfg(not(feature = "rdf"))]
622 let mut session = Session::with_adaptive(
623 Arc::clone(&self.store),
624 Arc::clone(&self.tx_manager),
625 Arc::clone(&self.query_cache),
626 Arc::clone(&self.catalog),
627 self.config.adaptive.clone(),
628 self.config.factorized_execution,
629 self.config.graph_model,
630 self.config.query_timeout,
631 Arc::clone(&self.commit_counter),
632 self.config.gc_interval,
633 );
634
635 #[cfg(feature = "wal")]
636 if let Some(ref wal) = self.wal {
637 session.set_wal(Arc::clone(wal));
638 }
639
640 #[cfg(feature = "cdc")]
641 session.set_cdc_log(Arc::clone(&self.cdc_log));
642
643 let _ = &mut session;
645
646 session
647 }
648
649 #[must_use]
651 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
652 &self.config.adaptive
653 }
654
655 #[must_use]
657 pub fn config(&self) -> &Config {
658 &self.config
659 }
660
661 #[must_use]
663 pub fn graph_model(&self) -> crate::config::GraphModel {
664 self.config.graph_model
665 }
666
667 #[must_use]
669 pub fn memory_limit(&self) -> Option<usize> {
670 self.config.memory_limit
671 }
672
673 #[must_use]
681 pub fn store(&self) -> &Arc<LpgStore> {
682 &self.store
683 }
684
685 pub fn create_graph(&self, name: &str) -> Result<bool> {
693 Ok(self.store.create_graph(name)?)
694 }
695
696 pub fn drop_graph(&self, name: &str) -> bool {
698 self.store.drop_graph(name)
699 }
700
701 #[must_use]
703 pub fn list_graphs(&self) -> Vec<String> {
704 self.store.graph_names()
705 }
706
707 #[must_use]
715 pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
716 if let Some(ref ext_store) = self.external_store {
717 Arc::clone(ext_store)
718 } else {
719 Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
720 }
721 }
722
723 pub fn gc(&self) {
729 let min_epoch = self.tx_manager.min_active_epoch();
730 self.store.gc_versions(min_epoch);
731 self.tx_manager.gc();
732 }
733
734 #[must_use]
736 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
737 &self.buffer_manager
738 }
739
740 #[must_use]
742 pub fn query_cache(&self) -> &Arc<QueryCache> {
743 &self.query_cache
744 }
745
746 pub fn close(&self) -> Result<()> {
760 let mut is_open = self.is_open.write();
761 if !*is_open {
762 return Ok(());
763 }
764
765 #[cfg(feature = "wal")]
767 if let Some(ref wal) = self.wal {
768 let epoch = self.store.current_epoch();
769
770 let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
772 self.tx_manager.begin()
774 });
775
776 wal.log(&WalRecord::TxCommit {
778 tx_id: checkpoint_tx,
779 })?;
780
781 wal.checkpoint(checkpoint_tx, epoch)?;
783 wal.sync()?;
784 }
785
786 *is_open = false;
787 Ok(())
788 }
789
790 #[cfg(feature = "wal")]
792 #[must_use]
793 pub fn wal(&self) -> Option<&Arc<LpgWal>> {
794 self.wal.as_ref()
795 }
796
797 #[cfg(feature = "wal")]
799 pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
800 if let Some(ref wal) = self.wal {
801 wal.log(record)?;
802 }
803 Ok(())
804 }
805}
806
807impl Drop for GrafeoDB {
808 fn drop(&mut self) {
809 if let Err(e) = self.close() {
810 tracing::error!("Error closing database: {}", e);
811 }
812 }
813}
814
815impl crate::admin::AdminService for GrafeoDB {
816 fn info(&self) -> crate::admin::DatabaseInfo {
817 self.info()
818 }
819
820 fn detailed_stats(&self) -> crate::admin::DatabaseStats {
821 self.detailed_stats()
822 }
823
824 fn schema(&self) -> crate::admin::SchemaInfo {
825 self.schema()
826 }
827
828 fn validate(&self) -> crate::admin::ValidationResult {
829 self.validate()
830 }
831
832 fn wal_status(&self) -> crate::admin::WalStatus {
833 self.wal_status()
834 }
835
836 fn wal_checkpoint(&self) -> Result<()> {
837 self.wal_checkpoint()
838 }
839}
840
841#[derive(Debug)]
871pub struct QueryResult {
872 pub columns: Vec<String>,
874 pub column_types: Vec<grafeo_common::types::LogicalType>,
876 pub rows: Vec<Vec<grafeo_common::types::Value>>,
878 pub execution_time_ms: Option<f64>,
880 pub rows_scanned: Option<u64>,
882 pub status_message: Option<String>,
884}
885
886impl QueryResult {
887 #[must_use]
889 pub fn empty() -> Self {
890 Self {
891 columns: Vec::new(),
892 column_types: Vec::new(),
893 rows: Vec::new(),
894 execution_time_ms: None,
895 rows_scanned: None,
896 status_message: None,
897 }
898 }
899
900 #[must_use]
902 pub fn status(msg: impl Into<String>) -> Self {
903 Self {
904 columns: Vec::new(),
905 column_types: Vec::new(),
906 rows: Vec::new(),
907 execution_time_ms: None,
908 rows_scanned: None,
909 status_message: Some(msg.into()),
910 }
911 }
912
913 #[must_use]
915 pub fn new(columns: Vec<String>) -> Self {
916 let len = columns.len();
917 Self {
918 columns,
919 column_types: vec![grafeo_common::types::LogicalType::Any; len],
920 rows: Vec::new(),
921 execution_time_ms: None,
922 rows_scanned: None,
923 status_message: None,
924 }
925 }
926
927 #[must_use]
929 pub fn with_types(
930 columns: Vec<String>,
931 column_types: Vec<grafeo_common::types::LogicalType>,
932 ) -> Self {
933 Self {
934 columns,
935 column_types,
936 rows: Vec::new(),
937 execution_time_ms: None,
938 rows_scanned: None,
939 status_message: None,
940 }
941 }
942
943 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
945 self.execution_time_ms = Some(execution_time_ms);
946 self.rows_scanned = Some(rows_scanned);
947 self
948 }
949
950 #[must_use]
952 pub fn execution_time_ms(&self) -> Option<f64> {
953 self.execution_time_ms
954 }
955
956 #[must_use]
958 pub fn rows_scanned(&self) -> Option<u64> {
959 self.rows_scanned
960 }
961
962 #[must_use]
964 pub fn row_count(&self) -> usize {
965 self.rows.len()
966 }
967
968 #[must_use]
970 pub fn column_count(&self) -> usize {
971 self.columns.len()
972 }
973
974 #[must_use]
976 pub fn is_empty(&self) -> bool {
977 self.rows.is_empty()
978 }
979
980 pub fn scalar<T: FromValue>(&self) -> Result<T> {
989 if self.rows.len() != 1 || self.columns.len() != 1 {
990 return Err(grafeo_common::utils::error::Error::InvalidValue(
991 "Expected single value".to_string(),
992 ));
993 }
994 T::from_value(&self.rows[0][0])
995 }
996
997 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
999 self.rows.iter()
1000 }
1001}
1002
1003pub trait FromValue: Sized {
1008 fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
1010}
1011
1012impl FromValue for i64 {
1013 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1014 value
1015 .as_int64()
1016 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1017 expected: "INT64".to_string(),
1018 found: value.type_name().to_string(),
1019 })
1020 }
1021}
1022
1023impl FromValue for f64 {
1024 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1025 value
1026 .as_float64()
1027 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1028 expected: "FLOAT64".to_string(),
1029 found: value.type_name().to_string(),
1030 })
1031 }
1032}
1033
1034impl FromValue for String {
1035 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1036 value.as_str().map(String::from).ok_or_else(|| {
1037 grafeo_common::utils::error::Error::TypeMismatch {
1038 expected: "STRING".to_string(),
1039 found: value.type_name().to_string(),
1040 }
1041 })
1042 }
1043}
1044
1045impl FromValue for bool {
1046 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1047 value
1048 .as_bool()
1049 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1050 expected: "BOOL".to_string(),
1051 found: value.type_name().to_string(),
1052 })
1053 }
1054}
1055
1056#[cfg(test)]
1057mod tests {
1058 use super::*;
1059
1060 #[test]
1061 fn test_create_in_memory_database() {
1062 let db = GrafeoDB::new_in_memory();
1063 assert_eq!(db.node_count(), 0);
1064 assert_eq!(db.edge_count(), 0);
1065 }
1066
1067 #[test]
1068 fn test_database_config() {
1069 let config = Config::in_memory().with_threads(4).with_query_logging();
1070
1071 let db = GrafeoDB::with_config(config).unwrap();
1072 assert_eq!(db.config().threads, 4);
1073 assert!(db.config().query_logging);
1074 }
1075
1076 #[test]
1077 fn test_database_session() {
1078 let db = GrafeoDB::new_in_memory();
1079 let _session = db.session();
1080 }
1082
1083 #[cfg(feature = "wal")]
1084 #[test]
1085 fn test_persistent_database_recovery() {
1086 use grafeo_common::types::Value;
1087 use tempfile::tempdir;
1088
1089 let dir = tempdir().unwrap();
1090 let db_path = dir.path().join("test_db");
1091
1092 {
1094 let db = GrafeoDB::open(&db_path).unwrap();
1095
1096 let alix = db.create_node(&["Person"]);
1097 db.set_node_property(alix, "name", Value::from("Alix"));
1098
1099 let gus = db.create_node(&["Person"]);
1100 db.set_node_property(gus, "name", Value::from("Gus"));
1101
1102 let _edge = db.create_edge(alix, gus, "KNOWS");
1103
1104 db.close().unwrap();
1106 }
1107
1108 {
1110 let db = GrafeoDB::open(&db_path).unwrap();
1111
1112 assert_eq!(db.node_count(), 2);
1113 assert_eq!(db.edge_count(), 1);
1114
1115 let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
1117 assert!(node0.is_some());
1118
1119 let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
1120 assert!(node1.is_some());
1121 }
1122 }
1123
1124 #[cfg(feature = "wal")]
1125 #[test]
1126 fn test_wal_logging() {
1127 use tempfile::tempdir;
1128
1129 let dir = tempdir().unwrap();
1130 let db_path = dir.path().join("wal_test_db");
1131
1132 let db = GrafeoDB::open(&db_path).unwrap();
1133
1134 let node = db.create_node(&["Test"]);
1136 db.delete_node(node);
1137
1138 if let Some(wal) = db.wal() {
1140 assert!(wal.record_count() > 0);
1141 }
1142
1143 db.close().unwrap();
1144 }
1145
1146 #[cfg(feature = "wal")]
1147 #[test]
1148 fn test_wal_recovery_multiple_sessions() {
1149 use grafeo_common::types::Value;
1151 use tempfile::tempdir;
1152
1153 let dir = tempdir().unwrap();
1154 let db_path = dir.path().join("multi_session_db");
1155
1156 {
1158 let db = GrafeoDB::open(&db_path).unwrap();
1159 let alix = db.create_node(&["Person"]);
1160 db.set_node_property(alix, "name", Value::from("Alix"));
1161 db.close().unwrap();
1162 }
1163
1164 {
1166 let db = GrafeoDB::open(&db_path).unwrap();
1167 assert_eq!(db.node_count(), 1); let gus = db.create_node(&["Person"]);
1169 db.set_node_property(gus, "name", Value::from("Gus"));
1170 db.close().unwrap();
1171 }
1172
1173 {
1175 let db = GrafeoDB::open(&db_path).unwrap();
1176 assert_eq!(db.node_count(), 2);
1177
1178 let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
1180 assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
1181
1182 let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
1183 assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
1184 }
1185 }
1186
1187 #[cfg(feature = "wal")]
1188 #[test]
1189 fn test_database_consistency_after_mutations() {
1190 use grafeo_common::types::Value;
1192 use tempfile::tempdir;
1193
1194 let dir = tempdir().unwrap();
1195 let db_path = dir.path().join("consistency_db");
1196
1197 {
1198 let db = GrafeoDB::open(&db_path).unwrap();
1199
1200 let a = db.create_node(&["Node"]);
1202 let b = db.create_node(&["Node"]);
1203 let c = db.create_node(&["Node"]);
1204
1205 let e1 = db.create_edge(a, b, "LINKS");
1207 let _e2 = db.create_edge(b, c, "LINKS");
1208
1209 db.delete_edge(e1);
1211 db.delete_node(b);
1212
1213 db.set_node_property(a, "value", Value::Int64(1));
1215 db.set_node_property(c, "value", Value::Int64(3));
1216
1217 db.close().unwrap();
1218 }
1219
1220 {
1222 let db = GrafeoDB::open(&db_path).unwrap();
1223
1224 let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
1228 assert!(node_a.is_some());
1229
1230 let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
1231 assert!(node_c.is_some());
1232
1233 let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
1235 assert!(node_b.is_none());
1236 }
1237 }
1238
1239 #[cfg(feature = "wal")]
1240 #[test]
1241 fn test_close_is_idempotent() {
1242 use tempfile::tempdir;
1244
1245 let dir = tempdir().unwrap();
1246 let db_path = dir.path().join("close_test_db");
1247
1248 let db = GrafeoDB::open(&db_path).unwrap();
1249 db.create_node(&["Test"]);
1250
1251 assert!(db.close().is_ok());
1253
1254 assert!(db.close().is_ok());
1256 }
1257
1258 #[test]
1259 fn test_query_result_has_metrics() {
1260 let db = GrafeoDB::new_in_memory();
1262 db.create_node(&["Person"]);
1263 db.create_node(&["Person"]);
1264
1265 #[cfg(feature = "gql")]
1266 {
1267 let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
1268
1269 assert!(result.execution_time_ms.is_some());
1271 assert!(result.rows_scanned.is_some());
1272 assert!(result.execution_time_ms.unwrap() >= 0.0);
1273 assert_eq!(result.rows_scanned.unwrap(), 2);
1274 }
1275 }
1276
1277 #[test]
1278 fn test_empty_query_result_metrics() {
1279 let db = GrafeoDB::new_in_memory();
1281 db.create_node(&["Person"]);
1282
1283 #[cfg(feature = "gql")]
1284 {
1285 let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
1287
1288 assert!(result.execution_time_ms.is_some());
1289 assert!(result.rows_scanned.is_some());
1290 assert_eq!(result.rows_scanned.unwrap(), 0);
1291 }
1292 }
1293
1294 #[cfg(feature = "cdc")]
1295 mod cdc_integration {
1296 use super::*;
1297
1298 #[test]
1299 fn test_node_lifecycle_history() {
1300 let db = GrafeoDB::new_in_memory();
1301
1302 let id = db.create_node(&["Person"]);
1304 db.set_node_property(id, "name", "Alix".into());
1306 db.set_node_property(id, "name", "Gus".into());
1307 db.delete_node(id);
1309
1310 let history = db.history(id).unwrap();
1311 assert_eq!(history.len(), 4); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1313 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1314 assert!(history[1].before.is_none()); assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
1316 assert!(history[2].before.is_some()); assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
1318 }
1319
1320 #[test]
1321 fn test_edge_lifecycle_history() {
1322 let db = GrafeoDB::new_in_memory();
1323
1324 let alix = db.create_node(&["Person"]);
1325 let gus = db.create_node(&["Person"]);
1326 let edge = db.create_edge(alix, gus, "KNOWS");
1327 db.set_edge_property(edge, "since", 2024i64.into());
1328 db.delete_edge(edge);
1329
1330 let history = db.history(edge).unwrap();
1331 assert_eq!(history.len(), 3); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1333 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1334 assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
1335 }
1336
1337 #[test]
1338 fn test_create_node_with_props_cdc() {
1339 let db = GrafeoDB::new_in_memory();
1340
1341 let id = db.create_node_with_props(
1342 &["Person"],
1343 vec![
1344 ("name", grafeo_common::types::Value::from("Alix")),
1345 ("age", grafeo_common::types::Value::from(30i64)),
1346 ],
1347 );
1348
1349 let history = db.history(id).unwrap();
1350 assert_eq!(history.len(), 1);
1351 assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1352 let after = history[0].after.as_ref().unwrap();
1354 assert_eq!(after.len(), 2);
1355 }
1356
1357 #[test]
1358 fn test_changes_between() {
1359 let db = GrafeoDB::new_in_memory();
1360
1361 let id1 = db.create_node(&["A"]);
1362 let _id2 = db.create_node(&["B"]);
1363 db.set_node_property(id1, "x", 1i64.into());
1364
1365 let changes = db
1367 .changes_between(
1368 grafeo_common::types::EpochId(0),
1369 grafeo_common::types::EpochId(u64::MAX),
1370 )
1371 .unwrap();
1372 assert_eq!(changes.len(), 3); }
1374 }
1375
1376 #[test]
1377 fn test_with_store_basic() {
1378 use grafeo_core::graph::lpg::LpgStore;
1379
1380 let store = Arc::new(LpgStore::new().unwrap());
1381 let n1 = store.create_node(&["Person"]);
1382 store.set_node_property(n1, "name", "Alix".into());
1383
1384 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1385 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1386
1387 let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
1388 assert_eq!(result.rows.len(), 1);
1389 }
1390
1391 #[test]
1392 fn test_with_store_session() {
1393 use grafeo_core::graph::lpg::LpgStore;
1394
1395 let store = Arc::new(LpgStore::new().unwrap());
1396 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1397 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1398
1399 let session = db.session();
1400 let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
1401 assert_eq!(result.rows.len(), 1);
1402 }
1403
1404 #[test]
1405 fn test_with_store_mutations() {
1406 use grafeo_core::graph::lpg::LpgStore;
1407
1408 let store = Arc::new(LpgStore::new().unwrap());
1409 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1410 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1411
1412 let session = db.session();
1413 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1414
1415 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
1416 assert_eq!(result.rows.len(), 1);
1417
1418 assert_eq!(store.node_count(), 1);
1420 }
1421}