1use std::path::Path;
6use std::sync::Arc;
7use std::sync::atomic::AtomicUsize;
8
9use parking_lot::RwLock;
10
11#[cfg(feature = "wal")]
12use grafeo_adapters::storage::wal::{
13 DurabilityMode as WalDurabilityMode, WalConfig, WalManager, WalRecord, WalRecovery,
14};
15use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
16use grafeo_common::types::{EdgeId, NodeId, Value};
17use grafeo_common::utils::error::{Error, Result};
18use grafeo_core::graph::lpg::LpgStore;
19#[cfg(feature = "rdf")]
20use grafeo_core::graph::rdf::RdfStore;
21
22use crate::config::Config;
23use crate::query::cache::QueryCache;
24use crate::session::Session;
25use crate::transaction::TransactionManager;
26
/// Top-level database handle that owns the graph store and its supporting services.
///
/// Several fields only exist when the matching Cargo feature (`rdf`, `wal`,
/// `cdc`, `embed`) is enabled.
pub struct GrafeoDB {
    /// Configuration the database was built with.
    config: Config,
    /// Labeled-property-graph store; cloned (by `Arc`) into every session.
    store: Arc<LpgStore>,
    /// RDF store handed to sessions and used by SPARQL execution.
    #[cfg(feature = "rdf")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager; cloned (by `Arc`) into every session.
    tx_manager: Arc<TransactionManager>,
    /// Buffer manager built from the configured memory budget and spill path.
    buffer_manager: Arc<BufferManager>,
    /// Write-ahead log; `None` when WAL is disabled or no database path is configured.
    #[cfg(feature = "wal")]
    wal: Option<Arc<WalManager>>,
    /// Query cache; cloned (by `Arc`) into every session.
    query_cache: Arc<QueryCache>,
    /// Counter handed to sessions together with the configured `gc_interval`.
    commit_counter: Arc<AtomicUsize>,
    /// `true` until `close()` has completed successfully once.
    is_open: RwLock<bool>,
    /// Change-data-capture log that the mutation methods record into.
    #[cfg(feature = "cdc")]
    cdc_log: Arc<crate::cdc::CdcLog>,
    /// Embedding models keyed by a `String` (presumably the model name — confirm).
    #[cfg(feature = "embed")]
    embedding_models: RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
}
77
78impl GrafeoDB {
    /// Creates a database from `Config::in_memory()`.
    ///
    /// # Panics
    ///
    /// Panics if `with_config` returns an error for that configuration.
    #[must_use]
    pub fn new_in_memory() -> Self {
        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
    }
98
    /// Opens a database using `Config::persistent(path)`.
    ///
    /// Only available with the `wal` feature.
    ///
    /// # Errors
    ///
    /// Returns whatever error `with_config` produces for that configuration.
    #[cfg(feature = "wal")]
    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
        Self::with_config(Config::persistent(path.as_ref()))
    }
121
    /// Builds a database from an explicit configuration.
    ///
    /// Validates `config`, creates the stores, transaction manager, buffer
    /// manager and query cache, and (with the `wal` feature, when WAL is
    /// enabled and a path is configured) replays and reopens the write-ahead
    /// log stored under `<path>/wal`.
    ///
    /// # Errors
    ///
    /// Returns `Error::Internal` when `config.validate()` fails. With the `wal`
    /// feature, also propagates failures from creating the database directory,
    /// recovering WAL records, or opening the WAL.
    pub fn with_config(config: Config) -> Result<Self> {
        config
            .validate()
            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;

        let store = Arc::new(LpgStore::new());
        #[cfg(feature = "rdf")]
        let rdf_store = Arc::new(RdfStore::new());
        let tx_manager = Arc::new(TransactionManager::new());

        let buffer_config = BufferManagerConfig {
            // Without an explicit limit, budget 75% of the detected system memory.
            budget: config.memory_limit.unwrap_or_else(|| {
                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
            }),
            // Prefer the explicit spill path; otherwise fall back to `<db path>/spill`.
            spill_path: config
                .spill_path
                .clone()
                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
            ..BufferManagerConfig::default()
        };
        let buffer_manager = BufferManager::new(buffer_config);

        #[cfg(feature = "wal")]
        let wal = if config.wal_enabled {
            if let Some(ref db_path) = config.path {
                std::fs::create_dir_all(db_path)?;

                let wal_path = db_path.join("wal");

                // Replay an existing log into the fresh store before the WAL is reopened.
                if wal_path.exists() {
                    let recovery = WalRecovery::new(&wal_path);
                    let records = recovery.recover()?;
                    Self::apply_wal_records(&store, &records)?;
                }

                // Map the crate-level durability mode onto the WAL adapter's enum.
                let wal_durability = match config.wal_durability {
                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
                    crate::config::DurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    } => WalDurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    },
                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
                        WalDurabilityMode::Adaptive { target_interval_ms }
                    }
                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
                };
                let wal_config = WalConfig {
                    durability: wal_durability,
                    ..WalConfig::default()
                };
                let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
                Some(Arc::new(wal_manager))
            } else {
                // WAL requested but there is no path to put it under.
                None
            }
        } else {
            None
        };

        let query_cache = Arc::new(QueryCache::default());

        Ok(Self {
            config,
            store,
            #[cfg(feature = "rdf")]
            rdf_store,
            tx_manager,
            buffer_manager,
            #[cfg(feature = "wal")]
            wal,
            query_cache,
            commit_counter: Arc::new(AtomicUsize::new(0)),
            is_open: RwLock::new(true),
            #[cfg(feature = "cdc")]
            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
            #[cfg(feature = "embed")]
            embedding_models: RwLock::new(hashbrown::HashMap::new()),
        })
    }
234
235 #[cfg(feature = "wal")]
237 fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
238 for record in records {
239 match record {
240 WalRecord::CreateNode { id, labels } => {
241 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
242 store.create_node_with_id(*id, &label_refs);
243 }
244 WalRecord::DeleteNode { id } => {
245 store.delete_node(*id);
246 }
247 WalRecord::CreateEdge {
248 id,
249 src,
250 dst,
251 edge_type,
252 } => {
253 store.create_edge_with_id(*id, *src, *dst, edge_type);
254 }
255 WalRecord::DeleteEdge { id } => {
256 store.delete_edge(*id);
257 }
258 WalRecord::SetNodeProperty { id, key, value } => {
259 store.set_node_property(*id, key, value.clone());
260 }
261 WalRecord::SetEdgeProperty { id, key, value } => {
262 store.set_edge_property(*id, key, value.clone());
263 }
264 WalRecord::AddNodeLabel { id, label } => {
265 store.add_label(*id, label);
266 }
267 WalRecord::RemoveNodeLabel { id, label } => {
268 store.remove_label(*id, label);
269 }
270 WalRecord::TxCommit { .. }
271 | WalRecord::TxAbort { .. }
272 | WalRecord::Checkpoint { .. } => {
273 }
276 }
277 }
278 Ok(())
279 }
280
    /// Creates a new session over this database.
    ///
    /// The session receives `Arc` clones of the store, transaction manager,
    /// query cache and commit counter, plus the adaptive, factorized-execution,
    /// graph-model, query-timeout and GC-interval settings from the config.
    /// With the `rdf` feature the RDF store is passed as well; with `cdc` the
    /// CDC log is attached.
    #[must_use]
    pub fn session(&self) -> Session {
        #[cfg(feature = "rdf")]
        let mut session = Session::with_rdf_store_and_adaptive(
            Arc::clone(&self.store),
            Arc::clone(&self.rdf_store),
            Arc::clone(&self.tx_manager),
            Arc::clone(&self.query_cache),
            self.config.adaptive.clone(),
            self.config.factorized_execution,
            self.config.graph_model,
            self.config.query_timeout,
            Arc::clone(&self.commit_counter),
            self.config.gc_interval,
        );
        #[cfg(not(feature = "rdf"))]
        let mut session = Session::with_adaptive(
            Arc::clone(&self.store),
            Arc::clone(&self.tx_manager),
            Arc::clone(&self.query_cache),
            self.config.adaptive.clone(),
            self.config.factorized_execution,
            self.config.graph_model,
            self.config.query_timeout,
            Arc::clone(&self.commit_counter),
            self.config.gc_interval,
        );

        #[cfg(feature = "cdc")]
        session.set_cdc_log(Arc::clone(&self.cdc_log));

        // Presumably here so `mut` is not reported as unused when `cdc` is off.
        let _ = &mut session;

        session
    }
335
    /// Returns the adaptive-execution section of the configuration.
    #[must_use]
    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
        &self.config.adaptive
    }
341
    /// Runs `query` on a freshly created session and returns its result.
    ///
    /// # Errors
    ///
    /// Returns whatever error the session's `execute` produces.
    pub fn execute(&self, query: &str) -> Result<QueryResult> {
        let session = self.session();
        session.execute(query)
    }
355
    /// Runs `query` with the supplied named parameters on a freshly created session.
    ///
    /// # Errors
    ///
    /// Returns whatever error the session's `execute_with_params` produces.
    pub fn execute_with_params(
        &self,
        query: &str,
        params: std::collections::HashMap<String, grafeo_common::types::Value>,
    ) -> Result<QueryResult> {
        let session = self.session();
        session.execute_with_params(query, params)
    }
369
    /// Runs a Cypher query on a freshly created session (`cypher` feature).
    ///
    /// # Errors
    ///
    /// Returns whatever error the session's `execute_cypher` produces.
    #[cfg(feature = "cypher")]
    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
        let session = self.session();
        session.execute_cypher(query)
    }
