Skip to main content

grafeo_engine/
session.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7use std::sync::Arc;
8
9use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
10use grafeo_common::utils::error::Result;
11use grafeo_core::graph::Direction;
12use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
13#[cfg(feature = "rdf")]
14use grafeo_core::graph::rdf::RdfStore;
15
16use crate::config::AdaptiveConfig;
17use crate::database::QueryResult;
18use crate::query::cache::QueryCache;
19use crate::transaction::TransactionManager;
20
/// Your handle to the database - execute queries and manage transactions.
///
/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
/// tracks its own transaction state (`current_tx`), so you can have multiple
/// concurrent sessions without them interfering.
///
/// The stores, the transaction manager, and the query cache are held behind
/// [`Arc`]s, so a session shares them with whoever handed them in rather than
/// owning private copies.
pub struct Session {
    /// The underlying labeled-property-graph store.
    store: Arc<LpgStore>,
    /// RDF triple store (only present when the `rdf` feature is enabled).
    #[cfg(feature = "rdf")]
    #[allow(dead_code)]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager: begins, commits, and aborts transactions and
    /// supplies the epochs used for MVCC visibility.
    tx_manager: Arc<TransactionManager>,
    /// Query cache shared across sessions; stores optimized plans keyed by
    /// query text and query language.
    query_cache: Arc<QueryCache>,
    /// Current transaction ID, or `None` when no explicit transaction is open.
    current_tx: Option<TxId>,
    /// Whether the session is in auto-commit mode.
    // NOTE(review): in the visible part of this file only the getter and the
    // setter touch this flag - confirm where it is actually consulted.
    auto_commit: bool,
    /// Adaptive execution configuration.
    // Stored by the constructors but not read by any method visible here,
    // hence the `dead_code` allowance.
    #[allow(dead_code)]
    adaptive_config: AdaptiveConfig,
    /// Whether to use factorized execution for multi-hop queries; forwarded
    /// to the physical planner on every planned query.
    factorized_execution: bool,
}
47
48impl Session {
49    /// Creates a new session.
50    #[allow(dead_code)]
51    pub(crate) fn new(
52        store: Arc<LpgStore>,
53        tx_manager: Arc<TransactionManager>,
54        query_cache: Arc<QueryCache>,
55    ) -> Self {
56        Self {
57            store,
58            #[cfg(feature = "rdf")]
59            rdf_store: Arc::new(RdfStore::new()),
60            tx_manager,
61            query_cache,
62            current_tx: None,
63            auto_commit: true,
64            adaptive_config: AdaptiveConfig::default(),
65            factorized_execution: true,
66        }
67    }
68
69    /// Creates a new session with adaptive execution configuration.
70    #[allow(dead_code)]
71    pub(crate) fn with_adaptive(
72        store: Arc<LpgStore>,
73        tx_manager: Arc<TransactionManager>,
74        query_cache: Arc<QueryCache>,
75        adaptive_config: AdaptiveConfig,
76        factorized_execution: bool,
77    ) -> Self {
78        Self {
79            store,
80            #[cfg(feature = "rdf")]
81            rdf_store: Arc::new(RdfStore::new()),
82            tx_manager,
83            query_cache,
84            current_tx: None,
85            auto_commit: true,
86            adaptive_config,
87            factorized_execution,
88        }
89    }
90
91    /// Creates a new session with RDF store and adaptive configuration.
92    #[cfg(feature = "rdf")]
93    pub(crate) fn with_rdf_store_and_adaptive(
94        store: Arc<LpgStore>,
95        rdf_store: Arc<RdfStore>,
96        tx_manager: Arc<TransactionManager>,
97        query_cache: Arc<QueryCache>,
98        adaptive_config: AdaptiveConfig,
99        factorized_execution: bool,
100    ) -> Self {
101        Self {
102            store,
103            rdf_store,
104            tx_manager,
105            query_cache,
106            current_tx: None,
107            auto_commit: true,
108            adaptive_config,
109            factorized_execution,
110        }
111    }
112
113    /// Executes a GQL query.
114    ///
115    /// # Errors
116    ///
117    /// Returns an error if the query fails to parse or execute.
118    ///
119    /// # Examples
120    ///
121    /// ```ignore
122    /// use grafeo_engine::GrafeoDB;
123    ///
124    /// let db = GrafeoDB::new_in_memory();
125    /// let session = db.session();
126    ///
127    /// // Create a node
128    /// session.execute("INSERT (:Person {name: 'Alice', age: 30})")?;
129    ///
130    /// // Query nodes
131    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
132    /// for row in result {
133    ///     println!("{:?}", row);
134    /// }
135    /// ```
136    #[cfg(feature = "gql")]
137    pub fn execute(&self, query: &str) -> Result<QueryResult> {
138        use crate::query::{
139            Executor, Planner, binder::Binder, cache::CacheKey, gql_translator,
140            optimizer::Optimizer, processor::QueryLanguage,
141        };
142
143        let start_time = std::time::Instant::now();
144
145        // Create cache key for this query
146        let cache_key = CacheKey::new(query, QueryLanguage::Gql);
147
148        // Try to get cached optimized plan
149        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
150            // Cache hit - skip parsing, translation, binding, and optimization
151            cached_plan
152        } else {
153            // Cache miss - run full pipeline
154
155            // Parse and translate the query to a logical plan
156            let logical_plan = gql_translator::translate(query)?;
157
158            // Semantic validation
159            let mut binder = Binder::new();
160            let _binding_context = binder.bind(&logical_plan)?;
161
162            // Optimize the plan
163            let optimizer = Optimizer::from_store(&self.store);
164            let plan = optimizer.optimize(logical_plan)?;
165
166            // Cache the optimized plan for future use
167            self.query_cache.put_optimized(cache_key, plan.clone());
168
169            plan
170        };
171
172        // Get transaction context for MVCC visibility
173        let (viewing_epoch, tx_id) = self.get_transaction_context();
174
175        // Convert to physical plan with transaction context
176        // (Physical planning cannot be cached as it depends on transaction state)
177        let planner = Planner::with_context(
178            Arc::clone(&self.store),
179            Arc::clone(&self.tx_manager),
180            tx_id,
181            viewing_epoch,
182        )
183        .with_factorized_execution(self.factorized_execution);
184        let mut physical_plan = planner.plan(&optimized_plan)?;
185
186        // Execute the plan
187        let executor = Executor::with_columns(physical_plan.columns.clone());
188        let mut result = executor.execute(physical_plan.operator.as_mut())?;
189
190        // Add execution metrics
191        let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
192        let rows_scanned = result.rows.len() as u64;
193        result.execution_time_ms = Some(elapsed_ms);
194        result.rows_scanned = Some(rows_scanned);
195
196        Ok(result)
197    }
198
199    /// Executes a GQL query with parameters.
200    ///
201    /// # Errors
202    ///
203    /// Returns an error if the query fails to parse or execute.
204    #[cfg(feature = "gql")]
205    pub fn execute_with_params(
206        &self,
207        query: &str,
208        params: std::collections::HashMap<String, Value>,
209    ) -> Result<QueryResult> {
210        use crate::query::processor::{QueryLanguage, QueryProcessor};
211
212        // Get transaction context for MVCC visibility
213        let (viewing_epoch, tx_id) = self.get_transaction_context();
214
215        // Create processor with transaction context
216        let processor =
217            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
218
219        // Apply transaction context if in a transaction
220        let processor = if let Some(tx_id) = tx_id {
221            processor.with_tx_context(viewing_epoch, tx_id)
222        } else {
223            processor
224        };
225
226        processor.process(query, QueryLanguage::Gql, Some(&params))
227    }
228
229    /// Executes a GQL query with parameters.
230    ///
231    /// # Errors
232    ///
233    /// Returns an error if no query language is enabled.
234    #[cfg(not(any(feature = "gql", feature = "cypher")))]
235    pub fn execute_with_params(
236        &self,
237        _query: &str,
238        _params: std::collections::HashMap<String, Value>,
239    ) -> Result<QueryResult> {
240        Err(grafeo_common::utils::error::Error::Internal(
241            "No query language enabled".to_string(),
242        ))
243    }
244
245    /// Executes a GQL query.
246    ///
247    /// # Errors
248    ///
249    /// Returns an error if no query language is enabled.
250    #[cfg(not(any(feature = "gql", feature = "cypher")))]
251    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
252        Err(grafeo_common::utils::error::Error::Internal(
253            "No query language enabled".to_string(),
254        ))
255    }
256
257    /// Executes a Cypher query.
258    ///
259    /// # Errors
260    ///
261    /// Returns an error if the query fails to parse or execute.
262    #[cfg(feature = "cypher")]
263    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
264        use crate::query::{
265            Executor, Planner, binder::Binder, cache::CacheKey, cypher_translator,
266            optimizer::Optimizer, processor::QueryLanguage,
267        };
268
269        // Create cache key for this query
270        let cache_key = CacheKey::new(query, QueryLanguage::Cypher);
271
272        // Try to get cached optimized plan
273        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
274            cached_plan
275        } else {
276            // Parse and translate the query to a logical plan
277            let logical_plan = cypher_translator::translate(query)?;
278
279            // Semantic validation
280            let mut binder = Binder::new();
281            let _binding_context = binder.bind(&logical_plan)?;
282
283            // Optimize the plan
284            let optimizer = Optimizer::from_store(&self.store);
285            let plan = optimizer.optimize(logical_plan)?;
286
287            // Cache the optimized plan
288            self.query_cache.put_optimized(cache_key, plan.clone());
289
290            plan
291        };
292
293        // Get transaction context for MVCC visibility
294        let (viewing_epoch, tx_id) = self.get_transaction_context();
295
296        // Convert to physical plan with transaction context
297        let planner = Planner::with_context(
298            Arc::clone(&self.store),
299            Arc::clone(&self.tx_manager),
300            tx_id,
301            viewing_epoch,
302        )
303        .with_factorized_execution(self.factorized_execution);
304        let mut physical_plan = planner.plan(&optimized_plan)?;
305
306        // Execute the plan
307        let executor = Executor::with_columns(physical_plan.columns.clone());
308        executor.execute(physical_plan.operator.as_mut())
309    }
310
311    /// Executes a Gremlin query.
312    ///
313    /// # Errors
314    ///
315    /// Returns an error if the query fails to parse or execute.
316    ///
317    /// # Examples
318    ///
319    /// ```ignore
320    /// use grafeo_engine::GrafeoDB;
321    ///
322    /// let db = GrafeoDB::new_in_memory();
323    /// let session = db.session();
324    ///
325    /// // Create some nodes first
326    /// session.create_node(&["Person"]);
327    ///
328    /// // Query using Gremlin
329    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
330    /// ```
331    #[cfg(feature = "gremlin")]
332    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
333        use crate::query::{
334            Executor, Planner, binder::Binder, gremlin_translator, optimizer::Optimizer,
335        };
336
337        // Parse and translate the query to a logical plan
338        let logical_plan = gremlin_translator::translate(query)?;
339
340        // Semantic validation
341        let mut binder = Binder::new();
342        let _binding_context = binder.bind(&logical_plan)?;
343
344        // Optimize the plan
345        let optimizer = Optimizer::from_store(&self.store);
346        let optimized_plan = optimizer.optimize(logical_plan)?;
347
348        // Get transaction context for MVCC visibility
349        let (viewing_epoch, tx_id) = self.get_transaction_context();
350
351        // Convert to physical plan with transaction context
352        let planner = Planner::with_context(
353            Arc::clone(&self.store),
354            Arc::clone(&self.tx_manager),
355            tx_id,
356            viewing_epoch,
357        )
358        .with_factorized_execution(self.factorized_execution);
359        let mut physical_plan = planner.plan(&optimized_plan)?;
360
361        // Execute the plan
362        let executor = Executor::with_columns(physical_plan.columns.clone());
363        executor.execute(physical_plan.operator.as_mut())
364    }
365
366    /// Executes a Gremlin query with parameters.
367    ///
368    /// # Errors
369    ///
370    /// Returns an error if the query fails to parse or execute.
371    #[cfg(feature = "gremlin")]
372    pub fn execute_gremlin_with_params(
373        &self,
374        query: &str,
375        params: std::collections::HashMap<String, Value>,
376    ) -> Result<QueryResult> {
377        use crate::query::processor::{QueryLanguage, QueryProcessor};
378
379        // Get transaction context for MVCC visibility
380        let (viewing_epoch, tx_id) = self.get_transaction_context();
381
382        // Create processor with transaction context
383        let processor =
384            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
385
386        // Apply transaction context if in a transaction
387        let processor = if let Some(tx_id) = tx_id {
388            processor.with_tx_context(viewing_epoch, tx_id)
389        } else {
390            processor
391        };
392
393        processor.process(query, QueryLanguage::Gremlin, Some(&params))
394    }
395
396    /// Executes a GraphQL query against the LPG store.
397    ///
398    /// # Errors
399    ///
400    /// Returns an error if the query fails to parse or execute.
401    ///
402    /// # Examples
403    ///
404    /// ```ignore
405    /// use grafeo_engine::GrafeoDB;
406    ///
407    /// let db = GrafeoDB::new_in_memory();
408    /// let session = db.session();
409    ///
410    /// // Create some nodes first
411    /// session.create_node(&["User"]);
412    ///
413    /// // Query using GraphQL
414    /// let result = session.execute_graphql("query { user { id name } }")?;
415    /// ```
416    #[cfg(feature = "graphql")]
417    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
418        use crate::query::{
419            Executor, Planner, binder::Binder, graphql_translator, optimizer::Optimizer,
420        };
421
422        // Parse and translate the query to a logical plan
423        let logical_plan = graphql_translator::translate(query)?;
424
425        // Semantic validation
426        let mut binder = Binder::new();
427        let _binding_context = binder.bind(&logical_plan)?;
428
429        // Optimize the plan
430        let optimizer = Optimizer::from_store(&self.store);
431        let optimized_plan = optimizer.optimize(logical_plan)?;
432
433        // Get transaction context for MVCC visibility
434        let (viewing_epoch, tx_id) = self.get_transaction_context();
435
436        // Convert to physical plan with transaction context
437        let planner = Planner::with_context(
438            Arc::clone(&self.store),
439            Arc::clone(&self.tx_manager),
440            tx_id,
441            viewing_epoch,
442        )
443        .with_factorized_execution(self.factorized_execution);
444        let mut physical_plan = planner.plan(&optimized_plan)?;
445
446        // Execute the plan
447        let executor = Executor::with_columns(physical_plan.columns.clone());
448        executor.execute(physical_plan.operator.as_mut())
449    }
450
451    /// Executes a GraphQL query with parameters.
452    ///
453    /// # Errors
454    ///
455    /// Returns an error if the query fails to parse or execute.
456    #[cfg(feature = "graphql")]
457    pub fn execute_graphql_with_params(
458        &self,
459        query: &str,
460        params: std::collections::HashMap<String, Value>,
461    ) -> Result<QueryResult> {
462        use crate::query::processor::{QueryLanguage, QueryProcessor};
463
464        // Get transaction context for MVCC visibility
465        let (viewing_epoch, tx_id) = self.get_transaction_context();
466
467        // Create processor with transaction context
468        let processor =
469            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
470
471        // Apply transaction context if in a transaction
472        let processor = if let Some(tx_id) = tx_id {
473            processor.with_tx_context(viewing_epoch, tx_id)
474        } else {
475            processor
476        };
477
478        processor.process(query, QueryLanguage::GraphQL, Some(&params))
479    }
480
481    /// Executes a SPARQL query.
482    ///
483    /// # Errors
484    ///
485    /// Returns an error if the query fails to parse or execute.
486    #[cfg(all(feature = "sparql", feature = "rdf"))]
487    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
488        use crate::query::{
489            Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
490        };
491
492        // Parse and translate the SPARQL query to a logical plan
493        let logical_plan = sparql_translator::translate(query)?;
494
495        // Optimize the plan
496        let optimizer = Optimizer::from_store(&self.store);
497        let optimized_plan = optimizer.optimize(logical_plan)?;
498
499        // Convert to physical plan using RDF planner
500        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store)).with_tx_id(self.current_tx);
501        let mut physical_plan = planner.plan(&optimized_plan)?;
502
503        // Execute the plan
504        let executor = Executor::with_columns(physical_plan.columns.clone());
505        executor.execute(physical_plan.operator.as_mut())
506    }
507
    /// Executes a SPARQL query with parameters.
    ///
    /// **The parameters are currently ignored**: the query text is passed to
    /// [`execute_sparql`](Self::execute_sparql) unchanged, so any parameter
    /// placeholders in `query` are not substituted.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails to parse or execute.
    #[cfg(all(feature = "sparql", feature = "rdf"))]
    pub fn execute_sparql_with_params(
        &self,
        query: &str,
        _params: std::collections::HashMap<String, Value>,
    ) -> Result<QueryResult> {
        // TODO: Implement parameter substitution for SPARQL
        // For now, just execute the query without parameters
        self.execute_sparql(query)
    }
