Skip to main content

grafeo_engine/session/
mod.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7#[cfg(feature = "triple-store")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::AtomicUsize;
12#[cfg(feature = "lpg")]
13use std::sync::atomic::Ordering;
14use std::time::{Duration, Instant};
15
16#[cfg(feature = "lpg")]
17use grafeo_common::grafeo_debug_span;
18#[cfg(feature = "lpg")]
19use grafeo_common::types::{EdgeId, NodeId};
20use grafeo_common::types::{EpochId, TransactionId, Value};
21use grafeo_common::utils::error::Result;
22use grafeo_common::{grafeo_info_span, grafeo_warn};
23#[cfg(feature = "lpg")]
24use grafeo_core::graph::Direction;
25#[cfg(feature = "lpg")]
26use grafeo_core::graph::lpg::LpgStore;
27#[cfg(feature = "lpg")]
28use grafeo_core::graph::lpg::{Edge, Node};
29#[cfg(feature = "triple-store")]
30use grafeo_core::graph::rdf::RdfStore;
31use grafeo_core::graph::{GraphStore, GraphStoreMut};
32
33use crate::catalog::{Catalog, CatalogConstraintValidator};
34use crate::config::{AdaptiveConfig, GraphModel};
35use crate::database::QueryResult;
36use crate::query::cache::QueryCache;
37use crate::transaction::TransactionManager;
38
/// Graph-name part of the storage key for a schema's implicit default graph.
///
/// Combined with the schema name as `schema/__default__`. The graph itself is
/// auto-created by `CREATE SCHEMA` and auto-dropped by `DROP SCHEMA`.
const SCHEMA_DEFAULT_GRAPH: &str = "__default__";
42
43/// Parses a DDL default-value literal string into a [`Value`].
44///
45/// Handles string literals (single- or double-quoted), integers, floats,
46/// booleans (`true`/`false`), and `NULL`.
47fn parse_default_literal(text: &str) -> Value {
48    if text.eq_ignore_ascii_case("null") {
49        return Value::Null;
50    }
51    if text.eq_ignore_ascii_case("true") {
52        return Value::Bool(true);
53    }
54    if text.eq_ignore_ascii_case("false") {
55        return Value::Bool(false);
56    }
57    // String literal: strip surrounding quotes
58    if (text.starts_with('\'') && text.ends_with('\''))
59        || (text.starts_with('"') && text.ends_with('"'))
60    {
61        return Value::String(text[1..text.len() - 1].into());
62    }
63    // Try integer, then float
64    if let Ok(i) = text.parse::<i64>() {
65        return Value::Int64(i);
66    }
67    if let Ok(f) = text.parse::<f64>() {
68        return Value::Float64(f);
69    }
70    // Fallback: treat as string
71    Value::String(text.into())
72}
73
74/// Runtime configuration for creating a new session.
75///
76/// Groups the shared parameters passed to all session constructors, keeping
77/// call sites readable and avoiding long argument lists.
78pub(crate) struct SessionConfig {
79    pub transaction_manager: Arc<TransactionManager>,
80    pub query_cache: Arc<QueryCache>,
81    pub catalog: Arc<Catalog>,
82    pub adaptive_config: AdaptiveConfig,
83    pub factorized_execution: bool,
84    pub graph_model: GraphModel,
85    pub query_timeout: Option<Duration>,
86    pub commit_counter: Arc<AtomicUsize>,
87    pub gc_interval: usize,
88    /// When true, the session permanently blocks all mutations.
89    pub read_only: bool,
90    /// The identity bound to this session (for permission checks).
91    pub identity: crate::auth::Identity,
92    /// Named graph projections shared with the database.
93    #[cfg(feature = "lpg")]
94    pub projections: Arc<
95        parking_lot::RwLock<
96            std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
97        >,
98    >,
99}
100
101/// Your handle to the database - execute queries and manage transactions.
102///
103/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
104/// tracks its own transaction state, so you can have multiple concurrent
105/// sessions without them interfering.
106pub struct Session {
107    /// The underlying store.
108    #[cfg(feature = "lpg")]
109    store: Arc<LpgStore>,
110    /// Graph store trait object for pluggable storage backends (read path).
111    graph_store: Arc<dyn GraphStore>,
112    /// Writable graph store (None for read-only databases).
113    graph_store_mut: Option<Arc<dyn GraphStoreMut>>,
114    /// Schema and metadata catalog shared across sessions.
115    catalog: Arc<Catalog>,
116    /// RDF triple store (if RDF feature is enabled).
117    #[cfg(feature = "triple-store")]
118    rdf_store: Arc<RdfStore>,
119    /// Transaction manager.
120    transaction_manager: Arc<TransactionManager>,
121    /// Query cache shared across sessions.
122    query_cache: Arc<QueryCache>,
123    /// Current transaction ID (if any). Behind a Mutex so that GQL commands
124    /// (`START TRANSACTION`, `COMMIT`, `ROLLBACK`) can manage transactions
125    /// from within `execute(&self)`.
126    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
127    /// Whether the current transaction is read-only (blocks mutations).
128    read_only_tx: parking_lot::Mutex<bool>,
129    /// Whether the database itself is read-only (set at open time, never changes).
130    /// When true, `read_only_tx` is always true regardless of transaction flags.
131    db_read_only: bool,
132    /// The identity bound to this session (determines permission level).
133    identity: crate::auth::Identity,
134    /// Whether the session is in auto-commit mode.
135    auto_commit: bool,
136    /// Adaptive execution configuration.
137    #[allow(dead_code)] // Stored for future adaptive re-optimization during execution
138    adaptive_config: AdaptiveConfig,
139    /// Whether to use factorized execution for multi-hop queries.
140    factorized_execution: bool,
141    /// The graph data model this session operates on.
142    graph_model: GraphModel,
143    /// Maximum time a query may run before being cancelled.
144    query_timeout: Option<Duration>,
145    /// Shared commit counter for triggering auto-GC.
146    commit_counter: Arc<AtomicUsize>,
147    /// GC every N commits (0 = disabled).
148    gc_interval: usize,
149    /// Node count at the start of the current transaction (for PreparedCommit stats).
150    transaction_start_node_count: AtomicUsize,
151    /// Edge count at the start of the current transaction (for PreparedCommit stats).
152    transaction_start_edge_count: AtomicUsize,
153    /// WAL for logging schema changes.
154    #[cfg(feature = "wal")]
155    wal: Option<Arc<grafeo_storage::wal::LpgWal>>,
156    /// Shared WAL graph context tracker for named graph awareness.
157    #[cfg(feature = "wal")]
158    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
159    /// CDC log for change tracking.
160    #[cfg(feature = "cdc")]
161    cdc_log: Arc<crate::cdc::CdcLog>,
162    /// Buffered CDC events for the current transaction.
163    /// Flushed to `cdc_log` on commit, discarded on rollback.
164    #[cfg(feature = "cdc")]
165    cdc_pending_events: Option<Arc<parking_lot::Mutex<Vec<crate::cdc::ChangeEvent>>>>,
166    /// Current graph name (for multi-graph USE GRAPH support). None = default graph.
167    current_graph: parking_lot::Mutex<Option<String>>,
168    /// Current schema name (ISO/IEC 39075 Section 4.7.3: independent from session graph).
169    /// None = "not set" (uses default schema).
170    current_schema: parking_lot::Mutex<Option<String>>,
171    /// Session time zone override.
172    time_zone: parking_lot::Mutex<Option<String>>,
173    /// Session-level parameters (SET PARAMETER).
174    session_params:
175        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
176    /// Override epoch for time-travel queries (None = use transaction/current epoch).
177    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
178    /// Savepoints within the current transaction.
179    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
180    /// Nesting depth for nested transactions (0 = outermost).
181    /// Nested `START TRANSACTION` creates an auto-savepoint; nested `COMMIT`
182    /// releases it, nested `ROLLBACK` rolls back to it.
183    transaction_nesting_depth: parking_lot::Mutex<u32>,
184    /// Named graphs touched during the current transaction (for cross-graph atomicity).
185    /// `None` represents the default graph. Populated at `BEGIN` time and on each
186    /// `USE GRAPH` / `SESSION SET GRAPH` switch within a transaction.
187    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
188    /// Shared metrics registry (populated when the `metrics` feature is enabled).
189    #[cfg(feature = "metrics")]
190    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
191    /// Transaction start time for duration tracking.
192    #[cfg(feature = "metrics")]
193    tx_start_time: parking_lot::Mutex<Option<Instant>>,
194    /// Named graph projections shared with the database.
195    #[cfg(feature = "lpg")]
196    projections: Arc<
197        parking_lot::RwLock<
198            std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
199        >,
200    >,
201}
202
/// Snapshot of one graph's store state, taken when a savepoint is created.
#[derive(Clone)]
struct GraphSavepoint {
    /// Graph this snapshot belongs to.
    // NOTE(review): presumably `None` denotes the default graph, as it does
    // in `Session::touched_graphs` — confirm at the savepoint creation site.
    graph_name: Option<String>,
    /// Node ID counter value recorded at savepoint time.
    next_node_id: u64,
    /// Edge ID counter value recorded at savepoint time.
    next_edge_id: u64,
    /// Position in the undo log recorded at savepoint time.
    undo_log_position: usize,
}
211
212/// Savepoint state: name + per-graph snapshots + the graph that was active.
213#[derive(Clone)]
214struct SavepointState {
215    name: String,
216    graph_snapshots: Vec<GraphSavepoint>,
217    /// The graph that was active when the savepoint was created.
218    /// Reserved for future use (e.g., restoring graph context on rollback).
219    #[allow(dead_code)]
220    active_graph: Option<String>,
221    /// CDC event buffer position at savepoint creation.
222    /// On rollback-to-savepoint, the buffer is truncated to this position.
223    #[cfg(feature = "cdc")]
224    cdc_event_position: usize,
225}
226
227impl Session {
228    /// Creates a new session with adaptive execution configuration.
229    #[cfg(feature = "lpg")]
230    #[allow(dead_code)] // Used when lpg enabled without triple-store
231    pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
232        let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
233        let graph_store_mut = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
234        Self {
235            store,
236            graph_store,
237            graph_store_mut,
238            catalog: cfg.catalog,
239            #[cfg(feature = "triple-store")]
240            rdf_store: Arc::new(RdfStore::new()),
241            transaction_manager: cfg.transaction_manager,
242            query_cache: cfg.query_cache,
243            current_transaction: parking_lot::Mutex::new(None),
244            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
245            db_read_only: cfg.read_only,
246            identity: cfg.identity,
247            auto_commit: true,
248            adaptive_config: cfg.adaptive_config,
249            factorized_execution: cfg.factorized_execution,
250            graph_model: cfg.graph_model,
251            query_timeout: cfg.query_timeout,
252            commit_counter: cfg.commit_counter,
253            gc_interval: cfg.gc_interval,
254            transaction_start_node_count: AtomicUsize::new(0),
255            transaction_start_edge_count: AtomicUsize::new(0),
256            #[cfg(feature = "wal")]
257            wal: None,
258            #[cfg(feature = "wal")]
259            wal_graph_context: None,
260            #[cfg(feature = "cdc")]
261            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
262            #[cfg(feature = "cdc")]
263            cdc_pending_events: None,
264            current_graph: parking_lot::Mutex::new(None),
265            current_schema: parking_lot::Mutex::new(None),
266            time_zone: parking_lot::Mutex::new(None),
267            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
268            viewing_epoch_override: parking_lot::Mutex::new(None),
269            savepoints: parking_lot::Mutex::new(Vec::new()),
270            transaction_nesting_depth: parking_lot::Mutex::new(0),
271            touched_graphs: parking_lot::Mutex::new(Vec::new()),
272            #[cfg(feature = "metrics")]
273            metrics: None,
274            #[cfg(feature = "metrics")]
275            tx_start_time: parking_lot::Mutex::new(None),
276            projections: cfg.projections,
277        }
278    }
279
280    /// Sets the WAL for this session (shared with the database).
281    ///
282    /// This also wraps `graph_store` in a [`WalGraphStore`] so that mutation
283    /// operators (INSERT, DELETE, SET via queries) log to the WAL.
284    #[cfg(all(feature = "wal", feature = "lpg"))]
285    pub(crate) fn set_wal(
286        &mut self,
287        wal: Arc<grafeo_storage::wal::LpgWal>,
288        wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
289    ) {
290        // Wrap the graph store so query-engine mutations are WAL-logged
291        let wal_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
292            Arc::clone(&self.store),
293            Arc::clone(&wal),
294            Arc::clone(&wal_graph_context),
295        ));
296        self.graph_store = Arc::clone(&wal_store) as Arc<dyn GraphStore>;
297        self.graph_store_mut = Some(wal_store as Arc<dyn GraphStoreMut>);
298        self.wal = Some(wal);
299        self.wal_graph_context = Some(wal_graph_context);
300    }
301
302    /// Sets the CDC log for this session (shared with the database).
303    ///
304    /// Wraps the current write store with a `CdcGraphStore` decorator so
305    /// that all session mutations (INSERT, SET, DELETE via query execution)
306    /// buffer CDC events. The buffer is flushed to the `CdcLog` on commit
307    /// and discarded on rollback.
308    #[cfg(feature = "cdc")]
309    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
310        // Wrap the WRITE store only with CdcGraphStore to intercept mutations.
311        // The read store (self.graph_store) is left unchanged for zero read overhead.
312        if let Some(ref write_store) = self.graph_store_mut {
313            let cdc_store = Arc::new(crate::database::cdc_store::CdcGraphStore::new(
314                Arc::clone(write_store),
315                Arc::clone(&cdc_log),
316            ));
317            self.cdc_pending_events = Some(cdc_store.pending_events());
318            self.graph_store_mut = Some(cdc_store as Arc<dyn grafeo_core::graph::GraphStoreMut>);
319        }
320        self.cdc_log = cdc_log;
321    }
322
323    /// Sets the metrics registry for this session (shared with the database).
324    #[cfg(feature = "metrics")]
325    pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
326        self.metrics = Some(metrics);
327    }
328
329    /// Creates a session backed by an external graph store.
330    ///
331    /// The external store handles all data operations. Transaction management
332    /// (begin/commit/rollback) is not supported for external stores.
333    ///
334    /// # Errors
335    ///
336    /// Returns an error if the internal arena allocation fails (out of memory).
337    pub(crate) fn with_external_store(
338        read_store: Arc<dyn GraphStore>,
339        write_store: Option<Arc<dyn GraphStoreMut>>,
340        cfg: SessionConfig,
341    ) -> Result<Self> {
342        Ok(Self {
343            #[cfg(feature = "lpg")]
344            store: Arc::new(LpgStore::new()?),
345            graph_store: read_store,
346            graph_store_mut: write_store,
347            catalog: cfg.catalog,
348            #[cfg(feature = "triple-store")]
349            rdf_store: Arc::new(RdfStore::new()),
350            transaction_manager: cfg.transaction_manager,
351            query_cache: cfg.query_cache,
352            current_transaction: parking_lot::Mutex::new(None),
353            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
354            db_read_only: cfg.read_only,
355            identity: cfg.identity,
356            auto_commit: true,
357            adaptive_config: cfg.adaptive_config,
358            factorized_execution: cfg.factorized_execution,
359            graph_model: cfg.graph_model,
360            query_timeout: cfg.query_timeout,
361            commit_counter: cfg.commit_counter,
362            gc_interval: cfg.gc_interval,
363            transaction_start_node_count: AtomicUsize::new(0),
364            transaction_start_edge_count: AtomicUsize::new(0),
365            #[cfg(feature = "wal")]
366            wal: None,
367            #[cfg(feature = "wal")]
368            wal_graph_context: None,
369            #[cfg(feature = "cdc")]
370            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
371            #[cfg(feature = "cdc")]
372            cdc_pending_events: None,
373            current_graph: parking_lot::Mutex::new(None),
374            current_schema: parking_lot::Mutex::new(None),
375            time_zone: parking_lot::Mutex::new(None),
376            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
377            viewing_epoch_override: parking_lot::Mutex::new(None),
378            savepoints: parking_lot::Mutex::new(Vec::new()),
379            transaction_nesting_depth: parking_lot::Mutex::new(0),
380            touched_graphs: parking_lot::Mutex::new(Vec::new()),
381            #[cfg(feature = "metrics")]
382            metrics: None,
383            #[cfg(feature = "metrics")]
384            tx_start_time: parking_lot::Mutex::new(None),
385            #[cfg(feature = "lpg")]
386            projections: cfg.projections,
387        })
388    }
389
390    /// Returns the graph model this session operates on.
391    #[must_use]
392    pub fn graph_model(&self) -> GraphModel {
393        self.graph_model
394    }
395
396    /// Returns the identity bound to this session.
397    #[must_use]
398    pub fn identity(&self) -> &crate::auth::Identity {
399        &self.identity
400    }
401
402    // === Session State Management ===
403
404    /// Sets the current graph for this session (USE GRAPH).
405    pub fn use_graph(&self, name: &str) {
406        *self.current_graph.lock() = Some(name.to_string());
407    }
408
409    /// Returns the current graph name, if any.
410    #[must_use]
411    pub fn current_graph(&self) -> Option<String> {
412        self.current_graph.lock().clone()
413    }
414
415    /// Sets the current schema for this session (SESSION SET SCHEMA).
416    ///
417    /// Per ISO/IEC 39075 Section 7.1 GR1, this is independent of the session graph.
418    pub fn set_schema(&self, name: &str) {
419        *self.current_schema.lock() = Some(name.to_string());
420    }
421
422    /// Returns the current schema name, if any.
423    ///
424    /// `None` means "not set", which resolves to the default schema.
425    #[must_use]
426    pub fn current_schema(&self) -> Option<String> {
427        self.current_schema.lock().clone()
428    }
429
430    /// Computes the effective storage key for a graph, accounting for schema context.
431    ///
432    /// Per ISO/IEC 39075 Section 17.2, graphs resolve relative to the current schema.
433    /// Uses `/` as separator since it is invalid in GQL identifiers.
434    fn effective_graph_key(&self, graph_name: &str) -> String {
435        let schema = self.current_schema.lock().clone();
436        match schema {
437            Some(s) => format!("{s}/{graph_name}"),
438            None => graph_name.to_string(),
439        }
440    }
441
442    /// Computes the effective storage key for a type, accounting for schema context.
443    ///
444    /// Mirrors `effective_graph_key()`: types resolve relative to the current schema.
445    fn effective_type_key(&self, type_name: &str) -> String {
446        let schema = self.current_schema.lock().clone();
447        match schema {
448            Some(s) => format!("{s}/{type_name}"),
449            None => type_name.to_string(),
450        }
451    }
452
453    /// Returns the effective storage key for the current graph, accounting for schema.
454    ///
455    /// Combines `current_schema` and `current_graph` into a flat lookup key.
456    fn active_graph_storage_key(&self) -> Option<String> {
457        let graph = self.current_graph.lock().clone();
458        let schema = self.current_schema.lock().clone();
459        match (&schema, &graph) {
460            (None, None) => None,
461            (Some(s), None) => Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}")),
462            (None, Some(name)) if name.eq_ignore_ascii_case("default") => None,
463            (Some(s), Some(name)) if name.eq_ignore_ascii_case("default") => {
464                Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}"))
465            }
466            (None, Some(name)) => Some(name.clone()),
467            (Some(s), Some(g)) => Some(format!("{s}/{g}")),
468        }
469    }
470
471    /// Returns the graph store for the currently active graph.
472    ///
473    /// If `current_graph` is `None` or `"default"`, returns the session's
474    /// default `graph_store` (already WAL-wrapped for the default graph).
475    /// Otherwise looks up the named graph in the root store and wraps it
476    /// in a [`WalGraphStore`] so mutations are WAL-logged with the correct
477    /// graph context.
478    fn active_store(&self) -> Arc<dyn GraphStore> {
479        let key = self.active_graph_storage_key();
480        match key {
481            None => Arc::clone(&self.graph_store),
482            #[cfg(feature = "lpg")]
483            Some(ref name) => match self.store.graph(name) {
484                Some(named_store) => {
485                    #[cfg(feature = "wal")]
486                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
487                        return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
488                            named_store,
489                            Arc::clone(wal),
490                            name.clone(),
491                            Arc::clone(ctx),
492                        )) as Arc<dyn GraphStore>;
493                    }
494                    named_store as Arc<dyn GraphStore>
495                }
496                None => Arc::clone(&self.graph_store),
497            },
498            #[cfg(not(feature = "lpg"))]
499            Some(_) => Arc::clone(&self.graph_store),
500        }
501    }
502
503    /// Returns the writable store for the active graph, if available.
504    ///
505    /// Returns `None` for read-only databases. For named graphs, wraps
506    /// the store with WAL logging when durability is enabled.
507    fn active_write_store(&self) -> Option<Arc<dyn GraphStoreMut>> {
508        let key = self.active_graph_storage_key();
509        match key {
510            None => self.graph_store_mut.as_ref().map(Arc::clone),
511            #[cfg(feature = "lpg")]
512            Some(ref name) => match self.store.graph(name) {
513                Some(named_store) => {
514                    let mut store: Arc<dyn GraphStoreMut> = named_store;
515
516                    #[cfg(feature = "wal")]
517                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
518                        store = Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
519                            // WAL needs Arc<LpgStore>, get it fresh
520                            self.store
521                                .graph(name)
522                                .unwrap_or_else(|| Arc::clone(&self.store)),
523                            Arc::clone(wal),
524                            name.clone(),
525                            Arc::clone(ctx),
526                        ));
527                    }
528
529                    #[cfg(feature = "cdc")]
530                    if let Some(ref pending) = self.cdc_pending_events {
531                        store = Arc::new(crate::database::cdc_store::CdcGraphStore::wrap(
532                            store,
533                            Arc::clone(&self.cdc_log),
534                            Arc::clone(pending),
535                        ));
536                    }
537
538                    Some(store)
539                }
540                None => self.graph_store_mut.as_ref().map(Arc::clone),
541            },
542            #[cfg(not(feature = "lpg"))]
543            Some(_) => self.graph_store_mut.as_ref().map(Arc::clone),
544        }
545    }
546
547    /// Returns the concrete `LpgStore` for the currently active graph.
548    ///
549    /// Used by direct CRUD methods that need the concrete store type
550    /// for versioned operations.
551    #[cfg(feature = "lpg")]
552    fn active_lpg_store(&self) -> Arc<LpgStore> {
553        let key = self.active_graph_storage_key();
554        match key {
555            None => Arc::clone(&self.store),
556            Some(ref name) => self
557                .store
558                .graph(name)
559                .unwrap_or_else(|| Arc::clone(&self.store)),
560        }
561    }
562
563    /// Resolves a graph name to a concrete `LpgStore`.
564    /// `None` and `"default"` resolve to the session's root store.
565    #[cfg(feature = "lpg")]
566    fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
567        match graph_name {
568            None => Arc::clone(&self.store),
569            Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
570            Some(name) => self
571                .store
572                .graph(name)
573                .unwrap_or_else(|| Arc::clone(&self.store)),
574        }
575    }
576
577    /// Records the current graph as "touched" if a transaction is active.
578    ///
579    /// Uses the full storage key (schema/graph) so that commit/rollback
580    /// can resolve the correct store via `resolve_store`.
581    fn track_graph_touch(&self) {
582        if self.current_transaction.lock().is_some() {
583            let key = self.active_graph_storage_key();
584            let mut touched = self.touched_graphs.lock();
585            if !touched.contains(&key) {
586                touched.push(key);
587            }
588        }
589    }
590
591    /// Sets the session time zone.
592    pub fn set_time_zone(&self, tz: &str) {
593        *self.time_zone.lock() = Some(tz.to_string());
594    }
595
596    /// Returns the session time zone, if set.
597    #[must_use]
598    pub fn time_zone(&self) -> Option<String> {
599        self.time_zone.lock().clone()
600    }
601
602    /// Sets a session parameter.
603    pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
604        self.session_params.lock().insert(key.to_string(), value);
605    }
606
607    /// Gets a session parameter by cloning it.
608    #[must_use]
609    pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
610        self.session_params.lock().get(key).cloned()
611    }
612
613    /// Resets all session state to defaults (ISO/IEC 39075 Section 7.2).
614    pub fn reset_session(&self) {
615        *self.current_schema.lock() = None;
616        *self.current_graph.lock() = None;
617        *self.time_zone.lock() = None;
618        self.session_params.lock().clear();
619        *self.viewing_epoch_override.lock() = None;
620    }
621
622    /// Resets only the session schema (Section 7.2 GR1).
623    pub fn reset_schema(&self) {
624        *self.current_schema.lock() = None;
625    }
626
627    /// Resets only the session graph (Section 7.2 GR2).
628    pub fn reset_graph(&self) {
629        *self.current_graph.lock() = None;
630    }
631
632    /// Resets only the session time zone (Section 7.2 GR3).
633    pub fn reset_time_zone(&self) {
634        *self.time_zone.lock() = None;
635    }
636
637    /// Resets only session parameters (Section 7.2 GR4).
638    pub fn reset_parameters(&self) {
639        self.session_params.lock().clear();
640    }
641
642    // --- Time-travel API ---
643
644    /// Sets a viewing epoch override for time-travel queries.
645    ///
646    /// While set, all queries on this session see the database as it existed
647    /// at the given epoch. Use [`clear_viewing_epoch`](Self::clear_viewing_epoch)
648    /// to return to normal behavior.
649    pub fn set_viewing_epoch(&self, epoch: EpochId) {
650        *self.viewing_epoch_override.lock() = Some(epoch);
651    }
652
653    /// Clears the viewing epoch override, returning to normal behavior.
654    pub fn clear_viewing_epoch(&self) {
655        *self.viewing_epoch_override.lock() = None;
656    }
657
658    /// Returns the current viewing epoch override, if any.
659    #[must_use]
660    pub fn viewing_epoch(&self) -> Option<EpochId> {
661        *self.viewing_epoch_override.lock()
662    }
663
664    /// Returns all versions of a node with their creation/deletion epochs.
665    ///
666    /// Properties and labels reflect the current state (not versioned per-epoch).
667    #[cfg(feature = "lpg")]
668    #[must_use]
669    pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
670        self.active_lpg_store().get_node_history(id)
671    }
672
673    /// Returns all versions of an edge with their creation/deletion epochs.
674    ///
675    /// Properties reflect the current state (not versioned per-epoch).
676    #[cfg(feature = "lpg")]
677    #[must_use]
678    pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
679        self.active_lpg_store().get_edge_history(id)
680    }
681
682    /// Checks that the session's graph model supports LPG operations.
683    fn require_lpg(&self, language: &str) -> Result<()> {
684        if self.graph_model == GraphModel::Rdf {
685            return Err(grafeo_common::utils::error::Error::Internal(format!(
686                "This is an RDF database. {language} queries require an LPG database."
687            )));
688        }
689        Ok(())
690    }
691
692    /// Checks that the session's identity is permitted to execute the given
693    /// statement kind. Returns an error if the role is insufficient.
694    fn require_permission(&self, kind: crate::auth::StatementKind) -> Result<()> {
695        crate::auth::check_permission(&self.identity, kind).map_err(|denied| {
696            grafeo_common::utils::error::Error::Query(grafeo_common::utils::error::QueryError::new(
697                grafeo_common::utils::error::QueryErrorKind::Semantic,
698                denied.to_string(),
699            ))
700        })
701    }
702
703    /// Executes a session or transaction command, returning an empty result.
704    #[cfg(feature = "gql")]
705    fn execute_session_command(
706        &self,
707        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
708    ) -> Result<QueryResult> {
709        use grafeo_adapters::query::gql::ast::SessionCommand;
710        #[cfg(feature = "lpg")]
711        use grafeo_adapters::query::gql::ast::TransactionIsolationLevel;
712        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
713
714        // Check role-based permission for graph management commands
715        match &cmd {
716            SessionCommand::CreateGraph { .. }
717            | SessionCommand::DropGraph { .. }
718            | SessionCommand::CreateProjection { .. }
719            | SessionCommand::DropProjection { .. } => {
720                self.require_permission(crate::auth::StatementKind::Write)?;
721            }
722            _ => {} // Session state + transaction control: always allowed
723        }
724
725        // Check per-graph grants for graph-scoped commands
726        if self.identity.has_grants() {
727            match &cmd {
728                SessionCommand::CreateGraph { name, .. }
729                | SessionCommand::DropGraph { name, .. } => {
730                    if !self
731                        .identity
732                        .can_access_graph(name, crate::auth::Role::ReadWrite)
733                    {
734                        return Err(Error::Query(QueryError::new(
735                            QueryErrorKind::Semantic,
736                            format!(
737                                "permission denied: no grant for graph '{name}' (user: {})",
738                                self.identity.user_id()
739                            ),
740                        )));
741                    }
742                }
743                _ => {}
744            }
745        }
746
747        // Block DDL in read-only transactions (ISO/IEC 39075 Section 8)
748        if *self.read_only_tx.lock() {
749            match &cmd {
750                SessionCommand::CreateGraph { .. }
751                | SessionCommand::DropGraph { .. }
752                | SessionCommand::CreateProjection { .. }
753                | SessionCommand::DropProjection { .. } => {
754                    return Err(Error::Transaction(
755                        grafeo_common::utils::error::TransactionError::ReadOnly,
756                    ));
757                }
758                _ => {} // Session state + transaction control allowed
759            }
760        }
761
762        match cmd {
763            #[cfg(feature = "lpg")]
764            SessionCommand::CreateGraph {
765                name,
766                if_not_exists,
767                typed,
768                like_graph,
769                copy_of,
770                open: _,
771            } => {
772                // ISO/IEC 39075 Section 12.4: graphs are created within the current schema
773                let storage_key = self.effective_graph_key(&name);
774
775                // Validate source graph exists for LIKE / AS COPY OF
776                if let Some(ref src) = like_graph {
777                    let src_key = self.effective_graph_key(src);
778                    if self.store.graph(&src_key).is_none() {
779                        return Err(Error::Query(QueryError::new(
780                            QueryErrorKind::Semantic,
781                            format!("Source graph '{src}' does not exist"),
782                        )));
783                    }
784                }
785                if let Some(ref src) = copy_of {
786                    let src_key = self.effective_graph_key(src);
787                    if self.store.graph(&src_key).is_none() {
788                        return Err(Error::Query(QueryError::new(
789                            QueryErrorKind::Semantic,
790                            format!("Source graph '{src}' does not exist"),
791                        )));
792                    }
793                }
794
795                let created = self
796                    .store
797                    .create_graph(&storage_key)
798                    .map_err(|e| Error::Internal(e.to_string()))?;
799                if !created && !if_not_exists {
800                    return Err(Error::Query(QueryError::new(
801                        QueryErrorKind::Semantic,
802                        format!("Graph '{name}' already exists"),
803                    )));
804                }
805                if created {
806                    #[cfg(feature = "wal")]
807                    self.log_schema_wal(&grafeo_storage::wal::WalRecord::CreateNamedGraph {
808                        name: storage_key.clone(),
809                    });
810                }
811
812                // AS COPY OF: copy data from source graph
813                if let Some(ref src) = copy_of {
814                    let src_key = self.effective_graph_key(src);
815                    self.store
816                        .copy_graph(Some(&src_key), Some(&storage_key))
817                        .map_err(|e| Error::Internal(e.to_string()))?;
818                }
819
820                // Bind to graph type if specified.
821                // If the parser produced a '/' in the name it is already a qualified
822                // "schema/type" key; otherwise resolve against the current schema.
823                if let Some(type_name) = typed
824                    && let Err(e) = self.catalog.bind_graph_type(
825                        &storage_key,
826                        if type_name.contains('/') {
827                            type_name.clone()
828                        } else {
829                            self.effective_type_key(&type_name)
830                        },
831                    )
832                {
833                    return Err(Error::Query(QueryError::new(
834                        QueryErrorKind::Semantic,
835                        e.to_string(),
836                    )));
837                }
838
839                // LIKE: copy graph type binding from source
840                if let Some(ref src) = like_graph {
841                    let src_key = self.effective_graph_key(src);
842                    if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
843                        let _ = self.catalog.bind_graph_type(&storage_key, src_type);
844                    }
845                }
846
847                Ok(QueryResult::empty())
848            }
849            #[cfg(feature = "lpg")]
850            SessionCommand::DropGraph { name, if_exists } => {
851                let storage_key = self.effective_graph_key(&name);
852                let dropped = self.store.drop_graph(&storage_key);
853                if !dropped && !if_exists {
854                    return Err(Error::Query(QueryError::new(
855                        QueryErrorKind::Semantic,
856                        format!("Graph '{name}' does not exist"),
857                    )));
858                }
859                if dropped {
860                    #[cfg(feature = "wal")]
861                    self.log_schema_wal(&grafeo_storage::wal::WalRecord::DropNamedGraph {
862                        name: storage_key.clone(),
863                    });
864                    // If this session was using the dropped graph, reset to default
865                    let mut current = self.current_graph.lock();
866                    if current
867                        .as_deref()
868                        .is_some_and(|g| g.eq_ignore_ascii_case(&name))
869                    {
870                        *current = None;
871                    }
872                }
873                Ok(QueryResult::empty())
874            }
875            #[cfg(feature = "lpg")]
876            SessionCommand::UseGraph(name) => {
877                // Check per-graph grant before switching
878                if self.identity.has_grants()
879                    && !name.eq_ignore_ascii_case("default")
880                    && !self
881                        .identity
882                        .can_access_graph(&name, crate::auth::Role::ReadOnly)
883                {
884                    return Err(Error::Query(QueryError::new(
885                        QueryErrorKind::Semantic,
886                        format!(
887                            "permission denied: no grant for graph '{name}' (user: {})",
888                            self.identity.user_id()
889                        ),
890                    )));
891                }
892                // Verify graph exists (resolve within current schema)
893                let effective_key = self.effective_graph_key(&name);
894                if !name.eq_ignore_ascii_case("default")
895                    && self.store.graph(&effective_key).is_none()
896                {
897                    return Err(Error::Query(QueryError::new(
898                        QueryErrorKind::Semantic,
899                        format!("Graph '{name}' does not exist"),
900                    )));
901                }
902                self.use_graph(&name);
903                // Track the new graph if in a transaction
904                self.track_graph_touch();
905                Ok(QueryResult::empty())
906            }
907            #[cfg(feature = "lpg")]
908            SessionCommand::SessionSetGraph(name) => {
909                // ISO/IEC 39075 Section 7.1 GR2: set session graph (resolved within current schema)
910                // Check per-graph grant before switching (same as USE GRAPH)
911                if self.identity.has_grants()
912                    && !name.eq_ignore_ascii_case("default")
913                    && !self
914                        .identity
915                        .can_access_graph(&name, crate::auth::Role::ReadOnly)
916                {
917                    return Err(Error::Query(QueryError::new(
918                        QueryErrorKind::Semantic,
919                        format!(
920                            "permission denied: no grant for graph '{name}' (user: {})",
921                            self.identity.user_id()
922                        ),
923                    )));
924                }
925                let effective_key = self.effective_graph_key(&name);
926                if !name.eq_ignore_ascii_case("default")
927                    && self.store.graph(&effective_key).is_none()
928                {
929                    return Err(Error::Query(QueryError::new(
930                        QueryErrorKind::Semantic,
931                        format!("Graph '{name}' does not exist"),
932                    )));
933                }
934                self.use_graph(&name);
935                // Track the new graph if in a transaction
936                self.track_graph_touch();
937                Ok(QueryResult::empty())
938            }
939            SessionCommand::SessionSetSchema(name) => {
940                // ISO/IEC 39075 Section 7.1 GR1: set session schema (independent of graph)
941                if !self.catalog.schema_exists(&name) {
942                    return Err(Error::Query(QueryError::new(
943                        QueryErrorKind::Semantic,
944                        format!("Schema '{name}' does not exist"),
945                    )));
946                }
947                self.set_schema(&name);
948                Ok(QueryResult::empty())
949            }
950            SessionCommand::SessionSetTimeZone(tz) => {
951                self.set_time_zone(&tz);
952                Ok(QueryResult::empty())
953            }
954            SessionCommand::SessionSetParameter(key, expr) => {
955                if key.eq_ignore_ascii_case("viewing_epoch") {
956                    match Self::eval_integer_literal(&expr) {
957                        Some(n) if n >= 0 => {
958                            self.set_viewing_epoch(EpochId::new(n as u64));
959                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
960                        }
961                        _ => Err(Error::Query(QueryError::new(
962                            QueryErrorKind::Semantic,
963                            "viewing_epoch must be a non-negative integer literal",
964                        ))),
965                    }
966                } else {
967                    // For now, store parameter name with Null value.
968                    // Full expression evaluation would require building and executing a plan.
969                    self.set_parameter(&key, Value::Null);
970                    Ok(QueryResult::empty())
971                }
972            }
973            SessionCommand::SessionReset(target) => {
974                use grafeo_adapters::query::gql::ast::SessionResetTarget;
975                match target {
976                    SessionResetTarget::All => self.reset_session(),
977                    SessionResetTarget::Schema => self.reset_schema(),
978                    SessionResetTarget::Graph => self.reset_graph(),
979                    SessionResetTarget::TimeZone => self.reset_time_zone(),
980                    SessionResetTarget::Parameters => self.reset_parameters(),
981                }
982                Ok(QueryResult::empty())
983            }
984            SessionCommand::SessionClose => {
985                self.reset_session();
986                Ok(QueryResult::empty())
987            }
988            #[cfg(feature = "lpg")]
989            SessionCommand::StartTransaction {
990                read_only,
991                isolation_level,
992            } => {
993                let engine_level = isolation_level.map(|l| match l {
994                    TransactionIsolationLevel::ReadCommitted => {
995                        crate::transaction::IsolationLevel::ReadCommitted
996                    }
997                    TransactionIsolationLevel::SnapshotIsolation => {
998                        crate::transaction::IsolationLevel::SnapshotIsolation
999                    }
1000                    TransactionIsolationLevel::Serializable => {
1001                        crate::transaction::IsolationLevel::Serializable
1002                    }
1003                });
1004                self.begin_transaction_inner(read_only, engine_level)?;
1005                Ok(QueryResult::status("Transaction started"))
1006            }
1007            #[cfg(feature = "lpg")]
1008            SessionCommand::Commit => {
1009                self.commit_inner()?;
1010                Ok(QueryResult::status("Transaction committed"))
1011            }
1012            #[cfg(feature = "lpg")]
1013            SessionCommand::Rollback => {
1014                self.rollback_inner()?;
1015                Ok(QueryResult::status("Transaction rolled back"))
1016            }
1017            #[cfg(feature = "lpg")]
1018            SessionCommand::Savepoint(name) => {
1019                self.savepoint(&name)?;
1020                Ok(QueryResult::status(format!("Savepoint '{name}' created")))
1021            }
1022            #[cfg(feature = "lpg")]
1023            SessionCommand::RollbackToSavepoint(name) => {
1024                self.rollback_to_savepoint(&name)?;
1025                Ok(QueryResult::status(format!(
1026                    "Rolled back to savepoint '{name}'"
1027                )))
1028            }
1029            #[cfg(feature = "lpg")]
1030            SessionCommand::ReleaseSavepoint(name) => {
1031                self.release_savepoint(&name)?;
1032                Ok(QueryResult::status(format!("Savepoint '{name}' released")))
1033            }
1034            #[cfg(feature = "lpg")]
1035            SessionCommand::CreateProjection {
1036                name,
1037                node_labels,
1038                edge_types,
1039            } => {
1040                use grafeo_core::graph::{GraphProjection, ProjectionSpec};
1041                use std::collections::hash_map::Entry;
1042
1043                let spec = ProjectionSpec::new()
1044                    .with_node_labels(node_labels)
1045                    .with_edge_types(edge_types);
1046
1047                let store = self.active_store();
1048                let projection = Arc::new(GraphProjection::new(store, spec));
1049                let mut projections = self.projections.write();
1050                match projections.entry(name.clone()) {
1051                    Entry::Occupied(_) => Err(Error::Query(QueryError::new(
1052                        QueryErrorKind::Semantic,
1053                        format!("Projection '{name}' already exists"),
1054                    ))),
1055                    Entry::Vacant(e) => {
1056                        e.insert(projection);
1057                        Ok(QueryResult::status(format!("Projection '{name}' created")))
1058                    }
1059                }
1060            }
1061            #[cfg(feature = "lpg")]
1062            SessionCommand::DropProjection { name } => {
1063                let removed = self.projections.write().remove(&name).is_some();
1064                if !removed {
1065                    return Err(Error::Query(QueryError::new(
1066                        QueryErrorKind::Semantic,
1067                        format!("Projection '{name}' does not exist"),
1068                    )));
1069                }
1070                Ok(QueryResult::status(format!("Projection '{name}' dropped")))
1071            }
1072            #[cfg(feature = "lpg")]
1073            SessionCommand::ShowProjections => {
1074                let mut names: Vec<String> = self.projections.read().keys().cloned().collect();
1075                names.sort();
1076                let rows: Vec<Vec<Value>> =
1077                    names.into_iter().map(|n| vec![Value::from(n)]).collect();
1078                Ok(QueryResult {
1079                    columns: vec!["name".to_string()],
1080                    column_types: Vec::new(),
1081                    rows,
1082                    ..QueryResult::empty()
1083                })
1084            }
1085            #[cfg(not(feature = "lpg"))]
1086            _ => Err(grafeo_common::utils::error::Error::Internal(
1087                "This command requires the `lpg` feature".to_string(),
1088            )),
1089        }
1090    }
1091
1092    /// Logs a WAL record for a schema change (no-op if WAL is not enabled).
1093    #[cfg(feature = "wal")]
1094    fn log_schema_wal(&self, record: &grafeo_storage::wal::WalRecord) {
1095        if let Some(ref wal) = self.wal
1096            && let Err(e) = wal.log(record)
1097        {
1098            grafeo_warn!("Failed to log schema change to WAL: {}", e);
1099        }
1100    }
1101
1102    /// Executes a schema DDL command, returning a status result.
1103    #[cfg(all(feature = "lpg", feature = "gql"))]
1104    fn execute_schema_command(
1105        &self,
1106        cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
1107    ) -> Result<QueryResult> {
1108        use crate::catalog::{
1109            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
1110        };
1111        use grafeo_adapters::query::gql::ast::SchemaStatement;
1112        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1113        #[cfg(feature = "wal")]
1114        use grafeo_storage::wal::WalRecord;
1115
1116        /// Logs a WAL record for schema changes. Compiles to nothing without `wal`.
1117        macro_rules! wal_log {
1118            ($self:expr, $record:expr) => {
1119                #[cfg(feature = "wal")]
1120                $self.log_schema_wal(&$record);
1121            };
1122        }
1123
1124        let result = match cmd {
1125            SchemaStatement::CreateNodeType(stmt) => {
1126                let effective_name = self.effective_type_key(&stmt.name);
1127                #[cfg(feature = "wal")]
1128                let props_for_wal: Vec<(String, String, bool)> = stmt
1129                    .properties
1130                    .iter()
1131                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1132                    .collect();
1133                let def = NodeTypeDefinition {
1134                    name: effective_name.clone(),
1135                    properties: stmt
1136                        .properties
1137                        .iter()
1138                        .map(|p| TypedProperty {
1139                            name: p.name.clone(),
1140                            data_type: PropertyDataType::from_type_name(&p.data_type),
1141                            nullable: p.nullable,
1142                            default_value: p
1143                                .default_value
1144                                .as_ref()
1145                                .map(|s| parse_default_literal(s)),
1146                        })
1147                        .collect(),
1148                    constraints: Vec::new(),
1149                    parent_types: stmt.parent_types.clone(),
1150                };
1151                let result = if stmt.or_replace {
1152                    let _ = self.catalog.drop_node_type(&effective_name);
1153                    self.catalog.register_node_type(def)
1154                } else {
1155                    self.catalog.register_node_type(def)
1156                };
1157                match result {
1158                    Ok(()) => {
1159                        wal_log!(
1160                            self,
1161                            WalRecord::CreateNodeType {
1162                                name: effective_name.clone(),
1163                                properties: props_for_wal,
1164                                constraints: Vec::new(),
1165                            }
1166                        );
1167                        Ok(QueryResult::status(format!(
1168                            "Created node type '{}'",
1169                            stmt.name
1170                        )))
1171                    }
1172                    Err(e) if stmt.if_not_exists => {
1173                        let _ = e;
1174                        Ok(QueryResult::status("No change"))
1175                    }
1176                    Err(e) => Err(Error::Query(QueryError::new(
1177                        QueryErrorKind::Semantic,
1178                        e.to_string(),
1179                    ))),
1180                }
1181            }
1182            SchemaStatement::CreateEdgeType(stmt) => {
1183                let effective_name = self.effective_type_key(&stmt.name);
1184                #[cfg(feature = "wal")]
1185                let props_for_wal: Vec<(String, String, bool)> = stmt
1186                    .properties
1187                    .iter()
1188                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1189                    .collect();
1190                let def = EdgeTypeDefinition {
1191                    name: effective_name.clone(),
1192                    properties: stmt
1193                        .properties
1194                        .iter()
1195                        .map(|p| TypedProperty {
1196                            name: p.name.clone(),
1197                            data_type: PropertyDataType::from_type_name(&p.data_type),
1198                            nullable: p.nullable,
1199                            default_value: p
1200                                .default_value
1201                                .as_ref()
1202                                .map(|s| parse_default_literal(s)),
1203                        })
1204                        .collect(),
1205                    constraints: Vec::new(),
1206                    source_node_types: stmt.source_node_types.clone(),
1207                    target_node_types: stmt.target_node_types.clone(),
1208                };
1209                let result = if stmt.or_replace {
1210                    let _ = self.catalog.drop_edge_type_def(&effective_name);
1211                    self.catalog.register_edge_type_def(def)
1212                } else {
1213                    self.catalog.register_edge_type_def(def)
1214                };
1215                match result {
1216                    Ok(()) => {
1217                        wal_log!(
1218                            self,
1219                            WalRecord::CreateEdgeType {
1220                                name: effective_name.clone(),
1221                                properties: props_for_wal,
1222                                constraints: Vec::new(),
1223                            }
1224                        );
1225                        Ok(QueryResult::status(format!(
1226                            "Created edge type '{}'",
1227                            stmt.name
1228                        )))
1229                    }
1230                    Err(e) if stmt.if_not_exists => {
1231                        let _ = e;
1232                        Ok(QueryResult::status("No change"))
1233                    }
1234                    Err(e) => Err(Error::Query(QueryError::new(
1235                        QueryErrorKind::Semantic,
1236                        e.to_string(),
1237                    ))),
1238                }
1239            }
1240            SchemaStatement::CreateVectorIndex(stmt) => {
1241                Self::create_vector_index_on_store(
1242                    &self.active_lpg_store(),
1243                    &stmt.node_label,
1244                    &stmt.property,
1245                    stmt.dimensions,
1246                    stmt.metric.as_deref(),
1247                )?;
1248                wal_log!(
1249                    self,
1250                    WalRecord::CreateIndex {
1251                        name: stmt.name.clone(),
1252                        label: stmt.node_label.clone(),
1253                        property: stmt.property.clone(),
1254                        index_type: "vector".to_string(),
1255                    }
1256                );
1257                Ok(QueryResult::status(format!(
1258                    "Created vector index '{}'",
1259                    stmt.name
1260                )))
1261            }
1262            SchemaStatement::DropNodeType { name, if_exists } => {
1263                let effective_name = self.effective_type_key(&name);
1264                match self.catalog.drop_node_type(&effective_name) {
1265                    Ok(()) => {
1266                        wal_log!(
1267                            self,
1268                            WalRecord::DropNodeType {
1269                                name: effective_name
1270                            }
1271                        );
1272                        Ok(QueryResult::status(format!("Dropped node type '{name}'")))
1273                    }
1274                    Err(e) if if_exists => {
1275                        let _ = e;
1276                        Ok(QueryResult::status("No change"))
1277                    }
1278                    Err(e) => Err(Error::Query(QueryError::new(
1279                        QueryErrorKind::Semantic,
1280                        e.to_string(),
1281                    ))),
1282                }
1283            }
1284            SchemaStatement::DropEdgeType { name, if_exists } => {
1285                let effective_name = self.effective_type_key(&name);
1286                match self.catalog.drop_edge_type_def(&effective_name) {
1287                    Ok(()) => {
1288                        wal_log!(
1289                            self,
1290                            WalRecord::DropEdgeType {
1291                                name: effective_name
1292                            }
1293                        );
1294                        Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
1295                    }
1296                    Err(e) if if_exists => {
1297                        let _ = e;
1298                        Ok(QueryResult::status("No change"))
1299                    }
1300                    Err(e) => Err(Error::Query(QueryError::new(
1301                        QueryErrorKind::Semantic,
1302                        e.to_string(),
1303                    ))),
1304                }
1305            }
1306            SchemaStatement::CreateIndex(stmt) => {
1307                use crate::catalog::IndexType as CatalogIndexType;
1308                use grafeo_adapters::query::gql::ast::IndexKind;
1309                let active = self.active_lpg_store();
1310                let index_type_str = match stmt.index_kind {
1311                    IndexKind::Property => "property",
1312                    IndexKind::BTree => "btree",
1313                    IndexKind::Text => "text",
1314                    IndexKind::Vector => "vector",
1315                };
1316                match stmt.index_kind {
1317                    IndexKind::Property | IndexKind::BTree => {
1318                        for prop in &stmt.properties {
1319                            active.create_property_index(prop);
1320                        }
1321                    }
1322                    IndexKind::Text => {
1323                        for prop in &stmt.properties {
1324                            Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1325                        }
1326                    }
1327                    IndexKind::Vector => {
1328                        for prop in &stmt.properties {
1329                            Self::create_vector_index_on_store(
1330                                &active,
1331                                &stmt.label,
1332                                prop,
1333                                stmt.options.dimensions,
1334                                stmt.options.metric.as_deref(),
1335                            )?;
1336                        }
1337                    }
1338                }
1339                // Register each property index in the catalog for SHOW INDEXES
1340                // and DROP INDEX name-based lookup.
1341                let catalog_index_type = match stmt.index_kind {
1342                    IndexKind::Property => CatalogIndexType::Hash,
1343                    IndexKind::BTree => CatalogIndexType::BTree,
1344                    IndexKind::Text => CatalogIndexType::FullText,
1345                    IndexKind::Vector => CatalogIndexType::Hash,
1346                };
1347                let label_id = self.catalog.get_or_create_label(&stmt.label);
1348                for prop in &stmt.properties {
1349                    let prop_id = self.catalog.get_or_create_property_key(prop);
1350                    self.catalog
1351                        .create_index(&stmt.name, label_id, prop_id, catalog_index_type);
1352                }
1353                #[cfg(feature = "wal")]
1354                for prop in &stmt.properties {
1355                    wal_log!(
1356                        self,
1357                        WalRecord::CreateIndex {
1358                            name: stmt.name.clone(),
1359                            label: stmt.label.clone(),
1360                            property: prop.clone(),
1361                            index_type: index_type_str.to_string(),
1362                        }
1363                    );
1364                }
1365                Ok(QueryResult::status(format!(
1366                    "Created {} index '{}'",
1367                    index_type_str, stmt.name
1368                )))
1369            }
1370            SchemaStatement::DropIndex { name, if_exists } => {
1371                // Look up the index by name in the catalog to find the
1372                // underlying property, then drop from both catalog and store.
1373                if let Some(index_id) = self.catalog.find_index_by_name(&name) {
1374                    let def = self.catalog.get_index(index_id);
1375                    self.catalog.drop_index(index_id);
1376                    if let Some(def) = def
1377                        && let Some(prop_name) =
1378                            self.catalog.get_property_key_name(def.property_key)
1379                    {
1380                        self.active_lpg_store().drop_property_index(&prop_name);
1381                    }
1382                    wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1383                    Ok(QueryResult::status(format!("Dropped index '{name}'")))
1384                } else if if_exists {
1385                    Ok(QueryResult::status("No change".to_string()))
1386                } else {
1387                    Err(Error::Query(QueryError::new(
1388                        QueryErrorKind::Semantic,
1389                        format!("Index '{name}' does not exist"),
1390                    )))
1391                }
1392            }
1393            SchemaStatement::CreateConstraint(stmt) => {
1394                use crate::catalog::TypeConstraint;
1395                use grafeo_adapters::query::gql::ast::ConstraintKind;
1396                let kind_str = match stmt.constraint_kind {
1397                    ConstraintKind::Unique => "unique",
1398                    ConstraintKind::NodeKey => "node_key",
1399                    ConstraintKind::NotNull => "not_null",
1400                    ConstraintKind::Exists => "exists",
1401                };
1402                let constraint_name = stmt
1403                    .name
1404                    .clone()
1405                    .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1406
1407                // Register constraint in catalog type definitions
1408                match stmt.constraint_kind {
1409                    ConstraintKind::Unique => {
1410                        for prop in &stmt.properties {
1411                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1412                            let prop_id = self.catalog.get_or_create_property_key(prop);
1413                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1414                        }
1415                        let _ = self.catalog.add_constraint_to_type(
1416                            &stmt.label,
1417                            TypeConstraint::Unique(stmt.properties.clone()),
1418                        );
1419                    }
1420                    ConstraintKind::NodeKey => {
1421                        for prop in &stmt.properties {
1422                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1423                            let prop_id = self.catalog.get_or_create_property_key(prop);
1424                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1425                            let _ = self.catalog.add_required_property(label_id, prop_id);
1426                        }
1427                        let _ = self.catalog.add_constraint_to_type(
1428                            &stmt.label,
1429                            TypeConstraint::PrimaryKey(stmt.properties.clone()),
1430                        );
1431                    }
1432                    ConstraintKind::NotNull | ConstraintKind::Exists => {
1433                        for prop in &stmt.properties {
1434                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1435                            let prop_id = self.catalog.get_or_create_property_key(prop);
1436                            let _ = self.catalog.add_required_property(label_id, prop_id);
1437                            let _ = self.catalog.add_constraint_to_type(
1438                                &stmt.label,
1439                                TypeConstraint::NotNull(prop.clone()),
1440                            );
1441                        }
1442                    }
1443                }
1444
1445                wal_log!(
1446                    self,
1447                    WalRecord::CreateConstraint {
1448                        name: constraint_name.clone(),
1449                        label: stmt.label.clone(),
1450                        properties: stmt.properties.clone(),
1451                        kind: kind_str.to_string(),
1452                    }
1453                );
1454                Ok(QueryResult::status(format!(
1455                    "Created {kind_str} constraint '{constraint_name}'"
1456                )))
1457            }
1458            SchemaStatement::DropConstraint { name, if_exists } => {
1459                let _ = if_exists;
1460                wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1461                Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1462            }
1463            SchemaStatement::CreateGraphType(stmt) => {
1464                use crate::catalog::GraphTypeDefinition;
1465                use grafeo_adapters::query::gql::ast::InlineElementType;
1466
1467                let effective_name = self.effective_type_key(&stmt.name);
1468
1469                // GG04: LIKE clause copies type from existing graph
1470                let (mut node_types, mut edge_types, open) =
1471                    if let Some(ref like_graph) = stmt.like_graph {
1472                        // Infer types from the graph's bound type, or use its existing types
1473                        if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1474                            if let Some(existing) = self
1475                                .catalog
1476                                .schema()
1477                                .and_then(|s| s.get_graph_type(&type_name))
1478                            {
1479                                (
1480                                    existing.allowed_node_types.clone(),
1481                                    existing.allowed_edge_types.clone(),
1482                                    existing.open,
1483                                )
1484                            } else {
1485                                (Vec::new(), Vec::new(), true)
1486                            }
1487                        } else {
1488                            // GG22: No bound type: fall back to all catalog node/edge type names
1489                            let nt = self.catalog.all_node_type_names();
1490                            let et = self.catalog.all_edge_type_names();
1491                            if nt.is_empty() && et.is_empty() {
1492                                (Vec::new(), Vec::new(), true)
1493                            } else {
1494                                (nt, et, false)
1495                            }
1496                        }
1497                    } else {
1498                        // Prefix element type names with schema for consistency
1499                        let nt = stmt
1500                            .node_types
1501                            .iter()
1502                            .map(|n| self.effective_type_key(n))
1503                            .collect();
1504                        let et = stmt
1505                            .edge_types
1506                            .iter()
1507                            .map(|n| self.effective_type_key(n))
1508                            .collect();
1509                        (nt, et, stmt.open)
1510                    };
1511
1512                // GG03: Register inline element types and add their names
1513                for inline in &stmt.inline_types {
1514                    match inline {
1515                        InlineElementType::Node {
1516                            name,
1517                            properties,
1518                            key_labels,
1519                            ..
1520                        } => {
1521                            let inline_effective = self.effective_type_key(name);
1522                            let def = NodeTypeDefinition {
1523                                name: inline_effective.clone(),
1524                                properties: properties
1525                                    .iter()
1526                                    .map(|p| TypedProperty {
1527                                        name: p.name.clone(),
1528                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1529                                        nullable: p.nullable,
1530                                        default_value: None,
1531                                    })
1532                                    .collect(),
1533                                constraints: Vec::new(),
1534                                parent_types: key_labels.clone(),
1535                            };
1536                            // Register or replace so inline defs override existing
1537                            self.catalog.register_or_replace_node_type(def);
1538                            #[cfg(feature = "wal")]
1539                            {
1540                                let props_for_wal: Vec<(String, String, bool)> = properties
1541                                    .iter()
1542                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1543                                    .collect();
1544                                self.log_schema_wal(&WalRecord::CreateNodeType {
1545                                    name: inline_effective.clone(),
1546                                    properties: props_for_wal,
1547                                    constraints: Vec::new(),
1548                                });
1549                            }
1550                            if !node_types.contains(&inline_effective) {
1551                                node_types.push(inline_effective);
1552                            }
1553                        }
1554                        InlineElementType::Edge {
1555                            name,
1556                            properties,
1557                            source_node_types,
1558                            target_node_types,
1559                            ..
1560                        } => {
1561                            let inline_effective = self.effective_type_key(name);
1562                            let def = EdgeTypeDefinition {
1563                                name: inline_effective.clone(),
1564                                properties: properties
1565                                    .iter()
1566                                    .map(|p| TypedProperty {
1567                                        name: p.name.clone(),
1568                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1569                                        nullable: p.nullable,
1570                                        default_value: None,
1571                                    })
1572                                    .collect(),
1573                                constraints: Vec::new(),
1574                                source_node_types: source_node_types.clone(),
1575                                target_node_types: target_node_types.clone(),
1576                            };
1577                            self.catalog.register_or_replace_edge_type_def(def);
1578                            #[cfg(feature = "wal")]
1579                            {
1580                                let props_for_wal: Vec<(String, String, bool)> = properties
1581                                    .iter()
1582                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1583                                    .collect();
1584                                self.log_schema_wal(&WalRecord::CreateEdgeType {
1585                                    name: inline_effective.clone(),
1586                                    properties: props_for_wal,
1587                                    constraints: Vec::new(),
1588                                });
1589                            }
1590                            if !edge_types.contains(&inline_effective) {
1591                                edge_types.push(inline_effective);
1592                            }
1593                        }
1594                    }
1595                }
1596
1597                let def = GraphTypeDefinition {
1598                    name: effective_name.clone(),
1599                    allowed_node_types: node_types.clone(),
1600                    allowed_edge_types: edge_types.clone(),
1601                    open,
1602                };
1603                let result = if stmt.or_replace {
1604                    // Drop existing first, ignore error if not found
1605                    let _ = self.catalog.drop_graph_type(&effective_name);
1606                    self.catalog.register_graph_type(def)
1607                } else {
1608                    self.catalog.register_graph_type(def)
1609                };
1610                match result {
1611                    Ok(()) => {
1612                        wal_log!(
1613                            self,
1614                            WalRecord::CreateGraphType {
1615                                name: effective_name.clone(),
1616                                node_types,
1617                                edge_types,
1618                                open,
1619                            }
1620                        );
1621                        Ok(QueryResult::status(format!(
1622                            "Created graph type '{}'",
1623                            stmt.name
1624                        )))
1625                    }
1626                    Err(e) if stmt.if_not_exists => {
1627                        let _ = e;
1628                        Ok(QueryResult::status("No change"))
1629                    }
1630                    Err(e) => Err(Error::Query(QueryError::new(
1631                        QueryErrorKind::Semantic,
1632                        e.to_string(),
1633                    ))),
1634                }
1635            }
1636            SchemaStatement::DropGraphType { name, if_exists } => {
1637                let effective_name = self.effective_type_key(&name);
1638                match self.catalog.drop_graph_type(&effective_name) {
1639                    Ok(()) => {
1640                        wal_log!(
1641                            self,
1642                            WalRecord::DropGraphType {
1643                                name: effective_name
1644                            }
1645                        );
1646                        Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1647                    }
1648                    Err(e) if if_exists => {
1649                        let _ = e;
1650                        Ok(QueryResult::status("No change"))
1651                    }
1652                    Err(e) => Err(Error::Query(QueryError::new(
1653                        QueryErrorKind::Semantic,
1654                        e.to_string(),
1655                    ))),
1656                }
1657            }
1658            SchemaStatement::CreateSchema {
1659                name,
1660                if_not_exists,
1661            } => match self.catalog.register_schema_namespace(name.clone()) {
1662                Ok(()) => {
1663                    wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1664                    // Auto-create the schema's default graph partition so that
1665                    // SESSION SET SCHEMA + queries work without an explicit graph.
1666                    let default_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1667                    if self.store.create_graph(&default_key).unwrap_or(false) {
1668                        wal_log!(self, WalRecord::CreateNamedGraph { name: default_key });
1669                    }
1670                    Ok(QueryResult::status(format!("Created schema '{name}'")))
1671                }
1672                Err(e) if if_not_exists => {
1673                    let _ = e;
1674                    Ok(QueryResult::status("No change"))
1675                }
1676                Err(e) => Err(Error::Query(QueryError::new(
1677                    QueryErrorKind::Semantic,
1678                    e.to_string(),
1679                ))),
1680            },
1681            SchemaStatement::DropSchema { name, if_exists } => {
1682                // ISO/IEC 39075 Section 12.3: schema must be empty before dropping.
1683                // The auto-created __default__ graph is exempt from the check.
1684                let prefix = format!("{name}/");
1685                let default_graph_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1686                let has_graphs = self
1687                    .store
1688                    .graph_names()
1689                    .iter()
1690                    .any(|g| g.starts_with(&prefix) && *g != default_graph_key);
1691                let has_types = self
1692                    .catalog
1693                    .all_node_type_names()
1694                    .iter()
1695                    .any(|n| n.starts_with(&prefix))
1696                    || self
1697                        .catalog
1698                        .all_edge_type_names()
1699                        .iter()
1700                        .any(|n| n.starts_with(&prefix))
1701                    || self
1702                        .catalog
1703                        .all_graph_type_names()
1704                        .iter()
1705                        .any(|n| n.starts_with(&prefix));
1706                if has_graphs || has_types {
1707                    return Err(Error::Query(QueryError::new(
1708                        QueryErrorKind::Semantic,
1709                        format!("Schema '{name}' is not empty: drop all graphs and types first"),
1710                    )));
1711                }
1712                match self.catalog.drop_schema_namespace(&name) {
1713                    Ok(()) => {
1714                        wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1715                        // Drop the auto-created default graph partition
1716                        if self.store.drop_graph(&default_graph_key) {
1717                            wal_log!(
1718                                self,
1719                                WalRecord::DropNamedGraph {
1720                                    name: default_graph_key,
1721                                }
1722                            );
1723                        }
1724                        // If this session was using the dropped schema, reset it
1725                        let mut current = self.current_schema.lock();
1726                        if current
1727                            .as_deref()
1728                            .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1729                        {
1730                            *current = None;
1731                        }
1732                        Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1733                    }
1734                    Err(e) if if_exists => {
1735                        let _ = e;
1736                        Ok(QueryResult::status("No change"))
1737                    }
1738                    Err(e) => Err(Error::Query(QueryError::new(
1739                        QueryErrorKind::Semantic,
1740                        e.to_string(),
1741                    ))),
1742                }
1743            }
1744            SchemaStatement::AlterNodeType(stmt) => {
1745                use grafeo_adapters::query::gql::ast::TypeAlteration;
1746                let effective_name = self.effective_type_key(&stmt.name);
1747                let mut wal_alts = Vec::new();
1748                for alt in &stmt.alterations {
1749                    match alt {
1750                        TypeAlteration::AddProperty(prop) => {
1751                            let typed = TypedProperty {
1752                                name: prop.name.clone(),
1753                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1754                                nullable: prop.nullable,
1755                                default_value: prop
1756                                    .default_value
1757                                    .as_ref()
1758                                    .map(|s| parse_default_literal(s)),
1759                            };
1760                            self.catalog
1761                                .alter_node_type_add_property(&effective_name, typed)
1762                                .map_err(|e| {
1763                                    Error::Query(QueryError::new(
1764                                        QueryErrorKind::Semantic,
1765                                        e.to_string(),
1766                                    ))
1767                                })?;
1768                            wal_alts.push((
1769                                "add".to_string(),
1770                                prop.name.clone(),
1771                                prop.data_type.clone(),
1772                                prop.nullable,
1773                            ));
1774                        }
1775                        TypeAlteration::DropProperty(name) => {
1776                            self.catalog
1777                                .alter_node_type_drop_property(&effective_name, name)
1778                                .map_err(|e| {
1779                                    Error::Query(QueryError::new(
1780                                        QueryErrorKind::Semantic,
1781                                        e.to_string(),
1782                                    ))
1783                                })?;
1784                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1785                        }
1786                    }
1787                }
1788                wal_log!(
1789                    self,
1790                    WalRecord::AlterNodeType {
1791                        name: effective_name,
1792                        alterations: wal_alts,
1793                    }
1794                );
1795                Ok(QueryResult::status(format!(
1796                    "Altered node type '{}'",
1797                    stmt.name
1798                )))
1799            }
1800            SchemaStatement::AlterEdgeType(stmt) => {
1801                use grafeo_adapters::query::gql::ast::TypeAlteration;
1802                let effective_name = self.effective_type_key(&stmt.name);
1803                let mut wal_alts = Vec::new();
1804                for alt in &stmt.alterations {
1805                    match alt {
1806                        TypeAlteration::AddProperty(prop) => {
1807                            let typed = TypedProperty {
1808                                name: prop.name.clone(),
1809                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1810                                nullable: prop.nullable,
1811                                default_value: prop
1812                                    .default_value
1813                                    .as_ref()
1814                                    .map(|s| parse_default_literal(s)),
1815                            };
1816                            self.catalog
1817                                .alter_edge_type_add_property(&effective_name, typed)
1818                                .map_err(|e| {
1819                                    Error::Query(QueryError::new(
1820                                        QueryErrorKind::Semantic,
1821                                        e.to_string(),
1822                                    ))
1823                                })?;
1824                            wal_alts.push((
1825                                "add".to_string(),
1826                                prop.name.clone(),
1827                                prop.data_type.clone(),
1828                                prop.nullable,
1829                            ));
1830                        }
1831                        TypeAlteration::DropProperty(name) => {
1832                            self.catalog
1833                                .alter_edge_type_drop_property(&effective_name, name)
1834                                .map_err(|e| {
1835                                    Error::Query(QueryError::new(
1836                                        QueryErrorKind::Semantic,
1837                                        e.to_string(),
1838                                    ))
1839                                })?;
1840                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1841                        }
1842                    }
1843                }
1844                wal_log!(
1845                    self,
1846                    WalRecord::AlterEdgeType {
1847                        name: effective_name,
1848                        alterations: wal_alts,
1849                    }
1850                );
1851                Ok(QueryResult::status(format!(
1852                    "Altered edge type '{}'",
1853                    stmt.name
1854                )))
1855            }
1856            SchemaStatement::AlterGraphType(stmt) => {
1857                use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1858                let effective_name = self.effective_type_key(&stmt.name);
1859                let mut wal_alts = Vec::new();
1860                for alt in &stmt.alterations {
1861                    match alt {
1862                        GraphTypeAlteration::AddNodeType(name) => {
1863                            self.catalog
1864                                .alter_graph_type_add_node_type(&effective_name, name.clone())
1865                                .map_err(|e| {
1866                                    Error::Query(QueryError::new(
1867                                        QueryErrorKind::Semantic,
1868                                        e.to_string(),
1869                                    ))
1870                                })?;
1871                            wal_alts.push(("add_node_type".to_string(), name.clone()));
1872                        }
1873                        GraphTypeAlteration::DropNodeType(name) => {
1874                            self.catalog
1875                                .alter_graph_type_drop_node_type(&effective_name, name)
1876                                .map_err(|e| {
1877                                    Error::Query(QueryError::new(
1878                                        QueryErrorKind::Semantic,
1879                                        e.to_string(),
1880                                    ))
1881                                })?;
1882                            wal_alts.push(("drop_node_type".to_string(), name.clone()));
1883                        }
1884                        GraphTypeAlteration::AddEdgeType(name) => {
1885                            self.catalog
1886                                .alter_graph_type_add_edge_type(&effective_name, name.clone())
1887                                .map_err(|e| {
1888                                    Error::Query(QueryError::new(
1889                                        QueryErrorKind::Semantic,
1890                                        e.to_string(),
1891                                    ))
1892                                })?;
1893                            wal_alts.push(("add_edge_type".to_string(), name.clone()));
1894                        }
1895                        GraphTypeAlteration::DropEdgeType(name) => {
1896                            self.catalog
1897                                .alter_graph_type_drop_edge_type(&effective_name, name)
1898                                .map_err(|e| {
1899                                    Error::Query(QueryError::new(
1900                                        QueryErrorKind::Semantic,
1901                                        e.to_string(),
1902                                    ))
1903                                })?;
1904                            wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1905                        }
1906                    }
1907                }
1908                wal_log!(
1909                    self,
1910                    WalRecord::AlterGraphType {
1911                        name: effective_name,
1912                        alterations: wal_alts,
1913                    }
1914                );
1915                Ok(QueryResult::status(format!(
1916                    "Altered graph type '{}'",
1917                    stmt.name
1918                )))
1919            }
1920            SchemaStatement::CreateProcedure(stmt) => {
1921                use crate::catalog::ProcedureDefinition;
1922
1923                let def = ProcedureDefinition {
1924                    name: stmt.name.clone(),
1925                    params: stmt
1926                        .params
1927                        .iter()
1928                        .map(|p| (p.name.clone(), p.param_type.clone()))
1929                        .collect(),
1930                    returns: stmt
1931                        .returns
1932                        .iter()
1933                        .map(|r| (r.name.clone(), r.return_type.clone()))
1934                        .collect(),
1935                    body: stmt.body.clone(),
1936                };
1937
1938                if stmt.or_replace {
1939                    self.catalog.replace_procedure(def).map_err(|e| {
1940                        Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1941                    })?;
1942                } else {
1943                    match self.catalog.register_procedure(def) {
1944                        Ok(()) => {}
1945                        Err(_) if stmt.if_not_exists => {
1946                            return Ok(QueryResult::empty());
1947                        }
1948                        Err(e) => {
1949                            return Err(Error::Query(QueryError::new(
1950                                QueryErrorKind::Semantic,
1951                                e.to_string(),
1952                            )));
1953                        }
1954                    }
1955                }
1956
1957                wal_log!(
1958                    self,
1959                    WalRecord::CreateProcedure {
1960                        name: stmt.name.clone(),
1961                        params: stmt
1962                            .params
1963                            .iter()
1964                            .map(|p| (p.name.clone(), p.param_type.clone()))
1965                            .collect(),
1966                        returns: stmt
1967                            .returns
1968                            .iter()
1969                            .map(|r| (r.name.clone(), r.return_type.clone()))
1970                            .collect(),
1971                        body: stmt.body,
1972                    }
1973                );
1974                Ok(QueryResult::status(format!(
1975                    "Created procedure '{}'",
1976                    stmt.name
1977                )))
1978            }
1979            SchemaStatement::DropProcedure { name, if_exists } => {
1980                match self.catalog.drop_procedure(&name) {
1981                    Ok(()) => {}
1982                    Err(_) if if_exists => {
1983                        return Ok(QueryResult::empty());
1984                    }
1985                    Err(e) => {
1986                        return Err(Error::Query(QueryError::new(
1987                            QueryErrorKind::Semantic,
1988                            e.to_string(),
1989                        )));
1990                    }
1991                }
1992                wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1993                Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1994            }
1995            SchemaStatement::ShowIndexes => {
1996                return self.execute_show_indexes();
1997            }
1998            SchemaStatement::ShowConstraints => {
1999                return self.execute_show_constraints();
2000            }
2001            SchemaStatement::ShowNodeTypes => {
2002                return self.execute_show_node_types();
2003            }
2004            SchemaStatement::ShowEdgeTypes => {
2005                return self.execute_show_edge_types();
2006            }
2007            SchemaStatement::ShowGraphTypes => {
2008                return self.execute_show_graph_types();
2009            }
2010            SchemaStatement::ShowGraphType(name) => {
2011                return self.execute_show_graph_type(&name);
2012            }
2013            SchemaStatement::ShowCurrentGraphType => {
2014                return self.execute_show_current_graph_type();
2015            }
2016            SchemaStatement::ShowGraphs => {
2017                return self.execute_show_graphs();
2018            }
2019            SchemaStatement::ShowSchemas => {
2020                return self.execute_show_schemas();
2021            }
2022        };
2023
2024        // Invalidate all cached query plans after any successful DDL change.
2025        // DDL is rare, so clearing the entire cache is cheap and correct.
2026        if result.is_ok() {
2027            self.query_cache.clear();
2028        }
2029
2030        result
2031    }
2032
2033    /// Creates a vector index on the store by scanning existing nodes.
2034    #[cfg(all(feature = "lpg", feature = "gql", feature = "vector-index"))]
2035    fn create_vector_index_on_store(
2036        store: &LpgStore,
2037        label: &str,
2038        property: &str,
2039        dimensions: Option<usize>,
2040        metric: Option<&str>,
2041    ) -> Result<()> {
2042        use grafeo_common::types::{PropertyKey, Value};
2043        use grafeo_common::utils::error::Error;
2044        use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};
2045
2046        let metric = match metric {
2047            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
2048                Error::Internal(format!(
2049                    "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
2050                ))
2051            })?,
2052            None => DistanceMetric::Cosine,
2053        };
2054
2055        let prop_key = PropertyKey::new(property);
2056        let mut found_dims: Option<usize> = dimensions;
2057        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
2058
2059        for node in store.nodes_with_label(label) {
2060            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
2061                if let Some(expected) = found_dims {
2062                    if v.len() != expected {
2063                        return Err(Error::Internal(format!(
2064                            "Vector dimension mismatch: expected {expected}, found {} on node {}",
2065                            v.len(),
2066                            node.id.0
2067                        )));
2068                    }
2069                } else {
2070                    found_dims = Some(v.len());
2071                }
2072                vectors.push((node.id, v.to_vec()));
2073            }
2074        }
2075
2076        let Some(dims) = found_dims else {
2077            return Err(Error::Internal(format!(
2078                "No vector properties found on :{label}({property}) and no dimensions specified"
2079            )));
2080        };
2081
2082        let config = HnswConfig::new(dims, metric);
2083        let index = HnswIndex::with_capacity(config, vectors.len());
2084        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
2085        for (node_id, vec) in &vectors {
2086            index.insert(*node_id, vec, &accessor);
2087        }
2088
2089        store.add_vector_index(label, property, Arc::new(index));
2090        Ok(())
2091    }
2092
2093    /// Stub for when vector-index feature is not enabled.
2094    #[cfg(all(feature = "lpg", feature = "gql", not(feature = "vector-index")))]
2095    fn create_vector_index_on_store(
2096        _store: &LpgStore,
2097        _label: &str,
2098        _property: &str,
2099        _dimensions: Option<usize>,
2100        _metric: Option<&str>,
2101    ) -> Result<()> {
2102        Err(grafeo_common::utils::error::Error::Internal(
2103            "Vector index support requires the 'vector-index' feature".to_string(),
2104        ))
2105    }
2106
2107    /// Creates a text index on the store by scanning existing nodes.
2108    #[cfg(all(feature = "lpg", feature = "gql", feature = "text-index"))]
2109    fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
2110        use grafeo_common::types::{PropertyKey, Value};
2111        use grafeo_core::index::text::{BM25Config, InvertedIndex};
2112
2113        let mut index = InvertedIndex::new(BM25Config::default());
2114        let prop_key = PropertyKey::new(property);
2115
2116        let nodes = store.nodes_by_label(label);
2117        for node_id in nodes {
2118            if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
2119                index.insert(node_id, text.as_str());
2120            }
2121        }
2122
2123        store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
2124        Ok(())
2125    }
2126
2127    /// Stub for when text-index feature is not enabled.
2128    #[cfg(all(feature = "lpg", feature = "gql", not(feature = "text-index")))]
2129    fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
2130        Err(grafeo_common::utils::error::Error::Internal(
2131            "Text index support requires the 'text-index' feature".to_string(),
2132        ))
2133    }
2134
2135    /// Returns a table of all indexes from the catalog.
2136    fn execute_show_indexes(&self) -> Result<QueryResult> {
2137        let indexes = self.catalog.all_indexes();
2138        let columns = vec![
2139            "name".to_string(),
2140            "type".to_string(),
2141            "label".to_string(),
2142            "property".to_string(),
2143        ];
2144        let rows: Vec<Vec<Value>> = indexes
2145            .into_iter()
2146            .map(|def| {
2147                let label_name = self
2148                    .catalog
2149                    .get_label_name(def.label)
2150                    .unwrap_or_else(|| "?".into());
2151                let prop_name = self
2152                    .catalog
2153                    .get_property_key_name(def.property_key)
2154                    .unwrap_or_else(|| "?".into());
2155                vec![
2156                    Value::from(def.name),
2157                    Value::from(format!("{:?}", def.index_type)),
2158                    Value::from(&*label_name),
2159                    Value::from(&*prop_name),
2160                ]
2161            })
2162            .collect();
2163        Ok(QueryResult {
2164            columns,
2165            column_types: Vec::new(),
2166            rows,
2167            ..QueryResult::empty()
2168        })
2169    }
2170
2171    /// Returns a table of all constraints (currently metadata-only).
2172    fn execute_show_constraints(&self) -> Result<QueryResult> {
2173        // Constraints are tracked in WAL but not yet in a queryable catalog.
2174        // Return an empty table with the expected schema.
2175        Ok(QueryResult {
2176            columns: vec![
2177                "name".to_string(),
2178                "type".to_string(),
2179                "label".to_string(),
2180                "properties".to_string(),
2181            ],
2182            column_types: Vec::new(),
2183            rows: Vec::new(),
2184            ..QueryResult::empty()
2185        })
2186    }
2187
2188    /// Returns a table of all registered node types in the current schema.
2189    fn execute_show_node_types(&self) -> Result<QueryResult> {
2190        let columns = vec![
2191            "name".to_string(),
2192            "properties".to_string(),
2193            "constraints".to_string(),
2194            "parents".to_string(),
2195        ];
2196        let schema = self.current_schema.lock().clone();
2197        let all_names = self.catalog.all_node_type_names();
2198        let type_names: Vec<String> = match &schema {
2199            Some(s) => {
2200                let prefix = format!("{s}/");
2201                all_names
2202                    .into_iter()
2203                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2204                    .collect()
2205            }
2206            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2207        };
2208        let rows: Vec<Vec<Value>> = type_names
2209            .into_iter()
2210            .filter_map(|name| {
2211                let lookup = match &schema {
2212                    Some(s) => format!("{s}/{name}"),
2213                    None => name.clone(),
2214                };
2215                let def = self.catalog.get_node_type(&lookup)?;
2216                let props: Vec<String> = def
2217                    .properties
2218                    .iter()
2219                    .map(|p| {
2220                        let nullable = if p.nullable { "" } else { " NOT NULL" };
2221                        format!("{} {}{}", p.name, p.data_type, nullable)
2222                    })
2223                    .collect();
2224                let constraints: Vec<String> =
2225                    def.constraints.iter().map(|c| format!("{c:?}")).collect();
2226                let parents = def.parent_types.join(", ");
2227                Some(vec![
2228                    Value::from(name),
2229                    Value::from(props.join(", ")),
2230                    Value::from(constraints.join(", ")),
2231                    Value::from(parents),
2232                ])
2233            })
2234            .collect();
2235        Ok(QueryResult {
2236            columns,
2237            column_types: Vec::new(),
2238            rows,
2239            ..QueryResult::empty()
2240        })
2241    }
2242
2243    /// Returns a table of all registered edge types in the current schema.
2244    fn execute_show_edge_types(&self) -> Result<QueryResult> {
2245        let columns = vec![
2246            "name".to_string(),
2247            "properties".to_string(),
2248            "source_types".to_string(),
2249            "target_types".to_string(),
2250        ];
2251        let schema = self.current_schema.lock().clone();
2252        let all_names = self.catalog.all_edge_type_names();
2253        let type_names: Vec<String> = match &schema {
2254            Some(s) => {
2255                let prefix = format!("{s}/");
2256                all_names
2257                    .into_iter()
2258                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2259                    .collect()
2260            }
2261            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2262        };
2263        let rows: Vec<Vec<Value>> = type_names
2264            .into_iter()
2265            .filter_map(|name| {
2266                let lookup = match &schema {
2267                    Some(s) => format!("{s}/{name}"),
2268                    None => name.clone(),
2269                };
2270                let def = self.catalog.get_edge_type_def(&lookup)?;
2271                let props: Vec<String> = def
2272                    .properties
2273                    .iter()
2274                    .map(|p| {
2275                        let nullable = if p.nullable { "" } else { " NOT NULL" };
2276                        format!("{} {}{}", p.name, p.data_type, nullable)
2277                    })
2278                    .collect();
2279                let src = def.source_node_types.join(", ");
2280                let tgt = def.target_node_types.join(", ");
2281                Some(vec![
2282                    Value::from(name),
2283                    Value::from(props.join(", ")),
2284                    Value::from(src),
2285                    Value::from(tgt),
2286                ])
2287            })
2288            .collect();
2289        Ok(QueryResult {
2290            columns,
2291            column_types: Vec::new(),
2292            rows,
2293            ..QueryResult::empty()
2294        })
2295    }
2296
2297    /// Returns a table of all registered graph types in the current schema.
2298    fn execute_show_graph_types(&self) -> Result<QueryResult> {
2299        let columns = vec![
2300            "name".to_string(),
2301            "open".to_string(),
2302            "node_types".to_string(),
2303            "edge_types".to_string(),
2304        ];
2305        let schema = self.current_schema.lock().clone();
2306        let all_names = self.catalog.all_graph_type_names();
2307        let type_names: Vec<String> = match &schema {
2308            Some(s) => {
2309                let prefix = format!("{s}/");
2310                all_names
2311                    .into_iter()
2312                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2313                    .collect()
2314            }
2315            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2316        };
2317        let rows: Vec<Vec<Value>> = type_names
2318            .into_iter()
2319            .filter_map(|name| {
2320                let lookup = match &schema {
2321                    Some(s) => format!("{s}/{name}"),
2322                    None => name.clone(),
2323                };
2324                let def = self.catalog.get_graph_type_def(&lookup)?;
2325                // Strip schema prefix from allowed type names for display
2326                let strip = |n: &String| -> String {
2327                    match &schema {
2328                        Some(s) => n.strip_prefix(&format!("{s}/")).unwrap_or(n).to_string(),
2329                        None => n.clone(),
2330                    }
2331                };
2332                let node_types: Vec<String> = def.allowed_node_types.iter().map(strip).collect();
2333                let edge_types: Vec<String> = def.allowed_edge_types.iter().map(strip).collect();
2334                Some(vec![
2335                    Value::from(name),
2336                    Value::from(def.open),
2337                    Value::from(node_types.join(", ")),
2338                    Value::from(edge_types.join(", ")),
2339                ])
2340            })
2341            .collect();
2342        Ok(QueryResult {
2343            columns,
2344            column_types: Vec::new(),
2345            rows,
2346            ..QueryResult::empty()
2347        })
2348    }
2349
2350    /// Returns the list of named graphs visible in the current schema context.
2351    ///
2352    /// When a session schema is set, only graphs belonging to that schema are
2353    /// shown (their compound prefix is stripped). When no schema is set, graphs
2354    /// without a schema prefix are shown (the default schema).
2355    #[cfg(feature = "lpg")]
2356    fn execute_show_graphs(&self) -> Result<QueryResult> {
2357        let schema = self.current_schema.lock().clone();
2358        let all_names = self.store.graph_names();
2359
2360        let mut names: Vec<String> = match &schema {
2361            Some(s) => {
2362                let prefix = format!("{s}/");
2363                all_names
2364                    .into_iter()
2365                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2366                    .filter(|n| n != SCHEMA_DEFAULT_GRAPH)
2367                    .collect()
2368            }
2369            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2370        };
2371        names.sort();
2372
2373        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2374        Ok(QueryResult {
2375            columns: vec!["name".to_string()],
2376            column_types: Vec::new(),
2377            rows,
2378            ..QueryResult::empty()
2379        })
2380    }
2381
2382    /// Returns the list of all schema namespaces.
2383    fn execute_show_schemas(&self) -> Result<QueryResult> {
2384        let mut names = self.catalog.schema_names();
2385        names.sort();
2386        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2387        Ok(QueryResult {
2388            columns: vec!["name".to_string()],
2389            column_types: Vec::new(),
2390            rows,
2391            ..QueryResult::empty()
2392        })
2393    }
2394
2395    /// Returns detailed info for a specific graph type.
2396    fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
2397        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
2398
2399        let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
2400            Error::Query(QueryError::new(
2401                QueryErrorKind::Semantic,
2402                format!("Graph type '{name}' not found"),
2403            ))
2404        })?;
2405
2406        let columns = vec![
2407            "name".to_string(),
2408            "open".to_string(),
2409            "node_types".to_string(),
2410            "edge_types".to_string(),
2411        ];
2412        let rows = vec![vec![
2413            Value::from(def.name),
2414            Value::from(def.open),
2415            Value::from(def.allowed_node_types.join(", ")),
2416            Value::from(def.allowed_edge_types.join(", ")),
2417        ]];
2418        Ok(QueryResult {
2419            columns,
2420            column_types: Vec::new(),
2421            rows,
2422            ..QueryResult::empty()
2423        })
2424    }
2425
2426    /// Returns the graph type bound to the current graph.
2427    fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2428        let graph_name = self
2429            .current_graph()
2430            .unwrap_or_else(|| "default".to_string());
2431        let columns = vec![
2432            "graph".to_string(),
2433            "graph_type".to_string(),
2434            "open".to_string(),
2435            "node_types".to_string(),
2436            "edge_types".to_string(),
2437        ];
2438
2439        if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2440            && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2441        {
2442            let rows = vec![vec![
2443                Value::from(graph_name),
2444                Value::from(type_name),
2445                Value::from(def.open),
2446                Value::from(def.allowed_node_types.join(", ")),
2447                Value::from(def.allowed_edge_types.join(", ")),
2448            ]];
2449            return Ok(QueryResult {
2450                columns,
2451                column_types: Vec::new(),
2452                rows,
2453                ..QueryResult::empty()
2454            });
2455        }
2456
2457        // No graph type binding found
2458        Ok(QueryResult {
2459            columns,
2460            column_types: Vec::new(),
2461            rows: vec![vec![
2462                Value::from(graph_name),
2463                Value::Null,
2464                Value::Null,
2465                Value::Null,
2466                Value::Null,
2467            ]],
2468            ..QueryResult::empty()
2469        })
2470    }
2471
    /// Executes a GQL query.
    ///
    /// The query text is translated first. Session commands and schema (DDL)
    /// commands are dispatched to their own handlers and return immediately;
    /// everything else goes through bind, optimize (with the optimized plan
    /// cached per query text, language and current graph), physical planning
    /// and execution inside `with_auto_commit`. `EXPLAIN` plans return the
    /// plan description without executing; `PROFILE` plans execute and return
    /// per-operator profiling output instead of the query rows.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails to parse or execute. Also fails
    /// when the session lacks the required permission (Admin for schema
    /// commands, Write for mutating plans) or when a schema command or
    /// mutating plan is issued inside a read-only transaction.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use grafeo_engine::GrafeoDB;
    ///
    /// let db = GrafeoDB::new_in_memory();
    /// let session = db.session();
    ///
    /// // Create a node
    /// session.execute("INSERT (:Person {name: 'Alix', age: 30})")?;
    ///
    /// // Query nodes
    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
    /// for row in result.rows() {
    ///     println!("{:?}", row);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "gql")]
    pub fn execute(&self, query: &str) -> Result<QueryResult> {
        self.require_lpg("GQL")?;

        use crate::query::{
            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
            processor::QueryLanguage, translators::gql,
        };

        let _span = grafeo_info_span!(
            "grafeo::session::execute",
            language = "gql",
            query_len = query.len(),
        );

        // Timing is only captured off wasm32; the wasm32 paths below report no elapsed time.
        #[cfg(not(target_arch = "wasm32"))]
        let start_time = std::time::Instant::now();

        // Parse and translate, checking for session/schema commands first
        let translation = gql::translate_full(query)?;
        let logical_plan = match translation {
            // Session commands bypass planning entirely.
            gql::GqlTranslationResult::SessionCommand(cmd) => {
                return self.execute_session_command(cmd);
            }
            #[cfg(feature = "lpg")]
            gql::GqlTranslationResult::SchemaCommand(cmd) => {
                // All DDL requires Admin role
                self.require_permission(crate::auth::StatementKind::Admin)?;
                // DDL is rejected while the session is in a read-only transaction.
                if *self.read_only_tx.lock() {
                    return Err(grafeo_common::utils::error::Error::Transaction(
                        grafeo_common::utils::error::TransactionError::ReadOnly,
                    ));
                }
                return self.execute_schema_command(cmd);
            }
            gql::GqlTranslationResult::Plan(plan) => {
                // Check role-based permission before read-only transaction check
                if plan.root.has_mutations() {
                    self.require_permission(crate::auth::StatementKind::Write)?;
                }
                // Block mutations in read-only transactions
                if *self.read_only_tx.lock() && plan.root.has_mutations() {
                    return Err(grafeo_common::utils::error::Error::Transaction(
                        grafeo_common::utils::error::TransactionError::ReadOnly,
                    ));
                }
                plan
            }
            // Without the `lpg` feature there is no schema command handler to dispatch to.
            #[cfg(not(feature = "lpg"))]
            gql::GqlTranslationResult::SchemaCommand(_) => {
                return Err(grafeo_common::utils::error::Error::Internal(
                    "Schema commands require the `lpg` feature".to_string(),
                ));
            }
        };

        // Create cache key for this query
        let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());

        // Try to get cached optimized plan, or use the plan we just translated
        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
            cached_plan
        } else {
            // Semantic validation
            let mut binder = Binder::new();
            let _binding_context = binder.bind(&logical_plan)?;

            // Optimize the plan
            let active = self.active_store();
            let optimizer = Optimizer::from_graph_store(&*active);
            let plan = optimizer.optimize(logical_plan)?;

            // Cache the optimized plan for future use
            self.query_cache.put_optimized(cache_key, plan.clone());

            plan
        };

        // Resolve the active store for query execution
        let active = self.active_store();

        // EXPLAIN: annotate pushdown hints and return the plan tree
        if optimized_plan.explain {
            use crate::query::processor::{annotate_pushdown_hints, explain_result};
            let mut plan = optimized_plan;
            annotate_pushdown_hints(&mut plan.root, active.as_ref());
            return Ok(explain_result(&plan));
        }

        // PROFILE: execute with per-operator instrumentation
        if optimized_plan.profile {
            let has_mutations = optimized_plan.root.has_mutations();
            return self.with_auto_commit(has_mutations, || {
                let (viewing_epoch, transaction_id) = self.get_transaction_context();
                let planner = self.create_planner_for_store(
                    Arc::clone(&active),
                    viewing_epoch,
                    transaction_id,
                );
                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

                let executor = Executor::with_columns(physical_plan.columns.clone())
                    .with_deadline(self.query_deadline());
                // The query rows are discarded; only the profile output is returned.
                let _result = executor.execute(physical_plan.operator.as_mut())?;

                let total_time_ms;
                #[cfg(not(target_arch = "wasm32"))]
                {
                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
                }
                #[cfg(target_arch = "wasm32")]
                {
                    total_time_ms = 0.0;
                }

                let profile_tree = crate::query::profile::build_profile_tree(
                    &optimized_plan.root,
                    &mut entries.into_iter(),
                );
                Ok(crate::query::profile::profile_result(
                    &profile_tree,
                    total_time_ms,
                ))
            });
        }

        let has_mutations = optimized_plan.root.has_mutations();

        let result = self.with_auto_commit(has_mutations, || {
            // Get transaction context for MVCC visibility
            let (viewing_epoch, transaction_id) = self.get_transaction_context();

            // Convert to physical plan with transaction context
            // (Physical planning cannot be cached as it depends on transaction state)
            // Safe to use read-only fast path when: this query has no mutations AND
            // there is no active transaction that may have prior uncommitted writes.
            let has_active_tx = self.current_transaction.lock().is_some();
            let read_only = !has_mutations && !has_active_tx;
            let planner = self.create_planner_for_store_with_read_only(
                Arc::clone(&active),
                viewing_epoch,
                transaction_id,
                read_only,
            );
            let mut physical_plan = planner.plan(&optimized_plan)?;

            // Execute the plan
            let executor = Executor::with_columns(physical_plan.columns.clone())
                .with_deadline(self.query_deadline());
            let mut result = executor.execute(physical_plan.operator.as_mut())?;

            // Add execution metrics
            // `rows_scanned` is populated with the number of rows in the result.
            let rows_scanned = result.rows.len() as u64;
            #[cfg(not(target_arch = "wasm32"))]
            {
                let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
                result.execution_time_ms = Some(elapsed_ms);
            }
            result.rows_scanned = Some(rows_scanned);

            Ok(result)
        });

        // Record metrics for this query execution.
        #[cfg(feature = "metrics")]
        {
            #[cfg(not(target_arch = "wasm32"))]
            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
            #[cfg(target_arch = "wasm32")]
            let elapsed_ms = None;
            self.record_query_metrics("gql", elapsed_ms, &result);
        }

        result
    }
