Skip to main content

grafeo_engine/
session.rs

//! Lightweight handles for database interaction.
//!
//! A session is your conversation with the database. Each session can have
//! its own transaction state, so concurrent sessions don't interfere with
//! each other. Sessions are cheap to create - spin up as many as you need.
6
7use std::sync::Arc;
8
9use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
10use grafeo_common::utils::error::Result;
11use grafeo_core::graph::Direction;
12use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
13#[cfg(feature = "rdf")]
14use grafeo_core::graph::rdf::RdfStore;
15
16use crate::config::AdaptiveConfig;
17use crate::database::QueryResult;
18use crate::query::cache::QueryCache;
19use crate::transaction::TransactionManager;
20
21/// Your handle to the database - execute queries and manage transactions.
22///
23/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
24/// tracks its own transaction state, so you can have multiple concurrent
25/// sessions without them interfering.
26pub struct Session {
27    /// The underlying store.
28    store: Arc<LpgStore>,
29    /// RDF triple store (if RDF feature is enabled).
30    #[cfg(feature = "rdf")]
31    #[allow(dead_code)]
32    rdf_store: Arc<RdfStore>,
33    /// Transaction manager.
34    tx_manager: Arc<TransactionManager>,
35    /// Query cache shared across sessions.
36    query_cache: Arc<QueryCache>,
37    /// Current transaction ID (if any).
38    current_tx: Option<TxId>,
39    /// Whether the session is in auto-commit mode.
40    auto_commit: bool,
41    /// Adaptive execution configuration.
42    #[allow(dead_code)]
43    adaptive_config: AdaptiveConfig,
44    /// Whether to use factorized execution for multi-hop queries.
45    factorized_execution: bool,
46}
47
48impl Session {
49    /// Creates a new session.
50    #[allow(dead_code)]
51    pub(crate) fn new(
52        store: Arc<LpgStore>,
53        tx_manager: Arc<TransactionManager>,
54        query_cache: Arc<QueryCache>,
55    ) -> Self {
56        Self {
57            store,
58            #[cfg(feature = "rdf")]
59            rdf_store: Arc::new(RdfStore::new()),
60            tx_manager,
61            query_cache,
62            current_tx: None,
63            auto_commit: true,
64            adaptive_config: AdaptiveConfig::default(),
65            factorized_execution: true,
66        }
67    }
68
69    /// Creates a new session with adaptive execution configuration.
70    #[allow(dead_code)]
71    pub(crate) fn with_adaptive(
72        store: Arc<LpgStore>,
73        tx_manager: Arc<TransactionManager>,
74        query_cache: Arc<QueryCache>,
75        adaptive_config: AdaptiveConfig,
76        factorized_execution: bool,
77    ) -> Self {
78        Self {
79            store,
80            #[cfg(feature = "rdf")]
81            rdf_store: Arc::new(RdfStore::new()),
82            tx_manager,
83            query_cache,
84            current_tx: None,
85            auto_commit: true,
86            adaptive_config,
87            factorized_execution,
88        }
89    }
90
91    /// Creates a new session with RDF store and adaptive configuration.
92    #[cfg(feature = "rdf")]
93    pub(crate) fn with_rdf_store_and_adaptive(
94        store: Arc<LpgStore>,
95        rdf_store: Arc<RdfStore>,
96        tx_manager: Arc<TransactionManager>,
97        query_cache: Arc<QueryCache>,
98        adaptive_config: AdaptiveConfig,
99        factorized_execution: bool,
100    ) -> Self {
101        Self {
102            store,
103            rdf_store,
104            tx_manager,
105            query_cache,
106            current_tx: None,
107            auto_commit: true,
108            adaptive_config,
109            factorized_execution,
110        }
111    }
112
113    /// Executes a GQL query.
114    ///
115    /// # Errors
116    ///
117    /// Returns an error if the query fails to parse or execute.
118    ///
119    /// # Examples
120    ///
121    /// ```ignore
122    /// use grafeo_engine::GrafeoDB;
123    ///
124    /// let db = GrafeoDB::new_in_memory();
125    /// let session = db.session();
126    ///
127    /// // Create a node
128    /// session.execute("INSERT (:Person {name: 'Alice', age: 30})")?;
129    ///
130    /// // Query nodes
131    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
132    /// for row in result {
133    ///     println!("{:?}", row);
134    /// }
135    /// ```
136    #[cfg(feature = "gql")]
137    pub fn execute(&self, query: &str) -> Result<QueryResult> {
138        use crate::query::{
139            Executor, Planner, binder::Binder, cache::CacheKey, gql_translator,
140            optimizer::Optimizer, processor::QueryLanguage,
141        };
142
143        let start_time = std::time::Instant::now();
144
145        // Create cache key for this query
146        let cache_key = CacheKey::new(query, QueryLanguage::Gql);
147
148        // Try to get cached optimized plan
149        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
150            // Cache hit - skip parsing, translation, binding, and optimization
151            cached_plan
152        } else {
153            // Cache miss - run full pipeline
154
155            // Parse and translate the query to a logical plan
156            let logical_plan = gql_translator::translate(query)?;
157
158            // Semantic validation
159            let mut binder = Binder::new();
160            let _binding_context = binder.bind(&logical_plan)?;
161
162            // Optimize the plan
163            let optimizer = Optimizer::from_store(&self.store);
164            let plan = optimizer.optimize(logical_plan)?;
165
166            // Cache the optimized plan for future use
167            self.query_cache.put_optimized(cache_key, plan.clone());
168
169            plan
170        };
171
172        // Get transaction context for MVCC visibility
173        let (viewing_epoch, tx_id) = self.get_transaction_context();
174
175        // Convert to physical plan with transaction context
176        // (Physical planning cannot be cached as it depends on transaction state)
177        let planner = Planner::with_context(
178            Arc::clone(&self.store),
179            Arc::clone(&self.tx_manager),
180            tx_id,
181            viewing_epoch,
182        )
183        .with_factorized_execution(self.factorized_execution);
184        let mut physical_plan = planner.plan(&optimized_plan)?;
185
186        // Execute the plan
187        let executor = Executor::with_columns(physical_plan.columns.clone());
188        let mut result = executor.execute(physical_plan.operator.as_mut())?;
189
190        // Add execution metrics
191        let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
192        let rows_scanned = result.rows.len() as u64;
193        result.execution_time_ms = Some(elapsed_ms);
194        result.rows_scanned = Some(rows_scanned);
195
196        Ok(result)
197    }
198
199    /// Executes a GQL query with parameters.
200    ///
201    /// # Errors
202    ///
203    /// Returns an error if the query fails to parse or execute.
204    #[cfg(feature = "gql")]
205    pub fn execute_with_params(
206        &self,
207        query: &str,
208        params: std::collections::HashMap<String, Value>,
209    ) -> Result<QueryResult> {
210        use crate::query::processor::{QueryLanguage, QueryProcessor};
211
212        // Get transaction context for MVCC visibility
213        let (viewing_epoch, tx_id) = self.get_transaction_context();
214
215        // Create processor with transaction context
216        let processor =
217            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
218
219        // Apply transaction context if in a transaction
220        let processor = if let Some(tx_id) = tx_id {
221            processor.with_tx_context(viewing_epoch, tx_id)
222        } else {
223            processor
224        };
225
226        processor.process(query, QueryLanguage::Gql, Some(&params))
227    }
228
229    /// Executes a GQL query with parameters.
230    ///
231    /// # Errors
232    ///
233    /// Returns an error if no query language is enabled.
234    #[cfg(not(any(feature = "gql", feature = "cypher")))]
235    pub fn execute_with_params(
236        &self,
237        _query: &str,
238        _params: std::collections::HashMap<String, Value>,
239    ) -> Result<QueryResult> {
240        Err(grafeo_common::utils::error::Error::Internal(
241            "No query language enabled".to_string(),
242        ))
243    }
244
245    /// Executes a GQL query.
246    ///
247    /// # Errors
248    ///
249    /// Returns an error if no query language is enabled.
250    #[cfg(not(any(feature = "gql", feature = "cypher")))]
251    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
252        Err(grafeo_common::utils::error::Error::Internal(
253            "No query language enabled".to_string(),
254        ))
255    }
256
257    /// Executes a Cypher query.
258    ///
259    /// # Errors
260    ///
261    /// Returns an error if the query fails to parse or execute.
262    #[cfg(feature = "cypher")]
263    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
264        use crate::query::{
265            Executor, Planner, binder::Binder, cache::CacheKey, cypher_translator,
266            optimizer::Optimizer, processor::QueryLanguage,
267        };
268
269        // Create cache key for this query
270        let cache_key = CacheKey::new(query, QueryLanguage::Cypher);
271
272        // Try to get cached optimized plan
273        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
274            cached_plan
275        } else {
276            // Parse and translate the query to a logical plan
277            let logical_plan = cypher_translator::translate(query)?;
278
279            // Semantic validation
280            let mut binder = Binder::new();
281            let _binding_context = binder.bind(&logical_plan)?;
282
283            // Optimize the plan
284            let optimizer = Optimizer::from_store(&self.store);
285            let plan = optimizer.optimize(logical_plan)?;
286
287            // Cache the optimized plan
288            self.query_cache.put_optimized(cache_key, plan.clone());
289
290            plan
291        };
292
293        // Get transaction context for MVCC visibility
294        let (viewing_epoch, tx_id) = self.get_transaction_context();
295
296        // Convert to physical plan with transaction context
297        let planner = Planner::with_context(
298            Arc::clone(&self.store),
299            Arc::clone(&self.tx_manager),
300            tx_id,
301            viewing_epoch,
302        )
303        .with_factorized_execution(self.factorized_execution);
304        let mut physical_plan = planner.plan(&optimized_plan)?;
305
306        // Execute the plan
307        let executor = Executor::with_columns(physical_plan.columns.clone());
308        executor.execute(physical_plan.operator.as_mut())
309    }
310
311    /// Executes a Gremlin query.
312    ///
313    /// # Errors
314    ///
315    /// Returns an error if the query fails to parse or execute.
316    ///
317    /// # Examples
318    ///
319    /// ```ignore
320    /// use grafeo_engine::GrafeoDB;
321    ///
322    /// let db = GrafeoDB::new_in_memory();
323    /// let session = db.session();
324    ///
325    /// // Create some nodes first
326    /// session.create_node(&["Person"]);
327    ///
328    /// // Query using Gremlin
329    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
330    /// ```
331    #[cfg(feature = "gremlin")]
332    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
333        use crate::query::{
334            Executor, Planner, binder::Binder, gremlin_translator, optimizer::Optimizer,
335        };
336
337        // Parse and translate the query to a logical plan
338        let logical_plan = gremlin_translator::translate(query)?;
339
340        // Semantic validation
341        let mut binder = Binder::new();
342        let _binding_context = binder.bind(&logical_plan)?;
343
344        // Optimize the plan
345        let optimizer = Optimizer::from_store(&self.store);
346        let optimized_plan = optimizer.optimize(logical_plan)?;
347
348        // Get transaction context for MVCC visibility
349        let (viewing_epoch, tx_id) = self.get_transaction_context();
350
351        // Convert to physical plan with transaction context
352        let planner = Planner::with_context(
353            Arc::clone(&self.store),
354            Arc::clone(&self.tx_manager),
355            tx_id,
356            viewing_epoch,
357        )
358        .with_factorized_execution(self.factorized_execution);
359        let mut physical_plan = planner.plan(&optimized_plan)?;
360
361        // Execute the plan
362        let executor = Executor::with_columns(physical_plan.columns.clone());
363        executor.execute(physical_plan.operator.as_mut())
364    }
365
366    /// Executes a Gremlin query with parameters.
367    ///
368    /// # Errors
369    ///
370    /// Returns an error if the query fails to parse or execute.
371    #[cfg(feature = "gremlin")]
372    pub fn execute_gremlin_with_params(
373        &self,
374        query: &str,
375        params: std::collections::HashMap<String, Value>,
376    ) -> Result<QueryResult> {
377        use crate::query::processor::{QueryLanguage, QueryProcessor};
378
379        // Get transaction context for MVCC visibility
380        let (viewing_epoch, tx_id) = self.get_transaction_context();
381
382        // Create processor with transaction context
383        let processor =
384            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
385
386        // Apply transaction context if in a transaction
387        let processor = if let Some(tx_id) = tx_id {
388            processor.with_tx_context(viewing_epoch, tx_id)
389        } else {
390            processor
391        };
392
393        processor.process(query, QueryLanguage::Gremlin, Some(&params))
394    }
395
396    /// Executes a GraphQL query against the LPG store.
397    ///
398    /// # Errors
399    ///
400    /// Returns an error if the query fails to parse or execute.
401    ///
402    /// # Examples
403    ///
404    /// ```ignore
405    /// use grafeo_engine::GrafeoDB;
406    ///
407    /// let db = GrafeoDB::new_in_memory();
408    /// let session = db.session();
409    ///
410    /// // Create some nodes first
411    /// session.create_node(&["User"]);
412    ///
413    /// // Query using GraphQL
414    /// let result = session.execute_graphql("query { user { id name } }")?;
415    /// ```
416    #[cfg(feature = "graphql")]
417    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
418        use crate::query::{
419            Executor, Planner, binder::Binder, graphql_translator, optimizer::Optimizer,
420        };
421
422        // Parse and translate the query to a logical plan
423        let logical_plan = graphql_translator::translate(query)?;
424
425        // Semantic validation
426        let mut binder = Binder::new();
427        let _binding_context = binder.bind(&logical_plan)?;
428
429        // Optimize the plan
430        let optimizer = Optimizer::from_store(&self.store);
431        let optimized_plan = optimizer.optimize(logical_plan)?;
432
433        // Get transaction context for MVCC visibility
434        let (viewing_epoch, tx_id) = self.get_transaction_context();
435
436        // Convert to physical plan with transaction context
437        let planner = Planner::with_context(
438            Arc::clone(&self.store),
439            Arc::clone(&self.tx_manager),
440            tx_id,
441            viewing_epoch,
442        )
443        .with_factorized_execution(self.factorized_execution);
444        let mut physical_plan = planner.plan(&optimized_plan)?;
445
446        // Execute the plan
447        let executor = Executor::with_columns(physical_plan.columns.clone());
448        executor.execute(physical_plan.operator.as_mut())
449    }
450
451    /// Executes a GraphQL query with parameters.
452    ///
453    /// # Errors
454    ///
455    /// Returns an error if the query fails to parse or execute.
456    #[cfg(feature = "graphql")]
457    pub fn execute_graphql_with_params(
458        &self,
459        query: &str,
460        params: std::collections::HashMap<String, Value>,
461    ) -> Result<QueryResult> {
462        use crate::query::processor::{QueryLanguage, QueryProcessor};
463
464        // Get transaction context for MVCC visibility
465        let (viewing_epoch, tx_id) = self.get_transaction_context();
466
467        // Create processor with transaction context
468        let processor =
469            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
470
471        // Apply transaction context if in a transaction
472        let processor = if let Some(tx_id) = tx_id {
473            processor.with_tx_context(viewing_epoch, tx_id)
474        } else {
475            processor
476        };
477
478        processor.process(query, QueryLanguage::GraphQL, Some(&params))
479    }
480
481    /// Executes a SPARQL query.
482    ///
483    /// # Errors
484    ///
485    /// Returns an error if the query fails to parse or execute.
486    #[cfg(all(feature = "sparql", feature = "rdf"))]
487    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
488        use crate::query::{
489            Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
490        };
491
492        // Parse and translate the SPARQL query to a logical plan
493        let logical_plan = sparql_translator::translate(query)?;
494
495        // Optimize the plan
496        let optimizer = Optimizer::from_store(&self.store);
497        let optimized_plan = optimizer.optimize(logical_plan)?;
498
499        // Convert to physical plan using RDF planner
500        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store)).with_tx_id(self.current_tx);
501        let mut physical_plan = planner.plan(&optimized_plan)?;
502
503        // Execute the plan
504        let executor = Executor::with_columns(physical_plan.columns.clone());
505        executor.execute(physical_plan.operator.as_mut())
506    }
507
508    /// Executes a SPARQL query with parameters.
509    ///
510    /// # Errors
511    ///
512    /// Returns an error if the query fails to parse or execute.
513    #[cfg(all(feature = "sparql", feature = "rdf"))]
514    pub fn execute_sparql_with_params(
515        &self,
516        query: &str,
517        _params: std::collections::HashMap<String, Value>,
518    ) -> Result<QueryResult> {
519        // TODO: Implement parameter substitution for SPARQL
520        // For now, just execute the query without parameters
521        self.execute_sparql(query)
522    }
523
524    /// Begins a new transaction.
525    ///
526    /// # Errors
527    ///
528    /// Returns an error if a transaction is already active.
529    ///
530    /// # Examples
531    ///
532    /// ```ignore
533    /// use grafeo_engine::GrafeoDB;
534    ///
535    /// let db = GrafeoDB::new_in_memory();
536    /// let mut session = db.session();
537    ///
538    /// session.begin_tx()?;
539    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
540    /// session.execute("INSERT (:Person {name: 'Bob'})")?;
541    /// session.commit()?; // Both inserts committed atomically
542    /// ```
543    pub fn begin_tx(&mut self) -> Result<()> {
544        if self.current_tx.is_some() {
545            return Err(grafeo_common::utils::error::Error::Transaction(
546                grafeo_common::utils::error::TransactionError::InvalidState(
547                    "Transaction already active".to_string(),
548                ),
549            ));
550        }
551
552        let tx_id = self.tx_manager.begin();
553        self.current_tx = Some(tx_id);
554        Ok(())
555    }
556
557    /// Begins a transaction with a specific isolation level.
558    ///
559    /// See [`begin_tx`](Self::begin_tx) for the default (`SnapshotIsolation`).
560    ///
561    /// # Errors
562    ///
563    /// Returns an error if a transaction is already active.
564    pub fn begin_tx_with_isolation(
565        &mut self,
566        isolation_level: crate::transaction::IsolationLevel,
567    ) -> Result<()> {
568        if self.current_tx.is_some() {
569            return Err(grafeo_common::utils::error::Error::Transaction(
570                grafeo_common::utils::error::TransactionError::InvalidState(
571                    "Transaction already active".to_string(),
572                ),
573            ));
574        }
575
576        let tx_id = self.tx_manager.begin_with_isolation(isolation_level);
577        self.current_tx = Some(tx_id);
578        Ok(())
579    }
580
581    /// Commits the current transaction.
582    ///
583    /// Makes all changes since [`begin_tx`](Self::begin_tx) permanent.
584    ///
585    /// # Errors
586    ///
587    /// Returns an error if no transaction is active.
588    pub fn commit(&mut self) -> Result<()> {
589        let tx_id = self.current_tx.take().ok_or_else(|| {
590            grafeo_common::utils::error::Error::Transaction(
591                grafeo_common::utils::error::TransactionError::InvalidState(
592                    "No active transaction".to_string(),
593                ),
594            )
595        })?;
596
597        // Commit RDF store pending operations
598        #[cfg(feature = "rdf")]
599        self.rdf_store.commit_tx(tx_id);
600
601        self.tx_manager.commit(tx_id).map(|_| ())
602    }
603
604    /// Aborts the current transaction.
605    ///
606    /// Discards all changes since [`begin_tx`](Self::begin_tx).
607    ///
608    /// # Errors
609    ///
610    /// Returns an error if no transaction is active.
611    ///
612    /// # Examples
613    ///
614    /// ```ignore
615    /// use grafeo_engine::GrafeoDB;
616    ///
617    /// let db = GrafeoDB::new_in_memory();
618    /// let mut session = db.session();
619    ///
620    /// session.begin_tx()?;
621    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
622    /// session.rollback()?; // Insert is discarded
623    /// ```
624    pub fn rollback(&mut self) -> Result<()> {
625        let tx_id = self.current_tx.take().ok_or_else(|| {
626            grafeo_common::utils::error::Error::Transaction(
627                grafeo_common::utils::error::TransactionError::InvalidState(
628                    "No active transaction".to_string(),
629                ),
630            )
631        })?;
632
633        // Discard uncommitted versions in the LPG store
634        self.store.discard_uncommitted_versions(tx_id);
635
636        // Discard pending operations in the RDF store
637        #[cfg(feature = "rdf")]
638        self.rdf_store.rollback_tx(tx_id);
639
640        // Mark transaction as aborted in the manager
641        self.tx_manager.abort(tx_id)
642    }
643
644    /// Returns whether a transaction is active.
645    #[must_use]
646    pub fn in_transaction(&self) -> bool {
647        self.current_tx.is_some()
648    }
649
650    /// Sets auto-commit mode.
651    pub fn set_auto_commit(&mut self, auto_commit: bool) {
652        self.auto_commit = auto_commit;
653    }
654
655    /// Returns whether auto-commit is enabled.
656    #[must_use]
657    pub fn auto_commit(&self) -> bool {
658        self.auto_commit
659    }
660
661    /// Returns the current transaction context for MVCC visibility.
662    ///
663    /// Returns `(viewing_epoch, tx_id)` where:
664    /// - `viewing_epoch` is the epoch at which to check version visibility
665    /// - `tx_id` is the current transaction ID (if in a transaction)
666    #[must_use]
667    fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
668        if let Some(tx_id) = self.current_tx {
669            // In a transaction - use the transaction's start epoch
670            let epoch = self
671                .tx_manager
672                .start_epoch(tx_id)
673                .unwrap_or_else(|| self.tx_manager.current_epoch());
674            (epoch, Some(tx_id))
675        } else {
676            // No transaction - use current epoch
677            (self.tx_manager.current_epoch(), None)
678        }
679    }
680
681    /// Creates a node directly (bypassing query execution).
682    ///
683    /// This is a low-level API for testing and direct manipulation.
684    /// If a transaction is active, the node will be versioned with the transaction ID.
685    pub fn create_node(&self, labels: &[&str]) -> NodeId {
686        let (epoch, tx_id) = self.get_transaction_context();
687        self.store
688            .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
689    }
690
691    /// Creates a node with properties.
692    ///
693    /// If a transaction is active, the node will be versioned with the transaction ID.
694    pub fn create_node_with_props<'a>(
695        &self,
696        labels: &[&str],
697        properties: impl IntoIterator<Item = (&'a str, Value)>,
698    ) -> NodeId {
699        let (epoch, tx_id) = self.get_transaction_context();
700        self.store.create_node_with_props_versioned(
701            labels,
702            properties.into_iter().map(|(k, v)| (k, v)),
703            epoch,
704            tx_id.unwrap_or(TxId::SYSTEM),
705        )
706    }
707
708    /// Creates an edge between two nodes.
709    ///
710    /// This is a low-level API for testing and direct manipulation.
711    /// If a transaction is active, the edge will be versioned with the transaction ID.
712    pub fn create_edge(
713        &self,
714        src: NodeId,
715        dst: NodeId,
716        edge_type: &str,
717    ) -> grafeo_common::types::EdgeId {
718        let (epoch, tx_id) = self.get_transaction_context();
719        self.store
720            .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
721    }
722
723    // =========================================================================
724    // Direct Lookup APIs (bypass query planning for O(1) point reads)
725    // =========================================================================
726
727    /// Gets a node by ID directly, bypassing query planning.
728    ///
729    /// This is the fastest way to retrieve a single node when you know its ID.
730    /// Skips parsing, binding, optimization, and physical planning entirely.
731    ///
732    /// # Performance
733    ///
734    /// - Time complexity: O(1) average case
735    /// - No lock contention (uses DashMap internally)
736    /// - ~20-30x faster than equivalent MATCH query
737    ///
738    /// # Example
739    ///
740    /// ```ignore
741    /// let session = db.session();
742    /// let node_id = session.create_node(&["Person"]);
743    ///
744    /// // Direct lookup - O(1), no query planning
745    /// let node = session.get_node(node_id);
746    /// assert!(node.is_some());
747    /// ```
748    #[must_use]
749    pub fn get_node(&self, id: NodeId) -> Option<Node> {
750        let (epoch, tx_id) = self.get_transaction_context();
751        self.store
752            .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
753    }
754
755    /// Gets a single property from a node by ID, bypassing query planning.
756    ///
757    /// More efficient than `get_node()` when you only need one property,
758    /// as it avoids loading the full node with all properties.
759    ///
760    /// # Performance
761    ///
762    /// - Time complexity: O(1) average case
763    /// - No query planning overhead
764    ///
765    /// # Example
766    ///
767    /// ```ignore
768    /// let session = db.session();
769    /// let id = session.create_node_with_props(&["Person"], [("name", "Alice".into())]);
770    ///
771    /// // Direct property access - O(1)
772    /// let name = session.get_node_property(id, "name");
773    /// assert_eq!(name, Some(Value::String("Alice".into())));
774    /// ```
775    #[must_use]
776    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
777        self.get_node(id)
778            .and_then(|node| node.get_property(key).cloned())
779    }
780
781    /// Gets an edge by ID directly, bypassing query planning.
782    ///
783    /// # Performance
784    ///
785    /// - Time complexity: O(1) average case
786    /// - No lock contention
787    #[must_use]
788    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
789        let (epoch, tx_id) = self.get_transaction_context();
790        self.store
791            .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
792    }
793
794    /// Gets outgoing neighbors of a node directly, bypassing query planning.
795    ///
796    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
797    ///
798    /// # Performance
799    ///
800    /// - Time complexity: O(degree) where degree is the number of outgoing edges
801    /// - Uses adjacency index for direct access
802    /// - ~10-20x faster than equivalent MATCH query
803    ///
804    /// # Example
805    ///
806    /// ```ignore
807    /// let session = db.session();
808    /// let alice = session.create_node(&["Person"]);
809    /// let bob = session.create_node(&["Person"]);
810    /// session.create_edge(alice, bob, "KNOWS");
811    ///
812    /// // Direct neighbor lookup - O(degree)
813    /// let neighbors = session.get_neighbors_outgoing(alice);
814    /// assert_eq!(neighbors.len(), 1);
815    /// assert_eq!(neighbors[0].0, bob);
816    /// ```
817    #[must_use]
818    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
819        self.store.edges_from(node, Direction::Outgoing).collect()
820    }
821
822    /// Gets incoming neighbors of a node directly, bypassing query planning.
823    ///
824    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
825    ///
826    /// # Performance
827    ///
828    /// - Time complexity: O(degree) where degree is the number of incoming edges
829    /// - Uses backward adjacency index for direct access
830    #[must_use]
831    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
832        self.store.edges_from(node, Direction::Incoming).collect()
833    }
834
835    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
836    ///
837    /// # Example
838    ///
839    /// ```ignore
840    /// let neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
841    /// ```
842    #[must_use]
843    pub fn get_neighbors_outgoing_by_type(
844        &self,
845        node: NodeId,
846        edge_type: &str,
847    ) -> Vec<(NodeId, EdgeId)> {
848        self.store
849            .edges_from(node, Direction::Outgoing)
850            .filter(|(_, edge_id)| {
851                self.get_edge(*edge_id)
852                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
853            })
854            .collect()
855    }
856
857    /// Checks if a node exists, bypassing query planning.
858    ///
859    /// # Performance
860    ///
861    /// - Time complexity: O(1)
862    /// - Fastest existence check available
863    #[must_use]
864    pub fn node_exists(&self, id: NodeId) -> bool {
865        self.get_node(id).is_some()
866    }
867
868    /// Checks if an edge exists, bypassing query planning.
869    #[must_use]
870    pub fn edge_exists(&self, id: EdgeId) -> bool {
871        self.get_edge(id).is_some()
872    }
873
874    /// Gets the degree (number of edges) of a node.
875    ///
876    /// Returns (outgoing_degree, incoming_degree).
877    #[must_use]
878    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
879        let out = self.store.out_degree(node);
880        let in_degree = self.store.in_degree(node);
881        (out, in_degree)
882    }
883
884    /// Batch lookup of multiple nodes by ID.
885    ///
886    /// More efficient than calling `get_node()` in a loop because it
887    /// amortizes overhead.
888    ///
889    /// # Performance
890    ///
891    /// - Time complexity: O(n) where n is the number of IDs
892    /// - Better cache utilization than individual lookups
893    #[must_use]
894    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
895        let (epoch, tx_id) = self.get_transaction_context();
896        let tx = tx_id.unwrap_or(TxId::SYSTEM);
897        ids.iter()
898            .map(|&id| self.store.get_node_versioned(id, epoch, tx))
899            .collect()
900    }
901}
902
903#[cfg(test)]
904mod tests {
905    use crate::database::GrafeoDB;
906
/// A node created through a session gets a valid ID and is counted by the database.
#[test]
fn test_session_create_node() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let person = sess.create_node(&["Person"]);

    assert!(person.is_valid());
    assert_eq!(database.node_count(), 1);
}
916
/// `begin_tx` puts the session into a transaction and `commit` takes it out again.
#[test]
fn test_session_transaction() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    // A fresh session starts outside any transaction.
    assert!(!sess.in_transaction());

    // Opening a transaction is reflected immediately.
    sess.begin_tx().unwrap();
    assert!(sess.in_transaction());

    // Committing closes it.
    sess.commit().unwrap();
    assert!(!sess.in_transaction());
}
930
/// The transaction context carries a transaction ID only while a transaction
/// is open, and the reported epoch never moves backwards across a commit.
#[test]
fn test_session_transaction_context() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Outside a transaction there is no transaction ID. The epoch reported
    // here is deliberately not checked.
    let (_, tx_before) = session.get_transaction_context();
    assert!(tx_before.is_none());

    // Inside a transaction the context carries that transaction's ID.
    session.begin_tx().unwrap();
    let (epoch_in_tx, tx_during) = session.get_transaction_context();
    assert!(tx_during.is_some());

    // After commit the transaction ID is gone again.
    session.commit().unwrap();
    let (epoch_after, tx_after) = session.get_transaction_context();
    assert!(tx_after.is_none());

    // The epoch is allowed to stay the same, but it must not go backwards.
    assert!(
        epoch_after.as_u64() >= epoch_in_tx.as_u64(),
        "epoch went backwards across commit: {} -> {}",
        epoch_in_tx.as_u64(),
        epoch_after.as_u64()
    );
}
954
/// Rolling back an open transaction leaves the session outside any transaction.
#[test]
fn test_session_rollback() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    // Open a transaction and abandon it straight away.
    sess.begin_tx().unwrap();
    sess.rollback().unwrap();

    assert!(!sess.in_transaction());
}
964
/// A node written under a transaction's ID disappears when that transaction
/// is rolled back, while a node that existed beforehand is untouched.
#[test]
fn test_session_rollback_discards_versions() {
    use grafeo_common::types::TxId;

    let database = GrafeoDB::new_in_memory();

    // Baseline: one node written directly to the store, outside any transaction.
    let baseline_node = database.store().create_node(&["Person"]);
    assert!(baseline_node.is_valid());
    assert_eq!(
        database.node_count(),
        1,
        "Should have 1 node before transaction"
    );

    // Open a transaction and read its ID off the session.
    let mut sess = database.session();
    sess.begin_tx().unwrap();
    let open_tx = sess.current_tx.unwrap();

    // Write a second node stamped with the open transaction's ID.
    let write_epoch = database.store().current_epoch();
    let pending_node = database
        .store()
        .create_node_versioned(&["Person"], write_epoch, open_tx);
    assert!(pending_node.is_valid());

    // Both nodes are counted while the transaction is still open.
    assert_eq!(
        database.node_count(),
        2,
        "Should have 2 nodes during transaction"
    );

    // Abandon the transaction.
    sess.rollback().unwrap();
    assert!(!sess.in_transaction());

    // Only the baseline node may still be counted.
    let count_after = database.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard uncommitted node, but got {count_after}"
    );

    // Look both nodes up at the post-rollback epoch as the system transaction.
    let read_epoch = database.store().current_epoch();

    let baseline_lookup =
        database
            .store()
            .get_node_versioned(baseline_node, read_epoch, TxId::SYSTEM);
    assert!(baseline_lookup.is_some(), "Original node should still exist");

    let pending_lookup =
        database
            .store()
            .get_node_versioned(pending_node, read_epoch, TxId::SYSTEM);
    assert!(pending_lookup.is_none(), "Transaction node should be gone");
}
1018
/// `Session::create_node` is transaction-aware: a node it creates inside a
/// transaction is discarded when that transaction is rolled back.
#[test]
fn test_session_create_node_in_transaction() {
    let database = GrafeoDB::new_in_memory();

    // Baseline: one node created through the database, outside any transaction.
    let baseline_node = database.create_node(&["Person"]);
    assert!(baseline_node.is_valid());
    assert_eq!(
        database.node_count(),
        1,
        "Should have 1 node before transaction"
    );

    // Create a second node through the session while a transaction is open.
    let mut sess = database.session();
    sess.begin_tx().unwrap();
    let pending_node = sess.create_node(&["Person"]);
    assert!(pending_node.is_valid());

    // Both nodes are counted while the transaction is still open.
    assert_eq!(
        database.node_count(),
        2,
        "Should have 2 nodes during transaction"
    );

    // Abandon the transaction; only the baseline node may remain.
    sess.rollback().unwrap();
    let count_after = database.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard node created via session.create_node(), but got {count_after}"
    );
}
1050
/// `Session::create_node_with_props` is transaction-aware: a node it creates
/// inside a transaction is discarded when that transaction is rolled back.
#[test]
fn test_session_create_node_with_props_in_transaction() {
    use grafeo_common::types::Value;

    let database = GrafeoDB::new_in_memory();

    // Baseline: one node created outside any transaction.
    database.create_node(&["Person"]);
    assert_eq!(
        database.node_count(),
        1,
        "Should have 1 node before transaction"
    );

    // Create a node with a property while a transaction is open.
    let mut sess = database.session();
    sess.begin_tx().unwrap();
    let name_prop = ("name", Value::String("Alice".into()));
    let pending_node = sess.create_node_with_props(&["Person"], [name_prop]);
    assert!(pending_node.is_valid());

    // Both nodes are counted while the transaction is still open.
    assert_eq!(
        database.node_count(),
        2,
        "Should have 2 nodes during transaction"
    );

    // Abandon the transaction; only the baseline node may remain.
    sess.rollback().unwrap();
    assert_eq!(
        database.node_count(),
        1,
        "Rollback should discard node created via session.create_node_with_props()"
    );
}
1083
#[cfg(feature = "gql")]
mod gql_tests {
    use super::*;
    use grafeo_common::types::Value;

