Skip to main content

grafeo_engine/
session.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
15#[cfg(feature = "rdf")]
16use grafeo_core::graph::rdf::RdfStore;
17
18use crate::config::{AdaptiveConfig, GraphModel};
19use crate::database::QueryResult;
20use crate::query::cache::QueryCache;
21use crate::transaction::TransactionManager;
22
23/// Your handle to the database - execute queries and manage transactions.
24///
25/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
26/// tracks its own transaction state, so you can have multiple concurrent
27/// sessions without them interfering.
28pub struct Session {
29    /// The underlying store.
30    store: Arc<LpgStore>,
31    /// RDF triple store (if RDF feature is enabled).
32    #[cfg(feature = "rdf")]
33    rdf_store: Arc<RdfStore>,
34    /// Transaction manager.
35    tx_manager: Arc<TransactionManager>,
36    /// Query cache shared across sessions.
37    query_cache: Arc<QueryCache>,
38    /// Current transaction ID (if any).
39    current_tx: Option<TxId>,
40    /// Whether the session is in auto-commit mode.
41    auto_commit: bool,
42    /// Adaptive execution configuration.
43    #[allow(dead_code)]
44    adaptive_config: AdaptiveConfig,
45    /// Whether to use factorized execution for multi-hop queries.
46    factorized_execution: bool,
47    /// The graph data model this session operates on.
48    graph_model: GraphModel,
49    /// Maximum time a query may run before being cancelled.
50    query_timeout: Option<Duration>,
51    /// Shared commit counter for triggering auto-GC.
52    commit_counter: Arc<AtomicUsize>,
53    /// GC every N commits (0 = disabled).
54    gc_interval: usize,
55    /// CDC log for change tracking.
56    #[cfg(feature = "cdc")]
57    cdc_log: Arc<crate::cdc::CdcLog>,
58}
59
60impl Session {
61    /// Creates a new session with adaptive execution configuration.
62    #[allow(dead_code, clippy::too_many_arguments)]
63    pub(crate) fn with_adaptive(
64        store: Arc<LpgStore>,
65        tx_manager: Arc<TransactionManager>,
66        query_cache: Arc<QueryCache>,
67        adaptive_config: AdaptiveConfig,
68        factorized_execution: bool,
69        graph_model: GraphModel,
70        query_timeout: Option<Duration>,
71        commit_counter: Arc<AtomicUsize>,
72        gc_interval: usize,
73    ) -> Self {
74        Self {
75            store,
76            #[cfg(feature = "rdf")]
77            rdf_store: Arc::new(RdfStore::new()),
78            tx_manager,
79            query_cache,
80            current_tx: None,
81            auto_commit: true,
82            adaptive_config,
83            factorized_execution,
84            graph_model,
85            query_timeout,
86            commit_counter,
87            gc_interval,
88            #[cfg(feature = "cdc")]
89            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
90        }
91    }
92
93    /// Sets the CDC log for this session (shared with the database).
94    #[cfg(feature = "cdc")]
95    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
96        self.cdc_log = cdc_log;
97    }
98
99    /// Creates a new session with RDF store and adaptive configuration.
100    #[cfg(feature = "rdf")]
101    #[allow(clippy::too_many_arguments)]
102    pub(crate) fn with_rdf_store_and_adaptive(
103        store: Arc<LpgStore>,
104        rdf_store: Arc<RdfStore>,
105        tx_manager: Arc<TransactionManager>,
106        query_cache: Arc<QueryCache>,
107        adaptive_config: AdaptiveConfig,
108        factorized_execution: bool,
109        graph_model: GraphModel,
110        query_timeout: Option<Duration>,
111        commit_counter: Arc<AtomicUsize>,
112        gc_interval: usize,
113    ) -> Self {
114        Self {
115            store,
116            rdf_store,
117            tx_manager,
118            query_cache,
119            current_tx: None,
120            auto_commit: true,
121            adaptive_config,
122            factorized_execution,
123            graph_model,
124            query_timeout,
125            commit_counter,
126            gc_interval,
127            #[cfg(feature = "cdc")]
128            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
129        }
130    }
131
132    /// Returns the graph model this session operates on.
133    #[must_use]
134    pub fn graph_model(&self) -> GraphModel {
135        self.graph_model
136    }
137
138    /// Checks that the session's graph model supports LPG operations.
139    fn require_lpg(&self, language: &str) -> Result<()> {
140        if self.graph_model == GraphModel::Rdf {
141            return Err(grafeo_common::utils::error::Error::Internal(format!(
142                "This is an RDF database. {language} queries require an LPG database."
143            )));
144        }
145        Ok(())
146    }
147
148    /// Executes a GQL query.
149    ///
150    /// # Errors
151    ///
152    /// Returns an error if the query fails to parse or execute.
153    ///
154    /// # Examples
155    ///
156    /// ```no_run
157    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
158    /// use grafeo_engine::GrafeoDB;
159    ///
160    /// let db = GrafeoDB::new_in_memory();
161    /// let session = db.session();
162    ///
163    /// // Create a node
164    /// session.execute("INSERT (:Person {name: 'Alice', age: 30})")?;
165    ///
166    /// // Query nodes
167    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
168    /// for row in &result.rows {
169    ///     println!("{:?}", row);
170    /// }
171    /// # Ok(())
172    /// # }
173    /// ```
174    #[cfg(feature = "gql")]
175    pub fn execute(&self, query: &str) -> Result<QueryResult> {
176        self.require_lpg("GQL")?;
177
178        use crate::query::{
179            Executor, Planner, binder::Binder, cache::CacheKey, gql_translator,
180            optimizer::Optimizer, processor::QueryLanguage,
181        };
182
183        let start_time = std::time::Instant::now();
184
185        // Create cache key for this query
186        let cache_key = CacheKey::new(query, QueryLanguage::Gql);
187
188        // Try to get cached optimized plan
189        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
190            // Cache hit - skip parsing, translation, binding, and optimization
191            cached_plan
192        } else {
193            // Cache miss - run full pipeline
194
195            // Parse and translate the query to a logical plan
196            let logical_plan = gql_translator::translate(query)?;
197
198            // Semantic validation
199            let mut binder = Binder::new();
200            let _binding_context = binder.bind(&logical_plan)?;
201
202            // Optimize the plan
203            let optimizer = Optimizer::from_store(&self.store);
204            let plan = optimizer.optimize(logical_plan)?;
205
206            // Cache the optimized plan for future use
207            self.query_cache.put_optimized(cache_key, plan.clone());
208
209            plan
210        };
211
212        // Get transaction context for MVCC visibility
213        let (viewing_epoch, tx_id) = self.get_transaction_context();
214
215        // Convert to physical plan with transaction context
216        // (Physical planning cannot be cached as it depends on transaction state)
217        let planner = Planner::with_context(
218            Arc::clone(&self.store),
219            Arc::clone(&self.tx_manager),
220            tx_id,
221            viewing_epoch,
222        )
223        .with_factorized_execution(self.factorized_execution);
224        let mut physical_plan = planner.plan(&optimized_plan)?;
225
226        // Execute the plan
227        let executor = Executor::with_columns(physical_plan.columns.clone())
228            .with_deadline(self.query_deadline());
229        let mut result = executor.execute(physical_plan.operator.as_mut())?;
230
231        // Add execution metrics
232        let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
233        let rows_scanned = result.rows.len() as u64;
234        result.execution_time_ms = Some(elapsed_ms);
235        result.rows_scanned = Some(rows_scanned);
236
237        Ok(result)
238    }
239
240    /// Executes a GQL query with parameters.
241    ///
242    /// # Errors
243    ///
244    /// Returns an error if the query fails to parse or execute.
245    #[cfg(feature = "gql")]
246    pub fn execute_with_params(
247        &self,
248        query: &str,
249        params: std::collections::HashMap<String, Value>,
250    ) -> Result<QueryResult> {
251        self.require_lpg("GQL")?;
252
253        use crate::query::processor::{QueryLanguage, QueryProcessor};
254
255        // Get transaction context for MVCC visibility
256        let (viewing_epoch, tx_id) = self.get_transaction_context();
257
258        // Create processor with transaction context
259        let processor =
260            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
261
262        // Apply transaction context if in a transaction
263        let processor = if let Some(tx_id) = tx_id {
264            processor.with_tx_context(viewing_epoch, tx_id)
265        } else {
266            processor
267        };
268
269        processor.process(query, QueryLanguage::Gql, Some(&params))
270    }
271
272    /// Executes a GQL query with parameters.
273    ///
274    /// # Errors
275    ///
276    /// Returns an error if no query language is enabled.
277    #[cfg(not(any(feature = "gql", feature = "cypher")))]
278    pub fn execute_with_params(
279        &self,
280        _query: &str,
281        _params: std::collections::HashMap<String, Value>,
282    ) -> Result<QueryResult> {
283        Err(grafeo_common::utils::error::Error::Internal(
284            "No query language enabled".to_string(),
285        ))
286    }
287
288    /// Executes a GQL query.
289    ///
290    /// # Errors
291    ///
292    /// Returns an error if no query language is enabled.
293    #[cfg(not(any(feature = "gql", feature = "cypher")))]
294    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
295        Err(grafeo_common::utils::error::Error::Internal(
296            "No query language enabled".to_string(),
297        ))
298    }
299
300    /// Executes a Cypher query.
301    ///
302    /// # Errors
303    ///
304    /// Returns an error if the query fails to parse or execute.
305    #[cfg(feature = "cypher")]
306    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
307        use crate::query::{
308            Executor, Planner, binder::Binder, cache::CacheKey, cypher_translator,
309            optimizer::Optimizer, processor::QueryLanguage,
310        };
311
312        // Create cache key for this query
313        let cache_key = CacheKey::new(query, QueryLanguage::Cypher);
314
315        // Try to get cached optimized plan
316        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
317            cached_plan
318        } else {
319            // Parse and translate the query to a logical plan
320            let logical_plan = cypher_translator::translate(query)?;
321
322            // Semantic validation
323            let mut binder = Binder::new();
324            let _binding_context = binder.bind(&logical_plan)?;
325
326            // Optimize the plan
327            let optimizer = Optimizer::from_store(&self.store);
328            let plan = optimizer.optimize(logical_plan)?;
329
330            // Cache the optimized plan
331            self.query_cache.put_optimized(cache_key, plan.clone());
332
333            plan
334        };
335
336        // Get transaction context for MVCC visibility
337        let (viewing_epoch, tx_id) = self.get_transaction_context();
338
339        // Convert to physical plan with transaction context
340        let planner = Planner::with_context(
341            Arc::clone(&self.store),
342            Arc::clone(&self.tx_manager),
343            tx_id,
344            viewing_epoch,
345        )
346        .with_factorized_execution(self.factorized_execution);
347        let mut physical_plan = planner.plan(&optimized_plan)?;
348
349        // Execute the plan
350        let executor = Executor::with_columns(physical_plan.columns.clone())
351            .with_deadline(self.query_deadline());
352        let result = executor.execute(physical_plan.operator.as_mut())?;
353        Ok(result)
354    }
355
356    /// Executes a Gremlin query.
357    ///
358    /// # Errors
359    ///
360    /// Returns an error if the query fails to parse or execute.
361    ///
362    /// # Examples
363    ///
364    /// ```no_run
365    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
366    /// use grafeo_engine::GrafeoDB;
367    ///
368    /// let db = GrafeoDB::new_in_memory();
369    /// let session = db.session();
370    ///
371    /// // Create some nodes first
372    /// session.create_node(&["Person"]);
373    ///
374    /// // Query using Gremlin
375    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
376    /// # Ok(())
377    /// # }
378    /// ```
379    #[cfg(feature = "gremlin")]
380    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
381        use crate::query::{
382            Executor, Planner, binder::Binder, gremlin_translator, optimizer::Optimizer,
383        };
384
385        // Parse and translate the query to a logical plan
386        let logical_plan = gremlin_translator::translate(query)?;
387
388        // Semantic validation
389        let mut binder = Binder::new();
390        let _binding_context = binder.bind(&logical_plan)?;
391
392        // Optimize the plan
393        let optimizer = Optimizer::from_store(&self.store);
394        let optimized_plan = optimizer.optimize(logical_plan)?;
395
396        // Get transaction context for MVCC visibility
397        let (viewing_epoch, tx_id) = self.get_transaction_context();
398
399        // Convert to physical plan with transaction context
400        let planner = Planner::with_context(
401            Arc::clone(&self.store),
402            Arc::clone(&self.tx_manager),
403            tx_id,
404            viewing_epoch,
405        )
406        .with_factorized_execution(self.factorized_execution);
407        let mut physical_plan = planner.plan(&optimized_plan)?;
408
409        // Execute the plan
410        let executor = Executor::with_columns(physical_plan.columns.clone())
411            .with_deadline(self.query_deadline());
412        let result = executor.execute(physical_plan.operator.as_mut())?;
413        Ok(result)
414    }
415
416    /// Executes a Gremlin query with parameters.
417    ///
418    /// # Errors
419    ///
420    /// Returns an error if the query fails to parse or execute.
421    #[cfg(feature = "gremlin")]
422    pub fn execute_gremlin_with_params(
423        &self,
424        query: &str,
425        params: std::collections::HashMap<String, Value>,
426    ) -> Result<QueryResult> {
427        use crate::query::processor::{QueryLanguage, QueryProcessor};
428
429        // Get transaction context for MVCC visibility
430        let (viewing_epoch, tx_id) = self.get_transaction_context();
431
432        // Create processor with transaction context
433        let processor =
434            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
435
436        // Apply transaction context if in a transaction
437        let processor = if let Some(tx_id) = tx_id {
438            processor.with_tx_context(viewing_epoch, tx_id)
439        } else {
440            processor
441        };
442
443        processor.process(query, QueryLanguage::Gremlin, Some(&params))
444    }
445
446    /// Executes a GraphQL query against the LPG store.
447    ///
448    /// # Errors
449    ///
450    /// Returns an error if the query fails to parse or execute.
451    ///
452    /// # Examples
453    ///
454    /// ```no_run
455    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
456    /// use grafeo_engine::GrafeoDB;
457    ///
458    /// let db = GrafeoDB::new_in_memory();
459    /// let session = db.session();
460    ///
461    /// // Create some nodes first
462    /// session.create_node(&["User"]);
463    ///
464    /// // Query using GraphQL
465    /// let result = session.execute_graphql("query { user { id name } }")?;
466    /// # Ok(())
467    /// # }
468    /// ```
469    #[cfg(feature = "graphql")]
470    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
471        use crate::query::{
472            Executor, Planner, binder::Binder, graphql_translator, optimizer::Optimizer,
473        };
474
475        // Parse and translate the query to a logical plan
476        let logical_plan = graphql_translator::translate(query)?;
477
478        // Semantic validation
479        let mut binder = Binder::new();
480        let _binding_context = binder.bind(&logical_plan)?;
481
482        // Optimize the plan
483        let optimizer = Optimizer::from_store(&self.store);
484        let optimized_plan = optimizer.optimize(logical_plan)?;
485
486        // Get transaction context for MVCC visibility
487        let (viewing_epoch, tx_id) = self.get_transaction_context();
488
489        // Convert to physical plan with transaction context
490        let planner = Planner::with_context(
491            Arc::clone(&self.store),
492            Arc::clone(&self.tx_manager),
493            tx_id,
494            viewing_epoch,
495        )
496        .with_factorized_execution(self.factorized_execution);
497        let mut physical_plan = planner.plan(&optimized_plan)?;
498
499        // Execute the plan
500        let executor = Executor::with_columns(physical_plan.columns.clone())
501            .with_deadline(self.query_deadline());
502        let result = executor.execute(physical_plan.operator.as_mut())?;
503        Ok(result)
504    }
505
506    /// Executes a GraphQL query with parameters.
507    ///
508    /// # Errors
509    ///
510    /// Returns an error if the query fails to parse or execute.
511    #[cfg(feature = "graphql")]
512    pub fn execute_graphql_with_params(
513        &self,
514        query: &str,
515        params: std::collections::HashMap<String, Value>,
516    ) -> Result<QueryResult> {
517        use crate::query::processor::{QueryLanguage, QueryProcessor};
518
519        // Get transaction context for MVCC visibility
520        let (viewing_epoch, tx_id) = self.get_transaction_context();
521
522        // Create processor with transaction context
523        let processor =
524            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
525
526        // Apply transaction context if in a transaction
527        let processor = if let Some(tx_id) = tx_id {
528            processor.with_tx_context(viewing_epoch, tx_id)
529        } else {
530            processor
531        };
532
533        processor.process(query, QueryLanguage::GraphQL, Some(&params))
534    }
535
536    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
537    ///
538    /// # Errors
539    ///
540    /// Returns an error if the query fails to parse or execute.
541    ///
542    /// # Examples
543    ///
544    /// ```no_run
545    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
546    /// use grafeo_engine::GrafeoDB;
547    ///
548    /// let db = GrafeoDB::new_in_memory();
549    /// let session = db.session();
550    ///
551    /// let result = session.execute_sql(
552    ///     "SELECT * FROM GRAPH_TABLE (
553    ///         MATCH (n:Person)
554    ///         COLUMNS (n.name AS name)
555    ///     )"
556    /// )?;
557    /// # Ok(())
558    /// # }
559    /// ```
560    #[cfg(feature = "sql-pgq")]
561    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
562        use crate::query::{
563            Executor, Planner, binder::Binder, cache::CacheKey, optimizer::Optimizer,
564            plan::LogicalOperator, processor::QueryLanguage, sql_pgq_translator,
565        };
566
567        // Parse and translate (always needed to check for DDL)
568        let logical_plan = sql_pgq_translator::translate(query)?;
569
570        // Handle DDL statements directly (they don't go through the query pipeline)
571        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
572            return Ok(QueryResult {
573                columns: vec!["status".into()],
574                column_types: vec![grafeo_common::types::LogicalType::String],
575                rows: vec![vec![Value::from(format!(
576                    "Property graph '{}' created ({} node tables, {} edge tables)",
577                    cpg.name,
578                    cpg.node_tables.len(),
579                    cpg.edge_tables.len()
580                ))]],
581                execution_time_ms: None,
582                rows_scanned: None,
583            });
584        }
585
586        // Create cache key for query plans
587        let cache_key = CacheKey::new(query, QueryLanguage::SqlPgq);
588
589        // Try to get cached optimized plan
590        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
591            cached_plan
592        } else {
593            // Semantic validation
594            let mut binder = Binder::new();
595            let _binding_context = binder.bind(&logical_plan)?;
596
597            // Optimize the plan
598            let optimizer = Optimizer::from_store(&self.store);
599            let plan = optimizer.optimize(logical_plan)?;
600
601            // Cache the optimized plan
602            self.query_cache.put_optimized(cache_key, plan.clone());
603
604            plan
605        };
606
607        // Get transaction context for MVCC visibility
608        let (viewing_epoch, tx_id) = self.get_transaction_context();
609
610        // Convert to physical plan with transaction context
611        let planner = Planner::with_context(
612            Arc::clone(&self.store),
613            Arc::clone(&self.tx_manager),
614            tx_id,
615            viewing_epoch,
616        )
617        .with_factorized_execution(self.factorized_execution);
618        let mut physical_plan = planner.plan(&optimized_plan)?;
619
620        // Execute the plan
621        let executor = Executor::with_columns(physical_plan.columns.clone())
622            .with_deadline(self.query_deadline());
623        let result = executor.execute(physical_plan.operator.as_mut())?;
624        Ok(result)
625    }
626
627    /// Executes a SQL/PGQ query with parameters.
628    ///
629    /// # Errors
630    ///
631    /// Returns an error if the query fails to parse or execute.
632    #[cfg(feature = "sql-pgq")]
633    pub fn execute_sql_with_params(
634        &self,
635        query: &str,
636        params: std::collections::HashMap<String, Value>,
637    ) -> Result<QueryResult> {
638        use crate::query::processor::{QueryLanguage, QueryProcessor};
639
640        // Get transaction context for MVCC visibility
641        let (viewing_epoch, tx_id) = self.get_transaction_context();
642
643        // Create processor with transaction context
644        let processor =
645            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
646
647        // Apply transaction context if in a transaction
648        let processor = if let Some(tx_id) = tx_id {
649            processor.with_tx_context(viewing_epoch, tx_id)
650        } else {
651            processor
652        };
653
654        processor.process(query, QueryLanguage::SqlPgq, Some(&params))
655    }
656
657    /// Executes a SPARQL query.
658    ///
659    /// # Errors
660    ///
661    /// Returns an error if the query fails to parse or execute.
662    #[cfg(all(feature = "sparql", feature = "rdf"))]
663    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
664        use crate::query::{
665            Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
666        };
667
668        // Parse and translate the SPARQL query to a logical plan
669        let logical_plan = sparql_translator::translate(query)?;
670
671        // Optimize the plan
672        let optimizer = Optimizer::from_store(&self.store);
673        let optimized_plan = optimizer.optimize(logical_plan)?;
674
675        // Convert to physical plan using RDF planner
676        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store)).with_tx_id(self.current_tx);
677        let mut physical_plan = planner.plan(&optimized_plan)?;
678
679        // Execute the plan
680        let executor = Executor::with_columns(physical_plan.columns.clone())
681            .with_deadline(self.query_deadline());
682        executor.execute(physical_plan.operator.as_mut())
683    }
684
685    /// Executes a SPARQL query with parameters.
686    ///
687    /// # Errors
688    ///
689    /// Returns an error if the query fails to parse or execute.
690    #[cfg(all(feature = "sparql", feature = "rdf"))]
691    pub fn execute_sparql_with_params(
692        &self,
693        query: &str,
694        _params: std::collections::HashMap<String, Value>,
695    ) -> Result<QueryResult> {
696        // TODO: Implement parameter substitution for SPARQL
697        // For now, just execute the query without parameters
698        self.execute_sparql(query)
699    }
700
701    /// Executes a query in the specified language by name.
702    ///
703    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
704    /// `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
705    ///
706    /// # Errors
707    ///
708    /// Returns an error if the language is unknown/disabled or the query fails.
709    pub fn execute_language(
710        &self,
711        query: &str,
712        language: &str,
713        params: Option<std::collections::HashMap<String, Value>>,
714    ) -> Result<QueryResult> {
715        match language {
716            "gql" => {
717                if let Some(p) = params {
718                    self.execute_with_params(query, p)
719                } else {
720                    self.execute(query)
721                }
722            }
723            #[cfg(feature = "cypher")]
724            "cypher" => {
725                if let Some(p) = params {
726                    use crate::query::processor::{QueryLanguage, QueryProcessor};
727                    let processor = QueryProcessor::for_lpg_with_tx(
728                        Arc::clone(&self.store),
729                        Arc::clone(&self.tx_manager),
730                    );
731                    let (viewing_epoch, tx_id) = self.get_transaction_context();
732                    let processor = if let Some(tx_id) = tx_id {
733                        processor.with_tx_context(viewing_epoch, tx_id)
734                    } else {
735                        processor
736                    };
737                    processor.process(query, QueryLanguage::Cypher, Some(&p))
738                } else {
739                    self.execute_cypher(query)
740                }
741            }
742            #[cfg(feature = "gremlin")]
743            "gremlin" => {
744                if let Some(p) = params {
745                    self.execute_gremlin_with_params(query, p)
746                } else {
747                    self.execute_gremlin(query)
748                }
749            }
750            #[cfg(feature = "graphql")]
751            "graphql" => {
752                if let Some(p) = params {
753                    self.execute_graphql_with_params(query, p)
754                } else {
755                    self.execute_graphql(query)
756                }
757            }
758            #[cfg(feature = "sql-pgq")]
759            "sql" | "sql-pgq" => {
760                if let Some(p) = params {
761                    self.execute_sql_with_params(query, p)
762                } else {
763                    self.execute_sql(query)
764                }
765            }
766            #[cfg(all(feature = "sparql", feature = "rdf"))]
767            "sparql" => {
768                if let Some(p) = params {
769                    self.execute_sparql_with_params(query, p)
770                } else {
771                    self.execute_sparql(query)
772                }
773            }
774            other => Err(grafeo_common::utils::error::Error::Query(
775                grafeo_common::utils::error::QueryError::new(
776                    grafeo_common::utils::error::QueryErrorKind::Semantic,
777                    format!("Unknown query language: '{other}'"),
778                ),
779            )),
780        }
781    }
782
783    /// Begins a new transaction.
784    ///
785    /// # Errors
786    ///
787    /// Returns an error if a transaction is already active.
788    ///
789    /// # Examples
790    ///
791    /// ```no_run
792    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
793    /// use grafeo_engine::GrafeoDB;
794    ///
795    /// let db = GrafeoDB::new_in_memory();
796    /// let mut session = db.session();
797    ///
798    /// session.begin_tx()?;
799    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
800    /// session.execute("INSERT (:Person {name: 'Bob'})")?;
801    /// session.commit()?; // Both inserts committed atomically
802    /// # Ok(())
803    /// # }
804    /// ```
805    pub fn begin_tx(&mut self) -> Result<()> {
806        if self.current_tx.is_some() {
807            return Err(grafeo_common::utils::error::Error::Transaction(
808                grafeo_common::utils::error::TransactionError::InvalidState(
809                    "Transaction already active".to_string(),
810                ),
811            ));
812        }
813
814        let tx_id = self.tx_manager.begin();
815        self.current_tx = Some(tx_id);
816        Ok(())
817    }
818
819    /// Begins a transaction with a specific isolation level.
820    ///
821    /// See [`begin_tx`](Self::begin_tx) for the default (`SnapshotIsolation`).
822    ///
823    /// # Errors
824    ///
825    /// Returns an error if a transaction is already active.
826    pub fn begin_tx_with_isolation(
827        &mut self,
828        isolation_level: crate::transaction::IsolationLevel,
829    ) -> Result<()> {
830        if self.current_tx.is_some() {
831            return Err(grafeo_common::utils::error::Error::Transaction(
832                grafeo_common::utils::error::TransactionError::InvalidState(
833                    "Transaction already active".to_string(),
834                ),
835            ));
836        }
837
838        let tx_id = self.tx_manager.begin_with_isolation(isolation_level);
839        self.current_tx = Some(tx_id);
840        Ok(())
841    }
842
843    /// Commits the current transaction.
844    ///
845    /// Makes all changes since [`begin_tx`](Self::begin_tx) permanent.
846    ///
847    /// # Errors
848    ///
849    /// Returns an error if no transaction is active.
850    pub fn commit(&mut self) -> Result<()> {
851        let tx_id = self.current_tx.take().ok_or_else(|| {
852            grafeo_common::utils::error::Error::Transaction(
853                grafeo_common::utils::error::TransactionError::InvalidState(
854                    "No active transaction".to_string(),
855                ),
856            )
857        })?;
858
859        // Commit RDF store pending operations
860        #[cfg(feature = "rdf")]
861        self.rdf_store.commit_tx(tx_id);
862
863        self.tx_manager.commit(tx_id)?;
864
865        // Sync the LpgStore epoch with the TxManager so that
866        // convenience lookups (edge_type, get_edge, get_node) that use
867        // store.current_epoch() can see versions created at the latest epoch.
868        self.store.sync_epoch(self.tx_manager.current_epoch());
869
870        // Auto-GC: periodically prune old MVCC versions
871        if self.gc_interval > 0 {
872            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
873            if count.is_multiple_of(self.gc_interval) {
874                let min_epoch = self.tx_manager.min_active_epoch();
875                self.store.gc_versions(min_epoch);
876                self.tx_manager.gc();
877            }
878        }
879
880        Ok(())
881    }
882
883    /// Aborts the current transaction.
884    ///
885    /// Discards all changes since [`begin_tx`](Self::begin_tx).
886    ///
887    /// # Errors
888    ///
889    /// Returns an error if no transaction is active.
890    ///
891    /// # Examples
892    ///
893    /// ```no_run
894    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
895    /// use grafeo_engine::GrafeoDB;
896    ///
897    /// let db = GrafeoDB::new_in_memory();
898    /// let mut session = db.session();
899    ///
900    /// session.begin_tx()?;
901    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
902    /// session.rollback()?; // Insert is discarded
903    /// # Ok(())
904    /// # }
905    /// ```
906    pub fn rollback(&mut self) -> Result<()> {
907        let tx_id = self.current_tx.take().ok_or_else(|| {
908            grafeo_common::utils::error::Error::Transaction(
909                grafeo_common::utils::error::TransactionError::InvalidState(
910                    "No active transaction".to_string(),
911                ),
912            )
913        })?;
914
915        // Discard uncommitted versions in the LPG store
916        self.store.discard_uncommitted_versions(tx_id);
917
918        // Discard pending operations in the RDF store
919        #[cfg(feature = "rdf")]
920        self.rdf_store.rollback_tx(tx_id);
921
922        // Mark transaction as aborted in the manager
923        self.tx_manager.abort(tx_id)
924    }
925
926    /// Returns whether a transaction is active.
927    #[must_use]
928    pub fn in_transaction(&self) -> bool {
929        self.current_tx.is_some()
930    }
931
932    /// Sets auto-commit mode.
933    pub fn set_auto_commit(&mut self, auto_commit: bool) {
934        self.auto_commit = auto_commit;
935    }
936
937    /// Returns whether auto-commit is enabled.
938    #[must_use]
939    pub fn auto_commit(&self) -> bool {
940        self.auto_commit
941    }
942
943    /// Computes the wall-clock deadline for query execution.
944    #[must_use]
945    fn query_deadline(&self) -> Option<Instant> {
946        self.query_timeout.map(|d| Instant::now() + d)
947    }
948
949    /// Returns the current transaction context for MVCC visibility.
950    ///
951    /// Returns `(viewing_epoch, tx_id)` where:
952    /// - `viewing_epoch` is the epoch at which to check version visibility
953    /// - `tx_id` is the current transaction ID (if in a transaction)
954    #[must_use]
955    fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
956        if let Some(tx_id) = self.current_tx {
957            // In a transaction - use the transaction's start epoch
958            let epoch = self
959                .tx_manager
960                .start_epoch(tx_id)
961                .unwrap_or_else(|| self.tx_manager.current_epoch());
962            (epoch, Some(tx_id))
963        } else {
964            // No transaction - use current epoch
965            (self.tx_manager.current_epoch(), None)
966        }
967    }
968
969    /// Creates a node directly (bypassing query execution).
970    ///
971    /// This is a low-level API for testing and direct manipulation.
972    /// If a transaction is active, the node will be versioned with the transaction ID.
973    pub fn create_node(&self, labels: &[&str]) -> NodeId {
974        let (epoch, tx_id) = self.get_transaction_context();
975        self.store
976            .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
977    }
978
979    /// Creates a node with properties.
980    ///
981    /// If a transaction is active, the node will be versioned with the transaction ID.
982    pub fn create_node_with_props<'a>(
983        &self,
984        labels: &[&str],
985        properties: impl IntoIterator<Item = (&'a str, Value)>,
986    ) -> NodeId {
987        let (epoch, tx_id) = self.get_transaction_context();
988        self.store.create_node_with_props_versioned(
989            labels,
990            properties.into_iter().map(|(k, v)| (k, v)),
991            epoch,
992            tx_id.unwrap_or(TxId::SYSTEM),
993        )
994    }
995
996    /// Creates an edge between two nodes.
997    ///
998    /// This is a low-level API for testing and direct manipulation.
999    /// If a transaction is active, the edge will be versioned with the transaction ID.
1000    pub fn create_edge(
1001        &self,
1002        src: NodeId,
1003        dst: NodeId,
1004        edge_type: &str,
1005    ) -> grafeo_common::types::EdgeId {
1006        let (epoch, tx_id) = self.get_transaction_context();
1007        self.store
1008            .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1009    }
1010
1011    // =========================================================================
1012    // Direct Lookup APIs (bypass query planning for O(1) point reads)
1013    // =========================================================================
1014
1015    /// Gets a node by ID directly, bypassing query planning.
1016    ///
1017    /// This is the fastest way to retrieve a single node when you know its ID.
1018    /// Skips parsing, binding, optimization, and physical planning entirely.
1019    ///
1020    /// # Performance
1021    ///
1022    /// - Time complexity: O(1) average case
1023    /// - No lock contention (uses DashMap internally)
1024    /// - ~20-30x faster than equivalent MATCH query
1025    ///
1026    /// # Example
1027    ///
1028    /// ```no_run
1029    /// # use grafeo_engine::GrafeoDB;
1030    /// # let db = GrafeoDB::new_in_memory();
1031    /// let session = db.session();
1032    /// let node_id = session.create_node(&["Person"]);
1033    ///
1034    /// // Direct lookup - O(1), no query planning
1035    /// let node = session.get_node(node_id);
1036    /// assert!(node.is_some());
1037    /// ```
1038    #[must_use]
1039    pub fn get_node(&self, id: NodeId) -> Option<Node> {
1040        let (epoch, tx_id) = self.get_transaction_context();
1041        self.store
1042            .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1043    }
1044
1045    /// Gets a single property from a node by ID, bypassing query planning.
1046    ///
1047    /// More efficient than `get_node()` when you only need one property,
1048    /// as it avoids loading the full node with all properties.
1049    ///
1050    /// # Performance
1051    ///
1052    /// - Time complexity: O(1) average case
1053    /// - No query planning overhead
1054    ///
1055    /// # Example
1056    ///
1057    /// ```no_run
1058    /// # use grafeo_engine::GrafeoDB;
1059    /// # use grafeo_common::types::Value;
1060    /// # let db = GrafeoDB::new_in_memory();
1061    /// let session = db.session();
1062    /// let id = session.create_node_with_props(&["Person"], [("name", "Alice".into())]);
1063    ///
1064    /// // Direct property access - O(1)
1065    /// let name = session.get_node_property(id, "name");
1066    /// assert_eq!(name, Some(Value::String("Alice".into())));
1067    /// ```
1068    #[must_use]
1069    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
1070        self.get_node(id)
1071            .and_then(|node| node.get_property(key).cloned())
1072    }
1073
1074    /// Gets an edge by ID directly, bypassing query planning.
1075    ///
1076    /// # Performance
1077    ///
1078    /// - Time complexity: O(1) average case
1079    /// - No lock contention
1080    #[must_use]
1081    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1082        let (epoch, tx_id) = self.get_transaction_context();
1083        self.store
1084            .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1085    }
1086
1087    /// Gets outgoing neighbors of a node directly, bypassing query planning.
1088    ///
1089    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
1090    ///
1091    /// # Performance
1092    ///
1093    /// - Time complexity: O(degree) where degree is the number of outgoing edges
1094    /// - Uses adjacency index for direct access
1095    /// - ~10-20x faster than equivalent MATCH query
1096    ///
1097    /// # Example
1098    ///
1099    /// ```no_run
1100    /// # use grafeo_engine::GrafeoDB;
1101    /// # let db = GrafeoDB::new_in_memory();
1102    /// let session = db.session();
1103    /// let alice = session.create_node(&["Person"]);
1104    /// let bob = session.create_node(&["Person"]);
1105    /// session.create_edge(alice, bob, "KNOWS");
1106    ///
1107    /// // Direct neighbor lookup - O(degree)
1108    /// let neighbors = session.get_neighbors_outgoing(alice);
1109    /// assert_eq!(neighbors.len(), 1);
1110    /// assert_eq!(neighbors[0].0, bob);
1111    /// ```
1112    #[must_use]
1113    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1114        self.store.edges_from(node, Direction::Outgoing).collect()
1115    }
1116
1117    /// Gets incoming neighbors of a node directly, bypassing query planning.
1118    ///
1119    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
1120    ///
1121    /// # Performance
1122    ///
1123    /// - Time complexity: O(degree) where degree is the number of incoming edges
1124    /// - Uses backward adjacency index for direct access
1125    #[must_use]
1126    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1127        self.store.edges_from(node, Direction::Incoming).collect()
1128    }
1129
1130    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
1131    ///
1132    /// # Example
1133    ///
1134    /// ```no_run
1135    /// # use grafeo_engine::GrafeoDB;
1136    /// # let db = GrafeoDB::new_in_memory();
1137    /// # let session = db.session();
1138    /// # let alice = session.create_node(&["Person"]);
1139    /// let neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1140    /// ```
1141    #[must_use]
1142    pub fn get_neighbors_outgoing_by_type(
1143        &self,
1144        node: NodeId,
1145        edge_type: &str,
1146    ) -> Vec<(NodeId, EdgeId)> {
1147        self.store
1148            .edges_from(node, Direction::Outgoing)
1149            .filter(|(_, edge_id)| {
1150                self.get_edge(*edge_id)
1151                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
1152            })
1153            .collect()
1154    }
1155
1156    /// Checks if a node exists, bypassing query planning.
1157    ///
1158    /// # Performance
1159    ///
1160    /// - Time complexity: O(1)
1161    /// - Fastest existence check available
1162    #[must_use]
1163    pub fn node_exists(&self, id: NodeId) -> bool {
1164        self.get_node(id).is_some()
1165    }
1166
1167    /// Checks if an edge exists, bypassing query planning.
1168    #[must_use]
1169    pub fn edge_exists(&self, id: EdgeId) -> bool {
1170        self.get_edge(id).is_some()
1171    }
1172
1173    /// Gets the degree (number of edges) of a node.
1174    ///
1175    /// Returns (outgoing_degree, incoming_degree).
1176    #[must_use]
1177    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
1178        let out = self.store.out_degree(node);
1179        let in_degree = self.store.in_degree(node);
1180        (out, in_degree)
1181    }
1182
1183    /// Batch lookup of multiple nodes by ID.
1184    ///
1185    /// More efficient than calling `get_node()` in a loop because it
1186    /// amortizes overhead.
1187    ///
1188    /// # Performance
1189    ///
1190    /// - Time complexity: O(n) where n is the number of IDs
1191    /// - Better cache utilization than individual lookups
1192    #[must_use]
1193    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
1194        let (epoch, tx_id) = self.get_transaction_context();
1195        let tx = tx_id.unwrap_or(TxId::SYSTEM);
1196        ids.iter()
1197            .map(|&id| self.store.get_node_versioned(id, epoch, tx))
1198            .collect()
1199    }
1200
1201    // ── Change Data Capture ─────────────────────────────────────────────
1202
1203    /// Returns the full change history for an entity (node or edge).
1204    #[cfg(feature = "cdc")]
1205    pub fn history(
1206        &self,
1207        entity_id: impl Into<crate::cdc::EntityId>,
1208    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1209        Ok(self.cdc_log.history(entity_id.into()))
1210    }
1211
1212    /// Returns change events for an entity since the given epoch.
1213    #[cfg(feature = "cdc")]
1214    pub fn history_since(
1215        &self,
1216        entity_id: impl Into<crate::cdc::EntityId>,
1217        since_epoch: EpochId,
1218    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1219        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
1220    }
1221
1222    /// Returns all change events across all entities in an epoch range.
1223    #[cfg(feature = "cdc")]
1224    pub fn changes_between(
1225        &self,
1226        start_epoch: EpochId,
1227        end_epoch: EpochId,
1228    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1229        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
1230    }
1231}
1232
1233#[cfg(test)]
1234mod tests {
1235    use crate::database::GrafeoDB;
1236
1237    #[test]
1238    fn test_session_create_node() {
1239        let db = GrafeoDB::new_in_memory();
1240        let session = db.session();
1241
1242        let id = session.create_node(&["Person"]);
1243        assert!(id.is_valid());
1244        assert_eq!(db.node_count(), 1);
1245    }
1246
1247    #[test]
1248    fn test_session_transaction() {
1249        let db = GrafeoDB::new_in_memory();
1250        let mut session = db.session();
1251
1252        assert!(!session.in_transaction());
1253
1254        session.begin_tx().unwrap();
1255        assert!(session.in_transaction());
1256
1257        session.commit().unwrap();
1258        assert!(!session.in_transaction());
1259    }
1260
1261    #[test]
1262    fn test_session_transaction_context() {
1263        let db = GrafeoDB::new_in_memory();
1264        let mut session = db.session();
1265
1266        // Without transaction - context should have current epoch and no tx_id
1267        let (_epoch1, tx_id1) = session.get_transaction_context();
1268        assert!(tx_id1.is_none());
1269
1270        // Start a transaction
1271        session.begin_tx().unwrap();
1272        let (epoch2, tx_id2) = session.get_transaction_context();
1273        assert!(tx_id2.is_some());
1274        // Transaction should have a valid epoch
1275        let _ = epoch2; // Use the variable
1276
1277        // Commit and verify
1278        session.commit().unwrap();
1279        let (epoch3, tx_id3) = session.get_transaction_context();
1280        assert!(tx_id3.is_none());
1281        // Epoch should have advanced after commit
1282        assert!(epoch3.as_u64() >= epoch2.as_u64());
1283    }
1284
1285    #[test]
1286    fn test_session_rollback() {
1287        let db = GrafeoDB::new_in_memory();
1288        let mut session = db.session();
1289
1290        session.begin_tx().unwrap();
1291        session.rollback().unwrap();
1292        assert!(!session.in_transaction());
1293    }
1294
1295    #[test]
1296    fn test_session_rollback_discards_versions() {
1297        use grafeo_common::types::TxId;
1298
1299        let db = GrafeoDB::new_in_memory();
1300
1301        // Create a node outside of any transaction (at system level)
1302        let node_before = db.store().create_node(&["Person"]);
1303        assert!(node_before.is_valid());
1304        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1305
1306        // Start a transaction
1307        let mut session = db.session();
1308        session.begin_tx().unwrap();
1309        let tx_id = session.current_tx.unwrap();
1310
1311        // Create a node versioned with the transaction's ID
1312        let epoch = db.store().current_epoch();
1313        let node_in_tx = db.store().create_node_versioned(&["Person"], epoch, tx_id);
1314        assert!(node_in_tx.is_valid());
1315
1316        // Should see 2 nodes at this point
1317        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1318
1319        // Rollback the transaction
1320        session.rollback().unwrap();
1321        assert!(!session.in_transaction());
1322
1323        // The node created in the transaction should be discarded
1324        // Only the first node should remain visible
1325        let count_after = db.node_count();
1326        assert_eq!(
1327            count_after, 1,
1328            "Rollback should discard uncommitted node, but got {count_after}"
1329        );
1330
1331        // The original node should still be accessible
1332        let current_epoch = db.store().current_epoch();
1333        assert!(
1334            db.store()
1335                .get_node_versioned(node_before, current_epoch, TxId::SYSTEM)
1336                .is_some(),
1337            "Original node should still exist"
1338        );
1339
1340        // The node created in the transaction should not be accessible
1341        assert!(
1342            db.store()
1343                .get_node_versioned(node_in_tx, current_epoch, TxId::SYSTEM)
1344                .is_none(),
1345            "Transaction node should be gone"
1346        );
1347    }
1348
1349    #[test]
1350    fn test_session_create_node_in_transaction() {
1351        // Test that session.create_node() is transaction-aware
1352        let db = GrafeoDB::new_in_memory();
1353
1354        // Create a node outside of any transaction
1355        let node_before = db.create_node(&["Person"]);
1356        assert!(node_before.is_valid());
1357        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1358
1359        // Start a transaction and create a node through the session
1360        let mut session = db.session();
1361        session.begin_tx().unwrap();
1362
1363        // Create a node through session.create_node() - should be versioned with tx
1364        let node_in_tx = session.create_node(&["Person"]);
1365        assert!(node_in_tx.is_valid());
1366
1367        // Should see 2 nodes at this point
1368        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1369
1370        // Rollback the transaction
1371        session.rollback().unwrap();
1372
1373        // The node created via session.create_node() should be discarded
1374        let count_after = db.node_count();
1375        assert_eq!(
1376            count_after, 1,
1377            "Rollback should discard node created via session.create_node(), but got {count_after}"
1378        );
1379    }
1380
1381    #[test]
1382    fn test_session_create_node_with_props_in_transaction() {
1383        use grafeo_common::types::Value;
1384
1385        // Test that session.create_node_with_props() is transaction-aware
1386        let db = GrafeoDB::new_in_memory();
1387
1388        // Create a node outside of any transaction
1389        db.create_node(&["Person"]);
1390        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1391
1392        // Start a transaction and create a node with properties
1393        let mut session = db.session();
1394        session.begin_tx().unwrap();
1395
1396        let node_in_tx =
1397            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1398        assert!(node_in_tx.is_valid());
1399
1400        // Should see 2 nodes
1401        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1402
1403        // Rollback the transaction
1404        session.rollback().unwrap();
1405
1406        // The node should be discarded
1407        let count_after = db.node_count();
1408        assert_eq!(
1409            count_after, 1,
1410            "Rollback should discard node created via session.create_node_with_props()"
1411        );
1412    }
1413
1414    #[cfg(feature = "gql")]
1415    mod gql_tests {
1416        use super::*;
1417
1418        #[test]
1419        fn test_gql_query_execution() {
1420            let db = GrafeoDB::new_in_memory();
1421            let session = db.session();
1422
1423            // Create some test data
1424            session.create_node(&["Person"]);
1425            session.create_node(&["Person"]);
1426            session.create_node(&["Animal"]);
1427
1428            // Execute a GQL query
1429            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1430
1431            // Should return 2 Person nodes
1432            assert_eq!(result.row_count(), 2);
1433            assert_eq!(result.column_count(), 1);
1434            assert_eq!(result.columns[0], "n");
1435        }
1436
1437        #[test]
1438        fn test_gql_empty_result() {
1439            let db = GrafeoDB::new_in_memory();
1440            let session = db.session();
1441
1442            // No data in database
1443            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1444
1445            assert_eq!(result.row_count(), 0);
1446        }
1447
1448        #[test]
1449        fn test_gql_parse_error() {
1450            let db = GrafeoDB::new_in_memory();
1451            let session = db.session();
1452
1453            // Invalid GQL syntax
1454            let result = session.execute("MATCH (n RETURN n");
1455
1456            assert!(result.is_err());
1457        }
1458
1459        #[test]
1460        fn test_gql_relationship_traversal() {
1461            let db = GrafeoDB::new_in_memory();
1462            let session = db.session();
1463
1464            // Create a graph: Alice -> Bob, Alice -> Charlie
1465            let alice = session.create_node(&["Person"]);
1466            let bob = session.create_node(&["Person"]);
1467            let charlie = session.create_node(&["Person"]);
1468
1469            session.create_edge(alice, bob, "KNOWS");
1470            session.create_edge(alice, charlie, "KNOWS");
1471
1472            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
1473            let result = session
1474                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1475                .unwrap();
1476
1477            // Should return 2 rows (Alice->Bob, Alice->Charlie)
1478            assert_eq!(result.row_count(), 2);
1479            assert_eq!(result.column_count(), 2);
1480            assert_eq!(result.columns[0], "a");
1481            assert_eq!(result.columns[1], "b");
1482        }
1483
1484        #[test]
1485        fn test_gql_relationship_with_type_filter() {
1486            let db = GrafeoDB::new_in_memory();
1487            let session = db.session();
1488
1489            // Create a graph: Alice -KNOWS-> Bob, Alice -WORKS_WITH-> Charlie
1490            let alice = session.create_node(&["Person"]);
1491            let bob = session.create_node(&["Person"]);
1492            let charlie = session.create_node(&["Person"]);
1493
1494            session.create_edge(alice, bob, "KNOWS");
1495            session.create_edge(alice, charlie, "WORKS_WITH");
1496
1497            // Query only KNOWS relationships
1498            let result = session
1499                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1500                .unwrap();
1501
1502            // Should return only 1 row (Alice->Bob)
1503            assert_eq!(result.row_count(), 1);
1504        }
1505
1506        #[test]
1507        fn test_gql_semantic_error_undefined_variable() {
1508            let db = GrafeoDB::new_in_memory();
1509            let session = db.session();
1510
1511            // Reference undefined variable 'x' in RETURN
1512            let result = session.execute("MATCH (n:Person) RETURN x");
1513
1514            // Should fail with semantic error
1515            assert!(result.is_err());
1516            let Err(err) = result else {
1517                panic!("Expected error")
1518            };
1519            assert!(
1520                err.to_string().contains("Undefined variable"),
1521                "Expected undefined variable error, got: {}",
1522                err
1523            );
1524        }
1525
1526        #[test]
1527        fn test_gql_where_clause_property_filter() {
1528            use grafeo_common::types::Value;
1529
1530            let db = GrafeoDB::new_in_memory();
1531            let session = db.session();
1532
1533            // Create people with ages
1534            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
1535            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
1536            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
1537
1538            // Query with WHERE clause: age > 30
1539            let result = session
1540                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
1541                .unwrap();
1542
1543            // Should return 2 people (ages 35 and 45)
1544            assert_eq!(result.row_count(), 2);
1545        }
1546
1547        #[test]
1548        fn test_gql_where_clause_equality() {
1549            use grafeo_common::types::Value;
1550
1551            let db = GrafeoDB::new_in_memory();
1552            let session = db.session();
1553
1554            // Create people with names
1555            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1556            session.create_node_with_props(&["Person"], [("name", Value::String("Bob".into()))]);
1557            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1558
1559            // Query with WHERE clause: name = "Alice"
1560            let result = session
1561                .execute("MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n")
1562                .unwrap();
1563
1564            // Should return 2 people named Alice
1565            assert_eq!(result.row_count(), 2);
1566        }
1567
1568        #[test]
1569        fn test_gql_return_property_access() {
1570            use grafeo_common::types::Value;
1571
1572            let db = GrafeoDB::new_in_memory();
1573            let session = db.session();
1574
1575            // Create people with names and ages
1576            session.create_node_with_props(
1577                &["Person"],
1578                [
1579                    ("name", Value::String("Alice".into())),
1580                    ("age", Value::Int64(30)),
1581                ],
1582            );
1583            session.create_node_with_props(
1584                &["Person"],
1585                [
1586                    ("name", Value::String("Bob".into())),
1587                    ("age", Value::Int64(25)),
1588                ],
1589            );
1590
1591            // Query returning properties
1592            let result = session
1593                .execute("MATCH (n:Person) RETURN n.name, n.age")
1594                .unwrap();
1595
1596            // Should return 2 rows with name and age columns
1597            assert_eq!(result.row_count(), 2);
1598            assert_eq!(result.column_count(), 2);
1599            assert_eq!(result.columns[0], "n.name");
1600            assert_eq!(result.columns[1], "n.age");
1601
1602            // Check that we get actual values
1603            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
1604            assert!(names.contains(&&Value::String("Alice".into())));
1605            assert!(names.contains(&&Value::String("Bob".into())));
1606        }
1607
1608        #[test]
1609        fn test_gql_return_mixed_expressions() {
1610            use grafeo_common::types::Value;
1611
1612            let db = GrafeoDB::new_in_memory();
1613            let session = db.session();
1614
1615            // Create a person
1616            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1617
1618            // Query returning both node and property
1619            let result = session
1620                .execute("MATCH (n:Person) RETURN n, n.name")
1621                .unwrap();
1622
1623            assert_eq!(result.row_count(), 1);
1624            assert_eq!(result.column_count(), 2);
1625            assert_eq!(result.columns[0], "n");
1626            assert_eq!(result.columns[1], "n.name");
1627
1628            // Second column should be the name
1629            assert_eq!(result.rows[0][1], Value::String("Alice".into()));
1630        }
1631    }
1632
1633    #[cfg(feature = "cypher")]
1634    mod cypher_tests {
1635        use super::*;
1636
1637        #[test]
1638        fn test_cypher_query_execution() {
1639            let db = GrafeoDB::new_in_memory();
1640            let session = db.session();
1641
1642            // Create some test data
1643            session.create_node(&["Person"]);
1644            session.create_node(&["Person"]);
1645            session.create_node(&["Animal"]);
1646
1647            // Execute a Cypher query
1648            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1649
1650            // Should return 2 Person nodes
1651            assert_eq!(result.row_count(), 2);
1652            assert_eq!(result.column_count(), 1);
1653            assert_eq!(result.columns[0], "n");
1654        }
1655
1656        #[test]
1657        fn test_cypher_empty_result() {
1658            let db = GrafeoDB::new_in_memory();
1659            let session = db.session();
1660
1661            // No data in database
1662            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1663
1664            assert_eq!(result.row_count(), 0);
1665        }
1666
1667        #[test]
1668        fn test_cypher_parse_error() {
1669            let db = GrafeoDB::new_in_memory();
1670            let session = db.session();
1671
1672            // Invalid Cypher syntax
1673            let result = session.execute_cypher("MATCH (n RETURN n");
1674
1675            assert!(result.is_err());
1676        }
1677    }
1678
1679    // ==================== Direct Lookup API Tests ====================
1680
1681    mod direct_lookup_tests {
1682        use super::*;
1683        use grafeo_common::types::Value;
1684
1685        #[test]
1686        fn test_get_node() {
1687            let db = GrafeoDB::new_in_memory();
1688            let session = db.session();
1689
1690            let id = session.create_node(&["Person"]);
1691            let node = session.get_node(id);
1692
1693            assert!(node.is_some());
1694            let node = node.unwrap();
1695            assert_eq!(node.id, id);
1696        }
1697
1698        #[test]
1699        fn test_get_node_not_found() {
1700            use grafeo_common::types::NodeId;
1701
1702            let db = GrafeoDB::new_in_memory();
1703            let session = db.session();
1704
1705            // Try to get a non-existent node
1706            let node = session.get_node(NodeId::new(9999));
1707            assert!(node.is_none());
1708        }
1709
1710        #[test]
1711        fn test_get_node_property() {
1712            let db = GrafeoDB::new_in_memory();
1713            let session = db.session();
1714
1715            let id = session
1716                .create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1717
1718            let name = session.get_node_property(id, "name");
1719            assert_eq!(name, Some(Value::String("Alice".into())));
1720
1721            // Non-existent property
1722            let missing = session.get_node_property(id, "missing");
1723            assert!(missing.is_none());
1724        }
1725
1726        #[test]
1727        fn test_get_edge() {
1728            let db = GrafeoDB::new_in_memory();
1729            let session = db.session();
1730
1731            let alice = session.create_node(&["Person"]);
1732            let bob = session.create_node(&["Person"]);
1733            let edge_id = session.create_edge(alice, bob, "KNOWS");
1734
1735            let edge = session.get_edge(edge_id);
1736            assert!(edge.is_some());
1737            let edge = edge.unwrap();
1738            assert_eq!(edge.id, edge_id);
1739            assert_eq!(edge.src, alice);
1740            assert_eq!(edge.dst, bob);
1741        }
1742
1743        #[test]
1744        fn test_get_edge_not_found() {
1745            use grafeo_common::types::EdgeId;
1746
1747            let db = GrafeoDB::new_in_memory();
1748            let session = db.session();
1749
1750            let edge = session.get_edge(EdgeId::new(9999));
1751            assert!(edge.is_none());
1752        }
1753
1754        #[test]
1755        fn test_get_neighbors_outgoing() {
1756            let db = GrafeoDB::new_in_memory();
1757            let session = db.session();
1758
1759            let alice = session.create_node(&["Person"]);
1760            let bob = session.create_node(&["Person"]);
1761            let carol = session.create_node(&["Person"]);
1762
1763            session.create_edge(alice, bob, "KNOWS");
1764            session.create_edge(alice, carol, "KNOWS");
1765
1766            let neighbors = session.get_neighbors_outgoing(alice);
1767            assert_eq!(neighbors.len(), 2);
1768
1769            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1770            assert!(neighbor_ids.contains(&bob));
1771            assert!(neighbor_ids.contains(&carol));
1772        }
1773
1774        #[test]
1775        fn test_get_neighbors_incoming() {
1776            let db = GrafeoDB::new_in_memory();
1777            let session = db.session();
1778
1779            let alice = session.create_node(&["Person"]);
1780            let bob = session.create_node(&["Person"]);
1781            let carol = session.create_node(&["Person"]);
1782
1783            session.create_edge(bob, alice, "KNOWS");
1784            session.create_edge(carol, alice, "KNOWS");
1785
1786            let neighbors = session.get_neighbors_incoming(alice);
1787            assert_eq!(neighbors.len(), 2);
1788
1789            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1790            assert!(neighbor_ids.contains(&bob));
1791            assert!(neighbor_ids.contains(&carol));
1792        }
1793
1794        #[test]
1795        fn test_get_neighbors_outgoing_by_type() {
1796            let db = GrafeoDB::new_in_memory();
1797            let session = db.session();
1798
1799            let alice = session.create_node(&["Person"]);
1800            let bob = session.create_node(&["Person"]);
1801            let company = session.create_node(&["Company"]);
1802
1803            session.create_edge(alice, bob, "KNOWS");
1804            session.create_edge(alice, company, "WORKS_AT");
1805
1806            let knows_neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1807            assert_eq!(knows_neighbors.len(), 1);
1808            assert_eq!(knows_neighbors[0].0, bob);
1809
1810            let works_neighbors = session.get_neighbors_outgoing_by_type(alice, "WORKS_AT");
1811            assert_eq!(works_neighbors.len(), 1);
1812            assert_eq!(works_neighbors[0].0, company);
1813
1814            // No edges of this type
1815            let no_neighbors = session.get_neighbors_outgoing_by_type(alice, "LIKES");
1816            assert!(no_neighbors.is_empty());
1817        }
1818
1819        #[test]
1820        fn test_node_exists() {
1821            use grafeo_common::types::NodeId;
1822
1823            let db = GrafeoDB::new_in_memory();
1824            let session = db.session();
1825
1826            let id = session.create_node(&["Person"]);
1827
1828            assert!(session.node_exists(id));
1829            assert!(!session.node_exists(NodeId::new(9999)));
1830        }
1831
1832        #[test]
1833        fn test_edge_exists() {
1834            use grafeo_common::types::EdgeId;
1835
1836            let db = GrafeoDB::new_in_memory();
1837            let session = db.session();
1838
1839            let alice = session.create_node(&["Person"]);
1840            let bob = session.create_node(&["Person"]);
1841            let edge_id = session.create_edge(alice, bob, "KNOWS");
1842
1843            assert!(session.edge_exists(edge_id));
1844            assert!(!session.edge_exists(EdgeId::new(9999)));
1845        }
1846
1847        #[test]
1848        fn test_get_degree() {
1849            let db = GrafeoDB::new_in_memory();
1850            let session = db.session();
1851
1852            let alice = session.create_node(&["Person"]);
1853            let bob = session.create_node(&["Person"]);
1854            let carol = session.create_node(&["Person"]);
1855
1856            // Alice knows Bob and Carol (2 outgoing)
1857            session.create_edge(alice, bob, "KNOWS");
1858            session.create_edge(alice, carol, "KNOWS");
1859            // Bob knows Alice (1 incoming for Alice)
1860            session.create_edge(bob, alice, "KNOWS");
1861
1862            let (out_degree, in_degree) = session.get_degree(alice);
1863            assert_eq!(out_degree, 2);
1864            assert_eq!(in_degree, 1);
1865
1866            // Node with no edges
1867            let lonely = session.create_node(&["Person"]);
1868            let (out, in_deg) = session.get_degree(lonely);
1869            assert_eq!(out, 0);
1870            assert_eq!(in_deg, 0);
1871        }
1872
1873        #[test]
1874        fn test_get_nodes_batch() {
1875            let db = GrafeoDB::new_in_memory();
1876            let session = db.session();
1877
1878            let alice = session.create_node(&["Person"]);
1879            let bob = session.create_node(&["Person"]);
1880            let carol = session.create_node(&["Person"]);
1881
1882            let nodes = session.get_nodes_batch(&[alice, bob, carol]);
1883            assert_eq!(nodes.len(), 3);
1884            assert!(nodes[0].is_some());
1885            assert!(nodes[1].is_some());
1886            assert!(nodes[2].is_some());
1887
1888            // With non-existent node
1889            use grafeo_common::types::NodeId;
1890            let nodes_with_missing = session.get_nodes_batch(&[alice, NodeId::new(9999), carol]);
1891            assert_eq!(nodes_with_missing.len(), 3);
1892            assert!(nodes_with_missing[0].is_some());
1893            assert!(nodes_with_missing[1].is_none()); // Missing node
1894            assert!(nodes_with_missing[2].is_some());
1895        }
1896
1897        #[test]
1898        fn test_auto_commit_setting() {
1899            let db = GrafeoDB::new_in_memory();
1900            let mut session = db.session();
1901
1902            // Default is auto-commit enabled
1903            assert!(session.auto_commit());
1904
1905            session.set_auto_commit(false);
1906            assert!(!session.auto_commit());
1907
1908            session.set_auto_commit(true);
1909            assert!(session.auto_commit());
1910        }
1911
1912        #[test]
1913        fn test_transaction_double_begin_error() {
1914            let db = GrafeoDB::new_in_memory();
1915            let mut session = db.session();
1916
1917            session.begin_tx().unwrap();
1918            let result = session.begin_tx();
1919
1920            assert!(result.is_err());
1921            // Clean up
1922            session.rollback().unwrap();
1923        }
1924
1925        #[test]
1926        fn test_commit_without_transaction_error() {
1927            let db = GrafeoDB::new_in_memory();
1928            let mut session = db.session();
1929
1930            let result = session.commit();
1931            assert!(result.is_err());
1932        }
1933
1934        #[test]
1935        fn test_rollback_without_transaction_error() {
1936            let db = GrafeoDB::new_in_memory();
1937            let mut session = db.session();
1938
1939            let result = session.rollback();
1940            assert!(result.is_err());
1941        }
1942
1943        #[test]
1944        fn test_create_edge_in_transaction() {
1945            let db = GrafeoDB::new_in_memory();
1946            let mut session = db.session();
1947
1948            // Create nodes outside transaction
1949            let alice = session.create_node(&["Person"]);
1950            let bob = session.create_node(&["Person"]);
1951
1952            // Create edge in transaction
1953            session.begin_tx().unwrap();
1954            let edge_id = session.create_edge(alice, bob, "KNOWS");
1955
1956            // Edge should be visible in the transaction
1957            assert!(session.edge_exists(edge_id));
1958
1959            // Commit
1960            session.commit().unwrap();
1961
1962            // Edge should still be visible
1963            assert!(session.edge_exists(edge_id));
1964        }
1965
1966        #[test]
1967        fn test_neighbors_empty_node() {
1968            let db = GrafeoDB::new_in_memory();
1969            let session = db.session();
1970
1971            let lonely = session.create_node(&["Person"]);
1972
1973            assert!(session.get_neighbors_outgoing(lonely).is_empty());
1974            assert!(session.get_neighbors_incoming(lonely).is_empty());
1975            assert!(
1976                session
1977                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
1978                    .is_empty()
1979            );
1980        }
1981    }
1982}