Skip to main content

grafeo_engine/
session.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
15#[cfg(feature = "rdf")]
16use grafeo_core::graph::rdf::RdfStore;
17
18use crate::config::{AdaptiveConfig, GraphModel};
19use crate::database::QueryResult;
20use crate::query::cache::QueryCache;
21use crate::transaction::TransactionManager;
22
23/// Your handle to the database - execute queries and manage transactions.
24///
25/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
26/// tracks its own transaction state, so you can have multiple concurrent
27/// sessions without them interfering.
28pub struct Session {
29    /// The underlying store.
30    store: Arc<LpgStore>,
31    /// RDF triple store (if RDF feature is enabled).
32    #[cfg(feature = "rdf")]
33    #[allow(dead_code)]
34    rdf_store: Arc<RdfStore>,
35    /// Transaction manager.
36    tx_manager: Arc<TransactionManager>,
37    /// Query cache shared across sessions.
38    query_cache: Arc<QueryCache>,
39    /// Current transaction ID (if any).
40    current_tx: Option<TxId>,
41    /// Whether the session is in auto-commit mode.
42    auto_commit: bool,
43    /// Adaptive execution configuration.
44    #[allow(dead_code)]
45    adaptive_config: AdaptiveConfig,
46    /// Whether to use factorized execution for multi-hop queries.
47    factorized_execution: bool,
48    /// The graph data model this session operates on.
49    graph_model: GraphModel,
50    /// Maximum time a query may run before being cancelled.
51    query_timeout: Option<Duration>,
52    /// Shared commit counter for triggering auto-GC.
53    commit_counter: Arc<AtomicUsize>,
54    /// GC every N commits (0 = disabled).
55    gc_interval: usize,
56}
57
58impl Session {
59    /// Creates a new session.
60    #[allow(dead_code)]
61    pub(crate) fn new(
62        store: Arc<LpgStore>,
63        tx_manager: Arc<TransactionManager>,
64        query_cache: Arc<QueryCache>,
65    ) -> Self {
66        Self {
67            store,
68            #[cfg(feature = "rdf")]
69            rdf_store: Arc::new(RdfStore::new()),
70            tx_manager,
71            query_cache,
72            current_tx: None,
73            auto_commit: true,
74            adaptive_config: AdaptiveConfig::default(),
75            factorized_execution: true,
76            graph_model: GraphModel::Lpg,
77            query_timeout: None,
78            commit_counter: Arc::new(AtomicUsize::new(0)),
79            gc_interval: 0,
80        }
81    }
82
83    /// Creates a new session with adaptive execution configuration.
84    #[allow(dead_code, clippy::too_many_arguments)]
85    pub(crate) fn with_adaptive(
86        store: Arc<LpgStore>,
87        tx_manager: Arc<TransactionManager>,
88        query_cache: Arc<QueryCache>,
89        adaptive_config: AdaptiveConfig,
90        factorized_execution: bool,
91        graph_model: GraphModel,
92        query_timeout: Option<Duration>,
93        commit_counter: Arc<AtomicUsize>,
94        gc_interval: usize,
95    ) -> Self {
96        Self {
97            store,
98            #[cfg(feature = "rdf")]
99            rdf_store: Arc::new(RdfStore::new()),
100            tx_manager,
101            query_cache,
102            current_tx: None,
103            auto_commit: true,
104            adaptive_config,
105            factorized_execution,
106            graph_model,
107            query_timeout,
108            commit_counter,
109            gc_interval,
110        }
111    }
112
113    /// Creates a new session with RDF store and adaptive configuration.
114    #[cfg(feature = "rdf")]
115    #[allow(clippy::too_many_arguments)]
116    pub(crate) fn with_rdf_store_and_adaptive(
117        store: Arc<LpgStore>,
118        rdf_store: Arc<RdfStore>,
119        tx_manager: Arc<TransactionManager>,
120        query_cache: Arc<QueryCache>,
121        adaptive_config: AdaptiveConfig,
122        factorized_execution: bool,
123        graph_model: GraphModel,
124        query_timeout: Option<Duration>,
125        commit_counter: Arc<AtomicUsize>,
126        gc_interval: usize,
127    ) -> Self {
128        Self {
129            store,
130            rdf_store,
131            tx_manager,
132            query_cache,
133            current_tx: None,
134            auto_commit: true,
135            adaptive_config,
136            factorized_execution,
137            graph_model,
138            query_timeout,
139            commit_counter,
140            gc_interval,
141        }
142    }
143
144    /// Returns the graph model this session operates on.
145    #[must_use]
146    pub fn graph_model(&self) -> GraphModel {
147        self.graph_model
148    }
149
150    /// Checks that the session's graph model supports LPG operations.
151    fn require_lpg(&self, language: &str) -> Result<()> {
152        if self.graph_model == GraphModel::Rdf {
153            return Err(grafeo_common::utils::error::Error::Internal(format!(
154                "This is an RDF database. {language} queries require an LPG database."
155            )));
156        }
157        Ok(())
158    }
159
160    /// Executes a GQL query.
161    ///
162    /// # Errors
163    ///
164    /// Returns an error if the query fails to parse or execute.
165    ///
166    /// # Examples
167    ///
168    /// ```ignore
169    /// use grafeo_engine::GrafeoDB;
170    ///
171    /// let db = GrafeoDB::new_in_memory();
172    /// let session = db.session();
173    ///
174    /// // Create a node
175    /// session.execute("INSERT (:Person {name: 'Alice', age: 30})")?;
176    ///
177    /// // Query nodes
178    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
179    /// for row in result {
180    ///     println!("{:?}", row);
181    /// }
182    /// ```
183    #[cfg(feature = "gql")]
184    pub fn execute(&self, query: &str) -> Result<QueryResult> {
185        self.require_lpg("GQL")?;
186
187        use crate::query::{
188            Executor, Planner, binder::Binder, cache::CacheKey, gql_translator,
189            optimizer::Optimizer, processor::QueryLanguage,
190        };
191
192        let start_time = std::time::Instant::now();
193
194        // Create cache key for this query
195        let cache_key = CacheKey::new(query, QueryLanguage::Gql);
196
197        // Try to get cached optimized plan
198        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
199            // Cache hit - skip parsing, translation, binding, and optimization
200            cached_plan
201        } else {
202            // Cache miss - run full pipeline
203
204            // Parse and translate the query to a logical plan
205            let logical_plan = gql_translator::translate(query)?;
206
207            // Semantic validation
208            let mut binder = Binder::new();
209            let _binding_context = binder.bind(&logical_plan)?;
210
211            // Optimize the plan
212            let optimizer = Optimizer::from_store(&self.store);
213            let plan = optimizer.optimize(logical_plan)?;
214
215            // Cache the optimized plan for future use
216            self.query_cache.put_optimized(cache_key, plan.clone());
217
218            plan
219        };
220
221        // Get transaction context for MVCC visibility
222        let (viewing_epoch, tx_id) = self.get_transaction_context();
223
224        // Convert to physical plan with transaction context
225        // (Physical planning cannot be cached as it depends on transaction state)
226        let planner = Planner::with_context(
227            Arc::clone(&self.store),
228            Arc::clone(&self.tx_manager),
229            tx_id,
230            viewing_epoch,
231        )
232        .with_factorized_execution(self.factorized_execution);
233        let mut physical_plan = planner.plan(&optimized_plan)?;
234
235        // Execute the plan
236        let executor = Executor::with_columns(physical_plan.columns.clone())
237            .with_deadline(self.query_deadline());
238        let mut result = executor.execute(physical_plan.operator.as_mut())?;
239
240        // Add execution metrics
241        let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
242        let rows_scanned = result.rows.len() as u64;
243        result.execution_time_ms = Some(elapsed_ms);
244        result.rows_scanned = Some(rows_scanned);
245
246        Ok(result)
247    }
248
249    /// Executes a GQL query with parameters.
250    ///
251    /// # Errors
252    ///
253    /// Returns an error if the query fails to parse or execute.
254    #[cfg(feature = "gql")]
255    pub fn execute_with_params(
256        &self,
257        query: &str,
258        params: std::collections::HashMap<String, Value>,
259    ) -> Result<QueryResult> {
260        self.require_lpg("GQL")?;
261
262        use crate::query::processor::{QueryLanguage, QueryProcessor};
263
264        // Get transaction context for MVCC visibility
265        let (viewing_epoch, tx_id) = self.get_transaction_context();
266
267        // Create processor with transaction context
268        let processor =
269            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
270
271        // Apply transaction context if in a transaction
272        let processor = if let Some(tx_id) = tx_id {
273            processor.with_tx_context(viewing_epoch, tx_id)
274        } else {
275            processor
276        };
277
278        processor.process(query, QueryLanguage::Gql, Some(&params))
279    }
280
281    /// Executes a GQL query with parameters.
282    ///
283    /// # Errors
284    ///
285    /// Returns an error if no query language is enabled.
286    #[cfg(not(any(feature = "gql", feature = "cypher")))]
287    pub fn execute_with_params(
288        &self,
289        _query: &str,
290        _params: std::collections::HashMap<String, Value>,
291    ) -> Result<QueryResult> {
292        Err(grafeo_common::utils::error::Error::Internal(
293            "No query language enabled".to_string(),
294        ))
295    }
296
297    /// Executes a GQL query.
298    ///
299    /// # Errors
300    ///
301    /// Returns an error if no query language is enabled.
302    #[cfg(not(any(feature = "gql", feature = "cypher")))]
303    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
304        Err(grafeo_common::utils::error::Error::Internal(
305            "No query language enabled".to_string(),
306        ))
307    }
308
309    /// Executes a Cypher query.
310    ///
311    /// # Errors
312    ///
313    /// Returns an error if the query fails to parse or execute.
314    #[cfg(feature = "cypher")]
315    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
316        use crate::query::{
317            Executor, Planner, binder::Binder, cache::CacheKey, cypher_translator,
318            optimizer::Optimizer, processor::QueryLanguage,
319        };
320
321        // Create cache key for this query
322        let cache_key = CacheKey::new(query, QueryLanguage::Cypher);
323
324        // Try to get cached optimized plan
325        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
326            cached_plan
327        } else {
328            // Parse and translate the query to a logical plan
329            let logical_plan = cypher_translator::translate(query)?;
330
331            // Semantic validation
332            let mut binder = Binder::new();
333            let _binding_context = binder.bind(&logical_plan)?;
334
335            // Optimize the plan
336            let optimizer = Optimizer::from_store(&self.store);
337            let plan = optimizer.optimize(logical_plan)?;
338
339            // Cache the optimized plan
340            self.query_cache.put_optimized(cache_key, plan.clone());
341
342            plan
343        };
344
345        // Get transaction context for MVCC visibility
346        let (viewing_epoch, tx_id) = self.get_transaction_context();
347
348        // Convert to physical plan with transaction context
349        let planner = Planner::with_context(
350            Arc::clone(&self.store),
351            Arc::clone(&self.tx_manager),
352            tx_id,
353            viewing_epoch,
354        )
355        .with_factorized_execution(self.factorized_execution);
356        let mut physical_plan = planner.plan(&optimized_plan)?;
357
358        // Execute the plan
359        let executor = Executor::with_columns(physical_plan.columns.clone())
360            .with_deadline(self.query_deadline());
361        executor.execute(physical_plan.operator.as_mut())
362    }
363
364    /// Executes a Gremlin query.
365    ///
366    /// # Errors
367    ///
368    /// Returns an error if the query fails to parse or execute.
369    ///
370    /// # Examples
371    ///
372    /// ```ignore
373    /// use grafeo_engine::GrafeoDB;
374    ///
375    /// let db = GrafeoDB::new_in_memory();
376    /// let session = db.session();
377    ///
378    /// // Create some nodes first
379    /// session.create_node(&["Person"]);
380    ///
381    /// // Query using Gremlin
382    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
383    /// ```
384    #[cfg(feature = "gremlin")]
385    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
386        use crate::query::{
387            Executor, Planner, binder::Binder, gremlin_translator, optimizer::Optimizer,
388        };
389
390        // Parse and translate the query to a logical plan
391        let logical_plan = gremlin_translator::translate(query)?;
392
393        // Semantic validation
394        let mut binder = Binder::new();
395        let _binding_context = binder.bind(&logical_plan)?;
396
397        // Optimize the plan
398        let optimizer = Optimizer::from_store(&self.store);
399        let optimized_plan = optimizer.optimize(logical_plan)?;
400
401        // Get transaction context for MVCC visibility
402        let (viewing_epoch, tx_id) = self.get_transaction_context();
403
404        // Convert to physical plan with transaction context
405        let planner = Planner::with_context(
406            Arc::clone(&self.store),
407            Arc::clone(&self.tx_manager),
408            tx_id,
409            viewing_epoch,
410        )
411        .with_factorized_execution(self.factorized_execution);
412        let mut physical_plan = planner.plan(&optimized_plan)?;
413
414        // Execute the plan
415        let executor = Executor::with_columns(physical_plan.columns.clone())
416            .with_deadline(self.query_deadline());
417        executor.execute(physical_plan.operator.as_mut())
418    }
419
420    /// Executes a Gremlin query with parameters.
421    ///
422    /// # Errors
423    ///
424    /// Returns an error if the query fails to parse or execute.
425    #[cfg(feature = "gremlin")]
426    pub fn execute_gremlin_with_params(
427        &self,
428        query: &str,
429        params: std::collections::HashMap<String, Value>,
430    ) -> Result<QueryResult> {
431        use crate::query::processor::{QueryLanguage, QueryProcessor};
432
433        // Get transaction context for MVCC visibility
434        let (viewing_epoch, tx_id) = self.get_transaction_context();
435
436        // Create processor with transaction context
437        let processor =
438            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
439
440        // Apply transaction context if in a transaction
441        let processor = if let Some(tx_id) = tx_id {
442            processor.with_tx_context(viewing_epoch, tx_id)
443        } else {
444            processor
445        };
446
447        processor.process(query, QueryLanguage::Gremlin, Some(&params))
448    }
449
450    /// Executes a GraphQL query against the LPG store.
451    ///
452    /// # Errors
453    ///
454    /// Returns an error if the query fails to parse or execute.
455    ///
456    /// # Examples
457    ///
458    /// ```ignore
459    /// use grafeo_engine::GrafeoDB;
460    ///
461    /// let db = GrafeoDB::new_in_memory();
462    /// let session = db.session();
463    ///
464    /// // Create some nodes first
465    /// session.create_node(&["User"]);
466    ///
467    /// // Query using GraphQL
468    /// let result = session.execute_graphql("query { user { id name } }")?;
469    /// ```
470    #[cfg(feature = "graphql")]
471    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
472        use crate::query::{
473            Executor, Planner, binder::Binder, graphql_translator, optimizer::Optimizer,
474        };
475
476        // Parse and translate the query to a logical plan
477        let logical_plan = graphql_translator::translate(query)?;
478
479        // Semantic validation
480        let mut binder = Binder::new();
481        let _binding_context = binder.bind(&logical_plan)?;
482
483        // Optimize the plan
484        let optimizer = Optimizer::from_store(&self.store);
485        let optimized_plan = optimizer.optimize(logical_plan)?;
486
487        // Get transaction context for MVCC visibility
488        let (viewing_epoch, tx_id) = self.get_transaction_context();
489
490        // Convert to physical plan with transaction context
491        let planner = Planner::with_context(
492            Arc::clone(&self.store),
493            Arc::clone(&self.tx_manager),
494            tx_id,
495            viewing_epoch,
496        )
497        .with_factorized_execution(self.factorized_execution);
498        let mut physical_plan = planner.plan(&optimized_plan)?;
499
500        // Execute the plan
501        let executor = Executor::with_columns(physical_plan.columns.clone())
502            .with_deadline(self.query_deadline());
503        executor.execute(physical_plan.operator.as_mut())
504    }
505
506    /// Executes a GraphQL query with parameters.
507    ///
508    /// # Errors
509    ///
510    /// Returns an error if the query fails to parse or execute.
511    #[cfg(feature = "graphql")]
512    pub fn execute_graphql_with_params(
513        &self,
514        query: &str,
515        params: std::collections::HashMap<String, Value>,
516    ) -> Result<QueryResult> {
517        use crate::query::processor::{QueryLanguage, QueryProcessor};
518
519        // Get transaction context for MVCC visibility
520        let (viewing_epoch, tx_id) = self.get_transaction_context();
521
522        // Create processor with transaction context
523        let processor =
524            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
525
526        // Apply transaction context if in a transaction
527        let processor = if let Some(tx_id) = tx_id {
528            processor.with_tx_context(viewing_epoch, tx_id)
529        } else {
530            processor
531        };
532
533        processor.process(query, QueryLanguage::GraphQL, Some(&params))
534    }
535
536    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
537    ///
538    /// # Errors
539    ///
540    /// Returns an error if the query fails to parse or execute.
541    ///
542    /// # Examples
543    ///
544    /// ```ignore
545    /// use grafeo_engine::GrafeoDB;
546    ///
547    /// let db = GrafeoDB::new_in_memory();
548    /// let session = db.session();
549    ///
550    /// let result = session.execute_sql(
551    ///     "SELECT * FROM GRAPH_TABLE (
552    ///         MATCH (n:Person)
553    ///         COLUMNS (n.name AS name)
554    ///     )"
555    /// )?;
556    /// ```
557    #[cfg(feature = "sql-pgq")]
558    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
559        use crate::query::{
560            Executor, Planner, binder::Binder, cache::CacheKey, optimizer::Optimizer,
561            plan::LogicalOperator, processor::QueryLanguage, sql_pgq_translator,
562        };
563
564        // Parse and translate (always needed to check for DDL)
565        let logical_plan = sql_pgq_translator::translate(query)?;
566
567        // Handle DDL statements directly (they don't go through the query pipeline)
568        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
569            return Ok(QueryResult {
570                columns: vec!["status".into()],
571                column_types: vec![grafeo_common::types::LogicalType::String],
572                rows: vec![vec![Value::from(format!(
573                    "Property graph '{}' created ({} node tables, {} edge tables)",
574                    cpg.name,
575                    cpg.node_tables.len(),
576                    cpg.edge_tables.len()
577                ))]],
578                execution_time_ms: None,
579                rows_scanned: None,
580            });
581        }
582
583        // Create cache key for query plans
584        let cache_key = CacheKey::new(query, QueryLanguage::SqlPgq);
585
586        // Try to get cached optimized plan
587        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
588            cached_plan
589        } else {
590            // Semantic validation
591            let mut binder = Binder::new();
592            let _binding_context = binder.bind(&logical_plan)?;
593
594            // Optimize the plan
595            let optimizer = Optimizer::from_store(&self.store);
596            let plan = optimizer.optimize(logical_plan)?;
597
598            // Cache the optimized plan
599            self.query_cache.put_optimized(cache_key, plan.clone());
600
601            plan
602        };
603
604        // Get transaction context for MVCC visibility
605        let (viewing_epoch, tx_id) = self.get_transaction_context();
606
607        // Convert to physical plan with transaction context
608        let planner = Planner::with_context(
609            Arc::clone(&self.store),
610            Arc::clone(&self.tx_manager),
611            tx_id,
612            viewing_epoch,
613        )
614        .with_factorized_execution(self.factorized_execution);
615        let mut physical_plan = planner.plan(&optimized_plan)?;
616
617        // Execute the plan
618        let executor = Executor::with_columns(physical_plan.columns.clone())
619            .with_deadline(self.query_deadline());
620        executor.execute(physical_plan.operator.as_mut())
621    }
622
623    /// Executes a SQL/PGQ query with parameters.
624    ///
625    /// # Errors
626    ///
627    /// Returns an error if the query fails to parse or execute.
628    #[cfg(feature = "sql-pgq")]
629    pub fn execute_sql_with_params(
630        &self,
631        query: &str,
632        params: std::collections::HashMap<String, Value>,
633    ) -> Result<QueryResult> {
634        use crate::query::processor::{QueryLanguage, QueryProcessor};
635
636        // Get transaction context for MVCC visibility
637        let (viewing_epoch, tx_id) = self.get_transaction_context();
638
639        // Create processor with transaction context
640        let processor =
641            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
642
643        // Apply transaction context if in a transaction
644        let processor = if let Some(tx_id) = tx_id {
645            processor.with_tx_context(viewing_epoch, tx_id)
646        } else {
647            processor
648        };
649
650        processor.process(query, QueryLanguage::SqlPgq, Some(&params))
651    }
652
653    /// Executes a SPARQL query.
654    ///
655    /// # Errors
656    ///
657    /// Returns an error if the query fails to parse or execute.
658    #[cfg(all(feature = "sparql", feature = "rdf"))]
659    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
660        use crate::query::{
661            Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
662        };
663
664        // Parse and translate the SPARQL query to a logical plan
665        let logical_plan = sparql_translator::translate(query)?;
666
667        // Optimize the plan
668        let optimizer = Optimizer::from_store(&self.store);
669        let optimized_plan = optimizer.optimize(logical_plan)?;
670
671        // Convert to physical plan using RDF planner
672        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store)).with_tx_id(self.current_tx);
673        let mut physical_plan = planner.plan(&optimized_plan)?;
674
675        // Execute the plan
676        let executor = Executor::with_columns(physical_plan.columns.clone())
677            .with_deadline(self.query_deadline());
678        executor.execute(physical_plan.operator.as_mut())
679    }
680
681    /// Executes a SPARQL query with parameters.
682    ///
683    /// # Errors
684    ///
685    /// Returns an error if the query fails to parse or execute.
686    #[cfg(all(feature = "sparql", feature = "rdf"))]
687    pub fn execute_sparql_with_params(
688        &self,
689        query: &str,
690        _params: std::collections::HashMap<String, Value>,
691    ) -> Result<QueryResult> {
692        // TODO: Implement parameter substitution for SPARQL
693        // For now, just execute the query without parameters
694        self.execute_sparql(query)
695    }
696
697    /// Begins a new transaction.
698    ///
699    /// # Errors
700    ///
701    /// Returns an error if a transaction is already active.
702    ///
703    /// # Examples
704    ///
705    /// ```ignore
706    /// use grafeo_engine::GrafeoDB;
707    ///
708    /// let db = GrafeoDB::new_in_memory();
709    /// let mut session = db.session();
710    ///
711    /// session.begin_tx()?;
712    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
713    /// session.execute("INSERT (:Person {name: 'Bob'})")?;
714    /// session.commit()?; // Both inserts committed atomically
715    /// ```
716    pub fn begin_tx(&mut self) -> Result<()> {
717        if self.current_tx.is_some() {
718            return Err(grafeo_common::utils::error::Error::Transaction(
719                grafeo_common::utils::error::TransactionError::InvalidState(
720                    "Transaction already active".to_string(),
721                ),
722            ));
723        }
724
725        let tx_id = self.tx_manager.begin();
726        self.current_tx = Some(tx_id);
727        Ok(())
728    }
729
730    /// Begins a transaction with a specific isolation level.
731    ///
732    /// See [`begin_tx`](Self::begin_tx) for the default (`SnapshotIsolation`).
733    ///
734    /// # Errors
735    ///
736    /// Returns an error if a transaction is already active.
737    pub fn begin_tx_with_isolation(
738        &mut self,
739        isolation_level: crate::transaction::IsolationLevel,
740    ) -> Result<()> {
741        if self.current_tx.is_some() {
742            return Err(grafeo_common::utils::error::Error::Transaction(
743                grafeo_common::utils::error::TransactionError::InvalidState(
744                    "Transaction already active".to_string(),
745                ),
746            ));
747        }
748
749        let tx_id = self.tx_manager.begin_with_isolation(isolation_level);
750        self.current_tx = Some(tx_id);
751        Ok(())
752    }
753
754    /// Commits the current transaction.
755    ///
756    /// Makes all changes since [`begin_tx`](Self::begin_tx) permanent.
757    ///
758    /// # Errors
759    ///
760    /// Returns an error if no transaction is active.
761    pub fn commit(&mut self) -> Result<()> {
762        let tx_id = self.current_tx.take().ok_or_else(|| {
763            grafeo_common::utils::error::Error::Transaction(
764                grafeo_common::utils::error::TransactionError::InvalidState(
765                    "No active transaction".to_string(),
766                ),
767            )
768        })?;
769
770        // Commit RDF store pending operations
771        #[cfg(feature = "rdf")]
772        self.rdf_store.commit_tx(tx_id);
773
774        self.tx_manager.commit(tx_id)?;
775
776        // Auto-GC: periodically prune old MVCC versions
777        if self.gc_interval > 0 {
778            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
779            if count.is_multiple_of(self.gc_interval) {
780                let min_epoch = self.tx_manager.min_active_epoch();
781                self.store.gc_versions(min_epoch);
782                self.tx_manager.gc();
783            }
784        }
785
786        Ok(())
787    }
788
789    /// Aborts the current transaction.
790    ///
791    /// Discards all changes since [`begin_tx`](Self::begin_tx).
792    ///
793    /// # Errors
794    ///
795    /// Returns an error if no transaction is active.
796    ///
797    /// # Examples
798    ///
799    /// ```ignore
800    /// use grafeo_engine::GrafeoDB;
801    ///
802    /// let db = GrafeoDB::new_in_memory();
803    /// let mut session = db.session();
804    ///
805    /// session.begin_tx()?;
806    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
807    /// session.rollback()?; // Insert is discarded
808    /// ```
809    pub fn rollback(&mut self) -> Result<()> {
810        let tx_id = self.current_tx.take().ok_or_else(|| {
811            grafeo_common::utils::error::Error::Transaction(
812                grafeo_common::utils::error::TransactionError::InvalidState(
813                    "No active transaction".to_string(),
814                ),
815            )
816        })?;
817
818        // Discard uncommitted versions in the LPG store
819        self.store.discard_uncommitted_versions(tx_id);
820
821        // Discard pending operations in the RDF store
822        #[cfg(feature = "rdf")]
823        self.rdf_store.rollback_tx(tx_id);
824
825        // Mark transaction as aborted in the manager
826        self.tx_manager.abort(tx_id)
827    }
828
829    /// Returns whether a transaction is active.
830    #[must_use]
831    pub fn in_transaction(&self) -> bool {
832        self.current_tx.is_some()
833    }
834
835    /// Sets auto-commit mode.
836    pub fn set_auto_commit(&mut self, auto_commit: bool) {
837        self.auto_commit = auto_commit;
838    }
839
840    /// Returns whether auto-commit is enabled.
841    #[must_use]
842    pub fn auto_commit(&self) -> bool {
843        self.auto_commit
844    }
845
846    /// Computes the wall-clock deadline for query execution.
847    #[must_use]
848    fn query_deadline(&self) -> Option<Instant> {
849        self.query_timeout.map(|d| Instant::now() + d)
850    }
851
852    /// Returns the current transaction context for MVCC visibility.
853    ///
854    /// Returns `(viewing_epoch, tx_id)` where:
855    /// - `viewing_epoch` is the epoch at which to check version visibility
856    /// - `tx_id` is the current transaction ID (if in a transaction)
857    #[must_use]
858    fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
859        if let Some(tx_id) = self.current_tx {
860            // In a transaction - use the transaction's start epoch
861            let epoch = self
862                .tx_manager
863                .start_epoch(tx_id)
864                .unwrap_or_else(|| self.tx_manager.current_epoch());
865            (epoch, Some(tx_id))
866        } else {
867            // No transaction - use current epoch
868            (self.tx_manager.current_epoch(), None)
869        }
870    }
871
872    /// Creates a node directly (bypassing query execution).
873    ///
874    /// This is a low-level API for testing and direct manipulation.
875    /// If a transaction is active, the node will be versioned with the transaction ID.
876    pub fn create_node(&self, labels: &[&str]) -> NodeId {
877        let (epoch, tx_id) = self.get_transaction_context();
878        self.store
879            .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
880    }
881
882    /// Creates a node with properties.
883    ///
884    /// If a transaction is active, the node will be versioned with the transaction ID.
885    pub fn create_node_with_props<'a>(
886        &self,
887        labels: &[&str],
888        properties: impl IntoIterator<Item = (&'a str, Value)>,
889    ) -> NodeId {
890        let (epoch, tx_id) = self.get_transaction_context();
891        self.store.create_node_with_props_versioned(
892            labels,
893            properties.into_iter().map(|(k, v)| (k, v)),
894            epoch,
895            tx_id.unwrap_or(TxId::SYSTEM),
896        )
897    }
898
899    /// Creates an edge between two nodes.
900    ///
901    /// This is a low-level API for testing and direct manipulation.
902    /// If a transaction is active, the edge will be versioned with the transaction ID.
903    pub fn create_edge(
904        &self,
905        src: NodeId,
906        dst: NodeId,
907        edge_type: &str,
908    ) -> grafeo_common::types::EdgeId {
909        let (epoch, tx_id) = self.get_transaction_context();
910        self.store
911            .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
912    }
913
914    // =========================================================================
915    // Direct Lookup APIs (bypass query planning for O(1) point reads)
916    // =========================================================================
917
918    /// Gets a node by ID directly, bypassing query planning.
919    ///
920    /// This is the fastest way to retrieve a single node when you know its ID.
921    /// Skips parsing, binding, optimization, and physical planning entirely.
922    ///
923    /// # Performance
924    ///
925    /// - Time complexity: O(1) average case
926    /// - No lock contention (uses DashMap internally)
927    /// - ~20-30x faster than equivalent MATCH query
928    ///
929    /// # Example
930    ///
931    /// ```ignore
932    /// let session = db.session();
933    /// let node_id = session.create_node(&["Person"]);
934    ///
935    /// // Direct lookup - O(1), no query planning
936    /// let node = session.get_node(node_id);
937    /// assert!(node.is_some());
938    /// ```
939    #[must_use]
940    pub fn get_node(&self, id: NodeId) -> Option<Node> {
941        let (epoch, tx_id) = self.get_transaction_context();
942        self.store
943            .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
944    }
945
946    /// Gets a single property from a node by ID, bypassing query planning.
947    ///
948    /// More efficient than `get_node()` when you only need one property,
949    /// as it avoids loading the full node with all properties.
950    ///
951    /// # Performance
952    ///
953    /// - Time complexity: O(1) average case
954    /// - No query planning overhead
955    ///
956    /// # Example
957    ///
958    /// ```ignore
959    /// let session = db.session();
960    /// let id = session.create_node_with_props(&["Person"], [("name", "Alice".into())]);
961    ///
962    /// // Direct property access - O(1)
963    /// let name = session.get_node_property(id, "name");
964    /// assert_eq!(name, Some(Value::String("Alice".into())));
965    /// ```
966    #[must_use]
967    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
968        self.get_node(id)
969            .and_then(|node| node.get_property(key).cloned())
970    }
971
972    /// Gets an edge by ID directly, bypassing query planning.
973    ///
974    /// # Performance
975    ///
976    /// - Time complexity: O(1) average case
977    /// - No lock contention
978    #[must_use]
979    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
980        let (epoch, tx_id) = self.get_transaction_context();
981        self.store
982            .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
983    }
984
985    /// Gets outgoing neighbors of a node directly, bypassing query planning.
986    ///
987    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
988    ///
989    /// # Performance
990    ///
991    /// - Time complexity: O(degree) where degree is the number of outgoing edges
992    /// - Uses adjacency index for direct access
993    /// - ~10-20x faster than equivalent MATCH query
994    ///
995    /// # Example
996    ///
997    /// ```ignore
998    /// let session = db.session();
999    /// let alice = session.create_node(&["Person"]);
1000    /// let bob = session.create_node(&["Person"]);
1001    /// session.create_edge(alice, bob, "KNOWS");
1002    ///
1003    /// // Direct neighbor lookup - O(degree)
1004    /// let neighbors = session.get_neighbors_outgoing(alice);
1005    /// assert_eq!(neighbors.len(), 1);
1006    /// assert_eq!(neighbors[0].0, bob);
1007    /// ```
1008    #[must_use]
1009    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1010        self.store.edges_from(node, Direction::Outgoing).collect()
1011    }
1012
1013    /// Gets incoming neighbors of a node directly, bypassing query planning.
1014    ///
1015    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
1016    ///
1017    /// # Performance
1018    ///
1019    /// - Time complexity: O(degree) where degree is the number of incoming edges
1020    /// - Uses backward adjacency index for direct access
1021    #[must_use]
1022    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1023        self.store.edges_from(node, Direction::Incoming).collect()
1024    }
1025
1026    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
1027    ///
1028    /// # Example
1029    ///
1030    /// ```ignore
1031    /// let neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1032    /// ```
1033    #[must_use]
1034    pub fn get_neighbors_outgoing_by_type(
1035        &self,
1036        node: NodeId,
1037        edge_type: &str,
1038    ) -> Vec<(NodeId, EdgeId)> {
1039        self.store
1040            .edges_from(node, Direction::Outgoing)
1041            .filter(|(_, edge_id)| {
1042                self.get_edge(*edge_id)
1043                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
1044            })
1045            .collect()
1046    }
1047
1048    /// Checks if a node exists, bypassing query planning.
1049    ///
1050    /// # Performance
1051    ///
1052    /// - Time complexity: O(1)
1053    /// - Fastest existence check available
1054    #[must_use]
1055    pub fn node_exists(&self, id: NodeId) -> bool {
1056        self.get_node(id).is_some()
1057    }
1058
1059    /// Checks if an edge exists, bypassing query planning.
1060    #[must_use]
1061    pub fn edge_exists(&self, id: EdgeId) -> bool {
1062        self.get_edge(id).is_some()
1063    }
1064
1065    /// Gets the degree (number of edges) of a node.
1066    ///
1067    /// Returns (outgoing_degree, incoming_degree).
1068    #[must_use]
1069    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
1070        let out = self.store.out_degree(node);
1071        let in_degree = self.store.in_degree(node);
1072        (out, in_degree)
1073    }
1074
1075    /// Batch lookup of multiple nodes by ID.
1076    ///
1077    /// More efficient than calling `get_node()` in a loop because it
1078    /// amortizes overhead.
1079    ///
1080    /// # Performance
1081    ///
1082    /// - Time complexity: O(n) where n is the number of IDs
1083    /// - Better cache utilization than individual lookups
1084    #[must_use]
1085    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
1086        let (epoch, tx_id) = self.get_transaction_context();
1087        let tx = tx_id.unwrap_or(TxId::SYSTEM);
1088        ids.iter()
1089            .map(|&id| self.store.get_node_versioned(id, epoch, tx))
1090            .collect()
1091    }
1092}
1093
1094#[cfg(test)]
1095mod tests {
1096    use crate::database::GrafeoDB;
1097
1098    #[test]
1099    fn test_session_create_node() {
1100        let db = GrafeoDB::new_in_memory();
1101        let session = db.session();
1102
1103        let id = session.create_node(&["Person"]);
1104        assert!(id.is_valid());
1105        assert_eq!(db.node_count(), 1);
1106    }
1107
1108    #[test]
1109    fn test_session_transaction() {
1110        let db = GrafeoDB::new_in_memory();
1111        let mut session = db.session();
1112
1113        assert!(!session.in_transaction());
1114
1115        session.begin_tx().unwrap();
1116        assert!(session.in_transaction());
1117
1118        session.commit().unwrap();
1119        assert!(!session.in_transaction());
1120    }
1121
1122    #[test]
1123    fn test_session_transaction_context() {
1124        let db = GrafeoDB::new_in_memory();
1125        let mut session = db.session();
1126
1127        // Without transaction - context should have current epoch and no tx_id
1128        let (_epoch1, tx_id1) = session.get_transaction_context();
1129        assert!(tx_id1.is_none());
1130
1131        // Start a transaction
1132        session.begin_tx().unwrap();
1133        let (epoch2, tx_id2) = session.get_transaction_context();
1134        assert!(tx_id2.is_some());
1135        // Transaction should have a valid epoch
1136        let _ = epoch2; // Use the variable
1137
1138        // Commit and verify
1139        session.commit().unwrap();
1140        let (epoch3, tx_id3) = session.get_transaction_context();
1141        assert!(tx_id3.is_none());
1142        // Epoch should have advanced after commit
1143        assert!(epoch3.as_u64() >= epoch2.as_u64());
1144    }
1145
1146    #[test]
1147    fn test_session_rollback() {
1148        let db = GrafeoDB::new_in_memory();
1149        let mut session = db.session();
1150
1151        session.begin_tx().unwrap();
1152        session.rollback().unwrap();
1153        assert!(!session.in_transaction());
1154    }
1155
1156    #[test]
1157    fn test_session_rollback_discards_versions() {
1158        use grafeo_common::types::TxId;
1159
1160        let db = GrafeoDB::new_in_memory();
1161
1162        // Create a node outside of any transaction (at system level)
1163        let node_before = db.store().create_node(&["Person"]);
1164        assert!(node_before.is_valid());
1165        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1166
1167        // Start a transaction
1168        let mut session = db.session();
1169        session.begin_tx().unwrap();
1170        let tx_id = session.current_tx.unwrap();
1171
1172        // Create a node versioned with the transaction's ID
1173        let epoch = db.store().current_epoch();
1174        let node_in_tx = db.store().create_node_versioned(&["Person"], epoch, tx_id);
1175        assert!(node_in_tx.is_valid());
1176
1177        // Should see 2 nodes at this point
1178        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1179
1180        // Rollback the transaction
1181        session.rollback().unwrap();
1182        assert!(!session.in_transaction());
1183
1184        // The node created in the transaction should be discarded
1185        // Only the first node should remain visible
1186        let count_after = db.node_count();
1187        assert_eq!(
1188            count_after, 1,
1189            "Rollback should discard uncommitted node, but got {count_after}"
1190        );
1191
1192        // The original node should still be accessible
1193        let current_epoch = db.store().current_epoch();
1194        assert!(
1195            db.store()
1196                .get_node_versioned(node_before, current_epoch, TxId::SYSTEM)
1197                .is_some(),
1198            "Original node should still exist"
1199        );
1200
1201        // The node created in the transaction should not be accessible
1202        assert!(
1203            db.store()
1204                .get_node_versioned(node_in_tx, current_epoch, TxId::SYSTEM)
1205                .is_none(),
1206            "Transaction node should be gone"
1207        );
1208    }
1209
1210    #[test]
1211    fn test_session_create_node_in_transaction() {
1212        // Test that session.create_node() is transaction-aware
1213        let db = GrafeoDB::new_in_memory();
1214
1215        // Create a node outside of any transaction
1216        let node_before = db.create_node(&["Person"]);
1217        assert!(node_before.is_valid());
1218        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1219
1220        // Start a transaction and create a node through the session
1221        let mut session = db.session();
1222        session.begin_tx().unwrap();
1223
1224        // Create a node through session.create_node() - should be versioned with tx
1225        let node_in_tx = session.create_node(&["Person"]);
1226        assert!(node_in_tx.is_valid());
1227
1228        // Should see 2 nodes at this point
1229        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1230
1231        // Rollback the transaction
1232        session.rollback().unwrap();
1233
1234        // The node created via session.create_node() should be discarded
1235        let count_after = db.node_count();
1236        assert_eq!(
1237            count_after, 1,
1238            "Rollback should discard node created via session.create_node(), but got {count_after}"
1239        );
1240    }
1241
1242    #[test]
1243    fn test_session_create_node_with_props_in_transaction() {
1244        use grafeo_common::types::Value;
1245
1246        // Test that session.create_node_with_props() is transaction-aware
1247        let db = GrafeoDB::new_in_memory();
1248
1249        // Create a node outside of any transaction
1250        db.create_node(&["Person"]);
1251        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1252
1253        // Start a transaction and create a node with properties
1254        let mut session = db.session();
1255        session.begin_tx().unwrap();
1256
1257        let node_in_tx =
1258            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1259        assert!(node_in_tx.is_valid());
1260
1261        // Should see 2 nodes
1262        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1263
1264        // Rollback the transaction
1265        session.rollback().unwrap();
1266
1267        // The node should be discarded
1268        let count_after = db.node_count();
1269        assert_eq!(
1270            count_after, 1,
1271            "Rollback should discard node created via session.create_node_with_props()"
1272        );
1273    }
1274
1275    #[cfg(feature = "gql")]
1276    mod gql_tests {
1277        use super::*;
1278
1279        #[test]
1280        fn test_gql_query_execution() {
1281            let db = GrafeoDB::new_in_memory();
1282            let session = db.session();
1283
1284            // Create some test data
1285            session.create_node(&["Person"]);
1286            session.create_node(&["Person"]);
1287            session.create_node(&["Animal"]);
1288
1289            // Execute a GQL query
1290            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1291
1292            // Should return 2 Person nodes
1293            assert_eq!(result.row_count(), 2);
1294            assert_eq!(result.column_count(), 1);
1295            assert_eq!(result.columns[0], "n");
1296        }
1297
1298        #[test]
1299        fn test_gql_empty_result() {
1300            let db = GrafeoDB::new_in_memory();
1301            let session = db.session();
1302
1303            // No data in database
1304            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1305
1306            assert_eq!(result.row_count(), 0);
1307        }
1308
1309        #[test]
1310        fn test_gql_parse_error() {
1311            let db = GrafeoDB::new_in_memory();
1312            let session = db.session();
1313
1314            // Invalid GQL syntax
1315            let result = session.execute("MATCH (n RETURN n");
1316
1317            assert!(result.is_err());
1318        }
1319
1320        #[test]
1321        fn test_gql_relationship_traversal() {
1322            let db = GrafeoDB::new_in_memory();
1323            let session = db.session();
1324
1325            // Create a graph: Alice -> Bob, Alice -> Charlie
1326            let alice = session.create_node(&["Person"]);
1327            let bob = session.create_node(&["Person"]);
1328            let charlie = session.create_node(&["Person"]);
1329
1330            session.create_edge(alice, bob, "KNOWS");
1331            session.create_edge(alice, charlie, "KNOWS");
1332
1333            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
1334            let result = session
1335                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1336                .unwrap();
1337
1338            // Should return 2 rows (Alice->Bob, Alice->Charlie)
1339            assert_eq!(result.row_count(), 2);
1340            assert_eq!(result.column_count(), 2);
1341            assert_eq!(result.columns[0], "a");
1342            assert_eq!(result.columns[1], "b");
1343        }
1344
1345        #[test]
1346        fn test_gql_relationship_with_type_filter() {
1347            let db = GrafeoDB::new_in_memory();
1348            let session = db.session();
1349
1350            // Create a graph: Alice -KNOWS-> Bob, Alice -WORKS_WITH-> Charlie
1351            let alice = session.create_node(&["Person"]);
1352            let bob = session.create_node(&["Person"]);
1353            let charlie = session.create_node(&["Person"]);
1354
1355            session.create_edge(alice, bob, "KNOWS");
1356            session.create_edge(alice, charlie, "WORKS_WITH");
1357
1358            // Query only KNOWS relationships
1359            let result = session
1360                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1361                .unwrap();
1362
1363            // Should return only 1 row (Alice->Bob)
1364            assert_eq!(result.row_count(), 1);
1365        }
1366
1367        #[test]
1368        fn test_gql_semantic_error_undefined_variable() {
1369            let db = GrafeoDB::new_in_memory();
1370            let session = db.session();
1371
1372            // Reference undefined variable 'x' in RETURN
1373            let result = session.execute("MATCH (n:Person) RETURN x");
1374
1375            // Should fail with semantic error
1376            assert!(result.is_err());
1377            let Err(err) = result else {
1378                panic!("Expected error")
1379            };
1380            assert!(
1381                err.to_string().contains("Undefined variable"),
1382                "Expected undefined variable error, got: {}",
1383                err
1384            );
1385        }
1386
1387        #[test]
1388        fn test_gql_where_clause_property_filter() {
1389            use grafeo_common::types::Value;
1390
1391            let db = GrafeoDB::new_in_memory();
1392            let session = db.session();
1393
1394            // Create people with ages
1395            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
1396            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
1397            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
1398
1399            // Query with WHERE clause: age > 30
1400            let result = session
1401                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
1402                .unwrap();
1403
1404            // Should return 2 people (ages 35 and 45)
1405            assert_eq!(result.row_count(), 2);
1406        }
1407
1408        #[test]
1409        fn test_gql_where_clause_equality() {
1410            use grafeo_common::types::Value;
1411
1412            let db = GrafeoDB::new_in_memory();
1413            let session = db.session();
1414
1415            // Create people with names
1416            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1417            session.create_node_with_props(&["Person"], [("name", Value::String("Bob".into()))]);
1418            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1419
1420            // Query with WHERE clause: name = "Alice"
1421            let result = session
1422                .execute("MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n")
1423                .unwrap();
1424
1425            // Should return 2 people named Alice
1426            assert_eq!(result.row_count(), 2);
1427        }
1428
1429        #[test]
1430        fn test_gql_return_property_access() {
1431            use grafeo_common::types::Value;
1432
1433            let db = GrafeoDB::new_in_memory();
1434            let session = db.session();
1435
1436            // Create people with names and ages
1437            session.create_node_with_props(
1438                &["Person"],
1439                [
1440                    ("name", Value::String("Alice".into())),
1441                    ("age", Value::Int64(30)),
1442                ],
1443            );
1444            session.create_node_with_props(
1445                &["Person"],
1446                [
1447                    ("name", Value::String("Bob".into())),
1448                    ("age", Value::Int64(25)),
1449                ],
1450            );
1451
1452            // Query returning properties
1453            let result = session
1454                .execute("MATCH (n:Person) RETURN n.name, n.age")
1455                .unwrap();
1456
1457            // Should return 2 rows with name and age columns
1458            assert_eq!(result.row_count(), 2);
1459            assert_eq!(result.column_count(), 2);
1460            assert_eq!(result.columns[0], "n.name");
1461            assert_eq!(result.columns[1], "n.age");
1462
1463            // Check that we get actual values
1464            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
1465            assert!(names.contains(&&Value::String("Alice".into())));
1466            assert!(names.contains(&&Value::String("Bob".into())));
1467        }
1468
1469        #[test]
1470        fn test_gql_return_mixed_expressions() {
1471            use grafeo_common::types::Value;
1472
1473            let db = GrafeoDB::new_in_memory();
1474            let session = db.session();
1475
1476            // Create a person
1477            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1478
1479            // Query returning both node and property
1480            let result = session
1481                .execute("MATCH (n:Person) RETURN n, n.name")
1482                .unwrap();
1483
1484            assert_eq!(result.row_count(), 1);
1485            assert_eq!(result.column_count(), 2);
1486            assert_eq!(result.columns[0], "n");
1487            assert_eq!(result.columns[1], "n.name");
1488
1489            // Second column should be the name
1490            assert_eq!(result.rows[0][1], Value::String("Alice".into()));
1491        }
1492    }
1493
1494    #[cfg(feature = "cypher")]
1495    mod cypher_tests {
1496        use super::*;
1497
1498        #[test]
1499        fn test_cypher_query_execution() {
1500            let db = GrafeoDB::new_in_memory();
1501            let session = db.session();
1502
1503            // Create some test data
1504            session.create_node(&["Person"]);
1505            session.create_node(&["Person"]);
1506            session.create_node(&["Animal"]);
1507
1508            // Execute a Cypher query
1509            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1510
1511            // Should return 2 Person nodes
1512            assert_eq!(result.row_count(), 2);
1513            assert_eq!(result.column_count(), 1);
1514            assert_eq!(result.columns[0], "n");
1515        }
1516
1517        #[test]
1518        fn test_cypher_empty_result() {
1519            let db = GrafeoDB::new_in_memory();
1520            let session = db.session();
1521
1522            // No data in database
1523            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1524
1525            assert_eq!(result.row_count(), 0);
1526        }
1527
1528        #[test]
1529        fn test_cypher_parse_error() {
1530            let db = GrafeoDB::new_in_memory();
1531            let session = db.session();
1532
1533            // Invalid Cypher syntax
1534            let result = session.execute_cypher("MATCH (n RETURN n");
1535
1536            assert!(result.is_err());
1537        }
1538    }
1539
1540    // ==================== Direct Lookup API Tests ====================
1541
1542    mod direct_lookup_tests {
1543        use super::*;
1544        use grafeo_common::types::Value;
1545
1546        #[test]
1547        fn test_get_node() {
1548            let db = GrafeoDB::new_in_memory();
1549            let session = db.session();
1550
1551            let id = session.create_node(&["Person"]);
1552            let node = session.get_node(id);
1553
1554            assert!(node.is_some());
1555            let node = node.unwrap();
1556            assert_eq!(node.id, id);
1557        }
1558
1559        #[test]
1560        fn test_get_node_not_found() {
1561            use grafeo_common::types::NodeId;
1562
1563            let db = GrafeoDB::new_in_memory();
1564            let session = db.session();
1565
1566            // Try to get a non-existent node
1567            let node = session.get_node(NodeId::new(9999));
1568            assert!(node.is_none());
1569        }
1570
1571        #[test]
1572        fn test_get_node_property() {
1573            let db = GrafeoDB::new_in_memory();
1574            let session = db.session();
1575
1576            let id = session
1577                .create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1578
1579            let name = session.get_node_property(id, "name");
1580            assert_eq!(name, Some(Value::String("Alice".into())));
1581
1582            // Non-existent property
1583            let missing = session.get_node_property(id, "missing");
1584            assert!(missing.is_none());
1585        }
1586
1587        #[test]
1588        fn test_get_edge() {
1589            let db = GrafeoDB::new_in_memory();
1590            let session = db.session();
1591
1592            let alice = session.create_node(&["Person"]);
1593            let bob = session.create_node(&["Person"]);
1594            let edge_id = session.create_edge(alice, bob, "KNOWS");
1595
1596            let edge = session.get_edge(edge_id);
1597            assert!(edge.is_some());
1598            let edge = edge.unwrap();
1599            assert_eq!(edge.id, edge_id);
1600            assert_eq!(edge.src, alice);
1601            assert_eq!(edge.dst, bob);
1602        }
1603
1604        #[test]
1605        fn test_get_edge_not_found() {
1606            use grafeo_common::types::EdgeId;
1607
1608            let db = GrafeoDB::new_in_memory();
1609            let session = db.session();
1610
1611            let edge = session.get_edge(EdgeId::new(9999));
1612            assert!(edge.is_none());
1613        }
1614
1615        #[test]
1616        fn test_get_neighbors_outgoing() {
1617            let db = GrafeoDB::new_in_memory();
1618            let session = db.session();
1619
1620            let alice = session.create_node(&["Person"]);
1621            let bob = session.create_node(&["Person"]);
1622            let carol = session.create_node(&["Person"]);
1623
1624            session.create_edge(alice, bob, "KNOWS");
1625            session.create_edge(alice, carol, "KNOWS");
1626
1627            let neighbors = session.get_neighbors_outgoing(alice);
1628            assert_eq!(neighbors.len(), 2);
1629
1630            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1631            assert!(neighbor_ids.contains(&bob));
1632            assert!(neighbor_ids.contains(&carol));
1633        }
1634
1635        #[test]
1636        fn test_get_neighbors_incoming() {
1637            let db = GrafeoDB::new_in_memory();
1638            let session = db.session();
1639
1640            let alice = session.create_node(&["Person"]);
1641            let bob = session.create_node(&["Person"]);
1642            let carol = session.create_node(&["Person"]);
1643
1644            session.create_edge(bob, alice, "KNOWS");
1645            session.create_edge(carol, alice, "KNOWS");
1646
1647            let neighbors = session.get_neighbors_incoming(alice);
1648            assert_eq!(neighbors.len(), 2);
1649
1650            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1651            assert!(neighbor_ids.contains(&bob));
1652            assert!(neighbor_ids.contains(&carol));
1653        }
1654
1655        #[test]
1656        fn test_get_neighbors_outgoing_by_type() {
1657            let db = GrafeoDB::new_in_memory();
1658            let session = db.session();
1659
1660            let alice = session.create_node(&["Person"]);
1661            let bob = session.create_node(&["Person"]);
1662            let company = session.create_node(&["Company"]);
1663
1664            session.create_edge(alice, bob, "KNOWS");
1665            session.create_edge(alice, company, "WORKS_AT");
1666
1667            let knows_neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1668            assert_eq!(knows_neighbors.len(), 1);
1669            assert_eq!(knows_neighbors[0].0, bob);
1670
1671            let works_neighbors = session.get_neighbors_outgoing_by_type(alice, "WORKS_AT");
1672            assert_eq!(works_neighbors.len(), 1);
1673            assert_eq!(works_neighbors[0].0, company);
1674
1675            // No edges of this type
1676            let no_neighbors = session.get_neighbors_outgoing_by_type(alice, "LIKES");
1677            assert!(no_neighbors.is_empty());
1678        }
1679
1680        #[test]
1681        fn test_node_exists() {
1682            use grafeo_common::types::NodeId;
1683
1684            let db = GrafeoDB::new_in_memory();
1685            let session = db.session();
1686
1687            let id = session.create_node(&["Person"]);
1688
1689            assert!(session.node_exists(id));
1690            assert!(!session.node_exists(NodeId::new(9999)));
1691        }
1692
1693        #[test]
1694        fn test_edge_exists() {
1695            use grafeo_common::types::EdgeId;
1696
1697            let db = GrafeoDB::new_in_memory();
1698            let session = db.session();
1699
1700            let alice = session.create_node(&["Person"]);
1701            let bob = session.create_node(&["Person"]);
1702            let edge_id = session.create_edge(alice, bob, "KNOWS");
1703
1704            assert!(session.edge_exists(edge_id));
1705            assert!(!session.edge_exists(EdgeId::new(9999)));
1706        }
1707
1708        #[test]
1709        fn test_get_degree() {
1710            let db = GrafeoDB::new_in_memory();
1711            let session = db.session();
1712
1713            let alice = session.create_node(&["Person"]);
1714            let bob = session.create_node(&["Person"]);
1715            let carol = session.create_node(&["Person"]);
1716
1717            // Alice knows Bob and Carol (2 outgoing)
1718            session.create_edge(alice, bob, "KNOWS");
1719            session.create_edge(alice, carol, "KNOWS");
1720            // Bob knows Alice (1 incoming for Alice)
1721            session.create_edge(bob, alice, "KNOWS");
1722
1723            let (out_degree, in_degree) = session.get_degree(alice);
1724            assert_eq!(out_degree, 2);
1725            assert_eq!(in_degree, 1);
1726
1727            // Node with no edges
1728            let lonely = session.create_node(&["Person"]);
1729            let (out, in_deg) = session.get_degree(lonely);
1730            assert_eq!(out, 0);
1731            assert_eq!(in_deg, 0);
1732        }
1733
1734        #[test]
1735        fn test_get_nodes_batch() {
1736            let db = GrafeoDB::new_in_memory();
1737            let session = db.session();
1738
1739            let alice = session.create_node(&["Person"]);
1740            let bob = session.create_node(&["Person"]);
1741            let carol = session.create_node(&["Person"]);
1742
1743            let nodes = session.get_nodes_batch(&[alice, bob, carol]);
1744            assert_eq!(nodes.len(), 3);
1745            assert!(nodes[0].is_some());
1746            assert!(nodes[1].is_some());
1747            assert!(nodes[2].is_some());
1748
1749            // With non-existent node
1750            use grafeo_common::types::NodeId;
1751            let nodes_with_missing = session.get_nodes_batch(&[alice, NodeId::new(9999), carol]);
1752            assert_eq!(nodes_with_missing.len(), 3);
1753            assert!(nodes_with_missing[0].is_some());
1754            assert!(nodes_with_missing[1].is_none()); // Missing node
1755            assert!(nodes_with_missing[2].is_some());
1756        }
1757
1758        #[test]
1759        fn test_auto_commit_setting() {
1760            let db = GrafeoDB::new_in_memory();
1761            let mut session = db.session();
1762
1763            // Default is auto-commit enabled
1764            assert!(session.auto_commit());
1765
1766            session.set_auto_commit(false);
1767            assert!(!session.auto_commit());
1768
1769            session.set_auto_commit(true);
1770            assert!(session.auto_commit());
1771        }
1772
1773        #[test]
1774        fn test_transaction_double_begin_error() {
1775            let db = GrafeoDB::new_in_memory();
1776            let mut session = db.session();
1777
1778            session.begin_tx().unwrap();
1779            let result = session.begin_tx();
1780
1781            assert!(result.is_err());
1782            // Clean up
1783            session.rollback().unwrap();
1784        }
1785
1786        #[test]
1787        fn test_commit_without_transaction_error() {
1788            let db = GrafeoDB::new_in_memory();
1789            let mut session = db.session();
1790
1791            let result = session.commit();
1792            assert!(result.is_err());
1793        }
1794
1795        #[test]
1796        fn test_rollback_without_transaction_error() {
1797            let db = GrafeoDB::new_in_memory();
1798            let mut session = db.session();
1799
1800            let result = session.rollback();
1801            assert!(result.is_err());
1802        }
1803
1804        #[test]
1805        fn test_create_edge_in_transaction() {
1806            let db = GrafeoDB::new_in_memory();
1807            let mut session = db.session();
1808
1809            // Create nodes outside transaction
1810            let alice = session.create_node(&["Person"]);
1811            let bob = session.create_node(&["Person"]);
1812
1813            // Create edge in transaction
1814            session.begin_tx().unwrap();
1815            let edge_id = session.create_edge(alice, bob, "KNOWS");
1816
1817            // Edge should be visible in the transaction
1818            assert!(session.edge_exists(edge_id));
1819
1820            // Commit
1821            session.commit().unwrap();
1822
1823            // Edge should still be visible
1824            assert!(session.edge_exists(edge_id));
1825        }
1826
1827        #[test]
1828        fn test_neighbors_empty_node() {
1829            let db = GrafeoDB::new_in_memory();
1830            let session = db.session();
1831
1832            let lonely = session.create_node(&["Person"]);
1833
1834            assert!(session.get_neighbors_outgoing(lonely).is_empty());
1835            assert!(session.get_neighbors_incoming(lonely).is_empty());
1836            assert!(
1837                session
1838                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
1839                    .is_empty()
1840            );
1841        }
1842    }
1843}