380
381 #[cfg(feature = "cypher")]
387 pub fn execute_cypher_with_params(
388 &self,
389 query: &str,
390 params: std::collections::HashMap<String, grafeo_common::types::Value>,
391 ) -> Result<QueryResult> {
392 use crate::query::processor::{QueryLanguage, QueryProcessor};
393
394 let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
396 processor.process(query, QueryLanguage::Cypher, Some(¶ms))
397 }
398
    /// Runs a Gremlin query on a freshly created session (`gremlin` feature).
    ///
    /// # Errors
    ///
    /// Returns whatever error the session's `execute_gremlin` produces.
    #[cfg(feature = "gremlin")]
    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
        let session = self.session();
        session.execute_gremlin(query)
    }
409
    /// Runs a Gremlin query with named parameters on a freshly created session
    /// (`gremlin` feature).
    ///
    /// # Errors
    ///
    /// Returns whatever error the session's `execute_gremlin_with_params` produces.
    #[cfg(feature = "gremlin")]
    pub fn execute_gremlin_with_params(
        &self,
        query: &str,
        params: std::collections::HashMap<String, grafeo_common::types::Value>,
    ) -> Result<QueryResult> {
        let session = self.session();
        session.execute_gremlin_with_params(query, params)
    }
424
    /// Runs a GraphQL query on a freshly created session (`graphql` feature).
    ///
    /// # Errors
    ///
    /// Returns whatever error the session's `execute_graphql` produces.
    #[cfg(feature = "graphql")]
    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
        let session = self.session();
        session.execute_graphql(query)
    }
435
    /// Runs a GraphQL query with named parameters on a freshly created session
    /// (`graphql` feature).
    ///
    /// # Errors
    ///
    /// Returns whatever error the session's `execute_graphql_with_params` produces.
    #[cfg(feature = "graphql")]
    pub fn execute_graphql_with_params(
        &self,
        query: &str,
        params: std::collections::HashMap<String, grafeo_common::types::Value>,
    ) -> Result<QueryResult> {
        let session = self.session();
        session.execute_graphql_with_params(query, params)
    }
450
    /// Runs a SQL/PGQ query on a freshly created session (`sql-pgq` feature).
    ///
    /// # Errors
    ///
    /// Returns whatever error the session's `execute_sql` produces.
    #[cfg(feature = "sql-pgq")]
    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
        let session = self.session();
        session.execute_sql(query)
    }
461
462 #[cfg(feature = "sql-pgq")]
468 pub fn execute_sql_with_params(
469 &self,
470 query: &str,
471 params: std::collections::HashMap<String, grafeo_common::types::Value>,
472 ) -> Result<QueryResult> {
473 use crate::query::processor::{QueryLanguage, QueryProcessor};
474
475 let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
477 processor.process(query, QueryLanguage::SqlPgq, Some(¶ms))
478 }
479
    /// Runs a SPARQL query against the RDF store (`sparql` + `rdf` features).
    ///
    /// Pipeline: translate the query text to a logical plan, optimize it,
    /// produce a physical plan with `RdfPlanner`, then execute the plan's root
    /// operator.
    ///
    /// # Errors
    ///
    /// Propagates errors from translation, optimization, planning or execution.
    #[cfg(all(feature = "sparql", feature = "rdf"))]
    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
        use crate::query::{
            Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
        };

        let logical_plan = sparql_translator::translate(query)?;

        // NOTE(review): the optimizer is built from the LPG store while planning
        // targets the RDF store — confirm this is intended.
        let optimizer = Optimizer::from_store(&self.store);
        let optimized_plan = optimizer.optimize(logical_plan)?;

        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone());
        executor.execute(physical_plan.operator.as_mut())
    }
520
    /// Returns the shared RDF store (`rdf` feature).
    #[cfg(feature = "rdf")]
    #[must_use]
    pub fn rdf_store(&self) -> &Arc<RdfStore> {
        &self.rdf_store
    }
529
    /// Executes `query` and converts the result to a single value of type `T`
    /// via the result's `scalar()` method.
    ///
    /// # Errors
    ///
    /// Propagates errors from `execute` and from `scalar()`.
    pub fn query_scalar<T: FromValue>(&self, query: &str) -> Result<T> {
        let result = self.execute(query)?;
        result.scalar()
    }
539
    /// Returns the configuration this database was created with.
    #[must_use]
    pub fn config(&self) -> &Config {
        &self.config
    }
545
    /// Returns the configured graph model.
    #[must_use]
    pub fn graph_model(&self) -> crate::config::GraphModel {
        self.config.graph_model
    }
551
    /// Returns the configured memory limit, or `None` if none was set.
    #[must_use]
    pub fn memory_limit(&self) -> Option<usize> {
        self.config.memory_limit
    }
557
    /// Returns the shared labeled-property-graph store.
    #[must_use]
    pub fn store(&self) -> &Arc<LpgStore> {
        &self.store
    }
565
    /// Runs garbage collection.
    ///
    /// Passes the transaction manager's minimum active epoch to
    /// `store.gc_versions`, then runs the transaction manager's own `gc`.
    pub fn gc(&self) {
        let min_epoch = self.tx_manager.min_active_epoch();
        self.store.gc_versions(min_epoch);
        self.tx_manager.gc();
    }
576
    /// Returns the shared buffer manager.
    #[must_use]
    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
        &self.buffer_manager
    }
582
    /// Closes the database.
    ///
    /// Calling this on an already-closed database is a no-op that returns
    /// `Ok(())`. With the `wal` feature and an active WAL, a `TxCommit` record
    /// is logged, a checkpoint is written at the store's current epoch, and the
    /// WAL is synced before the database is marked closed.
    ///
    /// # Errors
    ///
    /// Propagates WAL log/checkpoint/sync failures. In that case the database
    /// is *not* marked closed, so `close` can be called again.
    pub fn close(&self) -> Result<()> {
        // The write guard is held until the function returns.
        let mut is_open = self.is_open.write();
        if !*is_open {
            return Ok(());
        }

        #[cfg(feature = "wal")]
        if let Some(ref wal) = self.wal {
            let epoch = self.store.current_epoch();

            // Use the last assigned transaction id, or begin a transaction to
            // obtain one if none has been assigned yet.
            let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
                self.tx_manager.begin()
            });

            wal.log(&WalRecord::TxCommit {
                tx_id: checkpoint_tx,
            })?;

            wal.checkpoint(checkpoint_tx, epoch)?;
            wal.sync()?;
        }

        *is_open = false;
        Ok(())
    }
622
    /// Returns the WAL manager, or `None` if this database has no WAL (`wal` feature).
    #[cfg(feature = "wal")]
    #[must_use]
    pub fn wal(&self) -> Option<&Arc<WalManager>> {
        self.wal.as_ref()
    }
629
630 #[cfg(feature = "wal")]
632 fn log_wal(&self, record: &WalRecord) -> Result<()> {
633 if let Some(ref wal) = self.wal {
634 wal.log(record)?;
635 }
636 Ok(())
637 }
638
    /// Returns the node count reported by the store.
    #[must_use]
    pub fn node_count(&self) -> usize {
        self.store.node_count()
    }
644
    /// Returns the edge count reported by the store.
    #[must_use]
    pub fn edge_count(&self) -> usize {
        self.store.edge_count()
    }
650
    /// Returns the label count reported by the store.
    #[must_use]
    pub fn label_count(&self) -> usize {
        self.store.label_count()
    }
656
    /// Returns the property-key count reported by the store.
    #[must_use]
    pub fn property_key_count(&self) -> usize {
        self.store.property_key_count()
    }
662
    /// Returns the edge-type count reported by the store.
    #[must_use]
    pub fn edge_type_count(&self) -> usize {
        self.store.edge_type_count()
    }
668
    /// Creates a node with the given labels and returns its id.
    ///
    /// With the `wal` feature a `CreateNode` record is logged; a logging
    /// failure is only reported via `tracing::warn!` and does not undo the
    /// creation. With the `cdc` feature the creation is recorded at the store's
    /// current epoch with no properties.
    pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
        let id = self.store.create_node(labels);

        #[cfg(feature = "wal")]
        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
            id,
            labels: labels.iter().map(|s| (*s).to_string()).collect(),
        }) {
            tracing::warn!("Failed to log CreateNode to WAL: {}", e);
        }

        #[cfg(feature = "cdc")]
        self.cdc_log
            .record_create_node(id, self.store.current_epoch(), None);

        id
    }
703
    /// Creates a node with the given labels and initial properties and returns its id.
    ///
    /// Depending on enabled features, the creation is also written to the WAL
    /// (one `CreateNode` record followed by one `SetNodeProperty` record per
    /// property), recorded in the CDC log, and string properties are inserted
    /// into any text index registered for one of the node's labels and that
    /// property. WAL failures are only reported via `tracing::warn!`.
    pub fn create_node_with_props(
        &self,
        labels: &[&str],
        properties: impl IntoIterator<
            Item = (
                impl Into<grafeo_common::types::PropertyKey>,
                impl Into<grafeo_common::types::Value>,
            ),
        >,
    ) -> grafeo_common::types::NodeId {
        // Materialize once so the same pairs can feed the store, the WAL and CDC.
        let props: Vec<(
            grafeo_common::types::PropertyKey,
            grafeo_common::types::Value,
        )> = properties
            .into_iter()
            .map(|(k, v)| (k.into(), v.into()))
            .collect();

        let id = self
            .store
            .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));

        // Snapshot for CDC before the WAL block below consumes `props`.
        #[cfg(feature = "cdc")]
        let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
            .iter()
            .map(|(k, v)| (k.to_string(), v.clone()))
            .collect();

        #[cfg(feature = "wal")]
        {
            if let Err(e) = self.log_wal(&WalRecord::CreateNode {
                id,
                labels: labels.iter().map(|s| (*s).to_string()).collect(),
            }) {
                tracing::warn!("Failed to log CreateNode to WAL: {}", e);
            }

            for (key, value) in props {
                if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
                    id,
                    key: key.to_string(),
                    value,
                }) {
                    tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
                }
            }
        }

        // An empty property set is reported to CDC as `None`.
        #[cfg(feature = "cdc")]
        self.cdc_log.record_create_node(
            id,
            self.store.current_epoch(),
            if cdc_props.is_empty() {
                None
            } else {
                Some(cdc_props)
            },
        );

        // Index every string property under each (label, property) text index that exists.
        #[cfg(feature = "text-index")]
        if let Some(node) = self.store.get_node(id) {
            for label in &node.labels {
                for (prop_key, prop_val) in &node.properties {
                    if let grafeo_common::types::Value::String(text) = prop_val
                        && let Some(index) =
                            self.store.get_text_index(label.as_str(), prop_key.as_ref())
                    {
                        index.write().insert(id, text);
                    }
                }
            }
        }

        id
    }
