Skip to main content

grafeo_engine/
session.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7use std::sync::Arc;
8
9use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
10use grafeo_common::utils::error::Result;
11use grafeo_core::graph::Direction;
12use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
13#[cfg(feature = "rdf")]
14use grafeo_core::graph::rdf::RdfStore;
15
16use crate::config::{AdaptiveConfig, GraphModel};
17use crate::database::QueryResult;
18use crate::query::cache::QueryCache;
19use crate::transaction::TransactionManager;
20
21/// Your handle to the database - execute queries and manage transactions.
22///
23/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
24/// tracks its own transaction state, so you can have multiple concurrent
25/// sessions without them interfering.
26pub struct Session {
27    /// The underlying store.
28    store: Arc<LpgStore>,
29    /// RDF triple store (if RDF feature is enabled).
30    #[cfg(feature = "rdf")]
31    #[allow(dead_code)]
32    rdf_store: Arc<RdfStore>,
33    /// Transaction manager.
34    tx_manager: Arc<TransactionManager>,
35    /// Query cache shared across sessions.
36    query_cache: Arc<QueryCache>,
37    /// Current transaction ID (if any).
38    current_tx: Option<TxId>,
39    /// Whether the session is in auto-commit mode.
40    auto_commit: bool,
41    /// Adaptive execution configuration.
42    #[allow(dead_code)]
43    adaptive_config: AdaptiveConfig,
44    /// Whether to use factorized execution for multi-hop queries.
45    factorized_execution: bool,
46    /// The graph data model this session operates on.
47    graph_model: GraphModel,
48}
49
50impl Session {
51    /// Creates a new session.
52    #[allow(dead_code)]
53    pub(crate) fn new(
54        store: Arc<LpgStore>,
55        tx_manager: Arc<TransactionManager>,
56        query_cache: Arc<QueryCache>,
57    ) -> Self {
58        Self {
59            store,
60            #[cfg(feature = "rdf")]
61            rdf_store: Arc::new(RdfStore::new()),
62            tx_manager,
63            query_cache,
64            current_tx: None,
65            auto_commit: true,
66            adaptive_config: AdaptiveConfig::default(),
67            factorized_execution: true,
68            graph_model: GraphModel::Lpg,
69        }
70    }
71
72    /// Creates a new session with adaptive execution configuration.
73    #[allow(dead_code)]
74    pub(crate) fn with_adaptive(
75        store: Arc<LpgStore>,
76        tx_manager: Arc<TransactionManager>,
77        query_cache: Arc<QueryCache>,
78        adaptive_config: AdaptiveConfig,
79        factorized_execution: bool,
80        graph_model: GraphModel,
81    ) -> Self {
82        Self {
83            store,
84            #[cfg(feature = "rdf")]
85            rdf_store: Arc::new(RdfStore::new()),
86            tx_manager,
87            query_cache,
88            current_tx: None,
89            auto_commit: true,
90            adaptive_config,
91            factorized_execution,
92            graph_model,
93        }
94    }
95
96    /// Creates a new session with RDF store and adaptive configuration.
97    #[cfg(feature = "rdf")]
98    pub(crate) fn with_rdf_store_and_adaptive(
99        store: Arc<LpgStore>,
100        rdf_store: Arc<RdfStore>,
101        tx_manager: Arc<TransactionManager>,
102        query_cache: Arc<QueryCache>,
103        adaptive_config: AdaptiveConfig,
104        factorized_execution: bool,
105        graph_model: GraphModel,
106    ) -> Self {
107        Self {
108            store,
109            rdf_store,
110            tx_manager,
111            query_cache,
112            current_tx: None,
113            auto_commit: true,
114            adaptive_config,
115            factorized_execution,
116            graph_model,
117        }
118    }
119
120    /// Returns the graph model this session operates on.
121    #[must_use]
122    pub fn graph_model(&self) -> GraphModel {
123        self.graph_model
124    }
125
126    /// Checks that the session's graph model supports LPG operations.
127    fn require_lpg(&self, language: &str) -> Result<()> {
128        if self.graph_model == GraphModel::Rdf {
129            return Err(grafeo_common::utils::error::Error::Internal(format!(
130                "This is an RDF database. {language} queries require an LPG database."
131            )));
132        }
133        Ok(())
134    }
135
136    /// Executes a GQL query.
137    ///
138    /// # Errors
139    ///
140    /// Returns an error if the query fails to parse or execute.
141    ///
142    /// # Examples
143    ///
144    /// ```ignore
145    /// use grafeo_engine::GrafeoDB;
146    ///
147    /// let db = GrafeoDB::new_in_memory();
148    /// let session = db.session();
149    ///
150    /// // Create a node
151    /// session.execute("INSERT (:Person {name: 'Alice', age: 30})")?;
152    ///
153    /// // Query nodes
154    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
155    /// for row in result {
156    ///     println!("{:?}", row);
157    /// }
158    /// ```
159    #[cfg(feature = "gql")]
160    pub fn execute(&self, query: &str) -> Result<QueryResult> {
161        self.require_lpg("GQL")?;
162
163        use crate::query::{
164            Executor, Planner, binder::Binder, cache::CacheKey, gql_translator,
165            optimizer::Optimizer, processor::QueryLanguage,
166        };
167
168        let start_time = std::time::Instant::now();
169
170        // Create cache key for this query
171        let cache_key = CacheKey::new(query, QueryLanguage::Gql);
172
173        // Try to get cached optimized plan
174        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
175            // Cache hit - skip parsing, translation, binding, and optimization
176            cached_plan
177        } else {
178            // Cache miss - run full pipeline
179
180            // Parse and translate the query to a logical plan
181            let logical_plan = gql_translator::translate(query)?;
182
183            // Semantic validation
184            let mut binder = Binder::new();
185            let _binding_context = binder.bind(&logical_plan)?;
186
187            // Optimize the plan
188            let optimizer = Optimizer::from_store(&self.store);
189            let plan = optimizer.optimize(logical_plan)?;
190
191            // Cache the optimized plan for future use
192            self.query_cache.put_optimized(cache_key, plan.clone());
193
194            plan
195        };
196
197        // Get transaction context for MVCC visibility
198        let (viewing_epoch, tx_id) = self.get_transaction_context();
199
200        // Convert to physical plan with transaction context
201        // (Physical planning cannot be cached as it depends on transaction state)
202        let planner = Planner::with_context(
203            Arc::clone(&self.store),
204            Arc::clone(&self.tx_manager),
205            tx_id,
206            viewing_epoch,
207        )
208        .with_factorized_execution(self.factorized_execution);
209        let mut physical_plan = planner.plan(&optimized_plan)?;
210
211        // Execute the plan
212        let executor = Executor::with_columns(physical_plan.columns.clone());
213        let mut result = executor.execute(physical_plan.operator.as_mut())?;
214
215        // Add execution metrics
216        let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
217        let rows_scanned = result.rows.len() as u64;
218        result.execution_time_ms = Some(elapsed_ms);
219        result.rows_scanned = Some(rows_scanned);
220
221        Ok(result)
222    }
223
224    /// Executes a GQL query with parameters.
225    ///
226    /// # Errors
227    ///
228    /// Returns an error if the query fails to parse or execute.
229    #[cfg(feature = "gql")]
230    pub fn execute_with_params(
231        &self,
232        query: &str,
233        params: std::collections::HashMap<String, Value>,
234    ) -> Result<QueryResult> {
235        self.require_lpg("GQL")?;
236
237        use crate::query::processor::{QueryLanguage, QueryProcessor};
238
239        // Get transaction context for MVCC visibility
240        let (viewing_epoch, tx_id) = self.get_transaction_context();
241
242        // Create processor with transaction context
243        let processor =
244            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
245
246        // Apply transaction context if in a transaction
247        let processor = if let Some(tx_id) = tx_id {
248            processor.with_tx_context(viewing_epoch, tx_id)
249        } else {
250            processor
251        };
252
253        processor.process(query, QueryLanguage::Gql, Some(&params))
254    }
255
256    /// Executes a GQL query with parameters.
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if no query language is enabled.
261    #[cfg(not(any(feature = "gql", feature = "cypher")))]
262    pub fn execute_with_params(
263        &self,
264        _query: &str,
265        _params: std::collections::HashMap<String, Value>,
266    ) -> Result<QueryResult> {
267        Err(grafeo_common::utils::error::Error::Internal(
268            "No query language enabled".to_string(),
269        ))
270    }
271
272    /// Executes a GQL query.
273    ///
274    /// # Errors
275    ///
276    /// Returns an error if no query language is enabled.
277    #[cfg(not(any(feature = "gql", feature = "cypher")))]
278    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
279        Err(grafeo_common::utils::error::Error::Internal(
280            "No query language enabled".to_string(),
281        ))
282    }
283
284    /// Executes a Cypher query.
285    ///
286    /// # Errors
287    ///
288    /// Returns an error if the query fails to parse or execute.
289    #[cfg(feature = "cypher")]
290    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
291        use crate::query::{
292            Executor, Planner, binder::Binder, cache::CacheKey, cypher_translator,
293            optimizer::Optimizer, processor::QueryLanguage,
294        };
295
296        // Create cache key for this query
297        let cache_key = CacheKey::new(query, QueryLanguage::Cypher);
298
299        // Try to get cached optimized plan
300        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
301            cached_plan
302        } else {
303            // Parse and translate the query to a logical plan
304            let logical_plan = cypher_translator::translate(query)?;
305
306            // Semantic validation
307            let mut binder = Binder::new();
308            let _binding_context = binder.bind(&logical_plan)?;
309
310            // Optimize the plan
311            let optimizer = Optimizer::from_store(&self.store);
312            let plan = optimizer.optimize(logical_plan)?;
313
314            // Cache the optimized plan
315            self.query_cache.put_optimized(cache_key, plan.clone());
316
317            plan
318        };
319
320        // Get transaction context for MVCC visibility
321        let (viewing_epoch, tx_id) = self.get_transaction_context();
322
323        // Convert to physical plan with transaction context
324        let planner = Planner::with_context(
325            Arc::clone(&self.store),
326            Arc::clone(&self.tx_manager),
327            tx_id,
328            viewing_epoch,
329        )
330        .with_factorized_execution(self.factorized_execution);
331        let mut physical_plan = planner.plan(&optimized_plan)?;
332
333        // Execute the plan
334        let executor = Executor::with_columns(physical_plan.columns.clone());
335        executor.execute(physical_plan.operator.as_mut())
336    }
337
338    /// Executes a Gremlin query.
339    ///
340    /// # Errors
341    ///
342    /// Returns an error if the query fails to parse or execute.
343    ///
344    /// # Examples
345    ///
346    /// ```ignore
347    /// use grafeo_engine::GrafeoDB;
348    ///
349    /// let db = GrafeoDB::new_in_memory();
350    /// let session = db.session();
351    ///
352    /// // Create some nodes first
353    /// session.create_node(&["Person"]);
354    ///
355    /// // Query using Gremlin
356    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
357    /// ```
358    #[cfg(feature = "gremlin")]
359    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
360        use crate::query::{
361            Executor, Planner, binder::Binder, gremlin_translator, optimizer::Optimizer,
362        };
363
364        // Parse and translate the query to a logical plan
365        let logical_plan = gremlin_translator::translate(query)?;
366
367        // Semantic validation
368        let mut binder = Binder::new();
369        let _binding_context = binder.bind(&logical_plan)?;
370
371        // Optimize the plan
372        let optimizer = Optimizer::from_store(&self.store);
373        let optimized_plan = optimizer.optimize(logical_plan)?;
374
375        // Get transaction context for MVCC visibility
376        let (viewing_epoch, tx_id) = self.get_transaction_context();
377
378        // Convert to physical plan with transaction context
379        let planner = Planner::with_context(
380            Arc::clone(&self.store),
381            Arc::clone(&self.tx_manager),
382            tx_id,
383            viewing_epoch,
384        )
385        .with_factorized_execution(self.factorized_execution);
386        let mut physical_plan = planner.plan(&optimized_plan)?;
387
388        // Execute the plan
389        let executor = Executor::with_columns(physical_plan.columns.clone());
390        executor.execute(physical_plan.operator.as_mut())
391    }
392
393    /// Executes a Gremlin query with parameters.
394    ///
395    /// # Errors
396    ///
397    /// Returns an error if the query fails to parse or execute.
398    #[cfg(feature = "gremlin")]
399    pub fn execute_gremlin_with_params(
400        &self,
401        query: &str,
402        params: std::collections::HashMap<String, Value>,
403    ) -> Result<QueryResult> {
404        use crate::query::processor::{QueryLanguage, QueryProcessor};
405
406        // Get transaction context for MVCC visibility
407        let (viewing_epoch, tx_id) = self.get_transaction_context();
408
409        // Create processor with transaction context
410        let processor =
411            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
412
413        // Apply transaction context if in a transaction
414        let processor = if let Some(tx_id) = tx_id {
415            processor.with_tx_context(viewing_epoch, tx_id)
416        } else {
417            processor
418        };
419
420        processor.process(query, QueryLanguage::Gremlin, Some(&params))
421    }
422
423    /// Executes a GraphQL query against the LPG store.
424    ///
425    /// # Errors
426    ///
427    /// Returns an error if the query fails to parse or execute.
428    ///
429    /// # Examples
430    ///
431    /// ```ignore
432    /// use grafeo_engine::GrafeoDB;
433    ///
434    /// let db = GrafeoDB::new_in_memory();
435    /// let session = db.session();
436    ///
437    /// // Create some nodes first
438    /// session.create_node(&["User"]);
439    ///
440    /// // Query using GraphQL
441    /// let result = session.execute_graphql("query { user { id name } }")?;
442    /// ```
443    #[cfg(feature = "graphql")]
444    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
445        use crate::query::{
446            Executor, Planner, binder::Binder, graphql_translator, optimizer::Optimizer,
447        };
448
449        // Parse and translate the query to a logical plan
450        let logical_plan = graphql_translator::translate(query)?;
451
452        // Semantic validation
453        let mut binder = Binder::new();
454        let _binding_context = binder.bind(&logical_plan)?;
455
456        // Optimize the plan
457        let optimizer = Optimizer::from_store(&self.store);
458        let optimized_plan = optimizer.optimize(logical_plan)?;
459
460        // Get transaction context for MVCC visibility
461        let (viewing_epoch, tx_id) = self.get_transaction_context();
462
463        // Convert to physical plan with transaction context
464        let planner = Planner::with_context(
465            Arc::clone(&self.store),
466            Arc::clone(&self.tx_manager),
467            tx_id,
468            viewing_epoch,
469        )
470        .with_factorized_execution(self.factorized_execution);
471        let mut physical_plan = planner.plan(&optimized_plan)?;
472
473        // Execute the plan
474        let executor = Executor::with_columns(physical_plan.columns.clone());
475        executor.execute(physical_plan.operator.as_mut())
476    }
477
478    /// Executes a GraphQL query with parameters.
479    ///
480    /// # Errors
481    ///
482    /// Returns an error if the query fails to parse or execute.
483    #[cfg(feature = "graphql")]
484    pub fn execute_graphql_with_params(
485        &self,
486        query: &str,
487        params: std::collections::HashMap<String, Value>,
488    ) -> Result<QueryResult> {
489        use crate::query::processor::{QueryLanguage, QueryProcessor};
490
491        // Get transaction context for MVCC visibility
492        let (viewing_epoch, tx_id) = self.get_transaction_context();
493
494        // Create processor with transaction context
495        let processor =
496            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
497
498        // Apply transaction context if in a transaction
499        let processor = if let Some(tx_id) = tx_id {
500            processor.with_tx_context(viewing_epoch, tx_id)
501        } else {
502            processor
503        };
504
505        processor.process(query, QueryLanguage::GraphQL, Some(&params))
506    }
507
508    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
509    ///
510    /// # Errors
511    ///
512    /// Returns an error if the query fails to parse or execute.
513    ///
514    /// # Examples
515    ///
516    /// ```ignore
517    /// use grafeo_engine::GrafeoDB;
518    ///
519    /// let db = GrafeoDB::new_in_memory();
520    /// let session = db.session();
521    ///
522    /// let result = session.execute_sql(
523    ///     "SELECT * FROM GRAPH_TABLE (
524    ///         MATCH (n:Person)
525    ///         COLUMNS (n.name AS name)
526    ///     )"
527    /// )?;
528    /// ```
529    #[cfg(feature = "sql-pgq")]
530    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
531        use crate::query::{
532            Executor, Planner, binder::Binder, cache::CacheKey, optimizer::Optimizer,
533            plan::LogicalOperator, processor::QueryLanguage, sql_pgq_translator,
534        };
535
536        // Parse and translate (always needed to check for DDL)
537        let logical_plan = sql_pgq_translator::translate(query)?;
538
539        // Handle DDL statements directly (they don't go through the query pipeline)
540        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
541            return Ok(QueryResult {
542                columns: vec!["status".into()],
543                column_types: vec![grafeo_common::types::LogicalType::String],
544                rows: vec![vec![Value::from(format!(
545                    "Property graph '{}' created ({} node tables, {} edge tables)",
546                    cpg.name,
547                    cpg.node_tables.len(),
548                    cpg.edge_tables.len()
549                ))]],
550                execution_time_ms: None,
551                rows_scanned: None,
552            });
553        }
554
555        // Create cache key for query plans
556        let cache_key = CacheKey::new(query, QueryLanguage::SqlPgq);
557
558        // Try to get cached optimized plan
559        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
560            cached_plan
561        } else {
562            // Semantic validation
563            let mut binder = Binder::new();
564            let _binding_context = binder.bind(&logical_plan)?;
565
566            // Optimize the plan
567            let optimizer = Optimizer::from_store(&self.store);
568            let plan = optimizer.optimize(logical_plan)?;
569
570            // Cache the optimized plan
571            self.query_cache.put_optimized(cache_key, plan.clone());
572
573            plan
574        };
575
576        // Get transaction context for MVCC visibility
577        let (viewing_epoch, tx_id) = self.get_transaction_context();
578
579        // Convert to physical plan with transaction context
580        let planner = Planner::with_context(
581            Arc::clone(&self.store),
582            Arc::clone(&self.tx_manager),
583            tx_id,
584            viewing_epoch,
585        )
586        .with_factorized_execution(self.factorized_execution);
587        let mut physical_plan = planner.plan(&optimized_plan)?;
588
589        // Execute the plan
590        let executor = Executor::with_columns(physical_plan.columns.clone());
591        executor.execute(physical_plan.operator.as_mut())
592    }
593
594    /// Executes a SQL/PGQ query with parameters.
595    ///
596    /// # Errors
597    ///
598    /// Returns an error if the query fails to parse or execute.
599    #[cfg(feature = "sql-pgq")]
600    pub fn execute_sql_with_params(
601        &self,
602        query: &str,
603        params: std::collections::HashMap<String, Value>,
604    ) -> Result<QueryResult> {
605        use crate::query::processor::{QueryLanguage, QueryProcessor};
606
607        // Get transaction context for MVCC visibility
608        let (viewing_epoch, tx_id) = self.get_transaction_context();
609
610        // Create processor with transaction context
611        let processor =
612            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
613
614        // Apply transaction context if in a transaction
615        let processor = if let Some(tx_id) = tx_id {
616            processor.with_tx_context(viewing_epoch, tx_id)
617        } else {
618            processor
619        };
620
621        processor.process(query, QueryLanguage::SqlPgq, Some(&params))
622    }
623
624    /// Executes a SPARQL query.
625    ///
626    /// # Errors
627    ///
628    /// Returns an error if the query fails to parse or execute.
629    #[cfg(all(feature = "sparql", feature = "rdf"))]
630    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
631        use crate::query::{
632            Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
633        };
634
635        // Parse and translate the SPARQL query to a logical plan
636        let logical_plan = sparql_translator::translate(query)?;
637
638        // Optimize the plan
639        let optimizer = Optimizer::from_store(&self.store);
640        let optimized_plan = optimizer.optimize(logical_plan)?;
641
642        // Convert to physical plan using RDF planner
643        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store)).with_tx_id(self.current_tx);
644        let mut physical_plan = planner.plan(&optimized_plan)?;
645
646        // Execute the plan
647        let executor = Executor::with_columns(physical_plan.columns.clone());
648        executor.execute(physical_plan.operator.as_mut())
649    }
650
651    /// Executes a SPARQL query with parameters.
652    ///
653    /// # Errors
654    ///
655    /// Returns an error if the query fails to parse or execute.
656    #[cfg(all(feature = "sparql", feature = "rdf"))]
657    pub fn execute_sparql_with_params(
658        &self,
659        query: &str,
660        _params: std::collections::HashMap<String, Value>,
661    ) -> Result<QueryResult> {
662        // TODO: Implement parameter substitution for SPARQL
663        // For now, just execute the query without parameters
664        self.execute_sparql(query)
665    }
666
667    /// Begins a new transaction.
668    ///
669    /// # Errors
670    ///
671    /// Returns an error if a transaction is already active.
672    ///
673    /// # Examples
674    ///
675    /// ```ignore
676    /// use grafeo_engine::GrafeoDB;
677    ///
678    /// let db = GrafeoDB::new_in_memory();
679    /// let mut session = db.session();
680    ///
681    /// session.begin_tx()?;
682    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
683    /// session.execute("INSERT (:Person {name: 'Bob'})")?;
684    /// session.commit()?; // Both inserts committed atomically
685    /// ```
686    pub fn begin_tx(&mut self) -> Result<()> {
687        if self.current_tx.is_some() {
688            return Err(grafeo_common::utils::error::Error::Transaction(
689                grafeo_common::utils::error::TransactionError::InvalidState(
690                    "Transaction already active".to_string(),
691                ),
692            ));
693        }
694
695        let tx_id = self.tx_manager.begin();
696        self.current_tx = Some(tx_id);
697        Ok(())
698    }
699
700    /// Begins a transaction with a specific isolation level.
701    ///
702    /// See [`begin_tx`](Self::begin_tx) for the default (`SnapshotIsolation`).
703    ///
704    /// # Errors
705    ///
706    /// Returns an error if a transaction is already active.
707    pub fn begin_tx_with_isolation(
708        &mut self,
709        isolation_level: crate::transaction::IsolationLevel,
710    ) -> Result<()> {
711        if self.current_tx.is_some() {
712            return Err(grafeo_common::utils::error::Error::Transaction(
713                grafeo_common::utils::error::TransactionError::InvalidState(
714                    "Transaction already active".to_string(),
715                ),
716            ));
717        }
718
719        let tx_id = self.tx_manager.begin_with_isolation(isolation_level);
720        self.current_tx = Some(tx_id);
721        Ok(())
722    }
723
724    /// Commits the current transaction.
725    ///
726    /// Makes all changes since [`begin_tx`](Self::begin_tx) permanent.
727    ///
728    /// # Errors
729    ///
730    /// Returns an error if no transaction is active.
731    pub fn commit(&mut self) -> Result<()> {
732        let tx_id = self.current_tx.take().ok_or_else(|| {
733            grafeo_common::utils::error::Error::Transaction(
734                grafeo_common::utils::error::TransactionError::InvalidState(
735                    "No active transaction".to_string(),
736                ),
737            )
738        })?;
739
740        // Commit RDF store pending operations
741        #[cfg(feature = "rdf")]
742        self.rdf_store.commit_tx(tx_id);
743
744        self.tx_manager.commit(tx_id).map(|_| ())
745    }
746
747    /// Aborts the current transaction.
748    ///
749    /// Discards all changes since [`begin_tx`](Self::begin_tx).
750    ///
751    /// # Errors
752    ///
753    /// Returns an error if no transaction is active.
754    ///
755    /// # Examples
756    ///
757    /// ```ignore
758    /// use grafeo_engine::GrafeoDB;
759    ///
760    /// let db = GrafeoDB::new_in_memory();
761    /// let mut session = db.session();
762    ///
763    /// session.begin_tx()?;
764    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
765    /// session.rollback()?; // Insert is discarded
766    /// ```
767    pub fn rollback(&mut self) -> Result<()> {
768        let tx_id = self.current_tx.take().ok_or_else(|| {
769            grafeo_common::utils::error::Error::Transaction(
770                grafeo_common::utils::error::TransactionError::InvalidState(
771                    "No active transaction".to_string(),
772                ),
773            )
774        })?;
775
776        // Discard uncommitted versions in the LPG store
777        self.store.discard_uncommitted_versions(tx_id);
778
779        // Discard pending operations in the RDF store
780        #[cfg(feature = "rdf")]
781        self.rdf_store.rollback_tx(tx_id);
782
783        // Mark transaction as aborted in the manager
784        self.tx_manager.abort(tx_id)
785    }
786
787    /// Returns whether a transaction is active.
788    #[must_use]
789    pub fn in_transaction(&self) -> bool {
790        self.current_tx.is_some()
791    }
792
793    /// Sets auto-commit mode.
794    pub fn set_auto_commit(&mut self, auto_commit: bool) {
795        self.auto_commit = auto_commit;
796    }
797
798    /// Returns whether auto-commit is enabled.
799    #[must_use]
800    pub fn auto_commit(&self) -> bool {
801        self.auto_commit
802    }
803
804    /// Returns the current transaction context for MVCC visibility.
805    ///
806    /// Returns `(viewing_epoch, tx_id)` where:
807    /// - `viewing_epoch` is the epoch at which to check version visibility
808    /// - `tx_id` is the current transaction ID (if in a transaction)
809    #[must_use]
810    fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
811        if let Some(tx_id) = self.current_tx {
812            // In a transaction - use the transaction's start epoch
813            let epoch = self
814                .tx_manager
815                .start_epoch(tx_id)
816                .unwrap_or_else(|| self.tx_manager.current_epoch());
817            (epoch, Some(tx_id))
818        } else {
819            // No transaction - use current epoch
820            (self.tx_manager.current_epoch(), None)
821        }
822    }
823
824    /// Creates a node directly (bypassing query execution).
825    ///
826    /// This is a low-level API for testing and direct manipulation.
827    /// If a transaction is active, the node will be versioned with the transaction ID.
828    pub fn create_node(&self, labels: &[&str]) -> NodeId {
829        let (epoch, tx_id) = self.get_transaction_context();
830        self.store
831            .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
832    }
833
834    /// Creates a node with properties.
835    ///
836    /// If a transaction is active, the node will be versioned with the transaction ID.
837    pub fn create_node_with_props<'a>(
838        &self,
839        labels: &[&str],
840        properties: impl IntoIterator<Item = (&'a str, Value)>,
841    ) -> NodeId {
842        let (epoch, tx_id) = self.get_transaction_context();
843        self.store.create_node_with_props_versioned(
844            labels,
845            properties.into_iter().map(|(k, v)| (k, v)),
846            epoch,
847            tx_id.unwrap_or(TxId::SYSTEM),
848        )
849    }
850
851    /// Creates an edge between two nodes.
852    ///
853    /// This is a low-level API for testing and direct manipulation.
854    /// If a transaction is active, the edge will be versioned with the transaction ID.
855    pub fn create_edge(
856        &self,
857        src: NodeId,
858        dst: NodeId,
859        edge_type: &str,
860    ) -> grafeo_common::types::EdgeId {
861        let (epoch, tx_id) = self.get_transaction_context();
862        self.store
863            .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
864    }
865
866    // =========================================================================
867    // Direct Lookup APIs (bypass query planning for O(1) point reads)
868    // =========================================================================
869
870    /// Gets a node by ID directly, bypassing query planning.
871    ///
872    /// This is the fastest way to retrieve a single node when you know its ID.
873    /// Skips parsing, binding, optimization, and physical planning entirely.
874    ///
875    /// # Performance
876    ///
877    /// - Time complexity: O(1) average case
878    /// - No lock contention (uses DashMap internally)
879    /// - ~20-30x faster than equivalent MATCH query
880    ///
881    /// # Example
882    ///
883    /// ```ignore
884    /// let session = db.session();
885    /// let node_id = session.create_node(&["Person"]);
886    ///
887    /// // Direct lookup - O(1), no query planning
888    /// let node = session.get_node(node_id);
889    /// assert!(node.is_some());
890    /// ```
891    #[must_use]
892    pub fn get_node(&self, id: NodeId) -> Option<Node> {
893        let (epoch, tx_id) = self.get_transaction_context();
894        self.store
895            .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
896    }
897
898    /// Gets a single property from a node by ID, bypassing query planning.
899    ///
900    /// More efficient than `get_node()` when you only need one property,
901    /// as it avoids loading the full node with all properties.
902    ///
903    /// # Performance
904    ///
905    /// - Time complexity: O(1) average case
906    /// - No query planning overhead
907    ///
908    /// # Example
909    ///
910    /// ```ignore
911    /// let session = db.session();
912    /// let id = session.create_node_with_props(&["Person"], [("name", "Alice".into())]);
913    ///
914    /// // Direct property access - O(1)
915    /// let name = session.get_node_property(id, "name");
916    /// assert_eq!(name, Some(Value::String("Alice".into())));
917    /// ```
918    #[must_use]
919    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
920        self.get_node(id)
921            .and_then(|node| node.get_property(key).cloned())
922    }
923
924    /// Gets an edge by ID directly, bypassing query planning.
925    ///
926    /// # Performance
927    ///
928    /// - Time complexity: O(1) average case
929    /// - No lock contention
930    #[must_use]
931    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
932        let (epoch, tx_id) = self.get_transaction_context();
933        self.store
934            .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
935    }
936
937    /// Gets outgoing neighbors of a node directly, bypassing query planning.
938    ///
939    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
940    ///
941    /// # Performance
942    ///
943    /// - Time complexity: O(degree) where degree is the number of outgoing edges
944    /// - Uses adjacency index for direct access
945    /// - ~10-20x faster than equivalent MATCH query
946    ///
947    /// # Example
948    ///
949    /// ```ignore
950    /// let session = db.session();
951    /// let alice = session.create_node(&["Person"]);
952    /// let bob = session.create_node(&["Person"]);
953    /// session.create_edge(alice, bob, "KNOWS");
954    ///
955    /// // Direct neighbor lookup - O(degree)
956    /// let neighbors = session.get_neighbors_outgoing(alice);
957    /// assert_eq!(neighbors.len(), 1);
958    /// assert_eq!(neighbors[0].0, bob);
959    /// ```
960    #[must_use]
961    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
962        self.store.edges_from(node, Direction::Outgoing).collect()
963    }
964
965    /// Gets incoming neighbors of a node directly, bypassing query planning.
966    ///
967    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
968    ///
969    /// # Performance
970    ///
971    /// - Time complexity: O(degree) where degree is the number of incoming edges
972    /// - Uses backward adjacency index for direct access
973    #[must_use]
974    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
975        self.store.edges_from(node, Direction::Incoming).collect()
976    }
977
978    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
979    ///
980    /// # Example
981    ///
982    /// ```ignore
983    /// let neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
984    /// ```
985    #[must_use]
986    pub fn get_neighbors_outgoing_by_type(
987        &self,
988        node: NodeId,
989        edge_type: &str,
990    ) -> Vec<(NodeId, EdgeId)> {
991        self.store
992            .edges_from(node, Direction::Outgoing)
993            .filter(|(_, edge_id)| {
994                self.get_edge(*edge_id)
995                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
996            })
997            .collect()
998    }
999
1000    /// Checks if a node exists, bypassing query planning.
1001    ///
1002    /// # Performance
1003    ///
1004    /// - Time complexity: O(1)
1005    /// - Fastest existence check available
1006    #[must_use]
1007    pub fn node_exists(&self, id: NodeId) -> bool {
1008        self.get_node(id).is_some()
1009    }
1010
1011    /// Checks if an edge exists, bypassing query planning.
1012    #[must_use]
1013    pub fn edge_exists(&self, id: EdgeId) -> bool {
1014        self.get_edge(id).is_some()
1015    }
1016
1017    /// Gets the degree (number of edges) of a node.
1018    ///
1019    /// Returns (outgoing_degree, incoming_degree).
1020    #[must_use]
1021    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
1022        let out = self.store.out_degree(node);
1023        let in_degree = self.store.in_degree(node);
1024        (out, in_degree)
1025    }
1026
1027    /// Batch lookup of multiple nodes by ID.
1028    ///
1029    /// More efficient than calling `get_node()` in a loop because it
1030    /// amortizes overhead.
1031    ///
1032    /// # Performance
1033    ///
1034    /// - Time complexity: O(n) where n is the number of IDs
1035    /// - Better cache utilization than individual lookups
1036    #[must_use]
1037    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
1038        let (epoch, tx_id) = self.get_transaction_context();
1039        let tx = tx_id.unwrap_or(TxId::SYSTEM);
1040        ids.iter()
1041            .map(|&id| self.store.get_node_versioned(id, epoch, tx))
1042            .collect()
1043    }
1044}
1045
1046#[cfg(test)]
1047mod tests {
1048    use crate::database::GrafeoDB;
1049
1050    #[test]
1051    fn test_session_create_node() {
1052        let db = GrafeoDB::new_in_memory();
1053        let session = db.session();
1054
1055        let id = session.create_node(&["Person"]);
1056        assert!(id.is_valid());
1057        assert_eq!(db.node_count(), 1);
1058    }
1059
1060    #[test]
1061    fn test_session_transaction() {
1062        let db = GrafeoDB::new_in_memory();
1063        let mut session = db.session();
1064
1065        assert!(!session.in_transaction());
1066
1067        session.begin_tx().unwrap();
1068        assert!(session.in_transaction());
1069
1070        session.commit().unwrap();
1071        assert!(!session.in_transaction());
1072    }
1073
1074    #[test]
1075    fn test_session_transaction_context() {
1076        let db = GrafeoDB::new_in_memory();
1077        let mut session = db.session();
1078
1079        // Without transaction - context should have current epoch and no tx_id
1080        let (_epoch1, tx_id1) = session.get_transaction_context();
1081        assert!(tx_id1.is_none());
1082
1083        // Start a transaction
1084        session.begin_tx().unwrap();
1085        let (epoch2, tx_id2) = session.get_transaction_context();
1086        assert!(tx_id2.is_some());
1087        // Transaction should have a valid epoch
1088        let _ = epoch2; // Use the variable
1089
1090        // Commit and verify
1091        session.commit().unwrap();
1092        let (epoch3, tx_id3) = session.get_transaction_context();
1093        assert!(tx_id3.is_none());
1094        // Epoch should have advanced after commit
1095        assert!(epoch3.as_u64() >= epoch2.as_u64());
1096    }
1097
1098    #[test]
1099    fn test_session_rollback() {
1100        let db = GrafeoDB::new_in_memory();
1101        let mut session = db.session();
1102
1103        session.begin_tx().unwrap();
1104        session.rollback().unwrap();
1105        assert!(!session.in_transaction());
1106    }
1107
1108    #[test]
1109    fn test_session_rollback_discards_versions() {
1110        use grafeo_common::types::TxId;
1111
1112        let db = GrafeoDB::new_in_memory();
1113
1114        // Create a node outside of any transaction (at system level)
1115        let node_before = db.store().create_node(&["Person"]);
1116        assert!(node_before.is_valid());
1117        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1118
1119        // Start a transaction
1120        let mut session = db.session();
1121        session.begin_tx().unwrap();
1122        let tx_id = session.current_tx.unwrap();
1123
1124        // Create a node versioned with the transaction's ID
1125        let epoch = db.store().current_epoch();
1126        let node_in_tx = db.store().create_node_versioned(&["Person"], epoch, tx_id);
1127        assert!(node_in_tx.is_valid());
1128
1129        // Should see 2 nodes at this point
1130        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1131
1132        // Rollback the transaction
1133        session.rollback().unwrap();
1134        assert!(!session.in_transaction());
1135
1136        // The node created in the transaction should be discarded
1137        // Only the first node should remain visible
1138        let count_after = db.node_count();
1139        assert_eq!(
1140            count_after, 1,
1141            "Rollback should discard uncommitted node, but got {count_after}"
1142        );
1143
1144        // The original node should still be accessible
1145        let current_epoch = db.store().current_epoch();
1146        assert!(
1147            db.store()
1148                .get_node_versioned(node_before, current_epoch, TxId::SYSTEM)
1149                .is_some(),
1150            "Original node should still exist"
1151        );
1152
1153        // The node created in the transaction should not be accessible
1154        assert!(
1155            db.store()
1156                .get_node_versioned(node_in_tx, current_epoch, TxId::SYSTEM)
1157                .is_none(),
1158            "Transaction node should be gone"
1159        );
1160    }
1161
1162    #[test]
1163    fn test_session_create_node_in_transaction() {
1164        // Test that session.create_node() is transaction-aware
1165        let db = GrafeoDB::new_in_memory();
1166
1167        // Create a node outside of any transaction
1168        let node_before = db.create_node(&["Person"]);
1169        assert!(node_before.is_valid());
1170        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1171
1172        // Start a transaction and create a node through the session
1173        let mut session = db.session();
1174        session.begin_tx().unwrap();
1175
1176        // Create a node through session.create_node() - should be versioned with tx
1177        let node_in_tx = session.create_node(&["Person"]);
1178        assert!(node_in_tx.is_valid());
1179
1180        // Should see 2 nodes at this point
1181        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1182
1183        // Rollback the transaction
1184        session.rollback().unwrap();
1185
1186        // The node created via session.create_node() should be discarded
1187        let count_after = db.node_count();
1188        assert_eq!(
1189            count_after, 1,
1190            "Rollback should discard node created via session.create_node(), but got {count_after}"
1191        );
1192    }
1193
1194    #[test]
1195    fn test_session_create_node_with_props_in_transaction() {
1196        use grafeo_common::types::Value;
1197
1198        // Test that session.create_node_with_props() is transaction-aware
1199        let db = GrafeoDB::new_in_memory();
1200
1201        // Create a node outside of any transaction
1202        db.create_node(&["Person"]);
1203        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1204
1205        // Start a transaction and create a node with properties
1206        let mut session = db.session();
1207        session.begin_tx().unwrap();
1208
1209        let node_in_tx =
1210            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1211        assert!(node_in_tx.is_valid());
1212
1213        // Should see 2 nodes
1214        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1215
1216        // Rollback the transaction
1217        session.rollback().unwrap();
1218
1219        // The node should be discarded
1220        let count_after = db.node_count();
1221        assert_eq!(
1222            count_after, 1,
1223            "Rollback should discard node created via session.create_node_with_props()"
1224        );
1225    }
1226
1227    #[cfg(feature = "gql")]
1228    mod gql_tests {
1229        use super::*;
1230
1231        #[test]
1232        fn test_gql_query_execution() {
1233            let db = GrafeoDB::new_in_memory();
1234            let session = db.session();
1235
1236            // Create some test data
1237            session.create_node(&["Person"]);
1238            session.create_node(&["Person"]);
1239            session.create_node(&["Animal"]);
1240
1241            // Execute a GQL query
1242            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1243
1244            // Should return 2 Person nodes
1245            assert_eq!(result.row_count(), 2);
1246            assert_eq!(result.column_count(), 1);
1247            assert_eq!(result.columns[0], "n");
1248        }
1249
1250        #[test]
1251        fn test_gql_empty_result() {
1252            let db = GrafeoDB::new_in_memory();
1253            let session = db.session();
1254
1255            // No data in database
1256            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1257
1258            assert_eq!(result.row_count(), 0);
1259        }
1260
1261        #[test]
1262        fn test_gql_parse_error() {
1263            let db = GrafeoDB::new_in_memory();
1264            let session = db.session();
1265
1266            // Invalid GQL syntax
1267            let result = session.execute("MATCH (n RETURN n");
1268
1269            assert!(result.is_err());
1270        }
1271
1272        #[test]
1273        fn test_gql_relationship_traversal() {
1274            let db = GrafeoDB::new_in_memory();
1275            let session = db.session();
1276
1277            // Create a graph: Alice -> Bob, Alice -> Charlie
1278            let alice = session.create_node(&["Person"]);
1279            let bob = session.create_node(&["Person"]);
1280            let charlie = session.create_node(&["Person"]);
1281
1282            session.create_edge(alice, bob, "KNOWS");
1283            session.create_edge(alice, charlie, "KNOWS");
1284
1285            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
1286            let result = session
1287                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1288                .unwrap();
1289
1290            // Should return 2 rows (Alice->Bob, Alice->Charlie)
1291            assert_eq!(result.row_count(), 2);
1292            assert_eq!(result.column_count(), 2);
1293            assert_eq!(result.columns[0], "a");
1294            assert_eq!(result.columns[1], "b");
1295        }
1296
1297        #[test]
1298        fn test_gql_relationship_with_type_filter() {
1299            let db = GrafeoDB::new_in_memory();
1300            let session = db.session();
1301
1302            // Create a graph: Alice -KNOWS-> Bob, Alice -WORKS_WITH-> Charlie
1303            let alice = session.create_node(&["Person"]);
1304            let bob = session.create_node(&["Person"]);
1305            let charlie = session.create_node(&["Person"]);
1306
1307            session.create_edge(alice, bob, "KNOWS");
1308            session.create_edge(alice, charlie, "WORKS_WITH");
1309
1310            // Query only KNOWS relationships
1311            let result = session
1312                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1313                .unwrap();
1314
1315            // Should return only 1 row (Alice->Bob)
1316            assert_eq!(result.row_count(), 1);
1317        }
1318
1319        #[test]
1320        fn test_gql_semantic_error_undefined_variable() {
1321            let db = GrafeoDB::new_in_memory();
1322            let session = db.session();
1323
1324            // Reference undefined variable 'x' in RETURN
1325            let result = session.execute("MATCH (n:Person) RETURN x");
1326
1327            // Should fail with semantic error
1328            assert!(result.is_err());
1329            let Err(err) = result else {
1330                panic!("Expected error")
1331            };
1332            assert!(
1333                err.to_string().contains("Undefined variable"),
1334                "Expected undefined variable error, got: {}",
1335                err
1336            );
1337        }
1338
1339        #[test]
1340        fn test_gql_where_clause_property_filter() {
1341            use grafeo_common::types::Value;
1342
1343            let db = GrafeoDB::new_in_memory();
1344            let session = db.session();
1345
1346            // Create people with ages
1347            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
1348            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
1349            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
1350
1351            // Query with WHERE clause: age > 30
1352            let result = session
1353                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
1354                .unwrap();
1355
1356            // Should return 2 people (ages 35 and 45)
1357            assert_eq!(result.row_count(), 2);
1358        }
1359
1360        #[test]
1361        fn test_gql_where_clause_equality() {
1362            use grafeo_common::types::Value;
1363
1364            let db = GrafeoDB::new_in_memory();
1365            let session = db.session();
1366
1367            // Create people with names
1368            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1369            session.create_node_with_props(&["Person"], [("name", Value::String("Bob".into()))]);
1370            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1371
1372            // Query with WHERE clause: name = "Alice"
1373            let result = session
1374                .execute("MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n")
1375                .unwrap();
1376
1377            // Should return 2 people named Alice
1378            assert_eq!(result.row_count(), 2);
1379        }
1380
1381        #[test]
1382        fn test_gql_return_property_access() {
1383            use grafeo_common::types::Value;
1384
1385            let db = GrafeoDB::new_in_memory();
1386            let session = db.session();
1387
1388            // Create people with names and ages
1389            session.create_node_with_props(
1390                &["Person"],
1391                [
1392                    ("name", Value::String("Alice".into())),
1393                    ("age", Value::Int64(30)),
1394                ],
1395            );
1396            session.create_node_with_props(
1397                &["Person"],
1398                [
1399                    ("name", Value::String("Bob".into())),
1400                    ("age", Value::Int64(25)),
1401                ],
1402            );
1403
1404            // Query returning properties
1405            let result = session
1406                .execute("MATCH (n:Person) RETURN n.name, n.age")
1407                .unwrap();
1408
1409            // Should return 2 rows with name and age columns
1410            assert_eq!(result.row_count(), 2);
1411            assert_eq!(result.column_count(), 2);
1412            assert_eq!(result.columns[0], "n.name");
1413            assert_eq!(result.columns[1], "n.age");
1414
1415            // Check that we get actual values
1416            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
1417            assert!(names.contains(&&Value::String("Alice".into())));
1418            assert!(names.contains(&&Value::String("Bob".into())));
1419        }
1420
1421        #[test]
1422        fn test_gql_return_mixed_expressions() {
1423            use grafeo_common::types::Value;
1424
1425            let db = GrafeoDB::new_in_memory();
1426            let session = db.session();
1427
1428            // Create a person
1429            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1430
1431            // Query returning both node and property
1432            let result = session
1433                .execute("MATCH (n:Person) RETURN n, n.name")
1434                .unwrap();
1435
1436            assert_eq!(result.row_count(), 1);
1437            assert_eq!(result.column_count(), 2);
1438            assert_eq!(result.columns[0], "n");
1439            assert_eq!(result.columns[1], "n.name");
1440
1441            // Second column should be the name
1442            assert_eq!(result.rows[0][1], Value::String("Alice".into()));
1443        }
1444    }
1445
1446    #[cfg(feature = "cypher")]
1447    mod cypher_tests {
1448        use super::*;
1449
1450        #[test]
1451        fn test_cypher_query_execution() {
1452            let db = GrafeoDB::new_in_memory();
1453            let session = db.session();
1454
1455            // Create some test data
1456            session.create_node(&["Person"]);
1457            session.create_node(&["Person"]);
1458            session.create_node(&["Animal"]);
1459
1460            // Execute a Cypher query
1461            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1462
1463            // Should return 2 Person nodes
1464            assert_eq!(result.row_count(), 2);
1465            assert_eq!(result.column_count(), 1);
1466            assert_eq!(result.columns[0], "n");
1467        }
1468
1469        #[test]
1470        fn test_cypher_empty_result() {
1471            let db = GrafeoDB::new_in_memory();
1472            let session = db.session();
1473
1474            // No data in database
1475            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1476
1477            assert_eq!(result.row_count(), 0);
1478        }
1479
1480        #[test]
1481        fn test_cypher_parse_error() {
1482            let db = GrafeoDB::new_in_memory();
1483            let session = db.session();
1484
1485            // Invalid Cypher syntax
1486            let result = session.execute_cypher("MATCH (n RETURN n");
1487
1488            assert!(result.is_err());
1489        }
1490    }
1491
1492    // ==================== Direct Lookup API Tests ====================
1493
1494    mod direct_lookup_tests {
1495        use super::*;
1496        use grafeo_common::types::Value;
1497
1498        #[test]
1499        fn test_get_node() {
1500            let db = GrafeoDB::new_in_memory();
1501            let session = db.session();
1502
1503            let id = session.create_node(&["Person"]);
1504            let node = session.get_node(id);
1505
1506            assert!(node.is_some());
1507            let node = node.unwrap();
1508            assert_eq!(node.id, id);
1509        }
1510
1511        #[test]
1512        fn test_get_node_not_found() {
1513            use grafeo_common::types::NodeId;
1514
1515            let db = GrafeoDB::new_in_memory();
1516            let session = db.session();
1517
1518            // Try to get a non-existent node
1519            let node = session.get_node(NodeId::new(9999));
1520            assert!(node.is_none());
1521        }
1522
1523        #[test]
1524        fn test_get_node_property() {
1525            let db = GrafeoDB::new_in_memory();
1526            let session = db.session();
1527
1528            let id = session
1529                .create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1530
1531            let name = session.get_node_property(id, "name");
1532            assert_eq!(name, Some(Value::String("Alice".into())));
1533
1534            // Non-existent property
1535            let missing = session.get_node_property(id, "missing");
1536            assert!(missing.is_none());
1537        }
1538
1539        #[test]
1540        fn test_get_edge() {
1541            let db = GrafeoDB::new_in_memory();
1542            let session = db.session();
1543
1544            let alice = session.create_node(&["Person"]);
1545            let bob = session.create_node(&["Person"]);
1546            let edge_id = session.create_edge(alice, bob, "KNOWS");
1547
1548            let edge = session.get_edge(edge_id);
1549            assert!(edge.is_some());
1550            let edge = edge.unwrap();
1551            assert_eq!(edge.id, edge_id);
1552            assert_eq!(edge.src, alice);
1553            assert_eq!(edge.dst, bob);
1554        }
1555
1556        #[test]
1557        fn test_get_edge_not_found() {
1558            use grafeo_common::types::EdgeId;
1559
1560            let db = GrafeoDB::new_in_memory();
1561            let session = db.session();
1562
1563            let edge = session.get_edge(EdgeId::new(9999));
1564            assert!(edge.is_none());
1565        }
1566
1567        #[test]
1568        fn test_get_neighbors_outgoing() {
1569            let db = GrafeoDB::new_in_memory();
1570            let session = db.session();
1571
1572            let alice = session.create_node(&["Person"]);
1573            let bob = session.create_node(&["Person"]);
1574            let carol = session.create_node(&["Person"]);
1575
1576            session.create_edge(alice, bob, "KNOWS");
1577            session.create_edge(alice, carol, "KNOWS");
1578
1579            let neighbors = session.get_neighbors_outgoing(alice);
1580            assert_eq!(neighbors.len(), 2);
1581
1582            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1583            assert!(neighbor_ids.contains(&bob));
1584            assert!(neighbor_ids.contains(&carol));
1585        }
1586
1587        #[test]
1588        fn test_get_neighbors_incoming() {
1589            let db = GrafeoDB::new_in_memory();
1590            let session = db.session();
1591
1592            let alice = session.create_node(&["Person"]);
1593            let bob = session.create_node(&["Person"]);
1594            let carol = session.create_node(&["Person"]);
1595
1596            session.create_edge(bob, alice, "KNOWS");
1597            session.create_edge(carol, alice, "KNOWS");
1598
1599            let neighbors = session.get_neighbors_incoming(alice);
1600            assert_eq!(neighbors.len(), 2);
1601
1602            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1603            assert!(neighbor_ids.contains(&bob));
1604            assert!(neighbor_ids.contains(&carol));
1605        }
1606
1607        #[test]
1608        fn test_get_neighbors_outgoing_by_type() {
1609            let db = GrafeoDB::new_in_memory();
1610            let session = db.session();
1611
1612            let alice = session.create_node(&["Person"]);
1613            let bob = session.create_node(&["Person"]);
1614            let company = session.create_node(&["Company"]);
1615
1616            session.create_edge(alice, bob, "KNOWS");
1617            session.create_edge(alice, company, "WORKS_AT");
1618
1619            let knows_neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1620            assert_eq!(knows_neighbors.len(), 1);
1621            assert_eq!(knows_neighbors[0].0, bob);
1622
1623            let works_neighbors = session.get_neighbors_outgoing_by_type(alice, "WORKS_AT");
1624            assert_eq!(works_neighbors.len(), 1);
1625            assert_eq!(works_neighbors[0].0, company);
1626
1627            // No edges of this type
1628            let no_neighbors = session.get_neighbors_outgoing_by_type(alice, "LIKES");
1629            assert!(no_neighbors.is_empty());
1630        }
1631
1632        #[test]
1633        fn test_node_exists() {
1634            use grafeo_common::types::NodeId;
1635
1636            let db = GrafeoDB::new_in_memory();
1637            let session = db.session();
1638
1639            let id = session.create_node(&["Person"]);
1640
1641            assert!(session.node_exists(id));
1642            assert!(!session.node_exists(NodeId::new(9999)));
1643        }
1644
1645        #[test]
1646        fn test_edge_exists() {
1647            use grafeo_common::types::EdgeId;
1648
1649            let db = GrafeoDB::new_in_memory();
1650            let session = db.session();
1651
1652            let alice = session.create_node(&["Person"]);
1653            let bob = session.create_node(&["Person"]);
1654            let edge_id = session.create_edge(alice, bob, "KNOWS");
1655
1656            assert!(session.edge_exists(edge_id));
1657            assert!(!session.edge_exists(EdgeId::new(9999)));
1658        }
1659
1660        #[test]
1661        fn test_get_degree() {
1662            let db = GrafeoDB::new_in_memory();
1663            let session = db.session();
1664
1665            let alice = session.create_node(&["Person"]);
1666            let bob = session.create_node(&["Person"]);
1667            let carol = session.create_node(&["Person"]);
1668
1669            // Alice knows Bob and Carol (2 outgoing)
1670            session.create_edge(alice, bob, "KNOWS");
1671            session.create_edge(alice, carol, "KNOWS");
1672            // Bob knows Alice (1 incoming for Alice)
1673            session.create_edge(bob, alice, "KNOWS");
1674
1675            let (out_degree, in_degree) = session.get_degree(alice);
1676            assert_eq!(out_degree, 2);
1677            assert_eq!(in_degree, 1);
1678
1679            // Node with no edges
1680            let lonely = session.create_node(&["Person"]);
1681            let (out, in_deg) = session.get_degree(lonely);
1682            assert_eq!(out, 0);
1683            assert_eq!(in_deg, 0);
1684        }
1685
1686        #[test]
1687        fn test_get_nodes_batch() {
1688            let db = GrafeoDB::new_in_memory();
1689            let session = db.session();
1690
1691            let alice = session.create_node(&["Person"]);
1692            let bob = session.create_node(&["Person"]);
1693            let carol = session.create_node(&["Person"]);
1694
1695            let nodes = session.get_nodes_batch(&[alice, bob, carol]);
1696            assert_eq!(nodes.len(), 3);
1697            assert!(nodes[0].is_some());
1698            assert!(nodes[1].is_some());
1699            assert!(nodes[2].is_some());
1700
1701            // With non-existent node
1702            use grafeo_common::types::NodeId;
1703            let nodes_with_missing = session.get_nodes_batch(&[alice, NodeId::new(9999), carol]);
1704            assert_eq!(nodes_with_missing.len(), 3);
1705            assert!(nodes_with_missing[0].is_some());
1706            assert!(nodes_with_missing[1].is_none()); // Missing node
1707            assert!(nodes_with_missing[2].is_some());
1708        }
1709
1710        #[test]
1711        fn test_auto_commit_setting() {
1712            let db = GrafeoDB::new_in_memory();
1713            let mut session = db.session();
1714
1715            // Default is auto-commit enabled
1716            assert!(session.auto_commit());
1717
1718            session.set_auto_commit(false);
1719            assert!(!session.auto_commit());
1720
1721            session.set_auto_commit(true);
1722            assert!(session.auto_commit());
1723        }
1724
1725        #[test]
1726        fn test_transaction_double_begin_error() {
1727            let db = GrafeoDB::new_in_memory();
1728            let mut session = db.session();
1729
1730            session.begin_tx().unwrap();
1731            let result = session.begin_tx();
1732
1733            assert!(result.is_err());
1734            // Clean up
1735            session.rollback().unwrap();
1736        }
1737
1738        #[test]
1739        fn test_commit_without_transaction_error() {
1740            let db = GrafeoDB::new_in_memory();
1741            let mut session = db.session();
1742
1743            let result = session.commit();
1744            assert!(result.is_err());
1745        }
1746
1747        #[test]
1748        fn test_rollback_without_transaction_error() {
1749            let db = GrafeoDB::new_in_memory();
1750            let mut session = db.session();
1751
1752            let result = session.rollback();
1753            assert!(result.is_err());
1754        }
1755
1756        #[test]
1757        fn test_create_edge_in_transaction() {
1758            let db = GrafeoDB::new_in_memory();
1759            let mut session = db.session();
1760
1761            // Create nodes outside transaction
1762            let alice = session.create_node(&["Person"]);
1763            let bob = session.create_node(&["Person"]);
1764
1765            // Create edge in transaction
1766            session.begin_tx().unwrap();
1767            let edge_id = session.create_edge(alice, bob, "KNOWS");
1768
1769            // Edge should be visible in the transaction
1770            assert!(session.edge_exists(edge_id));
1771
1772            // Commit
1773            session.commit().unwrap();
1774
1775            // Edge should still be visible
1776            assert!(session.edge_exists(edge_id));
1777        }
1778
1779        #[test]
1780        fn test_neighbors_empty_node() {
1781            let db = GrafeoDB::new_in_memory();
1782            let session = db.session();
1783
1784            let lonely = session.create_node(&["Person"]);
1785
1786            assert!(session.get_neighbors_outgoing(lonely).is_empty());
1787            assert!(session.get_neighbors_incoming(lonely).is_empty());
1788            assert!(
1789                session
1790                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
1791                    .is_empty()
1792            );
1793        }
1794    }
1795}