Skip to main content

grafeo_engine/
session.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TxId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
16#[cfg(feature = "rdf")]
17use grafeo_core::graph::rdf::RdfStore;
18
19use crate::catalog::{Catalog, CatalogConstraintValidator};
20use crate::config::{AdaptiveConfig, GraphModel};
21use crate::database::QueryResult;
22use crate::query::cache::QueryCache;
23use crate::transaction::TransactionManager;
24
25/// Your handle to the database - execute queries and manage transactions.
26///
27/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
28/// tracks its own transaction state, so you can have multiple concurrent
29/// sessions without them interfering.
30pub struct Session {
31    /// The underlying store.
32    store: Arc<LpgStore>,
33    /// Graph store trait object for pluggable storage backends.
34    graph_store: Arc<dyn GraphStoreMut>,
35    /// Schema and metadata catalog shared across sessions.
36    catalog: Arc<Catalog>,
37    /// RDF triple store (if RDF feature is enabled).
38    #[cfg(feature = "rdf")]
39    rdf_store: Arc<RdfStore>,
40    /// Transaction manager.
41    tx_manager: Arc<TransactionManager>,
42    /// Query cache shared across sessions.
43    query_cache: Arc<QueryCache>,
44    /// Current transaction ID (if any). Behind a Mutex so that GQL commands
45    /// (`START TRANSACTION`, `COMMIT`, `ROLLBACK`) can manage transactions
46    /// from within `execute(&self)`.
47    current_tx: parking_lot::Mutex<Option<TxId>>,
48    /// Whether the current transaction is read-only (blocks mutations).
49    read_only_tx: parking_lot::Mutex<bool>,
50    /// Whether the session is in auto-commit mode.
51    auto_commit: bool,
52    /// Adaptive execution configuration.
53    #[allow(dead_code)] // Stored for future adaptive re-optimization during execution
54    adaptive_config: AdaptiveConfig,
55    /// Whether to use factorized execution for multi-hop queries.
56    factorized_execution: bool,
57    /// The graph data model this session operates on.
58    graph_model: GraphModel,
59    /// Maximum time a query may run before being cancelled.
60    query_timeout: Option<Duration>,
61    /// Shared commit counter for triggering auto-GC.
62    commit_counter: Arc<AtomicUsize>,
63    /// GC every N commits (0 = disabled).
64    gc_interval: usize,
65    /// Node count at the start of the current transaction (for PreparedCommit stats).
66    tx_start_node_count: AtomicUsize,
67    /// Edge count at the start of the current transaction (for PreparedCommit stats).
68    tx_start_edge_count: AtomicUsize,
69    /// WAL for logging schema changes.
70    #[cfg(feature = "wal")]
71    wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
72    /// CDC log for change tracking.
73    #[cfg(feature = "cdc")]
74    cdc_log: Arc<crate::cdc::CdcLog>,
75    /// Current graph name (for multi-graph USE GRAPH support). None = default graph.
76    current_graph: parking_lot::Mutex<Option<String>>,
77    /// Session time zone override.
78    time_zone: parking_lot::Mutex<Option<String>>,
79    /// Session-level parameters (SET PARAMETER).
80    session_params:
81        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
82    /// Override epoch for time-travel queries (None = use transaction/current epoch).
83    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
84}
85
86impl Session {
87    /// Creates a new session with adaptive execution configuration.
88    #[allow(dead_code, clippy::too_many_arguments)]
89    pub(crate) fn with_adaptive(
90        store: Arc<LpgStore>,
91        tx_manager: Arc<TransactionManager>,
92        query_cache: Arc<QueryCache>,
93        catalog: Arc<Catalog>,
94        adaptive_config: AdaptiveConfig,
95        factorized_execution: bool,
96        graph_model: GraphModel,
97        query_timeout: Option<Duration>,
98        commit_counter: Arc<AtomicUsize>,
99        gc_interval: usize,
100    ) -> Self {
101        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
102        Self {
103            store,
104            graph_store,
105            catalog,
106            #[cfg(feature = "rdf")]
107            rdf_store: Arc::new(RdfStore::new()),
108            tx_manager,
109            query_cache,
110            current_tx: parking_lot::Mutex::new(None),
111            read_only_tx: parking_lot::Mutex::new(false),
112            auto_commit: true,
113            adaptive_config,
114            factorized_execution,
115            graph_model,
116            query_timeout,
117            commit_counter,
118            gc_interval,
119            tx_start_node_count: AtomicUsize::new(0),
120            tx_start_edge_count: AtomicUsize::new(0),
121            #[cfg(feature = "wal")]
122            wal: None,
123            #[cfg(feature = "cdc")]
124            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
125            current_graph: parking_lot::Mutex::new(None),
126            time_zone: parking_lot::Mutex::new(None),
127            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
128            viewing_epoch_override: parking_lot::Mutex::new(None),
129        }
130    }
131
132    /// Sets the WAL for this session (shared with the database).
133    ///
134    /// This also wraps `graph_store` in a [`WalGraphStore`] so that mutation
135    /// operators (INSERT, DELETE, SET via queries) log to the WAL.
136    #[cfg(feature = "wal")]
137    pub(crate) fn set_wal(&mut self, wal: Arc<grafeo_adapters::storage::wal::LpgWal>) {
138        // Wrap the graph store so query-engine mutations are WAL-logged
139        self.graph_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
140            Arc::clone(&self.store),
141            Arc::clone(&wal),
142        ));
143        self.wal = Some(wal);
144    }
145
146    /// Sets the CDC log for this session (shared with the database).
147    #[cfg(feature = "cdc")]
148    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
149        self.cdc_log = cdc_log;
150    }
151
152    /// Creates a new session with RDF store and adaptive configuration.
153    #[cfg(feature = "rdf")]
154    #[allow(clippy::too_many_arguments)]
155    pub(crate) fn with_rdf_store_and_adaptive(
156        store: Arc<LpgStore>,
157        rdf_store: Arc<RdfStore>,
158        tx_manager: Arc<TransactionManager>,
159        query_cache: Arc<QueryCache>,
160        catalog: Arc<Catalog>,
161        adaptive_config: AdaptiveConfig,
162        factorized_execution: bool,
163        graph_model: GraphModel,
164        query_timeout: Option<Duration>,
165        commit_counter: Arc<AtomicUsize>,
166        gc_interval: usize,
167    ) -> Self {
168        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
169        Self {
170            store,
171            graph_store,
172            catalog,
173            rdf_store,
174            tx_manager,
175            query_cache,
176            current_tx: parking_lot::Mutex::new(None),
177            read_only_tx: parking_lot::Mutex::new(false),
178            auto_commit: true,
179            adaptive_config,
180            factorized_execution,
181            graph_model,
182            query_timeout,
183            commit_counter,
184            gc_interval,
185            tx_start_node_count: AtomicUsize::new(0),
186            tx_start_edge_count: AtomicUsize::new(0),
187            #[cfg(feature = "wal")]
188            wal: None,
189            #[cfg(feature = "cdc")]
190            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
191            current_graph: parking_lot::Mutex::new(None),
192            time_zone: parking_lot::Mutex::new(None),
193            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
194            viewing_epoch_override: parking_lot::Mutex::new(None),
195        }
196    }
197
198    /// Creates a session backed by an external graph store.
199    ///
200    /// The external store handles all data operations. Transaction management
201    /// (begin/commit/rollback) is not supported for external stores.
202    #[allow(clippy::too_many_arguments)]
203    pub(crate) fn with_external_store(
204        store: Arc<dyn GraphStoreMut>,
205        tx_manager: Arc<TransactionManager>,
206        query_cache: Arc<QueryCache>,
207        catalog: Arc<Catalog>,
208        adaptive_config: AdaptiveConfig,
209        factorized_execution: bool,
210        graph_model: GraphModel,
211        query_timeout: Option<Duration>,
212        commit_counter: Arc<AtomicUsize>,
213        gc_interval: usize,
214    ) -> Self {
215        Self {
216            store: Arc::new(LpgStore::new().expect("arena allocation for dummy LpgStore")), // dummy for LpgStore-specific ops
217            graph_store: store,
218            catalog,
219            #[cfg(feature = "rdf")]
220            rdf_store: Arc::new(RdfStore::new()),
221            tx_manager,
222            query_cache,
223            current_tx: parking_lot::Mutex::new(None),
224            read_only_tx: parking_lot::Mutex::new(false),
225            auto_commit: true,
226            adaptive_config,
227            factorized_execution,
228            graph_model,
229            query_timeout,
230            commit_counter,
231            gc_interval,
232            tx_start_node_count: AtomicUsize::new(0),
233            tx_start_edge_count: AtomicUsize::new(0),
234            #[cfg(feature = "wal")]
235            wal: None,
236            #[cfg(feature = "cdc")]
237            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
238            current_graph: parking_lot::Mutex::new(None),
239            time_zone: parking_lot::Mutex::new(None),
240            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
241            viewing_epoch_override: parking_lot::Mutex::new(None),
242        }
243    }
244
245    /// Returns the graph model this session operates on.
246    #[must_use]
247    pub fn graph_model(&self) -> GraphModel {
248        self.graph_model
249    }
250
251    // === Session State Management ===
252
253    /// Sets the current graph for this session (USE GRAPH).
254    pub fn use_graph(&self, name: &str) {
255        *self.current_graph.lock() = Some(name.to_string());
256    }
257
258    /// Returns the current graph name, if any.
259    #[must_use]
260    pub fn current_graph(&self) -> Option<String> {
261        self.current_graph.lock().clone()
262    }
263
264    /// Sets the session time zone.
265    pub fn set_time_zone(&self, tz: &str) {
266        *self.time_zone.lock() = Some(tz.to_string());
267    }
268
269    /// Returns the session time zone, if set.
270    #[must_use]
271    pub fn time_zone(&self) -> Option<String> {
272        self.time_zone.lock().clone()
273    }
274
275    /// Sets a session parameter.
276    pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
277        self.session_params.lock().insert(key.to_string(), value);
278    }
279
280    /// Gets a session parameter by cloning it.
281    #[must_use]
282    pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
283        self.session_params.lock().get(key).cloned()
284    }
285
286    /// Resets all session state to defaults.
287    pub fn reset_session(&self) {
288        *self.current_graph.lock() = None;
289        *self.time_zone.lock() = None;
290        self.session_params.lock().clear();
291        *self.viewing_epoch_override.lock() = None;
292    }
293
294    // --- Time-travel API ---
295
296    /// Sets a viewing epoch override for time-travel queries.
297    ///
298    /// While set, all queries on this session see the database as it existed
299    /// at the given epoch. Use [`clear_viewing_epoch`](Self::clear_viewing_epoch)
300    /// to return to normal behavior.
301    pub fn set_viewing_epoch(&self, epoch: EpochId) {
302        *self.viewing_epoch_override.lock() = Some(epoch);
303    }
304
305    /// Clears the viewing epoch override, returning to normal behavior.
306    pub fn clear_viewing_epoch(&self) {
307        *self.viewing_epoch_override.lock() = None;
308    }
309
310    /// Returns the current viewing epoch override, if any.
311    #[must_use]
312    pub fn viewing_epoch(&self) -> Option<EpochId> {
313        *self.viewing_epoch_override.lock()
314    }
315
316    /// Returns all versions of a node with their creation/deletion epochs.
317    ///
318    /// Properties and labels reflect the current state (not versioned per-epoch).
319    #[must_use]
320    pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
321        self.store.get_node_history(id)
322    }
323
324    /// Returns all versions of an edge with their creation/deletion epochs.
325    ///
326    /// Properties reflect the current state (not versioned per-epoch).
327    #[must_use]
328    pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
329        self.store.get_edge_history(id)
330    }
331
332    /// Checks that the session's graph model supports LPG operations.
333    fn require_lpg(&self, language: &str) -> Result<()> {
334        if self.graph_model == GraphModel::Rdf {
335            return Err(grafeo_common::utils::error::Error::Internal(format!(
336                "This is an RDF database. {language} queries require an LPG database."
337            )));
338        }
339        Ok(())
340    }
341
342    /// Executes a session or transaction command, returning an empty result.
343    #[cfg(feature = "gql")]
344    fn execute_session_command(
345        &self,
346        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
347    ) -> Result<QueryResult> {
348        use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
349        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
350
351        match cmd {
352            SessionCommand::CreateGraph {
353                name,
354                if_not_exists,
355                typed,
356            } => {
357                let created = self
358                    .store
359                    .create_graph(&name)
360                    .map_err(|e| Error::Internal(e.to_string()))?;
361                if !created && !if_not_exists {
362                    return Err(Error::Query(QueryError::new(
363                        QueryErrorKind::Semantic,
364                        format!("Graph '{name}' already exists"),
365                    )));
366                }
367                // Bind to graph type if specified
368                if let Some(type_name) = typed
369                    && let Err(e) = self.catalog.bind_graph_type(&name, type_name.clone())
370                {
371                    return Err(Error::Query(QueryError::new(
372                        QueryErrorKind::Semantic,
373                        e.to_string(),
374                    )));
375                }
376                Ok(QueryResult::empty())
377            }
378            SessionCommand::DropGraph { name, if_exists } => {
379                let dropped = self.store.drop_graph(&name);
380                if !dropped && !if_exists {
381                    return Err(Error::Query(QueryError::new(
382                        QueryErrorKind::Semantic,
383                        format!("Graph '{name}' does not exist"),
384                    )));
385                }
386                Ok(QueryResult::empty())
387            }
388            SessionCommand::UseGraph(name) => {
389                // Verify graph exists (default graph is always valid)
390                if !name.eq_ignore_ascii_case("default") && self.store.graph(&name).is_none() {
391                    return Err(Error::Query(QueryError::new(
392                        QueryErrorKind::Semantic,
393                        format!("Graph '{name}' does not exist"),
394                    )));
395                }
396                self.use_graph(&name);
397                Ok(QueryResult::empty())
398            }
399            SessionCommand::SessionSetGraph(name) => {
400                self.use_graph(&name);
401                Ok(QueryResult::empty())
402            }
403            SessionCommand::SessionSetTimeZone(tz) => {
404                self.set_time_zone(&tz);
405                Ok(QueryResult::empty())
406            }
407            SessionCommand::SessionSetParameter(key, expr) => {
408                if key.eq_ignore_ascii_case("viewing_epoch") {
409                    match Self::eval_integer_literal(&expr) {
410                        Some(n) if n >= 0 => {
411                            self.set_viewing_epoch(EpochId::new(n as u64));
412                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
413                        }
414                        _ => Err(Error::Query(QueryError::new(
415                            QueryErrorKind::Semantic,
416                            "viewing_epoch must be a non-negative integer literal",
417                        ))),
418                    }
419                } else {
420                    // For now, store parameter name with Null value.
421                    // Full expression evaluation would require building and executing a plan.
422                    self.set_parameter(&key, Value::Null);
423                    Ok(QueryResult::empty())
424                }
425            }
426            SessionCommand::SessionReset => {
427                self.reset_session();
428                Ok(QueryResult::empty())
429            }
430            SessionCommand::SessionClose => {
431                self.reset_session();
432                Ok(QueryResult::empty())
433            }
434            SessionCommand::StartTransaction {
435                read_only,
436                isolation_level,
437            } => {
438                let engine_level = isolation_level.map(|l| match l {
439                    TransactionIsolationLevel::ReadCommitted => {
440                        crate::transaction::IsolationLevel::ReadCommitted
441                    }
442                    TransactionIsolationLevel::SnapshotIsolation => {
443                        crate::transaction::IsolationLevel::SnapshotIsolation
444                    }
445                    TransactionIsolationLevel::Serializable => {
446                        crate::transaction::IsolationLevel::Serializable
447                    }
448                });
449                self.begin_tx_inner(read_only, engine_level)?;
450                Ok(QueryResult::status("Transaction started"))
451            }
452            SessionCommand::Commit => {
453                self.commit_inner()?;
454                Ok(QueryResult::status("Transaction committed"))
455            }
456            SessionCommand::Rollback => {
457                self.rollback_inner()?;
458                Ok(QueryResult::status("Transaction rolled back"))
459            }
460        }
461    }
462
463    /// Logs a WAL record for a schema change (no-op if WAL is not enabled).
464    #[cfg(feature = "wal")]
465    fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
466        if let Some(ref wal) = self.wal
467            && let Err(e) = wal.log(record)
468        {
469            tracing::warn!("Failed to log schema change to WAL: {}", e);
470        }
471    }
472
473    /// Executes a schema DDL command, returning a status result.
474    #[cfg(feature = "gql")]
475    fn execute_schema_command(
476        &self,
477        cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
478    ) -> Result<QueryResult> {
479        use crate::catalog::{
480            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
481        };
482        use grafeo_adapters::query::gql::ast::SchemaStatement;
483        #[cfg(feature = "wal")]
484        use grafeo_adapters::storage::wal::WalRecord;
485        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
486
487        /// Logs a WAL record for schema changes. Compiles to nothing without `wal`.
488        macro_rules! wal_log {
489            ($self:expr, $record:expr) => {
490                #[cfg(feature = "wal")]
491                $self.log_schema_wal(&$record);
492            };
493        }
494
495        match cmd {
496            SchemaStatement::CreateNodeType(stmt) => {
497                #[cfg(feature = "wal")]
498                let props_for_wal: Vec<(String, String, bool)> = stmt
499                    .properties
500                    .iter()
501                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
502                    .collect();
503                let def = NodeTypeDefinition {
504                    name: stmt.name.clone(),
505                    properties: stmt
506                        .properties
507                        .iter()
508                        .map(|p| TypedProperty {
509                            name: p.name.clone(),
510                            data_type: PropertyDataType::from_type_name(&p.data_type),
511                            nullable: p.nullable,
512                            default_value: None,
513                        })
514                        .collect(),
515                    constraints: Vec::new(),
516                };
517                let result = if stmt.or_replace {
518                    let _ = self.catalog.drop_node_type(&stmt.name);
519                    self.catalog.register_node_type(def)
520                } else {
521                    self.catalog.register_node_type(def)
522                };
523                match result {
524                    Ok(()) => {
525                        wal_log!(
526                            self,
527                            WalRecord::CreateNodeType {
528                                name: stmt.name.clone(),
529                                properties: props_for_wal,
530                                constraints: Vec::new(),
531                            }
532                        );
533                        Ok(QueryResult::status(format!(
534                            "Created node type '{}'",
535                            stmt.name
536                        )))
537                    }
538                    Err(e) if stmt.if_not_exists => {
539                        let _ = e;
540                        Ok(QueryResult::status("No change"))
541                    }
542                    Err(e) => Err(Error::Query(QueryError::new(
543                        QueryErrorKind::Semantic,
544                        e.to_string(),
545                    ))),
546                }
547            }
548            SchemaStatement::CreateEdgeType(stmt) => {
549                #[cfg(feature = "wal")]
550                let props_for_wal: Vec<(String, String, bool)> = stmt
551                    .properties
552                    .iter()
553                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
554                    .collect();
555                let def = EdgeTypeDefinition {
556                    name: stmt.name.clone(),
557                    properties: stmt
558                        .properties
559                        .iter()
560                        .map(|p| TypedProperty {
561                            name: p.name.clone(),
562                            data_type: PropertyDataType::from_type_name(&p.data_type),
563                            nullable: p.nullable,
564                            default_value: None,
565                        })
566                        .collect(),
567                    constraints: Vec::new(),
568                };
569                let result = if stmt.or_replace {
570                    let _ = self.catalog.drop_edge_type_def(&stmt.name);
571                    self.catalog.register_edge_type_def(def)
572                } else {
573                    self.catalog.register_edge_type_def(def)
574                };
575                match result {
576                    Ok(()) => {
577                        wal_log!(
578                            self,
579                            WalRecord::CreateEdgeType {
580                                name: stmt.name.clone(),
581                                properties: props_for_wal,
582                                constraints: Vec::new(),
583                            }
584                        );
585                        Ok(QueryResult::status(format!(
586                            "Created edge type '{}'",
587                            stmt.name
588                        )))
589                    }
590                    Err(e) if stmt.if_not_exists => {
591                        let _ = e;
592                        Ok(QueryResult::status("No change"))
593                    }
594                    Err(e) => Err(Error::Query(QueryError::new(
595                        QueryErrorKind::Semantic,
596                        e.to_string(),
597                    ))),
598                }
599            }
600            SchemaStatement::CreateVectorIndex(stmt) => {
601                Self::create_vector_index_on_store(
602                    &self.store,
603                    &stmt.node_label,
604                    &stmt.property,
605                    stmt.dimensions,
606                    stmt.metric.as_deref(),
607                )?;
608                wal_log!(
609                    self,
610                    WalRecord::CreateIndex {
611                        name: stmt.name.clone(),
612                        label: stmt.node_label.clone(),
613                        property: stmt.property.clone(),
614                        index_type: "vector".to_string(),
615                    }
616                );
617                Ok(QueryResult::status(format!(
618                    "Created vector index '{}'",
619                    stmt.name
620                )))
621            }
622            SchemaStatement::DropNodeType { name, if_exists } => {
623                match self.catalog.drop_node_type(&name) {
624                    Ok(()) => {
625                        wal_log!(self, WalRecord::DropNodeType { name: name.clone() });
626                        Ok(QueryResult::status(format!("Dropped node type '{name}'")))
627                    }
628                    Err(e) if if_exists => {
629                        let _ = e;
630                        Ok(QueryResult::status("No change"))
631                    }
632                    Err(e) => Err(Error::Query(QueryError::new(
633                        QueryErrorKind::Semantic,
634                        e.to_string(),
635                    ))),
636                }
637            }
638            SchemaStatement::DropEdgeType { name, if_exists } => {
639                match self.catalog.drop_edge_type_def(&name) {
640                    Ok(()) => {
641                        wal_log!(self, WalRecord::DropEdgeType { name: name.clone() });
642                        Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
643                    }
644                    Err(e) if if_exists => {
645                        let _ = e;
646                        Ok(QueryResult::status("No change"))
647                    }
648                    Err(e) => Err(Error::Query(QueryError::new(
649                        QueryErrorKind::Semantic,
650                        e.to_string(),
651                    ))),
652                }
653            }
654            SchemaStatement::CreateIndex(stmt) => {
655                use grafeo_adapters::query::gql::ast::IndexKind;
656                let index_type_str = match stmt.index_kind {
657                    IndexKind::Property => "property",
658                    IndexKind::BTree => "btree",
659                    IndexKind::Text => "text",
660                    IndexKind::Vector => "vector",
661                };
662                match stmt.index_kind {
663                    IndexKind::Property | IndexKind::BTree => {
664                        for prop in &stmt.properties {
665                            self.store.create_property_index(prop);
666                        }
667                    }
668                    IndexKind::Text => {
669                        for prop in &stmt.properties {
670                            Self::create_text_index_on_store(&self.store, &stmt.label, prop)?;
671                        }
672                    }
673                    IndexKind::Vector => {
674                        for prop in &stmt.properties {
675                            Self::create_vector_index_on_store(
676                                &self.store,
677                                &stmt.label,
678                                prop,
679                                stmt.options.dimensions,
680                                stmt.options.metric.as_deref(),
681                            )?;
682                        }
683                    }
684                }
685                #[cfg(feature = "wal")]
686                for prop in &stmt.properties {
687                    wal_log!(
688                        self,
689                        WalRecord::CreateIndex {
690                            name: stmt.name.clone(),
691                            label: stmt.label.clone(),
692                            property: prop.clone(),
693                            index_type: index_type_str.to_string(),
694                        }
695                    );
696                }
697                Ok(QueryResult::status(format!(
698                    "Created {} index '{}'",
699                    index_type_str, stmt.name
700                )))
701            }
702            SchemaStatement::DropIndex { name, if_exists } => {
703                // Try to drop property index by name
704                let dropped = self.store.drop_property_index(&name);
705                if dropped || if_exists {
706                    if dropped {
707                        wal_log!(self, WalRecord::DropIndex { name: name.clone() });
708                    }
709                    Ok(QueryResult::status(if dropped {
710                        format!("Dropped index '{name}'")
711                    } else {
712                        "No change".to_string()
713                    }))
714                } else {
715                    Err(Error::Query(QueryError::new(
716                        QueryErrorKind::Semantic,
717                        format!("Index '{name}' does not exist"),
718                    )))
719                }
720            }
721            SchemaStatement::CreateConstraint(stmt) => {
722                use grafeo_adapters::query::gql::ast::ConstraintKind;
723                let kind_str = match stmt.constraint_kind {
724                    ConstraintKind::Unique => "unique",
725                    ConstraintKind::NodeKey => "node_key",
726                    ConstraintKind::NotNull => "not_null",
727                    ConstraintKind::Exists => "exists",
728                };
729                let constraint_name = stmt
730                    .name
731                    .clone()
732                    .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
733                wal_log!(
734                    self,
735                    WalRecord::CreateConstraint {
736                        name: constraint_name.clone(),
737                        label: stmt.label.clone(),
738                        properties: stmt.properties.clone(),
739                        kind: kind_str.to_string(),
740                    }
741                );
742                Ok(QueryResult::status(format!(
743                    "Created {kind_str} constraint '{constraint_name}'"
744                )))
745            }
746            SchemaStatement::DropConstraint { name, if_exists } => {
747                let _ = if_exists;
748                wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
749                Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
750            }
751            SchemaStatement::CreateGraphType(stmt) => {
752                use crate::catalog::GraphTypeDefinition;
753                use grafeo_adapters::query::gql::ast::InlineElementType;
754
755                // GG04: LIKE clause copies type from existing graph
756                let (mut node_types, mut edge_types, open) =
757                    if let Some(ref like_graph) = stmt.like_graph {
758                        // Infer types from the graph's bound type, or use its existing types
759                        if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
760                            if let Some(existing) = self
761                                .catalog
762                                .schema()
763                                .and_then(|s| s.get_graph_type(&type_name))
764                            {
765                                (
766                                    existing.allowed_node_types.clone(),
767                                    existing.allowed_edge_types.clone(),
768                                    existing.open,
769                                )
770                            } else {
771                                (Vec::new(), Vec::new(), true)
772                            }
773                        } else {
774                            // GG22: Infer from graph data (labels used in graph)
775                            let nt = self.catalog.all_node_type_names();
776                            let et = self.catalog.all_edge_type_names();
777                            if nt.is_empty() && et.is_empty() {
778                                (Vec::new(), Vec::new(), true)
779                            } else {
780                                (nt, et, false)
781                            }
782                        }
783                    } else {
784                        (stmt.node_types.clone(), stmt.edge_types.clone(), stmt.open)
785                    };
786
787                // GG03: Register inline element types and add their names
788                for inline in &stmt.inline_types {
789                    match inline {
790                        InlineElementType::Node {
791                            name, properties, ..
792                        } => {
793                            let def = NodeTypeDefinition {
794                                name: name.clone(),
795                                properties: properties
796                                    .iter()
797                                    .map(|p| TypedProperty {
798                                        name: p.name.clone(),
799                                        data_type: PropertyDataType::from_type_name(&p.data_type),
800                                        nullable: p.nullable,
801                                        default_value: None,
802                                    })
803                                    .collect(),
804                                constraints: Vec::new(),
805                            };
806                            // Register or replace so inline defs override existing
807                            self.catalog.register_or_replace_node_type(def);
808                            #[cfg(feature = "wal")]
809                            {
810                                let props_for_wal: Vec<(String, String, bool)> = properties
811                                    .iter()
812                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
813                                    .collect();
814                                self.log_schema_wal(&WalRecord::CreateNodeType {
815                                    name: name.clone(),
816                                    properties: props_for_wal,
817                                    constraints: Vec::new(),
818                                });
819                            }
820                            if !node_types.contains(name) {
821                                node_types.push(name.clone());
822                            }
823                        }
824                        InlineElementType::Edge {
825                            name, properties, ..
826                        } => {
827                            let def = EdgeTypeDefinition {
828                                name: name.clone(),
829                                properties: properties
830                                    .iter()
831                                    .map(|p| TypedProperty {
832                                        name: p.name.clone(),
833                                        data_type: PropertyDataType::from_type_name(&p.data_type),
834                                        nullable: p.nullable,
835                                        default_value: None,
836                                    })
837                                    .collect(),
838                                constraints: Vec::new(),
839                            };
840                            self.catalog.register_or_replace_edge_type_def(def);
841                            #[cfg(feature = "wal")]
842                            {
843                                let props_for_wal: Vec<(String, String, bool)> = properties
844                                    .iter()
845                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
846                                    .collect();
847                                self.log_schema_wal(&WalRecord::CreateEdgeType {
848                                    name: name.clone(),
849                                    properties: props_for_wal,
850                                    constraints: Vec::new(),
851                                });
852                            }
853                            if !edge_types.contains(name) {
854                                edge_types.push(name.clone());
855                            }
856                        }
857                    }
858                }
859
860                let def = GraphTypeDefinition {
861                    name: stmt.name.clone(),
862                    allowed_node_types: node_types.clone(),
863                    allowed_edge_types: edge_types.clone(),
864                    open,
865                };
866                let result = if stmt.or_replace {
867                    // Drop existing first, ignore error if not found
868                    let _ = self.catalog.drop_graph_type(&stmt.name);
869                    self.catalog.register_graph_type(def)
870                } else {
871                    self.catalog.register_graph_type(def)
872                };
873                match result {
874                    Ok(()) => {
875                        wal_log!(
876                            self,
877                            WalRecord::CreateGraphType {
878                                name: stmt.name.clone(),
879                                node_types,
880                                edge_types,
881                                open,
882                            }
883                        );
884                        Ok(QueryResult::status(format!(
885                            "Created graph type '{}'",
886                            stmt.name
887                        )))
888                    }
889                    Err(e) if stmt.if_not_exists => {
890                        let _ = e;
891                        Ok(QueryResult::status("No change"))
892                    }
893                    Err(e) => Err(Error::Query(QueryError::new(
894                        QueryErrorKind::Semantic,
895                        e.to_string(),
896                    ))),
897                }
898            }
899            SchemaStatement::DropGraphType { name, if_exists } => {
900                match self.catalog.drop_graph_type(&name) {
901                    Ok(()) => {
902                        wal_log!(self, WalRecord::DropGraphType { name: name.clone() });
903                        Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
904                    }
905                    Err(e) if if_exists => {
906                        let _ = e;
907                        Ok(QueryResult::status("No change"))
908                    }
909                    Err(e) => Err(Error::Query(QueryError::new(
910                        QueryErrorKind::Semantic,
911                        e.to_string(),
912                    ))),
913                }
914            }
915            SchemaStatement::CreateSchema {
916                name,
917                if_not_exists,
918            } => match self.catalog.register_schema_namespace(name.clone()) {
919                Ok(()) => {
920                    wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
921                    Ok(QueryResult::status(format!("Created schema '{name}'")))
922                }
923                Err(e) if if_not_exists => {
924                    let _ = e;
925                    Ok(QueryResult::status("No change"))
926                }
927                Err(e) => Err(Error::Query(QueryError::new(
928                    QueryErrorKind::Semantic,
929                    e.to_string(),
930                ))),
931            },
932            SchemaStatement::DropSchema { name, if_exists } => {
933                match self.catalog.drop_schema_namespace(&name) {
934                    Ok(()) => {
935                        wal_log!(self, WalRecord::DropSchema { name: name.clone() });
936                        Ok(QueryResult::status(format!("Dropped schema '{name}'")))
937                    }
938                    Err(e) if if_exists => {
939                        let _ = e;
940                        Ok(QueryResult::status("No change"))
941                    }
942                    Err(e) => Err(Error::Query(QueryError::new(
943                        QueryErrorKind::Semantic,
944                        e.to_string(),
945                    ))),
946                }
947            }
948            SchemaStatement::AlterNodeType(stmt) => {
949                use grafeo_adapters::query::gql::ast::TypeAlteration;
950                let mut wal_alts = Vec::new();
951                for alt in &stmt.alterations {
952                    match alt {
953                        TypeAlteration::AddProperty(prop) => {
954                            let typed = TypedProperty {
955                                name: prop.name.clone(),
956                                data_type: PropertyDataType::from_type_name(&prop.data_type),
957                                nullable: prop.nullable,
958                                default_value: None,
959                            };
960                            self.catalog
961                                .alter_node_type_add_property(&stmt.name, typed)
962                                .map_err(|e| {
963                                    Error::Query(QueryError::new(
964                                        QueryErrorKind::Semantic,
965                                        e.to_string(),
966                                    ))
967                                })?;
968                            wal_alts.push((
969                                "add".to_string(),
970                                prop.name.clone(),
971                                prop.data_type.clone(),
972                                prop.nullable,
973                            ));
974                        }
975                        TypeAlteration::DropProperty(name) => {
976                            self.catalog
977                                .alter_node_type_drop_property(&stmt.name, name)
978                                .map_err(|e| {
979                                    Error::Query(QueryError::new(
980                                        QueryErrorKind::Semantic,
981                                        e.to_string(),
982                                    ))
983                                })?;
984                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
985                        }
986                    }
987                }
988                wal_log!(
989                    self,
990                    WalRecord::AlterNodeType {
991                        name: stmt.name.clone(),
992                        alterations: wal_alts,
993                    }
994                );
995                Ok(QueryResult::status(format!(
996                    "Altered node type '{}'",
997                    stmt.name
998                )))
999            }
1000            SchemaStatement::AlterEdgeType(stmt) => {
1001                use grafeo_adapters::query::gql::ast::TypeAlteration;
1002                let mut wal_alts = Vec::new();
1003                for alt in &stmt.alterations {
1004                    match alt {
1005                        TypeAlteration::AddProperty(prop) => {
1006                            let typed = TypedProperty {
1007                                name: prop.name.clone(),
1008                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1009                                nullable: prop.nullable,
1010                                default_value: None,
1011                            };
1012                            self.catalog
1013                                .alter_edge_type_add_property(&stmt.name, typed)
1014                                .map_err(|e| {
1015                                    Error::Query(QueryError::new(
1016                                        QueryErrorKind::Semantic,
1017                                        e.to_string(),
1018                                    ))
1019                                })?;
1020                            wal_alts.push((
1021                                "add".to_string(),
1022                                prop.name.clone(),
1023                                prop.data_type.clone(),
1024                                prop.nullable,
1025                            ));
1026                        }
1027                        TypeAlteration::DropProperty(name) => {
1028                            self.catalog
1029                                .alter_edge_type_drop_property(&stmt.name, name)
1030                                .map_err(|e| {
1031                                    Error::Query(QueryError::new(
1032                                        QueryErrorKind::Semantic,
1033                                        e.to_string(),
1034                                    ))
1035                                })?;
1036                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1037                        }
1038                    }
1039                }
1040                wal_log!(
1041                    self,
1042                    WalRecord::AlterEdgeType {
1043                        name: stmt.name.clone(),
1044                        alterations: wal_alts,
1045                    }
1046                );
1047                Ok(QueryResult::status(format!(
1048                    "Altered edge type '{}'",
1049                    stmt.name
1050                )))
1051            }
1052            SchemaStatement::AlterGraphType(stmt) => {
1053                use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1054                let mut wal_alts = Vec::new();
1055                for alt in &stmt.alterations {
1056                    match alt {
1057                        GraphTypeAlteration::AddNodeType(name) => {
1058                            self.catalog
1059                                .alter_graph_type_add_node_type(&stmt.name, name.clone())
1060                                .map_err(|e| {
1061                                    Error::Query(QueryError::new(
1062                                        QueryErrorKind::Semantic,
1063                                        e.to_string(),
1064                                    ))
1065                                })?;
1066                            wal_alts.push(("add_node_type".to_string(), name.clone()));
1067                        }
1068                        GraphTypeAlteration::DropNodeType(name) => {
1069                            self.catalog
1070                                .alter_graph_type_drop_node_type(&stmt.name, name)
1071                                .map_err(|e| {
1072                                    Error::Query(QueryError::new(
1073                                        QueryErrorKind::Semantic,
1074                                        e.to_string(),
1075                                    ))
1076                                })?;
1077                            wal_alts.push(("drop_node_type".to_string(), name.clone()));
1078                        }
1079                        GraphTypeAlteration::AddEdgeType(name) => {
1080                            self.catalog
1081                                .alter_graph_type_add_edge_type(&stmt.name, name.clone())
1082                                .map_err(|e| {
1083                                    Error::Query(QueryError::new(
1084                                        QueryErrorKind::Semantic,
1085                                        e.to_string(),
1086                                    ))
1087                                })?;
1088                            wal_alts.push(("add_edge_type".to_string(), name.clone()));
1089                        }
1090                        GraphTypeAlteration::DropEdgeType(name) => {
1091                            self.catalog
1092                                .alter_graph_type_drop_edge_type(&stmt.name, name)
1093                                .map_err(|e| {
1094                                    Error::Query(QueryError::new(
1095                                        QueryErrorKind::Semantic,
1096                                        e.to_string(),
1097                                    ))
1098                                })?;
1099                            wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1100                        }
1101                    }
1102                }
1103                wal_log!(
1104                    self,
1105                    WalRecord::AlterGraphType {
1106                        name: stmt.name.clone(),
1107                        alterations: wal_alts,
1108                    }
1109                );
1110                Ok(QueryResult::status(format!(
1111                    "Altered graph type '{}'",
1112                    stmt.name
1113                )))
1114            }
1115            SchemaStatement::CreateProcedure(stmt) => {
1116                use crate::catalog::ProcedureDefinition;
1117
1118                let def = ProcedureDefinition {
1119                    name: stmt.name.clone(),
1120                    params: stmt
1121                        .params
1122                        .iter()
1123                        .map(|p| (p.name.clone(), p.param_type.clone()))
1124                        .collect(),
1125                    returns: stmt
1126                        .returns
1127                        .iter()
1128                        .map(|r| (r.name.clone(), r.return_type.clone()))
1129                        .collect(),
1130                    body: stmt.body.clone(),
1131                };
1132
1133                if stmt.or_replace {
1134                    self.catalog.replace_procedure(def).map_err(|e| {
1135                        Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1136                    })?;
1137                } else {
1138                    match self.catalog.register_procedure(def) {
1139                        Ok(()) => {}
1140                        Err(_) if stmt.if_not_exists => {
1141                            return Ok(QueryResult::empty());
1142                        }
1143                        Err(e) => {
1144                            return Err(Error::Query(QueryError::new(
1145                                QueryErrorKind::Semantic,
1146                                e.to_string(),
1147                            )));
1148                        }
1149                    }
1150                }
1151
1152                wal_log!(
1153                    self,
1154                    WalRecord::CreateProcedure {
1155                        name: stmt.name.clone(),
1156                        params: stmt
1157                            .params
1158                            .iter()
1159                            .map(|p| (p.name.clone(), p.param_type.clone()))
1160                            .collect(),
1161                        returns: stmt
1162                            .returns
1163                            .iter()
1164                            .map(|r| (r.name.clone(), r.return_type.clone()))
1165                            .collect(),
1166                        body: stmt.body,
1167                    }
1168                );
1169                Ok(QueryResult::status(format!(
1170                    "Created procedure '{}'",
1171                    stmt.name
1172                )))
1173            }
1174            SchemaStatement::DropProcedure { name, if_exists } => {
1175                match self.catalog.drop_procedure(&name) {
1176                    Ok(()) => {}
1177                    Err(_) if if_exists => {
1178                        return Ok(QueryResult::empty());
1179                    }
1180                    Err(e) => {
1181                        return Err(Error::Query(QueryError::new(
1182                            QueryErrorKind::Semantic,
1183                            e.to_string(),
1184                        )));
1185                    }
1186                }
1187                wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1188                Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1189            }
1190        }
1191    }
1192
1193    /// Creates a vector index on the store by scanning existing nodes.
1194    #[cfg(all(feature = "gql", feature = "vector-index"))]
1195    fn create_vector_index_on_store(
1196        store: &LpgStore,
1197        label: &str,
1198        property: &str,
1199        dimensions: Option<usize>,
1200        metric: Option<&str>,
1201    ) -> Result<()> {
1202        use grafeo_common::types::{PropertyKey, Value};
1203        use grafeo_common::utils::error::Error;
1204        use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};
1205
1206        let metric = match metric {
1207            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1208                Error::Internal(format!(
1209                    "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
1210                ))
1211            })?,
1212            None => DistanceMetric::Cosine,
1213        };
1214
1215        let prop_key = PropertyKey::new(property);
1216        let mut found_dims: Option<usize> = dimensions;
1217        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1218
1219        for node in store.nodes_with_label(label) {
1220            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1221                if let Some(expected) = found_dims {
1222                    if v.len() != expected {
1223                        return Err(Error::Internal(format!(
1224                            "Vector dimension mismatch: expected {expected}, found {} on node {}",
1225                            v.len(),
1226                            node.id.0
1227                        )));
1228                    }
1229                } else {
1230                    found_dims = Some(v.len());
1231                }
1232                vectors.push((node.id, v.to_vec()));
1233            }
1234        }
1235
1236        let Some(dims) = found_dims else {
1237            return Err(Error::Internal(format!(
1238                "No vector properties found on :{label}({property}) and no dimensions specified"
1239            )));
1240        };
1241
1242        let config = HnswConfig::new(dims, metric);
1243        let index = HnswIndex::with_capacity(config, vectors.len());
1244        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
1245        for (node_id, vec) in &vectors {
1246            index.insert(*node_id, vec, &accessor);
1247        }
1248
1249        store.add_vector_index(label, property, Arc::new(index));
1250        Ok(())
1251    }
1252
1253    /// Stub for when vector-index feature is not enabled.
1254    #[cfg(all(feature = "gql", not(feature = "vector-index")))]
1255    fn create_vector_index_on_store(
1256        _store: &LpgStore,
1257        _label: &str,
1258        _property: &str,
1259        _dimensions: Option<usize>,
1260        _metric: Option<&str>,
1261    ) -> Result<()> {
1262        Err(grafeo_common::utils::error::Error::Internal(
1263            "Vector index support requires the 'vector-index' feature".to_string(),
1264        ))
1265    }
1266
1267    /// Creates a text index on the store by scanning existing nodes.
1268    #[cfg(all(feature = "gql", feature = "text-index"))]
1269    fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
1270        use grafeo_common::types::{PropertyKey, Value};
1271        use grafeo_core::index::text::{BM25Config, InvertedIndex};
1272
1273        let mut index = InvertedIndex::new(BM25Config::default());
1274        let prop_key = PropertyKey::new(property);
1275
1276        let nodes = store.nodes_by_label(label);
1277        for node_id in nodes {
1278            if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
1279                index.insert(node_id, text.as_str());
1280            }
1281        }
1282
1283        store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
1284        Ok(())
1285    }
1286
1287    /// Stub for when text-index feature is not enabled.
1288    #[cfg(all(feature = "gql", not(feature = "text-index")))]
1289    fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
1290        Err(grafeo_common::utils::error::Error::Internal(
1291            "Text index support requires the 'text-index' feature".to_string(),
1292        ))
1293    }
1294
1295    /// Executes a GQL query.
1296    ///
1297    /// # Errors
1298    ///
1299    /// Returns an error if the query fails to parse or execute.
1300    ///
1301    /// # Examples
1302    ///
1303    /// ```no_run
1304    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1305    /// use grafeo_engine::GrafeoDB;
1306    ///
1307    /// let db = GrafeoDB::new_in_memory();
1308    /// let session = db.session();
1309    ///
1310    /// // Create a node
1311    /// session.execute("INSERT (:Person {name: 'Alix', age: 30})")?;
1312    ///
1313    /// // Query nodes
1314    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
1315    /// for row in &result.rows {
1316    ///     println!("{:?}", row);
1317    /// }
1318    /// # Ok(())
1319    /// # }
1320    /// ```
1321    #[cfg(feature = "gql")]
1322    pub fn execute(&self, query: &str) -> Result<QueryResult> {
1323        self.require_lpg("GQL")?;
1324
1325        use crate::query::{
1326            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
1327            processor::QueryLanguage, translators::gql,
1328        };
1329
1330        #[cfg(not(target_arch = "wasm32"))]
1331        let start_time = std::time::Instant::now();
1332
1333        // Parse and translate, checking for session/schema commands first
1334        let translation = gql::translate_full(query)?;
1335        let logical_plan = match translation {
1336            gql::GqlTranslationResult::SessionCommand(cmd) => {
1337                return self.execute_session_command(cmd);
1338            }
1339            gql::GqlTranslationResult::SchemaCommand(cmd) => {
1340                // All DDL is a write operation
1341                if *self.read_only_tx.lock() {
1342                    return Err(grafeo_common::utils::error::Error::Transaction(
1343                        grafeo_common::utils::error::TransactionError::ReadOnly,
1344                    ));
1345                }
1346                return self.execute_schema_command(cmd);
1347            }
1348            gql::GqlTranslationResult::Plan(plan) => {
1349                // Block mutations in read-only transactions
1350                if *self.read_only_tx.lock() && plan.root.has_mutations() {
1351                    return Err(grafeo_common::utils::error::Error::Transaction(
1352                        grafeo_common::utils::error::TransactionError::ReadOnly,
1353                    ));
1354                }
1355                plan
1356            }
1357        };
1358
1359        // Create cache key for this query
1360        let cache_key = CacheKey::new(query, QueryLanguage::Gql);
1361
1362        // Try to get cached optimized plan, or use the plan we just translated
1363        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
1364            cached_plan
1365        } else {
1366            // Semantic validation
1367            let mut binder = Binder::new();
1368            let _binding_context = binder.bind(&logical_plan)?;
1369
1370            // Optimize the plan
1371            let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1372            let plan = optimizer.optimize(logical_plan)?;
1373
1374            // Cache the optimized plan for future use
1375            self.query_cache.put_optimized(cache_key, plan.clone());
1376
1377            plan
1378        };
1379
1380        // EXPLAIN: annotate pushdown hints and return the plan tree
1381        if optimized_plan.explain {
1382            use crate::query::processor::{annotate_pushdown_hints, explain_result};
1383            let mut plan = optimized_plan;
1384            annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
1385            return Ok(explain_result(&plan));
1386        }
1387
1388        let has_mutations = optimized_plan.root.has_mutations();
1389
1390        self.with_auto_commit(has_mutations, || {
1391            // Get transaction context for MVCC visibility
1392            let (viewing_epoch, tx_id) = self.get_transaction_context();
1393
1394            // Convert to physical plan with transaction context
1395            // (Physical planning cannot be cached as it depends on transaction state)
1396            let planner = self.create_planner(viewing_epoch, tx_id);
1397            let mut physical_plan = planner.plan(&optimized_plan)?;
1398
1399            // Execute the plan
1400            let executor = Executor::with_columns(physical_plan.columns.clone())
1401                .with_deadline(self.query_deadline());
1402            let mut result = executor.execute(physical_plan.operator.as_mut())?;
1403
1404            // Add execution metrics
1405            let rows_scanned = result.rows.len() as u64;
1406            #[cfg(not(target_arch = "wasm32"))]
1407            {
1408                let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
1409                result.execution_time_ms = Some(elapsed_ms);
1410            }
1411            result.rows_scanned = Some(rows_scanned);
1412
1413            Ok(result)
1414        })
1415    }
1416
1417    /// Executes a GQL query with visibility at the specified epoch.
1418    ///
1419    /// This enables time-travel queries: the query sees the database
1420    /// as it existed at the given epoch.
1421    ///
1422    /// # Errors
1423    ///
1424    /// Returns an error if parsing or execution fails.
1425    #[cfg(feature = "gql")]
1426    pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
1427        let previous = self.viewing_epoch_override.lock().replace(epoch);
1428        let result = self.execute(query);
1429        *self.viewing_epoch_override.lock() = previous;
1430        result
1431    }
1432
1433    /// Executes a GQL query with parameters.
1434    ///
1435    /// # Errors
1436    ///
1437    /// Returns an error if the query fails to parse or execute.
1438    #[cfg(feature = "gql")]
1439    pub fn execute_with_params(
1440        &self,
1441        query: &str,
1442        params: std::collections::HashMap<String, Value>,
1443    ) -> Result<QueryResult> {
1444        self.require_lpg("GQL")?;
1445
1446        use crate::query::processor::{QueryLanguage, QueryProcessor};
1447
1448        let has_mutations = Self::query_looks_like_mutation(query);
1449
1450        self.with_auto_commit(has_mutations, || {
1451            // Get transaction context for MVCC visibility
1452            let (viewing_epoch, tx_id) = self.get_transaction_context();
1453
1454            // Create processor with transaction context
1455            let processor = QueryProcessor::for_graph_store_with_tx(
1456                Arc::clone(&self.graph_store),
1457                Arc::clone(&self.tx_manager),
1458            );
1459
1460            // Apply transaction context if in a transaction
1461            let processor = if let Some(tx_id) = tx_id {
1462                processor.with_tx_context(viewing_epoch, tx_id)
1463            } else {
1464                processor
1465            };
1466
1467            processor.process(query, QueryLanguage::Gql, Some(&params))
1468        })
1469    }
1470
1471    /// Executes a GQL query with parameters.
1472    ///
1473    /// # Errors
1474    ///
1475    /// Returns an error if no query language is enabled.
1476    #[cfg(not(any(feature = "gql", feature = "cypher")))]
1477    pub fn execute_with_params(
1478        &self,
1479        _query: &str,
1480        _params: std::collections::HashMap<String, Value>,
1481    ) -> Result<QueryResult> {
1482        Err(grafeo_common::utils::error::Error::Internal(
1483            "No query language enabled".to_string(),
1484        ))
1485    }
1486
1487    /// Executes a GQL query.
1488    ///
1489    /// # Errors
1490    ///
1491    /// Returns an error if no query language is enabled.
1492    #[cfg(not(any(feature = "gql", feature = "cypher")))]
1493    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
1494        Err(grafeo_common::utils::error::Error::Internal(
1495            "No query language enabled".to_string(),
1496        ))
1497    }
1498
1499    /// Executes a Cypher query.
1500    ///
1501    /// # Errors
1502    ///
1503    /// Returns an error if the query fails to parse or execute.
1504    #[cfg(feature = "cypher")]
1505    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
1506        use crate::query::{
1507            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
1508            processor::QueryLanguage, translators::cypher,
1509        };
1510
1511        // Create cache key for this query
1512        let cache_key = CacheKey::new(query, QueryLanguage::Cypher);
1513
1514        // Try to get cached optimized plan
1515        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
1516            cached_plan
1517        } else {
1518            // Parse and translate the query to a logical plan
1519            let logical_plan = cypher::translate(query)?;
1520
1521            // Semantic validation
1522            let mut binder = Binder::new();
1523            let _binding_context = binder.bind(&logical_plan)?;
1524
1525            // Optimize the plan
1526            let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1527            let plan = optimizer.optimize(logical_plan)?;
1528
1529            // Cache the optimized plan
1530            self.query_cache.put_optimized(cache_key, plan.clone());
1531
1532            plan
1533        };
1534
1535        let has_mutations = optimized_plan.root.has_mutations();
1536
1537        self.with_auto_commit(has_mutations, || {
1538            // Get transaction context for MVCC visibility
1539            let (viewing_epoch, tx_id) = self.get_transaction_context();
1540
1541            // Convert to physical plan with transaction context
1542            let planner = self.create_planner(viewing_epoch, tx_id);
1543            let mut physical_plan = planner.plan(&optimized_plan)?;
1544
1545            // Execute the plan
1546            let executor = Executor::with_columns(physical_plan.columns.clone())
1547                .with_deadline(self.query_deadline());
1548            executor.execute(physical_plan.operator.as_mut())
1549        })
1550    }
1551
1552    /// Executes a Gremlin query.
1553    ///
1554    /// # Errors
1555    ///
1556    /// Returns an error if the query fails to parse or execute.
1557    ///
1558    /// # Examples
1559    ///
1560    /// ```no_run
1561    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1562    /// use grafeo_engine::GrafeoDB;
1563    ///
1564    /// let db = GrafeoDB::new_in_memory();
1565    /// let session = db.session();
1566    ///
1567    /// // Create some nodes first
1568    /// session.create_node(&["Person"]);
1569    ///
1570    /// // Query using Gremlin
1571    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
1572    /// # Ok(())
1573    /// # }
1574    /// ```
1575    #[cfg(feature = "gremlin")]
1576    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
1577        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};
1578
1579        // Parse and translate the query to a logical plan
1580        let logical_plan = gremlin::translate(query)?;
1581
1582        // Semantic validation
1583        let mut binder = Binder::new();
1584        let _binding_context = binder.bind(&logical_plan)?;
1585
1586        // Optimize the plan
1587        let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1588        let optimized_plan = optimizer.optimize(logical_plan)?;
1589
1590        let has_mutations = optimized_plan.root.has_mutations();
1591
1592        self.with_auto_commit(has_mutations, || {
1593            // Get transaction context for MVCC visibility
1594            let (viewing_epoch, tx_id) = self.get_transaction_context();
1595
1596            // Convert to physical plan with transaction context
1597            let planner = self.create_planner(viewing_epoch, tx_id);
1598            let mut physical_plan = planner.plan(&optimized_plan)?;
1599
1600            // Execute the plan
1601            let executor = Executor::with_columns(physical_plan.columns.clone())
1602                .with_deadline(self.query_deadline());
1603            executor.execute(physical_plan.operator.as_mut())
1604        })
1605    }
1606
1607    /// Executes a Gremlin query with parameters.
1608    ///
1609    /// # Errors
1610    ///
1611    /// Returns an error if the query fails to parse or execute.
1612    #[cfg(feature = "gremlin")]
1613    pub fn execute_gremlin_with_params(
1614        &self,
1615        query: &str,
1616        params: std::collections::HashMap<String, Value>,
1617    ) -> Result<QueryResult> {
1618        use crate::query::processor::{QueryLanguage, QueryProcessor};
1619
1620        let has_mutations = Self::query_looks_like_mutation(query);
1621
1622        self.with_auto_commit(has_mutations, || {
1623            // Get transaction context for MVCC visibility
1624            let (viewing_epoch, tx_id) = self.get_transaction_context();
1625
1626            // Create processor with transaction context
1627            let processor = QueryProcessor::for_graph_store_with_tx(
1628                Arc::clone(&self.graph_store),
1629                Arc::clone(&self.tx_manager),
1630            );
1631
1632            // Apply transaction context if in a transaction
1633            let processor = if let Some(tx_id) = tx_id {
1634                processor.with_tx_context(viewing_epoch, tx_id)
1635            } else {
1636                processor
1637            };
1638
1639            processor.process(query, QueryLanguage::Gremlin, Some(&params))
1640        })
1641    }
1642
1643    /// Executes a GraphQL query against the LPG store.
1644    ///
1645    /// # Errors
1646    ///
1647    /// Returns an error if the query fails to parse or execute.
1648    ///
1649    /// # Examples
1650    ///
1651    /// ```no_run
1652    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1653    /// use grafeo_engine::GrafeoDB;
1654    ///
1655    /// let db = GrafeoDB::new_in_memory();
1656    /// let session = db.session();
1657    ///
1658    /// // Create some nodes first
1659    /// session.create_node(&["User"]);
1660    ///
1661    /// // Query using GraphQL
1662    /// let result = session.execute_graphql("query { user { id name } }")?;
1663    /// # Ok(())
1664    /// # }
1665    /// ```
1666    #[cfg(feature = "graphql")]
1667    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
1668        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::graphql};
1669
1670        // Parse and translate the query to a logical plan
1671        let logical_plan = graphql::translate(query)?;
1672
1673        // Semantic validation
1674        let mut binder = Binder::new();
1675        let _binding_context = binder.bind(&logical_plan)?;
1676
1677        // Optimize the plan
1678        let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1679        let optimized_plan = optimizer.optimize(logical_plan)?;
1680
1681        let has_mutations = optimized_plan.root.has_mutations();
1682
1683        self.with_auto_commit(has_mutations, || {
1684            // Get transaction context for MVCC visibility
1685            let (viewing_epoch, tx_id) = self.get_transaction_context();
1686
1687            // Convert to physical plan with transaction context
1688            let planner = self.create_planner(viewing_epoch, tx_id);
1689            let mut physical_plan = planner.plan(&optimized_plan)?;
1690
1691            // Execute the plan
1692            let executor = Executor::with_columns(physical_plan.columns.clone())
1693                .with_deadline(self.query_deadline());
1694            executor.execute(physical_plan.operator.as_mut())
1695        })
1696    }
1697
1698    /// Executes a GraphQL query with parameters.
1699    ///
1700    /// # Errors
1701    ///
1702    /// Returns an error if the query fails to parse or execute.
1703    #[cfg(feature = "graphql")]
1704    pub fn execute_graphql_with_params(
1705        &self,
1706        query: &str,
1707        params: std::collections::HashMap<String, Value>,
1708    ) -> Result<QueryResult> {
1709        use crate::query::processor::{QueryLanguage, QueryProcessor};
1710
1711        let has_mutations = Self::query_looks_like_mutation(query);
1712
1713        self.with_auto_commit(has_mutations, || {
1714            // Get transaction context for MVCC visibility
1715            let (viewing_epoch, tx_id) = self.get_transaction_context();
1716
1717            // Create processor with transaction context
1718            let processor = QueryProcessor::for_graph_store_with_tx(
1719                Arc::clone(&self.graph_store),
1720                Arc::clone(&self.tx_manager),
1721            );
1722
1723            // Apply transaction context if in a transaction
1724            let processor = if let Some(tx_id) = tx_id {
1725                processor.with_tx_context(viewing_epoch, tx_id)
1726            } else {
1727                processor
1728            };
1729
1730            processor.process(query, QueryLanguage::GraphQL, Some(&params))
1731        })
1732    }
1733
1734    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
1735    ///
1736    /// # Errors
1737    ///
1738    /// Returns an error if the query fails to parse or execute.
1739    ///
1740    /// # Examples
1741    ///
1742    /// ```no_run
1743    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1744    /// use grafeo_engine::GrafeoDB;
1745    ///
1746    /// let db = GrafeoDB::new_in_memory();
1747    /// let session = db.session();
1748    ///
1749    /// let result = session.execute_sql(
1750    ///     "SELECT * FROM GRAPH_TABLE (
1751    ///         MATCH (n:Person)
1752    ///         COLUMNS (n.name AS name)
1753    ///     )"
1754    /// )?;
1755    /// # Ok(())
1756    /// # }
1757    /// ```
1758    #[cfg(feature = "sql-pgq")]
1759    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
1760        use crate::query::{
1761            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
1762            processor::QueryLanguage, translators::sql_pgq,
1763        };
1764
1765        // Parse and translate (always needed to check for DDL)
1766        let logical_plan = sql_pgq::translate(query)?;
1767
1768        // Handle DDL statements directly (they don't go through the query pipeline)
1769        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
1770            return Ok(QueryResult {
1771                columns: vec!["status".into()],
1772                column_types: vec![grafeo_common::types::LogicalType::String],
1773                rows: vec![vec![Value::from(format!(
1774                    "Property graph '{}' created ({} node tables, {} edge tables)",
1775                    cpg.name,
1776                    cpg.node_tables.len(),
1777                    cpg.edge_tables.len()
1778                ))]],
1779                execution_time_ms: None,
1780                rows_scanned: None,
1781                status_message: None,
1782            });
1783        }
1784
1785        // Create cache key for query plans
1786        let cache_key = CacheKey::new(query, QueryLanguage::SqlPgq);
1787
1788        // Try to get cached optimized plan
1789        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
1790            cached_plan
1791        } else {
1792            // Semantic validation
1793            let mut binder = Binder::new();
1794            let _binding_context = binder.bind(&logical_plan)?;
1795
1796            // Optimize the plan
1797            let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1798            let plan = optimizer.optimize(logical_plan)?;
1799
1800            // Cache the optimized plan
1801            self.query_cache.put_optimized(cache_key, plan.clone());
1802
1803            plan
1804        };
1805
1806        let has_mutations = optimized_plan.root.has_mutations();
1807
1808        self.with_auto_commit(has_mutations, || {
1809            // Get transaction context for MVCC visibility
1810            let (viewing_epoch, tx_id) = self.get_transaction_context();
1811
1812            // Convert to physical plan with transaction context
1813            let planner = self.create_planner(viewing_epoch, tx_id);
1814            let mut physical_plan = planner.plan(&optimized_plan)?;
1815
1816            // Execute the plan
1817            let executor = Executor::with_columns(physical_plan.columns.clone())
1818                .with_deadline(self.query_deadline());
1819            executor.execute(physical_plan.operator.as_mut())
1820        })
1821    }
1822
1823    /// Executes a SQL/PGQ query with parameters.
1824    ///
1825    /// # Errors
1826    ///
1827    /// Returns an error if the query fails to parse or execute.
1828    #[cfg(feature = "sql-pgq")]
1829    pub fn execute_sql_with_params(
1830        &self,
1831        query: &str,
1832        params: std::collections::HashMap<String, Value>,
1833    ) -> Result<QueryResult> {
1834        use crate::query::processor::{QueryLanguage, QueryProcessor};
1835
1836        let has_mutations = Self::query_looks_like_mutation(query);
1837
1838        self.with_auto_commit(has_mutations, || {
1839            // Get transaction context for MVCC visibility
1840            let (viewing_epoch, tx_id) = self.get_transaction_context();
1841
1842            // Create processor with transaction context
1843            let processor = QueryProcessor::for_graph_store_with_tx(
1844                Arc::clone(&self.graph_store),
1845                Arc::clone(&self.tx_manager),
1846            );
1847
1848            // Apply transaction context if in a transaction
1849            let processor = if let Some(tx_id) = tx_id {
1850                processor.with_tx_context(viewing_epoch, tx_id)
1851            } else {
1852                processor
1853            };
1854
1855            processor.process(query, QueryLanguage::SqlPgq, Some(&params))
1856        })
1857    }
1858
1859    /// Executes a SPARQL query.
1860    ///
1861    /// # Errors
1862    ///
1863    /// Returns an error if the query fails to parse or execute.
1864    #[cfg(all(feature = "sparql", feature = "rdf"))]
1865    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
1866        use crate::query::{
1867            Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, translators::sparql,
1868        };
1869
1870        // Parse and translate the SPARQL query to a logical plan
1871        let logical_plan = sparql::translate(query)?;
1872
1873        // Optimize the plan
1874        let optimizer = Optimizer::from_graph_store(&*self.graph_store);
1875        let optimized_plan = optimizer.optimize(logical_plan)?;
1876
1877        // Convert to physical plan using RDF planner
1878        let planner =
1879            RdfPlanner::new(Arc::clone(&self.rdf_store)).with_tx_id(*self.current_tx.lock());
1880        let mut physical_plan = planner.plan(&optimized_plan)?;
1881
1882        // Execute the plan
1883        let executor = Executor::with_columns(physical_plan.columns.clone())
1884            .with_deadline(self.query_deadline());
1885        executor.execute(physical_plan.operator.as_mut())
1886    }
1887
1888    /// Executes a SPARQL query with parameters.
1889    ///
1890    /// # Errors
1891    ///
1892    /// Returns an error if the query fails to parse or execute.
1893    #[cfg(all(feature = "sparql", feature = "rdf"))]
1894    pub fn execute_sparql_with_params(
1895        &self,
1896        query: &str,
1897        _params: std::collections::HashMap<String, Value>,
1898    ) -> Result<QueryResult> {
1899        // TODO: Implement parameter substitution for SPARQL
1900        // For now, just execute the query without parameters
1901        self.execute_sparql(query)
1902    }
1903
1904    /// Executes a query in the specified language by name.
1905    ///
1906    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
1907    /// `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
1908    ///
1909    /// # Errors
1910    ///
1911    /// Returns an error if the language is unknown/disabled or the query fails.
1912    pub fn execute_language(
1913        &self,
1914        query: &str,
1915        language: &str,
1916        params: Option<std::collections::HashMap<String, Value>>,
1917    ) -> Result<QueryResult> {
1918        match language {
1919            "gql" => {
1920                if let Some(p) = params {
1921                    self.execute_with_params(query, p)
1922                } else {
1923                    self.execute(query)
1924                }
1925            }
1926            #[cfg(feature = "cypher")]
1927            "cypher" => {
1928                if let Some(p) = params {
1929                    use crate::query::processor::{QueryLanguage, QueryProcessor};
1930                    let has_mutations = Self::query_looks_like_mutation(query);
1931                    self.with_auto_commit(has_mutations, || {
1932                        let processor = QueryProcessor::for_graph_store_with_tx(
1933                            Arc::clone(&self.graph_store),
1934                            Arc::clone(&self.tx_manager),
1935                        );
1936                        let (viewing_epoch, tx_id) = self.get_transaction_context();
1937                        let processor = if let Some(tx_id) = tx_id {
1938                            processor.with_tx_context(viewing_epoch, tx_id)
1939                        } else {
1940                            processor
1941                        };
1942                        processor.process(query, QueryLanguage::Cypher, Some(&p))
1943                    })
1944                } else {
1945                    self.execute_cypher(query)
1946                }
1947            }
1948            #[cfg(feature = "gremlin")]
1949            "gremlin" => {
1950                if let Some(p) = params {
1951                    self.execute_gremlin_with_params(query, p)
1952                } else {
1953                    self.execute_gremlin(query)
1954                }
1955            }
1956            #[cfg(feature = "graphql")]
1957            "graphql" => {
1958                if let Some(p) = params {
1959                    self.execute_graphql_with_params(query, p)
1960                } else {
1961                    self.execute_graphql(query)
1962                }
1963            }
1964            #[cfg(feature = "sql-pgq")]
1965            "sql" | "sql-pgq" => {
1966                if let Some(p) = params {
1967                    self.execute_sql_with_params(query, p)
1968                } else {
1969                    self.execute_sql(query)
1970                }
1971            }
1972            #[cfg(all(feature = "sparql", feature = "rdf"))]
1973            "sparql" => {
1974                if let Some(p) = params {
1975                    self.execute_sparql_with_params(query, p)
1976                } else {
1977                    self.execute_sparql(query)
1978                }
1979            }
1980            other => Err(grafeo_common::utils::error::Error::Query(
1981                grafeo_common::utils::error::QueryError::new(
1982                    grafeo_common::utils::error::QueryErrorKind::Semantic,
1983                    format!("Unknown query language: '{other}'"),
1984                ),
1985            )),
1986        }
1987    }
1988
1989    /// Begins a new transaction.
1990    ///
1991    /// # Errors
1992    ///
1993    /// Returns an error if a transaction is already active.
1994    ///
1995    /// # Examples
1996    ///
1997    /// ```no_run
1998    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1999    /// use grafeo_engine::GrafeoDB;
2000    ///
2001    /// let db = GrafeoDB::new_in_memory();
2002    /// let mut session = db.session();
2003    ///
2004    /// session.begin_tx()?;
2005    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
2006    /// session.execute("INSERT (:Person {name: 'Gus'})")?;
2007    /// session.commit()?; // Both inserts committed atomically
2008    /// # Ok(())
2009    /// # }
2010    /// ```
2011    pub fn begin_tx(&mut self) -> Result<()> {
2012        self.begin_tx_inner(false, None)
2013    }
2014
2015    /// Begins a transaction with a specific isolation level.
2016    ///
2017    /// See [`begin_tx`](Self::begin_tx) for the default (`SnapshotIsolation`).
2018    ///
2019    /// # Errors
2020    ///
2021    /// Returns an error if a transaction is already active.
2022    pub fn begin_tx_with_isolation(
2023        &mut self,
2024        isolation_level: crate::transaction::IsolationLevel,
2025    ) -> Result<()> {
2026        self.begin_tx_inner(false, Some(isolation_level))
2027    }
2028
2029    /// Core transaction begin logic, usable from both `&mut self` and `&self` paths.
2030    fn begin_tx_inner(
2031        &self,
2032        read_only: bool,
2033        isolation_level: Option<crate::transaction::IsolationLevel>,
2034    ) -> Result<()> {
2035        let mut current = self.current_tx.lock();
2036        if current.is_some() {
2037            return Err(grafeo_common::utils::error::Error::Transaction(
2038                grafeo_common::utils::error::TransactionError::InvalidState(
2039                    "Transaction already active".to_string(),
2040                ),
2041            ));
2042        }
2043
2044        self.tx_start_node_count
2045            .store(self.store.node_count(), Ordering::Relaxed);
2046        self.tx_start_edge_count
2047            .store(self.store.edge_count(), Ordering::Relaxed);
2048        let tx_id = if let Some(level) = isolation_level {
2049            self.tx_manager.begin_with_isolation(level)
2050        } else {
2051            self.tx_manager.begin()
2052        };
2053        *current = Some(tx_id);
2054        *self.read_only_tx.lock() = read_only;
2055        Ok(())
2056    }
2057
2058    /// Commits the current transaction.
2059    ///
2060    /// Makes all changes since [`begin_tx`](Self::begin_tx) permanent.
2061    ///
2062    /// # Errors
2063    ///
2064    /// Returns an error if no transaction is active.
2065    pub fn commit(&mut self) -> Result<()> {
2066        self.commit_inner()
2067    }
2068
2069    /// Core commit logic, usable from both `&mut self` and `&self` paths.
2070    fn commit_inner(&self) -> Result<()> {
2071        let tx_id = self.current_tx.lock().take().ok_or_else(|| {
2072            grafeo_common::utils::error::Error::Transaction(
2073                grafeo_common::utils::error::TransactionError::InvalidState(
2074                    "No active transaction".to_string(),
2075                ),
2076            )
2077        })?;
2078
2079        // Commit RDF store pending operations
2080        #[cfg(feature = "rdf")]
2081        self.rdf_store.commit_tx(tx_id);
2082
2083        self.tx_manager.commit(tx_id)?;
2084
2085        // Sync the LpgStore epoch with the TxManager so that
2086        // convenience lookups (edge_type, get_edge, get_node) that use
2087        // store.current_epoch() can see versions created at the latest epoch.
2088        self.store.sync_epoch(self.tx_manager.current_epoch());
2089
2090        // Reset read-only flag
2091        *self.read_only_tx.lock() = false;
2092
2093        // Auto-GC: periodically prune old MVCC versions
2094        if self.gc_interval > 0 {
2095            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
2096            if count.is_multiple_of(self.gc_interval) {
2097                let min_epoch = self.tx_manager.min_active_epoch();
2098                self.store.gc_versions(min_epoch);
2099                self.tx_manager.gc();
2100            }
2101        }
2102
2103        Ok(())
2104    }
2105
2106    /// Aborts the current transaction.
2107    ///
2108    /// Discards all changes since [`begin_tx`](Self::begin_tx).
2109    ///
2110    /// # Errors
2111    ///
2112    /// Returns an error if no transaction is active.
2113    ///
2114    /// # Examples
2115    ///
2116    /// ```no_run
2117    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2118    /// use grafeo_engine::GrafeoDB;
2119    ///
2120    /// let db = GrafeoDB::new_in_memory();
2121    /// let mut session = db.session();
2122    ///
2123    /// session.begin_tx()?;
2124    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
2125    /// session.rollback()?; // Insert is discarded
2126    /// # Ok(())
2127    /// # }
2128    /// ```
2129    pub fn rollback(&mut self) -> Result<()> {
2130        self.rollback_inner()
2131    }
2132
2133    /// Core rollback logic, usable from both `&mut self` and `&self` paths.
2134    fn rollback_inner(&self) -> Result<()> {
2135        let tx_id = self.current_tx.lock().take().ok_or_else(|| {
2136            grafeo_common::utils::error::Error::Transaction(
2137                grafeo_common::utils::error::TransactionError::InvalidState(
2138                    "No active transaction".to_string(),
2139                ),
2140            )
2141        })?;
2142
2143        // Reset read-only flag
2144        *self.read_only_tx.lock() = false;
2145
2146        // Discard uncommitted versions in the LPG store
2147        self.store.discard_uncommitted_versions(tx_id);
2148
2149        // Discard pending operations in the RDF store
2150        #[cfg(feature = "rdf")]
2151        self.rdf_store.rollback_tx(tx_id);
2152
2153        // Mark transaction as aborted in the manager
2154        self.tx_manager.abort(tx_id)
2155    }
2156
2157    /// Returns whether a transaction is active.
2158    #[must_use]
2159    pub fn in_transaction(&self) -> bool {
2160        self.current_tx.lock().is_some()
2161    }
2162
2163    /// Returns the current transaction ID, if any.
2164    #[must_use]
2165    pub(crate) fn current_tx_id(&self) -> Option<TxId> {
2166        *self.current_tx.lock()
2167    }
2168
2169    /// Returns a reference to the transaction manager.
2170    #[must_use]
2171    pub(crate) fn tx_manager(&self) -> &TransactionManager {
2172        &self.tx_manager
2173    }
2174
2175    /// Returns the store's current node count and the count at transaction start.
2176    #[must_use]
2177    pub(crate) fn node_count_delta(&self) -> (usize, usize) {
2178        (
2179            self.tx_start_node_count.load(Ordering::Relaxed),
2180            self.store.node_count(),
2181        )
2182    }
2183
2184    /// Returns the store's current edge count and the count at transaction start.
2185    #[must_use]
2186    pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
2187        (
2188            self.tx_start_edge_count.load(Ordering::Relaxed),
2189            self.store.edge_count(),
2190        )
2191    }
2192
2193    /// Prepares the current transaction for a two-phase commit.
2194    ///
2195    /// Returns a [`PreparedCommit`](crate::transaction::PreparedCommit) that
2196    /// lets you inspect pending changes and attach metadata before finalizing.
2197    /// The mutable borrow prevents concurrent operations while the commit is
2198    /// pending.
2199    ///
2200    /// If the `PreparedCommit` is dropped without calling `commit()` or
2201    /// `abort()`, the transaction is automatically rolled back.
2202    ///
2203    /// # Errors
2204    ///
2205    /// Returns an error if no transaction is active.
2206    ///
2207    /// # Examples
2208    ///
2209    /// ```no_run
2210    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2211    /// use grafeo_engine::GrafeoDB;
2212    ///
2213    /// let db = GrafeoDB::new_in_memory();
2214    /// let mut session = db.session();
2215    ///
2216    /// session.begin_tx()?;
2217    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
2218    ///
2219    /// let mut prepared = session.prepare_commit()?;
2220    /// println!("Nodes written: {}", prepared.info().nodes_written);
2221    /// prepared.set_metadata("audit_user", "admin");
2222    /// prepared.commit()?;
2223    /// # Ok(())
2224    /// # }
2225    /// ```
2226    pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
2227        crate::transaction::PreparedCommit::new(self)
2228    }
2229
2230    /// Sets auto-commit mode.
2231    pub fn set_auto_commit(&mut self, auto_commit: bool) {
2232        self.auto_commit = auto_commit;
2233    }
2234
2235    /// Returns whether auto-commit is enabled.
2236    #[must_use]
2237    pub fn auto_commit(&self) -> bool {
2238        self.auto_commit
2239    }
2240
2241    /// Returns `true` if auto-commit should wrap this execution.
2242    ///
2243    /// Auto-commit kicks in when: the session is in auto-commit mode,
2244    /// no explicit transaction is active, and the query mutates data.
2245    fn needs_auto_commit(&self, has_mutations: bool) -> bool {
2246        self.auto_commit && has_mutations && self.current_tx.lock().is_none()
2247    }
2248
2249    /// Wraps `body` in an automatic begin/commit when [`needs_auto_commit`]
2250    /// returns `true`. On error the transaction is rolled back.
2251    fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
2252    where
2253        F: FnOnce() -> Result<QueryResult>,
2254    {
2255        if self.needs_auto_commit(has_mutations) {
2256            self.begin_tx_inner(false, None)?;
2257            match body() {
2258                Ok(result) => {
2259                    self.commit_inner()?;
2260                    Ok(result)
2261                }
2262                Err(e) => {
2263                    let _ = self.rollback_inner();
2264                    Err(e)
2265                }
2266            }
2267        } else {
2268            body()
2269        }
2270    }
2271
2272    /// Quick heuristic: returns `true` when the query text looks like it
2273    /// performs a mutation. Used by `_with_params` paths that go through the
2274    /// `QueryProcessor` (where the logical plan isn't available before
2275    /// execution). False negatives are harmless: the data just won't be
2276    /// auto-committed, which matches the prior behaviour.
2277    fn query_looks_like_mutation(query: &str) -> bool {
2278        let upper = query.to_ascii_uppercase();
2279        upper.contains("INSERT")
2280            || upper.contains("CREATE")
2281            || upper.contains("DELETE")
2282            || upper.contains("MERGE")
2283            || upper.contains("SET")
2284            || upper.contains("REMOVE")
2285            || upper.contains("DROP")
2286            || upper.contains("ALTER")
2287    }
2288
2289    /// Computes the wall-clock deadline for query execution.
2290    #[must_use]
2291    fn query_deadline(&self) -> Option<Instant> {
2292        #[cfg(not(target_arch = "wasm32"))]
2293        {
2294            self.query_timeout.map(|d| Instant::now() + d)
2295        }
2296        #[cfg(target_arch = "wasm32")]
2297        {
2298            let _ = &self.query_timeout;
2299            None
2300        }
2301    }
2302
2303    /// Evaluates a simple integer literal from a session parameter expression.
2304    fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
2305        use grafeo_adapters::query::gql::ast::{Expression, Literal};
2306        match expr {
2307            Expression::Literal(Literal::Integer(n)) => Some(*n),
2308            _ => None,
2309        }
2310    }
2311
2312    /// Returns the current transaction context for MVCC visibility.
2313    ///
2314    /// Returns `(viewing_epoch, tx_id)` where:
2315    /// - `viewing_epoch` is the epoch at which to check version visibility
2316    /// - `tx_id` is the current transaction ID (if in a transaction)
2317    #[must_use]
2318    fn get_transaction_context(&self) -> (EpochId, Option<TxId>) {
2319        // Time-travel override takes precedence (read-only, no tx context)
2320        if let Some(epoch) = *self.viewing_epoch_override.lock() {
2321            return (epoch, None);
2322        }
2323
2324        if let Some(tx_id) = *self.current_tx.lock() {
2325            // In a transaction: use the transaction's start epoch
2326            let epoch = self
2327                .tx_manager
2328                .start_epoch(tx_id)
2329                .unwrap_or_else(|| self.tx_manager.current_epoch());
2330            (epoch, Some(tx_id))
2331        } else {
2332            // No transaction: use current epoch
2333            (self.tx_manager.current_epoch(), None)
2334        }
2335    }
2336
2337    /// Creates a planner with transaction context and constraint validator.
2338    fn create_planner(&self, viewing_epoch: EpochId, tx_id: Option<TxId>) -> crate::query::Planner {
2339        use crate::query::Planner;
2340
2341        let mut planner = Planner::with_context(
2342            Arc::clone(&self.graph_store),
2343            Arc::clone(&self.tx_manager),
2344            tx_id,
2345            viewing_epoch,
2346        )
2347        .with_factorized_execution(self.factorized_execution)
2348        .with_catalog(Arc::clone(&self.catalog));
2349
2350        // Attach the constraint validator for schema enforcement
2351        let validator = CatalogConstraintValidator::new(Arc::clone(&self.catalog));
2352        planner = planner.with_validator(Arc::new(validator));
2353
2354        planner
2355    }
2356
2357    /// Creates a node directly (bypassing query execution).
2358    ///
2359    /// This is a low-level API for testing and direct manipulation.
2360    /// If a transaction is active, the node will be versioned with the transaction ID.
2361    pub fn create_node(&self, labels: &[&str]) -> NodeId {
2362        let (epoch, tx_id) = self.get_transaction_context();
2363        self.store
2364            .create_node_versioned(labels, epoch, tx_id.unwrap_or(TxId::SYSTEM))
2365    }
2366
2367    /// Creates a node with properties.
2368    ///
2369    /// If a transaction is active, the node will be versioned with the transaction ID.
2370    pub fn create_node_with_props<'a>(
2371        &self,
2372        labels: &[&str],
2373        properties: impl IntoIterator<Item = (&'a str, Value)>,
2374    ) -> NodeId {
2375        let (epoch, tx_id) = self.get_transaction_context();
2376        self.store.create_node_with_props_versioned(
2377            labels,
2378            properties,
2379            epoch,
2380            tx_id.unwrap_or(TxId::SYSTEM),
2381        )
2382    }
2383
2384    /// Creates an edge between two nodes.
2385    ///
2386    /// This is a low-level API for testing and direct manipulation.
2387    /// If a transaction is active, the edge will be versioned with the transaction ID.
2388    pub fn create_edge(
2389        &self,
2390        src: NodeId,
2391        dst: NodeId,
2392        edge_type: &str,
2393    ) -> grafeo_common::types::EdgeId {
2394        let (epoch, tx_id) = self.get_transaction_context();
2395        self.store
2396            .create_edge_versioned(src, dst, edge_type, epoch, tx_id.unwrap_or(TxId::SYSTEM))
2397    }
2398
2399    // =========================================================================
2400    // Direct Lookup APIs (bypass query planning for O(1) point reads)
2401    // =========================================================================
2402
2403    /// Gets a node by ID directly, bypassing query planning.
2404    ///
2405    /// This is the fastest way to retrieve a single node when you know its ID.
2406    /// Skips parsing, binding, optimization, and physical planning entirely.
2407    ///
2408    /// # Performance
2409    ///
2410    /// - Time complexity: O(1) average case
2411    /// - No lock contention (uses DashMap internally)
2412    /// - ~20-30x faster than equivalent MATCH query
2413    ///
2414    /// # Example
2415    ///
2416    /// ```no_run
2417    /// # use grafeo_engine::GrafeoDB;
2418    /// # let db = GrafeoDB::new_in_memory();
2419    /// let session = db.session();
2420    /// let node_id = session.create_node(&["Person"]);
2421    ///
2422    /// // Direct lookup - O(1), no query planning
2423    /// let node = session.get_node(node_id);
2424    /// assert!(node.is_some());
2425    /// ```
2426    #[must_use]
2427    pub fn get_node(&self, id: NodeId) -> Option<Node> {
2428        let (epoch, tx_id) = self.get_transaction_context();
2429        self.store
2430            .get_node_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
2431    }
2432
2433    /// Gets a single property from a node by ID, bypassing query planning.
2434    ///
2435    /// More efficient than `get_node()` when you only need one property,
2436    /// as it avoids loading the full node with all properties.
2437    ///
2438    /// # Performance
2439    ///
2440    /// - Time complexity: O(1) average case
2441    /// - No query planning overhead
2442    ///
2443    /// # Example
2444    ///
2445    /// ```no_run
2446    /// # use grafeo_engine::GrafeoDB;
2447    /// # use grafeo_common::types::Value;
2448    /// # let db = GrafeoDB::new_in_memory();
2449    /// let session = db.session();
2450    /// let id = session.create_node_with_props(&["Person"], [("name", "Alix".into())]);
2451    ///
2452    /// // Direct property access - O(1)
2453    /// let name = session.get_node_property(id, "name");
2454    /// assert_eq!(name, Some(Value::String("Alix".into())));
2455    /// ```
2456    #[must_use]
2457    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
2458        self.get_node(id)
2459            .and_then(|node| node.get_property(key).cloned())
2460    }
2461
2462    /// Gets an edge by ID directly, bypassing query planning.
2463    ///
2464    /// # Performance
2465    ///
2466    /// - Time complexity: O(1) average case
2467    /// - No lock contention
2468    #[must_use]
2469    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
2470        let (epoch, tx_id) = self.get_transaction_context();
2471        self.store
2472            .get_edge_versioned(id, epoch, tx_id.unwrap_or(TxId::SYSTEM))
2473    }
2474
2475    /// Gets outgoing neighbors of a node directly, bypassing query planning.
2476    ///
2477    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
2478    ///
2479    /// # Performance
2480    ///
2481    /// - Time complexity: O(degree) where degree is the number of outgoing edges
2482    /// - Uses adjacency index for direct access
2483    /// - ~10-20x faster than equivalent MATCH query
2484    ///
2485    /// # Example
2486    ///
2487    /// ```no_run
2488    /// # use grafeo_engine::GrafeoDB;
2489    /// # let db = GrafeoDB::new_in_memory();
2490    /// let session = db.session();
2491    /// let alix = session.create_node(&["Person"]);
2492    /// let gus = session.create_node(&["Person"]);
2493    /// session.create_edge(alix, gus, "KNOWS");
2494    ///
2495    /// // Direct neighbor lookup - O(degree)
2496    /// let neighbors = session.get_neighbors_outgoing(alix);
2497    /// assert_eq!(neighbors.len(), 1);
2498    /// assert_eq!(neighbors[0].0, gus);
2499    /// ```
2500    #[must_use]
2501    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2502        self.store.edges_from(node, Direction::Outgoing).collect()
2503    }
2504
2505    /// Gets incoming neighbors of a node directly, bypassing query planning.
2506    ///
2507    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
2508    ///
2509    /// # Performance
2510    ///
2511    /// - Time complexity: O(degree) where degree is the number of incoming edges
2512    /// - Uses backward adjacency index for direct access
2513    #[must_use]
2514    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2515        self.store.edges_from(node, Direction::Incoming).collect()
2516    }
2517
2518    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
2519    ///
2520    /// # Example
2521    ///
2522    /// ```no_run
2523    /// # use grafeo_engine::GrafeoDB;
2524    /// # let db = GrafeoDB::new_in_memory();
2525    /// # let session = db.session();
2526    /// # let alix = session.create_node(&["Person"]);
2527    /// let neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
2528    /// ```
2529    #[must_use]
2530    pub fn get_neighbors_outgoing_by_type(
2531        &self,
2532        node: NodeId,
2533        edge_type: &str,
2534    ) -> Vec<(NodeId, EdgeId)> {
2535        self.store
2536            .edges_from(node, Direction::Outgoing)
2537            .filter(|(_, edge_id)| {
2538                self.get_edge(*edge_id)
2539                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
2540            })
2541            .collect()
2542    }
2543
2544    /// Checks if a node exists, bypassing query planning.
2545    ///
2546    /// # Performance
2547    ///
2548    /// - Time complexity: O(1)
2549    /// - Fastest existence check available
2550    #[must_use]
2551    pub fn node_exists(&self, id: NodeId) -> bool {
2552        self.get_node(id).is_some()
2553    }
2554
2555    /// Checks if an edge exists, bypassing query planning.
2556    #[must_use]
2557    pub fn edge_exists(&self, id: EdgeId) -> bool {
2558        self.get_edge(id).is_some()
2559    }
2560
2561    /// Gets the degree (number of edges) of a node.
2562    ///
2563    /// Returns (outgoing_degree, incoming_degree).
2564    #[must_use]
2565    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
2566        let out = self.store.out_degree(node);
2567        let in_degree = self.store.in_degree(node);
2568        (out, in_degree)
2569    }
2570
2571    /// Batch lookup of multiple nodes by ID.
2572    ///
2573    /// More efficient than calling `get_node()` in a loop because it
2574    /// amortizes overhead.
2575    ///
2576    /// # Performance
2577    ///
2578    /// - Time complexity: O(n) where n is the number of IDs
2579    /// - Better cache utilization than individual lookups
2580    #[must_use]
2581    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
2582        let (epoch, tx_id) = self.get_transaction_context();
2583        let tx = tx_id.unwrap_or(TxId::SYSTEM);
2584        ids.iter()
2585            .map(|&id| self.store.get_node_versioned(id, epoch, tx))
2586            .collect()
2587    }
2588
2589    // ── Change Data Capture ─────────────────────────────────────────────
2590
2591    /// Returns the full change history for an entity (node or edge).
2592    #[cfg(feature = "cdc")]
2593    pub fn history(
2594        &self,
2595        entity_id: impl Into<crate::cdc::EntityId>,
2596    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2597        Ok(self.cdc_log.history(entity_id.into()))
2598    }
2599
2600    /// Returns change events for an entity since the given epoch.
2601    #[cfg(feature = "cdc")]
2602    pub fn history_since(
2603        &self,
2604        entity_id: impl Into<crate::cdc::EntityId>,
2605        since_epoch: EpochId,
2606    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2607        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
2608    }
2609
2610    /// Returns all change events across all entities in an epoch range.
2611    #[cfg(feature = "cdc")]
2612    pub fn changes_between(
2613        &self,
2614        start_epoch: EpochId,
2615        end_epoch: EpochId,
2616    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2617        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
2618    }
2619}
2620
2621#[cfg(test)]
2622mod tests {
2623    use crate::database::GrafeoDB;
2624
2625    #[test]
2626    fn test_session_create_node() {
2627        let db = GrafeoDB::new_in_memory();
2628        let session = db.session();
2629
2630        let id = session.create_node(&["Person"]);
2631        assert!(id.is_valid());
2632        assert_eq!(db.node_count(), 1);
2633    }
2634
2635    #[test]
2636    fn test_session_transaction() {
2637        let db = GrafeoDB::new_in_memory();
2638        let mut session = db.session();
2639
2640        assert!(!session.in_transaction());
2641
2642        session.begin_tx().unwrap();
2643        assert!(session.in_transaction());
2644
2645        session.commit().unwrap();
2646        assert!(!session.in_transaction());
2647    }
2648
2649    #[test]
2650    fn test_session_transaction_context() {
2651        let db = GrafeoDB::new_in_memory();
2652        let mut session = db.session();
2653
2654        // Without transaction - context should have current epoch and no tx_id
2655        let (_epoch1, tx_id1) = session.get_transaction_context();
2656        assert!(tx_id1.is_none());
2657
2658        // Start a transaction
2659        session.begin_tx().unwrap();
2660        let (epoch2, tx_id2) = session.get_transaction_context();
2661        assert!(tx_id2.is_some());
2662        // Transaction should have a valid epoch
2663        let _ = epoch2; // Use the variable
2664
2665        // Commit and verify
2666        session.commit().unwrap();
2667        let (epoch3, tx_id3) = session.get_transaction_context();
2668        assert!(tx_id3.is_none());
2669        // Epoch should have advanced after commit
2670        assert!(epoch3.as_u64() >= epoch2.as_u64());
2671    }
2672
2673    #[test]
2674    fn test_session_rollback() {
2675        let db = GrafeoDB::new_in_memory();
2676        let mut session = db.session();
2677
2678        session.begin_tx().unwrap();
2679        session.rollback().unwrap();
2680        assert!(!session.in_transaction());
2681    }
2682
2683    #[test]
2684    fn test_session_rollback_discards_versions() {
2685        use grafeo_common::types::TxId;
2686
2687        let db = GrafeoDB::new_in_memory();
2688
2689        // Create a node outside of any transaction (at system level)
2690        let node_before = db.store().create_node(&["Person"]);
2691        assert!(node_before.is_valid());
2692        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
2693
2694        // Start a transaction
2695        let mut session = db.session();
2696        session.begin_tx().unwrap();
2697        let tx_id = session.current_tx.lock().unwrap();
2698
2699        // Create a node versioned with the transaction's ID
2700        let epoch = db.store().current_epoch();
2701        let node_in_tx = db.store().create_node_versioned(&["Person"], epoch, tx_id);
2702        assert!(node_in_tx.is_valid());
2703
2704        // Should see 2 nodes at this point
2705        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
2706
2707        // Rollback the transaction
2708        session.rollback().unwrap();
2709        assert!(!session.in_transaction());
2710
2711        // The node created in the transaction should be discarded
2712        // Only the first node should remain visible
2713        let count_after = db.node_count();
2714        assert_eq!(
2715            count_after, 1,
2716            "Rollback should discard uncommitted node, but got {count_after}"
2717        );
2718
2719        // The original node should still be accessible
2720        let current_epoch = db.store().current_epoch();
2721        assert!(
2722            db.store()
2723                .get_node_versioned(node_before, current_epoch, TxId::SYSTEM)
2724                .is_some(),
2725            "Original node should still exist"
2726        );
2727
2728        // The node created in the transaction should not be accessible
2729        assert!(
2730            db.store()
2731                .get_node_versioned(node_in_tx, current_epoch, TxId::SYSTEM)
2732                .is_none(),
2733            "Transaction node should be gone"
2734        );
2735    }
2736
2737    #[test]
2738    fn test_session_create_node_in_transaction() {
2739        // Test that session.create_node() is transaction-aware
2740        let db = GrafeoDB::new_in_memory();
2741
2742        // Create a node outside of any transaction
2743        let node_before = db.create_node(&["Person"]);
2744        assert!(node_before.is_valid());
2745        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
2746
2747        // Start a transaction and create a node through the session
2748        let mut session = db.session();
2749        session.begin_tx().unwrap();
2750
2751        // Create a node through session.create_node() - should be versioned with tx
2752        let node_in_tx = session.create_node(&["Person"]);
2753        assert!(node_in_tx.is_valid());
2754
2755        // Should see 2 nodes at this point
2756        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
2757
2758        // Rollback the transaction
2759        session.rollback().unwrap();
2760
2761        // The node created via session.create_node() should be discarded
2762        let count_after = db.node_count();
2763        assert_eq!(
2764            count_after, 1,
2765            "Rollback should discard node created via session.create_node(), but got {count_after}"
2766        );
2767    }
2768
2769    #[test]
2770    fn test_session_create_node_with_props_in_transaction() {
2771        use grafeo_common::types::Value;
2772
2773        // Test that session.create_node_with_props() is transaction-aware
2774        let db = GrafeoDB::new_in_memory();
2775
2776        // Create a node outside of any transaction
2777        db.create_node(&["Person"]);
2778        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
2779
2780        // Start a transaction and create a node with properties
2781        let mut session = db.session();
2782        session.begin_tx().unwrap();
2783
2784        let node_in_tx =
2785            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
2786        assert!(node_in_tx.is_valid());
2787
2788        // Should see 2 nodes
2789        assert_eq!(db.node_count(), 2, "Should have 2 nodes during transaction");
2790
2791        // Rollback the transaction
2792        session.rollback().unwrap();
2793
2794        // The node should be discarded
2795        let count_after = db.node_count();
2796        assert_eq!(
2797            count_after, 1,
2798            "Rollback should discard node created via session.create_node_with_props()"
2799        );
2800    }
2801
2802    #[cfg(feature = "gql")]
2803    mod gql_tests {
2804        use super::*;
2805
2806        #[test]
2807        fn test_gql_query_execution() {
2808            let db = GrafeoDB::new_in_memory();
2809            let session = db.session();
2810
2811            // Create some test data
2812            session.create_node(&["Person"]);
2813            session.create_node(&["Person"]);
2814            session.create_node(&["Animal"]);
2815
2816            // Execute a GQL query
2817            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
2818
2819            // Should return 2 Person nodes
2820            assert_eq!(result.row_count(), 2);
2821            assert_eq!(result.column_count(), 1);
2822            assert_eq!(result.columns[0], "n");
2823        }
2824
2825        #[test]
2826        fn test_gql_empty_result() {
2827            let db = GrafeoDB::new_in_memory();
2828            let session = db.session();
2829
2830            // No data in database
2831            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
2832
2833            assert_eq!(result.row_count(), 0);
2834        }
2835
2836        #[test]
2837        fn test_gql_parse_error() {
2838            let db = GrafeoDB::new_in_memory();
2839            let session = db.session();
2840
2841            // Invalid GQL syntax
2842            let result = session.execute("MATCH (n RETURN n");
2843
2844            assert!(result.is_err());
2845        }
2846
2847        #[test]
2848        fn test_gql_relationship_traversal() {
2849            let db = GrafeoDB::new_in_memory();
2850            let session = db.session();
2851
2852            // Create a graph: Alix -> Gus, Alix -> Vincent
2853            let alix = session.create_node(&["Person"]);
2854            let gus = session.create_node(&["Person"]);
2855            let vincent = session.create_node(&["Person"]);
2856
2857            session.create_edge(alix, gus, "KNOWS");
2858            session.create_edge(alix, vincent, "KNOWS");
2859
2860            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
2861            let result = session
2862                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
2863                .unwrap();
2864
2865            // Should return 2 rows (Alix->Gus, Alix->Vincent)
2866            assert_eq!(result.row_count(), 2);
2867            assert_eq!(result.column_count(), 2);
2868            assert_eq!(result.columns[0], "a");
2869            assert_eq!(result.columns[1], "b");
2870        }
2871
2872        #[test]
2873        fn test_gql_relationship_with_type_filter() {
2874            let db = GrafeoDB::new_in_memory();
2875            let session = db.session();
2876
2877            // Create a graph: Alix -KNOWS-> Gus, Alix -WORKS_WITH-> Vincent
2878            let alix = session.create_node(&["Person"]);
2879            let gus = session.create_node(&["Person"]);
2880            let vincent = session.create_node(&["Person"]);
2881
2882            session.create_edge(alix, gus, "KNOWS");
2883            session.create_edge(alix, vincent, "WORKS_WITH");
2884
2885            // Query only KNOWS relationships
2886            let result = session
2887                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
2888                .unwrap();
2889
2890            // Should return only 1 row (Alix->Gus)
2891            assert_eq!(result.row_count(), 1);
2892        }
2893
2894        #[test]
2895        fn test_gql_semantic_error_undefined_variable() {
2896            let db = GrafeoDB::new_in_memory();
2897            let session = db.session();
2898
2899            // Reference undefined variable 'x' in RETURN
2900            let result = session.execute("MATCH (n:Person) RETURN x");
2901
2902            // Should fail with semantic error
2903            assert!(result.is_err());
2904            let Err(err) = result else {
2905                panic!("Expected error")
2906            };
2907            assert!(
2908                err.to_string().contains("Undefined variable"),
2909                "Expected undefined variable error, got: {}",
2910                err
2911            );
2912        }
2913
2914        #[test]
2915        fn test_gql_where_clause_property_filter() {
2916            use grafeo_common::types::Value;
2917
2918            let db = GrafeoDB::new_in_memory();
2919            let session = db.session();
2920
2921            // Create people with ages
2922            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
2923            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
2924            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
2925
2926            // Query with WHERE clause: age > 30
2927            let result = session
2928                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
2929                .unwrap();
2930
2931            // Should return 2 people (ages 35 and 45)
2932            assert_eq!(result.row_count(), 2);
2933        }
2934
2935        #[test]
2936        fn test_gql_where_clause_equality() {
2937            use grafeo_common::types::Value;
2938
2939            let db = GrafeoDB::new_in_memory();
2940            let session = db.session();
2941
2942            // Create people with names
2943            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
2944            session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
2945            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
2946
2947            // Query with WHERE clause: name = "Alix"
2948            let result = session
2949                .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
2950                .unwrap();
2951
2952            // Should return 2 people named Alix
2953            assert_eq!(result.row_count(), 2);
2954        }
2955
2956        #[test]
2957        fn test_gql_return_property_access() {
2958            use grafeo_common::types::Value;
2959
2960            let db = GrafeoDB::new_in_memory();
2961            let session = db.session();
2962
2963            // Create people with names and ages
2964            session.create_node_with_props(
2965                &["Person"],
2966                [
2967                    ("name", Value::String("Alix".into())),
2968                    ("age", Value::Int64(30)),
2969                ],
2970            );
2971            session.create_node_with_props(
2972                &["Person"],
2973                [
2974                    ("name", Value::String("Gus".into())),
2975                    ("age", Value::Int64(25)),
2976                ],
2977            );
2978
2979            // Query returning properties
2980            let result = session
2981                .execute("MATCH (n:Person) RETURN n.name, n.age")
2982                .unwrap();
2983
2984            // Should return 2 rows with name and age columns
2985            assert_eq!(result.row_count(), 2);
2986            assert_eq!(result.column_count(), 2);
2987            assert_eq!(result.columns[0], "n.name");
2988            assert_eq!(result.columns[1], "n.age");
2989
2990            // Check that we get actual values
2991            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
2992            assert!(names.contains(&&Value::String("Alix".into())));
2993            assert!(names.contains(&&Value::String("Gus".into())));
2994        }
2995
2996        #[test]
2997        fn test_gql_return_mixed_expressions() {
2998            use grafeo_common::types::Value;
2999
3000            let db = GrafeoDB::new_in_memory();
3001            let session = db.session();
3002
3003            // Create a person
3004            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
3005
3006            // Query returning both node and property
3007            let result = session
3008                .execute("MATCH (n:Person) RETURN n, n.name")
3009                .unwrap();
3010
3011            assert_eq!(result.row_count(), 1);
3012            assert_eq!(result.column_count(), 2);
3013            assert_eq!(result.columns[0], "n");
3014            assert_eq!(result.columns[1], "n.name");
3015
3016            // Second column should be the name
3017            assert_eq!(result.rows[0][1], Value::String("Alix".into()));
3018        }
3019    }
3020
3021    #[cfg(feature = "cypher")]
3022    mod cypher_tests {
3023        use super::*;
3024
3025        #[test]
3026        fn test_cypher_query_execution() {
3027            let db = GrafeoDB::new_in_memory();
3028            let session = db.session();
3029
3030            // Create some test data
3031            session.create_node(&["Person"]);
3032            session.create_node(&["Person"]);
3033            session.create_node(&["Animal"]);
3034
3035            // Execute a Cypher query
3036            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
3037
3038            // Should return 2 Person nodes
3039            assert_eq!(result.row_count(), 2);
3040            assert_eq!(result.column_count(), 1);
3041            assert_eq!(result.columns[0], "n");
3042        }
3043
3044        #[test]
3045        fn test_cypher_empty_result() {
3046            let db = GrafeoDB::new_in_memory();
3047            let session = db.session();
3048
3049            // No data in database
3050            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
3051
3052            assert_eq!(result.row_count(), 0);
3053        }
3054
3055        #[test]
3056        fn test_cypher_parse_error() {
3057            let db = GrafeoDB::new_in_memory();
3058            let session = db.session();
3059
3060            // Invalid Cypher syntax
3061            let result = session.execute_cypher("MATCH (n RETURN n");
3062
3063            assert!(result.is_err());
3064        }
3065    }
3066
3067    // ==================== Direct Lookup API Tests ====================
3068
3069    mod direct_lookup_tests {
3070        use super::*;
3071        use grafeo_common::types::Value;
3072
3073        #[test]
3074        fn test_get_node() {
3075            let db = GrafeoDB::new_in_memory();
3076            let session = db.session();
3077
3078            let id = session.create_node(&["Person"]);
3079            let node = session.get_node(id);
3080
3081            assert!(node.is_some());
3082            let node = node.unwrap();
3083            assert_eq!(node.id, id);
3084        }
3085
3086        #[test]
3087        fn test_get_node_not_found() {
3088            use grafeo_common::types::NodeId;
3089
3090            let db = GrafeoDB::new_in_memory();
3091            let session = db.session();
3092
3093            // Try to get a non-existent node
3094            let node = session.get_node(NodeId::new(9999));
3095            assert!(node.is_none());
3096        }
3097
3098        #[test]
3099        fn test_get_node_property() {
3100            let db = GrafeoDB::new_in_memory();
3101            let session = db.session();
3102
3103            let id = session
3104                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
3105
3106            let name = session.get_node_property(id, "name");
3107            assert_eq!(name, Some(Value::String("Alix".into())));
3108
3109            // Non-existent property
3110            let missing = session.get_node_property(id, "missing");
3111            assert!(missing.is_none());
3112        }
3113
3114        #[test]
3115        fn test_get_edge() {
3116            let db = GrafeoDB::new_in_memory();
3117            let session = db.session();
3118
3119            let alix = session.create_node(&["Person"]);
3120            let gus = session.create_node(&["Person"]);
3121            let edge_id = session.create_edge(alix, gus, "KNOWS");
3122
3123            let edge = session.get_edge(edge_id);
3124            assert!(edge.is_some());
3125            let edge = edge.unwrap();
3126            assert_eq!(edge.id, edge_id);
3127            assert_eq!(edge.src, alix);
3128            assert_eq!(edge.dst, gus);
3129        }
3130
3131        #[test]
3132        fn test_get_edge_not_found() {
3133            use grafeo_common::types::EdgeId;
3134
3135            let db = GrafeoDB::new_in_memory();
3136            let session = db.session();
3137
3138            let edge = session.get_edge(EdgeId::new(9999));
3139            assert!(edge.is_none());
3140        }
3141
3142        #[test]
3143        fn test_get_neighbors_outgoing() {
3144            let db = GrafeoDB::new_in_memory();
3145            let session = db.session();
3146
3147            let alix = session.create_node(&["Person"]);
3148            let gus = session.create_node(&["Person"]);
3149            let harm = session.create_node(&["Person"]);
3150
3151            session.create_edge(alix, gus, "KNOWS");
3152            session.create_edge(alix, harm, "KNOWS");
3153
3154            let neighbors = session.get_neighbors_outgoing(alix);
3155            assert_eq!(neighbors.len(), 2);
3156
3157            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
3158            assert!(neighbor_ids.contains(&gus));
3159            assert!(neighbor_ids.contains(&harm));
3160        }
3161
3162        #[test]
3163        fn test_get_neighbors_incoming() {
3164            let db = GrafeoDB::new_in_memory();
3165            let session = db.session();
3166
3167            let alix = session.create_node(&["Person"]);
3168            let gus = session.create_node(&["Person"]);
3169            let harm = session.create_node(&["Person"]);
3170
3171            session.create_edge(gus, alix, "KNOWS");
3172            session.create_edge(harm, alix, "KNOWS");
3173
3174            let neighbors = session.get_neighbors_incoming(alix);
3175            assert_eq!(neighbors.len(), 2);
3176
3177            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
3178            assert!(neighbor_ids.contains(&gus));
3179            assert!(neighbor_ids.contains(&harm));
3180        }
3181
3182        #[test]
3183        fn test_get_neighbors_outgoing_by_type() {
3184            let db = GrafeoDB::new_in_memory();
3185            let session = db.session();
3186
3187            let alix = session.create_node(&["Person"]);
3188            let gus = session.create_node(&["Person"]);
3189            let company = session.create_node(&["Company"]);
3190
3191            session.create_edge(alix, gus, "KNOWS");
3192            session.create_edge(alix, company, "WORKS_AT");
3193
3194            let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
3195            assert_eq!(knows_neighbors.len(), 1);
3196            assert_eq!(knows_neighbors[0].0, gus);
3197
3198            let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
3199            assert_eq!(works_neighbors.len(), 1);
3200            assert_eq!(works_neighbors[0].0, company);
3201
3202            // No edges of this type
3203            let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
3204            assert!(no_neighbors.is_empty());
3205        }
3206
3207        #[test]
3208        fn test_node_exists() {
3209            use grafeo_common::types::NodeId;
3210
3211            let db = GrafeoDB::new_in_memory();
3212            let session = db.session();
3213
3214            let id = session.create_node(&["Person"]);
3215
3216            assert!(session.node_exists(id));
3217            assert!(!session.node_exists(NodeId::new(9999)));
3218        }
3219
3220        #[test]
3221        fn test_edge_exists() {
3222            use grafeo_common::types::EdgeId;
3223
3224            let db = GrafeoDB::new_in_memory();
3225            let session = db.session();
3226
3227            let alix = session.create_node(&["Person"]);
3228            let gus = session.create_node(&["Person"]);
3229            let edge_id = session.create_edge(alix, gus, "KNOWS");
3230
3231            assert!(session.edge_exists(edge_id));
3232            assert!(!session.edge_exists(EdgeId::new(9999)));
3233        }
3234
3235        #[test]
3236        fn test_get_degree() {
3237            let db = GrafeoDB::new_in_memory();
3238            let session = db.session();
3239
3240            let alix = session.create_node(&["Person"]);
3241            let gus = session.create_node(&["Person"]);
3242            let harm = session.create_node(&["Person"]);
3243
3244            // Alix knows Gus and Harm (2 outgoing)
3245            session.create_edge(alix, gus, "KNOWS");
3246            session.create_edge(alix, harm, "KNOWS");
3247            // Gus knows Alix (1 incoming for Alix)
3248            session.create_edge(gus, alix, "KNOWS");
3249
3250            let (out_degree, in_degree) = session.get_degree(alix);
3251            assert_eq!(out_degree, 2);
3252            assert_eq!(in_degree, 1);
3253
3254            // Node with no edges
3255            let lonely = session.create_node(&["Person"]);
3256            let (out, in_deg) = session.get_degree(lonely);
3257            assert_eq!(out, 0);
3258            assert_eq!(in_deg, 0);
3259        }
3260
3261        #[test]
3262        fn test_get_nodes_batch() {
3263            let db = GrafeoDB::new_in_memory();
3264            let session = db.session();
3265
3266            let alix = session.create_node(&["Person"]);
3267            let gus = session.create_node(&["Person"]);
3268            let harm = session.create_node(&["Person"]);
3269
3270            let nodes = session.get_nodes_batch(&[alix, gus, harm]);
3271            assert_eq!(nodes.len(), 3);
3272            assert!(nodes[0].is_some());
3273            assert!(nodes[1].is_some());
3274            assert!(nodes[2].is_some());
3275
3276            // With non-existent node
3277            use grafeo_common::types::NodeId;
3278            let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
3279            assert_eq!(nodes_with_missing.len(), 3);
3280            assert!(nodes_with_missing[0].is_some());
3281            assert!(nodes_with_missing[1].is_none()); // Missing node
3282            assert!(nodes_with_missing[2].is_some());
3283        }
3284
3285        #[test]
3286        fn test_auto_commit_setting() {
3287            let db = GrafeoDB::new_in_memory();
3288            let mut session = db.session();
3289
3290            // Default is auto-commit enabled
3291            assert!(session.auto_commit());
3292
3293            session.set_auto_commit(false);
3294            assert!(!session.auto_commit());
3295
3296            session.set_auto_commit(true);
3297            assert!(session.auto_commit());
3298        }
3299
3300        #[test]
3301        fn test_transaction_double_begin_error() {
3302            let db = GrafeoDB::new_in_memory();
3303            let mut session = db.session();
3304
3305            session.begin_tx().unwrap();
3306            let result = session.begin_tx();
3307
3308            assert!(result.is_err());
3309            // Clean up
3310            session.rollback().unwrap();
3311        }
3312
3313        #[test]
3314        fn test_commit_without_transaction_error() {
3315            let db = GrafeoDB::new_in_memory();
3316            let mut session = db.session();
3317
3318            let result = session.commit();
3319            assert!(result.is_err());
3320        }
3321
3322        #[test]
3323        fn test_rollback_without_transaction_error() {
3324            let db = GrafeoDB::new_in_memory();
3325            let mut session = db.session();
3326
3327            let result = session.rollback();
3328            assert!(result.is_err());
3329        }
3330
3331        #[test]
3332        fn test_create_edge_in_transaction() {
3333            let db = GrafeoDB::new_in_memory();
3334            let mut session = db.session();
3335
3336            // Create nodes outside transaction
3337            let alix = session.create_node(&["Person"]);
3338            let gus = session.create_node(&["Person"]);
3339
3340            // Create edge in transaction
3341            session.begin_tx().unwrap();
3342            let edge_id = session.create_edge(alix, gus, "KNOWS");
3343
3344            // Edge should be visible in the transaction
3345            assert!(session.edge_exists(edge_id));
3346
3347            // Commit
3348            session.commit().unwrap();
3349
3350            // Edge should still be visible
3351            assert!(session.edge_exists(edge_id));
3352        }
3353
3354        #[test]
3355        fn test_neighbors_empty_node() {
3356            let db = GrafeoDB::new_in_memory();
3357            let session = db.session();
3358
3359            let lonely = session.create_node(&["Person"]);
3360
3361            assert!(session.get_neighbors_outgoing(lonely).is_empty());
3362            assert!(session.get_neighbors_incoming(lonely).is_empty());
3363            assert!(
3364                session
3365                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
3366                    .is_empty()
3367            );
3368        }
3369    }
3370
3371    #[test]
3372    fn test_auto_gc_triggers_on_commit_interval() {
3373        use crate::config::Config;
3374
3375        let config = Config::in_memory().with_gc_interval(2);
3376        let db = GrafeoDB::with_config(config).unwrap();
3377        let mut session = db.session();
3378
3379        // First commit: counter = 1, no GC (not a multiple of 2)
3380        session.begin_tx().unwrap();
3381        session.create_node(&["A"]);
3382        session.commit().unwrap();
3383
3384        // Second commit: counter = 2, GC should trigger (multiple of 2)
3385        session.begin_tx().unwrap();
3386        session.create_node(&["B"]);
3387        session.commit().unwrap();
3388
3389        // Verify the database is still functional after GC
3390        assert_eq!(db.node_count(), 2);
3391    }
3392
3393    #[test]
3394    fn test_query_timeout_config_propagates_to_session() {
3395        use crate::config::Config;
3396        use std::time::Duration;
3397
3398        let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
3399        let db = GrafeoDB::with_config(config).unwrap();
3400        let session = db.session();
3401
3402        // Verify the session has a query deadline (timeout was set)
3403        assert!(session.query_deadline().is_some());
3404    }
3405
3406    #[test]
3407    fn test_no_query_timeout_returns_no_deadline() {
3408        let db = GrafeoDB::new_in_memory();
3409        let session = db.session();
3410
3411        // Default config has no timeout
3412        assert!(session.query_deadline().is_none());
3413    }
3414
3415    #[test]
3416    fn test_graph_model_accessor() {
3417        use crate::config::GraphModel;
3418
3419        let db = GrafeoDB::new_in_memory();
3420        let session = db.session();
3421
3422        assert_eq!(session.graph_model(), GraphModel::Lpg);
3423    }
3424
3425    #[cfg(feature = "gql")]
3426    #[test]
3427    fn test_external_store_session() {
3428        use grafeo_core::graph::GraphStoreMut;
3429        use std::sync::Arc;
3430
3431        let config = crate::config::Config::in_memory();
3432        let store =
3433            Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
3434        let db = GrafeoDB::with_store(store, config).unwrap();
3435
3436        let session = db.session();
3437
3438        // Create data through a query (goes through the external graph_store)
3439        session.execute("INSERT (:Test {name: 'hello'})").unwrap();
3440
3441        // Verify we can query through it
3442        let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
3443        assert_eq!(result.row_count(), 1);
3444    }
3445
3446    // ==================== Session Command Tests ====================
3447
3448    #[cfg(feature = "gql")]
3449    mod session_command_tests {
3450        use super::*;
3451
3452        #[test]
3453        fn test_use_graph_sets_current_graph() {
3454            let db = GrafeoDB::new_in_memory();
3455            let session = db.session();
3456
3457            // Create the graph first, then USE it
3458            session.execute("CREATE GRAPH mydb").unwrap();
3459            session.execute("USE GRAPH mydb").unwrap();
3460
3461            assert_eq!(session.current_graph(), Some("mydb".to_string()));
3462        }
3463
3464        #[test]
3465        fn test_use_graph_nonexistent_errors() {
3466            let db = GrafeoDB::new_in_memory();
3467            let session = db.session();
3468
3469            let result = session.execute("USE GRAPH doesnotexist");
3470            assert!(result.is_err());
3471            let err = result.unwrap_err().to_string();
3472            assert!(
3473                err.contains("does not exist"),
3474                "Expected 'does not exist' error, got: {err}"
3475            );
3476        }
3477
3478        #[test]
3479        fn test_use_graph_default_always_valid() {
3480            let db = GrafeoDB::new_in_memory();
3481            let session = db.session();
3482
3483            // "default" is always valid, even without CREATE GRAPH
3484            session.execute("USE GRAPH default").unwrap();
3485            assert_eq!(session.current_graph(), Some("default".to_string()));
3486        }
3487
3488        #[test]
3489        fn test_session_set_graph() {
3490            let db = GrafeoDB::new_in_memory();
3491            let session = db.session();
3492
3493            // SESSION SET GRAPH does not verify existence
3494            session.execute("SESSION SET GRAPH analytics").unwrap();
3495            assert_eq!(session.current_graph(), Some("analytics".to_string()));
3496        }
3497
3498        #[test]
3499        fn test_session_set_time_zone() {
3500            let db = GrafeoDB::new_in_memory();
3501            let session = db.session();
3502
3503            assert_eq!(session.time_zone(), None);
3504
3505            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
3506            assert_eq!(session.time_zone(), Some("UTC".to_string()));
3507
3508            session
3509                .execute("SESSION SET TIME ZONE 'America/New_York'")
3510                .unwrap();
3511            assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
3512        }
3513
3514        #[test]
3515        fn test_session_set_parameter() {
3516            let db = GrafeoDB::new_in_memory();
3517            let session = db.session();
3518
3519            session
3520                .execute("SESSION SET PARAMETER $timeout = 30")
3521                .unwrap();
3522
3523            // Parameter is stored (value is Null for now, since expression
3524            // evaluation is not yet wired up)
3525            assert!(session.get_parameter("timeout").is_some());
3526        }
3527
3528        #[test]
3529        fn test_session_reset_clears_all_state() {
3530            let db = GrafeoDB::new_in_memory();
3531            let session = db.session();
3532
3533            // Set various session state
3534            session.execute("SESSION SET GRAPH analytics").unwrap();
3535            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
3536            session
3537                .execute("SESSION SET PARAMETER $limit = 100")
3538                .unwrap();
3539
3540            // Verify state was set
3541            assert!(session.current_graph().is_some());
3542            assert!(session.time_zone().is_some());
3543            assert!(session.get_parameter("limit").is_some());
3544
3545            // Reset everything
3546            session.execute("SESSION RESET").unwrap();
3547
3548            assert_eq!(session.current_graph(), None);
3549            assert_eq!(session.time_zone(), None);
3550            assert!(session.get_parameter("limit").is_none());
3551        }
3552
3553        #[test]
3554        fn test_session_close_clears_state() {
3555            let db = GrafeoDB::new_in_memory();
3556            let session = db.session();
3557
3558            session.execute("SESSION SET GRAPH analytics").unwrap();
3559            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
3560
3561            session.execute("SESSION CLOSE").unwrap();
3562
3563            assert_eq!(session.current_graph(), None);
3564            assert_eq!(session.time_zone(), None);
3565        }
3566
3567        #[test]
3568        fn test_create_graph() {
3569            let db = GrafeoDB::new_in_memory();
3570            let session = db.session();
3571
3572            session.execute("CREATE GRAPH mydb").unwrap();
3573
3574            // Should be able to USE it now
3575            session.execute("USE GRAPH mydb").unwrap();
3576            assert_eq!(session.current_graph(), Some("mydb".to_string()));
3577        }
3578
3579        #[test]
3580        fn test_create_graph_duplicate_errors() {
3581            let db = GrafeoDB::new_in_memory();
3582            let session = db.session();
3583
3584            session.execute("CREATE GRAPH mydb").unwrap();
3585            let result = session.execute("CREATE GRAPH mydb");
3586
3587            assert!(result.is_err());
3588            let err = result.unwrap_err().to_string();
3589            assert!(
3590                err.contains("already exists"),
3591                "Expected 'already exists' error, got: {err}"
3592            );
3593        }
3594
3595        #[test]
3596        fn test_create_graph_if_not_exists() {
3597            let db = GrafeoDB::new_in_memory();
3598            let session = db.session();
3599
3600            session.execute("CREATE GRAPH mydb").unwrap();
3601            // Should succeed silently with IF NOT EXISTS
3602            session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
3603        }
3604
3605        #[test]
3606        fn test_drop_graph() {
3607            let db = GrafeoDB::new_in_memory();
3608            let session = db.session();
3609
3610            session.execute("CREATE GRAPH mydb").unwrap();
3611            session.execute("DROP GRAPH mydb").unwrap();
3612
3613            // Should no longer be usable
3614            let result = session.execute("USE GRAPH mydb");
3615            assert!(result.is_err());
3616        }
3617
3618        #[test]
3619        fn test_drop_graph_nonexistent_errors() {
3620            let db = GrafeoDB::new_in_memory();
3621            let session = db.session();
3622
3623            let result = session.execute("DROP GRAPH nosuchgraph");
3624            assert!(result.is_err());
3625            let err = result.unwrap_err().to_string();
3626            assert!(
3627                err.contains("does not exist"),
3628                "Expected 'does not exist' error, got: {err}"
3629            );
3630        }
3631
3632        #[test]
3633        fn test_drop_graph_if_exists() {
3634            let db = GrafeoDB::new_in_memory();
3635            let session = db.session();
3636
3637            // Should succeed silently with IF EXISTS
3638            session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
3639        }
3640
3641        #[test]
3642        fn test_start_transaction_via_gql() {
3643            let db = GrafeoDB::new_in_memory();
3644            let session = db.session();
3645
3646            session.execute("START TRANSACTION").unwrap();
3647            assert!(session.in_transaction());
3648            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3649            session.execute("COMMIT").unwrap();
3650            assert!(!session.in_transaction());
3651
3652            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
3653            assert_eq!(result.rows.len(), 1);
3654        }
3655
3656        #[test]
3657        fn test_start_transaction_read_only_blocks_insert() {
3658            let db = GrafeoDB::new_in_memory();
3659            let session = db.session();
3660
3661            session.execute("START TRANSACTION READ ONLY").unwrap();
3662            let result = session.execute("INSERT (:Person {name: 'Alix'})");
3663            assert!(result.is_err());
3664            let err = result.unwrap_err().to_string();
3665            assert!(
3666                err.contains("read-only"),
3667                "Expected read-only error, got: {err}"
3668            );
3669            session.execute("ROLLBACK").unwrap();
3670        }
3671
3672        #[test]
3673        fn test_start_transaction_read_only_allows_reads() {
3674            let db = GrafeoDB::new_in_memory();
3675            let mut session = db.session();
3676            session.begin_tx().unwrap();
3677            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3678            session.commit().unwrap();
3679
3680            session.execute("START TRANSACTION READ ONLY").unwrap();
3681            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
3682            assert_eq!(result.rows.len(), 1);
3683            session.execute("COMMIT").unwrap();
3684        }
3685
3686        #[test]
3687        fn test_rollback_via_gql() {
3688            let db = GrafeoDB::new_in_memory();
3689            let session = db.session();
3690
3691            session.execute("START TRANSACTION").unwrap();
3692            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
3693            session.execute("ROLLBACK").unwrap();
3694
3695            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
3696            assert!(result.rows.is_empty());
3697        }
3698
3699        #[test]
3700        fn test_start_transaction_with_isolation_level() {
3701            let db = GrafeoDB::new_in_memory();
3702            let session = db.session();
3703
3704            session
3705                .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
3706                .unwrap();
3707            assert!(session.in_transaction());
3708            session.execute("ROLLBACK").unwrap();
3709        }
3710
3711        #[test]
3712        fn test_session_commands_return_empty_result() {
3713            let db = GrafeoDB::new_in_memory();
3714            let session = db.session();
3715
3716            let result = session.execute("SESSION SET GRAPH test").unwrap();
3717            assert_eq!(result.row_count(), 0);
3718            assert_eq!(result.column_count(), 0);
3719        }
3720
3721        #[test]
3722        fn test_current_graph_default_is_none() {
3723            let db = GrafeoDB::new_in_memory();
3724            let session = db.session();
3725
3726            assert_eq!(session.current_graph(), None);
3727        }
3728
3729        #[test]
3730        fn test_time_zone_default_is_none() {
3731            let db = GrafeoDB::new_in_memory();
3732            let session = db.session();
3733
3734            assert_eq!(session.time_zone(), None);
3735        }
3736
3737        #[test]
3738        fn test_session_state_independent_across_sessions() {
3739            let db = GrafeoDB::new_in_memory();
3740            let session1 = db.session();
3741            let session2 = db.session();
3742
3743            session1.execute("SESSION SET GRAPH first").unwrap();
3744            session2.execute("SESSION SET GRAPH second").unwrap();
3745
3746            assert_eq!(session1.current_graph(), Some("first".to_string()));
3747            assert_eq!(session2.current_graph(), Some("second".to_string()));
3748        }
3749    }
3750}