1mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21mod search;
22#[cfg(feature = "wal")]
23pub(crate) mod wal_store;
24
25#[cfg(feature = "wal")]
26use std::path::Path;
27use std::sync::Arc;
28use std::sync::atomic::AtomicUsize;
29
30use parking_lot::RwLock;
31
32#[cfg(feature = "wal")]
33use grafeo_adapters::storage::wal::{
34 DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
35};
36use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
37use grafeo_common::utils::error::Result;
38use grafeo_core::graph::GraphStoreMut;
39use grafeo_core::graph::lpg::LpgStore;
40#[cfg(feature = "rdf")]
41use grafeo_core::graph::rdf::RdfStore;
42
43use crate::catalog::Catalog;
44use crate::config::Config;
45use crate::query::cache::QueryCache;
46use crate::session::Session;
47use crate::transaction::TransactionManager;
48
/// Top-level handle to a Grafeo database.
///
/// Bundles the configuration, the graph store(s), the catalog, and the
/// supporting services (transaction manager, buffer manager, query cache)
/// that every session created through `session()` shares.
///
/// NOTE(review): field declaration order is also drop order; keep that in
/// mind before reordering fields.
pub struct GrafeoDB {
    /// Configuration supplied at construction time.
    pub(super) config: Config,
    /// Built-in LPG store. `with_store` fills this with a fresh, empty store
    /// and routes sessions to `external_store` instead.
    pub(super) store: Arc<LpgStore>,
    /// Schema catalog; WAL replay registers type, schema, and procedure
    /// definitions here.
    pub(super) catalog: Arc<Catalog>,
    /// RDF store (only with the `rdf` feature).
    #[cfg(feature = "rdf")]
    pub(super) rdf_store: Arc<RdfStore>,
    /// Transaction manager handed to every session.
    pub(super) transaction_manager: Arc<TransactionManager>,
    /// Buffer manager built from the configured memory limit and spill path.
    pub(super) buffer_manager: Arc<BufferManager>,
    /// Write-ahead log (only with the `wal` feature). `None` when WAL is
    /// disabled, no path is configured, or the database wraps an external store.
    #[cfg(feature = "wal")]
    pub(super) wal: Option<Arc<LpgWal>>,
    /// Query cache handed to every session.
    pub(super) query_cache: Arc<QueryCache>,
    /// Counter handed to every session; starts at zero.
    /// Presumably counts commits to drive periodic GC — TODO confirm.
    pub(super) commit_counter: Arc<AtomicUsize>,
    /// `true` until `close()` completes successfully.
    pub(super) is_open: RwLock<bool>,
    /// Change-data-capture log (only with the `cdc` feature).
    #[cfg(feature = "cdc")]
    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
    /// Embedding models keyed by string (only with the `embed` feature).
    /// Presumably the key is the model name — TODO confirm.
    #[cfg(feature = "embed")]
    pub(super) embedding_models:
        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
    /// External graph store installed by `with_store`. When present,
    /// `session()` and `graph_store()` use it instead of `store`.
    pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
}
105
106impl GrafeoDB {
107 #[must_use]
123 pub fn new_in_memory() -> Self {
124 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
125 }
126
127 #[cfg(feature = "wal")]
146 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
147 Self::with_config(Config::persistent(path.as_ref()))
148 }
149
150 pub fn with_config(config: Config) -> Result<Self> {
174 config
176 .validate()
177 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
178
179 let store = Arc::new(LpgStore::new()?);
180 #[cfg(feature = "rdf")]
181 let rdf_store = Arc::new(RdfStore::new());
182 let transaction_manager = Arc::new(TransactionManager::new());
183
184 let buffer_config = BufferManagerConfig {
186 budget: config.memory_limit.unwrap_or_else(|| {
187 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
188 }),
189 spill_path: config
190 .spill_path
191 .clone()
192 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
193 ..BufferManagerConfig::default()
194 };
195 let buffer_manager = BufferManager::new(buffer_config);
196
197 let catalog = Arc::new(Catalog::new());
199
200 #[cfg(feature = "wal")]
202 let wal = if config.wal_enabled {
203 if let Some(ref db_path) = config.path {
204 std::fs::create_dir_all(db_path)?;
206
207 let wal_path = db_path.join("wal");
208
209 if wal_path.exists() {
211 let recovery = WalRecovery::new(&wal_path);
212 let records = recovery.recover()?;
213 Self::apply_wal_records(&store, &catalog, &records)?;
214 }
215
216 let wal_durability = match config.wal_durability {
218 crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
219 crate::config::DurabilityMode::Batch {
220 max_delay_ms,
221 max_records,
222 } => WalDurabilityMode::Batch {
223 max_delay_ms,
224 max_records,
225 },
226 crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
227 WalDurabilityMode::Adaptive { target_interval_ms }
228 }
229 crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
230 };
231 let wal_config = WalConfig {
232 durability: wal_durability,
233 ..WalConfig::default()
234 };
235 let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
236 Some(Arc::new(wal_manager))
237 } else {
238 None
239 }
240 } else {
241 None
242 };
243
244 let query_cache = Arc::new(QueryCache::default());
246
247 Ok(Self {
248 config,
249 store,
250 catalog,
251 #[cfg(feature = "rdf")]
252 rdf_store,
253 transaction_manager,
254 buffer_manager,
255 #[cfg(feature = "wal")]
256 wal,
257 query_cache,
258 commit_counter: Arc::new(AtomicUsize::new(0)),
259 is_open: RwLock::new(true),
260 #[cfg(feature = "cdc")]
261 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
262 #[cfg(feature = "embed")]
263 embedding_models: RwLock::new(hashbrown::HashMap::new()),
264 external_store: None,
265 })
266 }
267
268 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
293 config
294 .validate()
295 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
296
297 let dummy_store = Arc::new(LpgStore::new()?);
298 let transaction_manager = Arc::new(TransactionManager::new());
299
300 let buffer_config = BufferManagerConfig {
301 budget: config.memory_limit.unwrap_or_else(|| {
302 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
303 }),
304 spill_path: None,
305 ..BufferManagerConfig::default()
306 };
307 let buffer_manager = BufferManager::new(buffer_config);
308
309 let query_cache = Arc::new(QueryCache::default());
310
311 Ok(Self {
312 config,
313 store: dummy_store,
314 catalog: Arc::new(Catalog::new()),
315 #[cfg(feature = "rdf")]
316 rdf_store: Arc::new(RdfStore::new()),
317 transaction_manager,
318 buffer_manager,
319 #[cfg(feature = "wal")]
320 wal: None,
321 query_cache,
322 commit_counter: Arc::new(AtomicUsize::new(0)),
323 is_open: RwLock::new(true),
324 #[cfg(feature = "cdc")]
325 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
326 #[cfg(feature = "embed")]
327 embedding_models: RwLock::new(hashbrown::HashMap::new()),
328 external_store: Some(store),
329 })
330 }
331
    /// Replays recovered WAL records, in slice order, against `store` and `catalog`.
    ///
    /// Data records (nodes, edges, properties, labels) are applied to `store`;
    /// schema records (types, graph types, schemas, procedures) are applied to
    /// `catalog`. Index, constraint, transaction-marker, and checkpoint records
    /// are matched but change nothing here.
    ///
    /// As written this always returns `Ok(())`: return values of the store
    /// calls are not inspected and catalog results are discarded with `let _`.
    ///
    /// NOTE(review): records are applied without consulting commit/abort
    /// markers; presumably `WalRecovery::recover` has already filtered out
    /// uncommitted work — confirm.
    #[cfg(feature = "wal")]
    fn apply_wal_records(store: &LpgStore, catalog: &Catalog, records: &[WalRecord]) -> Result<()> {
        use crate::catalog::{
            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
        };

        for record in records {
            match record {
                // --- Graph data -------------------------------------------------
                WalRecord::CreateNode { id, labels } => {
                    // Recreate the node under its logged id so later records
                    // that reference the id still resolve.
                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
                    store.create_node_with_id(*id, &label_refs);
                }
                WalRecord::DeleteNode { id } => {
                    store.delete_node(*id);
                }
                WalRecord::CreateEdge {
                    id,
                    src,
                    dst,
                    edge_type,
                } => {
                    store.create_edge_with_id(*id, *src, *dst, edge_type);
                }
                WalRecord::DeleteEdge { id } => {
                    store.delete_edge(*id);
                }
                WalRecord::SetNodeProperty { id, key, value } => {
                    store.set_node_property(*id, key, value.clone());
                }
                WalRecord::SetEdgeProperty { id, key, value } => {
                    store.set_edge_property(*id, key, value.clone());
                }
                WalRecord::AddNodeLabel { id, label } => {
                    store.add_label(*id, label);
                }
                WalRecord::RemoveNodeLabel { id, label } => {
                    store.remove_label(*id, label);
                }
                WalRecord::RemoveNodeProperty { id, key } => {
                    store.remove_node_property(*id, key);
                }
                WalRecord::RemoveEdgeProperty { id, key } => {
                    store.remove_edge_property(*id, key);
                }

                // --- Node / edge type definitions -------------------------------
                WalRecord::CreateNodeType {
                    name,
                    properties,
                    constraints,
                } => {
                    let def = NodeTypeDefinition {
                        name: name.clone(),
                        // Each logged property is (name, type name, nullable);
                        // default values are not restored.
                        properties: properties
                            .iter()
                            .map(|(n, t, nullable)| TypedProperty {
                                name: n.clone(),
                                data_type: PropertyDataType::from_type_name(t),
                                nullable: *nullable,
                                default_value: None,
                            })
                            .collect(),
                        // Each logged constraint is (kind, property names).
                        // Unknown kinds, and `not_null` with no properties,
                        // fall back to `Unique`.
                        constraints: constraints
                            .iter()
                            .map(|(kind, props)| match kind.as_str() {
                                "unique" => TypeConstraint::Unique(props.clone()),
                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
                                "not_null" if !props.is_empty() => {
                                    TypeConstraint::NotNull(props[0].clone())
                                }
                                _ => TypeConstraint::Unique(props.clone()),
                            })
                            .collect(),
                    };
                    // Registration result is deliberately ignored.
                    let _ = catalog.register_node_type(def);
                }
                WalRecord::DropNodeType { name } => {
                    let _ = catalog.drop_node_type(name);
                }
                WalRecord::CreateEdgeType {
                    name,
                    properties,
                    constraints,
                } => {
                    // Same property/constraint mapping as `CreateNodeType`.
                    let def = EdgeTypeDefinition {
                        name: name.clone(),
                        properties: properties
                            .iter()
                            .map(|(n, t, nullable)| TypedProperty {
                                name: n.clone(),
                                data_type: PropertyDataType::from_type_name(t),
                                nullable: *nullable,
                                default_value: None,
                            })
                            .collect(),
                        constraints: constraints
                            .iter()
                            .map(|(kind, props)| match kind.as_str() {
                                "unique" => TypeConstraint::Unique(props.clone()),
                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
                                "not_null" if !props.is_empty() => {
                                    TypeConstraint::NotNull(props[0].clone())
                                }
                                _ => TypeConstraint::Unique(props.clone()),
                            })
                            .collect(),
                    };
                    let _ = catalog.register_edge_type_def(def);
                }
                WalRecord::DropEdgeType { name } => {
                    let _ = catalog.drop_edge_type_def(name);
                }
                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
                    // Index records are not replayed here.
                }
                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
                    // Constraint records are not replayed here.
                }

                // --- Graph types and schemas ------------------------------------
                WalRecord::CreateGraphType {
                    name,
                    node_types,
                    edge_types,
                    open,
                } => {
                    use crate::catalog::GraphTypeDefinition;
                    let def = GraphTypeDefinition {
                        name: name.clone(),
                        allowed_node_types: node_types.clone(),
                        allowed_edge_types: edge_types.clone(),
                        open: *open,
                    };
                    let _ = catalog.register_graph_type(def);
                }
                WalRecord::DropGraphType { name } => {
                    let _ = catalog.drop_graph_type(name);
                }
                WalRecord::CreateSchema { name } => {
                    let _ = catalog.register_schema_namespace(name.clone());
                }
                WalRecord::DropSchema { name } => {
                    let _ = catalog.drop_schema_namespace(name);
                }

                // --- Type alterations -------------------------------------------
                WalRecord::AlterNodeType { name, alterations } => {
                    // Each alteration is (action, property name, type name,
                    // nullable); actions other than "add"/"drop" are skipped.
                    for (action, prop_name, type_name, nullable) in alterations {
                        match action.as_str() {
                            "add" => {
                                let prop = TypedProperty {
                                    name: prop_name.clone(),
                                    data_type: PropertyDataType::from_type_name(type_name),
                                    nullable: *nullable,
                                    default_value: None,
                                };
                                let _ = catalog.alter_node_type_add_property(name, prop);
                            }
                            "drop" => {
                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
                            }
                            _ => {}
                        }
                    }
                }
                WalRecord::AlterEdgeType { name, alterations } => {
                    // Same shape and handling as `AlterNodeType`.
                    for (action, prop_name, type_name, nullable) in alterations {
                        match action.as_str() {
                            "add" => {
                                let prop = TypedProperty {
                                    name: prop_name.clone(),
                                    data_type: PropertyDataType::from_type_name(type_name),
                                    nullable: *nullable,
                                    default_value: None,
                                };
                                let _ = catalog.alter_edge_type_add_property(name, prop);
                            }
                            "drop" => {
                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
                            }
                            _ => {}
                        }
                    }
                }
                WalRecord::AlterGraphType { name, alterations } => {
                    // Each alteration is (action, type name); unrecognized
                    // actions are skipped.
                    for (action, type_name) in alterations {
                        match action.as_str() {
                            "add_node" => {
                                let _ =
                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
                            }
                            "drop_node" => {
                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
                            }
                            "add_edge" => {
                                let _ =
                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
                            }
                            "drop_edge" => {
                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
                            }
                            _ => {}
                        }
                    }
                }

                // --- Procedures -------------------------------------------------
                WalRecord::CreateProcedure {
                    name,
                    params,
                    returns,
                    body,
                } => {
                    use crate::catalog::ProcedureDefinition;
                    let def = ProcedureDefinition {
                        name: name.clone(),
                        params: params.clone(),
                        returns: returns.clone(),
                        body: body.clone(),
                    };
                    let _ = catalog.register_procedure(def);
                }
                WalRecord::DropProcedure { name } => {
                    let _ = catalog.drop_procedure(name);
                }

                // --- Markers ----------------------------------------------------
                WalRecord::TransactionCommit { .. }
                | WalRecord::TransactionAbort { .. }
                | WalRecord::Checkpoint { .. } => {
                    // Transaction markers and checkpoints change no state here.
                }
            }
        }
        Ok(())
    }
