1use std::sync::Arc;
8
9use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
10use grafeo_common::utils::error::Result;
11use grafeo_core::graph::Direction;
12use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
13#[cfg(feature = "rdf")]
14use grafeo_core::graph::rdf::RdfStore;
15
16use crate::config::{AdaptiveConfig, GraphModel};
17use crate::database::QueryResult;
18use crate::query::cache::QueryCache;
19use crate::transaction::TransactionManager;
20
/// A handle for running queries and direct graph operations against shared
/// stores, carrying its own transaction state and execution settings.
pub struct Session {
    /// Shared labeled-property-graph store used by queries and direct lookups.
    store: Arc<LpgStore>,
    /// Shared RDF store; only present when the `rdf` feature is enabled.
    #[cfg(feature = "rdf")]
    #[allow(dead_code)]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager used to begin, commit and abort transactions and
    /// to look up epochs.
    tx_manager: Arc<TransactionManager>,
    /// Cache of optimized plans consulted by the cached `execute*` methods.
    query_cache: Arc<QueryCache>,
    /// Id of the transaction opened by this session, or `None` when no
    /// transaction is active.
    current_tx: Option<TxId>,
    /// Auto-commit flag; the code in this file only stores and returns it.
    auto_commit: bool,
    /// Adaptive execution settings; stored but not read in this file.
    #[allow(dead_code)]
    adaptive_config: AdaptiveConfig,
    /// Forwarded to `Planner::with_factorized_execution` when planning.
    factorized_execution: bool,
    /// Graph model of the database; `require_lpg` rejects LPG queries when
    /// this is `GraphModel::Rdf`.
    graph_model: GraphModel,
}
49
50impl Session {
51 #[allow(dead_code)]
53 pub(crate) fn new(
54 store: Arc<LpgStore>,
55 tx_manager: Arc<TransactionManager>,
56 query_cache: Arc<QueryCache>,
57 ) -> Self {
58 Self {
59 store,
60 #[cfg(feature = "rdf")]
61 rdf_store: Arc::new(RdfStore::new()),
62 tx_manager,
63 query_cache,
64 current_tx: None,
65 auto_commit: true,
66 adaptive_config: AdaptiveConfig::default(),
67 factorized_execution: true,
68 graph_model: GraphModel::Lpg,
69 }
70 }
71
72 #[allow(dead_code)]
74 pub(crate) fn with_adaptive(
75 store: Arc<LpgStore>,
76 tx_manager: Arc<TransactionManager>,
77 query_cache: Arc<QueryCache>,
78 adaptive_config: AdaptiveConfig,
79 factorized_execution: bool,
80 graph_model: GraphModel,
81 ) -> Self {
82 Self {
83 store,
84 #[cfg(feature = "rdf")]
85 rdf_store: Arc::new(RdfStore::new()),
86 tx_manager,
87 query_cache,
88 current_tx: None,
89 auto_commit: true,
90 adaptive_config,
91 factorized_execution,
92 graph_model,
93 }
94 }
95
96 #[cfg(feature = "rdf")]
98 pub(crate) fn with_rdf_store_and_adaptive(
99 store: Arc<LpgStore>,
100 rdf_store: Arc<RdfStore>,
101 tx_manager: Arc<TransactionManager>,
102 query_cache: Arc<QueryCache>,
103 adaptive_config: AdaptiveConfig,
104 factorized_execution: bool,
105 graph_model: GraphModel,
106 ) -> Self {
107 Self {
108 store,
109 rdf_store,
110 tx_manager,
111 query_cache,
112 current_tx: None,
113 auto_commit: true,
114 adaptive_config,
115 factorized_execution,
116 graph_model,
117 }
118 }
119
    /// Returns the graph model this session was created with.
    #[must_use]
    pub fn graph_model(&self) -> GraphModel {
        self.graph_model
    }
125
126 fn require_lpg(&self, language: &str) -> Result<()> {
128 if self.graph_model == GraphModel::Rdf {
129 return Err(grafeo_common::utils::error::Error::Internal(format!(
130 "This is an RDF database. {language} queries require an LPG database."
131 )));
132 }
133 Ok(())
134 }
135
136 #[cfg(feature = "gql")]
160 pub fn execute(&self, query: &str) -> Result<QueryResult> {
161 self.require_lpg("GQL")?;
162
163 use crate::query::{
164 Executor, Planner, binder::Binder, cache::CacheKey, gql_translator,
165 optimizer::Optimizer, processor::QueryLanguage,
166 };
167
168 let start_time = std::time::Instant::now();
169
170 let cache_key = CacheKey::new(query, QueryLanguage::Gql);
172
173 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
175 cached_plan
177 } else {
178 let logical_plan = gql_translator::translate(query)?;
182
183 let mut binder = Binder::new();
185 let _binding_context = binder.bind(&logical_plan)?;
186
187 let optimizer = Optimizer::from_store(&self.store);
189 let plan = optimizer.optimize(logical_plan)?;
190
191 self.query_cache.put_optimized(cache_key, plan.clone());
193
194 plan
195 };
196
197 let (viewing_epoch, tx_id) = self.get_transaction_context();
199
200 let planner = Planner::with_context(
203 Arc::clone(&self.store),
204 Arc::clone(&self.tx_manager),
205 tx_id,
206 viewing_epoch,
207 )
208 .with_factorized_execution(self.factorized_execution);
209 let mut physical_plan = planner.plan(&optimized_plan)?;
210
211 let executor = Executor::with_columns(physical_plan.columns.clone());
213 let mut result = executor.execute(physical_plan.operator.as_mut())?;
214
215 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
217 let rows_scanned = result.rows.len() as u64;
218 result.execution_time_ms = Some(elapsed_ms);
219 result.rows_scanned = Some(rows_scanned);
220
221 Ok(result)
222 }
223
224 #[cfg(feature = "gql")]
230 pub fn execute_with_params(
231 &self,
232 query: &str,
233 params: std::collections::HashMap<String, Value>,
234 ) -> Result<QueryResult> {
235 self.require_lpg("GQL")?;
236
237 use crate::query::processor::{QueryLanguage, QueryProcessor};
238
239 let (viewing_epoch, tx_id) = self.get_transaction_context();
241
242 let processor =
244 QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
245
246 let processor = if let Some(tx_id) = tx_id {
248 processor.with_tx_context(viewing_epoch, tx_id)
249 } else {
250 processor
251 };
252
253 processor.process(query, QueryLanguage::Gql, Some(¶ms))
254 }
255
256 #[cfg(not(any(feature = "gql", feature = "cypher")))]
262 pub fn execute_with_params(
263 &self,
264 _query: &str,
265 _params: std::collections::HashMap<String, Value>,
266 ) -> Result<QueryResult> {
267 Err(grafeo_common::utils::error::Error::Internal(
268 "No query language enabled".to_string(),
269 ))
270 }
271
272 #[cfg(not(any(feature = "gql", feature = "cypher")))]
278 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
279 Err(grafeo_common::utils::error::Error::Internal(
280 "No query language enabled".to_string(),
281 ))
282 }
283
284 #[cfg(feature = "cypher")]
290 pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
291 use crate::query::{
292 Executor, Planner, binder::Binder, cache::CacheKey, cypher_translator,
293 optimizer::Optimizer, processor::QueryLanguage,
294 };
295
296 let cache_key = CacheKey::new(query, QueryLanguage::Cypher);
298
299 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
301 cached_plan
302 } else {
303 let logical_plan = cypher_translator::translate(query)?;
305
306 let mut binder = Binder::new();
308 let _binding_context = binder.bind(&logical_plan)?;
309
310 let optimizer = Optimizer::from_store(&self.store);
312 let plan = optimizer.optimize(logical_plan)?;
313
314 self.query_cache.put_optimized(cache_key, plan.clone());
316
317 plan
318 };
319
320 let (viewing_epoch, tx_id) = self.get_transaction_context();
322
323 let planner = Planner::with_context(
325 Arc::clone(&self.store),
326 Arc::clone(&self.tx_manager),
327 tx_id,
328 viewing_epoch,
329 )
330 .with_factorized_execution(self.factorized_execution);
331 let mut physical_plan = planner.plan(&optimized_plan)?;
332
333 let executor = Executor::with_columns(physical_plan.columns.clone());
335 executor.execute(physical_plan.operator.as_mut())
336 }
337
338 #[cfg(feature = "gremlin")]
359 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
360 use crate::query::{
361 Executor, Planner, binder::Binder, gremlin_translator, optimizer::Optimizer,
362 };
363
364 let logical_plan = gremlin_translator::translate(query)?;
366
367 let mut binder = Binder::new();
369 let _binding_context = binder.bind(&logical_plan)?;
370
371 let optimizer = Optimizer::from_store(&self.store);
373 let optimized_plan = optimizer.optimize(logical_plan)?;
374
375 let (viewing_epoch, tx_id) = self.get_transaction_context();
377
378 let planner = Planner::with_context(
380 Arc::clone(&self.store),
381 Arc::clone(&self.tx_manager),
382 tx_id,
383 viewing_epoch,
384 )
385 .with_factorized_execution(self.factorized_execution);
386 let mut physical_plan = planner.plan(&optimized_plan)?;
387
388 let executor = Executor::with_columns(physical_plan.columns.clone());
390 executor.execute(physical_plan.operator.as_mut())
391 }
392
393 #[cfg(feature = "gremlin")]
399 pub fn execute_gremlin_with_params(
400 &self,
401 query: &str,
402 params: std::collections::HashMap<String, Value>,
403 ) -> Result<QueryResult> {
404 use crate::query::processor::{QueryLanguage, QueryProcessor};
405
406 let (viewing_epoch, tx_id) = self.get_transaction_context();
408
409 let processor =
411 QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
412
413 let processor = if let Some(tx_id) = tx_id {
415 processor.with_tx_context(viewing_epoch, tx_id)
416 } else {
417 processor
418 };
419
420 processor.process(query, QueryLanguage::Gremlin, Some(¶ms))
421 }
422
423 #[cfg(feature = "graphql")]
444 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
445 use crate::query::{
446 Executor, Planner, binder::Binder, graphql_translator, optimizer::Optimizer,
447 };
448
449 let logical_plan = graphql_translator::translate(query)?;
451
452 let mut binder = Binder::new();
454 let _binding_context = binder.bind(&logical_plan)?;
455
456 let optimizer = Optimizer::from_store(&self.store);
458 let optimized_plan = optimizer.optimize(logical_plan)?;
459
460 let (viewing_epoch, tx_id) = self.get_transaction_context();
462
463 let planner = Planner::with_context(
465 Arc::clone(&self.store),
466 Arc::clone(&self.tx_manager),
467 tx_id,
468 viewing_epoch,
469 )
470 .with_factorized_execution(self.factorized_execution);
471 let mut physical_plan = planner.plan(&optimized_plan)?;
472
473 let executor = Executor::with_columns(physical_plan.columns.clone());
475 executor.execute(physical_plan.operator.as_mut())
476 }
477
478 #[cfg(feature = "graphql")]
484 pub fn execute_graphql_with_params(
485 &self,
486 query: &str,
487 params: std::collections::HashMap<String, Value>,
488 ) -> Result<QueryResult> {
489 use crate::query::processor::{QueryLanguage, QueryProcessor};
490
491 let (viewing_epoch, tx_id) = self.get_transaction_context();
493
494 let processor =
496 QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
497
498 let processor = if let Some(tx_id) = tx_id {
500 processor.with_tx_context(viewing_epoch, tx_id)
501 } else {
502 processor
503 };
504
505 processor.process(query, QueryLanguage::GraphQL, Some(¶ms))
506 }
507
508 #[cfg(feature = "sql-pgq")]
530 pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
531 use crate::query::{
532 Executor, Planner, binder::Binder, cache::CacheKey, optimizer::Optimizer,
533 plan::LogicalOperator, processor::QueryLanguage, sql_pgq_translator,
534 };
535
536 let logical_plan = sql_pgq_translator::translate(query)?;
538
539 if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
541 return Ok(QueryResult {
542 columns: vec!["status".into()],
543 column_types: vec![grafeo_common::types::LogicalType::String],
544 rows: vec![vec![Value::from(format!(
545 "Property graph '{}' created ({} node tables, {} edge tables)",
546 cpg.name,
547 cpg.node_tables.len(),
548 cpg.edge_tables.len()
549 ))]],
550 execution_time_ms: None,
551 rows_scanned: None,
552 });
553 }
554
555 let cache_key = CacheKey::new(query, QueryLanguage::SqlPgq);
557
558 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
560 cached_plan
561 } else {
562 let mut binder = Binder::new();
564 let _binding_context = binder.bind(&logical_plan)?;
565
566 let optimizer = Optimizer::from_store(&self.store);
568 let plan = optimizer.optimize(logical_plan)?;
569
570 self.query_cache.put_optimized(cache_key, plan.clone());
572
573 plan
574 };
575
576 let (viewing_epoch, tx_id) = self.get_transaction_context();
578
579 let planner = Planner::with_context(
581 Arc::clone(&self.store),
582 Arc::clone(&self.tx_manager),
583 tx_id,
584 viewing_epoch,
585 )
586 .with_factorized_execution(self.factorized_execution);
587 let mut physical_plan = planner.plan(&optimized_plan)?;
588
589 let executor = Executor::with_columns(physical_plan.columns.clone());
591 executor.execute(physical_plan.operator.as_mut())
592 }
593
594 #[cfg(feature = "sql-pgq")]
600 pub fn execute_sql_with_params(
601 &self,
602 query: &str,
603 params: std::collections::HashMap<String, Value>,
604 ) -> Result<QueryResult> {
605 use crate::query::processor::{QueryLanguage, QueryProcessor};
606
607 let (viewing_epoch, tx_id) = self.get_transaction_context();
609
610 let processor =
612 QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
613
614 let processor = if let Some(tx_id) = tx_id {
616 processor.with_tx_context(viewing_epoch, tx_id)
617 } else {
618 processor
619 };
620
621 processor.process(query, QueryLanguage::SqlPgq, Some(¶ms))
622 }
623
624 #[cfg(all(feature = "sparql", feature = "rdf"))]
630 pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
631 use crate::query::{
632 Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
633 };
634
635 let logical_plan = sparql_translator::translate(query)?;
637
638 let optimizer = Optimizer::from_store(&self.store);
640 let optimized_plan = optimizer.optimize(logical_plan)?;
641
642 let planner = RdfPlanner::new(Arc::clone(&self.rdf_store)).with_tx_id(self.current_tx);
644 let mut physical_plan = planner.plan(&optimized_plan)?;
645
646 let executor = Executor::with_columns(physical_plan.columns.clone());
648 executor.execute(physical_plan.operator.as_mut())
649 }
650
    /// Executes a SPARQL query; the supplied parameters are currently ignored.
    ///
    /// This simply forwards `query` to `execute_sparql`, so no parameter
    /// substitution takes place.
    ///
    /// # Errors
    ///
    /// Returns whatever error `execute_sparql` returns.
    #[cfg(all(feature = "sparql", feature = "rdf"))]
    pub fn execute_sparql_with_params(
        &self,
        query: &str,
        _params: std::collections::HashMap<String, Value>,
    ) -> Result<QueryResult> {
        self.execute_sparql(query)
    }
