1mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21mod search;
22
23#[cfg(feature = "wal")]
24use std::path::Path;
25use std::sync::Arc;
26use std::sync::atomic::AtomicUsize;
27
28use parking_lot::RwLock;
29
30#[cfg(feature = "wal")]
31use grafeo_adapters::storage::wal::{
32 DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
33};
34use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
35use grafeo_common::utils::error::Result;
36use grafeo_core::graph::GraphStoreMut;
37use grafeo_core::graph::lpg::LpgStore;
38#[cfg(feature = "rdf")]
39use grafeo_core::graph::rdf::RdfStore;
40
41use crate::catalog::Catalog;
42use crate::config::Config;
43use crate::query::cache::QueryCache;
44use crate::session::Session;
45use crate::transaction::TransactionManager;
46
/// Top-level database handle.
///
/// Owns the graph store(s), catalog, transaction manager, buffer manager and
/// query cache, and hands out [`Session`]s that share them.
pub struct GrafeoDB {
    /// Configuration the database was constructed with.
    pub(super) config: Config,
    /// Built-in LPG store. When the database was created through
    /// `with_store`, this is a freshly created, empty store.
    pub(super) store: Arc<LpgStore>,
    /// Schema catalog shared with every session.
    pub(super) catalog: Arc<Catalog>,
    /// RDF store; only present when the `rdf` feature is enabled.
    #[cfg(feature = "rdf")]
    pub(super) rdf_store: Arc<RdfStore>,
    /// Transaction manager shared with every session.
    pub(super) tx_manager: Arc<TransactionManager>,
    /// Buffer manager built from the configured memory limit and spill path.
    pub(super) buffer_manager: Arc<BufferManager>,
    /// Write-ahead log; `None` when WAL is disabled, when the configuration
    /// has no path, or when an external store is used.
    #[cfg(feature = "wal")]
    pub(super) wal: Option<Arc<LpgWal>>,
    /// Query cache shared with every session.
    pub(super) query_cache: Arc<QueryCache>,
    /// Counter handed to every session; starts at zero.
    pub(super) commit_counter: Arc<AtomicUsize>,
    /// `true` from construction until `close` completes successfully.
    pub(super) is_open: RwLock<bool>,
    /// Change-data-capture log; only present with the `cdc` feature.
    #[cfg(feature = "cdc")]
    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
    /// Registered embedding models (presumably keyed by model name — confirm
    /// against the `embed` submodule); only present with the `embed` feature.
    #[cfg(feature = "embed")]
    pub(super) embedding_models:
        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
    /// Caller-supplied store. When set, `session` and `graph_store` use it
    /// instead of the built-in `store`.
    pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
}
103
104impl GrafeoDB {
105 #[must_use]
121 pub fn new_in_memory() -> Self {
122 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
123 }
124
125 #[cfg(feature = "wal")]
144 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
145 Self::with_config(Config::persistent(path.as_ref()))
146 }
147
148 pub fn with_config(config: Config) -> Result<Self> {
172 config
174 .validate()
175 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
176
177 let store = Arc::new(LpgStore::new());
178 #[cfg(feature = "rdf")]
179 let rdf_store = Arc::new(RdfStore::new());
180 let tx_manager = Arc::new(TransactionManager::new());
181
182 let buffer_config = BufferManagerConfig {
184 budget: config.memory_limit.unwrap_or_else(|| {
185 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
186 }),
187 spill_path: config
188 .spill_path
189 .clone()
190 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
191 ..BufferManagerConfig::default()
192 };
193 let buffer_manager = BufferManager::new(buffer_config);
194
195 let catalog = Arc::new(Catalog::new());
197
198 #[cfg(feature = "wal")]
200 let wal = if config.wal_enabled {
201 if let Some(ref db_path) = config.path {
202 std::fs::create_dir_all(db_path)?;
204
205 let wal_path = db_path.join("wal");
206
207 if wal_path.exists() {
209 let recovery = WalRecovery::new(&wal_path);
210 let records = recovery.recover()?;
211 Self::apply_wal_records(&store, &catalog, &records)?;
212 }
213
214 let wal_durability = match config.wal_durability {
216 crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
217 crate::config::DurabilityMode::Batch {
218 max_delay_ms,
219 max_records,
220 } => WalDurabilityMode::Batch {
221 max_delay_ms,
222 max_records,
223 },
224 crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
225 WalDurabilityMode::Adaptive { target_interval_ms }
226 }
227 crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
228 };
229 let wal_config = WalConfig {
230 durability: wal_durability,
231 ..WalConfig::default()
232 };
233 let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
234 Some(Arc::new(wal_manager))
235 } else {
236 None
237 }
238 } else {
239 None
240 };
241
242 let query_cache = Arc::new(QueryCache::default());
244
245 Ok(Self {
246 config,
247 store,
248 catalog,
249 #[cfg(feature = "rdf")]
250 rdf_store,
251 tx_manager,
252 buffer_manager,
253 #[cfg(feature = "wal")]
254 wal,
255 query_cache,
256 commit_counter: Arc::new(AtomicUsize::new(0)),
257 is_open: RwLock::new(true),
258 #[cfg(feature = "cdc")]
259 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
260 #[cfg(feature = "embed")]
261 embedding_models: RwLock::new(hashbrown::HashMap::new()),
262 external_store: None,
263 })
264 }
265
266 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
291 config
292 .validate()
293 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
294
295 let dummy_store = Arc::new(LpgStore::new());
296 let tx_manager = Arc::new(TransactionManager::new());
297
298 let buffer_config = BufferManagerConfig {
299 budget: config.memory_limit.unwrap_or_else(|| {
300 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
301 }),
302 spill_path: None,
303 ..BufferManagerConfig::default()
304 };
305 let buffer_manager = BufferManager::new(buffer_config);
306
307 let query_cache = Arc::new(QueryCache::default());
308
309 Ok(Self {
310 config,
311 store: dummy_store,
312 catalog: Arc::new(Catalog::new()),
313 #[cfg(feature = "rdf")]
314 rdf_store: Arc::new(RdfStore::new()),
315 tx_manager,
316 buffer_manager,
317 #[cfg(feature = "wal")]
318 wal: None,
319 query_cache,
320 commit_counter: Arc::new(AtomicUsize::new(0)),
321 is_open: RwLock::new(true),
322 #[cfg(feature = "cdc")]
323 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
324 #[cfg(feature = "embed")]
325 embedding_models: RwLock::new(hashbrown::HashMap::new()),
326 external_store: Some(store),
327 })
328 }
329
    /// Replays recovered WAL `records`, in order, into `store` and `catalog`.
    ///
    /// Data records (nodes, edges, properties, labels) are applied to `store`;
    /// schema records (types, graph types, schemas, procedures) are applied to
    /// `catalog`. Results of catalog calls are discarded, so a catalog failure
    /// does not abort the replay. Index, constraint, transaction-marker and
    /// checkpoint records are skipped.
    ///
    /// As written, this function always returns `Ok(())`.
    #[cfg(feature = "wal")]
    fn apply_wal_records(store: &LpgStore, catalog: &Catalog, records: &[WalRecord]) -> Result<()> {
        use crate::catalog::{
            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
        };

        for record in records {
            match record {
                // --- Graph data ---
                WalRecord::CreateNode { id, labels } => {
                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
                    // Recreate the node under its logged id.
                    store.create_node_with_id(*id, &label_refs);
                }
                WalRecord::DeleteNode { id } => {
                    store.delete_node(*id);
                }
                WalRecord::CreateEdge {
                    id,
                    src,
                    dst,
                    edge_type,
                } => {
                    store.create_edge_with_id(*id, *src, *dst, edge_type);
                }
                WalRecord::DeleteEdge { id } => {
                    store.delete_edge(*id);
                }
                WalRecord::SetNodeProperty { id, key, value } => {
                    store.set_node_property(*id, key, value.clone());
                }
                WalRecord::SetEdgeProperty { id, key, value } => {
                    store.set_edge_property(*id, key, value.clone());
                }
                WalRecord::AddNodeLabel { id, label } => {
                    store.add_label(*id, label);
                }
                WalRecord::RemoveNodeLabel { id, label } => {
                    store.remove_label(*id, label);
                }

                // --- Schema: node and edge types ---
                WalRecord::CreateNodeType {
                    name,
                    properties,
                    constraints,
                } => {
                    let def = NodeTypeDefinition {
                        name: name.clone(),
                        // Each logged property is a (name, type name, nullable) triple;
                        // default values are not restored.
                        properties: properties
                            .iter()
                            .map(|(n, t, nullable)| TypedProperty {
                                name: n.clone(),
                                data_type: PropertyDataType::from_type_name(t),
                                nullable: *nullable,
                                default_value: None,
                            })
                            .collect(),
                        // Each logged constraint is a (kind, property names) pair.
                        constraints: constraints
                            .iter()
                            .map(|(kind, props)| match kind.as_str() {
                                "unique" => TypeConstraint::Unique(props.clone()),
                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
                                // NotNull takes a single property: the first one listed.
                                "not_null" if !props.is_empty() => {
                                    TypeConstraint::NotNull(props[0].clone())
                                }
                                // NOTE(review): unrecognized kinds, and `not_null` with no
                                // properties, become `Unique` — confirm this fallback is intended.
                                _ => TypeConstraint::Unique(props.clone()),
                            })
                            .collect(),
                    };
                    // Result discarded: a registration failure does not stop the replay.
                    let _ = catalog.register_node_type(def);
                }
                WalRecord::DropNodeType { name } => {
                    let _ = catalog.drop_node_type(name);
                }
                WalRecord::CreateEdgeType {
                    name,
                    properties,
                    constraints,
                } => {
                    // Same conversion as for node types, producing an edge type definition.
                    let def = EdgeTypeDefinition {
                        name: name.clone(),
                        properties: properties
                            .iter()
                            .map(|(n, t, nullable)| TypedProperty {
                                name: n.clone(),
                                data_type: PropertyDataType::from_type_name(t),
                                nullable: *nullable,
                                default_value: None,
                            })
                            .collect(),
                        constraints: constraints
                            .iter()
                            .map(|(kind, props)| match kind.as_str() {
                                "unique" => TypeConstraint::Unique(props.clone()),
                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
                                "not_null" if !props.is_empty() => {
                                    TypeConstraint::NotNull(props[0].clone())
                                }
                                // NOTE(review): same `Unique` fallback as for node types.
                                _ => TypeConstraint::Unique(props.clone()),
                            })
                            .collect(),
                    };
                    let _ = catalog.register_edge_type_def(def);
                }
                WalRecord::DropEdgeType { name } => {
                    let _ = catalog.drop_edge_type_def(name);
                }
                // Index records are not acted on during replay.
                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
                }
                // Constraint records are not acted on during replay.
                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
                }
                // --- Schema: graph types and schema namespaces ---
                WalRecord::CreateGraphType {
                    name,
                    node_types,
                    edge_types,
                    open,
                } => {
                    use crate::catalog::GraphTypeDefinition;
                    let def = GraphTypeDefinition {
                        name: name.clone(),
                        allowed_node_types: node_types.clone(),
                        allowed_edge_types: edge_types.clone(),
                        open: *open,
                    };
                    let _ = catalog.register_graph_type(def);
                }
                WalRecord::DropGraphType { name } => {
                    let _ = catalog.drop_graph_type(name);
                }
                WalRecord::CreateSchema { name } => {
                    let _ = catalog.register_schema_namespace(name.clone());
                }
                WalRecord::DropSchema { name } => {
                    let _ = catalog.drop_schema_namespace(name);
                }

                // --- Schema alterations ---
                // Each alteration is (action, property name, type name, nullable);
                // only "add" and "drop" actions are applied, anything else is ignored.
                WalRecord::AlterNodeType { name, alterations } => {
                    for (action, prop_name, type_name, nullable) in alterations {
                        match action.as_str() {
                            "add" => {
                                let prop = TypedProperty {
                                    name: prop_name.clone(),
                                    data_type: PropertyDataType::from_type_name(type_name),
                                    nullable: *nullable,
                                    default_value: None,
                                };
                                let _ = catalog.alter_node_type_add_property(name, prop);
                            }
                            "drop" => {
                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
                            }
                            _ => {}
                        }
                    }
                }
                WalRecord::AlterEdgeType { name, alterations } => {
                    for (action, prop_name, type_name, nullable) in alterations {
                        match action.as_str() {
                            "add" => {
                                let prop = TypedProperty {
                                    name: prop_name.clone(),
                                    data_type: PropertyDataType::from_type_name(type_name),
                                    nullable: *nullable,
                                    default_value: None,
                                };
                                let _ = catalog.alter_edge_type_add_property(name, prop);
                            }
                            "drop" => {
                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
                            }
                            _ => {}
                        }
                    }
                }
                // Each alteration is (action, type name); unknown actions are ignored.
                WalRecord::AlterGraphType { name, alterations } => {
                    for (action, type_name) in alterations {
                        match action.as_str() {
                            "add_node" => {
                                let _ =
                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
                            }
                            "drop_node" => {
                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
                            }
                            "add_edge" => {
                                let _ =
                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
                            }
                            "drop_edge" => {
                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
                            }
                            _ => {}
                        }
                    }
                }

                // --- Procedures ---
                WalRecord::CreateProcedure {
                    name,
                    params,
                    returns,
                    body,
                } => {
                    use crate::catalog::ProcedureDefinition;
                    let def = ProcedureDefinition {
                        name: name.clone(),
                        params: params.clone(),
                        returns: returns.clone(),
                        body: body.clone(),
                    };
                    let _ = catalog.register_procedure(def);
                }
                WalRecord::DropProcedure { name } => {
                    let _ = catalog.drop_procedure(name);
                }

                // Transaction markers and checkpoints carry no state to apply here.
                // NOTE(review): records are applied regardless of these markers —
                // presumably the recovery step already filters uncommitted work; confirm.
                WalRecord::TxCommit { .. }
                | WalRecord::TxAbort { .. }
                | WalRecord::Checkpoint { .. } => {
                }
            }
        }
        Ok(())
    }
