1use std::sync::Arc;
4
5use grafeo_common::types::{EpochId, NodeId, TxId, Value};
6use grafeo_common::utils::error::Result;
7use grafeo_core::graph::lpg::LpgStore;
8
9use crate::database::QueryResult;
10use crate::transaction::TransactionManager;
11
12pub struct Session {
17 store: Arc<LpgStore>,
19 tx_manager: Arc<TransactionManager>,
21 current_tx: Option<TxId>,
23 auto_commit: bool,
25}
26
27impl Session {
28 pub(crate) fn new(store: Arc<LpgStore>, tx_manager: Arc<TransactionManager>) -> Self {
30 Self {
31 store,
32 tx_manager,
33 current_tx: None,
34 auto_commit: true,
35 }
36 }
37
38 #[cfg(feature = "gql")]
62 pub fn execute(&self, query: &str) -> Result<QueryResult> {
63 use crate::query::{
64 Executor, Planner, binder::Binder, gql_translator, optimizer::Optimizer,
65 };
66
67 let logical_plan = gql_translator::translate(query)?;
69
70 let mut binder = Binder::new();
72 let _binding_context = binder.bind(&logical_plan)?;
73
74 let optimizer = Optimizer::new();
76 let optimized_plan = optimizer.optimize(logical_plan)?;
77
78 let (viewing_epoch, tx_id) = self.get_transaction_context();
80
81 let planner = Planner::with_context(
83 Arc::clone(&self.store),
84 Arc::clone(&self.tx_manager),
85 tx_id,
86 viewing_epoch,
87 );
88 let mut physical_plan = planner.plan(&optimized_plan)?;
89
90 let executor = Executor::with_columns(physical_plan.columns.clone());
92 executor.execute(physical_plan.operator.as_mut())
93 }
94
95 #[cfg(feature = "gql")]
101 pub fn execute_with_params(
102 &self,
103 query: &str,
104 params: std::collections::HashMap<String, Value>,
105 ) -> Result<QueryResult> {
106 use crate::query::processor::{QueryLanguage, QueryProcessor};
107
108 let (viewing_epoch, tx_id) = self.get_transaction_context();
110
111 let processor =
113 QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
114
115 let processor = if let Some(tx_id) = tx_id {
117 processor.with_tx_context(viewing_epoch, tx_id)
118 } else {
119 processor
120 };
121
122 processor.process(query, QueryLanguage::Gql, Some(¶ms))
123 }
124
125 #[cfg(not(any(feature = "gql", feature = "cypher")))]
131 pub fn execute_with_params(
132 &self,
133 _query: &str,
134 _params: std::collections::HashMap<String, Value>,
135 ) -> Result<QueryResult> {
136 Err(grafeo_common::utils::error::Error::Internal(
137 "No query language enabled".to_string(),
138 ))
139 }
140
141 #[cfg(not(any(feature = "gql", feature = "cypher")))]
147 pub fn execute(&self, _query: &str) -> Result<QueryResult> {
148 Err(grafeo_common::utils::error::Error::Internal(
149 "No query language enabled".to_string(),
150 ))
151 }
152
153 #[cfg(feature = "cypher")]
159 pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
160 use crate::query::{
161 Executor, Planner, binder::Binder, cypher_translator, optimizer::Optimizer,
162 };
163
164 let logical_plan = cypher_translator::translate(query)?;
166
167 let mut binder = Binder::new();
169 let _binding_context = binder.bind(&logical_plan)?;
170
171 let optimizer = Optimizer::new();
173 let optimized_plan = optimizer.optimize(logical_plan)?;
174
175 let (viewing_epoch, tx_id) = self.get_transaction_context();
177
178 let planner = Planner::with_context(
180 Arc::clone(&self.store),
181 Arc::clone(&self.tx_manager),
182 tx_id,
183 viewing_epoch,
184 );
185 let mut physical_plan = planner.plan(&optimized_plan)?;
186
187 let executor = Executor::with_columns(physical_plan.columns.clone());
189 executor.execute(physical_plan.operator.as_mut())
190 }
191
192 #[cfg(feature = "gremlin")]
213 pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
214 use crate::query::{
215 Executor, Planner, binder::Binder, gremlin_translator, optimizer::Optimizer,
216 };
217
218 let logical_plan = gremlin_translator::translate(query)?;
220
221 let mut binder = Binder::new();
223 let _binding_context = binder.bind(&logical_plan)?;
224
225 let optimizer = Optimizer::new();
227 let optimized_plan = optimizer.optimize(logical_plan)?;
228
229 let (viewing_epoch, tx_id) = self.get_transaction_context();
231
232 let planner = Planner::with_context(
234 Arc::clone(&self.store),
235 Arc::clone(&self.tx_manager),
236 tx_id,
237 viewing_epoch,
238 );
239 let mut physical_plan = planner.plan(&optimized_plan)?;
240
241 let executor = Executor::with_columns(physical_plan.columns.clone());
243 executor.execute(physical_plan.operator.as_mut())
244 }
245
246 #[cfg(feature = "gremlin")]
252 pub fn execute_gremlin_with_params(
253 &self,
254 query: &str,
255 params: std::collections::HashMap<String, Value>,
256 ) -> Result<QueryResult> {
257 use crate::query::processor::{QueryLanguage, QueryProcessor};
258
259 let (viewing_epoch, tx_id) = self.get_transaction_context();
261
262 let processor =
264 QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
265
266 let processor = if let Some(tx_id) = tx_id {
268 processor.with_tx_context(viewing_epoch, tx_id)
269 } else {
270 processor
271 };
272
273 processor.process(query, QueryLanguage::Gremlin, Some(¶ms))
274 }
275
276 #[cfg(feature = "graphql")]
297 pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
298 use crate::query::{
299 Executor, Planner, binder::Binder, graphql_translator, optimizer::Optimizer,
300 };
301
302 let logical_plan = graphql_translator::translate(query)?;
304
305 let mut binder = Binder::new();
307 let _binding_context = binder.bind(&logical_plan)?;
308
309 let optimizer = Optimizer::new();
311 let optimized_plan = optimizer.optimize(logical_plan)?;
312
313 let (viewing_epoch, tx_id) = self.get_transaction_context();
315
316 let planner = Planner::with_context(
318 Arc::clone(&self.store),
319 Arc::clone(&self.tx_manager),
320 tx_id,
321 viewing_epoch,
322 );
323 let mut physical_plan = planner.plan(&optimized_plan)?;
324
325 let executor = Executor::with_columns(physical_plan.columns.clone());
327 executor.execute(physical_plan.operator.as_mut())
328 }
329
330 #[cfg(feature = "graphql")]
336 pub fn execute_graphql_with_params(
337 &self,
338 query: &str,
339 params: std::collections::HashMap<String, Value>,
340 ) -> Result<QueryResult> {
341 use crate::query::processor::{QueryLanguage, QueryProcessor};
342
343 let (viewing_epoch, tx_id) = self.get_transaction_context();
345
346 let processor =
348 QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
349
350 let processor = if let Some(tx_id) = tx_id {
352 processor.with_tx_context(viewing_epoch, tx_id)
353 } else {
354 processor
355 };
356
357 processor.process(query, QueryLanguage::GraphQL, Some(¶ms))
358 }
359
360 pub fn begin_tx(&mut self) -> Result<()> {
380 if self.current_tx.is_some() {
381 return Err(grafeo_common::utils::error::Error::Transaction(
382 grafeo_common::utils::error::TransactionError::InvalidState(
383 "Transaction already active".to_string(),
384 ),
385 ));
386 }
387
388 let tx_id = self.tx_manager.begin();
389 self.current_tx = Some(tx_id);
390 Ok(())
391 }
392
393 pub fn commit(&mut self) -> Result<()> {
401 let tx_id = self.current_tx.take().ok_or_else(|| {
402 grafeo_common::utils::error::Error::Transaction(
403 grafeo_common::utils::error::TransactionError::InvalidState(
404 "No active transaction".to_string(),
405 ),
406 )
407 })?;
408
409 self.tx_manager.commit(tx_id).map(|_| ())
410 }
411
412 pub fn rollback(&mut self) -> Result<()> {
433 let tx_id = self.current_tx.take().ok_or_else(|| {
434 grafeo_common::utils::error::Error::Transaction(
435 grafeo_common::utils::error::TransactionError::InvalidState(
436 "No active transaction".to_string(),
437 ),
438 )
439 })?;
440
441 self.store.discard_uncommitted_versions(tx_id);
443
444 self.tx_manager.abort(tx_id)
446 }
447
448 #[must_use]
450 pub fn in_transaction(&self) -> bool {
451 self.current_tx.is_some()
452 }
453
454 pub fn set_auto_commit(&mut self, auto_commit: bool) {
456 self.auto_commit = auto_commit;
457 }
458
459 #[must_use]
461 pub fn auto_commit(&self) -> bool {
462 self.auto_commit
463 }
464
465 #[must_use]
471 fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
472 if let Some(tx_id) = self.current_tx {
473 let epoch = self
475 .tx_manager
476 .start_epoch(tx_id)
477 .unwrap_or_else(|| self.tx_manager.current_epoch());
478 (epoch, Some(tx_id))
479 } else {
480 (self.tx_manager.current_epoch(), None)
482 }
483 }
484
485 pub fn create_node(&self, labels: &[&str]) -> NodeId {
490 let (epoch, tx_id) = self.get_transaction_context();
491 self.store
492 .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
493 }
494
495 pub fn create_node_with_props<'a>(
499 &self,
500 labels: &[&str],
501 properties: impl IntoIterator<Item = (&'a str, Value)>,
502 ) -> NodeId {
503 let (epoch, tx_id) = self.get_transaction_context();
504 self.store.create_node_with_props_versioned(
505 labels,
506 properties.into_iter().map(|(k, v)| (k, v)),
507 epoch,
508 tx_id.unwrap_or(TxId::SYSTEM),
509 )
510 }
511
512 pub fn create_edge(
517 &self,
518 src: NodeId,
519 dst: NodeId,
520 edge_type: &str,
521 ) -> grafeo_common::types::EdgeId {
522 let (epoch, tx_id) = self.get_transaction_context();
523 self.store
524 .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
525 }
526}
527
528#[cfg(test)]
529mod tests {
530 use crate::database::GrafeoDB;
531
532 #[test]
533 fn test_session_create_node() {
534 let db = GrafeoDB::new_in_memory();
535 let session = db.session();
536
537 let id = session.create_node(&["Person"]);
538 assert!(id.is_valid());
539 assert_eq!(db.node_count(), 1);
540 }
541
542 #[test]
543 fn test_session_transaction() {
544 let db = GrafeoDB::new_in_memory();
545 let mut session = db.session();
546
547 assert!(!session.in_transaction());
548
549 session.begin_tx().unwrap();
550 assert!(session.in_transaction());
551
552 session.commit().unwrap();
553 assert!(!session.in_transaction());
554 }
555
556 #[test]
557 fn test_session_transaction_context() {
558 let db = GrafeoDB::new_in_memory();
559 let mut session = db.session();
560
561 let (_epoch1, tx_id1) = session.get_transaction_context();
563 assert!(tx_id1.is_none());
564
565 session.begin_tx().unwrap();
567 let (epoch2, tx_id2) = session.get_transaction_context();
568 assert!(tx_id2.is_some());
569 let _ = epoch2; session.commit().unwrap();
574 let (epoch3, tx_id3) = session.get_transaction_context();
575 assert!(tx_id3.is_none());
576 assert!(epoch3.as_u64() >= epoch2.as_u64());
578 }
579
580 #[test]
581 fn test_session_rollback() {
582 let db = GrafeoDB::new_in_memory();
583 let mut session = db.session();
584
585 session.begin_tx().unwrap();
586 session.rollback().unwrap();
587 assert!(!session.in_transaction());
588 }
589
590 #[test]
591 fn test_session_rollback_discards_versions() {
592 use grafeo_common::types::TxId;
593
594 let db = GrafeoDB::new_in_memory();
595
596 let node_before = db.store().create_node(&["Person"]);
598 assert!(node_before.is_valid());
599 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
600
601 let mut session = db.session();
603 session.begin_tx().unwrap();
604 let tx_id = session.current_tx.unwrap();
605
606 let epoch = db.store().current_epoch();
608 let node_in_tx = db.store().create_node_versioned(&["Person"], epoch, tx_id);
609 assert!(node_in_tx.is_valid());
610
611 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
613
614 session.rollback().unwrap();
616 assert!(!session.in_transaction());
617
618 let count_after = db.node_count();
621 assert_eq!(
622 count_after, 1,
623 "Rollback should discard uncommitted node, but got {count_after}"
624 );
625
626 let current_epoch = db.store().current_epoch();
628 assert!(
629 db.store()
630 .get_node_versioned(node_before, current_epoch, TxId::SYSTEM)
631 .is_some(),
632 "Original node should still exist"
633 );
634
635 assert!(
637 db.store()
638 .get_node_versioned(node_in_tx, current_epoch, TxId::SYSTEM)
639 .is_none(),
640 "Transaction node should be gone"
641 );
642 }
643
644 #[test]
645 fn test_session_create_node_in_transaction() {
646 let db = GrafeoDB::new_in_memory();
648
649 let node_before = db.create_node(&["Person"]);
651 assert!(node_before.is_valid());
652 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
653
654 let mut session = db.session();
656 session.begin_tx().unwrap();
657
658 let node_in_tx = session.create_node(&["Person"]);
660 assert!(node_in_tx.is_valid());
661
662 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
664
665 session.rollback().unwrap();
667
668 let count_after = db.node_count();
670 assert_eq!(
671 count_after, 1,
672 "Rollback should discard node created via session.create_node(), but got {count_after}"
673 );
674 }
675
676 #[test]
677 fn test_session_create_node_with_props_in_transaction() {
678 use grafeo_common::types::Value;
679
680 let db = GrafeoDB::new_in_memory();
682
683 db.create_node(&["Person"]);
685 assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
686
687 let mut session = db.session();
689 session.begin_tx().unwrap();
690
691 let node_in_tx =
692 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
693 assert!(node_in_tx.is_valid());
694
695 assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
697
698 session.rollback().unwrap();
700
701 let count_after = db.node_count();
703 assert_eq!(
704 count_after, 1,
705 "Rollback should discard node created via session.create_node_with_props()"
706 );
707 }
708
709 #[cfg(feature = "gql")]
710 mod gql_tests {
711 use super::*;
712
713 #[test]
714 fn test_gql_query_execution() {
715 let db = GrafeoDB::new_in_memory();
716 let session = db.session();
717
718 session.create_node(&["Person"]);
720 session.create_node(&["Person"]);
721 session.create_node(&["Animal"]);
722
723 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
725
726 assert_eq!(result.row_count(), 2);
728 assert_eq!(result.column_count(), 1);
729 assert_eq!(result.columns[0], "n");
730 }
731
732 #[test]
733 fn test_gql_empty_result() {
734 let db = GrafeoDB::new_in_memory();
735 let session = db.session();
736
737 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
739
740 assert_eq!(result.row_count(), 0);
741 }
742
743 #[test]
744 fn test_gql_parse_error() {
745 let db = GrafeoDB::new_in_memory();
746 let session = db.session();
747
748 let result = session.execute("MATCH (n RETURN n");
750
751 assert!(result.is_err());
752 }
753
754 #[test]
755 fn test_gql_relationship_traversal() {
756 let db = GrafeoDB::new_in_memory();
757 let session = db.session();
758
759 let alice = session.create_node(&["Person"]);
761 let bob = session.create_node(&["Person"]);
762 let charlie = session.create_node(&["Person"]);
763
764 session.create_edge(alice, bob, "KNOWS");
765 session.create_edge(alice, charlie, "KNOWS");
766
767 let result = session
769 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
770 .unwrap();
771
772 assert_eq!(result.row_count(), 2);
774 assert_eq!(result.column_count(), 2);
775 assert_eq!(result.columns[0], "a");
776 assert_eq!(result.columns[1], "b");
777 }
778
779 #[test]
780 fn test_gql_relationship_with_type_filter() {
781 let db = GrafeoDB::new_in_memory();
782 let session = db.session();
783
784 let alice = session.create_node(&["Person"]);
786 let bob = session.create_node(&["Person"]);
787 let charlie = session.create_node(&["Person"]);
788
789 session.create_edge(alice, bob, "KNOWS");
790 session.create_edge(alice, charlie, "WORKS_WITH");
791
792 let result = session
794 .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
795 .unwrap();
796
797 assert_eq!(result.row_count(), 1);
799 }
800
801 #[test]
802 fn test_gql_semantic_error_undefined_variable() {
803 let db = GrafeoDB::new_in_memory();
804 let session = db.session();
805
806 let result = session.execute("MATCH (n:Person) RETURN x");
808
809 assert!(result.is_err());
811 let err = match result {
812 Err(e) => e,
813 Ok(_) => panic!("Expected error"),
814 };
815 assert!(
816 err.to_string().contains("Undefined variable"),
817 "Expected undefined variable error, got: {}",
818 err
819 );
820 }
821
822 #[test]
823 fn test_gql_where_clause_property_filter() {
824 use grafeo_common::types::Value;
825
826 let db = GrafeoDB::new_in_memory();
827 let session = db.session();
828
829 session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
831 session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
832 session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
833
834 let result = session
836 .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
837 .unwrap();
838
839 assert_eq!(result.row_count(), 2);
841 }
842
843 #[test]
844 fn test_gql_where_clause_equality() {
845 use grafeo_common::types::Value;
846
847 let db = GrafeoDB::new_in_memory();
848 let session = db.session();
849
850 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
852 session.create_node_with_props(&["Person"], [("name", Value::String("Bob".into()))]);
853 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
854
855 let result = session
857 .execute("MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n")
858 .unwrap();
859
860 assert_eq!(result.row_count(), 2);
862 }
863
864 #[test]
865 fn test_gql_return_property_access() {
866 use grafeo_common::types::Value;
867
868 let db = GrafeoDB::new_in_memory();
869 let session = db.session();
870
871 session.create_node_with_props(
873 &["Person"],
874 [
875 ("name", Value::String("Alice".into())),
876 ("age", Value::Int64(30)),
877 ],
878 );
879 session.create_node_with_props(
880 &["Person"],
881 [
882 ("name", Value::String("Bob".into())),
883 ("age", Value::Int64(25)),
884 ],
885 );
886
887 let result = session
889 .execute("MATCH (n:Person) RETURN n.name, n.age")
890 .unwrap();
891
892 assert_eq!(result.row_count(), 2);
894 assert_eq!(result.column_count(), 2);
895 assert_eq!(result.columns[0], "n.name");
896 assert_eq!(result.columns[1], "n.age");
897
898 let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
900 assert!(names.contains(&&Value::String("Alice".into())));
901 assert!(names.contains(&&Value::String("Bob".into())));
902 }
903
904 #[test]
905 fn test_gql_return_mixed_expressions() {
906 use grafeo_common::types::Value;
907
908 let db = GrafeoDB::new_in_memory();
909 let session = db.session();
910
911 session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
913
914 let result = session
916 .execute("MATCH (n:Person) RETURN n, n.name")
917 .unwrap();
918
919 assert_eq!(result.row_count(), 1);
920 assert_eq!(result.column_count(), 2);
921 assert_eq!(result.columns[0], "n");
922 assert_eq!(result.columns[1], "n.name");
923
924 assert_eq!(result.rows[0][1], Value::String("Alice".into()));
926 }
927 }
928
929 #[cfg(feature = "cypher")]
930 mod cypher_tests {
931 use super::*;
932
933 #[test]
934 fn test_cypher_query_execution() {
935 let db = GrafeoDB::new_in_memory();
936 let session = db.session();
937
938 session.create_node(&["Person"]);
940 session.create_node(&["Person"]);
941 session.create_node(&["Animal"]);
942
943 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
945
946 assert_eq!(result.row_count(), 2);
948 assert_eq!(result.column_count(), 1);
949 assert_eq!(result.columns[0], "n");
950 }
951
952 #[test]
953 fn test_cypher_empty_result() {
954 let db = GrafeoDB::new_in_memory();
955 let session = db.session();
956
957 let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
959
960 assert_eq!(result.row_count(), 0);
961 }
962
963 #[test]
964 fn test_cypher_parse_error() {
965 let db = GrafeoDB::new_in_memory();
966 let session = db.session();
967
968 let result = session.execute_cypher("MATCH (n RETURN n");
970
971 assert!(result.is_err());
972 }
973 }
974}