787
    /// Looks up a node by id; returns `None` when the store has no such node.
    #[must_use]
    pub fn get_node(
        &self,
        id: grafeo_common::types::NodeId,
    ) -> Option<grafeo_core::graph::lpg::Node> {
        self.store.get_node(id)
    }
796
    /// Deletes a node and returns whether the store actually removed it.
    ///
    /// Before deleting, the node's properties (for CDC) and the vector/text
    /// indexes whose key starts with `"<label>:"` for one of its labels are
    /// captured, because that information comes from the node itself. After a
    /// successful delete the node is removed from those indexes, a `DeleteNode`
    /// WAL record is logged (failures only warn), and the delete is recorded in
    /// the CDC log. Each step is gated on its Cargo feature.
    pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
        // Property snapshot for the CDC delete event.
        #[cfg(feature = "cdc")]
        let cdc_props = self.store.get_node(id).map(|node| {
            node.properties
                .iter()
                .map(|(k, v)| (k.to_string(), v.clone()))
                .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
        });

        // Vector indexes keyed "<label>:..." for any of the node's labels.
        #[cfg(feature = "vector-index")]
        let indexes_to_clean: Vec<std::sync::Arc<grafeo_core::index::vector::HnswIndex>> = self
            .store
            .get_node(id)
            .map(|node| {
                let mut indexes = Vec::new();
                for label in &node.labels {
                    let prefix = format!("{}:", label.as_str());
                    for (key, index) in self.store.vector_index_entries() {
                        if key.starts_with(&prefix) {
                            indexes.push(index);
                        }
                    }
                }
                indexes
            })
            .unwrap_or_default();

        // Text indexes keyed "<label>:..." for any of the node's labels.
        #[cfg(feature = "text-index")]
        let text_indexes_to_clean: Vec<
            std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
        > = self
            .store
            .get_node(id)
            .map(|node| {
                let mut indexes = Vec::new();
                for label in &node.labels {
                    let prefix = format!("{}:", label.as_str());
                    for (key, index) in self.store.text_index_entries() {
                        if key.starts_with(&prefix) {
                            indexes.push(index);
                        }
                    }
                }
                indexes
            })
            .unwrap_or_default();

        let result = self.store.delete_node(id);

        #[cfg(feature = "vector-index")]
        if result {
            for index in indexes_to_clean {
                index.remove(id);
            }
        }

        #[cfg(feature = "text-index")]
        if result {
            for index in text_indexes_to_clean {
                index.write().remove(id);
            }
        }

        // Only a delete that actually happened is logged.
        #[cfg(feature = "wal")]
        if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
            tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
        }

        #[cfg(feature = "cdc")]
        if result {
            self.cdc_log.record_delete(
                crate::cdc::EntityId::Node(id),
                self.store.current_epoch(),
                cdc_props,
            );
        }

        result
    }
884
    /// Sets property `key` on node `id` to `value`.
    ///
    /// Feature-gated side effects, in order: the `SetNodeProperty` WAL record is
    /// logged *before* the store is updated (failures only warn); the previous
    /// value is read for CDC and an update event is recorded afterwards; a
    /// `Value::Vector` is inserted into every vector index registered for one
    /// of the node's labels and `key`; and text indexes for `key` are updated —
    /// a string value is inserted, any other value removes the node's entry.
    pub fn set_node_property(
        &self,
        id: grafeo_common::types::NodeId,
        key: &str,
        value: grafeo_common::types::Value,
    ) {
        // Keep a copy of vector payloads; `value` is moved into the store below.
        #[cfg(feature = "vector-index")]
        let vector_data = match &value {
            grafeo_common::types::Value::Vector(v) => Some(v.clone()),
            _ => None,
        };

        #[cfg(feature = "wal")]
        if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
            id,
            key: key.to_string(),
            value: value.clone(),
        }) {
            tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
        }

        // The old value must be read before the write for the CDC update event.
        #[cfg(feature = "cdc")]
        let cdc_old_value = self
            .store
            .get_node_property(id, &grafeo_common::types::PropertyKey::new(key));
        #[cfg(feature = "cdc")]
        let cdc_new_value = value.clone();

        self.store.set_node_property(id, key, value);

        #[cfg(feature = "cdc")]
        self.cdc_log.record_update(
            crate::cdc::EntityId::Node(id),
            self.store.current_epoch(),
            key,
            cdc_old_value,
            cdc_new_value,
        );

        #[cfg(feature = "vector-index")]
        if let Some(vec) = vector_data
            && let Some(node) = self.store.get_node(id)
        {
            for label in &node.labels {
                if let Some(index) = self.store.get_vector_index(label.as_str(), key) {
                    let accessor =
                        grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, key);
                    index.insert(id, &vec, &accessor);
                }
            }
        }

        #[cfg(feature = "text-index")]
        if let Some(node) = self.store.get_node(id) {
            // Re-read the stored value; only strings are text-indexed.
            let text_val = node
                .properties
                .get(&grafeo_common::types::PropertyKey::new(key))
                .and_then(|v| match v {
                    grafeo_common::types::Value::String(s) => Some(s.to_string()),
                    _ => None,
                });
            for label in &node.labels {
                if let Some(index) = self.store.get_text_index(label.as_str(), key) {
                    let mut idx = index.write();
                    if let Some(ref text) = text_val {
                        idx.insert(id, text);
                    } else {
                        idx.remove(id);
                    }
                }
            }
        }
    }
966
    /// Adds `label` to node `id` and returns the store's `add_label` result.
    ///
    /// When the store reports success, the change is logged to the WAL
    /// (failures only warn) and the node is added to indexes that exist for the
    /// new label: every vector index keyed `"<label>:<property>"` whose
    /// property holds a `Value::Vector` on the node, and every text index for
    /// `(label, property)` whose property holds a string. Each step is gated on
    /// its Cargo feature.
    pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
        let result = self.store.add_label(id, label);

        #[cfg(feature = "wal")]
        if result {
            if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
                id,
                label: label.to_string(),
            }) {
                tracing::warn!("Failed to log AddNodeLabel to WAL: {}", e);
            }
        }

        #[cfg(feature = "vector-index")]
        if result {
            let prefix = format!("{label}:");
            for (key, index) in self.store.vector_index_entries() {
                // The part of the key after "<label>:" is the indexed property name.
                if let Some(property) = key.strip_prefix(&prefix)
                    && let Some(node) = self.store.get_node(id)
                {
                    let prop_key = grafeo_common::types::PropertyKey::new(property);
                    if let Some(grafeo_common::types::Value::Vector(v)) =
                        node.properties.get(&prop_key)
                    {
                        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
                            &self.store,
                            property,
                        );
                        index.insert(id, v, &accessor);
                    }
                }
            }
        }

        #[cfg(feature = "text-index")]
        if result && let Some(node) = self.store.get_node(id) {
            for (prop_key, prop_val) in &node.properties {
                if let grafeo_common::types::Value::String(text) = prop_val
                    && let Some(index) = self.store.get_text_index(label, prop_key.as_ref())
                {
                    index.write().insert(id, text);
                }
            }
        }

        result
    }
1034
1035 pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
1053 let result = self.store.remove_label(id, label);
1054
1055 #[cfg(feature = "wal")]
1056 if result {
1057 if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
1059 id,
1060 label: label.to_string(),
1061 }) {
1062 tracing::warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
1063 }
1064 }
1065
1066 result
1067 }
1068
1069 #[must_use]
1086 pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
1087 self.store
1088 .get_node(id)
1089 .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
1090 }
1091
    /// Creates an edge of type `edge_type` from `src` to `dst` and returns its id.
    ///
    /// With the `wal` feature a `CreateEdge` record is logged; a logging
    /// failure is only reported via `tracing::warn!`. With the `cdc` feature
    /// the creation is recorded at the store's current epoch with no properties.
    pub fn create_edge(
        &self,
        src: grafeo_common::types::NodeId,
        dst: grafeo_common::types::NodeId,
        edge_type: &str,
    ) -> grafeo_common::types::EdgeId {
        let id = self.store.create_edge(src, dst, edge_type);

        #[cfg(feature = "wal")]
        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
            id,
            src,
            dst,
            edge_type: edge_type.to_string(),
        }) {
            tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
        }

        #[cfg(feature = "cdc")]
        self.cdc_log
            .record_create_edge(id, self.store.current_epoch(), None);

        id
    }
