1use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
15#[cfg(feature = "rdf")]
16use grafeo_core::graph::rdf::RdfStore;
17
18use crate::config::{AdaptiveConfig, GraphModel};
19use crate::database::QueryResult;
20use crate::query::cache::QueryCache;
21use crate::transaction::TransactionManager;
22
23pub struct Session {
29 store: Arc<LpgStore>,
31 #[cfg(feature = "rdf")]
33 rdf_store: Arc<RdfStore>,
34 tx_manager: Arc<TransactionManager>,
36 query_cache: Arc<QueryCache>,
38 current_tx: Option<TxId>,
40 auto_commit: bool,
42 #[allow(dead_code)]
44 adaptive_config: AdaptiveConfig,
45 factorized_execution: bool,
47 graph_model: GraphModel,
49 query_timeout: Option<Duration>,
51 commit_counter: Arc<AtomicUsize>,
53 gc_interval: usize,
55 #[cfg(feature = "cdc")]
57 cdc_log: Arc<crate::cdc::CdcLog>,
58}
59
60impl Session {
61 #[allow(dead_code, clippy::too_many_arguments)]
63 pub(crate) fn with_adaptive(
64 store: Arc<LpgStore>,
65 tx_manager: Arc<TransactionManager>,
66 query_cache: Arc<QueryCache>,
67 adaptive_config: AdaptiveConfig,
68 factorized_execution: bool,
69 graph_model: GraphModel,
70 query_timeout: Option<Duration>,
71 commit_counter: Arc<AtomicUsize>,
72 gc_interval: usize,
73 ) -> Self {
74 Self {
75 store,
76 #[cfg(feature = "rdf")]
77 rdf_store: Arc::new(RdfStore::new()),
78 tx_manager,
79 query_cache,
80 current_tx: None,
81 auto_commit: true,
82 adaptive_config,
83 factorized_execution,
84 graph_model,
85 query_timeout,
86 commit_counter,
87 gc_interval,
88 #[cfg(feature = "cdc")]
89 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
90 }
91 }
92
93 #[cfg(feature = "cdc")]
95 pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
96 self.cdc_log = cdc_log;
97 }
98
99 #[cfg(feature = "rdf")]
101 #[allow(clippy::too_many_arguments)]
102 pub(crate) fn with_rdf_store_and_adaptive(
103 store: Arc<LpgStore>,
104 rdf_store: Arc<RdfStore>,
105 tx_manager: Arc<TransactionManager>,
106 query_cache: Arc<QueryCache>,
107 adaptive_config: AdaptiveConfig,
108 factorized_execution: bool,
109 graph_model: GraphModel,
110 query_timeout: Option<Duration>,
111 commit_counter: Arc<AtomicUsize>,
112 gc_interval: usize,
113 ) -> Self {
114 Self {
115 store,
116 rdf_store,
117 tx_manager,
118 query_cache,
119 current_tx: None,
120 auto_commit: true,
121 adaptive_config,
122 factorized_execution,
123 graph_model,
124 query_timeout,
125 commit_counter,
126 gc_interval,
127 #[cfg(feature = "cdc")]
128 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
129 }
130 }
131
132 #[must_use]
134 pub fn graph_model(&self) -> GraphModel {
135 self.graph_model
136 }
137
138 fn require_lpg(&self, language: &str) -> Result<()> {
140 if self.graph_model == GraphModel::Rdf {
141 return Err(grafeo_common::utils::error::Error::Internal(format!(
142 "This is an RDF database. {language} queries require an LPG database."
143 )));
144 }
145 Ok(())
146 }
147
148 #[cfg(feature = "gql")]
175 pub fn execute(&self, query: &str) -> Result<QueryResult> {
176 self.require_lpg("GQL")?;
177
178 use crate::query::{
179 Executor, Planner, binder::Binder, cache::CacheKey, gql_translator,
180 optimizer::Optimizer, processor::QueryLanguage,
181 };
182
183 let start_time = std::time::Instant::now();
184
185 let cache_key = CacheKey::new(query, QueryLanguage::Gql);
187
188 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
190 cached_plan
192 } else {
193 let logical_plan = gql_translator::translate(query)?;
197
198 let mut binder = Binder::new();
200 let _binding_context = binder.bind(&logical_plan)?;
201
202 let optimizer = Optimizer::from_store(&self.store);
204 let plan = optimizer.optimize(logical_plan)?;
205
206 self.query_cache.put_optimized(cache_key, plan.clone());
208
209 plan
210 };
211
212 let (viewing_epoch, tx_id) = self.get_transaction_context();
214
215 let planner = Planner::with_context(
218 Arc::clone(&self.store),
219 Arc::clone(&self.tx_manager),
220 tx_id,
221 viewing_epoch,
222 )
223 .with_factorized_execution(self.factorized_execution);
224 let mut physical_plan = planner.plan(&optimized_plan)?;
225
226 let executor = Executor::with_columns(physical_plan.columns.clone())
228 .with_deadline(self.query_deadline());
229 let mut result = executor.execute(physical_plan.operator.as_mut())?;
230
231 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
233 let rows_scanned = result.rows.len() as u64;
234 result.execution_time_ms = Some(elapsed_ms);
235 result.rows_scanned = Some(rows_scanned);
236
237 Ok(result)
238 }
239
240 #[cfg(feature = "gql")]
246 pub fn execute_with_params(
247 &self,
248 query: &str,
249 params: std::collections::HashMap<String, Value>,
250 ) -> Result<QueryResult> {
251 self.require_lpg("GQL")?;
252
253 use crate::query::processor::{QueryLanguage, QueryProcessor};
254
255 let (viewing_epoch, tx_id) = self.get_transaction_context();
257
258 let processor =
260 QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
261
262 let processor = if let Some(tx_id) = tx_id {
264 processor.with_tx_context(viewing_epoch, tx_id)
265 } else {
266 processor
267 };
268
269 processor.process(query, QueryLanguage::Gql, Some(¶ms))
270 }
271
272 #[cfg(not(any(feature = "gql", feature = "cypher")))]
278 pub fn execute_with_params(
279 &self,
280 _query: &str,
281 _params: std::collections::HashMap<String, Value>,
282 ) -> Result<QueryResult> {
283 Err(grafeo_common::utils::error::Error::Internal(
284 "No query language enabled".to_string(),
285 ))
286 }
287
288 #[cfg(not(any(feature = "gql", feature = "cypher")))]
294 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
295 Err(grafeo_common::utils::error::Error::Internal(
296 "No query language enabled".to_string(),
297 ))
298 }
299
300 #[cfg(feature = "cypher")]
306 pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
307 use crate::query::{
308 Executor, Planner, binder::Binder, cache::CacheKey, cypher_translator,
309 optimizer::Optimizer, processor::QueryLanguage,
310 };
311
312 let cache_key = CacheKey::new(query, QueryLanguage::Cypher);
314
315 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
317 cached_plan
318 } else {
319 let logical_plan = cypher_translator::translate(query)?;
321
322 let mut binder = Binder::new();
324 let _binding_context = binder.bind(&logical_plan)?;
325
326 let optimizer = Optimizer::from_store(&self.store);
328 let plan = optimizer.optimize(logical_plan)?;
329
330 self.query_cache.put_optimized(cache_key, plan.clone());
332
333 plan
334 };
335
336 let (viewing_epoch, tx_id) = self.get_transaction_context();
338
339 let planner = Planner::with_context(
341 Arc::clone(&self.store),
342 Arc::clone(&self.tx_manager),
343 tx_id,
344 viewing_epoch,
345 )
346 .with_factorized_execution(self.factorized_execution);
347 let mut physical_plan = planner.plan(&optimized_plan)?;
348
349 let executor = Executor::with_columns(physical_plan.columns.clone())
351 .with_deadline(self.query_deadline());
352 let result = executor.execute(physical_plan.operator.as_mut())?;
353 Ok(result)
354 }
355
356 #[cfg(feature = "gremlin")]
380 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
381 use crate::query::{
382 Executor, Planner, binder::Binder, gremlin_translator, optimizer::Optimizer,
383 };
384
385 let logical_plan = gremlin_translator::translate(query)?;
387
388 let mut binder = Binder::new();
390 let _binding_context = binder.bind(&logical_plan)?;
391
392 let optimizer = Optimizer::from_store(&self.store);
394 let optimized_plan = optimizer.optimize(logical_plan)?;
395
396 let (viewing_epoch, tx_id) = self.get_transaction_context();
398
399 let planner = Planner::with_context(
401 Arc::clone(&self.store),
402 Arc::clone(&self.tx_manager),
403 tx_id,
404 viewing_epoch,
405 )
406 .with_factorized_execution(self.factorized_execution);
407 let mut physical_plan = planner.plan(&optimized_plan)?;
408
409 let executor = Executor::with_columns(physical_plan.columns.clone())
411 .with_deadline(self.query_deadline());
412 let result = executor.execute(physical_plan.operator.as_mut())?;
413 Ok(result)
414 }
415
416 #[cfg(feature = "gremlin")]
422 pub fn execute_gremlin_with_params(
423 &self,
424 query: &str,
425 params: std::collections::HashMap<String, Value>,
426 ) -> Result<QueryResult> {
427 use crate::query::processor::{QueryLanguage, QueryProcessor};
428
429 let (viewing_epoch, tx_id) = self.get_transaction_context();
431
432 let processor =
434 QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
435
436 let processor = if let Some(tx_id) = tx_id {
438 processor.with_tx_context(viewing_epoch, tx_id)
439 } else {
440 processor
441 };
442
443 processor.process(query, QueryLanguage::Gremlin, Some(¶ms))
444 }
445
446 #[cfg(feature = "graphql")]
470 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
471 use crate::query::{
472 Executor, Planner, binder::Binder, graphql_translator, optimizer::Optimizer,
473 };
474
475 let logical_plan = graphql_translator::translate(query)?;
477
478 let mut binder = Binder::new();
480 let _binding_context = binder.bind(&logical_plan)?;
481
482 let optimizer = Optimizer::from_store(&self.store);
484 let optimized_plan = optimizer.optimize(logical_plan)?;
485
486 let (viewing_epoch, tx_id) = self.get_transaction_context();
488
489 let planner = Planner::with_context(
491 Arc::clone(&self.store),
492 Arc::clone(&self.tx_manager),
493 tx_id,
494 viewing_epoch,
495 )
496 .with_factorized_execution(self.factorized_execution);
497 let mut physical_plan = planner.plan(&optimized_plan)?;
498
499 let executor = Executor::with_columns(physical_plan.columns.clone())
501 .with_deadline(self.query_deadline());
502 let result = executor.execute(physical_plan.operator.as_mut())?;
503 Ok(result)
504 }
505
506 #[cfg(feature = "graphql")]
512 pub fn execute_graphql_with_params(
513 &self,
514 query: &str,
515 params: std::collections::HashMap<String, Value>,
516 ) -> Result<QueryResult> {
517 use crate::query::processor::{QueryLanguage, QueryProcessor};
518
519 let (viewing_epoch, tx_id) = self.get_transaction_context();
521
522 let processor =
524 QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
525
526 let processor = if let Some(tx_id) = tx_id {
528 processor.with_tx_context(viewing_epoch, tx_id)
529 } else {
530 processor
531 };
532
533 processor.process(query, QueryLanguage::GraphQL, Some(¶ms))
534 }
535
536 #[cfg(feature = "sql-pgq")]
561 pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
562 use crate::query::{
563 Executor, Planner, binder::Binder, cache::CacheKey, optimizer::Optimizer,
564 plan::LogicalOperator, processor::QueryLanguage, sql_pgq_translator,
565 };
566
567 let logical_plan = sql_pgq_translator::translate(query)?;
569
570 if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
572 return Ok(QueryResult {
573 columns: vec!["status".into()],
574 column_types: vec![grafeo_common::types::LogicalType::String],
575 rows: vec![vec![Value::from(format!(
576 "Property graph '{}' created ({} node tables, {} edge tables)",
577 cpg.name,
578 cpg.node_tables.len(),
579 cpg.edge_tables.len()
580 ))]],
581 execution_time_ms: None,
582 rows_scanned: None,
583 });
584 }
585
586 let cache_key = CacheKey::new(query, QueryLanguage::SqlPgq);
588
589 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
591 cached_plan
592 } else {
593 let mut binder = Binder::new();
595 let _binding_context = binder.bind(&logical_plan)?;
596
597 let optimizer = Optimizer::from_store(&self.store);
599 let plan = optimizer.optimize(logical_plan)?;
600
601 self.query_cache.put_optimized(cache_key, plan.clone());
603
604 plan
605 };
606
607 let (viewing_epoch, tx_id) = self.get_transaction_context();
609
610 let planner = Planner::with_context(
612 Arc::clone(&self.store),
613 Arc::clone(&self.tx_manager),
614 tx_id,
615 viewing_epoch,
616 )
617 .with_factorized_execution(self.factorized_execution);
618 let mut physical_plan = planner.plan(&optimized_plan)?;
619
620 let executor = Executor::with_columns(physical_plan.columns.clone())
622 .with_deadline(self.query_deadline());
623 let result = executor.execute(physical_plan.operator.as_mut())?;
624 Ok(result)
625 }
626
627 #[cfg(feature = "sql-pgq")]
633 pub fn execute_sql_with_params(
634 &self,
635 query: &str,
636 params: std::collections::HashMap<String, Value>,
637 ) -> Result<QueryResult> {
638 use crate::query::processor::{QueryLanguage, QueryProcessor};
639
640 let (viewing_epoch, tx_id) = self.get_transaction_context();
642
643 let processor =
645 QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
646
647 let processor = if let Some(tx_id) = tx_id {
649 processor.with_tx_context(viewing_epoch, tx_id)
650 } else {
651 processor
652 };
653
654 processor.process(query, QueryLanguage::SqlPgq, Some(¶ms))
655 }
656
657 #[cfg(all(feature = "sparql", feature = "rdf"))]
663 pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
664 use crate::query::{
665 Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
666 };
667
668 let logical_plan = sparql_translator::translate(query)?;
670
671 let optimizer = Optimizer::from_store(&self.store);
673 let optimized_plan = optimizer.optimize(logical_plan)?;
674
675 let planner = RdfPlanner::new(Arc::clone(&self.rdf_store)).with_tx_id(self.current_tx);
677 let mut physical_plan = planner.plan(&optimized_plan)?;
678
679 let executor = Executor::with_columns(physical_plan.columns.clone())
681 .with_deadline(self.query_deadline());
682 executor.execute(physical_plan.operator.as_mut())
683 }
684
685 #[cfg(all(feature = "sparql", feature = "rdf"))]
691 pub fn execute_sparql_with_params(
692 &self,
693 query: &str,
694 _params: std::collections::HashMap<String, Value>,
695 ) -> Result<QueryResult> {
696 self.execute_sparql(query)
699 }
700
701 pub fn begin_tx(&mut self) -> Result<()> {
724 if self.current_tx.is_some() {
725 return Err(grafeo_common::utils::error::Error::Transaction(
726 grafeo_common::utils::error::TransactionError::InvalidState(
727 "Transaction already active".to_string(),
728 ),
729 ));
730 }
731
732 let tx_id = self.tx_manager.begin();
733 self.current_tx = Some(tx_id);
734 Ok(())
735 }
736
737 pub fn begin_tx_with_isolation(
745 &mut self,
746 isolation_level: crate::transaction::IsolationLevel,
747 ) -> Result<()> {
748 if self.current_tx.is_some() {
749 return Err(grafeo_common::utils::error::Error::Transaction(
750 grafeo_common::utils::error::TransactionError::InvalidState(
751 "Transaction already active".to_string(),
752 ),
753 ));
754 }
755
756 let tx_id = self.tx_manager.begin_with_isolation(isolation_level);
757 self.current_tx = Some(tx_id);
758 Ok(())
759 }
760
761 pub fn commit(&mut self) -> Result<()> {
769 let tx_id = self.current_tx.take().ok_or_else(|| {
770 grafeo_common::utils::error::Error::Transaction(
771 grafeo_common::utils::error::TransactionError::InvalidState(
772 "No active transaction".to_string(),
773 ),
774 )
775 })?;
776
777 #[cfg(feature = "rdf")]
779 self.rdf_store.commit_tx(tx_id);
780
781 self.tx_manager.commit(tx_id)?;
782
783 self.store.sync_epoch(self.tx_manager.current_epoch());
787
788 if self.gc_interval > 0 {
790 let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
791 if count.is_multiple_of(self.gc_interval) {
792 let min_epoch = self.tx_manager.min_active_epoch();
793 self.store.gc_versions(min_epoch);
794 self.tx_manager.gc();
795 }
796 }
797
798 Ok(())
799 }
800
801 pub fn rollback(&mut self) -> Result<()> {
825 let tx_id = self.current_tx.take().ok_or_else(|| {
826 grafeo_common::utils::error::Error::Transaction(
827 grafeo_common::utils::error::TransactionError::InvalidState(
828 "No active transaction".to_string(),
829 ),
830 )
831 })?;
832
833 self.store.discard_uncommitted_versions(tx_id);
835
836 #[cfg(feature = "rdf")]
838 self.rdf_store.rollback_tx(tx_id);
839
840 self.tx_manager.abort(tx_id)
842 }
843
844 #[must_use]
846 pub fn in_transaction(&self) -> bool {
847 self.current_tx.is_some()
848 }
849
850 pub fn set_auto_commit(&mut self, auto_commit: bool) {
852 self.auto_commit = auto_commit;
853 }
854
855 #[must_use]
857 pub fn auto_commit(&self) -> bool {
858 self.auto_commit
859 }
860
861 #[must_use]
863 fn query_deadline(&self) -> Option<Instant> {
864 self.query_timeout.map(|d| Instant::now() + d)
865 }
866
867 #[must_use]
873 fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
874 if let Some(tx_id) = self.current_tx {
875 let epoch = self
877 .tx_manager
878 .start_epoch(tx_id)
879 .unwrap_or_else(|| self.tx_manager.current_epoch());
880 (epoch, Some(tx_id))
881 } else {
882 (self.tx_manager.current_epoch(), None)
884 }
885 }
886
887 pub fn create_node(&self, labels: &[&str]) -> NodeId {
892 let (epoch, tx_id) = self.get_transaction_context();
893 self.store
894 .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
895 }
896
897 pub fn create_node_with_props<'a>(
901 &self,
902 labels: &[&str],
903 properties: impl IntoIterator<Item = (&'a str, Value)>,
904 ) -> NodeId {
905 let (epoch, tx_id) = self.get_transaction_context();
906 self.store.create_node_with_props_versioned(
907 labels,
908 properties.into_iter().map(|(k, v)| (k, v)),
909 epoch,
910 tx_id.unwrap_or(TxId::SYSTEM),
911 )
912 }
913
914 pub fn create_edge(
919 &self,
920 src: NodeId,
921 dst: NodeId,
922 edge_type: &str,
923 ) -> grafeo_common::types::EdgeId {
924 let (epoch, tx_id) = self.get_transaction_context();
925 self.store
926 .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
927 }
928
929 #[must_use]
957 pub fn get_node(&self, id: NodeId) -> Option<Node> {
958 let (epoch, tx_id) = self.get_transaction_context();
959 self.store
960 .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
961 }
962
963 #[must_use]
987 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
988 self.get_node(id)
989 .and_then(|node| node.get_property(key).cloned())
990 }
991
992 #[must_use]
999 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1000 let (epoch, tx_id) = self.get_transaction_context();
1001 self.store
1002 .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1003 }
1004
1005 #[must_use]
1031 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1032 self.store.edges_from(node, Direction::Outgoing).collect()
1033 }
1034
1035 #[must_use]
1044 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1045 self.store.edges_from(node, Direction::Incoming).collect()
1046 }
1047
1048 #[must_use]
1060 pub fn get_neighbors_outgoing_by_type(
1061 &self,
1062 node: NodeId,
1063 edge_type: &str,
1064 ) -> Vec<(NodeId, EdgeId)> {
1065 self.store
1066 .edges_from(node, Direction::Outgoing)
1067 .filter(|(_, edge_id)| {
1068 self.get_edge(*edge_id)
1069 .is_some_and(|e| e.edge_type.as_str() == edge_type)
1070 })
1071 .collect()
1072 }
1073
1074 #[must_use]
1081 pub fn node_exists(&self, id: NodeId) -> bool {
1082 self.get_node(id).is_some()
1083 }
1084
1085 #[must_use]
1087 pub fn edge_exists(&self, id: EdgeId) -> bool {
1088 self.get_edge(id).is_some()
1089 }
1090
1091 #[must_use]
1095 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
1096 let out = self.store.out_degree(node);
1097 let in_degree = self.store.in_degree(node);
1098 (out, in_degree)
1099 }
1100
1101 #[must_use]
1111 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
1112 let (epoch, tx_id) = self.get_transaction_context();
1113 let tx = tx_id.unwrap_or(TxId::SYSTEM);
1114 ids.iter()
1115 .map(|&id| self.store.get_node_versioned(id, epoch, tx))
1116 .collect()
1117 }
1118
1119 #[cfg(feature = "cdc")]
1123 pub fn history(
1124 &self,
1125 entity_id: impl Into<crate::cdc::EntityId>,
1126 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1127 Ok(self.cdc_log.history(entity_id.into()))
1128 }
1129
1130 #[cfg(feature = "cdc")]
1132 pub fn history_since(
1133 &self,
1134 entity_id: impl Into<crate::cdc::EntityId>,
1135 since_epoch: EpochId,
1136 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1137 Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
1138 }
1139
1140 #[cfg(feature = "cdc")]
1142 pub fn changes_between(
1143 &self,
1144 start_epoch: EpochId,
1145 end_epoch: EpochId,
1146 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1147 Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
1148 }
1149}
1150
1151#[cfg(test)]
1152mod tests {
1153 use crate::database::GrafeoDB;
1154
1155 #[test]
1156 fn test_session_create_node() {
1157 let db = GrafeoDB::new_in_memory();
1158 let session = db.session();
1159
1160 let id = session.create_node(&["Person"]);
1161 assert!(id.is_valid());
1162 assert_eq!(db.node_count(), 1);
1163 }
1164
1165 #[test]
1166 fn test_session_transaction() {
1167 let db = GrafeoDB::new_in_memory();
1168 let mut session = db.session();
1169
1170 assert!(!session.in_transaction());
1171
1172 session.begin_tx().unwrap();
1173 assert!(session.in_transaction());
1174
1175 session.commit().unwrap();
1176 assert!(!session.in_transaction());
1177 }
1178
1179 #[test]
1180 fn test_session_transaction_context() {
1181 let db = GrafeoDB::new_in_memory();
1182 let mut session = db.session();
1183
1184 let (_epoch1, tx_id1) = session.get_transaction_context();
1186 assert!(tx_id1.is_none());
1187
1188 session.begin_tx().unwrap();
1190 let (epoch2, tx_id2) = session.get_transaction_context();
1191 assert!(tx_id2.is_some());
1192 let _ = epoch2; session.commit().unwrap();
1197 let (epoch3, tx_id3) = session.get_transaction_context();
1198 assert!(tx_id3.is_none());
1199 assert!(epoch3.as_u64() >= epoch2.as_u64());
1201 }
1202
1203 #[test]
1204 fn test_session_rollback() {
1205 let db = GrafeoDB::new_in_memory();
1206 let mut session = db.session();
1207
1208 session.begin_tx().unwrap();
1209 session.rollback().unwrap();
1210 assert!(!session.in_transaction());
1211 }
1212
1213 #[test]
1214 fn test_session_rollback_discards_versions() {
1215 use grafeo_common::types::TxId;
1216
1217 let db = GrafeoDB::new_in_memory();
1218
1219 let node_before = db.store().create_node(&["Person"]);
1221 assert!(node_before.is_valid());
1222 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1223
1224 let mut session = db.session();
1226 session.begin_tx().unwrap();
1227 let tx_id = session.current_tx.unwrap();
1228
1229 let epoch = db.store().current_epoch();
1231 let node_in_tx = db.store().create_node_versioned(&["Person"], epoch, tx_id);
1232 assert!(node_in_tx.is_valid());
1233
1234 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1236
1237 session.rollback().unwrap();
1239 assert!(!session.in_transaction());
1240
1241 let count_after = db.node_count();
1244 assert_eq!(
1245 count_after, 1,
1246 "Rollback should discard uncommitted node, but got {count_after}"
1247 );
1248
1249 let current_epoch = db.store().current_epoch();
1251 assert!(
1252 db.store()
1253 .get_node_versioned(node_before, current_epoch, TxId::SYSTEM)
1254 .is_some(),
1255 "Original node should still exist"
1256 );
1257
1258 assert!(
1260 db.store()
1261 .get_node_versioned(node_in_tx, current_epoch, TxId::SYSTEM)
1262 .is_none(),
1263 "Transaction node should be gone"
1264 );
1265 }
1266
1267 #[test]
1268 fn test_session_create_node_in_transaction() {
1269 let db = GrafeoDB::new_in_memory();
1271
1272 let node_before = db.create_node(&["Person"]);
1274 assert!(node_before.is_valid());
1275 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1276
1277 let mut session = db.session();
1279 session.begin_tx().unwrap();
1280
1281 let node_in_tx = session.create_node(&["Person"]);
1283 assert!(node_in_tx.is_valid());
1284
1285 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1287
1288 session.rollback().unwrap();
1290
1291 let count_after = db.node_count();
1293 assert_eq!(
1294 count_after, 1,
1295 "Rollback should discard node created via session.create_node(), but got {count_after}"
1296 );
1297 }
1298
1299 #[test]
1300 fn test_session_create_node_with_props_in_transaction() {
1301 use grafeo_common::types::Value;
1302
1303 let db = GrafeoDB::new_in_memory();
1305
1306 db.create_node(&["Person"]);
1308 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1309
1310 let mut session = db.session();
1312 session.begin_tx().unwrap();
1313
1314 let node_in_tx =
1315 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1316 assert!(node_in_tx.is_valid());
1317
1318 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1320
1321 session.rollback().unwrap();
1323
1324 let count_after = db.node_count();
1326 assert_eq!(
1327 count_after, 1,
1328 "Rollback should discard node created via session.create_node_with_props()"
1329 );
1330 }
1331
1332 #[cfg(feature = "gql")]
1333 mod gql_tests {
1334 use super::*;
1335
1336 #[test]
1337 fn test_gql_query_execution() {
1338 let db = GrafeoDB::new_in_memory();
1339 let session = db.session();
1340
1341 session.create_node(&["Person"]);
1343 session.create_node(&["Person"]);
1344 session.create_node(&["Animal"]);
1345
1346 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1348
1349 assert_eq!(result.row_count(), 2);
1351 assert_eq!(result.column_count(), 1);
1352 assert_eq!(result.columns[0], "n");
1353 }
1354
1355 #[test]
1356 fn test_gql_empty_result() {
1357 let db = GrafeoDB::new_in_memory();
1358 let session = db.session();
1359
1360 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1362
1363 assert_eq!(result.row_count(), 0);
1364 }
1365
1366 #[test]
1367 fn test_gql_parse_error() {
1368 let db = GrafeoDB::new_in_memory();
1369 let session = db.session();
1370
1371 let result = session.execute("MATCH (n RETURN n");
1373
1374 assert!(result.is_err());
1375 }
1376
1377 #[test]
1378 fn test_gql_relationship_traversal() {
1379 let db = GrafeoDB::new_in_memory();
1380 let session = db.session();
1381
1382 let alice = session.create_node(&["Person"]);
1384 let bob = session.create_node(&["Person"]);
1385 let charlie = session.create_node(&["Person"]);
1386
1387 session.create_edge(alice, bob, "KNOWS");
1388 session.create_edge(alice, charlie, "KNOWS");
1389
1390 let result = session
1392 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1393 .unwrap();
1394
1395 assert_eq!(result.row_count(), 2);
1397 assert_eq!(result.column_count(), 2);
1398 assert_eq!(result.columns[0], "a");
1399 assert_eq!(result.columns[1], "b");
1400 }
1401
1402 #[test]
1403 fn test_gql_relationship_with_type_filter() {
1404 let db = GrafeoDB::new_in_memory();
1405 let session = db.session();
1406
1407 let alice = session.create_node(&["Person"]);
1409 let bob = session.create_node(&["Person"]);
1410 let charlie = session.create_node(&["Person"]);
1411
1412 session.create_edge(alice, bob, "KNOWS");
1413 session.create_edge(alice, charlie, "WORKS_WITH");
1414
1415 let result = session
1417 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1418 .unwrap();
1419
1420 assert_eq!(result.row_count(), 1);
1422 }
1423
1424 #[test]
1425 fn test_gql_semantic_error_undefined_variable() {
1426 let db = GrafeoDB::new_in_memory();
1427 let session = db.session();
1428
1429 let result = session.execute("MATCH (n:Person) RETURN x");
1431
1432 assert!(result.is_err());
1434 let Err(err) = result else {
1435 panic!("Expected error")
1436 };
1437 assert!(
1438 err.to_string().contains("Undefined variable"),
1439 "Expected undefined variable error, got: {}",
1440 err
1441 );
1442 }
1443
1444 #[test]
1445 fn test_gql_where_clause_property_filter() {
1446 use grafeo_common::types::Value;
1447
1448 let db = GrafeoDB::new_in_memory();
1449 let session = db.session();
1450
1451 session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
1453 session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
1454 session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
1455
1456 let result = session
1458 .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
1459 .unwrap();
1460
1461 assert_eq!(result.row_count(), 2);
1463 }
1464
1465 #[test]
1466 fn test_gql_where_clause_equality() {
1467 use grafeo_common::types::Value;
1468
1469 let db = GrafeoDB::new_in_memory();
1470 let session = db.session();
1471
1472 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1474 session.create_node_with_props(&["Person"], [("name", Value::String("Bob".into()))]);
1475 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1476
1477 let result = session
1479 .execute("MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n")
1480 .unwrap();
1481
1482 assert_eq!(result.row_count(), 2);
1484 }
1485
1486 #[test]
1487 fn test_gql_return_property_access() {
1488 use grafeo_common::types::Value;
1489
1490 let db = GrafeoDB::new_in_memory();
1491 let session = db.session();
1492
1493 session.create_node_with_props(
1495 &["Person"],
1496 [
1497 ("name", Value::String("Alice".into())),
1498 ("age", Value::Int64(30)),
1499 ],
1500 );
1501 session.create_node_with_props(
1502 &["Person"],
1503 [
1504 ("name", Value::String("Bob".into())),
1505 ("age", Value::Int64(25)),
1506 ],
1507 );
1508
1509 let result = session
1511 .execute("MATCH (n:Person) RETURN n.name, n.age")
1512 .unwrap();
1513
1514 assert_eq!(result.row_count(), 2);
1516 assert_eq!(result.column_count(), 2);
1517 assert_eq!(result.columns[0], "n.name");
1518 assert_eq!(result.columns[1], "n.age");
1519
1520 let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
1522 assert!(names.contains(&&Value::String("Alice".into())));
1523 assert!(names.contains(&&Value::String("Bob".into())));
1524 }
1525
1526 #[test]
1527 fn test_gql_return_mixed_expressions() {
1528 use grafeo_common::types::Value;
1529
1530 let db = GrafeoDB::new_in_memory();
1531 let session = db.session();
1532
1533 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1535
1536 let result = session
1538 .execute("MATCH (n:Person) RETURN n, n.name")
1539 .unwrap();
1540
1541 assert_eq!(result.row_count(), 1);
1542 assert_eq!(result.column_count(), 2);
1543 assert_eq!(result.columns[0], "n");
1544 assert_eq!(result.columns[1], "n.name");
1545
1546 assert_eq!(result.rows[0][1], Value::String("Alice".into()));
1548 }
1549 }
1550
1551 #[cfg(feature = "cypher")]
1552 mod cypher_tests {
1553 use super::*;
1554
1555 #[test]
1556 fn test_cypher_query_execution() {
1557 let db = GrafeoDB::new_in_memory();
1558 let session = db.session();
1559
1560 session.create_node(&["Person"]);
1562 session.create_node(&["Person"]);
1563 session.create_node(&["Animal"]);
1564
1565 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1567
1568 assert_eq!(result.row_count(), 2);
1570 assert_eq!(result.column_count(), 1);
1571 assert_eq!(result.columns[0], "n");
1572 }
1573
1574 #[test]
1575 fn test_cypher_empty_result() {
1576 let db = GrafeoDB::new_in_memory();
1577 let session = db.session();
1578
1579 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1581
1582 assert_eq!(result.row_count(), 0);
1583 }
1584
1585 #[test]
1586 fn test_cypher_parse_error() {
1587 let db = GrafeoDB::new_in_memory();
1588 let session = db.session();
1589
1590 let result = session.execute_cypher("MATCH (n RETURN n");
1592
1593 assert!(result.is_err());
1594 }
1595 }
1596
1597 mod direct_lookup_tests {
1600 use super::*;
1601 use grafeo_common::types::Value;
1602
1603 #[test]
1604 fn test_get_node() {
1605 let db = GrafeoDB::new_in_memory();
1606 let session = db.session();
1607
1608 let id = session.create_node(&["Person"]);
1609 let node = session.get_node(id);
1610
1611 assert!(node.is_some());
1612 let node = node.unwrap();
1613 assert_eq!(node.id, id);
1614 }
1615
1616 #[test]
1617 fn test_get_node_not_found() {
1618 use grafeo_common::types::NodeId;
1619
1620 let db = GrafeoDB::new_in_memory();
1621 let session = db.session();
1622
1623 let node = session.get_node(NodeId::new(9999));
1625 assert!(node.is_none());
1626 }
1627
1628 #[test]
1629 fn test_get_node_property() {
1630 let db = GrafeoDB::new_in_memory();
1631 let session = db.session();
1632
1633 let id = session
1634 .create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1635
1636 let name = session.get_node_property(id, "name");
1637 assert_eq!(name, Some(Value::String("Alice".into())));
1638
1639 let missing = session.get_node_property(id, "missing");
1641 assert!(missing.is_none());
1642 }
1643
1644 #[test]
1645 fn test_get_edge() {
1646 let db = GrafeoDB::new_in_memory();
1647 let session = db.session();
1648
1649 let alice = session.create_node(&["Person"]);
1650 let bob = session.create_node(&["Person"]);
1651 let edge_id = session.create_edge(alice, bob, "KNOWS");
1652
1653 let edge = session.get_edge(edge_id);
1654 assert!(edge.is_some());
1655 let edge = edge.unwrap();
1656 assert_eq!(edge.id, edge_id);
1657 assert_eq!(edge.src, alice);
1658 assert_eq!(edge.dst, bob);
1659 }
1660
1661 #[test]
1662 fn test_get_edge_not_found() {
1663 use grafeo_common::types::EdgeId;
1664
1665 let db = GrafeoDB::new_in_memory();
1666 let session = db.session();
1667
1668 let edge = session.get_edge(EdgeId::new(9999));
1669 assert!(edge.is_none());
1670 }
1671
1672 #[test]
1673 fn test_get_neighbors_outgoing() {
1674 let db = GrafeoDB::new_in_memory();
1675 let session = db.session();
1676
1677 let alice = session.create_node(&["Person"]);
1678 let bob = session.create_node(&["Person"]);
1679 let carol = session.create_node(&["Person"]);
1680
1681 session.create_edge(alice, bob, "KNOWS");
1682 session.create_edge(alice, carol, "KNOWS");
1683
1684 let neighbors = session.get_neighbors_outgoing(alice);
1685 assert_eq!(neighbors.len(), 2);
1686
1687 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1688 assert!(neighbor_ids.contains(&bob));
1689 assert!(neighbor_ids.contains(&carol));
1690 }
1691
1692 #[test]
1693 fn test_get_neighbors_incoming() {
1694 let db = GrafeoDB::new_in_memory();
1695 let session = db.session();
1696
1697 let alice = session.create_node(&["Person"]);
1698 let bob = session.create_node(&["Person"]);
1699 let carol = session.create_node(&["Person"]);
1700
1701 session.create_edge(bob, alice, "KNOWS");
1702 session.create_edge(carol, alice, "KNOWS");
1703
1704 let neighbors = session.get_neighbors_incoming(alice);
1705 assert_eq!(neighbors.len(), 2);
1706
1707 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1708 assert!(neighbor_ids.contains(&bob));
1709 assert!(neighbor_ids.contains(&carol));
1710 }
1711
1712 #[test]
1713 fn test_get_neighbors_outgoing_by_type() {
1714 let db = GrafeoDB::new_in_memory();
1715 let session = db.session();
1716
1717 let alice = session.create_node(&["Person"]);
1718 let bob = session.create_node(&["Person"]);
1719 let company = session.create_node(&["Company"]);
1720
1721 session.create_edge(alice, bob, "KNOWS");
1722 session.create_edge(alice, company, "WORKS_AT");
1723
1724 let knows_neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1725 assert_eq!(knows_neighbors.len(), 1);
1726 assert_eq!(knows_neighbors[0].0, bob);
1727
1728 let works_neighbors = session.get_neighbors_outgoing_by_type(alice, "WORKS_AT");
1729 assert_eq!(works_neighbors.len(), 1);
1730 assert_eq!(works_neighbors[0].0, company);
1731
1732 let no_neighbors = session.get_neighbors_outgoing_by_type(alice, "LIKES");
1734 assert!(no_neighbors.is_empty());
1735 }
1736
1737 #[test]
1738 fn test_node_exists() {
1739 use grafeo_common::types::NodeId;
1740
1741 let db = GrafeoDB::new_in_memory();
1742 let session = db.session();
1743
1744 let id = session.create_node(&["Person"]);
1745
1746 assert!(session.node_exists(id));
1747 assert!(!session.node_exists(NodeId::new(9999)));
1748 }
1749
1750 #[test]
1751 fn test_edge_exists() {
1752 use grafeo_common::types::EdgeId;
1753
1754 let db = GrafeoDB::new_in_memory();
1755 let session = db.session();
1756
1757 let alice = session.create_node(&["Person"]);
1758 let bob = session.create_node(&["Person"]);
1759 let edge_id = session.create_edge(alice, bob, "KNOWS");
1760
1761 assert!(session.edge_exists(edge_id));
1762 assert!(!session.edge_exists(EdgeId::new(9999)));
1763 }
1764
1765 #[test]
1766 fn test_get_degree() {
1767 let db = GrafeoDB::new_in_memory();
1768 let session = db.session();
1769
1770 let alice = session.create_node(&["Person"]);
1771 let bob = session.create_node(&["Person"]);
1772 let carol = session.create_node(&["Person"]);
1773
1774 session.create_edge(alice, bob, "KNOWS");
1776 session.create_edge(alice, carol, "KNOWS");
1777 session.create_edge(bob, alice, "KNOWS");
1779
1780 let (out_degree, in_degree) = session.get_degree(alice);
1781 assert_eq!(out_degree, 2);
1782 assert_eq!(in_degree, 1);
1783
1784 let lonely = session.create_node(&["Person"]);
1786 let (out, in_deg) = session.get_degree(lonely);
1787 assert_eq!(out, 0);
1788 assert_eq!(in_deg, 0);
1789 }
1790
1791 #[test]
1792 fn test_get_nodes_batch() {
1793 let db = GrafeoDB::new_in_memory();
1794 let session = db.session();
1795
1796 let alice = session.create_node(&["Person"]);
1797 let bob = session.create_node(&["Person"]);
1798 let carol = session.create_node(&["Person"]);
1799
1800 let nodes = session.get_nodes_batch(&[alice, bob, carol]);
1801 assert_eq!(nodes.len(), 3);
1802 assert!(nodes[0].is_some());
1803 assert!(nodes[1].is_some());
1804 assert!(nodes[2].is_some());
1805
1806 use grafeo_common::types::NodeId;
1808 let nodes_with_missing = session.get_nodes_batch(&[alice, NodeId::new(9999), carol]);
1809 assert_eq!(nodes_with_missing.len(), 3);
1810 assert!(nodes_with_missing[0].is_some());
1811 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
1813 }
1814
1815 #[test]
1816 fn test_auto_commit_setting() {
1817 let db = GrafeoDB::new_in_memory();
1818 let mut session = db.session();
1819
1820 assert!(session.auto_commit());
1822
1823 session.set_auto_commit(false);
1824 assert!(!session.auto_commit());
1825
1826 session.set_auto_commit(true);
1827 assert!(session.auto_commit());
1828 }
1829
1830 #[test]
1831 fn test_transaction_double_begin_error() {
1832 let db = GrafeoDB::new_in_memory();
1833 let mut session = db.session();
1834
1835 session.begin_tx().unwrap();
1836 let result = session.begin_tx();
1837
1838 assert!(result.is_err());
1839 session.rollback().unwrap();
1841 }
1842
1843 #[test]
1844 fn test_commit_without_transaction_error() {
1845 let db = GrafeoDB::new_in_memory();
1846 let mut session = db.session();
1847
1848 let result = session.commit();
1849 assert!(result.is_err());
1850 }
1851
1852 #[test]
1853 fn test_rollback_without_transaction_error() {
1854 let db = GrafeoDB::new_in_memory();
1855 let mut session = db.session();
1856
1857 let result = session.rollback();
1858 assert!(result.is_err());
1859 }
1860
1861 #[test]
1862 fn test_create_edge_in_transaction() {
1863 let db = GrafeoDB::new_in_memory();
1864 let mut session = db.session();
1865
1866 let alice = session.create_node(&["Person"]);
1868 let bob = session.create_node(&["Person"]);
1869
1870 session.begin_tx().unwrap();
1872 let edge_id = session.create_edge(alice, bob, "KNOWS");
1873
1874 assert!(session.edge_exists(edge_id));
1876
1877 session.commit().unwrap();
1879
1880 assert!(session.edge_exists(edge_id));
1882 }
1883
1884 #[test]
1885 fn test_neighbors_empty_node() {
1886 let db = GrafeoDB::new_in_memory();
1887 let session = db.session();
1888
1889 let lonely = session.create_node(&["Person"]);
1890
1891 assert!(session.get_neighbors_outgoing(lonely).is_empty());
1892 assert!(session.get_neighbors_incoming(lonely).is_empty());
1893 assert!(
1894 session
1895 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
1896 .is_empty()
1897 );
1898 }
1899 }
1900}