Skip to main content

grafeo_engine/
session.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
16#[cfg(feature = "rdf")]
17use grafeo_core::graph::rdf::RdfStore;
18
19use crate::config::{AdaptiveConfig, GraphModel};
20use crate::database::QueryResult;
21use crate::query::cache::QueryCache;
22use crate::transaction::TransactionManager;
23
/// Your handle to the database - execute queries and manage transactions.
///
/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
/// tracks its own transaction state, so you can have multiple concurrent
/// sessions without them interfering.
///
/// The stores, transaction manager, query cache and commit counter are held
/// through `Arc` handles, i.e. they are shared with whoever created the session.
pub struct Session {
    /// The underlying store.
    ///
    /// For sessions built with `with_external_store` this is a newly
    /// constructed placeholder `LpgStore`; queries run against `graph_store`.
    store: Arc<LpgStore>,
    /// Graph store trait object for pluggable storage backends.
    ///
    /// This is the handle passed to the optimizer and planner when a query runs.
    graph_store: Arc<dyn GraphStoreMut>,
    /// RDF triple store (if RDF feature is enabled).
    ///
    /// Handed to the RDF planner by the SPARQL entry point.
    #[cfg(feature = "rdf")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager.
    ///
    /// Issues the transaction ids stored in `current_tx`.
    tx_manager: Arc<TransactionManager>,
    /// Query cache shared across sessions.
    ///
    /// Holds optimized logical plans keyed by query text and language.
    query_cache: Arc<QueryCache>,
    /// Current transaction ID (if any).
    ///
    /// `None` means no explicit transaction is open on this session.
    current_tx: Option<TxId>,
    /// Whether the session is in auto-commit mode.
    auto_commit: bool,
    /// Adaptive execution configuration.
    // NOTE(review): the lint allowance suggests this is stored but not read yet - confirm.
    #[allow(dead_code)]
    adaptive_config: AdaptiveConfig,
    /// Whether to use factorized execution for multi-hop queries.
    ///
    /// Forwarded to the planner for every query.
    factorized_execution: bool,
    /// The graph data model this session operates on.
    ///
    /// Consulted by `require_lpg` to reject LPG queries on an RDF database.
    graph_model: GraphModel,
    /// Maximum time a query may run before being cancelled.
    query_timeout: Option<Duration>,
    /// Shared commit counter for triggering auto-GC.
    commit_counter: Arc<AtomicUsize>,
    /// GC every N commits (0 = disabled).
    gc_interval: usize,
    /// CDC log for change tracking.
    ///
    /// Constructors install a newly created log; `set_cdc_log` swaps in the
    /// one shared with the database.
    #[cfg(feature = "cdc")]
    cdc_log: Arc<crate::cdc::CdcLog>,
}
62
63impl Session {
64    /// Creates a new session with adaptive execution configuration.
65    #[allow(dead_code, clippy::too_many_arguments)]
66    pub(crate) fn with_adaptive(
67        store: Arc<LpgStore>,
68        tx_manager: Arc<TransactionManager>,
69        query_cache: Arc<QueryCache>,
70        adaptive_config: AdaptiveConfig,
71        factorized_execution: bool,
72        graph_model: GraphModel,
73        query_timeout: Option<Duration>,
74        commit_counter: Arc<AtomicUsize>,
75        gc_interval: usize,
76    ) -> Self {
77        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
78        Self {
79            store,
80            graph_store,
81            #[cfg(feature = "rdf")]
82            rdf_store: Arc::new(RdfStore::new()),
83            tx_manager,
84            query_cache,
85            current_tx: None,
86            auto_commit: true,
87            adaptive_config,
88            factorized_execution,
89            graph_model,
90            query_timeout,
91            commit_counter,
92            gc_interval,
93            #[cfg(feature = "cdc")]
94            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
95        }
96    }
97
    /// Sets the CDC log for this session (shared with the database).
    ///
    /// Replaces the log installed by the constructor; the session's handle to
    /// the previous log is dropped.
    #[cfg(feature = "cdc")]
    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
        self.cdc_log = cdc_log;
    }
103
104    /// Creates a new session with RDF store and adaptive configuration.
105    #[cfg(feature = "rdf")]
106    #[allow(clippy::too_many_arguments)]
107    pub(crate) fn with_rdf_store_and_adaptive(
108        store: Arc<LpgStore>,
109        rdf_store: Arc<RdfStore>,
110        tx_manager: Arc<TransactionManager>,
111        query_cache: Arc<QueryCache>,
112        adaptive_config: AdaptiveConfig,
113        factorized_execution: bool,
114        graph_model: GraphModel,
115        query_timeout: Option<Duration>,
116        commit_counter: Arc<AtomicUsize>,
117        gc_interval: usize,
118    ) -> Self {
119        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
120        Self {
121            store,
122            graph_store,
123            rdf_store,
124            tx_manager,
125            query_cache,
126            current_tx: None,
127            auto_commit: true,
128            adaptive_config,
129            factorized_execution,
130            graph_model,
131            query_timeout,
132            commit_counter,
133            gc_interval,
134            #[cfg(feature = "cdc")]
135            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
136        }
137    }
138
139    /// Creates a session backed by an external graph store.
140    ///
141    /// The external store handles all data operations. Transaction management
142    /// (begin/commit/rollback) is not supported for external stores.
143    #[allow(clippy::too_many_arguments)]
144    pub(crate) fn with_external_store(
145        store: Arc<dyn GraphStoreMut>,
146        tx_manager: Arc<TransactionManager>,
147        query_cache: Arc<QueryCache>,
148        adaptive_config: AdaptiveConfig,
149        factorized_execution: bool,
150        graph_model: GraphModel,
151        query_timeout: Option<Duration>,
152        commit_counter: Arc<AtomicUsize>,
153        gc_interval: usize,
154    ) -> Self {
155        Self {
156            store: Arc::new(LpgStore::new()), // dummy for LpgStore-specific ops
157            graph_store: store,
158            #[cfg(feature = "rdf")]
159            rdf_store: Arc::new(RdfStore::new()),
160            tx_manager,
161            query_cache,
162            current_tx: None,
163            auto_commit: true,
164            adaptive_config,
165            factorized_execution,
166            graph_model,
167            query_timeout,
168            commit_counter,
169            gc_interval,
170            #[cfg(feature = "cdc")]
171            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
172        }
173    }
174
    /// Returns the graph model this session operates on.
    ///
    /// The value is fixed when the session is constructed and returned by copy.
    #[must_use]
    pub fn graph_model(&self) -> GraphModel {
        self.graph_model
    }
