1use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
16#[cfg(feature = "rdf")]
17use grafeo_core::graph::rdf::RdfStore;
18
19use crate::config::{AdaptiveConfig, GraphModel};
20use crate::database::QueryResult;
21use crate::query::cache::QueryCache;
22use crate::transaction::TransactionManager;
23
24pub struct Session {
30 store: Arc<LpgStore>,
32 graph_store: Arc<dyn GraphStoreMut>,
34 #[cfg(feature = "rdf")]
36 rdf_store: Arc<RdfStore>,
37 tx_manager: Arc<TransactionManager>,
39 query_cache: Arc<QueryCache>,
41 current_tx: Option<TxId>,
43 auto_commit: bool,
45 #[allow(dead_code)]
47 adaptive_config: AdaptiveConfig,
48 factorized_execution: bool,
50 graph_model: GraphModel,
52 query_timeout: Option<Duration>,
54 commit_counter: Arc<AtomicUsize>,
56 gc_interval: usize,
58 tx_start_node_count: usize,
60 tx_start_edge_count: usize,
62 #[cfg(feature = "cdc")]
64 cdc_log: Arc<crate::cdc::CdcLog>,
65}
66
67impl Session {
68 #[allow(dead_code, clippy::too_many_arguments)]
70 pub(crate) fn with_adaptive(
71 store: Arc<LpgStore>,
72 tx_manager: Arc<TransactionManager>,
73 query_cache: Arc<QueryCache>,
74 adaptive_config: AdaptiveConfig,
75 factorized_execution: bool,
76 graph_model: GraphModel,
77 query_timeout: Option<Duration>,
78 commit_counter: Arc<AtomicUsize>,
79 gc_interval: usize,
80 ) -> Self {
81 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
82 Self {
83 store,
84 graph_store,
85 #[cfg(feature = "rdf")]
86 rdf_store: Arc::new(RdfStore::new()),
87 tx_manager,
88 query_cache,
89 current_tx: None,
90 auto_commit: true,
91 adaptive_config,
92 factorized_execution,
93 graph_model,
94 query_timeout,
95 commit_counter,
96 gc_interval,
97 tx_start_node_count: 0,
98 tx_start_edge_count: 0,
99 #[cfg(feature = "cdc")]
100 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
101 }
102 }
103
104 #[cfg(feature = "cdc")]
106 pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
107 self.cdc_log = cdc_log;
108 }
109
110 #[cfg(feature = "rdf")]
112 #[allow(clippy::too_many_arguments)]
113 pub(crate) fn with_rdf_store_and_adaptive(
114 store: Arc<LpgStore>,
115 rdf_store: Arc<RdfStore>,
116 tx_manager: Arc<TransactionManager>,
117 query_cache: Arc<QueryCache>,
118 adaptive_config: AdaptiveConfig,
119 factorized_execution: bool,
120 graph_model: GraphModel,
121 query_timeout: Option<Duration>,
122 commit_counter: Arc<AtomicUsize>,
123 gc_interval: usize,
124 ) -> Self {
125 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
126 Self {
127 store,
128 graph_store,
129 rdf_store,
130 tx_manager,
131 query_cache,
132 current_tx: None,
133 auto_commit: true,
134 adaptive_config,
135 factorized_execution,
136 graph_model,
137 query_timeout,
138 commit_counter,
139 gc_interval,
140 tx_start_node_count: 0,
141 tx_start_edge_count: 0,
142 #[cfg(feature = "cdc")]
143 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
144 }
145 }
146
147 #[allow(clippy::too_many_arguments)]
152 pub(crate) fn with_external_store(
153 store: Arc<dyn GraphStoreMut>,
154 tx_manager: Arc<TransactionManager>,
155 query_cache: Arc<QueryCache>,
156 adaptive_config: AdaptiveConfig,
157 factorized_execution: bool,
158 graph_model: GraphModel,
159 query_timeout: Option<Duration>,
160 commit_counter: Arc<AtomicUsize>,
161 gc_interval: usize,
162 ) -> Self {
163 Self {
164 store: Arc::new(LpgStore::new()), graph_store: store,
166 #[cfg(feature = "rdf")]
167 rdf_store: Arc::new(RdfStore::new()),
168 tx_manager,
169 query_cache,
170 current_tx: None,
171 auto_commit: true,
172 adaptive_config,
173 factorized_execution,
174 graph_model,
175 query_timeout,
176 commit_counter,
177 gc_interval,
178 tx_start_node_count: 0,
179 tx_start_edge_count: 0,
180 #[cfg(feature = "cdc")]
181 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
182 }
183 }
184
185 #[must_use]
187 pub fn graph_model(&self) -> GraphModel {
188 self.graph_model
189 }
190
191 fn require_lpg(&self, language: &str) -> Result<()> {
193 if self.graph_model == GraphModel::Rdf {
194 return Err(grafeo_common::utils::error::Error::Internal(format!(
195 "This is an RDF database. {language} queries require an LPG database."
196 )));
197 }
198 Ok(())
199 }
200
201 #[cfg(feature = "gql")]
228 pub fn execute(&self, query: &str) -> Result<QueryResult> {
229 self.require_lpg("GQL")?;
230
231 use crate::query::{
232 Executor, Planner, binder::Binder, cache::CacheKey, gql_translator,
233 optimizer::Optimizer, processor::QueryLanguage,
234 };
235
236 let start_time = std::time::Instant::now();
237
238 let cache_key = CacheKey::new(query, QueryLanguage::Gql);
240
241 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
243 cached_plan
245 } else {
246 let logical_plan = gql_translator::translate(query)?;
250
251 let mut binder = Binder::new();
253 let _binding_context = binder.bind(&logical_plan)?;
254
255 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
257 let plan = optimizer.optimize(logical_plan)?;
258
259 self.query_cache.put_optimized(cache_key, plan.clone());
261
262 plan
263 };
264
265 let (viewing_epoch, tx_id) = self.get_transaction_context();
267
268 let planner = Planner::with_context(
271 Arc::clone(&self.graph_store),
272 Arc::clone(&self.tx_manager),
273 tx_id,
274 viewing_epoch,
275 )
276 .with_factorized_execution(self.factorized_execution);
277 let mut physical_plan = planner.plan(&optimized_plan)?;
278
279 let executor = Executor::with_columns(physical_plan.columns.clone())
281 .with_deadline(self.query_deadline());
282 let mut result = executor.execute(physical_plan.operator.as_mut())?;
283
284 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
286 let rows_scanned = result.rows.len() as u64;
287 result.execution_time_ms = Some(elapsed_ms);
288 result.rows_scanned = Some(rows_scanned);
289
290 Ok(result)
291 }
292
293 #[cfg(feature = "gql")]
299 pub fn execute_with_params(
300 &self,
301 query: &str,
302 params: std::collections::HashMap<String, Value>,
303 ) -> Result<QueryResult> {
304 self.require_lpg("GQL")?;
305
306 use crate::query::processor::{QueryLanguage, QueryProcessor};
307
308 let (viewing_epoch, tx_id) = self.get_transaction_context();
310
311 let processor = QueryProcessor::for_graph_store_with_tx(
313 Arc::clone(&self.graph_store),
314 Arc::clone(&self.tx_manager),
315 );
316
317 let processor = if let Some(tx_id) = tx_id {
319 processor.with_tx_context(viewing_epoch, tx_id)
320 } else {
321 processor
322 };
323
324 processor.process(query, QueryLanguage::Gql, Some(¶ms))
325 }
326
327 #[cfg(not(any(feature = "gql", feature = "cypher")))]
333 pub fn execute_with_params(
334 &self,
335 _query: &str,
336 _params: std::collections::HashMap<String, Value>,
337 ) -> Result<QueryResult> {
338 Err(grafeo_common::utils::error::Error::Internal(
339 "No query language enabled".to_string(),
340 ))
341 }
342
343 #[cfg(not(any(feature = "gql", feature = "cypher")))]
349 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
350 Err(grafeo_common::utils::error::Error::Internal(
351 "No query language enabled".to_string(),
352 ))
353 }
354
355 #[cfg(feature = "cypher")]
361 pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
362 use crate::query::{
363 Executor, Planner, binder::Binder, cache::CacheKey, cypher_translator,
364 optimizer::Optimizer, processor::QueryLanguage,
365 };
366
367 let cache_key = CacheKey::new(query, QueryLanguage::Cypher);
369
370 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
372 cached_plan
373 } else {
374 let logical_plan = cypher_translator::translate(query)?;
376
377 let mut binder = Binder::new();
379 let _binding_context = binder.bind(&logical_plan)?;
380
381 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
383 let plan = optimizer.optimize(logical_plan)?;
384
385 self.query_cache.put_optimized(cache_key, plan.clone());
387
388 plan
389 };
390
391 let (viewing_epoch, tx_id) = self.get_transaction_context();
393
394 let planner = Planner::with_context(
396 Arc::clone(&self.graph_store),
397 Arc::clone(&self.tx_manager),
398 tx_id,
399 viewing_epoch,
400 )
401 .with_factorized_execution(self.factorized_execution);
402 let mut physical_plan = planner.plan(&optimized_plan)?;
403
404 let executor = Executor::with_columns(physical_plan.columns.clone())
406 .with_deadline(self.query_deadline());
407 let result = executor.execute(physical_plan.operator.as_mut())?;
408 Ok(result)
409 }
410
411 #[cfg(feature = "gremlin")]
435 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
436 use crate::query::{
437 Executor, Planner, binder::Binder, gremlin_translator, optimizer::Optimizer,
438 };
439
440 let logical_plan = gremlin_translator::translate(query)?;
442
443 let mut binder = Binder::new();
445 let _binding_context = binder.bind(&logical_plan)?;
446
447 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
449 let optimized_plan = optimizer.optimize(logical_plan)?;
450
451 let (viewing_epoch, tx_id) = self.get_transaction_context();
453
454 let planner = Planner::with_context(
456 Arc::clone(&self.graph_store),
457 Arc::clone(&self.tx_manager),
458 tx_id,
459 viewing_epoch,
460 )
461 .with_factorized_execution(self.factorized_execution);
462 let mut physical_plan = planner.plan(&optimized_plan)?;
463
464 let executor = Executor::with_columns(physical_plan.columns.clone())
466 .with_deadline(self.query_deadline());
467 let result = executor.execute(physical_plan.operator.as_mut())?;
468 Ok(result)
469 }
470
471 #[cfg(feature = "gremlin")]
477 pub fn execute_gremlin_with_params(
478 &self,
479 query: &str,
480 params: std::collections::HashMap<String, Value>,
481 ) -> Result<QueryResult> {
482 use crate::query::processor::{QueryLanguage, QueryProcessor};
483
484 let (viewing_epoch, tx_id) = self.get_transaction_context();
486
487 let processor = QueryProcessor::for_graph_store_with_tx(
489 Arc::clone(&self.graph_store),
490 Arc::clone(&self.tx_manager),
491 );
492
493 let processor = if let Some(tx_id) = tx_id {
495 processor.with_tx_context(viewing_epoch, tx_id)
496 } else {
497 processor
498 };
499
500 processor.process(query, QueryLanguage::Gremlin, Some(¶ms))
501 }
502
503 #[cfg(feature = "graphql")]
527 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
528 use crate::query::{
529 Executor, Planner, binder::Binder, graphql_translator, optimizer::Optimizer,
530 };
531
532 let logical_plan = graphql_translator::translate(query)?;
534
535 let mut binder = Binder::new();
537 let _binding_context = binder.bind(&logical_plan)?;
538
539 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
541 let optimized_plan = optimizer.optimize(logical_plan)?;
542
543 let (viewing_epoch, tx_id) = self.get_transaction_context();
545
546 let planner = Planner::with_context(
548 Arc::clone(&self.graph_store),
549 Arc::clone(&self.tx_manager),
550 tx_id,
551 viewing_epoch,
552 )
553 .with_factorized_execution(self.factorized_execution);
554 let mut physical_plan = planner.plan(&optimized_plan)?;
555
556 let executor = Executor::with_columns(physical_plan.columns.clone())
558 .with_deadline(self.query_deadline());
559 let result = executor.execute(physical_plan.operator.as_mut())?;
560 Ok(result)
561 }
562
563 #[cfg(feature = "graphql")]
569 pub fn execute_graphql_with_params(
570 &self,
571 query: &str,
572 params: std::collections::HashMap<String, Value>,
573 ) -> Result<QueryResult> {
574 use crate::query::processor::{QueryLanguage, QueryProcessor};
575
576 let (viewing_epoch, tx_id) = self.get_transaction_context();
578
579 let processor = QueryProcessor::for_graph_store_with_tx(
581 Arc::clone(&self.graph_store),
582 Arc::clone(&self.tx_manager),
583 );
584
585 let processor = if let Some(tx_id) = tx_id {
587 processor.with_tx_context(viewing_epoch, tx_id)
588 } else {
589 processor
590 };
591
592 processor.process(query, QueryLanguage::GraphQL, Some(¶ms))
593 }
594
595 #[cfg(feature = "sql-pgq")]
620 pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
621 use crate::query::{
622 Executor, Planner, binder::Binder, cache::CacheKey, optimizer::Optimizer,
623 plan::LogicalOperator, processor::QueryLanguage, sql_pgq_translator,
624 };
625
626 let logical_plan = sql_pgq_translator::translate(query)?;
628
629 if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
631 return Ok(QueryResult {
632 columns: vec!["status".into()],
633 column_types: vec![grafeo_common::types::LogicalType::String],
634 rows: vec![vec![Value::from(format!(
635 "Property graph '{}' created ({} node tables, {} edge tables)",
636 cpg.name,
637 cpg.node_tables.len(),
638 cpg.edge_tables.len()
639 ))]],
640 execution_time_ms: None,
641 rows_scanned: None,
642 });
643 }
644
645 let cache_key = CacheKey::new(query, QueryLanguage::SqlPgq);
647
648 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
650 cached_plan
651 } else {
652 let mut binder = Binder::new();
654 let _binding_context = binder.bind(&logical_plan)?;
655
656 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
658 let plan = optimizer.optimize(logical_plan)?;
659
660 self.query_cache.put_optimized(cache_key, plan.clone());
662
663 plan
664 };
665
666 let (viewing_epoch, tx_id) = self.get_transaction_context();
668
669 let planner = Planner::with_context(
671 Arc::clone(&self.graph_store),
672 Arc::clone(&self.tx_manager),
673 tx_id,
674 viewing_epoch,
675 )
676 .with_factorized_execution(self.factorized_execution);
677 let mut physical_plan = planner.plan(&optimized_plan)?;
678
679 let executor = Executor::with_columns(physical_plan.columns.clone())
681 .with_deadline(self.query_deadline());
682 let result = executor.execute(physical_plan.operator.as_mut())?;
683 Ok(result)
684 }
685
686 #[cfg(feature = "sql-pgq")]
692 pub fn execute_sql_with_params(
693 &self,
694 query: &str,
695 params: std::collections::HashMap<String, Value>,
696 ) -> Result<QueryResult> {
697 use crate::query::processor::{QueryLanguage, QueryProcessor};
698
699 let (viewing_epoch, tx_id) = self.get_transaction_context();
701
702 let processor = QueryProcessor::for_graph_store_with_tx(
704 Arc::clone(&self.graph_store),
705 Arc::clone(&self.tx_manager),
706 );
707
708 let processor = if let Some(tx_id) = tx_id {
710 processor.with_tx_context(viewing_epoch, tx_id)
711 } else {
712 processor
713 };
714
715 processor.process(query, QueryLanguage::SqlPgq, Some(¶ms))
716 }
717
718 #[cfg(all(feature = "sparql", feature = "rdf"))]
724 pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
725 use crate::query::{
726 Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
727 };
728
729 let logical_plan = sparql_translator::translate(query)?;
731
732 let optimizer = Optimizer::from_graph_store(&*self.graph_store);
734 let optimized_plan = optimizer.optimize(logical_plan)?;
735
736 let planner = RdfPlanner::new(Arc::clone(&self.rdf_store)).with_tx_id(self.current_tx);
738 let mut physical_plan = planner.plan(&optimized_plan)?;
739
740 let executor = Executor::with_columns(physical_plan.columns.clone())
742 .with_deadline(self.query_deadline());
743 executor.execute(physical_plan.operator.as_mut())
744 }
745
746 #[cfg(all(feature = "sparql", feature = "rdf"))]
752 pub fn execute_sparql_with_params(
753 &self,
754 query: &str,
755 _params: std::collections::HashMap<String, Value>,
756 ) -> Result<QueryResult> {
757 self.execute_sparql(query)
760 }
761
762 pub fn execute_language(
771 &self,
772 query: &str,
773 language: &str,
774 params: Option<std::collections::HashMap<String, Value>>,
775 ) -> Result<QueryResult> {
776 match language {
777 "gql" => {
778 if let Some(p) = params {
779 self.execute_with_params(query, p)
780 } else {
781 self.execute(query)
782 }
783 }
784 #[cfg(feature = "cypher")]
785 "cypher" => {
786 if let Some(p) = params {
787 use crate::query::processor::{QueryLanguage, QueryProcessor};
788 let processor = QueryProcessor::for_graph_store_with_tx(
789 Arc::clone(&self.graph_store),
790 Arc::clone(&self.tx_manager),
791 );
792 let (viewing_epoch, tx_id) = self.get_transaction_context();
793 let processor = if let Some(tx_id) = tx_id {
794 processor.with_tx_context(viewing_epoch, tx_id)
795 } else {
796 processor
797 };
798 processor.process(query, QueryLanguage::Cypher, Some(&p))
799 } else {
800 self.execute_cypher(query)
801 }
802 }
803 #[cfg(feature = "gremlin")]
804 "gremlin" => {
805 if let Some(p) = params {
806 self.execute_gremlin_with_params(query, p)
807 } else {
808 self.execute_gremlin(query)
809 }
810 }
811 #[cfg(feature = "graphql")]
812 "graphql" => {
813 if let Some(p) = params {
814 self.execute_graphql_with_params(query, p)
815 } else {
816 self.execute_graphql(query)
817 }
818 }
819 #[cfg(feature = "sql-pgq")]
820 "sql" | "sql-pgq" => {
821 if let Some(p) = params {
822 self.execute_sql_with_params(query, p)
823 } else {
824 self.execute_sql(query)
825 }
826 }
827 #[cfg(all(feature = "sparql", feature = "rdf"))]
828 "sparql" => {
829 if let Some(p) = params {
830 self.execute_sparql_with_params(query, p)
831 } else {
832 self.execute_sparql(query)
833 }
834 }
835 other => Err(grafeo_common::utils::error::Error::Query(
836 grafeo_common::utils::error::QueryError::new(
837 grafeo_common::utils::error::QueryErrorKind::Semantic,
838 format!("Unknown query language: '{other}'"),
839 ),
840 )),
841 }
842 }
843
844 pub fn begin_tx(&mut self) -> Result<()> {
867 if self.current_tx.is_some() {
868 return Err(grafeo_common::utils::error::Error::Transaction(
869 grafeo_common::utils::error::TransactionError::InvalidState(
870 "Transaction already active".to_string(),
871 ),
872 ));
873 }
874
875 self.tx_start_node_count = self.store.node_count();
876 self.tx_start_edge_count = self.store.edge_count();
877 let tx_id = self.tx_manager.begin();
878 self.current_tx = Some(tx_id);
879 Ok(())
880 }
881
882 pub fn begin_tx_with_isolation(
890 &mut self,
891 isolation_level: crate::transaction::IsolationLevel,
892 ) -> Result<()> {
893 if self.current_tx.is_some() {
894 return Err(grafeo_common::utils::error::Error::Transaction(
895 grafeo_common::utils::error::TransactionError::InvalidState(
896 "Transaction already active".to_string(),
897 ),
898 ));
899 }
900
901 self.tx_start_node_count = self.store.node_count();
902 self.tx_start_edge_count = self.store.edge_count();
903 let tx_id = self.tx_manager.begin_with_isolation(isolation_level);
904 self.current_tx = Some(tx_id);
905 Ok(())
906 }
907
908 pub fn commit(&mut self) -> Result<()> {
916 let tx_id = self.current_tx.take().ok_or_else(|| {
917 grafeo_common::utils::error::Error::Transaction(
918 grafeo_common::utils::error::TransactionError::InvalidState(
919 "No active transaction".to_string(),
920 ),
921 )
922 })?;
923
924 #[cfg(feature = "rdf")]
926 self.rdf_store.commit_tx(tx_id);
927
928 self.tx_manager.commit(tx_id)?;
929
930 self.store.sync_epoch(self.tx_manager.current_epoch());
934
935 if self.gc_interval > 0 {
937 let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
938 if count.is_multiple_of(self.gc_interval) {
939 let min_epoch = self.tx_manager.min_active_epoch();
940 self.store.gc_versions(min_epoch);
941 self.tx_manager.gc();
942 }
943 }
944
945 Ok(())
946 }
947
948 pub fn rollback(&mut self) -> Result<()> {
972 let tx_id = self.current_tx.take().ok_or_else(|| {
973 grafeo_common::utils::error::Error::Transaction(
974 grafeo_common::utils::error::TransactionError::InvalidState(
975 "No active transaction".to_string(),
976 ),
977 )
978 })?;
979
980 self.store.discard_uncommitted_versions(tx_id);
982
983 #[cfg(feature = "rdf")]
985 self.rdf_store.rollback_tx(tx_id);
986
987 self.tx_manager.abort(tx_id)
989 }
990
991 #[must_use]
993 pub fn in_transaction(&self) -> bool {
994 self.current_tx.is_some()
995 }
996
997 #[must_use]
999 pub(crate) fn current_tx_id(&self) -> Option<TxId> {
1000 self.current_tx
1001 }
1002
1003 #[must_use]
1005 pub(crate) fn tx_manager(&self) -> &TransactionManager {
1006 &self.tx_manager
1007 }
1008
1009 #[must_use]
1011 pub(crate) fn node_count_delta(&self) -> (usize, usize) {
1012 (self.tx_start_node_count, self.store.node_count())
1013 }
1014
1015 #[must_use]
1017 pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
1018 (self.tx_start_edge_count, self.store.edge_count())
1019 }
1020
1021 pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
1055 crate::transaction::PreparedCommit::new(self)
1056 }
1057
1058 pub fn set_auto_commit(&mut self, auto_commit: bool) {
1060 self.auto_commit = auto_commit;
1061 }
1062
1063 #[must_use]
1065 pub fn auto_commit(&self) -> bool {
1066 self.auto_commit
1067 }
1068
1069 #[must_use]
1071 fn query_deadline(&self) -> Option<Instant> {
1072 self.query_timeout.map(|d| Instant::now() + d)
1073 }
1074
1075 #[must_use]
1081 fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
1082 if let Some(tx_id) = self.current_tx {
1083 let epoch = self
1085 .tx_manager
1086 .start_epoch(tx_id)
1087 .unwrap_or_else(|| self.tx_manager.current_epoch());
1088 (epoch, Some(tx_id))
1089 } else {
1090 (self.tx_manager.current_epoch(), None)
1092 }
1093 }
1094
1095 pub fn create_node(&self, labels: &[&str]) -> NodeId {
1100 let (epoch, tx_id) = self.get_transaction_context();
1101 self.store
1102 .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1103 }
1104
1105 pub fn create_node_with_props<'a>(
1109 &self,
1110 labels: &[&str],
1111 properties: impl IntoIterator<Item = (&'a str, Value)>,
1112 ) -> NodeId {
1113 let (epoch, tx_id) = self.get_transaction_context();
1114 self.store.create_node_with_props_versioned(
1115 labels,
1116 properties.into_iter().map(|(k, v)| (k, v)),
1117 epoch,
1118 tx_id.unwrap_or(TxId::SYSTEM),
1119 )
1120 }
1121
1122 pub fn create_edge(
1127 &self,
1128 src: NodeId,
1129 dst: NodeId,
1130 edge_type: &str,
1131 ) -> grafeo_common::types::EdgeId {
1132 let (epoch, tx_id) = self.get_transaction_context();
1133 self.store
1134 .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1135 }
1136
1137 #[must_use]
1165 pub fn get_node(&self, id: NodeId) -> Option<Node> {
1166 let (epoch, tx_id) = self.get_transaction_context();
1167 self.store
1168 .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1169 }
1170
1171 #[must_use]
1195 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
1196 self.get_node(id)
1197 .and_then(|node| node.get_property(key).cloned())
1198 }
1199
1200 #[must_use]
1207 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1208 let (epoch, tx_id) = self.get_transaction_context();
1209 self.store
1210 .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1211 }
1212
1213 #[must_use]
1239 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1240 self.store.edges_from(node, Direction::Outgoing).collect()
1241 }
1242
1243 #[must_use]
1252 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1253 self.store.edges_from(node, Direction::Incoming).collect()
1254 }
1255
1256 #[must_use]
1268 pub fn get_neighbors_outgoing_by_type(
1269 &self,
1270 node: NodeId,
1271 edge_type: &str,
1272 ) -> Vec<(NodeId, EdgeId)> {
1273 self.store
1274 .edges_from(node, Direction::Outgoing)
1275 .filter(|(_, edge_id)| {
1276 self.get_edge(*edge_id)
1277 .is_some_and(|e| e.edge_type.as_str() == edge_type)
1278 })
1279 .collect()
1280 }
1281
1282 #[must_use]
1289 pub fn node_exists(&self, id: NodeId) -> bool {
1290 self.get_node(id).is_some()
1291 }
1292
1293 #[must_use]
1295 pub fn edge_exists(&self, id: EdgeId) -> bool {
1296 self.get_edge(id).is_some()
1297 }
1298
1299 #[must_use]
1303 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
1304 let out = self.store.out_degree(node);
1305 let in_degree = self.store.in_degree(node);
1306 (out, in_degree)
1307 }
1308
1309 #[must_use]
1319 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
1320 let (epoch, tx_id) = self.get_transaction_context();
1321 let tx = tx_id.unwrap_or(TxId::SYSTEM);
1322 ids.iter()
1323 .map(|&id| self.store.get_node_versioned(id, epoch, tx))
1324 .collect()
1325 }
1326
1327 #[cfg(feature = "cdc")]
1331 pub fn history(
1332 &self,
1333 entity_id: impl Into<crate::cdc::EntityId>,
1334 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1335 Ok(self.cdc_log.history(entity_id.into()))
1336 }
1337
1338 #[cfg(feature = "cdc")]
1340 pub fn history_since(
1341 &self,
1342 entity_id: impl Into<crate::cdc::EntityId>,
1343 since_epoch: EpochId,
1344 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1345 Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
1346 }
1347
1348 #[cfg(feature = "cdc")]
1350 pub fn changes_between(
1351 &self,
1352 start_epoch: EpochId,
1353 end_epoch: EpochId,
1354 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1355 Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
1356 }
1357}
1358
1359#[cfg(test)]
1360mod tests {
1361 use crate::database::GrafeoDB;
1362
1363 #[test]
1364 fn test_session_create_node() {
1365 let db = GrafeoDB::new_in_memory();
1366 let session = db.session();
1367
1368 let id = session.create_node(&["Person"]);
1369 assert!(id.is_valid());
1370 assert_eq!(db.node_count(), 1);
1371 }
1372
1373 #[test]
1374 fn test_session_transaction() {
1375 let db = GrafeoDB::new_in_memory();
1376 let mut session = db.session();
1377
1378 assert!(!session.in_transaction());
1379
1380 session.begin_tx().unwrap();
1381 assert!(session.in_transaction());
1382
1383 session.commit().unwrap();
1384 assert!(!session.in_transaction());
1385 }
1386
1387 #[test]
1388 fn test_session_transaction_context() {
1389 let db = GrafeoDB::new_in_memory();
1390 let mut session = db.session();
1391
1392 let (_epoch1, tx_id1) = session.get_transaction_context();
1394 assert!(tx_id1.is_none());
1395
1396 session.begin_tx().unwrap();
1398 let (epoch2, tx_id2) = session.get_transaction_context();
1399 assert!(tx_id2.is_some());
1400 let _ = epoch2; session.commit().unwrap();
1405 let (epoch3, tx_id3) = session.get_transaction_context();
1406 assert!(tx_id3.is_none());
1407 assert!(epoch3.as_u64() >= epoch2.as_u64());
1409 }
1410
1411 #[test]
1412 fn test_session_rollback() {
1413 let db = GrafeoDB::new_in_memory();
1414 let mut session = db.session();
1415
1416 session.begin_tx().unwrap();
1417 session.rollback().unwrap();
1418 assert!(!session.in_transaction());
1419 }
1420
1421 #[test]
1422 fn test_session_rollback_discards_versions() {
1423 use grafeo_common::types::TxId;
1424
1425 let db = GrafeoDB::new_in_memory();
1426
1427 let node_before = db.store().create_node(&["Person"]);
1429 assert!(node_before.is_valid());
1430 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1431
1432 let mut session = db.session();
1434 session.begin_tx().unwrap();
1435 let tx_id = session.current_tx.unwrap();
1436
1437 let epoch = db.store().current_epoch();
1439 let node_in_tx = db.store().create_node_versioned(&["Person"], epoch, tx_id);
1440 assert!(node_in_tx.is_valid());
1441
1442 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1444
1445 session.rollback().unwrap();
1447 assert!(!session.in_transaction());
1448
1449 let count_after = db.node_count();
1452 assert_eq!(
1453 count_after, 1,
1454 "Rollback should discard uncommitted node, but got {count_after}"
1455 );
1456
1457 let current_epoch = db.store().current_epoch();
1459 assert!(
1460 db.store()
1461 .get_node_versioned(node_before, current_epoch, TxId::SYSTEM)
1462 .is_some(),
1463 "Original node should still exist"
1464 );
1465
1466 assert!(
1468 db.store()
1469 .get_node_versioned(node_in_tx, current_epoch, TxId::SYSTEM)
1470 .is_none(),
1471 "Transaction node should be gone"
1472 );
1473 }
1474
1475 #[test]
1476 fn test_session_create_node_in_transaction() {
1477 let db = GrafeoDB::new_in_memory();
1479
1480 let node_before = db.create_node(&["Person"]);
1482 assert!(node_before.is_valid());
1483 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1484
1485 let mut session = db.session();
1487 session.begin_tx().unwrap();
1488
1489 let node_in_tx = session.create_node(&["Person"]);
1491 assert!(node_in_tx.is_valid());
1492
1493 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1495
1496 session.rollback().unwrap();
1498
1499 let count_after = db.node_count();
1501 assert_eq!(
1502 count_after, 1,
1503 "Rollback should discard node created via session.create_node(), but got {count_after}"
1504 );
1505 }
1506
1507 #[test]
1508 fn test_session_create_node_with_props_in_transaction() {
1509 use grafeo_common::types::Value;
1510
1511 let db = GrafeoDB::new_in_memory();
1513
1514 db.create_node(&["Person"]);
1516 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1517
1518 let mut session = db.session();
1520 session.begin_tx().unwrap();
1521
1522 let node_in_tx =
1523 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1524 assert!(node_in_tx.is_valid());
1525
1526 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1528
1529 session.rollback().unwrap();
1531
1532 let count_after = db.node_count();
1534 assert_eq!(
1535 count_after, 1,
1536 "Rollback should discard node created via session.create_node_with_props()"
1537 );
1538 }
1539
1540 #[cfg(feature = "gql")]
1541 mod gql_tests {
1542 use super::*;
1543
1544 #[test]
1545 fn test_gql_query_execution() {
1546 let db = GrafeoDB::new_in_memory();
1547 let session = db.session();
1548
1549 session.create_node(&["Person"]);
1551 session.create_node(&["Person"]);
1552 session.create_node(&["Animal"]);
1553
1554 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1556
1557 assert_eq!(result.row_count(), 2);
1559 assert_eq!(result.column_count(), 1);
1560 assert_eq!(result.columns[0], "n");
1561 }
1562
1563 #[test]
1564 fn test_gql_empty_result() {
1565 let db = GrafeoDB::new_in_memory();
1566 let session = db.session();
1567
1568 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1570
1571 assert_eq!(result.row_count(), 0);
1572 }
1573
1574 #[test]
1575 fn test_gql_parse_error() {
1576 let db = GrafeoDB::new_in_memory();
1577 let session = db.session();
1578
1579 let result = session.execute("MATCH (n RETURN n");
1581
1582 assert!(result.is_err());
1583 }
1584
1585 #[test]
1586 fn test_gql_relationship_traversal() {
1587 let db = GrafeoDB::new_in_memory();
1588 let session = db.session();
1589
1590 let alice = session.create_node(&["Person"]);
1592 let bob = session.create_node(&["Person"]);
1593 let charlie = session.create_node(&["Person"]);
1594
1595 session.create_edge(alice, bob, "KNOWS");
1596 session.create_edge(alice, charlie, "KNOWS");
1597
1598 let result = session
1600 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1601 .unwrap();
1602
1603 assert_eq!(result.row_count(), 2);
1605 assert_eq!(result.column_count(), 2);
1606 assert_eq!(result.columns[0], "a");
1607 assert_eq!(result.columns[1], "b");
1608 }
1609
1610 #[test]
1611 fn test_gql_relationship_with_type_filter() {
1612 let db = GrafeoDB::new_in_memory();
1613 let session = db.session();
1614
1615 let alice = session.create_node(&["Person"]);
1617 let bob = session.create_node(&["Person"]);
1618 let charlie = session.create_node(&["Person"]);
1619
1620 session.create_edge(alice, bob, "KNOWS");
1621 session.create_edge(alice, charlie, "WORKS_WITH");
1622
1623 let result = session
1625 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1626 .unwrap();
1627
1628 assert_eq!(result.row_count(), 1);
1630 }
1631
1632 #[test]
1633 fn test_gql_semantic_error_undefined_variable() {
1634 let db = GrafeoDB::new_in_memory();
1635 let session = db.session();
1636
1637 let result = session.execute("MATCH (n:Person) RETURN x");
1639
1640 assert!(result.is_err());
1642 let Err(err) = result else {
1643 panic!("Expected error")
1644 };
1645 assert!(
1646 err.to_string().contains("Undefined variable"),
1647 "Expected undefined variable error, got: {}",
1648 err
1649 );
1650 }
1651
1652 #[test]
1653 fn test_gql_where_clause_property_filter() {
1654 use grafeo_common::types::Value;
1655
1656 let db = GrafeoDB::new_in_memory();
1657 let session = db.session();
1658
1659 session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
1661 session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
1662 session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
1663
1664 let result = session
1666 .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
1667 .unwrap();
1668
1669 assert_eq!(result.row_count(), 2);
1671 }
1672
1673 #[test]
1674 fn test_gql_where_clause_equality() {
1675 use grafeo_common::types::Value;
1676
1677 let db = GrafeoDB::new_in_memory();
1678 let session = db.session();
1679
1680 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1682 session.create_node_with_props(&["Person"], [("name", Value::String("Bob".into()))]);
1683 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1684
1685 let result = session
1687 .execute("MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n")
1688 .unwrap();
1689
1690 assert_eq!(result.row_count(), 2);
1692 }
1693
1694 #[test]
1695 fn test_gql_return_property_access() {
1696 use grafeo_common::types::Value;
1697
1698 let db = GrafeoDB::new_in_memory();
1699 let session = db.session();
1700
1701 session.create_node_with_props(
1703 &["Person"],
1704 [
1705 ("name", Value::String("Alice".into())),
1706 ("age", Value::Int64(30)),
1707 ],
1708 );
1709 session.create_node_with_props(
1710 &["Person"],
1711 [
1712 ("name", Value::String("Bob".into())),
1713 ("age", Value::Int64(25)),
1714 ],
1715 );
1716
1717 let result = session
1719 .execute("MATCH (n:Person) RETURN n.name, n.age")
1720 .unwrap();
1721
1722 assert_eq!(result.row_count(), 2);
1724 assert_eq!(result.column_count(), 2);
1725 assert_eq!(result.columns[0], "n.name");
1726 assert_eq!(result.columns[1], "n.age");
1727
1728 let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
1730 assert!(names.contains(&&Value::String("Alice".into())));
1731 assert!(names.contains(&&Value::String("Bob".into())));
1732 }
1733
1734 #[test]
1735 fn test_gql_return_mixed_expressions() {
1736 use grafeo_common::types::Value;
1737
1738 let db = GrafeoDB::new_in_memory();
1739 let session = db.session();
1740
1741 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1743
1744 let result = session
1746 .execute("MATCH (n:Person) RETURN n, n.name")
1747 .unwrap();
1748
1749 assert_eq!(result.row_count(), 1);
1750 assert_eq!(result.column_count(), 2);
1751 assert_eq!(result.columns[0], "n");
1752 assert_eq!(result.columns[1], "n.name");
1753
1754 assert_eq!(result.rows[0][1], Value::String("Alice".into()));
1756 }
1757 }
1758
1759 #[cfg(feature = "cypher")]
1760 mod cypher_tests {
1761 use super::*;
1762
1763 #[test]
1764 fn test_cypher_query_execution() {
1765 let db = GrafeoDB::new_in_memory();
1766 let session = db.session();
1767
1768 session.create_node(&["Person"]);
1770 session.create_node(&["Person"]);
1771 session.create_node(&["Animal"]);
1772
1773 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1775
1776 assert_eq!(result.row_count(), 2);
1778 assert_eq!(result.column_count(), 1);
1779 assert_eq!(result.columns[0], "n");
1780 }
1781
1782 #[test]
1783 fn test_cypher_empty_result() {
1784 let db = GrafeoDB::new_in_memory();
1785 let session = db.session();
1786
1787 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1789
1790 assert_eq!(result.row_count(), 0);
1791 }
1792
1793 #[test]
1794 fn test_cypher_parse_error() {
1795 let db = GrafeoDB::new_in_memory();
1796 let session = db.session();
1797
1798 let result = session.execute_cypher("MATCH (n RETURN n");
1800
1801 assert!(result.is_err());
1802 }
1803 }
1804
1805 mod direct_lookup_tests {
1808 use super::*;
1809 use grafeo_common::types::Value;
1810
1811 #[test]
1812 fn test_get_node() {
1813 let db = GrafeoDB::new_in_memory();
1814 let session = db.session();
1815
1816 let id = session.create_node(&["Person"]);
1817 let node = session.get_node(id);
1818
1819 assert!(node.is_some());
1820 let node = node.unwrap();
1821 assert_eq!(node.id, id);
1822 }
1823
1824 #[test]
1825 fn test_get_node_not_found() {
1826 use grafeo_common::types::NodeId;
1827
1828 let db = GrafeoDB::new_in_memory();
1829 let session = db.session();
1830
1831 let node = session.get_node(NodeId::new(9999));
1833 assert!(node.is_none());
1834 }
1835
1836 #[test]
1837 fn test_get_node_property() {
1838 let db = GrafeoDB::new_in_memory();
1839 let session = db.session();
1840
1841 let id = session
1842 .create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1843
1844 let name = session.get_node_property(id, "name");
1845 assert_eq!(name, Some(Value::String("Alice".into())));
1846
1847 let missing = session.get_node_property(id, "missing");
1849 assert!(missing.is_none());
1850 }
1851
1852 #[test]
1853 fn test_get_edge() {
1854 let db = GrafeoDB::new_in_memory();
1855 let session = db.session();
1856
1857 let alice = session.create_node(&["Person"]);
1858 let bob = session.create_node(&["Person"]);
1859 let edge_id = session.create_edge(alice, bob, "KNOWS");
1860
1861 let edge = session.get_edge(edge_id);
1862 assert!(edge.is_some());
1863 let edge = edge.unwrap();
1864 assert_eq!(edge.id, edge_id);
1865 assert_eq!(edge.src, alice);
1866 assert_eq!(edge.dst, bob);
1867 }
1868
1869 #[test]
1870 fn test_get_edge_not_found() {
1871 use grafeo_common::types::EdgeId;
1872
1873 let db = GrafeoDB::new_in_memory();
1874 let session = db.session();
1875
1876 let edge = session.get_edge(EdgeId::new(9999));
1877 assert!(edge.is_none());
1878 }
1879
1880 #[test]
1881 fn test_get_neighbors_outgoing() {
1882 let db = GrafeoDB::new_in_memory();
1883 let session = db.session();
1884
1885 let alice = session.create_node(&["Person"]);
1886 let bob = session.create_node(&["Person"]);
1887 let carol = session.create_node(&["Person"]);
1888
1889 session.create_edge(alice, bob, "KNOWS");
1890 session.create_edge(alice, carol, "KNOWS");
1891
1892 let neighbors = session.get_neighbors_outgoing(alice);
1893 assert_eq!(neighbors.len(), 2);
1894
1895 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1896 assert!(neighbor_ids.contains(&bob));
1897 assert!(neighbor_ids.contains(&carol));
1898 }
1899
1900 #[test]
1901 fn test_get_neighbors_incoming() {
1902 let db = GrafeoDB::new_in_memory();
1903 let session = db.session();
1904
1905 let alice = session.create_node(&["Person"]);
1906 let bob = session.create_node(&["Person"]);
1907 let carol = session.create_node(&["Person"]);
1908
1909 session.create_edge(bob, alice, "KNOWS");
1910 session.create_edge(carol, alice, "KNOWS");
1911
1912 let neighbors = session.get_neighbors_incoming(alice);
1913 assert_eq!(neighbors.len(), 2);
1914
1915 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1916 assert!(neighbor_ids.contains(&bob));
1917 assert!(neighbor_ids.contains(&carol));
1918 }
1919
1920 #[test]
1921 fn test_get_neighbors_outgoing_by_type() {
1922 let db = GrafeoDB::new_in_memory();
1923 let session = db.session();
1924
1925 let alice = session.create_node(&["Person"]);
1926 let bob = session.create_node(&["Person"]);
1927 let company = session.create_node(&["Company"]);
1928
1929 session.create_edge(alice, bob, "KNOWS");
1930 session.create_edge(alice, company, "WORKS_AT");
1931
1932 let knows_neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1933 assert_eq!(knows_neighbors.len(), 1);
1934 assert_eq!(knows_neighbors[0].0, bob);
1935
1936 let works_neighbors = session.get_neighbors_outgoing_by_type(alice, "WORKS_AT");
1937 assert_eq!(works_neighbors.len(), 1);
1938 assert_eq!(works_neighbors[0].0, company);
1939
1940 let no_neighbors = session.get_neighbors_outgoing_by_type(alice, "LIKES");
1942 assert!(no_neighbors.is_empty());
1943 }
1944
1945 #[test]
1946 fn test_node_exists() {
1947 use grafeo_common::types::NodeId;
1948
1949 let db = GrafeoDB::new_in_memory();
1950 let session = db.session();
1951
1952 let id = session.create_node(&["Person"]);
1953
1954 assert!(session.node_exists(id));
1955 assert!(!session.node_exists(NodeId::new(9999)));
1956 }
1957
1958 #[test]
1959 fn test_edge_exists() {
1960 use grafeo_common::types::EdgeId;
1961
1962 let db = GrafeoDB::new_in_memory();
1963 let session = db.session();
1964
1965 let alice = session.create_node(&["Person"]);
1966 let bob = session.create_node(&["Person"]);
1967 let edge_id = session.create_edge(alice, bob, "KNOWS");
1968
1969 assert!(session.edge_exists(edge_id));
1970 assert!(!session.edge_exists(EdgeId::new(9999)));
1971 }
1972
1973 #[test]
1974 fn test_get_degree() {
1975 let db = GrafeoDB::new_in_memory();
1976 let session = db.session();
1977
1978 let alice = session.create_node(&["Person"]);
1979 let bob = session.create_node(&["Person"]);
1980 let carol = session.create_node(&["Person"]);
1981
1982 session.create_edge(alice, bob, "KNOWS");
1984 session.create_edge(alice, carol, "KNOWS");
1985 session.create_edge(bob, alice, "KNOWS");
1987
1988 let (out_degree, in_degree) = session.get_degree(alice);
1989 assert_eq!(out_degree, 2);
1990 assert_eq!(in_degree, 1);
1991
1992 let lonely = session.create_node(&["Person"]);
1994 let (out, in_deg) = session.get_degree(lonely);
1995 assert_eq!(out, 0);
1996 assert_eq!(in_deg, 0);
1997 }
1998
1999 #[test]
2000 fn test_get_nodes_batch() {
2001 let db = GrafeoDB::new_in_memory();
2002 let session = db.session();
2003
2004 let alice = session.create_node(&["Person"]);
2005 let bob = session.create_node(&["Person"]);
2006 let carol = session.create_node(&["Person"]);
2007
2008 let nodes = session.get_nodes_batch(&[alice, bob, carol]);
2009 assert_eq!(nodes.len(), 3);
2010 assert!(nodes[0].is_some());
2011 assert!(nodes[1].is_some());
2012 assert!(nodes[2].is_some());
2013
2014 use grafeo_common::types::NodeId;
2016 let nodes_with_missing = session.get_nodes_batch(&[alice, NodeId::new(9999), carol]);
2017 assert_eq!(nodes_with_missing.len(), 3);
2018 assert!(nodes_with_missing[0].is_some());
2019 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
2021 }
2022
2023 #[test]
2024 fn test_auto_commit_setting() {
2025 let db = GrafeoDB::new_in_memory();
2026 let mut session = db.session();
2027
2028 assert!(session.auto_commit());
2030
2031 session.set_auto_commit(false);
2032 assert!(!session.auto_commit());
2033
2034 session.set_auto_commit(true);
2035 assert!(session.auto_commit());
2036 }
2037
2038 #[test]
2039 fn test_transaction_double_begin_error() {
2040 let db = GrafeoDB::new_in_memory();
2041 let mut session = db.session();
2042
2043 session.begin_tx().unwrap();
2044 let result = session.begin_tx();
2045
2046 assert!(result.is_err());
2047 session.rollback().unwrap();
2049 }
2050
2051 #[test]
2052 fn test_commit_without_transaction_error() {
2053 let db = GrafeoDB::new_in_memory();
2054 let mut session = db.session();
2055
2056 let result = session.commit();
2057 assert!(result.is_err());
2058 }
2059
2060 #[test]
2061 fn test_rollback_without_transaction_error() {
2062 let db = GrafeoDB::new_in_memory();
2063 let mut session = db.session();
2064
2065 let result = session.rollback();
2066 assert!(result.is_err());
2067 }
2068
2069 #[test]
2070 fn test_create_edge_in_transaction() {
2071 let db = GrafeoDB::new_in_memory();
2072 let mut session = db.session();
2073
2074 let alice = session.create_node(&["Person"]);
2076 let bob = session.create_node(&["Person"]);
2077
2078 session.begin_tx().unwrap();
2080 let edge_id = session.create_edge(alice, bob, "KNOWS");
2081
2082 assert!(session.edge_exists(edge_id));
2084
2085 session.commit().unwrap();
2087
2088 assert!(session.edge_exists(edge_id));
2090 }
2091
2092 #[test]
2093 fn test_neighbors_empty_node() {
2094 let db = GrafeoDB::new_in_memory();
2095 let session = db.session();
2096
2097 let lonely = session.create_node(&["Person"]);
2098
2099 assert!(session.get_neighbors_outgoing(lonely).is_empty());
2100 assert!(session.get_neighbors_incoming(lonely).is_empty());
2101 assert!(
2102 session
2103 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
2104 .is_empty()
2105 );
2106 }
2107 }
2108
2109 #[test]
2110 fn test_auto_gc_triggers_on_commit_interval() {
2111 use crate::config::Config;
2112
2113 let config = Config::in_memory().with_gc_interval(2);
2114 let db = GrafeoDB::with_config(config).unwrap();
2115 let mut session = db.session();
2116
2117 session.begin_tx().unwrap();
2119 session.create_node(&["A"]);
2120 session.commit().unwrap();
2121
2122 session.begin_tx().unwrap();
2124 session.create_node(&["B"]);
2125 session.commit().unwrap();
2126
2127 assert_eq!(db.node_count(), 2);
2129 }
2130
2131 #[test]
2132 fn test_query_timeout_config_propagates_to_session() {
2133 use crate::config::Config;
2134 use std::time::Duration;
2135
2136 let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
2137 let db = GrafeoDB::with_config(config).unwrap();
2138 let session = db.session();
2139
2140 assert!(session.query_deadline().is_some());
2142 }
2143
2144 #[test]
2145 fn test_no_query_timeout_returns_no_deadline() {
2146 let db = GrafeoDB::new_in_memory();
2147 let session = db.session();
2148
2149 assert!(session.query_deadline().is_none());
2151 }
2152
2153 #[test]
2154 fn test_graph_model_accessor() {
2155 use crate::config::GraphModel;
2156
2157 let db = GrafeoDB::new_in_memory();
2158 let session = db.session();
2159
2160 assert_eq!(session.graph_model(), GraphModel::Lpg);
2161 }
2162
2163 #[cfg(feature = "gql")]
2164 #[test]
2165 fn test_external_store_session() {
2166 use grafeo_core::graph::GraphStoreMut;
2167 use std::sync::Arc;
2168
2169 let config = crate::config::Config::in_memory();
2170 let store = Arc::new(grafeo_core::graph::lpg::LpgStore::new()) as Arc<dyn GraphStoreMut>;
2171 let db = GrafeoDB::with_store(store, config).unwrap();
2172
2173 let session = db.session();
2174
2175 session.execute("INSERT (:Test {name: 'hello'})").unwrap();
2177
2178 let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
2180 assert_eq!(result.row_count(), 1);
2181 }
2182}