559
560 #[must_use]
583 pub fn session(&self) -> Session {
584 if let Some(ref ext_store) = self.external_store {
585 return Session::with_external_store(
586 Arc::clone(ext_store),
587 Arc::clone(&self.tx_manager),
588 Arc::clone(&self.query_cache),
589 Arc::clone(&self.catalog),
590 self.config.adaptive.clone(),
591 self.config.factorized_execution,
592 self.config.graph_model,
593 self.config.query_timeout,
594 Arc::clone(&self.commit_counter),
595 self.config.gc_interval,
596 );
597 }
598
599 #[cfg(feature = "rdf")]
600 let mut session = Session::with_rdf_store_and_adaptive(
601 Arc::clone(&self.store),
602 Arc::clone(&self.rdf_store),
603 Arc::clone(&self.tx_manager),
604 Arc::clone(&self.query_cache),
605 Arc::clone(&self.catalog),
606 self.config.adaptive.clone(),
607 self.config.factorized_execution,
608 self.config.graph_model,
609 self.config.query_timeout,
610 Arc::clone(&self.commit_counter),
611 self.config.gc_interval,
612 );
613 #[cfg(not(feature = "rdf"))]
614 let mut session = Session::with_adaptive(
615 Arc::clone(&self.store),
616 Arc::clone(&self.tx_manager),
617 Arc::clone(&self.query_cache),
618 Arc::clone(&self.catalog),
619 self.config.adaptive.clone(),
620 self.config.factorized_execution,
621 self.config.graph_model,
622 self.config.query_timeout,
623 Arc::clone(&self.commit_counter),
624 self.config.gc_interval,
625 );
626
627 #[cfg(feature = "wal")]
628 if let Some(ref wal) = self.wal {
629 session.set_wal(Arc::clone(wal));
630 }
631
632 #[cfg(feature = "cdc")]
633 session.set_cdc_log(Arc::clone(&self.cdc_log));
634
635 let _ = &mut session;
637
638 session
639 }
640
    /// Returns the adaptive-execution section of the database configuration.
    #[must_use]
    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
        &self.config.adaptive
    }