1136
    /// Creates an edge with initial properties and returns its id.
    ///
    /// Depending on enabled features, the creation is also written to the WAL
    /// (one `CreateEdge` record followed by one `SetEdgeProperty` record per
    /// property) and recorded in the CDC log. WAL failures are only reported
    /// via `tracing::warn!`.
    pub fn create_edge_with_props(
        &self,
        src: grafeo_common::types::NodeId,
        dst: grafeo_common::types::NodeId,
        edge_type: &str,
        properties: impl IntoIterator<
            Item = (
                impl Into<grafeo_common::types::PropertyKey>,
                impl Into<grafeo_common::types::Value>,
            ),
        >,
    ) -> grafeo_common::types::EdgeId {
        // Materialize once so the same pairs can feed the store, the WAL and CDC.
        let props: Vec<(
            grafeo_common::types::PropertyKey,
            grafeo_common::types::Value,
        )> = properties
            .into_iter()
            .map(|(k, v)| (k.into(), v.into()))
            .collect();

        let id = self.store.create_edge_with_props(
            src,
            dst,
            edge_type,
            props.iter().map(|(k, v)| (k.clone(), v.clone())),
        );

        // Snapshot for CDC before the WAL block below consumes `props`.
        #[cfg(feature = "cdc")]
        let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
            .iter()
            .map(|(k, v)| (k.to_string(), v.clone()))
            .collect();

        #[cfg(feature = "wal")]
        {
            if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
                id,
                src,
                dst,
                edge_type: edge_type.to_string(),
            }) {
                tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
            }

            for (key, value) in props {
                if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
                    id,
                    key: key.to_string(),
                    value,
                }) {
                    tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
                }
            }
        }

        // An empty property set is reported to CDC as `None`.
        #[cfg(feature = "cdc")]
        self.cdc_log.record_create_edge(
            id,
            self.store.current_epoch(),
            if cdc_props.is_empty() {
                None
            } else {
                Some(cdc_props)
            },
        );

        id
    }
1212
    /// Looks up an edge by id; returns `None` when the store has no such edge.
    #[must_use]
    pub fn get_edge(
        &self,
        id: grafeo_common::types::EdgeId,
    ) -> Option<grafeo_core::graph::lpg::Edge> {
        self.store.get_edge(id)
    }
1221
    /// Deletes an edge and returns whether the store actually removed it.
    ///
    /// With `cdc`, the edge's properties are captured before deletion and
    /// attached to the delete event. With `wal`, a `DeleteEdge` record is
    /// logged (failures only warn). Both happen only when the delete succeeded.
    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
        // Property snapshot for the CDC delete event.
        #[cfg(feature = "cdc")]
        let cdc_props = self.store.get_edge(id).map(|edge| {
            edge.properties
                .iter()
                .map(|(k, v)| (k.to_string(), v.clone()))
                .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
        });

        let result = self.store.delete_edge(id);

        #[cfg(feature = "wal")]
        if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
            tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
        }

        #[cfg(feature = "cdc")]
        if result {
            self.cdc_log.record_delete(
                crate::cdc::EntityId::Edge(id),
                self.store.current_epoch(),
                cdc_props,
            );
        }

        result
    }
1253
    /// Sets property `key` on edge `id` to `value`.
    ///
    /// With `wal`, the `SetEdgeProperty` record is logged *before* the store is
    /// updated (failures only warn). With `cdc`, the previous value is read
    /// before the write and an update event carrying old and new values is
    /// recorded afterwards.
    pub fn set_edge_property(
        &self,
        id: grafeo_common::types::EdgeId,
        key: &str,
        value: grafeo_common::types::Value,
    ) {
        #[cfg(feature = "wal")]
        if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
            id,
            key: key.to_string(),
            value: value.clone(),
        }) {
            tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
        }

        // The old value must be read before the write for the CDC update event.
        #[cfg(feature = "cdc")]
        let cdc_old_value = self
            .store
            .get_edge_property(id, &grafeo_common::types::PropertyKey::new(key));
        #[cfg(feature = "cdc")]
        let cdc_new_value = value.clone();

        self.store.set_edge_property(id, key, value);

        #[cfg(feature = "cdc")]
        self.cdc_log.record_update(
            crate::cdc::EntityId::Edge(id),
            self.store.current_epoch(),
            key,
            cdc_old_value,
            cdc_new_value,
        );
    }
1292
    /// Removes property `key` from node `id`.
    ///
    /// Returns `true` when the store's `remove_node_property` returned a value.
    ///
    /// NOTE(review): unlike `set_node_property`, this writes no WAL record, no
    /// CDC event, and does not touch vector/text indexes — confirm whether that
    /// is intentional.
    pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
        self.store.remove_node_property(id, key).is_some()
    }
1300
    /// Removes property `key` from edge `id`.
    ///
    /// Returns `true` when the store's `remove_edge_property` returned a value.
    ///
    /// NOTE(review): unlike `set_edge_property`, this writes no WAL record and
    /// no CDC event — confirm whether that is intentional.
    pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
        self.store.remove_edge_property(id, key).is_some()
    }
1308
    /// Asks the store to create a property index for `property`.
    pub fn create_property_index(&self, property: &str) {
        self.store.create_property_index(property);
    }
1334
1335 pub fn create_vector_index(
1355 &self,
1356 label: &str,
1357 property: &str,
1358 dimensions: Option<usize>,
1359 metric: Option<&str>,
1360 m: Option<usize>,
1361 ef_construction: Option<usize>,
1362 ) -> Result<()> {
1363 use grafeo_common::types::{PropertyKey, Value};
1364 use grafeo_core::index::vector::DistanceMetric;
1365
1366 let metric = match metric {
1367 Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1368 grafeo_common::utils::error::Error::Internal(format!(
1369 "Unknown distance metric '{}'. Use: cosine, euclidean, dot_product, manhattan",
1370 m
1371 ))
1372 })?,
1373 None => DistanceMetric::Cosine,
1374 };
1375
1376 let prop_key = PropertyKey::new(property);
1378 let mut found_dims: Option<usize> = dimensions;
1379 let mut vector_count = 0usize;
1380
1381 #[cfg(feature = "vector-index")]
1382 let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1383
1384 for node in self.store.nodes_with_label(label) {
1385 if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1386 if let Some(expected) = found_dims {
1387 if v.len() != expected {
1388 return Err(grafeo_common::utils::error::Error::Internal(format!(
1389 "Vector dimension mismatch: expected {}, found {} on node {}",
1390 expected,
1391 v.len(),
1392 node.id.0
1393 )));
1394 }
1395 } else {
1396 found_dims = Some(v.len());
1397 }
1398 vector_count += 1;
1399 #[cfg(feature = "vector-index")]
1400 vectors.push((node.id, v.to_vec()));
1401 }
1402 }
1403
1404 let Some(dims) = found_dims else {
1405 return if let Some(d) = dimensions {
1408 #[cfg(feature = "vector-index")]
1409 {
1410 use grafeo_core::index::vector::{HnswConfig, HnswIndex};
1411
1412 let mut config = HnswConfig::new(d, metric);
1413 if let Some(m_val) = m {
1414 config = config.with_m(m_val);
1415 }
1416 if let Some(ef_c) = ef_construction {
1417 config = config.with_ef_construction(ef_c);
1418 }
1419
1420 let index = HnswIndex::new(config);
1421 self.store
1422 .add_vector_index(label, property, Arc::new(index));
1423 }
1424
1425 let _ = (m, ef_construction);
1426 tracing::info!(
1427 "Empty vector index created: :{label}({property}) - 0 vectors, {d} dimensions, metric={metric_name}",
1428 metric_name = metric.name()
1429 );
1430 Ok(())
1431 } else {
1432 Err(grafeo_common::utils::error::Error::Internal(format!(
1433 "No vector properties found on :{label}({property}) and no dimensions specified"
1434 )))
1435 };
1436 };
1437
1438 #[cfg(feature = "vector-index")]
1440 {
1441 use grafeo_core::index::vector::{HnswConfig, HnswIndex};
1442
1443 let mut config = HnswConfig::new(dims, metric);
1444 if let Some(m_val) = m {
1445 config = config.with_m(m_val);
1446 }
1447 if let Some(ef_c) = ef_construction {
1448 config = config.with_ef_construction(ef_c);
1449 }
1450
1451 let index = HnswIndex::with_capacity(config, vectors.len());
1452 let accessor =
1453 grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1454 for (node_id, vec) in &vectors {
1455 index.insert(*node_id, vec, &accessor);
1456 }
1457
1458 self.store
1459 .add_vector_index(label, property, Arc::new(index));
1460 }
1461
1462 let _ = (m, ef_construction);
1464
1465 tracing::info!(
1466 "Vector index created: :{label}({property}) - {vector_count} vectors, {dims} dimensions, metric={metric_name}",
1467 metric_name = metric.name()
1468 );
1469
1470 Ok(())
1471 }
1472
    /// Removes the vector index for `(label, property)` (`vector-index` feature).
    ///
    /// Returns the store's `remove_vector_index` result and logs an info
    /// message when it is `true`.
    #[cfg(feature = "vector-index")]
    pub fn drop_vector_index(&self, label: &str, property: &str) -> bool {
        let removed = self.store.remove_vector_index(label, property);
        if removed {
            tracing::info!("Vector index dropped: :{label}({property})");
        }
        removed
    }
1488
    /// Rebuilds the vector index for `(label, property)` from current node data
    /// (`vector-index` feature).
    ///
    /// The existing index's configuration (dimensions, metric, `m`,
    /// `ef_construction`) is copied, the index is removed, and
    /// `create_vector_index` is called with those settings.
    ///
    /// # Errors
    ///
    /// Returns `Error::Internal` when no such index exists, and propagates any
    /// error from `create_vector_index`.
    ///
    /// NOTE(review): the old index is removed before the new one is built, so a
    /// failure in `create_vector_index` leaves no index registered — confirm
    /// this is acceptable.
    #[cfg(feature = "vector-index")]
    pub fn rebuild_vector_index(&self, label: &str, property: &str) -> Result<()> {
        let config = self
            .store
            .get_vector_index(label, property)
            .map(|idx| idx.config().clone())
            .ok_or_else(|| {
                grafeo_common::utils::error::Error::Internal(format!(
                    "No vector index found for :{label}({property}). Cannot rebuild."
                ))
            })?;

        self.store.remove_vector_index(label, property);

        self.create_vector_index(
            label,
            property,
            Some(config.dimensions),
            Some(config.metric.name()),
            Some(config.m),
            Some(config.ef_construction),
        )
    }