2673
2674    /// Executes a GQL query with visibility at the specified epoch.
2675    ///
2676    /// This enables time-travel queries: the query sees the database
2677    /// as it existed at the given epoch.
2678    ///
2679    /// # Errors
2680    ///
2681    /// Returns an error if parsing or execution fails.
2682    #[cfg(feature = "gql")]
2683    pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2684        let previous = self.viewing_epoch_override.lock().replace(epoch);
2685        let result = self.execute(query);
2686        *self.viewing_epoch_override.lock() = previous;
2687        result
2688    }
2689
2690    /// Executes a GQL query at a specific epoch with optional parameters.
2691    ///
2692    /// Combines epoch-based time travel with parameterized queries.
2693    ///
2694    /// # Errors
2695    ///
2696    /// Returns an error if parsing or execution fails.
2697    #[cfg(feature = "gql")]
2698    pub fn execute_at_epoch_with_params(
2699        &self,
2700        query: &str,
2701        epoch: EpochId,
2702        params: Option<std::collections::HashMap<String, Value>>,
2703    ) -> Result<QueryResult> {
2704        let previous = self.viewing_epoch_override.lock().replace(epoch);
2705        let result = if let Some(p) = params {
2706            self.execute_with_params(query, p)
2707        } else {
2708            self.execute(query)
2709        };
2710        *self.viewing_epoch_override.lock() = previous;
2711        result
2712    }
2713
2714    /// Executes a GQL query with parameters.
2715    ///
2716    /// # Errors
2717    ///
2718    /// Returns an error if the query fails to parse or execute.
2719    #[cfg(feature = "gql")]
2720    pub fn execute_with_params(
2721        &self,
2722        query: &str,
2723        params: std::collections::HashMap<String, Value>,
2724    ) -> Result<QueryResult> {
2725        self.require_lpg("GQL")?;
2726
2727        use crate::query::processor::{QueryLanguage, QueryProcessor};
2728
2729        // Reject writes if the identity lacks permission. Parse the query
2730        // to determine mutation status reliably (the text heuristic has false
2731        // negatives that could bypass authorization).
2732        let has_mutations = if self.identity.can_write() {
2733            // Fast path: identity can write, use heuristic for auto-commit only
2734            Self::query_looks_like_mutation(query)
2735        } else {
2736            // Restricted identity: parse to check mutations reliably
2737            use crate::query::translators::gql;
2738            match gql::translate(query) {
2739                Ok(plan) if plan.root.has_mutations() => {
2740                    self.require_permission(crate::auth::StatementKind::Write)?;
2741                    true
2742                }
2743                Ok(_) => false,
2744                // Parse error: let the processor handle it below
2745                Err(_) => Self::query_looks_like_mutation(query),
2746            }
2747        };
2748        let active = self.active_store();
2749
2750        self.with_auto_commit(has_mutations, || {
2751            // Get transaction context for MVCC visibility
2752            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2753
2754            // Create processor with transaction context
2755            let processor = QueryProcessor::for_stores_with_transaction(
2756                Arc::clone(&active),
2757                self.active_write_store(),
2758                Arc::clone(&self.transaction_manager),
2759            )?;
2760
2761            // Apply transaction context if in a transaction
2762            let processor = if let Some(transaction_id) = transaction_id {
2763                processor.with_transaction_context(viewing_epoch, transaction_id)
2764            } else {
2765                processor
2766            };
2767
2768            processor.process(query, QueryLanguage::Gql, Some(&params))
2769        })
2770    }
2771
2772    /// Executes a GQL query with parameters.
2773    ///
2774    /// # Errors
2775    ///
2776    /// Returns an error if no query language is enabled.
2777    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2778    pub fn execute_with_params(
2779        &self,
2780        _query: &str,
2781        _params: std::collections::HashMap<String, Value>,
2782    ) -> Result<QueryResult> {
2783        Err(grafeo_common::utils::error::Error::Internal(
2784            "No query language enabled".to_string(),
2785        ))
2786    }
2787
2788    /// Executes a GQL query.
2789    ///
2790    /// # Errors
2791    ///
2792    /// Returns an error if no query language is enabled.
2793    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2794    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2795        Err(grafeo_common::utils::error::Error::Internal(
2796            "No query language enabled".to_string(),
2797        ))
2798    }
2799
/// Runs a Cypher statement on this session.
///
/// Schema DDL and `SHOW` commands are dispatched up front. Any other
/// statement is bound and optimized (reusing a cached optimized plan when
/// one exists for this query text and graph), permission-checked if the plan
/// mutates data, and then executed through `with_auto_commit`. `EXPLAIN`
/// returns the annotated plan without running it; `PROFILE` runs the plan
/// and returns the profile report instead of the query rows.
///
/// # Errors
///
/// Returns an error if the query fails to parse or execute, if the session
/// identity lacks the permission the statement requires, or if schema DDL is
/// attempted inside a read-only transaction.
#[cfg(feature = "cypher")]
pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
        processor::QueryLanguage, translators::cypher,
    };

    // Schema DDL and SHOW commands never reach the planning pipeline.
    let parsed = cypher::translate_full(query)?;
    match parsed {
        #[cfg(feature = "lpg")]
        cypher::CypherTranslationResult::SchemaCommand(cmd) => {
            use grafeo_common::utils::error::{
                Error as GrafeoError, QueryError, QueryErrorKind,
            };
            self.require_permission(crate::auth::StatementKind::Admin)?;
            let in_read_only_tx = *self.read_only_tx.lock();
            if in_read_only_tx {
                return Err(GrafeoError::Query(QueryError::new(
                    QueryErrorKind::Semantic,
                    "Cannot execute schema DDL in a read-only transaction",
                )));
            }
            return self.execute_schema_command(cmd);
        }
        #[cfg(not(feature = "lpg"))]
        cypher::CypherTranslationResult::SchemaCommand(_) => {
            return Err(grafeo_common::utils::error::Error::Internal(
                "Schema DDL requires the `lpg` feature".to_string(),
            ));
        }
        cypher::CypherTranslationResult::ShowIndexes => return self.execute_show_indexes(),
        cypher::CypherTranslationResult::ShowConstraints => {
            return self.execute_show_constraints();
        }
        cypher::CypherTranslationResult::ShowCurrentGraphType => {
            return self.execute_show_current_graph_type();
        }
        // Regular statement: continue with the normal pipeline below.
        cypher::CypherTranslationResult::Plan(_) => {}
    }

    #[cfg(not(target_arch = "wasm32"))]
    let start_time = std::time::Instant::now();

    // The cache key combines the query text, the language and the current graph.
    let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());

    let cached = self.query_cache.get_optimized(&cache_key);
    let optimized_plan = match cached {
        Some(hit) => hit,
        None => {
            // Cache miss: translate, validate semantics, optimize, then remember the plan.
            let logical_plan = cypher::translate(query)?;

            let mut binder = Binder::new();
            let _binding_context = binder.bind(&logical_plan)?;

            let store_for_stats = self.active_store();
            let optimizer = Optimizer::from_graph_store(&*store_for_stats);
            let fresh = optimizer.optimize(logical_plan)?;

            self.query_cache.put_optimized(cache_key, fresh.clone());
            fresh
        }
    };

    // Mutating plans require write permission (this also applies to EXPLAIN/PROFILE).
    let has_mutations = optimized_plan.root.has_mutations();
    if has_mutations {
        self.require_permission(crate::auth::StatementKind::Write)?;
    }

    let active = self.active_store();

    // EXPLAIN: report the plan, annotated with pushdown hints, without executing it.
    if optimized_plan.explain {
        use crate::query::processor::{annotate_pushdown_hints, explain_result};
        let mut annotated = optimized_plan;
        annotate_pushdown_hints(&mut annotated.root, active.as_ref());
        return Ok(explain_result(&annotated));
    }

    // PROFILE: execute the plan, discard its rows, and return the profile report.
    if optimized_plan.profile {
        return self.with_auto_commit(has_mutations, || {
            let (viewing_epoch, transaction_id) = self.get_transaction_context();
            let planner =
                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

            let executor = Executor::with_columns(physical_plan.columns.clone())
                .with_deadline(self.query_deadline());
            let _rows = executor.execute(physical_plan.operator.as_mut())?;

            // No wall clock is read on wasm32, so the total is reported as zero there.
            #[cfg(not(target_arch = "wasm32"))]
            let total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            #[cfg(target_arch = "wasm32")]
            let total_time_ms = 0.0;

            let profile_tree = crate::query::profile::build_profile_tree(
                &optimized_plan.root,
                &mut entries.into_iter(),
            );
            Ok(crate::query::profile::profile_result(
                &profile_tree,
                total_time_ms,
            ))
        });
    }

    let result = self.with_auto_commit(has_mutations, || {
        // The transaction context supplies the epoch used for MVCC visibility.
        let (viewing_epoch, transaction_id) = self.get_transaction_context();

        let planner =
            self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        executor.execute(physical_plan.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("cypher", elapsed_ms, &result);
    }

    result
}
2958
/// Runs a Gremlin traversal on this session.
///
/// The query is translated, semantically validated, and optimized against
/// the active store. Plans that contain mutations require write permission.
/// Execution goes through `with_auto_commit`.
///
/// # Errors
///
/// Returns an error if the query fails to parse or execute, or if it mutates
/// data and the session identity lacks write permission.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// let session = db.session();
///
/// // Create some nodes first
/// session.create_node(&["Person"]);
///
/// // Query using Gremlin
/// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "gremlin")]
pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    // Front end: translate to a logical plan, then validate it.
    let logical_plan = gremlin::translate(query)?;
    let mut binder = Binder::new();
    let _binding_context = binder.bind(&logical_plan)?;

    // Optimize against the store the query will run on.
    let active = self.active_store();
    let optimizer = Optimizer::from_graph_store(&*active);
    let optimized_plan = optimizer.optimize(logical_plan)?;

    let writes = optimized_plan.root.has_mutations();
    if writes {
        self.require_permission(crate::auth::StatementKind::Write)?;
    }

    let result = self.with_auto_commit(writes, || {
        // The transaction context supplies the epoch used for MVCC visibility.
        let (snapshot_epoch, tx_id) = self.get_transaction_context();

        let planner = self.create_planner_for_store(Arc::clone(&active), snapshot_epoch, tx_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let runner = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        runner.execute(physical_plan.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gremlin", elapsed_ms, &result);
    }

    result
}
3032
/// Runs a Gremlin traversal after substituting the supplied parameters.
///
/// Parameters are substituted into the translated logical plan before
/// semantic validation and optimization. Plans that contain mutations
/// require write permission.
///
/// # Errors
///
/// Returns an error if the query fails to parse, if parameter substitution
/// fails, if execution fails, or if the plan mutates data and the session
/// identity lacks write permission.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, optimizer::Optimizer, processor::substitute_params,
        translators::gremlin,
    };

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    // Front end: translate, bind the caller's parameters, then validate.
    let mut logical_plan = gremlin::translate(query)?;
    substitute_params(&mut logical_plan, &params)?;

    let mut binder = Binder::new();
    let _binding_context = binder.bind(&logical_plan)?;

    // Optimize against the store the query will run on.
    let active = self.active_store();
    let optimizer = Optimizer::from_graph_store(&*active);
    let optimized_plan = optimizer.optimize(logical_plan)?;

    let writes = optimized_plan.root.has_mutations();
    if writes {
        self.require_permission(crate::auth::StatementKind::Write)?;
    }

    let result = self.with_auto_commit(writes, || {
        let (snapshot_epoch, tx_id) = self.get_transaction_context();
        let planner = self.create_planner_for_store(Arc::clone(&active), snapshot_epoch, tx_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;
        let runner = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        runner.execute(physical_plan.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("gremlin", elapsed_ms, &result);
    }

    result
}
3093
/// Runs a GraphQL query against the LPG store.
///
/// Default values declared for the query's variables are substituted into
/// the plan before validation. Plans that contain mutations require write
/// permission.
///
/// # Errors
///
/// Returns an error if the query fails to parse or execute, or if it mutates
/// data and the session identity lacks write permission.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// let session = db.session();
///
/// // Create some nodes first
/// session.create_node(&["User"]);
///
/// // Query using GraphQL
/// let result = session.execute_graphql("query { user { id name } }")?;
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "graphql")]
pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, optimizer::Optimizer, processor::substitute_params,
        translators::graphql,
    };

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let mut logical_plan = graphql::translate(query)?;

    // With no caller-supplied values, the declared defaults are the only bindings.
    // They are cloned first because the plan itself is mutated by the substitution.
    if !logical_plan.default_params.is_empty() {
        let declared_defaults = logical_plan.default_params.clone();
        substitute_params(&mut logical_plan, &declared_defaults)?;
    }

    // Semantic validation.
    let mut binder = Binder::new();
    let _binding_context = binder.bind(&logical_plan)?;

    // Optimize against the store the query will run on.
    let active = self.active_store();
    let optimizer = Optimizer::from_graph_store(&*active);
    let optimized_plan = optimizer.optimize(logical_plan)?;

    let writes = optimized_plan.root.has_mutations();
    if writes {
        self.require_permission(crate::auth::StatementKind::Write)?;
    }

    let result = self.with_auto_commit(writes, || {
        let (snapshot_epoch, tx_id) = self.get_transaction_context();
        let planner = self.create_planner_for_store(Arc::clone(&active), snapshot_epoch, tx_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;
        let runner = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        runner.execute(physical_plan.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("graphql", elapsed_ms, &result);
    }

    result
}
3167
/// Executes a GraphQL query with caller-supplied parameters.
///
/// Default values declared for the query's variables are merged with
/// `params`; a caller-supplied value replaces the default for the same
/// name. The merged set is substituted into the plan before validation.
/// Plans that contain mutations require write permission.
///
/// # Errors
///
/// Returns an error if the query fails to parse, if parameter substitution
/// fails, if execution fails, or if the plan mutates data and the session
/// identity lacks write permission.
#[cfg(feature = "graphql")]
pub fn execute_graphql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, optimizer::Optimizer, processor::substitute_params,
        translators::graphql,
    };

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    // Parse and translate the query to a logical plan
    let mut logical_plan = graphql::translate(query)?;

    if logical_plan.default_params.is_empty() {
        substitute_params(&mut logical_plan, &params)?;
    } else {
        // Start from the declared defaults and let the caller's values win.
        // `params` is owned, so it is moved into the merged map rather than
        // cloned entry by entry.
        let mut merged = logical_plan.default_params.clone();
        merged.extend(params);
        substitute_params(&mut logical_plan, &merged)?;
    }

    // Semantic validation
    let mut binder = Binder::new();
    let _binding_context = binder.bind(&logical_plan)?;

    // Optimize against the store the query will run on
    let active = self.active_store();
    let optimizer = Optimizer::from_graph_store(&*active);
    let optimized_plan = optimizer.optimize(logical_plan)?;

    let has_mutations = optimized_plan.root.has_mutations();
    if has_mutations {
        self.require_permission(crate::auth::StatementKind::Write)?;
    }

    let result = self.with_auto_commit(has_mutations, || {
        let (viewing_epoch, transaction_id) = self.get_transaction_context();
        let planner =
            self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;
        let executor = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        executor.execute(physical_plan.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("graphql", elapsed_ms, &result);
    }

    result
}
3234
/// Runs a SQL/PGQ query (SQL:2023 `GRAPH_TABLE`).
///
/// The query is always parsed first so that a property-graph DDL statement
/// can be recognized; for that statement this method requires admin
/// permission and answers with a single status row. Any other statement
/// reuses a cached optimized plan when one exists, requires write
/// permission if it mutates data, and is then executed.
///
/// # Errors
///
/// Returns an error if the query fails to parse or execute, or if the
/// session identity lacks the permission the statement requires.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// let session = db.session();
///
/// let result = session.execute_sql(
///     "SELECT * FROM GRAPH_TABLE (
///         MATCH (n:Person)
///         COLUMNS (n.name AS name)
///     )"
/// )?;
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "sql-pgq")]
pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
        processor::QueryLanguage, translators::sql_pgq,
    };

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    // Translation happens even on a cache hit: it is how DDL is detected.
    let logical_plan = sql_pgq::translate(query)?;

    // DDL is answered directly and never enters the query pipeline.
    // NOTE(review): only a status row is built here; no catalog update is
    // visible in this method. Confirm where the graph is actually registered.
    if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
        self.require_permission(crate::auth::StatementKind::Admin)?;
        let status = format!(
            "Property graph '{}' created ({} node tables, {} edge tables)",
            cpg.name,
            cpg.node_tables.len(),
            cpg.edge_tables.len()
        );
        return Ok(QueryResult {
            columns: vec!["status".into()],
            column_types: vec![grafeo_common::types::LogicalType::String],
            rows: vec![vec![Value::from(status)]],
            execution_time_ms: None,
            rows_scanned: None,
            status_message: None,
            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
        });
    }

    let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());

    let cached = self.query_cache.get_optimized(&cache_key);
    let optimized_plan = match cached {
        Some(hit) => hit,
        None => {
            // Cache miss: validate, optimize, and remember the plan.
            let mut binder = Binder::new();
            let _binding_context = binder.bind(&logical_plan)?;
            let store_for_stats = self.active_store();
            let optimizer = Optimizer::from_graph_store(&*store_for_stats);
            let fresh = optimizer.optimize(logical_plan)?;
            self.query_cache.put_optimized(cache_key, fresh.clone());
            fresh
        }
    };

    let active = self.active_store();
    let writes = optimized_plan.root.has_mutations();
    if writes {
        self.require_permission(crate::auth::StatementKind::Write)?;
    }

    let result = self.with_auto_commit(writes, || {
        let (snapshot_epoch, tx_id) = self.get_transaction_context();
        let planner = self.create_planner_for_store(Arc::clone(&active), snapshot_epoch, tx_id);
        let mut physical_plan = planner.plan(&optimized_plan)?;
        let runner = Executor::with_columns(physical_plan.columns.clone())
            .with_deadline(self.query_deadline());
        runner.execute(physical_plan.operator.as_mut())
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("sql", elapsed_ms, &result);
    }

    result
}
3332
/// Runs a SQL/PGQ query with parameters through the query processor.
///
/// For an identity that can write, a text heuristic decides whether the
/// statement mutates data. For a restricted identity the query is
/// translated to inspect the plan, falling back to the heuristic only when
/// translation fails. Whenever the statement is classified as a mutation,
/// write permission is required before execution.
///
/// # Errors
///
/// Returns an error if the query fails to parse or execute, or if it is
/// classified as a mutation and the session identity lacks write permission.
#[cfg(feature = "sql-pgq")]
pub fn execute_sql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
    let start_time = Instant::now();

    let has_mutations = if !self.identity.can_write() {
        // Restricted identity: inspect the translated plan instead of the text.
        use crate::query::translators::sql_pgq;
        match sql_pgq::translate(query) {
            Ok(plan) if plan.root.has_mutations() => {
                self.require_permission(crate::auth::StatementKind::Write)?;
                true
            }
            Ok(_) => false,
            // Translation failed: fall back to the text heuristic.
            Err(_) => Self::query_looks_like_mutation(query),
        }
    } else {
        // Writable identity: the text heuristic is used.
        Self::query_looks_like_mutation(query)
    };
    // Covers every route by which the statement was classified as a mutation,
    // including the heuristic fallback after a failed translation.
    if has_mutations {
        self.require_permission(crate::auth::StatementKind::Write)?;
    }
    let active = self.active_store();

    let result = self.with_auto_commit(has_mutations, || {
        let (snapshot_epoch, tx_id) = self.get_transaction_context();
        let base = QueryProcessor::for_stores_with_transaction(
            Arc::clone(&active),
            self.active_write_store(),
            Arc::clone(&self.transaction_manager),
        )?;
        // Attach the transaction context only when a transaction id is present.
        let processor = match tx_id {
            Some(id) => base.with_transaction_context(snapshot_epoch, id),
            None => base,
        };
        processor.process(query, QueryLanguage::SqlPgq, Some(&params))
    });

    #[cfg(feature = "metrics")]
    {
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        #[cfg(target_arch = "wasm32")]
        let elapsed_ms = None;
        self.record_query_metrics("sql", elapsed_ms, &result);
    }

    result
}
3393
3394    /// Executes a query in the specified language by name.
3395    ///
3396    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
3397    /// `"graphql-rdf"`, `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
3398    ///
3399    /// # Errors
3400    ///
3401    /// Returns an error if the language is unknown/disabled or the query fails.
3402    pub fn execute_language(
3403        &self,
3404        query: &str,
3405        language: &str,
3406        params: Option<std::collections::HashMap<String, Value>>,
3407    ) -> Result<QueryResult> {
3408        let _span = grafeo_info_span!(
3409            "grafeo::session::execute",
3410            language,
3411            query_len = query.len(),
3412        );
3413        match language {
3414            "gql" => {
3415                if let Some(p) = params {
3416                    self.execute_with_params(query, p)
3417                } else {
3418                    self.execute(query)
3419                }
3420            }
3421            #[cfg(feature = "cypher")]
3422            "cypher" => {
3423                if let Some(p) = params {
3424                    use crate::query::processor::{QueryLanguage, QueryProcessor};
3425
3426                    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3427                    let start_time = Instant::now();
3428
3429                    let has_mutations = if self.identity.can_write() {
3430                        Self::query_looks_like_mutation(query)
3431                    } else {
3432                        use crate::query::translators::cypher;
3433                        match cypher::translate(query) {
3434                            Ok(plan) if plan.root.has_mutations() => {
3435                                self.require_permission(crate::auth::StatementKind::Write)?;
3436                                true
3437                            }
3438                            Ok(_) => false,
3439                            Err(_) => Self::query_looks_like_mutation(query),
3440                        }
3441                    };
3442                    let active = self.active_store();
3443                    let result = self.with_auto_commit(has_mutations, || {
3444                        let processor = QueryProcessor::for_stores_with_transaction(
3445                            Arc::clone(&active),
3446                            self.active_write_store(),
3447                            Arc::clone(&self.transaction_manager),
3448                        )?;
3449                        let (viewing_epoch, transaction_id) = self.get_transaction_context();
3450                        let processor = if let Some(transaction_id) = transaction_id {
3451                            processor.with_transaction_context(viewing_epoch, transaction_id)
3452                        } else {
3453                            processor
3454                        };
3455                        processor.process(query, QueryLanguage::Cypher, Some(&p))
3456                    });
3457
3458                    #[cfg(feature = "metrics")]
3459                    {
3460                        #[cfg(not(target_arch = "wasm32"))]
3461                        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3462                        #[cfg(target_arch = "wasm32")]
3463                        let elapsed_ms = None;
3464                        self.record_query_metrics("cypher", elapsed_ms, &result);
3465                    }
3466
3467                    result
3468                } else {
3469                    self.execute_cypher(query)
3470                }
3471            }
3472            #[cfg(feature = "gremlin")]
3473            "gremlin" => {
3474                if let Some(p) = params {
3475                    self.execute_gremlin_with_params(query, p)
3476                } else {
3477                    self.execute_gremlin(query)
3478                }
3479            }
3480            #[cfg(feature = "graphql")]
3481            "graphql" => {
3482                if let Some(p) = params {
3483                    self.execute_graphql_with_params(query, p)
3484                } else {
3485                    self.execute_graphql(query)
3486                }
3487            }
3488            #[cfg(all(feature = "graphql", feature = "triple-store"))]
3489            "graphql-rdf" => {
3490                if let Some(p) = params {
3491                    self.execute_graphql_rdf_with_params(query, p)
3492                } else {
3493                    self.execute_graphql_rdf(query)
3494                }
3495            }
3496            #[cfg(feature = "sql-pgq")]
3497            "sql" | "sql-pgq" => {
3498                if let Some(p) = params {
3499                    self.execute_sql_with_params(query, p)
3500                } else {
3501                    self.execute_sql(query)
3502                }
3503            }
3504            #[cfg(all(feature = "sparql", feature = "triple-store"))]
3505            "sparql" => {
3506                if let Some(p) = params {
3507                    self.execute_sparql_with_params(query, p)
3508                } else {
3509                    self.execute_sparql(query)
3510                }
3511            }
3512            other => Err(grafeo_common::utils::error::Error::Query(
3513                grafeo_common::utils::error::QueryError::new(
3514                    grafeo_common::utils::error::QueryErrorKind::Semantic,
3515                    format!("Unknown query language: '{other}'"),
3516                ),
3517            )),
3518        }
3519    }
3520
    /// Clears all cached query plans.
    ///
    /// The plan cache is shared across all sessions on the same database,
    /// so clearing from one session affects all sessions.
    pub fn clear_plan_cache(&self) {
        self.query_cache.clear();
    }