523
524    /// Begins a new transaction.
525    ///
526    /// # Errors
527    ///
528    /// Returns an error if a transaction is already active.
529    ///
530    /// # Examples
531    ///
532    /// ```ignore
533    /// use grafeo_engine::GrafeoDB;
534    ///
535    /// let db = GrafeoDB::new_in_memory();
536    /// let mut session = db.session();
537    ///
538    /// session.begin_tx()?;
539    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
540    /// session.execute("INSERT (:Person {name: 'Bob'})")?;
541    /// session.commit()?; // Both inserts committed atomically
542    /// ```
543    pub fn begin_tx(&mut self) -> Result<()> {
544        if self.current_tx.is_some() {
545            return Err(grafeo_common::utils::error::Error::Transaction(
546                grafeo_common::utils::error::TransactionError::InvalidState(
547                    "Transaction already active".to_string(),
548                ),
549            ));
550        }
551
552        let tx_id = self.tx_manager.begin();
553        self.current_tx = Some(tx_id);
554        Ok(())
555    }
556
557    /// Begins a transaction with a specific isolation level.
558    ///
559    /// See [`begin_tx`](Self::begin_tx) for the default (`SnapshotIsolation`).
560    ///
561    /// # Errors
562    ///
563    /// Returns an error if a transaction is already active.
564    pub fn begin_tx_with_isolation(
565        &mut self,
566        isolation_level: crate::transaction::IsolationLevel,
567    ) -> Result<()> {
568        if self.current_tx.is_some() {
569            return Err(grafeo_common::utils::error::Error::Transaction(
570                grafeo_common::utils::error::TransactionError::InvalidState(
571                    "Transaction already active".to_string(),
572                ),
573            ));
574        }
575
576        let tx_id = self.tx_manager.begin_with_isolation(isolation_level);
577        self.current_tx = Some(tx_id);
578        Ok(())
579    }
580
581    /// Commits the current transaction.
582    ///
583    /// Makes all changes since [`begin_tx`](Self::begin_tx) permanent.
584    ///
585    /// # Errors
586    ///
587    /// Returns an error if no transaction is active.
588    pub fn commit(&mut self) -> Result<()> {
589        let tx_id = self.current_tx.take().ok_or_else(|| {
590            grafeo_common::utils::error::Error::Transaction(
591                grafeo_common::utils::error::TransactionError::InvalidState(
592                    "No active transaction".to_string(),
593                ),
594            )
595        })?;
596
597        // Commit RDF store pending operations
598        #[cfg(feature = "rdf")]
599        self.rdf_store.commit_tx(tx_id);
600
601        self.tx_manager.commit(tx_id).map(|_| ())
602    }
603
604    /// Aborts the current transaction.
605    ///
606    /// Discards all changes since [`begin_tx`](Self::begin_tx).
607    ///
608    /// # Errors
609    ///
610    /// Returns an error if no transaction is active.
611    ///
612    /// # Examples
613    ///
614    /// ```ignore
615    /// use grafeo_engine::GrafeoDB;
616    ///
617    /// let db = GrafeoDB::new_in_memory();
618    /// let mut session = db.session();
619    ///
620    /// session.begin_tx()?;
621    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
622    /// session.rollback()?; // Insert is discarded
623    /// ```
624    pub fn rollback(&mut self) -> Result<()> {
625        let tx_id = self.current_tx.take().ok_or_else(|| {
626            grafeo_common::utils::error::Error::Transaction(
627                grafeo_common::utils::error::TransactionError::InvalidState(
628                    "No active transaction".to_string(),
629                ),
630            )
631        })?;
632
633        // Discard uncommitted versions in the LPG store
634        self.store.discard_uncommitted_versions(tx_id);
635
636        // Discard pending operations in the RDF store
637        #[cfg(feature = "rdf")]
638        self.rdf_store.rollback_tx(tx_id);
639
640        // Mark transaction as aborted in the manager
641        self.tx_manager.abort(tx_id)
642    }
643
    /// Returns whether a transaction is active.
    ///
    /// `true` between a successful [`begin_tx`](Self::begin_tx) (or
    /// [`begin_tx_with_isolation`](Self::begin_tx_with_isolation)) and the next
    /// [`commit`](Self::commit) or [`rollback`](Self::rollback).
    #[must_use]
    pub fn in_transaction(&self) -> bool {
        self.current_tx.is_some()
    }