180
181    /// Checks that the session's graph model supports LPG operations.
182    fn require_lpg(&self, language: &str) -> Result<()> {
183        if self.graph_model == GraphModel::Rdf {
184            return Err(grafeo_common::utils::error::Error::Internal(format!(
185                "This is an RDF database. {language} queries require an LPG database."
186            )));
187        }
188        Ok(())
189    }
190
191    /// Executes a GQL query.
192    ///
193    /// # Errors
194    ///
195    /// Returns an error if the query fails to parse or execute.
196    ///
197    /// # Examples
198    ///
199    /// ```no_run
200    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
201    /// use grafeo_engine::GrafeoDB;
202    ///
203    /// let db = GrafeoDB::new_in_memory();
204    /// let session = db.session();
205    ///
206    /// // Create a node
207    /// session.execute("INSERT (:Person {name: 'Alice', age: 30})")?;
208    ///
209    /// // Query nodes
210    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
211    /// for row in &result.rows {
212    ///     println!("{:?}", row);
213    /// }
214    /// # Ok(())
215    /// # }
216    /// ```
217    #[cfg(feature = "gql")]
218    pub fn execute(&self, query: &str) -> Result<QueryResult> {
219        self.require_lpg("GQL")?;
220
221        use crate::query::{
222            Executor, Planner, binder::Binder, cache::CacheKey, gql_translator,
223            optimizer::Optimizer, processor::QueryLanguage,
224        };
225
226        let start_time = std::time::Instant::now();
227
228        // Create cache key for this query
229        let cache_key = CacheKey::new(query, QueryLanguage::Gql);
230
231        // Try to get cached optimized plan
232        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
233            // Cache hit - skip parsing, translation, binding, and optimization
234            cached_plan
235        } else {
236            // Cache miss - run full pipeline
237
238            // Parse and translate the query to a logical plan
239            let logical_plan = gql_translator::translate(query)?;
240
241            // Semantic validation
242            let mut binder = Binder::new();
243            let _binding_context = binder.bind(&logical_plan)?;
244
245            // Optimize the plan
246            let optimizer = Optimizer::from_graph_store(&*self.graph_store);
247            let plan = optimizer.optimize(logical_plan)?;
248
249            // Cache the optimized plan for future use
250            self.query_cache.put_optimized(cache_key, plan.clone());
251
252            plan
253        };
254
255        // Get transaction context for MVCC visibility
256        let (viewing_epoch, tx_id) = self.get_transaction_context();
257
258        // Convert to physical plan with transaction context
259        // (Physical planning cannot be cached as it depends on transaction state)
260        let planner = Planner::with_context(
261            Arc::clone(&self.graph_store),
262            Arc::clone(&self.tx_manager),
263            tx_id,
264            viewing_epoch,
265        )
266        .with_factorized_execution(self.factorized_execution);
267        let mut physical_plan = planner.plan(&optimized_plan)?;
268
269        // Execute the plan
270        let executor = Executor::with_columns(physical_plan.columns.clone())
271            .with_deadline(self.query_deadline());
272        let mut result = executor.execute(physical_plan.operator.as_mut())?;
273
274        // Add execution metrics
275        let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
276        let rows_scanned = result.rows.len() as u64;
277        result.execution_time_ms = Some(elapsed_ms);
278        result.rows_scanned = Some(rows_scanned);
279
280        Ok(result)
281    }
282
283    /// Executes a GQL query with parameters.
284    ///
285    /// # Errors
286    ///
287    /// Returns an error if the query fails to parse or execute.
288    #[cfg(feature = "gql")]
289    pub fn execute_with_params(
290        &self,
291        query: &str,
292        params: std::collections::HashMap<String, Value>,
293    ) -> Result<QueryResult> {
294        self.require_lpg("GQL")?;
295
296        use crate::query::processor::{QueryLanguage, QueryProcessor};
297
298        // Get transaction context for MVCC visibility
299        let (viewing_epoch, tx_id) = self.get_transaction_context();
300
301        // Create processor with transaction context
302        let processor = QueryProcessor::for_graph_store_with_tx(
303            Arc::clone(&self.graph_store),
304            Arc::clone(&self.tx_manager),
305        );
306
307        // Apply transaction context if in a transaction
308        let processor = if let Some(tx_id) = tx_id {
309            processor.with_tx_context(viewing_epoch, tx_id)
310        } else {
311            processor
312        };
313
314        processor.process(query, QueryLanguage::Gql, Some(&params))
315    }
316
317    /// Executes a GQL query with parameters.
318    ///
319    /// # Errors
320    ///
321    /// Returns an error if no query language is enabled.
322    #[cfg(not(any(feature = "gql", feature = "cypher")))]
323    pub fn execute_with_params(
324        &self,
325        _query: &str,
326        _params: std::collections::HashMap<String, Value>,
327    ) -> Result<QueryResult> {
328        Err(grafeo_common::utils::error::Error::Internal(
329            "No query language enabled".to_string(),
330        ))
331    }
332
333    /// Executes a GQL query.
334    ///
335    /// # Errors
336    ///
337    /// Returns an error if no query language is enabled.
338    #[cfg(not(any(feature = "gql", feature = "cypher")))]
339    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
340        Err(grafeo_common::utils::error::Error::Internal(
341            "No query language enabled".to_string(),
342        ))
343    }
344
345    /// Executes a Cypher query.
346    ///
347    /// # Errors
348    ///
349    /// Returns an error if the query fails to parse or execute.
350    #[cfg(feature = "cypher")]
351    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
352        use crate::query::{
353            Executor, Planner, binder::Binder, cache::CacheKey, cypher_translator,
354            optimizer::Optimizer, processor::QueryLanguage,
355        };
356
357        // Create cache key for this query
358        let cache_key = CacheKey::new(query, QueryLanguage::Cypher);
359
360        // Try to get cached optimized plan
361        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
362            cached_plan
363        } else {
364            // Parse and translate the query to a logical plan
365            let logical_plan = cypher_translator::translate(query)?;
366
367            // Semantic validation
368            let mut binder = Binder::new();
369            let _binding_context = binder.bind(&logical_plan)?;
370
371            // Optimize the plan
372            let optimizer = Optimizer::from_graph_store(&*self.graph_store);
373            let plan = optimizer.optimize(logical_plan)?;
374
375            // Cache the optimized plan
376            self.query_cache.put_optimized(cache_key, plan.clone());
377
378            plan
379        };
380
381        // Get transaction context for MVCC visibility
382        let (viewing_epoch, tx_id) = self.get_transaction_context();
383
384        // Convert to physical plan with transaction context
385        let planner = Planner::with_context(
386            Arc::clone(&self.graph_store),
387            Arc::clone(&self.tx_manager),
388            tx_id,
389            viewing_epoch,
390        )
391        .with_factorized_execution(self.factorized_execution);
392        let mut physical_plan = planner.plan(&optimized_plan)?;
393
394        // Execute the plan
395        let executor = Executor::with_columns(physical_plan.columns.clone())
396            .with_deadline(self.query_deadline());
397        let result = executor.execute(physical_plan.operator.as_mut())?;
398        Ok(result)
399    }
400
401    /// Executes a Gremlin query.
402    ///
403    /// # Errors
404    ///
405    /// Returns an error if the query fails to parse or execute.
406    ///
407    /// # Examples
408    ///
409    /// ```no_run
410    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
411    /// use grafeo_engine::GrafeoDB;
412    ///
413    /// let db = GrafeoDB::new_in_memory();
414    /// let session = db.session();
415    ///
416    /// // Create some nodes first
417    /// session.create_node(&["Person"]);
418    ///
419    /// // Query using Gremlin
420    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
421    /// # Ok(())
422    /// # }
423    /// ```
424    #[cfg(feature = "gremlin")]
425    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
426        use crate::query::{
427            Executor, Planner, binder::Binder, gremlin_translator, optimizer::Optimizer,
428        };
429
430        // Parse and translate the query to a logical plan
431        let logical_plan = gremlin_translator::translate(query)?;
432
433        // Semantic validation
434        let mut binder = Binder::new();
435        let _binding_context = binder.bind(&logical_plan)?;
436
437        // Optimize the plan
438        let optimizer = Optimizer::from_graph_store(&*self.graph_store);
439        let optimized_plan = optimizer.optimize(logical_plan)?;
440
441        // Get transaction context for MVCC visibility
442        let (viewing_epoch, tx_id) = self.get_transaction_context();
443
444        // Convert to physical plan with transaction context
445        let planner = Planner::with_context(
446            Arc::clone(&self.graph_store),
447            Arc::clone(&self.tx_manager),
448            tx_id,
449            viewing_epoch,
450        )
451        .with_factorized_execution(self.factorized_execution);
452        let mut physical_plan = planner.plan(&optimized_plan)?;
453
454        // Execute the plan
455        let executor = Executor::with_columns(physical_plan.columns.clone())
456            .with_deadline(self.query_deadline());
457        let result = executor.execute(physical_plan.operator.as_mut())?;
458        Ok(result)
459    }
460
461    /// Executes a Gremlin query with parameters.
462    ///
463    /// # Errors
464    ///
465    /// Returns an error if the query fails to parse or execute.
466    #[cfg(feature = "gremlin")]
467    pub fn execute_gremlin_with_params(
468        &self,
469        query: &str,
470        params: std::collections::HashMap<String, Value>,
471    ) -> Result<QueryResult> {
472        use crate::query::processor::{QueryLanguage, QueryProcessor};
473
474        // Get transaction context for MVCC visibility
475        let (viewing_epoch, tx_id) = self.get_transaction_context();
476
477        // Create processor with transaction context
478        let processor = QueryProcessor::for_graph_store_with_tx(
479            Arc::clone(&self.graph_store),
480            Arc::clone(&self.tx_manager),
481        );
482
483        // Apply transaction context if in a transaction
484        let processor = if let Some(tx_id) = tx_id {
485            processor.with_tx_context(viewing_epoch, tx_id)
486        } else {
487            processor
488        };
489
490        processor.process(query, QueryLanguage::Gremlin, Some(&params))
491    }
492
493    /// Executes a GraphQL query against the LPG store.
494    ///
495    /// # Errors
496    ///
497    /// Returns an error if the query fails to parse or execute.
498    ///
499    /// # Examples
500    ///
501    /// ```no_run
502    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
503    /// use grafeo_engine::GrafeoDB;
504    ///
505    /// let db = GrafeoDB::new_in_memory();
506    /// let session = db.session();
507    ///
508    /// // Create some nodes first
509    /// session.create_node(&["User"]);
510    ///
511    /// // Query using GraphQL
512    /// let result = session.execute_graphql("query { user { id name } }")?;
513    /// # Ok(())
514    /// # }
515    /// ```
516    #[cfg(feature = "graphql")]
517    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
518        use crate::query::{
519            Executor, Planner, binder::Binder, graphql_translator, optimizer::Optimizer,
520        };
521
522        // Parse and translate the query to a logical plan
523        let logical_plan = graphql_translator::translate(query)?;
524
525        // Semantic validation
526        let mut binder = Binder::new();
527        let _binding_context = binder.bind(&logical_plan)?;
528
529        // Optimize the plan
530        let optimizer = Optimizer::from_graph_store(&*self.graph_store);
531        let optimized_plan = optimizer.optimize(logical_plan)?;
532
533        // Get transaction context for MVCC visibility
534        let (viewing_epoch, tx_id) = self.get_transaction_context();
535
536        // Convert to physical plan with transaction context
537        let planner = Planner::with_context(
538            Arc::clone(&self.graph_store),
539            Arc::clone(&self.tx_manager),
540            tx_id,
541            viewing_epoch,
542        )
543        .with_factorized_execution(self.factorized_execution);
544        let mut physical_plan = planner.plan(&optimized_plan)?;
545
546        // Execute the plan
547        let executor = Executor::with_columns(physical_plan.columns.clone())
548            .with_deadline(self.query_deadline());
549        let result = executor.execute(physical_plan.operator.as_mut())?;
550        Ok(result)
551    }
552
553    /// Executes a GraphQL query with parameters.
554    ///
555    /// # Errors
556    ///
557    /// Returns an error if the query fails to parse or execute.
558    #[cfg(feature = "graphql")]
559    pub fn execute_graphql_with_params(
560        &self,
561        query: &str,
562        params: std::collections::HashMap<String, Value>,
563    ) -> Result<QueryResult> {
564        use crate::query::processor::{QueryLanguage, QueryProcessor};
565
566        // Get transaction context for MVCC visibility
567        let (viewing_epoch, tx_id) = self.get_transaction_context();
568
569        // Create processor with transaction context
570        let processor = QueryProcessor::for_graph_store_with_tx(
571            Arc::clone(&self.graph_store),
572            Arc::clone(&self.tx_manager),
573        );
574
575        // Apply transaction context if in a transaction
576        let processor = if let Some(tx_id) = tx_id {
577            processor.with_tx_context(viewing_epoch, tx_id)
578        } else {
579            processor
580        };
581
582        processor.process(query, QueryLanguage::GraphQL, Some(&params))
583    }
584
585    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
586    ///
587    /// # Errors
588    ///
589    /// Returns an error if the query fails to parse or execute.
590    ///
591    /// # Examples
592    ///
593    /// ```no_run
594    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
595    /// use grafeo_engine::GrafeoDB;
596    ///
597    /// let db = GrafeoDB::new_in_memory();
598    /// let session = db.session();
599    ///
600    /// let result = session.execute_sql(
601    ///     "SELECT * FROM GRAPH_TABLE (
602    ///         MATCH (n:Person)
603    ///         COLUMNS (n.name AS name)
604    ///     )"
605    /// )?;
606    /// # Ok(())
607    /// # }
608    /// ```
609    #[cfg(feature = "sql-pgq")]
610    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
611        use crate::query::{
612            Executor, Planner, binder::Binder, cache::CacheKey, optimizer::Optimizer,
613            plan::LogicalOperator, processor::QueryLanguage, sql_pgq_translator,
614        };
615
616        // Parse and translate (always needed to check for DDL)
617        let logical_plan = sql_pgq_translator::translate(query)?;
618
619        // Handle DDL statements directly (they don't go through the query pipeline)
620        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
621            return Ok(QueryResult {
622                columns: vec!["status".into()],
623                column_types: vec![grafeo_common::types::LogicalType::String],
624                rows: vec![vec![Value::from(format!(
625                    "Property graph '{}' created ({} node tables, {} edge tables)",
626                    cpg.name,
627                    cpg.node_tables.len(),
628                    cpg.edge_tables.len()
629                ))]],
630                execution_time_ms: None,
631                rows_scanned: None,
632            });
633        }
634
635        // Create cache key for query plans
636        let cache_key = CacheKey::new(query, QueryLanguage::SqlPgq);
637
638        // Try to get cached optimized plan
639        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
640            cached_plan
641        } else {
642            // Semantic validation
643            let mut binder = Binder::new();
644            let _binding_context = binder.bind(&logical_plan)?;
645
646            // Optimize the plan
647            let optimizer = Optimizer::from_graph_store(&*self.graph_store);
648            let plan = optimizer.optimize(logical_plan)?;
649
650            // Cache the optimized plan
651            self.query_cache.put_optimized(cache_key, plan.clone());
652
653            plan
654        };
655
656        // Get transaction context for MVCC visibility
657        let (viewing_epoch, tx_id) = self.get_transaction_context();
658
659        // Convert to physical plan with transaction context
660        let planner = Planner::with_context(
661            Arc::clone(&self.graph_store),
662            Arc::clone(&self.tx_manager),
663            tx_id,
664            viewing_epoch,
665        )
666        .with_factorized_execution(self.factorized_execution);
667        let mut physical_plan = planner.plan(&optimized_plan)?;
668
669        // Execute the plan
670        let executor = Executor::with_columns(physical_plan.columns.clone())
671            .with_deadline(self.query_deadline());
672        let result = executor.execute(physical_plan.operator.as_mut())?;
673        Ok(result)
674    }
675
676    /// Executes a SQL/PGQ query with parameters.
677    ///
678    /// # Errors
679    ///
680    /// Returns an error if the query fails to parse or execute.
681    #[cfg(feature = "sql-pgq")]
682    pub fn execute_sql_with_params(
683        &self,
684        query: &str,
685        params: std::collections::HashMap<String, Value>,
686    ) -> Result<QueryResult> {
687        use crate::query::processor::{QueryLanguage, QueryProcessor};
688
689        // Get transaction context for MVCC visibility
690        let (viewing_epoch, tx_id) = self.get_transaction_context();
691
692        // Create processor with transaction context
693        let processor = QueryProcessor::for_graph_store_with_tx(
694            Arc::clone(&self.graph_store),
695            Arc::clone(&self.tx_manager),
696        );
697
698        // Apply transaction context if in a transaction
699        let processor = if let Some(tx_id) = tx_id {
700            processor.with_tx_context(viewing_epoch, tx_id)
701        } else {
702            processor
703        };
704
705        processor.process(query, QueryLanguage::SqlPgq, Some(&params))
706    }
707
708    /// Executes a SPARQL query.
709    ///
710    /// # Errors
711    ///
712    /// Returns an error if the query fails to parse or execute.
713    #[cfg(all(feature = "sparql", feature = "rdf"))]
714    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
715        use crate::query::{
716            Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
717        };
718
719        // Parse and translate the SPARQL query to a logical plan
720        let logical_plan = sparql_translator::translate(query)?;
721
722        // Optimize the plan
723        let optimizer = Optimizer::from_graph_store(&*self.graph_store);
724        let optimized_plan = optimizer.optimize(logical_plan)?;
725
726        // Convert to physical plan using RDF planner
727        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store)).with_tx_id(self.current_tx);
728        let mut physical_plan = planner.plan(&optimized_plan)?;
729
730        // Execute the plan
731        let executor = Executor::with_columns(physical_plan.columns.clone())
732            .with_deadline(self.query_deadline());
733        executor.execute(physical_plan.operator.as_mut())
734    }
735
    /// Executes a SPARQL query with parameters.
    ///
    /// Parameter substitution is not implemented yet: `_params` is ignored and
    /// the query text is passed unchanged to
    /// [`execute_sparql`](Self::execute_sparql).
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails to parse or execute.
    #[cfg(all(feature = "sparql", feature = "rdf"))]
    pub fn execute_sparql_with_params(
        &self,
        query: &str,
        _params: std::collections::HashMap<String, Value>,
    ) -> Result<QueryResult> {
        // TODO: Implement parameter substitution for SPARQL
        // For now, just execute the query without parameters
        self.execute_sparql(query)
    }