646
    /// Returns the configuration this database was created with.
    #[must_use]
    pub fn config(&self) -> &Config {
        &self.config
    }
652
    /// Returns the graph model selected in the configuration.
    #[must_use]
    pub fn graph_model(&self) -> crate::config::GraphModel {
        self.config.graph_model
    }
658
    /// Returns the configured memory limit, or `None` when none was set.
    ///
    /// This is the configured value, not the budget the buffer manager
    /// actually received.
    #[must_use]
    pub fn memory_limit(&self) -> Option<usize> {
        self.config.memory_limit
    }
664
    /// Returns the built-in LPG store.
    ///
    /// For a database created with `with_store` this is the empty built-in
    /// store, not the external one; use `graph_store` to get the store that
    /// sessions operate on.
    #[must_use]
    pub fn store(&self) -> &Arc<LpgStore> {
        &self.store
    }
676
    /// Asks the built-in LPG store to create a graph called `name` and
    /// returns the store's result (presumably `true` when a new graph was
    /// created — confirm against `LpgStore::create_graph`).
    pub fn create_graph(&self, name: &str) -> bool {
        self.store.create_graph(name)
    }
683
    /// Asks the built-in LPG store to drop the graph called `name` and
    /// returns the store's result (presumably `true` when a graph was
    /// removed — confirm against `LpgStore::drop_graph`).
    pub fn drop_graph(&self, name: &str) -> bool {
        self.store.drop_graph(name)
    }
