1use std::sync::Arc;
8
9use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
10use grafeo_common::utils::error::Result;
11use grafeo_core::graph::Direction;
12use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
13#[cfg(feature = "rdf")]
14use grafeo_core::graph::rdf::RdfStore;
15
16use crate::config::{AdaptiveConfig, GraphModel};
17use crate::database::QueryResult;
18use crate::query::cache::QueryCache;
19use crate::transaction::TransactionManager;
20
/// A handle for running queries and direct graph operations against a shared store.
///
/// A session tracks at most one active transaction (`current_tx`). When no
/// transaction is active, operations run against the transaction manager's
/// current epoch.
pub struct Session {
    /// Shared labeled-property-graph store used by queries and direct lookups.
    store: Arc<LpgStore>,
    /// RDF store handle, compiled in only with the `rdf` feature. Used by
    /// SPARQL execution and notified on commit/rollback.
    #[cfg(feature = "rdf")]
    #[allow(dead_code)]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager that hands out transaction ids and epochs.
    tx_manager: Arc<TransactionManager>,
    /// Cache of optimized plans consulted by the GQL and Cypher entry points.
    query_cache: Arc<QueryCache>,
    /// Id of the currently active transaction, or `None` outside a transaction.
    current_tx: Option<TxId>,
    /// Auto-commit flag. It is stored and exposed through accessors; nothing
    /// in this file consults it when executing.
    auto_commit: bool,
    /// Adaptive execution configuration. Stored at construction; not read in
    /// this file (hence the `dead_code` allowance).
    #[allow(dead_code)]
    adaptive_config: AdaptiveConfig,
    /// Forwarded to the planner via `with_factorized_execution`.
    factorized_execution: bool,
    /// Graph model of the owning database; checked by `require_lpg`.
    graph_model: GraphModel,
}
49
50impl Session {
51 #[allow(dead_code)]
53 pub(crate) fn new(
54 store: Arc<LpgStore>,
55 tx_manager: Arc<TransactionManager>,
56 query_cache: Arc<QueryCache>,
57 ) -> Self {
58 Self {
59 store,
60 #[cfg(feature = "rdf")]
61 rdf_store: Arc::new(RdfStore::new()),
62 tx_manager,
63 query_cache,
64 current_tx: None,
65 auto_commit: true,
66 adaptive_config: AdaptiveConfig::default(),
67 factorized_execution: true,
68 graph_model: GraphModel::Lpg,
69 }
70 }
71
72 #[allow(dead_code)]
74 pub(crate) fn with_adaptive(
75 store: Arc<LpgStore>,
76 tx_manager: Arc<TransactionManager>,
77 query_cache: Arc<QueryCache>,
78 adaptive_config: AdaptiveConfig,
79 factorized_execution: bool,
80 graph_model: GraphModel,
81 ) -> Self {
82 Self {
83 store,
84 #[cfg(feature = "rdf")]
85 rdf_store: Arc::new(RdfStore::new()),
86 tx_manager,
87 query_cache,
88 current_tx: None,
89 auto_commit: true,
90 adaptive_config,
91 factorized_execution,
92 graph_model,
93 }
94 }
95
/// Creates a session that shares an existing RDF store (requires the `rdf` feature).
///
/// All handles and settings are taken from the arguments; the session starts
/// outside any transaction with auto-commit enabled.
#[cfg(feature = "rdf")]
pub(crate) fn with_rdf_store_and_adaptive(
    store: Arc<LpgStore>,
    rdf_store: Arc<RdfStore>,
    tx_manager: Arc<TransactionManager>,
    query_cache: Arc<QueryCache>,
    adaptive_config: AdaptiveConfig,
    factorized_execution: bool,
    graph_model: GraphModel,
) -> Self {
    Session {
        // Shared handles supplied by the caller.
        store,
        rdf_store,
        tx_manager,
        query_cache,
        // Caller-selected execution settings.
        adaptive_config,
        factorized_execution,
        graph_model,
        // Fixed initial session state.
        current_tx: None,
        auto_commit: true,
    }
}
119
/// Returns the graph model this session was constructed with.
#[must_use]
pub fn graph_model(&self) -> GraphModel {
    self.graph_model
}
125
126 fn require_lpg(&self, language: &str) -> Result<()> {
128 if self.graph_model == GraphModel::Rdf {
129 return Err(grafeo_common::utils::error::Error::Internal(format!(
130 "This is an RDF database. {language} queries require an LPG database."
131 )));
132 }
133 Ok(())
134 }
135
/// Executes a GQL query and returns its result.
///
/// The optimized plan is looked up in the query cache under the query text
/// and language. On a miss the query is translated, bound and optimized, and
/// the plan is stored in the cache. The plan is then physically planned with
/// the session's transaction context and executed.
///
/// The result is stamped with the elapsed wall-clock time in milliseconds
/// and with `rows_scanned`, which is set to the number of returned rows.
///
/// # Errors
///
/// Fails when the database is an RDF database, or when translation, binding,
/// optimization, planning or execution fails.
#[cfg(feature = "gql")]
pub fn execute(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, Planner, binder::Binder, cache::CacheKey, gql_translator,
        optimizer::Optimizer, processor::QueryLanguage,
    };

    self.require_lpg("GQL")?;

    let started = std::time::Instant::now();
    let key = CacheKey::new(query, QueryLanguage::Gql);

    let optimized = match self.query_cache.get_optimized(&key) {
        Some(hit) => hit,
        None => {
            let logical = gql_translator::translate(query)?;

            // Binding is run for its validation; the context itself is unused.
            let mut binder = Binder::new();
            let _bound = binder.bind(&logical)?;

            let optimizer = Optimizer::from_store(&self.store);
            let fresh = optimizer.optimize(logical)?;

            self.query_cache.put_optimized(key, fresh.clone());
            fresh
        }
    };

    let (snapshot_epoch, active_tx) = self.get_transaction_context();
    let store = Arc::clone(&self.store);
    let tx_manager = Arc::clone(&self.tx_manager);

    let planner = Planner::with_context(store, tx_manager, active_tx, snapshot_epoch)
        .with_factorized_execution(self.factorized_execution);
    let mut physical_plan = planner.plan(&optimized)?;

    let executor = Executor::with_columns(physical_plan.columns.clone());
    let mut outcome = executor.execute(physical_plan.operator.as_mut())?;

    let elapsed_ms = started.elapsed().as_secs_f64() * 1000.0;
    outcome.rows_scanned = Some(outcome.rows.len() as u64);
    outcome.execution_time_ms = Some(elapsed_ms);

    Ok(outcome)
}
223
/// Executes a parameterized GQL query.
///
/// The query is handed to a `QueryProcessor` built over the session's store
/// and transaction manager. When a transaction is active, the processor is
/// given that transaction's id and viewing epoch.
///
/// # Errors
///
/// Fails when the database is an RDF database, or when the processor rejects
/// or fails to run the query.
#[cfg(feature = "gql")]
pub fn execute_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    self.require_lpg("GQL")?;

    use crate::query::processor::{QueryLanguage, QueryProcessor};

    let (viewing_epoch, tx_id) = self.get_transaction_context();

    let processor =
        QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));

    // Only attach a transaction context when a transaction is actually open.
    let processor = if let Some(tx_id) = tx_id {
        processor.with_tx_context(viewing_epoch, tx_id)
    } else {
        processor
    };

    processor.process(query, QueryLanguage::Gql, Some(&params))
}
255
256 #[cfg(not(any(feature = "gql", feature = "cypher")))]
262 pub fn execute_with_params(
263 &self,
264 _query: &str,
265 _params: std::collections::HashMap<String, Value>,
266 ) -> Result<QueryResult> {
267 Err(grafeo_common::utils::error::Error::Internal(
268 "No query language enabled".to_string(),
269 ))
270 }
271
272 #[cfg(not(any(feature = "gql", feature = "cypher")))]
278 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
279 Err(grafeo_common::utils::error::Error::Internal(
280 "No query language enabled".to_string(),
281 ))
282 }
283
/// Executes a Cypher query and returns its result.
///
/// The optimized plan is cached under the query text and the Cypher language
/// tag. On a cache miss the query is translated, bound and optimized before
/// being stored. The plan is then physically planned with the session's
/// transaction context and executed.
///
/// # Errors
///
/// Fails when translation, binding, optimization, planning or execution
/// fails.
#[cfg(feature = "cypher")]
pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, Planner, binder::Binder, cache::CacheKey, cypher_translator,
        optimizer::Optimizer, processor::QueryLanguage,
    };

    let key = CacheKey::new(query, QueryLanguage::Cypher);

    let optimized = match self.query_cache.get_optimized(&key) {
        Some(hit) => hit,
        None => {
            let logical = cypher_translator::translate(query)?;

            // Binding is run for its validation; the context itself is unused.
            let mut binder = Binder::new();
            let _bound = binder.bind(&logical)?;

            let optimizer = Optimizer::from_store(&self.store);
            let fresh = optimizer.optimize(logical)?;

            self.query_cache.put_optimized(key, fresh.clone());
            fresh
        }
    };

    let (snapshot_epoch, active_tx) = self.get_transaction_context();
    let store = Arc::clone(&self.store);
    let tx_manager = Arc::clone(&self.tx_manager);

    let planner = Planner::with_context(store, tx_manager, active_tx, snapshot_epoch)
        .with_factorized_execution(self.factorized_execution);
    let mut physical_plan = planner.plan(&optimized)?;

    let executor = Executor::with_columns(physical_plan.columns.clone());
    executor.execute(physical_plan.operator.as_mut())
}
337
/// Executes a Gremlin query and returns its result.
///
/// The query is translated, bound, optimized, physically planned with the
/// session's transaction context, and executed. No plan caching is performed
/// on this path.
///
/// # Errors
///
/// Fails when translation, binding, optimization, planning or execution
/// fails.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, Planner, binder::Binder, gremlin_translator, optimizer::Optimizer,
    };

    let logical = gremlin_translator::translate(query)?;

    // Binding is run for its validation; the context itself is unused.
    let mut binder = Binder::new();
    let _bound = binder.bind(&logical)?;

    let optimizer = Optimizer::from_store(&self.store);
    let optimized = optimizer.optimize(logical)?;

    let (snapshot_epoch, active_tx) = self.get_transaction_context();
    let store = Arc::clone(&self.store);
    let tx_manager = Arc::clone(&self.tx_manager);

    let planner = Planner::with_context(store, tx_manager, active_tx, snapshot_epoch)
        .with_factorized_execution(self.factorized_execution);
    let mut physical_plan = planner.plan(&optimized)?;

    let executor = Executor::with_columns(physical_plan.columns.clone());
    executor.execute(physical_plan.operator.as_mut())
}
392
/// Executes a parameterized Gremlin query.
///
/// The query is handed to a `QueryProcessor` built over the session's store
/// and transaction manager. When a transaction is active, the processor is
/// given that transaction's id and viewing epoch.
///
/// # Errors
///
/// Fails when the processor rejects or fails to run the query.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    let (viewing_epoch, tx_id) = self.get_transaction_context();

    let processor =
        QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));

    // Only attach a transaction context when a transaction is actually open.
    let processor = if let Some(tx_id) = tx_id {
        processor.with_tx_context(viewing_epoch, tx_id)
    } else {
        processor
    };

    processor.process(query, QueryLanguage::Gremlin, Some(&params))
}
422
/// Executes a GraphQL query and returns its result.
///
/// The query is translated, bound, optimized, physically planned with the
/// session's transaction context, and executed. No plan caching is performed
/// on this path.
///
/// # Errors
///
/// Fails when translation, binding, optimization, planning or execution
/// fails.
#[cfg(feature = "graphql")]
pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, Planner, binder::Binder, graphql_translator, optimizer::Optimizer,
    };

    let logical = graphql_translator::translate(query)?;

    // Binding is run for its validation; the context itself is unused.
    let mut binder = Binder::new();
    let _bound = binder.bind(&logical)?;

    let optimizer = Optimizer::from_store(&self.store);
    let optimized = optimizer.optimize(logical)?;

    let (snapshot_epoch, active_tx) = self.get_transaction_context();
    let store = Arc::clone(&self.store);
    let tx_manager = Arc::clone(&self.tx_manager);

    let planner = Planner::with_context(store, tx_manager, active_tx, snapshot_epoch)
        .with_factorized_execution(self.factorized_execution);
    let mut physical_plan = planner.plan(&optimized)?;

    let executor = Executor::with_columns(physical_plan.columns.clone());
    executor.execute(physical_plan.operator.as_mut())
}
477
/// Executes a parameterized GraphQL query.
///
/// The query is handed to a `QueryProcessor` built over the session's store
/// and transaction manager. When a transaction is active, the processor is
/// given that transaction's id and viewing epoch.
///
/// # Errors
///
/// Fails when the processor rejects or fails to run the query.
#[cfg(feature = "graphql")]
pub fn execute_graphql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    let (viewing_epoch, tx_id) = self.get_transaction_context();

    let processor =
        QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));

    // Only attach a transaction context when a transaction is actually open.
    let processor = if let Some(tx_id) = tx_id {
        processor.with_tx_context(viewing_epoch, tx_id)
    } else {
        processor
    };

    processor.process(query, QueryLanguage::GraphQL, Some(&params))
}
507
/// Executes a SPARQL query against the session's RDF store.
///
/// The query is translated and optimized, then planned by an `RdfPlanner`
/// that is given the session's current transaction id (if any), and finally
/// executed.
///
/// # Errors
///
/// Fails when translation, optimization, planning or execution fails.
#[cfg(all(feature = "sparql", feature = "rdf"))]
pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
    };

    let logical = sparql_translator::translate(query)?;

    let optimizer = Optimizer::from_store(&self.store);
    let optimized = optimizer.optimize(logical)?;

    let rdf_store = Arc::clone(&self.rdf_store);
    let planner = RdfPlanner::new(rdf_store).with_tx_id(self.current_tx);
    let mut physical_plan = planner.plan(&optimized)?;

    let executor = Executor::with_columns(physical_plan.columns.clone());
    executor.execute(physical_plan.operator.as_mut())
}
534
/// Executes a SPARQL query, accepting (but currently ignoring) parameters.
///
/// NOTE(review): `_params` is never read; the call is forwarded unchanged to
/// `execute_sparql`, so no parameter substitution takes place.
///
/// # Errors
///
/// Propagates any error returned by `execute_sparql`.
#[cfg(all(feature = "sparql", feature = "rdf"))]
pub fn execute_sparql_with_params(
    &self,
    query: &str,
    _params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    self.execute_sparql(query)
}
550
551 pub fn begin_tx(&mut self) -> Result<()> {
571 if self.current_tx.is_some() {
572 return Err(grafeo_common::utils::error::Error::Transaction(
573 grafeo_common::utils::error::TransactionError::InvalidState(
574 "Transaction already active".to_string(),
575 ),
576 ));
577 }
578
579 let tx_id = self.tx_manager.begin();
580 self.current_tx = Some(tx_id);
581 Ok(())
582 }
583
584 pub fn begin_tx_with_isolation(
592 &mut self,
593 isolation_level: crate::transaction::IsolationLevel,
594 ) -> Result<()> {
595 if self.current_tx.is_some() {
596 return Err(grafeo_common::utils::error::Error::Transaction(
597 grafeo_common::utils::error::TransactionError::InvalidState(
598 "Transaction already active".to_string(),
599 ),
600 ));
601 }
602
603 let tx_id = self.tx_manager.begin_with_isolation(isolation_level);
604 self.current_tx = Some(tx_id);
605 Ok(())
606 }
607
608 pub fn commit(&mut self) -> Result<()> {
616 let tx_id = self.current_tx.take().ok_or_else(|| {
617 grafeo_common::utils::error::Error::Transaction(
618 grafeo_common::utils::error::TransactionError::InvalidState(
619 "No active transaction".to_string(),
620 ),
621 )
622 })?;
623
624 #[cfg(feature = "rdf")]
626 self.rdf_store.commit_tx(tx_id);
627
628 self.tx_manager.commit(tx_id).map(|_| ())
629 }
630
631 pub fn rollback(&mut self) -> Result<()> {
652 let tx_id = self.current_tx.take().ok_or_else(|| {
653 grafeo_common::utils::error::Error::Transaction(
654 grafeo_common::utils::error::TransactionError::InvalidState(
655 "No active transaction".to_string(),
656 ),
657 )
658 })?;
659
660 self.store.discard_uncommitted_versions(tx_id);
662
663 #[cfg(feature = "rdf")]
665 self.rdf_store.rollback_tx(tx_id);
666
667 self.tx_manager.abort(tx_id)
669 }
670
/// Returns `true` while the session holds an active transaction id.
#[must_use]
pub fn in_transaction(&self) -> bool {
    self.current_tx.is_some()
}
676
/// Stores the auto-commit flag on the session.
pub fn set_auto_commit(&mut self, auto_commit: bool) {
    self.auto_commit = auto_commit;
}
681
/// Returns the session's current auto-commit flag.
#[must_use]
pub fn auto_commit(&self) -> bool {
    self.auto_commit
}
687
688 #[must_use]
694 fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
695 if let Some(tx_id) = self.current_tx {
696 let epoch = self
698 .tx_manager
699 .start_epoch(tx_id)
700 .unwrap_or_else(|| self.tx_manager.current_epoch());
701 (epoch, Some(tx_id))
702 } else {
703 (self.tx_manager.current_epoch(), None)
705 }
706 }
707
708 pub fn create_node(&self, labels: &[&str]) -> NodeId {
713 let (epoch, tx_id) = self.get_transaction_context();
714 self.store
715 .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
716 }
717
718 pub fn create_node_with_props<'a>(
722 &self,
723 labels: &[&str],
724 properties: impl IntoIterator<Item = (&'a str, Value)>,
725 ) -> NodeId {
726 let (epoch, tx_id) = self.get_transaction_context();
727 self.store.create_node_with_props_versioned(
728 labels,
729 properties.into_iter().map(|(k, v)| (k, v)),
730 epoch,
731 tx_id.unwrap_or(TxId::SYSTEM),
732 )
733 }
734
735 pub fn create_edge(
740 &self,
741 src: NodeId,
742 dst: NodeId,
743 edge_type: &str,
744 ) -> grafeo_common::types::EdgeId {
745 let (epoch, tx_id) = self.get_transaction_context();
746 self.store
747 .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
748 }
749
750 #[must_use]
776 pub fn get_node(&self, id: NodeId) -> Option<Node> {
777 let (epoch, tx_id) = self.get_transaction_context();
778 self.store
779 .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
780 }
781
782 #[must_use]
803 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
804 self.get_node(id)
805 .and_then(|node| node.get_property(key).cloned())
806 }
807
808 #[must_use]
815 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
816 let (epoch, tx_id) = self.get_transaction_context();
817 self.store
818 .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
819 }
820
821 #[must_use]
845 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
846 self.store.edges_from(node, Direction::Outgoing).collect()
847 }
848
849 #[must_use]
858 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
859 self.store.edges_from(node, Direction::Incoming).collect()
860 }
861
862 #[must_use]
870 pub fn get_neighbors_outgoing_by_type(
871 &self,
872 node: NodeId,
873 edge_type: &str,
874 ) -> Vec<(NodeId, EdgeId)> {
875 self.store
876 .edges_from(node, Direction::Outgoing)
877 .filter(|(_, edge_id)| {
878 self.get_edge(*edge_id)
879 .is_some_and(|e| e.edge_type.as_str() == edge_type)
880 })
881 .collect()
882 }
883
/// Returns `true` when `get_node` finds the node under the session's
/// transaction context.
#[must_use]
pub fn node_exists(&self, id: NodeId) -> bool {
    self.get_node(id).is_some()
}
894
/// Returns `true` when `get_edge` finds the edge under the session's
/// transaction context.
#[must_use]
pub fn edge_exists(&self, id: EdgeId) -> bool {
    self.get_edge(id).is_some()
}
900
901 #[must_use]
905 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
906 let out = self.store.out_degree(node);
907 let in_degree = self.store.in_degree(node);
908 (out, in_degree)
909 }
910
911 #[must_use]
921 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
922 let (epoch, tx_id) = self.get_transaction_context();
923 let tx = tx_id.unwrap_or(TxId::SYSTEM);
924 ids.iter()
925 .map(|&id| self.store.get_node_versioned(id, epoch, tx))
926 .collect()
927 }
928}
929
930#[cfg(test)]
931mod tests {
932 use crate::database::GrafeoDB;
933
/// A node created through a session is valid and counted by the database.
#[test]
fn test_session_create_node() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let person = session.create_node(&["Person"]);

    assert!(person.is_valid());
    assert_eq!(db.node_count(), 1);
}
943
/// `in_transaction` follows the begin/commit lifecycle.
#[test]
fn test_session_transaction() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Idle -> active.
    assert!(!session.in_transaction());
    session.begin_tx().unwrap();
    assert!(session.in_transaction());

    // Active -> idle.
    session.commit().unwrap();
    assert!(!session.in_transaction());
}
957
/// The transaction context carries a tx id only while a transaction is open,
/// and the epoch seen after commit is not older than the one seen inside.
#[test]
fn test_session_transaction_context() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    let (_, tx_before) = session.get_transaction_context();
    assert!(tx_before.is_none());

    session.begin_tx().unwrap();
    let (epoch_inside, tx_inside) = session.get_transaction_context();
    assert!(tx_inside.is_some());

    session.commit().unwrap();
    let (epoch_after, tx_after) = session.get_transaction_context();
    assert!(tx_after.is_none());
    assert!(epoch_after.as_u64() >= epoch_inside.as_u64());
}
981
/// Rolling back closes the active transaction.
#[test]
fn test_session_rollback() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    session.begin_tx().unwrap();
    session.rollback().unwrap();

    assert!(!session.in_transaction());
}
991
/// Versions written directly to the store under a session's transaction id
/// disappear on rollback, while data committed beforehand survives.
#[test]
fn test_session_rollback_discards_versions() {
    use grafeo_common::types::TxId;

    let db = GrafeoDB::new_in_memory();

    // Baseline: one node that exists outside any transaction.
    let committed = db.store().create_node(&["Person"]);
    assert!(committed.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_tx().unwrap();
    let tx_id = session.current_tx.unwrap();

    // Write a second node under the session's transaction id.
    let write_epoch = db.store().current_epoch();
    let pending = db
        .store()
        .create_node_versioned(&["Person"], write_epoch, tx_id);
    assert!(pending.is_valid());
    assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");

    session.rollback().unwrap();
    assert!(!session.in_transaction());

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard uncommitted node, but got {count_after}"
    );

    let current_epoch = db.store().current_epoch();

    let committed_lookup = db
        .store()
        .get_node_versioned(committed, current_epoch, TxId::SYSTEM);
    assert!(committed_lookup.is_some(), "Original node should still exist");

    let pending_lookup = db
        .store()
        .get_node_versioned(pending, current_epoch, TxId::SYSTEM);
    assert!(pending_lookup.is_none(), "Transaction node should be gone");
}
1045
/// A node created via `Session::create_node` inside a transaction is removed
/// again when the transaction is rolled back.
#[test]
fn test_session_create_node_in_transaction() {
    let db = GrafeoDB::new_in_memory();

    let baseline = db.create_node(&["Person"]);
    assert!(baseline.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_tx().unwrap();

    let pending = session.create_node(&["Person"]);
    assert!(pending.is_valid());
    assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");

    session.rollback().unwrap();

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node(), but got {count_after}"
    );
}
1077
/// A node created via `Session::create_node_with_props` inside a transaction
/// is removed again when the transaction is rolled back.
#[test]
fn test_session_create_node_with_props_in_transaction() {
    use grafeo_common::types::Value;

    let db = GrafeoDB::new_in_memory();

    db.create_node(&["Person"]);
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_tx().unwrap();

    let props = [("name", Value::String("Alice".into()))];
    let pending = session.create_node_with_props(&["Person"], props);
    assert!(pending.is_valid());
    assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");

    session.rollback().unwrap();

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node_with_props()"
    );
}
1110
/// End-to-end tests for GQL execution through `Session::execute`.
#[cfg(feature = "gql")]
mod gql_tests {
    use super::*;
    use grafeo_common::types::Value;