    /// A label-filtered `MATCH` returns only the nodes carrying that label.
    #[test]
    fn test_gql_query_execution() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        // Fixture: two people and one animal.
        sess.create_node(&["Person"]);
        sess.create_node(&["Person"]);
        sess.create_node(&["Animal"]);

        let people = sess.execute("MATCH (n:Person) RETURN n").unwrap();

        // One row per Person, in a single column named after the variable.
        assert_eq!(people.row_count(), 2);
        assert_eq!(people.column_count(), 1);
        assert_eq!(people.columns[0], "n");
    }

    /// Querying an empty database succeeds and yields no rows.
    #[test]
    fn test_gql_empty_result() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        let people = sess.execute("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 0);
    }

    /// Malformed GQL is reported as an error rather than a result.
    #[test]
    fn test_gql_parse_error() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        // The node pattern is missing its closing parenthesis.
        let outcome = sess.execute("MATCH (n RETURN n");

        assert!(outcome.is_err());
    }

    /// A one-hop pattern returns one row per matching edge.
    #[test]
    fn test_gql_relationship_traversal() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        // Fixture: Alice knows both Bob and Charlie.
        let alice = sess.create_node(&["Person"]);
        let bob = sess.create_node(&["Person"]);
        let charlie = sess.create_node(&["Person"]);
        sess.create_edge(alice, bob, "KNOWS");
        sess.create_edge(alice, charlie, "KNOWS");

        let pairs = sess
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        // Two edges, so two rows; columns follow the RETURN list.
        assert_eq!(pairs.row_count(), 2);
        assert_eq!(pairs.column_count(), 2);
        assert_eq!(pairs.columns[0], "a");
        assert_eq!(pairs.columns[1], "b");
    }

    /// An edge-type filter in the pattern excludes edges of other types.
    #[test]
    fn test_gql_relationship_with_type_filter() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        // Fixture: Alice -KNOWS-> Bob and Alice -WORKS_WITH-> Charlie.
        let alice = sess.create_node(&["Person"]);
        let bob = sess.create_node(&["Person"]);
        let charlie = sess.create_node(&["Person"]);
        sess.create_edge(alice, bob, "KNOWS");
        sess.create_edge(alice, charlie, "WORKS_WITH");

        let pairs = sess
            .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
            .unwrap();

        // Only the KNOWS edge matches.
        assert_eq!(pairs.row_count(), 1);
    }

    /// Returning a variable that was never bound is rejected with an
    /// "Undefined variable" error.
    #[test]
    fn test_gql_semantic_error_undefined_variable() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        // `x` is never introduced by the MATCH clause.
        let outcome = sess.execute("MATCH (n:Person) RETURN x");

        assert!(outcome.is_err());
        let err = match outcome {
            Err(err) => err,
            Ok(_) => panic!("Expected error"),
        };
        let message = err.to_string();
        assert!(
            message.contains("Undefined variable"),
            "Expected undefined variable error, got: {}",
            err
        );
    }

    /// A `WHERE` comparison on a numeric property filters rows.
    #[test]
    fn test_gql_where_clause_property_filter() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        // Fixture: three people aged 25, 35 and 45.
        for age in [25, 35, 45] {
            sess.create_node_with_props(&["Person"], [("age", Value::Int64(age))]);
        }

        let over_thirty = sess
            .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
            .unwrap();

        // Ages 35 and 45 pass the filter.
        assert_eq!(over_thirty.row_count(), 2);
    }

    /// A `WHERE` equality test on a string property filters rows.
    #[test]
    fn test_gql_where_clause_equality() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        // Fixture: Alice, Bob, and a second Alice.
        sess.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
        sess.create_node_with_props(&["Person"], [("name", Value::String("Bob".into()))]);
        sess.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);

        let alices = sess
            .execute("MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n")
            .unwrap();

        // Both nodes named Alice match.
        assert_eq!(alices.row_count(), 2);
    }

    /// Property accesses in `RETURN` become columns holding the property values.
    #[test]
    fn test_gql_return_property_access() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        // Fixture: Alice (30) and Bob (25).
        sess.create_node_with_props(
            &["Person"],
            [
                ("name", Value::String("Alice".into())),
                ("age", Value::Int64(30)),
            ],
        );
        sess.create_node_with_props(
            &["Person"],
            [
                ("name", Value::String("Bob".into())),
                ("age", Value::Int64(25)),
            ],
        );

        let table = sess
            .execute("MATCH (n:Person) RETURN n.name, n.age")
            .unwrap();

        // Two rows, with columns named after the returned expressions.
        assert_eq!(table.row_count(), 2);
        assert_eq!(table.column_count(), 2);
        assert_eq!(table.columns[0], "n.name");
        assert_eq!(table.columns[1], "n.age");

        // The first column must hold the actual names, in any row order.
        let first_column: Vec<&Value> = table.rows.iter().map(|row| &row[0]).collect();
        let alice = Value::String("Alice".into());
        let bob = Value::String("Bob".into());
        assert!(first_column.contains(&&alice));
        assert!(first_column.contains(&&bob));
    }

    /// A `RETURN` list may mix a whole node with one of its properties.
    #[test]
    fn test_gql_return_mixed_expressions() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        // Fixture: a single person named Alice.
        sess.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);

        let table = sess.execute("MATCH (n:Person) RETURN n, n.name").unwrap();

        assert_eq!(table.row_count(), 1);
        assert_eq!(table.column_count(), 2);
        assert_eq!(table.columns[0], "n");
        assert_eq!(table.columns[1], "n.name");

        // The property column of the only row carries the name.
        assert_eq!(table.rows[0][1], Value::String("Alice".into()));
    }
}
1302
#[cfg(feature = "cypher")]
mod cypher_tests {
    use super::*;

