Skip to main content

grafeo_engine/
session.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
15#[cfg(feature = "rdf")]
16use grafeo_core::graph::rdf::RdfStore;
17
18use crate::config::{AdaptiveConfig, GraphModel};
19use crate::database::QueryResult;
20use crate::query::cache::QueryCache;
21use crate::transaction::TransactionManager;
22
23/// Your handle to the database - execute queries and manage transactions.
24///
25/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
26/// tracks its own transaction state, so you can have multiple concurrent
27/// sessions without them interfering.
28pub struct Session {
29    /// The underlying store.
30    store: Arc<LpgStore>,
31    /// RDF triple store (if RDF feature is enabled).
32    #[cfg(feature = "rdf")]
33    #[allow(dead_code)]
34    rdf_store: Arc<RdfStore>,
35    /// Transaction manager.
36    tx_manager: Arc<TransactionManager>,
37    /// Query cache shared across sessions.
38    query_cache: Arc<QueryCache>,
39    /// Current transaction ID (if any).
40    current_tx: Option<TxId>,
41    /// Whether the session is in auto-commit mode.
42    auto_commit: bool,
43    /// Adaptive execution configuration.
44    #[allow(dead_code)]
45    adaptive_config: AdaptiveConfig,
46    /// Whether to use factorized execution for multi-hop queries.
47    factorized_execution: bool,
48    /// The graph data model this session operates on.
49    graph_model: GraphModel,
50    /// Maximum time a query may run before being cancelled.
51    query_timeout: Option<Duration>,
52    /// Shared commit counter for triggering auto-GC.
53    commit_counter: Arc<AtomicUsize>,
54    /// GC every N commits (0 = disabled).
55    gc_interval: usize,
56    /// CDC log for change tracking.
57    #[cfg(feature = "cdc")]
58    cdc_log: Arc<crate::cdc::CdcLog>,
59}
60
61impl Session {
62    /// Creates a new session with adaptive execution configuration.
63    #[allow(dead_code, clippy::too_many_arguments)]
64    pub(crate) fn with_adaptive(
65        store: Arc<LpgStore>,
66        tx_manager: Arc<TransactionManager>,
67        query_cache: Arc<QueryCache>,
68        adaptive_config: AdaptiveConfig,
69        factorized_execution: bool,
70        graph_model: GraphModel,
71        query_timeout: Option<Duration>,
72        commit_counter: Arc<AtomicUsize>,
73        gc_interval: usize,
74    ) -> Self {
75        Self {
76            store,
77            #[cfg(feature = "rdf")]
78            rdf_store: Arc::new(RdfStore::new()),
79            tx_manager,
80            query_cache,
81            current_tx: None,
82            auto_commit: true,
83            adaptive_config,
84            factorized_execution,
85            graph_model,
86            query_timeout,
87            commit_counter,
88            gc_interval,
89            #[cfg(feature = "cdc")]
90            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
91        }
92    }
93
94    /// Sets the CDC log for this session (shared with the database).
95    #[cfg(feature = "cdc")]
96    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
97        self.cdc_log = cdc_log;
98    }
99
100    /// Creates a new session with RDF store and adaptive configuration.
101    #[cfg(feature = "rdf")]
102    #[allow(clippy::too_many_arguments)]
103    pub(crate) fn with_rdf_store_and_adaptive(
104        store: Arc<LpgStore>,
105        rdf_store: Arc<RdfStore>,
106        tx_manager: Arc<TransactionManager>,
107        query_cache: Arc<QueryCache>,
108        adaptive_config: AdaptiveConfig,
109        factorized_execution: bool,
110        graph_model: GraphModel,
111        query_timeout: Option<Duration>,
112        commit_counter: Arc<AtomicUsize>,
113        gc_interval: usize,
114    ) -> Self {
115        Self {
116            store,
117            rdf_store,
118            tx_manager,
119            query_cache,
120            current_tx: None,
121            auto_commit: true,
122            adaptive_config,
123            factorized_execution,
124            graph_model,
125            query_timeout,
126            commit_counter,
127            gc_interval,
128            #[cfg(feature = "cdc")]
129            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
130        }
131    }
132
133    /// Returns the graph model this session operates on.
134    #[must_use]
135    pub fn graph_model(&self) -> GraphModel {
136        self.graph_model
137    }
138
139    /// Checks that the session's graph model supports LPG operations.
140    fn require_lpg(&self, language: &str) -> Result<()> {
141        if self.graph_model == GraphModel::Rdf {
142            return Err(grafeo_common::utils::error::Error::Internal(format!(
143                "This is an RDF database. {language} queries require an LPG database."
144            )));
145        }
146        Ok(())
147    }
148
149    /// Executes a GQL query.
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if the query fails to parse or execute.
154    ///
155    /// # Examples
156    ///
157    /// ```no_run
158    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
159    /// use grafeo_engine::GrafeoDB;
160    ///
161    /// let db = GrafeoDB::new_in_memory();
162    /// let session = db.session();
163    ///
164    /// // Create a node
165    /// session.execute("INSERT (:Person {name: 'Alice', age: 30})")?;
166    ///
167    /// // Query nodes
168    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
169    /// for row in &result.rows {
170    ///     println!("{:?}", row);
171    /// }
172    /// # Ok(())
173    /// # }
174    /// ```
175    #[cfg(feature = "gql")]
176    pub fn execute(&self, query: &str) -> Result<QueryResult> {
177        self.require_lpg("GQL")?;
178
179        use crate::query::{
180            Executor, Planner, binder::Binder, cache::CacheKey, gql_translator,
181            optimizer::Optimizer, processor::QueryLanguage,
182        };
183
184        let start_time = std::time::Instant::now();
185
186        // Create cache key for this query
187        let cache_key = CacheKey::new(query, QueryLanguage::Gql);
188
189        // Try to get cached optimized plan
190        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
191            // Cache hit - skip parsing, translation, binding, and optimization
192            cached_plan
193        } else {
194            // Cache miss - run full pipeline
195
196            // Parse and translate the query to a logical plan
197            let logical_plan = gql_translator::translate(query)?;
198
199            // Semantic validation
200            let mut binder = Binder::new();
201            let _binding_context = binder.bind(&logical_plan)?;
202
203            // Optimize the plan
204            let optimizer = Optimizer::from_store(&self.store);
205            let plan = optimizer.optimize(logical_plan)?;
206
207            // Cache the optimized plan for future use
208            self.query_cache.put_optimized(cache_key, plan.clone());
209
210            plan
211        };
212
213        // Get transaction context for MVCC visibility
214        let (viewing_epoch, tx_id) = self.get_transaction_context();
215
216        // Convert to physical plan with transaction context
217        // (Physical planning cannot be cached as it depends on transaction state)
218        let planner = Planner::with_context(
219            Arc::clone(&self.store),
220            Arc::clone(&self.tx_manager),
221            tx_id,
222            viewing_epoch,
223        )
224        .with_factorized_execution(self.factorized_execution);
225        let mut physical_plan = planner.plan(&optimized_plan)?;
226
227        // Execute the plan
228        let executor = Executor::with_columns(physical_plan.columns.clone())
229            .with_deadline(self.query_deadline());
230        let mut result = executor.execute(physical_plan.operator.as_mut())?;
231
232        // Add execution metrics
233        let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
234        let rows_scanned = result.rows.len() as u64;
235        result.execution_time_ms = Some(elapsed_ms);
236        result.rows_scanned = Some(rows_scanned);
237
238        Ok(result)
239    }
240
241    /// Executes a GQL query with parameters.
242    ///
243    /// # Errors
244    ///
245    /// Returns an error if the query fails to parse or execute.
246    #[cfg(feature = "gql")]
247    pub fn execute_with_params(
248        &self,
249        query: &str,
250        params: std::collections::HashMap<String, Value>,
251    ) -> Result<QueryResult> {
252        self.require_lpg("GQL")?;
253
254        use crate::query::processor::{QueryLanguage, QueryProcessor};
255
256        // Get transaction context for MVCC visibility
257        let (viewing_epoch, tx_id) = self.get_transaction_context();
258
259        // Create processor with transaction context
260        let processor =
261            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
262
263        // Apply transaction context if in a transaction
264        let processor = if let Some(tx_id) = tx_id {
265            processor.with_tx_context(viewing_epoch, tx_id)
266        } else {
267            processor
268        };
269
270        processor.process(query, QueryLanguage::Gql, Some(&params))
271    }
272
273    /// Executes a GQL query with parameters.
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if no query language is enabled.
278    #[cfg(not(any(feature = "gql", feature = "cypher")))]
279    pub fn execute_with_params(
280        &self,
281        _query: &str,
282        _params: std::collections::HashMap<String, Value>,
283    ) -> Result<QueryResult> {
284        Err(grafeo_common::utils::error::Error::Internal(
285            "No query language enabled".to_string(),
286        ))
287    }
288
289    /// Executes a GQL query.
290    ///
291    /// # Errors
292    ///
293    /// Returns an error if no query language is enabled.
294    #[cfg(not(any(feature = "gql", feature = "cypher")))]
295    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
296        Err(grafeo_common::utils::error::Error::Internal(
297            "No query language enabled".to_string(),
298        ))
299    }
300
301    /// Executes a Cypher query.
302    ///
303    /// # Errors
304    ///
305    /// Returns an error if the query fails to parse or execute.
306    #[cfg(feature = "cypher")]
307    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
308        use crate::query::{
309            Executor, Planner, binder::Binder, cache::CacheKey, cypher_translator,
310            optimizer::Optimizer, processor::QueryLanguage,
311        };
312
313        // Create cache key for this query
314        let cache_key = CacheKey::new(query, QueryLanguage::Cypher);
315
316        // Try to get cached optimized plan
317        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
318            cached_plan
319        } else {
320            // Parse and translate the query to a logical plan
321            let logical_plan = cypher_translator::translate(query)?;
322
323            // Semantic validation
324            let mut binder = Binder::new();
325            let _binding_context = binder.bind(&logical_plan)?;
326
327            // Optimize the plan
328            let optimizer = Optimizer::from_store(&self.store);
329            let plan = optimizer.optimize(logical_plan)?;
330
331            // Cache the optimized plan
332            self.query_cache.put_optimized(cache_key, plan.clone());
333
334            plan
335        };
336
337        // Get transaction context for MVCC visibility
338        let (viewing_epoch, tx_id) = self.get_transaction_context();
339
340        // Convert to physical plan with transaction context
341        let planner = Planner::with_context(
342            Arc::clone(&self.store),
343            Arc::clone(&self.tx_manager),
344            tx_id,
345            viewing_epoch,
346        )
347        .with_factorized_execution(self.factorized_execution);
348        let mut physical_plan = planner.plan(&optimized_plan)?;
349
350        // Execute the plan
351        let executor = Executor::with_columns(physical_plan.columns.clone())
352            .with_deadline(self.query_deadline());
353        executor.execute(physical_plan.operator.as_mut())
354    }
355
356    /// Executes a Gremlin query.
357    ///
358    /// # Errors
359    ///
360    /// Returns an error if the query fails to parse or execute.
361    ///
362    /// # Examples
363    ///
364    /// ```no_run
365    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
366    /// use grafeo_engine::GrafeoDB;
367    ///
368    /// let db = GrafeoDB::new_in_memory();
369    /// let session = db.session();
370    ///
371    /// // Create some nodes first
372    /// session.create_node(&["Person"]);
373    ///
374    /// // Query using Gremlin
375    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
376    /// # Ok(())
377    /// # }
378    /// ```
379    #[cfg(feature = "gremlin")]
380    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
381        use crate::query::{
382            Executor, Planner, binder::Binder, gremlin_translator, optimizer::Optimizer,
383        };
384
385        // Parse and translate the query to a logical plan
386        let logical_plan = gremlin_translator::translate(query)?;
387
388        // Semantic validation
389        let mut binder = Binder::new();
390        let _binding_context = binder.bind(&logical_plan)?;
391
392        // Optimize the plan
393        let optimizer = Optimizer::from_store(&self.store);
394        let optimized_plan = optimizer.optimize(logical_plan)?;
395
396        // Get transaction context for MVCC visibility
397        let (viewing_epoch, tx_id) = self.get_transaction_context();
398
399        // Convert to physical plan with transaction context
400        let planner = Planner::with_context(
401            Arc::clone(&self.store),
402            Arc::clone(&self.tx_manager),
403            tx_id,
404            viewing_epoch,
405        )
406        .with_factorized_execution(self.factorized_execution);
407        let mut physical_plan = planner.plan(&optimized_plan)?;
408
409        // Execute the plan
410        let executor = Executor::with_columns(physical_plan.columns.clone())
411            .with_deadline(self.query_deadline());
412        executor.execute(physical_plan.operator.as_mut())
413    }
414
415    /// Executes a Gremlin query with parameters.
416    ///
417    /// # Errors
418    ///
419    /// Returns an error if the query fails to parse or execute.
420    #[cfg(feature = "gremlin")]
421    pub fn execute_gremlin_with_params(
422        &self,
423        query: &str,
424        params: std::collections::HashMap<String, Value>,
425    ) -> Result<QueryResult> {
426        use crate::query::processor::{QueryLanguage, QueryProcessor};
427
428        // Get transaction context for MVCC visibility
429        let (viewing_epoch, tx_id) = self.get_transaction_context();
430
431        // Create processor with transaction context
432        let processor =
433            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
434
435        // Apply transaction context if in a transaction
436        let processor = if let Some(tx_id) = tx_id {
437            processor.with_tx_context(viewing_epoch, tx_id)
438        } else {
439            processor
440        };
441
442        processor.process(query, QueryLanguage::Gremlin, Some(&params))
443    }
444
445    /// Executes a GraphQL query against the LPG store.
446    ///
447    /// # Errors
448    ///
449    /// Returns an error if the query fails to parse or execute.
450    ///
451    /// # Examples
452    ///
453    /// ```no_run
454    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
455    /// use grafeo_engine::GrafeoDB;
456    ///
457    /// let db = GrafeoDB::new_in_memory();
458    /// let session = db.session();
459    ///
460    /// // Create some nodes first
461    /// session.create_node(&["User"]);
462    ///
463    /// // Query using GraphQL
464    /// let result = session.execute_graphql("query { user { id name } }")?;
465    /// # Ok(())
466    /// # }
467    /// ```
468    #[cfg(feature = "graphql")]
469    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
470        use crate::query::{
471            Executor, Planner, binder::Binder, graphql_translator, optimizer::Optimizer,
472        };
473
474        // Parse and translate the query to a logical plan
475        let logical_plan = graphql_translator::translate(query)?;
476
477        // Semantic validation
478        let mut binder = Binder::new();
479        let _binding_context = binder.bind(&logical_plan)?;
480
481        // Optimize the plan
482        let optimizer = Optimizer::from_store(&self.store);
483        let optimized_plan = optimizer.optimize(logical_plan)?;
484
485        // Get transaction context for MVCC visibility
486        let (viewing_epoch, tx_id) = self.get_transaction_context();
487
488        // Convert to physical plan with transaction context
489        let planner = Planner::with_context(
490            Arc::clone(&self.store),
491            Arc::clone(&self.tx_manager),
492            tx_id,
493            viewing_epoch,
494        )
495        .with_factorized_execution(self.factorized_execution);
496        let mut physical_plan = planner.plan(&optimized_plan)?;
497
498        // Execute the plan
499        let executor = Executor::with_columns(physical_plan.columns.clone())
500            .with_deadline(self.query_deadline());
501        executor.execute(physical_plan.operator.as_mut())
502    }
503
504    /// Executes a GraphQL query with parameters.
505    ///
506    /// # Errors
507    ///
508    /// Returns an error if the query fails to parse or execute.
509    #[cfg(feature = "graphql")]
510    pub fn execute_graphql_with_params(
511        &self,
512        query: &str,
513        params: std::collections::HashMap<String, Value>,
514    ) -> Result<QueryResult> {
515        use crate::query::processor::{QueryLanguage, QueryProcessor};
516
517        // Get transaction context for MVCC visibility
518        let (viewing_epoch, tx_id) = self.get_transaction_context();
519
520        // Create processor with transaction context
521        let processor =
522            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
523
524        // Apply transaction context if in a transaction
525        let processor = if let Some(tx_id) = tx_id {
526            processor.with_tx_context(viewing_epoch, tx_id)
527        } else {
528            processor
529        };
530
531        processor.process(query, QueryLanguage::GraphQL, Some(&params))
532    }
533
534    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
535    ///
536    /// # Errors
537    ///
538    /// Returns an error if the query fails to parse or execute.
539    ///
540    /// # Examples
541    ///
542    /// ```no_run
543    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
544    /// use grafeo_engine::GrafeoDB;
545    ///
546    /// let db = GrafeoDB::new_in_memory();
547    /// let session = db.session();
548    ///
549    /// let result = session.execute_sql(
550    ///     "SELECT * FROM GRAPH_TABLE (
551    ///         MATCH (n:Person)
552    ///         COLUMNS (n.name AS name)
553    ///     )"
554    /// )?;
555    /// # Ok(())
556    /// # }
557    /// ```
558    #[cfg(feature = "sql-pgq")]
559    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
560        use crate::query::{
561            Executor, Planner, binder::Binder, cache::CacheKey, optimizer::Optimizer,
562            plan::LogicalOperator, processor::QueryLanguage, sql_pgq_translator,
563        };
564
565        // Parse and translate (always needed to check for DDL)
566        let logical_plan = sql_pgq_translator::translate(query)?;
567
568        // Handle DDL statements directly (they don't go through the query pipeline)
569        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
570            return Ok(QueryResult {
571                columns: vec!["status".into()],
572                column_types: vec![grafeo_common::types::LogicalType::String],
573                rows: vec![vec![Value::from(format!(
574                    "Property graph '{}' created ({} node tables, {} edge tables)",
575                    cpg.name,
576                    cpg.node_tables.len(),
577                    cpg.edge_tables.len()
578                ))]],
579                execution_time_ms: None,
580                rows_scanned: None,
581            });
582        }
583
584        // Create cache key for query plans
585        let cache_key = CacheKey::new(query, QueryLanguage::SqlPgq);
586
587        // Try to get cached optimized plan
588        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
589            cached_plan
590        } else {
591            // Semantic validation
592            let mut binder = Binder::new();
593            let _binding_context = binder.bind(&logical_plan)?;
594
595            // Optimize the plan
596            let optimizer = Optimizer::from_store(&self.store);
597            let plan = optimizer.optimize(logical_plan)?;
598
599            // Cache the optimized plan
600            self.query_cache.put_optimized(cache_key, plan.clone());
601
602            plan
603        };
604
605        // Get transaction context for MVCC visibility
606        let (viewing_epoch, tx_id) = self.get_transaction_context();
607
608        // Convert to physical plan with transaction context
609        let planner = Planner::with_context(
610            Arc::clone(&self.store),
611            Arc::clone(&self.tx_manager),
612            tx_id,
613            viewing_epoch,
614        )
615        .with_factorized_execution(self.factorized_execution);
616        let mut physical_plan = planner.plan(&optimized_plan)?;
617
618        // Execute the plan
619        let executor = Executor::with_columns(physical_plan.columns.clone())
620            .with_deadline(self.query_deadline());
621        executor.execute(physical_plan.operator.as_mut())
622    }
623
624    /// Executes a SQL/PGQ query with parameters.
625    ///
626    /// # Errors
627    ///
628    /// Returns an error if the query fails to parse or execute.
629    #[cfg(feature = "sql-pgq")]
630    pub fn execute_sql_with_params(
631        &self,
632        query: &str,
633        params: std::collections::HashMap<String, Value>,
634    ) -> Result<QueryResult> {
635        use crate::query::processor::{QueryLanguage, QueryProcessor};
636
637        // Get transaction context for MVCC visibility
638        let (viewing_epoch, tx_id) = self.get_transaction_context();
639
640        // Create processor with transaction context
641        let processor =
642            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
643
644        // Apply transaction context if in a transaction
645        let processor = if let Some(tx_id) = tx_id {
646            processor.with_tx_context(viewing_epoch, tx_id)
647        } else {
648            processor
649        };
650
651        processor.process(query, QueryLanguage::SqlPgq, Some(&params))
652    }
653
654    /// Executes a SPARQL query.
655    ///
656    /// # Errors
657    ///
658    /// Returns an error if the query fails to parse or execute.
659    #[cfg(all(feature = "sparql", feature = "rdf"))]
660    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
661        use crate::query::{
662            Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
663        };
664
665        // Parse and translate the SPARQL query to a logical plan
666        let logical_plan = sparql_translator::translate(query)?;
667
668        // Optimize the plan
669        let optimizer = Optimizer::from_store(&self.store);
670        let optimized_plan = optimizer.optimize(logical_plan)?;
671
672        // Convert to physical plan using RDF planner
673        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store)).with_tx_id(self.current_tx);
674        let mut physical_plan = planner.plan(&optimized_plan)?;
675
676        // Execute the plan
677        let executor = Executor::with_columns(physical_plan.columns.clone())
678            .with_deadline(self.query_deadline());
679        executor.execute(physical_plan.operator.as_mut())
680    }
681
682    /// Executes a SPARQL query with parameters.
683    ///
684    /// # Errors
685    ///
686    /// Returns an error if the query fails to parse or execute.
687    #[cfg(all(feature = "sparql", feature = "rdf"))]
688    pub fn execute_sparql_with_params(
689        &self,
690        query: &str,
691        _params: std::collections::HashMap<String, Value>,
692    ) -> Result<QueryResult> {
693        // TODO: Implement parameter substitution for SPARQL
694        // For now, just execute the query without parameters
695        self.execute_sparql(query)
696    }
697
698    /// Begins a new transaction.
699    ///
700    /// # Errors
701    ///
702    /// Returns an error if a transaction is already active.
703    ///
704    /// # Examples
705    ///
706    /// ```no_run
707    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
708    /// use grafeo_engine::GrafeoDB;
709    ///
710    /// let db = GrafeoDB::new_in_memory();
711    /// let mut session = db.session();
712    ///
713    /// session.begin_tx()?;
714    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
715    /// session.execute("INSERT (:Person {name: 'Bob'})")?;
716    /// session.commit()?; // Both inserts committed atomically
717    /// # Ok(())
718    /// # }
719    /// ```
720    pub fn begin_tx(&mut self) -> Result<()> {
721        if self.current_tx.is_some() {
722            return Err(grafeo_common::utils::error::Error::Transaction(
723                grafeo_common::utils::error::TransactionError::InvalidState(
724                    "Transaction already active".to_string(),
725                ),
726            ));
727        }
728
729        let tx_id = self.tx_manager.begin();
730        self.current_tx = Some(tx_id);
731        Ok(())
732    }
733
734    /// Begins a transaction with a specific isolation level.
735    ///
736    /// See [`begin_tx`](Self::begin_tx) for the default (`SnapshotIsolation`).
737    ///
738    /// # Errors
739    ///
740    /// Returns an error if a transaction is already active.
741    pub fn begin_tx_with_isolation(
742        &mut self,
743        isolation_level: crate::transaction::IsolationLevel,
744    ) -> Result<()> {
745        if self.current_tx.is_some() {
746            return Err(grafeo_common::utils::error::Error::Transaction(
747                grafeo_common::utils::error::TransactionError::InvalidState(
748                    "Transaction already active".to_string(),
749                ),
750            ));
751        }
752
753        let tx_id = self.tx_manager.begin_with_isolation(isolation_level);
754        self.current_tx = Some(tx_id);
755        Ok(())
756    }
757
758    /// Commits the current transaction.
759    ///
760    /// Makes all changes since [`begin_tx`](Self::begin_tx) permanent.
761    ///
762    /// # Errors
763    ///
764    /// Returns an error if no transaction is active.
765    pub fn commit(&mut self) -> Result<()> {
766        let tx_id = self.current_tx.take().ok_or_else(|| {
767            grafeo_common::utils::error::Error::Transaction(
768                grafeo_common::utils::error::TransactionError::InvalidState(
769                    "No active transaction".to_string(),
770                ),
771            )
772        })?;
773
774        // Commit RDF store pending operations
775        #[cfg(feature = "rdf")]
776        self.rdf_store.commit_tx(tx_id);
777
778        self.tx_manager.commit(tx_id)?;
779
780        // Sync the LpgStore epoch with the TxManager so that
781        // convenience lookups (edge_type, get_edge, get_node) that use
782        // store.current_epoch() can see versions created at the latest epoch.
783        self.store.sync_epoch(self.tx_manager.current_epoch());
784
785        // Auto-GC: periodically prune old MVCC versions
786        if self.gc_interval > 0 {
787            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
788            if count.is_multiple_of(self.gc_interval) {
789                let min_epoch = self.tx_manager.min_active_epoch();
790                self.store.gc_versions(min_epoch);
791                self.tx_manager.gc();
792            }
793        }
794
795        Ok(())
796    }
797
798    /// Aborts the current transaction.
799    ///
800    /// Discards all changes since [`begin_tx`](Self::begin_tx).
801    ///
802    /// # Errors
803    ///
804    /// Returns an error if no transaction is active.
805    ///
806    /// # Examples
807    ///
808    /// ```no_run
809    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
810    /// use grafeo_engine::GrafeoDB;
811    ///
812    /// let db = GrafeoDB::new_in_memory();
813    /// let mut session = db.session();
814    ///
815    /// session.begin_tx()?;
816    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
817    /// session.rollback()?; // Insert is discarded
818    /// # Ok(())
819    /// # }
820    /// ```
821    pub fn rollback(&mut self) -> Result<()> {
822        let tx_id = self.current_tx.take().ok_or_else(|| {
823            grafeo_common::utils::error::Error::Transaction(
824                grafeo_common::utils::error::TransactionError::InvalidState(
825                    "No active transaction".to_string(),
826                ),
827            )
828        })?;
829
830        // Discard uncommitted versions in the LPG store
831        self.store.discard_uncommitted_versions(tx_id);
832
833        // Discard pending operations in the RDF store
834        #[cfg(feature = "rdf")]
835        self.rdf_store.rollback_tx(tx_id);
836
837        // Mark transaction as aborted in the manager
838        self.tx_manager.abort(tx_id)
839    }
840
841    /// Returns whether a transaction is active.
842    #[must_use]
843    pub fn in_transaction(&self) -> bool {
844        self.current_tx.is_some()
845    }
846
847    /// Sets auto-commit mode.
848    pub fn set_auto_commit(&mut self, auto_commit: bool) {
849        self.auto_commit = auto_commit;
850    }
851
852    /// Returns whether auto-commit is enabled.
853    #[must_use]
854    pub fn auto_commit(&self) -> bool {
855        self.auto_commit
856    }
857
858    /// Computes the wall-clock deadline for query execution.
859    #[must_use]
860    fn query_deadline(&self) -> Option<Instant> {
861        self.query_timeout.map(|d| Instant::now() + d)
862    }
863
864    /// Returns the current transaction context for MVCC visibility.
865    ///
866    /// Returns `(viewing_epoch, tx_id)` where:
867    /// - `viewing_epoch` is the epoch at which to check version visibility
868    /// - `tx_id` is the current transaction ID (if in a transaction)
869    #[must_use]
870    fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
871        if let Some(tx_id) = self.current_tx {
872            // In a transaction - use the transaction's start epoch
873            let epoch = self
874                .tx_manager
875                .start_epoch(tx_id)
876                .unwrap_or_else(|| self.tx_manager.current_epoch());
877            (epoch, Some(tx_id))
878        } else {
879            // No transaction - use current epoch
880            (self.tx_manager.current_epoch(), None)
881        }
882    }
883
884    /// Creates a node directly (bypassing query execution).
885    ///
886    /// This is a low-level API for testing and direct manipulation.
887    /// If a transaction is active, the node will be versioned with the transaction ID.
888    pub fn create_node(&self, labels: &[&str]) -> NodeId {
889        let (epoch, tx_id) = self.get_transaction_context();
890        self.store
891            .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
892    }
893
894    /// Creates a node with properties.
895    ///
896    /// If a transaction is active, the node will be versioned with the transaction ID.
897    pub fn create_node_with_props<'a>(
898        &self,
899        labels: &[&str],
900        properties: impl IntoIterator<Item = (&'a str, Value)>,
901    ) -> NodeId {
902        let (epoch, tx_id) = self.get_transaction_context();
903        self.store.create_node_with_props_versioned(
904            labels,
905            properties.into_iter().map(|(k, v)| (k, v)),
906            epoch,
907            tx_id.unwrap_or(TxId::SYSTEM),
908        )
909    }
910
911    /// Creates an edge between two nodes.
912    ///
913    /// This is a low-level API for testing and direct manipulation.
914    /// If a transaction is active, the edge will be versioned with the transaction ID.
915    pub fn create_edge(
916        &self,
917        src: NodeId,
918        dst: NodeId,
919        edge_type: &str,
920    ) -> grafeo_common::types::EdgeId {
921        let (epoch, tx_id) = self.get_transaction_context();
922        self.store
923            .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
924    }
925
926    // =========================================================================
927    // Direct Lookup APIs (bypass query planning for O(1) point reads)
928    // =========================================================================
929
930    /// Gets a node by ID directly, bypassing query planning.
931    ///
932    /// This is the fastest way to retrieve a single node when you know its ID.
933    /// Skips parsing, binding, optimization, and physical planning entirely.
934    ///
935    /// # Performance
936    ///
937    /// - Time complexity: O(1) average case
938    /// - No lock contention (uses DashMap internally)
939    /// - ~20-30x faster than equivalent MATCH query
940    ///
941    /// # Example
942    ///
943    /// ```no_run
944    /// # use grafeo_engine::GrafeoDB;
945    /// # let db = GrafeoDB::new_in_memory();
946    /// let session = db.session();
947    /// let node_id = session.create_node(&["Person"]);
948    ///
949    /// // Direct lookup - O(1), no query planning
950    /// let node = session.get_node(node_id);
951    /// assert!(node.is_some());
952    /// ```
953    #[must_use]
954    pub fn get_node(&self, id: NodeId) -> Option<Node> {
955        let (epoch, tx_id) = self.get_transaction_context();
956        self.store
957            .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
958    }
959
960    /// Gets a single property from a node by ID, bypassing query planning.
961    ///
962    /// More efficient than `get_node()` when you only need one property,
963    /// as it avoids loading the full node with all properties.
964    ///
965    /// # Performance
966    ///
967    /// - Time complexity: O(1) average case
968    /// - No query planning overhead
969    ///
970    /// # Example
971    ///
972    /// ```no_run
973    /// # use grafeo_engine::GrafeoDB;
974    /// # use grafeo_common::types::Value;
975    /// # let db = GrafeoDB::new_in_memory();
976    /// let session = db.session();
977    /// let id = session.create_node_with_props(&["Person"], [("name", "Alice".into())]);
978    ///
979    /// // Direct property access - O(1)
980    /// let name = session.get_node_property(id, "name");
981    /// assert_eq!(name, Some(Value::String("Alice".into())));
982    /// ```
983    #[must_use]
984    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
985        self.get_node(id)
986            .and_then(|node| node.get_property(key).cloned())
987    }
988
989    /// Gets an edge by ID directly, bypassing query planning.
990    ///
991    /// # Performance
992    ///
993    /// - Time complexity: O(1) average case
994    /// - No lock contention
995    #[must_use]
996    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
997        let (epoch, tx_id) = self.get_transaction_context();
998        self.store
999            .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
1000    }
1001
1002    /// Gets outgoing neighbors of a node directly, bypassing query planning.
1003    ///
1004    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
1005    ///
1006    /// # Performance
1007    ///
1008    /// - Time complexity: O(degree) where degree is the number of outgoing edges
1009    /// - Uses adjacency index for direct access
1010    /// - ~10-20x faster than equivalent MATCH query
1011    ///
1012    /// # Example
1013    ///
1014    /// ```no_run
1015    /// # use grafeo_engine::GrafeoDB;
1016    /// # let db = GrafeoDB::new_in_memory();
1017    /// let session = db.session();
1018    /// let alice = session.create_node(&["Person"]);
1019    /// let bob = session.create_node(&["Person"]);
1020    /// session.create_edge(alice, bob, "KNOWS");
1021    ///
1022    /// // Direct neighbor lookup - O(degree)
1023    /// let neighbors = session.get_neighbors_outgoing(alice);
1024    /// assert_eq!(neighbors.len(), 1);
1025    /// assert_eq!(neighbors[0].0, bob);
1026    /// ```
1027    #[must_use]
1028    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1029        self.store.edges_from(node, Direction::Outgoing).collect()
1030    }
1031
1032    /// Gets incoming neighbors of a node directly, bypassing query planning.
1033    ///
1034    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
1035    ///
1036    /// # Performance
1037    ///
1038    /// - Time complexity: O(degree) where degree is the number of incoming edges
1039    /// - Uses backward adjacency index for direct access
1040    #[must_use]
1041    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1042        self.store.edges_from(node, Direction::Incoming).collect()
1043    }
1044
1045    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
1046    ///
1047    /// # Example
1048    ///
1049    /// ```no_run
1050    /// # use grafeo_engine::GrafeoDB;
1051    /// # let db = GrafeoDB::new_in_memory();
1052    /// # let session = db.session();
1053    /// # let alice = session.create_node(&["Person"]);
1054    /// let neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1055    /// ```
1056    #[must_use]
1057    pub fn get_neighbors_outgoing_by_type(
1058        &self,
1059        node: NodeId,
1060        edge_type: &str,
1061    ) -> Vec<(NodeId, EdgeId)> {
1062        self.store
1063            .edges_from(node, Direction::Outgoing)
1064            .filter(|(_, edge_id)| {
1065                self.get_edge(*edge_id)
1066                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
1067            })
1068            .collect()
1069    }
1070
1071    /// Checks if a node exists, bypassing query planning.
1072    ///
1073    /// # Performance
1074    ///
1075    /// - Time complexity: O(1)
1076    /// - Fastest existence check available
1077    #[must_use]
1078    pub fn node_exists(&self, id: NodeId) -> bool {
1079        self.get_node(id).is_some()
1080    }
1081
1082    /// Checks if an edge exists, bypassing query planning.
1083    #[must_use]
1084    pub fn edge_exists(&self, id: EdgeId) -> bool {
1085        self.get_edge(id).is_some()
1086    }
1087
1088    /// Gets the degree (number of edges) of a node.
1089    ///
1090    /// Returns (outgoing_degree, incoming_degree).
1091    #[must_use]
1092    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
1093        let out = self.store.out_degree(node);
1094        let in_degree = self.store.in_degree(node);
1095        (out, in_degree)
1096    }
1097
1098    /// Batch lookup of multiple nodes by ID.
1099    ///
1100    /// More efficient than calling `get_node()` in a loop because it
1101    /// amortizes overhead.
1102    ///
1103    /// # Performance
1104    ///
1105    /// - Time complexity: O(n) where n is the number of IDs
1106    /// - Better cache utilization than individual lookups
1107    #[must_use]
1108    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
1109        let (epoch, tx_id) = self.get_transaction_context();
1110        let tx = tx_id.unwrap_or(TxId::SYSTEM);
1111        ids.iter()
1112            .map(|&id| self.store.get_node_versioned(id, epoch, tx))
1113            .collect()
1114    }
1115
1116    // ── Change Data Capture ─────────────────────────────────────────────
1117
1118    /// Returns the full change history for an entity (node or edge).
1119    #[cfg(feature = "cdc")]
1120    pub fn history(
1121        &self,
1122        entity_id: impl Into<crate::cdc::EntityId>,
1123    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1124        Ok(self.cdc_log.history(entity_id.into()))
1125    }
1126
1127    /// Returns change events for an entity since the given epoch.
1128    #[cfg(feature = "cdc")]
1129    pub fn history_since(
1130        &self,
1131        entity_id: impl Into<crate::cdc::EntityId>,
1132        since_epoch: EpochId,
1133    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1134        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
1135    }
1136
1137    /// Returns all change events across all entities in an epoch range.
1138    #[cfg(feature = "cdc")]
1139    pub fn changes_between(
1140        &self,
1141        start_epoch: EpochId,
1142        end_epoch: EpochId,
1143    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
1144        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
1145    }
1146}
1147
1148#[cfg(test)]
1149mod tests {
1150    use crate::database::GrafeoDB;
1151
1152    #[test]
1153    fn test_session_create_node() {
1154        let db = GrafeoDB::new_in_memory();
1155        let session = db.session();
1156
1157        let id = session.create_node(&["Person"]);
1158        assert!(id.is_valid());
1159        assert_eq!(db.node_count(), 1);
1160    }
1161
1162    #[test]
1163    fn test_session_transaction() {
1164        let db = GrafeoDB::new_in_memory();
1165        let mut session = db.session();
1166
1167        assert!(!session.in_transaction());
1168
1169        session.begin_tx().unwrap();
1170        assert!(session.in_transaction());
1171
1172        session.commit().unwrap();
1173        assert!(!session.in_transaction());
1174    }
1175
1176    #[test]
1177    fn test_session_transaction_context() {
1178        let db = GrafeoDB::new_in_memory();
1179        let mut session = db.session();
1180
1181        // Without transaction - context should have current epoch and no tx_id
1182        let (_epoch1, tx_id1) = session.get_transaction_context();
1183        assert!(tx_id1.is_none());
1184
1185        // Start a transaction
1186        session.begin_tx().unwrap();
1187        let (epoch2, tx_id2) = session.get_transaction_context();
1188        assert!(tx_id2.is_some());
1189        // Transaction should have a valid epoch
1190        let _ = epoch2; // Use the variable
1191
1192        // Commit and verify
1193        session.commit().unwrap();
1194        let (epoch3, tx_id3) = session.get_transaction_context();
1195        assert!(tx_id3.is_none());
1196        // Epoch should have advanced after commit
1197        assert!(epoch3.as_u64() >= epoch2.as_u64());
1198    }
1199
1200    #[test]
1201    fn test_session_rollback() {
1202        let db = GrafeoDB::new_in_memory();
1203        let mut session = db.session();
1204
1205        session.begin_tx().unwrap();
1206        session.rollback().unwrap();
1207        assert!(!session.in_transaction());
1208    }
1209
1210    #[test]
1211    fn test_session_rollback_discards_versions() {
1212        use grafeo_common::types::TxId;
1213
1214        let db = GrafeoDB::new_in_memory();
1215
1216        // Create a node outside of any transaction (at system level)
1217        let node_before = db.store().create_node(&["Person"]);
1218        assert!(node_before.is_valid());
1219        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1220
1221        // Start a transaction
1222        let mut session = db.session();
1223        session.begin_tx().unwrap();
1224        let tx_id = session.current_tx.unwrap();
1225
1226        // Create a node versioned with the transaction's ID
1227        let epoch = db.store().current_epoch();
1228        let node_in_tx = db.store().create_node_versioned(&["Person"], epoch, tx_id);
1229        assert!(node_in_tx.is_valid());
1230
1231        // Should see 2 nodes at this point
1232        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1233
1234        // Rollback the transaction
1235        session.rollback().unwrap();
1236        assert!(!session.in_transaction());
1237
1238        // The node created in the transaction should be discarded
1239        // Only the first node should remain visible
1240        let count_after = db.node_count();
1241        assert_eq!(
1242            count_after, 1,
1243            "Rollback should discard uncommitted node, but got {count_after}"
1244        );
1245
1246        // The original node should still be accessible
1247        let current_epoch = db.store().current_epoch();
1248        assert!(
1249            db.store()
1250                .get_node_versioned(node_before, current_epoch, TxId::SYSTEM)
1251                .is_some(),
1252            "Original node should still exist"
1253        );
1254
1255        // The node created in the transaction should not be accessible
1256        assert!(
1257            db.store()
1258                .get_node_versioned(node_in_tx, current_epoch, TxId::SYSTEM)
1259                .is_none(),
1260            "Transaction node should be gone"
1261        );
1262    }
1263
1264    #[test]
1265    fn test_session_create_node_in_transaction() {
1266        // Test that session.create_node() is transaction-aware
1267        let db = GrafeoDB::new_in_memory();
1268
1269        // Create a node outside of any transaction
1270        let node_before = db.create_node(&["Person"]);
1271        assert!(node_before.is_valid());
1272        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1273
1274        // Start a transaction and create a node through the session
1275        let mut session = db.session();
1276        session.begin_tx().unwrap();
1277
1278        // Create a node through session.create_node() - should be versioned with tx
1279        let node_in_tx = session.create_node(&["Person"]);
1280        assert!(node_in_tx.is_valid());
1281
1282        // Should see 2 nodes at this point
1283        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1284
1285        // Rollback the transaction
1286        session.rollback().unwrap();
1287
1288        // The node created via session.create_node() should be discarded
1289        let count_after = db.node_count();
1290        assert_eq!(
1291            count_after, 1,
1292            "Rollback should discard node created via session.create_node(), but got {count_after}"
1293        );
1294    }
1295
1296    #[test]
1297    fn test_session_create_node_with_props_in_transaction() {
1298        use grafeo_common::types::Value;
1299
1300        // Test that session.create_node_with_props() is transaction-aware
1301        let db = GrafeoDB::new_in_memory();
1302
1303        // Create a node outside of any transaction
1304        db.create_node(&["Person"]);
1305        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1306
1307        // Start a transaction and create a node with properties
1308        let mut session = db.session();
1309        session.begin_tx().unwrap();
1310
1311        let node_in_tx =
1312            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1313        assert!(node_in_tx.is_valid());
1314
1315        // Should see 2 nodes
1316        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1317
1318        // Rollback the transaction
1319        session.rollback().unwrap();
1320
1321        // The node should be discarded
1322        let count_after = db.node_count();
1323        assert_eq!(
1324            count_after, 1,
1325            "Rollback should discard node created via session.create_node_with_props()"
1326        );
1327    }
1328
1329    #[cfg(feature = "gql")]
1330    mod gql_tests {
1331        use super::*;
1332
1333        #[test]
1334        fn test_gql_query_execution() {
1335            let db = GrafeoDB::new_in_memory();
1336            let session = db.session();
1337
1338            // Create some test data
1339            session.create_node(&["Person"]);
1340            session.create_node(&["Person"]);
1341            session.create_node(&["Animal"]);
1342
1343            // Execute a GQL query
1344            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1345
1346            // Should return 2 Person nodes
1347            assert_eq!(result.row_count(), 2);
1348            assert_eq!(result.column_count(), 1);
1349            assert_eq!(result.columns[0], "n");
1350        }
1351
1352        #[test]
1353        fn test_gql_empty_result() {
1354            let db = GrafeoDB::new_in_memory();
1355            let session = db.session();
1356
1357            // No data in database
1358            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1359
1360            assert_eq!(result.row_count(), 0);
1361        }
1362
1363        #[test]
1364        fn test_gql_parse_error() {
1365            let db = GrafeoDB::new_in_memory();
1366            let session = db.session();
1367
1368            // Invalid GQL syntax
1369            let result = session.execute("MATCH (n RETURN n");
1370
1371            assert!(result.is_err());
1372        }
1373
1374        #[test]
1375        fn test_gql_relationship_traversal() {
1376            let db = GrafeoDB::new_in_memory();
1377            let session = db.session();
1378
1379            // Create a graph: Alice -> Bob, Alice -> Charlie
1380            let alice = session.create_node(&["Person"]);
1381            let bob = session.create_node(&["Person"]);
1382            let charlie = session.create_node(&["Person"]);
1383
1384            session.create_edge(alice, bob, "KNOWS");
1385            session.create_edge(alice, charlie, "KNOWS");
1386
1387            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
1388            let result = session
1389                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1390                .unwrap();
1391
1392            // Should return 2 rows (Alice->Bob, Alice->Charlie)
1393            assert_eq!(result.row_count(), 2);
1394            assert_eq!(result.column_count(), 2);
1395            assert_eq!(result.columns[0], "a");
1396            assert_eq!(result.columns[1], "b");
1397        }
1398
1399        #[test]
1400        fn test_gql_relationship_with_type_filter() {
1401            let db = GrafeoDB::new_in_memory();
1402            let session = db.session();
1403
1404            // Create a graph: Alice -KNOWS-> Bob, Alice -WORKS_WITH-> Charlie
1405            let alice = session.create_node(&["Person"]);
1406            let bob = session.create_node(&["Person"]);
1407            let charlie = session.create_node(&["Person"]);
1408
1409            session.create_edge(alice, bob, "KNOWS");
1410            session.create_edge(alice, charlie, "WORKS_WITH");
1411
1412            // Query only KNOWS relationships
1413            let result = session
1414                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1415                .unwrap();
1416
1417            // Should return only 1 row (Alice->Bob)
1418            assert_eq!(result.row_count(), 1);
1419        }
1420
1421        #[test]
1422        fn test_gql_semantic_error_undefined_variable() {
1423            let db = GrafeoDB::new_in_memory();
1424            let session = db.session();
1425
1426            // Reference undefined variable 'x' in RETURN
1427            let result = session.execute("MATCH (n:Person) RETURN x");
1428
1429            // Should fail with semantic error
1430            assert!(result.is_err());
1431            let Err(err) = result else {
1432                panic!("Expected error")
1433            };
1434            assert!(
1435                err.to_string().contains("Undefined variable"),
1436                "Expected undefined variable error, got: {}",
1437                err
1438            );
1439        }
1440
1441        #[test]
1442        fn test_gql_where_clause_property_filter() {
1443            use grafeo_common::types::Value;
1444
1445            let db = GrafeoDB::new_in_memory();
1446            let session = db.session();
1447
1448            // Create people with ages
1449            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
1450            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
1451            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
1452
1453            // Query with WHERE clause: age > 30
1454            let result = session
1455                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
1456                .unwrap();
1457
1458            // Should return 2 people (ages 35 and 45)
1459            assert_eq!(result.row_count(), 2);
1460        }
1461
1462        #[test]
1463        fn test_gql_where_clause_equality() {
1464            use grafeo_common::types::Value;
1465
1466            let db = GrafeoDB::new_in_memory();
1467            let session = db.session();
1468
1469            // Create people with names
1470            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1471            session.create_node_with_props(&["Person"], [("name", Value::String("Bob".into()))]);
1472            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1473
1474            // Query with WHERE clause: name = "Alice"
1475            let result = session
1476                .execute("MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n")
1477                .unwrap();
1478
1479            // Should return 2 people named Alice
1480            assert_eq!(result.row_count(), 2);
1481        }
1482
1483        #[test]
1484        fn test_gql_return_property_access() {
1485            use grafeo_common::types::Value;
1486
1487            let db = GrafeoDB::new_in_memory();
1488            let session = db.session();
1489
1490            // Create people with names and ages
1491            session.create_node_with_props(
1492                &["Person"],
1493                [
1494                    ("name", Value::String("Alice".into())),
1495                    ("age", Value::Int64(30)),
1496                ],
1497            );
1498            session.create_node_with_props(
1499                &["Person"],
1500                [
1501                    ("name", Value::String("Bob".into())),
1502                    ("age", Value::Int64(25)),
1503                ],
1504            );
1505
1506            // Query returning properties
1507            let result = session
1508                .execute("MATCH (n:Person) RETURN n.name, n.age")
1509                .unwrap();
1510
1511            // Should return 2 rows with name and age columns
1512            assert_eq!(result.row_count(), 2);
1513            assert_eq!(result.column_count(), 2);
1514            assert_eq!(result.columns[0], "n.name");
1515            assert_eq!(result.columns[1], "n.age");
1516
1517            // Check that we get actual values
1518            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
1519            assert!(names.contains(&&Value::String("Alice".into())));
1520            assert!(names.contains(&&Value::String("Bob".into())));
1521        }
1522
1523        #[test]
1524        fn test_gql_return_mixed_expressions() {
1525            use grafeo_common::types::Value;
1526
1527            let db = GrafeoDB::new_in_memory();
1528            let session = db.session();
1529
1530            // Create a person
1531            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1532
1533            // Query returning both node and property
1534            let result = session
1535                .execute("MATCH (n:Person) RETURN n, n.name")
1536                .unwrap();
1537
1538            assert_eq!(result.row_count(), 1);
1539            assert_eq!(result.column_count(), 2);
1540            assert_eq!(result.columns[0], "n");
1541            assert_eq!(result.columns[1], "n.name");
1542
1543            // Second column should be the name
1544            assert_eq!(result.rows[0][1], Value::String("Alice".into()));
1545        }
1546    }
1547
1548    #[cfg(feature = "cypher")]
1549    mod cypher_tests {
1550        use super::*;
1551
1552        #[test]
1553        fn test_cypher_query_execution() {
1554            let db = GrafeoDB::new_in_memory();
1555            let session = db.session();
1556
1557            // Create some test data
1558            session.create_node(&["Person"]);
1559            session.create_node(&["Person"]);
1560            session.create_node(&["Animal"]);
1561
1562            // Execute a Cypher query
1563            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1564
1565            // Should return 2 Person nodes
1566            assert_eq!(result.row_count(), 2);
1567            assert_eq!(result.column_count(), 1);
1568            assert_eq!(result.columns[0], "n");
1569        }
1570
1571        #[test]
1572        fn test_cypher_empty_result() {
1573            let db = GrafeoDB::new_in_memory();
1574            let session = db.session();
1575
1576            // No data in database
1577            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1578
1579            assert_eq!(result.row_count(), 0);
1580        }
1581
1582        #[test]
1583        fn test_cypher_parse_error() {
1584            let db = GrafeoDB::new_in_memory();
1585            let session = db.session();
1586
1587            // Invalid Cypher syntax
1588            let result = session.execute_cypher("MATCH (n RETURN n");
1589
1590            assert!(result.is_err());
1591        }
1592    }
1593
1594    // ==================== Direct Lookup API Tests ====================
1595
1596    mod direct_lookup_tests {
1597        use super::*;
1598        use grafeo_common::types::Value;
1599
1600        #[test]
1601        fn test_get_node() {
1602            let db = GrafeoDB::new_in_memory();
1603            let session = db.session();
1604
1605            let id = session.create_node(&["Person"]);
1606            let node = session.get_node(id);
1607
1608            assert!(node.is_some());
1609            let node = node.unwrap();
1610            assert_eq!(node.id, id);
1611        }
1612
1613        #[test]
1614        fn test_get_node_not_found() {
1615            use grafeo_common::types::NodeId;
1616
1617            let db = GrafeoDB::new_in_memory();
1618            let session = db.session();
1619
1620            // Try to get a non-existent node
1621            let node = session.get_node(NodeId::new(9999));
1622            assert!(node.is_none());
1623        }
1624
1625        #[test]
1626        fn test_get_node_property() {
1627            let db = GrafeoDB::new_in_memory();
1628            let session = db.session();
1629
1630            let id = session
1631                .create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1632
1633            let name = session.get_node_property(id, "name");
1634            assert_eq!(name, Some(Value::String("Alice".into())));
1635
1636            // Non-existent property
1637            let missing = session.get_node_property(id, "missing");
1638            assert!(missing.is_none());
1639        }
1640
1641        #[test]
1642        fn test_get_edge() {
1643            let db = GrafeoDB::new_in_memory();
1644            let session = db.session();
1645
1646            let alice = session.create_node(&["Person"]);
1647            let bob = session.create_node(&["Person"]);
1648            let edge_id = session.create_edge(alice, bob, "KNOWS");
1649
1650            let edge = session.get_edge(edge_id);
1651            assert!(edge.is_some());
1652            let edge = edge.unwrap();
1653            assert_eq!(edge.id, edge_id);
1654            assert_eq!(edge.src, alice);
1655            assert_eq!(edge.dst, bob);
1656        }
1657
1658        #[test]
1659        fn test_get_edge_not_found() {
1660            use grafeo_common::types::EdgeId;
1661
1662            let db = GrafeoDB::new_in_memory();
1663            let session = db.session();
1664
1665            let edge = session.get_edge(EdgeId::new(9999));
1666            assert!(edge.is_none());
1667        }
1668
1669        #[test]
1670        fn test_get_neighbors_outgoing() {
1671            let db = GrafeoDB::new_in_memory();
1672            let session = db.session();
1673
1674            let alice = session.create_node(&["Person"]);
1675            let bob = session.create_node(&["Person"]);
1676            let carol = session.create_node(&["Person"]);
1677
1678            session.create_edge(alice, bob, "KNOWS");
1679            session.create_edge(alice, carol, "KNOWS");
1680
1681            let neighbors = session.get_neighbors_outgoing(alice);
1682            assert_eq!(neighbors.len(), 2);
1683
1684            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1685            assert!(neighbor_ids.contains(&bob));
1686            assert!(neighbor_ids.contains(&carol));
1687        }
1688
1689        #[test]
1690        fn test_get_neighbors_incoming() {
1691            let db = GrafeoDB::new_in_memory();
1692            let session = db.session();
1693
1694            let alice = session.create_node(&["Person"]);
1695            let bob = session.create_node(&["Person"]);
1696            let carol = session.create_node(&["Person"]);
1697
1698            session.create_edge(bob, alice, "KNOWS");
1699            session.create_edge(carol, alice, "KNOWS");
1700
1701            let neighbors = session.get_neighbors_incoming(alice);
1702            assert_eq!(neighbors.len(), 2);
1703
1704            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1705            assert!(neighbor_ids.contains(&bob));
1706            assert!(neighbor_ids.contains(&carol));
1707        }
1708
1709        #[test]
1710        fn test_get_neighbors_outgoing_by_type() {
1711            let db = GrafeoDB::new_in_memory();
1712            let session = db.session();
1713
1714            let alice = session.create_node(&["Person"]);
1715            let bob = session.create_node(&["Person"]);
1716            let company = session.create_node(&["Company"]);
1717
1718            session.create_edge(alice, bob, "KNOWS");
1719            session.create_edge(alice, company, "WORKS_AT");
1720
1721            let knows_neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1722            assert_eq!(knows_neighbors.len(), 1);
1723            assert_eq!(knows_neighbors[0].0, bob);
1724
1725            let works_neighbors = session.get_neighbors_outgoing_by_type(alice, "WORKS_AT");
1726            assert_eq!(works_neighbors.len(), 1);
1727            assert_eq!(works_neighbors[0].0, company);
1728
1729            // No edges of this type
1730            let no_neighbors = session.get_neighbors_outgoing_by_type(alice, "LIKES");
1731            assert!(no_neighbors.is_empty());
1732        }
1733
1734        #[test]
1735        fn test_node_exists() {
1736            use grafeo_common::types::NodeId;
1737
1738            let db = GrafeoDB::new_in_memory();
1739            let session = db.session();
1740
1741            let id = session.create_node(&["Person"]);
1742
1743            assert!(session.node_exists(id));
1744            assert!(!session.node_exists(NodeId::new(9999)));
1745        }
1746
1747        #[test]
1748        fn test_edge_exists() {
1749            use grafeo_common::types::EdgeId;
1750
1751            let db = GrafeoDB::new_in_memory();
1752            let session = db.session();
1753
1754            let alice = session.create_node(&["Person"]);
1755            let bob = session.create_node(&["Person"]);
1756            let edge_id = session.create_edge(alice, bob, "KNOWS");
1757
1758            assert!(session.edge_exists(edge_id));
1759            assert!(!session.edge_exists(EdgeId::new(9999)));
1760        }
1761
1762        #[test]
1763        fn test_get_degree() {
1764            let db = GrafeoDB::new_in_memory();
1765            let session = db.session();
1766
1767            let alice = session.create_node(&["Person"]);
1768            let bob = session.create_node(&["Person"]);
1769            let carol = session.create_node(&["Person"]);
1770
1771            // Alice knows Bob and Carol (2 outgoing)
1772            session.create_edge(alice, bob, "KNOWS");
1773            session.create_edge(alice, carol, "KNOWS");
1774            // Bob knows Alice (1 incoming for Alice)
1775            session.create_edge(bob, alice, "KNOWS");
1776
1777            let (out_degree, in_degree) = session.get_degree(alice);
1778            assert_eq!(out_degree, 2);
1779            assert_eq!(in_degree, 1);
1780
1781            // Node with no edges
1782            let lonely = session.create_node(&["Person"]);
1783            let (out, in_deg) = session.get_degree(lonely);
1784            assert_eq!(out, 0);
1785            assert_eq!(in_deg, 0);
1786        }
1787
1788        #[test]
1789        fn test_get_nodes_batch() {
1790            let db = GrafeoDB::new_in_memory();
1791            let session = db.session();
1792
1793            let alice = session.create_node(&["Person"]);
1794            let bob = session.create_node(&["Person"]);
1795            let carol = session.create_node(&["Person"]);
1796
1797            let nodes = session.get_nodes_batch(&[alice, bob, carol]);
1798            assert_eq!(nodes.len(), 3);
1799            assert!(nodes[0].is_some());
1800            assert!(nodes[1].is_some());
1801            assert!(nodes[2].is_some());
1802
1803            // With non-existent node
1804            use grafeo_common::types::NodeId;
1805            let nodes_with_missing = session.get_nodes_batch(&[alice, NodeId::new(9999), carol]);
1806            assert_eq!(nodes_with_missing.len(), 3);
1807            assert!(nodes_with_missing[0].is_some());
1808            assert!(nodes_with_missing[1].is_none()); // Missing node
1809            assert!(nodes_with_missing[2].is_some());
1810        }
1811
1812        #[test]
1813        fn test_auto_commit_setting() {
1814            let db = GrafeoDB::new_in_memory();
1815            let mut session = db.session();
1816
1817            // Default is auto-commit enabled
1818            assert!(session.auto_commit());
1819
1820            session.set_auto_commit(false);
1821            assert!(!session.auto_commit());
1822
1823            session.set_auto_commit(true);
1824            assert!(session.auto_commit());
1825        }
1826
1827        #[test]
1828        fn test_transaction_double_begin_error() {
1829            let db = GrafeoDB::new_in_memory();
1830            let mut session = db.session();
1831
1832            session.begin_tx().unwrap();
1833            let result = session.begin_tx();
1834
1835            assert!(result.is_err());
1836            // Clean up
1837            session.rollback().unwrap();
1838        }
1839
1840        #[test]
1841        fn test_commit_without_transaction_error() {
1842            let db = GrafeoDB::new_in_memory();
1843            let mut session = db.session();
1844
1845            let result = session.commit();
1846            assert!(result.is_err());
1847        }
1848
1849        #[test]
1850        fn test_rollback_without_transaction_error() {
1851            let db = GrafeoDB::new_in_memory();
1852            let mut session = db.session();
1853
1854            let result = session.rollback();
1855            assert!(result.is_err());
1856        }
1857
1858        #[test]
1859        fn test_create_edge_in_transaction() {
1860            let db = GrafeoDB::new_in_memory();
1861            let mut session = db.session();
1862
1863            // Create nodes outside transaction
1864            let alice = session.create_node(&["Person"]);
1865            let bob = session.create_node(&["Person"]);
1866
1867            // Create edge in transaction
1868            session.begin_tx().unwrap();
1869            let edge_id = session.create_edge(alice, bob, "KNOWS");
1870
1871            // Edge should be visible in the transaction
1872            assert!(session.edge_exists(edge_id));
1873
1874            // Commit
1875            session.commit().unwrap();
1876
1877            // Edge should still be visible
1878            assert!(session.edge_exists(edge_id));
1879        }
1880
1881        #[test]
1882        fn test_neighbors_empty_node() {
1883            let db = GrafeoDB::new_in_memory();
1884            let session = db.session();
1885
1886            let lonely = session.create_node(&["Person"]);
1887
1888            assert!(session.get_neighbors_outgoing(lonely).is_empty());
1889            assert!(session.get_neighbors_incoming(lonely).is_empty());
1890            assert!(
1891                session
1892                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
1893                    .is_empty()
1894            );
1895        }
1896    }
1897}