    /// A label scan returns only nodes carrying the requested label.
    #[test]
    fn test_gql_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for label in ["Person", "Person", "Animal"] {
            session.create_node(&[label]);
        }

        let result = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.column_count(), 1);
        assert_eq!(result.columns[0], "n");
    }

    /// Matching against an empty database yields no rows.
    #[test]
    fn test_gql_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let result = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(result.row_count(), 0);
    }

    /// A syntactically broken query is reported as an error.
    #[test]
    fn test_gql_parse_error() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute("MATCH (n RETURN n");

        assert!(outcome.is_err());
    }

    /// A single-hop pattern returns one row per matching edge.
    #[test]
    fn test_gql_relationship_traversal() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alice = session.create_node(&["Person"]);
        let bob = session.create_node(&["Person"]);
        let charlie = session.create_node(&["Person"]);

        for friend in [bob, charlie] {
            session.create_edge(alice, friend, "KNOWS");
        }

        let result = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.column_count(), 2);
        assert_eq!(result.columns[0], "a");
        assert_eq!(result.columns[1], "b");
    }

    /// The edge type in the pattern filters out edges of other types.
    #[test]
    fn test_gql_relationship_with_type_filter() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alice = session.create_node(&["Person"]);
        let bob = session.create_node(&["Person"]);
        let charlie = session.create_node(&["Person"]);

        session.create_edge(alice, bob, "KNOWS");
        session.create_edge(alice, charlie, "WORKS_WITH");

        let result = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    /// Returning a variable that was never bound is a semantic error.
    #[test]
    fn test_gql_semantic_error_undefined_variable() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute("MATCH (n:Person) RETURN x");
        assert!(outcome.is_err());

        let Err(err) = outcome else {
            panic!("Expected error")
        };
        let rendered = err.to_string();
        assert!(
            rendered.contains("Undefined variable"),
            "Expected undefined variable error, got: {}",
            err
        );
    }

    /// A numeric comparison in `WHERE` keeps only the matching nodes.
    #[test]
    fn test_gql_where_clause_property_filter() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for age in [25, 35, 45] {
            session.create_node_with_props(&["Person"], [("age", Value::Int64(age))]);
        }

        let result = session
            .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
            .unwrap();

        assert_eq!(result.row_count(), 2);
    }

    /// A string equality in `WHERE` keeps only the matching nodes.
    #[test]
    fn test_gql_where_clause_equality() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for name in ["Alice", "Bob", "Alice"] {
            session.create_node_with_props(&["Person"], [("name", Value::String(name.into()))]);
        }

        let result = session
            .execute("MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n")
            .unwrap();

        assert_eq!(result.row_count(), 2);
    }

    /// Property accesses in `RETURN` become columns named after the expression.
    #[test]
    fn test_gql_return_property_access() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for (name, age) in [("Alice", 30), ("Bob", 25)] {
            session.create_node_with_props(
                &["Person"],
                [
                    ("name", Value::String(name.into())),
                    ("age", Value::Int64(age)),
                ],
            );
        }

        let result = session
            .execute("MATCH (n:Person) RETURN n.name, n.age")
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.column_count(), 2);
        assert_eq!(result.columns[0], "n.name");
        assert_eq!(result.columns[1], "n.age");

        // Row order is not asserted; only that both names appear in column 0.
        let has_name = |wanted: &str| {
            result
                .rows
                .iter()
                .any(|row| row[0] == Value::String(wanted.into()))
        };
        assert!(has_name("Alice"));
        assert!(has_name("Bob"));
    }

    /// A whole-node column and a property column can be returned together.
    #[test]
    fn test_gql_return_mixed_expressions() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);

        let result = session
            .execute("MATCH (n:Person) RETURN n, n.name")
            .unwrap();

        assert_eq!(result.row_count(), 1);
        assert_eq!(result.column_count(), 2);
        assert_eq!(result.columns[0], "n");
        assert_eq!(result.columns[1], "n.name");

        assert_eq!(result.rows[0][1], Value::String("Alice".into()));
    }
}
1329
/// End-to-end tests for Cypher execution through `Session::execute_cypher`.
#[cfg(feature = "cypher")]
mod cypher_tests {
    use super::*;