688
    /// Returns the graph names reported by the built-in LPG store.
    #[must_use]
    pub fn list_graphs(&self) -> Vec<String> {
        self.store.graph_names()
    }
694
695 #[must_use]
703 pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
704 if let Some(ref ext_store) = self.external_store {
705 Arc::clone(ext_store)
706 } else {
707 Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
708 }
709 }
710
711 pub fn gc(&self) {
717 let min_epoch = self.tx_manager.min_active_epoch();
718 self.store.gc_versions(min_epoch);
719 self.tx_manager.gc();
720 }
721
    /// Returns the buffer manager owned by this database.
    #[must_use]
    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
        &self.buffer_manager
    }
727
    /// Returns the query cache shared by this database's sessions.
    #[must_use]
    pub fn query_cache(&self) -> &Arc<QueryCache> {
        &self.query_cache
    }
733
734 pub fn close(&self) -> Result<()> {
748 let mut is_open = self.is_open.write();
749 if !*is_open {
750 return Ok(());
751 }
752
753 #[cfg(feature = "wal")]
755 if let Some(ref wal) = self.wal {
756 let epoch = self.store.current_epoch();
757
758 let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
760 self.tx_manager.begin()
762 });
763
764 wal.log(&WalRecord::TxCommit {
766 tx_id: checkpoint_tx,
767 })?;
768
769 wal.checkpoint(checkpoint_tx, epoch)?;
771 wal.sync()?;
772 }
773
774 *is_open = false;
775 Ok(())
776 }
777
    /// Returns the attached write-ahead log, or `None` when there is none.
    ///
    /// Only available with the `wal` feature.
    #[cfg(feature = "wal")]
    #[must_use]
    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
        self.wal.as_ref()
    }
