1use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
15#[cfg(feature = "rdf")]
16use grafeo_core::graph::rdf::RdfStore;
17
18use crate::config::{AdaptiveConfig, GraphModel};
19use crate::database::QueryResult;
20use crate::query::cache::QueryCache;
21use crate::transaction::TransactionManager;
22
/// A handle for running queries and transactions against a database.
///
/// Holds shared references to the graph store, the transaction manager and the
/// query cache, together with per-session state such as the currently open
/// transaction.
pub struct Session {
    /// Shared labeled-property-graph store that queries and CRUD helpers operate on.
    store: Arc<LpgStore>,
    /// Shared RDF store; only compiled in with the `rdf` feature.
    #[cfg(feature = "rdf")]
    #[allow(dead_code)]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager used to begin, commit and abort transactions.
    tx_manager: Arc<TransactionManager>,
    /// Cache of optimized plans, consulted before a query is translated again.
    query_cache: Arc<QueryCache>,
    /// Id of the transaction currently open on this session, if any.
    current_tx: Option<TxId>,
    /// Auto-commit flag; the constructors initialise it to `true`.
    auto_commit: bool,
    /// Adaptive-execution configuration supplied at construction time.
    #[allow(dead_code)]
    adaptive_config: AdaptiveConfig,
    /// Forwarded to the planner via `with_factorized_execution`.
    factorized_execution: bool,
    /// Graph model of the owning database; checked by `require_lpg`.
    graph_model: GraphModel,
    /// Optional per-query time limit, turned into a deadline for each execution.
    query_timeout: Option<Duration>,
    /// Shared counter bumped on every successful commit; drives periodic GC.
    commit_counter: Arc<AtomicUsize>,
    /// Garbage collection runs every `gc_interval` commits; `0` disables it.
    gc_interval: usize,
    /// Change-data-capture log; only compiled in with the `cdc` feature.
    #[cfg(feature = "cdc")]
    cdc_log: Arc<crate::cdc::CdcLog>,
}
60
61impl Session {
62 #[allow(dead_code, clippy::too_many_arguments)]
64 pub(crate) fn with_adaptive(
65 store: Arc<LpgStore>,
66 tx_manager: Arc<TransactionManager>,
67 query_cache: Arc<QueryCache>,
68 adaptive_config: AdaptiveConfig,
69 factorized_execution: bool,
70 graph_model: GraphModel,
71 query_timeout: Option<Duration>,
72 commit_counter: Arc<AtomicUsize>,
73 gc_interval: usize,
74 ) -> Self {
75 Self {
76 store,
77 #[cfg(feature = "rdf")]
78 rdf_store: Arc::new(RdfStore::new()),
79 tx_manager,
80 query_cache,
81 current_tx: None,
82 auto_commit: true,
83 adaptive_config,
84 factorized_execution,
85 graph_model,
86 query_timeout,
87 commit_counter,
88 gc_interval,
89 #[cfg(feature = "cdc")]
90 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
91 }
92 }
93
/// Replaces the session's CDC log with `cdc_log`; the previous handle is dropped.
#[cfg(feature = "cdc")]
pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
    drop(std::mem::replace(&mut self.cdc_log, cdc_log));
}
99
/// Builds a session that uses the supplied `rdf_store` instead of creating one.
///
/// The session starts with no open transaction and with auto-commit set to
/// `true`. With the `cdc` feature a newly constructed CDC log is attached.
#[cfg(feature = "rdf")]
#[allow(clippy::too_many_arguments)]
pub(crate) fn with_rdf_store_and_adaptive(
    store: Arc<LpgStore>,
    rdf_store: Arc<RdfStore>,
    tx_manager: Arc<TransactionManager>,
    query_cache: Arc<QueryCache>,
    adaptive_config: AdaptiveConfig,
    factorized_execution: bool,
    graph_model: GraphModel,
    query_timeout: Option<Duration>,
    commit_counter: Arc<AtomicUsize>,
    gc_interval: usize,
) -> Self {
    #[cfg(feature = "cdc")]
    let cdc_log = Arc::new(crate::cdc::CdcLog::new());

    Self {
        store,
        rdf_store,
        tx_manager,
        query_cache,
        current_tx: None,
        auto_commit: true,
        adaptive_config,
        factorized_execution,
        graph_model,
        query_timeout,
        commit_counter,
        gc_interval,
        #[cfg(feature = "cdc")]
        cdc_log,
    }
}
132
133 #[must_use]
135 pub fn graph_model(&self) -> GraphModel {
136 self.graph_model
137 }
138
139 fn require_lpg(&self, language: &str) -> Result<()> {
141 if self.graph_model == GraphModel::Rdf {
142 return Err(grafeo_common::utils::error::Error::Internal(format!(
143 "This is an RDF database. {language} queries require an LPG database."
144 )));
145 }
146 Ok(())
147 }
148
/// Runs a GQL query and returns its result.
///
/// The optimized plan is taken from the query cache when present; otherwise the
/// query is translated, bound and optimized, and the plan is stored in the cache.
/// The result is annotated with the elapsed wall-clock time in milliseconds and
/// with `rows_scanned`, which is set to the number of returned rows.
///
/// # Errors
///
/// Fails when the database is RDF, or when translation, binding, optimization,
/// planning or execution returns an error.
#[cfg(feature = "gql")]
pub fn execute(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, Planner, binder::Binder, cache::CacheKey, gql_translator,
        optimizer::Optimizer, processor::QueryLanguage,
    };

    self.require_lpg("GQL")?;

    let started = Instant::now();

    let key = CacheKey::new(query, QueryLanguage::Gql);
    let optimized_plan = match self.query_cache.get_optimized(&key) {
        Some(cached) => cached,
        None => {
            let logical = gql_translator::translate(query)?;

            // `bind` is called for its error result; the returned context is unused.
            let mut binder = Binder::new();
            let _bound = binder.bind(&logical)?;

            let fresh = Optimizer::from_store(&self.store).optimize(logical)?;
            self.query_cache.put_optimized(key, fresh.clone());
            fresh
        }
    };

    // Plan against the open transaction's view, or the current epoch if none is open.
    let (viewing_epoch, tx_id) = self.get_transaction_context();
    let planner = Planner::with_context(
        Arc::clone(&self.store),
        Arc::clone(&self.tx_manager),
        tx_id,
        viewing_epoch,
    )
    .with_factorized_execution(self.factorized_execution);
    let mut physical_plan = planner.plan(&optimized_plan)?;

    let executor = Executor::with_columns(physical_plan.columns.clone())
        .with_deadline(self.query_deadline());
    let mut result = executor.execute(physical_plan.operator.as_mut())?;

    result.execution_time_ms = Some(started.elapsed().as_secs_f64() * 1000.0);
    result.rows_scanned = Some(result.rows.len() as u64);

    Ok(result)
}
240
/// Runs a GQL query with named parameters.
///
/// The query is handed to a `QueryProcessor` built over this session's store and
/// transaction manager. When a transaction is open, the processor is given that
/// transaction's id and viewing epoch.
///
/// # Errors
///
/// Fails when the database is RDF, or when the processor returns an error.
#[cfg(feature = "gql")]
pub fn execute_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    self.require_lpg("GQL")?;

    use crate::query::processor::{QueryLanguage, QueryProcessor};

    let (viewing_epoch, tx_id) = self.get_transaction_context();

    let processor =
        QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));

    // Only attach a transaction context when a transaction is actually open.
    let processor = if let Some(tx_id) = tx_id {
        processor.with_tx_context(viewing_epoch, tx_id)
    } else {
        processor
    };

    processor.process(query, QueryLanguage::Gql, Some(&params))
}
272
273 #[cfg(not(any(feature = "gql", feature = "cypher")))]
279 pub fn execute_with_params(
280 &self,
281 _query: &str,
282 _params: std::collections::HashMap<String, Value>,
283 ) -> Result<QueryResult> {
284 Err(grafeo_common::utils::error::Error::Internal(
285 "No query language enabled".to_string(),
286 ))
287 }
288
289 #[cfg(not(any(feature = "gql", feature = "cypher")))]
295 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
296 Err(grafeo_common::utils::error::Error::Internal(
297 "No query language enabled".to_string(),
298 ))
299 }
300
/// Runs a Cypher query and returns its result.
///
/// The optimized plan is taken from the query cache when present; otherwise the
/// query is translated, bound and optimized, and the plan is stored in the cache.
///
/// # Errors
///
/// Fails when translation, binding, optimization, planning or execution
/// returns an error.
// NOTE(review): unlike `execute`, this does not call `require_lpg` — confirm
// whether Cypher on an RDF database is meant to be allowed.
#[cfg(feature = "cypher")]
pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, Planner, binder::Binder, cache::CacheKey, cypher_translator,
        optimizer::Optimizer, processor::QueryLanguage,
    };

    let key = CacheKey::new(query, QueryLanguage::Cypher);
    let optimized_plan = match self.query_cache.get_optimized(&key) {
        Some(cached) => cached,
        None => {
            let logical = cypher_translator::translate(query)?;

            // `bind` is called for its error result; the returned context is unused.
            let mut binder = Binder::new();
            let _bound = binder.bind(&logical)?;

            let fresh = Optimizer::from_store(&self.store).optimize(logical)?;
            self.query_cache.put_optimized(key, fresh.clone());
            fresh
        }
    };

    let (viewing_epoch, tx_id) = self.get_transaction_context();
    let planner = Planner::with_context(
        Arc::clone(&self.store),
        Arc::clone(&self.tx_manager),
        tx_id,
        viewing_epoch,
    )
    .with_factorized_execution(self.factorized_execution);
    let mut physical_plan = planner.plan(&optimized_plan)?;

    let executor = Executor::with_columns(physical_plan.columns.clone())
        .with_deadline(self.query_deadline());
    executor.execute(physical_plan.operator.as_mut())
}
355
/// Runs a Gremlin query and returns its result.
///
/// The query is translated, bound, optimized, planned and executed on every
/// call; the query cache is not consulted on this path.
///
/// # Errors
///
/// Fails when translation, binding, optimization, planning or execution
/// returns an error.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, Planner, binder::Binder, gremlin_translator, optimizer::Optimizer,
    };

    let logical = gremlin_translator::translate(query)?;

    // `bind` is called for its error result; the returned context is unused.
    let mut binder = Binder::new();
    let _bound = binder.bind(&logical)?;

    let optimized_plan = Optimizer::from_store(&self.store).optimize(logical)?;

    let (viewing_epoch, tx_id) = self.get_transaction_context();
    let planner = Planner::with_context(
        Arc::clone(&self.store),
        Arc::clone(&self.tx_manager),
        tx_id,
        viewing_epoch,
    )
    .with_factorized_execution(self.factorized_execution);
    let mut physical_plan = planner.plan(&optimized_plan)?;

    let executor = Executor::with_columns(physical_plan.columns.clone())
        .with_deadline(self.query_deadline());
    executor.execute(physical_plan.operator.as_mut())
}
414
/// Runs a Gremlin query with named parameters.
///
/// The query is handed to a `QueryProcessor` built over this session's store and
/// transaction manager. When a transaction is open, the processor is given that
/// transaction's id and viewing epoch.
///
/// # Errors
///
/// Propagates any error returned by the processor.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    let (viewing_epoch, tx_id) = self.get_transaction_context();

    let processor =
        QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));

    // Only attach a transaction context when a transaction is actually open.
    let processor = if let Some(tx_id) = tx_id {
        processor.with_tx_context(viewing_epoch, tx_id)
    } else {
        processor
    };

    processor.process(query, QueryLanguage::Gremlin, Some(&params))
}
444
/// Runs a GraphQL query and returns its result.
///
/// The query is translated, bound, optimized, planned and executed on every
/// call; the query cache is not consulted on this path.
///
/// # Errors
///
/// Fails when translation, binding, optimization, planning or execution
/// returns an error.
#[cfg(feature = "graphql")]
pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, Planner, binder::Binder, graphql_translator, optimizer::Optimizer,
    };

    let logical = graphql_translator::translate(query)?;

    // `bind` is called for its error result; the returned context is unused.
    let mut binder = Binder::new();
    let _bound = binder.bind(&logical)?;

    let optimized_plan = Optimizer::from_store(&self.store).optimize(logical)?;

    let (viewing_epoch, tx_id) = self.get_transaction_context();
    let planner = Planner::with_context(
        Arc::clone(&self.store),
        Arc::clone(&self.tx_manager),
        tx_id,
        viewing_epoch,
    )
    .with_factorized_execution(self.factorized_execution);
    let mut physical_plan = planner.plan(&optimized_plan)?;

    let executor = Executor::with_columns(physical_plan.columns.clone())
        .with_deadline(self.query_deadline());
    executor.execute(physical_plan.operator.as_mut())
}
503
/// Runs a GraphQL query with named parameters.
///
/// The query is handed to a `QueryProcessor` built over this session's store and
/// transaction manager. When a transaction is open, the processor is given that
/// transaction's id and viewing epoch.
///
/// # Errors
///
/// Propagates any error returned by the processor.
#[cfg(feature = "graphql")]
pub fn execute_graphql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    let (viewing_epoch, tx_id) = self.get_transaction_context();

    let processor =
        QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));

    // Only attach a transaction context when a transaction is actually open.
    let processor = if let Some(tx_id) = tx_id {
        processor.with_tx_context(viewing_epoch, tx_id)
    } else {
        processor
    };

    processor.process(query, QueryLanguage::GraphQL, Some(&params))
}
533
/// Runs a SQL/PGQ query and returns its result.
///
/// The query is always translated first. A `CreatePropertyGraph` statement is
/// answered immediately with a one-row status message and is neither cached nor
/// executed. Any other statement uses the cached optimized plan when present,
/// or is bound, optimized and cached before being planned and executed.
///
/// # Errors
///
/// Fails when translation, binding, optimization, planning or execution
/// returns an error.
#[cfg(feature = "sql-pgq")]
pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, Planner, binder::Binder, cache::CacheKey, optimizer::Optimizer,
        plan::LogicalOperator, processor::QueryLanguage, sql_pgq_translator,
    };

    let logical = sql_pgq_translator::translate(query)?;

    // DDL short-circuit: report what was declared instead of running a plan.
    if let LogicalOperator::CreatePropertyGraph(cpg) = &logical.root {
        let status = format!(
            "Property graph '{}' created ({} node tables, {} edge tables)",
            cpg.name,
            cpg.node_tables.len(),
            cpg.edge_tables.len()
        );
        return Ok(QueryResult {
            columns: vec!["status".into()],
            column_types: vec![grafeo_common::types::LogicalType::String],
            rows: vec![vec![Value::from(status)]],
            execution_time_ms: None,
            rows_scanned: None,
        });
    }

    let key = CacheKey::new(query, QueryLanguage::SqlPgq);
    let optimized_plan = match self.query_cache.get_optimized(&key) {
        Some(cached) => cached,
        None => {
            // `bind` is called for its error result; the returned context is unused.
            let mut binder = Binder::new();
            let _bound = binder.bind(&logical)?;

            let fresh = Optimizer::from_store(&self.store).optimize(logical)?;
            self.query_cache.put_optimized(key, fresh.clone());
            fresh
        }
    };

    let (viewing_epoch, tx_id) = self.get_transaction_context();
    let planner = Planner::with_context(
        Arc::clone(&self.store),
        Arc::clone(&self.tx_manager),
        tx_id,
        viewing_epoch,
    )
    .with_factorized_execution(self.factorized_execution);
    let mut physical_plan = planner.plan(&optimized_plan)?;

    let executor = Executor::with_columns(physical_plan.columns.clone())
        .with_deadline(self.query_deadline());
    executor.execute(physical_plan.operator.as_mut())
}
623
/// Runs a SQL/PGQ query with named parameters.
///
/// The query is handed to a `QueryProcessor` built over this session's store and
/// transaction manager. When a transaction is open, the processor is given that
/// transaction's id and viewing epoch.
///
/// # Errors
///
/// Propagates any error returned by the processor.
#[cfg(feature = "sql-pgq")]
pub fn execute_sql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    let (viewing_epoch, tx_id) = self.get_transaction_context();

    let processor =
        QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));

    // Only attach a transaction context when a transaction is actually open.
    let processor = if let Some(tx_id) = tx_id {
        processor.with_tx_context(viewing_epoch, tx_id)
    } else {
        processor
    };

    processor.process(query, QueryLanguage::SqlPgq, Some(&params))
}
653
/// Runs a SPARQL query against the session's RDF store.
///
/// The query is translated and optimized, then planned by an `RdfPlanner` that
/// is given the session's current transaction id (if any).
///
/// # Errors
///
/// Fails when translation, optimization, planning or execution returns an error.
// NOTE(review): the optimizer is built from the LPG store (`self.store`) even
// though planning targets the RDF store — confirm this is intended.
#[cfg(all(feature = "sparql", feature = "rdf"))]
pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
    };

    let logical = sparql_translator::translate(query)?;
    let optimized_plan = Optimizer::from_store(&self.store).optimize(logical)?;

    let rdf_store = Arc::clone(&self.rdf_store);
    let planner = RdfPlanner::new(rdf_store).with_tx_id(self.current_tx);
    let mut physical_plan = planner.plan(&optimized_plan)?;

    let executor = Executor::with_columns(physical_plan.columns.clone())
        .with_deadline(self.query_deadline());
    executor.execute(physical_plan.operator.as_mut())
}
681
/// Runs a SPARQL query; the supplied parameters are currently ignored.
///
/// This simply forwards `query` to [`Session::execute_sparql`].
///
/// # Errors
///
/// Propagates any error from `execute_sparql`.
#[cfg(all(feature = "sparql", feature = "rdf"))]
pub fn execute_sparql_with_params(
    &self,
    query: &str,
    _params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    let outcome = self.execute_sparql(query);
    outcome
}
697
698 pub fn begin_tx(&mut self) -> Result<()> {
721 if self.current_tx.is_some() {
722 return Err(grafeo_common::utils::error::Error::Transaction(
723 grafeo_common::utils::error::TransactionError::InvalidState(
724 "Transaction already active".to_string(),
725 ),
726 ));
727 }
728
729 let tx_id = self.tx_manager.begin();
730 self.current_tx = Some(tx_id);
731 Ok(())
732 }
733
734 pub fn begin_tx_with_isolation(
742 &mut self,
743 isolation_level: crate::transaction::IsolationLevel,
744 ) -> Result<()> {
745 if self.current_tx.is_some() {
746 return Err(grafeo_common::utils::error::Error::Transaction(
747 grafeo_common::utils::error::TransactionError::InvalidState(
748 "Transaction already active".to_string(),
749 ),
750 ));
751 }
752
753 let tx_id = self.tx_manager.begin_with_isolation(isolation_level);
754 self.current_tx = Some(tx_id);
755 Ok(())
756 }
757
758 pub fn commit(&mut self) -> Result<()> {
766 let tx_id = self.current_tx.take().ok_or_else(|| {
767 grafeo_common::utils::error::Error::Transaction(
768 grafeo_common::utils::error::TransactionError::InvalidState(
769 "No active transaction".to_string(),
770 ),
771 )
772 })?;
773
774 #[cfg(feature = "rdf")]
776 self.rdf_store.commit_tx(tx_id);
777
778 self.tx_manager.commit(tx_id)?;
779
780 if self.gc_interval > 0 {
782 let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
783 if count.is_multiple_of(self.gc_interval) {
784 let min_epoch = self.tx_manager.min_active_epoch();
785 self.store.gc_versions(min_epoch);
786 self.tx_manager.gc();
787 }
788 }
789
790 Ok(())
791 }
792
793 pub fn rollback(&mut self) -> Result<()> {
817 let tx_id = self.current_tx.take().ok_or_else(|| {
818 grafeo_common::utils::error::Error::Transaction(
819 grafeo_common::utils::error::TransactionError::InvalidState(
820 "No active transaction".to_string(),
821 ),
822 )
823 })?;
824
825 self.store.discard_uncommitted_versions(tx_id);
827
828 #[cfg(feature = "rdf")]
830 self.rdf_store.rollback_tx(tx_id);
831
832 self.tx_manager.abort(tx_id)
834 }
835
836 #[must_use]
838 pub fn in_transaction(&self) -> bool {
839 self.current_tx.is_some()
840 }
841
842 pub fn set_auto_commit(&mut self, auto_commit: bool) {
844 self.auto_commit = auto_commit;
845 }
846
847 #[must_use]
849 pub fn auto_commit(&self) -> bool {
850 self.auto_commit
851 }
852
853 #[must_use]
855 fn query_deadline(&self) -> Option<Instant> {
856 self.query_timeout.map(|d| Instant::now() + d)
857 }
858
859 #[must_use]
865 fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
866 if let Some(tx_id) = self.current_tx {
867 let epoch = self
869 .tx_manager
870 .start_epoch(tx_id)
871 .unwrap_or_else(|| self.tx_manager.current_epoch());
872 (epoch, Some(tx_id))
873 } else {
874 (self.tx_manager.current_epoch(), None)
876 }
877 }
878
879 pub fn create_node(&self, labels: &[&str]) -> NodeId {
884 let (epoch, tx_id) = self.get_transaction_context();
885 self.store
886 .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
887 }
888
889 pub fn create_node_with_props<'a>(
893 &self,
894 labels: &[&str],
895 properties: impl IntoIterator<Item = (&'a str, Value)>,
896 ) -> NodeId {
897 let (epoch, tx_id) = self.get_transaction_context();
898 self.store.create_node_with_props_versioned(
899 labels,
900 properties.into_iter().map(|(k, v)| (k, v)),
901 epoch,
902 tx_id.unwrap_or(TxId::SYSTEM),
903 )
904 }
905
906 pub fn create_edge(
911 &self,
912 src: NodeId,
913 dst: NodeId,
914 edge_type: &str,
915 ) -> grafeo_common::types::EdgeId {
916 let (epoch, tx_id) = self.get_transaction_context();
917 self.store
918 .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
919 }
920
921 #[must_use]
949 pub fn get_node(&self, id: NodeId) -> Option<Node> {
950 let (epoch, tx_id) = self.get_transaction_context();
951 self.store
952 .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
953 }
954
955 #[must_use]
979 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
980 self.get_node(id)
981 .and_then(|node| node.get_property(key).cloned())
982 }
983
984 #[must_use]
991 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
992 let (epoch, tx_id) = self.get_transaction_context();
993 self.store
994 .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
995 }
996
997 #[must_use]
1023 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1024 self.store.edges_from(node, Direction::Outgoing).collect()
1025 }
1026
1027 #[must_use]
1036 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1037 self.store.edges_from(node, Direction::Incoming).collect()
1038 }
1039
1040 #[must_use]
1052 pub fn get_neighbors_outgoing_by_type(
1053 &self,
1054 node: NodeId,
1055 edge_type: &str,
1056 ) -> Vec<(NodeId, EdgeId)> {
1057 self.store
1058 .edges_from(node, Direction::Outgoing)
1059 .filter(|(_, edge_id)| {
1060 self.get_edge(*edge_id)
1061 .is_some_and(|e| e.edge_type.as_str() == edge_type)
1062 })
1063 .collect()
1064 }
1065
1066 #[must_use]
1073 pub fn node_exists(&self, id: NodeId) -> bool {
1074 self.get_node(id).is_some()
1075 }
1076
1077 #[must_use]
1079 pub fn edge_exists(&self, id: EdgeId) -> bool {
1080 self.get_edge(id).is_some()
1081 }
1082
1083 #[must_use]
1087 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
1088 let out = self.store.out_degree(node);
1089 let in_degree = self.store.in_degree(node);
1090 (out, in_degree)
1091 }
1092
1093 #[must_use]
1103 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
1104 let (epoch, tx_id) = self.get_transaction_context();
1105 let tx = tx_id.unwrap_or(TxId::SYSTEM);
1106 ids.iter()
1107 .map(|&id| self.store.get_node_versioned(id, epoch, tx))
1108 .collect()
1109 }
1110
/// Returns the change events the CDC log holds for one entity.
///
/// # Errors
///
/// The visible implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn history(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let entity = entity_id.into();
    let events = self.cdc_log.history(entity);
    Ok(events)
}
1121
/// Returns the CDC log's change events for one entity, filtered by `since_epoch`.
///
/// The filtering itself is done by the CDC log's `history_since`.
///
/// # Errors
///
/// The visible implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn history_since(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
    since_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let entity = entity_id.into();
    let events = self.cdc_log.history_since(entity, since_epoch);
    Ok(events)
}
1131
/// Returns the CDC log's change events for the given epoch range.
///
/// Range semantics (inclusive or exclusive bounds) are defined by the CDC log's
/// `changes_between`.
///
/// # Errors
///
/// The visible implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn changes_between(
    &self,
    start_epoch: EpochId,
    end_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let events = self.cdc_log.changes_between(start_epoch, end_epoch);
    Ok(events)
}
1141}
1142
1143#[cfg(test)]
1144mod tests {
1145 use crate::database::GrafeoDB;
1146
/// A node created through a session gets a valid id and is counted by the database.
#[test]
fn test_session_create_node() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let created = session.create_node(&["Person"]);

    assert!(created.is_valid());
    assert_eq!(database.node_count(), 1);
}
1156
/// `in_transaction` follows the begin/commit life cycle.
#[test]
fn test_session_transaction() {
    let database = GrafeoDB::new_in_memory();
    let mut session = database.session();

    // Nothing is open on a fresh session.
    assert!(!session.in_transaction());

    session.begin_tx().unwrap();
    assert!(session.in_transaction());

    session.commit().unwrap();
    assert!(!session.in_transaction());
}
1170
/// The transaction context carries a tx id only while a transaction is open, and
/// the epoch seen after commit is not older than the one seen inside it.
#[test]
fn test_session_transaction_context() {
    let database = GrafeoDB::new_in_memory();
    let mut session = database.session();

    let (_, tx_before) = session.get_transaction_context();
    assert!(tx_before.is_none());

    session.begin_tx().unwrap();
    let (epoch_during, tx_during) = session.get_transaction_context();
    assert!(tx_during.is_some());

    session.commit().unwrap();
    let (epoch_after, tx_after) = session.get_transaction_context();
    assert!(tx_after.is_none());
    assert!(epoch_after.as_u64() >= epoch_during.as_u64());
}
1194
/// Rolling back closes the open transaction.
#[test]
fn test_session_rollback() {
    let database = GrafeoDB::new_in_memory();
    let mut session = database.session();

    session.begin_tx().unwrap();
    session.rollback().unwrap();

    assert!(!session.in_transaction());
}
1204
/// A node written directly to the store under the session's transaction id is
/// removed by `rollback`, while a node created beforehand survives.
#[test]
fn test_session_rollback_discards_versions() {
    use grafeo_common::types::TxId;

    let db = GrafeoDB::new_in_memory();

    // Baseline node created outside any transaction.
    let baseline = db.store().create_node(&["Person"]);
    assert!(baseline.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_tx().unwrap();
    let tx_id = session.current_tx.unwrap();

    // Write through the store, tagging the version with the session's transaction.
    let write_epoch = db.store().current_epoch();
    let transient = db
        .store()
        .create_node_versioned(&["Person"], write_epoch, tx_id);
    assert!(transient.is_valid());
    assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");

    session.rollback().unwrap();
    assert!(!session.in_transaction());

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard uncommitted node, but got {count_after}"
    );

    let read_epoch = db.store().current_epoch();
    let baseline_lookup = db
        .store()
        .get_node_versioned(baseline, read_epoch, TxId::SYSTEM);
    assert!(baseline_lookup.is_some(), "Original node should still exist");

    let transient_lookup = db
        .store()
        .get_node_versioned(transient, read_epoch, TxId::SYSTEM);
    assert!(transient_lookup.is_none(), "Transaction node should be gone");
}
1258
/// A node created via `Session::create_node` inside a transaction is discarded
/// when that transaction is rolled back.
#[test]
fn test_session_create_node_in_transaction() {
    let db = GrafeoDB::new_in_memory();

    let baseline = db.create_node(&["Person"]);
    assert!(baseline.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_tx().unwrap();

    let transient = session.create_node(&["Person"]);
    assert!(transient.is_valid());
    assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");

    session.rollback().unwrap();

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node(), but got {count_after}"
    );
}
1290
/// A node created via `Session::create_node_with_props` inside a transaction is
/// discarded when that transaction is rolled back.
#[test]
fn test_session_create_node_with_props_in_transaction() {
    use grafeo_common::types::Value;

    let db = GrafeoDB::new_in_memory();

    db.create_node(&["Person"]);
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    let mut session = db.session();
    session.begin_tx().unwrap();

    let props = [("name", Value::String("Alice".into()))];
    let transient = session.create_node_with_props(&["Person"], props);
    assert!(transient.is_valid());
    assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");

    session.rollback().unwrap();

    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node_with_props()"
    );
}
1323
/// End-to-end tests for `Session::execute` (GQL).
#[cfg(feature = "gql")]
mod gql_tests {
    use super::*;

