1mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21mod search;
22#[cfg(feature = "wal")]
23pub(crate) mod wal_store;
24
25#[cfg(feature = "wal")]
26use std::path::Path;
27use std::sync::Arc;
28use std::sync::atomic::AtomicUsize;
29
30use parking_lot::RwLock;
31
32#[cfg(feature = "wal")]
33use grafeo_adapters::storage::wal::{
34 DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
35};
36use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
37use grafeo_common::utils::error::Result;
38use grafeo_core::graph::GraphStoreMut;
39use grafeo_core::graph::lpg::LpgStore;
40#[cfg(feature = "rdf")]
41use grafeo_core::graph::rdf::RdfStore;
42
43use crate::catalog::Catalog;
44use crate::config::Config;
45use crate::query::cache::QueryCache;
46use crate::session::Session;
47use crate::transaction::TransactionManager;
48
/// Top-level database handle.
///
/// Owns the graph store(s), the catalog, and the shared services (transaction
/// manager, buffer manager, query cache, optional WAL / CDC log) that are
/// handed to every [`Session`] created through [`GrafeoDB::session`].
pub struct GrafeoDB {
    /// Configuration the database was constructed with.
    pub(super) config: Config,
    /// Built-in labeled-property-graph store. For databases created through
    /// `with_store` this is only a freshly created placeholder.
    pub(super) store: Arc<LpgStore>,
    /// Catalog shared with every session.
    pub(super) catalog: Arc<Catalog>,
    #[cfg(feature = "rdf")]
    /// RDF store (only compiled with the `rdf` feature).
    pub(super) rdf_store: Arc<RdfStore>,
    /// Transaction manager shared with every session.
    pub(super) transaction_manager: Arc<TransactionManager>,
    /// Buffer manager built from the configured memory limit and spill path.
    pub(super) buffer_manager: Arc<BufferManager>,
    #[cfg(feature = "wal")]
    /// Write-ahead log; `None` when no WAL was opened for this database.
    pub(super) wal: Option<Arc<LpgWal>>,
    #[cfg(feature = "wal")]
    /// Mutex-guarded optional graph name passed to sessions together with the
    /// WAL. NOTE(review): presumably tracks the named graph the WAL is
    /// currently writing for - confirm against `Session::set_wal`.
    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
    /// Query cache shared with every session.
    pub(super) query_cache: Arc<QueryCache>,
    /// Counter shared with every session through its `SessionConfig`.
    pub(super) commit_counter: Arc<AtomicUsize>,
    /// `true` from construction until `close` completes successfully.
    pub(super) is_open: RwLock<bool>,
    #[cfg(feature = "cdc")]
    /// Change-data-capture log (only compiled with the `cdc` feature).
    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
    #[cfg(feature = "embed")]
    /// Embedding models keyed by a string (only compiled with the `embed`
    /// feature).
    pub(super) embedding_models:
        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
    /// Caller-supplied store set by `with_store`; when present, sessions and
    /// `graph_store` use it instead of `store`.
    pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
    /// Graph name applied to newly created sessions; `None` means no graph is
    /// selected on the session.
    current_graph: RwLock<Option<String>>,
}
114
115impl GrafeoDB {
116 #[must_use]
137 pub fn new_in_memory() -> Self {
138 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
139 }
140
141 #[cfg(feature = "wal")]
160 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
161 Self::with_config(Config::persistent(path.as_ref()))
162 }
163
    /// Creates a database from the given configuration.
    ///
    /// The configuration is validated first; then a fresh LPG store,
    /// transaction manager, buffer manager, catalog and query cache are built.
    /// With the `wal` feature, when `config.wal_enabled` is set and
    /// `config.path` is present, the database directory is created, an
    /// existing `wal` directory is replayed into the new stores, and a WAL is
    /// opened at that location.
    ///
    /// # Errors
    ///
    /// Returns an error if validation fails (wrapped as `Error::Internal`), if
    /// the LPG store cannot be created, or - with the `wal` feature - if
    /// creating the directory, recovering or replaying WAL records, or opening
    /// the WAL fails.
    pub fn with_config(config: Config) -> Result<Self> {
        // Reject invalid configurations before allocating anything.
        config
            .validate()
            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;

        let store = Arc::new(LpgStore::new()?);
        #[cfg(feature = "rdf")]
        let rdf_store = Arc::new(RdfStore::new());
        let transaction_manager = Arc::new(TransactionManager::new());

        let buffer_config = BufferManagerConfig {
            // Without an explicit limit, budget 75% of the detected system memory.
            budget: config.memory_limit.unwrap_or_else(|| {
                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
            }),
            // Prefer the configured spill path; otherwise fall back to a
            // `spill` directory under the database path, if there is one.
            spill_path: config
                .spill_path
                .clone()
                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
            ..BufferManagerConfig::default()
        };
        let buffer_manager = BufferManager::new(buffer_config);

        // Created before WAL recovery because replay registers schema in it.
        let catalog = Arc::new(Catalog::new());

        #[cfg(feature = "wal")]
        let wal = if config.wal_enabled {
            if let Some(ref db_path) = config.path {
                std::fs::create_dir_all(db_path)?;

                let wal_path = db_path.join("wal");

                // Replay previously logged records before opening the WAL for
                // new writes.
                if wal_path.exists() {
                    let recovery = WalRecovery::new(&wal_path);
                    let records = recovery.recover()?;
                    Self::apply_wal_records(
                        &store,
                        &catalog,
                        #[cfg(feature = "rdf")]
                        &rdf_store,
                        &records,
                    )?;
                }

                // Translate the public durability setting into the WAL
                // adapter's own enum, variant by variant.
                let wal_durability = match config.wal_durability {
                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
                    crate::config::DurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    } => WalDurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    },
                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
                        WalDurabilityMode::Adaptive { target_interval_ms }
                    }
                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
                };
                let wal_config = WalConfig {
                    durability: wal_durability,
                    ..WalConfig::default()
                };
                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
                Some(Arc::new(wal_manager))
            } else {
                // WAL requested but there is no path to put it under.
                None
            }
        } else {
            None
        };

        let query_cache = Arc::new(QueryCache::default());

        Ok(Self {
            config,
            store,
            catalog,
            #[cfg(feature = "rdf")]
            rdf_store,
            transaction_manager,
            buffer_manager,
            #[cfg(feature = "wal")]
            wal,
            #[cfg(feature = "wal")]
            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
            query_cache,
            commit_counter: Arc::new(AtomicUsize::new(0)),
            is_open: RwLock::new(true),
            #[cfg(feature = "cdc")]
            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
            #[cfg(feature = "embed")]
            embedding_models: RwLock::new(hashbrown::HashMap::new()),
            external_store: None,
            current_graph: RwLock::new(None),
        })
    }
