1use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
16#[cfg(feature = "rdf")]
17use grafeo_core::graph::rdf::RdfStore;
18
19use crate::config::{AdaptiveConfig, GraphModel};
20use crate::database::QueryResult;
21use crate::query::cache::QueryCache;
22use crate::transaction::TransactionManager;
23
24pub struct Session {
30 store: Arc<LpgStore>,
32 graph_store: Arc<dyn GraphStoreMut>,
34 #[cfg(feature = "rdf")]
36 rdf_store: Arc<RdfStore>,
37 tx_manager: Arc<TransactionManager>,
39 query_cache: Arc<QueryCache>,
41 current_tx: Option<TxId>,
43 auto_commit: bool,
45 #[allow(dead_code)]
47 adaptive_config: AdaptiveConfig,
48 factorized_execution: bool,
50 graph_model: GraphModel,
52 query_timeout: Option<Duration>,
54 commit_counter: Arc<AtomicUsize>,
56 gc_interval: usize,
58 #[cfg(feature = "cdc")]
60 cdc_log: Arc<crate::cdc::CdcLog>,
61}
62
63impl Session {
64 #[allow(dead_code, clippy::too_many_arguments)]
66 pub(crate) fn with_adaptive(
67 store: Arc<LpgStore>,
68 tx_manager: Arc<TransactionManager>,
69 query_cache: Arc<QueryCache>,
70 adaptive_config: AdaptiveConfig,
71 factorized_execution: bool,
72 graph_model: GraphModel,
73 query_timeout: Option<Duration>,
74 commit_counter: Arc<AtomicUsize>,
75 gc_interval: usize,
76 ) -> Self {
77 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
78 Self {
79 store,
80 graph_store,
81 #[cfg(feature = "rdf")]
82 rdf_store: Arc::new(RdfStore::new()),
83 tx_manager,
84 query_cache,
85 current_tx: None,
86 auto_commit: true,
87 adaptive_config,
88 factorized_execution,
89 graph_model,
90 query_timeout,
91 commit_counter,
92 gc_interval,
93 #[cfg(feature = "cdc")]
94 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
95 }
96 }
97
98 #[cfg(feature = "cdc")]
100 pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
101 self.cdc_log = cdc_log;
102 }
103
104 #[cfg(feature = "rdf")]
106 #[allow(clippy::too_many_arguments)]
107 pub(crate) fn with_rdf_store_and_adaptive(
108 store: Arc<LpgStore>,
109 rdf_store: Arc<RdfStore>,
110 tx_manager: Arc<TransactionManager>,
111 query_cache: Arc<QueryCache>,
112 adaptive_config: AdaptiveConfig,
113 factorized_execution: bool,
114 graph_model: GraphModel,
115 query_timeout: Option<Duration>,
116 commit_counter: Arc<AtomicUsize>,
117 gc_interval: usize,
118 ) -> Self {
119 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
120 Self {
121 store,
122 graph_store,
123 rdf_store,
124 tx_manager,
125 query_cache,
126 current_tx: None,
127 auto_commit: true,
128 adaptive_config,
129 factorized_execution,
130 graph_model,
131 query_timeout,
132 commit_counter,
133 gc_interval,
134 #[cfg(feature = "cdc")]
135 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
136 }
137 }
138
139 #[allow(clippy::too_many_arguments)]
144 pub(crate) fn with_external_store(
145 store: Arc<dyn GraphStoreMut>,
146 tx_manager: Arc<TransactionManager>,
147 query_cache: Arc<QueryCache>,
148 adaptive_config: AdaptiveConfig,
149 factorized_execution: bool,
150 graph_model: GraphModel,
151 query_timeout: Option<Duration>,
152 commit_counter: Arc<AtomicUsize>,
153 gc_interval: usize,
154 ) -> Self {
155 Self {
156 store: Arc::new(LpgStore::new()), graph_store: store,
158 #[cfg(feature = "rdf")]
159 rdf_store: Arc::new(RdfStore::new()),
160 tx_manager,
161 query_cache,
162 current_tx: None,
163 auto_commit: true,
164 adaptive_config,
165 factorized_execution,
166 graph_model,
167 query_timeout,
168 commit_counter,
169 gc_interval,
170 #[cfg(feature = "cdc")]
171 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
172 }
173 }
174
175 #[must_use]
177 pub fn graph_model(&self) -> GraphModel {
178 self.graph_model
179 }
180
181 fn require_lpg(&self, language: &str) -> Result<()> {
183 if self.graph_model == GraphModel::Rdf {
184 return Err(grafeo_common::utils::error::Error::Internal(format!(
185 "This is an RDF database. {language} queries require an LPG database."
186 )));
187 }
188 Ok(())
189 }
190
191 #[cfg(feature = "gql")]
218 pub fn execute(&self, query: &str) -> Result<QueryResult> {
219 self.require_lpg("GQL")?;
220
221 use crate::query::{
222 Executor, Planner, binder::Binder, cache::CacheKey, gql_translator,
223 optimizer::Optimizer, processor::QueryLanguage,
224 };
225
226 let start_time = std::time::Instant::now();
227
228 let cache_key = CacheKey::new(query, QueryLanguage::Gql);
230
231 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
233 cached_plan
235 } else {
236 let logical_plan = gql_translator::translate(query)?;
240
241 let mut binder = Binder::new();
243 let _binding_context = binder.bind(&logical_plan)?;
244
245 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
247 let plan = optimizer.optimize(logical_plan)?;
248
249 self.query_cache.put_optimized(cache_key, plan.clone());
251
252 plan
253 };
254
255 let (viewing_epoch, tx_id) = self.get_transaction_context();
257
258 let planner = Planner::with_context(
261 Arc::clone(&self.graph_store),
262 Arc::clone(&self.tx_manager),
263 tx_id,
264 viewing_epoch,
265 )
266 .with_factorized_execution(self.factorized_execution);
267 let mut physical_plan = planner.plan(&optimized_plan)?;
268
269 let executor = Executor::with_columns(physical_plan.columns.clone())
271 .with_deadline(self.query_deadline());
272 let mut result = executor.execute(physical_plan.operator.as_mut())?;
273
274 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
276 let rows_scanned = result.rows.len() as u64;
277 result.execution_time_ms = Some(elapsed_ms);
278 result.rows_scanned = Some(rows_scanned);
279
280 Ok(result)
281 }
282
283 #[cfg(feature = "gql")]
289 pub fn execute_with_params(
290 &self,
291 query: &str,
292 params: std::collections::HashMap<String, Value>,
293 ) -> Result<QueryResult> {
294 self.require_lpg("GQL")?;
295
296 use crate::query::processor::{QueryLanguage, QueryProcessor};
297
298 let (viewing_epoch, tx_id) = self.get_transaction_context();
300
301 let processor = QueryProcessor::for_graph_store_with_tx(
303 Arc::clone(&self.graph_store),
304 Arc::clone(&self.tx_manager),
305 );
306
307 let processor = if let Some(tx_id) = tx_id {
309 processor.with_tx_context(viewing_epoch, tx_id)
310 } else {
311 processor
312 };
313
314 processor.process(query, QueryLanguage::Gql, Some(¶ms))
315 }
316
317 #[cfg(not(any(feature = "gql", feature = "cypher")))]
323 pub fn execute_with_params(
324 &self,
325 _query: &str,
326 _params: std::collections::HashMap<String, Value>,
327 ) -> Result<QueryResult> {
328 Err(grafeo_common::utils::error::Error::Internal(
329 "No query language enabled".to_string(),
330 ))
331 }
332
333 #[cfg(not(any(feature = "gql", feature = "cypher")))]
339 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
340 Err(grafeo_common::utils::error::Error::Internal(
341 "No query language enabled".to_string(),
342 ))
343 }
344
345 #[cfg(feature = "cypher")]
351 pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
352 use crate::query::{
353 Executor, Planner, binder::Binder, cache::CacheKey, cypher_translator,
354 optimizer::Optimizer, processor::QueryLanguage,
355 };
356
357 let cache_key = CacheKey::new(query, QueryLanguage::Cypher);
359
360 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
362 cached_plan
363 } else {
364 let logical_plan = cypher_translator::translate(query)?;
366
367 let mut binder = Binder::new();
369 let _binding_context = binder.bind(&logical_plan)?;
370
371 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
373 let plan = optimizer.optimize(logical_plan)?;
374
375 self.query_cache.put_optimized(cache_key, plan.clone());
377
378 plan
379 };
380
381 let (viewing_epoch, tx_id) = self.get_transaction_context();
383
384 let planner = Planner::with_context(
386 Arc::clone(&self.graph_store),
387 Arc::clone(&self.tx_manager),
388 tx_id,
389 viewing_epoch,
390 )
391 .with_factorized_execution(self.factorized_execution);
392 let mut physical_plan = planner.plan(&optimized_plan)?;
393
394 let executor = Executor::with_columns(physical_plan.columns.clone())
396 .with_deadline(self.query_deadline());
397 let result = executor.execute(physical_plan.operator.as_mut())?;
398 Ok(result)
399 }
400
401 #[cfg(feature = "gremlin")]
425 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
426 use crate::query::{
427 Executor, Planner, binder::Binder, gremlin_translator, optimizer::Optimizer,
428 };
429
430 let logical_plan = gremlin_translator::translate(query)?;
432
433 let mut binder = Binder::new();
435 let _binding_context = binder.bind(&logical_plan)?;
436
437 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
439 let optimized_plan = optimizer.optimize(logical_plan)?;
440
441 let (viewing_epoch, tx_id) = self.get_transaction_context();
443
444 let planner = Planner::with_context(
446 Arc::clone(&self.graph_store),
447 Arc::clone(&self.tx_manager),
448 tx_id,
449 viewing_epoch,
450 )
451 .with_factorized_execution(self.factorized_execution);
452 let mut physical_plan = planner.plan(&optimized_plan)?;
453
454 let executor = Executor::with_columns(physical_plan.columns.clone())
456 .with_deadline(self.query_deadline());
457 let result = executor.execute(physical_plan.operator.as_mut())?;
458 Ok(result)
459 }
460
461 #[cfg(feature = "gremlin")]
467 pub fn execute_gremlin_with_params(
468 &self,
469 query: &str,
470 params: std::collections::HashMap<String, Value>,
471 ) -> Result<QueryResult> {
472 use crate::query::processor::{QueryLanguage, QueryProcessor};
473
474 let (viewing_epoch, tx_id) = self.get_transaction_context();
476
477 let processor = QueryProcessor::for_graph_store_with_tx(
479 Arc::clone(&self.graph_store),
480 Arc::clone(&self.tx_manager),
481 );
482
483 let processor = if let Some(tx_id) = tx_id {
485 processor.with_tx_context(viewing_epoch, tx_id)
486 } else {
487 processor
488 };
489
490 processor.process(query, QueryLanguage::Gremlin, Some(¶ms))
491 }
492
493 #[cfg(feature = "graphql")]
517 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
518 use crate::query::{
519 Executor, Planner, binder::Binder, graphql_translator, optimizer::Optimizer,
520 };
521
522 let logical_plan = graphql_translator::translate(query)?;
524
525 let mut binder = Binder::new();
527 let _binding_context = binder.bind(&logical_plan)?;
528
529 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
531 let optimized_plan = optimizer.optimize(logical_plan)?;
532
533 let (viewing_epoch, tx_id) = self.get_transaction_context();
535
536 let planner = Planner::with_context(
538 Arc::clone(&self.graph_store),
539 Arc::clone(&self.tx_manager),
540 tx_id,
541 viewing_epoch,
542 )
543 .with_factorized_execution(self.factorized_execution);
544 let mut physical_plan = planner.plan(&optimized_plan)?;
545
546 let executor = Executor::with_columns(physical_plan.columns.clone())
548 .with_deadline(self.query_deadline());
549 let result = executor.execute(physical_plan.operator.as_mut())?;
550 Ok(result)
551 }
552
553 #[cfg(feature = "graphql")]
559 pub fn execute_graphql_with_params(
560 &self,
561 query: &str,
562 params: std::collections::HashMap<String, Value>,
563 ) -> Result<QueryResult> {
564 use crate::query::processor::{QueryLanguage, QueryProcessor};
565
566 let (viewing_epoch, tx_id) = self.get_transaction_context();
568
569 let processor = QueryProcessor::for_graph_store_with_tx(
571 Arc::clone(&self.graph_store),
572 Arc::clone(&self.tx_manager),
573 );
574
575 let processor = if let Some(tx_id) = tx_id {
577 processor.with_tx_context(viewing_epoch, tx_id)
578 } else {
579 processor
580 };
581
582 processor.process(query, QueryLanguage::GraphQL, Some(¶ms))
583 }
584
585 #[cfg(feature = "sql-pgq")]
610 pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
611 use crate::query::{
612 Executor, Planner, binder::Binder, cache::CacheKey, optimizer::Optimizer,
613 plan::LogicalOperator, processor::QueryLanguage, sql_pgq_translator,
614 };
615
616 let logical_plan = sql_pgq_translator::translate(query)?;
618
619 if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
621 return Ok(QueryResult {
622 columns: vec!["status".into()],
623 column_types: vec![grafeo_common::types::LogicalType::String],
624 rows: vec![vec![Value::from(format!(
625 "Property graph '{}' created ({} node tables, {} edge tables)",
626 cpg.name,
627 cpg.node_tables.len(),
628 cpg.edge_tables.len()
629 ))]],
630 execution_time_ms: None,
631 rows_scanned: None,
632 });
633 }
634
635 let cache_key = CacheKey::new(query, QueryLanguage::SqlPgq);
637
638 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
640 cached_plan
641 } else {
642 let mut binder = Binder::new();
644 let _binding_context = binder.bind(&logical_plan)?;
645
646 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
648 let plan = optimizer.optimize(logical_plan)?;
649
650 self.query_cache.put_optimized(cache_key, plan.clone());
652
653 plan
654 };
655
656 let (viewing_epoch, tx_id) = self.get_transaction_context();
658
659 let planner = Planner::with_context(
661 Arc::clone(&self.graph_store),
662 Arc::clone(&self.tx_manager),
663 tx_id,
664 viewing_epoch,
665 )
666 .with_factorized_execution(self.factorized_execution);
667 let mut physical_plan = planner.plan(&optimized_plan)?;
668
669 let executor = Executor::with_columns(physical_plan.columns.clone())
671 .with_deadline(self.query_deadline());
672 let result = executor.execute(physical_plan.operator.as_mut())?;
673 Ok(result)
674 }
675
676 #[cfg(feature = "sql-pgq")]
682 pub fn execute_sql_with_params(
683 &self,
684 query: &str,
685 params: std::collections::HashMap<String, Value>,
686 ) -> Result<QueryResult> {
687 use crate::query::processor::{QueryLanguage, QueryProcessor};
688
689 let (viewing_epoch, tx_id) = self.get_transaction_context();
691
692 let processor = QueryProcessor::for_graph_store_with_tx(
694 Arc::clone(&self.graph_store),
695 Arc::clone(&self.tx_manager),
696 );
697
698 let processor = if let Some(tx_id) = tx_id {
700 processor.with_tx_context(viewing_epoch, tx_id)
701 } else {
702 processor
703 };
704
705 processor.process(query, QueryLanguage::SqlPgq, Some(¶ms))
706 }
707
708 #[cfg(all(feature = "sparql", feature = "rdf"))]
714 pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
715 use crate::query::{
716 Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
717 };
718
719 let logical_plan = sparql_translator::translate(query)?;
721
722 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
724 let optimized_plan = optimizer.optimize(logical_plan)?;
725
726 let planner = RdfPlanner::new(Arc::clone(&self.rdf_store)).with_tx_id(self.current_tx);
728 let mut physical_plan = planner.plan(&optimized_plan)?;
729
730 let executor = Executor::with_columns(physical_plan.columns.clone())
732 .with_deadline(self.query_deadline());
733 executor.execute(physical_plan.operator.as_mut())
734 }
735
736 #[cfg(all(feature = "sparql", feature = "rdf"))]
742 pub fn execute_sparql_with_params(
743 &self,
744 query: &str,
745 _params: std::collections::HashMap<String, Value>,
746 ) -> Result<QueryResult> {
747 self.execute_sparql(query)
750 }
751
752 pub fn execute_language(
761 &self,
762 query: &str,
763 language: &str,
764 params: Option<std::collections::HashMap<String, Value>>,
765 ) -> Result<QueryResult> {
766 match language {
767 "gql" => {
768 if let Some(p) = params {
769 self.execute_with_params(query, p)
770 } else {
771 self.execute(query)
772 }
773 }
774 #[cfg(feature = "cypher")]
775 "cypher" => {
776 if let Some(p) = params {
777 use crate::query::processor::{QueryLanguage, QueryProcessor};
778 let processor = QueryProcessor::for_graph_store_with_tx(
779 Arc::clone(&self.graph_store),
780 Arc::clone(&self.tx_manager),
781 );
782 let (viewing_epoch, tx_id) = self.get_transaction_context();
783 let processor = if let Some(tx_id) = tx_id {
784 processor.with_tx_context(viewing_epoch, tx_id)
785 } else {
786 processor
787 };
788 processor.process(query, QueryLanguage::Cypher, Some(&p))
789 } else {
790 self.execute_cypher(query)
791 }
792 }
793 #[cfg(feature = "gremlin")]
794 "gremlin" => {
795 if let Some(p) = params {
796 self.execute_gremlin_with_params(query, p)
797 } else {
798 self.execute_gremlin(query)
799 }
800 }
801 #[cfg(feature = "graphql")]
802 "graphql" => {
803 if let Some(p) = params {
804 self.execute_graphql_with_params(query, p)
805 } else {
806 self.execute_graphql(query)
807 }
808 }
809 #[cfg(feature = "sql-pgq")]
810 "sql" | "sql-pgq" => {
811 if let Some(p) = params {
812 self.execute_sql_with_params(query, p)
813 } else {
814 self.execute_sql(query)
815 }
816 }
817 #[cfg(all(feature = "sparql", feature = "rdf"))]
818 "sparql" => {
819 if let Some(p) = params {
820 self.execute_sparql_with_params(query, p)
821 } else {
822 self.execute_sparql(query)
823 }
824 }
825 other => Err(grafeo_common::utils::error::Error::Query(
826 grafeo_common::utils::error::QueryError::new(
827 grafeo_common::utils::error::QueryErrorKind::Semantic,
828 format!("Unknown query language: '{other}'"),
829 ),
830 )),
831 }
832 }
833
834 pub fn begin_tx(&mut self) -> Result<()> {
857 if self.current_tx.is_some() {
858 return Err(grafeo_common::utils::error::Error::Transaction(
859 grafeo_common::utils::error::TransactionError::InvalidState(
860 "Transaction already active".to_string(),
861 ),
862 ));
863 }
864
865 let tx_id = self.tx_manager.begin();
866 self.current_tx = Some(tx_id);
867 Ok(())
868 }
869
870 pub fn begin_tx_with_isolation(
878 &mut self,
879 isolation_level: crate::transaction::IsolationLevel,
880 ) -> Result<()> {
881 if self.current_tx.is_some() {
882 return Err(grafeo_common::utils::error::Error::Transaction(
883 grafeo_common::utils::error::TransactionError::InvalidState(
884 "Transaction already active".to_string(),
885 ),
886 ));
887 }
888
889 let tx_id = self.tx_manager.begin_with_isolation(isolation_level);
890 self.current_tx = Some(tx_id);
891 Ok(())
892 }
893
894 pub fn commit(&mut self) -> Result<()> {
902 let tx_id = self.current_tx.take().ok_or_else(|| {
903 grafeo_common::utils::error::Error::Transaction(
904 grafeo_common::utils::error::TransactionError::InvalidState(
905 "No active transaction".to_string(),
906 ),
907 )
908 })?;
909
910 #[cfg(feature = "rdf")]
912 self.rdf_store.commit_tx(tx_id);
913
914 self.tx_manager.commit(tx_id)?;
915
916 self.store.sync_epoch(self.tx_manager.current_epoch());
920
921 if self.gc_interval > 0 {
923 let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
924 if count.is_multiple_of(self.gc_interval) {
925 let min_epoch = self.tx_manager.min_active_epoch();
926 self.store.gc_versions(min_epoch);
927 self.tx_manager.gc();
928 }
929 }
930
931 Ok(())
932 }
933
934 pub fn rollback(&mut self) -> Result<()> {
958 let tx_id = self.current_tx.take().ok_or_else(|| {
959 grafeo_common::utils::error::Error::Transaction(
960 grafeo_common::utils::error::TransactionError::InvalidState(
961 "No active transaction".to_string(),
962 ),
963 )
964 })?;
965
966 self.store.discard_uncommitted_versions(tx_id);
968
969 #[cfg(feature = "rdf")]
971 self.rdf_store.rollback_tx(tx_id);
972
973 self.tx_manager.abort(tx_id)
975 }
976
977 #[must_use]
979 pub fn in_transaction(&self) -> bool {
980 self.current_tx.is_some()
981 }
982
983 pub fn set_auto_commit(&mut self, auto_commit: bool) {
985 self.auto_commit = auto_commit;
986 }
987
988 #[must_use]
990 pub fn auto_commit(&self) -> bool {
991 self.auto_commit
992 }
993
994 #[must_use]
996 fn query_deadline(&self) -> Option<Instant> {
997 self.query_timeout.map(|d| Instant::now() + d)
998 }
999
1000 #[must_use]
1006 fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
1007 if let Some(tx_id) = self.current_tx {
1008 let epoch = self
1010 .tx_manager
1011 .start_epoch(tx_id)
1012 .unwrap_or_else(|| self.tx_manager.current_epoch());
1013 (epoch, Some(tx_id))
1014 } else {
1015 (self.tx_manager.current_epoch(), None)
1017 }
1018 }
1019
1020 pub fn create_node(&self, labels: &[&str]) -> NodeId {
1025 let (epoch, tx_id) = self.get_transaction_context();
1026 self.store
1027 .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1028 }
1029
1030 pub fn create_node_with_props<'a>(
1034 &self,
1035 labels: &[&str],
1036 properties: impl IntoIterator<Item = (&'a str, Value)>,
1037 ) -> NodeId {
1038 let (epoch, tx_id) = self.get_transaction_context();
1039 self.store.create_node_with_props_versioned(
1040 labels,
1041 properties.into_iter().map(|(k, v)| (k, v)),
1042 epoch,
1043 tx_id.unwrap_or(TxId::SYSTEM),
1044 )
1045 }
1046
1047 pub fn create_edge(
1052 &self,
1053 src: NodeId,
1054 dst: NodeId,
1055 edge_type: &str,
1056 ) -> grafeo_common::types::EdgeId {
1057 let (epoch, tx_id) = self.get_transaction_context();
1058 self.store
1059 .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1060 }
1061
1062 #[must_use]
1090 pub fn get_node(&self, id: NodeId) -> Option<Node> {
1091 let (epoch, tx_id) = self.get_transaction_context();
1092 self.store
1093 .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1094 }
1095
1096 #[must_use]
1120 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
1121 self.get_node(id)
1122 .and_then(|node| node.get_property(key).cloned())
1123 }
1124
1125 #[must_use]
1132 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1133 let (epoch, tx_id) = self.get_transaction_context();
1134 self.store
1135 .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1136 }
1137
1138 #[must_use]
1164 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1165 self.store.edges_from(node, Direction::Outgoing).collect()
1166 }
1167
1168 #[must_use]
1177 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1178 self.store.edges_from(node, Direction::Incoming).collect()
1179 }
1180
1181 #[must_use]
1193 pub fn get_neighbors_outgoing_by_type(
1194 &self,
1195 node: NodeId,
1196 edge_type: &str,
1197 ) -> Vec<(NodeId, EdgeId)> {
1198 self.store
1199 .edges_from(node, Direction::Outgoing)
1200 .filter(|(_, edge_id)| {
1201 self.get_edge(*edge_id)
1202 .is_some_and(|e| e.edge_type.as_str() == edge_type)
1203 })
1204 .collect()
1205 }
1206
1207 #[must_use]
1214 pub fn node_exists(&self, id: NodeId) -> bool {
1215 self.get_node(id).is_some()
1216 }
1217
1218 #[must_use]
1220 pub fn edge_exists(&self, id: EdgeId) -> bool {
1221 self.get_edge(id).is_some()
1222 }
1223
1224 #[must_use]
1228 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
1229 let out = self.store.out_degree(node);
1230 let in_degree = self.store.in_degree(node);
1231 (out, in_degree)
1232 }
1233
1234 #[must_use]
1244 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
1245 let (epoch, tx_id) = self.get_transaction_context();
1246 let tx = tx_id.unwrap_or(TxId::SYSTEM);
1247 ids.iter()
1248 .map(|&id| self.store.get_node_versioned(id, epoch, tx))
1249 .collect()
1250 }
1251
1252 #[cfg(feature = "cdc")]
1256 pub fn history(
1257 &self,
1258 entity_id: impl Into<crate::cdc::EntityId>,
1259 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1260 Ok(self.cdc_log.history(entity_id.into()))
1261 }
1262
1263 #[cfg(feature = "cdc")]
1265 pub fn history_since(
1266 &self,
1267 entity_id: impl Into<crate::cdc::EntityId>,
1268 since_epoch: EpochId,
1269 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1270 Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
1271 }
1272
1273 #[cfg(feature = "cdc")]
1275 pub fn changes_between(
1276 &self,
1277 start_epoch: EpochId,
1278 end_epoch: EpochId,
1279 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1280 Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
1281 }
1282}
1283
1284#[cfg(test)]
1285mod tests {
1286 use crate::database::GrafeoDB;
1287
1288 #[test]
1289 fn test_session_create_node() {
1290 let db = GrafeoDB::new_in_memory();
1291 let session = db.session();
1292
1293 let id = session.create_node(&["Person"]);
1294 assert!(id.is_valid());
1295 assert_eq!(db.node_count(), 1);
1296 }
1297
1298 #[test]
1299 fn test_session_transaction() {
1300 let db = GrafeoDB::new_in_memory();
1301 let mut session = db.session();
1302
1303 assert!(!session.in_transaction());
1304
1305 session.begin_tx().unwrap();
1306 assert!(session.in_transaction());
1307
1308 session.commit().unwrap();
1309 assert!(!session.in_transaction());
1310 }
1311
1312 #[test]
1313 fn test_session_transaction_context() {
1314 let db = GrafeoDB::new_in_memory();
1315 let mut session = db.session();
1316
1317 let (_epoch1, tx_id1) = session.get_transaction_context();
1319 assert!(tx_id1.is_none());
1320
1321 session.begin_tx().unwrap();
1323 let (epoch2, tx_id2) = session.get_transaction_context();
1324 assert!(tx_id2.is_some());
1325 let _ = epoch2; session.commit().unwrap();
1330 let (epoch3, tx_id3) = session.get_transaction_context();
1331 assert!(tx_id3.is_none());
1332 assert!(epoch3.as_u64() >= epoch2.as_u64());
1334 }
1335
1336 #[test]
1337 fn test_session_rollback() {
1338 let db = GrafeoDB::new_in_memory();
1339 let mut session = db.session();
1340
1341 session.begin_tx().unwrap();
1342 session.rollback().unwrap();
1343 assert!(!session.in_transaction());
1344 }
1345
1346 #[test]
1347 fn test_session_rollback_discards_versions() {
1348 use grafeo_common::types::TxId;
1349
1350 let db = GrafeoDB::new_in_memory();
1351
1352 let node_before = db.store().create_node(&["Person"]);
1354 assert!(node_before.is_valid());
1355 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1356
1357 let mut session = db.session();
1359 session.begin_tx().unwrap();
1360 let tx_id = session.current_tx.unwrap();
1361
1362 let epoch = db.store().current_epoch();
1364 let node_in_tx = db.store().create_node_versioned(&["Person"], epoch, tx_id);
1365 assert!(node_in_tx.is_valid());
1366
1367 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1369
1370 session.rollback().unwrap();
1372 assert!(!session.in_transaction());
1373
1374 let count_after = db.node_count();
1377 assert_eq!(
1378 count_after, 1,
1379 "Rollback should discard uncommitted node, but got {count_after}"
1380 );
1381
1382 let current_epoch = db.store().current_epoch();
1384 assert!(
1385 db.store()
1386 .get_node_versioned(node_before, current_epoch, TxId::SYSTEM)
1387 .is_some(),
1388 "Original node should still exist"
1389 );
1390
1391 assert!(
1393 db.store()
1394 .get_node_versioned(node_in_tx, current_epoch, TxId::SYSTEM)
1395 .is_none(),
1396 "Transaction node should be gone"
1397 );
1398 }
1399
1400 #[test]
1401 fn test_session_create_node_in_transaction() {
1402 let db = GrafeoDB::new_in_memory();
1404
1405 let node_before = db.create_node(&["Person"]);
1407 assert!(node_before.is_valid());
1408 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1409
1410 let mut session = db.session();
1412 session.begin_tx().unwrap();
1413
1414 let node_in_tx = session.create_node(&["Person"]);
1416 assert!(node_in_tx.is_valid());
1417
1418 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1420
1421 session.rollback().unwrap();
1423
1424 let count_after = db.node_count();
1426 assert_eq!(
1427 count_after, 1,
1428 "Rollback should discard node created via session.create_node(), but got {count_after}"
1429 );
1430 }
1431
1432 #[test]
1433 fn test_session_create_node_with_props_in_transaction() {
1434 use grafeo_common::types::Value;
1435
1436 let db = GrafeoDB::new_in_memory();
1438
1439 db.create_node(&["Person"]);
1441 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1442
1443 let mut session = db.session();
1445 session.begin_tx().unwrap();
1446
1447 let node_in_tx =
1448 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1449 assert!(node_in_tx.is_valid());
1450
1451 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1453
1454 session.rollback().unwrap();
1456
1457 let count_after = db.node_count();
1459 assert_eq!(
1460 count_after, 1,
1461 "Rollback should discard node created via session.create_node_with_props()"
1462 );
1463 }
1464
1465 #[cfg(feature = "gql")]
1466 mod gql_tests {
1467 use super::*;
1468
1469 #[test]
1470 fn test_gql_query_execution() {
1471 let db = GrafeoDB::new_in_memory();
1472 let session = db.session();
1473
1474 session.create_node(&["Person"]);
1476 session.create_node(&["Person"]);
1477 session.create_node(&["Animal"]);
1478
1479 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1481
1482 assert_eq!(result.row_count(), 2);
1484 assert_eq!(result.column_count(), 1);
1485 assert_eq!(result.columns[0], "n");
1486 }
1487
1488 #[test]
1489 fn test_gql_empty_result() {
1490 let db = GrafeoDB::new_in_memory();
1491 let session = db.session();
1492
1493 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1495
1496 assert_eq!(result.row_count(), 0);
1497 }
1498
1499 #[test]
1500 fn test_gql_parse_error() {
1501 let db = GrafeoDB::new_in_memory();
1502 let session = db.session();
1503
1504 let result = session.execute("MATCH (n RETURN n");
1506
1507 assert!(result.is_err());
1508 }
1509
1510 #[test]
1511 fn test_gql_relationship_traversal() {
1512 let db = GrafeoDB::new_in_memory();
1513 let session = db.session();
1514
1515 let alice = session.create_node(&["Person"]);
1517 let bob = session.create_node(&["Person"]);
1518 let charlie = session.create_node(&["Person"]);
1519
1520 session.create_edge(alice, bob, "KNOWS");
1521 session.create_edge(alice, charlie, "KNOWS");
1522
1523 let result = session
1525 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1526 .unwrap();
1527
1528 assert_eq!(result.row_count(), 2);
1530 assert_eq!(result.column_count(), 2);
1531 assert_eq!(result.columns[0], "a");
1532 assert_eq!(result.columns[1], "b");
1533 }
1534
1535 #[test]
1536 fn test_gql_relationship_with_type_filter() {
1537 let db = GrafeoDB::new_in_memory();
1538 let session = db.session();
1539
1540 let alice = session.create_node(&["Person"]);
1542 let bob = session.create_node(&["Person"]);
1543 let charlie = session.create_node(&["Person"]);
1544
1545 session.create_edge(alice, bob, "KNOWS");
1546 session.create_edge(alice, charlie, "WORKS_WITH");
1547
1548 let result = session
1550 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1551 .unwrap();
1552
1553 assert_eq!(result.row_count(), 1);
1555 }
1556
1557 #[test]
1558 fn test_gql_semantic_error_undefined_variable() {
1559 let db = GrafeoDB::new_in_memory();
1560 let session = db.session();
1561
1562 let result = session.execute("MATCH (n:Person) RETURN x");
1564
1565 assert!(result.is_err());
1567 let Err(err) = result else {
1568 panic!("Expected error")
1569 };
1570 assert!(
1571 err.to_string().contains("Undefined variable"),
1572 "Expected undefined variable error, got: {}",
1573 err
1574 );
1575 }
1576
1577 #[test]
1578 fn test_gql_where_clause_property_filter() {
1579 use grafeo_common::types::Value;
1580
1581 let db = GrafeoDB::new_in_memory();
1582 let session = db.session();
1583
1584 session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
1586 session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
1587 session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
1588
1589 let result = session
1591 .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
1592 .unwrap();
1593
1594 assert_eq!(result.row_count(), 2);
1596 }
1597
1598 #[test]
1599 fn test_gql_where_clause_equality() {
1600 use grafeo_common::types::Value;
1601
1602 let db = GrafeoDB::new_in_memory();
1603 let session = db.session();
1604
1605 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1607 session.create_node_with_props(&["Person"], [("name", Value::String("Bob".into()))]);
1608 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1609
1610 let result = session
1612 .execute("MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n")
1613 .unwrap();
1614
1615 assert_eq!(result.row_count(), 2);
1617 }
1618
1619 #[test]
1620 fn test_gql_return_property_access() {
1621 use grafeo_common::types::Value;
1622
1623 let db = GrafeoDB::new_in_memory();
1624 let session = db.session();
1625
1626 session.create_node_with_props(
1628 &["Person"],
1629 [
1630 ("name", Value::String("Alice".into())),
1631 ("age", Value::Int64(30)),
1632 ],
1633 );
1634 session.create_node_with_props(
1635 &["Person"],
1636 [
1637 ("name", Value::String("Bob".into())),
1638 ("age", Value::Int64(25)),
1639 ],
1640 );
1641
1642 let result = session
1644 .execute("MATCH (n:Person) RETURN n.name, n.age")
1645 .unwrap();
1646
1647 assert_eq!(result.row_count(), 2);
1649 assert_eq!(result.column_count(), 2);
1650 assert_eq!(result.columns[0], "n.name");
1651 assert_eq!(result.columns[1], "n.age");
1652
1653 let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
1655 assert!(names.contains(&&Value::String("Alice".into())));
1656 assert!(names.contains(&&Value::String("Bob".into())));
1657 }
1658
1659 #[test]
1660 fn test_gql_return_mixed_expressions() {
1661 use grafeo_common::types::Value;
1662
1663 let db = GrafeoDB::new_in_memory();
1664 let session = db.session();
1665
1666 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1668
1669 let result = session
1671 .execute("MATCH (n:Person) RETURN n, n.name")
1672 .unwrap();
1673
1674 assert_eq!(result.row_count(), 1);
1675 assert_eq!(result.column_count(), 2);
1676 assert_eq!(result.columns[0], "n");
1677 assert_eq!(result.columns[1], "n.name");
1678
1679 assert_eq!(result.rows[0][1], Value::String("Alice".into()));
1681 }
1682 }
1683
1684 #[cfg(feature = "cypher")]
1685 mod cypher_tests {
1686 use super::*;
1687
1688 #[test]
1689 fn test_cypher_query_execution() {
1690 let db = GrafeoDB::new_in_memory();
1691 let session = db.session();
1692
1693 session.create_node(&["Person"]);
1695 session.create_node(&["Person"]);
1696 session.create_node(&["Animal"]);
1697
1698 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1700
1701 assert_eq!(result.row_count(), 2);
1703 assert_eq!(result.column_count(), 1);
1704 assert_eq!(result.columns[0], "n");
1705 }
1706
1707 #[test]
1708 fn test_cypher_empty_result() {
1709 let db = GrafeoDB::new_in_memory();
1710 let session = db.session();
1711
1712 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1714
1715 assert_eq!(result.row_count(), 0);
1716 }
1717
1718 #[test]
1719 fn test_cypher_parse_error() {
1720 let db = GrafeoDB::new_in_memory();
1721 let session = db.session();
1722
1723 let result = session.execute_cypher("MATCH (n RETURN n");
1725
1726 assert!(result.is_err());
1727 }
1728 }
1729
1730 mod direct_lookup_tests {
1733 use super::*;
1734 use grafeo_common::types::Value;
1735
1736 #[test]
1737 fn test_get_node() {
1738 let db = GrafeoDB::new_in_memory();
1739 let session = db.session();
1740
1741 let id = session.create_node(&["Person"]);
1742 let node = session.get_node(id);
1743
1744 assert!(node.is_some());
1745 let node = node.unwrap();
1746 assert_eq!(node.id, id);
1747 }
1748
1749 #[test]
1750 fn test_get_node_not_found() {
1751 use grafeo_common::types::NodeId;
1752
1753 let db = GrafeoDB::new_in_memory();
1754 let session = db.session();
1755
1756 let node = session.get_node(NodeId::new(9999));
1758 assert!(node.is_none());
1759 }
1760
1761 #[test]
1762 fn test_get_node_property() {
1763 let db = GrafeoDB::new_in_memory();
1764 let session = db.session();
1765
1766 let id = session
1767 .create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1768
1769 let name = session.get_node_property(id, "name");
1770 assert_eq!(name, Some(Value::String("Alice".into())));
1771
1772 let missing = session.get_node_property(id, "missing");
1774 assert!(missing.is_none());
1775 }
1776
1777 #[test]
1778 fn test_get_edge() {
1779 let db = GrafeoDB::new_in_memory();
1780 let session = db.session();
1781
1782 let alice = session.create_node(&["Person"]);
1783 let bob = session.create_node(&["Person"]);
1784 let edge_id = session.create_edge(alice, bob, "KNOWS");
1785
1786 let edge = session.get_edge(edge_id);
1787 assert!(edge.is_some());
1788 let edge = edge.unwrap();
1789 assert_eq!(edge.id, edge_id);
1790 assert_eq!(edge.src, alice);
1791 assert_eq!(edge.dst, bob);
1792 }
1793
1794 #[test]
1795 fn test_get_edge_not_found() {
1796 use grafeo_common::types::EdgeId;
1797
1798 let db = GrafeoDB::new_in_memory();
1799 let session = db.session();
1800
1801 let edge = session.get_edge(EdgeId::new(9999));
1802 assert!(edge.is_none());
1803 }
1804
1805 #[test]
1806 fn test_get_neighbors_outgoing() {
1807 let db = GrafeoDB::new_in_memory();
1808 let session = db.session();
1809
1810 let alice = session.create_node(&["Person"]);
1811 let bob = session.create_node(&["Person"]);
1812 let carol = session.create_node(&["Person"]);
1813
1814 session.create_edge(alice, bob, "KNOWS");
1815 session.create_edge(alice, carol, "KNOWS");
1816
1817 let neighbors = session.get_neighbors_outgoing(alice);
1818 assert_eq!(neighbors.len(), 2);
1819
1820 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1821 assert!(neighbor_ids.contains(&bob));
1822 assert!(neighbor_ids.contains(&carol));
1823 }
1824
1825 #[test]
1826 fn test_get_neighbors_incoming() {
1827 let db = GrafeoDB::new_in_memory();
1828 let session = db.session();
1829
1830 let alice = session.create_node(&["Person"]);
1831 let bob = session.create_node(&["Person"]);
1832 let carol = session.create_node(&["Person"]);
1833
1834 session.create_edge(bob, alice, "KNOWS");
1835 session.create_edge(carol, alice, "KNOWS");
1836
1837 let neighbors = session.get_neighbors_incoming(alice);
1838 assert_eq!(neighbors.len(), 2);
1839
1840 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1841 assert!(neighbor_ids.contains(&bob));
1842 assert!(neighbor_ids.contains(&carol));
1843 }
1844
1845 #[test]
1846 fn test_get_neighbors_outgoing_by_type() {
1847 let db = GrafeoDB::new_in_memory();
1848 let session = db.session();
1849
1850 let alice = session.create_node(&["Person"]);
1851 let bob = session.create_node(&["Person"]);
1852 let company = session.create_node(&["Company"]);
1853
1854 session.create_edge(alice, bob, "KNOWS");
1855 session.create_edge(alice, company, "WORKS_AT");
1856
1857 let knows_neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1858 assert_eq!(knows_neighbors.len(), 1);
1859 assert_eq!(knows_neighbors[0].0, bob);
1860
1861 let works_neighbors = session.get_neighbors_outgoing_by_type(alice, "WORKS_AT");
1862 assert_eq!(works_neighbors.len(), 1);
1863 assert_eq!(works_neighbors[0].0, company);
1864
1865 let no_neighbors = session.get_neighbors_outgoing_by_type(alice, "LIKES");
1867 assert!(no_neighbors.is_empty());
1868 }
1869
1870 #[test]
1871 fn test_node_exists() {
1872 use grafeo_common::types::NodeId;
1873
1874 let db = GrafeoDB::new_in_memory();
1875 let session = db.session();
1876
1877 let id = session.create_node(&["Person"]);
1878
1879 assert!(session.node_exists(id));
1880 assert!(!session.node_exists(NodeId::new(9999)));
1881 }
1882
1883 #[test]
1884 fn test_edge_exists() {
1885 use grafeo_common::types::EdgeId;
1886
1887 let db = GrafeoDB::new_in_memory();
1888 let session = db.session();
1889
1890 let alice = session.create_node(&["Person"]);
1891 let bob = session.create_node(&["Person"]);
1892 let edge_id = session.create_edge(alice, bob, "KNOWS");
1893
1894 assert!(session.edge_exists(edge_id));
1895 assert!(!session.edge_exists(EdgeId::new(9999)));
1896 }
1897
1898 #[test]
1899 fn test_get_degree() {
1900 let db = GrafeoDB::new_in_memory();
1901 let session = db.session();
1902
1903 let alice = session.create_node(&["Person"]);
1904 let bob = session.create_node(&["Person"]);
1905 let carol = session.create_node(&["Person"]);
1906
1907 session.create_edge(alice, bob, "KNOWS");
1909 session.create_edge(alice, carol, "KNOWS");
1910 session.create_edge(bob, alice, "KNOWS");
1912
1913 let (out_degree, in_degree) = session.get_degree(alice);
1914 assert_eq!(out_degree, 2);
1915 assert_eq!(in_degree, 1);
1916
1917 let lonely = session.create_node(&["Person"]);
1919 let (out, in_deg) = session.get_degree(lonely);
1920 assert_eq!(out, 0);
1921 assert_eq!(in_deg, 0);
1922 }
1923
1924 #[test]
1925 fn test_get_nodes_batch() {
1926 let db = GrafeoDB::new_in_memory();
1927 let session = db.session();
1928
1929 let alice = session.create_node(&["Person"]);
1930 let bob = session.create_node(&["Person"]);
1931 let carol = session.create_node(&["Person"]);
1932
1933 let nodes = session.get_nodes_batch(&[alice, bob, carol]);
1934 assert_eq!(nodes.len(), 3);
1935 assert!(nodes[0].is_some());
1936 assert!(nodes[1].is_some());
1937 assert!(nodes[2].is_some());
1938
1939 use grafeo_common::types::NodeId;
1941 let nodes_with_missing = session.get_nodes_batch(&[alice, NodeId::new(9999), carol]);
1942 assert_eq!(nodes_with_missing.len(), 3);
1943 assert!(nodes_with_missing[0].is_some());
1944 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
1946 }
1947
1948 #[test]
1949 fn test_auto_commit_setting() {
1950 let db = GrafeoDB::new_in_memory();
1951 let mut session = db.session();
1952
1953 assert!(session.auto_commit());
1955
1956 session.set_auto_commit(false);
1957 assert!(!session.auto_commit());
1958
1959 session.set_auto_commit(true);
1960 assert!(session.auto_commit());
1961 }
1962
1963 #[test]
1964 fn test_transaction_double_begin_error() {
1965 let db = GrafeoDB::new_in_memory();
1966 let mut session = db.session();
1967
1968 session.begin_tx().unwrap();
1969 let result = session.begin_tx();
1970
1971 assert!(result.is_err());
1972 session.rollback().unwrap();
1974 }
1975
1976 #[test]
1977 fn test_commit_without_transaction_error() {
1978 let db = GrafeoDB::new_in_memory();
1979 let mut session = db.session();
1980
1981 let result = session.commit();
1982 assert!(result.is_err());
1983 }
1984
1985 #[test]
1986 fn test_rollback_without_transaction_error() {
1987 let db = GrafeoDB::new_in_memory();
1988 let mut session = db.session();
1989
1990 let result = session.rollback();
1991 assert!(result.is_err());
1992 }
1993
1994 #[test]
1995 fn test_create_edge_in_transaction() {
1996 let db = GrafeoDB::new_in_memory();
1997 let mut session = db.session();
1998
1999 let alice = session.create_node(&["Person"]);
2001 let bob = session.create_node(&["Person"]);
2002
2003 session.begin_tx().unwrap();
2005 let edge_id = session.create_edge(alice, bob, "KNOWS");
2006
2007 assert!(session.edge_exists(edge_id));
2009
2010 session.commit().unwrap();
2012
2013 assert!(session.edge_exists(edge_id));
2015 }
2016
2017 #[test]
2018 fn test_neighbors_empty_node() {
2019 let db = GrafeoDB::new_in_memory();
2020 let session = db.session();
2021
2022 let lonely = session.create_node(&["Person"]);
2023
2024 assert!(session.get_neighbors_outgoing(lonely).is_empty());
2025 assert!(session.get_neighbors_incoming(lonely).is_empty());
2026 assert!(
2027 session
2028 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
2029 .is_empty()
2030 );
2031 }
2032 }
2033}