3550
    /// Begins a new transaction on this session.
    ///
    /// Uses the default isolation level (`SnapshotIsolation`).
    ///
    /// If a transaction is already active, no second transaction is started:
    /// an internal savepoint named `_nested_tx_<depth>` is created instead, and
    /// the next [`commit`](Self::commit) or [`rollback`](Self::rollback)
    /// releases or rolls back to that savepoint rather than ending the outer
    /// transaction.
    ///
    /// # Errors
    ///
    /// Returns an error if the savepoint for a nested transaction cannot be
    /// created.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use grafeo_engine::GrafeoDB;
    ///
    /// let db = GrafeoDB::new_in_memory();
    /// let mut session = db.session();
    ///
    /// session.begin_transaction()?;
    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
    /// session.execute("INSERT (:Person {name: 'Gus'})")?;
    /// session.commit()?; // Both inserts committed atomically
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "lpg")]
    pub fn begin_transaction(&mut self) -> Result<()> {
        self.begin_transaction_inner(false, None)
    }
3562
    /// Begins a transaction with a specific isolation level.
    ///
    /// See [`begin_transaction`](Self::begin_transaction) for the default (`SnapshotIsolation`).
    ///
    /// If a transaction is already active, the call only opens a nested
    /// auto-savepoint inside it; `isolation_level` is not applied in that case.
    ///
    /// # Errors
    ///
    /// Returns an error if the savepoint for a nested transaction cannot be
    /// created.
    #[cfg(feature = "lpg")]
    pub fn begin_transaction_with_isolation(
        &mut self,
        isolation_level: crate::transaction::IsolationLevel,
    ) -> Result<()> {
        self.begin_transaction_inner(false, Some(isolation_level))
    }