751
752    /// Executes a query in the specified language by name.
753    ///
754    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
755    /// `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
756    ///
757    /// # Errors
758    ///
759    /// Returns an error if the language is unknown/disabled or the query fails.
760    pub fn execute_language(
761        &self,
762        query: &str,
763        language: &str,
764        params: Option<std::collections::HashMap<String, Value>>,
765    ) -> Result<QueryResult> {
766        match language {
767            "gql" => {
768                if let Some(p) = params {
769                    self.execute_with_params(query, p)
770                } else {
771                    self.execute(query)
772                }
773            }
774            #[cfg(feature = "cypher")]
775            "cypher" => {
776                if let Some(p) = params {
777                    use crate::query::processor::{QueryLanguage, QueryProcessor};
778                    let processor = QueryProcessor::for_graph_store_with_tx(
779                        Arc::clone(&self.graph_store),
780                        Arc::clone(&self.tx_manager),
781                    );
782                    let (viewing_epoch, tx_id) = self.get_transaction_context();
783                    let processor = if let Some(tx_id) = tx_id {
784                        processor.with_tx_context(viewing_epoch, tx_id)
785                    } else {
786                        processor
787                    };
788                    processor.process(query, QueryLanguage::Cypher, Some(&p))
789                } else {
790                    self.execute_cypher(query)
791                }
792            }
793            #[cfg(feature = "gremlin")]
794            "gremlin" => {
795                if let Some(p) = params {
796                    self.execute_gremlin_with_params(query, p)
797                } else {
798                    self.execute_gremlin(query)
799                }
800            }
801            #[cfg(feature = "graphql")]
802            "graphql" => {
803                if let Some(p) = params {
804                    self.execute_graphql_with_params(query, p)
805                } else {
806                    self.execute_graphql(query)
807                }
808            }
809            #[cfg(feature = "sql-pgq")]
810            "sql" | "sql-pgq" => {
811                if let Some(p) = params {
812                    self.execute_sql_with_params(query, p)
813                } else {
814                    self.execute_sql(query)
815                }
816            }
817            #[cfg(all(feature = "sparql", feature = "rdf"))]
818            "sparql" => {
819                if let Some(p) = params {
820                    self.execute_sparql_with_params(query, p)
821                } else {
822                    self.execute_sparql(query)
823                }
824            }
825            other => Err(grafeo_common::utils::error::Error::Query(
826                grafeo_common::utils::error::QueryError::new(
827                    grafeo_common::utils::error::QueryErrorKind::Semantic,
828                    format!("Unknown query language: '{other}'"),
829                ),
830            )),
831        }
832    }
833
834    /// Begins a new transaction.
835    ///
836    /// # Errors
837    ///
838    /// Returns an error if a transaction is already active.
839    ///
840    /// # Examples
841    ///
842    /// ```no_run
843    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
844    /// use grafeo_engine::GrafeoDB;
845    ///
846    /// let db = GrafeoDB::new_in_memory();
847    /// let mut session = db.session();
848    ///
849    /// session.begin_tx()?;
850    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
851    /// session.execute("INSERT (:Person {name: 'Bob'})")?;
852    /// session.commit()?; // Both inserts committed atomically
853    /// # Ok(())
854    /// # }
855    /// ```
856    pub fn begin_tx(&mut self) -> Result<()> {
857        if self.current_tx.is_some() {
858            return Err(grafeo_common::utils::error::Error::Transaction(
859                grafeo_common::utils::error::TransactionError::InvalidState(
860                    "Transaction already active".to_string(),
861                ),
862            ));
863        }
864
865        let tx_id = self.tx_manager.begin();
866        self.current_tx = Some(tx_id);
867        Ok(())
868    }
869
870    /// Begins a transaction with a specific isolation level.
871    ///
872    /// See [`begin_tx`](Self::begin_tx) for the default (`SnapshotIsolation`).
873    ///
874    /// # Errors
875    ///
876    /// Returns an error if a transaction is already active.
877    pub fn begin_tx_with_isolation(
878        &mut self,
879        isolation_level: crate::transaction::IsolationLevel,
880    ) -> Result<()> {
881        if self.current_tx.is_some() {
882            return Err(grafeo_common::utils::error::Error::Transaction(
883                grafeo_common::utils::error::TransactionError::InvalidState(
884                    "Transaction already active".to_string(),
885                ),
886            ));
887        }
888
889        let tx_id = self.tx_manager.begin_with_isolation(isolation_level);
890        self.current_tx = Some(tx_id);
891        Ok(())
892    }
893
894    /// Commits the current transaction.
895    ///
896    /// Makes all changes since [`begin_tx`](Self::begin_tx) permanent.
897    ///
898    /// # Errors
899    ///
900    /// Returns an error if no transaction is active.
901    pub fn commit(&mut self) -> Result<()> {
902        let tx_id = self.current_tx.take().ok_or_else(|| {
903            grafeo_common::utils::error::Error::Transaction(
904                grafeo_common::utils::error::TransactionError::InvalidState(
905                    "No active transaction".to_string(),
906                ),
907            )
908        })?;
909
910        // Commit RDF store pending operations
911        #[cfg(feature = "rdf")]
912        self.rdf_store.commit_tx(tx_id);
913
914        self.tx_manager.commit(tx_id)?;
915
916        // Sync the LpgStore epoch with the TxManager so that
917        // convenience lookups (edge_type, get_edge, get_node) that use
918        // store.current_epoch() can see versions created at the latest epoch.
919        self.store.sync_epoch(self.tx_manager.current_epoch());
920
921        // Auto-GC: periodically prune old MVCC versions
922        if self.gc_interval > 0 {
923            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
924            if count.is_multiple_of(self.gc_interval) {
925                let min_epoch = self.tx_manager.min_active_epoch();
926                self.store.gc_versions(min_epoch);
927                self.tx_manager.gc();
928            }
929        }
930
931        Ok(())
932    }
933
934    /// Aborts the current transaction.
935    ///
936    /// Discards all changes since [`begin_tx`](Self::begin_tx).
937    ///
938    /// # Errors
939    ///
940    /// Returns an error if no transaction is active.
941    ///
942    /// # Examples
943    ///
944    /// ```no_run
945    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
946    /// use grafeo_engine::GrafeoDB;
947    ///
948    /// let db = GrafeoDB::new_in_memory();
949    /// let mut session = db.session();
950    ///
951    /// session.begin_tx()?;
952    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
953    /// session.rollback()?; // Insert is discarded
954    /// # Ok(())
955    /// # }
956    /// ```
957    pub fn rollback(&mut self) -> Result<()> {
958        let tx_id = self.current_tx.take().ok_or_else(|| {
959            grafeo_common::utils::error::Error::Transaction(
960                grafeo_common::utils::error::TransactionError::InvalidState(
961                    "No active transaction".to_string(),
962                ),
963            )
964        })?;
965
966        // Discard uncommitted versions in the LPG store
967        self.store.discard_uncommitted_versions(tx_id);
968
969        // Discard pending operations in the RDF store
970        #[cfg(feature = "rdf")]
971        self.rdf_store.rollback_tx(tx_id);
972
973        // Mark transaction as aborted in the manager
974        self.tx_manager.abort(tx_id)
975    }
976
977    /// Returns whether a transaction is active.
978    #[must_use]
979    pub fn in_transaction(&self) -> bool {
980        self.current_tx.is_some()
981    }
982
983    /// Sets auto-commit mode.
984    pub fn set_auto_commit(&mut self, auto_commit: bool) {
985        self.auto_commit = auto_commit;
986    }
987
988    /// Returns whether auto-commit is enabled.
989    #[must_use]
990    pub fn auto_commit(&self) -> bool {
991        self.auto_commit
992    }
993
994    /// Computes the wall-clock deadline for query execution.
995    #[must_use]
996    fn query_deadline(&self) -> Option<Instant> {
997        self.query_timeout.map(|d| Instant::now() + d)
998    }
999
1000    /// Returns the current transaction context for MVCC visibility.
1001    ///
1002    /// Returns `(viewing_epoch, tx_id)` where:
1003    /// - `viewing_epoch` is the epoch at which to check version visibility
1004    /// - `tx_id` is the current transaction ID (if in a transaction)
1005    #[must_use]
1006    fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
1007        if let Some(tx_id) = self.current_tx {
1008            // In a transaction - use the transaction's start epoch
1009            let epoch = self
1010                .tx_manager
1011                .start_epoch(tx_id)
1012                .unwrap_or_else(|| self.tx_manager.current_epoch());
1013            (epoch, Some(tx_id))
1014        } else {
1015            // No transaction - use current epoch
1016            (self.tx_manager.current_epoch(), None)
1017        }
1018    }
1019
1020    /// Creates a node directly (bypassing query execution).
1021    ///
1022    /// This is a low-level API for testing and direct manipulation.
1023    /// If a transaction is active, the node will be versioned with the transaction ID.
1024    pub fn create_node(&self, labels: &[&str]) -> NodeId {
1025        let (epoch, tx_id) = self.get_transaction_context();
1026        self.store
1027            .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1028    }
1029
1030    /// Creates a node with properties.
1031    ///
1032    /// If a transaction is active, the node will be versioned with the transaction ID.
1033    pub fn create_node_with_props<'a>(
1034        &self,
1035        labels: &[&str],
1036        properties: impl IntoIterator<Item = (&'a str, Value)>,
1037    ) -> NodeId {
1038        let (epoch, tx_id) = self.get_transaction_context();
1039        self.store.create_node_with_props_versioned(
1040            labels,
1041            properties.into_iter().map(|(k, v)| (k, v)),
1042            epoch,
1043            tx_id.unwrap_or(TxId::SYSTEM),
1044        )
1045    }
1046
1047    /// Creates an edge between two nodes.
1048    ///
1049    /// This is a low-level API for testing and direct manipulation.
1050    /// If a transaction is active, the edge will be versioned with the transaction ID.
1051    pub fn create_edge(
1052        &self,
1053        src: NodeId,
1054        dst: NodeId,
1055        edge_type: &str,
1056    ) -> grafeo_common::types::EdgeId {
1057        let (epoch, tx_id) = self.get_transaction_context();
1058        self.store
1059            .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1060    }
1061
1062    // =========================================================================
1063    // Direct Lookup APIs (bypass query planning for O(1) point reads)
1064    // =========================================================================
1065
1066    /// Gets a node by ID directly, bypassing query planning.
1067    ///
1068    /// This is the fastest way to retrieve a single node when you know its ID.
1069    /// Skips parsing, binding, optimization, and physical planning entirely.
1070    ///
1071    /// # Performance
1072    ///
1073    /// - Time complexity: O(1) average case
1074    /// - No lock contention (uses DashMap internally)
1075    /// - ~20-30x faster than equivalent MATCH query
1076    ///
1077    /// # Example
1078    ///
1079    /// ```no_run
1080    /// # use grafeo_engine::GrafeoDB;
1081    /// # let db = GrafeoDB::new_in_memory();
1082    /// let session = db.session();
1083    /// let node_id = session.create_node(&["Person"]);
1084    ///
1085    /// // Direct lookup - O(1), no query planning
1086    /// let node = session.get_node(node_id);
1087    /// assert!(node.is_some());
1088    /// ```
1089    #[must_use]
1090    pub fn get_node(&self, id: NodeId) -> Option<Node> {
1091        let (epoch, tx_id) = self.get_transaction_context();
1092        self.store
1093            .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1094    }
1095
1096    /// Gets a single property from a node by ID, bypassing query planning.
1097    ///
1098    /// More efficient than `get_node()` when you only need one property,
1099    /// as it avoids loading the full node with all properties.
1100    ///
1101    /// # Performance
1102    ///
1103    /// - Time complexity: O(1) average case
1104    /// - No query planning overhead
1105    ///
1106    /// # Example
1107    ///
1108    /// ```no_run
1109    /// # use grafeo_engine::GrafeoDB;
1110    /// # use grafeo_common::types::Value;
1111    /// # let db = GrafeoDB::new_in_memory();
1112    /// let session = db.session();
1113    /// let id = session.create_node_with_props(&["Person"], [("name", "Alice".into())]);
1114    ///
1115    /// // Direct property access - O(1)
1116    /// let name = session.get_node_property(id, "name");
1117    /// assert_eq!(name, Some(Value::String("Alice".into())));
1118    /// ```
1119    #[must_use]
1120    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
1121        self.get_node(id)
1122            .and_then(|node| node.get_property(key).cloned())
1123    }
1124
1125    /// Gets an edge by ID directly, bypassing query planning.
1126    ///
1127    /// # Performance
1128    ///
1129    /// - Time complexity: O(1) average case
1130    /// - No lock contention
1131    #[must_use]
1132    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1133        let (epoch, tx_id) = self.get_transaction_context();
1134        self.store
1135            .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1136    }
1137
1138    /// Gets outgoing neighbors of a node directly, bypassing query planning.
1139    ///
1140    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
1141    ///
1142    /// # Performance
1143    ///
1144    /// - Time complexity: O(degree) where degree is the number of outgoing edges
1145    /// - Uses adjacency index for direct access
1146    /// - ~10-20x faster than equivalent MATCH query
1147    ///
1148    /// # Example
1149    ///
1150    /// ```no_run
1151    /// # use grafeo_engine::GrafeoDB;
1152    /// # let db = GrafeoDB::new_in_memory();
1153    /// let session = db.session();
1154    /// let alice = session.create_node(&["Person"]);
1155    /// let bob = session.create_node(&["Person"]);
1156    /// session.create_edge(alice, bob, "KNOWS");
1157    ///
1158    /// // Direct neighbor lookup - O(degree)
1159    /// let neighbors = session.get_neighbors_outgoing(alice);
1160    /// assert_eq!(neighbors.len(), 1);
1161    /// assert_eq!(neighbors[0].0, bob);
1162    /// ```
1163    #[must_use]
1164    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1165        self.store.edges_from(node, Direction::Outgoing).collect()
1166    }
1167
1168    /// Gets incoming neighbors of a node directly, bypassing query planning.
1169    ///
1170    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
1171    ///
1172    /// # Performance
1173    ///
1174    /// - Time complexity: O(degree) where degree is the number of incoming edges
1175    /// - Uses backward adjacency index for direct access
1176    #[must_use]
1177    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1178        self.store.edges_from(node, Direction::Incoming).collect()
1179    }
1180
1181    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
1182    ///
1183    /// # Example
1184    ///
1185    /// ```no_run
1186    /// # use grafeo_engine::GrafeoDB;
1187    /// # let db = GrafeoDB::new_in_memory();
1188    /// # let session = db.session();
1189    /// # let alice = session.create_node(&["Person"]);
1190    /// let neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1191    /// ```
1192    #[must_use]
1193    pub fn get_neighbors_outgoing_by_type(
1194        &self,
1195        node: NodeId,
1196        edge_type: &str,
1197    ) -> Vec<(NodeId, EdgeId)> {
1198        self.store
1199            .edges_from(node, Direction::Outgoing)
1200            .filter(|(_, edge_id)| {
1201                self.get_edge(*edge_id)
1202                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
1203            })
1204            .collect()
1205    }
1206
1207    /// Checks if a node exists, bypassing query planning.
1208    ///
1209    /// # Performance
1210    ///
1211    /// - Time complexity: O(1)
1212    /// - Fastest existence check available
1213    #[must_use]
1214    pub fn node_exists(&self, id: NodeId) -> bool {
1215        self.get_node(id).is_some()
1216    }
1217
1218    /// Checks if an edge exists, bypassing query planning.
1219    #[must_use]
1220    pub fn edge_exists(&self, id: EdgeId) -> bool {
1221        self.get_edge(id).is_some()
1222    }
1223
1224    /// Gets the degree (number of edges) of a node.
1225    ///
1226    /// Returns (outgoing_degree, incoming_degree).
1227    #[must_use]
1228    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
1229        let out = self.store.out_degree(node);
1230        let in_degree = self.store.in_degree(node);
1231        (out, in_degree)
1232    }
1233
1234    /// Batch lookup of multiple nodes by ID.
1235    ///
1236    /// More efficient than calling `get_node()` in a loop because it
1237    /// amortizes overhead.
1238    ///
1239    /// # Performance
1240    ///
1241    /// - Time complexity: O(n) where n is the number of IDs
1242    /// - Better cache utilization than individual lookups
1243    #[must_use]
1244    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
1245        let (epoch, tx_id) = self.get_transaction_context();
1246        let tx = tx_id.unwrap_or(TxId::SYSTEM);
1247        ids.iter()
1248            .map(|&id| self.store.get_node_versioned(id, epoch, tx))
1249            .collect()
1250    }
1251
1252    // ── Change Data Capture ─────────────────────────────────────────────
1253
1254    /// Returns the full change history for an entity (node or edge).
1255    #[cfg(feature = "cdc")]
1256    pub fn history(
1257        &self,
1258        entity_id: impl Into<crate::cdc::EntityId>,
1259    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1260        Ok(self.cdc_log.history(entity_id.into()))
1261    }
1262
1263    /// Returns change events for an entity since the given epoch.
1264    #[cfg(feature = "cdc")]
1265    pub fn history_since(
1266        &self,
1267        entity_id: impl Into<crate::cdc::EntityId>,
1268        since_epoch: EpochId,
1269    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1270        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
1271    }
1272
1273    /// Returns all change events across all entities in an epoch range.
1274    #[cfg(feature = "cdc")]
1275    pub fn changes_between(
1276        &self,
1277        start_epoch: EpochId,
1278        end_epoch: EpochId,
1279    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1280        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
1281    }
1282}
1283
1284#[cfg(test)]
1285mod tests {
1286    use crate::database::GrafeoDB;
1287
1288    #[test]
1289    fn test_session_create_node() {
1290        let db = GrafeoDB::new_in_memory();
1291        let session = db.session();
1292
1293        let id = session.create_node(&["Person"]);
1294        assert!(id.is_valid());
1295        assert_eq!(db.node_count(), 1);
1296    }
1297
1298    #[test]
1299    fn test_session_transaction() {
1300        let db = GrafeoDB::new_in_memory();
1301        let mut session = db.session();
1302
1303        assert!(!session.in_transaction());
1304
1305        session.begin_tx().unwrap();
1306        assert!(session.in_transaction());
1307
1308        session.commit().unwrap();
1309        assert!(!session.in_transaction());
1310    }
1311
1312    #[test]
1313    fn test_session_transaction_context() {
1314        let db = GrafeoDB::new_in_memory();
1315        let mut session = db.session();
1316
1317        // Without transaction - context should have current epoch and no tx_id
1318        let (_epoch1, tx_id1) = session.get_transaction_context();
1319        assert!(tx_id1.is_none());
1320
1321        // Start a transaction
1322        session.begin_tx().unwrap();
1323        let (epoch2, tx_id2) = session.get_transaction_context();
1324        assert!(tx_id2.is_some());
1325        // Transaction should have a valid epoch
1326        let _ = epoch2; // Use the variable
1327
1328        // Commit and verify
1329        session.commit().unwrap();
1330        let (epoch3, tx_id3) = session.get_transaction_context();
1331        assert!(tx_id3.is_none());
1332        // Epoch should have advanced after commit
1333        assert!(epoch3.as_u64() >= epoch2.as_u64());
1334    }
1335
1336    #[test]
1337    fn test_session_rollback() {
1338        let db = GrafeoDB::new_in_memory();
1339        let mut session = db.session();
1340
1341        session.begin_tx().unwrap();
1342        session.rollback().unwrap();
1343        assert!(!session.in_transaction());
1344    }
1345
1346    #[test]
1347    fn test_session_rollback_discards_versions() {
1348        use grafeo_common::types::TxId;
1349
1350        let db = GrafeoDB::new_in_memory();
1351
1352        // Create a node outside of any transaction (at system level)
1353        let node_before = db.store().create_node(&["Person"]);
1354        assert!(node_before.is_valid());
1355        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1356
1357        // Start a transaction
1358        let mut session = db.session();
1359        session.begin_tx().unwrap();
1360        let tx_id = session.current_tx.unwrap();
1361
1362        // Create a node versioned with the transaction's ID
1363        let epoch = db.store().current_epoch();
1364        let node_in_tx = db.store().create_node_versioned(&["Person"], epoch, tx_id);
1365        assert!(node_in_tx.is_valid());
1366
1367        // Should see 2 nodes at this point
1368        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1369
1370        // Rollback the transaction
1371        session.rollback().unwrap();
1372        assert!(!session.in_transaction());
1373
1374        // The node created in the transaction should be discarded
1375        // Only the first node should remain visible
1376        let count_after = db.node_count();
1377        assert_eq!(
1378            count_after, 1,
1379            "Rollback should discard uncommitted node, but got {count_after}"
1380        );
1381
1382        // The original node should still be accessible
1383        let current_epoch = db.store().current_epoch();
1384        assert!(
1385            db.store()
1386                .get_node_versioned(node_before, current_epoch, TxId::SYSTEM)
1387                .is_some(),
1388            "Original node should still exist"
1389        );
1390
1391        // The node created in the transaction should not be accessible
1392        assert!(
1393            db.store()
1394                .get_node_versioned(node_in_tx, current_epoch, TxId::SYSTEM)
1395                .is_none(),
1396            "Transaction node should be gone"
1397        );
1398    }
1399
1400    #[test]
1401    fn test_session_create_node_in_transaction() {
1402        // Test that session.create_node() is transaction-aware
1403        let db = GrafeoDB::new_in_memory();
1404
1405        // Create a node outside of any transaction
1406        let node_before = db.create_node(&["Person"]);
1407        assert!(node_before.is_valid());
1408        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1409
1410        // Start a transaction and create a node through the session
1411        let mut session = db.session();
1412        session.begin_tx().unwrap();
1413
1414        // Create a node through session.create_node() - should be versioned with tx
1415        let node_in_tx = session.create_node(&["Person"]);
1416        assert!(node_in_tx.is_valid());
1417
1418        // Should see 2 nodes at this point
1419        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1420
1421        // Rollback the transaction
1422        session.rollback().unwrap();
1423
1424        // The node created via session.create_node() should be discarded
1425        let count_after = db.node_count();
1426        assert_eq!(
1427            count_after, 1,
1428            "Rollback should discard node created via session.create_node(), but got {count_after}"
1429        );
1430    }
1431
1432    #[test]
1433    fn test_session_create_node_with_props_in_transaction() {
1434        use grafeo_common::types::Value;
1435
1436        // Test that session.create_node_with_props() is transaction-aware
1437        let db = GrafeoDB::new_in_memory();
1438
1439        // Create a node outside of any transaction
1440        db.create_node(&["Person"]);
1441        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1442
1443        // Start a transaction and create a node with properties
1444        let mut session = db.session();
1445        session.begin_tx().unwrap();
1446
1447        let node_in_tx =
1448            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1449        assert!(node_in_tx.is_valid());
1450
1451        // Should see 2 nodes
1452        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1453
1454        // Rollback the transaction
1455        session.rollback().unwrap();
1456
1457        // The node should be discarded
1458        let count_after = db.node_count();
1459        assert_eq!(
1460            count_after, 1,
1461            "Rollback should discard node created via session.create_node_with_props()"
1462        );
1463    }
1464
1465    #[cfg(feature = "gql")]
1466    mod gql_tests {
1467        use super::*;
1468
1469        #[test]
1470        fn test_gql_query_execution() {
1471            let db = GrafeoDB::new_in_memory();
1472            let session = db.session();
1473
1474            // Create some test data
1475            session.create_node(&["Person"]);
1476            session.create_node(&["Person"]);
1477            session.create_node(&["Animal"]);
1478
1479            // Execute a GQL query
1480            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1481
1482            // Should return 2 Person nodes
1483            assert_eq!(result.row_count(), 2);
1484            assert_eq!(result.column_count(), 1);
1485            assert_eq!(result.columns[0], "n");
1486        }
1487
1488        #[test]
1489        fn test_gql_empty_result() {
1490            let db = GrafeoDB::new_in_memory();
1491            let session = db.session();
1492
1493            // No data in database
1494            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1495
1496            assert_eq!(result.row_count(), 0);
1497        }
1498
1499        #[test]
1500        fn test_gql_parse_error() {
1501            let db = GrafeoDB::new_in_memory();
1502            let session = db.session();
1503
1504            // Invalid GQL syntax
1505            let result = session.execute("MATCH (n RETURN n");
1506
1507            assert!(result.is_err());
1508        }
1509
1510        #[test]
1511        fn test_gql_relationship_traversal() {
1512            let db = GrafeoDB::new_in_memory();
1513            let session = db.session();
1514
1515            // Create a graph: Alice -> Bob, Alice -> Charlie
1516            let alice = session.create_node(&["Person"]);
1517            let bob = session.create_node(&["Person"]);
1518            let charlie = session.create_node(&["Person"]);
1519
1520            session.create_edge(alice, bob, "KNOWS");
1521            session.create_edge(alice, charlie, "KNOWS");
1522
1523            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
1524            let result = session
1525                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1526                .unwrap();
1527
1528            // Should return 2 rows (Alice->Bob, Alice->Charlie)
1529            assert_eq!(result.row_count(), 2);
1530            assert_eq!(result.column_count(), 2);
1531            assert_eq!(result.columns[0], "a");
1532            assert_eq!(result.columns[1], "b");
1533        }
1534
1535        #[test]
1536        fn test_gql_relationship_with_type_filter() {
1537            let db = GrafeoDB::new_in_memory();
1538            let session = db.session();
1539
1540            // Create a graph: Alice -KNOWS-> Bob, Alice -WORKS_WITH-> Charlie
1541            let alice = session.create_node(&["Person"]);
1542            let bob = session.create_node(&["Person"]);
1543            let charlie = session.create_node(&["Person"]);
1544
1545            session.create_edge(alice, bob, "KNOWS");
1546            session.create_edge(alice, charlie, "WORKS_WITH");
1547
1548            // Query only KNOWS relationships
1549            let result = session
1550                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1551                .unwrap();
1552
1553            // Should return only 1 row (Alice->Bob)
1554            assert_eq!(result.row_count(), 1);
1555        }
1556
1557        #[test]
1558        fn test_gql_semantic_error_undefined_variable() {
1559            let db = GrafeoDB::new_in_memory();
1560            let session = db.session();
1561
1562            // Reference undefined variable 'x' in RETURN
1563            let result = session.execute("MATCH (n:Person) RETURN x");
1564
1565            // Should fail with semantic error
1566            assert!(result.is_err());
1567            let Err(err) = result else {
1568                panic!("Expected error")
1569            };
1570            assert!(
1571                err.to_string().contains("Undefined variable"),
1572                "Expected undefined variable error, got: {}",
1573                err
1574            );
1575        }
1576
1577        #[test]
1578        fn test_gql_where_clause_property_filter() {
1579            use grafeo_common::types::Value;
1580
1581            let db = GrafeoDB::new_in_memory();
1582            let session = db.session();
1583
1584            // Create people with ages
1585            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
1586            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
1587            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
1588
1589            // Query with WHERE clause: age > 30
1590            let result = session
1591                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
1592                .unwrap();
1593
1594            // Should return 2 people (ages 35 and 45)
1595            assert_eq!(result.row_count(), 2);
1596        }
1597
1598        #[test]
1599        fn test_gql_where_clause_equality() {
1600            use grafeo_common::types::Value;
1601
1602            let db = GrafeoDB::new_in_memory();
1603            let session = db.session();
1604
1605            // Create people with names
1606            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1607            session.create_node_with_props(&["Person"], [("name", Value::String("Bob".into()))]);
1608            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1609
1610            // Query with WHERE clause: name = "Alice"
1611            let result = session
1612                .execute("MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n")
1613                .unwrap();
1614
1615            // Should return 2 people named Alice
1616            assert_eq!(result.row_count(), 2);
1617        }
1618
1619        #[test]
1620        fn test_gql_return_property_access() {
1621            use grafeo_common::types::Value;
1622
1623            let db = GrafeoDB::new_in_memory();
1624            let session = db.session();
1625
1626            // Create people with names and ages
1627            session.create_node_with_props(
1628                &["Person"],
1629                [
1630                    ("name", Value::String("Alice".into())),
1631                    ("age", Value::Int64(30)),
1632                ],
1633            );
1634            session.create_node_with_props(
1635                &["Person"],
1636                [
1637                    ("name", Value::String("Bob".into())),
1638                    ("age", Value::Int64(25)),
1639                ],
1640            );
1641
1642            // Query returning properties
1643            let result = session
1644                .execute("MATCH (n:Person) RETURN n.name, n.age")
1645                .unwrap();
1646
1647            // Should return 2 rows with name and age columns
1648            assert_eq!(result.row_count(), 2);
1649            assert_eq!(result.column_count(), 2);
1650            assert_eq!(result.columns[0], "n.name");
1651            assert_eq!(result.columns[1], "n.age");
1652
1653            // Check that we get actual values
1654            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
1655            assert!(names.contains(&&Value::String("Alice".into())));
1656            assert!(names.contains(&&Value::String("Bob".into())));
1657        }
1658
1659        #[test]
1660        fn test_gql_return_mixed_expressions() {
1661            use grafeo_common::types::Value;
1662
1663            let db = GrafeoDB::new_in_memory();
1664            let session = db.session();
1665
1666            // Create a person
1667            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1668
1669            // Query returning both node and property
1670            let result = session
1671                .execute("MATCH (n:Person) RETURN n, n.name")
1672                .unwrap();
1673
1674            assert_eq!(result.row_count(), 1);
1675            assert_eq!(result.column_count(), 2);
1676            assert_eq!(result.columns[0], "n");
1677            assert_eq!(result.columns[1], "n.name");
1678
1679            // Second column should be the name
1680            assert_eq!(result.rows[0][1], Value::String("Alice".into()));
1681        }
1682    }
1683
/// Tests for `Session::execute_cypher`; compiled only when the `cypher` feature is enabled.
#[cfg(feature = "cypher")]
mod cypher_tests {
    use super::*;

    /// A label-filtered `MATCH` returns only the matching nodes, under the variable's name.
    #[test]
    fn test_cypher_query_execution() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Two nodes that match the pattern plus one that must be filtered out.
        for label in ["Person", "Person", "Animal"] {
            session.create_node(&[label]);
        }

        let query = "MATCH (n:Person) RETURN n";
        let result = session.execute_cypher(query).unwrap();

        // Only the two `Person` nodes come back, in a single column named `n`.
        assert_eq!(result.row_count(), 2);
        assert_eq!(result.column_count(), 1);
        assert_eq!(result.columns[0], "n");
    }

    /// Matching against an empty database succeeds and yields zero rows.
    #[test]
    fn test_cypher_empty_result() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Nothing has been inserted, so the pattern cannot match anything.
        let row_count = session
            .execute_cypher("MATCH (n:Person) RETURN n")
            .unwrap()
            .row_count();

        assert_eq!(row_count, 0);
    }

    /// Malformed Cypher is reported as an `Err`.
    #[test]
    fn test_cypher_parse_error() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // The node pattern is missing its closing parenthesis.
        let malformed = "MATCH (n RETURN n";

        assert!(session.execute_cypher(malformed).is_err());
    }
}
1729
1730    // ==================== Direct Lookup API Tests ====================
1731
1732    mod direct_lookup_tests {
1733        use super::*;
1734        use grafeo_common::types::Value;
1735
1736        #[test]
1737        fn test_get_node() {
1738            let db = GrafeoDB::new_in_memory();
1739            let session = db.session();
1740
1741            let id = session.create_node(&["Person"]);
1742            let node = session.get_node(id);
1743
1744            assert!(node.is_some());
1745            let node = node.unwrap();
1746            assert_eq!(node.id, id);
1747        }
1748
1749        #[test]
1750        fn test_get_node_not_found() {
1751            use grafeo_common::types::NodeId;
1752
1753            let db = GrafeoDB::new_in_memory();
1754            let session = db.session();
1755
1756            // Try to get a non-existent node
1757            let node = session.get_node(NodeId::new(9999));
1758            assert!(node.is_none());
1759        }
1760
1761        #[test]
1762        fn test_get_node_property() {
1763            let db = GrafeoDB::new_in_memory();
1764            let session = db.session();
1765
1766            let id = session
1767                .create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1768
1769            let name = session.get_node_property(id, "name");
1770            assert_eq!(name, Some(Value::String("Alice".into())));
1771
1772            // Non-existent property
1773            let missing = session.get_node_property(id, "missing");
1774            assert!(missing.is_none());
1775        }
1776
1777        #[test]
1778        fn test_get_edge() {
1779            let db = GrafeoDB::new_in_memory();
1780            let session = db.session();
1781
1782            let alice = session.create_node(&["Person"]);
1783            let bob = session.create_node(&["Person"]);
1784            let edge_id = session.create_edge(alice, bob, "KNOWS");
1785
1786            let edge = session.get_edge(edge_id);
1787            assert!(edge.is_some());
1788            let edge = edge.unwrap();
1789            assert_eq!(edge.id, edge_id);
1790            assert_eq!(edge.src, alice);
1791            assert_eq!(edge.dst, bob);
1792        }
1793
1794        #[test]
1795        fn test_get_edge_not_found() {
1796            use grafeo_common::types::EdgeId;
1797
1798            let db = GrafeoDB::new_in_memory();
1799            let session = db.session();
1800
1801            let edge = session.get_edge(EdgeId::new(9999));
1802            assert!(edge.is_none());
1803        }
1804
1805        #[test]
1806        fn test_get_neighbors_outgoing() {
1807            let db = GrafeoDB::new_in_memory();
1808            let session = db.session();
1809
1810            let alice = session.create_node(&["Person"]);
1811            let bob = session.create_node(&["Person"]);
1812            let carol = session.create_node(&["Person"]);
1813
1814            session.create_edge(alice, bob, "KNOWS");
1815            session.create_edge(alice, carol, "KNOWS");
1816
1817            let neighbors = session.get_neighbors_outgoing(alice);
1818            assert_eq!(neighbors.len(), 2);
1819
1820            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1821            assert!(neighbor_ids.contains(&bob));
1822            assert!(neighbor_ids.contains(&carol));
1823        }
1824
1825        #[test]
1826        fn test_get_neighbors_incoming() {
1827            let db = GrafeoDB::new_in_memory();
1828            let session = db.session();
1829
1830            let alice = session.create_node(&["Person"]);
1831            let bob = session.create_node(&["Person"]);
1832            let carol = session.create_node(&["Person"]);
1833
1834            session.create_edge(bob, alice, "KNOWS");
1835            session.create_edge(carol, alice, "KNOWS");
1836
1837            let neighbors = session.get_neighbors_incoming(alice);
1838            assert_eq!(neighbors.len(), 2);
1839
1840            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1841            assert!(neighbor_ids.contains(&bob));
1842            assert!(neighbor_ids.contains(&carol));
1843        }
1844
1845        #[test]
1846        fn test_get_neighbors_outgoing_by_type() {
1847            let db = GrafeoDB::new_in_memory();
1848            let session = db.session();
1849
1850            let alice = session.create_node(&["Person"]);
1851            let bob = session.create_node(&["Person"]);
1852            let company = session.create_node(&["Company"]);
1853
1854            session.create_edge(alice, bob, "KNOWS");
1855            session.create_edge(alice, company, "WORKS_AT");
1856
1857            let knows_neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1858            assert_eq!(knows_neighbors.len(), 1);
1859            assert_eq!(knows_neighbors[0].0, bob);
1860
1861            let works_neighbors = session.get_neighbors_outgoing_by_type(alice, "WORKS_AT");
1862            assert_eq!(works_neighbors.len(), 1);
1863            assert_eq!(works_neighbors[0].0, company);
1864
1865            // No edges of this type
1866            let no_neighbors = session.get_neighbors_outgoing_by_type(alice, "LIKES");
1867            assert!(no_neighbors.is_empty());
1868        }
1869
1870        #[test]
1871        fn test_node_exists() {
1872            use grafeo_common::types::NodeId;
1873
1874            let db = GrafeoDB::new_in_memory();
1875            let session = db.session();
1876
1877            let id = session.create_node(&["Person"]);
1878
1879            assert!(session.node_exists(id));
1880            assert!(!session.node_exists(NodeId::new(9999)));
1881        }
1882
1883        #[test]
1884        fn test_edge_exists() {
1885            use grafeo_common::types::EdgeId;
1886
1887            let db = GrafeoDB::new_in_memory();
1888            let session = db.session();
1889
1890            let alice = session.create_node(&["Person"]);
1891            let bob = session.create_node(&["Person"]);
1892            let edge_id = session.create_edge(alice, bob, "KNOWS");
1893
1894            assert!(session.edge_exists(edge_id));
1895            assert!(!session.edge_exists(EdgeId::new(9999)));
1896        }
1897
1898        #[test]
1899        fn test_get_degree() {
1900            let db = GrafeoDB::new_in_memory();
1901            let session = db.session();
1902
1903            let alice = session.create_node(&["Person"]);
1904            let bob = session.create_node(&["Person"]);
1905            let carol = session.create_node(&["Person"]);
1906
1907            // Alice knows Bob and Carol (2 outgoing)
1908            session.create_edge(alice, bob, "KNOWS");
1909            session.create_edge(alice, carol, "KNOWS");
1910            // Bob knows Alice (1 incoming for Alice)
1911            session.create_edge(bob, alice, "KNOWS");
1912
1913            let (out_degree, in_degree) = session.get_degree(alice);
1914            assert_eq!(out_degree, 2);
1915            assert_eq!(in_degree, 1);
1916
1917            // Node with no edges
1918            let lonely = session.create_node(&["Person"]);
1919            let (out, in_deg) = session.get_degree(lonely);
1920            assert_eq!(out, 0);
1921            assert_eq!(in_deg, 0);
1922        }
1923
1924        #[test]
1925        fn test_get_nodes_batch() {
1926            let db = GrafeoDB::new_in_memory();
1927            let session = db.session();
1928
1929            let alice = session.create_node(&["Person"]);
1930            let bob = session.create_node(&["Person"]);
1931            let carol = session.create_node(&["Person"]);
1932
1933            let nodes = session.get_nodes_batch(&[alice, bob, carol]);
1934            assert_eq!(nodes.len(), 3);
1935            assert!(nodes[0].is_some());
1936            assert!(nodes[1].is_some());
1937            assert!(nodes[2].is_some());
1938
1939            // With non-existent node
1940            use grafeo_common::types::NodeId;
1941            let nodes_with_missing = session.get_nodes_batch(&[alice, NodeId::new(9999), carol]);
1942            assert_eq!(nodes_with_missing.len(), 3);
1943            assert!(nodes_with_missing[0].is_some());
1944            assert!(nodes_with_missing[1].is_none()); // Missing node
1945            assert!(nodes_with_missing[2].is_some());
1946        }
1947
1948        #[test]
1949        fn test_auto_commit_setting() {
1950            let db = GrafeoDB::new_in_memory();
1951            let mut session = db.session();
1952
1953            // Default is auto-commit enabled
1954            assert!(session.auto_commit());
1955
1956            session.set_auto_commit(false);
1957            assert!(!session.auto_commit());
1958
1959            session.set_auto_commit(true);
1960            assert!(session.auto_commit());
1961        }
1962
1963        #[test]
1964        fn test_transaction_double_begin_error() {
1965            let db = GrafeoDB::new_in_memory();
1966            let mut session = db.session();
1967
1968            session.begin_tx().unwrap();
1969            let result = session.begin_tx();
1970
1971            assert!(result.is_err());
1972            // Clean up
1973            session.rollback().unwrap();
1974        }
1975
1976        #[test]
1977        fn test_commit_without_transaction_error() {
1978            let db = GrafeoDB::new_in_memory();
1979            let mut session = db.session();
1980
1981            let result = session.commit();
1982            assert!(result.is_err());
1983        }
1984
1985        #[test]
1986        fn test_rollback_without_transaction_error() {
1987            let db = GrafeoDB::new_in_memory();
1988            let mut session = db.session();
1989
1990            let result = session.rollback();
1991            assert!(result.is_err());
1992        }
1993
1994        #[test]
1995        fn test_create_edge_in_transaction() {
1996            let db = GrafeoDB::new_in_memory();
1997            let mut session = db.session();
1998
1999            // Create nodes outside transaction
2000            let alice = session.create_node(&["Person"]);
2001            let bob = session.create_node(&["Person"]);
2002
2003            // Create edge in transaction
2004            session.begin_tx().unwrap();
2005            let edge_id = session.create_edge(alice, bob, "KNOWS");
2006
2007            // Edge should be visible in the transaction
2008            assert!(session.edge_exists(edge_id));
2009
2010            // Commit
2011            session.commit().unwrap();
2012
2013            // Edge should still be visible
2014            assert!(session.edge_exists(edge_id));
2015        }
2016
2017        #[test]
2018        fn test_neighbors_empty_node() {
2019            let db = GrafeoDB::new_in_memory();
2020            let session = db.session();
2021
2022            let lonely = session.create_node(&["Person"]);
2023
2024            assert!(session.get_neighbors_outgoing(lonely).is_empty());
2025            assert!(session.get_neighbors_incoming(lonely).is_empty());
2026            assert!(
2027                session
2028                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
2029                    .is_empty()
2030            );
2031        }
2032    }
2033}