1use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
15#[cfg(feature = "rdf")]
16use grafeo_core::graph::rdf::RdfStore;
17
18use crate::config::{AdaptiveConfig, GraphModel};
19use crate::database::QueryResult;
20use crate::query::cache::QueryCache;
21use crate::transaction::TransactionManager;
22
23pub struct Session {
29 store: Arc<LpgStore>,
31 #[cfg(feature = "rdf")]
33 #[allow(dead_code)]
34 rdf_store: Arc<RdfStore>,
35 tx_manager: Arc<TransactionManager>,
37 query_cache: Arc<QueryCache>,
39 current_tx: Option<TxId>,
41 auto_commit: bool,
43 #[allow(dead_code)]
45 adaptive_config: AdaptiveConfig,
46 factorized_execution: bool,
48 graph_model: GraphModel,
50 query_timeout: Option<Duration>,
52 commit_counter: Arc<AtomicUsize>,
54 gc_interval: usize,
56 #[cfg(feature = "cdc")]
58 cdc_log: Arc<crate::cdc::CdcLog>,
59}
60
61impl Session {
62 #[allow(dead_code, clippy::too_many_arguments)]
64 pub(crate) fn with_adaptive(
65 store: Arc<LpgStore>,
66 tx_manager: Arc<TransactionManager>,
67 query_cache: Arc<QueryCache>,
68 adaptive_config: AdaptiveConfig,
69 factorized_execution: bool,
70 graph_model: GraphModel,
71 query_timeout: Option<Duration>,
72 commit_counter: Arc<AtomicUsize>,
73 gc_interval: usize,
74 ) -> Self {
75 Self {
76 store,
77 #[cfg(feature = "rdf")]
78 rdf_store: Arc::new(RdfStore::new()),
79 tx_manager,
80 query_cache,
81 current_tx: None,
82 auto_commit: true,
83 adaptive_config,
84 factorized_execution,
85 graph_model,
86 query_timeout,
87 commit_counter,
88 gc_interval,
89 #[cfg(feature = "cdc")]
90 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
91 }
92 }
93
/// Replaces this session's change-data-capture log with `cdc_log`.
#[cfg(feature = "cdc")]
pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
    // The previously held handle is dropped by the assignment.
    self.cdc_log = cdc_log;
}
99
/// Builds a session that uses the supplied `rdf_store` instead of creating
/// its own, with no transaction open and auto-commit switched on.
///
/// With the `cdc` feature a new `CdcLog` is created for the session (it can
/// be swapped afterwards through `set_cdc_log`).
#[cfg(feature = "rdf")]
#[allow(clippy::too_many_arguments)]
pub(crate) fn with_rdf_store_and_adaptive(
    store: Arc<LpgStore>,
    rdf_store: Arc<RdfStore>,
    tx_manager: Arc<TransactionManager>,
    query_cache: Arc<QueryCache>,
    adaptive_config: AdaptiveConfig,
    factorized_execution: bool,
    graph_model: GraphModel,
    query_timeout: Option<Duration>,
    commit_counter: Arc<AtomicUsize>,
    gc_interval: usize,
) -> Self {
    #[cfg(feature = "cdc")]
    let cdc_log = Arc::new(crate::cdc::CdcLog::new());

    Self {
        store,
        rdf_store,
        tx_manager,
        query_cache,
        current_tx: None,
        auto_commit: true,
        adaptive_config,
        factorized_execution,
        graph_model,
        query_timeout,
        commit_counter,
        gc_interval,
        #[cfg(feature = "cdc")]
        cdc_log,
    }
}
132
133 #[must_use]
135 pub fn graph_model(&self) -> GraphModel {
136 self.graph_model
137 }
138
139 fn require_lpg(&self, language: &str) -> Result<()> {
141 if self.graph_model == GraphModel::Rdf {
142 return Err(grafeo_common::utils::error::Error::Internal(format!(
143 "This is an RDF database. {language} queries require an LPG database."
144 )));
145 }
146 Ok(())
147 }
148
/// Executes a GQL query and returns its result.
///
/// The optimized plan is taken from the query cache when present; otherwise
/// the query is translated, bound, optimized, and the optimized plan is
/// stored in the cache. The plan is then physically planned against the
/// current transaction context and run under the session's query deadline.
/// The result is annotated with the elapsed time in milliseconds and with
/// the number of rows it contains.
///
/// # Errors
///
/// Fails when the database uses the RDF graph model, or when translation,
/// binding, optimization, planning, or execution reports an error.
#[cfg(feature = "gql")]
pub fn execute(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, Planner, binder::Binder, cache::CacheKey, gql_translator,
        optimizer::Optimizer, processor::QueryLanguage,
    };

    self.require_lpg("GQL")?;

    let started = Instant::now();

    let key = CacheKey::new(query, QueryLanguage::Gql);
    let cached = self.query_cache.get_optimized(&key);
    let optimized = match cached {
        Some(hit) => hit,
        None => {
            let logical = gql_translator::translate(query)?;

            // Binding is run for its validation; the context itself is unused.
            let mut binder = Binder::new();
            let _bound = binder.bind(&logical)?;

            let fresh = Optimizer::from_store(&self.store).optimize(logical)?;
            self.query_cache.put_optimized(key, fresh.clone());
            fresh
        }
    };

    let (viewing_epoch, tx_id) = self.get_transaction_context();

    let planner = Planner::with_context(
        Arc::clone(&self.store),
        Arc::clone(&self.tx_manager),
        tx_id,
        viewing_epoch,
    )
    .with_factorized_execution(self.factorized_execution);
    let mut physical = planner.plan(&optimized)?;

    let deadline = self.query_deadline();
    let executor = Executor::with_columns(physical.columns.clone()).with_deadline(deadline);
    let mut result = executor.execute(physical.operator.as_mut())?;

    // `rows_scanned` is filled with the number of rows in the result.
    result.execution_time_ms = Some(started.elapsed().as_secs_f64() * 1000.0);
    result.rows_scanned = Some(result.rows.len() as u64);

    Ok(result)
}
240
/// Executes a GQL query with the given named parameters.
///
/// The query is handed to a `QueryProcessor` built over the session's store
/// and transaction manager; when a transaction is open, the processor is
/// given that transaction's context first.
///
/// # Errors
///
/// Fails when the database uses the RDF graph model, or when the processor
/// reports an error.
#[cfg(feature = "gql")]
pub fn execute_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    self.require_lpg("GQL")?;

    let (viewing_epoch, tx_id) = self.get_transaction_context();

    let base =
        QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));

    // Only an open transaction contributes a transaction context.
    let processor = match tx_id {
        Some(active) => base.with_tx_context(viewing_epoch, active),
        None => base,
    };

    processor.process(query, QueryLanguage::Gql, Some(&params))
}
272
273 #[cfg(not(any(feature = "gql", feature = "cypher")))]
279 pub fn execute_with_params(
280 &self,
281 _query: &str,
282 _params: std::collections::HashMap<String, Value>,
283 ) -> Result<QueryResult> {
284 Err(grafeo_common::utils::error::Error::Internal(
285 "No query language enabled".to_string(),
286 ))
287 }
288
289 #[cfg(not(any(feature = "gql", feature = "cypher")))]
295 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
296 Err(grafeo_common::utils::error::Error::Internal(
297 "No query language enabled".to_string(),
298 ))
299 }
300
/// Executes a Cypher query and returns its result.
///
/// The optimized plan is taken from the query cache when present; otherwise
/// the query is translated, bound, optimized, and the optimized plan is
/// stored in the cache. The plan is then physically planned against the
/// current transaction context and run under the session's query deadline.
///
/// # Errors
///
/// Propagates any error from translation, binding, optimization, planning,
/// or execution.
#[cfg(feature = "cypher")]
pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, Planner, binder::Binder, cache::CacheKey, cypher_translator,
        optimizer::Optimizer, processor::QueryLanguage,
    };

    let key = CacheKey::new(query, QueryLanguage::Cypher);
    let cached = self.query_cache.get_optimized(&key);
    let optimized = match cached {
        Some(hit) => hit,
        None => {
            let logical = cypher_translator::translate(query)?;

            // Binding is run for its validation; the context itself is unused.
            let mut binder = Binder::new();
            let _bound = binder.bind(&logical)?;

            let fresh = Optimizer::from_store(&self.store).optimize(logical)?;
            self.query_cache.put_optimized(key, fresh.clone());
            fresh
        }
    };

    let (viewing_epoch, tx_id) = self.get_transaction_context();

    let planner = Planner::with_context(
        Arc::clone(&self.store),
        Arc::clone(&self.tx_manager),
        tx_id,
        viewing_epoch,
    )
    .with_factorized_execution(self.factorized_execution);
    let mut physical = planner.plan(&optimized)?;

    let deadline = self.query_deadline();
    let executor = Executor::with_columns(physical.columns.clone()).with_deadline(deadline);
    executor.execute(physical.operator.as_mut())
}
355
/// Executes a Gremlin query and returns its result.
///
/// The query is translated, bound, and optimized on every call (this path
/// does not consult the query cache), then physically planned against the
/// current transaction context and run under the session's query deadline.
///
/// # Errors
///
/// Propagates any error from translation, binding, optimization, planning,
/// or execution.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, Planner, binder::Binder, gremlin_translator, optimizer::Optimizer,
    };

    let logical = gremlin_translator::translate(query)?;

    // Binding is run for its validation; the context itself is unused.
    let mut binder = Binder::new();
    let _bound = binder.bind(&logical)?;

    let optimized = Optimizer::from_store(&self.store).optimize(logical)?;

    let (viewing_epoch, tx_id) = self.get_transaction_context();

    let planner = Planner::with_context(
        Arc::clone(&self.store),
        Arc::clone(&self.tx_manager),
        tx_id,
        viewing_epoch,
    )
    .with_factorized_execution(self.factorized_execution);
    let mut physical = planner.plan(&optimized)?;

    let deadline = self.query_deadline();
    let executor = Executor::with_columns(physical.columns.clone()).with_deadline(deadline);
    executor.execute(physical.operator.as_mut())
}
414
/// Executes a Gremlin query with the given named parameters.
///
/// The query is handed to a `QueryProcessor` built over the session's store
/// and transaction manager; when a transaction is open, the processor is
/// given that transaction's context first.
///
/// # Errors
///
/// Propagates any error reported by the processor.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    let (viewing_epoch, tx_id) = self.get_transaction_context();

    let base =
        QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));

    // Only an open transaction contributes a transaction context.
    let processor = match tx_id {
        Some(active) => base.with_tx_context(viewing_epoch, active),
        None => base,
    };

    processor.process(query, QueryLanguage::Gremlin, Some(&params))
}
444
/// Executes a GraphQL query and returns its result.
///
/// The query is translated, bound, and optimized on every call (this path
/// does not consult the query cache), then physically planned against the
/// current transaction context and run under the session's query deadline.
///
/// # Errors
///
/// Propagates any error from translation, binding, optimization, planning,
/// or execution.
#[cfg(feature = "graphql")]
pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, Planner, binder::Binder, graphql_translator, optimizer::Optimizer,
    };

    let logical = graphql_translator::translate(query)?;

    // Binding is run for its validation; the context itself is unused.
    let mut binder = Binder::new();
    let _bound = binder.bind(&logical)?;

    let optimized = Optimizer::from_store(&self.store).optimize(logical)?;

    let (viewing_epoch, tx_id) = self.get_transaction_context();

    let planner = Planner::with_context(
        Arc::clone(&self.store),
        Arc::clone(&self.tx_manager),
        tx_id,
        viewing_epoch,
    )
    .with_factorized_execution(self.factorized_execution);
    let mut physical = planner.plan(&optimized)?;

    let deadline = self.query_deadline();
    let executor = Executor::with_columns(physical.columns.clone()).with_deadline(deadline);
    executor.execute(physical.operator.as_mut())
}
503
/// Executes a GraphQL query with the given named parameters.
///
/// The query is handed to a `QueryProcessor` built over the session's store
/// and transaction manager; when a transaction is open, the processor is
/// given that transaction's context first.
///
/// # Errors
///
/// Propagates any error reported by the processor.
#[cfg(feature = "graphql")]
pub fn execute_graphql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    let (viewing_epoch, tx_id) = self.get_transaction_context();

    let base =
        QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));

    // Only an open transaction contributes a transaction context.
    let processor = match tx_id {
        Some(active) => base.with_tx_context(viewing_epoch, active),
        None => base,
    };

    processor.process(query, QueryLanguage::GraphQL, Some(&params))
}
533
/// Executes a SQL/PGQ query and returns its result.
///
/// The query is always translated first. A `CREATE PROPERTY GRAPH` statement
/// is answered immediately with a single-row status result and is neither
/// cached nor executed. For any other statement the optimized plan is taken
/// from the query cache when present, or produced by binding and optimizing
/// the translated plan and then cached; it is physically planned against the
/// current transaction context and run under the session's query deadline.
///
/// # Errors
///
/// Propagates any error from translation, binding, optimization, planning,
/// or execution.
#[cfg(feature = "sql-pgq")]
pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, Planner, binder::Binder, cache::CacheKey, optimizer::Optimizer,
        plan::LogicalOperator, processor::QueryLanguage, sql_pgq_translator,
    };

    let logical = sql_pgq_translator::translate(query)?;

    // DDL short-circuit: report what was declared instead of running a plan.
    if let LogicalOperator::CreatePropertyGraph(cpg) = &logical.root {
        let status = format!(
            "Property graph '{}' created ({} node tables, {} edge tables)",
            cpg.name,
            cpg.node_tables.len(),
            cpg.edge_tables.len()
        );
        return Ok(QueryResult {
            columns: vec!["status".into()],
            column_types: vec![grafeo_common::types::LogicalType::String],
            rows: vec![vec![Value::from(status)]],
            execution_time_ms: None,
            rows_scanned: None,
        });
    }

    let key = CacheKey::new(query, QueryLanguage::SqlPgq);
    let cached = self.query_cache.get_optimized(&key);
    let optimized = match cached {
        Some(hit) => hit,
        None => {
            // Binding is run for its validation; the context itself is unused.
            let mut binder = Binder::new();
            let _bound = binder.bind(&logical)?;

            let fresh = Optimizer::from_store(&self.store).optimize(logical)?;
            self.query_cache.put_optimized(key, fresh.clone());
            fresh
        }
    };

    let (viewing_epoch, tx_id) = self.get_transaction_context();

    let planner = Planner::with_context(
        Arc::clone(&self.store),
        Arc::clone(&self.tx_manager),
        tx_id,
        viewing_epoch,
    )
    .with_factorized_execution(self.factorized_execution);
    let mut physical = planner.plan(&optimized)?;

    let deadline = self.query_deadline();
    let executor = Executor::with_columns(physical.columns.clone()).with_deadline(deadline);
    executor.execute(physical.operator.as_mut())
}
623
/// Executes a SQL/PGQ query with the given named parameters.
///
/// The query is handed to a `QueryProcessor` built over the session's store
/// and transaction manager; when a transaction is open, the processor is
/// given that transaction's context first.
///
/// # Errors
///
/// Propagates any error reported by the processor.
#[cfg(feature = "sql-pgq")]
pub fn execute_sql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    let (viewing_epoch, tx_id) = self.get_transaction_context();

    let base =
        QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));

    // Only an open transaction contributes a transaction context.
    let processor = match tx_id {
        Some(active) => base.with_tx_context(viewing_epoch, active),
        None => base,
    };

    processor.process(query, QueryLanguage::SqlPgq, Some(&params))
}
653
/// Executes a SPARQL query against the session's RDF store.
///
/// The query is translated and optimized on every call, planned by an
/// `RdfPlanner` that is given the session's current transaction id (if any),
/// and run under the session's query deadline.
///
/// # Errors
///
/// Propagates any error from translation, optimization, planning, or
/// execution.
#[cfg(all(feature = "sparql", feature = "rdf"))]
pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
    };

    let logical = sparql_translator::translate(query)?;

    // The optimizer is still constructed from the LPG store on this path.
    let optimized = Optimizer::from_store(&self.store).optimize(logical)?;

    let rdf_store = Arc::clone(&self.rdf_store);
    let planner = RdfPlanner::new(rdf_store).with_tx_id(self.current_tx);
    let mut physical = planner.plan(&optimized)?;

    let deadline = self.query_deadline();
    let executor = Executor::with_columns(physical.columns.clone()).with_deadline(deadline);
    executor.execute(physical.operator.as_mut())
}
681
/// Executes a SPARQL query; the supplied parameters are currently ignored.
///
/// This simply forwards `query` to `execute_sparql`, so no parameter
/// substitution takes place.
///
/// # Errors
///
/// Propagates any error from `execute_sparql`.
#[cfg(all(feature = "sparql", feature = "rdf"))]
pub fn execute_sparql_with_params(
    &self,
    query: &str,
    _params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    let outcome = self.execute_sparql(query);
    outcome
}
697
698 pub fn begin_tx(&mut self) -> Result<()> {
721 if self.current_tx.is_some() {
722 return Err(grafeo_common::utils::error::Error::Transaction(
723 grafeo_common::utils::error::TransactionError::InvalidState(
724 "Transaction already active".to_string(),
725 ),
726 ));
727 }
728
729 let tx_id = self.tx_manager.begin();
730 self.current_tx = Some(tx_id);
731 Ok(())
732 }
733
734 pub fn begin_tx_with_isolation(
742 &mut self,
743 isolation_level: crate::transaction::IsolationLevel,
744 ) -> Result<()> {
745 if self.current_tx.is_some() {
746 return Err(grafeo_common::utils::error::Error::Transaction(
747 grafeo_common::utils::error::TransactionError::InvalidState(
748 "Transaction already active".to_string(),
749 ),
750 ));
751 }
752
753 let tx_id = self.tx_manager.begin_with_isolation(isolation_level);
754 self.current_tx = Some(tx_id);
755 Ok(())
756 }
757
758 pub fn commit(&mut self) -> Result<()> {
766 let tx_id = self.current_tx.take().ok_or_else(|| {
767 grafeo_common::utils::error::Error::Transaction(
768 grafeo_common::utils::error::TransactionError::InvalidState(
769 "No active transaction".to_string(),
770 ),
771 )
772 })?;
773
774 #[cfg(feature = "rdf")]
776 self.rdf_store.commit_tx(tx_id);
777
778 self.tx_manager.commit(tx_id)?;
779
780 self.store.sync_epoch(self.tx_manager.current_epoch());
784
785 if self.gc_interval > 0 {
787 let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
788 if count.is_multiple_of(self.gc_interval) {
789 let min_epoch = self.tx_manager.min_active_epoch();
790 self.store.gc_versions(min_epoch);
791 self.tx_manager.gc();
792 }
793 }
794
795 Ok(())
796 }
797
798 pub fn rollback(&mut self) -> Result<()> {
822 let tx_id = self.current_tx.take().ok_or_else(|| {
823 grafeo_common::utils::error::Error::Transaction(
824 grafeo_common::utils::error::TransactionError::InvalidState(
825 "No active transaction".to_string(),
826 ),
827 )
828 })?;
829
830 self.store.discard_uncommitted_versions(tx_id);
832
833 #[cfg(feature = "rdf")]
835 self.rdf_store.rollback_tx(tx_id);
836
837 self.tx_manager.abort(tx_id)
839 }
840
841 #[must_use]
843 pub fn in_transaction(&self) -> bool {
844 self.current_tx.is_some()
845 }
846
847 pub fn set_auto_commit(&mut self, auto_commit: bool) {
849 self.auto_commit = auto_commit;
850 }
851
852 #[must_use]
854 pub fn auto_commit(&self) -> bool {
855 self.auto_commit
856 }
857
858 #[must_use]
860 fn query_deadline(&self) -> Option<Instant> {
861 self.query_timeout.map(|d| Instant::now() + d)
862 }
863
864 #[must_use]
870 fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
871 if let Some(tx_id) = self.current_tx {
872 let epoch = self
874 .tx_manager
875 .start_epoch(tx_id)
876 .unwrap_or_else(|| self.tx_manager.current_epoch());
877 (epoch, Some(tx_id))
878 } else {
879 (self.tx_manager.current_epoch(), None)
881 }
882 }
883
884 pub fn create_node(&self, labels: &[&str]) -> NodeId {
889 let (epoch, tx_id) = self.get_transaction_context();
890 self.store
891 .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
892 }
893
894 pub fn create_node_with_props<'a>(
898 &self,
899 labels: &[&str],
900 properties: impl IntoIterator<Item = (&'a str, Value)>,
901 ) -> NodeId {
902 let (epoch, tx_id) = self.get_transaction_context();
903 self.store.create_node_with_props_versioned(
904 labels,
905 properties.into_iter().map(|(k, v)| (k, v)),
906 epoch,
907 tx_id.unwrap_or(TxId::SYSTEM),
908 )
909 }
910
911 pub fn create_edge(
916 &self,
917 src: NodeId,
918 dst: NodeId,
919 edge_type: &str,
920 ) -> grafeo_common::types::EdgeId {
921 let (epoch, tx_id) = self.get_transaction_context();
922 self.store
923 .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
924 }
925
926 #[must_use]
954 pub fn get_node(&self, id: NodeId) -> Option<Node> {
955 let (epoch, tx_id) = self.get_transaction_context();
956 self.store
957 .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
958 }
959
960 #[must_use]
984 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
985 self.get_node(id)
986 .and_then(|node| node.get_property(key).cloned())
987 }
988
989 #[must_use]
996 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
997 let (epoch, tx_id) = self.get_transaction_context();
998 self.store
999 .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1000 }
1001
1002 #[must_use]
1028 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1029 self.store.edges_from(node, Direction::Outgoing).collect()
1030 }
1031
1032 #[must_use]
1041 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1042 self.store.edges_from(node, Direction::Incoming).collect()
1043 }
1044
1045 #[must_use]
1057 pub fn get_neighbors_outgoing_by_type(
1058 &self,
1059 node: NodeId,
1060 edge_type: &str,
1061 ) -> Vec<(NodeId, EdgeId)> {
1062 self.store
1063 .edges_from(node, Direction::Outgoing)
1064 .filter(|(_, edge_id)| {
1065 self.get_edge(*edge_id)
1066 .is_some_and(|e| e.edge_type.as_str() == edge_type)
1067 })
1068 .collect()
1069 }
1070
1071 #[must_use]
1078 pub fn node_exists(&self, id: NodeId) -> bool {
1079 self.get_node(id).is_some()
1080 }
1081
1082 #[must_use]
1084 pub fn edge_exists(&self, id: EdgeId) -> bool {
1085 self.get_edge(id).is_some()
1086 }
1087
1088 #[must_use]
1092 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
1093 let out = self.store.out_degree(node);
1094 let in_degree = self.store.in_degree(node);
1095 (out, in_degree)
1096 }
1097
1098 #[must_use]
1108 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
1109 let (epoch, tx_id) = self.get_transaction_context();
1110 let tx = tx_id.unwrap_or(TxId::SYSTEM);
1111 ids.iter()
1112 .map(|&id| self.store.get_node_versioned(id, epoch, tx))
1113 .collect()
1114 }
1115
/// Returns the change events recorded in the CDC log for an entity.
///
/// # Errors
///
/// The visible implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn history(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let entity = entity_id.into();
    let events = self.cdc_log.history(entity);
    Ok(events)
}
1126
/// Returns the change events the CDC log reports for an entity, restricted
/// by `since_epoch`.
///
/// # Errors
///
/// The visible implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn history_since(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
    since_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let entity = entity_id.into();
    let events = self.cdc_log.history_since(entity, since_epoch);
    Ok(events)
}
1136
/// Returns the change events the CDC log reports for the range given by
/// `start_epoch` and `end_epoch`.
///
/// # Errors
///
/// The visible implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn changes_between(
    &self,
    start_epoch: EpochId,
    end_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let events = self.cdc_log.changes_between(start_epoch, end_epoch);
    Ok(events)
}
1146}
1147
1148#[cfg(test)]
1149mod tests {
1150 use crate::database::GrafeoDB;
1151
/// A node created through a session gets a valid id and is counted by the database.
#[test]
fn test_session_create_node() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let created = session.create_node(&["Person"]);

    assert!(created.is_valid());
    assert_eq!(db.node_count(), 1);
}
1161
/// `in_transaction` follows the begin/commit lifecycle.
#[test]
fn test_session_transaction() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Nothing is open on a fresh session.
    assert!(!session.in_transaction());

    session.begin_tx().unwrap();
    assert!(session.in_transaction());

    session.commit().unwrap();
    assert!(!session.in_transaction());
}
1175
/// The transaction context carries a transaction id only while one is open,
/// and the epoch seen after a commit is not older than the one seen inside
/// the transaction.
#[test]
fn test_session_transaction_context() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Before any transaction: no id.
    let (_epoch_before, tx_before) = session.get_transaction_context();
    assert!(tx_before.is_none());

    // Inside a transaction: an id is present.
    session.begin_tx().unwrap();
    let (epoch_during, tx_during) = session.get_transaction_context();
    assert!(tx_during.is_some());
    let _ = epoch_during;

    // After the commit: no id again, and the epoch did not go backwards.
    session.commit().unwrap();
    let (epoch_after, tx_after) = session.get_transaction_context();
    assert!(tx_after.is_none());
    assert!(epoch_after.as_u64() >= epoch_during.as_u64());
}
1199
/// Rolling back closes the open transaction.
#[test]
fn test_session_rollback() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    session.begin_tx().unwrap();
    session.rollback().unwrap();

    assert!(!session.in_transaction());
}
1209
/// A node written directly to the store under the session's transaction id
/// disappears on rollback, while a node created beforehand survives.
#[test]
fn test_session_rollback_discards_versions() {
    use grafeo_common::types::TxId;

    let db = GrafeoDB::new_in_memory();

    // Baseline node, created outside any transaction.
    let node_before = db.store().create_node(&["Person"]);
    assert!(node_before.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_tx().unwrap();
    let tx_id = session.current_tx.unwrap();

    // Write a second node straight into the store under the transaction.
    let write_epoch = db.store().current_epoch();
    let node_in_tx = db
        .store()
        .create_node_versioned(&["Person"], write_epoch, tx_id);
    assert!(node_in_tx.is_valid());

    assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");

    session.rollback().unwrap();
    assert!(!session.in_transaction());

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard uncommitted node, but got {count_after}"
    );

    let current_epoch = db.store().current_epoch();

    let original = db
        .store()
        .get_node_versioned(node_before, current_epoch, TxId::SYSTEM);
    assert!(original.is_some(), "Original node should still exist");

    let discarded = db
        .store()
        .get_node_versioned(node_in_tx, current_epoch, TxId::SYSTEM);
    assert!(discarded.is_none(), "Transaction node should be gone");
}
1263
/// A node created via `Session::create_node` inside a transaction is
/// discarded when that transaction is rolled back.
#[test]
fn test_session_create_node_in_transaction() {
    let db = GrafeoDB::new_in_memory();

    // Baseline node, created outside any transaction.
    let node_before = db.create_node(&["Person"]);
    assert!(node_before.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_tx().unwrap();

    let node_in_tx = session.create_node(&["Person"]);
    assert!(node_in_tx.is_valid());

    assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");

    session.rollback().unwrap();

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node(), but got {count_after}"
    );
}
1295
/// A node created via `Session::create_node_with_props` inside a transaction
/// is discarded when that transaction is rolled back.
#[test]
fn test_session_create_node_with_props_in_transaction() {
    use grafeo_common::types::Value;

    let db = GrafeoDB::new_in_memory();

    // Baseline node, created outside any transaction.
    db.create_node(&["Person"]);
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_tx().unwrap();

    let props = [("name", Value::String("Alice".into()))];
    let node_in_tx = session.create_node_with_props(&["Person"], props);
    assert!(node_in_tx.is_valid());

    assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");

    session.rollback().unwrap();

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node_with_props()"
    );
}
1328
/// End-to-end tests for `Session::execute` (GQL).
#[cfg(feature = "gql")]
mod gql_tests {
    use super::*;
    use grafeo_common::types::Value;

