Skip to main content

grafeo_engine/
session.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7use std::sync::Arc;
8
9use grafeo_common::types::{EpochId, NodeId, TxId, Value};
10use grafeo_common::utils::error::Result;
11use grafeo_core::graph::lpg::LpgStore;
12#[cfg(feature = "rdf")]
13use grafeo_core::graph::rdf::RdfStore;
14
15use crate::config::AdaptiveConfig;
16use crate::database::QueryResult;
17use crate::transaction::TransactionManager;
18
19/// Your handle to the database - execute queries and manage transactions.
20///
21/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
22/// tracks its own transaction state, so you can have multiple concurrent
23/// sessions without them interfering.
24pub struct Session {
25    /// The underlying store.
26    store: Arc<LpgStore>,
27    /// RDF triple store (if RDF feature is enabled).
28    #[cfg(feature = "rdf")]
29    #[allow(dead_code)]
30    rdf_store: Arc<RdfStore>,
31    /// Transaction manager.
32    tx_manager: Arc<TransactionManager>,
33    /// Current transaction ID (if any).
34    current_tx: Option<TxId>,
35    /// Whether the session is in auto-commit mode.
36    auto_commit: bool,
37    /// Adaptive execution configuration.
38    #[allow(dead_code)]
39    adaptive_config: AdaptiveConfig,
40    /// Whether to use factorized execution for multi-hop queries.
41    factorized_execution: bool,
42}
43
44impl Session {
45    /// Creates a new session.
46    #[allow(dead_code)]
47    pub(crate) fn new(store: Arc<LpgStore>, tx_manager: Arc<TransactionManager>) -> Self {
48        Self {
49            store,
50            #[cfg(feature = "rdf")]
51            rdf_store: Arc::new(RdfStore::new()),
52            tx_manager,
53            current_tx: None,
54            auto_commit: true,
55            adaptive_config: AdaptiveConfig::default(),
56            factorized_execution: true,
57        }
58    }
59
60    /// Creates a new session with adaptive execution configuration.
61    #[allow(dead_code)]
62    pub(crate) fn with_adaptive(
63        store: Arc<LpgStore>,
64        tx_manager: Arc<TransactionManager>,
65        adaptive_config: AdaptiveConfig,
66        factorized_execution: bool,
67    ) -> Self {
68        Self {
69            store,
70            #[cfg(feature = "rdf")]
71            rdf_store: Arc::new(RdfStore::new()),
72            tx_manager,
73            current_tx: None,
74            auto_commit: true,
75            adaptive_config,
76            factorized_execution,
77        }
78    }
79
80    /// Creates a new session with RDF store and adaptive configuration.
81    #[cfg(feature = "rdf")]
82    pub(crate) fn with_rdf_store_and_adaptive(
83        store: Arc<LpgStore>,
84        rdf_store: Arc<RdfStore>,
85        tx_manager: Arc<TransactionManager>,
86        adaptive_config: AdaptiveConfig,
87        factorized_execution: bool,
88    ) -> Self {
89        Self {
90            store,
91            rdf_store,
92            tx_manager,
93            current_tx: None,
94            auto_commit: true,
95            adaptive_config,
96            factorized_execution,
97        }
98    }
99
100    /// Executes a GQL query.
101    ///
102    /// # Errors
103    ///
104    /// Returns an error if the query fails to parse or execute.
105    ///
106    /// # Examples
107    ///
108    /// ```ignore
109    /// use grafeo_engine::GrafeoDB;
110    ///
111    /// let db = GrafeoDB::new_in_memory();
112    /// let session = db.session();
113    ///
114    /// // Create a node
115    /// session.execute("INSERT (:Person {name: 'Alice', age: 30})")?;
116    ///
117    /// // Query nodes
118    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
119    /// for row in result {
120    ///     println!("{:?}", row);
121    /// }
122    /// ```
123    #[cfg(feature = "gql")]
124    pub fn execute(&self, query: &str) -> Result<QueryResult> {
125        use crate::query::{
126            Executor, Planner, binder::Binder, gql_translator, optimizer::Optimizer,
127        };
128
129        // Parse and translate the query to a logical plan
130        let logical_plan = gql_translator::translate(query)?;
131
132        // Semantic validation
133        let mut binder = Binder::new();
134        let _binding_context = binder.bind(&logical_plan)?;
135
136        // Optimize the plan
137        let optimizer = Optimizer::new();
138        let optimized_plan = optimizer.optimize(logical_plan)?;
139
140        // Get transaction context for MVCC visibility
141        let (viewing_epoch, tx_id) = self.get_transaction_context();
142
143        // Convert to physical plan with transaction context
144        let planner = Planner::with_context(
145            Arc::clone(&self.store),
146            Arc::clone(&self.tx_manager),
147            tx_id,
148            viewing_epoch,
149        )
150        .with_factorized_execution(self.factorized_execution);
151        let mut physical_plan = planner.plan(&optimized_plan)?;
152
153        // Execute the plan
154        let executor = Executor::with_columns(physical_plan.columns.clone());
155        executor.execute(physical_plan.operator.as_mut())
156    }
157
158    /// Executes a GQL query with parameters.
159    ///
160    /// # Errors
161    ///
162    /// Returns an error if the query fails to parse or execute.
163    #[cfg(feature = "gql")]
164    pub fn execute_with_params(
165        &self,
166        query: &str,
167        params: std::collections::HashMap<String, Value>,
168    ) -> Result<QueryResult> {
169        use crate::query::processor::{QueryLanguage, QueryProcessor};
170
171        // Get transaction context for MVCC visibility
172        let (viewing_epoch, tx_id) = self.get_transaction_context();
173
174        // Create processor with transaction context
175        let processor =
176            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
177
178        // Apply transaction context if in a transaction
179        let processor = if let Some(tx_id) = tx_id {
180            processor.with_tx_context(viewing_epoch, tx_id)
181        } else {
182            processor
183        };
184
185        processor.process(query, QueryLanguage::Gql, Some(&params))
186    }
187
188    /// Executes a GQL query with parameters.
189    ///
190    /// # Errors
191    ///
192    /// Returns an error if no query language is enabled.
193    #[cfg(not(any(feature = "gql", feature = "cypher")))]
194    pub fn execute_with_params(
195        &self,
196        _query: &str,
197        _params: std::collections::HashMap<String, Value>,
198    ) -> Result<QueryResult> {
199        Err(grafeo_common::utils::error::Error::Internal(
200            "No query language enabled".to_string(),
201        ))
202    }
203
204    /// Executes a GQL query.
205    ///
206    /// # Errors
207    ///
208    /// Returns an error if no query language is enabled.
209    #[cfg(not(any(feature = "gql", feature = "cypher")))]
210    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
211        Err(grafeo_common::utils::error::Error::Internal(
212            "No query language enabled".to_string(),
213        ))
214    }
215
216    /// Executes a Cypher query.
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if the query fails to parse or execute.
221    #[cfg(feature = "cypher")]
222    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
223        use crate::query::{
224            Executor, Planner, binder::Binder, cypher_translator, optimizer::Optimizer,
225        };
226
227        // Parse and translate the query to a logical plan
228        let logical_plan = cypher_translator::translate(query)?;
229
230        // Semantic validation
231        let mut binder = Binder::new();
232        let _binding_context = binder.bind(&logical_plan)?;
233
234        // Optimize the plan
235        let optimizer = Optimizer::new();
236        let optimized_plan = optimizer.optimize(logical_plan)?;
237
238        // Get transaction context for MVCC visibility
239        let (viewing_epoch, tx_id) = self.get_transaction_context();
240
241        // Convert to physical plan with transaction context
242        let planner = Planner::with_context(
243            Arc::clone(&self.store),
244            Arc::clone(&self.tx_manager),
245            tx_id,
246            viewing_epoch,
247        )
248        .with_factorized_execution(self.factorized_execution);
249        let mut physical_plan = planner.plan(&optimized_plan)?;
250
251        // Execute the plan
252        let executor = Executor::with_columns(physical_plan.columns.clone());
253        executor.execute(physical_plan.operator.as_mut())
254    }
255
256    /// Executes a Gremlin query.
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if the query fails to parse or execute.
261    ///
262    /// # Examples
263    ///
264    /// ```ignore
265    /// use grafeo_engine::GrafeoDB;
266    ///
267    /// let db = GrafeoDB::new_in_memory();
268    /// let session = db.session();
269    ///
270    /// // Create some nodes first
271    /// session.create_node(&["Person"]);
272    ///
273    /// // Query using Gremlin
274    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
275    /// ```
276    #[cfg(feature = "gremlin")]
277    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
278        use crate::query::{
279            Executor, Planner, binder::Binder, gremlin_translator, optimizer::Optimizer,
280        };
281
282        // Parse and translate the query to a logical plan
283        let logical_plan = gremlin_translator::translate(query)?;
284
285        // Semantic validation
286        let mut binder = Binder::new();
287        let _binding_context = binder.bind(&logical_plan)?;
288
289        // Optimize the plan
290        let optimizer = Optimizer::new();
291        let optimized_plan = optimizer.optimize(logical_plan)?;
292
293        // Get transaction context for MVCC visibility
294        let (viewing_epoch, tx_id) = self.get_transaction_context();
295
296        // Convert to physical plan with transaction context
297        let planner = Planner::with_context(
298            Arc::clone(&self.store),
299            Arc::clone(&self.tx_manager),
300            tx_id,
301            viewing_epoch,
302        )
303        .with_factorized_execution(self.factorized_execution);
304        let mut physical_plan = planner.plan(&optimized_plan)?;
305
306        // Execute the plan
307        let executor = Executor::with_columns(physical_plan.columns.clone());
308        executor.execute(physical_plan.operator.as_mut())
309    }
310
311    /// Executes a Gremlin query with parameters.
312    ///
313    /// # Errors
314    ///
315    /// Returns an error if the query fails to parse or execute.
316    #[cfg(feature = "gremlin")]
317    pub fn execute_gremlin_with_params(
318        &self,
319        query: &str,
320        params: std::collections::HashMap<String, Value>,
321    ) -> Result<QueryResult> {
322        use crate::query::processor::{QueryLanguage, QueryProcessor};
323
324        // Get transaction context for MVCC visibility
325        let (viewing_epoch, tx_id) = self.get_transaction_context();
326
327        // Create processor with transaction context
328        let processor =
329            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
330
331        // Apply transaction context if in a transaction
332        let processor = if let Some(tx_id) = tx_id {
333            processor.with_tx_context(viewing_epoch, tx_id)
334        } else {
335            processor
336        };
337
338        processor.process(query, QueryLanguage::Gremlin, Some(&params))
339    }
340
341    /// Executes a GraphQL query against the LPG store.
342    ///
343    /// # Errors
344    ///
345    /// Returns an error if the query fails to parse or execute.
346    ///
347    /// # Examples
348    ///
349    /// ```ignore
350    /// use grafeo_engine::GrafeoDB;
351    ///
352    /// let db = GrafeoDB::new_in_memory();
353    /// let session = db.session();
354    ///
355    /// // Create some nodes first
356    /// session.create_node(&["User"]);
357    ///
358    /// // Query using GraphQL
359    /// let result = session.execute_graphql("query { user { id name } }")?;
360    /// ```
361    #[cfg(feature = "graphql")]
362    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
363        use crate::query::{
364            Executor, Planner, binder::Binder, graphql_translator, optimizer::Optimizer,
365        };
366
367        // Parse and translate the query to a logical plan
368        let logical_plan = graphql_translator::translate(query)?;
369
370        // Semantic validation
371        let mut binder = Binder::new();
372        let _binding_context = binder.bind(&logical_plan)?;
373
374        // Optimize the plan
375        let optimizer = Optimizer::new();
376        let optimized_plan = optimizer.optimize(logical_plan)?;
377
378        // Get transaction context for MVCC visibility
379        let (viewing_epoch, tx_id) = self.get_transaction_context();
380
381        // Convert to physical plan with transaction context
382        let planner = Planner::with_context(
383            Arc::clone(&self.store),
384            Arc::clone(&self.tx_manager),
385            tx_id,
386            viewing_epoch,
387        )
388        .with_factorized_execution(self.factorized_execution);
389        let mut physical_plan = planner.plan(&optimized_plan)?;
390
391        // Execute the plan
392        let executor = Executor::with_columns(physical_plan.columns.clone());
393        executor.execute(physical_plan.operator.as_mut())
394    }
395
396    /// Executes a GraphQL query with parameters.
397    ///
398    /// # Errors
399    ///
400    /// Returns an error if the query fails to parse or execute.
401    #[cfg(feature = "graphql")]
402    pub fn execute_graphql_with_params(
403        &self,
404        query: &str,
405        params: std::collections::HashMap<String, Value>,
406    ) -> Result<QueryResult> {
407        use crate::query::processor::{QueryLanguage, QueryProcessor};
408
409        // Get transaction context for MVCC visibility
410        let (viewing_epoch, tx_id) = self.get_transaction_context();
411
412        // Create processor with transaction context
413        let processor =
414            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
415
416        // Apply transaction context if in a transaction
417        let processor = if let Some(tx_id) = tx_id {
418            processor.with_tx_context(viewing_epoch, tx_id)
419        } else {
420            processor
421        };
422
423        processor.process(query, QueryLanguage::GraphQL, Some(&params))
424    }
425
426    /// Executes a SPARQL query.
427    ///
428    /// # Errors
429    ///
430    /// Returns an error if the query fails to parse or execute.
431    #[cfg(all(feature = "sparql", feature = "rdf"))]
432    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
433        use crate::query::{
434            Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
435        };
436
437        // Parse and translate the SPARQL query to a logical plan
438        let logical_plan = sparql_translator::translate(query)?;
439
440        // Optimize the plan
441        let optimizer = Optimizer::new();
442        let optimized_plan = optimizer.optimize(logical_plan)?;
443
444        // Convert to physical plan using RDF planner
445        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store)).with_tx_id(self.current_tx);
446        let mut physical_plan = planner.plan(&optimized_plan)?;
447
448        // Execute the plan
449        let executor = Executor::with_columns(physical_plan.columns.clone());
450        executor.execute(physical_plan.operator.as_mut())
451    }
452
453    /// Executes a SPARQL query with parameters.
454    ///
455    /// # Errors
456    ///
457    /// Returns an error if the query fails to parse or execute.
458    #[cfg(all(feature = "sparql", feature = "rdf"))]
459    pub fn execute_sparql_with_params(
460        &self,
461        query: &str,
462        _params: std::collections::HashMap<String, Value>,
463    ) -> Result<QueryResult> {
464        // TODO: Implement parameter substitution for SPARQL
465        // For now, just execute the query without parameters
466        self.execute_sparql(query)
467    }
468
469    /// Begins a new transaction.
470    ///
471    /// # Errors
472    ///
473    /// Returns an error if a transaction is already active.
474    ///
475    /// # Examples
476    ///
477    /// ```ignore
478    /// use grafeo_engine::GrafeoDB;
479    ///
480    /// let db = GrafeoDB::new_in_memory();
481    /// let mut session = db.session();
482    ///
483    /// session.begin_tx()?;
484    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
485    /// session.execute("INSERT (:Person {name: 'Bob'})")?;
486    /// session.commit()?; // Both inserts committed atomically
487    /// ```
488    pub fn begin_tx(&mut self) -> Result<()> {
489        if self.current_tx.is_some() {
490            return Err(grafeo_common::utils::error::Error::Transaction(
491                grafeo_common::utils::error::TransactionError::InvalidState(
492                    "Transaction already active".to_string(),
493                ),
494            ));
495        }
496
497        let tx_id = self.tx_manager.begin();
498        self.current_tx = Some(tx_id);
499        Ok(())
500    }
501
502    /// Commits the current transaction.
503    ///
504    /// Makes all changes since [`begin_tx`](Self::begin_tx) permanent.
505    ///
506    /// # Errors
507    ///
508    /// Returns an error if no transaction is active.
509    pub fn commit(&mut self) -> Result<()> {
510        let tx_id = self.current_tx.take().ok_or_else(|| {
511            grafeo_common::utils::error::Error::Transaction(
512                grafeo_common::utils::error::TransactionError::InvalidState(
513                    "No active transaction".to_string(),
514                ),
515            )
516        })?;
517
518        // Commit RDF store pending operations
519        #[cfg(feature = "rdf")]
520        self.rdf_store.commit_tx(tx_id);
521
522        self.tx_manager.commit(tx_id).map(|_| ())
523    }
524
525    /// Aborts the current transaction.
526    ///
527    /// Discards all changes since [`begin_tx`](Self::begin_tx).
528    ///
529    /// # Errors
530    ///
531    /// Returns an error if no transaction is active.
532    ///
533    /// # Examples
534    ///
535    /// ```ignore
536    /// use grafeo_engine::GrafeoDB;
537    ///
538    /// let db = GrafeoDB::new_in_memory();
539    /// let mut session = db.session();
540    ///
541    /// session.begin_tx()?;
542    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
543    /// session.rollback()?; // Insert is discarded
544    /// ```
545    pub fn rollback(&mut self) -> Result<()> {
546        let tx_id = self.current_tx.take().ok_or_else(|| {
547            grafeo_common::utils::error::Error::Transaction(
548                grafeo_common::utils::error::TransactionError::InvalidState(
549                    "No active transaction".to_string(),
550                ),
551            )
552        })?;
553
554        // Discard uncommitted versions in the LPG store
555        self.store.discard_uncommitted_versions(tx_id);
556
557        // Discard pending operations in the RDF store
558        #[cfg(feature = "rdf")]
559        self.rdf_store.rollback_tx(tx_id);
560
561        // Mark transaction as aborted in the manager
562        self.tx_manager.abort(tx_id)
563    }
564
565    /// Returns whether a transaction is active.
566    #[must_use]
567    pub fn in_transaction(&self) -> bool {
568        self.current_tx.is_some()
569    }
570
571    /// Sets auto-commit mode.
572    pub fn set_auto_commit(&mut self, auto_commit: bool) {
573        self.auto_commit = auto_commit;
574    }
575
576    /// Returns whether auto-commit is enabled.
577    #[must_use]
578    pub fn auto_commit(&self) -> bool {
579        self.auto_commit
580    }
581
582    /// Returns the current transaction context for MVCC visibility.
583    ///
584    /// Returns `(viewing_epoch, tx_id)` where:
585    /// - `viewing_epoch` is the epoch at which to check version visibility
586    /// - `tx_id` is the current transaction ID (if in a transaction)
587    #[must_use]
588    fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
589        if let Some(tx_id) = self.current_tx {
590            // In a transaction - use the transaction's start epoch
591            let epoch = self
592                .tx_manager
593                .start_epoch(tx_id)
594                .unwrap_or_else(|| self.tx_manager.current_epoch());
595            (epoch, Some(tx_id))
596        } else {
597            // No transaction - use current epoch
598            (self.tx_manager.current_epoch(), None)
599        }
600    }
601
602    /// Creates a node directly (bypassing query execution).
603    ///
604    /// This is a low-level API for testing and direct manipulation.
605    /// If a transaction is active, the node will be versioned with the transaction ID.
606    pub fn create_node(&self, labels: &[&str]) -> NodeId {
607        let (epoch, tx_id) = self.get_transaction_context();
608        self.store
609            .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
610    }
611
612    /// Creates a node with properties.
613    ///
614    /// If a transaction is active, the node will be versioned with the transaction ID.
615    pub fn create_node_with_props<'a>(
616        &self,
617        labels: &[&str],
618        properties: impl IntoIterator<Item = (&'a str, Value)>,
619    ) -> NodeId {
620        let (epoch, tx_id) = self.get_transaction_context();
621        self.store.create_node_with_props_versioned(
622            labels,
623            properties.into_iter().map(|(k, v)| (k, v)),
624            epoch,
625            tx_id.unwrap_or(TxId::SYSTEM),
626        )
627    }
628
629    /// Creates an edge between two nodes.
630    ///
631    /// This is a low-level API for testing and direct manipulation.
632    /// If a transaction is active, the edge will be versioned with the transaction ID.
633    pub fn create_edge(
634        &self,
635        src: NodeId,
636        dst: NodeId,
637        edge_type: &str,
638    ) -> grafeo_common::types::EdgeId {
639        let (epoch, tx_id) = self.get_transaction_context();
640        self.store
641            .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
642    }
643}
644
645#[cfg(test)]
646mod tests {
647    use crate::database::GrafeoDB;
648
649    #[test]
650    fn test_session_create_node() {
651        let db = GrafeoDB::new_in_memory();
652        let session = db.session();
653
654        let id = session.create_node(&["Person"]);
655        assert!(id.is_valid());
656        assert_eq!(db.node_count(), 1);
657    }
658
659    #[test]
660    fn test_session_transaction() {
661        let db = GrafeoDB::new_in_memory();
662        let mut session = db.session();
663
664        assert!(!session.in_transaction());
665
666        session.begin_tx().unwrap();
667        assert!(session.in_transaction());
668
669        session.commit().unwrap();
670        assert!(!session.in_transaction());
671    }
672
673    #[test]
674    fn test_session_transaction_context() {
675        let db = GrafeoDB::new_in_memory();
676        let mut session = db.session();
677
678        // Without transaction - context should have current epoch and no tx_id
679        let (_epoch1, tx_id1) = session.get_transaction_context();
680        assert!(tx_id1.is_none());
681
682        // Start a transaction
683        session.begin_tx().unwrap();
684        let (epoch2, tx_id2) = session.get_transaction_context();
685        assert!(tx_id2.is_some());
686        // Transaction should have a valid epoch
687        let _ = epoch2; // Use the variable
688
689        // Commit and verify
690        session.commit().unwrap();
691        let (epoch3, tx_id3) = session.get_transaction_context();
692        assert!(tx_id3.is_none());
693        // Epoch should have advanced after commit
694        assert!(epoch3.as_u64() >= epoch2.as_u64());
695    }
696
697    #[test]
698    fn test_session_rollback() {
699        let db = GrafeoDB::new_in_memory();
700        let mut session = db.session();
701
702        session.begin_tx().unwrap();
703        session.rollback().unwrap();
704        assert!(!session.in_transaction());
705    }
706
707    #[test]
708    fn test_session_rollback_discards_versions() {
709        use grafeo_common::types::TxId;
710
711        let db = GrafeoDB::new_in_memory();
712
713        // Create a node outside of any transaction (at system level)
714        let node_before = db.store().create_node(&["Person"]);
715        assert!(node_before.is_valid());
716        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
717
718        // Start a transaction
719        let mut session = db.session();
720        session.begin_tx().unwrap();
721        let tx_id = session.current_tx.unwrap();
722
723        // Create a node versioned with the transaction's ID
724        let epoch = db.store().current_epoch();
725        let node_in_tx = db.store().create_node_versioned(&["Person"], epoch, tx_id);
726        assert!(node_in_tx.is_valid());
727
728        // Should see 2 nodes at this point
729        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
730
731        // Rollback the transaction
732        session.rollback().unwrap();
733        assert!(!session.in_transaction());
734
735        // The node created in the transaction should be discarded
736        // Only the first node should remain visible
737        let count_after = db.node_count();
738        assert_eq!(
739            count_after, 1,
740            "Rollback should discard uncommitted node, but got {count_after}"
741        );
742
743        // The original node should still be accessible
744        let current_epoch = db.store().current_epoch();
745        assert!(
746            db.store()
747                .get_node_versioned(node_before, current_epoch, TxId::SYSTEM)
748                .is_some(),
749            "Original node should still exist"
750        );
751
752        // The node created in the transaction should not be accessible
753        assert!(
754            db.store()
755                .get_node_versioned(node_in_tx, current_epoch, TxId::SYSTEM)
756                .is_none(),
757            "Transaction node should be gone"
758        );
759    }
760
761    #[test]
762    fn test_session_create_node_in_transaction() {
763        // Test that session.create_node() is transaction-aware
764        let db = GrafeoDB::new_in_memory();
765
766        // Create a node outside of any transaction
767        let node_before = db.create_node(&["Person"]);
768        assert!(node_before.is_valid());
769        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
770
771        // Start a transaction and create a node through the session
772        let mut session = db.session();
773        session.begin_tx().unwrap();
774
775        // Create a node through session.create_node() - should be versioned with tx
776        let node_in_tx = session.create_node(&["Person"]);
777        assert!(node_in_tx.is_valid());
778
779        // Should see 2 nodes at this point
780        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
781
782        // Rollback the transaction
783        session.rollback().unwrap();
784
785        // The node created via session.create_node() should be discarded
786        let count_after = db.node_count();
787        assert_eq!(
788            count_after, 1,
789            "Rollback should discard node created via session.create_node(), but got {count_after}"
790        );
791    }
792
793    #[test]
794    fn test_session_create_node_with_props_in_transaction() {
795        use grafeo_common::types::Value;
796
797        // Test that session.create_node_with_props() is transaction-aware
798        let db = GrafeoDB::new_in_memory();
799
800        // Create a node outside of any transaction
801        db.create_node(&["Person"]);
802        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
803
804        // Start a transaction and create a node with properties
805        let mut session = db.session();
806        session.begin_tx().unwrap();
807
808        let node_in_tx =
809            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
810        assert!(node_in_tx.is_valid());
811
812        // Should see 2 nodes
813        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
814
815        // Rollback the transaction
816        session.rollback().unwrap();
817
818        // The node should be discarded
819        let count_after = db.node_count();
820        assert_eq!(
821            count_after, 1,
822            "Rollback should discard node created via session.create_node_with_props()"
823        );
824    }
825
826    #[cfg(feature = "gql")]
827    mod gql_tests {
828        use super::*;
829
830        #[test]
831        fn test_gql_query_execution() {
832            let db = GrafeoDB::new_in_memory();
833            let session = db.session();
834
835            // Create some test data
836            session.create_node(&["Person"]);
837            session.create_node(&["Person"]);
838            session.create_node(&["Animal"]);
839
840            // Execute a GQL query
841            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
842
843            // Should return 2 Person nodes
844            assert_eq!(result.row_count(), 2);
845            assert_eq!(result.column_count(), 1);
846            assert_eq!(result.columns[0], "n");
847        }
848
849        #[test]
850        fn test_gql_empty_result() {
851            let db = GrafeoDB::new_in_memory();
852            let session = db.session();
853
854            // No data in database
855            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
856
857            assert_eq!(result.row_count(), 0);
858        }
859
860        #[test]
861        fn test_gql_parse_error() {
862            let db = GrafeoDB::new_in_memory();
863            let session = db.session();
864
865            // Invalid GQL syntax
866            let result = session.execute("MATCH (n RETURN n");
867
868            assert!(result.is_err());
869        }
870
871        #[test]
872        fn test_gql_relationship_traversal() {
873            let db = GrafeoDB::new_in_memory();
874            let session = db.session();
875
876            // Create a graph: Alice -> Bob, Alice -> Charlie
877            let alice = session.create_node(&["Person"]);
878            let bob = session.create_node(&["Person"]);
879            let charlie = session.create_node(&["Person"]);
880
881            session.create_edge(alice, bob, "KNOWS");
882            session.create_edge(alice, charlie, "KNOWS");
883
884            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
885            let result = session
886                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
887                .unwrap();
888
889            // Should return 2 rows (Alice->Bob, Alice->Charlie)
890            assert_eq!(result.row_count(), 2);
891            assert_eq!(result.column_count(), 2);
892            assert_eq!(result.columns[0], "a");
893            assert_eq!(result.columns[1], "b");
894        }
895
896        #[test]
897        fn test_gql_relationship_with_type_filter() {
898            let db = GrafeoDB::new_in_memory();
899            let session = db.session();
900
901            // Create a graph: Alice -KNOWS-> Bob, Alice -WORKS_WITH-> Charlie
902            let alice = session.create_node(&["Person"]);
903            let bob = session.create_node(&["Person"]);
904            let charlie = session.create_node(&["Person"]);
905
906            session.create_edge(alice, bob, "KNOWS");
907            session.create_edge(alice, charlie, "WORKS_WITH");
908
909            // Query only KNOWS relationships
910            let result = session
911                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
912                .unwrap();
913
914            // Should return only 1 row (Alice->Bob)
915            assert_eq!(result.row_count(), 1);
916        }
917
918        #[test]
919        fn test_gql_semantic_error_undefined_variable() {
920            let db = GrafeoDB::new_in_memory();
921            let session = db.session();
922
923            // Reference undefined variable 'x' in RETURN
924            let result = session.execute("MATCH (n:Person) RETURN x");
925
926            // Should fail with semantic error
927            assert!(result.is_err());
928            let err = match result {
929                Err(e) => e,
930                Ok(_) => panic!("Expected error"),
931            };
932            assert!(
933                err.to_string().contains("Undefined variable"),
934                "Expected undefined variable error, got: {}",
935                err
936            );
937        }
938
939        #[test]
940        fn test_gql_where_clause_property_filter() {
941            use grafeo_common::types::Value;
942
943            let db = GrafeoDB::new_in_memory();
944            let session = db.session();
945
946            // Create people with ages
947            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
948            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
949            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
950
951            // Query with WHERE clause: age > 30
952            let result = session
953                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
954                .unwrap();
955
956            // Should return 2 people (ages 35 and 45)
957            assert_eq!(result.row_count(), 2);
958        }
959
960        #[test]
961        fn test_gql_where_clause_equality() {
962            use grafeo_common::types::Value;
963
964            let db = GrafeoDB::new_in_memory();
965            let session = db.session();
966
967            // Create people with names
968            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
969            session.create_node_with_props(&["Person"], [("name", Value::String("Bob".into()))]);
970            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
971
972            // Query with WHERE clause: name = "Alice"
973            let result = session
974                .execute("MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n")
975                .unwrap();
976
977            // Should return 2 people named Alice
978            assert_eq!(result.row_count(), 2);
979        }
980
981        #[test]
982        fn test_gql_return_property_access() {
983            use grafeo_common::types::Value;
984
985            let db = GrafeoDB::new_in_memory();
986            let session = db.session();
987
988            // Create people with names and ages
989            session.create_node_with_props(
990                &["Person"],
991                [
992                    ("name", Value::String("Alice".into())),
993                    ("age", Value::Int64(30)),
994                ],
995            );
996            session.create_node_with_props(
997                &["Person"],
998                [
999                    ("name", Value::String("Bob".into())),
1000                    ("age", Value::Int64(25)),
1001                ],
1002            );
1003
1004            // Query returning properties
1005            let result = session
1006                .execute("MATCH (n:Person) RETURN n.name, n.age")
1007                .unwrap();
1008
1009            // Should return 2 rows with name and age columns
1010            assert_eq!(result.row_count(), 2);
1011            assert_eq!(result.column_count(), 2);
1012            assert_eq!(result.columns[0], "n.name");
1013            assert_eq!(result.columns[1], "n.age");
1014
1015            // Check that we get actual values
1016            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
1017            assert!(names.contains(&&Value::String("Alice".into())));
1018            assert!(names.contains(&&Value::String("Bob".into())));
1019        }
1020
1021        #[test]
1022        fn test_gql_return_mixed_expressions() {
1023            use grafeo_common::types::Value;
1024
1025            let db = GrafeoDB::new_in_memory();
1026            let session = db.session();
1027
1028            // Create a person
1029            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1030
1031            // Query returning both node and property
1032            let result = session
1033                .execute("MATCH (n:Person) RETURN n, n.name")
1034                .unwrap();
1035
1036            assert_eq!(result.row_count(), 1);
1037            assert_eq!(result.column_count(), 2);
1038            assert_eq!(result.columns[0], "n");
1039            assert_eq!(result.columns[1], "n.name");
1040
1041            // Second column should be the name
1042            assert_eq!(result.rows[0][1], Value::String("Alice".into()));
1043        }
1044    }
1045
1046    #[cfg(feature = "cypher")]
1047    mod cypher_tests {
1048        use super::*;
1049
1050        #[test]
1051        fn test_cypher_query_execution() {
1052            let db = GrafeoDB::new_in_memory();
1053            let session = db.session();
1054
1055            // Create some test data
1056            session.create_node(&["Person"]);
1057            session.create_node(&["Person"]);
1058            session.create_node(&["Animal"]);
1059
1060            // Execute a Cypher query
1061            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1062
1063            // Should return 2 Person nodes
1064            assert_eq!(result.row_count(), 2);
1065            assert_eq!(result.column_count(), 1);
1066            assert_eq!(result.columns[0], "n");
1067        }
1068
1069        #[test]
1070        fn test_cypher_empty_result() {
1071            let db = GrafeoDB::new_in_memory();
1072            let session = db.session();
1073
1074            // No data in database
1075            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1076
1077            assert_eq!(result.row_count(), 0);
1078        }
1079
1080        #[test]
1081        fn test_cypher_parse_error() {
1082            let db = GrafeoDB::new_in_memory();
1083            let session = db.session();
1084
1085            // Invalid Cypher syntax
1086            let result = session.execute_cypher("MATCH (n RETURN n");
1087
1088            assert!(result.is_err());
1089        }
1090    }
1091}