649
    /// Sets auto-commit mode.
    ///
    /// Only stores the flag; it does not begin, commit, or roll back anything.
    pub fn set_auto_commit(&mut self, auto_commit: bool) {
        self.auto_commit = auto_commit;
    }
654
    /// Returns whether auto-commit is enabled.
    ///
    /// Sessions are created with auto-commit enabled; change it with
    /// [`set_auto_commit`](Self::set_auto_commit).
    #[must_use]
    pub fn auto_commit(&self) -> bool {
        self.auto_commit
    }
660
661    /// Returns the current transaction context for MVCC visibility.
662    ///
663    /// Returns `(viewing_epoch, tx_id)` where:
664    /// - `viewing_epoch` is the epoch at which to check version visibility
665    /// - `tx_id` is the current transaction ID (if in a transaction)
666    #[must_use]
667    fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
668        if let Some(tx_id) = self.current_tx {
669            // In a transaction - use the transaction's start epoch
670            let epoch = self
671                .tx_manager
672                .start_epoch(tx_id)
673                .unwrap_or_else(|| self.tx_manager.current_epoch());
674            (epoch, Some(tx_id))
675        } else {
676            // No transaction - use current epoch
677            (self.tx_manager.current_epoch(), None)
678        }
679    }
680
681    /// Creates a node directly (bypassing query execution).
682    ///
683    /// This is a low-level API for testing and direct manipulation.
684    /// If a transaction is active, the node will be versioned with the transaction ID.
685    pub fn create_node(&self, labels: &[&str]) -> NodeId {
686        let (epoch, tx_id) = self.get_transaction_context();
687        self.store
688            .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
689    }
690
691    /// Creates a node with properties.
692    ///
693    /// If a transaction is active, the node will be versioned with the transaction ID.
694    pub fn create_node_with_props<'a>(
695        &self,
696        labels: &[&str],
697        properties: impl IntoIterator<Item = (&'a str, Value)>,
698    ) -> NodeId {
699        let (epoch, tx_id) = self.get_transaction_context();
700        self.store.create_node_with_props_versioned(
701            labels,
702            properties.into_iter().map(|(k, v)| (k, v)),
703            epoch,
704            tx_id.unwrap_or(TxId::SYSTEM),
705        )
706    }
707
708    /// Creates an edge between two nodes.
709    ///
710    /// This is a low-level API for testing and direct manipulation.
711    /// If a transaction is active, the edge will be versioned with the transaction ID.
712    pub fn create_edge(
713        &self,
714        src: NodeId,
715        dst: NodeId,
716        edge_type: &str,
717    ) -> grafeo_common::types::EdgeId {
718        let (epoch, tx_id) = self.get_transaction_context();
719        self.store
720            .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
721    }
722
723    // =========================================================================
724    // Direct Lookup APIs (bypass query planning for O(1) point reads)
725    // =========================================================================
726
727    /// Gets a node by ID directly, bypassing query planning.
728    ///
729    /// This is the fastest way to retrieve a single node when you know its ID.
730    /// Skips parsing, binding, optimization, and physical planning entirely.
731    ///
732    /// # Performance
733    ///
734    /// - Time complexity: O(1) average case
735    /// - No lock contention (uses DashMap internally)
736    /// - ~20-30x faster than equivalent MATCH query
737    ///
738    /// # Example
739    ///
740    /// ```ignore
741    /// let session = db.session();
742    /// let node_id = session.create_node(&["Person"]);
743    ///
744    /// // Direct lookup - O(1), no query planning
745    /// let node = session.get_node(node_id);
746    /// assert!(node.is_some());
747    /// ```
748    #[must_use]
749    pub fn get_node(&self, id: NodeId) -> Option<Node> {
750        let (epoch, tx_id) = self.get_transaction_context();
751        self.store
752            .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
753    }
754
755    /// Gets a single property from a node by ID, bypassing query planning.
756    ///
757    /// More efficient than `get_node()` when you only need one property,
758    /// as it avoids loading the full node with all properties.
759    ///
760    /// # Performance
761    ///
762    /// - Time complexity: O(1) average case
763    /// - No query planning overhead
764    ///
765    /// # Example
766    ///
767    /// ```ignore
768    /// let session = db.session();
769    /// let id = session.create_node_with_props(&["Person"], [("name", "Alice".into())]);
770    ///
771    /// // Direct property access - O(1)
772    /// let name = session.get_node_property(id, "name");
773    /// assert_eq!(name, Some(Value::String("Alice".into())));
774    /// ```
775    #[must_use]
776    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
777        self.get_node(id)
778            .and_then(|node| node.get_property(key).cloned())
779    }
780
781    /// Gets an edge by ID directly, bypassing query planning.
782    ///
783    /// # Performance
784    ///
785    /// - Time complexity: O(1) average case
786    /// - No lock contention
787    #[must_use]
788    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
789        let (epoch, tx_id) = self.get_transaction_context();
790        self.store
791            .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
792    }
793
794    /// Gets outgoing neighbors of a node directly, bypassing query planning.
795    ///
796    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
797    ///
798    /// # Performance
799    ///
800    /// - Time complexity: O(degree) where degree is the number of outgoing edges
801    /// - Uses adjacency index for direct access
802    /// - ~10-20x faster than equivalent MATCH query
803    ///
804    /// # Example
805    ///
806    /// ```ignore
807    /// let session = db.session();
808    /// let alice = session.create_node(&["Person"]);
809    /// let bob = session.create_node(&["Person"]);
810    /// session.create_edge(alice, bob, "KNOWS");
811    ///
812    /// // Direct neighbor lookup - O(degree)
813    /// let neighbors = session.get_neighbors_outgoing(alice);
814    /// assert_eq!(neighbors.len(), 1);
815    /// assert_eq!(neighbors[0].0, bob);
816    /// ```
817    #[must_use]
818    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
819        self.store.edges_from(node, Direction::Outgoing).collect()
820    }
821
822    /// Gets incoming neighbors of a node directly, bypassing query planning.
823    ///
824    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
825    ///
826    /// # Performance
827    ///
828    /// - Time complexity: O(degree) where degree is the number of incoming edges
829    /// - Uses backward adjacency index for direct access
830    #[must_use]
831    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
832        self.store.edges_from(node, Direction::Incoming).collect()
833    }
834
835    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
836    ///
837    /// # Example
838    ///
839    /// ```ignore
840    /// let neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
841    /// ```
842    #[must_use]
843    pub fn get_neighbors_outgoing_by_type(
844        &self,
845        node: NodeId,
846        edge_type: &str,
847    ) -> Vec<(NodeId, EdgeId)> {
848        self.store
849            .edges_from(node, Direction::Outgoing)
850            .filter(|(_, edge_id)| {
851                self.get_edge(*edge_id)
852                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
853            })
854            .collect()
855    }
856
    /// Checks if a node exists, bypassing query planning.
    ///
    /// "Exists" means [`get_node`](Self::get_node) returns a node for `id`
    /// under the session's current transaction context.
    ///
    /// # Performance
    ///
    /// - Same cost as `get_node()`: the node is fetched and then discarded
    #[must_use]
    pub fn node_exists(&self, id: NodeId) -> bool {
        self.get_node(id).is_some()
    }
