Skip to main content

grafeo_engine/
session.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7use std::sync::Arc;
8
9use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
10use grafeo_common::utils::error::Result;
11use grafeo_core::graph::Direction;
12use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
13#[cfg(feature = "rdf")]
14use grafeo_core::graph::rdf::RdfStore;
15
16use crate::config::{AdaptiveConfig, GraphModel};
17use crate::database::QueryResult;
18use crate::query::cache::QueryCache;
19use crate::transaction::TransactionManager;
20
21/// Your handle to the database - execute queries and manage transactions.
22///
23/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
24/// tracks its own transaction state, so you can have multiple concurrent
25/// sessions without them interfering.
26pub struct Session {
27    /// The underlying store.
28    store: Arc<LpgStore>,
29    /// RDF triple store (if RDF feature is enabled).
30    #[cfg(feature = "rdf")]
31    #[allow(dead_code)]
32    rdf_store: Arc<RdfStore>,
33    /// Transaction manager.
34    tx_manager: Arc<TransactionManager>,
35    /// Query cache shared across sessions.
36    query_cache: Arc<QueryCache>,
37    /// Current transaction ID (if any).
38    current_tx: Option<TxId>,
39    /// Whether the session is in auto-commit mode.
40    auto_commit: bool,
41    /// Adaptive execution configuration.
42    #[allow(dead_code)]
43    adaptive_config: AdaptiveConfig,
44    /// Whether to use factorized execution for multi-hop queries.
45    factorized_execution: bool,
46    /// The graph data model this session operates on.
47    graph_model: GraphModel,
48}
49
50impl Session {
51    /// Creates a new session.
52    #[allow(dead_code)]
53    pub(crate) fn new(
54        store: Arc<LpgStore>,
55        tx_manager: Arc<TransactionManager>,
56        query_cache: Arc<QueryCache>,
57    ) -> Self {
58        Self {
59            store,
60            #[cfg(feature = "rdf")]
61            rdf_store: Arc::new(RdfStore::new()),
62            tx_manager,
63            query_cache,
64            current_tx: None,
65            auto_commit: true,
66            adaptive_config: AdaptiveConfig::default(),
67            factorized_execution: true,
68            graph_model: GraphModel::Lpg,
69        }
70    }
71
72    /// Creates a new session with adaptive execution configuration.
73    #[allow(dead_code)]
74    pub(crate) fn with_adaptive(
75        store: Arc<LpgStore>,
76        tx_manager: Arc<TransactionManager>,
77        query_cache: Arc<QueryCache>,
78        adaptive_config: AdaptiveConfig,
79        factorized_execution: bool,
80        graph_model: GraphModel,
81    ) -> Self {
82        Self {
83            store,
84            #[cfg(feature = "rdf")]
85            rdf_store: Arc::new(RdfStore::new()),
86            tx_manager,
87            query_cache,
88            current_tx: None,
89            auto_commit: true,
90            adaptive_config,
91            factorized_execution,
92            graph_model,
93        }
94    }
95
96    /// Creates a new session with RDF store and adaptive configuration.
97    #[cfg(feature = "rdf")]
98    pub(crate) fn with_rdf_store_and_adaptive(
99        store: Arc<LpgStore>,
100        rdf_store: Arc<RdfStore>,
101        tx_manager: Arc<TransactionManager>,
102        query_cache: Arc<QueryCache>,
103        adaptive_config: AdaptiveConfig,
104        factorized_execution: bool,
105        graph_model: GraphModel,
106    ) -> Self {
107        Self {
108            store,
109            rdf_store,
110            tx_manager,
111            query_cache,
112            current_tx: None,
113            auto_commit: true,
114            adaptive_config,
115            factorized_execution,
116            graph_model,
117        }
118    }
119
120    /// Returns the graph model this session operates on.
121    #[must_use]
122    pub fn graph_model(&self) -> GraphModel {
123        self.graph_model
124    }
125
126    /// Checks that the session's graph model supports LPG operations.
127    fn require_lpg(&self, language: &str) -> Result<()> {
128        if self.graph_model == GraphModel::Rdf {
129            return Err(grafeo_common::utils::error::Error::Internal(format!(
130                "This is an RDF database. {language} queries require an LPG database."
131            )));
132        }
133        Ok(())
134    }
135
136    /// Executes a GQL query.
137    ///
138    /// # Errors
139    ///
140    /// Returns an error if the query fails to parse or execute.
141    ///
142    /// # Examples
143    ///
144    /// ```ignore
145    /// use grafeo_engine::GrafeoDB;
146    ///
147    /// let db = GrafeoDB::new_in_memory();
148    /// let session = db.session();
149    ///
150    /// // Create a node
151    /// session.execute("INSERT (:Person {name: 'Alice', age: 30})")?;
152    ///
153    /// // Query nodes
154    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
155    /// for row in result {
156    ///     println!("{:?}", row);
157    /// }
158    /// ```
159    #[cfg(feature = "gql")]
160    pub fn execute(&self, query: &str) -> Result<QueryResult> {
161        self.require_lpg("GQL")?;
162
163        use crate::query::{
164            Executor, Planner, binder::Binder, cache::CacheKey, gql_translator,
165            optimizer::Optimizer, processor::QueryLanguage,
166        };
167
168        let start_time = std::time::Instant::now();
169
170        // Create cache key for this query
171        let cache_key = CacheKey::new(query, QueryLanguage::Gql);
172
173        // Try to get cached optimized plan
174        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
175            // Cache hit - skip parsing, translation, binding, and optimization
176            cached_plan
177        } else {
178            // Cache miss - run full pipeline
179
180            // Parse and translate the query to a logical plan
181            let logical_plan = gql_translator::translate(query)?;
182
183            // Semantic validation
184            let mut binder = Binder::new();
185            let _binding_context = binder.bind(&logical_plan)?;
186
187            // Optimize the plan
188            let optimizer = Optimizer::from_store(&self.store);
189            let plan = optimizer.optimize(logical_plan)?;
190
191            // Cache the optimized plan for future use
192            self.query_cache.put_optimized(cache_key, plan.clone());
193
194            plan
195        };
196
197        // Get transaction context for MVCC visibility
198        let (viewing_epoch, tx_id) = self.get_transaction_context();
199
200        // Convert to physical plan with transaction context
201        // (Physical planning cannot be cached as it depends on transaction state)
202        let planner = Planner::with_context(
203            Arc::clone(&self.store),
204            Arc::clone(&self.tx_manager),
205            tx_id,
206            viewing_epoch,
207        )
208        .with_factorized_execution(self.factorized_execution);
209        let mut physical_plan = planner.plan(&optimized_plan)?;
210
211        // Execute the plan
212        let executor = Executor::with_columns(physical_plan.columns.clone());
213        let mut result = executor.execute(physical_plan.operator.as_mut())?;
214
215        // Add execution metrics
216        let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
217        let rows_scanned = result.rows.len() as u64;
218        result.execution_time_ms = Some(elapsed_ms);
219        result.rows_scanned = Some(rows_scanned);
220
221        Ok(result)
222    }
223
224    /// Executes a GQL query with parameters.
225    ///
226    /// # Errors
227    ///
228    /// Returns an error if the query fails to parse or execute.
229    #[cfg(feature = "gql")]
230    pub fn execute_with_params(
231        &self,
232        query: &str,
233        params: std::collections::HashMap<String, Value>,
234    ) -> Result<QueryResult> {
235        self.require_lpg("GQL")?;
236
237        use crate::query::processor::{QueryLanguage, QueryProcessor};
238
239        // Get transaction context for MVCC visibility
240        let (viewing_epoch, tx_id) = self.get_transaction_context();
241
242        // Create processor with transaction context
243        let processor =
244            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
245
246        // Apply transaction context if in a transaction
247        let processor = if let Some(tx_id) = tx_id {
248            processor.with_tx_context(viewing_epoch, tx_id)
249        } else {
250            processor
251        };
252
253        processor.process(query, QueryLanguage::Gql, Some(&params))
254    }
255
256    /// Executes a GQL query with parameters.
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if no query language is enabled.
261    #[cfg(not(any(feature = "gql", feature = "cypher")))]
262    pub fn execute_with_params(
263        &self,
264        _query: &str,
265        _params: std::collections::HashMap<String, Value>,
266    ) -> Result<QueryResult> {
267        Err(grafeo_common::utils::error::Error::Internal(
268            "No query language enabled".to_string(),
269        ))
270    }
271
272    /// Executes a GQL query.
273    ///
274    /// # Errors
275    ///
276    /// Returns an error if no query language is enabled.
277    #[cfg(not(any(feature = "gql", feature = "cypher")))]
278    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
279        Err(grafeo_common::utils::error::Error::Internal(
280            "No query language enabled".to_string(),
281        ))
282    }
283
284    /// Executes a Cypher query.
285    ///
286    /// # Errors
287    ///
288    /// Returns an error if the query fails to parse or execute.
289    #[cfg(feature = "cypher")]
290    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
291        use crate::query::{
292            Executor, Planner, binder::Binder, cache::CacheKey, cypher_translator,
293            optimizer::Optimizer, processor::QueryLanguage,
294        };
295
296        // Create cache key for this query
297        let cache_key = CacheKey::new(query, QueryLanguage::Cypher);
298
299        // Try to get cached optimized plan
300        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
301            cached_plan
302        } else {
303            // Parse and translate the query to a logical plan
304            let logical_plan = cypher_translator::translate(query)?;
305
306            // Semantic validation
307            let mut binder = Binder::new();
308            let _binding_context = binder.bind(&logical_plan)?;
309
310            // Optimize the plan
311            let optimizer = Optimizer::from_store(&self.store);
312            let plan = optimizer.optimize(logical_plan)?;
313
314            // Cache the optimized plan
315            self.query_cache.put_optimized(cache_key, plan.clone());
316
317            plan
318        };
319
320        // Get transaction context for MVCC visibility
321        let (viewing_epoch, tx_id) = self.get_transaction_context();
322
323        // Convert to physical plan with transaction context
324        let planner = Planner::with_context(
325            Arc::clone(&self.store),
326            Arc::clone(&self.tx_manager),
327            tx_id,
328            viewing_epoch,
329        )
330        .with_factorized_execution(self.factorized_execution);
331        let mut physical_plan = planner.plan(&optimized_plan)?;
332
333        // Execute the plan
334        let executor = Executor::with_columns(physical_plan.columns.clone());
335        executor.execute(physical_plan.operator.as_mut())
336    }
337
338    /// Executes a Gremlin query.
339    ///
340    /// # Errors
341    ///
342    /// Returns an error if the query fails to parse or execute.
343    ///
344    /// # Examples
345    ///
346    /// ```ignore
347    /// use grafeo_engine::GrafeoDB;
348    ///
349    /// let db = GrafeoDB::new_in_memory();
350    /// let session = db.session();
351    ///
352    /// // Create some nodes first
353    /// session.create_node(&["Person"]);
354    ///
355    /// // Query using Gremlin
356    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
357    /// ```
358    #[cfg(feature = "gremlin")]
359    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
360        use crate::query::{
361            Executor, Planner, binder::Binder, gremlin_translator, optimizer::Optimizer,
362        };
363
364        // Parse and translate the query to a logical plan
365        let logical_plan = gremlin_translator::translate(query)?;
366
367        // Semantic validation
368        let mut binder = Binder::new();
369        let _binding_context = binder.bind(&logical_plan)?;
370
371        // Optimize the plan
372        let optimizer = Optimizer::from_store(&self.store);
373        let optimized_plan = optimizer.optimize(logical_plan)?;
374
375        // Get transaction context for MVCC visibility
376        let (viewing_epoch, tx_id) = self.get_transaction_context();
377
378        // Convert to physical plan with transaction context
379        let planner = Planner::with_context(
380            Arc::clone(&self.store),
381            Arc::clone(&self.tx_manager),
382            tx_id,
383            viewing_epoch,
384        )
385        .with_factorized_execution(self.factorized_execution);
386        let mut physical_plan = planner.plan(&optimized_plan)?;
387
388        // Execute the plan
389        let executor = Executor::with_columns(physical_plan.columns.clone());
390        executor.execute(physical_plan.operator.as_mut())
391    }
392
393    /// Executes a Gremlin query with parameters.
394    ///
395    /// # Errors
396    ///
397    /// Returns an error if the query fails to parse or execute.
398    #[cfg(feature = "gremlin")]
399    pub fn execute_gremlin_with_params(
400        &self,
401        query: &str,
402        params: std::collections::HashMap<String, Value>,
403    ) -> Result<QueryResult> {
404        use crate::query::processor::{QueryLanguage, QueryProcessor};
405
406        // Get transaction context for MVCC visibility
407        let (viewing_epoch, tx_id) = self.get_transaction_context();
408
409        // Create processor with transaction context
410        let processor =
411            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
412
413        // Apply transaction context if in a transaction
414        let processor = if let Some(tx_id) = tx_id {
415            processor.with_tx_context(viewing_epoch, tx_id)
416        } else {
417            processor
418        };
419
420        processor.process(query, QueryLanguage::Gremlin, Some(&params))
421    }
422
423    /// Executes a GraphQL query against the LPG store.
424    ///
425    /// # Errors
426    ///
427    /// Returns an error if the query fails to parse or execute.
428    ///
429    /// # Examples
430    ///
431    /// ```ignore
432    /// use grafeo_engine::GrafeoDB;
433    ///
434    /// let db = GrafeoDB::new_in_memory();
435    /// let session = db.session();
436    ///
437    /// // Create some nodes first
438    /// session.create_node(&["User"]);
439    ///
440    /// // Query using GraphQL
441    /// let result = session.execute_graphql("query { user { id name } }")?;
442    /// ```
443    #[cfg(feature = "graphql")]
444    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
445        use crate::query::{
446            Executor, Planner, binder::Binder, graphql_translator, optimizer::Optimizer,
447        };
448
449        // Parse and translate the query to a logical plan
450        let logical_plan = graphql_translator::translate(query)?;
451
452        // Semantic validation
453        let mut binder = Binder::new();
454        let _binding_context = binder.bind(&logical_plan)?;
455
456        // Optimize the plan
457        let optimizer = Optimizer::from_store(&self.store);
458        let optimized_plan = optimizer.optimize(logical_plan)?;
459
460        // Get transaction context for MVCC visibility
461        let (viewing_epoch, tx_id) = self.get_transaction_context();
462
463        // Convert to physical plan with transaction context
464        let planner = Planner::with_context(
465            Arc::clone(&self.store),
466            Arc::clone(&self.tx_manager),
467            tx_id,
468            viewing_epoch,
469        )
470        .with_factorized_execution(self.factorized_execution);
471        let mut physical_plan = planner.plan(&optimized_plan)?;
472
473        // Execute the plan
474        let executor = Executor::with_columns(physical_plan.columns.clone());
475        executor.execute(physical_plan.operator.as_mut())
476    }
477
478    /// Executes a GraphQL query with parameters.
479    ///
480    /// # Errors
481    ///
482    /// Returns an error if the query fails to parse or execute.
483    #[cfg(feature = "graphql")]
484    pub fn execute_graphql_with_params(
485        &self,
486        query: &str,
487        params: std::collections::HashMap<String, Value>,
488    ) -> Result<QueryResult> {
489        use crate::query::processor::{QueryLanguage, QueryProcessor};
490
491        // Get transaction context for MVCC visibility
492        let (viewing_epoch, tx_id) = self.get_transaction_context();
493
494        // Create processor with transaction context
495        let processor =
496            QueryProcessor::for_lpg_with_tx(Arc::clone(&self.store), Arc::clone(&self.tx_manager));
497
498        // Apply transaction context if in a transaction
499        let processor = if let Some(tx_id) = tx_id {
500            processor.with_tx_context(viewing_epoch, tx_id)
501        } else {
502            processor
503        };
504
505        processor.process(query, QueryLanguage::GraphQL, Some(&params))
506    }
507
508    /// Executes a SPARQL query.
509    ///
510    /// # Errors
511    ///
512    /// Returns an error if the query fails to parse or execute.
513    #[cfg(all(feature = "sparql", feature = "rdf"))]
514    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
515        use crate::query::{
516            Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
517        };
518
519        // Parse and translate the SPARQL query to a logical plan
520        let logical_plan = sparql_translator::translate(query)?;
521
522        // Optimize the plan
523        let optimizer = Optimizer::from_store(&self.store);
524        let optimized_plan = optimizer.optimize(logical_plan)?;
525
526        // Convert to physical plan using RDF planner
527        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store)).with_tx_id(self.current_tx);
528        let mut physical_plan = planner.plan(&optimized_plan)?;
529
530        // Execute the plan
531        let executor = Executor::with_columns(physical_plan.columns.clone());
532        executor.execute(physical_plan.operator.as_mut())
533    }
534
535    /// Executes a SPARQL query with parameters.
536    ///
537    /// # Errors
538    ///
539    /// Returns an error if the query fails to parse or execute.
540    #[cfg(all(feature = "sparql", feature = "rdf"))]
541    pub fn execute_sparql_with_params(
542        &self,
543        query: &str,
544        _params: std::collections::HashMap<String, Value>,
545    ) -> Result<QueryResult> {
546        // TODO: Implement parameter substitution for SPARQL
547        // For now, just execute the query without parameters
548        self.execute_sparql(query)
549    }
550
551    /// Begins a new transaction.
552    ///
553    /// # Errors
554    ///
555    /// Returns an error if a transaction is already active.
556    ///
557    /// # Examples
558    ///
559    /// ```ignore
560    /// use grafeo_engine::GrafeoDB;
561    ///
562    /// let db = GrafeoDB::new_in_memory();
563    /// let mut session = db.session();
564    ///
565    /// session.begin_tx()?;
566    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
567    /// session.execute("INSERT (:Person {name: 'Bob'})")?;
568    /// session.commit()?; // Both inserts committed atomically
569    /// ```
570    pub fn begin_tx(&mut self) -> Result<()> {
571        if self.current_tx.is_some() {
572            return Err(grafeo_common::utils::error::Error::Transaction(
573                grafeo_common::utils::error::TransactionError::InvalidState(
574                    "Transaction already active".to_string(),
575                ),
576            ));
577        }
578
579        let tx_id = self.tx_manager.begin();
580        self.current_tx = Some(tx_id);
581        Ok(())
582    }
583
584    /// Begins a transaction with a specific isolation level.
585    ///
586    /// See [`begin_tx`](Self::begin_tx) for the default (`SnapshotIsolation`).
587    ///
588    /// # Errors
589    ///
590    /// Returns an error if a transaction is already active.
591    pub fn begin_tx_with_isolation(
592        &mut self,
593        isolation_level: crate::transaction::IsolationLevel,
594    ) -> Result<()> {
595        if self.current_tx.is_some() {
596            return Err(grafeo_common::utils::error::Error::Transaction(
597                grafeo_common::utils::error::TransactionError::InvalidState(
598                    "Transaction already active".to_string(),
599                ),
600            ));
601        }
602
603        let tx_id = self.tx_manager.begin_with_isolation(isolation_level);
604        self.current_tx = Some(tx_id);
605        Ok(())
606    }
607
608    /// Commits the current transaction.
609    ///
610    /// Makes all changes since [`begin_tx`](Self::begin_tx) permanent.
611    ///
612    /// # Errors
613    ///
614    /// Returns an error if no transaction is active.
615    pub fn commit(&mut self) -> Result<()> {
616        let tx_id = self.current_tx.take().ok_or_else(|| {
617            grafeo_common::utils::error::Error::Transaction(
618                grafeo_common::utils::error::TransactionError::InvalidState(
619                    "No active transaction".to_string(),
620                ),
621            )
622        })?;
623
624        // Commit RDF store pending operations
625        #[cfg(feature = "rdf")]
626        self.rdf_store.commit_tx(tx_id);
627
628        self.tx_manager.commit(tx_id).map(|_| ())
629    }
630
631    /// Aborts the current transaction.
632    ///
633    /// Discards all changes since [`begin_tx`](Self::begin_tx).
634    ///
635    /// # Errors
636    ///
637    /// Returns an error if no transaction is active.
638    ///
639    /// # Examples
640    ///
641    /// ```ignore
642    /// use grafeo_engine::GrafeoDB;
643    ///
644    /// let db = GrafeoDB::new_in_memory();
645    /// let mut session = db.session();
646    ///
647    /// session.begin_tx()?;
648    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
649    /// session.rollback()?; // Insert is discarded
650    /// ```
651    pub fn rollback(&mut self) -> Result<()> {
652        let tx_id = self.current_tx.take().ok_or_else(|| {
653            grafeo_common::utils::error::Error::Transaction(
654                grafeo_common::utils::error::TransactionError::InvalidState(
655                    "No active transaction".to_string(),
656                ),
657            )
658        })?;
659
660        // Discard uncommitted versions in the LPG store
661        self.store.discard_uncommitted_versions(tx_id);
662
663        // Discard pending operations in the RDF store
664        #[cfg(feature = "rdf")]
665        self.rdf_store.rollback_tx(tx_id);
666
667        // Mark transaction as aborted in the manager
668        self.tx_manager.abort(tx_id)
669    }
670
671    /// Returns whether a transaction is active.
672    #[must_use]
673    pub fn in_transaction(&self) -> bool {
674        self.current_tx.is_some()
675    }
676
677    /// Sets auto-commit mode.
678    pub fn set_auto_commit(&mut self, auto_commit: bool) {
679        self.auto_commit = auto_commit;
680    }
681
682    /// Returns whether auto-commit is enabled.
683    #[must_use]
684    pub fn auto_commit(&self) -> bool {
685        self.auto_commit
686    }
687
688    /// Returns the current transaction context for MVCC visibility.
689    ///
690    /// Returns `(viewing_epoch, tx_id)` where:
691    /// - `viewing_epoch` is the epoch at which to check version visibility
692    /// - `tx_id` is the current transaction ID (if in a transaction)
693    #[must_use]
694    fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
695        if let Some(tx_id) = self.current_tx {
696            // In a transaction - use the transaction's start epoch
697            let epoch = self
698                .tx_manager
699                .start_epoch(tx_id)
700                .unwrap_or_else(|| self.tx_manager.current_epoch());
701            (epoch, Some(tx_id))
702        } else {
703            // No transaction - use current epoch
704            (self.tx_manager.current_epoch(), None)
705        }
706    }
707
708    /// Creates a node directly (bypassing query execution).
709    ///
710    /// This is a low-level API for testing and direct manipulation.
711    /// If a transaction is active, the node will be versioned with the transaction ID.
712    pub fn create_node(&self, labels: &[&str]) -> NodeId {
713        let (epoch, tx_id) = self.get_transaction_context();
714        self.store
715            .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
716    }
717
718    /// Creates a node with properties.
719    ///
720    /// If a transaction is active, the node will be versioned with the transaction ID.
721    pub fn create_node_with_props<'a>(
722        &self,
723        labels: &[&str],
724        properties: impl IntoIterator<Item = (&'a str, Value)>,
725    ) -> NodeId {
726        let (epoch, tx_id) = self.get_transaction_context();
727        self.store.create_node_with_props_versioned(
728            labels,
729            properties.into_iter().map(|(k, v)| (k, v)),
730            epoch,
731            tx_id.unwrap_or(TxId::SYSTEM),
732        )
733    }
734
735    /// Creates an edge between two nodes.
736    ///
737    /// This is a low-level API for testing and direct manipulation.
738    /// If a transaction is active, the edge will be versioned with the transaction ID.
739    pub fn create_edge(
740        &self,
741        src: NodeId,
742        dst: NodeId,
743        edge_type: &str,
744    ) -> grafeo_common::types::EdgeId {
745        let (epoch, tx_id) = self.get_transaction_context();
746        self.store
747            .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
748    }
749
750    // =========================================================================
751    // Direct Lookup APIs (bypass query planning for O(1) point reads)
752    // =========================================================================
753
754    /// Gets a node by ID directly, bypassing query planning.
755    ///
756    /// This is the fastest way to retrieve a single node when you know its ID.
757    /// Skips parsing, binding, optimization, and physical planning entirely.
758    ///
759    /// # Performance
760    ///
761    /// - Time complexity: O(1) average case
762    /// - No lock contention (uses DashMap internally)
763    /// - ~20-30x faster than equivalent MATCH query
764    ///
765    /// # Example
766    ///
767    /// ```ignore
768    /// let session = db.session();
769    /// let node_id = session.create_node(&["Person"]);
770    ///
771    /// // Direct lookup - O(1), no query planning
772    /// let node = session.get_node(node_id);
773    /// assert!(node.is_some());
774    /// ```
775    #[must_use]
776    pub fn get_node(&self, id: NodeId) -> Option<Node> {
777        let (epoch, tx_id) = self.get_transaction_context();
778        self.store
779            .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
780    }
781
782    /// Gets a single property from a node by ID, bypassing query planning.
783    ///
784    /// More efficient than `get_node()` when you only need one property,
785    /// as it avoids loading the full node with all properties.
786    ///
787    /// # Performance
788    ///
789    /// - Time complexity: O(1) average case
790    /// - No query planning overhead
791    ///
792    /// # Example
793    ///
794    /// ```ignore
795    /// let session = db.session();
796    /// let id = session.create_node_with_props(&["Person"], [("name", "Alice".into())]);
797    ///
798    /// // Direct property access - O(1)
799    /// let name = session.get_node_property(id, "name");
800    /// assert_eq!(name, Some(Value::String("Alice".into())));
801    /// ```
802    #[must_use]
803    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
804        self.get_node(id)
805            .and_then(|node| node.get_property(key).cloned())
806    }
807
808    /// Gets an edge by ID directly, bypassing query planning.
809    ///
810    /// # Performance
811    ///
812    /// - Time complexity: O(1) average case
813    /// - No lock contention
814    #[must_use]
815    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
816        let (epoch, tx_id) = self.get_transaction_context();
817        self.store
818            .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
819    }
820
821    /// Gets outgoing neighbors of a node directly, bypassing query planning.
822    ///
823    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
824    ///
825    /// # Performance
826    ///
827    /// - Time complexity: O(degree) where degree is the number of outgoing edges
828    /// - Uses adjacency index for direct access
829    /// - ~10-20x faster than equivalent MATCH query
830    ///
831    /// # Example
832    ///
833    /// ```ignore
834    /// let session = db.session();
835    /// let alice = session.create_node(&["Person"]);
836    /// let bob = session.create_node(&["Person"]);
837    /// session.create_edge(alice, bob, "KNOWS");
838    ///
839    /// // Direct neighbor lookup - O(degree)
840    /// let neighbors = session.get_neighbors_outgoing(alice);
841    /// assert_eq!(neighbors.len(), 1);
842    /// assert_eq!(neighbors[0].0, bob);
843    /// ```
844    #[must_use]
845    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
846        self.store.edges_from(node, Direction::Outgoing).collect()
847    }
848
849    /// Gets incoming neighbors of a node directly, bypassing query planning.
850    ///
851    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
852    ///
853    /// # Performance
854    ///
855    /// - Time complexity: O(degree) where degree is the number of incoming edges
856    /// - Uses backward adjacency index for direct access
857    #[must_use]
858    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
859        self.store.edges_from(node, Direction::Incoming).collect()
860    }
861
862    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
863    ///
864    /// # Example
865    ///
866    /// ```ignore
867    /// let neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
868    /// ```
869    #[must_use]
870    pub fn get_neighbors_outgoing_by_type(
871        &self,
872        node: NodeId,
873        edge_type: &str,
874    ) -> Vec<(NodeId, EdgeId)> {
875        self.store
876            .edges_from(node, Direction::Outgoing)
877            .filter(|(_, edge_id)| {
878                self.get_edge(*edge_id)
879                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
880            })
881            .collect()
882    }
883
884    /// Checks if a node exists, bypassing query planning.
885    ///
886    /// # Performance
887    ///
888    /// - Time complexity: O(1)
889    /// - Fastest existence check available
890    #[must_use]
891    pub fn node_exists(&self, id: NodeId) -> bool {
892        self.get_node(id).is_some()
893    }
894
895    /// Checks if an edge exists, bypassing query planning.
896    #[must_use]
897    pub fn edge_exists(&self, id: EdgeId) -> bool {
898        self.get_edge(id).is_some()
899    }
900
901    /// Gets the degree (number of edges) of a node.
902    ///
903    /// Returns (outgoing_degree, incoming_degree).
904    #[must_use]
905    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
906        let out = self.store.out_degree(node);
907        let in_degree = self.store.in_degree(node);
908        (out, in_degree)
909    }
910
911    /// Batch lookup of multiple nodes by ID.
912    ///
913    /// More efficient than calling `get_node()` in a loop because it
914    /// amortizes overhead.
915    ///
916    /// # Performance
917    ///
918    /// - Time complexity: O(n) where n is the number of IDs
919    /// - Better cache utilization than individual lookups
920    #[must_use]
921    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
922        let (epoch, tx_id) = self.get_transaction_context();
923        let tx = tx_id.unwrap_or(TxId::SYSTEM);
924        ids.iter()
925            .map(|&id| self.store.get_node_versioned(id, epoch, tx))
926            .collect()
927    }
928}
929
930#[cfg(test)]
931mod tests {
932    use crate::database::GrafeoDB;
933
934    #[test]
935    fn test_session_create_node() {
936        let db = GrafeoDB::new_in_memory();
937        let session = db.session();
938
939        let id = session.create_node(&["Person"]);
940        assert!(id.is_valid());
941        assert_eq!(db.node_count(), 1);
942    }
943
944    #[test]
945    fn test_session_transaction() {
946        let db = GrafeoDB::new_in_memory();
947        let mut session = db.session();
948
949        assert!(!session.in_transaction());
950
951        session.begin_tx().unwrap();
952        assert!(session.in_transaction());
953
954        session.commit().unwrap();
955        assert!(!session.in_transaction());
956    }
957
958    #[test]
959    fn test_session_transaction_context() {
960        let db = GrafeoDB::new_in_memory();
961        let mut session = db.session();
962
963        // Without transaction - context should have current epoch and no tx_id
964        let (_epoch1, tx_id1) = session.get_transaction_context();
965        assert!(tx_id1.is_none());
966
967        // Start a transaction
968        session.begin_tx().unwrap();
969        let (epoch2, tx_id2) = session.get_transaction_context();
970        assert!(tx_id2.is_some());
971        // Transaction should have a valid epoch
972        let _ = epoch2; // Use the variable
973
974        // Commit and verify
975        session.commit().unwrap();
976        let (epoch3, tx_id3) = session.get_transaction_context();
977        assert!(tx_id3.is_none());
978        // Epoch should have advanced after commit
979        assert!(epoch3.as_u64() >= epoch2.as_u64());
980    }
981
982    #[test]
983    fn test_session_rollback() {
984        let db = GrafeoDB::new_in_memory();
985        let mut session = db.session();
986
987        session.begin_tx().unwrap();
988        session.rollback().unwrap();
989        assert!(!session.in_transaction());
990    }
991
992    #[test]
993    fn test_session_rollback_discards_versions() {
994        use grafeo_common::types::TxId;
995
996        let db = GrafeoDB::new_in_memory();
997
998        // Create a node outside of any transaction (at system level)
999        let node_before = db.store().create_node(&["Person"]);
1000        assert!(node_before.is_valid());
1001        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1002
1003        // Start a transaction
1004        let mut session = db.session();
1005        session.begin_tx().unwrap();
1006        let tx_id = session.current_tx.unwrap();
1007
1008        // Create a node versioned with the transaction's ID
1009        let epoch = db.store().current_epoch();
1010        let node_in_tx = db.store().create_node_versioned(&["Person"], epoch, tx_id);
1011        assert!(node_in_tx.is_valid());
1012
1013        // Should see 2 nodes at this point
1014        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1015
1016        // Rollback the transaction
1017        session.rollback().unwrap();
1018        assert!(!session.in_transaction());
1019
1020        // The node created in the transaction should be discarded
1021        // Only the first node should remain visible
1022        let count_after = db.node_count();
1023        assert_eq!(
1024            count_after, 1,
1025            "Rollback should discard uncommitted node, but got {count_after}"
1026        );
1027
1028        // The original node should still be accessible
1029        let current_epoch = db.store().current_epoch();
1030        assert!(
1031            db.store()
1032                .get_node_versioned(node_before, current_epoch, TxId::SYSTEM)
1033                .is_some(),
1034            "Original node should still exist"
1035        );
1036
1037        // The node created in the transaction should not be accessible
1038        assert!(
1039            db.store()
1040                .get_node_versioned(node_in_tx, current_epoch, TxId::SYSTEM)
1041                .is_none(),
1042            "Transaction node should be gone"
1043        );
1044    }
1045
1046    #[test]
1047    fn test_session_create_node_in_transaction() {
1048        // Test that session.create_node() is transaction-aware
1049        let db = GrafeoDB::new_in_memory();
1050
1051        // Create a node outside of any transaction
1052        let node_before = db.create_node(&["Person"]);
1053        assert!(node_before.is_valid());
1054        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1055
1056        // Start a transaction and create a node through the session
1057        let mut session = db.session();
1058        session.begin_tx().unwrap();
1059
1060        // Create a node through session.create_node() - should be versioned with tx
1061        let node_in_tx = session.create_node(&["Person"]);
1062        assert!(node_in_tx.is_valid());
1063
1064        // Should see 2 nodes at this point
1065        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1066
1067        // Rollback the transaction
1068        session.rollback().unwrap();
1069
1070        // The node created via session.create_node() should be discarded
1071        let count_after = db.node_count();
1072        assert_eq!(
1073            count_after, 1,
1074            "Rollback should discard node created via session.create_node(), but got {count_after}"
1075        );
1076    }
1077
1078    #[test]
1079    fn test_session_create_node_with_props_in_transaction() {
1080        use grafeo_common::types::Value;
1081
1082        // Test that session.create_node_with_props() is transaction-aware
1083        let db = GrafeoDB::new_in_memory();
1084
1085        // Create a node outside of any transaction
1086        db.create_node(&["Person"]);
1087        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
1088
1089        // Start a transaction and create a node with properties
1090        let mut session = db.session();
1091        session.begin_tx().unwrap();
1092
1093        let node_in_tx =
1094            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1095        assert!(node_in_tx.is_valid());
1096
1097        // Should see 2 nodes
1098        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
1099
1100        // Rollback the transaction
1101        session.rollback().unwrap();
1102
1103        // The node should be discarded
1104        let count_after = db.node_count();
1105        assert_eq!(
1106            count_after, 1,
1107            "Rollback should discard node created via session.create_node_with_props()"
1108        );
1109    }
1110
1111    #[cfg(feature = "gql")]
1112    mod gql_tests {
1113        use super::*;
1114
1115        #[test]
1116        fn test_gql_query_execution() {
1117            let db = GrafeoDB::new_in_memory();
1118            let session = db.session();
1119
1120            // Create some test data
1121            session.create_node(&["Person"]);
1122            session.create_node(&["Person"]);
1123            session.create_node(&["Animal"]);
1124
1125            // Execute a GQL query
1126            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1127
1128            // Should return 2 Person nodes
1129            assert_eq!(result.row_count(), 2);
1130            assert_eq!(result.column_count(), 1);
1131            assert_eq!(result.columns[0], "n");
1132        }
1133
1134        #[test]
1135        fn test_gql_empty_result() {
1136            let db = GrafeoDB::new_in_memory();
1137            let session = db.session();
1138
1139            // No data in database
1140            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
1141
1142            assert_eq!(result.row_count(), 0);
1143        }
1144
1145        #[test]
1146        fn test_gql_parse_error() {
1147            let db = GrafeoDB::new_in_memory();
1148            let session = db.session();
1149
1150            // Invalid GQL syntax
1151            let result = session.execute("MATCH (n RETURN n");
1152
1153            assert!(result.is_err());
1154        }
1155
1156        #[test]
1157        fn test_gql_relationship_traversal() {
1158            let db = GrafeoDB::new_in_memory();
1159            let session = db.session();
1160
1161            // Create a graph: Alice -> Bob, Alice -> Charlie
1162            let alice = session.create_node(&["Person"]);
1163            let bob = session.create_node(&["Person"]);
1164            let charlie = session.create_node(&["Person"]);
1165
1166            session.create_edge(alice, bob, "KNOWS");
1167            session.create_edge(alice, charlie, "KNOWS");
1168
1169            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
1170            let result = session
1171                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1172                .unwrap();
1173
1174            // Should return 2 rows (Alice->Bob, Alice->Charlie)
1175            assert_eq!(result.row_count(), 2);
1176            assert_eq!(result.column_count(), 2);
1177            assert_eq!(result.columns[0], "a");
1178            assert_eq!(result.columns[1], "b");
1179        }
1180
1181        #[test]
1182        fn test_gql_relationship_with_type_filter() {
1183            let db = GrafeoDB::new_in_memory();
1184            let session = db.session();
1185
1186            // Create a graph: Alice -KNOWS-> Bob, Alice -WORKS_WITH-> Charlie
1187            let alice = session.create_node(&["Person"]);
1188            let bob = session.create_node(&["Person"]);
1189            let charlie = session.create_node(&["Person"]);
1190
1191            session.create_edge(alice, bob, "KNOWS");
1192            session.create_edge(alice, charlie, "WORKS_WITH");
1193
1194            // Query only KNOWS relationships
1195            let result = session
1196                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
1197                .unwrap();
1198
1199            // Should return only 1 row (Alice->Bob)
1200            assert_eq!(result.row_count(), 1);
1201        }
1202
1203        #[test]
1204        fn test_gql_semantic_error_undefined_variable() {
1205            let db = GrafeoDB::new_in_memory();
1206            let session = db.session();
1207
1208            // Reference undefined variable 'x' in RETURN
1209            let result = session.execute("MATCH (n:Person) RETURN x");
1210
1211            // Should fail with semantic error
1212            assert!(result.is_err());
1213            let Err(err) = result else {
1214                panic!("Expected error")
1215            };
1216            assert!(
1217                err.to_string().contains("Undefined variable"),
1218                "Expected undefined variable error, got: {}",
1219                err
1220            );
1221        }
1222
1223        #[test]
1224        fn test_gql_where_clause_property_filter() {
1225            use grafeo_common::types::Value;
1226
1227            let db = GrafeoDB::new_in_memory();
1228            let session = db.session();
1229
1230            // Create people with ages
1231            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
1232            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
1233            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
1234
1235            // Query with WHERE clause: age > 30
1236            let result = session
1237                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
1238                .unwrap();
1239
1240            // Should return 2 people (ages 35 and 45)
1241            assert_eq!(result.row_count(), 2);
1242        }
1243
1244        #[test]
1245        fn test_gql_where_clause_equality() {
1246            use grafeo_common::types::Value;
1247
1248            let db = GrafeoDB::new_in_memory();
1249            let session = db.session();
1250
1251            // Create people with names
1252            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1253            session.create_node_with_props(&["Person"], [("name", Value::String("Bob".into()))]);
1254            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1255
1256            // Query with WHERE clause: name = "Alice"
1257            let result = session
1258                .execute("MATCH (n:Person) WHERE n.name = \"Alice\" RETURN n")
1259                .unwrap();
1260
1261            // Should return 2 people named Alice
1262            assert_eq!(result.row_count(), 2);
1263        }
1264
1265        #[test]
1266        fn test_gql_return_property_access() {
1267            use grafeo_common::types::Value;
1268
1269            let db = GrafeoDB::new_in_memory();
1270            let session = db.session();
1271
1272            // Create people with names and ages
1273            session.create_node_with_props(
1274                &["Person"],
1275                [
1276                    ("name", Value::String("Alice".into())),
1277                    ("age", Value::Int64(30)),
1278                ],
1279            );
1280            session.create_node_with_props(
1281                &["Person"],
1282                [
1283                    ("name", Value::String("Bob".into())),
1284                    ("age", Value::Int64(25)),
1285                ],
1286            );
1287
1288            // Query returning properties
1289            let result = session
1290                .execute("MATCH (n:Person) RETURN n.name, n.age")
1291                .unwrap();
1292
1293            // Should return 2 rows with name and age columns
1294            assert_eq!(result.row_count(), 2);
1295            assert_eq!(result.column_count(), 2);
1296            assert_eq!(result.columns[0], "n.name");
1297            assert_eq!(result.columns[1], "n.age");
1298
1299            // Check that we get actual values
1300            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
1301            assert!(names.contains(&&Value::String("Alice".into())));
1302            assert!(names.contains(&&Value::String("Bob".into())));
1303        }
1304
1305        #[test]
1306        fn test_gql_return_mixed_expressions() {
1307            use grafeo_common::types::Value;
1308
1309            let db = GrafeoDB::new_in_memory();
1310            let session = db.session();
1311
1312            // Create a person
1313            session.create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1314
1315            // Query returning both node and property
1316            let result = session
1317                .execute("MATCH (n:Person) RETURN n, n.name")
1318                .unwrap();
1319
1320            assert_eq!(result.row_count(), 1);
1321            assert_eq!(result.column_count(), 2);
1322            assert_eq!(result.columns[0], "n");
1323            assert_eq!(result.columns[1], "n.name");
1324
1325            // Second column should be the name
1326            assert_eq!(result.rows[0][1], Value::String("Alice".into()));
1327        }
1328    }
1329
1330    #[cfg(feature = "cypher")]
1331    mod cypher_tests {
1332        use super::*;
1333
1334        #[test]
1335        fn test_cypher_query_execution() {
1336            let db = GrafeoDB::new_in_memory();
1337            let session = db.session();
1338
1339            // Create some test data
1340            session.create_node(&["Person"]);
1341            session.create_node(&["Person"]);
1342            session.create_node(&["Animal"]);
1343
1344            // Execute a Cypher query
1345            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1346
1347            // Should return 2 Person nodes
1348            assert_eq!(result.row_count(), 2);
1349            assert_eq!(result.column_count(), 1);
1350            assert_eq!(result.columns[0], "n");
1351        }
1352
1353        #[test]
1354        fn test_cypher_empty_result() {
1355            let db = GrafeoDB::new_in_memory();
1356            let session = db.session();
1357
1358            // No data in database
1359            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
1360
1361            assert_eq!(result.row_count(), 0);
1362        }
1363
1364        #[test]
1365        fn test_cypher_parse_error() {
1366            let db = GrafeoDB::new_in_memory();
1367            let session = db.session();
1368
1369            // Invalid Cypher syntax
1370            let result = session.execute_cypher("MATCH (n RETURN n");
1371
1372            assert!(result.is_err());
1373        }
1374    }
1375
1376    // ==================== Direct Lookup API Tests ====================
1377
1378    mod direct_lookup_tests {
1379        use super::*;
1380        use grafeo_common::types::Value;
1381
1382        #[test]
1383        fn test_get_node() {
1384            let db = GrafeoDB::new_in_memory();
1385            let session = db.session();
1386
1387            let id = session.create_node(&["Person"]);
1388            let node = session.get_node(id);
1389
1390            assert!(node.is_some());
1391            let node = node.unwrap();
1392            assert_eq!(node.id, id);
1393        }
1394
1395        #[test]
1396        fn test_get_node_not_found() {
1397            use grafeo_common::types::NodeId;
1398
1399            let db = GrafeoDB::new_in_memory();
1400            let session = db.session();
1401
1402            // Try to get a non-existent node
1403            let node = session.get_node(NodeId::new(9999));
1404            assert!(node.is_none());
1405        }
1406
1407        #[test]
1408        fn test_get_node_property() {
1409            let db = GrafeoDB::new_in_memory();
1410            let session = db.session();
1411
1412            let id = session
1413                .create_node_with_props(&["Person"], [("name", Value::String("Alice".into()))]);
1414
1415            let name = session.get_node_property(id, "name");
1416            assert_eq!(name, Some(Value::String("Alice".into())));
1417
1418            // Non-existent property
1419            let missing = session.get_node_property(id, "missing");
1420            assert!(missing.is_none());
1421        }
1422
1423        #[test]
1424        fn test_get_edge() {
1425            let db = GrafeoDB::new_in_memory();
1426            let session = db.session();
1427
1428            let alice = session.create_node(&["Person"]);
1429            let bob = session.create_node(&["Person"]);
1430            let edge_id = session.create_edge(alice, bob, "KNOWS");
1431
1432            let edge = session.get_edge(edge_id);
1433            assert!(edge.is_some());
1434            let edge = edge.unwrap();
1435            assert_eq!(edge.id, edge_id);
1436            assert_eq!(edge.src, alice);
1437            assert_eq!(edge.dst, bob);
1438        }
1439
1440        #[test]
1441        fn test_get_edge_not_found() {
1442            use grafeo_common::types::EdgeId;
1443
1444            let db = GrafeoDB::new_in_memory();
1445            let session = db.session();
1446
1447            let edge = session.get_edge(EdgeId::new(9999));
1448            assert!(edge.is_none());
1449        }
1450
1451        #[test]
1452        fn test_get_neighbors_outgoing() {
1453            let db = GrafeoDB::new_in_memory();
1454            let session = db.session();
1455
1456            let alice = session.create_node(&["Person"]);
1457            let bob = session.create_node(&["Person"]);
1458            let carol = session.create_node(&["Person"]);
1459
1460            session.create_edge(alice, bob, "KNOWS");
1461            session.create_edge(alice, carol, "KNOWS");
1462
1463            let neighbors = session.get_neighbors_outgoing(alice);
1464            assert_eq!(neighbors.len(), 2);
1465
1466            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1467            assert!(neighbor_ids.contains(&bob));
1468            assert!(neighbor_ids.contains(&carol));
1469        }
1470
1471        #[test]
1472        fn test_get_neighbors_incoming() {
1473            let db = GrafeoDB::new_in_memory();
1474            let session = db.session();
1475
1476            let alice = session.create_node(&["Person"]);
1477            let bob = session.create_node(&["Person"]);
1478            let carol = session.create_node(&["Person"]);
1479
1480            session.create_edge(bob, alice, "KNOWS");
1481            session.create_edge(carol, alice, "KNOWS");
1482
1483            let neighbors = session.get_neighbors_incoming(alice);
1484            assert_eq!(neighbors.len(), 2);
1485
1486            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
1487            assert!(neighbor_ids.contains(&bob));
1488            assert!(neighbor_ids.contains(&carol));
1489        }
1490
1491        #[test]
1492        fn test_get_neighbors_outgoing_by_type() {
1493            let db = GrafeoDB::new_in_memory();
1494            let session = db.session();
1495
1496            let alice = session.create_node(&["Person"]);
1497            let bob = session.create_node(&["Person"]);
1498            let company = session.create_node(&["Company"]);
1499
1500            session.create_edge(alice, bob, "KNOWS");
1501            session.create_edge(alice, company, "WORKS_AT");
1502
1503            let knows_neighbors = session.get_neighbors_outgoing_by_type(alice, "KNOWS");
1504            assert_eq!(knows_neighbors.len(), 1);
1505            assert_eq!(knows_neighbors[0].0, bob);
1506
1507            let works_neighbors = session.get_neighbors_outgoing_by_type(alice, "WORKS_AT");
1508            assert_eq!(works_neighbors.len(), 1);
1509            assert_eq!(works_neighbors[0].0, company);
1510
1511            // No edges of this type
1512            let no_neighbors = session.get_neighbors_outgoing_by_type(alice, "LIKES");
1513            assert!(no_neighbors.is_empty());
1514        }
1515
1516        #[test]
1517        fn test_node_exists() {
1518            use grafeo_common::types::NodeId;
1519
1520            let db = GrafeoDB::new_in_memory();
1521            let session = db.session();
1522
1523            let id = session.create_node(&["Person"]);
1524
1525            assert!(session.node_exists(id));
1526            assert!(!session.node_exists(NodeId::new(9999)));
1527        }
1528
1529        #[test]
1530        fn test_edge_exists() {
1531            use grafeo_common::types::EdgeId;
1532
1533            let db = GrafeoDB::new_in_memory();
1534            let session = db.session();
1535
1536            let alice = session.create_node(&["Person"]);
1537            let bob = session.create_node(&["Person"]);
1538            let edge_id = session.create_edge(alice, bob, "KNOWS");
1539
1540            assert!(session.edge_exists(edge_id));
1541            assert!(!session.edge_exists(EdgeId::new(9999)));
1542        }
1543
1544        #[test]
1545        fn test_get_degree() {
1546            let db = GrafeoDB::new_in_memory();
1547            let session = db.session();
1548
1549            let alice = session.create_node(&["Person"]);
1550            let bob = session.create_node(&["Person"]);
1551            let carol = session.create_node(&["Person"]);
1552
1553            // Alice knows Bob and Carol (2 outgoing)
1554            session.create_edge(alice, bob, "KNOWS");
1555            session.create_edge(alice, carol, "KNOWS");
1556            // Bob knows Alice (1 incoming for Alice)
1557            session.create_edge(bob, alice, "KNOWS");
1558
1559            let (out_degree, in_degree) = session.get_degree(alice);
1560            assert_eq!(out_degree, 2);
1561            assert_eq!(in_degree, 1);
1562
1563            // Node with no edges
1564            let lonely = session.create_node(&["Person"]);
1565            let (out, in_deg) = session.get_degree(lonely);
1566            assert_eq!(out, 0);
1567            assert_eq!(in_deg, 0);
1568        }
1569
1570        #[test]
1571        fn test_get_nodes_batch() {
1572            let db = GrafeoDB::new_in_memory();
1573            let session = db.session();
1574
1575            let alice = session.create_node(&["Person"]);
1576            let bob = session.create_node(&["Person"]);
1577            let carol = session.create_node(&["Person"]);
1578
1579            let nodes = session.get_nodes_batch(&[alice, bob, carol]);
1580            assert_eq!(nodes.len(), 3);
1581            assert!(nodes[0].is_some());
1582            assert!(nodes[1].is_some());
1583            assert!(nodes[2].is_some());
1584
1585            // With non-existent node
1586            use grafeo_common::types::NodeId;
1587            let nodes_with_missing = session.get_nodes_batch(&[alice, NodeId::new(9999), carol]);
1588            assert_eq!(nodes_with_missing.len(), 3);
1589            assert!(nodes_with_missing[0].is_some());
1590            assert!(nodes_with_missing[1].is_none()); // Missing node
1591            assert!(nodes_with_missing[2].is_some());
1592        }
1593
1594        #[test]
1595        fn test_auto_commit_setting() {
1596            let db = GrafeoDB::new_in_memory();
1597            let mut session = db.session();
1598
1599            // Default is auto-commit enabled
1600            assert!(session.auto_commit());
1601
1602            session.set_auto_commit(false);
1603            assert!(!session.auto_commit());
1604
1605            session.set_auto_commit(true);
1606            assert!(session.auto_commit());
1607        }
1608
1609        #[test]
1610        fn test_transaction_double_begin_error() {
1611            let db = GrafeoDB::new_in_memory();
1612            let mut session = db.session();
1613
1614            session.begin_tx().unwrap();
1615            let result = session.begin_tx();
1616
1617            assert!(result.is_err());
1618            // Clean up
1619            session.rollback().unwrap();
1620        }
1621
1622        #[test]
1623        fn test_commit_without_transaction_error() {
1624            let db = GrafeoDB::new_in_memory();
1625            let mut session = db.session();
1626
1627            let result = session.commit();
1628            assert!(result.is_err());
1629        }
1630
1631        #[test]
1632        fn test_rollback_without_transaction_error() {
1633            let db = GrafeoDB::new_in_memory();
1634            let mut session = db.session();
1635
1636            let result = session.rollback();
1637            assert!(result.is_err());
1638        }
1639
1640        #[test]
1641        fn test_create_edge_in_transaction() {
1642            let db = GrafeoDB::new_in_memory();
1643            let mut session = db.session();
1644
1645            // Create nodes outside transaction
1646            let alice = session.create_node(&["Person"]);
1647            let bob = session.create_node(&["Person"]);
1648
1649            // Create edge in transaction
1650            session.begin_tx().unwrap();
1651            let edge_id = session.create_edge(alice, bob, "KNOWS");
1652
1653            // Edge should be visible in the transaction
1654            assert!(session.edge_exists(edge_id));
1655
1656            // Commit
1657            session.commit().unwrap();
1658
1659            // Edge should still be visible
1660            assert!(session.edge_exists(edge_id));
1661        }
1662
1663        #[test]
1664        fn test_neighbors_empty_node() {
1665            let db = GrafeoDB::new_in_memory();
1666            let session = db.session();
1667
1668            let lonely = session.create_node(&["Person"]);
1669
1670            assert!(session.get_neighbors_outgoing(lonely).is_empty());
1671            assert!(session.get_neighbors_incoming(lonely).is_empty());
1672            assert!(
1673                session
1674                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
1675                    .is_empty()
1676            );
1677        }
1678    }
1679}