3577
3578    /// Core transaction begin logic, usable from both `&mut self` and `&self` paths.
3579    #[cfg(feature = "lpg")]
3580    fn begin_transaction_inner(
3581        &self,
3582        read_only: bool,
3583        isolation_level: Option<crate::transaction::IsolationLevel>,
3584    ) -> Result<()> {
3585        let _span = grafeo_debug_span!("grafeo::tx::begin", read_only);
3586        let mut current = self.current_transaction.lock();
3587        if current.is_some() {
3588            // Nested transaction: create an auto-savepoint instead of a new tx.
3589            drop(current);
3590            let mut depth = self.transaction_nesting_depth.lock();
3591            *depth += 1;
3592            let sp_name = format!("_nested_tx_{}", *depth);
3593            self.savepoint(&sp_name)?;
3594            return Ok(());
3595        }
3596
3597        let active = self.active_lpg_store();
3598        self.transaction_start_node_count
3599            .store(active.node_count(), Ordering::Relaxed);
3600        self.transaction_start_edge_count
3601            .store(active.edge_count(), Ordering::Relaxed);
3602        let transaction_id = if let Some(level) = isolation_level {
3603            self.transaction_manager.begin_with_isolation(level)
3604        } else {
3605            self.transaction_manager.begin()
3606        };
3607        *current = Some(transaction_id);
3608        *self.read_only_tx.lock() = read_only || self.db_read_only;
3609
3610        // Record the initial graph as "touched" for cross-graph atomicity.
3611        // Uses the full storage key (schema/graph) for schema-scoped resolution.
3612        let key = self.active_graph_storage_key();
3613        let mut touched = self.touched_graphs.lock();
3614        touched.clear();
3615        touched.push(key);
3616
3617        #[cfg(feature = "metrics")]
3618        {
3619            crate::metrics::record_metric!(self.metrics, tx_active, inc);
3620            #[cfg(not(target_arch = "wasm32"))]
3621            {
3622                *self.tx_start_time.lock() = Some(Instant::now());
3623            }
3624        }
3625
3626        Ok(())
3627    }
3628
    /// Commits the current transaction.
    ///
    /// Makes all changes since [`begin_transaction`](Self::begin_transaction) permanent.
    ///
    /// If a nested transaction is open (that is, `begin_transaction` was called
    /// while a transaction was already active), this only releases the innermost
    /// auto-savepoint; the outer transaction stays open.
    ///
    /// # Errors
    ///
    /// Returns an error if no transaction is active, or if the transaction
    /// manager rejects the commit. In the latter case the transaction's changes
    /// are rolled back and the session is no longer in a transaction.
    #[cfg(feature = "lpg")]
    pub fn commit(&mut self) -> Result<()> {
        self.commit_inner()
    }
