Skip to main content

grafeo_engine/
session.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
15#[cfg(feature = "rdf")]
16use grafeo_core::graph::rdf::RdfStore;
17
18use crate::config::{AdaptiveConfig, GraphModel};
19use crate::database::QueryResult;
20use crate::query::cache::QueryCache;
21use crate::transaction::TransactionManager;
22
/// Your handle to the database - execute queries and manage transactions.
///
/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
/// tracks its own transaction state, so you can have multiple concurrent
/// sessions without them interfering.
///
/// The stores, transaction manager, query cache, and commit counter are held
/// behind `Arc` handles; the active transaction ID and the execution settings
/// are plain fields owned by the session itself.
pub struct Session {
    /// The underlying LPG (labeled property graph) store.
    store: Arc<LpgStore>,
    /// RDF triple store (only present when the `rdf` feature is enabled).
    #[cfg(feature = "rdf")]
    #[allow(dead_code)]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager used to begin, commit, and abort transactions.
    tx_manager: Arc<TransactionManager>,
    /// Query cache shared across sessions (stores optimized plans).
    query_cache: Arc<QueryCache>,
    /// Current transaction ID; `None` when no explicit transaction is active.
    current_tx: Option<TxId>,
    /// Whether the session is in auto-commit mode.
    auto_commit: bool,
    /// Adaptive execution configuration.
    #[allow(dead_code)]
    adaptive_config: AdaptiveConfig,
    /// Whether to use factorized execution for multi-hop queries.
    factorized_execution: bool,
    /// The graph data model this session operates on.
    graph_model: GraphModel,
    /// Maximum time a query may run before being cancelled (`None` = no limit).
    query_timeout: Option<Duration>,
    /// Shared commit counter for triggering auto-GC.
    commit_counter: Arc<AtomicUsize>,
    /// GC every N commits (0 = disabled).
    gc_interval: usize,
    /// CDC log for change tracking (only present when the `cdc` feature is enabled).
    #[cfg(feature = "cdc")]
    cdc_log: Arc<crate::cdc::CdcLog>,
}
60
61impl Session {
62    /// Creates a new session with adaptive execution configuration.
63    #[allow(dead_code, clippy::too_many_arguments)]
64    pub(crate) fn with_adaptive(
65        store: Arc<LpgStore>,
66        tx_manager: Arc<TransactionManager>,
67        query_cache: Arc<QueryCache>,
68        adaptive_config: AdaptiveConfig,
69        factorized_execution: bool,
70        graph_model: GraphModel,
71        query_timeout: Option<Duration>,
72        commit_counter: Arc<AtomicUsize>,
73        gc_interval: usize,
74    ) -> Self {
75        Self {
76            store,
77            #[cfg(feature = "rdf")]
78            rdf_store: Arc::new(RdfStore::new()),
79            tx_manager,
80            query_cache,
81            current_tx: None,
82            auto_commit: true,
83            adaptive_config,
84            factorized_execution,
85            graph_model,
86            query_timeout,
87            commit_counter,
88            gc_interval,
89            #[cfg(feature = "cdc")]
90            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
91        }
92    }
93
94    /// Sets the CDC log for this session (shared with the database).
95    #[cfg(feature = "cdc")]
96    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
97        self.cdc_log = cdc_log;
98    }
99
100    /// Creates a new session with RDF store and adaptive configuration.
101    #[cfg(feature = "rdf")]
102    #[allow(clippy::too_many_arguments)]
103    pub(crate) fn with_rdf_store_and_adaptive(
104        store: Arc<LpgStore>,
105        rdf_store: Arc<RdfStore>,
106        tx_manager: Arc<TransactionManager>,
107        query_cache: Arc<QueryCache>,
108        adaptive_config: AdaptiveConfig,
109        factorized_execution: bool,
110        graph_model: GraphModel,
111        query_timeout: Option<Duration>,
112        commit_counter: Arc<AtomicUsize>,
113        gc_interval: usize,
114    ) -> Self {
115        Self {
116            store,
117            rdf_store,
118            tx_manager,
119            query_cache,
120            current_tx: None,
121            auto_commit: true,
122            adaptive_config,
123            factorized_execution,
124            graph_model,
125            query_timeout,
126            commit_counter,
127            gc_interval,
128            #[cfg(feature = "cdc")]
129            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
130        }
131    }
132
133    /// Returns the graph model this session operates on.
134    #[must_use]
135    pub fn graph_model(&self) -> GraphModel {
136        self.graph_model
137    }
138
139    /// Checks that the session's graph model supports LPG operations.
140    fn require_lpg(&self, language: &str) -> Result<()> {
141        if self.graph_model == GraphModel::Rdf {
142            return Err(grafeo_common::utils::error::Error::Internal(format!(
143                "This is an RDF database. {language} queries require an LPG database."
144            )));
145        }
146        Ok(())
147    }
148
149    /// Executes a GQL query.
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if the query fails to parse or execute.
154    ///
155    /// # Examples
156    ///
157    /// ```no_run
158    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
159    /// use grafeo_engine::GrafeoDB;
160    ///
161    /// let db = GrafeoDB::new_in_memory();
162    /// let session = db.session();
163    ///
164    /// // Create a node
165    /// session.execute("INSERT (:Person {name: 'Alice', age: 30})")?;
166    ///
167    /// // Query nodes
168    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
169    /// for row in &result.rows {
170    ///     println!("{:?}", row);
171    /// }
172    /// # Ok(())
173    /// # }
174    /// ```
175    #[cfg(feature = "gql")]
176    pub fn execute(&self, query: &str) -> Result<QueryResult> {
177        self.require_lpg("GQL")?;
178
179        use crate::query::{
180            Executor, Planner, binder::Binder, cache::CacheKey, gql_translator,
181            optimizer::Optimizer, processor::QueryLanguage,
182        };
183
184        let start_time = std::time::Instant::now();
185
186        // Create cache key for this query
187        let cache_key = CacheKey::new(query, QueryLanguage::Gql);
188
189        // Try to get cached optimized plan
190        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
191            // Cache hit - skip parsing, translation, binding, and optimization
192            cached_plan
193        } else {
194            // Cache miss - run full pipeline
195
196            // Parse and translate the query to a logical plan
197            let logical_plan = gql_translator::translate(query)?;
198
199            // Semantic validation
200            let mut binder = Binder::new();
201            let _binding_context = binder.bind(&logical_plan)?;
202
203            // Optimize the plan
204            let optimizer = Optimizer::from_store(&self.store);
205            let plan = optimizer.optimize(logical_plan)?;
206
207            // Cache the optimized plan for future use
208            self.query_cache.put_optimized(cache_key, plan.clone());
209
210            plan
211        };
212
213        // Get transaction context for MVCC visibility
214        let (viewing_epoch, tx_id) = self.get_transaction_context();
215
216        // Convert to physical plan with transaction context
217        // (Physical planning cannot be cached as it depends on transaction state)
218        let planner = Planner::with_context(
219            Arc::clone(&self.store),
220            Arc::clone(&self.tx_manager),
221            tx_id,
222            viewing_epoch,
223        )
224        .with_factorized_execution(self.factorized_execution);
225        let mut physical_plan = planner.plan(&optimized_plan)?;
226
227        // Execute the plan
228        let executor = Executor::with_columns(physical_plan.columns.clone())
229            .with_deadline(self.query_deadline());
230        let mut result = executor.execute(physical_plan.operator.as_mut())?;
231
232        // Add execution metrics
233        let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
234        let rows_scanned = result.rows.len() as u64;
235        result.execution_time_ms = Some(elapsed_ms);
236        result.rows_scanned = Some(rows_scanned);
237
238        Ok(result)
239    }
240
241    /// Executes a GQL query with parameters.
242    ///
243    /// # Errors
244    ///
245    /// Returns an error if the query fails to parse or execute.
246    #[cfg(feature = "gql")]
247    pub fn execute_with_params(
248        &self,
249        query: &str,
250        params: std::collections::HashMap<String, Value>,
251    ) -> Result<QueryResult> {
252        self.require_lpg("GQL")?;
253
254        use crate::query::processor::{QueryLanguage, QueryProcessor};
255
256        // Get transaction context for MVCC visibility
257        let (viewing_epoch, tx_id) = self.get_transaction_context();
258
259        // Create processor with transaction context
260        let processor =
261            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
262
263        // Apply transaction context if in a transaction
264        let processor = if let Some(tx_id) = tx_id {
265            processor.with_tx_context(viewing_epoch, tx_id)
266        } else {
267            processor
268        };
269
270        processor.process(query, QueryLanguage::Gql, Some(&params))
271    }
272
273    /// Executes a GQL query with parameters.
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if no query language is enabled.
278    #[cfg(not(any(feature = "gql", feature = "cypher")))]
279    pub fn execute_with_params(
280        &self,
281        _query: &str,
282        _params: std::collections::HashMap<String, Value>,
283    ) -> Result<QueryResult> {
284        Err(grafeo_common::utils::error::Error::Internal(
285            "No query language enabled".to_string(),
286        ))
287    }
288
289    /// Executes a GQL query.
290    ///
291    /// # Errors
292    ///
293    /// Returns an error if no query language is enabled.
294    #[cfg(not(any(feature = "gql", feature = "cypher")))]
295    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
296        Err(grafeo_common::utils::error::Error::Internal(
297            "No query language enabled".to_string(),
298        ))
299    }
300
301    /// Executes a Cypher query.
302    ///
303    /// # Errors
304    ///
305    /// Returns an error if the query fails to parse or execute.
306    #[cfg(feature = "cypher")]
307    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
308        use crate::query::{
309            Executor, Planner, binder::Binder, cache::CacheKey, cypher_translator,
310            optimizer::Optimizer, processor::QueryLanguage,
311        };
312
313        // Create cache key for this query
314        let cache_key = CacheKey::new(query, QueryLanguage::Cypher);
315
316        // Try to get cached optimized plan
317        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
318            cached_plan
319        } else {
320            // Parse and translate the query to a logical plan
321            let logical_plan = cypher_translator::translate(query)?;
322
323            // Semantic validation
324            let mut binder = Binder::new();
325            let _binding_context = binder.bind(&logical_plan)?;
326
327            // Optimize the plan
328            let optimizer = Optimizer::from_store(&self.store);
329            let plan = optimizer.optimize(logical_plan)?;
330
331            // Cache the optimized plan
332            self.query_cache.put_optimized(cache_key, plan.clone());
333
334            plan
335        };
336
337        // Get transaction context for MVCC visibility
338        let (viewing_epoch, tx_id) = self.get_transaction_context();
339
340        // Convert to physical plan with transaction context
341        let planner = Planner::with_context(
342            Arc::clone(&self.store),
343            Arc::clone(&self.tx_manager),
344            tx_id,
345            viewing_epoch,
346        )
347        .with_factorized_execution(self.factorized_execution);
348        let mut physical_plan = planner.plan(&optimized_plan)?;
349
350        // Execute the plan
351        let executor = Executor::with_columns(physical_plan.columns.clone())
352            .with_deadline(self.query_deadline());
353        executor.execute(physical_plan.operator.as_mut())
354    }
355
356    /// Executes a Gremlin query.
357    ///
358    /// # Errors
359    ///
360    /// Returns an error if the query fails to parse or execute.
361    ///
362    /// # Examples
363    ///
364    /// ```no_run
365    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
366    /// use grafeo_engine::GrafeoDB;
367    ///
368    /// let db = GrafeoDB::new_in_memory();
369    /// let session = db.session();
370    ///
371    /// // Create some nodes first
372    /// session.create_node(&["Person"]);
373    ///
374    /// // Query using Gremlin
375    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
376    /// # Ok(())
377    /// # }
378    /// ```
379    #[cfg(feature = "gremlin")]
380    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
381        use crate::query::{
382            Executor, Planner, binder::Binder, gremlin_translator, optimizer::Optimizer,
383        };
384
385        // Parse and translate the query to a logical plan
386        let logical_plan = gremlin_translator::translate(query)?;
387
388        // Semantic validation
389        let mut binder = Binder::new();
390        let _binding_context = binder.bind(&logical_plan)?;
391
392        // Optimize the plan
393        let optimizer = Optimizer::from_store(&self.store);
394        let optimized_plan = optimizer.optimize(logical_plan)?;
395
396        // Get transaction context for MVCC visibility
397        let (viewing_epoch, tx_id) = self.get_transaction_context();
398
399        // Convert to physical plan with transaction context
400        let planner = Planner::with_context(
401            Arc::clone(&self.store),
402            Arc::clone(&self.tx_manager),
403            tx_id,
404            viewing_epoch,
405        )
406        .with_factorized_execution(self.factorized_execution);
407        let mut physical_plan = planner.plan(&optimized_plan)?;
408
409        // Execute the plan
410        let executor = Executor::with_columns(physical_plan.columns.clone())
411            .with_deadline(self.query_deadline());
412        executor.execute(physical_plan.operator.as_mut())
413    }
414
415    /// Executes a Gremlin query with parameters.
416    ///
417    /// # Errors
418    ///
419    /// Returns an error if the query fails to parse or execute.
420    #[cfg(feature = "gremlin")]
421    pub fn execute_gremlin_with_params(
422        &self,
423        query: &str,
424        params: std::collections::HashMap<String, Value>,
425    ) -> Result<QueryResult> {
426        use crate::query::processor::{QueryLanguage, QueryProcessor};
427
428        // Get transaction context for MVCC visibility
429        let (viewing_epoch, tx_id) = self.get_transaction_context();
430
431        // Create processor with transaction context
432        let processor =
433            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
434
435        // Apply transaction context if in a transaction
436        let processor = if let Some(tx_id) = tx_id {
437            processor.with_tx_context(viewing_epoch, tx_id)
438        } else {
439            processor
440        };
441
442        processor.process(query, QueryLanguage::Gremlin, Some(&params))
443    }
444
445    /// Executes a GraphQL query against the LPG store.
446    ///
447    /// # Errors
448    ///
449    /// Returns an error if the query fails to parse or execute.
450    ///
451    /// # Examples
452    ///
453    /// ```no_run
454    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
455    /// use grafeo_engine::GrafeoDB;
456    ///
457    /// let db = GrafeoDB::new_in_memory();
458    /// let session = db.session();
459    ///
460    /// // Create some nodes first
461    /// session.create_node(&["User"]);
462    ///
463    /// // Query using GraphQL
464    /// let result = session.execute_graphql("query { user { id name } }")?;
465    /// # Ok(())
466    /// # }
467    /// ```
468    #[cfg(feature = "graphql")]
469    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
470        use crate::query::{
471            Executor, Planner, binder::Binder, graphql_translator, optimizer::Optimizer,
472        };
473
474        // Parse and translate the query to a logical plan
475        let logical_plan = graphql_translator::translate(query)?;
476
477        // Semantic validation
478        let mut binder = Binder::new();
479        let _binding_context = binder.bind(&logical_plan)?;
480
481        // Optimize the plan
482        let optimizer = Optimizer::from_store(&self.store);
483        let optimized_plan = optimizer.optimize(logical_plan)?;
484
485        // Get transaction context for MVCC visibility
486        let (viewing_epoch, tx_id) = self.get_transaction_context();
487
488        // Convert to physical plan with transaction context
489        let planner = Planner::with_context(
490            Arc::clone(&self.store),
491            Arc::clone(&self.tx_manager),
492            tx_id,
493            viewing_epoch,
494        )
495        .with_factorized_execution(self.factorized_execution);
496        let mut physical_plan = planner.plan(&optimized_plan)?;
497
498        // Execute the plan
499        let executor = Executor::with_columns(physical_plan.columns.clone())
500            .with_deadline(self.query_deadline());
501        executor.execute(physical_plan.operator.as_mut())
502    }
503
504    /// Executes a GraphQL query with parameters.
505    ///
506    /// # Errors
507    ///
508    /// Returns an error if the query fails to parse or execute.
509    #[cfg(feature = "graphql")]
510    pub fn execute_graphql_with_params(
511        &self,
512        query: &str,
513        params: std::collections::HashMap<String, Value>,
514    ) -> Result<QueryResult> {
515        use crate::query::processor::{QueryLanguage, QueryProcessor};
516
517        // Get transaction context for MVCC visibility
518        let (viewing_epoch, tx_id) = self.get_transaction_context();
519
520        // Create processor with transaction context
521        let processor =
522            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
523
524        // Apply transaction context if in a transaction
525        let processor = if let Some(tx_id) = tx_id {
526            processor.with_tx_context(viewing_epoch, tx_id)
527        } else {
528            processor
529        };
530
531        processor.process(query, QueryLanguage::GraphQL, Some(&params))
532    }
533
534    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
535    ///
536    /// # Errors
537    ///
538    /// Returns an error if the query fails to parse or execute.
539    ///
540    /// # Examples
541    ///
542    /// ```no_run
543    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
544    /// use grafeo_engine::GrafeoDB;
545    ///
546    /// let db = GrafeoDB::new_in_memory();
547    /// let session = db.session();
548    ///
549    /// let result = session.execute_sql(
550    ///     "SELECT * FROM GRAPH_TABLE (
551    ///         MATCH (n:Person)
552    ///         COLUMNS (n.name AS name)
553    ///     )"
554    /// )?;
555    /// # Ok(())
556    /// # }
557    /// ```
558    #[cfg(feature = "sql-pgq")]
559    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
560        use crate::query::{
561            Executor, Planner, binder::Binder, cache::CacheKey, optimizer::Optimizer,
562            plan::LogicalOperator, processor::QueryLanguage, sql_pgq_translator,
563        };
564
565        // Parse and translate (always needed to check for DDL)
566        let logical_plan = sql_pgq_translator::translate(query)?;
567
568        // Handle DDL statements directly (they don't go through the query pipeline)
569        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
570            return Ok(QueryResult {
571                columns: vec!["status".into()],
572                column_types: vec![grafeo_common::types::LogicalType::String],
573                rows: vec![vec![Value::from(format!(
574                    "Property graph '{}' created ({} node tables, {} edge tables)",
575                    cpg.name,
576                    cpg.node_tables.len(),
577                    cpg.edge_tables.len()
578                ))]],
579                execution_time_ms: None,
580                rows_scanned: None,
581            });
582        }
583
584        // Create cache key for query plans
585        let cache_key = CacheKey::new(query, QueryLanguage::SqlPgq);
586
587        // Try to get cached optimized plan
588        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
589            cached_plan
590        } else {
591            // Semantic validation
592            let mut binder = Binder::new();
593            let _binding_context = binder.bind(&logical_plan)?;
594
595            // Optimize the plan
596            let optimizer = Optimizer::from_store(&self.store);
597            let plan = optimizer.optimize(logical_plan)?;
598
599            // Cache the optimized plan
600            self.query_cache.put_optimized(cache_key, plan.clone());
601
602            plan
603        };
604
605        // Get transaction context for MVCC visibility
606        let (viewing_epoch, tx_id) = self.get_transaction_context();
607
608        // Convert to physical plan with transaction context
609        let planner = Planner::with_context(
610            Arc::clone(&self.store),
611            Arc::clone(&self.tx_manager),
612            tx_id,
613            viewing_epoch,
614        )
615        .with_factorized_execution(self.factorized_execution);
616        let mut physical_plan = planner.plan(&optimized_plan)?;
617
618        // Execute the plan
619        let executor = Executor::with_columns(physical_plan.columns.clone())
620            .with_deadline(self.query_deadline());
621        executor.execute(physical_plan.operator.as_mut())
622    }
623
624    /// Executes a SQL/PGQ query with parameters.
625    ///
626    /// # Errors
627    ///
628    /// Returns an error if the query fails to parse or execute.
629    #[cfg(feature = "sql-pgq")]
630    pub fn execute_sql_with_params(
631        &self,
632        query: &str,
633        params: std::collections::HashMap<String, Value>,
634    ) -> Result<QueryResult> {
635        use crate::query::processor::{QueryLanguage, QueryProcessor};
636
637        // Get transaction context for MVCC visibility
638        let (viewing_epoch, tx_id) = self.get_transaction_context();
639
640        // Create processor with transaction context
641        let processor =
642            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
643
644        // Apply transaction context if in a transaction
645        let processor = if let Some(tx_id) = tx_id {
646            processor.with_tx_context(viewing_epoch, tx_id)
647        } else {
648            processor
649        };
650
651        processor.process(query, QueryLanguage::SqlPgq, Some(&params))
652    }
653
654    /// Executes a SPARQL query.
655    ///
656    /// # Errors
657    ///
658    /// Returns an error if the query fails to parse or execute.
659    #[cfg(all(feature = "sparql", feature = "rdf"))]
660    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
661        use crate::query::{
662            Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
663        };
664
665        // Parse and translate the SPARQL query to a logical plan
666        let logical_plan = sparql_translator::translate(query)?;
667
668        // Optimize the plan
669        let optimizer = Optimizer::from_store(&self.store);
670        let optimized_plan = optimizer.optimize(logical_plan)?;
671
672        // Convert to physical plan using RDF planner
673        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store)).with_tx_id(self.current_tx);
674        let mut physical_plan = planner.plan(&optimized_plan)?;
675
676        // Execute the plan
677        let executor = Executor::with_columns(physical_plan.columns.clone())
678            .with_deadline(self.query_deadline());
679        executor.execute(physical_plan.operator.as_mut())
680    }
681
682    /// Executes a SPARQL query with parameters.
683    ///
684    /// # Errors
685    ///
686    /// Returns an error if the query fails to parse or execute.
687    #[cfg(all(feature = "sparql", feature = "rdf"))]
688    pub fn execute_sparql_with_params(
689        &self,
690        query: &str,
691        _params: std::collections::HashMap<String, Value>,
692    ) -> Result<QueryResult> {
693        // TODO: Implement parameter substitution for SPARQL
694        // For now, just execute the query without parameters
695        self.execute_sparql(query)
696    }
697
698    /// Begins a new transaction.
699    ///
700    /// # Errors
701    ///
702    /// Returns an error if a transaction is already active.
703    ///
704    /// # Examples
705    ///
706    /// ```no_run
707    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
708    /// use grafeo_engine::GrafeoDB;
709    ///
710    /// let db = GrafeoDB::new_in_memory();
711    /// let mut session = db.session();
712    ///
713    /// session.begin_tx()?;
714    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
715    /// session.execute("INSERT (:Person {name: 'Bob'})")?;
716    /// session.commit()?; // Both inserts committed atomically
717    /// # Ok(())
718    /// # }
719    /// ```
720    pub fn begin_tx(&mut self) -> Result<()> {
721        if self.current_tx.is_some() {
722            return Err(grafeo_common::utils::error::Error::Transaction(
723                grafeo_common::utils::error::TransactionError::InvalidState(
724                    "Transaction already active".to_string(),
725                ),
726            ));
727        }
728
729        let tx_id = self.tx_manager.begin();
730        self.current_tx = Some(tx_id);
731        Ok(())
732    }
733
734    /// Begins a transaction with a specific isolation level.
735    ///
736    /// See [`begin_tx`](Self::begin_tx) for the default (`SnapshotIsolation`).
737    ///
738    /// # Errors
739    ///
740    /// Returns an error if a transaction is already active.
741    pub fn begin_tx_with_isolation(
742        &mut self,
743        isolation_level: crate::transaction::IsolationLevel,
744    ) -> Result<()> {
745        if self.current_tx.is_some() {
746            return Err(grafeo_common::utils::error::Error::Transaction(
747                grafeo_common::utils::error::TransactionError::InvalidState(
748                    "Transaction already active".to_string(),
749                ),
750            ));
751        }
752
753        let tx_id = self.tx_manager.begin_with_isolation(isolation_level);
754        self.current_tx = Some(tx_id);
755        Ok(())
756    }
757
758    /// Commits the current transaction.
759    ///
760    /// Makes all changes since [`begin_tx`](Self::begin_tx) permanent.
761    ///
762    /// # Errors
763    ///
764    /// Returns an error if no transaction is active.
765    pub fn commit(&mut self) -> Result<()> {
766        let tx_id = self.current_tx.take().ok_or_else(|| {
767            grafeo_common::utils::error::Error::Transaction(
768                grafeo_common::utils::error::TransactionError::InvalidState(
769                    "No active transaction".to_string(),
770                ),
771            )
772        })?;
773
774        // Commit RDF store pending operations
775        #[cfg(feature = "rdf")]
776        self.rdf_store.commit_tx(tx_id);
777
778        self.tx_manager.commit(tx_id)?;
779
780        // Auto-GC: periodically prune old MVCC versions
781        if self.gc_interval > 0 {
782            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
783            if count.is_multiple_of(self.gc_interval) {
784                let min_epoch = self.tx_manager.min_active_epoch();
785                self.store.gc_versions(min_epoch);
786                self.tx_manager.gc();
787            }
788        }
789
790        Ok(())
791    }
792
793    /// Aborts the current transaction.
794    ///
795    /// Discards all changes since [`begin_tx`](Self::begin_tx).
796    ///
797    /// # Errors
798    ///
799    /// Returns an error if no transaction is active.
800    ///
801    /// # Examples
802    ///
803    /// ```no_run
804    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
805    /// use grafeo_engine::GrafeoDB;
806    ///
807    /// let db = GrafeoDB::new_in_memory();
808    /// let mut session = db.session();
809    ///
810    /// session.begin_tx()?;
811    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
812    /// session.rollback()?; // Insert is discarded
813    /// # Ok(())
814    /// # }
815    /// ```
816    pub fn rollback(&mut self) -> Result<()> {
817        let tx_id = self.current_tx.take().ok_or_else(|| {
818            grafeo_common::utils::error::Error::Transaction(
819                grafeo_common::utils::error::TransactionError::InvalidState(
820                    "No active transaction".to_string(),
821                ),
822            )
823        })?;
824
825        // Discard uncommitted versions in the LPG store
826        self.store.discard_uncommitted_versions(tx_id);
827
828        // Discard pending operations in the RDF store
829        #[cfg(feature = "rdf")]
830        self.rdf_store.rollback_tx(tx_id);
831
832        // Mark transaction as aborted in the manager
833        self.tx_manager.abort(tx_id)
834    }
835
836    /// Returns whether a transaction is active.
837    #[must_use]
838    pub fn in_transaction(&self) -> bool {
839        self.current_tx.is_some()
840    }
841
842    /// Sets auto-commit mode.
843    pub fn set_auto_commit(&mut self, auto_commit: bool) {
844        self.auto_commit = auto_commit;
845    }
846
847    /// Returns whether auto-commit is enabled.
848    #[must_use]
849    pub fn auto_commit(&self) -> bool {
850        self.auto_commit
851    }
852
853    /// Computes the wall-clock deadline for query execution.
854    #[must_use]
855    fn query_deadline(&self) -> Option<Instant> {
856        self.query_timeout.map(|d| Instant::now() + d)
857    }
858
859    /// Returns the current transaction context for MVCC visibility.
860    ///
861    /// Returns `(viewing_epoch, tx_id)` where:
862    /// - `viewing_epoch` is the epoch at which to check version visibility
863    /// - `tx_id` is the current transaction ID (if in a transaction)
864    #[must_use]
865    fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
866        if let Some(tx_id) = self.current_tx {
867            // In a transaction - use the transaction's start epoch
868            let epoch = self
869                .tx_manager
870                .start_epoch(tx_id)
871                .unwrap_or_else(|| self.tx_manager.current_epoch());
872            (epoch, Some(tx_id))
873        } else {
874            // No transaction - use current epoch
875            (self.tx_manager.current_epoch(), None)
876        }
877    }
878
879    /// Creates a node directly (bypassing query execution).
880    ///
881    /// This is a low-level API for testing and direct manipulation.
882    /// If a transaction is active, the node will be versioned with the transaction ID.
883    pub fn create_node(&self, labels: &[&str]) -> NodeId {
884        let (epoch, tx_id) = self.get_transaction_context();
885        self.store
886            .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
887    }
888
889    /// Creates a node with properties.
890    ///
891    /// If a transaction is active, the node will be versioned with the transaction ID.
892    pub fn create_node_with_props<'a>(
893        &self,
894        labels: &[&str],
895        properties: impl IntoIterator<Item = (&'a str, Value)>,
896    ) -> NodeId {
897        let (epoch, tx_id) = self.get_transaction_context();
898        self.store.create_node_with_props_versioned(
899            labels,
900            properties.into_iter().map(|(k, v)| (k, v)),
901            epoch,
902            tx_id.unwrap_or(TxId::SYSTEM),
903        )
904    }
905
906    /// Creates an edge between two nodes.
907    ///
908    /// This is a low-level API for testing and direct manipulation.
909    /// If a transaction is active, the edge will be versioned with the transaction ID.
910    pub fn create_edge(
911        &self,
912        src: NodeId,
913        dst: NodeId,
914        edge_type: &str,
915    ) -> grafeo_common::types::EdgeId {
916        let (epoch, tx_id) = self.get_transaction_context();
917        self.store
918            .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
919    }
920
921    // =========================================================================
922    // Direct Lookup APIs (bypass query planning for O(1) point reads)
923    // =========================================================================
924
925    /// Gets a node by ID directly, bypassing query planning.
926    ///
927    /// This is the fastest way to retrieve a single node when you know its ID.
928    /// Skips parsing, binding, optimization, and physical planning entirely.
929    ///
930    /// # Performance
931    ///
932    /// - Time complexity: O(1) average case
933    /// - No lock contention (uses DashMap internally)
934    /// - ~20-30x faster than equivalent MATCH query
935    ///
936    /// # Example
937    ///
938    /// ```no_run
939    /// # use grafeo_engine::GrafeoDB;
940    /// # let db = GrafeoDB::new_in_memory();
941    /// let session = db.session();
942    /// let node_id = session.create_node(&["Person"]);
943    ///
944    /// // Direct lookup - O(1), no query planning
945    /// let node = session.get_node(node_id);
946    /// assert!(node.is_some());
947    /// ```
948    #[must_use]
949    pub fn get_node(&self, id: NodeId) -> Option<Node> {
950        let (epoch, tx_id) = self.get_transaction_context();
951        self.store
952            .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
953    }
954
955    /// Gets a single property from a node by ID, bypassing query planning.
956    ///
957    /// More efficient than `get_node()` when you only need one property,
958    /// as it avoids loading the full node with all properties.
959    ///
960    /// # Performance
961    ///
962    /// - Time complexity: O(1) average case
963    /// - No query planning overhead
964    ///
965    /// # Example
966    ///
967    /// ```no_run
968    /// # use grafeo_engine::GrafeoDB;
969    /// # use grafeo_common::types::Value;
970    /// # let db = GrafeoDB::new_in_memory();
971    /// let session = db.session();
972    /// let id = session.create_node_with_props(&["Person"], [("name", "Alice".into())]);
973    ///
974    /// // Direct property access - O(1)
975    /// let name = session.get_node_property(id, "name");
976    /// assert_eq!(name, Some(Value::String("Alice".into())));
977    /// ```
978    #[must_use]
979    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
980        self.get_node(id)
981            .and_then(|node| node.get_property(key).cloned())
982    }
983
984    /// Gets an edge by ID directly, bypassing query planning.
985    ///
986    /// # Performance
987    ///
988    /// - Time complexity: O(1) average case
989    /// - No lock contention
990    #[must_use]
991    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
992        let (epoch, tx_id) = self.get_transaction_context();
993        self.store
994            .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
995    }
996
997    /// Gets outgoing neighbors of a node directly, bypassing query planning.
998    ///
999    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
1000    ///
1001    /// # Performance
1002    ///
1003    /// - Time complexity: O(degree) where degree is the number of outgoing edges
1004    /// - Uses adjacency index for direct access
1005    /// - ~10-20x faster than equivalent MATCH query
1006    ///
1007    /// # Example
1008    ///
1009    /// ```no_run
1010    /// # use grafeo_engine::GrafeoDB;
1011    /// # let db = GrafeoDB::new_in_memory();
1012    /// let session = db.session();
1013    /// let alice = session.create_node(&["Person"]);
1014    /// let bob = session.create_node(&["Person"]);
1015    /// session.create_edge(alice, bob, "KNOWS");
1016    ///
1017    /// // Direct neighbor lookup - O(degree)
1018    /// let neighbors = session.get_neighbors_outgoing(alice);
1019    /// assert_eq!(neighbors.len(), 1);
1020    /// assert_eq!(neighbors[0].0, bob);
1021    /// ```
1022    #[must_use]
1023    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1024        self.store.edges_from(node, Direction::Outgoing).collect()
1025    }
1026
1027    /// Gets incoming neighbors of a node directly, bypassing query planning.
1028    ///
1029    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
1030    ///
1031    /// # Performance
1032    ///
1033    /// - Time complexity: O(degree) where degree is the number of incoming edges
1034    /// - Uses backward adjacency index for direct access
1035    #[must_use]
1036    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1037        self.store.edges_from(node, Direction::Incoming).collect()
1038    }
1039
1040    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
1041    ///
1042    /// # Example
1043    ///
1044    /// ```no_run
1045    /// # use grafeo_engine::GrafeoDB;
1046    /// # let db = GrafeoDB::new_in_memory();
1047    /// # let session = db.session();
1048    /// # let alice = session.create_node(&["Person"]);
1049    /// let neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1050    /// ```
1051    #[must_use]
1052    pub fn get_neighbors_outgoing_by_type(
1053        &self,
1054        node: NodeId,
1055        edge_type: &str,
1056    ) -> Vec<(NodeId, EdgeId)> {
1057        self.store
1058            .edges_from(node, Direction::Outgoing)
1059            .filter(|(_, edge_id)| {
1060                self.get_edge(*edge_id)
1061                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
1062            })
1063            .collect()
1064    }
1065
1066    /// Checks if a node exists, bypassing query planning.
1067    ///
1068    /// # Performance
1069    ///
1070    /// - Time complexity: O(1)
1071    /// - Fastest existence check available
1072    #[must_use]
1073    pub fn node_exists(&self, id: NodeId) -> bool {
1074        self.get_node(id).is_some()
1075    }
1076
1077    /// Checks if an edge exists, bypassing query planning.
1078    #[must_use]
1079    pub fn edge_exists(&self, id: EdgeId) -> bool {
1080        self.get_edge(id).is_some()
1081    }
1082
1083    /// Gets the degree (number of edges) of a node.
1084    ///
1085    /// Returns (outgoing_degree, incoming_degree).
1086    #[must_use]
1087    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
1088        let out = self.store.out_degree(node);
1089        let in_degree = self.store.in_degree(node);
1090        (out, in_degree)
1091    }
1092
1093    /// Batch lookup of multiple nodes by ID.
1094    ///
1095    /// More efficient than calling `get_node()` in a loop because it
1096    /// amortizes overhead.
1097    ///
1098    /// # Performance
1099    ///
1100    /// - Time complexity: O(n) where n is the number of IDs
1101    /// - Better cache utilization than individual lookups
1102    #[must_use]
1103    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
1104        let (epoch, tx_id) = self.get_transaction_context();
1105        let tx = tx_id.unwrap_or(TxId::SYSTEM);
1106        ids.iter()
1107            .map(|&id| self.store.get_node_versioned(id, epoch, tx))
1108            .collect()
1109    }
1110
1111    // ── Change Data Capture ─────────────────────────────────────────────
1112
1113    /// Returns the full change history for an entity (node or edge).
1114    #[cfg(feature = "cdc")]
1115    pub fn history(
1116        &self,
1117        entity_id: impl Into<crate::cdc::EntityId>,
1118    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1119        Ok(self.cdc_log.history(entity_id.into()))
1120    }
1121
1122    /// Returns change events for an entity since the given epoch.
1123    #[cfg(feature = "cdc")]
1124    pub fn history_since(
1125        &self,
1126        entity_id: impl Into<crate::cdc::EntityId>,
1127        since_epoch: EpochId,
1128    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1129        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
1130    }
1131
1132    /// Returns all change events across all entities in an epoch range.
1133    #[cfg(feature = "cdc")]
1134    pub fn changes_between(
1135        &self,
1136        start_epoch: EpochId,
1137        end_epoch: EpochId,
1138    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1139        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
1140    }
1141}
1142
1143#[cfg(test)]
1144mod tests {
1145    use crate::database::GrafeoDB;
1146
1147    #[test]
1148    fn test_session_create_node() {
1149        let db = GrafeoDB::new_in_memory();
1150        let session = db.session();
1151
1152        let id = session.create_node(&["Person"]);
1153        assert!(id.is_valid());
1154        assert_eq!(db.node_count(), 1);
1155    }
1156
1157    #[test]
1158    fn test_session_transaction() {
1159        let db = GrafeoDB::new_in_memory();
1160        let mut session = db.session();
1161
1162        assert!(!session.in_transaction());
1163
1164        session.begin_tx().unwrap();
1165        assert!(session.in_transaction());
1166
1167        session.commit().unwrap();
1168        assert!(!session.in_transaction());
1169    }
1170
1171    #[test]
1172    fn test_session_transaction_context() {
1173        let db = GrafeoDB::new_in_memory();
1174        let mut session = db.session();
1175
1176        // Without transaction - context should have current epoch and no tx_id
1177        let (_epoch1, tx_id1) = session.get_transaction_context();
1178        assert!(tx_id1.is_none());
1179
1180        // Start a transaction
1181        session.begin_tx().unwrap();
1182        let (epoch2, tx_id2) = session.get_transaction_context();
1183        assert!(tx_id2.is_some());
1184        // Transaction should have a valid epoch
1185        let _ = epoch2; // Use the variable
1186
1187        // Commit and verify
1188        session.commit().unwrap();
1189        let (epoch3, tx_id3) = session.get_transaction_context();
1190        assert!(tx_id3.is_none());
1191        // Epoch should have advanced after commit
1192        assert!(epoch3.as_u64() >= epoch2.as_u64());
1193    }
1194
1195    #[test]
1196    fn test_session_rollback() {
1197        let db = GrafeoDB::new_in_memory();
1198        let mut session = db.session();
1199
1200        session.begin_tx().unwrap();
1201        session.rollback().unwrap();
1202        assert!(!session.in_transaction());
1203    }
1204
1205    #[test]
1206    fn test_session_rollback_discards_versions() {
1207        use grafeo_common::types::TxId;
1208
1209        let db = GrafeoDB::new_in_memory();
1210
1211        // Create a node outside of any transaction (at system level)
1212        let node_before = db.store().create_node(&["Person"]);
1213        assert!(node_before.is_valid());
1214        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1215
1216        // Start a transaction
1217        let mut session = db.session();
1218        session.begin_tx().unwrap();
1219        let tx_id = session.current_tx.unwrap();
1220
1221        // Create a node versioned with the transaction's ID
1222        let epoch = db.store().current_epoch();
1223        let node_in_tx = db.store().create_node_versioned(&["Person"], epoch, tx_id);
1224        assert!(node_in_tx.is_valid());
1225
1226        // Should see 2 nodes at this point
1227        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1228
1229        // Rollback the transaction
1230        session.rollback().unwrap();
1231        assert!(!session.in_transaction());
1232
1233        // The node created in the transaction should be discarded
1234        // Only the first node should remain visible
1235        let count_after = db.node_count();
1236        assert_eq!(
1237            count_after, 1,
1238            "Rollback should discard uncommitted node, but got {count_after}"
1239        );
1240
1241        // The original node should still be accessible
1242        let current_epoch = db.store().current_epoch();
1243        assert!(
1244            db.store()
1245                .get_node_versioned(node_before, current_epoch, TxId::SYSTEM)
1246                .is_some(),
1247            "Original node should still exist"
1248        );
1249
1250        // The node created in the transaction should not be accessible
1251        assert!(
1252            db.store()
1253                .get_node_versioned(node_in_tx, current_epoch, TxId::SYSTEM)
1254                .is_none(),
1255            "Transaction node should be gone"
1256        );
1257    }
1258
1259    #[test]
1260    fn test_session_create_node_in_transaction() {
1261        // Test that session.create_node() is transaction-aware
1262        let db = GrafeoDB::new_in_memory();
1263
1264        // Create a node outside of any transaction
1265        let node_before = db.create_node(&["Person"]);
1266        assert!(node_before.is_valid());
1267        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1268
1269        // Start a transaction and create a node through the session
1270        let mut session = db.session();
1271        session.begin_tx().unwrap();
1272
1273        // Create a node through session.create_node() - should be versioned with tx
1274        let node_in_tx = session.create_node(&["Person"]);
1275        assert!(node_in_tx.is_valid());
1276
1277        // Should see 2 nodes at this point
1278        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1279
1280        // Rollback the transaction
1281        session.rollback().unwrap();
1282
1283        // The node created via session.create_node() should be discarded
1284        let count_after = db.node_count();
1285        assert_eq!(
1286            count_after, 1,
1287            "Rollback should discard node created via session.create_node(), but got {count_after}"
1288        );
1289    }
1290
1291    #[test]
1292    fn test_session_create_node_with_props_in_transaction() {
1293        use grafeo_common::types::Value;
1294
1295        // Test that session.create_node_with_props() is transaction-aware
1296        let db = GrafeoDB::new_in_memory();
1297
1298        // Create a node outside of any transaction
1299        db.create_node(&["Person"]);
1300        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1301
1302        // Start a transaction and create a node with properties
1303        let mut session = db.session();
1304        session.begin_tx().unwrap();
1305
1306        let node_in_tx =
1307            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1308        assert!(node_in_tx.is_valid());
1309
1310        // Should see 2 nodes
1311        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1312
1313        // Rollback the transaction
1314        session.rollback().unwrap();
1315
1316        // The node should be discarded
1317        let count_after = db.node_count();
1318        assert_eq!(
1319            count_after, 1,
1320            "Rollback should discard node created via session.create_node_with_props()"
1321        );
1322    }
1323
1324    #[cfg(feature = "gql")]
1325    mod gql_tests {
1326        use super::*;
1327
1328        #[test]
1329        fn test_gql_query_execution() {
1330            let db = GrafeoDB::new_in_memory();
1331            let session = db.session();
1332
1333            // Create some test data
1334            session.create_node(&["Person"]);
1335            session.create_node(&["Person"]);
1336            session.create_node(&["Animal"]);
1337
1338            // Execute a GQL query
1339            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1340
1341            // Should return 2 Person nodes
1342            assert_eq!(result.row_count(), 2);
1343            assert_eq!(result.column_count(), 1);
1344            assert_eq!(result.columns[0], "n");
1345        }
1346
1347        #[test]
1348        fn test_gql_empty_result() {
1349            let db = GrafeoDB::new_in_memory();
1350            let session = db.session();
1351
1352            // No data in database
1353            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1354
1355            assert_eq!(result.row_count(), 0);
1356        }
1357
1358        #[test]
1359        fn test_gql_parse_error() {
1360            let db = GrafeoDB::new_in_memory();
1361            let session = db.session();
1362
1363            // Invalid GQL syntax
1364            let result = session.execute("MATCH (n RETURN n");
1365
1366            assert!(result.is_err());
1367        }
1368
1369        #[test]
1370        fn test_gql_relationship_traversal() {
1371            let db = GrafeoDB::new_in_memory();
1372            let session = db.session();
1373
1374            // Create a graph: Alice -> Bob, Alice -> Charlie
1375            let alice = session.create_node(&["Person"]);
1376            let bob = session.create_node(&["Person"]);
1377            let charlie = session.create_node(&["Person"]);
1378
1379            session.create_edge(alice, bob, "KNOWS");
1380            session.create_edge(alice, charlie, "KNOWS");
1381
1382            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
1383            let result = session
1384                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1385                .unwrap();
1386
1387            // Should return 2 rows (Alice->Bob, Alice->Charlie)
1388            assert_eq!(result.row_count(), 2);
1389            assert_eq!(result.column_count(), 2);
1390            assert_eq!(result.columns[0], "a");
1391            assert_eq!(result.columns[1], "b");
1392        }
1393
1394        #[test]
1395        fn test_gql_relationship_with_type_filter() {
1396            let db = GrafeoDB::new_in_memory();
1397            let session = db.session();
1398
1399            // Create a graph: Alice -KNOWS-> Bob, Alice -WORKS_WITH-> Charlie
1400            let alice = session.create_node(&["Person"]);
1401            let bob = session.create_node(&["Person"]);
1402            let charlie = session.create_node(&["Person"]);
1403
1404            session.create_edge(alice, bob, "KNOWS");
1405            session.create_edge(alice, charlie, "WORKS_WITH");
1406
1407            // Query only KNOWS relationships
1408            let result = session
1409                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1410                .unwrap();
1411
1412            // Should return only 1 row (Alice->Bob)
1413            assert_eq!(result.row_count(), 1);
1414        }
1415
1416        #[test]
1417        fn test_gql_semantic_error_undefined_variable() {
1418            let db = GrafeoDB::new_in_memory();
1419            let session = db.session();
1420
1421            // Reference undefined variable 'x' in RETURN
1422            let result = session.execute("MATCH (n:Person) RETURN x");
1423
1424            // Should fail with semantic error
1425            assert!(result.is_err());
1426            let Err(err) = result else {
1427                panic!("Expected error")
1428            };
1429            assert!(
1430                err.to_string().contains("Undefined variable"),
1431                "Expected undefined variable error, got: {}",
1432                err
1433            );
1434        }
1435
1436        #[test]
1437        fn test_gql_where_clause_property_filter() {
1438            use grafeo_common::types::Value;
1439
1440            let db = GrafeoDB::new_in_memory();
1441            let session = db.session();
1442
1443            // Create people with ages
1444            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
1445            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
1446            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
1447
1448            // Query with WHERE clause: age > 30
1449            let result = session
1450                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
1451                .unwrap();
1452
1453            // Should return 2 people (ages 35 and 45)
1454            assert_eq!(result.row_count(), 2);
1455        }
1456
1457        #[test]
1458        fn test_gql_where_clause_equality() {
1459            use grafeo_common::types::Value;
1460
1461            let db = GrafeoDB::new_in_memory();
1462            let session = db.session();
1463
1464            // Create people with names
1465            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1466            session.create_node_with_props(&["Person"], [("name", Value::String("Bob".into()))]);
1467            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1468
1469            // Query with WHERE clause: name = "Alice"
1470            let result = session
1471                .execute("MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n")
1472                .unwrap();
1473
1474            // Should return 2 people named Alice
1475            assert_eq!(result.row_count(), 2);
1476        }
1477
1478        #[test]
1479        fn test_gql_return_property_access() {
1480            use grafeo_common::types::Value;
1481
1482            let db = GrafeoDB::new_in_memory();
1483            let session = db.session();
1484
1485            // Create people with names and ages
1486            session.create_node_with_props(
1487                &["Person"],
1488                [
1489                    ("name", Value::String("Alice".into())),
1490                    ("age", Value::Int64(30)),
1491                ],
1492            );
1493            session.create_node_with_props(
1494                &["Person"],
1495                [
1496                    ("name", Value::String("Bob".into())),
1497                    ("age", Value::Int64(25)),
1498                ],
1499            );
1500
1501            // Query returning properties
1502            let result = session
1503                .execute("MATCH (n:Person) RETURN n.name, n.age")
1504                .unwrap();
1505
1506            // Should return 2 rows with name and age columns
1507            assert_eq!(result.row_count(), 2);
1508            assert_eq!(result.column_count(), 2);
1509            assert_eq!(result.columns[0], "n.name");
1510            assert_eq!(result.columns[1], "n.age");
1511
1512            // Check that we get actual values
1513            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
1514            assert!(names.contains(&&Value::String("Alice".into())));
1515            assert!(names.contains(&&Value::String("Bob".into())));
1516        }
1517
1518        #[test]
1519        fn test_gql_return_mixed_expressions() {
1520            use grafeo_common::types::Value;
1521
1522            let db = GrafeoDB::new_in_memory();
1523            let session = db.session();
1524
1525            // Create a person
1526            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1527
1528            // Query returning both node and property
1529            let result = session
1530                .execute("MATCH (n:Person) RETURN n, n.name")
1531                .unwrap();
1532
1533            assert_eq!(result.row_count(), 1);
1534            assert_eq!(result.column_count(), 2);
1535            assert_eq!(result.columns[0], "n");
1536            assert_eq!(result.columns[1], "n.name");
1537
1538            // Second column should be the name
1539            assert_eq!(result.rows[0][1], Value::String("Alice".into()));
1540        }
1541    }
1542
1543    #[cfg(feature = "cypher")]
1544    mod cypher_tests {
1545        use super::*;
1546
1547        #[test]
1548        fn test_cypher_query_execution() {
1549            let db = GrafeoDB::new_in_memory();
1550            let session = db.session();
1551
1552            // Create some test data
1553            session.create_node(&["Person"]);
1554            session.create_node(&["Person"]);
1555            session.create_node(&["Animal"]);
1556
1557            // Execute a Cypher query
1558            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1559
1560            // Should return 2 Person nodes
1561            assert_eq!(result.row_count(), 2);
1562            assert_eq!(result.column_count(), 1);
1563            assert_eq!(result.columns[0], "n");
1564        }
1565
1566        #[test]
1567        fn test_cypher_empty_result() {
1568            let db = GrafeoDB::new_in_memory();
1569            let session = db.session();
1570
1571            // No data in database
1572            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1573
1574            assert_eq!(result.row_count(), 0);
1575        }
1576
1577        #[test]
1578        fn test_cypher_parse_error() {
1579            let db = GrafeoDB::new_in_memory();
1580            let session = db.session();
1581
1582            // Invalid Cypher syntax
1583            let result = session.execute_cypher("MATCH (n RETURN n");
1584
1585            assert!(result.is_err());
1586        }
1587    }
1588
1589    // ==================== Direct Lookup API Tests ====================
1590
1591    mod direct_lookup_tests {
1592        use super::*;
1593        use grafeo_common::types::Value;
1594
1595        #[test]
1596        fn test_get_node() {
1597            let db = GrafeoDB::new_in_memory();
1598            let session = db.session();
1599
1600            let id = session.create_node(&["Person"]);
1601            let node = session.get_node(id);
1602
1603            assert!(node.is_some());
1604            let node = node.unwrap();
1605            assert_eq!(node.id, id);
1606        }
1607
1608        #[test]
1609        fn test_get_node_not_found() {
1610            use grafeo_common::types::NodeId;
1611
1612            let db = GrafeoDB::new_in_memory();
1613            let session = db.session();
1614
1615            // Try to get a non-existent node
1616            let node = session.get_node(NodeId::new(9999));
1617            assert!(node.is_none());
1618        }
1619
1620        #[test]
1621        fn test_get_node_property() {
1622            let db = GrafeoDB::new_in_memory();
1623            let session = db.session();
1624
1625            let id = session
1626                .create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1627
1628            let name = session.get_node_property(id, "name");
1629            assert_eq!(name, Some(Value::String("Alice".into())));
1630
1631            // Non-existent property
1632            let missing = session.get_node_property(id, "missing");
1633            assert!(missing.is_none());
1634        }
1635
1636        #[test]
1637        fn test_get_edge() {
1638            let db = GrafeoDB::new_in_memory();
1639            let session = db.session();
1640
1641            let alice = session.create_node(&["Person"]);
1642            let bob = session.create_node(&["Person"]);
1643            let edge_id = session.create_edge(alice, bob, "KNOWS");
1644
1645            let edge = session.get_edge(edge_id);
1646            assert!(edge.is_some());
1647            let edge = edge.unwrap();
1648            assert_eq!(edge.id, edge_id);
1649            assert_eq!(edge.src, alice);
1650            assert_eq!(edge.dst, bob);
1651        }
1652
1653        #[test]
1654        fn test_get_edge_not_found() {
1655            use grafeo_common::types::EdgeId;
1656
1657            let db = GrafeoDB::new_in_memory();
1658            let session = db.session();
1659
1660            let edge = session.get_edge(EdgeId::new(9999));
1661            assert!(edge.is_none());
1662        }
1663
1664        #[test]
1665        fn test_get_neighbors_outgoing() {
1666            let db = GrafeoDB::new_in_memory();
1667            let session = db.session();
1668
1669            let alice = session.create_node(&["Person"]);
1670            let bob = session.create_node(&["Person"]);
1671            let carol = session.create_node(&["Person"]);
1672
1673            session.create_edge(alice, bob, "KNOWS");
1674            session.create_edge(alice, carol, "KNOWS");
1675
1676            let neighbors = session.get_neighbors_outgoing(alice);
1677            assert_eq!(neighbors.len(), 2);
1678
1679            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1680            assert!(neighbor_ids.contains(&bob));
1681            assert!(neighbor_ids.contains(&carol));
1682        }
1683
1684        #[test]
1685        fn test_get_neighbors_incoming() {
1686            let db = GrafeoDB::new_in_memory();
1687            let session = db.session();
1688
1689            let alice = session.create_node(&["Person"]);
1690            let bob = session.create_node(&["Person"]);
1691            let carol = session.create_node(&["Person"]);
1692
1693            session.create_edge(bob, alice, "KNOWS");
1694            session.create_edge(carol, alice, "KNOWS");
1695
1696            let neighbors = session.get_neighbors_incoming(alice);
1697            assert_eq!(neighbors.len(), 2);
1698
1699            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1700            assert!(neighbor_ids.contains(&bob));
1701            assert!(neighbor_ids.contains(&carol));
1702        }
1703
1704        #[test]
1705        fn test_get_neighbors_outgoing_by_type() {
1706            let db = GrafeoDB::new_in_memory();
1707            let session = db.session();
1708
1709            let alice = session.create_node(&["Person"]);
1710            let bob = session.create_node(&["Person"]);
1711            let company = session.create_node(&["Company"]);
1712
1713            session.create_edge(alice, bob, "KNOWS");
1714            session.create_edge(alice, company, "WORKS_AT");
1715
1716            let knows_neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1717            assert_eq!(knows_neighbors.len(), 1);
1718            assert_eq!(knows_neighbors[0].0, bob);
1719
1720            let works_neighbors = session.get_neighbors_outgoing_by_type(alice, "WORKS_AT");
1721            assert_eq!(works_neighbors.len(), 1);
1722            assert_eq!(works_neighbors[0].0, company);
1723
1724            // No edges of this type
1725            let no_neighbors = session.get_neighbors_outgoing_by_type(alice, "LIKES");
1726            assert!(no_neighbors.is_empty());
1727        }
1728
1729        #[test]
1730        fn test_node_exists() {
1731            use grafeo_common::types::NodeId;
1732
1733            let db = GrafeoDB::new_in_memory();
1734            let session = db.session();
1735
1736            let id = session.create_node(&["Person"]);
1737
1738            assert!(session.node_exists(id));
1739            assert!(!session.node_exists(NodeId::new(9999)));
1740        }
1741
1742        #[test]
1743        fn test_edge_exists() {
1744            use grafeo_common::types::EdgeId;
1745
1746            let db = GrafeoDB::new_in_memory();
1747            let session = db.session();
1748
1749            let alice = session.create_node(&["Person"]);
1750            let bob = session.create_node(&["Person"]);
1751            let edge_id = session.create_edge(alice, bob, "KNOWS");
1752
1753            assert!(session.edge_exists(edge_id));
1754            assert!(!session.edge_exists(EdgeId::new(9999)));
1755        }
1756
1757        #[test]
1758        fn test_get_degree() {
1759            let db = GrafeoDB::new_in_memory();
1760            let session = db.session();
1761
1762            let alice = session.create_node(&["Person"]);
1763            let bob = session.create_node(&["Person"]);
1764            let carol = session.create_node(&["Person"]);
1765
1766            // Alice knows Bob and Carol (2 outgoing)
1767            session.create_edge(alice, bob, "KNOWS");
1768            session.create_edge(alice, carol, "KNOWS");
1769            // Bob knows Alice (1 incoming for Alice)
1770            session.create_edge(bob, alice, "KNOWS");
1771
1772            let (out_degree, in_degree) = session.get_degree(alice);
1773            assert_eq!(out_degree, 2);
1774            assert_eq!(in_degree, 1);
1775
1776            // Node with no edges
1777            let lonely = session.create_node(&["Person"]);
1778            let (out, in_deg) = session.get_degree(lonely);
1779            assert_eq!(out, 0);
1780            assert_eq!(in_deg, 0);
1781        }
1782
1783        #[test]
1784        fn test_get_nodes_batch() {
1785            let db = GrafeoDB::new_in_memory();
1786            let session = db.session();
1787
1788            let alice = session.create_node(&["Person"]);
1789            let bob = session.create_node(&["Person"]);
1790            let carol = session.create_node(&["Person"]);
1791
1792            let nodes = session.get_nodes_batch(&[alice, bob, carol]);
1793            assert_eq!(nodes.len(), 3);
1794            assert!(nodes[0].is_some());
1795            assert!(nodes[1].is_some());
1796            assert!(nodes[2].is_some());
1797
1798            // With non-existent node
1799            use grafeo_common::types::NodeId;
1800            let nodes_with_missing = session.get_nodes_batch(&[alice, NodeId::new(9999), carol]);
1801            assert_eq!(nodes_with_missing.len(), 3);
1802            assert!(nodes_with_missing[0].is_some());
1803            assert!(nodes_with_missing[1].is_none()); // Missing node
1804            assert!(nodes_with_missing[2].is_some());
1805        }
1806
1807        #[test]
1808        fn test_auto_commit_setting() {
1809            let db = GrafeoDB::new_in_memory();
1810            let mut session = db.session();
1811
1812            // Default is auto-commit enabled
1813            assert!(session.auto_commit());
1814
1815            session.set_auto_commit(false);
1816            assert!(!session.auto_commit());
1817
1818            session.set_auto_commit(true);
1819            assert!(session.auto_commit());
1820        }
1821
1822        #[test]
1823        fn test_transaction_double_begin_error() {
1824            let db = GrafeoDB::new_in_memory();
1825            let mut session = db.session();
1826
1827            session.begin_tx().unwrap();
1828            let result = session.begin_tx();
1829
1830            assert!(result.is_err());
1831            // Clean up
1832            session.rollback().unwrap();
1833        }
1834
1835        #[test]
1836        fn test_commit_without_transaction_error() {
1837            let db = GrafeoDB::new_in_memory();
1838            let mut session = db.session();
1839
1840            let result = session.commit();
1841            assert!(result.is_err());
1842        }
1843
1844        #[test]
1845        fn test_rollback_without_transaction_error() {
1846            let db = GrafeoDB::new_in_memory();
1847            let mut session = db.session();
1848
1849            let result = session.rollback();
1850            assert!(result.is_err());
1851        }
1852
1853        #[test]
1854        fn test_create_edge_in_transaction() {
1855            let db = GrafeoDB::new_in_memory();
1856            let mut session = db.session();
1857
1858            // Create nodes outside transaction
1859            let alice = session.create_node(&["Person"]);
1860            let bob = session.create_node(&["Person"]);
1861
1862            // Create edge in transaction
1863            session.begin_tx().unwrap();
1864            let edge_id = session.create_edge(alice, bob, "KNOWS");
1865
1866            // Edge should be visible in the transaction
1867            assert!(session.edge_exists(edge_id));
1868
1869            // Commit
1870            session.commit().unwrap();
1871
1872            // Edge should still be visible
1873            assert!(session.edge_exists(edge_id));
1874        }
1875
1876        #[test]
1877        fn test_neighbors_empty_node() {
1878            let db = GrafeoDB::new_in_memory();
1879            let session = db.session();
1880
1881            let lonely = session.create_node(&["Person"]);
1882
1883            assert!(session.get_neighbors_outgoing(lonely).is_empty());
1884            assert!(session.get_neighbors_incoming(lonely).is_empty());
1885            assert!(
1886                session
1887                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
1888                    .is_empty()
1889            );
1890        }
1891    }
1892}