1522
1523 #[cfg(feature = "vector-index")]
1531 fn compute_filter_allowlist(
1532 &self,
1533 label: &str,
1534 filters: Option<&std::collections::HashMap<String, Value>>,
1535 ) -> Option<std::collections::HashSet<NodeId>> {
1536 let filters = filters.filter(|f| !f.is_empty())?;
1537
1538 let label_nodes: std::collections::HashSet<NodeId> =
1540 self.store.nodes_by_label(label).into_iter().collect();
1541
1542 let mut allowlist = label_nodes;
1543
1544 for (key, filter_value) in filters {
1545 let is_operator_filter = matches!(filter_value, Value::Map(ops) if ops.keys().any(|k| k.as_str().starts_with('$')));
1547
1548 let matching: std::collections::HashSet<NodeId> = if is_operator_filter {
1549 self.store
1551 .find_nodes_matching_filter(key, filter_value)
1552 .into_iter()
1553 .collect()
1554 } else {
1555 self.store
1557 .find_nodes_by_property(key, filter_value)
1558 .into_iter()
1559 .collect()
1560 };
1561 allowlist = allowlist.intersection(&matching).copied().collect();
1562
1563 if allowlist.is_empty() {
1565 return Some(allowlist);
1566 }
1567 }
1568
1569 Some(allowlist)
1570 }
1571
1572 #[cfg(feature = "vector-index")]
1590 pub fn vector_search(
1591 &self,
1592 label: &str,
1593 property: &str,
1594 query: &[f32],
1595 k: usize,
1596 ef: Option<usize>,
1597 filters: Option<&std::collections::HashMap<String, Value>>,
1598 ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
1599 let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1600 grafeo_common::utils::error::Error::Internal(format!(
1601 "No vector index found for :{label}({property}). Call create_vector_index() first."
1602 ))
1603 })?;
1604
1605 let accessor =
1606 grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1607
1608 let results = match self.compute_filter_allowlist(label, filters) {
1609 Some(allowlist) => match ef {
1610 Some(ef_val) => {
1611 index.search_with_ef_and_filter(query, k, ef_val, &allowlist, &accessor)
1612 }
1613 None => index.search_with_filter(query, k, &allowlist, &accessor),
1614 },
1615 None => match ef {
1616 Some(ef_val) => index.search_with_ef(query, k, ef_val, &accessor),
1617 None => index.search(query, k, &accessor),
1618 },
1619 };
1620
1621 Ok(results)
1622 }
1623
    /// Creates one node per vector, each with the single label `label` and the
    /// vector stored under `property`; returns the new ids in input order.
    ///
    /// With `wal`, each node produces a `CreateNode` and a `SetNodeProperty`
    /// record (failures only warn). With `vector-index`, if an index exists for
    /// `(label, property)`, all new nodes are inserted into it after creation.
    ///
    /// NOTE(review): no CDC events are recorded here, unlike `create_node_with_props`
    /// — confirm whether that is intentional.
    pub fn batch_create_nodes(
        &self,
        label: &str,
        property: &str,
        vectors: Vec<Vec<f32>>,
    ) -> Vec<grafeo_common::types::NodeId> {
        use grafeo_common::types::{PropertyKey, Value};

        let prop_key = PropertyKey::new(property);
        let labels: &[&str] = &[label];

        let ids: Vec<grafeo_common::types::NodeId> = vectors
            .into_iter()
            .map(|vec| {
                let value = Value::Vector(vec.into());
                let id = self.store.create_node_with_props(
                    labels,
                    std::iter::once((prop_key.clone(), value.clone())),
                );

                #[cfg(feature = "wal")]
                {
                    if let Err(e) = self.log_wal(&WalRecord::CreateNode {
                        id,
                        labels: labels.iter().map(|s| (*s).to_string()).collect(),
                    }) {
                        tracing::warn!("Failed to log CreateNode to WAL: {}", e);
                    }
                    if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
                        id,
                        key: property.to_string(),
                        value,
                    }) {
                        tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
                    }
                }

                id
            })
            .collect();

        // Index the new nodes only after all of them exist in the store.
        #[cfg(feature = "vector-index")]
        if let Some(index) = self.store.get_vector_index(label, property) {
            let accessor =
                grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
            for &id in &ids {
                if let Some(node) = self.store.get_node(id) {
                    let pk = grafeo_common::types::PropertyKey::new(property);
                    if let Some(grafeo_common::types::Value::Vector(v)) = node.properties.get(&pk) {
                        index.insert(id, v, &accessor);
                    }
                }
            }
        }

        ids
    }
1698
1699 #[cfg(feature = "vector-index")]
1712 pub fn batch_vector_search(
1713 &self,
1714 label: &str,
1715 property: &str,
1716 queries: &[Vec<f32>],
1717 k: usize,
1718 ef: Option<usize>,
1719 filters: Option<&std::collections::HashMap<String, Value>>,
1720 ) -> Result<Vec<Vec<(grafeo_common::types::NodeId, f32)>>> {
1721 let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1722 grafeo_common::utils::error::Error::Internal(format!(
1723 "No vector index found for :{label}({property}). Call create_vector_index() first."
1724 ))
1725 })?;
1726
1727 let accessor =
1728 grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1729
1730 let results = match self.compute_filter_allowlist(label, filters) {
1731 Some(allowlist) => match ef {
1732 Some(ef_val) => {
1733 index.batch_search_with_ef_and_filter(queries, k, ef_val, &allowlist, &accessor)
1734 }
1735 None => index.batch_search_with_filter(queries, k, &allowlist, &accessor),
1736 },
1737 None => match ef {
1738 Some(ef_val) => index.batch_search_with_ef(queries, k, ef_val, &accessor),
1739 None => index.batch_search(queries, k, &accessor),
1740 },
1741 };
1742
1743 Ok(results)
1744 }
1745
1746 #[cfg(feature = "vector-index")]
1769 #[allow(clippy::too_many_arguments)]
1770 pub fn mmr_search(
1771 &self,
1772 label: &str,
1773 property: &str,
1774 query: &[f32],
1775 k: usize,
1776 fetch_k: Option<usize>,
1777 lambda: Option<f32>,
1778 ef: Option<usize>,
1779 filters: Option<&std::collections::HashMap<String, Value>>,
1780 ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
1781 use grafeo_core::index::vector::mmr_select;
1782
1783 let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1784 grafeo_common::utils::error::Error::Internal(format!(
1785 "No vector index found for :{label}({property}). Call create_vector_index() first."
1786 ))
1787 })?;
1788
1789 let accessor =
1790 grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1791
1792 let fetch_k = fetch_k.unwrap_or(k.saturating_mul(4).max(k));
1793 let lambda = lambda.unwrap_or(0.5);
1794
1795 let initial_results = match self.compute_filter_allowlist(label, filters) {
1797 Some(allowlist) => match ef {
1798 Some(ef_val) => {
1799 index.search_with_ef_and_filter(query, fetch_k, ef_val, &allowlist, &accessor)
1800 }
1801 None => index.search_with_filter(query, fetch_k, &allowlist, &accessor),
1802 },
1803 None => match ef {
1804 Some(ef_val) => index.search_with_ef(query, fetch_k, ef_val, &accessor),
1805 None => index.search(query, fetch_k, &accessor),
1806 },
1807 };
1808
1809 if initial_results.is_empty() {
1810 return Ok(Vec::new());
1811 }
1812
1813 use grafeo_core::index::vector::VectorAccessor;
1815 let candidates: Vec<(grafeo_common::types::NodeId, f32, std::sync::Arc<[f32]>)> =
1816 initial_results
1817 .into_iter()
1818 .filter_map(|(id, dist)| accessor.get_vector(id).map(|vec| (id, dist, vec)))
1819 .collect();
1820
1821 let candidate_refs: Vec<(grafeo_common::types::NodeId, f32, &[f32])> = candidates
1823 .iter()
1824 .map(|(id, dist, vec)| (*id, *dist, vec.as_ref()))
1825 .collect();
1826
1827 let metric = index.config().metric;
1829 Ok(mmr_select(query, &candidate_refs, k, lambda, metric))
1830 }
1831
1832 #[cfg(feature = "text-index")]
1844 pub fn create_text_index(&self, label: &str, property: &str) -> Result<()> {
1845 use grafeo_common::types::PropertyKey;
1846 use grafeo_core::index::text::{BM25Config, InvertedIndex};
1847
1848 let mut index = InvertedIndex::new(BM25Config::default());
1849 let prop_key = PropertyKey::new(property);
1850
1851 let nodes = self.store.nodes_by_label(label);
1853 for node_id in nodes {
1854 if let Some(Value::String(text)) = self.store.get_node_property(node_id, &prop_key) {
1855 index.insert(node_id, text.as_str());
1856 }
1857 }
1858
1859 self.store
1860 .add_text_index(label, property, Arc::new(RwLock::new(index)));
1861 Ok(())
1862 }
1863
    #[cfg(feature = "text-index")]
    /// Removes the text index registered for `:label(property)`.
    ///
    /// Returns the `bool` reported by the store's `remove_text_index`
    /// (presumably `true` when an index was removed — confirm against the store).
    pub fn drop_text_index(&self, label: &str, property: &str) -> bool {
        self.store.remove_text_index(label, property)
    }
1871
    #[cfg(feature = "text-index")]
    /// Drops any existing text index for `:label(property)` and builds a
    /// fresh one via [`Self::create_text_index`].
    ///
    /// # Errors
    ///
    /// Propagates any error from `create_text_index`.
    pub fn rebuild_text_index(&self, label: &str, property: &str) -> Result<()> {
        // The removal result is intentionally ignored: rebuilding also works
        // when no index existed before.
        self.store.remove_text_index(label, property);
        self.create_text_index(label, property)
    }