3640
    /// Core commit logic, usable from both `&mut self` and `&self` paths.
    ///
    /// With a nested transaction open (nesting depth > 0) this only releases the
    /// innermost `_nested_tx_<depth>` auto-savepoint and returns. Otherwise it
    /// takes the session's current transaction and asks the transaction manager
    /// to commit it:
    ///
    /// - On failure, the property changes of every touched graph are rolled
    ///   back, the session's transaction state is reset, and the manager's error
    ///   is returned.
    /// - On success, pending versions are finalized at the commit epoch, undo
    ///   logs are discarded, buffered CDC events are flushed (`cdc` feature),
    ///   commit and epoch markers are written to the WAL (`wal` feature; write
    ///   failures are only logged), store epochs are synced, and the session's
    ///   transaction state is reset. Every `gc_interval` commits, old versions
    ///   are pruned.
    ///
    /// # Errors
    ///
    /// Returns an error if no transaction is active, if releasing a nested
    /// auto-savepoint fails, or if the transaction manager rejects the commit.
    #[cfg(feature = "lpg")]
    fn commit_inner(&self) -> Result<()> {
        let _span = grafeo_debug_span!("grafeo::tx::commit");
        // Nested transaction: release the auto-savepoint (changes are preserved).
        {
            let mut depth = self.transaction_nesting_depth.lock();
            if *depth > 0 {
                let sp_name = format!("_nested_tx_{depth}");
                *depth -= 1;
                // Release the depth lock before touching the savepoint stack.
                drop(depth);
                return self.release_savepoint(&sp_name);
            }
        }

        // `take()` clears the session's current transaction up front, so the
        // session is out of the transaction whether the commit succeeds or fails.
        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
            grafeo_common::utils::error::Error::Transaction(
                grafeo_common::utils::error::TransactionError::InvalidState(
                    "No active transaction".to_string(),
                ),
            )
        })?;

        // Validate the transaction first (conflict detection) before committing data.
        // If this fails, we rollback the data changes instead of making them permanent.
        // `touched` is a private copy: the shared list is cleared further down,
        // but the auto-GC pass at the end still iterates over these graphs.
        let touched = self.touched_graphs.lock().clone();
        let commit_epoch = match self.transaction_manager.commit(transaction_id) {
            Ok(epoch) => epoch,
            Err(e) => {
                // Conflict detected: rollback the data changes
                for graph_name in &touched {
                    let store = self.resolve_store(graph_name);
                    store.rollback_transaction_properties(transaction_id);
                }
                #[cfg(feature = "triple-store")]
                self.rollback_rdf_transaction(transaction_id);
                // Discard buffered CDC events on conflict rollback
                #[cfg(feature = "cdc")]
                if let Some(ref pending) = self.cdc_pending_events {
                    pending.lock().clear();
                }
                // Same session-state reset as on the success path below.
                *self.read_only_tx.lock() = self.db_read_only;
                self.savepoints.lock().clear();
                self.touched_graphs.lock().clear();
                #[cfg(feature = "metrics")]
                {
                    crate::metrics::record_metric!(self.metrics, tx_active, dec);
                    crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
                    #[cfg(not(target_arch = "wasm32"))]
                    if let Some(start) = self.tx_start_time.lock().take() {
                        let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                        crate::metrics::record_metric!(
                            self.metrics,
                            tx_duration,
                            observe duration_ms
                        );
                    }
                }
                return Err(e);
            }
        };

        // Finalize PENDING epochs: make uncommitted versions visible at the commit epoch.
        for graph_name in &touched {
            let store = self.resolve_store(graph_name);
            store.finalize_version_epochs(transaction_id, commit_epoch);
        }

        // Commit succeeded: discard undo logs (make changes permanent)
        #[cfg(feature = "triple-store")]
        self.commit_rdf_transaction(transaction_id);

        for graph_name in &touched {
            let store = self.resolve_store(graph_name);
            store.commit_transaction_properties(transaction_id);
        }

        // Flush buffered CDC events now that the transaction is committed.
        // All buffered events have PENDING epoch; assign the real commit_epoch.
        // Uses record_batch to acquire the write lock once per commit.
        #[cfg(feature = "cdc")]
        if let Some(ref pending) = self.cdc_pending_events {
            let events: Vec<crate::cdc::ChangeEvent> = pending.lock().drain(..).collect();
            self.cdc_log.record_batch(events.into_iter().map(|mut e| {
                e.epoch = commit_epoch;
                e
            }));
        }

        // Log transaction commit and epoch advance to WAL so that crash
        // recovery can identify committed transactions and their epoch
        // boundaries. Without these markers, WAL recovery discards all
        // records as uncommitted (fixes #252 for the crash scenario).
        // A failed WAL write is logged as a warning and does not fail the commit.
        #[cfg(feature = "wal")]
        if let Some(ref wal) = self.wal {
            use grafeo_storage::wal::WalRecord;
            if let Err(e) = wal.log(&WalRecord::TransactionCommit { transaction_id }) {
                grafeo_warn!("Failed to log transaction commit to WAL: {}", e);
            }
            if let Err(e) = wal.log(&WalRecord::EpochAdvance {
                epoch: commit_epoch,
            }) {
                grafeo_warn!("Failed to log epoch advance to WAL: {}", e);
            }
        }

        // Sync epoch for all touched graphs so that convenience lookups
        // (edge_type, get_edge, get_node) can see versions at the latest epoch.
        let current_epoch = self.transaction_manager.current_epoch();
        for graph_name in &touched {
            let store = self.resolve_store(graph_name);
            store.sync_epoch(current_epoch);
        }

        // Reset read-only flag, clear savepoints and touched graphs
        *self.read_only_tx.lock() = self.db_read_only;
        self.savepoints.lock().clear();
        self.touched_graphs.lock().clear();

        // Auto-GC: periodically prune old MVCC versions
        if self.gc_interval > 0 {
            // `fetch_add` returns the previous value; `+ 1` is this commit's ordinal.
            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
            if count.is_multiple_of(self.gc_interval) {
                let min_epoch = self.transaction_manager.min_active_epoch();
                for graph_name in &touched {
                    let store = self.resolve_store(graph_name);
                    store.gc_versions(min_epoch);
                }
                self.transaction_manager.gc();
                #[cfg(feature = "metrics")]
                crate::metrics::record_metric!(self.metrics, gc_runs, inc);
            }
        }

        #[cfg(feature = "metrics")]
        {
            crate::metrics::record_metric!(self.metrics, tx_active, dec);
            crate::metrics::record_metric!(self.metrics, tx_committed, inc);
            #[cfg(not(target_arch = "wasm32"))]
            if let Some(start) = self.tx_start_time.lock().take() {
                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
            }
        }

        Ok(())
    }
3788
    /// Aborts the current transaction.
    ///
    /// Discards all changes since [`begin_transaction`](Self::begin_transaction).
    ///
    /// If a nested transaction is open (that is, `begin_transaction` was called
    /// while a transaction was already active), this only rolls back to the
    /// innermost auto-savepoint; the outer transaction stays open.
    ///
    /// # Errors
    ///
    /// Returns an error if no transaction is active, or if the transaction
    /// manager fails to abort the transaction.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use grafeo_engine::GrafeoDB;
    ///
    /// let db = GrafeoDB::new_in_memory();
    /// let mut session = db.session();
    ///
    /// session.begin_transaction()?;
    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
    /// session.rollback()?; // Insert is discarded
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "lpg")]
    pub fn rollback(&mut self) -> Result<()> {
        self.rollback_inner()
    }