290
291 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
316 config
317 .validate()
318 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
319
320 let dummy_store = Arc::new(LpgStore::new()?);
321 let transaction_manager = Arc::new(TransactionManager::new());
322
323 let buffer_config = BufferManagerConfig {
324 budget: config.memory_limit.unwrap_or_else(|| {
325 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
326 }),
327 spill_path: None,
328 ..BufferManagerConfig::default()
329 };
330 let buffer_manager = BufferManager::new(buffer_config);
331
332 let query_cache = Arc::new(QueryCache::default());
333
334 Ok(Self {
335 config,
336 store: dummy_store,
337 catalog: Arc::new(Catalog::new()),
338 #[cfg(feature = "rdf")]
339 rdf_store: Arc::new(RdfStore::new()),
340 transaction_manager,
341 buffer_manager,
342 #[cfg(feature = "wal")]
343 wal: None,
344 #[cfg(feature = "wal")]
345 wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
346 query_cache,
347 commit_counter: Arc::new(AtomicUsize::new(0)),
348 is_open: RwLock::new(true),
349 #[cfg(feature = "cdc")]
350 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
351 #[cfg(feature = "embed")]
352 embedding_models: RwLock::new(hashbrown::HashMap::new()),
353 external_store: Some(store),
354 current_graph: RwLock::new(None),
355 })
356 }
357
358 #[cfg(feature = "wal")]
364 fn apply_wal_records(
365 store: &Arc<LpgStore>,
366 catalog: &Catalog,
367 #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
368 records: &[WalRecord],
369 ) -> Result<()> {
370 use crate::catalog::{
371 EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
372 };
373 use grafeo_common::utils::error::Error;
374
375 let mut current_graph: Option<String> = None;
378 let mut target_store: Arc<LpgStore> = Arc::clone(store);
379
380 for record in records {
381 match record {
382 WalRecord::CreateNamedGraph { name } => {
384 let _ = store.create_graph(name);
385 }
386 WalRecord::DropNamedGraph { name } => {
387 store.drop_graph(name);
388 if current_graph.as_deref() == Some(name.as_str()) {
390 current_graph = None;
391 target_store = Arc::clone(store);
392 }
393 }
394 WalRecord::SwitchGraph { name } => {
395 current_graph.clone_from(name);
396 target_store = match ¤t_graph {
397 None => Arc::clone(store),
398 Some(graph_name) => store
399 .graph_or_create(graph_name)
400 .map_err(|e| Error::Internal(e.to_string()))?,
401 };
402 }
403
404 WalRecord::CreateNode { id, labels } => {
406 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
407 target_store.create_node_with_id(*id, &label_refs)?;
408 }
409 WalRecord::DeleteNode { id } => {
410 target_store.delete_node(*id);
411 }
412 WalRecord::CreateEdge {
413 id,
414 src,
415 dst,
416 edge_type,
417 } => {
418 target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
419 }
420 WalRecord::DeleteEdge { id } => {
421 target_store.delete_edge(*id);
422 }
423 WalRecord::SetNodeProperty { id, key, value } => {
424 target_store.set_node_property(*id, key, value.clone());
425 }
426 WalRecord::SetEdgeProperty { id, key, value } => {
427 target_store.set_edge_property(*id, key, value.clone());
428 }
429 WalRecord::AddNodeLabel { id, label } => {
430 target_store.add_label(*id, label);
431 }
432 WalRecord::RemoveNodeLabel { id, label } => {
433 target_store.remove_label(*id, label);
434 }
435 WalRecord::RemoveNodeProperty { id, key } => {
436 target_store.remove_node_property(*id, key);
437 }
438 WalRecord::RemoveEdgeProperty { id, key } => {
439 target_store.remove_edge_property(*id, key);
440 }
441
442 WalRecord::CreateNodeType {
444 name,
445 properties,
446 constraints,
447 } => {
448 let def = NodeTypeDefinition {
449 name: name.clone(),
450 properties: properties
451 .iter()
452 .map(|(n, t, nullable)| TypedProperty {
453 name: n.clone(),
454 data_type: PropertyDataType::from_type_name(t),
455 nullable: *nullable,
456 default_value: None,
457 })
458 .collect(),
459 constraints: constraints
460 .iter()
461 .map(|(kind, props)| match kind.as_str() {
462 "unique" => TypeConstraint::Unique(props.clone()),
463 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
464 "not_null" if !props.is_empty() => {
465 TypeConstraint::NotNull(props[0].clone())
466 }
467 _ => TypeConstraint::Unique(props.clone()),
468 })
469 .collect(),
470 parent_types: Vec::new(),
471 };
472 let _ = catalog.register_node_type(def);
473 }
474 WalRecord::DropNodeType { name } => {
475 let _ = catalog.drop_node_type(name);
476 }
477 WalRecord::CreateEdgeType {
478 name,
479 properties,
480 constraints,
481 } => {
482 let def = EdgeTypeDefinition {
483 name: name.clone(),
484 properties: properties
485 .iter()
486 .map(|(n, t, nullable)| TypedProperty {
487 name: n.clone(),
488 data_type: PropertyDataType::from_type_name(t),
489 nullable: *nullable,
490 default_value: None,
491 })
492 .collect(),
493 constraints: constraints
494 .iter()
495 .map(|(kind, props)| match kind.as_str() {
496 "unique" => TypeConstraint::Unique(props.clone()),
497 "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
498 "not_null" if !props.is_empty() => {
499 TypeConstraint::NotNull(props[0].clone())
500 }
501 _ => TypeConstraint::Unique(props.clone()),
502 })
503 .collect(),
504 source_node_types: Vec::new(),
505 target_node_types: Vec::new(),
506 };
507 let _ = catalog.register_edge_type_def(def);
508 }
509 WalRecord::DropEdgeType { name } => {
510 let _ = catalog.drop_edge_type_def(name);
511 }
512 WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
513 }
516 WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
517 }
520 WalRecord::CreateGraphType {
521 name,
522 node_types,
523 edge_types,
524 open,
525 } => {
526 use crate::catalog::GraphTypeDefinition;
527 let def = GraphTypeDefinition {
528 name: name.clone(),
529 allowed_node_types: node_types.clone(),
530 allowed_edge_types: edge_types.clone(),
531 open: *open,
532 };
533 let _ = catalog.register_graph_type(def);
534 }
535 WalRecord::DropGraphType { name } => {
536 let _ = catalog.drop_graph_type(name);
537 }
538 WalRecord::CreateSchema { name } => {
539 let _ = catalog.register_schema_namespace(name.clone());
540 }
541 WalRecord::DropSchema { name } => {
542 let _ = catalog.drop_schema_namespace(name);
543 }
544
545 WalRecord::AlterNodeType { name, alterations } => {
546 for (action, prop_name, type_name, nullable) in alterations {
547 match action.as_str() {
548 "add" => {
549 let prop = TypedProperty {
550 name: prop_name.clone(),
551 data_type: PropertyDataType::from_type_name(type_name),
552 nullable: *nullable,
553 default_value: None,
554 };
555 let _ = catalog.alter_node_type_add_property(name, prop);
556 }
557 "drop" => {
558 let _ = catalog.alter_node_type_drop_property(name, prop_name);
559 }
560 _ => {}
561 }
562 }
563 }
564 WalRecord::AlterEdgeType { name, alterations } => {
565 for (action, prop_name, type_name, nullable) in alterations {
566 match action.as_str() {
567 "add" => {
568 let prop = TypedProperty {
569 name: prop_name.clone(),
570 data_type: PropertyDataType::from_type_name(type_name),
571 nullable: *nullable,
572 default_value: None,
573 };
574 let _ = catalog.alter_edge_type_add_property(name, prop);
575 }
576 "drop" => {
577 let _ = catalog.alter_edge_type_drop_property(name, prop_name);
578 }
579 _ => {}
580 }
581 }
582 }
583 WalRecord::AlterGraphType { name, alterations } => {
584 for (action, type_name) in alterations {
585 match action.as_str() {
586 "add_node" => {
587 let _ =
588 catalog.alter_graph_type_add_node_type(name, type_name.clone());
589 }
590 "drop_node" => {
591 let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
592 }
593 "add_edge" => {
594 let _ =
595 catalog.alter_graph_type_add_edge_type(name, type_name.clone());
596 }
597 "drop_edge" => {
598 let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
599 }
600 _ => {}
601 }
602 }
603 }
604
605 WalRecord::CreateProcedure {
606 name,
607 params,
608 returns,
609 body,
610 } => {
611 use crate::catalog::ProcedureDefinition;
612 let def = ProcedureDefinition {
613 name: name.clone(),
614 params: params.clone(),
615 returns: returns.clone(),
616 body: body.clone(),
617 };
618 let _ = catalog.register_procedure(def);
619 }
620 WalRecord::DropProcedure { name } => {
621 let _ = catalog.drop_procedure(name);
622 }
623
624 #[cfg(feature = "rdf")]
626 WalRecord::InsertRdfTriple {
627 subject,
628 predicate,
629 object,
630 graph,
631 } => {
632 use grafeo_core::graph::rdf::Term;
633 if let (Some(s), Some(p), Some(o)) = (
634 Term::from_ntriples(subject),
635 Term::from_ntriples(predicate),
636 Term::from_ntriples(object),
637 ) {
638 let triple = grafeo_core::graph::rdf::Triple::new(s, p, o);
639 let target = match graph {
640 Some(name) => rdf_store.graph_or_create(name),
641 None => Arc::clone(rdf_store),
642 };
643 target.insert(triple);
644 }
645 }
646 #[cfg(feature = "rdf")]
647 WalRecord::DeleteRdfTriple {
648 subject,
649 predicate,
650 object,
651 graph,
652 } => {
653 use grafeo_core::graph::rdf::Term;
654 if let (Some(s), Some(p), Some(o)) = (
655 Term::from_ntriples(subject),
656 Term::from_ntriples(predicate),
657 Term::from_ntriples(object),
658 ) {
659 let triple = grafeo_core::graph::rdf::Triple::new(s, p, o);
660 let target = match graph {
661 Some(name) => rdf_store.graph_or_create(name),
662 None => Arc::clone(rdf_store),
663 };
664 target.remove(&triple);
665 }
666 }
667 #[cfg(feature = "rdf")]
668 WalRecord::ClearRdfGraph { graph } => {
669 rdf_store.clear_graph(graph.as_deref());
670 }
671 #[cfg(feature = "rdf")]
672 WalRecord::CreateRdfGraph { name } => {
673 let _ = rdf_store.create_graph(name);
674 }
675 #[cfg(feature = "rdf")]
676 WalRecord::DropRdfGraph { name } => match name {
677 None => rdf_store.clear(),
678 Some(graph_name) => {
679 rdf_store.drop_graph(graph_name);
680 }
681 },
682 #[cfg(not(feature = "rdf"))]
684 WalRecord::InsertRdfTriple { .. }
685 | WalRecord::DeleteRdfTriple { .. }
686 | WalRecord::ClearRdfGraph { .. }
687 | WalRecord::CreateRdfGraph { .. }
688 | WalRecord::DropRdfGraph { .. } => {}
689
690 WalRecord::TransactionCommit { .. }
691 | WalRecord::TransactionAbort { .. }
692 | WalRecord::Checkpoint { .. } => {
693 }
696 }
697 }
698 Ok(())
699 }
700
701 #[must_use]
729 pub fn session(&self) -> Session {
730 let session_cfg = || crate::session::SessionConfig {
731 transaction_manager: Arc::clone(&self.transaction_manager),
732 query_cache: Arc::clone(&self.query_cache),
733 catalog: Arc::clone(&self.catalog),
734 adaptive_config: self.config.adaptive.clone(),
735 factorized_execution: self.config.factorized_execution,
736 graph_model: self.config.graph_model,
737 query_timeout: self.config.query_timeout,
738 commit_counter: Arc::clone(&self.commit_counter),
739 gc_interval: self.config.gc_interval,
740 };
741
742 if let Some(ref ext_store) = self.external_store {
743 return Session::with_external_store(Arc::clone(ext_store), session_cfg())
744 .expect("arena allocation for external store session");
745 }
746
747 #[cfg(feature = "rdf")]
748 let mut session = Session::with_rdf_store_and_adaptive(
749 Arc::clone(&self.store),
750 Arc::clone(&self.rdf_store),
751 session_cfg(),
752 );
753 #[cfg(not(feature = "rdf"))]
754 let mut session = Session::with_adaptive(Arc::clone(&self.store), session_cfg());
755
756 #[cfg(feature = "wal")]
757 if let Some(ref wal) = self.wal {
758 session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
759 }
760
761 #[cfg(feature = "cdc")]
762 session.set_cdc_log(Arc::clone(&self.cdc_log));
763
764 if let Some(ref graph) = *self.current_graph.read() {
766 session.use_graph(graph);
767 }
768
769 let _ = &mut session;
771
772 session
773 }
774
    /// Returns a copy of the database-level current graph name, or `None`
    /// when no graph is selected.
    #[must_use]
    pub fn current_graph(&self) -> Option<String> {
        self.current_graph.read().clone()
    }