    /// A label match returns one row per matching node, under the variable's name.
    #[test]
    fn test_gql_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Two people and one node with a different label.
        session.create_node(&["Person"]);
        session.create_node(&["Person"]);
        session.create_node(&["Animal"]);

        let result = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.column_count(), 1);
        assert_eq!(result.columns[0], "n");
    }

    /// Matching on an empty database yields no rows.
    #[test]
    fn test_gql_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let result = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(result.row_count(), 0);
    }

    /// A syntactically broken query is reported as an error.
    #[test]
    fn test_gql_parse_error() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // The node pattern is missing its closing parenthesis.
        let result = session.execute("MATCH (n RETURN n");

        assert!(result.is_err());
    }

    /// A one-hop pattern returns one row per matching edge.
    #[test]
    fn test_gql_relationship_traversal() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alice = session.create_node(&["Person"]);
        let bob = session.create_node(&["Person"]);
        let charlie = session.create_node(&["Person"]);

        session.create_edge(alice, bob, "KNOWS");
        session.create_edge(alice, charlie, "KNOWS");

        let result = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.column_count(), 2);
        assert_eq!(result.columns[0], "a");
        assert_eq!(result.columns[1], "b");
    }

    /// The edge type in the pattern filters out edges of other types.
    #[test]
    fn test_gql_relationship_with_type_filter() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alice = session.create_node(&["Person"]);
        let bob = session.create_node(&["Person"]);
        let charlie = session.create_node(&["Person"]);

        session.create_edge(alice, bob, "KNOWS");
        session.create_edge(alice, charlie, "WORKS_WITH");

        let result = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    /// Returning a variable that was never bound is a semantic error.
    #[test]
    fn test_gql_semantic_error_undefined_variable() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // `x` is not introduced by the MATCH clause.
        let result = session.execute("MATCH (n:Person) RETURN x");

        assert!(result.is_err());
        let err = match result {
            Err(err) => err,
            Ok(_) => panic!("Expected error"),
        };
        assert!(
            err.to_string().contains("Undefined variable"),
            "Expected undefined variable error, got: {}",
            err
        );
    }

    /// A WHERE comparison on a property keeps only the matching nodes.
    #[test]
    fn test_gql_where_clause_property_filter() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for age in [25, 35, 45] {
            session.create_node_with_props(&["Person"], [("age", Value::Int64(age))]);
        }

        let result = session
            .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
            .unwrap();

        assert_eq!(result.row_count(), 2);
    }

    /// A WHERE equality on a string property keeps only the matching nodes.
    #[test]
    fn test_gql_where_clause_equality() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for name in ["Alice", "Bob", "Alice"] {
            session.create_node_with_props(&["Person"], [("name", Value::String(name.into()))]);
        }

        let result = session
            .execute("MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n")
            .unwrap();

        assert_eq!(result.row_count(), 2);
    }

    /// Property accesses in RETURN become columns named after the expression.
    #[test]
    fn test_gql_return_property_access() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.create_node_with_props(
            &["Person"],
            [
                ("name", Value::String("Alice".into())),
                ("age", Value::Int64(30)),
            ],
        );
        session.create_node_with_props(
            &["Person"],
            [
                ("name", Value::String("Bob".into())),
                ("age", Value::Int64(25)),
            ],
        );

        let result = session
            .execute("MATCH (n:Person) RETURN n.name, n.age")
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.column_count(), 2);
        assert_eq!(result.columns[0], "n.name");
        assert_eq!(result.columns[1], "n.age");

        // Row order is not asserted; only that both names are present.
        let names: Vec<&Value> = result.rows.iter().map(|row| &row[0]).collect();
        assert!(names.contains(&&Value::String("Alice".into())));
        assert!(names.contains(&&Value::String("Bob".into())));
    }

    /// A variable and a property access can be returned side by side.
    #[test]
    fn test_gql_return_mixed_expressions() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);

        let result = session
            .execute("MATCH (n:Person) RETURN n, n.name")
            .unwrap();

        assert_eq!(result.row_count(), 1);
        assert_eq!(result.column_count(), 2);
        assert_eq!(result.columns[0], "n");
        assert_eq!(result.columns[1], "n.name");

        // The second column holds the property value itself.
        assert_eq!(result.rows[0][1], Value::String("Alice".into()));
    }
}
1547
/// End-to-end tests for `Session::execute_cypher`.
#[cfg(feature = "cypher")]
mod cypher_tests {
    use super::*;