567
568 #[must_use]
591 pub fn session(&self) -> Session {
592 if let Some(ref ext_store) = self.external_store {
593 return Session::with_external_store(
594 Arc::clone(ext_store),
595 Arc::clone(&self.transaction_manager),
596 Arc::clone(&self.query_cache),
597 Arc::clone(&self.catalog),
598 self.config.adaptive.clone(),
599 self.config.factorized_execution,
600 self.config.graph_model,
601 self.config.query_timeout,
602 Arc::clone(&self.commit_counter),
603 self.config.gc_interval,
604 );
605 }
606
607 #[cfg(feature = "rdf")]
608 let mut session = Session::with_rdf_store_and_adaptive(
609 Arc::clone(&self.store),
610 Arc::clone(&self.rdf_store),
611 Arc::clone(&self.transaction_manager),
612 Arc::clone(&self.query_cache),
613 Arc::clone(&self.catalog),
614 self.config.adaptive.clone(),
615 self.config.factorized_execution,
616 self.config.graph_model,
617 self.config.query_timeout,
618 Arc::clone(&self.commit_counter),
619 self.config.gc_interval,
620 );
621 #[cfg(not(feature = "rdf"))]
622 let mut session = Session::with_adaptive(
623 Arc::clone(&self.store),
624 Arc::clone(&self.transaction_manager),
625 Arc::clone(&self.query_cache),
626 Arc::clone(&self.catalog),
627 self.config.adaptive.clone(),
628 self.config.factorized_execution,
629 self.config.graph_model,
630 self.config.query_timeout,
631 Arc::clone(&self.commit_counter),
632 self.config.gc_interval,
633 );
634
635 #[cfg(feature = "wal")]
636 if let Some(ref wal) = self.wal {
637 session.set_wal(Arc::clone(wal));
638 }
639
640 #[cfg(feature = "cdc")]
641 session.set_cdc_log(Arc::clone(&self.cdc_log));
642
643 let _ = &mut session;
645
646 session
647 }
648
649 #[must_use]
651 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
652 &self.config.adaptive
653 }
654
655 #[must_use]
657 pub fn config(&self) -> &Config {
658 &self.config
659 }
660
661 #[must_use]
663 pub fn graph_model(&self) -> crate::config::GraphModel {
664 self.config.graph_model
665 }
666
667 #[must_use]
669 pub fn memory_limit(&self) -> Option<usize> {
670 self.config.memory_limit
671 }
672
673 #[must_use]
681 pub fn store(&self) -> &Arc<LpgStore> {
682 &self.store
683 }
684
685 pub fn create_graph(&self, name: &str) -> Result<bool> {
693 Ok(self.store.create_graph(name)?)
694 }
695
696 pub fn drop_graph(&self, name: &str) -> bool {
698 self.store.drop_graph(name)
699 }
700
701 #[must_use]
703 pub fn list_graphs(&self) -> Vec<String> {
704 self.store.graph_names()
705 }
706
707 #[must_use]
715 pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
716 if let Some(ref ext_store) = self.external_store {
717 Arc::clone(ext_store)
718 } else {
719 Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
720 }
721 }
722
723 pub fn gc(&self) {
729 let min_epoch = self.transaction_manager.min_active_epoch();
730 self.store.gc_versions(min_epoch);
731 self.transaction_manager.gc();
732 }
733
734 #[must_use]
736 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
737 &self.buffer_manager
738 }
739
740 #[must_use]
742 pub fn query_cache(&self) -> &Arc<QueryCache> {
743 &self.query_cache
744 }
745
746 pub fn close(&self) -> Result<()> {
760 let mut is_open = self.is_open.write();
761 if !*is_open {
762 return Ok(());
763 }
764
765 #[cfg(feature = "wal")]
767 if let Some(ref wal) = self.wal {
768 let epoch = self.store.current_epoch();
769
770 let checkpoint_tx = self
772 .transaction_manager
773 .last_assigned_transaction_id()
774 .unwrap_or_else(|| {
775 self.transaction_manager.begin()
777 });
778
779 wal.log(&WalRecord::TransactionCommit {
781 transaction_id: checkpoint_tx,
782 })?;
783
784 wal.checkpoint(checkpoint_tx, epoch)?;
786 wal.sync()?;
787 }
788
789 *is_open = false;
790 Ok(())
791 }
792
793 #[cfg(feature = "wal")]
795 #[must_use]
796 pub fn wal(&self) -> Option<&Arc<LpgWal>> {
797 self.wal.as_ref()
798 }
799
800 #[cfg(feature = "wal")]
802 pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
803 if let Some(ref wal) = self.wal {
804 wal.log(record)?;
805 }
806 Ok(())
807 }
808}
809
810impl Drop for GrafeoDB {
811 fn drop(&mut self) {
812 if let Err(e) = self.close() {
813 tracing::error!("Error closing database: {}", e);
814 }
815 }
816}
817
818impl crate::admin::AdminService for GrafeoDB {
819 fn info(&self) -> crate::admin::DatabaseInfo {
820 self.info()
821 }
822
823 fn detailed_stats(&self) -> crate::admin::DatabaseStats {
824 self.detailed_stats()
825 }
826
827 fn schema(&self) -> crate::admin::SchemaInfo {
828 self.schema()
829 }
830
831 fn validate(&self) -> crate::admin::ValidationResult {
832 self.validate()
833 }
834
835 fn wal_status(&self) -> crate::admin::WalStatus {
836 self.wal_status()
837 }
838
839 fn wal_checkpoint(&self) -> Result<()> {
840 self.wal_checkpoint()
841 }
842}
843
/// Result of executing a query: column metadata, rows, and optional metrics.
#[derive(Debug)]
pub struct QueryResult {
    /// Column names.
    pub columns: Vec<String>,
    /// Logical type of each column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// Result rows; presumably one value per column in each row — TODO confirm.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Execution time in milliseconds, when recorded.
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned, when recorded.
    pub rows_scanned: Option<u64>,
    /// Optional status message.
    pub status_message: Option<String>,
    /// GQL status attached to this result.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
890
891impl QueryResult {
892 #[must_use]
894 pub fn empty() -> Self {
895 Self {
896 columns: Vec::new(),
897 column_types: Vec::new(),
898 rows: Vec::new(),
899 execution_time_ms: None,
900 rows_scanned: None,
901 status_message: None,
902 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
903 }
904 }
905
906 #[must_use]
908 pub fn status(msg: impl Into<String>) -> Self {
909 Self {
910 columns: Vec::new(),
911 column_types: Vec::new(),
912 rows: Vec::new(),
913 execution_time_ms: None,
914 rows_scanned: None,
915 status_message: Some(msg.into()),
916 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
917 }
918 }
919
920 #[must_use]
922 pub fn new(columns: Vec<String>) -> Self {
923 let len = columns.len();
924 Self {
925 columns,
926 column_types: vec![grafeo_common::types::LogicalType::Any; len],
927 rows: Vec::new(),
928 execution_time_ms: None,
929 rows_scanned: None,
930 status_message: None,
931 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
932 }
933 }
934
935 #[must_use]
937 pub fn with_types(
938 columns: Vec<String>,
939 column_types: Vec<grafeo_common::types::LogicalType>,
940 ) -> Self {
941 Self {
942 columns,
943 column_types,
944 rows: Vec::new(),
945 execution_time_ms: None,
946 rows_scanned: None,
947 status_message: None,
948 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
949 }
950 }
951
952 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
954 self.execution_time_ms = Some(execution_time_ms);
955 self.rows_scanned = Some(rows_scanned);
956 self
957 }
958
959 #[must_use]
961 pub fn execution_time_ms(&self) -> Option<f64> {
962 self.execution_time_ms
963 }
964
965 #[must_use]
967 pub fn rows_scanned(&self) -> Option<u64> {
968 self.rows_scanned
969 }
970
971 #[must_use]
973 pub fn row_count(&self) -> usize {
974 self.rows.len()
975 }
976
977 #[must_use]
979 pub fn column_count(&self) -> usize {
980 self.columns.len()
981 }
982
983 #[must_use]
985 pub fn is_empty(&self) -> bool {
986 self.rows.is_empty()
987 }
988
989 pub fn scalar<T: FromValue>(&self) -> Result<T> {
998 if self.rows.len() != 1 || self.columns.len() != 1 {
999 return Err(grafeo_common::utils::error::Error::InvalidValue(
1000 "Expected single value".to_string(),
1001 ));
1002 }
1003 T::from_value(&self.rows[0][0])
1004 }
1005
1006 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1008 self.rows.iter()
1009 }
1010}
1011
/// Conversion from a borrowed query `Value` into a concrete Rust type.
///
/// Used by `QueryResult::scalar` to turn the single result value into `T`.
pub trait FromValue: Sized {
    /// Converts `value` into `Self`, or returns an error when it cannot be
    /// converted.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
1020
1021impl FromValue for i64 {
1022 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1023 value
1024 .as_int64()
1025 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1026 expected: "INT64".to_string(),
1027 found: value.type_name().to_string(),
1028 })
1029 }
1030}
1031
1032impl FromValue for f64 {
1033 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1034 value
1035 .as_float64()
1036 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1037 expected: "FLOAT64".to_string(),
1038 found: value.type_name().to_string(),
1039 })
1040 }
1041}
1042
1043impl FromValue for String {
1044 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1045 value.as_str().map(String::from).ok_or_else(|| {
1046 grafeo_common::utils::error::Error::TypeMismatch {
1047 expected: "STRING".to_string(),
1048 found: value.type_name().to_string(),
1049 }
1050 })
1051 }
1052}
1053
1054impl FromValue for bool {
1055 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1056 value
1057 .as_bool()
1058 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1059 expected: "BOOL".to_string(),
1060 found: value.type_name().to_string(),
1061 })
1062 }
1063}
1064
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh in-memory database starts with no nodes and no edges.
    #[test]
    fn test_create_in_memory_database() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
    }

    /// Builder-style config options are visible through `config()`.
    #[test]
    fn test_database_config() {
        let config = Config::in_memory().with_threads(4).with_query_logging();

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().threads, 4);
        assert!(db.config().query_logging);
    }

    /// Creating a session on an in-memory database does not panic.
    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        let _session = db.session();
    }

    /// Nodes and edges written before `close()` are present after reopening.
    #[cfg(feature = "wal")]
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_db");

        // First run: write two nodes and one edge, then close.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let alix = db.create_node(&["Person"]);
            db.set_node_property(alix, "name", Value::from("Alix"));

            let gus = db.create_node(&["Person"]);
            db.set_node_property(gus, "name", Value::from("Gus"));

            let _edge = db.create_edge(alix, gus, "KNOWS");

            db.close().unwrap();
        }

        // Second run: everything is recovered from the WAL.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(db.node_count(), 2);
            assert_eq!(db.edge_count(), 1);

            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node0.is_some());

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node1.is_some());
        }
    }

    /// Mutations on a persistent database produce WAL records.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        if let Some(wal) = db.wal() {
            assert!(wal.record_count() > 0);
        }

        db.close().unwrap();
    }

    /// Data accumulates correctly across several open/close cycles.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_recovery_multiple_sessions() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("multi_session_db");

        // Run 1: one node.
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            let alix = db.create_node(&["Person"]);
            db.set_node_property(alix, "name", Value::from("Alix"));
            db.close().unwrap();
        }

        // Run 2: the first node is recovered, then a second one is added.
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 1);
            let gus = db.create_node(&["Person"]);
            db.set_node_property(gus, "name", Value::from("Gus"));
            db.close().unwrap();
        }

        // Run 3: both nodes are recovered with their labels.
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 2);

            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
        }
    }

    /// Deletions are replayed too: a deleted node stays deleted after recovery.
    #[cfg(feature = "wal")]
    #[test]
    fn test_database_consistency_after_mutations() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("consistency_db");

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let a = db.create_node(&["Node"]);
            let b = db.create_node(&["Node"]);
            let c = db.create_node(&["Node"]);

            let e1 = db.create_edge(a, b, "LINKS");
            let _e2 = db.create_edge(b, c, "LINKS");

            db.delete_edge(e1);
            db.delete_node(b);

            db.set_node_property(a, "value", Value::Int64(1));
            db.set_node_property(c, "value", Value::Int64(3));

            db.close().unwrap();
        }

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node_a.is_some());

            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
            assert!(node_c.is_some());

            // `b` was deleted before the close.
            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node_b.is_none());
        }
    }

    /// Closing twice succeeds both times.
    #[cfg(feature = "wal")]
    #[test]
    fn test_close_is_idempotent() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("close_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();
        db.create_node(&["Test"]);

        // First close does the real work.
        assert!(db.close().is_ok());

        // Second close is a no-op.
        assert!(db.close().is_ok());
    }

    /// A query that matches rows reports timing and scan metrics.
    #[test]
    fn test_query_result_has_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert!(result.execution_time_ms.unwrap() >= 0.0);
            assert_eq!(result.rows_scanned.unwrap(), 2);
        }
    }

    /// A query that matches nothing still reports metrics, with zero rows scanned.
    #[test]
    fn test_empty_query_result_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert_eq!(result.rows_scanned.unwrap(), 0);
        }
    }

    #[cfg(feature = "cdc")]
    mod cdc_integration {
        use super::*;

        /// Create, two property updates, and delete yield four history entries.
        #[test]
        fn test_node_lifecycle_history() {
            let db = GrafeoDB::new_in_memory();

            let id = db.create_node(&["Person"]);
            db.set_node_property(id, "name", "Alix".into());
            db.set_node_property(id, "name", "Gus".into());
            db.delete_node(id);

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 4);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            // First write of the property: no previous value.
            assert!(history[1].before.is_none());
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
            // Second write: the previous value is captured.
            assert!(history[2].before.is_some());
            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
        }

        /// Create, one property update, and delete yield three history entries.
        #[test]
        fn test_edge_lifecycle_history() {
            let db = GrafeoDB::new_in_memory();

            let alix = db.create_node(&["Person"]);
            let gus = db.create_node(&["Person"]);
            let edge = db.create_edge(alix, gus, "KNOWS");
            db.set_edge_property(edge, "since", 2024i64.into());
            db.delete_edge(edge);

            let history = db.history(edge).unwrap();
            assert_eq!(history.len(), 3);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
        }

        /// Creating a node with properties records one `Create` carrying both.
        #[test]
        fn test_create_node_with_props_cdc() {
            let db = GrafeoDB::new_in_memory();

            let id = db.create_node_with_props(
                &["Person"],
                vec![
                    ("name", grafeo_common::types::Value::from("Alix")),
                    ("age", grafeo_common::types::Value::from(30i64)),
                ],
            );

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 1);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            let after = history[0].after.as_ref().unwrap();
            assert_eq!(after.len(), 2);
        }

        /// The full epoch range returns every recorded change.
        #[test]
        fn test_changes_between() {
            let db = GrafeoDB::new_in_memory();

            let id1 = db.create_node(&["A"]);
            let _id2 = db.create_node(&["B"]);
            db.set_node_property(id1, "x", 1i64.into());

            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            // Two creates plus one property update.
            assert_eq!(changes.len(), 3);
        }
    }

    /// Data already in an external store is visible to queries.
    ///
    /// Gated on `gql` like the other tests in this module that run queries.
    #[cfg(feature = "gql")]
    #[test]
    fn test_with_store_basic() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let n1 = store.create_node(&["Person"]);
        store.set_node_property(n1, "name", "Alix".into());

        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 1);
    }

    /// Sessions created on an external-store database can run queries.
    #[cfg(feature = "gql")]
    #[test]
    fn test_with_store_session() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let session = db.session();
        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
        assert_eq!(result.rows.len(), 1);
    }

    /// Mutations made through a session land in the external store itself.
    #[cfg(feature = "gql")]
    #[test]
    fn test_with_store_mutations() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let session = db.session();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 1);

        // The caller's handle to the store sees the inserted node.
        assert_eq!(store.node_count(), 1);
    }
}