    /// A label scan returns only nodes carrying the requested label.
    #[test]
    fn test_cypher_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for label in ["Person", "Person", "Animal"] {
            session.create_node(&[label]);
        }

        let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.column_count(), 1);
        assert_eq!(result.columns[0], "n");
    }

    /// Matching against an empty database yields no rows.
    #[test]
    fn test_cypher_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(result.row_count(), 0);
    }

    /// A syntactically broken query is reported as an error.
    #[test]
    fn test_cypher_parse_error() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute_cypher("MATCH (n RETURN n");

        assert!(outcome.is_err());
    }
}
1375
1376 mod direct_lookup_tests {
1379 use super::*;
1380 use grafeo_common::types::Value;
1381
1382 #[test]
1383 fn test_get_node() {
1384 let db = GrafeoDB::new_in_memory();
1385 let session = db.session();
1386
1387 let id = session.create_node(&["Person"]);
1388 let node = session.get_node(id);
1389
1390 assert!(node.is_some());
1391 let node = node.unwrap();
1392 assert_eq!(node.id, id);
1393 }
1394
1395 #[test]
1396 fn test_get_node_not_found() {
1397 use grafeo_common::types::NodeId;
1398
1399 let db = GrafeoDB::new_in_memory();
1400 let session = db.session();
1401
1402 let node = session.get_node(NodeId::new(9999));
1404 assert!(node.is_none());
1405 }
1406
1407 #[test]
1408 fn test_get_node_property() {
1409 let db = GrafeoDB::new_in_memory();
1410 let session = db.session();
1411
1412 let id = session
1413 .create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1414
1415 let name = session.get_node_property(id, "name");
1416 assert_eq!(name, Some(Value::String("Alice".into())));
1417
1418 let missing = session.get_node_property(id, "missing");
1420 assert!(missing.is_none());
1421 }
1422
1423 #[test]
1424 fn test_get_edge() {
1425 let db = GrafeoDB::new_in_memory();
1426 let session = db.session();
1427
1428 let alice = session.create_node(&["Person"]);
1429 let bob = session.create_node(&["Person"]);
1430 let edge_id = session.create_edge(alice, bob, "KNOWS");
1431
1432 let edge = session.get_edge(edge_id);
1433 assert!(edge.is_some());
1434 let edge = edge.unwrap();
1435 assert_eq!(edge.id, edge_id);
1436 assert_eq!(edge.src, alice);
1437 assert_eq!(edge.dst, bob);
1438 }
1439
1440 #[test]
1441 fn test_get_edge_not_found() {
1442 use grafeo_common::types::EdgeId;
1443
1444 let db = GrafeoDB::new_in_memory();
1445 let session = db.session();
1446
1447 let edge = session.get_edge(EdgeId::new(9999));
1448 assert!(edge.is_none());
1449 }
1450
1451 #[test]
1452 fn test_get_neighbors_outgoing() {
1453 let db = GrafeoDB::new_in_memory();
1454 let session = db.session();
1455
1456 let alice = session.create_node(&["Person"]);
1457 let bob = session.create_node(&["Person"]);
1458 let carol = session.create_node(&["Person"]);
1459
1460 session.create_edge(alice, bob, "KNOWS");
1461 session.create_edge(alice, carol, "KNOWS");
1462
1463 let neighbors = session.get_neighbors_outgoing(alice);
1464 assert_eq!(neighbors.len(), 2);
1465
1466 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1467 assert!(neighbor_ids.contains(&bob));
1468 assert!(neighbor_ids.contains(&carol));
1469 }
1470
1471 #[test]
1472 fn test_get_neighbors_incoming() {
1473 let db = GrafeoDB::new_in_memory();
1474 let session = db.session();
1475
1476 let alice = session.create_node(&["Person"]);
1477 let bob = session.create_node(&["Person"]);
1478 let carol = session.create_node(&["Person"]);
1479
1480 session.create_edge(bob, alice, "KNOWS");
1481 session.create_edge(carol, alice, "KNOWS");
1482
1483 let neighbors = session.get_neighbors_incoming(alice);
1484 assert_eq!(neighbors.len(), 2);
1485
1486 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1487 assert!(neighbor_ids.contains(&bob));
1488 assert!(neighbor_ids.contains(&carol));
1489 }
1490
1491 #[test]
1492 fn test_get_neighbors_outgoing_by_type() {
1493 let db = GrafeoDB::new_in_memory();
1494 let session = db.session();
1495
1496 let alice = session.create_node(&["Person"]);
1497 let bob = session.create_node(&["Person"]);
1498 let company = session.create_node(&["Company"]);
1499
1500 session.create_edge(alice, bob, "KNOWS");
1501 session.create_edge(alice, company, "WORKS_AT");
1502
1503 let knows_neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1504 assert_eq!(knows_neighbors.len(), 1);
1505 assert_eq!(knows_neighbors[0].0, bob);
1506
1507 let works_neighbors = session.get_neighbors_outgoing_by_type(alice, "WORKS_AT");
1508 assert_eq!(works_neighbors.len(), 1);
1509 assert_eq!(works_neighbors[0].0, company);
1510
1511 let no_neighbors = session.get_neighbors_outgoing_by_type(alice, "LIKES");
1513 assert!(no_neighbors.is_empty());
1514 }
1515
1516 #[test]
1517 fn test_node_exists() {
1518 use grafeo_common::types::NodeId;
1519
1520 let db = GrafeoDB::new_in_memory();
1521 let session = db.session();
1522
1523 let id = session.create_node(&["Person"]);
1524
1525 assert!(session.node_exists(id));
1526 assert!(!session.node_exists(NodeId::new(9999)));
1527 }
1528
1529 #[test]
1530 fn test_edge_exists() {
1531 use grafeo_common::types::EdgeId;
1532
1533 let db = GrafeoDB::new_in_memory();
1534 let session = db.session();
1535
1536 let alice = session.create_node(&["Person"]);
1537 let bob = session.create_node(&["Person"]);
1538 let edge_id = session.create_edge(alice, bob, "KNOWS");
1539
1540 assert!(session.edge_exists(edge_id));
1541 assert!(!session.edge_exists(EdgeId::new(9999)));
1542 }
1543
1544 #[test]
1545 fn test_get_degree() {
1546 let db = GrafeoDB::new_in_memory();
1547 let session = db.session();
1548
1549 let alice = session.create_node(&["Person"]);
1550 let bob = session.create_node(&["Person"]);
1551 let carol = session.create_node(&["Person"]);
1552
1553 session.create_edge(alice, bob, "KNOWS");
1555 session.create_edge(alice, carol, "KNOWS");
1556 session.create_edge(bob, alice, "KNOWS");
1558
1559 let (out_degree, in_degree) = session.get_degree(alice);
1560 assert_eq!(out_degree, 2);
1561 assert_eq!(in_degree, 1);
1562
1563 let lonely = session.create_node(&["Person"]);
1565 let (out, in_deg) = session.get_degree(lonely);
1566 assert_eq!(out, 0);
1567 assert_eq!(in_deg, 0);
1568 }
1569
1570 #[test]
1571 fn test_get_nodes_batch() {
1572 let db = GrafeoDB::new_in_memory();
1573 let session = db.session();
1574
1575 let alice = session.create_node(&["Person"]);
1576 let bob = session.create_node(&["Person"]);
1577 let carol = session.create_node(&["Person"]);
1578
1579 let nodes = session.get_nodes_batch(&[alice, bob, carol]);
1580 assert_eq!(nodes.len(), 3);
1581 assert!(nodes[0].is_some());
1582 assert!(nodes[1].is_some());
1583 assert!(nodes[2].is_some());
1584
1585 use grafeo_common::types::NodeId;
1587 let nodes_with_missing = session.get_nodes_batch(&[alice, NodeId::new(9999), carol]);
1588 assert_eq!(nodes_with_missing.len(), 3);
1589 assert!(nodes_with_missing[0].is_some());
1590 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
1592 }
1593
1594 #[test]
1595 fn test_auto_commit_setting() {
1596 let db = GrafeoDB::new_in_memory();
1597 let mut session = db.session();
1598
1599 assert!(session.auto_commit());
1601
1602 session.set_auto_commit(false);
1603 assert!(!session.auto_commit());
1604
1605 session.set_auto_commit(true);
1606 assert!(session.auto_commit());
1607 }
1608
1609 #[test]
1610 fn test_transaction_double_begin_error() {
1611 let db = GrafeoDB::new_in_memory();
1612 let mut session = db.session();
1613
1614 session.begin_tx().unwrap();
1615 let result = session.begin_tx();
1616
1617 assert!(result.is_err());
1618 session.rollback().unwrap();
1620 }
1621
1622 #[test]
1623 fn test_commit_without_transaction_error() {
1624 let db = GrafeoDB::new_in_memory();
1625 let mut session = db.session();
1626
1627 let result = session.commit();
1628 assert!(result.is_err());
1629 }
1630
1631 #[test]
1632 fn test_rollback_without_transaction_error() {
1633 let db = GrafeoDB::new_in_memory();
1634 let mut session = db.session();
1635
1636 let result = session.rollback();
1637 assert!(result.is_err());
1638 }
1639
1640 #[test]
1641 fn test_create_edge_in_transaction() {
1642 let db = GrafeoDB::new_in_memory();
1643 let mut session = db.session();
1644
1645 let alice = session.create_node(&["Person"]);
1647 let bob = session.create_node(&["Person"]);
1648
1649 session.begin_tx().unwrap();
1651 let edge_id = session.create_edge(alice, bob, "KNOWS");
1652
1653 assert!(session.edge_exists(edge_id));
1655
1656 session.commit().unwrap();
1658
1659 assert!(session.edge_exists(edge_id));
1661 }
1662
1663 #[test]
1664 fn test_neighbors_empty_node() {
1665 let db = GrafeoDB::new_in_memory();
1666 let session = db.session();
1667
1668 let lonely = session.create_node(&["Person"]);
1669
1670 assert!(session.get_neighbors_outgoing(lonely).is_empty());
1671 assert!(session.get_neighbors_incoming(lonely).is_empty());
1672 assert!(
1673 session
1674 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
1675 .is_empty()
1676 );
1677 }
1678 }
1679}