666
667 pub fn begin_tx(&mut self) -> Result<()> {
687 if self.current_tx.is_some() {
688 return Err(grafeo_common::utils::error::Error::Transaction(
689 grafeo_common::utils::error::TransactionError::InvalidState(
690 "Transaction already active".to_string(),
691 ),
692 ));
693 }
694
695 let tx_id = self.tx_manager.begin();
696 self.current_tx = Some(tx_id);
697 Ok(())
698 }
699
700 pub fn begin_tx_with_isolation(
708 &mut self,
709 isolation_level: crate::transaction::IsolationLevel,
710 ) -> Result<()> {
711 if self.current_tx.is_some() {
712 return Err(grafeo_common::utils::error::Error::Transaction(
713 grafeo_common::utils::error::TransactionError::InvalidState(
714 "Transaction already active".to_string(),
715 ),
716 ));
717 }
718
719 let tx_id = self.tx_manager.begin_with_isolation(isolation_level);
720 self.current_tx = Some(tx_id);
721 Ok(())
722 }
723
724 pub fn commit(&mut self) -> Result<()> {
732 let tx_id = self.current_tx.take().ok_or_else(|| {
733 grafeo_common::utils::error::Error::Transaction(
734 grafeo_common::utils::error::TransactionError::InvalidState(
735 "No active transaction".to_string(),
736 ),
737 )
738 })?;
739
740 #[cfg(feature = "rdf")]
742 self.rdf_store.commit_tx(tx_id);
743
744 self.tx_manager.commit(tx_id).map(|_| ())
745 }
746
747 pub fn rollback(&mut self) -> Result<()> {
768 let tx_id = self.current_tx.take().ok_or_else(|| {
769 grafeo_common::utils::error::Error::Transaction(
770 grafeo_common::utils::error::TransactionError::InvalidState(
771 "No active transaction".to_string(),
772 ),
773 )
774 })?;
775
776 self.store.discard_uncommitted_versions(tx_id);
778
779 #[cfg(feature = "rdf")]
781 self.rdf_store.rollback_tx(tx_id);
782
783 self.tx_manager.abort(tx_id)
785 }
786
    /// Returns `true` while this session has an active transaction, i.e.
    /// between a successful `begin_tx*` and the next `commit` or `rollback`.
    #[must_use]
    pub fn in_transaction(&self) -> bool {
        self.current_tx.is_some()
    }