    /// A label-filtered Cypher `MATCH` returns only the nodes carrying that label.
    #[test]
    fn test_cypher_query_execution() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        // Fixture: two people and one animal.
        sess.create_node(&["Person"]);
        sess.create_node(&["Person"]);
        sess.create_node(&["Animal"]);

        let people = sess.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        // One row per Person, in a single column named after the variable.
        assert_eq!(people.row_count(), 2);
        assert_eq!(people.column_count(), 1);
        assert_eq!(people.columns[0], "n");
    }

    /// Querying an empty database through Cypher succeeds and yields no rows.
    #[test]
    fn test_cypher_empty_result() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        let people = sess.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(people.row_count(), 0);
    }

    /// Malformed Cypher is reported as an error rather than a result.
    #[test]
    fn test_cypher_parse_error() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        // The node pattern is missing its closing parenthesis.
        let outcome = sess.execute_cypher("MATCH (n RETURN n");

        assert!(outcome.is_err());
    }
}
1348
1349    // ==================== Direct Lookup API Tests ====================
1350
1351    mod direct_lookup_tests {
1352        use super::*;
1353        use grafeo_common::types::Value;
1354
1355        #[test]
1356        fn test_get_node() {
1357            let db = GrafeoDB::new_in_memory();
1358            let session = db.session();
1359
1360            let id = session.create_node(&["Person"]);
1361            let node = session.get_node(id);
1362
1363            assert!(node.is_some());
1364            let node = node.unwrap();
1365            assert_eq!(node.id, id);
1366        }
1367
1368        #[test]
1369        fn test_get_node_not_found() {
1370            use grafeo_common::types::NodeId;
1371
1372            let db = GrafeoDB::new_in_memory();
1373            let session = db.session();
1374
1375            // Try to get a non-existent node
1376            let node = session.get_node(NodeId::new(9999));
1377            assert!(node.is_none());
1378        }
1379
1380        #[test]
1381        fn test_get_node_property() {
1382            let db = GrafeoDB::new_in_memory();
1383            let session = db.session();
1384
1385            let id = session
1386                .create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1387
1388            let name = session.get_node_property(id, "name");
1389            assert_eq!(name, Some(Value::String("Alice".into())));
1390
1391            // Non-existent property
1392            let missing = session.get_node_property(id, "missing");
1393            assert!(missing.is_none());
1394        }
1395
1396        #[test]
1397        fn test_get_edge() {
1398            let db = GrafeoDB::new_in_memory();
1399            let session = db.session();
1400
1401            let alice = session.create_node(&["Person"]);
1402            let bob = session.create_node(&["Person"]);
1403            let edge_id = session.create_edge(alice, bob, "KNOWS");
1404
1405            let edge = session.get_edge(edge_id);
1406            assert!(edge.is_some());
1407            let edge = edge.unwrap();
1408            assert_eq!(edge.id, edge_id);
1409            assert_eq!(edge.src, alice);
1410            assert_eq!(edge.dst, bob);
1411        }
1412
1413        #[test]
1414        fn test_get_edge_not_found() {
1415            use grafeo_common::types::EdgeId;
1416
1417            let db = GrafeoDB::new_in_memory();
1418            let session = db.session();
1419
1420            let edge = session.get_edge(EdgeId::new(9999));
1421            assert!(edge.is_none());
1422        }
1423
1424        #[test]
1425        fn test_get_neighbors_outgoing() {
1426            let db = GrafeoDB::new_in_memory();
1427            let session = db.session();
1428
1429            let alice = session.create_node(&["Person"]);
1430            let bob = session.create_node(&["Person"]);
1431            let carol = session.create_node(&["Person"]);
1432
1433            session.create_edge(alice, bob, "KNOWS");
1434            session.create_edge(alice, carol, "KNOWS");
1435
1436            let neighbors = session.get_neighbors_outgoing(alice);
1437            assert_eq!(neighbors.len(), 2);
1438
1439            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1440            assert!(neighbor_ids.contains(&bob));
1441            assert!(neighbor_ids.contains(&carol));
1442        }
1443
1444        #[test]
1445        fn test_get_neighbors_incoming() {
1446            let db = GrafeoDB::new_in_memory();
1447            let session = db.session();
1448
1449            let alice = session.create_node(&["Person"]);
1450            let bob = session.create_node(&["Person"]);
1451            let carol = session.create_node(&["Person"]);
1452
1453            session.create_edge(bob, alice, "KNOWS");
1454            session.create_edge(carol, alice, "KNOWS");
1455
1456            let neighbors = session.get_neighbors_incoming(alice);
1457            assert_eq!(neighbors.len(), 2);
1458
1459            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1460            assert!(neighbor_ids.contains(&bob));
1461            assert!(neighbor_ids.contains(&carol));
1462        }
1463
1464        #[test]
1465        fn test_get_neighbors_outgoing_by_type() {
1466            let db = GrafeoDB::new_in_memory();
1467            let session = db.session();
1468
1469            let alice = session.create_node(&["Person"]);
1470            let bob = session.create_node(&["Person"]);
1471            let company = session.create_node(&["Company"]);
1472
1473            session.create_edge(alice, bob, "KNOWS");
1474            session.create_edge(alice, company, "WORKS_AT");
1475
1476            let knows_neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1477            assert_eq!(knows_neighbors.len(), 1);
1478            assert_eq!(knows_neighbors[0].0, bob);
1479
1480            let works_neighbors = session.get_neighbors_outgoing_by_type(alice, "WORKS_AT");
1481            assert_eq!(works_neighbors.len(), 1);
1482            assert_eq!(works_neighbors[0].0, company);
1483
1484            // No edges of this type
1485            let no_neighbors = session.get_neighbors_outgoing_by_type(alice, "LIKES");
1486            assert!(no_neighbors.is_empty());
1487        }
1488
1489        #[test]
1490        fn test_node_exists() {
1491            use grafeo_common::types::NodeId;
1492
1493            let db = GrafeoDB::new_in_memory();
1494            let session = db.session();
1495
1496            let id = session.create_node(&["Person"]);
1497
1498            assert!(session.node_exists(id));
1499            assert!(!session.node_exists(NodeId::new(9999)));
1500        }
1501
1502        #[test]
1503        fn test_edge_exists() {
1504            use grafeo_common::types::EdgeId;
1505
1506            let db = GrafeoDB::new_in_memory();
1507            let session = db.session();
1508
1509            let alice = session.create_node(&["Person"]);
1510            let bob = session.create_node(&["Person"]);
1511            let edge_id = session.create_edge(alice, bob, "KNOWS");
1512
1513            assert!(session.edge_exists(edge_id));
1514            assert!(!session.edge_exists(EdgeId::new(9999)));
1515        }
1516
1517        #[test]
1518        fn test_get_degree() {
1519            let db = GrafeoDB::new_in_memory();
1520            let session = db.session();
1521
1522            let alice = session.create_node(&["Person"]);
1523            let bob = session.create_node(&["Person"]);
1524            let carol = session.create_node(&["Person"]);
1525
1526            // Alice knows Bob and Carol (2 outgoing)
1527            session.create_edge(alice, bob, "KNOWS");
1528            session.create_edge(alice, carol, "KNOWS");
1529            // Bob knows Alice (1 incoming for Alice)
1530            session.create_edge(bob, alice, "KNOWS");
1531
1532            let (out_degree, in_degree) = session.get_degree(alice);
1533            assert_eq!(out_degree, 2);
1534            assert_eq!(in_degree, 1);
1535
1536            // Node with no edges
1537            let lonely = session.create_node(&["Person"]);
1538            let (out, in_deg) = session.get_degree(lonely);
1539            assert_eq!(out, 0);
1540            assert_eq!(in_deg, 0);
1541        }
1542
1543        #[test]
1544        fn test_get_nodes_batch() {
1545            let db = GrafeoDB::new_in_memory();
1546            let session = db.session();
1547
1548            let alice = session.create_node(&["Person"]);
1549            let bob = session.create_node(&["Person"]);
1550            let carol = session.create_node(&["Person"]);
1551
1552            let nodes = session.get_nodes_batch(&[alice, bob, carol]);
1553            assert_eq!(nodes.len(), 3);
1554            assert!(nodes[0].is_some());
1555            assert!(nodes[1].is_some());
1556            assert!(nodes[2].is_some());
1557
1558            // With non-existent node
1559            use grafeo_common::types::NodeId;
1560            let nodes_with_missing = session.get_nodes_batch(&[alice, NodeId::new(9999), carol]);
1561            assert_eq!(nodes_with_missing.len(), 3);
1562            assert!(nodes_with_missing[0].is_some());
1563            assert!(nodes_with_missing[1].is_none()); // Missing node
1564            assert!(nodes_with_missing[2].is_some());
1565        }
1566
1567        #[test]
1568        fn test_auto_commit_setting() {
1569            let db = GrafeoDB::new_in_memory();
1570            let mut session = db.session();
1571
1572            // Default is auto-commit enabled
1573            assert!(session.auto_commit());
1574
1575            session.set_auto_commit(false);
1576            assert!(!session.auto_commit());
1577
1578            session.set_auto_commit(true);
1579            assert!(session.auto_commit());
1580        }
1581
1582        #[test]
1583        fn test_transaction_double_begin_error() {
1584            let db = GrafeoDB::new_in_memory();
1585            let mut session = db.session();
1586
1587            session.begin_tx().unwrap();
1588            let result = session.begin_tx();
1589
1590            assert!(result.is_err());
1591            // Clean up
1592            session.rollback().unwrap();
1593        }
1594
1595        #[test]
1596        fn test_commit_without_transaction_error() {
1597            let db = GrafeoDB::new_in_memory();
1598            let mut session = db.session();
1599
1600            let result = session.commit();
1601            assert!(result.is_err());
1602        }
1603
1604        #[test]
1605        fn test_rollback_without_transaction_error() {
1606            let db = GrafeoDB::new_in_memory();
1607            let mut session = db.session();
1608
1609            let result = session.rollback();
1610            assert!(result.is_err());
1611        }
1612
1613        #[test]
1614        fn test_create_edge_in_transaction() {
1615            let db = GrafeoDB::new_in_memory();
1616            let mut session = db.session();
1617
1618            // Create nodes outside transaction
1619            let alice = session.create_node(&["Person"]);
1620            let bob = session.create_node(&["Person"]);
1621
1622            // Create edge in transaction
1623            session.begin_tx().unwrap();
1624            let edge_id = session.create_edge(alice, bob, "KNOWS");
1625
1626            // Edge should be visible in the transaction
1627            assert!(session.edge_exists(edge_id));
1628
1629            // Commit
1630            session.commit().unwrap();
1631
1632            // Edge should still be visible
1633            assert!(session.edge_exists(edge_id));
1634        }
1635
1636        #[test]
1637        fn test_neighbors_empty_node() {
1638            let db = GrafeoDB::new_in_memory();
1639            let session = db.session();
1640
1641            let lonely = session.create_node(&["Person"]);
1642
1643            assert!(session.get_neighbors_outgoing(lonely).is_empty());
1644            assert!(session.get_neighbors_incoming(lonely).is_empty());
1645            assert!(
1646                session
1647                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
1648                    .is_empty()
1649            );
1650        }
1651    }
1652}