784
785 #[cfg(feature = "wal")]
787 pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
788 if let Some(ref wal) = self.wal {
789 wal.log(record)?;
790 }
791 Ok(())
792 }
793}
794
795impl Drop for GrafeoDB {
796 fn drop(&mut self) {
797 if let Err(e) = self.close() {
798 tracing::error!("Error closing database: {}", e);
799 }
800 }
801}
802
803impl crate::admin::AdminService for GrafeoDB {
804 fn info(&self) -> crate::admin::DatabaseInfo {
805 self.info()
806 }
807
808 fn detailed_stats(&self) -> crate::admin::DatabaseStats {
809 self.detailed_stats()
810 }
811
812 fn schema(&self) -> crate::admin::SchemaInfo {
813 self.schema()
814 }
815
816 fn validate(&self) -> crate::admin::ValidationResult {
817 self.validate()
818 }
819
820 fn wal_status(&self) -> crate::admin::WalStatus {
821 self.wal_status()
822 }
823
824 fn wal_checkpoint(&self) -> Result<()> {
825 self.wal_checkpoint()
826 }
827}
828
/// Result of executing a query: column metadata, rows, and optional
/// execution metrics or status text.
#[derive(Debug)]
pub struct QueryResult {
    /// Column names.
    pub columns: Vec<String>,
    /// Logical type of each column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// Result rows; each row is a list of values.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Execution time in milliseconds, when recorded.
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned, when recorded.
    pub rows_scanned: Option<u64>,
    /// Optional status message (set by `QueryResult::status`).
    pub status_message: Option<String>,
}
873
874impl QueryResult {
875 #[must_use]
877 pub fn empty() -> Self {
878 Self {
879 columns: Vec::new(),
880 column_types: Vec::new(),
881 rows: Vec::new(),
882 execution_time_ms: None,
883 rows_scanned: None,
884 status_message: None,
885 }
886 }
887
888 #[must_use]
890 pub fn status(msg: impl Into<String>) -> Self {
891 Self {
892 columns: Vec::new(),
893 column_types: Vec::new(),
894 rows: Vec::new(),
895 execution_time_ms: None,
896 rows_scanned: None,
897 status_message: Some(msg.into()),
898 }
899 }
900
901 #[must_use]
903 pub fn new(columns: Vec<String>) -> Self {
904 let len = columns.len();
905 Self {
906 columns,
907 column_types: vec![grafeo_common::types::LogicalType::Any; len],
908 rows: Vec::new(),
909 execution_time_ms: None,
910 rows_scanned: None,
911 status_message: None,
912 }
913 }
914
915 #[must_use]
917 pub fn with_types(
918 columns: Vec<String>,
919 column_types: Vec<grafeo_common::types::LogicalType>,
920 ) -> Self {
921 Self {
922 columns,
923 column_types,
924 rows: Vec::new(),
925 execution_time_ms: None,
926 rows_scanned: None,
927 status_message: None,
928 }
929 }
930
931 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
933 self.execution_time_ms = Some(execution_time_ms);
934 self.rows_scanned = Some(rows_scanned);
935 self
936 }
937
938 #[must_use]
940 pub fn execution_time_ms(&self) -> Option<f64> {
941 self.execution_time_ms
942 }
943
944 #[must_use]
946 pub fn rows_scanned(&self) -> Option<u64> {
947 self.rows_scanned
948 }
949
950 #[must_use]
952 pub fn row_count(&self) -> usize {
953 self.rows.len()
954 }
955
956 #[must_use]
958 pub fn column_count(&self) -> usize {
959 self.columns.len()
960 }
961
962 #[must_use]
964 pub fn is_empty(&self) -> bool {
965 self.rows.is_empty()
966 }
967
968 pub fn scalar<T: FromValue>(&self) -> Result<T> {
977 if self.rows.len() != 1 || self.columns.len() != 1 {
978 return Err(grafeo_common::utils::error::Error::InvalidValue(
979 "Expected single value".to_string(),
980 ));
981 }
982 T::from_value(&self.rows[0][0])
983 }
984
985 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
987 self.rows.iter()
988 }
989}
990
/// Conversion from a query result `Value` into a concrete Rust type.
///
/// Used by `QueryResult::scalar` to extract a typed value.
pub trait FromValue: Sized {
    /// Converts `value` into `Self`, or returns an error when it cannot be
    /// converted.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
999
1000impl FromValue for i64 {
1001 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1002 value
1003 .as_int64()
1004 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1005 expected: "INT64".to_string(),
1006 found: value.type_name().to_string(),
1007 })
1008 }
1009}
1010
1011impl FromValue for f64 {
1012 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1013 value
1014 .as_float64()
1015 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1016 expected: "FLOAT64".to_string(),
1017 found: value.type_name().to_string(),
1018 })
1019 }
1020}
1021
1022impl FromValue for String {
1023 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1024 value.as_str().map(String::from).ok_or_else(|| {
1025 grafeo_common::utils::error::Error::TypeMismatch {
1026 expected: "STRING".to_string(),
1027 found: value.type_name().to_string(),
1028 }
1029 })
1030 }
1031}
1032
1033impl FromValue for bool {
1034 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1035 value
1036 .as_bool()
1037 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1038 expected: "BOOL".to_string(),
1039 found: value.type_name().to_string(),
1040 })
1041 }
1042}
1043
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh in-memory database holds no nodes or edges.
    #[test]
    fn test_create_in_memory_database() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
    }

    /// Options set on the `Config` are visible through `db.config()`.
    #[test]
    fn test_database_config() {
        let config = Config::in_memory().with_threads(4).with_query_logging();

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().threads, 4);
        assert!(db.config().query_logging);
    }

    /// Creating a session on an in-memory database does not panic.
    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        let _session = db.session();
    }

    /// Nodes and edges written before `close` are present after reopening.
    #[cfg(feature = "wal")]
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_db");

        // First run: write data and close.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));

            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));

            let _edge = db.create_edge(alice, bob, "KNOWS");

            db.close().unwrap();
        }

        // Second run: reopen and check the data was recovered.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(db.node_count(), 2);
            assert_eq!(db.edge_count(), 1);

            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node0.is_some());

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node1.is_some());
        }
    }

    /// Mutations on a persistent database produce WAL records.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        if let Some(wal) = db.wal() {
            assert!(wal.record_count() > 0);
        }

        db.close().unwrap();
    }

    /// Data accumulates correctly across several open/close cycles.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_recovery_multiple_sessions() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("multi_session_db");

        // First run: one node.
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));
            db.close().unwrap();
        }

        // Second run: the first node is recovered, then a second is added.
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 1);
            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));
            db.close().unwrap();
        }

        // Third run: both nodes are recovered with their labels.
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 2);

            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
        }
    }

    /// Deletions are replayed on recovery: deleted nodes stay deleted.
    #[cfg(feature = "wal")]
    #[test]
    fn test_database_consistency_after_mutations() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("consistency_db");

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let a = db.create_node(&["Node"]);
            let b = db.create_node(&["Node"]);
            let c = db.create_node(&["Node"]);

            let e1 = db.create_edge(a, b, "LINKS");
            let _e2 = db.create_edge(b, c, "LINKS");

            db.delete_edge(e1);
            db.delete_node(b);

            db.set_node_property(a, "value", Value::Int64(1));
            db.set_node_property(c, "value", Value::Int64(3));

            db.close().unwrap();
        }

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node_a.is_some());

            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
            assert!(node_c.is_some());

            // The middle node was deleted before close.
            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node_b.is_none());
        }
    }

    /// Calling `close` twice succeeds both times.
    #[cfg(feature = "wal")]
    #[test]
    fn test_close_is_idempotent() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("close_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();
        db.create_node(&["Test"]);

        assert!(db.close().is_ok());

        assert!(db.close().is_ok());
    }

    /// A query that matches rows reports execution metrics.
    #[test]
    fn test_query_result_has_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert!(result.execution_time_ms.unwrap() >= 0.0);
            assert_eq!(result.rows_scanned.unwrap(), 2);
        }
    }

    /// A query that matches nothing still reports metrics, with zero rows scanned.
    #[test]
    fn test_empty_query_result_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert_eq!(result.rows_scanned.unwrap(), 0);
        }
    }

    #[cfg(feature = "cdc")]
    mod cdc_integration {
        use super::*;

        /// Create, two property updates and a delete yield four history events.
        #[test]
        fn test_node_lifecycle_history() {
            let db = GrafeoDB::new_in_memory();

            let id = db.create_node(&["Person"]);
            db.set_node_property(id, "name", "Alice".into());
            db.set_node_property(id, "name", "Bob".into());
            db.delete_node(id);

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 4);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            // The first write of the property has no previous value.
            assert!(history[1].before.is_none());
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
            // The second write records the value it replaced.
            assert!(history[2].before.is_some());
            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
        }

        /// Create, one property update and a delete yield three history events.
        #[test]
        fn test_edge_lifecycle_history() {
            let db = GrafeoDB::new_in_memory();

            let alice = db.create_node(&["Person"]);
            let bob = db.create_node(&["Person"]);
            let edge = db.create_edge(alice, bob, "KNOWS");
            db.set_edge_property(edge, "since", 2024i64.into());
            db.delete_edge(edge);

            let history = db.history(edge).unwrap();
            assert_eq!(history.len(), 3);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
        }

        /// Creating a node with properties records a single create event
        /// carrying both properties.
        #[test]
        fn test_create_node_with_props_cdc() {
            let db = GrafeoDB::new_in_memory();

            let id = db.create_node_with_props(
                &["Person"],
                vec![
                    ("name", grafeo_common::types::Value::from("Alice")),
                    ("age", grafeo_common::types::Value::from(30i64)),
                ],
            );

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 1);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            let after = history[0].after.as_ref().unwrap();
            assert_eq!(after.len(), 2);
        }

        /// Querying the full epoch range returns every recorded change.
        #[test]
        fn test_changes_between() {
            let db = GrafeoDB::new_in_memory();

            let id1 = db.create_node(&["A"]);
            let _id2 = db.create_node(&["B"]);
            db.set_node_property(id1, "x", 1i64.into());

            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            // Two creates plus one property update.
            assert_eq!(changes.len(), 3);
        }
    }

    /// Data already in an external store is visible to queries.
    // Gated on `gql` like the other query tests: the test runs a GQL query.
    #[cfg(feature = "gql")]
    #[test]
    fn test_with_store_basic() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new());
        let n1 = store.create_node(&["Person"]);
        store.set_node_property(n1, "name", "Alice".into());

        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 1);
    }

    /// Sessions created on an external-store database can run queries.
    // Gated on `gql` like the other query tests: the test runs a GQL query.
    #[cfg(feature = "gql")]
    #[test]
    fn test_with_store_session() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new());
        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let session = db.session();
        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
        assert_eq!(result.rows.len(), 1);
    }

    /// Mutations made through a session land in the external store itself.
    // Gated on `gql` like the other query tests: the test runs GQL statements.
    #[cfg(feature = "gql")]
    #[test]
    fn test_with_store_mutations() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new());
        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let session = db.session();
        session.execute("INSERT (:Person {name: 'Alice'})").unwrap();

        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 1);

        // The caller's handle sees the inserted node.
        assert_eq!(store.node_count(), 1);
    }
}