3816
    /// Core rollback logic, usable from both `&mut self` and `&self` paths.
    ///
    /// With a nested transaction open (nesting depth > 0) this rolls back to the
    /// innermost `_nested_tx_<depth>` auto-savepoint and returns. Otherwise it
    /// takes the session's current transaction, discards its uncommitted
    /// versions in every touched graph, drops buffered CDC events (`cdc`
    /// feature), clears savepoints and touched graphs, and asks the transaction
    /// manager to abort the transaction.
    ///
    /// # Errors
    ///
    /// Returns an error if no transaction is active, if rolling back to a nested
    /// auto-savepoint fails, or if the transaction manager fails to abort.
    #[cfg(feature = "lpg")]
    fn rollback_inner(&self) -> Result<()> {
        let _span = grafeo_debug_span!("grafeo::tx::rollback");
        // Nested transaction: rollback to the auto-savepoint.
        {
            let mut depth = self.transaction_nesting_depth.lock();
            if *depth > 0 {
                let sp_name = format!("_nested_tx_{depth}");
                *depth -= 1;
                // Release the depth lock before touching the savepoint stack.
                drop(depth);
                return self.rollback_to_savepoint(&sp_name);
            }
        }

        // `take()` clears the session's current transaction, so the session is
        // out of the transaction from here on regardless of the abort result.
        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
            grafeo_common::utils::error::Error::Transaction(
                grafeo_common::utils::error::TransactionError::InvalidState(
                    "No active transaction".to_string(),
                ),
            )
        })?;

        // Reset read-only flag
        *self.read_only_tx.lock() = self.db_read_only;

        // Discard uncommitted versions in ALL touched LPG stores (cross-graph atomicity).
        let touched = self.touched_graphs.lock().clone();
        for graph_name in &touched {
            let store = self.resolve_store(graph_name);
            store.discard_uncommitted_versions(transaction_id);
        }

        // Discard pending operations in the RDF store
        #[cfg(feature = "triple-store")]
        self.rollback_rdf_transaction(transaction_id);

        // Discard buffered CDC events on rollback
        #[cfg(feature = "cdc")]
        if let Some(ref pending) = self.cdc_pending_events {
            pending.lock().clear();
        }

        // Clear savepoints and touched graphs
        self.savepoints.lock().clear();
        self.touched_graphs.lock().clear();

        // Mark transaction as aborted in the manager
        let result = self.transaction_manager.abort(transaction_id);

        // Rollback metrics are recorded only when the manager accepted the abort.
        #[cfg(feature = "metrics")]
        if result.is_ok() {
            crate::metrics::record_metric!(self.metrics, tx_active, dec);
            crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
            #[cfg(not(target_arch = "wasm32"))]
            if let Some(start) = self.tx_start_time.lock().take() {
                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
            }
        }

        result
    }
3880
3881    /// Creates a named savepoint within the current transaction.
3882    ///
3883    /// The savepoint captures the current node/edge ID counters so that
3884    /// [`rollback_to_savepoint`](Self::rollback_to_savepoint) can discard
3885    /// entities created after this point.
3886    ///
3887    /// # Errors
3888    ///
3889    /// Returns an error if no transaction is active.
3890    #[cfg(feature = "lpg")]
3891    pub fn savepoint(&self, name: &str) -> Result<()> {
3892        let tx_id = self.current_transaction.lock().ok_or_else(|| {
3893            grafeo_common::utils::error::Error::Transaction(
3894                grafeo_common::utils::error::TransactionError::InvalidState(
3895                    "No active transaction".to_string(),
3896                ),
3897            )
3898        })?;
3899
3900        // Capture state for every graph touched so far.
3901        let touched = self.touched_graphs.lock().clone();
3902        let graph_snapshots: Vec<GraphSavepoint> = touched
3903            .iter()
3904            .map(|graph_name| {
3905                let store = self.resolve_store(graph_name);
3906                GraphSavepoint {
3907                    graph_name: graph_name.clone(),
3908                    next_node_id: store.peek_next_node_id(),
3909                    next_edge_id: store.peek_next_edge_id(),
3910                    undo_log_position: store.property_undo_log_position(tx_id),
3911                }
3912            })
3913            .collect();
3914
3915        self.savepoints.lock().push(SavepointState {
3916            name: name.to_string(),
3917            graph_snapshots,
3918            active_graph: self.current_graph.lock().clone(),
3919            #[cfg(feature = "cdc")]
3920            cdc_event_position: self
3921                .cdc_pending_events
3922                .as_ref()
3923                .map_or(0, |p| p.lock().len()),
3924        });
3925        Ok(())
3926    }
3927
3928    /// Rolls back to a named savepoint, undoing all writes made after it.
3929    ///
3930    /// The savepoint and any savepoints created after it are removed.
3931    /// Entities with IDs >= the savepoint snapshot are discarded.
3932    ///
3933    /// # Errors
3934    ///
3935    /// Returns an error if no transaction is active or the savepoint does not exist.
3936    #[cfg(feature = "lpg")]
3937    pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
3938        let transaction_id = self.current_transaction.lock().ok_or_else(|| {
3939            grafeo_common::utils::error::Error::Transaction(
3940                grafeo_common::utils::error::TransactionError::InvalidState(
3941                    "No active transaction".to_string(),
3942                ),
3943            )
3944        })?;
3945
3946        let mut savepoints = self.savepoints.lock();
3947
3948        // Find the savepoint by name (search from the end for nested savepoints)
3949        let pos = savepoints
3950            .iter()
3951            .rposition(|sp| sp.name == name)
3952            .ok_or_else(|| {
3953                grafeo_common::utils::error::Error::Transaction(
3954                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3955                        "Savepoint '{name}' not found"
3956                    )),
3957                )
3958            })?;
3959
3960        let sp_state = savepoints[pos].clone();
3961
3962        // Remove this savepoint and all later ones
3963        savepoints.truncate(pos);
3964        drop(savepoints);
3965
3966        // Roll back each graph that was captured in the savepoint.
3967        for gs in &sp_state.graph_snapshots {
3968            let store = self.resolve_store(&gs.graph_name);
3969
3970            // Replay property/label undo entries recorded after the savepoint
3971            store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
3972
3973            // Discard entities created after the savepoint
3974            let current_next_node = store.peek_next_node_id();
3975            let current_next_edge = store.peek_next_edge_id();
3976
3977            let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
3978                .map(NodeId::new)
3979                .collect();
3980            let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
3981                .map(EdgeId::new)
3982                .collect();
3983
3984            if !node_ids.is_empty() || !edge_ids.is_empty() {
3985                store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
3986            }
3987        }
3988
3989        // Also roll back any graphs that were touched AFTER the savepoint
3990        // but not captured in it. These need full discard since the savepoint
3991        // didn't include them.
3992        let touched = self.touched_graphs.lock().clone();
3993        for graph_name in &touched {
3994            let already_captured = sp_state
3995                .graph_snapshots
3996                .iter()
3997                .any(|gs| gs.graph_name == *graph_name);
3998            if !already_captured {
3999                let store = self.resolve_store(graph_name);
4000                store.discard_uncommitted_versions(transaction_id);
4001            }
4002        }
4003
4004        // Truncate CDC event buffer to the savepoint position.
4005        #[cfg(feature = "cdc")]
4006        if let Some(ref pending) = self.cdc_pending_events {
4007            pending.lock().truncate(sp_state.cdc_event_position);
4008        }
4009
4010        // Restore touched_graphs to only the graphs that were known at savepoint time.
4011        let mut touched = self.touched_graphs.lock();
4012        touched.clear();
4013        for gs in &sp_state.graph_snapshots {
4014            if !touched.contains(&gs.graph_name) {
4015                touched.push(gs.graph_name.clone());
4016            }
4017        }
4018
4019        Ok(())
4020    }
4021
4022    /// Releases (removes) a named savepoint without rolling back.
4023    ///
4024    /// # Errors
4025    ///
4026    /// Returns an error if no transaction is active or the savepoint does not exist.
4027    pub fn release_savepoint(&self, name: &str) -> Result<()> {
4028        let _tx_id = self.current_transaction.lock().ok_or_else(|| {
4029            grafeo_common::utils::error::Error::Transaction(
4030                grafeo_common::utils::error::TransactionError::InvalidState(
4031                    "No active transaction".to_string(),
4032                ),
4033            )
4034        })?;
4035
4036        let mut savepoints = self.savepoints.lock();
4037        let pos = savepoints
4038            .iter()
4039            .rposition(|sp| sp.name == name)
4040            .ok_or_else(|| {
4041                grafeo_common::utils::error::Error::Transaction(
4042                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
4043                        "Savepoint '{name}' not found"
4044                    )),
4045                )
4046            })?;
4047        savepoints.remove(pos);
4048        Ok(())
4049    }
4050
4051    /// Returns whether a transaction is active.
4052    #[must_use]
4053    pub fn in_transaction(&self) -> bool {
4054        self.current_transaction.lock().is_some()
4055    }
4056
4057    /// Returns the current transaction ID, if any.
4058    #[must_use]
4059    pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
4060        *self.current_transaction.lock()
4061    }
4062
    /// Returns a reference to the transaction manager.
    ///
    /// The returned reference borrows from this session; it is the same manager
    /// the session uses to begin, commit and abort its own transactions.
    #[must_use]
    pub(crate) fn transaction_manager(&self) -> &TransactionManager {
        &self.transaction_manager
    }
4068
4069    /// Returns the store's current node count and the count at transaction start.
4070    #[cfg(feature = "lpg")]
4071    #[must_use]
4072    pub(crate) fn node_count_delta(&self) -> (usize, usize) {
4073        (
4074            self.transaction_start_node_count.load(Ordering::Relaxed),
4075            self.active_lpg_store().node_count(),
4076        )
4077    }
4078
4079    /// Returns the store's current edge count and the count at transaction start.
4080    #[cfg(feature = "lpg")]
4081    #[must_use]
4082    pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
4083        (
4084            self.transaction_start_edge_count.load(Ordering::Relaxed),
4085            self.active_lpg_store().edge_count(),
4086        )
4087    }
4088
    /// Prepares the current transaction for a two-phase commit.
    ///
    /// Returns a [`PreparedCommit`](crate::transaction::PreparedCommit) that
    /// lets you inspect pending changes and attach metadata before finalizing.
    /// The mutable borrow prevents concurrent operations while the commit is
    /// pending.
    ///
    /// If the `PreparedCommit` is dropped without calling `commit()` or
    /// `abort()`, the transaction is automatically rolled back.
    ///
    /// # Errors
    ///
    /// Returns an error if no transaction is active.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use grafeo_engine::GrafeoDB;
    ///
    /// let db = GrafeoDB::new_in_memory();
    /// let mut session = db.session();
    ///
    /// session.begin_transaction()?;
    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
    ///
    /// let mut prepared = session.prepare_commit()?;
    /// println!("Nodes written: {}", prepared.info().nodes_written);
    /// prepared.set_metadata("audit_user", "admin");
    /// prepared.commit()?;
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "lpg")]
    pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
        // All validation and setup is delegated to `PreparedCommit::new`, which
        // receives the exclusive borrow of this session for its lifetime `'_`.
        crate::transaction::PreparedCommit::new(self)
    }
4126
    /// Sets auto-commit mode.
    ///
    /// While enabled, a mutating query that runs outside an explicit
    /// transaction is wrapped in an automatic begin/commit (and rolled back on
    /// error). The new setting applies to subsequent executions.
    pub fn set_auto_commit(&mut self, auto_commit: bool) {
        self.auto_commit = auto_commit;
    }
4131
    /// Returns whether auto-commit is enabled.
    ///
    /// This is the flag last stored by [`set_auto_commit`](Self::set_auto_commit);
    /// it does not say whether a transaction is currently open.
    #[must_use]
    pub fn auto_commit(&self) -> bool {
        self.auto_commit
    }
4137
4138    /// Returns `true` if auto-commit should wrap this execution.
4139    ///
4140    /// Auto-commit kicks in when: the session is in auto-commit mode,
4141    /// no explicit transaction is active, and the query mutates data.
4142    fn needs_auto_commit(&self, has_mutations: bool) -> bool {
4143        self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
4144    }
4145
4146    /// Wraps `body` in an automatic begin/commit when [`needs_auto_commit`]
4147    /// returns `true`. On error the transaction is rolled back.
4148    #[cfg(feature = "lpg")]
4149    fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
4150    where
4151        F: FnOnce() -> Result<QueryResult>,
4152    {
4153        if self.needs_auto_commit(has_mutations) {
4154            self.begin_transaction_inner(false, None)?;
4155            match body() {
4156                Ok(result) => {
4157                    self.commit_inner()?;
4158                    Ok(result)
4159                }
4160                Err(e) => {
4161                    let _ = self.rollback_inner();
4162                    Err(e)
4163                }
4164            }
4165        } else {
4166            body()
4167        }
4168    }
4169
    /// Non-LPG stub: no auto-commit wrapping (SPARQL UPDATE is atomic).
    ///
    /// `_has_mutations` is ignored; `body` is invoked directly and its result is
    /// returned unchanged. The signature mirrors the `lpg` variant so call sites
    /// compile identically with or without that feature.
    #[cfg(not(feature = "lpg"))]
    fn with_auto_commit<F>(&self, _has_mutations: bool, body: F) -> Result<QueryResult>
    where
        F: FnOnce() -> Result<QueryResult>,
    {
        body()
    }
4178
4179    /// Quick heuristic: returns `true` when the query text looks like it
4180    /// performs a mutation. Used by `_with_params` paths that go through the
4181    /// `QueryProcessor` (where the logical plan isn't available before
4182    /// execution). False negatives are harmless: the data just won't be
4183    /// auto-committed, which matches the prior behaviour.
4184    fn query_looks_like_mutation(query: &str) -> bool {
4185        let upper = query.to_ascii_uppercase();
4186        upper.contains("INSERT")
4187            || upper.contains("CREATE")
4188            || upper.contains("DELETE")
4189            || upper.contains("MERGE")
4190            || upper.contains("SET")
4191            || upper.contains("REMOVE")
4192            || upper.contains("DROP")
4193            || upper.contains("ALTER")
4194    }
4195
4196    /// Computes the wall-clock deadline for query execution.
4197    #[must_use]
4198    fn query_deadline(&self) -> Option<Instant> {
4199        #[cfg(not(target_arch = "wasm32"))]
4200        {
4201            self.query_timeout.map(|d| Instant::now() + d)
4202        }
4203        #[cfg(target_arch = "wasm32")]
4204        {
4205            let _ = &self.query_timeout;
4206            None
4207        }
4208    }
4209
4210    /// Records query metrics for any language.
4211    ///
4212    /// Called after query execution to update counters, latency histogram,
4213    /// and per-language tracking. `elapsed_ms` should be `None` on WASM
4214    /// where `Instant` is unavailable.
4215    #[cfg(feature = "metrics")]
4216    fn record_query_metrics(
4217        &self,
4218        language: &str,
4219        elapsed_ms: Option<f64>,
4220        result: &Result<crate::database::QueryResult>,
4221    ) {
4222        use crate::metrics::record_metric;
4223
4224        record_metric!(self.metrics, query_count, inc);
4225        if let Some(ref reg) = self.metrics {
4226            reg.query_count_by_language.increment(language);
4227        }
4228        if let Some(ms) = elapsed_ms {
4229            record_metric!(self.metrics, query_latency, observe ms);
4230        }
4231        match result {
4232            Ok(r) => {
4233                let returned = r.rows.len() as u64;
4234                record_metric!(self.metrics, rows_returned, add returned);
4235                if let Some(scanned) = r.rows_scanned {
4236                    record_metric!(self.metrics, rows_scanned, add scanned);
4237                }
4238            }
4239            Err(e) => {
4240                record_metric!(self.metrics, query_errors, inc);
4241                // Detect timeout errors
4242                let msg = e.to_string();
4243                if msg.contains("exceeded timeout") {
4244                    record_metric!(self.metrics, query_timeouts, inc);
4245                }
4246            }
4247        }
4248    }
4249
4250    /// Evaluates a simple integer literal from a session parameter expression.
4251    fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
4252        use grafeo_adapters::query::gql::ast::{Expression, Literal};
4253        match expr {
4254            Expression::Literal(Literal::Integer(n)) => Some(*n),
4255            _ => None,
4256        }
4257    }
4258
4259    /// Returns the current transaction context for MVCC visibility.
4260    ///
4261    /// Returns `(viewing_epoch, transaction_id)` where:
4262    /// - `viewing_epoch` is the epoch at which to check version visibility
4263    /// - `transaction_id` is the current transaction ID (if in a transaction)
4264    #[must_use]
4265    fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
4266        // Time-travel override takes precedence (read-only, no tx context)
4267        if let Some(epoch) = *self.viewing_epoch_override.lock() {
4268            return (epoch, None);
4269        }
4270
4271        if let Some(transaction_id) = *self.current_transaction.lock() {
4272            // In a transaction: use the transaction's start epoch
4273            let epoch = self
4274                .transaction_manager
4275                .start_epoch(transaction_id)
4276                .unwrap_or_else(|| self.transaction_manager.current_epoch());
4277            (epoch, Some(transaction_id))
4278        } else {
4279            // No transaction: use current epoch
4280            (self.transaction_manager.current_epoch(), None)
4281        }
4282    }
4283
    /// Creates a planner with transaction context and constraint validator.
    ///
    /// The `store` parameter is the graph store to plan against (use
    /// `self.active_store()` for graph-aware execution).
    ///
    /// This is shorthand for `create_planner_for_store_with_read_only` with
    /// `read_only` set to `false`; `viewing_epoch` and `transaction_id` are
    /// passed through unchanged.
    fn create_planner_for_store(
        &self,
        store: Arc<dyn GraphStore>,
        viewing_epoch: EpochId,
        transaction_id: Option<TransactionId>,
    ) -> crate::query::Planner {
        self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
    }
