1use std::path::Path;
6use std::sync::Arc;
7use std::sync::atomic::AtomicUsize;
8
9use parking_lot::RwLock;
10
11#[cfg(feature = "wal")]
12use grafeo_adapters::storage::wal::{
13 DurabilityMode as WalDurabilityMode, WalConfig, WalManager, WalRecord, WalRecovery,
14};
15use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
16use grafeo_common::types::{EdgeId, NodeId, Value};
17use grafeo_common::utils::error::{Error, Result};
18use grafeo_core::graph::lpg::LpgStore;
19#[cfg(feature = "rdf")]
20use grafeo_core::graph::rdf::RdfStore;
21
22use crate::config::Config;
23use crate::query::cache::QueryCache;
24use crate::session::Session;
25use crate::transaction::TransactionManager;
26
/// Database handle that owns the graph store, the transaction manager, the
/// buffer manager, the query cache and the feature-gated subsystems.
pub struct GrafeoDB {
    /// Configuration the database was built with.
    config: Config,
    /// Labeled-property-graph store; cloned into every session.
    store: Arc<LpgStore>,
    /// RDF store (only with the `rdf` feature).
    #[cfg(feature = "rdf")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager; cloned into every session.
    tx_manager: Arc<TransactionManager>,
    /// Buffer manager created from the configured memory limit and spill path.
    buffer_manager: Arc<BufferManager>,
    /// WAL manager (only with the `wal` feature); `None` when the WAL is
    /// disabled or no database path is configured.
    #[cfg(feature = "wal")]
    wal: Option<Arc<WalManager>>,
    /// Query cache; cloned into every session.
    query_cache: Arc<QueryCache>,
    /// Counter handed to every session; initialized to zero.
    commit_counter: Arc<AtomicUsize>,
    /// `true` from construction until `close` succeeds.
    is_open: RwLock<bool>,
    /// Change-data-capture log (only with the `cdc` feature).
    #[cfg(feature = "cdc")]
    cdc_log: Arc<crate::cdc::CdcLog>,
    /// Embedding models keyed by `String` (only with the `embed` feature);
    /// starts empty. Presumably keyed by model name — TODO confirm.
    #[cfg(feature = "embed")]
    embedding_models: RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
}
77
78impl GrafeoDB {
79 #[must_use]
95 pub fn new_in_memory() -> Self {
96 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
97 }
98
/// Opens a database using [`Config::persistent`] for `path`.
///
/// # Errors
///
/// Returns whatever error [`GrafeoDB::with_config`] produces.
#[cfg(feature = "wal")]
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
    let config = Config::persistent(path.as_ref());
    Self::with_config(config)
}
121
122 pub fn with_config(config: Config) -> Result<Self> {
146 config
148 .validate()
149 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
150
151 let store = Arc::new(LpgStore::new());
152 #[cfg(feature = "rdf")]
153 let rdf_store = Arc::new(RdfStore::new());
154 let tx_manager = Arc::new(TransactionManager::new());
155
156 let buffer_config = BufferManagerConfig {
158 budget: config.memory_limit.unwrap_or_else(|| {
159 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
160 }),
161 spill_path: config
162 .spill_path
163 .clone()
164 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
165 ..BufferManagerConfig::default()
166 };
167 let buffer_manager = BufferManager::new(buffer_config);
168
169 #[cfg(feature = "wal")]
171 let wal = if config.wal_enabled {
172 if let Some(ref db_path) = config.path {
173 std::fs::create_dir_all(db_path)?;
175
176 let wal_path = db_path.join("wal");
177
178 if wal_path.exists() {
180 let recovery = WalRecovery::new(&wal_path);
181 let records = recovery.recover()?;
182 Self::apply_wal_records(&store, &records)?;
183 }
184
185 let wal_durability = match config.wal_durability {
187 crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
188 crate::config::DurabilityMode::Batch {
189 max_delay_ms,
190 max_records,
191 } => WalDurabilityMode::Batch {
192 max_delay_ms,
193 max_records,
194 },
195 crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
196 WalDurabilityMode::Adaptive { target_interval_ms }
197 }
198 crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
199 };
200 let wal_config = WalConfig {
201 durability: wal_durability,
202 ..WalConfig::default()
203 };
204 let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
205 Some(Arc::new(wal_manager))
206 } else {
207 None
208 }
209 } else {
210 None
211 };
212
213 let query_cache = Arc::new(QueryCache::default());
215
216 Ok(Self {
217 config,
218 store,
219 #[cfg(feature = "rdf")]
220 rdf_store,
221 tx_manager,
222 buffer_manager,
223 #[cfg(feature = "wal")]
224 wal,
225 query_cache,
226 commit_counter: Arc::new(AtomicUsize::new(0)),
227 is_open: RwLock::new(true),
228 #[cfg(feature = "cdc")]
229 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
230 #[cfg(feature = "embed")]
231 embedding_models: RwLock::new(hashbrown::HashMap::new()),
232 })
233 }
234
/// Replays recovered WAL `records` into `store`, in order.
///
/// Data records are forwarded to the matching store method; the return
/// values of those calls are discarded. Transaction and checkpoint markers
/// are skipped.
///
/// NOTE(review): every data record is applied regardless of whether a
/// `TxCommit` or `TxAbort` marker follows it — confirm this is intended.
///
/// # Errors
///
/// Currently always returns `Ok(())`.
#[cfg(feature = "wal")]
fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
    for entry in records {
        match entry {
            // Markers carry no graph data to replay.
            WalRecord::TxCommit { .. }
            | WalRecord::TxAbort { .. }
            | WalRecord::Checkpoint { .. } => {}
            WalRecord::CreateNode { id, labels } => {
                let borrowed: Vec<&str> = labels.iter().map(|label| label.as_str()).collect();
                store.create_node_with_id(*id, &borrowed);
            }
            WalRecord::DeleteNode { id } => {
                store.delete_node(*id);
            }
            WalRecord::CreateEdge {
                id,
                src,
                dst,
                edge_type,
            } => {
                store.create_edge_with_id(*id, *src, *dst, edge_type);
            }
            WalRecord::DeleteEdge { id } => {
                store.delete_edge(*id);
            }
            WalRecord::SetNodeProperty { id, key, value } => {
                store.set_node_property(*id, key, value.clone());
            }
            WalRecord::SetEdgeProperty { id, key, value } => {
                store.set_edge_property(*id, key, value.clone());
            }
            WalRecord::AddNodeLabel { id, label } => {
                store.add_label(*id, label);
            }
            WalRecord::RemoveNodeLabel { id, label } => {
                store.remove_label(*id, label);
            }
        }
    }
    Ok(())
}
280
281 #[must_use]
300 pub fn session(&self) -> Session {
301 #[cfg(feature = "rdf")]
302 let mut session = Session::with_rdf_store_and_adaptive(
303 Arc::clone(&self.store),
304 Arc::clone(&self.rdf_store),
305 Arc::clone(&self.tx_manager),
306 Arc::clone(&self.query_cache),
307 self.config.adaptive.clone(),
308 self.config.factorized_execution,
309 self.config.graph_model,
310 self.config.query_timeout,
311 Arc::clone(&self.commit_counter),
312 self.config.gc_interval,
313 );
314 #[cfg(not(feature = "rdf"))]
315 let mut session = Session::with_adaptive(
316 Arc::clone(&self.store),
317 Arc::clone(&self.tx_manager),
318 Arc::clone(&self.query_cache),
319 self.config.adaptive.clone(),
320 self.config.factorized_execution,
321 self.config.graph_model,
322 self.config.query_timeout,
323 Arc::clone(&self.commit_counter),
324 self.config.gc_interval,
325 );
326
327 #[cfg(feature = "cdc")]
328 session.set_cdc_log(Arc::clone(&self.cdc_log));
329
330 let _ = &mut session;
332
333 session
334 }
335
336 #[must_use]
338 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
339 &self.config.adaptive
340 }
341
342 pub fn execute(&self, query: &str) -> Result<QueryResult> {
352 let session = self.session();
353 session.execute(query)
354 }
355
356 pub fn execute_with_params(
362 &self,
363 query: &str,
364 params: std::collections::HashMap<String, grafeo_common::types::Value>,
365 ) -> Result<QueryResult> {
366 let session = self.session();
367 session.execute_with_params(query, params)
368 }
369
/// Runs a Cypher `query` on a freshly created session.
///
/// # Errors
///
/// Propagates any error returned by the session's `execute_cypher`.
#[cfg(feature = "cypher")]
pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
    self.session().execute_cypher(query)
}
380
/// Runs a Cypher `query` with `params`.
///
/// Unlike [`GrafeoDB::execute_cypher`], this does not go through a session:
/// it builds a `QueryProcessor` over the LPG store and processes the query
/// with `QueryLanguage::Cypher`.
///
/// # Errors
///
/// Propagates any error returned by `QueryProcessor::process`.
#[cfg(feature = "cypher")]
pub fn execute_cypher_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, grafeo_common::types::Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
    // The processor borrows the parameter map; it is not consumed.
    processor.process(query, QueryLanguage::Cypher, Some(&params))
}
398
/// Runs a Gremlin `query` on a freshly created session.
///
/// # Errors
///
/// Propagates any error returned by the session's `execute_gremlin`.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
    self.session().execute_gremlin(query)
}
409
/// Runs a Gremlin `query` with `params` on a freshly created session.
///
/// # Errors
///
/// Propagates any error returned by the session's
/// `execute_gremlin_with_params`.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, grafeo_common::types::Value>,
) -> Result<QueryResult> {
    self.session().execute_gremlin_with_params(query, params)
}
424
/// Runs a GraphQL `query` on a freshly created session.
///
/// # Errors
///
/// Propagates any error returned by the session's `execute_graphql`.
#[cfg(feature = "graphql")]
pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
    self.session().execute_graphql(query)
}
435
/// Runs a GraphQL `query` with `params` on a freshly created session.
///
/// # Errors
///
/// Propagates any error returned by the session's
/// `execute_graphql_with_params`.
#[cfg(feature = "graphql")]
pub fn execute_graphql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, grafeo_common::types::Value>,
) -> Result<QueryResult> {
    self.session().execute_graphql_with_params(query, params)
}
450
/// Runs a SQL/PGQ `query` on a freshly created session.
///
/// # Errors
///
/// Propagates any error returned by the session's `execute_sql`.
#[cfg(feature = "sql-pgq")]
pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
    self.session().execute_sql(query)
}
461
/// Runs a SQL/PGQ `query` with `params`.
///
/// Unlike [`GrafeoDB::execute_sql`], this does not go through a session: it
/// builds a `QueryProcessor` over the LPG store and processes the query with
/// `QueryLanguage::SqlPgq`.
///
/// # Errors
///
/// Propagates any error returned by `QueryProcessor::process`.
#[cfg(feature = "sql-pgq")]
pub fn execute_sql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, grafeo_common::types::Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
    // The processor borrows the parameter map; it is not consumed.
    processor.process(query, QueryLanguage::SqlPgq, Some(&params))
}
479
/// Runs a SPARQL `query` against the RDF store.
///
/// The query is translated to a logical plan, optimized, planned against
/// the RDF store and then executed.
///
/// # Errors
///
/// Propagates errors from translation, optimization, planning and execution.
#[cfg(all(feature = "sparql", feature = "rdf"))]
pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
    };

    // Translate and optimize the logical plan.
    let logical = sparql_translator::translate(query)?;
    let optimizer = Optimizer::from_store(&self.store);
    let optimized = optimizer.optimize(logical)?;

    // Produce a physical plan over the RDF store.
    let planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
    let mut plan = planner.plan(&optimized)?;

    // Execute, labelling the output with the plan's columns.
    let columns = plan.columns.clone();
    let executor = Executor::with_columns(columns);
    executor.execute(plan.operator.as_mut())
}
520
/// Returns a reference to the shared RDF store handle.
#[cfg(feature = "rdf")]
#[must_use]
pub fn rdf_store(&self) -> &Arc<RdfStore> {
    let Self { rdf_store, .. } = self;
    rdf_store
}
529
530 pub fn query_scalar<T: FromValue>(&self, query: &str) -> Result<T> {
536 let result = self.execute(query)?;
537 result.scalar()
538 }
539
540 #[must_use]
542 pub fn config(&self) -> &Config {
543 &self.config
544 }
545
546 #[must_use]
548 pub fn graph_model(&self) -> crate::config::GraphModel {
549 self.config.graph_model
550 }
551
552 #[must_use]
554 pub fn memory_limit(&self) -> Option<usize> {
555 self.config.memory_limit
556 }
557
558 #[must_use]
562 pub fn store(&self) -> &Arc<LpgStore> {
563 &self.store
564 }
565
566 pub fn gc(&self) {
572 let min_epoch = self.tx_manager.min_active_epoch();
573 self.store.gc_versions(min_epoch);
574 self.tx_manager.gc();
575 }
576
577 #[must_use]
579 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
580 &self.buffer_manager
581 }
582
583 pub fn close(&self) -> Result<()> {
593 let mut is_open = self.is_open.write();
594 if !*is_open {
595 return Ok(());
596 }
597
598 #[cfg(feature = "wal")]
600 if let Some(ref wal) = self.wal {
601 let epoch = self.store.current_epoch();
602
603 let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
605 self.tx_manager.begin()
607 });
608
609 wal.log(&WalRecord::TxCommit {
611 tx_id: checkpoint_tx,
612 })?;
613
614 wal.checkpoint(checkpoint_tx, epoch)?;
616 wal.sync()?;
617 }
618
619 *is_open = false;
620 Ok(())
621 }
622
/// Returns the WAL manager handle, or `None` when no WAL is active.
#[cfg(feature = "wal")]
#[must_use]
pub fn wal(&self) -> Option<&Arc<WalManager>> {
    match &self.wal {
        Some(manager) => Some(manager),
        None => None,
    }
}
629
/// Appends `record` to the WAL when one is active; otherwise does nothing.
///
/// # Errors
///
/// Propagates any error returned by the WAL manager's `log`.
#[cfg(feature = "wal")]
fn log_wal(&self, record: &WalRecord) -> Result<()> {
    let Some(wal) = self.wal.as_ref() else {
        return Ok(());
    };
    wal.log(record)?;
    Ok(())
}
638
639 #[must_use]
641 pub fn node_count(&self) -> usize {
642 self.store.node_count()
643 }
644
645 #[must_use]
647 pub fn edge_count(&self) -> usize {
648 self.store.edge_count()
649 }
650
651 #[must_use]
653 pub fn label_count(&self) -> usize {
654 self.store.label_count()
655 }
656
657 #[must_use]
659 pub fn property_key_count(&self) -> usize {
660 self.store.property_key_count()
661 }
662
663 #[must_use]
665 pub fn edge_type_count(&self) -> usize {
666 self.store.edge_type_count()
667 }
668
669 pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
686 let id = self.store.create_node(labels);
687
688 #[cfg(feature = "wal")]
690 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
691 id,
692 labels: labels.iter().map(|s| (*s).to_string()).collect(),
693 }) {
694 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
695 }
696
697 #[cfg(feature = "cdc")]
698 self.cdc_log
699 .record_create_node(id, self.store.current_epoch(), None);
700
701 id
702 }
703
704 pub fn create_node_with_props(
708 &self,
709 labels: &[&str],
710 properties: impl IntoIterator<
711 Item = (
712 impl Into<grafeo_common::types::PropertyKey>,
713 impl Into<grafeo_common::types::Value>,
714 ),
715 >,
716 ) -> grafeo_common::types::NodeId {
717 let props: Vec<(
719 grafeo_common::types::PropertyKey,
720 grafeo_common::types::Value,
721 )> = properties
722 .into_iter()
723 .map(|(k, v)| (k.into(), v.into()))
724 .collect();
725
726 let id = self
727 .store
728 .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
729
730 #[cfg(feature = "cdc")]
732 let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
733 .iter()
734 .map(|(k, v)| (k.to_string(), v.clone()))
735 .collect();
736
737 #[cfg(feature = "wal")]
739 {
740 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
741 id,
742 labels: labels.iter().map(|s| (*s).to_string()).collect(),
743 }) {
744 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
745 }
746
747 for (key, value) in props {
749 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
750 id,
751 key: key.to_string(),
752 value,
753 }) {
754 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
755 }
756 }
757 }
758
759 #[cfg(feature = "cdc")]
760 self.cdc_log.record_create_node(
761 id,
762 self.store.current_epoch(),
763 if cdc_props.is_empty() {
764 None
765 } else {
766 Some(cdc_props)
767 },
768 );
769
770 #[cfg(feature = "text-index")]
772 if let Some(node) = self.store.get_node(id) {
773 for label in &node.labels {
774 for (prop_key, prop_val) in &node.properties {
775 if let grafeo_common::types::Value::String(text) = prop_val
776 && let Some(index) =
777 self.store.get_text_index(label.as_str(), prop_key.as_ref())
778 {
779 index.write().insert(id, text);
780 }
781 }
782 }
783 }
784
785 id
786 }
787
788 #[must_use]
790 pub fn get_node(
791 &self,
792 id: grafeo_common::types::NodeId,
793 ) -> Option<grafeo_core::graph::lpg::Node> {
794 self.store.get_node(id)
795 }
796
797 pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
801 #[cfg(feature = "cdc")]
803 let cdc_props = self.store.get_node(id).map(|node| {
804 node.properties
805 .iter()
806 .map(|(k, v)| (k.to_string(), v.clone()))
807 .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
808 });
809
810 #[cfg(feature = "vector-index")]
812 let indexes_to_clean: Vec<std::sync::Arc<grafeo_core::index::vector::HnswIndex>> = self
813 .store
814 .get_node(id)
815 .map(|node| {
816 let mut indexes = Vec::new();
817 for label in &node.labels {
818 let prefix = format!("{}:", label.as_str());
819 for (key, index) in self.store.vector_index_entries() {
820 if key.starts_with(&prefix) {
821 indexes.push(index);
822 }
823 }
824 }
825 indexes
826 })
827 .unwrap_or_default();
828
829 #[cfg(feature = "text-index")]
831 let text_indexes_to_clean: Vec<
832 std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
833 > = self
834 .store
835 .get_node(id)
836 .map(|node| {
837 let mut indexes = Vec::new();
838 for label in &node.labels {
839 let prefix = format!("{}:", label.as_str());
840 for (key, index) in self.store.text_index_entries() {
841 if key.starts_with(&prefix) {
842 indexes.push(index);
843 }
844 }
845 }
846 indexes
847 })
848 .unwrap_or_default();
849
850 let result = self.store.delete_node(id);
851
852 #[cfg(feature = "vector-index")]
854 if result {
855 for index in indexes_to_clean {
856 index.remove(id);
857 }
858 }
859
860 #[cfg(feature = "text-index")]
862 if result {
863 for index in text_indexes_to_clean {
864 index.write().remove(id);
865 }
866 }
867
868 #[cfg(feature = "wal")]
869 if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
870 tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
871 }
872
873 #[cfg(feature = "cdc")]
874 if result {
875 self.cdc_log.record_delete(
876 crate::cdc::EntityId::Node(id),
877 self.store.current_epoch(),
878 cdc_props,
879 );
880 }
881
882 result
883 }
884
885 pub fn set_node_property(
889 &self,
890 id: grafeo_common::types::NodeId,
891 key: &str,
892 value: grafeo_common::types::Value,
893 ) {
894 #[cfg(feature = "vector-index")]
896 let vector_data = match &value {
897 grafeo_common::types::Value::Vector(v) => Some(v.clone()),
898 _ => None,
899 };
900
901 #[cfg(feature = "wal")]
903 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
904 id,
905 key: key.to_string(),
906 value: value.clone(),
907 }) {
908 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
909 }
910
911 #[cfg(feature = "cdc")]
913 let cdc_old_value = self
914 .store
915 .get_node_property(id, &grafeo_common::types::PropertyKey::new(key));
916 #[cfg(feature = "cdc")]
917 let cdc_new_value = value.clone();
918
919 self.store.set_node_property(id, key, value);
920
921 #[cfg(feature = "cdc")]
922 self.cdc_log.record_update(
923 crate::cdc::EntityId::Node(id),
924 self.store.current_epoch(),
925 key,
926 cdc_old_value,
927 cdc_new_value,
928 );
929
930 #[cfg(feature = "vector-index")]
932 if let Some(vec) = vector_data
933 && let Some(node) = self.store.get_node(id)
934 {
935 for label in &node.labels {
936 if let Some(index) = self.store.get_vector_index(label.as_str(), key) {
937 let accessor =
938 grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, key);
939 index.insert(id, &vec, &accessor);
940 }
941 }
942 }
943
944 #[cfg(feature = "text-index")]
946 if let Some(node) = self.store.get_node(id) {
947 let text_val = node
948 .properties
949 .get(&grafeo_common::types::PropertyKey::new(key))
950 .and_then(|v| match v {
951 grafeo_common::types::Value::String(s) => Some(s.to_string()),
952 _ => None,
953 });
954 for label in &node.labels {
955 if let Some(index) = self.store.get_text_index(label.as_str(), key) {
956 let mut idx = index.write();
957 if let Some(ref text) = text_val {
958 idx.insert(id, text);
959 } else {
960 idx.remove(id);
961 }
962 }
963 }
964 }
965 }
966
967 pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
985 let result = self.store.add_label(id, label);
986
987 #[cfg(feature = "wal")]
988 if result {
989 if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
991 id,
992 label: label.to_string(),
993 }) {
994 tracing::warn!("Failed to log AddNodeLabel to WAL: {}", e);
995 }
996 }
997
998 #[cfg(feature = "vector-index")]
1000 if result {
1001 let prefix = format!("{label}:");
1002 for (key, index) in self.store.vector_index_entries() {
1003 if let Some(property) = key.strip_prefix(&prefix)
1004 && let Some(node) = self.store.get_node(id)
1005 {
1006 let prop_key = grafeo_common::types::PropertyKey::new(property);
1007 if let Some(grafeo_common::types::Value::Vector(v)) =
1008 node.properties.get(&prop_key)
1009 {
1010 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
1011 &self.store,
1012 property,
1013 );
1014 index.insert(id, v, &accessor);
1015 }
1016 }
1017 }
1018 }
1019
1020 #[cfg(feature = "text-index")]
1022 if result && let Some(node) = self.store.get_node(id) {
1023 for (prop_key, prop_val) in &node.properties {
1024 if let grafeo_common::types::Value::String(text) = prop_val
1025 && let Some(index) = self.store.get_text_index(label, prop_key.as_ref())
1026 {
1027 index.write().insert(id, text);
1028 }
1029 }
1030 }
1031
1032 result
1033 }
1034
1035 pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
1053 #[cfg(feature = "text-index")]
1055 let text_indexes_to_clean: Vec<
1056 std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
1057 > = {
1058 let prefix = format!("{label}:");
1059 self.store
1060 .text_index_entries()
1061 .into_iter()
1062 .filter(|(key, _)| key.starts_with(&prefix))
1063 .map(|(_, index)| index)
1064 .collect()
1065 };
1066
1067 let result = self.store.remove_label(id, label);
1068
1069 #[cfg(feature = "wal")]
1070 if result {
1071 if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
1073 id,
1074 label: label.to_string(),
1075 }) {
1076 tracing::warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
1077 }
1078 }
1079
1080 #[cfg(feature = "text-index")]
1082 if result {
1083 for index in text_indexes_to_clean {
1084 index.write().remove(id);
1085 }
1086 }
1087
1088 result
1089 }
1090
1091 #[must_use]
1108 pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
1109 self.store
1110 .get_node(id)
1111 .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
1112 }
1113
1114 pub fn create_edge(
1134 &self,
1135 src: grafeo_common::types::NodeId,
1136 dst: grafeo_common::types::NodeId,
1137 edge_type: &str,
1138 ) -> grafeo_common::types::EdgeId {
1139 let id = self.store.create_edge(src, dst, edge_type);
1140
1141 #[cfg(feature = "wal")]
1143 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
1144 id,
1145 src,
1146 dst,
1147 edge_type: edge_type.to_string(),
1148 }) {
1149 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
1150 }
1151
1152 #[cfg(feature = "cdc")]
1153 self.cdc_log
1154 .record_create_edge(id, self.store.current_epoch(), None);
1155
1156 id
1157 }
1158
1159 pub fn create_edge_with_props(
1163 &self,
1164 src: grafeo_common::types::NodeId,
1165 dst: grafeo_common::types::NodeId,
1166 edge_type: &str,
1167 properties: impl IntoIterator<
1168 Item = (
1169 impl Into<grafeo_common::types::PropertyKey>,
1170 impl Into<grafeo_common::types::Value>,
1171 ),
1172 >,
1173 ) -> grafeo_common::types::EdgeId {
1174 let props: Vec<(
1176 grafeo_common::types::PropertyKey,
1177 grafeo_common::types::Value,
1178 )> = properties
1179 .into_iter()
1180 .map(|(k, v)| (k.into(), v.into()))
1181 .collect();
1182
1183 let id = self.store.create_edge_with_props(
1184 src,
1185 dst,
1186 edge_type,
1187 props.iter().map(|(k, v)| (k.clone(), v.clone())),
1188 );
1189
1190 #[cfg(feature = "cdc")]
1192 let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
1193 .iter()
1194 .map(|(k, v)| (k.to_string(), v.clone()))
1195 .collect();
1196
1197 #[cfg(feature = "wal")]
1199 {
1200 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
1201 id,
1202 src,
1203 dst,
1204 edge_type: edge_type.to_string(),
1205 }) {
1206 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
1207 }
1208
1209 for (key, value) in props {
1211 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
1212 id,
1213 key: key.to_string(),
1214 value,
1215 }) {
1216 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
1217 }
1218 }
1219 }
1220
1221 #[cfg(feature = "cdc")]
1222 self.cdc_log.record_create_edge(
1223 id,
1224 self.store.current_epoch(),
1225 if cdc_props.is_empty() {
1226 None
1227 } else {
1228 Some(cdc_props)
1229 },
1230 );
1231
1232 id
1233 }
1234
1235 #[must_use]
1237 pub fn get_edge(
1238 &self,
1239 id: grafeo_common::types::EdgeId,
1240 ) -> Option<grafeo_core::graph::lpg::Edge> {
1241 self.store.get_edge(id)
1242 }
1243
1244 pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
1248 #[cfg(feature = "cdc")]
1250 let cdc_props = self.store.get_edge(id).map(|edge| {
1251 edge.properties
1252 .iter()
1253 .map(|(k, v)| (k.to_string(), v.clone()))
1254 .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
1255 });
1256
1257 let result = self.store.delete_edge(id);
1258
1259 #[cfg(feature = "wal")]
1260 if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
1261 tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
1262 }
1263
1264 #[cfg(feature = "cdc")]
1265 if result {
1266 self.cdc_log.record_delete(
1267 crate::cdc::EntityId::Edge(id),
1268 self.store.current_epoch(),
1269 cdc_props,
1270 );
1271 }
1272
1273 result
1274 }
1275
1276 pub fn set_edge_property(
1280 &self,
1281 id: grafeo_common::types::EdgeId,
1282 key: &str,
1283 value: grafeo_common::types::Value,
1284 ) {
1285 #[cfg(feature = "wal")]
1287 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
1288 id,
1289 key: key.to_string(),
1290 value: value.clone(),
1291 }) {
1292 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
1293 }
1294
1295 #[cfg(feature = "cdc")]
1297 let cdc_old_value = self
1298 .store
1299 .get_edge_property(id, &grafeo_common::types::PropertyKey::new(key));
1300 #[cfg(feature = "cdc")]
1301 let cdc_new_value = value.clone();
1302
1303 self.store.set_edge_property(id, key, value);
1304
1305 #[cfg(feature = "cdc")]
1306 self.cdc_log.record_update(
1307 crate::cdc::EntityId::Edge(id),
1308 self.store.current_epoch(),
1309 key,
1310 cdc_old_value,
1311 cdc_new_value,
1312 );
1313 }
1314
1315 pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
1319 let removed = self.store.remove_node_property(id, key).is_some();
1321
1322 #[cfg(feature = "text-index")]
1324 if removed && let Some(node) = self.store.get_node(id) {
1325 for label in &node.labels {
1326 if let Some(index) = self.store.get_text_index(label.as_str(), key) {
1327 index.write().remove(id);
1328 }
1329 }
1330 }
1331
1332 removed
1333 }
1334
1335 pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
1339 self.store.remove_edge_property(id, key).is_some()
1341 }
1342
1343 pub fn create_property_index(&self, property: &str) {
1366 self.store.create_property_index(property);
1367 }
1368
1369 pub fn create_vector_index(
1389 &self,
1390 label: &str,
1391 property: &str,
1392 dimensions: Option<usize>,
1393 metric: Option<&str>,
1394 m: Option<usize>,
1395 ef_construction: Option<usize>,
1396 ) -> Result<()> {
1397 use grafeo_common::types::{PropertyKey, Value};
1398 use grafeo_core::index::vector::DistanceMetric;
1399
1400 let metric = match metric {
1401 Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1402 grafeo_common::utils::error::Error::Internal(format!(
1403 "Unknown distance metric '{}'. Use: cosine, euclidean, dot_product, manhattan",
1404 m
1405 ))
1406 })?,
1407 None => DistanceMetric::Cosine,
1408 };
1409
1410 let prop_key = PropertyKey::new(property);
1412 let mut found_dims: Option<usize> = dimensions;
1413 let mut vector_count = 0usize;
1414
1415 #[cfg(feature = "vector-index")]
1416 let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1417
1418 for node in self.store.nodes_with_label(label) {
1419 if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1420 if let Some(expected) = found_dims {
1421 if v.len() != expected {
1422 return Err(grafeo_common::utils::error::Error::Internal(format!(
1423 "Vector dimension mismatch: expected {}, found {} on node {}",
1424 expected,
1425 v.len(),
1426 node.id.0
1427 )));
1428 }
1429 } else {
1430 found_dims = Some(v.len());
1431 }
1432 vector_count += 1;
1433 #[cfg(feature = "vector-index")]
1434 vectors.push((node.id, v.to_vec()));
1435 }
1436 }
1437
1438 let Some(dims) = found_dims else {
1439 return if let Some(d) = dimensions {
1442 #[cfg(feature = "vector-index")]
1443 {
1444 use grafeo_core::index::vector::{HnswConfig, HnswIndex};
1445
1446 let mut config = HnswConfig::new(d, metric);
1447 if let Some(m_val) = m {
1448 config = config.with_m(m_val);
1449 }
1450 if let Some(ef_c) = ef_construction {
1451 config = config.with_ef_construction(ef_c);
1452 }
1453
1454 let index = HnswIndex::new(config);
1455 self.store
1456 .add_vector_index(label, property, Arc::new(index));
1457 }
1458
1459 let _ = (m, ef_construction);
1460 tracing::info!(
1461 "Empty vector index created: :{label}({property}) - 0 vectors, {d} dimensions, metric={metric_name}",
1462 metric_name = metric.name()
1463 );
1464 Ok(())
1465 } else {
1466 Err(grafeo_common::utils::error::Error::Internal(format!(
1467 "No vector properties found on :{label}({property}) and no dimensions specified"
1468 )))
1469 };
1470 };
1471
1472 #[cfg(feature = "vector-index")]
1474 {
1475 use grafeo_core::index::vector::{HnswConfig, HnswIndex};
1476
1477 let mut config = HnswConfig::new(dims, metric);
1478 if let Some(m_val) = m {
1479 config = config.with_m(m_val);
1480 }
1481 if let Some(ef_c) = ef_construction {
1482 config = config.with_ef_construction(ef_c);
1483 }
1484
1485 let index = HnswIndex::with_capacity(config, vectors.len());
1486 let accessor =
1487 grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1488 for (node_id, vec) in &vectors {
1489 index.insert(*node_id, vec, &accessor);
1490 }
1491
1492 self.store
1493 .add_vector_index(label, property, Arc::new(index));
1494 }
1495
1496 let _ = (m, ef_construction);
1498
1499 tracing::info!(
1500 "Vector index created: :{label}({property}) - {vector_count} vectors, {dims} dimensions, metric={metric_name}",
1501 metric_name = metric.name()
1502 );
1503
1504 Ok(())
1505 }
1506
/// Removes the vector index for `label`/`property` from the store.
///
/// Returns the store's result flag; an info message is logged when it is
/// `true`.
#[cfg(feature = "vector-index")]
pub fn drop_vector_index(&self, label: &str, property: &str) -> bool {
    if !self.store.remove_vector_index(label, property) {
        return false;
    }
    tracing::info!("Vector index dropped: :{label}({property})");
    true
}
1522
/// Rebuilds the vector index for `label`/`property` with its current
/// configuration.
///
/// The existing index's configuration is copied, the index is removed, and
/// [`GrafeoDB::create_vector_index`] is called with the copied dimensions,
/// metric name, `m` and `ef_construction`.
///
/// NOTE(review): the old index is removed before the new one is built; if
/// `create_vector_index` fails, the old index is not restored.
///
/// # Errors
///
/// Returns an `Internal` error when no index exists for `label`/`property`,
/// and propagates errors from `create_vector_index`.
#[cfg(feature = "vector-index")]
pub fn rebuild_vector_index(&self, label: &str, property: &str) -> Result<()> {
    let config = match self.store.get_vector_index(label, property) {
        Some(existing) => existing.config().clone(),
        None => {
            return Err(grafeo_common::utils::error::Error::Internal(format!(
                "No vector index found for :{label}({property}). Cannot rebuild."
            )));
        }
    };

    self.store.remove_vector_index(label, property);

    let dimensions = Some(config.dimensions);
    let metric_name = Some(config.metric.name());
    let m = Some(config.m);
    let ef_construction = Some(config.ef_construction);
    self.create_vector_index(label, property, dimensions, metric_name, m, ef_construction)
}
1556
/// Computes the set of node ids with `label` that satisfy every filter.
///
/// Returns `None` when `filters` is absent or empty. Otherwise it starts
/// from all nodes with `label` and intersects with the matches for each
/// filter, stopping early once the set becomes empty. A filter whose value
/// is a map with at least one key starting with `$` is evaluated through
/// `find_nodes_matching_filter`; every other filter goes through
/// `find_nodes_by_property`.
#[cfg(feature = "vector-index")]
fn compute_filter_allowlist(
    &self,
    label: &str,
    filters: Option<&std::collections::HashMap<String, Value>>,
) -> Option<std::collections::HashSet<NodeId>> {
    let filters = match filters {
        Some(map) if !map.is_empty() => map,
        _ => return None,
    };

    let mut allowlist: std::collections::HashSet<NodeId> =
        self.store.nodes_by_label(label).into_iter().collect();

    for (key, filter_value) in filters {
        let is_operator_filter = match filter_value {
            Value::Map(ops) => ops.keys().any(|k| k.as_str().starts_with('$')),
            _ => false,
        };

        let matching: std::collections::HashSet<NodeId> = if is_operator_filter {
            self.store
                .find_nodes_matching_filter(key, filter_value)
                .into_iter()
                .collect()
        } else {
            self.store
                .find_nodes_by_property(key, filter_value)
                .into_iter()
                .collect()
        };

        // Keep only ids that also match this filter.
        allowlist.retain(|node_id| matching.contains(node_id));

        // Nothing can match any more; skip the remaining filters.
        if allowlist.is_empty() {
            break;
        }
    }

    Some(allowlist)
}
1605
/// Searches the vector index for `label`/`property` for the `k` nearest
/// neighbours of `query`.
///
/// `ef`, when given, is forwarded to the `*_with_ef` search variants.
/// `filters`, when given and non-empty, restricts candidates to the
/// allowlist computed by `compute_filter_allowlist`.
///
/// # Errors
///
/// Returns an `Internal` error when no index exists for `label`/`property`.
#[cfg(feature = "vector-index")]
pub fn vector_search(
    &self,
    label: &str,
    property: &str,
    query: &[f32],
    k: usize,
    ef: Option<usize>,
    filters: Option<&std::collections::HashMap<String, Value>>,
) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
    let Some(index) = self.store.get_vector_index(label, property) else {
        return Err(grafeo_common::utils::error::Error::Internal(format!(
            "No vector index found for :{label}({property}). Call create_vector_index() first."
        )));
    };

    let accessor =
        grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);

    // Dispatch on (filter present?, ef present?).
    let results = match (self.compute_filter_allowlist(label, filters), ef) {
        (Some(allowed), Some(ef_val)) => {
            index.search_with_ef_and_filter(query, k, ef_val, &allowed, &accessor)
        }
        (Some(allowed), None) => index.search_with_filter(query, k, &allowed, &accessor),
        (None, Some(ef_val)) => index.search_with_ef(query, k, ef_val, &accessor),
        (None, None) => index.search(query, k, &accessor),
    };

    Ok(results)
}
1657
1658 pub fn batch_create_nodes(
1674 &self,
1675 label: &str,
1676 property: &str,
1677 vectors: Vec<Vec<f32>>,
1678 ) -> Vec<grafeo_common::types::NodeId> {
1679 use grafeo_common::types::{PropertyKey, Value};
1680
1681 let prop_key = PropertyKey::new(property);
1682 let labels: &[&str] = &[label];
1683
1684 let ids: Vec<grafeo_common::types::NodeId> = vectors
1685 .into_iter()
1686 .map(|vec| {
1687 let value = Value::Vector(vec.into());
1688 let id = self.store.create_node_with_props(
1689 labels,
1690 std::iter::once((prop_key.clone(), value.clone())),
1691 );
1692
1693 #[cfg(feature = "wal")]
1695 {
1696 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
1697 id,
1698 labels: labels.iter().map(|s| (*s).to_string()).collect(),
1699 }) {
1700 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
1701 }
1702 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
1703 id,
1704 key: property.to_string(),
1705 value,
1706 }) {
1707 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
1708 }
1709 }
1710
1711 id
1712 })
1713 .collect();
1714
1715 #[cfg(feature = "vector-index")]
1717 if let Some(index) = self.store.get_vector_index(label, property) {
1718 let accessor =
1719 grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1720 for &id in &ids {
1721 if let Some(node) = self.store.get_node(id) {
1722 let pk = grafeo_common::types::PropertyKey::new(property);
1723 if let Some(grafeo_common::types::Value::Vector(v)) = node.properties.get(&pk) {
1724 index.insert(id, v, &accessor);
1725 }
1726 }
1727 }
1728 }
1729
1730 ids
1731 }
1732
1733 #[cfg(feature = "vector-index")]
1746 pub fn batch_vector_search(
1747 &self,
1748 label: &str,
1749 property: &str,
1750 queries: &[Vec<f32>],
1751 k: usize,
1752 ef: Option<usize>,
1753 filters: Option<&std::collections::HashMap<String, Value>>,
1754 ) -> Result<Vec<Vec<(grafeo_common::types::NodeId, f32)>>> {
1755 let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1756 grafeo_common::utils::error::Error::Internal(format!(
1757 "No vector index found for :{label}({property}). Call create_vector_index() first."
1758 ))
1759 })?;
1760
1761 let accessor =
1762 grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1763
1764 let results = match self.compute_filter_allowlist(label, filters) {
1765 Some(allowlist) => match ef {
1766 Some(ef_val) => {
1767 index.batch_search_with_ef_and_filter(queries, k, ef_val, &allowlist, &accessor)
1768 }
1769 None => index.batch_search_with_filter(queries, k, &allowlist, &accessor),
1770 },
1771 None => match ef {
1772 Some(ef_val) => index.batch_search_with_ef(queries, k, ef_val, &accessor),
1773 None => index.batch_search(queries, k, &accessor),
1774 },
1775 };
1776
1777 Ok(results)
1778 }
1779
1780 #[cfg(feature = "vector-index")]
1803 #[allow(clippy::too_many_arguments)]
1804 pub fn mmr_search(
1805 &self,
1806 label: &str,
1807 property: &str,
1808 query: &[f32],
1809 k: usize,
1810 fetch_k: Option<usize>,
1811 lambda: Option<f32>,
1812 ef: Option<usize>,
1813 filters: Option<&std::collections::HashMap<String, Value>>,
1814 ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
1815 use grafeo_core::index::vector::mmr_select;
1816
1817 let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1818 grafeo_common::utils::error::Error::Internal(format!(
1819 "No vector index found for :{label}({property}). Call create_vector_index() first."
1820 ))
1821 })?;
1822
1823 let accessor =
1824 grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1825
1826 let fetch_k = fetch_k.unwrap_or(k.saturating_mul(4).max(k));
1827 let lambda = lambda.unwrap_or(0.5);
1828
1829 let initial_results = match self.compute_filter_allowlist(label, filters) {
1831 Some(allowlist) => match ef {
1832 Some(ef_val) => {
1833 index.search_with_ef_and_filter(query, fetch_k, ef_val, &allowlist, &accessor)
1834 }
1835 None => index.search_with_filter(query, fetch_k, &allowlist, &accessor),
1836 },
1837 None => match ef {
1838 Some(ef_val) => index.search_with_ef(query, fetch_k, ef_val, &accessor),
1839 None => index.search(query, fetch_k, &accessor),
1840 },
1841 };
1842
1843 if initial_results.is_empty() {
1844 return Ok(Vec::new());
1845 }
1846
1847 use grafeo_core::index::vector::VectorAccessor;
1849 let candidates: Vec<(grafeo_common::types::NodeId, f32, std::sync::Arc<[f32]>)> =
1850 initial_results
1851 .into_iter()
1852 .filter_map(|(id, dist)| accessor.get_vector(id).map(|vec| (id, dist, vec)))
1853 .collect();
1854
1855 let candidate_refs: Vec<(grafeo_common::types::NodeId, f32, &[f32])> = candidates
1857 .iter()
1858 .map(|(id, dist, vec)| (*id, *dist, vec.as_ref()))
1859 .collect();
1860
1861 let metric = index.config().metric;
1863 Ok(mmr_select(query, &candidate_refs, k, lambda, metric))
1864 }
1865
1866 #[cfg(feature = "text-index")]
1879 pub fn create_text_index(&self, label: &str, property: &str) -> Result<()> {
1880 use grafeo_common::types::PropertyKey;
1881 use grafeo_core::index::text::{BM25Config, InvertedIndex};
1882
1883 let mut index = InvertedIndex::new(BM25Config::default());
1884 let prop_key = PropertyKey::new(property);
1885
1886 let nodes = self.store.nodes_by_label(label);
1888 for node_id in nodes {
1889 if let Some(Value::String(text)) = self.store.get_node_property(node_id, &prop_key) {
1890 index.insert(node_id, text.as_str());
1891 }
1892 }
1893
1894 self.store
1895 .add_text_index(label, property, Arc::new(RwLock::new(index)));
1896 Ok(())
1897 }
1898
1899 #[cfg(feature = "text-index")]
1903 pub fn drop_text_index(&self, label: &str, property: &str) -> bool {
1904 self.store.remove_text_index(label, property)
1905 }
1906
1907 #[cfg(feature = "text-index")]
1915 pub fn rebuild_text_index(&self, label: &str, property: &str) -> Result<()> {
1916 self.store.remove_text_index(label, property);
1917 self.create_text_index(label, property)
1918 }
1919
1920 #[cfg(feature = "text-index")]
1929 pub fn text_search(
1930 &self,
1931 label: &str,
1932 property: &str,
1933 query: &str,
1934 k: usize,
1935 ) -> Result<Vec<(NodeId, f64)>> {
1936 let index = self.store.get_text_index(label, property).ok_or_else(|| {
1937 Error::Internal(format!(
1938 "No text index found for :{label}({property}). Call create_text_index() first."
1939 ))
1940 })?;
1941
1942 Ok(index.read().search(query, k))
1943 }
1944
1945 #[cfg(feature = "hybrid-search")]
1964 #[allow(clippy::too_many_arguments)]
1965 pub fn hybrid_search(
1966 &self,
1967 label: &str,
1968 text_property: &str,
1969 vector_property: &str,
1970 query_text: &str,
1971 query_vector: Option<&[f32]>,
1972 k: usize,
1973 fusion: Option<grafeo_core::index::text::FusionMethod>,
1974 ) -> Result<Vec<(NodeId, f64)>> {
1975 use grafeo_core::index::text::fuse_results;
1976
1977 let fusion_method = fusion.unwrap_or_default();
1978 let mut sources: Vec<Vec<(NodeId, f64)>> = Vec::new();
1979
1980 if let Some(text_index) = self.store.get_text_index(label, text_property) {
1982 let text_results = text_index.read().search(query_text, k * 2);
1983 if !text_results.is_empty() {
1984 sources.push(text_results);
1985 }
1986 }
1987
1988 if let Some(query_vec) = query_vector
1990 && let Some(vector_index) = self.store.get_vector_index(label, vector_property)
1991 {
1992 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
1993 &self.store,
1994 vector_property,
1995 );
1996 let vector_results = vector_index.search(query_vec, k * 2, &accessor);
1997 if !vector_results.is_empty() {
1998 sources.push(
1999 vector_results
2000 .into_iter()
2001 .map(|(id, dist)| (id, f64::from(dist)))
2002 .collect(),
2003 );
2004 }
2005 }
2006
2007 if sources.is_empty() {
2008 return Ok(Vec::new());
2009 }
2010
2011 Ok(fuse_results(&sources, &fusion_method, k))
2012 }
2013
2014 #[cfg(feature = "embed")]
2021 pub fn register_embedding_model(
2022 &self,
2023 name: &str,
2024 model: Arc<dyn crate::embedding::EmbeddingModel>,
2025 ) {
2026 self.embedding_models
2027 .write()
2028 .insert(name.to_string(), model);
2029 }
2030
2031 #[cfg(feature = "embed")]
2037 pub fn embed_text(&self, model_name: &str, texts: &[&str]) -> Result<Vec<Vec<f32>>> {
2038 let models = self.embedding_models.read();
2039 let model = models.get(model_name).ok_or_else(|| {
2040 grafeo_common::utils::error::Error::Internal(format!(
2041 "Embedding model '{}' not registered",
2042 model_name
2043 ))
2044 })?;
2045 model.embed(texts)
2046 }
2047
2048 #[cfg(all(feature = "embed", feature = "vector-index"))]
2058 pub fn vector_search_text(
2059 &self,
2060 label: &str,
2061 property: &str,
2062 model_name: &str,
2063 query_text: &str,
2064 k: usize,
2065 ef: Option<usize>,
2066 ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
2067 let vectors = self.embed_text(model_name, &[query_text])?;
2068 let query_vec = vectors.into_iter().next().ok_or_else(|| {
2069 grafeo_common::utils::error::Error::Internal(
2070 "Embedding model returned no vectors".to_string(),
2071 )
2072 })?;
2073 self.vector_search(label, property, &query_vec, k, ef, None)
2074 }
2075
2076 #[cfg(feature = "cdc")]
2086 pub fn history(
2087 &self,
2088 entity_id: impl Into<crate::cdc::EntityId>,
2089 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2090 Ok(self.cdc_log.history(entity_id.into()))
2091 }
2092
2093 #[cfg(feature = "cdc")]
2095 pub fn history_since(
2096 &self,
2097 entity_id: impl Into<crate::cdc::EntityId>,
2098 since_epoch: grafeo_common::types::EpochId,
2099 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2100 Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
2101 }
2102
2103 #[cfg(feature = "cdc")]
2105 pub fn changes_between(
2106 &self,
2107 start_epoch: grafeo_common::types::EpochId,
2108 end_epoch: grafeo_common::types::EpochId,
2109 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2110 Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
2111 }
2112
2113 pub fn drop_property_index(&self, property: &str) -> bool {
2119 self.store.drop_property_index(property)
2120 }
2121
2122 #[must_use]
2124 pub fn has_property_index(&self, property: &str) -> bool {
2125 self.store.has_property_index(property)
2126 }
2127
2128 #[must_use]
2146 pub fn find_nodes_by_property(
2147 &self,
2148 property: &str,
2149 value: &grafeo_common::types::Value,
2150 ) -> Vec<grafeo_common::types::NodeId> {
2151 self.store.find_nodes_by_property(property, value)
2152 }
2153
2154 #[must_use]
2162 pub fn is_persistent(&self) -> bool {
2163 self.config.path.is_some()
2164 }
2165
2166 #[must_use]
2170 pub fn path(&self) -> Option<&Path> {
2171 self.config.path.as_deref()
2172 }
2173
2174 #[must_use]
2178 pub fn info(&self) -> crate::admin::DatabaseInfo {
2179 crate::admin::DatabaseInfo {
2180 mode: crate::admin::DatabaseMode::Lpg,
2181 node_count: self.store.node_count(),
2182 edge_count: self.store.edge_count(),
2183 is_persistent: self.is_persistent(),
2184 path: self.config.path.clone(),
2185 wal_enabled: self.config.wal_enabled,
2186 version: env!("CARGO_PKG_VERSION").to_string(),
2187 }
2188 }
2189
2190 #[must_use]
2194 pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
2195 #[cfg(feature = "wal")]
2196 let disk_bytes = self.config.path.as_ref().and_then(|p| {
2197 if p.exists() {
2198 Self::calculate_disk_usage(p).ok()
2199 } else {
2200 None
2201 }
2202 });
2203 #[cfg(not(feature = "wal"))]
2204 let disk_bytes: Option<usize> = None;
2205
2206 crate::admin::DatabaseStats {
2207 node_count: self.store.node_count(),
2208 edge_count: self.store.edge_count(),
2209 label_count: self.store.label_count(),
2210 edge_type_count: self.store.edge_type_count(),
2211 property_key_count: self.store.property_key_count(),
2212 index_count: 0, memory_bytes: self.buffer_manager.allocated(),
2214 disk_bytes,
2215 }
2216 }
2217
2218 #[cfg(feature = "wal")]
2220 fn calculate_disk_usage(path: &Path) -> Result<usize> {
2221 let mut total = 0usize;
2222 if path.is_dir() {
2223 for entry in std::fs::read_dir(path)? {
2224 let entry = entry?;
2225 let metadata = entry.metadata()?;
2226 if metadata.is_file() {
2227 total += metadata.len() as usize;
2228 } else if metadata.is_dir() {
2229 total += Self::calculate_disk_usage(&entry.path())?;
2230 }
2231 }
2232 }
2233 Ok(total)
2234 }
2235
2236 #[must_use]
2241 pub fn schema(&self) -> crate::admin::SchemaInfo {
2242 let labels = self
2243 .store
2244 .all_labels()
2245 .into_iter()
2246 .map(|name| crate::admin::LabelInfo {
2247 name: name.clone(),
2248 count: self.store.nodes_with_label(&name).count(),
2249 })
2250 .collect();
2251
2252 let edge_types = self
2253 .store
2254 .all_edge_types()
2255 .into_iter()
2256 .map(|name| crate::admin::EdgeTypeInfo {
2257 name: name.clone(),
2258 count: self.store.edges_with_type(&name).count(),
2259 })
2260 .collect();
2261
2262 let property_keys = self.store.all_property_keys();
2263
2264 crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
2265 labels,
2266 edge_types,
2267 property_keys,
2268 })
2269 }
2270
2271 #[cfg(feature = "rdf")]
2275 #[must_use]
2276 pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
2277 let stats = self.rdf_store.stats();
2278
2279 let predicates = self
2280 .rdf_store
2281 .predicates()
2282 .into_iter()
2283 .map(|predicate| {
2284 let count = self.rdf_store.triples_with_predicate(&predicate).len();
2285 crate::admin::PredicateInfo {
2286 iri: predicate.to_string(),
2287 count,
2288 }
2289 })
2290 .collect();
2291
2292 crate::admin::SchemaInfo::Rdf(crate::admin::RdfSchemaInfo {
2293 predicates,
2294 named_graphs: Vec::new(), subject_count: stats.subject_count,
2296 object_count: stats.object_count,
2297 })
2298 }
2299
2300 #[must_use]
2308 pub fn validate(&self) -> crate::admin::ValidationResult {
2309 let mut result = crate::admin::ValidationResult::default();
2310
2311 for edge in self.store.all_edges() {
2313 if self.store.get_node(edge.src).is_none() {
2314 result.errors.push(crate::admin::ValidationError {
2315 code: "DANGLING_SRC".to_string(),
2316 message: format!(
2317 "Edge {} references non-existent source node {}",
2318 edge.id.0, edge.src.0
2319 ),
2320 context: Some(format!("edge:{}", edge.id.0)),
2321 });
2322 }
2323 if self.store.get_node(edge.dst).is_none() {
2324 result.errors.push(crate::admin::ValidationError {
2325 code: "DANGLING_DST".to_string(),
2326 message: format!(
2327 "Edge {} references non-existent destination node {}",
2328 edge.id.0, edge.dst.0
2329 ),
2330 context: Some(format!("edge:{}", edge.id.0)),
2331 });
2332 }
2333 }
2334
2335 if self.store.node_count() > 0 && self.store.edge_count() == 0 {
2337 result.warnings.push(crate::admin::ValidationWarning {
2338 code: "NO_EDGES".to_string(),
2339 message: "Database has nodes but no edges".to_string(),
2340 context: None,
2341 });
2342 }
2343
2344 result
2345 }
2346
2347 #[must_use]
2351 pub fn wal_status(&self) -> crate::admin::WalStatus {
2352 #[cfg(feature = "wal")]
2353 if let Some(ref wal) = self.wal {
2354 return crate::admin::WalStatus {
2355 enabled: true,
2356 path: self.config.path.as_ref().map(|p| p.join("wal")),
2357 size_bytes: wal.size_bytes(),
2358 record_count: wal.record_count() as usize,
2359 last_checkpoint: wal.last_checkpoint_timestamp(),
2360 current_epoch: self.store.current_epoch().as_u64(),
2361 };
2362 }
2363
2364 crate::admin::WalStatus {
2365 enabled: false,
2366 path: None,
2367 size_bytes: 0,
2368 record_count: 0,
2369 last_checkpoint: None,
2370 current_epoch: self.store.current_epoch().as_u64(),
2371 }
2372 }
2373
2374 pub fn wal_checkpoint(&self) -> Result<()> {
2382 #[cfg(feature = "wal")]
2383 if let Some(ref wal) = self.wal {
2384 let epoch = self.store.current_epoch();
2385 let tx_id = self
2386 .tx_manager
2387 .last_assigned_tx_id()
2388 .unwrap_or_else(|| self.tx_manager.begin());
2389 wal.checkpoint(tx_id, epoch)?;
2390 wal.sync()?;
2391 }
2392 Ok(())
2393 }
2394
2395 #[cfg(feature = "wal")]
2412 pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
2413 let path = path.as_ref();
2414
2415 let target_config = Config::persistent(path);
2417 let target = Self::with_config(target_config)?;
2418
2419 for node in self.store.all_nodes() {
2421 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
2422 target.store.create_node_with_id(node.id, &label_refs);
2423
2424 target.log_wal(&WalRecord::CreateNode {
2426 id: node.id,
2427 labels: node.labels.iter().map(|s| s.to_string()).collect(),
2428 })?;
2429
2430 for (key, value) in node.properties {
2432 target
2433 .store
2434 .set_node_property(node.id, key.as_str(), value.clone());
2435 target.log_wal(&WalRecord::SetNodeProperty {
2436 id: node.id,
2437 key: key.to_string(),
2438 value,
2439 })?;
2440 }
2441 }
2442
2443 for edge in self.store.all_edges() {
2445 target
2446 .store
2447 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
2448
2449 target.log_wal(&WalRecord::CreateEdge {
2451 id: edge.id,
2452 src: edge.src,
2453 dst: edge.dst,
2454 edge_type: edge.edge_type.to_string(),
2455 })?;
2456
2457 for (key, value) in edge.properties {
2459 target
2460 .store
2461 .set_edge_property(edge.id, key.as_str(), value.clone());
2462 target.log_wal(&WalRecord::SetEdgeProperty {
2463 id: edge.id,
2464 key: key.to_string(),
2465 value,
2466 })?;
2467 }
2468 }
2469
2470 target.close()?;
2472
2473 Ok(())
2474 }
2475
2476 pub fn to_memory(&self) -> Result<Self> {
2487 let config = Config::in_memory();
2488 let target = Self::with_config(config)?;
2489
2490 for node in self.store.all_nodes() {
2492 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
2493 target.store.create_node_with_id(node.id, &label_refs);
2494
2495 for (key, value) in node.properties {
2497 target.store.set_node_property(node.id, key.as_str(), value);
2498 }
2499 }
2500
2501 for edge in self.store.all_edges() {
2503 target
2504 .store
2505 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
2506
2507 for (key, value) in edge.properties {
2509 target.store.set_edge_property(edge.id, key.as_str(), value);
2510 }
2511 }
2512
2513 Ok(target)
2514 }
2515
2516 #[cfg(feature = "wal")]
2525 pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
2526 let source = Self::open(path)?;
2528
2529 let target = source.to_memory()?;
2531
2532 source.close()?;
2534
2535 Ok(target)
2536 }
2537
2538 pub fn export_snapshot(&self) -> Result<Vec<u8>> {
2551 let nodes: Vec<SnapshotNode> = self
2552 .store
2553 .all_nodes()
2554 .map(|n| SnapshotNode {
2555 id: n.id,
2556 labels: n.labels.iter().map(|l| l.to_string()).collect(),
2557 properties: n
2558 .properties
2559 .into_iter()
2560 .map(|(k, v)| (k.to_string(), v))
2561 .collect(),
2562 })
2563 .collect();
2564
2565 let edges: Vec<SnapshotEdge> = self
2566 .store
2567 .all_edges()
2568 .map(|e| SnapshotEdge {
2569 id: e.id,
2570 src: e.src,
2571 dst: e.dst,
2572 edge_type: e.edge_type.to_string(),
2573 properties: e
2574 .properties
2575 .into_iter()
2576 .map(|(k, v)| (k.to_string(), v))
2577 .collect(),
2578 })
2579 .collect();
2580
2581 let snapshot = Snapshot {
2582 version: 1,
2583 nodes,
2584 edges,
2585 };
2586
2587 let config = bincode::config::standard();
2588 bincode::serde::encode_to_vec(&snapshot, config)
2589 .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
2590 }
2591
2592 pub fn import_snapshot(data: &[u8]) -> Result<Self> {
2600 let config = bincode::config::standard();
2601 let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
2602 .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
2603
2604 if snapshot.version != 1 {
2605 return Err(Error::Internal(format!(
2606 "unsupported snapshot version: {}",
2607 snapshot.version
2608 )));
2609 }
2610
2611 let db = Self::new_in_memory();
2612
2613 for node in snapshot.nodes {
2614 let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
2615 db.store.create_node_with_id(node.id, &label_refs);
2616 for (key, value) in node.properties {
2617 db.store.set_node_property(node.id, &key, value);
2618 }
2619 }
2620
2621 for edge in snapshot.edges {
2622 db.store
2623 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
2624 for (key, value) in edge.properties {
2625 db.store.set_edge_property(edge.id, &key, value);
2626 }
2627 }
2628
2629 Ok(db)
2630 }
2631
2632 pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
2640 self.store.all_nodes()
2641 }
2642
2643 pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
2647 self.store.all_edges()
2648 }
2649}
2650
/// Serialized form of a whole database, as written by
/// `GrafeoDB::export_snapshot` and read by `GrafeoDB::import_snapshot`.
///
/// The struct is encoded with bincode, so the order of its fields is part of
/// the on-disk format; do not reorder them.
#[derive(serde::Serialize, serde::Deserialize)]
struct Snapshot {
    /// Format version. Export writes `1`; import rejects any other value.
    version: u8,
    /// All nodes, restored before the edges on import.
    nodes: Vec<SnapshotNode>,
    /// All edges.
    edges: Vec<SnapshotEdge>,
}
2658
/// One node inside a [`Snapshot`].
///
/// Field order is part of the bincode encoding; do not reorder.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotNode {
    /// Original node ID, reused when the snapshot is imported.
    id: NodeId,
    /// Label names as plain strings.
    labels: Vec<String>,
    /// Property key/value pairs.
    properties: Vec<(String, Value)>,
}
2665
/// One edge inside a [`Snapshot`].
///
/// Field order is part of the bincode encoding; do not reorder.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotEdge {
    /// Original edge ID, reused when the snapshot is imported.
    id: EdgeId,
    /// ID of the source node.
    src: NodeId,
    /// ID of the destination node.
    dst: NodeId,
    /// Edge type name as a plain string.
    edge_type: String,
    /// Property key/value pairs.
    properties: Vec<(String, Value)>,
}
2674
2675impl Drop for GrafeoDB {
2676 fn drop(&mut self) {
2677 if let Err(e) = self.close() {
2678 tracing::error!("Error closing database: {}", e);
2679 }
2680 }
2681}
2682
2683impl crate::admin::AdminService for GrafeoDB {
2684 fn info(&self) -> crate::admin::DatabaseInfo {
2685 self.info()
2686 }
2687
2688 fn detailed_stats(&self) -> crate::admin::DatabaseStats {
2689 self.detailed_stats()
2690 }
2691
2692 fn schema(&self) -> crate::admin::SchemaInfo {
2693 self.schema()
2694 }
2695
2696 fn validate(&self) -> crate::admin::ValidationResult {
2697 self.validate()
2698 }
2699
2700 fn wal_status(&self) -> crate::admin::WalStatus {
2701 self.wal_status()
2702 }
2703
2704 fn wal_checkpoint(&self) -> Result<()> {
2705 self.wal_checkpoint()
2706 }
2707}
2708
/// The result of executing a query: column metadata, row data, and optional
/// execution metrics.
#[derive(Debug)]
pub struct QueryResult {
    /// Column names, in output order.
    pub columns: Vec<String>,
    /// Logical type of each column. `QueryResult::new` fills this with
    /// `LogicalType::Any`, one entry per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// Result rows; each row is a vector of values.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Execution time in milliseconds, if it was recorded.
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during execution, if it was recorded.
    pub rows_scanned: Option<u64>,
}
2747
2748impl QueryResult {
2749 #[must_use]
2751 pub fn new(columns: Vec<String>) -> Self {
2752 let len = columns.len();
2753 Self {
2754 columns,
2755 column_types: vec![grafeo_common::types::LogicalType::Any; len],
2756 rows: Vec::new(),
2757 execution_time_ms: None,
2758 rows_scanned: None,
2759 }
2760 }
2761
2762 #[must_use]
2764 pub fn with_types(
2765 columns: Vec<String>,
2766 column_types: Vec<grafeo_common::types::LogicalType>,
2767 ) -> Self {
2768 Self {
2769 columns,
2770 column_types,
2771 rows: Vec::new(),
2772 execution_time_ms: None,
2773 rows_scanned: None,
2774 }
2775 }
2776
2777 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
2779 self.execution_time_ms = Some(execution_time_ms);
2780 self.rows_scanned = Some(rows_scanned);
2781 self
2782 }
2783
2784 #[must_use]
2786 pub fn execution_time_ms(&self) -> Option<f64> {
2787 self.execution_time_ms
2788 }
2789
2790 #[must_use]
2792 pub fn rows_scanned(&self) -> Option<u64> {
2793 self.rows_scanned
2794 }
2795
2796 #[must_use]
2798 pub fn row_count(&self) -> usize {
2799 self.rows.len()
2800 }
2801
2802 #[must_use]
2804 pub fn column_count(&self) -> usize {
2805 self.columns.len()
2806 }
2807
2808 #[must_use]
2810 pub fn is_empty(&self) -> bool {
2811 self.rows.is_empty()
2812 }
2813
2814 pub fn scalar<T: FromValue>(&self) -> Result<T> {
2823 if self.rows.len() != 1 || self.columns.len() != 1 {
2824 return Err(grafeo_common::utils::error::Error::InvalidValue(
2825 "Expected single value".to_string(),
2826 ));
2827 }
2828 T::from_value(&self.rows[0][0])
2829 }
2830
2831 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
2833 self.rows.iter()
2834 }
2835}
2836
/// Conversion from a database [`Value`](grafeo_common::types::Value) into a
/// concrete Rust type.
///
/// Used by `QueryResult::scalar` to extract a typed value.
pub trait FromValue: Sized {
    /// Converts `value` into `Self`, or returns an error if that is not
    /// possible. The implementations in this file return
    /// `Error::TypeMismatch` when the value has a different type.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
2845
2846impl FromValue for i64 {
2847 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2848 value
2849 .as_int64()
2850 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2851 expected: "INT64".to_string(),
2852 found: value.type_name().to_string(),
2853 })
2854 }
2855}
2856
2857impl FromValue for f64 {
2858 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2859 value
2860 .as_float64()
2861 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2862 expected: "FLOAT64".to_string(),
2863 found: value.type_name().to_string(),
2864 })
2865 }
2866}
2867
2868impl FromValue for String {
2869 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2870 value.as_str().map(String::from).ok_or_else(|| {
2871 grafeo_common::utils::error::Error::TypeMismatch {
2872 expected: "STRING".to_string(),
2873 found: value.type_name().to_string(),
2874 }
2875 })
2876 }
2877}
2878
2879impl FromValue for bool {
2880 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2881 value
2882 .as_bool()
2883 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2884 expected: "BOOL".to_string(),
2885 found: value.type_name().to_string(),
2886 })
2887 }
2888}
2889
// Unit tests for `GrafeoDB`: construction, configuration, WAL-backed
// persistence and recovery, query metrics, and CDC history.
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh in-memory database starts with no nodes and no edges.
    #[test]
    fn test_create_in_memory_database() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
    }

    // Settings passed through `Config` are visible on the opened database.
    #[test]
    fn test_database_config() {
        let config = Config::in_memory().with_threads(4).with_query_logging();

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().threads, 4);
        assert!(db.config().query_logging);
    }

    // Creating a session must not panic.
    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        let _session = db.session();
    }

    // Data written and closed in one handle is visible after reopening the same path.
    #[cfg(feature = "wal")]
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_db");

        // First handle: create two nodes and one edge, then close.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));

            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));

            let _edge = db.create_edge(alice, bob, "KNOWS");

            db.close().unwrap();
        }

        // Second handle: everything must have been recovered.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(db.node_count(), 2);
            assert_eq!(db.edge_count(), 1);

            // The test expects the two nodes to be found under IDs 0 and 1.
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node0.is_some());

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node1.is_some());
        }
    }

    // Mutations on a persistent database produce WAL records.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // Only checked when a WAL is present; the test passes trivially otherwise.
        if let Some(wal) = db.wal() {
            assert!(wal.record_count() > 0);
        }

        db.close().unwrap();
    }

    // Data accumulates correctly across several open/close cycles.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_recovery_multiple_sessions() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("multi_session_db");

        // Session 1: create Alice.
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));
            db.close().unwrap();
        }

        // Session 2: Alice must be there; add Bob.
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            // The node from session 1 was recovered.
            assert_eq!(db.node_count(), 1);
            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));
            db.close().unwrap();
        }

        // Session 3: both nodes must be there, with their labels.
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 2);

            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
        }
    }

    // Creates, deletes and property updates are all replayed consistently.
    #[cfg(feature = "wal")]
    #[test]
    fn test_database_consistency_after_mutations() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("consistency_db");

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Three nodes, expected under IDs 0, 1 and 2 below.
            let a = db.create_node(&["Node"]);
            let b = db.create_node(&["Node"]);
            let c = db.create_node(&["Node"]);

            let e1 = db.create_edge(a, b, "LINKS");
            let _e2 = db.create_edge(b, c, "LINKS");

            // Remove one edge and the middle node.
            db.delete_edge(e1);
            db.delete_node(b);

            // Update the surviving nodes.
            db.set_node_property(a, "value", Value::Int64(1));
            db.set_node_property(c, "value", Value::Int64(3));

            db.close().unwrap();
        }

        // After reopening: a and c exist, b stays deleted.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node_a.is_some());

            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
            assert!(node_c.is_some());

            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node_b.is_none());
        }
    }

    // Calling `close` twice must succeed both times.
    #[cfg(feature = "wal")]
    #[test]
    fn test_close_is_idempotent() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("close_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();
        db.create_node(&["Test"]);

        // First close.
        assert!(db.close().is_ok());

        // Second close on the already closed database.
        assert!(db.close().is_ok());
    }

    // A query that matches rows reports execution time and rows scanned.
    #[test]
    fn test_query_result_has_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        // Query execution is only available with the `gql` feature.
        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert!(result.execution_time_ms.unwrap() >= 0.0);
            // Both Person nodes were scanned.
            assert_eq!(result.rows_scanned.unwrap(), 2);
        }
    }

    // A query that matches nothing still reports metrics, with zero rows scanned.
    #[test]
    fn test_empty_query_result_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);

        // Query execution is only available with the `gql` feature.
        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert_eq!(result.rows_scanned.unwrap(), 0);
        }
    }

    // Change-data-capture tests; only built with the `cdc` feature.
    #[cfg(feature = "cdc")]
    mod cdc_integration {
        use super::*;

        // A node's history records its creation, each property update, and its deletion.
        #[test]
        fn test_node_lifecycle_history() {
            let db = GrafeoDB::new_in_memory();

            let id = db.create_node(&["Person"]);
            db.set_node_property(id, "name", "Alice".into());
            db.set_node_property(id, "name", "Bob".into());
            db.delete_node(id);

            let history = db.history(id).unwrap();
            // One create, two updates, one delete.
            assert_eq!(history.len(), 4);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            // The first update had no previous value.
            assert!(history[1].before.is_none());
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
            // The second update carries a `before` state.
            assert!(history[2].before.is_some());
            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
        }

        // An edge's history records its creation, property update, and deletion.
        #[test]
        fn test_edge_lifecycle_history() {
            let db = GrafeoDB::new_in_memory();

            let alice = db.create_node(&["Person"]);
            let bob = db.create_node(&["Person"]);
            let edge = db.create_edge(alice, bob, "KNOWS");
            db.set_edge_property(edge, "since", 2024i64.into());
            db.delete_edge(edge);

            let history = db.history(edge).unwrap();
            // One create, one update, one delete.
            assert_eq!(history.len(), 3);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
        }

        // Creating a node with properties yields a single Create event carrying them.
        #[test]
        fn test_create_node_with_props_cdc() {
            let db = GrafeoDB::new_in_memory();

            let id = db.create_node_with_props(
                &["Person"],
                vec![
                    ("name", grafeo_common::types::Value::from("Alice")),
                    ("age", grafeo_common::types::Value::from(30i64)),
                ],
            );

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 1);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            // Both initial properties are part of the `after` state.
            let after = history[0].after.as_ref().unwrap();
            assert_eq!(after.len(), 2);
        }

        // `changes_between` over the full epoch range returns every recorded change.
        #[test]
        fn test_changes_between() {
            let db = GrafeoDB::new_in_memory();

            let id1 = db.create_node(&["A"]);
            let _id2 = db.create_node(&["B"]);
            db.set_node_property(id1, "x", 1i64.into());

            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            // Two node creations and one property update.
            assert_eq!(changes.len(), 3);
        }
    }
}