784
    /// Sets the database-level current graph name; `None` clears it.
    ///
    /// The value is read by `session()` when it creates a new session.
    pub fn set_current_graph(&self, name: Option<&str>) {
        *self.current_graph.write() = name.map(ToString::to_string);
    }
792
    /// Returns the adaptive-execution section of the configuration.
    #[must_use]
    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
        &self.config.adaptive
    }
798
    /// Returns the configuration this database was created with.
    #[must_use]
    pub fn config(&self) -> &Config {
        &self.config
    }
804
    /// Returns the graph model selected in the configuration.
    #[must_use]
    pub fn graph_model(&self) -> crate::config::GraphModel {
        self.config.graph_model
    }
810
    /// Returns the configured memory limit, or `None` if none was set.
    #[must_use]
    pub fn memory_limit(&self) -> Option<usize> {
        self.config.memory_limit
    }
816
    /// Returns the built-in LPG store.
    ///
    /// For databases created with `with_store` this is the placeholder store,
    /// not the caller-supplied one; use `graph_store` to get the store that
    /// sessions actually operate on.
    #[must_use]
    pub fn store(&self) -> &Arc<LpgStore> {
        &self.store
    }
828
829 #[allow(dead_code)] fn active_store(&self) -> Arc<LpgStore> {
836 let graph_name = self.current_graph.read().clone();
837 match graph_name {
838 None => Arc::clone(&self.store),
839 Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
840 Some(ref name) => self
841 .store
842 .graph(name)
843 .unwrap_or_else(|| Arc::clone(&self.store)),
844 }
845 }
846
847 pub fn create_graph(&self, name: &str) -> Result<bool> {
855 Ok(self.store.create_graph(name)?)
856 }
857
    /// Drops a named graph from the built-in store and returns the store's
    /// boolean result.
    pub fn drop_graph(&self, name: &str) -> bool {
        self.store.drop_graph(name)
    }