792
    /// Sets the session's auto-commit flag.
    ///
    /// The code in this file only stores the value; it can be read back with
    /// `auto_commit`.
    pub fn set_auto_commit(&mut self, auto_commit: bool) {
        self.auto_commit = auto_commit;
    }
797
    /// Returns the session's auto-commit flag (`true` for a new session).
    #[must_use]
    pub fn auto_commit(&self) -> bool {
        self.auto_commit
    }
803
804 #[must_use]
810 fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
811 if let Some(tx_id) = self.current_tx {
812 let epoch = self
814 .tx_manager
815 .start_epoch(tx_id)
816 .unwrap_or_else(|| self.tx_manager.current_epoch());
817 (epoch, Some(tx_id))
818 } else {
819 (self.tx_manager.current_epoch(), None)
821 }
822 }
823
824 pub fn create_node(&self, labels: &[&str]) -> NodeId {
829 let (epoch, tx_id) = self.get_transaction_context();
830 self.store
831 .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
832 }
833
834 pub fn create_node_with_props<'a>(
838 &self,
839 labels: &[&str],
840 properties: impl IntoIterator<Item = (&'a str, Value)>,
841 ) -> NodeId {
842 let (epoch, tx_id) = self.get_transaction_context();
843 self.store.create_node_with_props_versioned(
844 labels,
845 properties.into_iter().map(|(k, v)| (k, v)),
846 epoch,
847 tx_id.unwrap_or(TxId::SYSTEM),
848 )
849 }
850
851 pub fn create_edge(
856 &self,
857 src: NodeId,
858 dst: NodeId,
859 edge_type: &str,
860 ) -> grafeo_common::types::EdgeId {
861 let (epoch, tx_id) = self.get_transaction_context();
862 self.store
863 .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
864 }
865
866 #[must_use]
892 pub fn get_node(&self, id: NodeId) -> Option<Node> {
893 let (epoch, tx_id) = self.get_transaction_context();
894 self.store
895 .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
896 }
897
898 #[must_use]
919 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
920 self.get_node(id)
921 .and_then(|node| node.get_property(key).cloned())
922 }
923
924 #[must_use]
931 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
932 let (epoch, tx_id) = self.get_transaction_context();
933 self.store
934 .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
935 }
936
937 #[must_use]
961 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
962 self.store.edges_from(node, Direction::Outgoing).collect()
963 }
964
965 #[must_use]
974 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
975 self.store.edges_from(node, Direction::Incoming).collect()
976 }
977
978 #[must_use]
986 pub fn get_neighbors_outgoing_by_type(
987 &self,
988 node: NodeId,
989 edge_type: &str,
990 ) -> Vec<(NodeId, EdgeId)> {
991 self.store
992 .edges_from(node, Direction::Outgoing)
993 .filter(|(_, edge_id)| {
994 self.get_edge(*edge_id)
995 .is_some_and(|e| e.edge_type.as_str() == edge_type)
996 })
997 .collect()
998 }
999
    /// Returns `true` if `get_node` finds a node with this id, i.e. the node
    /// is visible in the session's transaction context.
    #[must_use]
    pub fn node_exists(&self, id: NodeId) -> bool {
        self.get_node(id).is_some()
    }
