// grafeo_engine/session.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
16#[cfg(feature = "rdf")]
17use grafeo_core::graph::rdf::RdfStore;
18
19use crate::config::{AdaptiveConfig, GraphModel};
20use crate::database::QueryResult;
21use crate::query::cache::QueryCache;
22use crate::transaction::TransactionManager;
23
24/// Your handle to the database - execute queries and manage transactions.
25///
26/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
27/// tracks its own transaction state, so you can have multiple concurrent
28/// sessions without them interfering.
29pub struct Session {
30    /// Concrete LPG store; a placeholder instance when built via `with_external_store`.
31    store: Arc<LpgStore>,
32    /// Graph store trait object that the optimizer, planner and processor run against.
33    graph_store: Arc<dyn GraphStoreMut>,
34    /// RDF triple store handed to the SPARQL planner (only with the `rdf` feature).
35    #[cfg(feature = "rdf")]
36    rdf_store: Arc<RdfStore>,
37    /// Transaction manager used to begin transactions and passed to planners/processors.
38    tx_manager: Arc<TransactionManager>,
39    /// Cache of optimized logical plans, keyed by query text and language.
40    query_cache: Arc<QueryCache>,
41    /// ID of the transaction opened by `begin_tx`, or `None` when none is active.
42    current_tx: Option<TxId>,
43    /// Whether the session is in auto-commit mode (every constructor starts with `true`).
44    auto_commit: bool,
45    /// Adaptive execution configuration (stored but not read in this part of the file).
46    #[allow(dead_code)]
47    adaptive_config: AdaptiveConfig,
48    /// Factorized-execution flag forwarded to the planner for every query.
49    factorized_execution: bool,
50    /// The graph data model this session operates on; checked by `require_lpg`.
51    graph_model: GraphModel,
52    /// Maximum time a query may run before being cancelled (`None` = no limit configured).
53    query_timeout: Option<Duration>,
54    /// Shared commit counter for triggering auto-GC.
55    commit_counter: Arc<AtomicUsize>,
56    /// GC every N commits (0 = disabled).
57    gc_interval: usize,
58    /// Node count of `store` captured by `begin_tx` (for PreparedCommit stats).
59    tx_start_node_count: usize,
60    /// Edge count of `store` captured by `begin_tx` (for PreparedCommit stats).
61    tx_start_edge_count: usize,
62    /// CDC log for change tracking; replaceable through `set_cdc_log`.
63    #[cfg(feature = "cdc")]
64    cdc_log: Arc<crate::cdc::CdcLog>,
65}
66
67impl Session {
68    /// Creates a new session with adaptive execution configuration.
69    #[allow(dead_code, clippy::too_many_arguments)]
70    pub(crate) fn with_adaptive(
71        store: Arc<LpgStore>,
72        tx_manager: Arc<TransactionManager>,
73        query_cache: Arc<QueryCache>,
74        adaptive_config: AdaptiveConfig,
75        factorized_execution: bool,
76        graph_model: GraphModel,
77        query_timeout: Option<Duration>,
78        commit_counter: Arc<AtomicUsize>,
79        gc_interval: usize,
80    ) -> Self {
81        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
82        Self {
83            store,
84            graph_store,
85            #[cfg(feature = "rdf")]
86            rdf_store: Arc::new(RdfStore::new()),
87            tx_manager,
88            query_cache,
89            current_tx: None,
90            auto_commit: true,
91            adaptive_config,
92            factorized_execution,
93            graph_model,
94            query_timeout,
95            commit_counter,
96            gc_interval,
97            tx_start_node_count: 0,
98            tx_start_edge_count: 0,
99            #[cfg(feature = "cdc")]
100            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
101        }
102    }
103
104    /// Sets the CDC log for this session (shared with the database).
105    #[cfg(feature = "cdc")]
106    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
107        self.cdc_log = cdc_log;
108    }
109
110    /// Creates a new session with RDF store and adaptive configuration.
111    #[cfg(feature = "rdf")]
112    #[allow(clippy::too_many_arguments)]
113    pub(crate) fn with_rdf_store_and_adaptive(
114        store: Arc<LpgStore>,
115        rdf_store: Arc<RdfStore>,
116        tx_manager: Arc<TransactionManager>,
117        query_cache: Arc<QueryCache>,
118        adaptive_config: AdaptiveConfig,
119        factorized_execution: bool,
120        graph_model: GraphModel,
121        query_timeout: Option<Duration>,
122        commit_counter: Arc<AtomicUsize>,
123        gc_interval: usize,
124    ) -> Self {
125        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
126        Self {
127            store,
128            graph_store,
129            rdf_store,
130            tx_manager,
131            query_cache,
132            current_tx: None,
133            auto_commit: true,
134            adaptive_config,
135            factorized_execution,
136            graph_model,
137            query_timeout,
138            commit_counter,
139            gc_interval,
140            tx_start_node_count: 0,
141            tx_start_edge_count: 0,
142            #[cfg(feature = "cdc")]
143            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
144        }
145    }
146
147    /// Creates a session backed by an external graph store.
148    ///
149    /// The external store handles all data operations. Transaction management
150    /// (begin/commit/rollback) is not supported for external stores.
151    #[allow(clippy::too_many_arguments)]
152    pub(crate) fn with_external_store(
153        store: Arc<dyn GraphStoreMut>,
154        tx_manager: Arc<TransactionManager>,
155        query_cache: Arc<QueryCache>,
156        adaptive_config: AdaptiveConfig,
157        factorized_execution: bool,
158        graph_model: GraphModel,
159        query_timeout: Option<Duration>,
160        commit_counter: Arc<AtomicUsize>,
161        gc_interval: usize,
162    ) -> Self {
163        Self {
164            store: Arc::new(LpgStore::new()), // dummy for LpgStore-specific ops
165            graph_store: store,
166            #[cfg(feature = "rdf")]
167            rdf_store: Arc::new(RdfStore::new()),
168            tx_manager,
169            query_cache,
170            current_tx: None,
171            auto_commit: true,
172            adaptive_config,
173            factorized_execution,
174            graph_model,
175            query_timeout,
176            commit_counter,
177            gc_interval,
178            tx_start_node_count: 0,
179            tx_start_edge_count: 0,
180            #[cfg(feature = "cdc")]
181            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
182        }
183    }
184
185    /// Returns the graph model this session operates on.
186    #[must_use]
187    pub fn graph_model(&self) -> GraphModel {
188        self.graph_model
189    }
190
191    /// Checks that the session's graph model supports LPG operations.
192    fn require_lpg(&self, language: &str) -> Result<()> {
193        if self.graph_model == GraphModel::Rdf {
194            return Err(grafeo_common::utils::error::Error::Internal(format!(
195                "This is an RDF database. {language} queries require an LPG database."
196            )));
197        }
198        Ok(())
199    }
200
201    /// Executes a GQL query.
202    ///
203    /// # Errors
204    ///
205    /// Returns an error if the query fails to parse or execute.
206    ///
207    /// # Examples
208    ///
209    /// ```no_run
210    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
211    /// use grafeo_engine::GrafeoDB;
212    ///
213    /// let db = GrafeoDB::new_in_memory();
214    /// let session = db.session();
215    ///
216    /// // Create a node
217    /// session.execute("INSERT (:Person {name: 'Alice', age: 30})")?;
218    ///
219    /// // Query nodes
220    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
221    /// for row in &result.rows {
222    ///     println!("{:?}", row);
223    /// }
224    /// # Ok(())
225    /// # }
226    /// ```
227    #[cfg(feature = "gql")]
228    pub fn execute(&self, query: &str) -> Result<QueryResult> {
229        self.require_lpg("GQL")?;
230
231        use crate::query::{
232            Executor, Planner, binder::Binder, cache::CacheKey, gql_translator,
233            optimizer::Optimizer, processor::QueryLanguage,
234        };
235
236        let start_time = std::time::Instant::now();
237
238        // Create cache key for this query
239        let cache_key = CacheKey::new(query, QueryLanguage::Gql);
240
241        // Try to get cached optimized plan
242        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
243            // Cache hit - skip parsing, translation, binding, and optimization
244            cached_plan
245        } else {
246            // Cache miss - run full pipeline
247
248            // Parse and translate the query to a logical plan
249            let logical_plan = gql_translator::translate(query)?;
250
251            // Semantic validation
252            let mut binder = Binder::new();
253            let _binding_context = binder.bind(&logical_plan)?;
254
255            // Optimize the plan
256            let optimizer = Optimizer::from_graph_store(&*self.graph_store);
257            let plan = optimizer.optimize(logical_plan)?;
258
259            // Cache the optimized plan for future use
260            self.query_cache.put_optimized(cache_key, plan.clone());
261
262            plan
263        };
264
265        // Get transaction context for MVCC visibility
266        let (viewing_epoch, tx_id) = self.get_transaction_context();
267
268        // Convert to physical plan with transaction context
269        // (Physical planning cannot be cached as it depends on transaction state)
270        let planner = Planner::with_context(
271            Arc::clone(&self.graph_store),
272            Arc::clone(&self.tx_manager),
273            tx_id,
274            viewing_epoch,
275        )
276        .with_factorized_execution(self.factorized_execution);
277        let mut physical_plan = planner.plan(&optimized_plan)?;
278
279        // Execute the plan
280        let executor = Executor::with_columns(physical_plan.columns.clone())
281            .with_deadline(self.query_deadline());
282        let mut result = executor.execute(physical_plan.operator.as_mut())?;
283
284        // Add execution metrics
285        let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
286        let rows_scanned = result.rows.len() as u64;
287        result.execution_time_ms = Some(elapsed_ms);
288        result.rows_scanned = Some(rows_scanned);
289
290        Ok(result)
291    }
292
293    /// Executes a GQL query with parameters.
294    ///
295    /// # Errors
296    ///
297    /// Returns an error if the query fails to parse or execute.
298    #[cfg(feature = "gql")]
299    pub fn execute_with_params(
300        &self,
301        query: &str,
302        params: std::collections::HashMap<String, Value>,
303    ) -> Result<QueryResult> {
304        self.require_lpg("GQL")?;
305
306        use crate::query::processor::{QueryLanguage, QueryProcessor};
307
308        // Get transaction context for MVCC visibility
309        let (viewing_epoch, tx_id) = self.get_transaction_context();
310
311        // Create processor with transaction context
312        let processor = QueryProcessor::for_graph_store_with_tx(
313            Arc::clone(&self.graph_store),
314            Arc::clone(&self.tx_manager),
315        );
316
317        // Apply transaction context if in a transaction
318        let processor = if let Some(tx_id) = tx_id {
319            processor.with_tx_context(viewing_epoch, tx_id)
320        } else {
321            processor
322        };
323
324        processor.process(query, QueryLanguage::Gql, Some(&params))
325    }
326
327    /// Executes a GQL query with parameters.
328    ///
329    /// # Errors
330    ///
331    /// Returns an error if no query language is enabled.
332    #[cfg(not(any(feature = "gql", feature = "cypher")))]
333    pub fn execute_with_params(
334        &self,
335        _query: &str,
336        _params: std::collections::HashMap<String, Value>,
337    ) -> Result<QueryResult> {
338        Err(grafeo_common::utils::error::Error::Internal(
339            "No query language enabled".to_string(),
340        ))
341    }
342
343    /// Executes a GQL query.
344    ///
345    /// # Errors
346    ///
347    /// Returns an error if no query language is enabled.
348    #[cfg(not(any(feature = "gql", feature = "cypher")))]
349    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
350        Err(grafeo_common::utils::error::Error::Internal(
351            "No query language enabled".to_string(),
352        ))
353    }
354
355    /// Executes a Cypher query.
356    ///
357    /// # Errors
358    ///
359    /// Returns an error if the query fails to parse or execute.
360    #[cfg(feature = "cypher")]
361    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
362        use crate::query::{
363            Executor, Planner, binder::Binder, cache::CacheKey, cypher_translator,
364            optimizer::Optimizer, processor::QueryLanguage,
365        };
366
367        // Create cache key for this query
368        let cache_key = CacheKey::new(query, QueryLanguage::Cypher);
369
370        // Try to get cached optimized plan
371        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
372            cached_plan
373        } else {
374            // Parse and translate the query to a logical plan
375            let logical_plan = cypher_translator::translate(query)?;
376
377            // Semantic validation
378            let mut binder = Binder::new();
379            let _binding_context = binder.bind(&logical_plan)?;
380
381            // Optimize the plan
382            let optimizer = Optimizer::from_graph_store(&*self.graph_store);
383            let plan = optimizer.optimize(logical_plan)?;
384
385            // Cache the optimized plan
386            self.query_cache.put_optimized(cache_key, plan.clone());
387
388            plan
389        };
390
391        // Get transaction context for MVCC visibility
392        let (viewing_epoch, tx_id) = self.get_transaction_context();
393
394        // Convert to physical plan with transaction context
395        let planner = Planner::with_context(
396            Arc::clone(&self.graph_store),
397            Arc::clone(&self.tx_manager),
398            tx_id,
399            viewing_epoch,
400        )
401        .with_factorized_execution(self.factorized_execution);
402        let mut physical_plan = planner.plan(&optimized_plan)?;
403
404        // Execute the plan
405        let executor = Executor::with_columns(physical_plan.columns.clone())
406            .with_deadline(self.query_deadline());
407        let result = executor.execute(physical_plan.operator.as_mut())?;
408        Ok(result)
409    }
410
411    /// Executes a Gremlin query.
412    ///
413    /// # Errors
414    ///
415    /// Returns an error if the query fails to parse or execute.
416    ///
417    /// # Examples
418    ///
419    /// ```no_run
420    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
421    /// use grafeo_engine::GrafeoDB;
422    ///
423    /// let db = GrafeoDB::new_in_memory();
424    /// let session = db.session();
425    ///
426    /// // Create some nodes first
427    /// session.create_node(&["Person"]);
428    ///
429    /// // Query using Gremlin
430    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
431    /// # Ok(())
432    /// # }
433    /// ```
434    #[cfg(feature = "gremlin")]
435    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
436        use crate::query::{
437            Executor, Planner, binder::Binder, gremlin_translator, optimizer::Optimizer,
438        };
439
440        // Parse and translate the query to a logical plan
441        let logical_plan = gremlin_translator::translate(query)?;
442
443        // Semantic validation
444        let mut binder = Binder::new();
445        let _binding_context = binder.bind(&logical_plan)?;
446
447        // Optimize the plan
448        let optimizer = Optimizer::from_graph_store(&*self.graph_store);
449        let optimized_plan = optimizer.optimize(logical_plan)?;
450
451        // Get transaction context for MVCC visibility
452        let (viewing_epoch, tx_id) = self.get_transaction_context();
453
454        // Convert to physical plan with transaction context
455        let planner = Planner::with_context(
456            Arc::clone(&self.graph_store),
457            Arc::clone(&self.tx_manager),
458            tx_id,
459            viewing_epoch,
460        )
461        .with_factorized_execution(self.factorized_execution);
462        let mut physical_plan = planner.plan(&optimized_plan)?;
463
464        // Execute the plan
465        let executor = Executor::with_columns(physical_plan.columns.clone())
466            .with_deadline(self.query_deadline());
467        let result = executor.execute(physical_plan.operator.as_mut())?;
468        Ok(result)
469    }
470
471    /// Executes a Gremlin query with parameters.
472    ///
473    /// # Errors
474    ///
475    /// Returns an error if the query fails to parse or execute.
476    #[cfg(feature = "gremlin")]
477    pub fn execute_gremlin_with_params(
478        &self,
479        query: &str,
480        params: std::collections::HashMap<String, Value>,
481    ) -> Result<QueryResult> {
482        use crate::query::processor::{QueryLanguage, QueryProcessor};
483
484        // Get transaction context for MVCC visibility
485        let (viewing_epoch, tx_id) = self.get_transaction_context();
486
487        // Create processor with transaction context
488        let processor = QueryProcessor::for_graph_store_with_tx(
489            Arc::clone(&self.graph_store),
490            Arc::clone(&self.tx_manager),
491        );
492
493        // Apply transaction context if in a transaction
494        let processor = if let Some(tx_id) = tx_id {
495            processor.with_tx_context(viewing_epoch, tx_id)
496        } else {
497            processor
498        };
499
500        processor.process(query, QueryLanguage::Gremlin, Some(&params))
501    }
502
503    /// Executes a GraphQL query against the LPG store.
504    ///
505    /// # Errors
506    ///
507    /// Returns an error if the query fails to parse or execute.
508    ///
509    /// # Examples
510    ///
511    /// ```no_run
512    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
513    /// use grafeo_engine::GrafeoDB;
514    ///
515    /// let db = GrafeoDB::new_in_memory();
516    /// let session = db.session();
517    ///
518    /// // Create some nodes first
519    /// session.create_node(&["User"]);
520    ///
521    /// // Query using GraphQL
522    /// let result = session.execute_graphql("query { user { id name } }")?;
523    /// # Ok(())
524    /// # }
525    /// ```
526    #[cfg(feature = "graphql")]
527    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
528        use crate::query::{
529            Executor, Planner, binder::Binder, graphql_translator, optimizer::Optimizer,
530        };
531
532        // Parse and translate the query to a logical plan
533        let logical_plan = graphql_translator::translate(query)?;
534
535        // Semantic validation
536        let mut binder = Binder::new();
537        let _binding_context = binder.bind(&logical_plan)?;
538
539        // Optimize the plan
540        let optimizer = Optimizer::from_graph_store(&*self.graph_store);
541        let optimized_plan = optimizer.optimize(logical_plan)?;
542
543        // Get transaction context for MVCC visibility
544        let (viewing_epoch, tx_id) = self.get_transaction_context();
545
546        // Convert to physical plan with transaction context
547        let planner = Planner::with_context(
548            Arc::clone(&self.graph_store),
549            Arc::clone(&self.tx_manager),
550            tx_id,
551            viewing_epoch,
552        )
553        .with_factorized_execution(self.factorized_execution);
554        let mut physical_plan = planner.plan(&optimized_plan)?;
555
556        // Execute the plan
557        let executor = Executor::with_columns(physical_plan.columns.clone())
558            .with_deadline(self.query_deadline());
559        let result = executor.execute(physical_plan.operator.as_mut())?;
560        Ok(result)
561    }
562
563    /// Executes a GraphQL query with parameters.
564    ///
565    /// # Errors
566    ///
567    /// Returns an error if the query fails to parse or execute.
568    #[cfg(feature = "graphql")]
569    pub fn execute_graphql_with_params(
570        &self,
571        query: &str,
572        params: std::collections::HashMap<String, Value>,
573    ) -> Result<QueryResult> {
574        use crate::query::processor::{QueryLanguage, QueryProcessor};
575
576        // Get transaction context for MVCC visibility
577        let (viewing_epoch, tx_id) = self.get_transaction_context();
578
579        // Create processor with transaction context
580        let processor = QueryProcessor::for_graph_store_with_tx(
581            Arc::clone(&self.graph_store),
582            Arc::clone(&self.tx_manager),
583        );
584
585        // Apply transaction context if in a transaction
586        let processor = if let Some(tx_id) = tx_id {
587            processor.with_tx_context(viewing_epoch, tx_id)
588        } else {
589            processor
590        };
591
592        processor.process(query, QueryLanguage::GraphQL, Some(&params))
593    }
594
595    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
596    ///
597    /// # Errors
598    ///
599    /// Returns an error if the query fails to parse or execute.
600    ///
601    /// # Examples
602    ///
603    /// ```no_run
604    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
605    /// use grafeo_engine::GrafeoDB;
606    ///
607    /// let db = GrafeoDB::new_in_memory();
608    /// let session = db.session();
609    ///
610    /// let result = session.execute_sql(
611    ///     "SELECT * FROM GRAPH_TABLE (
612    ///         MATCH (n:Person)
613    ///         COLUMNS (n.name AS name)
614    ///     )"
615    /// )?;
616    /// # Ok(())
617    /// # }
618    /// ```
619    #[cfg(feature = "sql-pgq")]
620    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
621        use crate::query::{
622            Executor, Planner, binder::Binder, cache::CacheKey, optimizer::Optimizer,
623            plan::LogicalOperator, processor::QueryLanguage, sql_pgq_translator,
624        };
625
626        // Parse and translate (always needed to check for DDL)
627        let logical_plan = sql_pgq_translator::translate(query)?;
628
629        // Handle DDL statements directly (they don't go through the query pipeline)
630        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
631            return Ok(QueryResult {
632                columns: vec!["status".into()],
633                column_types: vec![grafeo_common::types::LogicalType::String],
634                rows: vec![vec![Value::from(format!(
635                    "Property graph '{}' created ({} node tables, {} edge tables)",
636                    cpg.name,
637                    cpg.node_tables.len(),
638                    cpg.edge_tables.len()
639                ))]],
640                execution_time_ms: None,
641                rows_scanned: None,
642            });
643        }
644
645        // Create cache key for query plans
646        let cache_key = CacheKey::new(query, QueryLanguage::SqlPgq);
647
648        // Try to get cached optimized plan
649        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
650            cached_plan
651        } else {
652            // Semantic validation
653            let mut binder = Binder::new();
654            let _binding_context = binder.bind(&logical_plan)?;
655
656            // Optimize the plan
657            let optimizer = Optimizer::from_graph_store(&*self.graph_store);
658            let plan = optimizer.optimize(logical_plan)?;
659
660            // Cache the optimized plan
661            self.query_cache.put_optimized(cache_key, plan.clone());
662
663            plan
664        };
665
666        // Get transaction context for MVCC visibility
667        let (viewing_epoch, tx_id) = self.get_transaction_context();
668
669        // Convert to physical plan with transaction context
670        let planner = Planner::with_context(
671            Arc::clone(&self.graph_store),
672            Arc::clone(&self.tx_manager),
673            tx_id,
674            viewing_epoch,
675        )
676        .with_factorized_execution(self.factorized_execution);
677        let mut physical_plan = planner.plan(&optimized_plan)?;
678
679        // Execute the plan
680        let executor = Executor::with_columns(physical_plan.columns.clone())
681            .with_deadline(self.query_deadline());
682        let result = executor.execute(physical_plan.operator.as_mut())?;
683        Ok(result)
684    }
685
686    /// Executes a SQL/PGQ query with parameters.
687    ///
688    /// # Errors
689    ///
690    /// Returns an error if the query fails to parse or execute.
691    #[cfg(feature = "sql-pgq")]
692    pub fn execute_sql_with_params(
693        &self,
694        query: &str,
695        params: std::collections::HashMap<String, Value>,
696    ) -> Result<QueryResult> {
697        use crate::query::processor::{QueryLanguage, QueryProcessor};
698
699        // Get transaction context for MVCC visibility
700        let (viewing_epoch, tx_id) = self.get_transaction_context();
701
702        // Create processor with transaction context
703        let processor = QueryProcessor::for_graph_store_with_tx(
704            Arc::clone(&self.graph_store),
705            Arc::clone(&self.tx_manager),
706        );
707
708        // Apply transaction context if in a transaction
709        let processor = if let Some(tx_id) = tx_id {
710            processor.with_tx_context(viewing_epoch, tx_id)
711        } else {
712            processor
713        };
714
715        processor.process(query, QueryLanguage::SqlPgq, Some(&params))
716    }
717
718    /// Executes a SPARQL query.
719    ///
720    /// # Errors
721    ///
722    /// Returns an error if the query fails to parse or execute.
723    #[cfg(all(feature = "sparql", feature = "rdf"))]
724    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
725        use crate::query::{
726            Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
727        };
728
729        // Parse and translate the SPARQL query to a logical plan
730        let logical_plan = sparql_translator::translate(query)?;
731
732        // Optimize the plan
733        let optimizer = Optimizer::from_graph_store(&*self.graph_store);
734        let optimized_plan = optimizer.optimize(logical_plan)?;
735
736        // Convert to physical plan using RDF planner
737        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store)).with_tx_id(self.current_tx);
738        let mut physical_plan = planner.plan(&optimized_plan)?;
739
740        // Execute the plan
741        let executor = Executor::with_columns(physical_plan.columns.clone())
742            .with_deadline(self.query_deadline());
743        executor.execute(physical_plan.operator.as_mut())
744    }
745
746    /// Executes a SPARQL query with parameters.
747    ///
748    /// # Errors
749    ///
750    /// Returns an error if the query fails to parse or execute.
751    #[cfg(all(feature = "sparql", feature = "rdf"))]
752    pub fn execute_sparql_with_params(
753        &self,
754        query: &str,
755        _params: std::collections::HashMap<String, Value>,
756    ) -> Result<QueryResult> {
757        // TODO: Implement parameter substitution for SPARQL
758        // For now, just execute the query without parameters
759        self.execute_sparql(query)
760    }
761
762    /// Executes a query in the specified language by name.
763    ///
764    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
765    /// `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
766    ///
767    /// # Errors
768    ///
769    /// Returns an error if the language is unknown/disabled or the query fails.
770    pub fn execute_language(
771        &self,
772        query: &str,
773        language: &str,
774        params: Option<std::collections::HashMap<String, Value>>,
775    ) -> Result<QueryResult> {
776        match language {
777            "gql" => {
778                if let Some(p) = params {
779                    self.execute_with_params(query, p)
780                } else {
781                    self.execute(query)
782                }
783            }
784            #[cfg(feature = "cypher")]
785            "cypher" => {
786                if let Some(p) = params {
787                    use crate::query::processor::{QueryLanguage, QueryProcessor};
788                    let processor = QueryProcessor::for_graph_store_with_tx(
789                        Arc::clone(&self.graph_store),
790                        Arc::clone(&self.tx_manager),
791                    );
792                    let (viewing_epoch, tx_id) = self.get_transaction_context();
793                    let processor = if let Some(tx_id) = tx_id {
794                        processor.with_tx_context(viewing_epoch, tx_id)
795                    } else {
796                        processor
797                    };
798                    processor.process(query, QueryLanguage::Cypher, Some(&p))
799                } else {
800                    self.execute_cypher(query)
801                }
802            }
803            #[cfg(feature = "gremlin")]
804            "gremlin" => {
805                if let Some(p) = params {
806                    self.execute_gremlin_with_params(query, p)
807                } else {
808                    self.execute_gremlin(query)
809                }
810            }
811            #[cfg(feature = "graphql")]
812            "graphql" => {
813                if let Some(p) = params {
814                    self.execute_graphql_with_params(query, p)
815                } else {
816                    self.execute_graphql(query)
817                }
818            }
819            #[cfg(feature = "sql-pgq")]
820            "sql" | "sql-pgq" => {
821                if let Some(p) = params {
822                    self.execute_sql_with_params(query, p)
823                } else {
824                    self.execute_sql(query)
825                }
826            }
827            #[cfg(all(feature = "sparql", feature = "rdf"))]
828            "sparql" => {
829                if let Some(p) = params {
830                    self.execute_sparql_with_params(query, p)
831                } else {
832                    self.execute_sparql(query)
833                }
834            }
835            other => Err(grafeo_common::utils::error::Error::Query(
836                grafeo_common::utils::error::QueryError::new(
837                    grafeo_common::utils::error::QueryErrorKind::Semantic,
838                    format!("Unknown query language: '{other}'"),
839                ),
840            )),
841        }
842    }
843
844    /// Begins a new transaction.
845    ///
846    /// # Errors
847    ///
848    /// Returns an error if a transaction is already active.
849    ///
850    /// # Examples
851    ///
852    /// ```no_run
853    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
854    /// use grafeo_engine::GrafeoDB;
855    ///
856    /// let db = GrafeoDB::new_in_memory();
857    /// let mut session = db.session();
858    ///
859    /// session.begin_tx()?;
860    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
861    /// session.execute("INSERT (:Person {name: 'Bob'})")?;
862    /// session.commit()?; // Both inserts committed atomically
863    /// # Ok(())
864    /// # }
865    /// ```
866    pub fn begin_tx(&mut self) -> Result<()> {
867        if self.current_tx.is_some() {
868            return Err(grafeo_common::utils::error::Error::Transaction(
869                grafeo_common::utils::error::TransactionError::InvalidState(
870                    "Transaction already active".to_string(),
871                ),
872            ));
873        }
874
875        self.tx_start_node_count = self.store.node_count();
876        self.tx_start_edge_count = self.store.edge_count();
877        let tx_id = self.tx_manager.begin();
878        self.current_tx = Some(tx_id);
879        Ok(())
880    }
881
882    /// Begins a transaction with a specific isolation level.
883    ///
884    /// See [`begin_tx`](Self::begin_tx) for the default (`SnapshotIsolation`).
885    ///
886    /// # Errors
887    ///
888    /// Returns an error if a transaction is already active.
889    pub fn begin_tx_with_isolation(
890        &mut self,
891        isolation_level: crate::transaction::IsolationLevel,
892    ) -> Result<()> {
893        if self.current_tx.is_some() {
894            return Err(grafeo_common::utils::error::Error::Transaction(
895                grafeo_common::utils::error::TransactionError::InvalidState(
896                    "Transaction already active".to_string(),
897                ),
898            ));
899        }
900
901        self.tx_start_node_count = self.store.node_count();
902        self.tx_start_edge_count = self.store.edge_count();
903        let tx_id = self.tx_manager.begin_with_isolation(isolation_level);
904        self.current_tx = Some(tx_id);
905        Ok(())
906    }
907
908    /// Commits the current transaction.
909    ///
910    /// Makes all changes since [`begin_tx`](Self::begin_tx) permanent.
911    ///
912    /// # Errors
913    ///
914    /// Returns an error if no transaction is active.
915    pub fn commit(&mut self) -> Result<()> {
916        let tx_id = self.current_tx.take().ok_or_else(|| {
917            grafeo_common::utils::error::Error::Transaction(
918                grafeo_common::utils::error::TransactionError::InvalidState(
919                    "No active transaction".to_string(),
920                ),
921            )
922        })?;
923
924        // Commit RDF store pending operations
925        #[cfg(feature = "rdf")]
926        self.rdf_store.commit_tx(tx_id);
927
928        self.tx_manager.commit(tx_id)?;
929
930        // Sync the LpgStore epoch with the TxManager so that
931        // convenience lookups (edge_type, get_edge, get_node) that use
932        // store.current_epoch() can see versions created at the latest epoch.
933        self.store.sync_epoch(self.tx_manager.current_epoch());
934
935        // Auto-GC: periodically prune old MVCC versions
936        if self.gc_interval > 0 {
937            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
938            if count.is_multiple_of(self.gc_interval) {
939                let min_epoch = self.tx_manager.min_active_epoch();
940                self.store.gc_versions(min_epoch);
941                self.tx_manager.gc();
942            }
943        }
944
945        Ok(())
946    }
947
948    /// Aborts the current transaction.
949    ///
950    /// Discards all changes since [`begin_tx`](Self::begin_tx).
951    ///
952    /// # Errors
953    ///
954    /// Returns an error if no transaction is active.
955    ///
956    /// # Examples
957    ///
958    /// ```no_run
959    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
960    /// use grafeo_engine::GrafeoDB;
961    ///
962    /// let db = GrafeoDB::new_in_memory();
963    /// let mut session = db.session();
964    ///
965    /// session.begin_tx()?;
966    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
967    /// session.rollback()?; // Insert is discarded
968    /// # Ok(())
969    /// # }
970    /// ```
971    pub fn rollback(&mut self) -> Result<()> {
972        let tx_id = self.current_tx.take().ok_or_else(|| {
973            grafeo_common::utils::error::Error::Transaction(
974                grafeo_common::utils::error::TransactionError::InvalidState(
975                    "No active transaction".to_string(),
976                ),
977            )
978        })?;
979
980        // Discard uncommitted versions in the LPG store
981        self.store.discard_uncommitted_versions(tx_id);
982
983        // Discard pending operations in the RDF store
984        #[cfg(feature = "rdf")]
985        self.rdf_store.rollback_tx(tx_id);
986
987        // Mark transaction as aborted in the manager
988        self.tx_manager.abort(tx_id)
989    }
990
991    /// Returns whether a transaction is active.
992    #[must_use]
993    pub fn in_transaction(&self) -> bool {
994        self.current_tx.is_some()
995    }
996
997    /// Returns the current transaction ID, if any.
998    #[must_use]
999    pub(crate) fn current_tx_id(&self) -> Option<TxId> {
1000        self.current_tx
1001    }
1002
1003    /// Returns a reference to the transaction manager.
1004    #[must_use]
1005    pub(crate) fn tx_manager(&self) -> &TransactionManager {
1006        &self.tx_manager
1007    }
1008
1009    /// Returns the store's current node count and the count at transaction start.
1010    #[must_use]
1011    pub(crate) fn node_count_delta(&self) -> (usize, usize) {
1012        (self.tx_start_node_count, self.store.node_count())
1013    }
1014
1015    /// Returns the store's current edge count and the count at transaction start.
1016    #[must_use]
1017    pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
1018        (self.tx_start_edge_count, self.store.edge_count())
1019    }
1020
1021    /// Prepares the current transaction for a two-phase commit.
1022    ///
1023    /// Returns a [`PreparedCommit`](crate::transaction::PreparedCommit) that
1024    /// lets you inspect pending changes and attach metadata before finalizing.
1025    /// The mutable borrow prevents concurrent operations while the commit is
1026    /// pending.
1027    ///
1028    /// If the `PreparedCommit` is dropped without calling `commit()` or
1029    /// `abort()`, the transaction is automatically rolled back.
1030    ///
1031    /// # Errors
1032    ///
1033    /// Returns an error if no transaction is active.
1034    ///
1035    /// # Examples
1036    ///
1037    /// ```no_run
1038    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1039    /// use grafeo_engine::GrafeoDB;
1040    ///
1041    /// let db = GrafeoDB::new_in_memory();
1042    /// let mut session = db.session();
1043    ///
1044    /// session.begin_tx()?;
1045    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
1046    ///
1047    /// let mut prepared = session.prepare_commit()?;
1048    /// println!("Nodes written: {}", prepared.info().nodes_written);
1049    /// prepared.set_metadata("audit_user", "admin");
1050    /// prepared.commit()?;
1051    /// # Ok(())
1052    /// # }
1053    /// ```
1054    pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
1055        crate::transaction::PreparedCommit::new(self)
1056    }
1057
1058    /// Sets auto-commit mode.
1059    pub fn set_auto_commit(&mut self, auto_commit: bool) {
1060        self.auto_commit = auto_commit;
1061    }
1062
1063    /// Returns whether auto-commit is enabled.
1064    #[must_use]
1065    pub fn auto_commit(&self) -> bool {
1066        self.auto_commit
1067    }
1068
1069    /// Computes the wall-clock deadline for query execution.
1070    #[must_use]
1071    fn query_deadline(&self) -> Option<Instant> {
1072        self.query_timeout.map(|d| Instant::now() + d)
1073    }
1074
1075    /// Returns the current transaction context for MVCC visibility.
1076    ///
1077    /// Returns `(viewing_epoch, tx_id)` where:
1078    /// - `viewing_epoch` is the epoch at which to check version visibility
1079    /// - `tx_id` is the current transaction ID (if in a transaction)
1080    #[must_use]
1081    fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
1082        if let Some(tx_id) = self.current_tx {
1083            // In a transaction - use the transaction's start epoch
1084            let epoch = self
1085                .tx_manager
1086                .start_epoch(tx_id)
1087                .unwrap_or_else(|| self.tx_manager.current_epoch());
1088            (epoch, Some(tx_id))
1089        } else {
1090            // No transaction - use current epoch
1091            (self.tx_manager.current_epoch(), None)
1092        }
1093    }
1094
1095    /// Creates a node directly (bypassing query execution).
1096    ///
1097    /// This is a low-level API for testing and direct manipulation.
1098    /// If a transaction is active, the node will be versioned with the transaction ID.
1099    pub fn create_node(&self, labels: &[&str]) -> NodeId {
1100        let (epoch, tx_id) = self.get_transaction_context();
1101        self.store
1102            .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1103    }
1104
1105    /// Creates a node with properties.
1106    ///
1107    /// If a transaction is active, the node will be versioned with the transaction ID.
1108    pub fn create_node_with_props<'a>(
1109        &self,
1110        labels: &[&str],
1111        properties: impl IntoIterator<Item = (&'a str, Value)>,
1112    ) -> NodeId {
1113        let (epoch, tx_id) = self.get_transaction_context();
1114        self.store.create_node_with_props_versioned(
1115            labels,
1116            properties.into_iter().map(|(k, v)| (k, v)),
1117            epoch,
1118            tx_id.unwrap_or(TxId::SYSTEM),
1119        )
1120    }
1121
1122    /// Creates an edge between two nodes.
1123    ///
1124    /// This is a low-level API for testing and direct manipulation.
1125    /// If a transaction is active, the edge will be versioned with the transaction ID.
1126    pub fn create_edge(
1127        &self,
1128        src: NodeId,
1129        dst: NodeId,
1130        edge_type: &str,
1131    ) -> grafeo_common::types::EdgeId {
1132        let (epoch, tx_id) = self.get_transaction_context();
1133        self.store
1134            .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1135    }
1136
1137    // =========================================================================
1138    // Direct Lookup APIs (bypass query planning for O(1) point reads)
1139    // =========================================================================
1140
1141    /// Gets a node by ID directly, bypassing query planning.
1142    ///
1143    /// This is the fastest way to retrieve a single node when you know its ID.
1144    /// Skips parsing, binding, optimization, and physical planning entirely.
1145    ///
1146    /// # Performance
1147    ///
1148    /// - Time complexity: O(1) average case
1149    /// - No lock contention (uses DashMap internally)
1150    /// - ~20-30x faster than equivalent MATCH query
1151    ///
1152    /// # Example
1153    ///
1154    /// ```no_run
1155    /// # use grafeo_engine::GrafeoDB;
1156    /// # let db = GrafeoDB::new_in_memory();
1157    /// let session = db.session();
1158    /// let node_id = session.create_node(&["Person"]);
1159    ///
1160    /// // Direct lookup - O(1), no query planning
1161    /// let node = session.get_node(node_id);
1162    /// assert!(node.is_some());
1163    /// ```
1164    #[must_use]
1165    pub fn get_node(&self, id: NodeId) -> Option<Node> {
1166        let (epoch, tx_id) = self.get_transaction_context();
1167        self.store
1168            .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1169    }
1170
1171    /// Gets a single property from a node by ID, bypassing query planning.
1172    ///
1173    /// More efficient than `get_node()` when you only need one property,
1174    /// as it avoids loading the full node with all properties.
1175    ///
1176    /// # Performance
1177    ///
1178    /// - Time complexity: O(1) average case
1179    /// - No query planning overhead
1180    ///
1181    /// # Example
1182    ///
1183    /// ```no_run
1184    /// # use grafeo_engine::GrafeoDB;
1185    /// # use grafeo_common::types::Value;
1186    /// # let db = GrafeoDB::new_in_memory();
1187    /// let session = db.session();
1188    /// let id = session.create_node_with_props(&["Person"], [("name", "Alice".into())]);
1189    ///
1190    /// // Direct property access - O(1)
1191    /// let name = session.get_node_property(id, "name");
1192    /// assert_eq!(name, Some(Value::String("Alice".into())));
1193    /// ```
1194    #[must_use]
1195    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
1196        self.get_node(id)
1197            .and_then(|node| node.get_property(key).cloned())
1198    }
1199
1200    /// Gets an edge by ID directly, bypassing query planning.
1201    ///
1202    /// # Performance
1203    ///
1204    /// - Time complexity: O(1) average case
1205    /// - No lock contention
1206    #[must_use]
1207    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1208        let (epoch, tx_id) = self.get_transaction_context();
1209        self.store
1210            .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1211    }
1212
1213    /// Gets outgoing neighbors of a node directly, bypassing query planning.
1214    ///
1215    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
1216    ///
1217    /// # Performance
1218    ///
1219    /// - Time complexity: O(degree) where degree is the number of outgoing edges
1220    /// - Uses adjacency index for direct access
1221    /// - ~10-20x faster than equivalent MATCH query
1222    ///
1223    /// # Example
1224    ///
1225    /// ```no_run
1226    /// # use grafeo_engine::GrafeoDB;
1227    /// # let db = GrafeoDB::new_in_memory();
1228    /// let session = db.session();
1229    /// let alice = session.create_node(&["Person"]);
1230    /// let bob = session.create_node(&["Person"]);
1231    /// session.create_edge(alice, bob, "KNOWS");
1232    ///
1233    /// // Direct neighbor lookup - O(degree)
1234    /// let neighbors = session.get_neighbors_outgoing(alice);
1235    /// assert_eq!(neighbors.len(), 1);
1236    /// assert_eq!(neighbors[0].0, bob);
1237    /// ```
1238    #[must_use]
1239    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1240        self.store.edges_from(node, Direction::Outgoing).collect()
1241    }
1242
1243    /// Gets incoming neighbors of a node directly, bypassing query planning.
1244    ///
1245    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
1246    ///
1247    /// # Performance
1248    ///
1249    /// - Time complexity: O(degree) where degree is the number of incoming edges
1250    /// - Uses backward adjacency index for direct access
1251    #[must_use]
1252    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1253        self.store.edges_from(node, Direction::Incoming).collect()
1254    }
1255
1256    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
1257    ///
1258    /// # Example
1259    ///
1260    /// ```no_run
1261    /// # use grafeo_engine::GrafeoDB;
1262    /// # let db = GrafeoDB::new_in_memory();
1263    /// # let session = db.session();
1264    /// # let alice = session.create_node(&["Person"]);
1265    /// let neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1266    /// ```
1267    #[must_use]
1268    pub fn get_neighbors_outgoing_by_type(
1269        &self,
1270        node: NodeId,
1271        edge_type: &str,
1272    ) -> Vec<(NodeId, EdgeId)> {
1273        self.store
1274            .edges_from(node, Direction::Outgoing)
1275            .filter(|(_, edge_id)| {
1276                self.get_edge(*edge_id)
1277                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
1278            })
1279            .collect()
1280    }
1281
1282    /// Checks if a node exists, bypassing query planning.
1283    ///
1284    /// # Performance
1285    ///
1286    /// - Time complexity: O(1)
1287    /// - Fastest existence check available
1288    #[must_use]
1289    pub fn node_exists(&self, id: NodeId) -> bool {
1290        self.get_node(id).is_some()
1291    }
1292
1293    /// Checks if an edge exists, bypassing query planning.
1294    #[must_use]
1295    pub fn edge_exists(&self, id: EdgeId) -> bool {
1296        self.get_edge(id).is_some()
1297    }
1298
1299    /// Gets the degree (number of edges) of a node.
1300    ///
1301    /// Returns (outgoing_degree, incoming_degree).
1302    #[must_use]
1303    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
1304        let out = self.store.out_degree(node);
1305        let in_degree = self.store.in_degree(node);
1306        (out, in_degree)
1307    }
1308
1309    /// Batch lookup of multiple nodes by ID.
1310    ///
1311    /// More efficient than calling `get_node()` in a loop because it
1312    /// amortizes overhead.
1313    ///
1314    /// # Performance
1315    ///
1316    /// - Time complexity: O(n) where n is the number of IDs
1317    /// - Better cache utilization than individual lookups
1318    #[must_use]
1319    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
1320        let (epoch, tx_id) = self.get_transaction_context();
1321        let tx = tx_id.unwrap_or(TxId::SYSTEM);
1322        ids.iter()
1323            .map(|&id| self.store.get_node_versioned(id, epoch, tx))
1324            .collect()
1325    }
1326
1327    // ── Change Data Capture ─────────────────────────────────────────────
1328
/// Returns every change event the CDC log holds for one entity (node or
/// edge).
///
/// The current implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn history(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let entity = entity_id.into();
    let events = self.cdc_log.history(entity);
    Ok(events)
}
1337
/// Returns the change events the CDC log holds for one entity since
/// `since_epoch`.
///
/// The current implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn history_since(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
    since_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let entity = entity_id.into();
    let events = self.cdc_log.history_since(entity, since_epoch);
    Ok(events)
}
1347
/// Returns the change events for all entities that the CDC log reports for
/// the epoch range from `start_epoch` to `end_epoch`.
///
/// The current implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn changes_between(
    &self,
    start_epoch: EpochId,
    end_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let events = self.cdc_log.changes_between(start_epoch, end_epoch);
    Ok(events)
}
1357}
1358
1359#[cfg(test)]
1360mod tests {
1361    use crate::database::GrafeoDB;
1362
#[test]
fn test_session_create_node() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // A node created through the session must show up in the database count.
    let created = session.create_node(&["Person"]);

    assert!(created.is_valid());
    assert_eq!(db.node_count(), 1);
}
1372
#[test]
fn test_session_transaction() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Fresh session: no transaction yet.
    assert!(!session.in_transaction());

    // begin -> active
    session.begin_tx().unwrap();
    assert!(session.in_transaction());

    // commit -> inactive again
    session.commit().unwrap();
    assert!(!session.in_transaction());
}
1386
#[test]
fn test_session_transaction_context() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Outside a transaction the context carries no transaction ID.
    let (_, tx_before) = session.get_transaction_context();
    assert!(tx_before.is_none());

    // Inside a transaction the context carries the transaction's ID.
    session.begin_tx().unwrap();
    let (epoch_in_tx, tx_during) = session.get_transaction_context();
    assert!(tx_during.is_some());

    // After the commit the ID is gone again and the epoch has not moved backwards.
    session.commit().unwrap();
    let (epoch_after, tx_after) = session.get_transaction_context();
    assert!(tx_after.is_none());
    assert!(epoch_after.as_u64() >= epoch_in_tx.as_u64());
}
1410
#[test]
fn test_session_rollback() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Rolling back an (empty) transaction must leave the session idle.
    session.begin_tx().unwrap();
    session.rollback().unwrap();

    assert!(!session.in_transaction());
}
1420
#[test]
fn test_session_rollback_discards_versions() {
    use grafeo_common::types::TxId;

    let db = GrafeoDB::new_in_memory();

    // Baseline: one node written at system level, outside any transaction.
    let committed_node = db.store().create_node(&["Person"]);
    assert!(committed_node.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    // Open a transaction and remember its ID.
    let mut session = db.session();
    session.begin_tx().unwrap();
    let tx_id = session.current_tx.unwrap();

    // Write a second node directly into the store, tagged with that transaction.
    let write_epoch = db.store().current_epoch();
    let pending_node = db
        .store()
        .create_node_versioned(&["Person"], write_epoch, tx_id);
    assert!(pending_node.is_valid());

    // Both nodes are counted while the transaction is open.
    assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");

    // Roll the transaction back.
    session.rollback().unwrap();
    assert!(!session.in_transaction());

    // Only the baseline node may be left.
    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard uncommitted node, but got {count_after}"
    );

    let read_epoch = db.store().current_epoch();

    // The baseline node is still readable ...
    let survivor = db
        .store()
        .get_node_versioned(committed_node, read_epoch, TxId::SYSTEM);
    assert!(survivor.is_some(), "Original node should still exist");

    // ... while the node written inside the transaction is not.
    let discarded = db
        .store()
        .get_node_versioned(pending_node, read_epoch, TxId::SYSTEM);
    assert!(discarded.is_none(), "Transaction node should be gone");
}
1474
#[test]
fn test_session_create_node_in_transaction() {
    // session.create_node() must take part in the session's transaction.
    let db = GrafeoDB::new_in_memory();

    // Baseline: one node created outside any transaction.
    let baseline = db.create_node(&["Person"]);
    assert!(baseline.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    // Inside a transaction, create a second node through the session.
    let mut session = db.session();
    session.begin_tx().unwrap();
    let pending = session.create_node(&["Person"]);
    assert!(pending.is_valid());

    // Both nodes are counted while the transaction is open.
    assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");

    // Rolling back must remove the node created through the session.
    session.rollback().unwrap();
    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node(), but got {count_after}"
    );
}
1506
#[test]
fn test_session_create_node_with_props_in_transaction() {
    use grafeo_common::types::Value;

    // session.create_node_with_props() must take part in the session's transaction.
    let db = GrafeoDB::new_in_memory();

    // Baseline: one node created outside any transaction.
    db.create_node(&["Person"]);
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    // Inside a transaction, create a node that carries a property.
    let mut session = db.session();
    session.begin_tx().unwrap();
    let props = [("name", Value::String("Alice".into()))];
    let pending = session.create_node_with_props(&["Person"], props);
    assert!(pending.is_valid());

    // Both nodes are counted while the transaction is open.
    assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");

    // Rolling back must remove the node again.
    session.rollback().unwrap();
    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node_with_props()"
    );
}
1539
1540    #[cfg(feature = "gql")]
1541    mod gql_tests {
1542        use super::*;
1543
#[test]
fn test_gql_query_execution() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // Fixture: two Person nodes followed by one Animal node.
    for label in ["Person", "Person", "Animal"] {
        session.create_node(&[label]);
    }

    // Only the Person nodes should match.
    let result = session.execute("MATCH (n:Person) RETURN n").unwrap();

    assert_eq!(result.row_count(), 2);
    assert_eq!(result.column_count(), 1);
    assert_eq!(result.columns[0], "n");
}
1562
#[test]
fn test_gql_empty_result() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // Nothing has been inserted, so the match must come back empty.
    let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
    let rows = result.row_count();

    assert_eq!(rows, 0);
}
1573
#[test]
fn test_gql_parse_error() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // The node pattern is missing its closing parenthesis.
    let outcome = session.execute("MATCH (n RETURN n");

    assert!(outcome.is_err());
}
1584
#[test]
fn test_gql_relationship_traversal() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // Fixture: Alice -KNOWS-> Bob and Alice -KNOWS-> Charlie.
    let alice = session.create_node(&["Person"]);
    let bob = session.create_node(&["Person"]);
    let charlie = session.create_node(&["Person"]);
    for friend in [bob, charlie] {
        session.create_edge(alice, friend, "KNOWS");
    }

    // One-hop path query over the KNOWS edges.
    let query = "MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b";
    let result = session.execute(query).unwrap();

    // One row per edge, with both endpoints as columns.
    assert_eq!(result.row_count(), 2);
    assert_eq!(result.column_count(), 2);
    assert_eq!(result.columns[0], "a");
    assert_eq!(result.columns[1], "b");
}
1609
#[test]
fn test_gql_relationship_with_type_filter() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // Fixture: Alice -KNOWS-> Bob and Alice -WORKS_WITH-> Charlie.
    let alice = session.create_node(&["Person"]);
    let bob = session.create_node(&["Person"]);
    let charlie = session.create_node(&["Person"]);
    session.create_edge(alice, bob, "KNOWS");
    session.create_edge(alice, charlie, "WORKS_WITH");

    // Ask for KNOWS edges only.
    let query = "MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b";
    let result = session.execute(query).unwrap();

    // The WORKS_WITH edge must be filtered out, leaving Alice->Bob.
    assert_eq!(result.row_count(), 1);
}
1631
#[test]
fn test_gql_semantic_error_undefined_variable() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // `x` is returned but never bound by the MATCH clause.
    let result = session.execute("MATCH (n:Person) RETURN x");

    // The query must be rejected ...
    assert!(result.is_err());
    let Err(err) = result else {
        panic!("Expected error")
    };

    // ... and the message must name the problem.
    let mentions_undefined = err.to_string().contains("Undefined variable");
    assert!(
        mentions_undefined,
        "Expected undefined variable error, got: {}",
        err
    );
}
1651
#[test]
fn test_gql_where_clause_property_filter() {
    use grafeo_common::types::Value;

    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // Fixture: three people aged 25, 35 and 45.
    for age in [25, 35, 45] {
        session.create_node_with_props(&["Person"], [("age", Value::Int64(age))]);
    }

    // Keep only those older than 30.
    let query = "MATCH (n:Person) WHERE n.age > 30 RETURN n";
    let result = session.execute(query).unwrap();

    // Ages 35 and 45 qualify.
    assert_eq!(result.row_count(), 2);
}
1672
#[test]
fn test_gql_where_clause_equality() {
    use grafeo_common::types::Value;

    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // Fixture: two people named Alice with one Bob in between.
    for name in ["Alice", "Bob", "Alice"] {
        session.create_node_with_props(&["Person"], [("name", Value::String(name.into()))]);
    }

    // Filter on an exact name match.
    let query = "MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n";
    let result = session.execute(query).unwrap();

    // Both Alices, and only them, should be returned.
    assert_eq!(result.row_count(), 2);
}
1693
1694        #[test]
1695        fn test_gql_return_property_access() {
1696            use grafeo_common::types::Value;
1697
1698            let db = GrafeoDB::new_in_memory();
1699            let session = db.session();
1700
1701            // Create people with names and ages
1702            session.create_node_with_props(
1703                &["Person"],
1704                [
1705                    ("name", Value::String("Alice".into())),
1706                    ("age", Value::Int64(30)),
1707                ],
1708            );
1709            session.create_node_with_props(
1710                &["Person"],
1711                [
1712                    ("name", Value::String("Bob".into())),
1713                    ("age", Value::Int64(25)),
1714                ],
1715            );
1716
1717            // Query returning properties
1718            let result = session
1719                .execute("MATCH (n:Person) RETURN n.name, n.age")
1720                .unwrap();
1721
1722            // Should return 2 rows with name and age columns
1723            assert_eq!(result.row_count(), 2);
1724            assert_eq!(result.column_count(), 2);
1725            assert_eq!(result.columns[0], "n.name");
1726            assert_eq!(result.columns[1], "n.age");
1727
1728            // Check that we get actual values
1729            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
1730            assert!(names.contains(&&Value::String("Alice".into())));
1731            assert!(names.contains(&&Value::String("Bob".into())));
1732        }
1733
1734        #[test]
1735        fn test_gql_return_mixed_expressions() {
1736            use grafeo_common::types::Value;
1737
1738            let db = GrafeoDB::new_in_memory();
1739            let session = db.session();
1740
1741            // Create a person
1742            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1743
1744            // Query returning both node and property
1745            let result = session
1746                .execute("MATCH (n:Person) RETURN n, n.name")
1747                .unwrap();
1748
1749            assert_eq!(result.row_count(), 1);
1750            assert_eq!(result.column_count(), 2);
1751            assert_eq!(result.columns[0], "n");
1752            assert_eq!(result.columns[1], "n.name");
1753
1754            // Second column should be the name
1755            assert_eq!(result.rows[0][1], Value::String("Alice".into()));
1756        }
1757    }
1758
1759    #[cfg(feature = "cypher")]
1760    mod cypher_tests {
1761        use super::*;
1762
1763        #[test]
1764        fn test_cypher_query_execution() {
1765            let db = GrafeoDB::new_in_memory();
1766            let session = db.session();
1767
1768            // Create some test data
1769            session.create_node(&["Person"]);
1770            session.create_node(&["Person"]);
1771            session.create_node(&["Animal"]);
1772
1773            // Execute a Cypher query
1774            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1775
1776            // Should return 2 Person nodes
1777            assert_eq!(result.row_count(), 2);
1778            assert_eq!(result.column_count(), 1);
1779            assert_eq!(result.columns[0], "n");
1780        }
1781
1782        #[test]
1783        fn test_cypher_empty_result() {
1784            let db = GrafeoDB::new_in_memory();
1785            let session = db.session();
1786
1787            // No data in database
1788            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1789
1790            assert_eq!(result.row_count(), 0);
1791        }
1792
1793        #[test]
1794        fn test_cypher_parse_error() {
1795            let db = GrafeoDB::new_in_memory();
1796            let session = db.session();
1797
1798            // Invalid Cypher syntax
1799            let result = session.execute_cypher("MATCH (n RETURN n");
1800
1801            assert!(result.is_err());
1802        }
1803    }
1804
1805    // ==================== Direct Lookup API Tests ====================
1806
1807    mod direct_lookup_tests {
1808        use super::*;
1809        use grafeo_common::types::Value;
1810
1811        #[test]
1812        fn test_get_node() {
1813            let db = GrafeoDB::new_in_memory();
1814            let session = db.session();
1815
1816            let id = session.create_node(&["Person"]);
1817            let node = session.get_node(id);
1818
1819            assert!(node.is_some());
1820            let node = node.unwrap();
1821            assert_eq!(node.id, id);
1822        }
1823
1824        #[test]
1825        fn test_get_node_not_found() {
1826            use grafeo_common::types::NodeId;
1827
1828            let db = GrafeoDB::new_in_memory();
1829            let session = db.session();
1830
1831            // Try to get a non-existent node
1832            let node = session.get_node(NodeId::new(9999));
1833            assert!(node.is_none());
1834        }
1835
1836        #[test]
1837        fn test_get_node_property() {
1838            let db = GrafeoDB::new_in_memory();
1839            let session = db.session();
1840
1841            let id = session
1842                .create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1843
1844            let name = session.get_node_property(id, "name");
1845            assert_eq!(name, Some(Value::String("Alice".into())));
1846
1847            // Non-existent property
1848            let missing = session.get_node_property(id, "missing");
1849            assert!(missing.is_none());
1850        }
1851
1852        #[test]
1853        fn test_get_edge() {
1854            let db = GrafeoDB::new_in_memory();
1855            let session = db.session();
1856
1857            let alice = session.create_node(&["Person"]);
1858            let bob = session.create_node(&["Person"]);
1859            let edge_id = session.create_edge(alice, bob, "KNOWS");
1860
1861            let edge = session.get_edge(edge_id);
1862            assert!(edge.is_some());
1863            let edge = edge.unwrap();
1864            assert_eq!(edge.id, edge_id);
1865            assert_eq!(edge.src, alice);
1866            assert_eq!(edge.dst, bob);
1867        }
1868
1869        #[test]
1870        fn test_get_edge_not_found() {
1871            use grafeo_common::types::EdgeId;
1872
1873            let db = GrafeoDB::new_in_memory();
1874            let session = db.session();
1875
1876            let edge = session.get_edge(EdgeId::new(9999));
1877            assert!(edge.is_none());
1878        }
1879
1880        #[test]
1881        fn test_get_neighbors_outgoing() {
1882            let db = GrafeoDB::new_in_memory();
1883            let session = db.session();
1884
1885            let alice = session.create_node(&["Person"]);
1886            let bob = session.create_node(&["Person"]);
1887            let carol = session.create_node(&["Person"]);
1888
1889            session.create_edge(alice, bob, "KNOWS");
1890            session.create_edge(alice, carol, "KNOWS");
1891
1892            let neighbors = session.get_neighbors_outgoing(alice);
1893            assert_eq!(neighbors.len(), 2);
1894
1895            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1896            assert!(neighbor_ids.contains(&bob));
1897            assert!(neighbor_ids.contains(&carol));
1898        }
1899
1900        #[test]
1901        fn test_get_neighbors_incoming() {
1902            let db = GrafeoDB::new_in_memory();
1903            let session = db.session();
1904
1905            let alice = session.create_node(&["Person"]);
1906            let bob = session.create_node(&["Person"]);
1907            let carol = session.create_node(&["Person"]);
1908
1909            session.create_edge(bob, alice, "KNOWS");
1910            session.create_edge(carol, alice, "KNOWS");
1911
1912            let neighbors = session.get_neighbors_incoming(alice);
1913            assert_eq!(neighbors.len(), 2);
1914
1915            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1916            assert!(neighbor_ids.contains(&bob));
1917            assert!(neighbor_ids.contains(&carol));
1918        }
1919
1920        #[test]
1921        fn test_get_neighbors_outgoing_by_type() {
1922            let db = GrafeoDB::new_in_memory();
1923            let session = db.session();
1924
1925            let alice = session.create_node(&["Person"]);
1926            let bob = session.create_node(&["Person"]);
1927            let company = session.create_node(&["Company"]);
1928
1929            session.create_edge(alice, bob, "KNOWS");
1930            session.create_edge(alice, company, "WORKS_AT");
1931
1932            let knows_neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1933            assert_eq!(knows_neighbors.len(), 1);
1934            assert_eq!(knows_neighbors[0].0, bob);
1935
1936            let works_neighbors = session.get_neighbors_outgoing_by_type(alice, "WORKS_AT");
1937            assert_eq!(works_neighbors.len(), 1);
1938            assert_eq!(works_neighbors[0].0, company);
1939
1940            // No edges of this type
1941            let no_neighbors = session.get_neighbors_outgoing_by_type(alice, "LIKES");
1942            assert!(no_neighbors.is_empty());
1943        }
1944
1945        #[test]
1946        fn test_node_exists() {
1947            use grafeo_common::types::NodeId;
1948
1949            let db = GrafeoDB::new_in_memory();
1950            let session = db.session();
1951
1952            let id = session.create_node(&["Person"]);
1953
1954            assert!(session.node_exists(id));
1955            assert!(!session.node_exists(NodeId::new(9999)));
1956        }
1957
1958        #[test]
1959        fn test_edge_exists() {
1960            use grafeo_common::types::EdgeId;
1961
1962            let db = GrafeoDB::new_in_memory();
1963            let session = db.session();
1964
1965            let alice = session.create_node(&["Person"]);
1966            let bob = session.create_node(&["Person"]);
1967            let edge_id = session.create_edge(alice, bob, "KNOWS");
1968
1969            assert!(session.edge_exists(edge_id));
1970            assert!(!session.edge_exists(EdgeId::new(9999)));
1971        }
1972
1973        #[test]
1974        fn test_get_degree() {
1975            let db = GrafeoDB::new_in_memory();
1976            let session = db.session();
1977
1978            let alice = session.create_node(&["Person"]);
1979            let bob = session.create_node(&["Person"]);
1980            let carol = session.create_node(&["Person"]);
1981
1982            // Alice knows Bob and Carol (2 outgoing)
1983            session.create_edge(alice, bob, "KNOWS");
1984            session.create_edge(alice, carol, "KNOWS");
1985            // Bob knows Alice (1 incoming for Alice)
1986            session.create_edge(bob, alice, "KNOWS");
1987
1988            let (out_degree, in_degree) = session.get_degree(alice);
1989            assert_eq!(out_degree, 2);
1990            assert_eq!(in_degree, 1);
1991
1992            // Node with no edges
1993            let lonely = session.create_node(&["Person"]);
1994            let (out, in_deg) = session.get_degree(lonely);
1995            assert_eq!(out, 0);
1996            assert_eq!(in_deg, 0);
1997        }
1998
1999        #[test]
2000        fn test_get_nodes_batch() {
2001            let db = GrafeoDB::new_in_memory();
2002            let session = db.session();
2003
2004            let alice = session.create_node(&["Person"]);
2005            let bob = session.create_node(&["Person"]);
2006            let carol = session.create_node(&["Person"]);
2007
2008            let nodes = session.get_nodes_batch(&[alice, bob, carol]);
2009            assert_eq!(nodes.len(), 3);
2010            assert!(nodes[0].is_some());
2011            assert!(nodes[1].is_some());
2012            assert!(nodes[2].is_some());
2013
2014            // With non-existent node
2015            use grafeo_common::types::NodeId;
2016            let nodes_with_missing = session.get_nodes_batch(&[alice, NodeId::new(9999), carol]);
2017            assert_eq!(nodes_with_missing.len(), 3);
2018            assert!(nodes_with_missing[0].is_some());
2019            assert!(nodes_with_missing[1].is_none()); // Missing node
2020            assert!(nodes_with_missing[2].is_some());
2021        }
2022
2023        #[test]
2024        fn test_auto_commit_setting() {
2025            let db = GrafeoDB::new_in_memory();
2026            let mut session = db.session();
2027
2028            // Default is auto-commit enabled
2029            assert!(session.auto_commit());
2030
2031            session.set_auto_commit(false);
2032            assert!(!session.auto_commit());
2033
2034            session.set_auto_commit(true);
2035            assert!(session.auto_commit());
2036        }
2037
2038        #[test]
2039        fn test_transaction_double_begin_error() {
2040            let db = GrafeoDB::new_in_memory();
2041            let mut session = db.session();
2042
2043            session.begin_tx().unwrap();
2044            let result = session.begin_tx();
2045
2046            assert!(result.is_err());
2047            // Clean up
2048            session.rollback().unwrap();
2049        }
2050
2051        #[test]
2052        fn test_commit_without_transaction_error() {
2053            let db = GrafeoDB::new_in_memory();
2054            let mut session = db.session();
2055
2056            let result = session.commit();
2057            assert!(result.is_err());
2058        }
2059
2060        #[test]
2061        fn test_rollback_without_transaction_error() {
2062            let db = GrafeoDB::new_in_memory();
2063            let mut session = db.session();
2064
2065            let result = session.rollback();
2066            assert!(result.is_err());
2067        }
2068
2069        #[test]
2070        fn test_create_edge_in_transaction() {
2071            let db = GrafeoDB::new_in_memory();
2072            let mut session = db.session();
2073
2074            // Create nodes outside transaction
2075            let alice = session.create_node(&["Person"]);
2076            let bob = session.create_node(&["Person"]);
2077
2078            // Create edge in transaction
2079            session.begin_tx().unwrap();
2080            let edge_id = session.create_edge(alice, bob, "KNOWS");
2081
2082            // Edge should be visible in the transaction
2083            assert!(session.edge_exists(edge_id));
2084
2085            // Commit
2086            session.commit().unwrap();
2087
2088            // Edge should still be visible
2089            assert!(session.edge_exists(edge_id));
2090        }
2091
2092        #[test]
2093        fn test_neighbors_empty_node() {
2094            let db = GrafeoDB::new_in_memory();
2095            let session = db.session();
2096
2097            let lonely = session.create_node(&["Person"]);
2098
2099            assert!(session.get_neighbors_outgoing(lonely).is_empty());
2100            assert!(session.get_neighbors_incoming(lonely).is_empty());
2101            assert!(
2102                session
2103                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
2104                    .is_empty()
2105            );
2106        }
2107    }
2108
2109    #[test]
2110    fn test_auto_gc_triggers_on_commit_interval() {
2111        use crate::config::Config;
2112
2113        let config = Config::in_memory().with_gc_interval(2);
2114        let db = GrafeoDB::with_config(config).unwrap();
2115        let mut session = db.session();
2116
2117        // First commit: counter = 1, no GC (not a multiple of 2)
2118        session.begin_tx().unwrap();
2119        session.create_node(&["A"]);
2120        session.commit().unwrap();
2121
2122        // Second commit: counter = 2, GC should trigger (multiple of 2)
2123        session.begin_tx().unwrap();
2124        session.create_node(&["B"]);
2125        session.commit().unwrap();
2126
2127        // Verify the database is still functional after GC
2128        assert_eq!(db.node_count(), 2);
2129    }
2130
2131    #[test]
2132    fn test_query_timeout_config_propagates_to_session() {
2133        use crate::config::Config;
2134        use std::time::Duration;
2135
2136        let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
2137        let db = GrafeoDB::with_config(config).unwrap();
2138        let session = db.session();
2139
2140        // Verify the session has a query deadline (timeout was set)
2141        assert!(session.query_deadline().is_some());
2142    }
2143
2144    #[test]
2145    fn test_no_query_timeout_returns_no_deadline() {
2146        let db = GrafeoDB::new_in_memory();
2147        let session = db.session();
2148
2149        // Default config has no timeout
2150        assert!(session.query_deadline().is_none());
2151    }
2152
2153    #[test]
2154    fn test_graph_model_accessor() {
2155        use crate::config::GraphModel;
2156
2157        let db = GrafeoDB::new_in_memory();
2158        let session = db.session();
2159
2160        assert_eq!(session.graph_model(), GraphModel::Lpg);
2161    }
2162
2163    #[cfg(feature = "gql")]
2164    #[test]
2165    fn test_external_store_session() {
2166        use grafeo_core::graph::GraphStoreMut;
2167        use std::sync::Arc;
2168
2169        let config = crate::config::Config::in_memory();
2170        let store = Arc::new(grafeo_core::graph::lpg::LpgStore::new()) as Arc<dyn GraphStoreMut>;
2171        let db = GrafeoDB::with_store(store, config).unwrap();
2172
2173        let session = db.session();
2174
2175        // Create data through a query (goes through the external graph_store)
2176        session.execute("INSERT (:Test {name: 'hello'})").unwrap();
2177
2178        // Verify we can query through it
2179        let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
2180        assert_eq!(result.row_count(), 1);
2181    }
2182}