    /// A label scan returns only the nodes carrying that label.
    #[test]
    fn test_gql_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for labels in [["Person"], ["Person"], ["Animal"]] {
            session.create_node(&labels);
        }

        let result = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.column_count(), 1);
        assert_eq!(result.columns[0], "n");
    }

    /// Querying an empty database yields zero rows rather than an error.
    #[test]
    fn test_gql_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let result = session.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(result.row_count(), 0);
    }

    /// Malformed query text is reported as an error.
    #[test]
    fn test_gql_parse_error() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute("MATCH (n RETURN n");

        assert!(outcome.is_err());
    }

    /// A one-hop pattern returns one row per matching edge.
    #[test]
    fn test_gql_relationship_traversal() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alice = session.create_node(&["Person"]);
        let bob = session.create_node(&["Person"]);
        let charlie = session.create_node(&["Person"]);

        session.create_edge(alice, bob, "KNOWS");
        session.create_edge(alice, charlie, "KNOWS");

        let result = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.column_count(), 2);
        assert_eq!(result.columns[0], "a");
        assert_eq!(result.columns[1], "b");
    }

    /// The edge type in the pattern filters out edges of other types.
    #[test]
    fn test_gql_relationship_with_type_filter() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let alice = session.create_node(&["Person"]);
        let bob = session.create_node(&["Person"]);
        let charlie = session.create_node(&["Person"]);

        session.create_edge(alice, bob, "KNOWS");
        session.create_edge(alice, charlie, "WORKS_WITH");

        let result = session
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    /// Returning a variable that was never bound produces an "Undefined variable" error.
    #[test]
    fn test_gql_semantic_error_undefined_variable() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let err = match session.execute("MATCH (n:Person) RETURN x") {
            Err(err) => err,
            Ok(_) => panic!("Expected error"),
        };

        let rendered = err.to_string();
        assert!(
            rendered.contains("Undefined variable"),
            "Expected undefined variable error, got: {}",
            err
        );
    }

    /// A numeric comparison in `WHERE` keeps only the matching nodes.
    #[test]
    fn test_gql_where_clause_property_filter() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for age in [25, 35, 45] {
            session.create_node_with_props(&["Person"], [("age", Value::Int64(age))]);
        }

        let result = session
            .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
            .unwrap();

        assert_eq!(result.row_count(), 2);
    }

    /// A string equality in `WHERE` keeps only the matching nodes.
    #[test]
    fn test_gql_where_clause_equality() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for name in ["Alice", "Bob", "Alice"] {
            session.create_node_with_props(&["Person"], [("name", Value::String(name.into()))]);
        }

        let result = session
            .execute("MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n")
            .unwrap();

        assert_eq!(result.row_count(), 2);
    }

    /// Property projections produce one column per expression, named after it.
    #[test]
    fn test_gql_return_property_access() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.create_node_with_props(
            &["Person"],
            [
                ("name", Value::String("Alice".into())),
                ("age", Value::Int64(30)),
            ],
        );
        session.create_node_with_props(
            &["Person"],
            [
                ("name", Value::String("Bob".into())),
                ("age", Value::Int64(25)),
            ],
        );

        let result = session
            .execute("MATCH (n:Person) RETURN n.name, n.age")
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.column_count(), 2);
        assert_eq!(result.columns[0], "n.name");
        assert_eq!(result.columns[1], "n.age");

        // Row order is not asserted; only that both names appear in the first column.
        let has_name = |expected: &str| {
            let wanted = Value::String(expected.into());
            result.rows.iter().any(|row| row[0] == wanted)
        };
        assert!(has_name("Alice"));
        assert!(has_name("Bob"));
    }

    /// A node and one of its properties can be returned side by side.
    #[test]
    fn test_gql_return_mixed_expressions() {
        use grafeo_common::types::Value;

        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);

        let result = session
            .execute("MATCH (n:Person) RETURN n, n.name")
            .unwrap();

        assert_eq!(result.row_count(), 1);
        assert_eq!(result.column_count(), 2);
        assert_eq!(result.columns[0], "n");
        assert_eq!(result.columns[1], "n.name");

        assert_eq!(result.rows[0][1], Value::String("Alice".into()));
    }
}
1542
/// End-to-end tests for `Session::execute_cypher`.
#[cfg(feature = "cypher")]
mod cypher_tests {
    use super::*;