4296
4297    fn create_planner_for_store_with_read_only(
4298        &self,
4299        store: Arc<dyn GraphStore>,
4300        viewing_epoch: EpochId,
4301        transaction_id: Option<TransactionId>,
4302        read_only: bool,
4303    ) -> crate::query::Planner {
4304        use crate::query::Planner;
4305        use grafeo_core::execution::operators::{LazyValue, SessionContext};
4306
4307        // Capture store reference for lazy introspection (only computed if info()/schema() called).
4308        let info_store = Arc::clone(&store);
4309        let schema_store = Arc::clone(&store);
4310
4311        let session_context = SessionContext {
4312            current_schema: self.current_schema(),
4313            current_graph: self.current_graph(),
4314            db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
4315            schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
4316        };
4317
4318        let write_store = self.active_write_store();
4319
4320        let mut planner = Planner::with_context(
4321            Arc::clone(&store),
4322            write_store,
4323            Arc::clone(&self.transaction_manager),
4324            transaction_id,
4325            viewing_epoch,
4326        )
4327        .with_factorized_execution(self.factorized_execution)
4328        .with_catalog(Arc::clone(&self.catalog))
4329        .with_session_context(session_context)
4330        .with_read_only(read_only);
4331
4332        // Attach the constraint validator for schema enforcement
4333        let validator =
4334            CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
4335        planner = planner.with_validator(Arc::new(validator));
4336
4337        planner
4338    }
4339
4340    /// Builds a `Value::Map` for the `info()` introspection function.
4341    fn build_info_value(store: &dyn GraphStore) -> Value {
4342        use grafeo_common::types::PropertyKey;
4343        use std::collections::BTreeMap;
4344
4345        let mut map = BTreeMap::new();
4346        map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
4347        map.insert(
4348            PropertyKey::from("node_count"),
4349            Value::Int64(store.node_count() as i64),
4350        );
4351        map.insert(
4352            PropertyKey::from("edge_count"),
4353            Value::Int64(store.edge_count() as i64),
4354        );
4355        map.insert(
4356            PropertyKey::from("version"),
4357            Value::String(env!("CARGO_PKG_VERSION").into()),
4358        );
4359        Value::Map(map.into())
4360    }
4361
4362    /// Builds a `Value::Map` for the `schema()` introspection function.
4363    fn build_schema_value(store: &dyn GraphStore) -> Value {
4364        use grafeo_common::types::PropertyKey;
4365        use std::collections::BTreeMap;
4366
4367        let labels: Vec<Value> = store
4368            .all_labels()
4369            .into_iter()
4370            .map(|l| Value::String(l.into()))
4371            .collect();
4372        let edge_types: Vec<Value> = store
4373            .all_edge_types()
4374            .into_iter()
4375            .map(|t| Value::String(t.into()))
4376            .collect();
4377        let property_keys: Vec<Value> = store
4378            .all_property_keys()
4379            .into_iter()
4380            .map(|k| Value::String(k.into()))
4381            .collect();
4382
4383        let mut map = BTreeMap::new();
4384        map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
4385        map.insert(
4386            PropertyKey::from("edge_types"),
4387            Value::List(edge_types.into()),
4388        );
4389        map.insert(
4390            PropertyKey::from("property_keys"),
4391            Value::List(property_keys.into()),
4392        );
4393        Value::Map(map.into())
4394    }
4395
4396    /// Creates a node directly (bypassing query execution).
4397    ///
4398    /// This is a low-level API for testing and direct manipulation.
4399    /// If a transaction is active, the node will be versioned with the transaction ID.
4400    #[cfg(feature = "lpg")]
4401    pub fn create_node(&self, labels: &[&str]) -> NodeId {
4402        let (epoch, transaction_id) = self.get_transaction_context();
4403        self.active_lpg_store().create_node_versioned(
4404            labels,
4405            epoch,
4406            transaction_id.unwrap_or(TransactionId::SYSTEM),
4407        )
4408    }
4409
4410    /// Creates a node with properties.
4411    ///
4412    /// If a transaction is active, the node will be versioned with the transaction ID.
4413    #[cfg(feature = "lpg")]
4414    pub fn create_node_with_props<'a>(
4415        &self,
4416        labels: &[&str],
4417        properties: impl IntoIterator<Item = (&'a str, Value)>,
4418    ) -> NodeId {
4419        let (epoch, transaction_id) = self.get_transaction_context();
4420        self.active_lpg_store().create_node_with_props_versioned(
4421            labels,
4422            properties,
4423            epoch,
4424            transaction_id.unwrap_or(TransactionId::SYSTEM),
4425        )
4426    }
4427
4428    /// Creates an edge between two nodes.
4429    ///
4430    /// This is a low-level API for testing and direct manipulation.
4431    /// If a transaction is active, the edge will be versioned with the transaction ID.
4432    #[cfg(feature = "lpg")]
4433    pub fn create_edge(
4434        &self,
4435        src: NodeId,
4436        dst: NodeId,
4437        edge_type: &str,
4438    ) -> grafeo_common::types::EdgeId {
4439        let (epoch, transaction_id) = self.get_transaction_context();
4440        self.active_lpg_store().create_edge_versioned(
4441            src,
4442            dst,
4443            edge_type,
4444            epoch,
4445            transaction_id.unwrap_or(TransactionId::SYSTEM),
4446        )
4447    }
4448
4449    /// Creates an edge with properties within the active transaction context.
4450    #[cfg(feature = "lpg")]
4451    pub fn create_edge_with_props<'a>(
4452        &self,
4453        src: NodeId,
4454        dst: NodeId,
4455        edge_type: &str,
4456        properties: impl IntoIterator<Item = (&'a str, Value)>,
4457    ) -> grafeo_common::types::EdgeId {
4458        let (epoch, transaction_id) = self.get_transaction_context();
4459        let tid = transaction_id.unwrap_or(TransactionId::SYSTEM);
4460        let store = self.active_lpg_store();
4461        let eid = store.create_edge_versioned(src, dst, edge_type, epoch, tid);
4462        for (key, value) in properties {
4463            store.set_edge_property_versioned(eid, key, value, tid);
4464        }
4465        eid
4466    }
4467
4468    /// Sets a node property within the active transaction context.
4469    #[cfg(feature = "lpg")]
4470    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
4471        let (_, transaction_id) = self.get_transaction_context();
4472        if let Some(tid) = transaction_id {
4473            self.active_lpg_store()
4474                .set_node_property_versioned(id, key, value, tid);
4475        } else {
4476            self.active_lpg_store().set_node_property(id, key, value);
4477        }
4478    }
4479
4480    /// Sets an edge property within the active transaction context.
4481    #[cfg(feature = "lpg")]
4482    pub fn set_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str, value: Value) {
4483        let (_, transaction_id) = self.get_transaction_context();
4484        if let Some(tid) = transaction_id {
4485            self.active_lpg_store()
4486                .set_edge_property_versioned(id, key, value, tid);
4487        } else {
4488            self.active_lpg_store().set_edge_property(id, key, value);
4489        }
4490    }
4491
4492    /// Deletes a node within the active transaction context.
4493    #[cfg(feature = "lpg")]
4494    pub fn delete_node(&self, id: NodeId) -> bool {
4495        let (epoch, transaction_id) = self.get_transaction_context();
4496        if let Some(tid) = transaction_id {
4497            self.active_lpg_store()
4498                .delete_node_versioned(id, epoch, tid)
4499        } else {
4500            self.active_lpg_store().delete_node(id)
4501        }
4502    }
4503
4504    /// Deletes an edge within the active transaction context.
4505    #[cfg(feature = "lpg")]
4506    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
4507        let (epoch, transaction_id) = self.get_transaction_context();
4508        if let Some(tid) = transaction_id {
4509            self.active_lpg_store()
4510                .delete_edge_versioned(id, epoch, tid)
4511        } else {
4512            self.active_lpg_store().delete_edge(id)
4513        }
4514    }
4515
4516    // =========================================================================
4517    // Direct Lookup APIs (bypass query planning for O(1) point reads)
4518    // =========================================================================
4519
4520    /// Gets a node by ID directly, bypassing query planning.
4521    ///
4522    /// This is the fastest way to retrieve a single node when you know its ID.
4523    /// Skips parsing, binding, optimization, and physical planning entirely.
4524    ///
4525    /// # Performance
4526    ///
4527    /// - Time complexity: O(1) average case
4528    /// - No lock contention (uses DashMap internally)
4529    /// - ~20-30x faster than equivalent MATCH query
4530    ///
4531    /// # Example
4532    ///
4533    /// ```no_run
4534    /// # use grafeo_engine::GrafeoDB;
4535    /// # let db = GrafeoDB::new_in_memory();
4536    /// let session = db.session();
4537    /// let node_id = session.create_node(&["Person"]);
4538    ///
4539    /// // Direct lookup - O(1), no query planning
4540    /// let node = session.get_node(node_id);
4541    /// assert!(node.is_some());
4542    /// ```
4543    #[cfg(feature = "lpg")]
4544    #[must_use]
4545    pub fn get_node(&self, id: NodeId) -> Option<Node> {
4546        let (epoch, transaction_id) = self.get_transaction_context();
4547        self.active_lpg_store().get_node_versioned(
4548            id,
4549            epoch,
4550            transaction_id.unwrap_or(TransactionId::SYSTEM),
4551        )
4552    }
4553
4554    /// Gets a single property from a node by ID, bypassing query planning.
4555    ///
4556    /// More efficient than `get_node()` when you only need one property,
4557    /// as it avoids loading the full node with all properties.
4558    ///
4559    /// # Performance
4560    ///
4561    /// - Time complexity: O(1) average case
4562    /// - No query planning overhead
4563    ///
4564    /// # Example
4565    ///
4566    /// ```no_run
4567    /// # use grafeo_engine::GrafeoDB;
4568    /// # use grafeo_common::types::Value;
4569    /// # let db = GrafeoDB::new_in_memory();
4570    /// let session = db.session();
4571    /// let id = session.create_node_with_props(&["Person"], [("name", "Alix".into())]);
4572    ///
4573    /// // Direct property access - O(1)
4574    /// let name = session.get_node_property(id, "name");
4575    /// assert_eq!(name, Some(Value::String("Alix".into())));
4576    /// ```
4577    #[cfg(feature = "lpg")]
4578    #[must_use]
4579    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
4580        self.get_node(id)
4581            .and_then(|node| node.get_property(key).cloned())
4582    }
4583
4584    /// Gets an edge by ID directly, bypassing query planning.
4585    ///
4586    /// # Performance
4587    ///
4588    /// - Time complexity: O(1) average case
4589    /// - No lock contention
4590    #[cfg(feature = "lpg")]
4591    #[must_use]
4592    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
4593        let (epoch, transaction_id) = self.get_transaction_context();
4594        self.active_lpg_store().get_edge_versioned(
4595            id,
4596            epoch,
4597            transaction_id.unwrap_or(TransactionId::SYSTEM),
4598        )
4599    }
4600
4601    /// Gets outgoing neighbors of a node directly, bypassing query planning.
4602    ///
4603    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
4604    ///
4605    /// # Performance
4606    ///
4607    /// - Time complexity: O(degree) where degree is the number of outgoing edges
4608    /// - Uses adjacency index for direct access
4609    /// - ~10-20x faster than equivalent MATCH query
4610    ///
4611    /// # Example
4612    ///
4613    /// ```no_run
4614    /// # use grafeo_engine::GrafeoDB;
4615    /// # let db = GrafeoDB::new_in_memory();
4616    /// let session = db.session();
4617    /// let alix = session.create_node(&["Person"]);
4618    /// let gus = session.create_node(&["Person"]);
4619    /// session.create_edge(alix, gus, "KNOWS");
4620    ///
4621    /// // Direct neighbor lookup - O(degree)
4622    /// let neighbors = session.get_neighbors_outgoing(alix);
4623    /// assert_eq!(neighbors.len(), 1);
4624    /// assert_eq!(neighbors[0].0, gus);
4625    /// ```
4626    #[cfg(feature = "lpg")]
4627    #[must_use]
4628    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4629        self.active_lpg_store()
4630            .edges_from(node, Direction::Outgoing)
4631            .collect()
4632    }
4633
4634    /// Gets incoming neighbors of a node directly, bypassing query planning.
4635    ///
4636    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
4637    ///
4638    /// # Performance
4639    ///
4640    /// - Time complexity: O(degree) where degree is the number of incoming edges
4641    /// - Uses backward adjacency index for direct access
4642    #[cfg(feature = "lpg")]
4643    #[must_use]
4644    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4645        self.active_lpg_store()
4646            .edges_from(node, Direction::Incoming)
4647            .collect()
4648    }
4649
4650    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
4651    ///
4652    /// # Example
4653    ///
4654    /// ```no_run
4655    /// # use grafeo_engine::GrafeoDB;
4656    /// # let db = GrafeoDB::new_in_memory();
4657    /// # let session = db.session();
4658    /// # let alix = session.create_node(&["Person"]);
4659    /// let neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
4660    /// ```
4661    #[cfg(feature = "lpg")]
4662    #[must_use]
4663    pub fn get_neighbors_outgoing_by_type(
4664        &self,
4665        node: NodeId,
4666        edge_type: &str,
4667    ) -> Vec<(NodeId, EdgeId)> {
4668        self.active_lpg_store()
4669            .edges_from(node, Direction::Outgoing)
4670            .filter(|(_, edge_id)| {
4671                self.get_edge(*edge_id)
4672                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
4673            })
4674            .collect()
4675    }
4676
4677    /// Checks if a node exists, bypassing query planning.
4678    ///
4679    /// # Performance
4680    ///
4681    /// - Time complexity: O(1)
4682    /// - Fastest existence check available
4683    #[cfg(feature = "lpg")]
4684    #[must_use]
4685    pub fn node_exists(&self, id: NodeId) -> bool {
4686        self.get_node(id).is_some()
4687    }
4688
4689    /// Checks if an edge exists, bypassing query planning.
4690    #[cfg(feature = "lpg")]
4691    #[must_use]
4692    pub fn edge_exists(&self, id: EdgeId) -> bool {
4693        self.get_edge(id).is_some()
4694    }
4695
4696    /// Gets the degree (number of edges) of a node.
4697    ///
4698    /// Returns (outgoing_degree, incoming_degree).
4699    #[cfg(feature = "lpg")]
4700    #[must_use]
4701    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
4702        let active = self.active_lpg_store();
4703        let out = active.out_degree(node);
4704        let in_degree = active.in_degree(node);
4705        (out, in_degree)
4706    }
4707
4708    /// Batch lookup of multiple nodes by ID.
4709    ///
4710    /// More efficient than calling `get_node()` in a loop because it
4711    /// amortizes overhead.
4712    ///
4713    /// # Performance
4714    ///
4715    /// - Time complexity: O(n) where n is the number of IDs
4716    /// - Better cache utilization than individual lookups
4717    #[cfg(feature = "lpg")]
4718    #[must_use]
4719    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
4720        let (epoch, transaction_id) = self.get_transaction_context();
4721        let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
4722        let active = self.active_lpg_store();
4723        ids.iter()
4724            .map(|&id| active.get_node_versioned(id, epoch, tx))
4725            .collect()
4726    }
4727
4728    // ── Change Data Capture ─────────────────────────────────────────────
4729
4730    /// Returns the full change history for an entity (node or edge).
4731    ///
4732    /// # Errors
4733    ///
4734    /// Currently infallible, but returns `Result` for forward compatibility.
4735    #[cfg(feature = "cdc")]
4736    pub fn history(
4737        &self,
4738        entity_id: impl Into<crate::cdc::EntityId>,
4739    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4740        Ok(self.cdc_log.history(entity_id.into()))
4741    }
4742
4743    /// Returns change events for an entity since the given epoch.
4744    ///
4745    /// # Errors
4746    ///
4747    /// Currently infallible, but returns `Result` for forward compatibility.
4748    #[cfg(feature = "cdc")]
4749    pub fn history_since(
4750        &self,
4751        entity_id: impl Into<crate::cdc::EntityId>,
4752        since_epoch: EpochId,
4753    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4754        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
4755    }
4756
4757    /// Returns all change events across all entities in an epoch range.
4758    ///
4759    /// # Errors
4760    ///
4761    /// Currently infallible, but returns `Result` for forward compatibility.
4762    #[cfg(feature = "cdc")]
4763    pub fn changes_between(
4764        &self,
4765        start_epoch: EpochId,
4766        end_epoch: EpochId,
4767    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4768        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
4769    }
4770}
4771
4772impl Drop for Session {
4773    fn drop(&mut self) {
4774        // Auto-rollback any active transaction to prevent leaked MVCC state,
4775        // dangling write locks, and uncommitted versions lingering in the store.
4776        #[cfg(feature = "lpg")]
4777        if self.in_transaction() {
4778            let _ = self.rollback_inner();
4779        }
4780
4781        #[cfg(feature = "metrics")]
4782        if let Some(ref reg) = self.metrics {
4783            reg.session_active
4784                .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
4785        }
4786    }
4787}
4788
4789#[cfg(test)]
4790mod tests {
4791    use super::parse_default_literal;
4792    use crate::database::GrafeoDB;
4793    use grafeo_common::types::Value;
4794
4795    // -----------------------------------------------------------------------
4796    // parse_default_literal
4797    // -----------------------------------------------------------------------
4798
4799    #[test]
4800    fn parse_default_literal_null() {
4801        assert_eq!(parse_default_literal("null"), Value::Null);
4802        assert_eq!(parse_default_literal("NULL"), Value::Null);
4803        assert_eq!(parse_default_literal("Null"), Value::Null);
4804    }
4805
4806    #[test]
4807    fn parse_default_literal_bool() {
4808        assert_eq!(parse_default_literal("true"), Value::Bool(true));
4809        assert_eq!(parse_default_literal("TRUE"), Value::Bool(true));
4810        assert_eq!(parse_default_literal("false"), Value::Bool(false));
4811        assert_eq!(parse_default_literal("FALSE"), Value::Bool(false));
4812    }
4813
4814    #[test]
4815    fn parse_default_literal_string_single_quoted() {
4816        assert_eq!(
4817            parse_default_literal("'hello'"),
4818            Value::String("hello".into())
4819        );
4820    }
4821
4822    #[test]
4823    fn parse_default_literal_string_double_quoted() {
4824        assert_eq!(
4825            parse_default_literal("\"world\""),
4826            Value::String("world".into())
4827        );
4828    }
4829
4830    #[test]
4831    fn parse_default_literal_integer() {
4832        assert_eq!(parse_default_literal("42"), Value::Int64(42));
4833        assert_eq!(parse_default_literal("-7"), Value::Int64(-7));
4834        assert_eq!(parse_default_literal("0"), Value::Int64(0));
4835    }
4836
4837    #[test]
4838    fn parse_default_literal_float() {
4839        assert_eq!(parse_default_literal("9.81"), Value::Float64(9.81_f64));
4840        assert_eq!(parse_default_literal("-0.5"), Value::Float64(-0.5));
4841    }
4842
4843    #[test]
4844    fn parse_default_literal_fallback_string() {
4845        // Not a recognized literal, not quoted, not a number
4846        assert_eq!(
4847            parse_default_literal("some_identifier"),
4848            Value::String("some_identifier".into())
4849        );
4850    }
4851
4852    #[test]
4853    fn test_session_create_node() {
4854        let db = GrafeoDB::new_in_memory();
4855        let session = db.session();
4856
4857        let id = session.create_node(&["Person"]);
4858        assert!(id.is_valid());
4859        assert_eq!(db.node_count(), 1);
4860    }
4861
4862    #[test]
4863    fn test_session_transaction() {
4864        let db = GrafeoDB::new_in_memory();
4865        let mut session = db.session();
4866
4867        assert!(!session.in_transaction());
4868
4869        session.begin_transaction().unwrap();
4870        assert!(session.in_transaction());
4871
4872        session.commit().unwrap();
4873        assert!(!session.in_transaction());
4874    }
4875
4876    #[test]
4877    fn test_session_transaction_context() {
4878        let db = GrafeoDB::new_in_memory();
4879        let mut session = db.session();
4880
4881        // Without transaction - context should have current epoch and no transaction_id
4882        let (_epoch1, transaction_id1) = session.get_transaction_context();
4883        assert!(transaction_id1.is_none());
4884
4885        // Start a transaction
4886        session.begin_transaction().unwrap();
4887        let (epoch2, transaction_id2) = session.get_transaction_context();
4888        assert!(transaction_id2.is_some());
4889        // Transaction should have a valid epoch
4890        let _ = epoch2; // Use the variable
4891
4892        // Commit and verify
4893        session.commit().unwrap();
4894        let (epoch3, tx_id3) = session.get_transaction_context();
4895        assert!(tx_id3.is_none());
4896        // Epoch should have advanced after commit
4897        assert!(epoch3.as_u64() >= epoch2.as_u64());
4898    }
4899
4900    #[test]
4901    fn test_session_rollback() {
4902        let db = GrafeoDB::new_in_memory();
4903        let mut session = db.session();
4904
4905        session.begin_transaction().unwrap();
4906        session.rollback().unwrap();
4907        assert!(!session.in_transaction());
4908    }
4909
4910    #[test]
4911    fn test_session_rollback_discards_versions() {
4912        use grafeo_common::types::TransactionId;
4913
4914        let db = GrafeoDB::new_in_memory();
4915
4916        // Create a node outside of any transaction (at system level)
4917        let node_before = db.store().create_node(&["Person"]);
4918        assert!(node_before.is_valid());
4919        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4920
4921        // Start a transaction
4922        let mut session = db.session();
4923        session.begin_transaction().unwrap();
4924        let transaction_id = session.current_transaction.lock().unwrap();
4925
4926        // Create a node versioned with the transaction's ID
4927        let epoch = db.store().current_epoch();
4928        let node_in_tx = db
4929            .store()
4930            .create_node_versioned(&["Person"], epoch, transaction_id);
4931        assert!(node_in_tx.is_valid());
4932
4933        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4934        // non-versioned lookups like node_count(). Verify the node is visible
4935        // only through the owning transaction.
4936        assert_eq!(
4937            db.node_count(),
4938            1,
4939            "PENDING nodes should be invisible to non-versioned node_count()"
4940        );
4941        assert!(
4942            db.store()
4943                .get_node_versioned(node_in_tx, epoch, transaction_id)
4944                .is_some(),
4945            "Transaction node should be visible to its own transaction"
4946        );
4947
4948        // Rollback the transaction
4949        session.rollback().unwrap();
4950        assert!(!session.in_transaction());
4951
4952        // The node created in the transaction should be discarded
4953        // Only the first node should remain visible
4954        let count_after = db.node_count();
4955        assert_eq!(
4956            count_after, 1,
4957            "Rollback should discard uncommitted node, but got {count_after}"
4958        );
4959
4960        // The original node should still be accessible
4961        let current_epoch = db.store().current_epoch();
4962        assert!(
4963            db.store()
4964                .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
4965                .is_some(),
4966            "Original node should still exist"
4967        );
4968
4969        // The node created in the transaction should not be accessible
4970        assert!(
4971            db.store()
4972                .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
4973                .is_none(),
4974            "Transaction node should be gone"
4975        );
4976    }
4977
4978    #[test]
4979    fn test_session_create_node_in_transaction() {
4980        // Test that session.create_node() is transaction-aware
4981        let db = GrafeoDB::new_in_memory();
4982
4983        // Create a node outside of any transaction
4984        let node_before = db.create_node(&["Person"]);
4985        assert!(node_before.is_valid());
4986        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4987
4988        // Start a transaction and create a node through the session
4989        let mut session = db.session();
4990        session.begin_transaction().unwrap();
4991        let transaction_id = session.current_transaction.lock().unwrap();
4992
4993        // Create a node through session.create_node() - should be versioned with tx
4994        let node_in_tx = session.create_node(&["Person"]);
4995        assert!(node_in_tx.is_valid());
4996
4997        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4998        // non-versioned lookups. Verify the node is visible only to its own tx.
4999        assert_eq!(
5000            db.node_count(),
5001            1,
5002            "PENDING nodes should be invisible to non-versioned node_count()"
5003        );
5004        let epoch = db.store().current_epoch();
5005        assert!(
5006            db.store()
5007                .get_node_versioned(node_in_tx, epoch, transaction_id)
5008                .is_some(),
5009            "Transaction node should be visible to its own transaction"
5010        );
5011
5012        // Rollback the transaction
5013        session.rollback().unwrap();
5014
5015        // The node created via session.create_node() should be discarded
5016        let count_after = db.node_count();
5017        assert_eq!(
5018            count_after, 1,
5019            "Rollback should discard node created via session.create_node(), but got {count_after}"
5020        );
5021    }
5022
5023    #[test]
5024    fn test_session_create_node_with_props_in_transaction() {
5025        use grafeo_common::types::Value;
5026
5027        // Test that session.create_node_with_props() is transaction-aware
5028        let db = GrafeoDB::new_in_memory();
5029
5030        // Create a node outside of any transaction
5031        db.create_node(&["Person"]);
5032        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
5033
5034        // Start a transaction and create a node with properties
5035        let mut session = db.session();
5036        session.begin_transaction().unwrap();
5037        let transaction_id = session.current_transaction.lock().unwrap();
5038
5039        let node_in_tx =
5040            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
5041        assert!(node_in_tx.is_valid());
5042
5043        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
5044        // non-versioned lookups. Verify the node is visible only to its own tx.
5045        assert_eq!(
5046            db.node_count(),
5047            1,
5048            "PENDING nodes should be invisible to non-versioned node_count()"
5049        );
5050        let epoch = db.store().current_epoch();
5051        assert!(
5052            db.store()
5053                .get_node_versioned(node_in_tx, epoch, transaction_id)
5054                .is_some(),
5055            "Transaction node should be visible to its own transaction"
5056        );
5057
5058        // Rollback the transaction
5059        session.rollback().unwrap();
5060
5061        // The node should be discarded
5062        let count_after = db.node_count();
5063        assert_eq!(
5064            count_after, 1,
5065            "Rollback should discard node created via session.create_node_with_props()"
5066        );
5067    }
5068
5069    #[cfg(feature = "gql")]
5070    mod gql_tests {
5071        use super::*;
5072
5073        #[test]
5074        fn test_gql_query_execution() {
5075            let db = GrafeoDB::new_in_memory();
5076            let session = db.session();
5077
5078            // Create some test data
5079            session.create_node(&["Person"]);
5080            session.create_node(&["Person"]);
5081            session.create_node(&["Animal"]);
5082
5083            // Execute a GQL query
5084            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
5085
5086            // Should return 2 Person nodes
5087            assert_eq!(result.row_count(), 2);
5088            assert_eq!(result.column_count(), 1);
5089            assert_eq!(result.columns[0], "n");
5090        }
5091
5092        #[test]
5093        fn test_gql_empty_result() {
5094            let db = GrafeoDB::new_in_memory();
5095            let session = db.session();
5096
5097            // No data in database
5098            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
5099
5100            assert_eq!(result.row_count(), 0);
5101        }
5102
5103        #[test]
5104        fn test_gql_parse_error() {
5105            let db = GrafeoDB::new_in_memory();
5106            let session = db.session();
5107
5108            // Invalid GQL syntax
5109            let result = session.execute("MATCH (n RETURN n");
5110
5111            assert!(result.is_err());
5112        }
5113
/// A single-hop pattern returns one row per matching edge.
#[test]
fn test_gql_relationship_traversal() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Alix gets an outgoing KNOWS edge to each of the two other people.
    let alix = sess.create_node(&["Person"]);
    let gus = sess.create_node(&["Person"]);
    let vincent = sess.create_node(&["Person"]);
    for target in [gus, vincent] {
        sess.create_edge(alix, target, "KNOWS");
    }

    let outcome = sess
        .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
        .unwrap();

    // Two edges were created, and the RETURN clause names two variables.
    assert_eq!(outcome.row_count(), 2);
    assert_eq!(outcome.column_count(), 2);
    for (position, expected) in ["a", "b"].into_iter().enumerate() {
        assert_eq!(outcome.columns[position], expected);
    }
}
5138
/// An edge-type filter in the pattern excludes edges of other types.
#[test]
fn test_gql_relationship_with_type_filter() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let alix = sess.create_node(&["Person"]);
    let gus = sess.create_node(&["Person"]);
    let vincent = sess.create_node(&["Person"]);

    // Two outgoing edges from Alix, each with a different type.
    for (target, edge_type) in [(gus, "KNOWS"), (vincent, "WORKS_WITH")] {
        sess.create_edge(alix, target, edge_type);
    }

    let outcome = sess
        .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
        .unwrap();

    // Only the KNOWS edge satisfies the pattern.
    assert_eq!(outcome.row_count(), 1);
}
5160
/// Returning a variable that was never bound fails with an
/// "Undefined variable" error.
#[test]
fn test_gql_semantic_error_undefined_variable() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // `x` appears in RETURN but is not introduced by the MATCH pattern.
    let outcome = sess.execute("MATCH (n:Person) RETURN x");
    assert!(outcome.is_err());

    let failure = match outcome {
        Err(failure) => failure,
        Ok(_) => panic!("Expected error"),
    };
    let rendered = failure.to_string();
    assert!(
        rendered.contains("Undefined variable"),
        "Expected undefined variable error, got: {}",
        failure
    );
}
5180
/// A numeric comparison in WHERE keeps only the nodes that satisfy it.
#[test]
fn test_gql_where_clause_property_filter() {
    use grafeo_common::types::Value;

    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Three people, one below and two above the threshold used in the query.
    for age in [25, 35, 45] {
        sess.create_node_with_props(&["Person"], [("age", Value::Int64(age))]);
    }

    let outcome = sess
        .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
        .unwrap();

    // Ages 35 and 45 pass the filter.
    assert_eq!(outcome.row_count(), 2);
}
5201
/// A string equality predicate in WHERE matches every node with that value.
#[test]
fn test_gql_where_clause_equality() {
    use grafeo_common::types::Value;

    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // "Alix" is inserted twice on purpose; "Gus" must be filtered out.
    for name in ["Alix", "Gus", "Alix"] {
        sess.create_node_with_props(&["Person"], [("name", Value::String(name.into()))]);
    }

    let outcome = sess
        .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
        .unwrap();

    assert_eq!(outcome.row_count(), 2);
}
5222
/// Property projections in RETURN produce one column per expression, named
/// after the expression text, and carry the stored values.
#[test]
fn test_gql_return_property_access() {
    use grafeo_common::types::Value;

    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Two people, each with a name and an age.
    for (name, age) in [("Alix", 30), ("Gus", 25)] {
        sess.create_node_with_props(
            &["Person"],
            [
                ("name", Value::String(name.into())),
                ("age", Value::Int64(age)),
            ],
        );
    }

    let outcome = sess
        .execute("MATCH (n:Person) RETURN n.name, n.age")
        .unwrap();

    // One row per person, one column per projected property.
    assert_eq!(outcome.row_count(), 2);
    assert_eq!(outcome.column_count(), 2);
    assert_eq!(outcome.columns[0], "n.name");
    assert_eq!(outcome.columns[1], "n.age");

    // Row order is not asserted: each name only has to appear somewhere in
    // the first column.
    for expected in ["Alix", "Gus"] {
        let wanted = Value::String(expected.into());
        assert!(outcome.rows.iter().any(|row| row[0] == wanted));
    }
}
5262
/// RETURN may mix a whole node with one of its properties in the same row.
#[test]
fn test_gql_return_mixed_expressions() {
    use grafeo_common::types::Value;

    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // A single person is enough to check the shape of the result.
    sess.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);

    let outcome = sess.execute("MATCH (n:Person) RETURN n, n.name").unwrap();

    assert_eq!(outcome.row_count(), 1);
    assert_eq!(outcome.column_count(), 2);
    for (position, expected) in ["n", "n.name"].into_iter().enumerate() {
        assert_eq!(outcome.columns[position], expected);
    }

    // The property projection sits in the second column of the only row.
    let only_row = &outcome.rows[0];
    assert_eq!(only_row[1], Value::String("Alix".into()));
}
5286    }
5287
/// Tests for `execute_cypher`, compiled only with the `cypher` feature.
#[cfg(feature = "cypher")]
mod cypher_tests {
    use super::*;

    /// A label match returns only the nodes carrying that label.
    #[test]
    fn test_cypher_query_execution() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        // Two people plus one node with a different label.
        for label in ["Person", "Person", "Animal"] {
            sess.create_node(&[label]);
        }

