1use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
15#[cfg(feature = "rdf")]
16use grafeo_core::graph::rdf::RdfStore;
17
18use crate::config::{AdaptiveConfig, GraphModel};
19use crate::database::QueryResult;
20use crate::query::cache::QueryCache;
21use crate::transaction::TransactionManager;
22
/// A session against a database: shared handles to the stores, transaction
/// manager and query cache, plus this session's own transaction state and
/// execution settings.
pub struct Session {
    /// Shared labeled-property-graph store used by queries and the direct CRUD helpers.
    store: Arc<LpgStore>,
    /// Shared RDF store; receives `commit_tx` / `rollback_tx` alongside the LPG store.
    #[cfg(feature = "rdf")]
    rdf_store: Arc<RdfStore>,
    /// Shared transaction manager that begins, commits and aborts transactions.
    tx_manager: Arc<TransactionManager>,
    /// Shared cache of optimized plans, keyed by query text and language.
    query_cache: Arc<QueryCache>,
    /// `Some(id)` while an explicit transaction is open on this session, `None` otherwise.
    current_tx: Option<TxId>,
    /// Auto-commit flag; exposed via `set_auto_commit` / `auto_commit`.
    /// NOTE(review): not consulted by any other code visible in this file.
    auto_commit: bool,
    // Stored but not read anywhere in this file, hence the lint exemption.
    #[allow(dead_code)]
    /// Adaptive execution configuration supplied at construction.
    adaptive_config: AdaptiveConfig,
    /// Forwarded to `Planner::with_factorized_execution` for every planned query.
    factorized_execution: bool,
    /// Graph model of the owning database; `require_lpg` rejects `GraphModel::Rdf`.
    graph_model: GraphModel,
    /// Per-query time budget; `None` means queries get no deadline.
    query_timeout: Option<Duration>,
    /// Commit counter shared through an `Arc`; incremented on each successful commit.
    commit_counter: Arc<AtomicUsize>,
    /// Garbage collection runs every `gc_interval` commits; `0` disables it.
    gc_interval: usize,
    /// Change-data-capture log backing the `history*` / `changes_between` queries.
    #[cfg(feature = "cdc")]
    cdc_log: Arc<crate::cdc::CdcLog>,
}
59
60impl Session {
61 #[allow(dead_code, clippy::too_many_arguments)]
63 pub(crate) fn with_adaptive(
64 store: Arc<LpgStore>,
65 tx_manager: Arc<TransactionManager>,
66 query_cache: Arc<QueryCache>,
67 adaptive_config: AdaptiveConfig,
68 factorized_execution: bool,
69 graph_model: GraphModel,
70 query_timeout: Option<Duration>,
71 commit_counter: Arc<AtomicUsize>,
72 gc_interval: usize,
73 ) -> Self {
74 Self {
75 store,
76 #[cfg(feature = "rdf")]
77 rdf_store: Arc::new(RdfStore::new()),
78 tx_manager,
79 query_cache,
80 current_tx: None,
81 auto_commit: true,
82 adaptive_config,
83 factorized_execution,
84 graph_model,
85 query_timeout,
86 commit_counter,
87 gc_interval,
88 #[cfg(feature = "cdc")]
89 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
90 }
91 }
92
93 #[cfg(feature = "cdc")]
95 pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
96 self.cdc_log = cdc_log;
97 }
98
99 #[cfg(feature = "rdf")]
101 #[allow(clippy::too_many_arguments)]
102 pub(crate) fn with_rdf_store_and_adaptive(
103 store: Arc<LpgStore>,
104 rdf_store: Arc<RdfStore>,
105 tx_manager: Arc<TransactionManager>,
106 query_cache: Arc<QueryCache>,
107 adaptive_config: AdaptiveConfig,
108 factorized_execution: bool,
109 graph_model: GraphModel,
110 query_timeout: Option<Duration>,
111 commit_counter: Arc<AtomicUsize>,
112 gc_interval: usize,
113 ) -> Self {
114 Self {
115 store,
116 rdf_store,
117 tx_manager,
118 query_cache,
119 current_tx: None,
120 auto_commit: true,
121 adaptive_config,
122 factorized_execution,
123 graph_model,
124 query_timeout,
125 commit_counter,
126 gc_interval,
127 #[cfg(feature = "cdc")]
128 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
129 }
130 }
131
132 #[must_use]
134 pub fn graph_model(&self) -> GraphModel {
135 self.graph_model
136 }
137
138 fn require_lpg(&self, language: &str) -> Result<()> {
140 if self.graph_model == GraphModel::Rdf {
141 return Err(grafeo_common::utils::error::Error::Internal(format!(
142 "This is an RDF database. {language} queries require an LPG database."
143 )));
144 }
145 Ok(())
146 }
147
148 #[cfg(feature = "gql")]
175 pub fn execute(&self, query: &str) -> Result<QueryResult> {
176 self.require_lpg("GQL")?;
177
178 use crate::query::{
179 Executor, Planner, binder::Binder, cache::CacheKey, gql_translator,
180 optimizer::Optimizer, processor::QueryLanguage,
181 };
182
183 let start_time = std::time::Instant::now();
184
185 let cache_key = CacheKey::new(query, QueryLanguage::Gql);
187
188 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
190 cached_plan
192 } else {
193 let logical_plan = gql_translator::translate(query)?;
197
198 let mut binder = Binder::new();
200 let _binding_context = binder.bind(&logical_plan)?;
201
202 let optimizer = Optimizer::from_store(&self.store);
204 let plan = optimizer.optimize(logical_plan)?;
205
206 self.query_cache.put_optimized(cache_key, plan.clone());
208
209 plan
210 };
211
212 let (viewing_epoch, tx_id) = self.get_transaction_context();
214
215 let planner = Planner::with_context(
218 Arc::clone(&self.store),
219 Arc::clone(&self.tx_manager),
220 tx_id,
221 viewing_epoch,
222 )
223 .with_factorized_execution(self.factorized_execution);
224 let mut physical_plan = planner.plan(&optimized_plan)?;
225
226 let executor = Executor::with_columns(physical_plan.columns.clone())
228 .with_deadline(self.query_deadline());
229 let mut result = executor.execute(physical_plan.operator.as_mut())?;
230
231 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
233 let rows_scanned = result.rows.len() as u64;
234 result.execution_time_ms = Some(elapsed_ms);
235 result.rows_scanned = Some(rows_scanned);
236
237 Ok(result)
238 }
239
240 #[cfg(feature = "gql")]
246 pub fn execute_with_params(
247 &self,
248 query: &str,
249 params: std::collections::HashMap<String, Value>,
250 ) -> Result<QueryResult> {
251 self.require_lpg("GQL")?;
252
253 use crate::query::processor::{QueryLanguage, QueryProcessor};
254
255 let (viewing_epoch, tx_id) = self.get_transaction_context();
257
258 let processor =
260 QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
261
262 let processor = if let Some(tx_id) = tx_id {
264 processor.with_tx_context(viewing_epoch, tx_id)
265 } else {
266 processor
267 };
268
269 processor.process(query, QueryLanguage::Gql, Some(¶ms))
270 }
271
272 #[cfg(not(any(feature = "gql", feature = "cypher")))]
278 pub fn execute_with_params(
279 &self,
280 _query: &str,
281 _params: std::collections::HashMap<String, Value>,
282 ) -> Result<QueryResult> {
283 Err(grafeo_common::utils::error::Error::Internal(
284 "No query language enabled".to_string(),
285 ))
286 }
287
288 #[cfg(not(any(feature = "gql", feature = "cypher")))]
294 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
295 Err(grafeo_common::utils::error::Error::Internal(
296 "No query language enabled".to_string(),
297 ))
298 }
299
300 #[cfg(feature = "cypher")]
306 pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
307 use crate::query::{
308 Executor, Planner, binder::Binder, cache::CacheKey, cypher_translator,
309 optimizer::Optimizer, processor::QueryLanguage,
310 };
311
312 let cache_key = CacheKey::new(query, QueryLanguage::Cypher);
314
315 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
317 cached_plan
318 } else {
319 let logical_plan = cypher_translator::translate(query)?;
321
322 let mut binder = Binder::new();
324 let _binding_context = binder.bind(&logical_plan)?;
325
326 let optimizer = Optimizer::from_store(&self.store);
328 let plan = optimizer.optimize(logical_plan)?;
329
330 self.query_cache.put_optimized(cache_key, plan.clone());
332
333 plan
334 };
335
336 let (viewing_epoch, tx_id) = self.get_transaction_context();
338
339 let planner = Planner::with_context(
341 Arc::clone(&self.store),
342 Arc::clone(&self.tx_manager),
343 tx_id,
344 viewing_epoch,
345 )
346 .with_factorized_execution(self.factorized_execution);
347 let mut physical_plan = planner.plan(&optimized_plan)?;
348
349 let executor = Executor::with_columns(physical_plan.columns.clone())
351 .with_deadline(self.query_deadline());
352 let result = executor.execute(physical_plan.operator.as_mut())?;
353 Ok(result)
354 }
355
356 #[cfg(feature = "gremlin")]
380 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
381 use crate::query::{
382 Executor, Planner, binder::Binder, gremlin_translator, optimizer::Optimizer,
383 };
384
385 let logical_plan = gremlin_translator::translate(query)?;
387
388 let mut binder = Binder::new();
390 let _binding_context = binder.bind(&logical_plan)?;
391
392 let optimizer = Optimizer::from_store(&self.store);
394 let optimized_plan = optimizer.optimize(logical_plan)?;
395
396 let (viewing_epoch, tx_id) = self.get_transaction_context();
398
399 let planner = Planner::with_context(
401 Arc::clone(&self.store),
402 Arc::clone(&self.tx_manager),
403 tx_id,
404 viewing_epoch,
405 )
406 .with_factorized_execution(self.factorized_execution);
407 let mut physical_plan = planner.plan(&optimized_plan)?;
408
409 let executor = Executor::with_columns(physical_plan.columns.clone())
411 .with_deadline(self.query_deadline());
412 let result = executor.execute(physical_plan.operator.as_mut())?;
413 Ok(result)
414 }
415
416 #[cfg(feature = "gremlin")]
422 pub fn execute_gremlin_with_params(
423 &self,
424 query: &str,
425 params: std::collections::HashMap<String, Value>,
426 ) -> Result<QueryResult> {
427 use crate::query::processor::{QueryLanguage, QueryProcessor};
428
429 let (viewing_epoch, tx_id) = self.get_transaction_context();
431
432 let processor =
434 QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
435
436 let processor = if let Some(tx_id) = tx_id {
438 processor.with_tx_context(viewing_epoch, tx_id)
439 } else {
440 processor
441 };
442
443 processor.process(query, QueryLanguage::Gremlin, Some(¶ms))
444 }
445
446 #[cfg(feature = "graphql")]
470 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
471 use crate::query::{
472 Executor, Planner, binder::Binder, graphql_translator, optimizer::Optimizer,
473 };
474
475 let logical_plan = graphql_translator::translate(query)?;
477
478 let mut binder = Binder::new();
480 let _binding_context = binder.bind(&logical_plan)?;
481
482 let optimizer = Optimizer::from_store(&self.store);
484 let optimized_plan = optimizer.optimize(logical_plan)?;
485
486 let (viewing_epoch, tx_id) = self.get_transaction_context();
488
489 let planner = Planner::with_context(
491 Arc::clone(&self.store),
492 Arc::clone(&self.tx_manager),
493 tx_id,
494 viewing_epoch,
495 )
496 .with_factorized_execution(self.factorized_execution);
497 let mut physical_plan = planner.plan(&optimized_plan)?;
498
499 let executor = Executor::with_columns(physical_plan.columns.clone())
501 .with_deadline(self.query_deadline());
502 let result = executor.execute(physical_plan.operator.as_mut())?;
503 Ok(result)
504 }
505
506 #[cfg(feature = "graphql")]
512 pub fn execute_graphql_with_params(
513 &self,
514 query: &str,
515 params: std::collections::HashMap<String, Value>,
516 ) -> Result<QueryResult> {
517 use crate::query::processor::{QueryLanguage, QueryProcessor};
518
519 let (viewing_epoch, tx_id) = self.get_transaction_context();
521
522 let processor =
524 QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
525
526 let processor = if let Some(tx_id) = tx_id {
528 processor.with_tx_context(viewing_epoch, tx_id)
529 } else {
530 processor
531 };
532
533 processor.process(query, QueryLanguage::GraphQL, Some(¶ms))
534 }
535
536 #[cfg(feature = "sql-pgq")]
561 pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
562 use crate::query::{
563 Executor, Planner, binder::Binder, cache::CacheKey, optimizer::Optimizer,
564 plan::LogicalOperator, processor::QueryLanguage, sql_pgq_translator,
565 };
566
567 let logical_plan = sql_pgq_translator::translate(query)?;
569
570 if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
572 return Ok(QueryResult {
573 columns: vec!["status".into()],
574 column_types: vec![grafeo_common::types::LogicalType::String],
575 rows: vec![vec![Value::from(format!(
576 "Property graph '{}' created ({} node tables, {} edge tables)",
577 cpg.name,
578 cpg.node_tables.len(),
579 cpg.edge_tables.len()
580 ))]],
581 execution_time_ms: None,
582 rows_scanned: None,
583 });
584 }
585
586 let cache_key = CacheKey::new(query, QueryLanguage::SqlPgq);
588
589 let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
591 cached_plan
592 } else {
593 let mut binder = Binder::new();
595 let _binding_context = binder.bind(&logical_plan)?;
596
597 let optimizer = Optimizer::from_store(&self.store);
599 let plan = optimizer.optimize(logical_plan)?;
600
601 self.query_cache.put_optimized(cache_key, plan.clone());
603
604 plan
605 };
606
607 let (viewing_epoch, tx_id) = self.get_transaction_context();
609
610 let planner = Planner::with_context(
612 Arc::clone(&self.store),
613 Arc::clone(&self.tx_manager),
614 tx_id,
615 viewing_epoch,
616 )
617 .with_factorized_execution(self.factorized_execution);
618 let mut physical_plan = planner.plan(&optimized_plan)?;
619
620 let executor = Executor::with_columns(physical_plan.columns.clone())
622 .with_deadline(self.query_deadline());
623 let result = executor.execute(physical_plan.operator.as_mut())?;
624 Ok(result)
625 }
626
627 #[cfg(feature = "sql-pgq")]
633 pub fn execute_sql_with_params(
634 &self,
635 query: &str,
636 params: std::collections::HashMap<String, Value>,
637 ) -> Result<QueryResult> {
638 use crate::query::processor::{QueryLanguage, QueryProcessor};
639
640 let (viewing_epoch, tx_id) = self.get_transaction_context();
642
643 let processor =
645 QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
646
647 let processor = if let Some(tx_id) = tx_id {
649 processor.with_tx_context(viewing_epoch, tx_id)
650 } else {
651 processor
652 };
653
654 processor.process(query, QueryLanguage::SqlPgq, Some(¶ms))
655 }
656
657 #[cfg(all(feature = "sparql", feature = "rdf"))]
663 pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
664 use crate::query::{
665 Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
666 };
667
668 let logical_plan = sparql_translator::translate(query)?;
670
671 let optimizer = Optimizer::from_store(&self.store);
673 let optimized_plan = optimizer.optimize(logical_plan)?;
674
675 let planner = RdfPlanner::new(Arc::clone(&self.rdf_store)).with_tx_id(self.current_tx);
677 let mut physical_plan = planner.plan(&optimized_plan)?;
678
679 let executor = Executor::with_columns(physical_plan.columns.clone())
681 .with_deadline(self.query_deadline());
682 executor.execute(physical_plan.operator.as_mut())
683 }
684
685 #[cfg(all(feature = "sparql", feature = "rdf"))]
691 pub fn execute_sparql_with_params(
692 &self,
693 query: &str,
694 _params: std::collections::HashMap<String, Value>,
695 ) -> Result<QueryResult> {
696 self.execute_sparql(query)
699 }
700
701 pub fn execute_language(
710 &self,
711 query: &str,
712 language: &str,
713 params: Option<std::collections::HashMap<String, Value>>,
714 ) -> Result<QueryResult> {
715 match language {
716 "gql" => {
717 if let Some(p) = params {
718 self.execute_with_params(query, p)
719 } else {
720 self.execute(query)
721 }
722 }
723 #[cfg(feature = "cypher")]
724 "cypher" => {
725 if let Some(p) = params {
726 use crate::query::processor::{QueryLanguage, QueryProcessor};
727 let processor = QueryProcessor::for_lpg_with_tx(
728 Arc::clone(&self.store),
729 Arc::clone(&self.tx_manager),
730 );
731 let (viewing_epoch, tx_id) = self.get_transaction_context();
732 let processor = if let Some(tx_id) = tx_id {
733 processor.with_tx_context(viewing_epoch, tx_id)
734 } else {
735 processor
736 };
737 processor.process(query, QueryLanguage::Cypher, Some(&p))
738 } else {
739 self.execute_cypher(query)
740 }
741 }
742 #[cfg(feature = "gremlin")]
743 "gremlin" => {
744 if let Some(p) = params {
745 self.execute_gremlin_with_params(query, p)
746 } else {
747 self.execute_gremlin(query)
748 }
749 }
750 #[cfg(feature = "graphql")]
751 "graphql" => {
752 if let Some(p) = params {
753 self.execute_graphql_with_params(query, p)
754 } else {
755 self.execute_graphql(query)
756 }
757 }
758 #[cfg(feature = "sql-pgq")]
759 "sql" | "sql-pgq" => {
760 if let Some(p) = params {
761 self.execute_sql_with_params(query, p)
762 } else {
763 self.execute_sql(query)
764 }
765 }
766 #[cfg(all(feature = "sparql", feature = "rdf"))]
767 "sparql" => {
768 if let Some(p) = params {
769 self.execute_sparql_with_params(query, p)
770 } else {
771 self.execute_sparql(query)
772 }
773 }
774 other => Err(grafeo_common::utils::error::Error::Query(
775 grafeo_common::utils::error::QueryError::new(
776 grafeo_common::utils::error::QueryErrorKind::Semantic,
777 format!("Unknown query language: '{other}'"),
778 ),
779 )),
780 }
781 }
782
783 pub fn begin_tx(&mut self) -> Result<()> {
806 if self.current_tx.is_some() {
807 return Err(grafeo_common::utils::error::Error::Transaction(
808 grafeo_common::utils::error::TransactionError::InvalidState(
809 "Transaction already active".to_string(),
810 ),
811 ));
812 }
813
814 let tx_id = self.tx_manager.begin();
815 self.current_tx = Some(tx_id);
816 Ok(())
817 }
818
819 pub fn begin_tx_with_isolation(
827 &mut self,
828 isolation_level: crate::transaction::IsolationLevel,
829 ) -> Result<()> {
830 if self.current_tx.is_some() {
831 return Err(grafeo_common::utils::error::Error::Transaction(
832 grafeo_common::utils::error::TransactionError::InvalidState(
833 "Transaction already active".to_string(),
834 ),
835 ));
836 }
837
838 let tx_id = self.tx_manager.begin_with_isolation(isolation_level);
839 self.current_tx = Some(tx_id);
840 Ok(())
841 }
842
843 pub fn commit(&mut self) -> Result<()> {
851 let tx_id = self.current_tx.take().ok_or_else(|| {
852 grafeo_common::utils::error::Error::Transaction(
853 grafeo_common::utils::error::TransactionError::InvalidState(
854 "No active transaction".to_string(),
855 ),
856 )
857 })?;
858
859 #[cfg(feature = "rdf")]
861 self.rdf_store.commit_tx(tx_id);
862
863 self.tx_manager.commit(tx_id)?;
864
865 self.store.sync_epoch(self.tx_manager.current_epoch());
869
870 if self.gc_interval > 0 {
872 let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
873 if count.is_multiple_of(self.gc_interval) {
874 let min_epoch = self.tx_manager.min_active_epoch();
875 self.store.gc_versions(min_epoch);
876 self.tx_manager.gc();
877 }
878 }
879
880 Ok(())
881 }
882
883 pub fn rollback(&mut self) -> Result<()> {
907 let tx_id = self.current_tx.take().ok_or_else(|| {
908 grafeo_common::utils::error::Error::Transaction(
909 grafeo_common::utils::error::TransactionError::InvalidState(
910 "No active transaction".to_string(),
911 ),
912 )
913 })?;
914
915 self.store.discard_uncommitted_versions(tx_id);
917
918 #[cfg(feature = "rdf")]
920 self.rdf_store.rollback_tx(tx_id);
921
922 self.tx_manager.abort(tx_id)
924 }
925
926 #[must_use]
928 pub fn in_transaction(&self) -> bool {
929 self.current_tx.is_some()
930 }
931
932 pub fn set_auto_commit(&mut self, auto_commit: bool) {
934 self.auto_commit = auto_commit;
935 }
936
937 #[must_use]
939 pub fn auto_commit(&self) -> bool {
940 self.auto_commit
941 }
942
943 #[must_use]
945 fn query_deadline(&self) -> Option<Instant> {
946 self.query_timeout.map(|d| Instant::now() + d)
947 }
948
949 #[must_use]
955 fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
956 if let Some(tx_id) = self.current_tx {
957 let epoch = self
959 .tx_manager
960 .start_epoch(tx_id)
961 .unwrap_or_else(|| self.tx_manager.current_epoch());
962 (epoch, Some(tx_id))
963 } else {
964 (self.tx_manager.current_epoch(), None)
966 }
967 }
968
969 pub fn create_node(&self, labels: &[&str]) -> NodeId {
974 let (epoch, tx_id) = self.get_transaction_context();
975 self.store
976 .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
977 }
978
979 pub fn create_node_with_props<'a>(
983 &self,
984 labels: &[&str],
985 properties: impl IntoIterator<Item = (&'a str, Value)>,
986 ) -> NodeId {
987 let (epoch, tx_id) = self.get_transaction_context();
988 self.store.create_node_with_props_versioned(
989 labels,
990 properties.into_iter().map(|(k, v)| (k, v)),
991 epoch,
992 tx_id.unwrap_or(TxId::SYSTEM),
993 )
994 }
995
996 pub fn create_edge(
1001 &self,
1002 src: NodeId,
1003 dst: NodeId,
1004 edge_type: &str,
1005 ) -> grafeo_common::types::EdgeId {
1006 let (epoch, tx_id) = self.get_transaction_context();
1007 self.store
1008 .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1009 }
1010
1011 #[must_use]
1039 pub fn get_node(&self, id: NodeId) -> Option<Node> {
1040 let (epoch, tx_id) = self.get_transaction_context();
1041 self.store
1042 .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1043 }
1044
1045 #[must_use]
1069 pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
1070 self.get_node(id)
1071 .and_then(|node| node.get_property(key).cloned())
1072 }
1073
1074 #[must_use]
1081 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1082 let (epoch, tx_id) = self.get_transaction_context();
1083 self.store
1084 .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1085 }
1086
1087 #[must_use]
1113 pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1114 self.store.edges_from(node, Direction::Outgoing).collect()
1115 }
1116
1117 #[must_use]
1126 pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1127 self.store.edges_from(node, Direction::Incoming).collect()
1128 }
1129
1130 #[must_use]
1142 pub fn get_neighbors_outgoing_by_type(
1143 &self,
1144 node: NodeId,
1145 edge_type: &str,
1146 ) -> Vec<(NodeId, EdgeId)> {
1147 self.store
1148 .edges_from(node, Direction::Outgoing)
1149 .filter(|(_, edge_id)| {
1150 self.get_edge(*edge_id)
1151 .is_some_and(|e| e.edge_type.as_str() == edge_type)
1152 })
1153 .collect()
1154 }
1155
1156 #[must_use]
1163 pub fn node_exists(&self, id: NodeId) -> bool {
1164 self.get_node(id).is_some()
1165 }
1166
1167 #[must_use]
1169 pub fn edge_exists(&self, id: EdgeId) -> bool {
1170 self.get_edge(id).is_some()
1171 }
1172
1173 #[must_use]
1177 pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
1178 let out = self.store.out_degree(node);
1179 let in_degree = self.store.in_degree(node);
1180 (out, in_degree)
1181 }
1182
1183 #[must_use]
1193 pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
1194 let (epoch, tx_id) = self.get_transaction_context();
1195 let tx = tx_id.unwrap_or(TxId::SYSTEM);
1196 ids.iter()
1197 .map(|&id| self.store.get_node_versioned(id, epoch, tx))
1198 .collect()
1199 }
1200
1201 #[cfg(feature = "cdc")]
1205 pub fn history(
1206 &self,
1207 entity_id: impl Into<crate::cdc::EntityId>,
1208 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1209 Ok(self.cdc_log.history(entity_id.into()))
1210 }
1211
1212 #[cfg(feature = "cdc")]
1214 pub fn history_since(
1215 &self,
1216 entity_id: impl Into<crate::cdc::EntityId>,
1217 since_epoch: EpochId,
1218 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1219 Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
1220 }
1221
1222 #[cfg(feature = "cdc")]
1224 pub fn changes_between(
1225 &self,
1226 start_epoch: EpochId,
1227 end_epoch: EpochId,
1228 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1229 Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
1230 }
1231}
1232
1233#[cfg(test)]
1234mod tests {
1235 use crate::database::GrafeoDB;
1236
1237 #[test]
1238 fn test_session_create_node() {
1239 let db = GrafeoDB::new_in_memory();
1240 let session = db.session();
1241
1242 let id = session.create_node(&["Person"]);
1243 assert!(id.is_valid());
1244 assert_eq!(db.node_count(), 1);
1245 }
1246
1247 #[test]
1248 fn test_session_transaction() {
1249 let db = GrafeoDB::new_in_memory();
1250 let mut session = db.session();
1251
1252 assert!(!session.in_transaction());
1253
1254 session.begin_tx().unwrap();
1255 assert!(session.in_transaction());
1256
1257 session.commit().unwrap();
1258 assert!(!session.in_transaction());
1259 }
1260
1261 #[test]
1262 fn test_session_transaction_context() {
1263 let db = GrafeoDB::new_in_memory();
1264 let mut session = db.session();
1265
1266 let (_epoch1, tx_id1) = session.get_transaction_context();
1268 assert!(tx_id1.is_none());
1269
1270 session.begin_tx().unwrap();
1272 let (epoch2, tx_id2) = session.get_transaction_context();
1273 assert!(tx_id2.is_some());
1274 let _ = epoch2; session.commit().unwrap();
1279 let (epoch3, tx_id3) = session.get_transaction_context();
1280 assert!(tx_id3.is_none());
1281 assert!(epoch3.as_u64() >= epoch2.as_u64());
1283 }
1284
1285 #[test]
1286 fn test_session_rollback() {
1287 let db = GrafeoDB::new_in_memory();
1288 let mut session = db.session();
1289
1290 session.begin_tx().unwrap();
1291 session.rollback().unwrap();
1292 assert!(!session.in_transaction());
1293 }
1294
1295 #[test]
1296 fn test_session_rollback_discards_versions() {
1297 use grafeo_common::types::TxId;
1298
1299 let db = GrafeoDB::new_in_memory();
1300
1301 let node_before = db.store().create_node(&["Person"]);
1303 assert!(node_before.is_valid());
1304 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1305
1306 let mut session = db.session();
1308 session.begin_tx().unwrap();
1309 let tx_id = session.current_tx.unwrap();
1310
1311 let epoch = db.store().current_epoch();
1313 let node_in_tx = db.store().create_node_versioned(&["Person"], epoch, tx_id);
1314 assert!(node_in_tx.is_valid());
1315
1316 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1318
1319 session.rollback().unwrap();
1321 assert!(!session.in_transaction());
1322
1323 let count_after = db.node_count();
1326 assert_eq!(
1327 count_after, 1,
1328 "Rollback should discard uncommitted node, but got {count_after}"
1329 );
1330
1331 let current_epoch = db.store().current_epoch();
1333 assert!(
1334 db.store()
1335 .get_node_versioned(node_before, current_epoch, TxId::SYSTEM)
1336 .is_some(),
1337 "Original node should still exist"
1338 );
1339
1340 assert!(
1342 db.store()
1343 .get_node_versioned(node_in_tx, current_epoch, TxId::SYSTEM)
1344 .is_none(),
1345 "Transaction node should be gone"
1346 );
1347 }
1348
1349 #[test]
1350 fn test_session_create_node_in_transaction() {
1351 let db = GrafeoDB::new_in_memory();
1353
1354 let node_before = db.create_node(&["Person"]);
1356 assert!(node_before.is_valid());
1357 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1358
1359 let mut session = db.session();
1361 session.begin_tx().unwrap();
1362
1363 let node_in_tx = session.create_node(&["Person"]);
1365 assert!(node_in_tx.is_valid());
1366
1367 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1369
1370 session.rollback().unwrap();
1372
1373 let count_after = db.node_count();
1375 assert_eq!(
1376 count_after, 1,
1377 "Rollback should discard node created via session.create_node(), but got {count_after}"
1378 );
1379 }
1380
1381 #[test]
1382 fn test_session_create_node_with_props_in_transaction() {
1383 use grafeo_common::types::Value;
1384
1385 let db = GrafeoDB::new_in_memory();
1387
1388 db.create_node(&["Person"]);
1390 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1391
1392 let mut session = db.session();
1394 session.begin_tx().unwrap();
1395
1396 let node_in_tx =
1397 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1398 assert!(node_in_tx.is_valid());
1399
1400 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1402
1403 session.rollback().unwrap();
1405
1406 let count_after = db.node_count();
1408 assert_eq!(
1409 count_after, 1,
1410 "Rollback should discard node created via session.create_node_with_props()"
1411 );
1412 }
1413
1414 #[cfg(feature = "gql")]
1415 mod gql_tests {
1416 use super::*;
1417
1418 #[test]
1419 fn test_gql_query_execution() {
1420 let db = GrafeoDB::new_in_memory();
1421 let session = db.session();
1422
1423 session.create_node(&["Person"]);
1425 session.create_node(&["Person"]);
1426 session.create_node(&["Animal"]);
1427
1428 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1430
1431 assert_eq!(result.row_count(), 2);
1433 assert_eq!(result.column_count(), 1);
1434 assert_eq!(result.columns[0], "n");
1435 }
1436
1437 #[test]
1438 fn test_gql_empty_result() {
1439 let db = GrafeoDB::new_in_memory();
1440 let session = db.session();
1441
1442 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1444
1445 assert_eq!(result.row_count(), 0);
1446 }
1447
1448 #[test]
1449 fn test_gql_parse_error() {
1450 let db = GrafeoDB::new_in_memory();
1451 let session = db.session();
1452
1453 let result = session.execute("MATCH (n RETURN n");
1455
1456 assert!(result.is_err());
1457 }
1458
1459 #[test]
1460 fn test_gql_relationship_traversal() {
1461 let db = GrafeoDB::new_in_memory();
1462 let session = db.session();
1463
1464 let alice = session.create_node(&["Person"]);
1466 let bob = session.create_node(&["Person"]);
1467 let charlie = session.create_node(&["Person"]);
1468
1469 session.create_edge(alice, bob, "KNOWS");
1470 session.create_edge(alice, charlie, "KNOWS");
1471
1472 let result = session
1474 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1475 .unwrap();
1476
1477 assert_eq!(result.row_count(), 2);
1479 assert_eq!(result.column_count(), 2);
1480 assert_eq!(result.columns[0], "a");
1481 assert_eq!(result.columns[1], "b");
1482 }
1483
1484 #[test]
1485 fn test_gql_relationship_with_type_filter() {
1486 let db = GrafeoDB::new_in_memory();
1487 let session = db.session();
1488
1489 let alice = session.create_node(&["Person"]);
1491 let bob = session.create_node(&["Person"]);
1492 let charlie = session.create_node(&["Person"]);
1493
1494 session.create_edge(alice, bob, "KNOWS");
1495 session.create_edge(alice, charlie, "WORKS_WITH");
1496
1497 let result = session
1499 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1500 .unwrap();
1501
1502 assert_eq!(result.row_count(), 1);
1504 }
1505
1506 #[test]
1507 fn test_gql_semantic_error_undefined_variable() {
1508 let db = GrafeoDB::new_in_memory();
1509 let session = db.session();
1510
1511 let result = session.execute("MATCH (n:Person) RETURN x");
1513
1514 assert!(result.is_err());
1516 let Err(err) = result else {
1517 panic!("Expected error")
1518 };
1519 assert!(
1520 err.to_string().contains("Undefined variable"),
1521 "Expected undefined variable error, got: {}",
1522 err
1523 );
1524 }
1525
1526 #[test]
1527 fn test_gql_where_clause_property_filter() {
1528 use grafeo_common::types::Value;
1529
1530 let db = GrafeoDB::new_in_memory();
1531 let session = db.session();
1532
1533 session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
1535 session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
1536 session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
1537
1538 let result = session
1540 .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
1541 .unwrap();
1542
1543 assert_eq!(result.row_count(), 2);
1545 }
1546
1547 #[test]
1548 fn test_gql_where_clause_equality() {
1549 use grafeo_common::types::Value;
1550
1551 let db = GrafeoDB::new_in_memory();
1552 let session = db.session();
1553
1554 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1556 session.create_node_with_props(&["Person"], [("name", Value::String("Bob".into()))]);
1557 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1558
1559 let result = session
1561 .execute("MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n")
1562 .unwrap();
1563
1564 assert_eq!(result.row_count(), 2);
1566 }
1567
#[test]
fn test_gql_return_property_access() {
    use grafeo_common::types::Value;

    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // Fixture: two Person nodes, each with a `name` and an `age` property.
    for (name, age) in [("Alice", 30), ("Bob", 25)] {
        session.create_node_with_props(
            &["Person"],
            [
                ("name", Value::String(name.into())),
                ("age", Value::Int64(age)),
            ],
        );
    }

    let query = "MATCH (n:Person) RETURN n.name, n.age";
    let result = session.execute(query).unwrap();

    // One row per node, one column per projected property.
    assert_eq!(result.row_count(), 2);
    assert_eq!(result.column_count(), 2);
    assert_eq!(result.columns[0], "n.name");
    assert_eq!(result.columns[1], "n.age");

    // Row order is not asserted: each name only has to show up in the first
    // column of some row.
    for name in ["Alice", "Bob"] {
        let expected = Value::String(name.into());
        assert!(result.rows.iter().any(|row| row[0] == expected));
    }
}
1607
#[test]
fn test_gql_return_mixed_expressions() {
    use grafeo_common::types::Value;

    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);

    // Project the whole node and one of its properties in the same RETURN.
    let query = "MATCH (n:Person) RETURN n, n.name";
    let result = session.execute(query).unwrap();

    assert_eq!(result.row_count(), 1);
    assert_eq!(result.column_count(), 2);

    let headers = &result.columns;
    assert_eq!(headers[0], "n");
    assert_eq!(headers[1], "n.name");

    // Only the property column is checked by value.
    let first_row = &result.rows[0];
    assert_eq!(first_row[1], Value::String("Alice".into()));
}
1631 }
1632
#[cfg(feature = "cypher")]
mod cypher_tests {
    use super::*;