1884
1885 #[cfg(feature = "text-index")]
1894 pub fn text_search(
1895 &self,
1896 label: &str,
1897 property: &str,
1898 query: &str,
1899 k: usize,
1900 ) -> Result<Vec<(NodeId, f64)>> {
1901 let index = self.store.get_text_index(label, property).ok_or_else(|| {
1902 Error::Internal(format!(
1903 "No text index found for :{label}({property}). Call create_text_index() first."
1904 ))
1905 })?;
1906
1907 Ok(index.read().search(query, k))
1908 }
1909
1910 #[cfg(feature = "hybrid-search")]
1929 #[allow(clippy::too_many_arguments)]
1930 pub fn hybrid_search(
1931 &self,
1932 label: &str,
1933 text_property: &str,
1934 vector_property: &str,
1935 query_text: &str,
1936 query_vector: Option<&[f32]>,
1937 k: usize,
1938 fusion: Option<grafeo_core::index::text::FusionMethod>,
1939 ) -> Result<Vec<(NodeId, f64)>> {
1940 use grafeo_core::index::text::fuse_results;
1941
1942 let fusion_method = fusion.unwrap_or_default();
1943 let mut sources: Vec<Vec<(NodeId, f64)>> = Vec::new();
1944
1945 if let Some(text_index) = self.store.get_text_index(label, text_property) {
1947 let text_results = text_index.read().search(query_text, k * 2);
1948 if !text_results.is_empty() {
1949 sources.push(text_results);
1950 }
1951 }
1952
1953 if let Some(query_vec) = query_vector
1955 && let Some(vector_index) = self.store.get_vector_index(label, vector_property)
1956 {
1957 let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
1958 &self.store,
1959 vector_property,
1960 );
1961 let vector_results = vector_index.search(query_vec, k * 2, &accessor);
1962 if !vector_results.is_empty() {
1963 sources.push(
1964 vector_results
1965 .into_iter()
1966 .map(|(id, dist)| (id, f64::from(dist)))
1967 .collect(),
1968 );
1969 }
1970 }
1971
1972 if sources.is_empty() {
1973 return Ok(Vec::new());
1974 }
1975
1976 Ok(fuse_results(&sources, &fusion_method, k))
1977 }
1978
1979 #[cfg(feature = "embed")]
1986 pub fn register_embedding_model(
1987 &self,
1988 name: &str,
1989 model: Arc<dyn crate::embedding::EmbeddingModel>,
1990 ) {
1991 self.embedding_models
1992 .write()
1993 .insert(name.to_string(), model);
1994 }
1995
1996 #[cfg(feature = "embed")]
2002 pub fn embed_text(&self, model_name: &str, texts: &[&str]) -> Result<Vec<Vec<f32>>> {
2003 let models = self.embedding_models.read();
2004 let model = models.get(model_name).ok_or_else(|| {
2005 grafeo_common::utils::error::Error::Internal(format!(
2006 "Embedding model '{}' not registered",
2007 model_name
2008 ))
2009 })?;
2010 model.embed(texts)
2011 }
2012
2013 #[cfg(all(feature = "embed", feature = "vector-index"))]
2023 pub fn vector_search_text(
2024 &self,
2025 label: &str,
2026 property: &str,
2027 model_name: &str,
2028 query_text: &str,
2029 k: usize,
2030 ef: Option<usize>,
2031 ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
2032 let vectors = self.embed_text(model_name, &[query_text])?;
2033 let query_vec = vectors.into_iter().next().ok_or_else(|| {
2034 grafeo_common::utils::error::Error::Internal(
2035 "Embedding model returned no vectors".to_string(),
2036 )
2037 })?;
2038 self.vector_search(label, property, &query_vec, k, ef, None)
2039 }
2040
    #[cfg(feature = "cdc")]
    /// Returns the change events the CDC log holds for `entity_id`.
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok`.
    pub fn history(
        &self,
        entity_id: impl Into<crate::cdc::EntityId>,
    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
        Ok(self.cdc_log.history(entity_id.into()))
    }
2057
    #[cfg(feature = "cdc")]
    /// Returns the change events for `entity_id` as filtered by the CDC
    /// log's `history_since` with `since_epoch`.
    ///
    /// Whether `since_epoch` itself is included is decided by the CDC log.
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok`.
    pub fn history_since(
        &self,
        entity_id: impl Into<crate::cdc::EntityId>,
        since_epoch: grafeo_common::types::EpochId,
    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
    }