    /// A label match returns one row per matching node, under the variable's name.
    #[test]
    fn test_cypher_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Two people and one node with a different label.
        session.create_node(&["Person"]);
        session.create_node(&["Person"]);
        session.create_node(&["Animal"]);

        let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.column_count(), 1);
        assert_eq!(result.columns[0], "n");
    }

    /// Matching on an empty database yields no rows.
    #[test]
    fn test_cypher_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(result.row_count(), 0);
    }

    /// A syntactically broken query is reported as an error.
    #[test]
    fn test_cypher_parse_error() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // The node pattern is missing its closing parenthesis.
        let result = session.execute_cypher("MATCH (n RETURN n");

        assert!(result.is_err());
    }
}
1593
1594 mod direct_lookup_tests {
1597 use super::*;
1598 use grafeo_common::types::Value;
1599
/// `get_node` returns the node that was just created, carrying the same id.
#[test]
fn test_get_node() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let id = session.create_node(&["Person"]);
    let fetched = session.get_node(id);

    assert!(fetched.is_some());
    assert_eq!(fetched.unwrap().id, id);
}
1612
/// `get_node` yields `None` for an id that was never handed out.
#[test]
fn test_get_node_not_found() {
    use grafeo_common::types::NodeId;

    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // No node has been created in this database.
    let absent_id = NodeId::new(9999);
    assert!(sess.get_node(absent_id).is_none());
}
1624
/// `get_node_property` returns the stored value for a key that was set and
/// `None` for a key that was not.
#[test]
fn test_get_node_property() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let alice_props = [("name", Value::String("Alice".into()))];
    let alice = sess.create_node_with_props(&["Person"], alice_props);

    // Key that was set at creation time.
    assert_eq!(
        sess.get_node_property(alice, "name"),
        Some(Value::String("Alice".into()))
    );

    // Key that was never set on this node.
    assert!(sess.get_node_property(alice, "missing").is_none());
}
1640
/// Looking up a freshly created edge returns it with the id and endpoints
/// it was created with.
#[test]
fn test_get_edge() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let from = sess.create_node(&["Person"]);
    let to = sess.create_node(&["Person"]);
    let created = sess.create_edge(from, to, "KNOWS");

    let lookup = sess.get_edge(created);
    assert!(lookup.is_some());

    let found = lookup.unwrap();
    assert_eq!(found.id, created);
    assert_eq!(found.src, from);
    assert_eq!(found.dst, to);
}
1657
/// `get_edge` yields `None` for an id that was never handed out.
#[test]
fn test_get_edge_not_found() {
    use grafeo_common::types::EdgeId;

    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // No edge has been created in this database.
    let absent_id = EdgeId::new(9999);
    assert!(sess.get_edge(absent_id).is_none());
}
1668
/// `get_neighbors_outgoing` lists the targets of every edge leaving a node.
#[test]
fn test_get_neighbors_outgoing() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let alice = sess.create_node(&["Person"]);
    let bob = sess.create_node(&["Person"]);
    let carol = sess.create_node(&["Person"]);

    // alice -> bob, alice -> carol
    sess.create_edge(alice, bob, "KNOWS");
    sess.create_edge(alice, carol, "KNOWS");

    let reached = sess.get_neighbors_outgoing(alice);
    assert_eq!(reached.len(), 2);

    // Order of the returned pairs is not assumed; only membership is checked.
    for expected in [bob, carol] {
        assert!(reached.iter().any(|(node_id, _)| *node_id == expected));
    }
}
1688
/// `get_neighbors_incoming` lists the sources of every edge arriving at a node.
#[test]
fn test_get_neighbors_incoming() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let alice = sess.create_node(&["Person"]);
    let bob = sess.create_node(&["Person"]);
    let carol = sess.create_node(&["Person"]);

    // bob -> alice, carol -> alice
    sess.create_edge(bob, alice, "KNOWS");
    sess.create_edge(carol, alice, "KNOWS");

    let sources = sess.get_neighbors_incoming(alice);
    assert_eq!(sources.len(), 2);

    // Order of the returned pairs is not assumed; only membership is checked.
    for expected in [bob, carol] {
        assert!(sources.iter().any(|(node_id, _)| *node_id == expected));
    }
}
1708
/// `get_neighbors_outgoing_by_type` only follows edges of the requested type
/// and returns nothing for a type the node has no edges of.
#[test]
fn test_get_neighbors_outgoing_by_type() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let alice = sess.create_node(&["Person"]);
    let bob = sess.create_node(&["Person"]);
    let employer = sess.create_node(&["Company"]);

    // One outgoing edge per edge type.
    sess.create_edge(alice, bob, "KNOWS");
    sess.create_edge(alice, employer, "WORKS_AT");

    let via_knows = sess.get_neighbors_outgoing_by_type(alice, "KNOWS");
    assert_eq!(via_knows.len(), 1);
    assert_eq!(via_knows[0].0, bob);

    let via_works_at = sess.get_neighbors_outgoing_by_type(alice, "WORKS_AT");
    assert_eq!(via_works_at.len(), 1);
    assert_eq!(via_works_at[0].0, employer);

    // No `LIKES` edge was ever created.
    assert!(sess.get_neighbors_outgoing_by_type(alice, "LIKES").is_empty());
}
1733
/// `node_exists` is true for a created node and false for an unused id.
#[test]
fn test_node_exists() {
    use grafeo_common::types::NodeId;

    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let present = sess.create_node(&["Person"]);
    let absent = NodeId::new(9999);

    assert!(sess.node_exists(present));
    assert!(!sess.node_exists(absent));
}
1746
/// `edge_exists` is true for a created edge and false for an unused id.
#[test]
fn test_edge_exists() {
    use grafeo_common::types::EdgeId;

    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let from = sess.create_node(&["Person"]);
    let to = sess.create_node(&["Person"]);
    let present = sess.create_edge(from, to, "KNOWS");
    let absent = EdgeId::new(9999);

    assert!(sess.edge_exists(present));
    assert!(!sess.edge_exists(absent));
}
1761
/// `get_degree` reports `(outgoing, incoming)` edge counts for a node, and
/// `(0, 0)` for a node with no edges.
#[test]
fn test_get_degree() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let alice = sess.create_node(&["Person"]);
    let bob = sess.create_node(&["Person"]);
    let carol = sess.create_node(&["Person"]);

    // Two edges leave alice, one arrives.
    sess.create_edge(alice, bob, "KNOWS");
    sess.create_edge(alice, carol, "KNOWS");
    sess.create_edge(bob, alice, "KNOWS");

    assert_eq!(sess.get_degree(alice), (2, 1));

    // A node that never took part in any edge.
    let isolated = sess.create_node(&["Person"]);
    assert_eq!(sess.get_degree(isolated), (0, 0));
}
1787
/// `get_nodes_batch` returns one slot per requested id, in request order,
/// with `None` in the slot of an id that does not exist.
#[test]
fn test_get_nodes_batch() {
    use grafeo_common::types::NodeId;

    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let alice = sess.create_node(&["Person"]);
    let bob = sess.create_node(&["Person"]);
    let carol = sess.create_node(&["Person"]);

    // All ids exist: every slot is populated.
    let all_found = sess.get_nodes_batch(&[alice, bob, carol]);
    assert_eq!(all_found.len(), 3);
    assert!(all_found.iter().all(|slot| slot.is_some()));

    // Middle id does not exist: its slot is empty, the others are unaffected.
    let with_gap = sess.get_nodes_batch(&[alice, NodeId::new(9999), carol]);
    assert_eq!(with_gap.len(), 3);
    let presence: Vec<bool> = with_gap.iter().map(|slot| slot.is_some()).collect();
    assert_eq!(presence, [true, false, true]);
}
1811
/// A new session starts with auto-commit on, and `set_auto_commit` is
/// reflected by `auto_commit` in both directions.
#[test]
fn test_auto_commit_setting() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    // Initial state of a fresh session.
    assert!(sess.auto_commit());

    // Turn it off, then back on, checking the getter after each change.
    for enabled in [false, true] {
        sess.set_auto_commit(enabled);
        assert_eq!(sess.auto_commit(), enabled);
    }
}
1826
/// Calling `begin_tx` while a transaction is already open is an error, and
/// the original transaction can still be rolled back afterwards.
#[test]
fn test_transaction_double_begin_error() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    sess.begin_tx().unwrap();

    // Second begin on the same session must be rejected.
    assert!(sess.begin_tx().is_err());

    // The first transaction is still there to roll back.
    sess.rollback().unwrap();
}
1839
/// `commit` on a session with no open transaction returns an error.
#[test]
fn test_commit_without_transaction_error() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    assert!(sess.commit().is_err());
}
1848
/// `rollback` on a session with no open transaction returns an error.
#[test]
fn test_rollback_without_transaction_error() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    assert!(sess.rollback().is_err());
}
1857
/// An edge created inside an explicit transaction is reported as existing
/// by the same session both before and after the commit.
#[test]
fn test_create_edge_in_transaction() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    // Endpoints are created before the transaction is opened.
    let from = sess.create_node(&["Person"]);
    let to = sess.create_node(&["Person"]);

    sess.begin_tx().unwrap();
    let pending_edge = sess.create_edge(from, to, "KNOWS");

    // Checked while the transaction is still open.
    let seen_before_commit = sess.edge_exists(pending_edge);
    assert!(seen_before_commit);

    sess.commit().unwrap();

    // Checked again once the transaction has been committed.
    let seen_after_commit = sess.edge_exists(pending_edge);
    assert!(seen_after_commit);
}
1880
/// A node with no edges has empty outgoing, incoming and by-type neighbor
/// lists.
#[test]
fn test_neighbors_empty_node() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let isolated = sess.create_node(&["Person"]);

    let outgoing = sess.get_neighbors_outgoing(isolated);
    assert!(outgoing.is_empty());

    let incoming = sess.get_neighbors_incoming(isolated);
    assert!(incoming.is_empty());

    let outgoing_knows = sess.get_neighbors_outgoing_by_type(isolated, "KNOWS");
    assert!(outgoing_knows.is_empty());
}
1896 }
1897}