Skip to main content

grafeo_engine/session/
mod.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7#[cfg(feature = "triple-store")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::AtomicUsize;
12#[cfg(feature = "lpg")]
13use std::sync::atomic::Ordering;
14use std::time::{Duration, Instant};
15
16#[cfg(feature = "lpg")]
17use grafeo_common::grafeo_debug_span;
18#[cfg(feature = "lpg")]
19use grafeo_common::types::{EdgeId, NodeId};
20use grafeo_common::types::{EpochId, TransactionId, Value};
21use grafeo_common::utils::error::Result;
22use grafeo_common::{grafeo_info_span, grafeo_warn};
23#[cfg(feature = "lpg")]
24use grafeo_core::graph::Direction;
25#[cfg(feature = "lpg")]
26use grafeo_core::graph::lpg::LpgStore;
27#[cfg(feature = "lpg")]
28use grafeo_core::graph::lpg::{Edge, Node};
29#[cfg(feature = "triple-store")]
30use grafeo_core::graph::rdf::RdfStore;
31use grafeo_core::graph::{GraphStore, GraphStoreMut};
32
33use crate::catalog::{Catalog, CatalogConstraintValidator};
34use crate::config::{AdaptiveConfig, GraphModel};
35use crate::database::QueryResult;
36use crate::query::cache::QueryCache;
37use crate::transaction::TransactionManager;
38
39/// Storage key suffix for the implicit default graph within a schema.
40/// Auto-created by `CREATE SCHEMA` and auto-dropped by `DROP SCHEMA`.
41const SCHEMA_DEFAULT_GRAPH: &str = "__default__";
42
43/// Parses a DDL default-value literal string into a [`Value`].
44///
45/// Handles string literals (single- or double-quoted), integers, floats,
46/// booleans (`true`/`false`), and `NULL`.
47fn parse_default_literal(text: &str) -> Value {
48    if text.eq_ignore_ascii_case("null") {
49        return Value::Null;
50    }
51    if text.eq_ignore_ascii_case("true") {
52        return Value::Bool(true);
53    }
54    if text.eq_ignore_ascii_case("false") {
55        return Value::Bool(false);
56    }
57    // String literal: strip surrounding quotes
58    if (text.starts_with('\'') && text.ends_with('\''))
59        || (text.starts_with('"') && text.ends_with('"'))
60    {
61        return Value::String(text[1..text.len() - 1].into());
62    }
63    // Try integer, then float
64    if let Ok(i) = text.parse::<i64>() {
65        return Value::Int64(i);
66    }
67    if let Ok(f) = text.parse::<f64>() {
68        return Value::Float64(f);
69    }
70    // Fallback: treat as string
71    Value::String(text.into())
72}
73
74/// Runtime configuration for creating a new session.
75///
76/// Groups the shared parameters passed to all session constructors, keeping
77/// call sites readable and avoiding long argument lists.
78pub(crate) struct SessionConfig {
79    pub transaction_manager: Arc<TransactionManager>,
80    pub query_cache: Arc<QueryCache>,
81    pub catalog: Arc<Catalog>,
82    pub adaptive_config: AdaptiveConfig,
83    pub factorized_execution: bool,
84    pub graph_model: GraphModel,
85    pub query_timeout: Option<Duration>,
86    pub commit_counter: Arc<AtomicUsize>,
87    pub gc_interval: usize,
88    /// When true, the session permanently blocks all mutations.
89    pub read_only: bool,
90    /// The identity bound to this session (for permission checks).
91    pub identity: crate::auth::Identity,
92    /// Named graph projections shared with the database.
93    #[cfg(feature = "lpg")]
94    pub projections: Arc<
95        parking_lot::RwLock<
96            std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
97        >,
98    >,
99}
100
101/// Your handle to the database - execute queries and manage transactions.
102///
103/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
104/// tracks its own transaction state, so you can have multiple concurrent
105/// sessions without them interfering.
106pub struct Session {
107    /// The underlying store.
108    #[cfg(feature = "lpg")]
109    store: Arc<LpgStore>,
110    /// Graph store trait object for pluggable storage backends (read path).
111    graph_store: Arc<dyn GraphStore>,
112    /// Writable graph store (None for read-only databases).
113    graph_store_mut: Option<Arc<dyn GraphStoreMut>>,
114    /// Schema and metadata catalog shared across sessions.
115    catalog: Arc<Catalog>,
116    /// RDF triple store (if RDF feature is enabled).
117    #[cfg(feature = "triple-store")]
118    rdf_store: Arc<RdfStore>,
119    /// Transaction manager.
120    transaction_manager: Arc<TransactionManager>,
121    /// Query cache shared across sessions.
122    query_cache: Arc<QueryCache>,
123    /// Current transaction ID (if any). Behind a Mutex so that GQL commands
124    /// (`START TRANSACTION`, `COMMIT`, `ROLLBACK`) can manage transactions
125    /// from within `execute(&self)`.
126    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
127    /// Whether the current transaction is read-only (blocks mutations).
128    read_only_tx: parking_lot::Mutex<bool>,
129    /// Whether the database itself is read-only (set at open time, never changes).
130    /// When true, `read_only_tx` is always true regardless of transaction flags.
131    db_read_only: bool,
132    /// The identity bound to this session (determines permission level).
133    identity: crate::auth::Identity,
134    /// Whether the session is in auto-commit mode.
135    auto_commit: bool,
136    /// Adaptive execution configuration.
137    #[allow(dead_code)] // Stored for future adaptive re-optimization during execution
138    adaptive_config: AdaptiveConfig,
139    /// Whether to use factorized execution for multi-hop queries.
140    factorized_execution: bool,
141    /// The graph data model this session operates on.
142    graph_model: GraphModel,
143    /// Maximum time a query may run before being cancelled.
144    query_timeout: Option<Duration>,
145    /// Shared commit counter for triggering auto-GC.
146    commit_counter: Arc<AtomicUsize>,
147    /// GC every N commits (0 = disabled).
148    gc_interval: usize,
149    /// Node count at the start of the current transaction (for PreparedCommit stats).
150    transaction_start_node_count: AtomicUsize,
151    /// Edge count at the start of the current transaction (for PreparedCommit stats).
152    transaction_start_edge_count: AtomicUsize,
153    /// WAL for logging schema changes.
154    #[cfg(feature = "wal")]
155    wal: Option<Arc<grafeo_storage::wal::LpgWal>>,
156    /// Shared WAL graph context tracker for named graph awareness.
157    #[cfg(feature = "wal")]
158    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
159    /// CDC log for change tracking.
160    #[cfg(feature = "cdc")]
161    cdc_log: Arc<crate::cdc::CdcLog>,
162    /// Buffered CDC events for the current transaction.
163    /// Flushed to `cdc_log` on commit, discarded on rollback.
164    #[cfg(feature = "cdc")]
165    cdc_pending_events: Option<Arc<parking_lot::Mutex<Vec<crate::cdc::ChangeEvent>>>>,
166    /// Current graph name (for multi-graph USE GRAPH support). None = default graph.
167    current_graph: parking_lot::Mutex<Option<String>>,
168    /// Current schema name (ISO/IEC 39075 Section 4.7.3: independent from session graph).
169    /// None = "not set" (uses default schema).
170    current_schema: parking_lot::Mutex<Option<String>>,
171    /// Session time zone override.
172    time_zone: parking_lot::Mutex<Option<String>>,
173    /// Session-level parameters (SET PARAMETER).
174    session_params:
175        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
176    /// Override epoch for time-travel queries (None = use transaction/current epoch).
177    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
178    /// Savepoints within the current transaction.
179    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
180    /// Nesting depth for nested transactions (0 = outermost).
181    /// Nested `START TRANSACTION` creates an auto-savepoint; nested `COMMIT`
182    /// releases it, nested `ROLLBACK` rolls back to it.
183    transaction_nesting_depth: parking_lot::Mutex<u32>,
184    /// Named graphs touched during the current transaction (for cross-graph atomicity).
185    /// `None` represents the default graph. Populated at `BEGIN` time and on each
186    /// `USE GRAPH` / `SESSION SET GRAPH` switch within a transaction.
187    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
188    /// Shared metrics registry (populated when the `metrics` feature is enabled).
189    #[cfg(feature = "metrics")]
190    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
191    /// Transaction start time for duration tracking.
192    #[cfg(feature = "metrics")]
193    tx_start_time: parking_lot::Mutex<Option<Instant>>,
194    /// Named graph projections shared with the database.
195    #[cfg(feature = "lpg")]
196    projections: Arc<
197        parking_lot::RwLock<
198            std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
199        >,
200    >,
201}
202
203/// Per-graph savepoint snapshot, capturing the store state at the time of the savepoint.
204#[derive(Clone)]
205struct GraphSavepoint {
206    graph_name: Option<String>,
207    next_node_id: u64,
208    next_edge_id: u64,
209    undo_log_position: usize,
210}
211
212/// Savepoint state: name + per-graph snapshots + the graph that was active.
213#[derive(Clone)]
214struct SavepointState {
215    name: String,
216    graph_snapshots: Vec<GraphSavepoint>,
217    /// The graph that was active when the savepoint was created.
218    /// Reserved for future use (e.g., restoring graph context on rollback).
219    #[allow(dead_code)]
220    active_graph: Option<String>,
221    /// CDC event buffer position at savepoint creation.
222    /// On rollback-to-savepoint, the buffer is truncated to this position.
223    #[cfg(feature = "cdc")]
224    cdc_event_position: usize,
225}
226
227impl Session {
228    /// Creates a new session with adaptive execution configuration.
229    #[cfg(feature = "lpg")]
230    #[allow(dead_code)] // Used when lpg enabled without triple-store
231    pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
232        let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
233        let graph_store_mut = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
234        Self {
235            store,
236            graph_store,
237            graph_store_mut,
238            catalog: cfg.catalog,
239            #[cfg(feature = "triple-store")]
240            rdf_store: Arc::new(RdfStore::new()),
241            transaction_manager: cfg.transaction_manager,
242            query_cache: cfg.query_cache,
243            current_transaction: parking_lot::Mutex::new(None),
244            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
245            db_read_only: cfg.read_only,
246            identity: cfg.identity,
247            auto_commit: true,
248            adaptive_config: cfg.adaptive_config,
249            factorized_execution: cfg.factorized_execution,
250            graph_model: cfg.graph_model,
251            query_timeout: cfg.query_timeout,
252            commit_counter: cfg.commit_counter,
253            gc_interval: cfg.gc_interval,
254            transaction_start_node_count: AtomicUsize::new(0),
255            transaction_start_edge_count: AtomicUsize::new(0),
256            #[cfg(feature = "wal")]
257            wal: None,
258            #[cfg(feature = "wal")]
259            wal_graph_context: None,
260            #[cfg(feature = "cdc")]
261            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
262            #[cfg(feature = "cdc")]
263            cdc_pending_events: None,
264            current_graph: parking_lot::Mutex::new(None),
265            current_schema: parking_lot::Mutex::new(None),
266            time_zone: parking_lot::Mutex::new(None),
267            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
268            viewing_epoch_override: parking_lot::Mutex::new(None),
269            savepoints: parking_lot::Mutex::new(Vec::new()),
270            transaction_nesting_depth: parking_lot::Mutex::new(0),
271            touched_graphs: parking_lot::Mutex::new(Vec::new()),
272            #[cfg(feature = "metrics")]
273            metrics: None,
274            #[cfg(feature = "metrics")]
275            tx_start_time: parking_lot::Mutex::new(None),
276            projections: cfg.projections,
277        }
278    }
279
280    /// Sets the WAL for this session (shared with the database).
281    ///
282    /// This also wraps `graph_store` in a [`WalGraphStore`] so that mutation
283    /// operators (INSERT, DELETE, SET via queries) log to the WAL.
284    #[cfg(all(feature = "wal", feature = "lpg"))]
285    pub(crate) fn set_wal(
286        &mut self,
287        wal: Arc<grafeo_storage::wal::LpgWal>,
288        wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
289    ) {
290        // Wrap the graph store so query-engine mutations are WAL-logged
291        let wal_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
292            Arc::clone(&self.store),
293            Arc::clone(&wal),
294            Arc::clone(&wal_graph_context),
295        ));
296        self.graph_store = Arc::clone(&wal_store) as Arc<dyn GraphStore>;
297        self.graph_store_mut = Some(wal_store as Arc<dyn GraphStoreMut>);
298        self.wal = Some(wal);
299        self.wal_graph_context = Some(wal_graph_context);
300    }
301
302    /// Sets the CDC log for this session (shared with the database).
303    ///
304    /// Wraps the current write store with a `CdcGraphStore` decorator so
305    /// that all session mutations (INSERT, SET, DELETE via query execution)
306    /// buffer CDC events. The buffer is flushed to the `CdcLog` on commit
307    /// and discarded on rollback.
308    #[cfg(feature = "cdc")]
309    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
310        // Wrap the WRITE store only with CdcGraphStore to intercept mutations.
311        // The read store (self.graph_store) is left unchanged for zero read overhead.
312        if let Some(ref write_store) = self.graph_store_mut {
313            let cdc_store = Arc::new(crate::database::cdc_store::CdcGraphStore::new(
314                Arc::clone(write_store),
315                Arc::clone(&cdc_log),
316            ));
317            self.cdc_pending_events = Some(cdc_store.pending_events());
318            self.graph_store_mut = Some(cdc_store as Arc<dyn grafeo_core::graph::GraphStoreMut>);
319        }
320        self.cdc_log = cdc_log;
321    }
322
323    /// Sets the metrics registry for this session (shared with the database).
324    #[cfg(feature = "metrics")]
325    pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
326        self.metrics = Some(metrics);
327    }
328
329    /// Creates a session backed by an external graph store.
330    ///
331    /// The external store handles all data operations. Transaction management
332    /// (begin/commit/rollback) is not supported for external stores.
333    ///
334    /// # Errors
335    ///
336    /// Returns an error if the internal arena allocation fails (out of memory).
337    pub(crate) fn with_external_store(
338        read_store: Arc<dyn GraphStore>,
339        write_store: Option<Arc<dyn GraphStoreMut>>,
340        cfg: SessionConfig,
341    ) -> Result<Self> {
342        Ok(Self {
343            #[cfg(feature = "lpg")]
344            store: Arc::new(LpgStore::new()?),
345            graph_store: read_store,
346            graph_store_mut: write_store,
347            catalog: cfg.catalog,
348            #[cfg(feature = "triple-store")]
349            rdf_store: Arc::new(RdfStore::new()),
350            transaction_manager: cfg.transaction_manager,
351            query_cache: cfg.query_cache,
352            current_transaction: parking_lot::Mutex::new(None),
353            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
354            db_read_only: cfg.read_only,
355            identity: cfg.identity,
356            auto_commit: true,
357            adaptive_config: cfg.adaptive_config,
358            factorized_execution: cfg.factorized_execution,
359            graph_model: cfg.graph_model,
360            query_timeout: cfg.query_timeout,
361            commit_counter: cfg.commit_counter,
362            gc_interval: cfg.gc_interval,
363            transaction_start_node_count: AtomicUsize::new(0),
364            transaction_start_edge_count: AtomicUsize::new(0),
365            #[cfg(feature = "wal")]
366            wal: None,
367            #[cfg(feature = "wal")]
368            wal_graph_context: None,
369            #[cfg(feature = "cdc")]
370            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
371            #[cfg(feature = "cdc")]
372            cdc_pending_events: None,
373            current_graph: parking_lot::Mutex::new(None),
374            current_schema: parking_lot::Mutex::new(None),
375            time_zone: parking_lot::Mutex::new(None),
376            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
377            viewing_epoch_override: parking_lot::Mutex::new(None),
378            savepoints: parking_lot::Mutex::new(Vec::new()),
379            transaction_nesting_depth: parking_lot::Mutex::new(0),
380            touched_graphs: parking_lot::Mutex::new(Vec::new()),
381            #[cfg(feature = "metrics")]
382            metrics: None,
383            #[cfg(feature = "metrics")]
384            tx_start_time: parking_lot::Mutex::new(None),
385            #[cfg(feature = "lpg")]
386            projections: cfg.projections,
387        })
388    }
389
390    /// Returns the graph model this session operates on.
391    #[must_use]
392    pub fn graph_model(&self) -> GraphModel {
393        self.graph_model
394    }
395
396    /// Returns the identity bound to this session.
397    #[must_use]
398    pub fn identity(&self) -> &crate::auth::Identity {
399        &self.identity
400    }
401
402    // === Session State Management ===
403
404    /// Sets the current graph for this session (USE GRAPH).
405    pub fn use_graph(&self, name: &str) {
406        *self.current_graph.lock() = Some(name.to_string());
407    }
408
409    /// Returns the current graph name, if any.
410    #[must_use]
411    pub fn current_graph(&self) -> Option<String> {
412        self.current_graph.lock().clone()
413    }
414
415    /// Sets the current schema for this session (SESSION SET SCHEMA).
416    ///
417    /// Per ISO/IEC 39075 Section 7.1 GR1, this is independent of the session graph.
418    pub fn set_schema(&self, name: &str) {
419        *self.current_schema.lock() = Some(name.to_string());
420    }
421
422    /// Returns the current schema name, if any.
423    ///
424    /// `None` means "not set", which resolves to the default schema.
425    #[must_use]
426    pub fn current_schema(&self) -> Option<String> {
427        self.current_schema.lock().clone()
428    }
429
430    /// Computes the effective storage key for a graph, accounting for schema context.
431    ///
432    /// Per ISO/IEC 39075 Section 17.2, graphs resolve relative to the current schema.
433    /// Uses `/` as separator since it is invalid in GQL identifiers.
434    fn effective_graph_key(&self, graph_name: &str) -> String {
435        let schema = self.current_schema.lock().clone();
436        match schema {
437            Some(s) => format!("{s}/{graph_name}"),
438            None => graph_name.to_string(),
439        }
440    }
441
442    /// Computes the effective storage key for a type, accounting for schema context.
443    ///
444    /// Mirrors `effective_graph_key()`: types resolve relative to the current schema.
445    fn effective_type_key(&self, type_name: &str) -> String {
446        let schema = self.current_schema.lock().clone();
447        match schema {
448            Some(s) => format!("{s}/{type_name}"),
449            None => type_name.to_string(),
450        }
451    }
452
453    /// Returns the effective storage key for the current graph, accounting for schema.
454    ///
455    /// Combines `current_schema` and `current_graph` into a flat lookup key.
456    fn active_graph_storage_key(&self) -> Option<String> {
457        let graph = self.current_graph.lock().clone();
458        let schema = self.current_schema.lock().clone();
459        match (&schema, &graph) {
460            (None, None) => None,
461            (Some(s), None) => Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}")),
462            (None, Some(name)) if name.eq_ignore_ascii_case("default") => None,
463            (Some(s), Some(name)) if name.eq_ignore_ascii_case("default") => {
464                Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}"))
465            }
466            (None, Some(name)) => Some(name.clone()),
467            (Some(s), Some(g)) => Some(format!("{s}/{g}")),
468        }
469    }
470
471    /// Returns the graph store for the currently active graph.
472    ///
473    /// If `current_graph` is `None` or `"default"`, returns the session's
474    /// default `graph_store` (already WAL-wrapped for the default graph).
475    /// Otherwise looks up the named graph in the root store and wraps it
476    /// in a [`WalGraphStore`] so mutations are WAL-logged with the correct
477    /// graph context.
478    fn active_store(&self) -> Arc<dyn GraphStore> {
479        let key = self.active_graph_storage_key();
480        match key {
481            None => Arc::clone(&self.graph_store),
482            #[cfg(feature = "lpg")]
483            Some(ref name) => match self.store.graph(name) {
484                Some(named_store) => {
485                    #[cfg(feature = "wal")]
486                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
487                        return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
488                            named_store,
489                            Arc::clone(wal),
490                            name.clone(),
491                            Arc::clone(ctx),
492                        )) as Arc<dyn GraphStore>;
493                    }
494                    named_store as Arc<dyn GraphStore>
495                }
496                None => Arc::clone(&self.graph_store),
497            },
498            #[cfg(not(feature = "lpg"))]
499            Some(_) => Arc::clone(&self.graph_store),
500        }
501    }
502
503    /// Returns the writable store for the active graph, if available.
504    ///
505    /// Returns `None` for read-only databases. For named graphs, wraps
506    /// the store with WAL logging when durability is enabled.
507    fn active_write_store(&self) -> Option<Arc<dyn GraphStoreMut>> {
508        let key = self.active_graph_storage_key();
509        match key {
510            None => self.graph_store_mut.as_ref().map(Arc::clone),
511            #[cfg(feature = "lpg")]
512            Some(ref name) => match self.store.graph(name) {
513                Some(named_store) => {
514                    let mut store: Arc<dyn GraphStoreMut> = named_store;
515
516                    #[cfg(feature = "wal")]
517                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
518                        store = Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
519                            // WAL needs Arc<LpgStore>, get it fresh
520                            self.store
521                                .graph(name)
522                                .unwrap_or_else(|| Arc::clone(&self.store)),
523                            Arc::clone(wal),
524                            name.clone(),
525                            Arc::clone(ctx),
526                        ));
527                    }
528
529                    #[cfg(feature = "cdc")]
530                    if let Some(ref pending) = self.cdc_pending_events {
531                        store = Arc::new(crate::database::cdc_store::CdcGraphStore::wrap(
532                            store,
533                            Arc::clone(&self.cdc_log),
534                            Arc::clone(pending),
535                        ));
536                    }
537
538                    Some(store)
539                }
540                None => self.graph_store_mut.as_ref().map(Arc::clone),
541            },
542            #[cfg(not(feature = "lpg"))]
543            Some(_) => self.graph_store_mut.as_ref().map(Arc::clone),
544        }
545    }
546
547    /// Returns the concrete `LpgStore` for the currently active graph.
548    ///
549    /// Used by direct CRUD methods that need the concrete store type
550    /// for versioned operations.
551    #[cfg(feature = "lpg")]
552    fn active_lpg_store(&self) -> Arc<LpgStore> {
553        let key = self.active_graph_storage_key();
554        match key {
555            None => Arc::clone(&self.store),
556            Some(ref name) => self
557                .store
558                .graph(name)
559                .unwrap_or_else(|| Arc::clone(&self.store)),
560        }
561    }
562
563    /// Resolves a graph name to a concrete `LpgStore`.
564    /// `None` and `"default"` resolve to the session's root store.
565    #[cfg(feature = "lpg")]
566    fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
567        match graph_name {
568            None => Arc::clone(&self.store),
569            Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
570            Some(name) => self
571                .store
572                .graph(name)
573                .unwrap_or_else(|| Arc::clone(&self.store)),
574        }
575    }
576
577    /// Records the current graph as "touched" if a transaction is active.
578    ///
579    /// Uses the full storage key (schema/graph) so that commit/rollback
580    /// can resolve the correct store via `resolve_store`.
581    fn track_graph_touch(&self) {
582        if self.current_transaction.lock().is_some() {
583            let key = self.active_graph_storage_key();
584            let mut touched = self.touched_graphs.lock();
585            if !touched.contains(&key) {
586                touched.push(key);
587            }
588        }
589    }
590
591    /// Sets the session time zone.
592    pub fn set_time_zone(&self, tz: &str) {
593        *self.time_zone.lock() = Some(tz.to_string());
594    }
595
596    /// Returns the session time zone, if set.
597    #[must_use]
598    pub fn time_zone(&self) -> Option<String> {
599        self.time_zone.lock().clone()
600    }
601
602    /// Sets a session parameter.
603    pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
604        self.session_params.lock().insert(key.to_string(), value);
605    }
606
607    /// Gets a session parameter by cloning it.
608    #[must_use]
609    pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
610        self.session_params.lock().get(key).cloned()
611    }
612
613    /// Resets all session state to defaults (ISO/IEC 39075 Section 7.2).
614    pub fn reset_session(&self) {
615        *self.current_schema.lock() = None;
616        *self.current_graph.lock() = None;
617        *self.time_zone.lock() = None;
618        self.session_params.lock().clear();
619        *self.viewing_epoch_override.lock() = None;
620    }
621
622    /// Resets only the session schema (Section 7.2 GR1).
623    pub fn reset_schema(&self) {
624        *self.current_schema.lock() = None;
625    }
626
627    /// Resets only the session graph (Section 7.2 GR2).
628    pub fn reset_graph(&self) {
629        *self.current_graph.lock() = None;
630    }
631
632    /// Resets only the session time zone (Section 7.2 GR3).
633    pub fn reset_time_zone(&self) {
634        *self.time_zone.lock() = None;
635    }
636
637    /// Resets only session parameters (Section 7.2 GR4).
638    pub fn reset_parameters(&self) {
639        self.session_params.lock().clear();
640    }
641
642    // --- Time-travel API ---
643
644    /// Sets a viewing epoch override for time-travel queries.
645    ///
646    /// While set, all queries on this session see the database as it existed
647    /// at the given epoch. Use [`clear_viewing_epoch`](Self::clear_viewing_epoch)
648    /// to return to normal behavior.
649    pub fn set_viewing_epoch(&self, epoch: EpochId) {
650        *self.viewing_epoch_override.lock() = Some(epoch);
651    }
652
653    /// Clears the viewing epoch override, returning to normal behavior.
654    pub fn clear_viewing_epoch(&self) {
655        *self.viewing_epoch_override.lock() = None;
656    }
657
658    /// Returns the current viewing epoch override, if any.
659    #[must_use]
660    pub fn viewing_epoch(&self) -> Option<EpochId> {
661        *self.viewing_epoch_override.lock()
662    }
663
664    /// Returns all versions of a node with their creation/deletion epochs.
665    ///
666    /// Properties and labels reflect the current state (not versioned per-epoch).
667    #[cfg(feature = "lpg")]
668    #[must_use]
669    pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
670        self.active_lpg_store().get_node_history(id)
671    }
672
673    /// Returns all versions of an edge with their creation/deletion epochs.
674    ///
675    /// Properties reflect the current state (not versioned per-epoch).
676    #[cfg(feature = "lpg")]
677    #[must_use]
678    pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
679        self.active_lpg_store().get_edge_history(id)
680    }
681
682    /// Checks that the session's graph model supports LPG operations.
683    fn require_lpg(&self, language: &str) -> Result<()> {
684        if self.graph_model == GraphModel::Rdf {
685            return Err(grafeo_common::utils::error::Error::Internal(format!(
686                "This is an RDF database. {language} queries require an LPG database."
687            )));
688        }
689        Ok(())
690    }
691
692    /// Checks that the session's identity is permitted to execute the given
693    /// statement kind. Returns an error if the role is insufficient.
694    ///
695    /// Short-circuits for Admin identities (the common case for embedded use
696    /// and benchmarks) to avoid any overhead on the hot path.
697    #[inline]
698    fn require_permission(&self, kind: crate::auth::StatementKind) -> Result<()> {
699        // Fast path: Admin can do everything
700        if self.identity.can_admin() {
701            return Ok(());
702        }
703        crate::auth::check_permission(&self.identity, kind).map_err(|denied| {
704            grafeo_common::utils::error::Error::Query(grafeo_common::utils::error::QueryError::new(
705                grafeo_common::utils::error::QueryErrorKind::Semantic,
706                denied.to_string(),
707            ))
708        })
709    }
710
711    /// Executes a session or transaction command, returning an empty result.
712    #[cfg(feature = "gql")]
713    fn execute_session_command(
714        &self,
715        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
716    ) -> Result<QueryResult> {
717        use grafeo_adapters::query::gql::ast::SessionCommand;
718        #[cfg(feature = "lpg")]
719        use grafeo_adapters::query::gql::ast::TransactionIsolationLevel;
720        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
721
722        // Check role-based permission for graph management commands
723        match &cmd {
724            SessionCommand::CreateGraph { .. }
725            | SessionCommand::DropGraph { .. }
726            | SessionCommand::CreateProjection { .. }
727            | SessionCommand::DropProjection { .. } => {
728                self.require_permission(crate::auth::StatementKind::Write)?;
729            }
730            _ => {} // Session state + transaction control: always allowed
731        }
732
733        // Check per-graph grants for graph-scoped commands
734        if self.identity.has_grants() {
735            match &cmd {
736                SessionCommand::CreateGraph { name, .. }
737                | SessionCommand::DropGraph { name, .. } => {
738                    if !self
739                        .identity
740                        .can_access_graph(name, crate::auth::Role::ReadWrite)
741                    {
742                        return Err(Error::Query(QueryError::new(
743                            QueryErrorKind::Semantic,
744                            format!(
745                                "permission denied: no grant for graph '{name}' (user: {})",
746                                self.identity.user_id()
747                            ),
748                        )));
749                    }
750                }
751                _ => {}
752            }
753        }
754
755        // Block DDL in read-only transactions (ISO/IEC 39075 Section 8)
756        if *self.read_only_tx.lock() {
757            match &cmd {
758                SessionCommand::CreateGraph { .. }
759                | SessionCommand::DropGraph { .. }
760                | SessionCommand::CreateProjection { .. }
761                | SessionCommand::DropProjection { .. } => {
762                    return Err(Error::Transaction(
763                        grafeo_common::utils::error::TransactionError::ReadOnly,
764                    ));
765                }
766                _ => {} // Session state + transaction control allowed
767            }
768        }
769
770        match cmd {
771            #[cfg(feature = "lpg")]
772            SessionCommand::CreateGraph {
773                name,
774                if_not_exists,
775                typed,
776                like_graph,
777                copy_of,
778                open: _,
779            } => {
780                // ISO/IEC 39075 Section 12.4: graphs are created within the current schema
781                let storage_key = self.effective_graph_key(&name);
782
783                // Validate source graph exists for LIKE / AS COPY OF
784                if let Some(ref src) = like_graph {
785                    let src_key = self.effective_graph_key(src);
786                    if self.store.graph(&src_key).is_none() {
787                        return Err(Error::Query(QueryError::new(
788                            QueryErrorKind::Semantic,
789                            format!("Source graph '{src}' does not exist"),
790                        )));
791                    }
792                }
793                if let Some(ref src) = copy_of {
794                    let src_key = self.effective_graph_key(src);
795                    if self.store.graph(&src_key).is_none() {
796                        return Err(Error::Query(QueryError::new(
797                            QueryErrorKind::Semantic,
798                            format!("Source graph '{src}' does not exist"),
799                        )));
800                    }
801                }
802
803                let created = self
804                    .store
805                    .create_graph(&storage_key)
806                    .map_err(|e| Error::Internal(e.to_string()))?;
807                if !created && !if_not_exists {
808                    return Err(Error::Query(QueryError::new(
809                        QueryErrorKind::Semantic,
810                        format!("Graph '{name}' already exists"),
811                    )));
812                }
813                if created {
814                    #[cfg(feature = "wal")]
815                    self.log_schema_wal(&grafeo_storage::wal::WalRecord::CreateNamedGraph {
816                        name: storage_key.clone(),
817                    });
818                }
819
820                // AS COPY OF: copy data from source graph
821                if let Some(ref src) = copy_of {
822                    let src_key = self.effective_graph_key(src);
823                    self.store
824                        .copy_graph(Some(&src_key), Some(&storage_key))
825                        .map_err(|e| Error::Internal(e.to_string()))?;
826                }
827
828                // Bind to graph type if specified.
829                // If the parser produced a '/' in the name it is already a qualified
830                // "schema/type" key; otherwise resolve against the current schema.
831                if let Some(type_name) = typed
832                    && let Err(e) = self.catalog.bind_graph_type(
833                        &storage_key,
834                        if type_name.contains('/') {
835                            type_name.clone()
836                        } else {
837                            self.effective_type_key(&type_name)
838                        },
839                    )
840                {
841                    return Err(Error::Query(QueryError::new(
842                        QueryErrorKind::Semantic,
843                        e.to_string(),
844                    )));
845                }
846
847                // LIKE: copy graph type binding from source
848                if let Some(ref src) = like_graph {
849                    let src_key = self.effective_graph_key(src);
850                    if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
851                        let _ = self.catalog.bind_graph_type(&storage_key, src_type);
852                    }
853                }
854
855                Ok(QueryResult::empty())
856            }
857            #[cfg(feature = "lpg")]
858            SessionCommand::DropGraph { name, if_exists } => {
859                let storage_key = self.effective_graph_key(&name);
860                let dropped = self.store.drop_graph(&storage_key);
861                if !dropped && !if_exists {
862                    return Err(Error::Query(QueryError::new(
863                        QueryErrorKind::Semantic,
864                        format!("Graph '{name}' does not exist"),
865                    )));
866                }
867                if dropped {
868                    #[cfg(feature = "wal")]
869                    self.log_schema_wal(&grafeo_storage::wal::WalRecord::DropNamedGraph {
870                        name: storage_key.clone(),
871                    });
872                    // If this session was using the dropped graph, reset to default
873                    let mut current = self.current_graph.lock();
874                    if current
875                        .as_deref()
876                        .is_some_and(|g| g.eq_ignore_ascii_case(&name))
877                    {
878                        *current = None;
879                    }
880                }
881                Ok(QueryResult::empty())
882            }
883            #[cfg(feature = "lpg")]
884            SessionCommand::UseGraph(name) => {
885                // Check per-graph grant before switching
886                if self.identity.has_grants()
887                    && !name.eq_ignore_ascii_case("default")
888                    && !self
889                        .identity
890                        .can_access_graph(&name, crate::auth::Role::ReadOnly)
891                {
892                    return Err(Error::Query(QueryError::new(
893                        QueryErrorKind::Semantic,
894                        format!(
895                            "permission denied: no grant for graph '{name}' (user: {})",
896                            self.identity.user_id()
897                        ),
898                    )));
899                }
900                // Verify graph exists (resolve within current schema)
901                let effective_key = self.effective_graph_key(&name);
902                if !name.eq_ignore_ascii_case("default")
903                    && self.store.graph(&effective_key).is_none()
904                {
905                    return Err(Error::Query(QueryError::new(
906                        QueryErrorKind::Semantic,
907                        format!("Graph '{name}' does not exist"),
908                    )));
909                }
910                self.use_graph(&name);
911                // Track the new graph if in a transaction
912                self.track_graph_touch();
913                Ok(QueryResult::empty())
914            }
915            #[cfg(feature = "lpg")]
916            SessionCommand::SessionSetGraph(name) => {
917                // ISO/IEC 39075 Section 7.1 GR2: set session graph (resolved within current schema)
918                // Check per-graph grant before switching (same as USE GRAPH)
919                if self.identity.has_grants()
920                    && !name.eq_ignore_ascii_case("default")
921                    && !self
922                        .identity
923                        .can_access_graph(&name, crate::auth::Role::ReadOnly)
924                {
925                    return Err(Error::Query(QueryError::new(
926                        QueryErrorKind::Semantic,
927                        format!(
928                            "permission denied: no grant for graph '{name}' (user: {})",
929                            self.identity.user_id()
930                        ),
931                    )));
932                }
933                let effective_key = self.effective_graph_key(&name);
934                if !name.eq_ignore_ascii_case("default")
935                    && self.store.graph(&effective_key).is_none()
936                {
937                    return Err(Error::Query(QueryError::new(
938                        QueryErrorKind::Semantic,
939                        format!("Graph '{name}' does not exist"),
940                    )));
941                }
942                self.use_graph(&name);
943                // Track the new graph if in a transaction
944                self.track_graph_touch();
945                Ok(QueryResult::empty())
946            }
947            SessionCommand::SessionSetSchema(name) => {
948                // ISO/IEC 39075 Section 7.1 GR1: set session schema (independent of graph)
949                if !self.catalog.schema_exists(&name) {
950                    return Err(Error::Query(QueryError::new(
951                        QueryErrorKind::Semantic,
952                        format!("Schema '{name}' does not exist"),
953                    )));
954                }
955                self.set_schema(&name);
956                Ok(QueryResult::empty())
957            }
958            SessionCommand::SessionSetTimeZone(tz) => {
959                self.set_time_zone(&tz);
960                Ok(QueryResult::empty())
961            }
962            SessionCommand::SessionSetParameter(key, expr) => {
963                if key.eq_ignore_ascii_case("viewing_epoch") {
964                    match Self::eval_integer_literal(&expr) {
965                        Some(n) if n >= 0 => {
966                            self.set_viewing_epoch(EpochId::new(n as u64));
967                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
968                        }
969                        _ => Err(Error::Query(QueryError::new(
970                            QueryErrorKind::Semantic,
971                            "viewing_epoch must be a non-negative integer literal",
972                        ))),
973                    }
974                } else {
975                    // For now, store parameter name with Null value.
976                    // Full expression evaluation would require building and executing a plan.
977                    self.set_parameter(&key, Value::Null);
978                    Ok(QueryResult::empty())
979                }
980            }
981            SessionCommand::SessionReset(target) => {
982                use grafeo_adapters::query::gql::ast::SessionResetTarget;
983                match target {
984                    SessionResetTarget::All => self.reset_session(),
985                    SessionResetTarget::Schema => self.reset_schema(),
986                    SessionResetTarget::Graph => self.reset_graph(),
987                    SessionResetTarget::TimeZone => self.reset_time_zone(),
988                    SessionResetTarget::Parameters => self.reset_parameters(),
989                }
990                Ok(QueryResult::empty())
991            }
992            SessionCommand::SessionClose => {
993                self.reset_session();
994                Ok(QueryResult::empty())
995            }
996            #[cfg(feature = "lpg")]
997            SessionCommand::StartTransaction {
998                read_only,
999                isolation_level,
1000            } => {
1001                let engine_level = isolation_level.map(|l| match l {
1002                    TransactionIsolationLevel::ReadCommitted => {
1003                        crate::transaction::IsolationLevel::ReadCommitted
1004                    }
1005                    TransactionIsolationLevel::SnapshotIsolation => {
1006                        crate::transaction::IsolationLevel::SnapshotIsolation
1007                    }
1008                    TransactionIsolationLevel::Serializable => {
1009                        crate::transaction::IsolationLevel::Serializable
1010                    }
1011                });
1012                self.begin_transaction_inner(read_only, engine_level)?;
1013                Ok(QueryResult::status("Transaction started"))
1014            }
1015            #[cfg(feature = "lpg")]
1016            SessionCommand::Commit => {
1017                self.commit_inner()?;
1018                Ok(QueryResult::status("Transaction committed"))
1019            }
1020            #[cfg(feature = "lpg")]
1021            SessionCommand::Rollback => {
1022                self.rollback_inner()?;
1023                Ok(QueryResult::status("Transaction rolled back"))
1024            }
1025            #[cfg(feature = "lpg")]
1026            SessionCommand::Savepoint(name) => {
1027                self.savepoint(&name)?;
1028                Ok(QueryResult::status(format!("Savepoint '{name}' created")))
1029            }
1030            #[cfg(feature = "lpg")]
1031            SessionCommand::RollbackToSavepoint(name) => {
1032                self.rollback_to_savepoint(&name)?;
1033                Ok(QueryResult::status(format!(
1034                    "Rolled back to savepoint '{name}'"
1035                )))
1036            }
1037            #[cfg(feature = "lpg")]
1038            SessionCommand::ReleaseSavepoint(name) => {
1039                self.release_savepoint(&name)?;
1040                Ok(QueryResult::status(format!("Savepoint '{name}' released")))
1041            }
1042            #[cfg(feature = "lpg")]
1043            SessionCommand::CreateProjection {
1044                name,
1045                node_labels,
1046                edge_types,
1047            } => {
1048                use grafeo_core::graph::{GraphProjection, ProjectionSpec};
1049                use std::collections::hash_map::Entry;
1050
1051                let spec = ProjectionSpec::new()
1052                    .with_node_labels(node_labels)
1053                    .with_edge_types(edge_types);
1054
1055                let store = self.active_store();
1056                let projection = Arc::new(GraphProjection::new(store, spec));
1057                let mut projections = self.projections.write();
1058                match projections.entry(name.clone()) {
1059                    Entry::Occupied(_) => Err(Error::Query(QueryError::new(
1060                        QueryErrorKind::Semantic,
1061                        format!("Projection '{name}' already exists"),
1062                    ))),
1063                    Entry::Vacant(e) => {
1064                        e.insert(projection);
1065                        Ok(QueryResult::status(format!("Projection '{name}' created")))
1066                    }
1067                }
1068            }
1069            #[cfg(feature = "lpg")]
1070            SessionCommand::DropProjection { name } => {
1071                let removed = self.projections.write().remove(&name).is_some();
1072                if !removed {
1073                    return Err(Error::Query(QueryError::new(
1074                        QueryErrorKind::Semantic,
1075                        format!("Projection '{name}' does not exist"),
1076                    )));
1077                }
1078                Ok(QueryResult::status(format!("Projection '{name}' dropped")))
1079            }
1080            #[cfg(feature = "lpg")]
1081            SessionCommand::ShowProjections => {
1082                let mut names: Vec<String> = self.projections.read().keys().cloned().collect();
1083                names.sort();
1084                let rows: Vec<Vec<Value>> =
1085                    names.into_iter().map(|n| vec![Value::from(n)]).collect();
1086                Ok(QueryResult {
1087                    columns: vec!["name".to_string()],
1088                    column_types: Vec::new(),
1089                    rows,
1090                    ..QueryResult::empty()
1091                })
1092            }
1093            #[cfg(not(feature = "lpg"))]
1094            _ => Err(grafeo_common::utils::error::Error::Internal(
1095                "This command requires the `lpg` feature".to_string(),
1096            )),
1097        }
1098    }
1099
1100    /// Logs a WAL record for a schema change (no-op if WAL is not enabled).
1101    #[cfg(feature = "wal")]
1102    fn log_schema_wal(&self, record: &grafeo_storage::wal::WalRecord) {
1103        if let Some(ref wal) = self.wal
1104            && let Err(e) = wal.log(record)
1105        {
1106            grafeo_warn!("Failed to log schema change to WAL: {}", e);
1107        }
1108    }
1109
1110    /// Executes a schema DDL command, returning a status result.
1111    #[cfg(all(feature = "lpg", feature = "gql"))]
1112    fn execute_schema_command(
1113        &self,
1114        cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
1115    ) -> Result<QueryResult> {
1116        use crate::catalog::{
1117            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
1118        };
1119        use grafeo_adapters::query::gql::ast::SchemaStatement;
1120        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1121        #[cfg(feature = "wal")]
1122        use grafeo_storage::wal::WalRecord;
1123
1124        /// Logs a WAL record for schema changes. Compiles to nothing without `wal`.
1125        macro_rules! wal_log {
1126            ($self:expr, $record:expr) => {
1127                #[cfg(feature = "wal")]
1128                $self.log_schema_wal(&$record);
1129            };
1130        }
1131
1132        let result = match cmd {
1133            SchemaStatement::CreateNodeType(stmt) => {
1134                let effective_name = self.effective_type_key(&stmt.name);
1135                #[cfg(feature = "wal")]
1136                let props_for_wal: Vec<(String, String, bool)> = stmt
1137                    .properties
1138                    .iter()
1139                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1140                    .collect();
1141                let def = NodeTypeDefinition {
1142                    name: effective_name.clone(),
1143                    properties: stmt
1144                        .properties
1145                        .iter()
1146                        .map(|p| TypedProperty {
1147                            name: p.name.clone(),
1148                            data_type: PropertyDataType::from_type_name(&p.data_type),
1149                            nullable: p.nullable,
1150                            default_value: p
1151                                .default_value
1152                                .as_ref()
1153                                .map(|s| parse_default_literal(s)),
1154                        })
1155                        .collect(),
1156                    constraints: Vec::new(),
1157                    parent_types: stmt.parent_types.clone(),
1158                };
1159                let result = if stmt.or_replace {
1160                    let _ = self.catalog.drop_node_type(&effective_name);
1161                    self.catalog.register_node_type(def)
1162                } else {
1163                    self.catalog.register_node_type(def)
1164                };
1165                match result {
1166                    Ok(()) => {
1167                        wal_log!(
1168                            self,
1169                            WalRecord::CreateNodeType {
1170                                name: effective_name.clone(),
1171                                properties: props_for_wal,
1172                                constraints: Vec::new(),
1173                            }
1174                        );
1175                        Ok(QueryResult::status(format!(
1176                            "Created node type '{}'",
1177                            stmt.name
1178                        )))
1179                    }
1180                    Err(e) if stmt.if_not_exists => {
1181                        let _ = e;
1182                        Ok(QueryResult::status("No change"))
1183                    }
1184                    Err(e) => Err(Error::Query(QueryError::new(
1185                        QueryErrorKind::Semantic,
1186                        e.to_string(),
1187                    ))),
1188                }
1189            }
1190            SchemaStatement::CreateEdgeType(stmt) => {
1191                let effective_name = self.effective_type_key(&stmt.name);
1192                #[cfg(feature = "wal")]
1193                let props_for_wal: Vec<(String, String, bool)> = stmt
1194                    .properties
1195                    .iter()
1196                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1197                    .collect();
1198                let def = EdgeTypeDefinition {
1199                    name: effective_name.clone(),
1200                    properties: stmt
1201                        .properties
1202                        .iter()
1203                        .map(|p| TypedProperty {
1204                            name: p.name.clone(),
1205                            data_type: PropertyDataType::from_type_name(&p.data_type),
1206                            nullable: p.nullable,
1207                            default_value: p
1208                                .default_value
1209                                .as_ref()
1210                                .map(|s| parse_default_literal(s)),
1211                        })
1212                        .collect(),
1213                    constraints: Vec::new(),
1214                    source_node_types: stmt.source_node_types.clone(),
1215                    target_node_types: stmt.target_node_types.clone(),
1216                };
1217                let result = if stmt.or_replace {
1218                    let _ = self.catalog.drop_edge_type_def(&effective_name);
1219                    self.catalog.register_edge_type_def(def)
1220                } else {
1221                    self.catalog.register_edge_type_def(def)
1222                };
1223                match result {
1224                    Ok(()) => {
1225                        wal_log!(
1226                            self,
1227                            WalRecord::CreateEdgeType {
1228                                name: effective_name.clone(),
1229                                properties: props_for_wal,
1230                                constraints: Vec::new(),
1231                            }
1232                        );
1233                        Ok(QueryResult::status(format!(
1234                            "Created edge type '{}'",
1235                            stmt.name
1236                        )))
1237                    }
1238                    Err(e) if stmt.if_not_exists => {
1239                        let _ = e;
1240                        Ok(QueryResult::status("No change"))
1241                    }
1242                    Err(e) => Err(Error::Query(QueryError::new(
1243                        QueryErrorKind::Semantic,
1244                        e.to_string(),
1245                    ))),
1246                }
1247            }
1248            SchemaStatement::CreateVectorIndex(stmt) => {
1249                Self::create_vector_index_on_store(
1250                    &self.active_lpg_store(),
1251                    &stmt.node_label,
1252                    &stmt.property,
1253                    stmt.dimensions,
1254                    stmt.metric.as_deref(),
1255                )?;
1256                wal_log!(
1257                    self,
1258                    WalRecord::CreateIndex {
1259                        name: stmt.name.clone(),
1260                        label: stmt.node_label.clone(),
1261                        property: stmt.property.clone(),
1262                        index_type: "vector".to_string(),
1263                    }
1264                );
1265                Ok(QueryResult::status(format!(
1266                    "Created vector index '{}'",
1267                    stmt.name
1268                )))
1269            }
1270            SchemaStatement::DropNodeType { name, if_exists } => {
1271                let effective_name = self.effective_type_key(&name);
1272                match self.catalog.drop_node_type(&effective_name) {
1273                    Ok(()) => {
1274                        wal_log!(
1275                            self,
1276                            WalRecord::DropNodeType {
1277                                name: effective_name
1278                            }
1279                        );
1280                        Ok(QueryResult::status(format!("Dropped node type '{name}'")))
1281                    }
1282                    Err(e) if if_exists => {
1283                        let _ = e;
1284                        Ok(QueryResult::status("No change"))
1285                    }
1286                    Err(e) => Err(Error::Query(QueryError::new(
1287                        QueryErrorKind::Semantic,
1288                        e.to_string(),
1289                    ))),
1290                }
1291            }
1292            SchemaStatement::DropEdgeType { name, if_exists } => {
1293                let effective_name = self.effective_type_key(&name);
1294                match self.catalog.drop_edge_type_def(&effective_name) {
1295                    Ok(()) => {
1296                        wal_log!(
1297                            self,
1298                            WalRecord::DropEdgeType {
1299                                name: effective_name
1300                            }
1301                        );
1302                        Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
1303                    }
1304                    Err(e) if if_exists => {
1305                        let _ = e;
1306                        Ok(QueryResult::status("No change"))
1307                    }
1308                    Err(e) => Err(Error::Query(QueryError::new(
1309                        QueryErrorKind::Semantic,
1310                        e.to_string(),
1311                    ))),
1312                }
1313            }
1314            SchemaStatement::CreateIndex(stmt) => {
1315                use crate::catalog::IndexType as CatalogIndexType;
1316                use grafeo_adapters::query::gql::ast::IndexKind;
1317                let active = self.active_lpg_store();
1318                let index_type_str = match stmt.index_kind {
1319                    IndexKind::Property => "property",
1320                    IndexKind::BTree => "btree",
1321                    IndexKind::Text => "text",
1322                    IndexKind::Vector => "vector",
1323                };
1324                match stmt.index_kind {
1325                    IndexKind::Property | IndexKind::BTree => {
1326                        for prop in &stmt.properties {
1327                            active.create_property_index(prop);
1328                        }
1329                    }
1330                    IndexKind::Text => {
1331                        for prop in &stmt.properties {
1332                            Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1333                        }
1334                    }
1335                    IndexKind::Vector => {
1336                        for prop in &stmt.properties {
1337                            Self::create_vector_index_on_store(
1338                                &active,
1339                                &stmt.label,
1340                                prop,
1341                                stmt.options.dimensions,
1342                                stmt.options.metric.as_deref(),
1343                            )?;
1344                        }
1345                    }
1346                }
1347                // Register each property index in the catalog for SHOW INDEXES
1348                // and DROP INDEX name-based lookup.
1349                let catalog_index_type = match stmt.index_kind {
1350                    IndexKind::Property => CatalogIndexType::Hash,
1351                    IndexKind::BTree => CatalogIndexType::BTree,
1352                    IndexKind::Text => CatalogIndexType::FullText,
1353                    IndexKind::Vector => CatalogIndexType::Hash,
1354                };
1355                let label_id = self.catalog.get_or_create_label(&stmt.label);
1356                for prop in &stmt.properties {
1357                    let prop_id = self.catalog.get_or_create_property_key(prop);
1358                    self.catalog
1359                        .create_index(&stmt.name, label_id, prop_id, catalog_index_type);
1360                }
1361                #[cfg(feature = "wal")]
1362                for prop in &stmt.properties {
1363                    wal_log!(
1364                        self,
1365                        WalRecord::CreateIndex {
1366                            name: stmt.name.clone(),
1367                            label: stmt.label.clone(),
1368                            property: prop.clone(),
1369                            index_type: index_type_str.to_string(),
1370                        }
1371                    );
1372                }
1373                Ok(QueryResult::status(format!(
1374                    "Created {} index '{}'",
1375                    index_type_str, stmt.name
1376                )))
1377            }
1378            SchemaStatement::DropIndex { name, if_exists } => {
1379                // Look up the index by name in the catalog to find the
1380                // underlying property, then drop from both catalog and store.
1381                if let Some(index_id) = self.catalog.find_index_by_name(&name) {
1382                    let def = self.catalog.get_index(index_id);
1383                    self.catalog.drop_index(index_id);
1384                    if let Some(def) = def
1385                        && let Some(prop_name) =
1386                            self.catalog.get_property_key_name(def.property_key)
1387                    {
1388                        self.active_lpg_store().drop_property_index(&prop_name);
1389                    }
1390                    wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1391                    Ok(QueryResult::status(format!("Dropped index '{name}'")))
1392                } else if if_exists {
1393                    Ok(QueryResult::status("No change".to_string()))
1394                } else {
1395                    Err(Error::Query(QueryError::new(
1396                        QueryErrorKind::Semantic,
1397                        format!("Index '{name}' does not exist"),
1398                    )))
1399                }
1400            }
1401            SchemaStatement::CreateConstraint(stmt) => {
1402                use crate::catalog::TypeConstraint;
1403                use grafeo_adapters::query::gql::ast::ConstraintKind;
1404                let kind_str = match stmt.constraint_kind {
1405                    ConstraintKind::Unique => "unique",
1406                    ConstraintKind::NodeKey => "node_key",
1407                    ConstraintKind::NotNull => "not_null",
1408                    ConstraintKind::Exists => "exists",
1409                };
1410                let constraint_name = stmt
1411                    .name
1412                    .clone()
1413                    .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1414
1415                // Register constraint in catalog type definitions
1416                match stmt.constraint_kind {
1417                    ConstraintKind::Unique => {
1418                        for prop in &stmt.properties {
1419                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1420                            let prop_id = self.catalog.get_or_create_property_key(prop);
1421                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1422                        }
1423                        let _ = self.catalog.add_constraint_to_type(
1424                            &stmt.label,
1425                            TypeConstraint::Unique(stmt.properties.clone()),
1426                        );
1427                    }
1428                    ConstraintKind::NodeKey => {
1429                        for prop in &stmt.properties {
1430                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1431                            let prop_id = self.catalog.get_or_create_property_key(prop);
1432                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1433                            let _ = self.catalog.add_required_property(label_id, prop_id);
1434                        }
1435                        let _ = self.catalog.add_constraint_to_type(
1436                            &stmt.label,
1437                            TypeConstraint::PrimaryKey(stmt.properties.clone()),
1438                        );
1439                    }
1440                    ConstraintKind::NotNull | ConstraintKind::Exists => {
1441                        for prop in &stmt.properties {
1442                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1443                            let prop_id = self.catalog.get_or_create_property_key(prop);
1444                            let _ = self.catalog.add_required_property(label_id, prop_id);
1445                            let _ = self.catalog.add_constraint_to_type(
1446                                &stmt.label,
1447                                TypeConstraint::NotNull(prop.clone()),
1448                            );
1449                        }
1450                    }
1451                }
1452
1453                wal_log!(
1454                    self,
1455                    WalRecord::CreateConstraint {
1456                        name: constraint_name.clone(),
1457                        label: stmt.label.clone(),
1458                        properties: stmt.properties.clone(),
1459                        kind: kind_str.to_string(),
1460                    }
1461                );
1462                Ok(QueryResult::status(format!(
1463                    "Created {kind_str} constraint '{constraint_name}'"
1464                )))
1465            }
1466            SchemaStatement::DropConstraint { name, if_exists } => {
1467                let _ = if_exists;
1468                wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1469                Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1470            }
1471            SchemaStatement::CreateGraphType(stmt) => {
1472                use crate::catalog::GraphTypeDefinition;
1473                use grafeo_adapters::query::gql::ast::InlineElementType;
1474
1475                let effective_name = self.effective_type_key(&stmt.name);
1476
1477                // GG04: LIKE clause copies type from existing graph
1478                let (mut node_types, mut edge_types, open) =
1479                    if let Some(ref like_graph) = stmt.like_graph {
1480                        // Infer types from the graph's bound type, or use its existing types
1481                        if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1482                            if let Some(existing) = self
1483                                .catalog
1484                                .schema()
1485                                .and_then(|s| s.get_graph_type(&type_name))
1486                            {
1487                                (
1488                                    existing.allowed_node_types.clone(),
1489                                    existing.allowed_edge_types.clone(),
1490                                    existing.open,
1491                                )
1492                            } else {
1493                                (Vec::new(), Vec::new(), true)
1494                            }
1495                        } else {
1496                            // GG22: Infer from graph data (labels used in graph)
1497                            let nt = self.catalog.all_node_type_names();
1498                            let et = self.catalog.all_edge_type_names();
1499                            if nt.is_empty() && et.is_empty() {
1500                                (Vec::new(), Vec::new(), true)
1501                            } else {
1502                                (nt, et, false)
1503                            }
1504                        }
1505                    } else {
1506                        // Prefix element type names with schema for consistency
1507                        let nt = stmt
1508                            .node_types
1509                            .iter()
1510                            .map(|n| self.effective_type_key(n))
1511                            .collect();
1512                        let et = stmt
1513                            .edge_types
1514                            .iter()
1515                            .map(|n| self.effective_type_key(n))
1516                            .collect();
1517                        (nt, et, stmt.open)
1518                    };
1519
1520                // GG03: Register inline element types and add their names
1521                for inline in &stmt.inline_types {
1522                    match inline {
1523                        InlineElementType::Node {
1524                            name,
1525                            properties,
1526                            key_labels,
1527                            ..
1528                        } => {
1529                            let inline_effective = self.effective_type_key(name);
1530                            let def = NodeTypeDefinition {
1531                                name: inline_effective.clone(),
1532                                properties: properties
1533                                    .iter()
1534                                    .map(|p| TypedProperty {
1535                                        name: p.name.clone(),
1536                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1537                                        nullable: p.nullable,
1538                                        default_value: None,
1539                                    })
1540                                    .collect(),
1541                                constraints: Vec::new(),
1542                                parent_types: key_labels.clone(),
1543                            };
1544                            // Register or replace so inline defs override existing
1545                            self.catalog.register_or_replace_node_type(def);
1546                            #[cfg(feature = "wal")]
1547                            {
1548                                let props_for_wal: Vec<(String, String, bool)> = properties
1549                                    .iter()
1550                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1551                                    .collect();
1552                                self.log_schema_wal(&WalRecord::CreateNodeType {
1553                                    name: inline_effective.clone(),
1554                                    properties: props_for_wal,
1555                                    constraints: Vec::new(),
1556                                });
1557                            }
1558                            if !node_types.contains(&inline_effective) {
1559                                node_types.push(inline_effective);
1560                            }
1561                        }
1562                        InlineElementType::Edge {
1563                            name,
1564                            properties,
1565                            source_node_types,
1566                            target_node_types,
1567                            ..
1568                        } => {
1569                            let inline_effective = self.effective_type_key(name);
1570                            let def = EdgeTypeDefinition {
1571                                name: inline_effective.clone(),
1572                                properties: properties
1573                                    .iter()
1574                                    .map(|p| TypedProperty {
1575                                        name: p.name.clone(),
1576                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1577                                        nullable: p.nullable,
1578                                        default_value: None,
1579                                    })
1580                                    .collect(),
1581                                constraints: Vec::new(),
1582                                source_node_types: source_node_types.clone(),
1583                                target_node_types: target_node_types.clone(),
1584                            };
1585                            self.catalog.register_or_replace_edge_type_def(def);
1586                            #[cfg(feature = "wal")]
1587                            {
1588                                let props_for_wal: Vec<(String, String, bool)> = properties
1589                                    .iter()
1590                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1591                                    .collect();
1592                                self.log_schema_wal(&WalRecord::CreateEdgeType {
1593                                    name: inline_effective.clone(),
1594                                    properties: props_for_wal,
1595                                    constraints: Vec::new(),
1596                                });
1597                            }
1598                            if !edge_types.contains(&inline_effective) {
1599                                edge_types.push(inline_effective);
1600                            }
1601                        }
1602                    }
1603                }
1604
1605                let def = GraphTypeDefinition {
1606                    name: effective_name.clone(),
1607                    allowed_node_types: node_types.clone(),
1608                    allowed_edge_types: edge_types.clone(),
1609                    open,
1610                };
1611                let result = if stmt.or_replace {
1612                    // Drop existing first, ignore error if not found
1613                    let _ = self.catalog.drop_graph_type(&effective_name);
1614                    self.catalog.register_graph_type(def)
1615                } else {
1616                    self.catalog.register_graph_type(def)
1617                };
1618                match result {
1619                    Ok(()) => {
1620                        wal_log!(
1621                            self,
1622                            WalRecord::CreateGraphType {
1623                                name: effective_name.clone(),
1624                                node_types,
1625                                edge_types,
1626                                open,
1627                            }
1628                        );
1629                        Ok(QueryResult::status(format!(
1630                            "Created graph type '{}'",
1631                            stmt.name
1632                        )))
1633                    }
1634                    Err(e) if stmt.if_not_exists => {
1635                        let _ = e;
1636                        Ok(QueryResult::status("No change"))
1637                    }
1638                    Err(e) => Err(Error::Query(QueryError::new(
1639                        QueryErrorKind::Semantic,
1640                        e.to_string(),
1641                    ))),
1642                }
1643            }
1644            SchemaStatement::DropGraphType { name, if_exists } => {
1645                let effective_name = self.effective_type_key(&name);
1646                match self.catalog.drop_graph_type(&effective_name) {
1647                    Ok(()) => {
1648                        wal_log!(
1649                            self,
1650                            WalRecord::DropGraphType {
1651                                name: effective_name
1652                            }
1653                        );
1654                        Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1655                    }
1656                    Err(e) if if_exists => {
1657                        let _ = e;
1658                        Ok(QueryResult::status("No change"))
1659                    }
1660                    Err(e) => Err(Error::Query(QueryError::new(
1661                        QueryErrorKind::Semantic,
1662                        e.to_string(),
1663                    ))),
1664                }
1665            }
1666            SchemaStatement::CreateSchema {
1667                name,
1668                if_not_exists,
1669            } => match self.catalog.register_schema_namespace(name.clone()) {
1670                Ok(()) => {
1671                    wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1672                    // Auto-create the schema's default graph partition so that
1673                    // SESSION SET SCHEMA + queries work without an explicit graph.
1674                    let default_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1675                    if self.store.create_graph(&default_key).unwrap_or(false) {
1676                        wal_log!(self, WalRecord::CreateNamedGraph { name: default_key });
1677                    }
1678                    Ok(QueryResult::status(format!("Created schema '{name}'")))
1679                }
1680                Err(e) if if_not_exists => {
1681                    let _ = e;
1682                    Ok(QueryResult::status("No change"))
1683                }
1684                Err(e) => Err(Error::Query(QueryError::new(
1685                    QueryErrorKind::Semantic,
1686                    e.to_string(),
1687                ))),
1688            },
1689            SchemaStatement::DropSchema { name, if_exists } => {
1690                // ISO/IEC 39075 Section 12.3: schema must be empty before dropping.
1691                // The auto-created __default__ graph is exempt from the check.
1692                let prefix = format!("{name}/");
1693                let default_graph_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1694                let has_graphs = self
1695                    .store
1696                    .graph_names()
1697                    .iter()
1698                    .any(|g| g.starts_with(&prefix) && *g != default_graph_key);
1699                let has_types = self
1700                    .catalog
1701                    .all_node_type_names()
1702                    .iter()
1703                    .any(|n| n.starts_with(&prefix))
1704                    || self
1705                        .catalog
1706                        .all_edge_type_names()
1707                        .iter()
1708                        .any(|n| n.starts_with(&prefix))
1709                    || self
1710                        .catalog
1711                        .all_graph_type_names()
1712                        .iter()
1713                        .any(|n| n.starts_with(&prefix));
1714                if has_graphs || has_types {
1715                    return Err(Error::Query(QueryError::new(
1716                        QueryErrorKind::Semantic,
1717                        format!("Schema '{name}' is not empty: drop all graphs and types first"),
1718                    )));
1719                }
1720                match self.catalog.drop_schema_namespace(&name) {
1721                    Ok(()) => {
1722                        wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1723                        // Drop the auto-created default graph partition
1724                        if self.store.drop_graph(&default_graph_key) {
1725                            wal_log!(
1726                                self,
1727                                WalRecord::DropNamedGraph {
1728                                    name: default_graph_key,
1729                                }
1730                            );
1731                        }
1732                        // If this session was using the dropped schema, reset it
1733                        let mut current = self.current_schema.lock();
1734                        if current
1735                            .as_deref()
1736                            .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1737                        {
1738                            *current = None;
1739                        }
1740                        Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1741                    }
1742                    Err(e) if if_exists => {
1743                        let _ = e;
1744                        Ok(QueryResult::status("No change"))
1745                    }
1746                    Err(e) => Err(Error::Query(QueryError::new(
1747                        QueryErrorKind::Semantic,
1748                        e.to_string(),
1749                    ))),
1750                }
1751            }
1752            SchemaStatement::AlterNodeType(stmt) => {
1753                use grafeo_adapters::query::gql::ast::TypeAlteration;
1754                let effective_name = self.effective_type_key(&stmt.name);
1755                let mut wal_alts = Vec::new();
1756                for alt in &stmt.alterations {
1757                    match alt {
1758                        TypeAlteration::AddProperty(prop) => {
1759                            let typed = TypedProperty {
1760                                name: prop.name.clone(),
1761                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1762                                nullable: prop.nullable,
1763                                default_value: prop
1764                                    .default_value
1765                                    .as_ref()
1766                                    .map(|s| parse_default_literal(s)),
1767                            };
1768                            self.catalog
1769                                .alter_node_type_add_property(&effective_name, typed)
1770                                .map_err(|e| {
1771                                    Error::Query(QueryError::new(
1772                                        QueryErrorKind::Semantic,
1773                                        e.to_string(),
1774                                    ))
1775                                })?;
1776                            wal_alts.push((
1777                                "add".to_string(),
1778                                prop.name.clone(),
1779                                prop.data_type.clone(),
1780                                prop.nullable,
1781                            ));
1782                        }
1783                        TypeAlteration::DropProperty(name) => {
1784                            self.catalog
1785                                .alter_node_type_drop_property(&effective_name, name)
1786                                .map_err(|e| {
1787                                    Error::Query(QueryError::new(
1788                                        QueryErrorKind::Semantic,
1789                                        e.to_string(),
1790                                    ))
1791                                })?;
1792                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1793                        }
1794                    }
1795                }
1796                wal_log!(
1797                    self,
1798                    WalRecord::AlterNodeType {
1799                        name: effective_name,
1800                        alterations: wal_alts,
1801                    }
1802                );
1803                Ok(QueryResult::status(format!(
1804                    "Altered node type '{}'",
1805                    stmt.name
1806                )))
1807            }
1808            SchemaStatement::AlterEdgeType(stmt) => {
1809                use grafeo_adapters::query::gql::ast::TypeAlteration;
1810                let effective_name = self.effective_type_key(&stmt.name);
1811                let mut wal_alts = Vec::new();
1812                for alt in &stmt.alterations {
1813                    match alt {
1814                        TypeAlteration::AddProperty(prop) => {
1815                            let typed = TypedProperty {
1816                                name: prop.name.clone(),
1817                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1818                                nullable: prop.nullable,
1819                                default_value: prop
1820                                    .default_value
1821                                    .as_ref()
1822                                    .map(|s| parse_default_literal(s)),
1823                            };
1824                            self.catalog
1825                                .alter_edge_type_add_property(&effective_name, typed)
1826                                .map_err(|e| {
1827                                    Error::Query(QueryError::new(
1828                                        QueryErrorKind::Semantic,
1829                                        e.to_string(),
1830                                    ))
1831                                })?;
1832                            wal_alts.push((
1833                                "add".to_string(),
1834                                prop.name.clone(),
1835                                prop.data_type.clone(),
1836                                prop.nullable,
1837                            ));
1838                        }
1839                        TypeAlteration::DropProperty(name) => {
1840                            self.catalog
1841                                .alter_edge_type_drop_property(&effective_name, name)
1842                                .map_err(|e| {
1843                                    Error::Query(QueryError::new(
1844                                        QueryErrorKind::Semantic,
1845                                        e.to_string(),
1846                                    ))
1847                                })?;
1848                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1849                        }
1850                    }
1851                }
1852                wal_log!(
1853                    self,
1854                    WalRecord::AlterEdgeType {
1855                        name: effective_name,
1856                        alterations: wal_alts,
1857                    }
1858                );
1859                Ok(QueryResult::status(format!(
1860                    "Altered edge type '{}'",
1861                    stmt.name
1862                )))
1863            }
1864            SchemaStatement::AlterGraphType(stmt) => {
1865                use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1866                let effective_name = self.effective_type_key(&stmt.name);
1867                let mut wal_alts = Vec::new();
1868                for alt in &stmt.alterations {
1869                    match alt {
1870                        GraphTypeAlteration::AddNodeType(name) => {
1871                            self.catalog
1872                                .alter_graph_type_add_node_type(&effective_name, name.clone())
1873                                .map_err(|e| {
1874                                    Error::Query(QueryError::new(
1875                                        QueryErrorKind::Semantic,
1876                                        e.to_string(),
1877                                    ))
1878                                })?;
1879                            wal_alts.push(("add_node_type".to_string(), name.clone()));
1880                        }
1881                        GraphTypeAlteration::DropNodeType(name) => {
1882                            self.catalog
1883                                .alter_graph_type_drop_node_type(&effective_name, name)
1884                                .map_err(|e| {
1885                                    Error::Query(QueryError::new(
1886                                        QueryErrorKind::Semantic,
1887                                        e.to_string(),
1888                                    ))
1889                                })?;
1890                            wal_alts.push(("drop_node_type".to_string(), name.clone()));
1891                        }
1892                        GraphTypeAlteration::AddEdgeType(name) => {
1893                            self.catalog
1894                                .alter_graph_type_add_edge_type(&effective_name, name.clone())
1895                                .map_err(|e| {
1896                                    Error::Query(QueryError::new(
1897                                        QueryErrorKind::Semantic,
1898                                        e.to_string(),
1899                                    ))
1900                                })?;
1901                            wal_alts.push(("add_edge_type".to_string(), name.clone()));
1902                        }
1903                        GraphTypeAlteration::DropEdgeType(name) => {
1904                            self.catalog
1905                                .alter_graph_type_drop_edge_type(&effective_name, name)
1906                                .map_err(|e| {
1907                                    Error::Query(QueryError::new(
1908                                        QueryErrorKind::Semantic,
1909                                        e.to_string(),
1910                                    ))
1911                                })?;
1912                            wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1913                        }
1914                    }
1915                }
1916                wal_log!(
1917                    self,
1918                    WalRecord::AlterGraphType {
1919                        name: effective_name,
1920                        alterations: wal_alts,
1921                    }
1922                );
1923                Ok(QueryResult::status(format!(
1924                    "Altered graph type '{}'",
1925                    stmt.name
1926                )))
1927            }
1928            SchemaStatement::CreateProcedure(stmt) => {
1929                use crate::catalog::ProcedureDefinition;
1930
1931                let def = ProcedureDefinition {
1932                    name: stmt.name.clone(),
1933                    params: stmt
1934                        .params
1935                        .iter()
1936                        .map(|p| (p.name.clone(), p.param_type.clone()))
1937                        .collect(),
1938                    returns: stmt
1939                        .returns
1940                        .iter()
1941                        .map(|r| (r.name.clone(), r.return_type.clone()))
1942                        .collect(),
1943                    body: stmt.body.clone(),
1944                };
1945
1946                if stmt.or_replace {
1947                    self.catalog.replace_procedure(def).map_err(|e| {
1948                        Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1949                    })?;
1950                } else {
1951                    match self.catalog.register_procedure(def) {
1952                        Ok(()) => {}
1953                        Err(_) if stmt.if_not_exists => {
1954                            return Ok(QueryResult::empty());
1955                        }
1956                        Err(e) => {
1957                            return Err(Error::Query(QueryError::new(
1958                                QueryErrorKind::Semantic,
1959                                e.to_string(),
1960                            )));
1961                        }
1962                    }
1963                }
1964
1965                wal_log!(
1966                    self,
1967                    WalRecord::CreateProcedure {
1968                        name: stmt.name.clone(),
1969                        params: stmt
1970                            .params
1971                            .iter()
1972                            .map(|p| (p.name.clone(), p.param_type.clone()))
1973                            .collect(),
1974                        returns: stmt
1975                            .returns
1976                            .iter()
1977                            .map(|r| (r.name.clone(), r.return_type.clone()))
1978                            .collect(),
1979                        body: stmt.body,
1980                    }
1981                );
1982                Ok(QueryResult::status(format!(
1983                    "Created procedure '{}'",
1984                    stmt.name
1985                )))
1986            }
1987            SchemaStatement::DropProcedure { name, if_exists } => {
1988                match self.catalog.drop_procedure(&name) {
1989                    Ok(()) => {}
1990                    Err(_) if if_exists => {
1991                        return Ok(QueryResult::empty());
1992                    }
1993                    Err(e) => {
1994                        return Err(Error::Query(QueryError::new(
1995                            QueryErrorKind::Semantic,
1996                            e.to_string(),
1997                        )));
1998                    }
1999                }
2000                wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
2001                Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
2002            }
2003            SchemaStatement::ShowIndexes => {
2004                return self.execute_show_indexes();
2005            }
2006            SchemaStatement::ShowConstraints => {
2007                return self.execute_show_constraints();
2008            }
2009            SchemaStatement::ShowNodeTypes => {
2010                return self.execute_show_node_types();
2011            }
2012            SchemaStatement::ShowEdgeTypes => {
2013                return self.execute_show_edge_types();
2014            }
2015            SchemaStatement::ShowGraphTypes => {
2016                return self.execute_show_graph_types();
2017            }
2018            SchemaStatement::ShowGraphType(name) => {
2019                return self.execute_show_graph_type(&name);
2020            }
2021            SchemaStatement::ShowCurrentGraphType => {
2022                return self.execute_show_current_graph_type();
2023            }
2024            SchemaStatement::ShowGraphs => {
2025                return self.execute_show_graphs();
2026            }
2027            SchemaStatement::ShowSchemas => {
2028                return self.execute_show_schemas();
2029            }
2030        };
2031
2032        // Invalidate all cached query plans after any successful DDL change.
2033        // DDL is rare, so clearing the entire cache is cheap and correct.
2034        if result.is_ok() {
2035            self.query_cache.clear();
2036        }
2037
2038        result
2039    }
2040
2041    /// Creates a vector index on the store by scanning existing nodes.
2042    #[cfg(all(feature = "lpg", feature = "gql", feature = "vector-index"))]
2043    fn create_vector_index_on_store(
2044        store: &LpgStore,
2045        label: &str,
2046        property: &str,
2047        dimensions: Option<usize>,
2048        metric: Option<&str>,
2049    ) -> Result<()> {
2050        use grafeo_common::types::{PropertyKey, Value};
2051        use grafeo_common::utils::error::Error;
2052        use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex, VectorIndexKind};
2053
2054        let metric = match metric {
2055            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
2056                Error::Internal(format!(
2057                    "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
2058                ))
2059            })?,
2060            None => DistanceMetric::Cosine,
2061        };
2062
2063        let prop_key = PropertyKey::new(property);
2064        let mut found_dims: Option<usize> = dimensions;
2065        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
2066
2067        for node in store.nodes_with_label(label) {
2068            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
2069                if let Some(expected) = found_dims {
2070                    if v.len() != expected {
2071                        return Err(Error::Internal(format!(
2072                            "Vector dimension mismatch: expected {expected}, found {} on node {}",
2073                            v.len(),
2074                            node.id.0
2075                        )));
2076                    }
2077                } else {
2078                    found_dims = Some(v.len());
2079                }
2080                vectors.push((node.id, v.to_vec()));
2081            }
2082        }
2083
2084        let Some(dims) = found_dims else {
2085            return Err(Error::Internal(format!(
2086                "No vector properties found on :{label}({property}) and no dimensions specified"
2087            )));
2088        };
2089
2090        let config = HnswConfig::new(dims, metric);
2091        let index = HnswIndex::with_capacity(config, vectors.len());
2092        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
2093        for (node_id, vec) in &vectors {
2094            index.insert(*node_id, vec, &accessor);
2095        }
2096
2097        store.add_vector_index(label, property, Arc::new(VectorIndexKind::Hnsw(index)));
2098        Ok(())
2099    }
2100
2101    /// Stub for when vector-index feature is not enabled.
2102    #[cfg(all(feature = "lpg", feature = "gql", not(feature = "vector-index")))]
2103    fn create_vector_index_on_store(
2104        _store: &LpgStore,
2105        _label: &str,
2106        _property: &str,
2107        _dimensions: Option<usize>,
2108        _metric: Option<&str>,
2109    ) -> Result<()> {
2110        Err(grafeo_common::utils::error::Error::Internal(
2111            "Vector index support requires the 'vector-index' feature".to_string(),
2112        ))
2113    }
2114
2115    /// Creates a text index on the store by scanning existing nodes.
2116    #[cfg(all(feature = "lpg", feature = "gql", feature = "text-index"))]
2117    fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
2118        use grafeo_common::types::{PropertyKey, Value};
2119        use grafeo_core::index::text::{BM25Config, InvertedIndex};
2120
2121        let mut index = InvertedIndex::new(BM25Config::default());
2122        let prop_key = PropertyKey::new(property);
2123
2124        let nodes = store.nodes_by_label(label);
2125        for node_id in nodes {
2126            if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
2127                index.insert(node_id, text.as_str());
2128            }
2129        }
2130
2131        store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
2132        Ok(())
2133    }
2134
2135    /// Stub for when text-index feature is not enabled.
2136    #[cfg(all(feature = "lpg", feature = "gql", not(feature = "text-index")))]
2137    fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
2138        Err(grafeo_common::utils::error::Error::Internal(
2139            "Text index support requires the 'text-index' feature".to_string(),
2140        ))
2141    }
2142
2143    /// Returns a table of all indexes from the catalog.
2144    fn execute_show_indexes(&self) -> Result<QueryResult> {
2145        let indexes = self.catalog.all_indexes();
2146        let columns = vec![
2147            "name".to_string(),
2148            "type".to_string(),
2149            "label".to_string(),
2150            "property".to_string(),
2151        ];
2152        let rows: Vec<Vec<Value>> = indexes
2153            .into_iter()
2154            .map(|def| {
2155                let label_name = self
2156                    .catalog
2157                    .get_label_name(def.label)
2158                    .unwrap_or_else(|| "?".into());
2159                let prop_name = self
2160                    .catalog
2161                    .get_property_key_name(def.property_key)
2162                    .unwrap_or_else(|| "?".into());
2163                vec![
2164                    Value::from(def.name),
2165                    Value::from(format!("{:?}", def.index_type)),
2166                    Value::from(&*label_name),
2167                    Value::from(&*prop_name),
2168                ]
2169            })
2170            .collect();
2171        Ok(QueryResult {
2172            columns,
2173            column_types: Vec::new(),
2174            rows,
2175            ..QueryResult::empty()
2176        })
2177    }
2178
2179    /// Returns a table of all constraints (currently metadata-only).
2180    fn execute_show_constraints(&self) -> Result<QueryResult> {
2181        // Constraints are tracked in WAL but not yet in a queryable catalog.
2182        // Return an empty table with the expected schema.
2183        Ok(QueryResult {
2184            columns: vec![
2185                "name".to_string(),
2186                "type".to_string(),
2187                "label".to_string(),
2188                "properties".to_string(),
2189            ],
2190            column_types: Vec::new(),
2191            rows: Vec::new(),
2192            ..QueryResult::empty()
2193        })
2194    }
2195
2196    /// Returns a table of all registered node types in the current schema.
2197    fn execute_show_node_types(&self) -> Result<QueryResult> {
2198        let columns = vec![
2199            "name".to_string(),
2200            "properties".to_string(),
2201            "constraints".to_string(),
2202            "parents".to_string(),
2203        ];
2204        let schema = self.current_schema.lock().clone();
2205        let all_names = self.catalog.all_node_type_names();
2206        let type_names: Vec<String> = match &schema {
2207            Some(s) => {
2208                let prefix = format!("{s}/");
2209                all_names
2210                    .into_iter()
2211                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2212                    .collect()
2213            }
2214            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2215        };
2216        let rows: Vec<Vec<Value>> = type_names
2217            .into_iter()
2218            .filter_map(|name| {
2219                let lookup = match &schema {
2220                    Some(s) => format!("{s}/{name}"),
2221                    None => name.clone(),
2222                };
2223                let def = self.catalog.get_node_type(&lookup)?;
2224                let props: Vec<String> = def
2225                    .properties
2226                    .iter()
2227                    .map(|p| {
2228                        let nullable = if p.nullable { "" } else { " NOT NULL" };
2229                        format!("{} {}{}", p.name, p.data_type, nullable)
2230                    })
2231                    .collect();
2232                let constraints: Vec<String> =
2233                    def.constraints.iter().map(|c| format!("{c:?}")).collect();
2234                let parents = def.parent_types.join(", ");
2235                Some(vec![
2236                    Value::from(name),
2237                    Value::from(props.join(", ")),
2238                    Value::from(constraints.join(", ")),
2239                    Value::from(parents),
2240                ])
2241            })
2242            .collect();
2243        Ok(QueryResult {
2244            columns,
2245            column_types: Vec::new(),
2246            rows,
2247            ..QueryResult::empty()
2248        })
2249    }
2250
2251    /// Returns a table of all registered edge types in the current schema.
2252    fn execute_show_edge_types(&self) -> Result<QueryResult> {
2253        let columns = vec![
2254            "name".to_string(),
2255            "properties".to_string(),
2256            "source_types".to_string(),
2257            "target_types".to_string(),
2258        ];
2259        let schema = self.current_schema.lock().clone();
2260        let all_names = self.catalog.all_edge_type_names();
2261        let type_names: Vec<String> = match &schema {
2262            Some(s) => {
2263                let prefix = format!("{s}/");
2264                all_names
2265                    .into_iter()
2266                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2267                    .collect()
2268            }
2269            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2270        };
2271        let rows: Vec<Vec<Value>> = type_names
2272            .into_iter()
2273            .filter_map(|name| {
2274                let lookup = match &schema {
2275                    Some(s) => format!("{s}/{name}"),
2276                    None => name.clone(),
2277                };
2278                let def = self.catalog.get_edge_type_def(&lookup)?;
2279                let props: Vec<String> = def
2280                    .properties
2281                    .iter()
2282                    .map(|p| {
2283                        let nullable = if p.nullable { "" } else { " NOT NULL" };
2284                        format!("{} {}{}", p.name, p.data_type, nullable)
2285                    })
2286                    .collect();
2287                let src = def.source_node_types.join(", ");
2288                let tgt = def.target_node_types.join(", ");
2289                Some(vec![
2290                    Value::from(name),
2291                    Value::from(props.join(", ")),
2292                    Value::from(src),
2293                    Value::from(tgt),
2294                ])
2295            })
2296            .collect();
2297        Ok(QueryResult {
2298            columns,
2299            column_types: Vec::new(),
2300            rows,
2301            ..QueryResult::empty()
2302        })
2303    }
2304
2305    /// Returns a table of all registered graph types in the current schema.
2306    fn execute_show_graph_types(&self) -> Result<QueryResult> {
2307        let columns = vec![
2308            "name".to_string(),
2309            "open".to_string(),
2310            "node_types".to_string(),
2311            "edge_types".to_string(),
2312        ];
2313        let schema = self.current_schema.lock().clone();
2314        let all_names = self.catalog.all_graph_type_names();
2315        let type_names: Vec<String> = match &schema {
2316            Some(s) => {
2317                let prefix = format!("{s}/");
2318                all_names
2319                    .into_iter()
2320                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2321                    .collect()
2322            }
2323            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2324        };
2325        let rows: Vec<Vec<Value>> = type_names
2326            .into_iter()
2327            .filter_map(|name| {
2328                let lookup = match &schema {
2329                    Some(s) => format!("{s}/{name}"),
2330                    None => name.clone(),
2331                };
2332                let def = self.catalog.get_graph_type_def(&lookup)?;
2333                // Strip schema prefix from allowed type names for display
2334                let strip = |n: &String| -> String {
2335                    match &schema {
2336                        Some(s) => n.strip_prefix(&format!("{s}/")).unwrap_or(n).to_string(),
2337                        None => n.clone(),
2338                    }
2339                };
2340                let node_types: Vec<String> = def.allowed_node_types.iter().map(strip).collect();
2341                let edge_types: Vec<String> = def.allowed_edge_types.iter().map(strip).collect();
2342                Some(vec![
2343                    Value::from(name),
2344                    Value::from(def.open),
2345                    Value::from(node_types.join(", ")),
2346                    Value::from(edge_types.join(", ")),
2347                ])
2348            })
2349            .collect();
2350        Ok(QueryResult {
2351            columns,
2352            column_types: Vec::new(),
2353            rows,
2354            ..QueryResult::empty()
2355        })
2356    }
2357
2358    /// Returns the list of named graphs visible in the current schema context.
2359    ///
2360    /// When a session schema is set, only graphs belonging to that schema are
2361    /// shown (their compound prefix is stripped). When no schema is set, graphs
2362    /// without a schema prefix are shown (the default schema).
2363    #[cfg(feature = "lpg")]
2364    fn execute_show_graphs(&self) -> Result<QueryResult> {
2365        let schema = self.current_schema.lock().clone();
2366        let all_names = self.store.graph_names();
2367
2368        let mut names: Vec<String> = match &schema {
2369            Some(s) => {
2370                let prefix = format!("{s}/");
2371                all_names
2372                    .into_iter()
2373                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2374                    .filter(|n| n != SCHEMA_DEFAULT_GRAPH)
2375                    .collect()
2376            }
2377            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2378        };
2379        names.sort();
2380
2381        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2382        Ok(QueryResult {
2383            columns: vec!["name".to_string()],
2384            column_types: Vec::new(),
2385            rows,
2386            ..QueryResult::empty()
2387        })
2388    }
2389
2390    /// Returns the list of all schema namespaces.
2391    fn execute_show_schemas(&self) -> Result<QueryResult> {
2392        let mut names = self.catalog.schema_names();
2393        names.sort();
2394        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2395        Ok(QueryResult {
2396            columns: vec!["name".to_string()],
2397            column_types: Vec::new(),
2398            rows,
2399            ..QueryResult::empty()
2400        })
2401    }
2402
2403    /// Returns detailed info for a specific graph type.
2404    fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
2405        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
2406
2407        let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
2408            Error::Query(QueryError::new(
2409                QueryErrorKind::Semantic,
2410                format!("Graph type '{name}' not found"),
2411            ))
2412        })?;
2413
2414        let columns = vec![
2415            "name".to_string(),
2416            "open".to_string(),
2417            "node_types".to_string(),
2418            "edge_types".to_string(),
2419        ];
2420        let rows = vec![vec![
2421            Value::from(def.name),
2422            Value::from(def.open),
2423            Value::from(def.allowed_node_types.join(", ")),
2424            Value::from(def.allowed_edge_types.join(", ")),
2425        ]];
2426        Ok(QueryResult {
2427            columns,
2428            column_types: Vec::new(),
2429            rows,
2430            ..QueryResult::empty()
2431        })
2432    }
2433
2434    /// Returns the graph type bound to the current graph.
2435    fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2436        let graph_name = self
2437            .current_graph()
2438            .unwrap_or_else(|| "default".to_string());
2439        let columns = vec![
2440            "graph".to_string(),
2441            "graph_type".to_string(),
2442            "open".to_string(),
2443            "node_types".to_string(),
2444            "edge_types".to_string(),
2445        ];
2446
2447        if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2448            && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2449        {
2450            let rows = vec![vec![
2451                Value::from(graph_name),
2452                Value::from(type_name),
2453                Value::from(def.open),
2454                Value::from(def.allowed_node_types.join(", ")),
2455                Value::from(def.allowed_edge_types.join(", ")),
2456            ]];
2457            return Ok(QueryResult {
2458                columns,
2459                column_types: Vec::new(),
2460                rows,
2461                ..QueryResult::empty()
2462            });
2463        }
2464
2465        // No graph type binding found
2466        Ok(QueryResult {
2467            columns,
2468            column_types: Vec::new(),
2469            rows: vec![vec![
2470                Value::from(graph_name),
2471                Value::Null,
2472                Value::Null,
2473                Value::Null,
2474                Value::Null,
2475            ]],
2476            ..QueryResult::empty()
2477        })
2478    }
2479
2480    /// Executes a GQL query.
2481    ///
2482    /// # Errors
2483    ///
2484    /// Returns an error if the query fails to parse or execute.
2485    ///
2486    /// # Examples
2487    ///
2488    /// ```no_run
2489    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2490    /// use grafeo_engine::GrafeoDB;
2491    ///
2492    /// let db = GrafeoDB::new_in_memory();
2493    /// let session = db.session();
2494    ///
2495    /// // Create a node
2496    /// session.execute("INSERT (:Person {name: 'Alix', age: 30})")?;
2497    ///
2498    /// // Query nodes
2499    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
2500    /// for row in result.rows() {
2501    ///     println!("{:?}", row);
2502    /// }
2503    /// # Ok(())
2504    /// # }
2505    /// ```
2506    #[cfg(feature = "gql")]
2507    pub fn execute(&self, query: &str) -> Result<QueryResult> {
2508        self.require_lpg("GQL")?;
2509
2510        use crate::query::{
2511            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2512            processor::QueryLanguage, translators::gql,
2513        };
2514
2515        let _span = grafeo_info_span!(
2516            "grafeo::session::execute",
2517            language = "gql",
2518            query_len = query.len(),
2519        );
2520
2521        #[cfg(not(target_arch = "wasm32"))]
2522        let start_time = std::time::Instant::now();
2523
2524        // Parse and translate, checking for session/schema commands first
2525        let translation = gql::translate_full(query)?;
2526        let logical_plan = match translation {
2527            gql::GqlTranslationResult::SessionCommand(cmd) => {
2528                return self.execute_session_command(cmd);
2529            }
2530            #[cfg(feature = "lpg")]
2531            gql::GqlTranslationResult::SchemaCommand(cmd) => {
2532                // All DDL requires Admin role
2533                self.require_permission(crate::auth::StatementKind::Admin)?;
2534                if *self.read_only_tx.lock() {
2535                    return Err(grafeo_common::utils::error::Error::Transaction(
2536                        grafeo_common::utils::error::TransactionError::ReadOnly,
2537                    ));
2538                }
2539                return self.execute_schema_command(cmd);
2540            }
2541            gql::GqlTranslationResult::Plan(plan) => {
2542                // Only walk the operator tree when it matters: non-admin
2543                // identities need permission checks, read-only transactions
2544                // need mutation blocking. Admin sessions in auto-commit mode
2545                // skip the tree walk entirely.
2546                let read_only = *self.read_only_tx.lock();
2547                let need_check = read_only || !self.identity.can_admin();
2548                let is_mutation = need_check && plan.root.has_mutations();
2549                if is_mutation {
2550                    self.require_permission(crate::auth::StatementKind::Write)?;
2551                }
2552                if read_only && is_mutation {
2553                    return Err(grafeo_common::utils::error::Error::Transaction(
2554                        grafeo_common::utils::error::TransactionError::ReadOnly,
2555                    ));
2556                }
2557                plan
2558            }
2559            #[cfg(not(feature = "lpg"))]
2560            gql::GqlTranslationResult::SchemaCommand(_) => {
2561                return Err(grafeo_common::utils::error::Error::Internal(
2562                    "Schema commands require the `lpg` feature".to_string(),
2563                ));
2564            }
2565        };
2566
2567        // Create cache key for this query
2568        let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
2569
2570        // Try to get cached optimized plan, or use the plan we just translated
2571        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2572            cached_plan
2573        } else {
2574            // Semantic validation
2575            let mut binder = Binder::new();
2576            let _binding_context = binder.bind(&logical_plan)?;
2577
2578            // Optimize the plan
2579            let active = self.active_store();
2580            let optimizer = Optimizer::from_graph_store(&*active);
2581            let plan = optimizer.optimize(logical_plan)?;
2582
2583            // Cache the optimized plan for future use
2584            self.query_cache.put_optimized(cache_key, plan.clone());
2585
2586            plan
2587        };
2588
2589        // Resolve the active store for query execution
2590        let active = self.active_store();
2591
2592        // EXPLAIN: annotate pushdown hints and return the plan tree
2593        if optimized_plan.explain {
2594            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2595            let mut plan = optimized_plan;
2596            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2597            return Ok(explain_result(&plan));
2598        }
2599
2600        // PROFILE: execute with per-operator instrumentation
2601        if optimized_plan.profile {
2602            let has_mutations = optimized_plan.root.has_mutations();
2603            return self.with_auto_commit(has_mutations, || {
2604                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2605                let planner = self.create_planner_for_store(
2606                    Arc::clone(&active),
2607                    viewing_epoch,
2608                    transaction_id,
2609                );
2610                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2611
2612                let executor = Executor::with_columns(physical_plan.columns.clone())
2613                    .with_deadline(self.query_deadline());
2614                let _result = executor.execute(physical_plan.operator.as_mut())?;
2615
2616                let total_time_ms;
2617                #[cfg(not(target_arch = "wasm32"))]
2618                {
2619                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2620                }
2621                #[cfg(target_arch = "wasm32")]
2622                {
2623                    total_time_ms = 0.0;
2624                }
2625
2626                let profile_tree = crate::query::profile::build_profile_tree(
2627                    &optimized_plan.root,
2628                    &mut entries.into_iter(),
2629                );
2630                Ok(crate::query::profile::profile_result(
2631                    &profile_tree,
2632                    total_time_ms,
2633                ))
2634            });
2635        }
2636
2637        let has_mutations = optimized_plan.root.has_mutations();
2638
2639        let result = self.with_auto_commit(has_mutations, || {
2640            // Get transaction context for MVCC visibility
2641            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2642
2643            // Convert to physical plan with transaction context
2644            // (Physical planning cannot be cached as it depends on transaction state)
2645            // Safe to use read-only fast path when: this query has no mutations AND
2646            // there is no active transaction that may have prior uncommitted writes.
2647            let has_active_tx = self.current_transaction.lock().is_some();
2648            let read_only = !has_mutations && !has_active_tx;
2649            let planner = self.create_planner_for_store_with_read_only(
2650                Arc::clone(&active),
2651                viewing_epoch,
2652                transaction_id,
2653                read_only,
2654            );
2655            let mut physical_plan = planner.plan(&optimized_plan)?;
2656
2657            // Execute the plan
2658            let executor = Executor::with_columns(physical_plan.columns.clone())
2659                .with_deadline(self.query_deadline());
2660            let mut result = executor.execute(physical_plan.operator.as_mut())?;
2661
2662            // Add execution metrics
2663            let rows_scanned = result.rows.len() as u64;
2664            #[cfg(not(target_arch = "wasm32"))]
2665            {
2666                let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2667                result.execution_time_ms = Some(elapsed_ms);
2668            }
2669            result.rows_scanned = Some(rows_scanned);
2670
2671            Ok(result)
2672        });
2673
2674        // Record metrics for this query execution.
2675        #[cfg(feature = "metrics")]
2676        {
2677            #[cfg(not(target_arch = "wasm32"))]
2678            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2679            #[cfg(target_arch = "wasm32")]
2680            let elapsed_ms = None;
2681            self.record_query_metrics("gql", elapsed_ms, &result);
2682        }
2683
2684        result
2685    }
2686
2687    /// Executes a GQL query with visibility at the specified epoch.
2688    ///
2689    /// This enables time-travel queries: the query sees the database
2690    /// as it existed at the given epoch.
2691    ///
2692    /// # Errors
2693    ///
2694    /// Returns an error if parsing or execution fails.
2695    #[cfg(feature = "gql")]
2696    pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2697        let previous = self.viewing_epoch_override.lock().replace(epoch);
2698        let result = self.execute(query);
2699        *self.viewing_epoch_override.lock() = previous;
2700        result
2701    }
2702
2703    /// Executes a GQL query at a specific epoch with optional parameters.
2704    ///
2705    /// Combines epoch-based time travel with parameterized queries.
2706    ///
2707    /// # Errors
2708    ///
2709    /// Returns an error if parsing or execution fails.
2710    #[cfg(feature = "gql")]
2711    pub fn execute_at_epoch_with_params(
2712        &self,
2713        query: &str,
2714        epoch: EpochId,
2715        params: Option<std::collections::HashMap<String, Value>>,
2716    ) -> Result<QueryResult> {
2717        let previous = self.viewing_epoch_override.lock().replace(epoch);
2718        let result = if let Some(p) = params {
2719            self.execute_with_params(query, p)
2720        } else {
2721            self.execute(query)
2722        };
2723        *self.viewing_epoch_override.lock() = previous;
2724        result
2725    }
2726
2727    /// Executes a GQL query with parameters.
2728    ///
2729    /// # Errors
2730    ///
2731    /// Returns an error if the query fails to parse or execute.
2732    #[cfg(feature = "gql")]
2733    pub fn execute_with_params(
2734        &self,
2735        query: &str,
2736        params: std::collections::HashMap<String, Value>,
2737    ) -> Result<QueryResult> {
2738        self.require_lpg("GQL")?;
2739
2740        use crate::query::processor::{QueryLanguage, QueryProcessor};
2741
2742        // Reject writes if the identity lacks permission. Parse the query
2743        // to determine mutation status reliably (the text heuristic has false
2744        // negatives that could bypass authorization).
2745        let has_mutations = if self.identity.can_write() {
2746            // Fast path: identity can write, use heuristic for auto-commit only
2747            Self::query_looks_like_mutation(query)
2748        } else {
2749            // Restricted identity: parse to check mutations reliably
2750            use crate::query::translators::gql;
2751            match gql::translate(query) {
2752                Ok(plan) if plan.root.has_mutations() => {
2753                    self.require_permission(crate::auth::StatementKind::Write)?;
2754                    true
2755                }
2756                Ok(_) => false,
2757                // Parse error: let the processor handle it below
2758                Err(_) => Self::query_looks_like_mutation(query),
2759            }
2760        };
2761        let active = self.active_store();
2762
2763        self.with_auto_commit(has_mutations, || {
2764            // Get transaction context for MVCC visibility
2765            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2766
2767            // Create processor with transaction context
2768            let processor = QueryProcessor::for_stores_with_transaction(
2769                Arc::clone(&active),
2770                self.active_write_store(),
2771                Arc::clone(&self.transaction_manager),
2772            )?;
2773
2774            // Apply transaction context if in a transaction
2775            let processor = if let Some(transaction_id) = transaction_id {
2776                processor.with_transaction_context(viewing_epoch, transaction_id)
2777            } else {
2778                processor
2779            };
2780
2781            processor.process(query, QueryLanguage::Gql, Some(&params))
2782        })
2783    }
2784
2785    /// Executes a GQL query with parameters.
2786    ///
2787    /// # Errors
2788    ///
2789    /// Returns an error if no query language is enabled.
2790    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2791    pub fn execute_with_params(
2792        &self,
2793        _query: &str,
2794        _params: std::collections::HashMap<String, Value>,
2795    ) -> Result<QueryResult> {
2796        Err(grafeo_common::utils::error::Error::Internal(
2797            "No query language enabled".to_string(),
2798        ))
2799    }
2800
2801    /// Executes a GQL query.
2802    ///
2803    /// # Errors
2804    ///
2805    /// Returns an error if no query language is enabled.
2806    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2807    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2808        Err(grafeo_common::utils::error::Error::Internal(
2809            "No query language enabled".to_string(),
2810        ))
2811    }
2812
2813    /// Executes a Cypher query.
2814    ///
2815    /// # Errors
2816    ///
2817    /// Returns an error if the query fails to parse or execute.
2818    #[cfg(feature = "cypher")]
2819    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
2820        use crate::query::{
2821            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2822            processor::QueryLanguage, translators::cypher,
2823        };
2824
2825        // Handle schema DDL and SHOW commands before the normal query path
2826        let translation = cypher::translate_full(query)?;
2827        match translation {
2828            #[cfg(feature = "lpg")]
2829            cypher::CypherTranslationResult::SchemaCommand(cmd) => {
2830                use grafeo_common::utils::error::{
2831                    Error as GrafeoError, QueryError, QueryErrorKind,
2832                };
2833                self.require_permission(crate::auth::StatementKind::Admin)?;
2834                if *self.read_only_tx.lock() {
2835                    return Err(GrafeoError::Query(QueryError::new(
2836                        QueryErrorKind::Semantic,
2837                        "Cannot execute schema DDL in a read-only transaction",
2838                    )));
2839                }
2840                return self.execute_schema_command(cmd);
2841            }
2842            #[cfg(not(feature = "lpg"))]
2843            cypher::CypherTranslationResult::SchemaCommand(_) => {
2844                return Err(grafeo_common::utils::error::Error::Internal(
2845                    "Schema DDL requires the `lpg` feature".to_string(),
2846                ));
2847            }
2848            cypher::CypherTranslationResult::ShowIndexes => {
2849                return self.execute_show_indexes();
2850            }
2851            cypher::CypherTranslationResult::ShowConstraints => {
2852                return self.execute_show_constraints();
2853            }
2854            cypher::CypherTranslationResult::ShowCurrentGraphType => {
2855                return self.execute_show_current_graph_type();
2856            }
2857            cypher::CypherTranslationResult::Plan(_) => {
2858                // Fall through to normal execution below
2859            }
2860        }
2861
2862        #[cfg(not(target_arch = "wasm32"))]
2863        let start_time = std::time::Instant::now();
2864
2865        // Create cache key for this query
2866        let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
2867
2868        // Try to get cached optimized plan
2869        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2870            cached_plan
2871        } else {
2872            // Parse and translate the query to a logical plan
2873            let logical_plan = cypher::translate(query)?;
2874
2875            // Semantic validation
2876            let mut binder = Binder::new();
2877            let _binding_context = binder.bind(&logical_plan)?;
2878
2879            // Optimize the plan
2880            let active = self.active_store();
2881            let optimizer = Optimizer::from_graph_store(&*active);
2882            let plan = optimizer.optimize(logical_plan)?;
2883
2884            // Cache the optimized plan
2885            self.query_cache.put_optimized(cache_key, plan.clone());
2886
2887            plan
2888        };
2889
2890        // Check role-based permission for mutations
2891        if optimized_plan.root.has_mutations() {
2892            self.require_permission(crate::auth::StatementKind::Write)?;
2893        }
2894
2895        // Resolve the active store for query execution
2896        let active = self.active_store();
2897
2898        // EXPLAIN
2899        if optimized_plan.explain {
2900            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2901            let mut plan = optimized_plan;
2902            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2903            return Ok(explain_result(&plan));
2904        }
2905
2906        // PROFILE
2907        if optimized_plan.profile {
2908            let has_mutations = optimized_plan.root.has_mutations();
2909            return self.with_auto_commit(has_mutations, || {
2910                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2911                let planner = self.create_planner_for_store(
2912                    Arc::clone(&active),
2913                    viewing_epoch,
2914                    transaction_id,
2915                );
2916                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2917
2918                let executor = Executor::with_columns(physical_plan.columns.clone())
2919                    .with_deadline(self.query_deadline());
2920                let _result = executor.execute(physical_plan.operator.as_mut())?;
2921
2922                let total_time_ms;
2923                #[cfg(not(target_arch = "wasm32"))]
2924                {
2925                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2926                }
2927                #[cfg(target_arch = "wasm32")]
2928                {
2929                    total_time_ms = 0.0;
2930                }
2931
2932                let profile_tree = crate::query::profile::build_profile_tree(
2933                    &optimized_plan.root,
2934                    &mut entries.into_iter(),
2935                );
2936                Ok(crate::query::profile::profile_result(
2937                    &profile_tree,
2938                    total_time_ms,
2939                ))
2940            });
2941        }
2942
2943        let has_mutations = optimized_plan.root.has_mutations();
2944
2945        let result = self.with_auto_commit(has_mutations, || {
2946            // Get transaction context for MVCC visibility
2947            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2948
2949            // Convert to physical plan with transaction context
2950            let planner =
2951                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2952            let mut physical_plan = planner.plan(&optimized_plan)?;
2953
2954            // Execute the plan
2955            let executor = Executor::with_columns(physical_plan.columns.clone())
2956                .with_deadline(self.query_deadline());
2957            executor.execute(physical_plan.operator.as_mut())
2958        });
2959
2960        #[cfg(feature = "metrics")]
2961        {
2962            #[cfg(not(target_arch = "wasm32"))]
2963            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2964            #[cfg(target_arch = "wasm32")]
2965            let elapsed_ms = None;
2966            self.record_query_metrics("cypher", elapsed_ms, &result);
2967        }
2968
2969        result
2970    }
2971
2972    /// Executes a Gremlin query.
2973    ///
2974    /// # Errors
2975    ///
2976    /// Returns an error if the query fails to parse or execute.
2977    ///
2978    /// # Examples
2979    ///
2980    /// ```no_run
2981    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2982    /// use grafeo_engine::GrafeoDB;
2983    ///
2984    /// let db = GrafeoDB::new_in_memory();
2985    /// let session = db.session();
2986    ///
2987    /// // Create some nodes first
2988    /// session.create_node(&["Person"]);
2989    ///
2990    /// // Query using Gremlin
2991    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
2992    /// # Ok(())
2993    /// # }
2994    /// ```
2995    #[cfg(feature = "gremlin")]
2996    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
2997        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};
2998
2999        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3000        let start_time = Instant::now();
3001
3002        // Parse and translate the query to a logical plan
3003        let logical_plan = gremlin::translate(query)?;
3004
3005        // Semantic validation
3006        let mut binder = Binder::new();
3007        let _binding_context = binder.bind(&logical_plan)?;
3008
3009        // Optimize the plan
3010        let active = self.active_store();
3011        let optimizer = Optimizer::from_graph_store(&*active);
3012        let optimized_plan = optimizer.optimize(logical_plan)?;
3013
3014        let has_mutations = optimized_plan.root.has_mutations();
3015        if has_mutations {
3016            self.require_permission(crate::auth::StatementKind::Write)?;
3017        }
3018
3019        let result = self.with_auto_commit(has_mutations, || {
3020            // Get transaction context for MVCC visibility
3021            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3022
3023            // Convert to physical plan with transaction context
3024            let planner =
3025                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3026            let mut physical_plan = planner.plan(&optimized_plan)?;
3027
3028            // Execute the plan
3029            let executor = Executor::with_columns(physical_plan.columns.clone())
3030                .with_deadline(self.query_deadline());
3031            executor.execute(physical_plan.operator.as_mut())
3032        });
3033
3034        #[cfg(feature = "metrics")]
3035        {
3036            #[cfg(not(target_arch = "wasm32"))]
3037            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3038            #[cfg(target_arch = "wasm32")]
3039            let elapsed_ms = None;
3040            self.record_query_metrics("gremlin", elapsed_ms, &result);
3041        }
3042
3043        result
3044    }
3045
3046    /// Executes a Gremlin query with parameters.
3047    ///
3048    /// # Errors
3049    ///
3050    /// Returns an error if the query fails to parse or execute.
3051    #[cfg(feature = "gremlin")]
3052    pub fn execute_gremlin_with_params(
3053        &self,
3054        query: &str,
3055        params: std::collections::HashMap<String, Value>,
3056    ) -> Result<QueryResult> {
3057        use crate::query::{
3058            Executor, binder::Binder, optimizer::Optimizer, processor::substitute_params,
3059            translators::gremlin,
3060        };
3061
3062        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3063        let start_time = Instant::now();
3064
3065        // Parse and translate the query to a logical plan
3066        let mut logical_plan = gremlin::translate(query)?;
3067
3068        // Substitute parameters
3069        substitute_params(&mut logical_plan, &params)?;
3070
3071        // Semantic validation
3072        let mut binder = Binder::new();
3073        let _binding_context = binder.bind(&logical_plan)?;
3074
3075        // Optimize the plan
3076        let active = self.active_store();
3077        let optimizer = Optimizer::from_graph_store(&*active);
3078        let optimized_plan = optimizer.optimize(logical_plan)?;
3079
3080        let has_mutations = optimized_plan.root.has_mutations();
3081        if has_mutations {
3082            self.require_permission(crate::auth::StatementKind::Write)?;
3083        }
3084
3085        let result = self.with_auto_commit(has_mutations, || {
3086            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3087            let planner =
3088                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3089            let mut physical_plan = planner.plan(&optimized_plan)?;
3090            let executor = Executor::with_columns(physical_plan.columns.clone())
3091                .with_deadline(self.query_deadline());
3092            executor.execute(physical_plan.operator.as_mut())
3093        });
3094
3095        #[cfg(feature = "metrics")]
3096        {
3097            #[cfg(not(target_arch = "wasm32"))]
3098            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3099            #[cfg(target_arch = "wasm32")]
3100            let elapsed_ms = None;
3101            self.record_query_metrics("gremlin", elapsed_ms, &result);
3102        }
3103
3104        result
3105    }
3106
3107    /// Executes a GraphQL query against the LPG store.
3108    ///
3109    /// # Errors
3110    ///
3111    /// Returns an error if the query fails to parse or execute.
3112    ///
3113    /// # Examples
3114    ///
3115    /// ```no_run
3116    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3117    /// use grafeo_engine::GrafeoDB;
3118    ///
3119    /// let db = GrafeoDB::new_in_memory();
3120    /// let session = db.session();
3121    ///
3122    /// // Create some nodes first
3123    /// session.create_node(&["User"]);
3124    ///
3125    /// // Query using GraphQL
3126    /// let result = session.execute_graphql("query { user { id name } }")?;
3127    /// # Ok(())
3128    /// # }
3129    /// ```
3130    #[cfg(feature = "graphql")]
3131    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
3132        use crate::query::{
3133            Executor, binder::Binder, optimizer::Optimizer, processor::substitute_params,
3134            translators::graphql,
3135        };
3136
3137        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3138        let start_time = Instant::now();
3139
3140        let mut logical_plan = graphql::translate(query)?;
3141
3142        // Substitute default parameter values from variable declarations
3143        if !logical_plan.default_params.is_empty() {
3144            let defaults = logical_plan.default_params.clone();
3145            substitute_params(&mut logical_plan, &defaults)?;
3146        }
3147
3148        let mut binder = Binder::new();
3149        let _binding_context = binder.bind(&logical_plan)?;
3150
3151        let active = self.active_store();
3152        let optimizer = Optimizer::from_graph_store(&*active);
3153        let optimized_plan = optimizer.optimize(logical_plan)?;
3154        let has_mutations = optimized_plan.root.has_mutations();
3155        if has_mutations {
3156            self.require_permission(crate::auth::StatementKind::Write)?;
3157        }
3158
3159        let result = self.with_auto_commit(has_mutations, || {
3160            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3161            let planner =
3162                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3163            let mut physical_plan = planner.plan(&optimized_plan)?;
3164            let executor = Executor::with_columns(physical_plan.columns.clone())
3165                .with_deadline(self.query_deadline());
3166            executor.execute(physical_plan.operator.as_mut())
3167        });
3168
3169        #[cfg(feature = "metrics")]
3170        {
3171            #[cfg(not(target_arch = "wasm32"))]
3172            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3173            #[cfg(target_arch = "wasm32")]
3174            let elapsed_ms = None;
3175            self.record_query_metrics("graphql", elapsed_ms, &result);
3176        }
3177
3178        result
3179    }
3180
3181    /// Executes a GraphQL query with parameters.
3182    ///
3183    /// # Errors
3184    ///
3185    /// Returns an error if the query fails to parse or execute.
3186    #[cfg(feature = "graphql")]
3187    pub fn execute_graphql_with_params(
3188        &self,
3189        query: &str,
3190        params: std::collections::HashMap<String, Value>,
3191    ) -> Result<QueryResult> {
3192        use crate::query::{
3193            Executor, binder::Binder, optimizer::Optimizer, processor::substitute_params,
3194            translators::graphql,
3195        };
3196
3197        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3198        let start_time = Instant::now();
3199
3200        // Parse and translate the query to a logical plan
3201        let mut logical_plan = graphql::translate(query)?;
3202
3203        // Merge default params with caller-supplied params
3204        if !logical_plan.default_params.is_empty() {
3205            let mut merged = logical_plan.default_params.clone();
3206            merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
3207            substitute_params(&mut logical_plan, &merged)?;
3208        } else {
3209            substitute_params(&mut logical_plan, &params)?;
3210        }
3211
3212        // Semantic validation
3213        let mut binder = Binder::new();
3214        let _binding_context = binder.bind(&logical_plan)?;
3215
3216        // Optimize the plan
3217        let active = self.active_store();
3218        let optimizer = Optimizer::from_graph_store(&*active);
3219        let optimized_plan = optimizer.optimize(logical_plan)?;
3220
3221        let has_mutations = optimized_plan.root.has_mutations();
3222        if has_mutations {
3223            self.require_permission(crate::auth::StatementKind::Write)?;
3224        }
3225
3226        let result = self.with_auto_commit(has_mutations, || {
3227            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3228            let planner =
3229                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3230            let mut physical_plan = planner.plan(&optimized_plan)?;
3231            let executor = Executor::with_columns(physical_plan.columns.clone())
3232                .with_deadline(self.query_deadline());
3233            executor.execute(physical_plan.operator.as_mut())
3234        });
3235
3236        #[cfg(feature = "metrics")]
3237        {
3238            #[cfg(not(target_arch = "wasm32"))]
3239            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3240            #[cfg(target_arch = "wasm32")]
3241            let elapsed_ms = None;
3242            self.record_query_metrics("graphql", elapsed_ms, &result);
3243        }
3244
3245        result
3246    }
3247
3248    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
3249    ///
3250    /// # Errors
3251    ///
3252    /// Returns an error if the query fails to parse or execute.
3253    ///
3254    /// # Examples
3255    ///
3256    /// ```no_run
3257    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3258    /// use grafeo_engine::GrafeoDB;
3259    ///
3260    /// let db = GrafeoDB::new_in_memory();
3261    /// let session = db.session();
3262    ///
3263    /// let result = session.execute_sql(
3264    ///     "SELECT * FROM GRAPH_TABLE (
3265    ///         MATCH (n:Person)
3266    ///         COLUMNS (n.name AS name)
3267    ///     )"
3268    /// )?;
3269    /// # Ok(())
3270    /// # }
3271    /// ```
3272    #[cfg(feature = "sql-pgq")]
3273    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
3274        use crate::query::{
3275            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
3276            processor::QueryLanguage, translators::sql_pgq,
3277        };
3278
3279        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3280        let start_time = Instant::now();
3281
3282        // Parse and translate (always needed to check for DDL)
3283        let logical_plan = sql_pgq::translate(query)?;
3284
3285        // Handle DDL statements directly (they don't go through the query pipeline)
3286        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
3287            self.require_permission(crate::auth::StatementKind::Admin)?;
3288            return Ok(QueryResult {
3289                columns: vec!["status".into()],
3290                column_types: vec![grafeo_common::types::LogicalType::String],
3291                rows: vec![vec![Value::from(format!(
3292                    "Property graph '{}' created ({} node tables, {} edge tables)",
3293                    cpg.name,
3294                    cpg.node_tables.len(),
3295                    cpg.edge_tables.len()
3296                ))]],
3297                execution_time_ms: None,
3298                rows_scanned: None,
3299                status_message: None,
3300                gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
3301            });
3302        }
3303
3304        let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
3305
3306        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
3307            cached_plan
3308        } else {
3309            let mut binder = Binder::new();
3310            let _binding_context = binder.bind(&logical_plan)?;
3311            let active = self.active_store();
3312            let optimizer = Optimizer::from_graph_store(&*active);
3313            let plan = optimizer.optimize(logical_plan)?;
3314            self.query_cache.put_optimized(cache_key, plan.clone());
3315            plan
3316        };
3317
3318        let active = self.active_store();
3319        let has_mutations = optimized_plan.root.has_mutations();
3320        if has_mutations {
3321            self.require_permission(crate::auth::StatementKind::Write)?;
3322        }
3323
3324        let result = self.with_auto_commit(has_mutations, || {
3325            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3326            let planner =
3327                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3328            let mut physical_plan = planner.plan(&optimized_plan)?;
3329            let executor = Executor::with_columns(physical_plan.columns.clone())
3330                .with_deadline(self.query_deadline());
3331            executor.execute(physical_plan.operator.as_mut())
3332        });
3333
3334        #[cfg(feature = "metrics")]
3335        {
3336            #[cfg(not(target_arch = "wasm32"))]
3337            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3338            #[cfg(target_arch = "wasm32")]
3339            let elapsed_ms = None;
3340            self.record_query_metrics("sql", elapsed_ms, &result);
3341        }
3342
3343        result
3344    }
3345
3346    /// Executes a SQL/PGQ query with parameters.
3347    ///
3348    /// # Errors
3349    ///
3350    /// Returns an error if the query fails to parse or execute.
3351    #[cfg(feature = "sql-pgq")]
3352    pub fn execute_sql_with_params(
3353        &self,
3354        query: &str,
3355        params: std::collections::HashMap<String, Value>,
3356    ) -> Result<QueryResult> {
3357        use crate::query::processor::{QueryLanguage, QueryProcessor};
3358
3359        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3360        let start_time = Instant::now();
3361
3362        let has_mutations = if self.identity.can_write() {
3363            Self::query_looks_like_mutation(query)
3364        } else {
3365            use crate::query::translators::sql_pgq;
3366            match sql_pgq::translate(query) {
3367                Ok(plan) if plan.root.has_mutations() => {
3368                    self.require_permission(crate::auth::StatementKind::Write)?;
3369                    true
3370                }
3371                Ok(_) => false,
3372                Err(_) => Self::query_looks_like_mutation(query),
3373            }
3374        };
3375        if has_mutations {
3376            self.require_permission(crate::auth::StatementKind::Write)?;
3377        }
3378        let active = self.active_store();
3379
3380        let result = self.with_auto_commit(has_mutations, || {
3381            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3382            let processor = QueryProcessor::for_stores_with_transaction(
3383                Arc::clone(&active),
3384                self.active_write_store(),
3385                Arc::clone(&self.transaction_manager),
3386            )?;
3387            let processor = if let Some(transaction_id) = transaction_id {
3388                processor.with_transaction_context(viewing_epoch, transaction_id)
3389            } else {
3390                processor
3391            };
3392            processor.process(query, QueryLanguage::SqlPgq, Some(&params))
3393        });
3394
3395        #[cfg(feature = "metrics")]
3396        {
3397            #[cfg(not(target_arch = "wasm32"))]
3398            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3399            #[cfg(target_arch = "wasm32")]
3400            let elapsed_ms = None;
3401            self.record_query_metrics("sql", elapsed_ms, &result);
3402        }
3403
3404        result
3405    }
3406
3407    /// Executes a query in the specified language by name.
3408    ///
3409    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
3410    /// `"graphql-rdf"`, `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
3411    ///
3412    /// # Errors
3413    ///
3414    /// Returns an error if the language is unknown/disabled or the query fails.
3415    pub fn execute_language(
3416        &self,
3417        query: &str,
3418        language: &str,
3419        params: Option<std::collections::HashMap<String, Value>>,
3420    ) -> Result<QueryResult> {
3421        let _span = grafeo_info_span!(
3422            "grafeo::session::execute",
3423            language,
3424            query_len = query.len(),
3425        );
3426        match language {
3427            "gql" => {
3428                if let Some(p) = params {
3429                    self.execute_with_params(query, p)
3430                } else {
3431                    self.execute(query)
3432                }
3433            }
3434            #[cfg(feature = "cypher")]
3435            "cypher" => {
3436                if let Some(p) = params {
3437                    use crate::query::processor::{QueryLanguage, QueryProcessor};
3438
3439                    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3440                    let start_time = Instant::now();
3441
3442                    let has_mutations = if self.identity.can_write() {
3443                        Self::query_looks_like_mutation(query)
3444                    } else {
3445                        use crate::query::translators::cypher;
3446                        match cypher::translate(query) {
3447                            Ok(plan) if plan.root.has_mutations() => {
3448                                self.require_permission(crate::auth::StatementKind::Write)?;
3449                                true
3450                            }
3451                            Ok(_) => false,
3452                            Err(_) => Self::query_looks_like_mutation(query),
3453                        }
3454                    };
3455                    let active = self.active_store();
3456                    let result = self.with_auto_commit(has_mutations, || {
3457                        let processor = QueryProcessor::for_stores_with_transaction(
3458                            Arc::clone(&active),
3459                            self.active_write_store(),
3460                            Arc::clone(&self.transaction_manager),
3461                        )?;
3462                        let (viewing_epoch, transaction_id) = self.get_transaction_context();
3463                        let processor = if let Some(transaction_id) = transaction_id {
3464                            processor.with_transaction_context(viewing_epoch, transaction_id)
3465                        } else {
3466                            processor
3467                        };
3468                        processor.process(query, QueryLanguage::Cypher, Some(&p))
3469                    });
3470
3471                    #[cfg(feature = "metrics")]
3472                    {
3473                        #[cfg(not(target_arch = "wasm32"))]
3474                        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3475                        #[cfg(target_arch = "wasm32")]
3476                        let elapsed_ms = None;
3477                        self.record_query_metrics("cypher", elapsed_ms, &result);
3478                    }
3479
3480                    result
3481                } else {
3482                    self.execute_cypher(query)
3483                }
3484            }
3485            #[cfg(feature = "gremlin")]
3486            "gremlin" => {
3487                if let Some(p) = params {
3488                    self.execute_gremlin_with_params(query, p)
3489                } else {
3490                    self.execute_gremlin(query)
3491                }
3492            }
3493            #[cfg(feature = "graphql")]
3494            "graphql" => {
3495                if let Some(p) = params {
3496                    self.execute_graphql_with_params(query, p)
3497                } else {
3498                    self.execute_graphql(query)
3499                }
3500            }
3501            #[cfg(all(feature = "graphql", feature = "triple-store"))]
3502            "graphql-rdf" => {
3503                if let Some(p) = params {
3504                    self.execute_graphql_rdf_with_params(query, p)
3505                } else {
3506                    self.execute_graphql_rdf(query)
3507                }
3508            }
3509            #[cfg(feature = "sql-pgq")]
3510            "sql" | "sql-pgq" => {
3511                if let Some(p) = params {
3512                    self.execute_sql_with_params(query, p)
3513                } else {
3514                    self.execute_sql(query)
3515                }
3516            }
3517            #[cfg(all(feature = "sparql", feature = "triple-store"))]
3518            "sparql" => {
3519                if let Some(p) = params {
3520                    self.execute_sparql_with_params(query, p)
3521                } else {
3522                    self.execute_sparql(query)
3523                }
3524            }
3525            other => Err(grafeo_common::utils::error::Error::Query(
3526                grafeo_common::utils::error::QueryError::new(
3527                    grafeo_common::utils::error::QueryErrorKind::Semantic,
3528                    format!("Unknown query language: '{other}'"),
3529                ),
3530            )),
3531        }
3532    }
3533
3534    /// Begins a new transaction.
3535    ///
3536    /// # Errors
3537    ///
3538    /// Returns an error if a transaction is already active.
3539    ///
3540    /// # Examples
3541    ///
3542    /// ```no_run
3543    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3544    /// use grafeo_engine::GrafeoDB;
3545    ///
3546    /// let db = GrafeoDB::new_in_memory();
3547    /// let mut session = db.session();
3548    ///
3549    /// session.begin_transaction()?;
3550    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3551    /// session.execute("INSERT (:Person {name: 'Gus'})")?;
3552    /// session.commit()?; // Both inserts committed atomically
3553    /// # Ok(())
3554    /// # }
3555    /// ```
3556    /// Clears all cached query plans.
3557    ///
3558    /// The plan cache is shared across all sessions on the same database,
3559    /// so clearing from one session affects all sessions.
3560    pub fn clear_plan_cache(&self) {
3561        self.query_cache.clear();
3562    }
3563
3564    /// Begins a new transaction on this session.
3565    ///
3566    /// Uses the default isolation level (`SnapshotIsolation`).
3567    ///
3568    /// # Errors
3569    ///
3570    /// Returns an error if a transaction is already active.
3571    #[cfg(feature = "lpg")]
3572    pub fn begin_transaction(&mut self) -> Result<()> {
3573        self.begin_transaction_inner(false, None)
3574    }
3575
3576    /// Begins a transaction with a specific isolation level.
3577    ///
3578    /// See [`begin_transaction`](Self::begin_transaction) for the default (`SnapshotIsolation`).
3579    ///
3580    /// # Errors
3581    ///
3582    /// Returns an error if a transaction is already active.
3583    #[cfg(feature = "lpg")]
3584    pub fn begin_transaction_with_isolation(
3585        &mut self,
3586        isolation_level: crate::transaction::IsolationLevel,
3587    ) -> Result<()> {
3588        self.begin_transaction_inner(false, Some(isolation_level))
3589    }
3590
3591    /// Core transaction begin logic, usable from both `&mut self` and `&self` paths.
3592    #[cfg(feature = "lpg")]
3593    fn begin_transaction_inner(
3594        &self,
3595        read_only: bool,
3596        isolation_level: Option<crate::transaction::IsolationLevel>,
3597    ) -> Result<()> {
3598        let _span = grafeo_debug_span!("grafeo::tx::begin", read_only);
3599        let mut current = self.current_transaction.lock();
3600        if current.is_some() {
3601            // Nested transaction: create an auto-savepoint instead of a new tx.
3602            drop(current);
3603            let mut depth = self.transaction_nesting_depth.lock();
3604            *depth += 1;
3605            let sp_name = format!("_nested_tx_{}", *depth);
3606            self.savepoint(&sp_name)?;
3607            return Ok(());
3608        }
3609
3610        let active = self.active_lpg_store();
3611        self.transaction_start_node_count
3612            .store(active.node_count(), Ordering::Relaxed);
3613        self.transaction_start_edge_count
3614            .store(active.edge_count(), Ordering::Relaxed);
3615        let transaction_id = if let Some(level) = isolation_level {
3616            self.transaction_manager.begin_with_isolation(level)
3617        } else {
3618            self.transaction_manager.begin()
3619        };
3620        *current = Some(transaction_id);
3621        *self.read_only_tx.lock() = read_only || self.db_read_only;
3622
3623        // Record the initial graph as "touched" for cross-graph atomicity.
3624        // Uses the full storage key (schema/graph) for schema-scoped resolution.
3625        let key = self.active_graph_storage_key();
3626        let mut touched = self.touched_graphs.lock();
3627        touched.clear();
3628        touched.push(key);
3629
3630        #[cfg(feature = "metrics")]
3631        {
3632            crate::metrics::record_metric!(self.metrics, tx_active, inc);
3633            #[cfg(not(target_arch = "wasm32"))]
3634            {
3635                *self.tx_start_time.lock() = Some(Instant::now());
3636            }
3637        }
3638
3639        Ok(())
3640    }
3641
3642    /// Commits the current transaction.
3643    ///
3644    /// Makes all changes since [`begin_transaction`](Self::begin_transaction) permanent.
3645    ///
3646    /// # Errors
3647    ///
3648    /// Returns an error if no transaction is active.
3649    #[cfg(feature = "lpg")]
3650    pub fn commit(&mut self) -> Result<()> {
3651        self.commit_inner()
3652    }
3653
3654    /// Core commit logic, usable from both `&mut self` and `&self` paths.
3655    #[cfg(feature = "lpg")]
3656    fn commit_inner(&self) -> Result<()> {
3657        let _span = grafeo_debug_span!("grafeo::tx::commit");
3658        // Nested transaction: release the auto-savepoint (changes are preserved).
3659        {
3660            let mut depth = self.transaction_nesting_depth.lock();
3661            if *depth > 0 {
3662                let sp_name = format!("_nested_tx_{depth}");
3663                *depth -= 1;
3664                drop(depth);
3665                return self.release_savepoint(&sp_name);
3666            }
3667        }
3668
3669        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3670            grafeo_common::utils::error::Error::Transaction(
3671                grafeo_common::utils::error::TransactionError::InvalidState(
3672                    "No active transaction".to_string(),
3673                ),
3674            )
3675        })?;
3676
3677        // Validate the transaction first (conflict detection) before committing data.
3678        // If this fails, we rollback the data changes instead of making them permanent.
3679        let touched = self.touched_graphs.lock().clone();
3680        let commit_epoch = match self.transaction_manager.commit(transaction_id) {
3681            Ok(epoch) => epoch,
3682            Err(e) => {
3683                // Conflict detected: rollback the data changes
3684                for graph_name in &touched {
3685                    let store = self.resolve_store(graph_name);
3686                    store.rollback_transaction_properties(transaction_id);
3687                }
3688                #[cfg(feature = "triple-store")]
3689                self.rollback_rdf_transaction(transaction_id);
3690                // Discard buffered CDC events on conflict rollback
3691                #[cfg(feature = "cdc")]
3692                if let Some(ref pending) = self.cdc_pending_events {
3693                    pending.lock().clear();
3694                }
3695                *self.read_only_tx.lock() = self.db_read_only;
3696                self.savepoints.lock().clear();
3697                self.touched_graphs.lock().clear();
3698                #[cfg(feature = "metrics")]
3699                {
3700                    crate::metrics::record_metric!(self.metrics, tx_active, dec);
3701                    crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
3702                    #[cfg(not(target_arch = "wasm32"))]
3703                    if let Some(start) = self.tx_start_time.lock().take() {
3704                        let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3705                        crate::metrics::record_metric!(
3706                            self.metrics,
3707                            tx_duration,
3708                            observe duration_ms
3709                        );
3710                    }
3711                }
3712                return Err(e);
3713            }
3714        };
3715
3716        // Finalize PENDING epochs: make uncommitted versions visible at the commit epoch.
3717        for graph_name in &touched {
3718            let store = self.resolve_store(graph_name);
3719            store.finalize_version_epochs(transaction_id, commit_epoch);
3720        }
3721
3722        // Commit succeeded: discard undo logs (make changes permanent)
3723        #[cfg(feature = "triple-store")]
3724        self.commit_rdf_transaction(transaction_id);
3725
3726        for graph_name in &touched {
3727            let store = self.resolve_store(graph_name);
3728            store.commit_transaction_properties(transaction_id);
3729        }
3730
3731        // Flush buffered CDC events now that the transaction is committed.
3732        // All buffered events have PENDING epoch; assign the real commit_epoch.
3733        // Uses record_batch to acquire the write lock once per commit.
3734        #[cfg(feature = "cdc")]
3735        if let Some(ref pending) = self.cdc_pending_events {
3736            let events: Vec<crate::cdc::ChangeEvent> = pending.lock().drain(..).collect();
3737            self.cdc_log.record_batch(events.into_iter().map(|mut e| {
3738                e.epoch = commit_epoch;
3739                e
3740            }));
3741        }
3742
3743        // Log transaction commit and epoch advance to WAL so that crash
3744        // recovery can identify committed transactions and their epoch
3745        // boundaries. Without these markers, WAL recovery discards all
3746        // records as uncommitted (fixes #252 for the crash scenario).
3747        #[cfg(feature = "wal")]
3748        if let Some(ref wal) = self.wal {
3749            use grafeo_storage::wal::WalRecord;
3750            if let Err(e) = wal.log(&WalRecord::TransactionCommit { transaction_id }) {
3751                grafeo_warn!("Failed to log transaction commit to WAL: {}", e);
3752            }
3753            if let Err(e) = wal.log(&WalRecord::EpochAdvance {
3754                epoch: commit_epoch,
3755            }) {
3756                grafeo_warn!("Failed to log epoch advance to WAL: {}", e);
3757            }
3758        }
3759
3760        // Sync epoch for all touched graphs so that convenience lookups
3761        // (edge_type, get_edge, get_node) can see versions at the latest epoch.
3762        let current_epoch = self.transaction_manager.current_epoch();
3763        for graph_name in &touched {
3764            let store = self.resolve_store(graph_name);
3765            store.sync_epoch(current_epoch);
3766        }
3767
3768        // Reset read-only flag, clear savepoints and touched graphs
3769        *self.read_only_tx.lock() = self.db_read_only;
3770        self.savepoints.lock().clear();
3771        self.touched_graphs.lock().clear();
3772
3773        // Auto-GC: periodically prune old MVCC versions
3774        if self.gc_interval > 0 {
3775            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
3776            if count.is_multiple_of(self.gc_interval) {
3777                let min_epoch = self.transaction_manager.min_active_epoch();
3778                for graph_name in &touched {
3779                    let store = self.resolve_store(graph_name);
3780                    store.gc_versions(min_epoch);
3781                }
3782                self.transaction_manager.gc();
3783                #[cfg(feature = "metrics")]
3784                crate::metrics::record_metric!(self.metrics, gc_runs, inc);
3785            }
3786        }
3787
3788        #[cfg(feature = "metrics")]
3789        {
3790            crate::metrics::record_metric!(self.metrics, tx_active, dec);
3791            crate::metrics::record_metric!(self.metrics, tx_committed, inc);
3792            #[cfg(not(target_arch = "wasm32"))]
3793            if let Some(start) = self.tx_start_time.lock().take() {
3794                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3795                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3796            }
3797        }
3798
3799        Ok(())
3800    }
3801
3802    /// Aborts the current transaction.
3803    ///
3804    /// Discards all changes since [`begin_transaction`](Self::begin_transaction).
3805    ///
3806    /// # Errors
3807    ///
3808    /// Returns an error if no transaction is active.
3809    ///
3810    /// # Examples
3811    ///
3812    /// ```no_run
3813    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3814    /// use grafeo_engine::GrafeoDB;
3815    ///
3816    /// let db = GrafeoDB::new_in_memory();
3817    /// let mut session = db.session();
3818    ///
3819    /// session.begin_transaction()?;
3820    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3821    /// session.rollback()?; // Insert is discarded
3822    /// # Ok(())
3823    /// # }
3824    /// ```
3825    #[cfg(feature = "lpg")]
3826    pub fn rollback(&mut self) -> Result<()> {
3827        self.rollback_inner()
3828    }
3829
3830    /// Core rollback logic, usable from both `&mut self` and `&self` paths.
3831    #[cfg(feature = "lpg")]
3832    fn rollback_inner(&self) -> Result<()> {
3833        let _span = grafeo_debug_span!("grafeo::tx::rollback");
3834        // Nested transaction: rollback to the auto-savepoint.
3835        {
3836            let mut depth = self.transaction_nesting_depth.lock();
3837            if *depth > 0 {
3838                let sp_name = format!("_nested_tx_{depth}");
3839                *depth -= 1;
3840                drop(depth);
3841                return self.rollback_to_savepoint(&sp_name);
3842            }
3843        }
3844
3845        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3846            grafeo_common::utils::error::Error::Transaction(
3847                grafeo_common::utils::error::TransactionError::InvalidState(
3848                    "No active transaction".to_string(),
3849                ),
3850            )
3851        })?;
3852
3853        // Reset read-only flag
3854        *self.read_only_tx.lock() = self.db_read_only;
3855
3856        // Discard uncommitted versions in ALL touched LPG stores (cross-graph atomicity).
3857        let touched = self.touched_graphs.lock().clone();
3858        for graph_name in &touched {
3859            let store = self.resolve_store(graph_name);
3860            store.discard_uncommitted_versions(transaction_id);
3861        }
3862
3863        // Discard pending operations in the RDF store
3864        #[cfg(feature = "triple-store")]
3865        self.rollback_rdf_transaction(transaction_id);
3866
3867        // Discard buffered CDC events on rollback
3868        #[cfg(feature = "cdc")]
3869        if let Some(ref pending) = self.cdc_pending_events {
3870            pending.lock().clear();
3871        }
3872
3873        // Clear savepoints and touched graphs
3874        self.savepoints.lock().clear();
3875        self.touched_graphs.lock().clear();
3876
3877        // Mark transaction as aborted in the manager
3878        let result = self.transaction_manager.abort(transaction_id);
3879
3880        #[cfg(feature = "metrics")]
3881        if result.is_ok() {
3882            crate::metrics::record_metric!(self.metrics, tx_active, dec);
3883            crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
3884            #[cfg(not(target_arch = "wasm32"))]
3885            if let Some(start) = self.tx_start_time.lock().take() {
3886                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3887                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3888            }
3889        }
3890
3891        result
3892    }
3893
3894    /// Creates a named savepoint within the current transaction.
3895    ///
3896    /// The savepoint captures the current node/edge ID counters so that
3897    /// [`rollback_to_savepoint`](Self::rollback_to_savepoint) can discard
3898    /// entities created after this point.
3899    ///
3900    /// # Errors
3901    ///
3902    /// Returns an error if no transaction is active.
3903    #[cfg(feature = "lpg")]
3904    pub fn savepoint(&self, name: &str) -> Result<()> {
3905        let tx_id = self.current_transaction.lock().ok_or_else(|| {
3906            grafeo_common::utils::error::Error::Transaction(
3907                grafeo_common::utils::error::TransactionError::InvalidState(
3908                    "No active transaction".to_string(),
3909                ),
3910            )
3911        })?;
3912
3913        // Capture state for every graph touched so far.
3914        let touched = self.touched_graphs.lock().clone();
3915        let graph_snapshots: Vec<GraphSavepoint> = touched
3916            .iter()
3917            .map(|graph_name| {
3918                let store = self.resolve_store(graph_name);
3919                GraphSavepoint {
3920                    graph_name: graph_name.clone(),
3921                    next_node_id: store.peek_next_node_id(),
3922                    next_edge_id: store.peek_next_edge_id(),
3923                    undo_log_position: store.property_undo_log_position(tx_id),
3924                }
3925            })
3926            .collect();
3927
3928        self.savepoints.lock().push(SavepointState {
3929            name: name.to_string(),
3930            graph_snapshots,
3931            active_graph: self.current_graph.lock().clone(),
3932            #[cfg(feature = "cdc")]
3933            cdc_event_position: self
3934                .cdc_pending_events
3935                .as_ref()
3936                .map_or(0, |p| p.lock().len()),
3937        });
3938        Ok(())
3939    }
3940
3941    /// Rolls back to a named savepoint, undoing all writes made after it.
3942    ///
3943    /// The savepoint and any savepoints created after it are removed.
3944    /// Entities with IDs >= the savepoint snapshot are discarded.
3945    ///
3946    /// # Errors
3947    ///
3948    /// Returns an error if no transaction is active or the savepoint does not exist.
3949    #[cfg(feature = "lpg")]
3950    pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
3951        let transaction_id = self.current_transaction.lock().ok_or_else(|| {
3952            grafeo_common::utils::error::Error::Transaction(
3953                grafeo_common::utils::error::TransactionError::InvalidState(
3954                    "No active transaction".to_string(),
3955                ),
3956            )
3957        })?;
3958
3959        let mut savepoints = self.savepoints.lock();
3960
3961        // Find the savepoint by name (search from the end for nested savepoints)
3962        let pos = savepoints
3963            .iter()
3964            .rposition(|sp| sp.name == name)
3965            .ok_or_else(|| {
3966                grafeo_common::utils::error::Error::Transaction(
3967                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3968                        "Savepoint '{name}' not found"
3969                    )),
3970                )
3971            })?;
3972
3973        let sp_state = savepoints[pos].clone();
3974
3975        // Remove this savepoint and all later ones
3976        savepoints.truncate(pos);
3977        drop(savepoints);
3978
3979        // Roll back each graph that was captured in the savepoint.
3980        for gs in &sp_state.graph_snapshots {
3981            let store = self.resolve_store(&gs.graph_name);
3982
3983            // Replay property/label undo entries recorded after the savepoint
3984            store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
3985
3986            // Discard entities created after the savepoint
3987            let current_next_node = store.peek_next_node_id();
3988            let current_next_edge = store.peek_next_edge_id();
3989
3990            let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
3991                .map(NodeId::new)
3992                .collect();
3993            let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
3994                .map(EdgeId::new)
3995                .collect();
3996
3997            if !node_ids.is_empty() || !edge_ids.is_empty() {
3998                store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
3999            }
4000        }
4001
4002        // Also roll back any graphs that were touched AFTER the savepoint
4003        // but not captured in it. These need full discard since the savepoint
4004        // didn't include them.
4005        let touched = self.touched_graphs.lock().clone();
4006        for graph_name in &touched {
4007            let already_captured = sp_state
4008                .graph_snapshots
4009                .iter()
4010                .any(|gs| gs.graph_name == *graph_name);
4011            if !already_captured {
4012                let store = self.resolve_store(graph_name);
4013                store.discard_uncommitted_versions(transaction_id);
4014            }
4015        }
4016
4017        // Truncate CDC event buffer to the savepoint position.
4018        #[cfg(feature = "cdc")]
4019        if let Some(ref pending) = self.cdc_pending_events {
4020            pending.lock().truncate(sp_state.cdc_event_position);
4021        }
4022
4023        // Restore touched_graphs to only the graphs that were known at savepoint time.
4024        let mut touched = self.touched_graphs.lock();
4025        touched.clear();
4026        for gs in &sp_state.graph_snapshots {
4027            if !touched.contains(&gs.graph_name) {
4028                touched.push(gs.graph_name.clone());
4029            }
4030        }
4031
4032        Ok(())
4033    }
4034
4035    /// Releases (removes) a named savepoint without rolling back.
4036    ///
4037    /// # Errors
4038    ///
4039    /// Returns an error if no transaction is active or the savepoint does not exist.
4040    pub fn release_savepoint(&self, name: &str) -> Result<()> {
4041        let _tx_id = self.current_transaction.lock().ok_or_else(|| {
4042            grafeo_common::utils::error::Error::Transaction(
4043                grafeo_common::utils::error::TransactionError::InvalidState(
4044                    "No active transaction".to_string(),
4045                ),
4046            )
4047        })?;
4048
4049        let mut savepoints = self.savepoints.lock();
4050        let pos = savepoints
4051            .iter()
4052            .rposition(|sp| sp.name == name)
4053            .ok_or_else(|| {
4054                grafeo_common::utils::error::Error::Transaction(
4055                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
4056                        "Savepoint '{name}' not found"
4057                    )),
4058                )
4059            })?;
4060        savepoints.remove(pos);
4061        Ok(())
4062    }
4063
4064    /// Returns whether a transaction is active.
4065    #[must_use]
4066    pub fn in_transaction(&self) -> bool {
4067        self.current_transaction.lock().is_some()
4068    }
4069
4070    /// Returns the current transaction ID, if any.
4071    #[must_use]
4072    pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
4073        *self.current_transaction.lock()
4074    }
4075
4076    /// Returns a reference to the transaction manager.
4077    #[must_use]
4078    pub(crate) fn transaction_manager(&self) -> &TransactionManager {
4079        &self.transaction_manager
4080    }
4081
4082    /// Returns the store's current node count and the count at transaction start.
4083    #[cfg(feature = "lpg")]
4084    #[must_use]
4085    pub(crate) fn node_count_delta(&self) -> (usize, usize) {
4086        (
4087            self.transaction_start_node_count.load(Ordering::Relaxed),
4088            self.active_lpg_store().node_count(),
4089        )
4090    }
4091
4092    /// Returns the store's current edge count and the count at transaction start.
4093    #[cfg(feature = "lpg")]
4094    #[must_use]
4095    pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
4096        (
4097            self.transaction_start_edge_count.load(Ordering::Relaxed),
4098            self.active_lpg_store().edge_count(),
4099        )
4100    }
4101
4102    /// Prepares the current transaction for a two-phase commit.
4103    ///
4104    /// Returns a [`PreparedCommit`](crate::transaction::PreparedCommit) that
4105    /// lets you inspect pending changes and attach metadata before finalizing.
4106    /// The mutable borrow prevents concurrent operations while the commit is
4107    /// pending.
4108    ///
4109    /// If the `PreparedCommit` is dropped without calling `commit()` or
4110    /// `abort()`, the transaction is automatically rolled back.
4111    ///
4112    /// # Errors
4113    ///
4114    /// Returns an error if no transaction is active.
4115    ///
4116    /// # Examples
4117    ///
4118    /// ```no_run
4119    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
4120    /// use grafeo_engine::GrafeoDB;
4121    ///
4122    /// let db = GrafeoDB::new_in_memory();
4123    /// let mut session = db.session();
4124    ///
4125    /// session.begin_transaction()?;
4126    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
4127    ///
4128    /// let mut prepared = session.prepare_commit()?;
4129    /// println!("Nodes written: {}", prepared.info().nodes_written);
4130    /// prepared.set_metadata("audit_user", "admin");
4131    /// prepared.commit()?;
4132    /// # Ok(())
4133    /// # }
4134    /// ```
4135    #[cfg(feature = "lpg")]
4136    pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
4137        crate::transaction::PreparedCommit::new(self)
4138    }
4139
4140    /// Sets auto-commit mode.
4141    pub fn set_auto_commit(&mut self, auto_commit: bool) {
4142        self.auto_commit = auto_commit;
4143    }
4144
4145    /// Returns whether auto-commit is enabled.
4146    #[must_use]
4147    pub fn auto_commit(&self) -> bool {
4148        self.auto_commit
4149    }
4150
4151    /// Returns `true` if auto-commit should wrap this execution.
4152    ///
4153    /// Auto-commit kicks in when: the session is in auto-commit mode,
4154    /// no explicit transaction is active, and the query mutates data.
4155    fn needs_auto_commit(&self, has_mutations: bool) -> bool {
4156        self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
4157    }
4158
4159    /// Wraps `body` in an automatic begin/commit when [`needs_auto_commit`]
4160    /// returns `true`. On error the transaction is rolled back.
4161    #[cfg(feature = "lpg")]
4162    fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
4163    where
4164        F: FnOnce() -> Result<QueryResult>,
4165    {
4166        if self.needs_auto_commit(has_mutations) {
4167            self.begin_transaction_inner(false, None)?;
4168            match body() {
4169                Ok(result) => {
4170                    self.commit_inner()?;
4171                    Ok(result)
4172                }
4173                Err(e) => {
4174                    let _ = self.rollback_inner();
4175                    Err(e)
4176                }
4177            }
4178        } else {
4179            body()
4180        }
4181    }
4182
4183    /// Non-LPG stub: no auto-commit wrapping (SPARQL UPDATE is atomic).
4184    #[cfg(not(feature = "lpg"))]
4185    fn with_auto_commit<F>(&self, _has_mutations: bool, body: F) -> Result<QueryResult>
4186    where
4187        F: FnOnce() -> Result<QueryResult>,
4188    {
4189        body()
4190    }
4191
4192    /// Quick heuristic: returns `true` when the query text looks like it
4193    /// performs a mutation. Used by `_with_params` paths that go through the
4194    /// `QueryProcessor` (where the logical plan isn't available before
4195    /// execution). False negatives are harmless: the data just won't be
4196    /// auto-committed, which matches the prior behaviour.
4197    fn query_looks_like_mutation(query: &str) -> bool {
4198        let upper = query.to_ascii_uppercase();
4199        upper.contains("INSERT")
4200            || upper.contains("CREATE")
4201            || upper.contains("DELETE")
4202            || upper.contains("MERGE")
4203            || upper.contains("SET")
4204            || upper.contains("REMOVE")
4205            || upper.contains("DROP")
4206            || upper.contains("ALTER")
4207    }
4208
4209    /// Computes the wall-clock deadline for query execution.
4210    #[must_use]
4211    fn query_deadline(&self) -> Option<Instant> {
4212        #[cfg(not(target_arch = "wasm32"))]
4213        {
4214            self.query_timeout.map(|d| Instant::now() + d)
4215        }
4216        #[cfg(target_arch = "wasm32")]
4217        {
4218            let _ = &self.query_timeout;
4219            None
4220        }
4221    }
4222
4223    /// Records query metrics for any language.
4224    ///
4225    /// Called after query execution to update counters, latency histogram,
4226    /// and per-language tracking. `elapsed_ms` should be `None` on WASM
4227    /// where `Instant` is unavailable.
4228    #[cfg(feature = "metrics")]
4229    fn record_query_metrics(
4230        &self,
4231        language: &str,
4232        elapsed_ms: Option<f64>,
4233        result: &Result<crate::database::QueryResult>,
4234    ) {
4235        use crate::metrics::record_metric;
4236
4237        record_metric!(self.metrics, query_count, inc);
4238        if let Some(ref reg) = self.metrics {
4239            reg.query_count_by_language.increment(language);
4240        }
4241        if let Some(ms) = elapsed_ms {
4242            record_metric!(self.metrics, query_latency, observe ms);
4243        }
4244        match result {
4245            Ok(r) => {
4246                let returned = r.rows.len() as u64;
4247                record_metric!(self.metrics, rows_returned, add returned);
4248                if let Some(scanned) = r.rows_scanned {
4249                    record_metric!(self.metrics, rows_scanned, add scanned);
4250                }
4251            }
4252            Err(e) => {
4253                record_metric!(self.metrics, query_errors, inc);
4254                // Detect timeout errors
4255                let msg = e.to_string();
4256                if msg.contains("exceeded timeout") {
4257                    record_metric!(self.metrics, query_timeouts, inc);
4258                }
4259            }
4260        }
4261    }
4262
4263    /// Evaluates a simple integer literal from a session parameter expression.
4264    fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
4265        use grafeo_adapters::query::gql::ast::{Expression, Literal};
4266        match expr {
4267            Expression::Literal(Literal::Integer(n)) => Some(*n),
4268            _ => None,
4269        }
4270    }
4271
4272    /// Returns the current transaction context for MVCC visibility.
4273    ///
4274    /// Returns `(viewing_epoch, transaction_id)` where:
4275    /// - `viewing_epoch` is the epoch at which to check version visibility
4276    /// - `transaction_id` is the current transaction ID (if in a transaction)
4277    #[must_use]
4278    fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
4279        // Time-travel override takes precedence (read-only, no tx context)
4280        if let Some(epoch) = *self.viewing_epoch_override.lock() {
4281            return (epoch, None);
4282        }
4283
4284        if let Some(transaction_id) = *self.current_transaction.lock() {
4285            // In a transaction: use the transaction's start epoch
4286            let epoch = self
4287                .transaction_manager
4288                .start_epoch(transaction_id)
4289                .unwrap_or_else(|| self.transaction_manager.current_epoch());
4290            (epoch, Some(transaction_id))
4291        } else {
4292            // No transaction: use current epoch
4293            (self.transaction_manager.current_epoch(), None)
4294        }
4295    }
4296
4297    /// Creates a planner with transaction context and constraint validator.
4298    ///
4299    /// The `store` parameter is the graph store to plan against (use
4300    /// `self.active_store()` for graph-aware execution).
4301    fn create_planner_for_store(
4302        &self,
4303        store: Arc<dyn GraphStore>,
4304        viewing_epoch: EpochId,
4305        transaction_id: Option<TransactionId>,
4306    ) -> crate::query::Planner {
4307        self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
4308    }
4309
4310    fn create_planner_for_store_with_read_only(
4311        &self,
4312        store: Arc<dyn GraphStore>,
4313        viewing_epoch: EpochId,
4314        transaction_id: Option<TransactionId>,
4315        read_only: bool,
4316    ) -> crate::query::Planner {
4317        use crate::query::Planner;
4318        use grafeo_core::execution::operators::{LazyValue, SessionContext};
4319
4320        // Capture store reference for lazy introspection (only computed if info()/schema() called).
4321        let info_store = Arc::clone(&store);
4322        let schema_store = Arc::clone(&store);
4323
4324        let session_context = SessionContext {
4325            current_schema: self.current_schema(),
4326            current_graph: self.current_graph(),
4327            db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
4328            schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
4329        };
4330
4331        let write_store = self.active_write_store();
4332
4333        let mut planner = Planner::with_context(
4334            Arc::clone(&store),
4335            write_store,
4336            Arc::clone(&self.transaction_manager),
4337            transaction_id,
4338            viewing_epoch,
4339        )
4340        .with_factorized_execution(self.factorized_execution)
4341        .with_catalog(Arc::clone(&self.catalog))
4342        .with_session_context(session_context)
4343        .with_read_only(read_only);
4344
4345        // Attach the constraint validator for schema enforcement
4346        let validator =
4347            CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
4348        planner = planner.with_validator(Arc::new(validator));
4349
4350        planner
4351    }
4352
4353    /// Builds a `Value::Map` for the `info()` introspection function.
4354    fn build_info_value(store: &dyn GraphStore) -> Value {
4355        use grafeo_common::types::PropertyKey;
4356        use std::collections::BTreeMap;
4357
4358        let mut map = BTreeMap::new();
4359        map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
4360        map.insert(
4361            PropertyKey::from("node_count"),
4362            Value::Int64(store.node_count() as i64),
4363        );
4364        map.insert(
4365            PropertyKey::from("edge_count"),
4366            Value::Int64(store.edge_count() as i64),
4367        );
4368        map.insert(
4369            PropertyKey::from("version"),
4370            Value::String(env!("CARGO_PKG_VERSION").into()),
4371        );
4372        Value::Map(map.into())
4373    }
4374
4375    /// Builds a `Value::Map` for the `schema()` introspection function.
4376    fn build_schema_value(store: &dyn GraphStore) -> Value {
4377        use grafeo_common::types::PropertyKey;
4378        use std::collections::BTreeMap;
4379
4380        let labels: Vec<Value> = store
4381            .all_labels()
4382            .into_iter()
4383            .map(|l| Value::String(l.into()))
4384            .collect();
4385        let edge_types: Vec<Value> = store
4386            .all_edge_types()
4387            .into_iter()
4388            .map(|t| Value::String(t.into()))
4389            .collect();
4390        let property_keys: Vec<Value> = store
4391            .all_property_keys()
4392            .into_iter()
4393            .map(|k| Value::String(k.into()))
4394            .collect();
4395
4396        let mut map = BTreeMap::new();
4397        map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
4398        map.insert(
4399            PropertyKey::from("edge_types"),
4400            Value::List(edge_types.into()),
4401        );
4402        map.insert(
4403            PropertyKey::from("property_keys"),
4404            Value::List(property_keys.into()),
4405        );
4406        Value::Map(map.into())
4407    }
4408
4409    /// Creates a node directly (bypassing query execution).
4410    ///
4411    /// This is a low-level API for testing and direct manipulation.
4412    /// If a transaction is active, the node will be versioned with the transaction ID.
4413    #[cfg(feature = "lpg")]
4414    pub fn create_node(&self, labels: &[&str]) -> NodeId {
4415        let (epoch, transaction_id) = self.get_transaction_context();
4416        self.active_lpg_store().create_node_versioned(
4417            labels,
4418            epoch,
4419            transaction_id.unwrap_or(TransactionId::SYSTEM),
4420        )
4421    }
4422
4423    /// Creates a node with properties.
4424    ///
4425    /// If a transaction is active, the node will be versioned with the transaction ID.
4426    #[cfg(feature = "lpg")]
4427    pub fn create_node_with_props<'a>(
4428        &self,
4429        labels: &[&str],
4430        properties: impl IntoIterator<Item = (&'a str, Value)>,
4431    ) -> NodeId {
4432        let (epoch, transaction_id) = self.get_transaction_context();
4433        self.active_lpg_store().create_node_with_props_versioned(
4434            labels,
4435            properties,
4436            epoch,
4437            transaction_id.unwrap_or(TransactionId::SYSTEM),
4438        )
4439    }
4440
4441    /// Creates an edge between two nodes.
4442    ///
4443    /// This is a low-level API for testing and direct manipulation.
4444    /// If a transaction is active, the edge will be versioned with the transaction ID.
4445    #[cfg(feature = "lpg")]
4446    pub fn create_edge(
4447        &self,
4448        src: NodeId,
4449        dst: NodeId,
4450        edge_type: &str,
4451    ) -> grafeo_common::types::EdgeId {
4452        let (epoch, transaction_id) = self.get_transaction_context();
4453        self.active_lpg_store().create_edge_versioned(
4454            src,
4455            dst,
4456            edge_type,
4457            epoch,
4458            transaction_id.unwrap_or(TransactionId::SYSTEM),
4459        )
4460    }
4461
4462    /// Creates an edge with properties within the active transaction context.
4463    #[cfg(feature = "lpg")]
4464    pub fn create_edge_with_props<'a>(
4465        &self,
4466        src: NodeId,
4467        dst: NodeId,
4468        edge_type: &str,
4469        properties: impl IntoIterator<Item = (&'a str, Value)>,
4470    ) -> grafeo_common::types::EdgeId {
4471        let (epoch, transaction_id) = self.get_transaction_context();
4472        let tid = transaction_id.unwrap_or(TransactionId::SYSTEM);
4473        let store = self.active_lpg_store();
4474        let eid = store.create_edge_versioned(src, dst, edge_type, epoch, tid);
4475        for (key, value) in properties {
4476            store.set_edge_property_versioned(eid, key, value, tid);
4477        }
4478        eid
4479    }
4480
4481    /// Sets a node property within the active transaction context.
4482    #[cfg(feature = "lpg")]
4483    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
4484        let (_, transaction_id) = self.get_transaction_context();
4485        if let Some(tid) = transaction_id {
4486            self.active_lpg_store()
4487                .set_node_property_versioned(id, key, value, tid);
4488        } else {
4489            self.active_lpg_store().set_node_property(id, key, value);
4490        }
4491    }
4492
4493    /// Sets an edge property within the active transaction context.
4494    #[cfg(feature = "lpg")]
4495    pub fn set_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str, value: Value) {
4496        let (_, transaction_id) = self.get_transaction_context();
4497        if let Some(tid) = transaction_id {
4498            self.active_lpg_store()
4499                .set_edge_property_versioned(id, key, value, tid);
4500        } else {
4501            self.active_lpg_store().set_edge_property(id, key, value);
4502        }
4503    }
4504
4505    /// Deletes a node within the active transaction context.
4506    #[cfg(feature = "lpg")]
4507    pub fn delete_node(&self, id: NodeId) -> bool {
4508        let (epoch, transaction_id) = self.get_transaction_context();
4509        if let Some(tid) = transaction_id {
4510            self.active_lpg_store()
4511                .delete_node_versioned(id, epoch, tid)
4512        } else {
4513            self.active_lpg_store().delete_node(id)
4514        }
4515    }
4516
4517    /// Deletes an edge within the active transaction context.
4518    #[cfg(feature = "lpg")]
4519    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
4520        let (epoch, transaction_id) = self.get_transaction_context();
4521        if let Some(tid) = transaction_id {
4522            self.active_lpg_store()
4523                .delete_edge_versioned(id, epoch, tid)
4524        } else {
4525            self.active_lpg_store().delete_edge(id)
4526        }
4527    }
4528
4529    // =========================================================================
4530    // Direct Lookup APIs (bypass query planning for O(1) point reads)
4531    // =========================================================================
4532
4533    /// Gets a node by ID directly, bypassing query planning.
4534    ///
4535    /// This is the fastest way to retrieve a single node when you know its ID.
4536    /// Skips parsing, binding, optimization, and physical planning entirely.
4537    ///
4538    /// # Performance
4539    ///
4540    /// - Time complexity: O(1) average case
4541    /// - No lock contention (uses DashMap internally)
4542    /// - ~20-30x faster than equivalent MATCH query
4543    ///
4544    /// # Example
4545    ///
4546    /// ```no_run
4547    /// # use grafeo_engine::GrafeoDB;
4548    /// # let db = GrafeoDB::new_in_memory();
4549    /// let session = db.session();
4550    /// let node_id = session.create_node(&["Person"]);
4551    ///
4552    /// // Direct lookup - O(1), no query planning
4553    /// let node = session.get_node(node_id);
4554    /// assert!(node.is_some());
4555    /// ```
4556    #[cfg(feature = "lpg")]
4557    #[must_use]
4558    pub fn get_node(&self, id: NodeId) -> Option<Node> {
4559        let (epoch, transaction_id) = self.get_transaction_context();
4560        self.active_lpg_store().get_node_versioned(
4561            id,
4562            epoch,
4563            transaction_id.unwrap_or(TransactionId::SYSTEM),
4564        )
4565    }
4566
4567    /// Gets a single property from a node by ID, bypassing query planning.
4568    ///
4569    /// More efficient than `get_node()` when you only need one property,
4570    /// as it avoids loading the full node with all properties.
4571    ///
4572    /// # Performance
4573    ///
4574    /// - Time complexity: O(1) average case
4575    /// - No query planning overhead
4576    ///
4577    /// # Example
4578    ///
4579    /// ```no_run
4580    /// # use grafeo_engine::GrafeoDB;
4581    /// # use grafeo_common::types::Value;
4582    /// # let db = GrafeoDB::new_in_memory();
4583    /// let session = db.session();
4584    /// let id = session.create_node_with_props(&["Person"], [("name", "Alix".into())]);
4585    ///
4586    /// // Direct property access - O(1)
4587    /// let name = session.get_node_property(id, "name");
4588    /// assert_eq!(name, Some(Value::String("Alix".into())));
4589    /// ```
4590    #[cfg(feature = "lpg")]
4591    #[must_use]
4592    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
4593        self.get_node(id)
4594            .and_then(|node| node.get_property(key).cloned())
4595    }
4596
4597    /// Gets an edge by ID directly, bypassing query planning.
4598    ///
4599    /// # Performance
4600    ///
4601    /// - Time complexity: O(1) average case
4602    /// - No lock contention
4603    #[cfg(feature = "lpg")]
4604    #[must_use]
4605    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
4606        let (epoch, transaction_id) = self.get_transaction_context();
4607        self.active_lpg_store().get_edge_versioned(
4608            id,
4609            epoch,
4610            transaction_id.unwrap_or(TransactionId::SYSTEM),
4611        )
4612    }
4613
4614    /// Gets outgoing neighbors of a node directly, bypassing query planning.
4615    ///
4616    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
4617    ///
4618    /// # Performance
4619    ///
4620    /// - Time complexity: O(degree) where degree is the number of outgoing edges
4621    /// - Uses adjacency index for direct access
4622    /// - ~10-20x faster than equivalent MATCH query
4623    ///
4624    /// # Example
4625    ///
4626    /// ```no_run
4627    /// # use grafeo_engine::GrafeoDB;
4628    /// # let db = GrafeoDB::new_in_memory();
4629    /// let session = db.session();
4630    /// let alix = session.create_node(&["Person"]);
4631    /// let gus = session.create_node(&["Person"]);
4632    /// session.create_edge(alix, gus, "KNOWS");
4633    ///
4634    /// // Direct neighbor lookup - O(degree)
4635    /// let neighbors = session.get_neighbors_outgoing(alix);
4636    /// assert_eq!(neighbors.len(), 1);
4637    /// assert_eq!(neighbors[0].0, gus);
4638    /// ```
4639    #[cfg(feature = "lpg")]
4640    #[must_use]
4641    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4642        self.active_lpg_store()
4643            .edges_from(node, Direction::Outgoing)
4644            .collect()
4645    }
4646
4647    /// Gets incoming neighbors of a node directly, bypassing query planning.
4648    ///
4649    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
4650    ///
4651    /// # Performance
4652    ///
4653    /// - Time complexity: O(degree) where degree is the number of incoming edges
4654    /// - Uses backward adjacency index for direct access
4655    #[cfg(feature = "lpg")]
4656    #[must_use]
4657    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4658        self.active_lpg_store()
4659            .edges_from(node, Direction::Incoming)
4660            .collect()
4661    }
4662
4663    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
4664    ///
4665    /// # Example
4666    ///
4667    /// ```no_run
4668    /// # use grafeo_engine::GrafeoDB;
4669    /// # let db = GrafeoDB::new_in_memory();
4670    /// # let session = db.session();
4671    /// # let alix = session.create_node(&["Person"]);
4672    /// let neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
4673    /// ```
4674    #[cfg(feature = "lpg")]
4675    #[must_use]
4676    pub fn get_neighbors_outgoing_by_type(
4677        &self,
4678        node: NodeId,
4679        edge_type: &str,
4680    ) -> Vec<(NodeId, EdgeId)> {
4681        self.active_lpg_store()
4682            .edges_from(node, Direction::Outgoing)
4683            .filter(|(_, edge_id)| {
4684                self.get_edge(*edge_id)
4685                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
4686            })
4687            .collect()
4688    }
4689
4690    /// Checks if a node exists, bypassing query planning.
4691    ///
4692    /// # Performance
4693    ///
4694    /// - Time complexity: O(1)
4695    /// - Fastest existence check available
4696    #[cfg(feature = "lpg")]
4697    #[must_use]
4698    pub fn node_exists(&self, id: NodeId) -> bool {
4699        self.get_node(id).is_some()
4700    }
4701
4702    /// Checks if an edge exists, bypassing query planning.
4703    #[cfg(feature = "lpg")]
4704    #[must_use]
4705    pub fn edge_exists(&self, id: EdgeId) -> bool {
4706        self.get_edge(id).is_some()
4707    }
4708
4709    /// Gets the degree (number of edges) of a node.
4710    ///
4711    /// Returns (outgoing_degree, incoming_degree).
4712    #[cfg(feature = "lpg")]
4713    #[must_use]
4714    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
4715        let active = self.active_lpg_store();
4716        let out = active.out_degree(node);
4717        let in_degree = active.in_degree(node);
4718        (out, in_degree)
4719    }
4720
4721    /// Batch lookup of multiple nodes by ID.
4722    ///
4723    /// More efficient than calling `get_node()` in a loop because it
4724    /// amortizes overhead.
4725    ///
4726    /// # Performance
4727    ///
4728    /// - Time complexity: O(n) where n is the number of IDs
4729    /// - Better cache utilization than individual lookups
4730    #[cfg(feature = "lpg")]
4731    #[must_use]
4732    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
4733        let (epoch, transaction_id) = self.get_transaction_context();
4734        let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
4735        let active = self.active_lpg_store();
4736        ids.iter()
4737            .map(|&id| active.get_node_versioned(id, epoch, tx))
4738            .collect()
4739    }
4740
4741    // ── Change Data Capture ─────────────────────────────────────────────
4742
4743    /// Returns the full change history for an entity (node or edge).
4744    ///
4745    /// # Errors
4746    ///
4747    /// Currently infallible, but returns `Result` for forward compatibility.
4748    #[cfg(feature = "cdc")]
4749    pub fn history(
4750        &self,
4751        entity_id: impl Into<crate::cdc::EntityId>,
4752    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4753        Ok(self.cdc_log.history(entity_id.into()))
4754    }
4755
4756    /// Returns change events for an entity since the given epoch.
4757    ///
4758    /// # Errors
4759    ///
4760    /// Currently infallible, but returns `Result` for forward compatibility.
4761    #[cfg(feature = "cdc")]
4762    pub fn history_since(
4763        &self,
4764        entity_id: impl Into<crate::cdc::EntityId>,
4765        since_epoch: EpochId,
4766    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4767        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
4768    }
4769
4770    /// Returns all change events across all entities in an epoch range.
4771    ///
4772    /// # Errors
4773    ///
4774    /// Currently infallible, but returns `Result` for forward compatibility.
4775    #[cfg(feature = "cdc")]
4776    pub fn changes_between(
4777        &self,
4778        start_epoch: EpochId,
4779        end_epoch: EpochId,
4780    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4781        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
4782    }
4783}
4784
4785impl Drop for Session {
4786    fn drop(&mut self) {
4787        // Auto-rollback any active transaction to prevent leaked MVCC state,
4788        // dangling write locks, and uncommitted versions lingering in the store.
4789        #[cfg(feature = "lpg")]
4790        if self.in_transaction() {
4791            let _ = self.rollback_inner();
4792        }
4793
4794        #[cfg(feature = "metrics")]
4795        if let Some(ref reg) = self.metrics {
4796            reg.session_active
4797                .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
4798        }
4799    }
4800}
4801
4802#[cfg(test)]
4803mod tests {
4804    use super::parse_default_literal;
4805    use crate::database::GrafeoDB;
4806    use grafeo_common::types::Value;
4807
4808    // -----------------------------------------------------------------------
4809    // parse_default_literal
4810    // -----------------------------------------------------------------------
4811
4812    #[test]
4813    fn parse_default_literal_null() {
4814        assert_eq!(parse_default_literal("null"), Value::Null);
4815        assert_eq!(parse_default_literal("NULL"), Value::Null);
4816        assert_eq!(parse_default_literal("Null"), Value::Null);
4817    }
4818
4819    #[test]
4820    fn parse_default_literal_bool() {
4821        assert_eq!(parse_default_literal("true"), Value::Bool(true));
4822        assert_eq!(parse_default_literal("TRUE"), Value::Bool(true));
4823        assert_eq!(parse_default_literal("false"), Value::Bool(false));
4824        assert_eq!(parse_default_literal("FALSE"), Value::Bool(false));
4825    }
4826
4827    #[test]
4828    fn parse_default_literal_string_single_quoted() {
4829        assert_eq!(
4830            parse_default_literal("'hello'"),
4831            Value::String("hello".into())
4832        );
4833    }
4834
4835    #[test]
4836    fn parse_default_literal_string_double_quoted() {
4837        assert_eq!(
4838            parse_default_literal("\"world\""),
4839            Value::String("world".into())
4840        );
4841    }
4842
4843    #[test]
4844    fn parse_default_literal_integer() {
4845        assert_eq!(parse_default_literal("42"), Value::Int64(42));
4846        assert_eq!(parse_default_literal("-7"), Value::Int64(-7));
4847        assert_eq!(parse_default_literal("0"), Value::Int64(0));
4848    }
4849
4850    #[test]
4851    fn parse_default_literal_float() {
4852        assert_eq!(parse_default_literal("9.81"), Value::Float64(9.81_f64));
4853        assert_eq!(parse_default_literal("-0.5"), Value::Float64(-0.5));
4854    }
4855
4856    #[test]
4857    fn parse_default_literal_fallback_string() {
4858        // Not a recognized literal, not quoted, not a number
4859        assert_eq!(
4860            parse_default_literal("some_identifier"),
4861            Value::String("some_identifier".into())
4862        );
4863    }
4864
4865    #[test]
4866    fn test_session_create_node() {
4867        let db = GrafeoDB::new_in_memory();
4868        let session = db.session();
4869
4870        let id = session.create_node(&["Person"]);
4871        assert!(id.is_valid());
4872        assert_eq!(db.node_count(), 1);
4873    }
4874
4875    #[test]
4876    fn test_session_transaction() {
4877        let db = GrafeoDB::new_in_memory();
4878        let mut session = db.session();
4879
4880        assert!(!session.in_transaction());
4881
4882        session.begin_transaction().unwrap();
4883        assert!(session.in_transaction());
4884
4885        session.commit().unwrap();
4886        assert!(!session.in_transaction());
4887    }
4888
4889    #[test]
4890    fn test_session_transaction_context() {
4891        let db = GrafeoDB::new_in_memory();
4892        let mut session = db.session();
4893
4894        // Without transaction - context should have current epoch and no transaction_id
4895        let (_epoch1, transaction_id1) = session.get_transaction_context();
4896        assert!(transaction_id1.is_none());
4897
4898        // Start a transaction
4899        session.begin_transaction().unwrap();
4900        let (epoch2, transaction_id2) = session.get_transaction_context();
4901        assert!(transaction_id2.is_some());
4902        // Transaction should have a valid epoch
4903        let _ = epoch2; // Use the variable
4904
4905        // Commit and verify
4906        session.commit().unwrap();
4907        let (epoch3, tx_id3) = session.get_transaction_context();
4908        assert!(tx_id3.is_none());
4909        // Epoch should have advanced after commit
4910        assert!(epoch3.as_u64() >= epoch2.as_u64());
4911    }
4912
4913    #[test]
4914    fn test_session_rollback() {
4915        let db = GrafeoDB::new_in_memory();
4916        let mut session = db.session();
4917
4918        session.begin_transaction().unwrap();
4919        session.rollback().unwrap();
4920        assert!(!session.in_transaction());
4921    }
4922
4923    #[test]
4924    fn test_session_rollback_discards_versions() {
4925        use grafeo_common::types::TransactionId;
4926
4927        let db = GrafeoDB::new_in_memory();
4928
4929        // Create a node outside of any transaction (at system level)
4930        let node_before = db.store().create_node(&["Person"]);
4931        assert!(node_before.is_valid());
4932        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4933
4934        // Start a transaction
4935        let mut session = db.session();
4936        session.begin_transaction().unwrap();
4937        let transaction_id = session.current_transaction.lock().unwrap();
4938
4939        // Create a node versioned with the transaction's ID
4940        let epoch = db.store().current_epoch();
4941        let node_in_tx = db
4942            .store()
4943            .create_node_versioned(&["Person"], epoch, transaction_id);
4944        assert!(node_in_tx.is_valid());
4945
4946        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4947        // non-versioned lookups like node_count(). Verify the node is visible
4948        // only through the owning transaction.
4949        assert_eq!(
4950            db.node_count(),
4951            1,
4952            "PENDING nodes should be invisible to non-versioned node_count()"
4953        );
4954        assert!(
4955            db.store()
4956                .get_node_versioned(node_in_tx, epoch, transaction_id)
4957                .is_some(),
4958            "Transaction node should be visible to its own transaction"
4959        );
4960
4961        // Rollback the transaction
4962        session.rollback().unwrap();
4963        assert!(!session.in_transaction());
4964
4965        // The node created in the transaction should be discarded
4966        // Only the first node should remain visible
4967        let count_after = db.node_count();
4968        assert_eq!(
4969            count_after, 1,
4970            "Rollback should discard uncommitted node, but got {count_after}"
4971        );
4972
4973        // The original node should still be accessible
4974        let current_epoch = db.store().current_epoch();
4975        assert!(
4976            db.store()
4977                .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
4978                .is_some(),
4979            "Original node should still exist"
4980        );
4981
4982        // The node created in the transaction should not be accessible
4983        assert!(
4984            db.store()
4985                .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
4986                .is_none(),
4987            "Transaction node should be gone"
4988        );
4989    }
4990
4991    #[test]
4992    fn test_session_create_node_in_transaction() {
4993        // Test that session.create_node() is transaction-aware
4994        let db = GrafeoDB::new_in_memory();
4995
4996        // Create a node outside of any transaction
4997        let node_before = db.create_node(&["Person"]);
4998        assert!(node_before.is_valid());
4999        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
5000
5001        // Start a transaction and create a node through the session
5002        let mut session = db.session();
5003        session.begin_transaction().unwrap();
5004        let transaction_id = session.current_transaction.lock().unwrap();
5005
5006        // Create a node through session.create_node() - should be versioned with tx
5007        let node_in_tx = session.create_node(&["Person"]);
5008        assert!(node_in_tx.is_valid());
5009
5010        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
5011        // non-versioned lookups. Verify the node is visible only to its own tx.
5012        assert_eq!(
5013            db.node_count(),
5014            1,
5015            "PENDING nodes should be invisible to non-versioned node_count()"
5016        );
5017        let epoch = db.store().current_epoch();
5018        assert!(
5019            db.store()
5020                .get_node_versioned(node_in_tx, epoch, transaction_id)
5021                .is_some(),
5022            "Transaction node should be visible to its own transaction"
5023        );
5024
5025        // Rollback the transaction
5026        session.rollback().unwrap();
5027
5028        // The node created via session.create_node() should be discarded
5029        let count_after = db.node_count();
5030        assert_eq!(
5031            count_after, 1,
5032            "Rollback should discard node created via session.create_node(), but got {count_after}"
5033        );
5034    }
5035
5036    #[test]
5037    fn test_session_create_node_with_props_in_transaction() {
5038        use grafeo_common::types::Value;
5039
5040        // Test that session.create_node_with_props() is transaction-aware
5041        let db = GrafeoDB::new_in_memory();
5042
5043        // Create a node outside of any transaction
5044        db.create_node(&["Person"]);
5045        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
5046
5047        // Start a transaction and create a node with properties
5048        let mut session = db.session();
5049        session.begin_transaction().unwrap();
5050        let transaction_id = session.current_transaction.lock().unwrap();
5051
5052        let node_in_tx =
5053            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
5054        assert!(node_in_tx.is_valid());
5055
5056        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
5057        // non-versioned lookups. Verify the node is visible only to its own tx.
5058        assert_eq!(
5059            db.node_count(),
5060            1,
5061            "PENDING nodes should be invisible to non-versioned node_count()"
5062        );
5063        let epoch = db.store().current_epoch();
5064        assert!(
5065            db.store()
5066                .get_node_versioned(node_in_tx, epoch, transaction_id)
5067                .is_some(),
5068            "Transaction node should be visible to its own transaction"
5069        );
5070
5071        // Rollback the transaction
5072        session.rollback().unwrap();
5073
5074        // The node should be discarded
5075        let count_after = db.node_count();
5076        assert_eq!(
5077            count_after, 1,
5078            "Rollback should discard node created via session.create_node_with_props()"
5079        );
5080    }
5081
5082    #[cfg(feature = "gql")]
5083    mod gql_tests {
5084        use super::*;
5085
5086        #[test]
5087        fn test_gql_query_execution() {
5088            let db = GrafeoDB::new_in_memory();
5089            let session = db.session();
5090
5091            // Create some test data
5092            session.create_node(&["Person"]);
5093            session.create_node(&["Person"]);
5094            session.create_node(&["Animal"]);
5095
5096            // Execute a GQL query
5097            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
5098
5099            // Should return 2 Person nodes
5100            assert_eq!(result.row_count(), 2);
5101            assert_eq!(result.column_count(), 1);
5102            assert_eq!(result.columns[0], "n");
5103        }
5104
5105        #[test]
5106        fn test_gql_empty_result() {
5107            let db = GrafeoDB::new_in_memory();
5108            let session = db.session();
5109
5110            // No data in database
5111            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
5112
5113            assert_eq!(result.row_count(), 0);
5114        }
5115
5116        #[test]
5117        fn test_gql_parse_error() {
5118            let db = GrafeoDB::new_in_memory();
5119            let session = db.session();
5120
5121            // Invalid GQL syntax
5122            let result = session.execute("MATCH (n RETURN n");
5123
5124            assert!(result.is_err());
5125        }
5126
5127        #[test]
5128        fn test_gql_relationship_traversal() {
5129            let db = GrafeoDB::new_in_memory();
5130            let session = db.session();
5131
5132            // Create a graph: Alix -> Gus, Alix -> Vincent
5133            let alix = session.create_node(&["Person"]);
5134            let gus = session.create_node(&["Person"]);
5135            let vincent = session.create_node(&["Person"]);
5136
5137            session.create_edge(alix, gus, "KNOWS");
5138            session.create_edge(alix, vincent, "KNOWS");
5139
5140            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
5141            let result = session
5142                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
5143                .unwrap();
5144
5145            // Should return 2 rows (Alix->Gus, Alix->Vincent)
5146            assert_eq!(result.row_count(), 2);
5147            assert_eq!(result.column_count(), 2);
5148            assert_eq!(result.columns[0], "a");
5149            assert_eq!(result.columns[1], "b");
5150        }
5151
5152        #[test]
5153        fn test_gql_relationship_with_type_filter() {
5154            let db = GrafeoDB::new_in_memory();
5155            let session = db.session();
5156
5157            // Create a graph: Alix -KNOWS-> Gus, Alix -WORKS_WITH-> Vincent
5158            let alix = session.create_node(&["Person"]);
5159            let gus = session.create_node(&["Person"]);
5160            let vincent = session.create_node(&["Person"]);
5161
5162            session.create_edge(alix, gus, "KNOWS");
5163            session.create_edge(alix, vincent, "WORKS_WITH");
5164
5165            // Query only KNOWS relationships
5166            let result = session
5167                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
5168                .unwrap();
5169
5170            // Should return only 1 row (Alix->Gus)
5171            assert_eq!(result.row_count(), 1);
5172        }
5173
5174        #[test]
5175        fn test_gql_semantic_error_undefined_variable() {
5176            let db = GrafeoDB::new_in_memory();
5177            let session = db.session();
5178
5179            // Reference undefined variable 'x' in RETURN
5180            let result = session.execute("MATCH (n:Person) RETURN x");
5181
5182            // Should fail with semantic error
5183            assert!(result.is_err());
5184            let Err(err) = result else {
5185                panic!("Expected error")
5186            };
5187            assert!(
5188                err.to_string().contains("Undefined variable"),
5189                "Expected undefined variable error, got: {}",
5190                err
5191            );
5192        }
5193
5194        #[test]
5195        fn test_gql_where_clause_property_filter() {
5196            use grafeo_common::types::Value;
5197
5198            let db = GrafeoDB::new_in_memory();
5199            let session = db.session();
5200
5201            // Create people with ages
5202            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
5203            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
5204            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
5205
5206            // Query with WHERE clause: age > 30
5207            let result = session
5208                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
5209                .unwrap();
5210
5211            // Should return 2 people (ages 35 and 45)
5212            assert_eq!(result.row_count(), 2);
5213        }
5214
5215        #[test]
5216        fn test_gql_where_clause_equality() {
5217            use grafeo_common::types::Value;
5218
5219            let db = GrafeoDB::new_in_memory();
5220            let session = db.session();
5221
5222            // Create people with names
5223            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
5224            session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
5225            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
5226
5227            // Query with WHERE clause: name = "Alix"
5228            let result = session
5229                .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
5230                .unwrap();
5231
5232            // Should return 2 people named Alix
5233            assert_eq!(result.row_count(), 2);
5234        }
5235
5236        #[test]
5237        fn test_gql_return_property_access() {
5238            use grafeo_common::types::Value;
5239
5240            let db = GrafeoDB::new_in_memory();
5241            let session = db.session();
5242
5243            // Create people with names and ages
5244            session.create_node_with_props(
5245                &["Person"],
5246                [
5247                    ("name", Value::String("Alix".into())),
5248                    ("age", Value::Int64(30)),
5249                ],
5250            );
5251            session.create_node_with_props(
5252                &["Person"],
5253                [
5254                    ("name", Value::String("Gus".into())),
5255                    ("age", Value::Int64(25)),
5256                ],
5257            );
5258
5259            // Query returning properties
5260            let result = session
5261                .execute("MATCH (n:Person) RETURN n.name, n.age")
5262                .unwrap();
5263
5264            // Should return 2 rows with name and age columns
5265            assert_eq!(result.row_count(), 2);
5266            assert_eq!(result.column_count(), 2);
5267            assert_eq!(result.columns[0], "n.name");
5268            assert_eq!(result.columns[1], "n.age");
5269
5270            // Check that we get actual values
5271            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
5272            assert!(names.contains(&&Value::String("Alix".into())));
5273            assert!(names.contains(&&Value::String("Gus".into())));
5274        }
5275
5276        #[test]
5277        fn test_gql_return_mixed_expressions() {
5278            use grafeo_common::types::Value;
5279
5280            let db = GrafeoDB::new_in_memory();
5281            let session = db.session();
5282
5283            // Create a person
5284            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
5285
5286            // Query returning both node and property
5287            let result = session
5288                .execute("MATCH (n:Person) RETURN n, n.name")
5289                .unwrap();
5290
5291            assert_eq!(result.row_count(), 1);
5292            assert_eq!(result.column_count(), 2);
5293            assert_eq!(result.columns[0], "n");
5294            assert_eq!(result.columns[1], "n.name");
5295
5296            // Second column should be the name
5297            assert_eq!(result.rows[0][1], Value::String("Alix".into()));
5298        }
5299    }
5300
5301    #[cfg(feature = "cypher")]
5302    mod cypher_tests {
5303        use super::*;
5304
5305        #[test]
5306        fn test_cypher_query_execution() {
5307            let db = GrafeoDB::new_in_memory();
5308            let session = db.session();
5309
5310            // Create some test data
5311            session.create_node(&["Person"]);
5312            session.create_node(&["Person"]);
5313            session.create_node(&["Animal"]);
5314
5315            // Execute a Cypher query
5316            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
5317
5318            // Should return 2 Person nodes
5319            assert_eq!(result.row_count(), 2);
5320            assert_eq!(result.column_count(), 1);
5321            assert_eq!(result.columns[0], "n");
5322        }
5323
5324        #[test]
5325        fn test_cypher_empty_result() {
5326            let db = GrafeoDB::new_in_memory();
5327            let session = db.session();
5328
5329            // No data in database
5330            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
5331
5332            assert_eq!(result.row_count(), 0);
5333        }
5334
5335        #[test]
5336        fn test_cypher_parse_error() {
5337            let db = GrafeoDB::new_in_memory();
5338            let session = db.session();
5339
5340            // Invalid Cypher syntax
5341            let result = session.execute_cypher("MATCH (n RETURN n");
5342
5343            assert!(result.is_err());
5344        }
5345    }
5346
5347    // ==================== Direct Lookup API Tests ====================
5348
5349    mod direct_lookup_tests {
5350        use super::*;
5351        use grafeo_common::types::Value;
5352
5353        #[test]
5354        fn test_get_node() {
5355            let db = GrafeoDB::new_in_memory();
5356            let session = db.session();
5357
5358            let id = session.create_node(&["Person"]);
5359            let node = session.get_node(id);
5360
5361            assert!(node.is_some());
5362            let node = node.unwrap();
5363            assert_eq!(node.id, id);
5364        }
5365
5366        #[test]
5367        fn test_get_node_not_found() {
5368            use grafeo_common::types::NodeId;
5369
5370            let db = GrafeoDB::new_in_memory();
5371            let session = db.session();
5372
5373            // Try to get a non-existent node
5374            let node = session.get_node(NodeId::new(9999));
5375            assert!(node.is_none());
5376        }
5377
5378        #[test]
5379        fn test_get_node_property() {
5380            let db = GrafeoDB::new_in_memory();
5381            let session = db.session();
5382
5383            let id = session
5384                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
5385
5386            let name = session.get_node_property(id, "name");
5387            assert_eq!(name, Some(Value::String("Alix".into())));
5388
5389            // Non-existent property
5390            let missing = session.get_node_property(id, "missing");
5391            assert!(missing.is_none());
5392        }
5393
5394        #[test]
5395        fn test_get_edge() {
5396            let db = GrafeoDB::new_in_memory();
5397            let session = db.session();
5398
5399            let alix = session.create_node(&["Person"]);
5400            let gus = session.create_node(&["Person"]);
5401            let edge_id = session.create_edge(alix, gus, "KNOWS");
5402
5403            let edge = session.get_edge(edge_id);
5404            assert!(edge.is_some());
5405            let edge = edge.unwrap();
5406            assert_eq!(edge.id, edge_id);
5407            assert_eq!(edge.src, alix);
5408            assert_eq!(edge.dst, gus);
5409        }
5410
5411        #[test]
5412        fn test_get_edge_not_found() {
5413            use grafeo_common::types::EdgeId;
5414
5415            let db = GrafeoDB::new_in_memory();
5416            let session = db.session();
5417
5418            let edge = session.get_edge(EdgeId::new(9999));
5419            assert!(edge.is_none());
5420        }
5421
5422        #[test]
5423        fn test_get_neighbors_outgoing() {
5424            let db = GrafeoDB::new_in_memory();
5425            let session = db.session();
5426
5427            let alix = session.create_node(&["Person"]);
5428            let gus = session.create_node(&["Person"]);
5429            let harm = session.create_node(&["Person"]);
5430
5431            session.create_edge(alix, gus, "KNOWS");
5432            session.create_edge(alix, harm, "KNOWS");
5433
5434            let neighbors = session.get_neighbors_outgoing(alix);
5435            assert_eq!(neighbors.len(), 2);
5436
5437            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5438            assert!(neighbor_ids.contains(&gus));
5439            assert!(neighbor_ids.contains(&harm));
5440        }
5441
5442        #[test]
5443        fn test_get_neighbors_incoming() {
5444            let db = GrafeoDB::new_in_memory();
5445            let session = db.session();
5446
5447            let alix = session.create_node(&["Person"]);
5448            let gus = session.create_node(&["Person"]);
5449            let harm = session.create_node(&["Person"]);
5450
5451            session.create_edge(gus, alix, "KNOWS");
5452            session.create_edge(harm, alix, "KNOWS");
5453
5454            let neighbors = session.get_neighbors_incoming(alix);
5455            assert_eq!(neighbors.len(), 2);
5456
5457            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5458            assert!(neighbor_ids.contains(&gus));
5459            assert!(neighbor_ids.contains(&harm));
5460        }
5461
5462        #[test]
5463        fn test_get_neighbors_outgoing_by_type() {
5464            let db = GrafeoDB::new_in_memory();
5465            let session = db.session();
5466
5467            let alix = session.create_node(&["Person"]);
5468            let gus = session.create_node(&["Person"]);
5469            let company = session.create_node(&["Company"]);
5470
5471            session.create_edge(alix, gus, "KNOWS");
5472            session.create_edge(alix, company, "WORKS_AT");
5473
5474            let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
5475            assert_eq!(knows_neighbors.len(), 1);
5476            assert_eq!(knows_neighbors[0].0, gus);
5477
5478            let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
5479            assert_eq!(works_neighbors.len(), 1);
5480            assert_eq!(works_neighbors[0].0, company);
5481
5482            // No edges of this type
5483            let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
5484            assert!(no_neighbors.is_empty());
5485        }
5486
5487        #[test]
5488        fn test_node_exists() {
5489            use grafeo_common::types::NodeId;
5490
5491            let db = GrafeoDB::new_in_memory();
5492            let session = db.session();
5493
5494            let id = session.create_node(&["Person"]);
5495
5496            assert!(session.node_exists(id));
5497            assert!(!session.node_exists(NodeId::new(9999)));
5498        }
5499
5500        #[test]
5501        fn test_edge_exists() {
5502            use grafeo_common::types::EdgeId;
5503
5504            let db = GrafeoDB::new_in_memory();
5505            let session = db.session();
5506
5507            let alix = session.create_node(&["Person"]);
5508            let gus = session.create_node(&["Person"]);
5509            let edge_id = session.create_edge(alix, gus, "KNOWS");
5510
5511            assert!(session.edge_exists(edge_id));
5512            assert!(!session.edge_exists(EdgeId::new(9999)));
5513        }
5514
5515        #[test]
5516        fn test_get_degree() {
5517            let db = GrafeoDB::new_in_memory();
5518            let session = db.session();
5519
5520            let alix = session.create_node(&["Person"]);
5521            let gus = session.create_node(&["Person"]);
5522            let harm = session.create_node(&["Person"]);
5523
5524            // Alix knows Gus and Harm (2 outgoing)
5525            session.create_edge(alix, gus, "KNOWS");
5526            session.create_edge(alix, harm, "KNOWS");
5527            // Gus knows Alix (1 incoming for Alix)
5528            session.create_edge(gus, alix, "KNOWS");
5529
5530            let (out_degree, in_degree) = session.get_degree(alix);
5531            assert_eq!(out_degree, 2);
5532            assert_eq!(in_degree, 1);
5533
5534            // Node with no edges
5535            let lonely = session.create_node(&["Person"]);
5536            let (out, in_deg) = session.get_degree(lonely);
5537            assert_eq!(out, 0);
5538            assert_eq!(in_deg, 0);
5539        }
5540
5541        #[test]
5542        fn test_get_nodes_batch() {
5543            let db = GrafeoDB::new_in_memory();
5544            let session = db.session();
5545
5546            let alix = session.create_node(&["Person"]);
5547            let gus = session.create_node(&["Person"]);
5548            let harm = session.create_node(&["Person"]);
5549
5550            let nodes = session.get_nodes_batch(&[alix, gus, harm]);
5551            assert_eq!(nodes.len(), 3);
5552            assert!(nodes[0].is_some());
5553            assert!(nodes[1].is_some());
5554            assert!(nodes[2].is_some());
5555
5556            // With non-existent node
5557            use grafeo_common::types::NodeId;
5558            let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
5559            assert_eq!(nodes_with_missing.len(), 3);
5560            assert!(nodes_with_missing[0].is_some());
5561            assert!(nodes_with_missing[1].is_none()); // Missing node
5562            assert!(nodes_with_missing[2].is_some());
5563        }
5564
5565        #[test]
5566        fn test_auto_commit_setting() {
5567            let db = GrafeoDB::new_in_memory();
5568            let mut session = db.session();
5569
5570            // Default is auto-commit enabled
5571            assert!(session.auto_commit());
5572
5573            session.set_auto_commit(false);
5574            assert!(!session.auto_commit());
5575
5576            session.set_auto_commit(true);
5577            assert!(session.auto_commit());
5578        }
5579
5580        #[test]
5581        fn test_transaction_double_begin_nests() {
5582            let db = GrafeoDB::new_in_memory();
5583            let mut session = db.session();
5584
5585            session.begin_transaction().unwrap();
5586            // Second begin_transaction creates a nested transaction (auto-savepoint)
5587            let result = session.begin_transaction();
5588            assert!(result.is_ok());
5589            // Commit the inner (releases savepoint)
5590            session.commit().unwrap();
5591            // Commit the outer
5592            session.commit().unwrap();
5593        }
5594
5595        #[test]
5596        fn test_commit_without_transaction_error() {
5597            let db = GrafeoDB::new_in_memory();
5598            let mut session = db.session();
5599
5600            let result = session.commit();
5601            assert!(result.is_err());
5602        }
5603
5604        #[test]
5605        fn test_rollback_without_transaction_error() {
5606            let db = GrafeoDB::new_in_memory();
5607            let mut session = db.session();
5608
5609            let result = session.rollback();
5610            assert!(result.is_err());
5611        }
5612
5613        #[test]
5614        fn test_create_edge_in_transaction() {
5615            let db = GrafeoDB::new_in_memory();
5616            let mut session = db.session();
5617
5618            // Create nodes outside transaction
5619            let alix = session.create_node(&["Person"]);
5620            let gus = session.create_node(&["Person"]);
5621
5622            // Create edge in transaction
5623            session.begin_transaction().unwrap();
5624            let edge_id = session.create_edge(alix, gus, "KNOWS");
5625
5626            // Edge should be visible in the transaction
5627            assert!(session.edge_exists(edge_id));
5628
5629            // Commit
5630            session.commit().unwrap();
5631
5632            // Edge should still be visible
5633            assert!(session.edge_exists(edge_id));
5634        }
5635
5636        #[test]
5637        fn test_neighbors_empty_node() {
5638            let db = GrafeoDB::new_in_memory();
5639            let session = db.session();
5640
5641            let lonely = session.create_node(&["Person"]);
5642
5643            assert!(session.get_neighbors_outgoing(lonely).is_empty());
5644            assert!(session.get_neighbors_incoming(lonely).is_empty());
5645            assert!(
5646                session
5647                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
5648                    .is_empty()
5649            );
5650        }
5651    }
5652
5653    #[test]
5654    fn test_auto_gc_triggers_on_commit_interval() {
5655        use crate::config::Config;
5656
5657        let config = Config::in_memory().with_gc_interval(2);
5658        let db = GrafeoDB::with_config(config).unwrap();
5659        let mut session = db.session();
5660
5661        // First commit: counter = 1, no GC (not a multiple of 2)
5662        session.begin_transaction().unwrap();
5663        session.create_node(&["A"]);
5664        session.commit().unwrap();
5665
5666        // Second commit: counter = 2, GC should trigger (multiple of 2)
5667        session.begin_transaction().unwrap();
5668        session.create_node(&["B"]);
5669        session.commit().unwrap();
5670
5671        // Verify the database is still functional after GC
5672        assert_eq!(db.node_count(), 2);
5673    }
5674
5675    #[test]
5676    fn test_query_timeout_config_propagates_to_session() {
5677        use crate::config::Config;
5678        use std::time::Duration;
5679
5680        let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
5681        let db = GrafeoDB::with_config(config).unwrap();
5682        let session = db.session();
5683
5684        // Verify the session has a query deadline (timeout was set)
5685        assert!(session.query_deadline().is_some());
5686    }
5687
5688    #[test]
5689    fn test_no_query_timeout_returns_no_deadline() {
5690        let db = GrafeoDB::new_in_memory();
5691        let session = db.session();
5692
5693        // Default config has no timeout
5694        assert!(session.query_deadline().is_none());
5695    }
5696
5697    #[test]
5698    fn test_graph_model_accessor() {
5699        use crate::config::GraphModel;
5700
5701        let db = GrafeoDB::new_in_memory();
5702        let session = db.session();
5703
5704        assert_eq!(session.graph_model(), GraphModel::Lpg);
5705    }
5706
5707    #[cfg(feature = "gql")]
5708    #[test]
5709    fn test_external_store_session() {
5710        use grafeo_core::graph::GraphStoreMut;
5711        use std::sync::Arc;
5712
5713        let config = crate::config::Config::in_memory();
5714        let store =
5715            Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
5716        let db = GrafeoDB::with_store(store, config).unwrap();
5717
5718        let mut session = db.session();
5719
5720        // Use an explicit transaction so that INSERT and MATCH share the same
5721        // transaction context. With PENDING epochs, uncommitted versions are
5722        // only visible to the owning transaction.
5723        session.begin_transaction().unwrap();
5724        session.execute("INSERT (:Test {name: 'hello'})").unwrap();
5725
5726        // Verify we can query through it within the same transaction
5727        let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
5728        assert_eq!(result.row_count(), 1);
5729
5730        session.commit().unwrap();
5731    }
5732
5733    // ==================== Session Command Tests ====================
5734
5735    #[cfg(feature = "gql")]
5736    mod session_command_tests {
5737        use super::*;
5738        use grafeo_common::types::Value;
5739
5740        #[test]
5741        fn test_use_graph_sets_current_graph() {
5742            let db = GrafeoDB::new_in_memory();
5743            let session = db.session();
5744
5745            // Create the graph first, then USE it
5746            session.execute("CREATE GRAPH mydb").unwrap();
5747            session.execute("USE GRAPH mydb").unwrap();
5748
5749            assert_eq!(session.current_graph(), Some("mydb".to_string()));
5750        }
5751
5752        #[test]
5753        fn test_use_graph_nonexistent_errors() {
5754            let db = GrafeoDB::new_in_memory();
5755            let session = db.session();
5756
5757            let result = session.execute("USE GRAPH doesnotexist");
5758            assert!(result.is_err());
5759            let err = result.unwrap_err().to_string();
5760            assert!(
5761                err.contains("does not exist"),
5762                "Expected 'does not exist' error, got: {err}"
5763            );
5764        }
5765
5766        #[test]
5767        fn test_use_graph_default_always_valid() {
5768            let db = GrafeoDB::new_in_memory();
5769            let session = db.session();
5770
5771            // "default" is always valid, even without CREATE GRAPH
5772            session.execute("USE GRAPH default").unwrap();
5773            assert_eq!(session.current_graph(), Some("default".to_string()));
5774        }
5775
5776        #[test]
5777        fn test_session_set_graph() {
5778            let db = GrafeoDB::new_in_memory();
5779            let session = db.session();
5780
5781            session.execute("CREATE GRAPH analytics").unwrap();
5782            session.execute("SESSION SET GRAPH analytics").unwrap();
5783            assert_eq!(session.current_graph(), Some("analytics".to_string()));
5784        }
5785
5786        #[test]
5787        fn test_session_set_graph_nonexistent_errors() {
5788            let db = GrafeoDB::new_in_memory();
5789            let session = db.session();
5790
5791            let result = session.execute("SESSION SET GRAPH nosuchgraph");
5792            assert!(result.is_err());
5793        }
5794
5795        #[test]
5796        fn test_session_set_time_zone() {
5797            let db = GrafeoDB::new_in_memory();
5798            let session = db.session();
5799
5800            assert_eq!(session.time_zone(), None);
5801
5802            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5803            assert_eq!(session.time_zone(), Some("UTC".to_string()));
5804
5805            session
5806                .execute("SESSION SET TIME ZONE 'America/New_York'")
5807                .unwrap();
5808            assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
5809        }
5810
5811        #[test]
5812        fn test_session_set_parameter() {
5813            let db = GrafeoDB::new_in_memory();
5814            let session = db.session();
5815
5816            session
5817                .execute("SESSION SET PARAMETER $timeout = 30")
5818                .unwrap();
5819
5820            // Parameter is stored (value is Null for now, since expression
5821            // evaluation is not yet wired up)
5822            assert!(session.get_parameter("timeout").is_some());
5823        }
5824
5825        #[test]
5826        fn test_session_reset_clears_all_state() {
5827            let db = GrafeoDB::new_in_memory();
5828            let session = db.session();
5829
5830            // Set various session state
5831            session.execute("CREATE GRAPH analytics").unwrap();
5832            session.execute("SESSION SET GRAPH analytics").unwrap();
5833            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5834            session
5835                .execute("SESSION SET PARAMETER $limit = 100")
5836                .unwrap();
5837
5838            // Verify state was set
5839            assert!(session.current_graph().is_some());
5840            assert!(session.time_zone().is_some());
5841            assert!(session.get_parameter("limit").is_some());
5842
5843            // Reset everything
5844            session.execute("SESSION RESET").unwrap();
5845
5846            assert_eq!(session.current_graph(), None);
5847            assert_eq!(session.time_zone(), None);
5848            assert!(session.get_parameter("limit").is_none());
5849        }
5850
5851        #[test]
5852        fn test_session_close_clears_state() {
5853            let db = GrafeoDB::new_in_memory();
5854            let session = db.session();
5855
5856            session.execute("CREATE GRAPH analytics").unwrap();
5857            session.execute("SESSION SET GRAPH analytics").unwrap();
5858            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5859
5860            session.execute("SESSION CLOSE").unwrap();
5861
5862            assert_eq!(session.current_graph(), None);
5863            assert_eq!(session.time_zone(), None);
5864        }
5865
5866        #[test]
5867        fn test_create_graph() {
5868            let db = GrafeoDB::new_in_memory();
5869            let session = db.session();
5870
5871            session.execute("CREATE GRAPH mydb").unwrap();
5872
5873            // Should be able to USE it now
5874            session.execute("USE GRAPH mydb").unwrap();
5875            assert_eq!(session.current_graph(), Some("mydb".to_string()));
5876        }
5877
5878        #[test]
5879        fn test_create_graph_duplicate_errors() {
5880            let db = GrafeoDB::new_in_memory();
5881            let session = db.session();
5882
5883            session.execute("CREATE GRAPH mydb").unwrap();
5884            let result = session.execute("CREATE GRAPH mydb");
5885
5886            assert!(result.is_err());
5887            let err = result.unwrap_err().to_string();
5888            assert!(
5889                err.contains("already exists"),
5890                "Expected 'already exists' error, got: {err}"
5891            );
5892        }
5893
5894        #[test]
5895        fn test_create_graph_if_not_exists() {
5896            let db = GrafeoDB::new_in_memory();
5897            let session = db.session();
5898
5899            session.execute("CREATE GRAPH mydb").unwrap();
5900            // Should succeed silently with IF NOT EXISTS
5901            session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
5902        }
5903
5904        #[test]
5905        fn test_drop_graph() {
5906            let db = GrafeoDB::new_in_memory();
5907            let session = db.session();
5908
5909            session.execute("CREATE GRAPH mydb").unwrap();
5910            session.execute("DROP GRAPH mydb").unwrap();
5911
5912            // Should no longer be usable
5913            let result = session.execute("USE GRAPH mydb");
5914            assert!(result.is_err());
5915        }
5916
5917        #[test]
5918        fn test_drop_graph_nonexistent_errors() {
5919            let db = GrafeoDB::new_in_memory();
5920            let session = db.session();
5921
5922            let result = session.execute("DROP GRAPH nosuchgraph");
5923            assert!(result.is_err());
5924            let err = result.unwrap_err().to_string();
5925            assert!(
5926                err.contains("does not exist"),
5927                "Expected 'does not exist' error, got: {err}"
5928            );
5929        }
5930
5931        #[test]
5932        fn test_drop_graph_if_exists() {
5933            let db = GrafeoDB::new_in_memory();
5934            let session = db.session();
5935
5936            // Should succeed silently with IF EXISTS
5937            session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
5938        }
5939
5940        #[test]
5941        fn test_start_transaction_via_gql() {
5942            let db = GrafeoDB::new_in_memory();
5943            let session = db.session();
5944
5945            session.execute("START TRANSACTION").unwrap();
5946            assert!(session.in_transaction());
5947            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5948            session.execute("COMMIT").unwrap();
5949            assert!(!session.in_transaction());
5950
5951            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5952            assert_eq!(result.rows.len(), 1);
5953        }
5954
5955        #[test]
5956        fn test_start_transaction_read_only_blocks_insert() {
5957            let db = GrafeoDB::new_in_memory();
5958            let session = db.session();
5959
5960            session.execute("START TRANSACTION READ ONLY").unwrap();
5961            let result = session.execute("INSERT (:Person {name: 'Alix'})");
5962            assert!(result.is_err());
5963            let err = result.unwrap_err().to_string();
5964            assert!(
5965                err.contains("read-only"),
5966                "Expected read-only error, got: {err}"
5967            );
5968            session.execute("ROLLBACK").unwrap();
5969        }
5970
5971        #[test]
5972        fn test_start_transaction_read_only_allows_reads() {
5973            let db = GrafeoDB::new_in_memory();
5974            let mut session = db.session();
5975            session.begin_transaction().unwrap();
5976            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5977            session.commit().unwrap();
5978
5979            session.execute("START TRANSACTION READ ONLY").unwrap();
5980            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5981            assert_eq!(result.rows.len(), 1);
5982            session.execute("COMMIT").unwrap();
5983        }
5984
5985        #[test]
5986        fn test_rollback_via_gql() {
5987            let db = GrafeoDB::new_in_memory();
5988            let session = db.session();
5989
5990            session.execute("START TRANSACTION").unwrap();
5991            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5992            session.execute("ROLLBACK").unwrap();
5993
5994            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5995            assert!(result.rows.is_empty());
5996        }
5997
5998        #[test]
5999        fn test_start_transaction_with_isolation_level() {
6000            let db = GrafeoDB::new_in_memory();
6001            let session = db.session();
6002
6003            session
6004                .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
6005                .unwrap();
6006            assert!(session.in_transaction());
6007            session.execute("ROLLBACK").unwrap();
6008        }
6009
6010        #[test]
6011        fn test_session_commands_return_empty_result() {
6012            let db = GrafeoDB::new_in_memory();
6013            let session = db.session();
6014
6015            session.execute("CREATE GRAPH test").unwrap();
6016            let result = session.execute("SESSION SET GRAPH test").unwrap();
6017            assert_eq!(result.row_count(), 0);
6018            assert_eq!(result.column_count(), 0);
6019        }
6020
6021        #[test]
6022        fn test_current_graph_default_is_none() {
6023            let db = GrafeoDB::new_in_memory();
6024            let session = db.session();
6025
6026            assert_eq!(session.current_graph(), None);
6027        }
6028
6029        #[test]
6030        fn test_time_zone_default_is_none() {
6031            let db = GrafeoDB::new_in_memory();
6032            let session = db.session();
6033
6034            assert_eq!(session.time_zone(), None);
6035        }
6036
6037        #[test]
6038        fn test_session_state_independent_across_sessions() {
6039            let db = GrafeoDB::new_in_memory();
6040            let session1 = db.session();
6041            let session2 = db.session();
6042
6043            session1.execute("CREATE GRAPH first").unwrap();
6044            session1.execute("CREATE GRAPH second").unwrap();
6045            session1.execute("SESSION SET GRAPH first").unwrap();
6046            session2.execute("SESSION SET GRAPH second").unwrap();
6047
6048            assert_eq!(session1.current_graph(), Some("first".to_string()));
6049            assert_eq!(session2.current_graph(), Some("second".to_string()));
6050        }
6051
6052        #[test]
6053        fn test_show_node_types() {
6054            let db = GrafeoDB::new_in_memory();
6055            let session = db.session();
6056
6057            session
6058                .execute("CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)")
6059                .unwrap();
6060
6061            let result = session.execute("SHOW NODE TYPES").unwrap();
6062            assert_eq!(
6063                result.columns,
6064                vec!["name", "properties", "constraints", "parents"]
6065            );
6066            assert_eq!(result.rows.len(), 1);
6067            // First column is the type name
6068            assert_eq!(result.rows[0][0], Value::from("Person"));
6069        }
6070
6071        #[test]
6072        fn test_show_edge_types() {
6073            let db = GrafeoDB::new_in_memory();
6074            let session = db.session();
6075
6076            session
6077                .execute("CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)")
6078                .unwrap();
6079
6080            let result = session.execute("SHOW EDGE TYPES").unwrap();
6081            assert_eq!(
6082                result.columns,
6083                vec!["name", "properties", "source_types", "target_types"]
6084            );
6085            assert_eq!(result.rows.len(), 1);
6086            assert_eq!(result.rows[0][0], Value::from("KNOWS"));
6087        }
6088
6089        #[test]
6090        fn test_show_graph_types() {
6091            let db = GrafeoDB::new_in_memory();
6092            let session = db.session();
6093
6094            session
6095                .execute("CREATE NODE TYPE Person (name STRING)")
6096                .unwrap();
6097            session
6098                .execute(
6099                    "CREATE GRAPH TYPE social (\
6100                        NODE TYPE Person (name STRING)\
6101                    )",
6102                )
6103                .unwrap();
6104
6105            let result = session.execute("SHOW GRAPH TYPES").unwrap();
6106            assert_eq!(
6107                result.columns,
6108                vec!["name", "open", "node_types", "edge_types"]
6109            );
6110            assert_eq!(result.rows.len(), 1);
6111            assert_eq!(result.rows[0][0], Value::from("social"));
6112        }
6113
6114        #[test]
6115        fn test_show_graph_type_named() {
6116            let db = GrafeoDB::new_in_memory();
6117            let session = db.session();
6118
6119            session
6120                .execute("CREATE NODE TYPE Person (name STRING)")
6121                .unwrap();
6122            session
6123                .execute(
6124                    "CREATE GRAPH TYPE social (\
6125                        NODE TYPE Person (name STRING)\
6126                    )",
6127                )
6128                .unwrap();
6129
6130            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
6131            assert_eq!(result.rows.len(), 1);
6132            assert_eq!(result.rows[0][0], Value::from("social"));
6133        }
6134
6135        #[test]
6136        fn test_show_graph_type_not_found() {
6137            let db = GrafeoDB::new_in_memory();
6138            let session = db.session();
6139
6140            let result = session.execute("SHOW GRAPH TYPE nonexistent");
6141            assert!(result.is_err());
6142        }
6143
6144        #[test]
6145        fn test_show_indexes_via_gql() {
6146            let db = GrafeoDB::new_in_memory();
6147            let session = db.session();
6148
6149            let result = session.execute("SHOW INDEXES").unwrap();
6150            assert_eq!(result.columns, vec!["name", "type", "label", "property"]);
6151        }
6152
6153        #[test]
6154        fn test_show_constraints_via_gql() {
6155            let db = GrafeoDB::new_in_memory();
6156            let session = db.session();
6157
6158            let result = session.execute("SHOW CONSTRAINTS").unwrap();
6159            assert_eq!(result.columns, vec!["name", "type", "label", "properties"]);
6160        }
6161
6162        #[test]
6163        fn test_pattern_form_graph_type_roundtrip() {
6164            let db = GrafeoDB::new_in_memory();
6165            let session = db.session();
6166
6167            // Register the types first
6168            session
6169                .execute("CREATE NODE TYPE Person (name STRING NOT NULL)")
6170                .unwrap();
6171            session
6172                .execute("CREATE NODE TYPE City (name STRING)")
6173                .unwrap();
6174            session
6175                .execute("CREATE EDGE TYPE KNOWS (since INTEGER)")
6176                .unwrap();
6177            session.execute("CREATE EDGE TYPE LIVES_IN").unwrap();
6178
6179            // Create graph type using pattern form
6180            session
6181                .execute(
6182                    "CREATE GRAPH TYPE social (\
6183                        (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
6184                        (:Person)-[:LIVES_IN]->(:City)\
6185                    )",
6186                )
6187                .unwrap();
6188
6189            // Verify it was created
6190            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
6191            assert_eq!(result.rows.len(), 1);
6192            assert_eq!(result.rows[0][0], Value::from("social"));
6193        }
6194    }
6195}