2067
    #[cfg(feature = "cdc")]
    /// Returns the change events the CDC log reports for the epoch range
    /// from `start_epoch` to `end_epoch`.
    ///
    /// The bound semantics (inclusive/exclusive) are those of the CDC log's
    /// `changes_between`.
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok`.
    pub fn changes_between(
        &self,
        start_epoch: grafeo_common::types::EpochId,
        end_epoch: grafeo_common::types::EpochId,
    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
    }
2077
    /// Drops the property index on `property`.
    ///
    /// Returns the `bool` reported by the store (presumably `true` when an
    /// index existed and was removed — confirm against the store).
    pub fn drop_property_index(&self, property: &str) -> bool {
        self.store.drop_property_index(property)
    }
2086
    #[must_use]
    /// Reports whether the store has a property index on `property`.
    pub fn has_property_index(&self, property: &str) -> bool {
        self.store.has_property_index(property)
    }
2092
    #[must_use]
    /// Returns the ids of nodes the store finds for `property` with `value`.
    ///
    /// This delegates directly to the store's `find_nodes_by_property`.
    pub fn find_nodes_by_property(
        &self,
        property: &str,
        value: &grafeo_common::types::Value,
    ) -> Vec<grafeo_common::types::NodeId> {
        self.store.find_nodes_by_property(property, value)
    }
2118
    #[must_use]
    /// Returns `true` when the configuration carries a storage path.
    pub fn is_persistent(&self) -> bool {
        self.config.path.is_some()
    }
2130
    #[must_use]
    /// Returns the configured storage path, or `None` if no path is set.
    pub fn path(&self) -> Option<&Path> {
        self.config.path.as_deref()
    }
2138
2139 #[must_use]
2143 pub fn info(&self) -> crate::admin::DatabaseInfo {
2144 crate::admin::DatabaseInfo {
2145 mode: crate::admin::DatabaseMode::Lpg,
2146 node_count: self.store.node_count(),
2147 edge_count: self.store.edge_count(),
2148 is_persistent: self.is_persistent(),
2149 path: self.config.path.clone(),
2150 wal_enabled: self.config.wal_enabled,
2151 version: env!("CARGO_PKG_VERSION").to_string(),
2152 }
2153 }
2154
2155 #[must_use]
2159 pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
2160 #[cfg(feature = "wal")]
2161 let disk_bytes = self.config.path.as_ref().and_then(|p| {
2162 if p.exists() {
2163 Self::calculate_disk_usage(p).ok()
2164 } else {
2165 None
2166 }
2167 });
2168 #[cfg(not(feature = "wal"))]
2169 let disk_bytes: Option<usize> = None;
2170
2171 crate::admin::DatabaseStats {
2172 node_count: self.store.node_count(),
2173 edge_count: self.store.edge_count(),
2174 label_count: self.store.label_count(),
2175 edge_type_count: self.store.edge_type_count(),
2176 property_key_count: self.store.property_key_count(),
2177 index_count: 0, memory_bytes: self.buffer_manager.allocated(),
2179 disk_bytes,
2180 }
2181 }
2182
2183 #[cfg(feature = "wal")]
2185 fn calculate_disk_usage(path: &Path) -> Result<usize> {
2186 let mut total = 0usize;
2187 if path.is_dir() {
2188 for entry in std::fs::read_dir(path)? {
2189 let entry = entry?;
2190 let metadata = entry.metadata()?;
2191 if metadata.is_file() {
2192 total += metadata.len() as usize;
2193 } else if metadata.is_dir() {
2194 total += Self::calculate_disk_usage(&entry.path())?;
2195 }
2196 }
2197 }
2198 Ok(total)
2199 }
2200
2201 #[must_use]
2206 pub fn schema(&self) -> crate::admin::SchemaInfo {
2207 let labels = self
2208 .store
2209 .all_labels()
2210 .into_iter()
2211 .map(|name| crate::admin::LabelInfo {
2212 name: name.clone(),
2213 count: self.store.nodes_with_label(&name).count(),
2214 })
2215 .collect();
2216
2217 let edge_types = self
2218 .store
2219 .all_edge_types()
2220 .into_iter()
2221 .map(|name| crate::admin::EdgeTypeInfo {
2222 name: name.clone(),
2223 count: self.store.edges_with_type(&name).count(),
2224 })
2225 .collect();
2226
2227 let property_keys = self.store.all_property_keys();
2228
2229 crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
2230 labels,
2231 edge_types,
2232 property_keys,
2233 })
2234 }
2235
2236 #[cfg(feature = "rdf")]
2240 #[must_use]
2241 pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
2242 let stats = self.rdf_store.stats();
2243
2244 let predicates = self
2245 .rdf_store
2246 .predicates()
2247 .into_iter()
2248 .map(|predicate| {
2249 let count = self.rdf_store.triples_with_predicate(&predicate).len();
2250 crate::admin::PredicateInfo {
2251 iri: predicate.to_string(),
2252 count,
2253 }
2254 })
2255 .collect();
2256
2257 crate::admin::SchemaInfo::Rdf(crate::admin::RdfSchemaInfo {
2258 predicates,
2259 named_graphs: Vec::new(), subject_count: stats.subject_count,
2261 object_count: stats.object_count,
2262 })
2263 }
2264
2265 #[must_use]
2273 pub fn validate(&self) -> crate::admin::ValidationResult {
2274 let mut result = crate::admin::ValidationResult::default();
2275
2276 for edge in self.store.all_edges() {
2278 if self.store.get_node(edge.src).is_none() {
2279 result.errors.push(crate::admin::ValidationError {
2280 code: "DANGLING_SRC".to_string(),
2281 message: format!(
2282 "Edge {} references non-existent source node {}",
2283 edge.id.0, edge.src.0
2284 ),
2285 context: Some(format!("edge:{}", edge.id.0)),
2286 });
2287 }
2288 if self.store.get_node(edge.dst).is_none() {
2289 result.errors.push(crate::admin::ValidationError {
2290 code: "DANGLING_DST".to_string(),
2291 message: format!(
2292 "Edge {} references non-existent destination node {}",
2293 edge.id.0, edge.dst.0
2294 ),
2295 context: Some(format!("edge:{}", edge.id.0)),
2296 });
2297 }
2298 }
2299
2300 if self.store.node_count() > 0 && self.store.edge_count() == 0 {
2302 result.warnings.push(crate::admin::ValidationWarning {
2303 code: "NO_EDGES".to_string(),
2304 message: "Database has nodes but no edges".to_string(),
2305 context: None,
2306 });
2307 }
2308
2309 result
2310 }
2311
2312 #[must_use]
2316 pub fn wal_status(&self) -> crate::admin::WalStatus {
2317 #[cfg(feature = "wal")]
2318 if let Some(ref wal) = self.wal {
2319 return crate::admin::WalStatus {
2320 enabled: true,
2321 path: self.config.path.as_ref().map(|p| p.join("wal")),
2322 size_bytes: wal.size_bytes(),
2323 record_count: wal.record_count() as usize,
2324 last_checkpoint: wal.last_checkpoint_timestamp(),
2325 current_epoch: self.store.current_epoch().as_u64(),
2326 };
2327 }
2328
2329 crate::admin::WalStatus {
2330 enabled: false,
2331 path: None,
2332 size_bytes: 0,
2333 record_count: 0,
2334 last_checkpoint: None,
2335 current_epoch: self.store.current_epoch().as_u64(),
2336 }
2337 }
2338
2339 pub fn wal_checkpoint(&self) -> Result<()> {
2347 #[cfg(feature = "wal")]
2348 if let Some(ref wal) = self.wal {
2349 let epoch = self.store.current_epoch();
2350 let tx_id = self
2351 .tx_manager
2352 .last_assigned_tx_id()
2353 .unwrap_or_else(|| self.tx_manager.begin());
2354 wal.checkpoint(tx_id, epoch)?;
2355 wal.sync()?;
2356 }
2357 Ok(())
2358 }
2359
    #[cfg(feature = "wal")]
    /// Copies the whole graph into a persistent database at `path`.
    ///
    /// A database is opened with `Config::persistent(path)`; every node and
    /// edge (with original ids and all properties) is written to its store
    /// and logged to its WAL, then the target is closed.
    ///
    /// # Errors
    ///
    /// Returns an error if the target cannot be opened, if any WAL record
    /// cannot be logged, or if closing the target fails.
    pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
        let path = path.as_ref();

        // Open (or create) the destination database.
        let target_config = Config::persistent(path);
        let target = Self::with_config(target_config)?;

        // Nodes first, so edges copied below refer to nodes already present.
        for node in self.store.all_nodes() {
            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
            target.store.create_node_with_id(node.id, &label_refs);

            // Each store mutation is followed by its matching WAL record.
            target.log_wal(&WalRecord::CreateNode {
                id: node.id,
                labels: node.labels.iter().map(|s| s.to_string()).collect(),
            })?;

            for (key, value) in node.properties {
                target
                    .store
                    .set_node_property(node.id, key.as_str(), value.clone());
                target.log_wal(&WalRecord::SetNodeProperty {
                    id: node.id,
                    key: key.to_string(),
                    value,
                })?;
            }
        }

        // Then edges, preserving ids, endpoints and types.
        for edge in self.store.all_edges() {
            target
                .store
                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);

            target.log_wal(&WalRecord::CreateEdge {
                id: edge.id,
                src: edge.src,
                dst: edge.dst,
                edge_type: edge.edge_type.to_string(),
            })?;

            for (key, value) in edge.properties {
                target
                    .store
                    .set_edge_property(edge.id, key.as_str(), value.clone());
                target.log_wal(&WalRecord::SetEdgeProperty {
                    id: edge.id,
                    key: key.to_string(),
                    value,
                })?;
            }
        }

        // Close explicitly so a failure is reported to the caller instead of
        // only being logged by `Drop`.
        target.close()?;

        Ok(())
    }
2440
2441 pub fn to_memory(&self) -> Result<Self> {
2452 let config = Config::in_memory();
2453 let target = Self::with_config(config)?;
2454
2455 for node in self.store.all_nodes() {
2457 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
2458 target.store.create_node_with_id(node.id, &label_refs);
2459
2460 for (key, value) in node.properties {
2462 target.store.set_node_property(node.id, key.as_str(), value);
2463 }
2464 }
2465
2466 for edge in self.store.all_edges() {
2468 target
2469 .store
2470 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
2471
2472 for (key, value) in edge.properties {
2474 target.store.set_edge_property(edge.id, key.as_str(), value);
2475 }
2476 }
2477
2478 Ok(target)
2479 }
2480
2481 #[cfg(feature = "wal")]
2490 pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
2491 let source = Self::open(path)?;
2493
2494 let target = source.to_memory()?;
2496
2497 source.close()?;
2499
2500 Ok(target)
2501 }
2502
2503 pub fn export_snapshot(&self) -> Result<Vec<u8>> {
2516 let nodes: Vec<SnapshotNode> = self
2517 .store
2518 .all_nodes()
2519 .map(|n| SnapshotNode {
2520 id: n.id,
2521 labels: n.labels.iter().map(|l| l.to_string()).collect(),
2522 properties: n
2523 .properties
2524 .into_iter()
2525 .map(|(k, v)| (k.to_string(), v))
2526 .collect(),
2527 })
2528 .collect();
2529
2530 let edges: Vec<SnapshotEdge> = self
2531 .store
2532 .all_edges()
2533 .map(|e| SnapshotEdge {
2534 id: e.id,
2535 src: e.src,
2536 dst: e.dst,
2537 edge_type: e.edge_type.to_string(),
2538 properties: e
2539 .properties
2540 .into_iter()
2541 .map(|(k, v)| (k.to_string(), v))
2542 .collect(),
2543 })
2544 .collect();
2545
2546 let snapshot = Snapshot {
2547 version: 1,
2548 nodes,
2549 edges,
2550 };
2551
2552 let config = bincode::config::standard();
2553 bincode::serde::encode_to_vec(&snapshot, config)
2554 .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
2555 }
2556
2557 pub fn import_snapshot(data: &[u8]) -> Result<Self> {
2565 let config = bincode::config::standard();
2566 let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
2567 .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
2568
2569 if snapshot.version != 1 {
2570 return Err(Error::Internal(format!(
2571 "unsupported snapshot version: {}",
2572 snapshot.version
2573 )));
2574 }
2575
2576 let db = Self::new_in_memory();
2577
2578 for node in snapshot.nodes {
2579 let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
2580 db.store.create_node_with_id(node.id, &label_refs);
2581 for (key, value) in node.properties {
2582 db.store.set_node_property(node.id, &key, value);
2583 }
2584 }
2585
2586 for edge in snapshot.edges {
2587 db.store
2588 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
2589 for (key, value) in edge.properties {
2590 db.store.set_edge_property(edge.id, &key, value);
2591 }
2592 }
2593
2594 Ok(db)
2595 }
2596
    /// Returns an iterator over all nodes in the store.
    ///
    /// The iterator borrows from `self` and yields owned `Node` values.
    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
        self.store.all_nodes()
    }
2607
    /// Returns an iterator over all edges in the store.
    ///
    /// The iterator borrows from `self` and yields owned `Edge` values.
    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
        self.store.all_edges()
    }
2614}
2615
/// Serialized form of a whole database, as written by `export_snapshot`
/// and read by `import_snapshot`.
// NOTE(review): field order is presumably part of the encoded layout;
// do not reorder without bumping `version`.
#[derive(serde::Serialize, serde::Deserialize)]
struct Snapshot {
    /// Format version; `import_snapshot` only accepts `1`.
    version: u8,
    /// All nodes of the database.
    nodes: Vec<SnapshotNode>,
    /// All edges of the database.
    edges: Vec<SnapshotEdge>,
}
2623
/// One node inside a [`Snapshot`].
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotNode {
    /// Original node id, preserved on import.
    id: NodeId,
    /// Label names of the node.
    labels: Vec<String>,
    /// Property key/value pairs of the node.
    properties: Vec<(String, Value)>,
}
2630
/// One edge inside a [`Snapshot`].
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotEdge {
    /// Original edge id, preserved on import.
    id: EdgeId,
    /// Id of the source node.
    src: NodeId,
    /// Id of the destination node.
    dst: NodeId,
    /// Edge type name.
    edge_type: String,
    /// Property key/value pairs of the edge.
    properties: Vec<(String, Value)>,
}
2639
2640impl Drop for GrafeoDB {
2641 fn drop(&mut self) {
2642 if let Err(e) = self.close() {
2643 tracing::error!("Error closing database: {}", e);
2644 }
2645 }
2646}
2647
2648impl crate::admin::AdminService for GrafeoDB {
2649 fn info(&self) -> crate::admin::DatabaseInfo {
2650 self.info()
2651 }
2652
2653 fn detailed_stats(&self) -> crate::admin::DatabaseStats {
2654 self.detailed_stats()
2655 }
2656
2657 fn schema(&self) -> crate::admin::SchemaInfo {
2658 self.schema()
2659 }
2660
2661 fn validate(&self) -> crate::admin::ValidationResult {
2662 self.validate()
2663 }
2664
2665 fn wal_status(&self) -> crate::admin::WalStatus {
2666 self.wal_status()
2667 }
2668
2669 fn wal_checkpoint(&self) -> Result<()> {
2670 self.wal_checkpoint()
2671 }
2672}
2673
/// Tabular result of a query: column metadata, rows, and optional
/// execution metrics.
#[derive(Debug)]
pub struct QueryResult {
    /// Column names.
    pub columns: Vec<String>,
    /// Logical type of each column; `QueryResult::new` fills this with
    /// `LogicalType::Any`, one entry per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// Result rows; each row is a list of values.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Execution time in milliseconds, if recorded.
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned, if recorded.
    pub rows_scanned: Option<u64>,
}
2712
2713impl QueryResult {
2714 #[must_use]
2716 pub fn new(columns: Vec<String>) -> Self {
2717 let len = columns.len();
2718 Self {
2719 columns,
2720 column_types: vec![grafeo_common::types::LogicalType::Any; len],
2721 rows: Vec::new(),
2722 execution_time_ms: None,
2723 rows_scanned: None,
2724 }
2725 }
2726
2727 #[must_use]
2729 pub fn with_types(
2730 columns: Vec<String>,
2731 column_types: Vec<grafeo_common::types::LogicalType>,
2732 ) -> Self {
2733 Self {
2734 columns,
2735 column_types,
2736 rows: Vec::new(),
2737 execution_time_ms: None,
2738 rows_scanned: None,
2739 }
2740 }
2741
2742 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
2744 self.execution_time_ms = Some(execution_time_ms);
2745 self.rows_scanned = Some(rows_scanned);
2746 self
2747 }
2748
2749 #[must_use]
2751 pub fn execution_time_ms(&self) -> Option<f64> {
2752 self.execution_time_ms
2753 }
2754
2755 #[must_use]
2757 pub fn rows_scanned(&self) -> Option<u64> {
2758 self.rows_scanned
2759 }
2760
2761 #[must_use]
2763 pub fn row_count(&self) -> usize {
2764 self.rows.len()
2765 }
2766
2767 #[must_use]
2769 pub fn column_count(&self) -> usize {
2770 self.columns.len()
2771 }
2772
2773 #[must_use]
2775 pub fn is_empty(&self) -> bool {
2776 self.rows.is_empty()
2777 }
2778
2779 pub fn scalar<T: FromValue>(&self) -> Result<T> {
2788 if self.rows.len() != 1 || self.columns.len() != 1 {
2789 return Err(grafeo_common::utils::error::Error::InvalidValue(
2790 "Expected single value".to_string(),
2791 ));
2792 }
2793 T::from_value(&self.rows[0][0])
2794 }
2795
2796 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
2798 self.rows.iter()
2799 }
2800}
2801
/// Conversion from a database [`Value`] into a Rust type.
///
/// Used by `QueryResult::scalar` to extract typed values.
pub trait FromValue: Sized {
    /// Converts `value` into `Self`, or returns an error if the value does
    /// not hold a compatible type.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
2810
2811impl FromValue for i64 {
2812 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2813 value
2814 .as_int64()
2815 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2816 expected: "INT64".to_string(),
2817 found: value.type_name().to_string(),
2818 })
2819 }
2820}
2821
2822impl FromValue for f64 {
2823 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2824 value
2825 .as_float64()
2826 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2827 expected: "FLOAT64".to_string(),
2828 found: value.type_name().to_string(),
2829 })
2830 }
2831}
2832
2833impl FromValue for String {
2834 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2835 value.as_str().map(String::from).ok_or_else(|| {
2836 grafeo_common::utils::error::Error::TypeMismatch {
2837 expected: "STRING".to_string(),
2838 found: value.type_name().to_string(),
2839 }
2840 })
2841 }
2842}
2843
2844impl FromValue for bool {
2845 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2846 value
2847 .as_bool()
2848 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2849 expected: "BOOL".to_string(),
2850 found: value.type_name().to_string(),
2851 })
2852 }
2853}
2854
#[cfg(test)]
mod tests {
    use super::*;

    // A freshly created in-memory database contains no nodes and no edges.
    #[test]
    fn test_create_in_memory_database() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
    }

    // Options set through the `Config` builder are visible via `db.config()`.
    #[test]
    fn test_database_config() {
        let config = Config::in_memory().with_threads(4).with_query_logging();

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().threads, 4);
        assert!(db.config().query_logging);
    }

    // Creating a session on an in-memory database must not panic.
    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        let _session = db.session();
    }

    // Data written to a persistent database is available after reopening it.
    #[cfg(feature = "wal")]
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_db");

        {
            // First session: create two nodes and one edge, then close.
            let db = GrafeoDB::open(&db_path).unwrap();

            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));

            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));

            let _edge = db.create_edge(alice, bob, "KNOWS");

            db.close().unwrap();
        }

        {
            // Second session: the data must have been recovered.
            let db = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(db.node_count(), 2);
            assert_eq!(db.edge_count(), 1);

            // The test expects the two nodes to be reachable under ids 0 and 1.
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node0.is_some());

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node1.is_some());
        }
    }

    // Mutations on a persistent database produce WAL records.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // Only checked when a WAL is actually attached.
        if let Some(wal) = db.wal() {
            assert!(wal.record_count() > 0);
        }

        db.close().unwrap();
    }

    // Data accumulates correctly across several open/close cycles.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_recovery_multiple_sessions() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("multi_session_db");

        {
            // Session 1: create the first node.
            let db = GrafeoDB::open(&db_path).unwrap();
            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));
            db.close().unwrap();
        }

        {
            // Session 2: the first node is recovered, then a second is added.
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 1);
            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));
            db.close().unwrap();
        }

        {
            // Session 3: both nodes are present with their labels.
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 2);

            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
        }
    }

    // Creates, deletes and property updates all survive a reopen.
    #[cfg(feature = "wal")]
    #[test]
    fn test_database_consistency_after_mutations() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("consistency_db");

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Three nodes; the assertions below expect ids 0, 1 and 2.
            let a = db.create_node(&["Node"]);
            let b = db.create_node(&["Node"]);
            let c = db.create_node(&["Node"]);

            let e1 = db.create_edge(a, b, "LINKS");
            let _e2 = db.create_edge(b, c, "LINKS");

            // Remove one edge and the middle node.
            db.delete_edge(e1);
            db.delete_node(b);

            db.set_node_property(a, "value", Value::Int64(1));
            db.set_node_property(c, "value", Value::Int64(3));

            db.close().unwrap();
        }

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Nodes a (id 0) and c (id 2) must still exist.
            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node_a.is_some());

            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
            assert!(node_c.is_some());

            // Node b (id 1) was deleted and must stay deleted.
            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node_b.is_none());
        }
    }

    // Calling `close` twice must succeed both times.
    #[cfg(feature = "wal")]
    #[test]
    fn test_close_is_idempotent() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("close_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();
        db.create_node(&["Test"]);

        // First close.
        assert!(db.close().is_ok());

        // Second close on the already closed database.
        assert!(db.close().is_ok());
    }

    // Query results carry execution metrics (only checked with `gql`).
    #[test]
    fn test_query_result_has_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();

            // Both metrics are set, and two rows were scanned.
            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert!(result.execution_time_ms.unwrap() >= 0.0);
            assert_eq!(result.rows_scanned.unwrap(), 2);
        }
    }

    // A query matching nothing still reports metrics, with zero rows scanned.
    #[test]
    fn test_empty_query_result_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert_eq!(result.rows_scanned.unwrap(), 0);
        }
    }

    // Change-data-capture behaviour of the public mutation API.
    #[cfg(feature = "cdc")]
    mod cdc_integration {
        use super::*;

        // A node's history records create, each property update, and delete.
        #[test]
        fn test_node_lifecycle_history() {
            let db = GrafeoDB::new_in_memory();

            let id = db.create_node(&["Person"]);
            db.set_node_property(id, "name", "Alice".into());
            db.set_node_property(id, "name", "Bob".into());
            db.delete_node(id);

            let history = db.history(id).unwrap();
            // Four events: create, two updates, delete.
            assert_eq!(history.len(), 4);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            // The first update has no previous value.
            assert!(history[1].before.is_none());
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
            // The second update records the value it replaced.
            assert!(history[2].before.is_some());
            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
        }

        // An edge's history records create, property update, and delete.
        #[test]
        fn test_edge_lifecycle_history() {
            let db = GrafeoDB::new_in_memory();

            let alice = db.create_node(&["Person"]);
            let bob = db.create_node(&["Person"]);
            let edge = db.create_edge(alice, bob, "KNOWS");
            db.set_edge_property(edge, "since", 2024i64.into());
            db.delete_edge(edge);

            let history = db.history(edge).unwrap();
            // Three events: create, update, delete.
            assert_eq!(history.len(), 3);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
        }

        // Creating a node with properties yields one create event that
        // carries both properties.
        #[test]
        fn test_create_node_with_props_cdc() {
            let db = GrafeoDB::new_in_memory();

            let id = db.create_node_with_props(
                &["Person"],
                vec![
                    ("name", grafeo_common::types::Value::from("Alice")),
                    ("age", grafeo_common::types::Value::from(30i64)),
                ],
            );

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 1);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            let after = history[0].after.as_ref().unwrap();
            assert_eq!(after.len(), 2);
        }

        // `changes_between` over the full epoch range returns every event.
        #[test]
        fn test_changes_between() {
            let db = GrafeoDB::new_in_memory();

            let id1 = db.create_node(&["A"]);
            let _id2 = db.create_node(&["B"]);
            db.set_node_property(id1, "x", 1i64.into());

            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            // Two creates plus one property update.
            assert_eq!(changes.len(), 3);
        }
    }
}