1use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
15#[cfg(feature = "rdf")]
16use grafeo_core::graph::rdf::RdfStore;
17
18use crate::config::{AdaptiveConfig, GraphModel};
19use crate::database::QueryResult;
20use crate::query::cache::QueryCache;
21use crate::transaction::TransactionManager;
22
23pub struct Session {
29 store: Arc<LpgStore>,
31 #[cfg(feature = "rdf")]
33 #[allow(dead_code)]
34 rdf_store: Arc<RdfStore>,
35 tx_manager: Arc<TransactionManager>,
37 query_cache: Arc<QueryCache>,
39 current_tx: Option<TxId>,
41 auto_commit: bool,
43 #[allow(dead_code)]
45 adaptive_config: AdaptiveConfig,
46 factorized_execution: bool,
48 graph_model: GraphModel,
50 query_timeout: Option<Duration>,
52 commit_counter: Arc<AtomicUsize>,
54 gc_interval: usize,
56}
57
58impl Session {
59 #[allow(dead_code)]
61 pub(crate) fn new(
62 store: Arc<LpgStore>,
63 tx_manager: Arc<TransactionManager>,
64 query_cache: Arc<QueryCache>,
65 ) -> Self {
66 Self {
67 store,
68 #[cfg(feature = "rdf")]
69 rdf_store: Arc::new(RdfStore::new()),
70 tx_manager,
71 query_cache,
72 current_tx: None,
73 auto_commit: true,
74 adaptive_config: AdaptiveConfig::default(),
75 factorized_execution: true,
76 graph_model: GraphModel::Lpg,
77 query_timeout: None,
78 commit_counter: Arc::new(AtomicUsize::new(0)),
79 gc_interval: 0,
80 }
81 }
82
83 #[allow(dead_code, clippy::too_many_arguments)]
85 pub(crate) fn with_adaptive(
86 store: Arc<LpgStore>,
87 tx_manager: Arc<TransactionManager>,
88 query_cache: Arc<QueryCache>,
89 adaptive_config: AdaptiveConfig,
90 factorized_execution: bool,
91 graph_model: GraphModel,
92 query_timeout: Option<Duration>,
93 commit_counter: Arc<AtomicUsize>,
94 gc_interval: usize,
95 ) -> Self {
96 Self {
97 store,
98 #[cfg(feature = "rdf")]
99 rdf_store: Arc::new(RdfStore::new()),
100 tx_manager,
101 query_cache,
102 current_tx: None,
103 auto_commit: true,
104 adaptive_config,
105 factorized_execution,
106 graph_model,
107 query_timeout,
108 commit_counter,
109 gc_interval,
110 }
111 }
112
113 #[cfg(feature = "rdf")]
115 #[allow(clippy::too_many_arguments)]
116 pub(crate) fn with_rdf_store_and_adaptive(
117 store: Arc<LpgStore>,
118 rdf_store: Arc<RdfStore>,
119 tx_manager: Arc<TransactionManager>,
120 query_cache: Arc<QueryCache>,
121 adaptive_config: AdaptiveConfig,
122 factorized_execution: bool,
123 graph_model: GraphModel,
124 query_timeout: Option<Duration>,
125 commit_counter: Arc<AtomicUsize>,
126 gc_interval: usize,
127 ) -> Self {
128 Self {
129 store,
130 rdf_store,
131 tx_manager,
132 query_cache,
133 current_tx: None,
134 auto_commit: true,
135 adaptive_config,
136 factorized_execution,
137 graph_model,
138 query_timeout,
139 commit_counter,
140 gc_interval,
141 }
142 }
143
144 #[must_use]
146 pub fn graph_model(&self) -> GraphModel {
147 self.graph_model
148 }
149
150 fn require_lpg(&self, language: &str) -> Result<()> {
152 if self.graph_model == GraphModel::Rdf {
153 return Err(grafeo_common::utils::error::Error::Internal(format!(
154 "This is an RDF database. {language} queries require an LPG database."
155 )));
156 }
157 Ok(())
158 }
159
160 #[cfg(feature = "gql")]
184 pub fn execute(&self, query: &str) -> Result<QueryResult> {
185 self.require_lpg("GQL")?;
186
187 use crate::query::{
188 Executor, Planner, binder::Binder, cache::CacheKey, gql_translator,
189 optimizer::Optimizer, processor::QueryLanguage,
190 };
191
192 let start_time = std::time::Instant::now();
193
194 let cache_key = CacheKey::new(query, QueryLanguage::Gql);
196
197 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
199 cached_plan
201 } else {
202 let logical_plan = gql_translator::translate(query)?;
206
207 let mut binder = Binder::new();
209 let _binding_context = binder.bind(&logical_plan)?;
210
211 let optimizer = Optimizer::from_store(&self.store);
213 let plan = optimizer.optimize(logical_plan)?;
214
215 self.query_cache.put_optimized(cache_key, plan.clone());
217
218 plan
219 };
220
221 let (viewing_epoch, tx_id) = self.get_transaction_context();
223
224 let planner = Planner::with_context(
227 Arc::clone(&self.store),
228 Arc::clone(&self.tx_manager),
229 tx_id,
230 viewing_epoch,
231 )
232 .with_factorized_execution(self.factorized_execution);
233 let mut physical_plan = planner.plan(&optimized_plan)?;
234
235 let executor = Executor::with_columns(physical_plan.columns.clone())
237 .with_deadline(self.query_deadline());
238 let mut result = executor.execute(physical_plan.operator.as_mut())?;
239
240 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
242 let rows_scanned = result.rows.len() as u64;
243 result.execution_time_ms = Some(elapsed_ms);
244 result.rows_scanned = Some(rows_scanned);
245
246 Ok(result)
247 }
248
249 #[cfg(feature = "gql")]
255 pub fn execute_with_params(
256 &self,
257 query: &str,
258 params: std::collections::HashMap<String, Value>,
259 ) -> Result<QueryResult> {
260 self.require_lpg("GQL")?;
261
262 use crate::query::processor::{QueryLanguage, QueryProcessor};
263
264 let (viewing_epoch, tx_id) = self.get_transaction_context();
266
267 let processor =
269 QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
270
271 let processor = if let Some(tx_id) = tx_id {
273 processor.with_tx_context(viewing_epoch, tx_id)
274 } else {
275 processor
276 };
277
278 processor.process(query, QueryLanguage::Gql, Some(¶ms))
279 }
280
281 #[cfg(not(any(feature = "gql", feature = "cypher")))]
287 pub fn execute_with_params(
288 &self,
289 _query: &str,
290 _params: std::collections::HashMap<String, Value>,
291 ) -> Result<QueryResult> {
292 Err(grafeo_common::utils::error::Error::Internal(
293 "No query language enabled".to_string(),
294 ))
295 }
296
297 #[cfg(not(any(feature = "gql", feature = "cypher")))]
303 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
304 Err(grafeo_common::utils::error::Error::Internal(
305 "No query language enabled".to_string(),
306 ))
307 }
308
309 #[cfg(feature = "cypher")]
315 pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
316 use crate::query::{
317 Executor, Planner, binder::Binder, cache::CacheKey, cypher_translator,
318 optimizer::Optimizer, processor::QueryLanguage,
319 };
320
321 let cache_key = CacheKey::new(query, QueryLanguage::Cypher);
323
324 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
326 cached_plan
327 } else {
328 let logical_plan = cypher_translator::translate(query)?;
330
331 let mut binder = Binder::new();
333 let _binding_context = binder.bind(&logical_plan)?;
334
335 let optimizer = Optimizer::from_store(&self.store);
337 let plan = optimizer.optimize(logical_plan)?;
338
339 self.query_cache.put_optimized(cache_key, plan.clone());
341
342 plan
343 };
344
345 let (viewing_epoch, tx_id) = self.get_transaction_context();
347
348 let planner = Planner::with_context(
350 Arc::clone(&self.store),
351 Arc::clone(&self.tx_manager),
352 tx_id,
353 viewing_epoch,
354 )
355 .with_factorized_execution(self.factorized_execution);
356 let mut physical_plan = planner.plan(&optimized_plan)?;
357
358 let executor = Executor::with_columns(physical_plan.columns.clone())
360 .with_deadline(self.query_deadline());
361 executor.execute(physical_plan.operator.as_mut())
362 }
363
364 #[cfg(feature = "gremlin")]
385 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
386 use crate::query::{
387 Executor, Planner, binder::Binder, gremlin_translator, optimizer::Optimizer,
388 };
389
390 let logical_plan = gremlin_translator::translate(query)?;
392
393 let mut binder = Binder::new();
395 let _binding_context = binder.bind(&logical_plan)?;
396
397 let optimizer = Optimizer::from_store(&self.store);
399 let optimized_plan = optimizer.optimize(logical_plan)?;
400
401 let (viewing_epoch, tx_id) = self.get_transaction_context();
403
404 let planner = Planner::with_context(
406 Arc::clone(&self.store),
407 Arc::clone(&self.tx_manager),
408 tx_id,
409 viewing_epoch,
410 )
411 .with_factorized_execution(self.factorized_execution);
412 let mut physical_plan = planner.plan(&optimized_plan)?;
413
414 let executor = Executor::with_columns(physical_plan.columns.clone())
416 .with_deadline(self.query_deadline());
417 executor.execute(physical_plan.operator.as_mut())
418 }
419
420 #[cfg(feature = "gremlin")]
426 pub fn execute_gremlin_with_params(
427 &self,
428 query: &str,
429 params: std::collections::HashMap<String, Value>,
430 ) -> Result<QueryResult> {
431 use crate::query::processor::{QueryLanguage, QueryProcessor};
432
433 let (viewing_epoch, tx_id) = self.get_transaction_context();
435
436 let processor =
438 QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
439
440 let processor = if let Some(tx_id) = tx_id {
442 processor.with_tx_context(viewing_epoch, tx_id)
443 } else {
444 processor
445 };
446
447 processor.process(query, QueryLanguage::Gremlin, Some(¶ms))
448 }
449
450 #[cfg(feature = "graphql")]
471 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
472 use crate::query::{
473 Executor, Planner, binder::Binder, graphql_translator, optimizer::Optimizer,
474 };
475
476 let logical_plan = graphql_translator::translate(query)?;
478
479 let mut binder = Binder::new();
481 let _binding_context = binder.bind(&logical_plan)?;
482
483 let optimizer = Optimizer::from_store(&self.store);
485 let optimized_plan = optimizer.optimize(logical_plan)?;
486
487 let (viewing_epoch, tx_id) = self.get_transaction_context();
489
490 let planner = Planner::with_context(
492 Arc::clone(&self.store),
493 Arc::clone(&self.tx_manager),
494 tx_id,
495 viewing_epoch,
496 )
497 .with_factorized_execution(self.factorized_execution);
498 let mut physical_plan = planner.plan(&optimized_plan)?;
499
500 let executor = Executor::with_columns(physical_plan.columns.clone())
502 .with_deadline(self.query_deadline());
503 executor.execute(physical_plan.operator.as_mut())
504 }
505
506 #[cfg(feature = "graphql")]
512 pub fn execute_graphql_with_params(
513 &self,
514 query: &str,
515 params: std::collections::HashMap<String, Value>,
516 ) -> Result<QueryResult> {
517 use crate::query::processor::{QueryLanguage, QueryProcessor};
518
519 let (viewing_epoch, tx_id) = self.get_transaction_context();
521
522 let processor =
524 QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
525
526 let processor = if let Some(tx_id) = tx_id {
528 processor.with_tx_context(viewing_epoch, tx_id)
529 } else {
530 processor
531 };
532
533 processor.process(query, QueryLanguage::GraphQL, Some(¶ms))
534 }
535
536 #[cfg(feature = "sql-pgq")]
558 pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
559 use crate::query::{
560 Executor, Planner, binder::Binder, cache::CacheKey, optimizer::Optimizer,
561 plan::LogicalOperator, processor::QueryLanguage, sql_pgq_translator,
562 };
563
564 let logical_plan = sql_pgq_translator::translate(query)?;
566
567 if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
569 return Ok(QueryResult {
570 columns: vec!["status".into()],
571 column_types: vec![grafeo_common::types::LogicalType::String],
572 rows: vec![vec![Value::from(format!(
573 "Property graph '{}' created ({} node tables, {} edge tables)",
574 cpg.name,
575 cpg.node_tables.len(),
576 cpg.edge_tables.len()
577 ))]],
578 execution_time_ms: None,
579 rows_scanned: None,
580 });
581 }
582
583 let cache_key = CacheKey::new(query, QueryLanguage::SqlPgq);
585
586 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
588 cached_plan
589 } else {
590 let mut binder = Binder::new();
592 let _binding_context = binder.bind(&logical_plan)?;
593
594 let optimizer = Optimizer::from_store(&self.store);
596 let plan = optimizer.optimize(logical_plan)?;
597
598 self.query_cache.put_optimized(cache_key, plan.clone());
600
601 plan
602 };
603
604 let (viewing_epoch, tx_id) = self.get_transaction_context();
606
607 let planner = Planner::with_context(
609 Arc::clone(&self.store),
610 Arc::clone(&self.tx_manager),
611 tx_id,
612 viewing_epoch,
613 )
614 .with_factorized_execution(self.factorized_execution);
615 let mut physical_plan = planner.plan(&optimized_plan)?;
616
617 let executor = Executor::with_columns(physical_plan.columns.clone())
619 .with_deadline(self.query_deadline());
620 executor.execute(physical_plan.operator.as_mut())
621 }
622
623 #[cfg(feature = "sql-pgq")]
629 pub fn execute_sql_with_params(
630 &self,
631 query: &str,
632 params: std::collections::HashMap<String, Value>,
633 ) -> Result<QueryResult> {
634 use crate::query::processor::{QueryLanguage, QueryProcessor};
635
636 let (viewing_epoch, tx_id) = self.get_transaction_context();
638
639 let processor =
641 QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
642
643 let processor = if let Some(tx_id) = tx_id {
645 processor.with_tx_context(viewing_epoch, tx_id)
646 } else {
647 processor
648 };
649
650 processor.process(query, QueryLanguage::SqlPgq, Some(¶ms))
651 }
652
653 #[cfg(all(feature = "sparql", feature = "rdf"))]
659 pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
660 use crate::query::{
661 Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
662 };
663
664 let logical_plan = sparql_translator::translate(query)?;
666
667 let optimizer = Optimizer::from_store(&self.store);
669 let optimized_plan = optimizer.optimize(logical_plan)?;
670
671 let planner = RdfPlanner::new(Arc::clone(&self.rdf_store)).with_tx_id(self.current_tx);
673 let mut physical_plan = planner.plan(&optimized_plan)?;
674
675 let executor = Executor::with_columns(physical_plan.columns.clone())
677 .with_deadline(self.query_deadline());
678 executor.execute(physical_plan.operator.as_mut())
679 }
680
681 #[cfg(all(feature = "sparql", feature = "rdf"))]
687 pub fn execute_sparql_with_params(
688 &self,
689 query: &str,
690 _params: std::collections::HashMap<String, Value>,
691 ) -> Result<QueryResult> {
692 self.execute_sparql(query)
695 }
696
697 pub fn begin_tx(&mut self) -> Result<()> {
717 if self.current_tx.is_some() {
718 return Err(grafeo_common::utils::error::Error::Transaction(
719 grafeo_common::utils::error::TransactionError::InvalidState(
720 "Transaction already active".to_string(),
721 ),
722 ));
723 }
724
725 let tx_id = self.tx_manager.begin();
726 self.current_tx = Some(tx_id);
727 Ok(())
728 }
729
730 pub fn begin_tx_with_isolation(
738 &mut self,
739 isolation_level: crate::transaction::IsolationLevel,
740 ) -> Result<()> {
741 if self.current_tx.is_some() {
742 return Err(grafeo_common::utils::error::Error::Transaction(
743 grafeo_common::utils::error::TransactionError::InvalidState(
744 "Transaction already active".to_string(),
745 ),
746 ));
747 }
748
749 let tx_id = self.tx_manager.begin_with_isolation(isolation_level);
750 self.current_tx = Some(tx_id);
751 Ok(())
752 }
753
754 pub fn commit(&mut self) -> Result<()> {
762 let tx_id = self.current_tx.take().ok_or_else(|| {
763 grafeo_common::utils::error::Error::Transaction(
764 grafeo_common::utils::error::TransactionError::InvalidState(
765 "No active transaction".to_string(),
766 ),
767 )
768 })?;
769
770 #[cfg(feature = "rdf")]
772 self.rdf_store.commit_tx(tx_id);
773
774 self.tx_manager.commit(tx_id)?;
775
776 if self.gc_interval > 0 {
778 let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
779 if count.is_multiple_of(self.gc_interval) {
780 let min_epoch = self.tx_manager.min_active_epoch();
781 self.store.gc_versions(min_epoch);
782 self.tx_manager.gc();
783 }
784 }
785
786 Ok(())
787 }
788
789 pub fn rollback(&mut self) -> Result<()> {
810 let tx_id = self.current_tx.take().ok_or_else(|| {
811 grafeo_common::utils::error::Error::Transaction(
812 grafeo_common::utils::error::TransactionError::InvalidState(
813 "No active transaction".to_string(),
814 ),
815 )
816 })?;
817
818 self.store.discard_uncommitted_versions(tx_id);
820
821 #[cfg(feature = "rdf")]
823 self.rdf_store.rollback_tx(tx_id);
824
825 self.tx_manager.abort(tx_id)
827 }
828
829 #[must_use]
831 pub fn in_transaction(&self) -> bool {
832 self.current_tx.is_some()
833 }
834
835 pub fn set_auto_commit(&mut self, auto_commit: bool) {
837 self.auto_commit = auto_commit;
838 }
839
840 #[must_use]
842 pub fn auto_commit(&self) -> bool {
843 self.auto_commit
844 }
845
846 #[must_use]
848 fn query_deadline(&self) -> Option<Instant> {
849 self.query_timeout.map(|d| Instant::now() + d)
850 }
851
852 #[must_use]
858 fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
859 if let Some(tx_id) = self.current_tx {
860 let epoch = self
862 .tx_manager
863 .start_epoch(tx_id)
864 .unwrap_or_else(|| self.tx_manager.current_epoch());
865 (epoch, Some(tx_id))
866 } else {
867 (self.tx_manager.current_epoch(), None)
869 }
870 }
871
872 pub fn create_node(&self, labels: &[&str]) -> NodeId {
877 let (epoch, tx_id) = self.get_transaction_context();
878 self.store
879 .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
880 }
881
882 pub fn create_node_with_props<'a>(
886 &self,
887 labels: &[&str],
888 properties: impl IntoIterator<Item = (&'a str, Value)>,
889 ) -> NodeId {
890 let (epoch, tx_id) = self.get_transaction_context();
891 self.store.create_node_with_props_versioned(
892 labels,
893 properties.into_iter().map(|(k, v)| (k, v)),
894 epoch,
895 tx_id.unwrap_or(TxId::SYSTEM),
896 )
897 }
898
899 pub fn create_edge(
904 &self,
905 src: NodeId,
906 dst: NodeId,
907 edge_type: &str,
908 ) -> grafeo_common::types::EdgeId {
909 let (epoch, tx_id) = self.get_transaction_context();
910 self.store
911 .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
912 }
913
914 #[must_use]
940 pub fn get_node(&self, id: NodeId) -> Option<Node> {
941 let (epoch, tx_id) = self.get_transaction_context();
942 self.store
943 .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
944 }
945
946 #[must_use]
967 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
968 self.get_node(id)
969 .and_then(|node| node.get_property(key).cloned())
970 }
971
972 #[must_use]
979 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
980 let (epoch, tx_id) = self.get_transaction_context();
981 self.store
982 .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
983 }
984
985 #[must_use]
1009 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1010 self.store.edges_from(node, Direction::Outgoing).collect()
1011 }
1012
1013 #[must_use]
1022 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1023 self.store.edges_from(node, Direction::Incoming).collect()
1024 }
1025
1026 #[must_use]
1034 pub fn get_neighbors_outgoing_by_type(
1035 &self,
1036 node: NodeId,
1037 edge_type: &str,
1038 ) -> Vec<(NodeId, EdgeId)> {
1039 self.store
1040 .edges_from(node, Direction::Outgoing)
1041 .filter(|(_, edge_id)| {
1042 self.get_edge(*edge_id)
1043 .is_some_and(|e| e.edge_type.as_str() == edge_type)
1044 })
1045 .collect()
1046 }
1047
1048 #[must_use]
1055 pub fn node_exists(&self, id: NodeId) -> bool {
1056 self.get_node(id).is_some()
1057 }
1058
1059 #[must_use]
1061 pub fn edge_exists(&self, id: EdgeId) -> bool {
1062 self.get_edge(id).is_some()
1063 }
1064
1065 #[must_use]
1069 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
1070 let out = self.store.out_degree(node);
1071 let in_degree = self.store.in_degree(node);
1072 (out, in_degree)
1073 }
1074
1075 #[must_use]
1085 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
1086 let (epoch, tx_id) = self.get_transaction_context();
1087 let tx = tx_id.unwrap_or(TxId::SYSTEM);
1088 ids.iter()
1089 .map(|&id| self.store.get_node_versioned(id, epoch, tx))
1090 .collect()
1091 }
1092}
1093
1094#[cfg(test)]
1095mod tests {
1096 use crate::database::GrafeoDB;
1097
1098 #[test]
1099 fn test_session_create_node() {
1100 let db = GrafeoDB::new_in_memory();
1101 let session = db.session();
1102
1103 let id = session.create_node(&["Person"]);
1104 assert!(id.is_valid());
1105 assert_eq!(db.node_count(), 1);
1106 }
1107
1108 #[test]
1109 fn test_session_transaction() {
1110 let db = GrafeoDB::new_in_memory();
1111 let mut session = db.session();
1112
1113 assert!(!session.in_transaction());
1114
1115 session.begin_tx().unwrap();
1116 assert!(session.in_transaction());
1117
1118 session.commit().unwrap();
1119 assert!(!session.in_transaction());
1120 }
1121
1122 #[test]
1123 fn test_session_transaction_context() {
1124 let db = GrafeoDB::new_in_memory();
1125 let mut session = db.session();
1126
1127 let (_epoch1, tx_id1) = session.get_transaction_context();
1129 assert!(tx_id1.is_none());
1130
1131 session.begin_tx().unwrap();
1133 let (epoch2, tx_id2) = session.get_transaction_context();
1134 assert!(tx_id2.is_some());
1135 let _ = epoch2; session.commit().unwrap();
1140 let (epoch3, tx_id3) = session.get_transaction_context();
1141 assert!(tx_id3.is_none());
1142 assert!(epoch3.as_u64() >= epoch2.as_u64());
1144 }
1145
1146 #[test]
1147 fn test_session_rollback() {
1148 let db = GrafeoDB::new_in_memory();
1149 let mut session = db.session();
1150
1151 session.begin_tx().unwrap();
1152 session.rollback().unwrap();
1153 assert!(!session.in_transaction());
1154 }
1155
1156 #[test]
1157 fn test_session_rollback_discards_versions() {
1158 use grafeo_common::types::TxId;
1159
1160 let db = GrafeoDB::new_in_memory();
1161
1162 let node_before = db.store().create_node(&["Person"]);
1164 assert!(node_before.is_valid());
1165 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1166
1167 let mut session = db.session();
1169 session.begin_tx().unwrap();
1170 let tx_id = session.current_tx.unwrap();
1171
1172 let epoch = db.store().current_epoch();
1174 let node_in_tx = db.store().create_node_versioned(&["Person"], epoch, tx_id);
1175 assert!(node_in_tx.is_valid());
1176
1177 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1179
1180 session.rollback().unwrap();
1182 assert!(!session.in_transaction());
1183
1184 let count_after = db.node_count();
1187 assert_eq!(
1188 count_after, 1,
1189 "Rollback should discard uncommitted node, but got {count_after}"
1190 );
1191
1192 let current_epoch = db.store().current_epoch();
1194 assert!(
1195 db.store()
1196 .get_node_versioned(node_before, current_epoch, TxId::SYSTEM)
1197 .is_some(),
1198 "Original node should still exist"
1199 );
1200
1201 assert!(
1203 db.store()
1204 .get_node_versioned(node_in_tx, current_epoch, TxId::SYSTEM)
1205 .is_none(),
1206 "Transaction node should be gone"
1207 );
1208 }
1209
1210 #[test]
1211 fn test_session_create_node_in_transaction() {
1212 let db = GrafeoDB::new_in_memory();
1214
1215 let node_before = db.create_node(&["Person"]);
1217 assert!(node_before.is_valid());
1218 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1219
1220 let mut session = db.session();
1222 session.begin_tx().unwrap();
1223
1224 let node_in_tx = session.create_node(&["Person"]);
1226 assert!(node_in_tx.is_valid());
1227
1228 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1230
1231 session.rollback().unwrap();
1233
1234 let count_after = db.node_count();
1236 assert_eq!(
1237 count_after, 1,
1238 "Rollback should discard node created via session.create_node(), but got {count_after}"
1239 );
1240 }
1241
1242 #[test]
1243 fn test_session_create_node_with_props_in_transaction() {
1244 use grafeo_common::types::Value;
1245
1246 let db = GrafeoDB::new_in_memory();
1248
1249 db.create_node(&["Person"]);
1251 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1252
1253 let mut session = db.session();
1255 session.begin_tx().unwrap();
1256
1257 let node_in_tx =
1258 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1259 assert!(node_in_tx.is_valid());
1260
1261 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1263
1264 session.rollback().unwrap();
1266
1267 let count_after = db.node_count();
1269 assert_eq!(
1270 count_after, 1,
1271 "Rollback should discard node created via session.create_node_with_props()"
1272 );
1273 }
1274
1275 #[cfg(feature = "gql")]
1276 mod gql_tests {
1277 use super::*;
1278
1279 #[test]
1280 fn test_gql_query_execution() {
1281 let db = GrafeoDB::new_in_memory();
1282 let session = db.session();
1283
1284 session.create_node(&["Person"]);
1286 session.create_node(&["Person"]);
1287 session.create_node(&["Animal"]);
1288
1289 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1291
1292 assert_eq!(result.row_count(), 2);
1294 assert_eq!(result.column_count(), 1);
1295 assert_eq!(result.columns[0], "n");
1296 }
1297
1298 #[test]
1299 fn test_gql_empty_result() {
1300 let db = GrafeoDB::new_in_memory();
1301 let session = db.session();
1302
1303 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1305
1306 assert_eq!(result.row_count(), 0);
1307 }
1308
1309 #[test]
1310 fn test_gql_parse_error() {
1311 let db = GrafeoDB::new_in_memory();
1312 let session = db.session();
1313
1314 let result = session.execute("MATCH (n RETURN n");
1316
1317 assert!(result.is_err());
1318 }
1319
1320 #[test]
1321 fn test_gql_relationship_traversal() {
1322 let db = GrafeoDB::new_in_memory();
1323 let session = db.session();
1324
1325 let alice = session.create_node(&["Person"]);
1327 let bob = session.create_node(&["Person"]);
1328 let charlie = session.create_node(&["Person"]);
1329
1330 session.create_edge(alice, bob, "KNOWS");
1331 session.create_edge(alice, charlie, "KNOWS");
1332
1333 let result = session
1335 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1336 .unwrap();
1337
1338 assert_eq!(result.row_count(), 2);
1340 assert_eq!(result.column_count(), 2);
1341 assert_eq!(result.columns[0], "a");
1342 assert_eq!(result.columns[1], "b");
1343 }
1344
1345 #[test]
1346 fn test_gql_relationship_with_type_filter() {
1347 let db = GrafeoDB::new_in_memory();
1348 let session = db.session();
1349
1350 let alice = session.create_node(&["Person"]);
1352 let bob = session.create_node(&["Person"]);
1353 let charlie = session.create_node(&["Person"]);
1354
1355 session.create_edge(alice, bob, "KNOWS");
1356 session.create_edge(alice, charlie, "WORKS_WITH");
1357
1358 let result = session
1360 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1361 .unwrap();
1362
1363 assert_eq!(result.row_count(), 1);
1365 }
1366
1367 #[test]
1368 fn test_gql_semantic_error_undefined_variable() {
1369 let db = GrafeoDB::new_in_memory();
1370 let session = db.session();
1371
1372 let result = session.execute("MATCH (n:Person) RETURN x");
1374
1375 assert!(result.is_err());
1377 let Err(err) = result else {
1378 panic!("Expected error")
1379 };
1380 assert!(
1381 err.to_string().contains("Undefined variable"),
1382 "Expected undefined variable error, got: {}",
1383 err
1384 );
1385 }
1386
1387 #[test]
1388 fn test_gql_where_clause_property_filter() {
1389 use grafeo_common::types::Value;
1390
1391 let db = GrafeoDB::new_in_memory();
1392 let session = db.session();
1393
1394 session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
1396 session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
1397 session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
1398
1399 let result = session
1401 .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
1402 .unwrap();
1403
1404 assert_eq!(result.row_count(), 2);
1406 }
1407
1408 #[test]
1409 fn test_gql_where_clause_equality() {
1410 use grafeo_common::types::Value;
1411
1412 let db = GrafeoDB::new_in_memory();
1413 let session = db.session();
1414
1415 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1417 session.create_node_with_props(&["Person"], [("name", Value::String("Bob".into()))]);
1418 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1419
1420 let result = session
1422 .execute("MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n")
1423 .unwrap();
1424
1425 assert_eq!(result.row_count(), 2);
1427 }
1428
1429 #[test]
1430 fn test_gql_return_property_access() {
1431 use grafeo_common::types::Value;
1432
1433 let db = GrafeoDB::new_in_memory();
1434 let session = db.session();
1435
1436 session.create_node_with_props(
1438 &["Person"],
1439 [
1440 ("name", Value::String("Alice".into())),
1441 ("age", Value::Int64(30)),
1442 ],
1443 );
1444 session.create_node_with_props(
1445 &["Person"],
1446 [
1447 ("name", Value::String("Bob".into())),
1448 ("age", Value::Int64(25)),
1449 ],
1450 );
1451
1452 let result = session
1454 .execute("MATCH (n:Person) RETURN n.name, n.age")
1455 .unwrap();
1456
1457 assert_eq!(result.row_count(), 2);
1459 assert_eq!(result.column_count(), 2);
1460 assert_eq!(result.columns[0], "n.name");
1461 assert_eq!(result.columns[1], "n.age");
1462
1463 let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
1465 assert!(names.contains(&&Value::String("Alice".into())));
1466 assert!(names.contains(&&Value::String("Bob".into())));
1467 }
1468
1469 #[test]
1470 fn test_gql_return_mixed_expressions() {
1471 use grafeo_common::types::Value;
1472
1473 let db = GrafeoDB::new_in_memory();
1474 let session = db.session();
1475
1476 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1478
1479 let result = session
1481 .execute("MATCH (n:Person) RETURN n, n.name")
1482 .unwrap();
1483
1484 assert_eq!(result.row_count(), 1);
1485 assert_eq!(result.column_count(), 2);
1486 assert_eq!(result.columns[0], "n");
1487 assert_eq!(result.columns[1], "n.name");
1488
1489 assert_eq!(result.rows[0][1], Value::String("Alice".into()));
1491 }
1492 }
1493
1494 #[cfg(feature = "cypher")]
1495 mod cypher_tests {
1496 use super::*;
1497
1498 #[test]
1499 fn test_cypher_query_execution() {
1500 let db = GrafeoDB::new_in_memory();
1501 let session = db.session();
1502
1503 session.create_node(&["Person"]);
1505 session.create_node(&["Person"]);
1506 session.create_node(&["Animal"]);
1507
1508 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1510
1511 assert_eq!(result.row_count(), 2);
1513 assert_eq!(result.column_count(), 1);
1514 assert_eq!(result.columns[0], "n");
1515 }
1516
1517 #[test]
1518 fn test_cypher_empty_result() {
1519 let db = GrafeoDB::new_in_memory();
1520 let session = db.session();
1521
1522 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1524
1525 assert_eq!(result.row_count(), 0);
1526 }
1527
1528 #[test]
1529 fn test_cypher_parse_error() {
1530 let db = GrafeoDB::new_in_memory();
1531 let session = db.session();
1532
1533 let result = session.execute_cypher("MATCH (n RETURN n");
1535
1536 assert!(result.is_err());
1537 }
1538 }
1539
1540 mod direct_lookup_tests {
1543 use super::*;
1544 use grafeo_common::types::Value;
1545
1546 #[test]
1547 fn test_get_node() {
1548 let db = GrafeoDB::new_in_memory();
1549 let session = db.session();
1550
1551 let id = session.create_node(&["Person"]);
1552 let node = session.get_node(id);
1553
1554 assert!(node.is_some());
1555 let node = node.unwrap();
1556 assert_eq!(node.id, id);
1557 }
1558
1559 #[test]
1560 fn test_get_node_not_found() {
1561 use grafeo_common::types::NodeId;
1562
1563 let db = GrafeoDB::new_in_memory();
1564 let session = db.session();
1565
1566 let node = session.get_node(NodeId::new(9999));
1568 assert!(node.is_none());
1569 }
1570
1571 #[test]
1572 fn test_get_node_property() {
1573 let db = GrafeoDB::new_in_memory();
1574 let session = db.session();
1575
1576 let id = session
1577 .create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1578
1579 let name = session.get_node_property(id, "name");
1580 assert_eq!(name, Some(Value::String("Alice".into())));
1581
1582 let missing = session.get_node_property(id, "missing");
1584 assert!(missing.is_none());
1585 }
1586
1587 #[test]
1588 fn test_get_edge() {
1589 let db = GrafeoDB::new_in_memory();
1590 let session = db.session();
1591
1592 let alice = session.create_node(&["Person"]);
1593 let bob = session.create_node(&["Person"]);
1594 let edge_id = session.create_edge(alice, bob, "KNOWS");
1595
1596 let edge = session.get_edge(edge_id);
1597 assert!(edge.is_some());
1598 let edge = edge.unwrap();
1599 assert_eq!(edge.id, edge_id);
1600 assert_eq!(edge.src, alice);
1601 assert_eq!(edge.dst, bob);
1602 }
1603
1604 #[test]
1605 fn test_get_edge_not_found() {
1606 use grafeo_common::types::EdgeId;
1607
1608 let db = GrafeoDB::new_in_memory();
1609 let session = db.session();
1610
1611 let edge = session.get_edge(EdgeId::new(9999));
1612 assert!(edge.is_none());
1613 }
1614
1615 #[test]
1616 fn test_get_neighbors_outgoing() {
1617 let db = GrafeoDB::new_in_memory();
1618 let session = db.session();
1619
1620 let alice = session.create_node(&["Person"]);
1621 let bob = session.create_node(&["Person"]);
1622 let carol = session.create_node(&["Person"]);
1623
1624 session.create_edge(alice, bob, "KNOWS");
1625 session.create_edge(alice, carol, "KNOWS");
1626
1627 let neighbors = session.get_neighbors_outgoing(alice);
1628 assert_eq!(neighbors.len(), 2);
1629
1630 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1631 assert!(neighbor_ids.contains(&bob));
1632 assert!(neighbor_ids.contains(&carol));
1633 }
1634
1635 #[test]
1636 fn test_get_neighbors_incoming() {
1637 let db = GrafeoDB::new_in_memory();
1638 let session = db.session();
1639
1640 let alice = session.create_node(&["Person"]);
1641 let bob = session.create_node(&["Person"]);
1642 let carol = session.create_node(&["Person"]);
1643
1644 session.create_edge(bob, alice, "KNOWS");
1645 session.create_edge(carol, alice, "KNOWS");
1646
1647 let neighbors = session.get_neighbors_incoming(alice);
1648 assert_eq!(neighbors.len(), 2);
1649
1650 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1651 assert!(neighbor_ids.contains(&bob));
1652 assert!(neighbor_ids.contains(&carol));
1653 }
1654
1655 #[test]
1656 fn test_get_neighbors_outgoing_by_type() {
1657 let db = GrafeoDB::new_in_memory();
1658 let session = db.session();
1659
1660 let alice = session.create_node(&["Person"]);
1661 let bob = session.create_node(&["Person"]);
1662 let company = session.create_node(&["Company"]);
1663
1664 session.create_edge(alice, bob, "KNOWS");
1665 session.create_edge(alice, company, "WORKS_AT");
1666
1667 let knows_neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1668 assert_eq!(knows_neighbors.len(), 1);
1669 assert_eq!(knows_neighbors[0].0, bob);
1670
1671 let works_neighbors = session.get_neighbors_outgoing_by_type(alice, "WORKS_AT");
1672 assert_eq!(works_neighbors.len(), 1);
1673 assert_eq!(works_neighbors[0].0, company);
1674
1675 let no_neighbors = session.get_neighbors_outgoing_by_type(alice, "LIKES");
1677 assert!(no_neighbors.is_empty());
1678 }
1679
1680 #[test]
1681 fn test_node_exists() {
1682 use grafeo_common::types::NodeId;
1683
1684 let db = GrafeoDB::new_in_memory();
1685 let session = db.session();
1686
1687 let id = session.create_node(&["Person"]);
1688
1689 assert!(session.node_exists(id));
1690 assert!(!session.node_exists(NodeId::new(9999)));
1691 }
1692
1693 #[test]
1694 fn test_edge_exists() {
1695 use grafeo_common::types::EdgeId;
1696
1697 let db = GrafeoDB::new_in_memory();
1698 let session = db.session();
1699
1700 let alice = session.create_node(&["Person"]);
1701 let bob = session.create_node(&["Person"]);
1702 let edge_id = session.create_edge(alice, bob, "KNOWS");
1703
1704 assert!(session.edge_exists(edge_id));
1705 assert!(!session.edge_exists(EdgeId::new(9999)));
1706 }
1707
1708 #[test]
1709 fn test_get_degree() {
1710 let db = GrafeoDB::new_in_memory();
1711 let session = db.session();
1712
1713 let alice = session.create_node(&["Person"]);
1714 let bob = session.create_node(&["Person"]);
1715 let carol = session.create_node(&["Person"]);
1716
1717 session.create_edge(alice, bob, "KNOWS");
1719 session.create_edge(alice, carol, "KNOWS");
1720 session.create_edge(bob, alice, "KNOWS");
1722
1723 let (out_degree, in_degree) = session.get_degree(alice);
1724 assert_eq!(out_degree, 2);
1725 assert_eq!(in_degree, 1);
1726
1727 let lonely = session.create_node(&["Person"]);
1729 let (out, in_deg) = session.get_degree(lonely);
1730 assert_eq!(out, 0);
1731 assert_eq!(in_deg, 0);
1732 }
1733
1734 #[test]
1735 fn test_get_nodes_batch() {
1736 let db = GrafeoDB::new_in_memory();
1737 let session = db.session();
1738
1739 let alice = session.create_node(&["Person"]);
1740 let bob = session.create_node(&["Person"]);
1741 let carol = session.create_node(&["Person"]);
1742
1743 let nodes = session.get_nodes_batch(&[alice, bob, carol]);
1744 assert_eq!(nodes.len(), 3);
1745 assert!(nodes[0].is_some());
1746 assert!(nodes[1].is_some());
1747 assert!(nodes[2].is_some());
1748
1749 use grafeo_common::types::NodeId;
1751 let nodes_with_missing = session.get_nodes_batch(&[alice, NodeId::new(9999), carol]);
1752 assert_eq!(nodes_with_missing.len(), 3);
1753 assert!(nodes_with_missing[0].is_some());
1754 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
1756 }
1757
1758 #[test]
1759 fn test_auto_commit_setting() {
1760 let db = GrafeoDB::new_in_memory();
1761 let mut session = db.session();
1762
1763 assert!(session.auto_commit());
1765
1766 session.set_auto_commit(false);
1767 assert!(!session.auto_commit());
1768
1769 session.set_auto_commit(true);
1770 assert!(session.auto_commit());
1771 }
1772
1773 #[test]
1774 fn test_transaction_double_begin_error() {
1775 let db = GrafeoDB::new_in_memory();
1776 let mut session = db.session();
1777
1778 session.begin_tx().unwrap();
1779 let result = session.begin_tx();
1780
1781 assert!(result.is_err());
1782 session.rollback().unwrap();
1784 }
1785
1786 #[test]
1787 fn test_commit_without_transaction_error() {
1788 let db = GrafeoDB::new_in_memory();
1789 let mut session = db.session();
1790
1791 let result = session.commit();
1792 assert!(result.is_err());
1793 }
1794
1795 #[test]
1796 fn test_rollback_without_transaction_error() {
1797 let db = GrafeoDB::new_in_memory();
1798 let mut session = db.session();
1799
1800 let result = session.rollback();
1801 assert!(result.is_err());
1802 }
1803
1804 #[test]
1805 fn test_create_edge_in_transaction() {
1806 let db = GrafeoDB::new_in_memory();
1807 let mut session = db.session();
1808
1809 let alice = session.create_node(&["Person"]);
1811 let bob = session.create_node(&["Person"]);
1812
1813 session.begin_tx().unwrap();
1815 let edge_id = session.create_edge(alice, bob, "KNOWS");
1816
1817 assert!(session.edge_exists(edge_id));
1819
1820 session.commit().unwrap();
1822
1823 assert!(session.edge_exists(edge_id));
1825 }
1826
1827 #[test]
1828 fn test_neighbors_empty_node() {
1829 let db = GrafeoDB::new_in_memory();
1830 let session = db.session();
1831
1832 let lonely = session.create_node(&["Person"]);
1833
1834 assert!(session.get_neighbors_outgoing(lonely).is_empty());
1835 assert!(session.get_neighbors_incoming(lonely).is_empty());
1836 assert!(
1837 session
1838 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
1839 .is_empty()
1840 );
1841 }
1842 }
1843}