Skip to main content

grafeo_engine/
session.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
15#[cfg(feature = "rdf")]
16use grafeo_core::graph::rdf::RdfStore;
17
18use crate::config::{AdaptiveConfig, GraphModel};
19use crate::database::QueryResult;
20use crate::query::cache::QueryCache;
21use crate::transaction::TransactionManager;
22
23/// Your handle to the database - execute queries and manage transactions.
24///
25/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
26/// tracks its own transaction state, so you can have multiple concurrent
27/// sessions without them interfering.
28pub struct Session {
29    /// The underlying store.
30    store: Arc<LpgStore>,
31    /// RDF triple store (if RDF feature is enabled).
32    #[cfg(feature = "rdf")]
33    rdf_store: Arc<RdfStore>,
34    /// Transaction manager.
35    tx_manager: Arc<TransactionManager>,
36    /// Query cache shared across sessions.
37    query_cache: Arc<QueryCache>,
38    /// Current transaction ID (if any).
39    current_tx: Option<TxId>,
40    /// Whether the session is in auto-commit mode.
41    auto_commit: bool,
42    /// Adaptive execution configuration.
43    #[allow(dead_code)]
44    adaptive_config: AdaptiveConfig,
45    /// Whether to use factorized execution for multi-hop queries.
46    factorized_execution: bool,
47    /// The graph data model this session operates on.
48    graph_model: GraphModel,
49    /// Maximum time a query may run before being cancelled.
50    query_timeout: Option<Duration>,
51    /// Shared commit counter for triggering auto-GC.
52    commit_counter: Arc<AtomicUsize>,
53    /// GC every N commits (0 = disabled).
54    gc_interval: usize,
55    /// CDC log for change tracking.
56    #[cfg(feature = "cdc")]
57    cdc_log: Arc<crate::cdc::CdcLog>,
58}
59
60impl Session {
61    /// Creates a new session with adaptive execution configuration.
62    #[allow(dead_code, clippy::too_many_arguments)]
63    pub(crate) fn with_adaptive(
64        store: Arc<LpgStore>,
65        tx_manager: Arc<TransactionManager>,
66        query_cache: Arc<QueryCache>,
67        adaptive_config: AdaptiveConfig,
68        factorized_execution: bool,
69        graph_model: GraphModel,
70        query_timeout: Option<Duration>,
71        commit_counter: Arc<AtomicUsize>,
72        gc_interval: usize,
73    ) -> Self {
74        Self {
75            store,
76            #[cfg(feature = "rdf")]
77            rdf_store: Arc::new(RdfStore::new()),
78            tx_manager,
79            query_cache,
80            current_tx: None,
81            auto_commit: true,
82            adaptive_config,
83            factorized_execution,
84            graph_model,
85            query_timeout,
86            commit_counter,
87            gc_interval,
88            #[cfg(feature = "cdc")]
89            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
90        }
91    }
92
93    /// Sets the CDC log for this session (shared with the database).
94    #[cfg(feature = "cdc")]
95    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
96        self.cdc_log = cdc_log;
97    }
98
99    /// Creates a new session with RDF store and adaptive configuration.
100    #[cfg(feature = "rdf")]
101    #[allow(clippy::too_many_arguments)]
102    pub(crate) fn with_rdf_store_and_adaptive(
103        store: Arc<LpgStore>,
104        rdf_store: Arc<RdfStore>,
105        tx_manager: Arc<TransactionManager>,
106        query_cache: Arc<QueryCache>,
107        adaptive_config: AdaptiveConfig,
108        factorized_execution: bool,
109        graph_model: GraphModel,
110        query_timeout: Option<Duration>,
111        commit_counter: Arc<AtomicUsize>,
112        gc_interval: usize,
113    ) -> Self {
114        Self {
115            store,
116            rdf_store,
117            tx_manager,
118            query_cache,
119            current_tx: None,
120            auto_commit: true,
121            adaptive_config,
122            factorized_execution,
123            graph_model,
124            query_timeout,
125            commit_counter,
126            gc_interval,
127            #[cfg(feature = "cdc")]
128            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
129        }
130    }
131
132    /// Returns the graph model this session operates on.
133    #[must_use]
134    pub fn graph_model(&self) -> GraphModel {
135        self.graph_model
136    }
137
138    /// Checks that the session's graph model supports LPG operations.
139    fn require_lpg(&self, language: &str) -> Result<()> {
140        if self.graph_model == GraphModel::Rdf {
141            return Err(grafeo_common::utils::error::Error::Internal(format!(
142                "This is an RDF database. {language} queries require an LPG database."
143            )));
144        }
145        Ok(())
146    }
147
148    /// Executes a GQL query.
149    ///
150    /// # Errors
151    ///
152    /// Returns an error if the query fails to parse or execute.
153    ///
154    /// # Examples
155    ///
156    /// ```no_run
157    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
158    /// use grafeo_engine::GrafeoDB;
159    ///
160    /// let db = GrafeoDB::new_in_memory();
161    /// let session = db.session();
162    ///
163    /// // Create a node
164    /// session.execute("INSERT (:Person {name: 'Alice', age: 30})")?;
165    ///
166    /// // Query nodes
167    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
168    /// for row in &result.rows {
169    ///     println!("{:?}", row);
170    /// }
171    /// # Ok(())
172    /// # }
173    /// ```
174    #[cfg(feature = "gql")]
175    pub fn execute(&self, query: &str) -> Result<QueryResult> {
176        self.require_lpg("GQL")?;
177
178        use crate::query::{
179            Executor, Planner, binder::Binder, cache::CacheKey, gql_translator,
180            optimizer::Optimizer, processor::QueryLanguage,
181        };
182
183        let start_time = std::time::Instant::now();
184
185        // Create cache key for this query
186        let cache_key = CacheKey::new(query, QueryLanguage::Gql);
187
188        // Try to get cached optimized plan
189        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
190            // Cache hit - skip parsing, translation, binding, and optimization
191            cached_plan
192        } else {
193            // Cache miss - run full pipeline
194
195            // Parse and translate the query to a logical plan
196            let logical_plan = gql_translator::translate(query)?;
197
198            // Semantic validation
199            let mut binder = Binder::new();
200            let _binding_context = binder.bind(&logical_plan)?;
201
202            // Optimize the plan
203            let optimizer = Optimizer::from_store(&self.store);
204            let plan = optimizer.optimize(logical_plan)?;
205
206            // Cache the optimized plan for future use
207            self.query_cache.put_optimized(cache_key, plan.clone());
208
209            plan
210        };
211
212        // Get transaction context for MVCC visibility
213        let (viewing_epoch, tx_id) = self.get_transaction_context();
214
215        // Convert to physical plan with transaction context
216        // (Physical planning cannot be cached as it depends on transaction state)
217        let planner = Planner::with_context(
218            Arc::clone(&self.store),
219            Arc::clone(&self.tx_manager),
220            tx_id,
221            viewing_epoch,
222        )
223        .with_factorized_execution(self.factorized_execution);
224        let mut physical_plan = planner.plan(&optimized_plan)?;
225
226        // Execute the plan
227        let executor = Executor::with_columns(physical_plan.columns.clone())
228            .with_deadline(self.query_deadline());
229        let mut result = executor.execute(physical_plan.operator.as_mut())?;
230
231        // Add execution metrics
232        let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
233        let rows_scanned = result.rows.len() as u64;
234        result.execution_time_ms = Some(elapsed_ms);
235        result.rows_scanned = Some(rows_scanned);
236
237        Ok(result)
238    }
239
240    /// Executes a GQL query with parameters.
241    ///
242    /// # Errors
243    ///
244    /// Returns an error if the query fails to parse or execute.
245    #[cfg(feature = "gql")]
246    pub fn execute_with_params(
247        &self,
248        query: &str,
249        params: std::collections::HashMap<String, Value>,
250    ) -> Result<QueryResult> {
251        self.require_lpg("GQL")?;
252
253        use crate::query::processor::{QueryLanguage, QueryProcessor};
254
255        // Get transaction context for MVCC visibility
256        let (viewing_epoch, tx_id) = self.get_transaction_context();
257
258        // Create processor with transaction context
259        let processor =
260            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
261
262        // Apply transaction context if in a transaction
263        let processor = if let Some(tx_id) = tx_id {
264            processor.with_tx_context(viewing_epoch, tx_id)
265        } else {
266            processor
267        };
268
269        processor.process(query, QueryLanguage::Gql, Some(&params))
270    }
271
272    /// Executes a GQL query with parameters.
273    ///
274    /// # Errors
275    ///
276    /// Returns an error if no query language is enabled.
277    #[cfg(not(any(feature = "gql", feature = "cypher")))]
278    pub fn execute_with_params(
279        &self,
280        _query: &str,
281        _params: std::collections::HashMap<String, Value>,
282    ) -> Result<QueryResult> {
283        Err(grafeo_common::utils::error::Error::Internal(
284            "No query language enabled".to_string(),
285        ))
286    }
287
288    /// Executes a GQL query.
289    ///
290    /// # Errors
291    ///
292    /// Returns an error if no query language is enabled.
293    #[cfg(not(any(feature = "gql", feature = "cypher")))]
294    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
295        Err(grafeo_common::utils::error::Error::Internal(
296            "No query language enabled".to_string(),
297        ))
298    }
299
300    /// Executes a Cypher query.
301    ///
302    /// # Errors
303    ///
304    /// Returns an error if the query fails to parse or execute.
305    #[cfg(feature = "cypher")]
306    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
307        use crate::query::{
308            Executor, Planner, binder::Binder, cache::CacheKey, cypher_translator,
309            optimizer::Optimizer, processor::QueryLanguage,
310        };
311
312        // Create cache key for this query
313        let cache_key = CacheKey::new(query, QueryLanguage::Cypher);
314
315        // Try to get cached optimized plan
316        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
317            cached_plan
318        } else {
319            // Parse and translate the query to a logical plan
320            let logical_plan = cypher_translator::translate(query)?;
321
322            // Semantic validation
323            let mut binder = Binder::new();
324            let _binding_context = binder.bind(&logical_plan)?;
325
326            // Optimize the plan
327            let optimizer = Optimizer::from_store(&self.store);
328            let plan = optimizer.optimize(logical_plan)?;
329
330            // Cache the optimized plan
331            self.query_cache.put_optimized(cache_key, plan.clone());
332
333            plan
334        };
335
336        // Get transaction context for MVCC visibility
337        let (viewing_epoch, tx_id) = self.get_transaction_context();
338
339        // Convert to physical plan with transaction context
340        let planner = Planner::with_context(
341            Arc::clone(&self.store),
342            Arc::clone(&self.tx_manager),
343            tx_id,
344            viewing_epoch,
345        )
346        .with_factorized_execution(self.factorized_execution);
347        let mut physical_plan = planner.plan(&optimized_plan)?;
348
349        // Execute the plan
350        let executor = Executor::with_columns(physical_plan.columns.clone())
351            .with_deadline(self.query_deadline());
352        let result = executor.execute(physical_plan.operator.as_mut())?;
353        Ok(result)
354    }
355
356    /// Executes a Gremlin query.
357    ///
358    /// # Errors
359    ///
360    /// Returns an error if the query fails to parse or execute.
361    ///
362    /// # Examples
363    ///
364    /// ```no_run
365    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
366    /// use grafeo_engine::GrafeoDB;
367    ///
368    /// let db = GrafeoDB::new_in_memory();
369    /// let session = db.session();
370    ///
371    /// // Create some nodes first
372    /// session.create_node(&["Person"]);
373    ///
374    /// // Query using Gremlin
375    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
376    /// # Ok(())
377    /// # }
378    /// ```
379    #[cfg(feature = "gremlin")]
380    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
381        use crate::query::{
382            Executor, Planner, binder::Binder, gremlin_translator, optimizer::Optimizer,
383        };
384
385        // Parse and translate the query to a logical plan
386        let logical_plan = gremlin_translator::translate(query)?;
387
388        // Semantic validation
389        let mut binder = Binder::new();
390        let _binding_context = binder.bind(&logical_plan)?;
391
392        // Optimize the plan
393        let optimizer = Optimizer::from_store(&self.store);
394        let optimized_plan = optimizer.optimize(logical_plan)?;
395
396        // Get transaction context for MVCC visibility
397        let (viewing_epoch, tx_id) = self.get_transaction_context();
398
399        // Convert to physical plan with transaction context
400        let planner = Planner::with_context(
401            Arc::clone(&self.store),
402            Arc::clone(&self.tx_manager),
403            tx_id,
404            viewing_epoch,
405        )
406        .with_factorized_execution(self.factorized_execution);
407        let mut physical_plan = planner.plan(&optimized_plan)?;
408
409        // Execute the plan
410        let executor = Executor::with_columns(physical_plan.columns.clone())
411            .with_deadline(self.query_deadline());
412        let result = executor.execute(physical_plan.operator.as_mut())?;
413        Ok(result)
414    }
415
416    /// Executes a Gremlin query with parameters.
417    ///
418    /// # Errors
419    ///
420    /// Returns an error if the query fails to parse or execute.
421    #[cfg(feature = "gremlin")]
422    pub fn execute_gremlin_with_params(
423        &self,
424        query: &str,
425        params: std::collections::HashMap<String, Value>,
426    ) -> Result<QueryResult> {
427        use crate::query::processor::{QueryLanguage, QueryProcessor};
428
429        // Get transaction context for MVCC visibility
430        let (viewing_epoch, tx_id) = self.get_transaction_context();
431
432        // Create processor with transaction context
433        let processor =
434            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
435
436        // Apply transaction context if in a transaction
437        let processor = if let Some(tx_id) = tx_id {
438            processor.with_tx_context(viewing_epoch, tx_id)
439        } else {
440            processor
441        };
442
443        processor.process(query, QueryLanguage::Gremlin, Some(&params))
444    }
445
446    /// Executes a GraphQL query against the LPG store.
447    ///
448    /// # Errors
449    ///
450    /// Returns an error if the query fails to parse or execute.
451    ///
452    /// # Examples
453    ///
454    /// ```no_run
455    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
456    /// use grafeo_engine::GrafeoDB;
457    ///
458    /// let db = GrafeoDB::new_in_memory();
459    /// let session = db.session();
460    ///
461    /// // Create some nodes first
462    /// session.create_node(&["User"]);
463    ///
464    /// // Query using GraphQL
465    /// let result = session.execute_graphql("query { user { id name } }")?;
466    /// # Ok(())
467    /// # }
468    /// ```
469    #[cfg(feature = "graphql")]
470    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
471        use crate::query::{
472            Executor, Planner, binder::Binder, graphql_translator, optimizer::Optimizer,
473        };
474
475        // Parse and translate the query to a logical plan
476        let logical_plan = graphql_translator::translate(query)?;
477
478        // Semantic validation
479        let mut binder = Binder::new();
480        let _binding_context = binder.bind(&logical_plan)?;
481
482        // Optimize the plan
483        let optimizer = Optimizer::from_store(&self.store);
484        let optimized_plan = optimizer.optimize(logical_plan)?;
485
486        // Get transaction context for MVCC visibility
487        let (viewing_epoch, tx_id) = self.get_transaction_context();
488
489        // Convert to physical plan with transaction context
490        let planner = Planner::with_context(
491            Arc::clone(&self.store),
492            Arc::clone(&self.tx_manager),
493            tx_id,
494            viewing_epoch,
495        )
496        .with_factorized_execution(self.factorized_execution);
497        let mut physical_plan = planner.plan(&optimized_plan)?;
498
499        // Execute the plan
500        let executor = Executor::with_columns(physical_plan.columns.clone())
501            .with_deadline(self.query_deadline());
502        let result = executor.execute(physical_plan.operator.as_mut())?;
503        Ok(result)
504    }
505
506    /// Executes a GraphQL query with parameters.
507    ///
508    /// # Errors
509    ///
510    /// Returns an error if the query fails to parse or execute.
511    #[cfg(feature = "graphql")]
512    pub fn execute_graphql_with_params(
513        &self,
514        query: &str,
515        params: std::collections::HashMap<String, Value>,
516    ) -> Result<QueryResult> {
517        use crate::query::processor::{QueryLanguage, QueryProcessor};
518
519        // Get transaction context for MVCC visibility
520        let (viewing_epoch, tx_id) = self.get_transaction_context();
521
522        // Create processor with transaction context
523        let processor =
524            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
525
526        // Apply transaction context if in a transaction
527        let processor = if let Some(tx_id) = tx_id {
528            processor.with_tx_context(viewing_epoch, tx_id)
529        } else {
530            processor
531        };
532
533        processor.process(query, QueryLanguage::GraphQL, Some(&params))
534    }
535
536    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
537    ///
538    /// # Errors
539    ///
540    /// Returns an error if the query fails to parse or execute.
541    ///
542    /// # Examples
543    ///
544    /// ```no_run
545    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
546    /// use grafeo_engine::GrafeoDB;
547    ///
548    /// let db = GrafeoDB::new_in_memory();
549    /// let session = db.session();
550    ///
551    /// let result = session.execute_sql(
552    ///     "SELECT * FROM GRAPH_TABLE (
553    ///         MATCH (n:Person)
554    ///         COLUMNS (n.name AS name)
555    ///     )"
556    /// )?;
557    /// # Ok(())
558    /// # }
559    /// ```
560    #[cfg(feature = "sql-pgq")]
561    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
562        use crate::query::{
563            Executor, Planner, binder::Binder, cache::CacheKey, optimizer::Optimizer,
564            plan::LogicalOperator, processor::QueryLanguage, sql_pgq_translator,
565        };
566
567        // Parse and translate (always needed to check for DDL)
568        let logical_plan = sql_pgq_translator::translate(query)?;
569
570        // Handle DDL statements directly (they don't go through the query pipeline)
571        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
572            return Ok(QueryResult {
573                columns: vec!["status".into()],
574                column_types: vec![grafeo_common::types::LogicalType::String],
575                rows: vec![vec![Value::from(format!(
576                    "Property graph '{}' created ({} node tables, {} edge tables)",
577                    cpg.name,
578                    cpg.node_tables.len(),
579                    cpg.edge_tables.len()
580                ))]],
581                execution_time_ms: None,
582                rows_scanned: None,
583            });
584        }
585
586        // Create cache key for query plans
587        let cache_key = CacheKey::new(query, QueryLanguage::SqlPgq);
588
589        // Try to get cached optimized plan
590        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
591            cached_plan
592        } else {
593            // Semantic validation
594            let mut binder = Binder::new();
595            let _binding_context = binder.bind(&logical_plan)?;
596
597            // Optimize the plan
598            let optimizer = Optimizer::from_store(&self.store);
599            let plan = optimizer.optimize(logical_plan)?;
600
601            // Cache the optimized plan
602            self.query_cache.put_optimized(cache_key, plan.clone());
603
604            plan
605        };
606
607        // Get transaction context for MVCC visibility
608        let (viewing_epoch, tx_id) = self.get_transaction_context();
609
610        // Convert to physical plan with transaction context
611        let planner = Planner::with_context(
612            Arc::clone(&self.store),
613            Arc::clone(&self.tx_manager),
614            tx_id,
615            viewing_epoch,
616        )
617        .with_factorized_execution(self.factorized_execution);
618        let mut physical_plan = planner.plan(&optimized_plan)?;
619
620        // Execute the plan
621        let executor = Executor::with_columns(physical_plan.columns.clone())
622            .with_deadline(self.query_deadline());
623        let result = executor.execute(physical_plan.operator.as_mut())?;
624        Ok(result)
625    }
626
627    /// Executes a SQL/PGQ query with parameters.
628    ///
629    /// # Errors
630    ///
631    /// Returns an error if the query fails to parse or execute.
632    #[cfg(feature = "sql-pgq")]
633    pub fn execute_sql_with_params(
634        &self,
635        query: &str,
636        params: std::collections::HashMap<String, Value>,
637    ) -> Result<QueryResult> {
638        use crate::query::processor::{QueryLanguage, QueryProcessor};
639
640        // Get transaction context for MVCC visibility
641        let (viewing_epoch, tx_id) = self.get_transaction_context();
642
643        // Create processor with transaction context
644        let processor =
645            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
646
647        // Apply transaction context if in a transaction
648        let processor = if let Some(tx_id) = tx_id {
649            processor.with_tx_context(viewing_epoch, tx_id)
650        } else {
651            processor
652        };
653
654        processor.process(query, QueryLanguage::SqlPgq, Some(&params))
655    }
656
657    /// Executes a SPARQL query.
658    ///
659    /// # Errors
660    ///
661    /// Returns an error if the query fails to parse or execute.
662    #[cfg(all(feature = "sparql", feature = "rdf"))]
663    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
664        use crate::query::{
665            Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
666        };
667
668        // Parse and translate the SPARQL query to a logical plan
669        let logical_plan = sparql_translator::translate(query)?;
670
671        // Optimize the plan
672        let optimizer = Optimizer::from_store(&self.store);
673        let optimized_plan = optimizer.optimize(logical_plan)?;
674
675        // Convert to physical plan using RDF planner
676        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store)).with_tx_id(self.current_tx);
677        let mut physical_plan = planner.plan(&optimized_plan)?;
678
679        // Execute the plan
680        let executor = Executor::with_columns(physical_plan.columns.clone())
681            .with_deadline(self.query_deadline());
682        executor.execute(physical_plan.operator.as_mut())
683    }
684
685    /// Executes a SPARQL query with parameters.
686    ///
687    /// # Errors
688    ///
689    /// Returns an error if the query fails to parse or execute.
690    #[cfg(all(feature = "sparql", feature = "rdf"))]
691    pub fn execute_sparql_with_params(
692        &self,
693        query: &str,
694        _params: std::collections::HashMap<String, Value>,
695    ) -> Result<QueryResult> {
696        // TODO: Implement parameter substitution for SPARQL
697        // For now, just execute the query without parameters
698        self.execute_sparql(query)
699    }
700
701    /// Begins a new transaction.
702    ///
703    /// # Errors
704    ///
705    /// Returns an error if a transaction is already active.
706    ///
707    /// # Examples
708    ///
709    /// ```no_run
710    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
711    /// use grafeo_engine::GrafeoDB;
712    ///
713    /// let db = GrafeoDB::new_in_memory();
714    /// let mut session = db.session();
715    ///
716    /// session.begin_tx()?;
717    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
718    /// session.execute("INSERT (:Person {name: 'Bob'})")?;
719    /// session.commit()?; // Both inserts committed atomically
720    /// # Ok(())
721    /// # }
722    /// ```
723    pub fn begin_tx(&mut self) -> Result<()> {
724        if self.current_tx.is_some() {
725            return Err(grafeo_common::utils::error::Error::Transaction(
726                grafeo_common::utils::error::TransactionError::InvalidState(
727                    "Transaction already active".to_string(),
728                ),
729            ));
730        }
731
732        let tx_id = self.tx_manager.begin();
733        self.current_tx = Some(tx_id);
734        Ok(())
735    }
736
737    /// Begins a transaction with a specific isolation level.
738    ///
739    /// See [`begin_tx`](Self::begin_tx) for the default (`SnapshotIsolation`).
740    ///
741    /// # Errors
742    ///
743    /// Returns an error if a transaction is already active.
744    pub fn begin_tx_with_isolation(
745        &mut self,
746        isolation_level: crate::transaction::IsolationLevel,
747    ) -> Result<()> {
748        if self.current_tx.is_some() {
749            return Err(grafeo_common::utils::error::Error::Transaction(
750                grafeo_common::utils::error::TransactionError::InvalidState(
751                    "Transaction already active".to_string(),
752                ),
753            ));
754        }
755
756        let tx_id = self.tx_manager.begin_with_isolation(isolation_level);
757        self.current_tx = Some(tx_id);
758        Ok(())
759    }
760
761    /// Commits the current transaction.
762    ///
763    /// Makes all changes since [`begin_tx`](Self::begin_tx) permanent.
764    ///
765    /// # Errors
766    ///
767    /// Returns an error if no transaction is active.
768    pub fn commit(&mut self) -> Result<()> {
769        let tx_id = self.current_tx.take().ok_or_else(|| {
770            grafeo_common::utils::error::Error::Transaction(
771                grafeo_common::utils::error::TransactionError::InvalidState(
772                    "No active transaction".to_string(),
773                ),
774            )
775        })?;
776
777        // Commit RDF store pending operations
778        #[cfg(feature = "rdf")]
779        self.rdf_store.commit_tx(tx_id);
780
781        self.tx_manager.commit(tx_id)?;
782
783        // Sync the LpgStore epoch with the TxManager so that
784        // convenience lookups (edge_type, get_edge, get_node) that use
785        // store.current_epoch() can see versions created at the latest epoch.
786        self.store.sync_epoch(self.tx_manager.current_epoch());
787
788        // Auto-GC: periodically prune old MVCC versions
789        if self.gc_interval > 0 {
790            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
791            if count.is_multiple_of(self.gc_interval) {
792                let min_epoch = self.tx_manager.min_active_epoch();
793                self.store.gc_versions(min_epoch);
794                self.tx_manager.gc();
795            }
796        }
797
798        Ok(())
799    }
800
801    /// Aborts the current transaction.
802    ///
803    /// Discards all changes since [`begin_tx`](Self::begin_tx).
804    ///
805    /// # Errors
806    ///
807    /// Returns an error if no transaction is active.
808    ///
809    /// # Examples
810    ///
811    /// ```no_run
812    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
813    /// use grafeo_engine::GrafeoDB;
814    ///
815    /// let db = GrafeoDB::new_in_memory();
816    /// let mut session = db.session();
817    ///
818    /// session.begin_tx()?;
819    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
820    /// session.rollback()?; // Insert is discarded
821    /// # Ok(())
822    /// # }
823    /// ```
824    pub fn rollback(&mut self) -> Result<()> {
825        let tx_id = self.current_tx.take().ok_or_else(|| {
826            grafeo_common::utils::error::Error::Transaction(
827                grafeo_common::utils::error::TransactionError::InvalidState(
828                    "No active transaction".to_string(),
829                ),
830            )
831        })?;
832
833        // Discard uncommitted versions in the LPG store
834        self.store.discard_uncommitted_versions(tx_id);
835
836        // Discard pending operations in the RDF store
837        #[cfg(feature = "rdf")]
838        self.rdf_store.rollback_tx(tx_id);
839
840        // Mark transaction as aborted in the manager
841        self.tx_manager.abort(tx_id)
842    }
843
844    /// Returns whether a transaction is active.
845    #[must_use]
846    pub fn in_transaction(&self) -> bool {
847        self.current_tx.is_some()
848    }
849
850    /// Sets auto-commit mode.
851    pub fn set_auto_commit(&mut self, auto_commit: bool) {
852        self.auto_commit = auto_commit;
853    }
854
855    /// Returns whether auto-commit is enabled.
856    #[must_use]
857    pub fn auto_commit(&self) -> bool {
858        self.auto_commit
859    }
860
861    /// Computes the wall-clock deadline for query execution.
862    #[must_use]
863    fn query_deadline(&self) -> Option<Instant> {
864        self.query_timeout.map(|d| Instant::now() + d)
865    }
866
867    /// Returns the current transaction context for MVCC visibility.
868    ///
869    /// Returns `(viewing_epoch, tx_id)` where:
870    /// - `viewing_epoch` is the epoch at which to check version visibility
871    /// - `tx_id` is the current transaction ID (if in a transaction)
872    #[must_use]
873    fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
874        if let Some(tx_id) = self.current_tx {
875            // In a transaction - use the transaction's start epoch
876            let epoch = self
877                .tx_manager
878                .start_epoch(tx_id)
879                .unwrap_or_else(|| self.tx_manager.current_epoch());
880            (epoch, Some(tx_id))
881        } else {
882            // No transaction - use current epoch
883            (self.tx_manager.current_epoch(), None)
884        }
885    }
886
887    /// Creates a node directly (bypassing query execution).
888    ///
889    /// This is a low-level API for testing and direct manipulation.
890    /// If a transaction is active, the node will be versioned with the transaction ID.
891    pub fn create_node(&self, labels: &[&str]) -> NodeId {
892        let (epoch, tx_id) = self.get_transaction_context();
893        self.store
894            .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
895    }
896
897    /// Creates a node with properties.
898    ///
899    /// If a transaction is active, the node will be versioned with the transaction ID.
900    pub fn create_node_with_props<'a>(
901        &self,
902        labels: &[&str],
903        properties: impl IntoIterator<Item = (&'a str, Value)>,
904    ) -> NodeId {
905        let (epoch, tx_id) = self.get_transaction_context();
906        self.store.create_node_with_props_versioned(
907            labels,
908            properties.into_iter().map(|(k, v)| (k, v)),
909            epoch,
910            tx_id.unwrap_or(TxId::SYSTEM),
911        )
912    }
913
914    /// Creates an edge between two nodes.
915    ///
916    /// This is a low-level API for testing and direct manipulation.
917    /// If a transaction is active, the edge will be versioned with the transaction ID.
918    pub fn create_edge(
919        &self,
920        src: NodeId,
921        dst: NodeId,
922        edge_type: &str,
923    ) -> grafeo_common::types::EdgeId {
924        let (epoch, tx_id) = self.get_transaction_context();
925        self.store
926            .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
927    }
928
929    // =========================================================================
930    // Direct Lookup APIs (bypass query planning for O(1) point reads)
931    // =========================================================================
932
933    /// Gets a node by ID directly, bypassing query planning.
934    ///
935    /// This is the fastest way to retrieve a single node when you know its ID.
936    /// Skips parsing, binding, optimization, and physical planning entirely.
937    ///
938    /// # Performance
939    ///
940    /// - Time complexity: O(1) average case
941    /// - No lock contention (uses DashMap internally)
942    /// - ~20-30x faster than equivalent MATCH query
943    ///
944    /// # Example
945    ///
946    /// ```no_run
947    /// # use grafeo_engine::GrafeoDB;
948    /// # let db = GrafeoDB::new_in_memory();
949    /// let session = db.session();
950    /// let node_id = session.create_node(&["Person"]);
951    ///
952    /// // Direct lookup - O(1), no query planning
953    /// let node = session.get_node(node_id);
954    /// assert!(node.is_some());
955    /// ```
956    #[must_use]
957    pub fn get_node(&self, id: NodeId) -> Option<Node> {
958        let (epoch, tx_id) = self.get_transaction_context();
959        self.store
960            .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
961    }
962
963    /// Gets a single property from a node by ID, bypassing query planning.
964    ///
965    /// More efficient than `get_node()` when you only need one property,
966    /// as it avoids loading the full node with all properties.
967    ///
968    /// # Performance
969    ///
970    /// - Time complexity: O(1) average case
971    /// - No query planning overhead
972    ///
973    /// # Example
974    ///
975    /// ```no_run
976    /// # use grafeo_engine::GrafeoDB;
977    /// # use grafeo_common::types::Value;
978    /// # let db = GrafeoDB::new_in_memory();
979    /// let session = db.session();
980    /// let id = session.create_node_with_props(&["Person"], [("name", "Alice".into())]);
981    ///
982    /// // Direct property access - O(1)
983    /// let name = session.get_node_property(id, "name");
984    /// assert_eq!(name, Some(Value::String("Alice".into())));
985    /// ```
986    #[must_use]
987    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
988        self.get_node(id)
989            .and_then(|node| node.get_property(key).cloned())
990    }
991
992    /// Gets an edge by ID directly, bypassing query planning.
993    ///
994    /// # Performance
995    ///
996    /// - Time complexity: O(1) average case
997    /// - No lock contention
998    #[must_use]
999    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1000        let (epoch, tx_id) = self.get_transaction_context();
1001        self.store
1002            .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1003    }
1004
1005    /// Gets outgoing neighbors of a node directly, bypassing query planning.
1006    ///
1007    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
1008    ///
1009    /// # Performance
1010    ///
1011    /// - Time complexity: O(degree) where degree is the number of outgoing edges
1012    /// - Uses adjacency index for direct access
1013    /// - ~10-20x faster than equivalent MATCH query
1014    ///
1015    /// # Example
1016    ///
1017    /// ```no_run
1018    /// # use grafeo_engine::GrafeoDB;
1019    /// # let db = GrafeoDB::new_in_memory();
1020    /// let session = db.session();
1021    /// let alice = session.create_node(&["Person"]);
1022    /// let bob = session.create_node(&["Person"]);
1023    /// session.create_edge(alice, bob, "KNOWS");
1024    ///
1025    /// // Direct neighbor lookup - O(degree)
1026    /// let neighbors = session.get_neighbors_outgoing(alice);
1027    /// assert_eq!(neighbors.len(), 1);
1028    /// assert_eq!(neighbors[0].0, bob);
1029    /// ```
1030    #[must_use]
1031    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1032        self.store.edges_from(node, Direction::Outgoing).collect()
1033    }
1034
1035    /// Gets incoming neighbors of a node directly, bypassing query planning.
1036    ///
1037    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
1038    ///
1039    /// # Performance
1040    ///
1041    /// - Time complexity: O(degree) where degree is the number of incoming edges
1042    /// - Uses backward adjacency index for direct access
1043    #[must_use]
1044    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1045        self.store.edges_from(node, Direction::Incoming).collect()
1046    }
1047
1048    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
1049    ///
1050    /// # Example
1051    ///
1052    /// ```no_run
1053    /// # use grafeo_engine::GrafeoDB;
1054    /// # let db = GrafeoDB::new_in_memory();
1055    /// # let session = db.session();
1056    /// # let alice = session.create_node(&["Person"]);
1057    /// let neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1058    /// ```
1059    #[must_use]
1060    pub fn get_neighbors_outgoing_by_type(
1061        &self,
1062        node: NodeId,
1063        edge_type: &str,
1064    ) -> Vec<(NodeId, EdgeId)> {
1065        self.store
1066            .edges_from(node, Direction::Outgoing)
1067            .filter(|(_, edge_id)| {
1068                self.get_edge(*edge_id)
1069                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
1070            })
1071            .collect()
1072    }
1073
1074    /// Checks if a node exists, bypassing query planning.
1075    ///
1076    /// # Performance
1077    ///
1078    /// - Time complexity: O(1)
1079    /// - Fastest existence check available
1080    #[must_use]
1081    pub fn node_exists(&self, id: NodeId) -> bool {
1082        self.get_node(id).is_some()
1083    }
1084
1085    /// Checks if an edge exists, bypassing query planning.
1086    #[must_use]
1087    pub fn edge_exists(&self, id: EdgeId) -> bool {
1088        self.get_edge(id).is_some()
1089    }
1090
1091    /// Gets the degree (number of edges) of a node.
1092    ///
1093    /// Returns (outgoing_degree, incoming_degree).
1094    #[must_use]
1095    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
1096        let out = self.store.out_degree(node);
1097        let in_degree = self.store.in_degree(node);
1098        (out, in_degree)
1099    }
1100
1101    /// Batch lookup of multiple nodes by ID.
1102    ///
1103    /// More efficient than calling `get_node()` in a loop because it
1104    /// amortizes overhead.
1105    ///
1106    /// # Performance
1107    ///
1108    /// - Time complexity: O(n) where n is the number of IDs
1109    /// - Better cache utilization than individual lookups
1110    #[must_use]
1111    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
1112        let (epoch, tx_id) = self.get_transaction_context();
1113        let tx = tx_id.unwrap_or(TxId::SYSTEM);
1114        ids.iter()
1115            .map(|&id| self.store.get_node_versioned(id, epoch, tx))
1116            .collect()
1117    }
1118
1119    // ── Change Data Capture ─────────────────────────────────────────────
1120
1121    /// Returns the full change history for an entity (node or edge).
1122    #[cfg(feature = "cdc")]
1123    pub fn history(
1124        &self,
1125        entity_id: impl Into<crate::cdc::EntityId>,
1126    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1127        Ok(self.cdc_log.history(entity_id.into()))
1128    }
1129
1130    /// Returns change events for an entity since the given epoch.
1131    #[cfg(feature = "cdc")]
1132    pub fn history_since(
1133        &self,
1134        entity_id: impl Into<crate::cdc::EntityId>,
1135        since_epoch: EpochId,
1136    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1137        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
1138    }
1139
1140    /// Returns all change events across all entities in an epoch range.
1141    #[cfg(feature = "cdc")]
1142    pub fn changes_between(
1143        &self,
1144        start_epoch: EpochId,
1145        end_epoch: EpochId,
1146    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1147        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
1148    }
1149}
1150
1151#[cfg(test)]
1152mod tests {
1153    use crate::database::GrafeoDB;
1154
1155    #[test]
1156    fn test_session_create_node() {
1157        let db = GrafeoDB::new_in_memory();
1158        let session = db.session();
1159
1160        let id = session.create_node(&["Person"]);
1161        assert!(id.is_valid());
1162        assert_eq!(db.node_count(), 1);
1163    }
1164
1165    #[test]
1166    fn test_session_transaction() {
1167        let db = GrafeoDB::new_in_memory();
1168        let mut session = db.session();
1169
1170        assert!(!session.in_transaction());
1171
1172        session.begin_tx().unwrap();
1173        assert!(session.in_transaction());
1174
1175        session.commit().unwrap();
1176        assert!(!session.in_transaction());
1177    }
1178
1179    #[test]
1180    fn test_session_transaction_context() {
1181        let db = GrafeoDB::new_in_memory();
1182        let mut session = db.session();
1183
1184        // Without transaction - context should have current epoch and no tx_id
1185        let (_epoch1, tx_id1) = session.get_transaction_context();
1186        assert!(tx_id1.is_none());
1187
1188        // Start a transaction
1189        session.begin_tx().unwrap();
1190        let (epoch2, tx_id2) = session.get_transaction_context();
1191        assert!(tx_id2.is_some());
1192        // Transaction should have a valid epoch
1193        let _ = epoch2; // Use the variable
1194
1195        // Commit and verify
1196        session.commit().unwrap();
1197        let (epoch3, tx_id3) = session.get_transaction_context();
1198        assert!(tx_id3.is_none());
1199        // Epoch should have advanced after commit
1200        assert!(epoch3.as_u64() >= epoch2.as_u64());
1201    }
1202
1203    #[test]
1204    fn test_session_rollback() {
1205        let db = GrafeoDB::new_in_memory();
1206        let mut session = db.session();
1207
1208        session.begin_tx().unwrap();
1209        session.rollback().unwrap();
1210        assert!(!session.in_transaction());
1211    }
1212
1213    #[test]
1214    fn test_session_rollback_discards_versions() {
1215        use grafeo_common::types::TxId;
1216
1217        let db = GrafeoDB::new_in_memory();
1218
1219        // Create a node outside of any transaction (at system level)
1220        let node_before = db.store().create_node(&["Person"]);
1221        assert!(node_before.is_valid());
1222        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1223
1224        // Start a transaction
1225        let mut session = db.session();
1226        session.begin_tx().unwrap();
1227        let tx_id = session.current_tx.unwrap();
1228
1229        // Create a node versioned with the transaction's ID
1230        let epoch = db.store().current_epoch();
1231        let node_in_tx = db.store().create_node_versioned(&["Person"], epoch, tx_id);
1232        assert!(node_in_tx.is_valid());
1233
1234        // Should see 2 nodes at this point
1235        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1236
1237        // Rollback the transaction
1238        session.rollback().unwrap();
1239        assert!(!session.in_transaction());
1240
1241        // The node created in the transaction should be discarded
1242        // Only the first node should remain visible
1243        let count_after = db.node_count();
1244        assert_eq!(
1245            count_after, 1,
1246            "Rollback should discard uncommitted node, but got {count_after}"
1247        );
1248
1249        // The original node should still be accessible
1250        let current_epoch = db.store().current_epoch();
1251        assert!(
1252            db.store()
1253                .get_node_versioned(node_before, current_epoch, TxId::SYSTEM)
1254                .is_some(),
1255            "Original node should still exist"
1256        );
1257
1258        // The node created in the transaction should not be accessible
1259        assert!(
1260            db.store()
1261                .get_node_versioned(node_in_tx, current_epoch, TxId::SYSTEM)
1262                .is_none(),
1263            "Transaction node should be gone"
1264        );
1265    }
1266
1267    #[test]
1268    fn test_session_create_node_in_transaction() {
1269        // Test that session.create_node() is transaction-aware
1270        let db = GrafeoDB::new_in_memory();
1271
1272        // Create a node outside of any transaction
1273        let node_before = db.create_node(&["Person"]);
1274        assert!(node_before.is_valid());
1275        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1276
1277        // Start a transaction and create a node through the session
1278        let mut session = db.session();
1279        session.begin_tx().unwrap();
1280
1281        // Create a node through session.create_node() - should be versioned with tx
1282        let node_in_tx = session.create_node(&["Person"]);
1283        assert!(node_in_tx.is_valid());
1284
1285        // Should see 2 nodes at this point
1286        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1287
1288        // Rollback the transaction
1289        session.rollback().unwrap();
1290
1291        // The node created via session.create_node() should be discarded
1292        let count_after = db.node_count();
1293        assert_eq!(
1294            count_after, 1,
1295            "Rollback should discard node created via session.create_node(), but got {count_after}"
1296        );
1297    }
1298
1299    #[test]
1300    fn test_session_create_node_with_props_in_transaction() {
1301        use grafeo_common::types::Value;
1302
1303        // Test that session.create_node_with_props() is transaction-aware
1304        let db = GrafeoDB::new_in_memory();
1305
1306        // Create a node outside of any transaction
1307        db.create_node(&["Person"]);
1308        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1309
1310        // Start a transaction and create a node with properties
1311        let mut session = db.session();
1312        session.begin_tx().unwrap();
1313
1314        let node_in_tx =
1315            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1316        assert!(node_in_tx.is_valid());
1317
1318        // Should see 2 nodes
1319        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1320
1321        // Rollback the transaction
1322        session.rollback().unwrap();
1323
1324        // The node should be discarded
1325        let count_after = db.node_count();
1326        assert_eq!(
1327            count_after, 1,
1328            "Rollback should discard node created via session.create_node_with_props()"
1329        );
1330    }
1331
1332    #[cfg(feature = "gql")]
1333    mod gql_tests {
1334        use super::*;
1335
1336        #[test]
1337        fn test_gql_query_execution() {
1338            let db = GrafeoDB::new_in_memory();
1339            let session = db.session();
1340
1341            // Create some test data
1342            session.create_node(&["Person"]);
1343            session.create_node(&["Person"]);
1344            session.create_node(&["Animal"]);
1345
1346            // Execute a GQL query
1347            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1348
1349            // Should return 2 Person nodes
1350            assert_eq!(result.row_count(), 2);
1351            assert_eq!(result.column_count(), 1);
1352            assert_eq!(result.columns[0], "n");
1353        }
1354
1355        #[test]
1356        fn test_gql_empty_result() {
1357            let db = GrafeoDB::new_in_memory();
1358            let session = db.session();
1359
1360            // No data in database
1361            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1362
1363            assert_eq!(result.row_count(), 0);
1364        }
1365
1366        #[test]
1367        fn test_gql_parse_error() {
1368            let db = GrafeoDB::new_in_memory();
1369            let session = db.session();
1370
1371            // Invalid GQL syntax
1372            let result = session.execute("MATCH (n RETURN n");
1373
1374            assert!(result.is_err());
1375        }
1376
1377        #[test]
1378        fn test_gql_relationship_traversal() {
1379            let db = GrafeoDB::new_in_memory();
1380            let session = db.session();
1381
1382            // Create a graph: Alice -> Bob, Alice -> Charlie
1383            let alice = session.create_node(&["Person"]);
1384            let bob = session.create_node(&["Person"]);
1385            let charlie = session.create_node(&["Person"]);
1386
1387            session.create_edge(alice, bob, "KNOWS");
1388            session.create_edge(alice, charlie, "KNOWS");
1389
1390            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
1391            let result = session
1392                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1393                .unwrap();
1394
1395            // Should return 2 rows (Alice->Bob, Alice->Charlie)
1396            assert_eq!(result.row_count(), 2);
1397            assert_eq!(result.column_count(), 2);
1398            assert_eq!(result.columns[0], "a");
1399            assert_eq!(result.columns[1], "b");
1400        }
1401
1402        #[test]
1403        fn test_gql_relationship_with_type_filter() {
1404            let db = GrafeoDB::new_in_memory();
1405            let session = db.session();
1406
1407            // Create a graph: Alice -KNOWS-> Bob, Alice -WORKS_WITH-> Charlie
1408            let alice = session.create_node(&["Person"]);
1409            let bob = session.create_node(&["Person"]);
1410            let charlie = session.create_node(&["Person"]);
1411
1412            session.create_edge(alice, bob, "KNOWS");
1413            session.create_edge(alice, charlie, "WORKS_WITH");
1414
1415            // Query only KNOWS relationships
1416            let result = session
1417                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1418                .unwrap();
1419
1420            // Should return only 1 row (Alice->Bob)
1421            assert_eq!(result.row_count(), 1);
1422        }
1423
1424        #[test]
1425        fn test_gql_semantic_error_undefined_variable() {
1426            let db = GrafeoDB::new_in_memory();
1427            let session = db.session();
1428
1429            // Reference undefined variable 'x' in RETURN
1430            let result = session.execute("MATCH (n:Person) RETURN x");
1431
1432            // Should fail with semantic error
1433            assert!(result.is_err());
1434            let Err(err) = result else {
1435                panic!("Expected error")
1436            };
1437            assert!(
1438                err.to_string().contains("Undefined variable"),
1439                "Expected undefined variable error, got: {}",
1440                err
1441            );
1442        }
1443
1444        #[test]
1445        fn test_gql_where_clause_property_filter() {
1446            use grafeo_common::types::Value;
1447
1448            let db = GrafeoDB::new_in_memory();
1449            let session = db.session();
1450
1451            // Create people with ages
1452            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
1453            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
1454            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
1455
1456            // Query with WHERE clause: age > 30
1457            let result = session
1458                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
1459                .unwrap();
1460
1461            // Should return 2 people (ages 35 and 45)
1462            assert_eq!(result.row_count(), 2);
1463        }
1464
1465        #[test]
1466        fn test_gql_where_clause_equality() {
1467            use grafeo_common::types::Value;
1468
1469            let db = GrafeoDB::new_in_memory();
1470            let session = db.session();
1471
1472            // Create people with names
1473            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1474            session.create_node_with_props(&["Person"], [("name", Value::String("Bob".into()))]);
1475            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1476
1477            // Query with WHERE clause: name = "Alice"
1478            let result = session
1479                .execute("MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n")
1480                .unwrap();
1481
1482            // Should return 2 people named Alice
1483            assert_eq!(result.row_count(), 2);
1484        }
1485
1486        #[test]
1487        fn test_gql_return_property_access() {
1488            use grafeo_common::types::Value;
1489
1490            let db = GrafeoDB::new_in_memory();
1491            let session = db.session();
1492
1493            // Create people with names and ages
1494            session.create_node_with_props(
1495                &["Person"],
1496                [
1497                    ("name", Value::String("Alice".into())),
1498                    ("age", Value::Int64(30)),
1499                ],
1500            );
1501            session.create_node_with_props(
1502                &["Person"],
1503                [
1504                    ("name", Value::String("Bob".into())),
1505                    ("age", Value::Int64(25)),
1506                ],
1507            );
1508
1509            // Query returning properties
1510            let result = session
1511                .execute("MATCH (n:Person) RETURN n.name, n.age")
1512                .unwrap();
1513
1514            // Should return 2 rows with name and age columns
1515            assert_eq!(result.row_count(), 2);
1516            assert_eq!(result.column_count(), 2);
1517            assert_eq!(result.columns[0], "n.name");
1518            assert_eq!(result.columns[1], "n.age");
1519
1520            // Check that we get actual values
1521            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
1522            assert!(names.contains(&&Value::String("Alice".into())));
1523            assert!(names.contains(&&Value::String("Bob".into())));
1524        }
1525
1526        #[test]
1527        fn test_gql_return_mixed_expressions() {
1528            use grafeo_common::types::Value;
1529
1530            let db = GrafeoDB::new_in_memory();
1531            let session = db.session();
1532
1533            // Create a person
1534            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1535
1536            // Query returning both node and property
1537            let result = session
1538                .execute("MATCH (n:Person) RETURN n, n.name")
1539                .unwrap();
1540
1541            assert_eq!(result.row_count(), 1);
1542            assert_eq!(result.column_count(), 2);
1543            assert_eq!(result.columns[0], "n");
1544            assert_eq!(result.columns[1], "n.name");
1545
1546            // Second column should be the name
1547            assert_eq!(result.rows[0][1], Value::String("Alice".into()));
1548        }
1549    }
1550
1551    #[cfg(feature = "cypher")]
1552    mod cypher_tests {
1553        use super::*;
1554
1555        #[test]
1556        fn test_cypher_query_execution() {
1557            let db = GrafeoDB::new_in_memory();
1558            let session = db.session();
1559
1560            // Create some test data
1561            session.create_node(&["Person"]);
1562            session.create_node(&["Person"]);
1563            session.create_node(&["Animal"]);
1564
1565            // Execute a Cypher query
1566            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1567
1568            // Should return 2 Person nodes
1569            assert_eq!(result.row_count(), 2);
1570            assert_eq!(result.column_count(), 1);
1571            assert_eq!(result.columns[0], "n");
1572        }
1573
1574        #[test]
1575        fn test_cypher_empty_result() {
1576            let db = GrafeoDB::new_in_memory();
1577            let session = db.session();
1578
1579            // No data in database
1580            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1581
1582            assert_eq!(result.row_count(), 0);
1583        }
1584
1585        #[test]
1586        fn test_cypher_parse_error() {
1587            let db = GrafeoDB::new_in_memory();
1588            let session = db.session();
1589
1590            // Invalid Cypher syntax
1591            let result = session.execute_cypher("MATCH (n RETURN n");
1592
1593            assert!(result.is_err());
1594        }
1595    }
1596
1597    // ==================== Direct Lookup API Tests ====================
1598
1599    mod direct_lookup_tests {
1600        use super::*;
1601        use grafeo_common::types::Value;
1602
1603        #[test]
1604        fn test_get_node() {
1605            let db = GrafeoDB::new_in_memory();
1606            let session = db.session();
1607
1608            let id = session.create_node(&["Person"]);
1609            let node = session.get_node(id);
1610
1611            assert!(node.is_some());
1612            let node = node.unwrap();
1613            assert_eq!(node.id, id);
1614        }
1615
1616        #[test]
1617        fn test_get_node_not_found() {
1618            use grafeo_common::types::NodeId;
1619
1620            let db = GrafeoDB::new_in_memory();
1621            let session = db.session();
1622
1623            // Try to get a non-existent node
1624            let node = session.get_node(NodeId::new(9999));
1625            assert!(node.is_none());
1626        }
1627
1628        #[test]
1629        fn test_get_node_property() {
1630            let db = GrafeoDB::new_in_memory();
1631            let session = db.session();
1632
1633            let id = session
1634                .create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1635
1636            let name = session.get_node_property(id, "name");
1637            assert_eq!(name, Some(Value::String("Alice".into())));
1638
1639            // Non-existent property
1640            let missing = session.get_node_property(id, "missing");
1641            assert!(missing.is_none());
1642        }
1643
1644        #[test]
1645        fn test_get_edge() {
1646            let db = GrafeoDB::new_in_memory();
1647            let session = db.session();
1648
1649            let alice = session.create_node(&["Person"]);
1650            let bob = session.create_node(&["Person"]);
1651            let edge_id = session.create_edge(alice, bob, "KNOWS");
1652
1653            let edge = session.get_edge(edge_id);
1654            assert!(edge.is_some());
1655            let edge = edge.unwrap();
1656            assert_eq!(edge.id, edge_id);
1657            assert_eq!(edge.src, alice);
1658            assert_eq!(edge.dst, bob);
1659        }
1660
1661        #[test]
1662        fn test_get_edge_not_found() {
1663            use grafeo_common::types::EdgeId;
1664
1665            let db = GrafeoDB::new_in_memory();
1666            let session = db.session();
1667
1668            let edge = session.get_edge(EdgeId::new(9999));
1669            assert!(edge.is_none());
1670        }
1671
1672        #[test]
1673        fn test_get_neighbors_outgoing() {
1674            let db = GrafeoDB::new_in_memory();
1675            let session = db.session();
1676
1677            let alice = session.create_node(&["Person"]);
1678            let bob = session.create_node(&["Person"]);
1679            let carol = session.create_node(&["Person"]);
1680
1681            session.create_edge(alice, bob, "KNOWS");
1682            session.create_edge(alice, carol, "KNOWS");
1683
1684            let neighbors = session.get_neighbors_outgoing(alice);
1685            assert_eq!(neighbors.len(), 2);
1686
1687            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1688            assert!(neighbor_ids.contains(&bob));
1689            assert!(neighbor_ids.contains(&carol));
1690        }
1691
1692        #[test]
1693        fn test_get_neighbors_incoming() {
1694            let db = GrafeoDB::new_in_memory();
1695            let session = db.session();
1696
1697            let alice = session.create_node(&["Person"]);
1698            let bob = session.create_node(&["Person"]);
1699            let carol = session.create_node(&["Person"]);
1700
1701            session.create_edge(bob, alice, "KNOWS");
1702            session.create_edge(carol, alice, "KNOWS");
1703
1704            let neighbors = session.get_neighbors_incoming(alice);
1705            assert_eq!(neighbors.len(), 2);
1706
1707            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1708            assert!(neighbor_ids.contains(&bob));
1709            assert!(neighbor_ids.contains(&carol));
1710        }
1711
1712        #[test]
1713        fn test_get_neighbors_outgoing_by_type() {
1714            let db = GrafeoDB::new_in_memory();
1715            let session = db.session();
1716
1717            let alice = session.create_node(&["Person"]);
1718            let bob = session.create_node(&["Person"]);
1719            let company = session.create_node(&["Company"]);
1720
1721            session.create_edge(alice, bob, "KNOWS");
1722            session.create_edge(alice, company, "WORKS_AT");
1723
1724            let knows_neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1725            assert_eq!(knows_neighbors.len(), 1);
1726            assert_eq!(knows_neighbors[0].0, bob);
1727
1728            let works_neighbors = session.get_neighbors_outgoing_by_type(alice, "WORKS_AT");
1729            assert_eq!(works_neighbors.len(), 1);
1730            assert_eq!(works_neighbors[0].0, company);
1731
1732            // No edges of this type
1733            let no_neighbors = session.get_neighbors_outgoing_by_type(alice, "LIKES");
1734            assert!(no_neighbors.is_empty());
1735        }
1736
1737        #[test]
1738        fn test_node_exists() {
1739            use grafeo_common::types::NodeId;
1740
1741            let db = GrafeoDB::new_in_memory();
1742            let session = db.session();
1743
1744            let id = session.create_node(&["Person"]);
1745
1746            assert!(session.node_exists(id));
1747            assert!(!session.node_exists(NodeId::new(9999)));
1748        }
1749
1750        #[test]
1751        fn test_edge_exists() {
1752            use grafeo_common::types::EdgeId;
1753
1754            let db = GrafeoDB::new_in_memory();
1755            let session = db.session();
1756
1757            let alice = session.create_node(&["Person"]);
1758            let bob = session.create_node(&["Person"]);
1759            let edge_id = session.create_edge(alice, bob, "KNOWS");
1760
1761            assert!(session.edge_exists(edge_id));
1762            assert!(!session.edge_exists(EdgeId::new(9999)));
1763        }
1764
1765        #[test]
1766        fn test_get_degree() {
1767            let db = GrafeoDB::new_in_memory();
1768            let session = db.session();
1769
1770            let alice = session.create_node(&["Person"]);
1771            let bob = session.create_node(&["Person"]);
1772            let carol = session.create_node(&["Person"]);
1773
1774            // Alice knows Bob and Carol (2 outgoing)
1775            session.create_edge(alice, bob, "KNOWS");
1776            session.create_edge(alice, carol, "KNOWS");
1777            // Bob knows Alice (1 incoming for Alice)
1778            session.create_edge(bob, alice, "KNOWS");
1779
1780            let (out_degree, in_degree) = session.get_degree(alice);
1781            assert_eq!(out_degree, 2);
1782            assert_eq!(in_degree, 1);
1783
1784            // Node with no edges
1785            let lonely = session.create_node(&["Person"]);
1786            let (out, in_deg) = session.get_degree(lonely);
1787            assert_eq!(out, 0);
1788            assert_eq!(in_deg, 0);
1789        }
1790
1791        #[test]
1792        fn test_get_nodes_batch() {
1793            let db = GrafeoDB::new_in_memory();
1794            let session = db.session();
1795
1796            let alice = session.create_node(&["Person"]);
1797            let bob = session.create_node(&["Person"]);
1798            let carol = session.create_node(&["Person"]);
1799
1800            let nodes = session.get_nodes_batch(&[alice, bob, carol]);
1801            assert_eq!(nodes.len(), 3);
1802            assert!(nodes[0].is_some());
1803            assert!(nodes[1].is_some());
1804            assert!(nodes[2].is_some());
1805
1806            // With non-existent node
1807            use grafeo_common::types::NodeId;
1808            let nodes_with_missing = session.get_nodes_batch(&[alice, NodeId::new(9999), carol]);
1809            assert_eq!(nodes_with_missing.len(), 3);
1810            assert!(nodes_with_missing[0].is_some());
1811            assert!(nodes_with_missing[1].is_none()); // Missing node
1812            assert!(nodes_with_missing[2].is_some());
1813        }
1814
1815        #[test]
1816        fn test_auto_commit_setting() {
1817            let db = GrafeoDB::new_in_memory();
1818            let mut session = db.session();
1819
1820            // Default is auto-commit enabled
1821            assert!(session.auto_commit());
1822
1823            session.set_auto_commit(false);
1824            assert!(!session.auto_commit());
1825
1826            session.set_auto_commit(true);
1827            assert!(session.auto_commit());
1828        }
1829
1830        #[test]
1831        fn test_transaction_double_begin_error() {
1832            let db = GrafeoDB::new_in_memory();
1833            let mut session = db.session();
1834
1835            session.begin_tx().unwrap();
1836            let result = session.begin_tx();
1837
1838            assert!(result.is_err());
1839            // Clean up
1840            session.rollback().unwrap();
1841        }
1842
1843        #[test]
1844        fn test_commit_without_transaction_error() {
1845            let db = GrafeoDB::new_in_memory();
1846            let mut session = db.session();
1847
1848            let result = session.commit();
1849            assert!(result.is_err());
1850        }
1851
1852        #[test]
1853        fn test_rollback_without_transaction_error() {
1854            let db = GrafeoDB::new_in_memory();
1855            let mut session = db.session();
1856
1857            let result = session.rollback();
1858            assert!(result.is_err());
1859        }
1860
1861        #[test]
1862        fn test_create_edge_in_transaction() {
1863            let db = GrafeoDB::new_in_memory();
1864            let mut session = db.session();
1865
1866            // Create nodes outside transaction
1867            let alice = session.create_node(&["Person"]);
1868            let bob = session.create_node(&["Person"]);
1869
1870            // Create edge in transaction
1871            session.begin_tx().unwrap();
1872            let edge_id = session.create_edge(alice, bob, "KNOWS");
1873
1874            // Edge should be visible in the transaction
1875            assert!(session.edge_exists(edge_id));
1876
1877            // Commit
1878            session.commit().unwrap();
1879
1880            // Edge should still be visible
1881            assert!(session.edge_exists(edge_id));
1882        }
1883
1884        #[test]
1885        fn test_neighbors_empty_node() {
1886            let db = GrafeoDB::new_in_memory();
1887            let session = db.session();
1888
1889            let lonely = session.create_node(&["Person"]);
1890
1891            assert!(session.get_neighbors_outgoing(lonely).is_empty());
1892            assert!(session.get_neighbors_incoming(lonely).is_empty());
1893            assert!(
1894                session
1895                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
1896                    .is_empty()
1897            );
1898        }
1899    }
1900}