862
    /// Returns the graph names reported by the built-in store.
    #[must_use]
    pub fn list_graphs(&self) -> Vec<String> {
        self.store.graph_names()
    }
868
869 #[must_use]
877 pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
878 if let Some(ref ext_store) = self.external_store {
879 Arc::clone(ext_store)
880 } else {
881 Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
882 }
883 }
884
    /// Runs garbage collection.
    ///
    /// Asks the transaction manager for the minimum active epoch, lets the
    /// built-in store collect versions against that epoch, then runs the
    /// transaction manager's own `gc`.
    ///
    /// NOTE(review): this only touches the built-in `store`; confirm whether
    /// databases created via `with_store` need the external store collected.
    pub fn gc(&self) {
        let min_epoch = self.transaction_manager.min_active_epoch();
        self.store.gc_versions(min_epoch);
        self.transaction_manager.gc();
    }
895
    /// Returns the shared buffer manager.
    #[must_use]
    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
        &self.buffer_manager
    }
901
    /// Returns the shared query cache.
    #[must_use]
    pub fn query_cache(&self) -> &Arc<QueryCache> {
        &self.query_cache
    }
907
    /// Clears the shared query cache.
    pub fn clear_plan_cache(&self) {
        self.query_cache.clear();
    }
916
917 pub fn close(&self) -> Result<()> {
931 let mut is_open = self.is_open.write();
932 if !*is_open {
933 return Ok(());
934 }
935
936 #[cfg(feature = "wal")]
938 if let Some(ref wal) = self.wal {
939 let epoch = self.store.current_epoch();
940
941 let checkpoint_tx = self
943 .transaction_manager
944 .last_assigned_transaction_id()
945 .unwrap_or_else(|| {
946 self.transaction_manager.begin()
948 });
949
950 wal.log(&WalRecord::TransactionCommit {
952 transaction_id: checkpoint_tx,
953 })?;
954
955 wal.checkpoint(checkpoint_tx, epoch)?;
957 wal.sync()?;
958 }
959
960 *is_open = false;
961 Ok(())
962 }
963
    /// Returns the write-ahead log, or `None` when this database has no WAL.
    #[cfg(feature = "wal")]
    #[must_use]
    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
        self.wal.as_ref()
    }