867
    /// Checks if an edge exists, bypassing query planning.
    ///
    /// "Exists" means [`get_edge`](Self::get_edge) returns an edge for `id`
    /// under the session's current transaction context; the edge is fetched
    /// and then discarded.
    #[must_use]
    pub fn edge_exists(&self, id: EdgeId) -> bool {
        self.get_edge(id).is_some()
    }
873
874    /// Gets the degree (number of edges) of a node.
875    ///
876    /// Returns (outgoing_degree, incoming_degree).
877    #[must_use]
878    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
879        let out = self.store.out_degree(node);
880        let in_degree = self.store.in_degree(node);
881        (out, in_degree)
882    }
883
884    /// Batch lookup of multiple nodes by ID.
885    ///
886    /// More efficient than calling `get_node()` in a loop because it
887    /// amortizes overhead.
888    ///
889    /// # Performance
890    ///
891    /// - Time complexity: O(n) where n is the number of IDs
892    /// - Better cache utilization than individual lookups
893    #[must_use]
894    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
895        let (epoch, tx_id) = self.get_transaction_context();
896        let tx = tx_id.unwrap_or(TxId::SYSTEM);
897        ids.iter()
898            .map(|&id| self.store.get_node_versioned(id, epoch, tx))
899            .collect()
900    }
901}
902
903#[cfg(test)]
904mod tests {
905    use crate::database::GrafeoDB;
906
907    #[test]
908    fn test_session_create_node() {
909        let db = GrafeoDB::new_in_memory();
910        let session = db.session();
911
912        let id = session.create_node(&["Person"]);
913        assert!(id.is_valid());
914        assert_eq!(db.node_count(), 1);
915    }
916
917    #[test]
918    fn test_session_transaction() {
919        let db = GrafeoDB::new_in_memory();
920        let mut session = db.session();
921
922        assert!(!session.in_transaction());
923
924        session.begin_tx().unwrap();
925        assert!(session.in_transaction());
926
927        session.commit().unwrap();
928        assert!(!session.in_transaction());
929    }
930
931    #[test]
932    fn test_session_transaction_context() {
933        let db = GrafeoDB::new_in_memory();
934        let mut session = db.session();
935
936        // Without transaction - context should have current epoch and no tx_id
937        let (_epoch1, tx_id1) = session.get_transaction_context();
938        assert!(tx_id1.is_none());
939
940        // Start a transaction
941        session.begin_tx().unwrap();
942        let (epoch2, tx_id2) = session.get_transaction_context();
943        assert!(tx_id2.is_some());
944        // Transaction should have a valid epoch
945        let _ = epoch2; // Use the variable
946
947        // Commit and verify
948        session.commit().unwrap();
949        let (epoch3, tx_id3) = session.get_transaction_context();
950        assert!(tx_id3.is_none());
951        // Epoch should have advanced after commit
952        assert!(epoch3.as_u64() >= epoch2.as_u64());
953    }
954
955    #[test]
956    fn test_session_rollback() {
957        let db = GrafeoDB::new_in_memory();
958        let mut session = db.session();
959
960        session.begin_tx().unwrap();
961        session.rollback().unwrap();
962        assert!(!session.in_transaction());
963    }
964
965    #[test]
966    fn test_session_rollback_discards_versions() {
967        use grafeo_common::types::TxId;
968
969        let db = GrafeoDB::new_in_memory();
970
971        // Create a node outside of any transaction (at system level)
972        let node_before = db.store().create_node(&["Person"]);
973        assert!(node_before.is_valid());
974        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
975
976        // Start a transaction
977        let mut session = db.session();
978        session.begin_tx().unwrap();
979        let tx_id = session.current_tx.unwrap();
980
981        // Create a node versioned with the transaction's ID
982        let epoch = db.store().current_epoch();
983        let node_in_tx = db.store().create_node_versioned(&["Person"], epoch, tx_id);
984        assert!(node_in_tx.is_valid());
985
986        // Should see 2 nodes at this point
987        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
988
989        // Rollback the transaction
990        session.rollback().unwrap();
991        assert!(!session.in_transaction());
992
993        // The node created in the transaction should be discarded
994        // Only the first node should remain visible
995        let count_after = db.node_count();
996        assert_eq!(
997            count_after, 1,
998            "Rollback should discard uncommitted node, but got {count_after}"
999        );
1000
1001        // The original node should still be accessible
1002        let current_epoch = db.store().current_epoch();
1003        assert!(
1004            db.store()
1005                .get_node_versioned(node_before, current_epoch, TxId::SYSTEM)
1006                .is_some(),
1007            "Original node should still exist"
1008        );
1009
1010        // The node created in the transaction should not be accessible
1011        assert!(
1012            db.store()
1013                .get_node_versioned(node_in_tx, current_epoch, TxId::SYSTEM)
1014                .is_none(),
1015            "Transaction node should be gone"
1016        );
1017    }
1018
1019    #[test]
1020    fn test_session_create_node_in_transaction() {
1021        // Test that session.create_node() is transaction-aware
1022        let db = GrafeoDB::new_in_memory();
1023
1024        // Create a node outside of any transaction
1025        let node_before = db.create_node(&["Person"]);
1026        assert!(node_before.is_valid());
1027        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1028
1029        // Start a transaction and create a node through the session
1030        let mut session = db.session();
1031        session.begin_tx().unwrap();
1032
1033        // Create a node through session.create_node() - should be versioned with tx
1034        let node_in_tx = session.create_node(&["Person"]);
1035        assert!(node_in_tx.is_valid());
1036
1037        // Should see 2 nodes at this point
1038        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1039
1040        // Rollback the transaction
1041        session.rollback().unwrap();
1042
1043        // The node created via session.create_node() should be discarded
1044        let count_after = db.node_count();
1045        assert_eq!(
1046            count_after, 1,
1047            "Rollback should discard node created via session.create_node(), but got {count_after}"
1048        );
1049    }
1050
1051    #[test]
1052    fn test_session_create_node_with_props_in_transaction() {
1053        use grafeo_common::types::Value;
1054
1055        // Test that session.create_node_with_props() is transaction-aware
1056        let db = GrafeoDB::new_in_memory();
1057
1058        // Create a node outside of any transaction
1059        db.create_node(&["Person"]);
1060        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1061
1062        // Start a transaction and create a node with properties
1063        let mut session = db.session();
1064        session.begin_tx().unwrap();
1065
1066        let node_in_tx =
1067            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1068        assert!(node_in_tx.is_valid());
1069
1070        // Should see 2 nodes
1071        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1072
1073        // Rollback the transaction
1074        session.rollback().unwrap();
1075
1076        // The node should be discarded
1077        let count_after = db.node_count();
1078        assert_eq!(
1079            count_after, 1,
1080            "Rollback should discard node created via session.create_node_with_props()"
1081        );
1082    }
1083
1084    #[cfg(feature = "gql")]
1085    mod gql_tests {
1086        use super::*;
1087
1088        #[test]
1089        fn test_gql_query_execution() {
1090            let db = GrafeoDB::new_in_memory();
1091            let session = db.session();
1092
1093            // Create some test data
1094            session.create_node(&["Person"]);
1095            session.create_node(&["Person"]);
1096            session.create_node(&["Animal"]);
1097
1098            // Execute a GQL query
1099            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1100
1101            // Should return 2 Person nodes
1102            assert_eq!(result.row_count(), 2);
1103            assert_eq!(result.column_count(), 1);
1104            assert_eq!(result.columns[0], "n");
1105        }
1106
1107        #[test]
1108        fn test_gql_empty_result() {
1109            let db = GrafeoDB::new_in_memory();
1110            let session = db.session();
1111
1112            // No data in database
1113            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1114
1115            assert_eq!(result.row_count(), 0);
1116        }
1117
1118        #[test]
1119        fn test_gql_parse_error() {
1120            let db = GrafeoDB::new_in_memory();
1121            let session = db.session();
1122
1123            // Invalid GQL syntax
1124            let result = session.execute("MATCH (n RETURN n");
1125
1126            assert!(result.is_err());
1127        }
1128
1129        #[test]
1130        fn test_gql_relationship_traversal() {
1131            let db = GrafeoDB::new_in_memory();
1132            let session = db.session();
1133
1134            // Create a graph: Alice -> Bob, Alice -> Charlie
1135            let alice = session.create_node(&["Person"]);
1136            let bob = session.create_node(&["Person"]);
1137            let charlie = session.create_node(&["Person"]);
1138
1139            session.create_edge(alice, bob, "KNOWS");
1140            session.create_edge(alice, charlie, "KNOWS");
1141
1142            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
1143            let result = session
1144                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1145                .unwrap();
1146
1147            // Should return 2 rows (Alice->Bob, Alice->Charlie)
1148            assert_eq!(result.row_count(), 2);
1149            assert_eq!(result.column_count(), 2);
1150            assert_eq!(result.columns[0], "a");
1151            assert_eq!(result.columns[1], "b");
1152        }
1153
1154        #[test]
1155        fn test_gql_relationship_with_type_filter() {
1156            let db = GrafeoDB::new_in_memory();
1157            let session = db.session();
1158
1159            // Create a graph: Alice -KNOWS-> Bob, Alice -WORKS_WITH-> Charlie
1160            let alice = session.create_node(&["Person"]);
1161            let bob = session.create_node(&["Person"]);
1162            let charlie = session.create_node(&["Person"]);
1163
1164            session.create_edge(alice, bob, "KNOWS");
1165            session.create_edge(alice, charlie, "WORKS_WITH");
1166
1167            // Query only KNOWS relationships
1168            let result = session
1169                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1170                .unwrap();
1171
1172            // Should return only 1 row (Alice->Bob)
1173            assert_eq!(result.row_count(), 1);
1174        }
1175
1176        #[test]
1177        fn test_gql_semantic_error_undefined_variable() {
1178            let db = GrafeoDB::new_in_memory();
1179            let session = db.session();
1180
1181            // Reference undefined variable 'x' in RETURN
1182            let result = session.execute("MATCH (n:Person) RETURN x");
1183
1184            // Should fail with semantic error
1185            assert!(result.is_err());
1186            let err = match result {
1187                Err(e) => e,
1188                Ok(_) => panic!("Expected error"),
1189            };
1190            assert!(
1191                err.to_string().contains("Undefined variable"),
1192                "Expected undefined variable error, got: {}",
1193                err
1194            );
1195        }
1196
1197        #[test]
1198        fn test_gql_where_clause_property_filter() {
1199            use grafeo_common::types::Value;
1200
1201            let db = GrafeoDB::new_in_memory();
1202            let session = db.session();
1203
1204            // Create people with ages
1205            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
1206            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
1207            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
1208
1209            // Query with WHERE clause: age > 30
1210            let result = session
1211                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
1212                .unwrap();
1213
1214            // Should return 2 people (ages 35 and 45)
1215            assert_eq!(result.row_count(), 2);
1216        }
1217
1218        #[test]
1219        fn test_gql_where_clause_equality() {
1220            use grafeo_common::types::Value;
1221
1222            let db = GrafeoDB::new_in_memory();
1223            let session = db.session();
1224
1225            // Create people with names
1226            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1227            session.create_node_with_props(&["Person"], [("name", Value::String("Bob".into()))]);
1228            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1229
1230            // Query with WHERE clause: name = "Alice"
1231            let result = session
1232                .execute("MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n")
1233                .unwrap();
1234
1235            // Should return 2 people named Alice
1236            assert_eq!(result.row_count(), 2);
1237        }
1238
1239        #[test]
1240        fn test_gql_return_property_access() {
1241            use grafeo_common::types::Value;
1242
1243            let db = GrafeoDB::new_in_memory();
1244            let session = db.session();
1245
1246            // Create people with names and ages
1247            session.create_node_with_props(
1248                &["Person"],
1249                [
1250                    ("name", Value::String("Alice".into())),
1251                    ("age", Value::Int64(30)),
1252                ],
1253            );
1254            session.create_node_with_props(
1255                &["Person"],
1256                [
1257                    ("name", Value::String("Bob".into())),
1258                    ("age", Value::Int64(25)),
1259                ],
1260            );
1261
1262            // Query returning properties
1263            let result = session
1264                .execute("MATCH (n:Person) RETURN n.name, n.age")
1265                .unwrap();
1266
1267            // Should return 2 rows with name and age columns
1268            assert_eq!(result.row_count(), 2);
1269            assert_eq!(result.column_count(), 2);
1270            assert_eq!(result.columns[0], "n.name");
1271            assert_eq!(result.columns[1], "n.age");
1272
1273            // Check that we get actual values
1274            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
1275            assert!(names.contains(&&Value::String("Alice".into())));
1276            assert!(names.contains(&&Value::String("Bob".into())));
1277        }
1278
1279        #[test]
1280        fn test_gql_return_mixed_expressions() {
1281            use grafeo_common::types::Value;
1282
1283            let db = GrafeoDB::new_in_memory();
1284            let session = db.session();
1285
1286            // Create a person
1287            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1288
1289            // Query returning both node and property
1290            let result = session
1291                .execute("MATCH (n:Person) RETURN n, n.name")
1292                .unwrap();
1293
1294            assert_eq!(result.row_count(), 1);
1295            assert_eq!(result.column_count(), 2);
1296            assert_eq!(result.columns[0], "n");
1297            assert_eq!(result.columns[1], "n.name");
1298
1299            // Second column should be the name
1300            assert_eq!(result.rows[0][1], Value::String("Alice".into()));
1301        }
1302    }
1303
1304    #[cfg(feature = "cypher")]
1305    mod cypher_tests {
1306        use super::*;
1307
1308        #[test]
1309        fn test_cypher_query_execution() {
1310            let db = GrafeoDB::new_in_memory();
1311            let session = db.session();
1312
1313            // Create some test data
1314            session.create_node(&["Person"]);
1315            session.create_node(&["Person"]);
1316            session.create_node(&["Animal"]);
1317
1318            // Execute a Cypher query
1319            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1320
1321            // Should return 2 Person nodes
1322            assert_eq!(result.row_count(), 2);
1323            assert_eq!(result.column_count(), 1);
1324            assert_eq!(result.columns[0], "n");
1325        }
1326
1327        #[test]
1328        fn test_cypher_empty_result() {
1329            let db = GrafeoDB::new_in_memory();
1330            let session = db.session();
1331
1332            // No data in database
1333            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1334
1335            assert_eq!(result.row_count(), 0);
1336        }
1337
1338        #[test]
1339        fn test_cypher_parse_error() {
1340            let db = GrafeoDB::new_in_memory();
1341            let session = db.session();
1342
1343            // Invalid Cypher syntax
1344            let result = session.execute_cypher("MATCH (n RETURN n");
1345
1346            assert!(result.is_err());
1347        }
1348    }
1349
1350    // ==================== Direct Lookup API Tests ====================
1351
1352    mod direct_lookup_tests {
1353        use super::*;
1354        use grafeo_common::types::Value;
1355
1356        #[test]
1357        fn test_get_node() {
1358            let db = GrafeoDB::new_in_memory();
1359            let session = db.session();
1360
1361            let id = session.create_node(&["Person"]);
1362            let node = session.get_node(id);
1363
1364            assert!(node.is_some());
1365            let node = node.unwrap();
1366            assert_eq!(node.id, id);
1367        }
1368
1369        #[test]
1370        fn test_get_node_not_found() {
1371            use grafeo_common::types::NodeId;
1372
1373            let db = GrafeoDB::new_in_memory();
1374            let session = db.session();
1375
1376            // Try to get a non-existent node
1377            let node = session.get_node(NodeId::new(9999));
1378            assert!(node.is_none());
1379        }
1380
1381        #[test]
1382        fn test_get_node_property() {
1383            let db = GrafeoDB::new_in_memory();
1384            let session = db.session();
1385
1386            let id = session
1387                .create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1388
1389            let name = session.get_node_property(id, "name");
1390            assert_eq!(name, Some(Value::String("Alice".into())));
1391
1392            // Non-existent property
1393            let missing = session.get_node_property(id, "missing");
1394            assert!(missing.is_none());
1395        }
1396
1397        #[test]
1398        fn test_get_edge() {
1399            let db = GrafeoDB::new_in_memory();
1400            let session = db.session();
1401
1402            let alice = session.create_node(&["Person"]);
1403            let bob = session.create_node(&["Person"]);
1404            let edge_id = session.create_edge(alice, bob, "KNOWS");
1405
1406            let edge = session.get_edge(edge_id);
1407            assert!(edge.is_some());
1408            let edge = edge.unwrap();
1409            assert_eq!(edge.id, edge_id);
1410            assert_eq!(edge.src, alice);
1411            assert_eq!(edge.dst, bob);
1412        }
1413
1414        #[test]
1415        fn test_get_edge_not_found() {
1416            use grafeo_common::types::EdgeId;
1417
1418            let db = GrafeoDB::new_in_memory();
1419            let session = db.session();
1420
1421            let edge = session.get_edge(EdgeId::new(9999));
1422            assert!(edge.is_none());
1423        }
1424
1425        #[test]
1426        fn test_get_neighbors_outgoing() {
1427            let db = GrafeoDB::new_in_memory();
1428            let session = db.session();
1429
1430            let alice = session.create_node(&["Person"]);
1431            let bob = session.create_node(&["Person"]);
1432            let carol = session.create_node(&["Person"]);
1433
1434            session.create_edge(alice, bob, "KNOWS");
1435            session.create_edge(alice, carol, "KNOWS");
1436
1437            let neighbors = session.get_neighbors_outgoing(alice);
1438            assert_eq!(neighbors.len(), 2);
1439
1440            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1441            assert!(neighbor_ids.contains(&bob));
1442            assert!(neighbor_ids.contains(&carol));
1443        }
1444
1445        #[test]
1446        fn test_get_neighbors_incoming() {
1447            let db = GrafeoDB::new_in_memory();
1448            let session = db.session();
1449
1450            let alice = session.create_node(&["Person"]);
1451            let bob = session.create_node(&["Person"]);
1452            let carol = session.create_node(&["Person"]);
1453
1454            session.create_edge(bob, alice, "KNOWS");
1455            session.create_edge(carol, alice, "KNOWS");
1456
1457            let neighbors = session.get_neighbors_incoming(alice);
1458            assert_eq!(neighbors.len(), 2);
1459
1460            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1461            assert!(neighbor_ids.contains(&bob));
1462            assert!(neighbor_ids.contains(&carol));
1463        }
1464
1465        #[test]
1466        fn test_get_neighbors_outgoing_by_type() {
1467            let db = GrafeoDB::new_in_memory();
1468            let session = db.session();
1469
1470            let alice = session.create_node(&["Person"]);
1471            let bob = session.create_node(&["Person"]);
1472            let company = session.create_node(&["Company"]);
1473
1474            session.create_edge(alice, bob, "KNOWS");
1475            session.create_edge(alice, company, "WORKS_AT");
1476
1477            let knows_neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1478            assert_eq!(knows_neighbors.len(), 1);
1479            assert_eq!(knows_neighbors[0].0, bob);
1480
1481            let works_neighbors = session.get_neighbors_outgoing_by_type(alice, "WORKS_AT");
1482            assert_eq!(works_neighbors.len(), 1);
1483            assert_eq!(works_neighbors[0].0, company);
1484
1485            // No edges of this type
1486            let no_neighbors = session.get_neighbors_outgoing_by_type(alice, "LIKES");
1487            assert!(no_neighbors.is_empty());
1488        }
1489
1490        #[test]
1491        fn test_node_exists() {
1492            use grafeo_common::types::NodeId;
1493
1494            let db = GrafeoDB::new_in_memory();
1495            let session = db.session();
1496
1497            let id = session.create_node(&["Person"]);
1498
1499            assert!(session.node_exists(id));
1500            assert!(!session.node_exists(NodeId::new(9999)));
1501        }
1502
1503        #[test]
1504        fn test_edge_exists() {
1505            use grafeo_common::types::EdgeId;
1506
1507            let db = GrafeoDB::new_in_memory();
1508            let session = db.session();
1509
1510            let alice = session.create_node(&["Person"]);
1511            let bob = session.create_node(&["Person"]);
1512            let edge_id = session.create_edge(alice, bob, "KNOWS");
1513
1514            assert!(session.edge_exists(edge_id));
1515            assert!(!session.edge_exists(EdgeId::new(9999)));
1516        }
1517
1518        #[test]
1519        fn test_get_degree() {
1520            let db = GrafeoDB::new_in_memory();
1521            let session = db.session();
1522
1523            let alice = session.create_node(&["Person"]);
1524            let bob = session.create_node(&["Person"]);
1525            let carol = session.create_node(&["Person"]);
1526
1527            // Alice knows Bob and Carol (2 outgoing)
1528            session.create_edge(alice, bob, "KNOWS");
1529            session.create_edge(alice, carol, "KNOWS");
1530            // Bob knows Alice (1 incoming for Alice)
1531            session.create_edge(bob, alice, "KNOWS");
1532
1533            let (out_degree, in_degree) = session.get_degree(alice);
1534            assert_eq!(out_degree, 2);
1535            assert_eq!(in_degree, 1);
1536
1537            // Node with no edges
1538            let lonely = session.create_node(&["Person"]);
1539            let (out, in_deg) = session.get_degree(lonely);
1540            assert_eq!(out, 0);
1541            assert_eq!(in_deg, 0);
1542        }
1543
1544        #[test]
1545        fn test_get_nodes_batch() {
1546            let db = GrafeoDB::new_in_memory();
1547            let session = db.session();
1548
1549            let alice = session.create_node(&["Person"]);
1550            let bob = session.create_node(&["Person"]);
1551            let carol = session.create_node(&["Person"]);
1552
1553            let nodes = session.get_nodes_batch(&[alice, bob, carol]);
1554            assert_eq!(nodes.len(), 3);
1555            assert!(nodes[0].is_some());
1556            assert!(nodes[1].is_some());
1557            assert!(nodes[2].is_some());
1558
1559            // With non-existent node
1560            use grafeo_common::types::NodeId;
1561            let nodes_with_missing = session.get_nodes_batch(&[alice, NodeId::new(9999), carol]);
1562            assert_eq!(nodes_with_missing.len(), 3);
1563            assert!(nodes_with_missing[0].is_some());
1564            assert!(nodes_with_missing[1].is_none()); // Missing node
1565            assert!(nodes_with_missing[2].is_some());
1566        }
1567
1568        #[test]
1569        fn test_auto_commit_setting() {
1570            let db = GrafeoDB::new_in_memory();
1571            let mut session = db.session();
1572
1573            // Default is auto-commit enabled
1574            assert!(session.auto_commit());
1575
1576            session.set_auto_commit(false);
1577            assert!(!session.auto_commit());
1578
1579            session.set_auto_commit(true);
1580            assert!(session.auto_commit());
1581        }
1582
1583        #[test]
1584        fn test_transaction_double_begin_error() {
1585            let db = GrafeoDB::new_in_memory();
1586            let mut session = db.session();
1587
1588            session.begin_tx().unwrap();
1589            let result = session.begin_tx();
1590
1591            assert!(result.is_err());
1592            // Clean up
1593            session.rollback().unwrap();
1594        }
1595
1596        #[test]
1597        fn test_commit_without_transaction_error() {
1598            let db = GrafeoDB::new_in_memory();
1599            let mut session = db.session();
1600
1601            let result = session.commit();
1602            assert!(result.is_err());
1603        }
1604
1605        #[test]
1606        fn test_rollback_without_transaction_error() {
1607            let db = GrafeoDB::new_in_memory();
1608            let mut session = db.session();
1609
1610            let result = session.rollback();
1611            assert!(result.is_err());
1612        }
1613
1614        #[test]
1615        fn test_create_edge_in_transaction() {
1616            let db = GrafeoDB::new_in_memory();
1617            let mut session = db.session();
1618
1619            // Create nodes outside transaction
1620            let alice = session.create_node(&["Person"]);
1621            let bob = session.create_node(&["Person"]);
1622
1623            // Create edge in transaction
1624            session.begin_tx().unwrap();
1625            let edge_id = session.create_edge(alice, bob, "KNOWS");
1626
1627            // Edge should be visible in the transaction
1628            assert!(session.edge_exists(edge_id));
1629
1630            // Commit
1631            session.commit().unwrap();
1632
1633            // Edge should still be visible
1634            assert!(session.edge_exists(edge_id));
1635        }
1636
1637        #[test]
1638        fn test_neighbors_empty_node() {
1639            let db = GrafeoDB::new_in_memory();
1640            let session = db.session();
1641
1642            let lonely = session.create_node(&["Person"]);
1643
1644            assert!(session.get_neighbors_outgoing(lonely).is_empty());
1645            assert!(session.get_neighbors_incoming(lonely).is_empty());
1646            assert!(
1647                session
1648                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
1649                    .is_empty()
1650            );
1651        }
1652    }
1653}