1010
    /// Returns `true` if `get_edge` finds an edge with this id, i.e. the edge
    /// is visible in the session's transaction context.
    #[must_use]
    pub fn edge_exists(&self, id: EdgeId) -> bool {
        self.get_edge(id).is_some()
    }
1016
1017 #[must_use]
1021 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
1022 let out = self.store.out_degree(node);
1023 let in_degree = self.store.in_degree(node);
1024 (out, in_degree)
1025 }
1026
1027 #[must_use]
1037 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
1038 let (epoch, tx_id) = self.get_transaction_context();
1039 let tx = tx_id.unwrap_or(TxId::SYSTEM);
1040 ids.iter()
1041 .map(|&id| self.store.get_node_versioned(id, epoch, tx))
1042 .collect()
1043 }
1044}
1045
1046#[cfg(test)]
1047mod tests {
1048 use crate::database::GrafeoDB;
1049
1050 #[test]
1051 fn test_session_create_node() {
1052 let db = GrafeoDB::new_in_memory();
1053 let session = db.session();
1054
1055 let id = session.create_node(&["Person"]);
1056 assert!(id.is_valid());
1057 assert_eq!(db.node_count(), 1);
1058 }
1059
1060 #[test]
1061 fn test_session_transaction() {
1062 let db = GrafeoDB::new_in_memory();
1063 let mut session = db.session();
1064
1065 assert!(!session.in_transaction());
1066
1067 session.begin_tx().unwrap();
1068 assert!(session.in_transaction());
1069
1070 session.commit().unwrap();
1071 assert!(!session.in_transaction());
1072 }
1073
1074 #[test]
1075 fn test_session_transaction_context() {
1076 let db = GrafeoDB::new_in_memory();
1077 let mut session = db.session();
1078
1079 let (_epoch1, tx_id1) = session.get_transaction_context();
1081 assert!(tx_id1.is_none());
1082
1083 session.begin_tx().unwrap();
1085 let (epoch2, tx_id2) = session.get_transaction_context();
1086 assert!(tx_id2.is_some());
1087 let _ = epoch2; session.commit().unwrap();
1092 let (epoch3, tx_id3) = session.get_transaction_context();
1093 assert!(tx_id3.is_none());
1094 assert!(epoch3.as_u64() >= epoch2.as_u64());
1096 }
1097
1098 #[test]
1099 fn test_session_rollback() {
1100 let db = GrafeoDB::new_in_memory();
1101 let mut session = db.session();
1102
1103 session.begin_tx().unwrap();
1104 session.rollback().unwrap();
1105 assert!(!session.in_transaction());
1106 }
1107
1108 #[test]
1109 fn test_session_rollback_discards_versions() {
1110 use grafeo_common::types::TxId;
1111
1112 let db = GrafeoDB::new_in_memory();
1113
1114 let node_before = db.store().create_node(&["Person"]);
1116 assert!(node_before.is_valid());
1117 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1118
1119 let mut session = db.session();
1121 session.begin_tx().unwrap();
1122 let tx_id = session.current_tx.unwrap();
1123
1124 let epoch = db.store().current_epoch();
1126 let node_in_tx = db.store().create_node_versioned(&["Person"], epoch, tx_id);
1127 assert!(node_in_tx.is_valid());
1128
1129 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1131
1132 session.rollback().unwrap();
1134 assert!(!session.in_transaction());
1135
1136 let count_after = db.node_count();
1139 assert_eq!(
1140 count_after, 1,
1141 "Rollback should discard uncommitted node, but got {count_after}"
1142 );
1143
1144 let current_epoch = db.store().current_epoch();
1146 assert!(
1147 db.store()
1148 .get_node_versioned(node_before, current_epoch, TxId::SYSTEM)
1149 .is_some(),
1150 "Original node should still exist"
1151 );
1152
1153 assert!(
1155 db.store()
1156 .get_node_versioned(node_in_tx, current_epoch, TxId::SYSTEM)
1157 .is_none(),
1158 "Transaction node should be gone"
1159 );
1160 }
1161
1162 #[test]
1163 fn test_session_create_node_in_transaction() {
1164 let db = GrafeoDB::new_in_memory();
1166
1167 let node_before = db.create_node(&["Person"]);
1169 assert!(node_before.is_valid());
1170 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1171
1172 let mut session = db.session();
1174 session.begin_tx().unwrap();
1175
1176 let node_in_tx = session.create_node(&["Person"]);
1178 assert!(node_in_tx.is_valid());
1179
1180 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1182
1183 session.rollback().unwrap();
1185
1186 let count_after = db.node_count();
1188 assert_eq!(
1189 count_after, 1,
1190 "Rollback should discard node created via session.create_node(), but got {count_after}"
1191 );
1192 }
1193
1194 #[test]
1195 fn test_session_create_node_with_props_in_transaction() {
1196 use grafeo_common::types::Value;
1197
1198 let db = GrafeoDB::new_in_memory();
1200
1201 db.create_node(&["Person"]);
1203 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1204
1205 let mut session = db.session();
1207 session.begin_tx().unwrap();
1208
1209 let node_in_tx =
1210 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1211 assert!(node_in_tx.is_valid());
1212
1213 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1215
1216 session.rollback().unwrap();
1218
1219 let count_after = db.node_count();
1221 assert_eq!(
1222 count_after, 1,
1223 "Rollback should discard node created via session.create_node_with_props()"
1224 );
1225 }
1226
1227 #[cfg(feature = "gql")]
1228 mod gql_tests {
1229 use super::*;
1230
1231 #[test]
1232 fn test_gql_query_execution() {
1233 let db = GrafeoDB::new_in_memory();
1234 let session = db.session();
1235
1236 session.create_node(&["Person"]);
1238 session.create_node(&["Person"]);
1239 session.create_node(&["Animal"]);
1240
1241 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1243
1244 assert_eq!(result.row_count(), 2);
1246 assert_eq!(result.column_count(), 1);
1247 assert_eq!(result.columns[0], "n");
1248 }
1249
1250 #[test]
1251 fn test_gql_empty_result() {
1252 let db = GrafeoDB::new_in_memory();
1253 let session = db.session();
1254
1255 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1257
1258 assert_eq!(result.row_count(), 0);
1259 }
1260
1261 #[test]
1262 fn test_gql_parse_error() {
1263 let db = GrafeoDB::new_in_memory();
1264 let session = db.session();
1265
1266 let result = session.execute("MATCH (n RETURN n");
1268
1269 assert!(result.is_err());
1270 }
1271
1272 #[test]
1273 fn test_gql_relationship_traversal() {
1274 let db = GrafeoDB::new_in_memory();
1275 let session = db.session();
1276
1277 let alice = session.create_node(&["Person"]);
1279 let bob = session.create_node(&["Person"]);
1280 let charlie = session.create_node(&["Person"]);
1281
1282 session.create_edge(alice, bob, "KNOWS");
1283 session.create_edge(alice, charlie, "KNOWS");
1284
1285 let result = session
1287 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1288 .unwrap();
1289
1290 assert_eq!(result.row_count(), 2);
1292 assert_eq!(result.column_count(), 2);
1293 assert_eq!(result.columns[0], "a");
1294 assert_eq!(result.columns[1], "b");
1295 }
1296
1297 #[test]
1298 fn test_gql_relationship_with_type_filter() {
1299 let db = GrafeoDB::new_in_memory();
1300 let session = db.session();
1301
1302 let alice = session.create_node(&["Person"]);
1304 let bob = session.create_node(&["Person"]);
1305 let charlie = session.create_node(&["Person"]);
1306
1307 session.create_edge(alice, bob, "KNOWS");
1308 session.create_edge(alice, charlie, "WORKS_WITH");
1309
1310 let result = session
1312 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1313 .unwrap();
1314
1315 assert_eq!(result.row_count(), 1);
1317 }
1318
1319 #[test]
1320 fn test_gql_semantic_error_undefined_variable() {
1321 let db = GrafeoDB::new_in_memory();
1322 let session = db.session();
1323
1324 let result = session.execute("MATCH (n:Person) RETURN x");
1326
1327 assert!(result.is_err());
1329 let Err(err) = result else {
1330 panic!("Expected error")
1331 };
1332 assert!(
1333 err.to_string().contains("Undefined variable"),
1334 "Expected undefined variable error, got: {}",
1335 err
1336 );
1337 }
1338
1339 #[test]
1340 fn test_gql_where_clause_property_filter() {
1341 use grafeo_common::types::Value;
1342
1343 let db = GrafeoDB::new_in_memory();
1344 let session = db.session();
1345
1346 session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
1348 session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
1349 session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
1350
1351 let result = session
1353 .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
1354 .unwrap();
1355
1356 assert_eq!(result.row_count(), 2);
1358 }
1359
1360 #[test]
1361 fn test_gql_where_clause_equality() {
1362 use grafeo_common::types::Value;
1363
1364 let db = GrafeoDB::new_in_memory();
1365 let session = db.session();
1366
1367 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1369 session.create_node_with_props(&["Person"], [("name", Value::String("Bob".into()))]);
1370 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1371
1372 let result = session
1374 .execute("MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n")
1375 .unwrap();
1376
1377 assert_eq!(result.row_count(), 2);
1379 }
1380
1381 #[test]
1382 fn test_gql_return_property_access() {
1383 use grafeo_common::types::Value;
1384
1385 let db = GrafeoDB::new_in_memory();
1386 let session = db.session();
1387
1388 session.create_node_with_props(
1390 &["Person"],
1391 [
1392 ("name", Value::String("Alice".into())),
1393 ("age", Value::Int64(30)),
1394 ],
1395 );
1396 session.create_node_with_props(
1397 &["Person"],
1398 [
1399 ("name", Value::String("Bob".into())),
1400 ("age", Value::Int64(25)),
1401 ],
1402 );
1403
1404 let result = session
1406 .execute("MATCH (n:Person) RETURN n.name, n.age")
1407 .unwrap();
1408
1409 assert_eq!(result.row_count(), 2);
1411 assert_eq!(result.column_count(), 2);
1412 assert_eq!(result.columns[0], "n.name");
1413 assert_eq!(result.columns[1], "n.age");
1414
1415 let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
1417 assert!(names.contains(&&Value::String("Alice".into())));
1418 assert!(names.contains(&&Value::String("Bob".into())));
1419 }
1420
1421 #[test]
1422 fn test_gql_return_mixed_expressions() {
1423 use grafeo_common::types::Value;
1424
1425 let db = GrafeoDB::new_in_memory();
1426 let session = db.session();
1427
1428 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1430
1431 let result = session
1433 .execute("MATCH (n:Person) RETURN n, n.name")
1434 .unwrap();
1435
1436 assert_eq!(result.row_count(), 1);
1437 assert_eq!(result.column_count(), 2);
1438 assert_eq!(result.columns[0], "n");
1439 assert_eq!(result.columns[1], "n.name");
1440
1441 assert_eq!(result.rows[0][1], Value::String("Alice".into()));
1443 }
1444 }
1445
1446 #[cfg(feature = "cypher")]
1447 mod cypher_tests {
1448 use super::*;
1449
1450 #[test]
1451 fn test_cypher_query_execution() {
1452 let db = GrafeoDB::new_in_memory();
1453 let session = db.session();
1454
1455 session.create_node(&["Person"]);
1457 session.create_node(&["Person"]);
1458 session.create_node(&["Animal"]);
1459
1460 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1462
1463 assert_eq!(result.row_count(), 2);
1465 assert_eq!(result.column_count(), 1);
1466 assert_eq!(result.columns[0], "n");
1467 }
1468
1469 #[test]
1470 fn test_cypher_empty_result() {
1471 let db = GrafeoDB::new_in_memory();
1472 let session = db.session();
1473
1474 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1476
1477 assert_eq!(result.row_count(), 0);
1478 }
1479
1480 #[test]
1481 fn test_cypher_parse_error() {
1482 let db = GrafeoDB::new_in_memory();
1483 let session = db.session();
1484
1485 let result = session.execute_cypher("MATCH (n RETURN n");
1487
1488 assert!(result.is_err());
1489 }
1490 }
1491
1492 mod direct_lookup_tests {
1495 use super::*;
1496 use grafeo_common::types::Value;
1497
1498 #[test]
1499 fn test_get_node() {
1500 let db = GrafeoDB::new_in_memory();
1501 let session = db.session();
1502
1503 let id = session.create_node(&["Person"]);
1504 let node = session.get_node(id);
1505
1506 assert!(node.is_some());
1507 let node = node.unwrap();
1508 assert_eq!(node.id, id);
1509 }
1510
1511 #[test]
1512 fn test_get_node_not_found() {
1513 use grafeo_common::types::NodeId;
1514
1515 let db = GrafeoDB::new_in_memory();
1516 let session = db.session();
1517
1518 let node = session.get_node(NodeId::new(9999));
1520 assert!(node.is_none());
1521 }
1522
1523 #[test]
1524 fn test_get_node_property() {
1525 let db = GrafeoDB::new_in_memory();
1526 let session = db.session();
1527
1528 let id = session
1529 .create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1530
1531 let name = session.get_node_property(id, "name");
1532 assert_eq!(name, Some(Value::String("Alice".into())));
1533
1534 let missing = session.get_node_property(id, "missing");
1536 assert!(missing.is_none());
1537 }
1538
1539 #[test]
1540 fn test_get_edge() {
1541 let db = GrafeoDB::new_in_memory();
1542 let session = db.session();
1543
1544 let alice = session.create_node(&["Person"]);
1545 let bob = session.create_node(&["Person"]);
1546 let edge_id = session.create_edge(alice, bob, "KNOWS");
1547
1548 let edge = session.get_edge(edge_id);
1549 assert!(edge.is_some());
1550 let edge = edge.unwrap();
1551 assert_eq!(edge.id, edge_id);
1552 assert_eq!(edge.src, alice);
1553 assert_eq!(edge.dst, bob);
1554 }
1555
1556 #[test]
1557 fn test_get_edge_not_found() {
1558 use grafeo_common::types::EdgeId;
1559
1560 let db = GrafeoDB::new_in_memory();
1561 let session = db.session();
1562
1563 let edge = session.get_edge(EdgeId::new(9999));
1564 assert!(edge.is_none());
1565 }
1566
/// Outgoing neighbors of a node are exactly the targets of the edges leaving it.
#[test]
fn test_get_neighbors_outgoing() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let hub = sess.create_node(&["Person"]);
    let first = sess.create_node(&["Person"]);
    let second = sess.create_node(&["Person"]);

    // Two edges leave `hub`, one to each of the other nodes.
    for target in [first, second] {
        sess.create_edge(hub, target, "KNOWS");
    }

    let reached = sess.get_neighbors_outgoing(hub);
    assert_eq!(reached.len(), 2);

    // Order is not asserted; only membership of both targets.
    assert!(reached.iter().any(|(neighbor, _)| *neighbor == first));
    assert!(reached.iter().any(|(neighbor, _)| *neighbor == second));
}
1586
/// Incoming neighbors of a node are exactly the sources of the edges pointing at it.
#[test]
fn test_get_neighbors_incoming() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let sink = sess.create_node(&["Person"]);
    let left = sess.create_node(&["Person"]);
    let right = sess.create_node(&["Person"]);

    // Both other nodes point at `sink`.
    sess.create_edge(left, sink, "KNOWS");
    sess.create_edge(right, sink, "KNOWS");

    let inbound = sess.get_neighbors_incoming(sink);
    assert_eq!(inbound.len(), 2);

    // Gather the source ids; order is not asserted, only membership.
    let mut sources = Vec::new();
    for (origin, _) in inbound.iter() {
        sources.push(*origin);
    }
    assert!(sources.contains(&left));
    assert!(sources.contains(&right));
}
1606
/// Filtering outgoing neighbors by edge type returns only targets reached via that type.
#[test]
fn test_get_neighbors_outgoing_by_type() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let person = sess.create_node(&["Person"]);
    let friend = sess.create_node(&["Person"]);
    let employer = sess.create_node(&["Company"]);

    sess.create_edge(person, friend, "KNOWS");
    sess.create_edge(person, employer, "WORKS_AT");

    // Each edge type used above leads to exactly one, distinct target.
    for (edge_type, expected) in [("KNOWS", friend), ("WORKS_AT", employer)] {
        let hits = sess.get_neighbors_outgoing_by_type(person, edge_type);
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].0, expected);
    }

    // An edge type with no edges from `person` produces an empty result.
    assert!(sess
        .get_neighbors_outgoing_by_type(person, "LIKES")
        .is_empty());
}
1631
/// `node_exists` is true for a created node and false for an id that was never created.
#[test]
fn test_node_exists() {
    use grafeo_common::types::NodeId;

    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let present = sess.create_node(&["Person"]);
    let absent = NodeId::new(9999);

    assert!(sess.node_exists(present));
    assert!(!sess.node_exists(absent));
}
1644
/// `edge_exists` is true for a created edge and false for an id that was never created.
#[test]
fn test_edge_exists() {
    use grafeo_common::types::EdgeId;

    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let source = sess.create_node(&["Person"]);
    let target = sess.create_node(&["Person"]);
    let present = sess.create_edge(source, target, "KNOWS");
    let absent = EdgeId::new(9999);

    assert!(sess.edge_exists(present));
    assert!(!sess.edge_exists(absent));
}
1659
/// `get_degree` returns `(out_degree, in_degree)` matching the edges touching a node.
#[test]
fn test_get_degree() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let hub = sess.create_node(&["Person"]);
    let second = sess.create_node(&["Person"]);
    let third = sess.create_node(&["Person"]);

    // Two edges leave `hub`, one edge enters it.
    for (from, to) in [(hub, second), (hub, third), (second, hub)] {
        sess.create_edge(from, to, "KNOWS");
    }

    assert_eq!(sess.get_degree(hub), (2, 1));

    // A node with no edges has zero degree in both directions.
    let isolated = sess.create_node(&["Person"]);
    assert_eq!(sess.get_degree(isolated), (0, 0));
}
1685
/// Batch lookup returns one slot per requested id, with `None` for ids that do not exist.
#[test]
fn test_get_nodes_batch() {
    use grafeo_common::types::NodeId;

    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let first = sess.create_node(&["Person"]);
    let second = sess.create_node(&["Person"]);
    let third = sess.create_node(&["Person"]);

    // All three ids exist, so every slot is populated.
    let all_present = sess.get_nodes_batch(&[first, second, third]);
    assert_eq!(all_present.len(), 3);
    assert!(all_present.iter().all(|slot| slot.is_some()));

    // The unknown id in the middle leaves a `None` in the matching position.
    let mixed = sess.get_nodes_batch(&[first, NodeId::new(9999), third]);
    assert_eq!(mixed.len(), 3);
    let presence: Vec<bool> = mixed.iter().map(|slot| slot.is_some()).collect();
    assert_eq!(presence, [true, false, true]);
}
1709
/// Auto-commit starts enabled and reflects whatever `set_auto_commit` was last given.
#[test]
fn test_auto_commit_setting() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    // A new session reports auto-commit as on.
    assert!(sess.auto_commit());

    // Turn it off, then back on, checking the getter after each change.
    for enabled in [false, true] {
        sess.set_auto_commit(enabled);
        assert_eq!(sess.auto_commit(), enabled);
    }
}
1724
/// Starting a transaction while one is already open is rejected with an error.
#[test]
fn test_transaction_double_begin_error() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    sess.begin_tx().unwrap();

    // The second begin must fail because the first transaction is still open.
    assert!(sess.begin_tx().is_err());

    // The first transaction can still be rolled back afterwards.
    sess.rollback().unwrap();
}
1737
/// Committing when no transaction has been started returns an error.
#[test]
fn test_commit_without_transaction_error() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    assert!(sess.commit().is_err());
}
1746
/// Rolling back when no transaction has been started returns an error.
#[test]
fn test_rollback_without_transaction_error() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    assert!(sess.rollback().is_err());
}
1755
/// An edge created inside an explicit transaction is visible both before and after commit.
#[test]
fn test_create_edge_in_transaction() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    // The endpoints are created before the explicit transaction begins.
    let source = sess.create_node(&["Person"]);
    let target = sess.create_node(&["Person"]);

    sess.begin_tx().unwrap();
    let knows = sess.create_edge(source, target, "KNOWS");

    // Visible to the session while the transaction is still open.
    let visible_in_tx = sess.edge_exists(knows);
    assert!(visible_in_tx);

    sess.commit().unwrap();

    // Still present once the transaction has been committed.
    let visible_after_commit = sess.edge_exists(knows);
    assert!(visible_after_commit);
}
1778
/// A node with no edges has no neighbors in any direction or for any edge type.
#[test]
fn test_neighbors_empty_node() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let isolated = sess.create_node(&["Person"]);

    let outgoing = sess.get_neighbors_outgoing(isolated);
    assert!(outgoing.is_empty());

    let incoming = sess.get_neighbors_incoming(isolated);
    assert!(incoming.is_empty());

    let typed = sess.get_neighbors_outgoing_by_type(isolated, "KNOWS");
    assert!(typed.is_empty());
}
1794 }
1795}