970
971 #[cfg(feature = "wal")]
973 pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
974 if let Some(ref wal) = self.wal {
975 wal.log(record)?;
976 }
977 Ok(())
978 }
979}
980
981impl Drop for GrafeoDB {
982 fn drop(&mut self) {
983 if let Err(e) = self.close() {
984 tracing::error!("Error closing database: {}", e);
985 }
986 }
987}
988
/// Exposes the database's administrative operations through the
/// `AdminService` trait.
///
/// NOTE(review): every method forwards to a same-named call on `self`. This
/// presumably resolves to inherent `GrafeoDB` methods defined in another
/// module (inherent methods take priority over trait methods); confirm they
/// exist, otherwise these calls would recurse.
impl crate::admin::AdminService for GrafeoDB {
    /// Forwards to `self.info()`.
    fn info(&self) -> crate::admin::DatabaseInfo {
        self.info()
    }

    /// Forwards to `self.detailed_stats()`.
    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
        self.detailed_stats()
    }

    /// Forwards to `self.schema()`.
    fn schema(&self) -> crate::admin::SchemaInfo {
        self.schema()
    }

    /// Forwards to `self.validate()`.
    fn validate(&self) -> crate::admin::ValidationResult {
        self.validate()
    }

    /// Forwards to `self.wal_status()`.
    fn wal_status(&self) -> crate::admin::WalStatus {
        self.wal_status()
    }

    /// Forwards to `self.wal_checkpoint()`.
    fn wal_checkpoint(&self) -> Result<()> {
        self.wal_checkpoint()
    }
}
1014
/// Result of executing a query: column metadata, row data, and optional
/// execution metrics and status information.
#[derive(Debug)]
pub struct QueryResult {
    /// Column names.
    pub columns: Vec<String>,
    /// Logical type of each column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// Row data; each row is a vector of values.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Execution time in milliseconds, when recorded.
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned, when recorded.
    pub rows_scanned: Option<u64>,
    /// Optional status message.
    pub status_message: Option<String>,
    /// GQL status; the constructors in this file set it to `SUCCESS`.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
1061
1062impl QueryResult {
1063 #[must_use]
1065 pub fn empty() -> Self {
1066 Self {
1067 columns: Vec::new(),
1068 column_types: Vec::new(),
1069 rows: Vec::new(),
1070 execution_time_ms: None,
1071 rows_scanned: None,
1072 status_message: None,
1073 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1074 }
1075 }
1076
1077 #[must_use]
1079 pub fn status(msg: impl Into<String>) -> Self {
1080 Self {
1081 columns: Vec::new(),
1082 column_types: Vec::new(),
1083 rows: Vec::new(),
1084 execution_time_ms: None,
1085 rows_scanned: None,
1086 status_message: Some(msg.into()),
1087 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1088 }
1089 }
1090
1091 #[must_use]
1093 pub fn new(columns: Vec<String>) -> Self {
1094 let len = columns.len();
1095 Self {
1096 columns,
1097 column_types: vec![grafeo_common::types::LogicalType::Any; len],
1098 rows: Vec::new(),
1099 execution_time_ms: None,
1100 rows_scanned: None,
1101 status_message: None,
1102 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1103 }
1104 }
1105
1106 #[must_use]
1108 pub fn with_types(
1109 columns: Vec<String>,
1110 column_types: Vec<grafeo_common::types::LogicalType>,
1111 ) -> Self {
1112 Self {
1113 columns,
1114 column_types,
1115 rows: Vec::new(),
1116 execution_time_ms: None,
1117 rows_scanned: None,
1118 status_message: None,
1119 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1120 }
1121 }
1122
1123 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1125 self.execution_time_ms = Some(execution_time_ms);
1126 self.rows_scanned = Some(rows_scanned);
1127 self
1128 }
1129
1130 #[must_use]
1132 pub fn execution_time_ms(&self) -> Option<f64> {
1133 self.execution_time_ms
1134 }
1135
1136 #[must_use]
1138 pub fn rows_scanned(&self) -> Option<u64> {
1139 self.rows_scanned
1140 }
1141
1142 #[must_use]
1144 pub fn row_count(&self) -> usize {
1145 self.rows.len()
1146 }
1147
1148 #[must_use]
1150 pub fn column_count(&self) -> usize {
1151 self.columns.len()
1152 }
1153
1154 #[must_use]
1156 pub fn is_empty(&self) -> bool {
1157 self.rows.is_empty()
1158 }
1159
1160 pub fn scalar<T: FromValue>(&self) -> Result<T> {
1169 if self.rows.len() != 1 || self.columns.len() != 1 {
1170 return Err(grafeo_common::utils::error::Error::InvalidValue(
1171 "Expected single value".to_string(),
1172 ));
1173 }
1174 T::from_value(&self.rows[0][0])
1175 }
1176
1177 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1179 self.rows.iter()
1180 }
1181}
1182
/// Conversion from a borrowed database `Value` into a concrete Rust type.
///
/// Used as the bound on `QueryResult::scalar`.
pub trait FromValue: Sized {
    /// Converts `value` into `Self`, or returns an error when it cannot be
    /// converted. The implementations in this file return
    /// `Error::TypeMismatch`.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
1191
1192impl FromValue for i64 {
1193 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1194 value
1195 .as_int64()
1196 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1197 expected: "INT64".to_string(),
1198 found: value.type_name().to_string(),
1199 })
1200 }
1201}
1202
1203impl FromValue for f64 {
1204 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1205 value
1206 .as_float64()
1207 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1208 expected: "FLOAT64".to_string(),
1209 found: value.type_name().to_string(),
1210 })
1211 }
1212}
1213
1214impl FromValue for String {
1215 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1216 value.as_str().map(String::from).ok_or_else(|| {
1217 grafeo_common::utils::error::Error::TypeMismatch {
1218 expected: "STRING".to_string(),
1219 found: value.type_name().to_string(),
1220 }
1221 })
1222 }
1223}
1224
1225impl FromValue for bool {
1226 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1227 value
1228 .as_bool()
1229 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1230 expected: "BOOL".to_string(),
1231 found: value.type_name().to_string(),
1232 })
1233 }
1234}
1235
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh in-memory database contains no nodes and no edges.
    #[test]
    fn test_create_in_memory_database() {
        let database = GrafeoDB::new_in_memory();
        assert_eq!(database.node_count(), 0);
        assert_eq!(database.edge_count(), 0);
    }

    /// Builder options set on the config are visible through `config()`.
    #[test]
    fn test_database_config() {
        let custom = Config::in_memory().with_threads(4).with_query_logging();

        let database = GrafeoDB::with_config(custom).unwrap();
        assert_eq!(database.config().threads, 4);
        assert!(database.config().query_logging);
    }

    /// Creating a session on an in-memory database does not panic.
    #[test]
    fn test_database_session() {
        let database = GrafeoDB::new_in_memory();
        let _session = database.session();
    }

    /// Nodes and edges written before `close` are visible after reopening.
    #[cfg(feature = "wal")]
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let scratch = tempdir().unwrap();
        let db_path = scratch.path().join("test_db");

        // First lifetime: write two nodes and one edge, then close.
        {
            let writer = GrafeoDB::open(&db_path).unwrap();

            let alix = writer.create_node(&["Person"]);
            writer.set_node_property(alix, "name", Value::from("Alix"));

            let gus = writer.create_node(&["Person"]);
            writer.set_node_property(gus, "name", Value::from("Gus"));

            let _edge = writer.create_edge(alix, gus, "KNOWS");

            writer.close().unwrap();
        }

        // Second lifetime: everything is recovered.
        {
            let reader = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(reader.node_count(), 2);
            assert_eq!(reader.edge_count(), 1);

            let first = reader.get_node(grafeo_common::types::NodeId::new(0));
            assert!(first.is_some());

            let second = reader.get_node(grafeo_common::types::NodeId::new(1));
            assert!(second.is_some());
        }
    }

    /// Mutations on a persistent database produce WAL records.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let scratch = tempdir().unwrap();
        let db_path = scratch.path().join("wal_test_db");

        let database = GrafeoDB::open(&db_path).unwrap();

        let created = database.create_node(&["Test"]);
        database.delete_node(created);

        if let Some(wal) = database.wal() {
            assert!(wal.record_count() > 0);
        }

        database.close().unwrap();
    }

    /// Data accumulates correctly across several open/close cycles.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_recovery_multiple_sessions() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let scratch = tempdir().unwrap();
        let db_path = scratch.path().join("multi_session_db");

        // Cycle 1: one node.
        {
            let database = GrafeoDB::open(&db_path).unwrap();
            let alix = database.create_node(&["Person"]);
            database.set_node_property(alix, "name", Value::from("Alix"));
            database.close().unwrap();
        }

        // Cycle 2: the first node is back; add a second.
        {
            let database = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(database.node_count(), 1);
            let gus = database.create_node(&["Person"]);
            database.set_node_property(gus, "name", Value::from("Gus"));
            database.close().unwrap();
        }

        // Cycle 3: both nodes are present with their labels.
        {
            let database = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(database.node_count(), 2);

            let first = database
                .get_node(grafeo_common::types::NodeId::new(0))
                .unwrap();
            assert!(first.labels.iter().any(|l| l.as_str() == "Person"));

            let second = database
                .get_node(grafeo_common::types::NodeId::new(1))
                .unwrap();
            assert!(second.labels.iter().any(|l| l.as_str() == "Person"));
        }
    }

    /// Deletions are replayed: a deleted node stays deleted after reopening.
    #[cfg(feature = "wal")]
    #[test]
    fn test_database_consistency_after_mutations() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let scratch = tempdir().unwrap();
        let db_path = scratch.path().join("consistency_db");

        {
            let database = GrafeoDB::open(&db_path).unwrap();

            let first = database.create_node(&["Node"]);
            let middle = database.create_node(&["Node"]);
            let last = database.create_node(&["Node"]);

            let first_link = database.create_edge(first, middle, "LINKS");
            let _second_link = database.create_edge(middle, last, "LINKS");

            database.delete_edge(first_link);
            database.delete_node(middle);

            database.set_node_property(first, "value", Value::Int64(1));
            database.set_node_property(last, "value", Value::Int64(3));

            database.close().unwrap();
        }

        {
            let database = GrafeoDB::open(&db_path).unwrap();

            let recovered_first = database.get_node(grafeo_common::types::NodeId::new(0));
            assert!(recovered_first.is_some());

            let recovered_last = database.get_node(grafeo_common::types::NodeId::new(2));
            assert!(recovered_last.is_some());

            let recovered_middle = database.get_node(grafeo_common::types::NodeId::new(1));
            assert!(recovered_middle.is_none());
        }
    }

    /// Calling `close` twice succeeds both times.
    #[cfg(feature = "wal")]
    #[test]
    fn test_close_is_idempotent() {
        use tempfile::tempdir;

        let scratch = tempdir().unwrap();
        let db_path = scratch.path().join("close_test_db");

        let database = GrafeoDB::open(&db_path).unwrap();
        database.create_node(&["Test"]);

        assert!(database.close().is_ok());

        assert!(database.close().is_ok());
    }

    /// A query that matches rows reports execution metrics.
    #[test]
    fn test_query_result_has_metrics() {
        let database = GrafeoDB::new_in_memory();
        database.create_node(&["Person"]);
        database.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let outcome = database.execute("MATCH (n:Person) RETURN n").unwrap();

            assert!(outcome.execution_time_ms.is_some());
            assert!(outcome.rows_scanned.is_some());
            assert!(outcome.execution_time_ms.unwrap() >= 0.0);
            assert_eq!(outcome.rows_scanned.unwrap(), 2);
        }
    }

    /// A query that matches nothing still reports metrics, with zero scans.
    #[test]
    fn test_empty_query_result_metrics() {
        let database = GrafeoDB::new_in_memory();
        database.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let outcome = database.execute("MATCH (n:NonExistent) RETURN n").unwrap();

            assert!(outcome.execution_time_ms.is_some());
            assert!(outcome.rows_scanned.is_some());
            assert_eq!(outcome.rows_scanned.unwrap(), 0);
        }
    }

    /// Change-data-capture tests (only built with the `cdc` feature).
    #[cfg(feature = "cdc")]
    mod cdc_integration {
        use super::*;

        /// Create, two property updates, and delete each produce one event.
        #[test]
        fn test_node_lifecycle_history() {
            let database = GrafeoDB::new_in_memory();

            let node = database.create_node(&["Person"]);
            database.set_node_property(node, "name", "Alix".into());
            database.set_node_property(node, "name", "Gus".into());
            database.delete_node(node);

            let events = database.history(node).unwrap();
            assert_eq!(events.len(), 4);
            assert_eq!(events[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(events[1].kind, crate::cdc::ChangeKind::Update);
            // The first write of a property has no previous value.
            assert!(events[1].before.is_none());
            assert_eq!(events[2].kind, crate::cdc::ChangeKind::Update);
            // The second write records the value it replaced.
            assert!(events[2].before.is_some());
            assert_eq!(events[3].kind, crate::cdc::ChangeKind::Delete);
        }

        /// Create, one property update, and delete each produce one event.
        #[test]
        fn test_edge_lifecycle_history() {
            let database = GrafeoDB::new_in_memory();

            let alix = database.create_node(&["Person"]);
            let gus = database.create_node(&["Person"]);
            let knows = database.create_edge(alix, gus, "KNOWS");
            database.set_edge_property(knows, "since", 2024i64.into());
            database.delete_edge(knows);

            let events = database.history(knows).unwrap();
            assert_eq!(events.len(), 3);
            assert_eq!(events[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(events[1].kind, crate::cdc::ChangeKind::Update);
            assert_eq!(events[2].kind, crate::cdc::ChangeKind::Delete);
        }

        /// Creating a node with properties yields a single create event that
        /// carries both properties.
        #[test]
        fn test_create_node_with_props_cdc() {
            let database = GrafeoDB::new_in_memory();

            let initial_props = vec![
                ("name", grafeo_common::types::Value::from("Alix")),
                ("age", grafeo_common::types::Value::from(30i64)),
            ];
            let node = database.create_node_with_props(&["Person"], initial_props);

            let events = database.history(node).unwrap();
            assert_eq!(events.len(), 1);
            assert_eq!(events[0].kind, crate::cdc::ChangeKind::Create);
            let after = events[0].after.as_ref().unwrap();
            assert_eq!(after.len(), 2);
        }

        /// Querying the full epoch range returns every recorded change.
        #[test]
        fn test_changes_between() {
            let database = GrafeoDB::new_in_memory();

            let first = database.create_node(&["A"]);
            let _second = database.create_node(&["B"]);
            database.set_node_property(first, "x", 1i64.into());

            let from_epoch = grafeo_common::types::EpochId(0);
            let to_epoch = grafeo_common::types::EpochId(u64::MAX);
            let changes = database.changes_between(from_epoch, to_epoch).unwrap();
            assert_eq!(changes.len(), 3);
        }
    }

    /// Data already in an external store is visible to queries.
    #[test]
    fn test_with_store_basic() {
        use grafeo_core::graph::lpg::LpgStore;

        let backing = Arc::new(LpgStore::new().unwrap());
        let person = backing.create_node(&["Person"]);
        backing.set_node_property(person, "name", "Alix".into());

        let graph_store = Arc::clone(&backing) as Arc<dyn GraphStoreMut>;
        let database = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let outcome = database.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(outcome.rows.len(), 1);
    }

    /// Sessions created on an external-store database can run queries.
    #[test]
    fn test_with_store_session() {
        use grafeo_core::graph::lpg::LpgStore;

        let backing = Arc::new(LpgStore::new().unwrap());
        let graph_store = Arc::clone(&backing) as Arc<dyn GraphStoreMut>;
        let database = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let session = database.session();
        let outcome = session.execute("MATCH (n) RETURN count(n)").unwrap();
        assert_eq!(outcome.rows.len(), 1);
    }

    /// Inserts made in a transaction on an external-store session are visible
    /// within that transaction and can be committed.
    #[test]
    fn test_with_store_mutations() {
        use grafeo_core::graph::lpg::LpgStore;

        let backing = Arc::new(LpgStore::new().unwrap());
        let graph_store = Arc::clone(&backing) as Arc<dyn GraphStoreMut>;
        let database = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let mut session = database.session();

        session.begin_transaction().unwrap();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        let outcome = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(outcome.rows.len(), 1);

        session.commit().unwrap();
    }
}