    /// A label scan returns only the nodes carrying that label.
    #[test]
    fn test_cypher_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for labels in [["Person"], ["Person"], ["Animal"]] {
            session.create_node(&labels);
        }

        let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.column_count(), 1);
        assert_eq!(result.columns[0], "n");
    }

    /// Querying an empty database yields zero rows rather than an error.
    #[test]
    fn test_cypher_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(result.row_count(), 0);
    }

    /// Malformed query text is reported as an error.
    #[test]
    fn test_cypher_parse_error() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        let outcome = session.execute_cypher("MATCH (n RETURN n");

        assert!(outcome.is_err());
    }
}
1588
1589 mod direct_lookup_tests {
1592 use super::*;
1593 use grafeo_common::types::Value;
1594
/// A node that was just created can be fetched back by id.
#[test]
fn test_get_node() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    let id = session.create_node(&["Person"]);

    let Some(node) = session.get_node(id) else {
        panic!("created node should be retrievable");
    };
    assert_eq!(node.id, id);
}
1607
1608 #[test]
1609 fn test_get_node_not_found() {
1610 use grafeo_common::types::NodeId;
1611
1612 let db = GrafeoDB::new_in_memory();
1613 let session = db.session();
1614
1615 let node = session.get_node(NodeId::new(9999));
1617 assert!(node.is_none());
1618 }
1619
1620 #[test]
1621 fn test_get_node_property() {
1622 let db = GrafeoDB::new_in_memory();
1623 let session = db.session();
1624
1625 let id = session
1626 .create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1627
1628 let name = session.get_node_property(id, "name");
1629 assert_eq!(name, Some(Value::String("Alice".into())));
1630
1631 let missing = session.get_node_property(id, "missing");
1633 assert!(missing.is_none());
1634 }
1635
1636 #[test]
1637 fn test_get_edge() {
1638 let db = GrafeoDB::new_in_memory();
1639 let session = db.session();
1640
1641 let alice = session.create_node(&["Person"]);
1642 let bob = session.create_node(&["Person"]);
1643 let edge_id = session.create_edge(alice, bob, "KNOWS");
1644
1645 let edge = session.get_edge(edge_id);
1646 assert!(edge.is_some());
1647 let edge = edge.unwrap();
1648 assert_eq!(edge.id, edge_id);
1649 assert_eq!(edge.src, alice);
1650 assert_eq!(edge.dst, bob);
1651 }
1652
1653 #[test]
1654 fn test_get_edge_not_found() {
1655 use grafeo_common::types::EdgeId;
1656
1657 let db = GrafeoDB::new_in_memory();
1658 let session = db.session();
1659
1660 let edge = session.get_edge(EdgeId::new(9999));
1661 assert!(edge.is_none());
1662 }
1663
1664 #[test]
1665 fn test_get_neighbors_outgoing() {
1666 let db = GrafeoDB::new_in_memory();
1667 let session = db.session();
1668
1669 let alice = session.create_node(&["Person"]);
1670 let bob = session.create_node(&["Person"]);
1671 let carol = session.create_node(&["Person"]);
1672
1673 session.create_edge(alice, bob, "KNOWS");
1674 session.create_edge(alice, carol, "KNOWS");
1675
1676 let neighbors = session.get_neighbors_outgoing(alice);
1677 assert_eq!(neighbors.len(), 2);
1678
1679 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1680 assert!(neighbor_ids.contains(&bob));
1681 assert!(neighbor_ids.contains(&carol));
1682 }
1683
1684 #[test]
1685 fn test_get_neighbors_incoming() {
1686 let db = GrafeoDB::new_in_memory();
1687 let session = db.session();
1688
1689 let alice = session.create_node(&["Person"]);
1690 let bob = session.create_node(&["Person"]);
1691 let carol = session.create_node(&["Person"]);
1692
1693 session.create_edge(bob, alice, "KNOWS");
1694 session.create_edge(carol, alice, "KNOWS");
1695
1696 let neighbors = session.get_neighbors_incoming(alice);
1697 assert_eq!(neighbors.len(), 2);
1698
1699 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1700 assert!(neighbor_ids.contains(&bob));
1701 assert!(neighbor_ids.contains(&carol));
1702 }
1703
1704 #[test]
1705 fn test_get_neighbors_outgoing_by_type() {
1706 let db = GrafeoDB::new_in_memory();
1707 let session = db.session();
1708
1709 let alice = session.create_node(&["Person"]);
1710 let bob = session.create_node(&["Person"]);
1711 let company = session.create_node(&["Company"]);
1712
1713 session.create_edge(alice, bob, "KNOWS");
1714 session.create_edge(alice, company, "WORKS_AT");
1715
1716 let knows_neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1717 assert_eq!(knows_neighbors.len(), 1);
1718 assert_eq!(knows_neighbors[0].0, bob);
1719
1720 let works_neighbors = session.get_neighbors_outgoing_by_type(alice, "WORKS_AT");
1721 assert_eq!(works_neighbors.len(), 1);
1722 assert_eq!(works_neighbors[0].0, company);
1723
1724 let no_neighbors = session.get_neighbors_outgoing_by_type(alice, "LIKES");
1726 assert!(no_neighbors.is_empty());
1727 }
1728
1729 #[test]
1730 fn test_node_exists() {
1731 use grafeo_common::types::NodeId;
1732
1733 let db = GrafeoDB::new_in_memory();
1734 let session = db.session();
1735
1736 let id = session.create_node(&["Person"]);
1737
1738 assert!(session.node_exists(id));
1739 assert!(!session.node_exists(NodeId::new(9999)));
1740 }
1741
1742 #[test]
1743 fn test_edge_exists() {
1744 use grafeo_common::types::EdgeId;
1745
1746 let db = GrafeoDB::new_in_memory();
1747 let session = db.session();
1748
1749 let alice = session.create_node(&["Person"]);
1750 let bob = session.create_node(&["Person"]);
1751 let edge_id = session.create_edge(alice, bob, "KNOWS");
1752
1753 assert!(session.edge_exists(edge_id));
1754 assert!(!session.edge_exists(EdgeId::new(9999)));
1755 }
1756
1757 #[test]
1758 fn test_get_degree() {
1759 let db = GrafeoDB::new_in_memory();
1760 let session = db.session();
1761
1762 let alice = session.create_node(&["Person"]);
1763 let bob = session.create_node(&["Person"]);
1764 let carol = session.create_node(&["Person"]);
1765
1766 session.create_edge(alice, bob, "KNOWS");
1768 session.create_edge(alice, carol, "KNOWS");
1769 session.create_edge(bob, alice, "KNOWS");
1771
1772 let (out_degree, in_degree) = session.get_degree(alice);
1773 assert_eq!(out_degree, 2);
1774 assert_eq!(in_degree, 1);
1775
1776 let lonely = session.create_node(&["Person"]);
1778 let (out, in_deg) = session.get_degree(lonely);
1779 assert_eq!(out, 0);
1780 assert_eq!(in_deg, 0);
1781 }
1782
1783 #[test]
1784 fn test_get_nodes_batch() {
1785 let db = GrafeoDB::new_in_memory();
1786 let session = db.session();
1787
1788 let alice = session.create_node(&["Person"]);
1789 let bob = session.create_node(&["Person"]);
1790 let carol = session.create_node(&["Person"]);
1791
1792 let nodes = session.get_nodes_batch(&[alice, bob, carol]);
1793 assert_eq!(nodes.len(), 3);
1794 assert!(nodes[0].is_some());
1795 assert!(nodes[1].is_some());
1796 assert!(nodes[2].is_some());
1797
1798 use grafeo_common::types::NodeId;
1800 let nodes_with_missing = session.get_nodes_batch(&[alice, NodeId::new(9999), carol]);
1801 assert_eq!(nodes_with_missing.len(), 3);
1802 assert!(nodes_with_missing[0].is_some());
1803 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
1805 }
1806
1807 #[test]
1808 fn test_auto_commit_setting() {
1809 let db = GrafeoDB::new_in_memory();
1810 let mut session = db.session();
1811
1812 assert!(session.auto_commit());
1814
1815 session.set_auto_commit(false);
1816 assert!(!session.auto_commit());
1817
1818 session.set_auto_commit(true);
1819 assert!(session.auto_commit());
1820 }
1821
1822 #[test]
1823 fn test_transaction_double_begin_error() {
1824 let db = GrafeoDB::new_in_memory();
1825 let mut session = db.session();
1826
1827 session.begin_tx().unwrap();
1828 let result = session.begin_tx();
1829
1830 assert!(result.is_err());
1831 session.rollback().unwrap();
1833 }
1834
1835 #[test]
1836 fn test_commit_without_transaction_error() {
1837 let db = GrafeoDB::new_in_memory();
1838 let mut session = db.session();
1839
1840 let result = session.commit();
1841 assert!(result.is_err());
1842 }
1843
1844 #[test]
1845 fn test_rollback_without_transaction_error() {
1846 let db = GrafeoDB::new_in_memory();
1847 let mut session = db.session();
1848
1849 let result = session.rollback();
1850 assert!(result.is_err());
1851 }
1852
1853 #[test]
1854 fn test_create_edge_in_transaction() {
1855 let db = GrafeoDB::new_in_memory();
1856 let mut session = db.session();
1857
1858 let alice = session.create_node(&["Person"]);
1860 let bob = session.create_node(&["Person"]);
1861
1862 session.begin_tx().unwrap();
1864 let edge_id = session.create_edge(alice, bob, "KNOWS");
1865
1866 assert!(session.edge_exists(edge_id));
1868
1869 session.commit().unwrap();
1871
1872 assert!(session.edge_exists(edge_id));
1874 }
1875
1876 #[test]
1877 fn test_neighbors_empty_node() {
1878 let db = GrafeoDB::new_in_memory();
1879 let session = db.session();
1880
1881 let lonely = session.create_node(&["Person"]);
1882
1883 assert!(session.get_neighbors_outgoing(lonely).is_empty());
1884 assert!(session.get_neighbors_incoming(lonely).is_empty());
1885 assert!(
1886 session
1887 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
1888 .is_empty()
1889 );
1890 }
1891 }
1892}