        let outcome = sess.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        // The Animal node must not show up; the single column is named `n`.
        assert_eq!(outcome.row_count(), 2);
        assert_eq!(outcome.column_count(), 1);
        assert_eq!(outcome.columns[0], "n");
    }

    /// Matching against a database with no nodes yields zero rows.
    #[test]
    fn test_cypher_empty_result() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        let outcome = sess.execute_cypher("MATCH (n:Person) RETURN n").unwrap();

        assert_eq!(outcome.row_count(), 0);
    }

    /// Syntactically invalid Cypher is reported as an error.
    #[test]
    fn test_cypher_parse_error() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        // The node pattern never closes its parenthesis.
        let malformed = "MATCH (n RETURN n";
        let outcome = sess.execute_cypher(malformed);

        assert!(outcome.is_err());
    }
}
5333
5334    // ==================== Direct Lookup API Tests ====================
5335
5336    mod direct_lookup_tests {
5337        use super::*;
5338        use grafeo_common::types::Value;
5339
/// `get_node` returns the node that was just created, with the same id.
#[test]
fn test_get_node() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let created = sess.create_node(&["Person"]);
    let fetched = sess.get_node(created);

    assert!(fetched.is_some());
    assert_eq!(fetched.unwrap().id, created);
}
5352
/// `get_node` returns `None` for an id that was never created.
#[test]
fn test_get_node_not_found() {
    use grafeo_common::types::NodeId;

    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Nothing has been created in this database, so this id is unknown.
    let unknown = NodeId::new(9999);
    assert!(sess.get_node(unknown).is_none());
}
5364
/// `get_node_property` returns a stored value and `None` for an absent key.
#[test]
fn test_get_node_property() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let person =
        sess.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);

    // The key that was set at creation time.
    assert_eq!(
        sess.get_node_property(person, "name"),
        Some(Value::String("Alix".into()))
    );

    // A key that was never set on this node.
    assert!(sess.get_node_property(person, "missing").is_none());
}
5380
/// `get_edge` returns the created edge with its id and both endpoints.
#[test]
fn test_get_edge() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let alix = sess.create_node(&["Person"]);
    let gus = sess.create_node(&["Person"]);
    let created = sess.create_edge(alix, gus, "KNOWS");

    let fetched = sess.get_edge(created);
    assert!(fetched.is_some());

    // Direction matters: Alix is the source, Gus the destination.
    let fetched = fetched.unwrap();
    assert_eq!(fetched.id, created);
    assert_eq!(fetched.src, alix);
    assert_eq!(fetched.dst, gus);
}
5397
/// `get_edge` returns `None` for an id that was never created.
#[test]
fn test_get_edge_not_found() {
    use grafeo_common::types::EdgeId;

    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let unknown = EdgeId::new(9999);
    assert!(sess.get_edge(unknown).is_none());
}
5408
/// `get_neighbors_outgoing` lists the targets of a node's outgoing edges.
#[test]
fn test_get_neighbors_outgoing() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let alix = sess.create_node(&["Person"]);
    let gus = sess.create_node(&["Person"]);
    let harm = sess.create_node(&["Person"]);

    // Both edges start at Alix.
    for target in [gus, harm] {
        sess.create_edge(alix, target, "KNOWS");
    }

    let outgoing = sess.get_neighbors_outgoing(alix);
    assert_eq!(outgoing.len(), 2);

    // Order is not asserted: each target only has to appear among the
    // returned node ids.
    for expected in [gus, harm] {
        assert!(outgoing.iter().any(|(node_id, _)| *node_id == expected));
    }
}
5428
/// `get_neighbors_incoming` lists the sources of a node's incoming edges.
#[test]
fn test_get_neighbors_incoming() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let alix = sess.create_node(&["Person"]);
    let gus = sess.create_node(&["Person"]);
    let harm = sess.create_node(&["Person"]);

    // Both edges end at Alix.
    for source in [gus, harm] {
        sess.create_edge(source, alix, "KNOWS");
    }

    let incoming = sess.get_neighbors_incoming(alix);
    assert_eq!(incoming.len(), 2);

    // Order is not asserted: each source only has to appear among the
    // returned node ids.
    for expected in [gus, harm] {
        assert!(incoming.iter().any(|(node_id, _)| *node_id == expected));
    }
}
5448
/// `get_neighbors_outgoing_by_type` only follows edges of the given type.
#[test]
fn test_get_neighbors_outgoing_by_type() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let alix = sess.create_node(&["Person"]);
    let gus = sess.create_node(&["Person"]);
    let company = sess.create_node(&["Company"]);

    sess.create_edge(alix, gus, "KNOWS");
    sess.create_edge(alix, company, "WORKS_AT");

    // Each edge type leads to exactly one, distinct, neighbour.
    let via_knows = sess.get_neighbors_outgoing_by_type(alix, "KNOWS");
    assert_eq!(via_knows.len(), 1);
    assert_eq!(via_knows[0].0, gus);

    let via_works_at = sess.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
    assert_eq!(via_works_at.len(), 1);
    assert_eq!(via_works_at[0].0, company);

    // An edge type that was never used yields nothing.
    let via_likes = sess.get_neighbors_outgoing_by_type(alix, "LIKES");
    assert!(via_likes.is_empty());
}
5473
/// `node_exists` is true for a created node and false for an unknown id.
#[test]
fn test_node_exists() {
    use grafeo_common::types::NodeId;

    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let created = sess.create_node(&["Person"]);
    let unknown = NodeId::new(9999);

    assert!(sess.node_exists(created));
    assert!(!sess.node_exists(unknown));
}
5486
/// `edge_exists` is true for a created edge and false for an unknown id.
#[test]
fn test_edge_exists() {
    use grafeo_common::types::EdgeId;

    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let alix = sess.create_node(&["Person"]);
    let gus = sess.create_node(&["Person"]);
    let created = sess.create_edge(alix, gus, "KNOWS");
    let unknown = EdgeId::new(9999);

    assert!(sess.edge_exists(created));
    assert!(!sess.edge_exists(unknown));
}
5501
/// `get_degree` reports outgoing and incoming edge counts, in that order.
#[test]
fn test_get_degree() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let alix = sess.create_node(&["Person"]);
    let gus = sess.create_node(&["Person"]);
    let harm = sess.create_node(&["Person"]);

    // Two edges leave Alix ...
    for target in [gus, harm] {
        sess.create_edge(alix, target, "KNOWS");
    }
    // ... and one edge arrives at Alix.
    sess.create_edge(gus, alix, "KNOWS");

    let (alix_out, alix_in) = sess.get_degree(alix);
    assert_eq!(alix_out, 2);
    assert_eq!(alix_in, 1);

    // A node without any edges has degree zero in both directions.
    let isolated = sess.create_node(&["Person"]);
    let (isolated_out, isolated_in) = sess.get_degree(isolated);
    assert_eq!(isolated_out, 0);
    assert_eq!(isolated_in, 0);
}
5527
/// `get_nodes_batch` returns one slot per requested id, in request order,
/// with `None` for ids that do not exist.
#[test]
fn test_get_nodes_batch() {
    use grafeo_common::types::NodeId;

    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let alix = sess.create_node(&["Person"]);
    let gus = sess.create_node(&["Person"]);
    let harm = sess.create_node(&["Person"]);

    // All requested ids exist.
    let all_present = sess.get_nodes_batch(&[alix, gus, harm]);
    assert_eq!(all_present.len(), 3);
    assert!(all_present.iter().all(Option::is_some));

    // An unknown id in the middle leaves a hole at that position only.
    let with_gap = sess.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
    assert_eq!(with_gap.len(), 3);
    let presence: Vec<bool> = with_gap.iter().map(Option::is_some).collect();
    assert_eq!(presence, [true, false, true]);
}
5551
/// Auto-commit starts enabled and follows `set_auto_commit`.
#[test]
fn test_auto_commit_setting() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    // A new session has auto-commit switched on.
    assert!(sess.auto_commit());

    // Switching it off and back on is reflected by the getter each time.
    sess.set_auto_commit(false);
    assert!(!sess.auto_commit());

    sess.set_auto_commit(true);
    assert!(sess.auto_commit());
}
5566
/// Beginning a transaction while one is open succeeds, and each level is
/// committed separately.
#[test]
fn test_transaction_double_begin_nests() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    sess.begin_transaction().unwrap();

    // The second begin is accepted rather than rejected.
    let nested = sess.begin_transaction();
    assert!(nested.is_ok());

    // Two begins need two commits: the inner level first, then the outer.
    sess.commit().unwrap();
    sess.commit().unwrap();
}
5581
/// Committing when no transaction was begun is an error.
#[test]
fn test_commit_without_transaction_error() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    let outcome = sess.commit();
    assert!(outcome.is_err());
}
5590
/// Rolling back when no transaction was begun is an error.
#[test]
fn test_rollback_without_transaction_error() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    let outcome = sess.rollback();
    assert!(outcome.is_err());
}
5599
/// An edge created inside an explicit transaction is visible both before
/// and after that transaction commits.
#[test]
fn test_create_edge_in_transaction() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    // The endpoints are created before the explicit transaction starts.
    let alix = sess.create_node(&["Person"]);
    let gus = sess.create_node(&["Person"]);

    sess.begin_transaction().unwrap();
    let created = sess.create_edge(alix, gus, "KNOWS");

    // Visible to the session while the transaction is still open ...
    assert!(sess.edge_exists(created));

    sess.commit().unwrap();

    // ... and still visible once it has been committed.
    assert!(sess.edge_exists(created));
}
5622
/// Every neighbour lookup returns an empty collection for a node that has
/// no edges.
#[test]
fn test_neighbors_empty_node() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let isolated = sess.create_node(&["Person"]);

    let outgoing = sess.get_neighbors_outgoing(isolated);
    assert!(outgoing.is_empty());

    let incoming = sess.get_neighbors_incoming(isolated);
    assert!(incoming.is_empty());

    let outgoing_knows = sess.get_neighbors_outgoing_by_type(isolated, "KNOWS");
    assert!(outgoing_knows.is_empty());
}
5638    }
5639
/// With a GC interval of 2, two committed transactions leave the database
/// usable and holding both nodes.
#[test]
fn test_auto_gc_triggers_on_commit_interval() {
    use crate::config::Config;

    let database = GrafeoDB::with_config(Config::in_memory().with_gc_interval(2)).unwrap();
    let mut sess = database.session();

    // One node per transaction. The second commit is the one that lands on
    // the configured interval and is meant to kick off garbage collection.
    for label in ["A", "B"] {
        sess.begin_transaction().unwrap();
        sess.create_node(&[label]);
        sess.commit().unwrap();
    }

    // GC itself is not observed here; the check is that both nodes are
    // still counted afterwards.
    assert_eq!(database.node_count(), 2);
}
5661
/// A query timeout set on the database config gives sessions a deadline.
#[test]
fn test_query_timeout_config_propagates_to_session() {
    use crate::config::Config;
    use std::time::Duration;

    let five_seconds = Duration::from_secs(5);
    let database = GrafeoDB::with_config(Config::in_memory().with_query_timeout(five_seconds))
        .unwrap();
    let sess = database.session();

    // Only the presence of a deadline is checked, not its exact value.
    assert!(sess.query_deadline().is_some());
}
5674
/// With no timeout configured, a session reports no query deadline.
#[test]
fn test_no_query_timeout_returns_no_deadline() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // `new_in_memory` is used as-is, without setting a timeout.
    let deadline = sess.query_deadline();
    assert!(deadline.is_none());
}
5683
/// A session on a default in-memory database reports the LPG graph model.
#[test]
fn test_graph_model_accessor() {
    use crate::config::GraphModel;

    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let model = sess.graph_model();
    assert_eq!(model, GraphModel::Lpg);
}
5693
/// A database built on an externally supplied store can insert and query
/// through a session.
#[cfg(feature = "gql")]
#[test]
fn test_external_store_session() {
    use grafeo_core::graph::GraphStoreMut;
    use std::sync::Arc;

    // Hand the database a store created outside of it, as a trait object.
    let external: Arc<dyn GraphStoreMut> =
        Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap());
    let database = GrafeoDB::with_store(external, crate::config::Config::in_memory()).unwrap();

    let mut sess = database.session();

    // INSERT and MATCH run inside one explicit transaction so that both
    // statements share the same transaction context and the MATCH can see
    // the not-yet-committed insert.
    sess.begin_transaction().unwrap();
    sess.execute("INSERT (:Test {name: 'hello'})").unwrap();

    let outcome = sess.execute("MATCH (n:Test) RETURN n.name").unwrap();
    assert_eq!(outcome.row_count(), 1);

    sess.commit().unwrap();
}
5719
5720    // ==================== Session Command Tests ====================
5721
5722    #[cfg(feature = "gql")]
5723    mod session_command_tests {
5724        use super::*;
5725        use grafeo_common::types::Value;
5726
/// `USE GRAPH` on an existing graph makes it the session's current graph.
#[test]
fn test_use_graph_sets_current_graph() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // The graph is created before it is selected.
    for statement in ["CREATE GRAPH mydb", "USE GRAPH mydb"] {
        sess.execute(statement).unwrap();
    }

    assert_eq!(sess.current_graph().as_deref(), Some("mydb"));
}
5738
/// `USE GRAPH` on a graph that was never created fails with a
/// "does not exist" error.
#[test]
fn test_use_graph_nonexistent_errors() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let outcome = sess.execute("USE GRAPH doesnotexist");
    assert!(outcome.is_err());

    // The error text has to name the reason for the failure.
    let err = outcome.unwrap_err().to_string();
    let mentions_missing = err.contains("does not exist");
    assert!(
        mentions_missing,
        "Expected 'does not exist' error, got: {err}"
    );
}
5752
/// `USE GRAPH default` succeeds without a preceding `CREATE GRAPH`.
#[test]
fn test_use_graph_default_always_valid() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // No graph has been created in this database.
    sess.execute("USE GRAPH default").unwrap();

    assert_eq!(sess.current_graph().as_deref(), Some("default"));
}
5762
/// `SESSION SET GRAPH` on an existing graph makes it the current graph.
#[test]
fn test_session_set_graph() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    for statement in ["CREATE GRAPH analytics", "SESSION SET GRAPH analytics"] {
        sess.execute(statement).unwrap();
    }

    assert_eq!(sess.current_graph().as_deref(), Some("analytics"));
}
5772
/// `SESSION SET GRAPH` on a graph that was never created is an error.
#[test]
fn test_session_set_graph_nonexistent_errors() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let outcome = sess.execute("SESSION SET GRAPH nosuchgraph");
    assert!(outcome.is_err());
}
5781
/// `SESSION SET TIME ZONE` stores the zone name, and a later call replaces it.
#[test]
fn test_session_set_time_zone() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // No zone is set on a new session.
    assert!(sess.time_zone().is_none());

    sess.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
    assert_eq!(sess.time_zone().as_deref(), Some("UTC"));

    // Setting a second zone overwrites the first one.
    sess.execute("SESSION SET TIME ZONE 'America/New_York'")
        .unwrap();
    assert_eq!(sess.time_zone().as_deref(), Some("America/New_York"));
}
5797
/// `SESSION SET PARAMETER` registers the parameter under its name without
/// the `$` prefix.
#[test]
fn test_session_set_parameter() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("SESSION SET PARAMETER $timeout = 30").unwrap();

    // Only the presence of the parameter is asserted here, not its value.
    let stored = sess.get_parameter("timeout");
    assert!(stored.is_some());
}
5811
/// `SESSION RESET` clears the current graph, the time zone and parameters.
#[test]
fn test_session_reset_clears_all_state() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Populate every piece of session state that the reset should clear.
    let setup = [
        "CREATE GRAPH analytics",
        "SESSION SET GRAPH analytics",
        "SESSION SET TIME ZONE 'UTC'",
        "SESSION SET PARAMETER $limit = 100",
    ];
    for statement in setup {
        sess.execute(statement).unwrap();
    }

    // Precondition: all three are set, so the later checks are meaningful.
    assert!(sess.current_graph().is_some());
    assert!(sess.time_zone().is_some());
    assert!(sess.get_parameter("limit").is_some());

    sess.execute("SESSION RESET").unwrap();

    // Afterwards none of them remains.
    assert!(sess.current_graph().is_none());
    assert!(sess.time_zone().is_none());
    assert!(sess.get_parameter("limit").is_none());
}
5837
/// `SESSION CLOSE` clears the current graph and the time zone.
#[test]
fn test_session_close_clears_state() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Give the session a graph and a time zone, then close it.
    let statements = [
        "CREATE GRAPH analytics",
        "SESSION SET GRAPH analytics",
        "SESSION SET TIME ZONE 'UTC'",
        "SESSION CLOSE",
    ];
    for statement in statements {
        sess.execute(statement).unwrap();
    }

    assert!(sess.current_graph().is_none());
    assert!(sess.time_zone().is_none());
}
5852
/// A graph made with `CREATE GRAPH` can be selected with `USE GRAPH`.
#[test]
fn test_create_graph() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("CREATE GRAPH mydb").unwrap();

    // Selecting the graph only works if the creation really registered it.
    sess.execute("USE GRAPH mydb").unwrap();
    assert_eq!(sess.current_graph().as_deref(), Some("mydb"));
}
5864
/// Creating a graph under a name that is already taken fails with an
/// "already exists" error.
#[test]
fn test_create_graph_duplicate_errors() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // The first creation succeeds; the second one reuses the same name.
    sess.execute("CREATE GRAPH mydb").unwrap();
    let second_attempt = sess.execute("CREATE GRAPH mydb");
    assert!(second_attempt.is_err());

    let err = second_attempt.unwrap_err().to_string();
    let mentions_duplicate = err.contains("already exists");
    assert!(
        mentions_duplicate,
        "Expected 'already exists' error, got: {err}"
    );
}
5880
/// `CREATE GRAPH IF NOT EXISTS` succeeds when the graph is already there.
#[test]
fn test_create_graph_if_not_exists() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // The second statement targets the graph the first one created and
    // must not fail because of it.
    for statement in ["CREATE GRAPH mydb", "CREATE GRAPH IF NOT EXISTS mydb"] {
        sess.execute(statement).unwrap();
    }
}
5890
/// After `DROP GRAPH`, the graph can no longer be selected.
#[test]
fn test_drop_graph() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    for statement in ["CREATE GRAPH mydb", "DROP GRAPH mydb"] {
        sess.execute(statement).unwrap();
    }

    // The dropped graph is gone, so selecting it has to fail.
    let outcome = sess.execute("USE GRAPH mydb");
    assert!(outcome.is_err());
}
5903
/// Dropping a graph that was never created fails with a "does not exist"
/// error.
#[test]
fn test_drop_graph_nonexistent_errors() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let outcome = sess.execute("DROP GRAPH nosuchgraph");
    assert!(outcome.is_err());

    let err = outcome.unwrap_err().to_string();
    let mentions_missing = err.contains("does not exist");
    assert!(
        mentions_missing,
        "Expected 'does not exist' error, got: {err}"
    );
}
5917
/// `DROP GRAPH IF EXISTS` succeeds even when the graph is absent.
#[test]
fn test_drop_graph_if_exists() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // No graph of this name was created; the statement must still succeed.
    let outcome = sess.execute("DROP GRAPH IF EXISTS nosuchgraph");
    outcome.unwrap();
}
5926
/// `START TRANSACTION` / `COMMIT` issued as GQL statements open and close a
/// transaction, and the committed insert is visible afterwards.
#[test]
fn test_start_transaction_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Opening the transaction flips the session's transaction flag on.
    sess.execute("START TRANSACTION").unwrap();
    assert!(sess.in_transaction());

    sess.execute("INSERT (:Person {name: 'Alix'})").unwrap();

    // Committing flips it off again.
    sess.execute("COMMIT").unwrap();
    assert!(!sess.in_transaction());

    let outcome = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(outcome.rows.len(), 1);
}
5941
/// An INSERT inside a `READ ONLY` transaction fails with a "read-only"
/// error.
#[test]
fn test_start_transaction_read_only_blocks_insert() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("START TRANSACTION READ ONLY").unwrap();

    let write_attempt = sess.execute("INSERT (:Person {name: 'Alix'})");
    assert!(write_attempt.is_err());

    let err = write_attempt.unwrap_err().to_string();
    let mentions_read_only = err.contains("read-only");
    assert!(mentions_read_only, "Expected read-only error, got: {err}");

    // Close the transaction that was opened above.
    sess.execute("ROLLBACK").unwrap();
}
5957
/// A MATCH inside a `READ ONLY` transaction sees previously committed data.
#[test]
fn test_start_transaction_read_only_allows_reads() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    // Seed one person through a regular transaction that is committed
    // before the read-only one begins.
    sess.begin_transaction().unwrap();
    sess.execute("INSERT (:Person {name: 'Alix'})").unwrap();
    sess.commit().unwrap();

    sess.execute("START TRANSACTION READ ONLY").unwrap();
    let outcome = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(outcome.rows.len(), 1);
    sess.execute("COMMIT").unwrap();
}
5971
/// After `START TRANSACTION`, an `INSERT`, and then `ROLLBACK`, a `MATCH`
/// for the inserted label returns no rows.
#[test]
fn test_rollback_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    // Insert inside a transaction, then abandon it.
    session.execute("START TRANSACTION").unwrap();
    session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
    session.execute("ROLLBACK").unwrap();

    let people = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert!(people.rows.is_empty());
}
5984
/// `START TRANSACTION ISOLATION LEVEL SERIALIZABLE` is accepted and leaves
/// the session inside a transaction.
#[test]
fn test_start_transaction_with_isolation_level() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let statement = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
    session.execute(statement).unwrap();
    assert!(session.in_transaction());

    // Close the transaction that was just opened.
    session.execute("ROLLBACK").unwrap();
}
5996
/// `SESSION SET GRAPH` on an existing graph succeeds and its result has
/// zero rows and zero columns.
#[test]
fn test_session_commands_return_empty_result() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    // The graph is created first so the session command can refer to it.
    session.execute("CREATE GRAPH test").unwrap();

    let outcome = session.execute("SESSION SET GRAPH test").unwrap();
    assert_eq!(outcome.row_count(), 0);
    assert_eq!(outcome.column_count(), 0);
}
6007
/// A freshly created session reports `None` from `current_graph`.
#[test]
fn test_current_graph_default_is_none() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let graph = session.current_graph();
    assert_eq!(graph, None);
}
6015
/// A freshly created session reports `None` from `time_zone`.
#[test]
fn test_time_zone_default_is_none() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let zone = session.time_zone();
    assert_eq!(zone, None);
}
6023
/// Two sessions on the same database each keep their own current graph:
/// setting it in one does not change what the other reports.
#[test]
fn test_session_state_independent_across_sessions() {
    let database = GrafeoDB::new_in_memory();
    let session1 = database.session();
    let session2 = database.session();

    // Both graphs are created through the first session.
    session1.execute("CREATE GRAPH first").unwrap();
    session1.execute("CREATE GRAPH second").unwrap();

    // Each session then selects a different graph.
    session1.execute("SESSION SET GRAPH first").unwrap();
    session2.execute("SESSION SET GRAPH second").unwrap();

    let graph1 = session1.current_graph();
    let graph2 = session2.current_graph();
    assert_eq!(graph1, Some(String::from("first")));
    assert_eq!(graph2, Some(String::from("second")));
}
6038
/// `SHOW NODE TYPES` lists a node type created via `CREATE NODE TYPE`,
/// using the columns name / properties / constraints / parents.
#[test]
fn test_show_node_types() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let create_type = "CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)";
    session.execute(create_type).unwrap();

    let listing = session.execute("SHOW NODE TYPES").unwrap();

    let expected_columns = vec!["name", "properties", "constraints", "parents"];
    assert_eq!(listing.columns, expected_columns);
    assert_eq!(listing.rows.len(), 1);

    // The leading cell of the only row carries the type name.
    assert_eq!(listing.rows[0][0], Value::from("Person"));
}
6057
/// `SHOW EDGE TYPES` lists an edge type created via `CREATE EDGE TYPE`,
/// using the columns name / properties / source_types / target_types.
#[test]
fn test_show_edge_types() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let create_type = "CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)";
    session.execute(create_type).unwrap();

    let listing = session.execute("SHOW EDGE TYPES").unwrap();

    let expected_columns = vec!["name", "properties", "source_types", "target_types"];
    assert_eq!(listing.columns, expected_columns);
    assert_eq!(listing.rows.len(), 1);

    // The leading cell of the only row carries the type name.
    assert_eq!(listing.rows[0][0], Value::from("KNOWS"));
}
6075
/// `SHOW GRAPH TYPES` lists a graph type created via `CREATE GRAPH TYPE`,
/// using the columns name / open / node_types / edge_types.
#[test]
fn test_show_graph_types() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    // Register the node type, then a graph type that declares it.
    let create_node_type = "CREATE NODE TYPE Person (name STRING)";
    let create_graph_type = "CREATE GRAPH TYPE social (\
        NODE TYPE Person (name STRING)\
    )";
    session.execute(create_node_type).unwrap();
    session.execute(create_graph_type).unwrap();

    let listing = session.execute("SHOW GRAPH TYPES").unwrap();

    let expected_columns = vec!["name", "open", "node_types", "edge_types"];
    assert_eq!(listing.columns, expected_columns);
    assert_eq!(listing.rows.len(), 1);
    assert_eq!(listing.rows[0][0], Value::from("social"));
}
6100
/// `SHOW GRAPH TYPE <name>` returns exactly one row whose first cell is the
/// requested graph type's name.
#[test]
fn test_show_graph_type_named() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    // Register the node type, then a graph type that declares it.
    let create_node_type = "CREATE NODE TYPE Person (name STRING)";
    let create_graph_type = "CREATE GRAPH TYPE social (\
        NODE TYPE Person (name STRING)\
    )";
    session.execute(create_node_type).unwrap();
    session.execute(create_graph_type).unwrap();

    let described = session.execute("SHOW GRAPH TYPE social").unwrap();
    assert_eq!(described.rows.len(), 1);
    assert_eq!(described.rows[0][0], Value::from("social"));
}
6121
/// `SHOW GRAPH TYPE` for a name that was never created yields an error.
#[test]
fn test_show_graph_type_not_found() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    // Nothing has been created in this database, so the lookup must fail.
    let outcome = session.execute("SHOW GRAPH TYPE nonexistent");
    assert!(outcome.is_err());
}
6130
/// `SHOW INDEXES` succeeds on a fresh database and reports the columns
/// name / type / label / property.
#[test]
fn test_show_indexes_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let listing = session.execute("SHOW INDEXES").unwrap();

    let expected_columns = vec!["name", "type", "label", "property"];
    assert_eq!(listing.columns, expected_columns);
}
6139
/// `SHOW CONSTRAINTS` succeeds on a fresh database and reports the columns
/// name / type / label / properties.
#[test]
fn test_show_constraints_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    let listing = session.execute("SHOW CONSTRAINTS").unwrap();

    let expected_columns = vec!["name", "type", "label", "properties"];
    assert_eq!(listing.columns, expected_columns);
}
6148
/// A graph type declared with the pattern form (`(:A)-[:R]->(:B)`) can be
/// created and is then reported by `SHOW GRAPH TYPE`.
#[test]
fn test_pattern_form_graph_type_roundtrip() {
    let database = GrafeoDB::new_in_memory();
    let session = database.session();

    // Register the node and edge types up front, in this order.
    for ddl in [
        "CREATE NODE TYPE Person (name STRING NOT NULL)",
        "CREATE NODE TYPE City (name STRING)",
        "CREATE EDGE TYPE KNOWS (since INTEGER)",
        "CREATE EDGE TYPE LIVES_IN",
    ] {
        session.execute(ddl).unwrap();
    }

    // Declare the graph type through path patterns instead of type lists.
    let create_graph_type = "CREATE GRAPH TYPE social (\
        (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
        (:Person)-[:LIVES_IN]->(:City)\
    )";
    session.execute(create_graph_type).unwrap();

    // The new graph type must be visible by name.
    let described = session.execute("SHOW GRAPH TYPE social").unwrap();
    assert_eq!(described.rows.len(), 1);
    assert_eq!(described.rows[0][0], Value::from("social"));
}
6181    }
6182}