1mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21mod search;
22#[cfg(feature = "wal")]
23pub(crate) mod wal_store;
24
25#[cfg(feature = "wal")]
26use std::path::Path;
27use std::sync::Arc;
28use std::sync::atomic::AtomicUsize;
29
30use parking_lot::RwLock;
31
32#[cfg(feature = "wal")]
33use grafeo_adapters::storage::wal::{
34 DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
35};
36use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
37use grafeo_common::utils::error::Result;
38use grafeo_core::graph::GraphStoreMut;
39use grafeo_core::graph::lpg::LpgStore;
40#[cfg(feature = "rdf")]
41use grafeo_core::graph::rdf::RdfStore;
42
43use crate::catalog::Catalog;
44use crate::config::Config;
45use crate::query::cache::QueryCache;
46use crate::session::Session;
47use crate::transaction::TransactionManager;
48
49pub struct GrafeoDB {
72 pub(super) config: Config,
74 pub(super) store: Arc<LpgStore>,
76 pub(super) catalog: Arc<Catalog>,
78 #[cfg(feature = "rdf")]
80 pub(super) rdf_store: Arc<RdfStore>,
81 pub(super) transaction_manager: Arc<TransactionManager>,
83 pub(super) buffer_manager: Arc<BufferManager>,
85 #[cfg(feature = "wal")]
87 pub(super) wal: Option<Arc<LpgWal>>,
88 pub(super) query_cache: Arc<QueryCache>,
90 pub(super) commit_counter: Arc<AtomicUsize>,
92 pub(super) is_open: RwLock<bool>,
94 #[cfg(feature = "cdc")]
96 pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
97 #[cfg(feature = "embed")]
99 pub(super) embedding_models:
100 RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
101 pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
104}
105
106impl GrafeoDB {
107 #[must_use]
123 pub fn new_in_memory() -> Self {
124 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
125 }
126
127 #[cfg(feature = "wal")]
146 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
147 Self::with_config(Config::persistent(path.as_ref()))
148 }
149
150 pub fn with_config(config: Config) -> Result<Self> {
174 config
176 .validate()
177 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
178
179 let store = Arc::new(LpgStore::new()?);
180 #[cfg(feature = "rdf")]
181 let rdf_store = Arc::new(RdfStore::new());
182 let transaction_manager = Arc::new(TransactionManager::new());
183
184 let buffer_config = BufferManagerConfig {
186 budget: config.memory_limit.unwrap_or_else(|| {
187 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
188 }),
189 spill_path: config
190 .spill_path
191 .clone()
192 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
193 ..BufferManagerConfig::default()
194 };
195 let buffer_manager = BufferManager::new(buffer_config);
196
197 let catalog = Arc::new(Catalog::new());
199
200 #[cfg(feature = "wal")]
202 let wal = if config.wal_enabled {
203 if let Some(ref db_path) = config.path {
204 std::fs::create_dir_all(db_path)?;
206
207 let wal_path = db_path.join("wal");
208
209 if wal_path.exists() {
211 let recovery = WalRecovery::new(&wal_path);
212 let records = recovery.recover()?;
213 Self::apply_wal_records(&store, &catalog, &records)?;
214 }
215
216 let wal_durability = match config.wal_durability {
218 crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
219 crate::config::DurabilityMode::Batch {
220 max_delay_ms,
221 max_records,
222 } => WalDurabilityMode::Batch {
223 max_delay_ms,
224 max_records,
225 },
226 crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
227 WalDurabilityMode::Adaptive { target_interval_ms }
228 }
229 crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
230 };
231 let wal_config = WalConfig {
232 durability: wal_durability,
233 ..WalConfig::default()
234 };
235 let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
236 Some(Arc::new(wal_manager))
237 } else {
238 None
239 }
240 } else {
241 None
242 };
243
244 let query_cache = Arc::new(QueryCache::default());
246
247 Ok(Self {
248 config,
249 store,
250 catalog,
251 #[cfg(feature = "rdf")]
252 rdf_store,
253 transaction_manager,
254 buffer_manager,
255 #[cfg(feature = "wal")]
256 wal,
257 query_cache,
258 commit_counter: Arc::new(AtomicUsize::new(0)),
259 is_open: RwLock::new(true),
260 #[cfg(feature = "cdc")]
261 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
262 #[cfg(feature = "embed")]
263 embedding_models: RwLock::new(hashbrown::HashMap::new()),
264 external_store: None,
265 })
266 }
267
268 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
293 config
294 .validate()
295 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
296
297 let dummy_store = Arc::new(LpgStore::new()?);
298 let transaction_manager = Arc::new(TransactionManager::new());
299
300 let buffer_config = BufferManagerConfig {
301 budget: config.memory_limit.unwrap_or_else(|| {
302 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
303 }),
304 spill_path: None,
305 ..BufferManagerConfig::default()
306 };
307 let buffer_manager = BufferManager::new(buffer_config);
308
309 let query_cache = Arc::new(QueryCache::default());
310
311 Ok(Self {
312 config,
313 store: dummy_store,
314 catalog: Arc::new(Catalog::new()),
315 #[cfg(feature = "rdf")]
316 rdf_store: Arc::new(RdfStore::new()),
317 transaction_manager,
318 buffer_manager,
319 #[cfg(feature = "wal")]
320 wal: None,
321 query_cache,
322 commit_counter: Arc::new(AtomicUsize::new(0)),
323 is_open: RwLock::new(true),
324 #[cfg(feature = "cdc")]
325 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
326 #[cfg(feature = "embed")]
327 embedding_models: RwLock::new(hashbrown::HashMap::new()),
328 external_store: Some(store),
329 })
330 }
331
332 #[cfg(feature = "wal")]
334 fn apply_wal_records(store: &LpgStore, catalog: &Catalog, records: &[WalRecord]) -> Result<()> {
335 use crate::catalog::{
336 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
337 };
338
339 for record in records {
340 match record {
341 WalRecord::CreateNode { id, labels } => {
342 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
343 store.create_node_with_id(*id, &label_refs);
344 }
345 WalRecord::DeleteNode { id } => {
346 store.delete_node(*id);
347 }
348 WalRecord::CreateEdge {
349 id,
350 src,
351 dst,
352 edge_type,
353 } => {
354 store.create_edge_with_id(*id, *src, *dst, edge_type);
355 }
356 WalRecord::DeleteEdge { id } => {
357 store.delete_edge(*id);
358 }
359 WalRecord::SetNodeProperty { id, key, value } => {
360 store.set_node_property(*id, key, value.clone());
361 }
362 WalRecord::SetEdgeProperty { id, key, value } => {
363 store.set_edge_property(*id, key, value.clone());
364 }
365 WalRecord::AddNodeLabel { id, label } => {
366 store.add_label(*id, label);
367 }
368 WalRecord::RemoveNodeLabel { id, label } => {
369 store.remove_label(*id, label);
370 }
371 WalRecord::RemoveNodeProperty { id, key } => {
372 store.remove_node_property(*id, key);
373 }
374 WalRecord::RemoveEdgeProperty { id, key } => {
375 store.remove_edge_property(*id, key);
376 }
377
378 WalRecord::CreateNodeType {
380 name,
381 properties,
382 constraints,
383 } => {
384 let def = NodeTypeDefinition {
385 name: name.clone(),
386 properties: properties
387 .iter()
388 .map(|(n, t, nullable)| TypedProperty {
389 name: n.clone(),
390 data_type: PropertyDataType::from_type_name(t),
391 nullable: *nullable,
392 default_value: None,
393 })
394 .collect(),
395 constraints: constraints
396 .iter()
397 .map(|(kind, props)| match kind.as_str() {
398 "unique" => TypeConstraint::Unique(props.clone()),
399 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
400 "not_null" if !props.is_empty() => {
401 TypeConstraint::NotNull(props[0].clone())
402 }
403 _ => TypeConstraint::Unique(props.clone()),
404 })
405 .collect(),
406 };
407 let _ = catalog.register_node_type(def);
408 }
409 WalRecord::DropNodeType { name } => {
410 let _ = catalog.drop_node_type(name);
411 }
412 WalRecord::CreateEdgeType {
413 name,
414 properties,
415 constraints,
416 } => {
417 let def = EdgeTypeDefinition {
418 name: name.clone(),
419 properties: properties
420 .iter()
421 .map(|(n, t, nullable)| TypedProperty {
422 name: n.clone(),
423 data_type: PropertyDataType::from_type_name(t),
424 nullable: *nullable,
425 default_value: None,
426 })
427 .collect(),
428 constraints: constraints
429 .iter()
430 .map(|(kind, props)| match kind.as_str() {
431 "unique" => TypeConstraint::Unique(props.clone()),
432 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
433 "not_null" if !props.is_empty() => {
434 TypeConstraint::NotNull(props[0].clone())
435 }
436 _ => TypeConstraint::Unique(props.clone()),
437 })
438 .collect(),
439 };
440 let _ = catalog.register_edge_type_def(def);
441 }
442 WalRecord::DropEdgeType { name } => {
443 let _ = catalog.drop_edge_type_def(name);
444 }
445 WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
446 }
449 WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
450 }
453 WalRecord::CreateGraphType {
454 name,
455 node_types,
456 edge_types,
457 open,
458 } => {
459 use crate::catalog::GraphTypeDefinition;
460 let def = GraphTypeDefinition {
461 name: name.clone(),
462 allowed_node_types: node_types.clone(),
463 allowed_edge_types: edge_types.clone(),
464 open: *open,
465 };
466 let _ = catalog.register_graph_type(def);
467 }
468 WalRecord::DropGraphType { name } => {
469 let _ = catalog.drop_graph_type(name);
470 }
471 WalRecord::CreateSchema { name } => {
472 let _ = catalog.register_schema_namespace(name.clone());
473 }
474 WalRecord::DropSchema { name } => {
475 let _ = catalog.drop_schema_namespace(name);
476 }
477
478 WalRecord::AlterNodeType { name, alterations } => {
479 for (action, prop_name, type_name, nullable) in alterations {
480 match action.as_str() {
481 "add" => {
482 let prop = TypedProperty {
483 name: prop_name.clone(),
484 data_type: PropertyDataType::from_type_name(type_name),
485 nullable: *nullable,
486 default_value: None,
487 };
488 let _ = catalog.alter_node_type_add_property(name, prop);
489 }
490 "drop" => {
491 let _ = catalog.alter_node_type_drop_property(name, prop_name);
492 }
493 _ => {}
494 }
495 }
496 }
497 WalRecord::AlterEdgeType { name, alterations } => {
498 for (action, prop_name, type_name, nullable) in alterations {
499 match action.as_str() {
500 "add" => {
501 let prop = TypedProperty {
502 name: prop_name.clone(),
503 data_type: PropertyDataType::from_type_name(type_name),
504 nullable: *nullable,
505 default_value: None,
506 };
507 let _ = catalog.alter_edge_type_add_property(name, prop);
508 }
509 "drop" => {
510 let _ = catalog.alter_edge_type_drop_property(name, prop_name);
511 }
512 _ => {}
513 }
514 }
515 }
516 WalRecord::AlterGraphType { name, alterations } => {
517 for (action, type_name) in alterations {
518 match action.as_str() {
519 "add_node" => {
520 let _ =
521 catalog.alter_graph_type_add_node_type(name, type_name.clone());
522 }
523 "drop_node" => {
524 let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
525 }
526 "add_edge" => {
527 let _ =
528 catalog.alter_graph_type_add_edge_type(name, type_name.clone());
529 }
530 "drop_edge" => {
531 let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
532 }
533 _ => {}
534 }
535 }
536 }
537
538 WalRecord::CreateProcedure {
539 name,
540 params,
541 returns,
542 body,
543 } => {
544 use crate::catalog::ProcedureDefinition;
545 let def = ProcedureDefinition {
546 name: name.clone(),
547 params: params.clone(),
548 returns: returns.clone(),
549 body: body.clone(),
550 };
551 let _ = catalog.register_procedure(def);
552 }
553 WalRecord::DropProcedure { name } => {
554 let _ = catalog.drop_procedure(name);
555 }
556
557 WalRecord::TransactionCommit { .. }
558 | WalRecord::TransactionAbort { .. }
559 | WalRecord::Checkpoint { .. } => {
560 }
563 }
564 }
565 Ok(())
566 }
567
568 #[must_use]
591 pub fn session(&self) -> Session {
592 if let Some(ref ext_store) = self.external_store {
593 return Session::with_external_store(
594 Arc::clone(ext_store),
595 Arc::clone(&self.transaction_manager),
596 Arc::clone(&self.query_cache),
597 Arc::clone(&self.catalog),
598 self.config.adaptive.clone(),
599 self.config.factorized_execution,
600 self.config.graph_model,
601 self.config.query_timeout,
602 Arc::clone(&self.commit_counter),
603 self.config.gc_interval,
604 );
605 }
606
607 #[cfg(feature = "rdf")]
608 let mut session = Session::with_rdf_store_and_adaptive(
609 Arc::clone(&self.store),
610 Arc::clone(&self.rdf_store),
611 Arc::clone(&self.transaction_manager),
612 Arc::clone(&self.query_cache),
613 Arc::clone(&self.catalog),
614 self.config.adaptive.clone(),
615 self.config.factorized_execution,
616 self.config.graph_model,
617 self.config.query_timeout,
618 Arc::clone(&self.commit_counter),
619 self.config.gc_interval,
620 );
621 #[cfg(not(feature = "rdf"))]
622 let mut session = Session::with_adaptive(
623 Arc::clone(&self.store),
624 Arc::clone(&self.transaction_manager),
625 Arc::clone(&self.query_cache),
626 Arc::clone(&self.catalog),
627 self.config.adaptive.clone(),
628 self.config.factorized_execution,
629 self.config.graph_model,
630 self.config.query_timeout,
631 Arc::clone(&self.commit_counter),
632 self.config.gc_interval,
633 );
634
635 #[cfg(feature = "wal")]
636 if let Some(ref wal) = self.wal {
637 session.set_wal(Arc::clone(wal));
638 }
639
640 #[cfg(feature = "cdc")]
641 session.set_cdc_log(Arc::clone(&self.cdc_log));
642
643 let _ = &mut session;
645
646 session
647 }
648
649 #[must_use]
651 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
652 &self.config.adaptive
653 }
654
655 #[must_use]
657 pub fn config(&self) -> &Config {
658 &self.config
659 }
660
661 #[must_use]
663 pub fn graph_model(&self) -> crate::config::GraphModel {
664 self.config.graph_model
665 }
666
667 #[must_use]
669 pub fn memory_limit(&self) -> Option<usize> {
670 self.config.memory_limit
671 }
672
673 #[must_use]
681 pub fn store(&self) -> &Arc<LpgStore> {
682 &self.store
683 }
684
685 pub fn create_graph(&self, name: &str) -> Result<bool> {
693 Ok(self.store.create_graph(name)?)
694 }
695
696 pub fn drop_graph(&self, name: &str) -> bool {
698 self.store.drop_graph(name)
699 }
700
701 #[must_use]
703 pub fn list_graphs(&self) -> Vec<String> {
704 self.store.graph_names()
705 }
706
707 #[must_use]
715 pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
716 if let Some(ref ext_store) = self.external_store {
717 Arc::clone(ext_store)
718 } else {
719 Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
720 }
721 }
722
723 pub fn gc(&self) {
729 let min_epoch = self.transaction_manager.min_active_epoch();
730 self.store.gc_versions(min_epoch);
731 self.transaction_manager.gc();
732 }
733
734 #[must_use]
736 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
737 &self.buffer_manager
738 }
739
740 #[must_use]
742 pub fn query_cache(&self) -> &Arc<QueryCache> {
743 &self.query_cache
744 }
745
746 pub fn clear_plan_cache(&self) {
752 self.query_cache.clear();
753 }
754
755 pub fn close(&self) -> Result<()> {
769 let mut is_open = self.is_open.write();
770 if !*is_open {
771 return Ok(());
772 }
773
774 #[cfg(feature = "wal")]
776 if let Some(ref wal) = self.wal {
777 let epoch = self.store.current_epoch();
778
779 let checkpoint_tx = self
781 .transaction_manager
782 .last_assigned_transaction_id()
783 .unwrap_or_else(|| {
784 self.transaction_manager.begin()
786 });
787
788 wal.log(&WalRecord::TransactionCommit {
790 transaction_id: checkpoint_tx,
791 })?;
792
793 wal.checkpoint(checkpoint_tx, epoch)?;
795 wal.sync()?;
796 }
797
798 *is_open = false;
799 Ok(())
800 }
801
802 #[cfg(feature = "wal")]
804 #[must_use]
805 pub fn wal(&self) -> Option<&Arc<LpgWal>> {
806 self.wal.as_ref()
807 }
808
809 #[cfg(feature = "wal")]
811 pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
812 if let Some(ref wal) = self.wal {
813 wal.log(record)?;
814 }
815 Ok(())
816 }
817}
818
819impl Drop for GrafeoDB {
820 fn drop(&mut self) {
821 if let Err(e) = self.close() {
822 tracing::error!("Error closing database: {}", e);
823 }
824 }
825}
826
827impl crate::admin::AdminService for GrafeoDB {
828 fn info(&self) -> crate::admin::DatabaseInfo {
829 self.info()
830 }
831
832 fn detailed_stats(&self) -> crate::admin::DatabaseStats {
833 self.detailed_stats()
834 }
835
836 fn schema(&self) -> crate::admin::SchemaInfo {
837 self.schema()
838 }
839
840 fn validate(&self) -> crate::admin::ValidationResult {
841 self.validate()
842 }
843
844 fn wal_status(&self) -> crate::admin::WalStatus {
845 self.wal_status()
846 }
847
848 fn wal_checkpoint(&self) -> Result<()> {
849 self.wal_checkpoint()
850 }
851}
852
853#[derive(Debug)]
883pub struct QueryResult {
884 pub columns: Vec<String>,
886 pub column_types: Vec<grafeo_common::types::LogicalType>,
888 pub rows: Vec<Vec<grafeo_common::types::Value>>,
890 pub execution_time_ms: Option<f64>,
892 pub rows_scanned: Option<u64>,
894 pub status_message: Option<String>,
896 pub gql_status: grafeo_common::utils::GqlStatus,
898}
899
900impl QueryResult {
901 #[must_use]
903 pub fn empty() -> Self {
904 Self {
905 columns: Vec::new(),
906 column_types: Vec::new(),
907 rows: Vec::new(),
908 execution_time_ms: None,
909 rows_scanned: None,
910 status_message: None,
911 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
912 }
913 }
914
915 #[must_use]
917 pub fn status(msg: impl Into<String>) -> Self {
918 Self {
919 columns: Vec::new(),
920 column_types: Vec::new(),
921 rows: Vec::new(),
922 execution_time_ms: None,
923 rows_scanned: None,
924 status_message: Some(msg.into()),
925 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
926 }
927 }
928
929 #[must_use]
931 pub fn new(columns: Vec<String>) -> Self {
932 let len = columns.len();
933 Self {
934 columns,
935 column_types: vec![grafeo_common::types::LogicalType::Any; len],
936 rows: Vec::new(),
937 execution_time_ms: None,
938 rows_scanned: None,
939 status_message: None,
940 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
941 }
942 }
943
944 #[must_use]
946 pub fn with_types(
947 columns: Vec<String>,
948 column_types: Vec<grafeo_common::types::LogicalType>,
949 ) -> Self {
950 Self {
951 columns,
952 column_types,
953 rows: Vec::new(),
954 execution_time_ms: None,
955 rows_scanned: None,
956 status_message: None,
957 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
958 }
959 }
960
961 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
963 self.execution_time_ms = Some(execution_time_ms);
964 self.rows_scanned = Some(rows_scanned);
965 self
966 }
967
968 #[must_use]
970 pub fn execution_time_ms(&self) -> Option<f64> {
971 self.execution_time_ms
972 }
973
974 #[must_use]
976 pub fn rows_scanned(&self) -> Option<u64> {
977 self.rows_scanned
978 }
979
980 #[must_use]
982 pub fn row_count(&self) -> usize {
983 self.rows.len()
984 }
985
986 #[must_use]
988 pub fn column_count(&self) -> usize {
989 self.columns.len()
990 }
991
992 #[must_use]
994 pub fn is_empty(&self) -> bool {
995 self.rows.is_empty()
996 }
997
998 pub fn scalar<T: FromValue>(&self) -> Result<T> {
1007 if self.rows.len() != 1 || self.columns.len() != 1 {
1008 return Err(grafeo_common::utils::error::Error::InvalidValue(
1009 "Expected single value".to_string(),
1010 ));
1011 }
1012 T::from_value(&self.rows[0][0])
1013 }
1014
1015 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1017 self.rows.iter()
1018 }
1019}
1020
1021pub trait FromValue: Sized {
1026 fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
1028}
1029
1030impl FromValue for i64 {
1031 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1032 value
1033 .as_int64()
1034 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1035 expected: "INT64".to_string(),
1036 found: value.type_name().to_string(),
1037 })
1038 }
1039}
1040
1041impl FromValue for f64 {
1042 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1043 value
1044 .as_float64()
1045 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1046 expected: "FLOAT64".to_string(),
1047 found: value.type_name().to_string(),
1048 })
1049 }
1050}
1051
1052impl FromValue for String {
1053 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1054 value.as_str().map(String::from).ok_or_else(|| {
1055 grafeo_common::utils::error::Error::TypeMismatch {
1056 expected: "STRING".to_string(),
1057 found: value.type_name().to_string(),
1058 }
1059 })
1060 }
1061}
1062
1063impl FromValue for bool {
1064 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1065 value
1066 .as_bool()
1067 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1068 expected: "BOOL".to_string(),
1069 found: value.type_name().to_string(),
1070 })
1071 }
1072}
1073
1074#[cfg(test)]
1075mod tests {
1076 use super::*;
1077
1078 #[test]
1079 fn test_create_in_memory_database() {
1080 let db = GrafeoDB::new_in_memory();
1081 assert_eq!(db.node_count(), 0);
1082 assert_eq!(db.edge_count(), 0);
1083 }
1084
1085 #[test]
1086 fn test_database_config() {
1087 let config = Config::in_memory().with_threads(4).with_query_logging();
1088
1089 let db = GrafeoDB::with_config(config).unwrap();
1090 assert_eq!(db.config().threads, 4);
1091 assert!(db.config().query_logging);
1092 }
1093
1094 #[test]
1095 fn test_database_session() {
1096 let db = GrafeoDB::new_in_memory();
1097 let _session = db.session();
1098 }
1100
1101 #[cfg(feature = "wal")]
1102 #[test]
1103 fn test_persistent_database_recovery() {
1104 use grafeo_common::types::Value;
1105 use tempfile::tempdir;
1106
1107 let dir = tempdir().unwrap();
1108 let db_path = dir.path().join("test_db");
1109
1110 {
1112 let db = GrafeoDB::open(&db_path).unwrap();
1113
1114 let alix = db.create_node(&["Person"]);
1115 db.set_node_property(alix, "name", Value::from("Alix"));
1116
1117 let gus = db.create_node(&["Person"]);
1118 db.set_node_property(gus, "name", Value::from("Gus"));
1119
1120 let _edge = db.create_edge(alix, gus, "KNOWS");
1121
1122 db.close().unwrap();
1124 }
1125
1126 {
1128 let db = GrafeoDB::open(&db_path).unwrap();
1129
1130 assert_eq!(db.node_count(), 2);
1131 assert_eq!(db.edge_count(), 1);
1132
1133 let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
1135 assert!(node0.is_some());
1136
1137 let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
1138 assert!(node1.is_some());
1139 }
1140 }
1141
1142 #[cfg(feature = "wal")]
1143 #[test]
1144 fn test_wal_logging() {
1145 use tempfile::tempdir;
1146
1147 let dir = tempdir().unwrap();
1148 let db_path = dir.path().join("wal_test_db");
1149
1150 let db = GrafeoDB::open(&db_path).unwrap();
1151
1152 let node = db.create_node(&["Test"]);
1154 db.delete_node(node);
1155
1156 if let Some(wal) = db.wal() {
1158 assert!(wal.record_count() > 0);
1159 }
1160
1161 db.close().unwrap();
1162 }
1163
1164 #[cfg(feature = "wal")]
1165 #[test]
1166 fn test_wal_recovery_multiple_sessions() {
1167 use grafeo_common::types::Value;
1169 use tempfile::tempdir;
1170
1171 let dir = tempdir().unwrap();
1172 let db_path = dir.path().join("multi_session_db");
1173
1174 {
1176 let db = GrafeoDB::open(&db_path).unwrap();
1177 let alix = db.create_node(&["Person"]);
1178 db.set_node_property(alix, "name", Value::from("Alix"));
1179 db.close().unwrap();
1180 }
1181
1182 {
1184 let db = GrafeoDB::open(&db_path).unwrap();
1185 assert_eq!(db.node_count(), 1); let gus = db.create_node(&["Person"]);
1187 db.set_node_property(gus, "name", Value::from("Gus"));
1188 db.close().unwrap();
1189 }
1190
1191 {
1193 let db = GrafeoDB::open(&db_path).unwrap();
1194 assert_eq!(db.node_count(), 2);
1195
1196 let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
1198 assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
1199
1200 let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
1201 assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
1202 }
1203 }
1204
1205 #[cfg(feature = "wal")]
1206 #[test]
1207 fn test_database_consistency_after_mutations() {
1208 use grafeo_common::types::Value;
1210 use tempfile::tempdir;
1211
1212 let dir = tempdir().unwrap();
1213 let db_path = dir.path().join("consistency_db");
1214
1215 {
1216 let db = GrafeoDB::open(&db_path).unwrap();
1217
1218 let a = db.create_node(&["Node"]);
1220 let b = db.create_node(&["Node"]);
1221 let c = db.create_node(&["Node"]);
1222
1223 let e1 = db.create_edge(a, b, "LINKS");
1225 let _e2 = db.create_edge(b, c, "LINKS");
1226
1227 db.delete_edge(e1);
1229 db.delete_node(b);
1230
1231 db.set_node_property(a, "value", Value::Int64(1));
1233 db.set_node_property(c, "value", Value::Int64(3));
1234
1235 db.close().unwrap();
1236 }
1237
1238 {
1240 let db = GrafeoDB::open(&db_path).unwrap();
1241
1242 let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
1246 assert!(node_a.is_some());
1247
1248 let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
1249 assert!(node_c.is_some());
1250
1251 let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
1253 assert!(node_b.is_none());
1254 }
1255 }
1256
1257 #[cfg(feature = "wal")]
1258 #[test]
1259 fn test_close_is_idempotent() {
1260 use tempfile::tempdir;
1262
1263 let dir = tempdir().unwrap();
1264 let db_path = dir.path().join("close_test_db");
1265
1266 let db = GrafeoDB::open(&db_path).unwrap();
1267 db.create_node(&["Test"]);
1268
1269 assert!(db.close().is_ok());
1271
1272 assert!(db.close().is_ok());
1274 }
1275
1276 #[test]
1277 fn test_query_result_has_metrics() {
1278 let db = GrafeoDB::new_in_memory();
1280 db.create_node(&["Person"]);
1281 db.create_node(&["Person"]);
1282
1283 #[cfg(feature = "gql")]
1284 {
1285 let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
1286
1287 assert!(result.execution_time_ms.is_some());
1289 assert!(result.rows_scanned.is_some());
1290 assert!(result.execution_time_ms.unwrap() >= 0.0);
1291 assert_eq!(result.rows_scanned.unwrap(), 2);
1292 }
1293 }
1294
1295 #[test]
1296 fn test_empty_query_result_metrics() {
1297 let db = GrafeoDB::new_in_memory();
1299 db.create_node(&["Person"]);
1300
1301 #[cfg(feature = "gql")]
1302 {
1303 let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
1305
1306 assert!(result.execution_time_ms.is_some());
1307 assert!(result.rows_scanned.is_some());
1308 assert_eq!(result.rows_scanned.unwrap(), 0);
1309 }
1310 }
1311
1312 #[cfg(feature = "cdc")]
1313 mod cdc_integration {
1314 use super::*;
1315
1316 #[test]
1317 fn test_node_lifecycle_history() {
1318 let db = GrafeoDB::new_in_memory();
1319
1320 let id = db.create_node(&["Person"]);
1322 db.set_node_property(id, "name", "Alix".into());
1324 db.set_node_property(id, "name", "Gus".into());
1325 db.delete_node(id);
1327
1328 let history = db.history(id).unwrap();
1329 assert_eq!(history.len(), 4); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1331 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1332 assert!(history[1].before.is_none()); assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
1334 assert!(history[2].before.is_some()); assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
1336 }
1337
1338 #[test]
1339 fn test_edge_lifecycle_history() {
1340 let db = GrafeoDB::new_in_memory();
1341
1342 let alix = db.create_node(&["Person"]);
1343 let gus = db.create_node(&["Person"]);
1344 let edge = db.create_edge(alix, gus, "KNOWS");
1345 db.set_edge_property(edge, "since", 2024i64.into());
1346 db.delete_edge(edge);
1347
1348 let history = db.history(edge).unwrap();
1349 assert_eq!(history.len(), 3); assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1351 assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1352 assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
1353 }
1354
1355 #[test]
1356 fn test_create_node_with_props_cdc() {
1357 let db = GrafeoDB::new_in_memory();
1358
1359 let id = db.create_node_with_props(
1360 &["Person"],
1361 vec![
1362 ("name", grafeo_common::types::Value::from("Alix")),
1363 ("age", grafeo_common::types::Value::from(30i64)),
1364 ],
1365 );
1366
1367 let history = db.history(id).unwrap();
1368 assert_eq!(history.len(), 1);
1369 assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1370 let after = history[0].after.as_ref().unwrap();
1372 assert_eq!(after.len(), 2);
1373 }
1374
1375 #[test]
1376 fn test_changes_between() {
1377 let db = GrafeoDB::new_in_memory();
1378
1379 let id1 = db.create_node(&["A"]);
1380 let _id2 = db.create_node(&["B"]);
1381 db.set_node_property(id1, "x", 1i64.into());
1382
1383 let changes = db
1385 .changes_between(
1386 grafeo_common::types::EpochId(0),
1387 grafeo_common::types::EpochId(u64::MAX),
1388 )
1389 .unwrap();
1390 assert_eq!(changes.len(), 3); }
1392 }
1393
1394 #[test]
1395 fn test_with_store_basic() {
1396 use grafeo_core::graph::lpg::LpgStore;
1397
1398 let store = Arc::new(LpgStore::new().unwrap());
1399 let n1 = store.create_node(&["Person"]);
1400 store.set_node_property(n1, "name", "Alix".into());
1401
1402 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1403 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1404
1405 let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
1406 assert_eq!(result.rows.len(), 1);
1407 }
1408
1409 #[test]
1410 fn test_with_store_session() {
1411 use grafeo_core::graph::lpg::LpgStore;
1412
1413 let store = Arc::new(LpgStore::new().unwrap());
1414 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1415 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1416
1417 let session = db.session();
1418 let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
1419 assert_eq!(result.rows.len(), 1);
1420 }
1421
1422 #[test]
1423 fn test_with_store_mutations() {
1424 use grafeo_core::graph::lpg::LpgStore;
1425
1426 let store = Arc::new(LpgStore::new().unwrap());
1427 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1428 let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1429
1430 let session = db.session();
1431 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1432
1433 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
1434 assert_eq!(result.rows.len(), 1);
1435
1436 assert_eq!(store.node_count(), 1);
1438 }
1439}