    // Query text shared by the tests that match `Person` nodes.
    const MATCH_PERSONS: &str = "MATCH (n:Person) RETURN n";

    #[test]
    fn test_cypher_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Two nodes carry the matching label, one does not.
        for label in ["Person", "Person", "Animal"] {
            session.create_node(&[label]);
        }

        let result = session.execute_cypher(MATCH_PERSONS).unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.column_count(), 1);
        assert_eq!(result.columns[0], "n");
    }

    #[test]
    fn test_cypher_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // No nodes were created, so the match must succeed with zero rows.
        let result = session.execute_cypher(MATCH_PERSONS).unwrap();

        assert_eq!(result.row_count(), 0);
    }

    #[test]
    fn test_cypher_parse_error() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // The node pattern is missing its closing parenthesis.
        assert!(session.execute_cypher("MATCH (n RETURN n").is_err());
    }
}
1678
1679 mod direct_lookup_tests {
1682 use super::*;
1683 use grafeo_common::types::Value;
1684
1685 #[test]
1686 fn test_get_node() {
1687 let db = GrafeoDB::new_in_memory();
1688 let session = db.session();
1689
1690 let id = session.create_node(&["Person"]);
1691 let node = session.get_node(id);
1692
1693 assert!(node.is_some());
1694 let node = node.unwrap();
1695 assert_eq!(node.id, id);
1696 }
1697
1698 #[test]
1699 fn test_get_node_not_found() {
1700 use grafeo_common::types::NodeId;
1701
1702 let db = GrafeoDB::new_in_memory();
1703 let session = db.session();
1704
1705 let node = session.get_node(NodeId::new(9999));
1707 assert!(node.is_none());
1708 }
1709
1710 #[test]
1711 fn test_get_node_property() {
1712 let db = GrafeoDB::new_in_memory();
1713 let session = db.session();
1714
1715 let id = session
1716 .create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1717
1718 let name = session.get_node_property(id, "name");
1719 assert_eq!(name, Some(Value::String("Alice".into())));
1720
1721 let missing = session.get_node_property(id, "missing");
1723 assert!(missing.is_none());
1724 }
1725
1726 #[test]
1727 fn test_get_edge() {
1728 let db = GrafeoDB::new_in_memory();
1729 let session = db.session();
1730
1731 let alice = session.create_node(&["Person"]);
1732 let bob = session.create_node(&["Person"]);
1733 let edge_id = session.create_edge(alice, bob, "KNOWS");
1734
1735 let edge = session.get_edge(edge_id);
1736 assert!(edge.is_some());
1737 let edge = edge.unwrap();
1738 assert_eq!(edge.id, edge_id);
1739 assert_eq!(edge.src, alice);
1740 assert_eq!(edge.dst, bob);
1741 }
1742
1743 #[test]
1744 fn test_get_edge_not_found() {
1745 use grafeo_common::types::EdgeId;
1746
1747 let db = GrafeoDB::new_in_memory();
1748 let session = db.session();
1749
1750 let edge = session.get_edge(EdgeId::new(9999));
1751 assert!(edge.is_none());
1752 }
1753
1754 #[test]
1755 fn test_get_neighbors_outgoing() {
1756 let db = GrafeoDB::new_in_memory();
1757 let session = db.session();
1758
1759 let alice = session.create_node(&["Person"]);
1760 let bob = session.create_node(&["Person"]);
1761 let carol = session.create_node(&["Person"]);
1762
1763 session.create_edge(alice, bob, "KNOWS");
1764 session.create_edge(alice, carol, "KNOWS");
1765
1766 let neighbors = session.get_neighbors_outgoing(alice);
1767 assert_eq!(neighbors.len(), 2);
1768
1769 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1770 assert!(neighbor_ids.contains(&bob));
1771 assert!(neighbor_ids.contains(&carol));
1772 }
1773
1774 #[test]
1775 fn test_get_neighbors_incoming() {
1776 let db = GrafeoDB::new_in_memory();
1777 let session = db.session();
1778
1779 let alice = session.create_node(&["Person"]);
1780 let bob = session.create_node(&["Person"]);
1781 let carol = session.create_node(&["Person"]);
1782
1783 session.create_edge(bob, alice, "KNOWS");
1784 session.create_edge(carol, alice, "KNOWS");
1785
1786 let neighbors = session.get_neighbors_incoming(alice);
1787 assert_eq!(neighbors.len(), 2);
1788
1789 let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1790 assert!(neighbor_ids.contains(&bob));
1791 assert!(neighbor_ids.contains(&carol));
1792 }
1793
1794 #[test]
1795 fn test_get_neighbors_outgoing_by_type() {
1796 let db = GrafeoDB::new_in_memory();
1797 let session = db.session();
1798
1799 let alice = session.create_node(&["Person"]);
1800 let bob = session.create_node(&["Person"]);
1801 let company = session.create_node(&["Company"]);
1802
1803 session.create_edge(alice, bob, "KNOWS");
1804 session.create_edge(alice, company, "WORKS_AT");
1805
1806 let knows_neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1807 assert_eq!(knows_neighbors.len(), 1);
1808 assert_eq!(knows_neighbors[0].0, bob);
1809
1810 let works_neighbors = session.get_neighbors_outgoing_by_type(alice, "WORKS_AT");
1811 assert_eq!(works_neighbors.len(), 1);
1812 assert_eq!(works_neighbors[0].0, company);
1813
1814 let no_neighbors = session.get_neighbors_outgoing_by_type(alice, "LIKES");
1816 assert!(no_neighbors.is_empty());
1817 }
1818
1819 #[test]
1820 fn test_node_exists() {
1821 use grafeo_common::types::NodeId;
1822
1823 let db = GrafeoDB::new_in_memory();
1824 let session = db.session();
1825
1826 let id = session.create_node(&["Person"]);
1827
1828 assert!(session.node_exists(id));
1829 assert!(!session.node_exists(NodeId::new(9999)));
1830 }
1831
1832 #[test]
1833 fn test_edge_exists() {
1834 use grafeo_common::types::EdgeId;
1835
1836 let db = GrafeoDB::new_in_memory();
1837 let session = db.session();
1838
1839 let alice = session.create_node(&["Person"]);
1840 let bob = session.create_node(&["Person"]);
1841 let edge_id = session.create_edge(alice, bob, "KNOWS");
1842
1843 assert!(session.edge_exists(edge_id));
1844 assert!(!session.edge_exists(EdgeId::new(9999)));
1845 }
1846
1847 #[test]
1848 fn test_get_degree() {
1849 let db = GrafeoDB::new_in_memory();
1850 let session = db.session();
1851
1852 let alice = session.create_node(&["Person"]);
1853 let bob = session.create_node(&["Person"]);
1854 let carol = session.create_node(&["Person"]);
1855
1856 session.create_edge(alice, bob, "KNOWS");
1858 session.create_edge(alice, carol, "KNOWS");
1859 session.create_edge(bob, alice, "KNOWS");
1861
1862 let (out_degree, in_degree) = session.get_degree(alice);
1863 assert_eq!(out_degree, 2);
1864 assert_eq!(in_degree, 1);
1865
1866 let lonely = session.create_node(&["Person"]);
1868 let (out, in_deg) = session.get_degree(lonely);
1869 assert_eq!(out, 0);
1870 assert_eq!(in_deg, 0);
1871 }
1872
1873 #[test]
1874 fn test_get_nodes_batch() {
1875 let db = GrafeoDB::new_in_memory();
1876 let session = db.session();
1877
1878 let alice = session.create_node(&["Person"]);
1879 let bob = session.create_node(&["Person"]);
1880 let carol = session.create_node(&["Person"]);
1881
1882 let nodes = session.get_nodes_batch(&[alice, bob, carol]);
1883 assert_eq!(nodes.len(), 3);
1884 assert!(nodes[0].is_some());
1885 assert!(nodes[1].is_some());
1886 assert!(nodes[2].is_some());
1887
1888 use grafeo_common::types::NodeId;
1890 let nodes_with_missing = session.get_nodes_batch(&[alice, NodeId::new(9999), carol]);
1891 assert_eq!(nodes_with_missing.len(), 3);
1892 assert!(nodes_with_missing[0].is_some());
1893 assert!(nodes_with_missing[1].is_none()); assert!(nodes_with_missing[2].is_some());
1895 }
1896
1897 #[test]
1898 fn test_auto_commit_setting() {
1899 let db = GrafeoDB::new_in_memory();
1900 let mut session = db.session();
1901
1902 assert!(session.auto_commit());
1904
1905 session.set_auto_commit(false);
1906 assert!(!session.auto_commit());
1907
1908 session.set_auto_commit(true);
1909 assert!(session.auto_commit());
1910 }
1911
1912 #[test]
1913 fn test_transaction_double_begin_error() {
1914 let db = GrafeoDB::new_in_memory();
1915 let mut session = db.session();
1916
1917 session.begin_tx().unwrap();
1918 let result = session.begin_tx();
1919
1920 assert!(result.is_err());
1921 session.rollback().unwrap();
1923 }
1924
1925 #[test]
1926 fn test_commit_without_transaction_error() {
1927 let db = GrafeoDB::new_in_memory();
1928 let mut session = db.session();
1929
1930 let result = session.commit();
1931 assert!(result.is_err());
1932 }
1933
1934 #[test]
1935 fn test_rollback_without_transaction_error() {
1936 let db = GrafeoDB::new_in_memory();
1937 let mut session = db.session();
1938
1939 let result = session.rollback();
1940 assert!(result.is_err());
1941 }
1942
1943 #[test]
1944 fn test_create_edge_in_transaction() {
1945 let db = GrafeoDB::new_in_memory();
1946 let mut session = db.session();
1947
1948 let alice = session.create_node(&["Person"]);
1950 let bob = session.create_node(&["Person"]);
1951
1952 session.begin_tx().unwrap();
1954 let edge_id = session.create_edge(alice, bob, "KNOWS");
1955
1956 assert!(session.edge_exists(edge_id));
1958
1959 session.commit().unwrap();
1961
1962 assert!(session.edge_exists(edge_id));
1964 }
1965
1966 #[test]
1967 fn test_neighbors_empty_node() {
1968 let db = GrafeoDB::new_in_memory();
1969 let session = db.session();
1970
1971 let lonely = session.create_node(&["Person"]);
1972
1973 assert!(session.get_neighbors_outgoing(lonely).is_empty());
1974 assert!(session.get_neighbors_incoming(lonely).is_empty());
1975 assert!(
1976 session
1977 .get_neighbors_outgoing_by_type(lonely, "KNOWS")
1978 .is_empty()
1979 );
1980 }
1981 }
1982}